feat(cron): integrate cron scheduling into ACP session lifecycle

- Add cron queue and scheduler state management to Session class
- Handle cron cancellation on user prompt and cancelPendingPrompt
- Start cron scheduler after prompt execution completes
- Drain cron queue sequentially to prevent concurrent chat access
- Execute cron prompts with proper message echoing and tool handling
- Add integration test for cron firing and sessionUpdate streaming

This enables cron jobs created during an ACP session to fire in the
background and stream results back to the client via sessionUpdate
notifications, even after the originating prompt has returned.

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
tanzhenxin 2026-03-30 18:06:09 +08:00
parent aa454a5a72
commit 9dd22eb152
2 changed files with 593 additions and 4 deletions

View file

@ -112,6 +112,12 @@ export class Session implements SessionContext {
private turn: number = 0;
private readonly runtimeBaseDir: string;
// Cron scheduling state
private cronQueue: string[] = [];
private cronProcessing = false;
private cronAbortController: AbortController | null = null;
private cronCompletion: Promise<void> | null = null;
// Modular components
private readonly historyReplayer: HistoryReplayer;
private readonly toolCallEmitter: ToolCallEmitter;
@ -155,12 +161,37 @@ export class Session implements SessionContext {
}
async cancelPendingPrompt(): Promise<void> {
if (!this.pendingPrompt) {
const hadPrompt = !!this.pendingPrompt;
const hadCron = !!this.cronAbortController;
if (!hadPrompt && !hadCron) {
throw new Error('Not currently generating');
}
this.pendingPrompt.abort();
this.pendingPrompt = null;
if (this.pendingPrompt) {
this.pendingPrompt.abort();
this.pendingPrompt = null;
}
// Cancel any in-progress cron execution
if (this.cronAbortController) {
this.cronAbortController.abort();
this.cronAbortController = null;
this.cronQueue = [];
this.cronProcessing = false;
}
// Stop scheduler and emit exit summary
const scheduler = this.config.isCronEnabled()
? this.config.getCronScheduler()
: null;
if (scheduler) {
const summary = scheduler.getExitSummary();
scheduler.stop();
if (summary) {
await this.messageEmitter.emitAgentMessage(summary);
}
}
}
async prompt(params: PromptRequest): Promise<PromptResponse> {
@ -170,6 +201,22 @@ export class Session implements SessionContext {
const pendingSend = new AbortController();
this.pendingPrompt = pendingSend;
// Abort any in-progress cron execution (user prompt takes priority)
if (this.cronAbortController) {
this.cronAbortController.abort();
this.cronAbortController = null;
this.cronQueue = [];
this.cronProcessing = false;
}
if (this.cronCompletion) {
try {
await this.cronCompletion;
} catch {
// Expected: cron was aborted
}
this.cronCompletion = null;
}
// Wait for the previous prompt to finish so chat history is consistent.
if (this.pendingPromptCompletion) {
try {
@ -191,7 +238,9 @@ export class Session implements SessionContext {
});
try {
return await this.#executePrompt(params, pendingSend);
const result = await this.#executePrompt(params, pendingSend);
this.#startCronSchedulerIfNeeded();
return result;
} finally {
resolveCompletion();
}
@ -376,6 +425,166 @@ export class Session implements SessionContext {
await this.client.sessionUpdate(params);
}
/**
* Starts the cron scheduler if cron is enabled and jobs exist.
* The scheduler runs in the background, pushing fired prompts into
* `cronQueue` and triggering `#drainCronQueue`.
*/
#startCronSchedulerIfNeeded(): void {
if (!this.config.isCronEnabled()) return;
const scheduler = this.config.getCronScheduler();
if (scheduler.size === 0) return;
scheduler.start((job: { prompt: string }) => {
this.cronQueue.push(job.prompt);
void this.#drainCronQueue();
});
}
/**
* Processes queued cron prompts one at a time. Uses `cronProcessing`
* as a mutex to prevent concurrent access to the chat.
*/
async #drainCronQueue(): Promise<void> {
if (this.cronProcessing) return;
this.cronProcessing = true;
let resolveCompletion!: () => void;
this.cronCompletion = new Promise<void>((resolve) => {
resolveCompletion = resolve;
});
try {
while (this.cronQueue.length > 0) {
const prompt = this.cronQueue.shift()!;
await this.#executeCronPrompt(prompt);
}
} finally {
this.cronProcessing = false;
resolveCompletion();
this.cronCompletion = null;
// Stop scheduler if all jobs were deleted during execution
if (this.config.isCronEnabled()) {
const scheduler = this.config.getCronScheduler();
if (scheduler.size === 0) {
scheduler.stop();
}
}
}
}
/**
* Executes a single cron-fired prompt: echoes it as a user message with
* `_meta.source='cron'`, streams the model response, and handles tool calls.
*/
async #executeCronPrompt(prompt: string): Promise<void> {
return Storage.runWithRuntimeBaseDir(
this.runtimeBaseDir,
this.config.getWorkingDir(),
async () => {
const ac = new AbortController();
this.cronAbortController = ac;
const promptId =
this.config.getSessionId() + '########cron' + Date.now();
try {
// Echo the cron prompt as a user message so the client sees it
await this.sendUpdate({
sessionUpdate: 'user_message_chunk',
content: { type: 'text', text: prompt },
_meta: { source: 'cron' },
});
let nextMessage: Content | null = {
role: 'user',
parts: [{ text: prompt }],
};
while (nextMessage !== null) {
if (ac.signal.aborted) return;
const functionCalls: FunctionCall[] = [];
let usageMetadata: GenerateContentResponseUsageMetadata | null =
null;
const streamStartTime = Date.now();
const responseStream = await this.chat.sendMessageStream(
this.config.getModel(),
{
message: nextMessage.parts ?? [],
config: { abortSignal: ac.signal },
},
promptId,
);
nextMessage = null;
for await (const resp of responseStream) {
if (ac.signal.aborted) return;
if (
resp.type === StreamEventType.CHUNK &&
resp.value.candidates &&
resp.value.candidates.length > 0
) {
const candidate = resp.value.candidates[0];
for (const part of candidate.content?.parts ?? []) {
if (!part.text) continue;
this.messageEmitter.emitMessage(
part.text,
'assistant',
part.thought,
);
}
}
if (
resp.type === StreamEventType.CHUNK &&
resp.value.usageMetadata
) {
usageMetadata = resp.value.usageMetadata;
}
if (
resp.type === StreamEventType.CHUNK &&
resp.value.functionCalls
) {
functionCalls.push(...resp.value.functionCalls);
}
}
if (usageMetadata) {
const durationMs = Date.now() - streamStartTime;
await this.messageEmitter.emitUsageMetadata(
usageMetadata,
'',
durationMs,
);
}
if (functionCalls.length > 0) {
const toolResponseParts: Part[] = [];
for (const fc of functionCalls) {
const response = await this.runTool(ac.signal, promptId, fc);
toolResponseParts.push(...response);
}
nextMessage = { role: 'user', parts: toolResponseParts };
}
}
} catch (error) {
if (ac.signal.aborted) return;
debugLogger.error('Error processing cron prompt:', error);
const msg = error instanceof Error ? error.message : String(error);
await this.messageEmitter.emitAgentMessage(`[cron error] ${msg}`);
} finally {
if (this.cronAbortController === ac) {
this.cronAbortController = null;
}
}
},
);
}
async sendAvailableCommandsUpdate(): Promise<void> {
const abortController = new AbortController();
try {