diff --git a/design/qwen-code-electron-desktop-implementation-plan.md b/design/qwen-code-electron-desktop-implementation-plan.md index 3146d3a30..9d361640f 100644 --- a/design/qwen-code-electron-desktop-implementation-plan.md +++ b/design/qwen-code-electron-desktop-implementation-plan.md @@ -107,7 +107,7 @@ order, verification, decisions, and remaining work. ### Slice 5: WebSocket Chat Loop -- Status: pending +- Status: in progress - Goal: add per-session WS connections and send user prompts through ACP. - Files: - `packages/desktop/src/server/ws/SessionSocketHub.ts` @@ -118,6 +118,12 @@ order, verification, decisions, and remaining work. - WS handshake validates session id and token. - One active prompt per session is enforced. - Renderer receives normalized assistant/tool/usage events. +- Progress: + - 2026-04-25: authenticated `/ws/:sessionId` handshake, `ping`/`pong`, + `user_message` to ACP `prompt`, `stop_generation` to ACP `cancel`, and + one-active-prompt guard are implemented on the server. + - Remaining: ACP `sessionUpdate` normalization, renderer WebSocket client, + and chat store integration. - Verification: - `npm run test --workspace=packages/desktop` - fake ACP integration test for prompt and stream completion @@ -236,6 +242,18 @@ order, verification, decisions, and remaining work. - `npm run typecheck` passed across workspaces. - `npm run build` passed across the configured build order. Existing VS Code companion lint warnings were reported by its build script, with no errors. +- 2026-04-25 Slice 5a: + - `npm install --ignore-scripts --workspace=@qwen-code/desktop` passed. + - `npx prettier --check design/qwen-code-electron-desktop-implementation-plan.md scripts/build.js packages/desktop` passed. + - `npm run test --workspace=packages/desktop` passed: 2 files, 21 tests. + - `npm run lint --workspace=packages/desktop` passed. + - `npm run typecheck --workspace=packages/desktop` passed. + - `npm run build --workspace=packages/desktop` passed. + - `npm exec --workspace=packages/desktop -- electron --version` passed: + `v41.3.0`. + - `npm run typecheck` passed across workspaces. + - `npm run build` passed across the configured build order. Existing VS Code + companion lint warnings were reported by its build script, with no errors. ## Self Review Notes @@ -272,10 +290,18 @@ order, verification, decisions, and remaining work. - Electron main intentionally does not auto-start real ACP yet; CLI path resolution and packaged `ELECTRON_RUN_AS_NODE=1` behavior remain for the packaging/runtime integration slices. +- 2026-04-25 Slice 5a: + - WebSocket upgrade uses the same local-origin policy and random token as the + REST API. + - The hub defaults to an `acp_unavailable` error when no ACP client is + injected, rather than silently dropping user messages. + - Session update broadcasting is intentionally a follow-up; this keeps the + prompt/cancel transport independently testable before event normalization. ## Remaining Work -- Commit Slice 4. -- Continue with Slice 5 WebSocket chat loop. +- Commit Slice 5a. +- Continue with ACP event normalization and renderer WebSocket client for the + rest of Slice 5. - Continue through the ACP, session, WebSocket, permission, settings, and packaging slices until the architecture MVP is fully verified. diff --git a/package-lock.json b/package-lock.json index 27bcde59a..830a40378 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18789,12 +18789,14 @@ "@agentclientprotocol/sdk": "^0.14.1", "@qwen-code/webui": "file:../webui", "react": "^19.1.0", - "react-dom": "^19.1.0" + "react-dom": "^19.1.0", + "ws": "^8.18.0" }, "devDependencies": { "@types/node": "^20.11.24", "@types/react": "^19.1.8", "@types/react-dom": "^19.1.6", + "@types/ws": "^8.5.10", "@vitejs/plugin-react": "^4.2.0", "electron": "^41.3.0", "esbuild": "^0.25.0", diff --git a/packages/desktop/package.json b/packages/desktop/package.json index 3e6b6d045..7c6702e04 100644 --- a/packages/desktop/package.json +++ b/packages/desktop/package.json @@ -20,12 +20,14 @@ "@agentclientprotocol/sdk": "^0.14.1", "@qwen-code/webui": "file:../webui", "react": "^19.1.0", - "react-dom": "^19.1.0" + "react-dom": "^19.1.0", + "ws": "^8.18.0" }, "devDependencies": { "@types/node": "^20.11.24", "@types/react": "^19.1.8", "@types/react-dom": "^19.1.6", + "@types/ws": "^8.5.10", "@vitejs/plugin-react": "^4.2.0", "electron": "^41.3.0", "esbuild": "^0.25.0", diff --git a/packages/desktop/src/server/index.test.ts b/packages/desktop/src/server/index.test.ts index fa11017a8..494053d3a 100644 --- a/packages/desktop/src/server/index.test.ts +++ b/packages/desktop/src/server/index.test.ts @@ -5,6 +5,7 @@ */ import { afterEach, describe, expect, it, vi } from 'vitest'; +import { WebSocket } from 'ws'; import { startDesktopServer } from './index.js'; import type { DesktopServer } from './types.js'; import type { AcpSessionClient } from './services/sessionService.js'; @@ -221,6 +222,62 @@ describe('DesktopServer', () => { }); }); + it('accepts authenticated session WebSocket connections', async () => { + const server = await createTestServer(createAcpClient()); + const testSocket = await connectSocket(server, '/ws/session-1'); + + expect(await testSocket.readMessage()).toMatchObject({ + type: 'connected', + sessionId: 'session-1', + }); + + testSocket.socket.send(JSON.stringify({ type: 'ping' })); + expect(await testSocket.readMessage()).toMatchObject({ type: 'pong' }); + testSocket.socket.close(); + }); + + it('sends user messages to ACP prompt over WebSocket', async () => { + const acpClient = createAcpClient(); + const server = await createTestServer(acpClient); + const testSocket = await connectSocket(server, '/ws/session-1'); + await testSocket.readMessage(); + + testSocket.socket.send( + JSON.stringify({ type: 'user_message', content: 'hello' }), + ); + + expect(await testSocket.readMessage()).toMatchObject({ + type: 'message_complete', + stopReason: 'end_turn', + }); + expect(acpClient.prompt).toHaveBeenCalledWith('session-1', 'hello'); + testSocket.socket.close(); + }); + + it('cancels generation over WebSocket', async () => { + const acpClient = createAcpClient(); + const server = await createTestServer(acpClient); + const testSocket = await connectSocket(server, '/ws/session-1'); + await testSocket.readMessage(); + + testSocket.socket.send(JSON.stringify({ type: 'stop_generation' })); + + expect(await testSocket.readMessage()).toMatchObject({ + type: 'message_complete', + stopReason: 'cancelled', + }); + expect(acpClient.cancel).toHaveBeenCalledWith('session-1'); + testSocket.socket.close(); + }); + + it('rejects WebSocket connections without the desktop token', async () => { + const server = await createTestServer(createAcpClient()); + + await expect( + connectSocket(server, '/ws/session-1', 'wrong-token'), + ).rejects.toThrow(); + }); + it('returns a typed error for unknown authenticated routes', async () => { const server = await createTestServer(); @@ -321,6 +378,52 @@ function createAcpClient(): AcpSessionClient { }), newSession: vi.fn().mockResolvedValue({ sessionId: 'session-1' }), loadSession: vi.fn().mockResolvedValue({ models: [] }), + prompt: vi.fn().mockResolvedValue({ stopReason: 'end_turn' }), + cancel: vi.fn().mockResolvedValue(undefined), extMethod: vi.fn().mockResolvedValue({ success: true }), }; } + +async function connectSocket( + server: DesktopServer, + path: string, + token = server.info.token, +): Promise<{ + socket: WebSocket; + readMessage(): Promise; +}> { + const url = new URL(path, server.info.url.replace('http:', 'ws:')); + url.searchParams.set('token', token); + const socket = new WebSocket(url); + const messages: unknown[] = []; + const messageWaiters: Array<(message: unknown) => void> = []; + + socket.on('message', (data) => { + const parsed = JSON.parse(data.toString()) as unknown; + const waiter = messageWaiters.shift(); + if (waiter) { + waiter(parsed); + } else { + messages.push(parsed); + } + }); + + await new Promise((resolve, reject) => { + socket.once('open', () => resolve()); + socket.once('error', reject); + }); + + return { + socket, + readMessage: () => { + const message = messages.shift(); + if (message) { + return Promise.resolve(message); + } + + return new Promise((resolve) => { + messageWaiters.push(resolve); + }); + }, + }; +} diff --git a/packages/desktop/src/server/index.ts b/packages/desktop/src/server/index.ts index dd6a8996e..a43ac5d9a 100644 --- a/packages/desktop/src/server/index.ts +++ b/packages/desktop/src/server/index.ts @@ -20,6 +20,7 @@ import { import { isDesktopHttpError, DesktopHttpError } from './http/errors.js'; import { getRuntimeInfo } from './services/runtimeService.js'; import { DesktopSessionService } from './services/sessionService.js'; +import { SessionSocketHub } from './ws/SessionSocketHub.js'; import type { DesktopJsonResponse, DesktopServer, @@ -40,6 +41,10 @@ export async function startDesktopServer( const now = options.now ?? (() => new Date()); const startedAt = now().getTime(); const sessionService = new DesktopSessionService(options.acpClient); + const socketHub = new SessionSocketHub({ + token, + acpClient: options.acpClient, + }); const server = createServer((request, response) => { void handleRequest(request, response, { token, @@ -67,6 +72,9 @@ export async function startDesktopServer( }); }); }); + server.on('upgrade', (request, socket, head) => { + socketHub.handleUpgrade(request, socket, head); + }); await new Promise((resolve, reject) => { const handleError = (error: Error) => { @@ -94,7 +102,10 @@ export async function startDesktopServer( url: `http://127.0.0.1:${address.port}`, token, }, - close: () => closeHttpServer(server), + close: async () => { + socketHub.close(); + await closeHttpServer(server); + }, }; } diff --git a/packages/desktop/src/server/services/sessionService.ts b/packages/desktop/src/server/services/sessionService.ts index 5ab32415f..08c3dec5b 100644 --- a/packages/desktop/src/server/services/sessionService.ts +++ b/packages/desktop/src/server/services/sessionService.ts @@ -8,6 +8,7 @@ import type { ListSessionsResponse, LoadSessionResponse, NewSessionResponse, + PromptResponse, } from '@agentclientprotocol/sdk'; import { DesktopHttpError } from '../http/errors.js'; @@ -21,6 +22,8 @@ export interface AcpSessionClient { }): Promise; newSession(cwd: string): Promise; loadSession(sessionId: string, cwd: string): Promise; + prompt(sessionId: string, prompt: string): Promise; + cancel(sessionId: string): Promise; extMethod>( method: string, params: Record, diff --git a/packages/desktop/src/server/ws/SessionSocketHub.ts b/packages/desktop/src/server/ws/SessionSocketHub.ts new file mode 100644 index 000000000..91463b4fd --- /dev/null +++ b/packages/desktop/src/server/ws/SessionSocketHub.ts @@ -0,0 +1,270 @@ +/** + * @license + * Copyright 2026 Qwen Team + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { IncomingMessage } from 'node:http'; +import type { Duplex } from 'node:stream'; +import { WebSocket, WebSocketServer } from 'ws'; +import { getSingleHeader, isAllowedOrigin } from '../http/auth.js'; +import type { AcpSessionClient } from '../services/sessionService.js'; + +type ClientMessage = + | { type: 'ping' } + | { type: 'stop_generation' } + | { type: 'user_message'; content: string }; + +type ServerMessage = + | { type: 'connected'; sessionId: string } + | { type: 'pong' } + | { type: 'message_complete'; stopReason?: string } + | { type: 'error'; code: string; message: string; retryable?: boolean }; + +interface SessionSocketHubOptions { + token: string; + acpClient?: AcpSessionClient; +} + +export class SessionSocketHub { + private readonly server = new WebSocketServer({ noServer: true }); + private readonly socketsBySession = new Map>(); + private readonly activePrompts = new Set(); + + constructor(private readonly options: SessionSocketHubOptions) {} + + handleUpgrade(request: IncomingMessage, socket: Duplex, head: Buffer): void { + const origin = getSingleHeader(request.headers.origin); + if (!isAllowedOrigin(origin)) { + rejectUpgrade(socket, 403, 'Forbidden'); + return; + } + + const requestUrl = parseSocketUrl(request); + if (!requestUrl) { + rejectUpgrade(socket, 400, 'Bad Request'); + return; + } + + const sessionId = matchSessionId(requestUrl.pathname); + if (!sessionId) { + rejectUpgrade(socket, 404, 'Not Found'); + return; + } + + if (requestUrl.searchParams.get('token') !== this.options.token) { + rejectUpgrade(socket, 401, 'Unauthorized'); + return; + } + + this.server.handleUpgrade(request, socket, head, (webSocket) => { + this.handleConnection(sessionId, webSocket); + }); + } + + close(): void { + for (const sockets of this.socketsBySession.values()) { + for (const socket of sockets) { + socket.close(); + } + } + this.socketsBySession.clear(); + this.activePrompts.clear(); + this.server.close(); + } + + broadcast(sessionId: string, message: ServerMessage): void { + const sockets = this.socketsBySession.get(sessionId); + if (!sockets) { + return; + } + + for (const socket of sockets) { + sendMessage(socket, message); + } + } + + private handleConnection(sessionId: string, socket: WebSocket): void { + const sockets = + this.socketsBySession.get(sessionId) ?? new Set(); + sockets.add(socket); + this.socketsBySession.set(sessionId, sockets); + + sendMessage(socket, { type: 'connected', sessionId }); + + socket.on('message', (rawMessage) => { + void this.handleClientMessage(sessionId, socket, rawMessage).catch( + (error: unknown) => { + sendMessage(socket, { + type: 'error', + code: 'internal_error', + message: + error instanceof Error + ? error.message + : 'WebSocket message handling failed.', + }); + }, + ); + }); + + socket.on('close', () => { + sockets.delete(socket); + if (sockets.size === 0) { + this.socketsBySession.delete(sessionId); + } + }); + } + + private async handleClientMessage( + sessionId: string, + socket: WebSocket, + rawMessage: WebSocket.RawData, + ): Promise { + const message = parseClientMessage(rawMessage); + if (!message) { + sendMessage(socket, { + type: 'error', + code: 'bad_message', + message: 'WebSocket message is invalid.', + }); + return; + } + + switch (message.type) { + case 'ping': + sendMessage(socket, { type: 'pong' }); + return; + case 'stop_generation': + await this.cancelPrompt(sessionId, socket); + return; + case 'user_message': + await this.sendPrompt(sessionId, socket, message.content); + return; + default: + sendMessage(socket, { + type: 'error', + code: 'bad_message', + message: 'WebSocket message is invalid.', + }); + } + } + + private async sendPrompt( + sessionId: string, + socket: WebSocket, + content: string, + ): Promise { + if (!this.options.acpClient) { + sendMessage(socket, { + type: 'error', + code: 'acp_unavailable', + message: 'ACP client is not configured.', + retryable: true, + }); + return; + } + + if (this.activePrompts.has(sessionId)) { + sendMessage(socket, { + type: 'error', + code: 'prompt_active', + message: 'A prompt is already running for this session.', + retryable: true, + }); + return; + } + + this.activePrompts.add(sessionId); + try { + const response = await this.options.acpClient.prompt(sessionId, content); + sendMessage(socket, { + type: 'message_complete', + stopReason: response.stopReason, + }); + } finally { + this.activePrompts.delete(sessionId); + } + } + + private async cancelPrompt( + sessionId: string, + socket: WebSocket, + ): Promise { + if (!this.options.acpClient) { + sendMessage(socket, { + type: 'error', + code: 'acp_unavailable', + message: 'ACP client is not configured.', + retryable: true, + }); + return; + } + + await this.options.acpClient.cancel(sessionId); + this.activePrompts.delete(sessionId); + sendMessage(socket, { type: 'message_complete', stopReason: 'cancelled' }); + } +} + +function parseSocketUrl(request: IncomingMessage): URL | null { + try { + return new URL(request.url ?? '/', 'ws://127.0.0.1'); + } catch { + return null; + } +} + +function matchSessionId(pathname: string): string | null { + const match = /^\/ws\/([^/]+)$/u.exec(pathname); + if (!match?.[1]) { + return null; + } + + return decodeURIComponent(match[1]); +} + +function parseClientMessage( + rawMessage: WebSocket.RawData, +): ClientMessage | null { + let parsed: unknown; + try { + parsed = JSON.parse(rawMessage.toString()) as unknown; + } catch { + return null; + } + + if (!parsed || typeof parsed !== 'object' || !('type' in parsed)) { + return null; + } + + const candidate = parsed as Partial; + if (candidate.type === 'ping' || candidate.type === 'stop_generation') { + return { type: candidate.type }; + } + + if ( + candidate.type === 'user_message' && + typeof candidate.content === 'string' + ) { + return { type: 'user_message', content: candidate.content }; + } + + return null; +} + +function sendMessage(socket: WebSocket, message: ServerMessage): void { + if (socket.readyState === WebSocket.OPEN) { + socket.send(JSON.stringify(message)); + } +} + +function rejectUpgrade( + socket: Duplex, + statusCode: number, + message: string, +): void { + socket.write( + `HTTP/1.1 ${statusCode} ${message}\r\nConnection: close\r\n\r\n`, + ); + socket.destroy(); +}