mirror of
https://github.com/QwenLM/qwen-code.git
synced 2026-05-04 22:51:08 +00:00
feat(cron): add in-session loop scheduling with cron tools
Add session-scoped recurring jobs that fire while you work. Jobs live inside the current Qwen Code process and are gone when you exit. New tools: - cron_create: schedule a prompt to run on a cron expression - cron_list: list active cron jobs - cron_delete: cancel a scheduled job Components: - CronScheduler service for in-process job management - cronParser utility for 5-field cron expressions - /loop skill for natural language scheduling - Non-interactive mode integration to keep process alive Constraints: - Max 50 jobs per session - 3-day expiry for recurring jobs - Jitter to prevent thundering herd - No catch-up for missed fire times Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
parent
070ec5b43e
commit
aa4939111c
17 changed files with 1395 additions and 3 deletions
|
|
@ -371,6 +371,124 @@ export async function runNonInteractive(
|
|||
}
|
||||
currentMessages = [{ role: 'user', parts: toolResponseParts }];
|
||||
} else {
|
||||
// No more tool calls — check if cron jobs are keeping us alive
|
||||
const scheduler = config.isCronDisabled()
|
||||
? null
|
||||
: config.getCronScheduler();
|
||||
if (scheduler && scheduler.size > 0) {
|
||||
// Start the scheduler and wait for all jobs to complete or be deleted.
|
||||
// Each fired prompt is processed as a new turn through the same loop.
|
||||
await new Promise<void>((resolve) => {
|
||||
const checkDone = () => {
|
||||
if (scheduler.size === 0) {
|
||||
scheduler.stop();
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
|
||||
scheduler.start((job: { prompt: string }) => {
|
||||
// Process fired prompt as a new turn
|
||||
(async () => {
|
||||
try {
|
||||
turnCount++;
|
||||
let cronMessages: Content[] = [
|
||||
{ role: 'user', parts: [{ text: job.prompt }] },
|
||||
];
|
||||
let cronIsFirstTurn = true;
|
||||
|
||||
while (true) {
|
||||
const cronToolCallRequests: ToolCallRequestInfo[] = [];
|
||||
const cronApiStartTime = Date.now();
|
||||
const cronStream = geminiClient.sendMessageStream(
|
||||
cronMessages[0]?.parts || [],
|
||||
abortController.signal,
|
||||
prompt_id,
|
||||
{
|
||||
type: cronIsFirstTurn
|
||||
? SendMessageType.UserQuery
|
||||
: SendMessageType.ToolResult,
|
||||
},
|
||||
);
|
||||
cronIsFirstTurn = false;
|
||||
|
||||
adapter.startAssistantMessage();
|
||||
|
||||
for await (const event of cronStream) {
|
||||
if (abortController.signal.aborted) {
|
||||
scheduler.stop();
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
adapter.processEvent(event);
|
||||
if (event.type === GeminiEventType.ToolCallRequest) {
|
||||
cronToolCallRequests.push(event.value);
|
||||
}
|
||||
}
|
||||
|
||||
adapter.finalizeAssistantMessage();
|
||||
totalApiDurationMs += Date.now() - cronApiStartTime;
|
||||
|
||||
if (cronToolCallRequests.length > 0) {
|
||||
const cronToolResponseParts: Part[] = [];
|
||||
|
||||
for (const requestInfo of cronToolCallRequests) {
|
||||
const isAgentTool = requestInfo.name === 'agent';
|
||||
const { handler: outputUpdateHandler } = isAgentTool
|
||||
? createAgentToolProgressHandler(
|
||||
config,
|
||||
requestInfo.callId,
|
||||
adapter,
|
||||
)
|
||||
: createToolProgressHandler(requestInfo, adapter);
|
||||
|
||||
const toolResponse = await executeToolCall(
|
||||
config,
|
||||
requestInfo,
|
||||
abortController.signal,
|
||||
{ outputUpdateHandler },
|
||||
);
|
||||
|
||||
if (toolResponse.error) {
|
||||
handleToolError(
|
||||
requestInfo.name,
|
||||
toolResponse.error,
|
||||
config,
|
||||
toolResponse.errorType || 'TOOL_EXECUTION_ERROR',
|
||||
typeof toolResponse.resultDisplay === 'string'
|
||||
? toolResponse.resultDisplay
|
||||
: undefined,
|
||||
);
|
||||
}
|
||||
|
||||
adapter.emitToolResult(requestInfo, toolResponse);
|
||||
|
||||
if (toolResponse.responseParts) {
|
||||
cronToolResponseParts.push(
|
||||
...toolResponse.responseParts,
|
||||
);
|
||||
}
|
||||
}
|
||||
cronMessages = [
|
||||
{ role: 'user', parts: cronToolResponseParts },
|
||||
];
|
||||
} else {
|
||||
// Cron turn done — check if we should exit
|
||||
checkDone();
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
debugLogger.error('Error processing cron prompt:', error);
|
||||
checkDone();
|
||||
}
|
||||
})();
|
||||
});
|
||||
|
||||
// Also check immediately in case jobs were already deleted
|
||||
checkDone();
|
||||
});
|
||||
}
|
||||
|
||||
const metrics = uiTelemetryService.getMetrics();
|
||||
const usage = computeUsageFromMetrics(metrics);
|
||||
// Get stats for JSON format output
|
||||
|
|
|
|||
|
|
@ -1638,6 +1638,32 @@ export const useGeminiStream = (
|
|||
storage,
|
||||
]);
|
||||
|
||||
// ─── Cron scheduler integration ─────────────────────────
|
||||
const cronQueueRef = useRef<string[]>([]);
|
||||
|
||||
// Start the scheduler on mount, stop on unmount
|
||||
useEffect(() => {
|
||||
if (config.isCronDisabled()) return;
|
||||
const scheduler = config.getCronScheduler();
|
||||
scheduler.start((job: { prompt: string }) => {
|
||||
cronQueueRef.current.push(job.prompt);
|
||||
});
|
||||
return () => {
|
||||
scheduler.stop();
|
||||
};
|
||||
}, [config]);
|
||||
|
||||
// When idle, drain the cron queue one prompt at a time
|
||||
useEffect(() => {
|
||||
if (
|
||||
streamingState === StreamingState.Idle &&
|
||||
cronQueueRef.current.length > 0
|
||||
) {
|
||||
const prompt = cronQueueRef.current.shift()!;
|
||||
submitQuery(prompt, SendMessageType.UserQuery);
|
||||
}
|
||||
}, [streamingState, submitQuery]);
|
||||
|
||||
return {
|
||||
streamingState,
|
||||
submitQuery,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue