Update workflow response and webhook, bugfix: workflow is always completed (#328)

This commit is contained in:
Kerem Yilmaz 2024-05-16 13:44:53 -07:00 committed by GitHub
parent e6cee25416
commit 7d3bb704ed
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 30 additions and 43 deletions

View file

@ -491,7 +491,6 @@ async def get_workflow_run(
return await app.WORKFLOW_SERVICE.build_workflow_run_status_response(
workflow_id=workflow_id,
workflow_run_id=workflow_run_id,
last_block_result=None,
organization_id=current_org.organization_id,
)

View file

@ -6,7 +6,7 @@ from pydantic import BaseModel
from skyvern.forge.sdk.schemas.tasks import ProxyLocation
from skyvern.forge.sdk.workflow.exceptions import WorkflowDefinitionHasDuplicateBlockLabels
from skyvern.forge.sdk.workflow.models.block import BlockResult, BlockTypeVar
from skyvern.forge.sdk.workflow.models.block import BlockTypeVar
from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE
@ -98,5 +98,4 @@ class WorkflowRunStatusResponse(BaseModel):
parameters: dict[str, Any]
screenshot_urls: list[str] | None = None
recording_url: str | None = None
payload: dict[str, Any] | None = None
output: BlockResult | None = None
outputs: dict[str, Any] | None = None

View file

@ -27,7 +27,6 @@ from skyvern.forge.sdk.workflow.exceptions import (
WorkflowDefinitionHasReservedParameterKeys,
)
from skyvern.forge.sdk.workflow.models.block import (
BlockResult,
BlockType,
BlockTypeVar,
CodeBlock,
@ -204,27 +203,20 @@ class WorkflowService:
)
else:
await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id)
break
await self.send_workflow_response(workflow=workflow, workflow_run=workflow_run, api_key=api_key)
return workflow_run
except Exception as e:
except Exception:
LOG.exception(
f"Error while executing workflow run {workflow_run.workflow_run_id}",
workflow_run_id=workflow_run.workflow_run_id,
)
await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id)
raise e
await self.send_workflow_response(workflow=workflow, workflow_run=workflow_run, api_key=api_key)
return workflow_run
tasks = await self.get_tasks_by_workflow_run_id(workflow_run.workflow_run_id)
await self.mark_workflow_run_as_completed(workflow_run_id=workflow_run.workflow_run_id)
await self.send_workflow_response(
workflow=workflow,
workflow_run=workflow_run,
tasks=tasks,
# TODO: We don't persist the block result for now, but we should in the case the users want to get it later
last_block_result=block_result,
api_key=api_key,
)
await self.send_workflow_response(workflow=workflow, workflow_run=workflow_run, api_key=api_key)
return workflow_run
async def handle_workflow_status(self, workflow_run: WorkflowRun, tasks: list[Task]) -> WorkflowRun:
@ -363,7 +355,9 @@ class WorkflowService:
async def mark_workflow_run_as_completed(self, workflow_run_id: str) -> None:
LOG.info(
f"Marking workflow run {workflow_run_id} as completed", workflow_run_id=workflow_run_id, status="completed"
f"Marking workflow run {workflow_run_id} as completed",
workflow_run_id=workflow_run_id,
workflow_status="completed",
)
await app.DATABASE.update_workflow_run(
workflow_run_id=workflow_run_id,
@ -371,7 +365,11 @@ class WorkflowService:
)
async def mark_workflow_run_as_failed(self, workflow_run_id: str) -> None:
LOG.info(f"Marking workflow run {workflow_run_id} as failed", workflow_run_id=workflow_run_id, status="failed")
LOG.info(
f"Marking workflow run {workflow_run_id} as failed",
workflow_run_id=workflow_run_id,
workflow_status="failed",
)
await app.DATABASE.update_workflow_run(
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.failed,
@ -379,7 +377,9 @@ class WorkflowService:
async def mark_workflow_run_as_running(self, workflow_run_id: str) -> None:
LOG.info(
f"Marking workflow run {workflow_run_id} as running", workflow_run_id=workflow_run_id, status="running"
f"Marking workflow run {workflow_run_id} as running",
workflow_run_id=workflow_run_id,
workflow_status="running",
)
await app.DATABASE.update_workflow_run(
workflow_run_id=workflow_run_id,
@ -390,7 +390,7 @@ class WorkflowService:
LOG.info(
f"Marking workflow run {workflow_run_id} as terminated",
workflow_run_id=workflow_run_id,
status="terminated",
workflow_status="terminated",
)
await app.DATABASE.update_workflow_run(
workflow_run_id=workflow_run_id,
@ -511,7 +511,6 @@ class WorkflowService:
self,
workflow_id: str,
workflow_run_id: str,
last_block_result: BlockResult | None,
organization_id: str,
) -> WorkflowRunStatusResponse:
workflow = await self.get_workflow(workflow_id=workflow_id, organization_id=organization_id)
@ -555,21 +554,14 @@ class WorkflowService:
workflow_id=workflow_id, workflow_run_id=workflow_run_id
)
if output_parameter_tuples:
payload = {
output_parameter.key: wfrp.value
for output_parameter, wfrp in output_parameter_tuples
if wfrp.value is not None
}
outputs = {output_parameter.key: output.value for output_parameter, output in output_parameter_tuples}
else:
payload = {
task.task_id: {
"title": task.title,
"extracted_information": task.extracted_information,
"navigation_payload": task.navigation_payload,
"errors": await app.agent.get_task_errors(task=task),
}
for task in workflow_run_tasks
}
LOG.error(
"No output parameters found for workflow run",
workflow_id=workflow_id,
workflow_run_id=workflow_run_id,
)
outputs = None
return WorkflowRunStatusResponse(
workflow_id=workflow_id,
workflow_run_id=workflow_run_id,
@ -581,20 +573,18 @@ class WorkflowService:
parameters=parameters_with_value,
screenshot_urls=screenshot_urls,
recording_url=recording_url,
payload=payload,
output=last_block_result,
outputs=outputs,
)
async def send_workflow_response(
self,
workflow: Workflow,
workflow_run: WorkflowRun,
tasks: list[Task],
last_block_result: BlockResult | None,
api_key: str | None = None,
close_browser_on_completion: bool = True,
) -> None:
analytics.capture("skyvern-oss-agent-workflow-status", {"status": workflow_run.status})
tasks = await self.get_tasks_by_workflow_run_id(workflow_run.workflow_run_id)
all_workflow_task_ids = [task.task_id for task in tasks]
browser_state = await app.BROWSER_MANAGER.cleanup_for_workflow_run(
workflow_run.workflow_run_id, all_workflow_task_ids, close_browser_on_completion
@ -608,7 +598,6 @@ class WorkflowService:
workflow_run_status_response = await self.build_workflow_run_status_response(
workflow_id=workflow.workflow_id,
workflow_run_id=workflow_run.workflow_run_id,
last_block_result=last_block_result,
organization_id=workflow.organization_id,
)
LOG.info("Built workflow run status response", workflow_run_status_response=workflow_run_status_response)
@ -629,7 +618,7 @@ class WorkflowService:
)
return
# send task_response to the webhook callback url
# send webhook to the webhook callback url
# TODO: use async requests (httpx)
timestamp = str(int(datetime.utcnow().timestamp()))
payload = workflow_run_status_response.model_dump_json()