mirror of
https://github.com/Skyvern-AI/skyvern.git
synced 2025-09-14 09:19:40 +00:00
add workflow failure reason (#1198)
This commit is contained in:
parent
b2516dc95f
commit
e3aa583b24
7 changed files with 152 additions and 25 deletions
|
@ -0,0 +1,31 @@
|
|||
"""Add failure_reason column to workflow_runs
|
||||
|
||||
Revision ID: 1909715536dc
|
||||
Revises: b8f9e09e181d
|
||||
Create Date: 2024-11-15 02:51:33.553177+00:00
|
||||
|
||||
"""
|
||||
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "1909715536dc"
|
||||
down_revision: Union[str, None] = "b8f9e09e181d"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.add_column("workflow_runs", sa.Column("failure_reason", sa.String(), nullable=True))
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_column("workflow_runs", "failure_reason")
|
||||
# ### end Alembic commands ###
|
|
@ -1083,13 +1083,16 @@ class AgentDB:
|
|||
LOG.error("SQLAlchemyError", exc_info=True)
|
||||
raise
|
||||
|
||||
async def update_workflow_run(self, workflow_run_id: str, status: WorkflowRunStatus) -> WorkflowRun | None:
|
||||
async def update_workflow_run(
|
||||
self, workflow_run_id: str, status: WorkflowRunStatus, failure_reason: str | None = None
|
||||
) -> WorkflowRun | None:
|
||||
async with self.Session() as session:
|
||||
workflow_run = (
|
||||
await session.scalars(select(WorkflowRunModel).filter_by(workflow_run_id=workflow_run_id))
|
||||
).first()
|
||||
if workflow_run:
|
||||
workflow_run.status = status
|
||||
workflow_run.failure_reason = failure_reason
|
||||
await session.commit()
|
||||
await session.refresh(workflow_run)
|
||||
return convert_to_workflow_run(workflow_run)
|
||||
|
|
|
@ -211,6 +211,7 @@ class WorkflowRunModel(Base):
|
|||
workflow_permanent_id = Column(String, nullable=False, index=True)
|
||||
organization_id = Column(String, ForeignKey("organizations.organization_id"), nullable=False, index=True)
|
||||
status = Column(String, nullable=False)
|
||||
failure_reason = Column(String)
|
||||
proxy_location = Column(Enum(ProxyLocation))
|
||||
webhook_callback_url = Column(String)
|
||||
totp_verification_url = Column(String)
|
||||
|
|
|
@ -191,6 +191,7 @@ def convert_to_workflow_run(workflow_run_model: WorkflowRunModel, debug_enabled:
|
|||
workflow_id=workflow_run_model.workflow_id,
|
||||
organization_id=workflow_run_model.organization_id,
|
||||
status=WorkflowRunStatus[workflow_run_model.status],
|
||||
failure_reason=workflow_run_model.failure_reason,
|
||||
proxy_location=(
|
||||
ProxyLocation(workflow_run_model.proxy_location) if workflow_run_model.proxy_location else None
|
||||
),
|
||||
|
|
|
@ -26,6 +26,7 @@ from skyvern.exceptions import (
|
|||
FailedToNavigateToUrl,
|
||||
MissingBrowserState,
|
||||
MissingBrowserStatePage,
|
||||
SkyvernException,
|
||||
TaskNotFound,
|
||||
UnexpectedTaskStatus,
|
||||
)
|
||||
|
@ -39,7 +40,7 @@ from skyvern.forge.sdk.api.files import (
|
|||
get_path_for_workflow_download_directory,
|
||||
)
|
||||
from skyvern.forge.sdk.api.llm.api_handler_factory import LLMAPIHandlerFactory
|
||||
from skyvern.forge.sdk.schemas.tasks import TaskOutput, TaskStatus
|
||||
from skyvern.forge.sdk.schemas.tasks import Task, TaskOutput, TaskStatus
|
||||
from skyvern.forge.sdk.settings_manager import SettingsManager
|
||||
from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext
|
||||
from skyvern.forge.sdk.workflow.exceptions import (
|
||||
|
@ -83,6 +84,7 @@ class BlockResult:
|
|||
output_parameter: OutputParameter
|
||||
output_parameter_value: dict[str, Any] | list | str | None = None
|
||||
status: BlockStatus | None = None
|
||||
failure_reason: str | None = None
|
||||
|
||||
|
||||
class Block(BaseModel, abc.ABC):
|
||||
|
@ -116,11 +118,13 @@ class Block(BaseModel, abc.ABC):
|
|||
def build_block_result(
|
||||
self,
|
||||
success: bool,
|
||||
failure_reason: str | None,
|
||||
output_parameter_value: dict[str, Any] | list | str | None = None,
|
||||
status: BlockStatus | None = None,
|
||||
) -> BlockResult:
|
||||
return BlockResult(
|
||||
success=success,
|
||||
failure_reason=failure_reason,
|
||||
output_parameter=self.output_parameter,
|
||||
output_parameter_value=output_parameter_value,
|
||||
status=status,
|
||||
|
@ -145,7 +149,7 @@ class Block(BaseModel, abc.ABC):
|
|||
async def execute_safe(self, workflow_run_id: str, **kwargs: dict) -> BlockResult:
|
||||
try:
|
||||
return await self.execute(workflow_run_id, **kwargs)
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
LOG.exception(
|
||||
"Block execution failed",
|
||||
workflow_run_id=workflow_run_id,
|
||||
|
@ -156,7 +160,11 @@ class Block(BaseModel, abc.ABC):
|
|||
workflow_run_context = self.get_workflow_run_context(workflow_run_id)
|
||||
if not workflow_run_context.has_value(self.output_parameter.key):
|
||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id)
|
||||
return self.build_block_result(success=False, status=BlockStatus.failed)
|
||||
|
||||
failure_reason = "unexpected exception"
|
||||
if isinstance(e, SkyvernException):
|
||||
failure_reason = f"unexpected SkyvernException({e.__class__.__name__})"
|
||||
return self.build_block_result(success=False, failure_reason=failure_reason, status=BlockStatus.failed)
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_all_parameters(
|
||||
|
@ -233,6 +241,7 @@ class TaskBlock(Block):
|
|||
current_retry = 0
|
||||
# initial value for will_retry is True, so that the loop runs at least once
|
||||
will_retry = True
|
||||
current_running_task: Task | None = None
|
||||
workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id=workflow_run_id)
|
||||
workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id=workflow_run.workflow_id)
|
||||
# if the task url is parameterized, we need to get the value from the workflow run context
|
||||
|
@ -283,6 +292,7 @@ class TaskBlock(Block):
|
|||
task_order=task_order,
|
||||
task_retry=task_retry,
|
||||
)
|
||||
current_running_task = task
|
||||
organization = await app.DATABASE.get_organization(organization_id=workflow.organization_id)
|
||||
if not organization:
|
||||
raise Exception(f"Organization is missing organization_id={workflow.organization_id}")
|
||||
|
@ -353,6 +363,7 @@ class TaskBlock(Block):
|
|||
raise TaskNotFound(task.task_id)
|
||||
if not updated_task.status.is_final():
|
||||
raise UnexpectedTaskStatus(task_id=updated_task.task_id, status=updated_task.status)
|
||||
current_running_task = updated_task
|
||||
|
||||
block_status_mapping = {
|
||||
TaskStatus.completed: BlockStatus.completed,
|
||||
|
@ -375,6 +386,7 @@ class TaskBlock(Block):
|
|||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, output_parameter_value)
|
||||
return self.build_block_result(
|
||||
success=success,
|
||||
failure_reason=updated_task.failure_reason,
|
||||
output_parameter_value=output_parameter_value,
|
||||
status=block_status_mapping[updated_task.status],
|
||||
)
|
||||
|
@ -388,7 +400,10 @@ class TaskBlock(Block):
|
|||
organization_id=workflow.organization_id,
|
||||
)
|
||||
return self.build_block_result(
|
||||
success=False, output_parameter_value=None, status=block_status_mapping[updated_task.status]
|
||||
success=False,
|
||||
failure_reason=updated_task.failure_reason,
|
||||
output_parameter_value=None,
|
||||
status=block_status_mapping[updated_task.status],
|
||||
)
|
||||
else:
|
||||
current_retry += 1
|
||||
|
@ -413,12 +428,17 @@ class TaskBlock(Block):
|
|||
)
|
||||
return self.build_block_result(
|
||||
success=False,
|
||||
failure_reason=updated_task.failure_reason,
|
||||
output_parameter_value=output_parameter_value,
|
||||
status=block_status_mapping[updated_task.status],
|
||||
)
|
||||
|
||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id)
|
||||
return self.build_block_result(success=False, status=BlockStatus.failed)
|
||||
return self.build_block_result(
|
||||
success=False,
|
||||
status=BlockStatus.failed,
|
||||
failure_reason=current_running_task.failure_reason if current_running_task else None,
|
||||
)
|
||||
|
||||
|
||||
class ForLoopBlock(Block):
|
||||
|
@ -520,19 +540,37 @@ class ForLoopBlock(Block):
|
|||
num_loop_over_values=len(loop_over_values),
|
||||
)
|
||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, [])
|
||||
return self.build_block_result(success=False, status=BlockStatus.terminated)
|
||||
return self.build_block_result(
|
||||
success=False,
|
||||
failure_reason="No iterable value found for the loop block",
|
||||
status=BlockStatus.terminated,
|
||||
)
|
||||
|
||||
if not self.loop_blocks or len(self.loop_blocks) == 0:
|
||||
LOG.info(
|
||||
"No defined blocks to loop, terminating block",
|
||||
block_type=self.block_type,
|
||||
workflow_run_id=workflow_run_id,
|
||||
num_loop_blocks=len(self.loop_blocks),
|
||||
)
|
||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, [])
|
||||
return self.build_block_result(
|
||||
success=False, failure_reason="No defined blocks to loop", status=BlockStatus.terminated
|
||||
)
|
||||
|
||||
block_outputs: list[BlockResult] = []
|
||||
for loop_idx, loop_over_value in enumerate(loop_over_values):
|
||||
context_parameters_with_value = self.get_loop_block_context_parameters(workflow_run_id, loop_over_value)
|
||||
for context_parameter in context_parameters_with_value:
|
||||
workflow_run_context.set_value(context_parameter.key, context_parameter.value)
|
||||
block_outputs = []
|
||||
for block_idx, loop_block in enumerate(self.loop_blocks):
|
||||
original_loop_block = loop_block
|
||||
loop_block = loop_block.copy()
|
||||
block_output = await loop_block.execute_safe(workflow_run_id=workflow_run_id)
|
||||
if block_output.status == BlockStatus.canceled:
|
||||
failure_message = f"ForLoopBlock: Block with type {loop_block.block_type} at index {block_idx} during loop {loop_idx} was canceled for workflow run {workflow_run_id}, canceling for loop"
|
||||
LOG.info(
|
||||
f"ForLoopBlock: Block with type {loop_block.block_type} at index {block_idx} was canceled for workflow run {workflow_run_id}, canceling for loop",
|
||||
failure_message,
|
||||
block_type=loop_block.block_type,
|
||||
workflow_run_id=workflow_run_id,
|
||||
block_idx=block_idx,
|
||||
|
@ -542,7 +580,10 @@ class ForLoopBlock(Block):
|
|||
workflow_run_context, workflow_run_id, outputs_with_loop_values
|
||||
)
|
||||
return self.build_block_result(
|
||||
success=False, output_parameter_value=outputs_with_loop_values, status=BlockStatus.canceled
|
||||
success=False,
|
||||
failure_reason=failure_message,
|
||||
output_parameter_value=outputs_with_loop_values,
|
||||
status=BlockStatus.canceled,
|
||||
)
|
||||
|
||||
loop_block = original_loop_block
|
||||
|
@ -580,6 +621,9 @@ class ForLoopBlock(Block):
|
|||
)
|
||||
break
|
||||
|
||||
# at least one block must be executed in the loop
|
||||
assert len(block_outputs) != 0
|
||||
|
||||
is_any_block_terminated = any([block_output.status == BlockStatus.terminated for block_output in block_outputs])
|
||||
for_loop_block_status = BlockStatus.completed
|
||||
if is_any_block_terminated:
|
||||
|
@ -588,7 +632,10 @@ class ForLoopBlock(Block):
|
|||
for_loop_block_status = BlockStatus.failed
|
||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, outputs_with_loop_values)
|
||||
return self.build_block_result(
|
||||
success=success, output_parameter_value=outputs_with_loop_values, status=for_loop_block_status
|
||||
success=success,
|
||||
failure_reason=block_outputs[-1].failure_reason,
|
||||
output_parameter_value=outputs_with_loop_values,
|
||||
status=for_loop_block_status,
|
||||
)
|
||||
|
||||
|
||||
|
@ -710,7 +757,9 @@ class TextPromptBlock(Block):
|
|||
|
||||
response = await self.send_prompt(self.prompt, parameter_values)
|
||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, response)
|
||||
return self.build_block_result(success=True, output_parameter_value=response, status=BlockStatus.completed)
|
||||
return self.build_block_result(
|
||||
success=True, failure_reason=None, output_parameter_value=response, status=BlockStatus.completed
|
||||
)
|
||||
|
||||
|
||||
class DownloadToS3Block(Block):
|
||||
|
@ -767,7 +816,9 @@ class DownloadToS3Block(Block):
|
|||
|
||||
LOG.info("DownloadToS3Block: File downloaded and uploaded to S3", uri=uri)
|
||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, uri)
|
||||
return self.build_block_result(success=True, output_parameter_value=uri, status=BlockStatus.completed)
|
||||
return self.build_block_result(
|
||||
success=True, failure_reason=None, output_parameter_value=uri, status=BlockStatus.completed
|
||||
)
|
||||
|
||||
|
||||
class UploadToS3Block(Block):
|
||||
|
@ -841,7 +892,9 @@ class UploadToS3Block(Block):
|
|||
|
||||
LOG.info("UploadToS3Block: File(s) uploaded to S3", file_path=self.path)
|
||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, s3_uris)
|
||||
return self.build_block_result(success=True, output_parameter_value=s3_uris, status=BlockStatus.completed)
|
||||
return self.build_block_result(
|
||||
success=True, failure_reason=None, output_parameter_value=s3_uris, status=BlockStatus.completed
|
||||
)
|
||||
|
||||
|
||||
class SendEmailBlock(Block):
|
||||
|
@ -1109,14 +1162,18 @@ class SendEmailBlock(Block):
|
|||
LOG.error("SendEmailBlock: Failed to send email", exc_info=True)
|
||||
result_dict = {"success": False, "error": str(e)}
|
||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, result_dict)
|
||||
return self.build_block_result(success=False, output_parameter_value=result_dict, status=BlockStatus.failed)
|
||||
return self.build_block_result(
|
||||
success=False, failure_reason=str(e), output_parameter_value=result_dict, status=BlockStatus.failed
|
||||
)
|
||||
finally:
|
||||
if smtp_host:
|
||||
smtp_host.quit()
|
||||
|
||||
result_dict = {"success": True}
|
||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, result_dict)
|
||||
return self.build_block_result(success=True, output_parameter_value=result_dict, status=BlockStatus.completed)
|
||||
return self.build_block_result(
|
||||
success=True, failure_reason=None, output_parameter_value=result_dict, status=BlockStatus.completed
|
||||
)
|
||||
|
||||
|
||||
class FileType(StrEnum):
|
||||
|
@ -1179,7 +1236,9 @@ class FileParserBlock(Block):
|
|||
parsed_data.append(row)
|
||||
# Record the parsed data
|
||||
await self.record_output_parameter_value(workflow_run_context, workflow_run_id, parsed_data)
|
||||
return self.build_block_result(success=True, output_parameter_value=parsed_data, status=BlockStatus.completed)
|
||||
return self.build_block_result(
|
||||
success=True, failure_reason=None, output_parameter_value=parsed_data, status=BlockStatus.completed
|
||||
)
|
||||
|
||||
|
||||
BlockSubclasses = Union[
|
||||
|
|
|
@ -90,6 +90,7 @@ class WorkflowRun(BaseModel):
|
|||
webhook_callback_url: str | None = None
|
||||
totp_verification_url: str | None = None
|
||||
totp_identifier: str | None = None
|
||||
failure_reason: str | None = None
|
||||
|
||||
created_at: datetime
|
||||
modified_at: datetime
|
||||
|
|
|
@ -6,7 +6,13 @@ import httpx
|
|||
import structlog
|
||||
|
||||
from skyvern import analytics
|
||||
from skyvern.exceptions import FailedToSendWebhook, MissingValueForParameter, WorkflowNotFound, WorkflowRunNotFound
|
||||
from skyvern.exceptions import (
|
||||
FailedToSendWebhook,
|
||||
MissingValueForParameter,
|
||||
SkyvernException,
|
||||
WorkflowNotFound,
|
||||
WorkflowRunNotFound,
|
||||
)
|
||||
from skyvern.forge import app
|
||||
from skyvern.forge.sdk.artifact.models import ArtifactType
|
||||
from skyvern.forge.sdk.core import skyvern_context
|
||||
|
@ -147,7 +153,14 @@ class WorkflowService:
|
|||
f"Error while setting up workflow run {workflow_run.workflow_run_id}",
|
||||
workflow_run_id=workflow_run.workflow_run_id,
|
||||
)
|
||||
await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id)
|
||||
|
||||
failure_reason = "Setup up workflow failed due to an unexpected exception"
|
||||
if isinstance(e, SkyvernException):
|
||||
failure_reason = f"Setup workflow failed due to an SkyvernException({e.__class__.__name__})"
|
||||
|
||||
await self.mark_workflow_run_as_failed(
|
||||
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
|
||||
)
|
||||
raise e
|
||||
|
||||
return workflow_run
|
||||
|
@ -260,7 +273,10 @@ class WorkflowService:
|
|||
block_label=block.label,
|
||||
)
|
||||
else:
|
||||
await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id)
|
||||
failure_reason = f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} failed. failure reason: {block_result.failure_reason}"
|
||||
await self.mark_workflow_run_as_failed(
|
||||
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
|
||||
)
|
||||
await self.clean_up_workflow(
|
||||
workflow=workflow,
|
||||
workflow_run=workflow_run,
|
||||
|
@ -289,14 +305,17 @@ class WorkflowService:
|
|||
block_label=block.label,
|
||||
)
|
||||
else:
|
||||
await self.mark_workflow_run_as_terminated(workflow_run_id=workflow_run.workflow_run_id)
|
||||
failure_reason = f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} terminated. Reason: {block_result.failure_reason}"
|
||||
await self.mark_workflow_run_as_terminated(
|
||||
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
|
||||
)
|
||||
await self.clean_up_workflow(
|
||||
workflow=workflow,
|
||||
workflow_run=workflow_run,
|
||||
api_key=api_key,
|
||||
)
|
||||
return workflow_run
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
LOG.exception(
|
||||
f"Error while executing workflow run {workflow_run.workflow_run_id}",
|
||||
workflow_run_id=workflow_run.workflow_run_id,
|
||||
|
@ -304,7 +323,15 @@ class WorkflowService:
|
|||
block_type=block.block_type,
|
||||
block_label=block.label,
|
||||
)
|
||||
await self.mark_workflow_run_as_failed(workflow_run_id=workflow_run.workflow_run_id)
|
||||
|
||||
exception_message = "unexpected exception"
|
||||
if isinstance(e, SkyvernException):
|
||||
exception_message = f"unexpected SkyvernException({e.__class__.__name__})"
|
||||
|
||||
failure_reason = f"Block with type {block.block_type} at index {block_idx}/{blocks_cnt -1} failed. failure reason: {exception_message}"
|
||||
await self.mark_workflow_run_as_failed(
|
||||
workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason
|
||||
)
|
||||
await self.clean_up_workflow(workflow=workflow, workflow_run=workflow_run, api_key=api_key)
|
||||
return workflow_run
|
||||
|
||||
|
@ -472,15 +499,17 @@ class WorkflowService:
|
|||
status=WorkflowRunStatus.completed,
|
||||
)
|
||||
|
||||
async def mark_workflow_run_as_failed(self, workflow_run_id: str) -> None:
|
||||
async def mark_workflow_run_as_failed(self, workflow_run_id: str, failure_reason: str | None) -> None:
|
||||
LOG.info(
|
||||
f"Marking workflow run {workflow_run_id} as failed",
|
||||
workflow_run_id=workflow_run_id,
|
||||
workflow_status="failed",
|
||||
failure_reason=failure_reason,
|
||||
)
|
||||
await app.DATABASE.update_workflow_run(
|
||||
workflow_run_id=workflow_run_id,
|
||||
status=WorkflowRunStatus.failed,
|
||||
failure_reason=failure_reason,
|
||||
)
|
||||
|
||||
async def mark_workflow_run_as_running(self, workflow_run_id: str) -> None:
|
||||
|
@ -494,15 +523,17 @@ class WorkflowService:
|
|||
status=WorkflowRunStatus.running,
|
||||
)
|
||||
|
||||
async def mark_workflow_run_as_terminated(self, workflow_run_id: str) -> None:
|
||||
async def mark_workflow_run_as_terminated(self, workflow_run_id: str, failure_reason: str | None) -> None:
|
||||
LOG.info(
|
||||
f"Marking workflow run {workflow_run_id} as terminated",
|
||||
workflow_run_id=workflow_run_id,
|
||||
workflow_status="terminated",
|
||||
failure_reason=failure_reason,
|
||||
)
|
||||
await app.DATABASE.update_workflow_run(
|
||||
workflow_run_id=workflow_run_id,
|
||||
status=WorkflowRunStatus.terminated,
|
||||
failure_reason=failure_reason,
|
||||
)
|
||||
|
||||
async def mark_workflow_run_as_canceled(self, workflow_run_id: str) -> None:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue