eigent/backend/app/utils/server/event_sync.py

200 lines
6.6 KiB
Python

# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
"""Batch sync of local events to the cloud server.
Called after task completion to push all unsynced events in one
efficient batch. Only runs when ``CLOUD_SYNC_ENABLED`` is ``true`` and
``SERVER_URL`` is set.
Environment variables (passed by Electron via process env):
SERVER_URL -- resolved from VITE_PROXY_URL in .env.development
(Electron passes this to the backend process)
SERVER_TOKEN -- JWT auth token for cloud API
CLOUD_SYNC_ENABLED -- set to "true" to enable sync
(typically configured in ~/.eigent/.env)
"""
from __future__ import annotations
import logging
import sqlite3
from datetime import UTC, datetime
from pathlib import Path
import httpx
from app.component.environment import env
from app.event_store.config import get_event_db_path
from app.event_store.sqlite_store import SQLiteTranscriptStore
# Module logger; records are emitted under the fixed name "event_sync".
logger = logging.getLogger("event_sync")
# Maximum number of unsynced events read from SQLite and sent per POST.
BATCH_SIZE = 100
# Timeout (seconds) handed to the httpx.AsyncClient used for the sync.
HTTP_TIMEOUT = 30.0
def _is_enabled() -> bool:
    """Report whether cloud sync should run.

    Returns True only when ``CLOUD_SYNC_ENABLED`` equals ``"true"``
    (compared case-insensitively) and ``SERVER_URL`` is non-empty.
    ``SERVER_URL`` is only looked up once the flag check has passed.
    """
    sync_flag = env("CLOUD_SYNC_ENABLED", "")
    if sync_flag.lower() != "true":
        return False
    server_url = env("SERVER_URL", "")
    return bool(server_url)
async def sync_pending_events() -> None:
    """Push all unsynced local events to the cloud server in batches.

    Intended to be called once after task completion (fire-and-forget
    via ``asyncio.create_task``). Silently no-ops when cloud sync is
    disabled, ``SERVER_URL`` is not set, or the local event database
    file does not exist.

    Each event is posted at most once per call. Events the server does
    not accept keep ``synced_at`` NULL locally, so they are skipped for
    the remainder of this call and left for a later call instead of being
    re-posted in a loop.

    Errors raised while syncing (connection failures, timeouts, anything
    else) are logged and swallowed; a non-200 response ends the call after
    bumping ``sync_attempts`` for the batch that was sent.
    """
    if not _is_enabled():
        return

    db_path = get_event_db_path()
    if not db_path.exists():
        return

    server_url = env("SERVER_URL", "").rstrip("/")
    batch_url = f"{server_url}/chat/events/batch"

    token = env("SERVER_TOKEN", "")
    headers: dict[str, str] = {}
    if token:
        headers["Authorization"] = f"Bearer {token}"

    local_only_columns = ("synced_at", "sync_attempts")
    total_synced = 0
    # IDs posted during this call that the server did not accept. Their
    # rows still have synced_at NULL, so _read_unsynced keeps returning
    # them; without this guard a persistently rejected batch would be
    # re-posted forever.
    skip_ids: set[str] = set()

    try:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            while True:
                # Over-read by the number of skipped events so that stuck
                # events at the head of the queue cannot hide the events
                # queued behind them.
                unsynced = _read_unsynced(
                    db_path, limit=BATCH_SIZE + len(skip_ids)
                )
                batch = [
                    e for e in unsynced if e["event_id"] not in skip_ids
                ][:BATCH_SIZE]
                if not batch:
                    break
                batch_ids = [e["event_id"] for e in batch]

                # Strip local-only columns before sending
                payload_events = [
                    {k: v for k, v in e.items() if k not in local_only_columns}
                    for e in batch
                ]
                response = await client.post(
                    batch_url,
                    json={"events": payload_events},
                    headers=headers,
                )

                if response.status_code != 200:
                    logger.warning(
                        f"Cloud sync failed: HTTP {response.status_code}"
                    )
                    _increment_attempts(db_path, batch_ids)
                    break  # Don't retry on this call; next task will pick up

                data = response.json()
                accepted = data.get("accepted", [])
                if accepted:
                    _mark_synced(db_path, accepted)
                    total_synced += len(accepted)

                rejected = data.get("rejected", [])
                if rejected:
                    logger.warning(
                        f"Cloud rejected {len(rejected)} events: "
                        f"{rejected[:3]}"
                    )
                    _increment_attempts(
                        db_path, [r["event_id"] for r in rejected]
                    )

                # If nothing was accepted and nothing rejected,
                # all were duplicates -- we're done
                if not accepted and not rejected:
                    break

                # Don't retry this round: anything sent but not accepted
                # is still unsynced locally and must not be posted again.
                accepted_ids = set(accepted or ())
                skip_ids.update(
                    event_id
                    for event_id in batch_ids
                    if event_id not in accepted_ids
                )
    except httpx.ConnectError:
        logger.debug("Cloud server unreachable, sync deferred to next task")
    except httpx.TimeoutException:
        logger.warning("Cloud sync timed out, sync deferred to next task")
    except Exception as e:
        logger.error(f"Unexpected sync error: {type(e).__name__}: {e}")

    if total_synced:
        logger.info(f"Synced {total_synced} events to cloud")
