From 2e72c2380c2dcb9727f174f308fd6e2a22bd48d4 Mon Sep 17 00:00:00 2001 From: Aaron Perez Date: Tue, 7 Apr 2026 12:38:48 -0500 Subject: [PATCH] fix: nested for-loop script block persistence (SKY-8757) (#5404) --- .../script_generations/generate_script.py | 164 ++- .../transform_workflow_run.py | 257 ++-- skyvern/forge/sdk/db/repositories/scripts.py | 6 + skyvern/forge/sdk/workflow/service.py | 65 +- skyvern/services/workflow_script_service.py | 30 +- .../test_nested_forloop_script_generation.py | 1043 +++++++++++++++++ 6 files changed, 1476 insertions(+), 89 deletions(-) create mode 100644 tests/unit/test_nested_forloop_script_generation.py diff --git a/skyvern/core/script_generations/generate_script.py b/skyvern/core/script_generations/generate_script.py index 2310c289f..3e7a29535 100644 --- a/skyvern/core/script_generations/generate_script.py +++ b/skyvern/core/script_generations/generate_script.py @@ -9,6 +9,7 @@ from __future__ import annotations import hashlib import keyword import re +from collections import deque from dataclasses import dataclass from typing import Any @@ -537,7 +538,9 @@ def _render_value( ) -> cst.BaseExpression: """Create a prompt value with template rendering logic if needed.""" if not prompt_text: - return cst.SimpleString("") + # Delegate to _value so empty/None inputs produce a valid CST node + # (libcst rejects SimpleString("") because it lacks enclosing quotes). + return _value(prompt_text) if "{{" in prompt_text and "}}" in prompt_text: args = [cst.Arg(value=_value(prompt_text))] if data_variable_name: @@ -1052,10 +1055,16 @@ def _build_form_filling_block_fn( navigation_goal = block.get("navigation_goal") or "Fill out the form" - # Include page.goto(url) if the block has a URL, just like _build_block_fn does + # Include page.goto(url) if the block has a URL, just like _build_block_fn does. + # Templated URLs (e.g. {{ outer_loop.current_value.url }}) are wrapped in + # skyvern.render_template() so they resolve at runtime. goto_line = "" if block.get("url"): - goto_line = f" await page.goto({repr(block['url'])})\n" + url_str = block["url"] + if isinstance(url_str, str) and "{{" in url_str and "}}" in url_str: + goto_line = f" await page.goto(skyvern.render_template({repr(url_str)}))\n" + else: + goto_line = f" await page.goto({repr(url_str)})\n" func_code = ( f"async def {name}(page: SkyvernPage, context: RunContext):\n" @@ -1135,7 +1144,14 @@ def _build_block_fn( actions = _annotate_multi_field_totp_sequence(actions) if block.get("url"): - body_stmts.append(cst.parse_statement(f"await page.goto({repr(block['url'])})")) + # Use skyvern.render_template() when the URL contains a Jinja expression + # (e.g. {{ outer_page_loop.current_value.url }}) so it resolves at runtime + # against workflow_run_context.values populated by skyvern.loop(). + url_str = block["url"] + if isinstance(url_str, str) and "{{" in url_str and "}}" in url_str: + body_stmts.append(cst.parse_statement(f"await page.goto(skyvern.render_template({repr(url_str)}))")) + else: + body_stmts.append(cst.parse_statement(f"await page.goto({repr(url_str)})")) # For file_download blocks inside for-loops, generate a dynamic click that uses # per-iteration context instead of hardcoded xpath/prompt from iteration 0. @@ -1406,7 +1422,7 @@ def _build_extract_statement( args = [ cst.Arg( keyword=cst.Name("prompt"), - value=_value(block.get("data_extraction_goal", "")), + value=_render_value(block.get("data_extraction_goal", ""), data_variable_name), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -1414,6 +1430,8 @@ def _build_extract_statement( ), cst.Arg( keyword=cst.Name("schema"), + # data_schema is a dict/object, not a string template — _render_value only + # handles strings, so we intentionally keep _value here. value=_value(block.get("data_schema", "")), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, @@ -1421,6 +1439,20 @@ def _build_extract_statement( ), ), ] + # Emit url so the extraction block navigates to the right page on cache hit. + # Uses _render_value so Jinja refs like {{ outer_page_loop.current_value.url }} + # resolve at runtime from workflow_run_context.values (populated by skyvern.loop()). + if block.get("url"): + args.append( + cst.Arg( + keyword=cst.Name("url"), + value=_render_value(block.get("url", ""), data_variable_name), + whitespace_after_arg=cst.ParenthesizedWhitespace( + indent=True, + last_line=cst.SimpleWhitespace(INDENT), + ), + ) + ) if block.get("model"): args.append( cst.Arg( @@ -2440,7 +2472,14 @@ def __build_base_task_statement( if value_to_param and prompt: prompt_value = _build_parameterized_prompt_cst(prompt, value_to_param) if prompt_value is None: - prompt_value = _value(prompt) + if prompt: + # Use _render_value so Jinja refs (e.g. {{ current_value }}, + # {{ outer_loop.current_value.url }}) are resolved at runtime via + # skyvern.render_template() instead of emitted as Python literals. + prompt_value = _render_value(prompt, data_variable_name) + else: + # Preserve old behavior for None/empty prompts (emits `None` vs `""`). + prompt_value = _value(prompt) args = [ cst.Arg( @@ -2456,7 +2495,10 @@ def __build_base_task_statement( args.append( cst.Arg( keyword=cst.Name("url"), - value=_value(block.get("url", "")), + # Use _render_value so Jinja refs (e.g. {{ current_value }}, + # {{ outer_loop.current_value.url }}) resolve at runtime via + # skyvern.render_template() instead of being emitted as literals. + value=_render_value(block.get("url", ""), data_variable_name), whitespace_after_arg=cst.ParenthesizedWhitespace( indent=True, last_line=cst.SimpleWhitespace(INDENT), @@ -3030,8 +3072,86 @@ async def generate_workflow_script_python_code( # Inner blocks (e.g. extraction inside a loop) are nested in loop_blocks and # are NOT in the top-level blocks list, so they need separate processing here. # This follows the same pattern as task_v2 child block handling (lines 2704-2714). - for loop_block in for_loop_block.get("loop_blocks", []): - if loop_block.get("block_type") not in SCRIPT_TASK_BLOCKS: + # Uses a BFS queue to recursively handle nested for-loops (SKY-8757). + # Each queue entry is (block_dict, parent_forloop_label) so cache + # invalidation propagates from the correct parent at any depth. + loop_block_queue: deque[tuple[dict[str, Any], str]] = deque( + (lb, for_loop_label) for lb in for_loop_block.get("loop_blocks", []) + ) + while loop_block_queue: + loop_block, parent_fl_label = loop_block_queue.popleft() + loop_block_type = loop_block.get("block_type") + + # block_type is a string here (from dict.get on model_dump output), + # not a BlockType enum — unlike transform_workflow_run.py which + # works with ORM objects. Both compare correctly to string literals. + # + # Nested for-loop: create script_block for the inner for-loop itself, + # then push its children onto the queue for processing. + # NOTE: Do NOT call append_block_code() for nested for_loop blocks + # (same as top-level for_loops) — they produce bare `async for` + # statements that cause SyntaxError at module level. + if loop_block_type == "for_loop": + nested_label = loop_block.get("label") or f"for_loop_{loop_block.get('workflow_run_block_id')}" + + cached_nested = cached_blocks.get(nested_label) + # Force rebuild when the nested label OR its immediate parent + # for-loop is marked for regeneration (invalidation propagates + # down at every nesting depth, not just from the top-level). + use_nested_cached = ( + cached_nested is not None + and nested_label not in updated_block_labels + and parent_fl_label not in updated_block_labels + ) + + nested_wrbi = loop_block.get("workflow_run_block_id") + nested_wri = loop_block.get("workflow_run_id") or run_id + + # use_nested_cached already guarantees cached_nested is not None; + # the explicit check is retained only for mypy type narrowing. + if ( + use_nested_cached + and cached_nested is not None + and cached_nested.code + and cached_nested.run_signature + ): + nested_code = cached_nested.code + nested_sig = cached_nested.run_signature + nested_wrbi = cached_nested.workflow_run_block_id + nested_wri = cached_nested.workflow_run_id + else: + # No usable cache entry (missing, incomplete, or needs update) + # — rebuild from current run data. Mark this label as updated + # so invalidation cascades to deeper descendants. + updated_block_labels.add(nested_label) + nested_stmt = _build_for_loop_statement(nested_label, loop_block) + temp_mod = cst.Module(body=[nested_stmt]) + nested_code = temp_mod.code + nested_sig = nested_code.strip() + + if script_id and script_revision_id and organization_id: + ok = await create_or_update_script_block( + block_code=nested_code, + script_revision_id=script_revision_id, + script_id=script_id, + organization_id=organization_id, + block_label=nested_label, + update=pending, + run_signature=nested_sig, + workflow_run_id=nested_wri, + workflow_run_block_id=nested_wrbi, + input_fields=None, + ) + if ok: + blocks_created += 1 + else: + blocks_failed += 1 + + # Push nested for-loop's children with this loop as their parent + loop_block_queue.extend((child, nested_label) for child in loop_block.get("loop_blocks", [])) + continue + + if loop_block_type not in SCRIPT_TASK_BLOCKS: continue inner_label = ( @@ -3051,7 +3171,11 @@ async def generate_workflow_script_python_code( else: inner_actions = actions_by_task.get(loop_block.get("task_id", ""), []) if not inner_actions: - continue # No actions from agent run = can't generate cached function + # No actions from agent run = can't generate cached function. + # No script_block row is created; the block will be cached on + # a future run when actions become available. This is intentional + # — generating a stub would produce broken code. + continue inner_fn_def = _build_block_fn( loop_block, @@ -3160,10 +3284,22 @@ async def generate_workflow_script_python_code( for flb in for_loop_blocks: label = flb.get("label") or f"for_loop_{flb.get('workflow_run_block_id')}" processed_labels.add(label) - # Also track inner block labels to prevent duplication in the - # "preserve unexecuted branch" section below - for lb in flb.get("loop_blocks", []): - inner_lbl = lb.get("label") or lb.get("title") + # Recursively track all inner block labels (including nested for-loops) + # to prevent duplication in the "preserve unexecuted branch" section below. + # Use the same label derivation as the main code generation loop to ensure + # labels match (e.g., for_loop blocks without explicit labels get the + # "for_loop_{workflow_run_block_id}" fallback). + inner_queue: deque[dict[str, Any]] = deque(flb.get("loop_blocks", [])) + while inner_queue: + lb = inner_queue.popleft() + lb_type = lb.get("block_type") + if lb_type == "for_loop": + inner_lbl = lb.get("label") or f"for_loop_{lb.get('workflow_run_block_id')}" + inner_queue.extend(lb.get("loop_blocks", [])) + else: + # Use the same 3-fallback chain as the main generation loop + # (label → title → block_{wrb_id}) so labels always match. + inner_lbl = lb.get("label") or lb.get("title") or f"block_{lb.get('workflow_run_block_id')}" if inner_lbl: processed_labels.add(inner_lbl) if adaptive_caching: diff --git a/skyvern/core/script_generations/transform_workflow_run.py b/skyvern/core/script_generations/transform_workflow_run.py index fe5473569..4c8e99ce3 100644 --- a/skyvern/core/script_generations/transform_workflow_run.py +++ b/skyvern/core/script_generations/transform_workflow_run.py @@ -47,6 +47,182 @@ def _process_action_for_block( return action_dump +def _build_children_by_parent(workflow_run_blocks: list[Any]) -> dict[str | None, list[Any]]: + """Build a parent_id -> [child_blocks] mapping for O(1) lookups.""" + result: dict[str | None, list[Any]] = defaultdict(list) + for block in workflow_run_blocks: + result[block.parent_workflow_run_block_id].append(block) + return result + + +def _count_descendant_actions( + root_wrb_id: str, + children_by_parent: dict[str | None, list[Any]], + actions_by_task_id: dict[str, list[Action]], +) -> int: + """Count total actions across all descendants of a run block (DFS).""" + total = 0 + stack = list(children_by_parent.get(root_wrb_id, [])) + while stack: + node = stack.pop() + if node.task_id: + total += len(actions_by_task_id.get(node.task_id, [])) + stack.extend(children_by_parent.get(node.workflow_run_block_id, [])) + return total + + +def _process_forloop_children( + forloop_run_block: Any, + loop_blocks_def: list[dict[str, Any]], + children_by_parent: dict[str | None, list[Any]], + tasks_by_id: dict[str, Any], + actions_by_task_id: dict[str, list[Action]], + actions_by_task: dict[str, list[dict[str, Any]]], +) -> list[dict[str, Any]]: + """Process ForLoop child blocks, merging run data into definition blocks. + + Recursively handles nested for-loops so deeply nested blocks (e.g., extraction + inside a double-nested for-loop) get their task_id and actions merged. + """ + # Child blocks have parent_workflow_run_block_id pointing to the ForLoop's workflow_run_block_id. + # When the outer loop iterates N times, there are N child run blocks per label. + # Pick the best candidate per label: prefer the block that has a task_id (for task + # blocks) or the most grandchildren (for nested for-loops), so we don't lose data + # if the last iteration happened to be empty. + child_run_blocks = children_by_parent.get(forloop_run_block.workflow_run_block_id, []) + child_run_blocks_by_label: dict[str, Any] = {} + for b in child_run_blocks: + if not b.label: + continue + existing = child_run_blocks_by_label.get(b.label) + if existing is None: + child_run_blocks_by_label[b.label] = b + elif b.block_type in SCRIPT_TASK_BLOCKS: + # Prefer the iteration with the richest execution evidence: + # 1. has task_id beats no task_id + # 2. when both have task_id, prefer more actions + # 3. on action tie, prefer completed status over failed/other + if b.task_id and not existing.task_id: + child_run_blocks_by_label[b.label] = b + elif b.task_id and existing.task_id: + b_actions = len(actions_by_task_id.get(b.task_id, [])) + existing_actions = len(actions_by_task_id.get(existing.task_id, [])) + if b_actions > existing_actions: + child_run_blocks_by_label[b.label] = b + elif b_actions == existing_actions: + # Break tie by status: completed > everything else. + # Use str() in case the ORM returns a Status enum. + b_completed = str(b.status) == "completed" + existing_completed = str(existing.status) == "completed" + if b_completed and not existing_completed: + child_run_blocks_by_label[b.label] = b + elif b.block_type == BlockType.FOR_LOOP: + # Prefer the nested for-loop iteration that produced grandchildren. + # On ties, break by total deep-descendant action count so the + # iteration with usable actions wins even at 3+ nesting levels. + existing_children = children_by_parent.get(existing.workflow_run_block_id, []) + b_children_list = children_by_parent.get(b.workflow_run_block_id, []) + if len(b_children_list) > len(existing_children): + child_run_blocks_by_label[b.label] = b + elif len(b_children_list) == len(existing_children) and b_children_list: + b_desc = _count_descendant_actions(b.workflow_run_block_id, children_by_parent, actions_by_task_id) + existing_desc = _count_descendant_actions( + existing.workflow_run_block_id, children_by_parent, actions_by_task_id + ) + if b_desc > existing_desc: + child_run_blocks_by_label[b.label] = b + + unlabeled_children = [b for b in child_run_blocks if not b.label] + if unlabeled_children: + LOG.warning( + "ForLoop has child blocks without labels - these will not be matched to loop_blocks definitions", + forloop_label=forloop_run_block.label, + unlabeled_count=len(unlabeled_children), + ) + + if loop_blocks_def and not child_run_blocks: + LOG.warning( + "ForLoop block has loop_blocks definitions but no child run blocks found", + forloop_label=forloop_run_block.label, + workflow_run_block_id=forloop_run_block.workflow_run_block_id, + loop_blocks_count=len(loop_blocks_def), + ) + + updated_loop_blocks: list[dict[str, Any]] = [] + matched_count = 0 + nested_forloop_count = 0 + for loop_block_def in loop_blocks_def: + # Shallow copy: safe because we only replace top-level keys (update(), + # ["loop_blocks"] = ...). Do not mutate nested dicts in-place. + if isinstance(loop_block_def, dict): + loop_block_dump = loop_block_def.copy() + elif hasattr(loop_block_def, "model_dump"): + loop_block_dump = loop_block_def.model_dump() + else: + loop_block_dump = dict(loop_block_def) + loop_block_label = loop_block_dump.get("label") + + child_run_block = child_run_blocks_by_label.get(loop_block_label) if loop_block_label else None + + if child_run_block and child_run_block.block_type in SCRIPT_TASK_BLOCKS and child_run_block.task_id: + matched_count += 1 + task = tasks_by_id.get(child_run_block.task_id) + if task: + task_dump = task.model_dump() + loop_block_dump.update({k: v for k, v in task_dump.items() if k not in loop_block_dump}) + loop_block_dump.update( + { + "task_id": child_run_block.task_id, + "status": child_run_block.status, + "output": child_run_block.output, + } + ) + + actions = actions_by_task_id.get(child_run_block.task_id, []) + action_dumps = [_process_action_for_block(action, loop_block_dump) for action in actions] + actions_by_task[child_run_block.task_id] = action_dumps + else: + LOG.warning( + "Task not found for ForLoop child block", + task_id=child_run_block.task_id, + forloop_label=forloop_run_block.label, + ) + + # Recursively process nested for-loops so their inner blocks + # also get task_id and actions merged (SKY-8757). + if child_run_block and child_run_block.block_type == BlockType.FOR_LOOP: + nested_forloop_count += 1 + inner_loop_blocks = loop_block_dump.get("loop_blocks", []) + if inner_loop_blocks: + loop_block_dump["loop_blocks"] = _process_forloop_children( + forloop_run_block=child_run_block, + loop_blocks_def=inner_loop_blocks, + children_by_parent=children_by_parent, + tasks_by_id=tasks_by_id, + actions_by_task_id=actions_by_task_id, + actions_by_task=actions_by_task, + ) + # Always set run block metadata for the inner for-loop, even when + # loop_blocks is empty. generate_script.py needs workflow_run_block_id + # for script_block creation and label fallback derivation. + loop_block_dump["workflow_run_block_id"] = child_run_block.workflow_run_block_id + loop_block_dump["workflow_run_id"] = child_run_block.workflow_run_id + + updated_loop_blocks.append(loop_block_dump) + + if matched_count or nested_forloop_count: + LOG.info( + "ForLoop child block processing summary", + forloop_label=forloop_run_block.label, + definition_count=len(loop_blocks_def), + run_block_count=len(child_run_blocks), + task_blocks_matched=matched_count, + nested_forloops=nested_forloop_count, + ) + + return updated_loop_blocks + + async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organization_id: str) -> CodeGenInput: # get the workflow run request workflow_run_resp = await workflow_service.get_workflow_run_response( @@ -70,7 +246,10 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz # get the original workflow definition blocks (with templated information) workflow_definition_blocks = workflow.workflow_definition.blocks - # get workflow run blocks for task execution data + # Get workflow run blocks for task execution data. + # IMPORTANT: This returns ALL descendant blocks (including for-loop children + # and deeply nested blocks), not just top-level blocks. The batch task_id + # collection below and _process_forloop_children both depend on this. workflow_run_blocks = await app.DATABASE.observer.get_workflow_run_blocks( workflow_run_id=workflow_run_id, organization_id=organization_id ) @@ -119,6 +298,9 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz actions_by_task: dict[str, list[dict[str, Any]]] = {} task_v2_child_blocks = {} + # Pre-build parent -> children mapping for O(1) lookups in for-loop processing + children_by_parent = _build_children_by_parent(workflow_run_blocks) + # Loop through workflow run blocks and match to original definition blocks by label for definition_block in workflow_definition_blocks: # if definition_block.block_type == BlockType.TaskV2: @@ -202,71 +384,14 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz ) if run_block.block_type == BlockType.FOR_LOOP: - # Process ForLoop child blocks to get actions for task blocks inside the loop - # Child blocks have parent_workflow_run_block_id pointing to the ForLoop's workflow_run_block_id - child_run_blocks = [ - b for b in workflow_run_blocks if b.parent_workflow_run_block_id == run_block.workflow_run_block_id - ] - # Create mapping of child run blocks by label - child_run_blocks_by_label = {b.label: b for b in child_run_blocks if b.label} - - # Warn about any unlabeled child blocks that won't be matched - unlabeled_children = [b for b in child_run_blocks if not b.label] - if unlabeled_children: - LOG.warning( - "ForLoop has child blocks without labels - these will not be matched to loop_blocks definitions", - forloop_label=run_block.label, - unlabeled_count=len(unlabeled_children), - ) - - # Get loop_blocks from the definition block - loop_blocks = final_dump.get("loop_blocks", []) - - if loop_blocks and not child_run_blocks: - LOG.warning( - "ForLoop block has loop_blocks definitions but no child run blocks found", - forloop_label=run_block.label, - workflow_run_block_id=run_block.workflow_run_block_id, - loop_blocks_count=len(loop_blocks), - ) - updated_loop_blocks = [] - - for loop_block_def in loop_blocks: - loop_block_dump = loop_block_def.copy() if isinstance(loop_block_def, dict) else loop_block_def - loop_block_label = loop_block_dump.get("label") - - # Find matching child run block - child_run_block = child_run_blocks_by_label.get(loop_block_label) if loop_block_label else None - - if child_run_block and child_run_block.block_type in SCRIPT_TASK_BLOCKS and child_run_block.task_id: - # Use pre-fetched task data (batch fetched) - task = tasks_by_id.get(child_run_block.task_id) - if task: - task_dump = task.model_dump() - loop_block_dump.update({k: v for k, v in task_dump.items() if k not in loop_block_dump}) - loop_block_dump.update( - { - "task_id": child_run_block.task_id, - "status": child_run_block.status, - "output": child_run_block.output, - } - ) - - # Use pre-fetched actions (batch fetched) - actions = actions_by_task_id.get(child_run_block.task_id, []) - action_dumps = [_process_action_for_block(action, loop_block_dump) for action in actions] - actions_by_task[child_run_block.task_id] = action_dumps - else: - LOG.warning( - "Task not found for ForLoop child block", - task_id=child_run_block.task_id, - forloop_label=run_block.label, - ) - - updated_loop_blocks.append(loop_block_dump) - - # Update final_dump with the processed loop_blocks - final_dump["loop_blocks"] = updated_loop_blocks + final_dump["loop_blocks"] = _process_forloop_children( + forloop_run_block=run_block, + loop_blocks_def=final_dump.get("loop_blocks", []), + children_by_parent=children_by_parent, + tasks_by_id=tasks_by_id, + actions_by_task_id=actions_by_task_id, + actions_by_task=actions_by_task, + ) final_dump["workflow_run_id"] = workflow_run_id if run_block: diff --git a/skyvern/forge/sdk/db/repositories/scripts.py b/skyvern/forge/sdk/db/repositories/scripts.py index b97cf9725..0f1b82b87 100644 --- a/skyvern/forge/sdk/db/repositories/scripts.py +++ b/skyvern/forge/sdk/db/repositories/scripts.py @@ -555,6 +555,12 @@ class ScriptsRepository(BaseRepository): WorkflowScriptModel.workflow_permanent_id == workflow_permanent_id, WorkflowScriptModel.cache_key_value == cache_key_value, WorkflowScriptModel.deleted_at.is_(None), + # Exclude soft-deleted Script revisions so an empty/failed revision + # left behind by a crashed regeneration cannot be returned as the + # "latest" version for this cache key. Without this filter, runs + # observe has_script=True with script_block_count=0 (empty_blocks_detected + # regression tracked under SKY-8757). + ScriptModel.deleted_at.is_(None), ) ) diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index ef319100d..768c5099c 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -1125,6 +1125,14 @@ class WorkflowService: finalize=True, # Force regeneration to ensure field mappings have complete action data has_conditionals=has_conditionals, ) + else: + LOG.info( + "Skipping post-run script generation due to run status", + workflow_run_id=workflow_run_id, + workflow_permanent_id=workflow.workflow_permanent_id, + pre_finally_status=pre_finally_status, + blocks_to_update_count=len(blocks_to_update), + ) # Trigger AI Script Reviewer for adaptive caching workflows # Include terminated and failed runs — the reviewer filters to only @@ -2441,7 +2449,9 @@ class WorkflowService: "For-loop child blocks marked for caching", parent_label=block.label, child_labels=new_labels, + child_count=len(new_labels), workflow_run_id=workflow_run_id, + workflow_permanent_id=workflow.workflow_permanent_id, ) workflow_run, should_stop = await self._handle_block_result_status( @@ -5052,7 +5062,8 @@ class WorkflowService: block_labels=block_labels, code_gen=code_gen, workflow_run_id=workflow_run.workflow_run_id, - blocks_to_update=list(blocks_to_update), + workflow_permanent_id=workflow.workflow_permanent_id, + blocks_to_update_count=len(blocks_to_update), ) if block_labels and not code_gen: @@ -5210,17 +5221,27 @@ class WorkflowService: updated_block_labels=blocks_to_update, ) - # If generation failed (e.g. syntax error), clean up the empty script row - # to avoid orphaned versions that skip version numbers on next regeneration. + # If generation failed (e.g. syntax error, S3/DB contention), clean up + # the empty script row to avoid orphaned versions that skip version + # numbers AND to prevent later runs from finding a published revision + # with zero blocks (the empty_blocks_detected regression from SKY-8757). + # Check BOTH files and blocks — a revision with main.py but zero + # script_block rows still fails code-mode execution. script_files = await app.DATABASE.scripts.get_script_files( script_revision_id=regenerated_script.script_revision_id, organization_id=workflow.organization_id, ) - if not script_files: + script_blocks = await app.DATABASE.scripts.get_script_blocks_by_script_revision_id( + script_revision_id=regenerated_script.script_revision_id, + organization_id=workflow.organization_id, + ) + if not script_files or not script_blocks: LOG.warning( - "Script generation produced no files, soft-deleting empty version", + "Script generation produced no files or no blocks, soft-deleting empty version", script_id=regenerated_script.script_id, version=regenerated_script.version, + script_file_count=len(script_files), + script_block_count=len(script_blocks), ) await app.DATABASE.scripts.soft_delete_script_by_revision( script_revision_id=regenerated_script.script_revision_id, @@ -5271,6 +5292,14 @@ class WorkflowService: await _regenerate_script() return + LOG.debug( + "Creating new cached script (first run for this cache key)", + workflow_permanent_id=workflow.workflow_permanent_id, + workflow_run_id=workflow_run.workflow_run_id, + cache_key_value=rendered_cache_key_value, + blocks_to_update_count=len(blocks_to_update), + ) + created_script = await app.DATABASE.scripts.create_script( organization_id=workflow.organization_id, run_id=workflow_run.workflow_run_id, @@ -5284,6 +5313,32 @@ class WorkflowService: cached_script=None, updated_block_labels=None, ) + + # Mirror the regeneration path's post-write guard: if this first-time + # generation produced no files or no blocks, soft-delete the empty revision + # so it can't be observed by subsequent runs. (SKY-8757 follow-up.) + script_files = await app.DATABASE.scripts.get_script_files( + script_revision_id=created_script.script_revision_id, + organization_id=workflow.organization_id, + ) + script_blocks = await app.DATABASE.scripts.get_script_blocks_by_script_revision_id( + script_revision_id=created_script.script_revision_id, + organization_id=workflow.organization_id, + ) + if not script_files or not script_blocks: + LOG.warning( + "First-time script generation produced no files or no blocks, soft-deleting empty version", + script_id=created_script.script_id, + version=created_script.version, + script_file_count=len(script_files), + script_block_count=len(script_blocks), + ) + await app.DATABASE.scripts.soft_delete_script_by_revision( + script_revision_id=created_script.script_revision_id, + organization_id=workflow.organization_id, + ) + return + aio_task_primary_key = f"{created_script.script_id}_{created_script.version}" if aio_task_primary_key in app.ARTIFACT_MANAGER.upload_aiotasks_map: aio_tasks = app.ARTIFACT_MANAGER.upload_aiotasks_map[aio_task_primary_key] diff --git a/skyvern/services/workflow_script_service.py b/skyvern/services/workflow_script_service.py index 1ad33f6fd..0e7e62ba9 100644 --- a/skyvern/services/workflow_script_service.py +++ b/skyvern/services/workflow_script_service.py @@ -3,6 +3,7 @@ import hashlib import re import time import urllib.parse +from collections import deque from typing import Any, NamedTuple import structlog @@ -554,10 +555,24 @@ async def generate_workflow_script( updated_block_labels.update(missing_labels) updated_block_labels.add(settings.WORKFLOW_START_BLOCK_LABEL) + # Count all descendant blocks inside top-level for-loops (task, extraction, + # nested for-loops, etc). Does not include blocks inside task_v2 or conditionals. + forloop_descendant_count = 0 + for blk in codegen_input.workflow_blocks: + if blk.get("block_type") == "for_loop": + q: deque[dict[str, Any]] = deque(blk.get("loop_blocks", [])) + while q: + inner = q.popleft() + forloop_descendant_count += 1 + if inner.get("block_type") == "for_loop": + q.extend(inner.get("loop_blocks", [])) + LOG.info( "Script generation block analysis", workflow_run_id=workflow_run.workflow_run_id, - total_blocks=len(block_labels), + workflow_permanent_id=workflow.workflow_permanent_id, + total_top_level_blocks=len(block_labels), + forloop_descendant_blocks=forloop_descendant_count, cached_blocks=len(cached_block_sources), missing_blocks=len(missing_labels), blocks_to_regenerate=len(updated_block_labels), @@ -584,6 +599,8 @@ async def generate_workflow_script( generation_duration_ms = (time.monotonic() - generation_start) * 1000 LOG.error( "Failed to generate workflow script source", + workflow_run_id=workflow_run.workflow_run_id, + workflow_permanent_id=workflow.workflow_permanent_id, duration_ms=round(generation_duration_ms, 1), exc_info=True, ) @@ -598,6 +615,7 @@ async def generate_workflow_script( "Script generation completed", workflow_run_id=workflow_run.workflow_run_id, workflow_id=workflow.workflow_id, + workflow_permanent_id=workflow.workflow_permanent_id, cache_key_value=rendered_cache_key_value, duration_ms=round(generation_duration_ms, 1), script_size_bytes=len(python_src.encode("utf-8")), @@ -614,12 +632,16 @@ async def generate_workflow_script( blocks_failed=blocks_failed, ) - # Guard: if every block creation failed, do not persist a zero-block script - if blocks_created == 0 and blocks_failed > 0: + # Guard: never persist a zero-block script, regardless of whether the blocks + # failed or were silently skipped (e.g. generate_script.py:2901 fast-skip for + # blocks with no actions and no task_id). Publishing an empty revision is the + # `empty_blocks_detected=True` regression tracked under SKY-8757. + if blocks_created == 0: LOG.error( - "All script block creations failed — skipping WorkflowScript creation", + "Script generation produced zero blocks — skipping WorkflowScript creation", workflow_permanent_id=workflow.workflow_permanent_id, script_id=script.script_id, + script_revision_id=script.script_revision_id, blocks_failed=blocks_failed, ) return diff --git a/tests/unit/test_nested_forloop_script_generation.py b/tests/unit/test_nested_forloop_script_generation.py new file mode 100644 index 000000000..31b791231 --- /dev/null +++ b/tests/unit/test_nested_forloop_script_generation.py @@ -0,0 +1,1043 @@ +"""Tests for nested for-loop script generation and transformation (SKY-8757). + +Covers: +1. _process_forloop_children in transform_workflow_run.py — recursive merging + of task data for nested for-loop children. +2. generate_workflow_script_python_code in generate_script.py — code generation + for nested for-loop inner blocks (script_block creation + function bodies). +""" + +import ast +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from skyvern.core.script_generations.transform_workflow_run import ( + _build_children_by_parent, + _process_forloop_children, +) +from skyvern.schemas.workflows import BlockType +from skyvern.webeye.actions.actions import Action + + +# --------------------------------------------------------------------------- +# Part 1: _process_forloop_children tests +# --------------------------------------------------------------------------- +class TestProcessForloopChildren: + """Test recursive merging of for-loop children in transform_workflow_run.""" + + def test_single_level_merges_task_data(self) -> None: + """Direct task children get task_id and actions merged.""" + forloop_run_block = MagicMock() + forloop_run_block.workflow_run_block_id = "wfrb_outer" + forloop_run_block.label = "outer_loop" + + child_run_block = MagicMock() + child_run_block.workflow_run_block_id = "wfrb_child" + child_run_block.parent_workflow_run_block_id = "wfrb_outer" + child_run_block.block_type = "extraction" + child_run_block.label = "extract_data" + child_run_block.task_id = "task_1" + child_run_block.status = "completed" + child_run_block.output = {"data": "extracted"} + child_run_block.workflow_run_id = "wr_1" + + mock_task = MagicMock() + mock_task.model_dump.return_value = {"task_id": "task_1", "navigation_goal": "Extract"} + + mock_action = MagicMock(spec=Action) + mock_action.model_dump.return_value = {"action_type": "extract", "action_id": "a1"} + mock_action.get_xpath.return_value = "//div" + mock_action.has_mini_agent = False + mock_action.action_type = "extract" + mock_action.task_id = "task_1" + + loop_blocks_def = [ + {"block_type": "extraction", "label": "extract_data", "data_extraction_goal": "Extract"}, + ] + + actions_by_task: dict[str, list[dict[str, Any]]] = {} + all_blocks = [forloop_run_block, child_run_block] + + result = _process_forloop_children( + forloop_run_block=forloop_run_block, + loop_blocks_def=loop_blocks_def, + children_by_parent=_build_children_by_parent(all_blocks), + tasks_by_id={"task_1": mock_task}, + actions_by_task_id={"task_1": [mock_action]}, + actions_by_task=actions_by_task, + ) + + assert len(result) == 1 + assert result[0]["task_id"] == "task_1" + assert result[0]["status"] == "completed" + assert "task_1" in actions_by_task + + def test_nested_forloop_recurses_into_children(self) -> None: + """Nested for-loop's children should get task data merged recursively.""" + outer_run = MagicMock() + outer_run.workflow_run_block_id = "wfrb_outer" + outer_run.label = "outer_loop" + + inner_run = MagicMock() + inner_run.workflow_run_block_id = "wfrb_inner" + inner_run.parent_workflow_run_block_id = "wfrb_outer" + inner_run.block_type = BlockType.FOR_LOOP + inner_run.label = "inner_loop" + inner_run.task_id = None + inner_run.workflow_run_id = "wr_1" + + grandchild_run = MagicMock() + grandchild_run.workflow_run_block_id = "wfrb_grandchild" + grandchild_run.parent_workflow_run_block_id = "wfrb_inner" + grandchild_run.block_type = "extraction" + grandchild_run.label = "deep_extract" + grandchild_run.task_id = "task_deep" + grandchild_run.status = "completed" + grandchild_run.output = {"deep": True} + grandchild_run.workflow_run_id = "wr_1" + + mock_task = MagicMock() + mock_task.model_dump.return_value = {"task_id": "task_deep"} + + mock_action = MagicMock(spec=Action) + mock_action.model_dump.return_value = {"action_type": "extract"} + mock_action.get_xpath.return_value = "//span" + mock_action.has_mini_agent = False + mock_action.action_type = "extract" + mock_action.task_id = "task_deep" + + loop_blocks_def = [ + { + "block_type": "for_loop", + "label": "inner_loop", + "loop_blocks": [ + {"block_type": "extraction", "label": "deep_extract", "data_extraction_goal": "Deep extract"}, + ], + }, + ] + + all_run_blocks = [outer_run, inner_run, grandchild_run] + actions_by_task: dict[str, list[dict[str, Any]]] = {} + + result = _process_forloop_children( + forloop_run_block=outer_run, + loop_blocks_def=loop_blocks_def, + children_by_parent=_build_children_by_parent(all_run_blocks), + tasks_by_id={"task_deep": mock_task}, + actions_by_task_id={"task_deep": [mock_action]}, + actions_by_task=actions_by_task, + ) + + assert len(result) == 1 + inner_loop = result[0] + assert inner_loop["block_type"] == "for_loop" + assert inner_loop["workflow_run_block_id"] == "wfrb_inner" + + # Verify the grandchild got task data merged + inner_children = inner_loop.get("loop_blocks", []) + assert len(inner_children) == 1 + assert inner_children[0]["task_id"] == "task_deep" + assert "task_deep" in actions_by_task + + def test_multi_iteration_picks_best_task_block(self) -> None: + """When the outer loop iterates multiple times, the run block with task_id should win.""" + outer_run = MagicMock() + outer_run.workflow_run_block_id = "wfrb_outer" + outer_run.parent_workflow_run_block_id = None + outer_run.label = "outer_loop" + + # Iteration 1: has task_id (good) + child_iter1 = MagicMock() + child_iter1.workflow_run_block_id = "wfrb_child_iter1" + child_iter1.parent_workflow_run_block_id = "wfrb_outer" + child_iter1.block_type = "extraction" + child_iter1.label = "extract_data" + child_iter1.task_id = "task_good" + child_iter1.status = "completed" + child_iter1.output = {"data": True} + child_iter1.workflow_run_id = "wr_1" + + # Iteration 2: no task_id (empty iteration) + child_iter2 = MagicMock() + child_iter2.workflow_run_block_id = "wfrb_child_iter2" + child_iter2.parent_workflow_run_block_id = "wfrb_outer" + child_iter2.block_type = "extraction" + child_iter2.label = "extract_data" + child_iter2.task_id = None + child_iter2.status = None + child_iter2.output = None + child_iter2.workflow_run_id = "wr_1" + + mock_task = MagicMock() + mock_task.model_dump.return_value = {"task_id": "task_good"} + + mock_action = MagicMock(spec=Action) + mock_action.model_dump.return_value = {"action_type": "extract"} + mock_action.get_xpath.return_value = "//div" + mock_action.has_mini_agent = False + mock_action.action_type = "extract" + mock_action.task_id = "task_good" + + loop_blocks_def = [ + {"block_type": "extraction", "label": "extract_data", "data_extraction_goal": "Extract"}, + ] + + actions_by_task: dict[str, list[dict[str, Any]]] = {} + # child_iter2 comes after child_iter1 — old code would keep iter2 (no task_id) + all_blocks = [outer_run, child_iter1, child_iter2] + + result = _process_forloop_children( + forloop_run_block=outer_run, + loop_blocks_def=loop_blocks_def, + children_by_parent=_build_children_by_parent(all_blocks), + tasks_by_id={"task_good": mock_task}, + actions_by_task_id={"task_good": [mock_action]}, + actions_by_task=actions_by_task, + ) + + assert len(result) == 1 + # Should pick the block with task_id, not the empty one + assert result[0]["task_id"] == "task_good" + assert "task_good" in actions_by_task + + def test_multi_iteration_prefers_richer_actions(self) -> None: + """When both iterations have task_id, prefer the one with more actions.""" + outer_run = MagicMock() + outer_run.workflow_run_block_id = "wfrb_outer" + outer_run.parent_workflow_run_block_id = None + outer_run.label = "outer_loop" + + # Iteration 1: has task_id but only 1 action (partial) + child_iter1 = MagicMock() + child_iter1.workflow_run_block_id = "wfrb_child_iter1" + child_iter1.parent_workflow_run_block_id = "wfrb_outer" + child_iter1.block_type = "extraction" + child_iter1.label = "extract_data" + child_iter1.task_id = "task_partial" + child_iter1.status = "completed" + child_iter1.output = {} + child_iter1.workflow_run_id = "wr_1" + + # Iteration 2: has task_id with 3 actions (richer) + child_iter2 = MagicMock() + child_iter2.workflow_run_block_id = "wfrb_child_iter2" + child_iter2.parent_workflow_run_block_id = "wfrb_outer" + child_iter2.block_type = "extraction" + child_iter2.label = "extract_data" + child_iter2.task_id = "task_rich" + child_iter2.status = "completed" + child_iter2.output = {"data": "full"} + child_iter2.workflow_run_id = "wr_1" + + mock_task_partial = MagicMock() + mock_task_partial.model_dump.return_value = {"task_id": "task_partial"} + mock_task_rich = MagicMock() + mock_task_rich.model_dump.return_value = {"task_id": "task_rich"} + + def _make_action(task_id: str) -> MagicMock: + a = MagicMock(spec=Action) + a.model_dump.return_value = {"action_type": "extract"} + a.get_xpath.return_value = "//div" + a.has_mini_agent = False + a.action_type = "extract" + a.task_id = task_id + return a + + loop_blocks_def = [ + {"block_type": "extraction", "label": "extract_data", "data_extraction_goal": "Extract"}, + ] + + actions_by_task: dict[str, list[dict[str, Any]]] = {} + all_blocks = [outer_run, child_iter1, child_iter2] + + result = _process_forloop_children( + forloop_run_block=outer_run, + loop_blocks_def=loop_blocks_def, + children_by_parent=_build_children_by_parent(all_blocks), + tasks_by_id={"task_partial": mock_task_partial, "task_rich": mock_task_rich}, + actions_by_task_id={ + "task_partial": [_make_action("task_partial")], + "task_rich": [_make_action("task_rich"), _make_action("task_rich"), _make_action("task_rich")], + }, + actions_by_task=actions_by_task, + ) + + assert len(result) == 1 + # Should pick task_rich (3 actions) over task_partial (1 action) + assert result[0]["task_id"] == "task_rich" + assert "task_rich" in actions_by_task + + def test_multi_iteration_nested_forloop_picks_richest(self) -> None: + """When a nested for-loop has multiple iterations, pick the one with most grandchildren.""" + outer_run = MagicMock() + outer_run.workflow_run_block_id = "wfrb_outer" + outer_run.parent_workflow_run_block_id = None + outer_run.label = "outer_loop" + + # Iteration 1 of inner loop: has grandchildren + inner_iter1 = MagicMock() + inner_iter1.workflow_run_block_id = "wfrb_inner_iter1" + inner_iter1.parent_workflow_run_block_id = "wfrb_outer" + inner_iter1.block_type = BlockType.FOR_LOOP + inner_iter1.label = "inner_loop" + inner_iter1.task_id = None + inner_iter1.workflow_run_id = "wr_1" + + grandchild = MagicMock() + grandchild.workflow_run_block_id = "wfrb_grandchild" + grandchild.parent_workflow_run_block_id = "wfrb_inner_iter1" + grandchild.block_type = "extraction" + grandchild.label = "deep_extract" + grandchild.task_id = "task_deep" + grandchild.status = "completed" + grandchild.output = {"deep": True} + grandchild.workflow_run_id = "wr_1" + + # Iteration 2 of inner loop: empty (no grandchildren) + inner_iter2 = MagicMock() + inner_iter2.workflow_run_block_id = "wfrb_inner_iter2" + inner_iter2.parent_workflow_run_block_id = "wfrb_outer" + inner_iter2.block_type = BlockType.FOR_LOOP + inner_iter2.label = "inner_loop" + inner_iter2.task_id = None + inner_iter2.workflow_run_id = "wr_1" + + mock_task = MagicMock() + mock_task.model_dump.return_value = {"task_id": "task_deep"} + + mock_action = MagicMock(spec=Action) + mock_action.model_dump.return_value = {"action_type": "extract"} + mock_action.get_xpath.return_value = "//span" + mock_action.has_mini_agent = False + mock_action.action_type = "extract" + mock_action.task_id = "task_deep" + + loop_blocks_def = [ + { + "block_type": "for_loop", + "label": "inner_loop", + "loop_blocks": [ + {"block_type": "extraction", "label": "deep_extract", "data_extraction_goal": "Deep"}, + ], + }, + ] + + actions_by_task: dict[str, list[dict[str, Any]]] = {} + # inner_iter2 comes last but has no grandchildren — should pick inner_iter1 + all_blocks = [outer_run, inner_iter1, grandchild, inner_iter2] + + result = _process_forloop_children( + forloop_run_block=outer_run, + loop_blocks_def=loop_blocks_def, + children_by_parent=_build_children_by_parent(all_blocks), + tasks_by_id={"task_deep": mock_task}, + actions_by_task_id={"task_deep": [mock_action]}, + actions_by_task=actions_by_task, + ) + + assert len(result) == 1 + inner = result[0] + # Should have picked inner_iter1 (has grandchildren) + assert inner["workflow_run_block_id"] == "wfrb_inner_iter1" + # Grandchild should have task data merged + inner_children = inner.get("loop_blocks", []) + assert len(inner_children) == 1 + assert inner_children[0]["task_id"] == "task_deep" + assert "task_deep" in actions_by_task + + def test_nested_forloop_tie_broken_by_descendant_actions(self) -> None: + """When two nested for-loop iterations have the same child count, + prefer the one whose descendants have more actions.""" + outer_run = MagicMock() + outer_run.workflow_run_block_id = "wfrb_outer" + outer_run.parent_workflow_run_block_id = None + outer_run.label = "outer_loop" + + # Iteration 1: 1 grandchild, no actions + inner_iter1 = MagicMock() + inner_iter1.workflow_run_block_id = "wfrb_inner_iter1" + inner_iter1.parent_workflow_run_block_id = "wfrb_outer" + inner_iter1.block_type = BlockType.FOR_LOOP + inner_iter1.label = "inner_loop" + inner_iter1.task_id = None + inner_iter1.workflow_run_id = "wr_1" + + gc1 = MagicMock() + gc1.workflow_run_block_id = "wfrb_gc1" + gc1.parent_workflow_run_block_id = "wfrb_inner_iter1" + gc1.block_type = "extraction" + gc1.label = "deep_extract" + gc1.task_id = "task_empty" + gc1.status = "completed" + gc1.output = {} + gc1.workflow_run_id = "wr_1" + + # Iteration 2: 1 grandchild, with actions (richer) + inner_iter2 = MagicMock() + inner_iter2.workflow_run_block_id = "wfrb_inner_iter2" + inner_iter2.parent_workflow_run_block_id = "wfrb_outer" + inner_iter2.block_type = BlockType.FOR_LOOP + inner_iter2.label = "inner_loop" + inner_iter2.task_id = None + inner_iter2.workflow_run_id = "wr_1" + + gc2 = MagicMock() + gc2.workflow_run_block_id = "wfrb_gc2" + gc2.parent_workflow_run_block_id = "wfrb_inner_iter2" + gc2.block_type = "extraction" + gc2.label = "deep_extract" + gc2.task_id = "task_rich" + gc2.status = "completed" + gc2.output = {"data": True} + gc2.workflow_run_id = "wr_1" + + mock_task = MagicMock() + mock_task.model_dump.return_value = {"task_id": "task_rich"} + + mock_action = MagicMock(spec=Action) + mock_action.model_dump.return_value = {"action_type": "extract"} + mock_action.get_xpath.return_value = "//div" + mock_action.has_mini_agent = False + mock_action.action_type = "extract" + mock_action.task_id = "task_rich" + + loop_blocks_def = [ + { + "block_type": "for_loop", + "label": "inner_loop", + "loop_blocks": [ + {"block_type": "extraction", "label": "deep_extract", "data_extraction_goal": "Extract"}, + ], + }, + ] + + actions_by_task: dict[str, list[dict[str, Any]]] = {} + # Both iterations have 1 grandchild, but only iter2's grandchild has actions + all_blocks = [outer_run, inner_iter1, gc1, inner_iter2, gc2] + + result = _process_forloop_children( + forloop_run_block=outer_run, + loop_blocks_def=loop_blocks_def, + children_by_parent=_build_children_by_parent(all_blocks), + tasks_by_id={"task_rich": mock_task}, + actions_by_task_id={"task_rich": [mock_action]}, # only task_rich has actions + actions_by_task=actions_by_task, + ) + + assert len(result) == 1 + inner = result[0] + # Should pick iter2 (descendant has actions) over iter1 (no actions) + assert inner["workflow_run_block_id"] == "wfrb_inner_iter2" + + def test_no_matching_run_block_preserves_definition(self) -> None: + """If no run block matches a definition child, the definition is preserved unchanged.""" + forloop_run = MagicMock() + forloop_run.workflow_run_block_id = "wfrb_loop" + forloop_run.label = "loop" + + loop_blocks_def = [ + {"block_type": "extraction", "label": "unexecuted_block"}, + ] + + actions_by_task: dict[str, list[dict[str, Any]]] = {} + + result = _process_forloop_children( + forloop_run_block=forloop_run, + loop_blocks_def=loop_blocks_def, + children_by_parent=_build_children_by_parent([forloop_run]), + tasks_by_id={}, + actions_by_task_id={}, + actions_by_task=actions_by_task, + ) + + assert len(result) == 1 + assert result[0]["label"] == "unexecuted_block" + assert "task_id" not in result[0] + + +# --------------------------------------------------------------------------- +# Part 2: generate_workflow_script_python_code tests for nested for-loops +# --------------------------------------------------------------------------- +class TestNestedForloopCodeGeneration: + """Test that nested for-loops generate correct script blocks and code.""" + + @pytest.mark.asyncio + async def test_nested_forloop_creates_script_blocks_for_all_levels(self) -> None: + """A double-nested for-loop should create script_blocks for: + 1. Outer for-loop + 2. Inner for-loop + 3. Inner task blocks + """ + from skyvern.core.script_generations.generate_script import generate_workflow_script_python_code + + blocks = [ + { + "block_type": "for_loop", + "label": "outer_loop", + "loop_variable_reference": "{{ urls }}", + "workflow_run_block_id": "wfrb_outer", + "loop_blocks": [ + { + "block_type": "for_loop", + "label": "inner_loop", + "loop_variable_reference": "{{ documents }}", + "workflow_run_block_id": "wfrb_inner", + "loop_blocks": [ + { + "block_type": "extraction", + "label": "extract_data", + "data_extraction_goal": "Get content", + "task_id": "task_extract", + "workflow_run_block_id": "wfrb_extract", + }, + ], + }, + ], + }, + ] + + actions_by_task = { + "task_extract": [ + { + "action_type": "extract", + "action_id": "action_1", + "xpath": "//div[@id='content']", + "element_id": "elem_1", + "text": None, + "data_extraction_goal": "Get content", + }, + ], + } + + workflow = { + "workflow_id": "wf_test", + "title": "Nested ForLoop Test", + "workflow_definition": {"parameters": []}, + } + + mock_create_script_block = AsyncMock(return_value=True) + + with ( + patch( + "skyvern.core.script_generations.generate_script.generate_workflow_parameters_schema", + new_callable=AsyncMock, + return_value=("", {}), + ), + patch( + "skyvern.core.script_generations.generate_script.create_or_update_script_block", + mock_create_script_block, + ), + ): + result = await generate_workflow_script_python_code( + file_name="test_nested.py", + workflow_run_request={"workflow_id": "wpid_test"}, + workflow=workflow, + blocks=blocks, + actions_by_task=actions_by_task, + script_id="script_1", + script_revision_id="rev_1", + organization_id="org_1", + ) + + # Must compile + try: + ast.parse(result.source_code) + except SyntaxError as e: + pytest.fail(f"Generated script has SyntaxError: {e}\n\n{result.source_code}") + + # Verify script blocks created for all three levels + call_labels = [call.kwargs.get("block_label") for call in mock_create_script_block.call_args_list] + assert "outer_loop" in call_labels, f"outer for-loop missing. Labels: {call_labels}" + assert "inner_loop" in call_labels, f"inner for-loop missing. Labels: {call_labels}" + assert "extract_data" in call_labels, f"inner extraction missing. Labels: {call_labels}" + + @pytest.mark.asyncio + async def test_nested_forloop_inner_block_gets_cached_function(self) -> None: + """The extraction block inside a nested for-loop should get a @skyvern.cached function.""" + from skyvern.core.script_generations.generate_script import generate_workflow_script_python_code + + blocks = [ + { + "block_type": "for_loop", + "label": "page_loop", + "loop_variable_reference": "{{ pages }}", + "workflow_run_block_id": "wfrb_page", + "loop_blocks": [ + { + "block_type": "for_loop", + "label": "doc_loop", + "loop_variable_reference": "{{ docs }}", + "workflow_run_block_id": "wfrb_doc", + "loop_blocks": [ + { + "block_type": "file_download", + "label": "download_file", + "url": "https://example.com", + "navigation_goal": "Download the file", + "task_id": "task_download", + "workflow_run_block_id": "wfrb_download", + }, + ], + }, + ], + }, + ] + + actions_by_task = { + "task_download": [ + { + "action_type": "click", + "action_id": "act_1", + "xpath": "//a[@class='download']", + "element_id": "elem_dl", + "text": None, + }, + ], + } + + workflow = { + "workflow_id": "wf_test", + "title": "Nested Download Test", + "workflow_definition": {"parameters": []}, + } + + mock_create_script_block = AsyncMock(return_value=True) + + with ( + patch( + "skyvern.core.script_generations.generate_script.generate_workflow_parameters_schema", + new_callable=AsyncMock, + return_value=("", {}), + ), + patch( + "skyvern.core.script_generations.generate_script.create_or_update_script_block", + mock_create_script_block, + ), + ): + result = await generate_workflow_script_python_code( + file_name="test_nested_download.py", + workflow_run_request={"workflow_id": "wpid_test"}, + workflow=workflow, + blocks=blocks, + actions_by_task=actions_by_task, + script_id="script_1", + script_revision_id="rev_1", + organization_id="org_1", + ) + + try: + ast.parse(result.source_code) + except SyntaxError as e: + pytest.fail(f"SyntaxError: {e}\n\n{result.source_code}") + + # Inner block should have a @skyvern.cached function + assert "@skyvern.cached" in result.source_code + assert "download_file" in result.source_code + + @pytest.mark.asyncio + async def test_nested_forloop_labels_tracked_in_processed_labels(self) -> None: + """Nested for-loop labels should be tracked to avoid duplication + when preserving unexecuted branch cached blocks.""" + from skyvern.core.script_generations.generate_script import generate_workflow_script_python_code + + blocks = [ + { + "block_type": "for_loop", + "label": "outer_loop", + "loop_variable_reference": "{{ urls }}", + "workflow_run_block_id": "wfrb_outer", + "loop_blocks": [ + { + "block_type": "for_loop", + "label": "inner_loop", + "loop_variable_reference": "{{ items }}", + "workflow_run_block_id": "wfrb_inner", + "loop_blocks": [ + { + "block_type": "extraction", + "label": "deep_extract", + "data_extraction_goal": "Extract", + "task_id": "task_deep", + "workflow_run_block_id": "wfrb_deep", + }, + ], + }, + ], + }, + ] + + actions_by_task = { + "task_deep": [ + { + "action_type": "extract", + "action_id": "a1", + "xpath": "//div", + "element_id": "e1", + "text": None, + "data_extraction_goal": "Extract", + }, + ], + } + + # Also provide the same labels as cached_blocks to test dedup + mock_cached_extract = MagicMock() + mock_cached_extract.code = "@skyvern.cached(cache_key='deep_extract')\nasync def deep_extract_fn(): pass" + mock_cached_extract.run_signature = "await skyvern.extract(prompt='Extract', label='deep_extract')" + mock_cached_extract.workflow_run_id = "wr_old" + mock_cached_extract.workflow_run_block_id = "wfrb_old" + mock_cached_extract.input_fields = None + + mock_cached_loop = MagicMock() + mock_cached_loop.code = ( + "async for current_value in skyvern.loop(values='{{ items }}', label='inner_loop'): pass" + ) + mock_cached_loop.run_signature = ( + "async for current_value in skyvern.loop(values='{{ items }}', label='inner_loop'): pass" + ) + mock_cached_loop.workflow_run_id = "wr_old" + mock_cached_loop.workflow_run_block_id = "wfrb_old" + mock_cached_loop.input_fields = None + + workflow = { + "workflow_id": "wf_test", + "title": "Dedup Test", + "workflow_definition": {"parameters": []}, + } + + mock_create_script_block = AsyncMock(return_value=True) + + with ( + patch( + "skyvern.core.script_generations.generate_script.generate_workflow_parameters_schema", + new_callable=AsyncMock, + return_value=("", {}), + ), + patch( + "skyvern.core.script_generations.generate_script.create_or_update_script_block", + mock_create_script_block, + ), + ): + result = await generate_workflow_script_python_code( + file_name="test_dedup.py", + workflow_run_request={"workflow_id": "wpid_test"}, + workflow=workflow, + blocks=blocks, + actions_by_task=actions_by_task, + script_id="s1", + script_revision_id="r1", + organization_id="o1", + cached_blocks={ + "deep_extract": mock_cached_extract, + "inner_loop": mock_cached_loop, + }, + ) + + try: + ast.parse(result.source_code) + except SyntaxError as e: + pytest.fail(f"SyntaxError: {e}\n\n{result.source_code}") + + # @skyvern.cached should appear exactly once (not duplicated by + # the "preserve unexecuted branch" section) + cached_count = result.source_code.count("@skyvern.cached") + assert cached_count == 1, ( + f"Expected 1 @skyvern.cached but found {cached_count}. " + f"Nested labels may not be tracked in processed_labels.\n\n{result.source_code}" + ) + + @pytest.mark.asyncio + async def test_nested_forloop_uses_cached_entry_when_valid(self) -> None: + """When a nested for-loop has a valid cached entry and is NOT in + updated_block_labels, create_or_update_script_block should still be + called (to persist metadata) but use the cached code, not rebuild.""" + from skyvern.core.script_generations.generate_script import generate_workflow_script_python_code + + cached_inner_loop = MagicMock() + cached_inner_loop.code = ( + "async for current_value in skyvern.loop(values='{{ docs }}', label='inner_loop'):\n pass" + ) + cached_inner_loop.run_signature = cached_inner_loop.code.strip() + cached_inner_loop.workflow_run_id = "wr_cached" + cached_inner_loop.workflow_run_block_id = "wfrb_cached" + cached_inner_loop.input_fields = None + + cached_deep_extract = MagicMock() + cached_deep_extract.code = "@skyvern.cached(cache_key='deep_extract')\nasync def deep_extract_fn(page, context):\n await skyvern.extract(prompt='Get data', label='deep_extract')" + cached_deep_extract.run_signature = "await skyvern.extract(prompt='Get data', label='deep_extract')" + cached_deep_extract.workflow_run_id = "wr_cached" + cached_deep_extract.workflow_run_block_id = "wfrb_cached_deep" + cached_deep_extract.input_fields = None + + blocks = [ + { + "block_type": "for_loop", + "label": "outer_loop", + "loop_variable_reference": "{{ urls }}", + "workflow_run_block_id": "wfrb_outer", + "loop_blocks": [ + { + "block_type": "for_loop", + "label": "inner_loop", + "loop_variable_reference": "{{ docs }}", + "workflow_run_block_id": "wfrb_inner", + "loop_blocks": [ + { + "block_type": "extraction", + "label": "deep_extract", + "data_extraction_goal": "Get data", + "task_id": "task_deep", + "workflow_run_block_id": "wfrb_deep", + }, + ], + }, + ], + }, + ] + + workflow = { + "workflow_id": "wf_test", + "title": "Cache Hit Test", + "workflow_definition": {"parameters": []}, + } + + mock_create_script_block = AsyncMock(return_value=True) + + with ( + patch( + "skyvern.core.script_generations.generate_script.generate_workflow_parameters_schema", + new_callable=AsyncMock, + return_value=("", {}), + ), + patch( + "skyvern.core.script_generations.generate_script.create_or_update_script_block", + mock_create_script_block, + ), + ): + result = await generate_workflow_script_python_code( + file_name="test_cache_hit.py", + workflow_run_request={"workflow_id": "wpid_test"}, + workflow=workflow, + blocks=blocks, + actions_by_task={}, # No fresh actions — relying on cache + script_id="s1", + script_revision_id="r1", + organization_id="o1", + cached_blocks={ + "inner_loop": cached_inner_loop, + "deep_extract": cached_deep_extract, + }, + # Neither inner_loop nor its parent outer_loop are in + # updated_block_labels → should use cached entries. + updated_block_labels={"__start_block__"}, + ) + + try: + ast.parse(result.source_code) + except SyntaxError as e: + pytest.fail(f"SyntaxError: {e}\n\n{result.source_code}") + + # The nested for-loop's cached code should be used + call_labels = [call.kwargs.get("block_label") for call in mock_create_script_block.call_args_list] + # inner_loop and deep_extract should both have script_block entries + assert "inner_loop" in call_labels, f"inner_loop missing from calls: {call_labels}" + assert "deep_extract" in call_labels, f"deep_extract missing from calls: {call_labels}" + + # The inner_loop call should use the cached code, not freshly built + inner_loop_call = next( + c for c in mock_create_script_block.call_args_list if c.kwargs.get("block_label") == "inner_loop" + ) + assert inner_loop_call.kwargs["block_code"] == cached_inner_loop.code + assert inner_loop_call.kwargs["workflow_run_id"] == "wr_cached" + + @pytest.mark.asyncio + async def test_nested_forloop_extraction_url_uses_render_template(self) -> None: + """Regression: nested extraction with a templated URL (e.g. `{{ outer_loop.current_value.url }}`) + must be emitted as a `skyvern.render_template("...")` call, not a Python literal. + + This is the follow-up to SKY-8757 surfaced by tests/manual/test_nested_forloop_workflow.py's + second-run cache-hit check. Without this fix, the cached code ran `await skyvern.extract(...)` + without a `url=` arg and the fallback `ExtractionBlock(url=None)` hit + `InvalidWorkflowTaskURLState` at agent.py:200 on every cache hit. + """ + from skyvern.core.script_generations.generate_script import generate_workflow_script_python_code + + blocks = [ + { + "block_type": "for_loop", + "label": "outer_page_loop", + "loop_variable_reference": "{{ pages }}", + "workflow_run_block_id": "wfrb_outer", + "loop_blocks": [ + { + "block_type": "for_loop", + "label": "inner_field_loop", + "loop_variable_reference": "{{ current_value.fields }}", + "workflow_run_block_id": "wfrb_inner", + "loop_blocks": [ + { + "block_type": "extraction", + "label": "extract_field_data", + "url": "{{ outer_page_loop.current_value.url }}", + "data_extraction_goal": "Extract {{ current_value }}.", + "data_schema": {"type": "object", "properties": {}}, + "task_id": "task_extract", + "workflow_run_block_id": "wfrb_extract", + }, + ], + }, + ], + }, + ] + + actions_by_task = { + "task_extract": [ + { + "action_type": "extract", + "action_id": "action_1", + "xpath": "//div", + "element_id": "elem_1", + "text": None, + "data_extraction_goal": "Extract", + }, + ], + } + + workflow = { + "workflow_id": "wf_test", + "title": "Nested Templated URL Test", + "workflow_definition": {"parameters": []}, + } + + mock_create_script_block = AsyncMock(return_value=True) + + with ( + patch( + "skyvern.core.script_generations.generate_script.generate_workflow_parameters_schema", + new_callable=AsyncMock, + return_value=("", {}), + ), + patch( + "skyvern.core.script_generations.generate_script.create_or_update_script_block", + mock_create_script_block, + ), + ): + result = await generate_workflow_script_python_code( + file_name="test_templated_url.py", + workflow_run_request={"workflow_id": "wpid_test"}, + workflow=workflow, + blocks=blocks, + actions_by_task=actions_by_task, + script_id="script_1", + script_revision_id="rev_1", + organization_id="org_1", + ) + + # Generated code must compile. + try: + ast.parse(result.source_code) + except SyntaxError as e: + pytest.fail(f"Generated script has SyntaxError: {e}\n\n{result.source_code}") + + # Find the block_code stored for extract_field_data. + extract_call = next( + ( + c + for c in mock_create_script_block.call_args_list + if c.kwargs.get("block_label") == "extract_field_data" + ), + None, + ) + assert extract_call is not None, ( + "No create_or_update_script_block call for extract_field_data. " + f"Labels seen: {[c.kwargs.get('block_label') for c in mock_create_script_block.call_args_list]}" + ) + block_code: str = extract_call.kwargs["block_code"] + + # The URL must be emitted as a skyvern.render_template() call so that + # {{ outer_page_loop.current_value.url }} resolves at runtime from the + # workflow_run_context.values populated by skyvern.loop(). + assert "skyvern.render_template" in block_code, ( + f"extract_field_data block_code does not contain skyvern.render_template:\n\n{block_code}" + ) + assert "{{ outer_page_loop.current_value.url }}" in block_code, ( + f"Template string not present in block_code:\n\n{block_code}" + ) + # And must NOT appear as a raw Python literal passed to extract(..., url=...). + assert "url='{{ outer_page_loop.current_value.url }}'" not in block_code, ( + f"URL was emitted as a literal, not a render_template call:\n\n{block_code}" + ) + assert 'url="{{ outer_page_loop.current_value.url }}"' not in block_code, ( + f"URL was emitted as a literal, not a render_template call:\n\n{block_code}" + ) + + +# --------------------------------------------------------------------------- +# Part 3: _render_value unit tests +# --------------------------------------------------------------------------- +class TestRenderValue: + """Unit tests for the _render_value CST helper in generate_script.py.""" + + def test_empty_or_none_returns_valid_cst_node(self) -> None: + """Empty/None prompts return valid CST nodes that libcst can serialize. + + Regression: the original helper returned `cst.SimpleString("")` which + libcst rejects because it lacks enclosing quotes. The helper now + delegates to `_value` for the empty/None case — `_value("")` emits a + `SimpleString("''")` and `_value(None)` emits `Name("None")`. + """ + import libcst as cst + + from skyvern.core.script_generations.generate_script import _render_value + + module = cst.Module(body=[]) + + result_empty = _render_value("") + assert isinstance(result_empty, cst.SimpleString) + assert module.code_for_node(result_empty) == "''" + + # None passes through _value → cst.parse_expression("None") which + # is a Name node, not a SimpleString, but still a valid BaseExpression. + result_none = _render_value(None) + assert module.code_for_node(result_none) == "None" + + def test_plain_string_returns_literal_simple_string(self) -> None: + """A non-template string falls back to _value() and emits a Python literal.""" + import libcst as cst + + from skyvern.core.script_generations.generate_script import _render_value + + result = _render_value("https://example.com") + # _value wraps strings via repr() → "'https://example.com'" + assert isinstance(result, cst.SimpleString) + assert result.value == "'https://example.com'" + + def test_template_string_emits_render_template_call(self) -> None: + """A string containing {{...}} is emitted as skyvern.render_template(...) call.""" + import libcst as cst + + from skyvern.core.script_generations.generate_script import _render_value + + result = _render_value("{{ outer_page_loop.current_value.url }}") + assert isinstance(result, cst.Call) + # The call should be skyvern.render_template("{{ ... }}") + module = cst.Module(body=[]) + rendered = module.code_for_node(result) + assert rendered.startswith("skyvern.render_template(") + assert '"{{ outer_page_loop.current_value.url }}"' in rendered or ( + "'{{ outer_page_loop.current_value.url }}'" in rendered + ) + + def test_template_string_with_data_variable_appends_kwarg(self) -> None: + """When a data_variable_name is provided, it is passed as a data= kwarg.""" + import libcst as cst + + from skyvern.core.script_generations.generate_script import _render_value + + result = _render_value("{{ current_value }}", data_variable_name="context_params") + assert isinstance(result, cst.Call) + module = cst.Module(body=[]) + rendered = module.code_for_node(result) + # libcst may emit with or without whitespace around =; accept both. + assert "data=context_params" in rendered or "data = context_params" in rendered