🔄 synced local 'tests/unit/' with remote 'tests/unit/'

This commit is contained in:
Shuchang Zheng 2026-04-23 21:22:27 +00:00
parent 6fd7c65469
commit 00ae545b24
17 changed files with 1919 additions and 296 deletions

View file

@ -1,7 +1,13 @@
"""Shared pytest fixtures and setup for unit tests."""
# -- begin speed up unit tests
from collections.abc import Iterator
from unittest.mock import AsyncMock, MagicMock

import pytest
from opentelemetry import trace as otel_trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

from tests.unit.force_stub_app import start_forge_stub_app
@ -59,3 +65,36 @@ def make_mock_session(mock_model: MagicMock) -> AsyncMock:
mock_session.refresh = AsyncMock()
return mock_session
# -- shared OTEL span capture for tests that assert on span attributes --
#
# OTEL's global TracerProvider can only be set once per process. We install a
# single TracerProvider + InMemorySpanExporter at session start; tests that
# need span capture depend on the `span_exporter` fixture and get a cleared
# exporter for each test.
_SHARED_SPAN_EXPORTER: InMemorySpanExporter | None = None
def _install_span_exporter() -> InMemorySpanExporter:
    """Install the process-wide in-memory span exporter once and return it.

    OTEL's global TracerProvider can only be set once per process, so the
    exporter is created lazily on first use and cached in
    ``_SHARED_SPAN_EXPORTER`` for every later caller.
    """
    global _SHARED_SPAN_EXPORTER

    if _SHARED_SPAN_EXPORTER is not None:
        return _SHARED_SPAN_EXPORTER

    exporter = InMemorySpanExporter()
    current_provider = otel_trace.get_tracer_provider()
    if not isinstance(current_provider, TracerProvider):
        # No SDK provider installed yet — create one and make it global.
        current_provider = TracerProvider()
        otel_trace.set_tracer_provider(current_provider)
    current_provider.add_span_processor(SimpleSpanProcessor(exporter))
    _SHARED_SPAN_EXPORTER = exporter
    return _SHARED_SPAN_EXPORTER
@pytest.fixture
def span_exporter() -> Iterator[InMemorySpanExporter]:
    """Yield the shared InMemorySpanExporter, cleared for this test.

    Fix: the fixture is a generator (it ``yield``s), so its return
    annotation must be ``Iterator[InMemorySpanExporter]``, not the bare
    exporter type. The exporter is cleared both before and after the test
    so span assertions never see spans leaked from a neighboring test.
    """
    exporter = _install_span_exporter()
    exporter.clear()
    yield exporter
    exporter.clear()

View file

@ -0,0 +1,171 @@
"""Tests for the `skyvern.agent.complete_verify` OTEL span attributes (SKY-9174).
The verification span carries two attributes that power the logfire signal used
to measure SKY-9174's acceptance criterion post-rollout:
- ``verification.status``: ``"complete" | "terminate" | "continue"``
- ``verification.template``: ``"check-user-goal" | "check-user-goal-with-termination"``
These tests assert the attributes are set correctly across the three result
shapes and under both prompt-template selections. No behavioral change is being
verified — just the observability plumbing.
"""
from __future__ import annotations
from datetime import UTC, datetime
from unittest.mock import AsyncMock
from zoneinfo import ZoneInfo
import pytest
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from skyvern.forge.agent import ForgeAgent
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.core.skyvern_context import SkyvernContext
from skyvern.forge.sdk.models import StepStatus
from tests.unit.helpers import make_browser_state, make_organization, make_step, make_task
COMPLETE_VERIFY_SPAN_NAME = "skyvern.agent.complete_verify"
def _span_by_name(spans: list, name: str):
return next((s for s in spans if s.name == name), None)
async def _call_complete_verify(
    monkeypatch: pytest.MonkeyPatch,
    *,
    llm_response: dict,
    use_termination_prompt: bool,
) -> None:
    """Drive ``ForgeAgent.complete_verify`` once with a canned LLM response.

    Patches every external dependency (CUA check, feature flag, prompt
    rendering, LLM handler) so the call exercises only the verification
    span plumbing under test. ``use_termination_prompt`` selects which
    prompt template the agent picks.
    """
    agent = ForgeAgent()
    now = datetime.now(UTC)
    organization = make_organization(now)
    task = make_task(now, organization, navigation_goal="Submit the contact form")
    step = make_step(
        now,
        task,
        step_id="step-verify",
        status=StepStatus.running,
        order=0,
        output=None,
    )
    _, scraped_page, page = make_browser_state()
    # complete_verify refreshes the scraped page; hand back a stub whose
    # screenshots list is non-empty so the flow proceeds.
    scraped_page_refreshed = AsyncMock()
    scraped_page_refreshed.screenshots = [b"image"]
    scraped_page.refresh = AsyncMock(return_value=scraped_page_refreshed)
    monkeypatch.setattr(
        "skyvern.forge.agent.service_utils.is_cua_task",
        AsyncMock(return_value=False),
    )

    # Only the termination-aware verification flag is caller-controlled;
    # every other feature flag resolves to False.
    async def feature_flag_side_effect(flag_name: str, *_args, **_kwargs) -> bool:
        if flag_name == "USE_TERMINATION_AWARE_COMPLETE_VERIFICATION":
            return use_termination_prompt
        return False

    monkeypatch.setattr(
        "skyvern.forge.agent.app.EXPERIMENTATION_PROVIDER.is_feature_enabled_cached",
        AsyncMock(side_effect=feature_flag_side_effect),
    )
    monkeypatch.setattr(
        "skyvern.forge.agent.load_prompt_with_elements",
        lambda **_kwargs: "rendered prompt",
    )
    llm_handler = AsyncMock(return_value=llm_response)
    monkeypatch.setattr(
        "skyvern.forge.agent.LLMAPIHandlerFactory.get_override_llm_api_handler",
        lambda *_args, **_kwargs: llm_handler,
    )
    # complete_verify reads task/step identifiers from the ambient context.
    context = SkyvernContext(
        task_id=task.task_id,
        step_id=None,
        organization_id=task.organization_id,
        workflow_run_id=task.workflow_run_id,
        tz_info=ZoneInfo("UTC"),
    )
    skyvern_context.set(context)
    try:
        await agent.complete_verify(page=page, scraped_page=scraped_page, task=task, step=step)
    finally:
        # Always reset so the ambient context never leaks between tests.
        skyvern_context.reset()
@pytest.mark.asyncio
async def test_span_attrs_for_complete_status_legacy_prompt(
    monkeypatch: pytest.MonkeyPatch, span_exporter: InMemorySpanExporter
) -> None:
    # Legacy prompt + goal achieved → status "complete", legacy template.
    await _call_complete_verify(
        monkeypatch,
        llm_response={"user_goal_achieved": True, "thoughts": "done", "page_info": "ok"},
        use_termination_prompt=False,
    )
    span = _span_by_name(span_exporter.get_finished_spans(), COMPLETE_VERIFY_SPAN_NAME)
    assert span is not None
    attrs = span.attributes or {}
    assert attrs.get("verification.status") == "complete"
    assert attrs.get("verification.template") == "check-user-goal"


@pytest.mark.asyncio
async def test_span_attrs_for_continue_status_legacy_prompt(
    monkeypatch: pytest.MonkeyPatch, span_exporter: InMemorySpanExporter
) -> None:
    # Legacy prompt + goal not achieved → status "continue".
    await _call_complete_verify(
        monkeypatch,
        llm_response={"user_goal_achieved": False, "thoughts": "still loading", "page_info": "spinner"},
        use_termination_prompt=False,
    )
    span = _span_by_name(span_exporter.get_finished_spans(), COMPLETE_VERIFY_SPAN_NAME)
    assert span is not None
    attrs = span.attributes or {}
    assert attrs.get("verification.status") == "continue"
    assert attrs.get("verification.template") == "check-user-goal"


@pytest.mark.asyncio
async def test_span_attrs_for_terminate_status_termination_aware_prompt(
    monkeypatch: pytest.MonkeyPatch, span_exporter: InMemorySpanExporter
) -> None:
    # Termination-aware prompt returning "terminate" → both attributes reflect it.
    await _call_complete_verify(
        monkeypatch,
        llm_response={
            "status": "terminate",
            "thoughts": "blocked by captcha",
            "page_info": "cloudflare",
            "failure_categories": [{"category": "ANTI_BOT_DETECTION", "confidence_float": 0.9, "reasoning": "cf"}],
        },
        use_termination_prompt=True,
    )
    span = _span_by_name(span_exporter.get_finished_spans(), COMPLETE_VERIFY_SPAN_NAME)
    assert span is not None
    attrs = span.attributes or {}
    assert attrs.get("verification.status") == "terminate"
    assert attrs.get("verification.template") == "check-user-goal-with-termination"


@pytest.mark.asyncio
async def test_span_attrs_for_complete_status_termination_aware_prompt(
    monkeypatch: pytest.MonkeyPatch, span_exporter: InMemorySpanExporter
) -> None:
    # "complete" under the termination-aware prompt still reports the
    # termination-aware template name.
    await _call_complete_verify(
        monkeypatch,
        llm_response={
            "status": "complete",
            "thoughts": "thank-you page visible",
            "page_info": "thank-you",
            "failure_categories": [],
        },
        use_termination_prompt=True,
    )
    span = _span_by_name(span_exporter.get_finished_spans(), COMPLETE_VERIFY_SPAN_NAME)
    assert span is not None
    attrs = span.attributes or {}
    assert attrs.get("verification.status") == "complete"
    assert attrs.get("verification.template") == "check-user-goal-with-termination"

View file

@ -315,6 +315,37 @@ class TestTranslateToAgentResultGating:
assert "All done" not in agent_result.user_response
assert agent_result.updated_workflow is None
    def test_inline_replace_workflow_wraps_block_goals_with_user_message(self, monkeypatch) -> None:
        # SKY-9174 parity: update_and_run_blocks_tool wraps block goals with
        # the user's chat message as big-goal context. The REPLACE_WORKFLOW
        # inline path must do the same; otherwise the emitted yaml latches
        # onto ctx without user-intent framing and any downstream block run
        # hits the verifier-on-confirmation-surface bug this PR fixes.
        from skyvern.forge.sdk.copilot import agent as agent_module

        captured: dict[str, str] = {}

        # Capture the yaml that reaches the workflow processor.
        def fake_process(**kwargs):
            captured["yaml"] = kwargs["workflow_yaml"]
            return SimpleNamespace(name="new-wf")

        # Deterministic stand-in for wrap_block_goals so the assertions can
        # verify both inputs flowed through it.
        def fake_wrap(workflow_yaml: str, user_message: str) -> str:
            return f"WRAPPED::{user_message}::{workflow_yaml}"

        monkeypatch.setattr("skyvern.forge.sdk.copilot.tools._process_workflow_yaml", fake_process)
        monkeypatch.setattr("skyvern.forge.sdk.copilot.agent.wrap_block_goals", fake_wrap)
        ctx = _ctx(user_message="Submit a contact form on example.com.")
        result = _fake_run_result(
            {"type": "REPLACE_WORKFLOW", "user_response": "Here you go.", "workflow_yaml": "raw: yaml"}
        )
        agent_module._translate_to_agent_result(
            result, ctx, global_llm_context=None, chat_request=_chat_request(), organization_id="org-1"
        )
        # Wrapped yaml must both reach the processor and be persisted on ctx.
        assert captured["yaml"] == "WRAPPED::Submit a contact form on example.com.::raw: yaml"
        assert ctx.last_workflow_yaml == "WRAPPED::Submit a contact form on example.com.::raw: yaml"
