Fix for-loop adaptive caching: output aggregation, cached function bodies, and template resolution (#4931)

This commit is contained in:
pedrohsdb 2026-02-28 15:57:05 -08:00 committed by GitHub
parent 7aec0da11e
commit fab3347758
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 433 additions and 0 deletions

View file

@ -483,3 +483,318 @@ class TestForLoopScriptCompilation:
# Verify the for-loop is present inside run_workflow (not at module level)
assert "async for current_value in skyvern.loop" in result
assert "def run_workflow" in result
class TestForLoopInnerBlockCachedFunctions:
"""Test that inner blocks inside for_loop get @skyvern.cached function bodies generated.
This covers the bug where for_loop inner blocks (e.g. extraction inside a loop) didn't
get cached function bodies because they were nested in loop_blocks and not in the
top-level blocks list. Without cached function bodies, code mode falls back to agent.
"""
@pytest.mark.asyncio
async def test_inner_extraction_block_gets_cached_function(self) -> None:
"""Verify that an extraction block inside a for_loop generates a @skyvern.cached function."""
from skyvern.core.script_generations.generate_script import generate_workflow_script_python_code
blocks = [
{
"block_type": "for_loop",
"label": "scrape_each_page",
"loop_variable_reference": "{{ urls }}",
"workflow_run_block_id": "wfrb_loop_1",
"loop_blocks": [
{
"block_type": "extraction",
"label": "extract_data",
"data_extraction_goal": "Get page content",
"task_id": "task_inner_1",
"workflow_run_block_id": "wfrb_inner_1",
"url": "https://example.com",
},
],
},
]
actions_by_task = {
"task_inner_1": [
{
"action_type": "extract",
"action_id": "action_1",
"xpath": "//div[@id='content']",
"element_id": "elem_1",
"text": None,
"data_extraction_goal": "Get page content",
},
],
}
workflow = {
"workflow_id": "wf_test",
"title": "Test ForLoop Inner Block",
"workflow_definition": {"parameters": []},
}
mock_create_script_block = AsyncMock()
with (
patch(
"skyvern.core.script_generations.generate_script.generate_workflow_parameters_schema",
new_callable=AsyncMock,
return_value=("", {}),
),
patch(
"skyvern.core.script_generations.generate_script.create_or_update_script_block",
mock_create_script_block,
),
):
result = await generate_workflow_script_python_code(
file_name="test_forloop_inner.py",
workflow_run_request={"workflow_id": "wpid_test"},
workflow=workflow,
blocks=blocks,
actions_by_task=actions_by_task,
script_id="script_123",
script_revision_id="rev_123",
organization_id="org_123",
)
# The generated code must compile without errors
try:
ast.parse(result)
except SyntaxError as e:
pytest.fail(f"Generated script has SyntaxError: {e}\n\nGenerated code:\n{result}")
# Verify the inner block's @skyvern.cached function is present
assert "@skyvern.cached" in result
assert "extract_data" in result
# Verify create_or_update_script_block was called for both the for_loop
# and the inner extraction block
call_labels = [
call.kwargs.get("block_label") or call.args[4] for call in mock_create_script_block.call_args_list
]
assert "scrape_each_page" in call_labels, (
f"for_loop block should have a script_block entry. Labels: {call_labels}"
)
assert "extract_data" in call_labels, (
f"Inner extraction block should have a script_block entry. Labels: {call_labels}"
)
@pytest.mark.asyncio
async def test_inner_block_skipped_without_actions(self) -> None:
"""Inner blocks with no actions should be skipped (no cached function generated)."""
from skyvern.core.script_generations.generate_script import generate_workflow_script_python_code
blocks = [
{
"block_type": "for_loop",
"label": "scrape_each_page",
"loop_variable_reference": "{{ urls }}",
"loop_blocks": [
{
"block_type": "extraction",
"label": "extract_data",
"data_extraction_goal": "Get page content",
"task_id": "task_no_actions",
},
],
},
]
workflow = {
"workflow_id": "wf_test",
"title": "Test ForLoop No Actions",
"workflow_definition": {"parameters": []},
}
mock_create_script_block = AsyncMock()
with (
patch(
"skyvern.core.script_generations.generate_script.generate_workflow_parameters_schema",
new_callable=AsyncMock,
return_value=("", {}),
),
patch(
"skyvern.core.script_generations.generate_script.create_or_update_script_block",
mock_create_script_block,
),
):
result = await generate_workflow_script_python_code(
file_name="test_forloop_no_actions.py",
workflow_run_request={"workflow_id": "wpid_test"},
workflow=workflow,
blocks=blocks,
actions_by_task={}, # No actions
script_id="script_123",
script_revision_id="rev_123",
organization_id="org_123",
)
# Should compile fine
try:
ast.parse(result)
except SyntaxError as e:
pytest.fail(f"Generated script has SyntaxError: {e}\n\nGenerated code:\n{result}")
# No @skyvern.cached should be present since inner block had no actions
assert "@skyvern.cached" not in result
# create_or_update_script_block should be called for the for_loop only,
# NOT for the inner block
call_labels = [
call.kwargs.get("block_label") or call.args[4] for call in mock_create_script_block.call_args_list
]
assert "scrape_each_page" in call_labels
assert "extract_data" not in call_labels
@pytest.mark.asyncio
async def test_non_task_inner_blocks_are_skipped(self) -> None:
"""Inner blocks with block_type not in SCRIPT_TASK_BLOCKS should be skipped."""
from skyvern.core.script_generations.generate_script import generate_workflow_script_python_code
blocks = [
{
"block_type": "for_loop",
"label": "process_items",
"loop_variable_reference": "{{ items }}",
"loop_blocks": [
{
"block_type": "goto_url",
"label": "go_to_page",
"url": "https://example.com",
},
],
},
]
workflow = {
"workflow_id": "wf_test",
"title": "Test ForLoop Non-Task Inner",
"workflow_definition": {"parameters": []},
}
mock_create_script_block = AsyncMock()
with (
patch(
"skyvern.core.script_generations.generate_script.generate_workflow_parameters_schema",
new_callable=AsyncMock,
return_value=("", {}),
),
patch(
"skyvern.core.script_generations.generate_script.create_or_update_script_block",
mock_create_script_block,
),
):
result = await generate_workflow_script_python_code(
file_name="test_forloop_non_task.py",
workflow_run_request={"workflow_id": "wpid_test"},
workflow=workflow,
blocks=blocks,
actions_by_task={},
script_id="script_123",
script_revision_id="rev_123",
organization_id="org_123",
)
try:
ast.parse(result)
except SyntaxError as e:
pytest.fail(f"Generated script has SyntaxError: {e}\n\nGenerated code:\n{result}")
# No @skyvern.cached should be generated for non-task blocks
assert "@skyvern.cached" not in result
@pytest.mark.asyncio
async def test_inner_block_labels_in_processed_labels(self) -> None:
"""Inner block labels should be tracked to avoid duplication in unexecuted-branch preservation."""
from skyvern.core.script_generations.generate_script import generate_workflow_script_python_code
blocks = [
{
"block_type": "for_loop",
"label": "scrape_each_page",
"loop_variable_reference": "{{ urls }}",
"workflow_run_block_id": "wfrb_loop_1",
"loop_blocks": [
{
"block_type": "extraction",
"label": "extract_data",
"data_extraction_goal": "Get page content",
"task_id": "task_inner_1",
"workflow_run_block_id": "wfrb_inner_1",
},
],
},
]
actions_by_task = {
"task_inner_1": [
{
"action_type": "extract",
"action_id": "action_1",
"xpath": "//div[@id='content']",
"element_id": "elem_1",
"text": None,
"data_extraction_goal": "Get page content",
},
],
}
workflow = {
"workflow_id": "wf_test",
"title": "Test ForLoop Labels",
"workflow_definition": {"parameters": []},
}
# Mock a cached_block with the same label as the inner block to verify
# it doesn't get duplicated in the "preserve unexecuted branch" section
mock_cached_source = MagicMock()
mock_cached_source.code = "@skyvern.cached(cache_key='extract_data')\nasync def extract_data_fn(): pass"
mock_cached_source.run_signature = "await skyvern.extract(prompt='Get page content', label='extract_data')"
mock_cached_source.workflow_run_id = "wr_old"
mock_cached_source.workflow_run_block_id = "wfrb_old"
mock_cached_source.input_fields = None
mock_create_script_block = AsyncMock()
with (
patch(
"skyvern.core.script_generations.generate_script.generate_workflow_parameters_schema",
new_callable=AsyncMock,
return_value=("", {}),
),
patch(
"skyvern.core.script_generations.generate_script.create_or_update_script_block",
mock_create_script_block,
),
):
result = await generate_workflow_script_python_code(
file_name="test_forloop_labels.py",
workflow_run_request={"workflow_id": "wpid_test"},
workflow=workflow,
blocks=blocks,
actions_by_task=actions_by_task,
script_id="script_123",
script_revision_id="rev_123",
organization_id="org_123",
cached_blocks={"extract_data": mock_cached_source},
)
# Should compile fine
try:
ast.parse(result)
except SyntaxError as e:
pytest.fail(f"Generated script has SyntaxError: {e}\n\nGenerated code:\n{result}")
# The inner block should appear only once in the generated code (not duplicated
# by the "preserve unexecuted branch" section)
occurrences = result.count("@skyvern.cached")
# There should be exactly 1 cached function (the inner block), not 2
assert occurrences == 1, (
f"Expected 1 @skyvern.cached occurrence but found {occurrences}. "
f"Inner block may have been duplicated.\n\nGenerated code:\n{result}"
)