mirror of https://github.com/Skyvern-AI/skyvern.git
synced 2026-04-28 03:30:10 +00:00
feat(sky-8861): browser-session downloads -> short artifact URLs (#5672)
This commit is contained in:
parent 9bc1ae28a6
commit 632462a9da
13 changed files with 1171 additions and 20 deletions
@@ -0,0 +1,43 @@
"""add browser_session_id to artifacts

Revision ID: 2d0f9407ffac
Revises: bd362c15b74b
Create Date: 2026-04-26T21:42:34.513980+00:00

"""

from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "2d0f9407ffac"
down_revision: Union[str, None] = "bd362c15b74b"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.add_column("artifacts", sa.Column("browser_session_id", sa.String(), nullable=True))

    # Partial index build on a very large table. CREATE INDEX CONCURRENTLY
    # avoids the long-held, write-blocking SHARE lock plain CREATE INDEX would
    # take; statement_timeout gives the build room to finish on production volumes.
    with op.get_context().autocommit_block():
        op.execute("SET statement_timeout = '3h';")
        op.execute("""
            CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_artifacts_browser_session_id_partial
            ON artifacts (browser_session_id)
            WHERE browser_session_id IS NOT NULL
        """)
        op.execute("RESET statement_timeout;")


def downgrade() -> None:
    with op.get_context().autocommit_block():
        op.execute("SET statement_timeout = '3h';")
        op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_artifacts_browser_session_id_partial")
        op.execute("RESET statement_timeout;")
    op.drop_column("artifacts", "browser_session_id")

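One operational footnote on the migration above: if a CREATE INDEX CONCURRENTLY build fails partway through, Postgres leaves an INVALID index behind rather than rolling it back, and a later CREATE INDEX CONCURRENTLY IF NOT EXISTS will see the leftover and skip the rebuild. A retry-friendly variant can clear the leftover first. The sketch below is illustrative only; the helper name is hypothetical and it is not part of this commit:

import sqlalchemy as sa
from alembic import op


def _drop_invalid_index_if_present(index_name: str) -> None:
    """Hypothetical guard: drop a half-built index left by a failed
    CONCURRENTLY build so the IF NOT EXISTS retry can actually rebuild it.

    Must run inside the same autocommit_block, before the CREATE, because
    DROP INDEX CONCURRENTLY also refuses to run inside a transaction.
    """
    conn = op.get_bind()
    row = conn.execute(
        sa.text(
            "SELECT c.relname FROM pg_index i "
            "JOIN pg_class c ON c.oid = i.indexrelid "
            "WHERE c.relname = :name AND NOT i.indisvalid"
        ),
        {"name": index_name},
    ).first()
    if row is not None:
        # Identifier quoting is safe here: index_name is a constant supplied
        # by the migration, not user input.
        conn.execute(sa.text(f'DROP INDEX CONCURRENTLY "{index_name}"'))
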
@@ -3842,12 +3842,41 @@ class ForgeAgent:
                     randomize_if_missing=False,
                 )
                 context = skyvern_context.current()
+                finalization_run_id = (
+                    context.run_id if context and context.run_id else task.workflow_run_id or task.task_id
+                )
                 await app.STORAGE.save_downloaded_files(
                     organization_id=task.organization_id,
-                    run_id=context.run_id
-                    if context and context.run_id
-                    else task.workflow_run_id or task.task_id,
+                    run_id=finalization_run_id,
                 )
+                # Tag any session-scoped DOWNLOAD artifacts created during
+                # this run with run_id, so GET /v1/runs/{id} surfaces them
+                # (the watcher in browser_controller can't know the active
+                # run at upload time — see
+                # cloud_docs/BROWSER_SESSION_DOWNLOAD_ARTIFACTS.md).
+                browser_session_id = context.browser_session_id if context else None
+                if browser_session_id and task.organization_id and finalization_run_id:
+                    try:
+                        claimed = await app.DATABASE.artifacts.claim_session_download_artifacts_for_run(
+                            run_id=finalization_run_id,
+                            browser_session_id=browser_session_id,
+                            organization_id=task.organization_id,
+                            run_started_at=task.created_at,
+                        )
+                        if claimed:
+                            LOG.debug(
+                                "Claimed session-scoped download artifacts for run",
+                                run_id=finalization_run_id,
+                                browser_session_id=browser_session_id,
+                                claimed=claimed,
+                            )
+                    except Exception:
+                        LOG.warning(
+                            "Failed to claim session-scoped download artifacts for run",
+                            run_id=finalization_run_id,
+                            browser_session_id=browser_session_id,
+                            exc_info=True,
+                        )
         except asyncio.TimeoutError:
             LOG.warning(
                 "Timeout to save downloaded files",

@@ -322,6 +322,55 @@ class ArtifactManager:
         )
         return artifact_id
 
+    async def create_browser_session_download_artifact(
+        self,
+        *,
+        organization_id: str,
+        browser_session_id: str,
+        uri: str,
+        filename: str,
+        checksum: str | None = None,
+    ) -> str:
+        """Register a session-scoped downloaded file as an Artifact row.
+
+        Used by the browser_controller's watcher write site
+        (``S3Storage.sync_browser_session_file(artifact_type="downloads")``).
+        Idempotent on ``(organization_id, browser_session_id, uri)`` — the
+        watcher fires repeatedly as a downloaded file grows, so we look up
+        the existing row before inserting.
+
+        ``run_id`` is intentionally NOT set here. The watcher runs in a
+        separate process from the agent and does not know which run is
+        currently using the session. Run finalization runs the
+        ``claim_session_download_artifacts_for_run`` UPDATE to tag rows
+        whose ``created_at`` falls inside the run's window.
+        """
+        existing = await app.DATABASE.artifacts.find_artifact_for_browser_session(
+            organization_id=organization_id,
+            browser_session_id=browser_session_id,
+            uri=uri,
+            artifact_type=ArtifactType.DOWNLOAD,
+        )
+        if existing is not None:
+            return existing.artifact_id
+
+        artifact_id = generate_artifact_id()
+        await app.DATABASE.artifacts.create_artifact(
+            artifact_id=artifact_id,
+            artifact_type=ArtifactType.DOWNLOAD,
+            uri=uri,
+            organization_id=organization_id,
+            browser_session_id=browser_session_id,
+            checksum=checksum,
+        )
+        LOG.debug(
+            "Registered session-scoped downloaded file as artifact",
+            artifact_id=artifact_id,
+            browser_session_id=browser_session_id,
+            filename=filename,
+        )
+        return artifact_id
+
     async def create_thought_artifact(
         self,
         thought: Thought,

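Since the docstring leans on idempotency, a quick sketch of the contract from the caller's side may help. The function below is illustrative only; the real write site is the browser_controller watcher, which is not in this diff, and the ids are placeholders:

from skyvern.forge.sdk.artifact.manager import ArtifactManager


# Illustrative only: shows the idempotency contract of the helper above.
# The first event for a URI inserts the row; every later event for the same
# (organization_id, browser_session_id, uri) returns the same artifact_id.
async def on_download_file_event(manager: ArtifactManager, uri: str, done: bool) -> str:
    return await manager.create_browser_session_download_artifact(
        organization_id="o_1",  # hypothetical ids for the sketch
        browser_session_id="pbs_1",
        uri=uri,
        filename=uri.rsplit("/", 1)[-1],
        checksum="sha-..." if done else None,  # None while the file still grows
    )
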
@@ -91,6 +91,7 @@ class Artifact(BaseModel):
     workflow_run_id: str | None = None
     workflow_run_block_id: str | None = None
     run_id: str | None = None
+    browser_session_id: str | None = None
     observer_cruise_id: str | None = None
     observer_thought_id: str | None = None
     ai_suggestion_id: str | None = None

@@ -242,16 +242,88 @@ class AzureStorage(BaseStorage):
     async def list_downloaded_files_in_browser_session(
         self, organization_id: str, browser_session_id: str
     ) -> list[str]:
+        """Return Azure URIs of completed downloads. DB-backed; mirrors s3.py."""
+        return await self._list_downloads_for_session(
+            organization_id=organization_id,
+            browser_session_id=browser_session_id,
+            in_progress=False,
+        )
+
+    async def _list_downloads_for_session(
+        self,
+        *,
+        organization_id: str,
+        browser_session_id: str,
+        in_progress: bool,
+    ) -> list[str]:
+        """Shared DB-backed lister with a partial-vs-final discriminator."""
+        if settings.ARTIFACT_CONTENT_HMAC_KEYRING:
+            try:
+                artifacts = await app.DATABASE.artifacts.list_artifacts_for_browser_session_by_type(
+                    browser_session_id=browser_session_id,
+                    organization_id=organization_id,
+                    artifact_type=ArtifactType.DOWNLOAD,
+                )
+            except Exception:
+                LOG.warning(
+                    "Failed to list browser-session download artifacts; falling back to Azure LIST",
+                    organization_id=organization_id,
+                    browser_session_id=browser_session_id,
+                    in_progress=in_progress,
+                    exc_info=True,
+                )
+                artifacts = None
+            if artifacts is not None:
+                return [a.uri for a in artifacts if a.uri and a.uri.endswith(BROWSER_DOWNLOADING_SUFFIX) == in_progress]
+
         uri = f"azure://{settings.AZURE_STORAGE_CONTAINER_ARTIFACTS}/v1/{settings.ENV}/{organization_id}/browser_sessions/{browser_session_id}/downloads"
-        return [
+        files = [
             f"azure://{settings.AZURE_STORAGE_CONTAINER_ARTIFACTS}/{file}"
             for file in await self.async_client.list_files(uri=uri)
         ]
+        return [f for f in files if f.endswith(BROWSER_DOWNLOADING_SUFFIX) == in_progress]
 
     async def get_shared_downloaded_files_in_browser_session(
         self, organization_id: str, browser_session_id: str
     ) -> list[FileInfo]:
-        object_keys = await self.list_downloaded_files_in_browser_session(organization_id, browser_session_id)
+        # Artifact-first when keyring is configured — see s3.py for rationale.
+        if settings.ARTIFACT_CONTENT_HMAC_KEYRING:
+            try:
+                artifacts = await app.DATABASE.artifacts.list_artifacts_for_browser_session_by_type(
+                    browser_session_id=browser_session_id,
+                    organization_id=organization_id,
+                    artifact_type=ArtifactType.DOWNLOAD,
+                )
+            except Exception:
+                LOG.warning(
+                    "Failed to look up browser-session download artifacts; falling back to SAS URLs",
+                    organization_id=organization_id,
+                    browser_session_id=browser_session_id,
+                    exc_info=True,
+                )
+                artifacts = []
+            # Filter out in-progress partials — user-facing listing must only
+            # show completed downloads. Mirrors s3.py.
+            artifacts = [a for a in artifacts if a.uri and not a.uri.endswith(BROWSER_DOWNLOADING_SUFFIX)]
+            if artifacts:
+                return _file_infos_from_download_artifacts(artifacts)
+
+        return await self._get_shared_downloaded_files_in_browser_session_via_listing(
+            organization_id=organization_id, browser_session_id=browser_session_id
+        )
+
+    async def _get_shared_downloaded_files_in_browser_session_via_listing(
+        self, *, organization_id: str, browser_session_id: str
+    ) -> list[FileInfo]:
+        # Direct Azure LIST: legacy fallback for sessions pre-cutover.
+        # ``list_downloaded_files_in_browser_session`` is now DB-backed, so
+        # we can't reuse it here.
+        listing_uri = f"azure://{settings.AZURE_STORAGE_CONTAINER_ARTIFACTS}/v1/{settings.ENV}/{organization_id}/browser_sessions/{browser_session_id}/downloads"
+        object_keys = [
+            f"azure://{settings.AZURE_STORAGE_CONTAINER_ARTIFACTS}/{file}"
+            for file in await self.async_client.list_files(uri=listing_uri)
+            if not file.endswith(BROWSER_DOWNLOADING_SUFFIX)
+        ]
         if len(object_keys) == 0:
             return []

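One readability note on `_list_downloads_for_session`: the `uri.endswith(BROWSER_DOWNLOADING_SUFFIX) == in_progress` comparison folds two filters into one expression. A standalone illustration of what it selects, assuming the usual Chrome `.crdownload` value for `BROWSER_DOWNLOADING_SUFFIX` (which the tests later in this commit also assume):

BROWSER_DOWNLOADING_SUFFIX = ".crdownload"  # assumed value for the sketch

uris = [
    "azure://container/downloads/report.pdf",
    "azure://container/downloads/report.pdf.crdownload",
]


def filter_downloads(uris: list[str], in_progress: bool) -> list[str]:
    # endswith(...) is True only for partials, so comparing it against the
    # in_progress flag selects exactly one of the two populations.
    return [u for u in uris if u.endswith(BROWSER_DOWNLOADING_SUFFIX) == in_progress]


assert filter_downloads(uris, in_progress=False) == ["azure://container/downloads/report.pdf"]
assert filter_downloads(uris, in_progress=True) == ["azure://container/downloads/report.pdf.crdownload"]
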
@@ -290,12 +362,12 @@ class AzureStorage(BaseStorage):
     async def list_downloading_files_in_browser_session(
         self, organization_id: str, browser_session_id: str
     ) -> list[str]:
-        uri = f"azure://{settings.AZURE_STORAGE_CONTAINER_ARTIFACTS}/v1/{settings.ENV}/{organization_id}/browser_sessions/{browser_session_id}/downloads"
-        files = [
-            f"azure://{settings.AZURE_STORAGE_CONTAINER_ARTIFACTS}/{file}"
-            for file in await self.async_client.list_files(uri=uri)
-        ]
-        return [file for file in files if file.endswith(BROWSER_DOWNLOADING_SUFFIX)]
+        """Return Azure URIs of in-progress (``*.crdownload``) downloads. DB-backed."""
+        return await self._list_downloads_for_session(
+            organization_id=organization_id,
+            browser_session_id=browser_session_id,
+            in_progress=True,
+        )
 
     async def list_recordings_in_browser_session(self, organization_id: str, browser_session_id: str) -> list[str]:
         """List all recording files for a browser session from Azure."""

@@ -594,6 +666,27 @@ class AzureStorage(BaseStorage):
         tier = await self._get_storage_tier_for_org(organization_id)
         tags = await self._get_tags_for_org(organization_id)
         await self.async_client.upload_file_from_path(uri, local_file_path, tier=tier, tags=tags)
+
+        # See s3.py — DB is the single source of truth for both the user-facing
+        # listing and the agent's baseline / complete_on_download checks.
+        # Partials get a row with checksum=None so the agent can detect "still
+        # downloading"; the row is dropped on Chrome's atomic-rename
+        # ``Change.deleted`` event.
+        #
+        # Exceptions propagate so the watcher's bounded retry can recover from
+        # a transient DB outage — silently swallowing would leave the file in
+        # Azure with no row, invisible to baseline diffs.
+        if artifact_type == "downloads":
+            is_partial = remote_path.endswith(BROWSER_DOWNLOADING_SUFFIX)
+            checksum = None if is_partial else calculate_sha256_for_file(local_file_path)
+            await app.ARTIFACT_MANAGER.create_browser_session_download_artifact(
+                organization_id=organization_id,
+                browser_session_id=browser_session_id,
+                uri=uri,
+                filename=os.path.basename(remote_path),
+                checksum=checksum,
+            )
 
         return uri
 
     async def delete_browser_session_file(

@@ -604,8 +697,29 @@ class AzureStorage(BaseStorage):
         remote_path: str,
         date: str | None = None,
     ) -> None:
-        """Delete a file from browser session storage in Azure."""
+        """Delete a file from browser session storage in Azure.
+
+        For ``downloads``, also drop the matching DOWNLOAD artifact row so a
+        subsequent ``GET /v1/browser_sessions/{id}`` doesn't hand out a signed
+        URL that 404s. Mirrors the S3 implementation.
+        """
         uri = self._build_browser_session_uri(organization_id, browser_session_id, artifact_type, remote_path, date)
+        if artifact_type == "downloads":
+            try:
+                await app.DATABASE.artifacts.delete_artifact_for_browser_session(
+                    organization_id=organization_id,
+                    browser_session_id=browser_session_id,
+                    uri=uri,
+                    artifact_type=ArtifactType.DOWNLOAD,
+                )
+            except Exception:
+                LOG.warning(
+                    "Failed to delete browser-session download artifact row; proceeding with Azure delete",
+                    organization_id=organization_id,
+                    browser_session_id=browser_session_id,
+                    remote_path=remote_path,
+                    exc_info=True,
+                )
         await self.async_client.delete_file(uri)
 
     async def browser_session_file_exists(

@@ -278,14 +278,110 @@ class S3Storage(BaseStorage):
     async def list_downloaded_files_in_browser_session(
         self, organization_id: str, browser_session_id: str
     ) -> list[str]:
+        """Return S3 URIs of completed downloads in the session.
+
+        DB-backed (artifact rows are the source of truth) — see
+        ``cloud_docs/BROWSER_SESSION_DOWNLOAD_ARTIFACTS.md``. Used by the agent
+        for baseline-before / baseline-after diffs to detect newly downloaded
+        files. Excludes ``*.crdownload`` partials; those go through
+        ``list_downloading_files_in_browser_session`` instead.
+
+        Falls back to S3 LIST when the keyring is unset (OSS default — no
+        artifact rows exist) or when the DB lookup itself raises.
+        """
+        return await self._list_downloads_for_session(
+            organization_id=organization_id,
+            browser_session_id=browser_session_id,
+            in_progress=False,
+        )
+
+    async def _list_downloads_for_session(
+        self,
+        *,
+        organization_id: str,
+        browser_session_id: str,
+        in_progress: bool,
+    ) -> list[str]:
+        """Shared DB-backed lister with a partial-vs-final discriminator.
+
+        Centralizes the keyring gating and the DB-failure fallback so the two
+        public methods stay parallel.
+        """
+        if settings.ARTIFACT_CONTENT_HMAC_KEYRING:
+            try:
+                artifacts = await app.DATABASE.artifacts.list_artifacts_for_browser_session_by_type(
+                    browser_session_id=browser_session_id,
+                    organization_id=organization_id,
+                    artifact_type=ArtifactType.DOWNLOAD,
+                )
+            except Exception:
+                LOG.warning(
+                    "Failed to list browser-session download artifacts; falling back to S3 LIST",
+                    organization_id=organization_id,
+                    browser_session_id=browser_session_id,
+                    in_progress=in_progress,
+                    exc_info=True,
+                )
+                artifacts = None
+            if artifacts is not None:
+                # Honour the partial-vs-final discriminator the agent expects.
+                return [a.uri for a in artifacts if a.uri and a.uri.endswith(BROWSER_DOWNLOADING_SUFFIX) == in_progress]
+
         bucket = settings.AWS_S3_BUCKET_ARTIFACTS
         uri = f"s3://{bucket}/{self._PATH_VERSION}/{settings.ENV}/{organization_id}/browser_sessions/{browser_session_id}/downloads"
-        return [f"s3://{bucket}/{file}" for file in await self.async_client.list_files(uri=uri)]
+        files = [f"s3://{bucket}/{file}" for file in await self.async_client.list_files(uri=uri)]
+        return [f for f in files if f.endswith(BROWSER_DOWNLOADING_SUFFIX) == in_progress]
 
     async def get_shared_downloaded_files_in_browser_session(
         self, organization_id: str, browser_session_id: str
     ) -> list[FileInfo]:
-        object_keys = await self.list_downloaded_files_in_browser_session(organization_id, browser_session_id)
+        # Artifact-first when keyring is configured: query rows scoped to the
+        # session, build short signed /v1/artifacts URLs from them. See
+        # cloud_docs/BROWSER_SESSION_DOWNLOAD_ARTIFACTS.md.
+        #
+        # OSS-default deployments without HMAC signing fall straight to the
+        # legacy listing path so webhook consumers (no API key) can still
+        # fetch the files via presigned URLs.
+        if settings.ARTIFACT_CONTENT_HMAC_KEYRING:
+            try:
+                artifacts = await app.DATABASE.artifacts.list_artifacts_for_browser_session_by_type(
+                    browser_session_id=browser_session_id,
+                    organization_id=organization_id,
+                    artifact_type=ArtifactType.DOWNLOAD,
+                )
+            except Exception:
+                LOG.warning(
+                    "Failed to look up browser-session download artifacts; falling back to presigned S3 URLs",
+                    organization_id=organization_id,
+                    browser_session_id=browser_session_id,
+                    exc_info=True,
+                )
+                artifacts = []
+            # Filter out in-progress partials — the user-facing listing must
+            # only show completed downloads. Partials still live as artifact
+            # rows so the agent can detect "still downloading" via DB query.
+            artifacts = [a for a in artifacts if a.uri and not a.uri.endswith(BROWSER_DOWNLOADING_SUFFIX)]
+            if artifacts:
+                return _file_infos_from_download_artifacts(artifacts)
+
+        return await self._get_shared_downloaded_files_in_browser_session_via_listing(
+            organization_id=organization_id, browser_session_id=browser_session_id
+        )
+
+    async def _get_shared_downloaded_files_in_browser_session_via_listing(
+        self, *, organization_id: str, browser_session_id: str
+    ) -> list[FileInfo]:
+        # Direct S3 LIST: legacy fallback for sessions pre-cutover (no artifact
+        # rows) and OSS deployments without a keyring. We can't go through
+        # ``list_downloaded_files_in_browser_session`` here — that now sources
+        # from artifact rows and would short-circuit to [] on legacy sessions.
+        bucket = settings.AWS_S3_BUCKET_ARTIFACTS
+        listing_uri = f"s3://{bucket}/{self._PATH_VERSION}/{settings.ENV}/{organization_id}/browser_sessions/{browser_session_id}/downloads"
+        object_keys = [
+            f"s3://{bucket}/{file}"
+            for file in await self.async_client.list_files(uri=listing_uri)
+            if not file.endswith(BROWSER_DOWNLOADING_SUFFIX)
+        ]
         if len(object_keys) == 0:
             return []

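`_file_infos_from_download_artifacts` is referenced here but its definition is not visible in these hunks. From the way the tests later in this commit exercise it (each returned item exposes a signed `/v1/artifacts/{id}/content` URL plus the checksum recorded on the row), its shape is plausibly something like the following; the field names and the `build_signed_content_url` call signature are assumptions:

# Hypothetical reconstruction; the real helper lives elsewhere in this commit.
def _file_infos_from_download_artifacts(artifacts: list[Artifact]) -> list[FileInfo]:
    return [
        FileInfo(
            # Short signed URL served by the artifact content endpoint,
            # instead of a long presigned S3 URL.
            url=app.ARTIFACT_MANAGER.build_signed_content_url(artifact.artifact_id),
            checksum=artifact.checksum,
            filename=artifact.uri.rsplit("/", 1)[-1] if artifact.uri else "",
        )
        for artifact in artifacts
    ]
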
@@ -323,10 +419,19 @@ class S3Storage(BaseStorage):
     async def list_downloading_files_in_browser_session(
         self, organization_id: str, browser_session_id: str
     ) -> list[str]:
-        bucket = settings.AWS_S3_BUCKET_ARTIFACTS
-        uri = f"s3://{bucket}/{self._PATH_VERSION}/{settings.ENV}/{organization_id}/browser_sessions/{browser_session_id}/downloads"
-        files = [f"s3://{bucket}/{file}" for file in await self.async_client.list_files(uri=uri)]
-        return [file for file in files if file.endswith(BROWSER_DOWNLOADING_SUFFIX)]
+        """Return S3 URIs of in-progress downloads (``*.crdownload``).
+
+        DB-backed (artifact rows are the source of truth). The watcher creates
+        a partial artifact row (``checksum=None``) the moment Chrome opens the
+        ``.crdownload`` file; that row is dropped when Chrome's atomic rename
+        fires ``Change.deleted``. Used by ``complete_on_download`` task blocks
+        to wait until in-flight downloads finish.
+        """
+        return await self._list_downloads_for_session(
+            organization_id=organization_id,
+            browser_session_id=browser_session_id,
+            in_progress=True,
+        )
 
     async def list_recordings_in_browser_session(self, organization_id: str, browser_session_id: str) -> list[str]:
         """List all recording files for a browser session from S3."""

@@ -626,6 +731,33 @@ class S3Storage(BaseStorage):
         uri = self._build_browser_session_uri(organization_id, browser_session_id, artifact_type, remote_path, date)
         sc = await self._get_storage_class_for_org(organization_id, self.bucket)
         await self.async_client.upload_file_from_path(uri, local_file_path, storage_class=sc)
+
+        # For downloaded files (only), register an Artifact row scoped to the
+        # session so the DB is the single source of truth for both the
+        # ``GET /v1/browser_sessions/{id}`` user-facing listing and the
+        # agent's baseline-before/after / complete_on_download checks.
+        # Partial files (``*.crdownload``) get a row too, with checksum=None —
+        # the agent's "still downloading" query reads the URI suffix from the
+        # row. The row is dropped when Chrome's atomic rename fires
+        # ``Change.deleted`` for the partial path.
+        #
+        # We deliberately let exceptions propagate so the watcher's bounded
+        # retry can recover from a transient DB outage — silently swallowing
+        # would leave the file in S3 with no row, invisible to baseline
+        # diffs and complete_on_download. Both ``upload_file_from_path``
+        # (S3 overwrite) and ``create_browser_session_download_artifact``
+        # (idempotent on ``(session, uri)``) are safe to retry.
+        if artifact_type == "downloads":
+            is_partial = remote_path.endswith(BROWSER_DOWNLOADING_SUFFIX)
+            checksum = None if is_partial else calculate_sha256_for_file(local_file_path)
+            await app.ARTIFACT_MANAGER.create_browser_session_download_artifact(
+                organization_id=organization_id,
+                browser_session_id=browser_session_id,
+                uri=uri,
+                filename=os.path.basename(remote_path),
+                checksum=checksum,
+            )
 
         return uri
 
     async def delete_browser_session_file(

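The retry contract the comment above relies on lives in the browser_controller watcher, which is not part of this diff. As a rough sketch of the shape being assumed (the function name, attempt count, and backoff are illustrative):

import asyncio

from skyvern.forge.sdk.artifact.storage.s3 import S3Storage


async def sync_with_bounded_retry(storage: S3Storage, *, attempts: int = 3, **kwargs) -> str:
    # Both the S3 overwrite and the idempotent artifact-row insert are safe
    # to repeat, so retrying the whole sync on a transient failure is fine.
    last_exc: Exception | None = None
    for attempt in range(attempts):
        try:
            return await storage.sync_browser_session_file(**kwargs)
        except Exception as exc:  # e.g. a transient DB outage on the row insert
            last_exc = exc
            await asyncio.sleep(2**attempt)  # simple exponential backoff
    assert last_exc is not None
    raise last_exc  # give up after the bounded number of attempts
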
@@ -636,8 +768,31 @@ class S3Storage(BaseStorage):
         remote_path: str,
         date: str | None = None,
     ) -> None:
-        """Delete a file from browser session storage in S3."""
+        """Delete a file from browser session storage in S3.
+
+        For ``downloads``, also drop the matching DOWNLOAD artifact row so a
+        subsequent ``GET /v1/browser_sessions/{id}`` doesn't hand out a signed
+        URL that 404s. The DB delete runs before the S3 delete: if S3 fails
+        we'd rather have an artifact row missing (the listing fallback covers
+        it) than a row pointing at a deleted object.
+        """
         uri = self._build_browser_session_uri(organization_id, browser_session_id, artifact_type, remote_path, date)
+        if artifact_type == "downloads":
+            try:
+                await app.DATABASE.artifacts.delete_artifact_for_browser_session(
+                    organization_id=organization_id,
+                    browser_session_id=browser_session_id,
+                    uri=uri,
+                    artifact_type=ArtifactType.DOWNLOAD,
+                )
+            except Exception:
+                LOG.warning(
+                    "Failed to delete browser-session download artifact row; proceeding with S3 delete",
+                    organization_id=organization_id,
+                    browser_session_id=browser_session_id,
+                    remote_path=remote_path,
+                    exc_info=True,
+                )
         await self.async_client.delete_file(uri, log_exception=True)
 
     async def browser_session_file_exists(

@@ -38,6 +38,21 @@ def azure_storage() -> AzureStorageForTests:
     return AzureStorageForTests(container=TEST_CONTAINER)
 
 
+@pytest.fixture(autouse=True)
+def mock_browser_session_download_artifact_create(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Stub out the DB-side artifact-row insert for browser-session downloads.
+
+    Mirrors the s3 storage test fixture — see SKY-8861 follow-up. Patches
+    the module-level ``app`` reference in ``azure.py`` because the forge
+    app isn't initialized in these storage-only tests.
+    """
+    import skyvern.forge.sdk.artifact.storage.azure as azure_module
+
+    fake_app = MagicMock()
+    fake_app.ARTIFACT_MANAGER.create_browser_session_download_artifact = AsyncMock(return_value="a_test")
+    monkeypatch.setattr(azure_module, "app", fake_app)
+
+
 @pytest.mark.asyncio
 class TestAzureStorageBrowserSessionFiles:
     """Test AzureStorage browser session file methods."""

@@ -2,6 +2,7 @@ import io
 from datetime import datetime
 from pathlib import Path
 from typing import Generator
+from unittest.mock import AsyncMock
 
 import boto3
 import pytest

@@ -57,6 +58,27 @@ def aws_credentials(monkeypatch: pytest.MonkeyPatch) -> None:
     monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing")
 
 
+@pytest.fixture(autouse=True)
+def mock_browser_session_download_artifact_create(monkeypatch: pytest.MonkeyPatch) -> Generator[None, None, None]:
+    """Stub out the DB-side artifact-row insert for browser-session downloads.
+
+    ``S3Storage.sync_browser_session_file(artifact_type="downloads")`` now
+    awaits ``app.ARTIFACT_MANAGER.create_browser_session_download_artifact``
+    (SKY-8861 follow-up). These storage tests run against a moto S3 with no
+    forge app initialized, so we monkey-patch the module-level ``app``
+    reference in ``s3.py`` — patching ``app.ARTIFACT_MANAGER`` directly
+    would trip the lazy-init guard on AppHolder.
+    """
+    from unittest.mock import MagicMock
+
+    import skyvern.forge.sdk.artifact.storage.s3 as s3_module
+
+    fake_app = MagicMock()
+    fake_app.ARTIFACT_MANAGER.create_browser_session_download_artifact = AsyncMock(return_value="a_test")
+    monkeypatch.setattr(s3_module, "app", fake_app)
+    yield
+
+
 @pytest.fixture(scope="module")
 def moto_server() -> Generator[str, None, None]:
     # Note: pass `port=0` to get a random free port.

@@ -234,6 +234,11 @@ class ArtifactModel(Base):
             "run_id",
             postgresql_where=text("run_id IS NOT NULL"),
         ),
+        Index(
+            "ix_artifacts_browser_session_id_partial",
+            "browser_session_id",
+            postgresql_where=text("browser_session_id IS NOT NULL"),
+        ),
     )
 
     artifact_id = Column(String, primary_key=True, default=generate_artifact_id)

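The model-level Index above should stay in lockstep with the migration's CONCURRENTLY-built index. A quick way to eyeball that the two definitions agree is to compile the model index to DDL; this snippet is a dev-console check, not production code, and assumes ArtifactModel is importable as defined above:

from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateIndex

index = next(
    idx
    for idx in ArtifactModel.__table__.indexes
    if idx.name == "ix_artifacts_browser_session_id_partial"
)
# Expected output, matching the migration (modulo CONCURRENTLY, which is a
# build-time option rather than part of the index definition):
#   CREATE INDEX ix_artifacts_browser_session_id_partial
#   ON artifacts (browser_session_id)
#   WHERE browser_session_id IS NOT NULL
print(CreateIndex(index).compile(dialect=postgresql.dialect()))
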
@@ -249,6 +254,7 @@ class ArtifactModel(Base):
     uri = Column(String)
     bundle_key = Column(String, nullable=True)
     run_id = Column(String, nullable=True)
+    browser_session_id = Column(String, nullable=True)
     checksum = Column(String, nullable=True)
     created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
     modified_at = Column(

@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import datetime
 from typing import TYPE_CHECKING, Callable
 
 import structlog

@@ -48,6 +49,7 @@ class ArtifactsRepository(BaseRepository):
         thought_id: str | None = None,
         ai_suggestion_id: str | None = None,
         checksum: str | None = None,
+        browser_session_id: str | None = None,
     ) -> Artifact:
         async with self.Session() as session:
             new_artifact = ArtifactModel(

@@ -64,6 +66,7 @@ class ArtifactsRepository(BaseRepository):
                 ai_suggestion_id=ai_suggestion_id,
                 organization_id=organization_id,
                 checksum=checksum,
+                browser_session_id=browser_session_id,
             )
             session.add(new_artifact)
             await session.commit()

@@ -413,6 +416,119 @@ class ArtifactsRepository(BaseRepository):
             ).all()
             return [convert_to_artifact(a, self.debug_enabled) for a in artifacts]
 
+    @db_operation("find_artifact_for_browser_session")
+    async def find_artifact_for_browser_session(
+        self,
+        organization_id: str,
+        browser_session_id: str,
+        uri: str,
+        artifact_type: ArtifactType,
+    ) -> Artifact | None:
+        """Return the existing artifact row for ``(browser_session_id, uri)``, if any.
+
+        Used by :meth:`ArtifactManager.create_browser_session_download_artifact`
+        to stay idempotent: the watcher fires repeatedly as a downloaded file
+        grows, so we look up the existing row before inserting.
+        """
+        async with self.Session() as session:
+            artifact = (
+                await session.scalars(
+                    select(ArtifactModel)
+                    .filter(ArtifactModel.browser_session_id == browser_session_id)
+                    .filter(ArtifactModel.artifact_type == artifact_type)
+                    .filter(ArtifactModel.organization_id == organization_id)
+                    .filter(ArtifactModel.uri == uri)
+                    .order_by(ArtifactModel.created_at.desc())
+                )
+            ).first()
+            if artifact:
+                return convert_to_artifact(artifact, self.debug_enabled)
+            return None
+
+    @db_operation("list_artifacts_for_browser_session_by_type")
+    async def list_artifacts_for_browser_session_by_type(
+        self,
+        browser_session_id: str,
+        organization_id: str,
+        artifact_type: ArtifactType,
+    ) -> list[Artifact]:
+        """List all artifacts for a browser session, filtered by type.
+
+        Filters directly on the partial index ``ix_artifacts_browser_session_id_partial``
+        and returns the rows ordered by creation time. Used by the
+        ``GET /v1/browser_sessions/{id}`` read path.
+        """
+        async with self.Session() as session:
+            artifacts = (
+                await session.scalars(
+                    select(ArtifactModel)
+                    .filter(ArtifactModel.browser_session_id == browser_session_id)
+                    .filter(ArtifactModel.artifact_type == artifact_type)
+                    .filter(ArtifactModel.organization_id == organization_id)
+                    .order_by(ArtifactModel.created_at)
+                )
+            ).all()
+            return [convert_to_artifact(a, self.debug_enabled) for a in artifacts]
+
+    @db_operation("claim_session_download_artifacts_for_run")
+    async def claim_session_download_artifacts_for_run(
+        self,
+        run_id: str,
+        browser_session_id: str,
+        organization_id: str,
+        run_started_at: datetime.datetime,
+    ) -> int:
+        """Tag session-scoped DOWNLOAD artifacts that landed during this run with ``run_id``.
+
+        Called at run finalization. ``occupy_browser_session`` ensures at
+        most one run is active on a session at a time, so the time-window
+        match is unambiguous.
+
+        Returns the number of rows updated. Idempotent: re-running picks up
+        only ``run_id IS NULL`` rows, so a retry after success is a no-op.
+        """
+        async with self.Session() as session:
+            result = await session.execute(
+                update(ArtifactModel)
+                .where(ArtifactModel.browser_session_id == browser_session_id)
+                .where(ArtifactModel.organization_id == organization_id)
+                .where(ArtifactModel.artifact_type == ArtifactType.DOWNLOAD)
+                .where(ArtifactModel.run_id.is_(None))
+                .where(ArtifactModel.created_at >= run_started_at)
+                .values(run_id=run_id)
+            )
+            await session.commit()
+            return result.rowcount or 0
+
+    @db_operation("delete_artifact_for_browser_session")
+    async def delete_artifact_for_browser_session(
+        self,
+        organization_id: str,
+        browser_session_id: str,
+        uri: str,
+        artifact_type: ArtifactType,
+    ) -> int:
+        """Delete the artifact row for ``(browser_session_id, uri)``, if any.
+
+        Mirror of :meth:`find_artifact_for_browser_session` for the watcher's
+        ``Change.deleted`` path: when the user/browser removes a downloaded
+        file, we drop the row too; otherwise the next API read would return a
+        signed URL pointing at a deleted S3 object.
+
+        Returns the number of rows removed (0 or 1). Safe to call when no row
+        exists.
+        """
+        async with self.Session() as session:
+            result = await session.execute(
+                delete(ArtifactModel)
+                .where(ArtifactModel.browser_session_id == browser_session_id)
+                .where(ArtifactModel.organization_id == organization_id)
+                .where(ArtifactModel.artifact_type == artifact_type)
+                .where(ArtifactModel.uri == uri)
+            )
+            await session.commit()
+            return result.rowcount or 0
+
     @db_operation("get_artifact_for_run")
     async def get_artifact_for_run(
         self,

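For readers who prefer the SQL, the claim UPDATE above compiles to roughly the following; this is illustrative only, the ORM statement is what actually runs, and the stored value behind the artifact_type comparison depends on how the enum is persisted:

# Approximate SQL equivalent of claim_session_download_artifacts_for_run
# (placeholders simplified; not executed anywhere in this commit).
CLAIM_SQL = """
UPDATE artifacts
SET run_id = :run_id
WHERE browser_session_id = :browser_session_id
  AND organization_id = :organization_id
  AND artifact_type = :download_type
  AND run_id IS NULL
  AND created_at >= :run_started_at
"""
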
@@ -379,6 +379,7 @@ def convert_to_artifact(artifact_model: ArtifactModel, debug_enabled: bool = Fal
         workflow_run_id=artifact_model.workflow_run_id,
         workflow_run_block_id=artifact_model.workflow_run_block_id,
         run_id=artifact_model.run_id,
+        browser_session_id=artifact_model.browser_session_id,
         observer_cruise_id=artifact_model.observer_cruise_id,
         observer_thought_id=artifact_model.observer_thought_id,
         created_at=artifact_model.created_at,

@@ -4924,10 +4924,37 @@ class WorkflowService:
         try:
             async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
                 context = skyvern_context.current()
+                finalization_run_id = context.run_id if context and context.run_id else workflow_run.workflow_run_id
                 await app.STORAGE.save_downloaded_files(
                     organization_id=workflow_run.organization_id,
-                    run_id=context.run_id if context and context.run_id else workflow_run.workflow_run_id,
+                    run_id=finalization_run_id,
                 )
+                # Tag any session-scoped DOWNLOAD artifacts created during this
+                # workflow run with run_id (see
+                # cloud_docs/BROWSER_SESSION_DOWNLOAD_ARTIFACTS.md).
+                browser_session_id = context.browser_session_id if context else None
+                if browser_session_id and finalization_run_id:
+                    try:
+                        claimed = await app.DATABASE.artifacts.claim_session_download_artifacts_for_run(
+                            run_id=finalization_run_id,
+                            browser_session_id=browser_session_id,
+                            organization_id=workflow_run.organization_id,
+                            run_started_at=workflow_run.created_at,
+                        )
+                        if claimed:
+                            LOG.debug(
+                                "Claimed session-scoped download artifacts for workflow run",
+                                workflow_run_id=workflow_run.workflow_run_id,
+                                browser_session_id=browser_session_id,
+                                claimed=claimed,
+                            )
+                    except Exception:
+                        LOG.warning(
+                            "Failed to claim session-scoped download artifacts for workflow run",
+                            workflow_run_id=workflow_run.workflow_run_id,
+                            browser_session_id=browser_session_id,
+                            exc_info=True,
+                        )
         except asyncio.TimeoutError:
             LOG.warning(
                 "Timeout to save downloaded files",

tests/unit/test_browser_session_download_artifacts.py (new file, 573 lines)

@@ -0,0 +1,573 @@
"""Tests for the browser-session download artifact pipeline.

See ``cloud_docs/BROWSER_SESSION_DOWNLOAD_ARTIFACTS.md`` for the design.
The unit-level tests below cover:

- ``ArtifactManager.create_browser_session_download_artifact`` — DB-only
  helper used by the watcher write site, idempotent on
  ``(organization_id, browser_session_id, uri)``.
- ``S3Storage.sync_browser_session_file(artifact_type="downloads")`` — write
  site that registers the artifact row after a successful upload, skips
  files matching ``BROWSER_DOWNLOADING_SUFFIX``.
- ``S3Storage.get_shared_downloaded_files_in_browser_session`` — artifact-
  first read with legacy S3-list fallback.
- The end-of-run claim ``UPDATE`` is exercised separately by the
  repository-level tests (DB-shape only — no claim wiring yet).
"""

from __future__ import annotations

from unittest.mock import AsyncMock, MagicMock, patch
from urllib.parse import urlparse

import pytest

from skyvern.forge.sdk.artifact.manager import ArtifactManager
from skyvern.forge.sdk.artifact.models import Artifact, ArtifactType
from skyvern.forge.sdk.artifact.storage.s3 import S3Storage

_DUMMY_KEYRING_JSON = '{"current_kid": "k1", "keys": {"k1": {"secret": "0000000000000000000000000000000000000000000000000000000000000000"}}}'


def _is_amazonaws_s3_url(url: str) -> bool:
    """Strict hostname-suffix check (closes CodeQL py/incomplete-url-substring-sanitization)."""
    host = urlparse(url).hostname
    if host is None:
        return False
    return host == "s3.amazonaws.com" or host.endswith(".s3.amazonaws.com")


def _make_artifact(
    artifact_id: str,
    uri: str,
    *,
    browser_session_id: str = "pbs_1",
    run_id: str | None = None,
    checksum: str | None = None,
    created_at: str = "2026-04-25T00:00:00Z",
) -> Artifact:
    return Artifact(
        artifact_id=artifact_id,
        artifact_type=ArtifactType.DOWNLOAD,
        uri=uri,
        organization_id="o_1",
        run_id=run_id,
        browser_session_id=browser_session_id,
        checksum=checksum,
        created_at=created_at,
        modified_at=created_at,
    )


@pytest.fixture
def keyring_configured():
    from skyvern.config import settings

    with patch.object(settings, "ARTIFACT_CONTENT_HMAC_KEYRING", _DUMMY_KEYRING_JSON):
        yield


# ---------------------------------------------------------------------------
# create_browser_session_download_artifact (manager helper)
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_create_browser_session_download_artifact_inserts_when_no_existing_row():
    manager = ArtifactManager()
    find_existing = AsyncMock(return_value=None)
    mock_db_create = AsyncMock()

    with (
        patch(
            "skyvern.forge.sdk.artifact.manager.app.DATABASE.artifacts.find_artifact_for_browser_session",
            find_existing,
        ),
        patch(
            "skyvern.forge.sdk.artifact.manager.app.DATABASE.artifacts.create_artifact",
            mock_db_create,
        ),
    ):
        artifact_id = await manager.create_browser_session_download_artifact(
            organization_id="o_1",
            browser_session_id="pbs_1",
            uri="s3://skyvern-artifacts/v1/local/o_1/browser_sessions/pbs_1/downloads/file.pdf",
            filename="file.pdf",
            checksum="sha-xyz",
        )

    assert artifact_id.startswith("a_")
    mock_db_create.assert_awaited_once()
    _, kwargs = mock_db_create.call_args
    assert kwargs["artifact_type"] == ArtifactType.DOWNLOAD
    assert kwargs["browser_session_id"] == "pbs_1"
    assert kwargs["organization_id"] == "o_1"
    assert kwargs["checksum"] == "sha-xyz"
    # No run_id at write time — claim happens at run finalization.
    assert kwargs.get("run_id") is None


@pytest.mark.asyncio
async def test_create_browser_session_download_artifact_is_idempotent_per_session_and_uri():
    """The watcher fires repeatedly as a downloaded file grows. Every call
    after the first must reuse the existing artifact_id."""
    manager = ArtifactManager()
    existing = _make_artifact(
        "a_existing",
        "s3://skyvern-artifacts/v1/local/o_1/browser_sessions/pbs_1/downloads/file.pdf",
    )
    find_existing = AsyncMock(return_value=existing)
    mock_db_create = AsyncMock()

    with (
        patch(
            "skyvern.forge.sdk.artifact.manager.app.DATABASE.artifacts.find_artifact_for_browser_session",
            find_existing,
        ),
        patch(
            "skyvern.forge.sdk.artifact.manager.app.DATABASE.artifacts.create_artifact",
            mock_db_create,
        ),
    ):
        artifact_id = await manager.create_browser_session_download_artifact(
            organization_id="o_1",
            browser_session_id="pbs_1",
            uri=existing.uri,
            filename="file.pdf",
            checksum="sha-xyz",
        )

    assert artifact_id == "a_existing"
    mock_db_create.assert_not_awaited()


# ---------------------------------------------------------------------------
# Storage write site — sync_browser_session_file
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_sync_browser_session_file_registers_download_artifact():
    """A successful 'downloads' sync must create an Artifact row scoped to the session."""
    storage = S3Storage()
    storage.async_client = MagicMock()
    storage.async_client.upload_file_from_path = AsyncMock()

    mock_create = AsyncMock(return_value="a_new")
    mock_artifact_manager = MagicMock()
    mock_artifact_manager.create_browser_session_download_artifact = mock_create

    with (
        patch.object(storage, "_get_storage_class_for_org", new=AsyncMock(return_value=MagicMock())),
        patch("skyvern.forge.sdk.artifact.storage.s3.calculate_sha256_for_file", return_value="sha-1"),
        patch("skyvern.forge.sdk.artifact.storage.s3.app") as app_module,
    ):
        app_module.ARTIFACT_MANAGER = mock_artifact_manager
        await storage.sync_browser_session_file(
            organization_id="o_1",
            browser_session_id="pbs_1",
            artifact_type="downloads",
            local_file_path="/tmp/file.pdf",
            remote_path="file.pdf",
        )

    mock_create.assert_awaited_once()
    _, kwargs = mock_create.call_args
    assert kwargs["organization_id"] == "o_1"
    assert kwargs["browser_session_id"] == "pbs_1"
    assert kwargs["filename"] == "file.pdf"
    assert kwargs["checksum"] == "sha-1"
    assert kwargs["uri"].startswith("s3://") and "browser_sessions/pbs_1/downloads/" in kwargs["uri"]


@pytest.mark.asyncio
async def test_sync_browser_session_file_skips_artifact_for_non_download_types():
    """We only register artifact rows for downloads — videos, browser profiles, etc. stay untouched."""
    storage = S3Storage()
    storage.async_client = MagicMock()
    storage.async_client.upload_file_from_path = AsyncMock()

    mock_create = AsyncMock()
    mock_artifact_manager = MagicMock()
    mock_artifact_manager.create_browser_session_download_artifact = mock_create

    with (
        patch.object(storage, "_get_storage_class_for_org", new=AsyncMock(return_value=MagicMock())),
        patch("skyvern.forge.sdk.artifact.storage.s3.calculate_sha256_for_file", return_value="sha-2"),
        patch("skyvern.forge.sdk.artifact.storage.s3.app") as app_module,
    ):
        app_module.ARTIFACT_MANAGER = mock_artifact_manager
        await storage.sync_browser_session_file(
            organization_id="o_1",
            browser_session_id="pbs_1",
            artifact_type="videos",
            local_file_path="/tmp/recording.webm",
            remote_path="recording.webm",
        )

    mock_create.assert_not_awaited()


@pytest.mark.asyncio
async def test_sync_browser_session_file_propagates_artifact_row_failure():
    """If the artifact-row insert raises after the upload succeeds, the storage
    layer must propagate so the watcher's bounded retry catches it.
    Swallowing would leave the file in S3 with no row — invisible to the
    DB-backed agent baseline diffs."""
    storage = S3Storage()
    storage.async_client = MagicMock()
    storage.async_client.upload_file_from_path = AsyncMock()

    mock_create = AsyncMock(side_effect=RuntimeError("DB unreachable"))
    mock_artifact_manager = MagicMock()
    mock_artifact_manager.create_browser_session_download_artifact = mock_create

    with (
        patch.object(storage, "_get_storage_class_for_org", new=AsyncMock(return_value=MagicMock())),
        patch("skyvern.forge.sdk.artifact.storage.s3.calculate_sha256_for_file", return_value="sha-99"),
        patch("skyvern.forge.sdk.artifact.storage.s3.app") as app_module,
    ):
        app_module.ARTIFACT_MANAGER = mock_artifact_manager
        with pytest.raises(RuntimeError, match="DB unreachable"):
            await storage.sync_browser_session_file(
                organization_id="o_1",
                browser_session_id="pbs_1",
                artifact_type="downloads",
                local_file_path="/tmp/file.pdf",
                remote_path="file.pdf",
            )


@pytest.mark.asyncio
async def test_sync_browser_session_file_creates_partial_artifact_with_null_checksum():
    """Partials (``*.crdownload``) get an artifact row with checksum=None so
    the agent can detect "still downloading" via DB query. The row is dropped
    when Chrome's atomic rename fires Change.deleted."""
    from skyvern.constants import BROWSER_DOWNLOADING_SUFFIX

    storage = S3Storage()
    storage.async_client = MagicMock()
    storage.async_client.upload_file_from_path = AsyncMock()

    mock_create = AsyncMock(return_value="a_partial")
    mock_artifact_manager = MagicMock()
    mock_artifact_manager.create_browser_session_download_artifact = mock_create
    mock_checksum = MagicMock()  # must NOT be called for partials

    with (
        patch.object(storage, "_get_storage_class_for_org", new=AsyncMock(return_value=MagicMock())),
        patch("skyvern.forge.sdk.artifact.storage.s3.calculate_sha256_for_file", mock_checksum),
        patch("skyvern.forge.sdk.artifact.storage.s3.app") as app_module,
    ):
        app_module.ARTIFACT_MANAGER = mock_artifact_manager
        await storage.sync_browser_session_file(
            organization_id="o_1",
            browser_session_id="pbs_1",
            artifact_type="downloads",
            local_file_path="/tmp/file.pdf.crdownload",
            remote_path=f"file.pdf{BROWSER_DOWNLOADING_SUFFIX}",
        )

    mock_create.assert_awaited_once()
    _, kwargs = mock_create.call_args
    assert kwargs["checksum"] is None
    assert kwargs["uri"].endswith(BROWSER_DOWNLOADING_SUFFIX)
    # Partial files: checksum computation skipped — file is mid-write.
    mock_checksum.assert_not_called()


# ---------------------------------------------------------------------------
# Storage read site — get_shared_downloaded_files_in_browser_session
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_get_shared_downloaded_files_in_browser_session_uses_artifact_urls(keyring_configured):
    storage = S3Storage()
    storage.async_client = MagicMock()
    storage.async_client.list_files = AsyncMock()  # must NOT be called
    storage.async_client.create_presigned_urls = AsyncMock()  # must NOT be called

    artifact = _make_artifact(
        "a_42",
        "s3://skyvern-artifacts/v1/local/o_1/browser_sessions/pbs_1/downloads/invoice.pdf",
        checksum="sha-from-db",
    )
    mock_list = AsyncMock(return_value=[artifact])
    build_url = MagicMock(return_value="https://api.skyvern.com/v1/artifacts/a_42/content?expiry=x&kid=y&sig=z")

    with patch("skyvern.forge.sdk.artifact.storage.base.app") as base_app:
        with patch("skyvern.forge.sdk.artifact.storage.s3.app") as s3_app:
            s3_app.DATABASE.artifacts.list_artifacts_for_browser_session_by_type = mock_list
            base_app.ARTIFACT_MANAGER.build_signed_content_url = build_url
            result = await storage.get_shared_downloaded_files_in_browser_session(
                organization_id="o_1", browser_session_id="pbs_1"
            )

    assert len(result) == 1
    assert result[0].url.startswith("https://api.skyvern.com/v1/artifacts/a_42/content")
    assert result[0].checksum == "sha-from-db"
    storage.async_client.list_files.assert_not_awaited()
    storage.async_client.create_presigned_urls.assert_not_awaited()


@pytest.mark.asyncio
async def test_get_shared_downloaded_files_in_browser_session_falls_back_to_presigned_for_legacy(keyring_configured):
    """Pre-cutover sessions have no artifact rows. Files must still surface as presigned URLs."""
    storage = S3Storage()
    storage.async_client = MagicMock()
    object_uri = "s3://skyvern-artifacts/v1/local/o_1/browser_sessions/pbs_old/downloads/legacy.pdf"
    storage.async_client.list_files = AsyncMock(return_value=[object_uri.split("/", 3)[-1]])
    storage.async_client.get_object_info = AsyncMock(
        return_value={
            "Metadata": {"sha256_checksum": "sha-old", "original_filename": "legacy.pdf"},
            "LastModified": None,
        }
    )
    storage.async_client.create_presigned_urls = AsyncMock(
        return_value=["https://skyvern-artifacts.s3.amazonaws.com/...?sig=old"]
    )

    mock_list = AsyncMock(return_value=[])
    build_url = MagicMock()  # must NOT be called

    with patch("skyvern.forge.sdk.artifact.storage.base.app") as base_app:
        with patch("skyvern.forge.sdk.artifact.storage.s3.app") as s3_app:
            s3_app.DATABASE.artifacts.list_artifacts_for_browser_session_by_type = mock_list
            base_app.ARTIFACT_MANAGER.build_signed_content_url = build_url
            result = await storage.get_shared_downloaded_files_in_browser_session(
                organization_id="o_1", browser_session_id="pbs_old"
            )

    assert len(result) == 1
    assert _is_amazonaws_s3_url(result[0].url)
    build_url.assert_not_called()


@pytest.mark.asyncio
async def test_get_shared_downloaded_files_in_browser_session_filters_partial_artifacts(keyring_configured):
    """User-facing listing must hide ``*.crdownload`` rows even when DB returns them.
    Partial rows exist for the agent's "still downloading" check, not for end users."""
    storage = S3Storage()
    storage.async_client = MagicMock()
    storage.async_client.list_files = AsyncMock()  # must NOT be called

    completed = _make_artifact(
        "a_done",
        "s3://skyvern-artifacts/v1/local/o_1/browser_sessions/pbs_1/downloads/done.pdf",
        checksum="sha-1",
    )
    partial = _make_artifact(
        "a_partial",
        "s3://skyvern-artifacts/v1/local/o_1/browser_sessions/pbs_1/downloads/inflight.pdf.crdownload",
    )
    mock_list = AsyncMock(return_value=[partial, completed])
    build_url = MagicMock(return_value="https://api.skyvern.com/v1/artifacts/a_done/content?expiry=x&kid=y&sig=z")

    with (
        patch("skyvern.forge.sdk.artifact.storage.base.app") as base_app,
        patch("skyvern.forge.sdk.artifact.storage.s3.app") as s3_app,
    ):
        s3_app.DATABASE.artifacts.list_artifacts_for_browser_session_by_type = mock_list
        base_app.ARTIFACT_MANAGER.build_signed_content_url = build_url
        result = await storage.get_shared_downloaded_files_in_browser_session(
            organization_id="o_1", browser_session_id="pbs_1"
        )

    assert len(result) == 1
    assert "a_done" in result[0].url
    storage.async_client.list_files.assert_not_awaited()


@pytest.mark.asyncio
async def test_list_downloaded_files_in_browser_session_db_backed_filters_partials(keyring_configured):
    """list_downloaded_files_in_browser_session is DB-backed and must exclude
    partials — the agent uses this for baseline diff and a .crdownload entry
    would falsely look like a completed download."""
    storage = S3Storage()
    storage.async_client = MagicMock()
    storage.async_client.list_files = AsyncMock()  # must NOT be called

    completed = _make_artifact(
        "a_done",
        "s3://skyvern-artifacts/v1/local/o_1/browser_sessions/pbs_1/downloads/done.pdf",
    )
    partial = _make_artifact(
        "a_partial",
        "s3://skyvern-artifacts/v1/local/o_1/browser_sessions/pbs_1/downloads/inflight.pdf.crdownload",
    )
    mock_list = AsyncMock(return_value=[partial, completed])

    with patch("skyvern.forge.sdk.artifact.storage.s3.app") as s3_app:
        s3_app.DATABASE.artifacts.list_artifacts_for_browser_session_by_type = mock_list
        result = await storage.list_downloaded_files_in_browser_session(
            organization_id="o_1", browser_session_id="pbs_1"
        )

    assert result == [completed.uri]
    storage.async_client.list_files.assert_not_awaited()


@pytest.mark.asyncio
async def test_list_downloading_files_in_browser_session_db_backed_returns_only_partials(keyring_configured):
    """list_downloading_files_in_browser_session is DB-backed and must return
    only ``*.crdownload`` rows — the agent waits on these for complete_on_download."""
    storage = S3Storage()
    storage.async_client = MagicMock()
    storage.async_client.list_files = AsyncMock()  # must NOT be called

    completed = _make_artifact(
        "a_done",
        "s3://skyvern-artifacts/v1/local/o_1/browser_sessions/pbs_1/downloads/done.pdf",
    )
    partial = _make_artifact(
        "a_partial",
        "s3://skyvern-artifacts/v1/local/o_1/browser_sessions/pbs_1/downloads/inflight.pdf.crdownload",
    )
    mock_list = AsyncMock(return_value=[completed, partial])

    with patch("skyvern.forge.sdk.artifact.storage.s3.app") as s3_app:
        s3_app.DATABASE.artifacts.list_artifacts_for_browser_session_by_type = mock_list
        result = await storage.list_downloading_files_in_browser_session(
            organization_id="o_1", browser_session_id="pbs_1"
        )

    assert result == [partial.uri]
    storage.async_client.list_files.assert_not_awaited()


@pytest.mark.asyncio
async def test_list_downloads_falls_back_to_s3_listing_when_db_raises(keyring_configured):
    """Transient DB outage must not break the agent — fall back to S3 LIST."""
    storage = S3Storage()
    storage.async_client = MagicMock()
    s3_key = "v1/local/o_1/browser_sessions/pbs_1/downloads/legacy.pdf"
    storage.async_client.list_files = AsyncMock(return_value=[s3_key])

    mock_list = AsyncMock(side_effect=RuntimeError("DB unreachable"))

    with patch("skyvern.forge.sdk.artifact.storage.s3.app") as s3_app:
        s3_app.DATABASE.artifacts.list_artifacts_for_browser_session_by_type = mock_list
        result = await storage.list_downloaded_files_in_browser_session(
            organization_id="o_1", browser_session_id="pbs_1"
        )

    assert len(result) == 1
    assert result[0].endswith("legacy.pdf")  # nosemgrep: incomplete-url-substring-sanitization
    storage.async_client.list_files.assert_awaited_once()


@pytest.mark.asyncio
async def test_get_shared_downloaded_files_in_browser_session_keyring_unset_skips_artifact_lookup():
    """OSS default (no keyring) must skip the artifact path entirely — webhook consumers
    don't have an API key to hit the signed endpoint."""
    from skyvern.config import settings

    storage = S3Storage()
    storage.async_client = MagicMock()
    object_uri = "s3://skyvern-artifacts/v1/local/o_1/browser_sessions/pbs_1/downloads/legacy.pdf"
    storage.async_client.list_files = AsyncMock(return_value=[object_uri.split("/", 3)[-1]])
    storage.async_client.get_object_info = AsyncMock(
        return_value={"Metadata": {"sha256_checksum": "sha-x"}, "LastModified": None}
    )
    storage.async_client.create_presigned_urls = AsyncMock(
        return_value=["https://skyvern-artifacts.s3.amazonaws.com/...?sig=fallback"]
    )

    mock_list = AsyncMock()  # must NOT be called

    with (
        patch.object(settings, "ARTIFACT_CONTENT_HMAC_KEYRING", None),
        patch("skyvern.forge.sdk.artifact.storage.s3.app") as s3_app,
    ):
        s3_app.DATABASE.artifacts.list_artifacts_for_browser_session_by_type = mock_list
        result = await storage.get_shared_downloaded_files_in_browser_session(
            organization_id="o_1", browser_session_id="pbs_1"
        )

    assert len(result) == 1
    assert _is_amazonaws_s3_url(result[0].url)
    mock_list.assert_not_awaited()


# ---------------------------------------------------------------------------
# Storage delete site — delete_browser_session_file
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_delete_browser_session_file_drops_artifact_row_for_downloads():
    """When the watcher fires Change.deleted for a download, the artifact row
    must be removed too — otherwise the next session read returns a signed
    URL pointing at a deleted S3 object."""
    storage = S3Storage()
    storage.async_client = MagicMock()
    storage.async_client.delete_file = AsyncMock()

    mock_delete_row = AsyncMock(return_value=1)

    with patch("skyvern.forge.sdk.artifact.storage.s3.app") as app_module:
        app_module.DATABASE.artifacts.delete_artifact_for_browser_session = mock_delete_row
        await storage.delete_browser_session_file(
            organization_id="o_1",
            browser_session_id="pbs_1",
            artifact_type="downloads",
            remote_path="invoice.pdf",
        )

    mock_delete_row.assert_awaited_once()
    _, kwargs = mock_delete_row.call_args
    assert kwargs["organization_id"] == "o_1"
    assert kwargs["browser_session_id"] == "pbs_1"
    assert kwargs["artifact_type"] == ArtifactType.DOWNLOAD
    assert "browser_sessions/pbs_1/downloads/" in kwargs["uri"]
    storage.async_client.delete_file.assert_awaited_once()


@pytest.mark.asyncio
async def test_delete_browser_session_file_skips_row_delete_for_non_download_types():
    """Videos/HAR have no artifact rows — don't even attempt the DB delete."""
    storage = S3Storage()
    storage.async_client = MagicMock()
    storage.async_client.delete_file = AsyncMock()

    mock_delete_row = AsyncMock()

    with patch("skyvern.forge.sdk.artifact.storage.s3.app") as app_module:
        app_module.DATABASE.artifacts.delete_artifact_for_browser_session = mock_delete_row
        await storage.delete_browser_session_file(
            organization_id="o_1",
            browser_session_id="pbs_1",
            artifact_type="videos",
            remote_path="recording.webm",
        )

    mock_delete_row.assert_not_awaited()
    storage.async_client.delete_file.assert_awaited_once()


@pytest.mark.asyncio
async def test_delete_browser_session_file_swallows_db_failure_and_still_deletes_s3():
    """A transient DB error must not block S3 cleanup — the listing fallback
    can still surface the file otherwise."""
    storage = S3Storage()
    storage.async_client = MagicMock()
    storage.async_client.delete_file = AsyncMock()

    mock_delete_row = AsyncMock(side_effect=RuntimeError("DB unreachable"))

    with patch("skyvern.forge.sdk.artifact.storage.s3.app") as app_module:
        app_module.DATABASE.artifacts.delete_artifact_for_browser_session = mock_delete_row
        await storage.delete_browser_session_file(
            organization_id="o_1",
            browser_session_id="pbs_1",
            artifact_type="downloads",
            remote_path="invoice.pdf",
        )

    storage.async_client.delete_file.assert_awaited_once()


# Watcher-level tests for browser_controller live under tests/cloud/ — the
# browser_controller module imports cloud-only dependencies (redis client) and
# can't load in the OSS-synced unit suite.