Workflow Fixes (#156)

This commit is contained in:
Kerem Yilmaz 2024-04-04 19:09:19 -07:00 committed by GitHub
parent 8117395d73
commit 0800990627
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 350 additions and 108 deletions

View file

@ -22,6 +22,7 @@ from skyvern.forge.sdk.models import Step
from skyvern.forge.sdk.schemas.tasks import Task, TaskStatus
from skyvern.forge.sdk.workflow.exceptions import WorkflowDefinitionHasDuplicateParameterKeys
from skyvern.forge.sdk.workflow.models.block import (
BlockResult,
BlockType,
BlockTypeVar,
CodeBlock,
@ -30,6 +31,7 @@ from skyvern.forge.sdk.workflow.models.block import (
SendEmailBlock,
TaskBlock,
TextPromptBlock,
UploadToS3Block,
)
from skyvern.forge.sdk.workflow.models.parameter import (
AWSSecretParameter,
@ -150,9 +152,10 @@ class WorkflowService:
)
# Execute workflow blocks
blocks = workflow.workflow_definition.blocks
try:
for block_idx, block in enumerate(blocks):
parameters = block.get_all_parameters()
block_result = None
for block_idx, block in enumerate(blocks):
try:
parameters = block.get_all_parameters(workflow_run_id)
await app.WORKFLOW_CONTEXT_MANAGER.register_block_parameters_for_workflow_run(
workflow_run_id, parameters
)
@ -162,40 +165,36 @@ class WorkflowService:
workflow_run_id=workflow_run.workflow_run_id,
block_idx=block_idx,
)
await block.execute(workflow_run_id=workflow_run_id)
except Exception:
LOG.exception(
f"Error while executing workflow run {workflow_run.workflow_run_id}",
workflow_run_id=workflow_run.workflow_run_id,
exc_info=True,
)
block_result = await block.execute_safe(workflow_run_id=workflow_run_id)
if not block_result.success:
LOG.error(
f"Block with type {block.block_type} at index {block_idx} failed for workflow run {workflow_run_id}",
block_type=block.block_type,
workflow_run_id=workflow_run.workflow_run_id,
block_idx=block_idx,
block_result=block_result,
)
await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id)
break
tasks = await self.get_tasks_by_workflow_run_id(workflow_run.workflow_run_id)
if tasks:
workflow_run = await self.handle_workflow_status(workflow_run=workflow_run, tasks=tasks)
else:
# Check if the workflow run has any workflow run output parameters
# if it does, mark the workflow run as completed, else mark it as failed
workflow_run_output_parameters = await self.get_workflow_run_output_parameters(
workflow_run_id=workflow_run.workflow_run_id
)
if workflow_run_output_parameters:
LOG.info(
f"Workflow run {workflow_run.workflow_run_id} has output parameters, marking as completed",
workflow_run_id=workflow_run.workflow_run_id,
)
await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id)
else:
LOG.error(
f"Workflow run {workflow_run.workflow_run_id} has no tasks or output parameters, marking as failed",
except Exception as e:
LOG.exception(
f"Error while executing workflow run {workflow_run.workflow_run_id}",
workflow_run_id=workflow_run.workflow_run_id,
exc_info=True,
)
await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id)
raise e
tasks = await self.get_tasks_by_workflow_run_id(workflow_run.workflow_run_id)
await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id)
await self.send_workflow_response(
workflow=workflow,
workflow_run=workflow_run,
tasks=tasks,
# TODO: We don't persist the block result for now, but we should in the case the users want to get it later
last_block_result=block_result,
api_key=api_key,
)
return workflow_run
@ -430,7 +429,7 @@ class WorkflowService:
return await app.DATABASE.get_tasks_by_workflow_run_id(workflow_run_id=workflow_run_id)
async def build_workflow_run_status_response(
self, workflow_id: str, workflow_run_id: str, organization_id: str
self, workflow_id: str, workflow_run_id: str, last_block_result: BlockResult | None, organization_id: str
) -> WorkflowRunStatusResponse:
workflow = await self.get_workflow(workflow_id=workflow_id)
if workflow is None:
@ -499,6 +498,7 @@ class WorkflowService:
screenshot_urls=screenshot_urls,
recording_url=recording_url,
payload=payload,
output=last_block_result,
)
async def send_workflow_response(
@ -506,6 +506,7 @@ class WorkflowService:
workflow: Workflow,
workflow_run: WorkflowRun,
tasks: list[Task],
last_block_result: BlockResult | None,
api_key: str | None = None,
close_browser_on_completion: bool = True,
) -> None:
@ -523,6 +524,7 @@ class WorkflowService:
workflow_run_status_response = await self.build_workflow_run_status_response(
workflow_id=workflow.workflow_id,
workflow_run_id=workflow_run.workflow_run_id,
last_block_result=last_block_result,
organization_id=workflow.organization_id,
)
LOG.info("Built workflow run status response", workflow_run_status_response=workflow_run_status_response)
@ -772,6 +774,12 @@ class WorkflowService:
output_parameter=output_parameter,
url=block_yaml.url,
)
elif block_yaml.block_type == BlockType.UPLOAD_TO_S3:
return UploadToS3Block(
label=block_yaml.label,
output_parameter=output_parameter,
path=block_yaml.path,
)
elif block_yaml.block_type == BlockType.SEND_EMAIL:
return SendEmailBlock(
label=block_yaml.label,