free-claude-code/tests/test_open_router.py
Alishahryar1 e5a096049d feat: add OpenRouter support and configuration options
- Introduced OpenRouter as a new provider option in settings and environment configuration.
- Updated README.md to include instructions for using OpenRouter.
- Enhanced the message converter to support reasoning content for OpenRouter.
- Added tests for OpenRouter provider functionality and message conversion.
- Updated dependencies to include OpenRouterProvider.
2026-02-15 10:50:53 -08:00

179 lines
5.6 KiB
Python

"""Tests for OpenRouter provider."""
import pytest
import json
from unittest.mock import MagicMock, AsyncMock, patch
from providers.open_router import OpenRouterProvider
from providers.base import ProviderConfig
from config.nim import NimSettings
class MockMessage:
def __init__(self, role, content):
self.role = role
self.content = content
class MockRequest:
def __init__(self, **kwargs):
self.model = "stepfun/step-3.5-flash:free"
self.messages = [MockMessage("user", "Hello")]
self.max_tokens = 100
self.temperature = 0.5
self.top_p = 0.9
self.system = "System prompt"
self.stop_sequences = None
self.tools = []
self.extra_body = {}
self.thinking = MagicMock()
self.thinking.enabled = True
for k, v in kwargs.items():
setattr(self, k, v)
@pytest.fixture
def open_router_config():
return ProviderConfig(
api_key="test_openrouter_key",
base_url="https://openrouter.ai/api/v1",
rate_limit=10,
rate_window=60,
nim_settings=NimSettings(),
)
@pytest.fixture(autouse=True)
def mock_rate_limiter():
"""Mock the global rate limiter to prevent waiting."""
with patch("providers.open_router.client.GlobalRateLimiter") as mock:
instance = mock.get_instance.return_value
instance.wait_if_blocked = AsyncMock(return_value=False)
async def _passthrough(fn, *args, **kwargs):
return await fn(*args, **kwargs)
instance.execute_with_retry = AsyncMock(side_effect=_passthrough)
yield instance
@pytest.fixture
def open_router_provider(open_router_config):
return OpenRouterProvider(open_router_config)
def test_init(open_router_config):
"""Test provider initialization."""
with patch("providers.open_router.client.AsyncOpenAI") as mock_openai:
provider = OpenRouterProvider(open_router_config)
assert provider._api_key == "test_openrouter_key"
assert provider._base_url == "https://openrouter.ai/api/v1"
mock_openai.assert_called_once()
def test_build_request_body_has_reasoning_extra(open_router_provider):
"""Request body has extra_body.reasoning.enabled for thinking models."""
req = MockRequest()
body = open_router_provider._build_request_body(req)
assert body["model"] == "stepfun/step-3.5-flash:free"
assert body["temperature"] == 0.5
assert len(body["messages"]) == 2 # System + User
assert body["messages"][0]["role"] == "system"
assert body["messages"][0]["content"] == "System prompt"
assert "extra_body" in body
assert "reasoning" in body["extra_body"]
assert body["extra_body"]["reasoning"]["enabled"] is True
def test_build_request_body_base_url_and_model(open_router_provider):
"""Base URL and model are correct in provider config."""
assert open_router_provider._base_url == "https://openrouter.ai/api/v1"
req = MockRequest(model="stepfun/step-3.5-flash:free")
body = open_router_provider._build_request_body(req)
assert body["model"] == "stepfun/step-3.5-flash:free"
@pytest.mark.asyncio
async def test_stream_response_text(open_router_provider):
"""Test streaming text response."""
req = MockRequest()
mock_chunk1 = MagicMock()
mock_chunk1.choices = [
MagicMock(
delta=MagicMock(content="Hello", reasoning_content=None),
finish_reason=None,
)
]
mock_chunk1.usage = None
mock_chunk2 = MagicMock()
mock_chunk2.choices = [
MagicMock(
delta=MagicMock(content=" World", reasoning_content=None),
finish_reason="stop",
)
]
mock_chunk2.usage = MagicMock(completion_tokens=10)
async def mock_stream():
yield mock_chunk1
yield mock_chunk2
with patch.object(
open_router_provider._client.chat.completions, "create", new_callable=AsyncMock
) as mock_create:
mock_create.return_value = mock_stream()
events = []
async for event in open_router_provider.stream_response(req):
events.append(event)
assert len(events) > 0
assert "event: message_start" in events[0]
text_content = ""
for e in events:
if "event: content_block_delta" in e and '"text_delta"' in e:
for line in e.splitlines():
if line.startswith("data: "):
data = json.loads(line[6:])
if "delta" in data and "text" in data["delta"]:
text_content += data["delta"]["text"]
assert "Hello World" in text_content
@pytest.mark.asyncio
async def test_stream_response_reasoning_content(open_router_provider):
"""Test streaming with reasoning_content delta."""
req = MockRequest()
mock_chunk = MagicMock()
mock_chunk.choices = [
MagicMock(
delta=MagicMock(content=None, reasoning_content="Thinking..."),
finish_reason=None,
)
]
mock_chunk.usage = None
async def mock_stream():
yield mock_chunk
with patch.object(
open_router_provider._client.chat.completions, "create", new_callable=AsyncMock
) as mock_create:
mock_create.return_value = mock_stream()
events = []
async for event in open_router_provider.stream_response(req):
events.append(event)
found_thinking = False
for e in events:
if "event: content_block_delta" in e and '"thinking_delta"' in e:
if "Thinking..." in e:
found_thinking = True
assert found_thinking