eigent/backend/app/utils/server/sync_step.py

235 lines
6.3 KiB
Python

# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
"""
Cloud sync step decorator.
Persists outgoing SSE step data to the local SQLite event log.
When SERVER_URL is configured, the same step data can also be
forwarded to the cloud server in batches.
Config (~/.eigent/.env):
SERVER_URL=https://dev.eigent.ai/api
"""
import asyncio
import json
import logging
import time
from functools import lru_cache
import httpx
from app.component.environment import env
from app.event_store.config import get_event_db_path
from app.event_store.sqlite_store import SQLiteTranscriptStore
from app.service.task import get_task_lock_if_exists
logger = logging.getLogger("sync_step")
# Batch config for decompose_text events
BATCH_WORD_THRESHOLD = 5
# Buffer storage: task_id -> accumulated text
_text_buffers: dict[str, str] = {}
@lru_cache(maxsize=1)
def _get_sync_url():
server_url = env("SERVER_URL", "")
if not server_url:
return None
return f"{server_url.rstrip('/')}/chat/steps"
def sync_step(func):
async def wrapper(*args, **kwargs):
sync_url = _get_sync_url()
try:
async for value in func(*args, **kwargs):
_record_step(args, value, sync_url)
yield value
finally:
run_id = _get_run_id(args)
if run_id in _text_buffers:
_flush_buffer(args, run_id, sync_url)
return wrapper
def _record_step(args, value, sync_url):
data = _parse_value(value)
if not data:
return
run_id = _get_run_id(args)
if not run_id:
return
step = data.get("step")
# Batch decompose_text events to reduce event volume.
if step == "decompose_text":
_buffer_text(run_id, data["data"].get("content", ""))
if _should_flush(run_id):
_flush_buffer(args, run_id, sync_url)
return
# Flush any buffered text before sending other events.
if run_id in _text_buffers:
_flush_buffer(args, run_id, sync_url)
_persist_local_event(args, run_id, step, data["data"])
if not sync_url:
return
payload = {
"task_id": _get_current_task_id(args, data["data"]),
"step": step,
"data": data["data"],
"timestamp": time.time_ns() / 1_000_000_000,
}
asyncio.create_task(_send(sync_url, payload))
def _buffer_text(run_id: str, content: str):
"""Accumulate decompose_text content in buffer."""
if run_id not in _text_buffers:
_text_buffers[run_id] = ""
_text_buffers[run_id] += content
def _should_flush(run_id: str) -> bool:
"""Check if buffer has enough words to flush."""
text = _text_buffers.get(run_id, "")
word_count = len(text.split())
return word_count >= BATCH_WORD_THRESHOLD
def _flush_buffer(args, run_id: str, sync_url: str | None):
"""Send buffered text and clear buffer."""
text = _text_buffers.pop(run_id, "")
if not text:
return
data = {"content": text}
_persist_local_event(args, run_id, "decompose_text", data)
if not sync_url:
return
payload = {
"task_id": _get_current_task_id(args, data),
"step": "decompose_text",
"data": data,
"timestamp": time.time_ns() / 1_000_000_000,
}
asyncio.create_task(_send(sync_url, payload))
def _parse_value(value):
if isinstance(value, str) and value.startswith("data: "):
value = value[6:].strip()
try:
data = json.loads(value)
if "step" in data and "data" in data:
return data
except (json.JSONDecodeError, TypeError):
pass
return None
def _get_run_id(args):
if not args or not hasattr(args[0], "task_id"):
return None
return getattr(args[0], "task_id", None)
def _get_current_task_id(args, payload=None):
payload = payload if isinstance(payload, dict) else {}
if task_id := payload.get("task_id") or payload.get("process_task_id"):
return task_id
if not args or not hasattr(args[0], "task_id"):
return None
chat = args[0]
task_lock = get_task_lock_if_exists(chat.project_id)
if task_lock and getattr(task_lock, "current_task_id", None):
return task_lock.current_task_id
if not task_lock:
logger.warning(
f"Task lock not found for project_id {chat.project_id}, "
f"using chat.task_id"
)
return chat.task_id
def _persist_local_event(args, run_id: str, step: str, payload):
if not args or not hasattr(args[0], "project_id"):
return
chat = args[0]
try:
SQLiteTranscriptStore.append_event(
get_event_db_path(),
run_id=run_id,
project_id=chat.project_id,
task_id=_get_current_task_id(args, payload),
event_type=step,
payload=payload,
source="eigent_sse",
agent_id=_extract_agent_id(payload),
agent_name=_extract_agent_name(payload),
)
except Exception as e:
logger.warning(
f"Failed to persist local step event {step}: "
f"{type(e).__name__}: {e}"
)
def _extract_agent_id(payload) -> str | None:
if not isinstance(payload, dict):
return None
return (
payload.get("agent_id")
or payload.get("assignee_id")
or payload.get("worker_id")
)
def _extract_agent_name(payload) -> str | None:
if not isinstance(payload, dict):
return None
return payload.get("agent_name") or payload.get("role")
async def _send(url, data):
try:
async with httpx.AsyncClient(timeout=5.0) as client:
await client.post(url, json=data)
except Exception as e:
logger.error(f"Failed to sync step to {url}: {type(e).__name__}: {e}")