mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-01 22:41:25 +00:00
refactor(gateway): model external chat surfaces over canonical chats
This commit is contained in:
parent
afcadfb4bf
commit
2a41a157f7
4 changed files with 64 additions and 36 deletions
|
|
@ -243,7 +243,6 @@ celery_app.conf.update(
|
|||
"index_obsidian_attachment": {"queue": CONNECTORS_QUEUE},
|
||||
# Everything else (document processing, podcasts, reindexing,
|
||||
# schedule checker, cleanup) stays on the default fast queue.
|
||||
"gateway.process_inbound_event": {"queue": f"{CELERY_TASK_DEFAULT_QUEUE}.gateway"},
|
||||
"gateway.reconcile_inbox": {"queue": f"{CELERY_TASK_DEFAULT_QUEUE}.gateway"},
|
||||
"gateway.health_check": {"queue": f"{CELERY_TASK_DEFAULT_QUEUE}.gateway"},
|
||||
"gateway.retention_sweep": {"queue": f"{CELERY_TASK_DEFAULT_QUEUE}.gateway"},
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
"""Celery tasks for messaging gateway intake and maintenance."""
|
||||
"""Celery maintenance tasks for external chat surfaces."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
|
|
@ -9,11 +9,11 @@ from sqlalchemy import select, update
|
|||
|
||||
from app.celery_app import celery_app
|
||||
from app.db import (
|
||||
GatewayEventStatus,
|
||||
GatewayHealthStatus,
|
||||
GatewayInboundEvent,
|
||||
GatewayPlatform,
|
||||
GatewayPlatformAccount,
|
||||
ExternalChatEventStatus,
|
||||
ExternalChatHealthStatus,
|
||||
ExternalChatInboundEvent,
|
||||
ExternalChatPlatform,
|
||||
ExternalChatAccount,
|
||||
)
|
||||
from app.gateway.accounts import account_token
|
||||
from app.gateway.inbox import persist_inbound_event, telegram_event_dedupe_key
|
||||
|
|
@ -27,17 +27,11 @@ from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_ta
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@celery_app.task(
|
||||
bind=True,
|
||||
name="gateway.process_inbound_event",
|
||||
acks_late=True,
|
||||
max_retries=5,
|
||||
retry_backoff=True,
|
||||
)
|
||||
def process_inbound_event_task(self, inbox_id: int) -> None:
|
||||
@celery_app.task(name="gateway.process_inbound_event")
|
||||
def process_inbound_event_task(inbox_id: int) -> None:
|
||||
logger.warning(
|
||||
"Ignoring Celery gateway.process_inbound_event for inbox_id=%s; "
|
||||
"GatewayRunner owns agent turn processing.",
|
||||
"Ignoring gateway.process_inbound_event for inbox_id=%s; "
|
||||
"FastAPI owns external chat agent turn processing.",
|
||||
inbox_id,
|
||||
)
|
||||
return None
|
||||
|
|
@ -50,14 +44,14 @@ def reconcile_inbox_task() -> None:
|
|||
async with session_maker() as session:
|
||||
stale_threshold = datetime.now(UTC) - timedelta(minutes=10)
|
||||
result = await session.execute(
|
||||
update(GatewayInboundEvent)
|
||||
update(ExternalChatInboundEvent)
|
||||
.where(
|
||||
GatewayInboundEvent.status == GatewayEventStatus.PROCESSING,
|
||||
GatewayInboundEvent.received_at < stale_threshold,
|
||||
ExternalChatInboundEvent.status == ExternalChatEventStatus.PROCESSING,
|
||||
ExternalChatInboundEvent.received_at < stale_threshold,
|
||||
)
|
||||
.values(
|
||||
status=GatewayEventStatus.RECEIVED,
|
||||
last_error="stale processing reset for gateway runner",
|
||||
status=ExternalChatEventStatus.RECEIVED,
|
||||
last_error="stale processing reset for FastAPI inbox worker",
|
||||
)
|
||||
)
|
||||
for _ in range(result.rowcount or 0):
|
||||
|
|
@ -72,22 +66,19 @@ def gateway_health_check_task() -> None:
|
|||
async def _run() -> None:
|
||||
session_maker = get_celery_session_maker()
|
||||
async with session_maker() as session:
|
||||
result = await session.execute(select(GatewayPlatformAccount))
|
||||
result = await session.execute(select(ExternalChatAccount))
|
||||
accounts = list(result.scalars())
|
||||
for account in accounts:
|
||||
token = account_token(account)
|
||||
if not token or account.platform != GatewayPlatform.TELEGRAM:
|
||||
if not token or account.platform != ExternalChatPlatform.TELEGRAM:
|
||||
continue
|
||||
try:
|
||||
metadata = await TelegramAdapter(token).validate_credentials()
|
||||
account.health_status = GatewayHealthStatus.OK
|
||||
account.account_metadata = {
|
||||
**(account.account_metadata or {}),
|
||||
"bot_username": metadata.get("username"),
|
||||
}
|
||||
account.health_status = ExternalChatHealthStatus.OK
|
||||
account.bot_username = metadata.get("username")
|
||||
except Exception:
|
||||
logger.warning("Gateway Telegram health check failed", exc_info=True)
|
||||
account.health_status = GatewayHealthStatus.FAILING
|
||||
logger.warning("External chat Telegram health check failed", exc_info=True)
|
||||
account.health_status = ExternalChatHealthStatus.FAILING
|
||||
record_gateway_health_check_failure(platform=account.platform.value)
|
||||
account.last_health_check_at = datetime.now(UTC)
|
||||
await session.commit()
|
||||
|
|
@ -95,6 +86,15 @@ def gateway_health_check_task() -> None:
|
|||
return run_async_celery_task(_run)
|
||||
|
||||
|
||||
@celery_app.task(name="gateway.enqueue_received_sweep")
|
||||
def enqueue_received_sweep_task() -> int:
|
||||
logger.info(
|
||||
"Skipping gateway.enqueue_received_sweep; "
|
||||
"FastAPI inbox worker scans RECEIVED rows directly."
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
@celery_app.task(name="gateway.retention_sweep")
|
||||
def gateway_retention_sweep_task() -> None:
|
||||
async def _run() -> None:
|
||||
|
|
@ -103,13 +103,13 @@ def gateway_retention_sweep_task() -> None:
|
|||
raw_cutoff = datetime.now(UTC) - timedelta(days=30)
|
||||
delete_cutoff = datetime.now(UTC) - timedelta(days=365)
|
||||
await session.execute(
|
||||
update(GatewayInboundEvent)
|
||||
.where(GatewayInboundEvent.received_at < raw_cutoff)
|
||||
update(ExternalChatInboundEvent)
|
||||
.where(ExternalChatInboundEvent.received_at < raw_cutoff)
|
||||
.values(raw_payload=None)
|
||||
)
|
||||
result = await session.execute(
|
||||
select(GatewayInboundEvent).where(
|
||||
GatewayInboundEvent.received_at < delete_cutoff
|
||||
select(ExternalChatInboundEvent).where(
|
||||
ExternalChatInboundEvent.received_at < delete_cutoff
|
||||
)
|
||||
)
|
||||
for event in result.scalars():
|
||||
|
|
@ -126,7 +126,7 @@ async def enqueue_telegram_update(account_id: int, raw_update: dict) -> int | No
|
|||
inbox_id = await persist_inbound_event(
|
||||
session,
|
||||
account_id=account_id,
|
||||
platform=GatewayPlatform.TELEGRAM,
|
||||
platform=ExternalChatPlatform.TELEGRAM,
|
||||
event_dedupe_key=telegram_event_dedupe_key(raw_update["update_id"]),
|
||||
external_event_id=str(raw_update["update_id"]),
|
||||
external_message_id=parsed.external_message_id,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,16 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from app.tasks.celery_tasks import gateway_tasks
|
||||
|
||||
|
||||
def test_enqueue_received_sweep_is_noop_guard(mocker):
|
||||
apply_async = mocker.Mock()
|
||||
mocker.patch.object(gateway_tasks.process_inbound_event_task, "apply_async", apply_async)
|
||||
info = mocker.patch.object(gateway_tasks.logger, "info")
|
||||
|
||||
replayed = gateway_tasks.enqueue_received_sweep_task.run()
|
||||
|
||||
apply_async.assert_not_called()
|
||||
assert replayed == 0
|
||||
info.assert_called_once()
|
||||
|
||||
|
|
@ -0,0 +1,13 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from app.tasks.celery_tasks import gateway_tasks
|
||||
|
||||
|
||||
def test_process_inbound_event_task_is_noop_guard(mocker):
|
||||
warning = mocker.patch.object(gateway_tasks.logger, "warning")
|
||||
|
||||
assert gateway_tasks.process_inbound_event_task.run(123) is None
|
||||
|
||||
warning.assert_called_once()
|
||||
assert "FastAPI owns external chat agent turn processing" in warning.call_args.args[0]
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue