eigent/backend/app/agent/factory/browser.py
2026-03-31 17:20:08 +08:00

659 lines
24 KiB
Python

# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
import asyncio
import inspect
import platform
import threading
import uuid
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
from camel.messages import BaseMessage
from camel.toolkits import (
FunctionTool,
RegisteredAgentToolkit,
ToolkitMessageIntegration,
)
from app.agent.agent_model import agent_model
from app.agent.listen_chat_agent import logger
from app.agent.prompt import build_browser_system_prompt
from app.agent.toolkit.human_toolkit import HumanToolkit
from app.agent.toolkit.hybrid_browser_toolkit import HybridBrowserToolkit
# TODO: Remove NoteTakingToolkit and use TerminalToolkit instead
from app.agent.toolkit.note_taking_toolkit import NoteTakingToolkit
from app.agent.toolkit.screenshot_toolkit import ScreenshotToolkit
from app.agent.toolkit.search_toolkit import SearchToolkit
from app.agent.toolkit.skill_toolkit import SkillToolkit
from app.agent.toolkit.terminal_toolkit import TerminalToolkit
from app.agent.utils import NOW_STR
from app.component.environment import env
from app.model.chat import Chat
from app.service.task import Agents
from app.utils.file_utils import get_working_directory
@dataclass
class BrowserAgentTooling:
    """Bundle of tools and toolkits assembled for a browser agent.

    Instances are produced by ``build_browser_agent_tooling``.
    """

    # Tools handed to the agent: FunctionTool objects or plain callables.
    tools: list[FunctionTool | Callable[..., Any]]
    # Names of the toolkits reported alongside the tools.
    tool_names: list[str]
    # Toolkit instances (as they were before message-integration wrapping)
    # that must be registered with the created agent.
    toolkits_to_register_agent: list[RegisteredAgentToolkit]
def _build_browser_system_prompt_compat(**kwargs: Any) -> str:
    """Call the shared browser prompt builder with only the kwargs it accepts.

    The prompt builder's signature may not match the arguments this factory
    passes, so keyword arguments it cannot take are dropped instead of
    raising ``TypeError``.

    Args:
        **kwargs: Candidate keyword arguments for
            ``build_browser_system_prompt``.

    Returns:
        The result of ``build_browser_system_prompt``.
    """
    # Inspect the builder once rather than once per candidate kwarg.
    params = inspect.signature(build_browser_system_prompt).parameters

    # A builder declaring **kwargs accepts everything; filtering by named
    # parameters would otherwise silently drop every argument.
    if any(
        param.kind is inspect.Parameter.VAR_KEYWORD
        for param in params.values()
    ):
        return build_browser_system_prompt(**kwargs)

    # Positional-only parameters cannot be passed by keyword.
    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    supported_kwargs = {
        key: value
        for key, value in kwargs.items()
        if key in params and params[key].kind in keyword_kinds
    }
    return build_browser_system_prompt(**supported_kwargs)
