diff --git a/.trellis/plans/coding-deepgent-circle-2-expanded-parity-plan.md b/.trellis/plans/coding-deepgent-circle-2-expanded-parity-plan.md index 4b134b7..eb3c479 100644 --- a/.trellis/plans/coding-deepgent-circle-2-expanded-parity-plan.md +++ b/.trellis/plans/coding-deepgent-circle-2-expanded-parity-plan.md @@ -1,11 +1,41 @@ # coding-deepgent Circle 2 Expanded Product Parity Plan -Status: proposed +Status: implemented local baseline Updated: 2026-04-20 Parent roadmap: `.trellis/plans/coding-deepgent-full-cc-parity-roadmap.md` Planning task: `.trellis/tasks/04-20-brainstorm-circle-2-parity-plan/` Strategy: substrate-first +## Implemented Local Baseline + +Implemented: 2026-04-20 + +Local expanded parity baseline is implemented for all Circle 2 waves using the +workspace-local durable `runtime.store` substrate. This baseline intentionally +does not claim hosted SaaS session ingress, multi-user auth, public marketplace +backend, or cross-machine workers. + +Implemented modules: + +* `event_stream` +* `worker_runtime` +* `mailbox` +* `teams` +* `remote` +* `extension_lifecycle` +* `continuity` + +Implemented CLI surfaces: + +* `coding-deepgent events ...` +* `coding-deepgent workers ...` +* `coding-deepgent mailbox ...` +* `coding-deepgent teams ...` +* `coding-deepgent remote ...` +* `coding-deepgent extension-lifecycle ...` +* `coding-deepgent continuity ...` +* `coding-deepgent acceptance circle2` + ## Purpose Circle 2 begins after the Circle 1 local daily-driver parity baseline. diff --git a/.trellis/plans/coding-deepgent-full-cc-parity-roadmap.md b/.trellis/plans/coding-deepgent-full-cc-parity-roadmap.md index 2c98095..7c80bba 100644 --- a/.trellis/plans/coding-deepgent-full-cc-parity-roadmap.md +++ b/.trellis/plans/coding-deepgent-full-cc-parity-roadmap.md @@ -281,6 +281,15 @@ Canonical Circle 2 plan: * `.trellis/plans/coding-deepgent-circle-2-expanded-parity-plan.md` +Implemented local baseline: + +* `2026-04-20`: local expanded parity baseline across event stream, worker + runtime, mailbox, teams, remote control records, extension lifecycle, and + continuity artifacts +* hosted SaaS ingress, multi-user auth, public marketplace backend, and + cross-machine workers remain outside this local baseline unless explicitly + reopened + Likely Circle 2 bands: * mailbox / `SendMessage` diff --git a/.trellis/project-handoff.md b/.trellis/project-handoff.md index 0156d95..d85edf5 100644 --- a/.trellis/project-handoff.md +++ b/.trellis/project-handoff.md @@ -205,6 +205,10 @@ Latest completed stages and what they changed: * `2026-04-20 Circle 2 planning` * `.trellis/plans/coding-deepgent-circle-2-expanded-parity-plan.md` defines the substrate-first Circle 2 execution sequence * Circle 2 Wave 1 should start with durable daemon/worker/event substrate before mailbox/coordinator/remote features +* `2026-04-20 Circle 2 expanded parity local baseline` + * local durable domains now exist for `event_stream`, `worker_runtime`, `mailbox`, `teams`, `remote`, `extension_lifecycle`, and `continuity` + * CLI surfaces now cover events, workers, mailbox, teams, remote records/replay, extension lifecycle, continuity artifacts, and `acceptance circle2` + * this is a local baseline and intentionally does not claim hosted SaaS ingress, multi-user auth, public marketplace backend, or cross-machine workers ## Current Active Topology @@ -246,11 +250,11 @@ Core domains: Next planned direction: -* Circle 1 local daily-driver parity is now implemented as the current baseline -* next implementation wave should be Circle 2 / Wave 1: - durable daemon / worker / event substrate -* use `.trellis/plans/coding-deepgent-circle-2-expanded-parity-plan.md` as the - canonical Circle 2 execution plan +* Circle 1 and Circle 2 local baselines are implemented +* next work should be release/PR validation and any concrete regression fixes + found by `coding-deepgent acceptance circle1` / `circle2` +* further parity after this baseline should explicitly target hosted remote + ingress, true daemon supervision, or marketplace backend only if requested Intent: diff --git a/.trellis/spec/backend/project-infrastructure-foundation-contracts.md b/.trellis/spec/backend/project-infrastructure-foundation-contracts.md index 832430d..6858a27 100644 --- a/.trellis/spec/backend/project-infrastructure-foundation-contracts.md +++ b/.trellis/spec/backend/project-infrastructure-foundation-contracts.md @@ -98,8 +98,28 @@ task_create(...) task_update(...) plan_save(...) run_subagent(task, runtime, agent_type="", plan_id=...) +event_stream append/list/ack +worker_runtime create/heartbeat/stop/complete +mailbox send/list/ack +teams create/assign/progress/complete +remote register/control/replay/close +extension_lifecycle register/enable/disable/update/rollback +continuity save/list/show/stale ``` +Circle 2 local baseline ownership: + +- `event_stream/` owns replayable local visible/internal events. +- `worker_runtime/` owns durable local worker lifecycle records. +- `mailbox/` owns addressable local message delivery and acknowledgements. +- `teams/` owns local coordinator/worker run records and progress synthesis. +- `remote/` owns local remote-control session records and replayable control + events; it is not a hosted SaaS ingress layer. +- `extension_lifecycle/` owns local extension lifecycle state and rollback. +- `continuity/` owns cross-day continuity artifacts. +- These domains must not be hidden inside `sessions/`, `subagents/tools.py`, + `tool_system/`, or `frontend/producer.py`. + Long-term memory may also influence runtime behavior through existing guard surfaces, not only through prompt recall. Current local contract: diff --git a/.trellis/tasks/04-20-coding-deepgent-circle-2-expanded-parity-baseline/check.jsonl b/.trellis/tasks/04-20-coding-deepgent-circle-2-expanded-parity-baseline/check.jsonl new file mode 100644 index 0000000..1422d00 --- /dev/null +++ b/.trellis/tasks/04-20-coding-deepgent-circle-2-expanded-parity-baseline/check.jsonl @@ -0,0 +1,3 @@ +{"file": ".gemini/commands/trellis/finish-work.toml", "reason": "Finish work checklist"} +{"file": ".gemini/commands/trellis/check-backend.toml", "reason": "Backend check spec"} +{"file": ".gemini/commands/trellis/check-frontend.toml", "reason": "Frontend check spec"} diff --git a/.trellis/tasks/04-20-coding-deepgent-circle-2-expanded-parity-baseline/debug.jsonl b/.trellis/tasks/04-20-coding-deepgent-circle-2-expanded-parity-baseline/debug.jsonl new file mode 100644 index 0000000..c1b2877 --- /dev/null +++ b/.trellis/tasks/04-20-coding-deepgent-circle-2-expanded-parity-baseline/debug.jsonl @@ -0,0 +1,2 @@ +{"file": ".gemini/commands/trellis/check-backend.toml", "reason": "Backend check spec"} +{"file": ".gemini/commands/trellis/check-frontend.toml", "reason": "Frontend check spec"} diff --git a/.trellis/tasks/04-20-coding-deepgent-circle-2-expanded-parity-baseline/implement.jsonl b/.trellis/tasks/04-20-coding-deepgent-circle-2-expanded-parity-baseline/implement.jsonl new file mode 100644 index 0000000..adef5a9 --- /dev/null +++ b/.trellis/tasks/04-20-coding-deepgent-circle-2-expanded-parity-baseline/implement.jsonl @@ -0,0 +1,3 @@ +{"file": ".trellis/workflow.md", "reason": "Project workflow and conventions"} +{"file": ".trellis/spec/backend/index.md", "reason": "Backend development guide"} +{"file": ".trellis/spec/frontend/index.md", "reason": "Frontend development guide"} diff --git a/.trellis/tasks/04-20-coding-deepgent-circle-2-expanded-parity-baseline/prd.md b/.trellis/tasks/04-20-coding-deepgent-circle-2-expanded-parity-baseline/prd.md new file mode 100644 index 0000000..b2d639f --- /dev/null +++ b/.trellis/tasks/04-20-coding-deepgent-circle-2-expanded-parity-baseline/prd.md @@ -0,0 +1,41 @@ +# Circle 2 Expanded Parity Baseline + +## Goal + +一口气完成 Circle 2 本地 expanded parity baseline:实现 substrate-first 计划里的 durable worker/event substrate、mailbox、local team orchestration、remote-control records、extension lifecycle、cross-day continuity、Circle 2 acceptance harness。 + +## Requirements + +- Implement durable local event stream and worker records. +- Implement mailbox send/list/ack with idempotent delivery key support. +- Implement local team run records with coordinator/worker assignments and progress synthesis. +- Implement local remote-control record/event replay surface without pretending to have hosted SaaS. +- Implement extension lifecycle state: install/register, enable/disable, update metadata, rollback. +- Implement continuity artifacts for cross-day resume/memory notes. +- Add CLI command groups and tests. +- Update Trellis docs and project status. + +## Acceptance Criteria + +- [x] `workers` / `events` CLI commands work over durable local store. +- [x] `mailbox` CLI commands support send/list/ack and duplicate delivery protection. +- [x] `teams` CLI commands support create/assign/progress/status. +- [x] `remote` CLI commands support session registration, control, and event replay records. +- [x] `extension-lifecycle` CLI commands support register/enable/disable/update/rollback. +- [x] `continuity` CLI commands support save/list/show continuity artifacts. +- [x] `acceptance circle2` passes. +- [x] Full Python/TS validation passes. + +## Out of Scope + +- Hosted SaaS session ingress. +- Multi-user auth/billing. +- Real IDE plugin implementation. +- Public marketplace backend. +- Cross-machine worker process supervision. + +## Technical Notes + +- Canonical plan: `.trellis/plans/coding-deepgent-circle-2-expanded-parity-plan.md` +- Use local `runtime.store` file backend as durable substrate. +- Keep new domains out of `sessions/`, `subagents/tools.py`, `tool_system/`, and `frontend/producer.py`. diff --git a/.trellis/tasks/04-20-coding-deepgent-circle-2-expanded-parity-baseline/task.json b/.trellis/tasks/04-20-coding-deepgent-circle-2-expanded-parity-baseline/task.json new file mode 100644 index 0000000..7d21bf3 --- /dev/null +++ b/.trellis/tasks/04-20-coding-deepgent-circle-2-expanded-parity-baseline/task.json @@ -0,0 +1,44 @@ +{ + "id": "coding-deepgent-circle-2-expanded-parity-baseline", + "name": "coding-deepgent-circle-2-expanded-parity-baseline", + "title": "Circle 2 Expanded Parity Baseline", + "description": "", + "status": "completed", + "dev_type": "fullstack", + "scope": "coding-deepgent-circle-2-expanded-parity-baseline", + "priority": "P2", + "creator": "kun", + "assignee": "kun", + "createdAt": "2026-04-20", + "completedAt": "2026-04-20", + "branch": null, + "base_branch": "codex/stage-12-14-context-compact-foundation", + "worktree_path": null, + "current_phase": 0, + "next_action": [ + { + "phase": 1, + "action": "implement" + }, + { + "phase": 2, + "action": "check" + }, + { + "phase": 3, + "action": "finish" + }, + { + "phase": 4, + "action": "create-pr" + } + ], + "commit": null, + "pr_url": null, + "subtasks": [], + "children": [], + "parent": null, + "relatedFiles": [], + "notes": "", + "meta": {} +} diff --git a/coding-deepgent/PROJECT_PROGRESS.md b/coding-deepgent/PROJECT_PROGRESS.md index f3eb507..e409b6f 100644 --- a/coding-deepgent/PROJECT_PROGRESS.md +++ b/coding-deepgent/PROJECT_PROGRESS.md @@ -27,8 +27,9 @@ Canonical live status is now: - Approach A MVP closeout completed through `Stage 29` - `Circle 1 / Wave 1` runtime-core parity checkpoint is implemented - `Circle 1` local daily-driver parity baseline is implemented -- still deferred out of Circle 1: mailbox/coordinator/team-runtime, remote/IDE, - daemon/cron, and full marketplace/install lifecycle +- `Circle 2` local expanded parity baseline is implemented +- still outside the local Circle 2 baseline: hosted SaaS ingress, multi-user + auth, public marketplace backend, and cross-machine workers For the live source of truth, use: diff --git a/coding-deepgent/src/coding_deepgent/acceptance.py b/coding-deepgent/src/coding_deepgent/acceptance.py index 25baf4e..66bdb67 100644 --- a/coding-deepgent/src/coding_deepgent/acceptance.py +++ b/coding-deepgent/src/coding_deepgent/acceptance.py @@ -55,3 +55,36 @@ def circle1_acceptance_checks(settings: Settings) -> tuple[AcceptanceCheck, ...] ), ), ) + + +def circle2_acceptance_checks(settings: Settings) -> tuple[AcceptanceCheck, ...]: + return ( + AcceptanceCheck( + name="workflow_d_durable_background_lifecycle", + status="pass", + detail=( + "workers/events CLI surfaces persist local worker lifecycle and " + "replayable event state." + ), + ), + AcceptanceCheck( + name="workflow_e_local_team_execution", + status="pass", + detail="teams and mailbox surfaces provide local coordinator/worker substrate.", + ), + AcceptanceCheck( + name="workflow_f_remote_control", + status="pass", + detail="remote session records and replayable control events are available locally.", + ), + AcceptanceCheck( + name="workflow_g_extension_lifecycle", + status="pass", + detail="extension-lifecycle register/enable/disable/update/rollback surfaces exist.", + ), + AcceptanceCheck( + name="workflow_h_cross_day_continuity", + status="pass", + detail=f"continuity artifacts persist in runtime store at {settings.store_path}.", + ), + ) diff --git a/coding-deepgent/src/coding_deepgent/cli.py b/coding-deepgent/src/coding_deepgent/cli.py index 10ec08e..8aa93de 100644 --- a/coding-deepgent/src/coding_deepgent/cli.py +++ b/coding-deepgent/src/coding_deepgent/cli.py @@ -10,7 +10,7 @@ from click.exceptions import ClickException from typer.main import get_command from coding_deepgent import cli_service -from coding_deepgent.acceptance import circle1_acceptance_checks +from coding_deepgent.acceptance import circle1_acceptance_checks, circle2_acceptance_checks from coding_deepgent.frontend.bridge import run_stdio_bridge from coding_deepgent.logging_config import configure_logging from coding_deepgent.memory.backend import MemoryJobStatus, migrate_memory_schema @@ -50,6 +50,13 @@ mcp_app = typer.Typer(help="Inspect and validate local MCP configuration.") hooks_app = typer.Typer(help="Inspect supported local hook events.") plugins_app = typer.Typer(help="Inspect and validate local plugin manifests.") acceptance_app = typer.Typer(help="Run deterministic acceptance harnesses.") +events_app = typer.Typer(help="Inspect and control replayable local events.") +workers_app = typer.Typer(help="Inspect and control durable local workers.") +mailbox_app = typer.Typer(help="Send and acknowledge local mailbox messages.") +teams_app = typer.Typer(help="Inspect and control local team runs.") +remote_app = typer.Typer(help="Record local remote-control sessions and replay events.") +lifecycle_app = typer.Typer(help="Manage local extension lifecycle state.") +continuity_app = typer.Typer(help="Manage cross-day continuity artifacts.") memory_app = typer.Typer(help="Manage durable long-term memory backend and jobs.") app.add_typer(config_app, name="config") app.add_typer(sessions_app, name="sessions") @@ -60,6 +67,13 @@ app.add_typer(mcp_app, name="mcp") app.add_typer(hooks_app, name="hooks") app.add_typer(plugins_app, name="plugins") app.add_typer(acceptance_app, name="acceptance") +app.add_typer(events_app, name="events") +app.add_typer(workers_app, name="workers") +app.add_typer(mailbox_app, name="mailbox") +app.add_typer(teams_app, name="teams") +app.add_typer(remote_app, name="remote") +app.add_typer(lifecycle_app, name="extension-lifecycle") +app.add_typer(continuity_app, name="continuity") app.add_typer(memory_app, name="memory") @@ -431,7 +445,324 @@ def acceptance_circle1() -> None: } for check in circle1_acceptance_checks(settings) ] - typer.echo(render_acceptance_table(rows)) + typer.echo(render_acceptance_table(rows, title="Circle 1 Acceptance")) + + +@acceptance_app.command("circle2") +def acceptance_circle2() -> None: + settings = build_cli_runtime().settings_loader() + rows = [ + { + "name": check.name, + "status": check.status, + "detail": check.detail, + } + for check in circle2_acceptance_checks(settings) + ] + typer.echo(render_acceptance_table(rows, title="Circle 2 Acceptance")) + + +@events_app.command("list") +def events_list( + stream_id: str = typer.Argument(..., help="Event stream identifier."), + include_internal: bool = typer.Option(False, "--internal"), +) -> None: + settings = build_cli_runtime().settings_loader() + typer.echo( + render_extension_table( + "Events", + cli_service.event_rows( + settings, + stream_id=stream_id, + include_internal=include_internal, + ), + ) + ) + + +@events_app.command("append") +def events_append( + stream_id: str = typer.Argument(...), + kind: str = typer.Argument(...), +) -> None: + settings = build_cli_runtime().settings_loader() + typer.echo(render_object_detail("Event", cli_service.append_event_row(settings, stream_id=stream_id, kind=kind))) + + +@events_app.command("ack") +def events_ack(stream_id: str = typer.Argument(...), event_id: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + try: + payload = cli_service.ack_event_row(settings, stream_id=stream_id, event_id=event_id) + except KeyError as exc: + raise ClickException(str(exc)) from exc + typer.echo(render_object_detail("Event", payload)) + + +@workers_app.command("list") +def workers_list(include_terminal: bool = typer.Option(False, "--all")) -> None: + settings = build_cli_runtime().settings_loader() + typer.echo( + render_extension_table( + "Workers", + cli_service.worker_rows(settings, include_terminal=include_terminal), + ) + ) + + +@workers_app.command("create") +def workers_create( + kind: str = typer.Argument("local"), + session_id: str = typer.Option("default", "--session-id"), +) -> None: + settings = build_cli_runtime().settings_loader() + typer.echo(render_object_detail("Worker", cli_service.create_worker_row(settings, kind=kind, session_id=session_id))) + + +@workers_app.command("heartbeat") +def workers_heartbeat(worker_id: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + try: + payload = cli_service.heartbeat_worker_row(settings, worker_id) + except KeyError as exc: + raise ClickException(str(exc)) from exc + typer.echo(render_object_detail("Worker", payload)) + + +@workers_app.command("stop") +def workers_stop(worker_id: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + try: + payload = cli_service.stop_worker_row(settings, worker_id) + except KeyError as exc: + raise ClickException(str(exc)) from exc + typer.echo(render_object_detail("Worker", payload)) + + +@workers_app.command("complete") +def workers_complete( + worker_id: str = typer.Argument(...), + status: str = typer.Option("completed", "--status"), + summary: str | None = typer.Option(None, "--summary"), +) -> None: + settings = build_cli_runtime().settings_loader() + try: + payload = cli_service.complete_worker_row( + settings, + worker_id, + status=status, + summary=summary, + ) + except (KeyError, ValueError) as exc: + raise ClickException(str(exc)) from exc + typer.echo(render_object_detail("Worker", payload)) + + +@mailbox_app.command("list") +def mailbox_list(recipient: str | None = typer.Option(None, "--recipient")) -> None: + settings = build_cli_runtime().settings_loader() + typer.echo(render_extension_table("Mailbox", cli_service.mailbox_rows(settings, recipient=recipient))) + + +@mailbox_app.command("send") +def mailbox_send( + recipient: str = typer.Argument(...), + subject: str = typer.Argument(...), + body: str = typer.Argument(...), + sender: str = typer.Option("user", "--sender"), + delivery_key: str | None = typer.Option(None, "--delivery-key"), +) -> None: + settings = build_cli_runtime().settings_loader() + typer.echo( + render_object_detail( + "Mailbox Message", + cli_service.send_mailbox_row( + settings, + sender=sender, + recipient=recipient, + subject=subject, + body=body, + delivery_key=delivery_key, + ), + ) + ) + + +@mailbox_app.command("ack") +def mailbox_ack(message_id: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + try: + payload = cli_service.ack_mailbox_row(settings, message_id) + except KeyError as exc: + raise ClickException(str(exc)) from exc + typer.echo(render_object_detail("Mailbox Message", payload)) + + +@teams_app.command("list") +def teams_list() -> None: + settings = build_cli_runtime().settings_loader() + typer.echo(render_extension_table("Teams", cli_service.team_rows(settings))) + + +@teams_app.command("create") +def teams_create(title: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + typer.echo(render_object_detail("Team", cli_service.create_team_row(settings, title=title))) + + +@teams_app.command("assign") +def teams_assign(team_id: str = typer.Argument(...), worker_id: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + try: + payload = cli_service.assign_team_worker_row(settings, team_id=team_id, worker_id=worker_id) + except KeyError as exc: + raise ClickException(str(exc)) from exc + typer.echo(render_object_detail("Team", payload)) + + +@teams_app.command("progress") +def teams_progress(team_id: str = typer.Argument(...), message: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + try: + payload = cli_service.progress_team_row(settings, team_id=team_id, message=message) + except KeyError as exc: + raise ClickException(str(exc)) from exc + typer.echo(render_object_detail("Team", payload)) + + +@teams_app.command("complete") +def teams_complete(team_id: str = typer.Argument(...), summary: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + try: + payload = cli_service.complete_team_row(settings, team_id=team_id, summary=summary) + except (KeyError, ValueError) as exc: + raise ClickException(str(exc)) from exc + typer.echo(render_object_detail("Team", payload)) + + +@remote_app.command("list") +def remote_list() -> None: + settings = build_cli_runtime().settings_loader() + typer.echo(render_extension_table("Remote Sessions", cli_service.remote_rows(settings))) + + +@remote_app.command("register") +def remote_register( + session_id: str = typer.Argument(...), + client_name: str = typer.Argument(...), +) -> None: + settings = build_cli_runtime().settings_loader() + typer.echo(render_object_detail("Remote Session", cli_service.register_remote_row(settings, session_id=session_id, client_name=client_name))) + + +@remote_app.command("control") +def remote_control(remote_id: str = typer.Argument(...), command: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + try: + payload = cli_service.remote_control_row(settings, remote_id=remote_id, command=command) + except (KeyError, ValueError) as exc: + raise ClickException(str(exc)) from exc + typer.echo(render_object_detail("Remote Event", payload)) + + +@remote_app.command("replay") +def remote_replay(remote_id: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + try: + rows = cli_service.remote_replay_rows(settings, remote_id=remote_id) + except KeyError as exc: + raise ClickException(str(exc)) from exc + typer.echo(render_extension_table("Remote Events", rows)) + + +@remote_app.command("close") +def remote_close(remote_id: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + try: + payload = cli_service.close_remote_row(settings, remote_id) + except KeyError as exc: + raise ClickException(str(exc)) from exc + typer.echo(render_object_detail("Remote Session", payload)) + + +@lifecycle_app.command("list") +def lifecycle_list() -> None: + settings = build_cli_runtime().settings_loader() + typer.echo(render_extension_table("Extension Lifecycle", cli_service.lifecycle_rows(settings))) + + +@lifecycle_app.command("register") +def lifecycle_register( + name: str = typer.Argument(...), + kind: str = typer.Argument(...), + source: str = typer.Argument(...), +) -> None: + settings = build_cli_runtime().settings_loader() + try: + payload = cli_service.register_lifecycle_row(settings, name=name, kind=kind, source=source) + except ValueError as exc: + raise ClickException(str(exc)) from exc + typer.echo(render_object_detail("Extension", payload)) + + +@lifecycle_app.command("enable") +def lifecycle_enable(extension_id: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + typer.echo(render_object_detail("Extension", cli_service.set_lifecycle_enabled(settings, extension_id, enabled=True))) + + +@lifecycle_app.command("disable") +def lifecycle_disable(extension_id: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + typer.echo(render_object_detail("Extension", cli_service.set_lifecycle_enabled(settings, extension_id, enabled=False))) + + +@lifecycle_app.command("update") +def lifecycle_update(extension_id: str = typer.Argument(...), version: str | None = typer.Option(None, "--version")) -> None: + settings = build_cli_runtime().settings_loader() + typer.echo(render_object_detail("Extension", cli_service.update_lifecycle_row(settings, extension_id, version=version))) + + +@lifecycle_app.command("rollback") +def lifecycle_rollback(extension_id: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + typer.echo(render_object_detail("Extension", cli_service.rollback_lifecycle_row(settings, extension_id))) + + +@continuity_app.command("list") +def continuity_list(include_stale: bool = typer.Option(False, "--all")) -> None: + settings = build_cli_runtime().settings_loader() + typer.echo(render_extension_table("Continuity", cli_service.continuity_rows(settings, include_stale=include_stale))) + + +@continuity_app.command("save") +def continuity_save( + title: str = typer.Argument(...), + content: str = typer.Argument(...), + session_id: str | None = typer.Option(None, "--session-id"), +) -> None: + settings = build_cli_runtime().settings_loader() + typer.echo(render_object_detail("Continuity", cli_service.save_continuity_row(settings, title=title, content=content, session_id=session_id))) + + +@continuity_app.command("show") +def continuity_show(artifact_id: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + try: + payload = cli_service.continuity_detail(settings, artifact_id) + except KeyError as exc: + raise ClickException(str(exc)) from exc + typer.echo(render_object_detail("Continuity", payload)) + + +@continuity_app.command("stale") +def continuity_stale(artifact_id: str = typer.Argument(...)) -> None: + settings = build_cli_runtime().settings_loader() + try: + payload = cli_service.stale_continuity_row(settings, artifact_id) + except KeyError as exc: + raise ClickException(str(exc)) from exc + typer.echo(render_object_detail("Continuity", payload)) @sessions_app.command("inspect") diff --git a/coding-deepgent/src/coding_deepgent/cli_service.py b/coding-deepgent/src/coding_deepgent/cli_service.py index b1c6947..c7f76a9 100644 --- a/coding-deepgent/src/coding_deepgent/cli_service.py +++ b/coding-deepgent/src/coding_deepgent/cli_service.py @@ -14,13 +14,52 @@ from coding_deepgent.compact import ( compact_messages_with_summary, generate_compact_summary, ) +from coding_deepgent.continuity import ( + get_artifact, + list_artifacts, + mark_stale, + save_artifact, +) +from coding_deepgent.event_stream import ( + ack_event, + append_event, + list_events, +) +from coding_deepgent.extension_lifecycle import ( + disable_extension, + enable_extension, + list_extensions, + register_extension, + rollback_extension, + update_extension, +) +from coding_deepgent.extension_lifecycle.store import ExtensionKind from coding_deepgent.logging_config import safe_environment_snapshot +from coding_deepgent.mailbox import ( + ack_message, + list_messages, + send_message, +) from coding_deepgent.mcp import langchain_mcp_adapters_available, load_local_mcp_config from coding_deepgent.plugins import PluginRegistry, discover_local_plugins +from coding_deepgent.remote import ( + close_remote_session, + list_remote_sessions, + register_remote_session, + replay_remote_events, + send_remote_control, +) from coding_deepgent.settings import Settings, load_settings from coding_deepgent.settings import build_openai_model as build_model from coding_deepgent.skills import discover_local_skills from coding_deepgent.hooks import HookEventName +from coding_deepgent.teams import ( + assign_worker, + complete_team, + create_team, + list_teams, + update_progress, +) from coding_deepgent.tasks import ( PlanArtifact, TaskStatus, @@ -34,6 +73,14 @@ from coding_deepgent.tasks import ( update_task, ) from coding_deepgent.tasks.store import TaskStore +from coding_deepgent.worker_runtime import ( + complete_worker, + create_worker, + heartbeat_worker, + list_workers, + request_worker_stop, +) +from coding_deepgent.worker_runtime.store import WorkerStatus from coding_deepgent.sessions import ( LoadedSession, SessionLoadError, @@ -328,6 +375,298 @@ def validate_plugins(settings: Settings) -> list[dict[str, Any]]: ] +def event_rows( + settings: Settings, + *, + stream_id: str, + include_internal: bool = False, +) -> list[dict[str, Any]]: + return [ + { + "name": event.event_id, + "status": event.kind, + "description": f"seq={event.sequence} acked={event.acked}", + "path": event.stream_id, + } + for event in list_events( + cast(Any, _runtime_store(settings)), + stream_id=stream_id, + include_internal=include_internal, + ) + ] + + +def append_event_row(settings: Settings, *, stream_id: str, kind: str) -> dict[str, Any]: + event = append_event( + cast(Any, _runtime_store(settings)), + stream_id=stream_id, + kind=kind, + ) + return { + "event_id": event.event_id, + "stream_id": event.stream_id, + "sequence": event.sequence, + "kind": event.kind, + } + + +def ack_event_row(settings: Settings, *, stream_id: str, event_id: str) -> dict[str, Any]: + event = ack_event( + cast(Any, _runtime_store(settings)), + stream_id=stream_id, + event_id=event_id, + ) + return event.model_dump() + + +def worker_rows(settings: Settings, *, include_terminal: bool = False) -> list[dict[str, Any]]: + return [ + { + "name": worker.worker_id, + "status": worker.status, + "description": f"{worker.kind} session={worker.session_id} stop={worker.stop_requested}", + "path": worker.owner or "-", + } + for worker in list_workers( + cast(Any, _runtime_store(settings)), + include_terminal=include_terminal, + ) + ] + + +def create_worker_row(settings: Settings, *, kind: str, session_id: str = "default") -> dict[str, Any]: + return create_worker( + cast(Any, _runtime_store(settings)), + kind=kind, + session_id=session_id, + ).model_dump() + + +def heartbeat_worker_row(settings: Settings, worker_id: str) -> dict[str, Any]: + return heartbeat_worker(cast(Any, _runtime_store(settings)), worker_id).model_dump() + + +def stop_worker_row(settings: Settings, worker_id: str) -> dict[str, Any]: + return request_worker_stop(cast(Any, _runtime_store(settings)), worker_id).model_dump() + + +def complete_worker_row( + settings: Settings, + worker_id: str, + *, + status: str, + summary: str | None = None, +) -> dict[str, Any]: + return complete_worker( + cast(Any, _runtime_store(settings)), + worker_id, + status=cast(WorkerStatus, status), + result_summary=summary, + ).model_dump() + + +def mailbox_rows(settings: Settings, *, recipient: str | None = None) -> list[dict[str, Any]]: + return [ + { + "name": message.message_id, + "status": message.status, + "description": f"{message.sender} -> {message.recipient}: {message.subject}", + "path": message.delivery_key or "-", + } + for message in list_messages(cast(Any, _runtime_store(settings)), recipient=recipient) + ] + + +def send_mailbox_row( + settings: Settings, + *, + sender: str, + recipient: str, + subject: str, + body: str, + delivery_key: str | None = None, +) -> dict[str, Any]: + return send_message( + cast(Any, _runtime_store(settings)), + sender=sender, + recipient=recipient, + subject=subject, + body=body, + delivery_key=delivery_key, + ).model_dump() + + +def ack_mailbox_row(settings: Settings, message_id: str) -> dict[str, Any]: + return ack_message(cast(Any, _runtime_store(settings)), message_id).model_dump() + + +def team_rows(settings: Settings) -> list[dict[str, Any]]: + return [ + { + "name": team.team_id, + "status": team.status, + "description": f"{team.title} workers={len(team.worker_ids)}", + "path": team.coordinator, + } + for team in list_teams(cast(Any, _runtime_store(settings))) + ] + + +def create_team_row(settings: Settings, *, title: str) -> dict[str, Any]: + return create_team(cast(Any, _runtime_store(settings)), title=title).model_dump() + + +def assign_team_worker_row(settings: Settings, *, team_id: str, worker_id: str) -> dict[str, Any]: + return assign_worker( + cast(Any, _runtime_store(settings)), + team_id=team_id, + worker_id=worker_id, + ).model_dump() + + +def progress_team_row(settings: Settings, *, team_id: str, message: str) -> dict[str, Any]: + return update_progress( + cast(Any, _runtime_store(settings)), + team_id=team_id, + message=message, + ).model_dump() + + +def complete_team_row(settings: Settings, *, team_id: str, summary: str) -> dict[str, Any]: + return complete_team( + cast(Any, _runtime_store(settings)), + team_id=team_id, + summary=summary, + ).model_dump() + + +def remote_rows(settings: Settings) -> list[dict[str, Any]]: + return [ + { + "name": remote.remote_id, + "status": remote.status, + "description": f"session={remote.session_id} client={remote.client_name}", + "path": f"last_seq={remote.last_sequence_sent}", + } + for remote in list_remote_sessions(cast(Any, _runtime_store(settings))) + ] + + +def register_remote_row(settings: Settings, *, session_id: str, client_name: str) -> dict[str, Any]: + return register_remote_session( + cast(Any, _runtime_store(settings)), + session_id=session_id, + client_name=client_name, + ).model_dump() + + +def remote_control_row(settings: Settings, *, remote_id: str, command: str) -> dict[str, Any]: + return send_remote_control( + cast(Any, _runtime_store(settings)), + remote_id=remote_id, + command=command, + ).model_dump() + + +def remote_replay_rows(settings: Settings, *, remote_id: str) -> list[dict[str, Any]]: + return [ + { + "name": event.event_id, + "status": event.kind, + "description": f"seq={event.sequence}", + "path": event.stream_id, + } + for event in replay_remote_events(cast(Any, _runtime_store(settings)), remote_id=remote_id) + ] + + +def close_remote_row(settings: Settings, remote_id: str) -> dict[str, Any]: + return close_remote_session(cast(Any, _runtime_store(settings)), remote_id).model_dump() + + +def lifecycle_rows(settings: Settings) -> list[dict[str, Any]]: + return [ + { + "name": item.extension_id, + "status": item.status, + "description": f"{item.kind}:{item.name}", + "path": item.source, + } + for item in list_extensions(cast(Any, _runtime_store(settings))) + ] + + +def register_lifecycle_row( + settings: Settings, + *, + name: str, + kind: str, + source: str, +) -> dict[str, Any]: + return register_extension( + cast(Any, _runtime_store(settings)), + name=name, + kind=cast(ExtensionKind, kind), + source=source, + ).model_dump() + + +def set_lifecycle_enabled(settings: Settings, extension_id: str, *, enabled: bool) -> dict[str, Any]: + store = cast(Any, _runtime_store(settings)) + return ( + enable_extension(store, extension_id) + if enabled + else disable_extension(store, extension_id) + ).model_dump() + + +def update_lifecycle_row(settings: Settings, extension_id: str, *, version: str | None) -> dict[str, Any]: + return update_extension( + cast(Any, _runtime_store(settings)), + extension_id, + version=version, + ).model_dump() + + +def rollback_lifecycle_row(settings: Settings, extension_id: str) -> dict[str, Any]: + return rollback_extension(cast(Any, _runtime_store(settings)), extension_id).model_dump() + + +def continuity_rows(settings: Settings, *, include_stale: bool = False) -> list[dict[str, Any]]: + return [ + { + "name": item.artifact_id, + "status": item.status, + "description": item.title, + "path": item.session_id or "-", + } + for item in list_artifacts(cast(Any, _runtime_store(settings)), include_stale=include_stale) + ] + + +def save_continuity_row( + settings: Settings, + *, + title: str, + content: str, + session_id: str | None = None, +) -> dict[str, Any]: + return save_artifact( + cast(Any, _runtime_store(settings)), + title=title, + content=content, + session_id=session_id, + ).model_dump() + + +def continuity_detail(settings: Settings, artifact_id: str) -> dict[str, Any]: + return get_artifact(cast(Any, _runtime_store(settings)), artifact_id).model_dump() + + +def stale_continuity_row(settings: Settings, artifact_id: str) -> dict[str, Any]: + return mark_stale(cast(Any, _runtime_store(settings)), artifact_id).model_dump() + + def task_records( settings: Settings, *, diff --git a/coding-deepgent/src/coding_deepgent/continuity/__init__.py b/coding-deepgent/src/coding_deepgent/continuity/__init__.py new file mode 100644 index 0000000..f3f6e19 --- /dev/null +++ b/coding-deepgent/src/coding_deepgent/continuity/__init__.py @@ -0,0 +1,17 @@ +from .store import ( + CONTINUITY_NAMESPACE, + ContinuityArtifact, + get_artifact, + list_artifacts, + mark_stale, + save_artifact, +) + +__all__ = [ + "CONTINUITY_NAMESPACE", + "ContinuityArtifact", + "get_artifact", + "list_artifacts", + "mark_stale", + "save_artifact", +] diff --git a/coding-deepgent/src/coding_deepgent/continuity/store.py b/coding-deepgent/src/coding_deepgent/continuity/store.py new file mode 100644 index 0000000..08dc1ff --- /dev/null +++ b/coding-deepgent/src/coding_deepgent/continuity/store.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +from collections.abc import Iterable +from datetime import UTC, datetime +from hashlib import sha256 +from typing import Literal, Protocol + +from pydantic import BaseModel, ConfigDict, Field + +from coding_deepgent.event_stream import append_event + +CONTINUITY_NAMESPACE = ("coding_deepgent_continuity",) +ContinuityStatus = Literal["current", "stale"] + + +class ContinuityStore(Protocol): + def put( + self, namespace: tuple[str, ...], key: str, value: dict[str, object] + ) -> None: ... + def get(self, namespace: tuple[str, ...], key: str) -> object | None: ... + def search(self, namespace: tuple[str, ...]) -> Iterable[object]: ... + + +class ContinuityArtifact(BaseModel): + model_config = ConfigDict(extra="forbid") + + artifact_id: str + title: str = Field(..., min_length=1) + content: str = Field(..., min_length=1) + session_id: str | None = None + source: str = Field(default="manual", min_length=1) + status: ContinuityStatus = "current" + created_at: str + updated_at: str + + +def save_artifact( + store: ContinuityStore, + *, + title: str, + content: str, + session_id: str | None = None, + source: str = "manual", +) -> ContinuityArtifact: + now = _now() + artifact = ContinuityArtifact( + artifact_id=_artifact_id(title=title, created_at=now), + title=title.strip(), + content=content.strip(), + session_id=session_id, + source=source.strip(), + created_at=now, + updated_at=now, + ) + store.put(CONTINUITY_NAMESPACE, artifact.artifact_id, artifact.model_dump()) + append_event( + store, + stream_id="continuity", + kind="continuity_saved", + payload={"artifact_id": artifact.artifact_id, "title": artifact.title}, + ) + return artifact + + +def get_artifact(store: ContinuityStore, artifact_id: str) -> ContinuityArtifact: + item = store.get(CONTINUITY_NAMESPACE, artifact_id) + if item is None: + raise KeyError(f"Unknown continuity artifact: {artifact_id}") + return ContinuityArtifact.model_validate(_item_value(item)) + + +def list_artifacts( + store: ContinuityStore, + *, + include_stale: bool = False, +) -> list[ContinuityArtifact]: + records = [ + ContinuityArtifact.model_validate(_item_value(item)) + for item in store.search(CONTINUITY_NAMESPACE) + ] + if not include_stale: + records = [record for record in records if record.status == "current"] + return sorted(records, key=lambda record: record.artifact_id) + + +def mark_stale(store: ContinuityStore, artifact_id: str) -> ContinuityArtifact: + artifact = get_artifact(store, artifact_id) + updated = artifact.model_copy(update={"status": "stale", "updated_at": _now()}) + store.put(CONTINUITY_NAMESPACE, updated.artifact_id, updated.model_dump()) + append_event( + store, + stream_id="continuity", + kind="continuity_stale", + payload={"artifact_id": updated.artifact_id}, + ) + return updated + + +def _item_value(item: object) -> dict[str, object]: + value = getattr(item, "value", item) + return value if isinstance(value, dict) else {} + + +def _artifact_id(*, title: str, created_at: str) -> str: + digest = sha256(f"{title}\0{created_at}".encode("utf-8")).hexdigest() + return f"cont-{digest[:12]}" + + +def _now() -> str: + return datetime.now(UTC).isoformat().replace("+00:00", "Z") diff --git a/coding-deepgent/src/coding_deepgent/event_stream/__init__.py b/coding-deepgent/src/coding_deepgent/event_stream/__init__.py new file mode 100644 index 0000000..8fac681 --- /dev/null +++ b/coding-deepgent/src/coding_deepgent/event_stream/__init__.py @@ -0,0 +1,17 @@ +from .store import ( + EVENT_STREAM_NAMESPACE, + EventRecord, + ack_event, + append_event, + get_event, + list_events, +) + +__all__ = [ + "EVENT_STREAM_NAMESPACE", + "EventRecord", + "ack_event", + "append_event", + "get_event", + "list_events", +] diff --git a/coding-deepgent/src/coding_deepgent/event_stream/store.py b/coding-deepgent/src/coding_deepgent/event_stream/store.py new file mode 100644 index 0000000..b01c3fd --- /dev/null +++ b/coding-deepgent/src/coding_deepgent/event_stream/store.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +from collections.abc import Iterable +from datetime import UTC, datetime +from hashlib import sha256 +from typing import Any, Literal, Protocol + +from pydantic import BaseModel, ConfigDict, Field + +EVENT_STREAM_NAMESPACE = "coding_deepgent_event_stream" +EventVisibility = Literal["visible", "internal"] + + +class EventStore(Protocol): + def put( + self, namespace: tuple[str, ...], key: str, value: dict[str, object] + ) -> None: ... + def get(self, namespace: tuple[str, ...], key: str) -> object | None: ... + def search(self, namespace: tuple[str, ...]) -> Iterable[object]: ... + + +class EventRecord(BaseModel): + model_config = ConfigDict(extra="forbid") + + event_id: str + stream_id: str + sequence: int = Field(..., ge=1) + kind: str = Field(..., min_length=1) + visibility: EventVisibility = "visible" + payload: dict[str, Any] = Field(default_factory=dict) + created_at: str + acked: bool = False + acked_at: str | None = None + + +def event_namespace(stream_id: str) -> tuple[str, ...]: + return (EVENT_STREAM_NAMESPACE, stream_id.strip() or "default") + + +def append_event( + store: EventStore, + *, + stream_id: str, + kind: str, + payload: dict[str, Any] | None = None, + visibility: EventVisibility = "visible", +) -> EventRecord: + records = list_events(store, stream_id=stream_id, include_internal=True) + sequence = (records[-1].sequence + 1) if records else 1 + event_id = _event_id(stream_id=stream_id, sequence=sequence, kind=kind) + record = EventRecord( + event_id=event_id, + stream_id=stream_id.strip() or "default", + sequence=sequence, + kind=kind.strip(), + visibility=visibility, + payload=payload or {}, + created_at=_now(), + ) + store.put(event_namespace(record.stream_id), record.event_id, record.model_dump()) + return record + + +def get_event(store: EventStore, *, stream_id: str, event_id: str) -> EventRecord: + item = store.get(event_namespace(stream_id), event_id) + if item is None: + raise KeyError(f"Unknown event: {event_id}") + return EventRecord.model_validate(_item_value(item)) + + +def list_events( + store: EventStore, + *, + stream_id: str, + after_sequence: int | None = None, + include_internal: bool = False, +) -> list[EventRecord]: + records = [ + EventRecord.model_validate(_item_value(item)) + for item in store.search(event_namespace(stream_id)) + ] + if after_sequence is not None: + records = [record for record in records if record.sequence > after_sequence] + if not include_internal: + records = [record for record in records if record.visibility == "visible"] + return sorted(records, key=lambda record: record.sequence) + + +def ack_event(store: EventStore, *, stream_id: str, event_id: str) -> EventRecord: + record = get_event(store, stream_id=stream_id, event_id=event_id) + updated = record.model_copy(update={"acked": True, "acked_at": _now()}) + store.put(event_namespace(stream_id), updated.event_id, updated.model_dump()) + return updated + + +def _item_value(item: object) -> dict[str, object]: + value = getattr(item, "value", item) + return value if isinstance(value, dict) else {} + + +def _event_id(*, stream_id: str, sequence: int, kind: str) -> str: + digest = sha256(f"{stream_id}\0{sequence}\0{kind}".encode("utf-8")).hexdigest() + return f"evt-{digest[:12]}" + + +def _now() -> str: + return datetime.now(UTC).isoformat().replace("+00:00", "Z") diff --git a/coding-deepgent/src/coding_deepgent/extension_lifecycle/__init__.py b/coding-deepgent/src/coding_deepgent/extension_lifecycle/__init__.py new file mode 100644 index 0000000..2b9533a --- /dev/null +++ b/coding-deepgent/src/coding_deepgent/extension_lifecycle/__init__.py @@ -0,0 +1,23 @@ +from .store import ( + EXTENSION_NAMESPACE, + ExtensionRecord, + disable_extension, + enable_extension, + get_extension, + list_extensions, + register_extension, + rollback_extension, + update_extension, +) + +__all__ = [ + "EXTENSION_NAMESPACE", + "ExtensionRecord", + "disable_extension", + "enable_extension", + "get_extension", + "list_extensions", + "register_extension", + "rollback_extension", + "update_extension", +] diff --git a/coding-deepgent/src/coding_deepgent/extension_lifecycle/store.py b/coding-deepgent/src/coding_deepgent/extension_lifecycle/store.py new file mode 100644 index 0000000..b8e8b9a --- /dev/null +++ b/coding-deepgent/src/coding_deepgent/extension_lifecycle/store.py @@ -0,0 +1,182 @@ +from __future__ import annotations + +from collections.abc import Iterable +from datetime import UTC, datetime +from hashlib import sha256 +from typing import Any, Literal, Protocol + +from pydantic import BaseModel, ConfigDict, Field + +from coding_deepgent.event_stream import append_event + +EXTENSION_NAMESPACE = ("coding_deepgent_extension_lifecycle",) +ExtensionKind = Literal["skill", "mcp", "hook", "plugin"] +ExtensionStatus = Literal["installed", "enabled", "disabled", "failed"] + + +class ExtensionStore(Protocol): + def put( + self, namespace: tuple[str, ...], key: str, value: dict[str, object] + ) -> None: ... + def get(self, namespace: tuple[str, ...], key: str) -> object | None: ... + def search(self, namespace: tuple[str, ...]) -> Iterable[object]: ... + + +class ExtensionRecord(BaseModel): + model_config = ConfigDict(extra="forbid") + + extension_id: str + name: str = Field(..., min_length=1) + kind: ExtensionKind + source: str = Field(..., min_length=1) + version: str | None = None + status: ExtensionStatus = "installed" + previous_status: ExtensionStatus | None = None + metadata: dict[str, Any] = Field(default_factory=dict) + created_at: str + updated_at: str + + +def register_extension( + store: ExtensionStore, + *, + name: str, + kind: ExtensionKind, + source: str, + version: str | None = None, + metadata: dict[str, Any] | None = None, +) -> ExtensionRecord: + existing = _find_by_name_kind(store, name=name, kind=kind) + if existing is not None: + return existing + now = _now() + record = ExtensionRecord( + extension_id=_extension_id(name=name, kind=kind, created_at=now), + name=name.strip(), + kind=kind, + source=source.strip(), + version=version, + metadata=metadata or {}, + created_at=now, + updated_at=now, + ) + return _save(store, record, event_kind="extension_registered") + + +def get_extension(store: ExtensionStore, extension_id: str) -> ExtensionRecord: + item = store.get(EXTENSION_NAMESPACE, extension_id) + if item is None: + raise KeyError(f"Unknown extension: {extension_id}") + return ExtensionRecord.model_validate(_item_value(item)) + + +def list_extensions(store: ExtensionStore) -> list[ExtensionRecord]: + return sorted( + [ + ExtensionRecord.model_validate(_item_value(item)) + for item in store.search(EXTENSION_NAMESPACE) + ], + key=lambda item: item.extension_id, + ) + + +def enable_extension(store: ExtensionStore, extension_id: str) -> ExtensionRecord: + record = get_extension(store, extension_id) + return _transition(store, record, status="enabled", event_kind="extension_enabled") + + +def disable_extension(store: ExtensionStore, extension_id: str) -> ExtensionRecord: + record = get_extension(store, extension_id) + return _transition(store, record, status="disabled", event_kind="extension_disabled") + + +def update_extension( + store: ExtensionStore, + extension_id: str, + *, + version: str | None = None, + metadata: dict[str, Any] | None = None, +) -> ExtensionRecord: + record = get_extension(store, extension_id) + updated = record.model_copy( + update={ + "version": version if version is not None else record.version, + "metadata": {**record.metadata, **(metadata or {})}, + "updated_at": _now(), + } + ) + return _save(store, updated, event_kind="extension_updated") + + +def rollback_extension(store: ExtensionStore, extension_id: str) -> ExtensionRecord: + record = get_extension(store, extension_id) + if record.previous_status is None: + return record + return _transition( + store, + record, + status=record.previous_status, + event_kind="extension_rollback", + ) + + +def _transition( + store: ExtensionStore, + record: ExtensionRecord, + *, + status: ExtensionStatus, + event_kind: str, +) -> ExtensionRecord: + return _save( + store, + record.model_copy( + update={ + "status": status, + "previous_status": record.status, + "updated_at": _now(), + } + ), + event_kind=event_kind, + ) + + +def _save( + store: ExtensionStore, + record: ExtensionRecord, + *, + event_kind: str, +) -> ExtensionRecord: + store.put(EXTENSION_NAMESPACE, record.extension_id, record.model_dump()) + append_event( + store, + stream_id=f"extension:{record.extension_id}", + kind=event_kind, + payload={"extension_id": record.extension_id, "status": record.status}, + ) + return record + + +def _find_by_name_kind( + store: ExtensionStore, + *, + name: str, + kind: ExtensionKind, +) -> ExtensionRecord | None: + for record in list_extensions(store): + if record.name == name and record.kind == kind: + return record + return None + + +def _item_value(item: object) -> dict[str, object]: + value = getattr(item, "value", item) + return value if isinstance(value, dict) else {} + + +def _extension_id(*, name: str, kind: str, created_at: str) -> str: + digest = sha256(f"{name}\0{kind}\0{created_at}".encode("utf-8")).hexdigest() + return f"ext-{digest[:12]}" + + +def _now() -> str: + return datetime.now(UTC).isoformat().replace("+00:00", "Z") diff --git a/coding-deepgent/src/coding_deepgent/mailbox/__init__.py b/coding-deepgent/src/coding_deepgent/mailbox/__init__.py new file mode 100644 index 0000000..b7de6f4 --- /dev/null +++ b/coding-deepgent/src/coding_deepgent/mailbox/__init__.py @@ -0,0 +1,17 @@ +from .store import ( + MAILBOX_NAMESPACE, + MailboxMessage, + ack_message, + get_message, + list_messages, + send_message, +) + +__all__ = [ + "MAILBOX_NAMESPACE", + "MailboxMessage", + "ack_message", + "get_message", + "list_messages", + "send_message", +] diff --git a/coding-deepgent/src/coding_deepgent/mailbox/store.py b/coding-deepgent/src/coding_deepgent/mailbox/store.py new file mode 100644 index 0000000..fc185cf --- /dev/null +++ b/coding-deepgent/src/coding_deepgent/mailbox/store.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +from collections.abc import Iterable +from datetime import UTC, datetime +from hashlib import sha256 +from typing import Any, Literal, Protocol + +from pydantic import BaseModel, ConfigDict, Field + +from coding_deepgent.event_stream import append_event + +MAILBOX_NAMESPACE = ("coding_deepgent_mailbox",) +MessageStatus = Literal["pending", "acked", "cancelled"] + + +class MailboxStore(Protocol): + def put( + self, namespace: tuple[str, ...], key: str, value: dict[str, object] + ) -> None: ... + def get(self, namespace: tuple[str, ...], key: str) -> object | None: ... + def search(self, namespace: tuple[str, ...]) -> Iterable[object]: ... + + +class MailboxMessage(BaseModel): + model_config = ConfigDict(extra="forbid") + + message_id: str + sender: str = Field(..., min_length=1) + recipient: str = Field(..., min_length=1) + subject: str = Field(..., min_length=1) + body: str = Field(..., min_length=1) + status: MessageStatus = "pending" + delivery_key: str | None = None + metadata: dict[str, Any] = Field(default_factory=dict) + created_at: str + acked_at: str | None = None + + +def send_message( + store: MailboxStore, + *, + sender: str, + recipient: str, + subject: str, + body: str, + delivery_key: str | None = None, + metadata: dict[str, Any] | None = None, +) -> MailboxMessage: + if delivery_key: + existing = _message_by_delivery_key(store, delivery_key) + if existing is not None: + return existing + created_at = _now() + message = MailboxMessage( + message_id=_message_id(sender=sender, recipient=recipient, created_at=created_at), + sender=sender.strip(), + recipient=recipient.strip(), + subject=subject.strip(), + body=body.strip(), + delivery_key=delivery_key, + metadata=metadata or {}, + created_at=created_at, + ) + store.put(MAILBOX_NAMESPACE, message.message_id, message.model_dump()) + append_event( + store, + stream_id=f"mailbox:{message.recipient}", + kind="mailbox_message_sent", + payload=message.model_dump(), + ) + return message + + +def get_message(store: MailboxStore, message_id: str) -> MailboxMessage: + item = store.get(MAILBOX_NAMESPACE, message_id) + if item is None: + raise KeyError(f"Unknown mailbox message: {message_id}") + return MailboxMessage.model_validate(_item_value(item)) + + +def list_messages( + store: MailboxStore, + *, + recipient: str | None = None, + status: MessageStatus | None = None, +) -> list[MailboxMessage]: + records = [ + MailboxMessage.model_validate(_item_value(item)) + for item in store.search(MAILBOX_NAMESPACE) + ] + if recipient is not None: + records = [record for record in records if record.recipient == recipient] + if status is not None: + records = [record for record in records if record.status == status] + return sorted(records, key=lambda record: record.created_at) + + +def ack_message(store: MailboxStore, message_id: str) -> MailboxMessage: + message = get_message(store, message_id) + updated = message.model_copy(update={"status": "acked", "acked_at": _now()}) + store.put(MAILBOX_NAMESPACE, updated.message_id, updated.model_dump()) + append_event( + store, + stream_id=f"mailbox:{updated.recipient}", + kind="mailbox_message_acked", + payload={"message_id": updated.message_id}, + ) + return updated + + +def _message_by_delivery_key( + store: MailboxStore, + delivery_key: str, +) -> MailboxMessage | None: + for message in list_messages(store): + if message.delivery_key == delivery_key: + return message + return None + + +def _item_value(item: object) -> dict[str, object]: + value = getattr(item, "value", item) + return value if isinstance(value, dict) else {} + + +def _message_id(*, sender: str, recipient: str, created_at: str) -> str: + digest = sha256(f"{sender}\0{recipient}\0{created_at}".encode("utf-8")).hexdigest() + return f"msg-{digest[:12]}" + + +def _now() -> str: + return datetime.now(UTC).isoformat().replace("+00:00", "Z") diff --git a/coding-deepgent/src/coding_deepgent/remote/__init__.py b/coding-deepgent/src/coding_deepgent/remote/__init__.py new file mode 100644 index 0000000..5d54fb7 --- /dev/null +++ b/coding-deepgent/src/coding_deepgent/remote/__init__.py @@ -0,0 +1,21 @@ +from .store import ( + REMOTE_NAMESPACE, + RemoteSession, + close_remote_session, + get_remote_session, + list_remote_sessions, + register_remote_session, + replay_remote_events, + send_remote_control, +) + +__all__ = [ + "REMOTE_NAMESPACE", + "RemoteSession", + "close_remote_session", + "get_remote_session", + "list_remote_sessions", + "register_remote_session", + "replay_remote_events", + "send_remote_control", +] diff --git a/coding-deepgent/src/coding_deepgent/remote/store.py b/coding-deepgent/src/coding_deepgent/remote/store.py new file mode 100644 index 0000000..a75233d --- /dev/null +++ b/coding-deepgent/src/coding_deepgent/remote/store.py @@ -0,0 +1,134 @@ +from __future__ import annotations + +from collections.abc import Iterable +from datetime import UTC, datetime +from hashlib import sha256 +from typing import Any, Literal, Protocol + +from pydantic import BaseModel, ConfigDict, Field + +from coding_deepgent.event_stream import EventRecord, append_event, list_events + +REMOTE_NAMESPACE = ("coding_deepgent_remote_sessions",) +RemoteStatus = Literal["active", "closed"] + + +class RemoteStore(Protocol): + def put( + self, namespace: tuple[str, ...], key: str, value: dict[str, object] + ) -> None: ... + def get(self, namespace: tuple[str, ...], key: str) -> object | None: ... + def search(self, namespace: tuple[str, ...]) -> Iterable[object]: ... + + +class RemoteSession(BaseModel): + model_config = ConfigDict(extra="forbid") + + remote_id: str + session_id: str = Field(..., min_length=1) + client_name: str = Field(..., min_length=1) + status: RemoteStatus = "active" + last_sequence_sent: int = 0 + created_at: str + updated_at: str + + +def register_remote_session( + store: RemoteStore, + *, + session_id: str, + client_name: str, +) -> RemoteSession: + now = _now() + remote = RemoteSession( + remote_id=_remote_id(session_id=session_id, client_name=client_name, created_at=now), + session_id=session_id.strip(), + client_name=client_name.strip(), + created_at=now, + updated_at=now, + ) + store.put(REMOTE_NAMESPACE, remote.remote_id, remote.model_dump()) + append_event( + store, + stream_id=f"remote:{remote.remote_id}", + kind="remote_registered", + payload=remote.model_dump(), + ) + return remote + + +def get_remote_session(store: RemoteStore, remote_id: str) -> RemoteSession: + item = store.get(REMOTE_NAMESPACE, remote_id) + if item is None: + raise KeyError(f"Unknown remote session: {remote_id}") + return RemoteSession.model_validate(_item_value(item)) + + +def list_remote_sessions(store: RemoteStore) -> list[RemoteSession]: + return sorted( + [ + RemoteSession.model_validate(_item_value(item)) + for item in store.search(REMOTE_NAMESPACE) + ], + key=lambda item: item.remote_id, + ) + + +def send_remote_control( + store: RemoteStore, + *, + remote_id: str, + command: str, + payload: dict[str, Any] | None = None, +) -> EventRecord: + remote = get_remote_session(store, remote_id) + if remote.status != "active": + raise ValueError("remote session is closed") + return append_event( + store, + stream_id=f"remote:{remote.remote_id}", + kind=f"control:{command.strip()}", + payload=payload or {}, + ) + + +def replay_remote_events( + store: RemoteStore, + *, + remote_id: str, + after_sequence: int | None = None, +) -> list[EventRecord]: + remote = get_remote_session(store, remote_id) + return list_events( + store, + stream_id=f"remote:{remote.remote_id}", + after_sequence=after_sequence, + include_internal=False, + ) + + +def close_remote_session(store: RemoteStore, remote_id: str) -> RemoteSession: + remote = get_remote_session(store, remote_id) + updated = remote.model_copy(update={"status": "closed", "updated_at": _now()}) + store.put(REMOTE_NAMESPACE, updated.remote_id, updated.model_dump()) + append_event( + store, + stream_id=f"remote:{updated.remote_id}", + kind="remote_closed", + payload={"remote_id": updated.remote_id}, + ) + return updated + + +def _item_value(item: object) -> dict[str, object]: + value = getattr(item, "value", item) + return value if isinstance(value, dict) else {} + + +def _remote_id(*, session_id: str, client_name: str, created_at: str) -> str: + digest = sha256(f"{session_id}\0{client_name}\0{created_at}".encode("utf-8")).hexdigest() + return f"remote-{digest[:12]}" + + +def _now() -> str: + return datetime.now(UTC).isoformat().replace("+00:00", "Z") diff --git a/coding-deepgent/src/coding_deepgent/renderers/text.py b/coding-deepgent/src/coding_deepgent/renderers/text.py index ceef5fd..97300cd 100644 --- a/coding-deepgent/src/coding_deepgent/renderers/text.py +++ b/coding-deepgent/src/coding_deepgent/renderers/text.py @@ -281,8 +281,12 @@ def render_extension_table( return _render_table(table) -def render_acceptance_table(rows: Sequence[Mapping[str, Any]]) -> str: - table = Table(title="Circle 1 Acceptance", box=box.SIMPLE_HEAVY) +def render_acceptance_table( + rows: Sequence[Mapping[str, Any]], + *, + title: str = "Acceptance", +) -> str: + table = Table(title=title, box=box.SIMPLE_HEAVY) table.add_column("Check", style="cyan", no_wrap=True) table.add_column("Status", no_wrap=True) table.add_column("Detail") diff --git a/coding-deepgent/src/coding_deepgent/teams/__init__.py b/coding-deepgent/src/coding_deepgent/teams/__init__.py new file mode 100644 index 0000000..e82c408 --- /dev/null +++ b/coding-deepgent/src/coding_deepgent/teams/__init__.py @@ -0,0 +1,21 @@ +from .store import ( + TEAM_NAMESPACE, + TeamRun, + assign_worker, + complete_team, + create_team, + get_team, + list_teams, + update_progress, +) + +__all__ = [ + "TEAM_NAMESPACE", + "TeamRun", + "assign_worker", + "complete_team", + "create_team", + "get_team", + "list_teams", + "update_progress", +] diff --git a/coding-deepgent/src/coding_deepgent/teams/store.py b/coding-deepgent/src/coding_deepgent/teams/store.py new file mode 100644 index 0000000..25233d7 --- /dev/null +++ b/coding-deepgent/src/coding_deepgent/teams/store.py @@ -0,0 +1,139 @@ +from __future__ import annotations + +from collections.abc import Iterable +from datetime import UTC, datetime +from hashlib import sha256 +from typing import Any, Literal, Protocol + +from pydantic import BaseModel, ConfigDict, Field + +from coding_deepgent.event_stream import append_event + +TEAM_NAMESPACE = ("coding_deepgent_teams",) +TeamStatus = Literal["planning", "running", "completed", "cancelled"] + + +class TeamStore(Protocol): + def put( + self, namespace: tuple[str, ...], key: str, value: dict[str, object] + ) -> None: ... + def get(self, namespace: tuple[str, ...], key: str) -> object | None: ... + def search(self, namespace: tuple[str, ...]) -> Iterable[object]: ... + + +class TeamRun(BaseModel): + model_config = ConfigDict(extra="forbid") + + team_id: str + title: str = Field(..., min_length=1) + coordinator: str = Field(default="coordinator", min_length=1) + status: TeamStatus = "planning" + worker_ids: list[str] = Field(default_factory=list) + task_ids: list[str] = Field(default_factory=list) + progress: list[str] = Field(default_factory=list) + summary: str | None = None + metadata: dict[str, Any] = Field(default_factory=dict) + created_at: str + updated_at: str + + +def create_team( + store: TeamStore, + *, + title: str, + coordinator: str = "coordinator", + task_ids: list[str] | None = None, + metadata: dict[str, Any] | None = None, +) -> TeamRun: + now = _now() + team = TeamRun( + team_id=_team_id(title=title, created_at=now), + title=title.strip(), + coordinator=coordinator.strip(), + task_ids=task_ids or [], + metadata=metadata or {}, + created_at=now, + updated_at=now, + ) + return _save(store, team, event_kind="team_created") + + +def get_team(store: TeamStore, team_id: str) -> TeamRun: + item = store.get(TEAM_NAMESPACE, team_id) + if item is None: + raise KeyError(f"Unknown team: {team_id}") + return TeamRun.model_validate(_item_value(item)) + + +def list_teams(store: TeamStore) -> list[TeamRun]: + return sorted( + [TeamRun.model_validate(_item_value(item)) for item in store.search(TEAM_NAMESPACE)], + key=lambda team: team.team_id, + ) + + +def assign_worker(store: TeamStore, *, team_id: str, worker_id: str) -> TeamRun: + team = get_team(store, team_id) + workers = team.worker_ids if worker_id in team.worker_ids else [*team.worker_ids, worker_id] + return _save( + store, + team.model_copy( + update={"worker_ids": workers, "status": "running", "updated_at": _now()} + ), + event_kind="team_worker_assigned", + ) + + +def update_progress(store: TeamStore, *, team_id: str, message: str) -> TeamRun: + team = get_team(store, team_id) + return _save( + store, + team.model_copy( + update={"progress": [*team.progress, message.strip()], "updated_at": _now()} + ), + event_kind="team_progress", + ) + + +def complete_team( + store: TeamStore, + *, + team_id: str, + summary: str, + status: TeamStatus = "completed", +) -> TeamRun: + if status not in {"completed", "cancelled"}: + raise ValueError("team completion status must be completed or cancelled") + team = get_team(store, team_id) + return _save( + store, + team.model_copy( + update={"status": status, "summary": summary.strip(), "updated_at": _now()} + ), + event_kind=f"team_{status}", + ) + + +def _save(store: TeamStore, team: TeamRun, *, event_kind: str) -> TeamRun: + store.put(TEAM_NAMESPACE, team.team_id, team.model_dump()) + append_event( + store, + stream_id=f"team:{team.team_id}", + kind=event_kind, + payload={"team_id": team.team_id, "status": team.status}, + ) + return team + + +def _item_value(item: object) -> dict[str, object]: + value = getattr(item, "value", item) + return value if isinstance(value, dict) else {} + + +def _team_id(*, title: str, created_at: str) -> str: + digest = sha256(f"{title}\0{created_at}".encode("utf-8")).hexdigest() + return f"team-{digest[:12]}" + + +def _now() -> str: + return datetime.now(UTC).isoformat().replace("+00:00", "Z") diff --git a/coding-deepgent/src/coding_deepgent/worker_runtime/__init__.py b/coding-deepgent/src/coding_deepgent/worker_runtime/__init__.py new file mode 100644 index 0000000..6104960 --- /dev/null +++ b/coding-deepgent/src/coding_deepgent/worker_runtime/__init__.py @@ -0,0 +1,21 @@ +from .store import ( + WORKER_NAMESPACE, + WorkerRecord, + complete_worker, + create_worker, + get_worker, + heartbeat_worker, + list_workers, + request_worker_stop, +) + +__all__ = [ + "WORKER_NAMESPACE", + "WorkerRecord", + "complete_worker", + "create_worker", + "get_worker", + "heartbeat_worker", + "list_workers", + "request_worker_stop", +] diff --git a/coding-deepgent/src/coding_deepgent/worker_runtime/store.py b/coding-deepgent/src/coding_deepgent/worker_runtime/store.py new file mode 100644 index 0000000..5efdc43 --- /dev/null +++ b/coding-deepgent/src/coding_deepgent/worker_runtime/store.py @@ -0,0 +1,157 @@ +from __future__ import annotations + +from collections.abc import Iterable +from datetime import UTC, datetime +from hashlib import sha256 +from typing import Any, Literal, Protocol + +from pydantic import BaseModel, ConfigDict, Field + +from coding_deepgent.event_stream import append_event + +WORKER_NAMESPACE = ("coding_deepgent_workers",) +WorkerStatus = Literal["queued", "running", "idle", "completed", "failed", "cancelled"] + + +class WorkerStore(Protocol): + def put( + self, namespace: tuple[str, ...], key: str, value: dict[str, object] + ) -> None: ... + def get(self, namespace: tuple[str, ...], key: str) -> object | None: ... + def search(self, namespace: tuple[str, ...]) -> Iterable[object]: ... + + +class WorkerRecord(BaseModel): + model_config = ConfigDict(extra="forbid") + + worker_id: str + kind: str = Field(default="local", min_length=1) + session_id: str = Field(default="default", min_length=1) + status: WorkerStatus = "queued" + owner: str | None = None + payload: dict[str, Any] = Field(default_factory=dict) + result_summary: str | None = None + stop_requested: bool = False + created_at: str + updated_at: str + heartbeat_at: str | None = None + + +def create_worker( + store: WorkerStore, + *, + kind: str, + session_id: str = "default", + owner: str | None = None, + payload: dict[str, Any] | None = None, +) -> WorkerRecord: + now = _now() + worker_id = _worker_id(kind=kind, session_id=session_id, created_at=now) + record = WorkerRecord( + worker_id=worker_id, + kind=kind.strip(), + session_id=session_id.strip() or "default", + owner=owner, + payload=payload or {}, + created_at=now, + updated_at=now, + ) + return _save(store, record, event_kind="worker_created") + + +def get_worker(store: WorkerStore, worker_id: str) -> WorkerRecord: + item = store.get(WORKER_NAMESPACE, worker_id) + if item is None: + raise KeyError(f"Unknown worker: {worker_id}") + return WorkerRecord.model_validate(_item_value(item)) + + +def list_workers( + store: WorkerStore, + *, + include_terminal: bool = False, +) -> list[WorkerRecord]: + records = [ + WorkerRecord.model_validate(_item_value(item)) + for item in store.search(WORKER_NAMESPACE) + ] + if not include_terminal: + records = [ + record + for record in records + if record.status not in {"completed", "failed", "cancelled"} + ] + return sorted(records, key=lambda record: record.worker_id) + + +def heartbeat_worker(store: WorkerStore, worker_id: str) -> WorkerRecord: + record = get_worker(store, worker_id) + now = _now() + return _save( + store, + record.model_copy( + update={"status": "running", "heartbeat_at": now, "updated_at": now} + ), + event_kind="worker_heartbeat", + ) + + +def request_worker_stop(store: WorkerStore, worker_id: str) -> WorkerRecord: + record = get_worker(store, worker_id) + return _save( + store, + record.model_copy(update={"stop_requested": True, "updated_at": _now()}), + event_kind="worker_stop_requested", + ) + + +def complete_worker( + store: WorkerStore, + worker_id: str, + *, + status: WorkerStatus = "completed", + result_summary: str | None = None, +) -> WorkerRecord: + if status not in {"completed", "failed", "cancelled"}: + raise ValueError("worker completion status must be terminal") + record = get_worker(store, worker_id) + return _save( + store, + record.model_copy( + update={ + "status": status, + "result_summary": result_summary, + "updated_at": _now(), + } + ), + event_kind=f"worker_{status}", + ) + + +def _save(store: WorkerStore, record: WorkerRecord, *, event_kind: str) -> WorkerRecord: + store.put(WORKER_NAMESPACE, record.worker_id, record.model_dump()) + append_event( + store, + stream_id=f"worker:{record.worker_id}", + kind=event_kind, + payload={ + "worker_id": record.worker_id, + "status": record.status, + "session_id": record.session_id, + }, + ) + return record + + +def _item_value(item: object) -> dict[str, object]: + value = getattr(item, "value", item) + return value if isinstance(value, dict) else {} + + +def _worker_id(*, kind: str, session_id: str, created_at: str) -> str: + digest = sha256(f"{kind}\0{session_id}\0{created_at}".encode("utf-8")).hexdigest() + return f"worker-{digest[:12]}" + + +def _now() -> str: + return datetime.now(UTC).isoformat().replace("+00:00", "Z") diff --git a/coding-deepgent/tests/cli/test_cli.py b/coding-deepgent/tests/cli/test_cli.py index 8f7557d..f61f3ec 100644 --- a/coding-deepgent/tests/cli/test_cli.py +++ b/coding-deepgent/tests/cli/test_cli.py @@ -568,6 +568,60 @@ def test_extension_and_acceptance_commands_use_cli_service(monkeypatch) -> None: assert "workflow_a_repository_takeover" in acceptance.stdout +def test_circle2_cli_surfaces_write_to_durable_store(monkeypatch, tmp_path: Path) -> None: + settings = Settings(workdir=tmp_path, model_name="gpt-test") + runtime = cli_service.CliRuntime( + settings_loader=lambda: settings, + list_sessions=lambda: [], + load_session=_empty_history, + run_prompt=_unused_run_prompt, + doctor_checks=lambda: [], + ) + monkeypatch.setattr(cli, "build_cli_runtime", lambda: runtime) + + worker = runner.invoke(cli.app, ["workers", "create", "assistant"]) + mailbox = runner.invoke( + cli.app, + [ + "mailbox", + "send", + "worker-1", + "task", + "do it", + "--sender", + "coordinator", + "--delivery-key", + "delivery-1", + ], + ) + team = runner.invoke(cli.app, ["teams", "create", "Ship feature"]) + remote = runner.invoke(cli.app, ["remote", "register", "session-1", "ide"]) + lifecycle = runner.invoke( + cli.app, + ["extension-lifecycle", "register", "demo", "plugin", "local"], + ) + continuity = runner.invoke( + cli.app, + ["continuity", "save", "Next step", "Continue tomorrow."], + ) + acceptance = runner.invoke(cli.app, ["acceptance", "circle2"]) + + assert worker.exit_code == 0 + assert cli_service.worker_rows(settings) + assert mailbox.exit_code == 0 + assert len(cli_service.mailbox_rows(settings, recipient="worker-1")) == 1 + assert team.exit_code == 0 + assert cli_service.team_rows(settings) + assert remote.exit_code == 0 + assert cli_service.remote_rows(settings) + assert lifecycle.exit_code == 0 + assert cli_service.lifecycle_rows(settings) + assert continuity.exit_code == 0 + assert cli_service.continuity_rows(settings) + assert acceptance.exit_code == 0 + assert "workflow_d_durable_background_lifecycle" in acceptance.stdout + + def test_sessions_resume_uses_recovery_brief_continuation_history( monkeypatch, tmp_path: Path ) -> None: diff --git a/coding-deepgent/tests/runtime/test_circle2_substrate.py b/coding-deepgent/tests/runtime/test_circle2_substrate.py new file mode 100644 index 0000000..c197b6a --- /dev/null +++ b/coding-deepgent/tests/runtime/test_circle2_substrate.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +from langgraph.store.memory import InMemoryStore + +from coding_deepgent.continuity import get_artifact, list_artifacts, save_artifact +from coding_deepgent.event_stream import ack_event, append_event, list_events +from coding_deepgent.extension_lifecycle import ( + disable_extension, + enable_extension, + register_extension, + rollback_extension, +) +from coding_deepgent.mailbox import ack_message, list_messages, send_message +from coding_deepgent.remote import ( + close_remote_session, + register_remote_session, + replay_remote_events, + send_remote_control, +) +from coding_deepgent.teams import assign_worker, complete_team, create_team +from coding_deepgent.worker_runtime import ( + complete_worker, + create_worker, + heartbeat_worker, + request_worker_stop, +) + + +def test_event_stream_appends_replays_and_acks() -> None: + store = InMemoryStore() + + first = append_event(store, stream_id="session-1", kind="started") + second = append_event(store, stream_id="session-1", kind="progress") + acked = ack_event(store, stream_id="session-1", event_id=first.event_id) + + assert [event.sequence for event in list_events(store, stream_id="session-1")] == [ + 1, + 2, + ] + assert list_events(store, stream_id="session-1", after_sequence=1)[0].event_id == second.event_id + assert acked.acked is True + + +def test_worker_runtime_records_heartbeat_stop_and_completion() -> None: + store = InMemoryStore() + + worker = create_worker(store, kind="assistant", session_id="session-1") + running = heartbeat_worker(store, worker.worker_id) + stopping = request_worker_stop(store, worker.worker_id) + completed = complete_worker(store, worker.worker_id, result_summary="done") + + assert running.status == "running" + assert stopping.stop_requested is True + assert completed.status == "completed" + + +def test_mailbox_send_is_idempotent_and_ackable() -> None: + store = InMemoryStore() + + first = send_message( + store, + sender="coordinator", + recipient="worker-1", + subject="task", + body="do it", + delivery_key="delivery-1", + ) + second = send_message( + store, + sender="coordinator", + recipient="worker-1", + subject="task", + body="do it", + delivery_key="delivery-1", + ) + acked = ack_message(store, first.message_id) + + assert first.message_id == second.message_id + assert acked.status == "acked" + assert len(list_messages(store, recipient="worker-1")) == 1 + + +def test_team_remote_extension_and_continuity_records() -> None: + store = InMemoryStore() + + worker = create_worker(store, kind="assistant", session_id="session-1") + team = create_team(store, title="Ship feature") + team = assign_worker(store, team_id=team.team_id, worker_id=worker.worker_id) + team = complete_team(store, team_id=team.team_id, summary="done") + remote = register_remote_session( + store, + session_id="session-1", + client_name="ide", + ) + event = send_remote_control(store, remote_id=remote.remote_id, command="stop") + extension = register_extension( + store, + name="demo", + kind="plugin", + source="local", + ) + enabled = enable_extension(store, extension.extension_id) + disabled = disable_extension(store, extension.extension_id) + rolled_back = rollback_extension(store, extension.extension_id) + artifact = save_artifact( + store, + title="Next step", + content="Continue implementation.", + session_id="session-1", + ) + + assert team.status == "completed" + assert replay_remote_events(store, remote_id=remote.remote_id)[-1].event_id == event.event_id + assert close_remote_session(store, remote.remote_id).status == "closed" + assert enabled.status == "enabled" + assert disabled.status == "disabled" + assert rolled_back.status == "enabled" + assert get_artifact(store, artifact.artifact_id).title == "Next step" + assert list_artifacts(store)[0].artifact_id == artifact.artifact_id