🔄 synced local 'tests/unit/' with remote 'tests/unit/'

This commit is contained in:
Shuchang Zheng 2026-04-25 19:31:53 +00:00
parent 84e43687b5
commit c2a211be4e
8 changed files with 1942 additions and 3 deletions

View file

@ -0,0 +1,263 @@
"""Regression tests: script-path ``ai_extract`` must honor the current block's
``ignore_workflow_system_prompt`` opt-out, matching agent-path behavior.
Without this, a cached script-path extraction still injects the workflow prompt
even when the block has opted out — the two execution modes diverge for the
same block (SKY-9147).
"""
from __future__ import annotations
import asyncio
from datetime import datetime, timezone
from typing import Any
from unittest.mock import MagicMock
import pytest
from skyvern.core.script_generations import real_skyvern_page_ai as module
from skyvern.core.script_generations.skyvern_page_ai import SYSTEM_PROMPT_UNSET
from skyvern.forge.sdk.workflow.models.block import ExtractionBlock
from skyvern.forge.sdk.workflow.models.parameter import OutputParameter, ParameterType
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowDefinition
def _make_output_parameter() -> OutputParameter:
    """Build a minimal, valid ``OutputParameter`` for the single-block test workflow."""
    timestamp = datetime.now(timezone.utc)
    return OutputParameter(
        output_parameter_id="op_extract1",
        parameter_type=ParameterType.OUTPUT,
        key="extract1_output",
        description="test output",
        workflow_id="w_test",
        created_at=timestamp,
        modified_at=timestamp,
    )
def _make_workflow_with_block(*, ignore_workflow_system_prompt: bool, workflow_system_prompt: str | None) -> Workflow:
    """Build a one-block ``Workflow`` whose single ``ExtractionBlock`` carries the
    given opt-out flag, with the workflow-level prompt set as requested."""
    timestamp = datetime.now(timezone.utc)
    extraction_block = ExtractionBlock(
        label="extract1",
        output_parameter=_make_output_parameter(),
        data_extraction_goal="Extract things",
        ignore_workflow_system_prompt=ignore_workflow_system_prompt,
    )
    definition = WorkflowDefinition(
        parameters=[],
        blocks=[extraction_block],
        workflow_system_prompt=workflow_system_prompt,
    )
    return Workflow(
        workflow_id="w_test",
        organization_id="o_test",
        title="test",
        workflow_permanent_id="wpid_test",
        version=1,
        is_saved_task=False,
        workflow_definition=definition,
        created_at=timestamp,
        modified_at=timestamp,
    )
