diff --git a/skyvern/forge/sdk/artifact/storage/local.py b/skyvern/forge/sdk/artifact/storage/local.py
index 848f968f6..36f82b768 100644
--- a/skyvern/forge/sdk/artifact/storage/local.py
+++ b/skyvern/forge/sdk/artifact/storage/local.py
@@ -1,6 +1,6 @@
 import os
 import shutil
-from datetime import datetime
+from datetime import UTC, datetime
 from pathlib import Path
 from typing import BinaryIO
 
@@ -367,7 +367,9 @@ class LocalStorage(BaseStorage):
             if file_size == 0:
                 continue
 
-            modified_at = datetime.fromtimestamp(path_obj.stat().st_mtime)
+            # Return UTC-aware so consumers can safely compare against S3 LastModified
+            # (also UTC-aware) without hitting naive-vs-aware TypeErrors.
+            modified_at = datetime.fromtimestamp(path_obj.stat().st_mtime, tz=UTC)
             checksum = calculate_sha256_for_file(file_path)
             filename = path_obj.name
 
diff --git a/skyvern/forge/sdk/workflow/models/workflow.py b/skyvern/forge/sdk/workflow/models/workflow.py
index e636369b4..3d80d677b 100644
--- a/skyvern/forge/sdk/workflow/models/workflow.py
+++ b/skyvern/forge/sdk/workflow/models/workflow.py
@@ -270,6 +270,7 @@ class WorkflowRunResponseBase(BaseModel):
     parameters: dict[str, Any]
     screenshot_urls: list[str] | None = None
     recording_url: str | None = None
+    recording_urls: list[str] | None = None
     downloaded_files: list[FileInfo] | None = None
     downloaded_file_urls: list[str] | None = None
     outputs: dict[str, Any] | None = None
diff --git a/skyvern/forge/sdk/workflow/service.py b/skyvern/forge/sdk/workflow/service.py
index 8ef33ad71..ec55caff7 100644
--- a/skyvern/forge/sdk/workflow/service.py
+++ b/skyvern/forge/sdk/workflow/service.py
@@ -135,6 +135,31 @@
 LOG = structlog.get_logger()
 DEFAULT_FIRST_BLOCK_LABEL = "block_1"
 DEFAULT_WORKFLOW_TITLE = "New Workflow"
+# Empirical S3 upload SLA; no start buffer (back-to-back leakage is worse than late uploads to the next run).
+RECORDING_WINDOW_END_BUFFER = timedelta(minutes=15)
+
+
+def _as_utc(dt: datetime) -> datetime:
+    return dt.replace(tzinfo=UTC) if dt.tzinfo is None else dt.astimezone(UTC)
+
+
+def _select_recording_urls_in_window(
+    recordings: Sequence[FileInfo],
+    lower_bound: datetime,
+    upper_bound: datetime,
+) -> list[str]:
+    """Filter recordings to [lower_bound, upper_bound] by modified_at (UTC), sort oldest-first."""
+    in_window: list[tuple[datetime, str]] = []
+    for r in recordings:
+        if r.modified_at is None:
+            continue
+        modified_utc = _as_utc(r.modified_at)
+        if lower_bound <= modified_utc <= upper_bound:
+            in_window.append((modified_utc, r.url))
+    in_window.sort(key=lambda pair: pair[0])
+    return [url for _, url in in_window]
+
+
 CacheInvalidationReason = Literal["updated_block", "new_block", "removed_block"]
 BLOCK_TYPES_THAT_SHOULD_BE_CACHED = {
     BlockType.TASK,
@@ -4522,29 +4547,43 @@
                 )
                 screenshot_urls = screenshot_urls or None
 
-        recording_url = None
-        # Get recording url from browser session first,
-        # if not found, get the recording url from the artifacts
+        recording_urls: list[str] = []
+        # Prefer browser-session recordings; fall back to artifact store.
         if workflow_run.browser_session_id:
-            try:
-                async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
-                    recordings = await app.STORAGE.get_shared_recordings_in_browser_session(
-                        organization_id=workflow_run.organization_id,
-                        browser_session_id=workflow_run.browser_session_id,
-                    )
-                # FIXME: we only support one recording for now
-                recording_url = recordings[0].url if recordings else None
-            except asyncio.TimeoutError:
-                LOG.warning("Timeout getting recordings", browser_session_id=workflow_run.browser_session_id)
+            if workflow_run.started_at is None:
+                LOG.warning(
+                    "Skipping recording fan-out: workflow run has browser_session_id but no started_at",
+                    workflow_run_id=workflow_run.workflow_run_id,
+                    browser_session_id=workflow_run.browser_session_id,
+                )
+            else:
+                try:
+                    async with asyncio.timeout(GET_DOWNLOADED_FILES_TIMEOUT):
+                        recordings = await app.STORAGE.get_shared_recordings_in_browser_session(
+                            organization_id=workflow_run.organization_id,
+                            browser_session_id=workflow_run.browser_session_id,
+                        )
+                    # started_at excludes prior-run uploads on reused sessions.
+                    lower_bound = _as_utc(workflow_run.started_at)
+                    run_end = _as_utc(workflow_run.finished_at) if workflow_run.finished_at else datetime.now(UTC)
+                    upper_bound = run_end + RECORDING_WINDOW_END_BUFFER
+                    recording_urls = _select_recording_urls_in_window(recordings, lower_bound, upper_bound)
+                except asyncio.TimeoutError:
+                    LOG.warning("Timeout getting recordings", browser_session_id=workflow_run.browser_session_id)
 
-        if recording_url is None:
+        if not recording_urls:
             recording_artifact = await app.DATABASE.artifacts.get_artifact_for_run(
                 run_id=task_v2.observer_cruise_id if task_v2 else workflow_run_id,
                 artifact_type=ArtifactType.RECORDING,
                 organization_id=organization_id,
             )
             if recording_artifact:
-                recording_url = await app.ARTIFACT_MANAGER.get_share_link(recording_artifact)
+                artifact_url = await app.ARTIFACT_MANAGER.get_share_link(recording_artifact)
+                if artifact_url:
+                    recording_urls = [artifact_url]
+
+        # Preserve legacy singular contract: last element is the newest (old code returned recordings[0] of newest-first list).
+        recording_url = recording_urls[-1] if recording_urls else None
 
         downloaded_files: list[FileInfo] = []
         downloaded_file_urls: list[str] | None = None
@@ -4659,6 +4698,7 @@
             parameters=parameters_with_value,
             screenshot_urls=screenshot_urls,
             recording_url=recording_url,
+            recording_urls=recording_urls or None,  # omit field when empty
             downloaded_files=downloaded_files,
             downloaded_file_urls=downloaded_file_urls,
             outputs=outputs,
