script generation improvement (#3216)
Some checks are pending
Run tests and pre-commit / Run tests and pre-commit hooks (push) Waiting to run
Run tests and pre-commit / Frontend Lint and Build (push) Waiting to run
Publish Fern Docs / run (push) Waiting to run

This commit is contained in:
Shuchang Zheng 2025-08-16 17:48:10 -07:00 committed by GitHub
parent 82f0b98fca
commit b9b7591079
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 841 additions and 279 deletions

View file

@ -26,26 +26,32 @@ from skyvern.library import Skyvern # noqa: E402
from skyvern.core.script_generations.skyvern_page import RunContext, SkyvernPage # noqa: E402 from skyvern.core.script_generations.skyvern_page import RunContext, SkyvernPage # noqa: E402
from skyvern.core.script_generations.run_initializer import setup # noqa: E402 from skyvern.core.script_generations.run_initializer import setup # noqa: E402
from skyvern.core.script_generations.workflow_wrappers import ( # noqa: E402 from skyvern.core.script_generations.workflow_wrappers import ( # noqa: E402
email_block, # noqa: E402 cached, # noqa: E402
file_download_block, # noqa: E402
navigation_block, # noqa: E402
task_block, # noqa: E402
url_block, # noqa: E402
wait_block, # noqa: E402
workflow, # noqa: E402 workflow, # noqa: E402
) # noqa: E402 ) # noqa: E402
from skyvern.services.script_service import ( # noqa: E402
action, # noqa: E402
download, # noqa: E402
extract, # noqa: E402
login, # noqa: E402
run_script, # noqa: E402
run_task, # noqa: E402
wait, # noqa: E402
) # noqa: E402
__all__ = [ __all__ = [
"Skyvern", "Skyvern",
"SkyvernPage", "SkyvernPage",
"RunContext", "RunContext",
"email_block", "action",
"file_download_block", "cached",
"navigation_block", "download",
"extract",
"login",
"run_script",
"run_task",
"setup", "setup",
"task_block", "wait",
"url_block",
"wait_block",
"workflow", "workflow",
] ]

View file

@ -119,6 +119,7 @@ class Settings(BaseSettings):
SECONDARY_LLM_KEY: str | None = None SECONDARY_LLM_KEY: str | None = None
SELECT_AGENT_LLM_KEY: str | None = None SELECT_AGENT_LLM_KEY: str | None = None
SINGLE_CLICK_AGENT_LLM_KEY: str | None = None SINGLE_CLICK_AGENT_LLM_KEY: str | None = None
SINGLE_INPUT_AGENT_LLM_KEY: str | None = None
PROMPT_BLOCK_LLM_KEY: str | None = None PROMPT_BLOCK_LLM_KEY: str | None = None
# COMMON # COMMON
LLM_CONFIG_TIMEOUT: int = 300 LLM_CONFIG_TIMEOUT: int = 300

View file

@ -40,3 +40,6 @@ SCROLL_AMOUNT_MULTIPLIER = 100
# Text input constants # Text input constants
TEXT_INPUT_DELAY = 10 # 10ms between each character input TEXT_INPUT_DELAY = 10 # 10ms between each character input
TEXT_PRESS_MAX_LENGTH = 20 TEXT_PRESS_MAX_LENGTH = 20
# Script generation constants
DEFAULT_SCRIPT_RUN_ID = "default"

View file

@ -0,0 +1,8 @@
SCRIPT_TASK_BLOCKS = {
"task",
"file_download",
"navigation",
"action",
"extraction",
"login",
}

View file

@ -18,13 +18,13 @@ from __future__ import annotations
import hashlib import hashlib
import keyword import keyword
from enum import StrEnum
from typing import Any from typing import Any
import libcst as cst import libcst as cst
import structlog import structlog
from libcst import Attribute, Call, Dict, DictElement, FunctionDef, Name, Param from libcst import Attribute, Call, Dict, DictElement, FunctionDef, Name, Param
from skyvern.core.script_generations.constants import SCRIPT_TASK_BLOCKS
from skyvern.forge import app from skyvern.forge import app
from skyvern.webeye.actions.action_types import ActionType from skyvern.webeye.actions.action_types import ActionType
@ -96,7 +96,7 @@ def _value(value: Any) -> cst.BaseExpression:
# --------------------------------------------------------------------- # # --------------------------------------------------------------------- #
# 2. builders # # 2. utility builders #
# --------------------------------------------------------------------- # # --------------------------------------------------------------------- #
@ -134,54 +134,16 @@ def _workflow_decorator(wf_req: dict[str, Any]) -> cst.Decorator:
) )
def _make_decorator(block: dict[str, Any]) -> cst.Decorator: def _make_decorator(block_label: str, block: dict[str, Any]) -> cst.Decorator:
bt = block["block_type"] kwargs = [
deco_name = { cst.Arg(
"task": "task_block", keyword=cst.Name("cache_key"),
"file_download": "file_download_block", value=_value(block_label),
"send_email": "email_block", )
"wait": "wait_block", ]
"navigation": "navigation_block",
"for_loop": "for_loop_block",
"action": "action_block",
"extraction": "extraction_block",
"login": "login_block",
"text_prompt": "text_prompt_block",
"goto_url": "url_block",
}[bt]
kwargs = []
field_map = {
"title": "title",
"navigation_goal": "prompt",
"url": "url",
"engine": "engine",
"model": "model",
"totp_identifier": "totp_identifier",
"webhook_callback_url": "webhook_callback_url",
"max_steps_per_run": "max_steps",
"wait_sec": "seconds",
}
for src_key, kw in field_map.items():
v = block.get(src_key)
if v not in (None, "", [], {}):
if isinstance(v, StrEnum):
v = v.value
try:
kwargs.append(cst.Arg(value=_value(v), keyword=Name(kw)))
except Exception:
raise
# booleans
if block.get("complete_on_download"):
kwargs.append(cst.Arg(value=Name("True"), keyword=Name("complete_on_download")))
if block.get("download_suffix"):
kwargs.append(cst.Arg(value=_value(block["download_suffix"]), keyword=Name("download_suffix")))
return cst.Decorator( return cst.Decorator(
decorator=Call( decorator=Call(
func=Attribute(value=Name("skyvern"), attr=Name(deco_name)), func=Attribute(value=cst.Name("skyvern"), attr=cst.Name("cached")),
args=kwargs, args=kwargs,
) )
) )
@ -196,31 +158,78 @@ def _action_to_stmt(act: dict[str, Any]) -> cst.BaseStatement:
method = ACTION_MAP[act["action_type"]] method = ACTION_MAP[act["action_type"]]
args: list[cst.Arg] = [] args: list[cst.Arg] = []
if method in ACTIONS_WITH_XPATH:
args.append(
cst.Arg(
keyword=cst.Name("xpath"),
value=_value(act["xpath"]),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
)
if method == "input_text": if method == "input_text":
args.append(cst.Arg(keyword=cst.Name("text"), value=_value(act["text"]))) args.append(
cst.Arg(
keyword=cst.Name("text"),
value=_value(act["text"]),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
)
elif method == "select_option": elif method == "select_option":
args.append(cst.Arg(keyword=cst.Name("option"), value=_value(act["option"]["value"]))) args.append(
cst.Arg(
keyword=cst.Name("option"),
value=_value(act["option"]["value"]),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
)
elif method == "wait": elif method == "wait":
args.append(cst.Arg(keyword=cst.Name("seconds"), value=_value(act["seconds"]))) args.append(
cst.Arg(
keyword=cst.Name("seconds"),
value=_value(act["seconds"]),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
)
args.extend( args.extend(
[ [
cst.Arg( cst.Arg(
keyword=cst.Name("intention"), keyword=cst.Name("intention"),
value=_value(act.get("intention") or act.get("reasoning") or ""), value=_value(act.get("intention") or act.get("reasoning") or ""),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
), ),
cst.Arg( cst.Arg(
keyword=cst.Name("data"), keyword=cst.Name("data"),
value=cst.Attribute(value=cst.Name("context"), attr=cst.Name("parameters")), value=cst.Attribute(value=cst.Name("context"), attr=cst.Name("parameters")),
whitespace_after_arg=cst.ParenthesizedWhitespace(indent=True),
comma=cst.Comma(),
), ),
] ]
) )
if method in ACTIONS_WITH_XPATH:
args.append(cst.Arg(keyword=cst.Name("xpath"), value=_value(act["xpath"])))
call = cst.Call( call = cst.Call(
func=cst.Attribute(value=cst.Name("page"), attr=cst.Name(method)), func=cst.Attribute(value=cst.Name("page"), attr=cst.Name(method)),
args=args, args=args,
whitespace_before_args=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
) )
# await page.method(...) # await page.method(...)
@ -231,7 +240,7 @@ def _action_to_stmt(act: dict[str, Any]) -> cst.BaseStatement:
def _build_block_fn(block: dict[str, Any], actions: list[dict[str, Any]]) -> FunctionDef: def _build_block_fn(block: dict[str, Any], actions: list[dict[str, Any]]) -> FunctionDef:
name = _safe_name(block.get("title") or block.get("label") or f"block_{block.get('workflow_run_block_id')}") name = block.get("label") or _safe_name(block.get("title") or f"block_{block.get('workflow_run_block_id')}")
body_stmts: list[cst.BaseStatement] = [] body_stmts: list[cst.BaseStatement] = []
if block.get("url"): if block.get("url"):
@ -253,7 +262,7 @@ def _build_block_fn(block: dict[str, Any], actions: list[dict[str, Any]]) -> Fun
Param(name=Name("context"), annotation=cst.Annotation(cst.Name("RunContext"))), Param(name=Name("context"), annotation=cst.Annotation(cst.Name("RunContext"))),
] ]
), ),
decorators=[_make_decorator(block)], decorators=[_make_decorator(name, block)],
body=cst.IndentedBlock(body_stmts), body=cst.IndentedBlock(body_stmts),
returns=None, returns=None,
asynchronous=cst.Asynchronous(), asynchronous=cst.Asynchronous(),
@ -308,38 +317,556 @@ def _build_cached_params(values: dict[str, Any]) -> cst.SimpleStatementLine:
return cst.SimpleStatementLine([assign]) return cst.SimpleStatementLine([assign])
def _build_run_fn(task_titles: list[str], wf_req: dict[str, Any]) -> FunctionDef: # --------------------------------------------------------------------- #
# 3. statement builders #
# --------------------------------------------------------------------- #
def _build_run_task_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
"""Build a skyvern.run_task statement."""
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=_value(block.get("navigation_goal", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("max_steps"),
value=_value(block.get("max_steps_per_run", 30)),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("cache_key"),
value=_value(block_title),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
comma=cst.Comma(),
),
]
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("run_task")),
args=args,
whitespace_before_args=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_download_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
"""Build a skyvern.download statement."""
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=_value(block.get("navigation_goal", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("complete_on_download"),
value=_value(block.get("complete_on_download", False)),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("download_suffix"),
value=_value(block.get("download_suffix", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("cache_key"),
value=_value(block_title),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
comma=cst.Comma(),
),
]
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("download")),
args=args,
whitespace_before_args=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_action_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
"""Build a skyvern.action statement."""
args = [
cst.Arg(
keyword=cst.Name("title"),
value=_value(block_title),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("prompt"),
value=_value(block.get("navigation_goal", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("max_steps"),
value=_value(block.get("max_steps_per_run", 30)),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
comma=cst.Comma(),
),
]
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("action")),
args=args,
whitespace_before_args=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_login_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
"""Build a skyvern.login statement."""
args = [
cst.Arg(
keyword=cst.Name("title"),
value=_value(block_title),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("prompt"),
value=_value(block.get("navigation_goal", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("totp_identifier"),
value=_value(block.get("totp_identifier", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("webhook_callback_url"),
value=_value(block.get("webhook_callback_url", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
comma=cst.Comma(),
),
]
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("login")),
args=args,
whitespace_before_args=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_extract_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
"""Build a skyvern.extract statement."""
args = [
cst.Arg(
keyword=cst.Name("title"),
value=_value(block_title),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("prompt"),
value=_value(block.get("navigation_goal", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
comma=cst.Comma(),
),
]
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("extract")),
args=args,
whitespace_before_args=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_navigate_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
"""Build a skyvern.navigate statement."""
args = [
cst.Arg(
keyword=cst.Name("title"),
value=_value(block_title),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("prompt"),
value=_value(block.get("navigation_goal", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("url"),
value=_value(block.get("url", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("max_steps"),
value=_value(block.get("max_steps_per_run", 30)),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
comma=cst.Comma(),
),
]
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("run_task")),
args=args,
whitespace_before_args=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_send_email_statement(block: dict[str, Any]) -> cst.SimpleStatementLine:
"""Build a skyvern.send_email statement."""
args = [
cst.Arg(
keyword=cst.Name("sender"),
value=_value(block.get("sender", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("subject"),
value=_value(block.get("subject", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("body"),
value=_value(block.get("body", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("recipients"),
value=_value(block.get("recipients", [])),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("attach_downloaded_files"),
value=_value(block.get("attach_downloaded_files", False)),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
comma=cst.Comma(),
),
]
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("send_email")),
args=args,
whitespace_before_args=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_validate_statement(block: dict[str, Any]) -> cst.SimpleStatementLine:
"""Build a skyvern.validate statement."""
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=_value(block.get("navigation_goal", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
comma=cst.Comma(),
),
]
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("validate")),
args=args,
whitespace_before_args=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_wait_statement(block: dict[str, Any]) -> cst.SimpleStatementLine:
"""Build a skyvern.wait statement."""
args = [
cst.Arg(
keyword=cst.Name("seconds"),
value=_value(block.get("wait_sec", 1)),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
comma=cst.Comma(),
),
]
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("wait")),
args=args,
whitespace_before_args=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_for_loop_statement(block_title: str, block: dict[str, Any]) -> cst.SimpleStatementLine:
"""Build a skyvern.for_loop statement."""
args = [
cst.Arg(
keyword=cst.Name("title"),
value=_value(block_title),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("prompt"),
value=_value(block.get("navigation_goal", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
),
cst.Arg(
keyword=cst.Name("max_steps"),
value=_value(block.get("max_steps_per_run", 30)),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
comma=cst.Comma(),
),
]
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("for_loop")),
args=args,
whitespace_before_args=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
def _build_goto_statement(block: dict[str, Any]) -> cst.SimpleStatementLine:
"""Build a skyvern.goto statement."""
args = [
cst.Arg(
keyword=cst.Name("url"),
value=_value(block.get("url", "")),
whitespace_after_arg=cst.ParenthesizedWhitespace(
indent=True,
),
comma=cst.Comma(),
),
]
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("goto")),
args=args,
whitespace_before_args=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
)
return cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
# --------------------------------------------------------------------- #
# 4. function builders #
# --------------------------------------------------------------------- #
def _build_run_fn(blocks: list[dict[str, Any]], wf_req: dict[str, Any]) -> FunctionDef:
body = [ body = [
cst.parse_statement("page, context = await skyvern.setup(parameters.model_dump())"), cst.parse_statement("page, context = await skyvern.setup(parameters.model_dump())"),
*[cst.parse_statement(f"await {_safe_name(t)}(page, context)") for t in task_titles],
] ]
for block in blocks:
block_type = block.get("block_type")
block_title = block.get("label") or block.get("title") or f"block_{block.get('workflow_run_block_id')}"
if block_type in SCRIPT_TASK_BLOCKS:
# For task blocks, call the custom function with cache_key
if block_type == "task":
stmt = _build_run_task_statement(block_title, block)
elif block_type == "file_download":
stmt = _build_download_statement(block_title, block)
elif block_type == "action":
stmt = _build_action_statement(block_title, block)
elif block_type == "login":
stmt = _build_login_statement(block_title, block)
elif block_type == "extraction":
stmt = _build_extract_statement(block_title, block)
elif block_type == "navigation":
stmt = _build_navigate_statement(block_title, block)
elif block_type == "send_email":
stmt = _build_send_email_statement(block)
elif block_type == "text_prompt":
stmt = _build_validate_statement(block)
elif block_type == "wait":
stmt = _build_wait_statement(block)
elif block_type == "for_loop":
stmt = _build_for_loop_statement(block_title, block)
elif block_type == "goto_url":
stmt = _build_goto_statement(block)
else:
# Default case for unknown block types
stmt = cst.SimpleStatementLine([cst.Expr(cst.SimpleString(f"# Unknown block type: {block_type}"))])
body.append(stmt)
# Add a final validation step if not already present
has_validation = any(block.get("block_type") == "text_prompt" for block in blocks)
has_task_blocks = any(block.get("block_type") in SCRIPT_TASK_BLOCKS for block in blocks)
if not has_validation and not has_task_blocks:
# Build the final validation statement using LibCST components
args = [
cst.Arg(
keyword=cst.Name("prompt"),
value=cst.SimpleString(
'"Your goal is to validate that the workflow completed successfully. COMPLETE if successful, TERMINATE if there are issues."'
),
),
]
call = cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("validate")),
args=args,
)
validation_stmt = cst.SimpleStatementLine([cst.Expr(cst.Await(call))])
body.append(validation_stmt)
params = cst.Parameters( params = cst.Parameters(
params=[ params=[
Param( Param(
name=cst.Name("parameters"), name=cst.Name("parameters"),
annotation=cst.Annotation(cst.Name("WorkflowParameters")), annotation=cst.Annotation(cst.Name("WorkflowParameters")),
default=cst.Name("cached_parameters"), default=cst.Name("cached_parameters"),
whitespace_after_param=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
), ),
Param( Param(
name=cst.Name("title"), name=cst.Name("title"),
annotation=cst.Annotation(cst.Name("str")), annotation=cst.Annotation(cst.Name("str")),
default=_value(wf_req.get("title", "")), default=_value(wf_req.get("title", "")),
whitespace_after_param=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
), ),
Param( Param(
name=cst.Name("webhook_url"), name=cst.Name("webhook_url"),
annotation=cst.Annotation(cst.parse_expression("str | None")), annotation=cst.Annotation(cst.parse_expression("str | None")),
default=_value(wf_req.get("webhook_url")), default=_value(wf_req.get("webhook_url")),
whitespace_after_param=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
), ),
Param( Param(
name=cst.Name("totp_url"), name=cst.Name("totp_url"),
annotation=cst.Annotation(cst.parse_expression("str | None")), annotation=cst.Annotation(cst.parse_expression("str | None")),
default=_value(wf_req.get("totp_url")), default=_value(wf_req.get("totp_url")),
whitespace_after_param=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
), ),
Param( Param(
name=cst.Name("totp_identifier"), name=cst.Name("totp_identifier"),
annotation=cst.Annotation(cst.parse_expression("str | None")), annotation=cst.Annotation(cst.parse_expression("str | None")),
default=_value(wf_req.get("totp_identifier")), default=_value(wf_req.get("totp_identifier")),
whitespace_after_param=cst.ParenthesizedWhitespace(),
comma=cst.Comma(),
), ),
] ]
) )
@ -350,11 +877,15 @@ def _build_run_fn(task_titles: list[str], wf_req: dict[str, Any]) -> FunctionDef
decorators=[_workflow_decorator(wf_req)], decorators=[_workflow_decorator(wf_req)],
params=params, params=params,
body=cst.IndentedBlock(body), body=cst.IndentedBlock(body),
whitespace_before_params=cst.ParenthesizedWhitespace(
indent=True,
last_line=cst.SimpleWhitespace(INDENT),
),
) )
# --------------------------------------------------------------------- # # --------------------------------------------------------------------- #
# 3. entrypoint # # 5. entrypoint #
# --------------------------------------------------------------------- # # --------------------------------------------------------------------- #
@ -363,7 +894,7 @@ async def generate_workflow_script(
file_name: str, file_name: str,
workflow_run_request: dict[str, Any], workflow_run_request: dict[str, Any],
workflow: dict[str, Any], workflow: dict[str, Any],
tasks: list[dict[str, Any]], blocks: list[dict[str, Any]],
actions_by_task: dict[str, list[dict[str, Any]]], actions_by_task: dict[str, list[dict[str, Any]]],
organization_id: str | None = None, organization_id: str | None = None,
run_id: str | None = None, run_id: str | None = None,
@ -405,10 +936,11 @@ async def generate_workflow_script(
# --- blocks --------------------------------------------------------- # --- blocks ---------------------------------------------------------
block_fns = [] block_fns = []
length_of_tasks = len(tasks) task_blocks = [block for block in blocks if block["block_type"] in SCRIPT_TASK_BLOCKS]
length_of_tasks = len(task_blocks)
# Create script first if organization_id is provided # Create script first if organization_id is provided
for idx, task in enumerate(tasks): for idx, task in enumerate(task_blocks):
block_fn_def = _build_block_fn(task, actions_by_task.get(task.get("task_id", ""), [])) block_fn_def = _build_block_fn(task, actions_by_task.get(task.get("task_id", ""), []))
# Create script block if we have script context # Create script block if we have script context
@ -433,12 +965,8 @@ async def generate_workflow_script(
block_fns.append(cst.EmptyLine()) block_fns.append(cst.EmptyLine())
block_fns.append(cst.EmptyLine()) block_fns.append(cst.EmptyLine())
task_titles: list[str] = [
t.get("title") or t.get("label") or t.get("task_id") or f"unknown_title_{idx}" for idx, t in enumerate(tasks)
]
# --- runner --------------------------------------------------------- # --- runner ---------------------------------------------------------
run_fn = _build_run_fn(task_titles, workflow_run_request) run_fn = _build_run_fn(blocks, workflow_run_request)
module = cst.Module( module = cst.Module(
body=[ body=[

View file

@ -1,9 +1,11 @@
from typing import Any from typing import Any
from skyvern.core.script_generations.script_run_context_manager import script_run_context_manager
from skyvern.core.script_generations.skyvern_page import RunContext, SkyvernPage from skyvern.core.script_generations.skyvern_page import RunContext, SkyvernPage
async def setup(parameters: dict[str, Any], generate_response: bool = False) -> tuple[SkyvernPage, RunContext]: async def setup(parameters: dict[str, Any], run_id: str | None = None) -> tuple[SkyvernPage, RunContext]:
skyvern_page = await SkyvernPage.create() skyvern_page = await SkyvernPage.create()
run_context = RunContext(parameters=parameters, page=skyvern_page) run_context = RunContext(parameters=parameters, page=skyvern_page)
script_run_context_manager.set_run_context(run_context)
return skyvern_page, run_context return skyvern_page, run_context

View file

@ -1,3 +1,5 @@
from typing import Callable
from skyvern.core.script_generations.skyvern_page import RunContext from skyvern.core.script_generations.skyvern_page import RunContext
@ -7,16 +9,26 @@ class ScriptRunContextManager:
""" """
def __init__(self) -> None: def __init__(self) -> None:
self.run_contexts: dict[str, RunContext] = {} # self.run_contexts: dict[str, RunContext] = {}
""" self.run_context: RunContext | None = None
run_id -> RunContext self.cached_fns: dict[str, Callable] = {}
"""
def get_run_context(self, run_id: str) -> RunContext | None: def get_run_context(self) -> RunContext | None:
return self.run_contexts.get(run_id) return self.run_context
def set_run_context(self, run_id: str, run_context: RunContext) -> None: def set_run_context(self, run_context: RunContext) -> None:
self.run_contexts[run_id] = run_context self.run_context = run_context
def delete_run_context(self, run_id: str) -> None: def ensure_run_context(self) -> RunContext:
self.run_contexts.pop(run_id, None) if not self.run_context:
raise Exception("Run context not found")
return self.run_context
def set_cached_fn(self, cache_key: str, fn: Callable) -> None:
self.cached_fns[cache_key] = fn
def get_cached_fn(self, cache_key: str) -> Callable | None:
return self.cached_fns.get(cache_key)
script_run_context_manager = ScriptRunContextManager()

View file

@ -63,11 +63,6 @@ class SkyvernPage:
@classmethod @classmethod
async def create(cls) -> SkyvernPage: async def create(cls) -> SkyvernPage:
# set up skyvern context if not already set
current_skyvern_context = skyvern_context.current()
if not current_skyvern_context:
skyvern_context.set(skyvern_context.SkyvernContext())
# initialize browser state # initialize browser state
browser_state = await app.BROWSER_MANAGER.get_or_create_for_script() browser_state = await app.BROWSER_MANAGER.get_or_create_for_script()
scraped_page = await scrape_website( scraped_page = await scrape_website(
@ -78,6 +73,7 @@ class SkyvernPage:
max_screenshot_number=settings.MAX_NUM_SCREENSHOTS, max_screenshot_number=settings.MAX_NUM_SCREENSHOTS,
draw_boxes=True, draw_boxes=True,
scroll=True, scroll=True,
support_empty_page=True,
) )
page = await scraped_page._browser_state.must_get_working_page() page = await scraped_page._browser_state.must_get_working_page()
return cls(scraped_page=scraped_page, page=page) return cls(scraped_page=scraped_page, page=page)
@ -105,7 +101,9 @@ class SkyvernPage:
meta = ActionMetadata(intention, data) meta = ActionMetadata(intention, data)
call = ActionCall(action, args, kwargs, meta) call = ActionCall(action, args, kwargs, meta)
try: try:
call.result = await fn(skyvern_page, *args, **kwargs) # real driver call call.result = await fn(
skyvern_page, *args, intention=intention, data=data, **kwargs
) # real driver call
return call.result return call.result
except Exception as e: except Exception as e:
call.error = e call.error = e
@ -168,7 +166,7 @@ class SkyvernPage:
await locator.click(timeout=5000) await locator.click(timeout=5000)
@action_wrap(ActionType.INPUT_TEXT) @action_wrap(ActionType.INPUT_TEXT)
async def input_text( async def fill(
self, self,
xpath: str, xpath: str,
text: str, text: str,
@ -176,11 +174,60 @@ class SkyvernPage:
data: str | dict[str, Any] | None = None, data: str | dict[str, Any] | None = None,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS, timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> None: ) -> None:
# if self.generate_response: await self._input_text(xpath, text, intention, data, timeout)
# # TODO: regenerate text
# pass @action_wrap(ActionType.INPUT_TEXT)
async def type(
self,
xpath: str,
text: str,
intention: str | None = None,
data: str | dict[str, Any] | None = None,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> None:
await self._input_text(xpath, text, intention, data, timeout)
async def _input_text(
self,
xpath: str,
text: str,
intention: str | None = None,
data: str | dict[str, Any] | None = None,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> None:
"""Input text into an element identified by ``xpath``.
When ``intention`` and ``data`` are provided a new input text action is
generated via the `script-generation-input-text-generatiion` prompt. The model returns a
fresh text based on the current DOM and the updated data for this run.
The browser then inputs the text using this newly generated text.
If the prompt generation or parsing fails for any reason we fall back to
inputting the originally supplied ``text``.
"""
new_text = text
if intention and data:
try:
# Build the element tree of the current page for the prompt
skyvern_context.ensure_context()
payload_str = json.dumps(data) if isinstance(data, (dict, list)) else (data or "")
script_generation_input_text_prompt = prompt_engine.load_prompt(
template="script-generation-input-text-generatiion",
intention=intention,
data=payload_str,
)
json_response = await app.SINGLE_INPUT_AGENT_LLM_API_HANDLER(
prompt=script_generation_input_text_prompt,
prompt_name="script-generation-input-text-generatiion",
)
new_text = json_response.get("answer", text) or text
except Exception:
# If anything goes wrong, fall back to the original text
new_text = text
locator = self.page.locator(f"xpath={xpath}") locator = self.page.locator(f"xpath={xpath}")
await handler_utils.input_sequentially(locator, text, timeout=timeout) await handler_utils.input_sequentially(locator, new_text, timeout=timeout)
@action_wrap(ActionType.UPLOAD_FILE) @action_wrap(ActionType.UPLOAD_FILE)
async def upload_file( async def upload_file(
@ -306,10 +353,6 @@ class SkyvernPage:
class RunContext: class RunContext:
"""
Lives for one workflow run.
"""
def __init__(self, parameters: dict[str, Any], page: SkyvernPage) -> None: def __init__(self, parameters: dict[str, Any], page: SkyvernPage) -> None:
self.parameters = parameters self.parameters = parameters
self.page = page self.page = page

View file

@ -3,6 +3,7 @@ from typing import Any
import structlog import structlog
from skyvern.core.script_generations.constants import SCRIPT_TASK_BLOCKS
from skyvern.forge import app from skyvern.forge import app
from skyvern.forge.sdk.workflow.models.block import BlockType from skyvern.forge.sdk.workflow.models.block import BlockType
from skyvern.services import workflow_service from skyvern.services import workflow_service
@ -53,11 +54,7 @@ async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organiz
block_dump = block.model_dump() block_dump = block.model_dump()
if block.block_type == BlockType.TaskV2: if block.block_type == BlockType.TaskV2:
raise ValueError("TaskV2 blocks are not supported yet") raise ValueError("TaskV2 blocks are not supported yet")
if ( if block.block_type in SCRIPT_TASK_BLOCKS and block.task_id:
block.block_type
in [BlockType.TASK, BlockType.ACTION, BlockType.EXTRACTION, BlockType.LOGIN, BlockType.NAVIGATION]
and block.task_id
):
task = await app.DATABASE.get_task(task_id=block.task_id, organization_id=organization_id) task = await app.DATABASE.get_task(task_id=block.task_id, organization_id=organization_id)
if not task: if not task:
LOG.warning(f"Task {block.task_id} not found") LOG.warning(f"Task {block.task_id} not found")

View file

@ -1,6 +1,7 @@
from typing import Any, Callable from typing import Any, Callable
from skyvern import RunContext, SkyvernPage from skyvern import RunContext, SkyvernPage
from skyvern.core.script_generations.script_run_context_manager import script_run_context_manager
# Build a dummy workflow decorator # Build a dummy workflow decorator
@ -12,182 +13,18 @@ def workflow(
max_steps: int | None = None, max_steps: int | None = None,
) -> Callable: ) -> Callable:
def wrapper(func: Callable) -> Callable: def wrapper(func: Callable) -> Callable:
# TODO: create a workflow run object
return func return func
return wrapper return wrapper
def task_block( def cached(cache_key: str) -> Callable:
prompt: str | None = None,
title: str | None = None,
url: str | None = None,
engine: str | None = None,
model: dict[str, Any] | None = None,
totp_url: str | None = None,
totp_identifier: str | None = None,
max_steps: int | None = None,
navigation_payload: str | None = None,
webhook_url: str | None = None,
) -> Callable:
def decorator(func: Callable) -> Callable: def decorator(func: Callable) -> Callable:
script_run_context_manager.set_cached_fn(cache_key, func)
async def wrapper(page: SkyvernPage, context: RunContext, *args: Any, **kwargs: Any) -> Any: async def wrapper(page: SkyvernPage, context: RunContext, *args: Any, **kwargs: Any) -> Any:
# Store the prompt in the context # Store the function in context.cached_fns
context.prompt = prompt
return await func(page, context, *args, **kwargs)
return wrapper
return decorator
def login_block(
prompt: str | None = None,
title: str | None = None,
url: str | None = None,
engine: str | None = None,
model: dict[str, Any] | None = None,
totp_url: str | None = None,
totp_identifier: str | None = None,
max_steps: int | None = None,
navigation_payload: str | None = None,
webhook_url: str | None = None,
) -> Callable:
def decorator(func: Callable) -> Callable:
async def wrapper(page: SkyvernPage, context: RunContext, *args: Any, **kwargs: Any) -> Any:
# Store the prompt in the context
context.prompt = prompt
return await func(page, context, *args, **kwargs)
return wrapper
return decorator
def navigation_block(
prompt: str | None = None,
title: str | None = None,
url: str | None = None,
engine: str | None = None,
model: dict[str, Any] | None = None,
totp_url: str | None = None,
totp_identifier: str | None = None,
max_steps: int | None = None,
) -> Callable:
def decorator(func: Callable) -> Callable:
async def wrapper(page: SkyvernPage, context: RunContext, *args: Any, **kwargs: Any) -> Any:
# Store the prompt in the context
context.prompt = prompt
return await func(page, context, *args, **kwargs)
return wrapper
return decorator
def action_block(
prompt: str | None = None,
title: str | None = None,
url: str | None = None,
engine: str | None = None,
model: dict[str, Any] | None = None,
totp_url: str | None = None,
totp_identifier: str | None = None,
max_steps: int | None = None,
) -> Callable:
def decorator(func: Callable) -> Callable:
async def wrapper(page: SkyvernPage, context: RunContext, *args: Any, **kwargs: Any) -> Any:
# Store the prompt in the context
context.prompt = prompt
return await func(page, context, *args, **kwargs)
return wrapper
return decorator
def extraction_block(
title: str | None = None,
data_extraction_goal: str | None = None,
data_extraction_schema: dict[str, Any] | list | str | None = None,
model: dict[str, Any] | None = None,
) -> Callable:
def decorator(func: Callable) -> Callable:
async def wrapper(page: SkyvernPage, context: RunContext, *args: Any, **kwargs: Any) -> Any:
# Store the data_extraction_goal as prompt in the context
context.prompt = data_extraction_goal
return await func(page, context, *args, **kwargs)
return wrapper
return decorator
def url_block(
title: str | None = None,
url: str | None = None,
) -> Callable:
def decorator(func: Callable) -> Callable:
async def wrapper(page: SkyvernPage, context: RunContext, *args: Any, **kwargs: Any) -> Any:
# No prompt to store for url_block
context.prompt = None
return await func(page, context, *args, **kwargs)
return wrapper
return decorator
def file_download_block(
prompt: str | None = None,
title: str | None = None,
url: str | None = None,
max_steps: int | None = None,
engine: str | None = None,
) -> Callable:
def decorator(func: Callable) -> Callable:
async def wrapper(page: SkyvernPage, context: RunContext, *args: Any, **kwargs: Any) -> Any:
# Store the prompt in the context
context.prompt = prompt
return await func(page, context, *args, **kwargs)
return wrapper
return decorator
def email_block(prompt: str | None = None, title: str | None = None, url: str | None = None) -> Callable:
def decorator(func: Callable) -> Callable:
async def wrapper(page: SkyvernPage, context: RunContext, *args: Any, **kwargs: Any) -> Any:
# Store the prompt in the context
context.prompt = prompt
return await func(page, context, *args, **kwargs)
return wrapper
return decorator
def wait_block(seconds: int, title: str | None = None) -> Callable:
def decorator(func: Callable) -> Callable:
async def wrapper(page: SkyvernPage, context: RunContext, *args: Any, **kwargs: Any) -> Any:
# No prompt to store for wait_block
context.prompt = None
return await func(page, context, *args, **kwargs)
return wrapper
return decorator
def text_prompt_block(
prompt: str | None = None,
title: str | None = None,
json_schema: dict[str, Any] | list | str | None = None,
) -> Callable:
def decorator(func: Callable) -> Callable:
async def wrapper(page: SkyvernPage, context: RunContext, *args: Any, **kwargs: Any) -> Any:
# Store the prompt in the context
context.prompt = prompt
return await func(page, context, *args, **kwargs) return await func(page, context, *args, **kwargs)
return wrapper return wrapper

View file

@ -69,6 +69,11 @@ SINGLE_CLICK_AGENT_LLM_API_HANDLER = (
if SETTINGS_MANAGER.SINGLE_CLICK_AGENT_LLM_KEY if SETTINGS_MANAGER.SINGLE_CLICK_AGENT_LLM_KEY
else SECONDARY_LLM_API_HANDLER else SECONDARY_LLM_API_HANDLER
) )
SINGLE_INPUT_AGENT_LLM_API_HANDLER = (
LLMAPIHandlerFactory.get_llm_api_handler(SETTINGS_MANAGER.SINGLE_INPUT_AGENT_LLM_KEY)
if SETTINGS_MANAGER.SINGLE_INPUT_AGENT_LLM_KEY
else SECONDARY_LLM_API_HANDLER
)
WORKFLOW_CONTEXT_MANAGER = WorkflowContextManager() WORKFLOW_CONTEXT_MANAGER = WorkflowContextManager()
WORKFLOW_SERVICE = WorkflowService() WORKFLOW_SERVICE = WorkflowService()
AGENT_FUNCTION = AgentFunction() AGENT_FUNCTION = AgentFunction()

View file

@ -0,0 +1,17 @@
# Goal
You are an expert in filling out text input forms on a webpage. Help the user fill out a specific text input field.
# Provided information:{% if goal %}
- User's overall goal: {{ goal }}{% endif %}
- Context and details: {{ data }}
- The question or the intention for this field: {{ intention }}
# Output
- Your answer should be direct and to the point. No need to explain the answer.
- YOUR RESPONSE HAS TO BE IN JSON FORMAT. DO NOT RETURN ANYTHING ELSE.
- DO NOT INCLUDE ANY UNRELATED INFORMATION OR UNNECESSARY DETAILS IN YOUR ANSWER.
EXAMPLE RESPONSE FORMAT:
{
"answer": "string",
}

View file

@ -2310,7 +2310,7 @@ class WorkflowService:
file_name=codegen_input.file_name, file_name=codegen_input.file_name,
workflow_run_request=codegen_input.workflow_run, workflow_run_request=codegen_input.workflow_run,
workflow=codegen_input.workflow, workflow=codegen_input.workflow,
tasks=codegen_input.workflow_blocks, blocks=codegen_input.workflow_blocks,
actions_by_task=codegen_input.actions_by_task, actions_by_task=codegen_input.actions_by_task,
organization_id=workflow.organization_id, organization_id=workflow.organization_id,
script_id=created_script.script_id, script_id=created_script.script_id,

View file

@ -1,14 +1,19 @@
import asyncio
import base64 import base64
import hashlib import hashlib
import importlib.util
import os import os
import subprocess import subprocess
from datetime import datetime from datetime import datetime
from typing import Any
import structlog import structlog
from fastapi import BackgroundTasks, HTTPException from fastapi import BackgroundTasks, HTTPException
from skyvern.core.script_generations.script_run_context_manager import script_run_context_manager
from skyvern.exceptions import ScriptNotFound from skyvern.exceptions import ScriptNotFound
from skyvern.forge import app from skyvern.forge import app
from skyvern.forge.sdk.core import skyvern_context
from skyvern.schemas.scripts import CreateScriptResponse, FileNode, ScriptFileCreate from skyvern.schemas.scripts import CreateScriptResponse, FileNode, ScriptFileCreate
LOG = structlog.get_logger(__name__) LOG = structlog.get_logger(__name__)
@ -204,3 +209,101 @@ async def execute_script(
if background_tasks: if background_tasks:
background_tasks.add_task(subprocess.run, ["python", f"{script.script_id}/main.py"]) background_tasks.add_task(subprocess.run, ["python", f"{script.script_id}/main.py"])
LOG.info("Script executed successfully", script_id=script_id) LOG.info("Script executed successfully", script_id=script_id)
async def _run_cached_function(cache_key: str) -> None:
cached_fn = script_run_context_manager.get_cached_fn(cache_key)
if cached_fn:
# TODO: handle exceptions here and fall back to AI run in case of error
run_context = script_run_context_manager.ensure_run_context()
await cached_fn(page=run_context.page, context=run_context)
else:
raise Exception(f"Cache key {cache_key} not found")
async def run_task(
prompt: str,
url: str | None = None,
max_steps: int | None = None,
cache_key: str | None = None,
) -> None:
if cache_key:
await _run_cached_function(cache_key)
else:
raise Exception("Cache key is required to run task block in a script")
async def download(
prompt: str,
url: str | None = None,
max_steps: int | None = None,
cache_key: str | None = None,
) -> None:
if cache_key:
await _run_cached_function(cache_key)
else:
raise Exception("Cache key is required to run task block in a script")
async def action(
prompt: str,
url: str | None = None,
max_steps: int | None = None,
cache_key: str | None = None,
) -> None:
if cache_key:
await _run_cached_function(cache_key)
else:
raise Exception("Cache key is required to run task block in a script")
async def login(
prompt: str,
url: str | None = None,
max_steps: int | None = None,
cache_key: str | None = None,
) -> None:
if cache_key:
await _run_cached_function(cache_key)
else:
raise Exception("Cache key is required to run task block in a script")
async def extract(
prompt: str,
url: str | None = None,
max_steps: int | None = None,
cache_key: str | None = None,
) -> None:
if cache_key:
await _run_cached_function(cache_key)
else:
raise Exception("Cache key is required to run task block in a script")
async def wait(seconds: int) -> None:
await asyncio.sleep(seconds)
async def run_script(path: str, parameters: dict[str, Any] | None = None) -> None:
# register the script run
run_id = "123"
skyvern_context.set(skyvern_context.SkyvernContext(run_id=run_id))
# run the script as subprocess; pass the parameters and run_id to the script
# Dynamically import the script at the given path
spec = importlib.util.spec_from_file_location("user_script", path)
if not spec or not spec.loader:
raise Exception(f"Failed to import script from {path}")
user_script = importlib.util.module_from_spec(spec)
spec.loader.exec_module(user_script)
# Call run_workflow from the imported module
if hasattr(user_script, "run_workflow"):
# If parameters is None, pass an empty dict
if parameters:
await user_script.run_workflow(parameters=parameters)
else:
await user_script.run_workflow()
else:
raise Exception(f"No 'run_workflow' function found in {path}")