def _run_ai_extract(
    monkeypatch: pytest.MonkeyPatch,
    *,
    workflow: Workflow | None,
    current_label: str | None,
    extra_kwargs: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Drive ``RealSkyvernPageAi.ai_extract`` with mocks that capture both the
    cache-key ``system_prompt`` and the LLM-handler ``system_prompt``. Returns
    a dict with keys ``cache_system_prompt`` and ``llm_system_prompt``."""
    captured: dict[str, Any] = {}

    # Captures the system prompt that participates in the extraction cache key.
    def fake_compute_cache_key(**kwargs: Any) -> str:
        captured["cache_system_prompt"] = kwargs.get("workflow_system_prompt")
        return "fake-cache-key"

    def fake_lookup(*_args: Any, **_kwargs: Any) -> None:
        # Cache miss — let the flow fall through to the handler.
        return None

    # Stubs prompt rendering so no template files or elements are needed.
    def fake_load_prompt(**_kwargs: Any) -> tuple[str, dict[str, Any]]:
        return "rendered-prompt", {
            "extracted_text": None,
            "extracted_information_schema": None,
        }

    # Captures the system prompt actually forwarded to the LLM API handler.
    async def fake_handler(*, system_prompt: Any = None, **_ignored: Any) -> dict[str, Any]:
        captured["llm_system_prompt"] = system_prompt
        return {}

    # Build a minimal fake ``WorkflowRunContext``. The single-block workflows
    # this helper builds go through ``_apply_workflow_system_prompt`` in the
    # real script-path dispatch, so simulate that by pre-recording the block's
    # effective value — ``None`` when the block opts out, the rendered prompt
    # otherwise. ``ai_extract`` reads this recorded value verbatim, which is
    # exactly the contract Andrew's fix enforces.
    if workflow is not None:
        block = workflow.workflow_definition.blocks[0] if workflow.workflow_definition.blocks else None
        workflow_prompt = workflow.workflow_definition.workflow_system_prompt
        # Opt-out records None; otherwise record the workflow-level prompt.
        recorded_value: str | None = (
            None if (block is not None and getattr(block, "ignore_workflow_system_prompt", False)) else workflow_prompt
        )
        # The record only "exists" when the running block's label matches.
        has_block_record = block is not None and current_label == block.label
        ctx = MagicMock()
        ctx.workflow = workflow
        ctx.resolve_effective_workflow_system_prompt = MagicMock(return_value=workflow_prompt)
        ctx.get_block_workflow_system_prompt = MagicMock(
            return_value=(has_block_record, recorded_value),
        )

        def get_run_ctx(_run_id: str) -> Any:
            return ctx

        monkeypatch.setattr(
            module.app.WORKFLOW_CONTEXT_MANAGER,
            "get_workflow_run_context",
            get_run_ctx,
        )

    # Minimal Skyvern runtime context pointing at the fake workflow run.
    skyvern_ctx = MagicMock()
    skyvern_ctx.workflow_run_id = "wr_test"
    skyvern_ctx.tz_info = None
    skyvern_ctx.organization_id = None
    skyvern_ctx.task_id = None
    skyvern_ctx.step_id = None
    skyvern_ctx.script_mode = False
    monkeypatch.setattr(module.skyvern_context, "current", lambda: skyvern_ctx)
    monkeypatch.setattr(module, "load_prompt_with_elements_tracked", fake_load_prompt)
    monkeypatch.setattr(module.extraction_cache, "compute_cache_key", fake_compute_cache_key)
    monkeypatch.setattr(module.extraction_cache, "lookup", fake_lookup)
    monkeypatch.setattr(module.app, "EXTRACTION_LLM_API_HANDLER", fake_handler)

    # A fake scraped page satisfying everything ``ai_extract`` reads from it.
    scraped_page = MagicMock()
    scraped_page.url = "https://example.test"
    scraped_page.extracted_text = "page text"
    scraped_page.screenshots = []
    scraped_page.build_element_tree = MagicMock(return_value="<a>link</a>")
    scraped_page.support_economy_elements_tree = MagicMock(return_value=False)
    scraped_page.last_used_element_tree_html = None

    # ``__new__`` bypasses ``__init__`` so no browser/page wiring is required.
    page = module.RealSkyvernPageAi.__new__(module.RealSkyvernPageAi)
    page.scraped_page = scraped_page
    page.current_label = current_label

    async def fake_refresh(*_args: Any, **_kwargs: Any) -> None:
        return None

    monkeypatch.setattr(page, "_refresh_scraped_page", fake_refresh)
    asyncio.run(
        page.ai_extract(
            prompt="Extract things",
            schema={"type": "object"},
            **(extra_kwargs or {}),
        )
    )
    return captured
# ---------------------------------------------------------------------------
# The bug Andrew flagged: block opts out, but script-path still injects prompt.
# ---------------------------------------------------------------------------
def test_opted_out_block_sends_no_system_prompt_to_llm(monkeypatch: pytest.MonkeyPatch) -> None:
    """An opted-out block must reach the LLM handler with ``system_prompt=None``."""
    workflow = _make_workflow_with_block(
        workflow_system_prompt="WORKFLOW RULES.",
        ignore_workflow_system_prompt=True,
    )
    result = _run_ai_extract(monkeypatch, workflow=workflow, current_label="extract1")
    assert result["llm_system_prompt"] is None
def test_opted_out_block_cache_key_omits_system_prompt(monkeypatch: pytest.MonkeyPatch) -> None:
    """Cache-key parity with the agent path: an opted-out block's extractions must
    hash identically whether or not the workflow defines a prompt. Otherwise a
    user toggling ``workflow_system_prompt`` at the workflow level silently
    invalidates caches for blocks that explicitly opted out."""
    workflow = _make_workflow_with_block(
        workflow_system_prompt="WORKFLOW RULES.",
        ignore_workflow_system_prompt=True,
    )
    result = _run_ai_extract(monkeypatch, workflow=workflow, current_label="extract1")
    assert result["cache_system_prompt"] is None
# ---------------------------------------------------------------------------
# Non-opted-out block still inherits the workflow prompt (no regression of the
# base feature).
# ---------------------------------------------------------------------------
def test_non_opted_out_block_receives_workflow_prompt(monkeypatch: pytest.MonkeyPatch) -> None:
    """A block that did not opt out still inherits the workflow-level prompt."""
    workflow = _make_workflow_with_block(
        workflow_system_prompt="WORKFLOW RULES.",
        ignore_workflow_system_prompt=False,
    )
    result = _run_ai_extract(monkeypatch, workflow=workflow, current_label="extract1")
    assert result["cache_system_prompt"] == "WORKFLOW RULES."
    assert result["llm_system_prompt"] == "WORKFLOW RULES."
# ---------------------------------------------------------------------------
# Explicit parameter overrides: caller can pass ``system_prompt`` directly,
# bypassing the block-flag lookup. Includes the "explicit None" escape hatch.
# ---------------------------------------------------------------------------
def test_explicit_system_prompt_overrides_workflow(monkeypatch: pytest.MonkeyPatch) -> None:
    """A caller-supplied ``system_prompt`` takes precedence over the workflow's."""
    workflow = _make_workflow_with_block(
        workflow_system_prompt="WORKFLOW RULES.",
        ignore_workflow_system_prompt=False,
    )
    result = _run_ai_extract(
        monkeypatch,
        workflow=workflow,
        current_label="extract1",
        extra_kwargs={"system_prompt": "EXPLICIT RULES."},
    )
    assert result["cache_system_prompt"] == "EXPLICIT RULES."
    assert result["llm_system_prompt"] == "EXPLICIT RULES."
def test_explicit_none_opts_out_even_when_workflow_has_prompt(monkeypatch: pytest.MonkeyPatch) -> None:
    """Explicit ``None`` is the escape hatch: caller says "send no system prompt"
    regardless of the workflow or block state."""
    workflow = _make_workflow_with_block(
        workflow_system_prompt="WORKFLOW RULES.",
        ignore_workflow_system_prompt=False,
    )
    result = _run_ai_extract(
        monkeypatch,
        workflow=workflow,
        current_label="extract1",
        extra_kwargs={"system_prompt": None},
    )
    assert result["cache_system_prompt"] is None
    assert result["llm_system_prompt"] is None
def test_sentinel_default_is_not_leaked_to_handler(monkeypatch: pytest.MonkeyPatch) -> None:
    """The sentinel must never reach the LLM handler — if the fallback path
    silently forwarded it, the handler would see a non-None, non-string object
    and fail or pass a bogus ``system_prompt`` to the model."""
    workflow = _make_workflow_with_block(
        workflow_system_prompt=None,  # workflow defines no prompt at all
        ignore_workflow_system_prompt=False,
    )
    result = _run_ai_extract(monkeypatch, workflow=workflow, current_label="extract1")
    llm_prompt = result["llm_system_prompt"]
    assert llm_prompt is None
    assert llm_prompt is not SYSTEM_PROMPT_UNSET
    assert result["cache_system_prompt"] is None

View file

@ -0,0 +1,489 @@
"""Tests for workflow-level workflow_system_prompt inheritance into blocks at execution time."""
from datetime import datetime, timezone
from unittest.mock import MagicMock
from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext
from skyvern.forge.sdk.workflow.models.block import (
FileParserBlock,
PDFParserBlock,
TaskBlock,
TaskV2Block,
TextPromptBlock,
)
from skyvern.forge.sdk.workflow.models.parameter import OutputParameter, ParameterType
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowDefinition
from skyvern.schemas.workflows import FileType
def _make_output_parameter() -> OutputParameter:
    """Minimal valid ``OutputParameter`` shared by the block fixtures below."""
    timestamp = datetime.now(timezone.utc)
    return OutputParameter(
        output_parameter_id="op_task1",
        parameter_type=ParameterType.OUTPUT,
        key="task1_output",
        description="test output",
        workflow_id="w_test",
        created_at=timestamp,
        modified_at=timestamp,
    )
def _make_task_block(workflow_system_prompt: str | None = None) -> TaskBlock:
    """``TaskBlock`` fixture with an optional pre-set block-level system prompt."""
    return TaskBlock(
        output_parameter=_make_output_parameter(),
        label="task1",
        title="task title",
        workflow_system_prompt=workflow_system_prompt,
    )
def _make_task_v2_block(workflow_system_prompt: str | None = None) -> TaskV2Block:
    """``TaskV2Block`` fixture with an optional pre-set block-level system prompt."""
    return TaskV2Block(
        output_parameter=_make_output_parameter(),
        label="task1",
        prompt="user goal",
        workflow_system_prompt=workflow_system_prompt,
    )
def _make_workflow(workflow_system_prompt: str | None) -> Workflow:
    """Empty-blocks ``Workflow`` carrying the given workflow-level system prompt."""
    timestamp = datetime.now(timezone.utc)
    definition = WorkflowDefinition(
        parameters=[],
        blocks=[],
        workflow_system_prompt=workflow_system_prompt,
    )
    return Workflow(
        workflow_id="w_test",
        organization_id="o_test",
        title="test",
        workflow_permanent_id="wpid_test",
        version=1,
        is_saved_task=False,
        workflow_definition=definition,
        created_at=timestamp,
        modified_at=timestamp,
    )
def _make_workflow_run_context(
    workflow_system_prompt: str | None,
    inherited_workflow_system_prompt: str | None = None,
) -> WorkflowRunContext:
    """``WorkflowRunContext`` fixture wired to a fresh workflow holding the given prompt."""
    return WorkflowRunContext(
        workflow_title="test",
        workflow_id="w_test",
        workflow_permanent_id="wpid_test",
        workflow_run_id="wr_test",
        aws_client=MagicMock(),
        workflow=_make_workflow(workflow_system_prompt),
        inherited_workflow_system_prompt=inherited_workflow_system_prompt,
    )
class TestTaskBlockSystemPromptInheritance:
    def test_block_inherits_workflow_prompt_when_none(self) -> None:
        run_ctx = _make_workflow_run_context("Never guess. If unsure, say UNKNOWN.")
        task = _make_task_block(workflow_system_prompt=None)
        task.format_potential_template_parameters(run_ctx)
        assert task.workflow_system_prompt == "Never guess. If unsure, say UNKNOWN."

    def test_both_none_stays_none(self) -> None:
        run_ctx = _make_workflow_run_context(workflow_system_prompt=None)
        task = _make_task_block(workflow_system_prompt=None)
        task.format_potential_template_parameters(run_ctx)
        assert task.workflow_system_prompt is None

    def test_jinja_substitution_resolves_workflow_parameters(self) -> None:
        """The workflow-level prompt supports Jinja substitution against workflow parameters."""
        run_ctx = _make_workflow_run_context("Respond in the style of {{ style }}.")
        run_ctx.values["style"] = "a formal English butler"
        task = _make_task_block(workflow_system_prompt=None)
        task.format_potential_template_parameters(run_ctx)
        assert task.workflow_system_prompt == "Respond in the style of a formal English butler."
class TestTaskV2BlockSystemPromptInheritance:
    def test_block_inherits_workflow_prompt_when_none(self) -> None:
        run_ctx = _make_workflow_run_context("Never guess.")
        v2_block = _make_task_v2_block(workflow_system_prompt=None)
        v2_block.format_potential_template_parameters(run_ctx)
        assert v2_block.workflow_system_prompt == "Never guess."

    def test_both_none_stays_none(self) -> None:
        run_ctx = _make_workflow_run_context(workflow_system_prompt=None)
        v2_block = _make_task_v2_block(workflow_system_prompt=None)
        v2_block.format_potential_template_parameters(run_ctx)
        assert v2_block.workflow_system_prompt is None
def _make_text_prompt_block(workflow_system_prompt: str | None = None) -> TextPromptBlock:
    """``TextPromptBlock`` fixture with an optional pre-set block-level system prompt."""
    return TextPromptBlock(
        output_parameter=_make_output_parameter(),
        label="prompt1",
        prompt="what is 2 + 2?",
        workflow_system_prompt=workflow_system_prompt,
    )
def _make_file_parser_block(workflow_system_prompt: str | None = None) -> FileParserBlock:
    """``FileParserBlock`` fixture with an optional pre-set block-level system prompt."""
    return FileParserBlock(
        output_parameter=_make_output_parameter(),
        label="fileparser1",
        file_url="https://example.com/file.csv",
        file_type=FileType.CSV,
        workflow_system_prompt=workflow_system_prompt,
    )
def _make_pdf_parser_block(workflow_system_prompt: str | None = None) -> PDFParserBlock:
    """``PDFParserBlock`` fixture with an optional pre-set block-level system prompt."""
    return PDFParserBlock(
        output_parameter=_make_output_parameter(),
        label="pdfparser1",
        file_url="https://example.com/file.pdf",
        workflow_system_prompt=workflow_system_prompt,
    )
class TestTextPromptBlockSystemPromptInheritance:
    def test_block_inherits_workflow_prompt_when_none(self) -> None:
        run_ctx = _make_workflow_run_context("Answer only in Spanish.")
        text_block = _make_text_prompt_block(workflow_system_prompt=None)
        text_block.format_potential_template_parameters(run_ctx)
        assert text_block.workflow_system_prompt == "Answer only in Spanish."

    def test_both_none_stays_none(self) -> None:
        run_ctx = _make_workflow_run_context(workflow_system_prompt=None)
        text_block = _make_text_prompt_block(workflow_system_prompt=None)
        text_block.format_potential_template_parameters(run_ctx)
        assert text_block.workflow_system_prompt is None

    def test_jinja_substitution_resolves_workflow_parameters(self) -> None:
        run_ctx = _make_workflow_run_context("Respond in the style of {{ style }}.")
        run_ctx.values["style"] = "a pirate"
        text_block = _make_text_prompt_block(workflow_system_prompt=None)
        text_block.format_potential_template_parameters(run_ctx)
        assert text_block.workflow_system_prompt == "Respond in the style of a pirate."
class TestFileParserBlockSystemPromptInheritance:
    def test_block_inherits_workflow_prompt_when_none(self) -> None:
        run_ctx = _make_workflow_run_context("Only respond with structured data.")
        parser_block = _make_file_parser_block(workflow_system_prompt=None)
        parser_block.format_potential_template_parameters(run_ctx)
        assert parser_block.workflow_system_prompt == "Only respond with structured data."

    def test_both_none_stays_none(self) -> None:
        run_ctx = _make_workflow_run_context(workflow_system_prompt=None)
        parser_block = _make_file_parser_block(workflow_system_prompt=None)
        parser_block.format_potential_template_parameters(run_ctx)
        assert parser_block.workflow_system_prompt is None
class TestPDFParserBlockSystemPromptInheritance:
    def test_block_inherits_workflow_prompt_when_none(self) -> None:
        run_ctx = _make_workflow_run_context("Summarize in English.")
        pdf_block = _make_pdf_parser_block(workflow_system_prompt=None)
        pdf_block.format_potential_template_parameters(run_ctx)
        assert pdf_block.workflow_system_prompt == "Summarize in English."

    def test_both_none_stays_none(self) -> None:
        run_ctx = _make_workflow_run_context(workflow_system_prompt=None)
        pdf_block = _make_pdf_parser_block(workflow_system_prompt=None)
        pdf_block.format_potential_template_parameters(run_ctx)
        assert pdf_block.workflow_system_prompt is None
class TestChildWorkflowInheritsParentWorkflowSystemPrompt:
    """SKY-9147: a parent workflow_trigger's workflow_system_prompt must flow into child blocks."""

    def test_child_inherits_parent_when_child_unset(self) -> None:
        run_ctx = _make_workflow_run_context(
            workflow_system_prompt=None,
            inherited_workflow_system_prompt="Omit the word 'not'.",
        )
        task = _make_task_block(workflow_system_prompt=None)
        task.format_potential_template_parameters(run_ctx)
        assert task.workflow_system_prompt == "Omit the word 'not'."

    def test_child_concatenates_parent_and_own_prompt(self) -> None:
        run_ctx = _make_workflow_run_context(
            workflow_system_prompt="Respond in French.",
            inherited_workflow_system_prompt="Omit the word 'not'.",
        )
        task = _make_task_block(workflow_system_prompt=None)
        task.format_potential_template_parameters(run_ctx)
        assert task.workflow_system_prompt == "Omit the word 'not'.\n\nRespond in French."

    def test_ignore_flag_drops_both_inherited_and_own(self) -> None:
        run_ctx = _make_workflow_run_context(
            workflow_system_prompt="Respond in French.",
            inherited_workflow_system_prompt="Omit the word 'not'.",
        )
        task = _make_task_block(workflow_system_prompt=None)
        task.ignore_workflow_system_prompt = True
        task.format_potential_template_parameters(run_ctx)
        assert task.workflow_system_prompt is None

    def test_inherited_only_with_no_workflow_attached(self) -> None:
        """A child context may carry inherited rules even when its workflow is not hydrated."""
        run_ctx = WorkflowRunContext(
            workflow_title="child",
            workflow_id="w_child",
            workflow_permanent_id="wpid_child",
            workflow_run_id="wr_child",
            aws_client=MagicMock(),
            workflow=None,
            inherited_workflow_system_prompt="Be concise.",
        )
        text_block = _make_text_prompt_block(workflow_system_prompt=None)
        text_block.format_potential_template_parameters(run_ctx)
        assert text_block.workflow_system_prompt == "Be concise."
class TestIgnoreWorkflowSystemPromptPerBlock:
    """SKY-9147: the per-block opt-out short-circuits inheritance across every LLM-consuming block."""

    def _assert_opt_out_yields_none(self, block, workflow_prompt: str) -> None:
        # Shared driver: mark ``block`` opted out and confirm nothing is inherited.
        block.ignore_workflow_system_prompt = True
        run_ctx = _make_workflow_run_context(workflow_prompt)
        block.format_potential_template_parameters(run_ctx)
        assert block.workflow_system_prompt is None

    def test_task_block_opt_out_skips_workflow_prompt(self) -> None:
        self._assert_opt_out_yields_none(_make_task_block(workflow_system_prompt=None), "Be concise.")

    def test_task_v2_block_opt_out_skips_workflow_prompt(self) -> None:
        self._assert_opt_out_yields_none(_make_task_v2_block(workflow_system_prompt=None), "Be concise.")

    def test_text_prompt_block_opt_out_skips_workflow_prompt(self) -> None:
        self._assert_opt_out_yields_none(
            _make_text_prompt_block(workflow_system_prompt=None), "Answer only in Spanish."
        )

    def test_file_parser_block_opt_out_skips_workflow_prompt(self) -> None:
        self._assert_opt_out_yields_none(
            _make_file_parser_block(workflow_system_prompt=None), "Only respond with structured data."
        )

    def test_pdf_parser_block_opt_out_skips_workflow_prompt(self) -> None:
        self._assert_opt_out_yields_none(
            _make_pdf_parser_block(workflow_system_prompt=None), "Summarize in English."
        )

    def test_opt_out_default_false_preserves_inheritance(self) -> None:
        """Regression guard: omitting the field leaves default inheritance behavior intact."""
        task = _make_task_block(workflow_system_prompt=None)
        assert task.ignore_workflow_system_prompt is False
        run_ctx = _make_workflow_run_context("Be concise.")
        task.format_potential_template_parameters(run_ctx)
        assert task.workflow_system_prompt == "Be concise."
class TestWorkflowTriggerPersistsOptOutOnChildRun:
    """SKY-9147: the trigger-block flag is persisted on the spawned child's
    workflow_run row so both sync and async (Temporal-dispatched) child
    executions honor it uniformly when they read their own row.
    """

    @staticmethod
    def _build_run(**overrides):
        """Build a minimal child ``WorkflowRun``; ``overrides`` lets a test set
        ``ignore_inherited_workflow_system_prompt`` (or omit it to exercise the
        model default). Deduplicates the fixture both tests previously repeated."""
        # Function-scope import preserved from the original tests.
        from skyvern.forge.sdk.workflow.models.workflow import WorkflowRun, WorkflowRunStatus

        now = datetime.now(timezone.utc)
        return WorkflowRun(
            workflow_run_id="wr_child",
            workflow_id="w_child",
            workflow_permanent_id="wpid_child",
            organization_id="o_test",
            status=WorkflowRunStatus.created,
            created_at=now,
            modified_at=now,
            **overrides,
        )

    def test_workflow_run_has_skip_inherited_field(self) -> None:
        """The WorkflowRun Pydantic model carries the persisted flag."""
        run = self._build_run(ignore_inherited_workflow_system_prompt=True)
        assert run.ignore_inherited_workflow_system_prompt is True

    def test_workflow_run_defaults_false(self) -> None:
        """Existing code paths that omit the field continue to inherit."""
        run = self._build_run()
        assert run.ignore_inherited_workflow_system_prompt is False
class TestWorkflowDefinitionYAMLRoundTrip:
    def test_workflow_system_prompt_survives_yaml_roundtrip(self) -> None:
        """Regression guard: workflow_system_prompt must roundtrip through WorkflowCreateYAMLRequest."""
        from skyvern.schemas.workflows import WorkflowCreateYAMLRequest

        definition_payload = {
            "version": 1,
            "parameters": [],
            "blocks": [],
            "workflow_system_prompt": "Never guess.",
        }
        request = WorkflowCreateYAMLRequest.model_validate(
            {"title": "test", "workflow_definition": definition_payload}
        )
        assert request.workflow_definition.workflow_system_prompt == "Never guess."
class TestBlockWorkflowSystemPromptNotSerialized:
    """The runtime cache must not leak through ``model_dump`` or JSON
    serialization — it's a per-run transient, not part of the block's
    authored shape."""

    def test_unset_field_absent_from_model_dump(self) -> None:
        task = _make_task_block(workflow_system_prompt=None)
        assert "workflow_system_prompt" not in task.model_dump()

    def test_resolved_runtime_value_absent_from_model_dump(self) -> None:
        task = _make_task_block(workflow_system_prompt=None)
        run_ctx = _make_workflow_run_context("Never guess.")
        task.format_potential_template_parameters(run_ctx)
        # Sanity check: inheritance populated the runtime cache…
        assert task.workflow_system_prompt == "Never guess."
        # …yet neither dict dumping nor JSON serialization exposes it.
        assert "workflow_system_prompt" not in task.model_dump()
        assert "Never guess." not in task.model_dump_json()
class TestBlockWorkflowSystemPromptRecordedOnContext:
    """``Block._apply_workflow_system_prompt`` records its decision on the
    ``WorkflowRunContext`` so both the agent path (which uses the block's own
    ``workflow_system_prompt`` field) and the script path (which reads the
    context cache via ``ai_extract``) see the same value — a single source of
    truth for the opt-out (SKY-9147)."""

    def test_non_opted_out_block_records_resolved_value(self) -> None:
        run_ctx = _make_workflow_run_context("Answer only in Spanish.")
        task = _make_task_block(workflow_system_prompt=None)
        task.format_potential_template_parameters(run_ctx)
        was_recorded, recorded_prompt = run_ctx.get_block_workflow_system_prompt(task.label)
        assert was_recorded is True
        assert recorded_prompt == "Answer only in Spanish."

    def test_opted_out_block_records_none(self) -> None:
        run_ctx = _make_workflow_run_context("WORKFLOW RULES.")
        opted_out = TaskBlock(
            label="task1",
            title="task title",
            output_parameter=_make_output_parameter(),
            ignore_workflow_system_prompt=True,
        )
        opted_out.format_potential_template_parameters(run_ctx)
        # The opt-out is recorded explicitly so ``ai_extract`` reads ``None``
        # rather than falling through to ``resolve_effective_workflow_system_prompt``.
        was_recorded, recorded_prompt = run_ctx.get_block_workflow_system_prompt(opted_out.label)
        assert was_recorded is True
        assert recorded_prompt is None

    def test_unknown_label_returns_not_recorded(self) -> None:
        run_ctx = _make_workflow_run_context("whatever")
        was_recorded, recorded_prompt = run_ctx.get_block_workflow_system_prompt("never-seen")
        assert was_recorded is False
        assert recorded_prompt is None
class TestResolveEffectiveWorkflowSystemPromptRejectsNonString:
    """``resolve_effective_workflow_system_prompt`` must treat a non-string
    ``workflow_system_prompt`` as absent rather than passing it to Jinja.
    Regression: a malformed workflow definition (or a test fixture whose
    attribute access returns a ``MagicMock``) previously flowed a non-str into
    ``env.from_string`` and exploded with ``Can't compile non template nodes``
    from deep inside the template compiler."""

    def test_magicmock_workflow_definition_yields_none(self) -> None:
        run_ctx = _make_workflow_run_context(workflow_system_prompt=None)
        # Attribute access on a MagicMock returns another truthy MagicMock,
        # mirroring what ``get_workflow_by_permanent_id`` returns in tests
        # that don't set up a real Workflow.
        run_ctx.set_workflow(MagicMock())
        assert run_ctx.resolve_effective_workflow_system_prompt() is None

    def test_non_string_inherited_prompt_yields_none(self) -> None:
        run_ctx = WorkflowRunContext(
            workflow_title="test",
            workflow_id="w_test",
            workflow_permanent_id="wpid_test",
            workflow_run_id="wr_test",
            aws_client=MagicMock(),
            inherited_workflow_system_prompt=MagicMock(),  # non-str
        )
        assert run_ctx.resolve_effective_workflow_system_prompt() is None

View file

@ -0,0 +1,270 @@
"""Unit tests for ``WorkflowService._collect_inherited_workflow_system_prompt``.
The helper walks the parent workflow-run chain via
``app.DATABASE.workflow_runs.get_workflow_run`` + ``WorkflowService.get_workflow``
and joins each ancestor's raw ``workflow_system_prompt`` outermost-first. These
tests pin the chain-break-on-opt-out, cycle detection, outermost-first ordering,
and depth-cap semantics described in the helper's docstring (SKY-9147).
"""
from __future__ import annotations
from datetime import datetime, timezone
from unittest.mock import AsyncMock
import pytest
from skyvern.forge import app
from skyvern.forge.sdk.workflow.models.block import WorkflowTriggerBlock
from skyvern.forge.sdk.workflow.models.workflow import (
Workflow,
WorkflowDefinition,
WorkflowRun,
WorkflowRunStatus,
)
from skyvern.forge.sdk.workflow.service import WorkflowService
def _make_workflow(workflow_id: str, workflow_system_prompt: str | None) -> Workflow:
    """Ancestor ``Workflow`` fixture keyed by ``workflow_id``, carrying the raw prompt."""
    timestamp = datetime.now(timezone.utc)
    definition = WorkflowDefinition(
        parameters=[],
        blocks=[],
        workflow_system_prompt=workflow_system_prompt,
    )
    return Workflow(
        workflow_id=workflow_id,
        organization_id="o_test",
        title=f"workflow {workflow_id}",
        workflow_permanent_id=f"wpid_{workflow_id}",
        version=1,
        is_saved_task=False,
        workflow_definition=definition,
        created_at=timestamp,
        modified_at=timestamp,
    )
def _make_run(
    *,
    run_id: str,
    workflow_id: str,
    parent_run_id: str | None,
    skip_inherited: bool = False,
) -> WorkflowRun:
    """``WorkflowRun`` fixture; ``skip_inherited`` maps to the persisted opt-out flag."""
    timestamp = datetime.now(timezone.utc)
    return WorkflowRun(
        workflow_run_id=run_id,
        workflow_id=workflow_id,
        workflow_permanent_id=f"wpid_{workflow_id}",
        organization_id="o_test",
        status=WorkflowRunStatus.running,
        parent_workflow_run_id=parent_run_id,
        ignore_inherited_workflow_system_prompt=skip_inherited,
        created_at=timestamp,
        modified_at=timestamp,
    )
def _install_chain(
    monkeypatch: pytest.MonkeyPatch,
    runs: dict[str, WorkflowRun],
    workflows: dict[str, Workflow],
) -> tuple[AsyncMock, AsyncMock]:
    """Wire mocked ``get_workflow_run`` + ``get_workflow`` against the provided
    dicts. Returns both mocks so tests can assert call counts."""
    run_mock = AsyncMock(side_effect=lambda run_id: runs.get(run_id))
    workflow_mock = AsyncMock(side_effect=lambda workflow_id: workflows.get(workflow_id))
    monkeypatch.setattr(app.DATABASE.workflow_runs, "get_workflow_run", run_mock)
    monkeypatch.setattr(WorkflowService, "get_workflow", workflow_mock)
    return run_mock, workflow_mock
@pytest.mark.asyncio
async def test_returns_none_when_parent_run_id_is_none(monkeypatch: pytest.MonkeyPatch) -> None:
    """No parent run at all — there is nothing to inherit."""
    _install_chain(monkeypatch, runs={}, workflows={})
    collected = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id=None)
    assert collected is None
@pytest.mark.asyncio
async def test_returns_none_when_no_ancestor_has_prompt(monkeypatch: pytest.MonkeyPatch) -> None:
    """Chain exists but every ancestor's ``workflow_system_prompt`` is None/empty."""
    workflows = {
        "w_parent": _make_workflow("w_parent", workflow_system_prompt=None),
        "w_grandparent": _make_workflow("w_grandparent", workflow_system_prompt=""),
    }
    runs = {
        "wr_parent": _make_run(run_id="wr_parent", workflow_id="w_parent", parent_run_id="wr_grandparent"),
        "wr_grandparent": _make_run(run_id="wr_grandparent", workflow_id="w_grandparent", parent_run_id=None),
    }
    _install_chain(monkeypatch, runs, workflows)
    collected = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_parent")
    assert collected is None
@pytest.mark.asyncio
async def test_single_ancestor_with_prompt(monkeypatch: pytest.MonkeyPatch) -> None:
    """A single ancestor's prompt comes back verbatim — no joining involved."""
    _install_chain(
        monkeypatch,
        runs={"wr_parent": _make_run(run_id="wr_parent", workflow_id="w_parent", parent_run_id=None)},
        workflows={"w_parent": _make_workflow("w_parent", workflow_system_prompt="Respond in English.")},
    )
    collected = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_parent")
    assert collected == "Respond in English."
@pytest.mark.asyncio
async def test_outermost_first_ordering(monkeypatch: pytest.MonkeyPatch) -> None:
    """Chain: great-grandparent -> grandparent -> parent -> (child). The helper
    walks up bottom-up starting from the parent; the returned string must join
    outermost-first (great-grandparent before grandparent before parent)."""
    runs = {
        "wr_parent": _make_run(run_id="wr_parent", workflow_id="w_parent", parent_run_id="wr_grand"),
        "wr_grand": _make_run(run_id="wr_grand", workflow_id="w_grand", parent_run_id="wr_great"),
        "wr_great": _make_run(run_id="wr_great", workflow_id="w_great", parent_run_id=None),
    }
    workflows = {
        workflow_id: _make_workflow(workflow_id, workflow_system_prompt=prompt)
        for workflow_id, prompt in [
            ("w_parent", "PARENT rule."),
            ("w_grand", "GRAND rule."),
            ("w_great", "GREAT rule."),
        ]
    }
    _install_chain(monkeypatch, runs, workflows)
    collected = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_parent")
    assert collected == "GREAT rule.\n\nGRAND rule.\n\nPARENT rule."
@pytest.mark.asyncio
async def test_chain_break_includes_opted_out_ancestors_own_prompt(monkeypatch: pytest.MonkeyPatch) -> None:
    """When an ancestor has ``ignore_inherited_workflow_system_prompt=True``, its
    own prompt is still collected but traversal stops — the grandparent's rules
    must NOT appear in the result (SKY-9147 design: an opted-out workflow rejects
    its parents' rules but still propagates its own to descendants)."""
    runs = {
        "wr_parent": _make_run(
            run_id="wr_parent",
            workflow_id="w_parent",
            parent_run_id="wr_grand",
            skip_inherited=True,
        ),
        "wr_grand": _make_run(run_id="wr_grand", workflow_id="w_grand", parent_run_id=None),
    }
    workflows = {
        "w_parent": _make_workflow("w_parent", workflow_system_prompt="PARENT rule."),
        "w_grand": _make_workflow("w_grand", workflow_system_prompt="GRAND rule."),
    }
    _install_chain(monkeypatch, runs, workflows)
    collected = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_parent")
    assert collected == "PARENT rule."
@pytest.mark.asyncio
async def test_chain_break_opted_out_ancestor_with_no_own_prompt(monkeypatch: pytest.MonkeyPatch) -> None:
    """When the opted-out ancestor has no prompt of its own and its ancestors do,
    the result is None — traversal still stops at the opted-out boundary."""
    runs = {
        "wr_parent": _make_run(
            run_id="wr_parent",
            workflow_id="w_parent",
            parent_run_id="wr_grand",
            skip_inherited=True,
        ),
        "wr_grand": _make_run(run_id="wr_grand", workflow_id="w_grand", parent_run_id=None),
    }
    workflows = {
        "w_parent": _make_workflow("w_parent", workflow_system_prompt=None),
        "w_grand": _make_workflow("w_grand", workflow_system_prompt="GRAND rule."),
    }
    _install_chain(monkeypatch, runs, workflows)
    collected = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_parent")
    assert collected is None
@pytest.mark.asyncio
async def test_cycle_detection_breaks_loop(monkeypatch: pytest.MonkeyPatch) -> None:
    """A parent chain that loops back on itself must terminate, collecting each
    unique ancestor's prompt exactly once."""
    looped_runs = {
        "wr_a": _make_run(run_id="wr_a", workflow_id="w_a", parent_run_id="wr_b"),
        "wr_b": _make_run(run_id="wr_b", workflow_id="w_b", parent_run_id="wr_a"),
    }
    looped_workflows = {
        wid: _make_workflow(wid, workflow_system_prompt=rule)
        for wid, rule in (("w_a", "A rule."), ("w_b", "B rule."))
    }
    get_run, _ = _install_chain(monkeypatch, looped_runs, looped_workflows)
    result = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_a")
    assert result == "B rule.\n\nA rule."
    # Two unique runs -> exactly two fetches; a third would mean we looped.
    assert get_run.await_count == 2
@pytest.mark.asyncio
async def test_missing_parent_run_breaks_chain(monkeypatch: pytest.MonkeyPatch) -> None:
    """A None from ``get_workflow_run`` mid-walk (soft-deleted / race) ends the
    traversal cleanly; whatever was already collected is returned."""
    dangling_parent = _make_run(run_id="wr_parent", workflow_id="w_parent", parent_run_id="wr_missing")
    _install_chain(
        monkeypatch,
        {"wr_parent": dangling_parent},
        {"w_parent": _make_workflow("w_parent", workflow_system_prompt="PARENT rule.")},
    )
    result = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_parent")
    assert result == "PARENT rule."
@pytest.mark.asyncio
async def test_missing_parent_workflow_skips_that_ancestor(monkeypatch: pytest.MonkeyPatch) -> None:
    """A deleted workflow definition (``get_workflow`` -> None) is skipped, not
    fatal: the walk keeps climbing and still collects deeper prompts."""
    _install_chain(
        monkeypatch,
        {
            "wr_parent": _make_run(run_id="wr_parent", workflow_id="w_parent_missing", parent_run_id="wr_grand"),
            "wr_grand": _make_run(run_id="wr_grand", workflow_id="w_grand", parent_run_id=None),
        },
        {"w_grand": _make_workflow("w_grand", workflow_system_prompt="GRAND rule.")},
    )
    result = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_parent")
    assert result == "GRAND rule."
@pytest.mark.asyncio
async def test_depth_cap_bounds_traversal(monkeypatch: pytest.MonkeyPatch) -> None:
    """A chain deeper than ``MAX_TRIGGER_DEPTH`` is truncated at the cap: only
    the first cap ancestors are fetched and joined; deeper ones are dropped."""
    cap = WorkflowTriggerBlock.MAX_TRIGGER_DEPTH
    total = cap + 5
    runs: dict[str, WorkflowRun] = {}
    workflows: dict[str, Workflow] = {}
    for level in range(total):
        next_parent = f"wr_{level + 1}" if level + 1 < total else None
        runs[f"wr_{level}"] = _make_run(run_id=f"wr_{level}", workflow_id=f"w_{level}", parent_run_id=next_parent)
        workflows[f"w_{level}"] = _make_workflow(f"w_{level}", workflow_system_prompt=f"rule {level}.")
    get_run, _ = _install_chain(monkeypatch, runs, workflows)
    result = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_0")
    # Fetch count proves the cap bounded the walk — no hang on malformed chains.
    assert get_run.await_count == cap
    assert result is not None
    # Outermost-first join of "rule (cap-1)." down to "rule 0.".
    assert result == "\n\n".join(f"rule {i}." for i in reversed(range(cap)))

View file

@ -18,7 +18,7 @@ def _run_create_extract_action(monkeypatch, extracted_information_schema):
captured.update(kwargs)
return original_load_prompt(template_name, **kwargs)
async def fake_handler(*, prompt, step, prompt_name):
async def fake_handler(*, prompt, step, prompt_name, **_ignored):
captured["prompt"] = prompt
return {"summary": "ok"}

View file

@ -0,0 +1,493 @@
"""Tests for downloaded_files migration to short artifact URLs (SKY-8861)."""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from skyvern.forge.sdk.artifact.manager import ArtifactManager
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType
from skyvern.forge.sdk.artifact.storage.s3 import S3Storage
@pytest.mark.asyncio
async def test_create_download_artifact_is_idempotent_per_run_and_uri():
    """A repeat save (e.g. inside a loop) must return the existing artifact_id so
    downstream URL-based dedup (``loop_download_filter``) keeps seeing a stable URL.
    """
    manager = ArtifactManager()
    # An already-persisted DOWNLOAD row for this exact (run, uri) pair.
    existing = Artifact(
        artifact_id="a_existing",
        artifact_type=ArtifactType.DOWNLOAD,
        uri="s3://skyvern-uploads/downloads/local/o_1/wr_1/file.pdf",
        organization_id="o_1",
        run_id="wr_1",
        workflow_run_id="wr_1",
        created_at="2026-04-23T00:00:00Z",
        modified_at="2026-04-23T00:00:00Z",
    )
    # Lookup finds the row; the insert mock must then never be awaited.
    find_existing = AsyncMock(return_value=existing)
    mock_db_create = AsyncMock()
    with (
        patch(
            "skyvern.forge.sdk.artifact.manager.app.DATABASE.artifacts.find_download_artifact",
            find_existing,
        ),
        patch(
            "skyvern.forge.sdk.artifact.manager.app.DATABASE.artifacts.create_artifact",
            mock_db_create,
        ),
    ):
        artifact_id = await manager.create_download_artifact(
            organization_id="o_1",
            run_id="wr_1",
            workflow_run_id="wr_1",
            uri=existing.uri,
            filename="file.pdf",
        )
    # The pre-existing id is returned and no second row is inserted.
    assert artifact_id == "a_existing"
    mock_db_create.assert_not_awaited()
@pytest.mark.asyncio
async def test_create_download_artifact_inserts_row_without_uploading():
    """create_download_artifact only writes a DB row; bytes are already in S3."""
    manager = ArtifactManager()
    mock_db_create = AsyncMock(
        return_value=Artifact(
            artifact_id="a_abc123",
            artifact_type=ArtifactType.DOWNLOAD,
            uri="s3://skyvern-uploads/download/prod/o_1/wr_1/file.pdf",
            organization_id="o_1",
            run_id="wr_1",
            workflow_run_id="wr_1",
            created_at="2026-04-23T00:00:00Z",
            modified_at="2026-04-23T00:00:00Z",
        )
    )
    # Shared upload mock patched over both storage entry points — any await is a regression.
    mock_store = AsyncMock()
    find_existing = AsyncMock(return_value=None)
    with (
        patch(
            "skyvern.forge.sdk.artifact.manager.app.DATABASE.artifacts.find_download_artifact",
            find_existing,
        ),
        patch("skyvern.forge.sdk.artifact.manager.app.DATABASE.artifacts.create_artifact", mock_db_create),
        patch("skyvern.forge.sdk.artifact.manager.app.STORAGE.store_artifact", mock_store),
        patch("skyvern.forge.sdk.artifact.manager.app.STORAGE.store_artifact_from_path", mock_store),
    ):
        artifact_id = await manager.create_download_artifact(
            organization_id="o_1",
            run_id="wr_1",
            workflow_run_id="wr_1",
            uri="s3://skyvern-uploads/download/prod/o_1/wr_1/file.pdf",
            filename="file.pdf",
        )
    # Exactly one row inserted carrying the caller's metadata; no upload performed.
    assert artifact_id.startswith("a_")
    mock_db_create.assert_awaited_once()
    _, kwargs = mock_db_create.call_args
    assert kwargs["artifact_type"] == ArtifactType.DOWNLOAD
    assert kwargs["uri"] == "s3://skyvern-uploads/download/prod/o_1/wr_1/file.pdf"
    assert kwargs["organization_id"] == "o_1"
    assert kwargs["run_id"] == "wr_1"
    assert kwargs["workflow_run_id"] == "wr_1"
    mock_store.assert_not_awaited()
@pytest.mark.asyncio
async def test_save_downloaded_files_registers_artifact_per_file(tmp_path):
    """After uploading each file to S3, save_downloaded_files should create an
    Artifact row so later retrieval can build short /v1/artifacts URLs."""
    # Two real files on disk stand in for a run's download directory.
    download_dir = tmp_path / "downloads"
    download_dir.mkdir()
    (download_dir / "invoice.pdf").write_bytes(b"%PDF-1.4 ...")
    (download_dir / "report.csv").write_bytes(b"a,b,c\n1,2,3\n")
    storage = S3Storage()
    storage.async_client = MagicMock()
    storage.async_client.upload_file_from_path = AsyncMock()
    mock_create_download = AsyncMock(return_value="a_new")
    mock_artifact_manager = MagicMock()
    mock_artifact_manager.create_download_artifact = mock_create_download
    with (
        patch("skyvern.forge.sdk.artifact.storage.s3.get_download_dir", return_value=str(download_dir)),
        patch.object(storage, "_get_storage_class_for_org", new=AsyncMock(return_value=MagicMock())),
        patch("skyvern.forge.sdk.artifact.storage.s3.calculate_sha256_for_file", return_value="sha-xyz"),
        patch("skyvern.forge.sdk.artifact.storage.s3.app") as app_module,
    ):
        app_module.ARTIFACT_MANAGER = mock_artifact_manager
        await storage.save_downloaded_files(organization_id="o_1", run_id="wr_1")
    # One artifact registration per file, both under the run's S3 prefix.
    assert mock_create_download.await_count == 2
    uris = {call.kwargs["uri"] for call in mock_create_download.await_args_list}
    filenames = {call.kwargs["filename"] for call in mock_create_download.await_args_list}
    assert filenames == {"invoice.pdf", "report.csv"}
    assert all(u.startswith("s3://") and "/downloads/" in u and "/o_1/wr_1/" in u for u in uris)
    for call in mock_create_download.await_args_list:
        assert call.kwargs["organization_id"] == "o_1"
        assert call.kwargs["run_id"] == "wr_1"
def _make_artifact(
    artifact_id: str,
    uri: str,
    run_id: str = "wr_1",
    *,
    checksum: str | None = None,
    created_at: str = "2026-04-23T00:00:00Z",
) -> Artifact:
    """Build a DOWNLOAD Artifact row for tests; ``workflow_run_id`` mirrors
    *run_id* only when it looks like a workflow run (``wr_`` prefix)."""
    linked_workflow_run = run_id if run_id.startswith("wr_") else None
    return Artifact(
        artifact_id=artifact_id,
        artifact_type=ArtifactType.DOWNLOAD,
        uri=uri,
        organization_id="o_1",
        run_id=run_id,
        workflow_run_id=linked_workflow_run,
        checksum=checksum,
        created_at=created_at,
        modified_at=created_at,
    )
# Minimal HMAC keyring config: one key "k1" whose secret is an all-zero 64-hex-char string.
_DUMMY_KEYRING_JSON = '{"current_kid": "k1", "keys": {"k1": {"secret": "0000000000000000000000000000000000000000000000000000000000000000"}}}'
@pytest.fixture
def keyring_configured():
    """Simulate cloud-style config: HMAC keyring is set so the artifact URL branch is active.
    Unit tests default to no keyring to match the OSS default, so tests that exercise the
    short-URL path must opt in."""
    from skyvern.config import settings
    # patch.object restores the original setting when the fixture finalizes.
    with patch.object(settings, "ARTIFACT_CONTENT_HMAC_KEYRING", _DUMMY_KEYRING_JSON):
        yield
@pytest.mark.asyncio
async def test_get_downloaded_files_uses_artifact_urls_when_rows_exist(keyring_configured):
    """When DOWNLOAD artifact rows exist, retrieval skips S3 entirely:
    URL, checksum, filename, modified_at all come straight from the row."""
    storage = S3Storage()
    storage.async_client = MagicMock()
    storage.async_client.list_files = AsyncMock()  # must NOT be called
    storage.async_client.get_file_metadata = AsyncMock()  # must NOT be called
    storage.async_client.create_presigned_urls = AsyncMock()  # must NOT be called
    artifact = _make_artifact(
        "a_42",
        "s3://skyvern-uploads/downloads/local/o_1/wr_1/invoice.pdf",
        checksum="sha-from-db",
    )
    mock_list = AsyncMock(return_value=[artifact])
    build_url = MagicMock(return_value="https://api.skyvern.com/v1/artifacts/a_42/content?expiry=x&kid=y&sig=z")
    with patch("skyvern.forge.sdk.artifact.storage.base.app") as base_app:
        with patch("skyvern.forge.sdk.artifact.storage.s3.app") as s3_app:
            s3_app.DATABASE.artifacts.list_artifacts_for_run_by_type = mock_list
            base_app.ARTIFACT_MANAGER.build_signed_content_url = build_url
            result = await storage.get_downloaded_files(organization_id="o_1", run_id="wr_1")
    # Every FileInfo field must come from the DB row — zero S3 traffic.
    assert len(result) == 1
    assert result[0].url.startswith("https://api.skyvern.com/v1/artifacts/a_42/content")
    assert result[0].filename == "invoice.pdf"
    assert result[0].checksum == "sha-from-db"
    assert result[0].modified_at is not None
    storage.async_client.list_files.assert_not_awaited()
    storage.async_client.get_file_metadata.assert_not_awaited()
    storage.async_client.create_presigned_urls.assert_not_awaited()
    mock_list.assert_awaited_once_with(run_id="wr_1", organization_id="o_1", artifact_type=ArtifactType.DOWNLOAD)
@pytest.mark.asyncio
async def test_get_downloaded_files_preserves_artifact_row_order(keyring_configured):
    """Artifact rows are returned ASC by created_at; FileInfo list must follow the
    same order (matches save order, drives loop_download_filter signatures)."""
    storage = S3Storage()
    storage.async_client = MagicMock()
    first = _make_artifact(
        "a_1",
        "s3://skyvern-uploads/downloads/local/o_1/wr_1/first.pdf",
        created_at="2026-04-23T00:00:00Z",
    )
    second = _make_artifact(
        "a_2",
        "s3://skyvern-uploads/downloads/local/o_1/wr_1/second.pdf",
        created_at="2026-04-23T00:01:00Z",
    )
    mock_list = AsyncMock(return_value=[first, second])
    # The signed URL embeds the artifact id, so ordering is observable downstream.
    build_url = MagicMock(
        side_effect=lambda artifact_id, **_: f"https://api.skyvern.com/v1/artifacts/{artifact_id}/content"
    )
    with patch("skyvern.forge.sdk.artifact.storage.base.app") as base_app:
        with patch("skyvern.forge.sdk.artifact.storage.s3.app") as s3_app:
            s3_app.DATABASE.artifacts.list_artifacts_for_run_by_type = mock_list
            base_app.ARTIFACT_MANAGER.build_signed_content_url = build_url
            result = await storage.get_downloaded_files(organization_id="o_1", run_id="wr_1")
    assert [fi.filename for fi in result] == ["first.pdf", "second.pdf"]
@pytest.mark.asyncio
async def test_get_downloaded_files_falls_back_to_presigned_for_legacy_runs(keyring_configured):
    """Production-cloud legacy run: keyring IS configured, but the run pre-dates SKY-8861
    so no artifact rows exist. Files in S3 must still surface as presigned URLs — the
    whole point of keeping the fallback path."""
    storage = S3Storage()
    storage.async_client = MagicMock()
    s3_key = "downloads/local/o_1/wr_old/legacy.pdf"
    storage.async_client.list_files = AsyncMock(return_value=[s3_key])
    storage.async_client.get_file_metadata = AsyncMock(
        return_value={"sha256_checksum": "sha-old", "original_filename": "legacy.pdf"}
    )
    storage.async_client.create_presigned_urls = AsyncMock(
        return_value=["https://skyvern-uploads.s3.amazonaws.com/...?sig=old"]
    )
    mock_list = AsyncMock(return_value=[])  # no artifact rows for this legacy run
    build_url = MagicMock()  # must NOT be called
    with patch("skyvern.forge.sdk.artifact.storage.base.app") as base_app:
        with patch("skyvern.forge.sdk.artifact.storage.s3.app") as s3_app:
            s3_app.DATABASE.artifacts.list_artifacts_for_run_by_type = mock_list
            base_app.ARTIFACT_MANAGER.build_signed_content_url = build_url
            result = await storage.get_downloaded_files(organization_id="o_1", run_id="wr_old")
    # Legacy path: S3 listing + presigned URL; the short-URL builder stays untouched.
    assert len(result) == 1
    assert result[0].filename == "legacy.pdf"
    assert result[0].checksum == "sha-old"
    assert "s3.amazonaws.com" in result[0].url  # nosemgrep: incomplete-url-substring-sanitization
    build_url.assert_not_called()
    storage.async_client.list_files.assert_awaited_once()
    storage.async_client.create_presigned_urls.assert_awaited_once()
@pytest.mark.asyncio
async def test_get_downloaded_files_falls_back_to_presigned_when_keyring_unset():
    """Self-hosted OSS deployments without ARTIFACT_CONTENT_HMAC_KEYRING must keep
    serving presigned S3 URLs — the short Skyvern URL would be unsigned and the
    content endpoint would 401 without an API key."""
    # NOTE: the previously-declared ``tmp_path`` fixture parameter was unused
    # (no filesystem access in this test) and has been removed.
    from skyvern.config import settings
    storage = S3Storage()
    storage.async_client = MagicMock()
    s3_key = "downloads/local/o_1/wr_1/invoice.pdf"
    storage.async_client.list_files = AsyncMock(return_value=[s3_key])
    storage.async_client.get_file_metadata = AsyncMock(
        return_value={"sha256_checksum": "sha-abc", "original_filename": "invoice.pdf"}
    )
    storage.async_client.create_presigned_urls = AsyncMock(
        return_value=["https://skyvern-uploads.s3.amazonaws.com/...?sig=fallback"]
    )
    # Artifact rows DO exist here — only the missing keyring forces the fallback.
    artifact = _make_artifact("a_42", f"s3://skyvern-uploads/{s3_key}")
    mock_list = AsyncMock(return_value=[artifact])
    build_url = MagicMock()
    with (
        patch("skyvern.forge.sdk.artifact.storage.s3.app") as app_module,
        patch.object(settings, "ARTIFACT_CONTENT_HMAC_KEYRING", None),
    ):
        app_module.DATABASE.artifacts.list_artifacts_for_run_by_type = mock_list
        app_module.ARTIFACT_MANAGER.build_signed_content_url = build_url
        result = await storage.get_downloaded_files(organization_id="o_1", run_id="wr_1")
    assert len(result) == 1
    assert "s3.amazonaws.com" in result[0].url  # nosemgrep: incomplete-url-substring-sanitization
    build_url.assert_not_called()
@pytest.mark.asyncio
async def test_get_downloaded_files_artifact_lookup_failure_falls_back_to_listing(keyring_configured):
    """If the DB lookup raises (transient outage), retrieval must not 500 the
    run-output API — fall through to the legacy S3-listing path so files
    still surface as presigned URLs."""
    storage = S3Storage()
    storage.async_client = MagicMock()
    s3_key = "downloads/local/o_1/wr_1/recoverable.pdf"
    storage.async_client.list_files = AsyncMock(return_value=[s3_key])
    storage.async_client.get_file_metadata = AsyncMock(
        return_value={"sha256_checksum": "sha-recover", "original_filename": "recoverable.pdf"}
    )
    storage.async_client.create_presigned_urls = AsyncMock(
        return_value=["https://skyvern-uploads.s3.amazonaws.com/...?sig=fallback"]
    )
    # The DB lookup blows up; retrieval must swallow it and use S3 listing instead.
    mock_list = AsyncMock(side_effect=RuntimeError("DB unreachable"))
    with patch("skyvern.forge.sdk.artifact.storage.s3.app") as app_module:
        app_module.DATABASE.artifacts.list_artifacts_for_run_by_type = mock_list
        result = await storage.get_downloaded_files(organization_id="o_1", run_id="wr_1")
    assert len(result) == 1
    assert "s3.amazonaws.com" in result[0].url  # nosemgrep: incomplete-url-substring-sanitization
    storage.async_client.list_files.assert_awaited_once()
@pytest.mark.asyncio
async def test_content_endpoint_download_returns_attachment_with_filename():
    """DOWNLOAD artifacts serve as attachments (never inline) so browsers don't
    render PDFs in-page — inline rendering would defeat the SKY-8862
    XSS-via-PDF mitigation."""
    from skyvern.forge.sdk.routes.agent_protocol import _artifact_response_config
    download_row = _make_artifact("a_dl", "s3://skyvern-uploads/downloads/local/o_1/wr_1/invoice.pdf")
    media_type, disposition = _artifact_response_config(download_row)
    assert media_type == "application/octet-stream"
    assert disposition.startswith("attachment;")
    assert 'filename="invoice.pdf"' in disposition
def test_content_endpoint_non_download_stays_inline():
    """Non-DOWNLOAD artifact types keep the pre-existing inline disposition."""
    from skyvern.forge.sdk.routes.agent_protocol import _artifact_response_config
    final_screenshot = Artifact(
        artifact_id="a_ss",
        artifact_type=ArtifactType.SCREENSHOT_FINAL,
        uri="s3://skyvern-artifacts/.../final.png",
        organization_id="o_1",
        created_at="2026-04-23T00:00:00Z",
        modified_at="2026-04-23T00:00:00Z",
    )
    media_type, disposition = _artifact_response_config(final_screenshot)
    assert (media_type, disposition) == ("image/png", "inline")
def test_content_endpoint_download_non_ascii_filename_does_not_crash_header_encoding():
    """Starlette encodes response headers as Latin-1; Unicode filenames must be
    emitted via RFC 5987 (``filename*=UTF-8''...``) with an ASCII fallback so
    the endpoint does not 500."""
    from starlette.responses import Response
    from skyvern.forge.sdk.routes.agent_protocol import _artifact_response_config
    unicode_artifact = _make_artifact("a_unicode", "s3://skyvern-uploads/downloads/local/o_1/wr_1/文档.pdf")
    media_type, disposition = _artifact_response_config(unicode_artifact)
    # Constructing the Response raises if the header contains non-Latin-1 bytes.
    Response(content=b"x", media_type=media_type, headers={"Content-Disposition": disposition})
    assert "filename*=UTF-8''" in disposition
    percent_encoded = "%E6%96%87%E6%A1%A3.pdf"
    assert percent_encoded in disposition or percent_encoded.lower() in disposition
def test_sanitize_header_filename_strips_crlf_and_quotes_directly():
    """Exercise _sanitize_header_filename on its own so the guarantee holds even
    when urlparse isn't part of the chain (defense in depth)."""
    from skyvern.forge.sdk.routes.agent_protocol import _sanitize_header_filename
    cases = {
        'evil"pdf': "evilpdf",
        "hello\r\nworld.pdf": "helloworld.pdf",
        "back\\slash.pdf": "backslash.pdf",
        "": "download",
    }
    for raw, expected in cases.items():
        assert _sanitize_header_filename(raw) == expected
def test_content_endpoint_download_filename_preserves_question_and_hash():
    """'?' and '#' are legal in S3 keys; a naive urlparse would truncate at them."""
    from skyvern.forge.sdk.routes.agent_protocol import _artifact_response_config
    tricky = _make_artifact("a_q", "s3://skyvern-uploads/downloads/local/o_1/wr_1/report?v=2#a.pdf")
    _, disposition = _artifact_response_config(tricky)
    acceptable_forms = ("report%3Fv%3D2%23a.pdf", "report?v=2#a.pdf")
    assert any(form in disposition for form in acceptable_forms)
def test_sanitize_header_filename_strips_bidi_and_format_characters():
    """Bidi overrides and zero-width/format chars enable filename spoofing in the
    browser's download UI (``invoice\\u202efdp.exe`` renders as ``invoice.exe.pdf``)."""
    from skyvern.forge.sdk.routes.agent_protocol import _sanitize_header_filename
    for bad_char, sample in (
        ("\u202e", "invoice\u202efdp.exe"),  # right-to-left override
        ("\u200b", "stealth\u200b.pdf"),  # zero-width space
        ("\ufeff", "bom\ufeff.pdf"),  # zero-width no-break space / BOM
    ):
        assert bad_char not in _sanitize_header_filename(sample)
def test_ascii_fallback_filename_preserves_stem_for_pure_unicode_names():
    """NFKD-stripping a pure non-ASCII stem (CJK, emoji) must not leave a bare
    hidden ``.pdf`` dotfile; the fallback substitutes a ``download`` stem."""
    from skyvern.forge.sdk.routes.agent_protocol import _ascii_fallback_filename
    for pure_unicode_name in ("文档.pdf", "🎉.pdf"):
        assert _ascii_fallback_filename(pure_unicode_name) == "download.pdf"
    # Accented Latin transliterates, so the original stem survives.
    assert _ascii_fallback_filename("fïlè.pdf") == "file.pdf"
def test_sanitize_header_filename_strips_control_characters():
    """NUL/DEL/escape/C1 bytes are valid Latin-1 but illegal per RFC 7230 header syntax."""
    from skyvern.forge.sdk.routes.agent_protocol import _sanitize_header_filename
    for ctrl in ("\x00", "\x7f", "\x1b", "\x80"):
        assert ctrl not in _sanitize_header_filename(f"evil{ctrl}.pdf")
@pytest.mark.asyncio
async def test_content_endpoint_sets_nosniff_header_end_to_end():
    """Hit the real route through a FastAPI TestClient and verify the response
    actually carries X-Content-Type-Options: nosniff on the DOWNLOAD path.
    Defence-in-depth for SKY-8862: prevents a refactor from silently dropping
    the header without this suite noticing."""
    import json
    from fastapi import FastAPI
    from fastapi.testclient import TestClient
    from skyvern.config import settings
    from skyvern.forge.sdk.artifact.signing import sign_artifact_url
    from skyvern.forge.sdk.routes.routers import base_router
    artifact = _make_artifact("a_e2e", "s3://skyvern-uploads/downloads/local/o_1/wr_1/report.pdf")
    keyring_json = json.dumps({"current_kid": "k1", "keys": {"k1": {"secret": "0" * 64, "created_at": "2026-04-23"}}})
    with (
        patch.object(settings, "ARTIFACT_CONTENT_HMAC_KEYRING", keyring_json),
        patch.object(settings, "SKYVERN_BASE_URL", "http://testserver"),
        patch("skyvern.forge.sdk.routes.agent_protocol.app") as app_module,
    ):
        app_module.DATABASE.artifacts.get_artifact_by_id_no_org = AsyncMock(return_value=artifact)
        app_module.ARTIFACT_MANAGER.retrieve_artifact = AsyncMock(return_value=b"%PDF-1.4 fake body")
        from skyvern.forge.sdk.artifact.signing import parse_keyring
        # Sign a real URL against the same keyring the route will verify with.
        signed_url = sign_artifact_url(
            base_url="http://testserver",
            artifact_id=artifact.artifact_id,
            keyring=parse_keyring(keyring_json),
            artifact_name="report.pdf",
            artifact_type="download",
        )
        test_app = FastAPI()
        test_app.include_router(base_router, prefix="/v1")
        client = TestClient(test_app)
        # TestClient wants a path-relative URL, so strip the scheme+host.
        resp = client.get(signed_url.replace("http://testserver", ""))
    assert resp.status_code == 200, resp.text
    assert resp.headers.get("X-Content-Type-Options") == "nosniff"
    assert resp.headers.get("Content-Disposition", "").startswith("attachment;")
    assert resp.content == b"%PDF-1.4 fake body"
def test_content_endpoint_download_filename_strips_header_injection():
    """URI-derived filenames feed straight into a Content-Disposition header;
    CR/LF and raw quotes must be removed to block header injection."""
    from skyvern.forge.sdk.routes.agent_protocol import _artifact_response_config
    hostile = _make_artifact(
        "a_bad",
        's3://skyvern-uploads/downloads/local/o_1/wr_1/evil"\r\nSet-Cookie: x=y.pdf',
    )
    _, disposition = _artifact_response_config(hostile)
    for forbidden in ("\r", "\n"):
        assert forbidden not in disposition
    # Only the quote pair wrapping the filename value may survive.
    assert disposition.count('"') == 2

View file

@ -196,7 +196,7 @@ def _capture_ai_extract_kwargs(monkeypatch, include_extracted_text: bool):
async def fake_refresh(*_args, **_kwargs):
return None
async def fake_handler(*, prompt, step, screenshots, prompt_name, force_dict):
async def fake_handler(*, prompt, step, screenshots, prompt_name, force_dict, **_ignored):
return {}
monkeypatch.setattr(module, "load_prompt_with_elements_tracked", fake_load_prompt_with_elements_tracked)
@ -258,7 +258,7 @@ def _capture_ai_extract_kwargs_with_schema(monkeypatch, schema):
async def fake_refresh(*_args, **_kwargs):
return None
async def fake_handler(*, prompt, step, screenshots, prompt_name, force_dict):
async def fake_handler(*, prompt, step, screenshots, prompt_name, force_dict, **_ignored):
return {}
monkeypatch.setattr(module, "load_prompt_with_elements_tracked", fake_load_prompt_with_elements_tracked)

View file

@ -625,3 +625,40 @@ class TestSanitizeWorkflowYamlWithReferences:
goal = result["workflow_definition"]["blocks"][0]["navigation_goal"]
assert "{{ block_1 }}" in goal
assert "{{ block_1_output }}" in goal
def test_sanitize_updates_output_references_in_workflow_system_prompt(self) -> None:
    """Output references inside the workflow-level workflow_system_prompt must
    be rewritten when the referenced block label is sanitized. The global
    prompt is resolved through Jinja at execution time, so its references
    need the same renaming treatment as block-level fields."""
    workflow_yaml = {
        "title": "Test Workflow",
        "workflow_definition": {
            "parameters": [],
            "blocks": [{"label": "my-block", "block_type": "task"}],
            "workflow_system_prompt": "Honor {{ my-block_output }} for every downstream block.",
        },
    }
    result = sanitize_workflow_yaml_with_references(workflow_yaml)
    # Label "my-block" -> "my_block"; the prompt's Jinja reference must track it.
    assert result["workflow_definition"]["blocks"][0]["label"] == "my_block"
    assert "{{ my_block_output }}" in result["workflow_definition"]["workflow_system_prompt"]
def test_sanitize_parameter_key_updates_jinja_references_in_workflow_system_prompt(self) -> None:
    """Parameter-key references inside the workflow-level workflow_system_prompt
    must be rewritten when the parameter key is sanitized."""
    workflow_yaml = {
        "title": "Test Workflow",
        "workflow_definition": {
            "parameters": [
                {
                    "key": "user-input",
                    "parameter_type": "workflow",
                }
            ],
            "blocks": [],
            "workflow_system_prompt": "Always respond in the style of {{ user-input }}.",
        },
    }
    result = sanitize_workflow_yaml_with_references(workflow_yaml)
    # Key "user-input" -> "user_input"; the prompt's Jinja reference must track it.
    assert result["workflow_definition"]["parameters"][0]["key"] == "user_input"
    assert "{{ user_input }}" in result["workflow_definition"]["workflow_system_prompt"]

View file

@ -0,0 +1,387 @@
"""Regression tests: workflow-level workflow_system_prompt must reach the LLM handler.
Complements ``test_block_workflow_system_prompt_inheritance.py`` (which asserts
that the workflow prompt flows onto the block model). These tests assert the
next hop — that each block/function that makes an LLM call actually forwards
``system_prompt`` as a kwarg to the handler. Without this, inheritance passes
quietly but the prompt has no effect on output (the bug the user hit on the
extraction path).
"""
from __future__ import annotations
import asyncio
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
from skyvern.forge.sdk.cache import extraction_cache
from skyvern.forge.sdk.workflow.models.block import FileParserBlock, PDFParserBlock, TextPromptBlock
from skyvern.forge.sdk.workflow.models.parameter import OutputParameter, ParameterType
from skyvern.schemas.workflows import FileType
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _make_output_parameter() -> OutputParameter:
    """Minimal OUTPUT parameter row that blocks under test can attach to."""
    timestamp = datetime.now(timezone.utc)
    return OutputParameter(
        parameter_type=ParameterType.OUTPUT,
        key="task1_output",
        description="test output",
        output_parameter_id="op_task1",
        workflow_id="w_test",
        created_at=timestamp,
        modified_at=timestamp,
    )
def _make_scraped_page():
refreshed = MagicMock()
refreshed.extracted_text = "page text"
refreshed.url = "https://example.test"
refreshed.screenshots = []
refreshed.build_element_tree = MagicMock(return_value="<a>link</a>")
refreshed.support_economy_elements_tree = MagicMock(return_value=False)
refreshed.last_used_element_tree_html = None
scraped_page = MagicMock()
scraped_page.refresh = AsyncMock(return_value=refreshed)
scraped_page.screenshots = []
return scraped_page
def _make_task(*, system_prompt: str | None) -> MagicMock:
task = MagicMock()
task.navigation_goal = None
task.navigation_payload = None
task.extracted_information = None
task.extracted_information_schema = {"type": "object"}
task.data_extraction_goal = "Extract documents"
task.error_code_mapping = None
task.llm_key = None
task.workflow_run_id = "wfr_sysprompt"
task.task_id = "tsk_sysprompt"
task.workflow_permanent_id = "wpid_sysprompt"
task.organization_id = "o_sysprompt"
task.include_extracted_text = True
task.workflow_system_prompt = system_prompt
return task
# ---------------------------------------------------------------------------
# extract-information handler (the OP's exact failure path)
# ---------------------------------------------------------------------------
def test_extract_information_forwards_system_prompt(monkeypatch) -> None:
    """Regression for the OP's bug: an ExtractionBlock whose Task carries a
    ``system_prompt`` (inherited from workflow.workflow_system_prompt) must
    forward it to the extract-information LLM call."""
    from skyvern.webeye.actions import handler
    extraction_cache._reset_for_tests()
    captured: dict = {}
    # Stand-in LLM handler: records every kwarg it is called with.
    async def fake_llm(**kwargs):
        captured.update(kwargs)
        return {"quotes": "SHOUTED QUOTE"}
    monkeypatch.setattr(
        handler,
        "load_prompt_with_elements_tracked",
        lambda **kwargs: ("rendered-prompt", dict(kwargs)),
    )
    monkeypatch.setattr(handler, "ensure_context", lambda: MagicMock(tz_info=None))
    monkeypatch.setattr(handler.service_utils, "is_cua_task", AsyncMock(return_value=False))
    monkeypatch.setattr(
        handler.LLMAPIHandlerFactory,
        "get_override_llm_api_handler",
        lambda llm_key, default: fake_llm,
    )
    # Disable every extraction-cache branch so the LLM call always happens.
    monkeypatch.setattr(
        handler.app.AGENT_FUNCTION,
        "should_shadow_extraction_cache_hit",
        AsyncMock(return_value=False),
    )
    monkeypatch.setattr(
        handler.app.AGENT_FUNCTION,
        "lookup_cross_run_extraction_cache",
        AsyncMock(return_value=None),
    )
    monkeypatch.setattr(
        handler.app.AGENT_FUNCTION,
        "store_cross_run_extraction_cache",
        AsyncMock(return_value=None),
    )
    task = _make_task(system_prompt="Respond only in uppercase.")
    step = MagicMock(step_id="stp_sp", retry_index=0)
    scraped_page = _make_scraped_page()
    asyncio.run(handler.extract_information_for_navigation_goal(task=task, step=step, scraped_page=scraped_page))
    # The inherited workflow prompt must reach the LLM handler verbatim.
    assert captured.get("system_prompt") == "Respond only in uppercase."
    extraction_cache._reset_for_tests()
def test_extract_information_passes_none_system_prompt_when_task_has_none(monkeypatch) -> None:
    """No workflow_system_prompt → task.workflow_system_prompt is None → handler receives None.
    Locks in that we don't accidentally invent a default or drop the kwarg."""
    from skyvern.webeye.actions import handler
    extraction_cache._reset_for_tests()
    captured: dict = {}
    # Stand-in LLM handler: records every kwarg it is called with.
    async def fake_llm(**kwargs):
        captured.update(kwargs)
        return {"quotes": "x"}
    monkeypatch.setattr(
        handler,
        "load_prompt_with_elements_tracked",
        lambda **kwargs: ("rendered-prompt", dict(kwargs)),
    )
    monkeypatch.setattr(handler, "ensure_context", lambda: MagicMock(tz_info=None))
    monkeypatch.setattr(handler.service_utils, "is_cua_task", AsyncMock(return_value=False))
    monkeypatch.setattr(
        handler.LLMAPIHandlerFactory,
        "get_override_llm_api_handler",
        lambda llm_key, default: fake_llm,
    )
    # Disable every extraction-cache branch so the LLM call always happens.
    monkeypatch.setattr(
        handler.app.AGENT_FUNCTION,
        "should_shadow_extraction_cache_hit",
        AsyncMock(return_value=False),
    )
    monkeypatch.setattr(
        handler.app.AGENT_FUNCTION,
        "lookup_cross_run_extraction_cache",
        AsyncMock(return_value=None),
    )
    monkeypatch.setattr(
        handler.app.AGENT_FUNCTION,
        "store_cross_run_extraction_cache",
        AsyncMock(return_value=None),
    )
    task = _make_task(system_prompt=None)
    step = MagicMock(step_id="stp_none", retry_index=0)
    scraped_page = _make_scraped_page()
    asyncio.run(handler.extract_information_for_navigation_goal(task=task, step=step, scraped_page=scraped_page))
    # The kwarg must be present AND explicitly None — neither defaulted nor dropped.
    assert "system_prompt" in captured
    assert captured["system_prompt"] is None
    extraction_cache._reset_for_tests()
# ---------------------------------------------------------------------------
# data-extraction-summary (agent.py path)
# ---------------------------------------------------------------------------
def test_data_extraction_summary_forwards_system_prompt(monkeypatch) -> None:
    """The agent-path data-extraction summary must forward the task's
    system_prompt to the extraction LLM handler."""
    from skyvern.forge import agent as agent_module

    recorded: dict = {}

    async def capturing_handler(**kwargs):
        recorded.update(kwargs)
        return {"summary": "ok"}

    monkeypatch.setattr(agent_module.app, "EXTRACTION_LLM_API_HANDLER", capturing_handler)
    monkeypatch.setattr(
        agent_module.skyvern_context,
        "ensure_context",
        lambda: MagicMock(tz_info=None, workflow_run_id="wr_sp"),
    )
    # No cache key and no cache hit → the handler stub must be invoked.
    monkeypatch.setattr(agent_module.extraction_cache, "compute_cache_key", lambda **_: None)
    monkeypatch.setattr(agent_module.extraction_cache, "lookup", lambda *a, **k: None)

    step = MagicMock(step_id="stp_sp", order=0)
    page = MagicMock(url="https://example.test")
    asyncio.run(
        agent_module.ForgeAgent.create_extract_action(
            task=_make_task(system_prompt="Answer in French."),
            step=step,
            scraped_page=page,
        )
    )

    assert recorded.get("system_prompt") == "Answer in French."
# ---------------------------------------------------------------------------
# Extraction cache key — system_prompt must be part of the key
# ---------------------------------------------------------------------------
def test_extraction_cache_key_changes_with_system_prompt() -> None:
    """Calls that differ only in system_prompt must yield distinct digests;
    otherwise switching workflow_system_prompt mid-run could serve stale
    output cached under a prior prompt's key."""
    shared_kwargs: dict = dict(
        call_path="handler",
        element_tree="<a>link</a>",
        extracted_text="text",
        current_url="https://example.test",
        data_extraction_goal="Extract docs",
        extracted_information_schema={"type": "object"},
        navigation_payload=None,
        error_code_mapping=None,
        previous_extracted_information=None,
        llm_key=None,
    )

    def key_for(prompt):
        # Everything except the system prompt is held constant.
        return extraction_cache.compute_cache_key(**shared_kwargs, workflow_system_prompt=prompt)

    # Distinct prompts (including "no prompt") → distinct keys.
    assert key_for(None) != key_for("Answer in Spanish.")
    assert key_for("Answer in Spanish.") != key_for("Answer in French.")
    # Identical prompt → identical key (determinism).
    assert key_for("Answer in Spanish.") == key_for("Answer in Spanish.")
# ---------------------------------------------------------------------------
# TextPromptBlock.send_prompt
# ---------------------------------------------------------------------------
def test_text_prompt_block_forwards_system_prompt(monkeypatch) -> None:
    """TextPromptBlock.send_prompt must pass its workflow_system_prompt through
    to the LLM handler as ``system_prompt``."""
    from skyvern.forge.sdk.workflow.models import block as block_module

    observed: dict = {}

    async def capturing_llm(**kwargs):
        observed.update(kwargs)
        return {"llm_response": "hi"}

    async def noop_default_handler(*args, **kwargs):
        return None

    async def resolve_stub(self, workflow_run_id, organization_id):
        # The default handler is never used; the override below wins.
        return noop_default_handler

    monkeypatch.setattr(TextPromptBlock, "_resolve_default_llm_handler", resolve_stub)
    monkeypatch.setattr(
        block_module.LLMAPIHandlerFactory,
        "get_override_llm_api_handler",
        lambda llm_key, default: capturing_llm,
    )

    prompt_block = TextPromptBlock(
        label="p",
        output_parameter=_make_output_parameter(),
        prompt="What is the meaning of life?",
        workflow_system_prompt="Respond as Shakespeare.",
    )
    asyncio.run(
        prompt_block.send_prompt(
            prompt="What is the meaning of life?",
            parameter_values={},
            workflow_run_id="wfr_sp",
            organization_id="o_sp",
            workflow_run_block_id=None,
        )
    )

    assert observed.get("system_prompt") == "Respond as Shakespeare."
# ---------------------------------------------------------------------------
# FileParserBlock._extract_with_ai
# ---------------------------------------------------------------------------
def test_file_parser_block_forwards_system_prompt(monkeypatch) -> None:
    """FileParserBlock._extract_with_ai must pass its workflow_system_prompt
    through to the LLM handler as ``system_prompt``."""
    from skyvern.forge.sdk.workflow.models import block as block_module

    observed: dict = {}

    async def capturing_llm(**kwargs):
        observed.update(kwargs)
        return {"output": {}}

    monkeypatch.setattr(
        block_module.LLMAPIHandlerFactory,
        "get_override_llm_api_handler",
        lambda llm_key, default: capturing_llm,
    )

    parser_block = FileParserBlock(
        label="fp",
        output_parameter=_make_output_parameter(),
        file_url="https://example.com/file.csv",
        file_type=FileType.CSV,
        workflow_system_prompt="Parse as JSON only.",
    )
    asyncio.run(parser_block._extract_with_ai("hello,world\n1,2", workflow_run_context=MagicMock()))

    assert observed.get("system_prompt") == "Parse as JSON only."
# ---------------------------------------------------------------------------
# PDFParserBlock.execute — system_prompt forwarded to LLM call
# ---------------------------------------------------------------------------
def test_pdf_parser_block_forwards_system_prompt(monkeypatch) -> None:
    """PDFParserBlock.execute must forward its workflow_system_prompt to the
    LLM call as ``system_prompt``."""
    from skyvern.forge.sdk.workflow.models import block as block_module

    observed: dict = {}

    async def capturing_llm(**kwargs):
        observed.update(kwargs)
        return {"output": {}}

    # PDFParserBlock uses app.LLM_API_HANDLER directly, so patch it at app level.
    monkeypatch.setattr(block_module.app, "LLM_API_HANDLER", capturing_llm)
    # Skip real file download and PDF text extraction.
    monkeypatch.setattr(
        block_module,
        "download_file",
        AsyncMock(return_value="/tmp/file.pdf"),
    )
    monkeypatch.setattr(block_module, "extract_pdf_file", lambda *a, **k: "extracted text")

    # Minimal workflow-run context: no parameters, no workflow, no effective
    # workflow-level system prompt (the block's own prompt must win).
    run_context = MagicMock()
    run_context.has_parameter = MagicMock(return_value=False)
    run_context.has_value = MagicMock(return_value=False)
    run_context.workflow = None
    run_context.resolve_effective_workflow_system_prompt = MagicMock(return_value=None)
    monkeypatch.setattr(
        PDFParserBlock,
        "get_workflow_run_context",
        staticmethod(lambda workflow_run_id: run_context),
    )
    monkeypatch.setattr(
        PDFParserBlock,
        "record_output_parameter_value",
        AsyncMock(return_value=None),
    )
    monkeypatch.setattr(
        PDFParserBlock,
        "build_block_result",
        AsyncMock(return_value=MagicMock()),
    )

    pdf_block = PDFParserBlock(
        label="pdf",
        output_parameter=_make_output_parameter(),
        file_url="https://example.com/file.pdf",
        workflow_system_prompt="Summarize in one sentence.",
    )
    asyncio.run(
        pdf_block.execute(
            workflow_run_id="wfr_sp_pdf",
            workflow_run_block_id="wrb_sp_pdf",
            organization_id="o_sp_pdf",
        )
    )

    assert observed.get("system_prompt") == "Summarize in one sentence."