fix(acp): wait for claude results before idle completion

This commit is contained in:
Peter Steinberger 2026-04-27 12:12:32 +01:00
parent eebdda92f0
commit 277cc640b1
No known key found for this signature in database
10 changed files with 403 additions and 37 deletions

View file

@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai
- Control UI/Gateway: preserve WebChat client version labels across localhost, 127.0.0.1, and IPv6 loopback aliases on the same port, avoiding misleading `vcontrol-ui` connection logs while investigating duplicate-message reports. Refs #72753 and #72742. Thanks @LumenFromTheFuture and @allesgutefy.
- Agents/reasoning: treat orphan closing reasoning tags with following answer text as a privacy boundary across delivery, history, streaming, and Control UI sanitizers so malformed local-model output cannot leak chain-of-thought text. Fixes #67092. Thanks @AnildoSilva.
- Memory-core: run one-shot memory CLI commands through transient builtin and QMD managers so `memory index`, `memory status --index`, and `memory search` no longer start long-lived file watchers that can hit macOS `EMFILE` limits. Fixes #59101; carries forward #49851. Thanks @mbear469210-coder and @maoyuanxue.
- Agents/ACP: ship the Claude ACP adapter with OpenClaw and require Claude result messages before idle can complete a prompt, preventing parent agents from waking early on long-running `sessions_spawn(runtime: "acp", agentId: "claude")` children. Fixes #72080. Thanks @siavash-saki and @iannwu.
- Memory-core: re-resolve the active runtime config whenever `memory_search` or `memory_get` executes, so provider changes made by `config.patch` stop leaving stale embedding backends behind in existing tool instances. Fixes #61098. Thanks @BradGroux and @Linux2010.
- WebChat: keep bare `/new` and `/reset` startup instructions out of visible chat history while preserving `/reset <note>` as user-visible transcript text. Fixes #72369. Thanks @collynes and @haishmg.
- CLI/doctor: remove dangling channel config, heartbeat targets, and channel model overrides when stale plugin repair removes a missing channel plugin, preventing Gateway boot loops after failed plugin reinstalls. Fixes #65293. Thanks @yidecode.

View file

@ -4,6 +4,7 @@
"description": "OpenClaw ACP runtime backend",
"type": "module",
"dependencies": {
"@agentclientprotocol/claude-agent-acp": "0.31.0",
"acpx": "0.6.1"
},
"devDependencies": {

View file

@ -0,0 +1,129 @@
import { ClaudeAcpAgent } from "@agentclientprotocol/claude-agent-acp";
import { describe, expect, it, vi } from "vitest";
type IteratorResultResolver = (value: IteratorResult<unknown>) => void;
class ManualAsyncIterator implements AsyncIterator<unknown> {
private readonly pending: IteratorResultResolver[] = [];
private readonly queued: IteratorResult<unknown>[] = [];
next(): Promise<IteratorResult<unknown>> {
const next = this.queued.shift();
if (next) {
return Promise.resolve(next);
}
return new Promise((resolve) => {
this.pending.push(resolve);
});
}
push(value: unknown): void {
this.resolve({ value, done: false });
}
end(): void {
this.resolve({ value: undefined, done: true });
}
private resolve(value: IteratorResult<unknown>): void {
const pending = this.pending.shift();
if (pending) {
pending(value);
return;
}
this.queued.push(value);
}
}
function makeResultMessage() {
return {
type: "result",
subtype: "success",
is_error: false,
result: "finished",
stop_reason: null,
total_cost_usd: 0,
usage: {
input_tokens: 1,
output_tokens: 1,
cache_read_input_tokens: 0,
cache_creation_input_tokens: 0,
},
modelUsage: [],
};
}
function makeIdleMessage() {
return {
type: "system",
subtype: "session_state_changed",
state: "idle",
session_id: "session-1",
};
}
async function flushMicrotasks(): Promise<void> {
await Promise.resolve();
await Promise.resolve();
}
describe("patched claude-agent-acp completion", () => {
it("does not resolve a prompt on idle before the result message", async () => {
const query = new ManualAsyncIterator();
const agent = new ClaudeAcpAgent({
sessionUpdate: vi.fn(),
extNotification: vi.fn(),
} as unknown as ConstructorParameters<typeof ClaudeAcpAgent>[0]);
agent.sessions["session-1"] = {
cancelled: false,
accumulatedUsage: {
inputTokens: 0,
outputTokens: 0,
cachedReadTokens: 0,
cachedWriteTokens: 0,
},
contextWindowSize: 200_000,
cwd: "/tmp",
emitRawSDKMessages: false,
input: {
push: vi.fn(),
end: vi.fn(),
},
nextPendingOrder: 0,
pendingMessages: new Map(),
promptRunning: false,
query,
settingsManager: {
dispose: vi.fn(),
},
} as unknown as (typeof agent.sessions)[string];
let resolved = false;
const promptPromise = agent
.prompt({
sessionId: "session-1",
prompt: [{ type: "text", text: "do work" }],
})
.then((value) => {
resolved = true;
return value;
});
query.push(makeIdleMessage());
await flushMicrotasks();
expect(resolved).toBe(false);
query.push(makeResultMessage());
await flushMicrotasks();
expect(resolved).toBe(false);
query.push(makeIdleMessage());
await expect(promptPromise).resolves.toMatchObject({
stopReason: "end_turn",
usage: {
inputTokens: 1,
outputTokens: 1,
},
});
});
});

View file

@ -1770,7 +1770,8 @@
}
},
"patchedDependencies": {
"@whiskeysockets/baileys@7.0.0-rc.9": "patches/@whiskeysockets__baileys@7.0.0-rc.9.patch"
"@whiskeysockets/baileys@7.0.0-rc.9": "patches/@whiskeysockets__baileys@7.0.0-rc.9.patch",
"@agentclientprotocol/claude-agent-acp@0.31.0": "patches/@agentclientprotocol__claude-agent-acp@0.31.0.patch"
}
}
}

View file

@ -0,0 +1,39 @@
diff --git a/dist/acp-agent.js b/dist/acp-agent.js
index 0a8f5e3c57ed05189cba546bd65fc18143744d09..a8522d86a5a2f1bbcdd446d222cb9b7b5acb79ca 100644
--- a/dist/acp-agent.js
+++ b/dist/acp-agent.js
@@ -421,6 +421,7 @@ export class ClaudeAcpAgent {
session.promptRunning = true;
let handedOff = false;
let stopReason = "end_turn";
+ let sawResult = false;
try {
while (true) {
const { value: message, done } = await session.query.next();
@@ -428,6 +429,9 @@ export class ClaudeAcpAgent {
if (session.cancelled) {
return { stopReason: "cancelled" };
}
+ if (sawResult) {
+ return { stopReason, usage: sessionUsage(session) };
+ }
break;
}
if (session.emitRawSDKMessages &&
@@ -496,7 +500,7 @@ export class ClaudeAcpAgent {
break;
}
case "session_state_changed": {
- if (message.state === "idle") {
+ if (message.state === "idle" && sawResult) {
return { stopReason, usage: sessionUsage(session) };
}
break;
@@ -601,6 +605,7 @@ export class ClaudeAcpAgent {
unreachable(message, this.logger);
break;
}
+ sawResult = true;
break;
}
case "stream_event": {

111
pnpm-lock.yaml generated
View file

@ -31,6 +31,9 @@ overrides:
packageExtensionsChecksum: sha256-n+P/SQo4Pf+dHYpYn1Y6wL4cJEVoVzZ835N0OEp4TM8=
patchedDependencies:
'@agentclientprotocol/claude-agent-acp@0.31.0':
hash: e8b472d71289ac8de9813c57d79abac524889ca96f279f6f3ad08043434f6615
path: patches/@agentclientprotocol__claude-agent-acp@0.31.0.patch
'@whiskeysockets/baileys@7.0.0-rc.9':
hash: 23ec8efe1484afa57c51b96955ba331d1467521a8e676a18c2690da7e70a6201
path: patches/@whiskeysockets__baileys@7.0.0-rc.9.patch
@ -214,6 +217,9 @@ importers:
extensions/acpx:
dependencies:
'@agentclientprotocol/claude-agent-acp':
specifier: 0.31.0
version: 0.31.0(patch_hash=e8b472d71289ac8de9813c57d79abac524889ca96f279f6f3ad08043434f6615)
acpx:
specifier: 0.6.1
version: 0.6.1
@ -1577,11 +1583,65 @@ importers:
packages:
'@agentclientprotocol/claude-agent-acp@0.31.0':
resolution: {integrity: sha512-AHyMSwBWg5MOmsN3UZSsw2kEG7HEkFMSWtSGgw20UyTFv9nbM14YtR3ABpXLICKzw+SkZbN/Zlz6vBE8Sk8n+w==}
hasBin: true
'@agentclientprotocol/sdk@0.20.0':
resolution: {integrity: sha512-BxEHyE4MvwyOsdyVPub1vEtyrq8E0JSdjC+ckXWimY1VabFCTXdPyXv2y2Omz1j+iod7Z8oBJDXFCJptM0GBqQ==}
peerDependencies:
zod: ^3.25.0 || ^4.0.0
'@anthropic-ai/claude-agent-sdk-darwin-arm64@0.2.119':
resolution: {integrity: sha512-kxnG37SZqUata2Jcp/YQ0n9Y7o/sinE/8LdG4ltM1gePh+z+0Mfa4vBUUTEBMBFth9PTovKoesIuVuyFpvO/Cw==}
cpu: [arm64]
os: [darwin]
'@anthropic-ai/claude-agent-sdk-darwin-x64@0.2.119':
resolution: {integrity: sha512-9Aj8g3ELsmZuOFg17TCkikeg/Wt2ucVT8hOOPQUatzLd7BKhydrHLA0RP42nBpWECO1B/n/mPdQ4iS/LS3s2Fg==}
cpu: [x64]
os: [darwin]
'@anthropic-ai/claude-agent-sdk-linux-arm64-musl@0.2.119':
resolution: {integrity: sha512-IPGWgtz+gGnD7fxKAvSf913EUT/lYBTBE8EZ7lh3+x5ZP2859LWLmrCm053Lf3nMWo/CWikZsVPwkDVwpz6tIQ==}
cpu: [arm64]
os: [linux]
libc: [musl]
'@anthropic-ai/claude-agent-sdk-linux-arm64@0.2.119':
resolution: {integrity: sha512-v3o464XkiYehp/OKidQQirxdVb+aGSvdJvHF2zH9p33W8M/NC21zwwh4dhwDnKsyrtBIgkt2CcMwzIl30r0OtA==}
cpu: [arm64]
os: [linux]
libc: [glibc]
'@anthropic-ai/claude-agent-sdk-linux-x64-musl@0.2.119':
resolution: {integrity: sha512-QYxFNAe4FFridPkKhGlNcNBJ0TaIygWYyvfI9g4kX0i+RVbresUWuZVkWY06ioJ0fXoixFJ+HNQBMB7dLrIp8Q==}
cpu: [x64]
os: [linux]
libc: [musl]
'@anthropic-ai/claude-agent-sdk-linux-x64@0.2.119':
resolution: {integrity: sha512-9ePt4ZN+hsqDw4AgS4KtcWIGKfL9Oq28kwkrTER/QAcSrVKxiLonp81cCLzg7Ok/IUJu4Cfd71GZbFv/WE54zw==}
cpu: [x64]
os: [linux]
libc: [glibc]
'@anthropic-ai/claude-agent-sdk-win32-arm64@0.2.119':
resolution: {integrity: sha512-p/TjcKQvkCYtXGPlR+mdyNwqCmvRcQL34Wtq0yUZ+iqmI/eyCe59IJ3AZrE0EZoqmiAevEYzatPIt9sncC9uxw==}
cpu: [arm64]
os: [win32]
'@anthropic-ai/claude-agent-sdk-win32-x64@0.2.119':
resolution: {integrity: sha512-k98Ju0wtktm6FhqTE/cXlVr6K4kGqBolVjEGzeKkW6ZILc7124euwNapAvkQCwMAavAxS/ZnO3jdKMtHtwTVTA==}
cpu: [x64]
os: [win32]
'@anthropic-ai/claude-agent-sdk@0.2.119':
resolution: {integrity: sha512-6AvthpsaOTlkn514brSGOcCSLHDXODnU+ExN1O3CJCjxr5RBcmzR057C9EIM0G7IchnXsRfMZgRO1QKsjTXdbA==}
engines: {node: '>=18.0.0'}
peerDependencies:
zod: ^4.0.0
'@anthropic-ai/sdk@0.91.0':
resolution: {integrity: sha512-hybd/DOI3ujG4gZyqqcWnSekYxkdjr1JbZYqP2Lb4AmcsU6HCTHSrTOgqedPSsQAruBVucHNAoD1vTQnpPzedw==}
hasBin: true
@ -7555,10 +7615,61 @@ packages:
snapshots:
'@agentclientprotocol/claude-agent-acp@0.31.0(patch_hash=e8b472d71289ac8de9813c57d79abac524889ca96f279f6f3ad08043434f6615)':
dependencies:
'@agentclientprotocol/sdk': 0.20.0(zod@4.3.6)
'@anthropic-ai/claude-agent-sdk': 0.2.119(zod@4.3.6)
zod: 4.3.6
transitivePeerDependencies:
- '@cfworker/json-schema'
- supports-color
'@agentclientprotocol/sdk@0.20.0(zod@4.3.6)':
dependencies:
zod: 4.3.6
'@anthropic-ai/claude-agent-sdk-darwin-arm64@0.2.119':
optional: true
'@anthropic-ai/claude-agent-sdk-darwin-x64@0.2.119':
optional: true
'@anthropic-ai/claude-agent-sdk-linux-arm64-musl@0.2.119':
optional: true
'@anthropic-ai/claude-agent-sdk-linux-arm64@0.2.119':
optional: true
'@anthropic-ai/claude-agent-sdk-linux-x64-musl@0.2.119':
optional: true
'@anthropic-ai/claude-agent-sdk-linux-x64@0.2.119':
optional: true
'@anthropic-ai/claude-agent-sdk-win32-arm64@0.2.119':
optional: true
'@anthropic-ai/claude-agent-sdk-win32-x64@0.2.119':
optional: true
'@anthropic-ai/claude-agent-sdk@0.2.119(zod@4.3.6)':
dependencies:
'@anthropic-ai/sdk': 0.91.0(zod@4.3.6)
'@modelcontextprotocol/sdk': 1.29.0(zod@4.3.6)
zod: 4.3.6
optionalDependencies:
'@anthropic-ai/claude-agent-sdk-darwin-arm64': 0.2.119
'@anthropic-ai/claude-agent-sdk-darwin-x64': 0.2.119
'@anthropic-ai/claude-agent-sdk-linux-arm64': 0.2.119
'@anthropic-ai/claude-agent-sdk-linux-arm64-musl': 0.2.119
'@anthropic-ai/claude-agent-sdk-linux-x64': 0.2.119
'@anthropic-ai/claude-agent-sdk-linux-x64-musl': 0.2.119
'@anthropic-ai/claude-agent-sdk-win32-arm64': 0.2.119
'@anthropic-ai/claude-agent-sdk-win32-x64': 0.2.119
transitivePeerDependencies:
- '@cfworker/json-schema'
- supports-color
'@anthropic-ai/sdk@0.91.0(zod@4.3.6)':
dependencies:
json-schema-to-ts: 3.1.1

View file

@ -39,6 +39,7 @@ import {
applyManagerRuntimeControls,
resolveManagerRuntimeCapabilities,
} from "./manager.runtime-controls.js";
import { consumeAcpTurnStream } from "./manager.turn-stream.js";
import {
type AcpCloseSessionInput,
type AcpCloseSessionResult,
@ -784,59 +785,46 @@ export class AcpSessionManager {
this.activeTurnBySession.set(actorKey, activeTurn);
activeTurnStarted = true;
let streamError: AcpRuntimeError | null = null;
const combinedSignal =
input.signal && typeof AbortSignal.any === "function"
? AbortSignal.any([input.signal, internalAbortController.signal])
: internalAbortController.signal;
const eventGate = { open: true };
const turnPromise = (async () => {
for await (const event of runtime.runTurn({
const turnPromise = consumeAcpTurnStream({
runtime,
turn: {
handle,
text: input.text,
attachments: input.attachments,
mode: input.mode,
requestId: input.requestId,
signal: combinedSignal,
})) {
if (!eventGate.open) {
continue;
}
if (event.type === "error") {
streamError = new AcpRuntimeError(
normalizeAcpErrorCode(event.code),
normalizeText(event.message) || "ACP turn failed before completion.",
},
eventGate,
onOutputEvent: (event) => {
sawTurnOutput = true;
if (event.type === "text_delta" && event.stream !== "thought" && event.text) {
taskProgressSummary = appendBackgroundTaskProgressSummary(
taskProgressSummary,
event.text,
);
} else if (event.type === "text_delta" || event.type === "tool_call") {
sawTurnOutput = true;
if (event.type === "text_delta" && event.stream !== "thought" && event.text) {
taskProgressSummary = appendBackgroundTaskProgressSummary(
taskProgressSummary,
event.text,
);
}
if (taskContext) {
this.markBackgroundTaskRunning(taskContext.runId, {
sessionKey,
lastEventAt: Date.now(),
progressSummary: taskProgressSummary || null,
});
}
}
if (input.onEvent) {
await input.onEvent(event);
if (taskContext) {
this.markBackgroundTaskRunning(taskContext.runId, {
sessionKey,
lastEventAt: Date.now(),
progressSummary: taskProgressSummary || null,
});
}
}
if (eventGate.open && streamError) {
throw streamError;
}
})();
},
onEvent: input.onEvent,
});
const turnTimeoutMs = this.resolveTurnTimeoutMs({
cfg: input.cfg,
meta,
});
const sessionMode = meta.mode;
await this.awaitTurnWithTimeout({
const turnOutcome = await this.awaitTurnWithTimeout({
sessionKey,
turnPromise,
timeoutMs: turnTimeoutMs + ACP_TURN_TIMEOUT_GRACE_MS,
@ -854,8 +842,11 @@ export class AcpSessionManager {
});
},
});
if (streamError) {
throw streamError;
if (!turnOutcome.sawTerminalEvent) {
throw new AcpRuntimeError(
"ACP_TURN_FAILED",
"ACP turn ended without a terminal done event.",
);
}
this.recordTurnCompletion({
startedAt: turnStartedAt,

View file

@ -2047,6 +2047,44 @@ describe("AcpSessionManager", () => {
expect(states.at(-1)).toBe("error");
});
it("rejects ACP streams that end without a terminal done event", async () => {
const runtimeState = createRuntime();
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({
id: "acpx",
runtime: runtimeState.runtime,
});
hoisted.readAcpSessionEntryMock.mockReturnValue({
sessionKey: "agent:codex:acp:session-1",
storeSessionKey: "agent:codex:acp:session-1",
acp: readySessionMeta(),
});
runtimeState.runTurn.mockImplementation(async function* () {
yield {
type: "text_delta" as const,
stream: "output" as const,
text: "Starting work...",
};
});
const manager = new AcpSessionManager();
await expect(
manager.runTurn({
cfg: baseCfg,
sessionKey: "agent:codex:acp:session-1",
text: "do work",
mode: "prompt",
requestId: "run-1",
}),
).rejects.toMatchObject({
code: "ACP_TURN_FAILED",
message: "ACP turn ended without a terminal done event.",
});
const states = extractStatesFromUpserts();
expect(states).toContain("running");
expect(states.at(-1)).toBe("error");
});
it("marks the session as errored when runtime ensure fails before turn start", async () => {
const runtimeState = createRuntime();
runtimeState.ensureSession.mockRejectedValue(new Error("acpx exited with code 1"));

View file

@ -0,0 +1,54 @@
import { AcpRuntimeError } from "../runtime/errors.js";
import type { AcpRuntime, AcpRuntimeEvent, AcpRuntimeTurnInput } from "../runtime/types.js";
import { normalizeAcpErrorCode } from "./manager.utils.js";
import { normalizeText } from "./runtime-options.js";
export type AcpTurnEventGate = {
open: boolean;
};
export type AcpTurnStreamOutcome = {
sawOutput: boolean;
sawTerminalEvent: boolean;
};
export async function consumeAcpTurnStream(params: {
runtime: AcpRuntime;
turn: AcpRuntimeTurnInput;
eventGate: AcpTurnEventGate;
onEvent?: (event: AcpRuntimeEvent) => Promise<void> | void;
onOutputEvent?: (
event: Extract<AcpRuntimeEvent, { type: "text_delta" | "tool_call" }>,
) => Promise<void> | void;
}): Promise<AcpTurnStreamOutcome> {
let streamError: AcpRuntimeError | null = null;
let sawOutput = false;
let sawTerminalEvent = false;
for await (const event of params.runtime.runTurn(params.turn)) {
if (!params.eventGate.open) {
continue;
}
if (event.type === "done") {
sawTerminalEvent = true;
} else if (event.type === "error") {
streamError = new AcpRuntimeError(
normalizeAcpErrorCode(event.code),
normalizeText(event.message) || "ACP turn failed before completion.",
);
} else if (event.type === "text_delta" || event.type === "tool_call") {
sawOutput = true;
await params.onOutputEvent?.(event);
}
await params.onEvent?.(event);
}
if (params.eventGate.open && streamError) {
throw streamError;
}
return {
sawOutput,
sawTerminalEvent,
};
}

View file

@ -51,6 +51,7 @@ export async function runAcpRuntimeAdapterContract(
event.type === "tool_call",
),
).toBe(true);
expect(successEvents.some((event) => event.type === "done")).toBe(true);
await params.assertSuccessEvents?.(successEvents);
if (params.includeControlChecks ?? true) {