mirror of
https://github.com/eigent-ai/eigent.git
synced 2026-05-25 23:06:28 +00:00
200 lines
6.6 KiB
Python
200 lines
6.6 KiB
Python
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
|
|
"""Batch sync of local events to the cloud server.
|
|
|
|
Called after task completion to push all unsynced events in one
|
|
efficient batch. Only runs when ``CLOUD_SYNC_ENABLED=true`` and
|
|
``SERVER_URL`` are set.
|
|
|
|
Environment variables (passed by Electron via process env):
|
|
SERVER_URL -- resolved from VITE_PROXY_URL in .env.development
|
|
(Electron passes this to the backend process)
|
|
SERVER_TOKEN -- JWT auth token for cloud API
|
|
|
|
CLOUD_SYNC_ENABLED -- set to "true" to enable sync
|
|
(typically configured in ~/.eigent/.env)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import sqlite3
|
|
from datetime import UTC, datetime
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
from app.component.environment import env
|
|
from app.event_store.config import get_event_db_path
|
|
from app.event_store.sqlite_store import SQLiteTranscriptStore
|
|
|
|
logger = logging.getLogger("event_sync")
|
|
|
|
BATCH_SIZE = 100
|
|
HTTP_TIMEOUT = 30.0
|
|
|
|
|
|
def _is_enabled() -> bool:
    """Report whether cloud sync should run.

    Returns:
        ``True`` only when the ``CLOUD_SYNC_ENABLED`` setting equals
        ``"true"`` (compared case-insensitively) and the ``SERVER_URL``
        setting is a non-empty value; ``False`` otherwise.
    """
    sync_flag = env("CLOUD_SYNC_ENABLED", "")
    if sync_flag.lower() != "true":
        # Flag is off: SERVER_URL is not consulted at all.
        return False
    server_url = env("SERVER_URL", "")
    return bool(server_url)
|
|
|
|
|
|
async def sync_pending_events() -> None:
    """Push all unsynced local events to the cloud server in batches.

    Intended to be called once after task completion (fire-and-forget
    via ``asyncio.create_task``). Silently no-ops when cloud sync is
    disabled, ``SERVER_URL`` is not set, or the local event database
    file does not exist.

    Each round reads up to ``BATCH_SIZE`` unsynced events and POSTs them
    to ``{SERVER_URL}/chat/events/batch`` (with a ``Bearer`` token when
    ``SERVER_TOKEN`` is set). On HTTP 200 the ids listed under the
    response's ``accepted`` key are marked synced and the events listed
    under ``rejected`` get their attempt counter incremented. Any other
    status increments the counter for the events that were sent and ends
    this call.

    Rejected events remain unsynced in the database, so they would be
    read back on the very next round. They are therefore remembered for
    the duration of this call and filtered out of later batches; the
    loop stops once a round has nothing new to send. Without this, a
    batch that the server keeps rejecting would be resent forever.

    NOTE: because the read is limited to ``BATCH_SIZE`` rows, a full
    batch of rejected events at the head of the queue ends the call
    without reaching the events behind it; they are picked up by a
    later call only once the head is cleared.

    Connection errors, timeouts and any other exception are logged and
    swallowed; nothing is raised to the caller.
    """
    if not _is_enabled():
        return

    db_path = get_event_db_path()
    if not db_path.exists():
        return

    server_url = env("SERVER_URL", "").rstrip("/")
    batch_url = f"{server_url}/chat/events/batch"
    token = env("SERVER_TOKEN", "")
    headers: dict[str, str] = {}
    if token:
        headers["Authorization"] = f"Bearer {token}"

    total_synced = 0
    # Ids the server rejected during this call; never resent this round.
    rejected_ids: set[str] = set()

    try:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            while True:
                unsynced = _read_unsynced(db_path, limit=BATCH_SIZE)
                pending = [
                    e
                    for e in unsynced
                    if e.get("event_id") not in rejected_ids
                ]
                if not pending:
                    # Nothing left, or only events already rejected
                    # in this call.
                    break

                # Strip local-only columns before sending
                payload_events = [
                    {
                        k: v
                        for k, v in e.items()
                        if k not in ("synced_at", "sync_attempts")
                    }
                    for e in pending
                ]

                response = await client.post(
                    batch_url,
                    json={"events": payload_events},
                    headers=headers,
                )

                if response.status_code != 200:
                    logger.warning(
                        "Cloud sync failed: HTTP %s", response.status_code
                    )
                    _increment_attempts(
                        db_path, [e["event_id"] for e in pending]
                    )
                    break  # Don't retry on this call; next task will pick up

                data = response.json()
                accepted = data.get("accepted", [])
                if accepted:
                    _mark_synced(db_path, accepted)
                    total_synced += len(accepted)

                rejected = data.get("rejected", [])
                if rejected:
                    logger.warning(
                        "Cloud rejected %d events: %s",
                        len(rejected),
                        rejected[:3],
                    )
                    newly_rejected = [r["event_id"] for r in rejected]
                    _increment_attempts(db_path, newly_rejected)
                    # Don't retry rejected events this round
                    rejected_ids.update(newly_rejected)

                # If nothing was accepted and nothing rejected,
                # all were duplicates -- we're done
                if not accepted and not rejected:
                    break

    except httpx.ConnectError:
        logger.debug("Cloud server unreachable, sync deferred to next task")
    except httpx.TimeoutException:
        logger.warning("Cloud sync timed out, sync deferred to next task")
    except Exception as e:
        # Top-level boundary of a fire-and-forget task: log, never raise.
        logger.error("Unexpected sync error: %s: %s", type(e).__name__, e)

    if total_synced:
        logger.info("Synced %d events to cloud", total_synced)
|
|
|
|
|
|
# -------------------------------------------------------------------
|
|
# SQLite helpers (one-shot connections to avoid holding locks)
|
|
# -------------------------------------------------------------------
|
|
|
|
|
|
def _read_unsynced(db_path: Path, limit: int) -> list[dict]:
|
|
conn = sqlite3.connect(str(db_path))
|
|
conn.row_factory = sqlite3.Row
|
|
try:
|
|
table_exists = conn.execute(
|
|
"SELECT name FROM sqlite_master "
|
|
"WHERE type='table' AND name='event_log'"
|
|
).fetchone()
|
|
if not table_exists:
|
|
return []
|
|
|
|
rows = conn.execute(
|
|
"SELECT * FROM event_log "
|
|
"WHERE synced_at IS NULL "
|
|
"ORDER BY run_id, seq "
|
|
"LIMIT ?",
|
|
(limit,),
|
|
).fetchall()
|
|
return [SQLiteTranscriptStore._row_to_canonical_dict(r) for r in rows]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _mark_synced(db_path: Path, event_ids: list[str]) -> None:
|
|
if not event_ids:
|
|
return
|
|
now = datetime.now(UTC).isoformat()
|
|
placeholders = ",".join("?" for _ in event_ids)
|
|
conn = sqlite3.connect(str(db_path))
|
|
try:
|
|
conn.execute(
|
|
f"UPDATE event_log SET synced_at = ? " # nosec B608
|
|
f"WHERE event_id IN ({placeholders})", # nosec B608
|
|
[now, *event_ids],
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _increment_attempts(db_path: Path, event_ids: list[str]) -> None:
|
|
if not event_ids:
|
|
return
|
|
placeholders = ",".join("?" for _ in event_ids)
|
|
conn = sqlite3.connect(str(db_path))
|
|
try:
|
|
conn.execute(
|
|
f"UPDATE event_log SET sync_attempts = sync_attempts + 1 " # nosec B608
|
|
f"WHERE event_id IN ({placeholders})", # nosec B608
|
|
event_ids,
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|