diff --git a/docs/developers/tools/_meta.ts b/docs/developers/tools/_meta.ts index 266256376..c86d89280 100644 --- a/docs/developers/tools/_meta.ts +++ b/docs/developers/tools/_meta.ts @@ -5,6 +5,7 @@ export default { shell: 'Shell', 'todo-write': 'Todo Write', task: 'Task', + swarm: 'Swarm', 'exit-plan-mode': 'Exit Plan Mode', 'web-fetch': 'Web Fetch', 'web-search': 'Web Search', diff --git a/docs/developers/tools/introduction.md b/docs/developers/tools/introduction.md index 9c7325552..85702c003 100644 --- a/docs/developers/tools/introduction.md +++ b/docs/developers/tools/introduction.md @@ -51,6 +51,7 @@ Qwen Code's built-in tools can be broadly categorized as follows: - **[Memory Tool](./memory.md) (`save_memory`):** For saving and recalling information across sessions. - **[Todo Write Tool](./todo-write.md) (`todo_write`):** For creating and managing structured task lists during coding sessions. - **[Task Tool](./task.md) (`task`):** For delegating complex tasks to specialized subagents. +- **[Swarm Tool](./swarm.md) (`swarm`):** For running many independent lightweight workers in parallel and aggregating their results. - **[Exit Plan Mode Tool](./exit-plan-mode.md) (`exit_plan_mode`):** For exiting plan mode and proceeding with implementation. Additionally, these tools incorporate: diff --git a/docs/developers/tools/swarm.md b/docs/developers/tools/swarm.md new file mode 100644 index 000000000..280c185e5 --- /dev/null +++ b/docs/developers/tools/swarm.md @@ -0,0 +1,102 @@ +# Swarm Tool (`swarm`) + +Use `swarm` to run many independent, simple tasks through ephemeral worker +agents and return a structured aggregate result to the parent agent. + +Swarm is intended for map-reduce style work: + +- analyzing many files independently +- processing chunks of a large data file +- running independent searches where the first successful result is enough +- collecting per-item summaries, counts, or validation results + +For a few complex role-based tasks, use the [`task`](./task.md) tool instead. +For model comparison on the same task, use Agent Arena. + +## Arguments + +- `description` (string, required): Short description of the overall swarm job. +- `tasks` (array, required): Independent tasks. Each task becomes one worker. + - `id` (string, optional): Stable identifier returned in results. + - `description` (string, required): Short per-worker description. + - `prompt` (string, required): Complete instructions for the worker. +- `mode` (`wait_all` or `first_success`, optional): Defaults to `wait_all`. +- `max_concurrency` (number, optional): Maximum workers to run at once. +- `max_turns` (number, optional): Maximum model/tool turns per worker. + Defaults to `8`. +- `timeout_seconds` (number, optional): Per-worker wall-clock timeout. +- `worker_system_prompt` (string, optional): Shared worker system prompt. +- `allowed_tools` (string array, optional): Tool allowlist for workers. +- `disallowed_tools` (string array, optional): Tools removed from workers. + +If `max_concurrency` is omitted, Qwen Code uses +`QWEN_CODE_MAX_SWARM_CONCURRENCY`, then `QWEN_CODE_MAX_TOOL_CONCURRENCY`, then +`10`. + +## Result + +The tool returns JSON to the parent agent with: + +- `summary.total` +- `summary.completed` +- `summary.failed` +- `summary.cancelled` +- `summary.notStarted` +- `results[]` with one entry per task, including `taskId`, `status`, `output` + or `error`, duration, and execution stats when available + +Individual worker failures do not abort the whole swarm. The parent agent is +responsible for reading the aggregate result and presenting the final answer. + +## Examples + +Analyze files in parallel: + +```text +swarm( + description="Extract function names", + tasks=[ + { + id="src/a.ts", + description="Analyze src/a.ts", + prompt="Read /repo/src/a.ts and return the exported function names." + }, + { + id="src/b.ts", + description="Analyze src/b.ts", + prompt="Read /repo/src/b.ts and return the exported function names." + } + ], + max_concurrency=10 +) +``` + +Use first successful result: + +```text +swarm( + description="Find API route definition", + mode="first_success", + tasks=[ + { + description="Search routes directory", + prompt="Search /repo/src/routes for the user creation route." + }, + { + description="Search controllers directory", + prompt="Search /repo/src/controllers for the user creation route." + } + ] +) +``` + +## Notes + +Workers are lightweight and ephemeral: they are spawned, execute one task, +return a result, and are cleaned up. Workers cannot spawn further subagents or +cron jobs. + +Swarm workers run concurrently, so interactive permission prompts are avoided. +Permission hooks can still approve actions, and permissive approval modes still +apply where configured. Prefer read-only or disjoint file scopes for swarm +tasks. diff --git a/docs/users/features/arena.md b/docs/users/features/arena.md index 7b53238c7..d30fc1dc5 100644 --- a/docs/users/features/arena.md +++ b/docs/users/features/arena.md @@ -199,9 +199,9 @@ Agent Arena is experimental. Current limitations: ## Comparison with other multi-agent modes -Agent Arena is one of several planned multi-agent modes in Qwen Code. **Agent Team** and **Agent Swarm** are not yet implemented — the table below describes their intended design for reference. +Agent Arena is one of several multi-agent modes in Qwen Code. **Agent Team** is not yet implemented. **Agent Swarm** is available as a lightweight tool for batch-style parallel worker execution. -| | **Agent Arena** | **Agent Team** (planned) | **Agent Swarm** (planned) | +| | **Agent Arena** | **Agent Team** (planned) | **Agent Swarm** | | :---------------- | :----------------------------------------------------- | :------------------------------------------------- | :------------------------------------------------------- | | **Goal** | Competitive: Find the best solution to the _same_ task | Collaborative: Tackle _different_ aspects together | Batch parallel: Dynamically spawn workers for bulk tasks | | **Agents** | Pre-configured models compete independently | Teammates collaborate with assigned roles | Workers spawned on-the-fly, destroyed on completion | diff --git a/packages/core/src/agents/runtime/agent-core.ts b/packages/core/src/agents/runtime/agent-core.ts index 7effc025f..8eab84e4c 100644 --- a/packages/core/src/agents/runtime/agent-core.ts +++ b/packages/core/src/agents/runtime/agent-core.ts @@ -72,6 +72,7 @@ import { type ContextState, templateString } from './agent-headless.js'; */ export const EXCLUDED_TOOLS_FOR_SUBAGENTS: ReadonlySet = new Set([ ToolNames.AGENT, + ToolNames.SWARM, ToolNames.CRON_CREATE, ToolNames.CRON_LIST, ToolNames.CRON_DELETE, diff --git a/packages/core/src/config/config.ts b/packages/core/src/config/config.ts index 9b4850bf8..3cd21bf04 100644 --- a/packages/core/src/config/config.ts +++ b/packages/core/src/config/config.ts @@ -2463,6 +2463,10 @@ export class Config { const { AgentTool } = await import('../tools/agent/agent.js'); return new AgentTool(this); }); + await registerLazy(ToolNames.SWARM, async () => { + const { SwarmTool } = await import('../tools/swarm.js'); + return new SwarmTool(this); + }); await registerLazy(ToolNames.SKILL, async () => { const { SkillTool } = await import('../tools/skill.js'); return new SkillTool(this); diff --git a/packages/core/src/core/coreToolScheduler.test.ts b/packages/core/src/core/coreToolScheduler.test.ts index f7be11c5a..79eccada7 100644 --- a/packages/core/src/core/coreToolScheduler.test.ts +++ b/packages/core/src/core/coreToolScheduler.test.ts @@ -43,6 +43,7 @@ import type { HookExecutionResponse } from '../confirmation-bus/types.js'; import { type NotificationType } from '../hooks/types.js'; import type { MessageBus } from '../confirmation-bus/message-bus.js'; import { IdeClient } from '../ide/ide-client.js'; +import { ToolNames } from '../tools/tool-names.js'; vi.mock('fs/promises', () => ({ writeFile: vi.fn(), @@ -3167,6 +3168,61 @@ describe('Fire hook functions integration', () => { expect(startIndices.every((i) => i < firstEnd)).toBe(true); }); + it('should execute multiple swarm tools sequentially', async () => { + const executionLog: string[] = []; + + const swarmTool = new MockTool({ + name: ToolNames.SWARM, + execute: async (params) => { + const id = (params as { id: string }).id; + executionLog.push(`swarm:start:${id}`); + await new Promise((r) => setTimeout(r, 50)); + executionLog.push(`swarm:end:${id}`); + return { + llmContent: `Swarm ${id} done`, + returnDisplay: `Swarm ${id} done`, + }; + }, + }); + + const tools = new Map([[ToolNames.SWARM, swarmTool]]); + const onAllToolCallsComplete = vi.fn(); + const onToolCallsUpdate = vi.fn(); + const scheduler = createScheduler( + tools, + onAllToolCallsComplete, + onToolCallsUpdate, + ); + + await scheduler.schedule( + [ + { + callId: '1', + name: ToolNames.SWARM, + args: { id: 'A' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + { + callId: '2', + name: ToolNames.SWARM, + args: { id: 'B' }, + isClientInitiated: false, + prompt_id: 'p1', + }, + ], + new AbortController().signal, + ); + + expect(onAllToolCallsComplete).toHaveBeenCalled(); + expect(executionLog).toEqual([ + 'swarm:start:A', + 'swarm:end:A', + 'swarm:start:B', + 'swarm:end:B', + ]); + }); + it('should run concurrency-safe tools in parallel and unsafe tools sequentially', async () => { const executionLog: string[] = []; diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index b4eead082..7d743b34b 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -108,6 +108,7 @@ export type { } from './tools/shell.js'; export type { SkillTool, SkillParams } from './tools/skill.js'; export type { AgentTool, AgentParams } from './tools/agent/agent.js'; +export type { SwarmTool, SwarmParams, SwarmTask } from './tools/swarm.js'; export type { TodoWriteTool, TodoItem, diff --git a/packages/core/src/tools/swarm.test.ts b/packages/core/src/tools/swarm.test.ts new file mode 100644 index 000000000..9d608b44e --- /dev/null +++ b/packages/core/src/tools/swarm.test.ts @@ -0,0 +1,285 @@ +/** + * @license + * Copyright 2025 Qwen + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { SwarmTool, type SwarmParams } from './swarm.js'; +import type { Config } from '../config/config.js'; +import type { ToolResult, ToolResultDisplay } from './tools.js'; +import { AgentTerminateMode } from '../agents/runtime/agent-types.js'; +import { ToolNames } from './tool-names.js'; + +const hoisted = vi.hoisted(() => ({ + createAgent: vi.fn(), +})); + +vi.mock('../agents/runtime/agent-headless.js', () => { + class MockContextState { + private readonly values = new Map(); + + set(key: string, value: unknown): void { + this.values.set(key, value); + } + + get(key: string): unknown { + return this.values.get(key); + } + } + + return { + AgentHeadless: { + create: hoisted.createAgent, + }, + ContextState: MockContextState, + }; +}); + +type SwarmToolInvocation = { + execute: ( + signal?: AbortSignal, + updateOutput?: (output: ToolResultDisplay) => void, + ) => Promise; + getDescription: () => string; +}; + +type SwarmToolWithProtectedMethods = SwarmTool & { + createInvocation: (params: SwarmParams) => SwarmToolInvocation; +}; + +type MockWorker = { + execute: ReturnType; + getTerminateMode: ReturnType; + getFinalText: ReturnType; + getExecutionSummary: ReturnType; +}; + +const summary = { + rounds: 1, + totalDurationMs: 10, + totalToolCalls: 0, + successfulToolCalls: 0, + failedToolCalls: 0, + successRate: 0, + inputTokens: 1, + outputTokens: 1, + thoughtTokens: 0, + cachedTokens: 0, + totalTokens: 2, + toolUsage: [], +}; + +function createWorker( + text: string, + terminateMode = AgentTerminateMode.GOAL, + execute?: () => Promise, +): MockWorker { + return { + execute: vi.fn(execute ?? (async () => undefined)), + getTerminateMode: vi.fn(() => terminateMode), + getFinalText: vi.fn(() => text), + getExecutionSummary: vi.fn(() => summary), + }; +} + +function getInvocation(params: SwarmParams): SwarmToolInvocation { + const tool = new SwarmTool({} as Config) as SwarmToolWithProtectedMethods; + return tool.createInvocation(params); +} + +function getJsonResult(result: ToolResult) { + const content = result.llmContent as Array<{ text: string }>; + return JSON.parse(content[0]!.text) as { + summary: { + total: number; + completed: number; + failed: number; + cancelled: number; + notStarted: number; + }; + results: Array<{ taskId: string; status: string; output?: string }>; + }; +} + +function flushPromises(): Promise { + return new Promise((resolve) => setTimeout(resolve, 0)); +} + +describe('SwarmTool', () => { + beforeEach(() => { + hoisted.createAgent.mockReset(); + }); + + it('validates required swarm parameters', () => { + const tool = new SwarmTool({} as Config); + + expect( + tool.validateToolParams({ + description: '', + tasks: [{ description: 'A', prompt: 'Do A' }], + }), + ).toBe('Parameter "description" must be a non-empty string.'); + + expect( + tool.validateToolParams({ + description: 'Batch', + tasks: [], + }), + ).toBe('Parameter "tasks" must be a non-empty array.'); + + expect( + tool.validateToolParams({ + description: 'Batch', + tasks: [{ description: 'A', prompt: 'Do A' }], + max_concurrency: 0, + }), + ).toBe('Parameter "max_concurrency" must be a positive integer.'); + }); + + it('runs all workers and aggregates successes and failures', async () => { + hoisted.createAgent + .mockResolvedValueOnce(createWorker('Functions: foo, bar')) + .mockResolvedValueOnce( + createWorker('Could not parse file', AgentTerminateMode.ERROR), + ); + + const invocation = getInvocation({ + description: 'Extract functions', + tasks: [ + { id: 'a.ts', description: 'Analyze a.ts', prompt: 'Read a.ts' }, + { id: 'b.ts', description: 'Analyze b.ts', prompt: 'Read b.ts' }, + ], + }); + + const result = await invocation.execute(new AbortController().signal); + const aggregate = getJsonResult(result); + + expect(hoisted.createAgent).toHaveBeenCalledTimes(2); + expect(aggregate.summary).toEqual({ + total: 2, + completed: 1, + failed: 1, + cancelled: 0, + notStarted: 0, + }); + expect(aggregate.results[0]).toMatchObject({ + taskId: 'a.ts', + status: 'success', + output: 'Functions: foo, bar', + }); + expect(aggregate.results[1]).toMatchObject({ + taskId: 'b.ts', + status: 'failed', + }); + }); + + it('disallows interactive tools in workers by default', async () => { + hoisted.createAgent.mockResolvedValueOnce(createWorker('done')); + + await getInvocation({ + description: 'Non-interactive worker', + disallowed_tools: [ToolNames.SHELL], + tasks: [{ description: 'Task', prompt: 'Do task' }], + }).execute(new AbortController().signal); + + const toolConfig = hoisted.createAgent.mock.calls[0]![5] as { + disallowedTools?: string[]; + }; + expect(toolConfig.disallowedTools).toEqual([ + ToolNames.ASK_USER_QUESTION, + ToolNames.SHELL, + ]); + }); + + it('formats table output without incomplete backslash escaping', async () => { + hoisted.createAgent.mockResolvedValueOnce( + createWorker('C:\\tmp\\a|b\nline2'), + ); + + const result = await getInvocation({ + description: 'Format result', + tasks: [{ description: 'Task', prompt: 'Return path' }], + }).execute(new AbortController().signal); + + expect(String(result.returnDisplay)).toContain( + '| task-1 | success | C:\\tmp\\a|b
line2 |', + ); + expect(String(result.returnDisplay)).not.toContain('\\|'); + }); + + it('honors max_concurrency while draining queued tasks', async () => { + const releases: Array<() => void> = []; + let running = 0; + let maxObserved = 0; + + const makeControlledWorker = (text: string) => + createWorker(text, AgentTerminateMode.GOAL, async () => { + running++; + maxObserved = Math.max(maxObserved, running); + await new Promise((resolve) => { + releases.push(() => { + running--; + resolve(); + }); + }); + }); + + hoisted.createAgent + .mockResolvedValueOnce(makeControlledWorker('one')) + .mockResolvedValueOnce(makeControlledWorker('two')) + .mockResolvedValueOnce(makeControlledWorker('three')); + + const promise = getInvocation({ + description: 'Limited batch', + max_concurrency: 2, + tasks: [ + { description: 'One', prompt: 'Do one' }, + { description: 'Two', prompt: 'Do two' }, + { description: 'Three', prompt: 'Do three' }, + ], + }).execute(new AbortController().signal); + + await flushPromises(); + expect(releases).toHaveLength(2); + expect(hoisted.createAgent).toHaveBeenCalledTimes(2); + + releases.shift()!(); + await flushPromises(); + expect(hoisted.createAgent).toHaveBeenCalledTimes(3); + + while (releases.length > 0) { + releases.shift()!(); + await flushPromises(); + } + + const result = await promise; + expect(maxObserved).toBe(2); + expect(getJsonResult(result).summary.completed).toBe(3); + }); + + it('supports first_success by stopping before queued workers start', async () => { + hoisted.createAgent.mockResolvedValueOnce(createWorker('winner')); + + const result = await getInvocation({ + description: 'Find answer', + mode: 'first_success', + max_concurrency: 1, + tasks: [ + { description: 'Try A', prompt: 'Try A' }, + { description: 'Try B', prompt: 'Try B' }, + { description: 'Try C', prompt: 'Try C' }, + ], + }).execute(new AbortController().signal); + + const aggregate = getJsonResult(result); + expect(hoisted.createAgent).toHaveBeenCalledTimes(1); + expect(aggregate.summary).toEqual({ + total: 3, + completed: 1, + failed: 0, + cancelled: 0, + notStarted: 2, + }); + }); +}); diff --git a/packages/core/src/tools/swarm.ts b/packages/core/src/tools/swarm.ts new file mode 100644 index 000000000..874a7d7a5 --- /dev/null +++ b/packages/core/src/tools/swarm.ts @@ -0,0 +1,636 @@ +/** + * @license + * Copyright 2025 Qwen + * SPDX-License-Identifier: Apache-2.0 + */ + +import { BaseDeclarativeTool, BaseToolInvocation, Kind } from './tools.js'; +import type { ToolResult, ToolResultDisplay } from './tools.js'; +import { ToolDisplayNames, ToolNames } from './tool-names.js'; +import type { Config } from '../config/config.js'; +import { + AgentHeadless, + ContextState, +} from '../agents/runtime/agent-headless.js'; +import { AgentTerminateMode } from '../agents/runtime/agent-types.js'; +import type { + PromptConfig, + RunConfig, + ToolConfig, +} from '../agents/runtime/agent-types.js'; +import type { AgentStatsSummary } from '../agents/runtime/agent-statistics.js'; +import { createDebugLogger } from '../utils/debugLogger.js'; + +const debugLogger = createDebugLogger('SWARM'); + +const DEFAULT_MAX_TURNS = 8; +const DEFAULT_WORKER_NAME = 'swarm-worker'; +const DEFAULT_WORKER_DISALLOWED_TOOLS: readonly string[] = [ + ToolNames.ASK_USER_QUESTION, +]; + +const DEFAULT_WORKER_SYSTEM_PROMPT = `You are a lightweight swarm worker. +Execute exactly one assigned task independently and return only the requested result. +Do not ask follow-up questions, do not coordinate with other workers, and do not spawn sub-agents. +Keep the response concise and structured so the parent agent can aggregate it.`; + +export interface SwarmTask { + id?: string; + description: string; + prompt: string; +} + +export interface SwarmParams { + description: string; + tasks: SwarmTask[]; + mode?: 'wait_all' | 'first_success'; + max_concurrency?: number; + max_turns?: number; + timeout_seconds?: number; + worker_system_prompt?: string; + allowed_tools?: string[]; + disallowed_tools?: string[]; +} + +interface SwarmWorkerResult { + taskId: string; + description: string; + status: 'success' | 'failed' | 'cancelled' | 'not_started'; + output?: string; + error?: string; + terminateReason?: string; + durationMs?: number; + stats?: AgentStatsSummary; +} + +interface SwarmSummary { + total: number; + completed: number; + failed: number; + cancelled: number; + notStarted: number; +} + +interface SwarmAggregateResult { + description: string; + mode: 'wait_all' | 'first_success'; + maxConcurrency: number; + summary: SwarmSummary; + results: SwarmWorkerResult[]; +} + +/** + * Swarm tool for ephemeral parallel map-style worker execution. + */ +export class SwarmTool extends BaseDeclarativeTool { + static readonly Name: string = ToolNames.SWARM; + + constructor(private readonly config: Config) { + super( + SwarmTool.Name, + ToolDisplayNames.SWARM, + `Spawn a dynamic swarm of lightweight workers for independent batch tasks. + +Use this for map-reduce style work where many simple tasks can run independently, +such as analyzing files, scanning chunks of data, or trying independent searches. +The tool creates ephemeral workers at runtime, runs them with bounded concurrency, +and returns structured per-task results for aggregation. + +Do not use this for tasks that require worker-to-worker communication or tightly +coupled edits to the same files. For a few complex role-based tasks, use the +${ToolNames.AGENT} tool instead.`, + Kind.Other, + { + type: 'object', + properties: { + description: { + type: 'string', + description: 'A short description of the overall swarm job.', + }, + tasks: { + type: 'array', + description: + 'Independent tasks to execute. Each task becomes one ephemeral worker.', + minItems: 1, + items: { + type: 'object', + properties: { + id: { + type: 'string', + description: + 'Optional stable task identifier used in aggregated results.', + }, + description: { + type: 'string', + description: + 'Short per-worker description for progress and result labels.', + }, + prompt: { + type: 'string', + description: + 'Complete instructions for this worker. Include all task-specific context.', + }, + }, + required: ['description', 'prompt'], + additionalProperties: false, + }, + }, + mode: { + type: 'string', + enum: ['wait_all', 'first_success'], + description: + 'wait_all runs every task and returns all results. first_success returns after the first successful worker and cancels the rest.', + }, + max_concurrency: { + type: 'number', + description: + 'Maximum number of workers to run at once. Defaults to QWEN_CODE_MAX_SWARM_CONCURRENCY, QWEN_CODE_MAX_TOOL_CONCURRENCY, or 10.', + }, + max_turns: { + type: 'number', + description: + 'Maximum model/tool turns per worker. Defaults to 8 for lightweight execution.', + }, + timeout_seconds: { + type: 'number', + description: + 'Optional wall-clock timeout per worker in seconds. Timed-out workers are cancelled and reported as failures.', + }, + worker_system_prompt: { + type: 'string', + description: + 'Optional system prompt shared by all workers. Defaults to a concise one-task worker prompt.', + }, + allowed_tools: { + type: 'array', + items: { type: 'string' }, + description: + 'Optional allowlist of tool names available to workers. Defaults to all non-recursive tools.', + }, + disallowed_tools: { + type: 'array', + items: { type: 'string' }, + description: + 'Optional blocklist of tool names removed from the worker tool pool.', + }, + }, + required: ['description', 'tasks'], + additionalProperties: false, + $schema: 'http://json-schema.org/draft-07/schema#', + }, + true, + true, + ); + } + + override validateToolParams(params: SwarmParams): string | null { + if ( + !params.description || + typeof params.description !== 'string' || + params.description.trim() === '' + ) { + return 'Parameter "description" must be a non-empty string.'; + } + + if (!Array.isArray(params.tasks) || params.tasks.length === 0) { + return 'Parameter "tasks" must be a non-empty array.'; + } + + for (const [index, task] of params.tasks.entries()) { + if (!task || typeof task !== 'object') { + return `Task at index ${index} must be an object.`; + } + if ( + !task.description || + typeof task.description !== 'string' || + task.description.trim() === '' + ) { + return `Task at index ${index} must include a non-empty "description".`; + } + if ( + !task.prompt || + typeof task.prompt !== 'string' || + task.prompt.trim() === '' + ) { + return `Task at index ${index} must include a non-empty "prompt".`; + } + if (task.id !== undefined && typeof task.id !== 'string') { + return `Task at index ${index} has invalid "id"; it must be a string.`; + } + } + + if ( + params.mode !== undefined && + params.mode !== 'wait_all' && + params.mode !== 'first_success' + ) { + return 'Parameter "mode" must be "wait_all" or "first_success".'; + } + + const numericChecks: Array<[keyof SwarmParams, number | undefined]> = [ + ['max_concurrency', params.max_concurrency], + ['max_turns', params.max_turns], + ['timeout_seconds', params.timeout_seconds], + ]; + + for (const [name, value] of numericChecks) { + if (value === undefined) continue; + if (!Number.isInteger(value) || value < 1) { + return `Parameter "${name}" must be a positive integer.`; + } + } + + if ( + params.worker_system_prompt !== undefined && + (typeof params.worker_system_prompt !== 'string' || + params.worker_system_prompt.trim() === '') + ) { + return 'Parameter "worker_system_prompt" must be a non-empty string when provided.'; + } + + if ( + params.allowed_tools !== undefined && + !this.isStringArray(params.allowed_tools) + ) { + return 'Parameter "allowed_tools" must be an array of strings.'; + } + + if ( + params.disallowed_tools !== undefined && + !this.isStringArray(params.disallowed_tools) + ) { + return 'Parameter "disallowed_tools" must be an array of strings.'; + } + + return null; + } + + protected createInvocation(params: SwarmParams) { + return new SwarmToolInvocation(this.config, params); + } + + private isStringArray(value: unknown): value is string[] { + return ( + Array.isArray(value) && + value.every((item) => typeof item === 'string' && item.trim() !== '') + ); + } +} + +class SwarmToolInvocation extends BaseToolInvocation { + private readonly mode = this.params.mode ?? 'wait_all'; + + constructor( + private readonly config: Config, + params: SwarmParams, + ) { + super(params); + } + + getDescription(): string { + return `${this.params.description} (${this.params.tasks.length} workers)`; + } + + async execute( + signal?: AbortSignal, + updateOutput?: (output: ToolResultDisplay) => void, + ): Promise { + const maxConcurrency = Math.min( + this.params.tasks.length, + this.params.max_concurrency ?? getDefaultMaxConcurrency(), + ); + const results = new Array( + this.params.tasks.length, + ); + + updateOutput?.( + formatProgress( + this.params.description, + results, + this.params.tasks.length, + ), + ); + + if (this.mode === 'first_success') { + await this.runFirstSuccess(results, maxConcurrency, signal, updateOutput); + } else { + await this.runWaitAll(results, maxConcurrency, signal, updateOutput); + } + + for (const [index, task] of this.params.tasks.entries()) { + results[index] ??= { + taskId: getTaskId(task, index), + description: task.description, + status: 'not_started', + }; + } + + const aggregate = buildAggregateResult( + this.params.description, + this.mode, + maxConcurrency, + results as SwarmWorkerResult[], + ); + + return { + llmContent: [{ text: JSON.stringify(aggregate, null, 2) }], + returnDisplay: formatAggregateDisplay(aggregate), + }; + } + + private async runWaitAll( + results: Array, + maxConcurrency: number, + signal?: AbortSignal, + updateOutput?: (output: ToolResultDisplay) => void, + ): Promise { + let nextIndex = 0; + + const workerLoop = async () => { + while (!signal?.aborted) { + const index = nextIndex++; + if (index >= this.params.tasks.length) return; + results[index] = await this.executeTask(index, signal); + updateOutput?.( + formatProgress( + this.params.description, + results, + this.params.tasks.length, + ), + ); + } + }; + + await Promise.all( + Array.from({ length: maxConcurrency }, () => workerLoop()), + ); + } + + private async runFirstSuccess( + results: Array, + maxConcurrency: number, + signal?: AbortSignal, + updateOutput?: (output: ToolResultDisplay) => void, + ): Promise { + const controller = new AbortController(); + const onAbort = () => controller.abort(); + signal?.addEventListener('abort', onAbort, { once: true }); + + let nextIndex = 0; + let shouldStopLaunching = false; + + const workerLoop = async () => { + while (!signal?.aborted && !controller.signal.aborted) { + if (shouldStopLaunching) return; + const index = nextIndex++; + if (index >= this.params.tasks.length) return; + + const result = await this.executeTask(index, controller.signal); + results[index] = result; + updateOutput?.( + formatProgress( + this.params.description, + results, + this.params.tasks.length, + ), + ); + + if (result.status === 'success') { + shouldStopLaunching = true; + controller.abort(); + return; + } + } + }; + + try { + await Promise.all( + Array.from({ length: maxConcurrency }, () => workerLoop()), + ); + } finally { + signal?.removeEventListener('abort', onAbort); + } + } + + private async executeTask( + index: number, + signal?: AbortSignal, + ): Promise { + const task = this.params.tasks[index]!; + const taskId = getTaskId(task, index); + const startTime = Date.now(); + const timeoutController = new AbortController(); + const onAbort = () => timeoutController.abort(); + signal?.addEventListener('abort', onAbort, { once: true }); + + let timeoutHandle: NodeJS.Timeout | undefined; + let timedOut = false; + if (this.params.timeout_seconds) { + timeoutHandle = setTimeout(() => { + timedOut = true; + timeoutController.abort(); + }, this.params.timeout_seconds * 1000); + } + + try { + if (signal?.aborted) { + return { + taskId, + description: task.description, + status: 'cancelled', + error: 'Worker was cancelled before it started.', + durationMs: Date.now() - startTime, + }; + } + + const subagent = await AgentHeadless.create( + DEFAULT_WORKER_NAME, + this.createWorkerConfig(), + this.createPromptConfig(), + {}, + this.createRunConfig(), + this.createToolConfig(), + ); + + const contextState = new ContextState(); + contextState.set('task_prompt', buildTaskPrompt(task, taskId)); + await subagent.execute(contextState, timeoutController.signal); + + const terminateMode = subagent.getTerminateMode(); + const finalText = subagent.getFinalText(); + const stats = subagent.getExecutionSummary(); + const wasSuccessful = terminateMode === AgentTerminateMode.GOAL; + + return { + taskId, + description: task.description, + status: wasSuccessful ? 'success' : 'failed', + ...(finalText + ? wasSuccessful + ? { output: finalText } + : { error: finalText } + : {}), + terminateReason: terminateMode, + durationMs: Date.now() - startTime, + stats, + }; + } catch (error) { + const message = + error instanceof Error ? error.message : String(error ?? 'Unknown'); + debugLogger.warn(`Swarm worker ${taskId} failed: ${message}`); + return { + taskId, + description: task.description, + status: + timedOut || timeoutController.signal.aborted ? 'cancelled' : 'failed', + error: timedOut + ? `Worker timed out after ${this.params.timeout_seconds} seconds.` + : message, + durationMs: Date.now() - startTime, + }; + } finally { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + signal?.removeEventListener('abort', onAbort); + } + } + + private createWorkerConfig(): Config { + // Swarm workers run concurrently, so interactive prompts cannot be safely + // surfaced one-by-one. Permission hooks may still allow specific actions. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const workerConfig = Object.create(this.config) as any; + workerConfig.getShouldAvoidPermissionPrompts = () => true; + return workerConfig as Config; + } + + private createPromptConfig(): PromptConfig { + return { + systemPrompt: + this.params.worker_system_prompt ?? DEFAULT_WORKER_SYSTEM_PROMPT, + }; + } + + private createRunConfig(): RunConfig { + return { + max_turns: this.params.max_turns ?? DEFAULT_MAX_TURNS, + }; + } + + private createToolConfig(): ToolConfig { + const disallowedTools = Array.from( + new Set([ + ...DEFAULT_WORKER_DISALLOWED_TOOLS, + ...(this.params.disallowed_tools ?? []), + ]), + ); + + return { + tools: + this.params.allowed_tools && this.params.allowed_tools.length > 0 + ? this.params.allowed_tools + : ['*'], + disallowedTools, + }; + } +} + +function getDefaultMaxConcurrency(): number { + const parsed = parseInt( + process.env['QWEN_CODE_MAX_SWARM_CONCURRENCY'] || + process.env['QWEN_CODE_MAX_TOOL_CONCURRENCY'] || + '', + 10, + ); + return Number.isFinite(parsed) && parsed >= 1 ? parsed : 10; +} + +function getTaskId(task: SwarmTask, index: number): string { + return task.id?.trim() || `task-${index + 1}`; +} + +function buildTaskPrompt(task: SwarmTask, taskId: string): string { + return `Swarm task id: ${taskId} +Swarm task description: ${task.description} + +${task.prompt} + +Return only the result for this task.`; +} + +function buildAggregateResult( + description: string, + mode: 'wait_all' | 'first_success', + maxConcurrency: number, + results: SwarmWorkerResult[], +): SwarmAggregateResult { + const summary: SwarmSummary = { + total: results.length, + completed: results.filter((r) => r.status === 'success').length, + failed: results.filter((r) => r.status === 'failed').length, + cancelled: results.filter((r) => r.status === 'cancelled').length, + notStarted: results.filter((r) => r.status === 'not_started').length, + }; + + return { + description, + mode, + maxConcurrency, + summary, + results, + }; +} + +function formatProgress( + description: string, + results: Array, + total: number, +): string { + const settled = results.filter(Boolean) as SwarmWorkerResult[]; + const completed = settled.filter((r) => r.status === 'success').length; + const failed = settled.filter((r) => r.status === 'failed').length; + const cancelled = settled.filter((r) => r.status === 'cancelled').length; + const finished = completed + failed + cancelled; + return `Swarm "${description}": ${finished}/${total} settled (${completed} succeeded, ${failed} failed, ${cancelled} cancelled).`; +} + +function formatAggregateDisplay(result: SwarmAggregateResult): string { + const lines = [ + `### Swarm Complete`, + ``, + `**Task**: ${result.description}`, + `**Mode**: ${result.mode}`, + `**Max concurrency**: ${result.maxConcurrency}`, + ``, + `| Status | Count |`, + `| --- | ---: |`, + `| Success | ${result.summary.completed} |`, + `| Failed | ${result.summary.failed} |`, + `| Cancelled | ${result.summary.cancelled} |`, + `| Not started | ${result.summary.notStarted} |`, + ``, + `| Task | Status | Result |`, + `| --- | --- | --- |`, + ]; + + for (const workerResult of result.results) { + lines.push( + `| ${escapeMarkdownTableCell(workerResult.taskId)} | ${workerResult.status} | ${escapeMarkdownTableCell( + summarizeWorkerResult(workerResult), + )} |`, + ); + } + + return lines.join('\n'); +} + +function summarizeWorkerResult(result: SwarmWorkerResult): string { + const value = result.output ?? result.error ?? result.terminateReason ?? ''; + if (value.length <= 160) return value; + return `${value.slice(0, 157)}...`; +} + +function escapeMarkdownTableCell(value: string): string { + return value.replace(/\|/g, '|').replace(/\r?\n/g, '
'); +} diff --git a/packages/core/src/tools/tool-names.ts b/packages/core/src/tools/tool-names.ts index 9edc21508..f85314a0f 100644 --- a/packages/core/src/tools/tool-names.ts +++ b/packages/core/src/tools/tool-names.ts @@ -19,6 +19,7 @@ export const ToolNames = { TODO_WRITE: 'todo_write', MEMORY: 'save_memory', AGENT: 'agent', + SWARM: 'swarm', SKILL: 'skill', EXIT_PLAN_MODE: 'exit_plan_mode', WEB_FETCH: 'web_fetch', @@ -46,6 +47,7 @@ export const ToolDisplayNames = { TODO_WRITE: 'TodoWrite', MEMORY: 'SaveMemory', AGENT: 'Agent', + SWARM: 'Swarm', SKILL: 'Skill', EXIT_PLAN_MODE: 'ExitPlanMode', WEB_FETCH: 'WebFetch',