feat(desktop): add acp process client

This commit is contained in:
DragonnZhang 2026-04-25 02:43:01 +08:00
parent 132269fff4
commit ca8ed0bcb5
5 changed files with 645 additions and 3 deletions

View file

@ -72,7 +72,7 @@ order, verification, decisions, and remaining work.
### Slice 3: ACP Process Client Wrapper
- Status: pending
- Status: complete
- Goal: implement a desktop-local ACP child-process client around
`qwen --acp --channel=Desktop`.
- Files:
@ -213,6 +213,18 @@ order, verification, decisions, and remaining work.
- `npm run typecheck` passed across workspaces.
- `npm run build` passed across the configured build order. Existing VS Code
companion lint warnings were reported by its build script, with no errors.
- 2026-04-25 Slice 3:
- `npm install --ignore-scripts --workspace=@qwen-code/desktop` passed.
- `npx prettier --check design/qwen-code-electron-desktop-implementation-plan.md scripts/build.js packages/desktop` passed.
- `npm run test --workspace=packages/desktop` passed: 2 files, 12 tests.
- `npm run lint --workspace=packages/desktop` passed.
- `npm run typecheck --workspace=packages/desktop` passed.
- `npm run build --workspace=packages/desktop` passed.
- `npm exec --workspace=packages/desktop -- electron --version` passed:
`v41.3.0`.
- `npm run typecheck` passed across workspaces.
- `npm run build` passed across the configured build order. Existing VS Code
companion lint warnings were reported by its build script, with no errors.
## Self Review Notes
@ -234,10 +246,17 @@ order, verification, decisions, and remaining work.
placeholders until ACP is connected.
- Renderer displays runtime summary from REST only and still obtains the
server token only through preload.
- 2026-04-25 Slice 3:
- `AcpProcessClient` follows the existing Qwen ACP boundary:
`ClientSideConnection`, `ndJsonStream`, and `qwen --acp`.
- The wrapper defaults permission requests to cancellation until the
permission bridge slice supplies a UI-backed resolver.
- Startup failures race initialize against child exit; later process exits
clear connection state without leaving a rejected startup promise.
## Remaining Work
- Commit Slice 2.
- Continue with Slice 3 ACP process client wrapper.
- Commit Slice 3.
- Continue with Slice 4 session REST API.
- Continue through the ACP, session, WebSocket, permission, settings, and
packaging slices until the architecture MVP is fully verified.

1
package-lock.json generated
View file

@ -18786,6 +18786,7 @@
"name": "@qwen-code/desktop",
"version": "0.15.2",
"dependencies": {
"@agentclientprotocol/sdk": "^0.14.1",
"@qwen-code/webui": "file:../webui",
"react": "^19.1.0",
"react-dom": "^19.1.0"

View file

@ -17,6 +17,7 @@
"typecheck": "tsc --noEmit --project tsconfig.main.json && tsc --noEmit --project tsconfig.renderer.json"
},
"dependencies": {
"@agentclientprotocol/sdk": "^0.14.1",
"@qwen-code/webui": "file:../webui",
"react": "^19.1.0",
"react-dom": "^19.1.0"

View file

@ -0,0 +1,265 @@
/**
* @license
* Copyright 2026 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
import { EventEmitter } from 'node:events';
import type { ChildProcess, SpawnOptions } from 'node:child_process';
import { PassThrough } from 'node:stream';
import { describe, expect, it, vi } from 'vitest';
import { PROTOCOL_VERSION } from '@agentclientprotocol/sdk';
import type {
Agent,
Client,
ClientSideConnection,
InitializeResponse,
RequestPermissionRequest,
SessionNotification,
} from '@agentclientprotocol/sdk';
import {
AcpProcessClient,
type AcpProcessClientOptions,
} from './AcpProcessClient.js';
type AcpSdkConnection = Pick<
ClientSideConnection,
| 'authenticate'
| 'cancel'
| 'extMethod'
| 'initialize'
| 'loadSession'
| 'newSession'
| 'prompt'
| 'setSessionMode'
| 'unstable_listSessions'
| 'unstable_setSessionModel'
>;
interface Harness {
client: AcpProcessClient;
child: ChildProcess;
connection: AcpSdkConnection;
capturedClients: Client[];
spawnProcess: ReturnType<typeof vi.fn>;
}
describe('AcpProcessClient', () => {
it('spawns qwen ACP with the Desktop channel and initializes SDK', async () => {
const harness = createHarness({
cwd: '/workspace',
extraArgs: ['--model', 'qwen-plus'],
env: { QWEN_TEST_FLAG: '1' },
});
await harness.client.connect();
expect(harness.spawnProcess).toHaveBeenCalledWith(
process.execPath,
['/tmp/qwen.js', '--acp', '--channel=Desktop', '--model', 'qwen-plus'],
expect.objectContaining({
cwd: '/workspace',
shell: false,
stdio: ['pipe', 'pipe', 'pipe'],
}),
);
expect(harness.connection.initialize).toHaveBeenCalledWith({
protocolVersion: PROTOCOL_VERSION,
clientCapabilities: {},
});
});
it('forwards ACP client callbacks to desktop handlers', async () => {
const harness = createHarness();
const onSessionUpdate = vi.fn();
const onPermissionRequest = vi
.fn()
.mockResolvedValue({ outcome: { outcome: 'cancelled' } });
const onExtNotification = vi.fn();
harness.client.onSessionUpdate = onSessionUpdate;
harness.client.onPermissionRequest = onPermissionRequest;
harness.client.onExtNotification = onExtNotification;
await harness.client.connect();
const acpClient = harness.capturedClients[0];
const sessionUpdate = {
sessionId: 'session-1',
update: { sessionUpdate: 'agent_message_chunk' },
} as unknown as SessionNotification;
const permissionRequest = {
sessionId: 'session-1',
options: [],
} as unknown as RequestPermissionRequest;
await acpClient.sessionUpdate(sessionUpdate);
await acpClient.requestPermission(permissionRequest);
await acpClient.extNotification('authenticate/update', { ok: true });
expect(onSessionUpdate).toHaveBeenCalledWith(sessionUpdate);
expect(onPermissionRequest).toHaveBeenCalledWith(permissionRequest);
expect(onExtNotification).toHaveBeenCalledWith('authenticate/update', {
ok: true,
});
});
it('wraps session and prompt ACP methods', async () => {
const harness = createHarness();
await harness.client.connect();
await harness.client.newSession('/workspace');
await harness.client.loadSession('session-1', '/workspace');
await harness.client.listSessions({
cwd: '/workspace',
cursor: 4,
size: 20,
});
await harness.client.prompt('session-1', 'hello');
await harness.client.cancel('session-1');
await harness.client.setMode('session-1', 'yolo');
await harness.client.setModel('session-1', 'qwen-plus');
expect(harness.connection.newSession).toHaveBeenCalledWith({
cwd: '/workspace',
mcpServers: [],
});
expect(harness.connection.loadSession).toHaveBeenCalledWith({
sessionId: 'session-1',
cwd: '/workspace',
mcpServers: [],
});
expect(harness.connection.unstable_listSessions).toHaveBeenCalledWith({
cwd: '/workspace',
cursor: '4',
_meta: { size: 20 },
});
expect(harness.connection.prompt).toHaveBeenCalledWith({
sessionId: 'session-1',
prompt: [{ type: 'text', text: 'hello' }],
});
expect(harness.connection.cancel).toHaveBeenCalledWith({
sessionId: 'session-1',
});
expect(harness.connection.setSessionMode).toHaveBeenCalledWith({
sessionId: 'session-1',
modeId: 'yolo',
});
expect(harness.connection.unstable_setSessionModel).toHaveBeenCalledWith({
sessionId: 'session-1',
modelId: 'qwen-plus',
});
});
it('clears connection state and kills the process on disconnect', async () => {
const harness = createHarness();
await harness.client.connect();
harness.client.disconnect();
expect(harness.child.kill).toHaveBeenCalledOnce();
expect(harness.client.isConnected).toBe(false);
});
it('reports child process exits through onDisconnected', async () => {
const harness = createHarness();
const onDisconnected = vi.fn();
harness.client.onDisconnected = onDisconnected;
await harness.client.connect();
harness.child.emit('exit', 143, 'SIGTERM');
expect(onDisconnected).toHaveBeenCalledWith(143, 'SIGTERM');
expect(harness.client.isConnected).toBe(false);
});
it('rejects connect when the ACP process exits before initialize completes', async () => {
const harness = createHarness();
vi.mocked(harness.connection.initialize).mockReturnValue(
new Promise<InitializeResponse>(() => {}),
);
const connectPromise = harness.client.connect();
harness.child.stderr?.emit('data', Buffer.from('startup failed'));
harness.child.emit('exit', 1, null);
await expect(connectPromise).rejects.toThrow(
'Qwen ACP process exited unexpectedly',
);
});
});
function createHarness(
options: Partial<AcpProcessClientOptions> = {},
): Harness {
const child = createFakeChild();
const connection = createFakeConnection();
const capturedClients: Client[] = [];
const spawnProcess = vi.fn(
(_command: string, _args: string[], _options: SpawnOptions) => child,
);
const createConnection: AcpProcessClientOptions['createConnection'] = (
clientFactory: (agent: Agent) => Client,
) => {
capturedClients.push(clientFactory({} as Agent));
return connection;
};
return {
child,
connection,
capturedClients,
spawnProcess,
client: new AcpProcessClient({
cliEntryPath: '/tmp/qwen.js',
startupDelayMs: 0,
validateCliPath: false,
spawnProcess,
createConnection,
...options,
}),
};
}
function createFakeConnection(): AcpSdkConnection {
return {
authenticate: vi.fn().mockResolvedValue({}),
cancel: vi.fn().mockResolvedValue(undefined),
extMethod: vi.fn().mockResolvedValue({ success: true }),
initialize: vi.fn().mockResolvedValue({
protocolVersion: PROTOCOL_VERSION,
agentInfo: {
name: 'qwen-code',
title: 'Qwen Code',
version: 'test',
},
agentCapabilities: {},
authMethods: [],
} satisfies InitializeResponse),
loadSession: vi.fn().mockResolvedValue({ sessionId: 'session-1' }),
newSession: vi.fn().mockResolvedValue({ sessionId: 'session-1' }),
prompt: vi.fn().mockResolvedValue({ stopReason: 'end_turn' }),
setSessionMode: vi.fn().mockResolvedValue({}),
unstable_listSessions: vi.fn().mockResolvedValue({ sessions: [] }),
unstable_setSessionModel: vi.fn().mockResolvedValue({}),
} as unknown as AcpSdkConnection;
}
function createFakeChild(): ChildProcess {
const child = new EventEmitter() as ChildProcess;
const stdin = new PassThrough();
const stdout = new PassThrough();
const stderr = new PassThrough();
Object.assign(child, {
stdin,
stdout,
stderr,
killed: false,
exitCode: null,
kill: vi.fn(() => {
Object.assign(child, { killed: true, exitCode: 0 });
return true;
}),
});
return child;
}

View file

@ -0,0 +1,356 @@
/**
* @license
* Copyright 2026 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
import { spawn } from 'node:child_process';
import type { ChildProcess, SpawnOptions } from 'node:child_process';
import { existsSync } from 'node:fs';
import { Readable, Writable } from 'node:stream';
import {
ClientSideConnection,
ndJsonStream,
PROTOCOL_VERSION,
} from '@agentclientprotocol/sdk';
import type {
Agent,
AuthenticateResponse,
Client,
ClientCapabilities,
ContentBlock,
InitializeResponse,
ListSessionsResponse,
LoadSessionResponse,
NewSessionResponse,
PromptResponse,
RequestPermissionRequest,
RequestPermissionResponse,
SessionNotification,
SetSessionModeResponse,
SetSessionModelResponse,
} from '@agentclientprotocol/sdk';
type AcpSdkConnection = Pick<
ClientSideConnection,
| 'authenticate'
| 'cancel'
| 'extMethod'
| 'initialize'
| 'loadSession'
| 'newSession'
| 'prompt'
| 'setSessionMode'
| 'unstable_listSessions'
| 'unstable_setSessionModel'
>;
type CreateAcpConnection = (
clientFactory: (agent: Agent) => Client,
stdin: WritableStream,
stdout: ReadableStream<Uint8Array>,
) => AcpSdkConnection;
type SpawnProcess = (
command: string,
args: string[],
options: SpawnOptions,
) => ChildProcess;
export interface AcpProcessClientOptions {
cliEntryPath: string;
cwd?: string;
command?: string;
channel?: 'Desktop';
extraArgs?: string[];
env?: NodeJS.ProcessEnv;
startupDelayMs?: number;
validateCliPath?: boolean;
spawnProcess?: SpawnProcess;
createConnection?: CreateAcpConnection;
}
export interface ListSessionsOptions {
cwd?: string;
cursor?: number;
size?: number;
}
export class AcpProcessClient {
private child: ChildProcess | null = null;
private connection: AcpSdkConnection | null = null;
private readonly options: Required<
Pick<
AcpProcessClientOptions,
'channel' | 'startupDelayMs' | 'validateCliPath'
>
> &
Omit<
AcpProcessClientOptions,
'channel' | 'startupDelayMs' | 'validateCliPath'
>;
onSessionUpdate: (notification: SessionNotification) => void = () => {};
onPermissionRequest: (
request: RequestPermissionRequest,
) => Promise<RequestPermissionResponse> = async () => ({
outcome: { outcome: 'cancelled' },
});
onExtNotification: (
method: string,
params: Record<string, unknown>,
) => Promise<void> = async () => {};
onDisconnected: (code: number | null, signal: string | null) => void =
() => {};
onInitialized: (response: InitializeResponse) => void = () => {};
constructor(options: AcpProcessClientOptions) {
this.options = {
...options,
channel: options.channel ?? 'Desktop',
startupDelayMs: options.startupDelayMs ?? 1000,
validateCliPath: options.validateCliPath ?? true,
};
}
get isConnected(): boolean {
return (
this.child !== null &&
!this.child.killed &&
this.child.exitCode === null &&
this.connection !== null
);
}
async connect(): Promise<InitializeResponse> {
if (this.child) {
this.disconnect();
}
if (
this.options.validateCliPath &&
!existsSync(this.options.cliEntryPath)
) {
throw new Error(
`Qwen CLI ACP entry not found: ${this.options.cliEntryPath}`,
);
}
const child = this.spawnChild();
const stderrChunks: string[] = [];
let spawnError: Error | null = null;
let startupComplete = false;
const processExitPromise = new Promise<never>((_resolve, reject) => {
child.on('exit', (code: number | null, signal: string | null) => {
const stderrOutput = stderrChunks.join('').trim();
const stderrSuffix = stderrOutput
? `\nCLI stderr: ${stderrOutput.slice(-500)}`
: '';
if (this.child === child) {
this.child = null;
this.connection = null;
this.onDisconnected(code, signal);
}
if (!startupComplete) {
reject(
new Error(
`Qwen ACP process exited unexpectedly (exit code: ${code}, signal: ${signal})${stderrSuffix}`,
),
);
}
});
});
child.stderr?.on('data', (data: Buffer) => {
stderrChunks.push(data.toString());
});
child.on('error', (error: Error) => {
spawnError = error;
});
this.child = child;
if (this.options.startupDelayMs > 0) {
await delay(this.options.startupDelayMs);
}
if (spawnError) {
throw spawnError;
}
if (child.killed || child.exitCode !== null) {
throw new Error('Qwen ACP process failed to start.');
}
const connection = this.createSdkConnection(child);
this.connection = connection;
const initializeResponse = await Promise.race([
connection.initialize({
protocolVersion: PROTOCOL_VERSION,
clientCapabilities: this.getClientCapabilities(),
}),
processExitPromise,
]).finally(() => {
startupComplete = true;
});
this.onInitialized(initializeResponse);
return initializeResponse;
}
disconnect(): void {
if (this.child) {
this.child.kill();
}
this.child = null;
this.connection = null;
}
async authenticate(methodId = 'default'): Promise<AuthenticateResponse> {
return this.ensureConnection().authenticate({ methodId });
}
async newSession(cwd: string): Promise<NewSessionResponse> {
return this.ensureConnection().newSession({ cwd, mcpServers: [] });
}
async loadSession(
sessionId: string,
cwd: string,
): Promise<LoadSessionResponse> {
return this.ensureConnection().loadSession({
sessionId,
cwd,
mcpServers: [],
});
}
async listSessions(
options: ListSessionsOptions = {},
): Promise<ListSessionsResponse> {
const params: Parameters<AcpSdkConnection['unstable_listSessions']>[0] = {
cwd: options.cwd ?? this.options.cwd ?? process.cwd(),
};
if (options.cursor !== undefined) {
params.cursor = String(options.cursor);
}
if (options.size !== undefined) {
params._meta = { ...(params._meta ?? {}), size: options.size };
}
return this.ensureConnection().unstable_listSessions(params);
}
async prompt(
sessionId: string,
prompt: string | ContentBlock[],
): Promise<PromptResponse> {
const promptBlocks =
typeof prompt === 'string' ? [{ type: 'text', text: prompt }] : prompt;
return this.ensureConnection().prompt({
sessionId,
prompt: promptBlocks,
});
}
async cancel(sessionId: string): Promise<void> {
await this.ensureConnection().cancel({ sessionId });
}
async setMode(
sessionId: string,
modeId: string,
): Promise<SetSessionModeResponse | void> {
return this.ensureConnection().setSessionMode({ sessionId, modeId });
}
async setModel(
sessionId: string,
modelId: string,
): Promise<SetSessionModelResponse | void> {
return this.ensureConnection().unstable_setSessionModel({
sessionId,
modelId,
});
}
async extMethod<T extends Record<string, unknown>>(
method: string,
params: Record<string, unknown>,
): Promise<T> {
return (await this.ensureConnection().extMethod(method, params)) as T;
}
private spawnChild(): ChildProcess {
const command = this.options.command ?? process.execPath;
const args = [
this.options.cliEntryPath,
'--acp',
`--channel=${this.options.channel}`,
...(this.options.extraArgs ?? []),
];
const env = { ...process.env, ...(this.options.env ?? {}) };
return (this.options.spawnProcess ?? spawn)(command, args, {
cwd: this.options.cwd ?? process.cwd(),
env,
shell: false,
stdio: ['pipe', 'pipe', 'pipe'],
});
}
private createSdkConnection(child: ChildProcess): AcpSdkConnection {
const stdout = Readable.toWeb(child.stdout!) as ReadableStream<Uint8Array>;
const stdin = Writable.toWeb(child.stdin!) as WritableStream;
const createConnection =
this.options.createConnection ?? createSdkConnection;
return createConnection(
() => ({
sessionUpdate: (params: SessionNotification): Promise<void> => {
this.onSessionUpdate(params);
return Promise.resolve();
},
requestPermission: (
params: RequestPermissionRequest,
): Promise<RequestPermissionResponse> =>
this.onPermissionRequest(params),
extNotification: (
method: string,
params: Record<string, unknown>,
): Promise<void> => this.onExtNotification(method, params),
}),
stdin,
stdout,
);
}
private ensureConnection(): AcpSdkConnection {
if (!this.isConnected || !this.connection) {
throw new Error('Not connected to ACP agent');
}
return this.connection;
}
private getClientCapabilities(): ClientCapabilities {
return {};
}
}
function createSdkConnection(
clientFactory: (agent: Agent) => Client,
stdin: WritableStream,
stdout: ReadableStream<Uint8Array>,
): AcpSdkConnection {
return new ClientSideConnection(clientFactory, ndJsonStream(stdin, stdout));
}
async function delay(ms: number): Promise<void> {
await new Promise((resolve) => setTimeout(resolve, ms));
}