From 452c4e2bb163d85cd8f95505aa2a7fc4dde2f9e9 Mon Sep 17 00:00:00 2001 From: Alishahryar1 Date: Wed, 28 Jan 2026 21:19:16 -0800 Subject: [PATCH] new messages now get their own cli instance and replies are queued --- .env.example | 1 + providers/cli_session_manager.py | 206 +++++++++++++++++++++++++++++++ server.py | 167 ++++++++++++++++++------- 3 files changed, 332 insertions(+), 42 deletions(-) create mode 100644 providers/cli_session_manager.py diff --git a/.env.example b/.env.example index ecbcf45..55766db 100644 --- a/.env.example +++ b/.env.example @@ -52,4 +52,5 @@ TELEGRAM_API_HASH="abcdef123456..." ALLOWED_TELEGRAM_USER_ID="YOUR_TELEGRAM_USER_ID" CLAUDE_WORKSPACE="./agent_workspace" ALLOWED_DIR="C:/Users/YourName/projects/myproject" +MAX_CLI_SESSIONS=10 # Maximum parallel Claude CLI instances WRAPPER_WS_URL="ws://localhost:8083/ws" \ No newline at end of file diff --git a/providers/cli_session_manager.py b/providers/cli_session_manager.py new file mode 100644 index 0000000..dbd1ce5 --- /dev/null +++ b/providers/cli_session_manager.py @@ -0,0 +1,206 @@ +""" +CLI Session Manager for Multi-Instance Claude CLI Support + +Manages a pool of CLISession instances, each handling one conversation. +This enables true parallel processing where multiple conversations run +simultaneously in separate CLI processes. +""" + +import asyncio +import uuid +import logging +from typing import Dict, Optional, Tuple, List +from .claude_cli import CLISession + +logger = logging.getLogger(__name__) + + +class CLISessionManager: + """ + Manages multiple CLISession instances for parallel conversation processing. + + Each new conversation gets its own CLISession with its own subprocess. + Replies to existing conversations reuse the same CLISession instance. + """ + + def __init__( + self, + workspace_path: str, + api_url: str, + allowed_dirs: Optional[List[str]] = None, + max_sessions: int = 10, + ): + """ + Initialize the session manager. + + Args: + workspace_path: Working directory for CLI processes + api_url: API URL for the proxy + allowed_dirs: Directories the CLI is allowed to access + max_sessions: Maximum concurrent sessions (prevents resource exhaustion) + """ + self.workspace = workspace_path + self.api_url = api_url + self.allowed_dirs = allowed_dirs or [] + self.max_sessions = max_sessions + + # Active sessions: real_session_id -> CLISession + self._sessions: Dict[str, CLISession] = {} + + # Pending sessions: temp_id -> CLISession (before we know real session ID) + self._pending_sessions: Dict[str, CLISession] = {} + + # Mapping: temp_id -> real_session_id (for updating after CLI responds) + self._temp_to_real: Dict[str, str] = {} + + # Lock for thread-safe session management + self._lock = asyncio.Lock() + + logger.info(f"CLISessionManager initialized (max_sessions={max_sessions})") + + async def get_or_create_session( + self, session_id: Optional[str] = None + ) -> Tuple[CLISession, str, bool]: + """ + Get an existing session or create a new one. + + Args: + session_id: Optional existing session ID to resume. + If None, creates a new session. + + Returns: + Tuple of (CLISession instance, session_id, is_new_session) + For new sessions, session_id is a temporary ID until CLI assigns real one. + """ + async with self._lock: + # Case 1: Resume existing session + if session_id and session_id in self._sessions: + logger.debug(f"Reusing existing session: {session_id}") + return self._sessions[session_id], session_id, False + + # Case 2: Check if we're at capacity + total_sessions = len(self._sessions) + len(self._pending_sessions) + if total_sessions >= self.max_sessions: + # Find and clean up any idle sessions + await self._cleanup_idle_sessions_unlocked() + + # Re-check after cleanup + total_sessions = len(self._sessions) + len(self._pending_sessions) + if total_sessions >= self.max_sessions: + logger.warning(f"Max sessions ({self.max_sessions}) reached") + raise RuntimeError( + f"Maximum concurrent sessions ({self.max_sessions}) reached. " + "Please wait for existing conversations to complete." + ) + + # Case 3: Create new session + temp_id = f"pending_{uuid.uuid4().hex[:8]}" + new_session = CLISession( + workspace_path=self.workspace, + api_url=self.api_url, + allowed_dirs=self.allowed_dirs, + ) + self._pending_sessions[temp_id] = new_session + logger.info(f"Created new session with temp_id: {temp_id}") + + return new_session, temp_id, True + + async def register_real_session_id(self, temp_id: str, real_session_id: str) -> bool: + """ + Called when we learn the real session ID from CLI output. + Moves session from pending to active. + + Args: + temp_id: The temporary ID we assigned + real_session_id: The real session ID from Claude CLI + + Returns: + True if registration succeeded, False otherwise + """ + async with self._lock: + if temp_id not in self._pending_sessions: + logger.warning(f"Temp session {temp_id} not found for registration") + return False + + session = self._pending_sessions.pop(temp_id) + self._sessions[real_session_id] = session + self._temp_to_real[temp_id] = real_session_id + + logger.info(f"Registered session: {temp_id} -> {real_session_id}") + return True + + async def get_real_session_id(self, temp_id: str) -> Optional[str]: + """Get the real session ID for a temporary ID.""" + async with self._lock: + return self._temp_to_real.get(temp_id) + + async def remove_session(self, session_id: str) -> bool: + """ + Remove a session from the manager. + + Args: + session_id: Session ID to remove (can be temp or real) + + Returns: + True if session was removed, False if not found + """ + async with self._lock: + # Check pending sessions + if session_id in self._pending_sessions: + session = self._pending_sessions.pop(session_id) + await session.stop() + logger.info(f"Removed pending session: {session_id}") + return True + + # Check active sessions + if session_id in self._sessions: + session = self._sessions.pop(session_id) + await session.stop() + # Clean up temp mapping + for temp, real in list(self._temp_to_real.items()): + if real == session_id: + del self._temp_to_real[temp] + logger.info(f"Removed active session: {session_id}") + return True + + return False + + async def _cleanup_idle_sessions_unlocked(self): + """ + Clean up sessions that are no longer busy. + Must be called while holding self._lock. + """ + idle_sessions = [] + + for session_id, session in self._sessions.items(): + if not session.is_busy: + idle_sessions.append(session_id) + + for session_id in idle_sessions[:3]: # Remove up to 3 idle sessions + session = self._sessions.pop(session_id) + await session.stop() + logger.debug(f"Cleaned up idle session: {session_id}") + + async def stop_all(self): + """Stop all active sessions. Called on shutdown.""" + async with self._lock: + all_sessions = list(self._sessions.values()) + list(self._pending_sessions.values()) + for session in all_sessions: + try: + await session.stop() + except Exception as e: + logger.error(f"Error stopping session: {e}") + + self._sessions.clear() + self._pending_sessions.clear() + self._temp_to_real.clear() + logger.info("All sessions stopped") + + def get_stats(self) -> Dict: + """Get current session statistics.""" + return { + "active_sessions": len(self._sessions), + "pending_sessions": len(self._pending_sessions), + "max_sessions": self.max_sessions, + "busy_count": sum(1 for s in self._sessions.values() if s.is_busy), + } diff --git a/server.py b/server.py index 6988f58..1e0f697 100644 --- a/server.py +++ b/server.py @@ -25,7 +25,8 @@ from contextlib import asynccontextmanager from fastapi import FastAPI, Request, HTTPException, Depends from fastapi.responses import StreamingResponse, JSONResponse import tiktoken -from providers.claude_cli import CLISession, CLIParser +from providers.claude_cli import CLIParser +from providers.cli_session_manager import CLISessionManager # Optional: telethon for the bot try: @@ -307,8 +308,13 @@ else: INTERNAL_API_URL = "http://localhost:8082/v1" # Initialize Global Instances -# CLI runs in CLI_WORKSPACE (user's project), but we still pass it as allowed dir -cli_session = CLISession(CLI_WORKSPACE, INTERNAL_API_URL, [CLI_WORKSPACE]) +# CLI Session Manager - each conversation gets its own CLI instance +cli_session_manager = CLISessionManager( + workspace_path=CLI_WORKSPACE, + api_url=INTERNAL_API_URL, + allowed_dirs=[CLI_WORKSPACE], + max_sessions=int(os.getenv("MAX_CLI_SESSIONS", "10")), +) # Session storage and message queue (stored in internal data path, not user's project) from providers.session_store import SessionStore @@ -322,12 +328,22 @@ def register_bot_handlers(client: "TelegramClient"): ALLOWED_USER_ID = os.getenv("ALLOWED_TELEGRAM_USER_ID") logger.info(f"DEBUG: Registering bot handlers. Allowed user ID: {ALLOWED_USER_ID}") + async def send_error_to_user(chat_id: int, error_msg: str, context: str = ""): + """Send a formatted error message to the user.""" + try: + formatted = f"❌ **Error**" + if context: + formatted += f" ({context})" + formatted += f"\n\n```\n{str(error_msg)[:500]}\n```" + await client.send_message(chat_id, formatted, parse_mode="markdown") + except Exception as e: + logger.error(f"Failed to send error to user: {e}") + async def process_claude_task(session_id_to_resume: Optional[str], queued_msg: QueuedMessage): """ Core task processor - handles a single Claude CLI interaction. - Can be called directly or by the queue manager. + Now uses CLISessionManager for multi-instance support. """ - event = queued_msg.event prompt = queued_msg.prompt status_msg_id = queued_msg.reply_msg_id chat_id = queued_msg.chat_id @@ -338,12 +354,15 @@ def register_bot_handlers(client: "TelegramClient"): status_msg = await client.get_messages(chat_id, ids=status_msg_id) except Exception as e: logger.error(f"Failed to get status message: {e}") + await send_error_to_user(chat_id, str(e), "getting status message") return # Unified message accumulator message_parts = [] last_ui_update = 0 - captured_session_id = session_id_to_resume # May be updated from CLI output + captured_session_id = session_id_to_resume + temp_session_id = None # Track temp ID for new sessions + cli_session = None # The CLISession instance for this task def build_unified_message(status=None): lines = [] @@ -361,6 +380,8 @@ def register_bot_handlers(client: "TelegramClient"): lines.append(f"🤖 **Subagent:** {content}") elif part_type == "content": lines.append(content) + elif part_type == "error": + lines.append(f"⚠️ {content}") result = "\n".join(lines) if len(result) > 4000: @@ -381,26 +402,53 @@ def register_bot_handlers(client: "TelegramClient"): logger.debug(f"UI update failed: {e}") try: - # Start or resume the CLI session + # Get or create CLI session from the manager is_resume = session_id_to_resume is not None log_prefix = f"Resuming session {session_id_to_resume}" if is_resume else "Starting new session" logger.info(f"BOT: {log_prefix} for prompt: {prompt[:50]}...") - + + try: + cli_session, session_or_temp_id, is_new = await cli_session_manager.get_or_create_session( + session_id=session_id_to_resume + ) + if is_new: + temp_session_id = session_or_temp_id + logger.info(f"BOT: Created new CLI session with temp_id: {temp_session_id}") + else: + captured_session_id = session_or_temp_id + logger.info(f"BOT: Reusing CLI session: {captured_session_id}") + except RuntimeError as e: + # Max sessions reached + logger.warning(f"BOT: Session limit reached: {e}") + message_parts.append(("error", str(e))) + await update_bot_ui("⏳ **Session limit reached**", force=True) + return + except Exception as e: + logger.error(f"BOT: Failed to get/create session: {e}") + await send_error_to_user(chat_id, str(e), "creating session") + return + + # Process CLI events async for event_data in cli_session.start_task(prompt, session_id=session_id_to_resume): if not isinstance(event_data, dict): continue # Handle session_info event to capture session ID if event_data.get("type") == "session_info": - captured_session_id = event_data.get("session_id") - if captured_session_id and not is_resume: - # Save the session mapping for new sessions + real_session_id = event_data.get("session_id") + if real_session_id and temp_session_id: + # Register the real session ID + await cli_session_manager.register_real_session_id( + temp_session_id, real_session_id + ) + captured_session_id = real_session_id + # Save to session store for Telegram reply tracking session_store.save_session( - session_id=captured_session_id, + session_id=real_session_id, chat_id=chat_id, initial_msg_id=original_msg_id, ) - logger.info(f"BOT: Saved new session {captured_session_id} for msg {original_msg_id}") + logger.info(f"BOT: Registered session {temp_session_id} -> {real_session_id}") continue parsed = CLIParser.parse_event(event_data) @@ -411,6 +459,8 @@ def register_bot_handlers(client: "TelegramClient"): continue if "login" in raw_line.lower(): await client.send_message(chat_id, "⚠️ **Claude requires login. Run `claude` in terminal.**") + elif "error" in raw_line.lower(): + message_parts.append(("error", raw_line[:200])) continue if not parsed: @@ -459,15 +509,25 @@ def register_bot_handlers(client: "TelegramClient"): session_store.update_last_message(captured_session_id, status_msg.id) elif parsed["type"] == "error": - message_parts.append(("content", f"**Error:** {parsed['message']}")) + error_msg = parsed.get("message", "Unknown error") + message_parts.append(("error", f"**CLI Error:** {error_msg}")) await update_bot_ui("❌ **Error**", force=True) + except asyncio.CancelledError: + logger.info(f"BOT: Task cancelled for session {captured_session_id or temp_session_id}") + message_parts.append(("error", "Task was cancelled")) + await update_bot_ui("⏹ **Cancelled**", force=True) except Exception as e: - logger.error(f"Bot task failed: {e}") + import traceback + logger.error(f"Bot task failed: {e}\n{traceback.format_exc()}") try: - await client.send_message(chat_id, f"💥 **Failed:** {e}") + error_text = str(e)[:300] + await status_msg.edit( + f"💥 **Task Failed**\n\n```\n{error_text}\n```", + parse_mode="markdown" + ) except: - pass + await send_error_to_user(chat_id, str(e), "task execution") @client.on(events.NewMessage()) async def handle_telegram_message(event): @@ -482,17 +542,32 @@ def register_bot_handlers(client: "TelegramClient"): # 1. Handle Commands if event.text == "/stop": - await cli_session.stop() - await event.reply("⏹ **Claude process stopped.**") + await cli_session_manager.stop_all() + await event.reply("⏹ **All Claude sessions stopped.**") + return + + if event.text == "/stats": + stats = cli_session_manager.get_stats() + await event.reply( + f"📊 **Session Stats**\n\n" + f"• Active: {stats['active_sessions']}\n" + f"• Pending: {stats['pending_sessions']}\n" + f"• Busy: {stats['busy_count']}\n" + f"• Max: {stats['max_sessions']}" + ) return if event.text == "/queue": - # Show queue status (optional command) - await event.reply("📋 **Queue status:** Feature active. Reply to old messages to continue conversations.") + stats = cli_session_manager.get_stats() + await event.reply( + f"📋 **Queue Status**\n\n" + f"Active sessions: {stats['active_sessions']}/{stats['max_sessions']}\n" + f"Reply to old messages to continue conversations." + ) return # 2. Filter out bot's own status messages and empty text - if not event.text or any(event.text.startswith(p) for p in ["⏳", "💭", "🔧", "✅", "❌", "🚀", "🤖", "📋"]): + if not event.text or any(event.text.startswith(p) for p in ["⏳", "💭", "🔧", "✅", "❌", "🚀", "🤖", "📋", "📊", "🔄"]): return logger.info(f"BOT_TASK: {event.text}") @@ -510,14 +585,24 @@ def register_bot_handlers(client: "TelegramClient"): logger.info(f"BOT: No session found for reply to msg {reply_to_msg_id}, starting new session") # 4. Send initial status message - if session_id_to_resume: - if cli_session.is_busy: - queue_size = message_queue.get_queue_size(session_id_to_resume) + 1 - status_msg = await event.reply(f"📋 **Queued** (position {queue_size}) - Claude is busy, will process when ready...") + try: + if session_id_to_resume: + if message_queue.is_session_busy(session_id_to_resume): + queue_size = message_queue.get_queue_size(session_id_to_resume) + 1 + status_msg = await event.reply(f"📋 **Queued** (position {queue_size}) - waiting for previous request...") + else: + status_msg = await event.reply("🔄 **Continuing conversation...**") else: - status_msg = await event.reply("🔄 **Continuing conversation...**") - else: - status_msg = await event.reply("⏳ **Launching Claude CLI...**") + stats = cli_session_manager.get_stats() + if stats['active_sessions'] >= stats['max_sessions']: + status_msg = await event.reply( + f"⏳ **Waiting for slot...** ({stats['active_sessions']}/{stats['max_sessions']} sessions active)" + ) + else: + status_msg = await event.reply("⏳ **Launching new Claude CLI instance...**") + except Exception as e: + logger.error(f"Failed to send status message: {e}") + return # 5. Create queued message queued_msg = QueuedMessage( @@ -530,26 +615,24 @@ def register_bot_handlers(client: "TelegramClient"): # 6. Process or queue based on session state if session_id_to_resume and message_queue.is_session_busy(session_id_to_resume): - # Session is busy, queue the message + # Session is busy, queue the message for that specific session await message_queue.enqueue( session_id=session_id_to_resume, message=queued_msg, processor=process_claude_task, ) logger.info(f"BOT: Message queued for busy session {session_id_to_resume}") + elif session_id_to_resume: + # Resuming a free existing session - use queue to track busy state + await message_queue.enqueue( + session_id=session_id_to_resume, + message=queued_msg, + processor=process_claude_task, + ) else: - # Process immediately (either new session or free session) - if session_id_to_resume: - # Use the queue manager to track busy state - await message_queue.enqueue( - session_id=session_id_to_resume, - message=queued_msg, - processor=process_claude_task, - ) - else: - # New session - process directly - # We'll save the session ID once we get it from CLI - await process_claude_task(None, queued_msg) + # NEW session - process directly in a new task (parallel!) + # Each new message gets its own CLI instance immediately + asyncio.create_task(process_claude_task(None, queued_msg)) FAST_PREFIX_DETECTION = os.getenv("FAST_PREFIX_DETECTION", "true").lower() == "true"