🔄 synced local 'tests/unit/' with remote 'tests/unit/'

This commit is contained in:
Shuchang Zheng 2026-04-22 16:06:30 +00:00 committed by Andrew Neilson
parent e35400f429
commit 71b752c554
3 changed files with 254 additions and 0 deletions

View file

@ -0,0 +1,88 @@
import importlib
from datetime import timedelta
import pytest
from temporalio.exceptions import ActivityError
from temporalio.exceptions import TimeoutError as TemporalTimeoutError
from temporalio.exceptions import TimeoutType
from workers.run_parameters import RunSkyvernWorkflowParams, SendWorkflowWebhookParams
importlib.import_module("cloud")
workflow_module = importlib.import_module("workers.temporal_v2_worker.workflows")
RunWorkflowWorkflowV2 = workflow_module.RunWorkflowWorkflowV2
def _activity_name(activity_fn: object) -> str:
return getattr(activity_fn, "__name__", repr(activity_fn))
def _heartbeat_timeout_error() -> ActivityError:
err = ActivityError(
"activity timed out",
scheduled_event_id=1,
started_event_id=2,
identity="test-worker",
activity_type="run_workflow_activity",
activity_id="act_1",
retry_state=None,
)
err.__cause__ = TemporalTimeoutError(
"simulated heartbeat timeout",
type=TimeoutType.HEARTBEAT,
last_heartbeat_details=[],
)
return err
@pytest.mark.asyncio
async def test_run_workflow_v2_sends_webhook_on_heartbeat_timeout(monkeypatch: pytest.MonkeyPatch) -> None:
calls: list[tuple[str, object]] = []
async def fake_execute_activity(activity_fn: object, arg: object, **_kwargs: object) -> None:
name = _activity_name(activity_fn)
calls.append((name, arg))
if name == "run_workflow_activity":
raise _heartbeat_timeout_error()
monkeypatch.setattr(workflow_module.workflow, "execute_activity", fake_execute_activity)
monkeypatch.setattr(workflow_module.workflow, "upsert_search_attributes", lambda *_args, **_kwargs: None)
with pytest.raises(ActivityError):
await RunWorkflowWorkflowV2().run(RunSkyvernWorkflowParams(organization_id="o_1", workflow_run_id="wr_1"))
assert [name for name, _arg in calls] == [
"run_workflow_activity",
"timeout_workflow_run_activity",
"signal_dependent_workflows_activity",
"send_workflow_webhook_activity",
]
send_params = calls[-1][1]
assert isinstance(send_params, SendWorkflowWebhookParams)
assert send_params.workflow_run_id == "wr_1"
assert send_params.organization_id == "o_1"
@pytest.mark.asyncio
async def test_run_workflow_v2_sends_webhook_on_success(monkeypatch: pytest.MonkeyPatch) -> None:
calls: list[tuple[str, object, object]] = []
async def fake_execute_activity(activity_fn: object, arg: object, **kwargs: object) -> None:
calls.append((_activity_name(activity_fn), arg, kwargs))
monkeypatch.setattr(workflow_module.workflow, "execute_activity", fake_execute_activity)
monkeypatch.setattr(workflow_module.workflow, "upsert_search_attributes", lambda *_args, **_kwargs: None)
await RunWorkflowWorkflowV2().run(RunSkyvernWorkflowParams(organization_id="o_1", workflow_run_id="wr_ok"))
assert [name for name, _arg, _kwargs in calls] == [
"run_workflow_activity",
"signal_dependent_workflows_activity",
"send_workflow_webhook_activity",
]
send_params = calls[-1][1]
send_kwargs = calls[-1][2]
assert isinstance(send_params, SendWorkflowWebhookParams)
assert send_params.workflow_run_id == "wr_ok"
assert send_kwargs["start_to_close_timeout"] == timedelta(seconds=60)
assert send_kwargs["retry_policy"].maximum_attempts == 1

View file

@ -0,0 +1,109 @@
import importlib
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch
import pytest
from skyvern.exceptions import FailedToSendWebhook
from skyvern.forge.sdk.workflow.models.workflow import WorkflowRun, WorkflowRunStatus
from workers.run_parameters import RunSkyvernWorkflowParams, SendWorkflowWebhookParams
importlib.import_module("cloud")
activities_module = importlib.import_module("workers.temporal_v2_worker.activities")
send_workflow_webhook_activity = activities_module.send_workflow_webhook_activity
def _make_wr(
status: str = "timed_out",
webhook_callback_url: str | None = "https://example.com/hook",
) -> WorkflowRun:
now = datetime.now(timezone.utc)
return WorkflowRun(
workflow_run_id="wr_abc",
workflow_id="wf_abc",
workflow_permanent_id="wpid_abc",
organization_id="o_xyz",
status=WorkflowRunStatus(status),
webhook_callback_url=webhook_callback_url,
created_at=now,
modified_at=now,
)
def test_send_workflow_webhook_params_accepts_required_fields() -> None:
params = SendWorkflowWebhookParams(
workflow_run_id="wr_520069195913377474",
organization_id="o_482380747875719746",
)
assert params.workflow_run_id == "wr_520069195913377474"
assert params.organization_id == "o_482380747875719746"
@pytest.mark.asyncio
async def test_send_workflow_webhook_activity_delegates_to_service() -> None:
params = SendWorkflowWebhookParams(
workflow_run_id="wr_abc",
organization_id="o_xyz",
)
fake_run = _make_wr(status="timed_out")
with (
patch("skyvern.forge.app.DATABASE.workflow_runs.get_workflow_run", AsyncMock(return_value=fake_run)),
patch("skyvern.forge.app.WORKFLOW_SERVICE.execute_workflow_webhook", AsyncMock()) as send_mock,
):
await send_workflow_webhook_activity(params)
send_mock.assert_awaited_once()
kwargs = send_mock.await_args.kwargs
assert kwargs["workflow_run"] is fake_run
@pytest.mark.asyncio
async def test_send_workflow_webhook_activity_returns_silently_when_wr_missing() -> None:
params = SendWorkflowWebhookParams(workflow_run_id="wr_gone", organization_id="o_xyz")
with (
patch("skyvern.forge.app.DATABASE.workflow_runs.get_workflow_run", AsyncMock(return_value=None)),
patch("skyvern.forge.app.WORKFLOW_SERVICE.execute_workflow_webhook", AsyncMock()) as send_mock,
):
await send_workflow_webhook_activity(params)
send_mock.assert_not_awaited()
@pytest.mark.asyncio
async def test_send_workflow_webhook_activity_propagates_delivery_errors() -> None:
params = SendWorkflowWebhookParams(workflow_run_id="wr_abc", organization_id="o_xyz")
fake_run = _make_wr(status="timed_out")
fake_token = type("T", (), {"token": "k"})()
with (
patch("skyvern.forge.app.DATABASE.workflow_runs.get_workflow_run", AsyncMock(return_value=fake_run)),
patch("skyvern.forge.app.DATABASE.organizations.get_valid_org_auth_token", AsyncMock(return_value=fake_token)),
patch(
"skyvern.forge.app.WORKFLOW_SERVICE.execute_workflow_webhook",
AsyncMock(side_effect=FailedToSendWebhook(workflow_id="w_1", workflow_run_id="wr_abc")),
),
):
with pytest.raises(FailedToSendWebhook):
await send_workflow_webhook_activity(params)
@pytest.mark.asyncio
async def test_run_workflow_activity_disables_inline_webhook(monkeypatch: pytest.MonkeyPatch) -> None:
execute_mock = AsyncMock()
async def fake_heartbeat_loop(_run_id: str, stop) -> None:
await stop.wait()
monkeypatch.setattr("scripts.run_workflow.execute_workflow", execute_mock)
monkeypatch.setattr(activities_module, "heartbeat_loop", fake_heartbeat_loop)
monkeypatch.setattr(activities_module, "activity_teardown", AsyncMock())
monkeypatch.setattr(activities_module.activity, "heartbeat", lambda *_args, **_kwargs: None)
await activities_module.run_workflow_activity(
RunSkyvernWorkflowParams(
organization_id="o_xyz",
workflow_run_id="wr_abc",
)
)
execute_mock.assert_awaited_once()
assert execute_mock.await_args.kwargs["need_call_webhook"] is False

View file

@ -0,0 +1,57 @@
import importlib
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch
import pytest
from skyvern.forge.sdk.workflow.models.workflow import WorkflowRun, WorkflowRunStatus
from workers.run_parameters import TimeoutWorkflowRunParams
importlib.import_module("cloud")
timeout_workflow_run_activity = importlib.import_module(
"workers.temporal_v2_worker.activities"
).timeout_workflow_run_activity
def _make_wr(status: str = "running") -> WorkflowRun:
now = datetime.now(timezone.utc)
return WorkflowRun(
workflow_run_id="wr_1",
workflow_id="wf_1",
workflow_permanent_id="wpid_1",
organization_id="o_1",
status=WorkflowRunStatus(status),
created_at=now,
modified_at=now,
)
@pytest.mark.asyncio
async def test_timeout_activity_does_not_send_webhook_anymore() -> None:
params = TimeoutWorkflowRunParams(workflow_run_id="wr_1", organization_id="o_1")
fake_run = _make_wr(status="running")
with (
patch("skyvern.forge.app.DATABASE.workflow_runs.get_workflow_run", AsyncMock(return_value=fake_run)),
patch(
"skyvern.forge.app.WORKFLOW_SERVICE.mark_workflow_run_as_timed_out",
AsyncMock(return_value=_make_wr(status="timed_out")),
),
patch("skyvern.forge.app.WORKFLOW_SERVICE.execute_workflow_webhook", AsyncMock()) as webhook_mock,
):
await timeout_workflow_run_activity(params)
webhook_mock.assert_not_awaited()
@pytest.mark.asyncio
async def test_timeout_activity_still_marks_db_as_timed_out() -> None:
params = TimeoutWorkflowRunParams(workflow_run_id="wr_1", organization_id="o_1")
fake_run = _make_wr(status="running")
mark_mock = AsyncMock(return_value=_make_wr(status="timed_out"))
with (
patch("skyvern.forge.app.DATABASE.workflow_runs.get_workflow_run", AsyncMock(return_value=fake_run)),
patch("skyvern.forge.app.WORKFLOW_SERVICE.mark_workflow_run_as_timed_out", mark_mock),
):
await timeout_workflow_run_activity(params)
mark_mock.assert_awaited_once_with("wr_1")