diff --git a/skyvern/forge/sdk/schemas/tasks.py b/skyvern/forge/sdk/schemas/tasks.py index 84d32075..ac136ca6 100644 --- a/skyvern/forge/sdk/schemas/tasks.py +++ b/skyvern/forge/sdk/schemas/tasks.py @@ -208,5 +208,23 @@ class TaskResponse(BaseModel): errors: list[dict[str, Any]] = [] +class TaskOutput(BaseModel): + task_id: str + status: TaskStatus + extracted_information: list | dict[str, Any] | str | None = None + failure_reason: str | None = None + errors: list[dict[str, Any]] = [] + + @staticmethod + def from_task(task: Task) -> TaskOutput: + return TaskOutput( + task_id=task.task_id, + status=task.status, + extracted_information=task.extracted_information, + failure_reason=task.failure_reason, + errors=task.errors, + ) + + class CreateTaskResponse(BaseModel): task_id: str diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index f848bbb8..67a3c9cf 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -26,7 +26,7 @@ from skyvern.forge.prompts import prompt_engine from skyvern.forge.sdk.api.aws import AsyncAWSClient from skyvern.forge.sdk.api.files import download_file, get_path_for_workflow_download_directory from skyvern.forge.sdk.api.llm.api_handler_factory import LLMAPIHandlerFactory -from skyvern.forge.sdk.schemas.tasks import TaskStatus +from skyvern.forge.sdk.schemas.tasks import TaskOutput, TaskStatus from skyvern.forge.sdk.settings_manager import SettingsManager from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext from skyvern.forge.sdk.workflow.exceptions import InvalidEmailClientConfiguration @@ -63,6 +63,7 @@ class Block(BaseModel, abc.ABC): label: str block_type: BlockType output_parameter: OutputParameter | None = None + continue_on_failure: bool = False @classmethod def get_subclasses(cls) -> tuple[type["Block"], ...]: @@ -241,19 +242,22 @@ class TaskBlock(Block): LOG.info( f"Task completed", task_id=updated_task.task_id, + task_status=updated_task.status, workflow_run_id=workflow_run_id, workflow_id=workflow.workflow_id, organization_id=workflow.organization_id, ) + success = updated_task.status == TaskStatus.completed + task_output = TaskOutput.from_task(updated_task) if self.output_parameter: await workflow_run_context.register_output_parameter_value_post_execution( parameter=self.output_parameter, - value=updated_task.extracted_information, + value=task_output.model_dump(), ) await app.DATABASE.create_workflow_run_output_parameter( workflow_run_id=workflow_run_id, output_parameter_id=self.output_parameter.output_parameter_id, - value=updated_task.extracted_information, + value=task_output.model_dump(), ) LOG.info( f"Registered output parameter value", @@ -264,15 +268,16 @@ class TaskBlock(Block): task_id=updated_task.task_id, ) return BlockResult( - success=True, + success=success, output_parameter=self.output_parameter, - output_parameter_value=updated_task.extracted_information, + output_parameter_value=task_output.model_dump(), ) - return BlockResult(success=True) + return BlockResult(success=success) else: current_retry += 1 will_retry = current_retry <= self.max_retries retry_message = f", retrying task {current_retry}/{self.max_retries}" if will_retry else "" + task_output = TaskOutput.from_task(updated_task) LOG.warning( f"Task failed with status {updated_task.status}{retry_message}", task_id=updated_task.task_id, @@ -282,6 +287,7 @@ class TaskBlock(Block): organization_id=workflow.organization_id, current_retry=current_retry, max_retries=self.max_retries, + task_output=task_output.model_dump_json(), ) return BlockResult(success=False) @@ -353,6 +359,7 @@ class ForLoopBlock(Block): return [parameter_value] async def execute(self, workflow_run_id: str, **kwargs: dict) -> BlockResult: + success = False workflow_run_context = self.get_workflow_run_context(workflow_run_id) loop_over_values = self.get_loop_over_parameter_values(workflow_run_context) LOG.info( @@ -361,15 +368,34 @@ class ForLoopBlock(Block): workflow_run_id=workflow_run_id, num_loop_over_values=len(loop_over_values), ) + if not loop_over_values or len(loop_over_values) == 0: + LOG.info( + f"No loop_over values found", + block_type=self.block_type, + workflow_run_id=workflow_run_id, + num_loop_over_values=len(loop_over_values), + ) + return BlockResult(success=success) outputs_with_loop_values = [] - for loop_over_value in loop_over_values: + for loop_idx, loop_over_value in enumerate(loop_over_values): context_parameters_with_value = self.get_loop_block_context_parameters(workflow_run_id, loop_over_value) for context_parameter in context_parameters_with_value: workflow_run_context.set_value(context_parameter.key, context_parameter.value) try: - block_outputs = [ - await loop_block.execute(workflow_run_id=workflow_run_id) for loop_block in self.loop_blocks - ] + block_outputs = [] + for block_idx, loop_block in enumerate(self.loop_blocks): + block_output = await loop_block.execute_safe(workflow_run_id=workflow_run_id) + block_outputs.append(block_output) + if not block_output.success and not loop_block.continue_on_failure: + LOG.info( + f"ForLoopBlock: Encountered an failure processing block {block_idx} during loop {loop_idx}, terminating early", + block_outputs=block_outputs, + loop_idx=loop_idx, + block_idx=block_idx, + loop_over_value=loop_over_value, + loop_block_continue_on_failure=loop_block.continue_on_failure, + ) + break except Exception as e: LOG.error("ForLoopBlock: Failed to execute loop block", exc_info=True) raise e @@ -385,12 +411,14 @@ class ForLoopBlock(Block): ] ) - # If all block outputs are successful, the loop is successful + # If all block outputs are successful, the loop is successful. If self.continue_on_failure is True, we will + # continue to the next loop iteration even if there are failures. success = all([block_output.success for block_output in block_outputs]) - if not success: + if not success and not self.continue_on_failure: LOG.info( "ForLoopBlock: Encountered an failure processing block, terminating early", block_outputs=block_outputs, + continue_on_failure=self.continue_on_failure, ) break diff --git a/skyvern/forge/sdk/workflow/models/yaml.py b/skyvern/forge/sdk/workflow/models/yaml.py index b70baf86..1bec459b 100644 --- a/skyvern/forge/sdk/workflow/models/yaml.py +++ b/skyvern/forge/sdk/workflow/models/yaml.py @@ -68,6 +68,7 @@ class BlockYAML(BaseModel, abc.ABC): block_type: BlockType label: str output_parameter_key: str | None = None + continue_on_failure: bool = False class TaskBlockYAML(BlockYAML): diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index 7f4f3586..85f7c474 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -185,8 +185,18 @@ class WorkflowService: block_idx=block_idx, block_result=block_result, ) - await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id) - break + if block.continue_on_failure: + LOG.warning( + f"Block with type {block.block_type} at index {block_idx} failed but will continue executing the workflow run {workflow_run_id}", + block_type=block.block_type, + workflow_run_id=workflow_run.workflow_run_id, + block_idx=block_idx, + block_result=block_result, + continue_on_failure=block.continue_on_failure, + ) + else: + await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id) + break except Exception as e: LOG.exception(