From f293aa6bdf3316c53ed2dbdd80df4e32c0e31665 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Fri, 29 May 2026 17:49:05 +0200 Subject: [PATCH] refactor(automations): move schedule trigger into builtin package --- .../app/automations/services/automation.py | 2 +- .../app/automations/services/trigger.py | 2 +- .../app/automations/tasks/schedule_tick.py | 185 ------------------ .../app/automations/triggers/__init__.py | 5 +- .../automations/triggers/builtin/__init__.py | 5 + .../{ => builtin}/schedule/__init__.py | 2 - .../triggers/{ => builtin}/schedule/cron.py | 0 .../{ => builtin}/schedule/definition.py | 5 +- .../triggers/{ => builtin}/schedule/params.py | 0 .../automations/triggers/schedule/dispatch.py | 67 ------- .../{ => builtin}/schedule/__init__.py | 0 .../{ => builtin}/schedule/test_cron.py | 2 +- .../{ => builtin}/schedule/test_params.py | 2 +- 13 files changed, 14 insertions(+), 263 deletions(-) delete mode 100644 surfsense_backend/app/automations/tasks/schedule_tick.py create mode 100644 surfsense_backend/app/automations/triggers/builtin/__init__.py rename surfsense_backend/app/automations/triggers/{ => builtin}/schedule/__init__.py (85%) rename surfsense_backend/app/automations/triggers/{ => builtin}/schedule/cron.py (100%) rename surfsense_backend/app/automations/triggers/{ => builtin}/schedule/definition.py (73%) rename surfsense_backend/app/automations/triggers/{ => builtin}/schedule/params.py (100%) delete mode 100644 surfsense_backend/app/automations/triggers/schedule/dispatch.py rename surfsense_backend/tests/unit/automations/triggers/{ => builtin}/schedule/__init__.py (100%) rename surfsense_backend/tests/unit/automations/triggers/{ => builtin}/schedule/test_cron.py (98%) rename surfsense_backend/tests/unit/automations/triggers/{ => builtin}/schedule/test_params.py (93%) diff --git a/surfsense_backend/app/automations/services/automation.py b/surfsense_backend/app/automations/services/automation.py index 0d2937e0e..8dc85a8fb 100644 --- a/surfsense_backend/app/automations/services/automation.py +++ b/surfsense_backend/app/automations/services/automation.py @@ -19,7 +19,7 @@ from app.automations.schemas.api import ( TriggerCreate, ) from app.automations.triggers import get_trigger -from app.automations.triggers.schedule import compute_next_fire_at +from app.automations.triggers.builtin.schedule import compute_next_fire_at from app.db import Permission, User, get_async_session from app.users import current_active_user from app.utils.rbac import check_permission diff --git a/surfsense_backend/app/automations/services/trigger.py b/surfsense_backend/app/automations/services/trigger.py index 29ac84557..523153927 100644 --- a/surfsense_backend/app/automations/services/trigger.py +++ b/surfsense_backend/app/automations/services/trigger.py @@ -13,7 +13,7 @@ from app.automations.persistence.models.automation import Automation from app.automations.persistence.models.trigger import AutomationTrigger from app.automations.schemas.api import TriggerCreate, TriggerUpdate from app.automations.triggers import get_trigger -from app.automations.triggers.schedule import compute_next_fire_at +from app.automations.triggers.builtin.schedule import compute_next_fire_at from app.db import Permission, User, get_async_session from app.users import current_active_user from app.utils.rbac import check_permission diff --git a/surfsense_backend/app/automations/tasks/schedule_tick.py b/surfsense_backend/app/automations/tasks/schedule_tick.py deleted file mode 100644 index 90fff66fc..000000000 --- a/surfsense_backend/app/automations/tasks/schedule_tick.py +++ /dev/null @@ -1,185 +0,0 @@ -"""Celery Beat tick that fires due ``schedule`` triggers. - -Runs every minute. Each tick performs two passes: - -1. **Self-heal**: enabled schedule triggers with NULL ``next_fire_at`` get - it computed from their ``cron`` + ``timezone`` (e.g. fresh inserts or - rows restored from backup). -2. **Claim & fire**: due rows are locked with ``FOR UPDATE SKIP LOCKED``, - their ``next_fire_at`` is advanced and ``last_fired_at`` is set, and - ``dispatch_schedule_run`` is invoked for each. Dispatch errors are - logged; a missed fire stays missed (matches K8s CronJob / Airflow - ``catchup=False`` semantics). -""" - -from __future__ import annotations - -import logging -from dataclasses import dataclass -from datetime import UTC, datetime - -from sqlalchemy import select -from sqlalchemy.ext.asyncio import AsyncSession - -from app.automations.persistence.enums.trigger_type import TriggerType -from app.automations.persistence.models.trigger import AutomationTrigger -from app.automations.triggers.schedule import ( - InvalidCronError, - compute_next_fire_at, - dispatch_schedule_run, -) -from app.celery_app import celery_app -from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task - -logger = logging.getLogger(__name__) - -TASK_NAME = "automation_schedule_tick" - -# Cap rows touched per tick so a backlog of due triggers can't starve the -# worker; remaining rows fire on the next tick. -_TICK_BATCH = 200 - - -@dataclass(frozen=True, slots=True) -class _Claim: - """Per-trigger fire context captured before row state is mutated.""" - - trigger_id: int - scheduled_for: datetime - previous_last_fired_at: datetime | None - - -@celery_app.task(name=TASK_NAME) -def automation_schedule_tick() -> None: - """Tick once: self-heal NULL next_fire_at, claim due rows, fire each.""" - return run_async_celery_task(_tick) - - -async def _tick() -> None: - session_maker = get_celery_session_maker() - async with session_maker() as session: - now = datetime.now(UTC) - - await _self_heal_null_next_fire(session, now=now) - - claims = await _claim_due_triggers(session, now=now) - if not claims: - return - - for claim in claims: - await _fire_one(session, claim=claim, fired_at=now) - - -async def _self_heal_null_next_fire(session: AsyncSession, *, now: datetime) -> None: - """Backfill ``next_fire_at`` for enabled schedule triggers missing it.""" - stmt = ( - select(AutomationTrigger) - .where( - AutomationTrigger.type == TriggerType.SCHEDULE, - AutomationTrigger.enabled.is_(True), - AutomationTrigger.next_fire_at.is_(None), - ) - .limit(_TICK_BATCH) - ) - triggers = (await session.execute(stmt)).scalars().all() - if not triggers: - return - - for trigger in triggers: - try: - trigger.next_fire_at = compute_next_fire_at( - trigger.params["cron"], - trigger.params["timezone"], - after=now, - ) - except (InvalidCronError, KeyError, TypeError) as exc: - logger.warning( - "automation_trigger %d has invalid schedule params, disabling: %s", - trigger.id, - exc, - ) - trigger.enabled = False - - await session.commit() - - -async def _claim_due_triggers(session: AsyncSession, *, now: datetime) -> list[_Claim]: - """Lock and advance due rows; return per-trigger fire context.""" - stmt = ( - select(AutomationTrigger) - .where( - AutomationTrigger.type == TriggerType.SCHEDULE, - AutomationTrigger.enabled.is_(True), - AutomationTrigger.next_fire_at.isnot(None), - AutomationTrigger.next_fire_at <= now, - ) - .order_by(AutomationTrigger.next_fire_at) - .limit(_TICK_BATCH) - .with_for_update(skip_locked=True) - ) - triggers = (await session.execute(stmt)).scalars().all() - if not triggers: - return [] - - claims: list[_Claim] = [] - for trigger in triggers: - # Snapshot fire-context BEFORE we advance the row. - scheduled_for = trigger.next_fire_at - previous_last_fired_at = trigger.last_fired_at - - try: - trigger.next_fire_at = compute_next_fire_at( - trigger.params["cron"], - trigger.params["timezone"], - after=now, - ) - except (InvalidCronError, KeyError, TypeError) as exc: - logger.warning( - "automation_trigger %d has invalid schedule params, disabling: %s", - trigger.id, - exc, - ) - trigger.enabled = False - continue - - trigger.last_fired_at = now - claims.append( - _Claim( - trigger_id=trigger.id, - scheduled_for=scheduled_for, - previous_last_fired_at=previous_last_fired_at, - ) - ) - - await session.commit() - return claims - - -async def _fire_one( - session: AsyncSession, *, claim: _Claim, fired_at: datetime -) -> None: - """Reload the trigger post-commit and dispatch a run for it.""" - trigger = await session.get(AutomationTrigger, claim.trigger_id) - if trigger is None: - return - - try: - run = await dispatch_schedule_run( - session=session, - trigger=trigger, - fired_at=fired_at, - scheduled_for=claim.scheduled_for, - previous_last_fired_at=claim.previous_last_fired_at, - ) - logger.info( - "scheduled fire: trigger=%d automation=%d run=%d", - claim.trigger_id, - trigger.automation_id, - run.id, - ) - except Exception: - logger.exception( - "scheduled fire failed for trigger %d (next attempt at next match)", - claim.trigger_id, - ) - await session.rollback() diff --git a/surfsense_backend/app/automations/triggers/__init__.py b/surfsense_backend/app/automations/triggers/__init__.py index f630ebf6f..9d28ddf5f 100644 --- a/surfsense_backend/app/automations/triggers/__init__.py +++ b/surfsense_backend/app/automations/triggers/__init__.py @@ -1,7 +1,6 @@ """Triggers domain: registry surface + built-in trigger packages. -Each trigger lives in its own subpackage (``schedule/``, ...) and -self-registers at import time via its ``definition`` module. +Built-in trigger types live under ``builtin/`` and self-register at import time. """ from __future__ import annotations @@ -17,4 +16,4 @@ __all__ = [ ] # Built-in triggers self-register at import time. -from . import schedule # noqa: F401 +from . import builtin # noqa: F401 diff --git a/surfsense_backend/app/automations/triggers/builtin/__init__.py b/surfsense_backend/app/automations/triggers/builtin/__init__.py new file mode 100644 index 000000000..17d8e914b --- /dev/null +++ b/surfsense_backend/app/automations/triggers/builtin/__init__.py @@ -0,0 +1,5 @@ +"""Built-in trigger types — each in its own subpackage, self-registering at import.""" + +from __future__ import annotations + +from . import event, schedule # noqa: F401 diff --git a/surfsense_backend/app/automations/triggers/schedule/__init__.py b/surfsense_backend/app/automations/triggers/builtin/schedule/__init__.py similarity index 85% rename from surfsense_backend/app/automations/triggers/schedule/__init__.py rename to surfsense_backend/app/automations/triggers/builtin/schedule/__init__.py index 92f478aac..0267b0577 100644 --- a/surfsense_backend/app/automations/triggers/schedule/__init__.py +++ b/surfsense_backend/app/automations/triggers/builtin/schedule/__init__.py @@ -3,14 +3,12 @@ from __future__ import annotations from .cron import InvalidCronError, compute_next_fire_at, validate_cron -from .dispatch import dispatch_schedule_run from .params import ScheduleTriggerParams __all__ = [ "InvalidCronError", "ScheduleTriggerParams", "compute_next_fire_at", - "dispatch_schedule_run", "validate_cron", ] diff --git a/surfsense_backend/app/automations/triggers/schedule/cron.py b/surfsense_backend/app/automations/triggers/builtin/schedule/cron.py similarity index 100% rename from surfsense_backend/app/automations/triggers/schedule/cron.py rename to surfsense_backend/app/automations/triggers/builtin/schedule/cron.py diff --git a/surfsense_backend/app/automations/triggers/schedule/definition.py b/surfsense_backend/app/automations/triggers/builtin/schedule/definition.py similarity index 73% rename from surfsense_backend/app/automations/triggers/schedule/definition.py rename to surfsense_backend/app/automations/triggers/builtin/schedule/definition.py index 605765307..a6b0b9b8e 100644 --- a/surfsense_backend/app/automations/triggers/schedule/definition.py +++ b/surfsense_backend/app/automations/triggers/builtin/schedule/definition.py @@ -2,8 +2,9 @@ from __future__ import annotations -from ..store import register_trigger -from ..types import TriggerDefinition +from app.automations.triggers.store import register_trigger +from app.automations.triggers.types import TriggerDefinition + from .params import ScheduleTriggerParams SCHEDULE_TRIGGER = TriggerDefinition( diff --git a/surfsense_backend/app/automations/triggers/schedule/params.py b/surfsense_backend/app/automations/triggers/builtin/schedule/params.py similarity index 100% rename from surfsense_backend/app/automations/triggers/schedule/params.py rename to surfsense_backend/app/automations/triggers/builtin/schedule/params.py diff --git a/surfsense_backend/app/automations/triggers/schedule/dispatch.py b/surfsense_backend/app/automations/triggers/schedule/dispatch.py deleted file mode 100644 index 6d3d5fcb9..000000000 --- a/surfsense_backend/app/automations/triggers/schedule/dispatch.py +++ /dev/null @@ -1,67 +0,0 @@ -"""Schedule dispatch adapter: load + guard, then call generic dispatch.""" - -from __future__ import annotations - -from datetime import datetime - -from sqlalchemy import select -from sqlalchemy.ext.asyncio import AsyncSession - -from app.automations.dispatch import DispatchError, dispatch_run -from app.automations.persistence.enums.automation_status import AutomationStatus -from app.automations.persistence.models.automation import Automation -from app.automations.persistence.models.run import AutomationRun -from app.automations.persistence.models.trigger import AutomationTrigger - - -async def dispatch_schedule_run( - *, - session: AsyncSession, - trigger: AutomationTrigger, - fired_at: datetime, - scheduled_for: datetime, - previous_last_fired_at: datetime | None, -) -> AutomationRun: - """Fire one scheduled run for ``trigger``. - - Emits calendar context as runtime inputs: - - - ``fired_at`` — actual fire time - - ``scheduled_for`` — cron-derived target time for this fire - - ``last_fired_at`` — fire time of the previous run, or null on first fire - - The caller (the schedule tick) is responsible for selecting due triggers - and advancing ``next_fire_at`` / ``last_fired_at`` before invoking this. - """ - automation = await _load_automation(session, trigger.automation_id) - if automation is None: - raise DispatchError( - f"automation {trigger.automation_id} not found for trigger {trigger.id}" - ) - - if automation.status != AutomationStatus.ACTIVE: - raise DispatchError( - f"automation {trigger.automation_id} is {automation.status.value}, not active" - ) - - runtime_inputs = { - "fired_at": fired_at.isoformat(), - "scheduled_for": scheduled_for.isoformat(), - "last_fired_at": ( - previous_last_fired_at.isoformat() if previous_last_fired_at else None - ), - } - - return await dispatch_run( - session=session, - automation=automation, - trigger=trigger, - runtime_inputs=runtime_inputs, - ) - - -async def _load_automation( - session: AsyncSession, automation_id: int -) -> Automation | None: - stmt = select(Automation).where(Automation.id == automation_id) - return (await session.execute(stmt)).scalar_one_or_none() diff --git a/surfsense_backend/tests/unit/automations/triggers/schedule/__init__.py b/surfsense_backend/tests/unit/automations/triggers/builtin/schedule/__init__.py similarity index 100% rename from surfsense_backend/tests/unit/automations/triggers/schedule/__init__.py rename to surfsense_backend/tests/unit/automations/triggers/builtin/schedule/__init__.py diff --git a/surfsense_backend/tests/unit/automations/triggers/schedule/test_cron.py b/surfsense_backend/tests/unit/automations/triggers/builtin/schedule/test_cron.py similarity index 98% rename from surfsense_backend/tests/unit/automations/triggers/schedule/test_cron.py rename to surfsense_backend/tests/unit/automations/triggers/builtin/schedule/test_cron.py index 5c7580823..618b82f2a 100644 --- a/surfsense_backend/tests/unit/automations/triggers/schedule/test_cron.py +++ b/surfsense_backend/tests/unit/automations/triggers/builtin/schedule/test_cron.py @@ -6,7 +6,7 @@ from datetime import UTC, datetime import pytest -from app.automations.triggers.schedule.cron import ( +from app.automations.triggers.builtin.schedule.cron import ( InvalidCronError, compute_next_fire_at, validate_cron, diff --git a/surfsense_backend/tests/unit/automations/triggers/schedule/test_params.py b/surfsense_backend/tests/unit/automations/triggers/builtin/schedule/test_params.py similarity index 93% rename from surfsense_backend/tests/unit/automations/triggers/schedule/test_params.py rename to surfsense_backend/tests/unit/automations/triggers/builtin/schedule/test_params.py index be98c5be1..bd9ebc621 100644 --- a/surfsense_backend/tests/unit/automations/triggers/schedule/test_params.py +++ b/surfsense_backend/tests/unit/automations/triggers/builtin/schedule/test_params.py @@ -5,7 +5,7 @@ from __future__ import annotations import pytest from pydantic import ValidationError -from app.automations.triggers.schedule.params import ScheduleTriggerParams +from app.automations.triggers.builtin.schedule.params import ScheduleTriggerParams pytestmark = pytest.mark.unit