feat: make it simpler. local db store UI events

This commit is contained in:
a7m-1st 2026-04-06 04:09:41 +03:00
parent 799696a2a9
commit fc03b0d086
18 changed files with 988 additions and 756 deletions

View file

@ -89,3 +89,13 @@ def list_projects() -> list[dict[str, Any]]:
return []
return SQLiteTranscriptStore.query_projects(db_path)
@router.get("/events/steps/{run_id}", name="list local playback steps")
def list_playback_steps(run_id: str) -> list[dict[str, Any]]:
"""Return saved Eigent SSE steps for a run in playback order."""
db_path = get_event_db_path()
if not db_path.exists():
return []
return SQLiteTranscriptStore.query_playback_steps(db_path, run_id=run_id)

View file

@ -51,12 +51,21 @@ class EventEnvelope:
def to_dict(self) -> dict[str, Any]:
data = asdict(self)
if isinstance(data["payload"], str):
data["payload"] = json.loads(data["payload"])
try:
data["payload"] = json.loads(data["payload"])
except json.JSONDecodeError:
data["payload"] = data["payload"]
return data
def to_compat_dict(self) -> dict[str, Any]:
"""Return a dict compatible with CAMEL's TranscriptStore.read_all()
format (workforce_id, timestamp keys)."""
payload = self.payload
if isinstance(payload, str):
try:
payload = json.loads(payload)
except json.JSONDecodeError:
pass
return {
"event_type": self.event_type,
"workforce_id": self.run_id,
@ -65,9 +74,7 @@ class EventEnvelope:
"agent_id": self.agent_id,
"agent_name": self.agent_name,
"source": self.source,
"payload": self.payload
if isinstance(self.payload, dict)
else json.loads(self.payload),
"payload": payload,
}

View file

@ -44,6 +44,8 @@ class SQLiteTranscriptStore:
via the ``transcript_store`` parameter on ``Workforce.__init__``.
"""
_write_lock = threading.Lock()
def __init__(
self,
path: str | Path,
@ -53,7 +55,6 @@ class SQLiteTranscriptStore:
self.path = Path(path)
self.run_id = run_id
self.project_id = project_id
self._lock = threading.Lock()
self.path.parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(
@ -70,10 +71,14 @@ class SQLiteTranscriptStore:
# ------------------------------------------------------------------
def _ensure_schema(self) -> None:
with self._conn:
self._conn.execute(CREATE_EVENT_LOG_TABLE)
self._ensure_schema_on_conn(self._conn)
@staticmethod
def _ensure_schema_on_conn(conn: sqlite3.Connection) -> None:
with conn:
conn.execute(CREATE_EVENT_LOG_TABLE)
for idx_ddl in CREATE_INDEXES:
self._conn.execute(idx_ddl)
conn.execute(idx_ddl)
# ------------------------------------------------------------------
# TranscriptStore-compatible interface
@ -87,39 +92,60 @@ class SQLiteTranscriptStore:
"""
envelope = self._to_envelope(event)
with self._lock:
with self._write_lock:
with self._conn:
seq = self._next_seq(self._conn)
envelope.seq = seq
self._conn.execute(
"""\
INSERT INTO event_log (
event_id, run_id, project_id, task_id, seq,
event_type, occurred_at, source,
agent_id, agent_name, schema_version,
payload, synced_at, sync_attempts
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
envelope.event_id,
envelope.run_id,
envelope.project_id,
envelope.task_id,
envelope.seq,
envelope.event_type,
envelope.occurred_at,
envelope.source,
envelope.agent_id,
envelope.agent_name,
envelope.schema_version,
json.dumps(envelope.payload, ensure_ascii=False),
envelope.synced_at,
envelope.sync_attempts,
),
)
self._insert_envelope(self._conn, envelope)
return event.to_dict()
@classmethod
def append_event(
cls,
db_path: str | Path,
*,
run_id: str,
project_id: str,
event_type: str,
payload: Any,
source: str,
task_id: str | None = None,
agent_id: str | None = None,
agent_name: str | None = None,
occurred_at: str | None = None,
) -> dict[str, Any]:
"""Append a non-CAMEL event into the canonical local event log."""
path = Path(db_path)
path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(path))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL;")
conn.execute("PRAGMA busy_timeout=5000;")
try:
cls._ensure_schema_on_conn(conn)
envelope = EventEnvelope(
run_id=run_id,
project_id=project_id,
task_id=task_id,
event_type=event_type,
occurred_at=occurred_at or datetime.now(UTC).isoformat(),
source=source,
agent_id=agent_id,
agent_name=agent_name,
schema_version=SCHEMA_VERSION,
payload=payload,
)
with cls._write_lock:
with conn:
envelope.seq = cls._next_seq_for_run(conn, run_id)
cls._insert_envelope(conn, envelope)
return envelope.to_dict()
finally:
conn.close()
def read_all(self) -> list[dict[str, Any]]:
"""Read all events for this run, returning dicts in the same
format as CAMEL's JSONL ``TranscriptStore``."""
@ -160,7 +186,7 @@ class SQLiteTranscriptStore:
return
now = datetime.now(UTC).isoformat()
placeholders = ",".join("?" for _ in event_ids)
with self._lock:
with self._write_lock:
with self._conn:
self._conn.execute(
f"UPDATE event_log SET synced_at = ? " # nosec B608
@ -173,7 +199,7 @@ class SQLiteTranscriptStore:
if not event_ids:
return
placeholders = ",".join("?" for _ in event_ids)
with self._lock:
with self._write_lock:
with self._conn:
self._conn.execute(
f"UPDATE event_log SET sync_attempts = sync_attempts + 1 " # nosec B608
@ -257,41 +283,87 @@ class SQLiteTranscriptStore:
if not table_check:
return []
params: list[Any] = []
where_clause = ""
if project_id:
rows = conn.execute(
"""\
SELECT run_id, project_id,
MIN(task_id) AS task_id,
COUNT(*) AS event_count,
MIN(occurred_at) AS first_event,
MAX(occurred_at) AS last_event
FROM event_log
WHERE project_id = ?
GROUP BY run_id
ORDER BY MAX(occurred_at) DESC
""",
(project_id,),
).fetchall()
else:
rows = conn.execute(
"""\
SELECT run_id, project_id,
MIN(task_id) AS task_id,
COUNT(*) AS event_count,
MIN(occurred_at) AS first_event,
MAX(occurred_at) AS last_event
FROM event_log
GROUP BY run_id
ORDER BY MAX(occurred_at) DESC
""",
).fetchall()
return [dict(r) for r in rows]
where_clause = "WHERE project_id = ?"
params.append(project_id)
rows = conn.execute(
f"""\
SELECT *
FROM event_log
{where_clause}
ORDER BY run_id, seq
""",
params,
).fetchall()
events = [SQLiteTranscriptStore._row_to_canonical_dict(r) for r in rows]
return SQLiteTranscriptStore._summarize_runs(events)
finally:
conn.close()
@staticmethod
def query_projects(db_path: str | Path) -> list[dict[str, Any]]:
"""List projects with aggregated stats. Used by the local API."""
runs = SQLiteTranscriptStore.query_runs(db_path)
project_map: dict[str, dict[str, Any]] = {}
for run in runs:
project_id = run["project_id"]
project = project_map.setdefault(
project_id,
{
"project_id": project_id,
"run_count": 0,
"task_count": 0,
"event_count": 0,
"first_event": run["first_event"],
"last_event": run["last_event"],
"last_prompt": run["question"],
"total_completed_tasks": 0,
"total_ongoing_tasks": 0,
"sync_status": "local",
},
)
project["run_count"] += 1
project["task_count"] += 1
project["event_count"] += run["event_count"]
project["first_event"] = min(
project["first_event"], run["first_event"]
)
if run["last_event"] >= project["last_event"]:
project["last_event"] = run["last_event"]
project["last_prompt"] = run["question"]
if run["status"] == 2:
project["total_completed_tasks"] += 1
else:
project["total_ongoing_tasks"] += 1
project.setdefault("_sync_states", []).append(run["sync_status"])
for project in project_map.values():
sync_states = set(project.pop("_sync_states", []))
if sync_states == {"synced"}:
project["sync_status"] = "synced"
elif sync_states == {"local"}:
project["sync_status"] = "local"
else:
project["sync_status"] = "partial"
return sorted(
project_map.values(),
key=lambda item: item["last_event"],
reverse=True,
)
@staticmethod
def query_playback_steps(
db_path: str | Path,
*,
run_id: str,
) -> list[dict[str, Any]]:
"""Return saved Eigent SSE steps for a run in playback order."""
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
@ -303,13 +375,56 @@ class SQLiteTranscriptStore:
rows = conn.execute(
"""\
SELECT project_id,
COUNT(DISTINCT run_id) AS run_count,
COUNT(*) AS event_count,
MIN(occurred_at) AS first_event,
SELECT event_type, payload, task_id, occurred_at
FROM event_log
WHERE run_id = ? AND source = 'eigent_sse'
ORDER BY seq
""",
(run_id,),
).fetchall()
steps: list[dict[str, Any]] = []
for row in rows:
payload = row["payload"]
if isinstance(payload, str):
payload = json.loads(payload)
steps.append(
{
"step": row["event_type"],
"data": payload,
"task_id": row["task_id"],
"created_at": row["occurred_at"],
}
)
return steps
finally:
conn.close()
@staticmethod
def query_sync_status(
db_path: str | Path,
) -> list[dict[str, Any]]:
"""Return sync status per run. Used by the local API."""
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
table_check = conn.execute(
"SELECT name FROM sqlite_master "
"WHERE type='table' AND name='event_log'"
).fetchone()
if not table_check:
return []
rows = conn.execute(
"""\
SELECT run_id,
project_id,
COUNT(*) AS total_events,
SUM(CASE WHEN synced_at IS NOT NULL
THEN 1 ELSE 0 END) AS synced_events,
MAX(occurred_at) AS last_event
FROM event_log
GROUP BY project_id
GROUP BY run_id
ORDER BY MAX(occurred_at) DESC
""",
).fetchall()
@ -322,13 +437,48 @@ class SQLiteTranscriptStore:
# ------------------------------------------------------------------
def _next_seq(self, conn: sqlite3.Connection) -> int:
return self._next_seq_for_run(conn, self.run_id)
@staticmethod
def _next_seq_for_run(conn: sqlite3.Connection, run_id: str) -> int:
row = conn.execute(
"SELECT COALESCE(MAX(seq), -1) + 1 AS next_seq "
"FROM event_log WHERE run_id = ?",
(self.run_id,),
(run_id,),
).fetchone()
return row["next_seq"]
@staticmethod
def _insert_envelope(
conn: sqlite3.Connection, envelope: EventEnvelope
) -> None:
conn.execute(
"""\
INSERT INTO event_log (
event_id, run_id, project_id, task_id, seq,
event_type, occurred_at, source,
agent_id, agent_name, schema_version,
payload, synced_at, sync_attempts
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
envelope.event_id,
envelope.run_id,
envelope.project_id,
envelope.task_id,
envelope.seq,
envelope.event_type,
envelope.occurred_at,
envelope.source,
envelope.agent_id,
envelope.agent_name,
envelope.schema_version,
json.dumps(envelope.payload, ensure_ascii=False),
envelope.synced_at,
envelope.sync_attempts,
),
)
def _to_envelope(self, event: TranscriptEvent) -> EventEnvelope:
return EventEnvelope(
event_id=str(uuid.uuid4()),
@ -370,3 +520,109 @@ class SQLiteTranscriptStore:
if isinstance(data.get("payload"), str):
data["payload"] = json.loads(data["payload"])
return data
@staticmethod
def _summarize_runs(events: list[dict[str, Any]]) -> list[dict[str, Any]]:
run_map: dict[str, list[dict[str, Any]]] = {}
for event in events:
run_map.setdefault(event["run_id"], []).append(event)
summaries = [
SQLiteTranscriptStore._summarize_run(run_events)
for run_events in run_map.values()
if run_events
]
return sorted(
summaries,
key=lambda item: item["last_event"],
reverse=True,
)
@staticmethod
def _summarize_run(events: list[dict[str, Any]]) -> dict[str, Any]:
first_event = events[0]
last_event = events[-1]
synced_events = sum(1 for event in events if event["synced_at"])
total_events = len(events)
if synced_events == total_events and total_events > 0:
sync_status = "synced"
elif synced_events == 0:
sync_status = "local"
else:
sync_status = "pending"
return {
"id": f"local:{first_event['run_id']}",
"run_id": first_event["run_id"],
"project_id": first_event["project_id"],
"task_id": first_event["run_id"],
"event_count": total_events,
"first_event": first_event["occurred_at"],
"last_event": last_event["occurred_at"],
"question": SQLiteTranscriptStore._extract_question(events),
"summary": SQLiteTranscriptStore._extract_summary(events),
"status": 2
if SQLiteTranscriptStore._is_run_complete(events)
else 1,
"synced_events": synced_events,
"sync_status": sync_status,
}
@staticmethod
def _extract_question(events: list[dict[str, Any]]) -> str:
for event in events:
payload = event.get("payload")
if not isinstance(payload, dict):
continue
if event["event_type"] == "confirmed":
question = payload.get("question")
if question:
return question
if event["event_type"] in {"user_message", "task_created"}:
question = payload.get("message") or payload.get(
"description"
)
if question:
return question
return ""
@staticmethod
def _extract_summary(events: list[dict[str, Any]]) -> str:
for event in reversed(events):
payload = event.get("payload")
if event["event_type"] == "end":
if isinstance(payload, str):
return SQLiteTranscriptStore._strip_summary_tags(payload)
if isinstance(payload, dict):
summary = (
payload.get("summary")
or payload.get("task_result")
or payload.get("result")
)
if summary:
return SQLiteTranscriptStore._strip_summary_tags(
str(summary)
)
if event["event_type"] == "task_completed" and isinstance(
payload, dict
):
summary = payload.get("result_summary")
if summary:
return str(summary)
if event["event_type"] == "error" and isinstance(payload, dict):
message = payload.get("message")
if message:
return str(message)
return ""
@staticmethod
def _is_run_complete(events: list[dict[str, Any]]) -> bool:
terminal_events = {"end", "error", "all_tasks_completed"}
return any(event["event_type"] in terminal_events for event in events)
@staticmethod
def _strip_summary_tags(value: str) -> str:
if "<summary>" in value and "</summary>" in value:
return value.split("<summary>", 1)[1].split("</summary>", 1)[0]
return value

View file

@ -23,7 +23,7 @@ Environment variables (passed by Electron via process env):
SERVER_TOKEN -- JWT auth token for cloud API
CLOUD_SYNC_ENABLED -- set to "true" to enable sync
(toggle via Electron IPC / ~/.eigent/.env)
(typically configured in ~/.eigent/.env)
"""
from __future__ import annotations

View file

@ -14,8 +14,9 @@
"""
Cloud sync step decorator.
Syncs SSE step data to cloud server when SERVER_URL is configured.
High-frequency events (decompose_text) are batched to reduce API calls.
Persists outgoing SSE step data to the local SQLite event log.
When SERVER_URL is configured, the same step data can also be
forwarded to the cloud server in batches.
Config (~/.eigent/.env):
SERVER_URL=https://dev.eigent.ai/api
@ -30,6 +31,8 @@ from functools import lru_cache
import httpx
from app.component.environment import env
from app.event_store.config import get_event_db_path
from app.event_store.sqlite_store import SQLiteTranscriptStore
from app.service.task import get_task_lock_if_exists
logger = logging.getLogger("sync_step")
@ -42,7 +45,7 @@ _text_buffers: dict[str, str] = {}
@lru_cache(maxsize=1)
def _get_config():
def _get_sync_url():
server_url = env("SERVER_URL", "")
if not server_url:
@ -53,44 +56,49 @@ def _get_config():
def sync_step(func):
async def wrapper(*args, **kwargs):
config = _get_config()
sync_url = _get_sync_url()
if not config:
try:
async for value in func(*args, **kwargs):
_record_step(args, value, sync_url)
yield value
return
async for value in func(*args, **kwargs):
_try_sync(args, value, config)
yield value
finally:
run_id = _get_run_id(args)
if run_id in _text_buffers:
_flush_buffer(args, run_id, sync_url)
return wrapper
def _try_sync(args, value, sync_url):
def _record_step(args, value, sync_url):
data = _parse_value(value)
if not data:
return
task_id = _get_task_id(args)
if not task_id:
run_id = _get_run_id(args)
if not run_id:
return
step = data.get("step")
# Batch decompose_text events to reduce API calls
# Batch decompose_text events to reduce event volume.
if step == "decompose_text":
_buffer_text(task_id, data["data"].get("content", ""))
if _should_flush(task_id):
_flush_buffer(task_id, sync_url)
_buffer_text(run_id, data["data"].get("content", ""))
if _should_flush(run_id):
_flush_buffer(args, run_id, sync_url)
return
# Flush any buffered text before sending other events (preserves order)
if task_id in _text_buffers:
_flush_buffer(task_id, sync_url)
# Flush any buffered text before sending other events.
if run_id in _text_buffers:
_flush_buffer(args, run_id, sync_url)
_persist_local_event(args, run_id, step, data["data"])
if not sync_url:
return
payload = {
"task_id": task_id,
"task_id": _get_current_task_id(args, data["data"]),
"step": step,
"data": data["data"],
"timestamp": time.time_ns() / 1_000_000_000,
@ -99,30 +107,36 @@ def _try_sync(args, value, sync_url):
asyncio.create_task(_send(sync_url, payload))
def _buffer_text(task_id: str, content: str):
def _buffer_text(run_id: str, content: str):
"""Accumulate decompose_text content in buffer."""
if task_id not in _text_buffers:
_text_buffers[task_id] = ""
_text_buffers[task_id] += content
if run_id not in _text_buffers:
_text_buffers[run_id] = ""
_text_buffers[run_id] += content
def _should_flush(task_id: str) -> bool:
def _should_flush(run_id: str) -> bool:
"""Check if buffer has enough words to flush."""
text = _text_buffers.get(task_id, "")
text = _text_buffers.get(run_id, "")
word_count = len(text.split())
return word_count >= BATCH_WORD_THRESHOLD
def _flush_buffer(task_id: str, sync_url: str):
def _flush_buffer(args, run_id: str, sync_url: str | None):
"""Send buffered text and clear buffer."""
text = _text_buffers.pop(task_id, "")
text = _text_buffers.pop(run_id, "")
if not text:
return
data = {"content": text}
_persist_local_event(args, run_id, "decompose_text", data)
if not sync_url:
return
payload = {
"task_id": task_id,
"task_id": _get_current_task_id(args, data),
"step": "decompose_text",
"data": {"content": text},
"data": data,
"timestamp": time.time_ns() / 1_000_000_000,
}
@ -143,7 +157,18 @@ def _parse_value(value):
return None
def _get_task_id(args):
def _get_run_id(args):
if not args or not hasattr(args[0], "task_id"):
return None
return getattr(args[0], "task_id", None)
def _get_current_task_id(args, payload=None):
payload = payload if isinstance(payload, dict) else {}
if task_id := payload.get("task_id") or payload.get("process_task_id"):
return task_id
if not args or not hasattr(args[0], "task_id"):
return None
@ -162,6 +187,46 @@ def _get_task_id(args):
return chat.task_id
def _persist_local_event(args, run_id: str, step: str, payload):
if not args or not hasattr(args[0], "project_id"):
return
chat = args[0]
try:
SQLiteTranscriptStore.append_event(
get_event_db_path(),
run_id=run_id,
project_id=chat.project_id,
task_id=_get_current_task_id(args, payload),
event_type=step,
payload=payload,
source="eigent_sse",
agent_id=_extract_agent_id(payload),
agent_name=_extract_agent_name(payload),
)
except Exception as e:
logger.warning(
f"Failed to persist local step event {step}: "
f"{type(e).__name__}: {e}"
)
def _extract_agent_id(payload) -> str | None:
if not isinstance(payload, dict):
return None
return (
payload.get("agent_id")
or payload.get("assignee_id")
or payload.get("worker_id")
)
def _extract_agent_name(payload) -> str | None:
if not isinstance(payload, dict):
return None
return payload.get("agent_name") or payload.get("role")
async def _send(url, data):
try:
async with httpx.AsyncClient(timeout=5.0) as client:

View file

@ -137,6 +137,21 @@ class TestAppendAndRead:
assert ev["run_id"] == "run-001"
assert ev["project_id"] == "proj-001"
def test_append_event_for_sse_payload(self, db_path: Path):
event = SQLiteTranscriptStore.append_event(
db_path,
run_id="run-sse",
project_id="proj-001",
task_id="task-001",
event_type="confirmed",
payload={"question": "Summarize the local task"},
source="eigent_sse",
)
assert event["run_id"] == "run-sse"
assert event["event_type"] == "confirmed"
assert event["payload"]["question"] == "Summarize the local task"
class TestSequencing:
def test_seq_starts_at_zero(self, store: SQLiteTranscriptStore):
@ -310,6 +325,35 @@ class TestStaticQueries:
s1.close()
s2.close()
def test_query_runs_extracts_question_summary_and_sync_status(
self, db_path: Path
):
SQLiteTranscriptStore.append_event(
db_path,
run_id="run-1",
project_id="proj-1",
task_id="run-1",
event_type="confirmed",
payload={"question": "Write a changelog"},
source="eigent_sse",
)
SQLiteTranscriptStore.append_event(
db_path,
run_id="run-1",
project_id="proj-1",
task_id="run-1",
event_type="end",
payload="<summary>Done</summary>",
source="eigent_sse",
)
runs = SQLiteTranscriptStore.query_runs(db_path)
assert len(runs) == 1
assert runs[0]["question"] == "Write a changelog"
assert runs[0]["summary"] == "Done"
assert runs[0]["status"] == 2
assert runs[0]["sync_status"] == "local"
def test_query_projects(self, db_path: Path):
s1 = SQLiteTranscriptStore(
path=db_path, run_id="r1", project_id="proj-A"
@ -331,6 +375,44 @@ class TestStaticQueries:
s1.close()
s2.close()
def test_query_playback_steps_filters_to_eigent_sse(
self, db_path: Path
):
SQLiteTranscriptStore.append_event(
db_path,
run_id="run-1",
project_id="proj-1",
task_id="task-1",
event_type="confirmed",
payload={"question": "Use local replay"},
source="eigent_sse",
)
SQLiteTranscriptStore.append_event(
db_path,
run_id="run-1",
project_id="proj-1",
task_id="task-1",
event_type="task_created",
payload={"description": "raw camel event"},
source="camel",
)
SQLiteTranscriptStore.append_event(
db_path,
run_id="run-1",
project_id="proj-1",
task_id="task-1",
event_type="end",
payload="<summary>Done</summary>",
source="eigent_sse",
)
steps = SQLiteTranscriptStore.query_playback_steps(
db_path, run_id="run-1"
)
assert [step["step"] for step in steps] == ["confirmed", "end"]
assert steps[0]["data"]["question"] == "Use local replay"
assert steps[1]["data"] == "<summary>Done</summary>"
def test_query_events_empty_db(self, tmp_path: Path):
"""Query on a non-existent DB path should handle gracefully."""
db_path = tmp_path / "nonexistent.db"

View file

@ -2132,11 +2132,7 @@ function registerIpcHandlers() {
});
// ==================== read global env handler ====================
const ALLOWED_GLOBAL_ENV_KEYS = new Set([
'HTTP_PROXY',
'HTTPS_PROXY',
'CLOUD_SYNC_ENABLED',
]);
const ALLOWED_GLOBAL_ENV_KEYS = new Set(['HTTP_PROXY', 'HTTPS_PROXY']);
ipcMain.handle('read-global-env', async (_event, key: string) => {
if (!ALLOWED_GLOBAL_ENV_KEYS.has(key)) {
log.warn(`[ENV] Blocked read of disallowed global env key: ${key}`);
@ -2145,36 +2141,6 @@ function registerIpcHandlers() {
return { value: readGlobalEnvKey(key) };
});
// ==================== cloud sync toggle ====================
ipcMain.handle('get-cloud-sync', async () => {
const value = readGlobalEnvKey('CLOUD_SYNC_ENABLED');
return { enabled: value?.toLowerCase() === 'true' };
});
ipcMain.handle('set-cloud-sync', async (_event, enabled: boolean) => {
const GLOBAL_ENV_PATH = path.join(os.homedir(), '.eigent', '.env');
let content = '';
try {
content = fs.existsSync(GLOBAL_ENV_PATH)
? fs.readFileSync(GLOBAL_ENV_PATH, 'utf-8')
: '';
} catch (error) {
log.error('set-cloud-sync read error:', error);
}
let lines = content.split(/\r?\n/);
lines = updateEnvBlock(lines, {
CLOUD_SYNC_ENABLED: enabled ? 'true' : 'false',
});
try {
fs.writeFileSync(GLOBAL_ENV_PATH, lines.join('\n'), 'utf-8');
log.info(`[SYNC] Cloud sync ${enabled ? 'enabled' : 'disabled'}`);
} catch (error) {
log.error('set-cloud-sync write error:', error);
return { success: false };
}
return { success: true };
});
// ==================== new window handler ====================
ipcMain.handle('open-win', (_, arg) => {
const childWindow = new BrowserWindow({

View file

@ -201,10 +201,6 @@ contextBridge.exposeInMainWorld('electronAPI', {
ipcRenderer.invoke('skill-config-update', userId, skillName, skillConfig),
skillConfigDelete: (userId: string, skillName: string) =>
ipcRenderer.invoke('skill-config-delete', userId, skillName),
// Cloud sync toggle
getCloudSync: () => ipcRenderer.invoke('get-cloud-sync'),
setCloudSync: (enabled: boolean) =>
ipcRenderer.invoke('set-cloud-sync', enabled),
});
// --------- Preload scripts loading ---------

View file

@ -1,205 +0,0 @@
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
"""Workflow Event controller for batch ingestion and read access.
Receives canonical events from the local sync worker and provides
read endpoints for cloud-based history queries.
"""
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import Session, select
from app.core.database import session
from app.model.chat.workflow_event import (
BatchIngestRequest,
BatchIngestResponse,
WorkflowEvent,
WorkflowEventOut,
)
from app.shared.auth import auth_must
router = APIRouter(prefix="/chat", tags=["V1 Workflow Events"])
@router.post(
"/events/batch",
name="batch ingest workflow events",
response_model=BatchIngestResponse,
)
async def batch_ingest(
body: BatchIngestRequest,
db_session: Session = Depends(session),
auth=Depends(auth_must),
):
"""Idempotent batch ingestion of canonical events.
- Duplicate ``event_id`` is silently skipped (idempotent).
- Conflicting ``(run_id, seq)`` with a different
``event_id`` is rejected.
"""
accepted: list[str] = []
rejected: list[dict] = []
if not body.events:
return BatchIngestResponse(
accepted=accepted, rejected=rejected
)
# Preflight: find existing event_ids to skip duplicates
incoming_ids = [e.event_id for e in body.events]
existing_rows = db_session.exec(
select(WorkflowEvent.event_id).where(
WorkflowEvent.event_id.in_(incoming_ids)
)
).all()
existing_ids = set(existing_rows)
for event_in in body.events:
if event_in.event_id in existing_ids:
# Already ingested -- idempotent skip
accepted.append(event_in.event_id)
continue
try:
db_event = WorkflowEvent(
event_id=event_in.event_id,
run_id=event_in.run_id,
task_id=event_in.task_id,
project_id=event_in.project_id,
user_id=auth.user.id,
seq=event_in.seq,
event_type=event_in.event_type,
occurred_at=event_in.occurred_at,
source=event_in.source,
agent_id=event_in.agent_id,
agent_name=event_in.agent_name,
schema_version=event_in.schema_version,
payload=event_in.payload,
)
db_session.add(db_event)
db_session.flush()
accepted.append(event_in.event_id)
except Exception as e:
db_session.rollback()
error_msg = str(e)
if "uq_workflow_event_run_seq" in error_msg:
rejected.append(
{
"event_id": event_in.event_id,
"reason": (
f"Conflicting (run_id="
f"{event_in.run_id}, "
f"seq={event_in.seq})"
),
}
)
else:
rejected.append(
{
"event_id": event_in.event_id,
"reason": error_msg[:200],
}
)
# Commit all accepted events
try:
db_session.commit()
except Exception as e:
db_session.rollback()
return BatchIngestResponse(
accepted=[],
rejected=[
{
"event_id": eid,
"reason": str(e)[:200],
}
for eid in incoming_ids
],
)
return BatchIngestResponse(
accepted=accepted, rejected=rejected
)
@router.get(
"/events",
name="list workflow events",
response_model=List[WorkflowEventOut],
)
async def list_events(
run_id: Optional[str] = Query(None),
task_id: Optional[str] = Query(None),
project_id: Optional[str] = Query(None),
after_seq: Optional[int] = Query(None),
limit: int = Query(200, ge=1, le=1000),
db_session: Session = Depends(session),
auth=Depends(auth_must),
):
"""Read events filtered by run_id, task_id, or project_id.
Ownership-checked: only returns events belonging to the
authenticated user.
"""
if not any([run_id, task_id, project_id]):
raise HTTPException(
status_code=400,
detail=(
"At least one of run_id, task_id, "
"or project_id is required"
),
)
query = select(WorkflowEvent).where(
WorkflowEvent.user_id == auth.user.id
)
if run_id:
query = query.where(WorkflowEvent.run_id == run_id)
if task_id:
query = query.where(WorkflowEvent.task_id == task_id)
if project_id:
query = query.where(
WorkflowEvent.project_id == project_id
)
if after_seq is not None:
query = query.where(WorkflowEvent.seq > after_seq)
query = query.order_by(
WorkflowEvent.run_id, WorkflowEvent.seq
).limit(limit)
return list(db_session.exec(query).all())
@router.get(
"/events/{event_id}",
name="get workflow event",
response_model=WorkflowEventOut,
)
async def get_event(
event_id: str,
db_session: Session = Depends(session),
auth=Depends(auth_must),
):
"""Get a single event by event_id. Ownership-checked."""
event = db_session.get(WorkflowEvent, event_id)
if not event or event.user_id != auth.user.id:
raise HTTPException(
status_code=404, detail="Event not found"
)
return event

View file

@ -1,149 +0,0 @@
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
"""Append-only workflow event model for the canonical event log.
This table stores raw events synced from local SQLite databases.
It is NOT a replacement for ChatStep -- both coexist. ChatStep
serves real-time UI playback; WorkflowEvent is the durable
canonical history.
"""
import json
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, field_validator
from sqlalchemy import TIMESTAMP, UniqueConstraint, text
from sqlmodel import Field, JSON
from app.model.abstract.model import AbstractModel
class WorkflowEvent(AbstractModel, table=True):
"""Append-only event from agent execution.
Does NOT inherit DefaultTimes because events are immutable --
no updated_at or deleted_at.
"""
__table_args__ = (
UniqueConstraint(
"run_id", "seq", name="uq_workflow_event_run_seq"
),
)
event_id: str = Field(primary_key=True)
run_id: str = Field(index=True)
task_id: Optional[str] = Field(default=None, index=True)
project_id: str = Field(index=True)
user_id: int = Field(index=True)
seq: int
event_type: str
occurred_at: str # ISO-8601 from the local client
source: str = Field(default="camel")
agent_id: Optional[str] = Field(default=None)
agent_name: Optional[str] = Field(default=None)
schema_version: int = Field(default=1)
payload: str = Field(sa_type=JSON, default="{}")
ingested_at: Optional[datetime] = Field(
default_factory=datetime.now,
sa_type=TIMESTAMP,
sa_column_kwargs={
"server_default": text("CURRENT_TIMESTAMP")
},
)
@field_validator("payload", mode="before")
@classmethod
def serialize_payload(cls, v):
if isinstance(v, (dict, list)):
return json.dumps(v, ensure_ascii=False)
return v
@field_validator("payload", mode="after")
@classmethod
def deserialize_payload(cls, v):
if isinstance(v, str):
try:
return json.loads(v)
except Exception:
return v
return v
# -------------------------------------------------------------------
# Pydantic companion schemas
# -------------------------------------------------------------------
class WorkflowEventIn(BaseModel):
"""Ingestion payload for a single event (from sync worker)."""
event_id: str
run_id: str
task_id: Optional[str] = None
project_id: str
seq: int
event_type: str
occurred_at: str
source: str = "camel"
agent_id: Optional[str] = None
agent_name: Optional[str] = None
schema_version: int = 1
payload: Any = {}
class WorkflowEventOut(BaseModel):
"""Read response for a single event."""
event_id: str
run_id: str
task_id: Optional[str] = None
project_id: str
user_id: int
seq: int
event_type: str
occurred_at: str
source: str
agent_id: Optional[str] = None
agent_name: Optional[str] = None
schema_version: int
payload: Any
ingested_at: Optional[datetime] = None
class Config:
from_attributes = True
class BatchIngestRequest(BaseModel):
"""Batch ingestion request body."""
events: list[WorkflowEventIn]
@field_validator("events")
@classmethod
def limit_batch_size(cls, v):
if len(v) > 200:
raise ValueError(
"Batch size must not exceed 200 events"
)
return v
class BatchIngestResponse(BaseModel):
"""Batch ingestion response body."""
accepted: list[str] = [] # event_ids that were ingested
rejected: list[dict[str, Any]] = [] # {event_id, reason}

View file

@ -28,6 +28,9 @@ import { ChatTaskStatus } from '@/types/constants';
import { ProjectGroup as ProjectGroupType } from '@/types/history';
import { motion } from 'framer-motion';
import {
Cloud,
CloudAlertIcon,
CloudOff,
Edit,
Hash,
Loader2,
@ -438,6 +441,28 @@ export default function ProjectGroup({
<span>{project.total_triggers || 0}</span>
</Tag>
</TooltipSimple>
{project.sync_status === 'synced' && (
<TooltipSimple content="Synced to cloud">
<Tag variant="success" size="sm">
<Cloud />
</Tag>
</TooltipSimple>
)}
{project.sync_status === 'local' && (
<TooltipSimple content="Local only">
<Tag variant="default" size="sm">
<CloudOff />
</Tag>
</TooltipSimple>
)}
{project.sync_status === 'partial' && (
<TooltipSimple content="Partially synced">
<Tag variant="info" size="sm">
<CloudAlertIcon />
</Tag>
</TooltipSimple>
)}
</div>
{/* End: Status and menu */}

View file

@ -29,6 +29,8 @@ import {
CirclePause,
CirclePlay,
Clock,
Cloud,
CloudOff,
Ellipsis,
Hash,
Pin,
@ -138,6 +140,17 @@ export default function TaskItem({
<div className="flex flex-shrink-0 items-center gap-2">
{!isOngoing && getStatusTag(task.status)}
{task.sync_status === 'synced' && (
<Tag variant="success" size="sm">
<Cloud />
</Tag>
)}
{(task.sync_status === 'local' || task.sync_status === 'pending') && (
<Tag variant="default" size="sm">
<CloudOff />
</Tag>
)}
<Tag variant="info" size="sm">
<Hash />
<span>{task.tokens ? task.tokens.toLocaleString() : '0'}</span>

View file

@ -14,7 +14,7 @@
'use client';
import { ScanFace, Search } from 'lucide-react';
import { Cloud, CloudOff, ScanFace, Search } from 'lucide-react';
import { useEffect, useState } from 'react';
import GroupedHistoryView from '@/components/GroupedHistoryView';
@ -31,6 +31,7 @@ import useChatStoreAdapter from '@/hooks/useChatStoreAdapter';
import { loadProjectFromHistory } from '@/lib';
import { fetchHistoryTasks } from '@/service/historyApi';
import { useGlobalStore } from '@/store/globalStore';
import { HistoryTask } from '@/types/history';
import { VisuallyHidden } from '@radix-ui/react-visually-hidden';
import { useTranslation } from 'react-i18next';
import { useNavigate } from 'react-router-dom';
@ -40,7 +41,7 @@ import { DialogTitle } from './ui/dialog';
export function SearchHistoryDialog() {
const { t } = useTranslation();
const [open, setOpen] = useState(false);
const [historyTasks, setHistoryTasks] = useState<any[]>([]);
const [historyTasks, setHistoryTasks] = useState<HistoryTask[]>([]);
const { history_type } = useGlobalStore();
//Get Chatstore for the active project's task
const { chatStore, projectStore } = useChatStoreAdapter();
@ -139,8 +140,17 @@ export function SearchHistoryDialog() {
}
>
<ScanFace />
<div className="overflow-hidden text-ellipsis whitespace-nowrap">
{task.question}
<div className="flex min-w-0 items-center gap-2">
{task.sync_status === 'synced' && (
<Cloud className="h-3.5 w-3.5 text-text-information" />
)}
{(task.sync_status === 'local' ||
task.sync_status === 'pending') && (
<CloudOff className="h-3.5 w-3.5 text-text-label" />
)}
<div className="overflow-hidden text-ellipsis whitespace-nowrap">
{task.question}
</div>
</div>
</CommandItem>
))}

View file

@ -13,28 +13,139 @@
// ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
import { proxyFetchGet } from '@/api/http';
import { fetchLocalRuns, RunSummary } from '@/service/localEventApi';
import { HistoryTask, ProjectGroup } from '@/types/history';
// Group tasks by project_id and add project-level metadata
const groupTasksByProject = (tasks: HistoryTask[]): ProjectGroup[] => {
const normalizeTask = (task: Partial<HistoryTask>): HistoryTask => ({
id: task.id ?? '',
task_id: task.task_id ?? '',
project_id: task.project_id ?? task.task_id ?? '',
question: task.question ?? '',
language: task.language ?? 'local',
model_platform: task.model_platform ?? 'local',
model_type: task.model_type ?? 'local',
api_key: task.api_key ?? '',
api_url: task.api_url,
max_retries: task.max_retries ?? 0,
file_save_path: task.file_save_path,
installed_mcp: task.installed_mcp,
project_name: task.project_name,
summary: task.summary,
tokens: task.tokens ?? 0,
status: task.status ?? 1,
created_at: task.created_at,
updated_at: task.updated_at,
sync_status: task.sync_status,
});
const localRunToTask = (run: RunSummary): HistoryTask =>
normalizeTask({
id: run.id || `local:${run.run_id}`,
task_id: run.task_id || run.run_id,
project_id: run.project_id,
question: run.question || 'Untitled task',
summary: run.summary,
status: run.status ?? 1,
created_at: run.first_event,
updated_at: run.last_event,
sync_status: run.sync_status,
});
const normalizeCloudTask = (task: HistoryTask): HistoryTask =>
normalizeTask({
...task,
sync_status: task.sync_status ?? 'synced',
});
const sortTasks = (tasks: HistoryTask[]): HistoryTask[] =>
tasks.sort((a, b) => {
const dateA = new Date(a.updated_at || a.created_at || 0).getTime();
const dateB = new Date(b.updated_at || b.created_at || 0).getTime();
return dateB - dateA;
});
const mergeTasks = (
localRuns: RunSummary[],
cloudTasks: HistoryTask[]
): HistoryTask[] => {
const taskMap = new Map<string, HistoryTask>();
cloudTasks.forEach((task) => {
taskMap.set(task.task_id, normalizeCloudTask(task));
});
localRuns.map(localRunToTask).forEach((localTask) => {
const cloudTask = taskMap.get(localTask.task_id);
if (!cloudTask) {
taskMap.set(localTask.task_id, localTask);
return;
}
taskMap.set(
localTask.task_id,
normalizeTask({
...cloudTask,
...localTask,
id: cloudTask.id ?? localTask.id,
project_name: cloudTask.project_name ?? localTask.project_name,
language: cloudTask.language || localTask.language,
model_platform: cloudTask.model_platform || localTask.model_platform,
model_type: cloudTask.model_type || localTask.model_type,
max_retries: cloudTask.max_retries ?? localTask.max_retries,
tokens: cloudTask.tokens ?? localTask.tokens,
question: localTask.question || cloudTask.question,
summary: localTask.summary || cloudTask.summary,
created_at: localTask.created_at || cloudTask.created_at,
updated_at: localTask.updated_at || cloudTask.updated_at,
sync_status: localTask.sync_status,
})
);
});
return sortTasks(Array.from(taskMap.values()));
};
const getProjectSyncStatus = (
tasks: HistoryTask[]
): ProjectGroup['sync_status'] => {
const statuses = new Set(tasks.map((task) => task.sync_status || 'synced'));
if (statuses.size === 1 && statuses.has('synced')) {
return 'synced';
}
if (statuses.size === 1 && statuses.has('local')) {
return 'local';
}
return 'partial';
};
const groupTasksByProject = (
tasks: HistoryTask[],
projectMetadata = new Map<string, Partial<ProjectGroup>>()
): ProjectGroup[] => {
const projectMap = new Map<string, ProjectGroup>();
tasks.forEach((task) => {
const projectId = task.project_id;
const metadata = projectMetadata.get(projectId);
if (!projectMap.has(projectId)) {
projectMap.set(projectId, {
project_id: projectId,
project_name: task.project_name || `Project ${projectId}`,
project_name:
task.project_name ||
metadata?.project_name ||
`Project ${projectId}`,
total_tokens: 0,
task_count: 0,
total_triggers: 0,
latest_task_date: task.created_at || new Date().toISOString(),
total_triggers: metadata?.total_triggers || 0,
latest_task_date:
task.updated_at || task.created_at || new Date().toISOString(),
tasks: [],
total_completed_tasks: 0,
total_ongoing_tasks: 0,
average_tokens_per_task: 0,
last_prompt: task.question || '',
last_prompt: task.question || metadata?.last_prompt || '',
sync_status: 'local',
});
}
@ -43,37 +154,31 @@ const groupTasksByProject = (tasks: HistoryTask[]): ProjectGroup[] => {
project.task_count++;
project.total_tokens += task.tokens || 0;
// ChatStatus enum: ongoing = 1, done = 2
if (task.status === 2) {
// ChatStatus.done (completed)
project.total_completed_tasks++;
} else if (task.status === 1) {
// ChatStatus.ongoing (pending/running etc..)
project.total_ongoing_tasks++;
}
// Update latest task date
if (task.created_at && task.created_at > project.latest_task_date) {
project.latest_task_date = task.created_at;
const candidateDate =
task.updated_at || task.created_at || project.latest_task_date;
if (candidateDate >= project.latest_task_date) {
project.latest_task_date = candidateDate;
project.last_prompt = task.question || project.last_prompt;
project.project_name =
task.project_name || metadata?.project_name || project.project_name;
}
});
// Calculate averages and sort tasks within each project
projectMap.forEach((project) => {
project.average_tokens_per_task =
project.task_count > 0
? Math.round(project.total_tokens / project.task_count)
: 0;
// Sort tasks by creation date (newest first)
project.tasks.sort((a, b) => {
const dateA = new Date(a.created_at || 0).getTime();
const dateB = new Date(b.created_at || 0).getTime();
return dateB - dateA;
});
project.sync_status = getProjectSyncStatus(project.tasks);
project.tasks = sortTasks(project.tasks);
});
// Convert to array and sort by latest task date (newest first)
return Array.from(projectMap.values()).sort((a, b) => {
const dateA = new Date(a.latest_task_date).getTime();
const dateB = new Date(b.latest_task_date).getTime();
@ -81,81 +186,84 @@ const groupTasksByProject = (tasks: HistoryTask[]): ProjectGroup[] => {
});
};
export const fetchHistoryTasks = async (
setTasks: React.Dispatch<React.SetStateAction<any[]>>
) => {
const fetchCloudHistoryTasks = async (): Promise<HistoryTask[]> => {
try {
const res = await proxyFetchGet(`/api/v1/chat/histories`);
setTasks(res.items);
return Array.isArray(res?.items)
? res.items.map((task: HistoryTask) => normalizeCloudTask(task))
: [];
} catch (error) {
console.error('Failed to fetch cloud history tasks:', error);
return [];
}
};
const fetchCloudGroupedProjects = async (): Promise<ProjectGroup[]> => {
try {
const res = await proxyFetchGet(
`/api/v1/chat/histories/grouped?include_tasks=true`
);
return Array.isArray(res?.projects) ? res.projects : [];
} catch (error) {
console.error('Failed to fetch grouped cloud history tasks:', error);
return [];
}
};
export const fetchHistoryTasks = async (
setTasks: React.Dispatch<React.SetStateAction<HistoryTask[]>>
) => {
try {
const [localRuns, cloudTasks] = await Promise.all([
fetchLocalRuns(),
fetchCloudHistoryTasks(),
]);
setTasks(mergeTasks(localRuns, cloudTasks));
} catch (error) {
console.error('Failed to fetch history tasks:', error);
setTasks([]);
}
};
// New function to fetch grouped history tasks from the backend endpoint
export const fetchGroupedHistoryTasks = async (
setProjects: React.Dispatch<React.SetStateAction<ProjectGroup[]>>
) => {
try {
const res = await proxyFetchGet(
`/api/v1/chat/histories/grouped?include_tasks=true`
const [localRuns, cloudProjects] = await Promise.all([
fetchLocalRuns(),
fetchCloudGroupedProjects(),
]);
const cloudTasks = cloudProjects.flatMap((project) => project.tasks || []);
const mergedTasks = mergeTasks(localRuns, cloudTasks);
const metadata = new Map(
cloudProjects.map((project) => [
project.project_id,
{
project_name: project.project_name,
total_triggers: project.total_triggers,
last_prompt: project.last_prompt,
},
])
);
// If the response doesn't have projects field, fall back to legacy grouping
if (!res || !res.projects) {
await fetchGroupedHistoryTasksLegacy(setProjects);
} else {
setProjects(res.projects);
}
} catch (error) {
console.error(
'Failed to fetch grouped history tasks, falling back to legacy:',
error
);
// Fall back to legacy grouping if the new endpoint fails
await fetchGroupedHistoryTasksLegacy(setProjects);
}
};
// Function to fetch grouped history summaries only (without individual tasks for better performance)
export const fetchGroupedHistorySummaries = async (
setProjects: React.Dispatch<React.SetStateAction<ProjectGroup[]>>
) => {
try {
const res = await proxyFetchGet(
`/api/v1/chat/histories/grouped?include_tasks=false`
);
// If the response doesn't have projects field, fall back to legacy grouping
if (!res || !res.projects) {
await fetchGroupedHistoryTasksLegacy(setProjects);
} else {
setProjects(res.projects);
}
} catch (error) {
console.error(
'Failed to fetch grouped history summaries, falling back to legacy:',
error
);
// Fall back to legacy grouping if the new endpoint fails
await fetchGroupedHistoryTasksLegacy(setProjects);
}
};
// Legacy function for backward compatibility - groups on frontend
export const fetchGroupedHistoryTasksLegacy = async (
setProjects: React.Dispatch<React.SetStateAction<ProjectGroup[]>>
) => {
try {
const res = await proxyFetchGet(`/api/v1/chat/histories`);
const groupedProjects = groupTasksByProject(res.items);
setProjects(groupedProjects);
setProjects(groupTasksByProject(mergedTasks, metadata));
} catch (error) {
console.error('Failed to fetch grouped history tasks:', error);
setProjects([]);
}
};
// Utility function to get all tasks from grouped data (for backward compatibility)
export const fetchGroupedHistorySummaries = async (
setProjects: React.Dispatch<React.SetStateAction<ProjectGroup[]>>
) => {
await fetchGroupedHistoryTasks(setProjects);
};
export const fetchGroupedHistoryTasksLegacy = async (
setProjects: React.Dispatch<React.SetStateAction<ProjectGroup[]>>
) => {
await fetchGroupedHistoryTasks(setProjects);
};
export const flattenProjectTasks = (
projects: ProjectGroup[]
): HistoryTask[] => {

View file

@ -12,20 +12,8 @@
// limitations under the License.
// ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
/**
* Local event log API client.
*
* Reads from the local Python backend's SQLite event log endpoints.
* These calls go to localhost (no cloud dependency) and are used by
* the frontend's local-first data loading path.
*/
import { fetchGet } from '@/api/http';
// -----------------------------------------------------------------------
// Types
// -----------------------------------------------------------------------
export interface EventEnvelope {
event_id: string;
run_id: string;
@ -38,36 +26,46 @@ export interface EventEnvelope {
agent_id: string | null;
agent_name: string | null;
schema_version: number;
payload: Record<string, any>;
payload: Record<string, any> | string;
synced_at: string | null;
sync_attempts: number;
}
export interface RunSummary {
id: string;
run_id: string;
project_id: string;
task_id: string | null;
event_count: number;
first_event: string;
last_event: string;
question: string;
summary: string;
status: number;
synced_events: number;
sync_status: 'local' | 'synced' | 'pending';
}
export interface ProjectSummary {
project_id: string;
run_count: number;
task_count: number;
event_count: number;
first_event: string;
last_event: string;
last_prompt: string;
total_completed_tasks: number;
total_ongoing_tasks: number;
sync_status: 'local' | 'synced' | 'partial';
}
// -----------------------------------------------------------------------
// API calls (hit local backend at localhost:{backendPort})
// -----------------------------------------------------------------------
export interface PlaybackStep {
step: string;
data: Record<string, any> | string;
task_id: string | null;
created_at: string;
}
/**
* Fetch events from the local event log.
* At least one of runId, taskId, or projectId must be provided.
*/
export const fetchLocalEvents = async (params: {
runId?: string;
taskId?: string;
@ -91,9 +89,6 @@ export const fetchLocalEvents = async (params: {
}
};
/**
* List all runs from the local event log, optionally filtered by project.
*/
export const fetchLocalRuns = async (
projectId?: string
): Promise<RunSummary[]> => {
@ -109,9 +104,6 @@ export const fetchLocalRuns = async (
}
};
/**
* List projects with aggregated stats from the local event log.
*/
export const fetchLocalProjects = async (): Promise<ProjectSummary[]> => {
try {
const res = await fetchGet('/events/projects');
@ -122,10 +114,18 @@ export const fetchLocalProjects = async (): Promise<ProjectSummary[]> => {
}
};
/**
* Check if local events exist for a given run/task.
* Useful for deciding whether to load from local DB vs cloud.
*/
export const fetchLocalPlaybackSteps = async (
runId: string
): Promise<PlaybackStep[]> => {
try {
const res = await fetchGet(`/events/steps/${encodeURIComponent(runId)}`);
return Array.isArray(res) ? res : [];
} catch (error) {
console.debug('[localEventApi] fetchLocalPlaybackSteps failed:', error);
return [];
}
};
export const hasLocalEvents = async (params: {
runId?: string;
taskId?: string;

View file

@ -26,6 +26,7 @@ import {
import { showCreditsToast } from '@/components/Toast/creditsToast';
import { showStorageToast } from '@/components/Toast/storageToast';
import { generateUniqueId, uploadLog } from '@/lib';
import { fetchLocalPlaybackSteps } from '@/service/localEventApi';
import { proxyUpdateTriggerExecution } from '@/service/triggerApi';
import { ExecutionStatus } from '@/types';
import {
@ -94,6 +95,7 @@ export interface ChatStore {
setStatus: (taskId: string, status: ChatTaskStatusType) => void;
setActiveTaskId: (taskId: string) => void;
replay: (taskId: string, question: string, time: number) => Promise<void>;
loadHistoryTask: (taskId: string, question: string) => Promise<void>;
startTask: (
taskId: string,
type?: string,
@ -205,6 +207,9 @@ const normalizeToolkitMessage = (value: unknown) => {
}
};
const sleep = (ms: number) =>
new Promise((resolve) => setTimeout(resolve, ms));
/** Persist subtask edits to backend via PUT /task/{project_id}. */
const persistSubtaskEdits = (taskInfo: TaskInfo[]) => {
const projectId = useProjectStore.getState().activeProjectId;
@ -585,20 +590,25 @@ const chatStore = (initial?: Partial<ChatStore>) =>
const base_Url = import.meta.env.DEV
? import.meta.env.VITE_PROXY_URL
: import.meta.env.VITE_BASE_URL;
const localPlaybackSteps =
type === 'replay' ? await fetchLocalPlaybackSteps(newTaskId) : [];
const hasLocalPlayback =
type === 'replay' && localPlaybackSteps.length > 0;
const api =
type == 'share'
? `${base_Url}/api/v1/chat/share/playback/${shareToken}?delay_time=${delayTime}`
: type == 'replay'
? `${base_Url}/api/v1/chat/steps/playback/${newTaskId}?delay_time=${delayTime}`
? hasLocalPlayback
? ''
: `${base_Url}/api/v1/chat/steps/playback/${newTaskId}?delay_time=${delayTime}`
: `${baseURL}/chat`;
const { tasks: _tasks } = get();
let historyId: string | null = projectStore.getHistoryId(project_id);
let snapshots: any = [];
let snapshots: any[] = [];
let skipFirstConfirm = true;
// replay or share request
if (type) {
// Cloud replay/share can reuse previously uploaded snapshots.
if (type && !hasLocalPlayback) {
const res = await proxyFetchGet(`/api/v1/chat/snapshots`, {
api_task_id: taskId,
});
@ -619,99 +629,101 @@ const chatStore = (initial?: Partial<ChatStore>) =>
api_url: '',
extra_params: {},
};
if (modelType === 'custom' || modelType === 'local') {
const res = await proxyFetchGet('/api/v1/providers', {
prefer: true,
});
const providerList = res.items || [];
console.log('providerList', providerList);
const provider = providerList[0];
if (!provider) {
throw new Error(
'No model provider configured. Please go to Agents > Models and configure at least one model provider as default.'
);
}
apiModel = {
api_key: provider.api_key,
model_type: provider.model_type,
model_platform: provider.provider_name,
api_url: provider.endpoint_url || provider.api_url,
extra_params: provider.encrypted_config,
};
} else if (modelType === 'cloud') {
// get current model
const res = await proxyFetchGet('/api/v1/user/key');
if (res.warning_code && res.warning_code === '21') {
showStorageToast();
}
apiModel = {
api_key: res.value,
model_type: cloud_model_type,
model_platform: cloud_model_type.includes('gpt')
? 'openai'
: cloud_model_type.includes('claude')
? 'aws-bedrock'
: cloud_model_type.includes('gemini')
? 'gemini'
: 'openai-compatible-model',
api_url: res.api_url,
extra_params: {},
};
}
// Get search engine configuration for custom mode
let searchConfig: Record<string, string> = {};
if (modelType === 'custom') {
try {
const configsRes = await proxyFetchGet('/api/v1/configs');
const configs = Array.isArray(configsRes) ? configsRes : [];
// Extract Google Search API keys
const googleApiKey = configs.find(
(c: any) =>
c.config_group?.toLowerCase() === 'search' &&
c.config_name === 'GOOGLE_API_KEY'
)?.config_value;
const searchEngineId = configs.find(
(c: any) =>
c.config_group?.toLowerCase() === 'search' &&
c.config_name === 'SEARCH_ENGINE_ID'
)?.config_value;
if (googleApiKey && searchEngineId) {
searchConfig = {
GOOGLE_API_KEY: googleApiKey,
SEARCH_ENGINE_ID: searchEngineId,
};
console.log('Loaded custom search configuration');
}
} catch (error) {
console.error('Failed to load search configuration:', error);
}
}
const addWorkers = workerList.map((worker) => {
return {
name: worker.workerInfo?.name,
description: worker.workerInfo?.description,
tools: worker.workerInfo?.tools,
mcp_tools: worker.workerInfo?.mcp_tools,
};
});
// get env path
let addWorkers: Array<{
name: string;
description: string;
tools: any;
mcp_tools: any;
}> = [];
let envPath = '';
try {
envPath = await window.ipcRenderer.invoke('get-env-path', email);
} catch (error) {
console.log('get-env-path error', error);
}
let browser_port = 0;
let cdp_browsers: any[] = [];
// create history
if (!type) {
if (modelType === 'custom' || modelType === 'local') {
const res = await proxyFetchGet('/api/v1/providers', {
prefer: true,
});
const providerList = res.items || [];
console.log('providerList', providerList);
const provider = providerList[0];
if (!provider) {
throw new Error(
'No model provider configured. Please go to Agents > Models and configure at least one model provider as default.'
);
}
apiModel = {
api_key: provider.api_key,
model_type: provider.model_type,
model_platform: provider.provider_name,
api_url: provider.endpoint_url || provider.api_url,
extra_params: provider.encrypted_config,
};
} else if (modelType === 'cloud') {
const res = await proxyFetchGet('/api/v1/user/key');
if (res.warning_code && res.warning_code === '21') {
showStorageToast();
}
apiModel = {
api_key: res.value,
model_type: cloud_model_type,
model_platform: cloud_model_type.includes('gpt')
? 'openai'
: cloud_model_type.includes('claude')
? 'aws-bedrock'
: cloud_model_type.includes('gemini')
? 'gemini'
: 'openai-compatible-model',
api_url: res.api_url,
extra_params: {},
};
}
if (modelType === 'custom') {
try {
const configsRes = await proxyFetchGet('/api/v1/configs');
const configs = Array.isArray(configsRes) ? configsRes : [];
const googleApiKey = configs.find(
(c: any) =>
c.config_group?.toLowerCase() === 'search' &&
c.config_name === 'GOOGLE_API_KEY'
)?.config_value;
const searchEngineId = configs.find(
(c: any) =>
c.config_group?.toLowerCase() === 'search' &&
c.config_name === 'SEARCH_ENGINE_ID'
)?.config_value;
if (googleApiKey && searchEngineId) {
searchConfig = {
GOOGLE_API_KEY: googleApiKey,
SEARCH_ENGINE_ID: searchEngineId,
};
console.log('Loaded custom search configuration');
}
} catch (error) {
console.error('Failed to load search configuration:', error);
}
}
addWorkers = workerList.map((worker) => {
return {
name: worker.workerInfo?.name ?? '',
description: worker.workerInfo?.description ?? '',
tools: worker.workerInfo?.tools,
mcp_tools: worker.workerInfo?.mcp_tools,
};
});
try {
envPath = await window.ipcRenderer.invoke('get-env-path', email);
} catch (error) {
console.log('get-env-path error', error);
}
const authStore = getAuthStore();
const obj = {
@ -743,9 +755,9 @@ const chatStore = (initial?: Partial<ChatStore>) =>
if (project_id && historyId)
projectStore.setHistoryId(project_id, historyId);
});
browser_port = await window.ipcRenderer.invoke('get-browser-port');
cdp_browsers = await window.ipcRenderer.invoke('get-cdp-browsers');
}
const browser_port = await window.ipcRenderer.invoke('get-browser-port');
const cdp_browsers = await window.ipcRenderer.invoke('get-cdp-browsers');
// Lock the chatStore reference at the start of SSE session to prevent focus changes
// during active message processing
@ -788,48 +800,7 @@ const chatStore = (initial?: Partial<ChatStore>) =>
lockedTaskId = newTaskId;
};
const ssePromise = fetchEventSource(api, {
method: !type ? 'POST' : 'GET',
openWhenHidden: true,
signal: abortController.signal, // Add abort signal for proper cleanup
headers: {
'Content-Type': 'application/json',
Authorization:
type == 'replay'
? `Bearer ${token}`
: (undefined as unknown as string),
},
body: !type
? JSON.stringify({
project_id: project_id,
task_id: newTaskId,
question:
messageContent ||
targetChatStore.getState().getLastUserMessage()?.content,
model_platform: apiModel.model_platform,
email,
model_type: apiModel.model_type,
api_key: apiModel.api_key,
api_url: apiModel.api_url,
extra_params: apiModel.extra_params,
installed_mcp: { mcpServers: {} },
language: systemLanguage,
allow_local_system: true,
attaches: (
messageAttaches ||
targetChatStore.getState().tasks[newTaskId]?.attaches ||
[]
).map((f) => f.filePath),
summary_prompt: ``,
new_agents: [...addWorkers],
browser_port: browser_port,
cdp_browsers: cdp_browsers,
env_path: envPath,
search_config: searchConfig,
})
: undefined,
async onmessage(event: any) {
const handleIncomingMessage = async (event: any) => {
let agentMessages: AgentMessage;
try {
@ -2502,7 +2473,73 @@ const chatStore = (initial?: Partial<ChatStore>) =>
isConfirm: false,
};
addMessages(currentTaskId, newMessage);
};
if (hasLocalPlayback) {
try {
get().setAttaches(newTaskId, []);
for (let index = 0; index < localPlaybackSteps.length; index++) {
if (abortController.signal.aborted) {
break;
}
await handleIncomingMessage({
data: JSON.stringify(localPlaybackSteps[index]),
});
if (
(delayTime || 0) > 0 &&
index < localPlaybackSteps.length - 1
) {
await sleep((delayTime || 0) * 1000);
}
}
} finally {
delete activeSSEControllers[newTaskId];
}
return;
}
const ssePromise = fetchEventSource(api, {
method: !type ? 'POST' : 'GET',
openWhenHidden: true,
signal: abortController.signal, // Add abort signal for proper cleanup
headers: {
'Content-Type': 'application/json',
Authorization:
type == 'replay'
? `Bearer ${token}`
: (undefined as unknown as string),
},
body: !type
? JSON.stringify({
project_id: project_id,
task_id: newTaskId,
question:
messageContent ||
targetChatStore.getState().getLastUserMessage()?.content,
model_platform: apiModel.model_platform,
email,
model_type: apiModel.model_type,
api_key: apiModel.api_key,
api_url: apiModel.api_url,
extra_params: apiModel.extra_params,
installed_mcp: { mcpServers: {} },
language: systemLanguage,
allow_local_system: true,
attaches: (
messageAttaches ||
targetChatStore.getState().tasks[newTaskId]?.attaches ||
[]
).map((f) => f.filePath),
summary_prompt: ``,
new_agents: [...addWorkers],
browser_port: browser_port,
cdp_browsers: cdp_browsers,
env_path: envPath,
search_config: searchConfig,
})
: undefined,
onmessage: handleIncomingMessage,
async onopen(respond) {
console.log('open', respond);
const { setAttaches, activeTaskId } = get();
@ -2630,14 +2667,7 @@ const chatStore = (initial?: Partial<ChatStore>) =>
addMessages,
startTask,
setActiveTaskId,
handleConfirmTask,
} = get();
//get project id
const project_id = useProjectStore.getState().activeProjectId;
if (!project_id) {
console.error("Can't replay task because no project id provided");
return;
}
create(taskId, 'replay');
setHasMessages(taskId, true);
@ -2649,7 +2679,21 @@ const chatStore = (initial?: Partial<ChatStore>) =>
await startTask(taskId, 'replay', undefined, time);
setActiveTaskId(taskId);
handleConfirmTask(project_id, taskId, 'replay');
},
loadHistoryTask: async (taskId: string, question: string) => {
const { create, setHasMessages, addMessages, startTask, setActiveTaskId } =
get();
create(taskId, 'replay');
setHasMessages(taskId, true);
addMessages(taskId, {
id: generateUniqueId(),
role: 'user',
content: question.split('|')[0],
});
await startTask(taskId, 'replay', undefined, 0);
setActiveTaskId(taskId);
},
setUpdateCount() {
set((state) => ({

View file

@ -679,7 +679,7 @@ const projectStore = create<ProjectStore>()((set, get) => ({
const chatStore = project.chatStores[chatId];
if (chatStore) {
try {
await chatStore.getState().replay(taskId, question, 0);
await chatStore.getState().loadHistoryTask(taskId, question);
console.log(`[ProjectStore] Loaded task ${taskId}`);
} catch (error) {
console.error(

View file

@ -15,7 +15,7 @@
// History API types for project-grouped structure
export interface HistoryTask {
id: number;
id: number | string;
task_id: string;
project_id: string;
question: string;
@ -33,6 +33,8 @@ export interface HistoryTask {
status: number;
created_at?: string;
updated_at?: string;
/** Event sync status: local-only, synced to cloud, or pending sync */
sync_status?: 'local' | 'synced' | 'pending';
}
export interface ProjectGroup {
@ -48,6 +50,8 @@ export interface ProjectGroup {
total_completed_tasks: number;
total_ongoing_tasks: number;
average_tokens_per_task: number;
/** Aggregate sync status: local-only, fully synced, or partial */
sync_status?: 'local' | 'synced' | 'partial';
}
export interface GroupedHistoryResponse {