mirror of
https://github.com/Skyvern-AI/skyvern.git
synced 2026-04-28 03:30:10 +00:00
fix: use started_at for run history duration calculation (#5094)
This commit is contained in:
parent
e08ff39a49
commit
76bb2dee93
5 changed files with 792 additions and 2 deletions
|
|
@ -40,6 +40,10 @@ from skyvern.forge.sdk.workflow.models.parameter import ParameterType
|
|||
from skyvern.forge.sdk.workflow.models.workflow import Workflow
|
||||
from skyvern.forge.sdk.workflow.workflow_definition_converter import convert_workflow_definition
|
||||
from skyvern.schemas.workflows import (
|
||||
BlockYAML,
|
||||
BranchConditionYAML,
|
||||
ConditionalBlockYAML,
|
||||
ForLoopBlockYAML,
|
||||
LoginBlockYAML,
|
||||
WorkflowCreateYAMLRequest,
|
||||
WorkflowDefinitionYAML,
|
||||
|
|
@ -302,6 +306,233 @@ async def _auto_correct_workflow_yaml(
|
|||
return action_data.get("workflow_yaml", workflow_yaml)
|
||||
|
||||
|
||||
def _collect_reachable(
    start_label: str,
    label_to_block: dict[str, BlockYAML],
    reachable: set[str],
) -> None:
    """Add every label reachable from ``start_label`` to ``reachable`` (in place).

    The edges followed are each block's ``next_block_label`` and, for
    ``ConditionalBlockYAML`` blocks, the ``next_block_label`` of every entry
    in ``branch_conditions``.

    A label is skipped (not added, not expanded) when it is empty, missing
    from ``label_to_block``, or already present in ``reachable``. The last
    rule means a pre-populated ``reachable`` set prunes the traversal, and it
    is also what keeps the walk finite on cyclic input.
    """
    # Explicit work-stack of labels still to examine; entries may be None.
    pending: list[str | None] = [start_label]
    while pending:
        label = pending.pop()
        if not label or label in reachable or label not in label_to_block:
            continue

        reachable.add(label)
        node = label_to_block[label]

        # Main-chain successor goes on the stack first so that branch targets
        # (pushed afterwards) are popped and explored before it.
        pending.append(node.next_block_label)
        if isinstance(node, ConditionalBlockYAML):
            pending.extend(branch.next_block_label for branch in node.branch_conditions)
|
||||
|
||||
|
||||
def _break_cycles(
    start_label: str,
    label_to_block: dict[str, BlockYAML],
) -> bool:
    """Remove back-edges from the block graph rooted at ``start_label``.

    Performs a depth-first walk over ``next_block_label`` edges and, for
    ``ConditionalBlockYAML`` blocks, over each branch condition's
    ``next_block_label``. An edge whose target is still on the current DFS
    path is a back-edge (a cycle): it is logged and the owning object's
    ``next_block_label`` is reset to ``None``. An edge into a block that was
    already explored but is no longer on the path is a merge point (two paths
    converging) and is left untouched.

    Only the blocks present in ``label_to_block`` are examined. This function
    does not descend into ``ForLoopBlockYAML.loop_blocks``; nested lists are
    repaired by the recursive calls in ``_repair_next_block_label_chain``.

    Returns:
        True if at least one edge was cut, otherwise False.
    """
    explored: set[str] = set()
    on_path: set[str] = set()

    def _visit(label: str) -> bool:
        """Explore *label* and report whether any edge at or below it was cut."""
        explored.add(label)
        on_path.add(label)
        node = label_to_block[label]

        # Each "owner" carries one outgoing edge in its ``next_block_label``.
        # Branch edges are examined before the block's own edge.
        if isinstance(node, ConditionalBlockYAML):
            edge_owners = [*node.branch_conditions, node]
        else:
            edge_owners = [node]

        cut_any = False
        for owner in edge_owners:
            target = owner.next_block_label
            if not target or target not in label_to_block:
                continue  # no edge, or a dangling reference: nothing to follow

            if target in on_path:
                # Back-edge: the target is an ancestor on the current DFS path.
                LOG.warning(
                    "Copilot produced circular block chain, breaking cycle",
                    cycle_target=target,
                    broken_at=label,
                    is_branch_condition=hasattr(owner, "criteria"),
                    branch_expression=getattr(getattr(owner, "criteria", None), "expression", None),
                )
                owner.next_block_label = None
                cut_any = True
            elif target not in explored:
                # Call first, then combine, so the recursion is never short-circuited.
                cut_any = _visit(target) or cut_any
            # else: already explored and off the path -> merge point, not a cycle

        on_path.discard(label)
        return cut_any

    if start_label not in label_to_block:
        return False
    return _visit(start_label)
|
||||
|
||||
|
||||
def _find_terminal_label(
    start_label: str,
    label_to_block: dict[str, BlockYAML],
    all_labels: set[str],
) -> str | None:
    """Return the label of the last block on the main chain from ``start_label``.

    Only ``next_block_label`` links are followed. The terminal block is the
    first one whose ``next_block_label`` is ``None`` or names a label that is
    not in ``all_labels``.

    Returns ``None`` when no terminal exists: the start label is empty or
    unknown to ``label_to_block``, or the chain loops back on itself.
    """
    seen: set[str] = set()
    label: str | None = start_label
    while True:
        if not label or label in seen or label not in label_to_block:
            return None
        seen.add(label)

        successor = label_to_block[label].next_block_label
        if successor is None or successor not in all_labels:
            return label
        label = successor
|
||||
|
||||
|
||||
def _order_orphaned_blocks(
    orphaned_labels: set[str],
    label_to_block: dict[str, BlockYAML],
    all_labels: set[str],
    blocks: list[BlockYAML],
) -> list[str]:
    """Order the orphaned blocks and re-link them into one linear chain.

    Orphans are ordered by following their own ``next_block_label`` links.
    Disconnected orphan sub-chains are concatenated in the order their first
    block appears in ``blocks``. Orphans that no chain walk reaches (e.g. a
    cycle sitting next to a proper chain) are appended afterwards, again in
    ``blocks`` order.

    Side effect: the ``next_block_label`` of every returned block is
    rewritten so the result is a single path ``ordered[0] -> ... ->
    ordered[-1] -> None``. This can overwrite a link from an orphan to a
    non-orphan block; such overwrites are logged.

    Args:
        orphaned_labels: Labels to order. Each must be a key of ``label_to_block``.
        label_to_block: Label -> block lookup; the blocks in it are mutated.
        all_labels: Not used by this function; kept for signature compatibility.
        blocks: Original block list, used only for its ordering.

    Returns:
        The orphan labels in stitched order, each exactly once. Empty when
        ``orphaned_labels`` is empty.
    """
    # Nothing to order. Without this guard the cycle fallback below would
    # have no first orphan to pick.
    if not orphaned_labels:
        return []

    # Orphan labels in original array order, de-duplicated. ``blocks`` may
    # contain repeated labels, and a label listed twice would be re-linked
    # twice, corrupting the chain.
    orphans_in_order = list(dict.fromkeys(b.label for b in blocks if b.label in orphaned_labels))

    pointed_to: set[str] = set()
    for label in orphaned_labels:
        target = label_to_block[label].next_block_label
        if target and target in orphaned_labels:
            pointed_to.add(target)

    # Chain starts are orphans not pointed to by another orphan.
    # If all orphans point to each other (cycle), pick the first in array order.
    chain_starts = [label for label in orphans_in_order if label not in pointed_to]
    if not chain_starts:
        chain_starts = orphans_in_order[:1]

    ordered: list[str] = []
    visited: set[str] = set()
    for start in chain_starts:
        current: str | None = start
        while current and current in orphaned_labels and current not in visited:
            visited.add(current)
            ordered.append(current)
            current = label_to_block[current].next_block_label

    # Append any remaining orphans not reached (multiple cycles).
    for label in orphans_in_order:
        if label not in visited:
            visited.add(label)
            ordered.append(label)

    # Re-link the orphan chain so it forms a single connected path.
    # This may overwrite an orphan's original next_block_label that pointed to a
    # reachable block (a merge/join pattern). Log when this happens.
    for label, new_target in zip(ordered, ordered[1:]):
        old_target = label_to_block[label].next_block_label
        if old_target and old_target != new_target and old_target not in orphaned_labels:
            LOG.info(
                "Orphan re-link overwrites cross-chain reference",
                block=label,
                old_target=old_target,
                new_target=new_target,
            )
        label_to_block[label].next_block_label = new_target

    if ordered:
        # The last orphan becomes the terminal block of the stitched chain.
        old_last_target = label_to_block[ordered[-1]].next_block_label
        if old_last_target and old_last_target not in orphaned_labels:
            LOG.info(
                "Orphan chain terminal overwrites cross-chain reference",
                block=ordered[-1],
                old_target=old_last_target,
            )
        label_to_block[ordered[-1]].next_block_label = None

    return ordered
|
||||
|
||||
|
||||
def _repair_next_block_label_chain(blocks: list[BlockYAML]) -> None:
    """Ensure all top-level blocks form a single acyclic chain from blocks[0].

    Repairs two classes of LLM mistakes:
    1. Circular references — breaks cycles so the chain has a proper terminal block.
    2. Disconnected paths — stitches orphaned blocks onto the end of the reachable chain.

    The same repair is applied to lists of any length, so a single block that
    points at itself is fixed as well. Recursively repairs nested
    ``ForLoopBlockYAML.loop_blocks`` at all depths.

    Mutates the blocks in *blocks* in place; returns nothing.
    """
    if not blocks:
        return

    # Warn on duplicate labels — the dict comprehension silently keeps the last
    # occurrence, so earlier blocks with the same label become invisible.
    seen_labels: set[str] = set()
    for block in blocks:
        if block.label in seen_labels:
            LOG.warning("Copilot produced duplicate block label", label=block.label)
        seen_labels.add(block.label)

    label_to_block: dict[str, BlockYAML] = {block.label: block for block in blocks}
    all_labels = set(label_to_block)
    start_label = blocks[0].label

    # Phase 1: break any circular references reachable from the first block.
    # Cycles made only of ``next_block_label`` links among orphaned blocks are
    # dissolved later by the re-linking in _order_orphaned_blocks.
    _break_cycles(start_label, label_to_block)

    # Phase 2: find orphaned (unreachable) blocks and stitch them to the end.
    reachable: set[str] = set()
    _collect_reachable(start_label, label_to_block, reachable)

    orphaned_labels = all_labels - reachable
    if orphaned_labels:
        LOG.warning(
            "Copilot produced disconnected workflow blocks, repairing chain",
            orphaned_labels=sorted(orphaned_labels),
            reachable_labels=sorted(reachable),
        )

        terminal_label = _find_terminal_label(start_label, label_to_block, all_labels)
        ordered_orphan_labels = _order_orphaned_blocks(orphaned_labels, label_to_block, all_labels, blocks)

        if terminal_label and ordered_orphan_labels:
            label_to_block[terminal_label].next_block_label = ordered_orphan_labels[0]

            # The stitched-on orphans were not visited in Phase 1. Their
            # next_block_label links were just rewritten, but a conditional
            # orphan may still carry a branch that points back into the chain,
            # which is now a cycle. Cutting a back-edge never disconnects a
            # block from the start, so a second pass is safe.
            _break_cycles(start_label, label_to_block)

    # Phase 3: recursively repair nested ForLoopBlockYAML.loop_blocks.
    for block in blocks:
        if isinstance(block, ForLoopBlockYAML) and block.loop_blocks:
            _repair_next_block_label_chain(block.loop_blocks)
|
||||
|
||||
|
||||
def _process_workflow_yaml(
|
||||
workflow_id: str,
|
||||
workflow_permanent_id: str,
|
||||
|
|
@ -328,6 +559,8 @@ def _process_workflow_yaml(
|
|||
p for p in workflow_yaml_request.workflow_definition.parameters if p.parameter_type != ParameterType.OUTPUT
|
||||
]
|
||||
|
||||
_repair_next_block_label_chain(workflow_yaml_request.workflow_definition.blocks)
|
||||
|
||||
updated_workflow_definition = convert_workflow_definition(
|
||||
workflow_definition_yaml=workflow_yaml_request.workflow_definition,
|
||||
workflow_id=workflow_id,
|
||||
|
|
@ -585,6 +818,8 @@ async def workflow_copilot_convert_yaml_to_blocks(
|
|||
parsed_yaml = yaml.safe_load(request.workflow_definition_yaml)
|
||||
workflow_definition_yaml = WorkflowDefinitionYAML.model_validate(parsed_yaml)
|
||||
|
||||
_repair_next_block_label_chain(workflow_definition_yaml.blocks)
|
||||
|
||||
workflow_definition = convert_workflow_definition(
|
||||
workflow_definition_yaml=workflow_definition_yaml,
|
||||
workflow_id=request.workflow_id,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue