From 2ce1947b0fc02ac12a8817a1804ff5b40abe5f8a Mon Sep 17 00:00:00 2001 From: Agent Zero Local Date: Sat, 2 May 2026 13:07:27 +0000 Subject: [PATCH 1/4] fix(code_execution): close PTY file descriptors Store and close POSIX PTY master descriptors when terminal sessions are closed or killed, and make local terminal session shutdown await the full TTY cleanup path. This prevents leaked /dev/ptmx descriptors from exhausting the process file descriptor limit. --- .../_code_execution/helpers/shell_local.py | 4 +- .../_code_execution/helpers/tty_session.py | 59 +++++++++++++++++-- 2 files changed, 57 insertions(+), 6 deletions(-) diff --git a/plugins/_code_execution/helpers/shell_local.py b/plugins/_code_execution/helpers/shell_local.py index 797cc7fa0..13530999c 100644 --- a/plugins/_code_execution/helpers/shell_local.py +++ b/plugins/_code_execution/helpers/shell_local.py @@ -21,8 +21,8 @@ class LocalInteractiveSession: async def close(self): if self.session: - self.session.kill() - # self.session.wait() + await self.session.close() + self.session = None async def send_command(self, command: str): if not self.session: diff --git a/plugins/_code_execution/helpers/tty_session.py b/plugins/_code_execution/helpers/tty_session.py index 384daf552..d39537ae1 100644 --- a/plugins/_code_execution/helpers/tty_session.py +++ b/plugins/_code_execution/helpers/tty_session.py @@ -23,6 +23,8 @@ class TTYSession: self.echo = echo # ← store preference self._proc = None self._buf: asyncio.Queue = None # type: ignore + self._pump_task = None + self._pty_master = None def __del__(self): # Simple cleanup on object destruction @@ -46,22 +48,51 @@ class TTYSession: self._proc = await _spawn_posix_pty( self.cmd, self.cwd, self.env, self.echo ) # ← pass echo + self._pty_master = getattr(self._proc, "_pty_master", None) self._pump_task = asyncio.create_task(self._pump_stdout()) async def close(self): # Cancel the pump task if it exists - if hasattr(self, "_pump_task") and self._pump_task: + if self._pump_task: self._pump_task.cancel() try: await self._pump_task except asyncio.CancelledError: pass + except Exception: + pass + + master = self._pty_master + if master is not None: + try: + loop = asyncio.get_running_loop() + loop.remove_reader(master) + except Exception: + pass + # Terminate the process if it exists if self._proc: - self._proc.terminate() - await self._proc.wait() + try: + if getattr(self._proc, "returncode", None) is None: + self._proc.terminate() + except ProcessLookupError: + pass + except Exception: + pass + try: + await self._proc.wait() + except Exception: + pass + + if master is not None: + try: + os.close(master) + except OSError: + pass + self._proc = None self._pump_task = None + self._pty_master = None async def send(self, data: str | bytes): if self._proc is None: @@ -99,6 +130,18 @@ class TTYSession: except ProcessLookupError: # Child already gone – treat as successfully killed pass + master = self._pty_master + if master is not None: + try: + loop = asyncio.get_running_loop() + loop.remove_reader(master) + except Exception: + pass + try: + os.close(master) + except OSError: + pass + self._pty_master = None async def read(self, timeout=None): # Return any decoded text the child produced, or None on timeout @@ -185,7 +228,14 @@ async def _spawn_posix_pty(cmd, cwd, env, echo): reader.feed_data(data) else: reader.feed_eof() - loop.remove_reader(master) + try: + loop.remove_reader(master) + except Exception: + pass + try: + os.close(master) + except OSError: + pass loop.add_reader(master, _on_data) @@ -198,6 +248,7 @@ async def _spawn_posix_pty(cmd, cwd, env, echo): proc.stdin = _Stdin() # type: ignore proc.stdout = reader + proc._pty_master = master # type: ignore[attr-defined] return proc From a0f0c2e8d2ca12f194fe50c553eda8b2824084fa Mon Sep 17 00:00:00 2001 From: Agent Zero Local Date: Sat, 2 May 2026 13:34:27 +0000 Subject: [PATCH 2/4] fix(code_execution): recover from closed PTY sessions Detect closed or exited local TTY sessions before writing, convert invalid PTY write errors into retryable session failures, and reset/retry the terminal session once after send/read failures. --- plugins/_code_execution/helpers/tty_session.py | 14 ++++++++++++-- .../_code_execution/tools/code_execution_tool.py | 9 ++++----- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/plugins/_code_execution/helpers/tty_session.py b/plugins/_code_execution/helpers/tty_session.py index d39537ae1..bd8013d79 100644 --- a/plugins/_code_execution/helpers/tty_session.py +++ b/plugins/_code_execution/helpers/tty_session.py @@ -97,10 +97,20 @@ class TTYSession: async def send(self, data: str | bytes): if self._proc is None: raise RuntimeError("TTYSpawn is not started") + if self._pty_master is None and not _IS_WIN: + raise RuntimeError("TTYSpawn PTY is closed") + if getattr(self._proc, "returncode", None) is not None: + raise RuntimeError("TTYSpawn process has exited") if isinstance(data, str): data = data.encode(self.encoding) - self._proc.stdin.write(data) # type: ignore - await self._proc.stdin.drain() # type: ignore + try: + self._proc.stdin.write(data) # type: ignore + await self._proc.stdin.drain() # type: ignore + except OSError as e: + if e.errno in (errno.EBADF, errno.EIO, errno.EINVAL): + self._pty_master = None + raise RuntimeError("TTYSpawn PTY is closed") from e + raise async def sendline(self, line: str): await self.send(line + "\n") diff --git a/plugins/_code_execution/tools/code_execution_tool.py b/plugins/_code_execution/tools/code_execution_tool.py index 6ebec1d61..45566b00f 100644 --- a/plugins/_code_execution/tools/code_execution_tool.py +++ b/plugins/_code_execution/tools/code_execution_tool.py @@ -194,12 +194,11 @@ class CodeExecution(Tool): ) except Exception as e: - if i == 1: - PrintStyle.error(str(e)) - await self.prepare_state(cfg, reset=True, session=session) + PrintStyle.error(str(e)) + await self.prepare_state(cfg, reset=True, session=session) + if i == 0: continue - else: - raise e + raise e def format_command_for_output(self, command: str): short_cmd = command[:250] From d4eaa7c0305eea10d97694971d747df1e891eb40 Mon Sep 17 00:00:00 2001 From: Agent Zero Local Date: Sat, 2 May 2026 20:56:22 +0000 Subject: [PATCH 3/4] fix(code_execution): avoid double-close of PTY master fd Use a shared mutable holder for the POSIX PTY master fd and invalidate it before close. This keeps EOF cleanup and TTYSession.close()/kill() idempotent and prevents closing an unrelated resource if the OS reuses the old fd number. --- .../_code_execution/helpers/tty_session.py | 114 ++++++++++++------ 1 file changed, 74 insertions(+), 40 deletions(-) diff --git a/plugins/_code_execution/helpers/tty_session.py b/plugins/_code_execution/helpers/tty_session.py index bd8013d79..444b11816 100644 --- a/plugins/_code_execution/helpers/tty_session.py +++ b/plugins/_code_execution/helpers/tty_session.py @@ -25,6 +25,7 @@ class TTYSession: self._buf: asyncio.Queue = None # type: ignore self._pump_task = None self._pty_master = None + self._pty_master_ref = None def __del__(self): # Simple cleanup on object destruction @@ -48,7 +49,12 @@ class TTYSession: self._proc = await _spawn_posix_pty( self.cmd, self.cwd, self.env, self.echo ) # ← pass echo - self._pty_master = getattr(self._proc, "_pty_master", None) + self._pty_master_ref = getattr(self._proc, "_pty_master_ref", None) + self._pty_master = ( + self._pty_master_ref.get("fd") + if self._pty_master_ref is not None + else getattr(self._proc, "_pty_master", None) + ) self._pump_task = asyncio.create_task(self._pump_stdout()) async def close(self): @@ -62,14 +68,6 @@ class TTYSession: except Exception: pass - master = self._pty_master - if master is not None: - try: - loop = asyncio.get_running_loop() - loop.remove_reader(master) - except Exception: - pass - # Terminate the process if it exists if self._proc: try: @@ -84,21 +82,47 @@ class TTYSession: except Exception: pass - if master is not None: - try: - os.close(master) - except OSError: - pass - + self._release_pty_master() self._proc = None self._pump_task = None + + def _release_pty_master(self): + """Release the POSIX PTY master exactly once. + + The fd number is invalidated before os.close() so that a concurrent or + later cleanup path cannot close the same integer after the OS has reused + it for another file/socket. + """ + ref = self._pty_master_ref + master = ref.get("fd") if ref is not None else self._pty_master + if master is None: + self._pty_master = None + return + if ref is not None: + ref["fd"] = None self._pty_master = None + try: + loop = asyncio.get_running_loop() + loop.remove_reader(master) + except Exception: + pass + try: + os.close(master) + except OSError: + pass + self._pty_master_ref = None async def send(self, data: str | bytes): if self._proc is None: raise RuntimeError("TTYSpawn is not started") - if self._pty_master is None and not _IS_WIN: - raise RuntimeError("TTYSpawn PTY is closed") + if not _IS_WIN: + master = ( + self._pty_master_ref.get("fd") + if self._pty_master_ref is not None + else self._pty_master + ) + if master is None: + raise RuntimeError("TTYSpawn PTY is closed") if getattr(self._proc, "returncode", None) is not None: raise RuntimeError("TTYSpawn process has exited") if isinstance(data, str): @@ -108,7 +132,7 @@ class TTYSession: await self._proc.stdin.drain() # type: ignore except OSError as e: if e.errno in (errno.EBADF, errno.EIO, errno.EINVAL): - self._pty_master = None + self._release_pty_master() raise RuntimeError("TTYSpawn PTY is closed") from e raise @@ -140,18 +164,7 @@ class TTYSession: except ProcessLookupError: # Child already gone – treat as successfully killed pass - master = self._pty_master - if master is not None: - try: - loop = asyncio.get_running_loop() - loop.remove_reader(master) - except Exception: - pass - try: - os.close(master) - except OSError: - pass - self._pty_master = None + self._release_pty_master() async def read(self, timeout=None): # Return any decoded text the child produced, or None on timeout @@ -226,10 +239,34 @@ async def _spawn_posix_pty(cmd, cwd, env, echo): loop = asyncio.get_running_loop() reader = asyncio.StreamReader() + master_ref = {"fd": master} + + def _release_master_fd(): + cur = master_ref.get("fd") + if cur is None: + return + # Invalidate before close so later cleanup cannot close a reused fd. + master_ref["fd"] = None + try: + proc._pty_master = None # type: ignore[attr-defined] + except Exception: + pass + try: + loop.remove_reader(cur) + except Exception: + pass + try: + os.close(cur) + except OSError: + pass def _on_data(): + cur = master_ref.get("fd") + if cur is None: + reader.feed_eof() + return try: - data = os.read(master, 1 << 16) + data = os.read(cur, 1 << 16) except OSError as e: if e.errno != errno.EIO: # EIO == EOF on some systems raise @@ -238,20 +275,16 @@ async def _spawn_posix_pty(cmd, cwd, env, echo): reader.feed_data(data) else: reader.feed_eof() - try: - loop.remove_reader(master) - except Exception: - pass - try: - os.close(master) - except OSError: - pass + _release_master_fd() loop.add_reader(master, _on_data) class _Stdin: def write(self, d): - os.write(master, d) + cur = master_ref.get("fd") + if cur is None: + raise OSError(errno.EBADF, "PTY master closed") + os.write(cur, d) async def drain(self): await asyncio.sleep(0) @@ -259,6 +292,7 @@ async def _spawn_posix_pty(cmd, cwd, env, echo): proc.stdin = _Stdin() # type: ignore proc.stdout = reader proc._pty_master = master # type: ignore[attr-defined] + proc._pty_master_ref = master_ref # type: ignore[attr-defined] return proc From eecbb5ba344cbb9f780e6c0c14af1a4b16219144 Mon Sep 17 00:00:00 2001 From: Agent Zero Local Date: Sat, 2 May 2026 21:33:08 +0000 Subject: [PATCH 4/4] fix(code_execution): handle closed PTYs while reading output --- .../_code_execution/helpers/shell_local.py | 9 +++- .../tools/code_execution_tool.py | 50 +++++++++++++++---- 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/plugins/_code_execution/helpers/shell_local.py b/plugins/_code_execution/helpers/shell_local.py index 13530999c..d10c4d3c7 100644 --- a/plugins/_code_execution/helpers/shell_local.py +++ b/plugins/_code_execution/helpers/shell_local.py @@ -21,8 +21,15 @@ class LocalInteractiveSession: async def close(self): if self.session: - await self.session.close() + session = self.session self.session = None + try: + await session.close() + except Exception: + try: + session.kill() + except Exception: + pass async def send_command(self, command: str): if not self.session: diff --git a/plugins/_code_execution/tools/code_execution_tool.py b/plugins/_code_execution/tools/code_execution_tool.py index 45566b00f..188f03aa1 100644 --- a/plugins/_code_execution/tools/code_execution_tool.py +++ b/plugins/_code_execution/tools/code_execution_tool.py @@ -1,4 +1,5 @@ import asyncio +import errno from dataclasses import dataclass import re import shlex @@ -15,6 +16,17 @@ from plugins._code_execution.helpers.shell_local import LocalInteractiveSession from plugins._code_execution.helpers.shell_ssh import SSHInteractiveSession +def _is_closed_pty_error(exc: BaseException) -> bool: + if isinstance(exc, RuntimeError) and "TTYSpawn PTY is closed" in str(exc): + return True + if isinstance(exc, OSError) and exc.errno in (errno.EBADF, errno.EIO, errno.EINVAL): + return True + cause = getattr(exc, "__cause__", None) + if cause and cause is not exc: + return _is_closed_pty_error(cause) + return False + + @dataclass class ShellWrap: id: int @@ -194,11 +206,12 @@ class CodeExecution(Tool): ) except Exception as e: - PrintStyle.error(str(e)) - await self.prepare_state(cfg, reset=True, session=session) - if i == 0: + if _is_closed_pty_error(e) and i == 0: + PrintStyle.warning(f"Terminal session {session} was closed; resetting and retrying once.") + await self.prepare_state(cfg, reset=True, session=session) continue - raise e + PrintStyle.error(str(e)) + raise def format_command_for_output(self, command: str): short_cmd = command[:250] @@ -244,9 +257,19 @@ class CodeExecution(Tool): while True: await asyncio.sleep(sleep_time) - full_output, partial_output = await self.state.shells[session].session.read_output( - timeout=1, reset_full_output=reset_full_output - ) + try: + full_output, partial_output = await self.state.shells[session].session.read_output( + timeout=1, reset_full_output=reset_full_output + ) + except Exception as e: + if _is_closed_pty_error(e): + await self.prepare_state(cfg, reset=True, session=session) + self.mark_session_idle(session) + sysinfo = "Terminal session was closed and has been reset. Please run the command again." + response = self.agent.read_prompt("fw.code.info.md", info=sysinfo) + self.log.update(content=prefix + response) + return response + raise reset_full_output = False # only reset once await self.agent.handle_intervention() @@ -363,9 +386,16 @@ class CodeExecution(Tool): prompt_patterns = cfg["prompt_patterns"] dialog_patterns = cfg["dialog_patterns"] - full_output, _ = await self.state.shells[session].session.read_output( - timeout=1, reset_full_output=reset_full_output - ) + try: + full_output, _ = await self.state.shells[session].session.read_output( + timeout=1, reset_full_output=reset_full_output + ) + except Exception as e: + if _is_closed_pty_error(e): + await self.prepare_state(cfg, reset=True, session=session) + self.mark_session_idle(session) + return None + raise truncated_output = self.fix_full_output(full_output) self.set_progress(truncated_output) heading = self.get_heading_from_output(truncated_output, 0)