# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= import asyncio import logging import os import platform import shutil import subprocess import threading from concurrent.futures import ThreadPoolExecutor from app.agent.toolkit.abstract_toolkit import AbstractToolkit from app.component.environment import env from app.service.task import (Action, ActionTerminalData, Agents, get_task_lock, process_task) from app.utils.listen.toolkit_listen import auto_listen_toolkit from camel.toolkits.terminal_toolkit import \ TerminalToolkit as BaseTerminalToolkit from camel.toolkits.terminal_toolkit.terminal_toolkit import _to_plain logger = logging.getLogger("terminal_toolkit") # App version - should match electron app version # TODO: Consider getting this from a shared config APP_VERSION = "0.0.87" def get_terminal_base_venv_path() -> str: """Get the path to the terminal base venv created during app installation.""" return os.path.join( os.path.expanduser("~"), ".eigent", "venvs", f"terminal_base-{APP_VERSION}", ) @auto_listen_toolkit(BaseTerminalToolkit) class TerminalToolkit(BaseTerminalToolkit, AbstractToolkit): agent_name: str = Agents.developer_agent _thread_pool: ThreadPoolExecutor | None = None _thread_local = threading.local() def __init__( self, api_task_id: str, agent_name: str | None = None, timeout: float | None = None, working_directory: str | None = None, use_docker_backend: bool = False, docker_container_name: str | None = None, session_logs_dir: str | None = None, safe_mode: bool = True, allowed_commands: list[str] | None = None, clone_current_env: bool = True, ): self.api_task_id = api_task_id if agent_name is not None: self.agent_name = agent_name # Get base directory from environment base_dir = env( "file_save_path", os.path.expanduser("~/.eigent/terminal/") ) if working_directory is None: working_directory = base_dir self._agent_venv_dir = os.path.join(base_dir, self.agent_name) logger.debug( f"Initializing TerminalToolkit for agent={self.agent_name}", extra={ "api_task_id": api_task_id, "working_directory": working_directory, "agent_venv_dir": self._agent_venv_dir, }, ) if TerminalToolkit._thread_pool is None: TerminalToolkit._thread_pool = ThreadPoolExecutor( max_workers=1, thread_name_prefix="terminal_toolkit" ) super().__init__( timeout=timeout, working_directory=working_directory, use_docker_backend=use_docker_backend, docker_container_name=docker_container_name, session_logs_dir=session_logs_dir, safe_mode=safe_mode, allowed_commands=allowed_commands, clone_current_env=True, install_dependencies=[], ) # Auto-register with TaskLock for cleanup when task ends from app.service.task import get_task_lock_if_exists task_lock = get_task_lock_if_exists(api_task_id) if task_lock: task_lock.register_toolkit(self) logger.info( "TerminalToolkit registered for cleanup", extra={ "api_task_id": api_task_id, "working_directory": working_directory, }, ) def _setup_cloned_environment(self): """Override to clone from terminal_base venv instead of current process venv. Creates a lightweight clone using symlinks to the terminal_base venv, which contains pre-installed packages (pandas, numpy, matplotlib, etc.). """ self.cloned_env_path = os.path.join(self._agent_venv_dir, ".venv") terminal_base_path = get_terminal_base_venv_path() # Check if terminal_base exists if platform.system() == "Windows": base_python = os.path.join( terminal_base_path, "Scripts", "python.exe" ) else: base_python = os.path.join(terminal_base_path, "bin", "python") if not os.path.exists(base_python): logger.warning( f"Terminal base venv not found at {terminal_base_path}, " "falling back to system Python" ) return # Check if cloned env already exists if platform.system() == "Windows": cloned_python = os.path.join( self.cloned_env_path, "Scripts", "python.exe" ) else: cloned_python = os.path.join(self.cloned_env_path, "bin", "python") if os.path.exists(cloned_python): logger.info( f"Using existing cloned environment: {self.cloned_env_path}" ) self.python_executable = cloned_python return logger.info(f"Cloning terminal_base venv to: {self.cloned_env_path}") try: # Create the cloned venv directory os.makedirs(self.cloned_env_path, exist_ok=True) # Clone using symlinks for efficiency # We need to create proper venv structure with symlinks to terminal_base self._clone_venv_with_symlinks( terminal_base_path, self.cloned_env_path ) self.python_executable = cloned_python logger.info( f"Successfully cloned environment to: {self.cloned_env_path}" ) except Exception as e: logger.error( f"Failed to clone terminal_base venv: {e}", exc_info=True ) # Cleanup partial clone if os.path.exists(self.cloned_env_path): shutil.rmtree(self.cloned_env_path, ignore_errors=True) logger.warning("Falling back to system Python") def _get_venv_path(self): """Return the cloned venv path for shell activation.""" cloned_env_path = getattr(self, "cloned_env_path", None) if cloned_env_path and os.path.exists(cloned_env_path): return cloned_env_path return None def _clone_venv_with_symlinks(self, source_venv: str, target_venv: str): """Clone a venv using symlinks for efficiency. Creates the structure needed: pyvenv.cfg, bin/python, lib symlink, and activate scripts. """ is_windows = platform.system() == "Windows" # Read source pyvenv.cfg to get Python home source_cfg = os.path.join(source_venv, "pyvenv.cfg") python_home = None with open(source_cfg, encoding="utf-8") as f: for line in f: if line.startswith("home = "): python_home = line.split("=", 1)[1].strip() break if not python_home: raise RuntimeError( f"Could not determine Python home from {source_cfg}" ) # Copy pyvenv.cfg (simpler than recreating) shutil.copy2(source_cfg, os.path.join(target_venv, "pyvenv.cfg")) if is_windows: # Windows: copy executables from source target_bin = os.path.join(target_venv, "Scripts") os.makedirs(target_bin, exist_ok=True) source_scripts = os.path.join(source_venv, "Scripts") for exe in ["python.exe", "pythonw.exe"]: src = os.path.join(source_scripts, exe) if os.path.exists(src): shutil.copy2(src, os.path.join(target_bin, exe)) # Copy activate scripts (need to modify VIRTUAL_ENV path) for script in ["activate.bat", "activate.ps1", "deactivate.bat"]: src = os.path.join(source_scripts, script) if os.path.exists(src): with open(src, encoding="utf-8") as f: content = f.read() content = content.replace(source_venv, target_venv) dst = os.path.join(target_bin, script) with open(dst, "w", encoding="utf-8") as f: f.write(content) # Use directory junction for Lib (no admin rights needed, unlike symlink) source_lib = os.path.join(source_venv, "Lib") target_lib = os.path.join(target_venv, "Lib") subprocess.run( ["cmd", "/c", "mklink", "/J", target_lib, source_lib], check=True, capture_output=True, ) else: # Unix: symlink python executable and lib directory target_bin = os.path.join(target_venv, "bin") os.makedirs(target_bin, exist_ok=True) # Symlink python to the base Python python_exe = os.path.join(python_home, "python3") if not os.path.exists(python_exe): python_exe = os.path.join(python_home, "python") os.symlink(python_exe, os.path.join(target_bin, "python")) os.symlink("python", os.path.join(target_bin, "python3")) # Copy activate scripts (need to modify VIRTUAL_ENV path) source_bin = os.path.join(source_venv, "bin") for script in ["activate", "activate.csh", "activate.fish"]: src = os.path.join(source_bin, script) if os.path.exists(src): with open(src) as f: content = f.read() # Replace source venv path with target venv path content = content.replace(source_venv, target_venv) dst = os.path.join(target_bin, script) with open(dst, "w") as f: f.write(content) # Symlink lib directory source_lib = os.path.join(source_venv, "lib") os.symlink(source_lib, os.path.join(target_venv, "lib")) def _write_to_log(self, log_file: str, content: str) -> None: r"""Write content to log file with optional ANSI stripping. Args: log_file (str): Path to the log file content (str): Content to write """ # Convert ANSI escape sequences to plain text super()._write_to_log(log_file, content) logger.debug( "Terminal output logged", extra={ "api_task_id": self.api_task_id, "log_file": log_file, "content_length": len(content), }, ) self._update_terminal_output(_to_plain(content)) def _update_terminal_output(self, output: str): task_lock = get_task_lock(self.api_task_id) process_task_id = process_task.get("") # Create the coroutine coro = task_lock.put_queue( ActionTerminalData( action=Action.terminal, process_task_id=process_task_id, data=output, ) ) # Try to get the current event loop, if none exists, create a new one in a thread try: loop = asyncio.get_running_loop() # If we're in an async context, schedule the coroutine task = loop.create_task(coro) if hasattr(task_lock, "add_background_task"): task_lock.add_background_task(task) except RuntimeError: self._thread_pool.submit(self._run_coro_in_thread, coro, task_lock) @staticmethod def _run_coro_in_thread(coro, task_lock): """ Execute coro in the thread pool, with each thread bound to a long-term event loop """ if not hasattr(TerminalToolkit._thread_local, "loop"): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) TerminalToolkit._thread_local.loop = loop else: loop = TerminalToolkit._thread_local.loop if loop.is_closed(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) TerminalToolkit._thread_local.loop = loop try: task = loop.create_task(coro) if hasattr(task_lock, "add_background_task"): task_lock.add_background_task(task) loop.run_until_complete(task) except Exception as e: logging.error( f"Failed to execute coroutine in thread pool: {str(e)}", exc_info=True, ) def shell_exec( self, command: str, id: str | None = None, block: bool = True, timeout: float = 20.0, ) -> str: r"""Executes a shell command in blocking or non-blocking mode. Args: command (str): The shell command to execute. id (str, optional): A unique identifier for the command's session. If not provided, a unique ID will be automatically generated. block (bool, optional): Determines the execution mode. Defaults to True. timeout (float, optional): Timeout in seconds for blocking mode. Defaults to 20.0. Returns: str: The output of the command execution. """ # Auto-generate ID if not provided if id is None: import time id = f"auto_{int(time.time() * 1000)}" result = super().shell_exec( id=id, command=command, block=block, timeout=timeout ) # If the command executed successfully but returned empty output, # provide a clear success message to help the AI agent understand # that the command completed without error. if block and result == "": return "Command executed successfully (no output)." return result def cleanup(self, remove_venv: bool = True): """Clean up all active sessions and optionally remove the virtual environment. Args: remove_venv: If True, removes the .venv or .initial_env folder created by this toolkit. Defaults to True to prevent disk bloat. """ # First call parent cleanup to kill all shell sessions super().cleanup() if not remove_venv: return # Remove cloned env (.venv) if it exists cloned_env_path = getattr(self, "cloned_env_path", None) if cloned_env_path and os.path.exists(cloned_env_path): try: shutil.rmtree(cloned_env_path) logger.info( "Removed cloned venv", extra={ "api_task_id": self.api_task_id, "path": cloned_env_path, }, ) except Exception as e: logger.warning( "Failed to remove cloned venv", extra={ "api_task_id": self.api_task_id, "path": cloned_env_path, "error": str(e), }, ) # Remove initial env (.initial_env) if it exists initial_env_path = getattr(self, "initial_env_path", None) if initial_env_path and os.path.exists(initial_env_path): try: shutil.rmtree(initial_env_path) logger.info( "Removed initial env", extra={ "api_task_id": self.api_task_id, "path": initial_env_path, }, ) except Exception as e: logger.warning( "Failed to remove initial env", extra={ "api_task_id": self.api_task_id, "path": initial_env_path, "error": str(e), }, ) @classmethod def shutdown(cls): if cls._thread_pool: cls._thread_pool.shutdown(wait=True) cls._thread_pool = None