🔄 synced local 'skyvern/' with remote 'skyvern/'

This commit is contained in:
Shuchang Zheng 2026-04-27 23:47:58 +00:00
parent 157521fd3c
commit 669f2070ef
7 changed files with 262 additions and 80 deletions

View file

@ -350,11 +350,57 @@ class ArtifactManager:
``claim_session_download_artifacts_for_run`` UPDATE to tag rows
whose ``created_at`` falls inside the run's window.
"""
return await self._create_browser_session_artifact(
organization_id=organization_id,
browser_session_id=browser_session_id,
uri=uri,
filename=filename,
artifact_type=ArtifactType.DOWNLOAD,
checksum=checksum,
)
async def create_browser_session_recording_artifact(
self,
*,
organization_id: str,
browser_session_id: str,
uri: str,
filename: str,
checksum: str | None = None,
) -> str:
"""Register a session-scoped recording (video) as a RECORDING Artifact row.
Mirrors :meth:`create_browser_session_download_artifact`. Called from
``S3Storage.sync_browser_session_file(artifact_type="videos")`` once
Playwright finalizes the recording at session close. Idempotent on
``(organization_id, browser_session_id, uri)`` — re-runs of the
end-of-session sync are safe.
"""
return await self._create_browser_session_artifact(
organization_id=organization_id,
browser_session_id=browser_session_id,
uri=uri,
filename=filename,
artifact_type=ArtifactType.RECORDING,
checksum=checksum,
)
async def _create_browser_session_artifact(
self,
*,
organization_id: str,
browser_session_id: str,
uri: str,
filename: str,
artifact_type: ArtifactType,
checksum: str | None = None,
) -> str:
"""Shared idempotent insert keyed on ``(browser_session_id, uri, artifact_type)``."""
existing = await app.DATABASE.artifacts.find_artifact_for_browser_session(
organization_id=organization_id,
browser_session_id=browser_session_id,
uri=uri,
artifact_type=ArtifactType.DOWNLOAD,
artifact_type=artifact_type,
)
if existing is not None:
return existing.artifact_id
@ -362,17 +408,18 @@ class ArtifactManager:
artifact_id = generate_artifact_id()
await app.DATABASE.artifacts.create_artifact(
artifact_id=artifact_id,
artifact_type=ArtifactType.DOWNLOAD,
artifact_type=artifact_type,
uri=uri,
organization_id=organization_id,
browser_session_id=browser_session_id,
checksum=checksum,
)
LOG.debug(
"Registered session-scoped downloaded file as artifact",
"Registered session-scoped artifact",
artifact_id=artifact_id,
browser_session_id=browser_session_id,
filename=filename,
artifact_type=artifact_type.value,
)
return artifact_id

View file

@ -23,6 +23,7 @@ from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityT
from skyvern.forge.sdk.artifact.storage.base import (
FILE_EXTENTSION_MAP,
BaseStorage,
_file_infos_from_artifacts,
_file_infos_from_download_artifacts,
)
from skyvern.forge.sdk.models import Step
@ -369,19 +370,58 @@ class AzureStorage(BaseStorage):
in_progress=True,
)
async def list_recordings_in_browser_session(self, organization_id: str, browser_session_id: str) -> list[str]:
"""List all recording files for a browser session from Azure."""
uri = f"azure://{settings.AZURE_STORAGE_CONTAINER_ARTIFACTS}/v1/{settings.ENV}/{organization_id}/browser_sessions/{browser_session_id}/videos"
return [
f"azure://{settings.AZURE_STORAGE_CONTAINER_ARTIFACTS}/{file}"
for file in await self.async_client.list_files(uri=uri)
]
async def get_shared_recordings_in_browser_session(
self, organization_id: str, browser_session_id: str
) -> list[FileInfo]:
"""Get recording files with SAS URLs for a browser session."""
object_keys = await self.list_recordings_in_browser_session(organization_id, browser_session_id)
"""Get recording files for a browser session.
Artifact-first when the keyring is configured — see s3.py for the
rationale. Falls back to direct Azure LIST + SAS URLs for legacy
sessions and OSS-default deployments.
"""
if settings.ARTIFACT_CONTENT_HMAC_KEYRING:
try:
artifacts = await app.DATABASE.artifacts.list_artifacts_for_browser_session_by_type(
browser_session_id=browser_session_id,
organization_id=organization_id,
artifact_type=ArtifactType.RECORDING,
)
except Exception:
LOG.warning(
"Failed to look up browser-session recording artifacts; falling back to SAS URLs",
organization_id=organization_id,
browser_session_id=browser_session_id,
exc_info=True,
)
artifacts = []
artifacts = [
a for a in artifacts if a.uri and (a.uri.lower().endswith(".webm") or a.uri.lower().endswith(".mp4"))
]
if artifacts:
file_infos = await _file_infos_from_artifacts(artifacts, artifact_type=ArtifactType.RECORDING)
file_infos.sort(key=lambda f: (f.modified_at is not None, f.modified_at), reverse=True)
return file_infos
# Legacy fallback: keyring unset, DB raised, or session pre-cutover
# with no rows at all. SKY-9286: drop entirely after the bake-in
# window (target 2026-05-03) — every call here is a billable
# ListBlobs request.
return await self._get_shared_recordings_in_browser_session_via_listing(
organization_id=organization_id, browser_session_id=browser_session_id
)
async def _get_shared_recordings_in_browser_session_via_listing(
self, *, organization_id: str, browser_session_id: str
) -> list[FileInfo]:
# Direct Azure LIST: legacy fallback for pre-cutover sessions and
# OSS deployments without a keyring.
# SKY-9286: scheduled for removal once production sessions all have
# rows — every call here is a billable ListBlobs request.
listing_uri = f"azure://{settings.AZURE_STORAGE_CONTAINER_ARTIFACTS}/v1/{settings.ENV}/{organization_id}/browser_sessions/{browser_session_id}/videos"
object_keys = [
f"azure://{settings.AZURE_STORAGE_CONTAINER_ARTIFACTS}/{file}"
for file in await self.async_client.list_files(uri=listing_uri)
]
if len(object_keys) == 0:
return []
@ -667,16 +707,13 @@ class AzureStorage(BaseStorage):
tags = await self._get_tags_for_org(organization_id)
await self.async_client.upload_file_from_path(uri, local_file_path, tier=tier, tags=tags)
# See s3.py — DB is the single source of truth for both the user-facing
# listing and the agent's baseline / complete_on_download checks.
# Partials get a row with checksum=None so the agent can detect "still
# downloading"; the row is dropped on Chrome's atomic-rename
# ``Change.deleted`` event.
#
# Exceptions propagate so the watcher's bounded retry can recover from
# a transient DB outage — silently swallowing would leave the file in
# Azure with no row, invisible to baseline diffs.
if artifact_type == "downloads":
# See s3.py — DB is the single source of truth for the user-facing
# listing and the agent's baseline / complete_on_download checks.
# Partials get a row with checksum=None; the row is dropped on
# Chrome's atomic-rename ``Change.deleted`` event. Exceptions
# propagate so the watcher's bounded retry can recover from a
# transient DB outage — both ops are idempotent.
is_partial = remote_path.endswith(BROWSER_DOWNLOADING_SUFFIX)
checksum = None if is_partial else calculate_sha256_for_file(local_file_path)
await app.ARTIFACT_MANAGER.create_browser_session_download_artifact(
@ -686,6 +723,22 @@ class AzureStorage(BaseStorage):
filename=os.path.basename(remote_path),
checksum=checksum,
)
elif artifact_type == "videos":
# Recording uploaded once at session close — see s3.py. Artifact-
# row creation is best-effort: the only caller swallows
# exceptions without retry, so the gated legacy listing fallback
# in ``get_shared_recordings_in_browser_session`` is the safety
# net for missed writes (when the session has no RECORDING rows
# we fall through to the Azure LIST path, so a row-less recording
# still surfaces via the legacy SAS URL).
checksum = calculate_sha256_for_file(local_file_path)
await app.ARTIFACT_MANAGER.create_browser_session_recording_artifact(
organization_id=organization_id,
browser_session_id=browser_session_id,
uri=uri,
filename=os.path.basename(remote_path),
checksum=checksum,
)
return uri

View file

@ -10,16 +10,21 @@ from skyvern.forge.sdk.schemas.task_v2 import TaskV2, Thought
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock
async def _file_infos_from_download_artifacts(artifacts: list[Artifact]) -> list[FileInfo]:
"""Build the API-shaped ``FileInfo`` list from DOWNLOAD artifact rows.
async def _file_infos_from_artifacts(artifacts: list[Artifact], *, artifact_type: ArtifactType) -> list[FileInfo]:
"""Build the API-shaped ``FileInfo`` list from a homogeneous batch of
artifact rows (e.g. all DOWNLOAD or all RECORDING).
Filename is the URI basename (the save site writes ``{base_uri}/{file}``);
checksum and modified_at come straight from the row, so retrieval needs
zero S3 round-trips.
All artifacts in a single batch share the same organization (downloads are
scoped to a run, which is scoped to an org), so the per-org URL TTL is
resolved once and applied to every URL.
All artifacts in a single batch share the same organization (downloads /
recordings are scoped to a run or browser session, which is scoped to an
org), so the per-org URL TTL is resolved once and applied to every URL.
The ``artifact_type`` is only used for the URL's informational query
parameter — it does not affect the HMAC signature. Callers must pass rows
of a single type so the URL hint is correct.
"""
if not artifacts:
return []
@ -31,7 +36,7 @@ async def _file_infos_from_download_artifacts(artifacts: list[Artifact]) -> list
url = app.ARTIFACT_MANAGER.build_signed_content_url(
artifact_id=artifact.artifact_id,
artifact_name=filename,
artifact_type=ArtifactType.DOWNLOAD.value,
artifact_type=artifact_type.value,
expiry_seconds=expiry_seconds,
)
infos.append(
@ -46,6 +51,16 @@ async def _file_infos_from_download_artifacts(artifacts: list[Artifact]) -> list
return infos
async def _file_infos_from_download_artifacts(artifacts: list[Artifact]) -> list[FileInfo]:
"""Backward-compat alias for DOWNLOAD-typed callers.
Forwards to :func:`_file_infos_from_artifacts` with the DOWNLOAD type so
pre-existing import sites keep working without each having to thread the
artifact_type through.
"""
return await _file_infos_from_artifacts(artifacts, artifact_type=ArtifactType.DOWNLOAD)
# TODO: This should be a part of the ArtifactType model
FILE_EXTENTSION_MAP: dict[ArtifactType, str] = {
ArtifactType.RECORDING: "webm",
@ -194,10 +209,6 @@ class BaseStorage(ABC):
) -> list[FileInfo]:
pass
@abstractmethod
async def list_recordings_in_browser_session(self, organization_id: str, browser_session_id: str) -> list[str]:
pass
@abstractmethod
async def get_shared_recordings_in_browser_session(
self, organization_id: str, browser_session_id: str

View file

@ -314,8 +314,10 @@ class LocalStorage(BaseStorage):
) -> list[str]:
return []
async def list_recordings_in_browser_session(self, organization_id: str, browser_session_id: str) -> list[str]:
"""List all recording files for a browser session from local storage.
async def get_shared_recordings_in_browser_session(
self, organization_id: str, browser_session_id: str
) -> list[FileInfo]:
"""Get recording files with URLs for a browser session from local storage.
Videos are synced to the browser_sessions storage path when the session closes.
"""
@ -328,20 +330,14 @@ class LocalStorage(BaseStorage):
/ "videos"
)
recording_files: list[str] = []
if videos_base.exists():
for root, _, files in os.walk(videos_base):
for file in files:
file_path = Path(root) / file
recording_files.append(f"file://{file_path}")
if not videos_base.exists():
return []
return recording_files
file_uris: list[str] = []
for root, _, files in os.walk(videos_base):
for file in files:
file_uris.append(f"file://{Path(root) / file}")
async def get_shared_recordings_in_browser_session(
self, organization_id: str, browser_session_id: str
) -> list[FileInfo]:
"""Get recording files with URLs for a browser session from local storage."""
file_uris = await self.list_recordings_in_browser_session(organization_id, browser_session_id)
if not file_uris:
return []

View file

@ -25,6 +25,7 @@ from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType, LogEntityT
from skyvern.forge.sdk.artifact.storage.base import (
FILE_EXTENTSION_MAP,
BaseStorage,
_file_infos_from_artifacts,
_file_infos_from_download_artifacts,
)
from skyvern.forge.sdk.models import Step
@ -433,17 +434,61 @@ class S3Storage(BaseStorage):
in_progress=True,
)
async def list_recordings_in_browser_session(self, organization_id: str, browser_session_id: str) -> list[str]:
"""List all recording files for a browser session from S3."""
bucket = settings.AWS_S3_BUCKET_ARTIFACTS
uri = f"s3://{bucket}/{self._PATH_VERSION}/{settings.ENV}/{organization_id}/browser_sessions/{browser_session_id}/videos"
return [f"s3://{bucket}/{file}" for file in await self.async_client.list_files(uri=uri)]
async def get_shared_recordings_in_browser_session(
self, organization_id: str, browser_session_id: str
) -> list[FileInfo]:
"""Get recording files with presigned URLs for a browser session."""
object_keys = await self.list_recordings_in_browser_session(organization_id, browser_session_id)
"""Get recording files for a browser session.
Artifact-first when the keyring is configured: query RECORDING rows
scoped to the session and build short signed ``/v1/artifacts/{id}/content``
URLs from them — no S3 round-trip per file. Falls back to direct S3
LIST + presigned URLs for legacy sessions and OSS-default deployments
without HMAC signing.
"""
if settings.ARTIFACT_CONTENT_HMAC_KEYRING:
try:
artifacts = await app.DATABASE.artifacts.list_artifacts_for_browser_session_by_type(
browser_session_id=browser_session_id,
organization_id=organization_id,
artifact_type=ArtifactType.RECORDING,
)
except Exception:
LOG.warning(
"Failed to look up browser-session recording artifacts; falling back to presigned S3 URLs",
organization_id=organization_id,
browser_session_id=browser_session_id,
exc_info=True,
)
artifacts = []
# Defensive extension filter — same as the legacy listing path —
# in case a non-recording row sneaks under the same browser_session_id.
artifacts = [
a for a in artifacts if a.uri and (a.uri.lower().endswith(".webm") or a.uri.lower().endswith(".mp4"))
]
if artifacts:
file_infos = await _file_infos_from_artifacts(artifacts, artifact_type=ArtifactType.RECORDING)
# Newest first — match the legacy listing path's ordering.
file_infos.sort(key=lambda f: (f.modified_at is not None, f.modified_at), reverse=True)
return file_infos
# Legacy fallback: keyring unset, DB raised, or session pre-cutover
# with no rows at all. SKY-9286: drop entirely after the bake-in
# window (target 2026-05-03) — every call here is a billable
# ListObjects request.
return await self._get_shared_recordings_in_browser_session_via_listing(
organization_id=organization_id, browser_session_id=browser_session_id
)
async def _get_shared_recordings_in_browser_session_via_listing(
self, *, organization_id: str, browser_session_id: str
) -> list[FileInfo]:
# Direct S3 LIST: legacy fallback for sessions pre-cutover (no
# RECORDING artifact rows) and OSS deployments without a keyring.
# SKY-9286: scheduled for removal once production sessions all have
# rows — every call here is a billable ListObjects request.
bucket = settings.AWS_S3_BUCKET_ARTIFACTS
listing_uri = f"s3://{bucket}/{self._PATH_VERSION}/{settings.ENV}/{organization_id}/browser_sessions/{browser_session_id}/videos"
object_keys = [f"s3://{bucket}/{file}" for file in await self.async_client.list_files(uri=listing_uri)]
if len(object_keys) == 0:
return []
@ -732,22 +777,21 @@ class S3Storage(BaseStorage):
sc = await self._get_storage_class_for_org(organization_id, self.bucket)
await self.async_client.upload_file_from_path(uri, local_file_path, storage_class=sc)
# For downloaded files (only), register an Artifact row scoped to the
# session so the DB is the single source of truth for both the
# ``GET /v1/browser_sessions/{id}`` user-facing listing and the
# agent's baseline-before/after / complete_on_download checks.
# Partial files (``*.crdownload``) get a row too with checksum=None —
# the agent's "still downloading" query reads URI-suffix from the
# row. The row is dropped when Chrome's atomic rename fires
# ``Change.deleted`` for the partial path.
#
# We deliberately let exceptions propagate so the watcher's bounded
# retry can recover from a transient DB outage — silently swallowing
# would leave the file in S3 with no row, invisible to baseline
# diffs and complete_on_download. Both ``upload_file_from_path``
# (S3 overwrite) and ``create_browser_session_download_artifact``
# (idempotent on ``(session, uri)``) are safe to retry.
if artifact_type == "downloads":
# Register a DOWNLOAD Artifact row scoped to the session so the DB
# is the single source of truth for both the
# ``GET /v1/browser_sessions/{id}`` user-facing listing and the
# agent's baseline-before/after / complete_on_download checks.
# Partial files (``*.crdownload``) get a row too with
# checksum=None — the agent's "still downloading" query reads
# URI-suffix from the row. The row is dropped when Chrome's
# atomic rename fires ``Change.deleted`` for the partial path.
#
# Exceptions propagate so the watcher's bounded retry in
# ``browser_controller._watch_and_sync_directory`` can recover
# from a transient DB outage. Both ``upload_file_from_path`` (S3
# overwrite) and ``create_browser_session_download_artifact``
# (idempotent on ``(session, uri)``) are safe to retry.
is_partial = remote_path.endswith(BROWSER_DOWNLOADING_SUFFIX)
checksum = None if is_partial else calculate_sha256_for_file(local_file_path)
await app.ARTIFACT_MANAGER.create_browser_session_download_artifact(
@ -757,6 +801,32 @@ class S3Storage(BaseStorage):
filename=os.path.basename(remote_path),
checksum=checksum,
)
elif artifact_type == "videos":
# Register a RECORDING artifact row so
# ``GET /v1/browser_sessions/{id}`` serves the recording via a
# short signed ``/v1/artifacts/{id}/content`` URL instead of a
# raw S3 presigned URL. Recordings are uploaded once at session
# close (Playwright finalizes the file when the browser context
# closes — there's no partial / mid-write state to track).
#
# Artifact-row creation is best-effort here. The only caller
# (``DefaultPersistentSessionsManager.close_session``) wraps the
# whole final-sync in ``except Exception: LOG.exception(...)``
# without retry, so propagating doesn't recover from a missed
# write the way the watcher's retry does for downloads. The
# safety net is the gated legacy listing fallback in
# ``get_shared_recordings_in_browser_session``: when the session
# has no RECORDING rows we fall through to the S3 LIST path, so
# a row-less recording still surfaces via the legacy presigned
# URL until a subsequent close writes the row.
checksum = calculate_sha256_for_file(local_file_path)
await app.ARTIFACT_MANAGER.create_browser_session_recording_artifact(
organization_id=organization_id,
browser_session_id=browser_session_id,
uri=uri,
filename=os.path.basename(remote_path),
checksum=checksum,
)
return uri

View file

@ -39,17 +39,19 @@ def azure_storage() -> AzureStorageForTests:
@pytest.fixture(autouse=True)
def mock_browser_session_download_artifact_create(monkeypatch: pytest.MonkeyPatch) -> None:
"""Stub out the DB-side artifact-row insert for browser-session downloads.
def mock_browser_session_artifact_create(monkeypatch: pytest.MonkeyPatch) -> None:
"""Stub out the DB-side artifact-row inserts for browser-session files.
Mirrors the s3 storage test fixture — see SKY-8861 follow-up. Patches
the module-level ``app`` reference in ``azure.py`` because the forge
app isn't initialized in these storage-only tests.
app isn't initialized in these storage-only tests. Covers both
download and recording artifact creators.
"""
import skyvern.forge.sdk.artifact.storage.azure as azure_module
fake_app = MagicMock()
fake_app.ARTIFACT_MANAGER.create_browser_session_download_artifact = AsyncMock(return_value="a_test")
fake_app.ARTIFACT_MANAGER.create_browser_session_recording_artifact = AsyncMock(return_value="a_test")
monkeypatch.setattr(azure_module, "app", fake_app)

View file

@ -59,15 +59,17 @@ def aws_credentials(monkeypatch: pytest.MonkeyPatch) -> None:
@pytest.fixture(autouse=True)
def mock_browser_session_download_artifact_create(monkeypatch: pytest.MonkeyPatch) -> Generator[None, None, None]:
"""Stub out the DB-side artifact-row insert for browser-session downloads.
def mock_browser_session_artifact_create(monkeypatch: pytest.MonkeyPatch) -> Generator[None, None, None]:
"""Stub out the DB-side artifact-row inserts for browser-session files.
``S3Storage.sync_browser_session_file(artifact_type="downloads")`` now
awaits ``app.ARTIFACT_MANAGER.create_browser_session_download_artifact``
(SKY-8861 follow-up). These storage tests run against a moto S3 with no
forge app initialized, so we monkey-patch the module-level ``app``
reference in ``s3.py`` — patching ``app.ARTIFACT_MANAGER`` directly
would trip the lazy-init guard on AppHolder.
``S3Storage.sync_browser_session_file`` now awaits
``app.ARTIFACT_MANAGER.create_browser_session_download_artifact`` (for
``artifact_type="downloads"``) and
``app.ARTIFACT_MANAGER.create_browser_session_recording_artifact`` (for
``artifact_type="videos"``). These storage tests run against a moto S3
with no forge app initialized, so we monkey-patch the module-level
``app`` reference in ``s3.py`` — patching ``app.ARTIFACT_MANAGER``
directly would trip the lazy-init guard on AppHolder.
"""
from unittest.mock import MagicMock
@ -75,6 +77,7 @@ def mock_browser_session_download_artifact_create(monkeypatch: pytest.MonkeyPatc
fake_app = MagicMock()
fake_app.ARTIFACT_MANAGER.create_browser_session_download_artifact = AsyncMock(return_value="a_test")
fake_app.ARTIFACT_MANAGER.create_browser_session_recording_artifact = AsyncMock(return_value="a_test")
monkeypatch.setattr(s3_module, "app", fake_app)
yield