From 24971083be9f6087b91a9ce13f3887e8f8c3662d Mon Sep 17 00:00:00 2001 From: Andrew Neilson Date: Wed, 22 Apr 2026 15:14:42 -0700 Subject: [PATCH] fix(SKY-9163): replace 5-min wall-clock timeout with progress watchdog (#5601) --- skyvern/forge/sdk/copilot/tools.py | 364 ++++++++++++------ skyvern/forge/sdk/db/agent_db.py | 3 + skyvern/forge/sdk/db/repositories/tasks.py | 34 ++ tests/unit/test_copilot_watchdog.py | 414 +++++++++++++++++++++ 4 files changed, 707 insertions(+), 108 deletions(-) create mode 100644 tests/unit/test_copilot_watchdog.py diff --git a/skyvern/forge/sdk/copilot/tools.py b/skyvern/forge/sdk/copilot/tools.py index ddda69433..6ab907bb3 100644 --- a/skyvern/forge/sdk/copilot/tools.py +++ b/skyvern/forge/sdk/copilot/tools.py @@ -6,8 +6,10 @@ import asyncio import base64 import json import re +import time from collections import defaultdict -from typing import Any +from datetime import datetime +from typing import Any, Literal import structlog import yaml @@ -42,6 +44,7 @@ from skyvern.forge.sdk.workflow.models.parameter import ( WorkflowParameterType, ) from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus +from skyvern.schemas.workflows import BlockType from skyvern.webeye.navigation import is_skip_inner_retry_error LOG = structlog.get_logger() @@ -55,7 +58,24 @@ _FAILED_BLOCK_STATUSES: frozenset[str] = frozenset( } ) _DATA_PRODUCING_BLOCK_TYPES = frozenset({"EXTRACTION", "TEXT_PROMPT"}) -RUN_BLOCKS_DEBUG_TIMEOUT_SECONDS = 300 + +# Absolute upper bound on a single ``run_blocks`` tool invocation. Exists only +# as a last-resort trip wire for runaway loops — progressing runs should never +# approach this. The OpenAI Agents SDK wraps the tool in +# ``asyncio.wait_for(..., timeout=RUN_BLOCKS_SAFETY_CEILING_SECONDS)``; the +# inner poll loop leaves a 10 s headroom below this ceiling for orderly +# cleanup before the SDK cancels. +RUN_BLOCKS_SAFETY_CEILING_SECONDS = 1200 # 20 min + +# Primary exit condition: seconds of no observed progress across the combined +# run / block / step heartbeat. Sized to accommodate the slowest single LLM +# round-trip (~30-60 s in practice) with headroom; going tighter risks +# false-positives on healthy runs. +RUN_BLOCKS_STAGNATION_WINDOW_SECONDS = 90 + +# 5 s balances responsiveness (18 samples inside the stagnation window) against +# DB load (240 polls worst case at the safety ceiling). +RUN_BLOCKS_POLL_INTERVAL_SECONDS = 5.0 # Detached cleanup tasks held here so the garbage collector does not drop them # while they still have work to do, and so the "task exception was never @@ -315,18 +335,20 @@ def _tool_loop_error(ctx: AgentContext, tool_name: str) -> str | None: # to block-running tools so planning/metadata tools (update_workflow, # list_credentials, get_run_results) stay unaffected. if tool_name in _BLOCK_RUNNING_TOOLS: - # Reconciliation guard: a previous block-running tool call timed out - # without reconciling the run's outcome (post-drain status was - # ``canceled``, non-final, or unreadable). Block further block-running - # calls until ``get_run_results`` clears the flag. Prevents the LLM - # from auto-retrying a mutation block whose side effects may have - # landed. + # Reconciliation guard: the previous block-running tool call exited + # without a trustworthy terminal status for its workflow run (the + # watchdog's stagnation / ceiling / task_exit_unfinalized paths, or + # the SKY-9167 post-drain branch where the row read as ``canceled``, + # non-final, or unreadable). 
Block further block-running calls until + # ``get_run_results`` clears the flag — prevents the LLM from + # auto-retrying a mutation block whose side effects may already + # have landed. pending_run_id = getattr(ctx, "pending_reconciliation_run_id", None) if isinstance(pending_run_id, str) and pending_run_id: return ( - f"The previous block-running tool call timed out and run " - f"{pending_run_id} was not reconciled to a trustworthy terminal " - f'status. Call `get_run_results(workflow_run_id="{pending_run_id}")` ' + f"The previous block-running tool call for run {pending_run_id} " + f"ended without a trustworthy terminal status. " + f'Call `get_run_results(workflow_run_id="{pending_run_id}")` ' f"first, report the result to the user, then await user input " f"before running more blocks. This guard prevents duplicate " f"side effects on live sites." @@ -773,6 +795,155 @@ def _seed_for_frontier( return labels_to_execute, seed, frontier +# Watchdog exit reasons. ``success`` means the run reached a trustworthy +# terminal status inside the poll loop OR after the post-drain reconcile. +# The three non-success reasons share the reconcile path but produce distinct +# error messages: ``stagnation`` is the primary trip (no progress signals +# for ``RUN_BLOCKS_STAGNATION_WINDOW_SECONDS`` seconds), ``ceiling`` is the +# last-resort budget-exhausted branch, and ``task_exit_unfinalized`` is the +# rare race where ``execute_workflow`` naturally exits before writing a +# terminal row. +WatchdogExitReason = Literal["success", "stagnation", "ceiling", "task_exit_unfinalized"] + + +# Block types that legitimately execute long silent periods: one DB write on +# entry, work done without intermediate writes (sleep / LLM call / await human +# input), one write on finish. The watchdog can't distinguish these from +# "stuck", so any invocation that includes one disables stagnation for the +# whole run and relies on the safety ceiling alone. +_QUIET_BLOCK_TYPES: frozenset[str] = frozenset( + {BlockType.WAIT.value, BlockType.TEXT_PROMPT.value, BlockType.HUMAN_INTERACTION.value} +) + + +def _any_quiet_block_requested( + copilot_ctx: CopilotContext, + labels: list[str] | None, +) -> bool: + """Return True if any of ``labels`` refers to a block whose type is in + ``_QUIET_BLOCK_TYPES``. Reuses ``_blocks_by_label`` on the already-loaded + workflow definition — no DB call. + """ + if not labels: + return False + last_workflow = getattr(copilot_ctx, "last_workflow", None) + if last_workflow is None: + return False + by_label = _blocks_by_label(getattr(last_workflow, "workflow_definition", None)) + for label in labels: + block = by_label.get(label) + if block is None: + continue + block_type = getattr(block, "block_type", None) + if block_type is None: + continue + block_type_str = block_type.value if hasattr(block_type, "value") else str(block_type) + if block_type_str in _QUIET_BLOCK_TYPES: + return True + return False + + +async def _read_progress_sources( + ctx: CopilotContext, + workflow_run_id: str, +) -> tuple[WorkflowRun | None, datetime | None, datetime | None]: + """Read one ``workflow_runs`` row + the two progress aggregates needed + by the watchdog marker. Three cheap indexed queries; no row hydration + on the aggregate side. The two repo calls run concurrently — they open + separate async sessions and hit different tables. 
+ """ + + async def _read_timestamps() -> tuple[datetime | None, datetime | None]: + try: + return await app.DATABASE.tasks.get_workflow_run_progress_timestamps( + workflow_run_id=workflow_run_id, + organization_id=ctx.organization_id, + ) + except Exception: + LOG.warning( + "Workflow run progress timestamps read failed", + workflow_run_id=workflow_run_id, + exc_info=True, + ) + return None, None + + run, (step_ts, block_ts) = await asyncio.gather( + _safe_read_workflow_run(workflow_run_id, ctx.organization_id, context="watchdog-poll"), + _read_timestamps(), + ) + return run, step_ts, block_ts + + +def _progress_marker( + run: WorkflowRun | None, + step_ts: datetime | None, + block_ts: datetime | None, +) -> tuple[Any, ...]: + """Hashable scalar snapshot. Changes iff any observable progress has + occurred at the run, step, or block level since the last poll. Every + ``update_step`` fires during action execution (including incremental + token/cost accumulators at ``forge/agent.py:1449``), so + ``max(step.modified_at)`` is the per-LLM-call heartbeat. Non-task blocks + (CODE, TEXT_PROMPT) don't create step rows — ``max(workflow_run_block.modified_at)`` + covers that case. ``run.modified_at`` and ``run.status`` catch the + run-level transitions that happen outside those two tables. + """ + return ( + run.status if run else None, + run.modified_at if run is not None else None, + step_ts, + block_ts, + ) + + +async def _watchdog_error_message( + exit_reason: WatchdogExitReason, + ctx: AgentContext, + workflow_run_id: str, + run: WorkflowRun | None, +) -> str: + """Build the LLM-facing error string for a non-success watchdog exit. + + Every variant ends with the same reconciliation instruction so the agent + has a consistent next step: call ``get_run_results`` with the run_id to + resolve the outcome before running more blocks. The ``pending_reconciliation_run_id`` + guard in ``_tool_loop_error`` enforces that the agent actually does so. + + None of the variants contain "timed out" or retry-inviting phrasing — + that's the SKY-9163 regression we're fixing. + """ + if exit_reason == "stagnation": + body = ( + f"The run has not made progress for {RUN_BLOCKS_STAGNATION_WINDOW_SECONDS}s. " + f"No step, block, or workflow-run row updates were observed in that window. " + f"The page is most likely blocked by a captcha, popup, anti-bot challenge, " + f"hidden validation error, or an infinite-retry loop on an action the agent " + f"cannot detect is failing." + ) + elif exit_reason == "ceiling": + body = ( + f"The run exceeded the {RUN_BLOCKS_SAFETY_CEILING_SECONDS}s absolute ceiling " + f"while still showing progress. The workflow is too long to fit in a single " + f"tool invocation — split it into smaller block groups." + ) + else: # task_exit_unfinalized + last_observed = f"last observed status: {run.status}" if run is not None else "workflow run row was unreadable" + body = ( + f"The run ended but did not record a trustworthy terminal status in the " + f"cancellation grace window ({last_observed})." + ) + + message = ( + f"{body} Run ID: {workflow_run_id}. Outcome is uncertain. " + f"Do NOT re-invoke block-running tools in this session without first calling " + f"`get_run_results` with this workflow_run_id and reporting the result to the user." 
+ ) + current_url, _ = await _fallback_page_info(ctx) + if current_url: + message += f" Browser was on: {current_url}" + return message + + async def _run_blocks_and_collect_debug( params: dict[str, Any], ctx: CopilotContext, @@ -908,123 +1079,97 @@ async def _run_blocks_and_collect_debug( ) ) - # Internal poll budget strictly below the SDK timeout. The OpenAI Agents - # SDK wraps this tool in ``asyncio.wait_for(..., timeout=RUN_BLOCKS_DEBUG_TIMEOUT_SECONDS)`` - # (see openai-agents-python tool.py:invoke_function_tool). If our poll and - # the SDK timeout share the same budget, the SDK wins the race, cancels - # the tool coroutine, and our orderly cleanup branch below never runs — - # leaving ``run_task`` as an orphan that runs to natural completion. - # Leaving 10 s headroom ensures the orderly path fires first in the - # normal-slow case; the outer ``except asyncio.CancelledError`` covers - # the abnormal case where we get cancelled anyway. - max_poll = max(1, RUN_BLOCKS_DEBUG_TIMEOUT_SECONDS - 10) - poll_interval = 2.0 - elapsed = 0.0 - final_status = None - run: Any = None + # The OpenAI Agents SDK wraps this tool in + # ``asyncio.wait_for(..., timeout=RUN_BLOCKS_SAFETY_CEILING_SECONDS)``, so + # the inner budget leaves 10 s of headroom for the cancel-drain and + # post-drain reconcile to finish before the SDK's own cancel fires. + # + # Do NOT short-circuit on client disconnect: the agent loop runs to + # completion after the SSE stream is gone so its reply persists + # (SKY-8986); aborting mid-block would strand the run without debug + # output for the final chat message. + initial_run, initial_step_ts, initial_block_ts = await _read_progress_sources(ctx, workflow_run.workflow_run_id) + progress_marker = _progress_marker(initial_run, initial_step_ts, initial_block_ts) + last_progress_monotonic = time.monotonic() + started_monotonic = last_progress_monotonic + budget_seconds = max(1, RUN_BLOCKS_SAFETY_CEILING_SECONDS - 10) + final_status: str | None = None + run: Any = initial_run + exit_reason: WatchdogExitReason | None = None + # Quiet blocks (WAIT/TEXT_PROMPT/HUMAN_INTERACTION) legitimately have + # DB-silent periods; disable stagnation for any invocation that includes + # one. Safety ceiling still applies. + stagnation_enabled = not _any_quiet_block_requested(ctx, labels_to_execute) try: - while elapsed < max_poll: - await asyncio.sleep(poll_interval) - elapsed += poll_interval + while True: + await asyncio.sleep(RUN_BLOCKS_POLL_INTERVAL_SECONDS) - if run_task.done(): - run = await app.DATABASE.workflow_runs.get_workflow_run( - workflow_run_id=workflow_run.workflow_run_id, - organization_id=ctx.organization_id, - ) - if run and WorkflowRunStatus(run.status).is_final(): - final_status = run.status - break + run, step_ts, block_ts = await _read_progress_sources(ctx, workflow_run.workflow_run_id) - # Deliberately do NOT short-circuit on client disconnect here. - # The agent loop is allowed to run to completion after the SSE - # stream is gone (see SKY-8986) so its reply can be persisted; - # aborting an in-flight block execution would leave the - # workflow run in a limbo state and the agent would have no - # debug output to summarize in the final chat message. 
- - run = await app.DATABASE.workflow_runs.get_workflow_run( - workflow_run_id=workflow_run.workflow_run_id, - organization_id=ctx.organization_id, - ) if run and WorkflowRunStatus(run.status).is_final(): final_status = run.status + exit_reason = "success" break - if final_status is None: - # Read the row BEFORE the cancel helper runs. The poll interval is - # 2 s, so a legitimate self-finalize (``canceled`` from a user/block - # cancel, or any other terminal) can land in the narrow gap between - # the last poll and entering this branch. Trusting the pre-cancel - # status preserves that signal without ambiguity — nothing we do - # could have written it. Any post-cancel re-read has to exclude - # ``canceled`` because the helper itself writes synthetic - # ``canceled`` (see ``WorkflowRunStatus.is_final_excluding_canceled``). + if run_task.done(): + # Row not terminal yet — shared reconcile path below flips + # most of these back to success after post-drain reread. + exit_reason = "task_exit_unfinalized" + break + + now = time.monotonic() + new_marker = _progress_marker(run, step_ts, block_ts) + # A run in ``paused`` status (e.g. HumanInteractionBlock) is a + # user-driven wait, not stagnation — never trip. + is_paused = run is not None and run.status == WorkflowRunStatus.paused.value + stagnation_active = stagnation_enabled and not is_paused + + if new_marker != progress_marker: + progress_marker = new_marker + last_progress_monotonic = now + elif stagnation_active and now - last_progress_monotonic >= RUN_BLOCKS_STAGNATION_WINDOW_SECONDS: + exit_reason = "stagnation" + break + + if now - started_monotonic >= budget_seconds: + exit_reason = "ceiling" + break + + if exit_reason is not None and exit_reason != "success": + # Pre-cancel read first: a legitimate self-finalize (user/block + # cancel, or any terminal the run wrote itself) can land between + # the last poll and here, and trusting it avoids the + # synthetic-``canceled`` ambiguity that the post-drain reread + # has to exclude. Then cancel + reread + + # ``_trusted_post_drain_status`` applies SKY-9167's success-race + # recovery uniformly to all three non-success exit reasons. pre_cancel_run = await _safe_read_workflow_run( workflow_run.workflow_run_id, ctx.organization_id, context="pre-cancel" ) - if pre_cancel_run is not None and WorkflowRunStatus(pre_cancel_run.status).is_final(): final_status = pre_cancel_run.status run = pre_cancel_run + exit_reason = "success" else: await _cancel_run_task_if_not_final(run_task, workflow_run.workflow_run_id) - - # The cancel helper waits for ``execute_workflow``'s shielded - # ``_finalize_workflow_run_status`` to run, which writes the real - # terminal status when the run completed during the drain window. - # Re-read once before declaring timeout so a run that finished - # in those last few seconds is not mis-reported as a failure the - # LLM will retry against a live site. run = await _safe_read_workflow_run( workflow_run.workflow_run_id, ctx.organization_id, context="post-drain" ) trusted = _trusted_post_drain_status(run) if trusted is not None: final_status = trusted - else: - # Three distinct cases produce a ``None`` from - # ``_trusted_post_drain_status`` — DB read failed, row - # non-final, or ``canceled`` (legitimate vs synthetic - # indistinguishable at this read). Body differs so logs - # are attributable; the LLM-facing instruction is - # identical: outcome is uncertain, reconcile via - # ``get_run_results`` before re-invoking block-running - # tools. 
The reconciliation guard below enforces that. - if run is None: - timeout_body = ( - f"Block execution exceeded the {max_poll}s tool budget and the workflow " - f"run row could not be re-read after cancellation (see 'Workflow run " - f"re-read failed' log)." - ) - elif run.status == WorkflowRunStatus.canceled.value: - timeout_body = ( - f"Block execution was cancelled after exceeding the {max_poll}s tool " - f"budget. Side effects of any actions already taken may have landed." - ) - else: - timeout_body = ( - f"Block execution exceeded the {max_poll}s tool budget and the workflow " - f"run did not reach a terminal status within the cancellation grace " - f"window (last observed status: {run.status})." - ) - timeout_msg = ( - f"{timeout_body} Run ID: {workflow_run.workflow_run_id}. " - f"Outcome is uncertain. Do NOT re-invoke block-running tools in this " - f"session without first calling `get_run_results` with this " - f"workflow_run_id and reporting the result to the user." - ) - current_url, _ = await _fallback_page_info(ctx) - if current_url: - timeout_msg += f" Browser was on: {current_url}" - # Turn-scoped reconciliation guard. Clearing requires an - # explicit ``get_run_results`` read that resolves to this - # run_id with a status that passes - # ``WorkflowRunStatus.is_final_excluding_canceled`` (see - # clearing logic in ``get_run_results_tool``). - ctx.pending_reconciliation_run_id = workflow_run.workflow_run_id - return {"ok": False, "error": timeout_msg} + exit_reason = "success" + + if exit_reason != "success": + # Turn-scoped reconciliation guard — cleared only by a + # ``get_run_results`` call that resolves this run_id to an + # ``is_final_excluding_canceled`` status + # (``_maybe_clear_reconciliation_flag``). + ctx.pending_reconciliation_run_id = workflow_run.workflow_run_id + assert exit_reason is not None # narrows for mypy; outer check excludes "success" but not None + error_msg = await _watchdog_error_message(exit_reason, ctx, workflow_run.workflow_run_id, run) + return {"ok": False, "error": error_msg} except asyncio.CancelledError: # The SDK's @function_tool(timeout=...) cancelled us mid-poll. Shield # the cleanup so the parent cancellation can't interrupt it mid-await. @@ -1071,6 +1216,9 @@ async def _run_blocks_and_collect_debug( await _attach_action_traces(blocks, results, ctx.organization_id) + # final_status is guaranteed set here: every non-success exit returns + # above, and the success path always populates final_status. 
+ assert final_status is not None run_ok = WorkflowRunStatus(final_status) == WorkflowRunStatus.completed action_trace_summary: list[str] = [] @@ -1765,7 +1913,7 @@ def _run_blocks_span_data( @function_tool( name_override="run_blocks_and_collect_debug", - timeout=RUN_BLOCKS_DEBUG_TIMEOUT_SECONDS, + timeout=RUN_BLOCKS_SAFETY_CEILING_SECONDS, strict_mode=False, ) async def run_blocks_tool( @@ -1851,7 +1999,7 @@ async def get_run_results_tool( @function_tool( name_override="update_and_run_blocks", - timeout=RUN_BLOCKS_DEBUG_TIMEOUT_SECONDS, + timeout=RUN_BLOCKS_SAFETY_CEILING_SECONDS, strict_mode=False, ) async def update_and_run_blocks_tool( diff --git a/skyvern/forge/sdk/db/agent_db.py b/skyvern/forge/sdk/db/agent_db.py index 304dee7b9..fa1402690 100644 --- a/skyvern/forge/sdk/db/agent_db.py +++ b/skyvern/forge/sdk/db/agent_db.py @@ -199,6 +199,9 @@ class AgentDB(BaseAlchemyDB): async def get_total_unique_step_order_count_by_task_ids(self, *args: Any, **kwargs: Any) -> Any: return await self.tasks.get_total_unique_step_order_count_by_task_ids(*args, **kwargs) + async def get_workflow_run_progress_timestamps(self, *args: Any, **kwargs: Any) -> Any: + return await self.tasks.get_workflow_run_progress_timestamps(*args, **kwargs) + async def get_task_step_models(self, *args: Any, **kwargs: Any) -> Any: return await self.tasks.get_task_step_models(*args, **kwargs) diff --git a/skyvern/forge/sdk/db/repositories/tasks.py b/skyvern/forge/sdk/db/repositories/tasks.py index 2be76d725..4504f872d 100644 --- a/skyvern/forge/sdk/db/repositories/tasks.py +++ b/skyvern/forge/sdk/db/repositories/tasks.py @@ -17,6 +17,7 @@ from skyvern.forge.sdk.db.models import ( StepModel, TaskModel, TaskRunModel, + WorkflowRunBlockModel, WorkflowRunModel, ) from skyvern.forge.sdk.db.utils import convert_to_step, convert_to_task, hydrate_action, serialize_proxy_location @@ -228,6 +229,39 @@ class TasksRepository(BaseRepository): row = (await session.execute(query)).one() return row.total, row.completed + @db_operation("get_workflow_run_progress_timestamps") + async def get_workflow_run_progress_timestamps( + self, + workflow_run_id: str, + organization_id: str | None = None, + ) -> tuple[datetime | None, datetime | None]: + """Return ``(max(step.modified_at), max(workflow_run_block.modified_at))`` + for a given workflow run. Both values are scalar aggregates — no row + hydration — and are designed for the copilot watchdog poll path where + the only question is "has anything changed since the last poll?". + + Step updates are the per-LLM-call heartbeat (status transitions plus + incremental token/cost accumulators — every ``update_step`` call bumps + ``StepModel.modified_at``). Block updates cover non-task block types + (CODE, TEXT_PROMPT) that never create a task row. Together the two + aggregates cover every kind of block the copilot can run. 
+ """ + async with self.Session() as session: + step_stmt = ( + select(func.max(StepModel.modified_at)) + .join(TaskModel, StepModel.task_id == TaskModel.task_id) + .where(TaskModel.workflow_run_id == workflow_run_id) + .where(StepModel.organization_id == organization_id) + ) + block_stmt = ( + select(func.max(WorkflowRunBlockModel.modified_at)) + .where(WorkflowRunBlockModel.workflow_run_id == workflow_run_id) + .where(WorkflowRunBlockModel.organization_id == organization_id) + ) + step_ts = (await session.execute(step_stmt)).scalar_one_or_none() + block_ts = (await session.execute(block_stmt)).scalar_one_or_none() + return step_ts, block_ts + @db_operation("get_total_unique_step_order_count_by_task_ids") async def get_total_unique_step_order_count_by_task_ids( self, diff --git a/tests/unit/test_copilot_watchdog.py b/tests/unit/test_copilot_watchdog.py new file mode 100644 index 000000000..7dc9dee91 --- /dev/null +++ b/tests/unit/test_copilot_watchdog.py @@ -0,0 +1,414 @@ +"""Tests for the SKY-9163 progress-based watchdog inside +``_run_blocks_and_collect_debug``. + +The full function does too much setup (prepare_workflow, execute_workflow, +parameter-binding invariants) to unit-test end-to-end cheaply. Instead we +target the isolated watchdog surface: + +- ``_progress_marker`` — marker stability and field-change sensitivity. +- ``_read_progress_sources`` — correct delegation + graceful handling of + DB failures. +- ``_watchdog_error_message`` — the regression-guard strings (no + "timed out", reconciliation-instruction, per-reason body). + +Those three are where the SKY-9163 correctness properties live: + +1. A stale marker must be exactly equal across two polls when nothing + changed in the DB (otherwise the watchdog would false-reset on every + poll, making stagnation detection impossible). +2. Any change in ``run.status`` / ``run.modified_at`` / ``step_ts`` / + ``block_ts`` must produce a new marker (otherwise the watchdog would + false-trip on a progressing run). +3. The error messages must not read as retry-invites — that was the + original bug. "timed out" / "likely stuck repeating failing actions" + are the exact phrases the LLM used to read as "try again". +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from types import SimpleNamespace +from typing import Any + +import pytest + +from skyvern.forge.sdk.copilot.tools import ( + RUN_BLOCKS_SAFETY_CEILING_SECONDS, + RUN_BLOCKS_STAGNATION_WINDOW_SECONDS, + _any_quiet_block_requested, + _progress_marker, + _read_progress_sources, + _tool_loop_error, + _watchdog_error_message, +) + + +def _fake_run(status: str = "running", modified_at: datetime | None = None) -> Any: + """A bare-minimum stand-in for ``WorkflowRun`` — the marker only reads + ``.status`` and ``.modified_at``. + """ + return SimpleNamespace( + status=status, + modified_at=modified_at or datetime(2026, 4, 21, 12, 0, 0, tzinfo=timezone.utc), + browser_session_id=None, + ) + + +# --------------------------------------------------------------------------- +# _progress_marker: stability + per-field sensitivity. +# --------------------------------------------------------------------------- + + +def test_progress_marker_stable_for_identical_inputs() -> None: + """If the DB reports identical values on two successive polls, the marker + must compare equal. 
A marker that drifts on repeated reads would make the + stagnation window unreachable.""" + run = _fake_run() + step_ts = datetime(2026, 4, 21, 12, 0, 30, tzinfo=timezone.utc) + block_ts = datetime(2026, 4, 21, 12, 0, 31, tzinfo=timezone.utc) + + m1 = _progress_marker(run, step_ts, block_ts) + m2 = _progress_marker(run, step_ts, block_ts) + + assert m1 == m2 + + +def test_progress_marker_changes_on_run_status() -> None: + run1 = _fake_run(status="running") + run2 = _fake_run(status="queued") + assert _progress_marker(run1, None, None) != _progress_marker(run2, None, None) + + +def test_progress_marker_changes_on_run_modified_at() -> None: + run1 = _fake_run(modified_at=datetime(2026, 4, 21, 12, 0, 0, tzinfo=timezone.utc)) + run2 = _fake_run(modified_at=datetime(2026, 4, 21, 12, 0, 1, tzinfo=timezone.utc)) + assert _progress_marker(run1, None, None) != _progress_marker(run2, None, None) + + +def test_progress_marker_changes_on_step_ts() -> None: + run = _fake_run() + t1 = datetime(2026, 4, 21, 12, 0, 0, tzinfo=timezone.utc) + t2 = datetime(2026, 4, 21, 12, 0, 5, tzinfo=timezone.utc) + assert _progress_marker(run, t1, None) != _progress_marker(run, t2, None) + + +def test_progress_marker_changes_on_block_ts() -> None: + run = _fake_run() + t1 = datetime(2026, 4, 21, 12, 0, 0, tzinfo=timezone.utc) + t2 = datetime(2026, 4, 21, 12, 0, 5, tzinfo=timezone.utc) + assert _progress_marker(run, None, t1) != _progress_marker(run, None, t2) + + +def test_progress_marker_tolerates_none_run() -> None: + """A transient DB read failure can return ``run=None``. The marker must + still be hashable and comparable.""" + m_none = _progress_marker(None, None, None) + assert m_none == (None, None, None, None) + + # Two consecutive failed reads produce equal markers → stagnation clock + # keeps ticking (the right behavior when we can't confirm progress). + assert _progress_marker(None, None, None) == _progress_marker(None, None, None) + + +# --------------------------------------------------------------------------- +# _read_progress_sources: delegation + graceful DB-failure handling. 
+# --------------------------------------------------------------------------- + + +class _FakeTasksRepo: + def __init__( + self, + *, + step_ts: datetime | None = None, + block_ts: datetime | None = None, + raise_on_call: Exception | None = None, + ) -> None: + self.step_ts = step_ts + self.block_ts = block_ts + self.raise_on_call = raise_on_call + self.call_count = 0 + + async def get_workflow_run_progress_timestamps( + self, + *, + workflow_run_id: str, + organization_id: str | None = None, + ) -> tuple[datetime | None, datetime | None]: + self.call_count += 1 + if self.raise_on_call is not None: + raise self.raise_on_call + return self.step_ts, self.block_ts + + +class _FakeWorkflowRunsRepo: + def __init__(self, run: Any | None = None, raise_on_call: Exception | None = None) -> None: + self.run = run + self.raise_on_call = raise_on_call + + async def get_workflow_run( + self, + *, + workflow_run_id: str, + organization_id: str, + ) -> Any: + if self.raise_on_call is not None: + raise self.raise_on_call + return self.run + + +class _FakeDatabase: + def __init__(self, tasks: _FakeTasksRepo, workflow_runs: _FakeWorkflowRunsRepo) -> None: + self.tasks = tasks + self.workflow_runs = workflow_runs + + +class _FakeCtx: + organization_id = "o_test" + + +@pytest.mark.asyncio +async def test_read_progress_sources_returns_run_and_timestamps( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from skyvern.forge import app as forge_app + + run = _fake_run() + step_ts = datetime(2026, 4, 21, 12, 0, 10, tzinfo=timezone.utc) + block_ts = datetime(2026, 4, 21, 12, 0, 11, tzinfo=timezone.utc) + db = _FakeDatabase( + tasks=_FakeTasksRepo(step_ts=step_ts, block_ts=block_ts), + workflow_runs=_FakeWorkflowRunsRepo(run=run), + ) + monkeypatch.setattr(forge_app, "DATABASE", db) + + read_run, read_step_ts, read_block_ts = await _read_progress_sources(_FakeCtx(), "wr_1") + + assert read_run is run + assert read_step_ts == step_ts + assert read_block_ts == block_ts + + +@pytest.mark.asyncio +async def test_read_progress_sources_swallows_workflow_run_errors( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A DB read failure on the workflow-run row must not crash the watchdog — + ``_safe_read_workflow_run`` returns None and the poll continues.""" + from skyvern.forge import app as forge_app + + db = _FakeDatabase( + tasks=_FakeTasksRepo(step_ts=None, block_ts=None), + workflow_runs=_FakeWorkflowRunsRepo(raise_on_call=RuntimeError("DB flake")), + ) + monkeypatch.setattr(forge_app, "DATABASE", db) + + read_run, read_step_ts, read_block_ts = await _read_progress_sources(_FakeCtx(), "wr_1") + + assert read_run is None + assert read_step_ts is None + assert read_block_ts is None + + +@pytest.mark.asyncio +async def test_read_progress_sources_swallows_progress_timestamps_errors( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A DB read failure on the aggregate timestamps must also not crash — the + caller still gets the run (if readable) and ``None`` for the timestamps. 
+ """ + from skyvern.forge import app as forge_app + + run = _fake_run() + db = _FakeDatabase( + tasks=_FakeTasksRepo(raise_on_call=RuntimeError("aggregate query failed")), + workflow_runs=_FakeWorkflowRunsRepo(run=run), + ) + monkeypatch.setattr(forge_app, "DATABASE", db) + + read_run, read_step_ts, read_block_ts = await _read_progress_sources(_FakeCtx(), "wr_1") + + assert read_run is run + assert read_step_ts is None + assert read_block_ts is None + + +# --------------------------------------------------------------------------- +# _watchdog_error_message: the regression-guard strings. +# --------------------------------------------------------------------------- + + +class _ErrorCtx: + """Minimal ``AgentContext`` stand-in for the error-message path — only + ``browser_session_id`` is read, and only by ``_fallback_page_info``.""" + + organization_id = "o_test" + browser_session_id = None + + +@pytest.mark.asyncio +async def test_stagnation_error_message_does_not_invite_retry() -> None: + """The exact SKY-9163 bug: the old copy said "likely stuck repeating + failing actions" which the LLM read as "try again". The stagnation + message must explicitly discourage retry.""" + msg = await _watchdog_error_message("stagnation", _ErrorCtx(), "wr_test", _fake_run()) + + assert "timed out" not in msg.lower() + assert "likely stuck repeating" not in msg.lower() + assert str(RUN_BLOCKS_STAGNATION_WINDOW_SECONDS) in msg + assert "Run ID: wr_test" in msg + assert "get_run_results" in msg + assert "Do NOT re-invoke block-running tools" in msg + + +@pytest.mark.asyncio +async def test_ceiling_error_message_advises_splitting() -> None: + """The ceiling path is rare (a runaway run that keeps making progress + past 20 min). Its error must tell the LLM to split the workflow, not + retry — a longer run won't fit either.""" + msg = await _watchdog_error_message("ceiling", _ErrorCtx(), "wr_test", _fake_run()) + + assert "timed out" not in msg.lower() + assert str(RUN_BLOCKS_SAFETY_CEILING_SECONDS) in msg + assert "split" in msg.lower() + assert "Run ID: wr_test" in msg + assert "get_run_results" in msg + + +@pytest.mark.asyncio +async def test_task_exit_unfinalized_message_reports_last_observed_status() -> None: + """When ``execute_workflow`` naturally exits but the row isn't terminal, + the error must name the last-observed status so the LLM has a concrete + anchor for the follow-up ``get_run_results`` call.""" + run = _fake_run(status="running") + msg = await _watchdog_error_message("task_exit_unfinalized", _ErrorCtx(), "wr_test", run) + + assert "timed out" not in msg.lower() + assert "last observed status: running" in msg + assert "Run ID: wr_test" in msg + assert "get_run_results" in msg + + +@pytest.mark.asyncio +async def test_task_exit_unfinalized_message_tolerates_unreadable_run() -> None: + """If the post-drain reread also fails (``run is None``), the message must + still be well-formed and mention the unreadable state rather than + crashing on a ``None.status`` access.""" + msg = await _watchdog_error_message("task_exit_unfinalized", _ErrorCtx(), "wr_test", None) + + assert "unreadable" in msg.lower() + assert "Run ID: wr_test" in msg + assert "get_run_results" in msg + + +# --------------------------------------------------------------------------- +# _any_quiet_block_requested: stagnation bypass for block types that +# legitimately do long-silent work. 
Without this bypass, a WAIT block with +# wait_sec >= 90, a slow TEXT_PROMPT LLM call, or a HumanInteractionBlock +# pausing for user input would be falsely reported as stagnation and the +# tool would cancel a healthy run. +# --------------------------------------------------------------------------- + + +def _workflow_with_block_types(*type_value_label_pairs: tuple[str, str]) -> Any: + """Build a minimal `last_workflow`-shaped object that + ``_any_quiet_block_requested`` can walk. Each pair is + ``(block_type_value, label)`` — e.g. ``("wait", "pause1")``. + """ + blocks = [ + SimpleNamespace(label=label, block_type=SimpleNamespace(value=block_type_value)) + for block_type_value, label in type_value_label_pairs + ] + definition = SimpleNamespace(blocks=blocks) + return SimpleNamespace(workflow_definition=definition) + + +def test_any_quiet_block_requested_wait() -> None: + ctx = SimpleNamespace(last_workflow=_workflow_with_block_types(("wait", "pause1"))) + assert _any_quiet_block_requested(ctx, ["pause1"]) is True + + +def test_any_quiet_block_requested_text_prompt() -> None: + ctx = SimpleNamespace(last_workflow=_workflow_with_block_types(("text_prompt", "prompt1"))) + assert _any_quiet_block_requested(ctx, ["prompt1"]) is True + + +def test_any_quiet_block_requested_human_interaction() -> None: + ctx = SimpleNamespace(last_workflow=_workflow_with_block_types(("human_interaction", "wait_for_user"))) + assert _any_quiet_block_requested(ctx, ["wait_for_user"]) is True + + +def test_any_quiet_block_requested_mixed_requested_labels_match_quiet_one() -> None: + """When multiple blocks are requested, having any one quiet type is + enough to disable stagnation for the whole invocation.""" + ctx = SimpleNamespace( + last_workflow=_workflow_with_block_types( + ("navigation", "nav1"), + ("wait", "pause1"), + ("extraction", "extract1"), + ) + ) + assert _any_quiet_block_requested(ctx, ["nav1", "pause1", "extract1"]) is True + + +def test_any_quiet_block_requested_only_task_blocks_returns_false() -> None: + """The normal case: task-heavy workflows produce regular step writes. + Stagnation is safe to enable.""" + ctx = SimpleNamespace( + last_workflow=_workflow_with_block_types( + ("navigation", "nav1"), + ("extraction", "extract1"), + ) + ) + assert _any_quiet_block_requested(ctx, ["nav1", "extract1"]) is False + + +def test_any_quiet_block_requested_label_not_in_requested_ignored() -> None: + """A WAIT block defined in the workflow but not requested in this + invocation must not disable stagnation.""" + ctx = SimpleNamespace( + last_workflow=_workflow_with_block_types( + ("wait", "not_requested_pause"), + ("navigation", "requested_nav"), + ) + ) + assert _any_quiet_block_requested(ctx, ["requested_nav"]) is False + + +def test_any_quiet_block_requested_no_workflow_returns_false() -> None: + """Defensive: no workflow loaded → no bypass. The loop will use its + default stagnation behavior (safe for the common case).""" + ctx = SimpleNamespace(last_workflow=None) + assert _any_quiet_block_requested(ctx, ["anything"]) is False + + +def test_any_quiet_block_requested_empty_labels_returns_false() -> None: + ctx = SimpleNamespace(last_workflow=_workflow_with_block_types(("wait", "pause1"))) + assert _any_quiet_block_requested(ctx, None) is False + assert _any_quiet_block_requested(ctx, []) is False + + +# --------------------------------------------------------------------------- +# Reconciliation guard message: regression guard on "timed out" phrasing. 
+# The guard itself is tested in test_copilot_cancel_helpers.py; this test is +# specifically about the LLM-facing STRING, which previously said "timed out" +# and read as a retry-invite when combined with LLM priors. +# --------------------------------------------------------------------------- + + +def test_reconciliation_guard_message_does_not_say_timed_out() -> None: + """The reconciliation guard message is what the LLM reads on the *next* + block-running tool call after a watchdog exit. The stagnation/ceiling/ + unfinalized error messages all purge "timed out"; the guard message must + match, otherwise the phrase leaks right back in and the regression is + only cosmetic.""" + ctx = SimpleNamespace( + consecutive_tool_tracker=[], + repeated_action_fingerprint_streak_count=0, + last_test_non_retriable_nav_error=None, + pending_reconciliation_run_id="wr_guarded", + ) + msg = _tool_loop_error(ctx, "update_and_run_blocks") + assert isinstance(msg, str) + assert "timed out" not in msg.lower() + assert "wr_guarded" in msg + assert "get_run_results" in msg
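
For reference, the watchdog added to `_run_blocks_and_collect_debug` reduces to the
minimal, self-contained sketch below. It is illustrative only, not the patched code:
`read_marker` and `is_final` are hypothetical stand-ins for `_read_progress_sources`
plus `_progress_marker` and the `WorkflowRunStatus(...).is_final()` check, and the
`run_task.done()`, paused-run, quiet-block, and cancel/reconcile paths are elided.

    import asyncio
    import time

    STAGNATION_WINDOW_SECONDS = 90   # mirrors RUN_BLOCKS_STAGNATION_WINDOW_SECONDS
    SAFETY_CEILING_SECONDS = 1200    # mirrors RUN_BLOCKS_SAFETY_CEILING_SECONDS
    POLL_INTERVAL_SECONDS = 5.0      # mirrors RUN_BLOCKS_POLL_INTERVAL_SECONDS

    async def watchdog(read_marker, is_final, stagnation_enabled=True):
        """Poll until the run finishes, stalls, or exhausts the ceiling.

        read_marker: async () -> hashable progress snapshot; any change
        between two polls counts as progress. is_final: snapshot -> bool.
        Returns "success", "stagnation", or "ceiling".
        """
        marker = await read_marker()
        started = last_progress = time.monotonic()
        budget = max(1, SAFETY_CEILING_SECONDS - 10)  # headroom under the SDK timeout
        while True:
            await asyncio.sleep(POLL_INTERVAL_SECONDS)
            new_marker = await read_marker()
            if is_final(new_marker):
                return "success"
            now = time.monotonic()
            if new_marker != marker:
                # Progress observed at some level: reset the stagnation clock.
                marker, last_progress = new_marker, now
            elif stagnation_enabled and now - last_progress >= STAGNATION_WINDOW_SECONDS:
                return "stagnation"  # primary exit: no progress for the whole window
            if now - started >= budget:
                return "ceiling"  # last-resort trip wire for runaway-but-progressing runs

    async def _frozen_reader():
        # A reader whose snapshot never changes, e.g. a run stuck behind a captcha.
        return ("running", None, None, None)

    # asyncio.run(watchdog(_frozen_reader, lambda m: m[0] == "completed"))
    # -> "stagnation" after roughly 90 s of unchanged markers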