fix(SKY-9001): harden copilot narrator URL test against CodeQL sub-match (#5570)

This commit is contained in:
Andrew Neilson 2026-04-21 00:35:01 -07:00 committed by GitHub
parent dae0e97e5a
commit e9d9f190d2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 1460 additions and 18 deletions

View file

@ -17,6 +17,7 @@ import {
WorkflowCopilotToolCallUpdate,
WorkflowCopilotToolResultUpdate,
WorkflowCopilotCondensingUpdate,
WorkflowCopilotNarrationUpdate,
WorkflowCopilotChatSender,
WorkflowCopilotChatRequest,
WorkflowCopilotClearProposedWorkflowRequest,
@ -35,7 +36,8 @@ type WorkflowCopilotSsePayload =
| WorkflowCopilotStreamErrorUpdate
| WorkflowCopilotToolCallUpdate
| WorkflowCopilotToolResultUpdate
| WorkflowCopilotCondensingUpdate;
| WorkflowCopilotCondensingUpdate
| WorkflowCopilotNarrationUpdate;
interface ToolActivity {
tool_name: string;
@ -77,7 +79,7 @@ const MessageItem = memo(({ message, footer }: MessageItemProps) => {
return (
<div className="flex items-start gap-3">
<div
className={`flex h-8 w-8 items-center justify-center rounded-full text-xs font-bold text-white ${
className={`flex h-8 w-8 shrink-0 items-center justify-center rounded-full text-xs font-bold text-white ${
message.sender === "ai" ? "bg-blue-600" : "bg-purple-600"
}`}
>
@ -167,6 +169,7 @@ export function WorkflowCopilotChat({
const [inputValue, setInputValue] = useState("");
const [isLoading, setIsLoading] = useState(false);
const [processingStatus, setProcessingStatus] = useState<string>("");
const [latestNarration, setLatestNarration] = useState<string>("");
const [toolActivity, setToolActivity] = useState<ToolActivity[]>([]);
const [isLoadingHistory, setIsLoadingHistory] = useState(false);
const streamingAbortController = useRef<AbortController | null>(null);
@ -453,7 +456,6 @@ export function WorkflowCopilotChat({
}
setIsLoading(false);
setProcessingStatus("");
setToolActivity([]);
streamingAbortController.current?.abort();
};
@ -482,6 +484,7 @@ export function WorkflowCopilotChat({
setInputValue("");
setIsLoading(true);
setProcessingStatus("Starting...");
setLatestNarration("");
setToolActivity([]);
const abortController = new AbortController();
@ -627,6 +630,10 @@ export function WorkflowCopilotChat({
handleProcessingUpdate(payload);
return false;
case "tool_call":
setProcessingStatus(
TOOL_DISPLAY_NAMES[payload.tool_name] ??
payload.tool_name + "...",
);
setToolActivity((prev) => [
...prev,
{
@ -635,10 +642,6 @@ export function WorkflowCopilotChat({
status: "running",
},
]);
setProcessingStatus(
TOOL_DISPLAY_NAMES[payload.tool_name] ??
payload.tool_name + "...",
);
return false;
case "tool_result":
setToolActivity((prev) =>
@ -658,6 +661,11 @@ export function WorkflowCopilotChat({
setProcessingStatus("Condensing context...");
}
return false;
case "narration":
if (payload.narration) {
setLatestNarration(payload.narration);
}
return false;
case "response":
handleResponse(payload);
return true;
@ -688,6 +696,8 @@ export function WorkflowCopilotChat({
pendingMessageId.current = null;
setIsLoading(false);
setProcessingStatus("");
setLatestNarration("");
setToolActivity([]);
}
};
@ -953,23 +963,25 @@ export function WorkflowCopilotChat({
})}
{isLoading && (
<div className="flex items-start gap-3">
<div className="flex h-8 w-8 items-center justify-center rounded-full bg-blue-600 text-xs font-bold text-white">
<div className="flex h-8 w-8 shrink-0 items-center justify-center rounded-full bg-blue-600 text-xs font-bold text-white">
AI
</div>
<div className="flex-1 rounded-lg bg-slate-800 p-3">
<div className="flex items-center gap-2 text-sm text-slate-400">
<div className="flex items-center gap-2 text-sm text-slate-300">
<ReloadIcon className="h-4 w-4 animate-spin" />
<span>{processingStatus || "Processing..."}</span>
<span>
{latestNarration || processingStatus || "Processing..."}
</span>
</div>
{toolActivity.length > 0 && (
<div className="mt-2 space-y-1">
{toolActivity.map((activity, index) => (
<div
key={index}
className="flex items-center gap-1.5 text-xs text-slate-500"
className="flex items-start gap-1.5 text-xs text-slate-500"
>
<span
className={`inline-block h-1.5 w-1.5 rounded-full ${
className={`mt-1.5 inline-block h-1.5 w-1.5 shrink-0 rounded-full ${
activity.status === "running"
? "animate-pulse bg-blue-400"
: activity.status === "success"
@ -977,7 +989,7 @@ export function WorkflowCopilotChat({
: "bg-red-400"
}`}
/>
<span>
<span className="line-clamp-2 min-w-0 flex-1">
{TOOL_DISPLAY_NAMES[activity.tool_name] ??
activity.tool_name}
{activity.summary ? `${activity.summary}` : ""}

View file

@ -56,7 +56,8 @@ export type WorkflowCopilotStreamMessageType =
| "error"
| "tool_call"
| "tool_result"
| "condensing";
| "condensing"
| "narration";
export interface WorkflowCopilotProcessingUpdate {
type: "processing_update";
@ -99,6 +100,13 @@ export interface WorkflowCopilotCondensingUpdate {
status: "started" | "completed";
}
export interface WorkflowCopilotNarrationUpdate {
type: "narration";
narration: string;
iteration: number;
timestamp: string;
}
export interface WorkflowYAMLConversionRequest {
workflow_definition_yaml: string;
workflow_id: string;

View file

@ -4,13 +4,16 @@ from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Any
from typing import TYPE_CHECKING, Any
from pydantic import BaseModel, Field
from skyvern.forge.sdk.copilot.runtime import AgentContext
from skyvern.forge.sdk.workflow.models.workflow import Workflow
if TYPE_CHECKING:
from skyvern.forge.sdk.copilot.narration import NarratorState
class UrlVisit(BaseModel):
url: str
@ -205,3 +208,9 @@ class CopilotContext(AgentContext):
last_action_sequence_fingerprint: str | None = None
pending_action_sequence_fingerprint: str | None = None
repeated_action_fingerprint_streak_count: int = 0
# Populated lazily by ``stream_to_sse`` and reused across enforcement
# iterations so cadence/last-emitted-at survive ``run_with_enforcement``
# retries. Declared here (rather than attached dynamically) so future
# refactors can't strip it silently.
narrator_state: NarratorState | None = None

View file

@ -12,6 +12,7 @@ import structlog
from agents.run import Runner
from skyvern.forge.sdk.copilot.failure_tracking import normalize_failure_reason
from skyvern.forge.sdk.copilot.narration import TransitionKind
from skyvern.forge.sdk.copilot.output_utils import extract_final_text, parse_final_response
from skyvern.forge.sdk.copilot.screenshot_utils import ScreenshotEntry
from skyvern.forge.sdk.copilot.tracing_setup import copilot_span
@ -1047,4 +1048,12 @@ async def run_with_enforcement(
current_input = (
[nudge_msg] if session is not None else _prune_input_list(result.to_input_list()) + [nudge_msg]
)
# Signal the narrator that the agent is re-entering the loop after an
# enforcement correction. stream_to_sse creates the state on the first
# pass; on later passes we poke the transition latch directly so the
# next narration (produced after the next tool round-trip) can describe
# the course-correction.
narrator_state = getattr(ctx, "narrator_state", None)
if narrator_state is not None:
narrator_state.record_transition(TransitionKind.ENFORCEMENT_RETRY)
iteration += 1

View file

@ -0,0 +1,588 @@
"""User-facing progress narration for the workflow copilot.
The main agent loop can run for 1-5 minutes between submit and final reply.
This module watches the agent's tool round-trips, detects meaningful state
transitions, and emits short human-readable sentences over the existing SSE
channel so the user can see "what the copilot is doing" in real time.
Narration is ephemeral -- not persisted to chat history. The frontend clears
it when the final response lands. The narrator LLM runs as a background task
so it never blocks the primary event pump. At most one narration is in flight
at a time; if a second transition fires while the first is still in flight,
it is dropped (cadence is already transition-driven, not spammy).
"""
from __future__ import annotations
import asyncio
import re
import time
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import StrEnum
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
import structlog
from skyvern.forge import app
from skyvern.forge.sdk.schemas.workflow_copilot import (
WorkflowCopilotNarrationUpdate,
WorkflowCopilotStreamMessageType,
)
if TYPE_CHECKING:
from skyvern.forge.sdk.routes.event_source_stream import EventSourceStream
LOG = structlog.get_logger()
# Lower bound on time between narration emissions. The ticket asks for roughly
# one narration every 10-20 seconds; the state-transition trigger sets the
# upper bound loosely (a quiet agent produces none), and this floor prevents
# a burst of transitions (tool cluster + workflow_updated arriving together)
# from producing back-to-back emissions.
MIN_NARRATION_GAP_SECONDS = 10.0
# Cap on how many tool round-trips we hand to the narrator LLM. The narrator
# only needs recent context; keeping this small caps prompt cost.
MAX_TOOL_ACTIVITY_BUFFER = 8
# Tight deadline on the narrator LLM call. On timeout we drop the emission
# rather than delaying narration further.
NARRATOR_TIMEOUT_SECONDS = 8.0
class TransitionKind(StrEnum):
    """Kinds of agent-state transitions that can trigger a narration.

    Members are ordered by ascending priority: a higher-priority transition
    overwrites a lower-priority pending one within the min-gap window (see
    ``NarratorState.record_transition``). Priority is derived from definition
    order via ``_TRANSITION_PRIORITY`` below, so do not reorder members
    casually.
    """

    TOOL_STARTED = "tool_started"
    NEW_TOOL_CLUSTER = "new_tool_cluster"
    ENFORCEMENT_RETRY = "enforcement_retry"
    NAVIGATION_COMPLETED = "navigation_completed"
    TEST_COMPLETED = "test_completed"
    WORKFLOW_UPDATED = "workflow_updated"


# Rank by definition order: index 0 (TOOL_STARTED) is the lowest priority.
_TRANSITION_PRIORITY: dict[TransitionKind, int] = {kind: rank for rank, kind in enumerate(TransitionKind)}
@dataclass
class _ToolActivityEntry:
    """One completed tool round-trip, buffered for the narrator prompt."""

    # Internal tool name; remapped to a user-facing label before prompting.
    tool_name: str
    # Short result summary produced by summarize_tool_result upstream.
    summary: str
    success: bool
    # Agent-loop iteration this tool ran in.
    iteration: int
    # Compact excerpt of the tool's parsed payload (counts, domains, statuses
    # -- see extract_tool_details). Gives the narrator concrete nouns.
    details: str = ""
@dataclass
class NarratorState:
    """Cadence + buffer state carried across stream_to_sse iterations."""

    # Monotonic timestamp of the last narration actually delivered, or None.
    last_emitted_at: float | None = None
    # Rolling, bounded window of recent tool round-trips for prompt context.
    pending_activity: deque[_ToolActivityEntry] = field(default_factory=lambda: deque(maxlen=MAX_TOOL_ACTIVITY_BUFFER))
    # Background narration task currently running, if any.
    in_flight_task: asyncio.Task[None] | None = None
    # Highest-priority transition awaiting narration, if any.
    pending_transition: TransitionKind | None = None
    user_goal: str = ""
    # Tool whose tool_called arrived but tool_output hasn't yet. Cleared on
    # tool_output so post-tool transitions describe the finished action, not
    # the in-flight one.
    pending_tool_name: str | None = None

    def record_tool(
        self,
        tool_name: str,
        summary: str,
        success: bool,
        iteration: int,
        details: str = "",
    ) -> None:
        """Buffer one finished tool round-trip for future narrator prompts."""
        entry = _ToolActivityEntry(
            tool_name=tool_name,
            summary=summary,
            success=success,
            iteration=iteration,
            details=details,
        )
        self.pending_activity.append(entry)

    def record_transition(self, kind: TransitionKind) -> None:
        """Latch ``kind`` unless an equal-or-higher-priority one is pending."""
        current = self.pending_transition
        if current is None:
            self.pending_transition = kind
        elif _TRANSITION_PRIORITY[kind] > _TRANSITION_PRIORITY[current]:
            self.pending_transition = kind
@dataclass(frozen=True)
class _CtxSnapshot:
    """Subset of copilot-context flags the narrator watches for transitions.

    Frozen so the before/after snapshots taken around a tool call can't be
    mutated between capture and comparison (see ``detect_transitions``).
    """

    update_workflow_called: bool
    test_after_update_done: bool
    navigate_called: bool
    observation_after_navigate: bool
def snapshot_ctx(ctx: Any) -> _CtxSnapshot:
    """Capture the narrator-watched flags from ``ctx``.

    Missing attributes are treated as False so any context object (or test
    stub) can be snapshotted safely.
    """

    def flag(name: str) -> bool:
        return bool(getattr(ctx, name, False))

    return _CtxSnapshot(
        update_workflow_called=flag("update_workflow_called"),
        test_after_update_done=flag("test_after_update_done"),
        navigate_called=flag("navigate_called"),
        observation_after_navigate=flag("observation_after_navigate"),
    )
def detect_transitions(
    before: _CtxSnapshot,
    after: _CtxSnapshot,
    tool_name: str,
    prior_tool_name: str | None,
) -> list[TransitionKind]:
    """Diff two context snapshots and report which narration triggers fired.

    A False->True flip of a watched flag maps to its transition kind; a tool
    name that differs from the previous round-trip's marks a new tool cluster.
    """
    fired: list[TransitionKind] = []
    flag_flips = (
        (before.update_workflow_called, after.update_workflow_called, TransitionKind.WORKFLOW_UPDATED),
        (before.test_after_update_done, after.test_after_update_done, TransitionKind.TEST_COMPLETED),
        (before.navigate_called, after.navigate_called, TransitionKind.NAVIGATION_COMPLETED),
    )
    for was_on, is_on, kind in flag_flips:
        if is_on and not was_on:
            fired.append(kind)
    if prior_tool_name is not None and tool_name != prior_tool_name:
        fired.append(TransitionKind.NEW_TOOL_CLUSTER)
    return fired
@dataclass(frozen=True)
class _NarratorPromptContext:
    """Frozen snapshot of prompt inputs passed to the background task."""

    # Transition that triggered this emission (selects the "latest signal" label).
    transition: TransitionKind
    # Copy of the activity deque taken at schedule time, so the background
    # task sees a stable view while the stream keeps appending.
    activity: list[_ToolActivityEntry]
    user_goal: str = ""
    # Tool currently awaiting its output, if any (the "currently doing" line).
    pending_tool_name: str | None = None
def should_emit(state: NarratorState, now: float) -> bool:
    """Emit gate: a transition is pending, nothing is in flight, and the
    minimum gap since the last delivered narration has elapsed."""
    if state.pending_transition is None:
        return False
    running = state.in_flight_task
    if running is not None and not running.done():
        return False
    last = state.last_emitted_at
    if last is None:
        return True
    return (now - last) >= MIN_NARRATION_GAP_SECONDS
def schedule_narration(
    state: NarratorState,
    stream: EventSourceStream,
    iteration: int,
) -> None:
    """Kick off a background narration task if the gate allows. Fire-and-drop:
    errors, timeouts, and empty responses are swallowed inside the task.

    Args:
        state: Narrator cadence/buffer state; its pending transition is
            consumed here and its ``in_flight_task`` slot is occupied.
        stream: SSE channel the background task will send on.
        iteration: Agent-loop iteration stamped onto the narration update.
    """
    now = time.monotonic()
    if not should_emit(state, now):
        return
    # Consume the latched transition before spawning the task so a transition
    # arriving during the LLM call latches fresh instead of being lost.
    transition = state.pending_transition
    assert transition is not None  # guaranteed by should_emit
    state.pending_transition = None
    # last_emitted_at is advanced only after a narration is actually delivered
    # (see _narration_task_body). Advancing here would silence the next 10s of
    # valid transitions when a narration fails, times out, or is leak-dropped
    # -- a bad trade since the in_flight_task slot already prevents concurrent
    # emissions during the LLM call.
    # Copy the deque at schedule time so the background task sees a stable
    # view while streaming_adapter keeps appending.
    prompt_ctx = _NarratorPromptContext(
        transition=transition,
        activity=list(state.pending_activity),
        user_goal=state.user_goal,
        pending_tool_name=state.pending_tool_name,
    )
    task = asyncio.create_task(
        _narration_task_body(state=state, stream=stream, iteration=iteration, prompt_ctx=prompt_ctx)
    )
    state.in_flight_task = task
async def cancel_in_flight(state: NarratorState) -> None:
    """Hard-cancel any in-flight narration task.

    Called from ``stream_to_sse``'s finally. A narration LLM call takes a few
    seconds; blocking the final-response send for that window just to let one
    more narration land is the wrong trade -- the final assistant message is
    about to appear anyway, and on a client disconnect the narration has
    nowhere to go. Cancel immediately; fire-and-drop semantics cover the loss.
    """
    pending = state.in_flight_task
    if pending is None or pending.done():
        return
    pending.cancel()
    # Await so the task's finally runs before teardown; swallow everything,
    # including the cancellation itself.
    try:
        await pending
    except asyncio.CancelledError:
        pass
    except Exception:
        pass
async def _narration_task_body(
    state: NarratorState,
    stream: EventSourceStream,
    iteration: int,
    prompt_ctx: _NarratorPromptContext,
) -> None:
    """Background task: produce one narration and deliver it over SSE.

    Fire-and-drop: LLM failures and send failures are logged and swallowed so
    the narrator can never crash the agent run; cancellation propagates. Owns
    the ``state.in_flight_task`` slot and the ``state.last_emitted_at``
    delivery clock.
    """
    transition_value = prompt_ctx.transition.value
    try:
        try:
            narration = await _call_narrator_llm(prompt_ctx)
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            LOG.warning("copilot narrator failed, dropping emission", error=str(exc), transition=transition_value)
            return
        # Timeout / leak-guard / empty responses come back falsy: drop quietly.
        if not narration:
            return
        try:
            await stream.send(
                WorkflowCopilotNarrationUpdate(
                    type=WorkflowCopilotStreamMessageType.NARRATION,
                    narration=narration,
                    iteration=iteration,
                    timestamp=datetime.now(timezone.utc),
                )
            )
        except Exception as exc:
            LOG.warning("copilot narrator send failed", error=str(exc), transition=transition_value)
            return
        # Only advance last_emitted_at after a real delivery. A failed /
        # empty / leak-dropped emission leaves the clock where it was so the
        # next valid transition can emit immediately instead of waiting 10s
        # behind a narration that never reached the user.
        state.last_emitted_at = time.monotonic()
    finally:
        # Release the slot only after the send completes (or errors). Clearing
        # earlier opened a window where schedule_narration could spawn a
        # second task during the await stream.send, running two narrations
        # concurrently.
        state.in_flight_task = None
async def _call_narrator_llm(prompt_ctx: _NarratorPromptContext) -> str | None:
    """Invoke a small/fast LLM to produce one user-facing sentence.

    Returns None on timeout, when no handler is configured, when the response
    yields no usable text, or when the sanitized text trips the
    identifier-leak guard. Note: only ``asyncio.TimeoutError`` is caught
    here; other handler exceptions propagate to ``_narration_task_body``,
    which logs them and drops the emission.
    """
    handler = _get_narrator_handler()
    if handler is None:
        return None
    prompt = _build_narrator_prompt(prompt_ctx)
    try:
        # force_dict=False keeps the handler from running its json_repair /
        # JSON-dict coercion on a response that's intentionally plain prose.
        # With the default force_dict=True the handler raises InvalidLLMResponseType
        # on a one-sentence narration and we lose every emission.
        response = await asyncio.wait_for(
            handler(prompt=prompt, prompt_name="workflow-copilot-narration", force_dict=False),
            timeout=NARRATOR_TIMEOUT_SECONDS,
        )
    except asyncio.TimeoutError:
        LOG.warning(
            "copilot narrator timed out",
            timeout=NARRATOR_TIMEOUT_SECONDS,
            transition=prompt_ctx.transition.value,
        )
        return None
    narration = _extract_narration_text(response)
    if not narration:
        return None
    sanitized = _sanitize_narration(narration)
    if _narration_leaks_identifier(sanitized):
        # Drop the emission rather than ship an identifier to the user. The
        # next transition will get another chance; cadence is transition-driven
        # so one dropped sentence just means a slightly longer silence, not a
        # bad user experience of copilot jargon bleeding through.
        LOG.warning(
            "copilot narrator dropped due to identifier leak",
            transition=prompt_ctx.transition.value,
            preview=sanitized[:120],
        )
        return None
    return sanitized
def _get_narrator_handler() -> Any:
    """Return the narrator LLM handler, or None when unavailable.

    Reuses SECONDARY_LLM_API_HANDLER so deployments already wired for the
    feasibility gate get narration for free. Returns None when the app holder
    isn't initialized (unit tests, pre-startup).
    """
    try:
        return app.SECONDARY_LLM_API_HANDLER
    except (RuntimeError, AttributeError):
        return None
def handler_available() -> bool:
    """Cheap check callers can use to skip all narrator-side bookkeeping
    (transition detection, tool-details extraction, state updates) when no
    narrator LLM is configured. Resolved once per stream, not per event.

    Returns:
        True when a secondary LLM handler is configured for narration.
    """
    return _get_narrator_handler() is not None
def _build_narrator_prompt(prompt_ctx: _NarratorPromptContext) -> str:
    """Assemble the single-shot prompt for the narrator LLM.

    All tool identifiers are pre-mapped to user-facing labels and every
    free-text input (details, goal) is length-bounded before interpolation,
    so the prompt itself enforces part of the no-identifier policy.
    """
    # Tool names are remapped to user-facing labels before reaching the LLM so
    # the model cannot echo raw internal identifiers back at the user. The
    # ``details`` field carries concrete nouns (block labels, domains, counts)
    # extracted from the tool's parsed payload so the narrator can be specific
    # instead of defaulting to filler like "Analyzing results".
    activity_lines: list[str] = []
    for entry in prompt_ctx.activity:
        label = _USER_FACING_TOOL_LABELS.get(entry.tool_name, "running a tool")
        status = "ok" if entry.success else "failed"
        detail = entry.details.strip()
        if len(detail) > 200:
            detail = detail[:200].rstrip() + "..."
        line = f"- {label} ({status})"
        if detail:
            line += f": {detail}"
        activity_lines.append(line)
    transition_label = _TRANSITION_LABELS[prompt_ctx.transition]
    activity_block = "\n".join(activity_lines) if activity_lines else "(no tool activity yet)"
    # Goal is flattened to one line and truncated so a long user message
    # can't dominate the prompt.
    goal_snippet = (prompt_ctx.user_goal or "").strip().replace("\n", " ")
    if len(goal_snippet) > 240:
        goal_snippet = goal_snippet[:240].rstrip() + "..."
    goal_block = goal_snippet or "(no goal provided)"
    if prompt_ctx.pending_tool_name:
        current_action_label = _USER_FACING_TOOL_LABELS.get(prompt_ctx.pending_tool_name, "working on the task")
    else:
        current_action_label = "no action in flight"
    # Return JSON rather than raw prose: the shared LLM handler runs
    # json_repair on the response body and coerces unparseable prose to an
    # empty string, which silently drops every narration. Asking the model to
    # emit {"narration": "..."} keeps json_repair happy and preserves the text.
    return (
        "You are a narrator for a workflow-building copilot. Write ONE short "
        "sentence (max 14 words) describing what the copilot is doing right "
        "now, grounded in the user's goal.\n\n"
        "Rules (hard):\n"
        "- Ground the sentence in the concrete subject from the user's goal "
        "(their named target, topic, or product). Prefer the user's own words "
        'over vague placeholders like "the site" or "the page".\n'
        "- NEVER mention tool names, block names, or any identifier-looking token. "
        "Forbidden: anything containing an underscore (e.g. extract_top_post, "
        "update_and_run_blocks), camelCase tokens, anything in backticks, anything "
        'starting with "via the", JSON/YAML/code, full URLs, or raw IDs.\n'
        "- Do not echo untrusted page content verbatim.\n"
        '- Use present continuous in user-facing language ("Setting up the '
        'workflow", "Extracting the requested fields").\n'
        "- If the most recent action failed, say what it is retrying or correcting.\n"
        '- Return ONLY a JSON object: {"narration": "<sentence>"}. No prose, no markdown.\n\n'
        "Good examples:\n"
        ' {"narration": "Setting up the workflow."}\n'
        ' {"narration": "Running the workflow to gather the requested data."}\n'
        ' {"narration": "Checking the extracted results."}\n'
        "Bad examples (do NOT do this):\n"
        ' {"narration": "Extracting the values via the parse_results block."}\n'
        ' {"narration": "Running update_and_run_blocks on the workflow."}\n\n'
        f"User goal: {goal_block}\n\n"
        f"Currently doing: {current_action_label}\n\n"
        f"Latest signal: {transition_label}\n\n"
        f"Recent activity (most recent last):\n{activity_block}\n\n"
        "JSON:"
    )
# Agent tool names get remapped before reaching the LLM so internal identifiers
# can't surface via prompt echo. Unknown tools fall back to a generic phrase at
# the call sites ("running a tool" / "working on the task" -- see
# _build_narrator_prompt).
_USER_FACING_TOOL_LABELS: dict[str, str] = {
    "update_workflow": "revising the workflow draft",
    "update_and_run_blocks": "revising and testing the workflow",
    "run_blocks_and_collect_debug": "running a test of the workflow",
    "navigate_browser": "opening a page in the browser",
    "get_browser_screenshot": "taking a screenshot",
    "click": "clicking an element on the page",
    "type_text": "filling a field on the page",
    "select_option": "choosing an option from a dropdown",
    "press_key": "pressing a key",
    "scroll": "scrolling the page",
    "evaluate": "inspecting the page",
    "console_messages": "checking the browser console",
    "list_credentials": "checking saved credentials",
    "get_block_schema": "looking up workflow block options",
    "validate_block": "checking workflow block configuration",
    "get_run_results": "checking results of a prior run",
}
# Human-readable "latest signal" phrasing for each transition kind. Every
# TransitionKind member must have an entry: _build_narrator_prompt indexes
# this dict without a default.
_TRANSITION_LABELS: dict[TransitionKind, str] = {
    TransitionKind.TOOL_STARTED: "just started a new action",
    TransitionKind.NEW_TOOL_CLUSTER: "starting a different kind of work",
    TransitionKind.ENFORCEMENT_RETRY: "course-correcting after a check",
    TransitionKind.NAVIGATION_COMPLETED: "just finished loading a page",
    TransitionKind.TEST_COMPLETED: "just finished a test of the workflow",
    TransitionKind.WORKFLOW_UPDATED: "just updated the workflow draft",
}

# Hard ceiling on the ``details`` excerpt handed to the narrator prompt
# (enforced by _bound).
_MAX_DETAILS_CHARS = 240
def extract_tool_details(tool_name: str, parsed: dict[str, Any]) -> str:
"""Compact narrator-friendly excerpt from a tool's parsed payload.
Intentionally narrow: counts, domains, and high-level statuses only.
Raw labels (block names, field names, URL paths, page content) are excluded
so they can't reach the narrator prompt and be echoed at the user.
"""
if not isinstance(parsed, dict):
return ""
if not parsed.get("ok", True):
return "last action failed"
data = parsed.get("data")
if not isinstance(data, dict):
return ""
if tool_name == "update_workflow" or tool_name == "update_and_run_blocks":
return _format_step_status(data.get("block_count"), data)
if tool_name == "run_blocks_and_collect_debug":
executed = data.get("executed_block_labels") or [
b.get("label") for b in data.get("blocks", []) if isinstance(b, dict)
]
executed_count = sum(1 for label in executed if label)
return _format_step_status(executed_count, data)
if tool_name == "navigate_browser":
return _format_url_detail(parsed.get("url") or data.get("url"), "domain")
if tool_name == "get_browser_screenshot":
return _format_url_detail(data.get("url"), "on")
if tool_name == "get_run_results":
return f"{len(data)} extracted field(s)" if data else ""
if tool_name == "validate_block":
valid = data.get("valid")
if valid is True:
return "configuration valid"
if valid is False:
return "configuration invalid"
return ""
if tool_name == "list_credentials":
return _format_int_count(data, "credential")
if tool_name == "get_block_schema":
return _format_int_count(data, "step type")
return ""
def _format_step_status(count: Any, data: dict[str, Any]) -> str:
    """Render ``"<n> step(s) - status: <s>"`` (either part optional), bounded."""
    pieces: list[str] = []
    if isinstance(count, int) and count:
        pieces.append(f"{count} step(s)")
    overall = data.get("overall_status") or data.get("status")
    if isinstance(overall, str) and overall:
        pieces.append(f"status: {overall}")
    return _bound(" - ".join(pieces))
def _format_url_detail(url: Any, prefix: str) -> str:
    """Render ``"<prefix>: <host>"`` for string URLs; empty for anything else."""
    if not isinstance(url, str):
        return ""
    return f"{prefix}: {_domain_only(url)}"
def _format_int_count(data: dict[str, Any], noun: str) -> str:
count = data.get("count")
if isinstance(count, int):
return f"{count} {noun}(s)"
return ""
def _domain_only(url: str) -> str:
# Narrator sees only the host. Prevents query-string / path content from
# leaking into output.
try:
host = urlparse(url).hostname
except ValueError:
host = None
if host:
return host[:80]
# Fallback for schemeless or malformed inputs that urlparse returns ""/None for.
return url.split("://", 1)[-1].split("/", 1)[0].split("?", 1)[0][:80]
def _bound(text: str) -> str:
    """Clamp ``text`` to the details-excerpt ceiling (_MAX_DETAILS_CHARS)."""
    if len(text) <= _MAX_DETAILS_CHARS:
        return text
    return text[:_MAX_DETAILS_CHARS]
def _extract_narration_text(response: Any) -> str | None:
"""Pull a plain string from whatever the LLM handler returned.
Handlers may return a str, a dict with ``user_response``/``content``, or
some other shape depending on experimentation wiring. Fall back to str()
only when nothing structured is recognizable.
"""
if isinstance(response, str):
return response.strip() or None
if isinstance(response, dict):
for key in ("narration", "sentence", "user_response", "content", "text"):
value = response.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
return None
return None
# Narration sanitization: trim, strip trailing quotes/fences the model might
# have included, collapse whitespace, and enforce a hard length ceiling.
# Belt-and-braces layer in addition to the prompt rules.
_MAX_NARRATION_CHARS = 180
_NARRATION_DELIMITERS = ("```", '"', "'")
def _sanitize_narration(text: str) -> str:
cleaned = text.strip()
for delim in _NARRATION_DELIMITERS:
if cleaned.startswith(delim):
cleaned = cleaned[len(delim) :].lstrip()
if cleaned.endswith(delim):
cleaned = cleaned[: -len(delim)].rstrip()
cleaned = " ".join(cleaned.split())
if len(cleaned) > _MAX_NARRATION_CHARS:
cleaned = cleaned[:_MAX_NARRATION_CHARS].rstrip() + "..."
return cleaned
# Any token that looks like an internal identifier: snake_case, camelCase with
# at least one lowercase-then-uppercase boundary, kebab-case with 3+ segments
# (to spare ordinary English compounds like "follow-up"), or anything wrapped
# in backticks. Belt-and-braces guard on top of the prompt rules: if the model
# still sneaks a block/tool name through, the narration is dropped rather
# than shipped. False positives are cheap (one silent cadence slot) while a
# missed leak ships jargon to the user.
_IDENTIFIER_LEAK_PATTERNS = (
re.compile(r"[A-Za-z][A-Za-z0-9]*_[A-Za-z0-9_]+"),
re.compile(r"\b[a-z][a-z0-9]*[A-Z][A-Za-z0-9]+\b"),
re.compile(r"\b[a-z][a-z0-9]+(?:-[a-z0-9]+){2,}\b"),
re.compile(r"`[^`]+`"),
re.compile(r"\bvia the\b", re.IGNORECASE),
)
def _narration_leaks_identifier(narration: str) -> bool:
return any(pattern.search(narration) for pattern in _IDENTIFIER_LEAK_PATTERNS)

View file

@ -11,6 +11,16 @@ import structlog
# Reuse the HTTP-logging redactor so SSE tool inputs and request-body logs
# share one exact-match sensitive-key policy.
from skyvern.forge.request_logging import redact_sensitive_fields
from skyvern.forge.sdk.copilot.narration import (
NarratorState,
TransitionKind,
cancel_in_flight,
detect_transitions,
extract_tool_details,
handler_available,
schedule_narration,
snapshot_ctx,
)
from skyvern.forge.sdk.copilot.output_utils import summarize_tool_result
from skyvern.forge.sdk.schemas.workflow_copilot import (
WorkflowCopilotStreamMessageType,
@ -68,6 +78,17 @@ async def stream_to_sse(
# carry the same iteration value; it advances after the matching result.
iteration = 0
# Narrator state persists across enforcement iterations via ctx so
# cadence (last_emitted_at, min-gap) survives run_with_enforcement retries.
# Resolve handler availability once so per-event narrator bookkeeping
# (snapshot, detect, extract) is skipped when no narrator LLM is configured.
narrator_enabled = handler_available()
narrator_state: NarratorState = getattr(ctx, "narrator_state", None) or NarratorState()
ctx.narrator_state = narrator_state
user_message = getattr(ctx, "user_message", "") or ""
if user_message and not narrator_state.user_goal:
narrator_state.user_goal = user_message
try:
async for event in result.stream_events():
if not isinstance(event, RunItemStreamEvent):
@ -105,17 +126,29 @@ async def stream_to_sse(
)
)
# First narration lands here (~seconds after submit) rather
# than waiting for tool_output of a long tool.
if narrator_enabled:
narrator_state.pending_tool_name = tool_name
narrator_state.record_transition(TransitionKind.TOOL_STARTED)
schedule_narration(narrator_state, stream, iteration)
elif event.name == "tool_output":
raw = event.item.raw_item
call_id = _get_raw_field(raw, "call_id") or _get_raw_field(raw, "id") or ""
tool_name = call_id_to_name.get(call_id, "unknown")
# Clear pending_tool_name so post-tool transitions describe
# what the agent just finished, not what it's still doing.
narrator_state.pending_tool_name = None
output = getattr(event.item, "output", None)
parsed = parse_tool_output(output)
# Compute summary/success unconditionally: the narrator path
# below also needs them, and the work is cheap (no I/O).
summary = summarize_tool_result(tool_name, parsed)
success = parsed.get("ok", True)
if not client_gone:
summary = summarize_tool_result(tool_name, parsed)
success = parsed.get("ok", True)
await stream.send(
WorkflowCopilotToolResultUpdate(
type=WorkflowCopilotStreamMessageType.TOOL_RESULT,
@ -127,7 +160,26 @@ async def stream_to_sse(
)
)
_update_enforcement_from_tool(ctx, tool_name, parsed)
if narrator_enabled:
ctx_before = snapshot_ctx(ctx)
_update_enforcement_from_tool(ctx, tool_name, parsed)
ctx_after = snapshot_ctx(ctx)
prior_tool_name = (
narrator_state.pending_activity[-1].tool_name if narrator_state.pending_activity else None
)
narrator_state.record_tool(
tool_name=tool_name,
summary=summary,
success=success,
iteration=iteration,
details=extract_tool_details(tool_name, parsed),
)
for transition in detect_transitions(ctx_before, ctx_after, tool_name, prior_tool_name):
narrator_state.record_transition(transition)
schedule_narration(narrator_state, stream, iteration)
else:
_update_enforcement_from_tool(ctx, tool_name, parsed)
iteration += 1
except asyncio.CancelledError:
# Real cancellation (server shutdown, upstream abort). Propagate so
@ -135,6 +187,10 @@ async def stream_to_sse(
# run to free provider resources.
result.cancel()
raise
finally:
# Cancel any in-flight narration before the stream tears down so
# tasks don't try to send on a disconnected channel.
await cancel_in_flight(narrator_state)
def _get_raw_field(raw: Any, key: str) -> Any:

View file

@ -67,6 +67,7 @@ class WorkflowCopilotStreamMessageType(StrEnum):
TOOL_CALL = "tool_call"
TOOL_RESULT = "tool_result"
CONDENSING = "condensing"
NARRATION = "narration"
class WorkflowCopilotProcessingUpdate(BaseModel):
@ -120,6 +121,20 @@ class WorkflowCopilotCondensingUpdate(BaseModel):
status: str = Field(..., description="Condensing status: 'started' or 'completed'")
class WorkflowCopilotNarrationUpdate(BaseModel):
    # Ephemeral, user-facing one-sentence status line emitted periodically while
    # the agent runs. Distinct from PROCESSING_UPDATE (terse status text) so the
    # frontend can style narration as a separate "thinking" channel. Not
    # persisted to chat history -- reload shows only user and final-assistant
    # rows.
    # "type" defaults to NARRATION so emitters never need to set it explicitly.
    type: WorkflowCopilotStreamMessageType = Field(
        WorkflowCopilotStreamMessageType.NARRATION, description="Message type"
    )
    narration: str = Field(..., description="One-sentence user-facing progress narration")
    iteration: int = Field(..., description="Agent loop iteration number this narration describes")
    timestamp: datetime = Field(..., description="Server timestamp")
class WorkflowYAMLConversionRequest(BaseModel):
    # Request payload: convert a workflow-definition YAML document into blocks
    # for the workflow identified by workflow_id.
    workflow_definition_yaml: str = Field(..., description="Workflow definition YAML to convert to blocks")
    workflow_id: str = Field(..., description="Workflow ID")

View file

@ -0,0 +1,659 @@
"""Tests for the copilot narration layer (SKY-9001).
The narrator runs as a background task that consumes tool round-trips from the
agent stream and emits one-sentence user-facing progress lines over SSE. These
tests exercise the state machine, the emit gate, and the fire-and-drop
failure semantics -- the narrator must never be able to crash the agent run.
"""
from __future__ import annotations
import asyncio
from types import SimpleNamespace
from typing import Any
import pytest
from skyvern.forge.sdk.copilot import narration
from skyvern.forge.sdk.copilot.narration import (
MIN_NARRATION_GAP_SECONDS,
NarratorState,
TransitionKind,
_build_narrator_prompt,
_extract_narration_text,
_NarratorPromptContext,
_sanitize_narration,
cancel_in_flight,
detect_transitions,
schedule_narration,
should_emit,
snapshot_ctx,
)
def _ctx(
update_workflow_called: bool = False,
test_after_update_done: bool = False,
navigate_called: bool = False,
observation_after_navigate: bool = False,
) -> SimpleNamespace:
return SimpleNamespace(
update_workflow_called=update_workflow_called,
test_after_update_done=test_after_update_done,
navigate_called=navigate_called,
observation_after_navigate=observation_after_navigate,
)
# ---------------------------------------------------------------------------
# detect_transitions
# ---------------------------------------------------------------------------
def test_detect_transitions_workflow_updated_on_false_to_true() -> None:
    # Flipping update_workflow_called False -> True must surface WORKFLOW_UPDATED.
    snap_before = snapshot_ctx(_ctx(update_workflow_called=False))
    snap_after = snapshot_ctx(_ctx(update_workflow_called=True))
    kinds = detect_transitions(snap_before, snap_after, tool_name="update_workflow", prior_tool_name="evaluate")
    assert TransitionKind.WORKFLOW_UPDATED in kinds
def test_detect_transitions_test_completed() -> None:
    # test_after_update_done flipping on registers as TEST_COMPLETED.
    snap_before = snapshot_ctx(_ctx(test_after_update_done=False))
    snap_after = snapshot_ctx(_ctx(test_after_update_done=True))
    kinds = detect_transitions(snap_before, snap_after, tool_name="run_blocks_and_collect_debug", prior_tool_name=None)
    assert TransitionKind.TEST_COMPLETED in kinds
def test_detect_transitions_navigation_completed() -> None:
    # navigate_called flipping on registers as NAVIGATION_COMPLETED.
    snap_before = snapshot_ctx(_ctx(navigate_called=False))
    snap_after = snapshot_ctx(_ctx(navigate_called=True))
    kinds = detect_transitions(snap_before, snap_after, tool_name="navigate_browser", prior_tool_name=None)
    assert TransitionKind.NAVIGATION_COMPLETED in kinds
def test_detect_transitions_new_tool_cluster_only_on_change() -> None:
    # A cluster transition fires only when the tool name actually changes.
    snap_a = snapshot_ctx(_ctx())
    snap_b = snapshot_ctx(_ctx())

    def cluster_detected(prior: str | None) -> bool:
        return TransitionKind.NEW_TOOL_CLUSTER in detect_transitions(snap_a, snap_b, "click", prior_tool_name=prior)

    assert cluster_detected("evaluate")
    assert not cluster_detected("click")
    # First tool (prior is None) does not count as a cluster transition -- the
    # agent is just starting up, not changing course.
    assert not cluster_detected(None)
def test_detect_transitions_unchanged_ctx_produces_empty() -> None:
    # Identical snapshots plus an unchanged tool name: no transitions at all.
    unchanged = snapshot_ctx(_ctx())
    also_unchanged = snapshot_ctx(_ctx())
    assert detect_transitions(unchanged, also_unchanged, "click", prior_tool_name="click") == []
# ---------------------------------------------------------------------------
# NarratorState.record_transition priority
# ---------------------------------------------------------------------------
def test_record_transition_first_one_wins_when_same_priority() -> None:
    # Re-recording the same kind is idempotent: the first one stays.
    narrator = NarratorState()
    for _ in range(2):
        narrator.record_transition(TransitionKind.NEW_TOOL_CLUSTER)
    assert narrator.pending_transition == TransitionKind.NEW_TOOL_CLUSTER
def test_record_transition_higher_priority_overrides_lower() -> None:
    # WORKFLOW_UPDATED outranks NEW_TOOL_CLUSTER and replaces it.
    narrator = NarratorState()
    narrator.record_transition(TransitionKind.NEW_TOOL_CLUSTER)
    narrator.record_transition(TransitionKind.WORKFLOW_UPDATED)
    assert narrator.pending_transition == TransitionKind.WORKFLOW_UPDATED
def test_record_transition_lower_priority_does_not_override() -> None:
    # A lower-priority kind arriving later leaves the pending one in place.
    narrator = NarratorState()
    narrator.record_transition(TransitionKind.WORKFLOW_UPDATED)
    narrator.record_transition(TransitionKind.NEW_TOOL_CLUSTER)
    assert narrator.pending_transition == TransitionKind.WORKFLOW_UPDATED
def test_record_tool_truncates_to_buffer_cap() -> None:
    # Recording past the cap keeps the buffer bounded.
    state = NarratorState()
    overflow = narration.MAX_TOOL_ACTIVITY_BUFFER + 5
    for idx in range(overflow):
        state.record_tool(tool_name=f"t{idx}", summary="s", success=True, iteration=idx)
    assert len(state.pending_activity) == narration.MAX_TOOL_ACTIVITY_BUFFER
    # Oldest entries are dropped.
    assert state.pending_activity[0].tool_name == "t5"
# ---------------------------------------------------------------------------
# should_emit gate
# ---------------------------------------------------------------------------
def test_should_emit_false_without_pending_transition() -> None:
    # No transition queued: nothing to narrate, regardless of timing.
    idle = NarratorState(last_emitted_at=None)
    assert should_emit(idle, now=100.0) is False
def test_should_emit_false_when_in_flight_not_done() -> None:
    # Even with a pending transition, a still-running narration task blocks a
    # new emission -- at most one narration may be in flight at a time.
    async def _pending() -> None:
        await asyncio.sleep(60)

    async def _run() -> bool:
        task = asyncio.create_task(_pending())
        try:
            state = NarratorState(
                last_emitted_at=None, in_flight_task=task, pending_transition=TransitionKind.WORKFLOW_UPDATED
            )
            return should_emit(state, now=100.0)
        finally:
            # Always tear the helper task down so the event loop closes cleanly.
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

    assert asyncio.run(_run()) is False
def test_should_emit_false_inside_min_gap_window() -> None:
    # Still inside the rate-limit window: stay quiet.
    just_short = 100.0 + (MIN_NARRATION_GAP_SECONDS - 1)
    state = NarratorState(last_emitted_at=100.0, pending_transition=TransitionKind.WORKFLOW_UPDATED)
    assert should_emit(state, now=just_short) is False
def test_should_emit_true_after_min_gap_elapsed() -> None:
    # Once the gap has fully elapsed the gate opens.
    just_past = 100.0 + MIN_NARRATION_GAP_SECONDS + 0.01
    state = NarratorState(last_emitted_at=100.0, pending_transition=TransitionKind.WORKFLOW_UPDATED)
    assert should_emit(state, now=just_past) is True
def test_should_emit_true_on_first_emission_with_transition() -> None:
    # No prior emission (last_emitted_at is None) + pending transition + no
    # in-flight task = green light.
    fresh = NarratorState(last_emitted_at=None, pending_transition=TransitionKind.WORKFLOW_UPDATED)
    assert should_emit(fresh, now=0.0) is True
# ---------------------------------------------------------------------------
# _sanitize_narration
# ---------------------------------------------------------------------------
def test_sanitize_strips_quotes() -> None:
    # Both double- and single-quoted wrappers are peeled off.
    for quoted in ('"Checking the login form"', "'Checking the login form'"):
        assert _sanitize_narration(quoted) == "Checking the login form"
def test_sanitize_strips_code_fences() -> None:
    # Markdown code fences around the sentence are removed.
    fenced = "```Checking the login form```"
    assert _sanitize_narration(fenced) == "Checking the login form"
def test_sanitize_collapses_whitespace() -> None:
    # Internal newlines/tabs collapse to single spaces.
    messy = "Checking the\n login\tform"
    assert _sanitize_narration(messy) == "Checking the login form"
def test_sanitize_truncates_over_cap() -> None:
    # Oversized narrations are truncated with a trailing ellipsis.
    sanitized = _sanitize_narration("x" * 500)
    assert sanitized.endswith("...")
    assert len(sanitized) <= narration._MAX_NARRATION_CHARS + len("...")
# ---------------------------------------------------------------------------
# _extract_narration_text
# ---------------------------------------------------------------------------
def test_extract_narration_from_str() -> None:
    # Plain strings are trimmed; an empty string yields nothing.
    trimmed = _extract_narration_text(" hello ")
    assert trimmed == "hello"
    assert _extract_narration_text("") is None
def test_extract_narration_from_dict_priority_keys() -> None:
    # Each recognized key is honored when it appears on its own.
    expectations = [
        ({"narration": "a"}, "a"),
        ({"sentence": "b"}, "b"),
        ({"user_response": "c"}, "c"),
        ({"content": "d"}, "d"),
        ({"text": "e"}, "e"),
    ]
    for payload, expected in expectations:
        assert _extract_narration_text(payload) == expected
def test_extract_narration_unknown_shape_returns_none() -> None:
    # Unrecognized dicts and non-string scalars produce no narration.
    for bogus in ({"unrelated": "x"}, 42, None):
        assert _extract_narration_text(bogus) is None
# ---------------------------------------------------------------------------
# _narration_leaks_identifier
# ---------------------------------------------------------------------------
def test_leak_guard_flags_snake_case_tokens() -> None:
    # snake_case identifiers embedded in prose must trip the guard.
    flagged = [
        "Running the extract_top_post block.",
        "Calling update_and_run_blocks on the workflow.",
        "Extracting via the extract_top_post block.",
    ]
    for sentence in flagged:
        assert narration._narration_leaks_identifier(sentence) is True
def test_leak_guard_flags_backtick_identifiers() -> None:
    # Backtick-quoted tokens are treated as leaked identifiers.
    backticked = "Running the `extract_top_post` block."
    assert narration._narration_leaks_identifier(backticked) is True
def test_leak_guard_flags_via_the_phrasing() -> None:
    # "via the ... block" phrasing correlates strongly with the LLM echoing
    # an identifier back even if the identifier itself slipped the regex.
    suspicious = "Extracting via the top post block."
    assert narration._narration_leaks_identifier(suspicious) is True
def test_leak_guard_flags_camel_case_tokens() -> None:
    # camelCase identifiers embedded in prose must trip the guard.
    for sentence in (
        "Running the extractTopPost block.",
        "Calling updateAndRunBlocks on the workflow.",
    ):
        assert narration._narration_leaks_identifier(sentence) is True
def test_leak_guard_flags_kebab_case_tokens() -> None:
    # 3+ hyphen segments = identifier-shaped. Two-segment compounds like
    # "follow-up" stay legit English.
    for sentence in (
        "Running the extract-top-post step.",
        "Invoking update-and-run-blocks.",
    ):
        assert narration._narration_leaks_identifier(sentence) is True
def test_leak_guard_accepts_clean_sentences() -> None:
    # Natural-English narrations, including plain follow-up phrasing and
    # ordinary two-segment hyphenated compounds, must pass the guard.
    clean = [
        "Setting up the workflow.",
        "Extracting the requested fields.",
        "Running the workflow to find today's top post.",
        "Following up on the results.",
        "Double-checking the output.",
    ]
    for sentence in clean:
        assert narration._narration_leaks_identifier(sentence) is False
# ---------------------------------------------------------------------------
# extract_tool_details — no identifier-looking tokens leave this function
# ---------------------------------------------------------------------------
def test_extract_tool_details_update_workflow_excludes_block_names() -> None:
    # Block labels are user data and must never reach the narrator prompt;
    # only aggregate counts and status survive.
    tool_output = {
        "ok": True,
        "data": {
            "block_count": 2,
            "blocks": [{"label": "open_target_page"}, {"label": "extract_values"}],
            "overall_status": "succeeded",
        },
    }
    details = narration.extract_tool_details("update_and_run_blocks", tool_output)
    for label in ("open_target_page", "extract_values"):
        assert label not in details
    assert "2 step(s)" in details
    assert "status: succeeded" in details
def test_extract_tool_details_run_blocks_excludes_executed_labels() -> None:
    # Executed block labels are user data: hidden behind a step count.
    tool_output = {
        "ok": True,
        "data": {
            "executed_block_labels": ["open_hn", "extract_top"],
            "overall_status": "succeeded",
        },
    }
    details = narration.extract_tool_details("run_blocks_and_collect_debug", tool_output)
    for label in ("open_hn", "extract_top"):
        assert label not in details
    assert "2 step(s)" in details
def test_extract_tool_details_navigate_uses_domain_only() -> None:
    # Path and query string (which may carry tokens) are dropped entirely.
    url = "https://sub.example.com/items?id=12345&token=secret"
    details = narration.extract_tool_details("navigate_browser", {"ok": True, "data": {"url": url}})
    assert details == "domain: sub.example.com"
def test_extract_tool_details_navigate_strips_userinfo_and_port() -> None:
    # Credentials and port in the authority section must not leak.
    url = "https://user:secret@host.example.com:8443/private/path?auth=abc"
    details = narration.extract_tool_details("navigate_browser", {"ok": True, "data": {"url": url}})
    assert details == "domain: host.example.com"
def test_extract_tool_details_navigate_ignores_redirect_like_urls() -> None:
    # Decoy authority inside the query string must not become the reported
    # host. This is the exact shape CodeQL's incomplete-url-substring-match
    # rule was pointing at.
    url = "https://attacker.example.com/?redirect=https://victim.example.com/path"
    details = narration.extract_tool_details("navigate_browser", {"ok": True, "data": {"url": url}})
    assert details == "domain: attacker.example.com"
def test_extract_tool_details_get_run_results_drops_field_names() -> None:
    # Field names in user's extracted data may include arbitrary strings; hide
    # them behind a count so the narrator can't echo private field names back.
    extracted = {"rank": 1, "title": "x", "url": "y", "points": 10, "author": "a"}
    details = narration.extract_tool_details("get_run_results", {"ok": True, "data": extracted})
    for field_name in ("rank", "title"):
        assert field_name not in details
    assert "5 extracted field(s)" in details
def test_extract_tool_details_failure_is_generic() -> None:
    # Raw error payload may include secrets / internal identifiers -- we keep
    # it vague so it can't reach the narrator prompt.
    failure = {"ok": False, "error": "AgentTool failed: secret_key_123_invalid"}
    details = narration.extract_tool_details("update_and_run_blocks", failure)
    assert "secret_key_123" not in details
    assert "failed" in details.lower()
# ---------------------------------------------------------------------------
# _build_narrator_prompt — redacts raw tool identifiers
# ---------------------------------------------------------------------------
def test_prompt_does_not_leak_raw_tool_names() -> None:
    # Raw internal tool names must not appear in the prompt we send to the LLM.
    # Instead their user-facing labels do.
    state = NarratorState()
    state.record_tool(tool_name="update_workflow", summary="wrote 3 blocks", success=True, iteration=0)
    state.record_tool(tool_name="run_blocks_and_collect_debug", summary="ran successfully", success=True, iteration=1)
    prompt_ctx = _NarratorPromptContext(
        transition=TransitionKind.WORKFLOW_UPDATED,
        activity=list(state.pending_activity),
    )
    prompt = _build_narrator_prompt(prompt_ctx)
    for raw_name in ("update_workflow", "run_blocks_and_collect_debug"):
        assert raw_name not in prompt
    for label in ("revising the workflow draft", "running a test of the workflow"):
        assert label in prompt
def test_prompt_handles_unknown_tool_via_generic_label() -> None:
    # Tools without a known user-facing label fall back to a generic one.
    unknown = narration._ToolActivityEntry(
        tool_name="some_future_tool",
        summary="did a thing",
        success=True,
        iteration=0,
    )
    prompt_ctx = _NarratorPromptContext(transition=TransitionKind.NEW_TOOL_CLUSTER, activity=[unknown])
    prompt = _build_narrator_prompt(prompt_ctx)
    assert "some_future_tool" not in prompt
    assert "running a tool" in prompt
def test_prompt_truncates_long_tool_summaries() -> None:
    # Snippet is capped at 200 chars + ellipsis. The prompt length overall is
    # well under the raw details length.
    oversized = "x" * 500
    entry = narration._ToolActivityEntry(
        tool_name="evaluate",
        summary=oversized,
        success=True,
        iteration=0,
        details=oversized,
    )
    prompt_ctx = _NarratorPromptContext(transition=TransitionKind.NEW_TOOL_CLUSTER, activity=[entry])
    prompt = _build_narrator_prompt(prompt_ctx)
    assert "x" * 300 not in prompt
# ---------------------------------------------------------------------------
# schedule_narration + _narration_task_body — end-to-end
# ---------------------------------------------------------------------------
class _FakeStream:
"""Minimal EventSourceStream stand-in for narration tests."""
def __init__(self, send_ok: bool = True) -> None:
self.send_ok = send_ok
self.sent: list[Any] = []
async def send(self, payload: Any) -> bool:
self.sent.append(payload)
return self.send_ok
async def is_disconnected(self) -> bool:
return False
async def _install_handler(monkeypatch: pytest.MonkeyPatch, handler: Any) -> None:
    # Point the narration module at a test-provided LLM handler (or None).
    # Declared async only so call sites read uniformly with the surrounding
    # awaited code -- it performs no awaiting itself.
    monkeypatch.setattr(narration, "_get_narrator_handler", lambda: handler)
@pytest.mark.asyncio
async def test_schedule_narration_no_op_when_no_transition() -> None:
    # Without a pending transition there is nothing to narrate: no task, no SSE.
    stream = _FakeStream()
    idle_state = NarratorState()
    schedule_narration(idle_state, stream, iteration=0)  # type: ignore[arg-type]
    assert idle_state.in_flight_task is None
    assert stream.sent == []
@pytest.mark.asyncio
async def test_schedule_narration_emits_on_happy_path(monkeypatch: pytest.MonkeyPatch) -> None:
    # Full happy path: transition pending, handler succeeds, SSE frame lands.
    async def _handler(prompt: str, prompt_name: str, **kwargs: object) -> str:
        # The narrator must use its dedicated prompt name.
        assert prompt_name == "workflow-copilot-narration"
        return "Revising the workflow draft."

    await _install_handler(monkeypatch, _handler)
    state = NarratorState()
    state.record_transition(TransitionKind.WORKFLOW_UPDATED)
    stream = _FakeStream()
    assert state.last_emitted_at is None
    schedule_narration(state, stream, iteration=3)  # type: ignore[arg-type]
    assert state.in_flight_task is not None
    await state.in_flight_task
    assert state.in_flight_task is None
    assert len(stream.sent) == 1
    payload = stream.sent[0]
    assert payload.narration == "Revising the workflow draft."
    assert payload.iteration == 3
    # Transition is consumed once scheduled.
    assert state.pending_transition is None
    # Clock advanced only after the SSE frame was delivered.
    assert state.last_emitted_at is not None
@pytest.mark.asyncio
async def test_schedule_narration_keeps_clock_frozen_on_failure(monkeypatch: pytest.MonkeyPatch) -> None:
    """A failed narration must not advance ``last_emitted_at``. Advancing would
    silence the next ``MIN_NARRATION_GAP_SECONDS`` of valid transitions even
    though no user-visible narration was actually delivered."""

    async def _raising_handler(prompt: str, prompt_name: str, **kwargs: object) -> str:
        raise RuntimeError("provider down")

    await _install_handler(monkeypatch, _raising_handler)
    state = NarratorState()
    state.record_transition(TransitionKind.WORKFLOW_UPDATED)
    stream = _FakeStream()
    schedule_narration(state, stream, iteration=1)  # type: ignore[arg-type]
    assert state.in_flight_task is not None
    # Awaiting must not raise -- the task swallows the handler failure.
    await state.in_flight_task
    assert state.last_emitted_at is None, "clock must stay frozen when no narration was delivered"
    # The slot is released so the next transition can schedule immediately.
    assert state.in_flight_task is None
@pytest.mark.asyncio
async def test_schedule_narration_swallows_handler_exception(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A handler failure is contained inside the narration task: awaiting the
    task does not raise, nothing is sent, and the in-flight slot is freed.

    Note: the previously-requested ``caplog`` fixture was unused and has been
    removed.
    """

    async def _raising_handler(prompt: str, prompt_name: str, **kwargs: object) -> str:
        raise RuntimeError("provider down")

    await _install_handler(monkeypatch, _raising_handler)
    state = NarratorState()
    state.record_transition(TransitionKind.WORKFLOW_UPDATED)
    stream = _FakeStream()
    schedule_narration(state, stream, iteration=1)  # type: ignore[arg-type]
    assert state.in_flight_task is not None
    # Awaiting the task should not raise -- errors are swallowed inside.
    await state.in_flight_task
    assert state.in_flight_task is None
    assert stream.sent == []
@pytest.mark.asyncio
async def test_schedule_narration_drops_on_timeout(monkeypatch: pytest.MonkeyPatch) -> None:
    # A handler slower than the narrator budget is dropped, not waited out.
    monkeypatch.setattr(narration, "NARRATOR_TIMEOUT_SECONDS", 0.05)

    async def _slow_handler(prompt: str, prompt_name: str, **kwargs: object) -> str:
        await asyncio.sleep(1.0)
        return "too late"

    await _install_handler(monkeypatch, _slow_handler)
    state = NarratorState()
    state.record_transition(TransitionKind.WORKFLOW_UPDATED)
    stream = _FakeStream()
    schedule_narration(state, stream, iteration=2)  # type: ignore[arg-type]
    scheduled = state.in_flight_task
    assert scheduled is not None
    await scheduled
    assert state.in_flight_task is None
    assert stream.sent == []
@pytest.mark.asyncio
async def test_schedule_narration_drops_empty_response(monkeypatch: pytest.MonkeyPatch) -> None:
    # A whitespace-only narration from the handler is discarded, not sent.
    async def _whitespace_handler(prompt: str, prompt_name: str, **kwargs: object) -> str:
        return " "

    await _install_handler(monkeypatch, _whitespace_handler)
    state = NarratorState()
    state.record_transition(TransitionKind.TEST_COMPLETED)
    stream = _FakeStream()
    schedule_narration(state, stream, iteration=4)  # type: ignore[arg-type]
    scheduled = state.in_flight_task
    assert scheduled is not None
    await scheduled
    assert stream.sent == []
    assert state.in_flight_task is None
@pytest.mark.asyncio
async def test_schedule_narration_no_handler_available(monkeypatch: pytest.MonkeyPatch) -> None:
    # Simulate the AppHolder-not-initialized case (unit tests, pre-startup).
    await _install_handler(monkeypatch, None)
    state = NarratorState()
    state.record_transition(TransitionKind.NAVIGATION_COMPLETED)
    stream = _FakeStream()
    schedule_narration(state, stream, iteration=0)  # type: ignore[arg-type]
    scheduled = state.in_flight_task
    assert scheduled is not None
    await scheduled
    assert stream.sent == []
    assert state.in_flight_task is None
@pytest.mark.asyncio
async def test_schedule_narration_skips_when_in_flight(monkeypatch: pytest.MonkeyPatch) -> None:
    """A second transition arriving while the first narration is in flight
    must not spawn a concurrent task. At most one narration runs at a time."""
    gate = asyncio.Event()

    # Handler blocks until the test releases the gate, pinning the first
    # narration task in the "in flight" state.
    async def _gated_handler(prompt: str, prompt_name: str, **kwargs: object) -> str:
        await gate.wait()
        return "first narration"

    await _install_handler(monkeypatch, _gated_handler)
    state = NarratorState()
    state.record_transition(TransitionKind.WORKFLOW_UPDATED)
    stream = _FakeStream()
    schedule_narration(state, stream, iteration=0)  # type: ignore[arg-type]
    first_task = state.in_flight_task
    assert first_task is not None
    # Second transition arrives while first is running.
    state.record_transition(TransitionKind.ENFORCEMENT_RETRY)
    schedule_narration(state, stream, iteration=1)  # type: ignore[arg-type]
    # Same task -- no new task spawned.
    assert state.in_flight_task is first_task
    gate.set()
    await first_task
    assert len(stream.sent) == 1
# ---------------------------------------------------------------------------
# cancel_in_flight
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cancel_in_flight_noop_when_no_task() -> None:
    # An empty state has nothing to cancel; the call must still be safe.
    await cancel_in_flight(NarratorState())  # must not raise
@pytest.mark.asyncio
async def test_cancel_in_flight_noop_when_task_done() -> None:
    # Cancelling a state whose task already finished is a safe no-op.
    async def _immediate() -> None:
        return None

    finished = asyncio.create_task(_immediate())
    await finished
    await cancel_in_flight(NarratorState(in_flight_task=finished))  # must not raise
@pytest.mark.asyncio
async def test_cancel_in_flight_hard_cancels_running_task() -> None:
    """A narration LLM call runs ~2-3s; waiting it out before the final
    response would regress completion latency. Cancel immediately so the
    route can ship the final assistant message without delay."""

    async def _pending() -> None:
        await asyncio.sleep(60)

    running = asyncio.create_task(_pending())
    await cancel_in_flight(NarratorState(in_flight_task=running))
    assert running.cancelled() or running.done()
@pytest.mark.asyncio
async def test_cancel_in_flight_returns_fast() -> None:
    """Cancellation must not add meaningful latency to stream teardown.
    Budget: well under 100ms for a hard-cancel."""
    started = asyncio.Event()

    async def _pending() -> None:
        started.set()
        await asyncio.sleep(60)

    task = asyncio.create_task(_pending())
    await started.wait()
    state = NarratorState(in_flight_task=task)
    loop = asyncio.get_running_loop()
    begin = loop.time()
    await cancel_in_flight(state)
    elapsed = loop.time() - begin
    assert elapsed < 0.1, f"cancel_in_flight took {elapsed:.3f}s, expected <0.1s"

View file

@ -150,6 +150,92 @@ async def test_stream_to_sse_propagates_cancelled_error() -> None:
result.cancel.assert_called_once()
@pytest.mark.asyncio
async def test_stream_to_sse_emits_narration_on_workflow_updated_transition(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """End-to-end: a completed update_workflow tool round-trip flips
    ctx.update_workflow_called, which should register as a workflow_updated
    transition on the narrator state and produce a NARRATION SSE payload in
    addition to the existing TOOL_CALL / TOOL_RESULT frames.
    """
    from agents.items import RunItem
    from agents.stream_events import RunItemStreamEvent

    from skyvern.forge.sdk.copilot import narration
    from skyvern.forge.sdk.copilot.context import CopilotContext
    from skyvern.forge.sdk.schemas.workflow_copilot import WorkflowCopilotStreamMessageType

    # Deterministic narrator handler: always returns the same sentence.
    async def _handler(prompt: str, prompt_name: str, **kwargs: object) -> str:
        return "Revising the workflow draft."

    monkeypatch.setattr(narration, "_get_narrator_handler", lambda: _handler)

    # Tool input / output raw_items matching the shapes streaming_adapter reads.
    call_raw = {"call_id": "c-upd", "name": "update_workflow", "arguments": "{}"}
    call_item = MagicMock(spec=RunItem)
    call_item.raw_item = call_raw
    tool_call_event = RunItemStreamEvent(name="tool_called", item=call_item)
    # Shape the output so _update_enforcement_from_tool sets
    # update_workflow_called=True (needs ok=True + non-empty block_count).
    output_payload = [{"type": "text", "text": '{"ok": true, "data": {"block_count": 2}}'}]
    out_item = MagicMock(spec=RunItem)
    out_item.raw_item = {"call_id": "c-upd", "name": "update_workflow"}
    out_item.output = output_payload
    tool_output_event = RunItemStreamEvent(name="tool_output", item=out_item)

    # In production the SDK yields tool events slowly (LLM latency between
    # them), giving the background narration task ample time to finish before
    # stream_to_sse's finally cancels anything still in flight. Simulate that
    # by inserting explicit event-loop yields between synthetic events and
    # after the last one, so the narration task can run to completion under
    # the test's microsecond-fast loop.
    async def _slow_stream_events() -> Any:
        yield tool_call_event
        await asyncio.sleep(0)
        yield tool_output_event
        # Let the scheduled narration task complete before the async generator
        # exits and stream_to_sse's finally cancels any still-running task.
        for _ in range(10):
            await asyncio.sleep(0)

    result = MagicMock()
    result.stream_events = _slow_stream_events
    result.cancel = MagicMock()

    # Capture every SSE payload sent over the (mocked) stream.
    sent: list[Any] = []

    async def _send(payload: Any) -> bool:
        sent.append(payload)
        return True

    stream = MagicMock()
    stream.is_disconnected = AsyncMock(return_value=False)
    stream.send = _send
    ctx = CopilotContext(
        organization_id="org_test",
        workflow_id="wf_test",
        workflow_permanent_id="wpid_test",
        workflow_yaml="",
        browser_session_id=None,
        stream=None,  # type: ignore[arg-type]
        api_key=None,
        user_message="",
    )
    await stream_to_sse(result, stream, ctx)

    narration_payloads = [p for p in sent if getattr(p, "type", None) == WorkflowCopilotStreamMessageType.NARRATION]
    assert len(narration_payloads) == 1
    assert narration_payloads[0].narration == "Revising the workflow draft."
    # The tool round-trip also emitted TOOL_CALL + TOOL_RESULT.
    tool_types = [getattr(p, "type", None) for p in sent]
    assert WorkflowCopilotStreamMessageType.TOOL_CALL in tool_types
    assert WorkflowCopilotStreamMessageType.TOOL_RESULT in tool_types
class TestParseToolOutput:
@staticmethod
def _parse(output: Any) -> dict[str, Any]: