Remove unnecessary S3 PUTs (#5448)
Some checks failed
Run tests and pre-commit / Run tests and pre-commit hooks (push) Waiting to run
Run tests and pre-commit / Frontend Lint and Build (push) Waiting to run
Publish Fern Docs / run (push) Waiting to run
Auto Create GitHub Release on Version Change / check-version-change (push) Has been cancelled
Build Skyvern SDK and publish to PyPI / check-version-change (push) Has been cancelled
Auto Create GitHub Release on Version Change / create-release (push) Has been cancelled
Build Skyvern SDK and publish to PyPI / run-ci (push) Has been cancelled
Build Skyvern SDK and publish to PyPI / build-sdk (push) Has been cancelled

This commit is contained in:
Shuchang Zheng 2026-04-09 19:44:09 -07:00 committed by GitHub
parent 18ab8447c2
commit 688b010b72
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 146 additions and 13 deletions

View file

@ -605,6 +605,7 @@ class ForgeAgent:
status=TaskStatus.completed,
)
await app.ARTIFACT_MANAGER.flush_step_archive(step.step_id)
# Skip per-step video sync: clean_up_task performs the authoritative final upload.
await self.clean_up_task(
task=completed_task,
last_step=last_step,
@ -621,6 +622,8 @@ class ForgeAgent:
await app.ARTIFACT_MANAGER.flush_step_archive(step.step_id)
# If there is no next step, it means that the task has failed
if maybe_next_step:
# Only sync on retry; clean_up_task handles the final upload on terminal paths.
await self._sync_video_artifact_after_step(task, browser_state)
next_step = maybe_next_step
retry = True
else:
@ -655,6 +658,7 @@ class ForgeAgent:
await app.ARTIFACT_MANAGER.flush_step_archive(step.step_id)
if is_task_completed is not None and maybe_last_step:
last_step = maybe_last_step
# Skip per-step video sync: clean_up_task performs the authoritative final upload.
await self.clean_up_task(
task=task,
last_step=last_step,
@ -664,6 +668,8 @@ class ForgeAgent:
)
return last_step, detailed_output, None
elif maybe_next_step:
# Only sync when continuing to the next step; clean_up_task handles terminal paths.
await self._sync_video_artifact_after_step(task, browser_state)
next_step = maybe_next_step
retry = False
else:
@ -680,6 +686,7 @@ class ForgeAgent:
)
# Flush for unexpected step status to release any buffered data.
await app.ARTIFACT_MANAGER.flush_step_archive(step.step_id)
await self._sync_video_artifact_after_step(task, browser_state)
cua_response_param = detailed_output.cua_response if detailed_output else None
if not cua_response_param and cua_response:
@ -2344,6 +2351,33 @@ class ForgeAgent:
)
return None
async def _sync_video_artifact_after_step(self, task: Task, browser_state: BrowserState | None) -> None:
"""Upload the current video snapshot once per step so in-progress recordings are visible.
The video file is still open while recording, so this is a partial snapshot rather than a
finalized recording. The authoritative final upload happens in cleanup_and_persist_task after
the browser closes and Playwright writes the complete file.
"""
if not browser_state:
return
try:
video_artifacts = await app.BROWSER_MANAGER.get_video_artifacts(
task_id=task.task_id, browser_state=browser_state
)
for video_artifact in video_artifacts:
await app.ARTIFACT_MANAGER.update_artifact_data(
artifact_id=video_artifact.video_artifact_id,
organization_id=task.organization_id,
data=video_artifact.video_data,
)
except Exception:
LOG.warning(
"Failed to sync video artifact after step",
task_id=task.task_id,
organization_id=task.organization_id,
exc_info=True,
)
async def record_artifacts_after_action(
self,
task: Task,
@ -2467,19 +2501,6 @@ class ForgeAgent:
exc_info=True,
)
try:
video_artifacts = await app.BROWSER_MANAGER.get_video_artifacts(
task_id=task.task_id, browser_state=browser_state
)
for video_artifact in video_artifacts:
await app.ARTIFACT_MANAGER.update_artifact_data(
artifact_id=video_artifact.video_artifact_id,
organization_id=task.organization_id,
data=video_artifact.video_data,
)
except Exception:
LOG.exception("Failed to record video after action")
async def initialize_execution_state(
self,
task: Task,

View file

@ -3116,6 +3116,34 @@ class WorkflowService:
)
]
# Track task_generation for observability (SKY-8842)
try:
user_prompt_hash = sha256(user_prompt.encode("utf-8")).hexdigest()
v1_kwargs: dict[str, Any] = {}
if task_version == "v1":
v1_kwargs = {
"url": url,
"navigation_goal": navigation_goal,
"navigation_payload": task_response.get("navigation_payload"),
"data_extraction_goal": data_extraction_goal,
"suggested_title": task_response.get("suggested_title"),
"llm": settings.LLM_KEY,
"llm_prompt": task_prompt,
"llm_response": str(task_response),
}
await app.DATABASE.workflow_params.create_task_generation(
organization_id=organization.organization_id,
user_prompt=user_prompt,
user_prompt_hash=user_prompt_hash,
**v1_kwargs,
)
except Exception:
LOG.warning(
"Failed to create task_generation record",
exc_info=True,
organization_id=organization.organization_id,
)
new_workflow = await self.create_workflow(
title=title,
workflow_definition=WorkflowDefinition(parameters=[], blocks=blocks),

View file

@ -0,0 +1,84 @@
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from skyvern.forge.agent import ForgeAgent
def _make_task(task_id: str = "task-1", organization_id: str = "org-1") -> MagicMock:
task = MagicMock()
task.task_id = task_id
task.organization_id = organization_id
return task
def _make_video_artifact(artifact_id: str, video_data: bytes) -> MagicMock:
artifact = MagicMock()
artifact.video_artifact_id = artifact_id
artifact.video_data = video_data
return artifact
@pytest.mark.asyncio
async def test_sync_video_noop_when_browser_state_is_none() -> None:
"""When browser_state is None the method must return without touching any app singletons."""
agent = ForgeAgent()
task = _make_task()
with patch("skyvern.forge.agent.app") as mock_app:
await agent._sync_video_artifact_after_step(task, browser_state=None)
mock_app.BROWSER_MANAGER.get_video_artifacts.assert_not_called()
mock_app.ARTIFACT_MANAGER.update_artifact_data.assert_not_called()
@pytest.mark.asyncio
async def test_sync_video_uploads_each_artifact() -> None:
"""Each video artifact returned by BROWSER_MANAGER must be uploaded via ARTIFACT_MANAGER."""
agent = ForgeAgent()
task = _make_task()
browser_state = MagicMock()
artifact_a = _make_video_artifact("vid-a", b"bytes-a")
artifact_b = _make_video_artifact("vid-b", b"bytes-b")
with patch("skyvern.forge.agent.app") as mock_app:
mock_app.BROWSER_MANAGER.get_video_artifacts = AsyncMock(return_value=[artifact_a, artifact_b])
mock_app.ARTIFACT_MANAGER.update_artifact_data = AsyncMock()
await agent._sync_video_artifact_after_step(task, browser_state=browser_state)
mock_app.BROWSER_MANAGER.get_video_artifacts.assert_awaited_once_with(
task_id=task.task_id, browser_state=browser_state
)
assert mock_app.ARTIFACT_MANAGER.update_artifact_data.await_count == 2
mock_app.ARTIFACT_MANAGER.update_artifact_data.assert_any_await(
artifact_id="vid-a", organization_id=task.organization_id, data=b"bytes-a"
)
mock_app.ARTIFACT_MANAGER.update_artifact_data.assert_any_await(
artifact_id="vid-b", organization_id=task.organization_id, data=b"bytes-b"
)
@pytest.mark.asyncio
async def test_sync_video_swallows_exception() -> None:
"""If get_video_artifacts raises, the method must not propagate the exception,
and the warning log must include task_id and organization_id for traceability."""
agent = ForgeAgent()
task = _make_task(task_id="task-err", organization_id="org-err")
browser_state = MagicMock()
with patch("skyvern.forge.agent.app") as mock_app:
mock_app.BROWSER_MANAGER.get_video_artifacts = AsyncMock(side_effect=RuntimeError("storage unavailable"))
with patch("skyvern.forge.agent.LOG") as mock_log:
# Should not raise
await agent._sync_video_artifact_after_step(task, browser_state=browser_state)
mock_log.warning.assert_called_once()
_, kwargs = mock_log.warning.call_args
assert kwargs.get("task_id") == "task-err"
assert kwargs.get("organization_id") == "org-err"
assert kwargs.get("exc_info") is True
mock_app.ARTIFACT_MANAGER.update_artifact_data.assert_not_called()