diff --git a/tests/unit/test_recording_window_filter.py b/tests/unit/test_recording_window_filter.py
new file mode 100644
index 000000000..0d4844e6d
--- /dev/null
+++ b/tests/unit/test_recording_window_filter.py
@@ -0,0 +1,89 @@
+from datetime import UTC, datetime, timedelta, timezone
+
+import pytest
+
+from skyvern.forge.sdk.schemas.files import FileInfo
+from skyvern.forge.sdk.workflow.service import _as_utc, _select_recording_urls_in_window
+
+
+def _rec(url: str, modified_at: datetime | None) -> FileInfo:
+    return FileInfo(url=url, checksum=None, filename=None, modified_at=modified_at)
+
+
+@pytest.mark.parametrize(
+    "dt,expected",
+    [
+        (datetime(2026, 1, 1, 12, 0), datetime(2026, 1, 1, 12, 0, tzinfo=UTC)),
+        (datetime(2026, 1, 1, 12, 0, tzinfo=UTC), datetime(2026, 1, 1, 12, 0, tzinfo=UTC)),
+        (
+            datetime(2026, 1, 1, 8, 0, tzinfo=timezone(timedelta(hours=-4))),
+            datetime(2026, 1, 1, 12, 0, tzinfo=UTC),
+        ),
+    ],
+)
+def test_as_utc_normalizes(dt: datetime, expected: datetime) -> None:
+    assert _as_utc(dt) == expected
+
+
+def test_select_empty_list_returns_empty() -> None:
+    lower = datetime(2026, 1, 1, 10, 0, tzinfo=UTC)
+    upper = datetime(2026, 1, 1, 11, 0, tzinfo=UTC)
+    assert _select_recording_urls_in_window([], lower, upper) == []
+
+
+def test_select_drops_undated() -> None:
+    lower = datetime(2026, 1, 1, 10, 0, tzinfo=UTC)
+    upper = datetime(2026, 1, 1, 11, 0, tzinfo=UTC)
+    recs = [
+        _rec("undated", None),
+        _rec("in", datetime(2026, 1, 1, 10, 30, tzinfo=UTC)),
+    ]
+    assert _select_recording_urls_in_window(recs, lower, upper) == ["in"]
+
+
+def test_select_sorts_oldest_first() -> None:
+    lower = datetime(2026, 1, 1, 10, 0, tzinfo=UTC)
+    upper = datetime(2026, 1, 1, 12, 0, tzinfo=UTC)
+    recs = [
+        _rec("new", datetime(2026, 1, 1, 11, 30, tzinfo=UTC)),
+        _rec("old", datetime(2026, 1, 1, 10, 15, tzinfo=UTC)),
+        _rec("mid", datetime(2026, 1, 1, 10, 45, tzinfo=UTC)),
+    ]
+    assert _select_recording_urls_in_window(recs, lower, upper) == ["old", "mid", "new"]
+
+
+def test_select_excludes_out_of_window() -> None:
+    lower = datetime(2026, 1, 1, 10, 0, tzinfo=UTC)
+    upper = datetime(2026, 1, 1, 11, 0, tzinfo=UTC)
+    recs = [
+        _rec("before", datetime(2026, 1, 1, 9, 30, tzinfo=UTC)),
+        _rec("after", datetime(2026, 1, 1, 12, 0, tzinfo=UTC)),
+        _rec("in", datetime(2026, 1, 1, 10, 30, tzinfo=UTC)),
+    ]
+    assert _select_recording_urls_in_window(recs, lower, upper) == ["in"]
+
+
+@pytest.mark.parametrize("boundary", ["lower", "upper"])
+def test_select_includes_exact_boundary_match(boundary: str) -> None:
+    lower = datetime(2026, 1, 1, 10, 0, tzinfo=UTC)
+    upper = datetime(2026, 1, 1, 11, 0, tzinfo=UTC)
+    ts = lower if boundary == "lower" else upper
+    recs = [_rec("edge", ts)]
+    assert _select_recording_urls_in_window(recs, lower, upper) == ["edge"]
+
+
+def test_select_handles_naive_modified_at() -> None:
+    # FileInfo.modified_at can still be naive from legacy code; _as_utc must normalize.
+    lower = datetime(2026, 1, 1, 10, 0, tzinfo=UTC)
+    upper = datetime(2026, 1, 1, 11, 0, tzinfo=UTC)
+    recs = [_rec("naive", datetime(2026, 1, 1, 10, 30))]
+    assert _select_recording_urls_in_window(recs, lower, upper) == ["naive"]
+
+
+def test_select_handles_non_utc_aware_modified_at() -> None:
+    lower = datetime(2026, 1, 1, 10, 0, tzinfo=UTC)
+    upper = datetime(2026, 1, 1, 11, 0, tzinfo=UTC)
+    est = timezone(timedelta(hours=-5))
+    # 2026-01-01 05:30 EST == 10:30 UTC
+    recs = [_rec("est", datetime(2026, 1, 1, 5, 30, tzinfo=est))]
+    assert _select_recording_urls_in_window(recs, lower, upper) == ["est"]