Optimize caching: content dedup, recursive for-loop tracking, observability (SKY-8684) (#5371)

This commit is contained in:
Aaron Perez 2026-04-03 14:38:39 -05:00 committed by GitHub
parent 12b80f531a
commit 58bbc7dbb9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 1637 additions and 195 deletions

View file

@ -144,6 +144,32 @@ BLOCK_TYPES_THAT_SHOULD_BE_CACHED = {
}
def _collect_uncached_loop_children(
block: ForLoopBlock,
script_blocks_by_label: dict[str, object],
blocks_to_update: set[str],
) -> None:
"""Recursively collect uncached cacheable children from nested for-loops.
ForLoopBlock children execute via block.py's execute_loop_helper(),
bypassing _execute_single_block() where blocks_to_update tracking lives.
This function walks all nesting levels so the script generator produces
cached functions for deeply nested blocks (e.g., file_download inside
a double-nested for-loop).
"""
for child in block.loop_blocks:
if (
child.label
and child.label not in script_blocks_by_label
and child.block_type in BLOCK_TYPES_THAT_SHOULD_BE_CACHED
):
blocks_to_update.add(child.label)
# Recurse into nested for-loops regardless of whether the for-loop
# itself is cached — its children may not be.
if isinstance(child, ForLoopBlock):
_collect_uncached_loop_children(child, script_blocks_by_label, blocks_to_update)
def _extract_blocks_info(blocks: list[BLOCK_YAML_TYPES]) -> list[dict[str, str]]:
"""Extract lightweight info from blocks for title generation (limit to first 5)."""
blocks_info: list[dict[str, str]] = []
@ -1390,6 +1416,24 @@ class WorkflowService:
if ctx:
ctx.script_mode = True
# SKY-8684: Detect empty-block scripts and ensure regeneration.
# When a WorkflowScript exists but has zero usable ScriptBlock records,
# the run correctly falls through to code_generation mode. However,
# generate_script was set to False (in execute_workflow) because the
# script exists. Override it to True so per-block generation fires
# and post-run finalize can regenerate the script.
if script and is_script_run and not script_blocks_by_label:
LOG.warning(
"Script exists but has zero usable blocks — will regenerate",
workflow_permanent_id=workflow.workflow_permanent_id,
workflow_run_id=workflow_run_id,
script_id=script.script_id,
script_revision_id=script.script_revision_id,
)
regen_ctx = skyvern_context.current()
if regen_ctx:
regen_ctx.generate_script = True
# Single source-of-truth log for how this run will execute.
# Three modes:
# "code" — cached script loaded, executing code
@ -1418,6 +1462,7 @@ class WorkflowService:
script_id=script.script_id if script else None,
script_revision_id=script.script_revision_id if script else None,
script_block_count=len(script_blocks_by_label),
empty_blocks_detected=script is not None and is_script_run and not script_blocks_by_label,
)
if block_labels and len(block_labels):
@ -2367,28 +2412,24 @@ class WorkflowService:
# Track uncached for-loop child blocks for regeneration.
# ForLoopBlock children execute via block.py's execute_loop_helper(),
# bypassing _execute_single_block. Without this, their labels never
# reach blocks_to_update and the script generator never produces
# cached functions for them (e.g., file_download inside a loop).
# bypassing _execute_single_block. Recursively walk all nesting levels
# so deeply nested blocks (e.g., file_download inside a double-nested
# for-loop) get cached functions generated.
if (
isinstance(block, ForLoopBlock)
and (is_adaptive_caching(workflow, workflow_run) or is_script_run)
and workflow_run_block_result.status in cacheable_statuses
):
for loop_child in block.loop_blocks:
if (
loop_child.label
and loop_child.label not in script_blocks_by_label
and loop_child.block_type in BLOCK_TYPES_THAT_SHOULD_BE_CACHED
):
blocks_to_update.add(loop_child.label)
LOG.info(
"For-loop child block marked for caching",
parent_label=block.label,
child_label=loop_child.label,
child_block_type=loop_child.block_type,
workflow_run_id=workflow_run_id,
)
previous_labels = set(blocks_to_update)
_collect_uncached_loop_children(block, script_blocks_by_label, blocks_to_update)
new_labels = sorted(blocks_to_update - previous_labels)
if new_labels:
LOG.info(
"For-loop child blocks marked for caching",
parent_label=block.label,
child_labels=new_labels,
workflow_run_id=workflow_run_id,
)
workflow_run, should_stop = await self._handle_block_result_status(
block=block,