diff --git a/koboldcpp.py b/koboldcpp.py index 52156d45f..080d7ea93 100755 --- a/koboldcpp.py +++ b/koboldcpp.py @@ -9,7 +9,6 @@ # scenarios and everything Kobold and KoboldAI Lite have to offer. import os -import queue try: os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" # try set GPU to PCI order first thing except Exception: @@ -482,18 +481,11 @@ class MCPStdioClient: self.lock = threading.Lock() self.stderr_buffer = [] self.stderr_limit = 20 - self.stderr_alive = True - self.stdout_alive = True - self.stdout_queue = queue.Queue() + self.alive = True self.stderr_thread = threading.Thread( target=self._read_stderr, daemon=True ) - self.stdout_thread = threading.Thread( - target=self._read_stdout, - daemon=True - ) - self.stdout_thread.start() self.stderr_thread.start() def _read_stderr(self): try: @@ -505,51 +497,23 @@ class MCPStdioClient: if len(self.stderr_buffer) > self.stderr_limit: self.stderr_buffer.pop(0) finally: - self.stderr_alive = False - def _read_stdout(self): - try: - for line in self.process.stdout: - if not line: - break - line = line.rstrip("\n") - if line: - try: - data = json.loads(line) - if isinstance(data, dict) and isinstance(data.get("id"), (int, str)): - self.stdout_queue.put(data) - except json.JSONDecodeError: - # Ignore lines that aren't valid JSON - pass - finally: - self.stdout_alive = False - def _empty_queue(self): - # Drain any stale responses belonging to orphans or other IDs - while not self.stdout_queue.empty(): - try: - self.stdout_queue.get_nowait() - except queue.Empty: - break + self.alive = False def send(self, message: dict, await_response=True) -> dict: # Send JSON-RPC request and wait for one response. line = json.dumps(message) - response = None - with self.lock: #idea: since we use a lock and empty queue before/after, we guarantee that any ID belongs to us, single request in flight only + with self.lock: if self.process.stdin.closed: raise RuntimeError("MCP server stdin is closed") - self._empty_queue() self.process.stdin.write(line + "\n") self.process.stdin.flush() if not await_response: return None - try: - response = self.stdout_queue.get(timeout=30) # short 30s timeout in case stalled - except queue.Empty: - response = None + response = self.process.stdout.readline() if not response: errmsg = "\n".join(self.stderr_buffer[-10:]) print(f"[MCP Server Error!]\n{errmsg}") - raise RuntimeError("MCP server stdio response failed") - return response + raise RuntimeError("MCP server closed stdout") + return json.loads(response) def notify(self, message: dict) -> None: # Send JSON-RPC notification (no response expected). line = json.dumps(message) with self.lock: