🔄 synced local 'skyvern/' with remote 'skyvern/'

andrewneilson 2026-04-23 19:05:17 +00:00
parent e376b8a7e9
commit d291a8c64a
6 changed files with 110 additions and 8 deletions

View file

@@ -168,6 +168,7 @@ def _build_exit_result(ctx: CopilotContext, user_response: str, global_llm_conte
global_llm_context=global_llm_context,
workflow_yaml=verified_yaml,
workflow_was_persisted=ctx.workflow_persisted,
total_tokens=ctx.total_tokens_used,
)
@@ -251,6 +252,7 @@ def _translate_to_agent_result(
response_type=resp_type,
workflow_yaml=last_workflow_yaml,
workflow_was_persisted=ctx.workflow_persisted,
total_tokens=ctx.total_tokens_used,
)
@@ -486,6 +488,7 @@ async def run_copilot_agent(
global_llm_context=global_llm_context,
workflow_yaml=None,
workflow_was_persisted=ctx.workflow_persisted,
total_tokens=ctx.total_tokens_used,
)
except Exception as e:
LOG.error("Copilot agent error", error=str(e), exc_info=True)

View file

@@ -4,13 +4,15 @@ from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal
from pydantic import BaseModel, Field
from skyvern.forge.sdk.copilot.runtime import AgentContext
from skyvern.forge.sdk.workflow.models.workflow import Workflow
ResponseType = Literal["REPLY", "ASK_QUESTION", "REPLACE_WORKFLOW"]
if TYPE_CHECKING:
from skyvern.forge.sdk.copilot.narration import NarratorState
@@ -108,13 +110,18 @@ class AgentResult:
user_response: str
updated_workflow: Workflow | None
global_llm_context: str | None
response_type: str = "REPLY"
response_type: ResponseType = "REPLY"
workflow_yaml: str | None = None
workflow_was_persisted: bool = False
# Feasibility-gate fast-path sets this True so the route can null any
# previously-persisted proposed_workflow. Regular in-loop ASK_QUESTION
# responses leave it False, preserving in-progress drafts.
clear_proposed_workflow: bool = False
# Actual API token usage accumulated across the agent run. None when no
# provider reported usage on the stream — distinguishes "no data" from
# "0 tokens" so eval cost grading can flag missing telemetry instead of
# silently passing as cheap.
total_tokens: int | None = None
@dataclass
@@ -150,6 +157,14 @@ class CopilotContext(AgentContext):
consecutive_tool_tracker: list[str] = field(default_factory=list)
tool_activity: list[dict[str, Any]] = field(default_factory=list)
# Token usage summed from raw_responses after each streamed run. None
# until the first response that carries a usage object — some providers
# (notably non-OpenAI streaming routes) omit usage entirely, and we want
# eval cost grading to see "no data" rather than "0 tokens".
total_tokens_used: int | None = None
input_tokens_used: int | None = None
output_tokens_used: int | None = None
# Workflow state
last_workflow: Workflow | None = None
last_workflow_yaml: str | None = None
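
The None-vs-0 sentinel on these counters is what eval cost grading consumes downstream. A minimal sketch of that consumer side, assuming a hypothetical grade_cost helper and token budget (neither appears in this commit):

    def grade_cost(total_tokens: int | None, budget: int = 50_000) -> str:
        # None means no provider on the stream reported a usage object;
        # flag the gap instead of grading the run as free.
        if total_tokens is None:
            return "telemetry_missing"
        return "within_budget" if total_tokens <= budget else "over_budget"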

View file

@@ -918,6 +918,31 @@ class _SendTrackingStream:
await self._inner.close()
def _accumulate_usage(result: RunResultStreaming, ctx: Any) -> None:
"""Sum actual token usage from raw_responses onto the context.
Called per enforcement iteration in a ``finally:`` so pre-overflow
response tokens are still counted even when ``stream_to_sse`` raises.
First observed usage flips the counters from ``None`` to ``0``; if no
response on this stream carries a usage object the counters stay
``None``, which the eval surfaces as "telemetry missing" rather than
"ran for free".
"""
if not hasattr(ctx, "total_tokens_used"):
return
for resp in getattr(result, "raw_responses", []) or []:
usage = getattr(resp, "usage", None)
if usage is None:
continue
if ctx.total_tokens_used is None:
ctx.total_tokens_used = 0
ctx.input_tokens_used = 0
ctx.output_tokens_used = 0
ctx.total_tokens_used += getattr(usage, "total_tokens", 0) or 0
ctx.input_tokens_used += getattr(usage, "input_tokens", 0) or 0
ctx.output_tokens_used += getattr(usage, "output_tokens", 0) or 0
async def run_with_enforcement(
agent: Agent,
initial_input: str | list,
@@ -965,7 +990,10 @@ async def run_with_enforcement(
):
try:
result = Runner.run_streamed(agent, input=current_input, context=ctx, session=session, **runner_kwargs)
await stream_to_sse(result, tracked_stream, ctx)
try:
await stream_to_sse(result, tracked_stream, ctx)
finally:
_accumulate_usage(result, ctx)
except Exception as e:
if not _is_context_window_error(e):
raise
@@ -995,7 +1023,10 @@ async def run_with_enforcement(
result = Runner.run_streamed(
agent, input=current_input, context=ctx, session=session, **runner_kwargs
)
await stream_to_sse(result, tracked_stream, ctx)
try:
await stream_to_sse(result, tracked_stream, ctx)
finally:
_accumulate_usage(result, ctx)
except Exception as retry_err:
# Never retry twice; even a second overflow surfaces as a
# real failure rather than spinning.
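
Because ``_accumulate_usage`` duck-types both arguments (everything goes through ``getattr``/``hasattr``), the None-to-0 flip is easy to check in isolation; a sketch using ``SimpleNamespace`` stand-ins for the SDK's ``RunResultStreaming`` and its usage objects:

    from types import SimpleNamespace

    usage = SimpleNamespace(total_tokens=120, input_tokens=100, output_tokens=20)
    result = SimpleNamespace(raw_responses=[SimpleNamespace(usage=usage), SimpleNamespace(usage=None)])
    ctx = SimpleNamespace(total_tokens_used=None, input_tokens_used=None, output_tokens_used=None)

    _accumulate_usage(result, ctx)
    assert ctx.total_tokens_used == 120   # first usage flips None -> 0, then sums
    assert (ctx.input_tokens_used, ctx.output_tokens_used) == (100, 20)

    _accumulate_usage(SimpleNamespace(raw_responses=[]), ctx)
    assert ctx.total_tokens_used == 120   # no usage objects: counters untouched, never reset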

View file

@@ -59,6 +59,20 @@ _FAILED_BLOCK_STATUSES: frozenset[str] = frozenset(
)
_DATA_PRODUCING_BLOCK_TYPES = frozenset({"EXTRACTION", "TEXT_PROMPT"})
# Block types whose ``block.output`` is a ``TaskOutput.from_task()`` envelope
# (schemas/tasks.py:TaskOutput) rather than the raw payload. The
# meaningful-data check must unwrap these via ``_block_data_payload`` before
# judging output, because envelope fields (task_id, status, artifact IDs) are
# always populated on a completed run and would otherwise mask empty
# extractions. This is a subset of ``_DATA_PRODUCING_BLOCK_TYPES`` — keep the
# two in sync when adding a new task-backed type. ``TEXT_PROMPT`` is
# deliberately excluded: its block.output is the raw LLM response dict (see
# ``TextPromptBlock.execute``), so there is no envelope to strip.
_TASK_ENVELOPE_BLOCK_TYPES = frozenset({"EXTRACTION"})
assert _TASK_ENVELOPE_BLOCK_TYPES <= _DATA_PRODUCING_BLOCK_TYPES, (
"_TASK_ENVELOPE_BLOCK_TYPES must be a subset of _DATA_PRODUCING_BLOCK_TYPES"
)
# Absolute upper bound on a single ``run_blocks`` tool invocation. Exists only
# as a last-resort trip wire for runaway loops — progressing runs should never
# approach this. The OpenAI Agents SDK wraps the tool in
@@ -239,6 +253,34 @@ def _is_meaningful_extracted_data(extracted: Any) -> bool:
return True
# Payload fields inside a ``TaskOutput.from_task()`` envelope
# (schemas/tasks.py:TaskOutput). Only these carry the "did the block produce
# something useful?" signal; the rest (task_id, status, artifact IDs, etc.)
# are always populated on a completed run and would short-circuit
# _is_meaningful_extracted_data to True even when nothing useful was produced.
_TASK_OUTPUT_PAYLOAD_FIELDS: tuple[str, ...] = (
"extracted_information",
"downloaded_files",
"downloaded_file_urls",
)
def _block_data_payload(extracted_data: Any, block_type: str | None) -> Any:
"""Return the payload view of a block's output for the meaningful-data check.
For task-envelope block types (``_TASK_ENVELOPE_BLOCK_TYPES``), slice the
envelope down to ``_TASK_OUTPUT_PAYLOAD_FIELDS`` so envelope metadata
can't mask an empty result. Other data-producing types pass through
unchanged; e.g. TEXT_PROMPT's ``block.output`` is the raw LLM response
dict (TextPromptBlock.execute records ``output_parameter_value=response``
directly), so scoping the unwrap avoids slicing a user-defined
json_schema that happens to include an ``extracted_information`` field.
"""
if block_type in _TASK_ENVELOPE_BLOCK_TYPES and isinstance(extracted_data, dict):
return {field: extracted_data.get(field) for field in _TASK_OUTPUT_PAYLOAD_FIELDS}
return extracted_data
async def _attach_action_traces(
blocks: list,
results: list[dict[str, Any]],
@@ -318,7 +360,7 @@ async def _attach_failed_block_screenshots(
task_id_to_block[task_id]["screenshot_b64"] = b64
_BLOCK_RUNNING_TOOLS = frozenset({"run_blocks_and_collect_debug", "update_and_run_blocks"})
BLOCK_RUNNING_TOOLS = frozenset({"run_blocks_and_collect_debug", "update_and_run_blocks"})
def _tool_loop_error(ctx: AgentContext, tool_name: str) -> str | None:
@@ -334,7 +376,7 @@ def _tool_loop_error(ctx: AgentContext, tool_name: str) -> str | None:
# detecting) and further attempts will just burn the tool timeout. Scoped
# to block-running tools so planning/metadata tools (update_workflow,
# list_credentials, get_run_results) stay unaffected.
if tool_name in _BLOCK_RUNNING_TOOLS:
if tool_name in BLOCK_RUNNING_TOOLS:
# Reconciliation guard: the previous block-running tool call exited
# without a trustworthy terminal status for its workflow run (the
# watchdog's stagnation / ceiling / task_exit_unfinalized paths, or
@@ -1752,9 +1794,11 @@ def _analyze_run_blocks(result: dict[str, Any]) -> tuple[str | None, bool, list[
reason = block.get("failure_reason")
if isinstance(reason, str):
texts_to_scan.append(reason)
if block.get("block_type") in _DATA_PRODUCING_BLOCK_TYPES and block.get("status") == "completed":
block_type = block.get("block_type")
if block_type in _DATA_PRODUCING_BLOCK_TYPES and block.get("status") == "completed":
has_data_blocks = True
if _is_meaningful_extracted_data(block.get("extracted_data")):
payload = _block_data_payload(block.get("extracted_data"), block_type)
if _is_meaningful_extracted_data(payload):
any_data_output = True
combined = "\n".join(texts_to_scan)
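
End to end, the change means a completed EXTRACTION block whose envelope carries only metadata no longer counts as data output. A sketch of the two paths through ``_block_data_payload`` (envelope fields per the referenced ``schemas/tasks.py:TaskOutput``):

    envelope = {
        "task_id": "tsk_123",           # always populated on a completed run
        "status": "completed",
        "extracted_information": None,  # the block actually produced nothing
        "downloaded_files": [],
        "downloaded_file_urls": [],
    }
    payload = _block_data_payload(envelope, "EXTRACTION")
    # {"extracted_information": None, "downloaded_files": [], "downloaded_file_urls": []}
    # metadata stripped: only payload fields reach _is_meaningful_extracted_data

    raw = _block_data_payload({"answer": "42"}, "TEXT_PROMPT")
    # {"answer": "42"} passed through unchanged, per the docstring above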

View file

@@ -910,6 +910,8 @@ async def _new_copilot_chat_post(
message=user_response,
updated_workflow=updated_workflow.model_dump(mode="json") if updated_workflow else None,
response_time=assistant_message.created_at,
total_tokens=getattr(agent_result, "total_tokens", None),
response_type=getattr(agent_result, "response_type", "REPLY"),
)
)
except HTTPException as exc:

View file

@@ -3,6 +3,8 @@ from enum import StrEnum
from pydantic import BaseModel, ConfigDict, Field
from skyvern.forge.sdk.copilot.context import ResponseType
class WorkflowCopilotChat(BaseModel):
model_config = ConfigDict(from_attributes=True)
@@ -86,6 +88,11 @@ class WorkflowCopilotStreamResponseUpdate(BaseModel):
message: str = Field(..., description="The message sent to the user")
updated_workflow: dict | None = Field(None, description="The updated workflow")
response_time: datetime = Field(..., description="When the assistant message was created")
total_tokens: int | None = Field(
None,
description="Total tokens consumed by the agent during this turn; None when no provider reported usage",
)
response_type: ResponseType = Field("REPLY", description="Agent response classification")
class WorkflowCopilotStreamErrorUpdate(BaseModel):
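
Tying the last two files together: the route's ``getattr`` defaults feed these optional fields, so a client sees ``total_tokens`` as ``null`` (not ``0``) when telemetry was missing. A sketch of constructing the update, assuming the fields visible in this hunk are the only required ones (fields above line 86 of the model are not shown in the diff):

    from datetime import datetime, timezone

    update = WorkflowCopilotStreamResponseUpdate(
        message="Added the extraction block.",
        updated_workflow=None,
        response_time=datetime.now(timezone.utc),
        total_tokens=None,      # no provider reported usage this turn
        response_type="REPLY",
    )
    print(update.model_dump_json())  # total_tokens serializes as null, not 0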