diff --git a/design/qwen-code-electron-desktop-implementation-plan.md b/design/qwen-code-electron-desktop-implementation-plan.md index 4a3c1b0f3..c28a9d44c 100644 --- a/design/qwen-code-electron-desktop-implementation-plan.md +++ b/design/qwen-code-electron-desktop-implementation-plan.md @@ -72,7 +72,7 @@ order, verification, decisions, and remaining work. ### Slice 3: ACP Process Client Wrapper -- Status: pending +- Status: complete - Goal: implement a desktop-local ACP child-process client around `qwen --acp --channel=Desktop`. - Files: @@ -213,6 +213,18 @@ order, verification, decisions, and remaining work. - `npm run typecheck` passed across workspaces. - `npm run build` passed across the configured build order. Existing VS Code companion lint warnings were reported by its build script, with no errors. +- 2026-04-25 Slice 3: + - `npm install --ignore-scripts --workspace=@qwen-code/desktop` passed. + - `npx prettier --check design/qwen-code-electron-desktop-implementation-plan.md scripts/build.js packages/desktop` passed. + - `npm run test --workspace=packages/desktop` passed: 2 files, 12 tests. + - `npm run lint --workspace=packages/desktop` passed. + - `npm run typecheck --workspace=packages/desktop` passed. + - `npm run build --workspace=packages/desktop` passed. + - `npm exec --workspace=packages/desktop -- electron --version` passed: + `v41.3.0`. + - `npm run typecheck` passed across workspaces. + - `npm run build` passed across the configured build order. Existing VS Code + companion lint warnings were reported by its build script, with no errors. ## Self Review Notes @@ -234,10 +246,17 @@ order, verification, decisions, and remaining work. placeholders until ACP is connected. - Renderer displays runtime summary from REST only and still obtains the server token only through preload. +- 2026-04-25 Slice 3: + - `AcpProcessClient` follows the existing Qwen ACP boundary: + `ClientSideConnection`, `ndJsonStream`, and `qwen --acp`. + - The wrapper defaults permission requests to cancellation until the + permission bridge slice supplies a UI-backed resolver. + - Startup failures race initialize against child exit; later process exits + clear connection state without leaving a rejected startup promise. ## Remaining Work -- Commit Slice 2. -- Continue with Slice 3 ACP process client wrapper. +- Commit Slice 3. +- Continue with Slice 4 session REST API. - Continue through the ACP, session, WebSocket, permission, settings, and packaging slices until the architecture MVP is fully verified. diff --git a/package-lock.json b/package-lock.json index b29b93aee..27bcde59a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18786,6 +18786,7 @@ "name": "@qwen-code/desktop", "version": "0.15.2", "dependencies": { + "@agentclientprotocol/sdk": "^0.14.1", "@qwen-code/webui": "file:../webui", "react": "^19.1.0", "react-dom": "^19.1.0" diff --git a/packages/desktop/package.json b/packages/desktop/package.json index 72cbb81ba..3e6b6d045 100644 --- a/packages/desktop/package.json +++ b/packages/desktop/package.json @@ -17,6 +17,7 @@ "typecheck": "tsc --noEmit --project tsconfig.main.json && tsc --noEmit --project tsconfig.renderer.json" }, "dependencies": { + "@agentclientprotocol/sdk": "^0.14.1", "@qwen-code/webui": "file:../webui", "react": "^19.1.0", "react-dom": "^19.1.0" diff --git a/packages/desktop/src/server/acp/AcpProcessClient.test.ts b/packages/desktop/src/server/acp/AcpProcessClient.test.ts new file mode 100644 index 000000000..ec0b5d62a --- /dev/null +++ b/packages/desktop/src/server/acp/AcpProcessClient.test.ts @@ -0,0 +1,265 @@ +/** + * @license + * Copyright 2026 Qwen Team + * SPDX-License-Identifier: Apache-2.0 + */ + +import { EventEmitter } from 'node:events'; +import type { ChildProcess, SpawnOptions } from 'node:child_process'; +import { PassThrough } from 'node:stream'; +import { describe, expect, it, vi } from 'vitest'; +import { PROTOCOL_VERSION } from '@agentclientprotocol/sdk'; +import type { + Agent, + Client, + ClientSideConnection, + InitializeResponse, + RequestPermissionRequest, + SessionNotification, +} from '@agentclientprotocol/sdk'; +import { + AcpProcessClient, + type AcpProcessClientOptions, +} from './AcpProcessClient.js'; + +type AcpSdkConnection = Pick< + ClientSideConnection, + | 'authenticate' + | 'cancel' + | 'extMethod' + | 'initialize' + | 'loadSession' + | 'newSession' + | 'prompt' + | 'setSessionMode' + | 'unstable_listSessions' + | 'unstable_setSessionModel' +>; + +interface Harness { + client: AcpProcessClient; + child: ChildProcess; + connection: AcpSdkConnection; + capturedClients: Client[]; + spawnProcess: ReturnType; +} + +describe('AcpProcessClient', () => { + it('spawns qwen ACP with the Desktop channel and initializes SDK', async () => { + const harness = createHarness({ + cwd: '/workspace', + extraArgs: ['--model', 'qwen-plus'], + env: { QWEN_TEST_FLAG: '1' }, + }); + + await harness.client.connect(); + + expect(harness.spawnProcess).toHaveBeenCalledWith( + process.execPath, + ['/tmp/qwen.js', '--acp', '--channel=Desktop', '--model', 'qwen-plus'], + expect.objectContaining({ + cwd: '/workspace', + shell: false, + stdio: ['pipe', 'pipe', 'pipe'], + }), + ); + expect(harness.connection.initialize).toHaveBeenCalledWith({ + protocolVersion: PROTOCOL_VERSION, + clientCapabilities: {}, + }); + }); + + it('forwards ACP client callbacks to desktop handlers', async () => { + const harness = createHarness(); + const onSessionUpdate = vi.fn(); + const onPermissionRequest = vi + .fn() + .mockResolvedValue({ outcome: { outcome: 'cancelled' } }); + const onExtNotification = vi.fn(); + harness.client.onSessionUpdate = onSessionUpdate; + harness.client.onPermissionRequest = onPermissionRequest; + harness.client.onExtNotification = onExtNotification; + + await harness.client.connect(); + const acpClient = harness.capturedClients[0]; + const sessionUpdate = { + sessionId: 'session-1', + update: { sessionUpdate: 'agent_message_chunk' }, + } as unknown as SessionNotification; + const permissionRequest = { + sessionId: 'session-1', + options: [], + } as unknown as RequestPermissionRequest; + + await acpClient.sessionUpdate(sessionUpdate); + await acpClient.requestPermission(permissionRequest); + await acpClient.extNotification('authenticate/update', { ok: true }); + + expect(onSessionUpdate).toHaveBeenCalledWith(sessionUpdate); + expect(onPermissionRequest).toHaveBeenCalledWith(permissionRequest); + expect(onExtNotification).toHaveBeenCalledWith('authenticate/update', { + ok: true, + }); + }); + + it('wraps session and prompt ACP methods', async () => { + const harness = createHarness(); + await harness.client.connect(); + + await harness.client.newSession('/workspace'); + await harness.client.loadSession('session-1', '/workspace'); + await harness.client.listSessions({ + cwd: '/workspace', + cursor: 4, + size: 20, + }); + await harness.client.prompt('session-1', 'hello'); + await harness.client.cancel('session-1'); + await harness.client.setMode('session-1', 'yolo'); + await harness.client.setModel('session-1', 'qwen-plus'); + + expect(harness.connection.newSession).toHaveBeenCalledWith({ + cwd: '/workspace', + mcpServers: [], + }); + expect(harness.connection.loadSession).toHaveBeenCalledWith({ + sessionId: 'session-1', + cwd: '/workspace', + mcpServers: [], + }); + expect(harness.connection.unstable_listSessions).toHaveBeenCalledWith({ + cwd: '/workspace', + cursor: '4', + _meta: { size: 20 }, + }); + expect(harness.connection.prompt).toHaveBeenCalledWith({ + sessionId: 'session-1', + prompt: [{ type: 'text', text: 'hello' }], + }); + expect(harness.connection.cancel).toHaveBeenCalledWith({ + sessionId: 'session-1', + }); + expect(harness.connection.setSessionMode).toHaveBeenCalledWith({ + sessionId: 'session-1', + modeId: 'yolo', + }); + expect(harness.connection.unstable_setSessionModel).toHaveBeenCalledWith({ + sessionId: 'session-1', + modelId: 'qwen-plus', + }); + }); + + it('clears connection state and kills the process on disconnect', async () => { + const harness = createHarness(); + await harness.client.connect(); + + harness.client.disconnect(); + + expect(harness.child.kill).toHaveBeenCalledOnce(); + expect(harness.client.isConnected).toBe(false); + }); + + it('reports child process exits through onDisconnected', async () => { + const harness = createHarness(); + const onDisconnected = vi.fn(); + harness.client.onDisconnected = onDisconnected; + await harness.client.connect(); + + harness.child.emit('exit', 143, 'SIGTERM'); + + expect(onDisconnected).toHaveBeenCalledWith(143, 'SIGTERM'); + expect(harness.client.isConnected).toBe(false); + }); + + it('rejects connect when the ACP process exits before initialize completes', async () => { + const harness = createHarness(); + vi.mocked(harness.connection.initialize).mockReturnValue( + new Promise(() => {}), + ); + + const connectPromise = harness.client.connect(); + harness.child.stderr?.emit('data', Buffer.from('startup failed')); + harness.child.emit('exit', 1, null); + + await expect(connectPromise).rejects.toThrow( + 'Qwen ACP process exited unexpectedly', + ); + }); +}); + +function createHarness( + options: Partial = {}, +): Harness { + const child = createFakeChild(); + const connection = createFakeConnection(); + const capturedClients: Client[] = []; + const spawnProcess = vi.fn( + (_command: string, _args: string[], _options: SpawnOptions) => child, + ); + const createConnection: AcpProcessClientOptions['createConnection'] = ( + clientFactory: (agent: Agent) => Client, + ) => { + capturedClients.push(clientFactory({} as Agent)); + return connection; + }; + + return { + child, + connection, + capturedClients, + spawnProcess, + client: new AcpProcessClient({ + cliEntryPath: '/tmp/qwen.js', + startupDelayMs: 0, + validateCliPath: false, + spawnProcess, + createConnection, + ...options, + }), + }; +} + +function createFakeConnection(): AcpSdkConnection { + return { + authenticate: vi.fn().mockResolvedValue({}), + cancel: vi.fn().mockResolvedValue(undefined), + extMethod: vi.fn().mockResolvedValue({ success: true }), + initialize: vi.fn().mockResolvedValue({ + protocolVersion: PROTOCOL_VERSION, + agentInfo: { + name: 'qwen-code', + title: 'Qwen Code', + version: 'test', + }, + agentCapabilities: {}, + authMethods: [], + } satisfies InitializeResponse), + loadSession: vi.fn().mockResolvedValue({ sessionId: 'session-1' }), + newSession: vi.fn().mockResolvedValue({ sessionId: 'session-1' }), + prompt: vi.fn().mockResolvedValue({ stopReason: 'end_turn' }), + setSessionMode: vi.fn().mockResolvedValue({}), + unstable_listSessions: vi.fn().mockResolvedValue({ sessions: [] }), + unstable_setSessionModel: vi.fn().mockResolvedValue({}), + } as unknown as AcpSdkConnection; +} + +function createFakeChild(): ChildProcess { + const child = new EventEmitter() as ChildProcess; + const stdin = new PassThrough(); + const stdout = new PassThrough(); + const stderr = new PassThrough(); + + Object.assign(child, { + stdin, + stdout, + stderr, + killed: false, + exitCode: null, + kill: vi.fn(() => { + Object.assign(child, { killed: true, exitCode: 0 }); + return true; + }), + }); + + return child; +} diff --git a/packages/desktop/src/server/acp/AcpProcessClient.ts b/packages/desktop/src/server/acp/AcpProcessClient.ts new file mode 100644 index 000000000..b101f2bed --- /dev/null +++ b/packages/desktop/src/server/acp/AcpProcessClient.ts @@ -0,0 +1,356 @@ +/** + * @license + * Copyright 2026 Qwen Team + * SPDX-License-Identifier: Apache-2.0 + */ + +import { spawn } from 'node:child_process'; +import type { ChildProcess, SpawnOptions } from 'node:child_process'; +import { existsSync } from 'node:fs'; +import { Readable, Writable } from 'node:stream'; +import { + ClientSideConnection, + ndJsonStream, + PROTOCOL_VERSION, +} from '@agentclientprotocol/sdk'; +import type { + Agent, + AuthenticateResponse, + Client, + ClientCapabilities, + ContentBlock, + InitializeResponse, + ListSessionsResponse, + LoadSessionResponse, + NewSessionResponse, + PromptResponse, + RequestPermissionRequest, + RequestPermissionResponse, + SessionNotification, + SetSessionModeResponse, + SetSessionModelResponse, +} from '@agentclientprotocol/sdk'; + +type AcpSdkConnection = Pick< + ClientSideConnection, + | 'authenticate' + | 'cancel' + | 'extMethod' + | 'initialize' + | 'loadSession' + | 'newSession' + | 'prompt' + | 'setSessionMode' + | 'unstable_listSessions' + | 'unstable_setSessionModel' +>; + +type CreateAcpConnection = ( + clientFactory: (agent: Agent) => Client, + stdin: WritableStream, + stdout: ReadableStream, +) => AcpSdkConnection; + +type SpawnProcess = ( + command: string, + args: string[], + options: SpawnOptions, +) => ChildProcess; + +export interface AcpProcessClientOptions { + cliEntryPath: string; + cwd?: string; + command?: string; + channel?: 'Desktop'; + extraArgs?: string[]; + env?: NodeJS.ProcessEnv; + startupDelayMs?: number; + validateCliPath?: boolean; + spawnProcess?: SpawnProcess; + createConnection?: CreateAcpConnection; +} + +export interface ListSessionsOptions { + cwd?: string; + cursor?: number; + size?: number; +} + +export class AcpProcessClient { + private child: ChildProcess | null = null; + private connection: AcpSdkConnection | null = null; + private readonly options: Required< + Pick< + AcpProcessClientOptions, + 'channel' | 'startupDelayMs' | 'validateCliPath' + > + > & + Omit< + AcpProcessClientOptions, + 'channel' | 'startupDelayMs' | 'validateCliPath' + >; + + onSessionUpdate: (notification: SessionNotification) => void = () => {}; + onPermissionRequest: ( + request: RequestPermissionRequest, + ) => Promise = async () => ({ + outcome: { outcome: 'cancelled' }, + }); + onExtNotification: ( + method: string, + params: Record, + ) => Promise = async () => {}; + onDisconnected: (code: number | null, signal: string | null) => void = + () => {}; + onInitialized: (response: InitializeResponse) => void = () => {}; + + constructor(options: AcpProcessClientOptions) { + this.options = { + ...options, + channel: options.channel ?? 'Desktop', + startupDelayMs: options.startupDelayMs ?? 1000, + validateCliPath: options.validateCliPath ?? true, + }; + } + + get isConnected(): boolean { + return ( + this.child !== null && + !this.child.killed && + this.child.exitCode === null && + this.connection !== null + ); + } + + async connect(): Promise { + if (this.child) { + this.disconnect(); + } + + if ( + this.options.validateCliPath && + !existsSync(this.options.cliEntryPath) + ) { + throw new Error( + `Qwen CLI ACP entry not found: ${this.options.cliEntryPath}`, + ); + } + + const child = this.spawnChild(); + const stderrChunks: string[] = []; + let spawnError: Error | null = null; + let startupComplete = false; + + const processExitPromise = new Promise((_resolve, reject) => { + child.on('exit', (code: number | null, signal: string | null) => { + const stderrOutput = stderrChunks.join('').trim(); + const stderrSuffix = stderrOutput + ? `\nCLI stderr: ${stderrOutput.slice(-500)}` + : ''; + + if (this.child === child) { + this.child = null; + this.connection = null; + this.onDisconnected(code, signal); + } + + if (!startupComplete) { + reject( + new Error( + `Qwen ACP process exited unexpectedly (exit code: ${code}, signal: ${signal})${stderrSuffix}`, + ), + ); + } + }); + }); + + child.stderr?.on('data', (data: Buffer) => { + stderrChunks.push(data.toString()); + }); + child.on('error', (error: Error) => { + spawnError = error; + }); + + this.child = child; + + if (this.options.startupDelayMs > 0) { + await delay(this.options.startupDelayMs); + } + + if (spawnError) { + throw spawnError; + } + if (child.killed || child.exitCode !== null) { + throw new Error('Qwen ACP process failed to start.'); + } + + const connection = this.createSdkConnection(child); + this.connection = connection; + + const initializeResponse = await Promise.race([ + connection.initialize({ + protocolVersion: PROTOCOL_VERSION, + clientCapabilities: this.getClientCapabilities(), + }), + processExitPromise, + ]).finally(() => { + startupComplete = true; + }); + + this.onInitialized(initializeResponse); + return initializeResponse; + } + + disconnect(): void { + if (this.child) { + this.child.kill(); + } + + this.child = null; + this.connection = null; + } + + async authenticate(methodId = 'default'): Promise { + return this.ensureConnection().authenticate({ methodId }); + } + + async newSession(cwd: string): Promise { + return this.ensureConnection().newSession({ cwd, mcpServers: [] }); + } + + async loadSession( + sessionId: string, + cwd: string, + ): Promise { + return this.ensureConnection().loadSession({ + sessionId, + cwd, + mcpServers: [], + }); + } + + async listSessions( + options: ListSessionsOptions = {}, + ): Promise { + const params: Parameters[0] = { + cwd: options.cwd ?? this.options.cwd ?? process.cwd(), + }; + + if (options.cursor !== undefined) { + params.cursor = String(options.cursor); + } + if (options.size !== undefined) { + params._meta = { ...(params._meta ?? {}), size: options.size }; + } + + return this.ensureConnection().unstable_listSessions(params); + } + + async prompt( + sessionId: string, + prompt: string | ContentBlock[], + ): Promise { + const promptBlocks = + typeof prompt === 'string' ? [{ type: 'text', text: prompt }] : prompt; + return this.ensureConnection().prompt({ + sessionId, + prompt: promptBlocks, + }); + } + + async cancel(sessionId: string): Promise { + await this.ensureConnection().cancel({ sessionId }); + } + + async setMode( + sessionId: string, + modeId: string, + ): Promise { + return this.ensureConnection().setSessionMode({ sessionId, modeId }); + } + + async setModel( + sessionId: string, + modelId: string, + ): Promise { + return this.ensureConnection().unstable_setSessionModel({ + sessionId, + modelId, + }); + } + + async extMethod>( + method: string, + params: Record, + ): Promise { + return (await this.ensureConnection().extMethod(method, params)) as T; + } + + private spawnChild(): ChildProcess { + const command = this.options.command ?? process.execPath; + const args = [ + this.options.cliEntryPath, + '--acp', + `--channel=${this.options.channel}`, + ...(this.options.extraArgs ?? []), + ]; + const env = { ...process.env, ...(this.options.env ?? {}) }; + + return (this.options.spawnProcess ?? spawn)(command, args, { + cwd: this.options.cwd ?? process.cwd(), + env, + shell: false, + stdio: ['pipe', 'pipe', 'pipe'], + }); + } + + private createSdkConnection(child: ChildProcess): AcpSdkConnection { + const stdout = Readable.toWeb(child.stdout!) as ReadableStream; + const stdin = Writable.toWeb(child.stdin!) as WritableStream; + const createConnection = + this.options.createConnection ?? createSdkConnection; + + return createConnection( + () => ({ + sessionUpdate: (params: SessionNotification): Promise => { + this.onSessionUpdate(params); + return Promise.resolve(); + }, + requestPermission: ( + params: RequestPermissionRequest, + ): Promise => + this.onPermissionRequest(params), + extNotification: ( + method: string, + params: Record, + ): Promise => this.onExtNotification(method, params), + }), + stdin, + stdout, + ); + } + + private ensureConnection(): AcpSdkConnection { + if (!this.isConnected || !this.connection) { + throw new Error('Not connected to ACP agent'); + } + + return this.connection; + } + + private getClientCapabilities(): ClientCapabilities { + return {}; + } +} + +function createSdkConnection( + clientFactory: (agent: Agent) => Client, + stdin: WritableStream, + stdout: ReadableStream, +): AcpSdkConnection { + return new ClientSideConnection(clientFactory, ndJsonStream(stdin, stdout)); +} + +async function delay(ms: number): Promise { + await new Promise((resolve) => setTimeout(resolve, ms)); +}