mirror of
https://github.com/Skyvern-AI/skyvern.git
synced 2026-04-28 03:30:10 +00:00
Fix browser session timeout for FORCE_BROWSER_SESSION workflows (#SKY-8099) (#4903)
This commit is contained in:
parent
3a55633ab6
commit
62b74eb98a
24 changed files with 872 additions and 186 deletions
|
|
@ -1,9 +1,9 @@
|
|||
{/* This file (fern/) and docs/integrations/cli.mdx must stay in sync. fern/ is for Fern, docs/ is for Mintlify. */}
|
||||
---
|
||||
title: CLI & Skills
|
||||
subtitle: Automate browsers and manage workflows from the command line
|
||||
slug: integrations/cli
|
||||
---
|
||||
{/* This file (fern/) and docs/integrations/cli.mdx must stay in sync. fern/ is for Fern, docs/ is for Mintlify. */}
|
||||
|
||||
The `skyvern` CLI gives you direct access to browser automation, workflow management, credential storage, and more — all from your terminal. Use it in shell scripts, CI/CD pipelines, or for quick one-off tasks.
|
||||
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
{/* This file (fern/) and docs/integrations/mcp.mdx must stay in sync. fern/ is for Fern, docs/ is for Mintlify. Minor formatting differences (Tip vs inline text, CardGroup) are OK since the platforms support different components. */}
|
||||
---
|
||||
title: MCP Server
|
||||
subtitle: Connect AI assistants to browser automation via Model Context Protocol
|
||||
slug: integrations/mcp
|
||||
---
|
||||
{/* This file (fern/) and docs/integrations/mcp.mdx must stay in sync. fern/ is for Fern, docs/ is for Mintlify. Minor formatting differences (Tip vs inline text, CardGroup) are OK since the platforms support different components. */}
|
||||
|
||||
The Skyvern MCP server lets AI assistants like Claude Desktop, Claude Code, Codex, Cursor, and Windsurf control a browser. Your AI can fill out forms, extract data, download files, and run multi-step workflows, all through natural language.
|
||||
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ function HighlightText({ text, query }: HighlightTextProps) {
|
|||
<>
|
||||
{parts.map((part, i) =>
|
||||
part.toLowerCase() === lowerQ ? (
|
||||
<span key={i} className="rounded bg-blue-500/30 px-0.5 text-blue-400">
|
||||
<span key={i} className="rounded bg-blue-500/30 text-blue-400">
|
||||
{part}
|
||||
</span>
|
||||
) : (
|
||||
|
|
|
|||
|
|
@ -73,7 +73,8 @@ function NavigationNode({ id, data, type }: NodeProps<NavigationNode>) {
|
|||
const handleEngineChange = (value: RunEngine) => {
|
||||
const updates: Partial<NavigationNode["data"]> = { engine: value };
|
||||
if (value === RunEngine.SkyvernV2) {
|
||||
// Switching to V2 — clear V1-specific fields
|
||||
// Switching to V2 — preserve prompt content, clear V1-specific fields
|
||||
updates.prompt = data.navigationGoal || data.prompt;
|
||||
updates.navigationGoal = "";
|
||||
updates.completeCriterion = "";
|
||||
updates.terminateCriterion = "";
|
||||
|
|
@ -85,7 +86,8 @@ function NavigationNode({ id, data, type }: NodeProps<NavigationNode>) {
|
|||
updates.downloadSuffix = null;
|
||||
updates.includeActionHistoryInVerification = false;
|
||||
} else if (data.engine === RunEngine.SkyvernV2) {
|
||||
// Switching away from V2 — clear V2-specific fields
|
||||
// Switching away from V2 — preserve prompt content, clear V2-specific fields
|
||||
updates.navigationGoal = data.prompt || data.navigationGoal;
|
||||
updates.prompt = "";
|
||||
updates.maxSteps = MAX_STEPS_DEFAULT;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ import {
|
|||
LockOpen1Icon,
|
||||
StopwatchIcon,
|
||||
UpdateIcon,
|
||||
Share1Icon,
|
||||
UploadIcon,
|
||||
} from "@radix-ui/react-icons";
|
||||
import { ExtractIcon } from "@/components/icons/ExtractIcon";
|
||||
|
|
@ -94,6 +95,9 @@ function WorkflowBlockIcon({ workflowBlockType, className }: Props) {
|
|||
case "print_page": {
|
||||
return <FileTextIcon className={className} />;
|
||||
}
|
||||
case "workflow_trigger": {
|
||||
return <Share1Icon className={className} />;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,155 @@
|
|||
import { useState, useRef } from "react";
|
||||
import { PlusIcon } from "@radix-ui/react-icons";
|
||||
import { Skeleton } from "@/components/ui/skeleton";
|
||||
import {
|
||||
Popover,
|
||||
PopoverContent,
|
||||
PopoverTrigger,
|
||||
} from "@/components/ui/popover";
|
||||
import { getClient } from "@/api/AxiosClient";
|
||||
import { useCredentialGetter } from "@/hooks/useCredentialGetter";
|
||||
import { useQuery } from "@tanstack/react-query";
|
||||
import { useDebounce } from "use-debounce";
|
||||
import { WorkflowApiResponse } from "@/routes/workflows/types/workflowTypes";
|
||||
import { WorkflowBlockParameterSelect } from "@/routes/workflows/editor/nodes/WorkflowBlockParameterSelect";
|
||||
|
||||
interface WorkflowSelectorProps {
|
||||
nodeId: string;
|
||||
value: string;
|
||||
onChange: (value: string) => void;
|
||||
}
|
||||
|
||||
function WorkflowSelector({ nodeId, value, onChange }: WorkflowSelectorProps) {
|
||||
const [focused, setFocused] = useState(false);
|
||||
const [debouncedValue] = useDebounce(value, 300);
|
||||
const credentialGetter = useCredentialGetter();
|
||||
const containerRef = useRef<HTMLDivElement>(null);
|
||||
const inputRef = useRef<HTMLInputElement>(null);
|
||||
const isTyping = value !== debouncedValue;
|
||||
|
||||
const { data: workflows = [], isFetching } = useQuery<
|
||||
Array<WorkflowApiResponse>
|
||||
>({
|
||||
queryKey: ["workflows", "selector", debouncedValue],
|
||||
queryFn: async () => {
|
||||
const client = await getClient(credentialGetter);
|
||||
const params = new URLSearchParams();
|
||||
params.append("page", "1");
|
||||
params.append("page_size", "10");
|
||||
params.append("only_workflows", "true");
|
||||
if (debouncedValue) {
|
||||
params.append("search_key", debouncedValue);
|
||||
}
|
||||
return client
|
||||
.get("/workflows", { params })
|
||||
.then((response) => response.data);
|
||||
},
|
||||
enabled: focused,
|
||||
});
|
||||
|
||||
const showDropdown =
|
||||
focused && (workflows.length > 0 || isFetching || isTyping);
|
||||
|
||||
const insertParameter = (parameterKey: string) => {
|
||||
const parameterText = `{{${parameterKey}}}`;
|
||||
const input = inputRef.current;
|
||||
if (input) {
|
||||
const start = input.selectionStart ?? value.length;
|
||||
const end = input.selectionEnd ?? value.length;
|
||||
const newValue =
|
||||
value.substring(0, start) + parameterText + value.substring(end);
|
||||
onChange(newValue);
|
||||
setTimeout(() => {
|
||||
const newPosition = start + parameterText.length;
|
||||
input.focus();
|
||||
input.setSelectionRange(newPosition, newPosition);
|
||||
}, 0);
|
||||
} else {
|
||||
onChange(`${value}${parameterText}`);
|
||||
}
|
||||
};
|
||||
|
||||
return (
|
||||
<div
|
||||
ref={containerRef}
|
||||
className="nopan relative"
|
||||
onBlur={(e) => {
|
||||
if (!containerRef.current?.contains(e.relatedTarget as Node)) {
|
||||
setFocused(false);
|
||||
}
|
||||
}}
|
||||
>
|
||||
<input
|
||||
ref={inputRef}
|
||||
id={`workflow-selector-${nodeId}`}
|
||||
type="text"
|
||||
value={value}
|
||||
onChange={(e) => onChange(e.target.value)}
|
||||
onFocus={() => setFocused(true)}
|
||||
placeholder="Search by title or enter wpid_xxx / {{ parameter }}"
|
||||
className="w-full rounded-md border border-input bg-transparent px-3 py-2 pr-9 text-xs text-slate-300 shadow-sm placeholder:text-muted-foreground focus-visible:outline-none focus-visible:ring-1 focus-visible:ring-ring"
|
||||
/>
|
||||
<div className="absolute right-1 top-0 flex size-9 items-center justify-end">
|
||||
<Popover>
|
||||
<PopoverTrigger asChild>
|
||||
<div className="cursor-pointer rounded p-1 hover:bg-muted">
|
||||
<PlusIcon className="size-4" />
|
||||
</div>
|
||||
</PopoverTrigger>
|
||||
<PopoverContent className="w-[22rem]">
|
||||
<WorkflowBlockParameterSelect
|
||||
nodeId={nodeId}
|
||||
onAdd={insertParameter}
|
||||
/>
|
||||
</PopoverContent>
|
||||
</Popover>
|
||||
</div>
|
||||
{showDropdown && (
|
||||
<div className="absolute z-50 mt-1 w-full overflow-hidden rounded-md border border-slate-600 bg-slate-800 shadow-lg">
|
||||
<div className="max-h-[200px] overflow-y-auto">
|
||||
{(isFetching || isTyping) && workflows.length === 0 ? (
|
||||
<>
|
||||
{Array.from({ length: 3 }).map((_, index) => (
|
||||
<div
|
||||
key={`skeleton-${index}`}
|
||||
className="flex w-full flex-col gap-1 px-3 py-2"
|
||||
>
|
||||
<Skeleton className="h-3.5 w-3/4" />
|
||||
<Skeleton className="h-3 w-1/2" />
|
||||
</div>
|
||||
))}
|
||||
</>
|
||||
) : (
|
||||
workflows.map((workflow) => {
|
||||
const isSelected = value === workflow.workflow_permanent_id;
|
||||
return (
|
||||
<button
|
||||
key={workflow.workflow_permanent_id}
|
||||
type="button"
|
||||
onMouseDown={(e) => e.preventDefault()}
|
||||
onClick={() => {
|
||||
onChange(workflow.workflow_permanent_id);
|
||||
setFocused(false);
|
||||
}}
|
||||
className={`flex w-full flex-col gap-0.5 px-3 py-2 text-left text-xs transition-colors hover:bg-slate-700 ${
|
||||
isSelected ? "bg-slate-700" : ""
|
||||
}`}
|
||||
>
|
||||
<span className="font-medium text-slate-200">
|
||||
{workflow.title}
|
||||
</span>
|
||||
<span className="text-slate-500">
|
||||
{workflow.workflow_permanent_id}
|
||||
</span>
|
||||
</button>
|
||||
);
|
||||
})
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
export { WorkflowSelector };
|
||||
|
|
@ -0,0 +1,242 @@
|
|||
import {
|
||||
Accordion,
|
||||
AccordionContent,
|
||||
AccordionItem,
|
||||
AccordionTrigger,
|
||||
} from "@/components/ui/accordion";
|
||||
import { Label } from "@/components/ui/label";
|
||||
import { Separator } from "@/components/ui/separator";
|
||||
import { Handle, NodeProps, Position, useEdges, useNodes } from "@xyflow/react";
|
||||
import { NodeHeader } from "../components/NodeHeader";
|
||||
import { WorkflowBlockTypes } from "@/routes/workflows/types/workflowTypes";
|
||||
import type { WorkflowTriggerNode as WorkflowTriggerNodeType } from "./types";
|
||||
import { HelpTooltip } from "@/components/HelpTooltip";
|
||||
import { Switch } from "@/components/ui/switch";
|
||||
import { WorkflowBlockInputTextarea } from "@/components/WorkflowBlockInputTextarea";
|
||||
import { WorkflowSelector } from "./WorkflowSelector";
|
||||
import { AppNode } from "..";
|
||||
import {
|
||||
getAvailableOutputParameterKeys,
|
||||
isNodeInsideForLoop,
|
||||
} from "../../workflowEditorUtils";
|
||||
import { ParametersMultiSelect } from "../TaskNode/ParametersMultiSelect";
|
||||
import { useIsFirstBlockInWorkflow } from "../../hooks/useIsFirstNodeInWorkflow";
|
||||
import { CodeEditor } from "@/routes/workflows/components/CodeEditor";
|
||||
import { JsonValidator } from "@/routes/workflows/editor/nodes/HttpRequestNode/HttpUtils";
|
||||
import { useUpdate } from "@/routes/workflows/editor/useUpdate";
|
||||
import { useParams } from "react-router-dom";
|
||||
import { statusIsRunningOrQueued } from "@/routes/tasks/types";
|
||||
import { useWorkflowRunQuery } from "@/routes/workflows/hooks/useWorkflowRunQuery";
|
||||
import { useRecordingStore } from "@/store/useRecordingStore";
|
||||
import { cn } from "@/util/utils";
|
||||
import { BlockExecutionOptions } from "../components/BlockExecutionOptions";
|
||||
|
||||
const workflowPermanentIdTooltip =
|
||||
"The permanent ID (wpid_xxx) of the workflow to trigger. You can use {{ parameter_name }} to reference parameters.";
|
||||
const payloadTooltip =
|
||||
'JSON payload to pass as parameters to the triggered workflow. Values support Jinja2 templates. Example: {"url": "{{ some_parameter }}"}';
|
||||
const waitForCompletionTooltip =
|
||||
"If enabled, this block will wait for the triggered workflow to complete before continuing to the next block. If disabled, the workflow is triggered asynchronously and execution continues immediately.";
|
||||
const useParentBrowserSessionTooltip =
|
||||
"When enabled, the triggered workflow will use the same browser session as the parent workflow, continuing where it left off (same tabs, cookies, login state).";
|
||||
const browserSessionIdTooltip =
|
||||
"Optional browser session ID to pass to the triggered workflow. This allows the triggered workflow to reuse an existing browser session. Overrides the parent session toggle if set.";
|
||||
|
||||
function WorkflowTriggerNode({ id, data }: NodeProps<WorkflowTriggerNodeType>) {
|
||||
const { editable, label } = data;
|
||||
const { blockLabel: urlBlockLabel } = useParams();
|
||||
const { data: workflowRun } = useWorkflowRunQuery();
|
||||
const workflowRunIsRunningOrQueued =
|
||||
workflowRun && statusIsRunningOrQueued(workflowRun);
|
||||
const thisBlockIsTargetted =
|
||||
urlBlockLabel !== undefined && urlBlockLabel === label;
|
||||
const thisBlockIsPlaying =
|
||||
workflowRunIsRunningOrQueued && thisBlockIsTargetted;
|
||||
const isFirstWorkflowBlock = useIsFirstBlockInWorkflow({ id });
|
||||
|
||||
const nodes = useNodes<AppNode>();
|
||||
const isInsideForLoop = isNodeInsideForLoop(nodes, id);
|
||||
const edges = useEdges();
|
||||
const availableOutputParameterKeys = getAvailableOutputParameterKeys(
|
||||
nodes,
|
||||
edges,
|
||||
id,
|
||||
);
|
||||
|
||||
const update = useUpdate<WorkflowTriggerNodeType["data"]>({ id, editable });
|
||||
const recordingStore = useRecordingStore();
|
||||
|
||||
return (
|
||||
<div
|
||||
className={cn({
|
||||
"pointer-events-none opacity-50": recordingStore.isRecording,
|
||||
})}
|
||||
>
|
||||
<Handle
|
||||
type="source"
|
||||
position={Position.Bottom}
|
||||
id="a"
|
||||
className="opacity-0"
|
||||
/>
|
||||
<Handle
|
||||
type="target"
|
||||
position={Position.Top}
|
||||
id="b"
|
||||
className="opacity-0"
|
||||
/>
|
||||
<div
|
||||
className={cn(
|
||||
"transform-origin-center w-[30rem] space-y-4 rounded-lg bg-slate-elevation3 px-6 py-4 transition-all",
|
||||
{
|
||||
"pointer-events-none": thisBlockIsPlaying,
|
||||
"bg-slate-950 outline outline-2 outline-slate-300":
|
||||
thisBlockIsTargetted,
|
||||
},
|
||||
)}
|
||||
>
|
||||
<NodeHeader
|
||||
blockLabel={label}
|
||||
editable={editable}
|
||||
nodeId={id}
|
||||
totpIdentifier={null}
|
||||
totpUrl={null}
|
||||
type={WorkflowBlockTypes.WorkflowTrigger}
|
||||
/>
|
||||
<div className="space-y-4">
|
||||
<div className="space-y-2">
|
||||
<div className="flex justify-between">
|
||||
<div className="flex gap-2">
|
||||
<Label className="text-xs text-slate-300">
|
||||
Target Workflow
|
||||
</Label>
|
||||
<HelpTooltip content={workflowPermanentIdTooltip} />
|
||||
</div>
|
||||
{isFirstWorkflowBlock ? (
|
||||
<div className="flex justify-end text-xs text-slate-400">
|
||||
Tip: Use the {"+"} button to add parameters!
|
||||
</div>
|
||||
) : null}
|
||||
</div>
|
||||
<WorkflowSelector
|
||||
nodeId={id}
|
||||
value={data.workflowPermanentId}
|
||||
onChange={(value) => {
|
||||
update({ workflowPermanentId: value });
|
||||
}}
|
||||
/>
|
||||
</div>
|
||||
<Separator />
|
||||
<div className="space-y-2">
|
||||
<div className="flex gap-2">
|
||||
<Label className="text-xs text-slate-300">Payload</Label>
|
||||
<HelpTooltip content={payloadTooltip} />
|
||||
</div>
|
||||
<CodeEditor
|
||||
language="json"
|
||||
value={data.payload}
|
||||
onChange={(value) => {
|
||||
update({ payload: value });
|
||||
}}
|
||||
className="nowheel nopan"
|
||||
fontSize={11}
|
||||
minHeight="80px"
|
||||
maxHeight="200px"
|
||||
/>
|
||||
<JsonValidator value={data.payload} />
|
||||
</div>
|
||||
<Separator />
|
||||
<Accordion type="single" collapsible>
|
||||
<AccordionItem value="advanced" className="border-b-0">
|
||||
<AccordionTrigger className="py-0">
|
||||
Advanced Settings
|
||||
</AccordionTrigger>
|
||||
<AccordionContent className="space-y-4 pl-6 pr-1 pt-4">
|
||||
<div className="flex items-center justify-between">
|
||||
<div className="flex gap-2">
|
||||
<Label className="text-xs font-normal text-slate-300">
|
||||
Wait for Completion
|
||||
</Label>
|
||||
<HelpTooltip content={waitForCompletionTooltip} />
|
||||
</div>
|
||||
<div className="w-52">
|
||||
<Switch
|
||||
checked={data.waitForCompletion}
|
||||
onCheckedChange={(checked) => {
|
||||
update({ waitForCompletion: checked });
|
||||
}}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
<BlockExecutionOptions
|
||||
continueOnFailure={data.continueOnFailure}
|
||||
nextLoopOnFailure={data.nextLoopOnFailure}
|
||||
editable={editable}
|
||||
isInsideForLoop={isInsideForLoop}
|
||||
blockType="workflowTrigger"
|
||||
onContinueOnFailureChange={(checked) => {
|
||||
update({ continueOnFailure: checked });
|
||||
}}
|
||||
onNextLoopOnFailureChange={(checked) => {
|
||||
update({ nextLoopOnFailure: checked });
|
||||
}}
|
||||
/>
|
||||
<div className="space-y-2">
|
||||
<div className="flex items-center justify-between">
|
||||
<div className="flex gap-2">
|
||||
<Label className="text-xs font-normal text-slate-300">
|
||||
Use Parent Browser Session
|
||||
</Label>
|
||||
<HelpTooltip content={useParentBrowserSessionTooltip} />
|
||||
</div>
|
||||
<div className="w-52">
|
||||
<Switch
|
||||
checked={data.useParentBrowserSession}
|
||||
onCheckedChange={(checked) => {
|
||||
update({ useParentBrowserSession: checked });
|
||||
}}
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
{!data.waitForCompletion && data.useParentBrowserSession && (
|
||||
<p className="text-xs text-yellow-500">
|
||||
Using the parent browser session while "Wait for
|
||||
Completion" is off may cause the triggered workflow to
|
||||
fail if the parent finishes and closes the browser first.
|
||||
</p>
|
||||
)}
|
||||
</div>
|
||||
<div className="space-y-2">
|
||||
<div className="flex gap-2">
|
||||
<Label className="text-xs font-normal text-slate-300">
|
||||
Browser Session ID
|
||||
</Label>
|
||||
<HelpTooltip content={browserSessionIdTooltip} />
|
||||
</div>
|
||||
<WorkflowBlockInputTextarea
|
||||
nodeId={id}
|
||||
onChange={(value) => {
|
||||
update({ browserSessionId: value });
|
||||
}}
|
||||
value={data.browserSessionId}
|
||||
placeholder="Optional: {{ browser_session_id }}"
|
||||
className="nopan text-xs"
|
||||
/>
|
||||
</div>
|
||||
<Separator />
|
||||
<ParametersMultiSelect
|
||||
availableOutputParameters={availableOutputParameterKeys}
|
||||
parameters={data.parameterKeys}
|
||||
onParametersChange={(parameterKeys) => {
|
||||
update({ parameterKeys });
|
||||
}}
|
||||
/>
|
||||
</AccordionContent>
|
||||
</AccordionItem>
|
||||
</Accordion>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
export { WorkflowTriggerNode };
|
||||
|
|
@ -0,0 +1,2 @@
|
|||
export { WorkflowTriggerNode } from "./WorkflowTriggerNode";
|
||||
export type { WorkflowTriggerNode as WorkflowTriggerNodeType } from "./types";
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
import { Node } from "@xyflow/react";
|
||||
import { debuggableWorkflowBlockTypes } from "@/routes/workflows/types/workflowTypes";
|
||||
import { NodeBaseData } from "../types";
|
||||
|
||||
export type WorkflowTriggerNodeData = NodeBaseData & {
|
||||
workflowPermanentId: string;
|
||||
payload: string; // JSON string of the payload dict
|
||||
waitForCompletion: boolean;
|
||||
browserSessionId: string;
|
||||
useParentBrowserSession: boolean;
|
||||
parameterKeys: Array<string>;
|
||||
};
|
||||
|
||||
export type WorkflowTriggerNode = Node<
|
||||
WorkflowTriggerNodeData,
|
||||
"workflowTrigger"
|
||||
>;
|
||||
|
||||
export const workflowTriggerNodeDefaultData: WorkflowTriggerNodeData = {
|
||||
debuggable: debuggableWorkflowBlockTypes.has("workflow_trigger"),
|
||||
label: "",
|
||||
continueOnFailure: false,
|
||||
editable: true,
|
||||
model: null,
|
||||
workflowPermanentId: "",
|
||||
payload: "{}",
|
||||
waitForCompletion: true,
|
||||
browserSessionId: "",
|
||||
useParentBrowserSession: false,
|
||||
parameterKeys: [],
|
||||
};
|
||||
|
||||
export function isWorkflowTriggerNode(node: Node): node is WorkflowTriggerNode {
|
||||
return node.type === "workflowTrigger";
|
||||
}
|
||||
|
|
@ -49,6 +49,8 @@ import { HumanInteractionNode } from "./HumanInteractionNode/types";
|
|||
import { HumanInteractionNode as HumanInteractionNodeComponent } from "./HumanInteractionNode/HumanInteractionNode";
|
||||
import { PrintPageNode } from "./PrintPageNode/types";
|
||||
import { PrintPageNode as PrintPageNodeComponent } from "./PrintPageNode/PrintPageNode";
|
||||
import { WorkflowTriggerNode } from "./WorkflowTriggerNode/types";
|
||||
import { WorkflowTriggerNode as WorkflowTriggerNodeComponent } from "./WorkflowTriggerNode/WorkflowTriggerNode";
|
||||
|
||||
export type UtilityNode = StartNode | NodeAdderNode;
|
||||
|
||||
|
|
@ -75,7 +77,8 @@ export type WorkflowBlockNode =
|
|||
| Taskv2Node
|
||||
| URLNode
|
||||
| HttpRequestNode
|
||||
| PrintPageNode;
|
||||
| PrintPageNode
|
||||
| WorkflowTriggerNode;
|
||||
|
||||
export function isUtilityNode(node: AppNode): node is UtilityNode {
|
||||
return node.type === "nodeAdder" || node.type === "start";
|
||||
|
|
@ -113,4 +116,5 @@ export const nodeTypes = {
|
|||
url: memo(URLNodeComponent),
|
||||
http_request: memo(HttpRequestNodeComponent),
|
||||
printPage: memo(PrintPageNodeComponent),
|
||||
workflowTrigger: memo(WorkflowTriggerNodeComponent),
|
||||
} as const;
|
||||
|
|
|
|||
|
|
@ -67,4 +67,5 @@ export const workflowBlockTitle: {
|
|||
goto_url: "Go to URL",
|
||||
http_request: "HTTP Request",
|
||||
print_page: "Print Page",
|
||||
workflow_trigger: "Workflow Trigger",
|
||||
};
|
||||
|
|
|
|||
|
|
@ -262,6 +262,17 @@ const nodeLibraryItems: Array<{
|
|||
title: "Print Page Block",
|
||||
description: "Print current page to PDF",
|
||||
},
|
||||
{
|
||||
nodeType: "workflowTrigger",
|
||||
icon: (
|
||||
<WorkflowBlockIcon
|
||||
workflowBlockType={WorkflowBlockTypes.WorkflowTrigger}
|
||||
className="size-6"
|
||||
/>
|
||||
),
|
||||
title: "Workflow Trigger Block",
|
||||
description: "Trigger another workflow",
|
||||
},
|
||||
];
|
||||
|
||||
type Props = {
|
||||
|
|
|
|||
|
|
@ -47,6 +47,7 @@ import {
|
|||
FileUploadBlockYAML,
|
||||
HttpRequestBlockYAML,
|
||||
PrintPageBlockYAML,
|
||||
WorkflowTriggerBlockYAML,
|
||||
} from "../types/workflowYamlTypes";
|
||||
import {
|
||||
EMAIL_BLOCK_SENDER,
|
||||
|
|
@ -131,6 +132,10 @@ import {
|
|||
validateJson,
|
||||
} from "./nodes/HttpRequestNode/httpValidation";
|
||||
import { printPageNodeDefaultData } from "./nodes/PrintPageNode/types";
|
||||
import {
|
||||
isWorkflowTriggerNode,
|
||||
workflowTriggerNodeDefaultData,
|
||||
} from "./nodes/WorkflowTriggerNode/types";
|
||||
|
||||
export const NEW_NODE_LABEL_PREFIX = "block_";
|
||||
|
||||
|
|
@ -919,6 +924,22 @@ function convertToNode(
|
|||
},
|
||||
};
|
||||
}
|
||||
case "workflow_trigger": {
|
||||
return {
|
||||
...identifiers,
|
||||
...common,
|
||||
type: "workflowTrigger",
|
||||
data: {
|
||||
...commonData,
|
||||
workflowPermanentId: block.workflow_permanent_id ?? "",
|
||||
payload: JSON.stringify(block.payload || {}, null, 2),
|
||||
waitForCompletion: block.wait_for_completion ?? true,
|
||||
browserSessionId: block.browser_session_id ?? "",
|
||||
useParentBrowserSession: block.use_parent_browser_session ?? false,
|
||||
parameterKeys: block.parameters.map((p) => p.key),
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1970,6 +1991,17 @@ function createNode(
|
|||
},
|
||||
};
|
||||
}
|
||||
case "workflowTrigger": {
|
||||
return {
|
||||
...identifiers,
|
||||
...common,
|
||||
type: "workflowTrigger",
|
||||
data: {
|
||||
...workflowTriggerNodeDefaultData,
|
||||
label,
|
||||
},
|
||||
};
|
||||
}
|
||||
case "conditional": {
|
||||
const branches = createDefaultBranchConditions();
|
||||
return {
|
||||
|
|
@ -2451,6 +2483,25 @@ function getWorkflowBlock(
|
|||
parameter_keys: node.data.parameterKeys,
|
||||
};
|
||||
}
|
||||
case "workflowTrigger": {
|
||||
const parsedPayload = JSONParseSafe(node.data.payload) as Record<
|
||||
string,
|
||||
unknown
|
||||
> | null;
|
||||
return {
|
||||
...base,
|
||||
block_type: "workflow_trigger",
|
||||
workflow_permanent_id: node.data.workflowPermanentId,
|
||||
payload:
|
||||
parsedPayload && Object.keys(parsedPayload).length > 0
|
||||
? parsedPayload
|
||||
: null,
|
||||
wait_for_completion: node.data.waitForCompletion,
|
||||
browser_session_id: node.data.browserSessionId || null,
|
||||
use_parent_browser_session: node.data.useParentBrowserSession,
|
||||
parameter_keys: node.data.parameterKeys,
|
||||
};
|
||||
}
|
||||
case "conditional": {
|
||||
return serializeConditionalBlock(node as ConditionalNode, nodes, edges);
|
||||
}
|
||||
|
|
@ -3872,6 +3923,19 @@ function convertBlocksToBlockYAML(
|
|||
};
|
||||
return blockYaml;
|
||||
}
|
||||
case "workflow_trigger": {
|
||||
const blockYaml: WorkflowTriggerBlockYAML = {
|
||||
...base,
|
||||
block_type: "workflow_trigger",
|
||||
workflow_permanent_id: block.workflow_permanent_id,
|
||||
payload: block.payload,
|
||||
wait_for_completion: block.wait_for_completion,
|
||||
browser_session_id: block.browser_session_id,
|
||||
use_parent_browser_session: block.use_parent_browser_session,
|
||||
parameter_keys: block.parameters.map((p) => p.key),
|
||||
};
|
||||
return blockYaml;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
@ -4096,6 +4160,17 @@ function getWorkflowErrors(nodes: Array<AppNode>): Array<string> {
|
|||
});
|
||||
});
|
||||
|
||||
const workflowTriggerNodes = nodes.filter(isWorkflowTriggerNode);
|
||||
workflowTriggerNodes.forEach((node) => {
|
||||
if (!node.data.workflowPermanentId.trim()) {
|
||||
errors.push(`${node.data.label}: Workflow Permanent ID is required.`);
|
||||
}
|
||||
const payloadResult = validateJson(node.data.payload);
|
||||
if (!payloadResult.valid && payloadResult.message) {
|
||||
errors.push(`${node.data.label}: Payload is not valid JSON.`);
|
||||
}
|
||||
});
|
||||
|
||||
return errors;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -212,7 +212,8 @@ export type WorkflowBlock =
|
|||
| Taskv2Block
|
||||
| URLBlock
|
||||
| HttpRequestBlock
|
||||
| PrintPageBlock;
|
||||
| PrintPageBlock
|
||||
| WorkflowTriggerBlock;
|
||||
|
||||
export const WorkflowBlockTypes = {
|
||||
Task: "task",
|
||||
|
|
@ -238,6 +239,7 @@ export const WorkflowBlockTypes = {
|
|||
URL: "goto_url",
|
||||
HttpRequest: "http_request",
|
||||
PrintPage: "print_page",
|
||||
WorkflowTrigger: "workflow_trigger",
|
||||
} as const;
|
||||
|
||||
// all of them
|
||||
|
|
@ -559,6 +561,16 @@ export type PrintPageBlock = WorkflowBlockBase & {
|
|||
parameters: Array<WorkflowParameter>;
|
||||
};
|
||||
|
||||
export type WorkflowTriggerBlock = WorkflowBlockBase & {
|
||||
block_type: "workflow_trigger";
|
||||
workflow_permanent_id: string;
|
||||
payload: Record<string, unknown> | null;
|
||||
wait_for_completion: boolean;
|
||||
browser_session_id: string | null;
|
||||
use_parent_browser_session: boolean;
|
||||
parameters: Array<WorkflowParameter>;
|
||||
};
|
||||
|
||||
export type WorkflowDefinition = {
|
||||
version?: number | null;
|
||||
parameters: Array<Parameter>;
|
||||
|
|
|
|||
|
|
@ -142,7 +142,8 @@ export type BlockYAML =
|
|||
| Taskv2BlockYAML
|
||||
| URLBlockYAML
|
||||
| HttpRequestBlockYAML
|
||||
| PrintPageBlockYAML;
|
||||
| PrintPageBlockYAML
|
||||
| WorkflowTriggerBlockYAML;
|
||||
|
||||
export type BlockYAMLBase = {
|
||||
block_type: WorkflowBlockType;
|
||||
|
|
@ -416,3 +417,13 @@ export type PrintPageBlockYAML = BlockYAMLBase & {
|
|||
print_background: boolean;
|
||||
parameter_keys?: Array<string> | null;
|
||||
};
|
||||
|
||||
export type WorkflowTriggerBlockYAML = BlockYAMLBase & {
|
||||
block_type: "workflow_trigger";
|
||||
workflow_permanent_id: string;
|
||||
payload: Record<string, unknown> | null;
|
||||
wait_for_completion: boolean;
|
||||
browser_session_id?: string | null;
|
||||
use_parent_browser_session?: boolean;
|
||||
parameter_keys?: Array<string> | null;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -570,6 +570,9 @@ async def skyvern_workflow_run(
|
|||
timeout_seconds: Annotated[
|
||||
int, Field(description="Max wait time in seconds when wait=true (default 300)", ge=10, le=3600)
|
||||
] = 300,
|
||||
run_with: Annotated[
|
||||
str | None, Field(description="Execution mode override (e.g., 'code' for cached script execution)")
|
||||
] = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Run a Skyvern workflow with parameters. Use when you need to execute an automation workflow.
|
||||
Returns immediately by default (async) — set wait=true to block until completion.
|
||||
|
|
@ -620,6 +623,7 @@ async def skyvern_workflow_run(
|
|||
proxy_location=proxy,
|
||||
wait_for_completion=wait,
|
||||
timeout=timeout_seconds,
|
||||
run_with=run_with,
|
||||
)
|
||||
timer.mark("sdk")
|
||||
except asyncio.TimeoutError:
|
||||
|
|
|
|||
|
|
@ -224,6 +224,7 @@ def workflow_run(
|
|||
max=3600,
|
||||
help="Max wait time in seconds when --wait is set.",
|
||||
),
|
||||
run_with: str | None = typer.Option(None, "--run-with", help="Execution mode (e.g., 'code' for cached script)."),
|
||||
json_output: bool = typer.Option(False, "--json", help="Output as JSON."),
|
||||
) -> None:
|
||||
"""Run a workflow."""
|
||||
|
|
@ -238,6 +239,7 @@ def workflow_run(
|
|||
proxy_location=proxy,
|
||||
wait=wait,
|
||||
timeout_seconds=timeout,
|
||||
run_with=run_with,
|
||||
)
|
||||
|
||||
_run_tool(_run, json_output=json_output, hint_on_exception="Check the workflow ID and run parameters.")
|
||||
|
|
|
|||
|
|
@ -427,12 +427,12 @@ class Settings(BaseSettings):
|
|||
in minutes.
|
||||
"""
|
||||
|
||||
DEBUG_SESSION_TIMEOUT_THRESHOLD_MINUTES: int = 5
|
||||
DEBUG_SESSION_TIMEOUT_THRESHOLD_MINUTES: int = 10
|
||||
"""
|
||||
If there are `DEBUG_SESSION_TIMEOUT_THRESHOLD_MINUTES` or more minutes left
|
||||
in the persistent browser session (`started_at` + `timeout_minutes`), then
|
||||
the `timeout_minutes` of the persistent browser session can be extended.
|
||||
Otherwise we'll consider the persistent browser session to be expired.
|
||||
Threshold for browser session timeout extension.
|
||||
- V1 (OSS): extends when remaining >= threshold, raises if below (expired).
|
||||
- V2 (cloud): extends when remaining <= threshold, no-ops if above (plenty of time).
|
||||
Set to 10 minutes so that a 5-minute renewal loop gets 2+ attempts before expiry.
|
||||
"""
|
||||
|
||||
ENCRYPTOR_AES_SECRET_KEY: str = "fillmein"
|
||||
|
|
|
|||
|
|
@ -573,6 +573,20 @@ async def delete_workflow_cache_key_value(
|
|||
if not deleted:
|
||||
raise HTTPException(status_code=404, detail="Cache key value not found")
|
||||
|
||||
# Clear in-memory cache so stale entries aren't served after deletion
|
||||
cache_cleared_count = workflow_script_service.clear_workflow_script_cache(
|
||||
organization_id=current_org.organization_id,
|
||||
workflow_permanent_id=workflow_permanent_id,
|
||||
)
|
||||
|
||||
LOG.info(
|
||||
"Deleted workflow cache key value",
|
||||
organization_id=current_org.organization_id,
|
||||
workflow_permanent_id=workflow_permanent_id,
|
||||
cache_key_value=cache_key_value,
|
||||
cache_cleared_count=cache_cleared_count,
|
||||
)
|
||||
|
||||
return {"message": "Cache key value deleted successfully"}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -123,9 +123,6 @@ CURRENT_DATE_FORMAT = "%Y-%m-%d"
|
|||
# Sentinel marker for native JSON type injection via | json filter.
|
||||
_JSON_TYPE_MARKER = "__SKYVERN_RAW_JSON__"
|
||||
|
||||
# Strong references to fire-and-forget asyncio tasks so they are not GC'd mid-execution.
|
||||
_background_tasks: set[asyncio.Task[Any]] = set()
|
||||
|
||||
|
||||
def _json_type_filter(value: Any) -> str:
|
||||
"""Jinja filter that marks a value for native JSON type injection.
|
||||
|
|
@ -6160,16 +6157,6 @@ class WorkflowTriggerBlock(Block):
|
|||
|
||||
resolved_workflow_permanent_id = self.workflow_permanent_id
|
||||
resolved_payload = self.payload
|
||||
# Browser session priority:
|
||||
# 1. Explicit browser_session_id configured on the block
|
||||
# 2. use_parent_browser_session → inherit parent's session
|
||||
# 3. Neither → None, child creates its own session
|
||||
if self.browser_session_id:
|
||||
resolved_browser_session_id = self.browser_session_id
|
||||
elif self.use_parent_browser_session and browser_session_id:
|
||||
resolved_browser_session_id = browser_session_id
|
||||
else:
|
||||
resolved_browser_session_id = None
|
||||
|
||||
# 2. Check recursion depth
|
||||
try:
|
||||
|
|
@ -6184,52 +6171,101 @@ class WorkflowTriggerBlock(Block):
|
|||
if not organization:
|
||||
return await _fail(f"Organization {organization_id} not found")
|
||||
|
||||
# 4. Create WorkflowRequestBody
|
||||
workflow_request = WorkflowRequestBody(
|
||||
data=resolved_payload,
|
||||
browser_session_id=resolved_browser_session_id,
|
||||
)
|
||||
# 4. Resolve browser session
|
||||
# Browser session priority:
|
||||
# 1. Explicit browser_session_id configured on the block
|
||||
# 2. use_parent_browser_session → inherit parent's session (persistent
|
||||
# or in-memory via self.pages[parent_workflow_run_id] lookup)
|
||||
# 3. Neither → for sync (wait_for_completion), create a fresh persistent
|
||||
# session; for async (fire-and-forget), let the child's Temporal worker
|
||||
# handle its own browser.
|
||||
created_fresh_session = False
|
||||
if self.browser_session_id:
|
||||
resolved_browser_session_id = self.browser_session_id
|
||||
elif self.use_parent_browser_session and browser_session_id:
|
||||
resolved_browser_session_id = browser_session_id
|
||||
elif self.use_parent_browser_session:
|
||||
# Parent uses an in-memory browser (no persistent session).
|
||||
# Pass None so the child inherits via the parent_workflow_run_id
|
||||
# lookup in get_or_create_for_workflow_run.
|
||||
resolved_browser_session_id = None
|
||||
elif self.wait_for_completion:
|
||||
# Sync mode: child runs inline in the same process, so it needs
|
||||
# its own persistent session to avoid sharing the parent's browser.
|
||||
parent_workflow_run = await app.DATABASE.get_workflow_run(workflow_run_id)
|
||||
proxy_location = parent_workflow_run.proxy_location if parent_workflow_run else None
|
||||
try:
|
||||
child_browser_session = await app.PERSISTENT_SESSIONS_MANAGER.create_session(
|
||||
organization_id=organization_id,
|
||||
proxy_location=proxy_location,
|
||||
timeout_minutes=30,
|
||||
)
|
||||
resolved_browser_session_id = child_browser_session.persistent_browser_session_id
|
||||
created_fresh_session = True
|
||||
LOG.info(
|
||||
"Created fresh browser session for triggered workflow",
|
||||
parent_workflow_run_id=workflow_run_id,
|
||||
child_browser_session_id=resolved_browser_session_id,
|
||||
)
|
||||
except Exception as e:
|
||||
return await _fail(f"Failed to create browser session for triggered workflow: {str(e)}")
|
||||
else:
|
||||
# Async (fire-and-forget): the child runs in its own Temporal worker
|
||||
# and will create its own browser. No pre-creation needed.
|
||||
resolved_browser_session_id = None
|
||||
|
||||
# 5. Setup the workflow run (creates WR with unique ID + parameters)
|
||||
# Save the parent's skyvern_context because setup_workflow_run and
|
||||
# execute_workflow overwrite it with the child's values. We restore
|
||||
# it after the child finishes so subsequent parent blocks get correct
|
||||
# context (logs, observability, workflow_run_id, etc.).
|
||||
from skyvern.forge.sdk.core import skyvern_context # noqa: PLC0415
|
||||
|
||||
parent_context = skyvern_context.current()
|
||||
try:
|
||||
triggered_workflow_run = await app.WORKFLOW_SERVICE.setup_workflow_run(
|
||||
request_id=None,
|
||||
workflow_request=workflow_request,
|
||||
workflow_permanent_id=resolved_workflow_permanent_id,
|
||||
organization=organization,
|
||||
parent_workflow_run_id=workflow_run_id,
|
||||
)
|
||||
except Exception as e:
|
||||
error_msg = get_user_facing_exception_message(e)
|
||||
if parent_context:
|
||||
skyvern_context.set(parent_context)
|
||||
return await _fail(f"Failed to setup triggered workflow run: {error_msg}")
|
||||
|
||||
triggered_run_id = triggered_workflow_run.workflow_run_id
|
||||
|
||||
LOG.info(
|
||||
"Triggered workflow run",
|
||||
parent_workflow_run_id=workflow_run_id,
|
||||
triggered_workflow_run_id=triggered_run_id,
|
||||
triggered_workflow_permanent_id=resolved_workflow_permanent_id,
|
||||
wait_for_completion=self.wait_for_completion,
|
||||
)
|
||||
|
||||
# 6. Execute based on wait mode
|
||||
# 5. Execute based on wait mode
|
||||
output_data: dict[str, Any] = {}
|
||||
success = False
|
||||
if self.wait_for_completion:
|
||||
# Synchronous: setup + execute inline in the same process.
|
||||
workflow_request = WorkflowRequestBody(
|
||||
data=resolved_payload,
|
||||
browser_session_id=resolved_browser_session_id,
|
||||
)
|
||||
|
||||
# Save the parent's skyvern_context because setup_workflow_run and
|
||||
# execute_workflow overwrite it with the child's values. We restore
|
||||
# it after the child finishes so subsequent parent blocks get correct
|
||||
# context (logs, observability, workflow_run_id, etc.).
|
||||
from skyvern.forge.sdk.core import skyvern_context # noqa: PLC0415
|
||||
|
||||
parent_context = skyvern_context.current()
|
||||
try:
|
||||
triggered_workflow_run = await app.WORKFLOW_SERVICE.setup_workflow_run(
|
||||
request_id=None,
|
||||
workflow_request=workflow_request,
|
||||
workflow_permanent_id=resolved_workflow_permanent_id,
|
||||
organization=organization,
|
||||
parent_workflow_run_id=workflow_run_id,
|
||||
)
|
||||
except Exception as e:
|
||||
error_msg = get_user_facing_exception_message(e)
|
||||
if parent_context:
|
||||
skyvern_context.set(parent_context)
|
||||
if created_fresh_session and resolved_browser_session_id:
|
||||
try:
|
||||
await app.PERSISTENT_SESSIONS_MANAGER.close_session(
|
||||
organization_id, resolved_browser_session_id
|
||||
)
|
||||
except Exception:
|
||||
LOG.warning(
|
||||
"Failed to close child browser session after setup failure",
|
||||
child_browser_session_id=resolved_browser_session_id,
|
||||
exc_info=True,
|
||||
)
|
||||
return await _fail(f"Failed to setup triggered workflow run: {error_msg}")
|
||||
|
||||
triggered_run_id = triggered_workflow_run.workflow_run_id
|
||||
|
||||
LOG.info(
|
||||
"Triggered workflow run (sync)",
|
||||
parent_workflow_run_id=workflow_run_id,
|
||||
triggered_workflow_run_id=triggered_run_id,
|
||||
triggered_workflow_permanent_id=resolved_workflow_permanent_id,
|
||||
)
|
||||
|
||||
try:
|
||||
# Pass api_key=None so the child workflow's webhook (if configured)
|
||||
# is intentionally skipped. Webhook dispatch requires the caller's
|
||||
# API key, which is not available in a block execution context.
|
||||
final_run = await app.WORKFLOW_SERVICE.execute_workflow(
|
||||
workflow_run_id=triggered_run_id,
|
||||
api_key=None,
|
||||
|
|
@ -6272,46 +6308,51 @@ class WorkflowTriggerBlock(Block):
|
|||
}
|
||||
success = False
|
||||
finally:
|
||||
# Restore parent context after child execution completes
|
||||
if parent_context:
|
||||
skyvern_context.set(parent_context)
|
||||
if created_fresh_session and resolved_browser_session_id:
|
||||
try:
|
||||
await app.PERSISTENT_SESSIONS_MANAGER.close_session(
|
||||
organization_id, resolved_browser_session_id
|
||||
)
|
||||
except Exception:
|
||||
LOG.warning(
|
||||
"Failed to close child browser session",
|
||||
child_browser_session_id=resolved_browser_session_id,
|
||||
triggered_workflow_run_id=triggered_run_id,
|
||||
exc_info=True,
|
||||
)
|
||||
else:
|
||||
# Fire and forget: spawn the child workflow as a concurrent task.
|
||||
# We use asyncio.create_task instead of AsyncExecutorFactory because
|
||||
# block execution runs outside an HTTP request context, so there is
|
||||
# no FastAPI BackgroundTasks instance to schedule on.
|
||||
def _on_child_done(t: asyncio.Task[Any]) -> None:
|
||||
if t.cancelled():
|
||||
LOG.warning(
|
||||
"Async child workflow was cancelled",
|
||||
triggered_workflow_run_id=triggered_run_id,
|
||||
)
|
||||
return
|
||||
if exc := t.exception():
|
||||
LOG.error(
|
||||
"Async child workflow failed",
|
||||
triggered_workflow_run_id=triggered_run_id,
|
||||
exc_info=exc,
|
||||
)
|
||||
# Fire and forget: dispatch the child workflow via Temporal so it
|
||||
# gets its own independent worker process. This ensures the child
|
||||
# survives even if the parent workflow finishes first.
|
||||
# NOTE: This path requires Temporal (cloud). On self-hosted
|
||||
# (BackgroundTaskExecutor), the workflow run record is created but
|
||||
# execution is silently skipped because background_tasks=None.
|
||||
from skyvern.services.workflow_service import run_workflow # noqa: PLC0415
|
||||
|
||||
task = asyncio.create_task(
|
||||
app.WORKFLOW_SERVICE.execute_workflow(
|
||||
workflow_run_id=triggered_run_id,
|
||||
api_key=None,
|
||||
organization=organization,
|
||||
browser_session_id=resolved_browser_session_id,
|
||||
)
|
||||
workflow_request = WorkflowRequestBody(
|
||||
data=resolved_payload,
|
||||
browser_session_id=resolved_browser_session_id,
|
||||
)
|
||||
_background_tasks.add(task)
|
||||
task.add_done_callback(_background_tasks.discard)
|
||||
task.add_done_callback(_on_child_done)
|
||||
# Restore parent context AFTER create_task so the child task
|
||||
# inherits the child's context (ContextVar is copied at task
|
||||
# creation), but subsequent parent blocks see the parent's context.
|
||||
if parent_context:
|
||||
skyvern_context.set(parent_context)
|
||||
try:
|
||||
triggered_workflow_run = await run_workflow(
|
||||
workflow_id=resolved_workflow_permanent_id,
|
||||
organization=organization,
|
||||
workflow_request=workflow_request,
|
||||
request=None,
|
||||
background_tasks=None,
|
||||
parent_workflow_run_id=workflow_run_id,
|
||||
)
|
||||
except Exception as e:
|
||||
error_msg = get_user_facing_exception_message(e)
|
||||
return await _fail(f"Failed to dispatch triggered workflow: {error_msg}")
|
||||
|
||||
triggered_run_id = triggered_workflow_run.workflow_run_id
|
||||
|
||||
LOG.info(
|
||||
"Async workflow dispatch succeeded",
|
||||
"Async workflow dispatch succeeded (via Temporal)",
|
||||
parent_workflow_run_id=workflow_run_id,
|
||||
triggered_workflow_run_id=triggered_run_id,
|
||||
triggered_workflow_permanent_id=resolved_workflow_permanent_id,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ from skyvern.exceptions import (
|
|||
BlockNotFound,
|
||||
BrowserProfileNotFound,
|
||||
BrowserSessionNotFound,
|
||||
BrowserSessionNotRenewable,
|
||||
FailedToSendWebhook,
|
||||
InvalidCredentialId,
|
||||
MissingValueForParameter,
|
||||
|
|
@ -852,6 +853,7 @@ class WorkflowService:
|
|||
browser_session_id=browser_session_id,
|
||||
)
|
||||
|
||||
renewal_task: asyncio.Task[None] | None = None
|
||||
if browser_session_id:
|
||||
try:
|
||||
await app.PERSISTENT_SESSIONS_MANAGER.begin_session(
|
||||
|
|
@ -879,91 +881,146 @@ class WorkflowService:
|
|||
close_browser_on_completion=close_browser_on_completion,
|
||||
)
|
||||
return workflow_run
|
||||
|
||||
# Check if there's a related workflow script that should be used instead
|
||||
workflow_script, _ = await workflow_script_service.get_workflow_script(workflow, workflow_run, block_labels)
|
||||
current_context = skyvern_context.current()
|
||||
if current_context:
|
||||
if workflow_script:
|
||||
current_context.generate_script = False
|
||||
if workflow_run.code_gen:
|
||||
current_context.generate_script = True
|
||||
workflow_run, blocks_to_update = await self._execute_workflow_blocks(
|
||||
workflow=workflow,
|
||||
workflow_run=workflow_run,
|
||||
organization=organization,
|
||||
browser_session_id=browser_session_id,
|
||||
browser_profile_id=browser_profile_id,
|
||||
block_labels=block_labels,
|
||||
block_outputs=block_outputs,
|
||||
script=workflow_script,
|
||||
)
|
||||
|
||||
# Check if there's a finally block configured
|
||||
finally_block_label = workflow.workflow_definition.finally_block_label
|
||||
|
||||
if refreshed_workflow_run := await app.DATABASE.get_workflow_run(
|
||||
workflow_run_id=workflow_run_id,
|
||||
organization_id=organization_id,
|
||||
):
|
||||
workflow_run = refreshed_workflow_run
|
||||
|
||||
pre_finally_status = workflow_run.status
|
||||
pre_finally_failure_reason = workflow_run.failure_reason
|
||||
|
||||
if pre_finally_status not in (
|
||||
WorkflowRunStatus.canceled,
|
||||
WorkflowRunStatus.failed,
|
||||
WorkflowRunStatus.terminated,
|
||||
WorkflowRunStatus.timed_out,
|
||||
):
|
||||
await self.generate_script_if_needed(
|
||||
workflow=workflow,
|
||||
workflow_run=workflow_run,
|
||||
block_labels=block_labels,
|
||||
blocks_to_update=blocks_to_update,
|
||||
finalize=True, # Force regeneration to ensure field mappings have complete action data
|
||||
has_conditionals=has_conditionals,
|
||||
# Start background task to periodically renew the browser session
|
||||
renewal_task = asyncio.create_task(
|
||||
self._renew_browser_session_loop(browser_session_id, organization.organization_id),
|
||||
name=f"browser_session_renewal_{workflow_run_id}",
|
||||
)
|
||||
|
||||
# Execute finally block if configured. Skip for: canceled (user explicitly stopped)
|
||||
should_run_finally = finally_block_label and pre_finally_status != WorkflowRunStatus.canceled
|
||||
if should_run_finally:
|
||||
# Temporarily set to running for terminal workflows (for frontend UX)
|
||||
if pre_finally_status in (
|
||||
WorkflowRunStatus.failed,
|
||||
WorkflowRunStatus.terminated,
|
||||
WorkflowRunStatus.timed_out,
|
||||
):
|
||||
workflow_run = await self._update_workflow_run_status(
|
||||
workflow_run_id=workflow_run_id,
|
||||
status=WorkflowRunStatus.running,
|
||||
failure_reason=None,
|
||||
)
|
||||
await self._execute_finally_block_if_configured(
|
||||
try:
|
||||
# Check if there's a related workflow script that should be used instead
|
||||
workflow_script, _ = await workflow_script_service.get_workflow_script(workflow, workflow_run, block_labels)
|
||||
current_context = skyvern_context.current()
|
||||
if current_context:
|
||||
if workflow_script:
|
||||
current_context.generate_script = False
|
||||
if workflow_run.code_gen:
|
||||
current_context.generate_script = True
|
||||
workflow_run, blocks_to_update = await self._execute_workflow_blocks(
|
||||
workflow=workflow,
|
||||
workflow_run=workflow_run,
|
||||
organization=organization,
|
||||
browser_session_id=browser_session_id,
|
||||
browser_profile_id=browser_profile_id,
|
||||
block_labels=block_labels,
|
||||
block_outputs=block_outputs,
|
||||
script=workflow_script,
|
||||
)
|
||||
|
||||
workflow_run = await self._finalize_workflow_run_status(
|
||||
workflow_run_id=workflow_run_id,
|
||||
workflow_run=workflow_run,
|
||||
pre_finally_status=pre_finally_status,
|
||||
pre_finally_failure_reason=pre_finally_failure_reason,
|
||||
)
|
||||
# Check if there's a finally block configured
|
||||
finally_block_label = workflow.workflow_definition.finally_block_label
|
||||
|
||||
await self.clean_up_workflow(
|
||||
workflow=workflow,
|
||||
workflow_run=workflow_run,
|
||||
api_key=api_key,
|
||||
browser_session_id=browser_session_id,
|
||||
close_browser_on_completion=close_browser_on_completion,
|
||||
)
|
||||
if refreshed_workflow_run := await app.DATABASE.get_workflow_run(
|
||||
workflow_run_id=workflow_run_id,
|
||||
organization_id=organization_id,
|
||||
):
|
||||
workflow_run = refreshed_workflow_run
|
||||
|
||||
pre_finally_status = workflow_run.status
|
||||
pre_finally_failure_reason = workflow_run.failure_reason
|
||||
|
||||
if pre_finally_status not in (
|
||||
WorkflowRunStatus.canceled,
|
||||
WorkflowRunStatus.failed,
|
||||
WorkflowRunStatus.terminated,
|
||||
WorkflowRunStatus.timed_out,
|
||||
):
|
||||
await self.generate_script_if_needed(
|
||||
workflow=workflow,
|
||||
workflow_run=workflow_run,
|
||||
block_labels=block_labels,
|
||||
blocks_to_update=blocks_to_update,
|
||||
finalize=True, # Force regeneration to ensure field mappings have complete action data
|
||||
has_conditionals=has_conditionals,
|
||||
)
|
||||
|
||||
# Execute finally block if configured. Skip for: canceled (user explicitly stopped)
|
||||
should_run_finally = finally_block_label and pre_finally_status != WorkflowRunStatus.canceled
|
||||
if should_run_finally:
|
||||
# Temporarily set to running for terminal workflows (for frontend UX)
|
||||
if pre_finally_status in (
|
||||
WorkflowRunStatus.failed,
|
||||
WorkflowRunStatus.terminated,
|
||||
WorkflowRunStatus.timed_out,
|
||||
):
|
||||
workflow_run = await self._update_workflow_run_status(
|
||||
workflow_run_id=workflow_run_id,
|
||||
status=WorkflowRunStatus.running,
|
||||
failure_reason=None,
|
||||
)
|
||||
await self._execute_finally_block_if_configured(
|
||||
workflow=workflow,
|
||||
workflow_run=workflow_run,
|
||||
organization=organization,
|
||||
browser_session_id=browser_session_id,
|
||||
)
|
||||
|
||||
workflow_run = await self._finalize_workflow_run_status(
|
||||
workflow_run_id=workflow_run_id,
|
||||
workflow_run=workflow_run,
|
||||
pre_finally_status=pre_finally_status,
|
||||
pre_finally_failure_reason=pre_finally_failure_reason,
|
||||
)
|
||||
finally:
|
||||
if renewal_task is not None and not renewal_task.done():
|
||||
renewal_task.cancel()
|
||||
try:
|
||||
await renewal_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
await self.clean_up_workflow(
|
||||
workflow=workflow,
|
||||
workflow_run=workflow_run,
|
||||
api_key=api_key,
|
||||
browser_session_id=browser_session_id,
|
||||
close_browser_on_completion=close_browser_on_completion,
|
||||
)
|
||||
|
||||
return workflow_run
|
||||
|
||||
async def _renew_browser_session_loop(self, browser_session_id: str, organization_id: str) -> None:
|
||||
"""Periodically renew a browser session to prevent timeout during long-running workflows."""
|
||||
max_renewal_seconds = 2 * 60 * 60 # 2 hours
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(300) # 5 minutes — ensures 2+ attempts within the 10-min renewal threshold
|
||||
elapsed = asyncio.get_event_loop().time() - start_time
|
||||
if elapsed >= max_renewal_seconds:
|
||||
LOG.info(
|
||||
"Browser session renewal loop reached 2-hour cap, stopping",
|
||||
browser_session_id=browser_session_id,
|
||||
organization_id=organization_id,
|
||||
elapsed_seconds=elapsed,
|
||||
)
|
||||
return
|
||||
await app.PERSISTENT_SESSIONS_MANAGER.renew_or_close_session(browser_session_id, organization_id)
|
||||
LOG.debug(
|
||||
"Browser session renewal check completed",
|
||||
browser_session_id=browser_session_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
LOG.info(
|
||||
"Browser session renewal loop cancelled",
|
||||
browser_session_id=browser_session_id,
|
||||
)
|
||||
return
|
||||
except BrowserSessionNotRenewable:
|
||||
LOG.warning(
|
||||
"Browser session is no longer renewable, stopping renewal loop",
|
||||
browser_session_id=browser_session_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
return
|
||||
except Exception:
|
||||
LOG.exception(
|
||||
"Error renewing browser session, will retry",
|
||||
browser_session_id=browser_session_id,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
|
||||
async def _execute_workflow_blocks(
|
||||
self,
|
||||
workflow: Workflow,
|
||||
|
|
@ -2601,7 +2658,7 @@ class WorkflowService:
|
|||
browser_session = await app.PERSISTENT_SESSIONS_MANAGER.create_session(
|
||||
organization_id=organization_id,
|
||||
proxy_location=workflow_request.proxy_location,
|
||||
timeout_minutes=30, # 30 minutes default timeout for forced browser sessions
|
||||
timeout_minutes=60, # 60 minutes default timeout for forced browser sessions
|
||||
)
|
||||
browser_session_id = browser_session.persistent_browser_session_id
|
||||
LOG.info(
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ async def prepare_workflow(
|
|||
request_id: str | None = None,
|
||||
debug_session_id: str | None = None,
|
||||
code_gen: bool | None = None,
|
||||
parent_workflow_run_id: str | None = None,
|
||||
) -> WorkflowRun:
|
||||
"""
|
||||
Prepare a workflow to be run.
|
||||
|
|
@ -42,6 +43,7 @@ async def prepare_workflow(
|
|||
is_template_workflow=template,
|
||||
debug_session_id=debug_session_id,
|
||||
code_gen=code_gen,
|
||||
parent_workflow_run_id=parent_workflow_run_id,
|
||||
)
|
||||
|
||||
workflow = await app.WORKFLOW_SERVICE.get_workflow_by_permanent_id(
|
||||
|
|
@ -76,6 +78,7 @@ async def run_workflow(
|
|||
background_tasks: BackgroundTasks | None = None,
|
||||
block_labels: list[str] | None = None,
|
||||
block_outputs: dict[str, t.Any] | None = None,
|
||||
parent_workflow_run_id: str | None = None,
|
||||
) -> WorkflowRun:
|
||||
workflow_run = await prepare_workflow(
|
||||
workflow_id=workflow_id,
|
||||
|
|
@ -85,6 +88,7 @@ async def run_workflow(
|
|||
version=version,
|
||||
max_steps=max_steps,
|
||||
request_id=request_id,
|
||||
parent_workflow_run_id=parent_workflow_run_id,
|
||||
)
|
||||
|
||||
await AsyncExecutorFactory.get_executor().execute_workflow(
|
||||
|
|
|
|||
|
|
@ -152,18 +152,22 @@ class RealBrowserManager(BrowserManager):
|
|||
workflow_run_id = workflow_run.workflow_run_id
|
||||
if browser_profile_id is None:
|
||||
browser_profile_id = workflow_run.browser_profile_id
|
||||
browser_state = self.get_for_workflow_run(
|
||||
workflow_run_id=workflow_run_id, parent_workflow_run_id=parent_workflow_run_id
|
||||
)
|
||||
if browser_state:
|
||||
# always keep the browser state for the workflow run and the parent workflow run synced
|
||||
self.pages[workflow_run_id] = browser_state
|
||||
if parent_workflow_run_id:
|
||||
self.pages[parent_workflow_run_id] = browser_state
|
||||
return browser_state
|
||||
|
||||
# When an explicit browser_session_id is provided (e.g. from a workflow
|
||||
# trigger block), skip the parent workflow lookup so the child uses the
|
||||
# specified persistent session instead of inheriting the parent's browser.
|
||||
if not browser_session_id:
|
||||
browser_state = self.get_for_workflow_run(
|
||||
workflow_run_id=workflow_run_id, parent_workflow_run_id=parent_workflow_run_id
|
||||
)
|
||||
if browser_state:
|
||||
# always keep the browser state for the workflow run and the parent workflow run synced
|
||||
self.pages[workflow_run_id] = browser_state
|
||||
if parent_workflow_run_id:
|
||||
self.pages[parent_workflow_run_id] = browser_state
|
||||
return browser_state
|
||||
|
||||
if browser_session_id:
|
||||
# TODO: what if there's a parent workflow run?
|
||||
LOG.info(
|
||||
"Getting browser state for workflow run from persistent sessions manager",
|
||||
browser_session_id=browser_session_id,
|
||||
|
|
@ -206,7 +210,11 @@ class RealBrowserManager(BrowserManager):
|
|||
)
|
||||
|
||||
self.pages[workflow_run_id] = browser_state
|
||||
if parent_workflow_run_id:
|
||||
# Only sync the parent's entry when the child is sharing the parent's
|
||||
# browser. When an explicit browser_session_id is provided the child
|
||||
# has its own browser, and overwriting the parent's entry would break
|
||||
# subsequent parent blocks.
|
||||
if parent_workflow_run_id and not browser_session_id:
|
||||
self.pages[parent_workflow_run_id] = browser_state
|
||||
|
||||
# The URL here is only used when creating a new page, and not when using an existing page.
|
||||
|
|
|
|||
|
|
@ -403,6 +403,7 @@ class TestWorkflowCommands:
|
|||
proxy="RESIDENTIAL",
|
||||
wait=True,
|
||||
timeout=450,
|
||||
run_with=None,
|
||||
json_output=True,
|
||||
)
|
||||
|
||||
|
|
@ -414,6 +415,7 @@ class TestWorkflowCommands:
|
|||
"proxy_location": "RESIDENTIAL",
|
||||
"wait": True,
|
||||
"timeout_seconds": 450,
|
||||
"run_with": None,
|
||||
}
|
||||
parsed = json.loads(capsys.readouterr().out)
|
||||
assert parsed["ok"] is True
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue