Make text prompt executions a step in workflow runs (#1590)

This commit is contained in:
Shuchang Zheng 2025-01-17 09:17:31 -08:00 committed by GitHub
parent bb834fa137
commit e4cb0987cb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 25 additions and 1 deletions

View file

@ -311,6 +311,13 @@ class StepTerminationError(SkyvernException):
super().__init__(f"Step {step_id} cannot be executed and task is failed. Reason: {reason}") super().__init__(f"Step {step_id} cannot be executed and task is failed. Reason: {reason}")
class BlockTerminationError(SkyvernException):
def __init__(self, workflow_run_block_id: str, workflow_run_id: str, reason: str) -> None:
super().__init__(
f"Block {workflow_run_block_id} cannot be executed and workflow run {workflow_run_id} is failed. Reason: {reason}"
)
class StepUnableToExecuteError(SkyvernException): class StepUnableToExecuteError(SkyvernException):
def __init__(self, step_id: str, reason: str) -> None: def __init__(self, step_id: str, reason: str) -> None:
super().__init__(f"Step {step_id} cannot be executed and task execution is stopped. Reason: {reason}") super().__init__(f"Step {step_id} cannot be executed and task execution is stopped. Reason: {reason}")

View file

@ -17,6 +17,7 @@ from skyvern.forge.sdk.api.llm.exceptions import LLMProviderError
from skyvern.forge.sdk.models import Step, StepStatus from skyvern.forge.sdk.models import Step, StepStatus
from skyvern.forge.sdk.schemas.organizations import Organization from skyvern.forge.sdk.schemas.organizations import Organization
from skyvern.forge.sdk.schemas.tasks import Task, TaskStatus from skyvern.forge.sdk.schemas.tasks import Task, TaskStatus
from skyvern.forge.sdk.workflow.models.block import BlockTypeVar
from skyvern.webeye.browser_factory import BrowserState from skyvern.webeye.browser_factory import BrowserState
from skyvern.webeye.scraper.scraper import ELEMENT_NODE_ATTRIBUTES, CleanupElementTreeFunc, json_to_html from skyvern.webeye.scraper.scraper import ELEMENT_NODE_ATTRIBUTES, CleanupElementTreeFunc, json_to_html
@ -379,6 +380,11 @@ class AgentFunction:
if not can_execute: if not can_execute:
raise StepUnableToExecuteError(step_id=step.step_id, reason=f"Cannot execute step. Reasons: {reasons}") raise StepUnableToExecuteError(step_id=step.step_id, reason=f"Cannot execute step. Reasons: {reasons}")
async def validate_block_execution(
self, block: BlockTypeVar, workflow_run_id: str, workflow_run_block_id: str, organization_id: str | None
) -> None:
return
async def prepare_step_execution( async def prepare_step_execution(
self, self,
organization: Organization | None, organization: Organization | None,

View file

@ -1181,6 +1181,13 @@ class TextPromptBlock(Block):
browser_session_id: str | None = None, browser_session_id: str | None = None,
**kwargs: dict, **kwargs: dict,
) -> BlockResult: ) -> BlockResult:
# Validate block execution
await app.AGENT_FUNCTION.validate_block_execution(
block=self,
workflow_run_block_id=workflow_run_block_id,
workflow_run_id=workflow_run_id,
organization_id=organization_id,
)
# get workflow run context # get workflow run context
workflow_run_context = self.get_workflow_run_context(workflow_run_id) workflow_run_context = self.get_workflow_run_context(workflow_run_id)
await app.DATABASE.update_workflow_run_block( await app.DATABASE.update_workflow_run_block(

View file

@ -860,11 +860,15 @@ class WorkflowService:
workflow_run_steps = await app.DATABASE.get_steps_by_task_ids( workflow_run_steps = await app.DATABASE.get_steps_by_task_ids(
task_ids=[task.task_id for task in workflow_run_tasks], organization_id=organization_id task_ids=[task.task_id for task in workflow_run_tasks], organization_id=organization_id
) )
workflow_run_blocks = await app.DATABASE.get_workflow_run_blocks(
workflow_run_id=workflow_run_id, organization_id=organization_id
)
text_prompt_blocks = [block for block in workflow_run_blocks if block.block_type == BlockType.TEXT_PROMPT]
total_steps = len(workflow_run_steps) total_steps = len(workflow_run_steps)
# TODO: This is a temporary cost calculation. We need to implement a more accurate cost calculation. # TODO: This is a temporary cost calculation. We need to implement a more accurate cost calculation.
# successful steps are the ones that have a status of completed and the total count of unique step.order # successful steps are the ones that have a status of completed and the total count of unique step.order
successful_steps = [step for step in workflow_run_steps if step.status == StepStatus.completed] successful_steps = [step for step in workflow_run_steps if step.status == StepStatus.completed]
total_cost = 0.1 * len(successful_steps) total_cost = 0.1 * (len(successful_steps) + len(text_prompt_blocks))
return WorkflowRunStatusResponse( return WorkflowRunStatusResponse(
workflow_id=workflow.workflow_permanent_id, workflow_id=workflow.workflow_permanent_id,
workflow_run_id=workflow_run_id, workflow_run_id=workflow_run_id,