Add failure_category to tasks and workflow runs (SKY-8469) (#5254)

This commit is contained in:
LawyZheng 2026-03-27 02:13:52 +08:00 committed by GitHub
parent e7b2f09d00
commit 9d9ae67fe4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 651 additions and 24 deletions

View file

@ -50,6 +50,7 @@ from skyvern.exceptions import (
get_user_facing_exception_message,
)
from skyvern.forge import app
from skyvern.forge.failure_classifier import classify_from_failure_reason
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType
from skyvern.forge.sdk.cache.factory import CacheFactory
@ -3480,6 +3481,7 @@ class WorkflowService:
failure_reason: str | None = None,
run_with: str | None = None,
ai_fallback: bool | None = None,
failure_category: list[dict] | None = None,
) -> WorkflowRun:
workflow_run = await app.DATABASE.update_workflow_run(
workflow_run_id=workflow_run_id,
@ -3487,6 +3489,7 @@ class WorkflowService:
failure_reason=failure_reason,
run_with=run_with,
ai_fallback=ai_fallback,
failure_category=failure_category,
)
if status in [WorkflowRunStatus.completed, WorkflowRunStatus.failed, WorkflowRunStatus.terminated]:
start_time = (
@ -3561,6 +3564,7 @@ class WorkflowService:
workflow_run_id: str,
failure_reason: str | None,
run_with: str | None = None,
failure_category: list[dict] | None = None,
) -> WorkflowRun:
LOG.info(
f"Marking workflow run {workflow_run_id} as failed",
@ -3572,11 +3576,16 @@ class WorkflowService:
# Add workflow failure tag to trace
otel_trace.get_current_span().set_attribute("task.completion_status", WorkflowRunStatus.failed)
# Auto-classify if no explicit category provided
if failure_category is None:
failure_category = classify_from_failure_reason(failure_reason)
return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.failed,
failure_reason=failure_reason,
run_with=run_with,
failure_category=failure_category,
)
async def mark_workflow_run_as_running(self, workflow_run_id: str, run_with: str | None = None) -> WorkflowRun:
@ -3605,6 +3614,7 @@ class WorkflowService:
workflow_run_id: str,
failure_reason: str | None,
run_with: str | None = None,
failure_category: list[dict] | None = None,
) -> WorkflowRun:
LOG.info(
f"Marking workflow run {workflow_run_id} as terminated",
@ -3616,11 +3626,16 @@ class WorkflowService:
# Add workflow terminated tag to trace
otel_trace.get_current_span().set_attribute("task.completion_status", WorkflowRunStatus.terminated)
# Auto-classify if no explicit category provided
if failure_category is None:
failure_category = classify_from_failure_reason(failure_reason)
return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.terminated,
failure_reason=failure_reason,
run_with=run_with,
failure_category=failure_category,
)
async def mark_workflow_run_as_canceled(self, workflow_run_id: str, run_with: str | None = None) -> WorkflowRun:
@ -3654,11 +3669,14 @@ class WorkflowService:
# Add workflow timed out tag to trace
otel_trace.get_current_span().set_attribute("task.completion_status", WorkflowRunStatus.timed_out)
failure_category = classify_from_failure_reason(failure_reason)
return await self._update_workflow_run_status(
workflow_run_id=workflow_run_id,
status=WorkflowRunStatus.timed_out,
failure_reason=failure_reason,
run_with=run_with,
failure_category=failure_category,
)
async def get_workflow_run(self, workflow_run_id: str, organization_id: str | None = None) -> WorkflowRun: