eigent/backend/app/utils/toolkit/hybrid_browser_toolkit.py
2025-08-12 01:16:39 +02:00

260 lines
11 KiB
Python

import os
import subprocess
import time
from typing import Any, Dict, List
from camel.models import BaseModelBackend
from camel.toolkits.hybrid_browser_toolkit.hybrid_browser_toolkit_ts import (
HybridBrowserToolkit as BaseHybridBrowserToolkit,
)
from camel.toolkits.hybrid_browser_toolkit.ws_wrapper import WebSocketBrowserWrapper as BaseWebSocketBrowserWrapper
from loguru import logger
import websockets
from app.component.command import bun, uv
from app.service.task import Agents
from app.utils.listen.toolkit_listen import listen_toolkit
from app.utils.toolkit.abstract_toolkit import AbstractToolkit
class WebSocketBrowserWrapper(BaseWebSocketBrowserWrapper):
async def start(self):
# Check if node_modules exists (dependencies installed)
node_modules_path = os.path.join(self.ts_dir, "node_modules")
if not os.path.exists(node_modules_path):
logger.warning("Node modules not found. Running npm install...")
install_result = subprocess.run(
[uv(), "run", "npm", "install"],
cwd=self.ts_dir,
capture_output=True,
text=True,
)
if install_result.returncode != 0:
logger.error(f"npm install failed: {install_result.stderr}")
raise RuntimeError(
f"Failed to install npm dependencies: {install_result.stderr}\n" # noqa:E501
f"Please run 'npm install' in {self.ts_dir} manually."
)
logger.info("npm dependencies installed successfully")
# Ensure the TypeScript code is built
build_result = subprocess.run(
[uv(), "run", "npm", "run", "build"],
cwd=self.ts_dir,
capture_output=True,
text=True,
)
if build_result.returncode != 0:
logger.error(f"TypeScript build failed: {build_result.stderr}")
raise RuntimeError(f"TypeScript build failed: {build_result.stderr}")
# Start the WebSocket server
self.process = subprocess.Popen(
[uv(), "run", "node", "websocket-server.js"], # bun not support playwright, use uv nodejs-bin
cwd=self.ts_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
# Wait for server to output the port
server_ready = False
timeout = 10 # 10 seconds timeout
start_time = time.time()
while not server_ready and time.time() - start_time < timeout:
if self.process.poll() is not None:
# Process died
stderr = self.process.stderr.read() # type: ignore
raise RuntimeError(f"WebSocket server failed to start: {stderr}")
try:
line = self.process.stdout.readline() # type: ignore
logger.debug(f"WebSocket server output: {line}")
if line.startswith("SERVER_READY:"):
self.server_port = int(line.split(":")[1].strip())
server_ready = True
logger.info(f"WebSocket server ready on port {self.server_port}")
except (ValueError, IndexError):
continue
if not server_ready:
self.process.kill()
raise RuntimeError("WebSocket server failed to start within timeout")
# Connect to the WebSocket server
try:
self.websocket = await websockets.connect(
f"ws://localhost:{self.server_port}",
ping_interval=30,
ping_timeout=10,
max_size=50 * 1024 * 1024, # 50MB limit to match server
)
logger.info("Connected to WebSocket server")
except Exception as e:
self.process.kill()
raise RuntimeError(f"Failed to connect to WebSocket server: {e}") from e
# Initialize the browser toolkit
logger.debug(f"send init {self.config}")
await self._send_command("init", self.config)
logger.debug("WebSocket server initialized successfully")
websocket_browser_wrapper = None
"""ensure only one instance of websocket_browser_wrapper"""
class HybridBrowserToolkit(BaseHybridBrowserToolkit, AbstractToolkit):
agent_name: str = Agents.search_agent
def __init__(
self,
api_task_id: str,
*,
headless: bool = False,
user_data_dir: str | None = None,
stealth: bool = True,
web_agent_model: BaseModelBackend | None = None,
cache_dir: str = "tmp/",
enabled_tools: List[str] | None = None,
browser_log_to_file: bool = False,
session_id: str | None = None,
default_start_url: str = "https://google.com/",
default_timeout: int | None = None,
short_timeout: int | None = None,
navigation_timeout: int | None = None,
network_idle_timeout: int | None = None,
screenshot_timeout: int | None = None,
page_stability_timeout: int | None = None,
dom_content_loaded_timeout: int | None = None,
viewport_limit: bool = False,
connect_over_cdp: bool = False,
cdp_url: str | None = None,
) -> None:
self.api_task_id = api_task_id
super().__init__(
headless=headless,
user_data_dir=user_data_dir,
stealth=stealth,
web_agent_model=web_agent_model,
cache_dir=cache_dir,
enabled_tools=enabled_tools,
browser_log_to_file=browser_log_to_file,
session_id=session_id,
default_start_url=default_start_url,
default_timeout=default_timeout,
short_timeout=short_timeout,
navigation_timeout=navigation_timeout,
network_idle_timeout=network_idle_timeout,
screenshot_timeout=screenshot_timeout,
page_stability_timeout=page_stability_timeout,
dom_content_loaded_timeout=dom_content_loaded_timeout,
viewport_limit=viewport_limit,
connect_over_cdp=connect_over_cdp,
cdp_url=cdp_url,
)
async def _ensure_ws_wrapper(self):
"""Ensure WebSocket wrapper is initialized."""
if self._ws_wrapper is None:
global websocket_browser_wrapper
if websocket_browser_wrapper is None:
websocket_browser_wrapper = WebSocketBrowserWrapper(self._ws_config)
self._ws_wrapper = websocket_browser_wrapper
await self._ws_wrapper.start()
def clone_for_new_session(self, new_session_id: str | None = None) -> "HybridBrowserToolkit":
import uuid
if new_session_id is None:
new_session_id = str(uuid.uuid4())[:8]
return HybridBrowserToolkit(
self.api_task_id,
headless=self._headless,
user_data_dir=self._user_data_dir,
stealth=self._stealth,
web_agent_model=self._web_agent_model,
cache_dir=f"{self._cache_dir.rstrip('/')}/_clone_{new_session_id}/",
enabled_tools=self.enabled_tools.copy(),
browser_log_to_file=self._browser_log_to_file,
session_id=new_session_id,
default_start_url=self._default_start_url,
default_timeout=self._default_timeout,
short_timeout=self._short_timeout,
navigation_timeout=self._navigation_timeout,
network_idle_timeout=self._network_idle_timeout,
screenshot_timeout=self._screenshot_timeout,
page_stability_timeout=self._page_stability_timeout,
dom_content_loaded_timeout=self._dom_content_loaded_timeout,
viewport_limit=self._viewport_limit,
connect_over_cdp=self.config_loader.get_browser_config().connect_over_cdp,
cdp_url=self.config_loader.get_browser_config().cdp_url,
)
@classmethod
def toolkit_name(cls) -> str:
return "Browser Toolkit"
@listen_toolkit(BaseHybridBrowserToolkit.browser_open)
async def browser_open(self) -> Dict[str, Any]:
return await super().browser_open()
@listen_toolkit(BaseHybridBrowserToolkit.browser_close)
async def browser_close(self) -> str:
return await super().browser_close()
@listen_toolkit(BaseHybridBrowserToolkit.browser_visit_page)
async def browser_visit_page(self, url: str) -> Dict[str, Any]:
return await super().browser_visit_page(url)
@listen_toolkit(BaseHybridBrowserToolkit.browser_back)
async def browser_back(self) -> Dict[str, Any]:
return await super().browser_back()
@listen_toolkit(BaseHybridBrowserToolkit.browser_forward)
async def browser_forward(self) -> Dict[str, Any]:
return await super().browser_forward()
@listen_toolkit(BaseHybridBrowserToolkit.browser_get_page_snapshot)
async def browser_get_page_snapshot(self) -> str:
return await super().browser_get_page_snapshot()
@listen_toolkit(BaseHybridBrowserToolkit.browser_get_som_screenshot)
async def browser_get_som_screenshot(self, read_image: bool = False, instruction: str | None = None) -> str:
return await super().browser_get_som_screenshot(read_image, instruction)
@listen_toolkit(BaseHybridBrowserToolkit.browser_click)
async def browser_click(self, *, ref: str) -> Dict[str, Any]:
return await super().browser_click(ref=ref)
@listen_toolkit(BaseHybridBrowserToolkit.browser_type)
async def browser_type(self, *, ref: str, text: str) -> Dict[str, Any]:
return await super().browser_type(ref=ref, text=text)
@listen_toolkit(BaseHybridBrowserToolkit.browser_select)
async def browser_select(self, *, ref: str, value: str) -> Dict[str, Any]:
return await super().browser_select(ref=ref, value=value)
@listen_toolkit(BaseHybridBrowserToolkit.browser_scroll)
async def browser_scroll(self, *, direction: str, amount: int = 500) -> Dict[str, Any]:
return await super().browser_scroll(direction=direction, amount=amount)
@listen_toolkit(BaseHybridBrowserToolkit.browser_enter)
async def browser_enter(self) -> Dict[str, Any]:
return await super().browser_enter()
@listen_toolkit(BaseHybridBrowserToolkit.browser_wait_user)
async def browser_wait_user(self, timeout_sec: float | None = None) -> Dict[str, Any]:
return await super().browser_wait_user(timeout_sec)
@listen_toolkit(BaseHybridBrowserToolkit.browser_switch_tab)
async def browser_switch_tab(self, *, tab_id: str) -> Dict[str, Any]:
return await super().browser_switch_tab(tab_id=tab_id)
@listen_toolkit(BaseHybridBrowserToolkit.browser_close_tab)
async def browser_close_tab(self, *, tab_id: str) -> Dict[str, Any]:
return await super().browser_close_tab(tab_id=tab_id)
@listen_toolkit(BaseHybridBrowserToolkit.browser_get_tab_info)
async def browser_get_tab_info(self) -> Dict[str, Any]:
return await super().browser_get_tab_info()