diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 7fc2f2cb..3030e5e7 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -8,7 +8,7 @@ from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine from skyvern.config import settings -from skyvern.exceptions import WorkflowParameterNotFound +from skyvern.exceptions import WorkflowParameterNotFound, WorkflowRunNotFound from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType, TaskType from skyvern.forge.sdk.db.exceptions import NotFoundError @@ -1462,7 +1462,7 @@ class AgentDB: async def update_workflow_run( self, workflow_run_id: str, status: WorkflowRunStatus, failure_reason: str | None = None - ) -> WorkflowRun | None: + ) -> WorkflowRun: async with self.Session() as session: workflow_run = ( await session.scalars(select(WorkflowRunModel).filter_by(workflow_run_id=workflow_run_id)) @@ -1474,11 +1474,8 @@ class AgentDB: await session.refresh(workflow_run) await save_workflow_run_logs(workflow_run_id) return convert_to_workflow_run(workflow_run) - LOG.error( - "WorkflowRun not found, nothing to update", - workflow_run_id=workflow_run_id, - ) - return None + else: + raise WorkflowRunNotFound(workflow_run_id) async def get_all_runs( self, organization_id: str, page: int = 1, page_size: int = 10, status: list[WorkflowRunStatus] | None = None diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py index d54fb54c..0264f78c 100644 --- a/skyvern/forge/sdk/workflow/service.py +++ b/skyvern/forge/sdk/workflow/service.py @@ -193,7 +193,7 @@ class WorkflowService: if isinstance(e, SkyvernException): failure_reason = f"Setup workflow failed due to an SkyvernException({e.__class__.__name__}): {str(e)}" - await self.mark_workflow_run_as_failed( + workflow_run = await self.mark_workflow_run_as_failed( workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason ) raise e @@ -219,7 +219,7 @@ class WorkflowService: workflow = await self.get_workflow_by_permanent_id(workflow_permanent_id=workflow_run.workflow_permanent_id) # Set workflow run status to running, create workflow run parameters - await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id) + workflow_run = await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id) # Get all context parameters from the workflow definition context_parameters = [ @@ -266,7 +266,7 @@ class WorkflowService: exception_message = f"unexpected SkyvernException({e.__class__.__name__}): {str(e)}" failure_reason = f"Failed to initialize workflow run context. failure reason: {exception_message}" - await self.mark_workflow_run_as_failed( + workflow_run = await self.mark_workflow_run_as_failed( workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason ) await self.clean_up_workflow( @@ -284,45 +284,46 @@ class WorkflowService: block_result = None for block_idx, block in enumerate(blocks): try: - refreshed_workflow_run = await app.DATABASE.get_workflow_run( + if refreshed_workflow_run := await app.DATABASE.get_workflow_run( workflow_run_id=workflow_run.workflow_run_id, organization_id=organization_id, - ) - if refreshed_workflow_run and refreshed_workflow_run.status == WorkflowRunStatus.canceled: - LOG.info( - "Workflow run is canceled, stopping execution inside workflow execution loop", - workflow_run_id=workflow_run.workflow_run_id, - block_idx=block_idx, - block_type=block.block_type, - block_label=block.label, - ) - await self.clean_up_workflow( - workflow=workflow, - workflow_run=workflow_run, - api_key=api_key, - need_call_webhook=True, - close_browser_on_completion=browser_session_id is None, - browser_session_id=browser_session_id, - ) - return workflow_run + ): + workflow_run = refreshed_workflow_run + if workflow_run.status == WorkflowRunStatus.canceled: + LOG.info( + "Workflow run is canceled, stopping execution inside workflow execution loop", + workflow_run_id=workflow_run.workflow_run_id, + block_idx=block_idx, + block_type=block.block_type, + block_label=block.label, + ) + await self.clean_up_workflow( + workflow=workflow, + workflow_run=workflow_run, + api_key=api_key, + need_call_webhook=True, + close_browser_on_completion=browser_session_id is None, + browser_session_id=browser_session_id, + ) + return workflow_run - if refreshed_workflow_run and refreshed_workflow_run.status == WorkflowRunStatus.timed_out: - LOG.info( - "Workflow run is timed out, stopping execution inside workflow execution loop", - workflow_run_id=workflow_run.workflow_run_id, - block_idx=block_idx, - block_type=block.block_type, - block_label=block.label, - ) - await self.clean_up_workflow( - workflow=workflow, - workflow_run=workflow_run, - api_key=api_key, - need_call_webhook=True, - close_browser_on_completion=browser_session_id is None, - browser_session_id=browser_session_id, - ) - return workflow_run + if workflow_run.status == WorkflowRunStatus.timed_out: + LOG.info( + "Workflow run is timed out, stopping execution inside workflow execution loop", + workflow_run_id=workflow_run.workflow_run_id, + block_idx=block_idx, + block_type=block.block_type, + block_label=block.label, + ) + await self.clean_up_workflow( + workflow=workflow, + workflow_run=workflow_run, + api_key=api_key, + need_call_webhook=True, + close_browser_on_completion=browser_session_id is None, + browser_session_id=browser_session_id, + ) + return workflow_run parameters = block.get_all_parameters(workflow_run_id) await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run( @@ -351,7 +352,9 @@ class WorkflowService: block_type_var=block.block_type, block_label=block.label, ) - await self.mark_workflow_run_as_canceled(workflow_run_id=workflow_run.workflow_run_id) + workflow_run = await self.mark_workflow_run_as_canceled( + workflow_run_id=workflow_run.workflow_run_id + ) # We're not sending a webhook here because the workflow run is manually marked as canceled. await self.clean_up_workflow( workflow=workflow, @@ -376,7 +379,7 @@ class WorkflowService: failure_reason = ( f"{block.block_type} block failed. failure reason: {block_result.failure_reason}" ) - await self.mark_workflow_run_as_failed( + workflow_run = await self.mark_workflow_run_as_failed( workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason ) await self.clean_up_workflow( @@ -412,7 +415,7 @@ class WorkflowService: if not block.continue_on_failure: failure_reason = f"{block.block_type} block terminated. Reason: {block_result.failure_reason}" - await self.mark_workflow_run_as_terminated( + workflow_run = await self.mark_workflow_run_as_terminated( workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason ) await self.clean_up_workflow( @@ -448,7 +451,7 @@ class WorkflowService: if not block.continue_on_failure: failure_reason = f"{block.block_type} block timed out. Reason: {block_result.failure_reason}" - await self.mark_workflow_run_as_failed( + workflow_run = await self.mark_workflow_run_as_failed( workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason ) await self.clean_up_workflow( @@ -485,7 +488,7 @@ class WorkflowService: exception_message = f"unexpected SkyvernException({e.__class__.__name__}): {str(e)}" failure_reason = f"{block.block_type} block failed. failure reason: {exception_message}" - await self.mark_workflow_run_as_failed( + workflow_run = await self.mark_workflow_run_as_failed( workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason ) await self.clean_up_workflow( @@ -497,23 +500,24 @@ class WorkflowService: ) return workflow_run - refreshed_workflow_run = await app.DATABASE.get_workflow_run( + if refreshed_workflow_run := await app.DATABASE.get_workflow_run( workflow_run_id=workflow_run.workflow_run_id, organization_id=organization_id, - ) - if refreshed_workflow_run and refreshed_workflow_run.status not in ( - WorkflowRunStatus.canceled, - WorkflowRunStatus.failed, - WorkflowRunStatus.terminated, - WorkflowRunStatus.timed_out, ): - await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id) - else: - LOG.info( - "Workflow run is already timed_out, canceled, failed, or terminated, not marking as completed", - workflow_run_id=workflow_run.workflow_run_id, - workflow_run_status=refreshed_workflow_run.status if refreshed_workflow_run else None, - ) + workflow_run = refreshed_workflow_run + if workflow_run.status not in ( + WorkflowRunStatus.canceled, + WorkflowRunStatus.failed, + WorkflowRunStatus.terminated, + WorkflowRunStatus.timed_out, + ): + workflow_run = await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id) + else: + LOG.info( + "Workflow run is already timed_out, canceled, failed, or terminated, not marking as completed", + workflow_run_id=workflow_run.workflow_run_id, + workflow_run_status=workflow_run.status if workflow_run else None, + ) await self.clean_up_workflow( workflow=workflow, workflow_run=workflow_run, @@ -731,72 +735,74 @@ class WorkflowService: parent_workflow_run_id=parent_workflow_run_id, ) - async def mark_workflow_run_as_completed(self, workflow_run_id: str) -> None: + async def mark_workflow_run_as_completed(self, workflow_run_id: str) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as completed", workflow_run_id=workflow_run_id, workflow_status="completed", ) - await app.DATABASE.update_workflow_run( + return await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.completed, ) - async def mark_workflow_run_as_failed(self, workflow_run_id: str, failure_reason: str | None) -> None: + async def mark_workflow_run_as_failed(self, workflow_run_id: str, failure_reason: str | None) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as failed", workflow_run_id=workflow_run_id, workflow_status="failed", failure_reason=failure_reason, ) - await app.DATABASE.update_workflow_run( + return await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.failed, failure_reason=failure_reason, ) - async def mark_workflow_run_as_running(self, workflow_run_id: str) -> None: + async def mark_workflow_run_as_running(self, workflow_run_id: str) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as running", workflow_run_id=workflow_run_id, workflow_status="running", ) - await app.DATABASE.update_workflow_run( + return await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.running, ) - async def mark_workflow_run_as_terminated(self, workflow_run_id: str, failure_reason: str | None) -> None: + async def mark_workflow_run_as_terminated(self, workflow_run_id: str, failure_reason: str | None) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as terminated", workflow_run_id=workflow_run_id, workflow_status="terminated", failure_reason=failure_reason, ) - await app.DATABASE.update_workflow_run( + return await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.terminated, failure_reason=failure_reason, ) - async def mark_workflow_run_as_canceled(self, workflow_run_id: str) -> None: + async def mark_workflow_run_as_canceled(self, workflow_run_id: str) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as canceled", workflow_run_id=workflow_run_id, workflow_status="canceled", ) - await app.DATABASE.update_workflow_run( + return await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.canceled, ) - async def mark_workflow_run_as_timed_out(self, workflow_run_id: str, failure_reason: str | None = None) -> None: + async def mark_workflow_run_as_timed_out( + self, workflow_run_id: str, failure_reason: str | None = None + ) -> WorkflowRun: LOG.info( f"Marking workflow run {workflow_run_id} as timed out", workflow_run_id=workflow_run_id, workflow_status="timed_out", ) - await app.DATABASE.update_workflow_run( + return await app.DATABASE.update_workflow_run( workflow_run_id=workflow_run_id, status=WorkflowRunStatus.timed_out, failure_reason=failure_reason,