fix(code_execution): avoid double-close of PTY master fd

Use a shared mutable holder for the POSIX PTY master fd and invalidate it before close. This keeps EOF cleanup and TTYSession.close()/kill() idempotent and prevents closing an unrelated resource if the OS reuses the old fd number.
This commit is contained in:
Agent Zero Local 2026-05-02 20:56:22 +00:00
parent a0f0c2e8d2
commit d4eaa7c030

View file

@ -25,6 +25,7 @@ class TTYSession:
self._buf: asyncio.Queue = None # type: ignore
self._pump_task = None
self._pty_master = None
self._pty_master_ref = None
def __del__(self):
# Simple cleanup on object destruction
@ -48,7 +49,12 @@ class TTYSession:
self._proc = await _spawn_posix_pty(
self.cmd, self.cwd, self.env, self.echo
) # ← pass echo
self._pty_master = getattr(self._proc, "_pty_master", None)
self._pty_master_ref = getattr(self._proc, "_pty_master_ref", None)
self._pty_master = (
self._pty_master_ref.get("fd")
if self._pty_master_ref is not None
else getattr(self._proc, "_pty_master", None)
)
self._pump_task = asyncio.create_task(self._pump_stdout())
async def close(self):
@ -62,14 +68,6 @@ class TTYSession:
except Exception:
pass
master = self._pty_master
if master is not None:
try:
loop = asyncio.get_running_loop()
loop.remove_reader(master)
except Exception:
pass
# Terminate the process if it exists
if self._proc:
try:
@ -84,21 +82,47 @@ class TTYSession:
except Exception:
pass
if master is not None:
try:
os.close(master)
except OSError:
pass
self._release_pty_master()
self._proc = None
self._pump_task = None
def _release_pty_master(self):
"""Release the POSIX PTY master exactly once.
The fd number is invalidated before os.close() so that a concurrent or
later cleanup path cannot close the same integer after the OS has reused
it for another file/socket.
"""
ref = self._pty_master_ref
master = ref.get("fd") if ref is not None else self._pty_master
if master is None:
self._pty_master = None
return
if ref is not None:
ref["fd"] = None
self._pty_master = None
try:
loop = asyncio.get_running_loop()
loop.remove_reader(master)
except Exception:
pass
try:
os.close(master)
except OSError:
pass
self._pty_master_ref = None
async def send(self, data: str | bytes):
if self._proc is None:
raise RuntimeError("TTYSpawn is not started")
if self._pty_master is None and not _IS_WIN:
raise RuntimeError("TTYSpawn PTY is closed")
if not _IS_WIN:
master = (
self._pty_master_ref.get("fd")
if self._pty_master_ref is not None
else self._pty_master
)
if master is None:
raise RuntimeError("TTYSpawn PTY is closed")
if getattr(self._proc, "returncode", None) is not None:
raise RuntimeError("TTYSpawn process has exited")
if isinstance(data, str):
@ -108,7 +132,7 @@ class TTYSession:
await self._proc.stdin.drain() # type: ignore
except OSError as e:
if e.errno in (errno.EBADF, errno.EIO, errno.EINVAL):
self._pty_master = None
self._release_pty_master()
raise RuntimeError("TTYSpawn PTY is closed") from e
raise
@ -140,18 +164,7 @@ class TTYSession:
except ProcessLookupError:
# Child already gone treat as successfully killed
pass
master = self._pty_master
if master is not None:
try:
loop = asyncio.get_running_loop()
loop.remove_reader(master)
except Exception:
pass
try:
os.close(master)
except OSError:
pass
self._pty_master = None
self._release_pty_master()
async def read(self, timeout=None):
# Return any decoded text the child produced, or None on timeout
@ -226,10 +239,34 @@ async def _spawn_posix_pty(cmd, cwd, env, echo):
loop = asyncio.get_running_loop()
reader = asyncio.StreamReader()
master_ref = {"fd": master}
def _release_master_fd():
cur = master_ref.get("fd")
if cur is None:
return
# Invalidate before close so later cleanup cannot close a reused fd.
master_ref["fd"] = None
try:
proc._pty_master = None # type: ignore[attr-defined]
except Exception:
pass
try:
loop.remove_reader(cur)
except Exception:
pass
try:
os.close(cur)
except OSError:
pass
def _on_data():
cur = master_ref.get("fd")
if cur is None:
reader.feed_eof()
return
try:
data = os.read(master, 1 << 16)
data = os.read(cur, 1 << 16)
except OSError as e:
if e.errno != errno.EIO: # EIO == EOF on some systems
raise
@ -238,20 +275,16 @@ async def _spawn_posix_pty(cmd, cwd, env, echo):
reader.feed_data(data)
else:
reader.feed_eof()
try:
loop.remove_reader(master)
except Exception:
pass
try:
os.close(master)
except OSError:
pass
_release_master_fd()
loop.add_reader(master, _on_data)
class _Stdin:
def write(self, d):
os.write(master, d)
cur = master_ref.get("fd")
if cur is None:
raise OSError(errno.EBADF, "PTY master closed")
os.write(cur, d)
async def drain(self):
await asyncio.sleep(0)
@ -259,6 +292,7 @@ async def _spawn_posix_pty(cmd, cwd, env, echo):
proc.stdin = _Stdin() # type: ignore
proc.stdout = reader
proc._pty_master = master # type: ignore[attr-defined]
proc._pty_master_ref = master_ref # type: ignore[attr-defined]
return proc