diff --git a/integration-tests/concurrent-runner/README.md b/integration-tests/concurrent-runner/README.md new file mode 100644 index 000000000..f19c12239 --- /dev/null +++ b/integration-tests/concurrent-runner/README.md @@ -0,0 +1,138 @@ +# Qwen Concurrent Runner + +A Python tool for executing multiple Qwen CLI tasks across different models concurrently using isolated git worktrees. + +## Overview + +This tool enables you to: + +- Run multiple tasks against multiple models in parallel +- Create isolated git worktrees for each execution +- Track execution status in real-time +- Capture and store all outputs (stdout, stderr, and OpenAI logs) +- Resume or analyze results after completion + +## Installation + +```bash +# Install dependencies +pip install -r requirements.txt +``` + +## Usage + +```bash +python runner.py config.json +``` + +## Configuration + +Create a JSON configuration file (see `config.example.json`): + +```json +{ + "concurrency": 3, + "yolo": true, + "source_repo": ".", + "worktree_base": "~/.qwen/worktrees", + "outputs_dir": "./outputs", + "results_file": "./results.json", + "tasks": [ + { + "id": "code-review", + "name": "Security Code Review", + "prompts": ["Review the codebase for security vulnerabilities."] + } + ], + "models": ["claude-3-5-sonnet-20241022", "qwen3-coder-plus"] +} +``` + +### Configuration Options + +| Option | Type | Default | Description | +| --------------- | ------ | ----------------- | -------------------------------- | +| `concurrency` | int | 4 | Maximum parallel executions | +| `yolo` | bool | true | Auto-approve all actions | +| `source_repo` | string | . | Source git repository path | +| `worktree_base` | string | ~/.qwen/worktrees | Base directory for git worktrees | +| `outputs_dir` | string | ./outputs | Directory for captured output | +| `results_file` | string | ./results.json | JSON file for run tracking | +| `tasks` | array | [] | List of task definitions | +| `models` | array | [] | List of model identifiers | + +### Task Definition + +Each task has: + +- `id`: Unique identifier +- `name`: Human-readable name +- `prompts`: Array of prompt strings (joined with newlines) + +## Output Structure + +Each run creates an isolated output directory: + +``` +outputs/ +├── {run_id}/ +│ ├── stdout.txt # CLI stdout +│ ├── stderr.txt # CLI stderr +│ └── logs/ # OpenAI API logs +│ └── openai-*.json +``` + +## results.json + +```json +{ + "updated_at": "2026-01-28T10:30:00", + "runs": [ + { + "run_id": "abc123", + "task_id": "code-review", + "task_name": "Security Code Review", + "model": "qwen3-coder-plus", + "status": "succeeded", + "worktree_path": "~/.qwen/worktrees/run-abc123", + "output_dir": "outputs/abc123", + "logs_dir": "outputs/abc123/logs", + "started_at": "2026-01-28T10:00:00", + "ended_at": "2026-01-28T10:05:00", + "exit_code": 0, + "stdout_file": "outputs/abc123/stdout.txt", + "stderr_file": "outputs/abc123/stderr.txt" + } + ] +} +``` + +## Execution Flow + +1. **Generate Matrix**: Create N×M run combinations (tasks × models) +2. **Create Worktree**: Git worktree add from source repo +3. **Initialize**: npm install && npm run build +4. **Execute**: Run qwen CLI with captured output (logs go to run-specific folder) +5. 
**Cleanup**: Remove git worktree (always executed) + +## Status Values + +- `queued`: Waiting to start +- `preparing`: Creating git worktree +- `initializing`: Running npm install + build +- `running`: Executing qwen CLI +- `succeeded`: Completed successfully +- `failed`: Error occurred + +## Requirements + +- Python 3.10+ +- Git repository (for worktree operations) +- Node.js and npm (for build step) +- `qwen` CLI in PATH + +## Exit Codes + +- 0: All runs succeeded +- 1: One or more runs failed +- 130: Interrupted by user (Ctrl+C) diff --git a/integration-tests/concurrent-runner/config.example.json b/integration-tests/concurrent-runner/config.example.json new file mode 100644 index 000000000..5efe5178f --- /dev/null +++ b/integration-tests/concurrent-runner/config.example.json @@ -0,0 +1,34 @@ +{ + "concurrency": 3, + "yolo": true, + "source_repo": ".", + "worktree_base": "~/.qwen/worktrees", + "outputs_dir": "./outputs", + "results_file": "./results.json", + "tasks": [ + { + "id": "code-review", + "name": "Security Code Review", + "prompts": [ + "Review the codebase for security vulnerabilities.", + "Focus on input validation, authentication, and data handling." + ] + }, + { + "id": "refactor", + "name": "Refactoring Suggestions", + "prompts": [ + "Analyze the code structure and suggest refactoring improvements.", + "Prioritize changes that improve maintainability and performance." + ] + }, + { + "id": "docs", + "name": "Documentation Generation", + "prompts": [ + "Generate comprehensive API documentation for the main modules." + ] + } + ], + "models": ["claude-3-5-sonnet-20241022", "qwen3-coder-plus"] +} diff --git a/integration-tests/concurrent-runner/examples/toy-config.json b/integration-tests/concurrent-runner/examples/toy-config.json new file mode 100644 index 000000000..51eabff61 --- /dev/null +++ b/integration-tests/concurrent-runner/examples/toy-config.json @@ -0,0 +1,26 @@ +{ + "concurrency": 2, + "yolo": true, + "source_repo": "/Users/andy/workspace/projects/qwen-code/integration-tests/concurrent-runner/examples/toy-project", + "worktree_base": "~/.qwen/worktrees", + "outputs_dir": "./examples/outputs", + "results_file": "./examples/results.json", + "tasks": [ + { + "id": "task-1", + "name": "Rabbit Counting", + "prompts": [ + "Suppose we have 3 rabbits and 4 carrots. How many animals are there?" 
+ ] + }, + { + "id": "task-2", + "name": "AGI Prediction", + "prompts": [ + "Use shell tool to get current date", + "Predict how many dates left until we have AGI (artificial general intelligence)" + ] + } + ], + "models": ["qwen3-coder-plus", "kimi-k2.5"] +} diff --git a/integration-tests/concurrent-runner/examples/toy-project/package.json b/integration-tests/concurrent-runner/examples/toy-project/package.json new file mode 100644 index 000000000..fd6439fd8 --- /dev/null +++ b/integration-tests/concurrent-runner/examples/toy-project/package.json @@ -0,0 +1,11 @@ +{ + "name": "toy-project", + "version": "1.0.0", + "description": "Minimal toy project for testing", + "scripts": { + "build": "echo 'Build complete!'" + }, + "keywords": [], + "author": "", + "license": "MIT" +} diff --git a/integration-tests/concurrent-runner/requirements.txt b/integration-tests/concurrent-runner/requirements.txt new file mode 100644 index 000000000..f1a4d5adc --- /dev/null +++ b/integration-tests/concurrent-runner/requirements.txt @@ -0,0 +1,2 @@ +rich>=13.0.0 +aiofiles>=23.0.0 diff --git a/integration-tests/concurrent-runner/runner.py b/integration-tests/concurrent-runner/runner.py new file mode 100644 index 000000000..23bf6ee88 --- /dev/null +++ b/integration-tests/concurrent-runner/runner.py @@ -0,0 +1,691 @@ +#!/usr/bin/env python3 +""" +Qwen Concurrent Runner - Execute multiple CLI tasks across different models concurrently. + +This tool creates isolated git worktrees for each task/model combination and executes +the Qwen CLI in parallel with status tracking and output capture. +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import os +import shutil +import subprocess +import sys +import uuid +from dataclasses import dataclass, field, asdict +from datetime import datetime +from enum import Enum +from pathlib import Path +from typing import List, Optional, Dict, Any + +from rich.console import Console +from rich.live import Live +from rich.table import Table +from rich.panel import Panel +from rich.progress import Progress, TaskID +import aiofiles +import aiofiles.os + + +class RunStatus(Enum): + """Execution status for a single run.""" + QUEUED = "queued" + PREPARING = "preparing" + INITIALIZING = "initializing" + RUNNING = "running" + SUCCEEDED = "succeeded" + FAILED = "failed" + CLEANING = "cleaning" + + +@dataclass +class Task: + """A task definition containing one or more prompts.""" + id: str + name: str + prompts: List[str] + + +@dataclass +class RunConfig: + """Configuration for the concurrent execution.""" + tasks: List[Task] + models: List[str] + concurrency: int = 4 + yolo: bool = True + source_repo: Path = field(default_factory=lambda: Path.cwd()) + worktree_base: Path = field(default_factory=lambda: Path.home() / ".qwen" / "worktrees") + outputs_dir: Path = field(default_factory=lambda: Path("./outputs")) + results_file: Path = field(default_factory=lambda: Path("./results.json")) + + +@dataclass +class RunRecord: + """Record of a single task/model execution.""" + run_id: str + task_id: str + task_name: str + model: str + status: RunStatus + worktree_path: Optional[str] = None + output_dir: Optional[str] = None + logs_dir: Optional[str] = None + started_at: Optional[str] = None + ended_at: Optional[str] = None + exit_code: Optional[int] = None + error_message: Optional[str] = None + stdout_file: Optional[str] = None + stderr_file: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "run_id": self.run_id, + "task_id": 
self.task_id, + "task_name": self.task_name, + "model": self.model, + "status": self.status.value, + "worktree_path": self.worktree_path, + "output_dir": self.output_dir, + "logs_dir": self.logs_dir, + "started_at": self.started_at, + "ended_at": self.ended_at, + "exit_code": self.exit_code, + "error_message": self.error_message, + "stdout_file": self.stdout_file, + "stderr_file": self.stderr_file, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> RunRecord: + return cls( + run_id=data["run_id"], + task_id=data["task_id"], + task_name=data["task_name"], + model=data["model"], + status=RunStatus(data["status"]), + worktree_path=data.get("worktree_path"), + output_dir=data.get("output_dir"), + logs_dir=data.get("logs_dir"), + started_at=data.get("started_at"), + ended_at=data.get("ended_at"), + exit_code=data.get("exit_code"), + error_message=data.get("error_message"), + stdout_file=data.get("stdout_file"), + stderr_file=data.get("stderr_file"), + ) + + +@dataclass +class ExecutionState: + """Overall execution state across all runs.""" + runs: List[RunRecord] = field(default_factory=list) + total: int = 0 + completed: int = 0 + succeeded: int = 0 + failed: int = 0 + + +class GitWorktreeManager: + """Manages git worktree creation, initialization, and cleanup.""" + + def __init__(self, console: Console, source_repo: Path): + self.console = console + self.source_repo = source_repo + + async def create(self, source_repo: Path, worktree_dir: Path) -> Path: + """Create a new git worktree from the source repository.""" + worktree_dir.parent.mkdir(parents=True, exist_ok=True) + + cmd = ["git", "worktree", "add", str(worktree_dir), "HEAD"] + self.console.print(f"[dim]Git: {' '.join(cmd)}[/dim]") + result = await self._run_command(cmd, cwd=source_repo) + + if result.returncode != 0: + raise RuntimeError(f"Failed to create worktree: {result.stderr}") + + return worktree_dir + + async def initialize(self, worktree_dir: Path) -> None: + """Initialize the worktree by running npm install and npm run build.""" + # npm install + self.console.print(f"[dim]Running npm install in {worktree_dir.name}...[/dim]") + install_result = await self._run_command( + ["npm", "install"], + cwd=worktree_dir, + timeout=300 + ) + if install_result.returncode != 0: + raise RuntimeError(f"npm install failed: {install_result.stderr}") + + # npm run build + self.console.print(f"[dim]Running npm run build in {worktree_dir.name}...[/dim]") + build_result = await self._run_command( + ["npm", "run", "build"], + cwd=worktree_dir, + timeout=300 + ) + if build_result.returncode != 0: + raise RuntimeError(f"npm run build failed: {build_result.stderr}") + + async def remove(self, worktree_dir: Path) -> None: + """Remove a git worktree.""" + if not worktree_dir.exists(): + self.console.print(f"[dim]Worktree already removed: {worktree_dir}[/dim]") + return + + self.console.print(f"[dim]Removing worktree: {worktree_dir}[/dim]") + cmd = ["git", "worktree", "remove", "--force", str(worktree_dir)] + result = await self._run_command(cmd, cwd=self.source_repo) + + if result.returncode != 0: + self.console.print(f"[yellow]Warning: Failed to remove worktree {worktree_dir}: {result.stderr}[/yellow]") + # Fallback to manual removal + try: + shutil.rmtree(worktree_dir, ignore_errors=True) + except Exception: + pass + + async def _run_command( + self, + cmd: List[str], + cwd: Optional[Path] = None, + timeout: int = 60 + ) -> subprocess.CompletedProcess: + """Run a command asynchronously.""" + proc = await asyncio.create_subprocess_exec( 
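+ # spawn the command directly (no shell); both streams are piped so they can be collected by communicate() below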
+ *cmd, + cwd=cwd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + stdout, stderr = await asyncio.wait_for( + proc.communicate(), + timeout=timeout + ) + return subprocess.CompletedProcess( + args=cmd, + returncode=proc.returncode, + stdout=stdout.decode() if stdout else "", + stderr=stderr.decode() if stderr else "", + ) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + raise RuntimeError(f"Command timed out after {timeout}s: {' '.join(cmd)}") + + +class StatusTracker: + """Thread-safe status tracking with JSON persistence.""" + + def __init__(self, results_file: Path, console: Console): + self.results_file = results_file + self.console = console + self._lock = asyncio.Lock() + self._runs: Dict[str, RunRecord] = {} + + async def initialize(self, runs: List[RunRecord]) -> None: + """Initialize the tracker with all runs.""" + async with self._lock: + for run in runs: + self._runs[run.run_id] = run + await self._persist() + + async def update_status( + self, + run_id: str, + status: RunStatus, + **kwargs + ) -> None: + """Update the status of a run.""" + async with self._lock: + if run_id in self._runs: + run = self._runs[run_id] + run.status = status + for key, value in kwargs.items(): + if hasattr(run, key): + setattr(run, key, value) + await self._persist() + + async def _persist(self) -> None: + """Persist current state to JSON file.""" + data = { + "updated_at": datetime.now().isoformat(), + "runs": [run.to_dict() for run in self._runs.values()], + } + + # Write atomically + temp_file = self.results_file.with_suffix('.tmp') + async with aiofiles.open(temp_file, 'w') as f: + await f.write(json.dumps(data, indent=2)) + + temp_file.replace(self.results_file) + + def get_state(self) -> ExecutionState: + """Get current execution state.""" + runs = list(self._runs.values()) + completed = sum(1 for r in runs if r.status in (RunStatus.SUCCEEDED, RunStatus.FAILED)) + succeeded = sum(1 for r in runs if r.status == RunStatus.SUCCEEDED) + failed = sum(1 for r in runs if r.status == RunStatus.FAILED) + + return ExecutionState( + runs=runs, + total=len(runs), + completed=completed, + succeeded=succeeded, + failed=failed, + ) + + def get_active_runs(self) -> List[RunRecord]: + """Get currently active runs.""" + active_statuses = {RunStatus.PREPARING, RunStatus.INITIALIZING, RunStatus.RUNNING} + return [r for r in self._runs.values() if r.status in active_statuses] + + +class ProgressDisplay: + """Rich-based progress display.""" + + def __init__(self, console: Console): + self.console = console + self.live: Optional[Live] = None + + def start(self) -> None: + """Start the live display.""" + self.live = Live(auto_refresh=True, console=self.console) + self.live.start() + + def stop(self) -> None: + """Stop the live display.""" + if self.live: + self.live.stop() + + def update(self, state: ExecutionState) -> None: + """Update the display with current state.""" + if not self.live: + return + + # Summary panel + summary = Table.grid(expand=True) + summary.add_column() + summary.add_column() + summary.add_row( + f"[bold]Total:[/bold] {state.total}", + f"[bold]Completed:[/bold] {state.completed}/{state.total}" + ) + summary.add_row( + f"[green bold]Succeeded:[/green bold] {state.succeeded}", + f"[red bold]Failed:[/red bold] {state.failed}" + ) + + # Active runs table + active_runs = [r for r in state.runs if r.status not in (RunStatus.SUCCEEDED, RunStatus.FAILED, RunStatus.QUEUED)] + + runs_table = Table( + title="Active Runs", + show_header=True, + 
header_style="bold magenta", + expand=True, + ) + runs_table.add_column("Task", style="cyan") + runs_table.add_column("Model", style="green") + runs_table.add_column("Status", style="yellow") + runs_table.add_column("Started", style="dim") + + for run in active_runs[:10]: # Show up to 10 active runs + started = run.started_at or "N/A" + if len(started) > 19: + started = started[11:19] # Extract time portion + runs_table.add_row( + run.task_name[:30], + run.model[:25], + run.status.value, + started, + ) + + # Recent completed runs + completed_runs = sorted( + [r for r in state.runs if r.status in (RunStatus.SUCCEEDED, RunStatus.FAILED)], + key=lambda r: r.ended_at or "", + reverse=True, + )[:5] + + completed_table = Table( + title="Recently Completed", + show_header=True, + header_style="bold blue", + expand=True, + ) + completed_table.add_column("Task", style="cyan") + completed_table.add_column("Model", style="green") + completed_table.add_column("Status", style="bold") + completed_table.add_column("Duration", style="dim") + + for run in completed_runs: + status_color = "green" if run.status == RunStatus.SUCCEEDED else "red" + duration = "N/A" + if run.started_at and run.ended_at: + try: + start = datetime.fromisoformat(run.started_at) + end = datetime.fromisoformat(run.ended_at) + duration_sec = (end - start).total_seconds() + duration = f"{duration_sec:.1f}s" + except: + pass + + completed_table.add_row( + run.task_name[:30], + run.model[:25], + f"[{status_color}]{run.status.value}[/{status_color}]", + duration, + ) + + # Combine everything + layout = Table.grid(expand=True) + layout.add_column() + layout.add_row(Panel(summary, title="Execution Summary", border_style="blue")) + layout.add_row(runs_table) + if completed_runs: + layout.add_row(completed_table) + + self.live.update(layout) + + def show_final_summary(self, state: ExecutionState) -> None: + """Show final execution summary.""" + self.console.print() + self.console.print(Panel( + f"[bold green]Execution Complete![/bold green]\n\n" + f"Total Runs: {state.total}\n" + f"Succeeded: [green]{state.succeeded}[/green]\n" + f"Failed: [red]{state.failed}[/red]\n" + f"Success Rate: {(state.succeeded / state.total * 100):.1f}%", + title="Final Results", + border_style="green" if state.failed == 0 else "yellow", + )) + + +class QwenRunner: + """Executes the Qwen CLI for a specific task and model.""" + + def __init__(self, config: RunConfig, console: Console): + self.config = config + self.console = console + + async def run( + self, + run: RunRecord, + worktree_dir: Path, + output_dir: Path, + ) -> None: + """Execute the Qwen CLI and capture output.""" + output_dir.mkdir(parents=True, exist_ok=True) + run.output_dir = str(output_dir) + + # Build command (needs output_dir set for logs) + cmd = self._build_command(run) + self.console.print(f"[blue]Executing qwen CLI...[/blue]") + self.console.print(f"[dim]Command: {' '.join(cmd)}[/dim]") + + # Prepare output files + stdout_file = output_dir / "stdout.txt" + stderr_file = output_dir / "stderr.txt" + + run.stdout_file = str(stdout_file) + run.stderr_file = str(stderr_file) + + # Run the CLI + env = os.environ.copy() + env["QWEN_CODE_ROOT"] = str(worktree_dir) + + proc = await asyncio.create_subprocess_exec( + *cmd, + cwd=worktree_dir, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, + ) + + # Capture output + stdout_data = [] + stderr_data = [] + + async def read_stream(stream, data_list, file_path): + async with aiofiles.open(file_path, 'w') as f: + while True: + 
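+ # stream output line by line, flushing to disk as it arrives so partial output survives an interrupted run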
line = await stream.readline() + if not line: + break + decoded = line.decode() + data_list.append(decoded) + await f.write(decoded) + await f.flush() + + await asyncio.gather( + read_stream(proc.stdout, stdout_data, stdout_file), + read_stream(proc.stderr, stderr_data, stderr_file), + ) + + returncode = await proc.wait() + run.exit_code = returncode + + if returncode != 0: + raise RuntimeError(f"CLI exited with code {returncode}") + + def _build_command(self, run: RunRecord) -> List[str]: + """Build the qwen CLI command.""" + cmd = ["qwen"] + + # Add model + cmd.extend(["--model", run.model]) + + # Add yolo if enabled + if self.config.yolo: + cmd.append("--yolo") + + # Always enable OpenAI logging to run-specific logs directory + cmd.append("--openai-logging") + run_logs_dir = (Path(run.output_dir) / "logs").resolve() + run_logs_dir.mkdir(parents=True, exist_ok=True) + cmd.extend(["--openai-logging-dir", str(run_logs_dir)]) + run.logs_dir = str(run_logs_dir) + + # Get the task prompts + task = next((t for t in self.config.tasks if t.id == run.task_id), None) + if task: + prompt_text = "\n\n".join(task.prompts) + cmd.extend(["--prompt", prompt_text]) + + return cmd + + +def generate_run_matrix(config: RunConfig) -> List[RunRecord]: + """Generate all task × model combinations.""" + runs = [] + for task in config.tasks: + for model in config.models: + run_id = str(uuid.uuid4())[:8] + runs.append(RunRecord( + run_id=run_id, + task_id=task.id, + task_name=task.name, + model=model, + status=RunStatus.QUEUED, + )) + return runs + + +def load_config(config_path: Path) -> RunConfig: + """Load configuration from JSON file.""" + with open(config_path, 'r') as f: + data = json.load(f) + + tasks = [Task(**t) for t in data.get("tasks", [])] + + return RunConfig( + tasks=tasks, + models=data.get("models", []), + concurrency=data.get("concurrency", 4), + yolo=data.get("yolo", True), + source_repo=Path(data.get("source_repo", ".")).resolve(), + worktree_base=Path(data.get("worktree_base", "~/.qwen/worktrees")).expanduser(), + outputs_dir=Path(data.get("outputs_dir", "./outputs")), + results_file=Path(data.get("results_file", "./results.json")), + ) + + +async def execute_single_run( + run: RunRecord, + config: RunConfig, + tracker: StatusTracker, + worktree_manager: GitWorktreeManager, + qwen_runner: QwenRunner, + console: Console, +) -> None: + """Execute a single run with proper cleanup.""" + worktree_dir = None + + try: + # Step 1: Create worktree + await tracker.update_status(run.run_id, RunStatus.PREPARING) + worktree_dir = config.worktree_base / f"run-{run.run_id}" + await worktree_manager.create(config.source_repo, worktree_dir) + run.worktree_path = str(worktree_dir) + run.started_at = datetime.now().isoformat() + + # Step 2: Initialize (npm install + build) + await tracker.update_status(run.run_id, RunStatus.INITIALIZING) + await worktree_manager.initialize(worktree_dir) + + # Step 3: Run CLI + await tracker.update_status(run.run_id, RunStatus.RUNNING) + output_dir = config.outputs_dir / run.run_id + await qwen_runner.run(run, worktree_dir, output_dir) + + # Step 4: Success + run.ended_at = datetime.now().isoformat() + await tracker.update_status( + run.run_id, + RunStatus.SUCCEEDED, + exit_code=run.exit_code, + ended_at=run.ended_at, + ) + console.print(f"[green]✓[/green] {run.task_name} / {run.model}") + + except Exception as e: + run.ended_at = datetime.now().isoformat() + await tracker.update_status( + run.run_id, + RunStatus.FAILED, + error_message=str(e), + ended_at=run.ended_at, + ) + 
console.print(f"[red]✗[/red] {run.task_name} / {run.model}: {e}") + + finally: + # Step 5: Cleanup + if worktree_dir: + await worktree_manager.remove(worktree_dir) + + +async def run_all(config: RunConfig, console: Console) -> ExecutionState: + """Run all task/model combinations concurrently.""" + # Setup directories + config.worktree_base.mkdir(parents=True, exist_ok=True) + config.outputs_dir.mkdir(parents=True, exist_ok=True) + + # Generate all runs + runs = generate_run_matrix(config) + console.print(f"[bold]Generated {len(runs)} runs:[/bold] {len(config.tasks)} tasks × {len(config.models)} models") + + # Initialize components + tracker = StatusTracker(config.results_file, console) + await tracker.initialize(runs) + + worktree_manager = GitWorktreeManager(console, config.source_repo) + qwen_runner = QwenRunner(config, console) + display = ProgressDisplay(console) + + # Start progress display + display.start() + + # Progress update task + stop_event = asyncio.Event() + + async def update_progress(): + while not stop_event.is_set(): + state = tracker.get_state() + display.update(state) + if state.completed >= state.total: + stop_event.set() + break + try: + await asyncio.wait_for(stop_event.wait(), timeout=0.5) + except asyncio.TimeoutError: + continue + + # Execute runs with semaphore-controlled concurrency + semaphore = asyncio.Semaphore(config.concurrency) + + async def execute_with_limit(run: RunRecord): + async with semaphore: + await execute_single_run( + run, config, tracker, worktree_manager, qwen_runner, console + ) + + # Run everything + try: + await asyncio.gather( + update_progress(), + asyncio.gather(*[execute_with_limit(r) for r in runs]), + ) + finally: + stop_event.set() + display.stop() + + # Show final summary + final_state = tracker.get_state() + display.show_final_summary(final_state) + + return final_state + + +def main(): + parser = argparse.ArgumentParser( + description="Qwen Concurrent Runner - Execute multiple CLI tasks across models" + ) + parser.add_argument( + "config", + type=Path, + help="Path to configuration JSON file", + ) + parser.add_argument( + "--version", + action="version", + version="%(prog)s 1.0.0", + ) + + args = parser.parse_args() + + if not args.config.exists(): + print(f"Error: Config file not found: {args.config}", file=sys.stderr) + sys.exit(1) + + console = Console() + config = load_config(args.config) + + try: + final_state = asyncio.run(run_all(config, console)) + sys.exit(0 if final_state.failed == 0 else 1) + except KeyboardInterrupt: + console.print("\n[yellow]Interrupted by user[/yellow]") + sys.exit(130) + except Exception as e: + console.print(f"\n[red]Fatal error: {e}[/red]") + sys.exit(1) + + +if __name__ == "__main__": + main()
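A note on post-run analysis: the `results.json` schema documented in the README is stable enough to script against without touching the runner. Below is a minimal sketch (not part of this patch) that lists failed runs and points at their captured stderr; it assumes the default `./results.json` location and the field names shown above.

```python
#!/usr/bin/env python3
"""Summarize failed runs from a concurrent-runner results.json (illustrative sketch)."""

import json
from pathlib import Path

# Default path from the README; adjust if results_file is overridden in the config.
RESULTS_FILE = Path("./results.json")


def main() -> None:
    data = json.loads(RESULTS_FILE.read_text())
    failed = [run for run in data["runs"] if run["status"] == "failed"]

    print(f"{len(failed)} of {len(data['runs'])} runs failed (last update: {data['updated_at']})")
    for run in failed:
        # Each record carries the task/model pair plus paths to its captured output.
        print(f"- {run['task_name']} / {run['model']} (exit={run['exit_code']})")
        if run.get("stderr_file"):
            print(f"    stderr: {run['stderr_file']}")
        if run.get("error_message"):
            print(f"    error:  {run['error_message']}")


if __name__ == "__main__":
    main()
```

The same approach extends to success-rate reports or to re-queuing failed task/model pairs into a fresh config file for another pass.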