# -------------------------------------------------------------------
# SQLite helpers (one-shot connections to avoid holding locks)
# -------------------------------------------------------------------
def _read_unsynced(db_path: Path, limit: int) -> list[dict]:
    """Fetch up to ``limit`` events that have not been synced yet.

    Opens a short-lived connection to the SQLite file at ``db_path`` and
    selects ``event_log`` rows whose ``synced_at`` is NULL, ordered by
    ``run_id`` then ``seq``. Every row is converted with
    ``SQLiteTranscriptStore._row_to_canonical_dict``. Returns an empty
    list when the ``event_log`` table does not exist. The connection is
    closed before returning, whether or not a query fails.
    """
    connection = sqlite3.connect(str(db_path))
    connection.row_factory = sqlite3.Row
    try:
        # Probe the schema first so a database without the table yields
        # an empty result instead of an error.
        table_probe = connection.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type='table' AND name='event_log'"
        )
        if not table_probe.fetchone():
            return []

        pending_query = (
            "SELECT * FROM event_log "
            "WHERE synced_at IS NULL "
            "ORDER BY run_id, seq "
            "LIMIT ?"
        )
        to_dict = SQLiteTranscriptStore._row_to_canonical_dict
        cursor = connection.execute(pending_query, (limit,))
        return list(map(to_dict, cursor.fetchall()))
    finally:
        connection.close()
def _mark_synced(db_path: Path, event_ids: list[str]) -> None:
if not event_ids:
return
now = datetime.now(UTC).isoformat()
placeholders = ",".join("?" for _ in event_ids)
conn = sqlite3.connect(str(db_path))
try:
conn.execute(
f"UPDATE event_log SET synced_at = ? " # nosec B608
f"WHERE event_id IN ({placeholders})", # nosec B608
[now, *event_ids],
)
conn.commit()
finally:
conn.close()
def _increment_attempts(db_path: Path, event_ids: list[str]) -> None:
    """Add one to ``sync_attempts`` for each listed event.

    Every ``event_log`` row whose ``event_id`` appears in ``event_ids`` is
    incremented exactly once per call, even if an ID is listed more than
    once. IDs with no matching row are ignored; an empty list is a no-op
    that never opens the database.

    A fixed single-row statement is run once per distinct ID inside one
    transaction, so the call is not bounded by SQLite's limit on bound
    parameters per statement and no SQL text is assembled at runtime.
    """
    if not event_ids:
        return
    # De-duplicate (order-preserving) so a repeated ID is not counted twice.
    unique_ids = dict.fromkeys(event_ids)
    conn = sqlite3.connect(str(db_path))
    try:
        conn.executemany(
            "UPDATE event_log SET sync_attempts = sync_attempts + 1 "
            "WHERE event_id = ?",
            ((event_id,) for event_id in unique_ids),
        )
        conn.commit()
    finally:
        # One-shot connection: always release it so no lock is held.
        conn.close()