diff --git a/koboldcpp.py b/koboldcpp.py
index 080d7ea93..52156d45f 100755
--- a/koboldcpp.py
+++ b/koboldcpp.py
@@ -9,6 +9,7 @@
 # scenarios and everything Kobold and KoboldAI Lite have to offer.
 
 import os
+import queue
 try:
     os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" # try set GPU to PCI order first thing
 except Exception:
@@ -481,11 +482,18 @@ class MCPStdioClient:
         self.lock = threading.Lock()
         self.stderr_buffer = []
         self.stderr_limit = 20
-        self.alive = True
+        self.stderr_alive = True
+        self.stdout_alive = True
+        self.stdout_queue = queue.Queue()
         self.stderr_thread = threading.Thread(
             target=self._read_stderr,
             daemon=True
         )
+        self.stdout_thread = threading.Thread(
+            target=self._read_stdout,
+            daemon=True
+        )
+        self.stdout_thread.start()
         self.stderr_thread.start()
     def _read_stderr(self):
         try:
@@ -497,23 +505,70 @@ class MCPStdioClient:
                 if len(self.stderr_buffer) > self.stderr_limit:
                     self.stderr_buffer.pop(0)
         finally:
-            self.alive = False
+            self.stderr_alive = False
+    def _read_stdout(self):
+        # Reader thread: parse each stdout line as JSON and queue dict messages that carry an int/str "id".
+        try:
+            for line in self.process.stdout:
+                if not line:
+                    break
+                line = line.rstrip("\n")
+                if line:
+                    try:
+                        data = json.loads(line)
+                        if isinstance(data, dict) and isinstance(data.get("id"), (int, str)):
+                            self.stdout_queue.put(data)
+                    except json.JSONDecodeError:
+                        # Ignore lines that aren't valid JSON
+                        pass
+        finally:
+            # Tells send() that no further responses can arrive
+            self.stdout_alive = False
+    def _empty_queue(self):
+        # Drain any stale responses belonging to orphans or other IDs
+        while True: # Queue.empty() is only a hint, so loop until get_nowait() reports Empty
+            try:
+                self.stdout_queue.get_nowait()
+            except queue.Empty:
+                break
     def send(self, message: dict, await_response=True) -> dict:
         # Send JSON-RPC request and wait for one response.
+        # Returns the response dict whose id matches the request, or None if await_response is False.
+        # Raises RuntimeError if stdin is closed, stdout closes, or no matching response arrives within 30s.
+        import time # function-scope import, only needed for the response deadline
         line = json.dumps(message)
-        with self.lock:
+        expected_id = message.get("id")
+        response = None
+        with self.lock: # single request in flight; stale replies are drained first, stragglers are filtered by id below
             if self.process.stdin.closed:
                 raise RuntimeError("MCP server stdin is closed")
+            self._empty_queue()
             self.process.stdin.write(line + "\n")
             self.process.stdin.flush()
             if not await_response:
                 return None
-            response = self.process.stdout.readline()
+            deadline = time.monotonic() + 30 # short 30s timeout in case stalled
+            while response is None:
+                remaining = deadline - time.monotonic()
+                if remaining <= 0:
+                    break
+                try:
+                    # Wait in short slices so a dead reader thread is noticed without burning the full timeout
+                    data = self.stdout_queue.get(timeout=min(remaining, 0.5))
+                except queue.Empty:
+                    if not self.stdout_alive and self.stdout_queue.empty():
+                        break # server closed stdout, nothing more will arrive
+                    continue
+                if "method" in data:
+                    continue # server-initiated request/notification, not a response
+                if expected_id is not None and data.get("id") != expected_id:
+                    continue # late reply to an earlier request that already timed out
+                response = data
             if not response:
                 errmsg = "\n".join(self.stderr_buffer[-10:])
                 print(f"[MCP Server Error!]\n{errmsg}")
-                raise RuntimeError("MCP server closed stdout")
-            return json.loads(response)
+                raise RuntimeError("MCP server stdio response failed")
+            return response
     def notify(self, message: dict) -> None:
         # Send JSON-RPC notification (no response expected).
         line = json.dumps(message)
         with self.lock: