Refactor smoke testing framework and enhance provider configurations

- Updated DEFAULT_TARGETS in config.py to include new targets: clients, llamacpp, and lmstudio, while removing contract and optimizations.
- Introduced TARGET_ALIASES for better target management.
- Added TARGET_REQUIRED_ENV to specify environment variables needed for each target.
- Enhanced SmokeOutcome in report.py to include classification of outcomes for better reporting.
- Implemented classify_outcome function to categorize smoke test results.
- Added new test for stop endpoint in test_api_live.py to ensure proper error handling.
- Updated test_auth_live.py to enforce auth token requirements and utilize environment files.
- Changed target from vscode to clients in test_client_shapes_live.py.
- Removed obsolete test_feature_manifest.py and test_stream_contracts.py files.
- Added new skip helpers in skips.py to manage upstream unavailability scenarios.
- Created new tests for local provider endpoints in test_local_provider_endpoints_live.py.
- Added comprehensive feature inventory tests in tests/contracts/test_feature_manifest.py.
- Implemented stream contract tests in tests/contracts/test_stream_contracts.py.
This commit is contained in:
Alishahryar1 2026-04-24 17:16:06 -07:00
parent 15415d5db3
commit 751694a5da
16 changed files with 739 additions and 130 deletions

View file

@@ -0,0 +1,88 @@
from __future__ import annotations
import re
from pathlib import Path
from messaging.platforms.factory import create_messaging_platform
from providers.base import BaseProvider
from providers.deepseek import DeepSeekProvider
from providers.llamacpp import LlamaCppProvider
from providers.lmstudio import LMStudioProvider
from providers.nvidia_nim import NvidiaNimProvider
from providers.open_router import OpenRouterProvider
from smoke.features import FEATURE_INVENTORY, README_FEATURES, feature_ids
# Allowed values for a feature's `coverage` field (checked below): how the
# feature is verified — unit tests, live smoke tests, or both.
VALID_COVERAGE = {"pytest", "live_smoke", "both"}
# Allowed values for a feature's `source` field: where the feature is
# advertised (README vs. the public code surface).
VALID_SOURCE = {"readme", "public_surface"}
def test_every_readme_feature_has_inventory_entry() -> None:
    """README feature list and the readme-sourced inventory must agree exactly."""
    readme_ids = set(README_FEATURES)
    inventory_ids = feature_ids(source="readme")
    missing = sorted(readme_ids - inventory_ids)
    extra = sorted(inventory_ids - readme_ids)
    assert not missing, f"README features missing inventory entries: {missing}"
    assert not extra, f"README inventory entries not in README_FEATURES: {extra}"
def test_feature_inventory_is_unique_and_decision_complete() -> None:
    """Every inventory entry is unique and fully specifies its coverage decision."""
    seen_ids = [entry.feature_id for entry in FEATURE_INVENTORY]
    assert len(set(seen_ids)) == len(seen_ids)
    assert "claude_pick" not in seen_ids
    for entry in FEATURE_INVENTORY:
        # Each entry must use only the sanctioned source/coverage values and
        # carry a non-blank title and skip policy.
        assert entry.source in VALID_SOURCE, entry
        assert entry.coverage in VALID_COVERAGE, entry
        assert entry.title.strip(), entry
        assert entry.skip_policy.strip(), entry
        # Claimed coverage must be backed by concrete test owners.
        if entry.has_pytest_coverage:
            assert entry.pytest_tests, entry
        if entry.has_smoke_coverage:
            assert entry.smoke_tests, entry
            assert entry.smoke_targets, entry
def test_feature_inventory_test_owners_exist() -> None:
    """Every test owner named in the inventory resolves to a real test function."""
    repo_root = Path(__file__).resolve().parents[2]
    known_pytest = _collect_test_names(repo_root / "tests")
    known_smoke = _collect_test_names(repo_root / "smoke")
    for entry in FEATURE_INVENTORY:
        for owner in entry.pytest_tests:
            _assert_owner_exists(owner, repo_root, known_pytest)
        for owner in entry.smoke_tests:
            # Smoke owners may live in either the smoke or pytest trees.
            assert owner in known_smoke or owner in known_pytest, (entry, owner)
def test_provider_and_platform_registries_include_advertised_builtins() -> None:
    """Advertised built-in providers subclass BaseProvider; unknown platforms map to None."""
    builtin_providers = {
        "nvidia_nim": NvidiaNimProvider,
        "open_router": OpenRouterProvider,
        "deepseek": DeepSeekProvider,
        "lmstudio": LMStudioProvider,
        "llamacpp": LlamaCppProvider,
    }
    for cls in builtin_providers.values():
        assert issubclass(cls, BaseProvider)
    # An unrecognized platform name must not raise — it yields None.
    assert create_messaging_platform("not-a-platform") is None
def _collect_test_names(root: Path) -> set[str]:
names: set[str] = set()
for path in root.rglob("test_*.py"):
text = path.read_text(encoding="utf-8")
names.update(re.findall(r"^\s*(?:async\s+)?def (test_[^(]+)", text, re.M))
return names
def _assert_owner_exists(owner: str, repo_root: Path, test_names: set[str]) -> None:
if owner.startswith("test_"):
assert owner in test_names, owner
return
path_part, _, node_name = owner.partition("::")
path = repo_root / path_part
assert path.exists(), owner
if node_name:
assert node_name in test_names, owner

View file

@@ -0,0 +1,198 @@
from __future__ import annotations
from collections.abc import Iterable
from messaging.event_parser import parse_cli_event
from messaging.transcript import RenderCtx, TranscriptBuffer
from providers.common import (
ContentType,
HeuristicToolParser,
SSEBuilder,
ThinkTagParser,
)
from smoke.lib.sse import (
assert_anthropic_stream_contract,
event_names,
has_tool_use,
parse_sse_text,
text_content,
thinking_content,
)
def test_interleaved_thinking_text_blocks_are_valid() -> None:
    """Alternating thinking/text blocks form a valid Anthropic stream."""
    parts = ("first thought", "first answer", "second thought", "final answer")
    events = _parse_builder_events(_interleaved_thinking_text_events(parts))
    assert_anthropic_stream_contract(events)
    # Four parts → four distinct content blocks.
    assert event_names(events).count("content_block_start") == 4
    assert thinking_content(events) == "first thoughtsecond thought"
    assert text_content(events) == "first answerfinal answer"
def test_split_think_tags_preserve_text_and_thinking() -> None:
    """A <think> tag split across chunk boundaries still routes content correctly."""
    split_chunks = ["before <thi", "nk>hidden", "</think> after"]
    events = _parse_builder_events(_events_from_text_chunks(split_chunks))
    assert_anthropic_stream_contract(events)
    assert thinking_content(events) == "hidden"
    assert text_content(events) == "before after"
def test_mixed_reasoning_content_and_think_tags_keep_order() -> None:
    """Reasoning-field deltas followed by <think>-tagged text keep their order."""
    sse = SSEBuilder("msg_contract", "contract-model")
    stream: list[str] = [sse.message_start()]
    # Emit thinking via the dedicated reasoning field first...
    stream += sse.ensure_thinking_block()
    stream.append(sse.emit_thinking_delta("reasoning field"))
    # ...then mix in text that carries an inline <think> tag.
    stream += _events_from_text_chunks([" visible <think>tagged</think> done"], sse)
    stream += sse.close_all_blocks()
    stream.append(sse.message_delta("end_turn", 10))
    stream.append(sse.message_stop())
    events = parse_sse_text("".join(stream))
    assert_anthropic_stream_contract(events)
    assert thinking_content(events) == "reasoning fieldtagged"
    assert text_content(events) == " visible done"
def test_thinking_tool_text_and_transcript_order_contract() -> None:
    """Thinking → tool_use → text stream is contract-valid and renders in order.

    Builds a stream with a thinking block, a tool_use block fed by an
    input_json_delta, and a closing text block; asserts the Anthropic
    stream contract holds and that the rendered transcript preserves the
    thinking → tool call → text ordering.
    """
    builder = SSEBuilder("msg_contract", "contract-model")
    chunks = [builder.message_start()]
    chunks.extend(builder.ensure_thinking_block())
    chunks.append(builder.emit_thinking_delta("inspect first"))
    # Close the thinking block before starting the tool_use block.
    chunks.extend(builder.close_content_blocks())
    tool_block_index = builder.blocks.allocate_index()
    chunks.append(
        builder.content_block_start(
            tool_block_index, "tool_use", id="toolu_1", name="Read"
        )
    )
    chunks.append(
        builder.content_block_delta(
            tool_block_index, "input_json_delta", '{"file":"README.md"}'
        )
    )
    chunks.append(builder.content_block_stop(tool_block_index))
    chunks.extend(builder.ensure_text_block())
    chunks.append(builder.emit_text_delta("done"))
    chunks.extend(builder.close_all_blocks())
    chunks.append(builder.message_delta("end_turn", 20))
    chunks.append(builder.message_stop())
    events = parse_sse_text("".join(chunks))
    assert_anthropic_stream_contract(events)
    assert has_tool_use(events)
    # Replay the stream through the transcript pipeline and check ordering.
    transcript = TranscriptBuffer()
    for event in events:
        for parsed in parse_cli_event(event.data):
            transcript.apply(parsed)
    rendered = transcript.render(_render_ctx(), limit_chars=3900, status=None)
    # str.find ordering: thinking text precedes the tool call marker,
    # which precedes the final answer text.
    assert (
        rendered.find("inspect first")
        < rendered.find("Tool call:")
        < rendered.find("done")
    )
def test_enable_thinking_false_suppresses_reasoning_only() -> None:
    """With thinking disabled, <think> content is dropped but text survives."""
    stream = _events_from_text_chunks(
        ["hello <think>secret</think> world"], enable_thinking=False
    )
    events = _parse_builder_events(stream)
    assert_anthropic_stream_contract(events)
    assert "secret" not in thinking_content(events)
    assert text_content(events) == "hello world"
def test_task_tool_arguments_force_foreground_execution() -> None:
    """Task tool calls parsed from heuristic markup are forced to foreground."""
    parser = HeuristicToolParser()
    filtered, detected = parser.feed(
        "● <function=Task><parameter=description>Inspect</parameter>"
        "<parameter=run_in_background>true</parameter> trailing"
    )
    detected.extend(parser.flush())
    # Text outside the tool markup must survive filtering.
    assert "trailing" in filtered
    task = detected[0]
    assert task["name"] == "Task"
    if isinstance(task.get("input"), dict):
        task["input"]["run_in_background"] = False
        # NOTE(review): this assertion checks the value assigned on the line
        # above, so it can never fail — it does not verify that anything in
        # the code under test forces foreground execution. Confirm the
        # intended check (e.g. that the parser captured run_in_background
        # before the override) and tighten this test.
        assert task["input"]["run_in_background"] is False
def _interleaved_thinking_text_events(
    parts: tuple[str, str, str, str],
) -> Iterable[str]:
    """Yield SSE chunks alternating thinking/text blocks for the four *parts*.

    Even-indexed parts become thinking deltas, odd-indexed parts text
    deltas, framed by message_start/close/delta/stop.
    """
    sse = SSEBuilder("msg_contract", "contract-model")
    yield sse.message_start()
    for position, content in enumerate(parts):
        if position % 2 == 0:
            yield from sse.ensure_thinking_block()
            yield sse.emit_thinking_delta(content)
        else:
            yield from sse.ensure_text_block()
            yield sse.emit_text_delta(content)
    yield from sse.close_all_blocks()
    yield sse.message_delta("end_turn", 20)
    yield sse.message_stop()
def _events_from_text_chunks(
    chunks: list[str],
    builder: SSEBuilder | None = None,
    *,
    enable_thinking: bool = True,
) -> list[str]:
    """Feed raw text *chunks* through a ThinkTagParser and emit SSE chunks.

    When *builder* is None a fresh SSEBuilder is created and the full
    message framing (start/close/delta/stop) is included; otherwise only
    the content deltas for the caller's builder are returned.
    """
    owns_builder = builder is None
    sse = SSEBuilder("msg_contract", "contract-model") if owns_builder else builder
    events: list[str] = [sse.message_start()] if owns_builder else []
    parser = ThinkTagParser()
    for chunk in chunks:
        events.extend(_emit_parser_parts(sse, parser.feed(chunk), enable_thinking))
    # flush() may return one final buffered part.
    tail = parser.flush()
    if tail is not None:
        events.extend(_emit_parser_parts(sse, [tail], enable_thinking))
    if owns_builder:
        events.extend(sse.close_all_blocks())
        events.append(sse.message_delta("end_turn", 20))
        events.append(sse.message_stop())
    return events
def _emit_parser_parts(
    builder: SSEBuilder,
    parts: Iterable,
    enable_thinking: bool,
) -> list[str]:
    """Translate parsed content parts into SSE chunks via *builder*.

    Thinking parts are emitted only when *enable_thinking* is true;
    everything else is emitted as text.
    """
    emitted: list[str] = []
    for part in parts:
        if part.type == ContentType.THINKING:
            # Dropped entirely when thinking is disabled.
            if enable_thinking:
                emitted.extend(builder.ensure_thinking_block())
                emitted.append(builder.emit_thinking_delta(part.content))
        else:
            emitted.extend(builder.ensure_text_block())
            emitted.append(builder.emit_text_delta(part.content))
    return emitted
def _parse_builder_events(chunks: Iterable[str]):
    """Join raw builder chunks and parse them into SSE events."""
    raw = "".join(chunks)
    return parse_sse_text(raw)
def _render_ctx() -> RenderCtx:
    """Build a RenderCtx with minimal, test-friendly formatting callbacks."""
    def passthrough(text: str) -> str:
        # Escaping/markdown are identity in tests so assertions see raw text.
        return text

    return RenderCtx(
        bold=lambda text: f"*{text}*",
        code_inline=lambda text: f"`{text}`",
        escape_code=passthrough,
        escape_text=passthrough,
        render_markdown=passthrough,
    )