class TestCredentialRefusalReachesAgent:
"""Prove the SKY-9189 refusal rule is actually delivered to the agent.

View file

@ -0,0 +1,261 @@
"""Tests for the copilot v2 block-goal wrap helper (SKY-9174, Part B).
Covers wrapping of ``navigation_goal``, ``complete_criterion``, and
``terminate_criterion`` via ``MINI_GOAL_TEMPLATE``.
"""
from __future__ import annotations
import textwrap
import yaml
from skyvern.constants import MINI_GOAL_TEMPLATE
from skyvern.forge.sdk.copilot.block_goal_wrapping import wrap_block_goals
USER_MESSAGE = "Submit a contact form on example.com with my details."
def _yaml_with_blocks(*blocks: dict) -> str:
    """Serialize a minimal workflow document containing *blocks*."""
    document = {
        "title": "test workflow",
        "workflow_definition": {"blocks": list(blocks)},
    }
    return yaml.safe_dump(document, sort_keys=False)


def _blocks_from(yaml_str: str) -> list[dict]:
    """Parse a workflow yaml string and return its blocks list."""
    parsed = yaml.safe_load(yaml_str)
    return parsed["workflow_definition"]["blocks"]


def _wrapped(mini_goal: str) -> str:
    """Expected wrapped form of *mini_goal* under the shared USER_MESSAGE."""
    return MINI_GOAL_TEMPLATE.format(mini_goal=mini_goal, main_goal=USER_MESSAGE)
def test_wraps_task_block_navigation_goal() -> None:
    # task blocks: navigation_goal is wrapped with the user's big goal.
    src = _yaml_with_blocks(
        {"block_type": "task", "label": "fill_form", "navigation_goal": "Fill in name and email, click Send"}
    )
    out = wrap_block_goals(src, USER_MESSAGE)
    blocks = _blocks_from(out)
    assert blocks[0]["navigation_goal"] == _wrapped("Fill in name and email, click Send")


def test_wraps_navigation_action_login_and_file_download_block_types() -> None:
    # Every goal-carrying block type gets the same wrapping treatment.
    src = _yaml_with_blocks(
        {"block_type": "navigation", "label": "a", "navigation_goal": "nav goal"},
        {"block_type": "action", "label": "b", "navigation_goal": "action goal"},
        {"block_type": "login", "label": "c", "navigation_goal": "login goal"},
        {"block_type": "file_download", "label": "d", "navigation_goal": "download goal"},
    )
    out = wrap_block_goals(src, USER_MESSAGE)
    blocks = _blocks_from(out)
    assert blocks[0]["navigation_goal"] == _wrapped("nav goal")
    assert blocks[1]["navigation_goal"] == _wrapped("action goal")
    assert blocks[2]["navigation_goal"] == _wrapped("login goal")
    assert blocks[3]["navigation_goal"] == _wrapped("download goal")


def test_wraps_complete_criterion_on_validation_navigation_and_login_blocks() -> None:
    # complete_criterion is wrapped wherever present, alongside any goal.
    src = _yaml_with_blocks(
        {"block_type": "validation", "label": "v", "complete_criterion": "Your message has been sent"},
        {
            "block_type": "navigation",
            "label": "n",
            "navigation_goal": "submit form",
            "complete_criterion": "confirmation page visible",
        },
        {
            "block_type": "login",
            "label": "l",
            "navigation_goal": "login",
            "complete_criterion": "user dashboard visible",
        },
    )
    out = wrap_block_goals(src, USER_MESSAGE)
    blocks = _blocks_from(out)
    assert blocks[0]["complete_criterion"] == _wrapped("Your message has been sent")
    assert blocks[1]["navigation_goal"] == _wrapped("submit form")
    assert blocks[1]["complete_criterion"] == _wrapped("confirmation page visible")
    assert blocks[2]["navigation_goal"] == _wrapped("login")
    assert blocks[2]["complete_criterion"] == _wrapped("user dashboard visible")


def test_wraps_terminate_criterion() -> None:
    # terminate_criterion is also a wrappable field.
    src = _yaml_with_blocks(
        {
            "block_type": "validation",
            "label": "v",
            "complete_criterion": "order placed",
            "terminate_criterion": "payment failed",
        },
    )
    out = wrap_block_goals(src, USER_MESSAGE)
    block = _blocks_from(out)[0]
    assert block["complete_criterion"] == _wrapped("order placed")
    assert block["terminate_criterion"] == _wrapped("payment failed")
def test_leaves_blocks_without_wrappable_fields_untouched() -> None:
    # Blocks with no goal/criterion — or empty/None values — pass through unchanged.
    src = _yaml_with_blocks(
        {"block_type": "extraction", "label": "extract", "data_extraction_goal": "get title"},
        {"block_type": "goto_url", "label": "go", "url": "https://example.com"},
        {"block_type": "task", "label": "empty_goal", "navigation_goal": "", "complete_criterion": ""},
        {"block_type": "validation", "label": "null_crit", "complete_criterion": None},
    )
    out = wrap_block_goals(src, USER_MESSAGE)
    blocks = _blocks_from(out)
    assert blocks[0] == {"block_type": "extraction", "label": "extract", "data_extraction_goal": "get title"}
    assert blocks[1] == {"block_type": "goto_url", "label": "go", "url": "https://example.com"}
    assert blocks[2]["navigation_goal"] == ""
    assert blocks[2]["complete_criterion"] == ""
    assert blocks[3]["complete_criterion"] is None


def test_idempotent_on_already_wrapped_fields() -> None:
    # Re-wrapping already-wrapped fields must be a no-op (idempotence).
    already_wrapped_goal = _wrapped("Fill in name and email, click Send")
    already_wrapped_criterion = _wrapped("Your message has been sent")
    src = _yaml_with_blocks(
        {
            "block_type": "task",
            "label": "fill_form",
            "navigation_goal": already_wrapped_goal,
            "complete_criterion": already_wrapped_criterion,
        }
    )
    out = wrap_block_goals(src, USER_MESSAGE)
    block = _blocks_from(out)[0]
    assert block["navigation_goal"] == already_wrapped_goal
    assert block["complete_criterion"] == already_wrapped_criterion


def test_noop_on_empty_user_message() -> None:
    # Empty user message → input returned byte-identical.
    src = _yaml_with_blocks(
        {
            "block_type": "task",
            "label": "fill_form",
            "navigation_goal": "Fill the form",
            "complete_criterion": "Your message has been sent",
        }
    )
    out = wrap_block_goals(src, "")
    assert out == src


def test_noop_when_no_block_mutations_needed() -> None:
    # Nothing to wrap → input returned byte-identical (no re-serialization churn).
    src = _yaml_with_blocks(
        {"block_type": "extraction", "label": "extract", "data_extraction_goal": "get title"},
    )
    out = wrap_block_goals(src, USER_MESSAGE)
    assert out == src
def test_recurses_into_for_loop_blocks() -> None:
    # Wrapping recurses into for_loop.loop_blocks.
    src = yaml.safe_dump(
        {
            "title": "loop workflow",
            "workflow_definition": {
                "blocks": [
                    {
                        "block_type": "for_loop",
                        "label": "loop",
                        "loop_over": {"parameter_key": "items"},
                        "loop_blocks": [
                            {"block_type": "task", "label": "inner_task", "navigation_goal": "Process each item"},
                            {
                                "block_type": "validation",
                                "label": "inner_check",
                                "complete_criterion": "item processed",
                            },
                        ],
                    }
                ]
            },
        },
        sort_keys=False,
    )
    out = wrap_block_goals(src, USER_MESSAGE)
    parsed = yaml.safe_load(out)
    loop_blocks = parsed["workflow_definition"]["blocks"][0]["loop_blocks"]
    assert loop_blocks[0]["navigation_goal"] == _wrapped("Process each item")
    assert loop_blocks[1]["complete_criterion"] == _wrapped("item processed")


def test_preserves_other_fields() -> None:
    # Non-goal fields survive the wrap round-trip untouched.
    src = _yaml_with_blocks(
        {
            "block_type": "task",
            "label": "fill_form",
            "url": "https://example.com",
            "title": "Fill form",
            "navigation_goal": "Fill in fields",
            "parameter_keys": ["name", "email"],
            "complete_criterion": "Form submitted",
            "max_retries": 2,
        }
    )
    out = wrap_block_goals(src, USER_MESSAGE)
    block = _blocks_from(out)[0]
    assert block["url"] == "https://example.com"
    assert block["title"] == "Fill form"
    assert block["parameter_keys"] == ["name", "email"]
    assert block["max_retries"] == 2
    assert block["navigation_goal"] == _wrapped("Fill in fields")
    assert block["complete_criterion"] == _wrapped("Form submitted")
def test_returns_input_unchanged_on_malformed_yaml() -> None:
    # Intentional parse failure (unterminated quote) — helper must not raise.
    malformed = textwrap.dedent(
        """
        title: bad
        workflow_definition:
          blocks:
            - block_type: task
              navigation_goal: "unclosed
        """
    ).strip()
    out = wrap_block_goals(malformed, USER_MESSAGE)
    assert out == malformed


def test_returns_input_unchanged_when_workflow_definition_missing() -> None:
    # No workflow_definition key → graceful pass-through.
    src = yaml.safe_dump({"title": "no definition"}, sort_keys=False)
    out = wrap_block_goals(src, USER_MESSAGE)
    assert out == src


def test_returns_input_unchanged_when_blocks_not_list() -> None:
    # blocks present but not a list → graceful pass-through.
    src = yaml.safe_dump(
        {"title": "bad blocks", "workflow_definition": {"blocks": "not a list"}},
        sort_keys=False,
    )
    out = wrap_block_goals(src, USER_MESSAGE)
    assert out == src

View file

@ -238,7 +238,7 @@ def test_trusted_post_drain_handles_missing_row() -> None:
from types import SimpleNamespace as _NS # noqa: E402 (grouped with test block)
from skyvern.forge.sdk.copilot.tools import _BLOCK_RUNNING_TOOLS, _tool_loop_error # noqa: E402
from skyvern.forge.sdk.copilot.tools import BLOCK_RUNNING_TOOLS, _tool_loop_error # noqa: E402
def _guard_ctx(pending_run_id: str | None = None) -> _NS:
@ -251,9 +251,9 @@ def _guard_ctx(pending_run_id: str | None = None) -> _NS:
)
@pytest.mark.parametrize("tool_name", sorted(_BLOCK_RUNNING_TOOLS))
@pytest.mark.parametrize("tool_name", sorted(BLOCK_RUNNING_TOOLS))
def test_reconciliation_guard_blocks_block_running_tools(tool_name: str) -> None:
"""With a pending run set, every tool in ``_BLOCK_RUNNING_TOOLS`` is
"""With a pending run set, every tool in ``BLOCK_RUNNING_TOOLS`` is
rejected with an error that names the run id and directs the LLM to call
``get_run_results`` first."""
ctx = _guard_ctx(pending_run_id="wr_pending_123")
@ -268,7 +268,7 @@ def test_reconciliation_guard_blocks_block_running_tools(tool_name: str) -> None
["get_run_results", "update_workflow", "list_credentials"],
)
def test_reconciliation_guard_ignores_non_block_running_tools(tool_name: str) -> None:
"""The guard is scoped to ``_BLOCK_RUNNING_TOOLS``. Planning / metadata
"""The guard is scoped to ``BLOCK_RUNNING_TOOLS``. Planning / metadata
tools (including ``get_run_results`` itself, which is the tool that can
CLEAR the flag) must not be rejected."""
ctx = _guard_ctx(pending_run_id="wr_pending_123")
@ -279,7 +279,7 @@ def test_reconciliation_guard_passes_when_flag_empty() -> None:
"""No pending run → `_tool_loop_error` returns None for block-running
tools (assuming no other guard trips)."""
ctx = _guard_ctx(pending_run_id=None)
for name in _BLOCK_RUNNING_TOOLS:
for name in BLOCK_RUNNING_TOOLS:
assert _tool_loop_error(ctx, name) is None
@ -288,7 +288,7 @@ def test_reconciliation_guard_rejects_empty_string_run_id() -> None:
not set. Prevents a spurious guard trip if anything ever clears the flag
to ``''`` instead of ``None``."""
ctx = _guard_ctx(pending_run_id="")
for name in _BLOCK_RUNNING_TOOLS:
for name in BLOCK_RUNNING_TOOLS:
assert _tool_loop_error(ctx, name) is None

View file

@ -570,7 +570,7 @@ def test_tool_loop_error_blocks_run_blocks_and_collect_debug_after_dns_failure()
def test_tool_loop_error_does_not_block_planning_tools() -> None:
# get_run_results / list_credentials / update_workflow are scoped out of
# _BLOCK_RUNNING_TOOLS and should remain callable so the agent can inspect
# BLOCK_RUNNING_TOOLS and should remain callable so the agent can inspect
# the failure and decide how to respond to the user.
ctx = _fresh_context()
ctx.last_test_non_retriable_nav_error = _DNS_FAILURE_REASON

View file

@ -0,0 +1,114 @@
"""Tests for the copilot-v2 SchemaOverlay hooks that ban task/task_v2 block
types at the discovery surface (SKY-9174, Part C)."""
from __future__ import annotations
from unittest.mock import MagicMock
import pytest
from skyvern.forge.sdk.copilot.tools import (
_COPILOT_BANNED_BLOCK_TYPES,
_get_block_schema_post_hook,
_get_block_schema_pre_hook,
)
@pytest.fixture
def ctx() -> MagicMock:
    # The hooks never inspect the context; a bare MagicMock suffices.
    return MagicMock()


@pytest.mark.parametrize("block_type", ["task", "task_v2", "TASK", "Task_V2", " task "])
@pytest.mark.asyncio
async def test_pre_hook_blocks_banned_types_case_and_whitespace_insensitive(block_type: str, ctx: MagicMock) -> None:
    # Banned types are rejected regardless of case or surrounding whitespace,
    # and the error names the allowed alternatives.
    result = await _get_block_schema_pre_hook({"block_type": block_type}, ctx)
    assert result is not None
    assert result["ok"] is False
    assert "not available in the workflow copilot" in result["error"]
    for alternative in ("navigation", "extraction", "validation", "login"):
        assert alternative in result["error"]


@pytest.mark.asyncio
async def test_pre_hook_allows_non_banned_types(ctx: MagicMock) -> None:
    # Allowed block types pass through (hook returns None = no veto).
    for block_type in ("navigation", "extraction", "validation", "login", "goto_url", "for_loop"):
        assert await _get_block_schema_pre_hook({"block_type": block_type}, ctx) is None


@pytest.mark.asyncio
async def test_pre_hook_allows_list_mode_no_block_type(ctx: MagicMock) -> None:
    # List mode (no block_type, or an explicit None) is never vetoed.
    assert await _get_block_schema_pre_hook({}, ctx) is None
    assert await _get_block_schema_pre_hook({"block_type": None}, ctx) is None


@pytest.mark.asyncio
async def test_pre_hook_allows_non_string_block_type(ctx: MagicMock) -> None:
    # Non-string block_type is left for downstream validation, not vetoed here.
    assert await _get_block_schema_pre_hook({"block_type": 123}, ctx) is None
@pytest.mark.asyncio
async def test_post_hook_scrubs_banned_types_from_list_response(ctx: MagicMock) -> None:
    # task/task_v2 entries are removed from a list-mode schema response.
    result = {
        "ok": True,
        "data": {
            "block_types": {
                "navigation": "Take actions on a page",
                "task": "deprecated",
                "task_v2": "deprecated",
                "extraction": "Extract data",
            },
            "count": 4,
        },
    }
    out = await _get_block_schema_post_hook(result, raw={}, ctx=ctx)
    assert set(out["data"]["block_types"]) == {"navigation", "extraction"}


@pytest.mark.asyncio
async def test_post_hook_passthrough_when_no_block_types_dict(ctx: MagicMock) -> None:
    # Single-block-type responses carry no block_types dict; pass through as-is.
    result = {"ok": True, "data": {"block_type": "navigation", "summary": "..."}}
    out = await _get_block_schema_post_hook(result, raw={}, ctx=ctx)
    assert out == {"ok": True, "data": {"block_type": "navigation", "summary": "..."}}


@pytest.mark.asyncio
async def test_post_hook_handles_missing_or_malformed_data(ctx: MagicMock) -> None:
    # Error results, None data, and a non-dict block_types value are all
    # passed through untouched rather than crashing the hook.
    assert await _get_block_schema_post_hook({"ok": False, "error": "x"}, raw={}, ctx=ctx) == {
        "ok": False,
        "error": "x",
    }
    assert await _get_block_schema_post_hook({"ok": True, "data": None}, raw={}, ctx=ctx) == {
        "ok": True,
        "data": None,
    }
    assert await _get_block_schema_post_hook(
        {"ok": True, "data": {"block_types": ["not", "a", "dict"]}}, raw={}, ctx=ctx
    ) == {"ok": True, "data": {"block_types": ["not", "a", "dict"]}}
def test_banned_types_set_contents() -> None:
    # Pin the exact banned set so accidental widening/narrowing is caught.
    assert _COPILOT_BANNED_BLOCK_TYPES == frozenset({"task", "task_v2"})


def test_pre_hook_and_post_emission_reject_share_constant() -> None:
    """SKY-9174 Part F: the pre-emission SchemaOverlay hooks and the
    post-emission YAML-level reject (in `_update_workflow` + REPLACE_WORKFLOW)
    both import `_COPILOT_BANNED_BLOCK_TYPES` from the same module. Guard
    against a future refactor that redefines the set in only one place —
    any divergence would leave one of the two layers out of sync."""
    import skyvern.forge.sdk.copilot.tools as tools_module

    # `_detect_new_banned_blocks` exists on the same module and is the
    # post-emission counterpart. If either symbol is removed, the layer is
    # effectively ripped out and we want this test to catch it.
    assert hasattr(tools_module, "_COPILOT_BANNED_BLOCK_TYPES")
    assert hasattr(tools_module, "_get_block_schema_pre_hook")
    assert hasattr(tools_module, "_get_block_schema_post_hook")
    assert hasattr(tools_module, "_detect_new_banned_blocks")
    assert hasattr(tools_module, "_banned_block_reject_message")

View file

@ -0,0 +1,349 @@
"""Tests for the copilot-v2 post-emission reject of ``task`` / ``task_v2`` block
types (SKY-9174, Part F).
Part C.1 banned the types at the schema-lookup surface via `SchemaOverlay`
pre / post hooks, but the LLM can bypass that by writing YAML directly without
querying the schema. Part F closes the bypass with a YAML-level reject that
fires on every copilot-v2 write path (``_update_workflow`` + inline
``REPLACE_WORKFLOW``), keyed by block label so legacy workflows with
pre-existing ``task`` blocks can still be edited by the copilot.
"""
from __future__ import annotations
import textwrap
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import yaml
from skyvern.forge.sdk.copilot.tools import _detect_new_banned_blocks, _update_workflow
def _yaml(*blocks: dict) -> str:
    """Serialize a minimal workflow document containing *blocks*."""
    payload = {"title": "wf", "workflow_definition": {"blocks": list(blocks)}}
    return yaml.safe_dump(payload, sort_keys=False)
# ---------- Flat shapes ----------
def test_top_level_task_block_is_detected_on_first_authoring() -> None:
    # No prior yaml → any task block is "new" and reported as (label, type).
    submitted = _yaml({"block_type": "task", "label": "fill_contact_form", "navigation_goal": "do thing"})
    result = _detect_new_banned_blocks(submitted, prior_workflow_yaml=None)
    assert result == [("fill_contact_form", "task")]


def test_top_level_task_v2_block_is_detected() -> None:
    submitted = _yaml({"block_type": "task_v2", "label": "legacy_taskv2", "prompt": "do it"})
    result = _detect_new_banned_blocks(submitted, prior_workflow_yaml=None)
    assert result == [("legacy_taskv2", "task_v2")]


def test_case_and_whitespace_insensitive() -> None:
    # Detection normalizes case and surrounding whitespace in block_type.
    submitted = _yaml(
        {"block_type": "TASK", "label": "a"},
        {"block_type": " task_v2 ", "label": "b"},
        {"block_type": "Task", "label": "c"},
    )
    result = _detect_new_banned_blocks(submitted, prior_workflow_yaml=None)
    assert sorted(result) == [("a", "task"), ("b", "task_v2"), ("c", "task")]


def test_mixed_task_and_navigation_only_reports_banned() -> None:
    submitted = _yaml(
        {"block_type": "navigation", "label": "nav_a", "navigation_goal": "ok"},
        {"block_type": "task", "label": "bad", "navigation_goal": "bad"},
        {"block_type": "extraction", "label": "ext_a"},
    )
    result = _detect_new_banned_blocks(submitted, prior_workflow_yaml=None)
    assert result == [("bad", "task")]


def test_only_allowed_types_returns_empty() -> None:
    submitted = _yaml(
        {"block_type": "navigation", "label": "n"},
        {"block_type": "extraction", "label": "e"},
        {"block_type": "validation", "label": "v"},
        {"block_type": "login", "label": "lg"},
        {"block_type": "goto_url", "label": "g"},
    )
    assert _detect_new_banned_blocks(submitted, prior_workflow_yaml=None) == []
# ---------- Malformed ----------
def test_malformed_yaml_is_graceful_no_op() -> None:
    # Intentional parse failure — missing close quote.
    assert _detect_new_banned_blocks("title: 'unterminated", prior_workflow_yaml=None) == []


def test_missing_workflow_definition_is_graceful_no_op() -> None:
    assert _detect_new_banned_blocks("title: wf\n", prior_workflow_yaml=None) == []


def test_blocks_key_not_a_list_is_graceful_no_op() -> None:
    bad = "title: wf\nworkflow_definition:\n blocks: not-a-list\n"
    assert _detect_new_banned_blocks(bad, prior_workflow_yaml=None) == []


def test_block_entry_not_a_dict_is_skipped() -> None:
    # A bare string where a block dict is expected — should be skipped, not crash.
    weird = textwrap.dedent(
        """\
        title: wf
        workflow_definition:
          blocks:
            - "not a block"
            - block_type: task
              label: real_banned
        """
    )
    assert _detect_new_banned_blocks(weird, prior_workflow_yaml=None) == [("real_banned", "task")]
# ---------- Legacy preservation (RISK-1) ----------
def test_preserved_legacy_task_block_under_same_label_does_not_reject() -> None:
    # RISK-1: editing a pre-existing task block under its original label is allowed.
    prior = _yaml({"block_type": "task", "label": "legacy_task", "navigation_goal": "old"})
    submitted = _yaml({"block_type": "task", "label": "legacy_task", "navigation_goal": "old edited"})
    assert _detect_new_banned_blocks(submitted, prior_workflow_yaml=prior) == []


def test_new_task_block_alongside_preserved_legacy_reports_only_the_new_one() -> None:
    prior = _yaml({"block_type": "task", "label": "legacy_task"})
    submitted = _yaml(
        {"block_type": "task", "label": "legacy_task"},
        {"block_type": "task", "label": "fill_contact_form"},
    )
    assert _detect_new_banned_blocks(submitted, prior_workflow_yaml=prior) == [("fill_contact_form", "task")]


def test_renamed_legacy_task_block_is_treated_as_new() -> None:
    """Edge case: copilot re-emits a legacy task block under a different label.
    The detector has no way to know this is a rename, so it's reported as new.
    Acceptable: the copilot can recover by re-using the prior label."""
    prior = _yaml({"block_type": "task", "label": "old_name"})
    submitted = _yaml({"block_type": "task", "label": "new_name"})
    assert _detect_new_banned_blocks(submitted, prior_workflow_yaml=prior) == [("new_name", "task")]


def test_prior_contains_allowed_types_submitted_adds_task_rejects() -> None:
    prior = _yaml({"block_type": "navigation", "label": "nav"})
    submitted = _yaml(
        {"block_type": "navigation", "label": "nav"},
        {"block_type": "task", "label": "bad_new"},
    )
    assert _detect_new_banned_blocks(submitted, prior_workflow_yaml=prior) == [("bad_new", "task")]


def test_legacy_task_v2_preservation() -> None:
    prior = _yaml({"block_type": "task_v2", "label": "legacy_v2"})
    submitted = _yaml({"block_type": "task_v2", "label": "legacy_v2"})
    assert _detect_new_banned_blocks(submitted, prior_workflow_yaml=prior) == []
# ---------- Nested (COMP-1) ----------
def test_task_block_inside_for_loop_is_detected() -> None:
    # COMP-1: detection walks into for_loop.loop_blocks.
    submitted = _yaml(
        {
            "block_type": "for_loop",
            "label": "loop",
            "loop_blocks": [
                {"block_type": "navigation", "label": "inner_nav"},
                {"block_type": "task", "label": "inner_bad"},
            ],
        }
    )
    assert _detect_new_banned_blocks(submitted, prior_workflow_yaml=None) == [("inner_bad", "task")]


def test_nested_preservation_does_not_reject() -> None:
    # Legacy nested task blocks are preserved under an unchanged label.
    prior = _yaml(
        {
            "block_type": "for_loop",
            "label": "loop",
            "loop_blocks": [{"block_type": "task", "label": "nested_legacy"}],
        }
    )
    submitted = _yaml(
        {
            "block_type": "for_loop",
            "label": "loop",
            "loop_blocks": [{"block_type": "task", "label": "nested_legacy"}],
        }
    )
    assert _detect_new_banned_blocks(submitted, prior_workflow_yaml=prior) == []


def test_nested_new_addition_is_detected() -> None:
    prior = _yaml(
        {
            "block_type": "for_loop",
            "label": "loop",
            "loop_blocks": [{"block_type": "navigation", "label": "nav_inner"}],
        }
    )
    submitted = _yaml(
        {
            "block_type": "for_loop",
            "label": "loop",
            "loop_blocks": [
                {"block_type": "navigation", "label": "nav_inner"},
                {"block_type": "task", "label": "new_nested_bad"},
            ],
        }
    )
    assert _detect_new_banned_blocks(submitted, prior_workflow_yaml=prior) == [("new_nested_bad", "task")]


def test_deeply_nested_for_loop_is_walked() -> None:
    """for_loop nested inside another for_loop — recursion must reach the innermost level."""
    submitted = _yaml(
        {
            "block_type": "for_loop",
            "label": "outer",
            "loop_blocks": [
                {
                    "block_type": "for_loop",
                    "label": "inner",
                    "loop_blocks": [{"block_type": "task", "label": "deeply_nested_bad"}],
                }
            ],
        }
    )
    assert _detect_new_banned_blocks(submitted, prior_workflow_yaml=None) == [("deeply_nested_bad", "task")]
# ---------- Missing label — should not crash ----------
def test_block_without_label_is_skipped() -> None:
    """A banned block missing the ``label`` key can't be identified for
    preservation matching; skip it rather than crash. The YAML validator
    downstream will surface the missing-label error on its own."""
    submitted = _yaml({"block_type": "task", "navigation_goal": "no label"})
    # No label → not collectible; result is empty (downstream Pydantic reject
    # will surface the malformed block).
    assert _detect_new_banned_blocks(submitted, prior_workflow_yaml=None) == []
# ---------- Integration-shape tests: _update_workflow end-to-end ----------
#
# These exercise the reject path at the tool-helper boundary, confirming the
# detection + error tool-result shape + dedicated OTEL span. The success path
# (YAML with only allowed types, or with preserved legacy task labels) is also
# covered — we patch ``_process_workflow_yaml`` and the workflow-service write
# so the test does not need a DB.
def _ctx(prior_yaml: str | None = None) -> MagicMock:
ctx = MagicMock()
ctx.workflow_yaml = prior_yaml
ctx.workflow_id = "w_test"
ctx.workflow_permanent_id = "wpid_test"
ctx.organization_id = "o_test"
return ctx
@pytest.mark.asyncio
async def test_update_workflow_rejects_new_task_block_and_emits_span() -> None:
    # A brand-new task block with no prior yaml must be rejected with an error
    # that names the offending label and the allowed alternatives.
    submitted = _yaml({"block_type": "task", "label": "fill_contact_form", "navigation_goal": "do"})
    ctx = _ctx(prior_yaml=None)
    with patch("skyvern.forge.sdk.copilot.tools._record_banned_block_reject_span") as mock_span:
        result = await _update_workflow({"workflow_yaml": submitted}, ctx)
    assert result["ok"] is False
    assert "not available in the workflow copilot" in result["error"]
    assert "fill_contact_form" in result["error"]
    for alternative in ("navigation", "extraction", "validation", "login"):
        assert alternative in result["error"]
    # Dedicated span fired with source_tool + items for logfire trend analysis.
    mock_span.assert_called_once_with("_update_workflow", [("fill_contact_form", "task")])
@pytest.mark.asyncio
async def test_update_workflow_preserves_legacy_task_block_under_unchanged_label() -> None:
    """Copilot edit of a legacy workflow that already carries a ``task`` block
    must not fail the reject. The helper sees the task label in prior YAML
    and treats its re-emission as legacy preservation, not a new addition."""
    legacy_block = {"block_type": "task", "label": "legacy_task", "navigation_goal": "old"}
    prior = _yaml(legacy_block)
    # New YAML keeps the legacy task block AND appends an allowed-type block.
    submitted = _yaml(
        legacy_block,
        {"block_type": "navigation", "label": "new_nav", "navigation_goal": "new"},
    )
    ctx = _ctx(prior_yaml=prior)
    fake_workflow = MagicMock()
    fake_workflow.title = "t"
    fake_workflow.description = "d"
    fake_workflow.workflow_definition = MagicMock()
    fake_workflow.persist_browser_session = False
    # All remaining workflow fields read by the update path are simply None.
    for none_attr in (
        "proxy_location",
        "webhook_callback_url",
        "model",
        "max_screenshot_scrolls",
        "extra_http_headers",
        "run_with",
        "ai_fallback",
        "cache_key",
        "run_sequentially",
        "sequential_key",
    ):
        setattr(fake_workflow, none_attr, None)
    with (
        patch("skyvern.forge.sdk.copilot.tools._process_workflow_yaml", return_value=fake_workflow),
        patch("skyvern.forge.sdk.copilot.tools.app") as mock_app,
    ):
        mock_app.WORKFLOW_SERVICE.update_workflow_definition = AsyncMock()
        result = await _update_workflow({"workflow_yaml": submitted}, ctx)
    assert result["ok"] is True
    # The new YAML was accepted and assigned to ctx as the current workflow state.
    assert ctx.workflow_yaml == submitted
@pytest.mark.asyncio
async def test_update_workflow_allows_all_allowed_block_types() -> None:
    """Baseline success path: only allowed block types, no prior — passes through."""
    submitted = _yaml(
        {"block_type": "navigation", "label": "n", "navigation_goal": "x"},
        {"block_type": "validation", "label": "v", "complete_criterion": "c"},
    )
    ctx = _ctx(prior_yaml=None)
    # Every workflow attribute the update path reads is stubbed to None.
    fake_workflow = MagicMock(
        **dict.fromkeys(
            (
                "title",
                "description",
                "workflow_definition",
                "proxy_location",
                "webhook_callback_url",
                "persist_browser_session",
                "model",
                "max_screenshot_scrolls",
                "extra_http_headers",
                "run_with",
                "ai_fallback",
                "cache_key",
                "run_sequentially",
                "sequential_key",
            )
        )
    )
    with (
        patch("skyvern.forge.sdk.copilot.tools._process_workflow_yaml", return_value=fake_workflow),
        patch("skyvern.forge.sdk.copilot.tools.app") as mock_app,
    ):
        mock_app.WORKFLOW_SERVICE.update_workflow_definition = AsyncMock()
        result = await _update_workflow({"workflow_yaml": submitted}, ctx)
    assert result["ok"] is True

View file

