# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========

import asyncio
import inspect
import platform
import threading
import uuid
from collections.abc import Callable, Coroutine
from dataclasses import dataclass
from typing import Any

from camel.messages import BaseMessage
from camel.toolkits import (
    FunctionTool,
    RegisteredAgentToolkit,
    ToolkitMessageIntegration,
)

from app.agent.agent_model import agent_model
from app.agent.listen_chat_agent import logger
from app.agent.prompt import build_browser_system_prompt
from app.agent.toolkit.human_toolkit import HumanToolkit
from app.agent.toolkit.hybrid_browser_toolkit import HybridBrowserToolkit

# TODO: Remove NoteTakingToolkit and use TerminalToolkit instead
from app.agent.toolkit.note_taking_toolkit import NoteTakingToolkit
from app.agent.toolkit.screenshot_toolkit import ScreenshotToolkit
from app.agent.toolkit.search_toolkit import SearchToolkit
from app.agent.toolkit.skill_toolkit import SkillToolkit
from app.agent.toolkit.terminal_toolkit import TerminalToolkit
from app.agent.utils import NOW_STR
from app.component.environment import env
from app.model.chat import Chat
from app.service.task import Agents
from app.utils.file_utils import get_working_directory


@dataclass
class BrowserAgentTooling:
    """Result of :func:`build_browser_agent_tooling`."""

    # Tools (FunctionTool objects or plain callables) handed to the agent.
    tools: list[FunctionTool | Callable[..., Any]]
    # Toolkit names passed to ``agent_model`` as ``tool_names``.
    tool_names: list[str]
    # Unwrapped toolkit instances passed to ``agent_model`` as
    # ``toolkits_to_register_agent``.
    toolkits_to_register_agent: list[RegisteredAgentToolkit]


def _build_browser_system_prompt_compat(**kwargs: Any) -> str:
    """Call the shared prompt builder with only supported keyword args.

    Any keyword argument that is not a parameter name of
    ``build_browser_system_prompt`` is dropped before the call, so this
    module can pass options the builder may not accept.

    Returns:
        The system prompt string produced by ``build_browser_system_prompt``.
    """
    # Inspect the signature once rather than once per keyword argument.
    accepted = inspect.signature(build_browser_system_prompt).parameters
    supported_kwargs = {
        key: value for key, value in kwargs.items() if key in accepted
    }
    return build_browser_system_prompt(**supported_kwargs)


def _get_browser_port(browser: dict) -> int:
    """Extract port from a browser config dict, with fallback to env default.

    Falls back to the ``browser_port`` env setting (default ``"9222"``) when
    the dict has no ``port`` key. The value is always converted to ``int``.
    """
    return int(browser.get("port", env("browser_port", "9222")))


class CdpBrowserPoolManager:
    """Manages CDP browser pool occupation to ensure parallel tasks use
    different browsers.

    Each browser is identified by its port. Ports are normalized to ``int``
    on acquisition so they match the values callers in this module obtain
    through ``_get_browser_port`` and later pass to :meth:`release_browser`.
    All bookkeeping is guarded by a single ``threading.Lock``.
    """

    def __init__(self):
        # port -> session_id currently holding that browser.
        self._occupied_ports: dict[int, str] = {}
        # session_id -> port held by that session.
        self._session_to_port: dict[str, int] = {}
        # session_id -> task_id that owns the session (may be None).
        self._session_to_task: dict[str, str | None] = {}
        self._lock = threading.Lock()

    def acquire_browser(
        self,
        cdp_browsers: list[dict],
        session_id: str,
        task_id: str | None = None,
    ) -> dict | None:
        """Acquire an available browser from the pool.

        Browsers are tried in list order. Entries without a ``port``, or
        whose port is not an integer, are skipped.

        Args:
            cdp_browsers: List of browser configurations.
            session_id: Unique session identifier.
            task_id: Optional task identifier for ownership tracking.

        Returns:
            Browser configuration dict or None if all occupied.
        """
        with self._lock:
            for browser in cdp_browsers:
                raw_port = browser.get("port")
                if not raw_port:
                    continue
                try:
                    # Normalize so "9222" and 9222 share one pool slot and
                    # match the int later passed to release_browser().
                    port = int(raw_port)
                except (TypeError, ValueError):
                    logger.warning(
                        f"Skipping browser with invalid port {raw_port!r} "
                        f"for session {session_id}"
                    )
                    continue
                if port in self._occupied_ports:
                    continue
                self._occupied_ports[port] = session_id
                self._session_to_port[session_id] = port
                self._session_to_task[session_id] = task_id
                logger.info(
                    f"Acquired browser on port {port} for session "
                    f"{session_id}. Occupied: "
                    f"{list(self._occupied_ports.keys())}"
                )
                return browser
            logger.warning(
                f"No available browsers for session {session_id}. "
                f"All occupied: {list(self._occupied_ports.keys())}"
            )
            return None

    def release_browser(self, port: int, session_id: str):
        """Release a browser back to the pool.

        The browser is only released when ``port`` is currently held by
        ``session_id``; otherwise a warning is logged and nothing changes.
        """
        with self._lock:
            if self._occupied_ports.get(port) != session_id:
                logger.warning(
                    f"Attempted to release browser on port {port} "
                    f"but it was not occupied by {session_id}"
                )
                return
            del self._occupied_ports[port]
            self._session_to_port.pop(session_id, None)
            self._session_to_task.pop(session_id, None)
            logger.info(
                f"Released browser on port {port} from session "
                f"{session_id}. Occupied: "
                f"{list(self._occupied_ports.keys())}"
            )

    def release_by_task(self, task_id: str) -> list[int]:
        """Release all browsers associated with a task_id.

        Session bookkeeping for every session owned by ``task_id`` is
        cleared. A port is freed only if it is still held by that session.

        Returns:
            List of released ports.
        """
        released_ports = []
        with self._lock:
            sessions = [
                s for s, t in self._session_to_task.items() if t == task_id
            ]
            for session_id in sessions:
                port = self._session_to_port.get(session_id)
                if (
                    port is not None
                    and self._occupied_ports.get(port) == session_id
                ):
                    del self._occupied_ports[port]
                    released_ports.append(port)
                self._session_to_port.pop(session_id, None)
                self._session_to_task.pop(session_id, None)
            if released_ports:
                logger.info(
                    f"Released {len(released_ports)} browser(s) for "
                    f"task {task_id}. Occupied: "
                    f"{list(self._occupied_ports.keys())}"
                )
        return released_ports

    def get_occupied_ports(self) -> list[int]:
        """Get list of currently occupied ports."""
        with self._lock:
            return list(self._occupied_ports.keys())


# Global CDP browser pool manager instance
_cdp_pool_manager = CdpBrowserPoolManager()


class ExtensionTabPoolManager:
    """Manages tab allocations within a shared extension proxy connection
    for parallel agent tasks.

    Tab bookkeeping is guarded by a ``threading.Lock``. Calls to the proxy
    (``open_tab`` / ``close_tab``) are awaited outside that lock.
    """

    def __init__(self):
        self._occupied_tabs: dict[int, str] = {}  # tabId -> session_id
        # session_id -> tabId held by that session.
        self._session_to_tab: dict[str, int] = {}
        # session_id -> task_id that owns the session (may be None).
        self._session_to_task: dict[str, str | None] = {}
        # Shared proxy set via set_extension_proxy(); None until then.
        self._extension_proxy = None
        self._lock = threading.Lock()

    def set_extension_proxy(self, proxy):
        """Set the shared ExtensionProxyWrapper instance."""
        self._extension_proxy = proxy

    async def acquire_tab(
        self,
        session_id: str,
        task_id: str | None = None,
        url: str = "about:blank",
    ) -> int | None:
        """Create a new tab and assign it to a session.

        Args:
            session_id: Session that will own the tab.
            task_id: Optional task identifier used by
                :meth:`release_by_task`.
            url: URL passed to the proxy's ``open_tab``.

        Returns:
            The new tab id, or ``None`` if no proxy is set, the proxy result
            has no ``tabId``, or opening the tab raised (the error is
            logged).
        """
        if not self._extension_proxy:
            logger.error("ExtensionTabPoolManager: no extension proxy set")
            return None

        try:
            result = await self._extension_proxy.open_tab(url)
            tab_id = result.get("tabId")
            if tab_id is not None:
                with self._lock:
                    self._occupied_tabs[tab_id] = session_id
                    self._session_to_tab[session_id] = tab_id
                    self._session_to_task[session_id] = task_id
                    logger.info(
                        f"Acquired tab {tab_id} for session {session_id}. "
                        f"Total tabs: {len(self._occupied_tabs)}"
                    )
                return tab_id
        except Exception as e:
            logger.error(f"Failed to acquire tab: {e}")
        return None

    async def release_tab(self, tab_id: int, session_id: str):
        """Close a tab and release it from the session.

        If ``tab_id`` is not recorded as owned by ``session_id``, a warning
        is logged and nothing is closed. Errors raised by the proxy's
        ``close_tab`` are logged, not propagated.
        """
        with self._lock:
            if self._occupied_tabs.get(tab_id) != session_id:
                logger.warning(
                    f"Tab {tab_id} not occupied by session {session_id}"
                )
                return
            del self._occupied_tabs[tab_id]
            self._session_to_tab.pop(session_id, None)
            self._session_to_task.pop(session_id, None)

        try:
            if self._extension_proxy:
                await self._extension_proxy.close_tab(tab_id)
            logger.info(
                f"Released tab {tab_id} from session {session_id}. "
                f"Remaining tabs: {len(self._occupied_tabs)}"
            )
        except Exception as e:
            logger.error(f"Failed to close tab {tab_id}: {e}")

    async def release_by_task(self, task_id: str) -> list[int]:
        """Release all tabs associated with a task_id.

        Bookkeeping is cleared under the lock first. The tabs are then
        closed through the proxy outside the lock, and close errors are
        logged per tab.

        Returns:
            The ids of the released tabs.
        """
        released = []
        with self._lock:
            sessions = [
                s for s, t in self._session_to_task.items() if t == task_id
            ]
            for session_id in sessions:
                tab_id = self._session_to_tab.get(session_id)
                if tab_id is not None:
                    released.append((tab_id, session_id))
                    self._occupied_tabs.pop(tab_id, None)
                self._session_to_tab.pop(session_id, None)
                self._session_to_task.pop(session_id, None)

        for tab_id, _ in released:
            try:
                if self._extension_proxy:
                    await self._extension_proxy.close_tab(tab_id)
            except Exception as e:
                logger.error(f"Failed to close tab {tab_id}: {e}")

        if released:
            logger.info(f"Released {len(released)} tab(s) for task {task_id}")
        return [t for t, _ in released]


# Global extension tab pool manager instance
_tab_pool_manager = ExtensionTabPoolManager()


def _find_extension_proxy_browser(
    cdp_browsers: list[dict],
) -> dict | None:
    """Find the first extension proxy browser in the list."""
    for browser in cdp_browsers:
        if browser.get("isExtensionProxy", False):
            return browser
    return None


def _run_coroutine_sync(
    make_coro: Callable[[], Coroutine[Any, Any, Any]],
) -> Any:
    """Run a coroutine to completion from synchronous code.

    Uses the thread's current event loop when one exists and is not closed.
    Otherwise it falls back to ``asyncio.run``. The coroutine is only
    created after the strategy is chosen. That way a failed attempt never
    leaves an un-awaited coroutine behind, and the work never runs twice.

    Args:
        make_coro: Zero-argument factory returning the coroutine to run.

    Returns:
        Whatever the coroutine returns.

    Raises:
        RuntimeError: If the current event loop is already running; a
            synchronous caller cannot block on it.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop in this thread.
        loop = None
    if loop is None or loop.is_closed():
        return asyncio.run(make_coro())
    return loop.run_until_complete(make_coro())


def build_browser_agent_tooling(
    *,
    api_task_id: str,
    working_directory: str,
    user_id: str | None,
    web_toolkit_custom: HybridBrowserToolkit,
    agent_name: str = Agents.browser_agent,
    include_human_toolkit: bool = True,
    include_note_toolkit: bool = True,
    include_search_toolkit: bool = True,
    include_terminal_toolkit: bool = True,
    message_handler: Callable | None = None,
) -> BrowserAgentTooling:
    """Build the shared browser-agent toolkit stack.

    Every toolkit's functions are registered with a
    ``ToolkitMessageIntegration`` built from ``message_handler``. Only
    ``shell_exec`` is exposed from the terminal toolkit.

    Args:
        api_task_id: Task/project id passed to each toolkit.
        working_directory: Working directory passed to file-based toolkits.
        user_id: User id passed to ``SkillToolkit``.
        web_toolkit_custom: Browser toolkit instance to expose.
        agent_name: Agent name passed to the toolkits.
        include_human_toolkit: Include ``HumanToolkit`` tools.
        include_note_toolkit: Include ``NoteTakingToolkit`` tools.
        include_search_toolkit: Include available ``SearchToolkit`` tools.
        include_terminal_toolkit: Include the terminal ``shell_exec`` tool.
        message_handler: Handler given to ``ToolkitMessageIntegration``.

    Returns:
        The tools, the names of the included toolkits (excluded toolkits are
        not listed), and the unwrapped browser and screenshot toolkits to
        register with the agent.
    """
    message_integration = ToolkitMessageIntegration(
        message_handler=message_handler
    )

    # Save references before toolkit methods are wrapped.
    web_toolkit_for_agent_registration = web_toolkit_custom
    web_toolkit_custom = message_integration.register_toolkits(
        web_toolkit_custom
    )

    terminal_tools = None
    if include_terminal_toolkit:
        terminal_toolkit = TerminalToolkit(
            api_task_id,
            agent_name,
            working_directory=working_directory,
            safe_mode=True,
            clone_current_env=True,
        )
        terminal_tools = message_integration.register_functions(
            [terminal_toolkit.shell_exec]
        )

    note_toolkit = None
    if include_note_toolkit:
        note_toolkit = message_integration.register_toolkits(
            NoteTakingToolkit(
                api_task_id,
                agent_name,
                working_directory=working_directory,
            )
        )

    screenshot_toolkit_for_agent_registration = ScreenshotToolkit(
        api_task_id,
        working_directory=working_directory,
        agent_name=agent_name,
    )
    screenshot_toolkit = message_integration.register_toolkits(
        screenshot_toolkit_for_agent_registration
    )

    skill_toolkit = message_integration.register_toolkits(
        SkillToolkit(
            api_task_id,
            agent_name,
            working_directory=working_directory,
            user_id=user_id,
        )
    )

    search_tools = []
    if include_search_toolkit:
        available_search_tools = SearchToolkit.get_can_use_tools(
            api_task_id, agent_name=agent_name
        )
        if available_search_tools:
            search_tools = message_integration.register_functions(
                available_search_tools
            )

    tools: list[FunctionTool | Callable[..., Any]] = []
    if include_human_toolkit:
        tools.extend(HumanToolkit.get_can_use_tools(api_task_id, agent_name))
    tools.extend(web_toolkit_custom.get_tools())
    if terminal_tools is not None:
        tools.extend(terminal_tools)
    if note_toolkit is not None:
        tools.extend(note_toolkit.get_tools())
    tools.extend(screenshot_toolkit.get_tools())
    tools.extend(search_tools)
    tools.extend(skill_toolkit.get_tools())

    # Only list toolkits that were actually included, in a fixed order.
    tool_names: list[str] = []
    if include_search_toolkit:
        tool_names.append(SearchToolkit.toolkit_name())
    tool_names.append(HybridBrowserToolkit.toolkit_name())
    if include_human_toolkit:
        tool_names.append(HumanToolkit.toolkit_name())
    if include_note_toolkit:
        tool_names.append(NoteTakingToolkit.toolkit_name())
    if include_terminal_toolkit:
        tool_names.append(TerminalToolkit.toolkit_name())
    tool_names.append(ScreenshotToolkit.toolkit_name())
    tool_names.append(SkillToolkit.toolkit_name())

    return BrowserAgentTooling(
        tools=tools,
        tool_names=tool_names,
        toolkits_to_register_agent=[
            web_toolkit_for_agent_registration,
            screenshot_toolkit_for_agent_registration,
        ],
    )


def browser_agent(options: Chat):
    """Create the browser agent for a chat task.

    The function does the following:

    1. Selects a browser backend.
    2. Builds the toolkit stack and system prompt.
    3. Creates the agent via ``agent_model``.
    4. Attaches callbacks and state that cloned agents use to acquire and
       release their own browser resources.

    Backend selection:
      * If ``options.cdp_browsers`` contains an entry flagged
        ``isExtensionProxy``, the extension proxy is used exclusively and
        each cloned agent gets its own tab.
      * Otherwise a CDP browser is taken from the shared pool. When the pool
        is exhausted, the first configured browser is used. With no CDP
        browsers configured, the ``browser_port`` env setting is used.
        Each cloned agent acquires its own browser from the pool.

    Args:
        options: Chat options for the task.

    Returns:
        The agent created by ``agent_model``.
    """
    working_directory = get_working_directory(options)
    logger.info(
        f"Creating browser agent for project: {options.project_id} "
        f"in directory: {working_directory}"
    )

    human_message_handler = HumanToolkit(
        options.project_id, Agents.browser_agent
    ).send_message_to_user

    # Acquire CDP browser from pool or use default port
    toolkit_session_id = str(uuid.uuid4())[:8]
    selected_port = None
    selected_is_external = False
    use_extension_proxy = False
    extension_proxy_port = 8765

    # Check for extension proxy browser first
    extension_proxy_browser = None
    if options.cdp_browsers:
        extension_proxy_browser = _find_extension_proxy_browser(
            options.cdp_browsers
        )

    if extension_proxy_browser:
        # Extension proxy mode: use plugin exclusively, no CDP fallback
        use_extension_proxy = True
        extension_proxy_port = int(extension_proxy_browser.get("port", 8765))
        selected_is_external = True
        logger.info(
            f"Using extension proxy mode (exclusive): "
            f"port={extension_proxy_port}, session_id={toolkit_session_id}"
        )
    else:
        # No extension proxy — use CDP browsers
        cdp_only_browsers = [
            b
            for b in (options.cdp_browsers or [])
            if not b.get("isExtensionProxy", False)
        ]
        if cdp_only_browsers:
            selected_browser = _cdp_pool_manager.acquire_browser(
                cdp_only_browsers, toolkit_session_id, options.task_id
            )
            if selected_browser:
                selected_port = _get_browser_port(selected_browser)
                selected_is_external = selected_browser.get(
                    "isExternal", False
                )
                logger.info(
                    f"Acquired CDP browser from pool (initial): "
                    f"port={selected_port}, "
                    f"isExternal={selected_is_external}, "
                    f"session_id={toolkit_session_id}"
                )
            else:
                # Pool exhausted: share the first browser without
                # registering this session in the pool.
                selected_port = _get_browser_port(cdp_only_browsers[0])
                selected_is_external = cdp_only_browsers[0].get(
                    "isExternal", False
                )
                logger.warning(
                    f"No available browsers in pool (initial), "
                    f"using first: port={selected_port}, "
                    f"session_id={toolkit_session_id}"
                )
        else:
            # Parse like _get_browser_port so _cdp_port is always an int.
            selected_port = int(env("browser_port", "9222"))

    enabled_tools = [
        "browser_click",
        "browser_type",
        "browser_back",
        "browser_forward",
        "browser_select",
        "browser_console_exec",
        "browser_console_view",
        "browser_switch_tab",
        "browser_enter",
        "browser_visit_page",
        "browser_scroll",
        "browser_sheet_read",
        "browser_sheet_input",
        "browser_get_page_snapshot",
        "browser_open",
    ]

    # Toolkit settings shared by both backends.
    toolkit_kwargs: dict[str, Any] = {
        "cdp_keep_current_page": True,
        "headless": False,
        "browser_log_to_file": True,
        "stealth": True,
        "session_id": toolkit_session_id,
        "enabled_tools": enabled_tools,
    }

    if use_extension_proxy:
        web_toolkit_custom = HybridBrowserToolkit(
            options.project_id,
            extension_proxy_mode=True,
            extension_proxy_port=extension_proxy_port,
            **toolkit_kwargs,
        )
        # Inject pre-started global wrapper if available
        from app.service.extension_proxy_service import (
            get_extension_proxy_wrapper,
        )

        global_wrapper = get_extension_proxy_wrapper()
        if global_wrapper is not None:
            web_toolkit_custom._extension_proxy_wrapper = global_wrapper
            _tab_pool_manager.set_extension_proxy(global_wrapper)
            logger.info(
                "Injected pre-started global ExtensionProxyWrapper "
                "into toolkit"
            )
    else:
        web_toolkit_custom = HybridBrowserToolkit(
            options.project_id,
            cdp_url=f"http://localhost:{selected_port}",
            **toolkit_kwargs,
        )

    tooling = build_browser_agent_tooling(
        api_task_id=options.project_id,
        working_directory=working_directory,
        user_id=options.skill_config_user_id(),
        web_toolkit_custom=web_toolkit_custom,
        agent_name=Agents.browser_agent,
        include_human_toolkit=True,
        message_handler=human_message_handler,
    )

    # Build external browser notice
    external_browser_notice = ""
    if selected_is_external:
        external_browser_notice = (
            "\n\n"
            "**IMPORTANT**: You are connected to an external browser instance. "
            "The browser may already be open with active sessions and logged-in "
            "websites. When you use browser_open, you will connect to this "
            "existing browser and can immediately access its current state and "
            "pages.\n"
            "\n"
        )

    system_message = _build_browser_system_prompt_compat(
        platform_system=platform.system(),
        platform_machine=platform.machine(),
        working_directory=working_directory,
        now_str=NOW_STR,
        external_browser_notice=external_browser_notice,
        include_human_toolkit=True,
    )

    agent = agent_model(
        Agents.browser_agent,
        BaseMessage.make_assistant_message(
            role_name="Browser Agent",
            content=system_message,
        ),
        options,
        tooling.tools,
        prune_tool_calls_from_memory=True,
        tool_names=tooling.tool_names,
        toolkits_to_register_agent=tooling.toolkits_to_register_agent,
        enable_snapshot_clean=True,
    )

    if use_extension_proxy:
        # Extension proxy mode: tab-based parallelism

        def acquire_tab_for_agent(agent_instance):
            """Acquire a new tab from extension for a cloned agent."""
            session_id = str(uuid.uuid4())[:8]
            tab_id = _run_coroutine_sync(
                lambda: _tab_pool_manager.acquire_tab(
                    session_id, options.task_id
                )
            )
            agent_instance._extension_tab_id = tab_id
            agent_instance._cdp_session_id = session_id
            agent_instance._cdp_port = extension_proxy_port
            if tab_id is None:
                logger.warning(
                    f"Failed to acquire tab for cloned agent "
                    f"{agent_instance.agent_id}, session={session_id}"
                )
            else:
                logger.info(
                    f"Acquired tab {tab_id} for cloned agent "
                    f"{agent_instance.agent_id}, session={session_id}"
                )

        def release_tab_from_agent(agent_instance):
            """Release tab back to pool."""
            tab_id = getattr(agent_instance, "_extension_tab_id", None)
            session_id = getattr(agent_instance, "_cdp_session_id", None)
            if tab_id is None or session_id is None:
                return
            _run_coroutine_sync(
                lambda: _tab_pool_manager.release_tab(tab_id, session_id)
            )
            logger.info(
                f"Released tab {tab_id} for agent "
                f"{agent_instance.agent_id}"
            )

        agent._cdp_acquire_callback = acquire_tab_for_agent
        agent._cdp_release_callback = release_tab_from_agent
        agent._cdp_port = extension_proxy_port
        agent._use_extension_proxy = True
    else:
        # Standard CDP mode: port-based parallelism

        def acquire_cdp_for_agent(agent_instance):
            """Acquire a CDP browser from pool for a cloned agent."""
            cdp_only = [
                b
                for b in (options.cdp_browsers or [])
                if not b.get("isExtensionProxy", False)
            ]
            if not cdp_only:
                return
            session_id = str(uuid.uuid4())[:8]
            selected = _cdp_pool_manager.acquire_browser(
                cdp_only, session_id, options.task_id
            )
            # Pool exhausted: fall back to the first configured browser.
            agent_instance._cdp_port = _get_browser_port(
                selected if selected else cdp_only[0]
            )
            agent_instance._cdp_session_id = session_id
            logger.info(
                f"Acquired CDP for cloned agent {agent_instance.agent_id}: "
                f"port={agent_instance._cdp_port}, session={session_id}"
            )

        def release_cdp_from_agent(agent_instance):
            """Release CDP browser back to pool."""
            port = getattr(agent_instance, "_cdp_port", None)
            session_id = getattr(agent_instance, "_cdp_session_id", None)
            if port is None or session_id is None:
                return
            _cdp_pool_manager.release_browser(port, session_id)
            logger.info(
                f"Released CDP for agent {agent_instance.agent_id}: "
                f"port={port}, session={session_id}"
            )

        agent._cdp_acquire_callback = acquire_cdp_for_agent
        agent._cdp_release_callback = release_cdp_from_agent
        agent._cdp_port = selected_port

    agent._cdp_session_id = toolkit_session_id
    agent._cdp_task_id = options.task_id
    agent._cdp_options = options
    # The toolkit instance created above. This is the same object that
    # build_browser_agent_tooling keeps (unwrapped) for agent registration.
    agent._browser_toolkit = web_toolkit_custom
    return agent