def _get_browser_port(browser: dict) -> int:
"""Extract port from a browser config dict, with fallback to env default."""
return int(browser.get("port", env("browser_port", "9222")))
class CdpBrowserPoolManager:
"""Manages CDP browser pool occupation to ensure
parallel tasks use different browsers."""
def __init__(self):
self._occupied_ports: dict[int, str] = {}
self._session_to_port: dict[str, int] = {}
self._session_to_task: dict[str, str | None] = {}
self._lock = threading.Lock()
def acquire_browser(
self,
cdp_browsers: list[dict],
session_id: str,
task_id: str | None = None,
) -> dict | None:
"""Acquire an available browser from the pool.
Args:
cdp_browsers: List of browser configurations.
session_id: Unique session identifier.
task_id: Optional task identifier for ownership tracking.
Returns:
Browser configuration dict or None if all occupied.
"""
with self._lock:
for browser in cdp_browsers:
port = browser.get("port")
if port and port not in self._occupied_ports:
self._occupied_ports[port] = session_id
self._session_to_port[session_id] = port
self._session_to_task[session_id] = task_id
logger.info(
f"Acquired browser on port {port} for session "
f"{session_id}. Occupied: "
f"{list(self._occupied_ports.keys())}"
)
return browser
logger.warning(
f"No available browsers for session {session_id}. "
f"All occupied: {list(self._occupied_ports.keys())}"
)
return None
def release_browser(self, port: int, session_id: str):
"""Release a browser back to the pool."""
with self._lock:
if (
port in self._occupied_ports
and self._occupied_ports[port] == session_id
):
del self._occupied_ports[port]
self._session_to_port.pop(session_id, None)
self._session_to_task.pop(session_id, None)
logger.info(
f"Released browser on port {port} from session "
f"{session_id}. Occupied: "
f"{list(self._occupied_ports.keys())}"
)
else:
logger.warning(
f"Attempted to release browser on port {port} "
f"but it was not occupied by {session_id}"
)
def release_by_task(self, task_id: str) -> list[int]:
"""Release all browsers associated with a task_id.
Returns:
List of released ports.
"""
released_ports = []
with self._lock:
sessions = [
s for s, t in self._session_to_task.items() if t == task_id
]
for session_id in sessions:
port = self._session_to_port.get(session_id)
if (
port is not None
and self._occupied_ports.get(port) == session_id
):
del self._occupied_ports[port]
released_ports.append(port)
self._session_to_port.pop(session_id, None)
self._session_to_task.pop(session_id, None)
if released_ports:
logger.info(
f"Released {len(released_ports)} browser(s) for "
f"task {task_id}. Occupied: "
f"{list(self._occupied_ports.keys())}"
)
return released_ports
def get_occupied_ports(self) -> list[int]:
"""Get list of currently occupied ports."""
with self._lock:
return list(self._occupied_ports.keys())
# Global CDP browser pool manager instance
_cdp_pool_manager = CdpBrowserPoolManager()
class ExtensionTabPoolManager:
"""Manages tab allocations within a shared extension proxy connection
for parallel agent tasks."""
def __init__(self):
self._occupied_tabs: dict[int, str] = {} # tabId -> session_id
self._session_to_tab: dict[str, int] = {}
self._session_to_task: dict[str, str | None] = {}
self._extension_proxy = None
self._lock = threading.Lock()
def set_extension_proxy(self, proxy):
"""Set the shared ExtensionProxyWrapper instance."""
self._extension_proxy = proxy
async def acquire_tab(
self,
session_id: str,
task_id: str | None = None,
url: str = "about:blank",
) -> int | None:
"""Create a new tab and assign it to a session."""
if not self._extension_proxy:
logger.error("ExtensionTabPoolManager: no extension proxy set")
return None
try:
result = await self._extension_proxy.open_tab(url)
tab_id = result.get("tabId")
if tab_id is not None:
with self._lock:
self._occupied_tabs[tab_id] = session_id
self._session_to_tab[session_id] = tab_id
self._session_to_task[session_id] = task_id
logger.info(
f"Acquired tab {tab_id} for session {session_id}. "
f"Total tabs: {len(self._occupied_tabs)}"
)
return tab_id
except Exception as e:
logger.error(f"Failed to acquire tab: {e}")
return None
async def release_tab(self, tab_id: int, session_id: str):
"""Close a tab and release it from the session."""
with self._lock:
if (
tab_id in self._occupied_tabs
and self._occupied_tabs[tab_id] == session_id
):
del self._occupied_tabs[tab_id]
self._session_to_tab.pop(session_id, None)
self._session_to_task.pop(session_id, None)
else:
logger.warning(
f"Tab {tab_id} not occupied by session {session_id}"
)
return
try:
if self._extension_proxy:
await self._extension_proxy.close_tab(tab_id)
logger.info(
f"Released tab {tab_id} from session {session_id}. "
f"Remaining tabs: {len(self._occupied_tabs)}"
)
except Exception as e:
logger.error(f"Failed to close tab {tab_id}: {e}")
async def release_by_task(self, task_id: str) -> list[int]:
"""Release all tabs associated with a task_id."""
released = []
with self._lock:
sessions = [
s for s, t in self._session_to_task.items() if t == task_id
]
for session_id in sessions:
tab_id = self._session_to_tab.get(session_id)
if tab_id is not None:
released.append((tab_id, session_id))
self._occupied_tabs.pop(tab_id, None)
self._session_to_tab.pop(session_id, None)
self._session_to_task.pop(session_id, None)
for tab_id, _ in released:
try:
if self._extension_proxy:
await self._extension_proxy.close_tab(tab_id)
except Exception as e:
logger.error(f"Failed to close tab {tab_id}: {e}")
if released:
logger.info(f"Released {len(released)} tab(s) for task {task_id}")
return [t for t, _ in released]
# Global extension tab pool manager instance
_tab_pool_manager = ExtensionTabPoolManager()
def _find_extension_proxy_browser(
cdp_browsers: list[dict],
) -> dict | None:
"""Find the first extension proxy browser in the list."""
for browser in cdp_browsers:
if browser.get("isExtensionProxy", False):
return browser
return None
def build_browser_agent_tooling(
    *,
    api_task_id: str,
    working_directory: str,
    user_id: str | None,
    web_toolkit_custom: HybridBrowserToolkit,
    agent_name: str = Agents.browser_agent,
    include_human_toolkit: bool = True,
    include_note_toolkit: bool = True,
    include_search_toolkit: bool = True,
    include_terminal_toolkit: bool = True,
    message_handler: Callable | None = None,
) -> BrowserAgentTooling:
    """Build the shared browser-agent toolkit stack.

    Each toolkit is registered with a ``ToolkitMessageIntegration`` built
    from ``message_handler``.

    Args:
        api_task_id: Task/project id passed to every toolkit.
        working_directory: Working directory for file-producing toolkits.
        user_id: User id forwarded to ``SkillToolkit``.
        web_toolkit_custom: Browser toolkit instance to expose.
        agent_name: Agent name passed to the toolkits.
        include_human_toolkit: Add ``HumanToolkit`` tools.
        include_note_toolkit: Add ``NoteTakingToolkit`` tools.
        include_search_toolkit: Add ``SearchToolkit`` tools.
        include_terminal_toolkit: Add the terminal ``shell_exec`` tool.
        message_handler: Handler given to ``ToolkitMessageIntegration``.

    Returns:
        A ``BrowserAgentTooling`` holding three things: the tool list, the
        names of the toolkits that were actually included, and the
        unwrapped browser/screenshot toolkits to register with the agent.
    """
    message_integration = ToolkitMessageIntegration(
        message_handler=message_handler
    )

    # Save references before toolkit methods are wrapped; the originals are
    # what gets registered with the agent below.
    web_toolkit_for_agent_registration = web_toolkit_custom
    wrapped_web_toolkit = message_integration.register_toolkits(
        web_toolkit_custom
    )

    terminal_tools = None
    if include_terminal_toolkit:
        _terminal_tk = TerminalToolkit(
            api_task_id,
            agent_name,
            working_directory=working_directory,
            safe_mode=True,
            clone_current_env=True,
        )
        terminal_tools = message_integration.register_functions(
            [_terminal_tk.shell_exec]
        )

    note_toolkit = None
    if include_note_toolkit:
        note_toolkit = NoteTakingToolkit(
            api_task_id,
            agent_name,
            working_directory=working_directory,
        )
        note_toolkit = message_integration.register_toolkits(note_toolkit)

    screenshot_toolkit = ScreenshotToolkit(
        api_task_id,
        working_directory=working_directory,
        agent_name=agent_name,
    )
    screenshot_toolkit_for_agent_registration = screenshot_toolkit
    screenshot_toolkit = message_integration.register_toolkits(
        screenshot_toolkit
    )

    skill_toolkit = SkillToolkit(
        api_task_id,
        agent_name,
        working_directory=working_directory,
        user_id=user_id,
    )
    skill_toolkit = message_integration.register_toolkits(skill_toolkit)

    search_tools = []
    if include_search_toolkit:
        search_tools = SearchToolkit.get_can_use_tools(
            api_task_id, agent_name=agent_name
        )
        # get_can_use_tools may return an empty/falsy value; normalize to [].
        if search_tools:
            search_tools = message_integration.register_functions(
                search_tools
            )
        else:
            search_tools = []

    tools: list[FunctionTool | Callable[..., Any]] = []
    if include_human_toolkit:
        tools.extend(HumanToolkit.get_can_use_tools(api_task_id, agent_name))
    tools.extend(wrapped_web_toolkit.get_tools())
    if terminal_tools is not None:
        tools.extend(terminal_tools)
    if note_toolkit is not None:
        tools.extend(note_toolkit.get_tools())
    tools.extend(screenshot_toolkit.get_tools())
    tools.extend(search_tools)
    tools.extend(skill_toolkit.get_tools())

    # Advertise only the toolkits that were actually included. The order
    # (search, browser, human, note, terminal, screenshot, skill) matches
    # the historical list when every flag is enabled.
    tool_names: list[str] = []
    if include_search_toolkit:
        tool_names.append(SearchToolkit.toolkit_name())
    tool_names.append(HybridBrowserToolkit.toolkit_name())
    if include_human_toolkit:
        tool_names.append(HumanToolkit.toolkit_name())
    if include_note_toolkit:
        tool_names.append(NoteTakingToolkit.toolkit_name())
    if include_terminal_toolkit:
        tool_names.append(TerminalToolkit.toolkit_name())
    tool_names.append(ScreenshotToolkit.toolkit_name())
    tool_names.append(SkillToolkit.toolkit_name())

    return BrowserAgentTooling(
        tools=tools,
        tool_names=tool_names,
        toolkits_to_register_agent=[
            web_toolkit_for_agent_registration,
            screenshot_toolkit_for_agent_registration,
        ],
    )
def _non_extension_browsers(cdp_browsers: list[dict] | None) -> list[dict]:
    """Return the browser configs that are not extension proxies.

    Args:
        cdp_browsers: Browser configuration dicts, or ``None``.

    Returns:
        A new list without entries whose ``isExtensionProxy`` is truthy.
    """
    return [
        browser
        for browser in (cdp_browsers or [])
        if not browser.get("isExtensionProxy", False)
    ]


def _run_coroutine_blocking(make_coro: Callable[[], Any]) -> Any:
    """Run a coroutine to completion from synchronous callback code.

    If this thread has a current event loop that is idle, that loop is
    reused. If there is none, or it is closed, ``asyncio.run`` is used.
    ``make_coro`` is a zero-argument factory, so a coroutine object is
    created only on the path that actually awaits it. No un-awaited
    coroutine is left behind, and a coroutine is never executed twice.

    Args:
        make_coro: Callable returning the coroutine to run.

    Returns:
        Whatever the coroutine returns.

    Raises:
        RuntimeError: If the current loop is already running in this thread
            (it cannot be re-entered to block on the coroutine).
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop in this thread (e.g. a worker thread).
        return asyncio.run(make_coro())
    if loop.is_running():
        raise RuntimeError(
            "Cannot synchronously run a browser pool coroutine from "
            "inside a running event loop"
        )
    if loop.is_closed():
        return asyncio.run(make_coro())
    return loop.run_until_complete(make_coro())


def browser_agent(options: Chat):
    """Create the browser agent for ``options``.

    Browser selection works as follows:

    * If ``options.cdp_browsers`` contains an ``isExtensionProxy`` entry, the
      agent drives the browser exclusively through the extension proxy.
      Cloned agents each get their own tab from ``_tab_pool_manager``.
    * Otherwise a CDP browser is acquired from ``_cdp_pool_manager``. If no
      pooled browser is free, the first CDP browser is used without
      registering ownership. If there are no CDP browsers, the
      ``browser_port`` env setting (default ``"9222"``) is used.

    The returned agent carries ``_cdp_*`` attributes and acquire/release
    callbacks. Cloned agents use them to obtain, and later return, their
    own browser resources.
    """
    working_directory = get_working_directory(options)
    logger.info(
        f"Creating browser agent for project: {options.project_id} "
        f"in directory: {working_directory}"
    )
    human_message_handler = HumanToolkit(
        options.project_id, Agents.browser_agent
    ).send_message_to_user

    # Acquire CDP browser from pool or use default port
    toolkit_session_id = str(uuid.uuid4())[:8]
    selected_port = None
    selected_is_external = False
    use_extension_proxy = False
    extension_proxy_port = 8765

    # Check for extension proxy browser first
    extension_proxy_browser = None
    if options.cdp_browsers:
        extension_proxy_browser = _find_extension_proxy_browser(
            options.cdp_browsers
        )

    if extension_proxy_browser:
        # Extension proxy mode: use plugin exclusively, no CDP fallback
        use_extension_proxy = True
        extension_proxy_port = int(extension_proxy_browser.get("port", 8765))
        selected_is_external = True
        logger.info(
            f"Using extension proxy mode (exclusive): "
            f"port={extension_proxy_port}, session_id={toolkit_session_id}"
        )
    else:
        # No extension proxy — use CDP browsers
        cdp_only_browsers = _non_extension_browsers(options.cdp_browsers)
        if cdp_only_browsers:
            selected_browser = _cdp_pool_manager.acquire_browser(
                cdp_only_browsers, toolkit_session_id, options.task_id
            )
            if selected_browser:
                selected_port = _get_browser_port(selected_browser)
                selected_is_external = selected_browser.get(
                    "isExternal", False
                )
                logger.info(
                    f"Acquired CDP browser from pool (initial): "
                    f"port={selected_port}, "
                    f"isExternal={selected_is_external}, "
                    f"session_id={toolkit_session_id}"
                )
            else:
                # No free pooled browser: fall back to the first one
                # without registering ownership in the pool.
                selected_port = _get_browser_port(cdp_only_browsers[0])
                selected_is_external = cdp_only_browsers[0].get(
                    "isExternal", False
                )
                logger.warning(
                    f"No available browsers in pool (initial), "
                    f"using first: port={selected_port}, "
                    f"session_id={toolkit_session_id}"
                )
        else:
            # Keep the port an int (as _get_browser_port returns) so
            # agent._cdp_port has one type in every mode.
            selected_port = int(env("browser_port", "9222"))

    enabled_tools = [
        "browser_click",
        "browser_type",
        "browser_back",
        "browser_forward",
        "browser_select",
        "browser_console_exec",
        "browser_console_view",
        "browser_switch_tab",
        "browser_enter",
        "browser_visit_page",
        "browser_scroll",
        "browser_sheet_read",
        "browser_sheet_input",
        "browser_get_page_snapshot",
        "browser_open",
    ]
    # Settings shared by both connection modes.
    common_toolkit_kwargs = {
        "cdp_keep_current_page": True,
        "headless": False,
        "browser_log_to_file": True,
        "stealth": True,
        "session_id": toolkit_session_id,
        "enabled_tools": enabled_tools,
    }
    if use_extension_proxy:
        web_toolkit_custom = HybridBrowserToolkit(
            options.project_id,
            extension_proxy_mode=True,
            extension_proxy_port=extension_proxy_port,
            **common_toolkit_kwargs,
        )
        # Inject pre-started global wrapper if available
        from app.service.extension_proxy_service import (
            get_extension_proxy_wrapper,
        )

        global_wrapper = get_extension_proxy_wrapper()
        if global_wrapper is not None:
            web_toolkit_custom._extension_proxy_wrapper = global_wrapper
            _tab_pool_manager.set_extension_proxy(global_wrapper)
            logger.info(
                "Injected pre-started global ExtensionProxyWrapper "
                "into toolkit"
            )
    else:
        web_toolkit_custom = HybridBrowserToolkit(
            options.project_id,
            cdp_url=f"http://localhost:{selected_port}",
            **common_toolkit_kwargs,
        )

    tooling = build_browser_agent_tooling(
        api_task_id=options.project_id,
        working_directory=working_directory,
        user_id=options.skill_config_user_id(),
        web_toolkit_custom=web_toolkit_custom,
        agent_name=Agents.browser_agent,
        include_human_toolkit=True,
        message_handler=human_message_handler,
    )

    # Build external browser notice
    external_browser_notice = ""
    if selected_is_external:
        external_browser_notice = (
            "\n<external_browser_connection>\n"
            "**IMPORTANT**: You are connected to an external browser instance. "
            "The browser may already be open with active sessions and logged-in "
            "websites. When you use browser_open, you will connect to this "
            "existing browser and can immediately access its current state and "
            "pages.\n"
            "</external_browser_connection>\n"
        )

    system_message = _build_browser_system_prompt_compat(
        platform_system=platform.system(),
        platform_machine=platform.machine(),
        working_directory=working_directory,
        now_str=NOW_STR,
        external_browser_notice=external_browser_notice,
        include_human_toolkit=True,
    )

    agent = agent_model(
        Agents.browser_agent,
        BaseMessage.make_assistant_message(
            role_name="Browser Agent",
            content=system_message,
        ),
        options,
        tooling.tools,
        prune_tool_calls_from_memory=True,
        tool_names=tooling.tool_names,
        toolkits_to_register_agent=tooling.toolkits_to_register_agent,
        enable_snapshot_clean=True,
    )

    if use_extension_proxy:
        # Extension proxy mode: tab-based parallelism
        def acquire_tab_for_agent(agent_instance):
            """Acquire a new tab from extension for a cloned agent."""
            session_id = str(uuid.uuid4())[:8]
            tab_id = _run_coroutine_blocking(
                lambda: _tab_pool_manager.acquire_tab(
                    session_id, options.task_id
                )
            )
            agent_instance._extension_tab_id = tab_id
            agent_instance._cdp_session_id = session_id
            agent_instance._cdp_port = extension_proxy_port
            logger.info(
                f"Acquired tab {tab_id} for cloned agent "
                f"{agent_instance.agent_id}, session={session_id}"
            )

        def release_tab_from_agent(agent_instance):
            """Release tab back to pool."""
            tab_id = getattr(agent_instance, "_extension_tab_id", None)
            session_id = getattr(agent_instance, "_cdp_session_id", None)
            if tab_id is not None and session_id is not None:
                _run_coroutine_blocking(
                    lambda: _tab_pool_manager.release_tab(tab_id, session_id)
                )
                logger.info(
                    f"Released tab {tab_id} for agent "
                    f"{agent_instance.agent_id}"
                )

        agent._cdp_acquire_callback = acquire_tab_for_agent
        agent._cdp_release_callback = release_tab_from_agent
        agent._cdp_port = extension_proxy_port
        agent._use_extension_proxy = True
    else:
        # Standard CDP mode: port-based parallelism
        def acquire_cdp_for_agent(agent_instance):
            """Acquire a CDP browser from pool for a cloned agent."""
            cdp_only = _non_extension_browsers(options.cdp_browsers)
            if not cdp_only:
                return
            session_id = str(uuid.uuid4())[:8]
            selected = _cdp_pool_manager.acquire_browser(
                cdp_only, session_id, options.task_id
            )
            if selected:
                agent_instance._cdp_port = _get_browser_port(selected)
            else:
                agent_instance._cdp_port = _get_browser_port(cdp_only[0])
            agent_instance._cdp_session_id = session_id
            logger.info(
                f"Acquired CDP for cloned agent {agent_instance.agent_id}: "
                f"port={agent_instance._cdp_port}, session={session_id}"
            )

        def release_cdp_from_agent(agent_instance):
            """Release CDP browser back to pool."""
            port = getattr(agent_instance, "_cdp_port", None)
            session_id = getattr(agent_instance, "_cdp_session_id", None)
            if port is not None and session_id is not None:
                _cdp_pool_manager.release_browser(port, session_id)
                logger.info(
                    f"Released CDP for agent {agent_instance.agent_id}: "
                    f"port={port}, session={session_id}"
                )

        agent._cdp_acquire_callback = acquire_cdp_for_agent
        agent._cdp_release_callback = release_cdp_from_agent
        agent._cdp_port = selected_port

    agent._cdp_session_id = toolkit_session_id
    agent._cdp_task_id = options.task_id
    agent._cdp_options = options
    # The original (unwrapped) HybridBrowserToolkit instance. Message
    # wrapping happens inside build_browser_agent_tooling, and this local
    # was never reassigned.
    agent._browser_toolkit = web_toolkit_custom
    return agent