diff --git a/integration-tests/cli/acp-cron.test.ts b/integration-tests/cli/acp-cron.test.ts new file mode 100644 index 000000000..84eb71a01 --- /dev/null +++ b/integration-tests/cli/acp-cron.test.ts @@ -0,0 +1,380 @@ +/** + * @license + * Copyright 2026 Qwen Team + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * ACP integration tests for in-session cron/loop scheduling. + * + * These verify that cron jobs created during an ACP session fire correctly + * and stream results back to the client via sessionUpdate notifications, + * even after the originating prompt has already returned. + * + * The two tests share one ACP session to stay within 2 minutes total: + * 1. Fast smoke test — cron tools available (no cron fire needed) + * 2. Combined test — create job, verify session responsive, wait for + * cron fire, check content + _meta.source, then clean up + */ + +import { spawn } from 'node:child_process'; +import { readFileSync, writeFileSync } from 'node:fs'; +import { createInterface } from 'node:readline'; +import { setTimeout as delay } from 'node:timers/promises'; +import { describe, it, expect } from 'vitest'; +import { TestRig } from '../test-helper.js'; + +const REQUEST_TIMEOUT_MS = 60_000; + +const IS_SANDBOX = + process.env['QWEN_SANDBOX'] && + process.env['QWEN_SANDBOX']!.toLowerCase() !== 'false'; + +type PendingRequest = { + resolve: (value: unknown) => void; + reject: (reason: Error) => void; + timeout: NodeJS.Timeout; +}; + +type SessionUpdateNotification = { + sessionId?: string; + update?: { + sessionUpdate?: string; + content?: { + type: string; + text?: string; + }; + title?: string; + toolCallId?: string; + status?: string; + _meta?: Record; + [key: string]: unknown; + }; +}; + +type PermissionRequest = { + id: number; + sessionId?: string; + toolCall?: { + toolCallId: string; + title: string; + kind: string; + status: string; + }; + options?: Array<{ + optionId: string; + name: string; + kind: string; + }>; +}; + +/** + * Sets up an ACP test environment with cron support enabled. + */ +function setupAcpCronTest(rig: TestRig) { + const pending = new Map(); + let nextRequestId = 1; + const sessionUpdates: (SessionUpdateNotification & { + receivedAt: number; + })[] = []; + const stderr: string[] = []; + + const agent = spawn( + 'node', + [rig.bundlePath, '--acp', '--no-chat-recording'], + { + cwd: rig.testDir!, + stdio: ['pipe', 'pipe', 'pipe'], + env: { + ...process.env, + QWEN_CODE_ENABLE_CRON: '1', + }, + }, + ); + + agent.stderr?.on('data', (chunk: Buffer) => { + stderr.push(chunk.toString()); + }); + + const rl = createInterface({ input: agent.stdout }); + + const send = (json: unknown) => { + agent.stdin.write(`${JSON.stringify(json)}\n`); + }; + + const sendResponse = (id: number, result: unknown) => { + send({ jsonrpc: '2.0', id, result }); + }; + + const sendRequest = (method: string, params?: unknown) => + new Promise((resolve, reject) => { + const id = nextRequestId++; + const timeout = setTimeout(() => { + pending.delete(id); + reject(new Error(`Request ${id} (${method}) timed out`)); + }, REQUEST_TIMEOUT_MS); + pending.set(id, { resolve, reject, timeout }); + send({ jsonrpc: '2.0', id, method, params }); + }); + + const handleResponse = (msg: { + id: number; + result?: unknown; + error?: { message?: string }; + }) => { + const waiter = pending.get(msg.id); + if (!waiter) return; + clearTimeout(waiter.timeout); + pending.delete(msg.id); + if (msg.error) { + const error = new Error(msg.error.message ?? 'Unknown error'); + (error as Error & { response?: unknown }).response = msg.error; + waiter.reject(error); + } else { + waiter.resolve(msg.result); + } + }; + + const handleMessage = (msg: { + id?: number; + method?: string; + params?: SessionUpdateNotification & { + path?: string; + content?: string; + sessionId?: string; + toolCall?: PermissionRequest['toolCall']; + options?: PermissionRequest['options']; + }; + result?: unknown; + error?: { message?: string }; + }) => { + if (typeof msg.id !== 'undefined' && ('result' in msg || 'error' in msg)) { + handleResponse( + msg as { + id: number; + result?: unknown; + error?: { message?: string }; + }, + ); + return; + } + + if (msg.method === 'session/update') { + sessionUpdates.push({ + sessionId: msg.params?.sessionId, + update: msg.params?.update, + receivedAt: Date.now(), + }); + return; + } + + if ( + msg.method === 'session/request_permission' && + typeof msg.id === 'number' + ) { + sendResponse(msg.id, { + outcome: { optionId: 'proceed_once', outcome: 'selected' }, + }); + return; + } + + if (msg.method === 'fs/read_text_file' && typeof msg.id === 'number') { + try { + const content = readFileSync(msg.params?.path ?? '', 'utf8'); + sendResponse(msg.id, { content }); + } catch (e) { + sendResponse(msg.id, { content: `ERROR: ${(e as Error).message}` }); + } + return; + } + + if (msg.method === 'fs/write_text_file' && typeof msg.id === 'number') { + try { + writeFileSync( + msg.params?.path ?? '', + msg.params?.content ?? '', + 'utf8', + ); + sendResponse(msg.id, null); + } catch (e) { + sendResponse(msg.id, { message: (e as Error).message }); + } + } + }; + + rl.on('line', (line: string) => { + if (!line.trim()) return; + try { + const msg = JSON.parse(line); + handleMessage(msg); + } catch { + // Ignore non-JSON output + } + }); + + /** + * Polls sessionUpdates until a notification matching the predicate appears, + * or the timeout expires. + */ + const waitForSessionUpdate = async ( + predicate: ( + update: SessionUpdateNotification & { receivedAt: number }, + ) => boolean, + description: string, + timeoutMs: number, + ): Promise => { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const match = sessionUpdates.find(predicate); + if (match) return match; + await delay(500); + } + throw new Error( + `Timed out waiting for sessionUpdate: ${description} (after ${timeoutMs}ms, ` + + `saw ${sessionUpdates.length} updates: ` + + `[${sessionUpdates.map((u) => u.update?.sessionUpdate).join(', ')}])`, + ); + }; + + const waitForExit = () => + new Promise((resolve) => { + if (agent.exitCode !== null || agent.signalCode) { + resolve(); + return; + } + agent.once('exit', () => resolve()); + }); + + const cleanup = async () => { + rl.close(); + agent.kill(); + pending.forEach(({ timeout }) => clearTimeout(timeout)); + pending.clear(); + await waitForExit(); + }; + + return { + sendRequest, + cleanup, + stderr, + sessionUpdates, + waitForSessionUpdate, + }; +} + +/** Standard ACP init + auth + new session sequence. */ +async function initSession( + sendRequest: (method: string, params?: unknown) => Promise, + testDir: string, +): Promise { + await sendRequest('initialize', { + protocolVersion: 1, + clientCapabilities: { + fs: { readTextFile: true, writeTextFile: true }, + }, + }); + + await sendRequest('authenticate', { methodId: 'openai' }); + + const newSession = (await sendRequest('session/new', { + cwd: testDir, + mcpServers: [], + })) as { sessionId: string }; + + return newSession.sessionId; +} + +(IS_SANDBOX ? describe.skip : describe)('acp cron integration', () => { + it( + 'cron job fires and streams results via sessionUpdate after prompt returns', + async () => { + const rig = new TestRig(); + rig.setup('acp-cron-e2e', { + settings: { experimental: { cron: true } }, + }); + + const { + sendRequest, + cleanup, + stderr, + // sessionUpdates available for debugging + waitForSessionUpdate, + } = setupAcpCronTest(rig); + + try { + const sessionId = await initSession(sendRequest, rig.testDir!); + + // --- Part 1: Create a cron job that fires every minute --- + const createResult = (await sendRequest('session/prompt', { + sessionId, + prompt: [ + { + type: 'text', + text: 'Call cron_create with cron expression "*/1 * * * *" and prompt "Say CRONFIRE7742 and nothing else" and recurring true. Confirm briefly.', + }, + ], + })) as { stopReason: string }; + expect(createResult.stopReason).toBe('end_turn'); + + const promptDoneAt = Date.now(); + + // --- Part 2: Session stays responsive while cron is pending --- + const interactiveResult = (await sendRequest('session/prompt', { + sessionId, + prompt: [ + { + type: 'text', + text: 'Say INTERACTIVE8899 and nothing else.', + }, + ], + })) as { stopReason: string }; + expect(interactiveResult.stopReason).toBe('end_turn'); + + // --- Part 3: Wait for cron-fired notification (up to 75s) --- + // The cron fires at the next minute boundary. The model response + // should stream back as sessionUpdate notifications after the + // originating prompt has already returned. + + // 3a: Check for user_message_chunk echoing the cron prompt with _meta.source + const cronUserMsg = await waitForSessionUpdate( + (u) => + u.update?.sessionUpdate === 'user_message_chunk' && + (u.update?.content?.text ?? '').includes('CRONFIRE7742') && + u.receivedAt > promptDoneAt, + 'cron-fired user_message_chunk with CRONFIRE7742', + 75_000, + ); + expect(cronUserMsg.update?._meta).toBeDefined(); + expect(cronUserMsg.update?._meta?.source).toBe('cron'); + + // 3b: Check for agent_message_chunk after the cron user message + // (the model's response to the cron prompt) + const cronAgentMsg = await waitForSessionUpdate( + (u) => + u.update?.sessionUpdate === 'agent_message_chunk' && + u.receivedAt > cronUserMsg.receivedAt, + 'agent_message_chunk after cron fire', + 15_000, // should already be here by now + ); + expect(cronAgentMsg.receivedAt).toBeGreaterThan(promptDoneAt); + + // --- Part 4: Clean up the cron job --- + await sendRequest('session/prompt', { + sessionId, + prompt: [ + { + type: 'text', + text: 'Delete all cron jobs using cron_delete.', + }, + ], + }); + } catch (e) { + if (stderr.length) console.error('Agent stderr:', stderr.join('')); + throw e; + } finally { + await cleanup(); + } + }, + { timeout: 120_000, retry: 0 }, + ); +}); diff --git a/packages/cli/src/acp-integration/session/Session.ts b/packages/cli/src/acp-integration/session/Session.ts index fd009dddf..18cf99a04 100644 --- a/packages/cli/src/acp-integration/session/Session.ts +++ b/packages/cli/src/acp-integration/session/Session.ts @@ -112,6 +112,12 @@ export class Session implements SessionContext { private turn: number = 0; private readonly runtimeBaseDir: string; + // Cron scheduling state + private cronQueue: string[] = []; + private cronProcessing = false; + private cronAbortController: AbortController | null = null; + private cronCompletion: Promise | null = null; + // Modular components private readonly historyReplayer: HistoryReplayer; private readonly toolCallEmitter: ToolCallEmitter; @@ -155,12 +161,37 @@ export class Session implements SessionContext { } async cancelPendingPrompt(): Promise { - if (!this.pendingPrompt) { + const hadPrompt = !!this.pendingPrompt; + const hadCron = !!this.cronAbortController; + + if (!hadPrompt && !hadCron) { throw new Error('Not currently generating'); } - this.pendingPrompt.abort(); - this.pendingPrompt = null; + if (this.pendingPrompt) { + this.pendingPrompt.abort(); + this.pendingPrompt = null; + } + + // Cancel any in-progress cron execution + if (this.cronAbortController) { + this.cronAbortController.abort(); + this.cronAbortController = null; + this.cronQueue = []; + this.cronProcessing = false; + } + + // Stop scheduler and emit exit summary + const scheduler = this.config.isCronEnabled() + ? this.config.getCronScheduler() + : null; + if (scheduler) { + const summary = scheduler.getExitSummary(); + scheduler.stop(); + if (summary) { + await this.messageEmitter.emitAgentMessage(summary); + } + } } async prompt(params: PromptRequest): Promise { @@ -170,6 +201,22 @@ export class Session implements SessionContext { const pendingSend = new AbortController(); this.pendingPrompt = pendingSend; + // Abort any in-progress cron execution (user prompt takes priority) + if (this.cronAbortController) { + this.cronAbortController.abort(); + this.cronAbortController = null; + this.cronQueue = []; + this.cronProcessing = false; + } + if (this.cronCompletion) { + try { + await this.cronCompletion; + } catch { + // Expected: cron was aborted + } + this.cronCompletion = null; + } + // Wait for the previous prompt to finish so chat history is consistent. if (this.pendingPromptCompletion) { try { @@ -191,7 +238,9 @@ export class Session implements SessionContext { }); try { - return await this.#executePrompt(params, pendingSend); + const result = await this.#executePrompt(params, pendingSend); + this.#startCronSchedulerIfNeeded(); + return result; } finally { resolveCompletion(); } @@ -376,6 +425,166 @@ export class Session implements SessionContext { await this.client.sessionUpdate(params); } + /** + * Starts the cron scheduler if cron is enabled and jobs exist. + * The scheduler runs in the background, pushing fired prompts into + * `cronQueue` and triggering `#drainCronQueue`. + */ + #startCronSchedulerIfNeeded(): void { + if (!this.config.isCronEnabled()) return; + const scheduler = this.config.getCronScheduler(); + if (scheduler.size === 0) return; + + scheduler.start((job: { prompt: string }) => { + this.cronQueue.push(job.prompt); + void this.#drainCronQueue(); + }); + } + + /** + * Processes queued cron prompts one at a time. Uses `cronProcessing` + * as a mutex to prevent concurrent access to the chat. + */ + async #drainCronQueue(): Promise { + if (this.cronProcessing) return; + this.cronProcessing = true; + + let resolveCompletion!: () => void; + this.cronCompletion = new Promise((resolve) => { + resolveCompletion = resolve; + }); + + try { + while (this.cronQueue.length > 0) { + const prompt = this.cronQueue.shift()!; + await this.#executeCronPrompt(prompt); + } + } finally { + this.cronProcessing = false; + resolveCompletion(); + this.cronCompletion = null; + + // Stop scheduler if all jobs were deleted during execution + if (this.config.isCronEnabled()) { + const scheduler = this.config.getCronScheduler(); + if (scheduler.size === 0) { + scheduler.stop(); + } + } + } + } + + /** + * Executes a single cron-fired prompt: echoes it as a user message with + * `_meta.source='cron'`, streams the model response, and handles tool calls. + */ + async #executeCronPrompt(prompt: string): Promise { + return Storage.runWithRuntimeBaseDir( + this.runtimeBaseDir, + this.config.getWorkingDir(), + async () => { + const ac = new AbortController(); + this.cronAbortController = ac; + const promptId = + this.config.getSessionId() + '########cron' + Date.now(); + + try { + // Echo the cron prompt as a user message so the client sees it + await this.sendUpdate({ + sessionUpdate: 'user_message_chunk', + content: { type: 'text', text: prompt }, + _meta: { source: 'cron' }, + }); + + let nextMessage: Content | null = { + role: 'user', + parts: [{ text: prompt }], + }; + + while (nextMessage !== null) { + if (ac.signal.aborted) return; + + const functionCalls: FunctionCall[] = []; + let usageMetadata: GenerateContentResponseUsageMetadata | null = + null; + const streamStartTime = Date.now(); + + const responseStream = await this.chat.sendMessageStream( + this.config.getModel(), + { + message: nextMessage.parts ?? [], + config: { abortSignal: ac.signal }, + }, + promptId, + ); + nextMessage = null; + + for await (const resp of responseStream) { + if (ac.signal.aborted) return; + + if ( + resp.type === StreamEventType.CHUNK && + resp.value.candidates && + resp.value.candidates.length > 0 + ) { + const candidate = resp.value.candidates[0]; + for (const part of candidate.content?.parts ?? []) { + if (!part.text) continue; + this.messageEmitter.emitMessage( + part.text, + 'assistant', + part.thought, + ); + } + } + + if ( + resp.type === StreamEventType.CHUNK && + resp.value.usageMetadata + ) { + usageMetadata = resp.value.usageMetadata; + } + + if ( + resp.type === StreamEventType.CHUNK && + resp.value.functionCalls + ) { + functionCalls.push(...resp.value.functionCalls); + } + } + + if (usageMetadata) { + const durationMs = Date.now() - streamStartTime; + await this.messageEmitter.emitUsageMetadata( + usageMetadata, + '', + durationMs, + ); + } + + if (functionCalls.length > 0) { + const toolResponseParts: Part[] = []; + for (const fc of functionCalls) { + const response = await this.runTool(ac.signal, promptId, fc); + toolResponseParts.push(...response); + } + nextMessage = { role: 'user', parts: toolResponseParts }; + } + } + } catch (error) { + if (ac.signal.aborted) return; + debugLogger.error('Error processing cron prompt:', error); + const msg = error instanceof Error ? error.message : String(error); + await this.messageEmitter.emitAgentMessage(`[cron error] ${msg}`); + } finally { + if (this.cronAbortController === ac) { + this.cronAbortController = null; + } + } + }, + ); + } + async sendAvailableCommandsUpdate(): Promise { const abortController = new AbortController(); try {