#!/usr/bin/env python3
"""
Qwen Concurrent Runner - Execute multiple CLI tasks across different models concurrently.
This tool creates isolated git worktrees for each task/model combination and executes
the Qwen CLI in parallel with status tracking and output capture.
"""
from __future__ import annotations
import argparse
import html
import asyncio
import json
import os
import shutil
import subprocess
import sys
import uuid
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import List, Optional, Dict, Any, Tuple
from rich.console import Console
from rich.live import Live
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, TaskID
import aiofiles
import aiofiles.os
class RunStatus(Enum):
    """Lifecycle state of one task/model run.

    Member values are plain lowercase strings; ``RunRecord.to_dict`` stores
    ``status.value`` and ``RunRecord.from_dict`` parses it back via
    ``RunStatus(value)``.
    """

    QUEUED = "queued"
    PREPARING = "preparing"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    CLEANING = "cleaning"
@dataclass
class Task:
    """A named unit of work made of one or more prompts.

    Fields, in constructor order: ``id``, ``name``, ``prompts``.
    """

    id: str  # identifier of the task
    name: str  # human-readable label
    prompts: List[str]  # prompt strings belonging to this task
@dataclass
class ModelSpec:
    """A model to run, with an optional auth type (for example ``"anthropic"``)."""

    name: str
    auth_type: Optional[str] = None  # None when no explicit auth type is given
@dataclass
class RunConfig:
    """Settings for one concurrent execution.

    ``tasks`` and ``models`` describe the task/model combinations to run. The
    remaining fields control parallelism, the ``yolo`` toggle, and the
    filesystem locations (source repo, worktrees, outputs, results file).
    Path defaults are computed per instance at construction time.
    """

    tasks: List[Task]
    models: List[ModelSpec]  # one entry per model: name plus optional auth_type
    concurrency: int = 4
    yolo: bool = True
    source_repo: Path = field(default_factory=Path.cwd)
    worktree_base: Path = field(default_factory=lambda: Path.home().joinpath(".qwen", "worktrees"))
    outputs_dir: Path = field(default_factory=lambda: Path("./outputs"))
    results_file: Path = field(default_factory=lambda: Path("./results.json"))
    branch: Optional[str] = None  # git branch to base worktrees on; None means use the default
    keep_worktree: bool = False  # True leaves the git worktree in place after a run
@dataclass
class PromptResult:
    """Outcome of sending a single prompt: its captured output files and exit status."""

    prompt_index: int  # index of the prompt
    prompt_text: str
    stdout_file: str  # path of the captured standard output
    stderr_file: str  # path of the captured standard error
    exit_code: int
    status: str  # "succeeded" or "failed"
@dataclass
class RunRecord:
    """Record of a single task/model execution.

    Converted to and from plain dicts (``to_dict`` / ``from_dict``) so it can be
    persisted as JSON. ``status`` is stored by its string value.
    """

    run_id: str
    task_id: str
    task_name: str
    model: str
    status: RunStatus
    auth_type: Optional[str] = None  # e.g. "anthropic" for qwen --auth-type
    worktree_path: Optional[str] = None
    output_dir: Optional[str] = None
    logs_dir: Optional[str] = None
    started_at: Optional[str] = None
    ended_at: Optional[str] = None
    exit_code: Optional[int] = None
    error_message: Optional[str] = None
    prompt_results: List[PromptResult] = field(default_factory=list)
    diff_file: Optional[str] = None  # Path to git diff output
    session_log_file: Optional[str] = None  # Path to session log (chat recording)
    session_html_file: Optional[str] = None  # Path to rendered chat HTML
    session_id: Optional[str] = None  # Session ID (UUID from chat recording)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict of this record.

        ``status`` is replaced by its ``.value`` and each prompt result is
        flattened into a dict with the same field names.
        """
        return {
            "run_id": self.run_id,
            "task_id": self.task_id,
            "task_name": self.task_name,
            "model": self.model,
            "status": self.status.value,
            "auth_type": self.auth_type,
            "worktree_path": self.worktree_path,
            "output_dir": self.output_dir,
            "logs_dir": self.logs_dir,
            "started_at": self.started_at,
            "ended_at": self.ended_at,
            "exit_code": self.exit_code,
            "error_message": self.error_message,
            "diff_file": self.diff_file,
            "session_log_file": self.session_log_file,
            "session_html_file": self.session_html_file,
            "session_id": self.session_id,
            "prompt_results": [
                {
                    "prompt_index": r.prompt_index,
                    "prompt_text": r.prompt_text,
                    "stdout_file": r.stdout_file,
                    "stderr_file": r.stderr_file,
                    "exit_code": r.exit_code,
                    "status": r.status,
                }
                for r in self.prompt_results
            ],
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> RunRecord:
        """Rebuild a record from the output of ``to_dict``.

        ``run_id``, ``task_id``, ``task_name``, ``model`` and ``status`` are
        required; every other key is optional. ``prompt_results`` entries are
        turned back into ``PromptResult`` objects, so a ``to_dict`` ->
        ``from_dict`` round trip no longer drops them. A missing or null
        ``prompt_results`` key yields an empty list.

        Raises:
            KeyError: a required key (top-level or inside a prompt result) is missing.
            ValueError: ``status`` is not a valid ``RunStatus`` value.
        """
        prompt_results = [
            PromptResult(
                prompt_index=p["prompt_index"],
                prompt_text=p["prompt_text"],
                stdout_file=p["stdout_file"],
                stderr_file=p["stderr_file"],
                exit_code=p["exit_code"],
                status=p["status"],
            )
            # `or []` also covers results files that stored null here.
            for p in data.get("prompt_results") or []
        ]
        return cls(
            run_id=data["run_id"],
            task_id=data["task_id"],
            task_name=data["task_name"],
            model=data["model"],
            status=RunStatus(data["status"]),
            auth_type=data.get("auth_type"),
            worktree_path=data.get("worktree_path"),
            output_dir=data.get("output_dir"),
            logs_dir=data.get("logs_dir"),
            started_at=data.get("started_at"),
            ended_at=data.get("ended_at"),
            exit_code=data.get("exit_code"),
            error_message=data.get("error_message"),
            prompt_results=prompt_results,
            diff_file=data.get("diff_file"),
            session_log_file=data.get("session_log_file"),
            session_html_file=data.get("session_html_file"),
            session_id=data.get("session_id"),
        )
@dataclass
class ExecutionState:
    """Aggregate view across all runs: the run records plus tallies."""

    runs: List[RunRecord] = field(default_factory=list)  # fresh list per instance
    # Tallies; every counter starts at zero.
    total: int = 0
    completed: int = 0
    succeeded: int = 0
    failed: int = 0
class GitWorktreeManager:
    """Manages git worktree creation, initialization, and cleanup.

    Every git/node invocation goes through ``_run_command``, which runs it as
    an asyncio subprocess with a timeout and returns decoded output.
    """

    def __init__(self, console: Console, source_repo: Path):
        self.console = console
        self.source_repo = source_repo

    async def ensure_git_repo(self) -> None:
        """Ensure ``source_repo`` is a git repository, initializing one if needed.

        When ``source_repo/.git`` is missing, runs ``git init``, stages all
        files and creates an initial commit, so worktrees have a commit to
        start from.

        Raises:
            RuntimeError: if any of the git commands exits non-zero.
        """
        git_dir = self.source_repo / ".git"
        if git_dir.exists():
            return
        self.console.print("[yellow]Source repo is not a git repository. Initializing...[/yellow]")
        result = await self._run_command(["git", "init"], cwd=self.source_repo)
        if result.returncode != 0:
            raise RuntimeError(f"Failed to initialize git repo: {result.stderr}")
        result = await self._run_command(["git", "add", "."], cwd=self.source_repo)
        if result.returncode != 0:
            raise RuntimeError(f"Failed to stage files: {result.stderr}")
        # --allow-empty: for an empty source directory git would otherwise
        # refuse with "nothing to commit", leaving no commit to branch from.
        result = await self._run_command(
            ["git", "commit", "--allow-empty", "-m", "Initial commit"],
            cwd=self.source_repo,
        )
        if result.returncode != 0:
            raise RuntimeError(f"Failed to create initial commit: {result.stderr}")
        self.console.print("[green]✓ Git repository initialized[/green]")

    async def create(self, source_repo: Path, worktree_dir: Path, branch: Optional[str] = None) -> Path:
        """Create a new git worktree of ``source_repo`` at ``worktree_dir``.

        Args:
            source_repo: Repository in which ``git worktree add`` is run.
            worktree_dir: Destination directory; its parent is created if missing.
            branch: When given, a new branch ``{branch}-{worktree_dir.name}`` is
                created from it for this worktree. Otherwise ``git worktree add
                <path>`` is run with git's default behavior.

        Returns:
            ``worktree_dir``.

        Raises:
            RuntimeError: if ``git worktree add`` exits non-zero.
        """
        worktree_dir.parent.mkdir(parents=True, exist_ok=True)
        if branch:
            # A unique per-worktree branch, because git will not check out the
            # same branch in two worktrees at once.
            worktree_branch = f"{branch}-{worktree_dir.name}"
            cmd = ["git", "worktree", "add", "-b", worktree_branch, str(worktree_dir), branch]
            self.console.print(f"[dim]Git: Creating worktree with branch '{worktree_branch}' from '{branch}'...[/dim]")
        else:
            cmd = ["git", "worktree", "add", str(worktree_dir)]
            self.console.print(f"[dim]Git: {' '.join(cmd)}[/dim]")
        result = await self._run_command(cmd, cwd=source_repo)
        if result.returncode != 0:
            raise RuntimeError(f"Failed to create worktree: {result.stderr}")
        return worktree_dir

    async def remove(self, worktree_dir: Path) -> None:
        """Remove a git worktree, falling back to deleting the directory.

        If ``git worktree remove --force`` exits non-zero, the directory is
        deleted directly (best effort) and ``git worktree prune`` is run so the
        repository does not keep a stale registration for it. Git failures here
        are reported as console warnings rather than raised.
        """
        if not worktree_dir.exists():
            self.console.print(f"[dim]Worktree already removed: {worktree_dir}[/dim]")
            return
        self.console.print(f"[dim]Removing worktree: {worktree_dir}[/dim]")
        cmd = ["git", "worktree", "remove", "--force", str(worktree_dir)]
        result = await self._run_command(cmd, cwd=self.source_repo)
        if result.returncode == 0:
            return
        self.console.print(f"[yellow]Warning: Failed to remove worktree {worktree_dir}: {result.stderr}[/yellow]")
        # Fallback to manual removal; ignore_errors already makes this best-effort.
        shutil.rmtree(worktree_dir, ignore_errors=True)
        # Drop git's administrative entry for the now-deleted worktree.
        try:
            prune = await self._run_command(["git", "worktree", "prune"], cwd=self.source_repo)
        except RuntimeError as exc:
            self.console.print(f"[yellow]Warning: git worktree prune failed: {exc}[/yellow]")
            return
        if prune.returncode != 0:
            self.console.print(f"[yellow]Warning: git worktree prune failed: {prune.stderr}[/yellow]")

    async def get_diff(self, worktree_dir: Path) -> str:
        """Return the worktree's changes as a ``git diff --cached`` patch.

        Everything (including untracked files) is staged first with
        ``git add -A`` so it shows up in the cached diff; note this leaves the
        worktree's index modified. Returns "" if the diff command fails.
        """
        self.console.print(f"[dim]Capturing git diff from {worktree_dir.name}...[/dim]")
        staged = await self._run_command(["git", "add", "-A"], cwd=worktree_dir)
        if staged.returncode != 0:
            # Still attempt the diff: a partial diff is more useful than none.
            self.console.print(f"[yellow]Warning: Failed to stage changes for diff: {staged.stderr}[/yellow]")
        result = await self._run_command(["git", "diff", "--cached", "--no-color"], cwd=worktree_dir)
        if result.returncode != 0:
            self.console.print(f"[yellow]Warning: Failed to get diff: {result.stderr}[/yellow]")
            return ""
        return result.stdout

    async def collect_session_log(self, worktree_dir: Path, output_dir: Path) -> Optional[Tuple[Path, str, Path]]:
        """Collect the session log file from the worktree's chat recording.

        Session logs are stored at::

            ~/.qwen/projects/{projectId}/chats/{sessionId}.jsonl

        where projectId is ``str(worktree_dir)`` with every non-alphanumeric
        character replaced by ``-``. The most recently modified ``.jsonl`` file
        is copied into ``output_dir/chats/`` under its original name. Each JSON
        record gets its ``cwd`` field set to the runner's current directory.
        Non-JSON lines are copied as-is (stripped) and blank lines are dropped.
        If ``export-html-from-chatrecord-jsonl.js`` exists next to this file,
        it is run with ``node`` on the copied log to render HTML.

        Returns:
            ``(copied_log_path, session_id, html_path)`` or None if no log was
            found. ``html_path`` is ``<session_id>.html`` beside the copied log.
            It is returned even when rendering failed or was skipped, so
            callers should check that it exists.
        """
        import re

        # Compute projectId by sanitizing the worktree path (same as storage.ts)
        project_id = re.sub(r'[^a-zA-Z0-9]', '-', str(worktree_dir))
        chats_dir = Path.home() / ".qwen" / "projects" / project_id / "chats"
        if not chats_dir.exists():
            self.console.print(f"[dim]No chats directory found at {chats_dir}[/dim]")
            return None

        jsonl_files = list(chats_dir.glob("*.jsonl"))
        if not jsonl_files:
            self.console.print(f"[dim]No session log files found in {chats_dir}[/dim]")
            return None

        # The most recently modified file is taken to be the session just run.
        session_log = max(jsonl_files, key=lambda f: f.stat().st_mtime)
        session_id = session_log.stem

        # Place in a 'chats' subdir to mirror the source layout.
        chats_output_dir = output_dir / "chats"
        chats_output_dir.mkdir(parents=True, exist_ok=True)
        output_log = chats_output_dir / session_log.name

        actual_cwd = str(Path.cwd())
        # Explicit UTF-8 on both ends: records are re-serialized with
        # ensure_ascii=False, so a non-UTF-8 platform default encoding could
        # fail on non-ASCII content. Undecodable input bytes are replaced.
        async with aiofiles.open(session_log, "r", encoding="utf-8", errors="replace") as src:
            async with aiofiles.open(output_log, "w", encoding="utf-8") as dst:
                async for raw_line in src:
                    line = raw_line.strip()
                    if not line:
                        continue
                    try:
                        record = json.loads(line)
                    except json.JSONDecodeError:
                        # If line is not valid JSON, write it as-is
                        await dst.write(line + '\n')
                        continue
                    record['cwd'] = actual_cwd
                    await dst.write(json.dumps(record, ensure_ascii=False) + '\n')
        self.console.print(f"[dim]Session log copied: {session_log.name}[/dim]")

        rendered_html_path = chats_output_dir / f"{session_id}.html"
        try:
            exporter_script = Path(__file__).parent / "export-html-from-chatrecord-jsonl.js"
            if exporter_script.exists():
                result = await self._run_command(
                    ["node", str(exporter_script), str(output_log)],
                    cwd=exporter_script.parent,
                    timeout=30,
                )
                if result.returncode == 0:
                    self.console.print(f"[dim]Rendered chat HTML saved: {rendered_html_path.name}[/dim]")
                else:
                    self.console.print(f"[yellow]Warning: HTML exporter failed: {result.stderr}[/yellow]")
            else:
                self.console.print(f"[yellow]Warning: HTML exporter script not found at {exporter_script}[/yellow]")
        except Exception as e:
            # Rendering is optional; a missing `node` binary or a timeout only warns.
            self.console.print(f"[yellow]Warning: Failed to render chat HTML: {e}[/yellow]")
        return output_log, session_id, rendered_html_path

    async def _run_command(
        self,
        cmd: List[str],
        cwd: Optional[Path] = None,
        timeout: int = 60
    ) -> subprocess.CompletedProcess:
        """Run ``cmd`` (no shell) as an asyncio subprocess and capture its output.

        Args:
            cmd: Program followed by its arguments.
            cwd: Working directory for the child; None uses the current one.
            timeout: Seconds to wait before the child is killed.

        Returns:
            A ``subprocess.CompletedProcess`` whose stdout/stderr are decoded as
            UTF-8, with undecodable bytes replaced by U+FFFD instead of raising.

        Raises:
            RuntimeError: if the command does not finish within ``timeout``.
        """
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            cwd=cwd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                # The child exited between the timeout firing and kill().
                pass
            await proc.wait()
            raise RuntimeError(f"Command timed out after {timeout}s: {' '.join(cmd)}")
        return subprocess.CompletedProcess(
            args=cmd,
            returncode=proc.returncode,
            # Diffs of non-UTF-8 files must not crash the run.
            stdout=stdout.decode(errors="replace") if stdout else "",
            stderr=stderr.decode(errors="replace") if stderr else "",
        )
class StatusTracker:
"""Thread-safe status tracking with JSON persistence."""
def __init__(self, results_file: Path, console: Console):
    """Create an empty tracker.

    Args:
        results_file: Destination of the JSON snapshot; the HTML report is
            written beside it as ``index.html``.
        console: Rich console kept on the instance.
    """
    # Tracked runs keyed by run_id; populated by ``initialize``.
    self._runs: Dict[str, RunRecord] = {}
    # Serializes mutations of ``_runs`` together with the persist that follows.
    self._lock = asyncio.Lock()
    self.results_file = results_file
    self.console = console
async def initialize(self, runs: List[RunRecord]) -> None:
    """Register every run by its ``run_id`` and write the first snapshot.

    A later run sharing a ``run_id`` with an earlier one replaces it.
    """
    async with self._lock:
        self._runs.update((record.run_id, record) for record in runs)
        await self._persist()
async def update_status(self, run_id: str, status: RunStatus, **kwargs) -> None:
    """Set a tracked run's status, copy over matching fields, then persist.

    A keyword argument is applied only if the record already has an
    attribute of that name; any others are ignored. An unknown ``run_id``
    changes nothing, but the snapshot is still rewritten.
    """
    async with self._lock:
        if run_id in self._runs:
            record = self._runs[run_id]
            record.status = status
            for attr_name in filter(lambda name: hasattr(record, name), list(kwargs)):
                setattr(record, attr_name, kwargs[attr_name])
        await self._persist()
async def _persist(self) -> None:
    """Write the current run state to ``results_file`` and refresh the HTML report.

    The snapshot holds ``updated_at`` (local ISO-8601 timestamp) and ``runs``
    (``to_dict()`` of every tracked run). It is written to a sibling ``.tmp``
    file and then moved over ``results_file`` with ``Path.replace``, so readers
    do not see a half-written file (os.replace semantics, same filesystem).
    Both visible callers (``initialize``, ``update_status``) hold
    ``self._lock`` while calling this.
    """
    data = {
        "updated_at": datetime.now().isoformat(),
        "runs": [run.to_dict() for run in self._runs.values()],
    }
    # Create the destination directory on demand; otherwise a results_file in
    # a not-yet-existing directory makes every status update raise.
    self.results_file.parent.mkdir(parents=True, exist_ok=True)
    # Write JSON atomically
    temp_file = self.results_file.with_suffix('.tmp')
    async with aiofiles.open(temp_file, 'w') as f:
        await f.write(json.dumps(data, indent=2))
    temp_file.replace(self.results_file)
    # Generate HTML report
    await self._generate_html(data)
async def _generate_html(self, data: Dict[str, Any]) -> None:
"""Generate a beautiful HTML report."""
html_file = self.results_file.with_name("index.html")
# Calculate summary
total = len(data["runs"])
succeeded = sum(1 for r in data["runs"] if r["status"] == "succeeded")
failed = sum(1 for r in data["runs"] if r["status"] == "failed")
running = sum(1 for r in data["runs"] if r["status"] in ["preparing", "running"])
# Build rows
rows = []
for run in sorted(data["runs"], key=lambda x: x.get("started_at") or "", reverse=True):
status = run["status"]
status_class = f"status-{status}"
# Links
links = []
# Output Directory
if run.get("output_dir"):
# Make path absolute for local viewing
abs_output_dir = os.path.abspath(run["output_dir"])
links.append(f'Outputs')
# Diff File
if run.get("diff_file"):
abs_diff_file = os.path.abspath(run["diff_file"])
links.append(f'Diff')
# Session Log
if run.get("session_html_file"):
abs_session_html = os.path.abspath(run["session_html_file"])
links.append(f'Chat')
elif run.get("session_log_file"):
abs_session_log = os.path.abspath(run["session_log_file"])
links.append(f'Chat (Raw)')
# Worktree
if run.get("worktree_path"):
abs_worktree = os.path.abspath(run["worktree_path"])
links.append(f'Worktree')
# Prompt results (stdout/stderr)
prompt_links = []
for i, p in enumerate(run.get("prompt_results", []), 1):
p_links = []
if p.get("stdout_file"):
p_links.append(f'out')
if p.get("stderr_file"):
p_links.append(f'err')
if p_links:
prompt_links.append(f'P{i}: {"|".join(p_links)}')
links_html = " | ".join(links)
prompts_html = "
".join(prompt_links)
duration = "N/A"
if run.get("started_at") and run.get("ended_at"):
try:
start = datetime.fromisoformat(run["started_at"])
end = datetime.fromisoformat(run["ended_at"])
duration = f"{(end - start).total_seconds():.1f}s"
except: pass
error_msg = f'
{run["run_id"]}| ID | Task | Model | Status | Duration | Logs & Artifacts | Prompts | Error |
|---|