diff --git a/skyvern/forge/agent_functions.py b/skyvern/forge/agent_functions.py index 4b1a1b80c..bd622b915 100644 --- a/skyvern/forge/agent_functions.py +++ b/skyvern/forge/agent_functions.py @@ -500,6 +500,10 @@ class AgentFunction: async def post_cache_step_execution(self, task: Task, step: Step) -> None: return + async def should_shadow_extraction_cache_hit(self, task: Task) -> bool: + """Cloud-overridable sample gate for extract-information shadow mode. OSS no-op.""" + return False + def build_workflow_schedule_id(self, workflow_schedule_id: str) -> str | None: """Return the backend-specific schedule id used by the execution engine. diff --git a/skyvern/forge/sdk/cache/extraction_shadow.py b/skyvern/forge/sdk/cache/extraction_shadow.py new file mode 100644 index 000000000..4c49dde2e --- /dev/null +++ b/skyvern/forge/sdk/cache/extraction_shadow.py @@ -0,0 +1,440 @@ +"""Shadow-mode FP-rate sampling for the extract-information cache. + +On sampled cache hits, fire the LLM call in the background and log a +comparison event so a log-based metric can derive the cache's false-positive +rate. ``diff_summary`` records only paths, never values (log safety). LLM +errors are swallowed — shadow is best-effort. +""" + +from __future__ import annotations + +import asyncio +import json +import re +import time +from collections import Counter +from dataclasses import dataclass, field +from typing import Any, Awaitable, Callable, Protocol + +import structlog + +LOG = structlog.get_logger() + +# Single consolidated event. Filter on `status:ok` for the FP metric, `status:error` for reliability. +_SHADOW_EVENT = "extract_information.shadow_comparison" + +_MODE_STRICT = "strict" +_MODE_SEMANTIC = "semantic" + +# Strips the `[0]`/`[12]` index segments from a runtime diff path so it can be +# matched against schema paths that use `[*]` wildcards for list elements. +_LIST_INDEX_RE = re.compile(r"\[\d+\]") + +_LlmCall = Callable[[], Awaitable[Any]] + + +def _elapsed_ms(start_seconds: float) -> int: + """Elapsed ms since ``start_seconds`` (a ``time.monotonic()`` reading).""" + return int((time.monotonic() - start_seconds) * 1000) + + +class _LoggerLike(Protocol): + def debug(self, event: str, **kwargs: Any) -> None: ... + + def info(self, event: str, **kwargs: Any) -> None: ... + + def warning(self, event: str, **kwargs: Any) -> None: ... + + +@dataclass +class ComparisonResult: + match: bool + mode: str + diff_summary: set[str] = field(default_factory=set) + + +def _resolve_ref(ref: str, root: Any) -> Any | None: + """Resolve a local JSON Schema ``$ref`` (``#/$defs/Foo``) against ``root``. + + Returns ``None`` for anything non-local or malformed; the caller then just + skips that branch rather than raising. External refs are never followed. + """ + if not isinstance(ref, str) or not ref.startswith("#/"): + return None + node: Any = root + for segment in ref[2:].split("/"): + if not isinstance(node, dict) or segment not in node: + return None + node = node[segment] + return node + + +def _collect_unique_item_paths( + schema: Any, + prefix: str = "root", + *, + root: Any | None = None, + seen_refs: frozenset[str] | None = None, +) -> set[str]: + """Return dotted paths for arrays in ``schema`` declared ``uniqueItems``. + + Paths use ``_diff_paths``'s convention: ``"root"`` at the top, bare field + names for top-level properties, dotted names for nested ones, and ``[*]`` + for array-item recursion so nested unique arrays don't pollute their + parent's path (``groups`` vs ``groups[*]``). 
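+
+    Illustrative: ``{"properties": {"docs": {"type": "array", "uniqueItems":
+    true}}}`` yields ``{"docs"}``, and a ``uniqueItems`` array at the schema
+    root yields ``{"root"}``.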
+
+    Recurses into ``allOf``/``anyOf``/``oneOf`` combinators and resolves local
+    ``$ref`` pointers against ``$defs`` — Pydantic's ``model_json_schema()``
+    uses both heavily for nested models and ``Field(description=...)`` fields.
+    Missing either would make semantic mode a no-op for most real schemas.
+    ``seen_refs`` guards against circular references.
+    """
+    paths: set[str] = set()
+    if not isinstance(schema, dict):
+        return paths
+
+    if root is None:
+        root = schema
+    if seen_refs is None:
+        seen_refs = frozenset()
+
+    ref = schema.get("$ref")
+    if isinstance(ref, str):
+        if ref in seen_refs:
+            return paths
+        resolved = _resolve_ref(ref, root)
+        if resolved is not None:
+            paths.update(_collect_unique_item_paths(resolved, prefix, root=root, seen_refs=seen_refs | {ref}))
+
+    if schema.get("uniqueItems") is True:
+        paths.add(prefix)
+
+    properties = schema.get("properties")
+    if isinstance(properties, dict):
+        for name, sub in properties.items():
+            child_prefix = f"{prefix}.{name}" if prefix != "root" else name
+            paths.update(_collect_unique_item_paths(sub, child_prefix, root=root, seen_refs=seen_refs))
+
+    items = schema.get("items")
+    if isinstance(items, dict):
+        paths.update(_collect_unique_item_paths(items, f"{prefix}[*]", root=root, seen_refs=seen_refs))
+
+    for combinator in ("allOf", "anyOf", "oneOf"):
+        branches = schema.get(combinator)
+        if isinstance(branches, list):
+            for branch in branches:
+                paths.update(_collect_unique_item_paths(branch, prefix, root=root, seen_refs=seen_refs))
+
+    return paths
+
+
+def _json_key(value: Any) -> str:
+    return json.dumps(value, sort_keys=True, default=str)
+
+
+def _canonical_form(value: Any, *, unique_item_paths: set[str], prefix: str) -> Any:
+    """Hashable canonical form for set-equality inside ``uniqueItems`` comparisons.
+
+    Mirrors the semantic rules ``_diff_paths`` applies elsewhere so set-equality
+    doesn't contradict field-level equality:
+
+    - Numerically equal ``int`` and ``float`` collapse to the same ``("n", ...)``
+      key: integer-valued floats convert to the int form, matching the numeric
+      cross-type exception in ``_diff_paths``. ``bool`` stays distinct from
+      numerics.
+    - Nested ``uniqueItems`` arrays inside an element are sorted so two
+      elements that differ only by inner reorder produce the same form.
+    - All other lists stay order-sensitive.
+
+    ``prefix`` is the schema path of ``value`` using ``[*]`` wildcards (no
+    runtime indices), so membership against ``unique_item_paths`` is direct.
+    """
+    if isinstance(value, bool):
+        return ("b", value)
+    if isinstance(value, int):
+        # Keep exact int precision — float(large_int) would collapse distinct
+        # ids above 2^53 to the same canonical key.
+        return ("n", value)
+    if isinstance(value, float):
+        # Collapse integer-valued floats into the int form so 1 == 1.0 under
+        # semantic equality (matches _diff_paths's int/float numeric exception).
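+        # Illustrative: 1 and 1.0 both canonicalize to ("n", 1), 2.5 stays
+        # ("n", 2.5), and True stays ("b", True), never ("n", 1).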
+ if value.is_integer(): + return ("n", int(value)) + return ("n", value) + if value is None: + return ("z",) + if isinstance(value, str): + return ("s", value) + if isinstance(value, dict): + entries = [] + for key in sorted(value.keys()): + child_prefix = f"{prefix}.{key}" if prefix != "root" else key + entries.append((key, _canonical_form(value[key], unique_item_paths=unique_item_paths, prefix=child_prefix))) + return ("d", tuple(entries)) + if isinstance(value, list): + child_prefix = f"{prefix}[*]" + child_forms = tuple( + _canonical_form(item, unique_item_paths=unique_item_paths, prefix=child_prefix) for item in value + ) + if prefix in unique_item_paths: + # Sort by repr: canonical forms mix tuple shapes, and repr gives + # a stable total order without requiring all elements to be + # mutually comparable. + return ("lu", tuple(sorted(Counter(child_forms).items(), key=lambda kv: repr(kv[0])))) + return ("l", child_forms) + raise TypeError(f"Non-JSON-serializable value in extraction result: {type(value)}") + + +def _diff_paths( + cached: Any, + fresh: Any, + *, + unique_item_paths: set[str], + prefix: str = "root", +) -> set[str]: + """Dotted paths where ``cached`` and ``fresh`` differ; ``uniqueItems`` lists compared as sets.""" + diffs: set[str] = set() + + if type(cached) is not type(fresh): + # int/float cross-compare is the one narrow exception. bool is a subclass of int, + # so exclude it explicitly — True vs 1 must register as a diff. + cached_numeric = isinstance(cached, (int, float)) and not isinstance(cached, bool) + fresh_numeric = isinstance(fresh, (int, float)) and not isinstance(fresh, bool) + if not (cached_numeric and fresh_numeric): + diffs.add(prefix) + return diffs + + if isinstance(cached, dict) and isinstance(fresh, dict): + keys = set(cached.keys()) | set(fresh.keys()) + for key in keys: + child_path = f"{prefix}.{key}" if prefix != "root" else key + if key not in cached or key not in fresh: + diffs.add(child_path) + continue + diffs.update( + _diff_paths( + cached[key], + fresh[key], + unique_item_paths=unique_item_paths, + prefix=child_path, + ) + ) + return diffs + + if isinstance(cached, list) and isinstance(fresh, list): + # Normalize runtime indices (`groups[0]`) to schema wildcards (`groups[*]`) for lookup. + normalized = _LIST_INDEX_RE.sub("[*]", prefix) + if normalized in unique_item_paths: + # Counter preserves multiplicity — ['a','a'] vs ['a'] is a mismatch. + # _canonical_form recursively applies int/float equivalence and + # nested-uniqueItems reorder tolerance so set-equality here stays + # consistent with _diff_paths's field-level rules. + child_prefix = f"{normalized}[*]" + cached_forms = Counter( + _canonical_form(item, unique_item_paths=unique_item_paths, prefix=child_prefix) for item in cached + ) + fresh_forms = Counter( + _canonical_form(item, unique_item_paths=unique_item_paths, prefix=child_prefix) for item in fresh + ) + if cached_forms != fresh_forms: + diffs.add(prefix) + return diffs + if len(cached) != len(fresh): + diffs.add(prefix) + return diffs + for idx, (a, b) in enumerate(zip(cached, fresh)): + diffs.update(_diff_paths(a, b, unique_item_paths=unique_item_paths, prefix=f"{prefix}[{idx}]")) + return diffs + + if cached != fresh: + diffs.add(prefix) + return diffs + + +def compare_results(cached: Any, fresh: Any, *, schema: Any | None) -> ComparisonResult: + """Compare cached vs fresh; ``uniqueItems`` arrays use set-equality (``semantic`` mode). + + Note: never short-circuit on ``cached == fresh``. 
Python treats ``True == 1`` + and ``False == 0`` as equal, which would mask real bool/int diffs. + """ + unique_item_paths = _collect_unique_item_paths(schema) if schema else set() + mode = _MODE_SEMANTIC if unique_item_paths else _MODE_STRICT + + diffs = _diff_paths(cached, fresh, unique_item_paths=unique_item_paths) + return ComparisonResult(match=not diffs, mode=mode, diff_summary=diffs) + + +async def run_shadow_comparison( + *, + cache_key: str, + workflow_run_id: str, + cached_value: Any, + cached_age_seconds: float, + llm_call: _LlmCall, + schema: Any | None, + logger: _LoggerLike | None = None, +) -> None: + """Run the shadow LLM call, compare, emit one log event. Never raises. + + Emits a single ``extract_information.shadow_comparison`` event with a + ``status`` field (``ok`` or ``error``). Only exception *class names* are + logged — messages can contain raw model output and would leak extracted + content into observability data. + """ + log = logger or LOG + started_at = time.monotonic() + try: + fresh = await llm_call() + except Exception as exc: # noqa: BLE001 — shadow is best-effort + log.warning( + _SHADOW_EVENT, + status="error", + error_stage="llm_call", + error_type=type(exc).__name__, + cache_key=cache_key, + workflow_run_id=workflow_run_id, + cached_age_seconds=cached_age_seconds, + shadow_duration_ms=_elapsed_ms(started_at), + ) + return + + try: + comparison = compare_results(cached_value, fresh, schema=schema) + except Exception as exc: # noqa: BLE001 — defensive; compare_results is pure + log.warning( + _SHADOW_EVENT, + status="error", + error_stage="compare", + error_type=type(exc).__name__, + cache_key=cache_key, + workflow_run_id=workflow_run_id, + cached_age_seconds=cached_age_seconds, + shadow_duration_ms=_elapsed_ms(started_at), + ) + return + + log.info( + _SHADOW_EVENT, + status="ok", + cache_key=cache_key, + workflow_run_id=workflow_run_id, + match=comparison.match, + mode=comparison.mode, + diff_summary=sorted(_LIST_INDEX_RE.sub("[*]", p) for p in comparison.diff_summary), + cached_age_seconds=cached_age_seconds, + shadow_duration_ms=_elapsed_ms(started_at), + ) + + +# Strong refs so asyncio doesn't GC in-flight shadow tasks (create_task only holds a weak ref). +_PENDING_SHADOW_TASKS: set[asyncio.Task[None]] = set() +# Cap chosen so shadow calls never sustain more than ~10% of a typical provider's +# burst quota at the expected 1% sample rate. Raise if sample rate climbs. +_MAX_PENDING_SHADOWS = 50 + + +def _prune_pending() -> None: + """Drop already-finished tasks before the cap check. + + ``add_done_callback`` fires on the task's own event loop, so tasks created on + a loop that later gets replaced (worker recycle, test-session boundaries) + would never ``discard`` themselves and the set would leak. 
+ """ + _PENDING_SHADOW_TASKS.difference_update({t for t in _PENDING_SHADOW_TASKS if t.done()}) + + +def _track(task: asyncio.Task[None]) -> asyncio.Task[None]: + _PENDING_SHADOW_TASKS.add(task) + task.add_done_callback(_PENDING_SHADOW_TASKS.discard) + return task + + +def schedule_shadow_comparison( + *, + cache_key: str, + workflow_run_id: str, + cached_value: Any, + cached_age_seconds: float, + llm_call: _LlmCall, + schema: Any | None, + logger: _LoggerLike | None = None, +) -> asyncio.Task[None] | None: + """Fire-and-forget ``run_shadow_comparison``; returns the task so tests can await.""" + _prune_pending() + if len(_PENDING_SHADOW_TASKS) >= _MAX_PENDING_SHADOWS: + (logger or LOG).warning("shadow_task_cap_reached", pending=len(_PENDING_SHADOW_TASKS)) + return None + + return _track( + asyncio.create_task( + run_shadow_comparison( + cache_key=cache_key, + workflow_run_id=workflow_run_id, + cached_value=cached_value, + cached_age_seconds=cached_age_seconds, + llm_call=llm_call, + schema=schema, + logger=logger, + ) + ) + ) + + +def schedule_shadow_check( + *, + gate: Callable[[], Awaitable[bool]], + cache_key: str, + workflow_run_id: str, + cached_value: Any, + cached_age_seconds: float, + llm_call: _LlmCall, + schema: Any | None, + logger: _LoggerLike | None = None, +) -> asyncio.Task[None] | None: + """Fire-and-forget: await ``gate`` in the background, run the comparison if it returns True. + + Keeps the PostHog (or any other) feature-flag lookup off the cache-hit hot + path. The caller returns immediately; the background task does both the + gate evaluation and the shadow LLM call. + """ + _prune_pending() + if len(_PENDING_SHADOW_TASKS) >= _MAX_PENDING_SHADOWS: + (logger or LOG).warning("shadow_task_cap_reached", pending=len(_PENDING_SHADOW_TASKS)) + return None + + async def _runner() -> None: + log = logger or LOG + try: + enabled = await gate() + except Exception as exc: # noqa: BLE001 — gate errors must not propagate + log.warning( + _SHADOW_EVENT, + status="error", + error_stage="gate", + error_type=type(exc).__name__, + cache_key=cache_key, + workflow_run_id=workflow_run_id, + cached_age_seconds=cached_age_seconds, + ) + return + if not enabled: + # Info, not debug — service default level is INFO, so debug events are + # dropped in production. We need the skipped count as the denominator + # when verifying the PostHog sampling rate from logs. 
+ log.info( + _SHADOW_EVENT, + status="skipped", + cache_key=cache_key, + workflow_run_id=workflow_run_id, + ) + return + await run_shadow_comparison( + cache_key=cache_key, + workflow_run_id=workflow_run_id, + cached_value=cached_value, + cached_age_seconds=cached_age_seconds, + llm_call=llm_call, + schema=schema, + logger=logger, + ) + + return _track(asyncio.create_task(_runner())) diff --git a/skyvern/webeye/actions/handler.py b/skyvern/webeye/actions/handler.py index cddaee0cc..bec0d8419 100644 --- a/skyvern/webeye/actions/handler.py +++ b/skyvern/webeye/actions/handler.py @@ -67,7 +67,7 @@ from skyvern.forge.sdk.api.files import ( from skyvern.forge.sdk.api.llm.api_handler_factory import LLMAPIHandlerFactory, LLMCallerManager from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError from skyvern.forge.sdk.api.llm.schema_validator import validate_and_fill_extraction_result -from skyvern.forge.sdk.cache import extraction_cache +from skyvern.forge.sdk.cache import extraction_cache, extraction_shadow from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.core.skyvern_context import current as skyvern_current from skyvern.forge.sdk.core.skyvern_context import ensure_context @@ -4328,6 +4328,59 @@ async def extract_information_for_navigation_goal( fallback_reason=None, cache_path="agent", ) + # Fire-and-forget shadow sampling on sampled hits. Flag lookup happens + # inside the background task so the cache-hit return is not blocked + # by the flag provider (e.g. PostHog latency on the first hit per run). + if cache_key is not None and task.workflow_run_id is not None: + shadow_llm_api_handler = LLMAPIHandlerFactory.get_override_llm_api_handler( + llm_key_override, default=app.EXTRACTION_LLM_API_HANDLER + ) + shadow_schema = task.extracted_information_schema + # Snapshot screenshots at schedule time — scraped_page is mutable + # and may be refreshed before the background task runs. + shadow_screenshots = list(scraped_page.screenshots) + + async def _shadow_gate() -> bool: + # Captures `task` by reference — safe because the cloud override + # only reads immutable identifiers (workflow_run_id, organization_id, + # workflow_permanent_id, task_id) set at construction. + return await app.AGENT_FUNCTION.should_shadow_extraction_cache_hit(task) + + async def _shadow_llm_call() -> Any: + fresh = await shadow_llm_api_handler( + prompt=extract_information_prompt, + # step=None suppresses both update_step (token/cost accounting) + # and artifact persistence in LLMAPIHandlerFactory. Shadow calls + # are an observability side-channel — the user-visible request + # was served from cache, so they must not inflate step usage, + # billing, or artifact counts. + step=None, + screenshots=shadow_screenshots, + # Use the same prompt_name as the miss path so prompt-level + # LLM tuning (e.g. thinking-budget overrides) matches — otherwise + # cached (tuned) vs fresh (untuned) would diverge for config + # reasons unrelated to cache correctness. + prompt_name="extract-information", + force_dict=False, + ) + # Apply the same post-processing the miss path applies so the + # comparison is apples-to-apples against the cached value. 
+ if shadow_schema: + fresh = validate_and_fill_extraction_result( + extraction_result=fresh, + schema=shadow_schema, + ) + return fresh + + extraction_shadow.schedule_shadow_check( + gate=_shadow_gate, + cache_key=cache_key, + workflow_run_id=task.workflow_run_id, + cached_value=lookup_result.value, + cached_age_seconds=lookup_result.age_seconds if lookup_result.age_seconds is not None else -1.0, + llm_call=_shadow_llm_call, + schema=shadow_schema, + ) return ScrapeResult(scraped_data=lookup_result.value) if lookup_result is not None: LOG.info( diff --git a/tests/unit/test_extraction_shadow.py b/tests/unit/test_extraction_shadow.py new file mode 100644 index 000000000..42daeb0b0 --- /dev/null +++ b/tests/unit/test_extraction_shadow.py @@ -0,0 +1,882 @@ +"""Unit tests for the extract-information shadow-mode correctness verification.""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +import pytest + +from skyvern.forge.sdk.cache import extraction_shadow + +# --------------------------------------------------------------------------- +# compare_results — strict equality +# --------------------------------------------------------------------------- + + +def test_compare_strict_identical_dicts_match() -> None: + """Two dicts with identical fields should match under strict comparison.""" + cached = {"title": "Invoice #123", "total": 42.5} + fresh = {"title": "Invoice #123", "total": 42.5} + result = extraction_shadow.compare_results(cached, fresh, schema=None) + assert result.match is True + assert result.mode == "strict" + assert result.diff_summary == set() + + +def test_compare_strict_field_value_mismatch_reports_diff() -> None: + cached = {"title": "Invoice #123", "total": 42.5} + fresh = {"title": "Invoice #123", "total": 42.6} + result = extraction_shadow.compare_results(cached, fresh, schema=None) + assert result.match is False + assert result.mode == "strict" + # diff_summary should name the mismatching path. + assert "total" in result.diff_summary + # And must NOT leak the raw mismatching values — we care about which path + # differed, not the exact content (diff_summary is going to a log line). + assert "42.5" not in str(result.diff_summary) + assert "42.6" not in str(result.diff_summary) + + +def test_compare_strict_missing_field_reports_diff() -> None: + cached = {"title": "x", "total": 1} + fresh = {"title": "x"} + result = extraction_shadow.compare_results(cached, fresh, schema=None) + assert result.match is False + assert "total" in result.diff_summary + + +def test_compare_strict_extra_field_reports_diff() -> None: + cached = {"title": "x"} + fresh = {"title": "x", "extra": True} + result = extraction_shadow.compare_results(cached, fresh, schema=None) + assert result.match is False + assert "extra" in result.diff_summary + + +def test_compare_strict_nested_dict_mismatch_reports_path() -> None: + cached = {"meta": {"page": 1, "count": 10}} + fresh = {"meta": {"page": 1, "count": 11}} + result = extraction_shadow.compare_results(cached, fresh, schema=None) + assert result.match is False + # Path should surface the nested key so we can bucket regressions by field. + assert any("count" in path for path in result.diff_summary) + + +def test_compare_strict_list_order_matters_without_schema() -> None: + cached = {"docs": ["a.pdf", "b.pdf"]} + fresh = {"docs": ["b.pdf", "a.pdf"]} + # With no schema hint, lists are ordered — reordering is a mismatch. 
+ result = extraction_shadow.compare_results(cached, fresh, schema=None) + assert result.match is False + + +def test_compare_strict_list_identical_order_match() -> None: + cached = {"docs": ["a.pdf", "b.pdf"]} + fresh = {"docs": ["a.pdf", "b.pdf"]} + result = extraction_shadow.compare_results(cached, fresh, schema=None) + assert result.match is True + + +def test_compare_string_result_match() -> None: + result = extraction_shadow.compare_results("hello world", "hello world", schema=None) + assert result.match is True + + +def test_compare_string_result_mismatch() -> None: + result = extraction_shadow.compare_results("hello world", "hello universe", schema=None) + assert result.match is False + assert "root" in result.diff_summary or "" in result.diff_summary + + +def test_compare_root_list_result_match() -> None: + """Some extraction schemas produce a list at the root — must still compare correctly.""" + cached = [{"id": 1}, {"id": 2}] + fresh = [{"id": 1}, {"id": 2}] + result = extraction_shadow.compare_results(cached, fresh, schema=None) + assert result.match is True + + +def test_compare_none_results_match() -> None: + result = extraction_shadow.compare_results(None, None, schema=None) + assert result.match is True + + +def test_compare_one_none_one_populated_mismatch() -> None: + result = extraction_shadow.compare_results(None, {"a": 1}, schema=None) + assert result.match is False + + +# --------------------------------------------------------------------------- +# compare_results — semantic list-as-set when schema declares uniqueItems +# --------------------------------------------------------------------------- + + +def test_compare_semantic_unique_items_list_order_insensitive() -> None: + """When schema marks a list as uniqueItems, reordering is a match, not a diff. + + This matches the RFC: extract-information may return list elements in a + different order on a fresh run even though the set of items is identical. 
+ """ + schema = { + "type": "object", + "properties": { + "docs": {"type": "array", "uniqueItems": True, "items": {"type": "string"}}, + }, + } + cached = {"docs": ["a.pdf", "b.pdf"]} + fresh = {"docs": ["b.pdf", "a.pdf"]} + result = extraction_shadow.compare_results(cached, fresh, schema=schema) + assert result.match is True + assert result.mode == "semantic" + + +def test_compare_semantic_unique_items_list_content_mismatch_is_diff() -> None: + """Different contents — not just order — are still a mismatch.""" + schema = { + "type": "object", + "properties": { + "docs": {"type": "array", "uniqueItems": True, "items": {"type": "string"}}, + }, + } + cached = {"docs": ["a.pdf", "b.pdf"]} + fresh = {"docs": ["a.pdf", "c.pdf"]} + result = extraction_shadow.compare_results(cached, fresh, schema=schema) + assert result.match is False + assert "docs" in result.diff_summary + + +def test_compare_semantic_non_unique_list_still_order_sensitive() -> None: + """Lists without uniqueItems must stay order-sensitive — we can't assume set semantics.""" + schema = { + "type": "object", + "properties": { + "entries": {"type": "array", "items": {"type": "string"}}, # no uniqueItems + }, + } + cached = {"entries": ["a", "b"]} + fresh = {"entries": ["b", "a"]} + result = extraction_shadow.compare_results(cached, fresh, schema=schema) + assert result.match is False + + +def test_compare_semantic_unique_items_list_of_dicts() -> None: + """Unique-item lists of dicts must compare as sets (hashable via sorted-json).""" + schema = { + "type": "object", + "properties": { + "items": { + "type": "array", + "uniqueItems": True, + "items": {"type": "object"}, + }, + }, + } + cached = {"items": [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]} + fresh = {"items": [{"id": 2, "name": "b"}, {"id": 1, "name": "a"}]} + result = extraction_shadow.compare_results(cached, fresh, schema=schema) + assert result.match is True + + +def test_compare_semantic_root_array_with_unique_items_order_insensitive() -> None: + """Schema whose *root* is a uniqueItems array must also get set semantics. + + Regression guard for a bug where _collect_unique_item_paths only recorded + uniqueItems paths when they had a non-empty dotted prefix, so root arrays + were still compared order-sensitively — inflating the shadow FP metric. + """ + schema = {"type": "array", "uniqueItems": True, "items": {"type": "string"}} + cached = ["a.pdf", "b.pdf"] + fresh = ["b.pdf", "a.pdf"] + result = extraction_shadow.compare_results(cached, fresh, schema=schema) + assert result.match is True + assert result.mode == "semantic" + + +def test_compare_semantic_root_array_with_unique_items_content_mismatch() -> None: + schema = {"type": "array", "uniqueItems": True, "items": {"type": "string"}} + cached = ["a.pdf", "b.pdf"] + fresh = ["a.pdf", "c.pdf"] + result = extraction_shadow.compare_results(cached, fresh, schema=schema) + assert result.match is False + + +# --------------------------------------------------------------------------- +# compare_results — combinator schemas (allOf/anyOf/oneOf) +# --------------------------------------------------------------------------- + + +def test_compare_semantic_unique_items_inside_all_of_wrapper() -> None: + """Pydantic wraps array fields in allOf when Field(description=...) is used. + + Without combinator traversal, uniqueItems on these fields would be missed + and reorder-only diffs would inflate the shadow FP metric for most + real-world extraction schemas. 
+ """ + schema = { + "type": "object", + "properties": { + "ids": { + "allOf": [{"type": "array", "uniqueItems": True, "items": {"type": "integer"}}], + "description": "unique identifiers", + }, + }, + } + cached = {"ids": [1, 2, 3]} + fresh = {"ids": [3, 2, 1]} + result = extraction_shadow.compare_results(cached, fresh, schema=schema) + assert result.match is True + assert result.mode == "semantic" + + +def test_compare_semantic_unique_items_inside_any_of_nullable() -> None: + """anyOf is how JSON Schema expresses ``Optional[list[...]]`` — must still honor uniqueItems.""" + schema = { + "type": "object", + "properties": { + "tags": { + "anyOf": [ + {"type": "array", "uniqueItems": True, "items": {"type": "string"}}, + {"type": "null"}, + ], + }, + }, + } + cached = {"tags": ["a", "b"]} + fresh = {"tags": ["b", "a"]} + result = extraction_shadow.compare_results(cached, fresh, schema=schema) + assert result.match is True + + +def test_compare_semantic_unique_items_inside_one_of() -> None: + schema = { + "type": "object", + "properties": { + "vals": { + "oneOf": [ + {"type": "array", "uniqueItems": True, "items": {"type": "integer"}}, + {"type": "string"}, + ], + }, + }, + } + result = extraction_shadow.compare_results( + {"vals": [1, 2]}, + {"vals": [2, 1]}, + schema=schema, + ) + assert result.match is True + + +# --------------------------------------------------------------------------- +# compare_results — $ref resolution for Pydantic-generated schemas +# --------------------------------------------------------------------------- + + +def test_compare_semantic_unique_items_behind_ref() -> None: + """Pydantic puts nested models under $defs and references them via $ref. + + Without $ref resolution, uniqueItems inside those definitions would be + missed and reorder-only diffs would inflate the shadow FP metric. + """ + schema = { + "$defs": { + "Item": { + "type": "object", + "properties": { + "tags": {"type": "array", "uniqueItems": True, "items": {"type": "string"}}, + }, + }, + }, + "type": "object", + "properties": { + "item": {"$ref": "#/$defs/Item"}, + }, + } + cached = {"item": {"tags": ["a", "b"]}} + fresh = {"item": {"tags": ["b", "a"]}} + result = extraction_shadow.compare_results(cached, fresh, schema=schema) + assert result.match is True + + +def test_compare_semantic_unique_items_ref_circular_safe() -> None: + """Circular $ref must not cause infinite recursion in the collector.""" + schema = { + "$defs": { + "Node": { + "type": "object", + "properties": { + "children": { + "type": "array", + "uniqueItems": True, + "items": {"$ref": "#/$defs/Node"}, + }, + }, + }, + }, + "$ref": "#/$defs/Node", + } + # Same-shape trees, inner children reordered — should match. + cached = {"children": [{"children": []}, {"children": []}]} + fresh = {"children": [{"children": []}, {"children": []}]} + result = extraction_shadow.compare_results(cached, fresh, schema=schema) + assert result.match is True + + +def test_compare_semantic_unique_items_external_ref_ignored() -> None: + """External $ref (non-#/) must be silently skipped, not crash.""" + schema = { + "type": "object", + "properties": { + "ids": {"$ref": "https://example.com/schema.json#/Foo"}, + }, + } + # External ref can't be resolved, so these compare strictly. + # The important thing is the collector doesn't raise. 
+ result = extraction_shadow.compare_results({"ids": [1]}, {"ids": [1]}, schema=schema) + assert result.match is True + + +# --------------------------------------------------------------------------- +# compare_results — bool vs int must be a mismatch (Python treats True == 1) +# --------------------------------------------------------------------------- + + +def test_compare_bool_vs_int_at_field_is_mismatch() -> None: + """True vs 1 must be a diff — Python treats them as equal, the cache metric must not.""" + result = extraction_shadow.compare_results({"flag": True}, {"flag": 1}, schema=None) + assert result.match is False + assert "flag" in result.diff_summary + + +def test_compare_bool_vs_int_at_root_is_mismatch() -> None: + result = extraction_shadow.compare_results(True, 1, schema=None) + assert result.match is False + + +def test_compare_false_vs_zero_is_mismatch() -> None: + result = extraction_shadow.compare_results({"f": False}, {"f": 0}, schema=None) + assert result.match is False + + +def test_compare_int_vs_float_still_allowed_when_equal() -> None: + """Int vs float with the same value should still match — that's a JSON-ism, not a real diff.""" + result = extraction_shadow.compare_results({"n": 1}, {"n": 1.0}, schema=None) + assert result.match is True + + +# --------------------------------------------------------------------------- +# compare_results — uniqueItems set comparison must preserve multiplicity +# --------------------------------------------------------------------------- + + +def test_compare_semantic_unique_items_preserves_multiplicity() -> None: + """uniqueItems set comparison must not collapse duplicates. + + If cached is ['a', 'a'] and fresh is ['a'], the payloads differ even + though the underlying set is identical — treat it as a mismatch so the + FP metric doesn't undercount real divergences. + """ + schema = { + "type": "object", + "properties": { + "docs": {"type": "array", "uniqueItems": True, "items": {"type": "string"}}, + }, + } + result = extraction_shadow.compare_results( + {"docs": ["a.pdf", "a.pdf"]}, + {"docs": ["a.pdf"]}, + schema=schema, + ) + assert result.match is False + assert "docs" in result.diff_summary + + +def test_compare_semantic_unique_items_multiplicity_match() -> None: + """Same multiset with different order should still match.""" + schema = { + "type": "object", + "properties": { + "docs": {"type": "array", "uniqueItems": True, "items": {"type": "string"}}, + }, + } + result = extraction_shadow.compare_results( + {"docs": ["a.pdf", "b.pdf", "a.pdf"]}, + {"docs": ["a.pdf", "a.pdf", "b.pdf"]}, + schema=schema, + ) + assert result.match is True + + +# --------------------------------------------------------------------------- +# compare_results — uniqueItems inside array items (nested arrays) +# --------------------------------------------------------------------------- + + +def test_compare_semantic_unique_items_nested_inside_array_items() -> None: + """Schema: {groups: array}. 
Inner reordering must match.""" + schema = { + "type": "object", + "properties": { + "groups": { + "type": "array", + "items": {"type": "array", "uniqueItems": True, "items": {"type": "string"}}, + }, + }, + } + cached = {"groups": [["a", "b"], ["c", "d"]]} + fresh = {"groups": [["b", "a"], ["d", "c"]]} + result = extraction_shadow.compare_results(cached, fresh, schema=schema) + assert result.match is True + assert result.mode == "semantic" + + +def test_compare_semantic_outer_array_without_unique_stays_order_sensitive_when_inner_is_unique() -> None: + """Outer array (no uniqueItems) must NOT inherit set semantics from inner unique arrays.""" + schema = { + "type": "object", + "properties": { + "groups": { + "type": "array", + "items": {"type": "array", "uniqueItems": True, "items": {"type": "string"}}, + }, + }, + } + # Outer order changed — must be a diff even though inner elements are the same sets. + cached = {"groups": [["a"], ["b"]]} + fresh = {"groups": [["b"], ["a"]]} + result = extraction_shadow.compare_results(cached, fresh, schema=schema) + assert result.match is False + + +def test_compare_semantic_unique_items_preserves_large_int_precision() -> None: + """Large ints above 2^53 must not collapse to the same float in canonical form.""" + schema = { + "type": "object", + "properties": { + "ids": {"type": "array", "uniqueItems": True, "items": {"type": "integer"}}, + }, + } + # 2^53 + 1 cannot be represented exactly as a float64; naive float(int) + # conversion would collapse these two distinct ids to the same value. + result = extraction_shadow.compare_results( + {"ids": [9007199254740992]}, + {"ids": [9007199254740993]}, + schema=schema, + ) + assert result.match is False + assert "ids" in result.diff_summary + + +def test_compare_semantic_unique_items_number_array_int_vs_float_match() -> None: + """uniqueItems number array must treat 1 and 1.0 as equal, matching _diff_paths.""" + schema = { + "type": "object", + "properties": { + "vals": {"type": "array", "uniqueItems": True, "items": {"type": "number"}}, + }, + } + result = extraction_shadow.compare_results( + {"vals": [1, 2]}, + {"vals": [1.0, 2.0]}, + schema=schema, + ) + assert result.match is True + + +def test_compare_semantic_unique_items_close_but_unequal_number_array() -> None: + """Arrays differing in one numeric value must register as a mismatch under set-equality.""" + schema = { + "type": "object", + "properties": { + "vals": {"type": "array", "uniqueItems": True, "items": {"type": "number"}}, + }, + } + result = extraction_shadow.compare_results( + {"vals": [1, 2, 3]}, + {"vals": [1, 2, 4]}, + schema=schema, + ) + assert result.match is False + assert "vals" in result.diff_summary + + +def test_compare_semantic_unique_items_bool_still_distinct_from_int() -> None: + """Even inside a uniqueItems array, True must not equal 1.""" + schema = { + "type": "object", + "properties": { + "flags": {"type": "array", "uniqueItems": True, "items": {}}, + }, + } + result = extraction_shadow.compare_results( + {"flags": [True]}, + {"flags": [1]}, + schema=schema, + ) + assert result.match is False + + +def test_compare_semantic_unique_items_with_nested_unique_objects_reorder_matches() -> None: + """uniqueItems array of objects containing nested uniqueItems lists. + + Cached and fresh have identical elements modulo (a) outer reorder and + (b) inner uniqueItems list reorder. Must match — the recursive semantic + rules have to apply inside the set-equality comparison, not just at the + top level. 
+ """ + schema = { + "type": "object", + "properties": { + "items": { + "type": "array", + "uniqueItems": True, + "items": { + "type": "object", + "properties": { + "tags": {"type": "array", "uniqueItems": True, "items": {"type": "string"}}, + }, + }, + }, + }, + } + cached = {"items": [{"tags": ["a", "b"]}, {"tags": ["c", "d"]}]} + fresh = {"items": [{"tags": ["d", "c"]}, {"tags": ["b", "a"]}]} + result = extraction_shadow.compare_results(cached, fresh, schema=schema) + assert result.match is True + + +# --------------------------------------------------------------------------- +# Test helpers +# --------------------------------------------------------------------------- + + +class _DummyLogCapture: + """Structlog capture helper — records each call as (event, kwargs).""" + + def __init__(self) -> None: + self.calls: list[tuple[str, dict[str, Any]]] = [] + + def debug(self, event: str, **kwargs: Any) -> None: + self.calls.append((event, kwargs)) + + def info(self, event: str, **kwargs: Any) -> None: + self.calls.append((event, kwargs)) + + def warning(self, event: str, **kwargs: Any) -> None: + self.calls.append((event, kwargs)) + + +# --------------------------------------------------------------------------- +# run_shadow_comparison — exception sanitization +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_run_shadow_comparison_error_log_does_not_leak_exception_message() -> None: + """Exception messages can contain raw LLM response payloads — log only the type.""" + captured = _DummyLogCapture() + + async def llm_call() -> Any: + raise ValueError("SSN: 123-45-6789 leaked from model response") + + await extraction_shadow.run_shadow_comparison( + cache_key="k1", + workflow_run_id="wfr_1", + cached_value={"a": 1}, + cached_age_seconds=0.0, + llm_call=llm_call, + schema=None, + logger=captured, + ) + + assert len(captured.calls) == 1 + _event, fields = captured.calls[0] + flat = " ".join(str(v) for v in fields.values()) + assert "123-45-6789" not in flat + assert "SSN" not in flat + # Class name is fine to log. 
+ assert "ValueError" in flat + + +# --------------------------------------------------------------------------- +# run_shadow_comparison — background runner +# --------------------------------------------------------------------------- + + +async def _fresh_ok(_result: Any) -> Any: + return _result + + +@pytest.mark.asyncio +async def test_run_shadow_comparison_logs_match_event() -> None: + cached = {"docs": ["a.pdf"]} + captured = _DummyLogCapture() + + async def llm_call() -> Any: + return {"docs": ["a.pdf"]} + + await extraction_shadow.run_shadow_comparison( + cache_key="k1", + workflow_run_id="wfr_1", + cached_value=cached, + cached_age_seconds=12.3, + llm_call=llm_call, + schema=None, + logger=captured, + ) + + assert len(captured.calls) == 1 + event, fields = captured.calls[0] + assert event == "extract_information.shadow_comparison" + assert fields["status"] == "ok" + assert fields["cache_key"] == "k1" + assert fields["workflow_run_id"] == "wfr_1" + assert fields["match"] is True + assert fields["cached_age_seconds"] == 12.3 + assert "shadow_duration_ms" in fields + assert fields["shadow_duration_ms"] >= 0 + assert fields["mode"] == "strict" + + +@pytest.mark.asyncio +async def test_run_shadow_comparison_logs_mismatch_with_diff() -> None: + captured = _DummyLogCapture() + + async def llm_call() -> Any: + return {"docs": ["a.pdf", "b.pdf"]} + + await extraction_shadow.run_shadow_comparison( + cache_key="k1", + workflow_run_id="wfr_1", + cached_value={"docs": ["a.pdf"]}, + cached_age_seconds=0.0, + llm_call=llm_call, + schema=None, + logger=captured, + ) + + assert len(captured.calls) == 1 + event, fields = captured.calls[0] + assert event == "extract_information.shadow_comparison" + assert fields["match"] is False + assert fields["diff_summary"] # non-empty + assert "docs" in fields["diff_summary"] + + +@pytest.mark.asyncio +async def test_run_shadow_comparison_swallows_llm_errors() -> None: + """A failing LLM call must not propagate — shadow is best-effort and fire-and-forget.""" + captured = _DummyLogCapture() + + async def llm_call() -> Any: + raise RuntimeError("LLM down") + + # Must not raise. + await extraction_shadow.run_shadow_comparison( + cache_key="k1", + workflow_run_id="wfr_1", + cached_value={"a": 1}, + cached_age_seconds=0.0, + llm_call=llm_call, + schema=None, + logger=captured, + ) + + assert len(captured.calls) == 1 + event, fields = captured.calls[0] + # Single consolidated event — filter on status=error to exclude from the FP metric. + assert event == "extract_information.shadow_comparison" + assert fields["status"] == "error" + assert fields["cache_key"] == "k1" + assert fields["error_type"] == "RuntimeError" + assert fields["error_stage"] == "llm_call" + + +@pytest.mark.asyncio +async def test_run_shadow_comparison_uses_structlog_by_default(caplog: pytest.LogCaptureFixture) -> None: + """If no logger is injected, the module's default structlog logger is used.""" + + async def llm_call() -> Any: + return {"a": 1} + + with caplog.at_level(logging.INFO): + await extraction_shadow.run_shadow_comparison( + cache_key="k1", + workflow_run_id="wfr_1", + cached_value={"a": 1}, + cached_age_seconds=0.0, + llm_call=llm_call, + schema=None, + ) + + # Default path should succeed without raising even when no logger override is provided. + # We don't assert on caplog content (structlog routing varies by test env), only that + # no exception escaped. 
+
+
+@pytest.mark.asyncio
+async def test_schedule_shadow_check_runs_gate_in_background() -> None:
+    """schedule_shadow_check must not await the gate on the caller's stack.
+
+    Regression guard for the P1 where handler.py used to `await` the PostHog
+    flag lookup directly, blocking cache-hit returns on the flag provider.
+    """
+    gate_release = asyncio.Event()
+    captured = _DummyLogCapture()
+
+    async def slow_gate() -> bool:
+        await gate_release.wait()
+        return True
+
+    async def llm_call() -> Any:
+        return {"a": 1}
+
+    task = extraction_shadow.schedule_shadow_check(
+        gate=slow_gate,
+        cache_key="k1",
+        workflow_run_id="wfr_1",
+        cached_value={"a": 1},
+        cached_age_seconds=0.0,
+        llm_call=llm_call,
+        schema=None,
+        logger=captured,
+    )
+
+    # Caller returns immediately — gate has not run yet.
+    assert len(captured.calls) == 0
+
+    gate_release.set()
+    await task
+    assert len(captured.calls) == 1
+    assert captured.calls[0][1]["status"] == "ok"
+
+
+@pytest.mark.asyncio
+async def test_schedule_shadow_check_skips_when_gate_returns_false() -> None:
+    captured = _DummyLogCapture()
+
+    async def gate() -> bool:
+        return False
+
+    async def llm_call() -> Any:  # pragma: no cover — must not be called
+        raise AssertionError("LLM should not be called when gate is False")
+
+    task = extraction_shadow.schedule_shadow_check(
+        gate=gate,
+        cache_key="k1",
+        workflow_run_id="wfr_1",
+        cached_value={"a": 1},
+        cached_age_seconds=0.0,
+        llm_call=llm_call,
+        schema=None,
+        logger=captured,
+    )
+    await task
+    # One info log confirming the gate evaluated to False — useful for
+    # verifying the sampling rate in production without running the shadow LLM.
+    assert len(captured.calls) == 1
+    event, fields = captured.calls[0]
+    assert event == "extract_information.shadow_comparison"
+    assert fields["status"] == "skipped"
+
+
+@pytest.mark.asyncio
+async def test_schedule_shadow_check_swallows_gate_errors() -> None:
+    captured = _DummyLogCapture()
+
+    async def gate() -> bool:
+        raise RuntimeError("posthog unavailable")
+
+    async def llm_call() -> Any:  # pragma: no cover — must not be called
+        raise AssertionError("LLM should not be called when gate raises")
+
+    task = extraction_shadow.schedule_shadow_check(
+        gate=gate,
+        cache_key="k1",
+        workflow_run_id="wfr_1",
+        cached_value={"a": 1},
+        cached_age_seconds=0.0,
+        llm_call=llm_call,
+        schema=None,
+        logger=captured,
+    )
+    await task  # must not raise
+    assert len(captured.calls) == 1
+    event, fields = captured.calls[0]
+    assert event == "extract_information.shadow_comparison"
+    assert fields["status"] == "error"
+    assert fields["error_stage"] == "gate"
+    assert fields["error_type"] == "RuntimeError"
+
+
+@pytest.mark.asyncio
+async def test_schedule_returns_none_and_warns_when_cap_reached(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Safety valve: when _PENDING_SHADOW_TASKS is full, schedule must skip and warn.
+
+    Protects the hot path from LLM-provider rate-limit contention when shadow
+    tasks pile up (slow provider, sustained cache-hit burst).
+    """
+
+    # Fill the pending set with never-done marker tasks that _prune_pending() won't drop
+    # — simulates an in-flight backlog rather than leaked done tasks.
+    class _PendingMarker:
+        def done(self) -> bool:
+            return False
+
+    fake_pending: set[Any] = {_PendingMarker() for _ in range(extraction_shadow._MAX_PENDING_SHADOWS)}
+    monkeypatch.setattr(extraction_shadow, "_PENDING_SHADOW_TASKS", fake_pending)
+
+    captured = _DummyLogCapture()
+
+    async def llm_call() -> Any:  # pragma: no cover — must not run when capped
+        raise AssertionError("shadow LLM must not run when the cap is hit")
+
+    task = extraction_shadow.schedule_shadow_comparison(
+        cache_key="k1",
+        workflow_run_id="wfr_1",
+        cached_value={"a": 1},
+        cached_age_seconds=0.0,
+        llm_call=llm_call,
+        schema=None,
+        logger=captured,
+    )
+    assert task is None
+    assert len(captured.calls) == 1
+    event, fields = captured.calls[0]
+    assert event == "shadow_task_cap_reached"
+    assert fields["pending"] == extraction_shadow._MAX_PENDING_SHADOWS
+
+
+@pytest.mark.asyncio
+async def test_schedule_shadow_comparison_does_not_block_caller() -> None:
+    """schedule_shadow_comparison must return immediately; background task executes after."""
+    release = asyncio.Event()
+
+    async def slow_llm_call() -> Any:
+        # Block until the test tells us to proceed, proving the caller isn't awaiting us.
+        await release.wait()
+        return {"a": 1}
+
+    captured = _DummyLogCapture()
+
+    task = extraction_shadow.schedule_shadow_comparison(
+        cache_key="k1",
+        workflow_run_id="wfr_1",
+        cached_value={"a": 1},
+        cached_age_seconds=0.0,
+        llm_call=slow_llm_call,
+        schema=None,
+        logger=captured,
+    )
+
+    # Caller returns immediately — no logs yet.
+    assert task is not None
+    assert len(captured.calls) == 0
+
+    release.set()
+    await task
+    assert len(captured.calls) == 1
+    assert captured.calls[0][1]["match"] is True
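+
+
+@pytest.mark.asyncio
+async def test_prune_pending_drops_finished_tasks() -> None:
+    """Illustrative sketch: finished tasks leave the pending set after pruning.
+
+    The cap must count only in-flight shadow work, so a completed task must not
+    occupy a slot once _prune_pending() runs, even if its done-callback never
+    fired on the current loop.
+    """
+
+    async def _noop() -> None:
+        return None
+
+    task = extraction_shadow._track(asyncio.create_task(_noop()))
+    await task
+    extraction_shadow._prune_pending()
+    assert task not in extraction_shadow._PENDING_SHADOW_TASKS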