mirror of
https://github.com/eigent-ai/eigent.git
synced 2026-05-22 19:47:28 +00:00
Co-authored-by: Douglas <douglas.ym.lai@gmail.com> Co-authored-by: Douglas Lai <115660088+Douglasymlai@users.noreply.github.com>
467 lines
17 KiB
Python
467 lines
17 KiB
Python
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
|
|
|
|
import platform
|
|
import threading
|
|
import uuid
|
|
from urllib.parse import urlparse
|
|
|
|
from camel.messages import BaseMessage
|
|
from camel.toolkits import ToolkitMessageIntegration
|
|
|
|
from app.agent.agent_model import agent_model
|
|
from app.agent.listen_chat_agent import logger
|
|
from app.agent.prompt import BROWSER_SYS_PROMPT
|
|
from app.agent.toolkit.human_toolkit import HumanToolkit
|
|
from app.agent.toolkit.hybrid_browser_toolkit import HybridBrowserToolkit
|
|
|
|
# TODO: Remove NoteTakingToolkit and use TerminalToolkit instead
|
|
from app.agent.toolkit.note_taking_toolkit import NoteTakingToolkit
|
|
from app.agent.toolkit.screenshot_toolkit import ScreenshotToolkit
|
|
from app.agent.toolkit.search_toolkit import SearchToolkit
|
|
from app.agent.toolkit.skill_toolkit import SkillToolkit
|
|
from app.agent.toolkit.terminal_toolkit import TerminalToolkit
|
|
from app.agent.utils import NOW_STR
|
|
from app.component.environment import env
|
|
from app.hands.interface import IHands
|
|
from app.model.chat import Chat
|
|
from app.service.task import Agents
|
|
from app.utils.browser_launcher import normalize_cdp_url
|
|
from app.utils.file_utils import get_working_directory
|
|
|
|
|
|
def _get_browser_port(browser: dict) -> int:
|
|
"""Extract port from a browser config dict, with fallback to env default."""
|
|
raw_port = browser.get("port")
|
|
if raw_port is not None:
|
|
return int(raw_port)
|
|
|
|
raw_endpoint = browser.get("endpoint") or browser.get("cdp_url")
|
|
if raw_endpoint:
|
|
_, _, port = normalize_cdp_url(str(raw_endpoint))
|
|
return port
|
|
|
|
return int(env("browser_port", "9222"))
|
|
|
|
|
|
def _get_browser_endpoint(browser: dict) -> str:
|
|
"""Extract a normalized CDP endpoint from a browser config dict."""
|
|
raw_endpoint = browser.get("endpoint") or browser.get("cdp_url")
|
|
if raw_endpoint:
|
|
endpoint, _, _ = normalize_cdp_url(str(raw_endpoint))
|
|
return endpoint
|
|
|
|
return f"http://localhost:{_get_browser_port(browser)}"
|
|
|
|
|
|
class CdpBrowserPoolManager:
|
|
"""Manages CDP browser pool occupation to ensure
|
|
parallel tasks use different browsers."""
|
|
|
|
def __init__(self):
|
|
self._occupied_ports: dict[int, str] = {}
|
|
self._session_to_port: dict[str, int] = {}
|
|
self._session_to_task: dict[str, str | None] = {}
|
|
self._lock = threading.Lock()
|
|
|
|
def acquire_browser(
|
|
self,
|
|
cdp_browsers: list[dict],
|
|
session_id: str,
|
|
task_id: str | None = None,
|
|
) -> dict | None:
|
|
"""Acquire an available browser from the pool.
|
|
|
|
Args:
|
|
cdp_browsers: List of browser configurations.
|
|
session_id: Unique session identifier.
|
|
task_id: Optional task identifier for ownership tracking.
|
|
|
|
Returns:
|
|
Browser configuration dict or None if all occupied.
|
|
"""
|
|
with self._lock:
|
|
for browser in cdp_browsers:
|
|
port = browser.get("port")
|
|
if port and port not in self._occupied_ports:
|
|
self._occupied_ports[port] = session_id
|
|
self._session_to_port[session_id] = port
|
|
self._session_to_task[session_id] = task_id
|
|
logger.info(
|
|
f"Acquired browser on port {port} for session "
|
|
f"{session_id}. Occupied: "
|
|
f"{list(self._occupied_ports.keys())}"
|
|
)
|
|
return browser
|
|
logger.warning(
|
|
f"No available browsers for session {session_id}. "
|
|
f"All occupied: {list(self._occupied_ports.keys())}"
|
|
)
|
|
return None
|
|
|
|
def release_browser(self, port: int, session_id: str):
|
|
"""Release a browser back to the pool."""
|
|
with self._lock:
|
|
if (
|
|
port in self._occupied_ports
|
|
and self._occupied_ports[port] == session_id
|
|
):
|
|
del self._occupied_ports[port]
|
|
self._session_to_port.pop(session_id, None)
|
|
self._session_to_task.pop(session_id, None)
|
|
logger.info(
|
|
f"Released browser on port {port} from session "
|
|
f"{session_id}. Occupied: "
|
|
f"{list(self._occupied_ports.keys())}"
|
|
)
|
|
else:
|
|
logger.warning(
|
|
f"Attempted to release browser on port {port} "
|
|
f"but it was not occupied by {session_id}"
|
|
)
|
|
|
|
def release_by_task(self, task_id: str) -> list[int]:
|
|
"""Release all browsers associated with a task_id.
|
|
|
|
Returns:
|
|
List of released ports.
|
|
"""
|
|
released_ports = []
|
|
with self._lock:
|
|
sessions = [
|
|
s for s, t in self._session_to_task.items() if t == task_id
|
|
]
|
|
for session_id in sessions:
|
|
port = self._session_to_port.get(session_id)
|
|
if (
|
|
port is not None
|
|
and self._occupied_ports.get(port) == session_id
|
|
):
|
|
del self._occupied_ports[port]
|
|
released_ports.append(port)
|
|
self._session_to_port.pop(session_id, None)
|
|
self._session_to_task.pop(session_id, None)
|
|
if released_ports:
|
|
logger.info(
|
|
f"Released {len(released_ports)} browser(s) for "
|
|
f"task {task_id}. Occupied: "
|
|
f"{list(self._occupied_ports.keys())}"
|
|
)
|
|
return released_ports
|
|
|
|
def get_occupied_ports(self) -> list[int]:
|
|
"""Get list of currently occupied ports."""
|
|
with self._lock:
|
|
return list(self._occupied_ports.keys())
|
|
|
|
|
|
# Global CDP browser pool manager instance
|
|
_cdp_pool_manager = CdpBrowserPoolManager()
|
|
|
|
|
|
def browser_agent(
|
|
options: Chat,
|
|
hands: IHands | None = None,
|
|
):
|
|
working_directory = get_working_directory(options)
|
|
logger.info(
|
|
f"Creating browser agent for project: {options.project_id} "
|
|
f"in directory: {working_directory}"
|
|
)
|
|
message_integration = ToolkitMessageIntegration(
|
|
message_handler=HumanToolkit(
|
|
options.project_id, Agents.browser_agent
|
|
).send_message_to_user
|
|
)
|
|
|
|
use_browser = hands is None or hands.can_use_browser()
|
|
use_terminal = hands is None or hands.can_execute_terminal()
|
|
|
|
# Acquire CDP browser from pool or use default port (only when browser enabled)
|
|
toolkit_session_id = str(uuid.uuid4())[:8]
|
|
selected_port = None
|
|
selected_is_external = False
|
|
cdp_url = None
|
|
cdp_owned_by_hands = False
|
|
|
|
if use_browser and options.cdp_browsers:
|
|
selected_browser = _cdp_pool_manager.acquire_browser(
|
|
options.cdp_browsers, toolkit_session_id, options.task_id
|
|
)
|
|
if selected_browser:
|
|
selected_port = _get_browser_port(selected_browser)
|
|
cdp_url = _get_browser_endpoint(selected_browser)
|
|
selected_is_external = selected_browser.get("isExternal", False)
|
|
logger.info(
|
|
f"Acquired CDP browser from pool (initial): "
|
|
f"port={selected_port}, isExternal={selected_is_external}, "
|
|
f"session_id={toolkit_session_id}"
|
|
)
|
|
else:
|
|
fallback_browser = options.cdp_browsers[0]
|
|
selected_port = _get_browser_port(fallback_browser)
|
|
cdp_url = _get_browser_endpoint(fallback_browser)
|
|
selected_is_external = fallback_browser.get("isExternal", False)
|
|
logger.warning(
|
|
f"No available browsers in pool (initial), using first: "
|
|
f"port={selected_port}, session_id={toolkit_session_id}"
|
|
)
|
|
elif use_browser:
|
|
existing_cdp_url = env("EIGENT_CDP_URL", "").strip()
|
|
selected_port = env("browser_port", "9222")
|
|
cdp_url = f"http://localhost:{selected_port}"
|
|
|
|
if existing_cdp_url:
|
|
cdp_url = existing_cdp_url
|
|
try:
|
|
parsed = urlparse(existing_cdp_url)
|
|
if parsed.port is not None:
|
|
selected_port = parsed.port
|
|
except Exception:
|
|
selected_port = env("browser_port", "9222")
|
|
elif hands is not None:
|
|
try:
|
|
cdp_url = hands.acquire_resource(
|
|
"browser", toolkit_session_id, port=selected_port
|
|
)
|
|
cdp_owned_by_hands = True
|
|
except (NotImplementedError, ValueError):
|
|
cdp_url = f"http://localhost:{selected_port}"
|
|
|
|
# Web mode (no Electron): cdp_keep_current_page=False so toolkit can create
|
|
# pages when browser has 0 tabs. Electron mode: True to reuse user's page.
|
|
cdp_keep_current = bool(options.cdp_browsers) if use_browser else False
|
|
default_start_url = None if cdp_keep_current else "about:blank"
|
|
|
|
web_toolkit_custom = None
|
|
web_toolkit_for_agent_registration = None
|
|
if use_browser:
|
|
web_toolkit_custom = HybridBrowserToolkit(
|
|
options.project_id,
|
|
cdp_keep_current_page=cdp_keep_current,
|
|
default_start_url=default_start_url,
|
|
headless=False,
|
|
browser_log_to_file=True,
|
|
stealth=True,
|
|
session_id=toolkit_session_id,
|
|
cdp_url=cdp_url,
|
|
enabled_tools=[
|
|
"browser_click",
|
|
"browser_type",
|
|
"browser_back",
|
|
"browser_forward",
|
|
"browser_select",
|
|
"browser_console_exec",
|
|
"browser_console_view",
|
|
"browser_switch_tab",
|
|
"browser_enter",
|
|
"browser_visit_page",
|
|
"browser_scroll",
|
|
"browser_sheet_read",
|
|
"browser_sheet_input",
|
|
"browser_get_page_snapshot",
|
|
"browser_open",
|
|
"browser_upload_file",
|
|
"browser_download_file",
|
|
],
|
|
)
|
|
web_toolkit_for_agent_registration = web_toolkit_custom
|
|
web_toolkit_custom = message_integration.register_toolkits(
|
|
web_toolkit_custom
|
|
)
|
|
|
|
terminal_toolkit = None
|
|
if use_terminal:
|
|
terminal_toolkit = TerminalToolkit(
|
|
options.project_id,
|
|
Agents.browser_agent,
|
|
working_directory=working_directory,
|
|
safe_mode=True,
|
|
clone_current_env=True,
|
|
)
|
|
terminal_toolkit = message_integration.register_functions(
|
|
[terminal_toolkit.shell_exec]
|
|
)
|
|
|
|
note_toolkit = NoteTakingToolkit(
|
|
options.project_id,
|
|
Agents.browser_agent,
|
|
working_directory=working_directory,
|
|
)
|
|
note_toolkit = message_integration.register_toolkits(note_toolkit)
|
|
screenshot_toolkit = ScreenshotToolkit(
|
|
options.project_id,
|
|
working_directory=working_directory,
|
|
agent_name=Agents.browser_agent,
|
|
)
|
|
# Save reference before registering for toolkits_to_register_agent
|
|
screenshot_toolkit_for_agent_registration = screenshot_toolkit
|
|
screenshot_toolkit = message_integration.register_toolkits(
|
|
screenshot_toolkit
|
|
)
|
|
|
|
skill_toolkit = SkillToolkit(
|
|
options.project_id,
|
|
Agents.browser_agent,
|
|
working_directory=working_directory,
|
|
user_id=options.skill_config_user_id(),
|
|
)
|
|
skill_toolkit = message_integration.register_toolkits(skill_toolkit)
|
|
|
|
search_tools = SearchToolkit.get_can_use_tools(
|
|
options.project_id, agent_name=Agents.browser_agent
|
|
)
|
|
if search_tools:
|
|
search_tools = message_integration.register_functions(search_tools)
|
|
else:
|
|
search_tools = []
|
|
|
|
tools = [
|
|
*HumanToolkit.get_can_use_tools(
|
|
options.project_id, Agents.browser_agent
|
|
),
|
|
*note_toolkit.get_tools(),
|
|
*screenshot_toolkit.get_tools(),
|
|
*search_tools,
|
|
*skill_toolkit.get_tools(),
|
|
]
|
|
tool_names = [
|
|
SearchToolkit.toolkit_name(),
|
|
HumanToolkit.toolkit_name(),
|
|
NoteTakingToolkit.toolkit_name(),
|
|
ScreenshotToolkit.toolkit_name(),
|
|
SkillToolkit.toolkit_name(),
|
|
]
|
|
if use_browser and web_toolkit_custom:
|
|
tools = [
|
|
*HumanToolkit.get_can_use_tools(
|
|
options.project_id, Agents.browser_agent
|
|
),
|
|
*web_toolkit_custom.get_tools(),
|
|
*note_toolkit.get_tools(),
|
|
*screenshot_toolkit.get_tools(),
|
|
*search_tools,
|
|
*skill_toolkit.get_tools(),
|
|
]
|
|
tool_names.insert(1, HybridBrowserToolkit.toolkit_name())
|
|
if use_terminal and terminal_toolkit:
|
|
tools.extend(terminal_toolkit)
|
|
tool_names.append(TerminalToolkit.toolkit_name())
|
|
|
|
# Build external browser notice
|
|
external_browser_notice = ""
|
|
if selected_is_external:
|
|
external_browser_notice = (
|
|
"\n<external_browser_connection>\n"
|
|
"**IMPORTANT**: You are connected to an external browser instance. "
|
|
"The browser may already be open with active sessions and logged-in "
|
|
"websites. When you use browser_open, you will connect to this "
|
|
"existing browser and can immediately access its current state and "
|
|
"pages.\n"
|
|
"</external_browser_connection>\n"
|
|
)
|
|
|
|
system_message = BROWSER_SYS_PROMPT.format(
|
|
platform_system=platform.system(),
|
|
platform_machine=platform.machine(),
|
|
working_directory=working_directory,
|
|
now_str=NOW_STR,
|
|
external_browser_notice=external_browser_notice,
|
|
)
|
|
|
|
agent = agent_model(
|
|
Agents.browser_agent,
|
|
BaseMessage.make_assistant_message(
|
|
role_name="Browser Agent",
|
|
content=system_message,
|
|
),
|
|
options,
|
|
tools,
|
|
prune_tool_calls_from_memory=True,
|
|
tool_names=tool_names,
|
|
toolkits_to_register_agent=[
|
|
t
|
|
for t in (
|
|
web_toolkit_for_agent_registration,
|
|
screenshot_toolkit_for_agent_registration,
|
|
)
|
|
if t is not None
|
|
],
|
|
enable_snapshot_clean=True,
|
|
)
|
|
|
|
# Attach CDP management callbacks and info to the agent
|
|
def acquire_cdp_for_agent(agent_instance):
|
|
"""Acquire a CDP browser from pool for a cloned agent."""
|
|
if not options.cdp_browsers:
|
|
return
|
|
session_id = str(uuid.uuid4())[:8]
|
|
selected = _cdp_pool_manager.acquire_browser(
|
|
options.cdp_browsers, session_id, options.task_id
|
|
)
|
|
if selected:
|
|
agent_instance._cdp_port = _get_browser_port(selected)
|
|
else:
|
|
agent_instance._cdp_port = _get_browser_port(
|
|
options.cdp_browsers[0]
|
|
)
|
|
agent_instance._cdp_session_id = session_id
|
|
logger.info(
|
|
f"Acquired CDP for cloned agent {agent_instance.agent_id}: "
|
|
f"port={agent_instance._cdp_port}, session={session_id}"
|
|
)
|
|
|
|
def release_cdp_from_agent(agent_instance):
|
|
"""Release CDP browser back to pool."""
|
|
port = getattr(agent_instance, "_cdp_port", None)
|
|
session_id = getattr(agent_instance, "_cdp_session_id", None)
|
|
if (
|
|
port is not None
|
|
and session_id is not None
|
|
and options.cdp_browsers
|
|
):
|
|
_cdp_pool_manager.release_browser(port, session_id)
|
|
logger.info(
|
|
f"Released CDP for agent {agent_instance.agent_id}: "
|
|
f"port={port}, session={session_id}"
|
|
)
|
|
elif (
|
|
session_id is not None
|
|
and hands is not None
|
|
and getattr(agent_instance, "_cdp_owned_by_hands", False)
|
|
):
|
|
try:
|
|
hands.release_resource("browser", session_id)
|
|
except Exception as exc:
|
|
logger.warning(
|
|
"Failed to release browser resource for session %s: %s",
|
|
session_id,
|
|
exc,
|
|
)
|
|
|
|
agent._cdp_acquire_callback = (
|
|
acquire_cdp_for_agent if use_browser else None
|
|
)
|
|
agent._cdp_release_callback = (
|
|
release_cdp_from_agent if use_browser else None
|
|
)
|
|
agent._cdp_port = selected_port
|
|
agent._cdp_url = cdp_url
|
|
agent._cdp_session_id = toolkit_session_id
|
|
agent._cdp_task_id = options.task_id
|
|
agent._cdp_options = options
|
|
agent._browser_toolkit = web_toolkit_for_agent_registration
|
|
agent._cdp_owned_by_hands = cdp_owned_by_hands
|
|
|
|
return agent
|