diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py index 2423133fb..73746f04f 100644 --- a/surfsense_backend/app/celery_app.py +++ b/surfsense_backend/app/celery_app.py @@ -243,7 +243,6 @@ celery_app.conf.update( "index_obsidian_attachment": {"queue": CONNECTORS_QUEUE}, # Everything else (document processing, podcasts, reindexing, # schedule checker, cleanup) stays on the default fast queue. - "gateway.process_inbound_event": {"queue": f"{CELERY_TASK_DEFAULT_QUEUE}.gateway"}, "gateway.reconcile_inbox": {"queue": f"{CELERY_TASK_DEFAULT_QUEUE}.gateway"}, "gateway.health_check": {"queue": f"{CELERY_TASK_DEFAULT_QUEUE}.gateway"}, "gateway.retention_sweep": {"queue": f"{CELERY_TASK_DEFAULT_QUEUE}.gateway"}, diff --git a/surfsense_backend/app/tasks/celery_tasks/gateway_tasks.py b/surfsense_backend/app/tasks/celery_tasks/gateway_tasks.py index b8076b5a7..aeb3d721e 100644 --- a/surfsense_backend/app/tasks/celery_tasks/gateway_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/gateway_tasks.py @@ -1,4 +1,4 @@ -"""Celery tasks for messaging gateway intake and maintenance.""" +"""Celery maintenance tasks for external chat surfaces.""" from __future__ import annotations @@ -9,11 +9,11 @@ from sqlalchemy import select, update from app.celery_app import celery_app from app.db import ( - GatewayEventStatus, - GatewayHealthStatus, - GatewayInboundEvent, - GatewayPlatform, - GatewayPlatformAccount, + ExternalChatEventStatus, + ExternalChatHealthStatus, + ExternalChatInboundEvent, + ExternalChatPlatform, + ExternalChatAccount, ) from app.gateway.accounts import account_token from app.gateway.inbox import persist_inbound_event, telegram_event_dedupe_key @@ -27,17 +27,11 @@ from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_ta logger = logging.getLogger(__name__) -@celery_app.task( - bind=True, - name="gateway.process_inbound_event", - acks_late=True, - max_retries=5, - retry_backoff=True, -) -def process_inbound_event_task(self, inbox_id: int) -> None: +@celery_app.task(name="gateway.process_inbound_event") +def process_inbound_event_task(inbox_id: int) -> None: logger.warning( - "Ignoring Celery gateway.process_inbound_event for inbox_id=%s; " - "GatewayRunner owns agent turn processing.", + "Ignoring gateway.process_inbound_event for inbox_id=%s; " + "FastAPI owns external chat agent turn processing.", inbox_id, ) return None @@ -50,14 +44,14 @@ def reconcile_inbox_task() -> None: async with session_maker() as session: stale_threshold = datetime.now(UTC) - timedelta(minutes=10) result = await session.execute( - update(GatewayInboundEvent) + update(ExternalChatInboundEvent) .where( - GatewayInboundEvent.status == GatewayEventStatus.PROCESSING, - GatewayInboundEvent.received_at < stale_threshold, + ExternalChatInboundEvent.status == ExternalChatEventStatus.PROCESSING, + ExternalChatInboundEvent.received_at < stale_threshold, ) .values( - status=GatewayEventStatus.RECEIVED, - last_error="stale processing reset for gateway runner", + status=ExternalChatEventStatus.RECEIVED, + last_error="stale processing reset for FastAPI inbox worker", ) ) for _ in range(result.rowcount or 0): @@ -72,22 +66,19 @@ def gateway_health_check_task() -> None: async def _run() -> None: session_maker = get_celery_session_maker() async with session_maker() as session: - result = await session.execute(select(GatewayPlatformAccount)) + result = await session.execute(select(ExternalChatAccount)) accounts = list(result.scalars()) for account in accounts: token = account_token(account) - if not token or account.platform != GatewayPlatform.TELEGRAM: + if not token or account.platform != ExternalChatPlatform.TELEGRAM: continue try: metadata = await TelegramAdapter(token).validate_credentials() - account.health_status = GatewayHealthStatus.OK - account.account_metadata = { - **(account.account_metadata or {}), - "bot_username": metadata.get("username"), - } + account.health_status = ExternalChatHealthStatus.OK + account.bot_username = metadata.get("username") except Exception: - logger.warning("Gateway Telegram health check failed", exc_info=True) - account.health_status = GatewayHealthStatus.FAILING + logger.warning("External chat Telegram health check failed", exc_info=True) + account.health_status = ExternalChatHealthStatus.FAILING record_gateway_health_check_failure(platform=account.platform.value) account.last_health_check_at = datetime.now(UTC) await session.commit() @@ -95,6 +86,15 @@ def gateway_health_check_task() -> None: return run_async_celery_task(_run) +@celery_app.task(name="gateway.enqueue_received_sweep") +def enqueue_received_sweep_task() -> int: + logger.info( + "Skipping gateway.enqueue_received_sweep; " + "FastAPI inbox worker scans RECEIVED rows directly." + ) + return 0 + + @celery_app.task(name="gateway.retention_sweep") def gateway_retention_sweep_task() -> None: async def _run() -> None: @@ -103,13 +103,13 @@ def gateway_retention_sweep_task() -> None: raw_cutoff = datetime.now(UTC) - timedelta(days=30) delete_cutoff = datetime.now(UTC) - timedelta(days=365) await session.execute( - update(GatewayInboundEvent) - .where(GatewayInboundEvent.received_at < raw_cutoff) + update(ExternalChatInboundEvent) + .where(ExternalChatInboundEvent.received_at < raw_cutoff) .values(raw_payload=None) ) result = await session.execute( - select(GatewayInboundEvent).where( - GatewayInboundEvent.received_at < delete_cutoff + select(ExternalChatInboundEvent).where( + ExternalChatInboundEvent.received_at < delete_cutoff ) ) for event in result.scalars(): @@ -126,7 +126,7 @@ async def enqueue_telegram_update(account_id: int, raw_update: dict) -> int | No inbox_id = await persist_inbound_event( session, account_id=account_id, - platform=GatewayPlatform.TELEGRAM, + platform=ExternalChatPlatform.TELEGRAM, event_dedupe_key=telegram_event_dedupe_key(raw_update["update_id"]), external_event_id=str(raw_update["update_id"]), external_message_id=parsed.external_message_id, diff --git a/surfsense_backend/tests/unit/gateway/test_enqueue_received_sweep.py b/surfsense_backend/tests/unit/gateway/test_enqueue_received_sweep.py new file mode 100644 index 000000000..5fe46502f --- /dev/null +++ b/surfsense_backend/tests/unit/gateway/test_enqueue_received_sweep.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from app.tasks.celery_tasks import gateway_tasks + + +def test_enqueue_received_sweep_is_noop_guard(mocker): + apply_async = mocker.Mock() + mocker.patch.object(gateway_tasks.process_inbound_event_task, "apply_async", apply_async) + info = mocker.patch.object(gateway_tasks.logger, "info") + + replayed = gateway_tasks.enqueue_received_sweep_task.run() + + apply_async.assert_not_called() + assert replayed == 0 + info.assert_called_once() + diff --git a/surfsense_backend/tests/unit/gateway/test_process_inbound_event_task.py b/surfsense_backend/tests/unit/gateway/test_process_inbound_event_task.py new file mode 100644 index 000000000..484eacd1a --- /dev/null +++ b/surfsense_backend/tests/unit/gateway/test_process_inbound_event_task.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +from app.tasks.celery_tasks import gateway_tasks + + +def test_process_inbound_event_task_is_noop_guard(mocker): + warning = mocker.patch.object(gateway_tasks.logger, "warning") + + assert gateway_tasks.process_inbound_event_task.run(123) is None + + warning.assert_called_once() + assert "FastAPI owns external chat agent turn processing" in warning.call_args.args[0] +