handle notifications in mcp

This commit is contained in:
Concedo 2026-03-17 15:13:42 +08:00
parent f31b040941
commit 39f9007d12

View file

@ -40,6 +40,7 @@ from typing import Tuple
import shutil
import subprocess
import gzip
import queue
# constants
sampler_order_max = 7
@ -536,6 +537,15 @@ class MCPStdioClient:
daemon=True
)
self.stderr_thread.start()
self._pending = {}
self._pending_lock = threading.Lock()
self.stdout_thread = threading.Thread(
target=self._read_stdout,
daemon=True
)
self.stdout_thread.start()
def _read_stderr(self):
try:
for line in self.process.stderr:
@ -547,22 +557,51 @@ class MCPStdioClient:
self.stderr_buffer.pop(0)
finally:
self.alive = False
def _read_stdout(self): # notifications (no id) are silently dropped
try:
for line in self.process.stdout:
if not line:
break
msg = json.loads(line)
msg_id = msg.get("id")
if msg_id is not None:
with self._pending_lock:
q = self._pending.get(msg_id)
if q:
q.put(msg)
finally:
self.alive = False
with self._pending_lock:
for q in self._pending.values():
q.put(None) # unblock any waiting send()
def send(self, message: dict, await_response=True) -> dict: # Send JSON-RPC request and wait for one response.
def send(self, message: dict, await_response=True) -> dict: # Send JSON-RPC request and wait for response.
line = json.dumps(message)
with self.lock:
msg_id = message.get("id")
response_q = queue.Queue()
if await_response and msg_id is not None:
with self._pending_lock:
self._pending[msg_id] = response_q
with self.lock: # write lock only now
if self.process.stdin.closed:
raise RuntimeError("MCP server stdin is closed")
self.process.stdin.write(line + "\n")
self.process.stdin.flush()
if not await_response:
return None
response = self.process.stdout.readline()
if not response:
if not await_response:
return None
try:
response = response_q.get(timeout=30)
finally:
with self._pending_lock:
self._pending.pop(msg_id, None)
if response is None:
errmsg = "\n".join(self.stderr_buffer[-10:])
print(f"[MCP Server Error!]\n{errmsg}")
raise RuntimeError("MCP server closed stdout")
return json.loads(response)
return response
def notify(self, message: dict) -> None: # Send JSON-RPC notification (no response expected).
line = json.dumps(message)
with self.lock: