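"""Utilities for creating, storing, and executing generated Skyvern scripts.

This module builds script file trees backed by S3 artifacts, executes
generated scripts in-process, and exposes block-level helpers (run_task,
download, action, login, extract, wait) that replay cached functions,
falling back to an AI-driven run where supported.
"""
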
import asyncio
import base64
import hashlib
import importlib.util
import json
import os
from datetime import datetime
from typing import Any, cast

import structlog
from fastapi import BackgroundTasks, HTTPException
from jinja2.sandbox import SandboxedEnvironment

from skyvern.constants import GET_DOWNLOADED_FILES_TIMEOUT
from skyvern.core.script_generations.constants import SCRIPT_TASK_BLOCKS
from skyvern.core.script_generations.script_run_context_manager import script_run_context_manager
from skyvern.exceptions import ScriptNotFound, WorkflowRunNotFound
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.core import skyvern_context
from skyvern.forge.sdk.models import StepStatus
from skyvern.forge.sdk.schemas.files import FileInfo
from skyvern.forge.sdk.schemas.tasks import TaskOutput, TaskStatus
from skyvern.forge.sdk.workflow.models.block import TaskBlock
from skyvern.schemas.runs import RunEngine
from skyvern.schemas.scripts import CreateScriptResponse, FileNode, ScriptFileCreate
from skyvern.schemas.workflows import BlockStatus, BlockType

LOG = structlog.get_logger(__name__)

jinja_sandbox_env = SandboxedEnvironment()


async def build_file_tree(
    files: list[ScriptFileCreate],
    organization_id: str,
    script_id: str,
    script_version: int,
    script_revision_id: str,
) -> dict[str, FileNode]:
    """Build a hierarchical file tree from a list of files and upload the files to S3 with the same tree structure."""
    file_tree: dict[str, FileNode] = {}

    for file in files:
        # Decode content to calculate size and hash
        content_bytes = base64.b64decode(file.content)
        content_hash = hashlib.sha256(content_bytes).hexdigest()
        file_size = len(content_bytes)

        # Create artifact and upload to S3
        try:
            artifact_id = await app.ARTIFACT_MANAGER.create_script_file_artifact(
                organization_id=organization_id,
                script_id=script_id,
                script_version=script_version,
                file_path=file.path,
                data=content_bytes,
            )
            LOG.debug(
                "Created script file artifact",
                artifact_id=artifact_id,
                file_path=file.path,
                script_id=script_id,
                script_version=script_version,
            )
            # create a script file record
            await app.DATABASE.create_script_file(
                script_revision_id=script_revision_id,
                script_id=script_id,
                organization_id=organization_id,
                file_path=file.path,
                file_name=file.path.split("/")[-1],
                file_type="file",
                content_hash=f"sha256:{content_hash}",
                file_size=file_size,
                mime_type=file.mime_type,
                artifact_id=artifact_id,
            )
        except Exception:
            LOG.exception(
                "Failed to create script file artifact",
                file_path=file.path,
                script_id=script_id,
                script_version=script_version,
                script_revision_id=script_revision_id,
            )
            raise

        # Split path into components
        path_parts = file.path.split("/")
        current_level = file_tree

        # Create directory structure
        for part in path_parts[:-1]:
            if part not in current_level:
                current_level[part] = FileNode(type="directory", created_at=datetime.utcnow(), children={})
            elif current_level[part].type == "file":
                # Convert file to directory if needed
                current_level[part] = FileNode(type="directory", created_at=current_level[part].created_at, children={})

            current_level = current_level[part].children or {}

        # Add the file
        filename = path_parts[-1]
        current_level[filename] = FileNode(
            type="file",
            size=file_size,
            mime_type=file.mime_type,
            content_hash=f"sha256:{content_hash}",
            created_at=datetime.utcnow(),
        )

    return file_tree


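# A minimal sketch of the structure build_file_tree returns (hypothetical paths,
# for illustration only): given files at "main.py" and "utils/helpers.py", the
# result is roughly:
#   {
#       "main.py": FileNode(type="file", size=..., created_at=...),
#       "utils": FileNode(
#           type="directory",
#           children={"helpers.py": FileNode(type="file", size=..., created_at=...)},
#       ),
#   }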
async def create_script(
    organization_id: str,
    workflow_id: str | None = None,
    run_id: str | None = None,
    files: list[ScriptFileCreate] | None = None,
) -> CreateScriptResponse:
    LOG.info(
        "Creating script",
        organization_id=organization_id,
        file_count=len(files) if files else 0,
    )

    try:
        if run_id and not await app.DATABASE.get_run(run_id=run_id, organization_id=organization_id):
            raise HTTPException(status_code=404, detail=f"Run {run_id} not found")

        script = await app.DATABASE.create_script(
            organization_id=organization_id,
            run_id=run_id,
        )

        file_tree: dict[str, FileNode] = {}
        file_count = 0
        if files:
            file_tree = await build_file_tree(
                files,
                organization_id=organization_id,
                script_id=script.script_id,
                script_version=script.version,
                script_revision_id=script.script_revision_id,
            )
            file_count = len(files)

        return CreateScriptResponse(
            script_id=script.script_id,
            version=script.version,
            run_id=script.run_id,
            file_count=file_count,
            created_at=script.created_at,
            file_tree=file_tree,
        )
    except HTTPException:
        # Preserve intentional HTTP errors (e.g. the 404 above) instead of masking them as 500s
        raise
    except Exception as e:
        LOG.error("Failed to create script", error=str(e), exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to create script")


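# Usage sketch (values are illustrative and the ScriptFileCreate keyword
# arguments are assumed from the fields this module reads; file contents are
# base64-encoded before upload):
#   files = [
#       ScriptFileCreate(
#           path="main.py",
#           content=base64.b64encode(b"async def run_workflow(): ...").decode(),
#           mime_type="text/x-python",
#       )
#   ]
#   response = await create_script(organization_id="org_123", files=files)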
async def execute_script(
    script_id: str,
    organization_id: str,
    parameters: dict[str, Any] | None = None,
    workflow_run_id: str | None = None,
    background_tasks: BackgroundTasks | None = None,
) -> None:
    # TODO: assume the script only has one ScriptFile called main.py
    # step 1: get the script revision
    # step 2: get the script files
    # step 3: copy the script files to the local directory
    # step 4: execute the script
    # step 5: TODO: close all the browser instances

    # step 1: get the script revision
    script = await app.DATABASE.get_script(
        script_id=script_id,
        organization_id=organization_id,
    )
    if not script:
        raise ScriptNotFound(script_id=script_id)

    # step 2: get the script files
    script_files = await app.DATABASE.get_script_files(
        script_revision_id=script.script_revision_id, organization_id=organization_id
    )

    # step 3: copy the script files to the local directory
    for file in script_files:
        # retrieve the artifact
        if not file.artifact_id:
            continue
        artifact = await app.DATABASE.get_artifact_by_id(file.artifact_id, organization_id)
        if not artifact:
            LOG.error("Artifact not found", artifact_id=file.artifact_id, script_id=script_id)
            continue
        file_content = await app.ARTIFACT_MANAGER.retrieve_artifact(artifact)
        if not file_content:
            continue
        file_path = os.path.join(script.script_id, file.file_path)
        # create the directory if it doesn't exist
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        # Determine the encoding to use
        encoding = "utf-8"

        try:
            # Try to decode as text
            if file.mime_type and file.mime_type.startswith("text/"):
                # Text file - decode as string
                with open(file_path, "w", encoding=encoding) as f:
                    f.write(file_content.decode(encoding))
            else:
                # Binary file - write as bytes
                with open(file_path, "wb") as f:
                    f.write(file_content)
        except UnicodeDecodeError:
            # Fallback to binary mode if text decoding fails
            with open(file_path, "wb") as f:
                f.write(file_content)

    # step 4: execute the script
    if workflow_run_id and not parameters:
        parameter_tuples = await app.DATABASE.get_workflow_run_parameters(workflow_run_id=workflow_run_id)
        parameters = {wf_param.key: run_param.value for wf_param, run_param in parameter_tuples}
        LOG.info("Script run is using workflow run parameters", parameters=parameters)

    script_path = os.path.join(script.script_id, "main.py")
    if background_tasks:
        # Execute asynchronously in background
        # (run_script requires the script path as its first argument)
        background_tasks.add_task(
            run_script,
            script_path,
            parameters=parameters,
            organization_id=organization_id,
            workflow_run_id=workflow_run_id,
        )
    else:
        # Execute synchronously
        if os.path.exists(script_path):
            await run_script(
                script_path, parameters=parameters, organization_id=organization_id, workflow_run_id=workflow_run_id
            )
        else:
            LOG.error("Script main.py not found", script_path=script_path, script_id=script_id)
            raise Exception(f"Script main.py not found at {script_path}")

    LOG.info("Script executed successfully", script_id=script_id)


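# Usage sketch (illustrative IDs): materializes the revision's files under a
# local directory named after the script, then runs <script_id>/main.py.
#   await execute_script(script_id="s_123", organization_id="org_123", workflow_run_id="wr_123")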
async def _create_workflow_block_run_and_task(
    block_type: BlockType,
    prompt: str | None = None,
    url: str | None = None,
) -> tuple[str | None, str | None, str | None]:
    """
    Create a workflow block run and optionally a task if workflow_run_id is available in context.

    Returns a (workflow_run_block_id, task_id, step_id) tuple.
    """
    context = skyvern_context.current()
    if not context or not context.workflow_run_id or not context.organization_id:
        return None, None, None
    workflow_run_id = context.workflow_run_id
    organization_id = context.organization_id

    try:
        # Create workflow run block with appropriate parameters based on block type
        # TODO: support engine in the future
        engine = None
        workflow_run_block = await app.DATABASE.create_workflow_run_block(
            workflow_run_id=workflow_run_id,
            organization_id=organization_id,
            block_type=block_type,
            engine=engine,
        )

        workflow_run_block_id = workflow_run_block.workflow_run_block_id
        task_id = None
        step_id = None

        # Create task for task-based blocks
        if block_type in SCRIPT_TASK_BLOCKS:
            # Create task
            task = await app.DATABASE.create_task(
                # HACK: url is typed str | None to allow a missing url; the url is not used by the script right now
                url=url or "",
                title=f"Script {block_type.value} task",
                navigation_goal=prompt,
                data_extraction_goal=prompt if block_type == BlockType.EXTRACTION else None,
                navigation_payload={},
                status="running",
                organization_id=organization_id,
                workflow_run_id=workflow_run_id,
            )

            task_id = task.task_id

            # create a single step for the task
            step = await app.DATABASE.create_step(
                task_id=task_id,
                order=0,
                retry_index=0,
                organization_id=organization_id,
                status=StepStatus.running,
            )
            step_id = step.step_id

            # Update workflow run block with task_id
            await app.DATABASE.update_workflow_run_block(
                workflow_run_block_id=workflow_run_block_id,
                task_id=task_id,
                organization_id=organization_id,
            )

        context.step_id = step_id
        context.task_id = task_id

        return workflow_run_block_id, task_id, step_id

    except Exception as e:
        LOG.warning(
            "Failed to create workflow block run and task",
            error=str(e),
            block_type=block_type,
            workflow_run_id=context.workflow_run_id,
            exc_info=True,
        )
        return None, None, None


async def _record_output_parameter_value(
    workflow_run_id: str,
    workflow_id: str,
    organization_id: str,
    output: dict[str, Any] | list | str | None,
    label: str | None = None,
) -> None:
    if not label:
        return
    # TODO: support this in the future
    workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id)
    # get the workflow
    workflow = await app.DATABASE.get_workflow(workflow_id=workflow_id, organization_id=organization_id)
    if not workflow:
        return

    # get the output_parameter
    output_parameter = workflow.get_output_parameter(label)
    if not output_parameter:
        return

    await workflow_run_context.register_output_parameter_value_post_execution(
        parameter=output_parameter,
        value=output,
    )
    await app.DATABASE.create_or_update_workflow_run_output_parameter(
        workflow_run_id=workflow_run_id,
        output_parameter_id=output_parameter.output_parameter_id,
        value=output,
    )


async def _update_workflow_block(
    workflow_run_block_id: str,
    status: BlockStatus,
    task_id: str | None = None,
    task_status: TaskStatus = TaskStatus.completed,
    step_id: str | None = None,
    step_status: StepStatus = StepStatus.completed,
    is_last: bool | None = True,
    label: str | None = None,
    failure_reason: str | None = None,
    output: dict[str, Any] | list | str | None = None,
) -> None:
    """Update the status of a workflow run block."""
    try:
        context = skyvern_context.current()
        if not context or not context.organization_id or not context.workflow_run_id or not context.workflow_id:
            return
        final_output = output
        if task_id:
            if step_id:
                await app.DATABASE.update_step(
                    step_id=step_id,
                    task_id=task_id,
                    organization_id=context.organization_id,
                    status=step_status,
                    is_last=is_last,
                )
            updated_task = await app.DATABASE.update_task(
                task_id=task_id,
                organization_id=context.organization_id,
                status=task_status,
                failure_reason=failure_reason,
                extracted_information=output,
            )
            downloaded_files: list[FileInfo] = []
            try:
                async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
                    downloaded_files = await app.STORAGE.get_downloaded_files(
                        organization_id=context.organization_id,
                        run_id=context.workflow_run_id,
                    )
            except asyncio.TimeoutError:
                LOG.warning("Timeout getting downloaded files", task_id=task_id)

            task_output = TaskOutput.from_task(updated_task, downloaded_files)
            final_output = task_output.model_dump()
            await app.DATABASE.update_workflow_run_block(
                workflow_run_block_id=workflow_run_block_id,
                organization_id=context.organization_id,
                status=status,
                failure_reason=failure_reason,
                output=final_output,
            )
        else:
            final_output = None
            await app.DATABASE.update_workflow_run_block(
                workflow_run_block_id=workflow_run_block_id,
                organization_id=context.organization_id,
                status=status,
                failure_reason=failure_reason,
            )
        await _record_output_parameter_value(
            context.workflow_run_id,
            context.workflow_id,
            context.organization_id,
            final_output,
            label,
        )

    except Exception as e:
        LOG.warning(
            "Failed to update workflow block status",
            workflow_run_block_id=workflow_run_block_id,
            status=status,
            error=str(e),
            exc_info=True,
        )


async def _run_cached_function(cache_key: str) -> Any:
    cached_fn = script_run_context_manager.get_cached_fn(cache_key)
    if cached_fn:
        # TODO: handle exceptions here and fall back to AI run in case of error
        run_context = script_run_context_manager.ensure_run_context()
        return await cached_fn(page=run_context.page, context=run_context)
    else:
        raise Exception(f"Cache key {cache_key} not found")


async def _fallback_to_ai_run(
    cache_key: str,
    prompt: str | None = None,
    url: str | None = None,
    engine: RunEngine = RunEngine.skyvern_v1,
    complete_criterion: str | None = None,
    terminate_criterion: str | None = None,
    data_extraction_goal: str | None = None,
    schema: dict[str, Any] | list | str | None = None,
    error_code_mapping: dict[str, str] | None = None,
    max_steps: int | None = None,
    complete_on_download: bool = False,
    download_suffix: str | None = None,
    totp_verification_url: str | None = None,
    totp_identifier: str | None = None,
    complete_verification: bool = True,
    include_action_history_in_verification: bool = False,
    error: Exception | None = None,
    workflow_run_block_id: str | None = None,
) -> None:
    context = skyvern_context.current()
    if not (
        context
        and context.organization_id
        and context.workflow_run_id
        and context.workflow_id
        and context.task_id
        and context.step_id
    ):
        return
    try:
        organization_id = context.organization_id
        LOG.info(
            "Script fallback to AI run",
            cache_key=cache_key,
            organization_id=organization_id,
            workflow_id=context.workflow_id,
            workflow_run_id=context.workflow_run_id,
            task_id=context.task_id,
            step_id=context.step_id,
        )
        # 1. fail the previous step
        previous_step = await app.DATABASE.update_step(
            step_id=context.step_id,
            task_id=context.task_id,
            organization_id=organization_id,
            status=StepStatus.failed,
        )
        # 2. create a new step for the AI run
        ai_step = await app.DATABASE.create_step(
            task_id=context.task_id,
            organization_id=organization_id,
            order=previous_step.order + 1,
            retry_index=0,
        )
        context.step_id = ai_step.step_id
        # 3. build the task block
        # 4. run execute_step
        organization = await app.DATABASE.get_organization(organization_id=organization_id)
        if not organization:
            raise Exception(f"Organization is missing organization_id={organization_id}")
        task = await app.DATABASE.get_task(task_id=context.task_id, organization_id=organization_id)
        if not task:
            raise Exception(f"Task is missing task_id={context.task_id}")
        workflow = await app.DATABASE.get_workflow(workflow_id=context.workflow_id, organization_id=organization_id)
        if not workflow:
            return

        # get the output_parameter
        output_parameter = workflow.get_output_parameter(cache_key)
        if not output_parameter:
            return

        task_block = TaskBlock(
            label=cache_key,
            url=task.url,
            navigation_goal=prompt,
            output_parameter=output_parameter,
            title=cache_key,
            engine=engine,
            complete_criterion=complete_criterion,
            terminate_criterion=terminate_criterion,
            data_extraction_goal=data_extraction_goal,
            data_schema=schema,
            error_code_mapping=error_code_mapping,
            max_steps_per_run=max_steps,
            complete_on_download=complete_on_download,
            download_suffix=download_suffix,
            totp_verification_url=totp_verification_url,
            totp_identifier=totp_identifier,
            complete_verification=complete_verification,
            include_action_history_in_verification=include_action_history_in_verification,
        )
        await app.agent.execute_step(
            organization=organization,
            task=task,
            step=ai_step,
            task_block=task_block,
        )
        # Update block status to completed if workflow block was created
        if workflow_run_block_id:
            await _update_workflow_block(
                workflow_run_block_id,
                BlockStatus.completed,
                task_id=context.task_id,
                step_id=context.step_id,
                label=cache_key,
            )
    except Exception as e:
        LOG.warning("Failed to fall back to AI run", cache_key=cache_key, exc_info=True)
        # Update block status to failed if workflow block was created
        if workflow_run_block_id:
            await _update_workflow_block(
                workflow_run_block_id,
                BlockStatus.failed,
                task_id=context.task_id,
                task_status=TaskStatus.failed,
                label=cache_key,
                failure_reason=str(e),
            )
        raise e


async def run_task(
    prompt: str,
    url: str | None = None,
    max_steps: int | None = None,
    cache_key: str | None = None,
) -> None:
    # Auto-create workflow block run and task if workflow_run_id is available
    workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
        block_type=BlockType.TASK,
        prompt=prompt,
        url=url,
    )
    # set the prompt in the RunContext
    run_context = script_run_context_manager.ensure_run_context()
    run_context.prompt = prompt

    if cache_key:
        try:
            await _run_cached_function(cache_key)

            # Update block status to completed if workflow block was created
            if workflow_run_block_id:
                await _update_workflow_block(
                    workflow_run_block_id,
                    BlockStatus.completed,
                    task_id=task_id,
                    step_id=step_id,
                    label=cache_key,
                )

        except Exception as e:
            LOG.exception("Failed to run task block. Falling back to AI run.")
            await _fallback_to_ai_run(
                cache_key=cache_key,
                prompt=prompt,
                url=url,
                max_steps=max_steps,
                error=e,
                workflow_run_block_id=workflow_run_block_id,
            )
        finally:
            # clear the prompt in the RunContext
            run_context.prompt = None
    else:
        if workflow_run_block_id:
            await _update_workflow_block(
                workflow_run_block_id,
                BlockStatus.failed,
                task_id=task_id,
                task_status=TaskStatus.failed,
                step_id=step_id,
                step_status=StepStatus.failed,
                failure_reason="Cache key is required",
            )
        run_context.prompt = None
        raise Exception("Cache key is required to run a task block in a script")


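# Usage sketch inside a generated script (illustrative values; cache_key is
# required and doubles as the block label used by the AI fallback):
#   await run_task(
#       prompt="Fill out the contact form and submit it",
#       url="https://example.com/contact",
#       cache_key="contact_form",
#   )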
async def download(
    prompt: str,
    url: str | None = None,
    max_steps: int | None = None,
    cache_key: str | None = None,
) -> None:
    # Auto-create workflow block run and task if workflow_run_id is available
    workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
        block_type=BlockType.FILE_DOWNLOAD,
        prompt=prompt,
        url=url,
    )
    # set the prompt in the RunContext
    run_context = script_run_context_manager.ensure_run_context()
    run_context.prompt = prompt

    if cache_key:
        try:
            await _run_cached_function(cache_key)

            # Update block status to completed if workflow block was created
            if workflow_run_block_id:
                await _update_workflow_block(
                    workflow_run_block_id,
                    BlockStatus.completed,
                    task_id=task_id,
                    step_id=step_id,
                    label=cache_key,
                )

        except Exception as e:
            LOG.exception("Failed to run download block. Falling back to AI run.")
            await _fallback_to_ai_run(
                cache_key=cache_key,
                prompt=prompt,
                url=url,
                max_steps=max_steps,
                complete_on_download=True,
                error=e,
                workflow_run_block_id=workflow_run_block_id,
            )
        finally:
            run_context.prompt = None
    else:
        if workflow_run_block_id:
            await _update_workflow_block(
                workflow_run_block_id,
                BlockStatus.failed,
                task_id=task_id,
                task_status=TaskStatus.failed,
                step_id=step_id,
                step_status=StepStatus.failed,
                failure_reason="Cache key is required",
            )
        run_context.prompt = None
        raise Exception("Cache key is required to run a download block in a script")


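# Usage sketch (illustrative values): behaves like run_task but records a
# FILE_DOWNLOAD block and falls back to an AI run with complete_on_download=True.
#   await download(prompt="Download the latest invoice PDF", cache_key="download_invoice")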
async def action(
    prompt: str,
    url: str | None = None,
    max_steps: int | None = None,
    cache_key: str | None = None,
) -> None:
    # Auto-create workflow block run and task if workflow_run_id is available
    workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
        block_type=BlockType.ACTION,
        prompt=prompt,
        url=url,
    )
    # set the prompt in the RunContext
    run_context = script_run_context_manager.ensure_run_context()
    run_context.prompt = prompt

    if cache_key:
        try:
            await _run_cached_function(cache_key)

            # Update block status to completed if workflow block was created
            if workflow_run_block_id:
                await _update_workflow_block(
                    workflow_run_block_id,
                    BlockStatus.completed,
                    task_id=task_id,
                    step_id=step_id,
                    label=cache_key,
                )

        except Exception as e:
            LOG.exception("Failed to run action block. Falling back to AI run.")
            await _fallback_to_ai_run(
                cache_key=cache_key,
                prompt=prompt,
                url=url,
                max_steps=max_steps,
                error=e,
                workflow_run_block_id=workflow_run_block_id,
            )
        finally:
            run_context.prompt = None
    else:
        if workflow_run_block_id:
            await _update_workflow_block(
                workflow_run_block_id,
                BlockStatus.failed,
                task_id=task_id,
                task_status=TaskStatus.failed,
                step_id=step_id,
                step_status=StepStatus.failed,
                failure_reason="Cache key is required",
            )
        run_context.prompt = None
        raise Exception("Cache key is required to run an action block in a script")


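# Usage sketch (illustrative values): a single-action variant of run_task,
# recorded as an ACTION block.
#   await action(prompt="Click the 'Accept cookies' button", cache_key="accept_cookies")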
async def login(
    prompt: str,
    url: str | None = None,
    max_steps: int | None = None,
    cache_key: str | None = None,
) -> None:
    # Auto-create workflow block run and task if workflow_run_id is available
    workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
        block_type=BlockType.LOGIN,
        prompt=prompt,
        url=url,
    )
    # set the prompt in the RunContext
    run_context = script_run_context_manager.ensure_run_context()
    run_context.prompt = prompt

    if cache_key:
        try:
            await _run_cached_function(cache_key)

            # Update block status to completed if workflow block was created
            if workflow_run_block_id:
                await _update_workflow_block(
                    workflow_run_block_id,
                    BlockStatus.completed,
                    task_id=task_id,
                    step_id=step_id,
                    label=cache_key,
                )

        except Exception as e:
            LOG.exception("Failed to run login block. Falling back to AI run.")
            await _fallback_to_ai_run(
                cache_key=cache_key,
                prompt=prompt,
                url=url,
                max_steps=max_steps,
                error=e,
                workflow_run_block_id=workflow_run_block_id,
            )
        finally:
            run_context.prompt = None
    else:
        if workflow_run_block_id:
            await _update_workflow_block(
                workflow_run_block_id,
                BlockStatus.failed,
                task_id=task_id,
                task_status=TaskStatus.failed,
                step_id=step_id,
                step_status=StepStatus.failed,
                failure_reason="Cache key is required",
            )
        run_context.prompt = None
        raise Exception("Cache key is required to run a login block in a script")


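# Usage sketch (illustrative values): recorded as a LOGIN block; credential
# handling is up to the cached function or the AI fallback.
#   await login(prompt="Log in with the provided credentials", url="https://example.com/login", cache_key="login")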
async def extract(
    prompt: str,
    url: str | None = None,
    max_steps: int | None = None,
    cache_key: str | None = None,
) -> dict[str, Any] | list | str | None:
    # Auto-create workflow block run and task if workflow_run_id is available
    workflow_run_block_id, task_id, step_id = await _create_workflow_block_run_and_task(
        block_type=BlockType.EXTRACTION,
        prompt=prompt,
        url=url,
    )
    # set the prompt in the RunContext
    run_context = script_run_context_manager.ensure_run_context()
    run_context.prompt = prompt
    output: dict[str, Any] | list | str | None = None

    if cache_key:
        try:
            output = cast(dict[str, Any] | list | str | None, await _run_cached_function(cache_key))

            # Update block status to completed if workflow block was created
            if workflow_run_block_id:
                await _update_workflow_block(
                    workflow_run_block_id,
                    BlockStatus.completed,
                    task_id=task_id,
                    step_id=step_id,
                    output=output,
                    label=cache_key,
                )
            return output
        except Exception as e:
            # Update block status to failed if workflow block was created
            if workflow_run_block_id:
                await _update_workflow_block(
                    workflow_run_block_id,
                    BlockStatus.failed,
                    task_id=task_id,
                    task_status=TaskStatus.failed,
                    step_id=step_id,
                    step_status=StepStatus.failed,
                    failure_reason=str(e),
                    output=output,
                    label=cache_key,
                )
            raise
        finally:
            run_context.prompt = None
    else:
        if workflow_run_block_id:
            await _update_workflow_block(
                workflow_run_block_id,
                BlockStatus.failed,
                task_id=task_id,
                task_status=TaskStatus.failed,
                step_id=step_id,
                step_status=StepStatus.failed,
                failure_reason="Cache key is required",
            )
        run_context.prompt = None
        raise Exception("Cache key is required to run an extraction block in a script")


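# Usage sketch (illustrative values): unlike the other block helpers, extract
# returns the cached function's output instead of None.
#   prices = await extract(prompt="Extract all product prices as a list", cache_key="extract_prices")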
async def wait(seconds: int) -> None:
    # Auto-create workflow block run if workflow_run_id is available (wait block doesn't create tasks)
    workflow_run_block_id, _, _ = await _create_workflow_block_run_and_task(block_type=BlockType.WAIT)

    try:
        await asyncio.sleep(seconds)

        # Update block status to completed if workflow block was created
        if workflow_run_block_id:
            await _update_workflow_block(workflow_run_block_id, BlockStatus.completed)

    except Exception as e:
        # Update block status to failed if workflow block was created
        if workflow_run_block_id:
            await _update_workflow_block(workflow_run_block_id, BlockStatus.failed, failure_reason=str(e))
        raise


async def run_script(
    path: str,
    parameters: dict[str, Any] | None = None,
    organization_id: str | None = None,
    workflow_run_id: str | None = None,
) -> None:
    # register the script run
    context = skyvern_context.current()
    if not context:
        context = skyvern_context.ensure_context()
        skyvern_context.set(skyvern_context.SkyvernContext())
    if workflow_run_id and organization_id:
        workflow_run = await app.DATABASE.get_workflow_run(
            workflow_run_id=workflow_run_id, organization_id=organization_id
        )
        if not workflow_run:
            raise WorkflowRunNotFound(workflow_run_id=workflow_run_id)
        context.workflow_run_id = workflow_run_id
        context.organization_id = organization_id

    # Dynamically import the script at the given path and execute it in-process,
    # passing the parameters through to its run_workflow entrypoint
    spec = importlib.util.spec_from_file_location("user_script", path)
    if not spec or not spec.loader:
        raise Exception(f"Failed to import script from {path}")
    user_script = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(user_script)

    # Call run_workflow from the imported module
    if hasattr(user_script, "run_workflow"):
        # Pass parameters through only when they were provided
        if parameters:
            await user_script.run_workflow(parameters=parameters)
        else:
            await user_script.run_workflow()
    else:
        raise Exception(f"No 'run_workflow' function found in {path}")


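# The script at `path` is expected to expose an async run_workflow entrypoint,
# e.g. (hypothetical user script):
#   async def run_workflow(parameters: dict[str, Any] | None = None) -> None:
#       await run_task(prompt="...", cache_key="...")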
async def generate_text(
    text: str | None = None,
    intention: str | None = None,
    data: dict[str, Any] | None = None,
) -> str:
    if text:
        return text
    new_text = text or ""
    if intention and data:
        try:
            run_context = script_run_context_manager.ensure_run_context()
            prompt = run_context.prompt
            # Serialize the data payload for the prompt
            payload_str = json.dumps(data) if isinstance(data, (dict, list)) else (data or "")
            script_generation_input_text_prompt = prompt_engine.load_prompt(
                template="script-generation-input-text-generatiion",
                intention=intention,
                data=payload_str,
                goal=prompt,
            )
            json_response = await app.SINGLE_INPUT_AGENT_LLM_API_HANDLER(
                prompt=script_generation_input_text_prompt,
                prompt_name="script-generation-input-text-generatiion",
            )
            new_text = json_response.get("answer", new_text)
        except Exception:
            LOG.exception("Failed to generate text for script")
            raise
    return new_text


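# Usage sketch (illustrative values): returns `text` verbatim when provided,
# otherwise asks the LLM to produce an input value from the intention and data.
#   value = await generate_text(intention="First name field", data={"first_name": "Ada"})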
def render_template(template: str, data: dict[str, Any] | None = None) -> str:
    """
    Refer to Block.format_block_parameter_template_from_workflow_run_context

    TODO: complete this function so that block code shares the same template rendering logic
    """
    template_data = data or {}
    jinja_template = jinja_sandbox_env.from_string(template)
    context = skyvern_context.current()
    if context and context.workflow_run_id:
        workflow_run_id = context.workflow_run_id
        workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id)
        template_data.update(workflow_run_context.values)

    return jinja_template.render(template_data)

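# Usage sketch: render_template("Hello {{ name }}!", {"name": "world"}) returns
# "Hello world!". Inside a workflow run, workflow run context values are merged
# into the template data and override caller-supplied keys of the same name.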