learn-claude-code/s12_task_system/code.py
gui-yue 1baf1aca5a
Follow up PR #265: refine chapters, diagrams, and add S20 (#283)
* feat: s01-s14 docs quality overhaul — tool pipeline, single-agent, knowledge & resilience

Rewrite code.py and README (zh/en/ja) for s01-s14, each chapter building
incrementally on the previous. Key fixes across chapters:

- s01-s04: agent loop, tool dispatch, permission pipeline, hooks
- s05-s08: todo write, subagent, skill loading, context compact
- s09-s11: memory system, system prompt assembly, error recovery
- s12-s14: task graph, background tasks, cron scheduler

All chapters CC source-verified. Code inherits fixes forward (PROMPT_SECTIONS,
json.dumps cache, real-state context, can_start dep protection, etc.).

* feat: s15-s19 docs quality overhaul — multi-agent platform: teams, protocols, autonomy, worktree, MCP tools

Rewrite code.py and README (zh/en/ja) for s15-s19, the multi-agent platform
chapters. Each chapter inherits all previous fixes and adds one mechanism:

- s15: agent teams (TeamCreate, teammate threads, shared task list)
- s16: team protocols (plan approval, shutdown handshake, consume_inbox)
- s17: autonomous agents (idle polling, auto-claim, consume_lead_inbox)
- s18: worktree isolation (git worktree, bind_task, cwd switching, safety)
- s19: MCP tools (MCPClient, normalize_mcp_name, assemble_tool_pool, no cache)

All appendix source code references verified against CC source. Config priority
corrected: claude.ai < plugin < user < project < local.

* fix: 5 regressions across s05-s19 — glob safety, todo validation, memory extraction, protocol types, dep crash

- s05-s09: glob results now filter with is_relative_to(WORKDIR) (inherited from s02)
- s06-s08: todo_write validates content/status required fields (inherited from s05)
- s09: extract_memories uses pre-compression snapshot instead of compacted messages
- s16: submit_plan docstring clarifies protocol-only (not code-level gate)
- s17-s19: match_response restores type mismatch validation (from s16)
- s17-s19: claim_task deps list handles missing dep files without crashing
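
The glob safety fix in the first bullet amounts to resolving every match and dropping anything that lands outside the workspace. A minimal sketch of that idea, assuming a hypothetical helper `safe_glob` with an explicit `workdir` argument (neither name comes from the chapters):

```python
from pathlib import Path

def safe_glob(workdir: Path, pattern: str) -> list[Path]:
    """Return glob matches that still live inside workdir once resolved."""
    root = workdir.resolve()
    matches = []
    for p in root.glob(pattern):
        resolved = p.resolve()
        # is_relative_to (Python 3.9+) rejects targets that escape the
        # workspace, for example through a symlink pointing elsewhere.
        if resolved.is_relative_to(root):
            matches.append(resolved)
    return sorted(matches)
```

Resolving before the check matters: an unresolved symlink inside the workspace would otherwise pass the containment test while pointing at a file outside it.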

* fix: s12 Todo V2 logic reversal, s14/s15 cron range validation, s18/s19 worktree name validation

- s12 README (zh/en/ja): fix Todo V2 direction — interactive defaults to Task,
  non-interactive/SDK defaults to TodoWrite. Fix env var name to
  CLAUDE_CODE_ENABLE_TASKS (not TODO_V2).
- s14/s15: add _validate_cron_field with per-field range checks (minute 0-59,
  hour 0-23, dom 1-31, month 1-12, dow 0-6), step > 0, range lo <= hi.
  Replace old try/except validation that only caught exceptions.
- s18/s19: add validate_worktree_name() to remove_worktree and keep_worktree,
  not just create_worktree.
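
The per-field cron checks listed above (field ranges, step > 0, range lo <= hi) can be sketched as follows. This is an illustration under stated assumptions, not the s14/s15 code: the names `CRON_RANGES`, `validate_cron_field`, and `validate_cron` are hypothetical, and only `*`, single numbers, `a-b` ranges, `/step`, and comma lists are handled.

```python
# (lo, hi) per field in five-field cron order: minute, hour, dom, month, dow.
CRON_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]

def validate_cron_field(field: str, lo: int, hi: int) -> bool:
    """Validate one cron field against its allowed numeric range."""
    for part in field.split(","):
        base, slash, step = part.partition("/")
        # A step, when present, must be a positive integer.
        if slash and (not step.isdecimal() or int(step) <= 0):
            return False
        if base == "*":
            continue
        start, dash, end = base.partition("-")
        if not start.isdecimal() or (dash and not end.isdecimal()):
            return False
        a = int(start)
        b = int(end) if dash else a
        # Rejects out-of-range values and reversed ranges such as 30-10.
        if not (lo <= a <= b <= hi):
            return False
    return True

def validate_cron(expr: str) -> bool:
    """Validate a five-field cron expression field by field."""
    fields = expr.split()
    return len(fields) == 5 and all(
        validate_cron_field(f, lo, hi)
        for f, (lo, hi) in zip(fields, CRON_RANGES))
```

Checking each field explicitly, rather than relying on an exception from a parser, is what lets values such as minute `60` or step `0` be rejected up front.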

* fix: align s16-s19 teaching tool consistency

* fix pr265 chapter diagrams

* Add comprehensive s20 harness chapter

* Fix chapter smoke test regressions

* Clarify README tutorial track transition

---------

Co-authored-by: Haoran <bill-billion@outlook.com>
2026-05-20 21:45:38 +08:00


#!/usr/bin/env python3
"""
s12: Task System — file-persisted task graph with blockedBy dependencies.
Run: python s12_task_system/code.py
Need: pip install anthropic python-dotenv + .env with ANTHROPIC_API_KEY and MODEL_ID
Changes from s11:
- Task dataclass (id, subject, description, status, owner, blockedBy)
- TASKS_DIR = .tasks/ for persistent JSON storage
- create_task / save_task / load_task / list_tasks / get_task
- can_start: checks blockedBy all completed (missing deps = blocked)
- claim_task: set owner + pending -> in_progress
- complete_task: set completed + report unblocked downstream
- 5 new tools: create_task, list_tasks, get_task, claim_task, complete_task
Note: Teaching code keeps a basic agent loop to stay focused on the task
system. S11's full error recovery (RecoveryState, backoff, escalation,
reactive compact, fallback model) is omitted — in real CC, tasks.ts and
withRetry are independent layers that compose naturally.
"""
import os, subprocess, json, time, random
from pathlib import Path
from dataclasses import dataclass, asdict

try:
    import readline
    readline.parse_and_bind('set bind-tty-special-chars off')
except ImportError:
    pass

from anthropic import Anthropic
from dotenv import load_dotenv

load_dotenv(override=True)
if os.getenv("ANTHROPIC_BASE_URL"):
    os.environ.pop("ANTHROPIC_AUTH_TOKEN", None)

WORKDIR = Path.cwd()
MEMORY_DIR = WORKDIR / ".memory"
MEMORY_INDEX = MEMORY_DIR / "MEMORY.md"
client = Anthropic(base_url=os.getenv("ANTHROPIC_BASE_URL"))
MODEL = os.environ["MODEL_ID"]

# ── Task System ──
TASKS_DIR = WORKDIR / ".tasks"
TASKS_DIR.mkdir(exist_ok=True)

@dataclass
class Task:
    id: str
    subject: str
    description: str
    status: str            # pending | in_progress | completed
    owner: str | None      # Agent name (multi-agent scenarios)
    blockedBy: list[str]   # Dependency task IDs


def _task_path(task_id: str) -> Path:
    return TASKS_DIR / f"{task_id}.json"


def create_task(subject: str, description: str = "",
                blockedBy: list[str] | None = None) -> Task:
    task = Task(
        id=f"task_{int(time.time())}_{random.randint(0, 9999):04d}",
        subject=subject,
        description=description,
        status="pending",
        owner=None,
        blockedBy=blockedBy or [],
    )
    save_task(task)
    return task

def save_task(task: Task):
    _task_path(task.id).write_text(json.dumps(asdict(task), indent=2))


def load_task(task_id: str) -> Task:
    return Task(**json.loads(_task_path(task_id).read_text()))


def list_tasks() -> list[Task]:
    return [Task(**json.loads(p.read_text()))
            for p in sorted(TASKS_DIR.glob("task_*.json"))]


def get_task(task_id: str) -> str:
    """Return full task details as JSON."""
    task = load_task(task_id)
    return json.dumps(asdict(task), indent=2)

def can_start(task_id: str) -> bool:
    """Check if all blockedBy dependencies are completed.

    Missing dependencies are treated as blocked.
    """
    task = load_task(task_id)
    for dep_id in task.blockedBy:
        if not _task_path(dep_id).exists():
            return False
        if load_task(dep_id).status != "completed":
            return False
    return True

def claim_task(task_id: str, owner: str = "agent") -> str:
    task = load_task(task_id)
    if task.status != "pending":
        return f"Task {task_id} is {task.status}, cannot claim"
    if not can_start(task_id):
        # List the dependencies that are missing or not yet completed.
        deps = [d for d in task.blockedBy
                if not _task_path(d).exists() or load_task(d).status != "completed"]
        return f"Blocked by: {deps}"
    task.owner = owner
    task.status = "in_progress"
    save_task(task)
    print(f"  \033[36m[claim] {task.subject} → in_progress (owner: {owner})\033[0m")
    return f"Claimed {task.id} ({task.subject})"

def complete_task(task_id: str) -> str:
    task = load_task(task_id)
    if task.status != "in_progress":
        return f"Task {task_id} is {task.status}, cannot complete"
    task.status = "completed"
    save_task(task)
    # Report only downstream tasks: pending tasks that depended on this one
    # and whose dependencies are now all completed.
    unblocked = [t.subject for t in list_tasks()
                 if t.status == "pending" and task.id in t.blockedBy
                 and can_start(t.id)]
    print(f"  \033[32m[complete] {task.subject}\033[0m")
    msg = f"Completed {task.id} ({task.subject})"
    if unblocked:
        msg += f"\nUnblocked: {', '.join(unblocked)}"
        print(f"  \033[33m[unblocked] {', '.join(unblocked)}\033[0m")
    return msg

# ── Prompt Assembly (from s10, synced) ──
PROMPT_SECTIONS = {
    "identity": "You are a coding agent. Act, don't explain.",
    "tools": "Available tools: bash, read_file, write_file, "
             "create_task, list_tasks, get_task, claim_task, complete_task.",
    "workspace": f"Working directory: {WORKDIR}",
    "memory": "Relevant memories are injected below when available.",
}


def assemble_system_prompt(context: dict) -> str:
    sections = [PROMPT_SECTIONS["identity"],
                PROMPT_SECTIONS["tools"],
                PROMPT_SECTIONS["workspace"]]
    memories = context.get("memories", "")
    if memories:
        sections.append(f"Relevant memories:\n{memories}")
    return "\n\n".join(sections)


_last_context_key, _last_prompt = None, None


def get_system_prompt(context: dict) -> str:
    """Rebuild the system prompt only when the context actually changes."""
    global _last_context_key, _last_prompt
    key = json.dumps(context, sort_keys=True, ensure_ascii=False, default=str)
    if key == _last_context_key and _last_prompt:
        return _last_prompt
    _last_context_key = key
    _last_prompt = assemble_system_prompt(context)
    return _last_prompt

# ── Tools ──
def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path


def run_bash(command: str) -> str:
    try:
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
                           capture_output=True, text=True, timeout=120)
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"


def run_read(path: str, limit: int | None = None) -> str:
    try:
        lines = safe_path(path).read_text().splitlines()
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more lines)"]
        return "\n".join(lines)
    except Exception as e:
        return f"Error: {e}"


def run_write(path: str, content: str) -> str:
    try:
        fp = safe_path(path)
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content)
        return f"Wrote {len(content)} bytes to {path}"
    except Exception as e:
        return f"Error: {e}"

# Task tools
def run_create_task(subject: str, description: str = "",
                    blockedBy: list[str] | None = None) -> str:
    task = create_task(subject, description, blockedBy)
    deps = f" (blockedBy: {', '.join(blockedBy)})" if blockedBy else ""
    print(f"  \033[34m[create] {task.subject}{deps}\033[0m")
    return f"Created {task.id}: {task.subject}{deps}"

def run_list_tasks() -> str:
    tasks = list_tasks()
    if not tasks:
        return "No tasks. Use create_task to add some."
    # Plain-text status markers, one per task state.
    icons = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}
    lines = []
    for t in tasks:
        icon = icons.get(t.status, "[?]")
        deps = f" (blockedBy: {', '.join(t.blockedBy)})" if t.blockedBy else ""
        owner = f" [{t.owner}]" if t.owner else ""
        lines.append(f"  {icon} {t.id}: {t.subject} "
                     f"[{t.status}]{owner}{deps}")
    return "\n".join(lines)

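def run_get_task(task_id: str) -> str:
    try:
        return get_task(task_id)
    except FileNotFoundError:
        return f"Error: Task {task_id} not found"


def run_claim_task(task_id: str) -> str:
    # An unknown task ID is reported to the model instead of raising.
    try:
        return claim_task(task_id, owner="agent")
    except FileNotFoundError:
        return f"Error: Task {task_id} not found"


def run_complete_task(task_id: str) -> str:
    try:
        return complete_task(task_id)
    except FileNotFoundError:
        return f"Error: Task {task_id} not found"
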
TOOLS = [
    {"name": "bash", "description": "Run a shell command.",
     "input_schema": {"type": "object",
                      "properties": {"command": {"type": "string"}},
                      "required": ["command"]}},
    {"name": "read_file", "description": "Read file contents.",
     "input_schema": {"type": "object",
                      "properties": {"path": {"type": "string"},
                                     "limit": {"type": "integer"}},
                      "required": ["path"]}},
    {"name": "write_file", "description": "Write content to a file.",
     "input_schema": {"type": "object",
                      "properties": {"path": {"type": "string"},
                                     "content": {"type": "string"}},
                      "required": ["path", "content"]}},
    {"name": "create_task",
     "description": "Create a new task with optional blockedBy dependencies.",
     "input_schema": {"type": "object",
                      "properties": {
                          "subject": {"type": "string"},
                          "description": {"type": "string"},
                          "blockedBy": {"type": "array",
                                        "items": {"type": "string"}}},
                      "required": ["subject"]}},
    {"name": "list_tasks",
     "description": "List all tasks with status, owner, and dependencies.",
     "input_schema": {"type": "object", "properties": {},
                      "required": []}},
    {"name": "get_task",
     "description": "Get full details of a specific task by ID.",
     "input_schema": {"type": "object",
                      "properties": {"task_id": {"type": "string"}},
                      "required": ["task_id"]}},
    {"name": "claim_task",
     "description": "Claim a pending task. Sets owner, changes status to in_progress.",
     "input_schema": {"type": "object",
                      "properties": {"task_id": {"type": "string"}},
                      "required": ["task_id"]}},
    {"name": "complete_task",
     "description": "Complete an in-progress task. Reports unblocked downstream tasks.",
     "input_schema": {"type": "object",
                      "properties": {"task_id": {"type": "string"}},
                      "required": ["task_id"]}},
]

TOOL_HANDLERS = {
    "bash": run_bash, "read_file": run_read, "write_file": run_write,
    "create_task": run_create_task, "list_tasks": run_list_tasks,
    "get_task": run_get_task, "claim_task": run_claim_task,
    "complete_task": run_complete_task,
}

# ── Context ──
def update_context(context: dict, messages: list) -> dict:
    """Derive context from real state."""
    memories = ""
    if MEMORY_INDEX.exists():
        content = MEMORY_INDEX.read_text().strip()
        if content:
            memories = content
    return {
        "enabled_tools": list(TOOL_HANDLERS.keys()),
        "workspace": str(WORKDIR),
        "memories": memories,
    }

# ── Agent Loop (simplified, focused on task system) ──
def agent_loop(messages: list, context: dict):
    system = get_system_prompt(context)
    while True:
        try:
            response = client.messages.create(
                model=MODEL, system=system, messages=messages,
                tools=TOOLS, max_tokens=8000)
        except Exception as e:
            # Record the API failure as an assistant turn and stop.
            messages.append({"role": "assistant", "content": [
                {"type": "text",
                 "text": f"[Error] {type(e).__name__}: {e}"}]})
            return
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return
        results = []
        for block in response.content:
            if block.type != "tool_use":
                continue
            print(f"\033[36m> {block.name}\033[0m")
            handler = TOOL_HANDLERS.get(block.name)
            try:
                output = handler(**block.input) if handler else f"Unknown: {block.name}"
            except Exception as e:
                # Return tool failures to the model instead of crashing the loop.
                output = f"Error: {type(e).__name__}: {e}"
            print(str(output)[:300])
            results.append({"type": "tool_result",
                            "tool_use_id": block.id, "content": output})
        messages.append({"role": "user", "content": results})
        context = update_context(context, messages)
        system = get_system_prompt(context)

if __name__ == "__main__":
    print("s12: task system")
    print("Enter a question, press Enter to send. Type q to quit.\n")
    history = []
    context = update_context({}, [])
    while True:
        try:
            query = input("\033[36ms12 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(history, context)
        context = update_context(context, history)
        for block in history[-1]["content"]:
            if getattr(block, "type", None) == "text":
                print(block.text)
            elif isinstance(block, dict) and block.get("type") == "text":
                # API errors are stored by agent_loop as plain dict blocks.
                print(block["text"])
        print()
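
The task lifecycle above (create, claim, complete, unblock) can be exercised without an API key. The following is a minimal sketch rather than the file's own code: it assumes a throwaway temporary directory in place of `.tasks/`, plain dicts instead of the `Task` dataclass, fixed task IDs, and condensed helpers named `new_task`, `claim`, and `complete`, all of which are illustrative.

```python
import json
import tempfile
from pathlib import Path

TASKS = Path(tempfile.mkdtemp())  # stand-in for .tasks/


def task_path(tid: str) -> Path:
    return TASKS / f"{tid}.json"


def save(task: dict) -> None:
    task_path(task["id"]).write_text(json.dumps(task, indent=2))


def load(tid: str) -> dict:
    return json.loads(task_path(tid).read_text())


def new_task(tid: str, subject: str, blocked_by=()) -> dict:
    task = {"id": tid, "subject": subject, "status": "pending",
            "owner": None, "blockedBy": list(blocked_by)}
    save(task)
    return task


def can_start(tid: str) -> bool:
    # A dependency with no file on disk counts as blocked.
    return all(task_path(d).exists() and load(d)["status"] == "completed"
               for d in load(tid)["blockedBy"])


def claim(tid: str, owner: str = "agent") -> bool:
    task = load(tid)
    if task["status"] != "pending" or not can_start(tid):
        return False
    task.update(owner=owner, status="in_progress")
    save(task)
    return True


def complete(tid: str) -> bool:
    task = load(tid)
    if task["status"] != "in_progress":
        return False
    task["status"] = "completed"
    save(task)
    return True


new_task("task_a", "write schema")
new_task("task_b", "write migration", blocked_by=["task_a"])
new_task("task_c", "orphan", blocked_by=["task_missing"])

print(claim("task_b"))     # False: task_a is not completed yet
print(claim("task_a"))     # True
print(complete("task_a"))  # True
print(claim("task_b"))     # True: its only dependency is now completed
print(claim("task_c"))     # False: the dependency file does not exist
```

Because every state change is written back to disk, a second process pointed at the same directory would see the same graph, which is the property the later multi-agent chapters rely on.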