Workflow CodeGen (#2740)

This commit is contained in:
Shuchang Zheng 2025-06-18 00:44:46 -07:00 committed by GitHub
parent 14bc711240
commit f6a0ccd32b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 565 additions and 51 deletions

View file

@ -23,5 +23,25 @@ setup_logger()
from skyvern.forge import app # noqa: E402, F401 from skyvern.forge import app # noqa: E402, F401
from skyvern.library import Skyvern # noqa: E402 from skyvern.library import Skyvern # noqa: E402
from skyvern.core.code_generations.skyvern_page import RunContext, SkyvernPage # noqa: E402
from skyvern.core.code_generations.run_initializer import setup # noqa: E402
from skyvern.core.code_generations.workflow_wrappers import ( # noqa: E402
workflow, # noqa: E402
task_block, # noqa: E402
file_download_block, # noqa: E402
email_block, # noqa: E402
wait_block, # noqa: E402
) # noqa: E402
__all__ = ["Skyvern"]
__all__ = [
"Skyvern",
"SkyvernPage",
"RunContext",
"setup",
"workflow",
"task_block",
"file_download_block",
"email_block",
"wait_block",
]

0
skyvern/core/__init__.py Normal file
View file

View file

@ -0,0 +1,22 @@
from skyvern.core.code_generations.skyvern_page import RunContext
class CodeRunContextManager:
"""
Manages the run context for code runs.
"""
def __init__(self) -> None:
self.run_contexts: dict[str, RunContext] = {}
"""
run_id -> RunContext
"""
def get_run_context(self, run_id: str) -> RunContext | None:
return self.run_contexts.get(run_id)
def set_run_context(self, run_id: str, run_context: RunContext) -> None:
self.run_contexts[run_id] = run_context
def delete_run_context(self, run_id: str) -> None:
self.run_contexts.pop(run_id, None)

View file

@ -1,18 +1,15 @@
# skyvern_codegen_cst.py # skyvern_codegen_cst.py
""" """
Generate a runnable Skyvern workflow script **with LibCST**. Generate a runnable Skyvern workflow script.
Example Example
------- -------
from skyvern_codegen_cst import generate_workflow_script generated_code = generate_workflow_script(
file_name="workflow.py",
src = generate_workflow_script( workflow_run_request=workflow_run_request,
workflow=workflow_dict, workflow=workflow,
tasks=[task1, task2, ...], tasks=tasks,
actions_by_task={ actions_by_task=actions_by_task,
task1["task_id"]: task1_actions,
task2["task_id"]: task2_actions,
},
) )
Path("workflow.py").write_text(src) Path("workflow.py").write_text(src)
""" """
@ -20,11 +17,14 @@ Path("workflow.py").write_text(src)
from __future__ import annotations from __future__ import annotations
import keyword import keyword
from typing import Any, Iterable, Mapping from enum import StrEnum
from typing import Any
import libcst as cst import libcst as cst
from libcst import Attribute, Call, Dict, DictElement, FunctionDef, Name, Param from libcst import Attribute, Call, Dict, DictElement, FunctionDef, Name, Param
from skyvern.webeye.actions.action_types import ActionType
# --------------------------------------------------------------------- # # --------------------------------------------------------------------- #
# 1. helpers # # 1. helpers #
# --------------------------------------------------------------------- # # --------------------------------------------------------------------- #
@ -42,6 +42,8 @@ ACTION_MAP = {
"drag": "drag", "drag": "drag",
"solve_captcha": "solve_captcha", "solve_captcha": "solve_captcha",
"verification_code": "verification_code", "verification_code": "verification_code",
"wait": "wait",
"extract": "extract",
} }
INDENT = " " * 4 INDENT = " " * 4
@ -86,12 +88,47 @@ def _value(value: Any) -> cst.BaseExpression:
# --------------------------------------------------------------------- # # --------------------------------------------------------------------- #
def _make_decorator(block: Mapping[str, Any]) -> cst.Decorator: def _workflow_decorator(wf_req: dict[str, Any]) -> cst.Decorator:
"""
Build @skyvern.workflow(
title="...", totp_url=..., totp_identifier=..., webhook_callback_url=..., max_steps=...
)
"""
# helper that skips “None” so the output is concise
def kw(key: str, value: Any) -> cst.Arg | None:
if value is None:
return None
return cst.Arg(keyword=cst.Name(key), value=_value(value))
args: list = list(
filter(
None,
[
kw("title", wf_req.get("title", "")),
kw("totp_url", wf_req.get("totp_url")),
kw("totp_identifier", wf_req.get("totp_identifier")),
kw("webhook_url", wf_req.get("webhook_url")),
kw("max_steps", wf_req.get("max_steps")),
],
)
)
return cst.Decorator(
decorator=cst.Call(
func=cst.Attribute(value=cst.Name("skyvern"), attr=cst.Name("workflow")),
args=args,
)
)
def _make_decorator(block: dict[str, Any]) -> cst.Decorator:
bt = block["block_type"] bt = block["block_type"]
deco_name = { deco_name = {
"task": "task_block", "task": "task_block",
"file_download": "file_download_block", "file_download": "file_download_block",
"send_email": "email_block", "send_email": "email_block",
"wait": "wait_block",
}[bt] }[bt]
kwargs = [] kwargs = []
@ -104,12 +141,18 @@ def _make_decorator(block: Mapping[str, Any]) -> cst.Decorator:
"totp_identifier": "totp_identifier", "totp_identifier": "totp_identifier",
"webhook_callback_url": "webhook_callback_url", "webhook_callback_url": "webhook_callback_url",
"max_steps_per_run": "max_steps", "max_steps_per_run": "max_steps",
"wait_sec": "seconds",
} }
for src_key, kw in field_map.items(): for src_key, kw in field_map.items():
v = block.get(src_key) v = block.get(src_key)
if v not in (None, "", [], {}): if v not in (None, "", [], {}):
kwargs.append(cst.Arg(value=_value(v), keyword=Name(kw))) if isinstance(v, StrEnum):
v = v.value
try:
kwargs.append(cst.Arg(value=_value(v), keyword=Name(kw)))
except Exception:
raise
# booleans # booleans
if block.get("complete_on_download"): if block.get("complete_on_download"):
@ -125,7 +168,7 @@ def _make_decorator(block: Mapping[str, Any]) -> cst.Decorator:
) )
def _action_to_stmt(act: Mapping[str, Any]) -> cst.BaseStatement: def _action_to_stmt(act: dict[str, Any]) -> cst.BaseStatement:
""" """
Turn one Action dict into: Turn one Action dict into:
@ -157,14 +200,16 @@ def _action_to_stmt(act: Mapping[str, Any]) -> cst.BaseStatement:
return cst.SimpleStatementLine([cst.Expr(await_expr)]) return cst.SimpleStatementLine([cst.Expr(await_expr)])
def _build_block_fn(block: Mapping[str, Any], actions: Iterable[Mapping[str, Any]]) -> FunctionDef: def _build_block_fn(block: dict[str, Any], actions: list[dict[str, Any]]) -> FunctionDef:
name = _safe_name(block["title"]) name = _safe_name(block.get("title") or block.get("label") or f"block_{block.get('workflow_run_block_id')}")
body_stmts: list[cst.BaseStatement] = [] body_stmts: list[cst.BaseStatement] = []
if block.get("url"): if block.get("url"):
body_stmts.append(cst.parse_statement(f"await page.goto({repr(block['url'])})")) body_stmts.append(cst.parse_statement(f"await page.goto({repr(block['url'])})"))
for act in actions: for act in actions:
if act["action_type"] in [ActionType.COMPLETE]:
continue
body_stmts.append(_action_to_stmt(act)) body_stmts.append(_action_to_stmt(act))
if not body_stmts: if not body_stmts:
@ -174,8 +219,8 @@ def _build_block_fn(block: Mapping[str, Any], actions: Iterable[Mapping[str, Any
name=Name(name), name=Name(name),
params=cst.Parameters( params=cst.Parameters(
params=[ params=[
Param(name=Name("page")), Param(name=Name("page"), annotation=cst.Annotation(cst.Name("SkyvernPage"))),
Param(name=Name("context")), Param(name=Name("context"), annotation=cst.Annotation(cst.Name("RunContext"))),
] ]
), ),
decorators=[_make_decorator(block)], decorators=[_make_decorator(block)],
@ -185,7 +230,7 @@ def _build_block_fn(block: Mapping[str, Any], actions: Iterable[Mapping[str, Any
) )
def _build_model(workflow: Mapping[str, Any]) -> cst.ClassDef: def _build_model(workflow: dict[str, Any]) -> cst.ClassDef:
""" """
class WorkflowParameters(BaseModel): class WorkflowParameters(BaseModel):
ein_info: str ein_info: str
@ -216,31 +261,65 @@ def _build_model(workflow: Mapping[str, Any]) -> cst.ClassDef:
) )
def _build_cached_params() -> cst.SimpleStatementLine: def _build_cached_params(values: dict[str, Any]) -> cst.SimpleStatementLine:
src = "cached_parameters = WorkflowParameters(**{k: f'<{k}>' for k in WorkflowParameters.model_fields})" """
return cst.parse_statement(src) Make a CST for:
cached_parameters = WorkflowParameters(ein_info="...", ...)
"""
call = cst.Call(
func=cst.Name("WorkflowParameters"),
args=[cst.Arg(keyword=cst.Name(k), value=_value(v)) for k, v in values.items()],
)
assign = cst.Assign(
targets=[cst.AssignTarget(cst.Name("cached_parameters"))],
value=call,
)
return cst.SimpleStatementLine([assign])
def _build_run_fn(task_fns: list[str]) -> FunctionDef: def _build_run_fn(task_titles: list[str], wf_req: dict[str, Any]) -> FunctionDef:
body = [cst.parse_statement("page, context = await skyvern.setup(parameters.model_dump())")] + [ body = [
cst.parse_statement(f"await {_safe_name(t)}(page, context)") for t in task_fns cst.parse_statement("page, context = await skyvern.setup(parameters.model_dump())"),
*[cst.parse_statement(f"await {_safe_name(t)}(page, context)") for t in task_titles],
] ]
params = cst.Parameters(
params=[
Param(
name=cst.Name("parameters"),
annotation=cst.Annotation(cst.Name("WorkflowParameters")),
default=cst.Name("cached_parameters"),
),
Param(
name=cst.Name("title"),
annotation=cst.Annotation(cst.Name("str")),
default=_value(wf_req.get("title", "")),
),
Param(
name=cst.Name("webhook_url"),
annotation=cst.Annotation(cst.parse_expression("str | None")),
default=_value(wf_req.get("webhook_url")),
),
Param(
name=cst.Name("totp_url"),
annotation=cst.Annotation(cst.parse_expression("str | None")),
default=_value(wf_req.get("totp_url")),
),
Param(
name=cst.Name("totp_identifier"),
annotation=cst.Annotation(cst.parse_expression("str | None")),
default=_value(wf_req.get("totp_identifier")),
),
]
)
return FunctionDef( return FunctionDef(
name=Name("run_workflow"), name=cst.Name("run_workflow"),
decorators=[cst.Decorator(Attribute(value=Name("skyvern"), attr=Name("workflow")))],
params=cst.Parameters(
params=[
Param(
name=Name("parameters"),
default=Name("cached_parameters"),
annotation=cst.Annotation(Name("WorkflowParameters")),
)
]
),
body=cst.IndentedBlock(body),
returns=None,
asynchronous=cst.Asynchronous(), asynchronous=cst.Asynchronous(),
decorators=[_workflow_decorator(wf_req)],
params=params,
body=cst.IndentedBlock(body),
) )
@ -251,9 +330,11 @@ def _build_run_fn(task_fns: list[str]) -> FunctionDef:
def generate_workflow_script( def generate_workflow_script(
*, *,
workflow: Mapping[str, Any], file_name: str,
tasks: Iterable[Mapping[str, Any]], workflow_run_request: dict[str, Any],
actions_by_task: Mapping[str, Iterable[Mapping[str, Any]]], workflow: dict[str, Any],
tasks: list[dict[str, Any]],
actions_by_task: dict[str, list[dict[str, Any]]],
) -> str: ) -> str:
""" """
Build a LibCST Module and emit .code (PEP-8-formatted source). Build a LibCST Module and emit .code (PEP-8-formatted source).
@ -285,31 +366,41 @@ def generate_workflow_script(
# --- class + cached params ----------------------------------------- # --- class + cached params -----------------------------------------
model_cls = _build_model(workflow) model_cls = _build_model(workflow)
cached_params_stmt = _build_cached_params() cached_params_stmt = _build_cached_params(workflow_run_request.get("parameters", {}))
# --- blocks --------------------------------------------------------- # --- blocks ---------------------------------------------------------
block_fns: list[FunctionDef] = [] block_fns = []
task_titles = [] length_of_tasks = len(tasks)
for t in tasks: for idx, task in enumerate(tasks):
fn = _build_block_fn(t, actions_by_task.get(t["task_id"], [])) block_fns.append(_build_block_fn(task, actions_by_task.get(task.get("task_id", ""), [])))
block_fns.append(fn) if idx < length_of_tasks - 1:
task_titles.append(t["title"]) block_fns.append(cst.EmptyLine())
block_fns.append(cst.EmptyLine())
task_titles: list[str] = [
t.get("title") or t.get("label") or t.get("task_id") or f"unknown_title_{idx}" for idx, t in enumerate(tasks)
]
# --- runner --------------------------------------------------------- # --- runner ---------------------------------------------------------
run_fn = _build_run_fn(task_titles) run_fn = _build_run_fn(task_titles, workflow_run_request)
module = cst.Module( module = cst.Module(
body=[ body=[
*imports, *imports,
cst.EmptyLine(), cst.EmptyLine(),
cst.EmptyLine(),
model_cls, model_cls,
cst.EmptyLine(), cst.EmptyLine(),
cst.EmptyLine(),
cached_params_stmt, cached_params_stmt,
cst.EmptyLine(), cst.EmptyLine(),
cst.EmptyLine(),
*block_fns, *block_fns,
cst.EmptyLine(), cst.EmptyLine(),
run_fn,
cst.EmptyLine(), cst.EmptyLine(),
run_fn,
] ]
) )
with open(file_name, "w") as f:
f.write(module.code)
return module.code return module.code

View file

@ -0,0 +1,23 @@
from typing import Any
from playwright.async_api import async_playwright
from skyvern.core.code_generations.skyvern_page import RunContext, SkyvernPage
from skyvern.forge.sdk.core import skyvern_context
from skyvern.webeye.browser_factory import BrowserContextFactory
# TODO: find a better name for this function
async def setup(parameters: dict[str, Any]) -> tuple[SkyvernPage, RunContext]:
# set up skyvern context
skyvern_context.set(skyvern_context.SkyvernContext())
# start playwright
pw = await async_playwright().start()
(
browser_context,
_,
_,
) = await BrowserContextFactory.create_browser_context(playwright=pw)
new_page = await browser_context.new_page()
skyvern_page = SkyvernPage(page=new_page)
return skyvern_page, RunContext(parameters=parameters, page=skyvern_page)

View file

@ -0,0 +1,206 @@
from __future__ import annotations
from dataclasses import dataclass
from enum import StrEnum
from typing import Any, Callable
from playwright.async_api import Page
from skyvern.config import settings
from skyvern.forge.sdk.api.files import download_file
from skyvern.webeye.actions import handler_utils
from skyvern.webeye.actions.action_types import ActionType
class Driver(StrEnum):
PLAYWRIGHT = "playwright"
@dataclass
class ActionMetadata:
intention: str = ""
data: dict[str, Any] | str | None = None
timestamp: float | None = None # filled in by recorder
screenshot_path: str | None = None # if enabled
@dataclass
class ActionCall:
name: ActionType
args: tuple[Any, ...]
kwargs: dict[str, Any]
meta: ActionMetadata
result: Any | None = None # populated after execution
error: Exception | None = None # populated if failed
class SkyvernPage:
"""
A minimal adapter around the chosen driver that:
1. Executes real browser commands
2. Records ActionCallobjects into RunContext.trace
3. Adds retry / fallback hooks
"""
def __init__(
self,
page: Page,
driver: Driver = Driver.PLAYWRIGHT,
*,
recorder: Callable[[ActionCall], None] | None = None,
):
self.driver = driver
self.page = page # e.g. Playwright's Page
self._record = recorder or (lambda ac: None)
@staticmethod
def action_wrap(
action: ActionType,
) -> Callable:
"""
Decorator to record the action call.
TODOs:
- generate action record in db pre action
- generate screenshot post action
"""
def decorator(fn: Callable) -> Callable:
async def wrapper(
skyvern_page: SkyvernPage,
*args: Any,
intention: str = "",
data: str | dict[str, Any] = "",
**kwargs: Any,
) -> Any:
meta = ActionMetadata(intention, data)
call = ActionCall(action, args, kwargs, meta)
try:
call.result = await fn(skyvern_page, *args, **kwargs) # real driver call
return call.result
except Exception as e:
call.error = e
# LLM fallback hook could go here ...
raise
finally:
skyvern_page._record(call)
return wrapper
return decorator
async def goto(self, url: str) -> None:
await self.page.goto(url)
######### Public Interfaces #########
@action_wrap(ActionType.CLICK)
async def click(self, xpath: str, intention: str | None = None, data: str | dict[str, Any] | None = None) -> None:
locator = self.page.locator(xpath)
await locator.click(timeout=5000)
@action_wrap(ActionType.INPUT_TEXT)
async def input_text(
self,
xpath: str,
text: str,
intention: str | None = None,
data: str | dict[str, Any] | None = None,
timeout: float = settings.BROWSER_ACTION_TIMEOUT_MS,
) -> None:
locator = self.page.locator(xpath)
await handler_utils.input_sequentially(locator, text, timeout=timeout)
@action_wrap(ActionType.UPLOAD_FILE)
async def upload_file(
self, xpath: str, file_path: str, intention: str | None = None, data: str | dict[str, Any] | None = None
) -> None:
file = await download_file(file_path)
await self.page.set_input_files(xpath, file)
@action_wrap(ActionType.SELECT_OPTION)
async def select_option(
self, xpath: str, option: str, intention: str | None = None, data: str | dict[str, Any] | None = None
) -> None:
locator = self.page.locator(xpath)
await locator.select_option(option, timeout=5000)
@action_wrap(ActionType.WAIT)
async def wait(
self, seconds: float, intention: str | None = None, data: str | dict[str, Any] | None = None
) -> None: ...
@action_wrap(ActionType.NULL_ACTION)
async def null_action(self, intention: str | None = None, data: str | dict[str, Any] | None = None) -> None: ...
@action_wrap(ActionType.SOLVE_CAPTCHA)
async def solve_captcha(
self, xpath: str, intention: str | None = None, data: str | dict[str, Any] | None = None
) -> None: ...
@action_wrap(ActionType.TERMINATE)
async def terminate(
self, errors: list[str], intention: str | None = None, data: str | dict[str, Any] | None = None
) -> None: ...
@action_wrap(ActionType.COMPLETE)
async def complete(
self, data_extraction_goal: str, intention: str | None = None, data: str | dict[str, Any] | None = None
) -> None: ...
@action_wrap(ActionType.RELOAD_PAGE)
async def reload_page(self, intention: str | None = None, data: str | dict[str, Any] | None = None) -> None: ...
@action_wrap(ActionType.EXTRACT)
async def extract(
self, data_extraction_goal: str, intention: str | None = None, data: str | dict[str, Any] | None = None
) -> None: ...
@action_wrap(ActionType.VERIFICATION_CODE)
async def verification_code(
self, xpath: str, intention: str | None = None, data: str | dict[str, Any] | None = None
) -> None: ...
@action_wrap(ActionType.SCROLL)
async def scroll(
self, amount: int, intention: str | None = None, data: str | dict[str, Any] | None = None
) -> None: ...
@action_wrap(ActionType.KEYPRESS)
async def keypress(
self, key: str, intention: str | None = None, data: str | dict[str, Any] | None = None
) -> None: ...
@action_wrap(ActionType.TYPE)
async def type(self, text: str, intention: str | None = None, data: str | dict[str, Any] | None = None) -> None: ...
@action_wrap(ActionType.MOVE)
async def move(
self, x: int, y: int, intention: str | None = None, data: str | dict[str, Any] | None = None
) -> None: ...
@action_wrap(ActionType.DRAG)
async def drag(
self,
start_x: int,
start_y: int,
end_x: int,
end_y: int,
intention: str | None = None,
data: str | dict[str, Any] | None = None,
) -> None: ...
@action_wrap(ActionType.LEFT_MOUSE)
async def left_mouse(
self, x: int, y: int, intention: str | None = None, data: str | dict[str, Any] | None = None
) -> None: ...
class RunContext:
"""
Lives for one workflow run.
"""
def __init__(self, parameters: dict[str, Any], page: SkyvernPage) -> None:
self.parameters = parameters
self.page = page
self.trace: list[ActionCall] = []

View file

@ -0,0 +1,81 @@
from dataclasses import dataclass
from typing import Any, Iterable, Mapping
import structlog
from skyvern.forge import app
from skyvern.forge.sdk.workflow.models.block import BlockType
from skyvern.services import workflow_service
LOG = structlog.get_logger(__name__)
@dataclass
class CodeGenInput:
file_name: str
workflow_run: Mapping[str, Any]
workflow: Mapping[str, Any]
workflow_blocks: Iterable[Mapping[str, Any]]
actions_by_task: Mapping[str, Iterable[Mapping[str, Any]]]
async def transform_workflow_run_to_code_gen_input(workflow_run_id: str, organization_id: str) -> CodeGenInput:
# get the workflow run request
workflow_run_resp = await workflow_service.get_workflow_run_response(
workflow_run_id=workflow_run_id, organization_id=organization_id
)
if not workflow_run_resp:
raise ValueError(f"Workflow run {workflow_run_id} not found")
run_request = workflow_run_resp.run_request
if not run_request:
raise ValueError(f"Workflow run {workflow_run_id} has no run request")
workflow_run_request_json = run_request.model_dump()
# get the workflow
workflow = await app.WORKFLOW_SERVICE.get_workflow_by_permanent_id(
workflow_permanent_id=run_request.workflow_id, organization_id=organization_id
)
if not workflow:
raise ValueError(f"Workflow {run_request.workflow_id} not found")
workflow_json = workflow.model_dump()
# get the tasks
## first, get all the workflow run blocks
workflow_run_blocks = await app.DATABASE.get_workflow_run_blocks(
workflow_run_id=workflow_run_id, organization_id=organization_id
)
workflow_run_blocks.sort(key=lambda x: x.created_at)
workflow_block_dump = []
# Hydrate blocks with task data
# TODO: support task v2
actions_by_task = {}
for block in workflow_run_blocks:
block_dump = block.model_dump()
if block.block_type == BlockType.TaskV2:
raise ValueError("TaskV2 blocks are not supported yet")
if (
block.block_type
in [BlockType.TASK, BlockType.ACTION, BlockType.EXTRACTION, BlockType.LOGIN, BlockType.NAVIGATION]
and block.task_id
):
task = await app.DATABASE.get_task(task_id=block.task_id, organization_id=organization_id)
if not task:
LOG.warning(f"Task {block.task_id} not found")
continue
block_dump.update(task.model_dump())
actions = await app.DATABASE.get_task_actions(task_id=block.task_id, organization_id=organization_id)
action_dumps = []
for action in actions:
action_dump = action.model_dump()
action_dump["xpath"] = action.get_xpath()
action_dumps.append(action_dump)
actions_by_task[block.task_id] = action_dumps
workflow_block_dump.append(block_dump)
return CodeGenInput(
file_name=f"{workflow_run_id}.py",
workflow_run=workflow_run_request_json,
workflow=workflow_json,
workflow_blocks=workflow_block_dump,
actions_by_task=actions_by_task,
)

View file

@ -0,0 +1,59 @@
from typing import Any, Callable
# Build a dummy workflow decorator
def workflow(
title: str | None = None,
totp_url: str | None = None,
totp_identifier: str | None = None,
webhook_url: str | None = None,
max_steps: int | None = None,
) -> Callable:
def wrapper(func: Callable) -> Callable:
return func
return wrapper
def task_block(
prompt: str | None = None,
title: str | None = None,
url: str | None = None,
engine: str | None = None,
model: dict[str, Any] | None = None,
totp_url: str | None = None,
totp_identifier: str | None = None,
max_steps: int | None = None,
navigation_payload: str | None = None,
webhook_url: str | None = None,
) -> Callable:
def decorator(func: Callable) -> Callable:
return func
return decorator
def file_download_block(
prompt: str | None = None,
title: str | None = None,
url: str | None = None,
max_steps: int | None = None,
) -> Callable:
def decorator(func: Callable) -> Callable:
return func
return decorator
def email_block(prompt: str | None = None, title: str | None = None, url: str | None = None) -> Callable:
def decorator(func: Callable) -> Callable:
return func
return decorator
def wait_block(seconds: int) -> Callable:
def decorator(func: Callable) -> Callable:
return func
return decorator

View file

@ -0,0 +1,12 @@
from pathlib import Path
async def run_code(path: Path) -> None:
# initialize the SkyvernPage object
# initialize workflow run context
# load the python file and do validation
# run the preloaded workflow
return