try improve mcp

This commit is contained in:
Concedo 2026-02-10 15:33:39 +08:00
parent e6d271db05
commit 10bf868088

View file

@ -9,6 +9,7 @@
# scenarios and everything Kobold and KoboldAI Lite have to offer.
import os
import queue
try:
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" # try set GPU to PCI order first thing
except Exception:
@ -481,11 +482,18 @@ class MCPStdioClient:
self.lock = threading.Lock()
self.stderr_buffer = []
self.stderr_limit = 20
self.alive = True
self.stderr_alive = True
self.stdout_alive = True
self.stdout_queue = queue.Queue()
self.stderr_thread = threading.Thread(
target=self._read_stderr,
daemon=True
)
self.stdout_thread = threading.Thread(
target=self._read_stdout,
daemon=True
)
self.stdout_thread.start()
self.stderr_thread.start()
def _read_stderr(self):
try:
@ -497,23 +505,51 @@ class MCPStdioClient:
if len(self.stderr_buffer) > self.stderr_limit:
self.stderr_buffer.pop(0)
finally:
self.alive = False
self.stderr_alive = False
def _read_stdout(self):
try:
for line in self.process.stdout:
if not line:
break
line = line.rstrip("\n")
if line:
try:
data = json.loads(line)
if isinstance(data, dict) and isinstance(data.get("id"), (int, str)):
self.stdout_queue.put(data)
except json.JSONDecodeError:
# Ignore lines that aren't valid JSON
pass
finally:
self.stdout_alive = False
def _empty_queue(self):
# Drain any stale responses belonging to orphans or other IDs
while not self.stdout_queue.empty():
try:
self.stdout_queue.get_nowait()
except queue.Empty:
break
def send(self, message: dict, await_response=True) -> dict: # Send JSON-RPC request and wait for one response.
line = json.dumps(message)
with self.lock:
response = None
with self.lock: #idea: since we use a lock and empty queue before/after, we guarantee that any ID belongs to us, single request in flight only
if self.process.stdin.closed:
raise RuntimeError("MCP server stdin is closed")
self._empty_queue()
self.process.stdin.write(line + "\n")
self.process.stdin.flush()
if not await_response:
return None
response = self.process.stdout.readline()
try:
response = self.stdout_queue.get(timeout=30) # short 30s timeout in case stalled
except queue.Empty:
response = None
if not response:
errmsg = "\n".join(self.stderr_buffer[-10:])
print(f"[MCP Server Error!]\n{errmsg}")
raise RuntimeError("MCP server closed stdout")
return json.loads(response)
raise RuntimeError("MCP server stdio response failed")
return response
def notify(self, message: dict) -> None: # Send JSON-RPC notification (no response expected).
line = json.dumps(message)
with self.lock: