mirror of
https://github.com/Skyvern-AI/skyvern.git
synced 2026-04-28 03:30:10 +00:00
feat: return all workflow run recordings (backend) (SKY-9233) (#5660)
This commit is contained in:
parent
fb64ffd88c
commit
26ec15a045
4 changed files with 149 additions and 17 deletions
|
|
@ -135,6 +135,31 @@ LOG = structlog.get_logger()
|
|||
# Label assigned to the first block of a newly created workflow.
DEFAULT_FIRST_BLOCK_LABEL = "block_1"

# Title assigned to a workflow created without an explicit name.
DEFAULT_WORKFLOW_TITLE = "New Workflow"

# Empirical S3 upload SLA. Only the *end* of the recording window is padded:
# recordings may finish uploading up to this long after the run ends, so we
# still want to pick them up. The *start* is deliberately not padded — on a
# reused browser session a start buffer would pull in the previous run's
# recordings (back-to-back leakage), which is worse than occasionally missing
# an upload that lands late.
RECORDING_WINDOW_END_BUFFER = timedelta(minutes=15)
|
||||
|
||||
|
||||
def _as_utc(dt: datetime) -> datetime:
|
||||
return dt.replace(tzinfo=UTC) if dt.tzinfo is None else dt.astimezone(UTC)
|
||||
|
||||
|
||||
def _select_recording_urls_in_window(
    recordings: Sequence[FileInfo],
    lower_bound: datetime,
    upper_bound: datetime,
) -> list[str]:
    """Filter recordings to [lower_bound, upper_bound] by modified_at (UTC), sort oldest-first."""
    # Pair every timestamped recording with its normalized UTC mtime;
    # entries without a modified_at cannot be placed in the window and are dropped.
    stamped = [
        (_as_utc(rec.modified_at), rec.url)
        for rec in recordings
        if rec.modified_at is not None
    ]
    # Sort oldest-first by timestamp (stable, so equal mtimes keep input order),
    # then keep only the URLs whose mtime falls inside the inclusive window.
    return [
        url
        for stamp, url in sorted(stamped, key=lambda pair: pair[0])
        if lower_bound <= stamp <= upper_bound
    ]
|
||||
|
||||
|
||||
CacheInvalidationReason = Literal["updated_block", "new_block", "removed_block"]
|
||||
BLOCK_TYPES_THAT_SHOULD_BE_CACHED = {
|
||||
BlockType.TASK,
|
||||
|
|
@ -4522,29 +4547,43 @@ class WorkflowService:
|
|||
)
|
||||
screenshot_urls = screenshot_urls or None
|
||||
|
||||
recording_url = None
|
||||
# Get recording url from browser session first,
|
||||
# if not found, get the recording url from the artifacts
|
||||
recording_urls: list[str] = []
|
||||
# Prefer browser-session recordings; fall back to artifact store.
|
||||
if workflow_run.browser_session_id:
|
||||
try:
|
||||
async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
|
||||
recordings = await app.STORAGE.get_shared_recordings_in_browser_session(
|
||||
organization_id=workflow_run.organization_id,
|
||||
browser_session_id=workflow_run.browser_session_id,
|
||||
)
|
||||
# FIXME: we only support one recording for now
|
||||
recording_url = recordings[0].url if recordings else None
|
||||
except asyncio.TimeoutError:
|
||||
LOG.warning("Timeout getting recordings", browser_session_id=workflow_run.browser_session_id)
|
||||
if workflow_run.started_at is None:
|
||||
LOG.warning(
|
||||
"Skipping recording fan-out: workflow run has browser_session_id but no started_at",
|
||||
workflow_run_id=workflow_run.workflow_run_id,
|
||||
browser_session_id=workflow_run.browser_session_id,
|
||||
)
|
||||
else:
|
||||
try:
|
||||
async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
|
||||
recordings = await app.STORAGE.get_shared_recordings_in_browser_session(
|
||||
organization_id=workflow_run.organization_id,
|
||||
browser_session_id=workflow_run.browser_session_id,
|
||||
)
|
||||
# started_at excludes prior-run uploads on reused sessions.
|
||||
lower_bound = _as_utc(workflow_run.started_at)
|
||||
run_end = _as_utc(workflow_run.finished_at) if workflow_run.finished_at else datetime.now(UTC)
|
||||
upper_bound = run_end + RECORDING_WINDOW_END_BUFFER
|
||||
recording_urls = _select_recording_urls_in_window(recordings, lower_bound, upper_bound)
|
||||
except asyncio.TimeoutError:
|
||||
LOG.warning("Timeout getting recordings", browser_session_id=workflow_run.browser_session_id)
|
||||
|
||||
if recording_url is None:
|
||||
if not recording_urls:
|
||||
recording_artifact = await app.DATABASE.artifacts.get_artifact_for_run(
|
||||
run_id=task_v2.observer_cruise_id if task_v2 else workflow_run_id,
|
||||
artifact_type=ArtifactType.RECORDING,
|
||||
organization_id=organization_id,
|
||||
)
|
||||
if recording_artifact:
|
||||
recording_url = await app.ARTIFACT_MANAGER.get_share_link(recording_artifact)
|
||||
artifact_url = await app.ARTIFACT_MANAGER.get_share_link(recording_artifact)
|
||||
if artifact_url:
|
||||
recording_urls = [artifact_url]
|
||||
|
||||
# Preserve legacy singular contract: last element is the newest (old code returned recordings[0] of newest-first list).
|
||||
recording_url = recording_urls[-1] if recording_urls else None
|
||||
|
||||
downloaded_files: list[FileInfo] = []
|
||||
downloaded_file_urls: list[str] | None = None
|
||||
|
|
@ -4659,6 +4698,7 @@ class WorkflowService:
|
|||
parameters=parameters_with_value,
|
||||
screenshot_urls=screenshot_urls,
|
||||
recording_url=recording_url,
|
||||
recording_urls=recording_urls or None, # omit field when empty
|
||||
downloaded_files=downloaded_files,
|
||||
downloaded_file_urls=downloaded_file_urls,
|
||||
outputs=outputs,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue