Add local live smoke test suite (#148)

## Summary
- add an opt-in local `smoke/` pytest suite for API, auth, providers,
CLI, IDE-shaped requests, messaging, voice, tools, and thinking stream
contracts
- keep smoke tests out of normal CI collection with `testpaths =
["tests"]`
- write sanitized smoke artifacts under `.smoke-results/`

## Verification
- `uv run ruff format`
- `uv run ruff check`
- `uv run ty check`
- `uv run ty check smoke`
- `FCC_LIVE_SMOKE=1 FCC_SMOKE_TARGETS=all FCC_SMOKE_RUN_VOICE=1 uv run
pytest smoke -n 0 -m live -s --tb=short` -> 17 passed, 9 skipped
- `uv run pytest` -> 904 passed

## Notes
- The skipped live checks require local credentials, tools, or services that
were not available — for example provider model access, Telegram/Discord
targets, a voice backend, or the Claude CLI.
- `claude-pick` smoke was intentionally removed.
This commit is contained in:
Ali Khokhar 2026-04-23 19:06:09 -07:00 committed by GitHub
parent e8e13b9fea
commit 462a9430bb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 1789 additions and 2 deletions

3
.gitignore vendored
View file

@ -9,4 +9,5 @@ agent_workspace
.env
server.log
.coverage
llama_cache
llama_cache
.smoke-results

View file

@ -175,6 +175,26 @@ class AnthropicToOpenAIConverter:
for tool in tools
]
@staticmethod
def convert_tool_choice(tool_choice: Any) -> Any:
"""Convert Anthropic tool_choice to OpenAI-compatible format."""
if not isinstance(tool_choice, dict):
return tool_choice
choice_type = tool_choice.get("type")
if choice_type == "tool":
name = tool_choice.get("name")
if name:
return {"type": "function", "function": {"name": name}}
if choice_type == "any":
return "required"
if choice_type in {"auto", "none", "required"}:
return choice_type
if choice_type == "function" and isinstance(tool_choice.get("function"), dict):
return tool_choice
return tool_choice
@staticmethod
def convert_system_prompt(system: Any) -> dict[str, str] | None:
"""Convert Anthropic system prompt to OpenAI format."""
@ -236,6 +256,8 @@ def build_base_request_body(
body["tools"] = AnthropicToOpenAIConverter.convert_tools(tools)
tool_choice = getattr(request_data, "tool_choice", None)
if tool_choice:
body["tool_choice"] = tool_choice
body["tool_choice"] = AnthropicToOpenAIConverter.convert_tool_choice(
tool_choice
)
return body

View file

@ -96,6 +96,18 @@ skip-magic-trailing-comma = false
[tool.pytest.ini_options]
pythonpath = ["."]
addopts = "-n auto"
testpaths = ["tests"]
markers = [
"live: opt-in local smoke tests that can touch real services",
"interactive: smoke tests requiring manual user interaction",
"provider: live provider checks",
"messaging: live messaging platform checks",
"cli: CLI integration checks",
"vscode: VS Code or IDE client compatibility checks",
"voice: voice transcription checks",
"contract: deterministic smoke contract checks",
"smoke_target(name): route a smoke test behind FCC_SMOKE_TARGETS",
]
[tool.ty.environment]
python-version = "3.14"

54
smoke/README.md Normal file
View file

@ -0,0 +1,54 @@
# Local Live Smoke Tests
These tests are for maintainers running against their own `.env`. They are not
part of CI and are not collected by plain `uv run pytest`.
## Safe Default Run
```powershell
$env:FCC_LIVE_SMOKE = "1"
uv run pytest smoke -n 0 -m live -s --tb=short
```
`-n 0` is recommended because the normal project pytest config enables xdist.
The smoke suite can run with workers, but one process gives clearer logs when a
real provider or bot fails.
## Targeted Runs
```powershell
$env:FCC_LIVE_SMOKE = "1"
$env:FCC_SMOKE_TARGETS = "api,providers,thinking,tools"
uv run pytest smoke -n 0 -m live -s --tb=short
```
Use `FCC_SMOKE_TARGETS=all` to include Telegram, Discord, and voice checks.
The default target set intentionally excludes those side-effectful integrations.
## Environment
- `FCC_ENV_FILE`: optional explicit dotenv path. The app still uses its normal
env-file precedence.
- `FCC_SMOKE_PROVIDER_MATRIX`: comma-separated provider prefixes to test.
- `FCC_SMOKE_TIMEOUT_S`: per-request/subprocess timeout, default `45`.
- `FCC_SMOKE_CLAUDE_BIN`: Claude CLI executable name, default `claude`.
- `FCC_SMOKE_TELEGRAM_CHAT_ID`: Telegram chat/user ID for send/edit/delete.
- `FCC_SMOKE_DISCORD_CHANNEL_ID`: Discord channel ID for send/edit/delete.
- `FCC_SMOKE_INTERACTIVE=1`: enables manual inbound messaging checks.
- `FCC_SMOKE_RUN_VOICE=1`: allows the voice transcription backend to load/run.
## Results
Smoke artifacts are written to `.smoke-results/` and ignored by git. Reports and
logs redact env values whose names contain `KEY`, `TOKEN`, `SECRET`, `WEBHOOK`,
or `AUTH`.
## How To Read Failures
- `missing_env`: configure the required key, token, channel, or local base URL.
- `upstream_unavailable`: the provider/local model/bot API is not reachable.
- `product_failure`: the app returned the wrong shape or crashed.
- `harness_bug`: the smoke test itself made an invalid assumption.
The first real run is expected to find product failures. Fix those separately
from harness problems so the suite becomes a reliable regression signal.

1
smoke/__init__.py Normal file
View file

@ -0,0 +1 @@
"""Local-only live smoke tests for free-claude-code."""

75
smoke/conftest.py Normal file
View file

@ -0,0 +1,75 @@
from __future__ import annotations
from collections.abc import Iterator
import pytest
from smoke.lib.config import SmokeConfig, auth_headers
from smoke.lib.report import SmokeReport
from smoke.lib.server import RunningServer, start_server
def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
    """Mark every collected smoke test as skipped unless FCC_LIVE_SMOKE=1."""
    if SmokeConfig.load().live:
        return
    opt_out = pytest.mark.skip(reason="set FCC_LIVE_SMOKE=1 to run local smoke tests")
    for collected in items:
        collected.add_marker(opt_out)
def pytest_configure(config: pytest.Config) -> None:
    """Initialise the session-wide smoke report before any test runs."""
    # _REPORT is module-level so the logreport/sessionfinish hooks can see it.
    global _REPORT
    smoke_config = SmokeConfig.load()
    _REPORT = SmokeReport(smoke_config)
def pytest_runtest_setup(item: pytest.Item) -> None:
    """Skip a test whose ``smoke_target`` markers are all disabled."""
    config = SmokeConfig.load()
    marks = list(item.iter_markers("smoke_target"))
    if not marks:
        # Unmarked tests are never gated by target selection.
        return
    targets = [str(mark.args[0]) for mark in marks if mark.args]
    if targets and not any(config.target_enabled(target) for target in targets):
        pytest.skip(f"smoke target disabled: {', '.join(targets)}")
def pytest_runtest_logreport(report: pytest.TestReport) -> None:
    """Record each test's call-phase outcome into the session report."""
    # Only the "call" phase carries the real pass/fail; setup/teardown are skipped.
    if report.when != "call":
        return
    if _REPORT is None:
        return
    # Keep only smoke_* markers so the JSON report stays focused on this suite.
    markers = sorted(
        str(name) for name in report.keywords if str(name).startswith("smoke_")
    )
    detail = "" if report.longrepr is None else str(report.longrepr)
    _REPORT.add(
        nodeid=report.nodeid,
        outcome=report.outcome,
        duration_s=report.duration,
        markers=markers,
        detail=detail,
    )
def pytest_sessionfinish(session: pytest.Session, exitstatus: int) -> None:
    """Flush all collected smoke outcomes to .smoke-results/ at session end."""
    if _REPORT is not None:
        _REPORT.write()
@pytest.fixture(scope="session")
def smoke_config() -> SmokeConfig:
    """Session-wide smoke configuration loaded from the developer environment."""
    return SmokeConfig.load()


@pytest.fixture
def smoke_server(smoke_config: SmokeConfig) -> Iterator[RunningServer]:
    """Start a fresh proxy server subprocess for one test; stop it afterwards."""
    with start_server(smoke_config) as server:
        yield server


@pytest.fixture
def smoke_headers() -> dict[str, str]:
    """Default Anthropic-style request headers for smoke requests."""
    return auth_headers()
_REPORT: SmokeReport | None = None

147
smoke/features.py Normal file
View file

@ -0,0 +1,147 @@
"""Feature inventory used by the local smoke suite.
This file is intentionally explicit: advertised features should not exist only
in README prose without at least one smoke check or a documented manual gap.
"""
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class FeatureSmoke:
    """One advertised feature and the smoke checks that cover it."""

    # Stable identifier; must match an entry in README_FEATURES.
    feature_id: str
    # Human-readable summary of what the checks prove.
    title: str
    # Verification mode: "live", "contract", "live_or_interactive", or "live_or_skip".
    mode: str
    # Names of the test functions that exercise this feature.
    checks: tuple[str, ...]
README_FEATURES: tuple[str, ...] = (
"zero_cost_provider_access",
"drop_in_claude_code_replacement",
"provider_matrix",
"per_model_mapping",
"thinking_token_support",
"heuristic_tool_parser",
"request_optimization",
"smart_rate_limiting",
"discord_telegram_bot",
"subagent_control",
"extensible_provider_platform_abcs",
"optional_authentication",
"vscode_extension",
"intellij_extension",
"voice_notes",
)
FEATURE_SMOKES: tuple[FeatureSmoke, ...] = (
FeatureSmoke(
"zero_cost_provider_access",
"Configured provider accepts a real prompt",
"live",
("test_configured_provider_models_stream_successfully",),
),
FeatureSmoke(
"drop_in_claude_code_replacement",
"Claude-compatible routes and CLI environment work",
"live",
(
"test_probe_and_models_routes",
"test_claude_cli_prompt_when_available",
"test_vscode_and_jetbrains_shaped_requests",
),
),
FeatureSmoke(
"provider_matrix",
"All configured provider prefixes can be exercised",
"live",
("test_configured_provider_models_stream_successfully",),
),
FeatureSmoke(
"per_model_mapping",
"Opus, Sonnet, Haiku, and fallback mappings are visible",
"live",
("test_model_mapping_configuration_is_consistent",),
),
FeatureSmoke(
"thinking_token_support",
"Thinking streams and suppression are contract-tested",
"contract",
(
"test_interleaved_thinking_text_blocks_are_valid",
"test_split_think_tags_preserve_text_and_thinking",
"test_enable_thinking_false_suppresses_reasoning_only",
),
),
FeatureSmoke(
"heuristic_tool_parser",
"Text tool calls become structured tool_use blocks",
"contract",
("test_thinking_tool_text_and_transcript_order_contract",),
),
FeatureSmoke(
"request_optimization",
"Fast-path local optimizations respond without providers",
"live",
("test_optimization_fast_paths_do_not_need_provider",),
),
FeatureSmoke(
"smart_rate_limiting",
"Concurrency/disconnect and retry-sensitive paths are checked",
"live",
("test_client_disconnect_mid_stream_does_not_crash_server",),
),
FeatureSmoke(
"discord_telegram_bot",
"Messaging credentials, send/edit/delete, and transcript behavior",
"live_or_interactive",
(
"test_telegram_bot_api_permissions",
"test_discord_bot_api_permissions",
"test_thinking_tool_text_and_transcript_order_contract",
),
),
FeatureSmoke(
"subagent_control",
"Task tool calls do not run in the background",
"contract",
("test_task_tool_arguments_force_foreground_execution",),
),
FeatureSmoke(
"extensible_provider_platform_abcs",
"Provider/platform registries expose expected built-ins",
"contract",
("test_provider_and_platform_registries_include_advertised_builtins",),
),
FeatureSmoke(
"optional_authentication",
"Anthropic-style auth headers are accepted and enforced",
"live",
("test_auth_token_is_enforced_for_all_supported_header_shapes",),
),
FeatureSmoke(
"vscode_extension",
"VS Code-shaped beta requests work against the proxy",
"live",
("test_vscode_and_jetbrains_shaped_requests",),
),
FeatureSmoke(
"intellij_extension",
"JetBrains/ACP-shaped environment requests work against the proxy",
"live",
("test_vscode_and_jetbrains_shaped_requests",),
),
FeatureSmoke(
"voice_notes",
"Configured transcription backend can process an audio fixture",
"live_or_skip",
("test_voice_transcription_backend_when_explicitly_enabled",),
),
)
def smoke_ids() -> set[str]:
    """Return feature IDs covered by the smoke manifest."""
    covered: set[str] = set()
    for entry in FEATURE_SMOKES:
        covered.add(entry.feature_id)
    return covered

1
smoke/lib/__init__.py Normal file
View file

@ -0,0 +1 @@
"""Shared helpers for local-only smoke tests."""

152
smoke/lib/config.py Normal file
View file

@ -0,0 +1,152 @@
"""Smoke-suite configuration loaded from the real developer environment."""
from __future__ import annotations
import os
from collections.abc import Mapping
from dataclasses import dataclass
from pathlib import Path
from config.settings import Settings, get_settings
DEFAULT_TARGETS = frozenset(
{
"api",
"auth",
"cli",
"contract",
"optimizations",
"providers",
"thinking",
"tools",
"vscode",
}
)
ALL_TARGETS = DEFAULT_TARGETS | frozenset({"discord", "telegram", "voice"})
SECRET_KEY_PARTS = ("KEY", "TOKEN", "SECRET", "WEBHOOK", "AUTH")
@dataclass(frozen=True, slots=True)
class ProviderModel:
    """A configured model plus the provider prefix it routes through."""

    # Provider prefix parsed from the model string (e.g. "open_router").
    provider: str
    # Full model identifier exactly as configured in the environment.
    full_model: str
    # Settings field the model came from (MODEL, MODEL_OPUS, ...).
    source: str

    @property
    def model_name(self) -> str:
        """Model name with the provider prefix stripped."""
        return Settings.parse_model_name(self.full_model)
@dataclass(frozen=True, slots=True)
class SmokeConfig:
    """Immutable snapshot of every smoke-suite setting for one test process."""

    # Repository root (two parents above smoke/lib/).
    root: Path
    # Output directory for smoke reports and server logs (.smoke-results).
    results_dir: Path
    # True only when FCC_LIVE_SMOKE=1; gates the whole suite.
    live: bool
    # True when FCC_SMOKE_INTERACTIVE=1; enables manual inbound checks.
    interactive: bool
    # Enabled smoke target names (see DEFAULT_TARGETS / ALL_TARGETS).
    targets: frozenset[str]
    # Provider prefixes to restrict provider tests to; empty means no filter.
    provider_matrix: frozenset[str]
    # Per-request/subprocess timeout in seconds (FCC_SMOKE_TIMEOUT_S).
    timeout_s: float
    # Prompt used for live provider checks.
    prompt: str
    # Claude CLI executable name (FCC_SMOKE_CLAUDE_BIN).
    claude_bin: str
    # pytest-xdist worker id, or "main" when not running under xdist.
    worker_id: str
    # Application settings loaded fresh for this process.
    settings: Settings

    @classmethod
    def load(cls) -> SmokeConfig:
        """Build a config from os.environ, re-reading app settings each call."""
        root = Path(__file__).resolve().parents[2]
        # Clear the settings cache so env overrides applied by tests take effect.
        get_settings.cache_clear()
        settings = get_settings()
        return cls(
            root=root,
            results_dir=root / ".smoke-results",
            live=os.getenv("FCC_LIVE_SMOKE") == "1",
            interactive=os.getenv("FCC_SMOKE_INTERACTIVE") == "1",
            targets=_parse_targets(os.getenv("FCC_SMOKE_TARGETS")),
            provider_matrix=_parse_csv(os.getenv("FCC_SMOKE_PROVIDER_MATRIX")),
            timeout_s=float(os.getenv("FCC_SMOKE_TIMEOUT_S", "45")),
            prompt=os.getenv("FCC_SMOKE_PROMPT", "Reply with exactly: FCC_SMOKE_PONG"),
            claude_bin=os.getenv("FCC_SMOKE_CLAUDE_BIN", "claude"),
            worker_id=os.getenv("PYTEST_XDIST_WORKER", "main"),
            settings=settings,
        )

    def target_enabled(self, *names: str) -> bool:
        """Return True if any of *names* is an enabled smoke target."""
        return any(name in self.targets for name in names)

    def provider_models(self) -> list[ProviderModel]:
        """Return deduplicated configured models whose provider has credentials."""
        candidates = (
            ("MODEL", self.settings.model),
            ("MODEL_OPUS", self.settings.model_opus),
            ("MODEL_SONNET", self.settings.model_sonnet),
            ("MODEL_HAIKU", self.settings.model_haiku),
        )
        seen: set[str] = set()
        models: list[ProviderModel] = []
        for source, model in candidates:
            if not model or model in seen:
                continue
            provider = Settings.parse_provider_type(model)
            # Honour an explicit FCC_SMOKE_PROVIDER_MATRIX filter.
            if self.provider_matrix and provider not in self.provider_matrix:
                continue
            if not self.has_provider_configuration(provider):
                continue
            seen.add(model)
            models.append(
                ProviderModel(provider=provider, full_model=model, source=source)
            )
        return models

    def has_provider_configuration(self, provider: str) -> bool:
        """Return True when the named provider has a usable key or base URL."""
        if provider == "nvidia_nim":
            return bool(self.settings.nvidia_nim_api_key.strip())
        if provider == "open_router":
            return bool(self.settings.open_router_api_key.strip())
        if provider == "deepseek":
            return bool(self.settings.deepseek_api_key.strip())
        if provider == "lmstudio":
            return bool(self.settings.lm_studio_base_url.strip())
        if provider == "llamacpp":
            return bool(self.settings.llamacpp_base_url.strip())
        # Unknown provider prefixes are treated as unconfigured.
        return False
def _parse_csv(raw: str | None) -> frozenset[str]:
    """Split a comma-separated string into a frozenset of non-empty tokens."""
    if not raw:
        return frozenset()
    tokens = (piece.strip() for piece in raw.split(","))
    return frozenset(token for token in tokens if token)
def _parse_targets(raw: str | None) -> frozenset[str]:
    """Resolve FCC_SMOKE_TARGETS into the effective smoke-target set."""
    if not raw:
        # Unset/empty means the safe default set (no messaging or voice).
        return DEFAULT_TARGETS
    requested = _parse_csv(raw)
    # "all" is a shortcut for every target, including side-effectful ones.
    return ALL_TARGETS if "all" in requested else requested
def auth_headers(token: str | None = None) -> dict[str, str]:
    """Build default Anthropic-style request headers, optionally authenticated.

    When *token* is None, the configured ANTHROPIC_AUTH_TOKEN is used instead;
    an empty token omits the x-api-key header entirely.
    """
    settings = get_settings()
    effective = settings.anthropic_auth_token if token is None else token
    headers: dict[str, str] = {
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    if effective:
        headers["x-api-key"] = effective
    return headers
def redacted(value: str, env: Mapping[str, str] | None = None) -> str:
"""Redact known secrets from a string before writing smoke artifacts."""
if not value:
return value
source = env if env is not None else os.environ
result = value
for key, secret in source.items():
if not secret or len(secret) < 4:
continue
if any(part in key.upper() for part in SECRET_KEY_PARTS):
result = result.replace(secret, f"<redacted:{key}>")
return result

70
smoke/lib/http.py Normal file
View file

@ -0,0 +1,70 @@
"""HTTP helpers for live smoke requests."""
from __future__ import annotations
from typing import Any
import httpx
from .config import SmokeConfig, auth_headers, redacted
from .server import RunningServer
from .sse import SSEEvent, parse_sse_lines
def message_payload(
text: str,
*,
model: str = "claude-3-5-sonnet-20241022",
max_tokens: int = 128,
extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
payload: dict[str, Any] = {
"model": model,
"max_tokens": max_tokens,
"messages": [{"role": "user", "content": text}],
}
if extra:
payload.update(extra)
return payload
def post_json(
    server: RunningServer,
    path: str,
    payload: dict[str, Any],
    config: SmokeConfig,
    *,
    headers: dict[str, str] | None = None,
) -> httpx.Response:
    """POST *payload* as JSON to the smoke server and return the raw response.

    Falls back to the default auth headers when *headers* is None or empty.
    """
    return httpx.post(
        f"{server.base_url}{path}",
        # `or` (not `is None`) so an empty dict also falls back to defaults.
        headers=headers or auth_headers(),
        json=payload,
        timeout=config.timeout_s,
    )
def collect_message_stream(
    server: RunningServer,
    payload: dict[str, Any],
    config: SmokeConfig,
    *,
    headers: dict[str, str] | None = None,
) -> list[SSEEvent]:
    """Stream POST /v1/messages and return the fully parsed SSE events.

    Raises AssertionError (with a capped, redacted body excerpt) when the
    response status is not 200.
    """
    request_headers = headers or auth_headers()
    with httpx.stream(
        "POST",
        f"{server.base_url}/v1/messages",
        headers=request_headers,
        json=payload,
        timeout=config.timeout_s,
    ) as response:
        if response.status_code != 200:
            # Read the body for diagnostics, but redact secrets before raising.
            body = response.read().decode("utf-8", errors="replace")
            raise AssertionError(
                f"stream request failed: HTTP {response.status_code} "
                f"{redacted(body[:1000])}"
            )
        return parse_sse_lines(response.iter_lines())

58
smoke/lib/report.py Normal file
View file

@ -0,0 +1,58 @@
"""Small JSON report writer for smoke runs."""
from __future__ import annotations
import json
import time
from dataclasses import asdict, dataclass
from .config import SmokeConfig, redacted
@dataclass(slots=True)
class SmokeOutcome:
    """One test result as captured from the pytest call phase."""

    # pytest node id, e.g. "smoke/test_api_live.py::test_probe_and_models_routes".
    nodeid: str
    # pytest outcome string ("passed", "failed", "skipped").
    outcome: str
    # Call-phase duration in seconds.
    duration_s: float
    # smoke_* marker names attached to the test.
    markers: list[str]
    # Failure/longrepr text; redacted by SmokeReport.add before storage.
    detail: str
class SmokeReport:
    """Accumulates test outcomes and writes one JSON artifact per worker."""

    def __init__(self, config: SmokeConfig) -> None:
        self.config = config
        # UTC session start timestamp in ISO-8601 form.
        self.started_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        self.outcomes: list[SmokeOutcome] = []

    def add(
        self,
        *,
        nodeid: str,
        outcome: str,
        duration_s: float,
        markers: list[str],
        detail: str = "",
    ) -> None:
        """Record one test outcome, redacting secrets from the detail text."""
        self.outcomes.append(
            SmokeOutcome(
                nodeid=nodeid,
                outcome=outcome,
                duration_s=duration_s,
                markers=markers,
                detail=redacted(detail),
            )
        )

    def write(self) -> None:
        """Write outcomes to .smoke-results/report-<worker>-<epoch>.json."""
        self.config.results_dir.mkdir(parents=True, exist_ok=True)
        # Worker id + epoch seconds keeps parallel and back-to-back runs distinct.
        path = (
            self.config.results_dir
            / f"report-{self.config.worker_id}-{int(time.time())}.json"
        )
        payload = {
            "started_at": self.started_at,
            "worker_id": self.config.worker_id,
            "targets": sorted(self.config.targets),
            "outcomes": [asdict(outcome) for outcome in self.outcomes],
        }
        path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")

126
smoke/lib/server.py Normal file
View file

@ -0,0 +1,126 @@
"""Subprocess lifecycle helpers for local smoke servers."""
from __future__ import annotations
import os
import socket
import subprocess
import time
from collections.abc import Iterator
from contextlib import contextmanager, suppress
from dataclasses import dataclass
from pathlib import Path
import httpx
from .config import SmokeConfig, redacted
@dataclass(slots=True)
class RunningServer:
    """Handle to a smoke server subprocess and how to reach it."""

    # e.g. "http://127.0.0.1:<port>"
    base_url: str
    port: int
    # Combined stdout/stderr log captured under .smoke-results/.
    log_path: Path
    process: subprocess.Popen[bytes]
def find_free_port() -> int:
    """Ask the OS for a currently free localhost TCP port.

    NOTE(review): the port is released before the caller binds it, so a racing
    process could grab it in between; acceptable for local smoke runs.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        probe.bind(("127.0.0.1", 0))
        _, port = probe.getsockname()
    return int(port)
@contextmanager
def start_server(
    config: SmokeConfig,
    *,
    env_overrides: dict[str, str] | None = None,
    command: list[str] | None = None,
    name: str = "server",
) -> Iterator[RunningServer]:
    """Launch a proxy server subprocess, wait for /health, and stop it on exit.

    Args:
        config: Smoke configuration (root dir, results dir, timeout).
        env_overrides: Extra environment variables applied after the defaults.
        command: Alternative launch command; defaults to uvicorn via ``uv run``.
        name: Log-file prefix so concurrent servers do not collide.

    Raises:
        AssertionError: if the server never becomes healthy (via _wait_for_health).
    """
    port = find_free_port()
    config.results_dir.mkdir(parents=True, exist_ok=True)
    log_path = config.results_dir / f"{name}-{config.worker_id}-{port}.log"
    env = os.environ.copy()
    # Defaults keep the server local, file-logged, and free of messaging side effects.
    env.update(
        {
            "HOST": "127.0.0.1",
            "PORT": str(port),
            "LOG_FILE": str(log_path),
            "MESSAGING_PLATFORM": "none",
            "PYTHONUNBUFFERED": "1",
        }
    )
    if env_overrides:
        env.update(env_overrides)
    cmd = command or [
        "uv",
        "run",
        "uvicorn",
        "server:app",
        "--host",
        "127.0.0.1",
        "--port",
        str(port),
        "--timeout-graceful-shutdown",
        "5",
    ]
    with log_path.open("ab") as log_file:
        # stderr is merged into the single log so failures are easy to inspect.
        process = subprocess.Popen(
            cmd,
            cwd=config.root,
            env=env,
            stdout=log_file,
            stderr=subprocess.STDOUT,
        )
        running = RunningServer(
            base_url=f"http://127.0.0.1:{port}",
            port=port,
            log_path=log_path,
            process=process,
        )
        try:
            _wait_for_health(running, timeout_s=config.timeout_s)
            yield running
        finally:
            # Always reap the subprocess, even when health-wait or the test fails.
            _stop_process(process)
def _wait_for_health(server: RunningServer, *, timeout_s: float) -> None:
    """Poll GET /health until it returns 200 or *timeout_s* elapses.

    Raises AssertionError carrying the last error and a redacted log tail when
    the server never becomes healthy (or its process exits early).
    """
    deadline = time.monotonic() + timeout_s
    last_error = ""
    while time.monotonic() < deadline:
        if server.process.poll() is not None:
            # The process died; stop polling and report with the log excerpt below.
            break
        try:
            response = httpx.get(f"{server.base_url}/health", timeout=2.0)
            if response.status_code == 200:
                return
            last_error = f"HTTP {response.status_code}: {response.text[:200]}"
        except Exception as exc:
            # Connection refused/reset while the server is still booting is expected.
            last_error = f"{type(exc).__name__}: {exc}"
        time.sleep(0.25)
    log_excerpt = ""
    with suppress(OSError):
        # Only the tail of the log; enough context without flooding the failure.
        log_excerpt = server.log_path.read_text(encoding="utf-8", errors="replace")[
            -2000:
        ]
    raise AssertionError(
        "Smoke server did not become healthy. "
        f"last_error={last_error!r} log={redacted(log_excerpt)!r}"
    )
def _stop_process(process: subprocess.Popen[bytes]) -> None:
    """Terminate a smoke server subprocess, escalating to kill on a hang."""
    already_exited = process.poll() is not None
    if already_exited:
        return
    process.terminate()
    try:
        process.wait(timeout=8)
    except subprocess.TimeoutExpired:
        # Graceful termination stalled — force-kill and reap.
        process.kill()
        process.wait(timeout=5)

148
smoke/lib/sse.py Normal file
View file

@ -0,0 +1,148 @@
"""SSE parsing and Anthropic stream assertions for smoke tests."""
from __future__ import annotations
import json
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True, slots=True)
class SSEEvent:
    """One parsed server-sent event."""

    # Value of the "event:" field ("" when absent).
    event: str
    # JSON-decoded "data:" payload; non-dict JSON is wrapped as {"value": ...}
    # and undecodable payloads as {"raw": ...} (see _append_event).
    data: dict[str, Any]
    # The original event lines joined with newlines, kept for diagnostics.
    raw: str
def parse_sse_lines(lines: Iterable[str]) -> list[SSEEvent]:
    """Parse raw SSE lines into a list of SSEEvent records.

    Events are separated by blank lines; only "event:" and "data:" fields are
    interpreted, everything else is kept verbatim in the raw text.
    """
    events: list[SSEEvent] = []
    event_name = ""
    data_chunks: list[str] = []
    raw_chunks: list[str] = []
    for raw_line in lines:
        line = raw_line.rstrip("\r\n")
        if not line:
            # A blank line terminates the current event.
            _append_event(events, event_name, data_chunks, raw_chunks)
            event_name, data_chunks, raw_chunks = "", [], []
            continue
        raw_chunks.append(line)
        if line.startswith("event:"):
            event_name = line.split(":", 1)[1].strip()
        elif line.startswith("data:"):
            data_chunks.append(line.split(":", 1)[1].strip())
    # Flush a trailing event that was not followed by a blank line.
    _append_event(events, event_name, data_chunks, raw_chunks)
    return events
def parse_sse_text(text: str) -> list[SSEEvent]:
    """Parse a whole SSE response body (as one string) into events."""
    return parse_sse_lines(text.splitlines())
def _append_event(
    events: list[SSEEvent],
    current_event: str,
    data_parts: list[str],
    raw_parts: list[str],
) -> None:
    """Finalize one SSE event and append it to *events* (no-op when empty)."""
    if not (current_event or data_parts):
        return
    data_text = "\n".join(data_parts)
    if not data_text:
        payload: dict[str, Any] = {}
    else:
        try:
            decoded = json.loads(data_text)
        except json.JSONDecodeError:
            # Keep undecodable payloads for diagnostics instead of dropping them.
            payload = {"raw": data_text}
        else:
            # Non-dict JSON (numbers, lists, ...) is wrapped so .data stays a dict.
            payload = decoded if isinstance(decoded, dict) else {"value": decoded}
    events.append(SSEEvent(current_event, payload, "\n".join(raw_parts)))
def assert_anthropic_stream_contract(
    events: list[SSEEvent], *, allow_error: bool = False
) -> None:
    """Assert an SSE stream obeys the Anthropic Messages streaming contract.

    Checks that message_start is present and message_stop is last, that every
    content block is started exactly once, receives only deltas matching its
    type, and is stopped; error events fail unless *allow_error* is set.
    """
    assert events, "stream produced no SSE events"
    event_names = [event.event for event in events]
    assert "message_start" in event_names, event_names
    assert event_names[-1] == "message_stop", event_names
    # index -> block type for blocks currently open; seen_blocks guards index reuse.
    open_blocks: dict[int, str] = {}
    seen_blocks: set[int] = set()
    for event in events:
        if event.event == "error" and not allow_error:
            raise AssertionError(f"unexpected SSE error event: {event.data}")
        if event.event == "content_block_start":
            index = _event_index(event)
            block = event.data.get("content_block", {})
            assert isinstance(block, dict), event.data
            block_type = str(block.get("type", ""))
            assert block_type in {"text", "thinking", "tool_use"}, event.data
            assert index not in open_blocks, f"block {index} started twice"
            assert index not in seen_blocks, f"block {index} reused after stop"
            open_blocks[index] = block_type
            seen_blocks.add(index)
            continue
        if event.event == "content_block_delta":
            index = _event_index(event)
            assert index in open_blocks, f"delta for unopened block {index}"
            delta = event.data.get("delta", {})
            assert isinstance(delta, dict), event.data
            delta_type = str(delta.get("type", ""))
            # Each block type admits exactly one delta type.
            expected = {
                "text": "text_delta",
                "thinking": "thinking_delta",
                "tool_use": "input_json_delta",
            }[open_blocks[index]]
            assert delta_type == expected, (
                f"block {index} is {open_blocks[index]}, got {delta_type}"
            )
            continue
        if event.event == "content_block_stop":
            index = _event_index(event)
            assert index in open_blocks, f"stop for unopened block {index}"
            open_blocks.pop(index)
    assert not open_blocks, f"unclosed blocks: {open_blocks}"
    assert seen_blocks, "stream did not emit any content blocks"
def event_names(events: list[SSEEvent]) -> list[str]:
return [event.event for event in events]
def text_content(events: list[SSEEvent]) -> str:
parts: list[str] = []
for event in events:
delta = event.data.get("delta", {})
if isinstance(delta, dict) and delta.get("type") == "text_delta":
parts.append(str(delta.get("text", "")))
return "".join(parts)
def thinking_content(events: list[SSEEvent]) -> str:
parts: list[str] = []
for event in events:
delta = event.data.get("delta", {})
if isinstance(delta, dict) and delta.get("type") == "thinking_delta":
parts.append(str(delta.get("thinking", "")))
return "".join(parts)
def has_tool_use(events: list[SSEEvent]) -> bool:
for event in events:
block = event.data.get("content_block", {})
if isinstance(block, dict) and block.get("type") == "tool_use":
return True
return False
def _event_index(event: SSEEvent) -> int:
    """Return the content-block ``index`` field, asserting it is an int."""
    value = event.data.get("index")
    assert isinstance(value, int), event.data
    return value

175
smoke/test_api_live.py Normal file
View file

@ -0,0 +1,175 @@
from __future__ import annotations
from typing import Any
import httpx
import pytest
from smoke.lib.config import SmokeConfig
from smoke.lib.http import post_json
from smoke.lib.server import RunningServer
pytestmark = [pytest.mark.live, pytest.mark.smoke_target("api")]
def test_probe_and_models_routes(
    smoke_server: RunningServer, smoke_headers: dict[str, str]
) -> None:
    """Basic liveness: health, root probe, model listing, and probe verbs."""
    with httpx.Client(base_url=smoke_server.base_url, headers=smoke_headers) as client:
        assert client.get("/health").json()["status"] == "healthy"
        root = client.get("/")
        assert root.status_code == 200
        assert root.json()["status"] == "ok"
        models = client.get("/v1/models")
        assert models.status_code == 200
        assert models.json()["data"]
        # IDE clients probe routes with HEAD/OPTIONS; both must return 204.
        for path in ("/", "/health", "/v1/messages", "/v1/messages/count_tokens"):
            head = client.head(path)
            assert head.status_code == 204, (path, head.status_code, head.text)
            options = client.options(path)
            assert options.status_code == 204, (path, options.status_code, options.text)
def test_count_tokens_accepts_thinking_tools_and_results(
    smoke_server: RunningServer,
    smoke_config: SmokeConfig,
    smoke_headers: dict[str, str],
) -> None:
    """count_tokens must accept thinking, tool_use, and tool_result blocks."""
    # A three-turn conversation exercising every content-block shape the route accepts.
    payload: dict[str, Any] = {
        "model": "claude-3-5-sonnet-20241022",
        "messages": [
            {"role": "user", "content": "Use the tool."},
            {
                "role": "assistant",
                "content": [
                    {"type": "thinking", "thinking": "Need to inspect the file."},
                    {
                        "type": "tool_use",
                        "id": "toolu_smoke",
                        "name": "Read",
                        "input": {"file_path": "README.md"},
                    },
                ],
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "tool_result",
                        "tool_use_id": "toolu_smoke",
                        "content": "Free Claude Code",
                    }
                ],
            },
        ],
        "tools": [
            {
                "name": "Read",
                "description": "Read a file",
                "input_schema": {
                    "type": "object",
                    "properties": {"file_path": {"type": "string"}},
                    "required": ["file_path"],
                },
            }
        ],
    }
    response = post_json(
        smoke_server,
        "/v1/messages/count_tokens",
        payload,
        smoke_config,
        headers=smoke_headers,
    )
    assert response.status_code == 200, response.text
    assert response.json()["input_tokens"] > 0
def test_optimization_fast_paths_do_not_need_provider(
    smoke_server: RunningServer,
    smoke_config: SmokeConfig,
    smoke_headers: dict[str, str],
) -> None:
    """Known fast-path prompts must be answered locally, without any provider.

    Each case is (name, request payload, substring expected in the reply text).
    """
    cases: tuple[tuple[str, dict[str, Any], str], ...] = (
        (
            "quota",
            {
                "model": "claude-3-5-sonnet-20241022",
                "max_tokens": 1,
                "messages": [{"role": "user", "content": "quota"}],
            },
            "Quota check passed.",
        ),
        (
            "title",
            {
                "model": "claude-3-5-sonnet-20241022",
                "system": "Generate a title for the new conversation topic.",
                "messages": [{"role": "user", "content": "hello"}],
            },
            "Conversation",
        ),
        (
            "prefix",
            {
                "model": "claude-3-5-sonnet-20241022",
                "messages": [
                    {
                        "role": "user",
                        "content": "<policy_spec>extract command</policy_spec>\nCommand: git status --short",
                    }
                ],
            },
            "git",
        ),
        (
            # Empty expectation: only the 200 fast-path response matters here.
            "suggestion",
            {
                "model": "claude-3-5-sonnet-20241022",
                "messages": [{"role": "user", "content": "[SUGGESTION MODE: next]"}],
            },
            "",
        ),
        (
            "filepath",
            {
                "model": "claude-3-5-sonnet-20241022",
                "system": "Extract any file paths that this command output contains.",
                "messages": [
                    {
                        "role": "user",
                        "content": "Command: cat smoke/test_api_live.py\nOutput: file contents\n<filepaths>",
                    }
                ],
            },
            "smoke/test_api_live.py",
        ),
    )
    for name, payload, expected_text in cases:
        response = post_json(
            smoke_server, "/v1/messages", payload, smoke_config, headers=smoke_headers
        )
        assert response.status_code == 200, (name, response.text)
        text = response.json()["content"][0]["text"]
        assert expected_text in text
def test_invalid_messages_returns_anthropic_error(
    smoke_server: RunningServer,
    smoke_config: SmokeConfig,
    smoke_headers: dict[str, str],
) -> None:
    """An empty messages list must yield an Anthropic-shaped 400 error body."""
    response = post_json(
        smoke_server,
        "/v1/messages",
        {"model": "claude-3-5-sonnet-20241022", "messages": []},
        smoke_config,
        headers=smoke_headers,
    )
    assert response.status_code == 400
    payload = response.json()
    assert payload["type"] == "error"
    assert payload["error"]["type"] == "invalid_request_error"

43
smoke/test_auth_live.py Normal file
View file

@ -0,0 +1,43 @@
from __future__ import annotations
import httpx
import pytest
from smoke.lib.config import SmokeConfig
from smoke.lib.server import start_server
pytestmark = [pytest.mark.live, pytest.mark.smoke_target("auth")]
def test_auth_token_is_enforced_for_all_supported_header_shapes(
    smoke_config: SmokeConfig,
) -> None:
    """With ANTHROPIC_AUTH_TOKEN set, every supported auth header shape works.

    Unauthenticated and wrong-key requests must get 401; x-api-key, Bearer
    authorization, and anthropic-auth-token with the right token must get 200.
    """
    token = "fcc-smoke-token"
    with start_server(
        smoke_config,
        env_overrides={"ANTHROPIC_AUTH_TOKEN": token, "MESSAGING_PLATFORM": "none"},
        name="auth",
    ) as server:
        # No credentials at all -> rejected.
        assert httpx.get(f"{server.base_url}/").status_code == 401
        assert (
            httpx.get(f"{server.base_url}/", headers={"x-api-key": "wrong"}).status_code
            == 401
        )
        assert (
            httpx.get(f"{server.base_url}/", headers={"x-api-key": token}).status_code
            == 200
        )
        assert (
            httpx.get(
                f"{server.base_url}/",
                headers={"authorization": f"Bearer {token}"},
            ).status_code
            == 200
        )
        assert (
            httpx.get(
                f"{server.base_url}/",
                headers={"anthropic-auth-token": token},
            ).status_code
            == 200
        )

74
smoke/test_cli_live.py Normal file
View file

@ -0,0 +1,74 @@
from __future__ import annotations
import os
import shutil
import subprocess
from pathlib import Path
import pytest
from smoke.lib.config import SmokeConfig
from smoke.lib.server import start_server
pytestmark = [pytest.mark.live, pytest.mark.smoke_target("cli")]
def test_fcc_init_scaffolds_user_config(
    smoke_config: SmokeConfig, tmp_path: Path
) -> None:
    """fcc-init must create ~/.config/free-claude-code/.env in a fresh home."""
    env = os.environ.copy()
    # Point both POSIX and Windows home variables at a throwaway directory.
    env["HOME"] = str(tmp_path)
    env["USERPROFILE"] = str(tmp_path)
    result = subprocess.run(
        ["uv", "run", "fcc-init"],
        cwd=smoke_config.root,
        env=env,
        capture_output=True,
        text=True,
        timeout=smoke_config.timeout_s,
        check=False,
    )
    assert result.returncode == 0, result.stderr or result.stdout
    assert (tmp_path / ".config" / "free-claude-code" / ".env").is_file()
def test_free_claude_code_entrypoint_starts_server(smoke_config: SmokeConfig) -> None:
    """The packaged `free-claude-code` entrypoint must boot a healthy server."""
    with start_server(
        smoke_config,
        command=["uv", "run", "free-claude-code"],
        env_overrides={"MESSAGING_PLATFORM": "none"},
        name="entrypoint",
    ) as server:
        # start_server already waited for /health; confirm the process is alive.
        assert server.process.poll() is None
def test_claude_cli_prompt_when_available(
    smoke_config: SmokeConfig, tmp_path: Path
) -> None:
    """Drive the real Claude CLI through the proxy when both are available."""
    claude_bin = shutil.which(smoke_config.claude_bin)
    if not claude_bin:
        pytest.skip(f"Claude CLI not found: {smoke_config.claude_bin}")
    models = smoke_config.provider_models()
    if not models:
        pytest.skip("no configured provider model available for Claude CLI smoke")
    with start_server(
        smoke_config,
        env_overrides={"MODEL": models[0].full_model, "MESSAGING_PLATFORM": "none"},
        name="claude-cli",
    ) as server:
        env = os.environ.copy()
        # Point the CLI at the local proxy instead of api.anthropic.com.
        env["ANTHROPIC_BASE_URL"] = server.base_url
        if smoke_config.settings.anthropic_auth_token:
            env["ANTHROPIC_AUTH_TOKEN"] = smoke_config.settings.anthropic_auth_token
        result = subprocess.run(
            [claude_bin, "-p", "Reply with exactly FCC_SMOKE_PONG"],
            cwd=tmp_path,
            env=env,
            capture_output=True,
            text=True,
            timeout=smoke_config.timeout_s,
            check=False,
        )
        assert result.returncode == 0, result.stderr or result.stdout
        assert "FCC_SMOKE_PONG" in result.stdout

View file

@ -0,0 +1,49 @@
from __future__ import annotations
import pytest
from smoke.lib.config import SmokeConfig, auth_headers
from smoke.lib.http import message_payload, post_json
from smoke.lib.server import RunningServer
pytestmark = [pytest.mark.live, pytest.mark.smoke_target("vscode")]
def test_vscode_and_jetbrains_shaped_requests(
    smoke_server: RunningServer,
    smoke_config: SmokeConfig,
) -> None:
    """Requests shaped like the VSCode and JetBrains IDE clients both succeed."""
    payload = message_payload("quota", max_tokens=1)

    def expect_quota_ok(response) -> None:
        # Both IDE shapes must get the same canned quota response.
        assert response.status_code == 200, response.text
        assert response.json()["content"][0]["text"] == "Quota check passed."

    # VSCode sends an x-api-key plus a beta header and a beta query flag.
    vscode_headers = auth_headers()
    vscode_headers["anthropic-beta"] = "messages-2023-12-15"
    vscode_headers["user-agent"] = "Claude-Code-VSCode smoke"
    expect_quota_ok(
        post_json(
            smoke_server,
            "/v1/messages?beta=true",
            payload,
            smoke_config,
            headers=vscode_headers,
        )
    )

    # JetBrains authenticates with a bearer token instead of x-api-key.
    jetbrains_headers = auth_headers()
    token = smoke_config.settings.anthropic_auth_token
    if token:
        jetbrains_headers.pop("x-api-key", None)
        jetbrains_headers["authorization"] = f"Bearer {token}"
    jetbrains_headers["user-agent"] = "JetBrains-ACP smoke"
    expect_quota_ok(
        post_json(
            smoke_server,
            "/v1/messages",
            payload,
            smoke_config,
            headers=jetbrains_headers,
        )
    )

View file

@ -0,0 +1,48 @@
from __future__ import annotations
import pytest
from messaging.platforms.factory import create_messaging_platform
from providers.base import BaseProvider
from providers.deepseek import DeepSeekProvider
from providers.llamacpp import LlamaCppProvider
from providers.lmstudio import LMStudioProvider
from providers.nvidia_nim import NvidiaNimProvider
from providers.open_router import OpenRouterProvider
from smoke.features import FEATURE_SMOKES, README_FEATURES, smoke_ids
# Contract-level checks in the "contract" smoke target; still marked live so
# they only run under the opt-in live marker filter.
pytestmark = [pytest.mark.live, pytest.mark.smoke_target("contract")]
def test_every_advertised_feature_has_a_smoke_entry() -> None:
    """README feature ids and smoke manifest ids must match one-to-one."""
    readme_ids = set(README_FEATURES)
    manifest_ids = smoke_ids()
    missing = sorted(readme_ids - manifest_ids)
    extra = sorted(manifest_ids - readme_ids)
    assert not missing, f"README features missing smoke entries: {missing}"
    assert not extra, f"smoke entries not tied to README features: {extra}"
def test_smoke_manifest_has_unique_ids_and_checks() -> None:
    """Manifest entries have unique ids, at least one check, and a known mode."""
    allowed_modes = {"live", "contract", "live_or_interactive", "live_or_skip"}
    seen_ids: set = set()
    for feature in FEATURE_SMOKES:
        assert feature.feature_id not in seen_ids, feature
        seen_ids.add(feature.feature_id)
        assert feature.checks, feature
        assert feature.mode in allowed_modes
def test_provider_and_platform_registries_include_advertised_builtins() -> None:
    """Built-in providers subclass BaseProvider; unknown platforms map to None."""
    builtin_providers = (
        NvidiaNimProvider,
        OpenRouterProvider,
        DeepSeekProvider,
        LMStudioProvider,
        LlamaCppProvider,
    )
    for provider_cls in builtin_providers:
        assert issubclass(provider_cls, BaseProvider)
    # Unknown platform names should fail soft rather than raise.
    assert create_messaging_platform("not-a-platform") is None

View file

@ -0,0 +1,113 @@
from __future__ import annotations
import os
import time
import httpx
import pytest
from smoke.lib.config import SmokeConfig
@pytest.mark.live
@pytest.mark.smoke_target("telegram")
def test_telegram_bot_api_permissions(smoke_config: SmokeConfig) -> None:
    """The bot token can identify itself and send/edit/delete in the test chat."""
    token = smoke_config.settings.telegram_bot_token
    if not token:
        pytest.skip("TELEGRAM_BOT_TOKEN is not configured")
    api = f"https://api.telegram.org/bot{token}"
    timeout = smoke_config.timeout_s

    me = httpx.get(f"{api}/getMe", timeout=timeout)
    assert me.status_code == 200, me.text
    assert me.json()["ok"] is True

    chat_id = os.getenv("FCC_SMOKE_TELEGRAM_CHAT_ID") or (
        smoke_config.settings.allowed_telegram_user_id or ""
    )
    if not chat_id:
        pytest.skip("FCC_SMOKE_TELEGRAM_CHAT_ID or ALLOWED_TELEGRAM_USER_ID required")

    # Unique marker so a human can spot (and ignore) stray smoke messages.
    marker = f"FCC smoke {int(time.time())}"
    send_resp = httpx.post(
        f"{api}/sendMessage",
        json={"chat_id": chat_id, "text": marker},
        timeout=timeout,
    )
    assert send_resp.status_code == 200, send_resp.text
    message_id = send_resp.json()["result"]["message_id"]

    edit_resp = httpx.post(
        f"{api}/editMessageText",
        json={"chat_id": chat_id, "message_id": message_id, "text": marker + " edit"},
        timeout=timeout,
    )
    assert edit_resp.status_code == 200, edit_resp.text

    delete_resp = httpx.post(
        f"{api}/deleteMessage",
        json={"chat_id": chat_id, "message_id": message_id},
        timeout=timeout,
    )
    assert delete_resp.status_code == 200, delete_resp.text
@pytest.mark.live
@pytest.mark.smoke_target("discord")
def test_discord_bot_api_permissions(smoke_config: SmokeConfig) -> None:
    """The Discord bot can read the channel and send/edit/delete a message."""
    token = smoke_config.settings.discord_bot_token
    channel_id = os.getenv("FCC_SMOKE_DISCORD_CHANNEL_ID")
    if not channel_id and smoke_config.settings.allowed_discord_channels:
        # Fall back to the first configured allowed channel.
        channel_id = smoke_config.settings.allowed_discord_channels.split(",", 1)[0]
    if not token:
        pytest.skip("DISCORD_BOT_TOKEN is not configured")
    if not channel_id:
        pytest.skip("FCC_SMOKE_DISCORD_CHANNEL_ID or ALLOWED_DISCORD_CHANNELS required")

    headers = {"authorization": f"Bot {token}"}
    timeout = smoke_config.timeout_s
    channel_url = f"https://discord.com/api/v10/channels/{channel_id}"

    channel_resp = httpx.get(channel_url, headers=headers, timeout=timeout)
    assert channel_resp.status_code == 200, channel_resp.text

    # Unique marker so a human can spot (and ignore) stray smoke messages.
    marker = f"FCC smoke {int(time.time())}"
    created = httpx.post(
        f"{channel_url}/messages",
        headers=headers,
        json={"content": marker},
        timeout=timeout,
    )
    assert created.status_code == 200, created.text
    message_id = created.json()["id"]

    updated = httpx.patch(
        f"{channel_url}/messages/{message_id}",
        headers=headers,
        json={"content": marker + " edit"},
        timeout=timeout,
    )
    assert updated.status_code == 200, updated.text

    removed = httpx.delete(
        f"{channel_url}/messages/{message_id}",
        headers=headers,
        timeout=timeout,
    )
    # Discord returns 204 No Content on delete; accept 200 for proxies.
    assert removed.status_code in {200, 204}, removed.text
@pytest.mark.live
@pytest.mark.smoke_target("telegram")
@pytest.mark.smoke_target("discord")
def test_interactive_inbound_messaging_requires_explicit_mode(
    smoke_config: SmokeConfig,
) -> None:
    """Placeholder for the manual inbound-messaging check.

    Always skips: either because interactive mode is off, or (when on) with
    instructions telling the human operator how to perform the check by hand.
    """
    if not smoke_config.interactive:
        pytest.skip("set FCC_SMOKE_INTERACTIVE=1 for manual inbound messaging checks")
    pytest.skip(
        "manual inbound check: start the server, send a nonce from the real client, "
        "and verify threaded progress plus /stop, /clear, and /stats"
    )

View file

@ -0,0 +1,88 @@
from __future__ import annotations
import time
import httpx
import pytest
from smoke.lib.config import SmokeConfig, auth_headers
from smoke.lib.http import collect_message_stream, message_payload
from smoke.lib.server import start_server
from smoke.lib.sse import assert_anthropic_stream_contract, text_content
# Every test in this module is live and belongs to the "providers" target.
pytestmark = [pytest.mark.live, pytest.mark.smoke_target("providers")]
def test_model_mapping_configuration_is_consistent(smoke_config: SmokeConfig) -> None:
    """Each usable provider model declares a provider-qualified name."""
    models = smoke_config.provider_models()
    if not models:
        pytest.skip("no configured provider models with usable credentials/base URLs")
    for entry in models:
        # full_model is expected to be "<provider>/<model>" shaped.
        assert "/" in entry.full_model
        assert entry.model_name
def test_configured_provider_models_stream_successfully(
    smoke_config: SmokeConfig,
) -> None:
    """Every configured provider model can serve one streamed message."""
    models = smoke_config.provider_models()
    if not models:
        pytest.skip("no configured provider models with usable credentials/base URLs")

    def run_one(provider_model) -> None:
        # Boot a fresh server pinned to this model, stream one message, and
        # check both the SSE contract and that some text came back.
        with start_server(
            smoke_config,
            env_overrides={
                "MODEL": provider_model.full_model,
                "MESSAGING_PLATFORM": "none",
            },
            name=f"provider-{provider_model.provider}",
        ) as server:
            events = collect_message_stream(
                server,
                message_payload(smoke_config.prompt, model="fcc-smoke-default"),
                smoke_config,
            )
            assert_anthropic_stream_contract(events)
            assert text_content(events).strip(), "provider returned no text"

    # Try every model and report all failures together instead of stopping
    # at the first broken provider.
    failures: list[str] = []
    for provider_model in models:
        try:
            run_one(provider_model)
        except Exception as exc:
            failures.append(
                f"{provider_model.source}={provider_model.full_model}: "
                f"{type(exc).__name__}: {exc}"
            )
    assert not failures, "\n".join(failures)
def test_client_disconnect_mid_stream_does_not_crash_server(
    smoke_config: SmokeConfig,
) -> None:
    """Abandoning the SSE stream after one line must leave the server healthy."""
    models = smoke_config.provider_models()
    if not models:
        pytest.skip("no configured provider model available for disconnect smoke")
    chosen = models[0]
    overrides = {"MODEL": chosen.full_model, "MESSAGING_PLATFORM": "none"}
    with start_server(smoke_config, env_overrides=overrides, name="disconnect") as server:
        with httpx.stream(
            "POST",
            f"{server.base_url}/v1/messages",
            headers=auth_headers(),
            json=message_payload(smoke_config.prompt, model="fcc-smoke-default"),
            timeout=smoke_config.timeout_s,
        ) as response:
            assert response.status_code == 200, response.read()
            # Read a single line, then leave the context to simulate a client
            # disconnect mid-response.
            for _line in response.iter_lines():
                break
        time.sleep(0.5)  # give the server a moment to notice the disconnect
        health = httpx.get(f"{server.base_url}/health", timeout=5)
        assert health.status_code == 200

View file

@ -0,0 +1,206 @@
from __future__ import annotations
from collections.abc import Iterable
import pytest
from messaging.event_parser import parse_cli_event
from messaging.transcript import RenderCtx, TranscriptBuffer
from providers.common import (
    ContentType,
    HeuristicToolParser,
    SSEBuilder,
    ThinkTagParser,
)
from smoke.lib.sse import (
    assert_anthropic_stream_contract,
    event_names,
    has_tool_use,
    parse_sse_text,
    text_content,
    thinking_content,
)
# These tests exercise the SSE thinking-stream contracts; they run under both
# the "contract" and "thinking" smoke targets.
pytestmark = [
    pytest.mark.live,
    pytest.mark.smoke_target("contract"),
    pytest.mark.smoke_target("thinking"),
]
def test_interleaved_thinking_text_blocks_are_valid() -> None:
    """Alternating thinking/text blocks yield four blocks in a valid stream."""
    parts = ("first thought", "first answer", "second thought", "final answer")
    events = _parse_builder_events(_interleaved_thinking_text_events(parts))
    assert_anthropic_stream_contract(events)
    block_starts = [n for n in event_names(events) if n == "content_block_start"]
    assert len(block_starts) == 4
    assert thinking_content(events) == "first thoughtsecond thought"
    assert text_content(events) == "first answerfinal answer"
def test_split_think_tags_preserve_text_and_thinking() -> None:
    """Think tags split across chunk boundaries still separate cleanly."""
    split_chunks = ["before <thi", "nk>hidden", "</think> after"]
    events = _parse_builder_events(_events_from_text_chunks(split_chunks))
    assert_anthropic_stream_contract(events)
    assert thinking_content(events) == "hidden"
    assert text_content(events) == "before after"
def test_mixed_reasoning_content_and_think_tags_keep_order() -> None:
    """Reasoning-field deltas and <think> tags merge in emission order."""
    sse = SSEBuilder("msg_smoke", "smoke-model")
    stream: list[str] = [sse.message_start()]
    stream += sse.ensure_thinking_block()
    stream.append(sse.emit_thinking_delta("reasoning field"))
    # Feed tagged text through the same builder after the reasoning delta.
    stream += _events_from_text_chunks([" visible <think>tagged</think> done"], sse)
    stream += sse.close_all_blocks()
    stream.append(sse.message_delta("end_turn", 10))
    stream.append(sse.message_stop())
    events = parse_sse_text("".join(stream))
    assert_anthropic_stream_contract(events)
    assert thinking_content(events) == "reasoning fieldtagged"
    assert text_content(events) == " visible done"
def test_thinking_tool_text_and_transcript_order_contract() -> None:
    """A thinking → tool_use → text stream is valid and renders in that order.

    Builds the stream by hand with SSEBuilder, checks the Anthropic stream
    contract, then replays every event through the messaging transcript
    pipeline and asserts the rendered output keeps thinking/tool/text order.
    """
    builder = SSEBuilder("msg_smoke", "smoke-model")
    chunks = [builder.message_start()]
    chunks.extend(builder.ensure_thinking_block())
    chunks.append(builder.emit_thinking_delta("inspect first"))
    # Close the thinking block before opening a manually-indexed tool block.
    chunks.extend(builder.close_content_blocks())
    tool_block_index = builder.blocks.allocate_index()
    chunks.append(
        builder.content_block_start(
            tool_block_index, "tool_use", id="toolu_1", name="Read"
        )
    )
    chunks.append(
        builder.content_block_delta(
            tool_block_index, "input_json_delta", '{"file":"README.md"}'
        )
    )
    chunks.append(builder.content_block_stop(tool_block_index))
    chunks.extend(builder.ensure_text_block())
    chunks.append(builder.emit_text_delta("done"))
    chunks.extend(builder.close_all_blocks())
    chunks.append(builder.message_delta("end_turn", 20))
    chunks.append(builder.message_stop())
    events = parse_sse_text("".join(chunks))
    assert_anthropic_stream_contract(events)
    assert has_tool_use(events)
    # Replay through the transcript layer and check rendered ordering.
    transcript = TranscriptBuffer()
    for event in events:
        for parsed in parse_cli_event(event.data):
            transcript.apply(parsed)
    rendered = transcript.render(_render_ctx(), limit_chars=3900, status=None)
    assert (
        rendered.find("inspect first")
        < rendered.find("Tool call:")
        < rendered.find("done")
    )
def test_enable_thinking_false_suppresses_reasoning_only() -> None:
    """With thinking disabled, reasoning is dropped but visible text survives."""
    tagged = ["hello <think>secret</think> world"]
    events = _parse_builder_events(
        _events_from_text_chunks(tagged, enable_thinking=False)
    )
    assert_anthropic_stream_contract(events)
    assert "secret" not in thinking_content(events)
    assert text_content(events) == "hello world"
def test_task_tool_arguments_force_foreground_execution() -> None:
    """Task tool calls parsed from heuristic text can be forced to foreground.

    Feeds a heuristic-format Task invocation through HeuristicToolParser and
    then simulates the caller overriding ``run_in_background`` to False.
    """
    parser = HeuristicToolParser()
    filtered, detected = parser.feed(
        "● <function=Task><parameter=description>Inspect</parameter>"
        "<parameter=run_in_background>true</parameter> trailing"
    )
    detected.extend(parser.flush())
    # Text outside the tool markup must survive filtering.
    assert "trailing" in filtered
    task = detected[0]
    assert task["name"] == "Task"
    # NOTE(review): this assigns run_in_background=False and then asserts the
    # value just assigned, so it only verifies the override mechanism, not
    # that the pipeline itself forces foreground execution — consider
    # asserting the parsed value before the override. TODO confirm intent.
    if isinstance(task.get("input"), dict):
        task["input"]["run_in_background"] = False
    assert task["input"]["run_in_background"] is False
def _interleaved_thinking_text_events(
    parts: tuple[str, str, str, str],
) -> Iterable[str]:
    """Yield SSE chunks alternating thinking (even index) / text (odd index)."""
    sse = SSEBuilder("msg_smoke", "smoke-model")
    yield sse.message_start()
    for position, part in enumerate(parts):
        if position % 2 == 0:
            yield from sse.ensure_thinking_block()
            yield sse.emit_thinking_delta(part)
        else:
            yield from sse.ensure_text_block()
            yield sse.emit_text_delta(part)
    yield from sse.close_all_blocks()
    yield sse.message_delta("end_turn", 20)
    yield sse.message_stop()
def _events_from_text_chunks(
    chunks: list[str],
    builder: SSEBuilder | None = None,
    *,
    enable_thinking: bool = True,
) -> list[str]:
    """Run chunks through ThinkTagParser and emit matching SSE chunks.

    When no builder is supplied, a fresh one is created and the full message
    envelope (start / close / delta / stop) is wrapped around the content.
    """
    owns_builder = builder is None
    sse = SSEBuilder("msg_smoke", "smoke-model") if owns_builder else builder
    out: list[str] = [sse.message_start()] if owns_builder else []
    parser = ThinkTagParser()
    for chunk in chunks:
        out.extend(_emit_parser_parts(sse, parser.feed(chunk), enable_thinking))
    tail = parser.flush()
    if tail is not None:
        out.extend(_emit_parser_parts(sse, [tail], enable_thinking))
    if owns_builder:
        out.extend(sse.close_all_blocks())
        out.append(sse.message_delta("end_turn", 20))
        out.append(sse.message_stop())
    return out
def _emit_parser_parts(
    builder: SSEBuilder,
    parts: Iterable,
    enable_thinking: bool,
) -> list[str]:
    """Translate parser parts into SSE chunks, optionally dropping thinking."""
    emitted: list[str] = []
    for part in parts:
        if part.type == ContentType.THINKING:
            if not enable_thinking:
                continue  # suppress reasoning content entirely
            emitted.extend(builder.ensure_thinking_block())
            emitted.append(builder.emit_thinking_delta(part.content))
        else:
            emitted.extend(builder.ensure_text_block())
            emitted.append(builder.emit_text_delta(part.content))
    return emitted
def _parse_builder_events(chunks: Iterable[str]):
    # Join raw SSE chunk strings and parse them back into structured events.
    return parse_sse_text("".join(chunks))
def _render_ctx() -> RenderCtx:
    """Build a RenderCtx with minimal markdown-ish formatters for rendering."""

    def identity(text: str) -> str:
        return text

    return RenderCtx(
        bold=lambda text: f"*{text}*",
        code_inline=lambda text: f"`{text}`",
        escape_code=identity,
        escape_text=identity,
        render_markdown=identity,
    )

51
smoke/test_tools_live.py Normal file
View file

@ -0,0 +1,51 @@
from __future__ import annotations
import pytest
from smoke.lib.config import SmokeConfig
from smoke.lib.http import collect_message_stream, message_payload
from smoke.lib.server import start_server
from smoke.lib.sse import assert_anthropic_stream_contract, has_tool_use
# Live tool-use smoke checks for the "tools" target.
pytestmark = [pytest.mark.live, pytest.mark.smoke_target("tools")]
def test_live_tool_use_when_configured_model_supports_tools(
    smoke_config: SmokeConfig,
) -> None:
    """A forced tool_choice against the first configured model yields tool_use."""
    models = smoke_config.provider_models()
    if not models:
        pytest.skip("no configured provider model available for tool-use smoke")
    chosen = models[0]

    echo_tool = {
        "name": "echo_smoke",
        "description": "Echo a test value.",
        "input_schema": {
            "type": "object",
            "properties": {"value": {"type": "string"}},
            "required": ["value"],
        },
    }
    payload = message_payload(
        "Use the echo_smoke tool once with value FCC_SMOKE_TOOL.",
        model="fcc-smoke-default",
        max_tokens=256,
        extra={
            "tools": [echo_tool],
            # Force the model to call the tool rather than answer in prose.
            "tool_choice": {"type": "tool", "name": "echo_smoke"},
        },
    )
    overrides = {"MODEL": chosen.full_model, "MESSAGING_PLATFORM": "none"}
    with start_server(smoke_config, env_overrides=overrides, name="tools") as server:
        events = collect_message_stream(server, payload, smoke_config)
        assert_anthropic_stream_contract(events)
        assert has_tool_use(events), "model did not emit a tool_use block"

52
smoke/test_voice_live.py Normal file
View file

@ -0,0 +1,52 @@
from __future__ import annotations
import math
import os
import wave
from pathlib import Path
import pytest
from messaging.transcription import transcribe_audio
from smoke.lib.config import SmokeConfig
# Voice transcription smoke checks for the "voice" target.
pytestmark = [pytest.mark.live, pytest.mark.smoke_target("voice")]
def test_voice_transcription_backend_when_explicitly_enabled(
    smoke_config: SmokeConfig, tmp_path: Path
) -> None:
    """Transcribe a generated tone to prove the whisper backend loads and runs."""
    if not smoke_config.settings.voice_note_enabled:
        pytest.skip("VOICE_NOTE_ENABLED is false")
    if os.getenv("FCC_SMOKE_RUN_VOICE") != "1":
        pytest.skip("set FCC_SMOKE_RUN_VOICE=1 to run transcription smoke")

    wav_path = tmp_path / "smoke-tone.wav"
    _write_tone_wav(wav_path)
    try:
        text = transcribe_audio(
            wav_path,
            "audio/wav",
            whisper_model=smoke_config.settings.whisper_model,
            whisper_device=smoke_config.settings.whisper_device,
        )
    except ImportError as exc:
        # Whisper backend not installed locally: skip rather than fail.
        pytest.skip(str(exc))
    assert isinstance(text, str)
    assert text.strip()
def _write_tone_wav(path: Path) -> None:
sample_rate = 16000
duration_s = 0.25
amplitude = 8000
frames = bytearray()
for i in range(int(sample_rate * duration_s)):
sample = int(amplitude * math.sin(2 * math.pi * 440 * i / sample_rate))
frames.extend(sample.to_bytes(2, byteorder="little", signed=True))
with wave.open(str(path), "wb") as wav:
wav.setnchannels(1)
wav.setsampwidth(2)
wav.setframerate(sample_rate)
wav.writeframes(bytes(frames))

View file

@ -79,6 +79,27 @@ def test_convert_tools():
assert result[1]["function"]["description"] == "" # Check default empty string
@pytest.mark.parametrize(
    "tool_choice,expected",
    [
        (
            {"type": "tool", "name": "echo_smoke"},
            {"type": "function", "function": {"name": "echo_smoke"}},
        ),
        # A "tool" choice without a name cannot be mapped; it passes through.
        ({"type": "tool"}, {"type": "tool"}),
        ({"type": "any"}, "required"),
        ({"type": "auto"}, "auto"),
        ({"type": "none"}, "none"),
        ({"type": "required"}, "required"),
        (
            {"type": "function", "function": {"name": "already_openai"}},
            {"type": "function", "function": {"name": "already_openai"}},
        ),
        # Non-dict values (already-OpenAI strings, None) are returned unchanged.
        ("auto", "auto"),
        (None, None),
    ],
)
def test_convert_tool_choice(tool_choice, expected):
    """convert_tool_choice maps Anthropic tool_choice shapes to OpenAI ones,
    passing through anything it cannot translate."""
    result = AnthropicToOpenAIConverter.convert_tool_choice(tool_choice)
    assert result == expected
# --- Message Conversion Tests: User ---