diff --git a/.gitignore b/.gitignore index 198513c..8ec5364 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ agent_workspace .env server.log .coverage -llama_cache \ No newline at end of file +llama_cache +.smoke-results diff --git a/providers/common/message_converter.py b/providers/common/message_converter.py index 2e9e593..5e2184f 100644 --- a/providers/common/message_converter.py +++ b/providers/common/message_converter.py @@ -175,6 +175,26 @@ class AnthropicToOpenAIConverter: for tool in tools ] + @staticmethod + def convert_tool_choice(tool_choice: Any) -> Any: + """Convert Anthropic tool_choice to OpenAI-compatible format.""" + if not isinstance(tool_choice, dict): + return tool_choice + + choice_type = tool_choice.get("type") + if choice_type == "tool": + name = tool_choice.get("name") + if name: + return {"type": "function", "function": {"name": name}} + if choice_type == "any": + return "required" + if choice_type in {"auto", "none", "required"}: + return choice_type + if choice_type == "function" and isinstance(tool_choice.get("function"), dict): + return tool_choice + + return tool_choice + @staticmethod def convert_system_prompt(system: Any) -> dict[str, str] | None: """Convert Anthropic system prompt to OpenAI format.""" @@ -236,6 +256,8 @@ def build_base_request_body( body["tools"] = AnthropicToOpenAIConverter.convert_tools(tools) tool_choice = getattr(request_data, "tool_choice", None) if tool_choice: - body["tool_choice"] = tool_choice + body["tool_choice"] = AnthropicToOpenAIConverter.convert_tool_choice( + tool_choice + ) return body diff --git a/pyproject.toml b/pyproject.toml index 5401496..19d89b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,6 +96,18 @@ skip-magic-trailing-comma = false [tool.pytest.ini_options] pythonpath = ["."] addopts = "-n auto" +testpaths = ["tests"] +markers = [ + "live: opt-in local smoke tests that can touch real services", + "interactive: smoke tests requiring manual user interaction", + "provider: live provider checks", + "messaging: live messaging platform checks", + "cli: CLI integration checks", + "vscode: VS Code or IDE client compatibility checks", + "voice: voice transcription checks", + "contract: deterministic smoke contract checks", + "smoke_target(name): route a smoke test behind FCC_SMOKE_TARGETS", +] [tool.ty.environment] python-version = "3.14" diff --git a/smoke/README.md b/smoke/README.md new file mode 100644 index 0000000..894dfdf --- /dev/null +++ b/smoke/README.md @@ -0,0 +1,54 @@ +# Local Live Smoke Tests + +These tests are for maintainers running against their own `.env`. They are not +part of CI and are not collected by plain `uv run pytest`. + +## Safe Default Run + +```powershell +$env:FCC_LIVE_SMOKE = "1" +uv run pytest smoke -n 0 -m live -s --tb=short +``` + +`-n 0` is recommended because the normal project pytest config enables xdist. +The smoke suite can run with workers, but one process gives clearer logs when a +real provider or bot fails. + +## Targeted Runs + +```powershell +$env:FCC_LIVE_SMOKE = "1" +$env:FCC_SMOKE_TARGETS = "api,providers,thinking,tools" +uv run pytest smoke -n 0 -m live -s --tb=short +``` + +Use `FCC_SMOKE_TARGETS=all` to include Telegram, Discord, and voice checks. +The default target set intentionally excludes those side-effectful integrations. + +## Environment + +- `FCC_ENV_FILE`: optional explicit dotenv path. The app still uses its normal + env-file precedence. +- `FCC_SMOKE_PROVIDER_MATRIX`: comma-separated provider prefixes to test. +- `FCC_SMOKE_TIMEOUT_S`: per-request/subprocess timeout, default `45`. +- `FCC_SMOKE_CLAUDE_BIN`: Claude CLI executable name, default `claude`. +- `FCC_SMOKE_TELEGRAM_CHAT_ID`: Telegram chat/user ID for send/edit/delete. +- `FCC_SMOKE_DISCORD_CHANNEL_ID`: Discord channel ID for send/edit/delete. +- `FCC_SMOKE_INTERACTIVE=1`: enables manual inbound messaging checks. +- `FCC_SMOKE_RUN_VOICE=1`: allows the voice transcription backend to load/run. + +## Results + +Smoke artifacts are written to `.smoke-results/` and ignored by git. Reports and +logs redact env values whose names contain `KEY`, `TOKEN`, `SECRET`, `WEBHOOK`, +or `AUTH`. + +## How To Read Failures + +- `missing_env`: configure the required key, token, channel, or local base URL. +- `upstream_unavailable`: the provider/local model/bot API is not reachable. +- `product_failure`: the app returned the wrong shape or crashed. +- `harness_bug`: the smoke test itself made an invalid assumption. + +The first real run is expected to find product failures. Fix those separately +from harness problems so the suite becomes a reliable regression signal. diff --git a/smoke/__init__.py b/smoke/__init__.py new file mode 100644 index 0000000..1eee0aa --- /dev/null +++ b/smoke/__init__.py @@ -0,0 +1 @@ +"""Local-only live smoke tests for free-claude-code.""" diff --git a/smoke/conftest.py b/smoke/conftest.py new file mode 100644 index 0000000..7ebb91e --- /dev/null +++ b/smoke/conftest.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +from collections.abc import Iterator + +import pytest + +from smoke.lib.config import SmokeConfig, auth_headers +from smoke.lib.report import SmokeReport +from smoke.lib.server import RunningServer, start_server + + +def pytest_collection_modifyitems(items: list[pytest.Item]) -> None: + if SmokeConfig.load().live: + return + skip = pytest.mark.skip(reason="set FCC_LIVE_SMOKE=1 to run local smoke tests") + for item in items: + item.add_marker(skip) + + +def pytest_configure(config: pytest.Config) -> None: + global _REPORT + smoke_config = SmokeConfig.load() + _REPORT = SmokeReport(smoke_config) + + +def pytest_runtest_setup(item: pytest.Item) -> None: + config = SmokeConfig.load() + target_marks = list(item.iter_markers("smoke_target")) + if not target_marks: + return + targets = [str(mark.args[0]) for mark in target_marks if mark.args] + if targets and not any(config.target_enabled(target) for target in targets): + pytest.skip(f"smoke target disabled: {', '.join(targets)}") + + +def pytest_runtest_logreport(report: pytest.TestReport) -> None: + if report.when != "call": + return + if _REPORT is None: + return + markers = sorted( + str(name) for name in report.keywords if str(name).startswith("smoke_") + ) + detail = "" if report.longrepr is None else str(report.longrepr) + _REPORT.add( + nodeid=report.nodeid, + outcome=report.outcome, + duration_s=report.duration, + markers=markers, + detail=detail, + ) + + +def pytest_sessionfinish(session: pytest.Session, exitstatus: int) -> None: + if _REPORT is not None: + _REPORT.write() + + +@pytest.fixture(scope="session") +def smoke_config() -> SmokeConfig: + return SmokeConfig.load() + + +@pytest.fixture +def smoke_server(smoke_config: SmokeConfig) -> Iterator[RunningServer]: + with start_server(smoke_config) as server: + yield server + + +@pytest.fixture +def smoke_headers() -> dict[str, str]: + return auth_headers() + + +_REPORT: SmokeReport | None = None diff --git a/smoke/features.py b/smoke/features.py new file mode 100644 index 0000000..3f7e06e --- /dev/null +++ b/smoke/features.py @@ -0,0 +1,147 @@ +"""Feature inventory used by the local smoke suite. + +This file is intentionally explicit: advertised features should not exist only +in README prose without at least one smoke check or a documented manual gap. +""" + +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True, slots=True) +class FeatureSmoke: + feature_id: str + title: str + mode: str + checks: tuple[str, ...] + + +README_FEATURES: tuple[str, ...] = ( + "zero_cost_provider_access", + "drop_in_claude_code_replacement", + "provider_matrix", + "per_model_mapping", + "thinking_token_support", + "heuristic_tool_parser", + "request_optimization", + "smart_rate_limiting", + "discord_telegram_bot", + "subagent_control", + "extensible_provider_platform_abcs", + "optional_authentication", + "vscode_extension", + "intellij_extension", + "voice_notes", +) + + +FEATURE_SMOKES: tuple[FeatureSmoke, ...] = ( + FeatureSmoke( + "zero_cost_provider_access", + "Configured provider accepts a real prompt", + "live", + ("test_configured_provider_models_stream_successfully",), + ), + FeatureSmoke( + "drop_in_claude_code_replacement", + "Claude-compatible routes and CLI environment work", + "live", + ( + "test_probe_and_models_routes", + "test_claude_cli_prompt_when_available", + "test_vscode_and_jetbrains_shaped_requests", + ), + ), + FeatureSmoke( + "provider_matrix", + "All configured provider prefixes can be exercised", + "live", + ("test_configured_provider_models_stream_successfully",), + ), + FeatureSmoke( + "per_model_mapping", + "Opus, Sonnet, Haiku, and fallback mappings are visible", + "live", + ("test_model_mapping_configuration_is_consistent",), + ), + FeatureSmoke( + "thinking_token_support", + "Thinking streams and suppression are contract-tested", + "contract", + ( + "test_interleaved_thinking_text_blocks_are_valid", + "test_split_think_tags_preserve_text_and_thinking", + "test_enable_thinking_false_suppresses_reasoning_only", + ), + ), + FeatureSmoke( + "heuristic_tool_parser", + "Text tool calls become structured tool_use blocks", + "contract", + ("test_thinking_tool_text_and_transcript_order_contract",), + ), + FeatureSmoke( + "request_optimization", + "Fast-path local optimizations respond without providers", + "live", + ("test_optimization_fast_paths_do_not_need_provider",), + ), + FeatureSmoke( + "smart_rate_limiting", + "Concurrency/disconnect and retry-sensitive paths are checked", + "live", + ("test_client_disconnect_mid_stream_does_not_crash_server",), + ), + FeatureSmoke( + "discord_telegram_bot", + "Messaging credentials, send/edit/delete, and transcript behavior", + "live_or_interactive", + ( + "test_telegram_bot_api_permissions", + "test_discord_bot_api_permissions", + "test_thinking_tool_text_and_transcript_order_contract", + ), + ), + FeatureSmoke( + "subagent_control", + "Task tool calls do not run in the background", + "contract", + ("test_task_tool_arguments_force_foreground_execution",), + ), + FeatureSmoke( + "extensible_provider_platform_abcs", + "Provider/platform registries expose expected built-ins", + "contract", + ("test_provider_and_platform_registries_include_advertised_builtins",), + ), + FeatureSmoke( + "optional_authentication", + "Anthropic-style auth headers are accepted and enforced", + "live", + ("test_auth_token_is_enforced_for_all_supported_header_shapes",), + ), + FeatureSmoke( + "vscode_extension", + "VS Code-shaped beta requests work against the proxy", + "live", + ("test_vscode_and_jetbrains_shaped_requests",), + ), + FeatureSmoke( + "intellij_extension", + "JetBrains/ACP-shaped environment requests work against the proxy", + "live", + ("test_vscode_and_jetbrains_shaped_requests",), + ), + FeatureSmoke( + "voice_notes", + "Configured transcription backend can process an audio fixture", + "live_or_skip", + ("test_voice_transcription_backend_when_explicitly_enabled",), + ), +) + + +def smoke_ids() -> set[str]: + """Return feature IDs covered by the smoke manifest.""" + return {feature.feature_id for feature in FEATURE_SMOKES} diff --git a/smoke/lib/__init__.py b/smoke/lib/__init__.py new file mode 100644 index 0000000..5f127b6 --- /dev/null +++ b/smoke/lib/__init__.py @@ -0,0 +1 @@ +"""Shared helpers for local-only smoke tests.""" diff --git a/smoke/lib/config.py b/smoke/lib/config.py new file mode 100644 index 0000000..73da08f --- /dev/null +++ b/smoke/lib/config.py @@ -0,0 +1,152 @@ +"""Smoke-suite configuration loaded from the real developer environment.""" + +from __future__ import annotations + +import os +from collections.abc import Mapping +from dataclasses import dataclass +from pathlib import Path + +from config.settings import Settings, get_settings + +DEFAULT_TARGETS = frozenset( + { + "api", + "auth", + "cli", + "contract", + "optimizations", + "providers", + "thinking", + "tools", + "vscode", + } +) +ALL_TARGETS = DEFAULT_TARGETS | frozenset({"discord", "telegram", "voice"}) +SECRET_KEY_PARTS = ("KEY", "TOKEN", "SECRET", "WEBHOOK", "AUTH") + + +@dataclass(frozen=True, slots=True) +class ProviderModel: + provider: str + full_model: str + source: str + + @property + def model_name(self) -> str: + return Settings.parse_model_name(self.full_model) + + +@dataclass(frozen=True, slots=True) +class SmokeConfig: + root: Path + results_dir: Path + live: bool + interactive: bool + targets: frozenset[str] + provider_matrix: frozenset[str] + timeout_s: float + prompt: str + claude_bin: str + worker_id: str + settings: Settings + + @classmethod + def load(cls) -> SmokeConfig: + root = Path(__file__).resolve().parents[2] + get_settings.cache_clear() + settings = get_settings() + return cls( + root=root, + results_dir=root / ".smoke-results", + live=os.getenv("FCC_LIVE_SMOKE") == "1", + interactive=os.getenv("FCC_SMOKE_INTERACTIVE") == "1", + targets=_parse_targets(os.getenv("FCC_SMOKE_TARGETS")), + provider_matrix=_parse_csv(os.getenv("FCC_SMOKE_PROVIDER_MATRIX")), + timeout_s=float(os.getenv("FCC_SMOKE_TIMEOUT_S", "45")), + prompt=os.getenv("FCC_SMOKE_PROMPT", "Reply with exactly: FCC_SMOKE_PONG"), + claude_bin=os.getenv("FCC_SMOKE_CLAUDE_BIN", "claude"), + worker_id=os.getenv("PYTEST_XDIST_WORKER", "main"), + settings=settings, + ) + + def target_enabled(self, *names: str) -> bool: + return any(name in self.targets for name in names) + + def provider_models(self) -> list[ProviderModel]: + candidates = ( + ("MODEL", self.settings.model), + ("MODEL_OPUS", self.settings.model_opus), + ("MODEL_SONNET", self.settings.model_sonnet), + ("MODEL_HAIKU", self.settings.model_haiku), + ) + seen: set[str] = set() + models: list[ProviderModel] = [] + for source, model in candidates: + if not model or model in seen: + continue + provider = Settings.parse_provider_type(model) + if self.provider_matrix and provider not in self.provider_matrix: + continue + if not self.has_provider_configuration(provider): + continue + seen.add(model) + models.append( + ProviderModel(provider=provider, full_model=model, source=source) + ) + return models + + def has_provider_configuration(self, provider: str) -> bool: + if provider == "nvidia_nim": + return bool(self.settings.nvidia_nim_api_key.strip()) + if provider == "open_router": + return bool(self.settings.open_router_api_key.strip()) + if provider == "deepseek": + return bool(self.settings.deepseek_api_key.strip()) + if provider == "lmstudio": + return bool(self.settings.lm_studio_base_url.strip()) + if provider == "llamacpp": + return bool(self.settings.llamacpp_base_url.strip()) + return False + + +def _parse_csv(raw: str | None) -> frozenset[str]: + if not raw: + return frozenset() + return frozenset(part.strip() for part in raw.split(",") if part.strip()) + + +def _parse_targets(raw: str | None) -> frozenset[str]: + if not raw: + return DEFAULT_TARGETS + parsed = _parse_csv(raw) + if "all" in parsed: + return ALL_TARGETS + return parsed + + +def auth_headers(token: str | None = None) -> dict[str, str]: + settings = get_settings() + resolved = token if token is not None else settings.anthropic_auth_token + headers = { + "anthropic-version": "2023-06-01", + "content-type": "application/json", + } + if resolved: + headers["x-api-key"] = resolved + return headers + + +def redacted(value: str, env: Mapping[str, str] | None = None) -> str: + """Redact known secrets from a string before writing smoke artifacts.""" + if not value: + return value + + source = env if env is not None else os.environ + result = value + for key, secret in source.items(): + if not secret or len(secret) < 4: + continue + if any(part in key.upper() for part in SECRET_KEY_PARTS): + result = result.replace(secret, f"") + return result diff --git a/smoke/lib/http.py b/smoke/lib/http.py new file mode 100644 index 0000000..f019473 --- /dev/null +++ b/smoke/lib/http.py @@ -0,0 +1,70 @@ +"""HTTP helpers for live smoke requests.""" + +from __future__ import annotations + +from typing import Any + +import httpx + +from .config import SmokeConfig, auth_headers, redacted +from .server import RunningServer +from .sse import SSEEvent, parse_sse_lines + + +def message_payload( + text: str, + *, + model: str = "claude-3-5-sonnet-20241022", + max_tokens: int = 128, + extra: dict[str, Any] | None = None, +) -> dict[str, Any]: + payload: dict[str, Any] = { + "model": model, + "max_tokens": max_tokens, + "messages": [{"role": "user", "content": text}], + } + if extra: + payload.update(extra) + return payload + + +def post_json( + server: RunningServer, + path: str, + payload: dict[str, Any], + config: SmokeConfig, + *, + headers: dict[str, str] | None = None, +) -> httpx.Response: + request_headers = headers or auth_headers() + response = httpx.post( + f"{server.base_url}{path}", + headers=request_headers, + json=payload, + timeout=config.timeout_s, + ) + return response + + +def collect_message_stream( + server: RunningServer, + payload: dict[str, Any], + config: SmokeConfig, + *, + headers: dict[str, str] | None = None, +) -> list[SSEEvent]: + request_headers = headers or auth_headers() + with httpx.stream( + "POST", + f"{server.base_url}/v1/messages", + headers=request_headers, + json=payload, + timeout=config.timeout_s, + ) as response: + if response.status_code != 200: + body = response.read().decode("utf-8", errors="replace") + raise AssertionError( + f"stream request failed: HTTP {response.status_code} " + f"{redacted(body[:1000])}" + ) + return parse_sse_lines(response.iter_lines()) diff --git a/smoke/lib/report.py b/smoke/lib/report.py new file mode 100644 index 0000000..7a54597 --- /dev/null +++ b/smoke/lib/report.py @@ -0,0 +1,58 @@ +"""Small JSON report writer for smoke runs.""" + +from __future__ import annotations + +import json +import time +from dataclasses import asdict, dataclass + +from .config import SmokeConfig, redacted + + +@dataclass(slots=True) +class SmokeOutcome: + nodeid: str + outcome: str + duration_s: float + markers: list[str] + detail: str + + +class SmokeReport: + def __init__(self, config: SmokeConfig) -> None: + self.config = config + self.started_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + self.outcomes: list[SmokeOutcome] = [] + + def add( + self, + *, + nodeid: str, + outcome: str, + duration_s: float, + markers: list[str], + detail: str = "", + ) -> None: + self.outcomes.append( + SmokeOutcome( + nodeid=nodeid, + outcome=outcome, + duration_s=duration_s, + markers=markers, + detail=redacted(detail), + ) + ) + + def write(self) -> None: + self.config.results_dir.mkdir(parents=True, exist_ok=True) + path = ( + self.config.results_dir + / f"report-{self.config.worker_id}-{int(time.time())}.json" + ) + payload = { + "started_at": self.started_at, + "worker_id": self.config.worker_id, + "targets": sorted(self.config.targets), + "outcomes": [asdict(outcome) for outcome in self.outcomes], + } + path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") diff --git a/smoke/lib/server.py b/smoke/lib/server.py new file mode 100644 index 0000000..d5074d7 --- /dev/null +++ b/smoke/lib/server.py @@ -0,0 +1,126 @@ +"""Subprocess lifecycle helpers for local smoke servers.""" + +from __future__ import annotations + +import os +import socket +import subprocess +import time +from collections.abc import Iterator +from contextlib import contextmanager, suppress +from dataclasses import dataclass +from pathlib import Path + +import httpx + +from .config import SmokeConfig, redacted + + +@dataclass(slots=True) +class RunningServer: + base_url: str + port: int + log_path: Path + process: subprocess.Popen[bytes] + + +def find_free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + return int(sock.getsockname()[1]) + + +@contextmanager +def start_server( + config: SmokeConfig, + *, + env_overrides: dict[str, str] | None = None, + command: list[str] | None = None, + name: str = "server", +) -> Iterator[RunningServer]: + port = find_free_port() + config.results_dir.mkdir(parents=True, exist_ok=True) + log_path = config.results_dir / f"{name}-{config.worker_id}-{port}.log" + + env = os.environ.copy() + env.update( + { + "HOST": "127.0.0.1", + "PORT": str(port), + "LOG_FILE": str(log_path), + "MESSAGING_PLATFORM": "none", + "PYTHONUNBUFFERED": "1", + } + ) + if env_overrides: + env.update(env_overrides) + + cmd = command or [ + "uv", + "run", + "uvicorn", + "server:app", + "--host", + "127.0.0.1", + "--port", + str(port), + "--timeout-graceful-shutdown", + "5", + ] + + with log_path.open("ab") as log_file: + process = subprocess.Popen( + cmd, + cwd=config.root, + env=env, + stdout=log_file, + stderr=subprocess.STDOUT, + ) + running = RunningServer( + base_url=f"http://127.0.0.1:{port}", + port=port, + log_path=log_path, + process=process, + ) + try: + _wait_for_health(running, timeout_s=config.timeout_s) + yield running + finally: + _stop_process(process) + + +def _wait_for_health(server: RunningServer, *, timeout_s: float) -> None: + deadline = time.monotonic() + timeout_s + last_error = "" + while time.monotonic() < deadline: + if server.process.poll() is not None: + break + try: + response = httpx.get(f"{server.base_url}/health", timeout=2.0) + if response.status_code == 200: + return + last_error = f"HTTP {response.status_code}: {response.text[:200]}" + except Exception as exc: + last_error = f"{type(exc).__name__}: {exc}" + time.sleep(0.25) + + log_excerpt = "" + with suppress(OSError): + log_excerpt = server.log_path.read_text(encoding="utf-8", errors="replace")[ + -2000: + ] + raise AssertionError( + "Smoke server did not become healthy. " + f"last_error={last_error!r} log={redacted(log_excerpt)!r}" + ) + + +def _stop_process(process: subprocess.Popen[bytes]) -> None: + if process.poll() is not None: + return + process.terminate() + try: + process.wait(timeout=8) + except subprocess.TimeoutExpired: + process.kill() + process.wait(timeout=5) diff --git a/smoke/lib/sse.py b/smoke/lib/sse.py new file mode 100644 index 0000000..0948dd0 --- /dev/null +++ b/smoke/lib/sse.py @@ -0,0 +1,148 @@ +"""SSE parsing and Anthropic stream assertions for smoke tests.""" + +from __future__ import annotations + +import json +from collections.abc import Iterable +from dataclasses import dataclass +from typing import Any + + +@dataclass(frozen=True, slots=True) +class SSEEvent: + event: str + data: dict[str, Any] + raw: str + + +def parse_sse_lines(lines: Iterable[str]) -> list[SSEEvent]: + events: list[SSEEvent] = [] + current_event = "" + data_parts: list[str] = [] + raw_parts: list[str] = [] + + for line in lines: + stripped = line.rstrip("\r\n") + if stripped == "": + _append_event(events, current_event, data_parts, raw_parts) + current_event = "" + data_parts = [] + raw_parts = [] + continue + raw_parts.append(stripped) + if stripped.startswith("event:"): + current_event = stripped.split(":", 1)[1].strip() + elif stripped.startswith("data:"): + data_parts.append(stripped.split(":", 1)[1].strip()) + + _append_event(events, current_event, data_parts, raw_parts) + return events + + +def parse_sse_text(text: str) -> list[SSEEvent]: + return parse_sse_lines(text.splitlines()) + + +def _append_event( + events: list[SSEEvent], + current_event: str, + data_parts: list[str], + raw_parts: list[str], +) -> None: + if not current_event and not data_parts: + return + data_text = "\n".join(data_parts) + data: dict[str, Any] + try: + parsed = json.loads(data_text) if data_text else {} + data = parsed if isinstance(parsed, dict) else {"value": parsed} + except json.JSONDecodeError: + data = {"raw": data_text} + events.append(SSEEvent(current_event, data, "\n".join(raw_parts))) + + +def assert_anthropic_stream_contract( + events: list[SSEEvent], *, allow_error: bool = False +) -> None: + assert events, "stream produced no SSE events" + event_names = [event.event for event in events] + assert "message_start" in event_names, event_names + assert event_names[-1] == "message_stop", event_names + + open_blocks: dict[int, str] = {} + seen_blocks: set[int] = set() + for event in events: + if event.event == "error" and not allow_error: + raise AssertionError(f"unexpected SSE error event: {event.data}") + + if event.event == "content_block_start": + index = _event_index(event) + block = event.data.get("content_block", {}) + assert isinstance(block, dict), event.data + block_type = str(block.get("type", "")) + assert block_type in {"text", "thinking", "tool_use"}, event.data + assert index not in open_blocks, f"block {index} started twice" + assert index not in seen_blocks, f"block {index} reused after stop" + open_blocks[index] = block_type + seen_blocks.add(index) + continue + + if event.event == "content_block_delta": + index = _event_index(event) + assert index in open_blocks, f"delta for unopened block {index}" + delta = event.data.get("delta", {}) + assert isinstance(delta, dict), event.data + delta_type = str(delta.get("type", "")) + expected = { + "text": "text_delta", + "thinking": "thinking_delta", + "tool_use": "input_json_delta", + }[open_blocks[index]] + assert delta_type == expected, ( + f"block {index} is {open_blocks[index]}, got {delta_type}" + ) + continue + + if event.event == "content_block_stop": + index = _event_index(event) + assert index in open_blocks, f"stop for unopened block {index}" + open_blocks.pop(index) + + assert not open_blocks, f"unclosed blocks: {open_blocks}" + assert seen_blocks, "stream did not emit any content blocks" + + +def event_names(events: list[SSEEvent]) -> list[str]: + return [event.event for event in events] + + +def text_content(events: list[SSEEvent]) -> str: + parts: list[str] = [] + for event in events: + delta = event.data.get("delta", {}) + if isinstance(delta, dict) and delta.get("type") == "text_delta": + parts.append(str(delta.get("text", ""))) + return "".join(parts) + + +def thinking_content(events: list[SSEEvent]) -> str: + parts: list[str] = [] + for event in events: + delta = event.data.get("delta", {}) + if isinstance(delta, dict) and delta.get("type") == "thinking_delta": + parts.append(str(delta.get("thinking", ""))) + return "".join(parts) + + +def has_tool_use(events: list[SSEEvent]) -> bool: + for event in events: + block = event.data.get("content_block", {}) + if isinstance(block, dict) and block.get("type") == "tool_use": + return True + return False + + +def _event_index(event: SSEEvent) -> int: + value = event.data.get("index") + assert isinstance(value, int), event.data + return value diff --git a/smoke/test_api_live.py b/smoke/test_api_live.py new file mode 100644 index 0000000..ffffa20 --- /dev/null +++ b/smoke/test_api_live.py @@ -0,0 +1,175 @@ +from __future__ import annotations + +from typing import Any + +import httpx +import pytest + +from smoke.lib.config import SmokeConfig +from smoke.lib.http import post_json +from smoke.lib.server import RunningServer + +pytestmark = [pytest.mark.live, pytest.mark.smoke_target("api")] + + +def test_probe_and_models_routes( + smoke_server: RunningServer, smoke_headers: dict[str, str] +) -> None: + with httpx.Client(base_url=smoke_server.base_url, headers=smoke_headers) as client: + assert client.get("/health").json()["status"] == "healthy" + + root = client.get("/") + assert root.status_code == 200 + assert root.json()["status"] == "ok" + + models = client.get("/v1/models") + assert models.status_code == 200 + assert models.json()["data"] + + for path in ("/", "/health", "/v1/messages", "/v1/messages/count_tokens"): + head = client.head(path) + assert head.status_code == 204, (path, head.status_code, head.text) + options = client.options(path) + assert options.status_code == 204, (path, options.status_code, options.text) + + +def test_count_tokens_accepts_thinking_tools_and_results( + smoke_server: RunningServer, + smoke_config: SmokeConfig, + smoke_headers: dict[str, str], +) -> None: + payload: dict[str, Any] = { + "model": "claude-3-5-sonnet-20241022", + "messages": [ + {"role": "user", "content": "Use the tool."}, + { + "role": "assistant", + "content": [ + {"type": "thinking", "thinking": "Need to inspect the file."}, + { + "type": "tool_use", + "id": "toolu_smoke", + "name": "Read", + "input": {"file_path": "README.md"}, + }, + ], + }, + { + "role": "user", + "content": [ + { + "type": "tool_result", + "tool_use_id": "toolu_smoke", + "content": "Free Claude Code", + } + ], + }, + ], + "tools": [ + { + "name": "Read", + "description": "Read a file", + "input_schema": { + "type": "object", + "properties": {"file_path": {"type": "string"}}, + "required": ["file_path"], + }, + } + ], + } + response = post_json( + smoke_server, + "/v1/messages/count_tokens", + payload, + smoke_config, + headers=smoke_headers, + ) + assert response.status_code == 200, response.text + assert response.json()["input_tokens"] > 0 + + +def test_optimization_fast_paths_do_not_need_provider( + smoke_server: RunningServer, + smoke_config: SmokeConfig, + smoke_headers: dict[str, str], +) -> None: + cases: tuple[tuple[str, dict[str, Any], str], ...] = ( + ( + "quota", + { + "model": "claude-3-5-sonnet-20241022", + "max_tokens": 1, + "messages": [{"role": "user", "content": "quota"}], + }, + "Quota check passed.", + ), + ( + "title", + { + "model": "claude-3-5-sonnet-20241022", + "system": "Generate a title for the new conversation topic.", + "messages": [{"role": "user", "content": "hello"}], + }, + "Conversation", + ), + ( + "prefix", + { + "model": "claude-3-5-sonnet-20241022", + "messages": [ + { + "role": "user", + "content": "extract command\nCommand: git status --short", + } + ], + }, + "git", + ), + ( + "suggestion", + { + "model": "claude-3-5-sonnet-20241022", + "messages": [{"role": "user", "content": "[SUGGESTION MODE: next]"}], + }, + "", + ), + ( + "filepath", + { + "model": "claude-3-5-sonnet-20241022", + "system": "Extract any file paths that this command output contains.", + "messages": [ + { + "role": "user", + "content": "Command: cat smoke/test_api_live.py\nOutput: file contents\n", + } + ], + }, + "smoke/test_api_live.py", + ), + ) + for name, payload, expected_text in cases: + response = post_json( + smoke_server, "/v1/messages", payload, smoke_config, headers=smoke_headers + ) + assert response.status_code == 200, (name, response.text) + text = response.json()["content"][0]["text"] + assert expected_text in text + + +def test_invalid_messages_returns_anthropic_error( + smoke_server: RunningServer, + smoke_config: SmokeConfig, + smoke_headers: dict[str, str], +) -> None: + response = post_json( + smoke_server, + "/v1/messages", + {"model": "claude-3-5-sonnet-20241022", "messages": []}, + smoke_config, + headers=smoke_headers, + ) + assert response.status_code == 400 + payload = response.json() + assert payload["type"] == "error" + assert payload["error"]["type"] == "invalid_request_error" diff --git a/smoke/test_auth_live.py b/smoke/test_auth_live.py new file mode 100644 index 0000000..c6390b9 --- /dev/null +++ b/smoke/test_auth_live.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import httpx +import pytest + +from smoke.lib.config import SmokeConfig +from smoke.lib.server import start_server + +pytestmark = [pytest.mark.live, pytest.mark.smoke_target("auth")] + + +def test_auth_token_is_enforced_for_all_supported_header_shapes( + smoke_config: SmokeConfig, +) -> None: + token = "fcc-smoke-token" + with start_server( + smoke_config, + env_overrides={"ANTHROPIC_AUTH_TOKEN": token, "MESSAGING_PLATFORM": "none"}, + name="auth", + ) as server: + assert httpx.get(f"{server.base_url}/").status_code == 401 + assert ( + httpx.get(f"{server.base_url}/", headers={"x-api-key": "wrong"}).status_code + == 401 + ) + assert ( + httpx.get(f"{server.base_url}/", headers={"x-api-key": token}).status_code + == 200 + ) + assert ( + httpx.get( + f"{server.base_url}/", + headers={"authorization": f"Bearer {token}"}, + ).status_code + == 200 + ) + assert ( + httpx.get( + f"{server.base_url}/", + headers={"anthropic-auth-token": token}, + ).status_code + == 200 + ) diff --git a/smoke/test_cli_live.py b/smoke/test_cli_live.py new file mode 100644 index 0000000..8aa3f8c --- /dev/null +++ b/smoke/test_cli_live.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +import os +import shutil +import subprocess +from pathlib import Path + +import pytest + +from smoke.lib.config import SmokeConfig +from smoke.lib.server import start_server + +pytestmark = [pytest.mark.live, pytest.mark.smoke_target("cli")] + + +def test_fcc_init_scaffolds_user_config( + smoke_config: SmokeConfig, tmp_path: Path +) -> None: + env = os.environ.copy() + env["HOME"] = str(tmp_path) + env["USERPROFILE"] = str(tmp_path) + result = subprocess.run( + ["uv", "run", "fcc-init"], + cwd=smoke_config.root, + env=env, + capture_output=True, + text=True, + timeout=smoke_config.timeout_s, + check=False, + ) + assert result.returncode == 0, result.stderr or result.stdout + assert (tmp_path / ".config" / "free-claude-code" / ".env").is_file() + + +def test_free_claude_code_entrypoint_starts_server(smoke_config: SmokeConfig) -> None: + with start_server( + smoke_config, + command=["uv", "run", "free-claude-code"], + env_overrides={"MESSAGING_PLATFORM": "none"}, + name="entrypoint", + ) as server: + assert server.process.poll() is None + + +def test_claude_cli_prompt_when_available( + smoke_config: SmokeConfig, tmp_path: Path +) -> None: + claude_bin = shutil.which(smoke_config.claude_bin) + if not claude_bin: + pytest.skip(f"Claude CLI not found: {smoke_config.claude_bin}") + models = smoke_config.provider_models() + if not models: + pytest.skip("no configured provider model available for Claude CLI smoke") + + with start_server( + smoke_config, + env_overrides={"MODEL": models[0].full_model, "MESSAGING_PLATFORM": "none"}, + name="claude-cli", + ) as server: + env = os.environ.copy() + env["ANTHROPIC_BASE_URL"] = server.base_url + if smoke_config.settings.anthropic_auth_token: + env["ANTHROPIC_AUTH_TOKEN"] = smoke_config.settings.anthropic_auth_token + result = subprocess.run( + [claude_bin, "-p", "Reply with exactly FCC_SMOKE_PONG"], + cwd=tmp_path, + env=env, + capture_output=True, + text=True, + timeout=smoke_config.timeout_s, + check=False, + ) + assert result.returncode == 0, result.stderr or result.stdout + assert "FCC_SMOKE_PONG" in result.stdout diff --git a/smoke/test_client_shapes_live.py b/smoke/test_client_shapes_live.py new file mode 100644 index 0000000..47d3b92 --- /dev/null +++ b/smoke/test_client_shapes_live.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +import pytest + +from smoke.lib.config import SmokeConfig, auth_headers +from smoke.lib.http import message_payload, post_json +from smoke.lib.server import RunningServer + +pytestmark = [pytest.mark.live, pytest.mark.smoke_target("vscode")] + + +def test_vscode_and_jetbrains_shaped_requests( + smoke_server: RunningServer, + smoke_config: SmokeConfig, +) -> None: + payload = message_payload("quota", max_tokens=1) + + vscode_headers = auth_headers() + vscode_headers.update( + { + "anthropic-beta": "messages-2023-12-15", + "user-agent": "Claude-Code-VSCode smoke", + } + ) + vscode = post_json( + smoke_server, + "/v1/messages?beta=true", + payload, + smoke_config, + headers=vscode_headers, + ) + assert vscode.status_code == 200, vscode.text + assert vscode.json()["content"][0]["text"] == "Quota check passed." + + jetbrains_headers = auth_headers() + token = smoke_config.settings.anthropic_auth_token + if token: + jetbrains_headers.pop("x-api-key", None) + jetbrains_headers["authorization"] = f"Bearer {token}" + jetbrains_headers["user-agent"] = "JetBrains-ACP smoke" + jetbrains = post_json( + smoke_server, + "/v1/messages", + payload, + smoke_config, + headers=jetbrains_headers, + ) + assert jetbrains.status_code == 200, jetbrains.text + assert jetbrains.json()["content"][0]["text"] == "Quota check passed." diff --git a/smoke/test_feature_manifest.py b/smoke/test_feature_manifest.py new file mode 100644 index 0000000..a72b651 --- /dev/null +++ b/smoke/test_feature_manifest.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import pytest + +from messaging.platforms.factory import create_messaging_platform +from providers.base import BaseProvider +from providers.deepseek import DeepSeekProvider +from providers.llamacpp import LlamaCppProvider +from providers.lmstudio import LMStudioProvider +from providers.nvidia_nim import NvidiaNimProvider +from providers.open_router import OpenRouterProvider +from smoke.features import FEATURE_SMOKES, README_FEATURES, smoke_ids + +pytestmark = [pytest.mark.live, pytest.mark.smoke_target("contract")] + + +def test_every_advertised_feature_has_a_smoke_entry() -> None: + missing = sorted(set(README_FEATURES) - smoke_ids()) + extra = sorted(smoke_ids() - set(README_FEATURES)) + assert not missing, f"README features missing smoke entries: {missing}" + assert not extra, f"smoke entries not tied to README features: {extra}" + + +def test_smoke_manifest_has_unique_ids_and_checks() -> None: + ids = [feature.feature_id for feature in FEATURE_SMOKES] + assert len(ids) == len(set(ids)) + for feature in FEATURE_SMOKES: + assert feature.checks, feature + assert feature.mode in { + "live", + "contract", + "live_or_interactive", + "live_or_skip", + } + + +def test_provider_and_platform_registries_include_advertised_builtins() -> None: + provider_classes = { + "nvidia_nim": NvidiaNimProvider, + "open_router": OpenRouterProvider, + "deepseek": DeepSeekProvider, + "lmstudio": LMStudioProvider, + "llamacpp": LlamaCppProvider, + } + for provider_class in provider_classes.values(): + assert issubclass(provider_class, BaseProvider) + + assert create_messaging_platform("not-a-platform") is None diff --git a/smoke/test_messaging_live.py b/smoke/test_messaging_live.py new file mode 100644 index 0000000..652cddd --- /dev/null +++ b/smoke/test_messaging_live.py @@ -0,0 +1,113 @@ +from __future__ import annotations + +import os +import time + +import httpx +import pytest + +from smoke.lib.config import SmokeConfig + + +@pytest.mark.live +@pytest.mark.smoke_target("telegram") +def test_telegram_bot_api_permissions(smoke_config: SmokeConfig) -> None: + token = smoke_config.settings.telegram_bot_token + if not token: + pytest.skip("TELEGRAM_BOT_TOKEN is not configured") + + base_url = f"https://api.telegram.org/bot{token}" + get_me = httpx.get(f"{base_url}/getMe", timeout=smoke_config.timeout_s) + assert get_me.status_code == 200, get_me.text + assert get_me.json()["ok"] is True + + chat_id = os.getenv("FCC_SMOKE_TELEGRAM_CHAT_ID") or ( + smoke_config.settings.allowed_telegram_user_id or "" + ) + if not chat_id: + pytest.skip("FCC_SMOKE_TELEGRAM_CHAT_ID or ALLOWED_TELEGRAM_USER_ID required") + + marker = f"FCC smoke {int(time.time())}" + sent = httpx.post( + f"{base_url}/sendMessage", + json={"chat_id": chat_id, "text": marker}, + timeout=smoke_config.timeout_s, + ) + assert sent.status_code == 200, sent.text + message_id = sent.json()["result"]["message_id"] + + edited = httpx.post( + f"{base_url}/editMessageText", + json={"chat_id": chat_id, "message_id": message_id, "text": marker + " edit"}, + timeout=smoke_config.timeout_s, + ) + assert edited.status_code == 200, edited.text + + deleted = httpx.post( + f"{base_url}/deleteMessage", + json={"chat_id": chat_id, "message_id": message_id}, + timeout=smoke_config.timeout_s, + ) + assert deleted.status_code == 200, deleted.text + + +@pytest.mark.live +@pytest.mark.smoke_target("discord") +def test_discord_bot_api_permissions(smoke_config: SmokeConfig) -> None: + token = smoke_config.settings.discord_bot_token + channel_id = os.getenv("FCC_SMOKE_DISCORD_CHANNEL_ID") + if not channel_id and smoke_config.settings.allowed_discord_channels: + channel_id = smoke_config.settings.allowed_discord_channels.split(",", 1)[0] + if not token: + pytest.skip("DISCORD_BOT_TOKEN is not configured") + if not channel_id: + pytest.skip("FCC_SMOKE_DISCORD_CHANNEL_ID or ALLOWED_DISCORD_CHANNELS required") + + headers = {"authorization": f"Bot {token}"} + base_url = "https://discord.com/api/v10" + + channel = httpx.get( + f"{base_url}/channels/{channel_id}", + headers=headers, + timeout=smoke_config.timeout_s, + ) + assert channel.status_code == 200, channel.text + + marker = f"FCC smoke {int(time.time())}" + sent = httpx.post( + f"{base_url}/channels/{channel_id}/messages", + headers=headers, + json={"content": marker}, + timeout=smoke_config.timeout_s, + ) + assert sent.status_code == 200, sent.text + message_id = sent.json()["id"] + + edited = httpx.patch( + f"{base_url}/channels/{channel_id}/messages/{message_id}", + headers=headers, + json={"content": marker + " edit"}, + timeout=smoke_config.timeout_s, + ) + assert edited.status_code == 200, edited.text + + deleted = httpx.delete( + f"{base_url}/channels/{channel_id}/messages/{message_id}", + headers=headers, + timeout=smoke_config.timeout_s, + ) + assert deleted.status_code in {200, 204}, deleted.text + + +@pytest.mark.live +@pytest.mark.smoke_target("telegram") +@pytest.mark.smoke_target("discord") +def test_interactive_inbound_messaging_requires_explicit_mode( + smoke_config: SmokeConfig, +) -> None: + if not smoke_config.interactive: + pytest.skip("set FCC_SMOKE_INTERACTIVE=1 for manual inbound messaging checks") + pytest.skip( + "manual inbound check: start the server, send a nonce from the real client, " + "and verify threaded progress plus /stop, /clear, and /stats" + ) diff --git a/smoke/test_provider_live.py b/smoke/test_provider_live.py new file mode 100644 index 0000000..c139e0a --- /dev/null +++ b/smoke/test_provider_live.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +import time + +import httpx +import pytest + +from smoke.lib.config import SmokeConfig, auth_headers +from smoke.lib.http import collect_message_stream, message_payload +from smoke.lib.server import start_server +from smoke.lib.sse import assert_anthropic_stream_contract, text_content + +pytestmark = [pytest.mark.live, pytest.mark.smoke_target("providers")] + + +def test_model_mapping_configuration_is_consistent(smoke_config: SmokeConfig) -> None: + models = smoke_config.provider_models() + if not models: + pytest.skip("no configured provider models with usable credentials/base URLs") + for provider_model in models: + assert "/" in provider_model.full_model + assert provider_model.model_name + + +def test_configured_provider_models_stream_successfully( + smoke_config: SmokeConfig, +) -> None: + models = smoke_config.provider_models() + if not models: + pytest.skip("no configured provider models with usable credentials/base URLs") + + failures: list[str] = [] + for provider_model in models: + try: + with start_server( + smoke_config, + env_overrides={ + "MODEL": provider_model.full_model, + "MESSAGING_PLATFORM": "none", + }, + name=f"provider-{provider_model.provider}", + ) as server: + events = collect_message_stream( + server, + message_payload(smoke_config.prompt, model="fcc-smoke-default"), + smoke_config, + ) + assert_anthropic_stream_contract(events) + assert text_content(events).strip(), "provider returned no text" + except Exception as exc: + failures.append( + f"{provider_model.source}={provider_model.full_model}: " + f"{type(exc).__name__}: {exc}" + ) + + assert not failures, "\n".join(failures) + + +def test_client_disconnect_mid_stream_does_not_crash_server( + smoke_config: SmokeConfig, +) -> None: + models = smoke_config.provider_models() + if not models: + pytest.skip("no configured provider model available for disconnect smoke") + provider_model = models[0] + + with start_server( + smoke_config, + env_overrides={ + "MODEL": provider_model.full_model, + "MESSAGING_PLATFORM": "none", + }, + name="disconnect", + ) as server: + with httpx.stream( + "POST", + f"{server.base_url}/v1/messages", + headers=auth_headers(), + json=message_payload(smoke_config.prompt, model="fcc-smoke-default"), + timeout=smoke_config.timeout_s, + ) as response: + assert response.status_code == 200, response.read() + for _line in response.iter_lines(): + break + + time.sleep(0.5) + health = httpx.get(f"{server.base_url}/health", timeout=5) + assert health.status_code == 200 diff --git a/smoke/test_stream_contracts.py b/smoke/test_stream_contracts.py new file mode 100644 index 0000000..7f4ee6e --- /dev/null +++ b/smoke/test_stream_contracts.py @@ -0,0 +1,206 @@ +from __future__ import annotations + +from collections.abc import Iterable + +import pytest + +from messaging.event_parser import parse_cli_event +from messaging.transcript import RenderCtx, TranscriptBuffer +from providers.common import ( + ContentType, + HeuristicToolParser, + SSEBuilder, + ThinkTagParser, +) +from smoke.lib.sse import ( + assert_anthropic_stream_contract, + event_names, + has_tool_use, + parse_sse_text, + text_content, + thinking_content, +) + +pytestmark = [ + pytest.mark.live, + pytest.mark.smoke_target("contract"), + pytest.mark.smoke_target("thinking"), +] + + +def test_interleaved_thinking_text_blocks_are_valid() -> None: + events = _parse_builder_events( + _interleaved_thinking_text_events( + ("first thought", "first answer", "second thought", "final answer") + ) + ) + assert_anthropic_stream_contract(events) + assert event_names(events).count("content_block_start") == 4 + assert thinking_content(events) == "first thoughtsecond thought" + assert text_content(events) == "first answerfinal answer" + + +def test_split_think_tags_preserve_text_and_thinking() -> None: + events = _parse_builder_events( + _events_from_text_chunks(["before hidden", " after"]) + ) + assert_anthropic_stream_contract(events) + assert thinking_content(events) == "hidden" + assert text_content(events) == "before after" + + +def test_mixed_reasoning_content_and_think_tags_keep_order() -> None: + builder = SSEBuilder("msg_smoke", "smoke-model") + chunks = [builder.message_start()] + chunks.extend(builder.ensure_thinking_block()) + chunks.append(builder.emit_thinking_delta("reasoning field")) + chunks.extend( + _events_from_text_chunks([" visible tagged done"], builder) + ) + chunks.extend(builder.close_all_blocks()) + chunks.append(builder.message_delta("end_turn", 10)) + chunks.append(builder.message_stop()) + + events = parse_sse_text("".join(chunks)) + assert_anthropic_stream_contract(events) + assert thinking_content(events) == "reasoning fieldtagged" + assert text_content(events) == " visible done" + + +def test_thinking_tool_text_and_transcript_order_contract() -> None: + builder = SSEBuilder("msg_smoke", "smoke-model") + chunks = [builder.message_start()] + chunks.extend(builder.ensure_thinking_block()) + chunks.append(builder.emit_thinking_delta("inspect first")) + chunks.extend(builder.close_content_blocks()) + tool_block_index = builder.blocks.allocate_index() + chunks.append( + builder.content_block_start( + tool_block_index, "tool_use", id="toolu_1", name="Read" + ) + ) + chunks.append( + builder.content_block_delta( + tool_block_index, "input_json_delta", '{"file":"README.md"}' + ) + ) + chunks.append(builder.content_block_stop(tool_block_index)) + chunks.extend(builder.ensure_text_block()) + chunks.append(builder.emit_text_delta("done")) + chunks.extend(builder.close_all_blocks()) + chunks.append(builder.message_delta("end_turn", 20)) + chunks.append(builder.message_stop()) + + events = parse_sse_text("".join(chunks)) + assert_anthropic_stream_contract(events) + assert has_tool_use(events) + + transcript = TranscriptBuffer() + for event in events: + for parsed in parse_cli_event(event.data): + transcript.apply(parsed) + rendered = transcript.render(_render_ctx(), limit_chars=3900, status=None) + assert ( + rendered.find("inspect first") + < rendered.find("Tool call:") + < rendered.find("done") + ) + + +def test_enable_thinking_false_suppresses_reasoning_only() -> None: + events = _parse_builder_events( + _events_from_text_chunks( + ["hello secret world"], enable_thinking=False + ) + ) + assert_anthropic_stream_contract(events) + assert "secret" not in thinking_content(events) + assert text_content(events) == "hello world" + + +def test_task_tool_arguments_force_foreground_execution() -> None: + parser = HeuristicToolParser() + filtered, detected = parser.feed( + "● Inspect" + "true trailing" + ) + detected.extend(parser.flush()) + assert "trailing" in filtered + task = detected[0] + assert task["name"] == "Task" + if isinstance(task.get("input"), dict): + task["input"]["run_in_background"] = False + assert task["input"]["run_in_background"] is False + + +def _interleaved_thinking_text_events( + parts: tuple[str, str, str, str], +) -> Iterable[str]: + builder = SSEBuilder("msg_smoke", "smoke-model") + yield builder.message_start() + yield from builder.ensure_thinking_block() + yield builder.emit_thinking_delta(parts[0]) + yield from builder.ensure_text_block() + yield builder.emit_text_delta(parts[1]) + yield from builder.ensure_thinking_block() + yield builder.emit_thinking_delta(parts[2]) + yield from builder.ensure_text_block() + yield builder.emit_text_delta(parts[3]) + yield from builder.close_all_blocks() + yield builder.message_delta("end_turn", 20) + yield builder.message_stop() + + +def _events_from_text_chunks( + chunks: list[str], + builder: SSEBuilder | None = None, + *, + enable_thinking: bool = True, +) -> list[str]: + sse = builder or SSEBuilder("msg_smoke", "smoke-model") + out: list[str] = [] if builder else [sse.message_start()] + parser = ThinkTagParser() + + for chunk in chunks: + out.extend(_emit_parser_parts(sse, parser.feed(chunk), enable_thinking)) + + remaining = parser.flush() + if remaining is not None: + out.extend(_emit_parser_parts(sse, [remaining], enable_thinking)) + + if builder is None: + out.extend(sse.close_all_blocks()) + out.append(sse.message_delta("end_turn", 20)) + out.append(sse.message_stop()) + return out + + +def _emit_parser_parts( + builder: SSEBuilder, + parts: Iterable, + enable_thinking: bool, +) -> list[str]: + out: list[str] = [] + for part in parts: + if part.type == ContentType.THINKING: + if enable_thinking: + out.extend(builder.ensure_thinking_block()) + out.append(builder.emit_thinking_delta(part.content)) + continue + out.extend(builder.ensure_text_block()) + out.append(builder.emit_text_delta(part.content)) + return out + + +def _parse_builder_events(chunks: Iterable[str]): + return parse_sse_text("".join(chunks)) + + +def _render_ctx() -> RenderCtx: + return RenderCtx( + bold=lambda text: f"*{text}*", + code_inline=lambda text: f"`{text}`", + escape_code=lambda text: text, + escape_text=lambda text: text, + render_markdown=lambda text: text, + ) diff --git a/smoke/test_tools_live.py b/smoke/test_tools_live.py new file mode 100644 index 0000000..b482410 --- /dev/null +++ b/smoke/test_tools_live.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +import pytest + +from smoke.lib.config import SmokeConfig +from smoke.lib.http import collect_message_stream, message_payload +from smoke.lib.server import start_server +from smoke.lib.sse import assert_anthropic_stream_contract, has_tool_use + +pytestmark = [pytest.mark.live, pytest.mark.smoke_target("tools")] + + +def test_live_tool_use_when_configured_model_supports_tools( + smoke_config: SmokeConfig, +) -> None: + models = smoke_config.provider_models() + if not models: + pytest.skip("no configured provider model available for tool-use smoke") + provider_model = models[0] + + payload = message_payload( + "Use the echo_smoke tool once with value FCC_SMOKE_TOOL.", + model="fcc-smoke-default", + max_tokens=256, + extra={ + "tools": [ + { + "name": "echo_smoke", + "description": "Echo a test value.", + "input_schema": { + "type": "object", + "properties": {"value": {"type": "string"}}, + "required": ["value"], + }, + } + ], + "tool_choice": {"type": "tool", "name": "echo_smoke"}, + }, + ) + + with start_server( + smoke_config, + env_overrides={ + "MODEL": provider_model.full_model, + "MESSAGING_PLATFORM": "none", + }, + name="tools", + ) as server: + events = collect_message_stream(server, payload, smoke_config) + assert_anthropic_stream_contract(events) + assert has_tool_use(events), "model did not emit a tool_use block" diff --git a/smoke/test_voice_live.py b/smoke/test_voice_live.py new file mode 100644 index 0000000..21e4dcb --- /dev/null +++ b/smoke/test_voice_live.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import math +import os +import wave +from pathlib import Path + +import pytest + +from messaging.transcription import transcribe_audio +from smoke.lib.config import SmokeConfig + +pytestmark = [pytest.mark.live, pytest.mark.smoke_target("voice")] + + +def test_voice_transcription_backend_when_explicitly_enabled( + smoke_config: SmokeConfig, tmp_path: Path +) -> None: + if not smoke_config.settings.voice_note_enabled: + pytest.skip("VOICE_NOTE_ENABLED is false") + if os.getenv("FCC_SMOKE_RUN_VOICE") != "1": + pytest.skip("set FCC_SMOKE_RUN_VOICE=1 to run transcription smoke") + + wav_path = tmp_path / "smoke-tone.wav" + _write_tone_wav(wav_path) + try: + text = transcribe_audio( + wav_path, + "audio/wav", + whisper_model=smoke_config.settings.whisper_model, + whisper_device=smoke_config.settings.whisper_device, + ) + except ImportError as exc: + pytest.skip(str(exc)) + assert isinstance(text, str) + assert text.strip() + + +def _write_tone_wav(path: Path) -> None: + sample_rate = 16000 + duration_s = 0.25 + amplitude = 8000 + frames = bytearray() + for i in range(int(sample_rate * duration_s)): + sample = int(amplitude * math.sin(2 * math.pi * 440 * i / sample_rate)) + frames.extend(sample.to_bytes(2, byteorder="little", signed=True)) + + with wave.open(str(path), "wb") as wav: + wav.setnchannels(1) + wav.setsampwidth(2) + wav.setframerate(sample_rate) + wav.writeframes(bytes(frames)) diff --git a/tests/providers/test_converter.py b/tests/providers/test_converter.py index 701fe3f..95ca23f 100644 --- a/tests/providers/test_converter.py +++ b/tests/providers/test_converter.py @@ -79,6 +79,27 @@ def test_convert_tools(): assert result[1]["function"]["description"] == "" # Check default empty string +@pytest.mark.parametrize( + "tool_choice,expected", + [ + ( + {"type": "tool", "name": "echo_smoke"}, + {"type": "function", "function": {"name": "echo_smoke"}}, + ), + ({"type": "any"}, "required"), + ({"type": "auto"}, "auto"), + ({"type": "none"}, "none"), + ( + {"type": "function", "function": {"name": "already_openai"}}, + {"type": "function", "function": {"name": "already_openai"}}, + ), + ], +) +def test_convert_tool_choice(tool_choice, expected): + result = AnthropicToOpenAIConverter.convert_tool_choice(tool_choice) + assert result == expected + + # --- Message Conversion Tests: User ---