feat(desktop): complete session websocket chat loop

This commit is contained in:
DragonnZhang 2026-04-25 03:04:14 +08:00
parent 9b0ec190e7
commit bbab16f3b8
13 changed files with 1723 additions and 31 deletions

View file

@ -107,7 +107,7 @@ order, verification, decisions, and remaining work.
### Slice 5: WebSocket Chat Loop
- Status: in progress
- Status: complete
- Goal: add per-session WS connections and send user prompts through ACP.
- Files:
- `packages/desktop/src/server/ws/SessionSocketHub.ts`
@ -122,8 +122,11 @@ order, verification, decisions, and remaining work.
- 2026-04-25: authenticated `/ws/:sessionId` handshake, `ping`/`pong`,
`user_message` to ACP `prompt`, `stop_generation` to ACP `cancel`, and
one-active-prompt guard are implemented on the server.
- Remaining: ACP `sessionUpdate` normalization, renderer WebSocket client,
and chat store integration.
- 2026-04-25: added `AcpEventRouter` normalization for ACP message, tool,
plan, mode, commands, and usage updates; routed session updates into the
per-session socket hub; added a renderer WebSocket client, chat reducer,
and basic workbench wiring for session selection, streaming messages,
tool updates, plan updates, usage, stop, and send.
- Verification:
- `npm run test --workspace=packages/desktop`
- fake ACP integration test for prompt and stream completion
@ -193,6 +196,11 @@ order, verification, decisions, and remaining work.
committing to an HTTP framework before the ACP routing shape is known.
- 2026-04-25: Allow CORS preflight without bearer auth, but only for allowed
app origins. Actual REST requests remain bearer-token protected.
- 2026-04-25: Keep ACP update normalization inside `packages/desktop` for now
instead of importing the VS Code session update handler. The desktop protocol
needs WebSocket message shapes, while the VS Code handler is callback/UI
oriented; shared extraction can happen after permission and settings slices
stabilize the common surface.
## Verification Log
@ -254,6 +262,11 @@ order, verification, decisions, and remaining work.
- `npm run typecheck` passed across workspaces.
- `npm run build` passed across the configured build order. Existing VS Code
companion lint warnings were reported by its build script, with no errors.
- 2026-04-25 Slice 5b:
- `npm run test --workspace=packages/desktop` passed: 3 files, 26 tests.
- `npm run lint --workspace=packages/desktop` passed.
- `npm run typecheck --workspace=packages/desktop` passed.
- `npm run build --workspace=packages/desktop` passed.
## Self Review Notes
@ -297,11 +310,21 @@ order, verification, decisions, and remaining work.
injected, rather than silently dropping user messages.
- Session update broadcasting is intentionally a follow-up; this keeps the
prompt/cancel transport independently testable before event normalization.
- 2026-04-25 Slice 5b:
- ACP session updates now broadcast only to sockets for the matching session;
tests cover a second session socket receiving only its own `pong`.
- Renderer chat state consumes the shared desktop WebSocket protocol without
Node access and keeps the server token in memory from preload-provided
server info.
- Main still does not auto-start a real ACP child process; the chat loop is
verified with an injected fake ACP client and remains ready for the runtime
integration slice.
## Remaining Work
- Commit Slice 5a.
- Continue with ACP event normalization and renderer WebSocket client for the
rest of Slice 5.
- Continue through the ACP, session, WebSocket, permission, settings, and
packaging slices until the architecture MVP is fully verified.
- Commit Slice 5b.
- Start Slice 6 permission bridge: route ACP permission and ask-user-question
callbacks through the existing per-session WebSocket channel with timeout
cancellation.
- Continue through permission, settings/auth/model/mode, real ACP runtime
startup, and packaging slices until the architecture MVP is fully verified.

View file

@ -4,11 +4,32 @@
* SPDX-License-Identifier: Apache-2.0
*/
import { useEffect, useMemo, useState } from 'react';
import {
type FormEvent,
useCallback,
useEffect,
useMemo,
useReducer,
useRef,
useState,
} from 'react';
import {
createDesktopSession,
listDesktopSessions,
loadDesktopStatus,
type DesktopConnectionStatus,
type DesktopSessionSummary,
} from './api/client.js';
import {
connectSessionSocket,
type SessionSocketClient,
} from './api/websocket.js';
import {
chatReducer,
createInitialChatState,
type ChatState,
type ChatTimelineItem,
} from './stores/chatStore.js';
type LoadState =
| { state: 'loading' }
@ -17,6 +38,17 @@ type LoadState =
export function App() {
const [loadState, setLoadState] = useState<LoadState>({ state: 'loading' });
const [workspacePath, setWorkspacePath] = useState('');
const [sessions, setSessions] = useState<DesktopSessionSummary[]>([]);
const [activeSessionId, setActiveSessionId] = useState<string | null>(null);
const [sessionError, setSessionError] = useState<string | null>(null);
const [messageText, setMessageText] = useState('');
const [chatState, dispatchChat] = useReducer(
chatReducer,
undefined,
createInitialChatState,
);
const socketRef = useRef<SessionSocketClient | null>(null);
useEffect(() => {
let disposed = false;
@ -47,6 +79,110 @@ export function App() {
};
}, []);
useEffect(() => {
if (loadState.state !== 'ready') {
return;
}
let disposed = false;
void listDesktopSessions(
loadState.status.serverInfo,
workspacePath || undefined,
)
.then((result) => {
if (!disposed) {
setSessions(result.sessions);
setSessionError(null);
}
})
.catch((error: unknown) => {
if (!disposed) {
setSessionError(getErrorMessage(error));
}
});
return () => {
disposed = true;
};
}, [loadState, workspacePath]);
useEffect(() => {
socketRef.current?.close();
socketRef.current = null;
if (loadState.state !== 'ready' || !activeSessionId) {
return;
}
dispatchChat({ type: 'connect' });
const socket = connectSessionSocket(
loadState.status.serverInfo,
activeSessionId,
{
onMessage: (message) =>
dispatchChat({ type: 'server_message', message }),
onClose: () => dispatchChat({ type: 'disconnect' }),
onError: () => setSessionError('Session socket connection failed.'),
},
);
socketRef.current = socket;
return () => {
socket.close();
if (socketRef.current === socket) {
socketRef.current = null;
}
};
}, [activeSessionId, loadState]);
const chooseWorkspace = useCallback(async () => {
try {
const selectedPath = await window.qwenDesktop.selectDirectory();
if (selectedPath) {
setWorkspacePath(selectedPath);
}
} catch (error) {
setSessionError(getErrorMessage(error));
}
}, []);
const createSession = useCallback(async () => {
if (loadState.state !== 'ready' || !workspacePath) {
return;
}
try {
const session = await createDesktopSession(
loadState.status.serverInfo,
workspacePath,
);
setSessions((current) => [session, ...current]);
setActiveSessionId(session.sessionId);
setSessionError(null);
} catch (error) {
setSessionError(getErrorMessage(error));
}
}, [loadState, workspacePath]);
const sendMessage = useCallback(
(event: FormEvent<HTMLFormElement>) => {
event.preventDefault();
const content = messageText.trim();
if (!content || !socketRef.current) {
return;
}
dispatchChat({ type: 'append_user_message', content });
socketRef.current.sendUserMessage(content);
setMessageText('');
},
[messageText],
);
const stopGeneration = useCallback(() => {
socketRef.current?.stopGeneration();
}, []);
const statusLabel = useMemo(() => {
if (loadState.state === 'ready') {
return 'Connected';
@ -74,12 +210,28 @@ export function App() {
<section className="sidebar-section">
<h2>Workspace</h2>
<div className="empty-row">No folder selected</div>
<div className="workspace-path">
{workspacePath || 'No folder selected'}
</div>
<button className="secondary-button" onClick={chooseWorkspace}>
Select Folder
</button>
<button
className="primary-button"
disabled={loadState.state !== 'ready' || !workspacePath}
onClick={createSession}
>
New Session
</button>
</section>
<section className="sidebar-section sidebar-section-fill">
<h2>Sessions</h2>
<div className="empty-row">No sessions</div>
<SessionList
activeSessionId={activeSessionId}
sessions={sessions}
onSelect={setActiveSessionId}
/>
</section>
</aside>
@ -96,9 +248,38 @@ export function App() {
<section className="panel panel-main">
<div className="panel-header">
<h3>Conversation</h3>
<span>Idle</span>
<span>
{chatState.streaming ? 'Streaming' : chatState.connection}
</span>
</div>
<div className="conversation-empty">No session selected</div>
<ChatTimeline state={chatState} activeSessionId={activeSessionId} />
<form className="composer" onSubmit={sendMessage}>
<textarea
aria-label="Message"
disabled={!activeSessionId}
onChange={(event) => setMessageText(event.target.value)}
placeholder={activeSessionId ? 'Message Qwen Code' : ''}
rows={3}
value={messageText}
/>
<div className="composer-actions">
<button
className="secondary-button"
disabled={!chatState.streaming}
type="button"
onClick={stopGeneration}
>
Stop
</button>
<button
className="primary-button"
disabled={!activeSessionId || messageText.trim().length === 0}
type="submit"
>
Send
</button>
</div>
</form>
</section>
<section className="panel panel-side">
@ -106,6 +287,11 @@ export function App() {
<h3>Runtime</h3>
</div>
<RuntimeDetails loadState={loadState} />
<SessionDetails
activeSessionId={activeSessionId}
chatState={chatState}
sessionError={sessionError}
/>
</section>
</div>
</section>
@ -113,6 +299,102 @@ export function App() {
);
}
function SessionList({
activeSessionId,
sessions,
onSelect,
}: {
activeSessionId: string | null;
sessions: DesktopSessionSummary[];
onSelect: (sessionId: string) => void;
}) {
if (sessions.length === 0) {
return <div className="empty-row">No sessions</div>;
}
return (
<div className="session-list">
{sessions.map((session) => (
<button
className={
session.sessionId === activeSessionId
? 'session-row session-row-active'
: 'session-row'
}
key={session.sessionId}
onClick={() => onSelect(session.sessionId)}
>
<span>{session.title || session.sessionId}</span>
<small>{session.cwd || session.sessionId}</small>
</button>
))}
</div>
);
}
function ChatTimeline({
activeSessionId,
state,
}: {
activeSessionId: string | null;
state: ChatState;
}) {
if (!activeSessionId) {
return <div className="conversation-empty">No session selected</div>;
}
if (state.items.length === 0) {
return <div className="conversation-empty">Session ready</div>;
}
return (
<div className="chat-timeline">
{state.items.map((item) => (
<TimelineItem item={item} key={item.id} />
))}
</div>
);
}
function TimelineItem({ item }: { item: ChatTimelineItem }) {
if (item.type === 'message') {
return (
<article className={`chat-message chat-message-${item.role}`}>
<div className="message-role">{item.role}</div>
<p>{item.text}</p>
</article>
);
}
if (item.type === 'tool') {
return (
<article className="chat-tool">
<div className="message-role">{item.toolCall.kind || 'tool'}</div>
<strong>{item.toolCall.title || item.toolCall.toolCallId}</strong>
{item.toolCall.status ? <span>{item.toolCall.status}</span> : null}
</article>
);
}
if (item.type === 'plan') {
return (
<article className="chat-plan">
<div className="message-role">plan</div>
<ol>
{item.entries.map((entry) => (
<li key={`${entry.content}-${entry.status}`}>
<span>{entry.status}</span>
{entry.content}
</li>
))}
</ol>
</article>
);
}
return <div className="chat-event">{item.label}</div>;
}
function StatusPill({ state }: { state: LoadState['state'] }) {
return <span className={`status-pill status-pill-${state}`}>{state}</span>;
}
@ -160,3 +442,53 @@ function RuntimeDetails({ loadState }: { loadState: LoadState }) {
</dl>
);
}
function SessionDetails({
activeSessionId,
chatState,
sessionError,
}: {
activeSessionId: string | null;
chatState: ChatState;
sessionError: string | null;
}) {
return (
<div className="session-details">
<div className="panel-header panel-header-inline">
<h3>Session</h3>
</div>
<dl className="runtime-details">
<div>
<dt>Active</dt>
<dd>{activeSessionId || 'None'}</dd>
</div>
<div>
<dt>Mode</dt>
<dd>{chatState.mode || 'Unknown'}</dd>
</div>
<div>
<dt>Commands</dt>
<dd>{chatState.availableCommands.length}</dd>
</div>
<div>
<dt>Skills</dt>
<dd>{chatState.availableSkills.length}</dd>
</div>
<div>
<dt>Tokens</dt>
<dd>{chatState.latestUsage?.usage?.totalTokens ?? 'Unknown'}</dd>
</div>
{sessionError || chatState.error ? (
<div>
<dt>Error</dt>
<dd className="error-text">{sessionError || chatState.error}</dd>
</div>
) : null}
</dl>
</div>
);
}
function getErrorMessage(error: unknown): string {
return error instanceof Error ? error.message : 'Desktop operation failed.';
}

View file

@ -14,6 +14,7 @@ export interface DesktopHealth {
}
export interface DesktopConnectionStatus {
serverInfo: DesktopServerInfo;
serverUrl: string;
health: DesktopHealth;
runtime: DesktopRuntime;
@ -42,6 +43,17 @@ export interface DesktopRuntime {
};
}
export interface DesktopSessionSummary {
sessionId: string;
title?: string;
cwd?: string;
}
export interface DesktopSessionList {
sessions: DesktopSessionSummary[];
nextCursor?: string;
}
export async function loadDesktopStatus(): Promise<DesktopConnectionStatus> {
const serverInfo = await getServerInfo();
const [health, runtime] = await Promise.all([
@ -50,12 +62,39 @@ export async function loadDesktopStatus(): Promise<DesktopConnectionStatus> {
]);
return {
serverInfo,
serverUrl: serverInfo.url,
health,
runtime,
};
}
export async function listDesktopSessions(
serverInfo: DesktopServerInfo,
cwd?: string,
): Promise<DesktopSessionList> {
const url = new URL('/api/sessions', serverInfo.url);
if (cwd) {
url.searchParams.set('cwd', cwd);
}
return getJson(serverInfo, `${url.pathname}${url.search}`, isSessionList);
}
export async function createDesktopSession(
serverInfo: DesktopServerInfo,
cwd: string,
): Promise<DesktopSessionSummary> {
const response = await writeJson(
serverInfo,
'/api/sessions',
'POST',
{ cwd },
isCreateSessionResponse,
);
return response.session;
}
async function getJson<T>(
serverInfo: DesktopServerInfo,
path: string,
@ -69,7 +108,31 @@ async function getJson<T>(
const payload = (await response.json()) as unknown;
if (!response.ok || !isExpectedPayload(payload)) {
throw new Error(`Desktop service request failed: ${path}`);
throw new Error(getResponseErrorMessage(payload, path));
}
return payload;
}
async function writeJson<T>(
serverInfo: DesktopServerInfo,
path: string,
method: 'POST',
body: Record<string, unknown>,
isExpectedPayload: (value: unknown) => value is T,
): Promise<T> {
const response = await fetch(new URL(path, serverInfo.url), {
method,
headers: {
Authorization: `Bearer ${serverInfo.token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
});
const payload = (await response.json()) as unknown;
if (!response.ok || !isExpectedPayload(payload)) {
throw new Error(getResponseErrorMessage(payload, path));
}
return payload;
@ -83,6 +146,19 @@ async function getServerInfo(): Promise<DesktopServerInfo> {
return window.qwenDesktop.getServerInfo();
}
function getResponseErrorMessage(payload: unknown, path: string): string {
if (
payload &&
typeof payload === 'object' &&
'message' in payload &&
typeof payload.message === 'string'
) {
return payload.message;
}
return `Desktop service request failed: ${path}`;
}
function isDesktopHealth(value: unknown): value is DesktopHealth {
if (!value || typeof value !== 'object') {
return false;
@ -146,3 +222,41 @@ function isDesktopRuntimePlatform(
typeof value.release === 'string'
);
}
function isSessionList(value: unknown): value is DesktopSessionList {
if (!value || typeof value !== 'object') {
return false;
}
const candidate = value as { sessions?: unknown; nextCursor?: unknown };
return (
Array.isArray(candidate.sessions) &&
candidate.sessions.every(isSessionSummary) &&
(typeof candidate.nextCursor === 'string' ||
candidate.nextCursor === undefined)
);
}
function isCreateSessionResponse(
value: unknown,
): value is { ok: true; session: DesktopSessionSummary } {
if (!value || typeof value !== 'object') {
return false;
}
const candidate = value as { ok?: unknown; session?: unknown };
return candidate.ok === true && isSessionSummary(candidate.session);
}
function isSessionSummary(value: unknown): value is DesktopSessionSummary {
if (!value || typeof value !== 'object') {
return false;
}
const candidate = value as Partial<DesktopSessionSummary>;
return (
typeof candidate.sessionId === 'string' &&
(typeof candidate.title === 'string' || candidate.title === undefined) &&
(typeof candidate.cwd === 'string' || candidate.cwd === undefined)
);
}

View file

@ -0,0 +1,98 @@
/**
* @license
* Copyright 2026 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
import type { DesktopServerInfo } from '../../shared/desktopApi.js';
import type {
DesktopClientMessage,
DesktopServerMessage,
} from '../../shared/desktopProtocol.js';
export interface SessionSocketHandlers {
onOpen?: () => void;
onMessage: (message: DesktopServerMessage) => void;
onClose?: (event: CloseEvent) => void;
onError?: (event: Event) => void;
}
export interface SessionSocketClient {
sendUserMessage(content: string): void;
stopGeneration(): void;
ping(): void;
close(): void;
}
export function connectSessionSocket(
serverInfo: DesktopServerInfo,
sessionId: string,
handlers: SessionSocketHandlers,
): SessionSocketClient {
const socket = new WebSocket(createSessionSocketUrl(serverInfo, sessionId));
socket.addEventListener('open', () => handlers.onOpen?.());
socket.addEventListener('message', (event) => {
const message = parseServerMessage(event.data);
if (message) {
handlers.onMessage(message);
}
});
socket.addEventListener('close', (event) => handlers.onClose?.(event));
socket.addEventListener('error', (event) => handlers.onError?.(event));
return {
sendUserMessage(content: string): void {
sendClientMessage(socket, { type: 'user_message', content });
},
stopGeneration(): void {
sendClientMessage(socket, { type: 'stop_generation' });
},
ping(): void {
sendClientMessage(socket, { type: 'ping' });
},
close(): void {
socket.close();
},
};
}
function createSessionSocketUrl(
serverInfo: DesktopServerInfo,
sessionId: string,
): string {
const url = new URL(`/ws/${encodeURIComponent(sessionId)}`, serverInfo.url);
url.protocol = url.protocol === 'https:' ? 'wss:' : 'ws:';
url.searchParams.set('token', serverInfo.token);
return url.toString();
}
function sendClientMessage(
socket: WebSocket,
message: DesktopClientMessage,
): void {
if (socket.readyState !== WebSocket.OPEN) {
return;
}
socket.send(JSON.stringify(message));
}
function parseServerMessage(value: unknown): DesktopServerMessage | null {
if (typeof value !== 'string') {
return null;
}
let parsed: unknown;
try {
parsed = JSON.parse(value) as unknown;
} catch {
return null;
}
if (!parsed || typeof parsed !== 'object' || !('type' in parsed)) {
return null;
}
return parsed as DesktopServerMessage;
}

View file

@ -0,0 +1,308 @@
/**
* @license
* Copyright 2026 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
import type {
DesktopAvailableCommand,
DesktopPlanEntry,
DesktopServerMessage,
DesktopToolCallUpdate,
DesktopUsageStats,
} from '../../shared/desktopProtocol.js';
type ChatConnectionState = 'idle' | 'connecting' | 'connected' | 'closed';
export type ChatTimelineItem =
| {
id: string;
type: 'message';
role: 'assistant' | 'thinking' | 'user';
text: string;
streaming: boolean;
timestamp: number;
}
| {
id: string;
type: 'tool';
toolCall: DesktopToolCallUpdate;
timestamp: number;
}
| {
id: string;
type: 'plan';
entries: DesktopPlanEntry[];
timestamp: number;
}
| {
id: string;
type: 'event';
label: string;
timestamp: number;
};
export interface ChatState {
connection: ChatConnectionState;
streaming: boolean;
items: ChatTimelineItem[];
latestUsage: DesktopUsageStats | null;
availableCommands: DesktopAvailableCommand[];
availableSkills: string[];
mode: string | null;
error: string | null;
}
export type ChatAction =
| { type: 'connect' }
| { type: 'disconnect' }
| { type: 'append_user_message'; content: string }
| { type: 'server_message'; message: DesktopServerMessage };
export function createInitialChatState(): ChatState {
return {
connection: 'idle',
streaming: false,
items: [],
latestUsage: null,
availableCommands: [],
availableSkills: [],
mode: null,
error: null,
};
}
export function chatReducer(state: ChatState, action: ChatAction): ChatState {
switch (action.type) {
case 'connect':
return {
...state,
connection: 'connecting',
error: null,
};
case 'disconnect':
return {
...state,
connection: 'closed',
streaming: false,
};
case 'append_user_message':
return {
...state,
streaming: true,
error: null,
items: [
...state.items,
createMessageItem('user', action.content, false),
],
};
case 'server_message':
return applyServerMessage(state, action.message);
default:
return state;
}
}
function applyServerMessage(
state: ChatState,
message: DesktopServerMessage,
): ChatState {
switch (message.type) {
case 'connected':
return {
...state,
connection: 'connected',
error: null,
items: [
...state.items,
createEventItem(`Connected to ${message.sessionId}`),
],
};
case 'pong':
return state;
case 'message_delta':
return {
...state,
streaming: true,
items: appendMessageDelta(state.items, message.role, message.text),
};
case 'tool_call':
return {
...state,
items: upsertToolCall(state.items, message.data),
};
case 'plan':
return {
...state,
items: upsertPlan(state.items, message.entries),
};
case 'usage':
return {
...state,
latestUsage: message.data,
};
case 'mode_changed':
return {
...state,
mode: message.mode,
};
case 'available_commands':
return {
...state,
availableCommands: message.commands,
availableSkills: message.skills,
};
case 'message_complete':
return {
...state,
streaming: false,
items: [
...markStreamingMessagesComplete(state.items),
createEventItem(
message.stopReason
? `Turn complete: ${message.stopReason}`
: 'Turn complete',
),
],
};
case 'error':
return {
...state,
streaming: false,
error: message.message,
items: [...state.items, createEventItem(message.message)],
};
default:
return state;
}
}
function appendMessageDelta(
items: ChatTimelineItem[],
role: 'assistant' | 'thinking' | 'user',
text: string,
): ChatTimelineItem[] {
const lastItem = items[items.length - 1];
if (
lastItem?.type === 'message' &&
lastItem.role === role &&
lastItem.streaming
) {
return [
...items.slice(0, -1),
{
...lastItem,
text: `${lastItem.text}${text}`,
},
];
}
return [...items, createMessageItem(role, text, true)];
}
function upsertToolCall(
items: ChatTimelineItem[],
update: DesktopToolCallUpdate,
): ChatTimelineItem[] {
const index = items.findIndex(
(item) =>
item.type === 'tool' && item.toolCall.toolCallId === update.toolCallId,
);
if (index === -1) {
return [...items, createToolItem(update)];
}
return items.map((item, itemIndex) => {
if (itemIndex !== index || item.type !== 'tool') {
return item;
}
return {
...item,
toolCall: {
...item.toolCall,
...update,
},
};
});
}
function upsertPlan(
items: ChatTimelineItem[],
entries: DesktopPlanEntry[],
): ChatTimelineItem[] {
const index = items.findIndex((item) => item.type === 'plan');
if (index === -1) {
return [...items, createPlanItem(entries)];
}
return items.map((item, itemIndex) =>
itemIndex === index && item.type === 'plan'
? { ...item, entries, timestamp: Date.now() }
: item,
);
}
function markStreamingMessagesComplete(
items: ChatTimelineItem[],
): ChatTimelineItem[] {
return items.map((item) =>
item.type === 'message' ? { ...item, streaming: false } : item,
);
}
function createMessageItem(
role: 'assistant' | 'thinking' | 'user',
text: string,
streaming: boolean,
): ChatTimelineItem {
return {
id: `message-${Date.now()}-${Math.random().toString(16).slice(2)}`,
type: 'message',
role,
text,
streaming,
timestamp: Date.now(),
};
}
function createToolItem(toolCall: DesktopToolCallUpdate): ChatTimelineItem {
return {
id: `tool-${toolCall.toolCallId}`,
type: 'tool',
toolCall,
timestamp: toolCall.timestamp ?? Date.now(),
};
}
function createPlanItem(entries: DesktopPlanEntry[]): ChatTimelineItem {
return {
id: 'plan-current',
type: 'plan',
entries,
timestamp: Date.now(),
};
}
function createEventItem(label: string): ChatTimelineItem {
return {
id: `event-${Date.now()}-${Math.random().toString(16).slice(2)}`,
type: 'event',
label,
timestamp: Date.now(),
};
}

View file

@ -28,6 +28,17 @@ textarea {
font: inherit;
}
button {
border: 0;
cursor: pointer;
}
button:disabled,
textarea:disabled {
cursor: not-allowed;
opacity: 0.55;
}
#root {
min-height: 100vh;
}
@ -114,6 +125,76 @@ textarea {
font-size: 13px;
}
.workspace-path {
min-height: 42px;
padding: 12px;
border: 1px solid rgba(238, 240, 237, 0.09);
overflow-wrap: anywhere;
color: #d8dcd6;
font-size: 13px;
}
.primary-button,
.secondary-button {
min-height: 36px;
padding: 0 12px;
border: 1px solid transparent;
font-size: 13px;
font-weight: 700;
letter-spacing: 0;
}
.primary-button {
background: #e7bd73;
color: #161311;
}
.secondary-button {
border-color: rgba(238, 240, 237, 0.13);
background: rgba(238, 240, 237, 0.04);
color: #d8dcd6;
}
.session-list {
display: grid;
gap: 8px;
}
.session-row {
display: grid;
gap: 4px;
width: 100%;
min-height: 54px;
padding: 10px 12px;
border: 1px solid rgba(238, 240, 237, 0.09);
background: transparent;
color: #d8dcd6;
text-align: left;
}
.session-row-active {
border-color: rgba(231, 189, 115, 0.55);
background: rgba(231, 189, 115, 0.1);
}
.session-row span,
.session-row small {
min-width: 0;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
}
.session-row span {
font-size: 13px;
font-weight: 700;
}
.session-row small {
color: #858c84;
font-size: 11px;
}
.workbench {
display: flex;
min-width: 0;
@ -208,6 +289,133 @@ textarea {
font-size: 14px;
}
.chat-timeline {
display: flex;
flex: 1;
min-height: 0;
flex-direction: column;
gap: 10px;
overflow: auto;
padding: 16px;
}
.chat-message,
.chat-tool,
.chat-plan {
display: grid;
gap: 8px;
max-width: min(760px, 100%);
padding: 12px 14px;
border: 1px solid rgba(238, 240, 237, 0.09);
background: rgba(238, 240, 237, 0.035);
}
.chat-message-user {
align-self: flex-end;
border-color: rgba(231, 189, 115, 0.36);
background: rgba(231, 189, 115, 0.1);
}
.chat-message-thinking {
border-color: rgba(113, 169, 239, 0.32);
background: rgba(113, 169, 239, 0.08);
}
.chat-message p,
.chat-tool strong,
.chat-tool span,
.chat-plan ol {
margin: 0;
}
.chat-message p {
white-space: pre-wrap;
color: #eef0ed;
font-size: 14px;
line-height: 1.55;
}
.message-role {
color: #9ca39b;
font-size: 11px;
font-weight: 800;
text-transform: uppercase;
}
.chat-tool {
border-color: rgba(99, 214, 157, 0.28);
}
.chat-tool strong {
color: #eef0ed;
font-size: 14px;
}
.chat-tool span {
color: #96e6ba;
font-size: 12px;
font-weight: 700;
}
.chat-plan ol {
display: grid;
gap: 8px;
padding-left: 18px;
}
.chat-plan li {
color: #d8dcd6;
font-size: 13px;
line-height: 1.45;
}
.chat-plan li span {
display: inline-block;
min-width: 88px;
color: #9ca39b;
font-size: 11px;
font-weight: 800;
text-transform: uppercase;
}
.chat-event {
align-self: center;
max-width: 100%;
padding: 6px 10px;
color: #858c84;
font-size: 12px;
}
.composer {
display: grid;
gap: 10px;
padding: 14px;
border-top: 1px solid rgba(238, 240, 237, 0.08);
background: rgba(16, 18, 20, 0.76);
}
.composer textarea {
width: 100%;
min-height: 76px;
resize: vertical;
border: 1px solid rgba(238, 240, 237, 0.13);
background: rgba(238, 240, 237, 0.035);
color: #eef0ed;
padding: 12px;
line-height: 1.45;
}
.composer textarea:focus {
border-color: rgba(231, 189, 115, 0.55);
outline: 0;
}
.composer-actions {
display: flex;
justify-content: flex-end;
gap: 8px;
}
.runtime-row,
.runtime-details {
padding: 16px;
@ -248,6 +456,14 @@ textarea {
color: #ff9a84;
}
.session-details {
border-top: 1px solid rgba(238, 240, 237, 0.08);
}
.panel-header-inline {
border-bottom: 0;
}
@media (max-width: 860px) {
.desktop-shell {
grid-template-columns: 1fr;

View file

@ -0,0 +1,175 @@
/**
* @license
* Copyright 2026 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
import { describe, expect, it } from 'vitest';
import type { SessionNotification } from '@agentclientprotocol/sdk';
import { normalizeSessionUpdate } from './AcpEventRouter.js';
describe('normalizeSessionUpdate', () => {
it('normalizes assistant text chunks and usage metadata', () => {
const messages = normalizeSessionUpdate({
sessionId: 'session-1',
update: {
sessionUpdate: 'agent_message_chunk',
content: { type: 'text', text: 'hello' },
_meta: {
usage: {
inputTokens: 11,
outputTokens: 7,
thoughtTokens: 2,
totalTokens: 20,
},
durationMs: 1500,
},
},
} as SessionNotification);
expect(messages).toEqual([
{ type: 'message_delta', role: 'assistant', text: 'hello' },
{
type: 'usage',
data: {
usage: {
inputTokens: 11,
outputTokens: 7,
thoughtTokens: 2,
totalTokens: 20,
cachedReadTokens: undefined,
cachedWriteTokens: undefined,
promptTokens: 11,
completionTokens: 7,
thoughtsTokens: 2,
cachedTokens: undefined,
},
durationMs: 1500,
},
},
]);
});
it('normalizes thought chunks, tool calls, plans, modes, and commands', () => {
expect(
normalizeSessionUpdate({
sessionId: 'session-1',
update: {
sessionUpdate: 'agent_thought_chunk',
content: { type: 'text', text: 'thinking' },
},
} as SessionNotification),
).toEqual([{ type: 'message_delta', role: 'thinking', text: 'thinking' }]);
expect(
normalizeSessionUpdate({
sessionId: 'session-1',
update: {
sessionUpdate: 'tool_call',
toolCallId: 'tool-1',
title: 'Read file',
kind: 'read',
status: 'pending',
rawInput: { path: 'README.md' },
locations: [{ path: 'README.md', line: 1 }],
_meta: { timestamp: 123 },
},
} as SessionNotification),
).toEqual([
{
type: 'tool_call',
data: {
toolCallId: 'tool-1',
title: 'Read file',
kind: 'read',
status: 'pending',
rawInput: { path: 'README.md' },
rawOutput: undefined,
content: undefined,
locations: [{ path: 'README.md', line: 1 }],
timestamp: 123,
},
},
]);
expect(
normalizeSessionUpdate({
sessionId: 'session-1',
update: {
sessionUpdate: 'plan',
entries: [
{ content: 'Implement', priority: 'high', status: 'in_progress' },
],
},
} as SessionNotification),
).toEqual([
{
type: 'plan',
entries: [
{ content: 'Implement', priority: 'high', status: 'in_progress' },
],
},
]);
expect(
normalizeSessionUpdate({
sessionId: 'session-1',
update: {
sessionUpdate: 'current_mode_update',
currentModeId: 'auto-edit',
},
} as SessionNotification),
).toEqual([{ type: 'mode_changed', mode: 'auto-edit' }]);
expect(
normalizeSessionUpdate({
sessionId: 'session-1',
update: {
sessionUpdate: 'available_commands_update',
availableCommands: [
{
name: 'help',
description: 'Show help',
input: { hint: 'topic' },
},
],
_meta: { availableSkills: ['review'] },
},
} as SessionNotification),
).toEqual([
{
type: 'available_commands',
commands: [
{
name: 'help',
description: 'Show help',
input: { hint: 'topic' },
},
],
skills: ['review'],
},
]);
});
it('normalizes explicit usage updates', () => {
const messages = normalizeSessionUpdate({
sessionId: 'session-1',
update: {
sessionUpdate: 'usage_update',
used: 4096,
size: 128000,
},
} as SessionNotification);
expect(messages).toEqual([
{
type: 'usage',
data: {
usage: { totalTokens: 4096 },
tokenLimit: 128000,
cost: undefined,
},
},
]);
});
});

View file

@ -0,0 +1,249 @@
/**
* @license
* Copyright 2026 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
import type {
AvailableCommand,
SessionNotification,
} from '@agentclientprotocol/sdk';
import type {
DesktopAvailableCommand,
DesktopServerMessage,
DesktopToolCallUpdate,
DesktopUsageStats,
} from '../../shared/desktopProtocol.js';
export interface AcpEventRouterOptions {
broadcast(sessionId: string, message: DesktopServerMessage): void;
}
export class AcpEventRouter {
constructor(private readonly options: AcpEventRouterOptions) {}
handleSessionUpdate(notification: SessionNotification): void {
const messages = normalizeSessionUpdate(notification);
for (const message of messages) {
this.options.broadcast(notification.sessionId, message);
}
}
}
export function normalizeSessionUpdate(
notification: SessionNotification,
): DesktopServerMessage[] {
const { update } = notification;
const messages: DesktopServerMessage[] = [];
switch (update.sessionUpdate) {
case 'user_message_chunk': {
const text = getTextContent(update.content);
if (text) {
messages.push({ type: 'message_delta', role: 'user', text });
}
messages.push(...getUsageMessages(update._meta));
break;
}
case 'agent_message_chunk': {
const text = getTextContent(update.content);
if (text) {
messages.push({ type: 'message_delta', role: 'assistant', text });
}
messages.push(...getUsageMessages(update._meta));
break;
}
case 'agent_thought_chunk': {
const text = getTextContent(update.content);
if (text) {
messages.push({ type: 'message_delta', role: 'thinking', text });
}
messages.push(...getUsageMessages(update._meta));
break;
}
case 'tool_call':
case 'tool_call_update':
messages.push({
type: 'tool_call',
data: normalizeToolCall(update),
});
break;
case 'plan':
messages.push({
type: 'plan',
entries: update.entries.map((entry) => ({
content: entry.content,
priority: entry.priority,
status: entry.status,
})),
});
break;
case 'current_mode_update':
messages.push({
type: 'mode_changed',
mode: update.currentModeId,
});
break;
case 'available_commands_update':
messages.push({
type: 'available_commands',
commands: update.availableCommands.map(normalizeAvailableCommand),
skills: getStringArray(getRecord(update._meta)?.['availableSkills']),
});
break;
case 'usage_update':
messages.push({
type: 'usage',
data: {
usage: { totalTokens: update.used },
tokenLimit: update.size,
cost: update.cost ?? undefined,
},
});
break;
case 'config_option_update':
case 'session_info_update':
break;
default:
break;
}
return messages;
}
function getTextContent(content: { type: string } | undefined): string {
if (content?.type !== 'text') {
return '';
}
const text = (content as { text?: unknown }).text;
return typeof text === 'string' ? text : '';
}
function normalizeToolCall(
update: Extract<
SessionNotification['update'],
{ sessionUpdate: 'tool_call' | 'tool_call_update' }
>,
): DesktopToolCallUpdate {
const meta = getRecord(update._meta);
const timestamp = getOptionalNumber(meta?.['timestamp']);
return {
toolCallId: update.toolCallId,
kind: getOptionalString(update.kind),
title: getOptionalString(update.title),
status: getOptionalString(update.status),
rawInput: update.rawInput,
rawOutput: update.rawOutput,
content: update.content ?? undefined,
locations: update.locations ?? undefined,
...(timestamp !== undefined && { timestamp }),
};
}
function normalizeAvailableCommand(
command: AvailableCommand,
): DesktopAvailableCommand {
return {
name: command.name,
description: command.description,
input: command.input ?? undefined,
};
}
function getUsageMessages(
meta: Record<string, unknown> | null | undefined,
): DesktopServerMessage[] {
const usage = getUsageStats(meta);
return usage ? [{ type: 'usage', data: usage }] : [];
}
function getUsageStats(
meta: Record<string, unknown> | null | undefined,
): DesktopUsageStats | null {
if (!meta) {
return null;
}
const rawUsage = getRecord(meta['usage']);
const durationMs = getNullableNumber(meta['durationMs']);
if (!rawUsage && durationMs === undefined) {
return null;
}
return {
usage: rawUsage
? {
inputTokens:
getNullableNumber(rawUsage['inputTokens']) ??
getNullableNumber(rawUsage['promptTokens']),
outputTokens:
getNullableNumber(rawUsage['outputTokens']) ??
getNullableNumber(rawUsage['completionTokens']),
thoughtTokens:
getNullableNumber(rawUsage['thoughtTokens']) ??
getNullableNumber(rawUsage['thoughtsTokens']),
totalTokens: getNullableNumber(rawUsage['totalTokens']),
cachedReadTokens:
getNullableNumber(rawUsage['cachedReadTokens']) ??
getNullableNumber(rawUsage['cachedTokens']),
cachedWriteTokens: getNullableNumber(rawUsage['cachedWriteTokens']),
promptTokens:
getNullableNumber(rawUsage['promptTokens']) ??
getNullableNumber(rawUsage['inputTokens']),
completionTokens:
getNullableNumber(rawUsage['completionTokens']) ??
getNullableNumber(rawUsage['outputTokens']),
thoughtsTokens:
getNullableNumber(rawUsage['thoughtsTokens']) ??
getNullableNumber(rawUsage['thoughtTokens']),
cachedTokens:
getNullableNumber(rawUsage['cachedTokens']) ??
getNullableNumber(rawUsage['cachedReadTokens']),
}
: undefined,
...(durationMs !== undefined && { durationMs }),
};
}
function getRecord(value: unknown): Record<string, unknown> | undefined {
if (!value || typeof value !== 'object' || Array.isArray(value)) {
return undefined;
}
return value as Record<string, unknown>;
}
function getOptionalString(value: unknown): string | undefined {
return typeof value === 'string' ? value : undefined;
}
function getOptionalNumber(value: unknown): number | undefined {
return typeof value === 'number' ? value : undefined;
}
function getNullableNumber(value: unknown): number | null | undefined {
if (value === null) {
return null;
}
return getOptionalNumber(value);
}
function getStringArray(value: unknown): string[] {
if (!Array.isArray(value)) {
return [];
}
return value.filter((item): item is string => typeof item === 'string');
}

View file

@ -6,6 +6,7 @@
import { afterEach, describe, expect, it, vi } from 'vitest';
import { WebSocket } from 'ws';
import type { SessionNotification } from '@agentclientprotocol/sdk';
import { startDesktopServer } from './index.js';
import type { DesktopServer } from './types.js';
import type { AcpSessionClient } from './services/sessionService.js';
@ -254,6 +255,95 @@ describe('DesktopServer', () => {
testSocket.socket.close();
});
it('broadcasts normalized ACP session updates over WebSocket', async () => {
const acpClient = createAcpClient();
const server = await createTestServer(acpClient);
const testSocket = await connectSocket(server, '/ws/session-1');
await testSocket.readMessage();
acpClient.emitSessionUpdate({
sessionId: 'session-1',
update: {
sessionUpdate: 'agent_message_chunk',
content: { type: 'text', text: 'streamed text' },
_meta: {
usage: {
inputTokens: 5,
outputTokens: 3,
totalTokens: 8,
},
},
},
} as SessionNotification);
expect(await testSocket.readMessage()).toMatchObject({
type: 'message_delta',
role: 'assistant',
text: 'streamed text',
});
expect(await testSocket.readMessage()).toMatchObject({
type: 'usage',
data: {
usage: {
inputTokens: 5,
outputTokens: 3,
totalTokens: 8,
},
},
});
testSocket.socket.close();
});
it('broadcasts ACP tool and plan updates only to the matching session', async () => {
const acpClient = createAcpClient();
const server = await createTestServer(acpClient);
const matchingSocket = await connectSocket(server, '/ws/session-1');
const otherSocket = await connectSocket(server, '/ws/session-2');
await matchingSocket.readMessage();
await otherSocket.readMessage();
acpClient.emitSessionUpdate({
sessionId: 'session-1',
update: {
sessionUpdate: 'tool_call',
toolCallId: 'tool-1',
title: 'Run command',
kind: 'execute',
status: 'in_progress',
},
} as SessionNotification);
acpClient.emitSessionUpdate({
sessionId: 'session-1',
update: {
sessionUpdate: 'plan',
entries: [
{ content: 'Wire events', priority: 'medium', status: 'completed' },
],
},
} as SessionNotification);
expect(await matchingSocket.readMessage()).toMatchObject({
type: 'tool_call',
data: {
toolCallId: 'tool-1',
title: 'Run command',
kind: 'execute',
status: 'in_progress',
},
});
expect(await matchingSocket.readMessage()).toMatchObject({
type: 'plan',
entries: [
{ content: 'Wire events', priority: 'medium', status: 'completed' },
],
});
otherSocket.socket.send(JSON.stringify({ type: 'ping' }));
expect(await otherSocket.readMessage()).toMatchObject({ type: 'pong' });
matchingSocket.socket.close();
otherSocket.socket.close();
});
it('cancels generation over WebSocket', async () => {
const acpClient = createAcpClient();
const server = await createTestServer(acpClient);
@ -369,9 +459,17 @@ async function writeJson(
};
}
function createAcpClient(): AcpSessionClient {
return {
interface TestAcpClient extends AcpSessionClient {
emitSessionUpdate(notification: SessionNotification): void;
}
function createAcpClient(): TestAcpClient {
const client: TestAcpClient = {
isConnected: true,
onSessionUpdate: undefined,
emitSessionUpdate(notification: SessionNotification): void {
client.onSessionUpdate?.(notification);
},
listSessions: vi.fn().mockResolvedValue({
sessions: [{ sessionId: 'session-1', title: 'Test session' }],
nextCursor: '3',
@ -382,6 +480,7 @@ function createAcpClient(): AcpSessionClient {
cancel: vi.fn().mockResolvedValue(undefined),
extMethod: vi.fn().mockResolvedValue({ success: true }),
};
return client;
}
async function connectSocket(

View file

@ -17,6 +17,7 @@ import {
isAllowedOrigin,
isAuthorized,
} from './http/auth.js';
import { AcpEventRouter } from './acp/AcpEventRouter.js';
import { isDesktopHttpError, DesktopHttpError } from './http/errors.js';
import { getRuntimeInfo } from './services/runtimeService.js';
import { DesktopSessionService } from './services/sessionService.js';
@ -45,6 +46,16 @@ export async function startDesktopServer(
token,
acpClient: options.acpClient,
});
const acpEventRouter = new AcpEventRouter({
broadcast: (sessionId, message) => socketHub.broadcast(sessionId, message),
});
const previousSessionUpdateHandler = options.acpClient?.onSessionUpdate;
if (options.acpClient) {
options.acpClient.onSessionUpdate = (notification) => {
previousSessionUpdateHandler?.(notification);
acpEventRouter.handleSessionUpdate(notification);
};
}
const server = createServer((request, response) => {
void handleRequest(request, response, {
token,

View file

@ -9,11 +9,13 @@ import type {
LoadSessionResponse,
NewSessionResponse,
PromptResponse,
SessionNotification,
} from '@agentclientprotocol/sdk';
import { DesktopHttpError } from '../http/errors.js';
export interface AcpSessionClient {
readonly isConnected?: boolean;
onSessionUpdate?: (notification: SessionNotification) => void;
connect?(): Promise<unknown>;
listSessions(options?: {
cwd?: string;

View file

@ -9,17 +9,10 @@ import type { Duplex } from 'node:stream';
import { WebSocket, WebSocketServer } from 'ws';
import { getSingleHeader, isAllowedOrigin } from '../http/auth.js';
import type { AcpSessionClient } from '../services/sessionService.js';
type ClientMessage =
| { type: 'ping' }
| { type: 'stop_generation' }
| { type: 'user_message'; content: string };
type ServerMessage =
| { type: 'connected'; sessionId: string }
| { type: 'pong' }
| { type: 'message_complete'; stopReason?: string }
| { type: 'error'; code: string; message: string; retryable?: boolean };
import type {
DesktopClientMessage,
DesktopServerMessage,
} from '../../shared/desktopProtocol.js';
interface SessionSocketHubOptions {
token: string;
@ -73,7 +66,7 @@ export class SessionSocketHub {
this.server.close();
}
broadcast(sessionId: string, message: ServerMessage): void {
broadcast(sessionId: string, message: DesktopServerMessage): void {
const sockets = this.socketsBySession.get(sessionId);
if (!sockets) {
return;
@ -225,7 +218,7 @@ function matchSessionId(pathname: string): string | null {
function parseClientMessage(
rawMessage: WebSocket.RawData,
): ClientMessage | null {
): DesktopClientMessage | null {
let parsed: unknown;
try {
parsed = JSON.parse(rawMessage.toString()) as unknown;
@ -237,7 +230,7 @@ function parseClientMessage(
return null;
}
const candidate = parsed as Partial<ClientMessage>;
const candidate = parsed as Partial<DesktopClientMessage>;
if (candidate.type === 'ping' || candidate.type === 'stop_generation') {
return { type: candidate.type };
}
@ -252,7 +245,7 @@ function parseClientMessage(
return null;
}
function sendMessage(socket: WebSocket, message: ServerMessage): void {
function sendMessage(socket: WebSocket, message: DesktopServerMessage): void {
if (socket.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify(message));
}

View file

@ -0,0 +1,72 @@
/**
* @license
* Copyright 2026 Qwen Team
* SPDX-License-Identifier: Apache-2.0
*/
export type DesktopClientMessage =
| { type: 'ping' }
| { type: 'stop_generation' }
| { type: 'user_message'; content: string };
export interface DesktopPlanEntry {
content: string;
priority?: 'high' | 'medium' | 'low';
status: 'pending' | 'in_progress' | 'completed';
}
export interface DesktopToolCallUpdate {
toolCallId: string;
kind?: string;
title?: string;
status?: string;
rawInput?: unknown;
rawOutput?: unknown;
content?: unknown[];
locations?: Array<{ path: string; line?: number | null }>;
timestamp?: number;
}
export interface DesktopUsageStats {
usage?: {
inputTokens?: number | null;
outputTokens?: number | null;
thoughtTokens?: number | null;
totalTokens?: number | null;
cachedReadTokens?: number | null;
cachedWriteTokens?: number | null;
promptTokens?: number | null;
completionTokens?: number | null;
thoughtsTokens?: number | null;
cachedTokens?: number | null;
} | null;
durationMs?: number | null;
tokenLimit?: number | null;
cost?: unknown;
}
export interface DesktopAvailableCommand {
name: string;
description: string;
input?: unknown;
}
export type DesktopServerMessage =
| { type: 'connected'; sessionId: string }
| { type: 'pong' }
| {
type: 'message_delta';
role: 'assistant' | 'thinking' | 'user';
text: string;
}
| { type: 'tool_call'; data: DesktopToolCallUpdate }
| { type: 'plan'; entries: DesktopPlanEntry[] }
| { type: 'usage'; data: DesktopUsageStats }
| { type: 'mode_changed'; mode: string }
| {
type: 'available_commands';
commands: DesktopAvailableCommand[];
skills: string[];
}
| { type: 'message_complete'; stopReason?: string }
| { type: 'error'; code: string; message: string; retryable?: boolean };