ruvector/docs/research/claude-code-rvsource/extracted/agent-loop.rvf
rUv 930fca916f feat(sse): decouple SSE to mcp.pi.ruv.io proxy + Claude Code source research
SSE Proxy Decoupling (ADR-130):
- Fix ruvbrain-sse proxy: proper MCP handshake, session creation, drain polling
- Fix internal queue endpoints: session_create keeps receiver, drain returns buffered messages
- Add response_queues to AppState for SSE proxy communication
- Skip sparsifier for >5M edge graphs (was crashing on 16M edges)
- Add SSE_DISABLED/MAX_SSE env vars for configurable connection limits
- Route SSE to dedicated mcp.pi.ruv.io subdomain (Cloudflare CNAME)
- Serve SSE at root / path on proxy (no /sse needed)
- Update all references from pi.ruv.io/sse to mcp.pi.ruv.io
- Fix Dockerfile consciousness crate build (feature/version mismatches)

Claude Code CLI Source Research (ADR-133):
- 19 research documents analyzing Claude Code internals (3000+ lines)
- Decompiler script + RVF corpus builder for all major versions
- Binary RVF containers for v0.2, v1.0, v2.0, v2.1 (300-2068 vectors each)
- Call graphs, class hierarchies, state machines from minified source

Integration Strategy (ADR-134):
- 6-tier integration plan: WASM MCP, agents, hooks, cache, SDK, plugin
- Integration guide with architecture diagrams and performance targets

Co-Authored-By: claude-flow <ruv@ruv.net>
2026-04-02 23:39:56 +00:00


---
type: source-extraction
module: agent-loop
binary: claude-code
version: 2.0.62
extraction-method: strings+pattern-match
confidence: high
---
# Agent Loop (s$) - Core Async Generator
## Function Signature (reconstructed from call sites)
```javascript
async function* s$({
  messages,             // Message[]
  systemPrompt,         // string
  userContext,          // Record<string, unknown>
  systemContext,        // Record<string, unknown>
  canUseTool,           // (toolName, input) => PermissionResult
  toolUseContext,       // ToolUseContext object
  autoCompactTracking,  // optional CompactTracker
  fallbackModel,        // optional string
  stopHookActive,       // optional boolean
  querySource           // "sdk" | "repl" | "hook_agent" | "compact" | etc.
}) { /* body elided; see the Entry Point section */ }
```
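The `canUseTool` parameter is the only callback in the signature, so a small sketch of what a caller might pass can help. The return shape used here (`behavior`, `updatedInput`, `message`) and the tool names are assumptions for illustration; the extraction only establishes the `(toolName, input) => PermissionResult` form.

```javascript
// Hypothetical permission callback matching the reconstructed shape
// (toolName, input) => PermissionResult. The result fields are assumed,
// not recovered from the binary.
const READ_ONLY_TOOLS = new Set(["Read", "Grep", "Glob"]); // illustrative names

function canUseTool(toolName, input) {
  if (READ_ONLY_TOOLS.has(toolName)) {
    // Allow read-only tools and pass the input through unchanged.
    return { behavior: "allow", updatedInput: input };
  }
  // Everything else is denied with a human-readable reason.
  return { behavior: "deny", message: `Tool ${toolName} requires approval` };
}

console.log(canUseTool("Read", { path: "README.md" }).behavior);
console.log(canUseTool("Bash", { command: "ls" }).behavior);
```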
## Call Sites Found (10 invocations of s$)
1. SDK entry: `for await(let P1 of s$({messages:B1,systemPrompt:WA,userContext:r,systemContext:BA,canUseTool:t,toolUseContext:jA,fallbackModel:E,querySource:"sdk"}))`
2. REPL query: `for await(let R8 of s$({messages:e0,systemPrompt:vK,userContext:_Y,systemContext:kI,canUseTool:IJ,toolUseContext:K6,querySource:jVA()}))`
3. Direct query: `for await(let kI of s$({messages:[...CQ,...HB],systemPrompt:Y9,userContext:h8,systemContext:A5,canUseTool:IJ,toolUseContext:_Y,querySource:jVA()}))`
4. Agent fork: `for await(let U of s$({messages:E,systemPrompt:W,userContext:K,systemContext:V,canUseTool:B,toolUseContext:H,querySource:G}))`
5. Sub-agent: `for await(let r of s$({messages:H,systemPrompt:b,userContext:C,systemContext:U,canUseTool:G,toolUseContext:d,querySource:J}))`
6. Tool result continuation (recursive): `yield*s$({messages:[...A,...Q,...K],systemPrompt:B,userContext:G,systemContext:Z,canUseTool:Y,toolUseContext:V,autoCompactTracking:I,fallbackModel:W,querySource:X})`
7. Attachment resending: `yield*s$({messages:[...A,...Q,...K],systemPrompt:B,userContext:G,systemContext:Z,canUseTool:Y,toolUseContext:V,autoCompactTracking:I,fallbackModel:W,querySource:X})`
8. Stop hook continuation: `yield*s$({messages:[...A,...Q,...D],systemPrompt:B,userContext:G,systemContext:Z,canUseTool:Y,toolUseContext:J,autoCompactTracking:I,fallbackModel:W,stopHookActive:!0,querySource:X})`
9. Hook agent: `for await(let r of s$({messages:H,systemPrompt:b,userContext:{},systemContext:{},canUseTool:a$,toolUseContext:t,querySource:"hook_agent"}))`
10. Compaction: called with `querySource:"compact"` for summarization
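The call sites split into two forms: external entry points drain the generator with `for await`, while the continuations inside `s$` delegate with `yield*`. A minimal sketch of why the two compose, using a toy generator named `loop` as a stand-in for `s$` (the `depth` parameter and the event fields are illustrative, not from the binary):

```javascript
// Toy stand-in for s$; recurses once to mimic a tool-result continuation.
async function* loop({ messages, querySource, depth = 0 }) {
  yield { type: "stream_request_start", querySource, depth };
  if (depth < 1) {
    // yield* forwards every event of the inner call to whoever is
    // iterating the outer call, so the consumer sees one flat stream.
    yield* loop({
      messages: [...messages, "tool_result"],
      querySource,
      depth: depth + 1,
    });
  } else {
    yield { type: "assistant", messageCount: messages.length };
  }
}

// External entry point: drain the generator with for await.
async function collect(params) {
  const events = [];
  for await (const event of loop(params)) events.push(event);
  return events;
}

collect({ messages: ["hi"], querySource: "sdk" }).then((events) => {
  console.log(events.map((e) => e.type).join(", "));
});
```

Because the recursion is delegated rather than consumed internally, the outer `for await` needs no knowledge of how many turns the loop takes.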
## Entry Point (s$ definition, partially reconstructed)
```javascript
async function* s$({messages, systemPrompt, userContext, systemContext, canUseTool,
                    toolUseContext, autoCompactTracking, fallbackModel, stopHookActive,
                    querySource}) {
  let sdkBetas = toolUseContext.options.sdkBetas;
  yield {type: "stream_request_start"};
  // Build API parameters
  //   ... token counting, system prompt assembly ...
  // Call Anthropic Messages API (streaming)
  //   ... SSE processing loop ...
  // For each content block:
  //   - text: accumulate text_delta
  //   - tool_use: accumulate input_json_delta
  //   - thinking: accumulate thinking_delta
  // When message_stop:
  yield {type: "assistant", message: accumulatedMessage};
  // If tool_use blocks present:
  //   For each tool:
  //     1. Permission check via canUseTool
  //     2. Execute tool.call()
  //     3. Collect results
  //   Assemble tool_result messages
  //   yield {type: "user", tool_results}
  //   yield* s$({...updatedParams}) // RECURSIVE
  // If end_turn:
  //   yield {type: "stop"}
  //   Run stop hooks if active
}
```
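The control flow outlined above can be exercised end to end with mocks. This is a sketch under stated assumptions, not the recovered implementation: `fakeModel`, the `tools` map, the message fields (`toolUses`, `toolResults`, `stopReason`), and the boolean return of `canUseTool` are all hypothetical simplifications, and the stop-hook and `"stop"` event paths are omitted.

```javascript
// Hypothetical model stub: requests a tool on the first turn, then ends the
// turn once a tool result is present in the transcript.
async function fakeModel(messages) {
  const hasResult = messages.some((m) => m.role === "user" && m.toolResults);
  return hasResult
    ? { role: "assistant", stopReason: "end_turn", text: "done", toolUses: [] }
    : {
        role: "assistant",
        stopReason: "tool_use",
        text: "",
        toolUses: [{ id: "t1", name: "echo", input: { value: 42 } }],
      };
}

// Hypothetical tool registry with a single tool.
const tools = { echo: { call: async (input) => input.value } };

async function* agentLoop({ messages, canUseTool }) {
  yield { type: "stream_request_start" };
  const message = await fakeModel(messages);
  yield { type: "assistant", message };
  if (message.toolUses.length === 0) return; // end_turn: nothing left to run

  const toolResults = [];
  for (const use of message.toolUses) {
    // 1. permission check, 2. execute, 3. collect
    const allowed = await canUseTool(use.name, use.input);
    const content = allowed
      ? await tools[use.name].call(use.input)
      : "permission denied";
    toolResults.push({ toolUseId: use.id, content });
  }
  const userMessage = { role: "user", toolResults };
  yield { type: "user", message: userMessage };
  // Recurse with the extended transcript, as in the yield* call sites.
  yield* agentLoop({ messages: [...messages, message, userMessage], canUseTool });
}

async function run(canUseTool) {
  const events = [];
  const start = [{ role: "user", content: "hi" }];
  for await (const e of agentLoop({ messages: start, canUseTool })) events.push(e);
  return events;
}

run(async () => true).then((events) => console.log(events.map((e) => e.type)));
```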
## Event Consumer Pattern (iBA function)
```javascript
// iBA processes events from the generator:
function iBA(event, addMessage, updateLength, setStreamMode, clearSpinner) {
  switch (event.type) {
    case "assistant":
      addMessage(event.message);
      break;
    case "user":
      addMessage(event.message);
      break;
    case "stream_event":
      if (event.event.type === "content_block_delta" && event.event.delta.type === "text_delta") {
        updateLength(event.event.delta.text.length);
      }
      break;
    // ... other event types
  }
}
```
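To see the consumer pattern in action, the following sketch replays a hand-written event list through a simplified handler. `handleEvent` and `replay` are hypothetical names, the `setStreamMode` and `clearSpinner` callbacks are left out, and treating `updateLength` as an increment of a running character count is an assumption.

```javascript
// Simplified consumer in the spirit of iBA; only the recovered cases are kept.
function handleEvent(event, addMessage, updateLength) {
  switch (event.type) {
    case "assistant":
    case "user":
      addMessage(event.message);
      break;
    case "stream_event": {
      const inner = event.event;
      if (inner.type === "content_block_delta" && inner.delta.type === "text_delta") {
        updateLength(inner.delta.text.length);
      }
      break;
    }
  }
}

// Replays events and returns the state the callbacks accumulated.
function replay(events) {
  const transcript = [];
  let streamedChars = 0;
  for (const event of events) {
    handleEvent(
      event,
      (message) => transcript.push(message),
      (n) => { streamedChars += n; } // assumed: lengths accumulate
    );
  }
  return { transcript, streamedChars };
}

const sample = [
  { type: "stream_event", event: { type: "content_block_delta", delta: { type: "text_delta", text: "Hel" } } },
  { type: "stream_event", event: { type: "content_block_delta", delta: { type: "text_delta", text: "lo" } } },
  { type: "assistant", message: { role: "assistant", content: "Hello" } },
];
console.log(replay(sample));
```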
## Query Preparation (JP / c_7 functions)
```javascript
// c_7 prepares the query context
async function c_7({input, mode, messages, mainLoopModel, pastedContents, ideSelection,
                    memoryPath, thinkingTokens, thinkingEnabled, querySource, commands,
                    isLoading, setIsLoading, setToolJSX, getToolUseContext,
                    setUserInputOnProcessing, setAbortController, onQuery,
                    resetLoadingState, setAppState, onBeforeQuery, resetHistory}) {
  // 1. Create abort controller
  // 2. Build thinking config: p_7(mode, thinkingTokens, input, thinkingEnabled)
  // 3. Prepare messages and tools
  // 4. Call onQuery callback
  // 5. Return history entry
}
```
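The five numbered steps can be sketched as a much smaller function. Everything beyond the step order is assumed: `prepareQuery` and `buildThinkingConfig` are hypothetical names (the latter stands in for `p_7`, whose logic was not recovered), and the shapes of the thinking config, the `onQuery` arguments, and the returned history entry are illustrative only.

```javascript
// Hypothetical stand-in for p_7; the real rules for enabling thinking are unknown.
function buildThinkingConfig(thinkingTokens, thinkingEnabled) {
  return thinkingEnabled && thinkingTokens > 0
    ? { type: "enabled", budgetTokens: thinkingTokens }
    : { type: "disabled" };
}

async function prepareQuery({ input, messages, thinkingTokens, thinkingEnabled,
                              setAbortController, onQuery }) {
  // 1. Create abort controller and hand it to the caller for cancellation.
  const abortController = new AbortController();
  setAbortController(abortController);
  // 2. Build thinking config.
  const thinking = buildThinkingConfig(thinkingTokens, thinkingEnabled);
  // 3. Prepare messages (tools omitted in this sketch).
  const nextMessages = [...messages, { role: "user", content: input }];
  // 4. Call onQuery callback.
  await onQuery(nextMessages, abortController, thinking);
  // 5. Return history entry (assumed shape).
  return { display: input };
}

prepareQuery({
  input: "hello",
  messages: [],
  thinkingTokens: 1024,
  thinkingEnabled: true,
  setAbortController: () => {},
  onQuery: async (msgs, controller, thinking) => {
    console.log(msgs.length, controller.signal.aborted, thinking.type);
  },
}).then((entry) => console.log(entry));
```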