diff --git a/koboldcpp.py b/koboldcpp.py index 73c7a5c57..dd8f16dd4 100755 --- a/koboldcpp.py +++ b/koboldcpp.py @@ -40,6 +40,7 @@ from typing import Tuple import shutil import subprocess import gzip +import queue # constants sampler_order_max = 7 @@ -536,6 +537,15 @@ class MCPStdioClient: daemon=True ) self.stderr_thread.start() + + self._pending = {} + self._pending_lock = threading.Lock() + self.stdout_thread = threading.Thread( + target=self._read_stdout, + daemon=True + ) + self.stdout_thread.start() + def _read_stderr(self): try: for line in self.process.stderr: @@ -547,22 +557,51 @@ class MCPStdioClient: self.stderr_buffer.pop(0) finally: self.alive = False + def _read_stdout(self): # notifications (no id) are silently dropped + try: + for line in self.process.stdout: + if not line: + break + msg = json.loads(line) + msg_id = msg.get("id") + if msg_id is not None: + with self._pending_lock: + q = self._pending.get(msg_id) + if q: + q.put(msg) + finally: + self.alive = False + with self._pending_lock: + for q in self._pending.values(): + q.put(None) # unblock any waiting send() - def send(self, message: dict, await_response=True) -> dict: # Send JSON-RPC request and wait for one response. + def send(self, message: dict, await_response=True) -> dict: # Send JSON-RPC request and wait for response. line = json.dumps(message) - with self.lock: + msg_id = message.get("id") + + response_q = queue.Queue() + if await_response and msg_id is not None: + with self._pending_lock: + self._pending[msg_id] = response_q + + with self.lock: # write lock only now if self.process.stdin.closed: raise RuntimeError("MCP server stdin is closed") self.process.stdin.write(line + "\n") self.process.stdin.flush() - if not await_response: - return None - response = self.process.stdout.readline() - if not response: + + if not await_response: + return None + try: + response = response_q.get(timeout=30) + finally: + with self._pending_lock: + self._pending.pop(msg_id, None) + if response is None: errmsg = "\n".join(self.stderr_buffer[-10:]) print(f"[MCP Server Error!]\n{errmsg}") raise RuntimeError("MCP server closed stdout") - return json.loads(response) + return response def notify(self, message: dict) -> None: # Send JSON-RPC notification (no response expected). line = json.dumps(message) with self.lock: