new messages now get their own cli instance and replies are queued

This commit is contained in:
Alishahryar1 2026-01-28 21:19:16 -08:00
parent 8c5d3df323
commit 452c4e2bb1
3 changed files with 332 additions and 42 deletions

View file

@ -52,4 +52,5 @@ TELEGRAM_API_HASH="abcdef123456..."
ALLOWED_TELEGRAM_USER_ID="YOUR_TELEGRAM_USER_ID"
CLAUDE_WORKSPACE="./agent_workspace"
ALLOWED_DIR="C:/Users/YourName/projects/myproject"
MAX_CLI_SESSIONS=10 # Maximum parallel Claude CLI instances
WRAPPER_WS_URL="ws://localhost:8083/ws"

View file

@ -0,0 +1,206 @@
"""
CLI Session Manager for Multi-Instance Claude CLI Support
Manages a pool of CLISession instances, each handling one conversation.
This enables true parallel processing where multiple conversations run
simultaneously in separate CLI processes.
"""
import asyncio
import uuid
import logging
from typing import Dict, Optional, Tuple, List
from .claude_cli import CLISession
logger = logging.getLogger(__name__)
class CLISessionManager:
    """
    Manages multiple CLISession instances for parallel conversation processing.

    Each new conversation gets its own CLISession; replies to an existing
    conversation reuse the CLISession stored under that conversation's ID.

    A session is tracked in one of two tables:

    * ``_pending_sessions`` - keyed by a temporary ``pending_<hex>`` ID that is
      handed out at creation time, before the real session ID is known.
    * ``_sessions`` - keyed by the real session ID once
      ``register_real_session_id`` has promoted the session.

    ``_temp_to_real`` remembers the temp -> real translation for promoted
    sessions and is purged whenever the session it points at is dropped.
    """

    # Upper bound on idle sessions evicted by one capacity-triggered cleanup.
    _MAX_IDLE_EVICTIONS = 3

    def __init__(
        self,
        workspace_path: str,
        api_url: str,
        allowed_dirs: Optional[List[str]] = None,
        max_sessions: int = 10,
    ):
        """
        Initialize the session manager.

        Args:
            workspace_path: Working directory passed to every CLISession.
            api_url: API URL passed to every CLISession.
            allowed_dirs: Directories passed to every CLISession as its
                allowed directories; ``None`` is treated as an empty list.
            max_sessions: Maximum number of sessions (pending + active) that
                may exist at once.
        """
        self.workspace = workspace_path
        self.api_url = api_url
        self.allowed_dirs = allowed_dirs or []
        self.max_sessions = max_sessions
        # Active sessions: real_session_id -> CLISession
        self._sessions: Dict[str, "CLISession"] = {}
        # Pending sessions: temp_id -> CLISession (before we know real session ID)
        self._pending_sessions: Dict[str, "CLISession"] = {}
        # Mapping: temp_id -> real_session_id (for lookups after promotion)
        self._temp_to_real: Dict[str, str] = {}
        # Serializes coroutines that touch the tables above. Note that an
        # asyncio.Lock guards against interleaved coroutines, not OS threads.
        self._lock = asyncio.Lock()
        logger.info("CLISessionManager initialized (max_sessions=%s)", max_sessions)

    def _total_sessions(self) -> int:
        """Return the number of tracked sessions, pending and active."""
        return len(self._sessions) + len(self._pending_sessions)

    def _forget_temp_ids_unlocked(self, real_session_id: str) -> None:
        """
        Drop every temp -> real mapping that points at ``real_session_id``.

        Must be called while holding ``self._lock``.
        """
        stale = [
            temp for temp, real in self._temp_to_real.items()
            if real == real_session_id
        ]
        for temp in stale:
            del self._temp_to_real[temp]

    async def get_or_create_session(
        self, session_id: Optional[str] = None
    ) -> Tuple["CLISession", str, bool]:
        """
        Get an existing session or create a new one.

        Args:
            session_id: Optional existing session ID to resume. If it is
                ``None`` or not an active session, a new session is created.

        Returns:
            Tuple of (CLISession instance, session_id, is_new_session).
            For new sessions, session_id is a temporary ID until the real one
            is registered via ``register_real_session_id``.

        Raises:
            RuntimeError: If ``max_sessions`` is reached and evicting idle
                sessions did not free a slot.
        """
        async with self._lock:
            # Case 1: Resume existing session
            if session_id and session_id in self._sessions:
                logger.debug("Reusing existing session: %s", session_id)
                return self._sessions[session_id], session_id, False

            # Case 2: At capacity - try to free slots, then re-check
            if self._total_sessions() >= self.max_sessions:
                await self._cleanup_idle_sessions_unlocked()
                if self._total_sessions() >= self.max_sessions:
                    logger.warning("Max sessions (%s) reached", self.max_sessions)
                    raise RuntimeError(
                        f"Maximum concurrent sessions ({self.max_sessions}) reached. "
                        "Please wait for existing conversations to complete."
                    )

            # Case 3: Create new session under a temporary ID
            temp_id = f"pending_{uuid.uuid4().hex[:8]}"
            new_session = CLISession(
                workspace_path=self.workspace,
                api_url=self.api_url,
                allowed_dirs=self.allowed_dirs,
            )
            self._pending_sessions[temp_id] = new_session
            logger.info("Created new session with temp_id: %s", temp_id)
            return new_session, temp_id, True

    async def register_real_session_id(self, temp_id: str, real_session_id: str) -> bool:
        """
        Promote a pending session once its real session ID is known.

        Moves the session from the pending table to the active table and
        records the temp -> real mapping.

        Args:
            temp_id: The temporary ID returned by ``get_or_create_session``.
            real_session_id: The real session ID to file the session under.

        Returns:
            True if registration succeeded, False if ``temp_id`` is not pending.
        """
        async with self._lock:
            if temp_id not in self._pending_sessions:
                logger.warning("Temp session %s not found for registration", temp_id)
                return False
            session = self._pending_sessions.pop(temp_id)
            self._sessions[real_session_id] = session
            self._temp_to_real[temp_id] = real_session_id
            logger.info("Registered session: %s -> %s", temp_id, real_session_id)
            return True

    async def get_real_session_id(self, temp_id: str) -> Optional[str]:
        """Return the real session ID registered for ``temp_id``, or None."""
        async with self._lock:
            return self._temp_to_real.get(temp_id)

    async def remove_session(self, session_id: str) -> bool:
        """
        Stop a session and remove it from the manager.

        Args:
            session_id: Session ID to remove. May be a real ID, a still-pending
                temp ID, or a temp ID that has already been promoted.

        Returns:
            True if a session was removed, False if none matched.
        """
        async with self._lock:
            # Still pending: tracked under the temp ID itself.
            pending = self._pending_sessions.pop(session_id, None)
            if pending is not None:
                await pending.stop()
                logger.info("Removed pending session: %s", session_id)
                return True

            # A temp ID that was already promoted lives in the active table
            # under its real ID, so translate before looking it up.
            real_id = session_id
            if real_id not in self._sessions:
                real_id = self._temp_to_real.get(session_id, session_id)

            session = self._sessions.pop(real_id, None)
            if session is None:
                return False

            # Purge the mapping before awaiting stop() so a failing stop
            # cannot leave entries pointing at a session we no longer track.
            self._forget_temp_ids_unlocked(real_id)
            await session.stop()
            logger.info("Removed active session: %s", real_id)
            return True

    async def _cleanup_idle_sessions_unlocked(self):
        """
        Evict up to ``_MAX_IDLE_EVICTIONS`` active sessions that are not busy.

        Only the active table is considered; pending sessions are left alone.
        A session whose ``stop()`` raises is still evicted - the error is
        logged, matching ``stop_all``.

        Must be called while holding ``self._lock``.
        """
        idle_ids = [
            session_id
            for session_id, session in self._sessions.items()
            if not session.is_busy
        ]
        for session_id in idle_ids[: self._MAX_IDLE_EVICTIONS]:
            session = self._sessions.pop(session_id)
            self._forget_temp_ids_unlocked(session_id)
            try:
                await session.stop()
            except Exception as e:
                logger.error("Error stopping idle session %s: %s", session_id, e)
            logger.debug("Cleaned up idle session: %s", session_id)

    async def stop_all(self):
        """Stop every tracked session and clear all tables. Called on shutdown."""
        async with self._lock:
            all_sessions = list(self._sessions.values()) + list(
                self._pending_sessions.values()
            )
            for session in all_sessions:
                try:
                    await session.stop()
                except Exception as e:
                    # Keep going so one bad session cannot block shutdown.
                    logger.error("Error stopping session: %s", e)
            self._sessions.clear()
            self._pending_sessions.clear()
            self._temp_to_real.clear()
            logger.info("All sessions stopped")

    def get_stats(self) -> Dict:
        """
        Get current session statistics.

        Returns:
            Dict with ``active_sessions``, ``pending_sessions``,
            ``max_sessions`` and ``busy_count`` (busy sessions among the
            active ones only).
        """
        return {
            "active_sessions": len(self._sessions),
            "pending_sessions": len(self._pending_sessions),
            "max_sessions": self.max_sessions,
            "busy_count": sum(1 for s in self._sessions.values() if s.is_busy),
        }

167
server.py
View file

@ -25,7 +25,8 @@ from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import StreamingResponse, JSONResponse
import tiktoken
from providers.claude_cli import CLISession, CLIParser
from providers.claude_cli import CLIParser
from providers.cli_session_manager import CLISessionManager
# Optional: telethon for the bot
try:
@ -307,8 +308,13 @@ else:
INTERNAL_API_URL = "http://localhost:8082/v1"
# Initialize Global Instances
# CLI runs in CLI_WORKSPACE (user's project), but we still pass it as allowed dir
cli_session = CLISession(CLI_WORKSPACE, INTERNAL_API_URL, [CLI_WORKSPACE])
# CLI Session Manager - each conversation gets its own CLI instance
cli_session_manager = CLISessionManager(
workspace_path=CLI_WORKSPACE,
api_url=INTERNAL_API_URL,
allowed_dirs=[CLI_WORKSPACE],
max_sessions=int(os.getenv("MAX_CLI_SESSIONS", "10")),
)
# Session storage and message queue (stored in internal data path, not user's project)
from providers.session_store import SessionStore
@ -322,12 +328,22 @@ def register_bot_handlers(client: "TelegramClient"):
ALLOWED_USER_ID = os.getenv("ALLOWED_TELEGRAM_USER_ID")
logger.info(f"DEBUG: Registering bot handlers. Allowed user ID: {ALLOWED_USER_ID}")
async def send_error_to_user(chat_id: int, error_msg: str, context: str = ""):
    """
    Report a failure to a chat as a Markdown-formatted message.

    The message is an error heading, followed by ``context`` in parentheses
    when it is non-empty, then the first 500 characters of ``error_msg``
    inside a code fence. Any exception raised while building or sending the
    message is logged and swallowed, so this helper never raises.
    """
    try:
        heading = "❌ **Error**"
        suffix = f" ({context})" if context else ""
        snippet = str(error_msg)[:500]
        text = heading + suffix + f"\n\n```\n{snippet}\n```"
        await client.send_message(chat_id, text, parse_mode="markdown")
    except Exception as e:
        logger.error(f"Failed to send error to user: {e}")
async def process_claude_task(session_id_to_resume: Optional[str], queued_msg: QueuedMessage):
"""
Core task processor - handles a single Claude CLI interaction.
Can be called directly or by the queue manager.
Now uses CLISessionManager for multi-instance support.
"""
event = queued_msg.event
prompt = queued_msg.prompt
status_msg_id = queued_msg.reply_msg_id
chat_id = queued_msg.chat_id
@ -338,12 +354,15 @@ def register_bot_handlers(client: "TelegramClient"):
status_msg = await client.get_messages(chat_id, ids=status_msg_id)
except Exception as e:
logger.error(f"Failed to get status message: {e}")
await send_error_to_user(chat_id, str(e), "getting status message")
return
# Unified message accumulator
message_parts = []
last_ui_update = 0
captured_session_id = session_id_to_resume # May be updated from CLI output
captured_session_id = session_id_to_resume
temp_session_id = None # Track temp ID for new sessions
cli_session = None # The CLISession instance for this task
def build_unified_message(status=None):
lines = []
@ -361,6 +380,8 @@ def register_bot_handlers(client: "TelegramClient"):
lines.append(f"🤖 **Subagent:** {content}")
elif part_type == "content":
lines.append(content)
elif part_type == "error":
lines.append(f"⚠️ {content}")
result = "\n".join(lines)
if len(result) > 4000:
@ -381,26 +402,53 @@ def register_bot_handlers(client: "TelegramClient"):
logger.debug(f"UI update failed: {e}")
try:
# Start or resume the CLI session
# Get or create CLI session from the manager
is_resume = session_id_to_resume is not None
log_prefix = f"Resuming session {session_id_to_resume}" if is_resume else "Starting new session"
logger.info(f"BOT: {log_prefix} for prompt: {prompt[:50]}...")
try:
cli_session, session_or_temp_id, is_new = await cli_session_manager.get_or_create_session(
session_id=session_id_to_resume
)
if is_new:
temp_session_id = session_or_temp_id
logger.info(f"BOT: Created new CLI session with temp_id: {temp_session_id}")
else:
captured_session_id = session_or_temp_id
logger.info(f"BOT: Reusing CLI session: {captured_session_id}")
except RuntimeError as e:
# Max sessions reached
logger.warning(f"BOT: Session limit reached: {e}")
message_parts.append(("error", str(e)))
await update_bot_ui("⏳ **Session limit reached**", force=True)
return
except Exception as e:
logger.error(f"BOT: Failed to get/create session: {e}")
await send_error_to_user(chat_id, str(e), "creating session")
return
# Process CLI events
async for event_data in cli_session.start_task(prompt, session_id=session_id_to_resume):
if not isinstance(event_data, dict):
continue
# Handle session_info event to capture session ID
if event_data.get("type") == "session_info":
captured_session_id = event_data.get("session_id")
if captured_session_id and not is_resume:
# Save the session mapping for new sessions
real_session_id = event_data.get("session_id")
if real_session_id and temp_session_id:
# Register the real session ID
await cli_session_manager.register_real_session_id(
temp_session_id, real_session_id
)
captured_session_id = real_session_id
# Save to session store for Telegram reply tracking
session_store.save_session(
session_id=captured_session_id,
session_id=real_session_id,
chat_id=chat_id,
initial_msg_id=original_msg_id,
)
logger.info(f"BOT: Saved new session {captured_session_id} for msg {original_msg_id}")
logger.info(f"BOT: Registered session {temp_session_id} -> {real_session_id}")
continue
parsed = CLIParser.parse_event(event_data)
@ -411,6 +459,8 @@ def register_bot_handlers(client: "TelegramClient"):
continue
if "login" in raw_line.lower():
await client.send_message(chat_id, "⚠️ **Claude requires login. Run `claude` in terminal.**")
elif "error" in raw_line.lower():
message_parts.append(("error", raw_line[:200]))
continue
if not parsed:
@ -459,15 +509,25 @@ def register_bot_handlers(client: "TelegramClient"):
session_store.update_last_message(captured_session_id, status_msg.id)
elif parsed["type"] == "error":
message_parts.append(("content", f"**Error:** {parsed['message']}"))
error_msg = parsed.get("message", "Unknown error")
message_parts.append(("error", f"**CLI Error:** {error_msg}"))
await update_bot_ui("❌ **Error**", force=True)
except asyncio.CancelledError:
logger.info(f"BOT: Task cancelled for session {captured_session_id or temp_session_id}")
message_parts.append(("error", "Task was cancelled"))
await update_bot_ui("⏹ **Cancelled**", force=True)
except Exception as e:
logger.error(f"Bot task failed: {e}")
import traceback
logger.error(f"Bot task failed: {e}\n{traceback.format_exc()}")
try:
await client.send_message(chat_id, f"💥 **Failed:** {e}")
error_text = str(e)[:300]
await status_msg.edit(
f"💥 **Task Failed**\n\n```\n{error_text}\n```",
parse_mode="markdown"
)
except:
pass
await send_error_to_user(chat_id, str(e), "task execution")
@client.on(events.NewMessage())
async def handle_telegram_message(event):
@ -482,17 +542,32 @@ def register_bot_handlers(client: "TelegramClient"):
# 1. Handle Commands
if event.text == "/stop":
await cli_session.stop()
await event.reply("⏹ **Claude process stopped.**")
await cli_session_manager.stop_all()
await event.reply("⏹ **All Claude sessions stopped.**")
return
if event.text == "/stats":
stats = cli_session_manager.get_stats()
await event.reply(
f"📊 **Session Stats**\n\n"
f"• Active: {stats['active_sessions']}\n"
f"• Pending: {stats['pending_sessions']}\n"
f"• Busy: {stats['busy_count']}\n"
f"• Max: {stats['max_sessions']}"
)
return
if event.text == "/queue":
# Show queue status (optional command)
await event.reply("📋 **Queue status:** Feature active. Reply to old messages to continue conversations.")
stats = cli_session_manager.get_stats()
await event.reply(
f"📋 **Queue Status**\n\n"
f"Active sessions: {stats['active_sessions']}/{stats['max_sessions']}\n"
f"Reply to old messages to continue conversations."
)
return
# 2. Filter out bot's own status messages and empty text
if not event.text or any(event.text.startswith(p) for p in ["", "💭", "🔧", "", "", "🚀", "🤖", "📋"]):
if not event.text or any(event.text.startswith(p) for p in ["", "💭", "🔧", "", "", "🚀", "🤖", "📋", "📊", "🔄"]):
return
logger.info(f"BOT_TASK: {event.text}")
@ -510,14 +585,24 @@ def register_bot_handlers(client: "TelegramClient"):
logger.info(f"BOT: No session found for reply to msg {reply_to_msg_id}, starting new session")
# 4. Send initial status message
if session_id_to_resume:
if cli_session.is_busy:
queue_size = message_queue.get_queue_size(session_id_to_resume) + 1
status_msg = await event.reply(f"📋 **Queued** (position {queue_size}) - Claude is busy, will process when ready...")
try:
if session_id_to_resume:
if message_queue.is_session_busy(session_id_to_resume):
queue_size = message_queue.get_queue_size(session_id_to_resume) + 1
status_msg = await event.reply(f"📋 **Queued** (position {queue_size}) - waiting for previous request...")
else:
status_msg = await event.reply("🔄 **Continuing conversation...**")
else:
status_msg = await event.reply("🔄 **Continuing conversation...**")
else:
status_msg = await event.reply("⏳ **Launching Claude CLI...**")
stats = cli_session_manager.get_stats()
if stats['active_sessions'] >= stats['max_sessions']:
status_msg = await event.reply(
f"⏳ **Waiting for slot...** ({stats['active_sessions']}/{stats['max_sessions']} sessions active)"
)
else:
status_msg = await event.reply("⏳ **Launching new Claude CLI instance...**")
except Exception as e:
logger.error(f"Failed to send status message: {e}")
return
# 5. Create queued message
queued_msg = QueuedMessage(
@ -530,26 +615,24 @@ def register_bot_handlers(client: "TelegramClient"):
# 6. Process or queue based on session state
if session_id_to_resume and message_queue.is_session_busy(session_id_to_resume):
# Session is busy, queue the message
# Session is busy, queue the message for that specific session
await message_queue.enqueue(
session_id=session_id_to_resume,
message=queued_msg,
processor=process_claude_task,
)
logger.info(f"BOT: Message queued for busy session {session_id_to_resume}")
elif session_id_to_resume:
# Resuming a free existing session - use queue to track busy state
await message_queue.enqueue(
session_id=session_id_to_resume,
message=queued_msg,
processor=process_claude_task,
)
else:
# Process immediately (either new session or free session)
if session_id_to_resume:
# Use the queue manager to track busy state
await message_queue.enqueue(
session_id=session_id_to_resume,
message=queued_msg,
processor=process_claude_task,
)
else:
# New session - process directly
# We'll save the session ID once we get it from CLI
await process_claude_task(None, queued_msg)
# NEW session - process directly in a new task (parallel!)
# Each new message gets its own CLI instance immediately
asyncio.create_task(process_claude_task(None, queued_msg))
FAST_PREFIX_DETECTION = os.getenv("FAST_PREFIX_DETECTION", "true").lower() == "true"