qwen-code/packages/cli/src/nonInteractiveCli.ts
tanzhenxin 0da1182b74
feat(cli): headless support and SDK task events for background agents (#3379)
* feat(cli): unify notification queue for cron and background agents

Migrate cron from its own queue (cronQueueRef / cronQueue) to the shared
notification queue used by background agents. Both producers now push the
same item shape { displayText, modelText, sendMessageType } and a single
drain effect / helper processes them in FIFO order.
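
The shared queue described above can be sketched as follows. This is a
minimal illustration, not the code in this commit: `QueueItem`,
`enqueueCron`, `enqueueAgentNotification`, and `drain` are hypothetical
names, and the string union stands in for the real `SendMessageType`
enum.

```typescript
// Hypothetical stand-in for the SendMessageType enum in qwen-code-core.
type SendMessageType = 'cron' | 'notification';

// Item shape taken from the commit message.
interface QueueItem {
  displayText: string;
  modelText: string;
  sendMessageType: SendMessageType;
}

const queue: QueueItem[] = [];

// Producer 1: a cron fire, labelled "Cron: <prompt>" for display.
function enqueueCron(prompt: string): void {
  queue.push({
    displayText: `Cron: ${prompt}`,
    modelText: prompt,
    sendMessageType: 'cron',
  });
}

// Producer 2: a background agent reporting a terminal state.
function enqueueAgentNotification(
  displayText: string,
  modelText: string,
): void {
  queue.push({ displayText, modelText, sendMessageType: 'notification' });
}

// Single consumer: items leave in the order they arrived (FIFO),
// regardless of which producer pushed them.
function drain(handle: (item: QueueItem) => void): number {
  let processed = 0;
  while (queue.length > 0) {
    handle(queue.shift()!);
    processed++;
  }
  return processed;
}
```

Because both producers push the same shape, the consumer needs no
per-producer branching beyond inspecting `sendMessageType`.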

Cron fires render as HistoryItemNotification (● prefix) instead of
HistoryItemUser (> prefix), with a "Cron: <prompt>" display label.
Records use subtype 'cron' for clean resume and analytics separation.

Lift the non-interactive rejection for background agents. Register a
notification callback in nonInteractiveCli.ts with a terminal hold-back
phase (100ms poll) that keeps the process alive until all background
agents complete and their notifications are processed.
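
A rough sketch of the hold-back phase, assuming a registry that exposes
`getRunning()` as in nonInteractiveCli.ts. `RegistryLike`, `holdBack`,
and the `pollMs` parameter are illustrative names; abort handling is
left out to keep the loop readable.

```typescript
// Hypothetical minimal view of the background task registry.
interface RegistryLike {
  getRunning(): unknown[];
}

// Keeps the caller waiting until no agent is running and no notification
// is pending. Returns the number of polls that were needed.
async function holdBack(
  registry: RegistryLike,
  pending: unknown[],
  drain: () => Promise<void>,
  pollMs = 100,
): Promise<number> {
  let polls = 0;
  while (true) {
    // Process whatever notifications have arrived so far.
    await drain();
    if (registry.getRunning().length === 0 && pending.length === 0) {
      return polls;
    }
    polls++;
    // Sleep before checking again; this timer keeps the process alive.
    await new Promise<void>((resolve) => setTimeout(resolve, pollMs));
  }
}
```

Draining before the exit check means a notification pushed by the last
agent to finish is still processed before the final result is emitted.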

* feat(cli): emit SDK task events for background subagents

Emit `task_started` when a background agent registers and
`task_notification` when it completes, fails, or is cancelled, so
headless/SDK consumers can track lifecycle without parsing display
text. Model-facing text is now structured XML with status, summary,
truncated result, and usage stats. Completion stats (tokens, tool
uses, duration) are captured from the subagent and included in both
the SDK payload and the model XML.
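
From the consumer side, lifecycle tracking might look like the sketch
below. The field names (`task_id`, `status`, `usage`) follow the payload
built in nonInteractiveCli.ts; the `{ subtype, data }` envelope, the
status literals, and `trackTasks` are assumptions made for illustration
and may not match the adapter's actual wire format.

```typescript
// Assumed status literals; the real BackgroundAgentStatus type lives in
// qwen-code-core and may differ.
type TaskStatus = 'running' | 'completed' | 'failed' | 'cancelled';

// Hypothetical envelope for the two event kinds.
interface TaskEvent {
  subtype: 'task_started' | 'task_notification';
  data: {
    task_id: string;
    tool_use_id?: string;
    status?: TaskStatus;
    usage?: { total_tokens: number; tool_uses: number; duration_ms: number };
  };
}

// Folds a stream of events into the latest known status per task,
// without looking at any display text.
function trackTasks(events: TaskEvent[]): Map<string, TaskStatus> {
  const tasks = new Map<string, TaskStatus>();
  for (const event of events) {
    if (event.subtype === 'task_started') {
      tasks.set(event.data.task_id, 'running');
    } else if (event.data.status) {
      tasks.set(event.data.task_id, event.data.status);
    }
  }
  return tasks;
}
```

A task that still maps to `running` at the end of a session has a
`task_started` without a matching `task_notification`.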

* fix: address codex review issues for background subagents

- Background subagents now inherit the resolved approval mode from
  agentConfig instead of the raw session config, so a subagent with
  `approvalMode: auto-edit` (or execution in a trusted folder) keeps
  that override when it runs asynchronously.
- Non-interactive cron drains are single-flight: concurrent cron fires
  now await the same in-flight drain, and the cron-done check gates
  on it, preventing the final result from being emitted while a cron
  turn is still streaming.
- Background forks go through createForkSubagent so they retain the
  parent's rendered system prompt and inherited history instead of
  degrading to a plain FORK_AGENT.

* fix(cli): restore cancellation, approval, and error paths in queued drain

- Hold-back loop now reacts to SIGINT/SIGTERM: when the main abort
  signal fires it calls registry.abortAll() so background agents with
  their own AbortControllers stop promptly instead of pinning the
  process open.
- Queued-turn tool execution forwards the stream-json approval update
  callback (onToolCallsUpdate) so permission-gated tools inside a
  background-notification follow-up emit can_use_tool requests.
- Queued-turn stream loop mirrors the main loop's text-mode handling
  of GeminiEventType.Error, writing to stderr and throwing so provider
  errors produce a non-zero exit code instead of silently succeeding.
- Interactive cron prompts go through the normal slash/@-command/shell
  preprocessing again; only Notification messages skip that path.

* fix(cli): skip duplicate user-message item for cron prompts

Cron prompts already render as a `● Cron: …` notification via the queue
drain, so adding them again as a `USER` history item produced a
duplicate `> …` line.

* fix(cli): honor SIGINT/SIGTERM during cron scheduler wait

The non-interactive cron phase awaits a Promise that resolves only when
scheduler.size reaches 0 and no drain is in flight. Recurring cron jobs
never drop the scheduler size to 0 on their own, so the previous abort
handling (added to the hold-back loop) was unreachable — the process
hung indefinitely after SIGINT/SIGTERM. Attach an abort listener inside
the promise so abort stops the scheduler and resolves immediately,
allowing the hold-back loop to run and the process to exit cleanly.
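
The pattern can be sketched in isolation as below. `SchedulerLike`,
`waitForCron`, and the `subscribe` parameter are hypothetical names; the
real code wires `checkCronDone` into the scheduler's fire callback
instead.

```typescript
// Hypothetical minimal view of the cron scheduler.
interface SchedulerLike {
  size: number;
  stop(): void;
}

// Resolves when the scheduler has no jobs left OR when the signal aborts,
// whichever comes first.
function waitForCron(
  scheduler: SchedulerLike,
  signal: AbortSignal,
  subscribe: (check: () => void) => void,
): Promise<'idle' | 'aborted'> {
  return new Promise<'idle' | 'aborted'>((resolve) => {
    const onAbort = () => {
      scheduler.stop();
      resolve('aborted');
    };
    // Already aborted: listeners would never fire, so handle it now.
    if (signal.aborted) {
      onAbort();
      return;
    }
    signal.addEventListener('abort', onAbort, { once: true });

    const check = () => {
      if (scheduler.size === 0) {
        signal.removeEventListener('abort', onAbort);
        scheduler.stop();
        resolve('idle');
      }
    };
    // Let the caller re-run the check after each job completes.
    subscribe(check);
    // Check once immediately in case there is nothing to wait for.
    check();
  });
}
```

With a recurring job, `scheduler.size` stays above zero, so the abort
listener is the only path that settles the promise.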

* feat(core): propagate tool-use id through background agent notifications

Plumb the scheduler's callId into AgentToolInvocation via an optional
setCallId hook on the invocation, detected structurally in
buildInvocation. The agent tool forwards it as toolUseId on the
BackgroundTaskRegistry entry so completion notifications can carry a
<tool-use-id> tag and SDK task_started / task_notification events can
emit tool_use_id — letting consumers correlate background completions
back to the original Agent tool-use that spawned them.
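
Structural detection of an optional hook can be sketched like this. Only
the `setCallId` name comes from the commit; `CallIdAware`,
`isCallIdAware`, `attachCallId`, and `AgentInvocationSketch` are
illustrative and do not reproduce the real `buildInvocation`.

```typescript
// The optional hook an invocation may expose.
interface CallIdAware {
  setCallId(callId: string): void;
}

// Structural check: any object with a callable `setCallId` qualifies,
// no shared base class or marker required.
function isCallIdAware(value: unknown): value is CallIdAware {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { setCallId?: unknown }).setCallId === 'function'
  );
}

// Forwards the scheduler's call id only to invocations that accept it.
function attachCallId<T extends object>(invocation: T, callId: string): T {
  if (isCallIdAware(invocation)) {
    invocation.setCallId(callId);
  }
  return invocation;
}

// Hypothetical invocation that remembers the id as its tool-use id.
class AgentInvocationSketch implements CallIdAware {
  toolUseId: string | undefined;

  setCallId(callId: string): void {
    this.toolUseId = callId;
  }
}
```

Invocations without the hook pass through untouched, so other tools do
not need to change.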

* fix(cli): drain single-flight race kept task_notification from emitting

drainLocalQueue wrapped its body in an async IIFE and cleared the
promise reference via finally. When the queue is empty the IIFE has
no awaits, so its finally runs synchronously as part of the RHS of
the assignment `drainPromise = (async () => {...})()` — clearing
drainPromise BEFORE the outer assignment overwrites it with the
resolved promise. The reference then stayed stuck on that fulfilled
promise forever, so later calls short-circuited through
`if (drainPromise) return drainPromise` and never processed
queued notifications.

Symptom: in headless `--output-format json` (and `stream-json`),
task_started emitted but task_notification never did, even after
the background agent completed. The process sat in the hold-back
loop until SIGTERM.

Fix: move the null-clearing out of the async body into an outer
`.finally()` on the returned promise. The `.finally()` callback runs as
a microtask after the current synchronous block, so it fires after the
`drainPromise = p` assignment and clears the live reference instead of
running before the assignment.
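
The race can be reproduced in isolation. The sketch below puts the two
variants side by side; `makeBuggyDrain` and `makeFixedDrain` are
hypothetical helpers, and "processing" an item is reduced to moving it
into an output array.

```typescript
type Drain = () => Promise<void>;

// Buggy variant: clears the reference inside the async body.
function makeBuggyDrain(queue: string[], out: string[]): Drain {
  let inFlight: Promise<void> | null = null;
  return () => {
    if (inFlight) return inFlight;
    const p = (async () => {
      try {
        while (queue.length > 0) {
          out.push(queue.shift()!);
          await Promise.resolve();
        }
      } finally {
        // With an empty queue there is no await, so this runs
        // synchronously, before the assignment below.
        inFlight = null;
      }
    })();
    inFlight = p; // overwrites the null with an already-settled promise
    return p;
  };
}

// Fixed variant: clears the reference from an outer .finally().
function makeFixedDrain(queue: string[], out: string[]): Drain {
  let inFlight: Promise<void> | null = null;
  return () => {
    if (inFlight) return inFlight;
    const p = (async () => {
      while (queue.length > 0) {
        out.push(queue.shift()!);
        await Promise.resolve();
      }
    })();
    inFlight = p;
    // Runs as a microtask, after the assignment above.
    void p.finally(() => {
      if (inFlight === p) inFlight = null;
    });
    return p;
  };
}
```

Calling the buggy drain once on an empty queue is enough to wedge it:
every later call returns the stale promise and the queue is never read
again.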

* fix(cli): append newline to text-mode emitResult so zsh PROMPT_SP doesn't erase the line

Headless text mode wrote `resultMessage.result` without a trailing newline.
In a TTY, zsh themes that use PROMPT_SP (powerlevel10k, agnoster, …) detect
the missing `\n` and emit `\r\033[K` before drawing the next prompt, which
wipes the final line off the screen. Pipe-captured output was unaffected,
so the bug only surfaced for interactive shell users — most visibly in the
background-agent flow where the drain-loop's final assistant message is
the *only* stdout write in text mode.

Append `\n` to both the success (stdout) and error (stderr) writes.

* docs(skill): tighten worked-example blurb in structured-debugging

Mirror the simplified blurb from .claude/skills/structured-debugging/SKILL.md
(knowledge repo). Drops the round-by-round narrative; keeps the contradiction
+ two lessons.

* docs(skill): mirror SKILL.md improvements (reframing failure mode, generalized path, value-logging guidance)

Mirror of knowledge repo commit 38eb28d into the qwen-code .qwen/skills
copy.

* docs(skill): mirror worked example into .qwen/skills/structured-debugging/

Mirrors knowledge/.claude/skills/structured-debugging/examples/
headless-bg-agent-empty-stdout.md so the .qwen copy of the skill links
resolve.

* docs(skill): mirror generalized side-note path guidance

* fix(cli): harden headless cron and background-agent failure paths

Three regressions surfaced by Codex review of feat/background-subagent:

- Cron drain rejections were dropped by a bare `void`, so a failing
  queued turn left the outer Promise unresolved and hung the run. Route
  drain failures through the Promise's reject so they propagate to the
  outer catch.
- The background-agent registry entry was inserted before
  `createForkSubagent()` / `createAgentHeadless()` was awaited. Failed
  init returned an error from the tool call but left a phantom `running`
  entry, and the headless hold-back loop (`registry.getRunning()`) waited
  forever. Register only after init succeeds.
- SIGINT/SIGTERM during the hold-back phase aborted background tasks,
  then fell through to `emitResult({ isError: false })`, so a cancelled
  `qwen -p ...` exited 0 with the prior assistant text. Route through
  `handleCancellationError()` so cancellation exits non-zero, matching
  the main turn loop.
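
The second fix (register only after init succeeds) can be sketched as
follows. `RegistrySketch` and `launchBackground` are hypothetical; the
real registry also tracks failure and cancellation, which this sketch
folds into a single "finished" state.

```typescript
interface Entry {
  agentId: string;
  status: 'running' | 'finished';
}

// Hypothetical, heavily reduced registry.
class RegistrySketch {
  private entries = new Map<string, Entry>();

  register(agentId: string): void {
    this.entries.set(agentId, { agentId, status: 'running' });
  }

  finish(agentId: string): void {
    const entry = this.entries.get(agentId);
    if (entry) entry.status = 'finished';
  }

  getRunning(): Entry[] {
    return Array.from(this.entries.values()).filter(
      (entry) => entry.status === 'running',
    );
  }
}

// `init` stands in for agent creation and resolves to the task to run.
async function launchBackground(
  registry: RegistrySketch,
  agentId: string,
  init: () => Promise<() => Promise<void>>,
): Promise<void> {
  // If init throws, nothing has been registered yet, so no phantom
  // `running` entry can keep a hold-back loop waiting.
  const run = await init();
  registry.register(agentId);
  // Fire and forget; mark the entry finished on success or failure.
  void run().then(
    () => registry.finish(agentId),
    () => registry.finish(agentId),
  );
}
```

Registering before `await init()` would leave a `running` entry behind
whenever creation fails, which is the hang described above.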

* test(cli): update stdout/stderr assertions for trailing newline

`feadf052f` appended `\n` to text-mode `emitResult` output, but the
nonInteractiveCli tests still asserted the pre-change strings. Update
the 11 affected assertions to expect the trailing newline.

* fix: address review comments on background-agent notifications

Four additional issues from the PR review that the prior regression-fix
commit didn't cover:

- Escape XML metacharacters when interpolating `description`, `result`,
  `error`, `agentId`, `toolUseId`, and `status` into the task-notification
  envelope. Subagent output (which itself may carry untrusted tool output,
  fetched HTML, or another agent's notification) could contain
  `</result>` or `</task-notification>` and forge sibling tags the parent
  model would treat as trusted metadata. Truncate result text *before*
  escaping so the truncation never slices through an entity like `&amp;`.
- Emit the terminal notification from `cancel()` and `abortAll()`. The
  fire-and-forget `complete()`/`fail()` from the subagent task is guarded
  by `status !== 'running'` and was no-op'd after cancellation, so SDK
  consumers saw `task_started` with no matching `task_notification`,
  breaking the contract this PR establishes. Updated two race-guard
  tests that asserted the old behavior.
- Call `adapter.finalizeAssistantMessage()` before the abort-triggered
  early return inside `drainOneItem`'s stream loop. Without it,
  `startAssistantMessage()` had already been called, so stream-json mode
  left `message_start` unpaired.
- Enforce `config.getMaxSessionTurns()` in `drainOneItem` for symmetry
  with the main turn loop. Cron fires and notification replies otherwise
  bypass the budget cap in headless runs.
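
The truncate-then-escape order from the first bullet can be sketched as
below. `escapeXml` and `renderResult` are illustrative helpers, and the
exact set of characters escaped by the real implementation is an
assumption here.

```typescript
// Escapes the five XML metacharacters. `&` goes first so the entities
// produced by later replacements are not escaped a second time.
function escapeXml(text: string): string {
  return text
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&apos;');
}

// Truncates the raw text first, then escapes. Escaping first and cutting
// afterwards could slice an entity such as `&amp;` in half.
function renderResult(result: string, maxChars: number): string {
  const truncated =
    result.length > maxChars ? result.slice(0, maxChars) : result;
  return `<result>${escapeXml(truncated)}</result>`;
}
```

With this in place, subagent output containing `</result>` arrives as
`&lt;/result&gt;` and cannot close the envelope early.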
2026-04-17 14:42:44 +08:00


/**
* @license
* Copyright 2025 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/
import type {
BackgroundAgentStatus,
Config,
ToolCallRequestInfo,
} from '@qwen-code/qwen-code-core';
import { isSlashCommand } from './ui/utils/commandUtils.js';
import type { LoadedSettings } from './config/settings.js';
import {
executeToolCall,
shutdownTelemetry,
isTelemetrySdkInitialized,
GeminiEventType,
FatalInputError,
promptIdContext,
OutputFormat,
InputFormat,
uiTelemetryService,
parseAndFormatApiError,
createDebugLogger,
SendMessageType,
} from '@qwen-code/qwen-code-core';
import type { Content, Part, PartListUnion } from '@google/genai';
import type { CLIUserMessage, PermissionMode } from './nonInteractive/types.js';
import type { JsonOutputAdapterInterface } from './nonInteractive/io/BaseJsonOutputAdapter.js';
import { JsonOutputAdapter } from './nonInteractive/io/JsonOutputAdapter.js';
import { StreamJsonOutputAdapter } from './nonInteractive/io/StreamJsonOutputAdapter.js';
import type { ControlService } from './nonInteractive/control/ControlService.js';
import { handleSlashCommand } from './nonInteractiveCliCommands.js';
import { handleAtCommand } from './ui/hooks/atCommandProcessor.js';
import {
handleError,
handleToolError,
handleCancellationError,
handleMaxTurnsExceededError,
} from './utils/errors.js';
import {
normalizePartList,
extractPartsFromUserMessage,
buildSystemMessage,
createToolProgressHandler,
createAgentToolProgressHandler,
computeUsageFromMetrics,
} from './utils/nonInteractiveHelpers.js';
const debugLogger = createDebugLogger('NON_INTERACTIVE_CLI');
/**
* Emits a final message for slash command results.
* Note: systemMessage should already be emitted before calling this function.
*/
async function emitNonInteractiveFinalMessage(params: {
message: string;
isError: boolean;
adapter: JsonOutputAdapterInterface;
config: Config;
startTimeMs: number;
}): Promise<void> {
const { message, isError, adapter, config } = params;
// JSON output mode: emit assistant message and result
// (systemMessage should already be emitted by caller)
adapter.startAssistantMessage();
adapter.processEvent({
type: GeminiEventType.Content,
value: message,
} as unknown as Parameters<JsonOutputAdapterInterface['processEvent']>[0]);
adapter.finalizeAssistantMessage();
const metrics = uiTelemetryService.getMetrics();
const usage = computeUsageFromMetrics(metrics);
const outputFormat = config.getOutputFormat();
const stats =
outputFormat === OutputFormat.JSON
? uiTelemetryService.getMetrics()
: undefined;
adapter.emitResult({
isError,
durationMs: Date.now() - params.startTimeMs,
apiDurationMs: 0,
numTurns: 0,
errorMessage: isError ? message : undefined,
usage,
stats,
summary: message,
});
}
/**
* Provides optional overrides for `runNonInteractive` execution.
*
* - `abortController`: abort controller used for cancellation.
* - `adapter`: JSON output adapter for structured output formats.
* - `userMessage`: CLI user message payload for preformatted input.
* - `controlService`: control service that supplies the tool-call update
*   callback for permission handling when the input format is stream-json.
*/
export interface RunNonInteractiveOptions {
abortController?: AbortController;
adapter?: JsonOutputAdapterInterface;
userMessage?: CLIUserMessage;
controlService?: ControlService;
}
/**
* Executes the non-interactive CLI flow for a single request.
*/
export async function runNonInteractive(
config: Config,
settings: LoadedSettings,
input: string,
prompt_id: string,
options: RunNonInteractiveOptions = {},
): Promise<void> {
return promptIdContext.run(prompt_id, async () => {
// Create output adapter based on format
let adapter: JsonOutputAdapterInterface;
const outputFormat = config.getOutputFormat();
if (options.adapter) {
adapter = options.adapter;
} else if (outputFormat === OutputFormat.STREAM_JSON) {
adapter = new StreamJsonOutputAdapter(
config,
config.getIncludePartialMessages(),
);
} else {
adapter = new JsonOutputAdapter(config);
}
// Get readonly values once at the start
const sessionId = config.getSessionId();
const permissionMode = config.getApprovalMode() as PermissionMode;
let turnCount = 0;
let totalApiDurationMs = 0;
const startTime = Date.now();
const stdoutErrorHandler = (err: NodeJS.ErrnoException) => {
if (err.code === 'EPIPE') {
process.stdout.removeListener('error', stdoutErrorHandler);
process.exit(0);
}
};
const geminiClient = config.getGeminiClient();
const abortController = options.abortController ?? new AbortController();
// Setup signal handlers for graceful shutdown
const shutdownHandler = () => {
debugLogger.debug('[runNonInteractive] Shutdown signal received');
abortController.abort();
};
try {
process.stdout.on('error', stdoutErrorHandler);
process.on('SIGINT', shutdownHandler);
process.on('SIGTERM', shutdownHandler);
// Emit systemMessage first (always the first message in JSON mode)
const systemMessage = await buildSystemMessage(
config,
sessionId,
permissionMode,
);
adapter.emitMessage(systemMessage);
let initialPartList: PartListUnion | null = extractPartsFromUserMessage(
options.userMessage,
);
if (!initialPartList) {
let slashHandled = false;
if (isSlashCommand(input)) {
const slashCommandResult = await handleSlashCommand(
input,
abortController,
config,
settings,
);
switch (slashCommandResult.type) {
case 'submit_prompt':
// A slash command can replace the prompt entirely; fall back to @-command processing otherwise.
initialPartList = slashCommandResult.content;
slashHandled = true;
break;
case 'message': {
// systemMessage already emitted above
await emitNonInteractiveFinalMessage({
message: slashCommandResult.content,
isError: slashCommandResult.messageType === 'error',
adapter,
config,
startTimeMs: startTime,
});
return;
}
case 'stream_messages':
throw new FatalInputError(
'Stream messages mode is not supported in non-interactive CLI',
);
case 'unsupported': {
await emitNonInteractiveFinalMessage({
message: slashCommandResult.reason,
isError: true,
adapter,
config,
startTimeMs: startTime,
});
return;
}
case 'no_command':
break;
default: {
const _exhaustive: never = slashCommandResult;
throw new FatalInputError(
`Unhandled slash command result type: ${(_exhaustive as { type: string }).type}`,
);
}
}
}
if (!slashHandled) {
const { processedQuery, shouldProceed } = await handleAtCommand({
query: input,
config,
onDebugMessage: () => {},
messageId: Date.now(),
signal: abortController.signal,
});
if (!shouldProceed || !processedQuery) {
// An error occurred during @include processing (e.g., file not found).
// The error message is already logged by handleAtCommand.
throw new FatalInputError(
'Exiting due to an error processing the @ command.',
);
}
initialPartList = processedQuery as PartListUnion;
}
}
if (!initialPartList) {
initialPartList = [{ text: input }];
}
const initialParts = normalizePartList(initialPartList);
let currentMessages: Content[] = [{ role: 'user', parts: initialParts }];
// ─── Shared notification queue (cron + background agents) ──────
// Register the callback early so background agents launched during
// the main tool-call chain can push completions onto the queue.
interface LocalQueueItem {
displayText: string;
modelText: string;
sendMessageType: SendMessageType;
sdkNotification?: {
task_id: string;
tool_use_id?: string;
status: BackgroundAgentStatus;
usage?: {
total_tokens: number;
tool_uses: number;
duration_ms: number;
};
};
}
const localQueue: LocalQueueItem[] = [];
const registry = config.getBackgroundTaskRegistry();
registry.setNotificationCallback((displayText, modelText, meta) => {
localQueue.push({
displayText,
modelText,
sendMessageType: SendMessageType.Notification,
sdkNotification: {
task_id: meta.agentId,
tool_use_id: meta.toolUseId,
status: meta.status,
usage: meta.stats
? {
total_tokens: meta.stats.totalTokens,
tool_uses: meta.stats.toolUses,
duration_ms: meta.stats.durationMs,
}
: undefined,
},
});
});
registry.setRegisterCallback((entry) => {
adapter.emitSystemMessage('task_started', {
task_id: entry.agentId,
tool_use_id: entry.toolUseId,
description: entry.description,
subagent_type: entry.subagentType,
});
});
let isFirstTurn = true;
let modelOverride: string | undefined;
while (true) {
turnCount++;
if (
config.getMaxSessionTurns() >= 0 &&
turnCount > config.getMaxSessionTurns()
) {
handleMaxTurnsExceededError(config);
}
const toolCallRequests: ToolCallRequestInfo[] = [];
const apiStartTime = Date.now();
const responseStream = geminiClient.sendMessageStream(
currentMessages[0]?.parts || [],
abortController.signal,
prompt_id,
{
type: isFirstTurn
? SendMessageType.UserQuery
: SendMessageType.ToolResult,
modelOverride,
},
);
isFirstTurn = false;
// Start assistant message for this turn
adapter.startAssistantMessage();
for await (const event of responseStream) {
if (abortController.signal.aborted) {
handleCancellationError(config);
}
// Use adapter for all event processing
adapter.processEvent(event);
if (event.type === GeminiEventType.ToolCallRequest) {
toolCallRequests.push(event.value);
}
if (
outputFormat === OutputFormat.TEXT &&
event.type === GeminiEventType.Error
) {
const errorText = parseAndFormatApiError(
event.value.error,
config.getContentGeneratorConfig()?.authType,
);
process.stderr.write(`${errorText}\n`);
// Throw error to exit with non-zero code
throw new Error(errorText);
}
}
// Finalize assistant message
adapter.finalizeAssistantMessage();
totalApiDurationMs += Date.now() - apiStartTime;
if (toolCallRequests.length > 0) {
const toolResponseParts: Part[] = [];
for (const requestInfo of toolCallRequests) {
const finalRequestInfo = requestInfo;
const inputFormat =
typeof config.getInputFormat === 'function'
? config.getInputFormat()
: InputFormat.TEXT;
const toolCallUpdateCallback =
inputFormat === InputFormat.STREAM_JSON && options.controlService
? options.controlService.permission.getToolCallUpdateCallback()
: undefined;
// Build outputUpdateHandler for this tool call.
// Agent tool has its own complex handler (subagent messages).
// All other tools with canUpdateOutput=true (e.g., MCP tools)
// get a generic handler that emits progress via the adapter.
const isAgentTool = finalRequestInfo.name === 'agent';
const { handler: outputUpdateHandler } = isAgentTool
? createAgentToolProgressHandler(
config,
finalRequestInfo.callId,
adapter,
)
: createToolProgressHandler(finalRequestInfo, adapter);
const toolResponse = await executeToolCall(
config,
finalRequestInfo,
abortController.signal,
{
outputUpdateHandler,
...(toolCallUpdateCallback && {
onToolCallsUpdate: toolCallUpdateCallback,
}),
},
);
// Note: In JSON mode, subagent messages are automatically added to the main
// adapter's messages array and will be output together on emitResult()
if (toolResponse.error) {
// In JSON/STREAM_JSON mode, tool errors are tolerated and formatted
// as tool_result blocks. handleToolError will detect JSON/STREAM_JSON mode
// from config and allow the session to continue so the LLM can decide what to do next.
// In text mode, we still log the error.
handleToolError(
finalRequestInfo.name,
toolResponse.error,
config,
toolResponse.errorType || 'TOOL_EXECUTION_ERROR',
typeof toolResponse.resultDisplay === 'string'
? toolResponse.resultDisplay
: undefined,
);
}
adapter.emitToolResult(finalRequestInfo, toolResponse);
if (toolResponse.responseParts) {
toolResponseParts.push(...toolResponse.responseParts);
}
// Capture model override from skill tool results.
// Use `in` so that undefined (from inherit/no-model skills) clears a prior override,
// while non-skill tools (field absent) leave the current override intact.
if ('modelOverride' in toolResponse) {
modelOverride = toolResponse.modelOverride;
}
}
currentMessages = [{ role: 'user', parts: toolResponseParts }];
} else {
// No more tool calls — drain notifications and cron, then exit.
// Process one queue item through the full turn loop.
const drainOneItem = async () => {
if (localQueue.length === 0) return;
const item = localQueue.shift()!;
if (item.sendMessageType === SendMessageType.Notification) {
adapter.emitUserMessage([{ text: item.displayText }]);
if (item.sdkNotification) {
adapter.emitSystemMessage(
'task_notification',
item.sdkNotification,
);
}
}
turnCount++;
// Symmetry with the main turn loop: drain-turns (cron fires and
// background-agent notification replies) count toward the
// configured budget too, otherwise a looping cron or a model
// that keeps replying to notifications can exceed the cap
// silently in headless runs.
if (
config.getMaxSessionTurns() >= 0 &&
turnCount > config.getMaxSessionTurns()
) {
handleMaxTurnsExceededError(config);
}
let itemMessages: Content[] = [
{ role: 'user', parts: [{ text: item.modelText }] },
];
let itemIsFirstTurn = true;
let itemModelOverride: string | undefined;
while (true) {
const itemToolCallRequests: ToolCallRequestInfo[] = [];
const itemApiStartTime = Date.now();
const itemStream = geminiClient.sendMessageStream(
itemMessages[0]?.parts || [],
abortController.signal,
prompt_id,
{
type: itemIsFirstTurn
? item.sendMessageType
: SendMessageType.ToolResult,
modelOverride: itemModelOverride,
...(itemIsFirstTurn && {
notificationDisplayText: item.displayText,
}),
},
);
itemIsFirstTurn = false;
adapter.startAssistantMessage();
for await (const event of itemStream) {
if (abortController.signal.aborted) {
// Pair the startAssistantMessage() above so stream-json
// mode doesn't leave an unterminated message_start.
adapter.finalizeAssistantMessage();
return;
}
adapter.processEvent(event);
if (event.type === GeminiEventType.ToolCallRequest) {
itemToolCallRequests.push(event.value);
}
if (
outputFormat === OutputFormat.TEXT &&
event.type === GeminiEventType.Error
) {
const errorText = parseAndFormatApiError(
event.value.error,
config.getContentGeneratorConfig()?.authType,
);
process.stderr.write(`${errorText}\n`);
throw new Error(errorText);
}
}
adapter.finalizeAssistantMessage();
totalApiDurationMs += Date.now() - itemApiStartTime;
if (itemToolCallRequests.length > 0) {
const itemToolResponseParts: Part[] = [];
for (const requestInfo of itemToolCallRequests) {
const itemInputFormat =
typeof config.getInputFormat === 'function'
? config.getInputFormat()
: InputFormat.TEXT;
const itemToolCallUpdateCallback =
itemInputFormat === InputFormat.STREAM_JSON &&
options.controlService
? options.controlService.permission.getToolCallUpdateCallback()
: undefined;
const isAgentTool = requestInfo.name === 'agent';
const { handler: outputUpdateHandler } = isAgentTool
? createAgentToolProgressHandler(
config,
requestInfo.callId,
adapter,
)
: createToolProgressHandler(requestInfo, adapter);
const toolResponse = await executeToolCall(
config,
requestInfo,
abortController.signal,
{
outputUpdateHandler,
...(itemToolCallUpdateCallback && {
onToolCallsUpdate: itemToolCallUpdateCallback,
}),
},
);
if (toolResponse.error) {
handleToolError(
requestInfo.name,
toolResponse.error,
config,
toolResponse.errorType || 'TOOL_EXECUTION_ERROR',
typeof toolResponse.resultDisplay === 'string'
? toolResponse.resultDisplay
: undefined,
);
}
adapter.emitToolResult(requestInfo, toolResponse);
if (toolResponse.responseParts) {
itemToolResponseParts.push(...toolResponse.responseParts);
}
if ('modelOverride' in toolResponse) {
itemModelOverride = toolResponse.modelOverride;
}
}
itemMessages = [{ role: 'user', parts: itemToolResponseParts }];
} else {
break;
}
}
};
// Single-flight drain: concurrent callers wait for the running
// drain. A new drain starts only after the previous one finishes
// its queue, which prevents overlapping turns when cron jobs fire
// while an earlier queued item is still streaming.
//
// The null-clearing is attached via `.finally()` on the outer
// promise rather than inside the async body. When the queue is
// empty the async body runs to completion synchronously (no
// awaits) — an inner `finally { drainPromise = null }` would
// therefore fire BEFORE the outer `drainPromise = p` assignment,
// leaving drainPromise stuck holding a resolved Promise forever
// and making every future call return immediately without ever
// draining. Clearing via `p.finally()` schedules the null as a
// microtask that runs after the outer assignment.
let drainPromise: Promise<void> | null = null;
const drainLocalQueue = (): Promise<void> => {
if (drainPromise) return drainPromise;
const p = (async () => {
while (localQueue.length > 0) {
await drainOneItem();
}
})();
drainPromise = p;
void p.finally(() => {
if (drainPromise === p) drainPromise = null;
});
return p;
};
// Start the cron scheduler; each fire enqueues its prompt onto the shared queue.
const scheduler = !config.isCronEnabled()
? null
: config.getCronScheduler();
if (scheduler && scheduler.size > 0) {
await new Promise<void>((resolve, reject) => {
// Resolve on SIGINT/SIGTERM too — recurring cron jobs never
// drop scheduler.size to 0 on their own, so without this the
// hold-back loop below is unreachable after an abort.
const onAbort = () => {
scheduler.stop();
resolve();
};
if (abortController.signal.aborted) {
onAbort();
return;
}
abortController.signal.addEventListener('abort', onAbort, {
once: true,
});
const checkCronDone = () => {
if (scheduler.size === 0 && !drainPromise) {
abortController.signal.removeEventListener('abort', onAbort);
scheduler.stop();
resolve();
}
};
// Propagate drain failures. Without this, a rejected
// drainLocalQueue() (e.g. a text-mode API error surfacing
// out of drainOneItem) would be swallowed by `void` and
// checkCronDone would never fire — hanging the run.
const onDrainError = (err: unknown) => {
abortController.signal.removeEventListener('abort', onAbort);
scheduler.stop();
reject(err);
};
scheduler.start((job: { prompt: string }) => {
const label = job.prompt.slice(0, 40);
localQueue.push({
displayText: `Cron: ${label}`,
modelText: job.prompt,
sendMessageType: SendMessageType.Cron,
});
drainLocalQueue().then(checkCronDone, onDrainError);
});
// Check immediately in case jobs were already deleted
checkCronDone();
});
}
// ─── Terminal hold-back phase ──────────────────────────
// Wait for running background agents to complete and drain
// their notifications before emitting the final result. If
// SIGINT/SIGTERM fires here, abort running background agents
// (they use their own AbortControllers) and route through
// handleCancellationError so the run exits non-zero — falling
// through to the success emitResult below would silently
// convert cancellation into a successful completion.
while (true) {
if (abortController.signal.aborted) {
registry.abortAll();
handleCancellationError(config);
}
await drainLocalQueue();
const running = registry.getRunning();
if (running.length === 0 && localQueue.length === 0) break;
await new Promise((r) => setTimeout(r, 100));
}
const metrics = uiTelemetryService.getMetrics();
const usage = computeUsageFromMetrics(metrics);
// Get stats for JSON format output
const stats =
outputFormat === OutputFormat.JSON
? uiTelemetryService.getMetrics()
: undefined;
adapter.emitResult({
isError: false,
durationMs: Date.now() - startTime,
apiDurationMs: totalApiDurationMs,
numTurns: turnCount,
usage,
stats,
});
return;
}
}
} catch (error) {
// Ensure message_start / message_stop (and content_block events) are
// properly paired even when an error aborts the turn mid-stream.
// The call is safe when no message was started (throws → caught) or
// when already finalized (idempotent guard inside the adapter).
try {
adapter.finalizeAssistantMessage();
} catch {
// Expected when no message was started or already finalized
}
// For JSON and STREAM_JSON modes, compute usage from metrics
const message = error instanceof Error ? error.message : String(error);
const metrics = uiTelemetryService.getMetrics();
const usage = computeUsageFromMetrics(metrics);
// Get stats for JSON format output
const stats =
outputFormat === OutputFormat.JSON
? uiTelemetryService.getMetrics()
: undefined;
adapter.emitResult({
isError: true,
durationMs: Date.now() - startTime,
apiDurationMs: totalApiDurationMs,
numTurns: turnCount,
errorMessage: message,
usage,
stats,
});
handleError(error, config);
} finally {
try {
const reg = config.getBackgroundTaskRegistry();
reg.setNotificationCallback(undefined);
reg.setRegisterCallback(undefined);
} catch {
// Ignore — registry may not be initialized if we failed early.
}
process.stdout.removeListener('error', stdoutErrorHandler);
// Cleanup signal handlers
process.removeListener('SIGINT', shutdownHandler);
process.removeListener('SIGTERM', shutdownHandler);
if (isTelemetrySdkInitialized()) {
await shutdownTelemetry();
}
}
});
}