diff --git a/alembic/versions/2024_11_15_0251-1909715536dc_add_failure_reason_column_to_workflow_.py b/alembic/versions/2024_11_15_0251-1909715536dc_add_failure_reason_column_to_workflow_.py new file mode 100644 index 00000000..b6191a23 --- /dev/null +++ b/alembic/versions/2024_11_15_0251-1909715536dc_add_failure_reason_column_to_workflow_.py @@ -0,0 +1,31 @@ +"""Add failure_reason column to workflow_runs + +Revision ID: 1909715536dc +Revises: b8f9e09e181d +Create Date: 2024-11-15 02:51:33.553177+00:00 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "1909715536dc" +down_revision: Union[str, None] = "b8f9e09e181d" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.add_column("workflow_runs", sa.Column("failure_reason", sa.String(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("workflow_runs", "failure_reason") + # ### end Alembic commands ### diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index fd3c61cb..63370907 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -1083,13 +1083,16 @@ class AgentDB: LOG.error("SQLAlchemyError", exc_info=True) raise - async def update_workflow_run(self, workflow_run_id: str, status: WorkflowRunStatus) -> WorkflowRun | None: + async def update_workflow_run( + self, workflow_run_id: str, status: WorkflowRunStatus, failure_reason: str | None = None + ) -> WorkflowRun | None: async with self.Session() as session: workflow_run = ( await session.scalars(select(WorkflowRunModel).filter_by(workflow_run_id=workflow_run_id)) ).first() if workflow_run: workflow_run.status = status + workflow_run.failure_reason = failure_reason await session.commit() await session.refresh(workflow_run) return convert_to_workflow_run(workflow_run) diff --git a/skyvern/forge/sdk/db/models.py b/skyvern/forge/sdk/db/models.py index a6b47945..42baeda3 100644 --- a/skyvern/forge/sdk/db/models.py +++ b/skyvern/forge/sdk/db/models.py @@ -211,6 +211,7 @@ class WorkflowRunModel(Base): workflow_permanent_id = Column(String, nullable=False, index=True) organization_id = Column(String, ForeignKey("organizations.organization_id"), nullable=False, index=True) status = Column(String, nullable=False) + failure_reason = Column(String) proxy_location = Column(Enum(ProxyLocation)) webhook_callback_url = Column(String) totp_verification_url = Column(String) diff --git a/skyvern/forge/sdk/db/utils.py b/skyvern/forge/sdk/db/utils.py index 00383194..a17e08fc 100644 --- a/skyvern/forge/sdk/db/utils.py +++ b/skyvern/forge/sdk/db/utils.py @@ -191,6 +191,7 @@ def convert_to_workflow_run(workflow_run_model: WorkflowRunModel, debug_enabled: workflow_id=workflow_run_model.workflow_id, organization_id=workflow_run_model.organization_id, status=WorkflowRunStatus[workflow_run_model.status], + failure_reason=workflow_run_model.failure_reason, proxy_location=( ProxyLocation(workflow_run_model.proxy_location) if workflow_run_model.proxy_location else None ), diff --git a/skyvern/forge/sdk/workflow/models/block.py b/skyvern/forge/sdk/workflow/models/block.py index 90a3a39a..34ea7b5c 100644 --- a/skyvern/forge/sdk/workflow/models/block.py +++ b/skyvern/forge/sdk/workflow/models/block.py @@ -26,6 +26,7 @@ from skyvern.exceptions import ( FailedToNavigateToUrl, MissingBrowserState, MissingBrowserStatePage, + SkyvernException, TaskNotFound, UnexpectedTaskStatus, ) @@ -39,7 +40,7 @@ from skyvern.forge.sdk.api.files import ( get_path_for_workflow_download_directory, ) from skyvern.forge.sdk.api.llm.api_handler_factory import LLMAPIHandlerFactory -from skyvern.forge.sdk.schemas.tasks import TaskOutput, TaskStatus +from skyvern.forge.sdk.schemas.tasks import Task, TaskOutput, TaskStatus from skyvern.forge.sdk.settings_manager import SettingsManager from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext from skyvern.forge.sdk.workflow.exceptions import ( @@ -83,6 +84,7 @@ class BlockResult: output_parameter: OutputParameter output_parameter_value: dict[str, Any] | list | str | None = None status: BlockStatus | None = None + failure_reason: str | None = None class Block(BaseModel, abc.ABC): @@ -116,11 +118,13 @@ class Block(BaseModel, abc.ABC): def build_block_result( self, success: bool, + failure_reason: str | None, output_parameter_value: dict[str, Any] | list | str | None = None, status: BlockStatus | None = None, ) -> BlockResult: return BlockResult( success=success, + failure_reason=failure_reason, output_parameter=self.output_parameter, output_parameter_value=output_parameter_value, status=status, @@ -145,7 +149,7 @@ class Block(BaseModel, abc.ABC): async def execute_safe(self, workflow_run_id: str, **kwargs: dict) -> BlockResult: try: return await self.execute(workflow_run_id, **kwargs) - except Exception: + except Exception as e: LOG.exception( "Block execution failed", workflow_run_id=workflow_run_id, @@ -156,7 +160,11 @@ class Block(BaseModel, abc.ABC): workflow_run_context = self.get_workflow_run_context(workflow_run_id) if not workflow_run_context.has_value(self.output_parameter.key): await self.record_output_parameter_value(workflow_run_context, workflow_run_id) - return self.build_block_result(success=False, status=BlockStatus.failed) + + failure_reason = "unexpected exception" + if isinstance(e, SkyvernException): + failure_reason = f"unexpected SkyvernException({e.__class__.__name__})" + return self.build_block_result(success=False, failure_reason=failure_reason, status=BlockStatus.failed) @abc.abstractmethod def get_all_parameters( @@ -233,6 +241,7 @@ class TaskBlock(Block): current_retry = 0 # initial value for will_retry is True, so that the loop runs at least once will_retry = True + current_running_task: Task | None = None workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id=workflow_run_id) workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id=workflow_run.workflow_id) # if the task url is parameterized, we need to get the value from the workflow run context @@ -283,6 +292,7 @@ class TaskBlock(Block): task_order=task_order, task_retry=task_retry, ) + current_running_task = task organization = await app.DATABASE.get_organization(organization_id=workflow.organization_id) if not organization: raise Exception(f"Organization is missing organization_id={workflow.organization_id}") @@ -353,6 +363,7 @@ class TaskBlock(Block): raise TaskNotFound(task.task_id) if not updated_task.status.is_final(): raise UnexpectedTaskStatus(task_id=updated_task.task_id, status=updated_task.status) + current_running_task = updated_task block_status_mapping = { TaskStatus.completed: BlockStatus.completed, @@ -375,6 +386,7 @@ class TaskBlock(Block): await self.record_output_parameter_value(workflow_run_context, workflow_run_id, output_parameter_value) return self.build_block_result( success=success, + failure_reason=updated_task.failure_reason, output_parameter_value=output_parameter_value, status=block_status_mapping[updated_task.status], ) @@ -388,7 +400,10 @@ class TaskBlock(Block): organization_id=workflow.organization_id, ) return self.build_block_result( - success=False, output_parameter_value=None, status=block_status_mapping[updated_task.status] + success=False, + failure_reason=updated_task.failure_reason, + output_parameter_value=None, + status=block_status_mapping[updated_task.status], ) else: current_retry += 1 @@ -413,12 +428,17 @@ class TaskBlock(Block): ) return self.build_block_result( success=False, + failure_reason=updated_task.failure_reason, output_parameter_value=output_parameter_value, status=block_status_mapping[updated_task.status], ) await self.record_output_parameter_value(workflow_run_context, workflow_run_id) - return self.build_block_result(success=False, status=BlockStatus.failed) + return self.build_block_result( + success=False, + status=BlockStatus.failed, + failure_reason=current_running_task.failure_reason if current_running_task else None, + ) class ForLoopBlock(Block): @@ -520,19 +540,37 @@ class ForLoopBlock(Block): num_loop_over_values=len(loop_over_values), ) await self.record_output_parameter_value(workflow_run_context, workflow_run_id, []) - return self.build_block_result(success=False, status=BlockStatus.terminated) + return self.build_block_result( + success=False, + failure_reason="No iterable value found for the loop block", + status=BlockStatus.terminated, + ) + + if not self.loop_blocks or len(self.loop_blocks) == 0: + LOG.info( + "No defined blocks to loop, terminating block", + block_type=self.block_type, + workflow_run_id=workflow_run_id, + num_loop_blocks=len(self.loop_blocks), + ) + await self.record_output_parameter_value(workflow_run_context, workflow_run_id, []) + return self.build_block_result( + success=False, failure_reason="No defined blocks to loop", status=BlockStatus.terminated + ) + + block_outputs: list[BlockResult] = [] for loop_idx, loop_over_value in enumerate(loop_over_values): context_parameters_with_value = self.get_loop_block_context_parameters(workflow_run_id, loop_over_value) for context_parameter in context_parameters_with_value: workflow_run_context.set_value(context_parameter.key, context_parameter.value) - block_outputs = [] for block_idx, loop_block in enumerate(self.loop_blocks): original_loop_block = loop_block loop_block = loop_block.copy() block_output = await loop_block.execute_safe(workflow_run_id=workflow_run_id) if block_output.status == BlockStatus.canceled: + failure_message = f"ForLoopBlock: Block with type {loop_block.block_type} at index {block_idx} during loop {loop_idx} was canceled for workflow run {workflow_run_id}, canceling for loop" LOG.info( - f"ForLoopBlock: Block with type {loop_block.block_type} at index {block_idx} was canceled for workflow run {workflow_run_id}, canceling for loop", + failure_message, block_type=loop_block.block_type, workflow_run_id=workflow_run_id, block_idx=block_idx, @@ -542,7 +580,10 @@ class ForLoopBlock(Block): workflow_run_context, workflow_run_id, outputs_with_loop_values ) return self.build_block_result( - success=False, output_parameter_value=outputs_with_loop_values, status=BlockStatus.canceled + success=False, + failure_reason=failure_message, + output_parameter_value=outputs_with_loop_values, + status=BlockStatus.canceled, ) loop_block = original_loop_block @@ -580,6 +621,9 @@ class ForLoopBlock(Block): ) break + # at least one block must be executed in the loop + assert len(block_outputs) != 0 + is_any_block_terminated = any([block_output.status == BlockStatus.terminated for block_output in block_outputs]) for_loop_block_status = BlockStatus.completed if is_any_block_terminated: @@ -588,7 +632,10 @@ class ForLoopBlock(Block): for_loop_block_status = BlockStatus.failed await self.record_output_parameter_value(workflow_run_context, workflow_run_id, outputs_with_loop_values) return self.build_block_result( - success=success, output_parameter_value=outputs_with_loop_values, status=for_loop_block_status + success=success, + failure_reason=block_outputs[-1].failure_reason, + output_parameter_value=outputs_with_loop_values, + status=for_loop_block_status, ) @@ -710,7 +757,9 @@ class TextPromptBlock(Block): response = await self.send_prompt(self.prompt, parameter_values) await self.record_output_parameter_value(workflow_run_context, workflow_run_id, response) - return self.build_block_result(success=True, output_parameter_value=response, status=BlockStatus.completed) + return self.build_block_result( + success=True, failure_reason=None, output_parameter_value=response, status=BlockStatus.completed + ) class DownloadToS3Block(Block): @@ -767,7 +816,9 @@ class DownloadToS3Block(Block): LOG.info("DownloadToS3Block: File downloaded and uploaded to S3", uri=uri) await self.record_output_parameter_value(workflow_run_context, workflow_run_id, uri) - return self.build_block_result(success=True, output_parameter_value=uri, status=BlockStatus.completed) + return self.build_block_result( + success=True, failure_reason=None, output_parameter_value=uri, status=BlockStatus.completed + ) class UploadToS3Block(Block): @@ -841,7 +892,9 @@ class UploadToS3Block(Block): LOG.info("UploadToS3Block: File(s) uploaded to S3", file_path=self.path) await self.record_output_parameter_value(workflow_run_context, workflow_run_id, s3_uris) - return self.build_block_result(success=True, output_parameter_value=s3_uris, status=BlockStatus.completed) + return self.build_block_result( + success=True, failure_reason=None, output_parameter_value=s3_uris, status=BlockStatus.completed + ) class SendEmailBlock(Block): @@ -1109,14 +1162,18 @@ class SendEmailBlock(Block): LOG.error("SendEmailBlock: Failed to send email", exc_info=True) result_dict = {"success": False, "error": str(e)} await self.record_output_parameter_value(workflow_run_context, workflow_run_id, result_dict) - return self.build_block_result(success=False, output_parameter_value=result_dict, status=BlockStatus.failed) + return self.build_block_result( + success=False, failure_reason=str(e), output_parameter_value=result_dict, status=BlockStatus.failed + ) finally: if smtp_host: smtp_host.quit() result_dict = {"success": True} await self.record_output_parameter_value(workflow_run_context, workflow_run_id, result_dict) - return self.build_block_result(success=True, output_parameter_value=result_dict, status=BlockStatus.completed) + return self.build_block_result( + success=True, failure_reason=None, output_parameter_value=result_dict, status=BlockStatus.completed + ) class FileType(StrEnum): @@ -1179,7 +1236,9 @@ class FileParserBlock(Block): parsed_data.append(row) # Record the parsed data await self.record_output_parameter_value(workflow_run_context, workflow_run_id, parsed_data) - return self.build_block_result(success=True, output_parameter_value=parsed_data, status=BlockStatus.completed) + return self.build_block_result( + success=True, failure_reason=None, output_parameter_value=parsed_data, status=BlockStatus.completed + ) BlockSubclasses = Union[ diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py index 2b13a534..28ba042d 100644 --- a/skyvern/forge/sdk/workflow/models/workflow.py +++ b/skyvern/forge/sdk/workflow/models/workflow.py @@ -90,6 +90,7 @@ class WorkflowRun(BaseModel): webhook_callback_url: str | None = None totp_verification_url: str | None = None totp_identifier: str | None = None + failure_reason: str | None = None created_at: datetime modified_at: datetime diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index bfbc2c39..10cc0b14 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -6,7 +6,13 @@ import httpx import structlog from skyvern import analytics -from skyvern.exceptions import FailedToSendWebhook, MissingValueForParameter, WorkflowNotFound, WorkflowRunNotFound +from skyvern.exceptions import ( + FailedToSendWebhook, + MissingValueForParameter, + SkyvernException, + WorkflowNotFound, + WorkflowRunNotFound, +) from skyvern.forge import app from skyvern.forge.sdk.artifact.models import ArtifactType from skyvern.forge.sdk.core import skyvern_context @@ -147,7 +153,14 @@ class WorkflowService: f"Error while setting up workflow run {workflow_run.workflow_run_id}", workflow_run_id=workflow_run.workflow_run_id, ) - await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id) + + failure_reason = "Setup up workflow failed due to an unexpected exception" + if isinstance(e, SkyvernException): + failure_reason = f"Setup workflow failed due to an SkyvernException({e.__class__.__name__})" + + await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason + ) raise e return workflow_run @@ -260,7 +273,10 @@ class WorkflowService: block_label=block.label, ) else: - await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id) + failure_reason = f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} failed. failure reason: {block_result.failure_reason}" + await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason + ) await self.clean_up_workflow( workflow=workflow, workflow_run=workflow_run, @@ -289,14 +305,17 @@ class WorkflowService: block_label=block.label, ) else: - await self.mark_workflow_run_as_terminated(workflow_run_id=workflow_run.workflow_run_id) + failure_reason = f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} terminated. Reason: {block_result.failure_reason}" + await self.mark_workflow_run_as_terminated( + workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason + ) await self.clean_up_workflow( workflow=workflow, workflow_run=workflow_run, api_key=api_key, ) return workflow_run - except Exception: + except Exception as e: LOG.exception( f"Error while executing workflow run {workflow_run.workflow_run_id}", workflow_run_id=workflow_run.workflow_run_id, @@ -304,7 +323,15 @@ class WorkflowService: block_type=block.block_type, block_label=block.label, ) - await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id) + + exception_message = "unexpected exception" + if isinstance(e, SkyvernException): + exception_message = f"unexpected SkyvernException({e.__class__.__name__})" + + failure_reason = f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} failed. failure reason: {exception_message}" + await self.mark_workflow_run_as_failed( + workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason + ) await self.clean_up_workflow(workflow=workflow, workflow_run=workflow_run, api_key=api_key) return workflow_run @@ -472,15 +499,17 @@ class WorkflowService: status=WorkflowRunStatus.completed, ) - async def mark_workflow_run_as_failed(self, workflow_run_id: str) -> None: + async def mark_workflow_run_as_failed(self, workflow_run_id: str, failure_reason: str | None) -> None: LOG.info( f"Marking workflow run {workflow_run_id} as failed", workflow_run_id=workflow_run_id, workflow_status="failed", + failure_reason=failure_reason, ) await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.failed, + failure_reason=failure_reason, ) async def mark_workflow_run_as_running(self, workflow_run_id: str) -> None: @@ -494,15 +523,17 @@ class WorkflowService: status=WorkflowRunStatus.running, ) - async def mark_workflow_run_as_terminated(self, workflow_run_id: str) -> None: + async def mark_workflow_run_as_terminated(self, workflow_run_id: str, failure_reason: str | None) -> None: LOG.info( f"Marking workflow run {workflow_run_id} as terminated", workflow_run_id=workflow_run_id, workflow_status="terminated", + failure_reason=failure_reason, ) await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.terminated, + failure_reason=failure_reason, ) async def mark_workflow_run_as_canceled(self, workflow_run_id: str) -> None: