diff --git a/packages/cli/src/config/settingsSchema.ts b/packages/cli/src/config/settingsSchema.ts index ca86ea0a5..c901f5db5 100644 --- a/packages/cli/src/config/settingsSchema.ts +++ b/packages/cli/src/config/settingsSchema.ts @@ -1231,6 +1231,26 @@ const SETTINGS_SCHEMA = { 'When enabled, Arena worktrees and session state files are preserved after the session ends or the main agent exits.', showInDialog: true, }, + maxRoundsPerAgent: { + type: 'number', + label: 'Max Rounds Per Agent', + category: 'Advanced', + requiresRestart: false, + default: undefined as number | undefined, + description: + 'Maximum number of rounds (turns) each agent can execute. No limit if unset.', + showInDialog: false, + }, + timeoutSeconds: { + type: 'number', + label: 'Timeout (seconds)', + category: 'Advanced', + requiresRestart: false, + default: undefined as number | undefined, + description: + 'Total timeout in seconds for the Arena session. No limit if unset.', + showInDialog: false, + }, }, }, team: { diff --git a/packages/cli/src/ui/commands/arenaCommand.test.ts b/packages/cli/src/ui/commands/arenaCommand.test.ts index 04f3f5597..99f902259 100644 --- a/packages/cli/src/ui/commands/arenaCommand.test.ts +++ b/packages/cli/src/ui/commands/arenaCommand.test.ts @@ -7,7 +7,7 @@ import { describe, it, expect, vi, beforeEach } from 'vitest'; import { type ArenaManager, - ArenaAgentStatus, + AgentStatus, ArenaSessionStatus, } from '@qwen-code/qwen-code-core'; import { arenaCommand } from './arenaCommand.js'; @@ -242,7 +242,7 @@ describe('arenaCommand select subcommand', () => { getAgentStates: vi.fn(() => [ { agentId: 'agent-1', - status: ArenaAgentStatus.TERMINATED, + status: AgentStatus.FAILED, model: { modelId: 'model-1' }, }, ]), @@ -267,12 +267,12 @@ describe('arenaCommand select subcommand', () => { getAgentStates: vi.fn(() => [ { agentId: 'agent-1', - status: ArenaAgentStatus.COMPLETED, + status: AgentStatus.COMPLETED, model: { modelId: 'model-1' }, }, { agentId: 'agent-2', - status: ArenaAgentStatus.COMPLETED, + status: AgentStatus.COMPLETED, model: { modelId: 'model-2' }, }, ]), @@ -294,12 +294,12 @@ describe('arenaCommand select subcommand', () => { getAgentStates: vi.fn(() => [ { agentId: 'agent-1', - status: ArenaAgentStatus.COMPLETED, + status: AgentStatus.COMPLETED, model: { modelId: 'gpt-4o', displayName: 'gpt-4o' }, }, { agentId: 'agent-2', - status: ArenaAgentStatus.COMPLETED, + status: AgentStatus.COMPLETED, model: { modelId: 'claude-sonnet', displayName: 'claude-sonnet' }, }, ]), @@ -327,7 +327,7 @@ describe('arenaCommand select subcommand', () => { getAgentStates: vi.fn(() => [ { agentId: 'agent-1', - status: ArenaAgentStatus.COMPLETED, + status: AgentStatus.COMPLETED, model: { modelId: 'gpt-4o', displayName: 'gpt-4o' }, }, ]), @@ -350,7 +350,7 @@ describe('arenaCommand select subcommand', () => { getAgentStates: vi.fn(() => [ { agentId: 'agent-1', - status: ArenaAgentStatus.COMPLETED, + status: AgentStatus.COMPLETED, model: { modelId: 'gpt-4o' }, }, ]), @@ -373,7 +373,7 @@ describe('arenaCommand select subcommand', () => { getAgentStates: vi.fn(() => [ { agentId: 'agent-1', - status: ArenaAgentStatus.COMPLETED, + status: AgentStatus.COMPLETED, model: { modelId: 'gpt-4o' }, }, ]), diff --git a/packages/cli/src/ui/commands/arenaCommand.ts b/packages/cli/src/ui/commands/arenaCommand.ts index 5339f94ca..cf47f4feb 100644 --- a/packages/cli/src/ui/commands/arenaCommand.ts +++ b/packages/cli/src/ui/commands/arenaCommand.ts @@ -16,7 +16,8 @@ import { CommandKind } from './types.js'; import { ArenaManager, ArenaEventType, - ArenaAgentStatus, + AgentStatus, + isTerminalStatus, ArenaSessionStatus, AuthType, createDebugLogger, @@ -246,41 +247,23 @@ function executeArenaCommand( const buildAgentCardData = ( result: ArenaAgentCompleteEvent['result'], - ): ArenaAgentCardData => { - let status: ArenaAgentCardData['status']; - switch (result.status) { - case ArenaAgentStatus.COMPLETED: - status = 'completed'; - break; - case ArenaAgentStatus.CANCELLED: - status = 'cancelled'; - break; - default: - status = 'terminated'; - break; - } - return { - label: result.model.displayName || result.model.modelId, - status, - durationMs: result.stats.durationMs, - totalTokens: result.stats.totalTokens, - inputTokens: result.stats.inputTokens, - outputTokens: result.stats.outputTokens, - toolCalls: result.stats.toolCalls, - successfulToolCalls: result.stats.successfulToolCalls, - failedToolCalls: result.stats.failedToolCalls, - rounds: result.stats.rounds, - error: result.error, - diff: result.diff, - }; - }; + ): ArenaAgentCardData => ({ + label: result.model.displayName || result.model.modelId, + status: result.status, + durationMs: result.stats.durationMs, + totalTokens: result.stats.totalTokens, + inputTokens: result.stats.inputTokens, + outputTokens: result.stats.outputTokens, + toolCalls: result.stats.toolCalls, + successfulToolCalls: result.stats.successfulToolCalls, + failedToolCalls: result.stats.failedToolCalls, + rounds: result.stats.rounds, + error: result.error, + diff: result.diff, + }); const handleAgentComplete = (event: ArenaAgentCompleteEvent) => { - if ( - event.result.status !== ArenaAgentStatus.COMPLETED && - event.result.status !== ArenaAgentStatus.CANCELLED && - event.result.status !== ArenaAgentStatus.TERMINATED - ) { + if (!isTerminalStatus(event.result.status)) { return; } @@ -598,7 +581,7 @@ export const arenaCommand: SlashCommand = { const agents = manager.getAgentStates(); const hasSuccessful = agents.some( - (a) => a.status === ArenaAgentStatus.COMPLETED, + (a) => a.status === AgentStatus.COMPLETED, ); if (!hasSuccessful) { @@ -616,7 +599,7 @@ export const arenaCommand: SlashCommand = { const matchingAgent = agents.find((a) => { const label = a.model.displayName || a.model.modelId; return ( - a.status === ArenaAgentStatus.COMPLETED && + a.status === AgentStatus.COMPLETED && (label.toLowerCase() === trimmedArgs.toLowerCase() || a.model.modelId.toLowerCase() === trimmedArgs.toLowerCase()) ); diff --git a/packages/cli/src/ui/components/ArenaSelectDialog.tsx b/packages/cli/src/ui/components/ArenaSelectDialog.tsx index b42d8e8d1..9d2f15806 100644 --- a/packages/cli/src/ui/components/ArenaSelectDialog.tsx +++ b/packages/cli/src/ui/components/ArenaSelectDialog.tsx @@ -9,7 +9,7 @@ import { useCallback, useMemo } from 'react'; import { Box, Text } from 'ink'; import { type ArenaManager, - ArenaAgentStatus, + AgentStatus, type Config, } from '@qwen-code/qwen-code-core'; import { theme } from '../semantic-colors.js'; @@ -138,7 +138,7 @@ export function ArenaSelectDialog({ // Build diff summary from cached result if available let diffAdditions = 0; let diffDeletions = 0; - if (agent.status === ArenaAgentStatus.COMPLETED && result) { + if (agent.status === AgentStatus.COMPLETED && result) { const agentResult = result.agents.find( (a) => a.agentId === agent.agentId, ); @@ -182,7 +182,7 @@ export function ArenaSelectDialog({ value: agent.agentId, title, description, - disabled: agent.status !== ArenaAgentStatus.COMPLETED, + disabled: agent.status !== AgentStatus.COMPLETED, }; }), [agents, result], diff --git a/packages/cli/src/ui/components/ArenaStatusDialog.tsx b/packages/cli/src/ui/components/ArenaStatusDialog.tsx index 221e2f3e6..211a9d9ba 100644 --- a/packages/cli/src/ui/components/ArenaStatusDialog.tsx +++ b/packages/cli/src/ui/components/ArenaStatusDialog.tsx @@ -10,7 +10,7 @@ import { Box, Text } from 'ink'; import { type ArenaManager, type ArenaAgentState, - ArenaAgentStatus, + isTerminalStatus, ArenaSessionStatus, } from '@qwen-code/qwen-code-core'; import { theme } from '../semantic-colors.js'; @@ -42,11 +42,7 @@ function pad( } function getElapsedMs(agent: ArenaAgentState): number { - if ( - agent.status === ArenaAgentStatus.COMPLETED || - agent.status === ArenaAgentStatus.TERMINATED || - agent.status === ArenaAgentStatus.CANCELLED - ) { + if (isTerminalStatus(agent.status)) { return agent.stats.durationMs; } return Date.now() - agent.startedAt; diff --git a/packages/cli/src/ui/types.ts b/packages/cli/src/ui/types.ts index ea3c53ad6..9b07964bf 100644 --- a/packages/cli/src/ui/types.ts +++ b/packages/cli/src/ui/types.ts @@ -11,6 +11,7 @@ import type { ToolCallConfirmationDetails, ToolConfirmationOutcome, ToolResultDisplay, + AgentStatus, } from '@qwen-code/qwen-code-core'; import type { PartListUnion } from '@google/genai'; import { type ReactNode } from 'react'; @@ -266,7 +267,7 @@ export type HistoryItemMcpStatus = HistoryItemBase & { */ export interface ArenaAgentCardData { label: string; - status: 'completed' | 'cancelled' | 'terminated'; + status: AgentStatus; durationMs: number; totalTokens: number; inputTokens: number; diff --git a/packages/cli/src/ui/utils/displayUtils.ts b/packages/cli/src/ui/utils/displayUtils.ts index 2e8f22078..7f422e250 100644 --- a/packages/cli/src/ui/utils/displayUtils.ts +++ b/packages/cli/src/ui/utils/displayUtils.ts @@ -5,7 +5,7 @@ */ import { theme } from '../semantic-colors.js'; -import { ArenaAgentStatus } from '@qwen-code/qwen-code-core'; +import { AgentStatus } from '@qwen-code/qwen-code-core'; // --- Status Labels --- @@ -15,24 +15,17 @@ export interface StatusLabel { color: string; } -export function getArenaStatusLabel( - status: ArenaAgentStatus | string, -): StatusLabel { +export function getArenaStatusLabel(status: AgentStatus): StatusLabel { switch (status) { - case ArenaAgentStatus.COMPLETED: - case 'completed': + case AgentStatus.COMPLETED: return { icon: '✓', text: 'Done', color: theme.status.success }; - case ArenaAgentStatus.CANCELLED: - case 'cancelled': + case AgentStatus.CANCELLED: return { icon: '⊘', text: 'Cancelled', color: theme.status.warning }; - case ArenaAgentStatus.TERMINATED: - case 'terminated': - return { icon: '✗', text: 'Terminated', color: theme.status.error }; - case ArenaAgentStatus.RUNNING: - case 'running': + case AgentStatus.FAILED: + return { icon: '✗', text: 'Failed', color: theme.status.error }; + case AgentStatus.RUNNING: return { icon: '○', text: 'Running', color: theme.text.secondary }; - case ArenaAgentStatus.INITIALIZING: - case 'initializing': + case AgentStatus.INITIALIZING: return { icon: '○', text: 'Initializing', color: theme.text.secondary }; default: return { icon: '○', text: status, color: theme.text.secondary }; diff --git a/packages/core/src/agents/arena/ArenaAgentClient.test.ts b/packages/core/src/agents/arena/ArenaAgentClient.test.ts index d5a5f5f91..6ab61039c 100644 --- a/packages/core/src/agents/arena/ArenaAgentClient.test.ts +++ b/packages/core/src/agents/arena/ArenaAgentClient.test.ts @@ -444,9 +444,9 @@ describe('ArenaAgentClient', () => { }); }); - describe('buildStatsFromMetrics()', () => { - it('should aggregate stats across multiple models', () => { - const metrics: SessionMetrics = { + describe('stats aggregation and wall-clock durationMs', () => { + it('should aggregate multi-model stats and use wall-clock durationMs', async () => { + vi.mocked(uiTelemetryService.getMetrics).mockReturnValue({ models: { 'model-a': { api: { @@ -493,32 +493,58 @@ describe('ArenaAgentClient', () => { byName: {}, }, files: { totalLinesAdded: 0, totalLinesRemoved: 0 }, - }; + }); - const stats = ArenaAgentClient.buildStatsFromMetrics(metrics); + const reporter = new ArenaAgentClient('model-a', tempDir); + await reporter.init(); + await reporter.updateStatus(); - expect(stats.rounds).toBe(5); - expect(stats.totalTokens).toBe(450); - expect(stats.inputTokens).toBe(300); - expect(stats.outputTokens).toBe(150); - expect(stats.durationMs).toBe(1500); - expect(stats.toolCalls).toBe(10); - expect(stats.successfulToolCalls).toBe(8); - expect(stats.failedToolCalls).toBe(2); + const statusPath = path.join( + tempDir, + 'agents', + `${safeAgentId('model-a')}.json`, + ); + const content = JSON.parse(await fs.readFile(statusPath, 'utf-8')); + + expect(content.stats.rounds).toBe(5); + expect(content.stats.totalTokens).toBe(450); + expect(content.stats.inputTokens).toBe(300); + expect(content.stats.outputTokens).toBe(150); + expect(content.stats.toolCalls).toBe(10); + expect(content.stats.successfulToolCalls).toBe(8); + expect(content.stats.failedToolCalls).toBe(2); + // durationMs should be wall-clock time, not API latency sum (1500) + expect(content.stats.durationMs).toBeGreaterThanOrEqual(0); + expect(content.stats.durationMs).toBeLessThan(5000); }); - it('should return zeros when no models exist', () => { - const metrics = createMockMetrics(); + it('should return zeros when no models exist', async () => { + vi.mocked(uiTelemetryService.getMetrics).mockReturnValue( + createMockMetrics(), + ); // Override with empty models - metrics.models = {}; + vi.mocked(uiTelemetryService.getMetrics).mockReturnValue({ + ...createMockMetrics(), + models: {}, + }); - const stats = ArenaAgentClient.buildStatsFromMetrics(metrics); + const reporter = new ArenaAgentClient('model-a', tempDir); + await reporter.init(); + await reporter.updateStatus(); - expect(stats.rounds).toBe(0); - expect(stats.totalTokens).toBe(0); - expect(stats.inputTokens).toBe(0); - expect(stats.outputTokens).toBe(0); - expect(stats.durationMs).toBe(0); + const statusPath = path.join( + tempDir, + 'agents', + `${safeAgentId('model-a')}.json`, + ); + const content = JSON.parse(await fs.readFile(statusPath, 'utf-8')); + + expect(content.stats.rounds).toBe(0); + expect(content.stats.totalTokens).toBe(0); + expect(content.stats.inputTokens).toBe(0); + expect(content.stats.outputTokens).toBe(0); + // durationMs is wall-clock, so still non-negative even with no models + expect(content.stats.durationMs).toBeGreaterThanOrEqual(0); }); }); diff --git a/packages/core/src/agents/arena/ArenaAgentClient.ts b/packages/core/src/agents/arena/ArenaAgentClient.ts index 8b1eb8ba1..1099825e4 100644 --- a/packages/core/src/agents/arena/ArenaAgentClient.ts +++ b/packages/core/src/agents/arena/ArenaAgentClient.ts @@ -9,16 +9,14 @@ import * as path from 'node:path'; import * as crypto from 'node:crypto'; import { createDebugLogger } from '../../utils/debugLogger.js'; import { isNodeError } from '../../utils/errors.js'; -import { - uiTelemetryService, - type SessionMetrics, -} from '../../telemetry/uiTelemetry.js'; +import { uiTelemetryService } from '../../telemetry/uiTelemetry.js'; import type { ArenaAgentStats, ArenaControlSignal, ArenaStatusFile, } from './types.js'; import { safeAgentId } from './types.js'; +import { AgentStatus } from '../runtime/agent-types.js'; const debugLogger = createDebugLogger('ARENA_AGENT_CLIENT'); @@ -44,6 +42,7 @@ export class ArenaAgentClient { private readonly controlDir: string; private readonly statusFilePath: string; private readonly controlFilePath: string; + private readonly startTimeMs: number; private initialized = false; /** @@ -71,6 +70,7 @@ export class ArenaAgentClient { this.controlDir = path.join(arenaSessionDir, CONTROL_SUBDIR); this.statusFilePath = path.join(this.agentsDir, `${safe}.json`); this.controlFilePath = path.join(this.controlDir, `${safe}.json`); + this.startTimeMs = Date.now(); } /** @@ -100,7 +100,7 @@ export class ArenaAgentClient { const statusFile: ArenaStatusFile = { agentId: this.agentId, - status: 'running', + status: AgentStatus.RUNNING, updatedAt: Date.now(), rounds: stats.rounds, currentActivity, @@ -150,7 +150,7 @@ export class ArenaAgentClient { const statusFile: ArenaStatusFile = { agentId: this.agentId, - status: 'completed', + status: AgentStatus.COMPLETED, updatedAt: Date.now(), rounds: stats.rounds, stats, @@ -171,7 +171,7 @@ export class ArenaAgentClient { const statusFile: ArenaStatusFile = { agentId: this.agentId, - status: 'error', + status: AgentStatus.FAILED, updatedAt: Date.now(), rounds: stats.rounds, stats, @@ -192,7 +192,7 @@ export class ArenaAgentClient { const statusFile: ArenaStatusFile = { agentId: this.agentId, - status: 'cancelled', + status: AgentStatus.CANCELLED, updatedAt: Date.now(), rounds: stats.rounds, stats, @@ -204,31 +204,21 @@ export class ArenaAgentClient { } /** - * Build ArenaAgentStats from the current uiTelemetryService metrics. + * Build ArenaAgentStats from uiTelemetryService metrics */ private getStatsFromTelemetry(): ArenaAgentStats { - return ArenaAgentClient.buildStatsFromMetrics( - uiTelemetryService.getMetrics(), - ); - } + const metrics = uiTelemetryService.getMetrics(); - /** - * Convert SessionMetrics into ArenaAgentStats by aggregating across - * all models. Exposed as a static method for testability. - */ - static buildStatsFromMetrics(metrics: SessionMetrics): ArenaAgentStats { let rounds = 0; let totalTokens = 0; let inputTokens = 0; let outputTokens = 0; - let durationMs = 0; for (const model of Object.values(metrics.models)) { rounds += model.api.totalRequests; totalTokens += model.tokens.total; inputTokens += model.tokens.prompt; outputTokens += model.tokens.candidates; - durationMs += model.api.totalLatencyMs; } return { @@ -236,7 +226,7 @@ export class ArenaAgentClient { totalTokens, inputTokens, outputTokens, - durationMs, + durationMs: Date.now() - this.startTimeMs, toolCalls: metrics.tools.totalCalls, successfulToolCalls: metrics.tools.totalSuccess, failedToolCalls: metrics.tools.totalFail, diff --git a/packages/core/src/agents/arena/ArenaManager.test.ts b/packages/core/src/agents/arena/ArenaManager.test.ts index 0bf2b60ec..3d175be6b 100644 --- a/packages/core/src/agents/arena/ArenaManager.test.ts +++ b/packages/core/src/agents/arena/ArenaManager.test.ts @@ -18,9 +18,13 @@ const hoistedMockGetWorktreeDiff = vi.hoisted(() => vi.fn()); const hoistedMockApplyWorktreeChanges = vi.hoisted(() => vi.fn()); const hoistedMockDetectBackend = vi.hoisted(() => vi.fn()); -vi.mock('../index.js', () => ({ - detectBackend: hoistedMockDetectBackend, -})); +vi.mock('../index.js', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + detectBackend: hoistedMockDetectBackend, + }; +}); // Mock GitWorktreeService to avoid real git operations. // The class mock includes static methods used by ArenaManager. @@ -48,6 +52,7 @@ const createMockConfig = (workingDir: string) => ({ getWorkingDir: () => workingDir, getModel: () => 'test-model', getSessionId: () => 'test-session', + getUserMemory: () => '', getToolRegistry: () => ({ getFunctionDeclarations: () => [], getFunctionDeclarationsFiltered: () => [], @@ -294,7 +299,10 @@ describe('ArenaManager', () => { await manager.start(createValidStartOptions()); - expect(hoistedMockDetectBackend).toHaveBeenCalledWith(undefined); + expect(hoistedMockDetectBackend).toHaveBeenCalledWith( + undefined, + expect.anything(), + ); const warningUpdate = updates.find((u) => u.type === 'warning'); expect(warningUpdate).toBeDefined(); expect(warningUpdate?.message).toContain('fallback to tmux backend'); diff --git a/packages/core/src/agents/arena/ArenaManager.ts b/packages/core/src/agents/arena/ArenaManager.ts index c1f075f08..f6b098838 100644 --- a/packages/core/src/agents/arena/ArenaManager.ts +++ b/packages/core/src/agents/arena/ArenaManager.ts @@ -9,12 +9,18 @@ import * as fs from 'node:fs/promises'; import * as path from 'node:path'; import { GitWorktreeService } from '../../services/gitWorktreeService.js'; import type { Config } from '../../config/config.js'; +import { getCoreSystemPrompt } from '../../core/prompts.js'; import { createDebugLogger } from '../../utils/debugLogger.js'; import { isNodeError } from '../../utils/errors.js'; import type { AnsiOutput } from '../../utils/terminalSerializer.js'; import { ArenaEventEmitter, ArenaEventType } from './arena-events.js'; import type { AgentSpawnConfig, Backend, DisplayMode } from '../index.js'; -import { detectBackend } from '../index.js'; +import { detectBackend, DISPLAY_MODE } from '../index.js'; +import type { InProcessBackend } from '../backends/InProcessBackend.js'; +import { + AgentEventType, + type AgentStatusChangeEvent, +} from '../runtime/agent-events.js'; import { type ArenaConfig, type ArenaConfigFile, @@ -25,11 +31,11 @@ import { type ArenaAgentState, type ArenaCallbacks, type ArenaStatusFile, - ArenaAgentStatus, ArenaSessionStatus, ARENA_MAX_AGENTS, safeAgentId, } from './types.js'; +import { AgentStatus, isTerminalStatus } from '../runtime/agent-types.js'; const debugLogger = createDebugLogger('ARENA'); @@ -73,6 +79,8 @@ export class ArenaManager { private terminalRows: number; private pollingInterval: ReturnType | null = null; private lifecyclePromise: Promise | null = null; + /** Cleanup functions for in-process event bridge listeners. */ + private eventBridgeCleanups: Array<() => void> = []; constructor(config: Config, callbacks: ArenaCallbacks = {}) { this.config = config; @@ -260,13 +268,15 @@ export class ArenaManager { this.masterAbortController = new AbortController(); const sourceRepoPath = this.config.getWorkingDir(); + const arenaSettings = this.config.getAgentsSettings().arena; this.arenaConfig = { sessionId: this.sessionId, task: options.task, models: options.models, - maxRoundsPerAgent: options.maxRoundsPerAgent ?? 50, - timeoutSeconds: options.timeoutSeconds ?? 600, + maxRoundsPerAgent: + options.maxRoundsPerAgent ?? arenaSettings?.maxRoundsPerAgent, + timeoutSeconds: options.timeoutSeconds ?? arenaSettings?.timeoutSeconds, approvalMode: options.approvalMode, sourceRepoPath, }; @@ -372,17 +382,15 @@ export class ArenaManager { // Abort the master controller this.masterAbortController?.abort(); - const isTerminal = (s: ArenaAgentStatus) => - s === ArenaAgentStatus.TERMINATED || s === ArenaAgentStatus.CANCELLED; - // Force stop all PTY processes (sends Ctrl-C) this.backend?.stopAll(); - // Update agent statuses + // Update agent statuses — skip agents already in a terminal state + // (COMPLETED, FAILED, CANCELLED) so we don't overwrite a successful result. for (const agent of this.agents.values()) { - if (!isTerminal(agent.status)) { + if (!isTerminalStatus(agent.status)) { agent.abortController.abort(); - this.updateAgentStatus(agent.agentId, ArenaAgentStatus.TERMINATED); + this.updateAgentStatus(agent.agentId, AgentStatus.CANCELLED); } } @@ -402,6 +410,9 @@ export class ArenaManager { // Stop polling in case cleanup is called without cancel this.stopPolling(); + // Remove in-process event bridge listeners + this.teardownEventBridge(); + // Clean up backend resources if (this.backend) { await this.backend.cleanup(); @@ -432,6 +443,9 @@ export class ArenaManager { this.stopPolling(); + // Remove in-process event bridge listeners + this.teardownEventBridge(); + if (this.backend) { await this.backend.cleanup(); } @@ -454,7 +468,7 @@ export class ArenaManager { return { success: false, error: `Agent ${agentId} not found` }; } - if (agent.status !== ArenaAgentStatus.COMPLETED) { + if (agent.status !== AgentStatus.COMPLETED) { return { success: false, error: `Agent ${agentId} has not completed (current status: ${agent.status})`, @@ -537,7 +551,7 @@ export class ArenaManager { * Initialize the backend. */ private async initializeBackend(displayMode?: DisplayMode): Promise { - const { backend, warning } = await detectBackend(displayMode); + const { backend, warning } = await detectBackend(displayMode, this.config); await backend.init(); this.backend = backend; @@ -607,7 +621,7 @@ export class ArenaManager { const agentState: ArenaAgentState = { agentId, model, - status: ArenaAgentStatus.INITIALIZING, + status: AgentStatus.INITIALIZING, worktree, abortController: new AbortController(), stats: { @@ -646,25 +660,36 @@ export class ArenaManager { this.handleAgentExit(agentId, exitCode, signal); }); + const isInProcess = backend.type === DISPLAY_MODE.IN_PROCESS; + // Spawn agents sequentially — each spawn completes before starting the next. // This creates a visual effect where panes appear one by one. for (const agent of this.agents.values()) { await this.spawnAgentPty(agent); } - // Start polling agent status files - this.startPolling(); + // For in-process mode, set up event bridges instead of file-based polling. + // For PTY mode, start polling agent status files. + if (isInProcess) { + this.setupInProcessEventBridge(backend as InProcessBackend); + } else { + this.startPolling(); + } // Set up timeout - const timeoutMs = (this.arenaConfig.timeoutSeconds ?? 600) * 1000; + const timeoutSeconds = this.arenaConfig.timeoutSeconds; // Wait for all agents to reach IDLE or TERMINATED, or timeout. // Unlike waitForAll (which waits for PTY exit), this resolves as soon // as every agent has finished its first task in interactive mode. - const allSettled = await this.waitForAllAgentsSettled(timeoutMs); + const allSettled = await this.waitForAllAgentsSettled( + timeoutSeconds ? timeoutSeconds * 1000 : undefined, + ); - // Stop polling when all agents are done - this.stopPolling(); + // Stop polling when all agents are done (no-op for in-process mode) + if (!isInProcess) { + this.stopPolling(); + } if (!allSettled) { debugLogger.info('Arena session timed out, stopping remaining agents'); @@ -672,14 +697,11 @@ export class ArenaManager { // Terminate remaining active agents for (const agent of this.agents.values()) { - if ( - agent.status !== ArenaAgentStatus.COMPLETED && - agent.status !== ArenaAgentStatus.CANCELLED && - agent.status !== ArenaAgentStatus.TERMINATED - ) { + if (!isTerminalStatus(agent.status)) { backend.stopAgent(agent.agentId); agent.abortController.abort(); - this.updateAgentStatus(agent.agentId, ArenaAgentStatus.TERMINATED); + agent.stats.durationMs = Date.now() - agent.startedAt; + this.updateAgentStatus(agent.agentId, AgentStatus.CANCELLED); } } } @@ -699,7 +721,7 @@ export class ArenaManager { debugLogger.info(`Spawning agent PTY: ${agentId}`); agent.startedAt = Date.now(); - this.updateAgentStatus(agentId, ArenaAgentStatus.RUNNING); + this.updateAgentStatus(agentId, AgentStatus.RUNNING); // Emit agent start event this.eventEmitter.emit(ArenaEventType.AGENT_START, { @@ -721,7 +743,7 @@ export class ArenaManager { const errorMessage = error instanceof Error ? error.message : String(error); agent.error = errorMessage; - this.updateAgentStatus(agentId, ArenaAgentStatus.TERMINATED); + this.updateAgentStatus(agentId, AgentStatus.FAILED); this.eventEmitter.emit(ArenaEventType.AGENT_ERROR, { sessionId: this.requireConfig().sessionId, @@ -758,8 +780,8 @@ export class ArenaManager { return; } - // Already terminated (e.g. via cancel) - if (agent.status === ArenaAgentStatus.TERMINATED) { + // Already failed/cancelled (e.g. via cancel) + if (isTerminalStatus(agent.status)) { return; } @@ -779,8 +801,13 @@ export class ArenaManager { }); } - this.updateAgentStatus(agentId, ArenaAgentStatus.TERMINATED); - debugLogger.info(`Agent terminated: ${agentId} (exit code: ${exitCode})`); + this.updateAgentStatus( + agentId, + agent.abortController.signal.aborted + ? AgentStatus.CANCELLED + : AgentStatus.FAILED, + ); + debugLogger.info(`Agent exited: ${agentId} (exit code: ${exitCode})`); } /** @@ -832,7 +859,7 @@ export class ArenaManager { env['QWEN_BASE_URL'] = model.baseUrl; } - const spawnConfig = { + const spawnConfig: AgentSpawnConfig = { agentId, command: process.execPath, // Use the same Node.js binary args: [path.resolve(process.argv[1]!), ...args], // Re-launch the CLI entry point (must be absolute path since cwd changes) @@ -840,6 +867,30 @@ export class ArenaManager { env, cols: this.terminalCols, rows: this.terminalRows, + inProcess: { + agentName: model.displayName || model.modelId, + initialTask: this.arenaConfig?.task, + runtimeConfig: { + promptConfig: { + systemPrompt: getCoreSystemPrompt( + this.config.getUserMemory(), + model.modelId, + ), + }, + modelConfig: { model: model.modelId }, + runConfig: { + max_turns: this.arenaConfig?.maxRoundsPerAgent, + max_time_minutes: this.arenaConfig?.timeoutSeconds + ? Math.ceil(this.arenaConfig.timeoutSeconds / 60) + : undefined, + }, + }, + authOverrides: { + authType: model.authType, + apiKey: model.apiKey, + baseUrl: model.baseUrl, + }, + }, }; debugLogger.info( @@ -857,10 +908,26 @@ export class ArenaManager { // ─── Private: Status & Results ───────────────────────────────── - private updateAgentStatus( - agentId: string, - newStatus: ArenaAgentStatus, - ): void { + /** Decide whether a status transition is valid. Returns the new status or null. */ + private resolveTransition( + current: AgentStatus, + incoming: AgentStatus, + ): AgentStatus | null { + if (current === incoming) return null; + if (isTerminalStatus(current)) { + // Allow revival: COMPLETED → RUNNING (agent received new input) + if ( + current === AgentStatus.COMPLETED && + incoming === AgentStatus.RUNNING + ) { + return incoming; + } + return null; + } + return incoming; + } + + private updateAgentStatus(agentId: string, newStatus: AgentStatus): void { const agent = this.agents.get(agentId); if (!agent) { return; @@ -877,12 +944,8 @@ export class ArenaManager { timestamp: Date.now(), }); - // Emit AGENT_COMPLETE when agent reaches COMPLETED, CANCELLED, or TERMINATED - if ( - newStatus === ArenaAgentStatus.COMPLETED || - newStatus === ArenaAgentStatus.CANCELLED || - newStatus === ArenaAgentStatus.TERMINATED - ) { + // Emit AGENT_COMPLETE when agent reaches a terminal status + if (isTerminalStatus(newStatus)) { const result = this.buildAgentResult(agent); this.eventEmitter.emit(ArenaEventType.AGENT_COMPLETE, { @@ -932,15 +995,11 @@ export class ArenaManager { * Wait for all agents to reach IDLE or TERMINATED state. * Returns true if all agents settled, false if timeout was reached. */ - private waitForAllAgentsSettled(timeoutMs: number): Promise { + private waitForAllAgentsSettled(timeoutMs?: number): Promise { return new Promise((resolve) => { const checkSettled = () => { for (const agent of this.agents.values()) { - if ( - agent.status !== ArenaAgentStatus.COMPLETED && - agent.status !== ArenaAgentStatus.CANCELLED && - agent.status !== ArenaAgentStatus.TERMINATED - ) { + if (!isTerminalStatus(agent.status)) { return false; } } @@ -952,16 +1011,19 @@ export class ArenaManager { return; } - const timeoutHandle = setTimeout(() => { - clearInterval(pollHandle); - resolve(false); - }, timeoutMs); + let timeoutHandle: ReturnType | undefined; + if (timeoutMs !== undefined) { + timeoutHandle = setTimeout(() => { + clearInterval(pollHandle); + resolve(false); + }, timeoutMs); + } // Re-check periodically (piggybacks on the same polling interval) const pollHandle = setInterval(() => { if (checkSettled()) { clearInterval(pollHandle); - clearTimeout(timeoutHandle); + if (timeoutHandle) clearTimeout(timeoutHandle); resolve(true); } }, ARENA_POLL_INTERVAL_MS); @@ -993,6 +1055,80 @@ export class ArenaManager { } } + /** + * Set up event bridges for in-process agents. + * Subscribes to each AgentInteractive's events to update ArenaManager state. + * Listeners are tracked in `eventBridgeCleanups` for teardown. + */ + private setupInProcessEventBridge(backend: InProcessBackend): void { + for (const agent of this.agents.values()) { + const interactive = backend.getAgent(agent.agentId); + if (!interactive) continue; + + const emitter = interactive.getEventEmitter(); + if (!emitter) continue; + + // AgentInteractive emits canonical AgentStatus values — no mapping needed. + + const syncStats = () => { + const { totalToolCalls, totalDurationMs, ...rest } = + interactive.getStats(); + Object.assign(agent.stats, rest, { + toolCalls: totalToolCalls, + durationMs: totalDurationMs, + }); + }; + + const applyStatus = (incoming: AgentStatus) => { + const resolved = this.resolveTransition(agent.status, incoming); + if (!resolved) return; + if (resolved === AgentStatus.FAILED) { + agent.error = + interactive.getLastRoundError() || interactive.getError(); + } + if (isTerminalStatus(resolved)) { + agent.stats.durationMs = Date.now() - agent.startedAt; + } + this.updateAgentStatus(agent.agentId, resolved); + }; + + // Sync stats before mapping so counters are up-to-date even when + // the provider omits usage_metadata events. + const onStatusChange = (event: AgentStatusChangeEvent) => { + syncStats(); + applyStatus(event.newStatus); + }; + + const onUsageMetadata = () => syncStats(); + + emitter.on(AgentEventType.STATUS_CHANGE, onStatusChange); + emitter.on(AgentEventType.USAGE_METADATA, onUsageMetadata); + + // Store cleanup functions so listeners can be removed during teardown + this.eventBridgeCleanups.push(() => { + emitter.off(AgentEventType.STATUS_CHANGE, onStatusChange); + emitter.off(AgentEventType.USAGE_METADATA, onUsageMetadata); + }); + + // Reconcile: if the agent already transitioned before the bridge was + // attached (e.g. fast completion or createChat failure during spawn), + // backfill stats and apply its current status now so + // waitForAllAgentsSettled sees it. + syncStats(); + applyStatus(interactive.getStatus()); + } + } + + /** + * Remove all event bridge listeners registered by setupInProcessEventBridge. + */ + private teardownEventBridge(): void { + for (const cleanup of this.eventBridgeCleanups) { + cleanup(); + } + this.eventBridgeCleanups.length = 0; + } + /** * Read per-agent status files from `/agents/` directory. * Updates agent stats, emits AGENT_STATS_UPDATE events, and writes a @@ -1004,11 +1140,10 @@ export class ArenaManager { const consolidatedAgents: Record = {}; for (const agent of this.agents.values()) { - // Only poll agents that are still alive (RUNNING or IDLE) + // Only poll agents that are still alive (RUNNING) if ( - agent.status === ArenaAgentStatus.TERMINATED || - agent.status === ArenaAgentStatus.CANCELLED || - agent.status === ArenaAgentStatus.INITIALIZING + isTerminalStatus(agent.status) || + agent.status === AgentStatus.INITIALIZING ) { continue; } @@ -1024,45 +1159,22 @@ export class ArenaManager { // Collect for consolidated file consolidatedAgents[agent.agentId] = statusFile; - // Update agent stats from the status file, but preserve locally - // calculated durationMs (the child process doesn't track it). - const { durationMs: _childDuration, ...fileStats } = statusFile.stats; + // Update agent stats from the status file. agent.stats = { ...agent.stats, - ...fileStats, + ...statusFile.stats, }; // Detect state transitions from the sideband status file - if ( - statusFile.status === 'completed' && - agent.status === ArenaAgentStatus.RUNNING - ) { - // Agent finished its task successfully - agent.stats.durationMs = Date.now() - agent.startedAt; - this.updateAgentStatus(agent.agentId, ArenaAgentStatus.COMPLETED); - } else if ( - statusFile.status === 'cancelled' && - agent.status === ArenaAgentStatus.RUNNING - ) { - // Agent was cancelled by user - agent.stats.durationMs = Date.now() - agent.startedAt; - this.updateAgentStatus(agent.agentId, ArenaAgentStatus.CANCELLED); - } else if ( - statusFile.status === 'error' && - agent.status === ArenaAgentStatus.RUNNING - ) { - // Agent hit an error - agent.stats.durationMs = Date.now() - agent.startedAt; - if (statusFile.error) { + const resolved = this.resolveTransition( + agent.status, + statusFile.status, + ); + if (resolved) { + if (resolved === AgentStatus.FAILED && statusFile.error) { agent.error = statusFile.error; } - this.updateAgentStatus(agent.agentId, ArenaAgentStatus.TERMINATED); - } else if ( - statusFile.status === 'running' && - agent.status === ArenaAgentStatus.COMPLETED - ) { - // Agent received new input and is working again - this.updateAgentStatus(agent.agentId, ArenaAgentStatus.RUNNING); + this.updateAgentStatus(agent.agentId, resolved); } this.callbacks.onAgentStatsUpdate?.(agent.agentId, statusFile.stats); @@ -1195,7 +1307,7 @@ export class ArenaManager { const result = this.buildAgentResult(agent); // Get diff for completed agents (they finished their task) - if (agent.status === ArenaAgentStatus.COMPLETED) { + if (agent.status === AgentStatus.COMPLETED) { try { result.diff = await this.worktreeService.getWorktreeDiff( agent.worktree.path, diff --git a/packages/core/src/agents/arena/arena-events.ts b/packages/core/src/agents/arena/arena-events.ts index 1098fcafa..20f82d6d5 100644 --- a/packages/core/src/agents/arena/arena-events.ts +++ b/packages/core/src/agents/arena/arena-events.ts @@ -6,11 +6,11 @@ import { EventEmitter } from 'events'; import type { - ArenaAgentStatus, ArenaModelConfig, ArenaAgentResult, ArenaSessionResult, } from './types.js'; +import type { AgentStatus } from '../runtime/agent-types.js'; /** * Arena event types. @@ -109,8 +109,8 @@ export interface ArenaAgentCompleteEvent { export interface ArenaAgentStatusChangeEvent { sessionId: string; agentId: string; - previousStatus: ArenaAgentStatus; - newStatus: ArenaAgentStatus; + previousStatus: AgentStatus; + newStatus: AgentStatus; timestamp: number; } diff --git a/packages/core/src/agents/arena/types.ts b/packages/core/src/agents/arena/types.ts index 0fe6e299c..22a002056 100644 --- a/packages/core/src/agents/arena/types.ts +++ b/packages/core/src/agents/arena/types.ts @@ -6,41 +6,13 @@ import type { WorktreeInfo } from '../../services/gitWorktreeService.js'; import type { DisplayMode } from '../backends/types.js'; +import type { AgentStatus } from '../runtime/agent-types.js'; /** * Maximum number of concurrent agents allowed in an Arena session. */ export const ARENA_MAX_AGENTS = 5; -/** - * Represents the status of an Arena agent in interactive mode. - * - * Agents run as interactive CLI subprocesses (--prompt-interactive), so - * they never truly "complete" or "exit" on their own. Instead: - * - * INITIALIZING → RUNNING ⇄ COMPLETED → TERMINATED - * ↘ CANCELLED - * - * - INITIALIZING: Worktree created, PTY not yet spawned. - * - RUNNING: Agent is actively processing a turn (model thinking / tool execution). - * - COMPLETED: Agent finished the current task successfully. - * This is the "selectable" state for /arena select. - * - CANCELLED: Agent's current request was cancelled by the user. - * - TERMINATED: PTY process has exited (killed, crashed, or shut down). - */ -export enum ArenaAgentStatus { - /** Worktree created, PTY not yet spawned */ - INITIALIZING = 'initializing', - /** Agent is actively processing a turn */ - RUNNING = 'running', - /** Agent finished current task successfully */ - COMPLETED = 'completed', - /** Agent's current request was cancelled by the user */ - CANCELLED = 'cancelled', - /** PTY process has exited */ - TERMINATED = 'terminated', -} - /** * Represents the status of an Arena session. */ @@ -124,7 +96,7 @@ export interface ArenaAgentResult { /** Model configuration used */ model: ArenaModelConfig; /** Final status */ - status: ArenaAgentStatus; + status: AgentStatus; /** Worktree information */ worktree: WorktreeInfo; /** Final text output from the agent */ @@ -215,7 +187,7 @@ export interface ArenaCallbacks { */ export interface ArenaStatusFile { agentId: string; - status: 'running' | 'completed' | 'error' | 'cancelled'; + status: AgentStatus; updatedAt: number; rounds: number; currentActivity?: string; @@ -275,7 +247,7 @@ export interface ArenaAgentState { /** Model configuration */ model: ArenaModelConfig; /** Current status */ - status: ArenaAgentStatus; + status: AgentStatus; /** Worktree information */ worktree: WorktreeInfo; /** Abort controller for cancellation */ diff --git a/packages/core/src/agents/backends/InProcessBackend.test.ts b/packages/core/src/agents/backends/InProcessBackend.test.ts new file mode 100644 index 000000000..6c4734f32 --- /dev/null +++ b/packages/core/src/agents/backends/InProcessBackend.test.ts @@ -0,0 +1,536 @@ +/** + * @license + * Copyright 2025 Qwen + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { InProcessBackend } from './InProcessBackend.js'; +import { DISPLAY_MODE } from './types.js'; +import type { AgentSpawnConfig } from './types.js'; +import { AgentCore } from '../runtime/agent-core.js'; +import { createContentGenerator } from '../../core/contentGenerator.js'; + +// Mock createContentGenerator to avoid real API client setup +const mockContentGenerator = { + generateContentStream: vi.fn(), +}; +vi.mock('../../core/contentGenerator.js', () => ({ + createContentGenerator: vi.fn().mockResolvedValue({ + generateContentStream: vi.fn(), + }), +})); + +// Mock AgentCore and AgentInteractive to avoid real model calls +vi.mock('../runtime/agent-core.js', () => ({ + AgentCore: vi.fn().mockImplementation(() => ({ + subagentId: 'mock-id', + name: 'mock-agent', + eventEmitter: { + on: vi.fn(), + off: vi.fn(), + emit: vi.fn(), + }, + stats: { + start: vi.fn(), + getSummary: vi.fn().mockReturnValue({}), + }, + createChat: vi.fn().mockResolvedValue({}), + prepareTools: vi.fn().mockReturnValue([]), + runReasoningLoop: vi.fn().mockResolvedValue({ + text: 'Done', + terminateMode: null, + turnsUsed: 1, + }), + getEventEmitter: vi.fn().mockReturnValue({ + on: vi.fn(), + off: vi.fn(), + emit: vi.fn(), + }), + getExecutionSummary: vi.fn().mockReturnValue({}), + })), +})); + +function createMockToolRegistry() { + return { + getFunctionDeclarations: vi.fn().mockReturnValue([]), + getAllTools: vi.fn().mockReturnValue([]), + getAllToolNames: vi.fn().mockReturnValue([]), + registerTool: vi.fn(), + copyDiscoveredToolsFrom: vi.fn(), + stop: vi.fn().mockResolvedValue(undefined), + }; +} + +function createMockConfig() { + const registry = createMockToolRegistry(); + return { + getModel: vi.fn().mockReturnValue('test-model'), + getToolRegistry: vi.fn().mockReturnValue(registry), + getSessionId: vi.fn().mockReturnValue('test-session'), + getWorkingDir: vi.fn().mockReturnValue('/tmp'), + getTargetDir: vi.fn().mockReturnValue('/tmp'), + createToolRegistry: vi.fn().mockResolvedValue(createMockToolRegistry()), + getContentGenerator: vi.fn().mockReturnValue(mockContentGenerator), + getContentGeneratorConfig: vi.fn().mockReturnValue({ + model: 'test-model', + authType: 'openai', + apiKey: 'parent-key', + baseUrl: 'https://parent.example.com', + }), + getAuthType: vi.fn().mockReturnValue('openai'), + } as never; +} + +function createSpawnConfig(agentId: string): AgentSpawnConfig { + return { + agentId, + command: 'node', + args: [], + cwd: '/tmp', + inProcess: { + agentName: `Agent ${agentId}`, + initialTask: 'Do something', + runtimeConfig: { + promptConfig: { systemPrompt: 'You are a helpful assistant.' }, + modelConfig: { model: 'test-model' }, + runConfig: { max_turns: 10 }, + }, + }, + }; +} + +describe('InProcessBackend', () => { + let backend: InProcessBackend; + + beforeEach(() => { + backend = new InProcessBackend(createMockConfig()); + }); + + it('should have IN_PROCESS type', () => { + expect(backend.type).toBe(DISPLAY_MODE.IN_PROCESS); + }); + + it('should init without error', async () => { + await expect(backend.init()).resolves.toBeUndefined(); + }); + + it('should throw when spawning without inProcess config', async () => { + const config: AgentSpawnConfig = { + agentId: 'test', + command: 'node', + args: [], + cwd: '/tmp', + }; + + await expect(backend.spawnAgent(config)).rejects.toThrow( + 'InProcessBackend requires inProcess config', + ); + }); + + it('should spawn an agent with inProcess config', async () => { + await backend.init(); + await backend.spawnAgent(createSpawnConfig('agent-1')); + + expect(backend.getActiveAgentId()).toBe('agent-1'); + expect(backend.getAgent('agent-1')).toBeDefined(); + }); + + it('should set first spawned agent as active', async () => { + await backend.init(); + await backend.spawnAgent(createSpawnConfig('agent-1')); + await backend.spawnAgent(createSpawnConfig('agent-2')); + + expect(backend.getActiveAgentId()).toBe('agent-1'); + }); + + it('should navigate between agents', async () => { + await backend.init(); + await backend.spawnAgent(createSpawnConfig('agent-1')); + await backend.spawnAgent(createSpawnConfig('agent-2')); + await backend.spawnAgent(createSpawnConfig('agent-3')); + + expect(backend.getActiveAgentId()).toBe('agent-1'); + + backend.switchToNext(); + expect(backend.getActiveAgentId()).toBe('agent-2'); + + backend.switchToNext(); + expect(backend.getActiveAgentId()).toBe('agent-3'); + + // Wraps around + backend.switchToNext(); + expect(backend.getActiveAgentId()).toBe('agent-1'); + + backend.switchToPrevious(); + expect(backend.getActiveAgentId()).toBe('agent-3'); + }); + + it('should switch to a specific agent', async () => { + await backend.init(); + await backend.spawnAgent(createSpawnConfig('agent-1')); + await backend.spawnAgent(createSpawnConfig('agent-2')); + + backend.switchTo('agent-2'); + expect(backend.getActiveAgentId()).toBe('agent-2'); + }); + + it('should forward input to active agent', async () => { + await backend.init(); + await backend.spawnAgent(createSpawnConfig('agent-1')); + + const result = backend.forwardInput('hello'); + expect(result).toBe(true); + }); + + it('should return false for forwardInput with no active agent', () => { + expect(backend.forwardInput('hello')).toBe(false); + }); + + it('should write to specific agent', async () => { + await backend.init(); + await backend.spawnAgent(createSpawnConfig('agent-1')); + + expect(backend.writeToAgent('agent-1', 'hello')).toBe(true); + expect(backend.writeToAgent('nonexistent', 'hello')).toBe(false); + }); + + it('should return null for screen capture methods', async () => { + await backend.init(); + await backend.spawnAgent(createSpawnConfig('agent-1')); + + expect(backend.getActiveSnapshot()).toBeNull(); + expect(backend.getAgentSnapshot('agent-1')).toBeNull(); + expect(backend.getAgentScrollbackLength('agent-1')).toBe(0); + }); + + it('should return null for attach hint', () => { + expect(backend.getAttachHint()).toBeNull(); + }); + + it('should stop a specific agent', async () => { + await backend.init(); + await backend.spawnAgent(createSpawnConfig('agent-1')); + + const agent = backend.getAgent('agent-1'); + expect(agent).toBeDefined(); + + backend.stopAgent('agent-1'); + // Agent should eventually reach cancelled state + }); + + it('should stop all agents', async () => { + await backend.init(); + await backend.spawnAgent(createSpawnConfig('agent-1')); + await backend.spawnAgent(createSpawnConfig('agent-2')); + + backend.stopAll(); + // Both agents should be aborted + }); + + it('should cleanup all agents', async () => { + await backend.init(); + await backend.spawnAgent(createSpawnConfig('agent-1')); + + await backend.cleanup(); + + expect(backend.getActiveAgentId()).toBeNull(); + expect(backend.getAgent('agent-1')).toBeUndefined(); + }); + + it('should fire exit callback when agent completes', async () => { + await backend.init(); + + const exitCallback = vi.fn(); + backend.setOnAgentExit(exitCallback); + + await backend.spawnAgent(createSpawnConfig('agent-1')); + + // The mock agent stays idle after processing initialTask. + // Trigger a graceful shutdown to make it complete. + const agent = backend.getAgent('agent-1'); + expect(agent).toBeDefined(); + await agent!.shutdown(); + + // Wait for the exit callback to fire + await vi.waitFor(() => { + expect(exitCallback).toHaveBeenCalledWith( + 'agent-1', + expect.any(Number), + null, + ); + }); + }); + + it('should pass per-agent cwd to AgentCore via config proxy', async () => { + const parentConfig = createMockConfig(); + const backendWithParentCwd = new InProcessBackend(parentConfig); + await backendWithParentCwd.init(); + + const agentCwd = '/worktree/agent-1'; + const config = createSpawnConfig('agent-1'); + config.cwd = agentCwd; + + await backendWithParentCwd.spawnAgent(config); + + const MockAgentCore = AgentCore as unknown as ReturnType; + const lastCall = MockAgentCore.mock.calls.at(-1); + expect(lastCall).toBeDefined(); + + // Second arg is the runtime context (Config) + const agentContext = lastCall![1] as { + getWorkingDir: () => string; + getTargetDir: () => string; + getToolRegistry: () => unknown; + }; + expect(agentContext.getWorkingDir()).toBe(agentCwd); + expect(agentContext.getTargetDir()).toBe(agentCwd); + expect(agentContext.getToolRegistry()).toBeDefined(); + }); + + it('should propagate runConfig limits to AgentInteractive', async () => { + await backend.init(); + + const config = createSpawnConfig('agent-1'); + config.inProcess!.runtimeConfig.runConfig = { + max_turns: 5, + max_time_minutes: 10, + }; + + await backend.spawnAgent(config); + + const agent = backend.getAgent('agent-1'); + expect(agent).toBeDefined(); + expect(agent!.config.maxTurnsPerMessage).toBe(5); + expect(agent!.config.maxTimeMinutesPerMessage).toBe(10); + }); + + it('should default limits to undefined when runConfig omits them', async () => { + await backend.init(); + + const config = createSpawnConfig('agent-1'); + config.inProcess!.runtimeConfig.runConfig = {}; + + await backend.spawnAgent(config); + + const agent = backend.getAgent('agent-1'); + expect(agent).toBeDefined(); + expect(agent!.config.maxTurnsPerMessage).toBeUndefined(); + expect(agent!.config.maxTimeMinutesPerMessage).toBeUndefined(); + }); + + it('should give each agent its own cwd even when sharing a backend', async () => { + await backend.init(); + + const config1 = createSpawnConfig('agent-1'); + config1.cwd = '/worktree/agent-1'; + const config2 = createSpawnConfig('agent-2'); + config2.cwd = '/worktree/agent-2'; + + await backend.spawnAgent(config1); + await backend.spawnAgent(config2); + + const MockAgentCore = AgentCore as unknown as ReturnType; + const calls = MockAgentCore.mock.calls; + + const ctx1 = calls.at(-2)![1] as { + getWorkingDir: () => string; + getTargetDir: () => string; + }; + const ctx2 = calls.at(-1)![1] as { + getWorkingDir: () => string; + getTargetDir: () => string; + }; + + expect(ctx1.getWorkingDir()).toBe('/worktree/agent-1'); + expect(ctx1.getTargetDir()).toBe('/worktree/agent-1'); + expect(ctx2.getWorkingDir()).toBe('/worktree/agent-2'); + expect(ctx2.getTargetDir()).toBe('/worktree/agent-2'); + }); + + it('should throw when spawning a duplicate agent ID', async () => { + await backend.init(); + await backend.spawnAgent(createSpawnConfig('agent-1')); + + await expect( + backend.spawnAgent(createSpawnConfig('agent-1')), + ).rejects.toThrow('Agent "agent-1" already exists.'); + }); + + it('should fire exit callback with code 1 when start() throws', async () => { + // Make createChat throw for this test + const MockAgentCore = AgentCore as unknown as ReturnType; + MockAgentCore.mockImplementationOnce(() => ({ + subagentId: 'mock-id', + name: 'mock-agent', + eventEmitter: { + on: vi.fn(), + off: vi.fn(), + emit: vi.fn(), + }, + stats: { + start: vi.fn(), + getSummary: vi.fn().mockReturnValue({}), + }, + createChat: vi.fn().mockRejectedValue(new Error('Auth failed')), + prepareTools: vi.fn().mockReturnValue([]), + getEventEmitter: vi.fn().mockReturnValue({ + on: vi.fn(), + off: vi.fn(), + emit: vi.fn(), + }), + getExecutionSummary: vi.fn().mockReturnValue({}), + })); + + await backend.init(); + + const exitCallback = vi.fn(); + backend.setOnAgentExit(exitCallback); + + // spawnAgent should NOT throw — it catches the error internally + await expect( + backend.spawnAgent(createSpawnConfig('agent-fail')), + ).resolves.toBeUndefined(); + + // Exit callback should have been fired with exit code 1 + expect(exitCallback).toHaveBeenCalledWith('agent-fail', 1, null); + }); + + it('should return true immediately from waitForAll after cleanup', async () => { + await backend.init(); + await backend.spawnAgent(createSpawnConfig('agent-1')); + + await backend.cleanup(); + + // waitForAll should return immediately after cleanup + const result = await backend.waitForAll(5000); + expect(result).toBe(true); + }); + + describe('auth isolation', () => { + it('should create per-agent ContentGenerator when authOverrides is provided', async () => { + await backend.init(); + + const config = createSpawnConfig('agent-1'); + config.inProcess!.authOverrides = { + authType: 'anthropic', + apiKey: 'agent-key-123', + baseUrl: 'https://agent.example.com', + }; + + await backend.spawnAgent(config); + + const mockCreate = createContentGenerator as ReturnType; + expect(mockCreate).toHaveBeenCalledWith( + expect.objectContaining({ + authType: 'anthropic', + apiKey: 'agent-key-123', + baseUrl: 'https://agent.example.com', + model: 'test-model', + }), + expect.anything(), + ); + }); + + it('should override getContentGenerator on per-agent config', async () => { + const agentGenerator = { generateContentStream: vi.fn() }; + const mockCreate = createContentGenerator as ReturnType; + mockCreate.mockResolvedValueOnce(agentGenerator); + + await backend.init(); + + const config = createSpawnConfig('agent-1'); + config.inProcess!.authOverrides = { + authType: 'anthropic', + apiKey: 'agent-key', + }; + + await backend.spawnAgent(config); + + const MockAgentCore = AgentCore as unknown as ReturnType; + const lastCall = MockAgentCore.mock.calls.at(-1); + const agentContext = lastCall![1] as { + getContentGenerator: () => unknown; + getAuthType: () => string | undefined; + getModel: () => string; + }; + + expect(agentContext.getContentGenerator()).toBe(agentGenerator); + expect(agentContext.getAuthType()).toBe('anthropic'); + }); + + it('should not create per-agent ContentGenerator without authOverrides', async () => { + const mockCreate = createContentGenerator as ReturnType; + mockCreate.mockClear(); + + await backend.init(); + await backend.spawnAgent(createSpawnConfig('agent-1')); + + expect(mockCreate).not.toHaveBeenCalled(); + }); + + it('should fall back to parent ContentGenerator if per-agent creation fails', async () => { + const mockCreate = createContentGenerator as ReturnType; + mockCreate.mockRejectedValueOnce(new Error('Auth failed')); + + await backend.init(); + + const config = createSpawnConfig('agent-1'); + config.inProcess!.authOverrides = { + authType: 'anthropic', + apiKey: 'bad-key', + }; + + // Should not throw — falls back gracefully + await expect(backend.spawnAgent(config)).resolves.toBeUndefined(); + + const MockAgentCore = AgentCore as unknown as ReturnType; + const lastCall = MockAgentCore.mock.calls.at(-1); + const agentContext = lastCall![1] as { + getContentGenerator: () => unknown; + }; + + // Falls back to parent's content generator + expect(agentContext.getContentGenerator()).toBe(mockContentGenerator); + }); + + it('should give different agents different ContentGenerators', async () => { + const gen1 = { generateContentStream: vi.fn() }; + const gen2 = { generateContentStream: vi.fn() }; + const mockCreate = createContentGenerator as ReturnType; + mockCreate.mockResolvedValueOnce(gen1).mockResolvedValueOnce(gen2); + + await backend.init(); + + const config1 = createSpawnConfig('agent-1'); + config1.inProcess!.authOverrides = { + authType: 'openai', + apiKey: 'key-1', + baseUrl: 'https://api1.example.com', + }; + const config2 = createSpawnConfig('agent-2'); + config2.inProcess!.authOverrides = { + authType: 'anthropic', + apiKey: 'key-2', + baseUrl: 'https://api2.example.com', + }; + + await backend.spawnAgent(config1); + await backend.spawnAgent(config2); + + const MockAgentCore = AgentCore as unknown as ReturnType; + const calls = MockAgentCore.mock.calls; + + const ctx1 = calls.at(-2)![1] as { + getContentGenerator: () => unknown; + }; + const ctx2 = calls.at(-1)![1] as { + getContentGenerator: () => unknown; + }; + + expect(ctx1.getContentGenerator()).toBe(gen1); + expect(ctx2.getContentGenerator()).toBe(gen2); + expect(ctx1.getContentGenerator()).not.toBe(ctx2.getContentGenerator()); + }); + }); +}); diff --git a/packages/core/src/agents/backends/InProcessBackend.ts b/packages/core/src/agents/backends/InProcessBackend.ts new file mode 100644 index 000000000..6ea1de34e --- /dev/null +++ b/packages/core/src/agents/backends/InProcessBackend.ts @@ -0,0 +1,459 @@ +/** + * @license + * Copyright 2025 Qwen + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * @fileoverview InProcessBackend — Backend implementation that runs agents + * in the current process using AgentInteractive instead of PTY subprocesses. + * + * This enables Arena to work without tmux or any external terminal multiplexer. + */ + +import { createDebugLogger } from '../../utils/debugLogger.js'; +import type { Config } from '../../config/config.js'; +import { + type AuthType, + type ContentGenerator, + type ContentGeneratorConfig, + createContentGenerator, +} from '../../core/contentGenerator.js'; +import { AUTH_ENV_MAPPINGS } from '../../models/constants.js'; +import { AgentStatus } from '../runtime/agent-types.js'; +import { AgentCore } from '../runtime/agent-core.js'; +import { AgentEventEmitter } from '../runtime/agent-events.js'; +import { ContextState } from '../runtime/agent-headless.js'; +import { AgentInteractive } from '../runtime/agent-interactive.js'; +import type { + Backend, + AgentSpawnConfig, + AgentExitCallback, + InProcessSpawnConfig, +} from './types.js'; +import { DISPLAY_MODE } from './types.js'; +import type { AnsiOutput } from '../../utils/terminalSerializer.js'; +import { WorkspaceContext } from '../../utils/workspaceContext.js'; +import { FileDiscoveryService } from '../../services/fileDiscoveryService.js'; +import type { ToolRegistry } from '../../tools/tool-registry.js'; + +const debugLogger = createDebugLogger('IN_PROCESS_BACKEND'); + +/** + * InProcessBackend runs agents in the current Node.js process. + * + * Instead of spawning PTY subprocesses, it creates AgentCore + AgentInteractive + * instances that execute in-process. Screen capture returns null (the UI reads + * messages directly from AgentInteractive). + */ +export class InProcessBackend implements Backend { + readonly type = DISPLAY_MODE.IN_PROCESS; + + private readonly runtimeContext: Config; + private readonly agents = new Map(); + private readonly agentRegistries: ToolRegistry[] = []; + private readonly agentOrder: string[] = []; + private activeAgentId: string | null = null; + private exitCallback: AgentExitCallback | null = null; + /** Whether cleanup() has been called */ + private cleanedUp = false; + + constructor(runtimeContext: Config) { + this.runtimeContext = runtimeContext; + } + + // ─── Backend Interface ───────────────────────────────────── + + async init(): Promise { + debugLogger.info('InProcessBackend initialized'); + } + + async spawnAgent(config: AgentSpawnConfig): Promise { + const inProcessConfig = config.inProcess; + if (!inProcessConfig) { + throw new Error( + `InProcessBackend requires inProcess config for agent ${config.agentId}`, + ); + } + + if (this.agents.has(config.agentId)) { + throw new Error(`Agent "${config.agentId}" already exists.`); + } + + const { promptConfig, modelConfig, runConfig, toolConfig } = + inProcessConfig.runtimeConfig; + + const eventEmitter = new AgentEventEmitter(); + + // Build a per-agent runtime context with isolated working directory, + // target directory, workspace context, tool registry, and (optionally) + // a dedicated ContentGenerator for per-agent auth isolation. + const agentContext = await createPerAgentConfig( + this.runtimeContext, + config.cwd, + inProcessConfig.runtimeConfig.modelConfig.model, + inProcessConfig.authOverrides, + ); + + this.agentRegistries.push(agentContext.getToolRegistry()); + + const core = new AgentCore( + inProcessConfig.agentName, + agentContext, + promptConfig, + modelConfig, + runConfig, + toolConfig, + eventEmitter, + ); + + const interactive = new AgentInteractive( + { + agentId: config.agentId, + agentName: inProcessConfig.agentName, + initialTask: inProcessConfig.initialTask, + maxTurnsPerMessage: runConfig.max_turns, + maxTimeMinutesPerMessage: runConfig.max_time_minutes, + }, + core, + ); + + this.agents.set(config.agentId, interactive); + this.agentOrder.push(config.agentId); + + // Set first agent as active + if (this.activeAgentId === null) { + this.activeAgentId = config.agentId; + } + + try { + const context = new ContextState(); + await interactive.start(context); + + // Watch for completion and fire exit callback + void interactive.waitForCompletion().then(() => { + const status = interactive.getStatus(); + const exitCode = + status === AgentStatus.COMPLETED + ? 0 + : status === AgentStatus.FAILED + ? 1 + : null; + this.exitCallback?.(config.agentId, exitCode, null); + }); + + debugLogger.info(`Spawned in-process agent: ${config.agentId}`); + } catch (error) { + debugLogger.error( + `Failed to start in-process agent "${config.agentId}":`, + error, + ); + this.exitCallback?.(config.agentId, 1, null); + } + } + + stopAgent(agentId: string): void { + const agent = this.agents.get(agentId); + if (agent) { + agent.abort(); + debugLogger.info(`Stopped agent: ${agentId}`); + } + } + + stopAll(): void { + for (const agent of this.agents.values()) { + agent.abort(); + } + debugLogger.info('Stopped all in-process agents'); + } + + async cleanup(): Promise { + this.cleanedUp = true; + + for (const agent of this.agents.values()) { + agent.abort(); + } + // Wait briefly for loops to settle + const promises = Array.from(this.agents.values()).map((a) => + a.waitForCompletion().catch(() => {}), + ); + await Promise.allSettled(promises); + + // Stop per-agent tool registries so tools like TaskTool can release + // listeners registered on shared managers (e.g. SubagentManager). + for (const registry of this.agentRegistries) { + await registry.stop().catch(() => {}); + } + this.agentRegistries.length = 0; + + this.agents.clear(); + this.agentOrder.length = 0; + this.activeAgentId = null; + debugLogger.info('InProcessBackend cleaned up'); + } + + setOnAgentExit(callback: AgentExitCallback): void { + this.exitCallback = callback; + } + + async waitForAll(timeoutMs?: number): Promise { + if (this.cleanedUp) return true; + + const promises = Array.from(this.agents.values()).map((a) => + a.waitForCompletion(), + ); + + if (timeoutMs === undefined) { + await Promise.allSettled(promises); + return true; + } + + let timerId: ReturnType; + const timeout = new Promise<'timeout'>((resolve) => { + timerId = setTimeout(() => resolve('timeout'), timeoutMs); + }); + + const result = await Promise.race([ + Promise.allSettled(promises).then(() => 'done' as const), + timeout, + ]); + + clearTimeout(timerId!); + return result === 'done'; + } + + // ─── Navigation ──────────────────────────────────────────── + + switchTo(agentId: string): void { + if (this.agents.has(agentId)) { + this.activeAgentId = agentId; + } + } + + switchToNext(): void { + this.activeAgentId = this.navigate(1); + } + + switchToPrevious(): void { + this.activeAgentId = this.navigate(-1); + } + + getActiveAgentId(): string | null { + return this.activeAgentId; + } + + // ─── Screen Capture (no-op for in-process) ───────────────── + + getActiveSnapshot(): AnsiOutput | null { + return null; + } + + getAgentSnapshot( + _agentId: string, + _scrollOffset?: number, + ): AnsiOutput | null { + return null; + } + + getAgentScrollbackLength(_agentId: string): number { + return 0; + } + + // ─── Input ───────────────────────────────────────────────── + + forwardInput(data: string): boolean { + if (!this.activeAgentId) return false; + return this.writeToAgent(this.activeAgentId, data); + } + + writeToAgent(agentId: string, data: string): boolean { + const agent = this.agents.get(agentId); + if (!agent) return false; + + agent.enqueueMessage(data); + return true; + } + + // ─── Resize (no-op) ─────────────────────────────────────── + + resizeAll(_cols: number, _rows: number): void { + // No terminals to resize in-process + } + + // ─── External Session ────────────────────────────────────── + + getAttachHint(): string | null { + return null; + } + + // ─── Extra: Direct Access ────────────────────────────────── + + /** + * Get an AgentInteractive instance by agent ID. + * Used by ArenaManager for direct event subscription. + */ + getAgent(agentId: string): AgentInteractive | undefined { + return this.agents.get(agentId); + } + + // ─── Private ─────────────────────────────────────────────── + + private navigate(direction: 1 | -1): string | null { + if (this.agentOrder.length === 0) return null; + if (!this.activeAgentId) return this.agentOrder[0] ?? null; + + const currentIndex = this.agentOrder.indexOf(this.activeAgentId); + if (currentIndex === -1) return this.agentOrder[0] ?? null; + + const nextIndex = + (currentIndex + direction + this.agentOrder.length) % + this.agentOrder.length; + return this.agentOrder[nextIndex] ?? null; + } +} + +/** + * Create a per-agent Config that delegates to the shared base Config but + * overrides key methods to provide per-agent isolation: + * + * - `getWorkingDir()` / `getTargetDir()` → agent's worktree cwd + * - `getWorkspaceContext()` → WorkspaceContext rooted at agent's cwd + * - `getFileService()` → FileDiscoveryService rooted at agent's cwd + * (so .qwenignore checks resolve against the agent's worktree) + * - `getToolRegistry()` → per-agent tool registry with core tools bound to + * the agent Config (so tools resolve paths against the agent's worktree) + * - `getContentGenerator()` / `getContentGeneratorConfig()` / `getAuthType()` + * → per-agent ContentGenerator when `authOverrides` is provided, enabling + * agents to target different model providers in the same Arena session + * + * Uses prototypal delegation so all other Config methods/properties resolve + * against the original instance transparently. + */ +async function createPerAgentConfig( + base: Config, + cwd: string, + modelId?: string, + authOverrides?: InProcessSpawnConfig['authOverrides'], +): Promise { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const override = Object.create(base) as any; + + override.getWorkingDir = () => cwd; + override.getTargetDir = () => cwd; + override.getProjectRoot = () => cwd; + + const agentWorkspace = new WorkspaceContext(cwd); + override.getWorkspaceContext = () => agentWorkspace; + + const agentFileService = new FileDiscoveryService(cwd); + override.getFileService = () => agentFileService; + + // Build a per-agent tool registry: core tools are constructed with + // the per-agent Config so they resolve paths against cwd. Discovered + // (MCP/command) tools are copied from the parent registry as-is. + const agentRegistry: ToolRegistry = await override.createToolRegistry( + undefined, + { skipDiscovery: true }, + ); + agentRegistry.copyDiscoveredToolsFrom(base.getToolRegistry()); + override.getToolRegistry = () => agentRegistry; + + // Build a per-agent ContentGenerator when auth overrides are provided. + // This enables Arena agents to use different providers (OpenAI, Anthropic, + // Gemini, etc.) than the parent process. + if (authOverrides?.authType) { + try { + const agentGeneratorConfig = buildAgentContentGeneratorConfig( + base, + modelId, + authOverrides, + ); + const agentGenerator = await createContentGenerator( + agentGeneratorConfig, + override as Config, + ); + override.getContentGenerator = (): ContentGenerator => agentGenerator; + override.getContentGeneratorConfig = (): ContentGeneratorConfig => + agentGeneratorConfig; + override.getAuthType = (): AuthType | undefined => + agentGeneratorConfig.authType; + override.getModel = (): string => agentGeneratorConfig.model; + + debugLogger.info( + `Created per-agent ContentGenerator: authType=${authOverrides.authType}, model=${agentGeneratorConfig.model}`, + ); + } catch (error) { + debugLogger.error( + 'Failed to create per-agent ContentGenerator, falling back to parent:', + error, + ); + } + } + + return override as Config; +} + +/** + * Build a ContentGeneratorConfig for a per-agent ContentGenerator. + * Inherits operational settings (timeout, retries, proxy, sampling, etc.) + * from the parent's config and overlays the agent-specific auth fields. + * + * For cross-provider agents the parent's API key / base URL are invalid, + * so we resolve credentials from the provider-specific environment + * variables (e.g. ANTHROPIC_API_KEY, ANTHROPIC_BASE_URL). This mirrors + * what a PTY subprocess does during its own initialization. + */ +function buildAgentContentGeneratorConfig( + base: Config, + modelId: string | undefined, + authOverrides: NonNullable, +): ContentGeneratorConfig { + const parentConfig = base.getContentGeneratorConfig(); + const sameProvider = authOverrides.authType === parentConfig.authType; + + const resolvedApiKey = resolveCredentialField( + authOverrides.apiKey, + sameProvider ? parentConfig.apiKey : undefined, + authOverrides.authType, + 'apiKey', + ); + + const resolvedBaseUrl = resolveCredentialField( + authOverrides.baseUrl, + sameProvider ? parentConfig.baseUrl : undefined, + authOverrides.authType, + 'baseUrl', + ); + + return { + ...parentConfig, + model: modelId ?? parentConfig.model, + authType: authOverrides.authType as AuthType, + apiKey: resolvedApiKey, + baseUrl: resolvedBaseUrl, + }; +} + +/** + * Resolve a credential field (apiKey or baseUrl) with the following + * priority: explicit override → same-provider parent value → env var. + */ +function resolveCredentialField( + explicitValue: string | undefined, + inheritedValue: string | undefined, + authType: string, + field: 'apiKey' | 'baseUrl', +): string | undefined { + if (explicitValue) return explicitValue; + if (inheritedValue) return inheritedValue; + + const envMapping = + AUTH_ENV_MAPPINGS[authType as keyof typeof AUTH_ENV_MAPPINGS]; + if (!envMapping) return undefined; + + for (const envKey of envMapping[field]) { + const value = process.env[envKey]; + if (value) return value; + } + return undefined; +} diff --git a/packages/core/src/agents/backends/detect.ts b/packages/core/src/agents/backends/detect.ts index 3c53c5ceb..c8c43c2c8 100644 --- a/packages/core/src/agents/backends/detect.ts +++ b/packages/core/src/agents/backends/detect.ts @@ -5,7 +5,9 @@ */ import { createDebugLogger } from '../../utils/debugLogger.js'; +import type { Config } from '../../config/config.js'; import { TmuxBackend } from './TmuxBackend.js'; +import { InProcessBackend } from './InProcessBackend.js'; import { type Backend, DISPLAY_MODE, type DisplayMode } from './types.js'; import { isTmuxAvailable } from './tmux-commands.js'; @@ -19,30 +21,29 @@ export interface DetectBackendResult { /** * Detect and create the appropriate Backend. * - * Design principle for current Arena flow: - * - Keep all display mode values in the API surface - * - Only tmux is runnable for now - * - in-process / iTerm2 preferences fail fast as "not implemented yet" - * * Detection priority: * 1. User explicit preference (--display=in-process|tmux|iterm2) * 2. Auto-detect: * - inside tmux: TmuxBackend * - other terminals: tmux external session mode when tmux is available + * - fallback to InProcessBackend + * + * @param preference - Optional display mode preference + * @param runtimeContext - Runtime config for in-process fallback */ export async function detectBackend( - preference?: DisplayMode, + preference: DisplayMode | undefined, + runtimeContext: Config, ): Promise { // 1. User explicit preference if (preference === DISPLAY_MODE.IN_PROCESS) { - throw new Error( - `Arena display mode "${DISPLAY_MODE.IN_PROCESS}" is not implemented yet. Please use "${DISPLAY_MODE.TMUX}".`, - ); + debugLogger.info('Using InProcessBackend (user preference)'); + return { backend: new InProcessBackend(runtimeContext) }; } if (preference === DISPLAY_MODE.ITERM2) { throw new Error( - `Arena display mode "${DISPLAY_MODE.ITERM2}" is not implemented yet. Please use "${DISPLAY_MODE.TMUX}".`, + `Arena display mode "${DISPLAY_MODE.ITERM2}" is not implemented yet. Please use "${DISPLAY_MODE.TMUX}" or "${DISPLAY_MODE.IN_PROCESS}".`, ); } @@ -65,10 +66,13 @@ export async function detectBackend( return { backend: new TmuxBackend() }; } - // No supported backend available. - const tmuxEnv = process.env['TMUX']; - const termProgram = process.env['TERM_PROGRAM']; - throw new Error( - `No supported Arena backend detected. $TMUX=${tmuxEnv ? `"${tmuxEnv}"` : '(unset)'}, $TERM_PROGRAM=${termProgram ? `"${termProgram}"` : '(unset)'}. Install tmux to use Arena split-pane mode.`, + // Fallback: use InProcessBackend + debugLogger.info( + 'No PTY backend available — falling back to InProcessBackend', ); + return { + backend: new InProcessBackend(runtimeContext), + warning: + 'tmux is not available. Using in-process mode (no split-pane terminal view).', + }; } diff --git a/packages/core/src/agents/backends/index.ts b/packages/core/src/agents/backends/index.ts index f85fe163e..6105fe45c 100644 --- a/packages/core/src/agents/backends/index.ts +++ b/packages/core/src/agents/backends/index.ts @@ -11,7 +11,9 @@ export type { AgentSpawnConfig, AgentExitCallback, TmuxBackendOptions, + InProcessSpawnConfig, } from './types.js'; export { TmuxBackend } from './TmuxBackend.js'; export { ITermBackend } from './ITermBackend.js'; +export { InProcessBackend } from './InProcessBackend.js'; export { detectBackend, type DetectBackendResult } from './detect.js'; diff --git a/packages/core/src/agents/backends/types.ts b/packages/core/src/agents/backends/types.ts index 577096639..0b706b08f 100644 --- a/packages/core/src/agents/backends/types.ts +++ b/packages/core/src/agents/backends/types.ts @@ -12,6 +12,12 @@ */ import type { AnsiOutput } from '../../utils/terminalSerializer.js'; +import type { + PromptConfig, + ModelConfig, + RunConfig, + ToolConfig, +} from '../runtime/agent-types.js'; /** * Canonical display mode values shared across core and CLI. @@ -52,6 +58,41 @@ export interface AgentSpawnConfig { backend?: { tmux?: TmuxBackendOptions; }; + + /** + * In-process spawn configuration (optional). + * When provided, InProcessBackend uses this to create an AgentInteractive + * instead of launching a PTY subprocess. + */ + inProcess?: InProcessSpawnConfig; +} + +/** + * Configuration for spawning an in-process agent (no PTY subprocess). + */ +export interface InProcessSpawnConfig { + /** Human-readable agent name for display. */ + agentName: string; + /** Optional initial task to start working on immediately. */ + initialTask?: string; + /** Runtime configuration for the AgentCore. */ + runtimeConfig: { + promptConfig: PromptConfig; + modelConfig: ModelConfig; + runConfig: RunConfig; + toolConfig?: ToolConfig; + }; + /** + * Per-agent auth/provider overrides. When present, a dedicated + * ContentGenerator is created for this agent instead of inheriting + * the parent process's. This enables Arena agents to target different + * model providers (OpenAI, Anthropic, Gemini, etc.) in the same session. + */ + authOverrides?: { + authType: string; + apiKey?: string; + baseUrl?: string; + }; } /** diff --git a/packages/core/src/agents/runtime/agent-core.ts b/packages/core/src/agents/runtime/agent-core.ts index 8af0f9247..4767c258d 100644 --- a/packages/core/src/agents/runtime/agent-core.ts +++ b/packages/core/src/agents/runtime/agent-core.ts @@ -43,17 +43,17 @@ import type { ModelConfig, RunConfig, ToolConfig, -} from '../../subagents/types.js'; -import { SubagentTerminateMode } from '../../subagents/types.js'; +} from './agent-types.js'; +import { AgentTerminateMode } from './agent-types.js'; import type { AgentRoundEvent, AgentToolCallEvent, AgentToolResultEvent, AgentUsageEvent, + AgentHooks, } from './agent-events.js'; import { type AgentEventEmitter, AgentEventType } from './agent-events.js'; import { AgentStatistics, type AgentStatsSummary } from './agent-statistics.js'; -import type { AgentHooks } from './agent-hooks.js'; import { TaskTool } from '../../tools/task.js'; import { DEFAULT_QWEN_MODEL } from '../../config/models.js'; import { type ContextState, templateString } from './agent-headless.js'; @@ -65,7 +65,7 @@ export interface ReasoningLoopResult { /** The final model text response (empty if terminated by abort/limits). */ text: string; /** Why the loop ended. null = normal text completion (no tool calls). */ - terminateMode: SubagentTerminateMode | null; + terminateMode: AgentTerminateMode | null; /** Number of model round-trips completed. */ turnsUsed: number; } @@ -324,18 +324,18 @@ export class AgentCore { let currentMessages = initialMessages; let turnCounter = 0; let finalText = ''; - let terminateMode: SubagentTerminateMode | null = null; + let terminateMode: AgentTerminateMode | null = null; while (true) { // Check termination conditions. if (options?.maxTurns && turnCounter >= options.maxTurns) { - terminateMode = SubagentTerminateMode.MAX_TURNS; + terminateMode = AgentTerminateMode.MAX_TURNS; break; } let durationMin = (Date.now() - startTime) / (1000 * 60); if (options?.maxTimeMinutes && durationMin >= options.maxTimeMinutes) { - terminateMode = SubagentTerminateMode.TIMEOUT; + terminateMode = AgentTerminateMode.TIMEOUT; break; } @@ -384,7 +384,7 @@ export class AgentCore { abortController.signal.removeEventListener('abort', onParentAbort); return { text: finalText, - terminateMode: SubagentTerminateMode.CANCELLED, + terminateMode: AgentTerminateMode.CANCELLED, turnsUsed: turnCounter, }; } @@ -427,7 +427,7 @@ export class AgentCore { durationMin = (Date.now() - startTime) / (1000 * 60); if (options?.maxTimeMinutes && durationMin >= options.maxTimeMinutes) { abortController.signal.removeEventListener('abort', onParentAbort); - terminateMode = SubagentTerminateMode.TIMEOUT; + terminateMode = AgentTerminateMode.TIMEOUT; break; } diff --git a/packages/core/src/agents/runtime/agent-events.ts b/packages/core/src/agents/runtime/agent-events.ts index 8f68dd1c3..e02d8b692 100644 --- a/packages/core/src/agents/runtime/agent-events.ts +++ b/packages/core/src/agents/runtime/agent-events.ts @@ -4,6 +4,15 @@ * SPDX-License-Identifier: Apache-2.0 */ +/** + * @fileoverview Agent event types, emitter, and lifecycle hooks. + * + * Defines the observation/notification contracts for the agent runtime: + * - Event types emitted during agent execution (streaming, tool calls, etc.) + * - AgentEventEmitter — typed wrapper around EventEmitter + * - Lifecycle hooks (pre/post tool use, stop) for synchronous callbacks + */ + import { EventEmitter } from 'events'; import type { ToolCallConfirmationDetails, @@ -11,6 +20,9 @@ import type { ToolResultDisplay, } from '../../tools/tools.js'; import type { Part, GenerateContentResponseUsageMetadata } from '@google/genai'; +import type { AgentStatus } from './agent-types.js'; + +// ─── Event Types ──────────────────────────────────────────── export type AgentEvent = | 'start' @@ -22,7 +34,8 @@ export type AgentEvent = | 'tool_waiting_approval' | 'usage_metadata' | 'finish' - | 'error'; + | 'error' + | 'status_change'; export enum AgentEventType { START = 'start', @@ -35,8 +48,11 @@ export enum AgentEventType { USAGE_METADATA = 'usage_metadata', FINISH = 'finish', ERROR = 'error', + STATUS_CHANGE = 'status_change', } +// ─── Event Payloads ───────────────────────────────────────── + export interface AgentStartEvent { subagentId: string; name: string; @@ -128,18 +144,85 @@ export interface AgentErrorEvent { timestamp: number; } +export interface AgentStatusChangeEvent { + agentId: string; + previousStatus: AgentStatus; + newStatus: AgentStatus; + timestamp: number; +} + +// ─── Event Map ────────────────────────────────────────────── + +/** + * Maps each event type to its payload type for type-safe emit/on. + */ +export interface AgentEventMap { + [AgentEventType.START]: AgentStartEvent; + [AgentEventType.ROUND_START]: AgentRoundEvent; + [AgentEventType.ROUND_END]: AgentRoundEvent; + [AgentEventType.STREAM_TEXT]: AgentStreamTextEvent; + [AgentEventType.TOOL_CALL]: AgentToolCallEvent; + [AgentEventType.TOOL_RESULT]: AgentToolResultEvent; + [AgentEventType.TOOL_WAITING_APPROVAL]: AgentApprovalRequestEvent; + [AgentEventType.USAGE_METADATA]: AgentUsageEvent; + [AgentEventType.FINISH]: AgentFinishEvent; + [AgentEventType.ERROR]: AgentErrorEvent; + [AgentEventType.STATUS_CHANGE]: AgentStatusChangeEvent; +} + +// ─── Event Emitter ────────────────────────────────────────── + export class AgentEventEmitter { private ee = new EventEmitter(); - on(event: AgentEvent, listener: (...args: unknown[]) => void) { - this.ee.on(event, listener); + on( + event: E, + listener: (payload: AgentEventMap[E]) => void, + ): void { + this.ee.on(event, listener as (...args: unknown[]) => void); } - off(event: AgentEvent, listener: (...args: unknown[]) => void) { - this.ee.off(event, listener); + off( + event: E, + listener: (payload: AgentEventMap[E]) => void, + ): void { + this.ee.off(event, listener as (...args: unknown[]) => void); } - emit(event: AgentEvent, payload: unknown) { + emit( + event: E, + payload: AgentEventMap[E], + ): void { this.ee.emit(event, payload); } } + +// ─── Lifecycle Hooks ──────────────────────────────────────── + +export interface PreToolUsePayload { + subagentId: string; + name: string; // subagent name + toolName: string; + args: Record; + timestamp: number; +} + +export interface PostToolUsePayload extends PreToolUsePayload { + success: boolean; + durationMs: number; + errorMessage?: string; +} + +export interface AgentStopPayload { + subagentId: string; + name: string; // subagent name + terminateReason: string; + summary: Record; + timestamp: number; +} + +export interface AgentHooks { + preToolUse?(payload: PreToolUsePayload): Promise | void; + postToolUse?(payload: PostToolUsePayload): Promise | void; + onStop?(payload: AgentStopPayload): Promise | void; +} diff --git a/packages/core/src/agents/runtime/agent-headless.test.ts b/packages/core/src/agents/runtime/agent-headless.test.ts index 41b31cddc..82bdc2d70 100644 --- a/packages/core/src/agents/runtime/agent-headless.test.ts +++ b/packages/core/src/agents/runtime/agent-headless.test.ts @@ -46,8 +46,8 @@ import type { PromptConfig, RunConfig, ToolConfig, -} from '../../subagents/types.js'; -import { SubagentTerminateMode } from '../../subagents/types.js'; +} from './agent-types.js'; +import { AgentTerminateMode } from './agent-types.js'; vi.mock('../../core/geminiChat.js'); vi.mock('../../core/contentGenerator.js', async (importOriginal) => { @@ -517,7 +517,7 @@ describe('subagent.ts', () => { await expect(scope.execute(context)).rejects.toThrow( 'Missing context values for the following keys: missing', ); - expect(scope.getTerminateMode()).toBe(SubagentTerminateMode.ERROR); + expect(scope.getTerminateMode()).toBe(AgentTerminateMode.ERROR); }); it('should validate that systemPrompt and initialMessages are mutually exclusive', async () => { @@ -539,7 +539,7 @@ describe('subagent.ts', () => { await expect(agent.execute(context)).rejects.toThrow( 'PromptConfig cannot have both `systemPrompt` and `initialMessages` defined.', ); - expect(agent.getTerminateMode()).toBe(SubagentTerminateMode.ERROR); + expect(agent.getTerminateMode()).toBe(AgentTerminateMode.ERROR); }); }); @@ -562,7 +562,7 @@ describe('subagent.ts', () => { await scope.execute(new ContextState()); - expect(scope.getTerminateMode()).toBe(SubagentTerminateMode.GOAL); + expect(scope.getTerminateMode()).toBe(AgentTerminateMode.GOAL); expect(mockSendMessageStream).toHaveBeenCalledTimes(1); // Check the initial message expect(mockSendMessageStream.mock.calls[0][1].message).toEqual([ @@ -586,7 +586,7 @@ describe('subagent.ts', () => { await scope.execute(new ContextState()); - expect(scope.getTerminateMode()).toBe(SubagentTerminateMode.GOAL); + expect(scope.getTerminateMode()).toBe(AgentTerminateMode.GOAL); expect(mockSendMessageStream).toHaveBeenCalledTimes(1); }); @@ -667,7 +667,7 @@ describe('subagent.ts', () => { 'file1.txt\nfile2.ts', ); - expect(scope.getTerminateMode()).toBe(SubagentTerminateMode.GOAL); + expect(scope.getTerminateMode()).toBe(AgentTerminateMode.GOAL); }); }); @@ -714,7 +714,7 @@ describe('subagent.ts', () => { await scope.execute(new ContextState()); expect(mockSendMessageStream).toHaveBeenCalledTimes(2); - expect(scope.getTerminateMode()).toBe(SubagentTerminateMode.MAX_TURNS); + expect(scope.getTerminateMode()).toBe(AgentTerminateMode.MAX_TURNS); }); it.skip('should terminate with TIMEOUT if the time limit is reached during an LLM call', async () => { @@ -757,7 +757,7 @@ describe('subagent.ts', () => { await runPromise; - expect(scope.getTerminateMode()).toBe(SubagentTerminateMode.TIMEOUT); + expect(scope.getTerminateMode()).toBe(AgentTerminateMode.TIMEOUT); expect(mockSendMessageStream).toHaveBeenCalledTimes(1); vi.useRealTimers(); @@ -778,7 +778,7 @@ describe('subagent.ts', () => { await expect(scope.execute(new ContextState())).rejects.toThrow( 'API Failure', ); - expect(scope.getTerminateMode()).toBe(SubagentTerminateMode.ERROR); + expect(scope.getTerminateMode()).toBe(AgentTerminateMode.ERROR); }); }); @@ -865,7 +865,7 @@ describe('subagent.ts', () => { await scope.execute(new ContextState()); - expect(scope.getTerminateMode()).toBe(SubagentTerminateMode.GOAL); + expect(scope.getTerminateMode()).toBe(AgentTerminateMode.GOAL); expect(scope.getFinalText()).toBe('The final answer.'); }); @@ -929,7 +929,7 @@ describe('subagent.ts', () => { await scope.execute(new ContextState()); - expect(scope.getTerminateMode()).toBe(SubagentTerminateMode.GOAL); + expect(scope.getTerminateMode()).toBe(AgentTerminateMode.GOAL); expect(scope.getFinalText()).toBe('Actual output.'); // Should have been called twice: first with thought-only, then nudged expect(mockSendMessageStream).toHaveBeenCalledTimes(2); diff --git a/packages/core/src/agents/runtime/agent-headless.ts b/packages/core/src/agents/runtime/agent-headless.ts index ce97d143b..ac02f80df 100644 --- a/packages/core/src/agents/runtime/agent-headless.ts +++ b/packages/core/src/agents/runtime/agent-headless.ts @@ -16,22 +16,22 @@ import type { Config } from '../../config/config.js'; import { createDebugLogger } from '../../utils/debugLogger.js'; -import type { AgentEventEmitter } from './agent-events.js'; -import { AgentEventType } from './agent-events.js'; import type { + AgentEventEmitter, AgentStartEvent, AgentErrorEvent, AgentFinishEvent, + AgentHooks, } from './agent-events.js'; +import { AgentEventType } from './agent-events.js'; import type { AgentStatsSummary } from './agent-statistics.js'; -import type { AgentHooks } from './agent-hooks.js'; import type { PromptConfig, ModelConfig, RunConfig, ToolConfig, -} from '../../subagents/types.js'; -import { SubagentTerminateMode } from '../../subagents/types.js'; +} from './agent-types.js'; +import { AgentTerminateMode } from './agent-types.js'; import { logSubagentExecution } from '../../telemetry/loggers.js'; import { SubagentExecutionEvent } from '../../telemetry/types.js'; import { AgentCore } from './agent-core.js'; @@ -135,7 +135,7 @@ export function templateString( export class AgentHeadless { private readonly core: AgentCore; private finalText: string = ''; - private terminateMode: SubagentTerminateMode = SubagentTerminateMode.ERROR; + private terminateMode: AgentTerminateMode = AgentTerminateMode.ERROR; private constructor(core: AgentCore) { this.core = core; @@ -196,7 +196,7 @@ export class AgentHeadless { const chat = await this.core.createChat(context); if (!chat) { - this.terminateMode = SubagentTerminateMode.ERROR; + this.terminateMode = AgentTerminateMode.ERROR; return; } @@ -258,10 +258,10 @@ export class AgentHeadless { ); this.finalText = result.text; - this.terminateMode = result.terminateMode ?? SubagentTerminateMode.GOAL; + this.terminateMode = result.terminateMode ?? AgentTerminateMode.GOAL; } catch (error) { debugLogger.error('Error during subagent execution:', error); - this.terminateMode = SubagentTerminateMode.ERROR; + this.terminateMode = AgentTerminateMode.ERROR; this.core.eventEmitter?.emit(AgentEventType.ERROR, { subagentId: this.core.subagentId, error: error instanceof Error ? error.message : String(error), @@ -291,9 +291,7 @@ export class AgentHeadless { const completionEvent = new SubagentExecutionEvent( this.core.name, - this.terminateMode === SubagentTerminateMode.GOAL - ? 'completed' - : 'failed', + this.terminateMode === AgentTerminateMode.GOAL ? 'completed' : 'failed', { terminate_reason: this.terminateMode, result: this.finalText, @@ -348,7 +346,7 @@ export class AgentHeadless { return this.finalText; } - getTerminateMode(): SubagentTerminateMode { + getTerminateMode(): AgentTerminateMode { return this.terminateMode; } diff --git a/packages/core/src/agents/runtime/agent-hooks.ts b/packages/core/src/agents/runtime/agent-hooks.ts deleted file mode 100644 index 76b65f95e..000000000 --- a/packages/core/src/agents/runtime/agent-hooks.ts +++ /dev/null @@ -1,33 +0,0 @@ -/** - * @license - * Copyright 2025 Qwen - * SPDX-License-Identifier: Apache-2.0 - */ - -export interface PreToolUsePayload { - subagentId: string; - name: string; // subagent name - toolName: string; - args: Record; - timestamp: number; -} - -export interface PostToolUsePayload extends PreToolUsePayload { - success: boolean; - durationMs: number; - errorMessage?: string; -} - -export interface AgentStopPayload { - subagentId: string; - name: string; // subagent name - terminateReason: string; - summary: Record; - timestamp: number; -} - -export interface AgentHooks { - preToolUse?(payload: PreToolUsePayload): Promise | void; - postToolUse?(payload: PostToolUsePayload): Promise | void; - onStop?(payload: AgentStopPayload): Promise | void; -} diff --git a/packages/core/src/agents/runtime/agent-interactive.test.ts b/packages/core/src/agents/runtime/agent-interactive.test.ts new file mode 100644 index 000000000..633043ba7 --- /dev/null +++ b/packages/core/src/agents/runtime/agent-interactive.test.ts @@ -0,0 +1,625 @@ +/** + * @license + * Copyright 2025 Qwen + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { AgentInteractive } from './agent-interactive.js'; +import type { AgentCore } from './agent-core.js'; +import { AgentEventEmitter, AgentEventType } from './agent-events.js'; +import { ContextState } from './agent-headless.js'; +import type { AgentInteractiveConfig } from './agent-types.js'; +import { AgentStatus } from './agent-types.js'; + +function createMockChat() { + return { + sendMessageStream: vi.fn(), + }; +} + +function createMockCore( + overrides: { + chatValue?: unknown; + nullChat?: boolean; + loopResult?: { text: string; terminateMode: null; turnsUsed: number }; + } = {}, +) { + const emitter = new AgentEventEmitter(); + const chatReturnValue = overrides.nullChat + ? undefined + : overrides.chatValue !== undefined + ? overrides.chatValue + : createMockChat(); + const core = { + subagentId: 'test-agent-abc123', + name: 'test-agent', + eventEmitter: emitter, + stats: { + start: vi.fn(), + getSummary: vi.fn().mockReturnValue({ + rounds: 1, + totalDurationMs: 100, + totalToolCalls: 0, + successfulToolCalls: 0, + failedToolCalls: 0, + inputTokens: 0, + outputTokens: 0, + totalTokens: 0, + }), + setRounds: vi.fn(), + recordToolCall: vi.fn(), + recordTokens: vi.fn(), + }, + createChat: vi.fn().mockResolvedValue(chatReturnValue), + prepareTools: vi.fn().mockReturnValue([]), + runReasoningLoop: vi.fn().mockResolvedValue( + overrides.loopResult ?? { + text: 'Done', + terminateMode: null, + turnsUsed: 1, + }, + ), + getEventEmitter: () => emitter, + getExecutionSummary: vi.fn().mockReturnValue({ + rounds: 1, + totalDurationMs: 100, + totalToolCalls: 0, + successfulToolCalls: 0, + failedToolCalls: 0, + inputTokens: 0, + outputTokens: 0, + totalTokens: 0, + }), + } as unknown as AgentCore; + + return { core, emitter }; +} + +function createConfig( + overrides: Partial = {}, +): AgentInteractiveConfig { + return { + agentId: 'agent-1', + agentName: 'Test Agent', + ...overrides, + }; +} + +describe('AgentInteractive', () => { + let context: ContextState; + + beforeEach(() => { + context = new ContextState(); + }); + + // ─── Lifecycle ────────────────────────────────────────────── + + it('should initialize and complete cleanly without initialTask', async () => { + const { core } = createMockCore(); + const config = createConfig(); + const agent = new AgentInteractive(config, core); + + await agent.start(context); + // No initialTask → agent is waiting on queue, status is still initializing. + // Shutdown drains queue, loop exits normally → completed. + await agent.shutdown(); + expect(agent.getStatus()).toBe('completed'); + }); + + it('should process initialTask immediately on start', async () => { + const { core } = createMockCore(); + const config = createConfig({ initialTask: 'Do something' }); + const agent = new AgentInteractive(config, core); + + await agent.start(context); + await vi.waitFor(() => { + expect(agent.getStatus()).toBe('completed'); + }); + + expect(core.runReasoningLoop).toHaveBeenCalledOnce(); + expect(agent.getMessages().length).toBeGreaterThan(0); + expect(agent.getMessages()[0]?.role).toBe('user'); + expect(agent.getMessages()[0]?.content).toBe('Do something'); + + await agent.shutdown(); + }); + + it('should process enqueued messages', async () => { + const { core } = createMockCore(); + const config = createConfig(); + const agent = new AgentInteractive(config, core); + + await agent.start(context); + + agent.enqueueMessage('Hello'); + await vi.waitFor(() => { + expect(agent.getStatus()).toBe('completed'); + }); + + expect(core.runReasoningLoop).toHaveBeenCalledOnce(); + + await agent.shutdown(); + }); + + it('should set status to failed when chat creation fails', async () => { + const { core } = createMockCore({ nullChat: true }); + const config = createConfig(); + const agent = new AgentInteractive(config, core); + + await agent.start(context); + + expect(agent.getStatus()).toBe('failed'); + expect(agent.getError()).toBe('Failed to create chat session'); + }); + + // ─── Error Recovery ──────────────────────────────────────── + + it('should survive round errors and recover', async () => { + const { core } = createMockCore(); + + let callCount = 0; + (core.runReasoningLoop as ReturnType).mockImplementation( + () => { + callCount++; + if (callCount === 1) { + return Promise.reject(new Error('Model error')); + } + return Promise.resolve({ + text: 'Recovered', + terminateMode: null, + turnsUsed: 1, + }); + }, + ); + + const config = createConfig(); + const agent = new AgentInteractive(config, core); + + await agent.start(context); + + agent.enqueueMessage('cause error'); + await vi.waitFor(() => { + expect(agent.getStatus()).toBe('failed'); + expect(callCount).toBe(1); + }); + + // Error recorded as assistant message with error metadata + const messages = agent.getMessages(); + const errorMsg = messages.find( + (m) => + m.role === 'assistant' && + m.content.includes('Error: Model error') && + m.metadata?.['error'] === true, + ); + expect(errorMsg).toBeDefined(); + + // Second message works fine + agent.enqueueMessage('recover'); + await vi.waitFor(() => { + expect(agent.getStatus()).toBe('completed'); + expect(callCount).toBe(2); + }); + + await agent.shutdown(); + }); + + // ─── Cancellation ────────────────────────────────────────── + + it('should cancel current round without killing the agent', async () => { + const { core } = createMockCore(); + let resolveLoop: () => void; + (core.runReasoningLoop as ReturnType).mockImplementation( + () => + new Promise<{ text: string; terminateMode: string; turnsUsed: number }>( + (resolve) => { + resolveLoop = () => + resolve({ text: '', terminateMode: 'cancelled', turnsUsed: 0 }); + }, + ), + ); + + const config = createConfig(); + const agent = new AgentInteractive(config, core); + + await agent.start(context); + + agent.enqueueMessage('long task'); + await vi.waitFor(() => { + expect(agent.getStatus()).toBe('running'); + }); + + agent.cancelCurrentRound(); + resolveLoop!(); + + await vi.waitFor(() => { + expect(agent.getStatus()).toBe('failed'); + }); + + await agent.shutdown(); + }); + + it('should abort immediately', async () => { + const { core } = createMockCore(); + (core.runReasoningLoop as ReturnType).mockImplementation( + () => + new Promise((resolve) => { + setTimeout( + () => + resolve({ + text: '', + terminateMode: 'cancelled', + turnsUsed: 0, + }), + 50, + ); + }), + ); + + const config = createConfig({ initialTask: 'long task' }); + const agent = new AgentInteractive(config, core); + + await agent.start(context); + agent.abort(); + + await agent.waitForCompletion(); + expect(agent.getStatus()).toBe('cancelled'); + }); + + // ─── Accessors ───────────────────────────────────────────── + + it('should provide stats via getStats()', async () => { + const { core } = createMockCore(); + const config = createConfig(); + const agent = new AgentInteractive(config, core); + + const stats = agent.getStats(); + expect(stats).toBeDefined(); + expect(stats.rounds).toBe(1); + }); + + it('should provide core via getCore()', () => { + const { core } = createMockCore(); + const config = createConfig(); + const agent = new AgentInteractive(config, core); + + expect(agent.getCore()).toBe(core); + }); + + // ─── Stream Buffer & Message Recording ───────────────────── + + it('should record assistant text from stream events (not result.text)', async () => { + const { core, emitter } = createMockCore(); + + (core.runReasoningLoop as ReturnType).mockImplementation( + () => { + emitter.emit(AgentEventType.STREAM_TEXT, { + subagentId: 'test', + round: 1, + text: 'Hello from stream', + timestamp: Date.now(), + }); + return Promise.resolve({ + text: 'Hello from stream', + terminateMode: null, + turnsUsed: 1, + }); + }, + ); + + const config = createConfig({ initialTask: 'test' }); + const agent = new AgentInteractive(config, core); + + await agent.start(context); + await vi.waitFor(() => { + expect(agent.getStatus()).toBe('completed'); + }); + + const assistantMsgs = agent + .getMessages() + .filter((m) => m.role === 'assistant' && !m.thought); + // Exactly one — from stream flush, not duplicated by result.text + expect(assistantMsgs).toHaveLength(1); + expect(assistantMsgs[0]?.content).toBe('Hello from stream'); + + await agent.shutdown(); + }); + + it('should not carry stream buffer across messages', async () => { + const { core, emitter } = createMockCore(); + + let runCount = 0; + (core.runReasoningLoop as ReturnType).mockImplementation( + () => { + runCount++; + emitter.emit(AgentEventType.STREAM_TEXT, { + subagentId: 'test', + round: 1, + text: `response-${runCount}`, + timestamp: Date.now(), + }); + return Promise.resolve({ + text: `response-${runCount}`, + terminateMode: null, + turnsUsed: 1, + }); + }, + ); + + const config = createConfig({ initialTask: 'first message' }); + const agent = new AgentInteractive(config, core); + + await agent.start(context); + await vi.waitFor(() => { + expect(agent.getStatus()).toBe('completed'); + }); + + agent.enqueueMessage('second message'); + await vi.waitFor(() => { + expect(agent.getStatus()).toBe('completed'); + expect(runCount).toBe(2); + }); + + // No message containing both responses (no cross-contamination) + const messages = agent.getMessages(); + const assistantMessages = messages.filter( + (m) => m.role === 'assistant' && !m.thought, + ); + const corrupted = assistantMessages.find( + (m) => + m.content.includes('response-1') && m.content.includes('response-2'), + ); + expect(corrupted).toBeUndefined(); + + await agent.shutdown(); + }); + + it('should capture thinking text as assistant messages with thought=true', async () => { + const { core, emitter } = createMockCore(); + + (core.runReasoningLoop as ReturnType).mockImplementation( + () => { + emitter.emit(AgentEventType.STREAM_TEXT, { + subagentId: 'test', + round: 1, + text: 'Let me think...', + thought: true, + timestamp: Date.now(), + }); + emitter.emit(AgentEventType.STREAM_TEXT, { + subagentId: 'test', + round: 1, + text: 'Here is the answer', + thought: false, + timestamp: Date.now(), + }); + return Promise.resolve({ + text: 'Here is the answer', + terminateMode: null, + turnsUsed: 1, + }); + }, + ); + + const config = createConfig({ initialTask: 'think about this' }); + const agent = new AgentInteractive(config, core); + + await agent.start(context); + await vi.waitFor(() => { + expect(agent.getStatus()).toBe('completed'); + }); + + const messages = agent.getMessages(); + const thoughtMsg = messages.find( + (m) => m.role === 'assistant' && m.thought === true, + ); + const textMsg = messages.find((m) => m.role === 'assistant' && !m.thought); + + expect(thoughtMsg).toBeDefined(); + expect(thoughtMsg?.content).toBe('Let me think...'); + expect(textMsg).toBeDefined(); + expect(textMsg?.content).toBe('Here is the answer'); + + await agent.shutdown(); + }); + + it('should record tool_call and tool_result with correct roles', async () => { + const { core, emitter } = createMockCore(); + + (core.runReasoningLoop as ReturnType).mockImplementation( + () => { + emitter.emit(AgentEventType.STREAM_TEXT, { + subagentId: 'test', + round: 1, + text: 'I will read the file', + timestamp: Date.now(), + }); + emitter.emit(AgentEventType.TOOL_CALL, { + subagentId: 'test', + round: 1, + callId: 'call-1', + name: 'read_file', + args: { path: 'test.ts' }, + description: 'Read test.ts', + timestamp: Date.now(), + }); + emitter.emit(AgentEventType.TOOL_RESULT, { + subagentId: 'test', + round: 1, + callId: 'call-1', + name: 'read_file', + success: true, + timestamp: Date.now(), + }); + emitter.emit(AgentEventType.ROUND_END, { + subagentId: 'test', + round: 1, + promptId: 'p1', + timestamp: Date.now(), + }); + return Promise.resolve({ + text: '', + terminateMode: null, + turnsUsed: 1, + }); + }, + ); + + const config = createConfig({ initialTask: 'read a file' }); + const agent = new AgentInteractive(config, core); + + await agent.start(context); + await vi.waitFor(() => { + expect(agent.getStatus()).toBe('completed'); + }); + + const messages = agent.getMessages(); + const toolCall = messages.find((m) => m.role === 'tool_call'); + const toolResult = messages.find((m) => m.role === 'tool_result'); + + expect(toolCall).toBeDefined(); + expect(toolCall?.metadata?.['toolName']).toBe('read_file'); + expect(toolCall?.metadata?.['callId']).toBe('call-1'); + + expect(toolResult).toBeDefined(); + expect(toolResult?.metadata?.['success']).toBe(true); + + await agent.shutdown(); + }); + + it('should flush text before tool_call to preserve temporal ordering', async () => { + const { core, emitter } = createMockCore(); + + (core.runReasoningLoop as ReturnType).mockImplementation( + () => { + // Text arrives before tool call in the stream + emitter.emit(AgentEventType.STREAM_TEXT, { + subagentId: 'test', + round: 1, + text: 'Let me check', + timestamp: Date.now(), + }); + emitter.emit(AgentEventType.TOOL_CALL, { + subagentId: 'test', + round: 1, + callId: 'call-1', + name: 'read_file', + args: {}, + description: '', + timestamp: Date.now(), + }); + emitter.emit(AgentEventType.TOOL_RESULT, { + subagentId: 'test', + round: 1, + callId: 'call-1', + name: 'read_file', + success: true, + timestamp: Date.now(), + }); + emitter.emit(AgentEventType.ROUND_END, { + subagentId: 'test', + round: 1, + promptId: 'p1', + timestamp: Date.now(), + }); + return Promise.resolve({ + text: '', + terminateMode: null, + turnsUsed: 1, + }); + }, + ); + + const config = createConfig({ initialTask: 'task' }); + const agent = new AgentInteractive(config, core); + + await agent.start(context); + await vi.waitFor(() => { + expect(agent.getStatus()).toBe('completed'); + }); + + const messages = agent.getMessages(); + // Filter to just the non-user messages for ordering check + const nonUser = messages.filter((m) => m.role !== 'user'); + + // Text should come before tool_call + const textIdx = nonUser.findIndex( + (m) => m.role === 'assistant' && m.content === 'Let me check', + ); + const toolIdx = nonUser.findIndex((m) => m.role === 'tool_call'); + expect(textIdx).toBeLessThan(toolIdx); + + await agent.shutdown(); + }); + + it('should return in-progress stream state during streaming', async () => { + const { core, emitter } = createMockCore(); + + let capturedInProgress: ReturnType< + typeof AgentInteractive.prototype.getInProgressStream + > = null; + + (core.runReasoningLoop as ReturnType).mockImplementation( + () => { + emitter.emit(AgentEventType.STREAM_TEXT, { + subagentId: 'test', + round: 1, + text: 'thinking...', + thought: true, + timestamp: Date.now(), + }); + emitter.emit(AgentEventType.STREAM_TEXT, { + subagentId: 'test', + round: 1, + text: 'visible text', + timestamp: Date.now(), + }); + // Capture in-progress state before the loop returns + capturedInProgress = agent.getInProgressStream(); + return Promise.resolve({ + text: 'visible text', + terminateMode: null, + turnsUsed: 1, + }); + }, + ); + + const config = createConfig({ initialTask: 'test' }); + const agent = new AgentInteractive(config, core); + + await agent.start(context); + await vi.waitFor(() => { + expect(agent.getStatus()).toBe('completed'); + }); + + // During streaming, in-progress state was available + expect(capturedInProgress).toEqual({ + text: 'visible text', + thinking: 'thinking...', + round: 1, + }); + + // After flush, in-progress state is null + expect(agent.getInProgressStream()).toBeNull(); + + await agent.shutdown(); + }); + + // ─── Events ──────────────────────────────────────────────── + + it('should emit status_change events', async () => { + const { core, emitter } = createMockCore(); + const config = createConfig(); + const agent = new AgentInteractive(config, core); + + const statuses: AgentStatus[] = []; + emitter.on(AgentEventType.STATUS_CHANGE, (payload) => { + statuses.push(payload.newStatus); + }); + + await agent.start(context); + await agent.shutdown(); + + expect(statuses).toContain(AgentStatus.COMPLETED); + }); +}); diff --git a/packages/core/src/agents/runtime/agent-interactive.ts b/packages/core/src/agents/runtime/agent-interactive.ts new file mode 100644 index 000000000..66fa4faa5 --- /dev/null +++ b/packages/core/src/agents/runtime/agent-interactive.ts @@ -0,0 +1,425 @@ +/** + * @license + * Copyright 2025 Qwen + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * @fileoverview AgentInteractive — persistent interactive agent. + * + * Composes AgentCore with on-demand message processing to provide an agent + * that processes user inputs sequentially and settles between batches. + * Used by InProcessBackend for Arena's in-process mode. + * + * AgentInteractive is the **sole consumer** of AgentCore events. It builds + * conversation state (messages + in-progress stream) that the UI reads. + * The UI never directly subscribes to AgentCore events for data — it reads + * from AgentInteractive and uses notifications to know when to re-render. + * + * Lifecycle: start() → (running ↔ completed/failed)* → shutdown()/abort() + */ + +import { createDebugLogger } from '../../utils/debugLogger.js'; +import { type AgentEventEmitter, AgentEventType } from './agent-events.js'; +import type { + AgentStreamTextEvent, + AgentToolCallEvent, + AgentToolResultEvent, +} from './agent-events.js'; +import type { AgentStatsSummary } from './agent-statistics.js'; +import type { AgentCore } from './agent-core.js'; +import type { ContextState } from './agent-headless.js'; +import type { GeminiChat } from '../../core/geminiChat.js'; +import type { FunctionDeclaration } from '@google/genai'; +import { AsyncMessageQueue } from '../../utils/asyncMessageQueue.js'; +import { + AgentTerminateMode, + AgentStatus, + isTerminalStatus, + type AgentInteractiveConfig, + type AgentMessage, + type InProgressStreamState, +} from './agent-types.js'; + +const debugLogger = createDebugLogger('AGENT_INTERACTIVE'); + +/** + * AgentInteractive — persistent interactive agent that processes + * messages on demand. + * + * Three-level cancellation: + * - `cancelCurrentRound()` — abort the current reasoning loop only + * - `shutdown()` — graceful: stop accepting messages, wait for cycle + * - `abort()` — immediate: master abort, set cancelled + */ +export class AgentInteractive { + readonly config: AgentInteractiveConfig; + private readonly core: AgentCore; + private readonly queue = new AsyncMessageQueue(); + private readonly messages: AgentMessage[] = []; + + private status: AgentStatus = AgentStatus.INITIALIZING; + private error: string | undefined; + private lastRoundError: string | undefined; + private executionPromise: Promise | undefined; + private masterAbortController = new AbortController(); + private roundAbortController: AbortController | undefined; + private chat: GeminiChat | undefined; + private toolsList: FunctionDeclaration[] = []; + private processing = false; + + // Stream accumulator — separate buffers for thought and non-thought text. + // Flushed to messages on ROUND_END (intermediate rounds), before TOOL_CALL + // events (to preserve temporal ordering), and after runReasoningLoop returns + // (final round, since ROUND_END doesn't fire for it). + private thoughtBuffer = ''; + private textBuffer = ''; + private streamRound = -1; + + constructor(config: AgentInteractiveConfig, core: AgentCore) { + this.config = config; + this.core = core; + this.setupEventListeners(); + } + + // ─── Lifecycle ────────────────────────────────────────────── + + /** + * Start the agent. Initializes the chat session, then kicks off + * processing if an initialTask is configured. + */ + async start(context: ContextState): Promise { + this.setStatus(AgentStatus.INITIALIZING); + + this.chat = await this.core.createChat(context, { interactive: true }); + if (!this.chat) { + this.error = 'Failed to create chat session'; + this.setStatus(AgentStatus.FAILED); + return; + } + + this.toolsList = this.core.prepareTools(); + this.core.stats.start(Date.now()); + + if (this.config.initialTask) { + this.queue.enqueue(this.config.initialTask); + this.executionPromise = this.runLoop(); + } + } + + /** + * Run loop: process all pending messages, then settle status. + * Exits when the queue is empty or the agent is aborted. + */ + private async runLoop(): Promise { + this.processing = true; + try { + let message = this.queue.dequeue(); + while (message !== null && !this.masterAbortController.signal.aborted) { + this.addMessage('user', message); + await this.runOneRound(message); + message = this.queue.dequeue(); + } + + if (this.masterAbortController.signal.aborted) { + this.setStatus(AgentStatus.CANCELLED); + } else { + this.settleRoundStatus(); + } + } catch (err) { + this.error = err instanceof Error ? err.message : String(err); + this.setStatus(AgentStatus.FAILED); + debugLogger.error('AgentInteractive processing failed:', err); + } finally { + this.processing = false; + } + } + + /** + * Run a single reasoning round for one message. + * Creates a per-round AbortController so cancellation is scoped. + */ + private async runOneRound(message: string): Promise { + if (!this.chat) return; + + this.setStatus(AgentStatus.RUNNING); + this.lastRoundError = undefined; + this.roundAbortController = new AbortController(); + + // Propagate master abort to round + const onMasterAbort = () => this.roundAbortController?.abort(); + this.masterAbortController.signal.addEventListener('abort', onMasterAbort); + if (this.masterAbortController.signal.aborted) { + this.roundAbortController.abort(); + } + + try { + const initialMessages = [ + { role: 'user' as const, parts: [{ text: message }] }, + ]; + + const result = await this.core.runReasoningLoop( + this.chat, + initialMessages, + this.toolsList, + this.roundAbortController, + { + maxTurns: this.config.maxTurnsPerMessage, + maxTimeMinutes: this.config.maxTimeMinutesPerMessage, + }, + ); + + // Finalize any unflushed stream content from the last round. + // ROUND_END doesn't fire for the final text-producing round + // (AgentCore breaks before emitting it), so we flush here. + this.flushStreamBuffers(); + + // Surface non-normal termination so Arena (and other consumers) + // can distinguish limit-triggered stops from successful completions. + if ( + result.terminateMode && + result.terminateMode !== AgentTerminateMode.GOAL + ) { + this.lastRoundError = `Terminated: ${result.terminateMode}`; + } + } catch (err) { + // Agent survives round errors — log and settle status in runLoop. + // Flush any partial stream content accumulated before the error. + this.flushStreamBuffers(); + const errorMessage = err instanceof Error ? err.message : String(err); + this.lastRoundError = errorMessage; + debugLogger.error('AgentInteractive round error:', err); + this.addMessage('assistant', `Error: ${errorMessage}`, { + metadata: { error: true }, + }); + } finally { + this.masterAbortController.signal.removeEventListener( + 'abort', + onMasterAbort, + ); + this.roundAbortController = undefined; + } + } + + // ─── Cancellation ────────────────────────────────────────── + + /** + * Cancel only the current reasoning round. + */ + cancelCurrentRound(): void { + this.roundAbortController?.abort(); + } + + /** + * Graceful shutdown: stop accepting messages and wait for current + * processing to finish. + */ + async shutdown(): Promise { + this.queue.drain(); + if (this.executionPromise) { + await this.executionPromise; + } + // If no processing cycle ever ran (no initialTask, no messages), + // ensure the agent reaches a terminal status. + if (!isTerminalStatus(this.status)) { + this.setStatus(AgentStatus.COMPLETED); + } + } + + /** + * Immediate abort: cancel everything and set status to cancelled. + */ + abort(): void { + this.masterAbortController.abort(); + this.queue.drain(); + } + + // ─── Message Queue ───────────────────────────────────────── + + /** + * Enqueue a message for the agent to process. + */ + enqueueMessage(message: string): void { + this.queue.enqueue(message); + if (!this.processing) { + this.executionPromise = this.runLoop(); + } + } + + // ─── State Accessors ─────────────────────────────────────── + + getMessages(): readonly AgentMessage[] { + return this.messages; + } + + /** + * Returns the in-progress streaming state for UI mid-switch handoff. + * The UI reads this when attaching to an agent that's currently streaming + * to display content accumulated before the UI subscribed. + */ + getInProgressStream(): InProgressStreamState | null { + if (!this.textBuffer && !this.thoughtBuffer) return null; + return { + text: this.textBuffer, + thinking: this.thoughtBuffer, + round: this.streamRound, + }; + } + + getStatus(): AgentStatus { + return this.status; + } + + getError(): string | undefined { + return this.error; + } + + getLastRoundError(): string | undefined { + return this.lastRoundError; + } + + getStats(): AgentStatsSummary { + return this.core.getExecutionSummary(); + } + + getCore(): AgentCore { + return this.core; + } + + getEventEmitter(): AgentEventEmitter | undefined { + return this.core.getEventEmitter(); + } + + /** + * Wait for the run loop to finish (used by InProcessBackend). + */ + async waitForCompletion(): Promise { + if (this.executionPromise) { + await this.executionPromise; + } + } + + // ─── Private Helpers ─────────────────────────────────────── + + /** Emit terminal status for the just-completed round. */ + private settleRoundStatus(): void { + if (this.lastRoundError) { + this.setStatus(AgentStatus.FAILED); + } else { + this.setStatus(AgentStatus.COMPLETED); + } + } + + private setStatus(newStatus: AgentStatus): void { + const previousStatus = this.status; + if (previousStatus === newStatus) return; + + this.status = newStatus; + + this.core.eventEmitter?.emit(AgentEventType.STATUS_CHANGE, { + agentId: this.config.agentId, + previousStatus, + newStatus, + timestamp: Date.now(), + }); + } + + private addMessage( + role: AgentMessage['role'], + content: string, + options?: { thought?: boolean; metadata?: Record }, + ): void { + const message: AgentMessage = { + role, + content, + timestamp: Date.now(), + }; + if (options?.thought) { + message.thought = true; + } + if (options?.metadata) { + message.metadata = options.metadata; + } + this.messages.push(message); + } + + /** + * Flush accumulated stream buffers to finalized messages. + * + * Thought text → assistant message with thought=true. + * Regular text → assistant message. + * Called on ROUND_END, before TOOL_CALL (ordering), and after + * runReasoningLoop returns (final round). + */ + private flushStreamBuffers(): void { + if (this.thoughtBuffer) { + this.addMessage('assistant', this.thoughtBuffer, { thought: true }); + this.thoughtBuffer = ''; + } + if (this.textBuffer) { + this.addMessage('assistant', this.textBuffer); + this.textBuffer = ''; + } + this.streamRound = -1; + } + + /** + * Set up listeners on AgentCore's event emitter. + * + * AgentInteractive is the sole consumer of these events. It builds + * the conversation state (messages + in-progress stream) that the + * UI reads. Listeners use canonical event types from agent-events.ts. + */ + private setupEventListeners(): void { + const emitter = this.core.eventEmitter; + if (!emitter) return; + + emitter.on(AgentEventType.STREAM_TEXT, (event: AgentStreamTextEvent) => { + // Round boundary: flush previous round's buffers before starting a new one + if (event.round !== this.streamRound && this.streamRound !== -1) { + this.flushStreamBuffers(); + } + this.streamRound = event.round; + + if (event.thought) { + this.thoughtBuffer += event.text; + } else { + this.textBuffer += event.text; + } + }); + + emitter.on(AgentEventType.TOOL_CALL, (event: AgentToolCallEvent) => { + // Flush text buffers first — in the stream, text arrives before + // tool calls, so flushing preserves temporal ordering in messages. + this.flushStreamBuffers(); + + this.addMessage('tool_call', `Tool call: ${event.name}`, { + metadata: { + callId: event.callId, + toolName: event.name, + args: event.args, + round: event.round, + }, + }); + }); + + emitter.on(AgentEventType.TOOL_RESULT, (event: AgentToolResultEvent) => { + const statusText = event.success ? 'succeeded' : 'failed'; + const summary = event.error + ? `Tool ${event.name} ${statusText}: ${event.error}` + : `Tool ${event.name} ${statusText}`; + this.addMessage('tool_result', summary, { + metadata: { + callId: event.callId, + toolName: event.name, + success: event.success, + round: event.round, + }, + }); + }); + + emitter.on(AgentEventType.ROUND_END, () => { + this.flushStreamBuffers(); + }); + } +} diff --git a/packages/core/src/agents/runtime/agent-types.ts b/packages/core/src/agents/runtime/agent-types.ts new file mode 100644 index 000000000..df3e5fc9a --- /dev/null +++ b/packages/core/src/agents/runtime/agent-types.ts @@ -0,0 +1,175 @@ +/** + * @license + * Copyright 2025 Qwen + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * @fileoverview Agent runtime types. + * + * Contains the canonical definitions for agent configuration (prompt, model, + * run, tool), termination modes, and interactive agent types. + */ + +import type { Content, FunctionDeclaration } from '@google/genai'; + +// ─── Agent Configuration ───────────────────────────────────── + +/** + * Configures the initial prompt for an agent. + */ +export interface PromptConfig { + /** + * A single system prompt string that defines the agent's persona and instructions. + * Note: You should use either `systemPrompt` or `initialMessages`, but not both. + */ + systemPrompt?: string; + + /** + * An array of user/model content pairs to seed the chat history for few-shot prompting. + * Note: You should use either `systemPrompt` or `initialMessages`, but not both. + */ + initialMessages?: Content[]; +} + +/** + * Configures the generative model parameters for an agent. + */ +export interface ModelConfig { + /** + * The name or identifier of the model to be used (e.g., 'qwen3-coder-plus'). + * + * TODO: In the future, this needs to support 'auto' or some other string to support routing use cases. + */ + model?: string; + /** The temperature for the model's sampling process. */ + temp?: number; + /** The top-p value for nucleus sampling. */ + top_p?: number; +} + +/** + * Configures the execution environment and constraints for an agent. + * + * TODO: Consider adding max_tokens as a form of budgeting. + */ +export interface RunConfig { + /** The maximum execution time for the agent in minutes. */ + max_time_minutes?: number; + /** + * The maximum number of conversational turns (a user message + model response) + * before the execution is terminated. Helps prevent infinite loops. + */ + max_turns?: number; +} + +/** + * Configures the tools available to an agent during its execution. + */ +export interface ToolConfig { + /** + * A list of tool names (from the tool registry) or full function declarations + * that the agent is permitted to use. + */ + tools: Array; +} + +/** + * Describes the possible termination modes for an agent. + * This enum provides a clear indication of why an agent's execution ended. + */ +export enum AgentTerminateMode { + /** The agent's execution terminated due to an unrecoverable error. */ + ERROR = 'ERROR', + /** The agent's execution terminated because it exceeded the maximum allowed working time. */ + TIMEOUT = 'TIMEOUT', + /** The agent's execution successfully completed all its defined goals. */ + GOAL = 'GOAL', + /** The agent's execution terminated because it exceeded the maximum number of turns. */ + MAX_TURNS = 'MAX_TURNS', + /** The agent's execution was cancelled via an abort signal. */ + CANCELLED = 'CANCELLED', + /** The agent was gracefully shut down (e.g., arena/team session ended). */ + SHUTDOWN = 'SHUTDOWN', +} + +// ─── Agent Status ──────────────────────────────────────────── + +/** + * Canonical lifecycle status for any agent (headless, interactive, arena). + * + * State machine: + * INITIALIZING → RUNNING ⇄ COMPLETED / FAILED / CANCELLED + * + * - INITIALIZING: Setting up (creating chat, loading tools). + * - RUNNING: Actively processing (model thinking / tool execution). + * - COMPLETED: Finished successfully (may re-enter RUNNING on new input). + * - FAILED: Finished with error (API failure, process crash, etc.). + * - CANCELLED: Cancelled by user or system. + */ +export enum AgentStatus { + INITIALIZING = 'initializing', + RUNNING = 'running', + COMPLETED = 'completed', + FAILED = 'failed', + CANCELLED = 'cancelled', +} + +/** True for COMPLETED, FAILED, CANCELLED — agent is done working. */ +export const isTerminalStatus = (s: AgentStatus): boolean => + s === AgentStatus.COMPLETED || + s === AgentStatus.FAILED || + s === AgentStatus.CANCELLED; + +/** + * Lightweight configuration for an AgentInteractive instance. + * Carries only interactive-specific parameters; the heavy runtime + * configs (prompt, model, run, tools) live on AgentCore. + */ +export interface AgentInteractiveConfig { + /** Unique identifier for this agent. */ + agentId: string; + /** Human-readable name for display. */ + agentName: string; + /** Optional initial task to start working on immediately. */ + initialTask?: string; + /** Max model round-trips per enqueued message (default: unlimited). */ + maxTurnsPerMessage?: number; + /** Max wall-clock minutes per enqueued message (default: unlimited). */ + maxTimeMinutesPerMessage?: number; +} + +/** + * A message exchanged with or produced by an interactive agent. + * + * This is a UI-oriented data model (not the Gemini API Content type). + * AgentInteractive is the sole writer; the UI reads via getMessages(). + */ +export interface AgentMessage { + /** Discriminator for the message kind. */ + role: 'user' | 'assistant' | 'tool_call' | 'tool_result'; + /** The text content of the message. */ + content: string; + /** When the message was created (ms since epoch). */ + timestamp: number; + /** + * Whether this assistant message contains thinking/reasoning content. + * Mirrors AgentStreamTextEvent.thought. Only meaningful when role is 'assistant'. + */ + thought?: boolean; + /** Optional metadata (e.g. tool call info, round number). */ + metadata?: Record; +} + +/** + * Snapshot of in-progress streaming state for UI mid-switch handoff. + * Returned by AgentInteractive.getInProgressStream(). + */ +export interface InProgressStreamState { + /** Accumulated non-thought text so far in the current round. */ + text: string; + /** Accumulated thinking text so far in the current round. */ + thinking: string; + /** The reasoning-loop round number being streamed. */ + round: number; +} diff --git a/packages/core/src/agents/runtime/index.ts b/packages/core/src/agents/runtime/index.ts index 025790798..93ef0e5a3 100644 --- a/packages/core/src/agents/runtime/index.ts +++ b/packages/core/src/agents/runtime/index.ts @@ -8,8 +8,10 @@ * @fileoverview Runtime barrel — re-exports agent execution primitives. */ +export * from './agent-types.js'; export * from './agent-core.js'; export * from './agent-headless.js'; +export * from './agent-interactive.js'; export * from './agent-events.js'; export * from './agent-statistics.js'; -export * from './agent-hooks.js'; +export { AsyncMessageQueue } from '../../utils/asyncMessageQueue.js'; diff --git a/packages/core/src/config/config.ts b/packages/core/src/config/config.ts index 0d7fd5a09..b032c9c02 100644 --- a/packages/core/src/config/config.ts +++ b/packages/core/src/config/config.ts @@ -295,6 +295,10 @@ export interface AgentsCollabSettings { worktreeBaseDir?: string; /** Preserve worktrees and state files after session ends */ preserveArtifacts?: boolean; + /** Maximum rounds (turns) per agent. No limit if unset. */ + maxRoundsPerAgent?: number; + /** Total timeout in seconds for the Arena session. No limit if unset. */ + timeoutSeconds?: number; }; } @@ -1698,6 +1702,7 @@ export class Config { async createToolRegistry( sendSdkMcpMessage?: SendSdkMcpMessage, + options?: { skipDiscovery?: boolean }, ): Promise { const registry = new ToolRegistry( this, @@ -1786,7 +1791,9 @@ export class Config { registerCoreTool(LspTool, this); } - await registry.discoverAllTools(); + if (!options?.skipDiscovery) { + await registry.discoverAllTools(); + } this.debugLogger.debug( `ToolRegistry created: ${JSON.stringify(registry.getAllToolNames())} (${registry.getAllToolNames().length} tools)`, ); diff --git a/packages/core/src/core/client.ts b/packages/core/src/core/client.ts index 751d15221..7b0924840 100644 --- a/packages/core/src/core/client.ts +++ b/packages/core/src/core/client.ts @@ -492,9 +492,7 @@ export class GeminiClient { debugLogger.info( `Arena control signal received: ${controlSignal.type} - ${controlSignal.reason}`, ); - await arenaAgentClient.reportCompleted( - `Stopped by control signal: ${controlSignal.reason}`, - ); + await arenaAgentClient.reportCancelled(); return new Turn(this.getChat(), prompt_id); } } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 6b6b18351..6345fd054 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -248,7 +248,6 @@ export { export * from './extension/index.js'; export * from './prompts/mcp-prompts.js'; export * from './skills/index.js'; -export * from './subagents/index.js'; // ============================================================================ // Utilities diff --git a/packages/core/src/subagents/index.ts b/packages/core/src/subagents/index.ts index f877d23d8..c05c38697 100644 --- a/packages/core/src/subagents/index.ts +++ b/packages/core/src/subagents/index.ts @@ -5,18 +5,11 @@ */ /** - * @fileoverview Subagents Phase 1 implementation - File-based configuration layer + * @fileoverview Subagents — file-based configuration layer. * * This module provides the foundation for the subagents feature by implementing - * a file-based configuration system that builds on the AgentHeadless - * runtime system. It includes: + * a file-based configuration system that builds on the agent runtime. * - * - Type definitions for file-based subagent configurations - * - Validation system for configuration integrity - * - Runtime conversion functions integrated into the manager - * - Manager class for CRUD operations on subagent files - * - * The implementation follows the Markdown + YAML frontmatter format , with storage at both project and user levels. */ // Core types and interfaces @@ -40,39 +33,3 @@ export { SubagentValidator } from './validation.js'; // Main management class export { SubagentManager } from './subagent-manager.js'; - -// Re-export existing runtime types for convenience -export type { - PromptConfig, - ModelConfig, - RunConfig, - ToolConfig, - SubagentTerminateMode, -} from './types.js'; - -export { AgentHeadless } from '../agents/runtime/agent-headless.js'; - -// Event system for UI integration -export type { - AgentEvent, - AgentStartEvent, - AgentRoundEvent, - AgentStreamTextEvent, - AgentUsageEvent, - AgentToolCallEvent, - AgentToolResultEvent, - AgentFinishEvent, - AgentErrorEvent, - AgentApprovalRequestEvent, -} from '../agents/runtime/agent-events.js'; - -export { - AgentEventEmitter, - AgentEventType, -} from '../agents/runtime/agent-events.js'; - -// Statistics and formatting -export type { - AgentStatsSummary, - ToolUsageStats, -} from '../agents/runtime/agent-statistics.js'; diff --git a/packages/core/src/subagents/subagent-manager.ts b/packages/core/src/subagents/subagent-manager.ts index b2fa2c47e..ca908527d 100644 --- a/packages/core/src/subagents/subagent-manager.ts +++ b/packages/core/src/subagents/subagent-manager.ts @@ -19,16 +19,20 @@ import type { SubagentLevel, ListSubagentsOptions, CreateSubagentOptions, +} from './types.js'; +import type { PromptConfig, ModelConfig, RunConfig, ToolConfig, -} from './types.js'; +} from '../agents/runtime/agent-types.js'; import { SubagentError, SubagentErrorCode } from './types.js'; import { SubagentValidator } from './validation.js'; import { AgentHeadless } from '../agents/runtime/agent-headless.js'; -import type { AgentEventEmitter } from '../agents/runtime/agent-events.js'; -import type { AgentHooks } from '../agents/runtime/agent-hooks.js'; +import type { + AgentEventEmitter, + AgentHooks, +} from '../agents/runtime/agent-events.js'; import type { Config } from '../config/config.js'; import { createDebugLogger } from '../utils/debugLogger.js'; diff --git a/packages/core/src/subagents/types.ts b/packages/core/src/subagents/types.ts index e41fe620b..55e57f61e 100644 --- a/packages/core/src/subagents/types.ts +++ b/packages/core/src/subagents/types.ts @@ -4,7 +4,19 @@ * SPDX-License-Identifier: Apache-2.0 */ -import type { Content, FunctionDeclaration } from '@google/genai'; +/** + * @fileoverview Subagent configuration types. + * + * Agent runtime types (PromptConfig, ModelConfig, RunConfig, ToolConfig, + * AgentTerminateMode) are canonically defined in agents/runtime/agent-types.ts. + */ + +import type { + ModelConfig, + RunConfig, + PromptConfig, + ToolConfig, +} from '../agents/runtime/agent-types.js'; /** * Represents the storage level for a subagent configuration. @@ -176,101 +188,3 @@ export const SubagentErrorCode = { export type SubagentErrorCode = (typeof SubagentErrorCode)[keyof typeof SubagentErrorCode]; - -/** - * Describes the possible termination modes for a subagent. - * This enum provides a clear indication of why a subagent's execution might have ended. - */ -export enum SubagentTerminateMode { - /** - * Indicates that the subagent's execution terminated due to an unrecoverable error. - */ - ERROR = 'ERROR', - /** - * Indicates that the subagent's execution terminated because it exceeded the maximum allowed working time. - */ - TIMEOUT = 'TIMEOUT', - /** - * Indicates that the subagent's execution successfully completed all its defined goals. - */ - GOAL = 'GOAL', - /** - * Indicates that the subagent's execution terminated because it exceeded the maximum number of turns. - */ - MAX_TURNS = 'MAX_TURNS', - /** - * Indicates that the subagent's execution was cancelled via an abort signal. - */ - CANCELLED = 'CANCELLED', - /** - * Indicates that the subagent was gracefully shut down (e.g., arena/team session ended). - */ - SHUTDOWN = 'SHUTDOWN', -} - -/** - * Configures the initial prompt for the subagent. - */ -export interface PromptConfig { - /** - * A single system prompt string that defines the subagent's persona and instructions. - * Note: You should use either `systemPrompt` or `initialMessages`, but not both. - */ - systemPrompt?: string; - - /** - * An array of user/model content pairs to seed the chat history for few-shot prompting. - * Note: You should use either `systemPrompt` or `initialMessages`, but not both. - */ - initialMessages?: Content[]; -} - -/** - * Configures the tools available to the subagent during its execution. - */ -export interface ToolConfig { - /** - * A list of tool names (from the tool registry) or full function declarations - * that the subagent is permitted to use. - */ - tools: Array; -} - -/** - * Configures the generative model parameters for the subagent. - * This interface specifies the model to be used and its associated generation settings, - * such as temperature and top-p values, which influence the creativity and diversity of the model's output. - */ -export interface ModelConfig { - /** - * The name or identifier of the model to be used (e.g., 'qwen3-coder-plus'). - * - * TODO: In the future, this needs to support 'auto' or some other string to support routing use cases. - */ - model?: string; - /** - * The temperature for the model's sampling process. - */ - temp?: number; - /** - * The top-p value for nucleus sampling. - */ - top_p?: number; -} - -/** - * Configures the execution environment and constraints for the subagent. - * This interface defines parameters that control the subagent's runtime behavior, - * such as maximum execution time, to prevent infinite loops or excessive resource consumption. - * - * TODO: Consider adding max_tokens as a form of budgeting. - */ -export interface RunConfig { - /** The maximum execution time for the subagent in minutes. */ - max_time_minutes?: number; - /** - * The maximum number of conversational turns (a user message + model response) - * before the execution is terminated. Helps prevent infinite loops. - */ - max_turns?: number; -} diff --git a/packages/core/src/subagents/validation.ts b/packages/core/src/subagents/validation.ts index 5df8cc315..cc38a4a43 100644 --- a/packages/core/src/subagents/validation.ts +++ b/packages/core/src/subagents/validation.ts @@ -5,12 +5,8 @@ */ import { SubagentError, SubagentErrorCode } from './types.js'; -import type { - ModelConfig, - RunConfig, - SubagentConfig, - ValidationResult, -} from './types.js'; +import type { SubagentConfig, ValidationResult } from './types.js'; +import type { ModelConfig, RunConfig } from '../agents/runtime/agent-types.js'; /** * Validates subagent configurations to ensure they are well-formed diff --git a/packages/core/src/tools/task.test.ts b/packages/core/src/tools/task.test.ts index a8323f71e..28b6168be 100644 --- a/packages/core/src/tools/task.test.ts +++ b/packages/core/src/tools/task.test.ts @@ -10,10 +10,8 @@ import type { PartListUnion } from '@google/genai'; import type { ToolResultDisplay, TaskResultDisplay } from './tools.js'; import type { Config } from '../config/config.js'; import { SubagentManager } from '../subagents/subagent-manager.js'; -import { - type SubagentConfig, - SubagentTerminateMode, -} from '../subagents/types.js'; +import type { SubagentConfig } from '../subagents/types.js'; +import { AgentTerminateMode } from '../agents/runtime/agent-types.js'; import { type AgentHeadless, ContextState, @@ -303,7 +301,7 @@ describe('TaskTool', () => { mockSubagentScope = { execute: vi.fn().mockResolvedValue(undefined), result: 'Task completed successfully', - terminateMode: SubagentTerminateMode.GOAL, + terminateMode: AgentTerminateMode.GOAL, getFinalText: vi.fn().mockReturnValue('Task completed successfully'), formatCompactResult: vi .fn() @@ -347,7 +345,7 @@ describe('TaskTool', () => { successfulToolCalls: 3, failedToolCalls: 0, }), - getTerminateMode: vi.fn().mockReturnValue(SubagentTerminateMode.GOAL), + getTerminateMode: vi.fn().mockReturnValue(AgentTerminateMode.GOAL), } as unknown as AgentHeadless; mockContextState = { diff --git a/packages/core/src/tools/task.ts b/packages/core/src/tools/task.ts index 35aa8af41..430d25a65 100644 --- a/packages/core/src/tools/task.ts +++ b/packages/core/src/tools/task.ts @@ -18,10 +18,8 @@ import type { } from './tools.js'; import type { Config } from '../config/config.js'; import type { SubagentManager } from '../subagents/subagent-manager.js'; -import { - type SubagentConfig, - SubagentTerminateMode, -} from '../subagents/types.js'; +import type { SubagentConfig } from '../subagents/types.js'; +import { AgentTerminateMode } from '../agents/runtime/agent-types.js'; import { ContextState } from '../agents/runtime/agent-headless.js'; import { AgentEventEmitter, @@ -54,6 +52,7 @@ export class TaskTool extends BaseDeclarativeTool { private subagentManager: SubagentManager; private availableSubagents: SubagentConfig[] = []; + private readonly removeChangeListener: () => void; constructor(private readonly config: Config) { // Initialize with a basic schema first @@ -89,7 +88,7 @@ export class TaskTool extends BaseDeclarativeTool { ); this.subagentManager = config.getSubagentManager(); - this.subagentManager.addChangeListener(() => { + this.removeChangeListener = this.subagentManager.addChangeListener(() => { void this.refreshSubagents(); }); @@ -97,6 +96,10 @@ export class TaskTool extends BaseDeclarativeTool { this.refreshSubagents(); } + dispose(): void { + this.removeChangeListener(); + } + /** * Asynchronously initializes the tool by loading available subagents * and updating the description and schema. @@ -514,7 +517,7 @@ class TaskToolInvocation extends BaseToolInvocation { // Get the results const finalText = subagent.getFinalText(); const terminateMode = subagent.getTerminateMode(); - const success = terminateMode === SubagentTerminateMode.GOAL; + const success = terminateMode === AgentTerminateMode.GOAL; const executionSummary = subagent.getExecutionSummary(); if (signal?.aborted) { diff --git a/packages/core/src/tools/tool-registry.ts b/packages/core/src/tools/tool-registry.ts index 1db7f7e59..3ce247781 100644 --- a/packages/core/src/tools/tool-registry.ts +++ b/packages/core/src/tools/tool-registry.ts @@ -209,6 +209,22 @@ export class ToolRegistry { this.tools.set(tool.name, tool); } + /** + * Copies discovered (non-core) tools from another registry into this one. + * Used to share MCP/command-discovered tools with per-agent registries + * that were built with skipDiscovery. + */ + copyDiscoveredToolsFrom(source: ToolRegistry): void { + for (const tool of source.getAllTools()) { + if ( + (tool instanceof DiscoveredTool || tool instanceof DiscoveredMCPTool) && + !this.tools.has(tool.name) + ) { + this.tools.set(tool.name, tool); + } + } + } + private removeDiscoveredTools(): void { for (const tool of this.tools.values()) { if (tool instanceof DiscoveredTool || tool instanceof DiscoveredMCPTool) { @@ -489,10 +505,20 @@ export class ToolRegistry { } /** - * Stops all MCP clients and cleans up resources. + * Stops all MCP clients, disposes tools, and cleans up resources. * This method is idempotent and safe to call multiple times. */ async stop(): Promise { + for (const tool of this.tools.values()) { + if ('dispose' in tool && typeof tool.dispose === 'function') { + try { + tool.dispose(); + } catch (error) { + debugLogger.error(`Error disposing tool ${tool.name}:`, error); + } + } + } + try { await this.mcpClientManager.stop(); } catch (error) { diff --git a/packages/core/src/utils/asyncMessageQueue.test.ts b/packages/core/src/utils/asyncMessageQueue.test.ts new file mode 100644 index 000000000..fe5421033 --- /dev/null +++ b/packages/core/src/utils/asyncMessageQueue.test.ts @@ -0,0 +1,75 @@ +/** + * @license + * Copyright 2025 Qwen + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it, expect } from 'vitest'; +import { AsyncMessageQueue } from './asyncMessageQueue.js'; + +describe('AsyncMessageQueue', () => { + it('should dequeue items in FIFO order', () => { + const queue = new AsyncMessageQueue(); + queue.enqueue('a'); + queue.enqueue('b'); + queue.enqueue('c'); + + expect(queue.dequeue()).toBe('a'); + expect(queue.dequeue()).toBe('b'); + expect(queue.dequeue()).toBe('c'); + }); + + it('should return null when empty', () => { + const queue = new AsyncMessageQueue(); + expect(queue.dequeue()).toBeNull(); + }); + + it('should return remaining items then null after drain()', () => { + const queue = new AsyncMessageQueue(); + queue.enqueue('x'); + queue.enqueue('y'); + + queue.drain(); + + expect(queue.dequeue()).toBe('x'); + expect(queue.dequeue()).toBe('y'); + expect(queue.dequeue()).toBeNull(); + }); + + it('should silently drop items enqueued after drain()', () => { + const queue = new AsyncMessageQueue(); + queue.drain(); + queue.enqueue('dropped'); + + expect(queue.size).toBe(0); + }); + + it('should track size accurately', () => { + const queue = new AsyncMessageQueue(); + expect(queue.size).toBe(0); + + queue.enqueue(1); + queue.enqueue(2); + expect(queue.size).toBe(2); + + queue.dequeue(); + expect(queue.size).toBe(1); + }); + + it('should report isDrained correctly', () => { + const queue = new AsyncMessageQueue(); + expect(queue.isDrained).toBe(false); + + queue.drain(); + expect(queue.isDrained).toBe(true); + }); + + it('should handle multiple sequential enqueue-dequeue cycles', () => { + const queue = new AsyncMessageQueue(); + + for (let i = 0; i < 5; i++) { + queue.enqueue(i); + expect(queue.dequeue()).toBe(i); + } + }); +}); diff --git a/packages/core/src/utils/asyncMessageQueue.ts b/packages/core/src/utils/asyncMessageQueue.ts new file mode 100644 index 000000000..3268718ef --- /dev/null +++ b/packages/core/src/utils/asyncMessageQueue.ts @@ -0,0 +1,54 @@ +/** + * @license + * Copyright 2025 Qwen + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * @fileoverview Generic non-blocking message queue. + * + * Simple FIFO queue for producer/consumer patterns. Dequeue is + * non-blocking — returns null when empty. The consumer decides + * when and how to process items. + */ + +/** + * A generic non-blocking message queue. + * + * - `enqueue(item)` adds an item. Silently dropped after `drain()`. + * - `dequeue()` returns the next item, or `null` if empty. + * - `drain()` signals that no more items will be enqueued. + */ +export class AsyncMessageQueue { + private items: T[] = []; + private drained = false; + + /** Add an item to the queue. Dropped silently after drain. */ + enqueue(item: T): void { + if (this.drained) return; + this.items.push(item); + } + + /** Remove and return the next item, or null if empty. */ + dequeue(): T | null { + if (this.items.length > 0) { + return this.items.shift()!; + } + return null; + } + + /** Signal that no more items will be enqueued. */ + drain(): void { + this.drained = true; + } + + /** Number of items currently in the queue. */ + get size(): number { + return this.items.length; + } + + /** Whether `drain()` has been called. */ + get isDrained(): boolean { + return this.drained; + } +}