fix task workflow cancel issue (#1111)

This commit is contained in:
Shuchang Zheng 2024-11-01 15:13:41 -07:00 committed by GitHub
parent d03ad38630
commit ad0bd0b4f5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 204 additions and 111 deletions

View file

@ -2,7 +2,7 @@ import json
from datetime import datetime
from typing import Any
import requests
import httpx
import structlog
from skyvern import analytics
@ -183,15 +183,17 @@ class WorkflowService:
)
# Execute workflow blocks
blocks = workflow.workflow_definition.blocks
blocks_cnt = len(blocks)
block_result = None
for block_idx, block in enumerate(blocks):
is_last_block = block_idx + 1 == blocks_cnt
try:
parameters = block.get_all_parameters(workflow_run_id)
await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run(
workflow_run_id, parameters, organization
)
LOG.info(
f"Executing root block {block.block_type} at index {block_idx} for workflow run {workflow_run_id}",
f"Executing root block {block.block_type} at index {block_idx}/{blocks_cnt -1} for workflow run {workflow_run_id}",
block_type=block.block_type,
workflow_run_id=workflow_run.workflow_run_id,
block_idx=block_idx,
@ -199,7 +201,7 @@ class WorkflowService:
block_result = await block.execute_safe(workflow_run_id=workflow_run_id)
if block_result.status == BlockStatus.canceled:
LOG.info(
f"Block with type {block.block_type} at index {block_idx} was canceled for workflow run {workflow_run_id}, cancelling workflow run",
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} was canceled for workflow run {workflow_run_id}, cancelling workflow run",
block_type=block.block_type,
workflow_run_id=workflow_run.workflow_run_id,
block_idx=block_idx,
@ -207,18 +209,24 @@ class WorkflowService:
)
await self.mark_workflow_run_as_canceled(workflow_run_id=workflow_run.workflow_run_id)
# We're not sending a webhook here because the workflow run is manually marked as canceled.
await self.clean_up_workflow(
workflow=workflow,
workflow_run=workflow_run,
api_key=api_key,
need_call_webhook=False,
)
return workflow_run
elif block_result.status == BlockStatus.failed:
LOG.error(
f"Block with type {block.block_type} at index {block_idx} failed for workflow run {workflow_run_id}",
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} failed for workflow run {workflow_run_id}",
block_type=block.block_type,
workflow_run_id=workflow_run.workflow_run_id,
block_idx=block_idx,
block_result=block_result,
)
if block.continue_on_failure:
if block.continue_on_failure and not is_last_block:
LOG.warning(
f"Block with type {block.block_type} at index {block_idx} failed but will continue executing the workflow run {workflow_run_id}",
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} failed but will continue executing the workflow run {workflow_run_id}",
block_type=block.block_type,
workflow_run_id=workflow_run.workflow_run_id,
block_idx=block_idx,
@ -227,7 +235,7 @@ class WorkflowService:
)
else:
await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id)
await self.send_workflow_response(
await self.clean_up_workflow(
workflow=workflow,
workflow_run=workflow_run,
api_key=api_key,
@ -235,15 +243,15 @@ class WorkflowService:
return workflow_run
elif block_result.status == BlockStatus.terminated:
LOG.info(
f"Block with type {block.block_type} at index {block_idx} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated",
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated",
block_type=block.block_type,
workflow_run_id=workflow_run.workflow_run_id,
block_idx=block_idx,
block_result=block_result,
)
if block.continue_on_failure:
if block.continue_on_failure and not is_last_block:
LOG.warning(
f"Block with type {block.block_type} at index {block_idx} was terminated for workflow run {workflow_run_id}, but will continue executing the workflow run",
f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} was terminated for workflow run {workflow_run_id}, but will continue executing the workflow run",
block_type=block.block_type,
workflow_run_id=workflow_run.workflow_run_id,
block_idx=block_idx,
@ -252,7 +260,7 @@ class WorkflowService:
)
else:
await self.mark_workflow_run_as_terminated(workflow_run_id=workflow_run.workflow_run_id)
await self.send_workflow_response(
await self.clean_up_workflow(
workflow=workflow,
workflow_run=workflow_run,
api_key=api_key,
@ -264,11 +272,11 @@ class WorkflowService:
workflow_run_id=workflow_run.workflow_run_id,
)
await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id)
await self.send_workflow_response(workflow=workflow, workflow_run=workflow_run, api_key=api_key)
await self.clean_up_workflow(workflow=workflow, workflow_run=workflow_run, api_key=api_key)
return workflow_run
await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id)
await self.send_workflow_response(workflow=workflow, workflow_run=workflow_run, api_key=api_key)
await self.clean_up_workflow(workflow=workflow, workflow_run=workflow_run, api_key=api_key)
return workflow_run
async def create_workflow(
@ -707,12 +715,13 @@ class WorkflowService:
outputs=outputs,
)
async def send_workflow_response(
async def clean_up_workflow(
self,
workflow: Workflow,
workflow_run: WorkflowRun,
api_key: str | None = None,
close_browser_on_completion: bool = True,
need_call_webhook: bool = True,
) -> None:
analytics.capture("skyvern-oss-agent-workflow-status", {"status": workflow_run.status})
tasks = await self.get_tasks_by_workflow_run_id(workflow_run.workflow_run_id)
@ -735,6 +744,9 @@ class WorkflowService:
await app.ARTIFACT_MANAGER.wait_for_upload_aiotasks_for_tasks(all_workflow_task_ids)
if not need_call_webhook:
return
workflow_run_status_response = await self.build_workflow_run_status_response(
workflow_permanent_id=workflow.workflow_permanent_id,
workflow_run_id=workflow_run.workflow_run_id,
@ -762,7 +774,6 @@ class WorkflowService:
return
# send webhook to the webhook callback url
# TODO: use async requests (httpx)
timestamp = str(int(datetime.utcnow().timestamp()))
payload = workflow_run_status_response.model_dump_json()
signature = generate_skyvern_signature(
@ -783,8 +794,10 @@ class WorkflowService:
headers=headers,
)
try:
resp = requests.post(workflow_run.webhook_callback_url, data=payload, headers=headers)
if resp.ok:
resp = await httpx.AsyncClient().post(
url=workflow_run.webhook_callback_url, data=payload, headers=headers, timeout=httpx.Timeout(30.0)
)
if resp.status_code == 200:
LOG.info(
"Webhook sent successfully",
workflow_id=workflow.workflow_id,