🔄 synced local 'skyvern/' with remote 'skyvern/'

Shuchang Zheng 2026-04-25 19:31:48 +00:00
parent e14d9af2e2
commit e3c75edae0
29 changed files with 930 additions and 51 deletions

View file

@ -11,7 +11,8 @@ from playwright.async_api import Page
from skyvern.config import settings
from skyvern.constants import SKYVERN_PAGE_MAX_SCRAPING_RETRIES, SPECIAL_FIELD_VERIFICATION_CODE
from skyvern.core.script_generations.skyvern_page_ai import SkyvernPageAi
from skyvern.core.script_generations.skyvern_page_ai import SYSTEM_PROMPT_UNSET, SkyvernPageAi
from skyvern.exceptions import WorkflowRunContextNotInitialized
from skyvern.forge import app
from skyvern.forge.prompts import prompt_engine
from skyvern.forge.sdk.api.files import validate_download_url
@ -903,6 +904,7 @@ class RealSkyvernPageAi(SkyvernPageAi):
data: str | dict[str, Any] | None = None,
skip_refresh: bool = False,
include_extracted_text: bool = True,
system_prompt: str | None | Any = SYSTEM_PROMPT_UNSET,
) -> dict[str, Any] | list | str | None:
"""Extract information from the page using AI."""
@ -915,6 +917,42 @@ class RealSkyvernPageAi(SkyvernPageAi):
prompt = _render_template_with_label(prompt, label=self.current_label)
local_datetime_str = datetime.now(tz_info).isoformat()
# Resolve the effective workflow_system_prompt for this run. Order:
# 1. Caller-passed value wins (including None — "block opted out,
# send no system prompt").
# 2. Block-recorded value from ``WorkflowRunContext``, populated by
# ``Block._apply_workflow_system_prompt`` in both the agent path
# (``format_potential_template_parameters``) and the script path
# (``_execute_single_block`` before ``exec``). Using the recorded
# value makes the block the single source of truth for the
# opt-out + resolved-string decision — script-path extractions
# hash to the same cache key and send the same LLM input the
# agent path would. A recorded ``None`` is a real opt-out, not a
# miss (SKY-9147).
# 3. Fall back to the run-wide effective prompt for non-block
# callers (standalone scripts, sdk routes, etc.) that never set
# ``current_label`` and never went through a Block.
workflow_system_prompt: str | None
if system_prompt is not SYSTEM_PROMPT_UNSET:
workflow_system_prompt = cast("str | None", system_prompt)
else:
workflow_system_prompt = None
workflow_run_context_for_prompt = None
if context and context.workflow_run_id:
try:
workflow_run_context_for_prompt = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(
context.workflow_run_id
)
except WorkflowRunContextNotInitialized:
workflow_run_context_for_prompt = None
if workflow_run_context_for_prompt is not None:
recorded, value = workflow_run_context_for_prompt.get_block_workflow_system_prompt(self.current_label)
if recorded:
workflow_system_prompt = value
else:
workflow_system_prompt = workflow_run_context_for_prompt.resolve_effective_workflow_system_prompt()
# Render the prompt FIRST so the cache key hashes the exact string
# that will be sent to the LLM (captures economy-tree swaps and 2/3
# truncation inside load_prompt_with_elements).
@ -981,6 +1019,7 @@ class RealSkyvernPageAi(SkyvernPageAi):
extracted_information_schema=post_ceiling_kwargs["extracted_information_schema"],
error_code_mapping=error_code_mapping_str,
llm_key=None,
workflow_system_prompt=workflow_system_prompt,
)
lookup_result = extraction_cache.lookup(workflow_run_id, cache_key)
except Exception:
@ -1042,6 +1081,7 @@ class RealSkyvernPageAi(SkyvernPageAi):
screenshots=self.scraped_page.screenshots,
prompt_name="extract-information",
force_dict=False,
system_prompt=workflow_system_prompt,
)
# Validate and fill missing fields based on schema
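
A minimal sketch of the three-tier resolution order documented above. SYSTEM_PROMPT_UNSET and the (recorded, value) tuple come from the diff; the helper itself is an illustrative stand-in, not Skyvern code.

from typing import Any

SYSTEM_PROMPT_UNSET: Any = object()

def resolve_system_prompt(
    caller_value: Any,
    block_record: tuple[bool, str | None],
    run_wide: str | None,
) -> str | None:
    # 1. Caller-passed value wins, including an explicit None opt-out.
    if caller_value is not SYSTEM_PROMPT_UNSET:
        return caller_value
    # 2. A block-recorded value is authoritative; a recorded None is a real opt-out.
    recorded, value = block_record
    if recorded:
        return value
    # 3. Non-block callers fall back to the run-wide effective prompt.
    return run_wide

assert resolve_system_prompt(None, (True, "ignored"), "run") is None
assert resolve_system_prompt(SYSTEM_PROMPT_UNSET, (True, None), "run") is None
assert resolve_system_prompt(SYSTEM_PROMPT_UNSET, (False, None), "run") == "run"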

View file

@ -1323,6 +1323,9 @@ class SkyvernPage(Page):
"""
data = kwargs.pop("data", None)
skip_refresh = kwargs.pop("skip_refresh", False)
extra_kwargs: dict[str, Any] = {}
if "system_prompt" in kwargs:
extra_kwargs["system_prompt"] = kwargs.pop("system_prompt")
return await self._ai.ai_extract(
prompt=prompt,
schema=schema,
@ -1330,6 +1333,7 @@ class SkyvernPage(Page):
intention=intention,
data=data,
skip_refresh=skip_refresh,
**extra_kwargs,
)
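
# Why the conditional ``pop`` above matters, as a sketch. Three call shapes
# (the public method name ``extract`` is assumed from context here):
#
#     await page.extract("list the prices")
#         # key absent: ai_extract keeps its SYSTEM_PROMPT_UNSET default
#     await page.extract("list the prices", system_prompt=None)
#         # explicit opt-out, forwarded as None
#     await page.extract("list the prices", system_prompt="Be terse.")
#         # per-call override, forwarded verbatim
#
# Forwarding ``system_prompt=None`` unconditionally would collapse the first
# two shapes: ``ai_extract`` could no longer tell "omitted" from "opted out".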
async def validate(

View file

@ -4,6 +4,12 @@ from typing import Any, Protocol
from skyvern.config import settings
# Sentinel for the optional ``system_prompt`` parameter on ``ai_extract``.
# Distinguishes "caller omitted the argument" (resolve from workflow context,
# honoring the current block's ``ignore_workflow_system_prompt`` flag) from
# "caller passed None" (opt out, send no system prompt).
SYSTEM_PROMPT_UNSET: Any = object()
class SkyvernPageAi(Protocol):
"""Protocol defining the interface for AI-powered page interactions."""
@ -67,6 +73,7 @@ class SkyvernPageAi(Protocol):
data: str | dict[str, Any] | None = None,
skip_refresh: bool = False,
include_extracted_text: bool = True,
system_prompt: str | None | Any = SYSTEM_PROMPT_UNSET,
) -> dict[str, Any] | list | str | None:
"""Extract information from the page using AI."""
...
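
The sentinel pattern in isolation, as a hedged sketch: a module-level object() is compared by identity, so no caller-supplied value (not None, not "", not another object()) can collide with it.

from typing import Any

_UNSET: Any = object()

def describe(x: Any = _UNSET) -> str:
    if x is _UNSET:  # identity check: only the module-level default matches
        return "omitted"
    return "opted out" if x is None else "set"

assert describe() == "omitted"
assert describe(None) == "opted out"
assert describe(object()) == "set"  # a fresh object() is NOT the sentinel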

View file

@ -393,6 +393,7 @@ class ForgeAgent:
retry=task_retry,
max_steps_per_run=task_block.max_steps_per_run,
error_code_mapping=task_block.error_code_mapping,
workflow_system_prompt=task_block.workflow_system_prompt,
include_action_history_in_verification=task_block.include_action_history_in_verification,
model=task_block.model,
max_screenshot_scrolling_times=workflow_run.max_screenshot_scrolls,
@ -462,6 +463,7 @@ class ForgeAgent:
proxy_location=task_request.proxy_location,
extracted_information_schema=task_request.extracted_information_schema,
error_code_mapping=task_request.error_code_mapping,
workflow_system_prompt=task_request.workflow_system_prompt,
application=task_request.application,
include_action_history_in_verification=task_request.include_action_history_in_verification,
model=task_request.model,
@ -1311,6 +1313,7 @@ class ForgeAgent:
prompt_name=prompt_name,
step=step,
screenshots=scraped_page.screenshots,
system_prompt=task.workflow_system_prompt,
)
else:
LOG.debug(
@ -1917,6 +1920,7 @@ class ForgeAgent:
prompt_name="cua-answer-question",
step=step,
screenshots=scraped_page.screenshots,
system_prompt=task.workflow_system_prompt,
)
LOG.info("Skyvern response to CUA question", skyvern_response=skyvern_response)
resp_content = skyvern_response.get("answer")
@ -2172,6 +2176,7 @@ class ForgeAgent:
prompt_name=prompt_name,
step=next_step,
screenshots=scraped_page.screenshots,
system_prompt=task.workflow_system_prompt,
)
LOG.info(
@ -2499,6 +2504,7 @@ class ForgeAgent:
step=step,
screenshots=scraped_page_refreshed.screenshots,
prompt_name=prompt_name,
system_prompt=task.workflow_system_prompt,
)
result = CompleteVerifyResult.model_validate(verification_result)
if result.is_complete:
@ -3286,7 +3292,9 @@ class ForgeAgent:
llm_api_handler = LLMAPIHandlerFactory.get_override_llm_api_handler(
task.llm_key, default=app.LLM_API_HANDLER
)
json_response = await llm_api_handler(prompt=prompt, step=step, prompt_name="infer-action-type")
json_response = await llm_api_handler(
prompt=prompt, step=step, prompt_name="infer-action-type", system_prompt=task.workflow_system_prompt
)
if json_response.get("error"):
raise FailedToParseActionInstruction(
reason=json_response.get("thought"), error_type=json_response.get("error")
@ -4736,7 +4744,11 @@ class ForgeAgent:
local_datetime=datetime.now(skyvern_context.ensure_context().tz_info).isoformat(),
)
json_response = await app.LLM_API_HANDLER(
prompt=prompt, screenshots=screenshots, step=step, prompt_name="summarize-max-steps-reason"
prompt=prompt,
screenshots=screenshots,
step=step,
prompt_name="summarize-max-steps-reason",
system_prompt=task.workflow_system_prompt,
)
return MaxStepsReasonResponse.model_validate(json_response)
except Exception:
@ -4879,6 +4891,7 @@ class ForgeAgent:
screenshots=screenshots,
step=step,
prompt_name="summarize-max-retries-reason",
system_prompt=task.workflow_system_prompt,
)
return MaxStepsReasonResponse.model_validate(json_response)
except Exception:
@ -5256,6 +5269,7 @@ class ForgeAgent:
step=step,
screenshots=scraped_page.screenshots,
prompt_name=prompt_name,
system_prompt=task.workflow_system_prompt,
)
@staticmethod
@ -5323,6 +5337,7 @@ class ForgeAgent:
data_extraction_goal=task.data_extraction_goal,
extracted_information_schema=post_ceiling_kwargs["data_extraction_schema"],
llm_key=None,
workflow_system_prompt=task.workflow_system_prompt,
)
lookup_result = extraction_cache.lookup(workflow_run_id, cache_key)
except Exception:
@ -5375,7 +5390,10 @@ class ForgeAgent:
cache_path="agent",
)
data_extraction_summary_resp = await app.EXTRACTION_LLM_API_HANDLER(
prompt=prompt, step=step, prompt_name="data-extraction-summary"
prompt=prompt,
step=step,
prompt_name="data-extraction-summary",
system_prompt=task.workflow_system_prompt,
)
if cache_key and isinstance(data_extraction_summary_resp, dict):
extraction_cache.store(workflow_run_id, cache_key, data_extraction_summary_resp)

View file

@ -272,6 +272,56 @@ class ArtifactManager:
path=path,
)
async def create_download_artifact(
self,
*,
organization_id: str,
run_id: str,
uri: str,
filename: str,
workflow_run_id: str | None = None,
checksum: str | None = None,
) -> str:
"""Register a downloaded file as an Artifact row without re-uploading.
The bytes already live at ``uri`` (the uploads bucket). We only record a
row so the file can be served through the signed ``/v1/artifacts/{id}/content``
endpoint.
"""
# Idempotent on (run_id, uri): if a DOWNLOAD artifact already exists for the
# same physical file (e.g. a loop iteration re-uploads the same download dir),
# return the existing artifact_id so signed URLs stay stable across calls —
# otherwise ``loop_download_filter.to_downloaded_file_signature`` would treat
# every iteration's URL as new.
existing = await app.DATABASE.artifacts.find_download_artifact(
organization_id=organization_id,
run_id=run_id,
uri=uri,
)
if existing is not None:
return existing.artifact_id
artifact_id = generate_artifact_id()
context = skyvern_context.current()
if workflow_run_id is None and context is not None:
workflow_run_id = context.workflow_run_id
await app.DATABASE.artifacts.create_artifact(
artifact_id=artifact_id,
artifact_type=ArtifactType.DOWNLOAD,
uri=uri,
organization_id=organization_id,
run_id=run_id,
workflow_run_id=workflow_run_id,
checksum=checksum,
)
LOG.debug(
"Registered downloaded file as artifact",
artifact_id=artifact_id,
run_id=run_id,
filename=filename,
)
return artifact_id
async def create_thought_artifact(
self,
thought: Thought,
@ -836,6 +886,25 @@ class ArtifactManager:
async def retrieve_artifact(self, artifact: Artifact) -> bytes | None:
return await app.STORAGE.retrieve_artifact(artifact)
def build_signed_content_url(
self,
artifact_id: str,
artifact_name: str | None = None,
artifact_type: str | None = None,
) -> str:
"""Return a signed ``/v1/artifacts/{id}/content`` URL for any artifact.
Non-bundled artifacts normally get a presigned S3 URL from
``STORAGE.get_share_link``. This method always builds the Skyvern-origin
signed URL regardless of ``bundle_key``; used for DOWNLOAD artifacts
so webhook payloads stay short and clients hit our origin.
"""
return self._bundle_content_url(
artifact_id=artifact_id,
artifact_name=artifact_name,
artifact_type=artifact_type,
)
def _bundle_content_url(
self,
artifact_id: str,
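
The idempotency contract above, reduced to an in-memory sketch. The real lookup goes through ArtifactsRepository.find_download_artifact; the id format is illustrative.

_rows: dict[tuple[str, str], str] = {}

def register_download(run_id: str, uri: str) -> str:
    key = (run_id, uri)
    if key in _rows:  # existing row: reuse it, so the signed URL stays stable
        return _rows[key]
    artifact_id = f"a_{len(_rows) + 1}"  # stand-in for generate_artifact_id()
    _rows[key] = artifact_id
    return artifact_id

first = register_download("wr_1", "s3://uploads/.../report.pdf")
second = register_download("wr_1", "s3://uploads/.../report.pdf")  # loop re-save
assert first == second  # loop_download_filter keeps seeing one stable signature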

View file

@ -61,6 +61,9 @@ class ArtifactType(StrEnum):
# Task archive: one ZIP per task containing task-level cleanup artifacts (HAR, console log, trace, final screenshot)
TASK_ARCHIVE = "task_archive"
# Files downloaded by the browser during a run (stored in the uploads bucket, not the artifacts bucket).
DOWNLOAD = "download"
class Artifact(BaseModel):
created_at: datetime = Field(
@ -82,6 +85,7 @@ class Artifact(BaseModel):
artifact_type: ArtifactType
uri: str
bundle_key: str | None = None
checksum: str | None = None
task_id: str | None = None
step_id: str | None = None
workflow_run_id: str | None = None

View file

@ -8,6 +8,7 @@ import structlog
from skyvern.config import settings
from skyvern.constants import BROWSER_DOWNLOADING_SUFFIX, DOWNLOAD_FILE_PREFIX
from skyvern.forge import app
from skyvern.forge.sdk.api.azure import AzureUri, StandardBlobTier
from skyvern.forge.sdk.api.files import (
calculate_sha256_for_file,
@ -22,6 +23,7 @@ from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityT
from skyvern.forge.sdk.artifact.storage.base import (
FILE_EXTENTSION_MAP,
BaseStorage,
_file_infos_from_download_artifacts,
)
from skyvern.forge.sdk.models import Step
from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion
@ -405,16 +407,84 @@ class AzureStorage(BaseStorage):
organization_id=organization_id,
storage_tier=tier,
)
# Upload file with checksum metadata
await self.async_client.upload_file_from_path(
uri=uri,
file_path=fpath,
metadata={"sha256_checksum": checksum, "original_filename": file},
tier=tier,
tags=tags,
)
# Azure Blob metadata values must be ASCII; preserve the full
# filename via the blob path / Artifact URI instead.
metadata: dict[str, str] = {"sha256_checksum": checksum}
if file.isascii():
metadata["original_filename"] = file
# Catch upload failures so we never create an Artifact row for
# bytes that didn't actually land in storage.
try:
await self.async_client.upload_file_from_path(
uri=uri,
file_path=fpath,
metadata=metadata,
tier=tier,
tags=tags,
)
except Exception:
LOG.warning(
"Skipping downloaded file — Azure upload failed",
file=file,
organization_id=organization_id,
run_id=run_id,
exc_info=True,
)
continue
# Register the file as an Artifact so GET run output can serve it via
# the signed /v1/artifacts/{id}/content endpoint (SKY-8861). Persist
# the SHA-256 we already computed so retrieval doesn't need an
# extra blob HEAD per file.
if run_id is not None:
try:
await app.ARTIFACT_MANAGER.create_download_artifact(
organization_id=organization_id,
run_id=run_id,
uri=uri,
filename=file,
checksum=checksum,
)
except Exception:
LOG.warning(
"Failed to register downloaded file as artifact; falling back to SAS URLs for retrieval",
file=file,
organization_id=organization_id,
run_id=run_id,
exc_info=True,
)
async def get_downloaded_files(self, organization_id: str, run_id: str | None) -> list[FileInfo]:
# Artifact-first — see s3.py::get_downloaded_files for rationale. When
# the keyring isn't configured (OSS default) or no artifact rows exist
# (legacy run pre-SKY-8861) we fall back to the legacy listing path so
# downloaded files remain reachable.
if run_id is not None and settings.ARTIFACT_CONTENT_HMAC_KEYRING:
artifacts = await self._list_download_artifacts_safe(organization_id=organization_id, run_id=run_id)
if artifacts:
return _file_infos_from_download_artifacts(artifacts)
return await self._get_downloaded_files_via_blob_listing(organization_id=organization_id, run_id=run_id)
async def _list_download_artifacts_safe(self, *, organization_id: str, run_id: str) -> list[Artifact]:
try:
return await app.DATABASE.artifacts.list_artifacts_for_run_by_type(
run_id=run_id,
organization_id=organization_id,
artifact_type=ArtifactType.DOWNLOAD,
)
except Exception:
LOG.warning(
"Failed to look up download artifacts; falling back to SAS URLs",
organization_id=organization_id,
run_id=run_id,
exc_info=True,
)
return []
async def _get_downloaded_files_via_blob_listing(
self, *, organization_id: str, run_id: str | None
) -> list[FileInfo]:
uri = f"azure://{settings.AZURE_STORAGE_CONTAINER_UPLOADS}/{DOWNLOAD_FILE_PREFIX}/{settings.ENV}/{organization_id}/{run_id}"
object_keys = await self.async_client.list_files(uri=uri)
if len(object_keys) == 0:
@ -424,24 +494,22 @@ class AzureStorage(BaseStorage):
for key in object_keys:
object_uri = f"azure://{settings.AZURE_STORAGE_CONTAINER_UPLOADS}/{key}"
# Get metadata (including checksum)
metadata = await self.async_client.get_file_metadata(object_uri, log_exception=False)
# Create FileInfo object
filename = os.path.basename(key)
checksum = metadata.get("sha256_checksum") if metadata else None
display_name = metadata.get("original_filename", filename) if metadata else filename
# Get SAS URL
sas_urls = await self.async_client.create_sas_urls([object_uri])
if not sas_urls:
continue
file_info = FileInfo(
url=sas_urls[0],
checksum=checksum,
filename=metadata.get("original_filename", filename) if metadata else filename,
file_infos.append(
FileInfo(
url=sas_urls[0],
checksum=checksum,
filename=display_name,
)
)
file_infos.append(file_info)
return file_infos

View file

@ -1,6 +1,7 @@
from abc import ABC, abstractmethod
from typing import BinaryIO
from skyvern.forge import app
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityType
from skyvern.forge.sdk.models import Step
from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion
@ -8,6 +9,33 @@ from skyvern.forge.sdk.schemas.files import FileInfo
from skyvern.forge.sdk.schemas.task_v2 import TaskV2, Thought
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock
def _file_infos_from_download_artifacts(artifacts: list[Artifact]) -> list[FileInfo]:
"""Build the API-shaped ``FileInfo`` list from DOWNLOAD artifact rows.
Filename is the URI basename (the save site writes ``{base_uri}/{file}``);
checksum and modified_at come straight from the row, so retrieval needs
zero S3 round-trips.
"""
infos: list[FileInfo] = []
for artifact in artifacts:
filename = artifact.uri.rsplit("/", 1)[-1] if artifact.uri else ""
url = app.ARTIFACT_MANAGER.build_signed_content_url(
artifact_id=artifact.artifact_id,
artifact_name=filename,
artifact_type=ArtifactType.DOWNLOAD.value,
)
infos.append(
FileInfo(
url=url,
checksum=artifact.checksum,
filename=filename,
modified_at=artifact.created_at,
)
)
return infos
# TODO: This should be a part of the ArtifactType model
FILE_EXTENTSION_MAP: dict[ArtifactType, str] = {
ArtifactType.RECORDING: "webm",

View file

@ -11,6 +11,7 @@ import zstandard as zstd
from skyvern.config import settings
from skyvern.constants import BROWSER_DOWNLOADING_SUFFIX, DOWNLOAD_FILE_PREFIX
from skyvern.forge import app
from skyvern.forge.sdk.api.aws import AsyncAWSClient, S3StorageClass, S3Uri
from skyvern.forge.sdk.api.files import (
calculate_sha256_for_file,
@ -24,6 +25,7 @@ from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityT
from skyvern.forge.sdk.artifact.storage.base import (
FILE_EXTENTSION_MAP,
BaseStorage,
_file_infos_from_download_artifacts,
)
from skyvern.forge.sdk.models import Step
from skyvern.forge.sdk.schemas.ai_suggestions import AISuggestion
@ -432,15 +434,91 @@ class S3Storage(BaseStorage):
organization_id=organization_id,
storage_class=storage_class,
)
# Upload file with checksum metadata
await self.async_client.upload_file_from_path(
uri=uri,
file_path=fpath,
metadata={"sha256_checksum": checksum, "original_filename": file},
storage_class=storage_class,
)
# S3 object metadata only allows ASCII; non-ASCII filenames (CJK,
# emoji) would otherwise raise ParamValidationError at upload time.
# The full filename is still preserved in the S3 key and on the
# Artifact row's URI.
metadata: dict[str, str] = {"sha256_checksum": checksum}
if file.isascii():
metadata["original_filename"] = file
# Upload with raise_exception=True so a partial failure aborts
# this iteration and we never create an Artifact row for bytes
# that didn't actually land in S3.
try:
await self.async_client.upload_file_from_path(
uri=uri,
file_path=fpath,
metadata=metadata,
storage_class=storage_class,
raise_exception=True,
)
except Exception:
LOG.warning(
"Skipping downloaded file — S3 upload failed",
file=file,
organization_id=organization_id,
run_id=run_id,
exc_info=True,
)
continue
# Register the file as an Artifact so GET run output can serve it via
# the signed /v1/artifacts/{id}/content endpoint (SKY-8861). Persist
# the SHA-256 we already computed so retrieval doesn't need an
# extra S3 HEAD per file.
if run_id is not None:
try:
await app.ARTIFACT_MANAGER.create_download_artifact(
organization_id=organization_id,
run_id=run_id,
uri=uri,
filename=file,
checksum=checksum,
)
except Exception:
LOG.warning(
"Failed to register downloaded file as artifact; falling back to S3 listing for retrieval",
file=file,
organization_id=organization_id,
run_id=run_id,
exc_info=True,
)
async def get_downloaded_files(self, organization_id: str, run_id: str | None) -> list[FileInfo]:
# Artifact-first: when a run has DOWNLOAD artifact rows, return them as
# the source of truth — the row carries enough to build a short signed
# /v1/artifacts/{id}/content URL plus the SHA-256 we persisted at save
# time, so we skip the S3 LIST and per-file HEAD entirely (SKY-8861).
#
# If HMAC signing isn't configured (self-hosted OSS default), the signed
# endpoint requires an API key webhook consumers don't have, so we stay
# on the legacy S3-list+presign path even when rows exist.
if run_id is not None and settings.ARTIFACT_CONTENT_HMAC_KEYRING:
artifacts = await self._list_download_artifacts_safe(organization_id=organization_id, run_id=run_id)
if artifacts:
return _file_infos_from_download_artifacts(artifacts)
# Legacy fallback — runs predating SKY-8861 (no artifact rows) and
# OSS-default deployments without HMAC signing both arrive here.
return await self._get_downloaded_files_via_s3_listing(organization_id=organization_id, run_id=run_id)
async def _list_download_artifacts_safe(self, *, organization_id: str, run_id: str) -> list[Artifact]:
try:
return await app.DATABASE.artifacts.list_artifacts_for_run_by_type(
run_id=run_id,
organization_id=organization_id,
artifact_type=ArtifactType.DOWNLOAD,
)
except Exception:
LOG.warning(
"Failed to look up download artifacts; falling back to presigned S3 URLs",
organization_id=organization_id,
run_id=run_id,
exc_info=True,
)
return []
async def _get_downloaded_files_via_s3_listing(self, *, organization_id: str, run_id: str | None) -> list[FileInfo]:
bucket = settings.AWS_S3_BUCKET_UPLOADS
uri = f"s3://{bucket}/{DOWNLOAD_FILE_PREFIX}/{settings.ENV}/{organization_id}/{run_id}"
object_keys = await self.async_client.list_files(uri=uri)
@ -451,25 +529,22 @@ class S3Storage(BaseStorage):
for key in object_keys:
object_uri = f"s3://{bucket}/{key}"
# Get metadata (including checksum)
metadata = await self.async_client.get_file_metadata(object_uri, log_exception=False)
# Create FileInfo object
filename = os.path.basename(key)
checksum = metadata.get("sha256_checksum") if metadata else None
display_name = metadata.get("original_filename", filename) if metadata else filename
# Get presigned URL
presigned_urls = await self.async_client.create_presigned_urls([object_uri])
if not presigned_urls:
continue
file_info = FileInfo(
url=presigned_urls[0],
checksum=checksum,
filename=metadata.get("original_filename", filename) if metadata else filename,
file_infos.append(
FileInfo(
url=presigned_urls[0],
checksum=checksum,
filename=display_name,
)
)
file_infos.append(file_info)
return file_infos
async def save_legacy_file(

View file

@ -40,6 +40,10 @@ Key derivation (shared with the cross-run tier):
correctness if an intra-task second-step extraction happens.
- llm_key: the caller's model override. Prevents stale hits when a user
changes models to retune quality.
- workflow_system_prompt: the workflow's workflow_system_prompt (or None).
The prompt is sent to the LLM as the `system` message; changing it
changes the output even if all user-prompt inputs are identical, so two
calls that differ only in workflow_system_prompt must not collide.
- Date is intentionally NOT in the key. Two calls on byte-identical page
content are semantically the same extraction regardless of wall-clock
date; relying on the content hash keeps hit rate up for scheduled
@ -353,6 +357,7 @@ def compute_cache_key(
error_code_mapping: Any = None,
previous_extracted_information: Any = None,
llm_key: str | None = None,
workflow_system_prompt: str | None = None,
) -> str:
"""Return a stable sha256 hex digest for the inputs that affect extraction output.
@ -391,6 +396,7 @@ def compute_cache_key(
_normalize(error_code_mapping),
_normalize(previous_extracted_information),
_s(llm_key),
_s(workflow_system_prompt),
]
joined = "\x1f".join(parts).encode("utf-8", errors="replace")
return hashlib.sha256(joined).hexdigest()
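
A sketch of why the new field cannot collide, mirroring the joining scheme above with the part list cut down to two entries:

import hashlib

def demo_key(page_hash: str, workflow_system_prompt: str | None) -> str:
    parts = [page_hash, str(workflow_system_prompt)]  # _s()/_normalize() reduced to str()
    joined = "\x1f".join(parts).encode("utf-8", errors="replace")
    return hashlib.sha256(joined).hexdigest()

a = demo_key("deadbeef", None)
b = demo_key("deadbeef", "Never guess. If unsure, respond with UNKNOWN.")
assert a != b  # same page, different prompt: cache miss, fresh LLM call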

View file

@ -103,6 +103,7 @@ class TaskModel(Base):
order = Column(Integer, nullable=True)
retry = Column(Integer, nullable=True)
error_code_mapping = Column(JSON, nullable=True)
workflow_system_prompt = Column(UnicodeText, nullable=True)
errors = Column(JSON, default=[], nullable=False)
max_steps_per_run = Column(Integer, nullable=True)
application = Column(String, nullable=True)
@ -248,6 +249,7 @@ class ArtifactModel(Base):
uri = Column(String)
bundle_key = Column(String, nullable=True)
run_id = Column(String, nullable=True)
checksum = Column(String, nullable=True)
created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
modified_at = Column(
DateTime,
@ -426,6 +428,14 @@ class WorkflowRunModel(Base):
verification_code_identifier = Column(String, nullable=True)
verification_code_polling_started_at = Column(DateTime, nullable=True)
failure_category = Column(JSON, nullable=True)
# When True, this run was spawned by a WorkflowTriggerBlock whose
# ignore_workflow_system_prompt flag was set, and the child must not
# inherit the parent chain's workflow_system_prompt. Set at spawn time so
# async (Temporal-dispatched) child runs can honor the flag even though
# they start in a separate worker without in-process context.
ignore_inherited_workflow_system_prompt = Column(
Boolean, nullable=False, default=False, server_default=sqlalchemy.false()
)
queued_at = Column(DateTime, nullable=True)
started_at = Column(DateTime, nullable=True)
@ -864,6 +874,7 @@ class TaskV2Model(Base):
proxy_location = Column(String, nullable=True)
extracted_information_schema = Column(JSON, nullable=True)
error_code_mapping = Column(JSON, nullable=True)
workflow_system_prompt = Column(UnicodeText, nullable=True)
max_steps = Column(Integer, nullable=True)
max_screenshot_scrolling_times = Column(Integer, nullable=True)
extra_http_headers = Column(JSON, nullable=True)

View file

@ -47,6 +47,7 @@ class ArtifactsRepository(BaseRepository):
run_id: str | None = None,
thought_id: str | None = None,
ai_suggestion_id: str | None = None,
checksum: str | None = None,
) -> Artifact:
async with self.Session() as session:
new_artifact = ArtifactModel(
@ -62,6 +63,7 @@ class ArtifactsRepository(BaseRepository):
run_id=run_id,
ai_suggestion_id=ai_suggestion_id,
organization_id=organization_id,
checksum=checksum,
)
session.add(new_artifact)
await session.commit()
@ -357,6 +359,60 @@ class ArtifactsRepository(BaseRepository):
return convert_to_artifact(artifact, self.debug_enabled)
return None
@db_operation("find_download_artifact")
async def find_download_artifact(
self,
organization_id: str,
run_id: str,
uri: str,
) -> Artifact | None:
"""Return the existing DOWNLOAD artifact for ``(run_id, uri)`` if any.
Used by :meth:`ArtifactManager.create_download_artifact` to stay
idempotent: repeated saves of the same file in the same run (e.g.
within a loop block iteration) must reuse the existing artifact_id
so downstream URL-based dedup keeps seeing a stable URL.
"""
async with self.Session() as session:
artifact = (
await session.scalars(
select(ArtifactModel)
.filter(ArtifactModel.run_id == run_id)
.filter(ArtifactModel.artifact_type == ArtifactType.DOWNLOAD)
.filter(ArtifactModel.organization_id == organization_id)
.filter(ArtifactModel.uri == uri)
.order_by(ArtifactModel.created_at.desc())
)
).first()
if artifact:
return convert_to_artifact(artifact, self.debug_enabled)
return None
@db_operation("list_artifacts_for_run_by_type")
async def list_artifacts_for_run_by_type(
self,
run_id: str,
organization_id: str,
artifact_type: ArtifactType,
) -> list[Artifact]:
"""List all artifacts for a run filtered by type, using the dedicated ``run_id`` column.
Unlike :meth:`get_artifacts_for_run`, this does not consult a ``RunReader``;
it filters directly on the partial index ``ix_artifacts_run_id_partial`` and
returns the rows ordered by creation time.
"""
async with self.Session() as session:
artifacts = (
await session.scalars(
select(ArtifactModel)
.filter(ArtifactModel.run_id == run_id)
.filter(ArtifactModel.artifact_type == artifact_type)
.filter(ArtifactModel.organization_id == organization_id)
.order_by(ArtifactModel.created_at)
)
).all()
return [convert_to_artifact(a, self.debug_enabled) for a in artifacts]
@db_operation("get_artifact_for_run")
async def get_artifact_for_run(
self,

View file

@ -28,6 +28,7 @@ from skyvern.forge.sdk.db.utils import (
)
from skyvern.forge.sdk.schemas.task_v2 import TaskV2, TaskV2Status, Thought, ThoughtType
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock
from skyvern.forge.sdk.utils.sanitization import sanitize_postgres_text
from skyvern.schemas.runs import ProxyLocationInput, RunEngine, ScriptRunResponse
from skyvern.schemas.workflows import BlockStatus, BlockType
@ -196,12 +197,15 @@ class ObserverRepository(BaseRepository):
webhook_callback_url: str | None = None,
extracted_information_schema: dict | list | str | None = None,
error_code_mapping: dict | None = None,
workflow_system_prompt: str | None = None,
model: dict[str, Any] | None = None,
max_screenshot_scrolling_times: int | None = None,
extra_http_headers: dict[str, str] | None = None,
browser_address: str | None = None,
run_with: str | None = None,
) -> TaskV2:
if isinstance(workflow_system_prompt, str):
workflow_system_prompt = sanitize_postgres_text(workflow_system_prompt)
async with self.Session() as session:
new_task_v2 = TaskV2Model(
workflow_run_id=workflow_run_id,
@ -215,6 +219,7 @@ class ObserverRepository(BaseRepository):
webhook_callback_url=webhook_callback_url,
extracted_information_schema=extracted_information_schema,
error_code_mapping=error_code_mapping,
workflow_system_prompt=workflow_system_prompt,
organization_id=organization_id,
model=model,
max_screenshot_scrolling_times=max_screenshot_scrolling_times,

View file

@ -57,6 +57,7 @@ class TasksRepository(BaseRepository):
retry: int | None = None,
max_steps_per_run: int | None = None,
error_code_mapping: dict[str, str] | None = None,
workflow_system_prompt: str | None = None,
task_type: str = TaskType.general,
application: str | None = None,
include_action_history_in_verification: bool | None = None,
@ -79,6 +80,7 @@ class TasksRepository(BaseRepository):
url = sanitize_postgres_text(url)
complete_criterion = _sanitize(complete_criterion)
terminate_criterion = _sanitize(terminate_criterion)
workflow_system_prompt = _sanitize(workflow_system_prompt)
async with self.Session() as session:
new_task = TaskModel(
@ -102,6 +104,7 @@ class TasksRepository(BaseRepository):
retry=retry,
max_steps_per_run=max_steps_per_run,
error_code_mapping=error_code_mapping,
workflow_system_prompt=workflow_system_prompt,
application=application,
include_action_history_in_verification=include_action_history_in_verification,
model=model,

View file

@ -163,6 +163,7 @@ class WorkflowRunsRepository(BaseRepository):
workflow_run_id: str | None = None,
trigger_type: WorkflowRunTriggerType | None = None,
workflow_schedule_id: str | None = None,
ignore_inherited_workflow_system_prompt: bool = False,
) -> WorkflowRun:
async with self.Session() as session:
kwargs: dict[str, Any] = {}
@ -190,6 +191,7 @@ class WorkflowRunsRepository(BaseRepository):
code_gen=code_gen,
trigger_type=trigger_type.value if trigger_type else None,
workflow_schedule_id=workflow_schedule_id,
ignore_inherited_workflow_system_prompt=ignore_inherited_workflow_system_prompt,
**kwargs,
)
session.add(workflow_run)

View file

@ -245,6 +245,7 @@ def convert_to_task(task_obj: TaskModel, debug_enabled: bool = False, workflow_p
retry=task_obj.retry,
max_steps_per_run=task_obj.max_steps_per_run,
error_code_mapping=task_obj.error_code_mapping,
workflow_system_prompt=task_obj.workflow_system_prompt,
errors=task_obj.errors,
application=task_obj.application,
model=task_obj.model,
@ -372,6 +373,7 @@ def convert_to_artifact(artifact_model: ArtifactModel, debug_enabled: bool = Fal
artifact_type=ArtifactType[artifact_model.artifact_type.upper()],
uri=artifact_model.uri,
bundle_key=artifact_model.bundle_key,
checksum=artifact_model.checksum,
task_id=artifact_model.task_id,
step_id=artifact_model.step_id,
workflow_run_id=artifact_model.workflow_run_id,
@ -488,6 +490,7 @@ def convert_to_workflow_run(
trigger_type=_safe_trigger_type(workflow_run_model.trigger_type),
workflow_schedule_id=workflow_run_model.workflow_schedule_id,
failure_category=workflow_run_model.failure_category,
ignore_inherited_workflow_system_prompt=workflow_run_model.ignore_inherited_workflow_system_prompt,
)

View file

@ -1,7 +1,9 @@
import asyncio
import json
import unicodedata
from enum import Enum
from typing import Annotated, Any
from urllib.parse import quote
import structlog
import yaml
@ -1459,10 +1461,106 @@ _ARTIFACT_CONTENT_TYPES: dict[ArtifactType, str] = {
ArtifactType.SCREENSHOT_ACTION: "image/png",
ArtifactType.SCREENSHOT_FINAL: "image/png",
ArtifactType.RECORDING: "video/webm",
ArtifactType.DOWNLOAD: "application/octet-stream",
}
_ARTIFACT_CONTENT_TYPE_DEFAULT = "application/json"
def _sanitize_header_filename(name: str) -> str:
"""Strip characters that would break or inject into a Content-Disposition header.
The artifact URI basename is derived from a user-controlled S3 key. Rejects:
- C0 (<0x20), DEL (0x7F), C1 (0x80-0x9F): RFC 7230 violations.
- ``"`` and ``\\`` — would terminate or escape the quoted value.
- Unicode *format* (Cf) and *line/paragraph separator* (Zl/Zp) chars:
includes bidi overrides (U+202E), ZWSP (U+200B), ZWNBSP (U+FEFF);
these enable filename spoofing in the browser download UI.
"""
cleaned = []
for ch in name:
code = ord(ch)
if code < 0x20 or code == 0x7F or 0x80 <= code <= 0x9F:
continue
if ch in ('"', "\\"):
continue
if unicodedata.category(ch) in {"Cf", "Zl", "Zp"}:
continue
cleaned.append(ch)
return "".join(cleaned) or "download"
def _ascii_fallback_filename(name: str) -> str:
"""Best-effort ASCII form of ``name`` for the ``filename=`` parameter.
NFKD-normalizes and drops combining marks first so accented Latin
characters survive as their base letters (``fïlè.pdf`` -> ``file.pdf``)
instead of being stripped entirely. The RFC 5987 ``filename*=UTF-8''...``
form still carries the full name for modern clients.
If the ASCII stem ends up empty (e.g. pure CJK or emoji names),
prepend ``download`` so legacy clients don't save a bare ``.pdf``
hidden file.
"""
normalized = unicodedata.normalize("NFKD", name)
ascii_only = normalized.encode("ascii", "ignore").decode("ascii")
sanitized = _sanitize_header_filename(ascii_only)
stem, dot, ext = sanitized.rpartition(".")
if dot and not stem:
return f"download.{ext}"
return sanitized
def _build_attachment_disposition(filename: str) -> str:
"""Build a Content-Disposition header that survives non-ASCII filenames.
Emits both ``filename="<ascii>"`` (for legacy clients) and
``filename*=UTF-8''<pct-encoded>`` (RFC 5987, for everything modern).
Ensures the header value is Latin-1 encodable so Starlette doesn't 500.
"""
safe = _sanitize_header_filename(filename)
ascii_part = _ascii_fallback_filename(safe)
encoded = quote(safe, safe="")
return f"attachment; filename=\"{ascii_part}\"; filename*=UTF-8''{encoded}"
def _artifact_filename_from_uri(uri: str | None) -> str:
"""Extract the basename from an ``s3://``/``azure://`` URI without using
``urlparse`` that would split on ``?``/``#`` characters, which are legal
in S3 keys."""
if not uri:
return ""
return uri.rsplit("/", 1)[-1]
def _artifact_response_config(artifact: Artifact) -> tuple[str, str]:
"""Return (media_type, Content-Disposition) for the artifact content response.
DOWNLOAD artifacts use ``attachment`` disposition with the sanitized filename
so browsers never render user-supplied content inline (SKY-8862). All other
types keep the historical ``inline`` behaviour.
"""
media_type = _ARTIFACT_CONTENT_TYPES.get(artifact.artifact_type, _ARTIFACT_CONTENT_TYPE_DEFAULT)
if artifact.artifact_type == ArtifactType.DOWNLOAD:
raw_name = _artifact_filename_from_uri(artifact.uri)
return media_type, _build_attachment_disposition(raw_name)
return media_type, "inline"
def _artifact_content_response_headers(*, disposition: str, is_signed: bool) -> dict[str, str]:
"""Response headers for the artifact content endpoint.
Includes ``X-Content-Type-Options: nosniff`` as defence-in-depth for
SKY-8862: even if something upstream strips the attachment disposition,
the browser will not sniff the octet-stream body back into HTML/PDF.
"""
return {
"Content-Disposition": disposition,
"Cache-Control": f"private, max-age={ARTIFACT_URL_EXPIRY_SECONDS}" if is_signed else "private, no-cache",
"X-Content-Type-Options": "nosniff",
}
@base_router.get(
"/artifacts/{artifact_id}/content",
tags=["Artifacts"],
@ -1529,16 +1627,15 @@ async def get_artifact_content(
status_code=http_status.HTTP_404_NOT_FOUND,
detail="Artifact content not available",
)
media_type = _ARTIFACT_CONTENT_TYPES.get(artifact.artifact_type, _ARTIFACT_CONTENT_TYPE_DEFAULT)
media_type, content_disposition = _artifact_response_config(artifact)
is_signed = sig is not None and expiry is not None and kid is not None
cache_control = f"private, max-age={ARTIFACT_URL_EXPIRY_SECONDS}" if is_signed else "private, no-cache"
return Response(
content=content,
media_type=media_type,
headers={
"Content-Disposition": "inline",
"Cache-Control": cache_control,
},
headers=_artifact_content_response_headers(
disposition=content_disposition,
is_signed=is_signed,
),
)
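
Tracing the helpers above by hand on two hostile filenames (worked from the code as shown; run against the module to confirm):

header = _build_attachment_disposition("报告.pdf")  # pure-CJK stem
expected = "attachment; filename=\"download.pdf\"; filename*=UTF-8''%E6%8A%A5%E5%91%8A.pdf"
assert header == expected  # ASCII fallback becomes download.pdf; RFC 5987 form keeps the full name

spoof = _sanitize_header_filename("evil\u202efdp.exe")  # bidi override (U+202E, category Cf)
assert spoof == "evilfdp.exe"  # the override is stripped, never round-tripped to the browser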

View file

@ -46,6 +46,7 @@ class TaskV2(BaseModel):
webhook_failure_reason: str | None = None
extracted_information_schema: dict | list | str | None = None
error_code_mapping: dict | None = None
workflow_system_prompt: str | None = None
model: dict[str, Any] | None = None
queued_at: datetime | None = None
started_at: datetime | None = None
@ -203,6 +204,7 @@ class TaskV2Request(BaseModel):
publish_workflow: bool = False
extracted_information_schema: dict | list | str | None = None
error_code_mapping: dict[str, str] | None = None
workflow_system_prompt: str | None = None
max_screenshot_scrolls: int | None = None
extra_http_headers: dict[str, str] | None = None
browser_address: str | None = None

View file

@ -69,6 +69,11 @@ class TaskBase(BaseModel):
}
],
)
workflow_system_prompt: str | None = Field(
default=None,
description="System prompt applied to every LLM call for this task. Inherited from the workflow-level workflow_system_prompt when unset.",
examples=["Never guess at an answer. If unsure, respond with UNKNOWN."],
)
proxy_location: ProxyLocationInput = Field(
default=None,
description=PROXY_LOCATION_DOC_STRING,

View file

@ -1,4 +1,5 @@
import copy
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Self
import structlog
@ -26,7 +27,7 @@ from skyvern.forge.sdk.schemas.organizations import Organization
from skyvern.forge.sdk.schemas.tasks import TaskStatus
from skyvern.forge.sdk.services.bitwarden import BitwardenConstants, BitwardenService
from skyvern.forge.sdk.services.credentials import AzureVaultConstants, OnePasswordConstants, parse_totp_secret
from skyvern.forge.sdk.workflow.exceptions import OutputParameterKeyCollisionError
from skyvern.forge.sdk.workflow.exceptions import MissingJinjaVariables, OutputParameterKeyCollisionError
from skyvern.forge.sdk.workflow.models.parameter import (
PARAMETER_TYPE,
AWSSecretParameter,
@ -45,6 +46,7 @@ from skyvern.forge.sdk.workflow.models.parameter import (
WorkflowParameterType,
)
from skyvern.utils.strings import generate_random_string
from skyvern.utils.templating import get_missing_variables
if TYPE_CHECKING:
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRunParameter
@ -83,6 +85,7 @@ class WorkflowRunContext:
],
block_outputs: dict[str, Any] | None = None,
workflow: "Workflow | None" = None,
inherited_workflow_system_prompt: str | None = None,
) -> Self:
# key is label name
workflow_run_context = cls(
@ -92,6 +95,7 @@ class WorkflowRunContext:
workflow_run_id=workflow_run_id,
aws_client=aws_client,
workflow=workflow,
inherited_workflow_system_prompt=inherited_workflow_system_prompt,
)
workflow_run_context.organization_id = organization.organization_id
@ -170,12 +174,34 @@ class WorkflowRunContext:
workflow_run_id: str,
aws_client: AsyncAWSClient,
workflow: "Workflow | None" = None,
inherited_workflow_system_prompt: str | None = None,
) -> None:
self.workflow_title = workflow_title
self.workflow_id = workflow_id
self.workflow_permanent_id = workflow_permanent_id
self.workflow_run_id = workflow_run_id
self.workflow = workflow
# Joined raw workflow_system_prompt(s) from ancestor workflows (outermost
# first) collected by walking workflow_run.parent_workflow_run_id at
# execute_workflow time. Jinja-rendered on demand and concatenated with
# this workflow's own workflow_system_prompt inside
# resolve_effective_workflow_system_prompt so parent-workflow rules flow
# into every child block and agent (SKY-9147).
self.inherited_workflow_system_prompt = inherited_workflow_system_prompt
# Lazy-resolved cache for the effective workflow_system_prompt. A separate
# ``_resolved`` flag (rather than a None sentinel) keeps "resolved to None"
# distinguishable from "not yet resolved". Invalidated by set_workflow() because late
# hydration can change the workflow's own workflow_system_prompt.
self._effective_workflow_system_prompt_cache: str | None = None
self._effective_workflow_system_prompt_resolved: bool = False
# Per-block record of the effective workflow_system_prompt once a block
# has run through ``Block._apply_workflow_system_prompt``. Keyed by
# block label. ``None`` is a valid recorded value (block opted out).
# Read by the script path (``RealSkyvernPageAi.ai_extract``) so a
# cached-script extraction uses the same string the agent path would
# — single source of truth, no re-resolving the opt-out from the
# workflow definition in two places (SKY-9147).
self._block_workflow_system_prompts: dict[str, str | None] = {}
self.blocks_metadata: dict[str, BlockMetadata] = {}
self.parameters: dict[str, PARAMETER_TYPE] = {}
self.values: dict[str, Any] = {}
@ -193,6 +219,12 @@ class WorkflowRunContext:
This is used when the workflow is fetched from the database as a fallback.
"""
self.workflow = workflow
# Late-hydrated workflow may carry a different workflow_system_prompt than
# what was visible at construction time. Drop the cache so the next
# resolve_effective_workflow_system_prompt() re-renders against the new
# definition.
self._effective_workflow_system_prompt_resolved = False
self._effective_workflow_system_prompt_cache = None
def get_parameter(self, key: str) -> Parameter:
return self.parameters[key]
@ -225,6 +257,108 @@ class WorkflowRunContext:
label = ""
return self.blocks_metadata.get(label, BlockMetadata())
def record_block_workflow_system_prompt(self, label: str, value: str | None) -> None:
"""Record the effective ``workflow_system_prompt`` a block resolved to.
Called by ``Block._apply_workflow_system_prompt`` (agent path) and by
the script-path dispatch before handing execution to cached code. Both
paths use the same recorded value in ``ai_extract`` so agent and
script extractions for the same block hash to the same cache key and
the same LLM input.
"""
if label:
self._block_workflow_system_prompts[label] = value
def get_block_workflow_system_prompt(self, label: str | None) -> tuple[bool, str | None]:
"""Return ``(recorded, value)`` for a block label.
``recorded`` is True only when the block has actually run through
``_apply_workflow_system_prompt``; a recorded ``None`` (opt-out) is
distinguished from "never recorded" so callers can fall back safely
for non-block invocations (e.g. standalone scripts).
"""
if label and label in self._block_workflow_system_prompts:
return True, self._block_workflow_system_prompts[label]
return False, None
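
# The ``(recorded, value)`` contract in practice, as a sketch (``ctx`` is a
# WorkflowRunContext; the labels are illustrative):
#
#     ctx.record_block_workflow_system_prompt("extract_prices", None)  # opt-out
#     ctx.get_block_workflow_system_prompt("extract_prices")
#         # (True, None): real opt-out, send no system prompt
#     ctx.get_block_workflow_system_prompt("other_block")
#         # (False, None): never recorded, fall back to the run-wide prompt
#     ctx.get_block_workflow_system_prompt(None)
#         # (False, None): non-block caller, same fallback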
def resolve_effective_workflow_system_prompt(self) -> str | None:
"""Return the effective workflow-level system prompt for this run.
Concatenates any prompt inherited from ancestor workflows (propagated via
``WorkflowTriggerBlock`` outermost first) with this workflow's own
``workflow_system_prompt``. Jinja substitutions are rendered against this
run's values for both portions so ancestor templates can still reference
common variables like ``workflow_title``; placeholders that only exist in
the parent's context render empty under non-strict mode. Parts join with
a blank line so distinct rule sets stay readable to the LLM. Returns
``None`` when nothing is configured at any level so callers can short-
circuit on a simple falsy check.
The resolved string is cached on first call and reused for the life of
the run so every block sees the same effective prompt and the LLM
cache keys that derive from it stay stable across blocks. The cache is
invalidated in ``set_workflow`` for the late-hydration path.
"""
if self._effective_workflow_system_prompt_resolved:
return self._effective_workflow_system_prompt_cache
own_raw: str | None = None
if self.workflow is not None and self.workflow.workflow_definition is not None:
candidate = self.workflow.workflow_definition.workflow_system_prompt
# ``isinstance`` guard: a malformed workflow definition (or a test
# MagicMock whose attribute access returns another mock) could
# hand us a non-string here. Jinja's ``from_string`` would then
# raise ``Can't compile non template nodes`` deep inside the
# render path. Narrowing to ``str`` keeps the fallback silent.
if isinstance(candidate, str):
own_raw = candidate
inherited = (
self.inherited_workflow_system_prompt if isinstance(self.inherited_workflow_system_prompt, str) else None
)
inherited_resolved = self.render_workflow_level_template(inherited) if inherited else None
own_resolved = self.render_workflow_level_template(own_raw) if own_raw else None
parts = [p for p in (inherited_resolved, own_resolved) if p]
resolved = "\n\n".join(parts) if parts else None
self._effective_workflow_system_prompt_cache = resolved
self._effective_workflow_system_prompt_resolved = True
return resolved
def render_workflow_level_template(self, raw_template: str) -> str:
"""Render a Jinja template against workflow-scoped variables only.
Shared by every path that resolves the workflow-level workflow_system_prompt
(block execution, script-path ai_extract) so both produce the same string:
same cache key, same LLM output. Deliberately omits block-scoped context:
a workflow-wide prompt has no single "current block" to bind against.
"""
if not raw_template:
return raw_template
template_data: dict[str, Any] = self.values.copy()
template_data.setdefault("workflow_title", self.workflow_title)
template_data.setdefault("workflow_id", self.workflow_id)
template_data.setdefault("workflow_permanent_id", self.workflow_permanent_id)
template_data.setdefault("workflow_run_id", self.workflow_run_id)
template_data.setdefault("browser_session_id", self.browser_session_id or "")
template_data.setdefault("current_date", datetime.now(timezone.utc).strftime("%Y-%m-%d"))
template_data["workflow_run_outputs"] = self.workflow_run_outputs
template_data["workflow_run_summary"] = self.build_workflow_run_summary()
if missing_variables := get_missing_variables(raw_template, template_data):
if settings.WORKFLOW_TEMPLATING_STRICTNESS == "strict":
raise MissingJinjaVariables(template=raw_template, variables=missing_variables)
# Non-strict mode silently renders undefined variables as empty strings,
# which makes typos like {{ persona }} invisible to the user. Emit a
# warning so the operator has a breadcrumb when a workflow_system_prompt
# isn't picking up the value they expected.
LOG.warning(
"Undefined Jinja variables in workflow-level template; rendering them as empty strings",
missing_variables=missing_variables,
workflow_run_id=self.workflow_run_id,
workflow_permanent_id=self.workflow_permanent_id,
)
return jinja_sandbox_env.from_string(raw_template).render(template_data)
async def _should_include_secrets_in_templates(self) -> bool:
"""
Check if secrets should be included in template formatting based on experimentation provider.
@ -1266,6 +1400,7 @@ class WorkflowContextManager:
],
block_outputs: dict[str, Any] | None = None,
workflow: "Workflow | None" = None,
inherited_workflow_system_prompt: str | None = None,
) -> WorkflowRunContext:
workflow_run_context = await WorkflowRunContext.init(
self.aws_client,
@ -1280,6 +1415,7 @@ class WorkflowContextManager:
secret_parameters,
block_outputs,
workflow,
inherited_workflow_system_prompt=inherited_workflow_system_prompt,
)
self.workflow_run_contexts[workflow_run_id] = workflow_run_context
return workflow_run_context
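
The concatenation behaviour of resolve_effective_workflow_system_prompt, sketched with plain Jinja in place of the sandboxed environment (titles and values are illustrative):

from jinja2 import Environment

env = Environment()  # stand-in for jinja_sandbox_env

def render(raw: str) -> str:
    return env.from_string(raw).render({"workflow_title": "Invoice sweep"})

inherited = "Parent rule: cite {{ workflow_title }} in every answer."
own = "Child rule: never guess; say UNKNOWN."
parts = [p for p in (render(inherited), render(own)) if p]
effective = "\n\n".join(parts) if parts else None
assert effective == (
    "Parent rule: cite Invoice sweep in every answer.\n\nChild rule: never guess; say UNKNOWN."
)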

View file

@ -261,6 +261,19 @@ class Block(BaseModel, abc.ABC):
continue_on_failure: bool = False
model: dict[str, Any] | None = None
disable_cache: bool = False
# Opt-out from workflow-level workflow_system_prompt inheritance (and, on a
# WorkflowTriggerBlock, from propagating the parent chain's prompt into the
# spawned child run). A no-op for deterministic blocks that don't call an LLM.
ignore_workflow_system_prompt: bool = False
# Runtime cache populated by ``Block._apply_workflow_system_prompt`` — not
# user-settable. Excluded from serialization (``model_dump`` / JSON / API
# responses) so the resolved prompt doesn't leak into logs, workflow
# definition round-trips, or responses that weren't meant to carry it.
# Deliberately absent from the BlockYAML schema so it can never be set
# through YAML or the API. The user-facing opt-out is
# ``ignore_workflow_system_prompt``. Only consumed by block types that call
# an LLM; deterministic blocks ignore it.
workflow_system_prompt: str | None = Field(default=None, exclude=True)
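
# A hedged sketch of what ``exclude=True`` buys here, assuming Pydantic v2
# (which ``Field(default=None, exclude=True)`` implies); ``DemoBlock`` is a
# stand-in, not the real Block model:
#
#     from pydantic import BaseModel, Field
#
#     class DemoBlock(BaseModel):
#         label: str
#         workflow_system_prompt: str | None = Field(default=None, exclude=True)
#
#     block = DemoBlock(label="extract", workflow_system_prompt="Be terse.")
#     assert block.workflow_system_prompt == "Be terse."         # usable at runtime
#     assert "workflow_system_prompt" not in block.model_dump()  # never serialized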
# Only valid for blocks inside a for loop block
# Whether to continue to the next iteration when the block fails
@ -517,6 +530,42 @@ class Block(BaseModel, abc.ABC):
return template.render(template_data)
def _apply_workflow_system_prompt(
self,
workflow_run_context: WorkflowRunContext,
) -> None:
"""Resolve the workflow-level ``workflow_system_prompt`` for this block and
materialize it onto ``self.workflow_system_prompt``.
Concatenates any prompt inherited from ancestor workflows (propagated through
``WorkflowTriggerBlock``) with this workflow's own ``workflow_system_prompt``.
Jinja substitutions on this workflow's own prompt are resolved against
``workflow_run_context``; the inherited portion is already resolved at the
trigger boundary.
Shared by every block type that needs to inherit the workflow system prompt
into its own ``workflow_system_prompt`` runtime cache before dispatching an
LLM call. Callers invoke this inside ``format_potential_template_parameters``
so the value is available at execute time. ``workflow_system_prompt`` on each
block is a runtime cache; it's deliberately absent from the BlockYAML schema
and not user-settable.
When a block opts out via ``ignore_workflow_system_prompt``, this leaves
the block's own ``workflow_system_prompt`` untouched (falling back to the
system default if none is set). The opt-out covers both this workflow's
prompt and any inherited prompt from parent workflows.
"""
if self.ignore_workflow_system_prompt:
# Record the opt-out so the script path (``ai_extract``) reads the
# same decision instead of re-resolving the flag from the
# definition. See ``WorkflowRunContext.record_block_workflow_system_prompt``.
workflow_run_context.record_block_workflow_system_prompt(self.label, None)
return
resolved = workflow_run_context.resolve_effective_workflow_system_prompt()
if resolved is not None:
self.workflow_system_prompt = resolved
workflow_run_context.record_block_workflow_system_prompt(self.label, resolved)
@classmethod
def get_subclasses(cls) -> tuple[type[Block], ...]:
return tuple(cls.__subclasses__())
@ -809,6 +858,10 @@ class BaseTaskBlock(Block):
for error_code, error_description in merged_mapping.items()
}
# Materialize the workflow-level workflow_system_prompt onto this block so
# ForgeAgent.create_task can hand it off to the Task row verbatim.
self._apply_workflow_system_prompt(workflow_run_context)
@staticmethod
async def get_task_order(workflow_run_id: str, current_retry: int) -> tuple[int, int]:
"""
@ -2787,6 +2840,8 @@ class TextPromptBlock(Block):
if self.json_schema:
self.json_schema = self._render_schema_templates(self.json_schema, workflow_run_context)
self._apply_workflow_system_prompt(workflow_run_context)
async def send_prompt(
self,
prompt: str,
@ -2839,6 +2894,7 @@ class TextPromptBlock(Block):
response = await llm_api_handler(
prompt=prompt,
prompt_name="text-prompt",
system_prompt=self.workflow_system_prompt,
workflow_run_block_id=workflow_run_block_id,
organization_id=organization_id,
)
@ -3786,6 +3842,8 @@ class FileParserBlock(Block):
self.file_url, workflow_run_context
)
self._apply_workflow_system_prompt(workflow_run_context)
def _detect_file_type_from_url(self, file_url: str, file_path: str | None = None) -> FileType:
"""Detect file type based on file extension in the URL, with magic-byte fallback."""
url_parsed = urlparse(file_url)
@ -4024,6 +4082,8 @@ class FileParserBlock(Block):
llm_api_handler = LLMAPIHandlerFactory.get_override_llm_api_handler(
self.override_llm_key, default=app.LLM_API_HANDLER
)
# OCR transcription intentionally skips system_prompt; it still applies
# to the downstream extract-information-from-file-text call.
llm_response = await llm_api_handler(
prompt=llm_prompt,
prompt_name="extract-text-from-image",
@ -4055,6 +4115,8 @@ class FileParserBlock(Block):
llm_api_handler = LLMAPIHandlerFactory.get_override_llm_api_handler(
self.override_llm_key, default=app.LLM_API_HANDLER
)
# OCR transcription intentionally skips system_prompt — see
# _parse_pdf_file_with_vision_ocr for rationale.
llm_response = await llm_api_handler(
prompt=llm_prompt,
prompt_name="extract-text-from-image",
@ -4175,6 +4237,7 @@ class FileParserBlock(Block):
prompt=llm_prompt,
prompt_name="extract-information-from-file-text",
force_dict=False,
system_prompt=self.workflow_system_prompt,
workflow_run_block_id=workflow_run_block_id,
organization_id=organization_id,
)
@ -4348,6 +4411,8 @@ class PDFParserBlock(Block):
self.file_url, workflow_run_context
)
self._apply_workflow_system_prompt(workflow_run_context)
async def execute(
self,
workflow_run_id: str,
@ -4416,6 +4481,7 @@ class PDFParserBlock(Block):
prompt=llm_prompt,
prompt_name="extract-information-from-file-text",
force_dict=False,
system_prompt=self.workflow_system_prompt,
workflow_run_block_id=workflow_run_block_id,
organization_id=organization_id,
)
@ -4837,6 +4903,10 @@ class TaskV2Block(Block):
)
self.totp_verification_url = prepend_scheme_and_validate_url(self.totp_verification_url)
# Materialize the workflow-level workflow_system_prompt onto this block so
# execute() can hand it off to the TaskV2 row verbatim.
self._apply_workflow_system_prompt(workflow_run_context)
async def execute(
self,
workflow_run_id: str,
@ -4912,6 +4982,7 @@ class TaskV2Block(Block):
totp_identifier=resolved_totp_identifier,
totp_verification_url=resolved_totp_verification_url,
max_screenshot_scrolling_times=workflow_run.max_screenshot_scrolls,
workflow_system_prompt=self.workflow_system_prompt,
)
await app.DATABASE.observer.update_task_v2(
task_v2.observer_cruise_id, status=TaskV2Status.queued, organization_id=organization_id
@ -7091,6 +7162,7 @@ class WorkflowTriggerBlock(Block):
workflow_permanent_id=resolved_workflow_permanent_id,
organization=organization,
parent_workflow_run_id=workflow_run_id,
ignore_inherited_workflow_system_prompt=self.ignore_workflow_system_prompt,
)
except Exception as e:
error_msg = get_user_facing_exception_message(e)
@ -7106,6 +7178,9 @@ class WorkflowTriggerBlock(Block):
)
try:
# The opt-out flag is persisted on the child's workflow_run row at
# spawn time (setup_workflow_run above), so execute_workflow reads
# it from the DB. This works identically for sync and async triggers.
final_run = await app.WORKFLOW_SERVICE.execute_workflow(
workflow_run_id=triggered_run_id,
api_key=None,
@ -7173,6 +7248,12 @@ class WorkflowTriggerBlock(Block):
browser_session_id=resolved_browser_session_id,
)
try:
# ``run_workflow`` persists this flag to the child's
# workflow_run row via its internal setup_workflow_run call,
# then dispatches to Temporal without passing the flag
# separately; the worker reads it back from the DB inside
# ``execute_workflow``. Symmetric with the sync branch above
# — the flag is written once, at spawn time, for both paths.
triggered_workflow_run = await run_workflow(
workflow_id=resolved_workflow_permanent_id,
organization=organization,
@ -7180,6 +7261,7 @@ class WorkflowTriggerBlock(Block):
request=None,
background_tasks=None,
parent_workflow_run_id=workflow_run_id,
ignore_inherited_workflow_system_prompt=self.ignore_workflow_system_prompt,
)
except Exception as e:
error_msg = get_user_facing_exception_message(e)

View file

@@ -56,6 +56,7 @@ class WorkflowDefinition(BaseModel):
blocks: List[BlockTypeVar]
finally_block_label: str | None = None
error_code_mapping: dict[str, str] | None = None
workflow_system_prompt: str | None = None
def validate(self) -> None:
all_labels: set[str] = set()
@@ -199,6 +200,7 @@ class WorkflowRun(BaseModel):
code_gen: bool | None = None
trigger_type: WorkflowRunTriggerType | None = None
workflow_schedule_id: str | None = None
ignore_inherited_workflow_system_prompt: bool = False
@field_validator("run_with", mode="before")
@classmethod

View file

@@ -80,6 +80,7 @@ from skyvern.forge.sdk.workflow.models.block import (
ForLoopBlock,
NavigationBlock,
TaskV2Block,
WorkflowTriggerBlock,
compute_conditional_scopes,
get_all_blocks,
)
@@ -649,6 +650,7 @@ class WorkflowService:
workflow_run_id: str | None = None,
trigger_type: WorkflowRunTriggerType | None = None,
workflow_schedule_id: str | None = None,
ignore_inherited_workflow_system_prompt: bool = False,
) -> WorkflowRun:
"""
Create a workflow run and its parameters. Validate the workflow and the organization. If there are missing
@@ -708,6 +710,7 @@ class WorkflowService:
workflow_run_id=workflow_run_id,
trigger_type=trigger_type,
workflow_schedule_id=workflow_schedule_id,
ignore_inherited_workflow_system_prompt=ignore_inherited_workflow_system_prompt,
)
LOG.info(
f"Created workflow run {workflow_run.workflow_run_id} for workflow {workflow.workflow_id}",
@@ -940,6 +943,79 @@ class WorkflowService:
return None
async def _collect_inherited_workflow_system_prompt(
self,
parent_workflow_run_id: str | None,
) -> str | None:
"""Walk up the parent workflow-run chain and join each ancestor's raw
``workflow_system_prompt`` (outermost first). Returns None when no ancestor
has one set. A depth cap matches ``WorkflowTriggerBlock.MAX_TRIGGER_DEPTH``
to keep the traversal bounded against malformed chains.
This reads raw prompt strings from each ancestor's ``workflow_definition``
without Jinja rendering; the child's context will render them later via
``WorkflowRunContext.resolve_effective_workflow_system_prompt``. Using raw
strings here avoids depending on the parent's live ``WorkflowRunContext``,
which isn't available for async/fire-and-forget child runs.
Chain-break on opt-out: when an ancestor has ``ignore_inherited_workflow_system_prompt``
set, its own prompt is still included (it ran without its own ancestors'
prompts, but its own rules remain its statement to descendants), but the
traversal stops there. A workflow explicitly opting out of its parents'
rules creates a clean boundary for itself and everything it triggers
otherwise descendants would silently reintroduce prompts the opted-out
workflow rejected.
"""
# Two-phase walk to keep DB round trips bounded. Phase 1 is an
# inherently sequential chain walk (each ``parent_workflow_run_id`` is
# only known after fetching the previous run), capped at
# ``MAX_TRIGGER_DEPTH``. Phase 2 batches the independent workflow-
# definition fetches with ``asyncio.gather`` so all N definition
# lookups happen in one concurrent burst instead of N sequential
# awaits — brings the worst case from 2N round trips down to
# N + 1 (depth-bounded at 10). A deeper optimization (single
# recursive CTE across workflow_runs + workflows) is possible if
# trigger chains ever get deep enough to matter.
chain: list[tuple[str, bool]] = [] # [(workflow_id, ignore_inherited), ...] closest ancestor first
current_parent_id: str | None = parent_workflow_run_id
visited: set[str] = set()
depth = 0
while current_parent_id and depth < WorkflowTriggerBlock.MAX_TRIGGER_DEPTH:
if current_parent_id in visited:
break
visited.add(current_parent_id)
parent_run = await app.DATABASE.workflow_runs.get_workflow_run(current_parent_id)
if parent_run is None:
break
chain.append((parent_run.workflow_id, parent_run.ignore_inherited_workflow_system_prompt))
if parent_run.ignore_inherited_workflow_system_prompt:
break
current_parent_id = parent_run.parent_workflow_run_id
depth += 1
if not chain:
return None
# Fetch all ancestor workflow definitions concurrently.
ancestor_workflows = await asyncio.gather(
*(self.get_workflow(workflow_id=workflow_id) for workflow_id, _ in chain),
return_exceptions=False,
)
prompts: list[str] = []
for workflow in ancestor_workflows:
if workflow is None or workflow.workflow_definition is None:
continue
raw = workflow.workflow_definition.workflow_system_prompt
if raw:
prompts.append(raw)
if not prompts:
return None
# Outermost ancestor first so child-local rules appear after broader rules.
prompts.reverse()
return "\n\n".join(prompts)
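# Self-contained model of the chain semantics implemented above, small
# enough to run standalone: prompts join outermost-first, and an ancestor
# that opted out still contributes its own prompt but stops the walk. The
# Ancestor shape is a hypothetical stand-in for workflow_runs rows plus
# their workflow_definition prompts.
from dataclasses import dataclass

@dataclass
class Ancestor:
    prompt: str | None
    ignore_inherited: bool = False

def collect(chain_child_to_root: list[Ancestor], max_depth: int = 10) -> str | None:
    collected: list[str] = []
    for depth, ancestor in enumerate(chain_child_to_root):
        if depth >= max_depth:
            break
        if ancestor.prompt:
            collected.append(ancestor.prompt)
        if ancestor.ignore_inherited:
            break  # clean boundary: own prompt kept, older ancestors dropped
    if not collected:
        return None
    collected.reverse()  # outermost ancestor first, child-local rules last
    return "\n\n".join(collected)

# The grandparent's prompt is dropped because the parent opted out.
chain = [Ancestor("parent rules", ignore_inherited=True), Ancestor("grandparent rules")]
assert collect(chain) == "parent rules"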
@traced(name="skyvern.workflow.execute", role="wrapper")
async def execute_workflow(
self,
@@ -951,7 +1027,15 @@ class WorkflowService:
browser_session_id: str | None = None,
need_call_webhook: bool = True,
) -> WorkflowRun:
"""Execute a workflow."""
"""Execute a workflow.
When the workflow_run row has ``ignore_inherited_workflow_system_prompt``
set (populated at spawn time by a ``WorkflowTriggerBlock`` whose
``ignore_workflow_system_prompt`` flag is True), the child workflow
starts with a clean slate: no inherited prompt from the ancestor
chain. Persisting the intent on the row means the flag is honored for
both sync and async (Temporal-dispatched) trigger modes.
"""
organization_id = organization.organization_id
LOG.info(
@@ -1008,6 +1092,19 @@ class WorkflowService:
# Get all <workflow parameter, workflow run parameter> tuples
wp_wps_tuples = await self.get_workflow_run_parameter_tuples(workflow_run_id=workflow_run_id)
workflow_output_parameters = await self.get_workflow_output_parameters(workflow_id=workflow.workflow_id)
# Collect the raw workflow_system_prompt from every ancestor workflow so child
# blocks inherit it (SKY-9147). We read each parent's workflow_definition from
# the DB because the parent's in-memory WorkflowRunContext may be gone by the
# time a fire-and-forget child runs on its own worker. Jinja placeholders in
# ancestor prompts are rendered against this run's values; parent-only
# parameters will simply render empty in non-strict mode.
inherited_workflow_system_prompt = (
None
if workflow_run.ignore_inherited_workflow_system_prompt
else await self._collect_inherited_workflow_system_prompt(
parent_workflow_run_id=workflow_run.parent_workflow_run_id,
)
)
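# Sketch of the non-strict rendering the comment above relies on: with
# jinja2's default Undefined, a parent-only placeholder renders as an empty
# string instead of raising. Skyvern's own template environment may add
# sandboxing on top; this only demonstrates the non-strict property.
from jinja2 import Template

ancestor_prompt = "Write dates as ISO-8601. Region: {{ parent_only_param }}."
rendered = Template(ancestor_prompt).render({"unrelated": "value"})
assert rendered == "Write dates as ISO-8601. Region: ."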
try:
await app.WORKFLOW_CONTEXT_MANAGER.initialize_workflow_run_context(
organization,
@@ -1021,6 +1118,7 @@ class WorkflowService:
secret_parameters,
block_outputs,
workflow,
inherited_workflow_system_prompt=inherited_workflow_system_prompt,
)
except Exception as e:
LOG.exception(
@@ -2121,6 +2219,22 @@ class WorkflowService:
block_label=block.label,
run_signature=script_block.run_signature,
)
# Script path skips the block's own execute() (which is where
# format_potential_template_parameters runs in the agent path),
# so we apply the workflow_system_prompt here to thread the
# block-resolved value into the ``WorkflowRunContext`` cache.
# ``ai_extract`` reads from that cache so the script-generated
# extraction honors ``ignore_workflow_system_prompt`` the same
# way the agent path does — same string, same cache key.
try:
workflow_run_context = app.WORKFLOW_CONTEXT_MANAGER.get_workflow_run_context(workflow_run_id)
block._apply_workflow_system_prompt(workflow_run_context)
except Exception:
LOG.warning(
"Failed to apply workflow_system_prompt for script-path block; continuing",
block_label=block.label,
exc_info=True,
)
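# Simplified sketch of the recorded-value semantics the comment above
# depends on: the per-block cache must distinguish "recorded None" (a real
# opt-out) from "never recorded" (fall back to the run-wide prompt). A
# (found, value) pair keeps the two cases apart; the real
# WorkflowRunContext API may differ in shape.
_block_prompts: dict[str, str | None] = {}

def record(label: str, value: str | None) -> None:
    _block_prompts[label] = value  # None is a deliberate opt-out

def get_recorded(label: str) -> tuple[bool, str | None]:
    if label in _block_prompts:
        return True, _block_prompts[label]
    return False, None

record("extract_prices", None)  # block set ignore_workflow_system_prompt
assert get_recorded("extract_prices") == (True, None)  # opt-out, not a miss
assert get_recorded("other_block") == (False, None)  # miss -> run-wide fallback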
block_exec_start = time.monotonic()
try:
vars_dict = vars(loaded_script_module) if loaded_script_module else {}
@@ -3750,6 +3864,7 @@ class WorkflowService:
workflow_run_id: str | None = None,
trigger_type: WorkflowRunTriggerType | None = None,
workflow_schedule_id: str | None = None,
ignore_inherited_workflow_system_prompt: bool = False,
) -> WorkflowRun:
# validate the browser session or profile id
browser_profile_id = workflow_request.browser_profile_id
@@ -3840,6 +3955,7 @@ class WorkflowService:
workflow_run_id=workflow_run_id,
trigger_type=trigger_type,
workflow_schedule_id=workflow_schedule_id,
ignore_inherited_workflow_system_prompt=ignore_inherited_workflow_system_prompt,
)
async def _update_workflow_run_status(

View file

@@ -327,6 +327,7 @@ def convert_workflow_definition(
version=dag_version,
finally_block_label=workflow_definition_yaml.finally_block_label,
error_code_mapping=workflow_definition_yaml.error_code_mapping,
workflow_system_prompt=workflow_definition_yaml.workflow_system_prompt,
)
LOG.info(
@@ -383,6 +384,7 @@ def _build_block_kwargs(
"continue_on_failure": block_yaml.continue_on_failure,
"next_loop_on_failure": block_yaml.next_loop_on_failure,
"model": block_yaml.model,
"ignore_workflow_system_prompt": block_yaml.ignore_workflow_system_prompt,
}

View file

@@ -15,7 +15,7 @@ from skyvern.client import (
RunSdkActionRequestAction_Validate,
)
from skyvern.config import settings
from skyvern.core.script_generations.skyvern_page_ai import SkyvernPageAi
from skyvern.core.script_generations.skyvern_page_ai import SYSTEM_PROMPT_UNSET, SkyvernPageAi
if TYPE_CHECKING:
from skyvern.library.skyvern_browser import SkyvernBrowser
@@ -168,13 +168,15 @@ class SdkSkyvernPageAi(SkyvernPageAi):
data: str | dict[str, Any] | None = None,
skip_refresh: bool = False,
include_extracted_text: bool = True,
system_prompt: str | None | Any = SYSTEM_PROMPT_UNSET,
) -> dict[str, Any] | list | str | None:
"""Extract information from the page using AI via API call.
Note: skip_refresh and include_extracted_text are accepted for Protocol
compatibility but not forwarded to the API. The server-side controls
both via the Task record on the SDK HTTP path. The optimizations only
take effect on the direct RealSkyvernPageAi path (MCP local browser).
Note: skip_refresh, include_extracted_text, and system_prompt are
accepted for Protocol compatibility but not forwarded to the API. The
server-side controls them via the Task record on the SDK HTTP path.
The optimizations only take effect on the direct RealSkyvernPageAi
path (MCP local browser).
"""
LOG.info("AI extract", prompt=prompt, workflow_run_id=self._browser.workflow_run_id)
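# The SYSTEM_PROMPT_UNSET default above is a sentinel that separates
# "caller said nothing" from an explicit system_prompt=None opt-out, which
# a plain None default could not express. A minimal sketch of the pattern;
# the real sentinel is defined in skyvern_page_ai and may differ.
_UNSET: object = object()

def extract(prompt: str, system_prompt: object = _UNSET) -> str:
    if system_prompt is _UNSET:
        return "resolve from workflow context"
    if system_prompt is None:
        return "send no system prompt"
    return f"send {system_prompt!r}"

assert extract("p") == "resolve from workflow context"
assert extract("p", system_prompt=None) == "send no system prompt"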

View file

@@ -321,6 +321,16 @@ def sanitize_workflow_yaml_with_references(workflow_yaml: dict[str, Any]) -> dic
workflow_definition["parameters"], old_output_key, new_output_key
)
# workflow_system_prompt is rendered through Jinja at execution time, so
# references inside it need the same rename treatment as block fields.
if isinstance(workflow_definition.get("workflow_system_prompt"), str):
workflow_definition["workflow_system_prompt"] = _replace_references_in_value(
workflow_definition["workflow_system_prompt"], old_output_key, new_output_key
)
workflow_definition["workflow_system_prompt"] = _replace_references_in_value(
workflow_definition["workflow_system_prompt"], old_label, new_label
)
# Step 4: Update all parameter key references
for old_key, new_key in param_key_mapping.items():
# Update Jinja references in blocks (e.g., {{ old_key }})
@@ -339,6 +349,11 @@ def sanitize_workflow_yaml_with_references(workflow_yaml: dict[str, Any]) -> dic
workflow_definition["parameters"], old_key, new_key
)
if isinstance(workflow_definition.get("workflow_system_prompt"), str):
workflow_definition["workflow_system_prompt"] = _replace_references_in_value(
workflow_definition["workflow_system_prompt"], old_key, new_key
)
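# Hypothetical simplification of what a rename pass like
# _replace_references_in_value must handle for the prompt field: rewrite
# {{ old }} tokens (with arbitrary inner whitespace, attribute access, or
# filters) without touching lookalike names. The real helper's behavior
# may be broader than this sketch.
import re

def rename_reference(value: str, old: str, new: str) -> str:
    pattern = r"{{\s*" + re.escape(old) + r"(\s*[}.|[])"
    return re.sub(pattern, "{{ " + new + r"\1", value)

prompt = "Use {{ login_block.output }} and {{ login_block }}."
assert rename_reference(prompt, "login_block", "signin_block") == (
    "Use {{ signin_block.output }} and {{ signin_block }}."
)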
# Rewrite workflow-level error_code_mapping atomically so substitutions don't chain
# (e.g. foo-bar -> foo_bar and foo_bar -> foo_bar_2 must not combine into foo_bar_2).
if "error_code_mapping" in workflow_definition:
@@ -614,6 +629,10 @@ class BlockYAML(BaseModel, abc.ABC):
)
continue_on_failure: bool = False
model: dict[str, Any] | None = None
# Opt-out from workflow-level workflow_system_prompt inheritance (and, on a
# WorkflowTriggerBlock, from propagating the parent chain's prompt into the
# spawned child run). A no-op for deterministic blocks that don't call an LLM.
ignore_workflow_system_prompt: bool = False
# Only valid for blocks inside a for loop block
# Whether to continue to the next iteration when the block fails
next_loop_on_failure: bool = False
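# Hypothetical payload shape for the two fields introduced in this diff:
# one run-wide prompt on the definition, one block opting out of it. Field
# names match the models in this file; the labels, block_type values, and
# prompt text are invented for illustration.
workflow_definition = {
    "workflow_system_prompt": "Never submit a form without confirmation.",
    "blocks": [
        {"label": "login", "block_type": "task"},
        {
            "label": "scrape_totals",
            "block_type": "extraction",
            # Declines the run-wide prompt; a no-op for blocks that never
            # call an LLM.
            "ignore_workflow_system_prompt": True,
        },
    ],
}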
@@ -1105,6 +1124,7 @@ class WorkflowDefinitionYAML(BaseModel):
blocks: list[BLOCK_YAML_TYPES]
finally_block_label: str | None = None
error_code_mapping: dict[str, str] | None = None
workflow_system_prompt: str | None = None
@model_validator(mode="after")
def validate_unique_block_labels(self) -> "WorkflowDefinitionYAML":

View file

@@ -145,6 +145,7 @@ async def _summarize_max_steps_failure_reason(
screenshots=screenshots,
prompt_name="task_v2_summarize-max-steps-reason",
thought=thought,
system_prompt=task_v2.workflow_system_prompt,
)
return json_response.get("reasoning", ""), json_response.get("failure_categories")
except Exception:
@@ -252,6 +253,7 @@ async def initialize_task_v2(
parent_workflow_run_id: str | None = None,
extracted_information_schema: dict | list | str | None = None,
error_code_mapping: dict | None = None,
workflow_system_prompt: str | None = None,
create_task_run: bool = False,
model: dict[str, Any] | None = None,
max_screenshot_scrolling_times: int | None = None,
@@ -270,6 +272,7 @@ async def initialize_task_v2(
proxy_location=proxy_location,
extracted_information_schema=extracted_information_schema,
error_code_mapping=error_code_mapping,
workflow_system_prompt=workflow_system_prompt,
model=model,
max_screenshot_scrolling_times=max_screenshot_scrolling_times,
extra_http_headers=extra_http_headers,
@@ -381,6 +384,7 @@ async def initialize_task_v2_metadata(
prompt=metadata_prompt,
thought=thought,
prompt_name="task_v2_generate_metadata",
system_prompt=task_v2.workflow_system_prompt,
)
# validate
@@ -829,6 +833,7 @@ async def run_task_v2_helper(
screenshots=scraped_page.screenshots,
thought=thought,
prompt_name="task_v2",
system_prompt=task_v2.workflow_system_prompt,
)
LOG.info(
"Task v2 response",
@@ -1079,6 +1084,7 @@ async def run_task_v2_helper(
screenshots=completion_screenshots,
thought=thought,
prompt_name="task_v2_check_completion",
system_prompt=task_v2.workflow_system_prompt,
)
LOG.info(
"Task v2 completion check response",
@@ -1465,6 +1471,7 @@ async def _generate_loop_task(
screenshots=scraped_page.screenshots,
thought=thought_task_in_loop,
prompt_name="task_v2_generate_task_block",
system_prompt=task_v2.workflow_system_prompt,
)
LOG.info("Task in loop metadata response", task_in_loop_metadata_response=task_in_loop_metadata_response)
navigation_goal = task_in_loop_metadata_response.get("navigation_goal")
@@ -1571,6 +1578,7 @@ async def _generate_extraction_task(
task_v2=task_v2,
prompt_name="task_v2_generate_extraction_task",
organization_id=task_v2.organization_id,
system_prompt=task_v2.workflow_system_prompt,
)
break
except (InvalidLLMResponseFormat, EmptyLLMResponseError) as e:
@@ -1971,6 +1979,7 @@ async def _summarize_task_v2(
screenshots=screenshots,
thought=thought,
prompt_name="task_v2_summary",
system_prompt=task_v2.workflow_system_prompt,
)
LOG.info("Task v2 summary response", task_v2_summary_resp=task_v2_summary_resp)

View file

@@ -27,6 +27,7 @@ async def prepare_workflow(
code_gen: bool | None = None,
parent_workflow_run_id: str | None = None,
trigger_type: WorkflowRunTriggerType | None = None,
ignore_inherited_workflow_system_prompt: bool = False,
) -> WorkflowRun:
"""
Prepare a workflow to be run.
@@ -47,6 +48,7 @@ async def prepare_workflow(
code_gen=code_gen,
parent_workflow_run_id=parent_workflow_run_id,
trigger_type=trigger_type,
ignore_inherited_workflow_system_prompt=ignore_inherited_workflow_system_prompt,
)
workflow = await app.WORKFLOW_SERVICE.get_workflow_by_permanent_id(
@@ -87,6 +89,7 @@ async def run_workflow(
block_outputs: dict[str, t.Any] | None = None,
parent_workflow_run_id: str | None = None,
trigger_type: WorkflowRunTriggerType | None = None,
ignore_inherited_workflow_system_prompt: bool = False,
) -> WorkflowRun:
workflow_run = await prepare_workflow(
workflow_id=workflow_id,
@@ -98,6 +101,7 @@ async def run_workflow(
request_id=request_id,
parent_workflow_run_id=parent_workflow_run_id,
trigger_type=trigger_type,
ignore_inherited_workflow_system_prompt=ignore_inherited_workflow_system_prompt,
)
await AsyncExecutorFactory.get_executor().execute_workflow(

View file

@@ -4463,6 +4463,7 @@ async def extract_information_for_navigation_goal(
error_code_mapping=error_code_mapping_str,
previous_extracted_information=post_ceiling_kwargs["previous_extracted_information"],
llm_key=llm_key_override,
workflow_system_prompt=task.workflow_system_prompt,
)
if is_retry_step:
# Proactively evict the in-run entry. The cross-run tier will be
@@ -4548,6 +4549,7 @@ async def extract_information_for_navigation_goal(
# reasons unrelated to cache correctness.
prompt_name="extract-information",
force_dict=False,
system_prompt=task.workflow_system_prompt,
)
# Apply the same post-processing the miss path applies so the
# comparison is apples-to-apples against the cached value.
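# Sketch of why workflow_system_prompt belongs in the extraction cache key:
# two runs with an identical page and prompt but different system prompts
# must not share a cached answer. The hashing scheme here is illustrative,
# not the cache's actual key derivation.
import hashlib
import json

def cache_key(rendered_prompt: str, workflow_system_prompt: str | None) -> str:
    payload = json.dumps(
        {"prompt": rendered_prompt, "system": workflow_system_prompt},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()

assert cache_key("extract totals", None) != cache_key("extract totals", "Respond in French.")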
@@ -4696,6 +4698,7 @@ async def extract_information_for_navigation_goal(
screenshots=scraped_page.screenshots,
prompt_name="extract-information",
force_dict=False,
system_prompt=task.workflow_system_prompt,
)
# Validate and fill missing fields based on schema