Skyvern/skyvern/forge/sdk/schemas/workflow_runs.py

56 lines
1.6 KiB
Python

from __future__ import annotations
from datetime import datetime
from enum import StrEnum
from typing import Any
from pydantic import BaseModel
from skyvern.forge.sdk.schemas.observers import ObserverThought
from skyvern.forge.sdk.workflow.models.block import BlockType
from skyvern.webeye.actions.actions import Action
class WorkflowRunBlock(BaseModel):
workflow_run_block_id: str
workflow_run_id: str
organization_id: str | None = None
parent_workflow_run_block_id: str | None = None
block_type: BlockType
label: str | None = None
status: str | None = None
output: dict | list | str | None = None
continue_on_failure: bool = False
failure_reason: str | None = None
task_id: str | None = None
url: str | None = None
navigation_goal: str | None = None
navigation_payload: dict[str, Any] | list | str | None = None
data_extraction_goal: str | None = None
data_schema: dict[str, Any] | list | str | None = None
terminate_criterion: str | None = None
complete_criterion: str | None = None
actions: list[Action] = []
created_at: datetime
modified_at: datetime
# for loop block
loop_values: list[Any] | None = None
# block inside a loop block
current_value: str | None = None
current_index: int | None = None
class WorkflowRunTimelineType(StrEnum):
thought = "thought"
block = "block"
class WorkflowRunTimeline(BaseModel):
type: WorkflowRunTimelineType
block: WorkflowRunBlock | None = None
thought: ObserverThought | None = None
children: list[WorkflowRunTimeline] = []
created_at: datetime
modified_at: datetime