diff --git a/tests/unit/test_ai_extract_system_prompt_optout.py b/tests/unit/test_ai_extract_system_prompt_optout.py new file mode 100644 index 000000000..70ec83973 --- /dev/null +++ b/tests/unit/test_ai_extract_system_prompt_optout.py @@ -0,0 +1,263 @@ +"""Regression tests: script-path ``ai_extract`` must honor the current block's +``ignore_workflow_system_prompt`` opt-out, matching agent-path behavior. + +Without this, a cached script-path extraction still injects the workflow prompt +even when the block has opted out — the two execution modes diverge for the +same block (SKY-9147). +""" + +from __future__ import annotations + +import asyncio +from datetime import datetime, timezone +from typing import Any +from unittest.mock import MagicMock + +import pytest + +from skyvern.core.script_generations import real_skyvern_page_ai as module +from skyvern.core.script_generations.skyvern_page_ai import SYSTEM_PROMPT_UNSET +from skyvern.forge.sdk.workflow.models.block import ExtractionBlock +from skyvern.forge.sdk.workflow.models.parameter import OutputParameter, ParameterType +from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowDefinition + + +def _make_output_parameter() -> OutputParameter: + now = datetime.now(timezone.utc) + return OutputParameter( + parameter_type=ParameterType.OUTPUT, + key="extract1_output", + description="test output", + output_parameter_id="op_extract1", + workflow_id="w_test", + created_at=now, + modified_at=now, + ) + + +def _make_workflow_with_block(*, ignore_workflow_system_prompt: bool, workflow_system_prompt: str | None) -> Workflow: + block = ExtractionBlock( + label="extract1", + output_parameter=_make_output_parameter(), + data_extraction_goal="Extract things", + ignore_workflow_system_prompt=ignore_workflow_system_prompt, + ) + now = datetime.now(timezone.utc) + return Workflow( + workflow_id="w_test", + organization_id="o_test", + title="test", + workflow_permanent_id="wpid_test", + version=1, + 
is_saved_task=False, + workflow_definition=WorkflowDefinition( + parameters=[], + blocks=[block], + workflow_system_prompt=workflow_system_prompt, + ), + created_at=now, + modified_at=now, + ) + + +def _run_ai_extract( + monkeypatch: pytest.MonkeyPatch, + *, + workflow: Workflow | None, + current_label: str | None, + extra_kwargs: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Drive ``RealSkyvernPageAi.ai_extract`` with mocks that capture both the + cache-key ``system_prompt`` and the LLM-handler ``system_prompt``. Returns + a dict with keys ``cache_system_prompt`` and ``llm_system_prompt``.""" + + captured: dict[str, Any] = {} + + def fake_compute_cache_key(**kwargs: Any) -> str: + captured["cache_system_prompt"] = kwargs.get("workflow_system_prompt") + return "fake-cache-key" + + def fake_lookup(*_args: Any, **_kwargs: Any) -> None: + # Cache miss — let the flow fall through to the handler. + return None + + def fake_load_prompt(**_kwargs: Any) -> tuple[str, dict[str, Any]]: + return "rendered-prompt", { + "extracted_text": None, + "extracted_information_schema": None, + } + + async def fake_handler(*, system_prompt: Any = None, **_ignored: Any) -> dict[str, Any]: + captured["llm_system_prompt"] = system_prompt + return {} + + # Build a minimal fake ``WorkflowRunContext``. The single-block workflows + # this helper builds go through ``_apply_workflow_system_prompt`` in the + # real script-path dispatch, so simulate that by pre-recording the block's + # effective value — ``None`` when the block opts out, the rendered prompt + # otherwise. ``ai_extract`` reads this recorded value verbatim, which is + # exactly the contract Andrew's fix enforces. 
+ if workflow is not None: + block = workflow.workflow_definition.blocks[0] if workflow.workflow_definition.blocks else None + workflow_prompt = workflow.workflow_definition.workflow_system_prompt + recorded_value: str | None = ( + None if (block is not None and getattr(block, "ignore_workflow_system_prompt", False)) else workflow_prompt + ) + has_block_record = block is not None and current_label == block.label + ctx = MagicMock() + ctx.workflow = workflow + ctx.resolve_effective_workflow_system_prompt = MagicMock(return_value=workflow_prompt) + ctx.get_block_workflow_system_prompt = MagicMock( + return_value=(has_block_record, recorded_value), + ) + + def get_run_ctx(_run_id: str) -> Any: + return ctx + + monkeypatch.setattr( + module.app.WORKFLOW_CONTEXT_MANAGER, + "get_workflow_run_context", + get_run_ctx, + ) + + skyvern_ctx = MagicMock() + skyvern_ctx.workflow_run_id = "wr_test" + skyvern_ctx.tz_info = None + skyvern_ctx.organization_id = None + skyvern_ctx.task_id = None + skyvern_ctx.step_id = None + skyvern_ctx.script_mode = False + + monkeypatch.setattr(module.skyvern_context, "current", lambda: skyvern_ctx) + monkeypatch.setattr(module, "load_prompt_with_elements_tracked", fake_load_prompt) + monkeypatch.setattr(module.extraction_cache, "compute_cache_key", fake_compute_cache_key) + monkeypatch.setattr(module.extraction_cache, "lookup", fake_lookup) + monkeypatch.setattr(module.app, "EXTRACTION_LLM_API_HANDLER", fake_handler) + + scraped_page = MagicMock() + scraped_page.url = "https://example.test" + scraped_page.extracted_text = "page text" + scraped_page.screenshots = [] + scraped_page.build_element_tree = MagicMock(return_value="link") + scraped_page.support_economy_elements_tree = MagicMock(return_value=False) + scraped_page.last_used_element_tree_html = None + + page = module.RealSkyvernPageAi.__new__(module.RealSkyvernPageAi) + page.scraped_page = scraped_page + page.current_label = current_label + + async def fake_refresh(*_args: Any, **_kwargs: 
Any) -> None: + return None + + monkeypatch.setattr(page, "_refresh_scraped_page", fake_refresh) + + asyncio.run( + page.ai_extract( + prompt="Extract things", + schema={"type": "object"}, + **(extra_kwargs or {}), + ) + ) + + return captured + + +# --------------------------------------------------------------------------- +# The bug Andrew flagged: block opts out, but script-path still injects prompt. +# --------------------------------------------------------------------------- + + +def test_opted_out_block_sends_no_system_prompt_to_llm(monkeypatch: pytest.MonkeyPatch) -> None: + workflow = _make_workflow_with_block( + ignore_workflow_system_prompt=True, + workflow_system_prompt="WORKFLOW RULES.", + ) + captured = _run_ai_extract(monkeypatch, workflow=workflow, current_label="extract1") + + assert captured["llm_system_prompt"] is None + + +def test_opted_out_block_cache_key_omits_system_prompt(monkeypatch: pytest.MonkeyPatch) -> None: + """Cache-key parity with agent path: an opted-out block's extractions must + hash the same way whether the workflow has a prompt set or not. Otherwise a + user toggling ``workflow_system_prompt`` at the workflow level silently + invalidates caches for blocks that explicitly opted out.""" + workflow = _make_workflow_with_block( + ignore_workflow_system_prompt=True, + workflow_system_prompt="WORKFLOW RULES.", + ) + captured = _run_ai_extract(monkeypatch, workflow=workflow, current_label="extract1") + + assert captured["cache_system_prompt"] is None + + +# --------------------------------------------------------------------------- +# Non-opted-out block still inherits the workflow prompt (no regression of the +# base feature). 
+# --------------------------------------------------------------------------- + + +def test_non_opted_out_block_receives_workflow_prompt(monkeypatch: pytest.MonkeyPatch) -> None: + workflow = _make_workflow_with_block( + ignore_workflow_system_prompt=False, + workflow_system_prompt="WORKFLOW RULES.", + ) + captured = _run_ai_extract(monkeypatch, workflow=workflow, current_label="extract1") + + assert captured["llm_system_prompt"] == "WORKFLOW RULES." + assert captured["cache_system_prompt"] == "WORKFLOW RULES." + + +# --------------------------------------------------------------------------- +# Explicit parameter overrides: caller can pass ``system_prompt`` directly, +# bypassing the block-flag lookup. Includes the "explicit None" escape hatch. +# --------------------------------------------------------------------------- + + +def test_explicit_system_prompt_overrides_workflow(monkeypatch: pytest.MonkeyPatch) -> None: + workflow = _make_workflow_with_block( + ignore_workflow_system_prompt=False, + workflow_system_prompt="WORKFLOW RULES.", + ) + captured = _run_ai_extract( + monkeypatch, + workflow=workflow, + current_label="extract1", + extra_kwargs={"system_prompt": "EXPLICIT RULES."}, + ) + + assert captured["llm_system_prompt"] == "EXPLICIT RULES." + assert captured["cache_system_prompt"] == "EXPLICIT RULES." 
+ + +def test_explicit_none_opts_out_even_when_workflow_has_prompt(monkeypatch: pytest.MonkeyPatch) -> None: + """Explicit ``None`` is the escape hatch: caller says "send no system prompt" + regardless of the workflow or block state.""" + workflow = _make_workflow_with_block( + ignore_workflow_system_prompt=False, + workflow_system_prompt="WORKFLOW RULES.", + ) + captured = _run_ai_extract( + monkeypatch, + workflow=workflow, + current_label="extract1", + extra_kwargs={"system_prompt": None}, + ) + + assert captured["llm_system_prompt"] is None + assert captured["cache_system_prompt"] is None + + +def test_sentinel_default_is_not_leaked_to_handler(monkeypatch: pytest.MonkeyPatch) -> None: + """The sentinel must never reach the LLM handler — if the fallback path + silently forwarded it, the handler would see a non-None, non-string object + and fail or pass a bogus ``system_prompt`` to the model.""" + workflow = _make_workflow_with_block( + ignore_workflow_system_prompt=False, + workflow_system_prompt=None, # no prompt at all + ) + captured = _run_ai_extract(monkeypatch, workflow=workflow, current_label="extract1") + + assert captured["llm_system_prompt"] is None + assert captured["llm_system_prompt"] is not SYSTEM_PROMPT_UNSET + assert captured["cache_system_prompt"] is None diff --git a/tests/unit/test_block_workflow_system_prompt_inheritance.py b/tests/unit/test_block_workflow_system_prompt_inheritance.py new file mode 100644 index 000000000..ae6e12302 --- /dev/null +++ b/tests/unit/test_block_workflow_system_prompt_inheritance.py @@ -0,0 +1,489 @@ +"""Tests for workflow-level workflow_system_prompt inheritance into blocks at execution time.""" + +from datetime import datetime, timezone +from unittest.mock import MagicMock + +from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext +from skyvern.forge.sdk.workflow.models.block import ( + FileParserBlock, + PDFParserBlock, + TaskBlock, + TaskV2Block, + TextPromptBlock, +) +from 
skyvern.forge.sdk.workflow.models.parameter import OutputParameter, ParameterType +from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowDefinition +from skyvern.schemas.workflows import FileType + + +def _make_output_parameter() -> OutputParameter: + now = datetime.now(timezone.utc) + return OutputParameter( + parameter_type=ParameterType.OUTPUT, + key="task1_output", + description="test output", + output_parameter_id="op_task1", + workflow_id="w_test", + created_at=now, + modified_at=now, + ) + + +def _make_task_block(workflow_system_prompt: str | None = None) -> TaskBlock: + return TaskBlock( + label="task1", + output_parameter=_make_output_parameter(), + title="task title", + workflow_system_prompt=workflow_system_prompt, + ) + + +def _make_task_v2_block(workflow_system_prompt: str | None = None) -> TaskV2Block: + return TaskV2Block( + label="task1", + output_parameter=_make_output_parameter(), + prompt="user goal", + workflow_system_prompt=workflow_system_prompt, + ) + + +def _make_workflow(workflow_system_prompt: str | None) -> Workflow: + workflow_definition = WorkflowDefinition( + parameters=[], + blocks=[], + workflow_system_prompt=workflow_system_prompt, + ) + now = datetime.now(timezone.utc) + return Workflow( + workflow_id="w_test", + organization_id="o_test", + title="test", + workflow_permanent_id="wpid_test", + version=1, + is_saved_task=False, + workflow_definition=workflow_definition, + created_at=now, + modified_at=now, + ) + + +def _make_workflow_run_context( + workflow_system_prompt: str | None, + inherited_workflow_system_prompt: str | None = None, +) -> WorkflowRunContext: + ctx = WorkflowRunContext( + workflow_title="test", + workflow_id="w_test", + workflow_permanent_id="wpid_test", + workflow_run_id="wr_test", + aws_client=MagicMock(), + workflow=_make_workflow(workflow_system_prompt), + inherited_workflow_system_prompt=inherited_workflow_system_prompt, + ) + return ctx + + +class TestTaskBlockSystemPromptInheritance: + 
def test_block_inherits_workflow_prompt_when_none(self) -> None: + block = _make_task_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context("Never guess. If unsure, say UNKNOWN.") + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt == "Never guess. If unsure, say UNKNOWN." + + def test_both_none_stays_none(self) -> None: + block = _make_task_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context(workflow_system_prompt=None) + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt is None + + def test_jinja_substitution_resolves_workflow_parameters(self) -> None: + """Global system prompt should support Jinja substitution against workflow parameters.""" + block = _make_task_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context("Respond in the style of {{ style }}.") + ctx.values["style"] = "a formal English butler" + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt == "Respond in the style of a formal English butler." + + +class TestTaskV2BlockSystemPromptInheritance: + def test_block_inherits_workflow_prompt_when_none(self) -> None: + block = _make_task_v2_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context("Never guess.") + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt == "Never guess." 
+ + def test_both_none_stays_none(self) -> None: + block = _make_task_v2_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context(workflow_system_prompt=None) + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt is None + + +def _make_text_prompt_block(workflow_system_prompt: str | None = None) -> TextPromptBlock: + return TextPromptBlock( + label="prompt1", + output_parameter=_make_output_parameter(), + prompt="what is 2 + 2?", + workflow_system_prompt=workflow_system_prompt, + ) + + +def _make_file_parser_block(workflow_system_prompt: str | None = None) -> FileParserBlock: + return FileParserBlock( + label="fileparser1", + output_parameter=_make_output_parameter(), + file_url="https://example.com/file.csv", + file_type=FileType.CSV, + workflow_system_prompt=workflow_system_prompt, + ) + + +def _make_pdf_parser_block(workflow_system_prompt: str | None = None) -> PDFParserBlock: + return PDFParserBlock( + label="pdfparser1", + output_parameter=_make_output_parameter(), + file_url="https://example.com/file.pdf", + workflow_system_prompt=workflow_system_prompt, + ) + + +class TestTextPromptBlockSystemPromptInheritance: + def test_block_inherits_workflow_prompt_when_none(self) -> None: + block = _make_text_prompt_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context("Answer only in Spanish.") + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt == "Answer only in Spanish." 
+ + def test_both_none_stays_none(self) -> None: + block = _make_text_prompt_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context(workflow_system_prompt=None) + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt is None + + def test_jinja_substitution_resolves_workflow_parameters(self) -> None: + block = _make_text_prompt_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context("Respond in the style of {{ style }}.") + ctx.values["style"] = "a pirate" + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt == "Respond in the style of a pirate." + + +class TestFileParserBlockSystemPromptInheritance: + def test_block_inherits_workflow_prompt_when_none(self) -> None: + block = _make_file_parser_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context("Only respond with structured data.") + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt == "Only respond with structured data." + + def test_both_none_stays_none(self) -> None: + block = _make_file_parser_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context(workflow_system_prompt=None) + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt is None + + +class TestPDFParserBlockSystemPromptInheritance: + def test_block_inherits_workflow_prompt_when_none(self) -> None: + block = _make_pdf_parser_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context("Summarize in English.") + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt == "Summarize in English." 
+ + def test_both_none_stays_none(self) -> None: + block = _make_pdf_parser_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context(workflow_system_prompt=None) + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt is None + + +class TestChildWorkflowInheritsParentWorkflowSystemPrompt: + """SKY-9147: parent workflow_trigger workflow_system_prompt must flow into child blocks.""" + + def test_child_inherits_parent_when_child_unset(self) -> None: + block = _make_task_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context( + workflow_system_prompt=None, + inherited_workflow_system_prompt="Omit the word 'not'.", + ) + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt == "Omit the word 'not'." + + def test_child_concatenates_parent_and_own_prompt(self) -> None: + block = _make_task_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context( + workflow_system_prompt="Respond in French.", + inherited_workflow_system_prompt="Omit the word 'not'.", + ) + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt == "Omit the word 'not'.\n\nRespond in French." 
+ + def test_ignore_flag_drops_both_inherited_and_own(self) -> None: + block = _make_task_block(workflow_system_prompt=None) + block.ignore_workflow_system_prompt = True + ctx = _make_workflow_run_context( + workflow_system_prompt="Respond in French.", + inherited_workflow_system_prompt="Omit the word 'not'.", + ) + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt is None + + def test_inherited_only_with_no_workflow_attached(self) -> None: + """Child context may carry inherited rules even when workflow is not hydrated.""" + block = _make_text_prompt_block(workflow_system_prompt=None) + ctx = WorkflowRunContext( + workflow_title="child", + workflow_id="w_child", + workflow_permanent_id="wpid_child", + workflow_run_id="wr_child", + aws_client=MagicMock(), + workflow=None, + inherited_workflow_system_prompt="Be concise.", + ) + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt == "Be concise." + + +class TestIgnoreWorkflowSystemPromptPerBlock: + """SKY-9147: per-block opt-out short-circuits inheritance across every LLM-consuming block.""" + + def test_task_block_opt_out_skips_workflow_prompt(self) -> None: + block = _make_task_block(workflow_system_prompt=None) + block.ignore_workflow_system_prompt = True + ctx = _make_workflow_run_context("Be concise.") + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt is None + + def test_task_v2_block_opt_out_skips_workflow_prompt(self) -> None: + block = _make_task_v2_block(workflow_system_prompt=None) + block.ignore_workflow_system_prompt = True + ctx = _make_workflow_run_context("Be concise.") + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt is None + + def test_text_prompt_block_opt_out_skips_workflow_prompt(self) -> None: + block = _make_text_prompt_block(workflow_system_prompt=None) + block.ignore_workflow_system_prompt = True + ctx = 
_make_workflow_run_context("Answer only in Spanish.") + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt is None + + def test_file_parser_block_opt_out_skips_workflow_prompt(self) -> None: + block = _make_file_parser_block(workflow_system_prompt=None) + block.ignore_workflow_system_prompt = True + ctx = _make_workflow_run_context("Only respond with structured data.") + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt is None + + def test_pdf_parser_block_opt_out_skips_workflow_prompt(self) -> None: + block = _make_pdf_parser_block(workflow_system_prompt=None) + block.ignore_workflow_system_prompt = True + ctx = _make_workflow_run_context("Summarize in English.") + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt is None + + def test_opt_out_default_false_preserves_inheritance(self) -> None: + """Regression guard: omitting the field leaves default inheritance behavior intact.""" + block = _make_task_block(workflow_system_prompt=None) + assert block.ignore_workflow_system_prompt is False + ctx = _make_workflow_run_context("Be concise.") + + block.format_potential_template_parameters(ctx) + + assert block.workflow_system_prompt == "Be concise." + + +class TestWorkflowTriggerPersistsOptOutOnChildRun: + """SKY-9147: the trigger-block flag is persisted on the spawned child's + workflow_run row so both sync and async (Temporal-dispatched) child + executions honor it uniformly when they read their own row. 
+ """ + + def test_workflow_run_has_skip_inherited_field(self) -> None: + """WorkflowRun Pydantic model carries the persisted flag.""" + now = datetime.now(timezone.utc) + from skyvern.forge.sdk.workflow.models.workflow import WorkflowRun, WorkflowRunStatus + + run = WorkflowRun( + workflow_run_id="wr_child", + workflow_id="w_child", + workflow_permanent_id="wpid_child", + organization_id="o_test", + status=WorkflowRunStatus.created, + created_at=now, + modified_at=now, + ignore_inherited_workflow_system_prompt=True, + ) + + assert run.ignore_inherited_workflow_system_prompt is True + + def test_workflow_run_defaults_false(self) -> None: + """Existing code paths that omit the field continue to inherit.""" + now = datetime.now(timezone.utc) + from skyvern.forge.sdk.workflow.models.workflow import WorkflowRun, WorkflowRunStatus + + run = WorkflowRun( + workflow_run_id="wr_child", + workflow_id="w_child", + workflow_permanent_id="wpid_child", + organization_id="o_test", + status=WorkflowRunStatus.created, + created_at=now, + modified_at=now, + ) + + assert run.ignore_inherited_workflow_system_prompt is False + + +class TestWorkflowDefinitionYAMLRoundTrip: + def test_workflow_system_prompt_survives_yaml_roundtrip(self) -> None: + """Regression guard: workflow_system_prompt must roundtrip through WorkflowCreateYAMLRequest.""" + from skyvern.schemas.workflows import WorkflowCreateYAMLRequest + + payload = { + "title": "test", + "workflow_definition": { + "version": 1, + "parameters": [], + "blocks": [], + "workflow_system_prompt": "Never guess.", + }, + } + request = WorkflowCreateYAMLRequest.model_validate(payload) + assert request.workflow_definition.workflow_system_prompt == "Never guess." 
+ + +class TestBlockWorkflowSystemPromptNotSerialized: + """The runtime cache must not leak through ``model_dump`` or JSON + serialization — it's a per-run transient, not part of the block's + authored shape.""" + + def test_unset_field_absent_from_model_dump(self) -> None: + block = _make_task_block(workflow_system_prompt=None) + assert "workflow_system_prompt" not in block.model_dump() + + def test_resolved_runtime_value_absent_from_model_dump(self) -> None: + block = _make_task_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context("Never guess.") + block.format_potential_template_parameters(ctx) + + # Invariant confirmed at runtime: inheritance populated the cache… + assert block.workflow_system_prompt == "Never guess." + # …but it still doesn't escape via serialization. + dumped = block.model_dump() + assert "workflow_system_prompt" not in dumped + assert "Never guess." not in block.model_dump_json() + + +class TestBlockWorkflowSystemPromptRecordedOnContext: + """``Block._apply_workflow_system_prompt`` records its decision on the + ``WorkflowRunContext`` so both the agent path (which uses the block's + own ``workflow_system_prompt`` field) and the script path (which reads + from the context cache via ``ai_extract``) see the same value — single + source of truth for the opt-out (SKY-9147).""" + + def test_non_opted_out_block_records_resolved_value(self) -> None: + block = _make_task_block(workflow_system_prompt=None) + ctx = _make_workflow_run_context("Answer only in Spanish.") + + block.format_potential_template_parameters(ctx) + + recorded, value = ctx.get_block_workflow_system_prompt(block.label) + assert recorded is True + assert value == "Answer only in Spanish." 
+ + def test_opted_out_block_records_none(self) -> None: + block = TaskBlock( + label="task1", + output_parameter=_make_output_parameter(), + title="task title", + ignore_workflow_system_prompt=True, + ) + ctx = _make_workflow_run_context("WORKFLOW RULES.") + + block.format_potential_template_parameters(ctx) + + # Recorded explicitly so ``ai_extract`` reads ``None`` (opt-out) rather + # than falling through to ``resolve_effective_workflow_system_prompt``. + recorded, value = ctx.get_block_workflow_system_prompt(block.label) + assert recorded is True + assert value is None + + def test_unknown_label_returns_not_recorded(self) -> None: + ctx = _make_workflow_run_context("whatever") + recorded, value = ctx.get_block_workflow_system_prompt("never-seen") + assert recorded is False + assert value is None + + +class TestResolveEffectiveWorkflowSystemPromptRejectsNonString: + """``resolve_effective_workflow_system_prompt`` must treat a non-string + ``workflow_system_prompt`` as absent rather than passing it to Jinja. + + Regression: a malformed workflow definition (or a test fixture whose + attribute access returns a ``MagicMock``) previously flowed a non-str + into ``env.from_string`` and exploded with ``Can't compile non template + nodes`` from deep inside the template compiler.""" + + def test_magicmock_workflow_definition_yields_none(self) -> None: + ctx = _make_workflow_run_context(workflow_system_prompt=None) + mock_workflow = MagicMock() + # Attribute access on a MagicMock returns another truthy MagicMock, + # mirroring what ``get_workflow_by_permanent_id`` returns in tests + # that don't set up a real Workflow. 
+ ctx.set_workflow(mock_workflow) + + assert ctx.resolve_effective_workflow_system_prompt() is None + + def test_non_string_inherited_prompt_yields_none(self) -> None: + ctx = WorkflowRunContext( + workflow_title="test", + workflow_id="w_test", + workflow_permanent_id="wpid_test", + workflow_run_id="wr_test", + aws_client=MagicMock(), + inherited_workflow_system_prompt=MagicMock(), # non-str + ) + + assert ctx.resolve_effective_workflow_system_prompt() is None diff --git a/tests/unit/test_collect_inherited_workflow_system_prompt.py b/tests/unit/test_collect_inherited_workflow_system_prompt.py new file mode 100644 index 000000000..7d02a60ba --- /dev/null +++ b/tests/unit/test_collect_inherited_workflow_system_prompt.py @@ -0,0 +1,270 @@ +"""Unit tests for ``WorkflowService._collect_inherited_workflow_system_prompt``. + +The helper walks the parent workflow-run chain via +``app.DATABASE.workflow_runs.get_workflow_run`` + ``WorkflowService.get_workflow`` +and joins each ancestor's raw ``workflow_system_prompt`` outermost-first. These +tests pin the chain-break-on-opt-out, cycle detection, outermost-first ordering, +and depth-cap semantics described in the helper's docstring (SKY-9147). 
+""" + +from __future__ import annotations + +from datetime import datetime, timezone +from unittest.mock import AsyncMock + +import pytest + +from skyvern.forge import app +from skyvern.forge.sdk.workflow.models.block import WorkflowTriggerBlock +from skyvern.forge.sdk.workflow.models.workflow import ( + Workflow, + WorkflowDefinition, + WorkflowRun, + WorkflowRunStatus, +) +from skyvern.forge.sdk.workflow.service import WorkflowService + + +def _make_workflow(workflow_id: str, workflow_system_prompt: str | None) -> Workflow: + now = datetime.now(timezone.utc) + return Workflow( + workflow_id=workflow_id, + organization_id="o_test", + title=f"workflow {workflow_id}", + workflow_permanent_id=f"wpid_{workflow_id}", + version=1, + is_saved_task=False, + workflow_definition=WorkflowDefinition( + parameters=[], + blocks=[], + workflow_system_prompt=workflow_system_prompt, + ), + created_at=now, + modified_at=now, + ) + + +def _make_run( + *, + run_id: str, + workflow_id: str, + parent_run_id: str | None, + skip_inherited: bool = False, +) -> WorkflowRun: + now = datetime.now(timezone.utc) + return WorkflowRun( + workflow_run_id=run_id, + workflow_id=workflow_id, + workflow_permanent_id=f"wpid_{workflow_id}", + organization_id="o_test", + status=WorkflowRunStatus.running, + parent_workflow_run_id=parent_run_id, + ignore_inherited_workflow_system_prompt=skip_inherited, + created_at=now, + modified_at=now, + ) + + +def _install_chain( + monkeypatch: pytest.MonkeyPatch, + runs: dict[str, WorkflowRun], + workflows: dict[str, Workflow], +) -> tuple[AsyncMock, AsyncMock]: + """Wire mocked ``get_workflow_run`` + ``get_workflow`` against the provided + dicts. 
Returns both mocks so tests can assert call counts.""" + + get_run = AsyncMock(side_effect=lambda run_id: runs.get(run_id)) + monkeypatch.setattr(app.DATABASE.workflow_runs, "get_workflow_run", get_run) + + get_workflow = AsyncMock(side_effect=lambda workflow_id: workflows.get(workflow_id)) + monkeypatch.setattr(WorkflowService, "get_workflow", get_workflow) + + return get_run, get_workflow + + +@pytest.mark.asyncio +async def test_returns_none_when_parent_run_id_is_none(monkeypatch: pytest.MonkeyPatch) -> None: + _install_chain(monkeypatch, runs={}, workflows={}) + + result = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id=None) + + assert result is None + + +@pytest.mark.asyncio +async def test_returns_none_when_no_ancestor_has_prompt(monkeypatch: pytest.MonkeyPatch) -> None: + """Chain exists but every ancestor's ``workflow_system_prompt`` is None/empty.""" + runs = { + "wr_parent": _make_run(run_id="wr_parent", workflow_id="w_parent", parent_run_id="wr_grandparent"), + "wr_grandparent": _make_run(run_id="wr_grandparent", workflow_id="w_grandparent", parent_run_id=None), + } + workflows = { + "w_parent": _make_workflow("w_parent", workflow_system_prompt=None), + "w_grandparent": _make_workflow("w_grandparent", workflow_system_prompt=""), + } + _install_chain(monkeypatch, runs, workflows) + + result = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_parent") + + assert result is None + + +@pytest.mark.asyncio +async def test_single_ancestor_with_prompt(monkeypatch: pytest.MonkeyPatch) -> None: + runs = {"wr_parent": _make_run(run_id="wr_parent", workflow_id="w_parent", parent_run_id=None)} + workflows = {"w_parent": _make_workflow("w_parent", workflow_system_prompt="Respond in English.")} + _install_chain(monkeypatch, runs, workflows) + + result = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_parent") + + assert result == "Respond in 
English." + + +@pytest.mark.asyncio +async def test_outermost_first_ordering(monkeypatch: pytest.MonkeyPatch) -> None: + """Chain: great-grandparent -> grandparent -> parent -> (child). The helper + walks up bottom-up starting from the parent; the returned string must join + outermost-first (great-grandparent before grandparent before parent).""" + runs = { + "wr_parent": _make_run(run_id="wr_parent", workflow_id="w_parent", parent_run_id="wr_grand"), + "wr_grand": _make_run(run_id="wr_grand", workflow_id="w_grand", parent_run_id="wr_great"), + "wr_great": _make_run(run_id="wr_great", workflow_id="w_great", parent_run_id=None), + } + workflows = { + "w_parent": _make_workflow("w_parent", workflow_system_prompt="PARENT rule."), + "w_grand": _make_workflow("w_grand", workflow_system_prompt="GRAND rule."), + "w_great": _make_workflow("w_great", workflow_system_prompt="GREAT rule."), + } + _install_chain(monkeypatch, runs, workflows) + + result = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_parent") + + assert result == "GREAT rule.\n\nGRAND rule.\n\nPARENT rule." 
+ + +@pytest.mark.asyncio +async def test_chain_break_includes_opted_out_ancestors_own_prompt(monkeypatch: pytest.MonkeyPatch) -> None: + """When an ancestor has ``ignore_inherited_workflow_system_prompt=True``, its own + prompt is still collected but traversal stops — the grandparent's rules must + NOT appear in the result (SKY-9147 design: an opted-out workflow rejects its + parents' rules but still propagates its own to descendants).""" + runs = { + "wr_parent": _make_run( + run_id="wr_parent", + workflow_id="w_parent", + parent_run_id="wr_grand", + skip_inherited=True, + ), + "wr_grand": _make_run(run_id="wr_grand", workflow_id="w_grand", parent_run_id=None), + } + workflows = { + "w_parent": _make_workflow("w_parent", workflow_system_prompt="PARENT rule."), + "w_grand": _make_workflow("w_grand", workflow_system_prompt="GRAND rule."), + } + _install_chain(monkeypatch, runs, workflows) + + result = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_parent") + + assert result == "PARENT rule." 
+ + +@pytest.mark.asyncio +async def test_chain_break_opted_out_ancestor_with_no_own_prompt(monkeypatch: pytest.MonkeyPatch) -> None: + """When the opted-out ancestor has no prompt of its own and its ancestors do, + the result is None — traversal still stops at the opted-out boundary.""" + runs = { + "wr_parent": _make_run( + run_id="wr_parent", + workflow_id="w_parent", + parent_run_id="wr_grand", + skip_inherited=True, + ), + "wr_grand": _make_run(run_id="wr_grand", workflow_id="w_grand", parent_run_id=None), + } + workflows = { + "w_parent": _make_workflow("w_parent", workflow_system_prompt=None), + "w_grand": _make_workflow("w_grand", workflow_system_prompt="GRAND rule."), + } + _install_chain(monkeypatch, runs, workflows) + + result = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_parent") + + assert result is None + + +@pytest.mark.asyncio +async def test_cycle_detection_breaks_loop(monkeypatch: pytest.MonkeyPatch) -> None: + """A parent chain that points back to an already-visited run must not infinite- + loop. The helper should collect each unique ancestor's prompt exactly once.""" + runs = { + "wr_a": _make_run(run_id="wr_a", workflow_id="w_a", parent_run_id="wr_b"), + "wr_b": _make_run(run_id="wr_b", workflow_id="w_b", parent_run_id="wr_a"), + } + workflows = { + "w_a": _make_workflow("w_a", workflow_system_prompt="A rule."), + "w_b": _make_workflow("w_b", workflow_system_prompt="B rule."), + } + get_run, _ = _install_chain(monkeypatch, runs, workflows) + + result = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_a") + + assert result == "B rule.\n\nA rule." + # Each unique run fetched exactly once — no infinite loop. 
+ assert get_run.await_count == 2 + + +@pytest.mark.asyncio +async def test_missing_parent_run_breaks_chain(monkeypatch: pytest.MonkeyPatch) -> None: + """If ``get_workflow_run`` returns None mid-walk (soft-deleted / race), + traversal stops cleanly and returns whatever was collected so far.""" + runs = { + "wr_parent": _make_run(run_id="wr_parent", workflow_id="w_parent", parent_run_id="wr_missing"), + } + workflows = {"w_parent": _make_workflow("w_parent", workflow_system_prompt="PARENT rule.")} + _install_chain(monkeypatch, runs, workflows) + + result = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_parent") + + assert result == "PARENT rule." + + +@pytest.mark.asyncio +async def test_missing_parent_workflow_skips_that_ancestor(monkeypatch: pytest.MonkeyPatch) -> None: + """If ``get_workflow`` returns None (deleted definition) the traversal still + continues up the chain, silently skipping that level rather than aborting.""" + runs = { + "wr_parent": _make_run(run_id="wr_parent", workflow_id="w_parent_missing", parent_run_id="wr_grand"), + "wr_grand": _make_run(run_id="wr_grand", workflow_id="w_grand", parent_run_id=None), + } + workflows = {"w_grand": _make_workflow("w_grand", workflow_system_prompt="GRAND rule.")} + _install_chain(monkeypatch, runs, workflows) + + result = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_parent") + + assert result == "GRAND rule." 
+ + +@pytest.mark.asyncio +async def test_depth_cap_bounds_traversal(monkeypatch: pytest.MonkeyPatch) -> None: + """A malformed deep chain must stop at ``MAX_TRIGGER_DEPTH`` — the helper + collects ancestors up to the cap and returns those without hanging.""" + depth = WorkflowTriggerBlock.MAX_TRIGGER_DEPTH + 5 + runs: dict[str, WorkflowRun] = {} + workflows: dict[str, Workflow] = {} + for i in range(depth): + run_id = f"wr_{i}" + workflow_id = f"w_{i}" + parent_id = f"wr_{i + 1}" if i + 1 < depth else None + runs[run_id] = _make_run(run_id=run_id, workflow_id=workflow_id, parent_run_id=parent_id) + workflows[workflow_id] = _make_workflow(workflow_id, workflow_system_prompt=f"rule {i}.") + + get_run, _ = _install_chain(monkeypatch, runs, workflows) + + result = await WorkflowService()._collect_inherited_workflow_system_prompt(parent_workflow_run_id="wr_0") + + # Exactly MAX_TRIGGER_DEPTH ancestors fetched; deeper ones dropped. + assert get_run.await_count == WorkflowTriggerBlock.MAX_TRIGGER_DEPTH + assert result is not None + # Parts are "rule 0." … "rule (cap-1)." joined outermost-first. + expected_parts = [f"rule {i}." 
for i in reversed(range(WorkflowTriggerBlock.MAX_TRIGGER_DEPTH))] + assert result == "\n\n".join(expected_parts) diff --git a/tests/unit/test_data_extraction_summary_schema_cap.py b/tests/unit/test_data_extraction_summary_schema_cap.py index 8da4cba8c..312d721d0 100644 --- a/tests/unit/test_data_extraction_summary_schema_cap.py +++ b/tests/unit/test_data_extraction_summary_schema_cap.py @@ -18,7 +18,7 @@ def _run_create_extract_action(monkeypatch, extracted_information_schema): captured.update(kwargs) return original_load_prompt(template_name, **kwargs) - async def fake_handler(*, prompt, step, prompt_name): + async def fake_handler(*, prompt, step, prompt_name, **_ignored): captured["prompt"] = prompt return {"summary": "ok"} diff --git a/tests/unit/test_downloaded_files_artifact_urls.py b/tests/unit/test_downloaded_files_artifact_urls.py new file mode 100644 index 000000000..c59904f1e --- /dev/null +++ b/tests/unit/test_downloaded_files_artifact_urls.py @@ -0,0 +1,493 @@ +"""Tests for downloaded_files migration to short artifact URLs (SKY-8861).""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from skyvern.forge.sdk.artifact.manager import ArtifactManager +from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType +from skyvern.forge.sdk.artifact.storage.s3 import S3Storage + + +@pytest.mark.asyncio +async def test_create_download_artifact_is_idempotent_per_run_and_uri(): + """A repeat save (e.g. inside a loop) must return the existing artifact_id so + downstream URL-based dedup (``loop_download_filter``) keeps seeing a stable URL. 
+ """ + manager = ArtifactManager() + + existing = Artifact( + artifact_id="a_existing", + artifact_type=ArtifactType.DOWNLOAD, + uri="s3://skyvern-uploads/downloads/local/o_1/wr_1/file.pdf", + organization_id="o_1", + run_id="wr_1", + workflow_run_id="wr_1", + created_at="2026-04-23T00:00:00Z", + modified_at="2026-04-23T00:00:00Z", + ) + find_existing = AsyncMock(return_value=existing) + mock_db_create = AsyncMock() + + with ( + patch( + "skyvern.forge.sdk.artifact.manager.app.DATABASE.artifacts.find_download_artifact", + find_existing, + ), + patch( + "skyvern.forge.sdk.artifact.manager.app.DATABASE.artifacts.create_artifact", + mock_db_create, + ), + ): + artifact_id = await manager.create_download_artifact( + organization_id="o_1", + run_id="wr_1", + workflow_run_id="wr_1", + uri=existing.uri, + filename="file.pdf", + ) + + assert artifact_id == "a_existing" + mock_db_create.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_create_download_artifact_inserts_row_without_uploading(): + """create_download_artifact only writes a DB row; bytes are already in S3.""" + manager = ArtifactManager() + + mock_db_create = AsyncMock( + return_value=Artifact( + artifact_id="a_abc123", + artifact_type=ArtifactType.DOWNLOAD, + uri="s3://skyvern-uploads/download/prod/o_1/wr_1/file.pdf", + organization_id="o_1", + run_id="wr_1", + workflow_run_id="wr_1", + created_at="2026-04-23T00:00:00Z", + modified_at="2026-04-23T00:00:00Z", + ) + ) + mock_store = AsyncMock() + find_existing = AsyncMock(return_value=None) + + with ( + patch( + "skyvern.forge.sdk.artifact.manager.app.DATABASE.artifacts.find_download_artifact", + find_existing, + ), + patch("skyvern.forge.sdk.artifact.manager.app.DATABASE.artifacts.create_artifact", mock_db_create), + patch("skyvern.forge.sdk.artifact.manager.app.STORAGE.store_artifact", mock_store), + patch("skyvern.forge.sdk.artifact.manager.app.STORAGE.store_artifact_from_path", mock_store), + ): + artifact_id = await 
manager.create_download_artifact( + organization_id="o_1", + run_id="wr_1", + workflow_run_id="wr_1", + uri="s3://skyvern-uploads/download/prod/o_1/wr_1/file.pdf", + filename="file.pdf", + ) + + assert artifact_id.startswith("a_") + mock_db_create.assert_awaited_once() + _, kwargs = mock_db_create.call_args + assert kwargs["artifact_type"] == ArtifactType.DOWNLOAD + assert kwargs["uri"] == "s3://skyvern-uploads/download/prod/o_1/wr_1/file.pdf" + assert kwargs["organization_id"] == "o_1" + assert kwargs["run_id"] == "wr_1" + assert kwargs["workflow_run_id"] == "wr_1" + mock_store.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_save_downloaded_files_registers_artifact_per_file(tmp_path): + """After uploading each file to S3, save_downloaded_files should create an + Artifact row so later retrieval can build short /v1/artifacts URLs.""" + download_dir = tmp_path / "downloads" + download_dir.mkdir() + (download_dir / "invoice.pdf").write_bytes(b"%PDF-1.4 ...") + (download_dir / "report.csv").write_bytes(b"a,b,c\n1,2,3\n") + + storage = S3Storage() + storage.async_client = MagicMock() + storage.async_client.upload_file_from_path = AsyncMock() + + mock_create_download = AsyncMock(return_value="a_new") + mock_artifact_manager = MagicMock() + mock_artifact_manager.create_download_artifact = mock_create_download + + with ( + patch("skyvern.forge.sdk.artifact.storage.s3.get_download_dir", return_value=str(download_dir)), + patch.object(storage, "_get_storage_class_for_org", new=AsyncMock(return_value=MagicMock())), + patch("skyvern.forge.sdk.artifact.storage.s3.calculate_sha256_for_file", return_value="sha-xyz"), + patch("skyvern.forge.sdk.artifact.storage.s3.app") as app_module, + ): + app_module.ARTIFACT_MANAGER = mock_artifact_manager + await storage.save_downloaded_files(organization_id="o_1", run_id="wr_1") + + assert mock_create_download.await_count == 2 + uris = {call.kwargs["uri"] for call in mock_create_download.await_args_list} + filenames = 
{call.kwargs["filename"] for call in mock_create_download.await_args_list} + assert filenames == {"invoice.pdf", "report.csv"} + assert all(u.startswith("s3://") and "/downloads/" in u and "/o_1/wr_1/" in u for u in uris) + for call in mock_create_download.await_args_list: + assert call.kwargs["organization_id"] == "o_1" + assert call.kwargs["run_id"] == "wr_1" + + +def _make_artifact( + artifact_id: str, + uri: str, + run_id: str = "wr_1", + *, + checksum: str | None = None, + created_at: str = "2026-04-23T00:00:00Z", +) -> Artifact: + return Artifact( + artifact_id=artifact_id, + artifact_type=ArtifactType.DOWNLOAD, + uri=uri, + organization_id="o_1", + run_id=run_id, + workflow_run_id=run_id if run_id.startswith("wr_") else None, + checksum=checksum, + created_at=created_at, + modified_at=created_at, + ) + + +_DUMMY_KEYRING_JSON = '{"current_kid": "k1", "keys": {"k1": {"secret": "0000000000000000000000000000000000000000000000000000000000000000"}}}' + + +@pytest.fixture +def keyring_configured(): + """Simulate cloud-style config: HMAC keyring is set so the artifact URL branch is active. 
+ Unit tests default to no keyring to match the OSS default, so tests that exercise the + short-URL path must opt in.""" + from skyvern.config import settings + + with patch.object(settings, "ARTIFACT_CONTENT_HMAC_KEYRING", _DUMMY_KEYRING_JSON): + yield + + +@pytest.mark.asyncio +async def test_get_downloaded_files_uses_artifact_urls_when_rows_exist(keyring_configured): + """When DOWNLOAD artifact rows exist, retrieval skips S3 entirely: + URL, checksum, filename, modified_at all come straight from the row.""" + storage = S3Storage() + storage.async_client = MagicMock() + storage.async_client.list_files = AsyncMock() # must NOT be called + storage.async_client.get_file_metadata = AsyncMock() # must NOT be called + storage.async_client.create_presigned_urls = AsyncMock() # must NOT be called + + artifact = _make_artifact( + "a_42", + "s3://skyvern-uploads/downloads/local/o_1/wr_1/invoice.pdf", + checksum="sha-from-db", + ) + mock_list = AsyncMock(return_value=[artifact]) + build_url = MagicMock(return_value="https://api.skyvern.com/v1/artifacts/a_42/content?expiry=x&kid=y&sig=z") + + with patch("skyvern.forge.sdk.artifact.storage.base.app") as base_app: + with patch("skyvern.forge.sdk.artifact.storage.s3.app") as s3_app: + s3_app.DATABASE.artifacts.list_artifacts_for_run_by_type = mock_list + base_app.ARTIFACT_MANAGER.build_signed_content_url = build_url + result = await storage.get_downloaded_files(organization_id="o_1", run_id="wr_1") + + assert len(result) == 1 + assert result[0].url.startswith("https://api.skyvern.com/v1/artifacts/a_42/content") + assert result[0].filename == "invoice.pdf" + assert result[0].checksum == "sha-from-db" + assert result[0].modified_at is not None + storage.async_client.list_files.assert_not_awaited() + storage.async_client.get_file_metadata.assert_not_awaited() + storage.async_client.create_presigned_urls.assert_not_awaited() + mock_list.assert_awaited_once_with(run_id="wr_1", organization_id="o_1", 
artifact_type=ArtifactType.DOWNLOAD) + + +@pytest.mark.asyncio +async def test_get_downloaded_files_preserves_artifact_row_order(keyring_configured): + """Artifact rows are returned ASC by created_at; FileInfo list must follow the + same order (matches save order, drives loop_download_filter signatures).""" + storage = S3Storage() + storage.async_client = MagicMock() + + first = _make_artifact( + "a_1", + "s3://skyvern-uploads/downloads/local/o_1/wr_1/first.pdf", + created_at="2026-04-23T00:00:00Z", + ) + second = _make_artifact( + "a_2", + "s3://skyvern-uploads/downloads/local/o_1/wr_1/second.pdf", + created_at="2026-04-23T00:01:00Z", + ) + mock_list = AsyncMock(return_value=[first, second]) + build_url = MagicMock( + side_effect=lambda artifact_id, **_: f"https://api.skyvern.com/v1/artifacts/{artifact_id}/content" + ) + + with patch("skyvern.forge.sdk.artifact.storage.base.app") as base_app: + with patch("skyvern.forge.sdk.artifact.storage.s3.app") as s3_app: + s3_app.DATABASE.artifacts.list_artifacts_for_run_by_type = mock_list + base_app.ARTIFACT_MANAGER.build_signed_content_url = build_url + result = await storage.get_downloaded_files(organization_id="o_1", run_id="wr_1") + + assert [fi.filename for fi in result] == ["first.pdf", "second.pdf"] + + +@pytest.mark.asyncio +async def test_get_downloaded_files_falls_back_to_presigned_for_legacy_runs(keyring_configured): + """Production-cloud legacy run: keyring IS configured, but the run pre-dates SKY-8861 + so no artifact rows exist. 
Files in S3 must still surface as presigned URLs — the + whole point of keeping the fallback path.""" + storage = S3Storage() + storage.async_client = MagicMock() + s3_key = "downloads/local/o_1/wr_old/legacy.pdf" + storage.async_client.list_files = AsyncMock(return_value=[s3_key]) + storage.async_client.get_file_metadata = AsyncMock( + return_value={"sha256_checksum": "sha-old", "original_filename": "legacy.pdf"} + ) + storage.async_client.create_presigned_urls = AsyncMock( + return_value=["https://skyvern-uploads.s3.amazonaws.com/...?sig=old"] + ) + + mock_list = AsyncMock(return_value=[]) # no artifact rows for this legacy run + build_url = MagicMock() # must NOT be called + + with patch("skyvern.forge.sdk.artifact.storage.base.app") as base_app: + with patch("skyvern.forge.sdk.artifact.storage.s3.app") as s3_app: + s3_app.DATABASE.artifacts.list_artifacts_for_run_by_type = mock_list + base_app.ARTIFACT_MANAGER.build_signed_content_url = build_url + result = await storage.get_downloaded_files(organization_id="o_1", run_id="wr_old") + + assert len(result) == 1 + assert result[0].filename == "legacy.pdf" + assert result[0].checksum == "sha-old" + assert "s3.amazonaws.com" in result[0].url # nosemgrep: incomplete-url-substring-sanitization + build_url.assert_not_called() + storage.async_client.list_files.assert_awaited_once() + storage.async_client.create_presigned_urls.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_get_downloaded_files_falls_back_to_presigned_when_keyring_unset(tmp_path): + """Self-hosted OSS deployments without ARTIFACT_CONTENT_HMAC_KEYRING must keep + serving presigned S3 URLs — the short Skyvern URL would be unsigned and the + content endpoint would 401 without an API key.""" + from skyvern.config import settings + + storage = S3Storage() + storage.async_client = MagicMock() + s3_key = "downloads/local/o_1/wr_1/invoice.pdf" + storage.async_client.list_files = AsyncMock(return_value=[s3_key]) + 
storage.async_client.get_file_metadata = AsyncMock( + return_value={"sha256_checksum": "sha-abc", "original_filename": "invoice.pdf"} + ) + storage.async_client.create_presigned_urls = AsyncMock( + return_value=["https://skyvern-uploads.s3.amazonaws.com/...?sig=fallback"] + ) + + artifact = _make_artifact("a_42", f"s3://skyvern-uploads/{s3_key}") + mock_list = AsyncMock(return_value=[artifact]) + build_url = MagicMock() + + with ( + patch("skyvern.forge.sdk.artifact.storage.s3.app") as app_module, + patch.object(settings, "ARTIFACT_CONTENT_HMAC_KEYRING", None), + ): + app_module.DATABASE.artifacts.list_artifacts_for_run_by_type = mock_list + app_module.ARTIFACT_MANAGER.build_signed_content_url = build_url + result = await storage.get_downloaded_files(organization_id="o_1", run_id="wr_1") + + assert len(result) == 1 + assert "s3.amazonaws.com" in result[0].url # nosemgrep: incomplete-url-substring-sanitization + build_url.assert_not_called() + + +@pytest.mark.asyncio +async def test_get_downloaded_files_artifact_lookup_failure_falls_back_to_listing(keyring_configured): + """If the DB lookup raises (transient outage), retrieval must not 500 the + run-output API — fall through to the legacy S3-listing path so files + still surface as presigned URLs.""" + storage = S3Storage() + storage.async_client = MagicMock() + s3_key = "downloads/local/o_1/wr_1/recoverable.pdf" + storage.async_client.list_files = AsyncMock(return_value=[s3_key]) + storage.async_client.get_file_metadata = AsyncMock( + return_value={"sha256_checksum": "sha-recover", "original_filename": "recoverable.pdf"} + ) + storage.async_client.create_presigned_urls = AsyncMock( + return_value=["https://skyvern-uploads.s3.amazonaws.com/...?sig=fallback"] + ) + + mock_list = AsyncMock(side_effect=RuntimeError("DB unreachable")) + + with patch("skyvern.forge.sdk.artifact.storage.s3.app") as app_module: + app_module.DATABASE.artifacts.list_artifacts_for_run_by_type = mock_list + result = await 
storage.get_downloaded_files(organization_id="o_1", run_id="wr_1") + + assert len(result) == 1 + assert "s3.amazonaws.com" in result[0].url # nosemgrep: incomplete-url-substring-sanitization + storage.async_client.list_files.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_content_endpoint_download_returns_attachment_with_filename(): + """DOWNLOAD artifacts must serve with attachment disposition so browsers don't render + PDFs inline (defeats the SKY-8862 XSS-via-PDF mitigation).""" + from skyvern.forge.sdk.routes.agent_protocol import _artifact_response_config + + artifact = _make_artifact("a_dl", "s3://skyvern-uploads/downloads/local/o_1/wr_1/invoice.pdf") + media_type, disposition = _artifact_response_config(artifact) + assert media_type == "application/octet-stream" + assert disposition.startswith("attachment;") + assert 'filename="invoice.pdf"' in disposition + + +def test_content_endpoint_non_download_stays_inline(): + """Existing artifact types keep the inline disposition we had before.""" + from skyvern.forge.sdk.routes.agent_protocol import _artifact_response_config + + screenshot = Artifact( + artifact_id="a_ss", + artifact_type=ArtifactType.SCREENSHOT_FINAL, + uri="s3://skyvern-artifacts/.../final.png", + organization_id="o_1", + created_at="2026-04-23T00:00:00Z", + modified_at="2026-04-23T00:00:00Z", + ) + media_type, disposition = _artifact_response_config(screenshot) + assert media_type == "image/png" + assert disposition == "inline" + + +def test_content_endpoint_download_non_ascii_filename_does_not_crash_header_encoding(): + """Starlette encodes response headers as Latin-1. Unicode filenames must use RFC 5987 + (filename*=UTF-8''...) 
with an ASCII fallback so the endpoint does not 500.""" + from starlette.responses import Response + + from skyvern.forge.sdk.routes.agent_protocol import _artifact_response_config + + artifact = _make_artifact("a_unicode", "s3://skyvern-uploads/downloads/local/o_1/wr_1/文档.pdf") + media_type, disposition = _artifact_response_config(artifact) + # Must not raise — Starlette's header encoding rejects non-Latin-1 bytes. + Response(content=b"x", media_type=media_type, headers={"Content-Disposition": disposition}) + assert "filename*=UTF-8''" in disposition + assert "%E6%96%87%E6%A1%A3.pdf" in disposition or "%e6%96%87%e6%a1%a3.pdf" in disposition + + +def test_sanitize_header_filename_strips_crlf_and_quotes_directly(): + """Direct unit test on _sanitize_header_filename so we know the function works + even when urlparse isn't part of the chain (defense in depth).""" + from skyvern.forge.sdk.routes.agent_protocol import _sanitize_header_filename + + assert _sanitize_header_filename('evil"pdf') == "evilpdf" + assert _sanitize_header_filename("hello\r\nworld.pdf") == "helloworld.pdf" + assert _sanitize_header_filename("back\\slash.pdf") == "backslash.pdf" + assert _sanitize_header_filename("") == "download" + + +def test_content_endpoint_download_filename_preserves_question_and_hash(): + """S3 keys may legitimately contain '?' 
or '#'; urlparse would otherwise strip them.""" + from skyvern.forge.sdk.routes.agent_protocol import _artifact_response_config + + artifact = _make_artifact("a_q", "s3://skyvern-uploads/downloads/local/o_1/wr_1/report?v=2#a.pdf") + _, disposition = _artifact_response_config(artifact) + assert "report%3Fv%3D2%23a.pdf" in disposition or "report?v=2#a.pdf" in disposition + + +def test_sanitize_header_filename_strips_bidi_and_format_characters(): + """Unicode bidi overrides and format chars (ZWSP, RLO, ZWNBSP) enable filename + spoofing in the browser's download UI (``invoice\\u202efdp.exe`` -> ``invoice.exe.pdf``).""" + from skyvern.forge.sdk.routes.agent_protocol import _sanitize_header_filename + + assert "\u202e" not in _sanitize_header_filename("invoice\u202efdp.exe") + assert "\u200b" not in _sanitize_header_filename("stealth\u200b.pdf") + assert "\ufeff" not in _sanitize_header_filename("bom\ufeff.pdf") + + +def test_ascii_fallback_filename_preserves_stem_for_pure_unicode_names(): + """Pure non-ASCII names (e.g. CJK, emoji) must not reduce to a bare ``.pdf`` hidden + dotfile after the NFKD strip; fall back to a ``download`` stem instead.""" + from skyvern.forge.sdk.routes.agent_protocol import _ascii_fallback_filename + + assert _ascii_fallback_filename("文档.pdf") == "download.pdf" + assert _ascii_fallback_filename("🎉.pdf") == "download.pdf" + # Accented Latin still transliterates to keep the stem. 
+ assert _ascii_fallback_filename("fïlè.pdf") == "file.pdf" + + +def test_sanitize_header_filename_strips_control_characters(): + """NUL/DEL/C1 control chars are valid Latin-1 bytes but violate RFC 7230 header syntax.""" + from skyvern.forge.sdk.routes.agent_protocol import _sanitize_header_filename + + assert "\x00" not in _sanitize_header_filename("evil\x00.pdf") + assert "\x7f" not in _sanitize_header_filename("evil\x7f.pdf") + assert "\x1b" not in _sanitize_header_filename("evil\x1b.pdf") + assert "\x80" not in _sanitize_header_filename("evil\x80.pdf") + + +@pytest.mark.asyncio +async def test_content_endpoint_sets_nosniff_header_end_to_end(): + """Hit the real route through a FastAPI TestClient and verify the response + actually carries X-Content-Type-Options: nosniff on the DOWNLOAD path. + + Defence-in-depth for SKY-8862: prevents a refactor from silently dropping + the header without this suite noticing.""" + import json + + from fastapi import FastAPI + from fastapi.testclient import TestClient + + from skyvern.config import settings + from skyvern.forge.sdk.artifact.signing import sign_artifact_url + from skyvern.forge.sdk.routes.routers import base_router + + artifact = _make_artifact("a_e2e", "s3://skyvern-uploads/downloads/local/o_1/wr_1/report.pdf") + keyring_json = json.dumps({"current_kid": "k1", "keys": {"k1": {"secret": "0" * 64, "created_at": "2026-04-23"}}}) + + with ( + patch.object(settings, "ARTIFACT_CONTENT_HMAC_KEYRING", keyring_json), + patch.object(settings, "SKYVERN_BASE_URL", "http://testserver"), + patch("skyvern.forge.sdk.routes.agent_protocol.app") as app_module, + ): + app_module.DATABASE.artifacts.get_artifact_by_id_no_org = AsyncMock(return_value=artifact) + app_module.ARTIFACT_MANAGER.retrieve_artifact = AsyncMock(return_value=b"%PDF-1.4 fake body") + + from skyvern.forge.sdk.artifact.signing import parse_keyring + + signed_url = sign_artifact_url( + base_url="http://testserver", + artifact_id=artifact.artifact_id, + 
keyring=parse_keyring(keyring_json), + artifact_name="report.pdf", + artifact_type="download", + ) + + test_app = FastAPI() + test_app.include_router(base_router, prefix="/v1") + client = TestClient(test_app) + resp = client.get(signed_url.replace("http://testserver", "")) + + assert resp.status_code == 200, resp.text + assert resp.headers.get("X-Content-Type-Options") == "nosniff" + assert resp.headers.get("Content-Disposition", "").startswith("attachment;") + assert resp.content == b"%PDF-1.4 fake body" + + +def test_content_endpoint_download_filename_strips_header_injection(): + """URI-derived filenames go straight into a Content-Disposition header; + CR/LF and raw quotes must be stripped to prevent header injection.""" + from skyvern.forge.sdk.routes.agent_protocol import _artifact_response_config + + artifact = _make_artifact( + "a_bad", + 's3://skyvern-uploads/downloads/local/o_1/wr_1/evil"\r\nSet-Cookie: x=y.pdf', + ) + _, disposition = _artifact_response_config(artifact) + assert "\r" not in disposition + assert "\n" not in disposition + assert disposition.count('"') == 2 # only the pair around filename diff --git a/tests/unit/test_extract_information_text_optout.py b/tests/unit/test_extract_information_text_optout.py index de5abf2b6..a4f49fb78 100644 --- a/tests/unit/test_extract_information_text_optout.py +++ b/tests/unit/test_extract_information_text_optout.py @@ -196,7 +196,7 @@ def _capture_ai_extract_kwargs(monkeypatch, include_extracted_text: bool): async def fake_refresh(*_args, **_kwargs): return None - async def fake_handler(*, prompt, step, screenshots, prompt_name, force_dict): + async def fake_handler(*, prompt, step, screenshots, prompt_name, force_dict, **_ignored): return {} monkeypatch.setattr(module, "load_prompt_with_elements_tracked", fake_load_prompt_with_elements_tracked) @@ -258,7 +258,7 @@ def _capture_ai_extract_kwargs_with_schema(monkeypatch, schema): async def fake_refresh(*_args, **_kwargs): return None - async def 
fake_handler(*, prompt, step, screenshots, prompt_name, force_dict): + async def fake_handler(*, prompt, step, screenshots, prompt_name, force_dict, **_ignored): return {} monkeypatch.setattr(module, "load_prompt_with_elements_tracked", fake_load_prompt_with_elements_tracked) diff --git a/tests/unit/test_workflow_parameter_validation.py b/tests/unit/test_workflow_parameter_validation.py index 1a68b4eb8..ce0df96d2 100644 --- a/tests/unit/test_workflow_parameter_validation.py +++ b/tests/unit/test_workflow_parameter_validation.py @@ -625,3 +625,40 @@ class TestSanitizeWorkflowYamlWithReferences: goal = result["workflow_definition"]["blocks"][0]["navigation_goal"] assert "{{ block_1 }}" in goal assert "{{ block_1_output }}" in goal + + def test_sanitize_updates_output_references_in_workflow_system_prompt(self) -> None: + """Output references inside the workflow-level workflow_system_prompt must + be rewritten when the referenced block label is sanitized. The global + prompt is resolved through Jinja at execution time, so its references + need the same renaming treatment as block-level fields.""" + workflow_yaml = { + "title": "Test Workflow", + "workflow_definition": { + "parameters": [], + "blocks": [{"label": "my-block", "block_type": "task"}], + "workflow_system_prompt": "Honor {{ my-block_output }} for every downstream block.", + }, + } + result = sanitize_workflow_yaml_with_references(workflow_yaml) + assert result["workflow_definition"]["blocks"][0]["label"] == "my_block" + assert "{{ my_block_output }}" in result["workflow_definition"]["workflow_system_prompt"] + + def test_sanitize_parameter_key_updates_jinja_references_in_workflow_system_prompt(self) -> None: + """Parameter-key references inside the workflow-level workflow_system_prompt + must be rewritten when the parameter key is sanitized.""" + workflow_yaml = { + "title": "Test Workflow", + "workflow_definition": { + "parameters": [ + { + "key": "user-input", + "parameter_type": "workflow", + } + ], + 
"blocks": [], + "workflow_system_prompt": "Always respond in the style of {{ user-input }}.", + }, + } + result = sanitize_workflow_yaml_with_references(workflow_yaml) + assert result["workflow_definition"]["parameters"][0]["key"] == "user_input" + assert "{{ user_input }}" in result["workflow_definition"]["workflow_system_prompt"] diff --git a/tests/unit/test_workflow_system_prompt_llm_forwarding.py b/tests/unit/test_workflow_system_prompt_llm_forwarding.py new file mode 100644 index 000000000..08e9bdb3a --- /dev/null +++ b/tests/unit/test_workflow_system_prompt_llm_forwarding.py @@ -0,0 +1,387 @@ +"""Regression tests: workflow-level workflow_system_prompt must reach the LLM handler. + +Complements ``test_block_workflow_system_prompt_inheritance.py`` (which asserts +that the workflow prompt flows onto the block model). These tests assert the +next hop — that each block/function that makes an LLM call actually forwards +``system_prompt`` as a kwarg to the handler. Without this, inheritance passes +quietly but the prompt has no effect on output (the bug the user hit on the +extraction path). 
+""" + +from __future__ import annotations + +import asyncio +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock + +from skyvern.forge.sdk.cache import extraction_cache +from skyvern.forge.sdk.workflow.models.block import FileParserBlock, PDFParserBlock, TextPromptBlock +from skyvern.forge.sdk.workflow.models.parameter import OutputParameter, ParameterType +from skyvern.schemas.workflows import FileType + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + + +def _make_output_parameter() -> OutputParameter: + now = datetime.now(timezone.utc) + return OutputParameter( + parameter_type=ParameterType.OUTPUT, + key="task1_output", + description="test output", + output_parameter_id="op_task1", + workflow_id="w_test", + created_at=now, + modified_at=now, + ) + + +def _make_scraped_page(): + refreshed = MagicMock() + refreshed.extracted_text = "page text" + refreshed.url = "https://example.test" + refreshed.screenshots = [] + refreshed.build_element_tree = MagicMock(return_value="link") + refreshed.support_economy_elements_tree = MagicMock(return_value=False) + refreshed.last_used_element_tree_html = None + + scraped_page = MagicMock() + scraped_page.refresh = AsyncMock(return_value=refreshed) + scraped_page.screenshots = [] + return scraped_page + + +def _make_task(*, system_prompt: str | None) -> MagicMock: + task = MagicMock() + task.navigation_goal = None + task.navigation_payload = None + task.extracted_information = None + task.extracted_information_schema = {"type": "object"} + task.data_extraction_goal = "Extract documents" + task.error_code_mapping = None + task.llm_key = None + task.workflow_run_id = "wfr_sysprompt" + task.task_id = "tsk_sysprompt" + task.workflow_permanent_id = "wpid_sysprompt" + task.organization_id = "o_sysprompt" + task.include_extracted_text = True + task.workflow_system_prompt = 
system_prompt + return task + + +# --------------------------------------------------------------------------- +# extract-information handler (the OP's exact failure path) +# --------------------------------------------------------------------------- + + +def test_extract_information_forwards_system_prompt(monkeypatch) -> None: + """Regression for the OP's bug: an ExtractionBlock whose Task carries a + ``system_prompt`` (inherited from workflow.workflow_system_prompt) must + forward it to the extract-information LLM call.""" + from skyvern.webeye.actions import handler + + extraction_cache._reset_for_tests() + + captured: dict = {} + + async def fake_llm(**kwargs): + captured.update(kwargs) + return {"quotes": "SHOUTED QUOTE"} + + monkeypatch.setattr( + handler, + "load_prompt_with_elements_tracked", + lambda **kwargs: ("rendered-prompt", dict(kwargs)), + ) + monkeypatch.setattr(handler, "ensure_context", lambda: MagicMock(tz_info=None)) + monkeypatch.setattr(handler.service_utils, "is_cua_task", AsyncMock(return_value=False)) + monkeypatch.setattr( + handler.LLMAPIHandlerFactory, + "get_override_llm_api_handler", + lambda llm_key, default: fake_llm, + ) + monkeypatch.setattr( + handler.app.AGENT_FUNCTION, + "should_shadow_extraction_cache_hit", + AsyncMock(return_value=False), + ) + monkeypatch.setattr( + handler.app.AGENT_FUNCTION, + "lookup_cross_run_extraction_cache", + AsyncMock(return_value=None), + ) + monkeypatch.setattr( + handler.app.AGENT_FUNCTION, + "store_cross_run_extraction_cache", + AsyncMock(return_value=None), + ) + + task = _make_task(system_prompt="Respond only in uppercase.") + step = MagicMock(step_id="stp_sp", retry_index=0) + scraped_page = _make_scraped_page() + + asyncio.run(handler.extract_information_for_navigation_goal(task=task, step=step, scraped_page=scraped_page)) + + assert captured.get("system_prompt") == "Respond only in uppercase." 
+ extraction_cache._reset_for_tests() + + +def test_extract_information_passes_none_system_prompt_when_task_has_none(monkeypatch) -> None: + """No workflow_system_prompt → task.workflow_system_prompt is None → handler receives None. + Locks in that we don't accidentally invent a default or drop the kwarg.""" + from skyvern.webeye.actions import handler + + extraction_cache._reset_for_tests() + + captured: dict = {} + + async def fake_llm(**kwargs): + captured.update(kwargs) + return {"quotes": "x"} + + monkeypatch.setattr( + handler, + "load_prompt_with_elements_tracked", + lambda **kwargs: ("rendered-prompt", dict(kwargs)), + ) + monkeypatch.setattr(handler, "ensure_context", lambda: MagicMock(tz_info=None)) + monkeypatch.setattr(handler.service_utils, "is_cua_task", AsyncMock(return_value=False)) + monkeypatch.setattr( + handler.LLMAPIHandlerFactory, + "get_override_llm_api_handler", + lambda llm_key, default: fake_llm, + ) + monkeypatch.setattr( + handler.app.AGENT_FUNCTION, + "should_shadow_extraction_cache_hit", + AsyncMock(return_value=False), + ) + monkeypatch.setattr( + handler.app.AGENT_FUNCTION, + "lookup_cross_run_extraction_cache", + AsyncMock(return_value=None), + ) + monkeypatch.setattr( + handler.app.AGENT_FUNCTION, + "store_cross_run_extraction_cache", + AsyncMock(return_value=None), + ) + + task = _make_task(system_prompt=None) + step = MagicMock(step_id="stp_none", retry_index=0) + scraped_page = _make_scraped_page() + + asyncio.run(handler.extract_information_for_navigation_goal(task=task, step=step, scraped_page=scraped_page)) + + assert "system_prompt" in captured + assert captured["system_prompt"] is None + extraction_cache._reset_for_tests() + + +# --------------------------------------------------------------------------- +# data-extraction-summary (agent.py path) +# --------------------------------------------------------------------------- + + +def test_data_extraction_summary_forwards_system_prompt(monkeypatch) -> None: + from 
skyvern.forge import agent as agent_module + + captured: dict = {} + + async def fake_handler(**kwargs): + captured.update(kwargs) + return {"summary": "ok"} + + monkeypatch.setattr(agent_module.app, "EXTRACTION_LLM_API_HANDLER", fake_handler) + monkeypatch.setattr( + agent_module.skyvern_context, + "ensure_context", + lambda: MagicMock(tz_info=None, workflow_run_id="wr_sp"), + ) + monkeypatch.setattr(agent_module.extraction_cache, "compute_cache_key", lambda **_: None) + monkeypatch.setattr(agent_module.extraction_cache, "lookup", lambda *a, **k: None) + + task = _make_task(system_prompt="Answer in French.") + step = MagicMock(step_id="stp_sp", order=0) + scraped_page = MagicMock(url="https://example.test") + + asyncio.run(agent_module.ForgeAgent.create_extract_action(task=task, step=step, scraped_page=scraped_page)) + + assert captured.get("system_prompt") == "Answer in French." + + +# --------------------------------------------------------------------------- +# Extraction cache key — system_prompt must be part of the key +# --------------------------------------------------------------------------- + + +def test_extraction_cache_key_changes_with_system_prompt() -> None: + """Two calls that differ only in system_prompt must produce different + digests — otherwise a user switching workflow_system_prompt mid-run would + get stale output from a prior key's cached value.""" + base_kwargs: dict = dict( + call_path="handler", + element_tree="link", + extracted_text="text", + current_url="https://example.test", + data_extraction_goal="Extract docs", + extracted_information_schema={"type": "object"}, + navigation_payload=None, + error_code_mapping=None, + previous_extracted_information=None, + llm_key=None, + ) + + key_no_sp = extraction_cache.compute_cache_key(**base_kwargs, workflow_system_prompt=None) + key_a = extraction_cache.compute_cache_key(**base_kwargs, workflow_system_prompt="Answer in Spanish.") + key_b = extraction_cache.compute_cache_key(**base_kwargs, 
workflow_system_prompt="Answer in French.") + + assert key_no_sp != key_a + assert key_a != key_b + # Same prompt → same key (determinism). + key_a2 = extraction_cache.compute_cache_key(**base_kwargs, workflow_system_prompt="Answer in Spanish.") + assert key_a == key_a2 + + +# --------------------------------------------------------------------------- +# TextPromptBlock.send_prompt +# --------------------------------------------------------------------------- + + +def test_text_prompt_block_forwards_system_prompt(monkeypatch) -> None: + from skyvern.forge.sdk.workflow.models import block as block_module + + captured: dict = {} + + async def fake_llm(**kwargs): + captured.update(kwargs) + return {"llm_response": "hi"} + + async def fake_default_handler(*args, **kwargs): + return None + + async def fake_resolve(self, workflow_run_id, organization_id): + return fake_default_handler + + monkeypatch.setattr(TextPromptBlock, "_resolve_default_llm_handler", fake_resolve) + monkeypatch.setattr( + block_module.LLMAPIHandlerFactory, + "get_override_llm_api_handler", + lambda llm_key, default: fake_llm, + ) + + block = TextPromptBlock( + label="p", + output_parameter=_make_output_parameter(), + prompt="What is the meaning of life?", + workflow_system_prompt="Respond as Shakespeare.", + ) + + asyncio.run( + block.send_prompt( + prompt="What is the meaning of life?", + parameter_values={}, + workflow_run_id="wfr_sp", + organization_id="o_sp", + workflow_run_block_id=None, + ) + ) + + assert captured.get("system_prompt") == "Respond as Shakespeare." 
# ---------------------------------------------------------------------------
# FileParserBlock._extract_with_ai
# ---------------------------------------------------------------------------


def test_file_parser_block_forwards_system_prompt(monkeypatch) -> None:
    """FileParserBlock must hand its workflow_system_prompt to the overridden
    LLM handler when extracting structured output from file text."""
    from skyvern.forge.sdk.workflow.models import block as block_module

    seen: dict = {}

    async def recording_llm(**kwargs):
        seen.update(kwargs)
        return {"output": {}}

    monkeypatch.setattr(
        block_module.LLMAPIHandlerFactory,
        "get_override_llm_api_handler",
        lambda llm_key, default: recording_llm,
    )

    parser_block = FileParserBlock(
        label="fp",
        output_parameter=_make_output_parameter(),
        file_url="https://example.com/file.csv",
        file_type=FileType.CSV,
        workflow_system_prompt="Parse as JSON only.",
    )

    asyncio.run(parser_block._extract_with_ai("hello,world\n1,2", workflow_run_context=MagicMock()))

    assert seen.get("system_prompt") == "Parse as JSON only."


# ---------------------------------------------------------------------------
# PDFParserBlock.execute — system_prompt forwarded to LLM call
# ---------------------------------------------------------------------------


def test_pdf_parser_block_forwards_system_prompt(monkeypatch) -> None:
    """Run PDFParserBlock.execute end-to-end with stubbed I/O and persistence,
    checking that the block's system prompt reaches the LLM handler."""
    from skyvern.forge.sdk.workflow.models import block as block_module

    seen: dict = {}

    async def recording_llm(**kwargs):
        seen.update(kwargs)
        return {"output": {}}

    # Patch the app-level handler directly since PDFParserBlock uses app.LLM_API_HANDLER.
    monkeypatch.setattr(block_module.app, "LLM_API_HANDLER", recording_llm)
    # Stub file download and PDF text extraction so no real I/O happens.
    monkeypatch.setattr(block_module, "download_file", AsyncMock(return_value="/tmp/file.pdf"))
    monkeypatch.setattr(block_module, "extract_pdf_file", lambda *a, **k: "extracted text")

    run_context = MagicMock()
    run_context.has_parameter = MagicMock(return_value=False)
    run_context.has_value = MagicMock(return_value=False)
    run_context.workflow = None
    run_context.resolve_effective_workflow_system_prompt = MagicMock(return_value=None)

    monkeypatch.setattr(
        PDFParserBlock,
        "get_workflow_run_context",
        staticmethod(lambda workflow_run_id: run_context),
    )
    monkeypatch.setattr(PDFParserBlock, "record_output_parameter_value", AsyncMock(return_value=None))
    monkeypatch.setattr(PDFParserBlock, "build_block_result", AsyncMock(return_value=MagicMock()))

    pdf_block = PDFParserBlock(
        label="pdf",
        output_parameter=_make_output_parameter(),
        file_url="https://example.com/file.pdf",
        workflow_system_prompt="Summarize in one sentence.",
    )

    asyncio.run(
        pdf_block.execute(
            workflow_run_id="wfr_sp_pdf",
            workflow_run_block_id="wrb_sp_pdf",
            organization_id="o_sp_pdf",
        )
    )

    assert seen.get("system_prompt") == "Summarize in one sentence."