[Oncall] [SKY-8854] Preserve download suffixes across task finalization (#5499)

This commit is contained in:
Marc Kelechava 2026-04-14 12:35:31 -07:00 committed by GitHub
parent 2c61ac9254
commit 68bc01051c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 451 additions and 52 deletions

View file

@ -169,6 +169,82 @@ class ForgeAgent:
def __init__(self) -> None:
self.async_operation_pool = AsyncOperationPool()
async def _finalize_downloaded_files_for_task(
self,
task: Task,
*,
organization_id: str,
download_suffix: str | None,
list_files_before: list[str],
randomize_if_missing: bool,
) -> list[str]:
"""Rename newly downloaded files for a task before persistence.
Returns the list of pre-rename file names discovered as new since
``list_files_before``, for logging continuity with the legacy inline
path.
"""
if not task.workflow_run_id:
return []
context = skyvern_context.current()
workflow_download_directory = get_path_for_workflow_download_directory(
context.run_id if context and context.run_id else task.workflow_run_id
)
list_files_after = list_files_in_directory(workflow_download_directory)
if task.browser_session_id:
browser_session_downloaded_files_after = await app.STORAGE.list_downloaded_files_in_browser_session(
organization_id=organization_id,
browser_session_id=task.browser_session_id,
)
list_files_after = list_files_after + browser_session_downloaded_files_after
files_to_rename = list(set(list_files_after) - set(list_files_before))
if not files_to_rename:
return []
for file in files_to_rename:
local_file_name = file
if file.startswith("s3://"):
file_data = await get_aws_client().download_file(file, log_exception=False)
if not file_data:
continue
local_file_name = file.split("/")[-1]
with open(os.path.join(workflow_download_directory, local_file_name), "wb") as f:
f.write(file_data)
file_extension = Path(local_file_name).suffix
if file_extension == BROWSER_DOWNLOADING_SUFFIX:
LOG.warning(
"Detecting incompleted download file, skip the rename",
file=local_file_name,
task_id=task.task_id,
workflow_run_id=task.workflow_run_id,
)
continue
if download_suffix:
final_file_name = download_suffix
elif randomize_if_missing:
random_file_id = "".join(random.choices(string.ascii_uppercase + string.digits, k=4))
final_file_name = f"download-{datetime.now().strftime('%Y%m%d%H%M%S%f')}-{random_file_id}"
else:
continue
base_final_file_name = final_file_name
target_path = os.path.join(workflow_download_directory, final_file_name + file_extension)
counter = 1
while os.path.exists(target_path):
final_file_name = f"{base_final_file_name}_{counter}"
target_path = os.path.join(workflow_download_directory, final_file_name + file_extension)
counter += 1
rename_file(
os.path.join(workflow_download_directory, local_file_name),
final_file_name + file_extension,
)
return files_to_rename
async def create_task_and_step_from_block(
self,
task_block: BaseTaskBlock,
@ -328,6 +404,7 @@ class ForgeAgent:
engine: RunEngine = RunEngine.skyvern_v1,
cua_response: OpenAIResponse | None = None,
llm_caller: LLMCaller | None = None,
download_baseline_files: list[str] | None = None,
) -> Tuple[Step, DetailedAgentStepOutput | None, Step | None]:
# set the step_id and task_id in the context
context = skyvern_context.ensure_context()
@ -405,6 +482,8 @@ class ForgeAgent:
need_call_webhook=True,
browser_session_id=browser_session_id,
close_browser_on_completion=close_browser_on_completion,
download_suffix=task_block.download_suffix if task_block else None,
list_files_before=download_baseline_files,
)
return step, None, None
@ -423,16 +502,16 @@ class ForgeAgent:
)
next_step: Step | None = None
detailed_output: DetailedAgentStepOutput | None = None
list_files_before: list[str] = []
list_files_before: list[str] = download_baseline_files.copy() if download_baseline_files is not None else []
browser_state: BrowserState | None = None
try:
if task.workflow_run_id:
if download_baseline_files is None and task.workflow_run_id:
list_files_before = list_files_in_directory(
get_path_for_workflow_download_directory(
context.run_id if context and context.run_id else task.workflow_run_id
)
)
if task.browser_session_id:
if task.browser_session_id and download_baseline_files is None:
browser_session_downloaded_files = await app.STORAGE.list_downloaded_files_in_browser_session(
organization_id=organization.organization_id,
browser_session_id=task.browser_session_id,
@ -545,59 +624,19 @@ class ForgeAgent:
workflow_run_id=task.workflow_run_id,
)
list_files_after = list_files_in_directory(workflow_download_directory)
if task.browser_session_id:
browser_session_downloaded_files_after = await app.STORAGE.list_downloaded_files_in_browser_session(
organization_id=organization.organization_id,
browser_session_id=task.browser_session_id,
)
list_files_after = list_files_after + browser_session_downloaded_files_after
if len(list_files_after) > len(list_files_before):
files_to_rename = list(set(list_files_after) - set(list_files_before))
for file in files_to_rename:
if file.startswith("s3://"):
file_data = await get_aws_client().download_file(file, log_exception=False)
if not file_data:
continue
file = file.split("/")[-1] # Extract filename from the end of S3 URI
with open(os.path.join(workflow_download_directory, file), "wb") as f:
f.write(file_data)
file_extension = Path(file).suffix
if file_extension == BROWSER_DOWNLOADING_SUFFIX:
LOG.warning(
"Detecting incompleted download file, skip the rename",
file=file,
task_id=task.task_id,
workflow_run_id=task.workflow_run_id,
)
continue
if task_block.download_suffix:
# Use download_suffix as the complete filename (without extension)
final_file_name = task_block.download_suffix
else:
# Fallback to random filename if no download_suffix provided
random_file_id = "".join(random.choices(string.ascii_uppercase + string.digits, k=4))
final_file_name = f"download-{datetime.now().strftime('%Y%m%d%H%M%S%f')}-{random_file_id}"
# Check if file with this name already exists
final_file_name = final_file_name
target_path = os.path.join(workflow_download_directory, final_file_name + file_extension)
counter = 1
while os.path.exists(target_path):
# If file exists, append counter to filename
final_file_name = f"{final_file_name}_{counter}"
target_path = os.path.join(workflow_download_directory, final_file_name + file_extension)
counter += 1
rename_file(os.path.join(workflow_download_directory, file), final_file_name + file_extension)
files_to_rename = await self._finalize_downloaded_files_for_task(
task,
organization_id=organization.organization_id,
download_suffix=task_block.download_suffix,
list_files_before=list_files_before,
randomize_if_missing=True,
)
if files_to_rename:
LOG.info(
"Task marked as completed due to download",
task_id=task.task_id,
num_files_before=len(list_files_before),
num_files_after=len(list_files_after),
num_files_after=len(list_files_before) + len(files_to_rename),
new_files=files_to_rename,
)
last_step = await self.update_step(step, is_last=True)
@ -607,6 +646,9 @@ class ForgeAgent:
)
await app.ARTIFACT_MANAGER.flush_step_archive(step.step_id)
# Skip per-step video sync: clean_up_task performs the authoritative final upload.
# Do not pass download finalization inputs into cleanup here:
# the early complete_on_download path already finalized files
# against the pre-step baseline and cleanup must not do it again.
await self.clean_up_task(
task=completed_task,
last_step=last_step,
@ -634,6 +676,8 @@ class ForgeAgent:
api_key=api_key,
close_browser_on_completion=close_browser_on_completion,
browser_session_id=browser_session_id,
download_suffix=task_block.download_suffix if task_block else None,
list_files_before=list_files_before,
)
return step, detailed_output, None
elif step.status == StepStatus.completed:
@ -666,6 +710,8 @@ class ForgeAgent:
api_key=api_key,
close_browser_on_completion=close_browser_on_completion,
browser_session_id=browser_session_id,
download_suffix=task_block.download_suffix if task_block else None,
list_files_before=list_files_before,
)
return last_step, detailed_output, None
elif maybe_next_step:
@ -693,6 +739,9 @@ class ForgeAgent:
if not cua_response_param and cua_response:
cua_response_param = cua_response
# Forward the initial download baseline into recursive execute_step calls so
# files downloaded on this step are still seen as "new" by cleanup on a later step.
# Any additional recursive execute_step call site must preserve this kwarg.
if retry and next_step:
return await self.execute_step(
organization,
@ -706,6 +755,7 @@ class ForgeAgent:
engine=engine,
cua_response=cua_response_param,
llm_caller=llm_caller,
download_baseline_files=list_files_before,
)
elif settings.execute_all_steps() and next_step:
return await self.execute_step(
@ -720,6 +770,7 @@ class ForgeAgent:
engine=engine,
cua_response=cua_response_param,
llm_caller=llm_caller,
download_baseline_files=list_files_before,
)
else:
LOG.info(
@ -742,6 +793,8 @@ class ForgeAgent:
api_key=api_key,
close_browser_on_completion=close_browser_on_completion,
browser_session_id=browser_session_id,
download_suffix=task_block.download_suffix if task_block else None,
list_files_before=list_files_before,
)
return step, detailed_output, None
except StepTerminationError as e:
@ -757,6 +810,8 @@ class ForgeAgent:
api_key=api_key,
close_browser_on_completion=close_browser_on_completion,
browser_session_id=browser_session_id,
download_suffix=task_block.download_suffix if task_block else None,
list_files_before=list_files_before,
)
else:
LOG.warning("Task isn't marked as failed, after step termination. NOT clean up the task")
@ -784,6 +839,8 @@ class ForgeAgent:
close_browser_on_completion=close_browser_on_completion,
need_final_screenshot=False,
browser_session_id=browser_session_id,
download_suffix=task_block.download_suffix if task_block else None,
list_files_before=list_files_before,
)
else:
LOG.warning("Task isn't marked as failed, after navigation failure. NOT clean up the task")
@ -800,6 +857,8 @@ class ForgeAgent:
need_call_webhook=False,
browser_session_id=browser_session_id,
close_browser_on_completion=close_browser_on_completion,
download_suffix=task_block.download_suffix if task_block else None,
list_files_before=list_files_before,
)
return step, detailed_output, None
except InvalidTaskStatusTransition:
@ -812,6 +871,8 @@ class ForgeAgent:
need_call_webhook=False,
browser_session_id=browser_session_id,
close_browser_on_completion=close_browser_on_completion,
download_suffix=task_block.download_suffix if task_block else None,
list_files_before=list_files_before,
)
return step, detailed_output, None
except (UnsupportedActionType, UnsupportedTaskType, FailedToParseActionInstruction) as e:
@ -828,6 +889,8 @@ class ForgeAgent:
need_call_webhook=False,
browser_session_id=browser_session_id,
close_browser_on_completion=close_browser_on_completion,
download_suffix=task_block.download_suffix if task_block else None,
list_files_before=list_files_before,
)
return step, detailed_output, None
except ScrapingFailed as sfe:
@ -850,6 +913,8 @@ class ForgeAgent:
api_key=api_key,
close_browser_on_completion=close_browser_on_completion,
browser_session_id=browser_session_id,
download_suffix=task_block.download_suffix if task_block else None,
list_files_before=list_files_before,
)
return step, detailed_output, None
except MissingBrowserStatePage as e:
@ -867,6 +932,8 @@ class ForgeAgent:
api_key=api_key,
close_browser_on_completion=close_browser_on_completion,
browser_session_id=browser_session_id,
download_suffix=task_block.download_suffix if task_block else None,
list_files_before=list_files_before,
)
return step, detailed_output, None
except Exception as e:
@ -882,6 +949,8 @@ class ForgeAgent:
api_key=api_key,
close_browser_on_completion=close_browser_on_completion,
browser_session_id=browser_session_id,
download_suffix=task_block.download_suffix if task_block else None,
list_files_before=list_files_before,
)
else:
LOG.warning("Task isn't marked as failed, after unexpected exception. NOT clean up the task")
@ -3493,6 +3562,8 @@ class ForgeAgent:
close_browser_on_completion: bool = True,
need_final_screenshot: bool = True,
browser_session_id: str | None = None,
download_suffix: str | None = None,
list_files_before: list[str] | None = None,
) -> None:
"""
send the task response to the webhook callback url
@ -3545,7 +3616,18 @@ class ForgeAgent:
if task.organization_id:
try:
# Keep both finalize and save inside a single timeout budget so a hung
# finalize call cannot block persistence forever; accept the trade-off
# that a very slow finalize on many files could crowd out save.
async with asyncio.timeout(SAVE_DOWNLOADED_FILES_TIMEOUT):
if download_suffix and list_files_before is not None:
await self._finalize_downloaded_files_for_task(
task,
organization_id=task.organization_id,
download_suffix=download_suffix,
list_files_before=list_files_before,
randomize_if_missing=False,
)
context = skyvern_context.current()
await app.STORAGE.save_downloaded_files(
organization_id=task.organization_id,

View file

@ -0,0 +1,317 @@
from __future__ import annotations
import os
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from skyvern.forge.agent import ForgeAgent
from skyvern.schemas.runs import RunEngine
from skyvern.webeye.actions.models import DetailedAgentStepOutput
def _make_task(
*,
task_id: str = "task-1",
organization_id: str = "org-1",
workflow_run_id: str = "wr-1",
) -> MagicMock:
task = MagicMock()
task.task_id = task_id
task.organization_id = organization_id
task.workflow_run_id = workflow_run_id
task.browser_session_id = None
task.status = MagicMock(value="terminated")
return task
@pytest.mark.asyncio
async def test_finalize_downloaded_files_renames_with_download_suffix(tmp_path) -> None:
agent = ForgeAgent()
task = _make_task()
download_dir = tmp_path / "downloads"
download_dir.mkdir()
rename_mock = MagicMock()
with (
patch("skyvern.forge.agent.get_path_for_workflow_download_directory", return_value=download_dir),
patch("skyvern.forge.agent.list_files_in_directory", return_value=["uuid-file.zip"]),
patch("skyvern.forge.agent.rename_file", rename_mock),
patch("skyvern.forge.agent.skyvern_context.current", return_value=None),
):
renamed = await agent._finalize_downloaded_files_for_task(
task,
organization_id=task.organization_id,
download_suffix="req-123",
list_files_before=[],
randomize_if_missing=False,
)
assert renamed == ["uuid-file.zip"]
rename_mock.assert_called_once_with(os.path.join(download_dir, "uuid-file.zip"), "req-123.zip")
@pytest.mark.asyncio
async def test_cleanup_task_finalizes_downloads_before_saving(tmp_path) -> None:
agent = ForgeAgent()
task = _make_task()
last_step = MagicMock()
last_step.step_id = "step-1"
call_order: list[str] = []
async def finalize_side_effect(*args, **kwargs):
call_order.append("rename")
return ["uuid-file.zip"]
async def save_side_effect(**kwargs):
call_order.append("save")
with (
patch("skyvern.forge.agent.analytics.capture"),
patch("skyvern.forge.agent.otel_trace.get_current_span", return_value=MagicMock()),
patch("skyvern.forge.agent.skyvern_context.current", return_value=None),
patch.object(agent, "_finalize_downloaded_files_for_task", AsyncMock(side_effect=finalize_side_effect)),
patch("skyvern.forge.agent.app") as mock_app,
):
mock_app.DATABASE.tasks.get_task = AsyncMock(return_value=task)
mock_app.STORAGE.save_downloaded_files = AsyncMock(side_effect=save_side_effect)
await agent.clean_up_task(
task,
last_step=last_step,
need_final_screenshot=False,
download_suffix="req-123",
list_files_before=[],
)
assert call_order == ["rename", "save"]
@pytest.mark.asyncio
async def test_execute_step_complete_on_download_does_not_double_finalize(tmp_path) -> None:
agent = ForgeAgent()
download_dir = tmp_path / "downloads"
download_dir.mkdir()
task = _make_task()
task.status = SimpleNamespace(value="running")
task.navigation_goal = "Download invoice"
task.data_extraction_goal = None
task.complete_criterion = None
task.terminate_criterion = None
task.browser_address = None
task.max_steps_per_run = None
task.url = "https://example.com"
task.proxy_location = None
task.llm_key = None
task.task_type = "general"
step = MagicMock()
step.step_id = "step-1"
step.order = 0
step.retry_index = 0
step.status = "created"
organization = MagicMock()
organization.organization_id = task.organization_id
organization.max_steps_per_run = None
task_block = MagicMock()
task_block.complete_on_download = True
task_block.download_timeout = None
task_block.download_suffix = "req-123"
browser_state = MagicMock()
browser_state.get_working_page = AsyncMock(return_value=None)
async def agent_step_side_effect(*args, **kwargs):
(download_dir / "uuid-file.zip").write_text("dummy")
return step, DetailedAgentStepOutput(
scraped_page=None,
extract_action_prompt=None,
llm_response=None,
actions=None,
action_results=None,
actions_and_results=None,
cua_response=None,
)
async def update_step_side_effect(step_obj, *args, **kwargs):
if "status" in kwargs:
step_obj.status = kwargs["status"]
if "is_last" in kwargs:
step_obj.is_last = kwargs["is_last"]
return step_obj
async def update_task_side_effect(task_obj, *args, **kwargs):
return task_obj
with (
patch("skyvern.forge.agent.analytics.capture"),
patch("skyvern.forge.agent.otel_trace.get_current_span", return_value=MagicMock()),
patch("skyvern.forge.agent.skyvern_context.ensure_context", return_value=MagicMock()),
patch("skyvern.forge.agent.skyvern_context.current", return_value=None),
patch("skyvern.forge.agent.get_path_for_workflow_download_directory", return_value=download_dir),
patch("skyvern.forge.agent.list_downloading_files_in_directory", return_value=[]),
patch("skyvern.forge.agent.app") as mock_app,
patch.object(agent, "initialize_execution_state", AsyncMock(return_value=(step, browser_state, None))),
patch.object(agent, "agent_step", AsyncMock(side_effect=agent_step_side_effect)),
patch.object(agent, "update_step", AsyncMock(side_effect=update_step_side_effect)),
patch.object(agent, "update_task", AsyncMock(side_effect=update_task_side_effect)),
patch.object(agent, "update_task_errors_from_detailed_output", AsyncMock(return_value=task)),
):
mock_app.DATABASE.workflow_runs.get_workflow_run = AsyncMock(return_value=None)
mock_app.DATABASE.tasks.get_task = AsyncMock(return_value=task)
mock_app.DATABASE.tasks.update_task = AsyncMock(return_value=task)
mock_app.AGENT_FUNCTION.validate_step_execution = AsyncMock()
mock_app.AGENT_FUNCTION.post_step_execution = AsyncMock()
mock_app.ARTIFACT_MANAGER.flush_step_archive = AsyncMock()
mock_app.BROWSER_MANAGER.get_for_task = MagicMock(return_value=None)
mock_app.STORAGE.save_downloaded_files = AsyncMock()
mock_app.STORAGE.list_downloaded_files_in_browser_session = AsyncMock(return_value=[])
await agent.execute_step(
organization=organization,
task=task,
step=step,
task_block=task_block,
close_browser_on_completion=True,
complete_verification=True,
engine=RunEngine.skyvern_v1,
)
assert (download_dir / "req-123.zip").exists()
assert not (download_dir / "req-123_1.zip").exists()
assert not (download_dir / "uuid-file.zip").exists()
@pytest.mark.asyncio
async def test_execute_step_reuses_initial_download_baseline_across_recursive_steps(tmp_path) -> None:
agent = ForgeAgent()
download_dir = tmp_path / "downloads"
download_dir.mkdir()
task = _make_task()
task.status = SimpleNamespace(value="running")
task.navigation_goal = "Download invoice"
task.data_extraction_goal = None
task.complete_criterion = None
task.terminate_criterion = None
task.browser_address = None
task.max_steps_per_run = None
task.url = "https://example.com"
task.proxy_location = None
task.llm_key = None
task.task_type = "general"
step1 = MagicMock()
step1.step_id = "step-1"
step1.order = 0
step1.retry_index = 0
step1.status = "created"
step2 = MagicMock()
step2.step_id = "step-2"
step2.order = 1
step2.retry_index = 0
step2.status = "created"
organization = MagicMock()
organization.organization_id = task.organization_id
organization.max_steps_per_run = None
task_block = MagicMock()
task_block.complete_on_download = False
task_block.download_timeout = None
task_block.download_suffix = "req-123"
browser_state = MagicMock()
browser_state.get_working_page = AsyncMock(return_value=None)
async def agent_step_side_effect(*args, **kwargs):
current_step = args[1]
if current_step.step_id == "step-1":
(download_dir / "uuid-file.zip").write_text("dummy")
step1.status = "completed"
return step1, DetailedAgentStepOutput(
scraped_page=None,
extract_action_prompt=None,
llm_response=None,
actions=None,
action_results=None,
actions_and_results=None,
cua_response=None,
)
step2.status = "completed"
return step2, DetailedAgentStepOutput(
scraped_page=None,
extract_action_prompt=None,
llm_response=None,
actions=None,
action_results=None,
actions_and_results=None,
cua_response=None,
)
async def update_step_side_effect(step_obj, *args, **kwargs):
return step_obj
async def update_task_side_effect(task_obj, *args, **kwargs):
return task_obj
handle_completed_step_mock = AsyncMock(
side_effect=[
(None, None, step2),
(True, step2, None),
]
)
with (
patch("skyvern.forge.agent.analytics.capture"),
patch("skyvern.forge.agent.otel_trace.get_current_span", return_value=MagicMock()),
patch("skyvern.forge.agent.skyvern_context.ensure_context", return_value=MagicMock()),
patch("skyvern.forge.agent.skyvern_context.current", return_value=None),
patch("skyvern.forge.agent.get_path_for_workflow_download_directory", return_value=download_dir),
patch("skyvern.forge.agent.list_downloading_files_in_directory", return_value=[]),
patch.object(
type(__import__("skyvern.forge.agent", fromlist=["settings"]).settings),
"execute_all_steps",
return_value=True,
),
patch("skyvern.forge.agent.app") as mock_app,
patch.object(
agent,
"initialize_execution_state",
AsyncMock(side_effect=lambda task_obj, step_obj, *_args, **_kwargs: (step_obj, browser_state, None)),
),
patch.object(agent, "agent_step", AsyncMock(side_effect=agent_step_side_effect)),
patch.object(agent, "update_step", AsyncMock(side_effect=update_step_side_effect)),
patch.object(agent, "update_task", AsyncMock(side_effect=update_task_side_effect)),
patch.object(agent, "update_task_errors_from_detailed_output", AsyncMock(return_value=task)),
patch.object(agent, "handle_completed_step", handle_completed_step_mock),
):
mock_app.DATABASE.workflow_runs.get_workflow_run = AsyncMock(return_value=None)
mock_app.DATABASE.tasks.get_task = AsyncMock(return_value=task)
mock_app.DATABASE.tasks.update_task = AsyncMock(return_value=task)
mock_app.AGENT_FUNCTION.validate_step_execution = AsyncMock()
mock_app.AGENT_FUNCTION.post_step_execution = AsyncMock()
mock_app.ARTIFACT_MANAGER.flush_step_archive = AsyncMock()
mock_app.BROWSER_MANAGER.get_for_task = MagicMock(return_value=None)
mock_app.BROWSER_MANAGER.get_video_artifacts = AsyncMock(return_value=[])
mock_app.STORAGE.save_downloaded_files = AsyncMock()
mock_app.STORAGE.list_downloaded_files_in_browser_session = AsyncMock(return_value=[])
await agent.execute_step(
organization=organization,
task=task,
step=step1,
task_block=task_block,
close_browser_on_completion=True,
complete_verification=True,
engine=RunEngine.skyvern_v1,
)
assert (download_dir / "req-123.zip").exists()
assert not (download_dir / "uuid-file.zip").exists()