diff --git a/backend/app/controller/event_controller.py b/backend/app/controller/event_controller.py index 426d4a06..f5b15fd7 100644 --- a/backend/app/controller/event_controller.py +++ b/backend/app/controller/event_controller.py @@ -89,3 +89,13 @@ def list_projects() -> list[dict[str, Any]]: return [] return SQLiteTranscriptStore.query_projects(db_path) + + +@router.get("/events/steps/{run_id}", name="list local playback steps") +def list_playback_steps(run_id: str) -> list[dict[str, Any]]: + """Return saved Eigent SSE steps for a run in playback order.""" + db_path = get_event_db_path() + if not db_path.exists(): + return [] + + return SQLiteTranscriptStore.query_playback_steps(db_path, run_id=run_id) diff --git a/backend/app/event_store/schema.py b/backend/app/event_store/schema.py index c83add0c..603d51da 100644 --- a/backend/app/event_store/schema.py +++ b/backend/app/event_store/schema.py @@ -51,12 +51,21 @@ class EventEnvelope: def to_dict(self) -> dict[str, Any]: data = asdict(self) if isinstance(data["payload"], str): - data["payload"] = json.loads(data["payload"]) + try: + data["payload"] = json.loads(data["payload"]) + except json.JSONDecodeError: + data["payload"] = data["payload"] return data def to_compat_dict(self) -> dict[str, Any]: """Return a dict compatible with CAMEL's TranscriptStore.read_all() format (workforce_id, timestamp keys).""" + payload = self.payload + if isinstance(payload, str): + try: + payload = json.loads(payload) + except json.JSONDecodeError: + pass return { "event_type": self.event_type, "workforce_id": self.run_id, @@ -65,9 +74,7 @@ class EventEnvelope: "agent_id": self.agent_id, "agent_name": self.agent_name, "source": self.source, - "payload": self.payload - if isinstance(self.payload, dict) - else json.loads(self.payload), + "payload": payload, } diff --git a/backend/app/event_store/sqlite_store.py b/backend/app/event_store/sqlite_store.py index a80c406b..80b18fd4 100644 --- a/backend/app/event_store/sqlite_store.py +++ b/backend/app/event_store/sqlite_store.py @@ -44,6 +44,8 @@ class SQLiteTranscriptStore: via the ``transcript_store`` parameter on ``Workforce.__init__``. """ + _write_lock = threading.Lock() + def __init__( self, path: str | Path, @@ -53,7 +55,6 @@ class SQLiteTranscriptStore: self.path = Path(path) self.run_id = run_id self.project_id = project_id - self._lock = threading.Lock() self.path.parent.mkdir(parents=True, exist_ok=True) self._conn = sqlite3.connect( @@ -70,10 +71,14 @@ class SQLiteTranscriptStore: # ------------------------------------------------------------------ def _ensure_schema(self) -> None: - with self._conn: - self._conn.execute(CREATE_EVENT_LOG_TABLE) + self._ensure_schema_on_conn(self._conn) + + @staticmethod + def _ensure_schema_on_conn(conn: sqlite3.Connection) -> None: + with conn: + conn.execute(CREATE_EVENT_LOG_TABLE) for idx_ddl in CREATE_INDEXES: - self._conn.execute(idx_ddl) + conn.execute(idx_ddl) # ------------------------------------------------------------------ # TranscriptStore-compatible interface @@ -87,39 +92,60 @@ class SQLiteTranscriptStore: """ envelope = self._to_envelope(event) - with self._lock: + with self._write_lock: with self._conn: seq = self._next_seq(self._conn) envelope.seq = seq - self._conn.execute( - """\ - INSERT INTO event_log ( - event_id, run_id, project_id, task_id, seq, - event_type, occurred_at, source, - agent_id, agent_name, schema_version, - payload, synced_at, sync_attempts - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - envelope.event_id, - envelope.run_id, - envelope.project_id, - envelope.task_id, - envelope.seq, - envelope.event_type, - envelope.occurred_at, - envelope.source, - envelope.agent_id, - envelope.agent_name, - envelope.schema_version, - json.dumps(envelope.payload, ensure_ascii=False), - envelope.synced_at, - envelope.sync_attempts, - ), - ) + self._insert_envelope(self._conn, envelope) return event.to_dict() + @classmethod + def append_event( + cls, + db_path: str | Path, + *, + run_id: str, + project_id: str, + event_type: str, + payload: Any, + source: str, + task_id: str | None = None, + agent_id: str | None = None, + agent_name: str | None = None, + occurred_at: str | None = None, + ) -> dict[str, Any]: + """Append a non-CAMEL event into the canonical local event log.""" + path = Path(db_path) + path.parent.mkdir(parents=True, exist_ok=True) + + conn = sqlite3.connect(str(path)) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL;") + conn.execute("PRAGMA busy_timeout=5000;") + + try: + cls._ensure_schema_on_conn(conn) + envelope = EventEnvelope( + run_id=run_id, + project_id=project_id, + task_id=task_id, + event_type=event_type, + occurred_at=occurred_at or datetime.now(UTC).isoformat(), + source=source, + agent_id=agent_id, + agent_name=agent_name, + schema_version=SCHEMA_VERSION, + payload=payload, + ) + with cls._write_lock: + with conn: + envelope.seq = cls._next_seq_for_run(conn, run_id) + cls._insert_envelope(conn, envelope) + return envelope.to_dict() + finally: + conn.close() + def read_all(self) -> list[dict[str, Any]]: """Read all events for this run, returning dicts in the same format as CAMEL's JSONL ``TranscriptStore``.""" @@ -160,7 +186,7 @@ class SQLiteTranscriptStore: return now = datetime.now(UTC).isoformat() placeholders = ",".join("?" for _ in event_ids) - with self._lock: + with self._write_lock: with self._conn: self._conn.execute( f"UPDATE event_log SET synced_at = ? " # nosec B608 @@ -173,7 +199,7 @@ class SQLiteTranscriptStore: if not event_ids: return placeholders = ",".join("?" for _ in event_ids) - with self._lock: + with self._write_lock: with self._conn: self._conn.execute( f"UPDATE event_log SET sync_attempts = sync_attempts + 1 " # nosec B608 @@ -257,41 +283,87 @@ class SQLiteTranscriptStore: if not table_check: return [] + params: list[Any] = [] + where_clause = "" if project_id: - rows = conn.execute( - """\ - SELECT run_id, project_id, - MIN(task_id) AS task_id, - COUNT(*) AS event_count, - MIN(occurred_at) AS first_event, - MAX(occurred_at) AS last_event - FROM event_log - WHERE project_id = ? - GROUP BY run_id - ORDER BY MAX(occurred_at) DESC - """, - (project_id,), - ).fetchall() - else: - rows = conn.execute( - """\ - SELECT run_id, project_id, - MIN(task_id) AS task_id, - COUNT(*) AS event_count, - MIN(occurred_at) AS first_event, - MAX(occurred_at) AS last_event - FROM event_log - GROUP BY run_id - ORDER BY MAX(occurred_at) DESC - """, - ).fetchall() - return [dict(r) for r in rows] + where_clause = "WHERE project_id = ?" + params.append(project_id) + + rows = conn.execute( + f"""\ + SELECT * + FROM event_log + {where_clause} + ORDER BY run_id, seq + """, + params, + ).fetchall() + events = [SQLiteTranscriptStore._row_to_canonical_dict(r) for r in rows] + return SQLiteTranscriptStore._summarize_runs(events) finally: conn.close() @staticmethod def query_projects(db_path: str | Path) -> list[dict[str, Any]]: """List projects with aggregated stats. Used by the local API.""" + runs = SQLiteTranscriptStore.query_runs(db_path) + project_map: dict[str, dict[str, Any]] = {} + + for run in runs: + project_id = run["project_id"] + project = project_map.setdefault( + project_id, + { + "project_id": project_id, + "run_count": 0, + "task_count": 0, + "event_count": 0, + "first_event": run["first_event"], + "last_event": run["last_event"], + "last_prompt": run["question"], + "total_completed_tasks": 0, + "total_ongoing_tasks": 0, + "sync_status": "local", + }, + ) + project["run_count"] += 1 + project["task_count"] += 1 + project["event_count"] += run["event_count"] + project["first_event"] = min( + project["first_event"], run["first_event"] + ) + if run["last_event"] >= project["last_event"]: + project["last_event"] = run["last_event"] + project["last_prompt"] = run["question"] + if run["status"] == 2: + project["total_completed_tasks"] += 1 + else: + project["total_ongoing_tasks"] += 1 + + project.setdefault("_sync_states", []).append(run["sync_status"]) + + for project in project_map.values(): + sync_states = set(project.pop("_sync_states", [])) + if sync_states == {"synced"}: + project["sync_status"] = "synced" + elif sync_states == {"local"}: + project["sync_status"] = "local" + else: + project["sync_status"] = "partial" + + return sorted( + project_map.values(), + key=lambda item: item["last_event"], + reverse=True, + ) + + @staticmethod + def query_playback_steps( + db_path: str | Path, + *, + run_id: str, + ) -> list[dict[str, Any]]: + """Return saved Eigent SSE steps for a run in playback order.""" conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row try: @@ -303,13 +375,56 @@ class SQLiteTranscriptStore: rows = conn.execute( """\ - SELECT project_id, - COUNT(DISTINCT run_id) AS run_count, - COUNT(*) AS event_count, - MIN(occurred_at) AS first_event, + SELECT event_type, payload, task_id, occurred_at + FROM event_log + WHERE run_id = ? AND source = 'eigent_sse' + ORDER BY seq + """, + (run_id,), + ).fetchall() + + steps: list[dict[str, Any]] = [] + for row in rows: + payload = row["payload"] + if isinstance(payload, str): + payload = json.loads(payload) + steps.append( + { + "step": row["event_type"], + "data": payload, + "task_id": row["task_id"], + "created_at": row["occurred_at"], + } + ) + return steps + finally: + conn.close() + + @staticmethod + def query_sync_status( + db_path: str | Path, + ) -> list[dict[str, Any]]: + """Return sync status per run. Used by the local API.""" + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + try: + table_check = conn.execute( + "SELECT name FROM sqlite_master " + "WHERE type='table' AND name='event_log'" + ).fetchone() + if not table_check: + return [] + + rows = conn.execute( + """\ + SELECT run_id, + project_id, + COUNT(*) AS total_events, + SUM(CASE WHEN synced_at IS NOT NULL + THEN 1 ELSE 0 END) AS synced_events, MAX(occurred_at) AS last_event FROM event_log - GROUP BY project_id + GROUP BY run_id ORDER BY MAX(occurred_at) DESC """, ).fetchall() @@ -322,13 +437,48 @@ class SQLiteTranscriptStore: # ------------------------------------------------------------------ def _next_seq(self, conn: sqlite3.Connection) -> int: + return self._next_seq_for_run(conn, self.run_id) + + @staticmethod + def _next_seq_for_run(conn: sqlite3.Connection, run_id: str) -> int: row = conn.execute( "SELECT COALESCE(MAX(seq), -1) + 1 AS next_seq " "FROM event_log WHERE run_id = ?", - (self.run_id,), + (run_id,), ).fetchone() return row["next_seq"] + @staticmethod + def _insert_envelope( + conn: sqlite3.Connection, envelope: EventEnvelope + ) -> None: + conn.execute( + """\ + INSERT INTO event_log ( + event_id, run_id, project_id, task_id, seq, + event_type, occurred_at, source, + agent_id, agent_name, schema_version, + payload, synced_at, sync_attempts + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + envelope.event_id, + envelope.run_id, + envelope.project_id, + envelope.task_id, + envelope.seq, + envelope.event_type, + envelope.occurred_at, + envelope.source, + envelope.agent_id, + envelope.agent_name, + envelope.schema_version, + json.dumps(envelope.payload, ensure_ascii=False), + envelope.synced_at, + envelope.sync_attempts, + ), + ) + def _to_envelope(self, event: TranscriptEvent) -> EventEnvelope: return EventEnvelope( event_id=str(uuid.uuid4()), @@ -370,3 +520,109 @@ class SQLiteTranscriptStore: if isinstance(data.get("payload"), str): data["payload"] = json.loads(data["payload"]) return data + + @staticmethod + def _summarize_runs(events: list[dict[str, Any]]) -> list[dict[str, Any]]: + run_map: dict[str, list[dict[str, Any]]] = {} + for event in events: + run_map.setdefault(event["run_id"], []).append(event) + + summaries = [ + SQLiteTranscriptStore._summarize_run(run_events) + for run_events in run_map.values() + if run_events + ] + return sorted( + summaries, + key=lambda item: item["last_event"], + reverse=True, + ) + + @staticmethod + def _summarize_run(events: list[dict[str, Any]]) -> dict[str, Any]: + first_event = events[0] + last_event = events[-1] + synced_events = sum(1 for event in events if event["synced_at"]) + total_events = len(events) + + if synced_events == total_events and total_events > 0: + sync_status = "synced" + elif synced_events == 0: + sync_status = "local" + else: + sync_status = "pending" + + return { + "id": f"local:{first_event['run_id']}", + "run_id": first_event["run_id"], + "project_id": first_event["project_id"], + "task_id": first_event["run_id"], + "event_count": total_events, + "first_event": first_event["occurred_at"], + "last_event": last_event["occurred_at"], + "question": SQLiteTranscriptStore._extract_question(events), + "summary": SQLiteTranscriptStore._extract_summary(events), + "status": 2 + if SQLiteTranscriptStore._is_run_complete(events) + else 1, + "synced_events": synced_events, + "sync_status": sync_status, + } + + @staticmethod + def _extract_question(events: list[dict[str, Any]]) -> str: + for event in events: + payload = event.get("payload") + if not isinstance(payload, dict): + continue + if event["event_type"] == "confirmed": + question = payload.get("question") + if question: + return question + if event["event_type"] in {"user_message", "task_created"}: + question = payload.get("message") or payload.get( + "description" + ) + if question: + return question + return "" + + @staticmethod + def _extract_summary(events: list[dict[str, Any]]) -> str: + for event in reversed(events): + payload = event.get("payload") + if event["event_type"] == "end": + if isinstance(payload, str): + return SQLiteTranscriptStore._strip_summary_tags(payload) + if isinstance(payload, dict): + summary = ( + payload.get("summary") + or payload.get("task_result") + or payload.get("result") + ) + if summary: + return SQLiteTranscriptStore._strip_summary_tags( + str(summary) + ) + if event["event_type"] == "task_completed" and isinstance( + payload, dict + ): + summary = payload.get("result_summary") + if summary: + return str(summary) + if event["event_type"] == "error" and isinstance(payload, dict): + message = payload.get("message") + if message: + return str(message) + return "" + + @staticmethod + def _is_run_complete(events: list[dict[str, Any]]) -> bool: + terminal_events = {"end", "error", "all_tasks_completed"} + return any(event["event_type"] in terminal_events for event in events) + + @staticmethod + def _strip_summary_tags(value: str) -> str: + if "" in value and "" in value: + return value.split("", 1)[1].split("", 1)[0] + return value diff --git a/backend/app/utils/server/event_sync.py b/backend/app/utils/server/event_sync.py index 36b146a0..975eed72 100644 --- a/backend/app/utils/server/event_sync.py +++ b/backend/app/utils/server/event_sync.py @@ -23,7 +23,7 @@ Environment variables (passed by Electron via process env): SERVER_TOKEN -- JWT auth token for cloud API CLOUD_SYNC_ENABLED -- set to "true" to enable sync - (toggle via Electron IPC / ~/.eigent/.env) + (typically configured in ~/.eigent/.env) """ from __future__ import annotations diff --git a/backend/app/utils/server/sync_step.py b/backend/app/utils/server/sync_step.py index ec2d8f51..2b0ec34e 100644 --- a/backend/app/utils/server/sync_step.py +++ b/backend/app/utils/server/sync_step.py @@ -14,8 +14,9 @@ """ Cloud sync step decorator. -Syncs SSE step data to cloud server when SERVER_URL is configured. -High-frequency events (decompose_text) are batched to reduce API calls. +Persists outgoing SSE step data to the local SQLite event log. +When SERVER_URL is configured, the same step data can also be +forwarded to the cloud server in batches. Config (~/.eigent/.env): SERVER_URL=https://dev.eigent.ai/api @@ -30,6 +31,8 @@ from functools import lru_cache import httpx from app.component.environment import env +from app.event_store.config import get_event_db_path +from app.event_store.sqlite_store import SQLiteTranscriptStore from app.service.task import get_task_lock_if_exists logger = logging.getLogger("sync_step") @@ -42,7 +45,7 @@ _text_buffers: dict[str, str] = {} @lru_cache(maxsize=1) -def _get_config(): +def _get_sync_url(): server_url = env("SERVER_URL", "") if not server_url: @@ -53,44 +56,49 @@ def _get_config(): def sync_step(func): async def wrapper(*args, **kwargs): - config = _get_config() + sync_url = _get_sync_url() - if not config: + try: async for value in func(*args, **kwargs): + _record_step(args, value, sync_url) yield value - return - - async for value in func(*args, **kwargs): - _try_sync(args, value, config) - yield value + finally: + run_id = _get_run_id(args) + if run_id in _text_buffers: + _flush_buffer(args, run_id, sync_url) return wrapper -def _try_sync(args, value, sync_url): +def _record_step(args, value, sync_url): data = _parse_value(value) if not data: return - task_id = _get_task_id(args) - if not task_id: + run_id = _get_run_id(args) + if not run_id: return step = data.get("step") - # Batch decompose_text events to reduce API calls + # Batch decompose_text events to reduce event volume. if step == "decompose_text": - _buffer_text(task_id, data["data"].get("content", "")) - if _should_flush(task_id): - _flush_buffer(task_id, sync_url) + _buffer_text(run_id, data["data"].get("content", "")) + if _should_flush(run_id): + _flush_buffer(args, run_id, sync_url) return - # Flush any buffered text before sending other events (preserves order) - if task_id in _text_buffers: - _flush_buffer(task_id, sync_url) + # Flush any buffered text before sending other events. + if run_id in _text_buffers: + _flush_buffer(args, run_id, sync_url) + + _persist_local_event(args, run_id, step, data["data"]) + + if not sync_url: + return payload = { - "task_id": task_id, + "task_id": _get_current_task_id(args, data["data"]), "step": step, "data": data["data"], "timestamp": time.time_ns() / 1_000_000_000, @@ -99,30 +107,36 @@ def _try_sync(args, value, sync_url): asyncio.create_task(_send(sync_url, payload)) -def _buffer_text(task_id: str, content: str): +def _buffer_text(run_id: str, content: str): """Accumulate decompose_text content in buffer.""" - if task_id not in _text_buffers: - _text_buffers[task_id] = "" - _text_buffers[task_id] += content + if run_id not in _text_buffers: + _text_buffers[run_id] = "" + _text_buffers[run_id] += content -def _should_flush(task_id: str) -> bool: +def _should_flush(run_id: str) -> bool: """Check if buffer has enough words to flush.""" - text = _text_buffers.get(task_id, "") + text = _text_buffers.get(run_id, "") word_count = len(text.split()) return word_count >= BATCH_WORD_THRESHOLD -def _flush_buffer(task_id: str, sync_url: str): +def _flush_buffer(args, run_id: str, sync_url: str | None): """Send buffered text and clear buffer.""" - text = _text_buffers.pop(task_id, "") + text = _text_buffers.pop(run_id, "") if not text: return + data = {"content": text} + _persist_local_event(args, run_id, "decompose_text", data) + + if not sync_url: + return + payload = { - "task_id": task_id, + "task_id": _get_current_task_id(args, data), "step": "decompose_text", - "data": {"content": text}, + "data": data, "timestamp": time.time_ns() / 1_000_000_000, } @@ -143,7 +157,18 @@ def _parse_value(value): return None -def _get_task_id(args): +def _get_run_id(args): + if not args or not hasattr(args[0], "task_id"): + return None + + return getattr(args[0], "task_id", None) + + +def _get_current_task_id(args, payload=None): + payload = payload if isinstance(payload, dict) else {} + if task_id := payload.get("task_id") or payload.get("process_task_id"): + return task_id + if not args or not hasattr(args[0], "task_id"): return None @@ -162,6 +187,46 @@ def _get_task_id(args): return chat.task_id +def _persist_local_event(args, run_id: str, step: str, payload): + if not args or not hasattr(args[0], "project_id"): + return + + chat = args[0] + try: + SQLiteTranscriptStore.append_event( + get_event_db_path(), + run_id=run_id, + project_id=chat.project_id, + task_id=_get_current_task_id(args, payload), + event_type=step, + payload=payload, + source="eigent_sse", + agent_id=_extract_agent_id(payload), + agent_name=_extract_agent_name(payload), + ) + except Exception as e: + logger.warning( + f"Failed to persist local step event {step}: " + f"{type(e).__name__}: {e}" + ) + + +def _extract_agent_id(payload) -> str | None: + if not isinstance(payload, dict): + return None + return ( + payload.get("agent_id") + or payload.get("assignee_id") + or payload.get("worker_id") + ) + + +def _extract_agent_name(payload) -> str | None: + if not isinstance(payload, dict): + return None + return payload.get("agent_name") or payload.get("role") + + async def _send(url, data): try: async with httpx.AsyncClient(timeout=5.0) as client: diff --git a/backend/tests/unit/event_store/test_sqlite_store.py b/backend/tests/unit/event_store/test_sqlite_store.py index 420e2741..14ccc894 100644 --- a/backend/tests/unit/event_store/test_sqlite_store.py +++ b/backend/tests/unit/event_store/test_sqlite_store.py @@ -137,6 +137,21 @@ class TestAppendAndRead: assert ev["run_id"] == "run-001" assert ev["project_id"] == "proj-001" + def test_append_event_for_sse_payload(self, db_path: Path): + event = SQLiteTranscriptStore.append_event( + db_path, + run_id="run-sse", + project_id="proj-001", + task_id="task-001", + event_type="confirmed", + payload={"question": "Summarize the local task"}, + source="eigent_sse", + ) + + assert event["run_id"] == "run-sse" + assert event["event_type"] == "confirmed" + assert event["payload"]["question"] == "Summarize the local task" + class TestSequencing: def test_seq_starts_at_zero(self, store: SQLiteTranscriptStore): @@ -310,6 +325,35 @@ class TestStaticQueries: s1.close() s2.close() + def test_query_runs_extracts_question_summary_and_sync_status( + self, db_path: Path + ): + SQLiteTranscriptStore.append_event( + db_path, + run_id="run-1", + project_id="proj-1", + task_id="run-1", + event_type="confirmed", + payload={"question": "Write a changelog"}, + source="eigent_sse", + ) + SQLiteTranscriptStore.append_event( + db_path, + run_id="run-1", + project_id="proj-1", + task_id="run-1", + event_type="end", + payload="Done", + source="eigent_sse", + ) + + runs = SQLiteTranscriptStore.query_runs(db_path) + assert len(runs) == 1 + assert runs[0]["question"] == "Write a changelog" + assert runs[0]["summary"] == "Done" + assert runs[0]["status"] == 2 + assert runs[0]["sync_status"] == "local" + def test_query_projects(self, db_path: Path): s1 = SQLiteTranscriptStore( path=db_path, run_id="r1", project_id="proj-A" @@ -331,6 +375,44 @@ class TestStaticQueries: s1.close() s2.close() + def test_query_playback_steps_filters_to_eigent_sse( + self, db_path: Path + ): + SQLiteTranscriptStore.append_event( + db_path, + run_id="run-1", + project_id="proj-1", + task_id="task-1", + event_type="confirmed", + payload={"question": "Use local replay"}, + source="eigent_sse", + ) + SQLiteTranscriptStore.append_event( + db_path, + run_id="run-1", + project_id="proj-1", + task_id="task-1", + event_type="task_created", + payload={"description": "raw camel event"}, + source="camel", + ) + SQLiteTranscriptStore.append_event( + db_path, + run_id="run-1", + project_id="proj-1", + task_id="task-1", + event_type="end", + payload="Done", + source="eigent_sse", + ) + + steps = SQLiteTranscriptStore.query_playback_steps( + db_path, run_id="run-1" + ) + assert [step["step"] for step in steps] == ["confirmed", "end"] + assert steps[0]["data"]["question"] == "Use local replay" + assert steps[1]["data"] == "Done" + def test_query_events_empty_db(self, tmp_path: Path): """Query on a non-existent DB path should handle gracefully.""" db_path = tmp_path / "nonexistent.db" diff --git a/electron/main/index.ts b/electron/main/index.ts index aaa986a4..1b9d5da4 100644 --- a/electron/main/index.ts +++ b/electron/main/index.ts @@ -2132,11 +2132,7 @@ function registerIpcHandlers() { }); // ==================== read global env handler ==================== - const ALLOWED_GLOBAL_ENV_KEYS = new Set([ - 'HTTP_PROXY', - 'HTTPS_PROXY', - 'CLOUD_SYNC_ENABLED', - ]); + const ALLOWED_GLOBAL_ENV_KEYS = new Set(['HTTP_PROXY', 'HTTPS_PROXY']); ipcMain.handle('read-global-env', async (_event, key: string) => { if (!ALLOWED_GLOBAL_ENV_KEYS.has(key)) { log.warn(`[ENV] Blocked read of disallowed global env key: ${key}`); @@ -2145,36 +2141,6 @@ function registerIpcHandlers() { return { value: readGlobalEnvKey(key) }; }); - // ==================== cloud sync toggle ==================== - ipcMain.handle('get-cloud-sync', async () => { - const value = readGlobalEnvKey('CLOUD_SYNC_ENABLED'); - return { enabled: value?.toLowerCase() === 'true' }; - }); - - ipcMain.handle('set-cloud-sync', async (_event, enabled: boolean) => { - const GLOBAL_ENV_PATH = path.join(os.homedir(), '.eigent', '.env'); - let content = ''; - try { - content = fs.existsSync(GLOBAL_ENV_PATH) - ? fs.readFileSync(GLOBAL_ENV_PATH, 'utf-8') - : ''; - } catch (error) { - log.error('set-cloud-sync read error:', error); - } - let lines = content.split(/\r?\n/); - lines = updateEnvBlock(lines, { - CLOUD_SYNC_ENABLED: enabled ? 'true' : 'false', - }); - try { - fs.writeFileSync(GLOBAL_ENV_PATH, lines.join('\n'), 'utf-8'); - log.info(`[SYNC] Cloud sync ${enabled ? 'enabled' : 'disabled'}`); - } catch (error) { - log.error('set-cloud-sync write error:', error); - return { success: false }; - } - return { success: true }; - }); - // ==================== new window handler ==================== ipcMain.handle('open-win', (_, arg) => { const childWindow = new BrowserWindow({ diff --git a/electron/preload/index.ts b/electron/preload/index.ts index ffe0efd3..910a670d 100644 --- a/electron/preload/index.ts +++ b/electron/preload/index.ts @@ -201,10 +201,6 @@ contextBridge.exposeInMainWorld('electronAPI', { ipcRenderer.invoke('skill-config-update', userId, skillName, skillConfig), skillConfigDelete: (userId: string, skillName: string) => ipcRenderer.invoke('skill-config-delete', userId, skillName), - // Cloud sync toggle - getCloudSync: () => ipcRenderer.invoke('get-cloud-sync'), - setCloudSync: (enabled: boolean) => - ipcRenderer.invoke('set-cloud-sync', enabled), }); // --------- Preload scripts loading --------- diff --git a/server/app/domains/chat/api/event_controller.py b/server/app/domains/chat/api/event_controller.py deleted file mode 100644 index 2bf39347..00000000 --- a/server/app/domains/chat/api/event_controller.py +++ /dev/null @@ -1,205 +0,0 @@ -# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= - -"""Workflow Event controller for batch ingestion and read access. - -Receives canonical events from the local sync worker and provides -read endpoints for cloud-based history queries. -""" - -from typing import List, Optional - -from fastapi import APIRouter, Depends, HTTPException, Query -from sqlmodel import Session, select - -from app.core.database import session -from app.model.chat.workflow_event import ( - BatchIngestRequest, - BatchIngestResponse, - WorkflowEvent, - WorkflowEventOut, -) -from app.shared.auth import auth_must - -router = APIRouter(prefix="/chat", tags=["V1 Workflow Events"]) - - -@router.post( - "/events/batch", - name="batch ingest workflow events", - response_model=BatchIngestResponse, -) -async def batch_ingest( - body: BatchIngestRequest, - db_session: Session = Depends(session), - auth=Depends(auth_must), -): - """Idempotent batch ingestion of canonical events. - - - Duplicate ``event_id`` is silently skipped (idempotent). - - Conflicting ``(run_id, seq)`` with a different - ``event_id`` is rejected. - """ - accepted: list[str] = [] - rejected: list[dict] = [] - - if not body.events: - return BatchIngestResponse( - accepted=accepted, rejected=rejected - ) - - # Preflight: find existing event_ids to skip duplicates - incoming_ids = [e.event_id for e in body.events] - existing_rows = db_session.exec( - select(WorkflowEvent.event_id).where( - WorkflowEvent.event_id.in_(incoming_ids) - ) - ).all() - existing_ids = set(existing_rows) - - for event_in in body.events: - if event_in.event_id in existing_ids: - # Already ingested -- idempotent skip - accepted.append(event_in.event_id) - continue - - try: - db_event = WorkflowEvent( - event_id=event_in.event_id, - run_id=event_in.run_id, - task_id=event_in.task_id, - project_id=event_in.project_id, - user_id=auth.user.id, - seq=event_in.seq, - event_type=event_in.event_type, - occurred_at=event_in.occurred_at, - source=event_in.source, - agent_id=event_in.agent_id, - agent_name=event_in.agent_name, - schema_version=event_in.schema_version, - payload=event_in.payload, - ) - db_session.add(db_event) - db_session.flush() - accepted.append(event_in.event_id) - except Exception as e: - db_session.rollback() - error_msg = str(e) - if "uq_workflow_event_run_seq" in error_msg: - rejected.append( - { - "event_id": event_in.event_id, - "reason": ( - f"Conflicting (run_id=" - f"{event_in.run_id}, " - f"seq={event_in.seq})" - ), - } - ) - else: - rejected.append( - { - "event_id": event_in.event_id, - "reason": error_msg[:200], - } - ) - - # Commit all accepted events - try: - db_session.commit() - except Exception as e: - db_session.rollback() - return BatchIngestResponse( - accepted=[], - rejected=[ - { - "event_id": eid, - "reason": str(e)[:200], - } - for eid in incoming_ids - ], - ) - - return BatchIngestResponse( - accepted=accepted, rejected=rejected - ) - - -@router.get( - "/events", - name="list workflow events", - response_model=List[WorkflowEventOut], -) -async def list_events( - run_id: Optional[str] = Query(None), - task_id: Optional[str] = Query(None), - project_id: Optional[str] = Query(None), - after_seq: Optional[int] = Query(None), - limit: int = Query(200, ge=1, le=1000), - db_session: Session = Depends(session), - auth=Depends(auth_must), -): - """Read events filtered by run_id, task_id, or project_id. - - Ownership-checked: only returns events belonging to the - authenticated user. - """ - if not any([run_id, task_id, project_id]): - raise HTTPException( - status_code=400, - detail=( - "At least one of run_id, task_id, " - "or project_id is required" - ), - ) - - query = select(WorkflowEvent).where( - WorkflowEvent.user_id == auth.user.id - ) - - if run_id: - query = query.where(WorkflowEvent.run_id == run_id) - if task_id: - query = query.where(WorkflowEvent.task_id == task_id) - if project_id: - query = query.where( - WorkflowEvent.project_id == project_id - ) - if after_seq is not None: - query = query.where(WorkflowEvent.seq > after_seq) - - query = query.order_by( - WorkflowEvent.run_id, WorkflowEvent.seq - ).limit(limit) - - return list(db_session.exec(query).all()) - - -@router.get( - "/events/{event_id}", - name="get workflow event", - response_model=WorkflowEventOut, -) -async def get_event( - event_id: str, - db_session: Session = Depends(session), - auth=Depends(auth_must), -): - """Get a single event by event_id. Ownership-checked.""" - event = db_session.get(WorkflowEvent, event_id) - if not event or event.user_id != auth.user.id: - raise HTTPException( - status_code=404, detail="Event not found" - ) - return event diff --git a/server/app/model/chat/workflow_event.py b/server/app/model/chat/workflow_event.py deleted file mode 100644 index 98ae016d..00000000 --- a/server/app/model/chat/workflow_event.py +++ /dev/null @@ -1,149 +0,0 @@ -# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= - -"""Append-only workflow event model for the canonical event log. - -This table stores raw events synced from local SQLite databases. -It is NOT a replacement for ChatStep -- both coexist. ChatStep -serves real-time UI playback; WorkflowEvent is the durable -canonical history. -""" - -import json -from datetime import datetime -from typing import Any, Optional - -from pydantic import BaseModel, field_validator -from sqlalchemy import TIMESTAMP, UniqueConstraint, text -from sqlmodel import Field, JSON - -from app.model.abstract.model import AbstractModel - - -class WorkflowEvent(AbstractModel, table=True): - """Append-only event from agent execution. - - Does NOT inherit DefaultTimes because events are immutable -- - no updated_at or deleted_at. - """ - - __table_args__ = ( - UniqueConstraint( - "run_id", "seq", name="uq_workflow_event_run_seq" - ), - ) - - event_id: str = Field(primary_key=True) - run_id: str = Field(index=True) - task_id: Optional[str] = Field(default=None, index=True) - project_id: str = Field(index=True) - user_id: int = Field(index=True) - seq: int - event_type: str - occurred_at: str # ISO-8601 from the local client - source: str = Field(default="camel") - agent_id: Optional[str] = Field(default=None) - agent_name: Optional[str] = Field(default=None) - schema_version: int = Field(default=1) - payload: str = Field(sa_type=JSON, default="{}") - ingested_at: Optional[datetime] = Field( - default_factory=datetime.now, - sa_type=TIMESTAMP, - sa_column_kwargs={ - "server_default": text("CURRENT_TIMESTAMP") - }, - ) - - @field_validator("payload", mode="before") - @classmethod - def serialize_payload(cls, v): - if isinstance(v, (dict, list)): - return json.dumps(v, ensure_ascii=False) - return v - - @field_validator("payload", mode="after") - @classmethod - def deserialize_payload(cls, v): - if isinstance(v, str): - try: - return json.loads(v) - except Exception: - return v - return v - - -# ------------------------------------------------------------------- -# Pydantic companion schemas -# ------------------------------------------------------------------- - - -class WorkflowEventIn(BaseModel): - """Ingestion payload for a single event (from sync worker).""" - - event_id: str - run_id: str - task_id: Optional[str] = None - project_id: str - seq: int - event_type: str - occurred_at: str - source: str = "camel" - agent_id: Optional[str] = None - agent_name: Optional[str] = None - schema_version: int = 1 - payload: Any = {} - - -class WorkflowEventOut(BaseModel): - """Read response for a single event.""" - - event_id: str - run_id: str - task_id: Optional[str] = None - project_id: str - user_id: int - seq: int - event_type: str - occurred_at: str - source: str - agent_id: Optional[str] = None - agent_name: Optional[str] = None - schema_version: int - payload: Any - ingested_at: Optional[datetime] = None - - class Config: - from_attributes = True - - -class BatchIngestRequest(BaseModel): - """Batch ingestion request body.""" - - events: list[WorkflowEventIn] - - @field_validator("events") - @classmethod - def limit_batch_size(cls, v): - if len(v) > 200: - raise ValueError( - "Batch size must not exceed 200 events" - ) - return v - - -class BatchIngestResponse(BaseModel): - """Batch ingestion response body.""" - - accepted: list[str] = [] # event_ids that were ingested - rejected: list[dict[str, Any]] = [] # {event_id, reason} diff --git a/src/components/GroupedHistoryView/ProjectGroup.tsx b/src/components/GroupedHistoryView/ProjectGroup.tsx index 3980a667..ba64f1cb 100644 --- a/src/components/GroupedHistoryView/ProjectGroup.tsx +++ b/src/components/GroupedHistoryView/ProjectGroup.tsx @@ -28,6 +28,9 @@ import { ChatTaskStatus } from '@/types/constants'; import { ProjectGroup as ProjectGroupType } from '@/types/history'; import { motion } from 'framer-motion'; import { + Cloud, + CloudAlertIcon, + CloudOff, Edit, Hash, Loader2, @@ -438,6 +441,28 @@ export default function ProjectGroup({ {project.total_triggers || 0} + + {project.sync_status === 'synced' && ( + + + + + + )} + {project.sync_status === 'local' && ( + + + + + + )} + {project.sync_status === 'partial' && ( + + + + + + )} {/* End: Status and menu */} diff --git a/src/components/GroupedHistoryView/TaskItem.tsx b/src/components/GroupedHistoryView/TaskItem.tsx index 3cb253e3..1a7603b3 100644 --- a/src/components/GroupedHistoryView/TaskItem.tsx +++ b/src/components/GroupedHistoryView/TaskItem.tsx @@ -29,6 +29,8 @@ import { CirclePause, CirclePlay, Clock, + Cloud, + CloudOff, Ellipsis, Hash, Pin, @@ -138,6 +140,17 @@ export default function TaskItem({
{!isOngoing && getStatusTag(task.status)} + {task.sync_status === 'synced' && ( + + + + )} + {(task.sync_status === 'local' || task.sync_status === 'pending') && ( + + + + )} + {task.tokens ? task.tokens.toLocaleString() : '0'} diff --git a/src/components/SearchHistoryDialog.tsx b/src/components/SearchHistoryDialog.tsx index 1e33a03f..d6fe128c 100644 --- a/src/components/SearchHistoryDialog.tsx +++ b/src/components/SearchHistoryDialog.tsx @@ -14,7 +14,7 @@ 'use client'; -import { ScanFace, Search } from 'lucide-react'; +import { Cloud, CloudOff, ScanFace, Search } from 'lucide-react'; import { useEffect, useState } from 'react'; import GroupedHistoryView from '@/components/GroupedHistoryView'; @@ -31,6 +31,7 @@ import useChatStoreAdapter from '@/hooks/useChatStoreAdapter'; import { loadProjectFromHistory } from '@/lib'; import { fetchHistoryTasks } from '@/service/historyApi'; import { useGlobalStore } from '@/store/globalStore'; +import { HistoryTask } from '@/types/history'; import { VisuallyHidden } from '@radix-ui/react-visually-hidden'; import { useTranslation } from 'react-i18next'; import { useNavigate } from 'react-router-dom'; @@ -40,7 +41,7 @@ import { DialogTitle } from './ui/dialog'; export function SearchHistoryDialog() { const { t } = useTranslation(); const [open, setOpen] = useState(false); - const [historyTasks, setHistoryTasks] = useState([]); + const [historyTasks, setHistoryTasks] = useState([]); const { history_type } = useGlobalStore(); //Get Chatstore for the active project's task const { chatStore, projectStore } = useChatStoreAdapter(); @@ -139,8 +140,17 @@ export function SearchHistoryDialog() { } > -
- {task.question} +
+ {task.sync_status === 'synced' && ( + + )} + {(task.sync_status === 'local' || + task.sync_status === 'pending') && ( + + )} +
+ {task.question} +
))} diff --git a/src/service/historyApi.ts b/src/service/historyApi.ts index 3f94a931..59e8778a 100644 --- a/src/service/historyApi.ts +++ b/src/service/historyApi.ts @@ -13,28 +13,139 @@ // ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= import { proxyFetchGet } from '@/api/http'; +import { fetchLocalRuns, RunSummary } from '@/service/localEventApi'; import { HistoryTask, ProjectGroup } from '@/types/history'; -// Group tasks by project_id and add project-level metadata -const groupTasksByProject = (tasks: HistoryTask[]): ProjectGroup[] => { +const normalizeTask = (task: Partial): HistoryTask => ({ + id: task.id ?? '', + task_id: task.task_id ?? '', + project_id: task.project_id ?? task.task_id ?? '', + question: task.question ?? '', + language: task.language ?? 'local', + model_platform: task.model_platform ?? 'local', + model_type: task.model_type ?? 'local', + api_key: task.api_key ?? '', + api_url: task.api_url, + max_retries: task.max_retries ?? 0, + file_save_path: task.file_save_path, + installed_mcp: task.installed_mcp, + project_name: task.project_name, + summary: task.summary, + tokens: task.tokens ?? 0, + status: task.status ?? 1, + created_at: task.created_at, + updated_at: task.updated_at, + sync_status: task.sync_status, +}); + +const localRunToTask = (run: RunSummary): HistoryTask => + normalizeTask({ + id: run.id || `local:${run.run_id}`, + task_id: run.task_id || run.run_id, + project_id: run.project_id, + question: run.question || 'Untitled task', + summary: run.summary, + status: run.status ?? 1, + created_at: run.first_event, + updated_at: run.last_event, + sync_status: run.sync_status, + }); + +const normalizeCloudTask = (task: HistoryTask): HistoryTask => + normalizeTask({ + ...task, + sync_status: task.sync_status ?? 'synced', + }); + +const sortTasks = (tasks: HistoryTask[]): HistoryTask[] => + tasks.sort((a, b) => { + const dateA = new Date(a.updated_at || a.created_at || 0).getTime(); + const dateB = new Date(b.updated_at || b.created_at || 0).getTime(); + return dateB - dateA; + }); + +const mergeTasks = ( + localRuns: RunSummary[], + cloudTasks: HistoryTask[] +): HistoryTask[] => { + const taskMap = new Map(); + + cloudTasks.forEach((task) => { + taskMap.set(task.task_id, normalizeCloudTask(task)); + }); + + localRuns.map(localRunToTask).forEach((localTask) => { + const cloudTask = taskMap.get(localTask.task_id); + if (!cloudTask) { + taskMap.set(localTask.task_id, localTask); + return; + } + + taskMap.set( + localTask.task_id, + normalizeTask({ + ...cloudTask, + ...localTask, + id: cloudTask.id ?? localTask.id, + project_name: cloudTask.project_name ?? localTask.project_name, + language: cloudTask.language || localTask.language, + model_platform: cloudTask.model_platform || localTask.model_platform, + model_type: cloudTask.model_type || localTask.model_type, + max_retries: cloudTask.max_retries ?? localTask.max_retries, + tokens: cloudTask.tokens ?? localTask.tokens, + question: localTask.question || cloudTask.question, + summary: localTask.summary || cloudTask.summary, + created_at: localTask.created_at || cloudTask.created_at, + updated_at: localTask.updated_at || cloudTask.updated_at, + sync_status: localTask.sync_status, + }) + ); + }); + + return sortTasks(Array.from(taskMap.values())); +}; + +const getProjectSyncStatus = ( + tasks: HistoryTask[] +): ProjectGroup['sync_status'] => { + const statuses = new Set(tasks.map((task) => task.sync_status || 'synced')); + if (statuses.size === 1 && statuses.has('synced')) { + return 'synced'; + } + if (statuses.size === 1 && statuses.has('local')) { + return 'local'; + } + return 'partial'; +}; + +const groupTasksByProject = ( + tasks: HistoryTask[], + projectMetadata = new Map>() +): ProjectGroup[] => { const projectMap = new Map(); tasks.forEach((task) => { const projectId = task.project_id; + const metadata = projectMetadata.get(projectId); if (!projectMap.has(projectId)) { projectMap.set(projectId, { project_id: projectId, - project_name: task.project_name || `Project ${projectId}`, + project_name: + task.project_name || + metadata?.project_name || + `Project ${projectId}`, total_tokens: 0, task_count: 0, - total_triggers: 0, - latest_task_date: task.created_at || new Date().toISOString(), + total_triggers: metadata?.total_triggers || 0, + latest_task_date: + task.updated_at || task.created_at || new Date().toISOString(), tasks: [], total_completed_tasks: 0, total_ongoing_tasks: 0, average_tokens_per_task: 0, - last_prompt: task.question || '', + last_prompt: task.question || metadata?.last_prompt || '', + sync_status: 'local', }); } @@ -43,37 +154,31 @@ const groupTasksByProject = (tasks: HistoryTask[]): ProjectGroup[] => { project.task_count++; project.total_tokens += task.tokens || 0; - // ChatStatus enum: ongoing = 1, done = 2 if (task.status === 2) { - // ChatStatus.done (completed) project.total_completed_tasks++; } else if (task.status === 1) { - // ChatStatus.ongoing (pending/running etc..) project.total_ongoing_tasks++; } - // Update latest task date - if (task.created_at && task.created_at > project.latest_task_date) { - project.latest_task_date = task.created_at; + const candidateDate = + task.updated_at || task.created_at || project.latest_task_date; + if (candidateDate >= project.latest_task_date) { + project.latest_task_date = candidateDate; + project.last_prompt = task.question || project.last_prompt; + project.project_name = + task.project_name || metadata?.project_name || project.project_name; } }); - // Calculate averages and sort tasks within each project projectMap.forEach((project) => { project.average_tokens_per_task = project.task_count > 0 ? Math.round(project.total_tokens / project.task_count) : 0; - - // Sort tasks by creation date (newest first) - project.tasks.sort((a, b) => { - const dateA = new Date(a.created_at || 0).getTime(); - const dateB = new Date(b.created_at || 0).getTime(); - return dateB - dateA; - }); + project.sync_status = getProjectSyncStatus(project.tasks); + project.tasks = sortTasks(project.tasks); }); - // Convert to array and sort by latest task date (newest first) return Array.from(projectMap.values()).sort((a, b) => { const dateA = new Date(a.latest_task_date).getTime(); const dateB = new Date(b.latest_task_date).getTime(); @@ -81,81 +186,84 @@ const groupTasksByProject = (tasks: HistoryTask[]): ProjectGroup[] => { }); }; -export const fetchHistoryTasks = async ( - setTasks: React.Dispatch> -) => { +const fetchCloudHistoryTasks = async (): Promise => { try { const res = await proxyFetchGet(`/api/v1/chat/histories`); - setTasks(res.items); + return Array.isArray(res?.items) + ? res.items.map((task: HistoryTask) => normalizeCloudTask(task)) + : []; + } catch (error) { + console.error('Failed to fetch cloud history tasks:', error); + return []; + } +}; + +const fetchCloudGroupedProjects = async (): Promise => { + try { + const res = await proxyFetchGet( + `/api/v1/chat/histories/grouped?include_tasks=true` + ); + return Array.isArray(res?.projects) ? res.projects : []; + } catch (error) { + console.error('Failed to fetch grouped cloud history tasks:', error); + return []; + } +}; + +export const fetchHistoryTasks = async ( + setTasks: React.Dispatch> +) => { + try { + const [localRuns, cloudTasks] = await Promise.all([ + fetchLocalRuns(), + fetchCloudHistoryTasks(), + ]); + setTasks(mergeTasks(localRuns, cloudTasks)); } catch (error) { console.error('Failed to fetch history tasks:', error); setTasks([]); } }; -// New function to fetch grouped history tasks from the backend endpoint export const fetchGroupedHistoryTasks = async ( setProjects: React.Dispatch> ) => { try { - const res = await proxyFetchGet( - `/api/v1/chat/histories/grouped?include_tasks=true` + const [localRuns, cloudProjects] = await Promise.all([ + fetchLocalRuns(), + fetchCloudGroupedProjects(), + ]); + const cloudTasks = cloudProjects.flatMap((project) => project.tasks || []); + const mergedTasks = mergeTasks(localRuns, cloudTasks); + const metadata = new Map( + cloudProjects.map((project) => [ + project.project_id, + { + project_name: project.project_name, + total_triggers: project.total_triggers, + last_prompt: project.last_prompt, + }, + ]) ); - // If the response doesn't have projects field, fall back to legacy grouping - if (!res || !res.projects) { - await fetchGroupedHistoryTasksLegacy(setProjects); - } else { - setProjects(res.projects); - } - } catch (error) { - console.error( - 'Failed to fetch grouped history tasks, falling back to legacy:', - error - ); - // Fall back to legacy grouping if the new endpoint fails - await fetchGroupedHistoryTasksLegacy(setProjects); - } -}; - -// Function to fetch grouped history summaries only (without individual tasks for better performance) -export const fetchGroupedHistorySummaries = async ( - setProjects: React.Dispatch> -) => { - try { - const res = await proxyFetchGet( - `/api/v1/chat/histories/grouped?include_tasks=false` - ); - // If the response doesn't have projects field, fall back to legacy grouping - if (!res || !res.projects) { - await fetchGroupedHistoryTasksLegacy(setProjects); - } else { - setProjects(res.projects); - } - } catch (error) { - console.error( - 'Failed to fetch grouped history summaries, falling back to legacy:', - error - ); - // Fall back to legacy grouping if the new endpoint fails - await fetchGroupedHistoryTasksLegacy(setProjects); - } -}; - -// Legacy function for backward compatibility - groups on frontend -export const fetchGroupedHistoryTasksLegacy = async ( - setProjects: React.Dispatch> -) => { - try { - const res = await proxyFetchGet(`/api/v1/chat/histories`); - const groupedProjects = groupTasksByProject(res.items); - setProjects(groupedProjects); + setProjects(groupTasksByProject(mergedTasks, metadata)); } catch (error) { console.error('Failed to fetch grouped history tasks:', error); setProjects([]); } }; -// Utility function to get all tasks from grouped data (for backward compatibility) +export const fetchGroupedHistorySummaries = async ( + setProjects: React.Dispatch> +) => { + await fetchGroupedHistoryTasks(setProjects); +}; + +export const fetchGroupedHistoryTasksLegacy = async ( + setProjects: React.Dispatch> +) => { + await fetchGroupedHistoryTasks(setProjects); +}; + export const flattenProjectTasks = ( projects: ProjectGroup[] ): HistoryTask[] => { diff --git a/src/service/localEventApi.ts b/src/service/localEventApi.ts index 3bcff1b4..7fc00271 100644 --- a/src/service/localEventApi.ts +++ b/src/service/localEventApi.ts @@ -12,20 +12,8 @@ // limitations under the License. // ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= -/** - * Local event log API client. - * - * Reads from the local Python backend's SQLite event log endpoints. - * These calls go to localhost (no cloud dependency) and are used by - * the frontend's local-first data loading path. - */ - import { fetchGet } from '@/api/http'; -// ----------------------------------------------------------------------- -// Types -// ----------------------------------------------------------------------- - export interface EventEnvelope { event_id: string; run_id: string; @@ -38,36 +26,46 @@ export interface EventEnvelope { agent_id: string | null; agent_name: string | null; schema_version: number; - payload: Record; + payload: Record | string; synced_at: string | null; sync_attempts: number; } export interface RunSummary { + id: string; run_id: string; project_id: string; task_id: string | null; event_count: number; first_event: string; last_event: string; + question: string; + summary: string; + status: number; + synced_events: number; + sync_status: 'local' | 'synced' | 'pending'; } export interface ProjectSummary { project_id: string; run_count: number; + task_count: number; event_count: number; first_event: string; last_event: string; + last_prompt: string; + total_completed_tasks: number; + total_ongoing_tasks: number; + sync_status: 'local' | 'synced' | 'partial'; } -// ----------------------------------------------------------------------- -// API calls (hit local backend at localhost:{backendPort}) -// ----------------------------------------------------------------------- +export interface PlaybackStep { + step: string; + data: Record | string; + task_id: string | null; + created_at: string; +} -/** - * Fetch events from the local event log. - * At least one of runId, taskId, or projectId must be provided. - */ export const fetchLocalEvents = async (params: { runId?: string; taskId?: string; @@ -91,9 +89,6 @@ export const fetchLocalEvents = async (params: { } }; -/** - * List all runs from the local event log, optionally filtered by project. - */ export const fetchLocalRuns = async ( projectId?: string ): Promise => { @@ -109,9 +104,6 @@ export const fetchLocalRuns = async ( } }; -/** - * List projects with aggregated stats from the local event log. - */ export const fetchLocalProjects = async (): Promise => { try { const res = await fetchGet('/events/projects'); @@ -122,10 +114,18 @@ export const fetchLocalProjects = async (): Promise => { } }; -/** - * Check if local events exist for a given run/task. - * Useful for deciding whether to load from local DB vs cloud. - */ +export const fetchLocalPlaybackSteps = async ( + runId: string +): Promise => { + try { + const res = await fetchGet(`/events/steps/${encodeURIComponent(runId)}`); + return Array.isArray(res) ? res : []; + } catch (error) { + console.debug('[localEventApi] fetchLocalPlaybackSteps failed:', error); + return []; + } +}; + export const hasLocalEvents = async (params: { runId?: string; taskId?: string; diff --git a/src/store/chatStore.ts b/src/store/chatStore.ts index 3c404ea8..b7540893 100644 --- a/src/store/chatStore.ts +++ b/src/store/chatStore.ts @@ -26,6 +26,7 @@ import { import { showCreditsToast } from '@/components/Toast/creditsToast'; import { showStorageToast } from '@/components/Toast/storageToast'; import { generateUniqueId, uploadLog } from '@/lib'; +import { fetchLocalPlaybackSteps } from '@/service/localEventApi'; import { proxyUpdateTriggerExecution } from '@/service/triggerApi'; import { ExecutionStatus } from '@/types'; import { @@ -94,6 +95,7 @@ export interface ChatStore { setStatus: (taskId: string, status: ChatTaskStatusType) => void; setActiveTaskId: (taskId: string) => void; replay: (taskId: string, question: string, time: number) => Promise; + loadHistoryTask: (taskId: string, question: string) => Promise; startTask: ( taskId: string, type?: string, @@ -205,6 +207,9 @@ const normalizeToolkitMessage = (value: unknown) => { } }; +const sleep = (ms: number) => + new Promise((resolve) => setTimeout(resolve, ms)); + /** Persist subtask edits to backend via PUT /task/{project_id}. */ const persistSubtaskEdits = (taskInfo: TaskInfo[]) => { const projectId = useProjectStore.getState().activeProjectId; @@ -585,20 +590,25 @@ const chatStore = (initial?: Partial) => const base_Url = import.meta.env.DEV ? import.meta.env.VITE_PROXY_URL : import.meta.env.VITE_BASE_URL; + const localPlaybackSteps = + type === 'replay' ? await fetchLocalPlaybackSteps(newTaskId) : []; + const hasLocalPlayback = + type === 'replay' && localPlaybackSteps.length > 0; const api = type == 'share' ? `${base_Url}/api/v1/chat/share/playback/${shareToken}?delay_time=${delayTime}` : type == 'replay' - ? `${base_Url}/api/v1/chat/steps/playback/${newTaskId}?delay_time=${delayTime}` + ? hasLocalPlayback + ? '' + : `${base_Url}/api/v1/chat/steps/playback/${newTaskId}?delay_time=${delayTime}` : `${baseURL}/chat`; - const { tasks: _tasks } = get(); let historyId: string | null = projectStore.getHistoryId(project_id); - let snapshots: any = []; + let snapshots: any[] = []; let skipFirstConfirm = true; - // replay or share request - if (type) { + // Cloud replay/share can reuse previously uploaded snapshots. + if (type && !hasLocalPlayback) { const res = await proxyFetchGet(`/api/v1/chat/snapshots`, { api_task_id: taskId, }); @@ -619,99 +629,101 @@ const chatStore = (initial?: Partial) => api_url: '', extra_params: {}, }; - if (modelType === 'custom' || modelType === 'local') { - const res = await proxyFetchGet('/api/v1/providers', { - prefer: true, - }); - const providerList = res.items || []; - console.log('providerList', providerList); - const provider = providerList[0]; - - if (!provider) { - throw new Error( - 'No model provider configured. Please go to Agents > Models and configure at least one model provider as default.' - ); - } - - apiModel = { - api_key: provider.api_key, - model_type: provider.model_type, - model_platform: provider.provider_name, - api_url: provider.endpoint_url || provider.api_url, - extra_params: provider.encrypted_config, - }; - } else if (modelType === 'cloud') { - // get current model - const res = await proxyFetchGet('/api/v1/user/key'); - if (res.warning_code && res.warning_code === '21') { - showStorageToast(); - } - apiModel = { - api_key: res.value, - model_type: cloud_model_type, - model_platform: cloud_model_type.includes('gpt') - ? 'openai' - : cloud_model_type.includes('claude') - ? 'aws-bedrock' - : cloud_model_type.includes('gemini') - ? 'gemini' - : 'openai-compatible-model', - api_url: res.api_url, - extra_params: {}, - }; - } - - // Get search engine configuration for custom mode let searchConfig: Record = {}; - if (modelType === 'custom') { - try { - const configsRes = await proxyFetchGet('/api/v1/configs'); - const configs = Array.isArray(configsRes) ? configsRes : []; - - // Extract Google Search API keys - const googleApiKey = configs.find( - (c: any) => - c.config_group?.toLowerCase() === 'search' && - c.config_name === 'GOOGLE_API_KEY' - )?.config_value; - - const searchEngineId = configs.find( - (c: any) => - c.config_group?.toLowerCase() === 'search' && - c.config_name === 'SEARCH_ENGINE_ID' - )?.config_value; - - if (googleApiKey && searchEngineId) { - searchConfig = { - GOOGLE_API_KEY: googleApiKey, - SEARCH_ENGINE_ID: searchEngineId, - }; - console.log('Loaded custom search configuration'); - } - } catch (error) { - console.error('Failed to load search configuration:', error); - } - } - - const addWorkers = workerList.map((worker) => { - return { - name: worker.workerInfo?.name, - description: worker.workerInfo?.description, - tools: worker.workerInfo?.tools, - mcp_tools: worker.workerInfo?.mcp_tools, - }; - }); - - // get env path + let addWorkers: Array<{ + name: string; + description: string; + tools: any; + mcp_tools: any; + }> = []; let envPath = ''; - try { - envPath = await window.ipcRenderer.invoke('get-env-path', email); - } catch (error) { - console.log('get-env-path error', error); - } + let browser_port = 0; + let cdp_browsers: any[] = []; - // create history if (!type) { + if (modelType === 'custom' || modelType === 'local') { + const res = await proxyFetchGet('/api/v1/providers', { + prefer: true, + }); + const providerList = res.items || []; + console.log('providerList', providerList); + const provider = providerList[0]; + + if (!provider) { + throw new Error( + 'No model provider configured. Please go to Agents > Models and configure at least one model provider as default.' + ); + } + + apiModel = { + api_key: provider.api_key, + model_type: provider.model_type, + model_platform: provider.provider_name, + api_url: provider.endpoint_url || provider.api_url, + extra_params: provider.encrypted_config, + }; + } else if (modelType === 'cloud') { + const res = await proxyFetchGet('/api/v1/user/key'); + if (res.warning_code && res.warning_code === '21') { + showStorageToast(); + } + apiModel = { + api_key: res.value, + model_type: cloud_model_type, + model_platform: cloud_model_type.includes('gpt') + ? 'openai' + : cloud_model_type.includes('claude') + ? 'aws-bedrock' + : cloud_model_type.includes('gemini') + ? 'gemini' + : 'openai-compatible-model', + api_url: res.api_url, + extra_params: {}, + }; + } + + if (modelType === 'custom') { + try { + const configsRes = await proxyFetchGet('/api/v1/configs'); + const configs = Array.isArray(configsRes) ? configsRes : []; + const googleApiKey = configs.find( + (c: any) => + c.config_group?.toLowerCase() === 'search' && + c.config_name === 'GOOGLE_API_KEY' + )?.config_value; + const searchEngineId = configs.find( + (c: any) => + c.config_group?.toLowerCase() === 'search' && + c.config_name === 'SEARCH_ENGINE_ID' + )?.config_value; + + if (googleApiKey && searchEngineId) { + searchConfig = { + GOOGLE_API_KEY: googleApiKey, + SEARCH_ENGINE_ID: searchEngineId, + }; + console.log('Loaded custom search configuration'); + } + } catch (error) { + console.error('Failed to load search configuration:', error); + } + } + + addWorkers = workerList.map((worker) => { + return { + name: worker.workerInfo?.name ?? '', + description: worker.workerInfo?.description ?? '', + tools: worker.workerInfo?.tools, + mcp_tools: worker.workerInfo?.mcp_tools, + }; + }); + + try { + envPath = await window.ipcRenderer.invoke('get-env-path', email); + } catch (error) { + console.log('get-env-path error', error); + } + const authStore = getAuthStore(); const obj = { @@ -743,9 +755,9 @@ const chatStore = (initial?: Partial) => if (project_id && historyId) projectStore.setHistoryId(project_id, historyId); }); + browser_port = await window.ipcRenderer.invoke('get-browser-port'); + cdp_browsers = await window.ipcRenderer.invoke('get-cdp-browsers'); } - const browser_port = await window.ipcRenderer.invoke('get-browser-port'); - const cdp_browsers = await window.ipcRenderer.invoke('get-cdp-browsers'); // Lock the chatStore reference at the start of SSE session to prevent focus changes // during active message processing @@ -788,48 +800,7 @@ const chatStore = (initial?: Partial) => lockedTaskId = newTaskId; }; - const ssePromise = fetchEventSource(api, { - method: !type ? 'POST' : 'GET', - openWhenHidden: true, - signal: abortController.signal, // Add abort signal for proper cleanup - headers: { - 'Content-Type': 'application/json', - Authorization: - type == 'replay' - ? `Bearer ${token}` - : (undefined as unknown as string), - }, - body: !type - ? JSON.stringify({ - project_id: project_id, - task_id: newTaskId, - question: - messageContent || - targetChatStore.getState().getLastUserMessage()?.content, - model_platform: apiModel.model_platform, - email, - model_type: apiModel.model_type, - api_key: apiModel.api_key, - api_url: apiModel.api_url, - extra_params: apiModel.extra_params, - installed_mcp: { mcpServers: {} }, - language: systemLanguage, - allow_local_system: true, - attaches: ( - messageAttaches || - targetChatStore.getState().tasks[newTaskId]?.attaches || - [] - ).map((f) => f.filePath), - summary_prompt: ``, - new_agents: [...addWorkers], - browser_port: browser_port, - cdp_browsers: cdp_browsers, - env_path: envPath, - search_config: searchConfig, - }) - : undefined, - - async onmessage(event: any) { + const handleIncomingMessage = async (event: any) => { let agentMessages: AgentMessage; try { @@ -2502,7 +2473,73 @@ const chatStore = (initial?: Partial) => isConfirm: false, }; addMessages(currentTaskId, newMessage); + }; + + if (hasLocalPlayback) { + try { + get().setAttaches(newTaskId, []); + for (let index = 0; index < localPlaybackSteps.length; index++) { + if (abortController.signal.aborted) { + break; + } + await handleIncomingMessage({ + data: JSON.stringify(localPlaybackSteps[index]), + }); + if ( + (delayTime || 0) > 0 && + index < localPlaybackSteps.length - 1 + ) { + await sleep((delayTime || 0) * 1000); + } + } + } finally { + delete activeSSEControllers[newTaskId]; + } + return; + } + + const ssePromise = fetchEventSource(api, { + method: !type ? 'POST' : 'GET', + openWhenHidden: true, + signal: abortController.signal, // Add abort signal for proper cleanup + headers: { + 'Content-Type': 'application/json', + Authorization: + type == 'replay' + ? `Bearer ${token}` + : (undefined as unknown as string), }, + body: !type + ? JSON.stringify({ + project_id: project_id, + task_id: newTaskId, + question: + messageContent || + targetChatStore.getState().getLastUserMessage()?.content, + model_platform: apiModel.model_platform, + email, + model_type: apiModel.model_type, + api_key: apiModel.api_key, + api_url: apiModel.api_url, + extra_params: apiModel.extra_params, + installed_mcp: { mcpServers: {} }, + language: systemLanguage, + allow_local_system: true, + attaches: ( + messageAttaches || + targetChatStore.getState().tasks[newTaskId]?.attaches || + [] + ).map((f) => f.filePath), + summary_prompt: ``, + new_agents: [...addWorkers], + browser_port: browser_port, + cdp_browsers: cdp_browsers, + env_path: envPath, + search_config: searchConfig, + }) + : undefined, + + onmessage: handleIncomingMessage, async onopen(respond) { console.log('open', respond); const { setAttaches, activeTaskId } = get(); @@ -2630,14 +2667,7 @@ const chatStore = (initial?: Partial) => addMessages, startTask, setActiveTaskId, - handleConfirmTask, } = get(); - //get project id - const project_id = useProjectStore.getState().activeProjectId; - if (!project_id) { - console.error("Can't replay task because no project id provided"); - return; - } create(taskId, 'replay'); setHasMessages(taskId, true); @@ -2649,7 +2679,21 @@ const chatStore = (initial?: Partial) => await startTask(taskId, 'replay', undefined, time); setActiveTaskId(taskId); - handleConfirmTask(project_id, taskId, 'replay'); + }, + loadHistoryTask: async (taskId: string, question: string) => { + const { create, setHasMessages, addMessages, startTask, setActiveTaskId } = + get(); + + create(taskId, 'replay'); + setHasMessages(taskId, true); + addMessages(taskId, { + id: generateUniqueId(), + role: 'user', + content: question.split('|')[0], + }); + + await startTask(taskId, 'replay', undefined, 0); + setActiveTaskId(taskId); }, setUpdateCount() { set((state) => ({ diff --git a/src/store/projectStore.ts b/src/store/projectStore.ts index a5b4ff24..3741b48a 100644 --- a/src/store/projectStore.ts +++ b/src/store/projectStore.ts @@ -679,7 +679,7 @@ const projectStore = create()((set, get) => ({ const chatStore = project.chatStores[chatId]; if (chatStore) { try { - await chatStore.getState().replay(taskId, question, 0); + await chatStore.getState().loadHistoryTask(taskId, question); console.log(`[ProjectStore] Loaded task ${taskId}`); } catch (error) { console.error( diff --git a/src/types/history.d.ts b/src/types/history.d.ts index 82242ed8..fb440783 100644 --- a/src/types/history.d.ts +++ b/src/types/history.d.ts @@ -15,7 +15,7 @@ // History API types for project-grouped structure export interface HistoryTask { - id: number; + id: number | string; task_id: string; project_id: string; question: string; @@ -33,6 +33,8 @@ export interface HistoryTask { status: number; created_at?: string; updated_at?: string; + /** Event sync status: local-only, synced to cloud, or pending sync */ + sync_status?: 'local' | 'synced' | 'pending'; } export interface ProjectGroup { @@ -48,6 +50,8 @@ export interface ProjectGroup { total_completed_tasks: number; total_ongoing_tasks: number; average_tokens_per_task: number; + /** Aggregate sync status: local-only, fully synced, or partial */ + sync_status?: 'local' | 'synced' | 'partial'; } export interface GroupedHistoryResponse {