@ -5,15 +5,9 @@ These tests verify the LLM chokepoint span + SKY-8414
`skyvern/forge/sdk/api/llm/api_handler_factory.py`. They serve as regression
coverage for the instrumentation.
Note: OTEL's global TracerProvider can only be set once per process. This
module installs a shared TracerProvider + InMemorySpanExporter on first use
via `_ensure_provider()`. Other test files that also call
`otel_trace.set_tracer_provider(...)` will clobber or be clobbered depending
on import order. If more test files need span capture, move the provider
setup to a session-scoped fixture in conftest.py.
The tests use OTEL's `InMemorySpanExporter` — no OTEL backend, collector, or
network required. Fast and deterministic.
The `span_exporter` fixture lives in `tests/unit/conftest.py` so any test
module that needs span capture can depend on it without installing its own
TracerProvider (OTEL's global provider can only be set once per process).
"""
from __future__ import annotations
@ -21,9 +15,6 @@ from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock
import pytest # type: ignore[import-not-found]
from opentelemetry import trace as otel_trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from skyvern.forge.sdk.api.llm import api_handler_factory
@ -38,31 +29,6 @@ LLM_SPAN_NAME = "skyvern.llm.request"
LLM_EVENT_NAME = "llm.request.completed"
_SHARED_EXPORTER: InMemorySpanExporter | None = None
def _ensure_provider() -> InMemorySpanExporter:
"""OTEL's global TracerProvider can only be set once per process. Install
a shared TracerProvider + InMemorySpanExporter on first use; subsequent
tests reuse it and just clear the buffer between runs."""
global _SHARED_EXPORTER
if _SHARED_EXPORTER is None:
exporter = InMemorySpanExporter()
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(exporter))
otel_trace.set_tracer_provider(provider)
_SHARED_EXPORTER = exporter
return _SHARED_EXPORTER
@pytest.fixture
def span_exporter() -> InMemorySpanExporter:
exporter = _ensure_provider()
exporter.clear()
yield exporter
exporter.clear()
def _span_by_name(spans: list, name: str):
return next((s for s in spans if s.name == name), None)

View file

@ -0,0 +1,109 @@
"""Parity guard between the backend BlockType enum and the Fern SDK unions.
The vendored Fern SDK at ``skyvern/client/`` is regenerated manually. When a
new block type is added to ``skyvern/schemas/workflows.BlockType`` without
regenerating the SDK, MCP read paths that still deserialize through the Fern
``Workflow`` type break (see ``test_mcp_workflow_list_drift``). Even though the
MCP tools now bypass Fern for Workflow reads, we still want an early warning
when a regeneration is overdue so downstream clients that *do* use the Fern
SDK directly stay in sync.
This test introspects both Fern discriminated unions and diffs them against
the backend enum. Known-drifted values are tolerated via an allowlist with a
Linear tracking pointer to the regeneration task; any NEW drift fails CI.
"""
from __future__ import annotations
import typing
import pytest
from skyvern.client.types.workflow_definition_blocks_item import WorkflowDefinitionBlocksItem
from skyvern.client.types.workflow_definition_yaml_blocks_item import WorkflowDefinitionYamlBlocksItem
from skyvern.schemas.workflows import BlockType
# Known drift: block types present in the backend but not yet regenerated into
# the vendored Fern SDK. Run `fern generate` (or the equivalent Skyvern SDK
# regen workflow) to resync, then remove the entry here.
# Tracked follow-up: SKY-9227
# https://linear.app/skyvern/issue/SKY-9227/prevent-fern-sdk-drift-from-breaking-workflow-block-types
# Remove this allowlist after the Fern SDK has been regenerated to include
# these values and downstream `skyvern` PyPI + `@skyvern/client` npm packages
# are published.
_KNOWN_DRIFT_ALLOWLIST: frozenset[str] = frozenset(
{
"google_sheets_read",
"google_sheets_write",
}
)
def _fern_union_block_types(union_type: typing.Any) -> set[str]:
"""Extract the ``block_type`` Literal value from every variant of a Fern Union."""
values: set[str] = set()
for variant in typing.get_args(union_type):
annotation = variant.model_fields["block_type"].annotation
literal_args = typing.get_args(annotation)
if not literal_args:
pytest.fail(f"Fern variant {variant.__name__} has a non-Literal block_type annotation: {annotation!r}")
values.update(str(arg) for arg in literal_args)
return values
def test_backend_block_types_present_in_fern_unions() -> None:
    """Every BlockType value must be known to both Fern unions (or allowlisted)."""
    backend_values = {member.value for member in BlockType}
    # Only values present in BOTH unions count as fully known to the SDK.
    fern_known = _fern_union_block_types(WorkflowDefinitionBlocksItem) & _fern_union_block_types(
        WorkflowDefinitionYamlBlocksItem
    )
    missing_in_fern = backend_values - fern_known - _KNOWN_DRIFT_ALLOWLIST
    assert not missing_in_fern, (
        "Fern SDK drift detected: the backend `skyvern.schemas.workflows.BlockType` "
        f"has value(s) {sorted(missing_in_fern)!r} that the Fern discriminated unions "
        "(`WorkflowDefinitionBlocksItem` / `WorkflowDefinitionYamlBlocksItem`) do not know. "
        "Run `fern generate` to resync the vendored SDK at skyvern/client/, or add the "
        "value to _KNOWN_DRIFT_ALLOWLIST in this test if the drift is intentional "
        "and tracked."
    )
def test_fern_union_variants_are_subset_of_backend_or_deprecated() -> None:
    """Fern may legitimately retain retired types during a deprecation window.

    This direction is informational: if Fern references a block_type the backend
    no longer emits, we note it but don't fail — clients using a new SDK against
    an older backend is the usual deprecation trajectory.
    """
    backend_values = {member.value for member in BlockType}
    # Anything either union mentions counts as a Fern-side reference.
    fern_all = _fern_union_block_types(WorkflowDefinitionBlocksItem) | _fern_union_block_types(
        WorkflowDefinitionYamlBlocksItem
    )
    # Assert parity but allow the allowlist to flex in either direction so a
    # retired block type stays tolerated for one regen cycle.
    unexpected = fern_all - backend_values - _KNOWN_DRIFT_ALLOWLIST
    assert not unexpected, (
        f"Fern unions reference block_type value(s) {sorted(unexpected)!r} that are "
        "not in the backend BlockType enum. If this is a planned deprecation, add "
        "the value to _KNOWN_DRIFT_ALLOWLIST; otherwise investigate."
    )
def test_allowlist_entries_are_actually_drifted() -> None:
    """Keeps _KNOWN_DRIFT_ALLOWLIST honest: drop stale entries after Fern regen."""
    backend_values = {member.value for member in BlockType}
    fern_blocks = _fern_union_block_types(WorkflowDefinitionBlocksItem)
    fern_yaml_blocks = _fern_union_block_types(WorkflowDefinitionYamlBlocksItem)
    # A value "drifts" if the backend knows it but the two unions don't both
    # carry it, or if the two Fern unions disagree with each other.
    drift_from_backend = backend_values - (fern_blocks & fern_yaml_blocks)
    drift_between_unions = fern_blocks.symmetric_difference(fern_yaml_blocks)
    stale_allowlist_entries = _KNOWN_DRIFT_ALLOWLIST - (drift_from_backend | drift_between_unions)
    assert not stale_allowlist_entries, (
        f"_KNOWN_DRIFT_ALLOWLIST contains stale entries {sorted(stale_allowlist_entries)!r} "
        "that no longer drift. Remove them to keep the allowlist tight."
    )

View file

@ -8,7 +8,6 @@ import pytest
import skyvern.cli.mcp_tools.folder as folder_tools
import skyvern.cli.mcp_tools.workflow as workflow_tools
from skyvern.client.errors import BadRequestError
from skyvern.client.raw_client import AsyncRawSkyvern, RawSkyvern
@ -110,12 +109,28 @@ async def test_folder_delete_handles_non_dict_sdk_result(monkeypatch: pytest.Mon
@pytest.mark.asyncio
async def test_workflow_update_folder_calls_sdk(monkeypatch: pytest.MonkeyPatch) -> None:
fake_client = SimpleNamespace(update_workflow_folder=AsyncMock(return_value=_fake_workflow_response()))
payload = {
"workflow_permanent_id": "wpid_test",
"workflow_id": "wf_test",
"title": "Example Workflow",
"version": 1,
"status": "published",
"description": None,
"is_saved_task": False,
"folder_id": "fld_test",
"created_at": "2026-04-23T10:00:00+00:00",
"modified_at": "2026-04-23T10:00:00+00:00",
}
response = SimpleNamespace(status_code=200, json=lambda: payload, text="")
request_mock = AsyncMock(return_value=response)
fake_client = SimpleNamespace(_client_wrapper=SimpleNamespace(httpx_client=SimpleNamespace(request=request_mock)))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
result = await workflow_tools.skyvern_workflow_update_folder("wpid_test", "fld_test")
fake_client.update_workflow_folder.assert_awaited_once_with("wpid_test", folder_id="fld_test")
request_mock.assert_awaited_once()
call_kwargs = request_mock.await_args.kwargs
assert call_kwargs["json"]["folder_id"] == "fld_test"
assert result["ok"] is True
assert result["data"]["workflow_permanent_id"] == "wpid_test"
assert result["data"]["folder_id"] == "fld_test"
@ -132,9 +147,10 @@ async def test_workflow_update_folder_rejects_invalid_folder_id() -> None:
@pytest.mark.asyncio
async def test_workflow_update_folder_surfaces_bad_request(monkeypatch: pytest.MonkeyPatch) -> None:
fake_client = SimpleNamespace(
update_workflow_folder=AsyncMock(side_effect=BadRequestError(body={"detail": "Folder fld_missing not found"}))
)
error_payload = {"detail": "Folder fld_missing not found"}
response = SimpleNamespace(status_code=400, json=lambda: error_payload, text="Folder fld_missing not found")
request_mock = AsyncMock(return_value=response)
fake_client = SimpleNamespace(_client_wrapper=SimpleNamespace(httpx_client=SimpleNamespace(request=request_mock)))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
result = await workflow_tools.skyvern_workflow_update_folder("wpid_test", "fld_missing")

View file

@ -182,7 +182,8 @@ async def test_get_script_code_via_mcp(monkeypatch):
assert result.data["ok"] is True
data = result.data["data"]
assert "fill_form" in data["blocks"]
assert "page.fill" in data["blocks"]["fill_form"]
# Semgrep false positive: this checks a script code path, not a user-supplied URL.
assert "page.fill" in data["blocks"]["fill_form"] # nosemgrep: incomplete-url-substring-sanitization
assert "@skyvern.workflow" in data["main_script"]
@ -342,25 +343,24 @@ async def test_deploy_script_via_mcp(monkeypatch):
@pytest.mark.asyncio
async def test_workflow_create_surfaces_caching_fields_via_mcp(monkeypatch):
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
fake_wf = SimpleNamespace(
workflow_permanent_id="wpid_new",
workflow_id="wf_1",
title="Test",
version=1,
status="published",
description=None,
is_saved_task=False,
folder_id=None,
created_at=now,
modified_at=now,
code_version=2,
adaptive_caching=True,
run_with="code",
)
fake_client = SimpleNamespace(create_workflow=AsyncMock(return_value=fake_wf))
payload = {
"workflow_permanent_id": "wpid_new",
"workflow_id": "wf_1",
"title": "Test",
"version": 1,
"status": "published",
"description": None,
"is_saved_task": False,
"folder_id": None,
"created_at": "2026-04-23T10:00:00+00:00",
"modified_at": "2026-04-23T10:00:00+00:00",
"code_version": 2,
"adaptive_caching": True,
"run_with": "code",
}
response = SimpleNamespace(status_code=200, json=lambda: payload, text="")
request_mock = AsyncMock(return_value=response)
fake_client = SimpleNamespace(_client_wrapper=SimpleNamespace(httpx_client=SimpleNamespace(request=request_mock)))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
definition = json.dumps(

View file

@ -130,5 +130,7 @@ def test_parse_definition_unaffected() -> None:
json_def, _, err = _parse_definition(definition, "json")
assert err is None
assert json_def is not None
assert isinstance(json_def, dict)
# run_with should be "agent" (schema default), not "code"
assert json_def.run_with == "agent"
assert json_def.get("run_with") == "agent"
assert json_def.get("code_version") != 2

View file

@ -0,0 +1,218 @@
"""Regression tests for MCP workflow list resilience to Fern SDK drift.
Context: the vendored Fern SDK at ``skyvern/client/`` validates workflow
responses through a discriminated pydantic Union that does not know about
block types added to the backend after the last SDK regeneration (currently
``google_sheets_read`` and ``google_sheets_write``). Before the Strategy B fix,
``skyvern_workflow_list`` deserialized through that stale Union and would
crash the entire page for any workflow whose definition referenced an
unknown block_type. These tests lock in the bypass: list calls go through
raw httpx and return plain dicts, so new backend block types pass through
unchanged.
"""
from __future__ import annotations
from types import SimpleNamespace
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import httpx
import pytest
from skyvern.cli.mcp_tools import workflow as mcp_workflow
from skyvern.client.core.client_wrapper import AsyncClientWrapper
def _make_workflow_dict(workflow_id: str, block_type: str, *, label: str = "step1") -> dict[str, Any]:
"""Minimal workflow payload shape, close enough to what ``v1/workflows`` returns."""
block: dict[str, Any] = {
"block_type": block_type,
"label": label,
}
if block_type == "google_sheets_read":
block["spreadsheet_url"] = "https://docs.google.com/spreadsheets/d/xxx/edit"
block["sheet_name"] = "Sheet1"
block["range"] = "A1:D100"
block["credential_id"] = "{{ google_credential_id }}"
block["has_header_row"] = True
elif block_type == "navigation":
block["url"] = "https://example.com"
block["navigation_goal"] = "do the thing"
return {
"workflow_permanent_id": workflow_id,
"workflow_id": f"wf_{workflow_id.split('_', 1)[-1]}",
"title": f"Workflow {workflow_id}",
"version": 1,
"status": "published",
"description": None,
"is_saved_task": False,
"folder_id": None,
"created_at": "2026-04-20T10:00:00+00:00",
"modified_at": "2026-04-22T10:00:00+00:00",
"workflow_definition": {
"parameters": [],
"blocks": [block],
},
}
def _patch_skyvern_list_response(
    monkeypatch: pytest.MonkeyPatch,
    *,
    payload: list[dict[str, Any]],
    status_code: int = 200,
) -> AsyncMock:
    """Point ``get_skyvern`` at a fake client whose raw httpx request returns
    ``payload`` with ``status_code``. Returns the request AsyncMock so tests
    can assert on what was sent."""
    fake_response = MagicMock()
    fake_response.status_code = status_code
    fake_response.json.return_value = payload
    fake_response.text = ""
    request_mock = AsyncMock(return_value=fake_response)
    fake_skyvern = SimpleNamespace(
        _client_wrapper=SimpleNamespace(httpx_client=SimpleNamespace(request=request_mock)),
    )
    monkeypatch.setattr(mcp_workflow, "get_skyvern", lambda: fake_skyvern)
    return request_mock
@pytest.mark.asyncio
async def test_list_succeeds_when_workflow_uses_google_sheets_block(monkeypatch: pytest.MonkeyPatch) -> None:
    """Regression: a google_sheets_read block must not break list deserialization.

    Pre-fix behavior: ``skyvern.get_workflows(...)`` ran ``parse_obj_as(List[Workflow], ...)``
    on the response, and the stale Fern discriminated Union raised ValidationError
    on ``google_sheets_read``, taking the entire page down. The bypass returns raw
    dicts, so any block_type string round-trips through the tool untouched.
    """
    request_mock = _patch_skyvern_list_response(
        monkeypatch,
        payload=[
            _make_workflow_dict("wpid_ok", "navigation"),
            _make_workflow_dict("wpid_sheets", "google_sheets_read"),
        ],
    )

    result = await mcp_workflow.skyvern_workflow_list(page=1, page_size=10)

    assert result["ok"] is True, result
    data = result["data"]
    assert data["count"] == 2
    assert data["page"] == 1
    assert {wf["workflow_permanent_id"] for wf in data["workflows"]} == {"wpid_ok", "wpid_sheets"}

    # The bypass must hit the raw endpoint with plain GET semantics.
    request_mock.assert_awaited_once()
    call = request_mock.await_args
    assert call.args[0] == "v1/workflows"
    assert call.kwargs["method"] == "GET"
    params = call.kwargs["params"]
    assert "search_key" not in params
    assert params["page"] == 1
    assert params["page_size"] == 10
    assert params["only_workflows"] is False
@pytest.mark.asyncio
async def test_list_includes_search_key_only_when_search_is_present(monkeypatch: pytest.MonkeyPatch) -> None:
    """Avoid leaking a literal search_key=None query parameter."""
    request_mock = _patch_skyvern_list_response(monkeypatch, payload=[])

    result = await mcp_workflow.skyvern_workflow_list(search="invoice", page=2, page_size=5)

    assert result["ok"] is True, result
    expected_params = {
        "search_key": "invoice",
        "page": 2,
        "page_size": 5,
        "only_workflows": False,
    }
    assert request_mock.await_args.kwargs["params"] == expected_params
@pytest.mark.asyncio
async def test_raw_list_uses_fern_http_wrapper_auth_headers(monkeypatch: pytest.MonkeyPatch) -> None:
    """The raw bypass still uses Fern's HTTP wrapper, including auth and query encoding."""
    captured: list[httpx.Request] = []

    async def record_and_respond(request: httpx.Request) -> httpx.Response:
        captured.append(request)
        return httpx.Response(200, json=[])

    async with httpx.AsyncClient(transport=httpx.MockTransport(record_and_respond)) as base_client:
        client_wrapper = AsyncClientWrapper(
            api_key="sk_test",
            headers={"x-test-header": "present"},
            base_url="https://api.example.test/api",
            timeout=60,
            httpx_client=base_client,
        )
        monkeypatch.setattr(
            mcp_workflow,
            "get_skyvern",
            lambda: SimpleNamespace(_client_wrapper=client_wrapper),
        )

        result = await mcp_workflow.skyvern_workflow_list(only_workflows=True)

    assert result["ok"] is True, result
    assert len(captured) == 1
    request = captured[0]
    assert request.url.path == "/api/v1/workflows"
    assert request.headers["x-api-key"] == "sk_test"
    assert request.headers["x-test-header"] == "present"
    assert request.headers["x-fern-sdk-name"] == "skyvern"
    assert request.url.params["only_workflows"] == "true"
def test_extract_error_detail_truncates_non_json_response_text() -> None:
    """Avoid returning an entire proxy/load-balancer HTML page in MCP errors."""
    fake_response = MagicMock()
    # Simulate a non-JSON body (e.g. an HTML error page from a proxy).
    fake_response.json.side_effect = ValueError("not json")
    fake_response.text = "x" * 600

    detail = mcp_workflow._extract_error_detail(fake_response)

    # The raw text is capped at 500 characters.
    assert detail == "x" * 500
@pytest.mark.asyncio
async def test_list_surfaces_http_error(monkeypatch: pytest.MonkeyPatch) -> None:
    """Non-2xx responses return an API_ERROR result, not a crash."""
    error_response = MagicMock()
    error_response.status_code = 500
    error_response.json.return_value = {"detail": "boom"}
    error_response.text = '{"detail": "boom"}'
    request_mock = AsyncMock(return_value=error_response)
    fake_skyvern = SimpleNamespace(
        _client_wrapper=SimpleNamespace(httpx_client=SimpleNamespace(request=request_mock)),
    )
    monkeypatch.setattr(mcp_workflow, "get_skyvern", lambda: fake_skyvern)

    result = await mcp_workflow.skyvern_workflow_list()

    assert result["ok"] is False
    assert "500" in result["error"]["message"]
@pytest.mark.asyncio
async def test_list_preserves_serialization_shape(monkeypatch: pytest.MonkeyPatch) -> None:
    """Response payload keys stay stable so MCP clients see no contract change."""
    _patch_skyvern_list_response(monkeypatch, payload=[_make_workflow_dict("wpid_ok", "navigation")])

    result = await mcp_workflow.skyvern_workflow_list(page_size=5)

    assert result["ok"] is True
    data = result["data"]
    assert set(data.keys()) == {"workflows", "page", "page_size", "count", "has_more", "sdk_equivalent"}
    wf = data["workflows"][0]
    expected_keys = {
        "workflow_permanent_id",
        "workflow_id",
        "title",
        "version",
        "status",
        "description",
        "is_saved_task",
        "folder_id",
        "created_at",
        "modified_at",
    }
    # The summary shape must carry at least the documented keys...
    assert expected_keys.issubset(wf.keys())
    # ...and timestamps pass through as the raw ISO strings from the API.
    assert wf["created_at"] == "2026-04-20T10:00:00+00:00"

View file

@ -36,6 +36,73 @@ def _fake_http_response(payload: dict[str, object], status_code: int = 200) -> S
)
def _fake_workflow_dict(**overrides: object) -> dict[str, object]:
"""Plain dict shape returned by `v1/workflows` — matches _fake_workflow_response()."""
data: dict[str, object] = {
"workflow_permanent_id": "wpid_test",
"workflow_id": "wf_test",
"title": "Example Workflow",
"version": 1,
"status": "published",
"description": None,
"is_saved_task": False,
"folder_id": None,
"created_at": "2026-04-23T10:00:00+00:00",
"modified_at": "2026-04-23T10:00:00+00:00",
}
data.update(overrides)
return data
def _patch_skyvern_http(
monkeypatch: pytest.MonkeyPatch,
*,
response_payload: object,
status_code: int = 200,
) -> AsyncMock:
"""Patch ``workflow_tools.get_skyvern`` to return a fake client whose httpx
request returns the given payload. Returns the request AsyncMock so tests
can assert on what was sent.
"""
response = SimpleNamespace(
status_code=status_code,
json=lambda: response_payload,
text=json.dumps(response_payload),
)
request_mock = AsyncMock(return_value=response)
fake_skyvern = SimpleNamespace(
_client_wrapper=SimpleNamespace(httpx_client=SimpleNamespace(request=request_mock)),
)
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_skyvern)
return request_mock
def _google_sheets_definition(block_type: str) -> dict[str, object]:
block: dict[str, object] = {
"block_type": block_type,
"label": f"{block_type}_step",
"spreadsheet_url": "https://docs.google.com/spreadsheets/d/SPREADSHEET_ID/edit",
"sheet_name": "Sheet1",
"range": "A1:D100",
"credential_id": "{{ google_credential_id }}",
}
if block_type == "google_sheets_read":
block["has_header_row"] = True
elif block_type == "google_sheets_write":
block["write_mode"] = "append"
block["values"] = "{{ output_data | tojson }}"
else:
raise ValueError(f"Unsupported google sheets block type: {block_type}")
return {
"title": f"{block_type} workflow",
"workflow_definition": {
"parameters": [],
"blocks": [block],
},
}
def _heavy_workflow_run_payload(*, include_expanded_outputs: bool = True) -> dict[str, object]:
long_url = "https://artifacts.skyvern.example/" + ("x" * 1450)
screenshot_urls = [f"{long_url}-{idx}" for idx in range(6)]
@ -86,10 +153,64 @@ def _heavy_workflow_run_payload(*, include_expanded_outputs: bool = True) -> dic
}
@pytest.mark.parametrize("block_type", ["google_sheets_read", "google_sheets_write"])
@pytest.mark.asyncio
async def test_workflow_create_sends_google_sheets_json_definition_as_raw_dict(
monkeypatch: pytest.MonkeyPatch, block_type: str
) -> None:
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
definition = _google_sheets_definition(block_type)
result = await workflow_tools.skyvern_workflow_create(definition=json.dumps(definition), format="json")
assert result["ok"] is True, result
sent_definition = request_mock.await_args.kwargs["json"]["json_definition"]
assert type(sent_definition) is dict
assert not hasattr(sent_definition, "model_dump")
sent_block = sent_definition["workflow_definition"]["blocks"][0]
assert sent_block["block_type"] == block_type
assert sent_block["spreadsheet_url"] == "https://docs.google.com/spreadsheets/d/SPREADSHEET_ID/edit"
@pytest.mark.parametrize("block_type", ["google_sheets_read", "google_sheets_write"])
@pytest.mark.asyncio
async def test_workflow_update_sends_google_sheets_json_definition_as_raw_dict(
monkeypatch: pytest.MonkeyPatch, block_type: str
) -> None:
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
assert workflow_id == "wpid_test"
assert version is None
return {
"proxy_location": "RESIDENTIAL",
"workflow_definition": {
"parameters": [],
"blocks": [],
},
}
monkeypatch.setattr(workflow_tools, "_get_workflow_by_id", fake_get_workflow_by_id)
definition = _google_sheets_definition(block_type)
result = await workflow_tools.skyvern_workflow_update(
workflow_id="wpid_test",
definition=json.dumps(definition),
format="json",
)
assert result["ok"] is True, result
sent_definition = request_mock.await_args.kwargs["json"]["json_definition"]
assert type(sent_definition) is dict
assert not hasattr(sent_definition, "model_dump")
sent_block = sent_definition["workflow_definition"]["blocks"][0]
assert sent_block["block_type"] == block_type
assert sent_block["spreadsheet_url"] == "https://docs.google.com/spreadsheets/d/SPREADSHEET_ID/edit"
@pytest.mark.asyncio
async def test_workflow_create_normalizes_invalid_text_prompt_llm_key(monkeypatch: pytest.MonkeyPatch) -> None:
fake_client = SimpleNamespace(create_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
definition = {
"title": "Normalize invalid llm_key",
@ -108,19 +229,18 @@ async def test_workflow_create_normalizes_invalid_text_prompt_llm_key(monkeypatc
result = await workflow_tools.skyvern_workflow_create(definition=json.dumps(definition), format="json")
sent_definition = fake_client.create_workflow.await_args.kwargs["json_definition"]
sent_block = sent_definition.workflow_definition.blocks[0]
sent_definition = request_mock.await_args.kwargs["json"]["json_definition"]
sent_block = sent_definition["workflow_definition"]["blocks"][0]
assert result["ok"] is True
assert sent_block.llm_key is None
assert sent_block.model is None
assert "ANTHROPIC_CLAUDE_3_5_SONNET" not in json.dumps(sent_definition.model_dump(mode="json"))
assert sent_block.get("llm_key") is None
assert sent_block.get("model") is None
assert "ANTHROPIC_CLAUDE_3_5_SONNET" not in json.dumps(sent_definition)
@pytest.mark.asyncio
async def test_workflow_create_preserves_explicit_internal_text_prompt_llm_key(monkeypatch: pytest.MonkeyPatch) -> None:
fake_client = SimpleNamespace(create_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
definition = {
"title": "Preserve explicit internal llm_key",
@ -143,12 +263,12 @@ async def test_workflow_create_preserves_explicit_internal_text_prompt_llm_key(m
):
result = await workflow_tools.skyvern_workflow_create(definition=json.dumps(definition), format="json")
sent_definition = fake_client.create_workflow.await_args.kwargs["json_definition"]
sent_block = sent_definition.workflow_definition.blocks[0]
sent_definition = request_mock.await_args.kwargs["json"]["json_definition"]
sent_block = sent_definition["workflow_definition"]["blocks"][0]
assert result["ok"] is True
assert sent_block.model is None
assert sent_block.llm_key == "SPECIAL_INTERNAL_KEY"
assert sent_block.get("model") is None
assert sent_block.get("llm_key") == "SPECIAL_INTERNAL_KEY"
# ---------------------------------------------------------------------------
@ -174,8 +294,7 @@ async def test_mcp_strips_all_common_hallucinated_llm_keys(
monkeypatch: pytest.MonkeyPatch, hallucinated_key: str
) -> None:
"""MCP workflow creation must strip ANY hallucinated llm_key and default to Skyvern Optimized (null)."""
fake_client = SimpleNamespace(create_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
definition = {
"title": "Agent-generated workflow",
@ -198,25 +317,23 @@ async def test_mcp_strips_all_common_hallucinated_llm_keys(
):
result = await workflow_tools.skyvern_workflow_create(definition=json.dumps(definition), format="auto")
sent_def = fake_client.create_workflow.await_args.kwargs["json_definition"]
sent_block = sent_def.workflow_definition.blocks[0]
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
sent_block = sent_def["workflow_definition"]["blocks"][0]
assert result["ok"] is True
assert sent_block.llm_key is None, f"hallucinated key {hallucinated_key!r} was NOT stripped"
assert sent_block.model is None, "should default to Skyvern Optimized (null model)"
assert sent_block.get("llm_key") is None, f"hallucinated key {hallucinated_key!r} was NOT stripped"
assert sent_block.get("model") is None, "should default to Skyvern Optimized (null model)"
@pytest.mark.asyncio
async def test_workflow_create_preserves_unknown_fields(monkeypatch: pytest.MonkeyPatch) -> None:
"""Fields not in the internal schema should survive normalization.
The Fern-generated WorkflowCreateYamlRequest uses extra='allow', so unknown
fields are accepted. Our normalization deep-merges the original raw dict with
the normalized output so that future SDK fields not yet mirrored in the
internal schema are preserved at any nesting depth.
Our normalization deep-merges the original raw dict with the backend schema
output so fields not yet mirrored in that schema are preserved at any
nesting depth.
"""
fake_client = SimpleNamespace(create_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
definition = {
"title": "Unknown fields test",
@ -237,19 +354,17 @@ async def test_workflow_create_preserves_unknown_fields(monkeypatch: pytest.Monk
result = await workflow_tools.skyvern_workflow_create(definition=json.dumps(definition), format="json")
assert result["ok"] is True
sent = fake_client.create_workflow.await_args.kwargs["json_definition"]
sent = request_mock.await_args.kwargs["json"]["json_definition"]
# Top-level unknown field preserved
assert sent.some_future_sdk_field == "should_survive"
assert sent.get("some_future_sdk_field") == "should_survive"
# Nested unknown field inside workflow_definition also preserved via deep merge
wd = sent.workflow_definition
wd_dict = wd.model_dump(mode="json") if hasattr(wd, "model_dump") else wd.__dict__
assert wd_dict.get("some_nested_future_field") == "also_survives"
wd = sent["workflow_definition"]
assert wd.get("some_nested_future_field") == "also_survives"
@pytest.mark.asyncio
async def test_workflow_create_defaults_proxy_location_when_omitted(monkeypatch: pytest.MonkeyPatch) -> None:
fake_client = SimpleNamespace(create_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
definition = {
"title": "Default proxy workflow",
@ -269,16 +384,15 @@ async def test_workflow_create_defaults_proxy_location_when_omitted(monkeypatch:
result = await workflow_tools.skyvern_workflow_create(definition=json.dumps(definition), format="json")
sent_definition = fake_client.create_workflow.await_args.kwargs["json_definition"]
sent_definition = request_mock.await_args.kwargs["json"]["json_definition"]
assert result["ok"] is True
assert sent_definition.proxy_location == "RESIDENTIAL"
assert sent_definition.get("proxy_location") == "RESIDENTIAL"
@pytest.mark.asyncio
async def test_workflow_create_preserves_block_level_unknown_fields(monkeypatch: pytest.MonkeyPatch) -> None:
"""Unknown fields inside individual block dicts survive normalization via deep merge."""
fake_client = SimpleNamespace(create_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
definition = {
"title": "Block-level unknown fields test",
@ -298,16 +412,14 @@ async def test_workflow_create_preserves_block_level_unknown_fields(monkeypatch:
result = await workflow_tools.skyvern_workflow_create(definition=json.dumps(definition), format="json")
assert result["ok"] is True
sent = fake_client.create_workflow.await_args.kwargs["json_definition"]
sent_block = sent.workflow_definition.blocks[0]
block_dict = sent_block.model_dump(mode="json") if hasattr(sent_block, "model_dump") else sent_block.__dict__
assert block_dict.get("some_future_block_field") == 42
sent = request_mock.await_args.kwargs["json"]["json_definition"]
sent_block = sent["workflow_definition"]["blocks"][0]
assert sent_block.get("some_future_block_field") == 42
@pytest.mark.asyncio
async def test_workflow_update_preserves_existing_proxy_when_omitted(monkeypatch: pytest.MonkeyPatch) -> None:
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
assert workflow_id == "wpid_test"
@ -338,15 +450,14 @@ async def test_workflow_update_preserves_existing_proxy_when_omitted(monkeypatch
format="json",
)
sent_definition = fake_client.update_workflow.await_args.kwargs["json_definition"]
sent_definition = request_mock.await_args.kwargs["json"]["json_definition"]
assert result["ok"] is True
assert sent_definition.proxy_location == "RESIDENTIAL_AU"
assert sent_definition.get("proxy_location") == "RESIDENTIAL_AU"
@pytest.mark.asyncio
async def test_workflow_update_defaults_proxy_when_existing_is_null(monkeypatch: pytest.MonkeyPatch) -> None:
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
assert workflow_id == "wpid_test"
@ -377,16 +488,15 @@ async def test_workflow_update_defaults_proxy_when_existing_is_null(monkeypatch:
format="json",
)
sent_definition = fake_client.update_workflow.await_args.kwargs["json_definition"]
sent_definition = request_mock.await_args.kwargs["json"]["json_definition"]
assert result["ok"] is True
assert sent_definition.proxy_location == "RESIDENTIAL"
assert sent_definition.get("proxy_location") == "RESIDENTIAL"
@pytest.mark.asyncio
async def test_workflow_create_falls_back_on_schema_validation_error(monkeypatch: pytest.MonkeyPatch) -> None:
"""If the internal schema rejects the payload, normalization is skipped and the raw dict is forwarded."""
fake_client = SimpleNamespace(create_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
definition = {
"title": "Schema rejection test",
@ -410,11 +520,10 @@ async def test_workflow_create_falls_back_on_schema_validation_error(monkeypatch
result = await workflow_tools.skyvern_workflow_create(definition=json.dumps(definition), format="json")
assert result["ok"] is True
sent = fake_client.create_workflow.await_args.kwargs["json_definition"]
sent = request_mock.await_args.kwargs["json"]["json_definition"]
# Normalization was skipped, so the hallucinated key passes through to the SDK
sent_block = sent.workflow_definition.blocks[0]
block_dict = sent_block.model_dump(mode="json") if hasattr(sent_block, "model_dump") else sent_block.__dict__
assert block_dict.get("llm_key") == "HALLUCINATED_KEY"
sent_block = sent["workflow_definition"]["blocks"][0]
assert sent_block.get("llm_key") == "HALLUCINATED_KEY"
@pytest.mark.parametrize("format_name", ["json", "auto"])
@ -476,8 +585,7 @@ def test_deep_merge_preserves_block_unknown_fields_when_list_lengths_differ() ->
@pytest.mark.asyncio
async def test_mcp_text_prompt_without_llm_key_stays_null(monkeypatch: pytest.MonkeyPatch) -> None:
"""When MCP correctly omits llm_key (Skyvern Optimized), it stays null through the whole pipeline."""
fake_client = SimpleNamespace(create_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
definition = {
"title": "Well-behaved MCP workflow",
@ -494,11 +602,11 @@ async def test_mcp_text_prompt_without_llm_key_stays_null(monkeypatch: pytest.Mo
}
result = await workflow_tools.skyvern_workflow_create(definition=json.dumps(definition), format="json")
sent_block = fake_client.create_workflow.await_args.kwargs["json_definition"].workflow_definition.blocks[0]
sent_block = request_mock.await_args.kwargs["json"]["json_definition"]["workflow_definition"]["blocks"][0]
assert result["ok"] is True
assert sent_block.llm_key is None
assert sent_block.model is None
assert sent_block.get("llm_key") is None
assert sent_block.get("model") is None
@pytest.mark.asyncio
@ -644,8 +752,7 @@ async def test_workflow_update_preserves_credential_parameters_when_omitted(
"""When the update definition omits credential parameters, they should be
injected from the existing workflow so the login block keeps working."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -711,13 +818,13 @@ async def test_workflow_update_preserves_credential_parameters_when_omitted(
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
# Verify credential parameter was injected
params = sent_def.workflow_definition.parameters
cred_params = [p for p in params if getattr(p, "parameter_type", None) == "credential"]
params = sent_def["workflow_definition"]["parameters"]
cred_params = [p for p in params if p.get("parameter_type") == "credential"]
assert len(cred_params) == 1
assert cred_params[0].credential_id == "cred_abc123"
assert cred_params[0].key == "credentials"
assert cred_params[0]["credential_id"] == "cred_abc123"
assert cred_params[0]["key"] == "credentials"
@pytest.mark.asyncio
@ -727,8 +834,7 @@ async def test_workflow_update_injects_credential_key_into_block_parameter_keys(
"""When the update definition omits the credential parameter key from a login
block's parameter_keys, the key should be injected from the existing workflow."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -792,17 +898,17 @@ async def test_workflow_update_injects_credential_key_into_block_parameter_keys(
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
# Verify credential parameter was injected
params = sent_def.workflow_definition.parameters
cred_params = [p for p in params if getattr(p, "parameter_type", None) == "credential"]
params = sent_def["workflow_definition"]["parameters"]
cred_params = [p for p in params if p.get("parameter_type") == "credential"]
assert len(cred_params) == 1
assert cred_params[0].credential_id == "cred_abc123"
assert cred_params[0]["credential_id"] == "cred_abc123"
# Verify the login block now references the credential parameter key
blocks = sent_def.workflow_definition.blocks
login_block = next(b for b in blocks if getattr(b, "label", None) == "login_block")
pkeys = getattr(login_block, "parameter_keys", [])
blocks = sent_def["workflow_definition"]["blocks"]
login_block = next(b for b in blocks if b.get("label") == "login_block")
pkeys = login_block.get("parameter_keys", [])
assert "credentials" in pkeys
@ -813,8 +919,7 @@ async def test_workflow_update_injects_credential_key_when_parameter_keys_omitte
"""When the update definition omits parameter_keys entirely from the block,
credential keys should still be injected."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -863,16 +968,16 @@ async def test_workflow_update_injects_credential_key_when_parameter_keys_omitte
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
# Verify credential parameter was injected
params = sent_def.workflow_definition.parameters
cred_params = [p for p in params if getattr(p, "parameter_type", None) == "credential"]
params = sent_def["workflow_definition"]["parameters"]
cred_params = [p for p in params if p.get("parameter_type") == "credential"]
assert len(cred_params) == 1
# Verify the login block now has the credential key
blocks = sent_def.workflow_definition.blocks
login_block = next(b for b in blocks if getattr(b, "label", None) == "login_block")
pkeys = getattr(login_block, "parameter_keys", [])
blocks = sent_def["workflow_definition"]["blocks"]
login_block = next(b for b in blocks if b.get("label") == "login_block")
pkeys = login_block.get("parameter_keys", [])
assert "credentials" in pkeys
@ -883,8 +988,7 @@ async def test_workflow_update_preserves_block_credential_when_param_already_inc
"""When the update already includes the credential parameter but the block omits
the key from parameter_keys, the key should still be injected into the block."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -940,15 +1044,15 @@ async def test_workflow_update_preserves_block_credential_when_param_already_inc
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
blocks = sent_def.workflow_definition.blocks
login_block = next(b for b in blocks if getattr(b, "label", None) == "login_block")
pkeys = getattr(login_block, "parameter_keys", [])
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
blocks = sent_def["workflow_definition"]["blocks"]
login_block = next(b for b in blocks if b.get("label") == "login_block")
pkeys = login_block.get("parameter_keys", [])
assert "credentials" in pkeys
# Should not duplicate the credential parameter
params = sent_def.workflow_definition.parameters
cred_params = [p for p in params if getattr(p, "parameter_type", None) == "credential"]
params = sent_def["workflow_definition"]["parameters"]
cred_params = [p for p in params if p.get("parameter_type") == "credential"]
assert len(cred_params) == 1
@ -959,8 +1063,7 @@ async def test_workflow_update_does_not_duplicate_existing_credential_parameter(
"""When the update definition already includes the credential parameter,
it should NOT be duplicated."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -1016,9 +1119,9 @@ async def test_workflow_update_does_not_duplicate_existing_credential_parameter(
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
params = sent_def.workflow_definition.parameters
cred_params = [p for p in params if getattr(p, "parameter_type", None) == "credential"]
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
params = sent_def["workflow_definition"]["parameters"]
cred_params = [p for p in params if p.get("parameter_type") == "credential"]
# Should still be exactly 1, not duplicated
assert len(cred_params) == 1
@ -1030,8 +1133,7 @@ async def test_workflow_update_credential_keys_injected_when_login_block_label_r
"""When Claude renames a login block label (e.g. 'login_block' -> 'login'),
credential parameter_keys should still be injected via type-based matching."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -1082,18 +1184,18 @@ async def test_workflow_update_credential_keys_injected_when_login_block_label_r
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
# Credential parameter must be injected
params = sent_def.workflow_definition.parameters
cred_params = [p for p in params if getattr(p, "parameter_type", None) == "credential"]
params = sent_def["workflow_definition"]["parameters"]
cred_params = [p for p in params if p.get("parameter_type") == "credential"]
assert len(cred_params) == 1
assert cred_params[0].credential_id == "cred_abc123"
assert cred_params[0]["credential_id"] == "cred_abc123"
# Login block must have credential key despite label mismatch
blocks = sent_def.workflow_definition.blocks
login_block = next(b for b in blocks if getattr(b, "block_type", None) == "login")
pkeys = getattr(login_block, "parameter_keys", [])
blocks = sent_def["workflow_definition"]["blocks"]
login_block = next(b for b in blocks if b.get("block_type") == "login")
pkeys = login_block.get("parameter_keys", [])
assert "credentials" in pkeys
@ -1104,8 +1206,7 @@ async def test_workflow_update_always_replaces_wrong_credential_id(
"""When Claude includes a credential parameter with the wrong credential_id,
the existing workflow's credential_id should always win."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -1161,12 +1262,12 @@ async def test_workflow_update_always_replaces_wrong_credential_id(
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
params = sent_def.workflow_definition.parameters
cred_params = [p for p in params if getattr(p, "parameter_type", None) == "credential"]
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
params = sent_def["workflow_definition"]["parameters"]
cred_params = [p for p in params if p.get("parameter_type") == "credential"]
assert len(cred_params) == 1
# The existing (correct) credential_id must win
assert cred_params[0].credential_id == "cred_CORRECT"
assert cred_params[0]["credential_id"] == "cred_CORRECT"
@pytest.mark.asyncio
@ -1176,8 +1277,7 @@ async def test_workflow_update_correct_credential_still_works(
"""When Claude includes the credential parameter correctly, the always-replace
strategy should still work (idempotent, no regression)."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -1245,21 +1345,21 @@ async def test_workflow_update_correct_credential_still_works(
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
params = sent_def.workflow_definition.parameters
cred_params = [p for p in params if getattr(p, "parameter_type", None) == "credential"]
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
params = sent_def["workflow_definition"]["parameters"]
cred_params = [p for p in params if p.get("parameter_type") == "credential"]
assert len(cred_params) == 1
assert cred_params[0].credential_id == "cred_abc123"
assert cred_params[0]["credential_id"] == "cred_abc123"
# Non-credential params should be preserved from the update
wf_params = [p for p in params if getattr(p, "parameter_type", None) == "workflow"]
wf_params = [p for p in params if p.get("parameter_type") == "workflow"]
assert len(wf_params) == 1
assert wf_params[0].default_value == "https://new-url.com"
assert wf_params[0]["default_value"] == "https://new-url.com"
# Login block should have the credential key
blocks = sent_def.workflow_definition.blocks
login_block = next(b for b in blocks if getattr(b, "block_type", None) == "login")
pkeys = getattr(login_block, "parameter_keys", [])
blocks = sent_def["workflow_definition"]["blocks"]
login_block = next(b for b in blocks if b.get("block_type") == "login")
pkeys = login_block.get("parameter_keys", [])
assert "credentials" in pkeys
@ -1270,8 +1370,7 @@ async def test_workflow_update_multiple_login_blocks_all_get_credential_keys(
"""When the update has multiple login blocks, ALL of them should get credential
parameter_keys injected (even if labels don't match existing blocks)."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -1330,19 +1429,19 @@ async def test_workflow_update_multiple_login_blocks_all_get_credential_keys(
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
blocks = sent_def.workflow_definition.blocks
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
blocks = sent_def["workflow_definition"]["blocks"]
# Both login blocks should have credential keys
login_blocks = [b for b in blocks if getattr(b, "block_type", None) == "login"]
login_blocks = [b for b in blocks if b.get("block_type") == "login"]
assert len(login_blocks) == 2
for lb in login_blocks:
pkeys = getattr(lb, "parameter_keys", [])
assert "credentials" in pkeys, f"Login block {getattr(lb, 'label', '?')} missing credential key"
pkeys = lb.get("parameter_keys", [])
assert "credentials" in pkeys, f"Login block {lb.get('label', '?')} missing credential key"
# Task block should NOT have credential keys (no label match, not a login block)
task_block = next(b for b in blocks if getattr(b, "block_type", None) == "task")
task_pkeys = getattr(task_block, "parameter_keys", None) or []
task_block = next(b for b in blocks if b.get("block_type") == "task")
task_pkeys = task_block.get("parameter_keys") or []
assert "credentials" not in task_pkeys
@ -1353,8 +1452,7 @@ async def test_workflow_update_credential_keys_injected_into_login_block_nested_
"""When a login block is nested inside a for_loop block, credential parameter_keys
should still be injected via type-based matching."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -1416,20 +1514,20 @@ async def test_workflow_update_credential_keys_injected_into_login_block_nested_
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
# Credential parameter must be injected
params = sent_def.workflow_definition.parameters
cred_params = [p for p in params if getattr(p, "parameter_type", None) == "credential"]
params = sent_def["workflow_definition"]["parameters"]
cred_params = [p for p in params if p.get("parameter_type") == "credential"]
assert len(cred_params) == 1
assert cred_params[0].credential_id == "cred_abc123"
assert cred_params[0]["credential_id"] == "cred_abc123"
# The nested login block inside for_loop must have credential keys
blocks = sent_def.workflow_definition.blocks
loop_block = next(b for b in blocks if getattr(b, "block_type", None) == "for_loop")
nested_blocks = getattr(loop_block, "loop_blocks", [])
login_block = next(b for b in nested_blocks if getattr(b, "block_type", None) == "login")
pkeys = getattr(login_block, "parameter_keys", [])
blocks = sent_def["workflow_definition"]["blocks"]
loop_block = next(b for b in blocks if b.get("block_type") == "for_loop")
nested_blocks = loop_block.get("loop_blocks", [])
login_block = next(b for b in nested_blocks if b.get("block_type") == "login")
pkeys = login_block.get("parameter_keys", [])
assert "credentials" in pkeys
@ -1440,8 +1538,7 @@ async def test_workflow_update_login_block_only_gets_credential_type_keys_not_aw
"""When a workflow has both a credential param and an aws_secret param,
login blocks should only get the credential key, not the aws_secret key."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -1506,26 +1603,26 @@ async def test_workflow_update_login_block_only_gets_credential_type_keys_not_aw
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
# Both params should be preserved
params = sent_def.workflow_definition.parameters
cred_params = [p for p in params if getattr(p, "parameter_type", None) == "credential"]
aws_params = [p for p in params if getattr(p, "parameter_type", None) == "aws_secret"]
params = sent_def["workflow_definition"]["parameters"]
cred_params = [p for p in params if p.get("parameter_type") == "credential"]
aws_params = [p for p in params if p.get("parameter_type") == "aws_secret"]
assert len(cred_params) == 1
assert len(aws_params) == 1
blocks = sent_def.workflow_definition.blocks
blocks = sent_def["workflow_definition"]["blocks"]
# Login block should ONLY get credential key, NOT aws_secret
login_block = next(b for b in blocks if getattr(b, "block_type", None) == "login")
login_pkeys = getattr(login_block, "parameter_keys", [])
login_block = next(b for b in blocks if b.get("block_type") == "login")
login_pkeys = login_block.get("parameter_keys", [])
assert "credentials" in login_pkeys
assert "api_secret" not in login_pkeys
# Task block should get aws_secret via label fallback (label unchanged)
task_block = next(b for b in blocks if getattr(b, "block_type", None) == "task")
task_pkeys = getattr(task_block, "parameter_keys", [])
task_block = next(b for b in blocks if b.get("block_type") == "task")
task_pkeys = task_block.get("parameter_keys", [])
assert "api_secret" in task_pkeys
@ -1537,8 +1634,7 @@ async def test_workflow_update_strips_runtime_fields_from_credential_params(
(credential_parameter_id, workflow_id, created_at, modified_at, deleted_at),
those fields must be stripped before re-injecting into the update definition."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -1593,26 +1689,24 @@ async def test_workflow_update_strips_runtime_fields_from_credential_params(
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
params = sent_def.workflow_definition.parameters
cred_params = [p for p in params if getattr(p, "parameter_type", None) == "credential"]
params = sent_def["workflow_definition"]["parameters"]
cred_params = [p for p in params if p.get("parameter_type") == "credential"]
assert len(cred_params) == 1
cred = cred_params[0]
# Business fields should be preserved
assert getattr(cred, "key", None) == "credentials"
assert getattr(cred, "credential_id", None) == "cred_abc123"
assert getattr(cred, "description", None) == "Login credentials"
assert cred.get("key") == "credentials"
assert cred.get("credential_id") == "cred_abc123"
assert cred.get("description") == "Login credentials"
# Runtime fields must NOT be present — Fern SDK types use extra="allow"
# so extra fields survive as attributes if passed through
cred_dict = cred.dict() if hasattr(cred, "dict") else cred.__dict__
assert "credential_parameter_id" not in cred_dict
assert "workflow_id" not in cred_dict
assert "created_at" not in cred_dict
assert "modified_at" not in cred_dict
assert "deleted_at" not in cred_dict
# Runtime fields must NOT be present
assert "credential_parameter_id" not in cred
assert "workflow_id" not in cred
assert "created_at" not in cred
assert "modified_at" not in cred
assert "deleted_at" not in cred
@pytest.mark.asyncio
@ -1621,8 +1715,7 @@ async def test_workflow_update_preserves_workflow_credential_id_params_and_injec
) -> None:
"""Workflow parameters with `credential_id` type should be preserved like direct credential params."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -1690,34 +1783,32 @@ async def test_workflow_update_preserves_workflow_credential_id_params_and_injec
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
params = sent_def.workflow_definition.parameters
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
params = sent_def["workflow_definition"]["parameters"]
workflow_cred_params = [
p
for p in params
if getattr(p, "parameter_type", None) == "workflow"
and str(getattr(p, "workflow_parameter_type", None)) == "credential_id"
if p.get("parameter_type") == "workflow" and str(p.get("workflow_parameter_type")) == "credential_id"
]
assert len(workflow_cred_params) == 1
workflow_cred = workflow_cred_params[0]
assert getattr(workflow_cred, "key", None) == "my_creds"
assert getattr(workflow_cred, "default_value", None) == "cred_abc123"
assert workflow_cred.get("key") == "my_creds"
assert workflow_cred.get("default_value") == "cred_abc123"
workflow_cred_dict = workflow_cred.dict() if hasattr(workflow_cred, "dict") else workflow_cred.__dict__
assert "workflow_parameter_id" not in workflow_cred_dict
assert "workflow_id" not in workflow_cred_dict
assert "created_at" not in workflow_cred_dict
assert "modified_at" not in workflow_cred_dict
assert "deleted_at" not in workflow_cred_dict
assert "workflow_parameter_id" not in workflow_cred
assert "workflow_id" not in workflow_cred
assert "created_at" not in workflow_cred
assert "modified_at" not in workflow_cred
assert "deleted_at" not in workflow_cred
workflow_params = [p for p in params if getattr(p, "parameter_type", None) == "workflow"]
url_param = next(p for p in workflow_params if getattr(p, "key", None) == "url_input")
assert getattr(url_param, "default_value", None) == "https://new-url.com"
workflow_params = [p for p in params if p.get("parameter_type") == "workflow"]
url_param = next(p for p in workflow_params if p.get("key") == "url_input")
assert url_param.get("default_value") == "https://new-url.com"
blocks = sent_def.workflow_definition.blocks
login_block = next(b for b in blocks if getattr(b, "block_type", None) == "login")
pkeys = getattr(login_block, "parameter_keys", [])
blocks = sent_def["workflow_definition"]["blocks"]
login_block = next(b for b in blocks if b.get("block_type") == "login")
pkeys = login_block.get("parameter_keys", [])
assert "my_creds" in pkeys
@ -1727,8 +1818,7 @@ async def test_workflow_update_always_replaces_wrong_workflow_credential_id_defa
) -> None:
"""Credential-id workflow params should keep the existing default credential on MCP updates."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -1785,16 +1875,15 @@ async def test_workflow_update_always_replaces_wrong_workflow_credential_id_defa
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
params = sent_def.workflow_definition.parameters
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
params = sent_def["workflow_definition"]["parameters"]
workflow_cred_params = [
p
for p in params
if getattr(p, "parameter_type", None) == "workflow"
and str(getattr(p, "workflow_parameter_type", None)) == "credential_id"
if p.get("parameter_type") == "workflow" and str(p.get("workflow_parameter_type")) == "credential_id"
]
assert len(workflow_cred_params) == 1
assert getattr(workflow_cred_params[0], "default_value", None) == "cred_CORRECT"
assert workflow_cred_params[0].get("default_value") == "cred_CORRECT"
@pytest.mark.asyncio
@ -1803,8 +1892,7 @@ async def test_workflow_update_injects_onepassword_key_into_login_block_paramete
) -> None:
"""Login blocks should keep external credential keys like onepassword after MCP edits."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -1855,18 +1943,18 @@ async def test_workflow_update_injects_onepassword_key_into_login_block_paramete
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
params = sent_def.workflow_definition.parameters
onepassword_params = [p for p in params if getattr(p, "parameter_type", None) == "onepassword"]
params = sent_def["workflow_definition"]["parameters"]
onepassword_params = [p for p in params if p.get("parameter_type") == "onepassword"]
assert len(onepassword_params) == 1
assert getattr(onepassword_params[0], "key", None) == "site_login_cred"
assert getattr(onepassword_params[0], "vault_id", None) == "vault_123"
assert getattr(onepassword_params[0], "item_id", None) == "item_456"
assert onepassword_params[0].get("key") == "site_login_cred"
assert onepassword_params[0].get("vault_id") == "vault_123"
assert onepassword_params[0].get("item_id") == "item_456"
blocks = sent_def.workflow_definition.blocks
login_block = next(b for b in blocks if getattr(b, "block_type", None) == "login")
pkeys = getattr(login_block, "parameter_keys", [])
blocks = sent_def["workflow_definition"]["blocks"]
login_block = next(b for b in blocks if b.get("block_type") == "login")
pkeys = login_block.get("parameter_keys", [])
assert "site_login_cred" in pkeys
@ -1876,8 +1964,7 @@ async def test_workflow_update_injects_bitwarden_login_key_into_login_block_para
) -> None:
"""Login blocks should keep bitwarden_login_credential keys after MCP edits."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -1936,16 +2023,16 @@ async def test_workflow_update_injects_bitwarden_login_key_into_login_block_para
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
params = sent_def.workflow_definition.parameters
bw_params = [p for p in params if getattr(p, "parameter_type", None) == "bitwarden_login_credential"]
params = sent_def["workflow_definition"]["parameters"]
bw_params = [p for p in params if p.get("parameter_type") == "bitwarden_login_credential"]
assert len(bw_params) == 1
assert getattr(bw_params[0], "key", None) == "portal_login"
assert bw_params[0].get("key") == "portal_login"
blocks = sent_def.workflow_definition.blocks
login_block = next(b for b in blocks if getattr(b, "block_type", None) == "login")
pkeys = getattr(login_block, "parameter_keys", [])
blocks = sent_def["workflow_definition"]["blocks"]
login_block = next(b for b in blocks if b.get("block_type") == "login")
pkeys = login_block.get("parameter_keys", [])
assert "portal_login" in pkeys
@ -1955,8 +2042,7 @@ async def test_workflow_update_injects_azure_vault_key_into_login_block_paramete
) -> None:
"""Login blocks should keep azure_vault_credential keys after MCP edits."""
fake_client = SimpleNamespace(update_workflow=AsyncMock(return_value=_fake_workflow_response()))
monkeypatch.setattr(workflow_tools, "get_skyvern", lambda: fake_client)
request_mock = _patch_skyvern_http(monkeypatch, response_payload=_fake_workflow_dict())
async def fake_get_workflow_by_id(workflow_id: str, version: int | None = None) -> dict[str, object]:
return {
@ -2014,14 +2100,14 @@ async def test_workflow_update_injects_azure_vault_key_into_login_block_paramete
assert result["ok"] is True
sent_def = fake_client.update_workflow.await_args.kwargs["json_definition"]
sent_def = request_mock.await_args.kwargs["json"]["json_definition"]
params = sent_def.workflow_definition.parameters
az_params = [p for p in params if getattr(p, "parameter_type", None) == "azure_vault_credential"]
params = sent_def["workflow_definition"]["parameters"]
az_params = [p for p in params if p.get("parameter_type") == "azure_vault_credential"]
assert len(az_params) == 1
assert getattr(az_params[0], "key", None) == "azure_creds"
assert az_params[0].get("key") == "azure_creds"
blocks = sent_def.workflow_definition.blocks
login_block = next(b for b in blocks if getattr(b, "block_type", None) == "login")
pkeys = getattr(login_block, "parameter_keys", [])
blocks = sent_def["workflow_definition"]["blocks"]
login_block = next(b for b in blocks if b.get("block_type") == "login")
pkeys = login_block.get("parameter_keys", [])
assert "azure_creds" in pkeys

View file

@ -0,0 +1,182 @@
"""Tests for the ``validation.decision`` / ``validation.reasoning_kind`` span
attributes (SKY-9174, Part D.3).
The two attributes give us a post-merge logfire signal for when a validation
block's LLM reasons literally and/or terminates — the failure mode Part D aims
to reduce. Query shape::
SELECT COUNT(*) FROM records
WHERE span_name = 'skyvern.agent.step_body'
AND attributes->>'validation.decision' = 'terminate'
AND attributes->>'validation.reasoning_kind' = 'literal'
AND start_timestamp > now() - INTERVAL '24 hours';
Pre-fix this count should be non-trivial; post-fix it should trend to zero on
the copilot-v2 cohort. These tests cover the attribute-writing logic directly
(the helper is pure, so we don't need to drive the full agent step).
"""
from __future__ import annotations
from datetime import UTC, datetime
import opentelemetry.trace as otel_trace
import pytest
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from skyvern.forge.agent import record_validation_span_attrs
from skyvern.forge.sdk.db.enums import TaskType
from skyvern.webeye.actions.actions import (
Action,
ActionType,
ClickAction,
CompleteAction,
TerminateAction,
)
from tests.unit.helpers import make_organization, make_task
STEP_SPAN_NAME = "skyvern.agent.validation_step_body_fixture"
def _validation_task() -> object:
    """Build a ``TaskType.validation`` task anchored at the current UTC instant."""
    now = datetime.now(UTC)
    return make_task(now, make_organization(now), task_type=TaskType.validation)
def _general_task() -> object:
    """Build a ``TaskType.general`` task anchored at the current UTC instant."""
    now = datetime.now(UTC)
    return make_task(now, make_organization(now), task_type=TaskType.general)
def _run_with_span(task: object, actions: list[Action]) -> dict:
    """Start a fixture span, invoke the recording helper inside it, end the span.

    Note: the returned dict is always empty — callers read the recorded
    attributes from the in-memory exporter via ``_span_attrs`` instead.
    """
    tracer = otel_trace.get_tracer("sky-9174-test")
    with tracer.start_as_current_span(STEP_SPAN_NAME) as span:
        record_validation_span_attrs(span, task, actions)
    return {}  # attrs read from the exporter by the caller
def _span_attrs(span_exporter: InMemorySpanExporter) -> dict:
    """Return the finished fixture span's attributes as a plain dict."""
    matches = [s for s in span_exporter.get_finished_spans() if s.name == STEP_SPAN_NAME]
    assert matches, "expected fixture span to be recorded"
    return dict(matches[0].attributes or {})
def _complete_action(reasoning: str) -> CompleteAction:
    """Build a COMPLETE action whose intention mirrors its reasoning text."""
    return CompleteAction(reasoning=reasoning, intention=reasoning, action_type=ActionType.COMPLETE)
def _terminate_action(reasoning: str) -> TerminateAction:
    """Build a TERMINATE action whose intention mirrors its reasoning text."""
    return TerminateAction(reasoning=reasoning, intention=reasoning, action_type=ActionType.TERMINATE)
def test_complete_with_semantic_reasoning_records_semantic(span_exporter: InMemorySpanExporter) -> None:
    """A semantic COMPLETE on a validation task is tagged (complete, semantic)."""
    action = _complete_action("The current page shows a thank-you confirmation.")
    _run_with_span(_validation_task(), [action])
    recorded = _span_attrs(span_exporter)
    assert recorded.get("validation.decision") == "complete"
    assert recorded.get("validation.reasoning_kind") == "semantic"
def test_terminate_with_literal_reasoning_records_literal(span_exporter: InMemorySpanExporter) -> None:
    """The regression we care about most: the LLM terminated because an exact
    string was absent — the (terminate, literal) combination Part D aims to
    drive toward zero."""
    reasoning = (
        "The page does not contain the exact complete-criterion text 'Your message has been sent'. TERMINATE."
    )
    _run_with_span(_validation_task(), [_terminate_action(reasoning)])
    recorded = _span_attrs(span_exporter)
    assert recorded.get("validation.decision") == "terminate"
    assert recorded.get("validation.reasoning_kind") == "literal"
def test_terminate_with_semantic_reasoning_records_semantic(span_exporter: InMemorySpanExporter) -> None:
    """A terminate backed by intent-level reasoning is tagged semantic."""
    reasoning = "An error banner surfaced at the top of the page saying the submission failed."
    _run_with_span(_validation_task(), [_terminate_action(reasoning)])
    recorded = _span_attrs(span_exporter)
    assert recorded.get("validation.decision") == "terminate"
    assert recorded.get("validation.reasoning_kind") == "semantic"
def test_complete_with_literal_reasoning_records_literal(span_exporter: InMemorySpanExporter) -> None:
    """Symmetric case: a literal COMPLETE is harmless, but the post-merge
    dashboard tracks the distribution across both axes, so it is tagged too."""
    reasoning = "The page contains the exact phrase 'Your message has been sent'."
    _run_with_span(_validation_task(), [_complete_action(reasoning)])
    recorded = _span_attrs(span_exporter)
    assert recorded.get("validation.decision") == "complete"
    assert recorded.get("validation.reasoning_kind") == "literal"
def test_non_validation_task_does_not_tag_span(span_exporter: InMemorySpanExporter) -> None:
    """Only ``TaskType.validation`` step spans may carry the validation.*
    attributes — guard against accidental tagging of other task types."""
    action = _complete_action("The current page shows a thank-you confirmation.")
    _run_with_span(_general_task(), [action])
    recorded = _span_attrs(span_exporter)
    assert "validation.decision" not in recorded
    assert "validation.reasoning_kind" not in recorded
def test_non_decisive_action_does_not_tag_span(span_exporter: InMemorySpanExporter) -> None:
    """Validation tasks whose first action is not a Complete/Terminate (unusual
    but possible during partial parsing) must not produce tagged attrs."""
    # A ClickAction stands in for any leading action that is not a DecisiveAction.
    click = ClickAction(action_type=ActionType.CLICK, element_id="AAAB", reasoning="click")
    _run_with_span(_validation_task(), [click])
    recorded = _span_attrs(span_exporter)
    assert "validation.decision" not in recorded
    assert "validation.reasoning_kind" not in recorded
def test_empty_actions_list_does_not_tag_span(span_exporter: InMemorySpanExporter) -> None:
    """No actions at all: there is nothing to classify, so no attrs are written."""
    _run_with_span(_validation_task(), [])
    recorded = _span_attrs(span_exporter)
    assert "validation.decision" not in recorded
    assert "validation.reasoning_kind" not in recorded
def test_missing_reasoning_defaults_to_semantic(span_exporter: InMemorySpanExporter) -> None:
    """Empty/None reasoning must not crash; absence of literal signals means
    semantic by the helper's rule."""
    _run_with_span(_validation_task(), [_complete_action("")])
    recorded = _span_attrs(span_exporter)
    assert recorded.get("validation.decision") == "complete"
    assert recorded.get("validation.reasoning_kind") == "semantic"
@pytest.mark.parametrize(
    "signal",
    ["exact", "literal", "verbatim", "word-for-word", "word for word"],
)
def test_every_literal_signal_flags_reasoning(signal: str, span_exporter: InMemorySpanExporter) -> None:
    """Each configured literal signal must, on its own, flag reasoning as literal."""
    reasoning = f"The criterion does not appear {signal} on the page."
    _run_with_span(_validation_task(), [_terminate_action(reasoning)])
    recorded = _span_attrs(span_exporter)
    assert recorded.get("validation.reasoning_kind") == "literal", signal

View file

@ -0,0 +1,79 @@
"""Tests for the ``verification.reasoning_kind`` span attribute (SKY-9174, Part E.2).
Part A already wrote ``verification.status`` and ``verification.template`` on
``skyvern.agent.complete_verify`` spans. Part E adds one more:
``verification.reasoning_kind`` (``literal`` | ``semantic``) derived from the
verifier LLM's ``thoughts`` text via the shared ``_classify_reasoning_kind``
heuristic. Post-fix logfire query::
SELECT COUNT(*) FROM records
WHERE span_name = 'skyvern.agent.complete_verify'
AND attributes->>'verification.status' != 'complete'
AND attributes->>'verification.reasoning_kind' = 'literal'
AND start_timestamp > now() - INTERVAL '24 hours';
Pre-fix, a single reproducing run produces double-digit rows with
``reasoning_kind = 'literal'``. Post-fix, that count should trend to zero.
"""
from __future__ import annotations
import opentelemetry.trace as otel_trace
import pytest
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from skyvern.forge.agent import record_verification_span_attrs
SPAN_NAME = "skyvern.agent.verification_fixture"
def _run_with_span(thoughts: str | None) -> None:
    """Record verification attrs for *thoughts* inside a fixture-named span."""
    tracer = otel_trace.get_tracer("sky-9174-part-e-test")
    with tracer.start_as_current_span(SPAN_NAME) as current:
        record_verification_span_attrs(current, thoughts)
def _span_attrs(span_exporter: InMemorySpanExporter) -> dict:
    """Return the finished fixture span's attributes as a plain dict."""
    matches = [s for s in span_exporter.get_finished_spans() if s.name == SPAN_NAME]
    assert matches, "expected fixture span to be recorded"
    return dict(matches[0].attributes or {})
def test_literal_reasoning_records_literal(span_exporter: InMemorySpanExporter) -> None:
    """The key regression: the verifier insisted on finding the criterion's
    exact wording and returned ``continue`` — the (verifier, literal)
    combination Part E aims to drive toward zero."""
    _run_with_span("The page does not contain the exact phrase 'Your message has been sent'. user_goal_achieved=false.")
    recorded = _span_attrs(span_exporter)
    assert recorded.get("verification.reasoning_kind") == "literal"
def test_semantic_reasoning_records_semantic(span_exporter: InMemorySpanExporter) -> None:
    """Intent-level verifier reasoning is tagged semantic."""
    _run_with_span("The page renders a thank-you confirmation, satisfying the goal's intent.")
    recorded = _span_attrs(span_exporter)
    assert recorded.get("verification.reasoning_kind") == "semantic"
def test_empty_reasoning_defaults_to_semantic(span_exporter: InMemorySpanExporter) -> None:
    """An empty thoughts string carries no literal signal, so it is semantic."""
    _run_with_span("")
    recorded = _span_attrs(span_exporter)
    assert recorded.get("verification.reasoning_kind") == "semantic"
def test_none_reasoning_defaults_to_semantic(span_exporter: InMemorySpanExporter) -> None:
    """``None`` thoughts must not crash and default to semantic."""
    _run_with_span(None)
    recorded = _span_attrs(span_exporter)
    assert recorded.get("verification.reasoning_kind") == "semantic"
@pytest.mark.parametrize(
    "signal",
    ["exact", "literal", "verbatim", "word-for-word", "word for word"],
)
def test_every_literal_signal_flags_reasoning(signal: str, span_exporter: InMemorySpanExporter) -> None:
    """Shared classifier: every signal recognized by ``record_validation_span_attrs``
    must also flag verifier reasoning — guards against drift between the two
    callers."""
    _run_with_span(f"The goal does not appear {signal} on the page.")
    recorded = _span_attrs(span_exporter)
    assert recorded.get("verification.reasoning_kind") == "literal", signal