fix(SKY-9163): replace 5-min wall-clock timeout with progress watchdog (#5601)

This commit is contained in:
Andrew Neilson 2026-04-22 15:14:42 -07:00 committed by GitHub
parent 733dc5f779
commit 24971083be
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 707 additions and 108 deletions

View file

@ -6,8 +6,10 @@ import asyncio
import base64
import json
import re
import time
from collections import defaultdict
from typing import Any
from datetime import datetime
from typing import Any, Literal
import structlog
import yaml
@ -42,6 +44,7 @@ from skyvern.forge.sdk.workflow.models.parameter import (
WorkflowParameterType,
)
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus
from skyvern.schemas.workflows import BlockType
from skyvern.webeye.navigation import is_skip_inner_retry_error
LOG = structlog.get_logger()
@ -55,7 +58,24 @@ _FAILED_BLOCK_STATUSES: frozenset[str] = frozenset(
}
)
_DATA_PRODUCING_BLOCK_TYPES = frozenset({"EXTRACTION", "TEXT_PROMPT"})
RUN_BLOCKS_DEBUG_TIMEOUT_SECONDS = 300
# Absolute upper bound on a single ``run_blocks`` tool invocation. Exists only
# as a last-resort trip wire for runaway loops — progressing runs should never
# approach this. The OpenAI Agents SDK wraps the tool in
# ``asyncio.wait_for(..., timeout=RUN_BLOCKS_SAFETY_CEILING_SECONDS)``; the
# inner poll loop leaves a 10 s headroom below this ceiling for orderly
# cleanup before the SDK cancels.
RUN_BLOCKS_SAFETY_CEILING_SECONDS = 1200 # 20 min
# Primary exit condition: seconds of no observed progress across the combined
# run / block / step heartbeat. Sized to accommodate the slowest single LLM
# round-trip (~30-60 s in practice) with headroom; going tighter risks
# false-positives on healthy runs.
RUN_BLOCKS_STAGNATION_WINDOW_SECONDS = 90
# 5 s balances responsiveness (18 samples inside the stagnation window) against
# DB load (240 polls worst case at the safety ceiling).
RUN_BLOCKS_POLL_INTERVAL_SECONDS = 5.0
# Detached cleanup tasks held here so the garbage collector does not drop them
# while they still have work to do, and so the "task exception was never
@ -315,18 +335,20 @@ def _tool_loop_error(ctx: AgentContext, tool_name: str) -> str | None:
# to block-running tools so planning/metadata tools (update_workflow,
# list_credentials, get_run_results) stay unaffected.
if tool_name in _BLOCK_RUNNING_TOOLS:
# Reconciliation guard: a previous block-running tool call timed out
# without reconciling the run's outcome (post-drain status was
# ``canceled``, non-final, or unreadable). Block further block-running
# calls until ``get_run_results`` clears the flag. Prevents the LLM
# from auto-retrying a mutation block whose side effects may have
# landed.
# Reconciliation guard: the previous block-running tool call exited
# without a trustworthy terminal status for its workflow run (the
# watchdog's stagnation / ceiling / task_exit_unfinalized paths, or
# the SKY-9167 post-drain branch where the row read as ``canceled``,
# non-final, or unreadable). Block further block-running calls until
# ``get_run_results`` clears the flag — prevents the LLM from
# auto-retrying a mutation block whose side effects may already
# have landed.
pending_run_id = getattr(ctx, "pending_reconciliation_run_id", None)
if isinstance(pending_run_id, str) and pending_run_id:
return (
f"The previous block-running tool call timed out and run "
f"{pending_run_id} was not reconciled to a trustworthy terminal "
f'status. Call `get_run_results(workflow_run_id="{pending_run_id}")` '
f"The previous block-running tool call for run {pending_run_id} "
f"ended without a trustworthy terminal status. "
f'Call `get_run_results(workflow_run_id="{pending_run_id}")` '
f"first, report the result to the user, then await user input "
f"before running more blocks. This guard prevents duplicate "
f"side effects on live sites."
@ -773,6 +795,155 @@ def _seed_for_frontier(
return labels_to_execute, seed, frontier
# Watchdog exit reasons. ``success`` means the run reached a trustworthy
# terminal status inside the poll loop OR after the post-drain reconcile.
# The three non-success reasons share the reconcile path but produce distinct
# error messages: ``stagnation`` is the primary trip (no progress signals
# for ``RUN_BLOCKS_STAGNATION_WINDOW_SECONDS`` seconds), ``ceiling`` is the
# last-resort budget-exhausted branch, and ``task_exit_unfinalized`` is the
# rare race where ``execute_workflow`` naturally exits before writing a
# terminal row.
WatchdogExitReason = Literal["success", "stagnation", "ceiling", "task_exit_unfinalized"]

# Block types that legitimately execute long silent periods: one DB write on
# entry, work done without intermediate writes (sleep / LLM call / await human
# input), one write on finish. The watchdog can't distinguish these from
# "stuck", so any invocation that includes one disables stagnation for the
# whole run and relies on the safety ceiling alone.
# NOTE: values are compared as plain strings against block-type values (see
# ``_any_quiet_block_requested``), hence the ``.value`` accessors here.
_QUIET_BLOCK_TYPES: frozenset[str] = frozenset(
    {BlockType.WAIT.value, BlockType.TEXT_PROMPT.value, BlockType.HUMAN_INTERACTION.value}
)
def _any_quiet_block_requested(
    copilot_ctx: CopilotContext,
    labels: list[str] | None,
) -> bool:
    """Return True if any of ``labels`` refers to a block whose type is in
    ``_QUIET_BLOCK_TYPES``. Reuses ``_blocks_by_label`` on the already-loaded
    workflow definition — no DB call.
    """
    if not labels:
        return False
    workflow = getattr(copilot_ctx, "last_workflow", None)
    if workflow is None:
        return False
    blocks_by_label = _blocks_by_label(getattr(workflow, "workflow_definition", None))
    for requested_label in labels:
        candidate = blocks_by_label.get(requested_label)
        if candidate is None:
            continue
        raw_type = getattr(candidate, "block_type", None)
        if raw_type is None:
            continue
        # Block types may arrive as enum members or plain strings; normalize
        # to the string value before the membership test.
        normalized = raw_type.value if hasattr(raw_type, "value") else str(raw_type)
        if normalized in _QUIET_BLOCK_TYPES:
            return True
    return False
async def _read_progress_sources(
    ctx: CopilotContext,
    workflow_run_id: str,
) -> tuple[WorkflowRun | None, datetime | None, datetime | None]:
    """Read one ``workflow_runs`` row plus the two progress aggregates the
    watchdog marker needs. Three cheap indexed queries; no row hydration on
    the aggregate side. The two repo calls run concurrently — they open
    separate async sessions and hit different tables.
    """

    async def _aggregate_timestamps() -> tuple[datetime | None, datetime | None]:
        # A flaky aggregate read must not crash the watchdog poll loop;
        # degrade to (None, None) and let the next poll try again.
        try:
            return await app.DATABASE.tasks.get_workflow_run_progress_timestamps(
                workflow_run_id=workflow_run_id,
                organization_id=ctx.organization_id,
            )
        except Exception:
            LOG.warning(
                "Workflow run progress timestamps read failed",
                workflow_run_id=workflow_run_id,
                exc_info=True,
            )
            return None, None

    run_row, timestamps = await asyncio.gather(
        _safe_read_workflow_run(workflow_run_id, ctx.organization_id, context="watchdog-poll"),
        _aggregate_timestamps(),
    )
    step_ts, block_ts = timestamps
    return run_row, step_ts, block_ts
def _progress_marker(
run: WorkflowRun | None,
step_ts: datetime | None,
block_ts: datetime | None,
) -> tuple[Any, ...]:
"""Hashable scalar snapshot. Changes iff any observable progress has
occurred at the run, step, or block level since the last poll. Every
``update_step`` fires during action execution (including incremental
token/cost accumulators at ``forge/agent.py:1449``), so
``max(step.modified_at)`` is the per-LLM-call heartbeat. Non-task blocks
(CODE, TEXT_PROMPT) don't create step rows — ``max(workflow_run_block.modified_at)``
covers that case. ``run.modified_at`` and ``run.status`` catch the
run-level transitions that happen outside those two tables.
"""
return (
run.status if run else None,
run.modified_at if run is not None else None,
step_ts,
block_ts,
)
async def _watchdog_error_message(
    exit_reason: WatchdogExitReason,
    ctx: AgentContext,
    workflow_run_id: str,
    run: WorkflowRun | None,
) -> str:
    """Build the LLM-facing error string for a non-success watchdog exit.

    Every variant ends with the same reconciliation instruction so the agent
    has a consistent next step: call ``get_run_results`` with the run_id to
    resolve the outcome before running more blocks. The
    ``pending_reconciliation_run_id`` guard in ``_tool_loop_error`` enforces
    that the agent actually does so. None of the variants contain "timed out"
    or retry-inviting phrasing — that is the SKY-9163 regression being fixed.
    """
    if exit_reason == "stagnation":
        reason_body = (
            f"The run has not made progress for {RUN_BLOCKS_STAGNATION_WINDOW_SECONDS}s. "
            "No step, block, or workflow-run row updates were observed in that window. "
            "The page is most likely blocked by a captcha, popup, anti-bot challenge, "
            "hidden validation error, or an infinite-retry loop on an action the agent "
            "cannot detect is failing."
        )
    elif exit_reason == "ceiling":
        reason_body = (
            f"The run exceeded the {RUN_BLOCKS_SAFETY_CEILING_SECONDS}s absolute ceiling "
            "while still showing progress. The workflow is too long to fit in a single "
            "tool invocation — split it into smaller block groups."
        )
    else:  # task_exit_unfinalized
        if run is None:
            last_observed = "workflow run row was unreadable"
        else:
            last_observed = f"last observed status: {run.status}"
        reason_body = (
            "The run ended but did not record a trustworthy terminal status in the "
            f"cancellation grace window ({last_observed})."
        )
    full_message = (
        f"{reason_body} Run ID: {workflow_run_id}. Outcome is uncertain. "
        "Do NOT re-invoke block-running tools in this session without first calling "
        "`get_run_results` with this workflow_run_id and reporting the result to the user."
    )
    current_url, _ = await _fallback_page_info(ctx)
    if current_url:
        full_message = f"{full_message} Browser was on: {current_url}"
    return full_message
async def _run_blocks_and_collect_debug(
params: dict[str, Any],
ctx: CopilotContext,
@ -908,123 +1079,97 @@ async def _run_blocks_and_collect_debug(
)
)
# Internal poll budget strictly below the SDK timeout. The OpenAI Agents
# SDK wraps this tool in ``asyncio.wait_for(..., timeout=RUN_BLOCKS_DEBUG_TIMEOUT_SECONDS)``
# (see openai-agents-python tool.py:invoke_function_tool). If our poll and
# the SDK timeout share the same budget, the SDK wins the race, cancels
# the tool coroutine, and our orderly cleanup branch below never runs —
# leaving ``run_task`` as an orphan that runs to natural completion.
# Leaving 10 s headroom ensures the orderly path fires first in the
# normal-slow case; the outer ``except asyncio.CancelledError`` covers
# the abnormal case where we get cancelled anyway.
max_poll = max(1, RUN_BLOCKS_DEBUG_TIMEOUT_SECONDS - 10)
poll_interval = 2.0
elapsed = 0.0
final_status = None
run: Any = None
# The OpenAI Agents SDK wraps this tool in
# ``asyncio.wait_for(..., timeout=RUN_BLOCKS_SAFETY_CEILING_SECONDS)``, so
# the inner budget leaves 10 s of headroom for the cancel-drain and
# post-drain reconcile to finish before the SDK's own cancel fires.
#
# Do NOT short-circuit on client disconnect: the agent loop runs to
# completion after the SSE stream is gone so its reply persists
# (SKY-8986); aborting mid-block would strand the run without debug
# output for the final chat message.
initial_run, initial_step_ts, initial_block_ts = await _read_progress_sources(ctx, workflow_run.workflow_run_id)
progress_marker = _progress_marker(initial_run, initial_step_ts, initial_block_ts)
last_progress_monotonic = time.monotonic()
started_monotonic = last_progress_monotonic
budget_seconds = max(1, RUN_BLOCKS_SAFETY_CEILING_SECONDS - 10)
final_status: str | None = None
run: Any = initial_run
exit_reason: WatchdogExitReason | None = None
# Quiet blocks (WAIT/TEXT_PROMPT/HUMAN_INTERACTION) legitimately have
# DB-silent periods; disable stagnation for any invocation that includes
# one. Safety ceiling still applies.
stagnation_enabled = not _any_quiet_block_requested(ctx, labels_to_execute)
try:
while elapsed < max_poll:
await asyncio.sleep(poll_interval)
elapsed += poll_interval
while True:
await asyncio.sleep(RUN_BLOCKS_POLL_INTERVAL_SECONDS)
if run_task.done():
run = await app.DATABASE.workflow_runs.get_workflow_run(
workflow_run_id=workflow_run.workflow_run_id,
organization_id=ctx.organization_id,
)
if run and WorkflowRunStatus(run.status).is_final():
final_status = run.status
break
run, step_ts, block_ts = await _read_progress_sources(ctx, workflow_run.workflow_run_id)
# Deliberately do NOT short-circuit on client disconnect here.
# The agent loop is allowed to run to completion after the SSE
# stream is gone (see SKY-8986) so its reply can be persisted;
# aborting an in-flight block execution would leave the
# workflow run in a limbo state and the agent would have no
# debug output to summarize in the final chat message.
run = await app.DATABASE.workflow_runs.get_workflow_run(
workflow_run_id=workflow_run.workflow_run_id,
organization_id=ctx.organization_id,
)
if run and WorkflowRunStatus(run.status).is_final():
final_status = run.status
exit_reason = "success"
break
if final_status is None:
# Read the row BEFORE the cancel helper runs. The poll interval is
# 2 s, so a legitimate self-finalize (``canceled`` from a user/block
# cancel, or any other terminal) can land in the narrow gap between
# the last poll and entering this branch. Trusting the pre-cancel
# status preserves that signal without ambiguity — nothing we do
# could have written it. Any post-cancel re-read has to exclude
# ``canceled`` because the helper itself writes synthetic
# ``canceled`` (see ``WorkflowRunStatus.is_final_excluding_canceled``).
if run_task.done():
# Row not terminal yet — shared reconcile path below flips
# most of these back to success after post-drain reread.
exit_reason = "task_exit_unfinalized"
break
now = time.monotonic()
new_marker = _progress_marker(run, step_ts, block_ts)
# A run in ``paused`` status (e.g. HumanInteractionBlock) is a
# user-driven wait, not stagnation — never trip.
is_paused = run is not None and run.status == WorkflowRunStatus.paused.value
stagnation_active = stagnation_enabled and not is_paused
if new_marker != progress_marker:
progress_marker = new_marker
last_progress_monotonic = now
elif stagnation_active and now - last_progress_monotonic >= RUN_BLOCKS_STAGNATION_WINDOW_SECONDS:
exit_reason = "stagnation"
break
if now - started_monotonic >= budget_seconds:
exit_reason = "ceiling"
break
if exit_reason is not None and exit_reason != "success":
# Pre-cancel read first: a legitimate self-finalize (user/block
# cancel, or any terminal the run wrote itself) can land between
# the last poll and here, and trusting it avoids the
# synthetic-``canceled`` ambiguity that the post-drain reread
# has to exclude. Then cancel + reread +
# ``_trusted_post_drain_status`` applies SKY-9167's success-race
# recovery uniformly to all three non-success exit reasons.
pre_cancel_run = await _safe_read_workflow_run(
workflow_run.workflow_run_id, ctx.organization_id, context="pre-cancel"
)
if pre_cancel_run is not None and WorkflowRunStatus(pre_cancel_run.status).is_final():
final_status = pre_cancel_run.status
run = pre_cancel_run
exit_reason = "success"
else:
await _cancel_run_task_if_not_final(run_task, workflow_run.workflow_run_id)
# The cancel helper waits for ``execute_workflow``'s shielded
# ``_finalize_workflow_run_status`` to run, which writes the real
# terminal status when the run completed during the drain window.
# Re-read once before declaring timeout so a run that finished
# in those last few seconds is not mis-reported as a failure the
# LLM will retry against a live site.
run = await _safe_read_workflow_run(
workflow_run.workflow_run_id, ctx.organization_id, context="post-drain"
)
trusted = _trusted_post_drain_status(run)
if trusted is not None:
final_status = trusted
else:
# Three distinct cases produce a ``None`` from
# ``_trusted_post_drain_status`` — DB read failed, row
# non-final, or ``canceled`` (legitimate vs synthetic
# indistinguishable at this read). Body differs so logs
# are attributable; the LLM-facing instruction is
# identical: outcome is uncertain, reconcile via
# ``get_run_results`` before re-invoking block-running
# tools. The reconciliation guard below enforces that.
if run is None:
timeout_body = (
f"Block execution exceeded the {max_poll}s tool budget and the workflow "
f"run row could not be re-read after cancellation (see 'Workflow run "
f"re-read failed' log)."
)
elif run.status == WorkflowRunStatus.canceled.value:
timeout_body = (
f"Block execution was cancelled after exceeding the {max_poll}s tool "
f"budget. Side effects of any actions already taken may have landed."
)
else:
timeout_body = (
f"Block execution exceeded the {max_poll}s tool budget and the workflow "
f"run did not reach a terminal status within the cancellation grace "
f"window (last observed status: {run.status})."
)
timeout_msg = (
f"{timeout_body} Run ID: {workflow_run.workflow_run_id}. "
f"Outcome is uncertain. Do NOT re-invoke block-running tools in this "
f"session without first calling `get_run_results` with this "
f"workflow_run_id and reporting the result to the user."
)
current_url, _ = await _fallback_page_info(ctx)
if current_url:
timeout_msg += f" Browser was on: {current_url}"
# Turn-scoped reconciliation guard. Clearing requires an
# explicit ``get_run_results`` read that resolves to this
# run_id with a status that passes
# ``WorkflowRunStatus.is_final_excluding_canceled`` (see
# clearing logic in ``get_run_results_tool``).
ctx.pending_reconciliation_run_id = workflow_run.workflow_run_id
return {"ok": False, "error": timeout_msg}
exit_reason = "success"
if exit_reason != "success":
# Turn-scoped reconciliation guard — cleared only by a
# ``get_run_results`` call that resolves this run_id to an
# ``is_final_excluding_canceled`` status
# (``_maybe_clear_reconciliation_flag``).
ctx.pending_reconciliation_run_id = workflow_run.workflow_run_id
assert exit_reason is not None # narrows for mypy; outer check excludes "success" but not None
error_msg = await _watchdog_error_message(exit_reason, ctx, workflow_run.workflow_run_id, run)
return {"ok": False, "error": error_msg}
except asyncio.CancelledError:
# The SDK's @function_tool(timeout=...) cancelled us mid-poll. Shield
# the cleanup so the parent cancellation can't interrupt it mid-await.
@ -1071,6 +1216,9 @@ async def _run_blocks_and_collect_debug(
await _attach_action_traces(blocks, results, ctx.organization_id)
# final_status is guaranteed set here: every non-success exit returns
# above, and the success path always populates final_status.
assert final_status is not None
run_ok = WorkflowRunStatus(final_status) == WorkflowRunStatus.completed
action_trace_summary: list[str] = []
@ -1765,7 +1913,7 @@ def _run_blocks_span_data(
@function_tool(
name_override="run_blocks_and_collect_debug",
timeout=RUN_BLOCKS_DEBUG_TIMEOUT_SECONDS,
timeout=RUN_BLOCKS_SAFETY_CEILING_SECONDS,
strict_mode=False,
)
async def run_blocks_tool(
@ -1851,7 +1999,7 @@ async def get_run_results_tool(
@function_tool(
name_override="update_and_run_blocks",
timeout=RUN_BLOCKS_DEBUG_TIMEOUT_SECONDS,
timeout=RUN_BLOCKS_SAFETY_CEILING_SECONDS,
strict_mode=False,
)
async def update_and_run_blocks_tool(

View file

@ -199,6 +199,9 @@ class AgentDB(BaseAlchemyDB):
async def get_total_unique_step_order_count_by_task_ids(self, *args: Any, **kwargs: Any) -> Any:
return await self.tasks.get_total_unique_step_order_count_by_task_ids(*args, **kwargs)
async def get_workflow_run_progress_timestamps(self, *args: Any, **kwargs: Any) -> Any:
    # Thin pass-through to the tasks repository so callers can reach the
    # watchdog's progress-aggregate query through the single AgentDB facade.
    return await self.tasks.get_workflow_run_progress_timestamps(*args, **kwargs)
async def get_task_step_models(self, *args: Any, **kwargs: Any) -> Any:
return await self.tasks.get_task_step_models(*args, **kwargs)

View file

@ -17,6 +17,7 @@ from skyvern.forge.sdk.db.models import (
StepModel,
TaskModel,
TaskRunModel,
WorkflowRunBlockModel,
WorkflowRunModel,
)
from skyvern.forge.sdk.db.utils import convert_to_step, convert_to_task, hydrate_action, serialize_proxy_location
@ -228,6 +229,39 @@ class TasksRepository(BaseRepository):
row = (await session.execute(query)).one()
return row.total, row.completed
@db_operation("get_workflow_run_progress_timestamps")
async def get_workflow_run_progress_timestamps(
    self,
    workflow_run_id: str,
    organization_id: str | None = None,
) -> tuple[datetime | None, datetime | None]:
    """Return ``(max(step.modified_at), max(workflow_run_block.modified_at))``
    for a given workflow run.

    Both values are scalar aggregates — no row hydration — and exist for the
    copilot watchdog poll path, where the only question is "has anything
    changed since the last poll?". Step updates are the per-LLM-call
    heartbeat (status transitions plus incremental token/cost accumulators —
    every ``update_step`` call bumps ``StepModel.modified_at``). Block
    updates cover non-task block types (CODE, TEXT_PROMPT) that never create
    a task row. Together the two aggregates cover every kind of block the
    copilot can run.
    """
    async with self.Session() as session:
        latest_step_ts = (
            await session.execute(
                select(func.max(StepModel.modified_at))
                .join(TaskModel, StepModel.task_id == TaskModel.task_id)
                .where(TaskModel.workflow_run_id == workflow_run_id)
                .where(StepModel.organization_id == organization_id)
            )
        ).scalar_one_or_none()
        latest_block_ts = (
            await session.execute(
                select(func.max(WorkflowRunBlockModel.modified_at))
                .where(WorkflowRunBlockModel.workflow_run_id == workflow_run_id)
                .where(WorkflowRunBlockModel.organization_id == organization_id)
            )
        ).scalar_one_or_none()
        return latest_step_ts, latest_block_ts
@db_operation("get_total_unique_step_order_count_by_task_ids")
async def get_total_unique_step_order_count_by_task_ids(
self,

View file

@ -0,0 +1,414 @@
"""Tests for the SKY-9163 progress-based watchdog inside
``_run_blocks_and_collect_debug``.
The full function does too much setup (prepare_workflow, execute_workflow,
parameter-binding invariants) to unit-test end-to-end cheaply. Instead we
target the isolated watchdog surface:
- ``_progress_marker`` — marker stability and field-change sensitivity.
- ``_read_progress_sources`` — correct delegation + graceful handling of
DB failures.
- ``_watchdog_error_message`` — the regression-guard strings (no
"timed out", reconciliation-instruction, per-reason body).
Those three are where the SKY-9163 correctness properties live:
1. A stale marker must be exactly equal across two polls when nothing
changed in the DB (otherwise the watchdog would false-reset on every
poll, making stagnation detection impossible).
2. Any change in ``run.status`` / ``run.modified_at`` / ``step_ts`` /
``block_ts`` must produce a new marker (otherwise the watchdog would
false-trip on a progressing run).
3. The error messages must not read as retry-invites — that was the
original bug. "timed out" / "likely stuck repeating failing actions"
are the exact phrases the LLM used to read as "try again".
"""
from __future__ import annotations
from datetime import datetime, timezone
from types import SimpleNamespace
from typing import Any
import pytest
from skyvern.forge.sdk.copilot.tools import (
RUN_BLOCKS_SAFETY_CEILING_SECONDS,
RUN_BLOCKS_STAGNATION_WINDOW_SECONDS,
_any_quiet_block_requested,
_progress_marker,
_read_progress_sources,
_tool_loop_error,
_watchdog_error_message,
)
def _fake_run(status: str = "running", modified_at: datetime | None = None) -> Any:
"""A bare-minimum stand-in for ``WorkflowRun`` — the marker only reads
``.status`` and ``.modified_at``.
"""
return SimpleNamespace(
status=status,
modified_at=modified_at or datetime(2026, 4, 21, 12, 0, 0, tzinfo=timezone.utc),
browser_session_id=None,
)
# ---------------------------------------------------------------------------
# _progress_marker: stability + per-field sensitivity.
# ---------------------------------------------------------------------------
def test_progress_marker_stable_for_identical_inputs() -> None:
    """If the DB reports identical values on two successive polls, the marker
    must compare equal. A marker that drifts on repeated reads would make the
    stagnation window unreachable."""
    fake = _fake_run()
    step_at = datetime(2026, 4, 21, 12, 0, 30, tzinfo=timezone.utc)
    block_at = datetime(2026, 4, 21, 12, 0, 31, tzinfo=timezone.utc)
    assert _progress_marker(fake, step_at, block_at) == _progress_marker(fake, step_at, block_at)


def test_progress_marker_changes_on_run_status() -> None:
    running = _fake_run(status="running")
    queued = _fake_run(status="queued")
    assert _progress_marker(running, None, None) != _progress_marker(queued, None, None)


def test_progress_marker_changes_on_run_modified_at() -> None:
    earlier = _fake_run(modified_at=datetime(2026, 4, 21, 12, 0, 0, tzinfo=timezone.utc))
    later = _fake_run(modified_at=datetime(2026, 4, 21, 12, 0, 1, tzinfo=timezone.utc))
    assert _progress_marker(earlier, None, None) != _progress_marker(later, None, None)


def test_progress_marker_changes_on_step_ts() -> None:
    fake = _fake_run()
    first = datetime(2026, 4, 21, 12, 0, 0, tzinfo=timezone.utc)
    second = datetime(2026, 4, 21, 12, 0, 5, tzinfo=timezone.utc)
    assert _progress_marker(fake, first, None) != _progress_marker(fake, second, None)


def test_progress_marker_changes_on_block_ts() -> None:
    fake = _fake_run()
    first = datetime(2026, 4, 21, 12, 0, 0, tzinfo=timezone.utc)
    second = datetime(2026, 4, 21, 12, 0, 5, tzinfo=timezone.utc)
    assert _progress_marker(fake, None, first) != _progress_marker(fake, None, second)


def test_progress_marker_tolerates_none_run() -> None:
    """A transient DB read failure can return ``run=None``. The marker must
    still be hashable and comparable."""
    assert _progress_marker(None, None, None) == (None, None, None, None)
    # Two consecutive failed reads produce equal markers, so the stagnation
    # clock keeps ticking — the right behavior when progress can't be confirmed.
    assert _progress_marker(None, None, None) == _progress_marker(None, None, None)
# ---------------------------------------------------------------------------
# _read_progress_sources: delegation + graceful DB-failure handling.
# ---------------------------------------------------------------------------
class _FakeTasksRepo:
def __init__(
self,
*,
step_ts: datetime | None = None,
block_ts: datetime | None = None,
raise_on_call: Exception | None = None,
) -> None:
self.step_ts = step_ts
self.block_ts = block_ts
self.raise_on_call = raise_on_call
self.call_count = 0
async def get_workflow_run_progress_timestamps(
self,
*,
workflow_run_id: str,
organization_id: str | None = None,
) -> tuple[datetime | None, datetime | None]:
self.call_count += 1
if self.raise_on_call is not None:
raise self.raise_on_call
return self.step_ts, self.block_ts
class _FakeWorkflowRunsRepo:
def __init__(self, run: Any | None = None, raise_on_call: Exception | None = None) -> None:
self.run = run
self.raise_on_call = raise_on_call
async def get_workflow_run(
self,
*,
workflow_run_id: str,
organization_id: str,
) -> Any:
if self.raise_on_call is not None:
raise self.raise_on_call
return self.run
class _FakeDatabase:
def __init__(self, tasks: _FakeTasksRepo, workflow_runs: _FakeWorkflowRunsRepo) -> None:
self.tasks = tasks
self.workflow_runs = workflow_runs
class _FakeCtx:
organization_id = "o_test"
@pytest.mark.asyncio
async def test_read_progress_sources_returns_run_and_timestamps(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    from skyvern.forge import app as forge_app

    fake_run = _fake_run()
    expected_step_ts = datetime(2026, 4, 21, 12, 0, 10, tzinfo=timezone.utc)
    expected_block_ts = datetime(2026, 4, 21, 12, 0, 11, tzinfo=timezone.utc)
    fake_db = _FakeDatabase(
        tasks=_FakeTasksRepo(step_ts=expected_step_ts, block_ts=expected_block_ts),
        workflow_runs=_FakeWorkflowRunsRepo(run=fake_run),
    )
    monkeypatch.setattr(forge_app, "DATABASE", fake_db)

    got_run, got_step_ts, got_block_ts = await _read_progress_sources(_FakeCtx(), "wr_1")

    assert got_run is fake_run
    assert got_step_ts == expected_step_ts
    assert got_block_ts == expected_block_ts


@pytest.mark.asyncio
async def test_read_progress_sources_swallows_workflow_run_errors(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A DB read failure on the workflow-run row must not crash the watchdog —
    ``_safe_read_workflow_run`` returns None and the poll continues."""
    from skyvern.forge import app as forge_app

    fake_db = _FakeDatabase(
        tasks=_FakeTasksRepo(step_ts=None, block_ts=None),
        workflow_runs=_FakeWorkflowRunsRepo(raise_on_call=RuntimeError("DB flake")),
    )
    monkeypatch.setattr(forge_app, "DATABASE", fake_db)

    got_run, got_step_ts, got_block_ts = await _read_progress_sources(_FakeCtx(), "wr_1")

    assert got_run is None
    assert got_step_ts is None
    assert got_block_ts is None


@pytest.mark.asyncio
async def test_read_progress_sources_swallows_progress_timestamps_errors(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A DB read failure on the aggregate timestamps must also not crash — the
    caller still gets the run (if readable) and ``None`` for the timestamps.
    """
    from skyvern.forge import app as forge_app

    fake_run = _fake_run()
    fake_db = _FakeDatabase(
        tasks=_FakeTasksRepo(raise_on_call=RuntimeError("aggregate query failed")),
        workflow_runs=_FakeWorkflowRunsRepo(run=fake_run),
    )
    monkeypatch.setattr(forge_app, "DATABASE", fake_db)

    got_run, got_step_ts, got_block_ts = await _read_progress_sources(_FakeCtx(), "wr_1")

    assert got_run is fake_run
    assert got_step_ts is None
    assert got_block_ts is None
# ---------------------------------------------------------------------------
# _watchdog_error_message: the regression-guard strings.
# ---------------------------------------------------------------------------
class _ErrorCtx:
    """Minimal ``AgentContext`` stand-in for the error-message path — only
    ``browser_session_id`` is read, and only by ``_fallback_page_info``."""

    organization_id = "o_test"
    browser_session_id = None


@pytest.mark.asyncio
async def test_stagnation_error_message_does_not_invite_retry() -> None:
    """The exact SKY-9163 bug: the old copy said "likely stuck repeating
    failing actions", which the LLM read as "try again". The stagnation
    message must explicitly discourage retry."""
    message = await _watchdog_error_message("stagnation", _ErrorCtx(), "wr_test", _fake_run())
    lowered = message.lower()
    assert "timed out" not in lowered
    assert "likely stuck repeating" not in lowered
    assert str(RUN_BLOCKS_STAGNATION_WINDOW_SECONDS) in message
    assert "Run ID: wr_test" in message
    assert "get_run_results" in message
    assert "Do NOT re-invoke block-running tools" in message


@pytest.mark.asyncio
async def test_ceiling_error_message_advises_splitting() -> None:
    """The ceiling path is rare (a runaway run that keeps making progress
    past 20 min). Its error must tell the LLM to split the workflow, not
    retry — a longer run won't fit either."""
    message = await _watchdog_error_message("ceiling", _ErrorCtx(), "wr_test", _fake_run())
    assert "timed out" not in message.lower()
    assert str(RUN_BLOCKS_SAFETY_CEILING_SECONDS) in message
    assert "split" in message.lower()
    assert "Run ID: wr_test" in message
    assert "get_run_results" in message


@pytest.mark.asyncio
async def test_task_exit_unfinalized_message_reports_last_observed_status() -> None:
    """When ``execute_workflow`` naturally exits but the row isn't terminal,
    the error must name the last-observed status so the LLM has a concrete
    anchor for the follow-up ``get_run_results`` call."""
    still_running = _fake_run(status="running")
    message = await _watchdog_error_message("task_exit_unfinalized", _ErrorCtx(), "wr_test", still_running)
    assert "timed out" not in message.lower()
    assert "last observed status: running" in message
    assert "Run ID: wr_test" in message
    assert "get_run_results" in message


@pytest.mark.asyncio
async def test_task_exit_unfinalized_message_tolerates_unreadable_run() -> None:
    """If the post-drain reread also fails (``run is None``), the message must
    still be well-formed and mention the unreadable state rather than
    crashing on a ``None.status`` access."""
    message = await _watchdog_error_message("task_exit_unfinalized", _ErrorCtx(), "wr_test", None)
    assert "unreadable" in message.lower()
    assert "Run ID: wr_test" in message
    assert "get_run_results" in message
# ---------------------------------------------------------------------------
# _any_quiet_block_requested: stagnation bypass for block types that
# legitimately do long-silent work. Without this bypass, a WAIT block with
# wait_sec >= 90, a slow TEXT_PROMPT LLM call, or a HumanInteractionBlock
# pausing for user input would be falsely reported as stagnation and the
# tool would cancel a healthy run.
# ---------------------------------------------------------------------------
def _workflow_with_block_types(*type_value_label_pairs: tuple[str, str]) -> Any:
"""Build a minimal `last_workflow`-shaped object that
``_any_quiet_block_requested`` can walk. Each pair is
``(block_type_value, label)`` e.g. ``("wait", "pause1")``.
"""
blocks = [
SimpleNamespace(label=label, block_type=SimpleNamespace(value=block_type_value))
for block_type_value, label in type_value_label_pairs
]
definition = SimpleNamespace(blocks=blocks)
return SimpleNamespace(workflow_definition=definition)
def test_any_quiet_block_requested_wait() -> None:
    workflow = _workflow_with_block_types(("wait", "pause1"))
    ctx = SimpleNamespace(last_workflow=workflow)
    assert _any_quiet_block_requested(ctx, ["pause1"]) is True
def test_any_quiet_block_requested_text_prompt() -> None:
    workflow = _workflow_with_block_types(("text_prompt", "prompt1"))
    ctx = SimpleNamespace(last_workflow=workflow)
    assert _any_quiet_block_requested(ctx, ["prompt1"]) is True
def test_any_quiet_block_requested_human_interaction() -> None:
    workflow = _workflow_with_block_types(("human_interaction", "wait_for_user"))
    ctx = SimpleNamespace(last_workflow=workflow)
    assert _any_quiet_block_requested(ctx, ["wait_for_user"]) is True
def test_any_quiet_block_requested_mixed_requested_labels_match_quiet_one() -> None:
    """A single quiet block type among several requested blocks is enough
    to disable stagnation for the entire invocation."""
    workflow = _workflow_with_block_types(
        ("navigation", "nav1"),
        ("wait", "pause1"),
        ("extraction", "extract1"),
    )
    ctx = SimpleNamespace(last_workflow=workflow)
    requested_labels = ["nav1", "pause1", "extract1"]
    assert _any_quiet_block_requested(ctx, requested_labels) is True
def test_any_quiet_block_requested_only_task_blocks_returns_false() -> None:
    """The common case: task-heavy workflows write steps regularly, so it
    is safe to keep stagnation detection enabled."""
    workflow = _workflow_with_block_types(
        ("navigation", "nav1"),
        ("extraction", "extract1"),
    )
    ctx = SimpleNamespace(last_workflow=workflow)
    assert _any_quiet_block_requested(ctx, ["nav1", "extract1"]) is False
def test_any_quiet_block_requested_label_not_in_requested_ignored() -> None:
    """A WAIT block that exists in the workflow but was not requested for
    this invocation must not disable stagnation."""
    workflow = _workflow_with_block_types(
        ("wait", "not_requested_pause"),
        ("navigation", "requested_nav"),
    )
    ctx = SimpleNamespace(last_workflow=workflow)
    assert _any_quiet_block_requested(ctx, ["requested_nav"]) is False
def test_any_quiet_block_requested_no_workflow_returns_false() -> None:
    """Defensive: with no workflow loaded there is no bypass, and the loop
    falls back to its default stagnation behavior (safe for the common
    case)."""
    ctx = SimpleNamespace(last_workflow=None)
    assert _any_quiet_block_requested(ctx, ["anything"]) is False
def test_any_quiet_block_requested_empty_labels_returns_false() -> None:
    workflow = _workflow_with_block_types(("wait", "pause1"))
    ctx = SimpleNamespace(last_workflow=workflow)
    for empty_labels in (None, []):
        assert _any_quiet_block_requested(ctx, empty_labels) is False
# ---------------------------------------------------------------------------
# Reconciliation guard message: regression guard on "timed out" phrasing.
# The guard itself is tested in test_copilot_cancel_helpers.py; this test is
# specifically about the LLM-facing STRING, which previously said "timed out"
# and read as a retry-invite when combined with LLM priors.
# ---------------------------------------------------------------------------
def test_reconciliation_guard_message_does_not_say_timed_out() -> None:
    """On the *next* block-running tool call after a watchdog exit, the LLM
    reads the reconciliation guard message. The stagnation/ceiling/
    unfinalized errors all purge "timed out"; this message must match, or
    the phrase leaks straight back in and the regression fix is only
    cosmetic."""
    guarded_ctx = SimpleNamespace(
        consecutive_tool_tracker=[],
        repeated_action_fingerprint_streak_count=0,
        last_test_non_retriable_nav_error=None,
        pending_reconciliation_run_id="wr_guarded",
    )
    message = _tool_loop_error(guarded_ctx, "update_and_run_blocks")
    assert isinstance(message, str)
    assert "timed out" not in message.lower()
    assert "wr_guarded" in message
    assert "get_run_results" in message