diff --git a/skyvern/core/script_generations/real_skyvern_page_ai.py b/skyvern/core/script_generations/real_skyvern_page_ai.py index 9de35bcdb..222f3b4d4 100644 --- a/skyvern/core/script_generations/real_skyvern_page_ai.py +++ b/skyvern/core/script_generations/real_skyvern_page_ai.py @@ -981,7 +981,6 @@ class RealSkyvernPageAi(SkyvernPageAi): extracted_information_schema=post_ceiling_kwargs["extracted_information_schema"], error_code_mapping=error_code_mapping_str, llm_key=None, - local_datetime=local_datetime_str, ) lookup_result = extraction_cache.lookup(workflow_run_id, cache_key) except Exception: diff --git a/skyvern/forge/agent.py b/skyvern/forge/agent.py index 52b684040..0337be12c 100644 --- a/skyvern/forge/agent.py +++ b/skyvern/forge/agent.py @@ -5193,7 +5193,6 @@ class ForgeAgent: call_path="agent", data_extraction_goal=task.data_extraction_goal, extracted_information_schema=post_ceiling_kwargs["data_extraction_schema"], - local_datetime=local_datetime_str, llm_key=None, ) lookup_result = extraction_cache.lookup(workflow_run_id, cache_key) diff --git a/skyvern/forge/agent_functions.py b/skyvern/forge/agent_functions.py index 9018f3cfb..5be783810 100644 --- a/skyvern/forge/agent_functions.py +++ b/skyvern/forge/agent_functions.py @@ -530,6 +530,37 @@ class AgentFunction: """Cloud-overridable sample gate for extract-information shadow mode. OSS no-op.""" return False + async def lookup_cross_run_extraction_cache( + self, + workflow_permanent_id: str | None, + cache_key: str, + ) -> Any | None: + """Cross-run (wpid-scoped) extraction-cache read. OSS no-op. + + Cloud overrides this to consult the Redis tier (SKY-8873). Returns the + cached extraction value on a hit or None on a miss / error / disabled + flag. Implementations MUST swallow backend errors and return None so + the extract path always falls through to a fresh LLM call rather than + failing loud. + """ + return None + + async def store_cross_run_extraction_cache( + self, + workflow_permanent_id: str | None, + cache_key: str, + value: Any, + ) -> None: + """Cross-run (wpid-scoped) extraction-cache write. OSS no-op. + + Cloud overrides this to write to the Redis tier (SKY-8873) with a + long TTL. Called after a fresh LLM extraction so subsequent runs of + the same workflow against the same page content skip the LLM call. + Implementations MUST swallow backend errors — write-path failures + must never fail the user-visible request. + """ + return None + def build_workflow_schedule_id(self, workflow_schedule_id: str) -> str | None: """Return the backend-specific schedule id used by the execution engine. diff --git a/skyvern/forge/sdk/cache/extraction_cache.py b/skyvern/forge/sdk/cache/extraction_cache.py index 379e26798..5333e5342 100644 --- a/skyvern/forge/sdk/cache/extraction_cache.py +++ b/skyvern/forge/sdk/cache/extraction_cache.py @@ -8,22 +8,27 @@ output schema are identical across iterations, but we still pay the full LLM cost each time. This cache keys on the content that actually affects the extraction output and skips the LLM call on a hit. +This module is the v2 in-process tier. A cross-run Redis tier (SKY-8873) sits +behind the `lookup_cross_run_extraction_cache` / `store_cross_run_extraction_cache` +hooks on `AgentFunction` — this module stays OSS-safe and scope-neutral. + Scope and lifetime: -- Scoped by `workflow_run_id`. Cache entries for a different run are isolated, - and a run's entries can be cleared via `clear_workflow_run` (e.g. at run end). -- Purely in-memory, per-process. A workflow run that spans multiple workers - will still pay the LLM cost the first time each worker sees the page. +- In-process tier scoped by `workflow_run_id`. Cache entries for a different + run are isolated, and a run's entries can be cleared via `clear_workflow_run` + (e.g. at run end). +- Purely in-memory, per-process. Cross-run / cross-worker persistence is + handled by the cloud-side Redis tier behind the AgentFunction hooks. - Two-tier eviction: - **Outer** (workflow runs): LRU with a cap of `_MAX_WORKFLOW_RUNS`. Reads and writes refresh the run's position via `move_to_end`. - **Inner** (entries per run): FIFO with a cap of `_MAX_ENTRIES_PER_RUN`. Oldest entry is popped when the limit is exceeded. -Key derivation: -- Hashes the inputs that determine the LLM's output: - - element tree (HTML) - - extracted page text - - current URL +Key derivation (shared with the cross-run tier): +- Hashes only the inputs that determine the LLM's output: + - element tree (HTML, canonicalized to collapse transient IDs) + - extracted page text (ISO-timestamp lines collapsed to date prefix) + - current URL (query params sorted; nonce values redacted) - data extraction goal - extracted information schema (JSON-normalized) - navigation payload (JSON-normalized) @@ -33,9 +38,13 @@ Key derivation: first step of each task, so cache hits still land across iterations of a "list -> click row -> list again" loop. Including it keeps correctness if an intra-task second-step extraction happens. - - llm_key — the caller's model override. Cheap to include today (one key - per block type per run) and prevents stale hits if we later move this - cache off-process and a user changes models to retune quality. + - llm_key — the caller's model override. Prevents stale hits when a user + changes models to retune quality. +- Date is intentionally NOT in the key. Two calls on byte-identical page + content are semantically the same extraction regardless of wall-clock + date; relying on the content hash keeps hit rate up for scheduled + workflows that run many days apart on stable pages. Staleness is bounded + by the Redis TTL and the shadow-mode FP gate (SKY-8871). - Two calls with identical values hash to the same key. Any meaningful change (new page content, different schema, etc.) produces a fresh key and a miss. """ @@ -61,9 +70,11 @@ _MAX_WORKFLOW_RUNS = 256 # Sentinel hashed in place of None so that None and "" produce different keys. _NULL_SENTINEL = "\x00__NULL__" -# Cache scope identifiers. v1 ships with "run" only; "wpid" and "global" -# are reserved for the Redis cross-run cache (SKY-8873/SKY-8874). +# Cache scope identifiers. "run" is the in-process per-workflow-run tier; +# "wpid" is the Redis cross-run tier keyed on workflow_permanent_id +# (SKY-8873). "global" is reserved for a future cross-WPID tier. SCOPE_RUN = "run" +SCOPE_WPID = "wpid" # Fallback reasons emitted on cache misses. Used by log-based metrics to # distinguish first-call-in-run (unavoidable) from key-not-found (possible @@ -342,7 +353,6 @@ def compute_cache_key( error_code_mapping: Any = None, previous_extracted_information: Any = None, llm_key: str | None = None, - local_datetime: str | None = None, ) -> str: """Return a stable sha256 hex digest for the inputs that affect extraction output. @@ -352,14 +362,19 @@ def compute_cache_key( ``{{ var }}`` substitutions and no nav context, all other parts can match ``extract_information_for_navigation_goal``'s inputs and produce the same SHA otherwise. + + Date is intentionally omitted: two calls on the same page content are + semantically the same extraction regardless of wall-clock date. The + in-prompt ``{{ local_datetime }}`` interpolation is not part of the + key — if the scraped content is identical, the cached result is valid. + Staleness is bounded by the Redis TTL (cross-run tier) and the shadow- + mode FP gate (SKY-8871) that compares cached vs fresh on a sampled rate. """ def _s(v: str | None) -> str: """Map None to a sentinel so None and '' hash differently.""" return _NULL_SENTINEL if v is None else v - date_only = local_datetime[:10] if local_datetime and len(local_datetime) >= 10 else _s(local_datetime) - canonical_url = _canonical_url(current_url) canonical_element_tree = _canonical_element_tree(element_tree) canonical_extracted_text = _normalize_datetime_lines(extracted_text) if extracted_text is not None else None @@ -376,7 +391,6 @@ def compute_cache_key( _normalize(error_code_mapping), _normalize(previous_extracted_information), _s(llm_key), - date_only, ] joined = "\x1f".join(parts).encode("utf-8", errors="replace") return hashlib.sha256(joined).hexdigest() @@ -494,6 +508,24 @@ def clear_workflow_run(workflow_run_id: str | None) -> None: _CACHE.pop(workflow_run_id, None) +def invalidate_key(workflow_run_id: str | None, cache_key: str) -> bool: + """Drop a single cached entry within a workflow run. + + Used by the retry self-heal path (SKY-8873): when a step retries we + assume the previous attempt's cached value is suspect, so we evict it + and let the subsequent ``store`` overwrite with the fresh LLM result. + + Returns True if an entry was removed, False otherwise. Safe to call on + unknown IDs / keys. + """ + if not workflow_run_id: + return False + run_cache = _CACHE.get(workflow_run_id) + if run_cache is None: + return False + return run_cache.pop(cache_key, None) is not None + + def _reset_for_tests() -> None: """Test-only: wipe the global cache and counters between unit tests.""" global _hits, _misses # noqa: PLW0603 diff --git a/skyvern/webeye/actions/handler.py b/skyvern/webeye/actions/handler.py index d2454c4c6..590ccee06 100644 --- a/skyvern/webeye/actions/handler.py +++ b/skyvern/webeye/actions/handler.py @@ -4274,8 +4274,9 @@ async def extract_information_for_navigation_goal( # CUA tasks should use the default data extraction llm key llm_key_override = None - # Compute local_datetime once so both the cache key and the prompt use the - # same value (avoids stale hits when date-relative extraction goals cross midnight). + # Rendered into the prompt as ``{{ local_datetime }}``. Intentionally not + # part of the cache key — content-hash alone defines cache identity, so + # two calls on byte-identical pages hit the cache regardless of wall clock. local_datetime_str = datetime.now(context.tz_info).isoformat() extracted_text_for_prompt = scraped_page_refreshed.extracted_text if task.include_extracted_text else None @@ -4309,6 +4310,18 @@ async def extract_information_for_navigation_goal( local_datetime=local_datetime_str, ) + # Self-heal guard: on the second retry onward (``retry_index > 1``) the + # previous attempts' cached result is suspect — the first retry already + # failed to complete, so continuing to hand the same cached value back + # is not going to recover. Bypass both cache tiers on retry #2+ and + # force a fresh LLM call; the dual-write after extraction overwrites + # both the in-run entry and the cross-run Redis entry. + # Retry #1 still uses the cache: transient failures (network blip, + # downstream flake) often recover without the extraction itself being + # the cause, and paying the LLM cost on every first retry would burn + # hit rate for no self-heal benefit. + is_retry_step = step.retry_index > 1 + # Best-effort cache lookup — any failure falls through to LLM. The `try` # is narrowed to just compute_cache_key + lookup so a downstream log # failure can't re-enter the except block and double-count the call as @@ -4341,9 +4354,29 @@ async def extract_information_for_navigation_goal( error_code_mapping=error_code_mapping_str, previous_extracted_information=post_ceiling_kwargs["previous_extracted_information"], llm_key=llm_key_override, - local_datetime=local_datetime_str, ) - lookup_result = extraction_cache.lookup(task.workflow_run_id, cache_key) + if is_retry_step: + # Proactively evict the in-run entry. The cross-run tier will be + # overwritten by the store() after the LLM call below. + evicted = extraction_cache.invalidate_key(task.workflow_run_id, cache_key) + LOG.info( + "extract_information cache bypassed on retry (self-heal)", + task_id=task.task_id, + workflow_run_id=task.workflow_run_id, + step_id=step.step_id, + retry_index=step.retry_index, + cache_key=cache_key, + cache_hit=False, + # Covers both tiers — the in-run entry is evicted here and the + # cross-run entry will be overwritten by the store() below. + cache_scope=extraction_cache.SCOPE_RUN, + cache_age_seconds=None, + fallback_reason="retry_bypass", + in_run_entry_evicted=evicted, + cache_path="handler", + ) + else: + lookup_result = extraction_cache.lookup(task.workflow_run_id, cache_key) except Exception: LOG.warning( "extract_information cache lookup failed; falling through to LLM", @@ -4450,6 +4483,105 @@ async def extract_information_for_navigation_goal( cache_path="handler", ) + # Cross-run (wpid-scoped) cache lookup (SKY-8873). Consulted after an + # in-run miss so the tight in-process dict stays the hot path. Returns + # None in OSS; the cloud override hits Redis and is gated behind the + # EXTRACT_INFORMATION_CACHE_REDIS PostHog flag. All errors are swallowed + # by the backend so a Redis hiccup just falls through to the LLM call. + # Skipped on retry — the subsequent dual-write overwrites any stale + # Redis entry for this key with the fresh LLM result. + cross_run_value: Any | None = None + if cache_key is not None and not is_retry_step: + try: + cross_run_value = await app.AGENT_FUNCTION.lookup_cross_run_extraction_cache( + task.workflow_permanent_id, cache_key + ) + except Exception: + LOG.warning( + "extract_information cross-run cache lookup raised", + task_id=task.task_id, + workflow_run_id=task.workflow_run_id, + workflow_permanent_id=task.workflow_permanent_id, + organization_id=task.organization_id, + cache_key=cache_key, + exc_info=True, + ) + cross_run_value = None + + # Cross-run hit with a non-cacheable value type (e.g. a Redis payload + # that decoded to a bool or number). Mirror the in-run warning so the + # cross-run tier has the same diagnostic surface during rollout — + # without it, a corrupt-but-decodable entry would silently fall + # through to the LLM with no trail for post-hoc investigation. + if cache_key is not None and cross_run_value is not None and not isinstance(cross_run_value, (dict, list, str)): + LOG.warning( + "extract_information cross-run cache hit returned non-cacheable value type; falling through to LLM", + task_id=task.task_id, + workflow_run_id=task.workflow_run_id, + workflow_permanent_id=task.workflow_permanent_id, + organization_id=task.organization_id, + cache_key=cache_key, + value_type=type(cross_run_value).__name__, + cache_path="handler", + ) + cross_run_value = None + + if cache_key is not None and cross_run_value is not None and isinstance(cross_run_value, (dict, list, str)): + LOG.info( + "extract_information cache hit — skipping LLM call (cross-run)", + task_id=task.task_id, + workflow_run_id=task.workflow_run_id, + workflow_permanent_id=task.workflow_permanent_id, + cache_key=cache_key, + cache_hit=True, + cache_scope=extraction_cache.SCOPE_WPID, + # Age tracking in the cross-run tier is a follow-up; emit None so + # the field is always present but distinguishable from in-run hits. + cache_age_seconds=None, + fallback_reason=None, + cache_path="handler", + ) + # Backfill the in-run cache so subsequent identical lookups in this + # run short-circuit without crossing the Redis boundary. + try: + extraction_cache.store(task.workflow_run_id, cache_key, cross_run_value) + except Exception: + LOG.warning( + "extract_information cross-run cache backfill to in-run failed", + exc_info=True, + ) + # Shadow sampling on cross-run hits is a follow-up — plumbing needs + # a cached_age for the comparison event and the current Redis backend + # does not track it yet. + return ScrapeResult(scraped_data=cross_run_value) + + # Cross-run miss log — DEBUG so it doesn't flood INFO at 0% rollout + # (where the cloud override returns None for every call regardless of + # wpid or Redis state). When the flag ramps past the initial measurement + # checkpoint we can promote to INFO in the same commit that flips the + # percentage. Paired with the cross-run hit log (still INFO) this + # gives a computable hit rate at any rollout level. + # TODO(SKY-8992): promote this log from DEBUG → INFO in the same commit + # that flips the PostHog read flag past 0%, so the Datadog hit-rate + # dashboard has both sides of the ratio without a log-level backfill. + if cache_key is not None and not is_retry_step and cross_run_value is None: + LOG.debug( + "extract_information cache miss (cross-run)", + task_id=task.task_id, + workflow_run_id=task.workflow_run_id, + workflow_permanent_id=task.workflow_permanent_id, + cache_key=cache_key, + cache_hit=False, + cache_scope=extraction_cache.SCOPE_WPID, + cache_age_seconds=None, + # The wpid tier doesn't distinguish "flag disabled" from + # "key not found" at the handler — both surface as ``None`` — + # so label as ``cross_run_miss`` and let downstream metrics + # split by ``workflow_permanent_id`` populated vs empty. + fallback_reason="cross_run_miss", + cache_path="handler", + ) + # Use the appropriate LLM handler based on the feature flag llm_api_handler = LLMAPIHandlerFactory.get_override_llm_api_handler( llm_key_override, default=app.EXTRACTION_LLM_API_HANDLER @@ -4479,6 +4611,23 @@ async def extract_information_for_navigation_goal( extraction_cache.store(task.workflow_run_id, cache_key, json_response) except Exception: LOG.warning("extract_information cache store failed; ignoring", exc_info=True) + # Dual-write to the cross-run (Redis) tier. Ungated so the cache is + # warm before the read flag rolls out. OSS returns immediately; cloud + # writes to Redis with a long TTL and swallows backend errors. + try: + await app.AGENT_FUNCTION.store_cross_run_extraction_cache( + task.workflow_permanent_id, cache_key, json_response + ) + except Exception: + LOG.warning( + "extract_information cross-run cache store raised; ignoring", + task_id=task.task_id, + workflow_run_id=task.workflow_run_id, + workflow_permanent_id=task.workflow_permanent_id, + organization_id=task.organization_id, + cache_key=cache_key, + exc_info=True, + ) return ScrapeResult( scraped_data=json_response, diff --git a/tests/unit/test_extract_information_previous_info_cap.py b/tests/unit/test_extract_information_previous_info_cap.py index 5766a250f..5da3ecdc6 100644 --- a/tests/unit/test_extract_information_previous_info_cap.py +++ b/tests/unit/test_extract_information_previous_info_cap.py @@ -65,7 +65,11 @@ def _capture_handler_kwargs(monkeypatch, previous_extracted_information): task.task_id = "tsk_test" task.include_extracted_text = True - asyncio.run(handler.extract_information_for_navigation_goal(task=task, step=MagicMock(), scraped_page=scraped_page)) + asyncio.run( + handler.extract_information_for_navigation_goal( + task=task, step=MagicMock(retry_index=0), scraped_page=scraped_page + ) + ) return captured @@ -141,7 +145,11 @@ def _capture_handler_schema(monkeypatch, extracted_information_schema): task.task_id = "tsk_test" task.include_extracted_text = True - asyncio.run(handler.extract_information_for_navigation_goal(task=task, step=MagicMock(), scraped_page=scraped_page)) + asyncio.run( + handler.extract_information_for_navigation_goal( + task=task, step=MagicMock(retry_index=0), scraped_page=scraped_page + ) + ) return captured diff --git a/tests/unit/test_extract_information_retry_bypass.py b/tests/unit/test_extract_information_retry_bypass.py new file mode 100644 index 000000000..970d2fa8a --- /dev/null +++ b/tests/unit/test_extract_information_retry_bypass.py @@ -0,0 +1,154 @@ +"""Handler-level test for the extract-information retry self-heal path (SKY-8873). + +Covers the cache-bypass decision: when ``step.retry_index > 1`` the handler +must NOT consult the in-run cache and MUST evict the matching key before +the fresh LLM call, so the dual-write after extraction overwrites the +suspect prior entry rather than accumulating alongside it. +""" + +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock + +from skyvern.forge.sdk.cache import extraction_cache +from skyvern.webeye.actions import handler + + +def _make_scraped_page(): + refreshed = MagicMock() + refreshed.extracted_text = "page text" + refreshed.url = "https://example.test" + refreshed.screenshots = [] + refreshed.build_element_tree = MagicMock(return_value="link") + refreshed.support_economy_elements_tree = MagicMock(return_value=False) + refreshed.last_used_element_tree_html = None + + scraped_page = MagicMock() + scraped_page.refresh = AsyncMock(return_value=refreshed) + scraped_page.screenshots = [] + return scraped_page + + +def _make_task(workflow_run_id: str, workflow_permanent_id: str | None = None): + task = MagicMock() + task.navigation_goal = None + task.navigation_payload = None + task.extracted_information = None + task.data_extraction_goal = "Extract documents" + task.extracted_information_schema = {"type": "object"} + task.error_code_mapping = None + task.llm_key = None + task.workflow_run_id = workflow_run_id + task.task_id = "tsk_test" + task.workflow_permanent_id = workflow_permanent_id + task.include_extracted_text = True + return task + + +def _stub_handler_dependencies(monkeypatch, llm_call_counter: list[int], synthetic_cache_key: str): + """Patch out the heavy handler deps and force ``compute_cache_key`` to + return a deterministic value so the test can pre-populate the cache at + that key and later assert eviction. + + Returns the cross-run lookup / store mocks so tests can assert call + counts and arguments on the dual-write path. + """ + + async def fake_llm(**_kwargs): + llm_call_counter.append(1) + return {"docs": ["fresh.pdf"]} + + monkeypatch.setattr( + handler, + "load_prompt_with_elements_tracked", + lambda **kwargs: ("rendered-prompt", dict(kwargs)), + ) + monkeypatch.setattr(handler, "ensure_context", lambda: MagicMock(tz_info=None)) + monkeypatch.setattr(handler.service_utils, "is_cua_task", AsyncMock(return_value=False)) + monkeypatch.setattr( + handler.LLMAPIHandlerFactory, + "get_override_llm_api_handler", + lambda llm_key, default: fake_llm, + ) + monkeypatch.setattr(handler.extraction_cache, "compute_cache_key", lambda **_: synthetic_cache_key) + # Shadow mode would fire a background LLM call on in-run cache hits and + # pollute the counter — not the behavior under test. + monkeypatch.setattr( + handler.app.AGENT_FUNCTION, + "should_shadow_extraction_cache_hit", + AsyncMock(return_value=False), + ) + # Cross-run tier hooks — the lookup returns None so the test targets the + # in-run / retry-bypass decision, not the cross-run hit path. The store + # mock is handed back so tests can assert the dual-write fired on the + # self-heal path. + lookup_mock = AsyncMock(return_value=None) + store_mock = AsyncMock(return_value=None) + monkeypatch.setattr(handler.app.AGENT_FUNCTION, "lookup_cross_run_extraction_cache", lookup_mock) + monkeypatch.setattr(handler.app.AGENT_FUNCTION, "store_cross_run_extraction_cache", store_mock) + return lookup_mock, store_mock + + +def test_retry_index_gt_one_evicts_in_run_entry_and_calls_llm(monkeypatch) -> None: + """On retry #2, the handler must drop the prior cached value, re-run the LLM, + and dual-write both tiers so the cross-run Redis entry is overwritten.""" + extraction_cache._reset_for_tests() + workflow_run_id = "wfr_retry_self_heal" + workflow_permanent_id = "wpid_retry_self_heal" + cache_key = "synthetic_cache_key_retry" + + # Prime the in-run cache with a stale/bad value — the retry bypass must + # drop it. If the guard fires correctly, a post-call lookup returns miss. + extraction_cache.store(workflow_run_id, cache_key, {"docs": ["stale.pdf"]}) + assert extraction_cache.lookup(workflow_run_id, cache_key).hit is True + + llm_calls: list[int] = [] + lookup_mock, store_mock = _stub_handler_dependencies(monkeypatch, llm_calls, cache_key) + + scraped_page = _make_scraped_page() + task = _make_task(workflow_run_id, workflow_permanent_id=workflow_permanent_id) + # retry_index = 2 → past the bypass threshold (> 1) + step = MagicMock(step_id="stp_retry2", retry_index=2) + + asyncio.run(handler.extract_information_for_navigation_goal(task=task, step=step, scraped_page=scraped_page)) + + # LLM must have been called (cache was bypassed, not consumed). + assert llm_calls == [1], "retry bypass must force a fresh LLM call" + # Post-call lookup returns the freshly-stored value, not the stale one — + # the in-run side of the dual-write overwrote the evicted entry. + after = extraction_cache.lookup(workflow_run_id, cache_key) + assert after.hit is True + assert after.value == {"docs": ["fresh.pdf"]} + # Cross-run side of the dual-write: the Redis store hook must have been + # called with the fresh value so the suspect Redis entry gets overwritten + # at the same wpid/cache_key pair. This is the self-heal path's whole point. + lookup_mock.assert_not_called() # retry bypass must not consult cross-run tier + store_mock.assert_awaited_once_with(workflow_permanent_id, cache_key, {"docs": ["fresh.pdf"]}) + extraction_cache._reset_for_tests() + + +def test_retry_index_one_still_uses_cache(monkeypatch) -> None: + """Retry #1 is below the bypass threshold — the cache must still serve.""" + extraction_cache._reset_for_tests() + workflow_run_id = "wfr_retry_first" + cache_key = "synthetic_cache_key_first_retry" + + extraction_cache.store(workflow_run_id, cache_key, {"docs": ["cached.pdf"]}) + + llm_calls: list[int] = [] + _stub_handler_dependencies(monkeypatch, llm_calls, cache_key) + + scraped_page = _make_scraped_page() + task = _make_task(workflow_run_id) + # retry_index = 1 — still below the bypass threshold + step = MagicMock(step_id="stp_retry1", retry_index=1) + + result = asyncio.run( + handler.extract_information_for_navigation_goal(task=task, step=step, scraped_page=scraped_page) + ) + + # Cache hit path — no LLM call. + assert llm_calls == [], "retry #1 must reuse the cached value (bypass threshold is retry_index > 1)" + assert result.scraped_data == {"docs": ["cached.pdf"]} + extraction_cache._reset_for_tests() diff --git a/tests/unit/test_extract_information_text_optout.py b/tests/unit/test_extract_information_text_optout.py index 089d5ff66..de5abf2b6 100644 --- a/tests/unit/test_extract_information_text_optout.py +++ b/tests/unit/test_extract_information_text_optout.py @@ -114,7 +114,11 @@ def _capture_extract_information_kwargs(monkeypatch, include_extracted_text: boo task = _make_task_for_extract_information(include_extracted_text=include_extracted_text) - asyncio.run(handler.extract_information_for_navigation_goal(task=task, step=MagicMock(), scraped_page=scraped_page)) + asyncio.run( + handler.extract_information_for_navigation_goal( + task=task, step=MagicMock(retry_index=0), scraped_page=scraped_page + ) + ) return captured diff --git a/tests/unit/test_extraction_cache.py b/tests/unit/test_extraction_cache.py index d861206e8..ef81d1d31 100644 --- a/tests/unit/test_extraction_cache.py +++ b/tests/unit/test_extraction_cache.py @@ -179,14 +179,44 @@ def test_lookup_age_seconds_is_monotonic_delta(monkeypatch: pytest.MonkeyPatch) assert result.age_seconds == pytest.approx(12.5) -def test_key_changes_when_local_date_changes() -> None: - """Date-relative extraction goals must miss when the date changes (midnight boundary).""" - assert _key(local_datetime="2026-04-10T00:00:00") != _key(local_datetime="2026-04-11T00:00:00") +def test_invalidate_key_drops_single_entry() -> None: + """Per-key eviction leaves sibling entries intact. Used by the retry self-heal path.""" + key_a = _key() + key_b = _key(current_url="https://example.com/other") + extraction_cache.store("wfr_1", key_a, {"v": "a"}) + extraction_cache.store("wfr_1", key_b, {"v": "b"}) + + removed = extraction_cache.invalidate_key("wfr_1", key_a) + assert removed is True + assert extraction_cache.lookup("wfr_1", key_a).hit is False + # Sibling entry must survive — invalidate is per-key, not per-run. + hit_b = extraction_cache.lookup("wfr_1", key_b) + assert hit_b.hit is True + assert hit_b.value == {"v": "b"} -def test_key_stable_across_same_date_different_times() -> None: - """Same date with different timestamps should produce the same key (truncated to date).""" - assert _key(local_datetime="2026-04-10T08:30:00.123456") == _key(local_datetime="2026-04-10T23:59:59.999999") +def test_invalidate_key_returns_false_for_unknown_key() -> None: + extraction_cache.store("wfr_1", _key(), {"v": "a"}) + assert extraction_cache.invalidate_key("wfr_1", "nonexistent-key") is False + + +def test_invalidate_key_returns_false_for_unknown_workflow_run() -> None: + assert extraction_cache.invalidate_key("wfr_missing", _key()) is False + + +def test_invalidate_key_returns_false_for_empty_workflow_run_id() -> None: + """Falsy workflow_run_id is a no-op, matching the store/lookup contract.""" + assert extraction_cache.invalidate_key(None, _key()) is False + assert extraction_cache.invalidate_key("", _key()) is False + + +def test_compute_cache_key_rejects_legacy_local_datetime_kwarg() -> None: + """``local_datetime`` was dropped from the signature (SKY-8873): content + hash alone defines cache identity, so callers that still try to pass it + must fail loudly rather than silently producing a key that happens to be + stable-for-the-wrong-reason.""" + with pytest.raises(TypeError): + extraction_cache.compute_cache_key(call_path="test", local_datetime="2026-04-10T00:00:00") # type: ignore[call-arg] def test_none_and_empty_string_produce_different_keys() -> None: