mirror of
https://github.com/eigent-ai/eigent.git
synced 2026-05-23 21:06:50 +00:00
1219 lines
45 KiB
Python
1219 lines
45 KiB
Python
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import json
|
|
import mimetypes
|
|
import os
|
|
import re
|
|
import shutil
|
|
import socket
|
|
import subprocess
|
|
import tempfile
|
|
import time
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from camel.logger import get_logger
|
|
from camel.toolkits import FunctionTool
|
|
from camel.toolkits.base import BaseToolkit
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class WebDeployToolkit(BaseToolkit):
|
|
r"""A simple toolkit for initializing React projects and deploying web.
|
|
|
|
This toolkit provides core functionality to:
|
|
- Initialize new React projects
|
|
- Build React applications
|
|
- Deploy HTML content to local server
|
|
- Serve static websites locally
|
|
"""
|
|
|
|
def __init__(
    self,
    timeout: Optional[float] = None,
    add_branding_tag: bool = True,
    logo_path: str = "../camel/misc/favicon.png",
    tag_text: str = "Created by CAMEL",
    tag_url: str = "https://github.com/camel-ai/camel",
    remote_server_ip: Optional[str] = None,
    remote_server_port: int = 8080,
):
    r"""Store the toolkit configuration and reload known servers.

    Args:
        timeout (Optional[float]): Command timeout in seconds.
            (default: :obj:`None`)
        add_branding_tag (bool): Whether to add brand tag to deployed
            pages. (default: :obj:`True`)
        logo_path (str): Path to custom logo file (SVG, PNG, JPG, ICO).
            (default: :obj:`../camel/misc/favicon.png`)
        tag_text (str): Text to display in the tag.
            (default: :obj:`Created by CAMEL`)
        tag_url (str): URL to open when tag is clicked.
            (default: :obj:`https://github.com/camel-ai/camel`)
        remote_server_ip (Optional[str]): Remote server IP for deployment.
            (default: :obj:`None` - use local deployment)
        remote_server_port (int): Remote server port.
            (default: :obj:`8080`)

    Raises:
        ValueError: If ``tag_url``, ``remote_server_ip`` or
            ``remote_server_port`` fails validation.
    """
    super().__init__(timeout=timeout)

    # Values that need no checking are stored exactly as given.
    self.timeout = timeout
    self.add_branding_tag = add_branding_tag
    self.logo_path = logo_path
    # Maps a port to the metadata of the server tracked on that port.
    self.server_instances: Dict[int, Any] = {}

    # Everything below is sanitized or validated before it is stored.
    self.tag_text = self._sanitize_text(tag_text)
    self.tag_url = self._validate_url(tag_url)

    validated_ip = None
    if remote_server_ip:
        validated_ip = self._validate_ip_or_domain(remote_server_ip)
    self.remote_server_ip = validated_ip
    self.remote_server_port = self._validate_port(remote_server_port)

    # The registry file lives in the system temp dir under a fixed name.
    registry_dir = tempfile.gettempdir()
    self.server_registry_file = os.path.join(
        registry_dir, "web_deploy_servers.json"
    )
    self._load_server_registry()
|
|
|
|
def _validate_ip_or_domain(self, address: str) -> str:
|
|
r"""Validate IP address or domain name format."""
|
|
import ipaddress
|
|
import re
|
|
|
|
try:
|
|
# Try to validate as IP address first
|
|
ipaddress.ip_address(address)
|
|
return address
|
|
except ValueError:
|
|
# If not a valid IP, check if it's a valid domain name
|
|
domain_pattern = re.compile(
|
|
r'^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?'
|
|
r'(\.[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*$'
|
|
)
|
|
if domain_pattern.match(address) and len(address) <= 253:
|
|
return address
|
|
else:
|
|
raise ValueError(
|
|
f"Invalid IP address or domain name: {address}"
|
|
)
|
|
|
|
def _validate_port(self, port: int) -> int:
|
|
r"""Validate port number."""
|
|
if not isinstance(port, int) or port < 1 or port > 65535:
|
|
raise ValueError(f"Invalid port number: {port}")
|
|
return port
|
|
|
|
def _sanitize_text(self, text: str) -> str:
|
|
r"""Sanitize text to prevent XSS."""
|
|
if not isinstance(text, str):
|
|
return ""
|
|
# Remove any HTML/script tags
|
|
text = re.sub(r'<[^>]+>', '', text)
|
|
# Escape special characters
|
|
text = (
|
|
text.replace('&', '&')
|
|
.replace('<', '<')
|
|
.replace('>', '>')
|
|
)
|
|
text = text.replace('"', '"').replace("'", ''')
|
|
return text[:100] # Limit length
|
|
|
|
def _validate_url(self, url: str) -> str:
|
|
r"""Validate URL format."""
|
|
if not isinstance(url, str):
|
|
raise ValueError("URL must be a string")
|
|
# Basic URL validation
|
|
url_pattern = re.compile(
|
|
r'^https?://'
|
|
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
|
|
r'localhost|'
|
|
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
|
|
r'(?::\d+)?'
|
|
r'(?:/?|[/?]\S+)$',
|
|
re.IGNORECASE,
|
|
)
|
|
if not url_pattern.match(url):
|
|
raise ValueError(f"Invalid URL format: {url}")
|
|
return url
|
|
|
|
def _validate_subdirectory(
|
|
self, subdirectory: Optional[str]
|
|
) -> Optional[str]:
|
|
r"""Validate subdirectory to prevent path traversal."""
|
|
if subdirectory is None:
|
|
return None
|
|
|
|
# Remove any leading/trailing slashes
|
|
subdirectory = subdirectory.strip('/')
|
|
|
|
# Check for path traversal attempts
|
|
if '..' in subdirectory or subdirectory.startswith('/'):
|
|
raise ValueError(f"Invalid subdirectory: {subdirectory}")
|
|
|
|
# Only allow alphanumeric, dash, underscore, and forward slashes
|
|
if not re.match(r'^[a-zA-Z0-9_-]+(?:/[a-zA-Z0-9_-]+)*$', subdirectory):
|
|
raise ValueError(f"Invalid subdirectory format: {subdirectory}")
|
|
|
|
return subdirectory
|
|
|
|
def _is_port_available(self, port: int) -> bool:
|
|
r"""Check if a port is available for binding."""
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
|
try:
|
|
sock.bind(('127.0.0.1', port))
|
|
return True
|
|
except OSError:
|
|
return False
|
|
|
|
def _load_server_registry(self):
|
|
r"""Load server registry from persistent storage."""
|
|
try:
|
|
if os.path.exists(self.server_registry_file):
|
|
with open(self.server_registry_file, 'r') as f:
|
|
data = json.load(f)
|
|
# Reconstruct server instances from registry
|
|
for port_str, server_info in data.items():
|
|
port = int(port_str)
|
|
pid = server_info.get('pid')
|
|
if pid and self._is_process_running(pid):
|
|
# Create a mock process object for tracking
|
|
self.server_instances[port] = {
|
|
'pid': pid,
|
|
'start_time': server_info.get('start_time'),
|
|
'directory': server_info.get('directory'),
|
|
}
|
|
except Exception as e:
|
|
logger.warning(f"Could not load server registry: {e}")
|
|
|
|
def _save_server_registry(self):
|
|
r"""Save server registry to persistent storage."""
|
|
try:
|
|
registry_data = {}
|
|
for port, server_info in self.server_instances.items():
|
|
if isinstance(server_info, dict):
|
|
registry_data[str(port)] = {
|
|
'pid': server_info.get('pid'),
|
|
'start_time': server_info.get('start_time'),
|
|
'directory': server_info.get('directory'),
|
|
}
|
|
else:
|
|
# Handle subprocess.Popen objects
|
|
registry_data[str(port)] = {
|
|
'pid': server_info.pid,
|
|
'start_time': time.time(),
|
|
'directory': getattr(server_info, 'directory', None),
|
|
}
|
|
|
|
with open(self.server_registry_file, 'w') as f:
|
|
json.dump(registry_data, f, indent=2)
|
|
except Exception as e:
|
|
logger.warning(f"Could not save server registry: {e}")
|
|
|
|
def _is_process_running(self, pid: int) -> bool:
|
|
r"""Check if a process with given PID is still running."""
|
|
try:
|
|
# Send signal 0 to check if process exists
|
|
os.kill(pid, 0)
|
|
return True
|
|
except (OSError, ProcessLookupError):
|
|
return False
|
|
|
|
def _build_custom_url(
|
|
self, domain: str, subdirectory: Optional[str] = None
|
|
) -> str:
|
|
r"""Build custom URL with optional subdirectory.
|
|
|
|
Args:
|
|
domain (str): Custom domain
|
|
subdirectory (Optional[str]): Subdirectory path
|
|
|
|
Returns:
|
|
str: Complete custom URL
|
|
"""
|
|
# Validate domain
|
|
if not re.match(r'^[a-zA-Z0-9.-]+$', domain):
|
|
raise ValueError(f"Invalid domain format: {domain}")
|
|
custom_url = f"http://{domain}:8080"
|
|
if subdirectory:
|
|
subdirectory = self._validate_subdirectory(subdirectory)
|
|
custom_url += f"/{subdirectory}"
|
|
return custom_url
|
|
|
|
def _load_logo_as_data_uri(self, logo_path: str) -> str:
|
|
r"""Load a local logo file and convert it to data URI.
|
|
|
|
Args:
|
|
logo_path (str): Path to the logo file
|
|
|
|
Returns:
|
|
str: Data URI of the logo file
|
|
"""
|
|
try:
|
|
if not os.path.exists(logo_path):
|
|
logger.warning(f"Logo file not found: {logo_path}")
|
|
return self._get_default_logo()
|
|
|
|
# Get MIME type
|
|
mime_type, _ = mimetypes.guess_type(logo_path)
|
|
if not mime_type:
|
|
# Default MIME types for common formats
|
|
ext = os.path.splitext(logo_path)[1].lower()
|
|
mime_types_map = {
|
|
'.svg': 'image/svg+xml',
|
|
'.png': 'image/png',
|
|
'.jpg': 'image/jpeg',
|
|
'.jpeg': 'image/jpeg',
|
|
'.ico': 'image/x-icon',
|
|
'.gif': 'image/gif',
|
|
}
|
|
mime_type = mime_types_map.get(ext, 'image/png')
|
|
|
|
# Read file and encode to base64
|
|
with open(logo_path, 'rb') as f:
|
|
file_data = f.read()
|
|
|
|
base64_data = base64.b64encode(file_data).decode('utf-8')
|
|
return f"data:{mime_type};base64,{base64_data}"
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error loading logo file {logo_path}: {e}")
|
|
return self._get_default_logo()
|
|
|
|
def _get_default_logo(self) -> str:
|
|
r"""Get the default logo as data URI.
|
|
|
|
Returns:
|
|
str: Default logo data URI
|
|
"""
|
|
default_logo_data_uri = (
|
|
"data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' "
|
|
"width='32' height='32' viewBox='0 0 32 32' fill='none'%3E%3Crect "
|
|
"width='32' height='32' rx='8' fill='%23333333'/%3E%3Ctext x='16' "
|
|
"y='22' font-family='system-ui, -apple-system, sans-serif' "
|
|
"font-size='12' font-weight='700' text-anchor='middle' "
|
|
"fill='white'%3EAI%3C/text%3E%3C/svg%3E"
|
|
)
|
|
return default_logo_data_uri
|
|
|
|
def deploy_html_content(
    self,
    html_content: Optional[str] = None,
    html_file_path: Optional[str] = None,
    file_name: str = "index.html",
    port: int = 8000,
    domain: Optional[str] = None,
    subdirectory: Optional[str] = None,
) -> Dict[str, Any]:
    r"""Deploy HTML content to a local server or remote server.

    Args:
        html_content (Optional[str]): HTML content to deploy. Either this
            or html_file_path must be provided.
        html_file_path (Optional[str]): Path to HTML file to deploy. Either
            this or html_content must be provided.
        file_name (str): Name for the HTML file when using html_content.
            (default: :obj:`index.html`)
        port (int): Port to serve on. (default: :obj:`8000`)
        domain (Optional[str]): Custom domain to access the content.
            (e.g., :obj:`example.com`)
        subdirectory (Optional[str]): Subdirectory path for multi-user
            deployment. (e.g., :obj:`user123`)

    Returns:
        Dict[str, Any]: Deployment result with server URL and custom domain
            info.
    """
    try:
        content_given = html_content is not None
        file_given = html_file_path is not None

        # Exactly one of the two sources has to be supplied.
        if not content_given and not file_given:
            return {
                'success': False,
                'error': (
                    'Either html_content or html_file_path must be '
                    'provided'
                ),
            }
        if content_given and file_given:
            return {
                'success': False,
                'error': (
                    'Cannot provide both html_content and '
                    'html_file_path'
                ),
            }

        if html_file_path:
            if not os.path.exists(html_file_path):
                return {
                    'success': False,
                    'error': f'HTML file not found: {html_file_path}',
                }

            try:
                with open(html_file_path, 'r', encoding='utf-8') as f:
                    html_content = f.read()
                # A file deployment keeps the file's own name.
                file_name = os.path.basename(html_file_path)
            except Exception as e:
                return {
                    'success': False,
                    'error': f'Error reading HTML file: {e}',
                }

        # A configured remote server takes precedence over local serving.
        if not self.remote_server_ip:
            return self._deploy_to_local_server(
                html_content,  # type: ignore[arg-type]
                file_name,
                port,
                domain,
                subdirectory,
            )
        return self._deploy_to_remote_server(
            html_content,  # type: ignore[arg-type]
            subdirectory,
            domain,
        )

    except Exception as e:
        logger.error(f"Error deploying HTML content: {e}")
        return {'success': False, 'error': str(e)}
|
|
|
|
def _deploy_to_remote_server(
    self,
    html_content: str,
    subdirectory: Optional[str] = None,
    domain: Optional[str] = None,
) -> Dict[str, Any]:
    r"""Send HTML content to the configured remote server's deploy API.

    Args:
        html_content (str): HTML content to deploy
        subdirectory (Optional[str]): Subdirectory path for deployment
        domain (Optional[str]): Custom domain

    Returns:
        Dict[str, Any]: Deployment result. ``success`` is :obj:`False`
            with an ``error`` text when ``requests`` is missing, the
            request fails, or the server does not answer with HTTP 200.
    """
    try:
        import requests

        subdirectory = self._validate_subdirectory(subdirectory)

        payload = {
            "html_content": html_content,
            "subdirectory": subdirectory,
            "domain": domain,
            "timestamp": time.time(),
        }

        endpoint = f"http://{self.remote_server_ip}:{self.remote_server_port}/api/deploy"

        response = requests.post(
            endpoint,
            json=payload,
            timeout=self.timeout,
            # Redirects are not followed, to prevent SSRF.
            allow_redirects=False,
            headers={'Content-Type': 'application/json'},
        )

        if response.status_code != 200:
            return {
                'success': False,
                'error': f'Remote deployment failed: HTTP '
                f'{response.status_code}',
            }

        # The decoded body is not used, but a reply that is not JSON
        # raises here and is reported as a deployment error below.
        response.json()

        base_url = (
            f"http://{self.remote_server_ip}:{self.remote_server_port}"
        )
        deployed_url = base_url
        if subdirectory:
            deployed_url = f"{base_url}/{subdirectory}/"

        return {
            'success': True,
            'remote_url': deployed_url,
            'server_ip': self.remote_server_ip,
            'subdirectory': subdirectory,
            'domain': domain,
            'message': f'Successfully deployed to remote server!\n • '
            f'Access URL: {deployed_url}\n • Server: '
            f'{self.remote_server_ip}:{self.remote_server_port}',
            'branding_tag_added': self.add_branding_tag,
        }

    except ImportError:
        return {
            'success': False,
            'error': 'Remote deployment requires requests library. '
            'Install with: pip install requests',
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Remote deployment error: {e!s}',
        }
|
|
|
|
def _deploy_to_local_server(
|
|
self,
|
|
html_content: str,
|
|
file_name: str,
|
|
port: int,
|
|
domain: Optional[str],
|
|
subdirectory: Optional[str],
|
|
) -> Dict[str, Any]:
|
|
r"""Deploy HTML content to local server (original functionality).
|
|
|
|
Args:
|
|
html_content (str): HTML content to deploy
|
|
file_name (str): Name for the HTML file
|
|
port (int): Port to serve on (default: 8000)
|
|
domain (Optional[str]): Custom domain
|
|
subdirectory (Optional[str]): Subdirectory path
|
|
|
|
Returns:
|
|
Dict[str, Any]: Deployment result
|
|
"""
|
|
temp_dir = None
|
|
try:
|
|
# Validate subdirectory
|
|
subdirectory = self._validate_subdirectory(subdirectory)
|
|
|
|
# Create temporary directory
|
|
temp_dir = tempfile.mkdtemp(prefix="web_deploy_")
|
|
|
|
# Handle subdirectory for multi-user deployment
|
|
if subdirectory:
|
|
deploy_dir = os.path.join(temp_dir, subdirectory)
|
|
os.makedirs(deploy_dir, exist_ok=True)
|
|
html_file_path = os.path.join(deploy_dir, file_name)
|
|
else:
|
|
html_file_path = os.path.join(temp_dir, file_name)
|
|
|
|
# Write enhanced HTML content to file
|
|
with open(html_file_path, 'w', encoding='utf-8') as f:
|
|
f.write(html_content)
|
|
|
|
# Start server
|
|
server_result = self._serve_static_files(temp_dir, port)
|
|
|
|
if server_result['success']:
|
|
# Build URLs with localhost fallback
|
|
local_url = server_result["server_url"]
|
|
if subdirectory:
|
|
local_url += f"/{subdirectory}"
|
|
|
|
# Custom domain URL (if provided)
|
|
custom_url = (
|
|
self._build_custom_url(domain, subdirectory)
|
|
if domain
|
|
else None
|
|
)
|
|
|
|
# Localhost fallback URL
|
|
localhost_url = f"http://localhost:{port}"
|
|
if subdirectory:
|
|
localhost_url += f"/{subdirectory}"
|
|
|
|
# Build message with all access options
|
|
message = 'HTML content deployed successfully!\n'
|
|
message += f' • Local access: {local_url}\n'
|
|
message += f' • Localhost fallback: {localhost_url}'
|
|
|
|
if custom_url:
|
|
message += f'\n • Custom domain: {custom_url}'
|
|
|
|
if self.add_branding_tag:
|
|
message += f'\n • Branding: "{self.tag_text}" tag added'
|
|
|
|
server_result.update(
|
|
{
|
|
'html_file': html_file_path,
|
|
'temp_directory': temp_dir,
|
|
'local_url': local_url,
|
|
'localhost_url': localhost_url,
|
|
'custom_url': custom_url,
|
|
'domain': domain,
|
|
'subdirectory': subdirectory,
|
|
'message': message,
|
|
'branding_tag_added': self.add_branding_tag,
|
|
}
|
|
)
|
|
|
|
return server_result
|
|
|
|
except Exception as e:
|
|
# Clean up temp directory on error
|
|
if temp_dir and os.path.exists(temp_dir):
|
|
try:
|
|
shutil.rmtree(temp_dir)
|
|
except Exception:
|
|
pass
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
def _serve_static_files(self, directory: str, port: int) -> Dict[str, Any]:
|
|
r"""Serve static files from a directory using a local HTTP server
|
|
(as a background process).
|
|
|
|
Args:
|
|
directory (str): Directory to serve files from
|
|
port (int): Port to serve on (default: 8000)
|
|
|
|
Returns:
|
|
Dict[str, Any]: Server information
|
|
"""
|
|
import subprocess
|
|
|
|
try:
|
|
if not os.path.exists(directory):
|
|
return {
|
|
'success': False,
|
|
'error': f'Directory {directory} does not exist',
|
|
}
|
|
|
|
if not os.path.isdir(directory):
|
|
return {
|
|
'success': False,
|
|
'error': f'{directory} is not a directory',
|
|
}
|
|
|
|
# Validate port
|
|
port = self._validate_port(port)
|
|
|
|
# Check if port is already in use
|
|
if port in self.server_instances:
|
|
return {
|
|
'success': False,
|
|
'error': f'Port {port} is already in use by this toolkit',
|
|
}
|
|
|
|
# Check if port is available
|
|
if not self._is_port_available(port):
|
|
return {
|
|
'success': False,
|
|
'error': (
|
|
f'Port {port} is already in use by another process'
|
|
),
|
|
}
|
|
|
|
# Start http.server as a background process with security
|
|
# improvements
|
|
process = subprocess.Popen(
|
|
[
|
|
"python3",
|
|
"-m",
|
|
"http.server",
|
|
str(port),
|
|
"--bind",
|
|
"127.0.0.1",
|
|
],
|
|
cwd=directory,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
shell=False, # Prevent shell injection
|
|
env={**os.environ, 'PYTHONDONTWRITEBYTECODE': '1'},
|
|
)
|
|
|
|
# Store both process and metadata for persistence
|
|
self.server_instances[port] = {
|
|
'process': process,
|
|
'pid': process.pid,
|
|
'start_time': time.time(),
|
|
'directory': directory,
|
|
}
|
|
self._save_server_registry()
|
|
|
|
# Wait for server to start with timeout
|
|
start_time = time.time()
|
|
while time.time() - start_time < 5:
|
|
if not self._is_port_available(port):
|
|
# Port is now in use, server started
|
|
break
|
|
time.sleep(0.1)
|
|
else:
|
|
# Server didn't start in time
|
|
process.terminate()
|
|
del self.server_instances[port]
|
|
return {
|
|
'success': False,
|
|
'error': f'Server failed to start on port {port}',
|
|
}
|
|
|
|
server_url = f"http://localhost:{port}"
|
|
|
|
return {
|
|
'success': True,
|
|
'server_url': server_url,
|
|
'port': port,
|
|
'directory': directory,
|
|
'message': f'Static files served from {directory} at '
|
|
f'{server_url} (background process)',
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error serving static files: {e}")
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
def deploy_folder(
    self,
    folder_path: str,
    port: int = 8000,
    domain: Optional[str] = None,
    subdirectory: Optional[str] = None,
) -> Dict[str, Any]:
    r"""Deploy a folder containing web files.

    Args:
        folder_path (str): Path to the folder to deploy.
        port (int): Port to serve on. (default: :obj:`8000`)
        domain (Optional[str]): Custom domain to access the content.
            (e.g., :obj:`example.com`)
        subdirectory (Optional[str]): Subdirectory path for multi-user
            deployment. (e.g., :obj:`user123`)

    Returns:
        Dict[str, Any]: Deployment result with custom domain info.
    """
    try:
        problem = None
        if not os.path.exists(folder_path):
            problem = f'Folder {folder_path} does not exist'
        elif not os.path.isdir(folder_path):
            problem = f'{folder_path} is not a directory'
        if problem is not None:
            return {'success': False, 'error': problem}

        subdirectory = self._validate_subdirectory(subdirectory)

        # A configured remote server takes precedence over local serving.
        if not self.remote_server_ip:
            return self._deploy_folder_to_local_server(
                folder_path,
                port,
                domain,
                subdirectory,
            )
        return self._deploy_folder_to_remote_server(
            folder_path,
            subdirectory,
            domain,
        )

    except Exception as e:
        logger.error(f"Error deploying folder: {e}")
        return {'success': False, 'error': str(e)}
|
|
|
|
def _deploy_folder_to_local_server(
|
|
self,
|
|
folder_path: str,
|
|
port: int,
|
|
domain: Optional[str],
|
|
subdirectory: Optional[str],
|
|
) -> Dict[str, Any]:
|
|
r"""Deploy folder to local server (original functionality).
|
|
|
|
Args:
|
|
folder_path (str): Path to the folder to deploy
|
|
port (int): Port to serve on
|
|
domain (Optional[str]): Custom domain
|
|
subdirectory (Optional[str]): Subdirectory path
|
|
|
|
Returns:
|
|
Dict[str, Any]: Deployment result
|
|
"""
|
|
try:
|
|
temp_dir = None
|
|
if self.add_branding_tag:
|
|
# Create temporary directory and copy all files
|
|
temp_dir = tempfile.mkdtemp(prefix="web_deploy_enhanced_")
|
|
|
|
# Handle subdirectory structure
|
|
if subdirectory:
|
|
deploy_base = os.path.join(temp_dir, subdirectory)
|
|
os.makedirs(deploy_base, exist_ok=True)
|
|
shutil.copytree(
|
|
folder_path,
|
|
deploy_base,
|
|
dirs_exist_ok=True,
|
|
)
|
|
deploy_path = deploy_base
|
|
else:
|
|
shutil.copytree(
|
|
folder_path,
|
|
os.path.join(temp_dir, "site"),
|
|
dirs_exist_ok=True,
|
|
)
|
|
deploy_path = os.path.join(temp_dir, "site")
|
|
|
|
# Enhance HTML files with branding tag
|
|
html_files_enhanced = []
|
|
for root, _, files in os.walk(deploy_path):
|
|
for file in files:
|
|
if file.endswith('.html'):
|
|
html_file_path = os.path.join(root, file)
|
|
try:
|
|
with open(
|
|
html_file_path, 'r', encoding='utf-8'
|
|
) as f:
|
|
original_content = f.read()
|
|
|
|
with open(
|
|
html_file_path, 'w', encoding='utf-8'
|
|
) as f:
|
|
f.write(original_content)
|
|
|
|
html_files_enhanced.append(
|
|
os.path.relpath(
|
|
html_file_path, deploy_path
|
|
)
|
|
)
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Failed to enhance {html_file_path}: {e}"
|
|
)
|
|
|
|
# Serve the enhanced folder
|
|
server_result = self._serve_static_files(temp_dir, port)
|
|
|
|
if server_result['success']:
|
|
# Build URLs with localhost fallback
|
|
local_url = server_result["server_url"]
|
|
if subdirectory:
|
|
local_url += f"/{subdirectory}"
|
|
|
|
# Custom domain URL (if provided)
|
|
custom_url = (
|
|
self._build_custom_url(domain, subdirectory)
|
|
if domain
|
|
else None
|
|
)
|
|
|
|
# Localhost fallback URL
|
|
localhost_url = f"http://localhost:{port}"
|
|
if subdirectory:
|
|
localhost_url += f"/{subdirectory}"
|
|
|
|
# Build message with all access options
|
|
message = 'Folder deployed successfully!\n'
|
|
message += f' • Local access: {local_url}\n'
|
|
message += f' • Localhost fallback: {localhost_url}'
|
|
|
|
if custom_url:
|
|
message += f'\n • Custom domain: {custom_url}'
|
|
|
|
if self.add_branding_tag:
|
|
message += f'\n • Branding: "{self.tag_text}" tag '
|
|
message += (
|
|
f'added to {len(html_files_enhanced)} HTML files'
|
|
)
|
|
|
|
server_result.update(
|
|
{
|
|
'original_folder': folder_path,
|
|
'enhanced_folder': deploy_path,
|
|
'html_files_enhanced': html_files_enhanced,
|
|
'local_url': local_url,
|
|
'localhost_url': localhost_url,
|
|
'custom_url': custom_url,
|
|
'domain': domain,
|
|
'subdirectory': subdirectory,
|
|
'branding_tag_added': True,
|
|
'message': message,
|
|
}
|
|
)
|
|
|
|
return server_result
|
|
else:
|
|
# Check for index.html
|
|
index_html = os.path.join(folder_path, 'index.html')
|
|
if not os.path.exists(index_html):
|
|
logger.warning(f'No index.html found in {folder_path}')
|
|
|
|
# Handle subdirectory for original folder deployment
|
|
if subdirectory:
|
|
temp_dir = tempfile.mkdtemp(prefix="web_deploy_")
|
|
deploy_base = os.path.join(temp_dir, subdirectory)
|
|
shutil.copytree(
|
|
folder_path, deploy_base, dirs_exist_ok=True
|
|
)
|
|
deploy_path = temp_dir
|
|
else:
|
|
deploy_path = folder_path
|
|
|
|
# Serve the folder
|
|
server_result = self._serve_static_files(deploy_path, port)
|
|
|
|
if server_result['success']:
|
|
# Build URLs with localhost fallback
|
|
local_url = server_result["server_url"]
|
|
if subdirectory:
|
|
local_url += f"/{subdirectory}"
|
|
|
|
# Custom domain URL (if provided)
|
|
custom_url = (
|
|
self._build_custom_url(domain, subdirectory)
|
|
if domain
|
|
else None
|
|
)
|
|
|
|
# Localhost fallback URL
|
|
localhost_url = f"http://localhost:{port}"
|
|
if subdirectory:
|
|
localhost_url += f"/{subdirectory}"
|
|
|
|
# Build message with all access options
|
|
message = 'Folder deployed successfully!\n'
|
|
message += f' • Local access: {local_url}\n'
|
|
message += f' • Localhost fallback: {localhost_url}'
|
|
|
|
if custom_url:
|
|
message += f'\n • Custom domain: {custom_url}'
|
|
|
|
server_result.update(
|
|
{
|
|
'local_url': local_url,
|
|
'localhost_url': localhost_url,
|
|
'custom_url': custom_url,
|
|
'domain': domain,
|
|
'subdirectory': subdirectory,
|
|
'message': message,
|
|
'branding_tag_added': False,
|
|
}
|
|
)
|
|
|
|
return server_result
|
|
|
|
except Exception as e:
|
|
# Clean up temp directory on error
|
|
if (
|
|
'temp_dir' in locals()
|
|
and temp_dir
|
|
and os.path.exists(temp_dir)
|
|
):
|
|
try:
|
|
shutil.rmtree(temp_dir)
|
|
except Exception:
|
|
pass
|
|
logger.error(f"Error deploying folder: {e}")
|
|
return {'success': False, 'error': str(e)}
|
|
|
|
def _deploy_folder_to_remote_server(
    self,
    folder_path: str,
    subdirectory: Optional[str] = None,
    domain: Optional[str] = None,
) -> Dict[str, Any]:
    r"""Zip a folder and send it to the remote server's deploy API.

    The archive is base64-encoded into the JSON request body. When the
    configured logo file exists, it is attached the same way.

    Args:
        folder_path (str): Path to the folder to deploy
        subdirectory (Optional[str]): Subdirectory path for deployment
        domain (Optional[str]): Custom domain

    Returns:
        Dict[str, Any]: Deployment result. ``success`` is :obj:`False`
            with an ``error`` text when ``requests`` is missing, the
            request fails, or the server does not answer with HTTP 200.
    """
    try:
        import tempfile
        import zipfile

        import requests

        subdirectory = self._validate_subdirectory(subdirectory)

        # Only the name is needed here; the archive is written below.
        with tempfile.NamedTemporaryFile(
            suffix='.zip', delete=False
        ) as temp_zip:
            zip_path = temp_zip.name

        try:
            with zipfile.ZipFile(
                zip_path, 'w', zipfile.ZIP_DEFLATED
            ) as archive:
                for current_dir, _, names in os.walk(folder_path):
                    for name in names:
                        source = os.path.join(current_dir, name)
                        # Entries are stored relative to the folder root.
                        archive.write(
                            source, os.path.relpath(source, folder_path)
                        )

            with open(zip_path, 'rb') as f:
                raw_archive = f.read()
            zip_data = base64.b64encode(raw_archive).decode('utf-8')

            deploy_data = {
                "deployment_type": "folder",
                "folder_data": zip_data,
                "subdirectory": subdirectory,
                "domain": domain,
                "timestamp": time.time(),
            }

            # Add logo data if custom logo is specified
            if self.logo_path and os.path.exists(self.logo_path):
                try:
                    logo_ext = os.path.splitext(self.logo_path)[1]
                    logo_filename = f"custom_logo{logo_ext}"

                    with open(self.logo_path, 'rb') as logo_file:
                        raw_logo = logo_file.read()
                    logo_data = base64.b64encode(raw_logo).decode('utf-8')

                    deploy_data["logo_data"] = logo_data
                    deploy_data["logo_ext"] = logo_ext
                    deploy_data["logo_filename"] = logo_filename
                except Exception as logo_error:
                    logger.warning(
                        f"Failed to process custom logo: {logo_error}"
                    )

            endpoint = f"http://{self.remote_server_ip}:{self.remote_server_port}/api/deploy"

            response = requests.post(
                endpoint,
                json=deploy_data,
                # Folder uploads fall back to 60 seconds if no timeout is set.
                timeout=self.timeout or 60,
                allow_redirects=False,
                headers={'Content-Type': 'application/json'},
            )

            if response.status_code != 200:
                return {
                    'success': False,
                    'error': (
                        f'Remote folder deployment failed: '
                        f'HTTP {response.status_code}'
                    ),
                }

            result = response.json()

            base_url = f"http://{self.remote_server_ip}:{self.remote_server_port}"
            deployed_url = base_url
            if subdirectory:
                deployed_url = f"{base_url}/{subdirectory}/"

            return {
                'success': True,
                'remote_url': deployed_url,
                'server_ip': self.remote_server_ip,
                'subdirectory': subdirectory,
                'domain': domain,
                'message': (
                    f'Successfully deployed folder to remote server!\n'
                    f' • Access URL: {deployed_url}\n'
                    f' • Server: '
                    f'{self.remote_server_ip}:{self.remote_server_port}'
                ),
                'branding_tag_added': self.add_branding_tag,
                'logo_processed': result.get('logo_processed', False),
            }

        finally:
            # The archive is only needed while the request is built.
            if os.path.exists(zip_path):
                os.unlink(zip_path)

    except ImportError:
        return {
            'success': False,
            'error': 'Remote deployment requires requests library. '
            'Install with: pip install requests',
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Remote folder deployment error: {e!s}',
        }
|
|
|
|
def stop_server(self, port: int) -> Dict[str, Any]:
    r"""Stop a running server on the specified port.

    Args:
        port (int): Port of the server to stop.

    Returns:
        Dict[str, Any]: Result of stopping the server.
    """
    try:
        port = self._validate_port(port)
        # Refresh the in-memory view from the registry file first.
        self._load_server_registry()

        if port not in self.server_instances:
            # Not tracked in memory: look for a live PID recorded for this
            # port directly in the registry file.
            if os.path.exists(self.server_registry_file):
                with open(self.server_registry_file, 'r') as f:
                    registry = json.load(f)
                key = str(port)
                if key in registry:
                    registered_pid = registry[key].get('pid')
                    if registered_pid and self._is_process_running(
                        registered_pid
                    ):
                        try:
                            os.kill(registered_pid, 15)  # SIGTERM
                            # Remove from registry
                            del registry[key]
                            with open(
                                self.server_registry_file, 'w'
                            ) as f:
                                json.dump(registry, f, indent=2)
                            return {
                                'success': True,
                                'port': port,
                                'message': (
                                    f'Server on port {port} stopped '
                                    f'successfully (from registry)'
                                ),
                            }
                        except Exception as e:
                            logger.error(
                                f"Error stopping server by PID: {e}"
                            )

            return {
                'success': False,
                'error': f'No server running on port {port}',
            }

        server_info = self.server_instances[port]
        if not isinstance(server_info, dict):
            # Old-style entry: the value is the process object itself.
            server_info.terminate()
            server_info.wait(timeout=5)
        else:
            handle = server_info.get('process')
            tracked_pid = server_info.get('pid')
            if handle:
                handle.terminate()
                # Wait for process to terminate gracefully
                handle.wait(timeout=5)
            elif tracked_pid and self._is_process_running(tracked_pid):
                os.kill(tracked_pid, 15)  # SIGTERM

        del self.server_instances[port]
        self._save_server_registry()

        return {
            'success': True,
            'port': port,
            'message': f'Server on port {port} stopped successfully',
        }

    except subprocess.TimeoutExpired:
        # Graceful termination timed out: kill the process instead.
        if not isinstance(server_info, dict):
            server_info.kill()
            server_info.wait(timeout=5)
        else:
            handle = server_info.get('process')
            if handle:
                handle.kill()
                handle.wait(timeout=5)
        del self.server_instances[port]
        self._save_server_registry()
        return {
            'success': True,
            'port': port,
            'message': f'Server on port {port} stopped after timeout',
        }
    except Exception as e:
        logger.error(f"Error stopping server: {e}")
        return {'success': False, 'error': str(e)}
|
|
|
|
def list_running_servers(self) -> Dict[str, Any]:
    r"""List all currently running servers.

    Returns:
        Dict[str, Any]: Information about running servers
    """
    try:
        self._load_server_registry()

        current_time = time.time()
        running_servers = []

        for port, server_info in self.server_instances.items():
            # Only dict entries carry the metadata reported here.
            if not isinstance(server_info, dict):
                continue

            # Entries rebuilt from the registry always have the key, but
            # its value may be None; ``or 0`` covers both missing and
            # None so the subtraction below cannot fail.
            start_time = server_info.get('start_time') or 0
            running_servers.append(
                {
                    'port': port,
                    'pid': server_info.get('pid'),
                    'directory': server_info.get('directory'),
                    'start_time': start_time,
                    'running_time': current_time - start_time,
                    'url': f'http://localhost:{port}',
                }
            )

        return {
            'success': True,
            'servers': running_servers,
            'total_servers': len(running_servers),
        }

    except Exception as e:
        logger.error(f"Error listing servers: {e}")
        return {'success': False, 'error': str(e)}
|
|
|
|
def get_tools(self) -> List[FunctionTool]:
    r"""Return the toolkit's public operations wrapped as tools.

    Returns:
        List[FunctionTool]: One tool each for ``deploy_html_content``,
            ``deploy_folder``, ``stop_server`` and
            ``list_running_servers``, in that order.
    """
    exposed = (
        self.deploy_html_content,
        self.deploy_folder,
        self.stop_server,
        self.list_running_servers,
    )
    return [FunctionTool(method) for method in exposed]
|