eigent/backend/app/agent/toolkit/terminal_toolkit.py
2026-05-01 10:11:24 +08:00

450 lines
16 KiB
Python

# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
import asyncio
import logging
import os
import platform
import shutil
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor
from camel.toolkits.terminal_toolkit import (
TerminalToolkit as BaseTerminalToolkit,
)
from camel.toolkits.terminal_toolkit.terminal_toolkit import _to_plain
from app.agent.toolkit.abstract_toolkit import AbstractToolkit
from app.component.environment import env
from app.service.task import (
Action,
ActionTerminalData,
Agents,
get_task_lock,
process_task,
)
from app.utils.listen.toolkit_listen import auto_listen_toolkit
logger = logging.getLogger("terminal_toolkit")
# App version - should match electron app version
# TODO: Consider getting this from a shared config
APP_VERSION = "0.0.90"
def get_terminal_base_venv_path() -> str:
"""Get the path to the terminal base venv created during app installation."""
return os.path.join(
os.path.expanduser("~"),
".eigent",
"venvs",
f"terminal_base-{APP_VERSION}",
)
@auto_listen_toolkit(BaseTerminalToolkit)
class TerminalToolkit(BaseTerminalToolkit, AbstractToolkit):
agent_name: str = Agents.developer_agent
_thread_pool: ThreadPoolExecutor | None = None
_thread_local = threading.local()
def __init__(
self,
api_task_id: str,
agent_name: str | None = None,
timeout: float | None = None,
working_directory: str | None = None,
use_docker_backend: bool = False,
docker_container_name: str | None = None,
session_logs_dir: str | None = None,
safe_mode: bool = True,
allowed_commands: list[str] | None = None,
clone_current_env: bool = True,
):
self.api_task_id = api_task_id
if agent_name is not None:
self.agent_name = agent_name
# Get base directory from environment
base_dir = env(
"file_save_path", os.path.expanduser("~/.eigent/terminal/")
)
if working_directory is None:
working_directory = base_dir
self._agent_venv_dir = os.path.join(base_dir, self.agent_name)
logger.debug(
f"Initializing TerminalToolkit for agent={self.agent_name}",
extra={
"api_task_id": api_task_id,
"working_directory": working_directory,
"agent_venv_dir": self._agent_venv_dir,
},
)
if TerminalToolkit._thread_pool is None:
TerminalToolkit._thread_pool = ThreadPoolExecutor(
max_workers=1, thread_name_prefix="terminal_toolkit"
)
super().__init__(
timeout=timeout,
working_directory=working_directory,
use_docker_backend=use_docker_backend,
docker_container_name=docker_container_name,
session_logs_dir=session_logs_dir,
safe_mode=safe_mode,
allowed_commands=allowed_commands,
clone_current_env=True,
install_dependencies=[],
)
# Auto-register with TaskLock for cleanup when task ends
from app.service.task import get_task_lock_if_exists
task_lock = get_task_lock_if_exists(api_task_id)
if task_lock:
task_lock.register_toolkit(self)
logger.info(
"TerminalToolkit registered for cleanup",
extra={
"api_task_id": api_task_id,
"working_directory": working_directory,
},
)
def _setup_cloned_environment(self):
"""Override to clone from terminal_base venv instead of current process venv.
Creates a lightweight clone using symlinks to the terminal_base venv,
which contains pre-installed packages (pandas, numpy, matplotlib, etc.).
"""
self.cloned_env_path = os.path.join(self._agent_venv_dir, ".venv")
terminal_base_path = get_terminal_base_venv_path()
# Check if terminal_base exists
if platform.system() == "Windows":
base_python = os.path.join(
terminal_base_path, "Scripts", "python.exe"
)
else:
base_python = os.path.join(terminal_base_path, "bin", "python")
if not os.path.exists(base_python):
logger.warning(
f"Terminal base venv not found at {terminal_base_path}, "
"falling back to system Python"
)
return
# Check if cloned env already exists
if platform.system() == "Windows":
cloned_python = os.path.join(
self.cloned_env_path, "Scripts", "python.exe"
)
else:
cloned_python = os.path.join(self.cloned_env_path, "bin", "python")
if os.path.exists(cloned_python):
logger.info(
f"Using existing cloned environment: {self.cloned_env_path}"
)
self.python_executable = cloned_python
return
logger.info(f"Cloning terminal_base venv to: {self.cloned_env_path}")
try:
# Create the cloned venv directory
os.makedirs(self.cloned_env_path, exist_ok=True)
# Clone using symlinks for efficiency
# We need to create proper venv structure with symlinks to terminal_base
self._clone_venv_with_symlinks(
terminal_base_path, self.cloned_env_path
)
self.python_executable = cloned_python
logger.info(
f"Successfully cloned environment to: {self.cloned_env_path}"
)
except Exception as e:
logger.error(
f"Failed to clone terminal_base venv: {e}", exc_info=True
)
# Cleanup partial clone
if os.path.exists(self.cloned_env_path):
shutil.rmtree(self.cloned_env_path, ignore_errors=True)
logger.warning("Falling back to system Python")
def _get_venv_path(self):
"""Return the cloned venv path for shell activation."""
cloned_env_path = getattr(self, "cloned_env_path", None)
if cloned_env_path and os.path.exists(cloned_env_path):
return cloned_env_path
return None
def _clone_venv_with_symlinks(self, source_venv: str, target_venv: str):
"""Clone a venv using symlinks for efficiency.
Creates the structure needed: pyvenv.cfg, bin/python, lib symlink, and activate scripts.
"""
is_windows = platform.system() == "Windows"
# Read source pyvenv.cfg to get Python home
source_cfg = os.path.join(source_venv, "pyvenv.cfg")
python_home = None
with open(source_cfg, encoding="utf-8") as f:
for line in f:
if line.startswith("home = "):
python_home = line.split("=", 1)[1].strip()
break
if not python_home:
raise RuntimeError(
f"Could not determine Python home from {source_cfg}"
)
# Copy pyvenv.cfg (simpler than recreating)
shutil.copy2(source_cfg, os.path.join(target_venv, "pyvenv.cfg"))
if is_windows:
# Windows: copy executables from source
target_bin = os.path.join(target_venv, "Scripts")
os.makedirs(target_bin, exist_ok=True)
source_scripts = os.path.join(source_venv, "Scripts")
for exe in ["python.exe", "pythonw.exe"]:
src = os.path.join(source_scripts, exe)
if os.path.exists(src):
shutil.copy2(src, os.path.join(target_bin, exe))
# Copy activate scripts (need to modify VIRTUAL_ENV path)
for script in ["activate.bat", "activate.ps1", "deactivate.bat"]:
src = os.path.join(source_scripts, script)
if os.path.exists(src):
with open(src, encoding="utf-8") as f:
content = f.read()
content = content.replace(source_venv, target_venv)
dst = os.path.join(target_bin, script)
with open(dst, "w", encoding="utf-8") as f:
f.write(content)
# Use directory junction for Lib (no admin rights needed, unlike symlink)
source_lib = os.path.join(source_venv, "Lib")
target_lib = os.path.join(target_venv, "Lib")
subprocess.run(
["cmd", "/c", "mklink", "/J", target_lib, source_lib],
check=True,
capture_output=True,
)
else:
# Unix: symlink python executable and lib directory
target_bin = os.path.join(target_venv, "bin")
os.makedirs(target_bin, exist_ok=True)
# Symlink python to the base Python
python_exe = os.path.join(python_home, "python3")
if not os.path.exists(python_exe):
python_exe = os.path.join(python_home, "python")
os.symlink(python_exe, os.path.join(target_bin, "python"))
os.symlink("python", os.path.join(target_bin, "python3"))
# Copy activate scripts (need to modify VIRTUAL_ENV path)
source_bin = os.path.join(source_venv, "bin")
for script in ["activate", "activate.csh", "activate.fish"]:
src = os.path.join(source_bin, script)
if os.path.exists(src):
with open(src) as f:
content = f.read()
# Replace source venv path with target venv path
content = content.replace(source_venv, target_venv)
dst = os.path.join(target_bin, script)
with open(dst, "w") as f:
f.write(content)
# Symlink lib directory
source_lib = os.path.join(source_venv, "lib")
os.symlink(source_lib, os.path.join(target_venv, "lib"))
def _write_to_log(self, log_file: str, content: str) -> None:
r"""Write content to log file with optional ANSI stripping.
Args:
log_file (str): Path to the log file
content (str): Content to write
"""
# Convert ANSI escape sequences to plain text
super()._write_to_log(log_file, content)
logger.debug(
"Terminal output logged",
extra={
"api_task_id": self.api_task_id,
"log_file": log_file,
"content_length": len(content),
},
)
self._update_terminal_output(_to_plain(content))
def _update_terminal_output(self, output: str):
task_lock = get_task_lock(self.api_task_id)
process_task_id = process_task.get("")
# Create the coroutine
coro = task_lock.put_queue(
ActionTerminalData(
action=Action.terminal,
process_task_id=process_task_id,
data=output,
)
)
# Try to get the current event loop, if none exists, create a new one in a thread
try:
loop = asyncio.get_running_loop()
# If we're in an async context, schedule the coroutine
task = loop.create_task(coro)
if hasattr(task_lock, "add_background_task"):
task_lock.add_background_task(task)
except RuntimeError:
self._thread_pool.submit(self._run_coro_in_thread, coro, task_lock)
@staticmethod
def _run_coro_in_thread(coro, task_lock):
"""
Execute coro in the thread pool, with each thread bound to a long-term event loop
"""
if not hasattr(TerminalToolkit._thread_local, "loop"):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
TerminalToolkit._thread_local.loop = loop
else:
loop = TerminalToolkit._thread_local.loop
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
TerminalToolkit._thread_local.loop = loop
try:
task = loop.create_task(coro)
if hasattr(task_lock, "add_background_task"):
task_lock.add_background_task(task)
loop.run_until_complete(task)
except Exception as e:
logging.error(
f"Failed to execute coroutine in thread pool: {str(e)}",
exc_info=True,
)
def shell_exec(
self,
command: str,
id: str | None = None,
block: bool = True,
timeout: float = 20.0,
) -> str:
r"""Executes a shell command in blocking or non-blocking mode.
Args:
command (str): The shell command to execute.
id (str, optional): A unique identifier for the command's session.
If not provided, a unique ID will be automatically generated.
block (bool, optional): Determines the execution mode. Defaults to True.
timeout (float, optional): Timeout in seconds for blocking mode. Defaults to 20.0.
Returns:
str: The output of the command execution.
"""
# Auto-generate ID if not provided
if id is None:
import time
id = f"auto_{int(time.time() * 1000)}"
result = super().shell_exec(
id=id, command=command, block=block, timeout=timeout
)
# If the command executed successfully but returned empty output,
# provide a clear success message to help the AI agent understand
# that the command completed without error.
if block and result == "":
return "Command executed successfully (no output)."
return result
def cleanup(self, remove_venv: bool = True):
"""Clean up all active sessions and optionally remove the virtual environment.
Args:
remove_venv: If True, removes the .venv or .initial_env folder created
by this toolkit. Defaults to True to prevent disk bloat.
"""
# First call parent cleanup to kill all shell sessions
super().cleanup()
if not remove_venv:
return
# Remove cloned env (.venv) if it exists
cloned_env_path = getattr(self, "cloned_env_path", None)
if cloned_env_path and os.path.exists(cloned_env_path):
try:
shutil.rmtree(cloned_env_path)
logger.info(
"Removed cloned venv",
extra={
"api_task_id": self.api_task_id,
"path": cloned_env_path,
},
)
except Exception as e:
logger.warning(
"Failed to remove cloned venv",
extra={
"api_task_id": self.api_task_id,
"path": cloned_env_path,
"error": str(e),
},
)
# Remove initial env (.initial_env) if it exists
initial_env_path = getattr(self, "initial_env_path", None)
if initial_env_path and os.path.exists(initial_env_path):
try:
shutil.rmtree(initial_env_path)
logger.info(
"Removed initial env",
extra={
"api_task_id": self.api_task_id,
"path": initial_env_path,
},
)
except Exception as e:
logger.warning(
"Failed to remove initial env",
extra={
"api_task_id": self.api_task_id,
"path": initial_env_path,
"error": str(e),
},
)
@classmethod
def shutdown(cls):
if cls._thread_pool:
cls._thread_pool.shutdown(wait=True)
cls._thread_pool = None