Merge pull request #2731 from QwenLM/feat/in-session-cron-loops

feat(cron): add in-session loop scheduling with cron tools
This commit is contained in:
tanzhenxin 2026-04-01 16:18:46 +08:00 committed by GitHub
commit 76d64c9464
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
60 changed files with 3110 additions and 41 deletions

View file

@ -75,6 +75,7 @@ describe('Session', () => {
getTargetDir: vi.fn().mockReturnValue(process.cwd()),
getDebugMode: vi.fn().mockReturnValue(false),
getAuthType: vi.fn().mockImplementation(() => currentAuthType),
isCronEnabled: vi.fn().mockReturnValue(false),
} as unknown as Config;
mockClient = {

View file

@ -112,6 +112,12 @@ export class Session implements SessionContext {
private turn: number = 0;
private readonly runtimeBaseDir: string;
// Cron scheduling state
private cronQueue: string[] = [];
private cronProcessing = false;
private cronAbortController: AbortController | null = null;
private cronCompletion: Promise<void> | null = null;
// Modular components
private readonly historyReplayer: HistoryReplayer;
private readonly toolCallEmitter: ToolCallEmitter;
@ -155,12 +161,37 @@ export class Session implements SessionContext {
}
async cancelPendingPrompt(): Promise<void> {
if (!this.pendingPrompt) {
const hadPrompt = !!this.pendingPrompt;
const hadCron = !!this.cronAbortController;
if (!hadPrompt && !hadCron) {
throw new Error('Not currently generating');
}
this.pendingPrompt.abort();
this.pendingPrompt = null;
if (this.pendingPrompt) {
this.pendingPrompt.abort();
this.pendingPrompt = null;
}
// Cancel any in-progress cron execution
if (this.cronAbortController) {
this.cronAbortController.abort();
this.cronAbortController = null;
this.cronQueue = [];
this.cronProcessing = false;
}
// Stop scheduler and emit exit summary
const scheduler = this.config.isCronEnabled()
? this.config.getCronScheduler()
: null;
if (scheduler) {
const summary = scheduler.getExitSummary();
scheduler.stop();
if (summary) {
await this.messageEmitter.emitAgentMessage(summary);
}
}
}
async prompt(params: PromptRequest): Promise<PromptResponse> {
@ -170,6 +201,22 @@ export class Session implements SessionContext {
const pendingSend = new AbortController();
this.pendingPrompt = pendingSend;
// Abort any in-progress cron execution (user prompt takes priority)
if (this.cronAbortController) {
this.cronAbortController.abort();
this.cronAbortController = null;
this.cronQueue = [];
this.cronProcessing = false;
}
if (this.cronCompletion) {
try {
await this.cronCompletion;
} catch {
// Expected: cron was aborted
}
this.cronCompletion = null;
}
// Wait for the previous prompt to finish so chat history is consistent.
if (this.pendingPromptCompletion) {
try {
@ -191,7 +238,12 @@ export class Session implements SessionContext {
});
try {
return await this.#executePrompt(params, pendingSend);
const result = await this.#executePrompt(params, pendingSend);
this.pendingPrompt = null;
this.#startCronSchedulerIfNeeded();
// Drain any cron prompts that queued while the prompt was active
void this.#drainCronQueue();
return result;
} finally {
resolveCompletion();
}
@ -376,6 +428,169 @@ export class Session implements SessionContext {
await this.client.sessionUpdate(params);
}
/**
* Starts the cron scheduler if cron is enabled and jobs exist.
* The scheduler runs in the background, pushing fired prompts into
* `cronQueue` and triggering `#drainCronQueue`.
*/
#startCronSchedulerIfNeeded(): void {
if (!this.config.isCronEnabled()) return;
const scheduler = this.config.getCronScheduler();
if (scheduler.size === 0) return;
scheduler.start((job: { prompt: string }) => {
this.cronQueue.push(job.prompt);
void this.#drainCronQueue();
});
}
/**
* Processes queued cron prompts one at a time. Uses `cronProcessing`
* as a mutex to prevent concurrent access to the chat.
*/
async #drainCronQueue(): Promise<void> {
if (this.cronProcessing) return;
// Don't process cron while a user prompt is active — the queue will be
// drained after the prompt completes (see end of prompt()).
if (this.pendingPrompt) return;
this.cronProcessing = true;
let resolveCompletion!: () => void;
this.cronCompletion = new Promise<void>((resolve) => {
resolveCompletion = resolve;
});
try {
while (this.cronQueue.length > 0) {
const prompt = this.cronQueue.shift()!;
await this.#executeCronPrompt(prompt);
}
} finally {
this.cronProcessing = false;
resolveCompletion();
this.cronCompletion = null;
// Stop scheduler if all jobs were deleted during execution
if (this.config.isCronEnabled()) {
const scheduler = this.config.getCronScheduler();
if (scheduler.size === 0) {
scheduler.stop();
}
}
}
}
/**
* Executes a single cron-fired prompt: echoes it as a user message with
* `_meta.source='cron'`, streams the model response, and handles tool calls.
*/
async #executeCronPrompt(prompt: string): Promise<void> {
return Storage.runWithRuntimeBaseDir(
this.runtimeBaseDir,
this.config.getWorkingDir(),
async () => {
const ac = new AbortController();
this.cronAbortController = ac;
const promptId =
this.config.getSessionId() + '########cron' + Date.now();
try {
// Echo the cron prompt as a user message so the client sees it
await this.sendUpdate({
sessionUpdate: 'user_message_chunk',
content: { type: 'text', text: prompt },
_meta: { source: 'cron' },
});
let nextMessage: Content | null = {
role: 'user',
parts: [{ text: prompt }],
};
while (nextMessage !== null) {
if (ac.signal.aborted) return;
const functionCalls: FunctionCall[] = [];
let usageMetadata: GenerateContentResponseUsageMetadata | null =
null;
const streamStartTime = Date.now();
const responseStream = await this.chat.sendMessageStream(
this.config.getModel(),
{
message: nextMessage.parts ?? [],
config: { abortSignal: ac.signal },
},
promptId,
);
nextMessage = null;
for await (const resp of responseStream) {
if (ac.signal.aborted) return;
if (
resp.type === StreamEventType.CHUNK &&
resp.value.candidates &&
resp.value.candidates.length > 0
) {
const candidate = resp.value.candidates[0];
for (const part of candidate.content?.parts ?? []) {
if (!part.text) continue;
this.messageEmitter.emitMessage(
part.text,
'assistant',
part.thought,
);
}
}
if (
resp.type === StreamEventType.CHUNK &&
resp.value.usageMetadata
) {
usageMetadata = resp.value.usageMetadata;
}
if (
resp.type === StreamEventType.CHUNK &&
resp.value.functionCalls
) {
functionCalls.push(...resp.value.functionCalls);
}
}
if (usageMetadata) {
const durationMs = Date.now() - streamStartTime;
await this.messageEmitter.emitUsageMetadata(
usageMetadata,
'',
durationMs,
);
}
if (functionCalls.length > 0) {
const toolResponseParts: Part[] = [];
for (const fc of functionCalls) {
const response = await this.runTool(ac.signal, promptId, fc);
toolResponseParts.push(...response);
}
nextMessage = { role: 'user', parts: toolResponseParts };
}
}
} catch (error) {
if (ac.signal.aborted) return;
debugLogger.error('Error processing cron prompt:', error);
const msg = error instanceof Error ? error.message : String(error);
await this.messageEmitter.emitAgentMessage(`[cron error] ${msg}`);
} finally {
if (this.cronAbortController === ac) {
this.cronAbortController = null;
}
}
},
);
}
async sendAvailableCommandsUpdate(): Promise<void> {
const abortController = new AbortController();
try {

View file

@ -1082,6 +1082,7 @@ export async function loadCliConfig(
maxSessionTurns:
argv.maxSessionTurns ?? settings.model?.maxSessionTurns ?? -1,
experimentalZedIntegration: argv.acp || argv.experimentalAcp || false,
cronEnabled: settings.experimental?.cron ?? false,
listExtensions: argv.listExtensions || false,
overrideExtensions: overrideExtensions || argv.extensions,
noBrowser: !!process.env['NO_BROWSER'],

View file

@ -1571,9 +1571,20 @@ const SETTINGS_SCHEMA = {
category: 'Experimental',
requiresRestart: true,
default: {},
description: 'Setting to enable experimental features',
description: 'Settings to enable experimental features.',
showInDialog: false,
properties: {},
properties: {
cron: {
type: 'boolean',
label: 'Enable Cron/Loop Tools',
category: 'Experimental',
requiresRestart: true,
default: false,
description:
'Enable in-session cron/loop tools (experimental). When enabled, the model can create recurring prompts using cron_create, cron_list, and cron_delete tools. Can also be enabled via QWEN_CODE_ENABLE_CRON=1 environment variable.',
showInDialog: true,
},
},
},
} as const satisfies SettingsSchema;

View file

@ -144,6 +144,8 @@ describe('runNonInteractive', () => {
}),
getExperimentalZedIntegration: vi.fn().mockReturnValue(false),
isInteractive: vi.fn().mockReturnValue(false),
isCronEnabled: vi.fn().mockReturnValue(false),
getCronScheduler: vi.fn().mockReturnValue(null),
} as unknown as Config;
mockSettings = {

View file

@ -371,6 +371,138 @@ export async function runNonInteractive(
}
currentMessages = [{ role: 'user', parts: toolResponseParts }];
} else {
// No more tool calls — check if cron jobs are keeping us alive
const scheduler = !config.isCronEnabled()
? null
: config.getCronScheduler();
if (scheduler && scheduler.size > 0) {
// Start the scheduler and wait for all jobs to complete or be deleted.
// Each fired prompt is processed as a new turn through the same loop.
await new Promise<void>((resolve) => {
const cronQueue: string[] = [];
let processing = false;
const checkDone = () => {
if (scheduler.size === 0 && !processing) {
scheduler.stop();
resolve();
}
};
const drainQueue = async () => {
if (processing) return;
processing = true;
try {
while (cronQueue.length > 0) {
const cronPrompt = cronQueue.shift()!;
turnCount++;
let cronMessages: Content[] = [
{ role: 'user', parts: [{ text: cronPrompt }] },
];
let cronIsFirstTurn = true;
while (true) {
const cronToolCallRequests: ToolCallRequestInfo[] = [];
const cronApiStartTime = Date.now();
const cronStream = geminiClient.sendMessageStream(
cronMessages[0]?.parts || [],
abortController.signal,
prompt_id,
{
type: cronIsFirstTurn
? SendMessageType.Cron
: SendMessageType.ToolResult,
},
);
cronIsFirstTurn = false;
adapter.startAssistantMessage();
for await (const event of cronStream) {
if (abortController.signal.aborted) {
const summary = scheduler.getExitSummary();
scheduler.stop();
if (summary) {
process.stderr.write(summary + '\n');
}
resolve();
return;
}
adapter.processEvent(event);
if (event.type === GeminiEventType.ToolCallRequest) {
cronToolCallRequests.push(event.value);
}
}
adapter.finalizeAssistantMessage();
totalApiDurationMs += Date.now() - cronApiStartTime;
if (cronToolCallRequests.length > 0) {
const cronToolResponseParts: Part[] = [];
for (const requestInfo of cronToolCallRequests) {
const isAgentTool = requestInfo.name === 'agent';
const { handler: outputUpdateHandler } = isAgentTool
? createAgentToolProgressHandler(
config,
requestInfo.callId,
adapter,
)
: createToolProgressHandler(requestInfo, adapter);
const toolResponse = await executeToolCall(
config,
requestInfo,
abortController.signal,
{ outputUpdateHandler },
);
if (toolResponse.error) {
handleToolError(
requestInfo.name,
toolResponse.error,
config,
toolResponse.errorType || 'TOOL_EXECUTION_ERROR',
typeof toolResponse.resultDisplay === 'string'
? toolResponse.resultDisplay
: undefined,
);
}
adapter.emitToolResult(requestInfo, toolResponse);
if (toolResponse.responseParts) {
cronToolResponseParts.push(
...toolResponse.responseParts,
);
}
}
cronMessages = [
{ role: 'user', parts: cronToolResponseParts },
];
} else {
break;
}
}
}
} catch (error) {
debugLogger.error('Error processing cron prompt:', error);
} finally {
processing = false;
checkDone();
}
};
scheduler.start((job: { prompt: string }) => {
cronQueue.push(job.prompt);
void drainQueue();
});
// Also check immediately in case jobs were already deleted
checkDone();
});
}
const metrics = uiTelemetryService.getMetrics();
const usage = computeUsageFromMetrics(metrics);
// Get stats for JSON format output

View file

@ -204,6 +204,8 @@ describe('useGeminiStream', () => {
.mockReturnValue(contentGeneratorConfig),
getMaxSessionTurns: vi.fn(() => 50),
getArenaAgentClient: vi.fn(() => null),
isCronEnabled: vi.fn(() => false),
getCronScheduler: vi.fn(() => null),
} as unknown as Config;
mockOnDebugMessage = vi.fn();
mockHandleSlashCommand = vi.fn().mockResolvedValue(false);

View file

@ -1236,7 +1236,10 @@ export const useGeminiStream = (
}
// Check image format support for non-continuations
if (submitType === SendMessageType.UserQuery) {
if (
submitType === SendMessageType.UserQuery ||
submitType === SendMessageType.Cron
) {
const formatCheck = checkImageFormatsSupport(queryToSend);
if (formatCheck.hasUnsupportedFormats) {
addItem(
@ -1253,7 +1256,10 @@ export const useGeminiStream = (
lastPromptRef.current = finalQueryToSend;
lastPromptErroredRef.current = false;
if (submitType === SendMessageType.UserQuery) {
if (
submitType === SendMessageType.UserQuery ||
submitType === SendMessageType.Cron
) {
// trigger new prompt event for session stats in CLI
startNewPrompt();
@ -1698,6 +1704,38 @@ export const useGeminiStream = (
storage,
]);
// ─── Cron scheduler integration ─────────────────────────
const cronQueueRef = useRef<string[]>([]);
const [cronTrigger, setCronTrigger] = useState(0);
// Start the scheduler on mount, stop on unmount
useEffect(() => {
if (!config.isCronEnabled()) return;
const scheduler = config.getCronScheduler();
scheduler.start((job: { prompt: string }) => {
cronQueueRef.current.push(job.prompt);
setCronTrigger((n) => n + 1);
});
return () => {
const summary = scheduler.getExitSummary();
scheduler.stop();
if (summary) {
process.stderr.write(summary + '\n');
}
};
}, [config]);
// When idle, drain the cron queue one prompt at a time
useEffect(() => {
if (
streamingState === StreamingState.Idle &&
cronQueueRef.current.length > 0
) {
const prompt = cronQueueRef.current.shift()!;
submitQuery(prompt, SendMessageType.Cron);
}
}, [streamingState, submitQuery, cronTrigger]);
return {
streamingState,
submitQuery,