fix workflow run webhook status (#2458)

This commit is contained in:
Shuchang Zheng 2025-05-26 08:49:42 -07:00 committed by GitHub
parent e6bb20f720
commit 04c47b7e79
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 80 additions and 77 deletions

View file

@ -8,7 +8,7 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from skyvern.config import settings
from skyvern.exceptions import WorkflowParameterNotFound
from skyvern.exceptions import WorkflowParameterNotFound, WorkflowRunNotFound
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType
from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType, TaskType
from skyvern.forge.sdk.db.exceptions import NotFoundError
@ -1462,7 +1462,7 @@ class AgentDB:
async def update_workflow_run(
self, workflow_run_id: str, status: WorkflowRunStatus, failure_reason: str | None = None
) -> WorkflowRun | None:
) -> WorkflowRun:
async with self.Session() as session:
workflow_run = (
await session.scalars(select(WorkflowRunModel).filter_by(workflow_run_id=workflow_run_id))
@ -1474,11 +1474,8 @@ class AgentDB:
await session.refresh(workflow_run)
await save_workflow_run_logs(workflow_run_id)
return convert_to_workflow_run(workflow_run)
LOG.error(
"WorkflowRun not found, nothing to update",
workflow_run_id=workflow_run_id,
)
return None
else:
raise WorkflowRunNotFound(workflow_run_id)
async def get_all_runs(
self, organization_id: str, page: int = 1, page_size: int = 10, status: list[WorkflowRunStatus] | None = None

View file

@ -193,7 +193,7 @@ class WorkflowService:
if isinstance(e, SkyvernException):
failure_reason = f"Setup workflow failed due to an SkyvernException({e.__class__.__name__}): {str(e)}"
await self.mark_workflow_run_as_failed(
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
)
raise e
@ -219,7 +219,7 @@ class WorkflowService:
workflow = await self.get_workflow_by_permanent_id(workflow_permanent_id=workflow_run.workflow_permanent_id)
# Set workflow run status to running, create workflow run parameters
await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id)
workflow_run = await self.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id)
# Get all context parameters from the workflow definition
context_parameters = [
@ -266,7 +266,7 @@ class WorkflowService:
exception_message = f"unexpected SkyvernException({e.__class__.__name__}): {str(e)}"
failure_reason = f"Failed to initialize workflow run context. failure reason: {exception_message}"
await self.mark_workflow_run_as_failed(
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
)
await self.clean_up_workflow(
@ -284,45 +284,46 @@ class WorkflowService:
block_result = None
for block_idx, block in enumerate(blocks):
try:
refreshed_workflow_run = await app.DATABASE.get_workflow_run(
if refreshed_workflow_run := await app.DATABASE.get_workflow_run(
workflow_run_id=workflow_run.workflow_run_id,
organization_id=organization_id,
)
if refreshed_workflow_run and refreshed_workflow_run.status == WorkflowRunStatus.canceled:
LOG.info(
"Workflow run is canceled, stopping execution inside workflow execution loop",
workflow_run_id=workflow_run.workflow_run_id,
block_idx=block_idx,
block_type=block.block_type,
block_label=block.label,
)
await self.clean_up_workflow(
workflow=workflow,
workflow_run=workflow_run,
api_key=api_key,
need_call_webhook=True,
close_browser_on_completion=browser_session_id is None,
browser_session_id=browser_session_id,
)
return workflow_run
):
workflow_run = refreshed_workflow_run
if workflow_run.status == WorkflowRunStatus.canceled:
LOG.info(
"Workflow run is canceled, stopping execution inside workflow execution loop",
workflow_run_id=workflow_run.workflow_run_id,
block_idx=block_idx,
block_type=block.block_type,
block_label=block.label,
)
await self.clean_up_workflow(
workflow=workflow,
workflow_run=workflow_run,
api_key=api_key,
need_call_webhook=True,
close_browser_on_completion=browser_session_id is None,
browser_session_id=browser_session_id,
)
return workflow_run
if refreshed_workflow_run and refreshed_workflow_run.status == WorkflowRunStatus.timed_out:
LOG.info(
"Workflow run is timed out, stopping execution inside workflow execution loop",
workflow_run_id=workflow_run.workflow_run_id,
block_idx=block_idx,
block_type=block.block_type,
block_label=block.label,
)
await self.clean_up_workflow(
workflow=workflow,
workflow_run=workflow_run,
api_key=api_key,
need_call_webhook=True,
close_browser_on_completion=browser_session_id is None,
browser_session_id=browser_session_id,
)
return workflow_run
if workflow_run.status == WorkflowRunStatus.timed_out:
LOG.info(
"Workflow run is timed out, stopping execution inside workflow execution loop",
workflow_run_id=workflow_run.workflow_run_id,
block_idx=block_idx,
block_type=block.block_type,
block_label=block.label,
)
await self.clean_up_workflow(
workflow=workflow,
workflow_run=workflow_run,
api_key=api_key,
need_call_webhook=True,
close_browser_on_completion=browser_session_id is None,
browser_session_id=browser_session_id,
)
return workflow_run
parameters = block.get_all_parameters(workflow_run_id)
await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run(
@ -351,7 +352,9 @@ class WorkflowService:
block_type_var=block.block_type,
block_label=block.label,
)
await self.mark_workflow_run_as_canceled(workflow_run_id=workflow_run.workflow_run_id)
workflow_run = await self.mark_workflow_run_as_canceled(
workflow_run_id=workflow_run.workflow_run_id
)
# We're not sending a webhook here because the workflow run is manually marked as canceled.
await self.clean_up_workflow(
workflow=workflow,
@ -376,7 +379,7 @@ class WorkflowService:
failure_reason = (
f"{block.block_type} block failed. failure reason: {block_result.failure_reason}"
)
await self.mark_workflow_run_as_failed(
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
)
await self.clean_up_workflow(
@ -412,7 +415,7 @@ class WorkflowService:
if not block.continue_on_failure:
failure_reason = f"{block.block_type} block terminated. Reason: {block_result.failure_reason}"
await self.mark_workflow_run_as_terminated(
workflow_run = await self.mark_workflow_run_as_terminated(
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
)
await self.clean_up_workflow(
@ -448,7 +451,7 @@ class WorkflowService:
if not block.continue_on_failure:
failure_reason = f"{block.block_type} block timed out. Reason: {block_result.failure_reason}"
await self.mark_workflow_run_as_failed(
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
)
await self.clean_up_workflow(
@ -485,7 +488,7 @@ class WorkflowService:
exception_message = f"unexpected SkyvernException({e.__class__.__name__}): {str(e)}"
failure_reason = f"{block.block_type} block failed. failure reason: {exception_message}"
await self.mark_workflow_run_as_failed(
workflow_run = await self.mark_workflow_run_as_failed(
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
)
await self.clean_up_workflow(
@ -497,23 +500,24 @@ class WorkflowService:
)
return workflow_run
refreshed_workflow_run = await app.DATABASE.get_workflow_run(
if refreshed_workflow_run := await app.DATABASE.get_workflow_run(
workflow_run_id=workflow_run.workflow_run_id,
organization_id=organization_id,
)
if refreshed_workflow_run and refreshed_workflow_run.status not in (
WorkflowRunStatus.canceled,
WorkflowRunStatus.failed,
WorkflowRunStatus.terminated,
WorkflowRunStatus.timed_out,
):
await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id)
else:
LOG.info(
"Workflow run is already timed_out, canceled, failed, or terminated, not marking as completed",
workflow_run_id=workflow_run.workflow_run_id,
workflow_run_status=refreshed_workflow_run.status if refreshed_workflow_run else None,
)
workflow_run = refreshed_workflow_run
if workflow_run.status not in (
WorkflowRunStatus.canceled,
WorkflowRunStatus.failed,
WorkflowRunStatus.terminated,
WorkflowRunStatus.timed_out,
):
workflow_run = await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id)
else:
LOG.info(
"Workflow run is already timed_out, canceled, failed, or terminated, not marking as completed",
workflow_run_id=workflow_run.workflow_run_id,
workflow_run_status=workflow_run.status if workflow_run else None,
)
await self.clean_up_workflow(
workflow=workflow,
workflow_run=workflow_run,
@ -731,72 +735,74 @@ class WorkflowService:
parent_workflow_run_id=parent_workflow_run_id,
)
async def mark_workflow_run_as_completed(self, workflow_run_id: str) -> None:
async def mark_workflow_run_as_completed(self, workflow_run_id: str) -> WorkflowRun:
LOG.info(
f"Marking workflow run {workflow_run_id} as completed",
workflow_run_id=workflow_run_id,
workflow_status="completed",
)
await app.DATABASE.update_workflow_run(
return await app.DATABASE.update_workflow_run(
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.completed,
)
async def mark_workflow_run_as_failed(self, workflow_run_id: str, failure_reason: str | None) -> None:
async def mark_workflow_run_as_failed(self, workflow_run_id: str, failure_reason: str | None) -> WorkflowRun:
LOG.info(
f"Marking workflow run {workflow_run_id} as failed",
workflow_run_id=workflow_run_id,
workflow_status="failed",
failure_reason=failure_reason,
)
await app.DATABASE.update_workflow_run(
return await app.DATABASE.update_workflow_run(
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.failed,
failure_reason=failure_reason,
)
async def mark_workflow_run_as_running(self, workflow_run_id: str) -> None:
async def mark_workflow_run_as_running(self, workflow_run_id: str) -> WorkflowRun:
LOG.info(
f"Marking workflow run {workflow_run_id} as running",
workflow_run_id=workflow_run_id,
workflow_status="running",
)
await app.DATABASE.update_workflow_run(
return await app.DATABASE.update_workflow_run(
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.running,
)
async def mark_workflow_run_as_terminated(self, workflow_run_id: str, failure_reason: str | None) -> None:
async def mark_workflow_run_as_terminated(self, workflow_run_id: str, failure_reason: str | None) -> WorkflowRun:
LOG.info(
f"Marking workflow run {workflow_run_id} as terminated",
workflow_run_id=workflow_run_id,
workflow_status="terminated",
failure_reason=failure_reason,
)
await app.DATABASE.update_workflow_run(
return await app.DATABASE.update_workflow_run(
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.terminated,
failure_reason=failure_reason,
)
async def mark_workflow_run_as_canceled(self, workflow_run_id: str) -> None:
async def mark_workflow_run_as_canceled(self, workflow_run_id: str) -> WorkflowRun:
LOG.info(
f"Marking workflow run {workflow_run_id} as canceled",
workflow_run_id=workflow_run_id,
workflow_status="canceled",
)
await app.DATABASE.update_workflow_run(
return await app.DATABASE.update_workflow_run(
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.canceled,
)
async def mark_workflow_run_as_timed_out(self, workflow_run_id: str, failure_reason: str | None = None) -> None:
async def mark_workflow_run_as_timed_out(
self, workflow_run_id: str, failure_reason: str | None = None
) -> WorkflowRun:
LOG.info(
f"Marking workflow run {workflow_run_id} as timed out",
workflow_run_id=workflow_run_id,
workflow_status="timed_out",
)
await app.DATABASE.update_workflow_run(
return await app.DATABASE.update_workflow_run(
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.timed_out,
failure_reason=failure_reason,