# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. ========= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. ========= import os import platform import re import shutil import subprocess import sys import tempfile import venv from typing import Optional, Set, Tuple from camel.logger import get_logger logger = get_logger(__name__) # Pre-compiled regex patterns for command safety checks _CMD_PATTERN = re.compile( r'(?:^|;|\||&&)\s*\b([a-zA-Z_/][\w\-/]*)', re.IGNORECASE ) _QUOTED_STRING_PATTERN = re.compile(r'''["'][^"']*["']''') # Match cd command with optional quoted or unquoted path # Handles: cd path, cd "path with spaces", cd 'path with spaces' _CD_PATTERN = re.compile(r'\bcd\s+(["\'][^"\']*["\']|[^\s;|&]+)') def check_command_safety( command: str, allowed_commands: Optional[Set[str]] = None, ) -> Tuple[bool, str]: r"""Check if a command (potentially with chaining) is safe to execute. Args: command (str): The command string to check allowed_commands (Optional[Set[str]]): Set of allowed commands (whitelist mode) Returns: Tuple[bool, str]: (is_safe, reason) """ if not command.strip(): return False, "Empty command is not allowed." # Dangerous commands list - including ALL rm operations dangerous_commands = [ # System administration 'sudo', 'su', 'reboot', 'shutdown', 'halt', 'poweroff', 'init', # File system manipulation 'rm', 'chown', 'chgrp', 'umount', 'mount', # Disk operations 'dd', 'mkfs', 'fdisk', 'parted', 'fsck', 'mkswap', 'swapon', 'swapoff', # Process management 'service', 'systemctl', 'systemd', # Network configuration 'iptables', 'ip6tables', 'ifconfig', 'route', 'iptables-save', # Cron and scheduling 'crontab', 'at', 'batch', # User management 'useradd', 'userdel', 'usermod', 'passwd', 'chpasswd', 'newgrp', # Kernel modules 'modprobe', 'rmmod', 'insmod', 'lsmod', ] # Remove quoted strings to avoid false positives clean_command = _QUOTED_STRING_PATTERN.sub(' ', command) # If whitelist mode, check ALL commands against the whitelist if allowed_commands is not None: # Extract all command words (at start or after operators) found_commands = _CMD_PATTERN.findall(clean_command) for cmd in found_commands: if cmd.lower() not in allowed_commands: return ( False, f"Command '{cmd}' is not in the allowed commands list.", ) return True, "" # Check for dangerous commands for cmd in dangerous_commands: pattern = rf'(?:^|;|\||&&)\s*\b{re.escape(cmd)}\b' if re.search(pattern, clean_command, re.IGNORECASE): return False, f"Command '{cmd}' is blocked for safety." return True, "" def sanitize_command( command: str, use_docker_backend: bool = False, safe_mode: bool = True, working_dir: Optional[str] = None, allowed_commands: Optional[Set[str]] = None, ) -> Tuple[bool, str]: r"""A comprehensive command sanitizer for both local and Docker backends. Args: command (str): The command to sanitize use_docker_backend (bool): Whether using Docker backend safe_mode (bool): Whether to apply security checks working_dir (Optional[str]): Working directory for path validation allowed_commands (Optional[Set[str]]): Set of allowed commands Returns: Tuple[bool, str]: (is_safe, message_or_command) """ if not safe_mode: return True, command # Skip all checks if safe_mode is disabled # Use safety checker is_safe, reason = check_command_safety(command, allowed_commands) if not is_safe: return False, reason # Additional check for local backend: prevent cd outside working directory if not use_docker_backend and working_dir and 'cd ' in command: # Extract cd commands and check their targets # Normalize working_dir to ensure consistent comparison normalized_working_dir = os.path.normpath(os.path.abspath(working_dir)) for match in _CD_PATTERN.finditer(command): target_path = match.group(1).strip('\'"') target_dir = os.path.normpath( os.path.abspath(os.path.join(working_dir, target_path)) ) # Use os.path.commonpath for safe path comparison try: common = os.path.commonpath( [normalized_working_dir, target_dir] ) if common != normalized_working_dir: return ( False, "Cannot 'cd' outside of the working directory.", ) except ValueError: # Different drives on Windows or other path issues return False, "Cannot 'cd' outside of the working directory." return True, command # Environment management utilities def is_uv_environment() -> bool: r"""Detect whether the current Python runtime is managed by uv.""" return ( "UV_CACHE_DIR" in os.environ or "uv" in sys.executable or shutil.which("uv") is not None ) def ensure_uv_available(update_callback=None) -> Tuple[bool, Optional[str]]: r"""Ensure uv is available, installing it if necessary. Args: update_callback: Optional callback function to receive status updates Returns: Tuple[bool, Optional[str]]: (success, uv_path) """ # Check if uv is already available existing_uv = shutil.which("uv") if existing_uv is not None: if update_callback: update_callback(f"uv is already available at: {existing_uv}\n") return True, existing_uv try: if update_callback: update_callback("uv not found, installing...\n") os_type = platform.system() # Install uv using the official installer script if os_type.lower() in [ 'darwin', 'linux', ] or os_type.lower().startswith('linux'): # Use curl to download and execute the installer install_cmd = "curl -LsSf https://astral.sh/uv/install.sh | sh" result = subprocess.run( install_cmd, shell=True, capture_output=True, text=True, timeout=60, ) if result.returncode != 0: if update_callback: update_callback(f"Failed to install uv: {result.stderr}\n") return False, None # Check if uv was installed in the expected location home = os.path.expanduser("~") uv_bin_path = os.path.join(home, ".cargo", "bin") uv_executable = os.path.join(uv_bin_path, "uv") if os.path.exists(uv_executable): if update_callback: update_callback( f"uv installed successfully at: {uv_executable}\n" ) return True, uv_executable elif os_type.lower() == 'windows': # Use PowerShell to install uv on Windows install_cmd = ( "powershell -ExecutionPolicy Bypass -c " "\"irm https://astral.sh/uv/install.ps1 | iex\"" ) result = subprocess.run( install_cmd, shell=True, capture_output=True, text=True, timeout=60, ) if result.returncode != 0: if update_callback: update_callback(f"Failed to install uv: {result.stderr}\n") return False, None # Check if uv was installed in the expected location on Windows home = os.path.expanduser("~") uv_bin_path = os.path.join(home, ".cargo", "bin") uv_executable = os.path.join(uv_bin_path, "uv.exe") if os.path.exists(uv_executable): if update_callback: update_callback( f"uv installed successfully at: {uv_executable}\n" ) return True, uv_executable if update_callback: update_callback("Failed to verify uv installation\n") return False, None except Exception as e: if update_callback: update_callback(f"Error installing uv: {e!s}\n") logger.error(f"Failed to install uv: {e}") return False, None def setup_initial_env_with_uv( env_path: str, uv_path: str, working_dir: str, update_callback=None ) -> bool: r"""Set up initial environment using uv.""" try: if platform.system() == 'Windows': python_path = os.path.join(env_path, "Scripts", "python.exe") else: python_path = os.path.join(env_path, "bin", "python") if os.path.exists(python_path): if update_callback: update_callback( "[UV] Environment already exists, skipping creation\n" ) return True # Create virtual environment with Python 3.10 using uv subprocess.run( [uv_path, "venv", "--python", "3.10", env_path], check=True, capture_output=True, cwd=working_dir, timeout=300, ) # Install essential packages using uv essential_packages = [ "pip", "setuptools", "wheel", ] subprocess.run( [ uv_path, "pip", "install", "--python", python_path, *essential_packages, ], check=True, capture_output=True, cwd=working_dir, timeout=300, ) if update_callback: update_callback( "[UV] Initial environment created with Python 3.10 " "and essential packages" ) return True except subprocess.CalledProcessError as e: error_msg = e.stderr.decode() if e.stderr else str(e) if update_callback: update_callback(f"UV setup failed: {error_msg}\n") return False except subprocess.TimeoutExpired: if update_callback: update_callback("UV setup timed out after 5 minutes\n") return False def setup_initial_env_with_venv( env_path: str, working_dir: str, update_callback=None ) -> bool: r"""Set up initial environment using standard venv.""" try: # Get pip path if platform.system() == 'Windows': pip_path = os.path.join(env_path, "Scripts", "pip.exe") else: pip_path = os.path.join(env_path, "bin", "pip") # Check if environment already exists if os.path.exists(pip_path): if update_callback: update_callback( "Environment already exists, skipping creation\n" ) return True # Create virtual environment with system Python try: venv.create( env_path, with_pip=True, system_site_packages=False, symlinks=True, ) except Exception: # Clean up partial environment if os.path.exists(env_path): shutil.rmtree(env_path) # Fallback to symlinks=False if symlinks=True fails # (e.g., on some Windows configurations or macOS Beta) venv.create( env_path, with_pip=True, system_site_packages=False, symlinks=False, ) # Upgrade pip and install essential packages essential_packages = [ "pip", "setuptools", "wheel", ] subprocess.run( [pip_path, "install", "--upgrade", *essential_packages], check=True, capture_output=True, cwd=working_dir, timeout=300, ) if update_callback: update_callback( "Initial environment created with system Python " "and essential packages" ) return True except subprocess.CalledProcessError as e: error_msg = e.stderr.decode() if e.stderr else str(e) if update_callback: update_callback(f"Venv setup failed: {error_msg}\n") return False except subprocess.TimeoutExpired: if update_callback: update_callback("Venv setup timed out after 5 minutes\n") return False def clone_current_environment( env_path: str, working_dir: str, update_callback=None ) -> bool: r"""Clone the current Python environment to a new virtual environment. This function creates a new virtual environment with the same Python version as the current environment and installs all packages from the current environment. Args: env_path: Path where the new environment will be created. working_dir: Working directory for subprocess commands. update_callback: Optional callback for status updates. Returns: True if the environment was created successfully, False otherwise. """ try: if os.path.exists(env_path): if update_callback: update_callback(f"Using existing environment: {env_path}\n") return True if update_callback: update_callback( f"Cloning current Python environment to: {env_path}\n" ) # Get current Python version current_version = f"{sys.version_info.major}.{sys.version_info.minor}" # Get list of installed packages in current environment if update_callback: update_callback("Collecting installed packages...\n") freeze_result = subprocess.run( [sys.executable, "-m", "pip", "freeze"], capture_output=True, text=True, timeout=60, ) if freeze_result.returncode != 0: if update_callback: update_callback( "Warning: Failed to get installed packages, " "creating empty environment\n" ) installed_packages = "" else: installed_packages = freeze_result.stdout.strip() # Try to use uv if available success, uv_path = ensure_uv_available(update_callback) if success and uv_path: # Create venv with uv subprocess.run( [uv_path, "venv", "--python", current_version, env_path], check=True, capture_output=True, cwd=working_dir, timeout=300, ) # Get the python path from the new environment if platform.system() == 'Windows': python_path = os.path.join(env_path, "Scripts", "python.exe") else: python_path = os.path.join(env_path, "bin", "python") # Install pip, setuptools, wheel first subprocess.run( [ uv_path, "pip", "install", "--python", python_path, "pip", "setuptools", "wheel", ], check=True, capture_output=True, cwd=working_dir, timeout=300, ) # Install cloned packages if any if installed_packages: if update_callback: update_callback("Installing cloned packages with uv...\n") # Write requirements to temp file (auto-deleted) fd, requirements_file = tempfile.mkstemp( suffix=".txt", prefix="requirements_", dir=working_dir ) try: with os.fdopen(fd, "w") as f: f.write(installed_packages) subprocess.run( [ uv_path, "pip", "install", "--python", python_path, "-r", requirements_file, ], check=True, capture_output=True, cwd=working_dir, timeout=600, ) finally: if os.path.exists(requirements_file): os.remove(requirements_file) if update_callback: update_callback("[UV] Environment cloned successfully!\n") return True else: # Fallback to standard venv if update_callback: update_callback( "Falling back to standard venv for cloning environment\n" ) try: venv.create(env_path, with_pip=True, symlinks=True) except Exception: # Clean up partial environment if os.path.exists(env_path): shutil.rmtree(env_path) # Fallback to symlinks=False if symlinks=True fails venv.create(env_path, with_pip=True, symlinks=False) # Get python/pip path if platform.system() == 'Windows': python_path = os.path.join(env_path, "Scripts", "python.exe") else: python_path = os.path.join(env_path, "bin", "python") if not os.path.exists(python_path): if update_callback: update_callback( f"Warning: Python executable not found at " f"{python_path}\n" ) return False # Upgrade pip subprocess.run( [python_path, "-m", "pip", "install", "--upgrade", "pip"], check=True, capture_output=True, cwd=working_dir, timeout=60, ) # Install cloned packages if any if installed_packages: if update_callback: update_callback("Installing cloned packages with pip...\n") # Write requirements to temp file (auto-deleted) fd, requirements_file = tempfile.mkstemp( suffix=".txt", prefix="requirements_", dir=working_dir ) try: with os.fdopen(fd, "w") as f: f.write(installed_packages) subprocess.run( [ python_path, "-m", "pip", "install", "-r", requirements_file, ], check=True, capture_output=True, cwd=working_dir, timeout=600, ) finally: if os.path.exists(requirements_file): os.remove(requirements_file) if update_callback: update_callback("Environment cloned successfully!\n") return True except subprocess.CalledProcessError as e: error_msg = e.stderr.decode() if e.stderr else str(e) if update_callback: update_callback(f"Failed to clone environment: {error_msg}\n") logger.error(f"Failed to clone environment: {error_msg}") return False except subprocess.TimeoutExpired: if update_callback: update_callback("Environment cloning timed out\n") return False except Exception as e: if update_callback: update_callback(f"Failed to clone environment: {e!s}\n") logger.error(f"Failed to clone environment: {e}") return False def check_nodejs_availability(update_callback=None) -> Tuple[bool, str]: r"""Check if Node.js is available without modifying the system.""" try: # Check if Node.js is already available in the system node_result = subprocess.run( ["node", "--version"], check=False, capture_output=True, timeout=10, ) npm_result = subprocess.run( ["npm", "--version"], check=False, capture_output=True, timeout=10, ) if node_result.returncode == 0 and npm_result.returncode == 0: node_version = node_result.stdout.decode().strip() npm_version = npm_result.stdout.decode().strip() info = ( f"Node.js {node_version} and npm {npm_version} are available" ) if update_callback: update_callback(f"{info}\n") return True, info else: info = "Node.js not found. If needed, please install it manually." if update_callback: update_callback(f"Note: {info}\n") return False, info except Exception as e: info = f"Could not check Node.js availability - {e}" if update_callback: update_callback(f"Note: {info}.\n") logger.warning(f"Failed to check Node.js: {e}") return False, info