eigent/backend/camel/toolkits/async_browser_toolkit.py
2026-03-31 17:20:08 +08:00

1399 lines
49 KiB
Python

# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
# Enables postponed evaluation of annotations (for string-based type hints)
from __future__ import annotations
import asyncio
import datetime
import io
import os
import re
import shutil
import urllib.parse
from copy import deepcopy
from typing import (
TYPE_CHECKING,
Any,
Coroutine,
Dict,
List,
Literal,
Optional,
Tuple,
Union,
cast,
)
from PIL import Image
if TYPE_CHECKING:
from camel.agents import ChatAgent
from camel.logger import get_logger
from camel.messages import BaseMessage
from camel.models import BaseModelBackend, ModelFactory
from camel.toolkits.base import BaseToolkit
from camel.toolkits.function_tool import FunctionTool
from camel.toolkits.video_analysis_toolkit import VideoAnalysisToolkit
from camel.types import ModelPlatformType, ModelType
from camel.utils import (
dependencies_required,
retry_on_error,
sanitize_filename,
)
from .browser_toolkit_commons import (
ACTION_WITH_FEEDBACK_LIST,
AVAILABLE_ACTIONS_PROMPT,
GET_FINAL_ANSWER_PROMPT_TEMPLATE,
OBSERVE_PROMPT_TEMPLATE,
PLANNING_AGENT_SYSTEM_PROMPT,
TASK_PLANNING_PROMPT_TEMPLATE,
TASK_REPLANNING_PROMPT_TEMPLATE,
WEB_AGENT_SYSTEM_PROMPT,
InteractiveRegion,
VisualViewport,
_parse_json_output,
_reload_image,
add_set_of_mark,
interactive_region_from_dict,
visual_viewport_from_dict,
)
logger = get_logger(__name__)
ASYNC_ACTIONS = [
"fill_input_id",
"click_id",
"hover_id",
"download_file_id",
"scroll_up",
"scroll_down",
"scroll_to_bottom",
"scroll_to_top",
"back",
"stop",
"find_text_on_page",
"visit_page",
"click_blank_area",
]
def extract_function_name(s: str) -> str:
r"""Extract the pure function name from a string (without parameters or
parentheses)
Args:
s (str): Input string, e.g., `1.`**`click_id(14)`**, `scroll_up()`,
`\'visit_page(url)\'`, etc.
Returns:
str: Pure function name (e.g., `click_id`, `scroll_up`, `visit_page`)
"""
# 1. Strip leading/trailing whitespace and enclosing backticks or quotes
s = s.strip().strip('`"\'')
# Strip any leading numeric prefix like " 12. " or "3. "
s = re.sub(r'^\s*\d+\.\s*', '', s)
# 3. Match a Python-valid identifier followed by an opening parenthesis
match = re.match(r'^([A-Za-z_]\w*)\s*\(', s)
if match:
return match.group(1)
# 4. Fallback: take everything before the first space or '('
return re.split(r'[ (\n]', s, maxsplit=1)[0]
class AsyncBaseBrowser:
def __init__(
self,
headless=True,
cache_dir: Optional[str] = None,
channel: Literal["chrome", "msedge", "chromium"] = "chromium",
cookie_json_path: Optional[str] = None,
user_data_dir: Optional[str] = None,
):
r"""
Initialize the asynchronous browser core.
Args:
headless (bool): Whether to run the browser in headless mode.
cache_dir (Union[str, None]): The directory to store cache files.
channel (Literal["chrome", "msedge", "chromium"]): The browser
channel to use. Must be one of "chrome", "msedge", or
"chromium".
cookie_json_path (Optional[str]): Path to a JSON file containing
authentication cookies and browser storage state. If provided
and the file exists, the browser will load this state to
maintain authenticated sessions. This is primarily used when
`user_data_dir` is not set.
user_data_dir (Optional[str]): The directory to store user data
for persistent context. If None, a fresh browser instance
is used without saving data. (default: :obj:`None`)
Returns:
None
"""
from playwright.async_api import (
async_playwright,
)
self.history: list[Any] = []
self.headless = headless
self.channel = channel
self.playwright = async_playwright()
self.page_history: list[Any] = []
self.cookie_json_path = cookie_json_path
self.user_data_dir = user_data_dir
self.playwright_server: Any = None
self.playwright_started: bool = False
self.browser: Any = None
self.context: Any = None
self.page: Any = None
self.page_url: str = ""
self.web_agent_model: Optional[BaseModelBackend] = None
# Set the cache directory
self.cache_dir = "tmp/" if cache_dir is None else cache_dir
os.makedirs(self.cache_dir, exist_ok=True)
# Create user data directory only if specified
if self.user_data_dir:
os.makedirs(self.user_data_dir, exist_ok=True)
# Load the page script
abs_dir_path = os.path.dirname(os.path.abspath(__file__))
page_script_path = os.path.join(abs_dir_path, "page_script.js")
try:
with open(page_script_path, "r", encoding='utf-8') as f:
self.page_script = f.read()
f.close()
except FileNotFoundError:
raise FileNotFoundError(
f"Page script file not found at path: {page_script_path}"
)
async def async_init(self) -> None:
r"""Asynchronously initialize the browser."""
# Start Playwright asynchronously (only needed in async mode).
if not getattr(self, "playwright_started", False):
await self._ensure_browser_installed()
self.playwright_server = await self.playwright.start()
self.playwright_started = True
browser_launch_args = [
"--disable-blink-features=AutomationControlled", # Basic stealth
]
user_agent_string = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/91.0.4472.124 Safari/537.36"
)
if self.user_data_dir:
self.context = await (
self.playwright_server.chromium.launch_persistent_context(
user_data_dir=self.user_data_dir,
headless=self.headless,
channel=self.channel,
accept_downloads=True,
user_agent=user_agent_string,
java_script_enabled=True,
args=browser_launch_args,
)
)
self.browser = None # Not using a separate browser instance
if len(self.context.pages) > 0: # Persistent context might
# reopen pages
self.page = self.context.pages[0]
else:
self.page = await self.context.new_page()
else:
# Launch a fresh browser instance
self.browser = await self.playwright_server.chromium.launch(
headless=self.headless,
channel=self.channel,
args=browser_launch_args,
)
new_context_kwargs: Dict[str, Any] = {
"accept_downloads": True,
"user_agent": user_agent_string,
"java_script_enabled": True,
}
if self.cookie_json_path and os.path.exists(self.cookie_json_path):
new_context_kwargs["storage_state"] = self.cookie_json_path
self.context = await self.browser.new_context(**new_context_kwargs)
self.page = await self.context.new_page()
assert self.context is not None
assert self.page is not None
def init(self) -> Coroutine[Any, Any, None]:
r"""Initialize the browser asynchronously."""
return self.async_init()
def clean_cache(self) -> None:
r"""Delete the cache directory and its contents."""
if os.path.exists(self.cache_dir):
shutil.rmtree(self.cache_dir)
async def async_wait_for_load(self, timeout: int = 20) -> None:
r"""
Asynchronously Wait for a certain amount of time for the page to load.
Args:
timeout (int): Timeout in seconds.
"""
timeout_ms = timeout * 1000
await self.page.wait_for_load_state("load", timeout=timeout_ms)
# TODO: check if this is needed
await asyncio.sleep(2)
def wait_for_load(self, timeout: int = 20) -> Coroutine[Any, Any, None]:
r"""Wait for a certain amount of time for the page to load.
Args:
timeout (int): Timeout in seconds.
"""
return self.async_wait_for_load(timeout)
async def async_click_blank_area(self) -> None:
r"""Asynchronously click a blank area of the page to unfocus
the current element."""
await self.page.mouse.click(0, 0)
await self.wait_for_load()
def click_blank_area(self) -> Coroutine[Any, Any, None]:
r"""Click a blank area of the page to unfocus the current element."""
return self.async_click_blank_area()
async def async_visit_page(self, url: str) -> None:
r"""Visit a page with the given URL."""
await self.page.goto(url)
await self.wait_for_load()
self.page_url = url
@retry_on_error()
def visit_page(self, url: str) -> Coroutine[Any, Any, None]:
r"""Visit a page with the given URL."""
return self.async_visit_page(url)
def ask_question_about_video(self, question: str) -> str:
r"""Ask a question about the video on the current page,
such as YouTube video.
Args:
question (str): The question to ask.
Returns:
str: The answer to the question.
"""
current_url = self.get_url()
# Confirm with user before proceeding due to potential slow
# processing time
confirmation_message = (
f"Do you want to analyze the video on the current "
f"page({current_url})? This operation may take a long time.(y/n): "
)
user_confirmation = input(confirmation_message)
if user_confirmation.lower() not in ['y', 'yes']:
return "User cancelled the video analysis."
model = None
if (
hasattr(self, 'web_agent_model')
and self.web_agent_model is not None
):
model = self.web_agent_model
video_analyzer = VideoAnalysisToolkit(model=model)
result = video_analyzer.ask_question_about_video(current_url, question)
return result
@retry_on_error()
async def async_get_screenshot(
self, save_image: bool = False
) -> Tuple[Image.Image, Union[str, None]]:
r"""Asynchronously get a screenshot of the current page.
Args:
save_image (bool): Whether to save the image to the cache
directory.
Returns:
Tuple[Image.Image, str]: A tuple containing the screenshot
image and the path to the image file if saved, otherwise
:obj:`None`.
"""
image_data = await self.page.screenshot(timeout=60000)
image = Image.open(io.BytesIO(image_data))
file_path = None
if save_image:
# Get url name to form a file name
# Use urlparser for a safer extraction the url name
parsed_url = urllib.parse.urlparse(self.page_url)
# Max length is set to 241 as there are 10 characters for the
# timestamp and 4 characters for the file extension:
url_name = sanitize_filename(str(parsed_url.path), max_length=241)
timestamp = datetime.datetime.now().strftime("%m%d%H%M%S")
file_path = os.path.join(
self.cache_dir, f"{url_name}_{timestamp}.png"
)
with open(file_path, "wb") as f:
image.save(f, "PNG")
f.close()
return image, file_path
@retry_on_error()
def get_screenshot(
self, save_image: bool = False
) -> Coroutine[Any, Any, Tuple[Image.Image, Union[str, None]]]:
r"""Get a screenshot of the current page.
Args:
save_image (bool): Whether to save the image to the cache
directory.
Returns:
Tuple[Image.Image, str]: A tuple containing the screenshot
image and the path to the image file if saved, otherwise
:obj:`None`.
"""
return self.async_get_screenshot(save_image)
async def async_capture_full_page_screenshots(
self, scroll_ratio: float = 0.8
) -> List[str]:
r"""Asynchronously capture full page screenshots by scrolling the
page with a buffer zone.
Args:
scroll_ratio (float): The ratio of viewport height to scroll each
step (default: 0.8).
Returns:
List[str]: A list of paths to the captured screenshots.
"""
screenshots = []
scroll_height = await self.page.evaluate("document.body.scrollHeight")
assert self.page.viewport_size is not None
viewport_height = self.page.viewport_size["height"]
current_scroll = 0
screenshot_index = 1
max_height = scroll_height - viewport_height
scroll_step = int(viewport_height * scroll_ratio)
last_height = 0
while True:
logger.debug(
f"Current scroll: {current_scroll}, max_height: "
f"{max_height}, step: {scroll_step}"
)
_, file_path = await self.get_screenshot(save_image=True)
if file_path is not None:
screenshots.append(file_path)
await self.page.evaluate(f"window.scrollBy(0, {scroll_step})")
# Allow time for content to load
await asyncio.sleep(0.5)
current_scroll = await self.page.evaluate("window.scrollY")
# Break if there is no significant scroll
if abs(current_scroll - last_height) < viewport_height * 0.1:
break
last_height = current_scroll
screenshot_index += 1
return screenshots
def capture_full_page_screenshots(
self, scroll_ratio: float = 0.8
) -> Coroutine[Any, Any, List[str]]:
r"""Capture full page screenshots by scrolling the page with
a buffer zone.
Args:
scroll_ratio (float): The ratio of viewport height to scroll each
step (default: 0.8).
Returns:
List[str]: A list of paths to the captured screenshots.
"""
return self.async_capture_full_page_screenshots(scroll_ratio)
async def async_get_visual_viewport(self) -> VisualViewport:
r"""Asynchronously get the visual viewport of the current page.
Returns:
VisualViewport: The visual viewport of the current page.
"""
try:
await self.page.evaluate(self.page_script)
except Exception as e:
logger.warning(f"Error evaluating page script: {e}")
return visual_viewport_from_dict(
await self.page.evaluate(
"MultimodalWebSurfer.getVisualViewport();"
)
)
def get_visual_viewport(self) -> Coroutine[Any, Any, VisualViewport]:
r"""Get the visual viewport of the current page."""
return self.async_get_visual_viewport()
async def async_get_interactive_elements(
self,
) -> Dict[str, InteractiveRegion]:
r"""Asynchronously get the interactive elements of the current page.
Returns:
Dict[str, InteractiveRegion]: A dictionary containing the
interactive elements of the current page.
"""
try:
await self.page.evaluate(self.page_script)
except Exception as e:
logger.warning(f"Error evaluating page script: {e}")
result = cast(
Dict[str, Dict[str, Any]],
await self.page.evaluate(
"MultimodalWebSurfer.getInteractiveRects();"
),
)
typed_results: Dict[str, InteractiveRegion] = {}
for k in result:
typed_results[k] = interactive_region_from_dict(result[k])
return typed_results
def get_interactive_elements(
self,
) -> Coroutine[Any, Any, Dict[str, InteractiveRegion]]:
r"""Get the interactive elements of the current page.
Returns:
Dict[str, InteractiveRegion]: A dictionary of interactive elements.
"""
return self.async_get_interactive_elements()
async def async_get_som_screenshot(
self,
save_image: bool = False,
) -> Tuple[Image.Image, Union[str, None]]:
r"""Asynchronously get a screenshot of the current viewport
with interactive elements marked.
Args:
save_image (bool): Whether to save the image to the cache
directory.
Returns:
Tuple[Image.Image, str]: A tuple containing the screenshot
image and the path to the image file.
"""
await self.wait_for_load()
screenshot, _ = await self.async_get_screenshot(save_image=False)
rects = await self.async_get_interactive_elements()
file_path: str | None = None
comp, _, _, _ = add_set_of_mark(
screenshot,
rects,
)
if save_image:
parsed_url = urllib.parse.urlparse(self.page_url)
# Max length is set to 241 as there are 10 characters for the
# timestamp and 4 characters for the file extension:
url_name = sanitize_filename(str(parsed_url.path), max_length=241)
timestamp = datetime.datetime.now().strftime("%m%d%H%M%S")
file_path = os.path.join(
self.cache_dir, f"{url_name}_{timestamp}.png"
)
with open(file_path, "wb") as f:
comp.save(f, "PNG")
f.close()
return comp, file_path
def get_som_screenshot(
self,
save_image: bool = False,
) -> Coroutine[Any, Any, Tuple[Image.Image, Union[str, None]]]:
r"""Get a screenshot of the current viewport with interactive elements
marked.
Args:
save_image (bool): Whether to save the image to the cache
directory.
Returns:
Tuple[Image.Image, str]: A tuple containing the screenshot image
and the path to the image file.
"""
return self.async_get_som_screenshot(save_image)
async def async_scroll_up(self) -> None:
r"""Asynchronously scroll up the page."""
await self.page.keyboard.press("PageUp")
def scroll_up(self) -> Coroutine[Any, Any, None]:
r"""Scroll up the page."""
return self.async_scroll_up()
async def async_scroll_down(self) -> None:
r"""Asynchronously scroll down the page."""
await self.page.keyboard.press("PageDown")
def scroll_down(self) -> Coroutine[Any, Any, None]:
r"""Scroll down the page."""
return self.async_scroll_down()
def get_url(self) -> str:
r"""Get the URL of the current page."""
return self.page.url
async def async_click_id(self, identifier: Union[str, int]) -> None:
r"""Asynchronously click an element with the given ID.
Args:
identifier (Union[str, int]): The ID of the element to click.
"""
if isinstance(identifier, int):
identifier = str(identifier)
target = self.page.locator(f"[__elementId='{identifier}']")
try:
await target.wait_for(timeout=5000)
except (TimeoutError, Exception) as e: # type: ignore[misc]
logger.debug(f"Error during click operation: {e}")
raise ValueError("No such element.") from None
await target.scroll_into_view_if_needed()
new_page = None
try:
async with self.page.expect_event(
"popup", timeout=1000
) as page_info:
box = cast(
Dict[str, Union[int, float]], await target.bounding_box()
)
await self.page.mouse.click(
box["x"] + box["width"] / 2, box["y"] + box["height"] / 2
)
new_page = await page_info.value
# If a new page is opened, switch to it
if new_page:
self.page_history.append(deepcopy(self.page.url))
self.page = new_page
except (TimeoutError, Exception) as e: # type: ignore[misc]
logger.debug(f"Error during click operation: {e}")
pass
await self.wait_for_load()
def click_id(
self, identifier: Union[str, int]
) -> Coroutine[Any, Any, None]:
r"""Click an element with the given identifier."""
return self.async_click_id(identifier)
async def async_extract_url_content(self) -> str:
r"""Asynchronously extract the content of the current page."""
content = await self.page.content()
return content
def extract_url_content(self) -> Coroutine[Any, Any, str]:
r"""Extract the content of the current page."""
return self.async_extract_url_content()
async def async_download_file_id(self, identifier: Union[str, int]) -> str:
r"""Asynchronously download a file with the given selector.
Args:
identifier (Union[str, int]): The identifier of the file
to download.
Returns:
str: The path to the downloaded file.
"""
if isinstance(identifier, int):
identifier = str(identifier)
try:
target = self.page.locator(f"[__elementId='{identifier}']")
except (TimeoutError, Exception) as e: # type: ignore[misc]
logger.debug(f"Error during download operation: {e}")
logger.warning(
f"Element with identifier '{identifier}' not found."
)
return f"Element with identifier '{identifier}' not found."
await target.scroll_into_view_if_needed()
file_path = os.path.join(self.cache_dir)
await self.wait_for_load()
try:
async with self.page.expect_download(
timeout=5000
) as download_info:
await target.click()
download = await download_info.value
file_name = download.suggested_filename
file_path = os.path.join(file_path, file_name)
await download.save_as(file_path)
return f"Downloaded file to path '{file_path}'."
except Exception as e:
logger.debug(f"Error during download operation: {e}")
return f"Failed to download file with identifier '{identifier}'."
def download_file_id(
self, identifier: Union[str, int]
) -> Coroutine[Any, Any, str]:
r"""Download a file with the given identifier."""
return self.async_download_file_id(identifier)
async def async_fill_input_id(
self, identifier: Union[str, int], text: str
) -> str:
r"""Asynchronously fill an input field with the given text, and then
press Enter.
Args:
identifier (Union[str, int]): The identifier of the input field.
text (str): The text to fill.
Returns:
str: The result of the action.
"""
if isinstance(identifier, int):
identifier = str(identifier)
try:
target = self.page.locator(f"[__elementId='{identifier}']")
except (TimeoutError, Exception) as e: # type: ignore[misc]
logger.debug(f"Error during fill operation: {e}")
logger.warning(
f"Element with identifier '{identifier}' not found."
)
return f"Element with identifier '{identifier}' not found."
await target.scroll_into_view_if_needed()
await target.focus()
try:
await target.fill(text)
except Exception as e:
logger.debug(f"Error during fill operation: {e}")
await target.press_sequentially(text)
await target.press("Enter")
await self.wait_for_load()
return (
f"Filled input field '{identifier}' with text '{text}' "
f"and pressed Enter."
)
def fill_input_id(
self, identifier: Union[str, int], text: str
) -> Coroutine[Any, Any, str]:
r"""Fill an input field with the given text, and then press Enter."""
return self.async_fill_input_id(identifier, text)
async def async_scroll_to_bottom(self) -> str:
r"""Asynchronously scroll to the bottom of the page."""
await self.page.evaluate(
"window.scrollTo(0, document.body.scrollHeight);"
)
await self.wait_for_load()
return "Scrolled to the bottom of the page."
def scroll_to_bottom(self) -> Coroutine[Any, Any, str]:
r"""Scroll to the bottom of the page."""
return self.async_scroll_to_bottom()
async def async_scroll_to_top(self) -> str:
r"""Asynchronously scroll to the top of the page."""
await self.page.evaluate("window.scrollTo(0, 0);")
await self.wait_for_load()
return "Scrolled to the top of the page."
def scroll_to_top(self) -> Coroutine[Any, Any, str]:
r"""Scroll to the top of the page."""
return self.async_scroll_to_top()
async def async_hover_id(self, identifier: Union[str, int]) -> str:
r"""Asynchronously hover over an element with the given identifier.
Args:
identifier (Union[str, int]): The identifier of the element
to hover over.
Returns:
str: The result of the action.
"""
if isinstance(identifier, int):
identifier = str(identifier)
try:
target = self.page.locator(f"[__elementId='{identifier}']")
except (TimeoutError, Exception) as e: # type: ignore[misc]
logger.debug(f"Error during hover operation: {e}")
logger.warning(
f"Element with identifier '{identifier}' not found."
)
return f"Element with identifier '{identifier}' not found."
await target.scroll_into_view_if_needed()
await target.hover()
await self.wait_for_load()
return f"Hovered over element with identifier '{identifier}'."
def hover_id(
self, identifier: Union[str, int]
) -> Coroutine[Any, Any, str]:
r"""Hover over an element with the given identifier."""
return self.async_hover_id(identifier)
async def async_find_text_on_page(self, search_text: str) -> str:
r"""Asynchronously find the next given text on the page.It is
equivalent to pressing Ctrl + F and searching for the text.
Args:
search_text (str): The text to search for.
Returns:
str: The result of the action.
"""
script = f"""
(function() {{
let text = "{search_text}";
let found = window.find(text);
if (!found) {{
let elements = document.querySelectorAll(
"*:not(script):not(style)"
);
for (let el of elements) {{
if (el.innerText && el.innerText.includes(text)) {{
el.scrollIntoView({{
behavior: "smooth",
block: "center"
}});
el.style.backgroundColor = "yellow";
el.style.border = '2px solid red';
return true;
}}
}}
return false;
}}
return true;
}})();
"""
found = await self.page.evaluate(script)
await self.wait_for_load()
if found:
return f"Found text '{search_text}' on the page."
else:
return f"Text '{search_text}' not found on the page."
def find_text_on_page(self, search_text: str) -> Coroutine[Any, Any, str]:
r"""Find the next given text on the page, and scroll the page to
the targeted text. It is equivalent to pressing Ctrl + F and
searching for the text.
Args:
search_text (str): The text to search for.
Returns:
str: The result of the action.
"""
return self.async_find_text_on_page(search_text)
async def async_back(self) -> None:
r"""Asynchronously navigate back to the previous page."""
page_url_before = self.page.url
await self.page.go_back()
page_url_after = self.page.url
if page_url_after == "about:blank":
await self.visit_page(page_url_before)
if page_url_before == page_url_after:
# If the page is not changed, try to use the history
if len(self.page_history) > 0:
await self.visit_page(self.page_history.pop())
await asyncio.sleep(1)
await self.wait_for_load()
def back(self) -> Coroutine[Any, Any, None]:
r"""Navigate back to the previous page."""
return self.async_back()
async def async_close(self) -> None:
r"""Asynchronously close the browser."""
if self.context is not None:
await self.context.close()
if self.browser is not None: # Only close browser if it was
# launched separately
await self.browser.close()
if self.playwright_server and self.playwright_started:
await self.playwright_server.stop()
self.playwright_started = False
def close(self) -> Coroutine[Any, Any, None]:
r"""Close the browser."""
return self.async_close()
async def async_show_interactive_elements(self) -> None:
r"""Asynchronously show simple interactive elements on
the current page."""
await self.page.evaluate(self.page_script)
await self.page.evaluate("""
() => {
document.querySelectorAll(
'a, button, input, select, textarea, ' +
'[tabindex]:not([tabindex="-1"]), ' +
'[contenteditable="true"]'
).forEach(el => {
el.style.border = '2px solid red';
});
}
""")
def show_interactive_elements(self) -> Coroutine[Any, Any, None]:
r"""Show simple interactive elements on the current page."""
return self.async_show_interactive_elements()
async def async_get_webpage_content(self) -> str:
r"""Asynchronously extract the content of the current page and convert
it to markdown."""
from html2text import html2text
await self.wait_for_load()
html_content = await self.page.content()
markdown_content = html2text(html_content)
return markdown_content
@retry_on_error()
def get_webpage_content(self) -> Coroutine[Any, Any, str]:
r"""Extract the content of the current page."""
return self.async_get_webpage_content()
async def async_ensure_browser_installed(self) -> None:
r"""Ensure the browser is installed."""
import platform
import sys
try:
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch(channel=self.channel)
await browser.close()
except Exception:
logger.info("Installing Chromium browser...")
try:
proc1 = await asyncio.create_subprocess_exec(
sys.executable,
"-m",
"playwright",
"install",
self.channel,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc1.communicate()
if proc1.returncode != 0:
raise RuntimeError(
f"Failed to install browser: {stderr.decode()}"
)
if platform.system().lower() == "linux":
proc2 = await asyncio.create_subprocess_exec(
sys.executable,
"-m",
"playwright",
"install-deps",
self.channel,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout2, stderr2 = await proc2.communicate()
if proc2.returncode != 0:
error_message = stderr2.decode()
raise RuntimeError(
f"Failed to install dependencies: {error_message}"
)
logger.info("Chromium browser installation completed")
except Exception as e:
raise RuntimeError(f"Installation failed: {e}")
def _ensure_browser_installed(self) -> Coroutine[Any, Any, None]:
r"""Ensure the browser is installed."""
return self.async_ensure_browser_installed()
class AsyncBrowserToolkit(BaseToolkit):
r"""An asynchronous class for browsing the web and interacting
with web pages.
This class provides methods for browsing the web and interacting with web
pages.
"""
def __init__(
self,
headless: bool = False,
cache_dir: Optional[str] = None,
channel: Literal["chrome", "msedge", "chromium"] = "chromium",
history_window: int = 5,
web_agent_model: Optional[BaseModelBackend] = None,
planning_agent_model: Optional[BaseModelBackend] = None,
output_language: str = "en",
cookie_json_path: Optional[str] = None,
user_data_dir: Optional[str] = None,
):
r"""Initialize the BrowserToolkit instance.
Args:
headless (bool): Whether to run the browser in headless mode.
cache_dir (Union[str, None]): The directory to store cache files.
channel (Literal["chrome", "msedge", "chromium"]): The browser
channel to use. Must be one of "chrome", "msedge", or
"chromium".
history_window (int): The window size for storing the history of
actions.
web_agent_model (Optional[BaseModelBackend]): The model backend
for the web agent.
planning_agent_model (Optional[BaseModelBackend]): The model
backend for the planning agent.
output_language (str): The language to use for output.
(default: :obj:`"en`")
cookie_json_path (Optional[str]): Path to a JSON file containing
authentication cookies and browser storage state. If provided
and the file exists, the browser will load this state to
maintain authenticated sessions without requiring manual
login.
(default: :obj:`None`)
user_data_dir (Optional[str]): The directory to store user data
for persistent context. (default: :obj:`"user_data_dir/"`)
"""
super().__init__()
self.browser = AsyncBaseBrowser(
headless=headless,
cache_dir=cache_dir,
channel=channel,
cookie_json_path=cookie_json_path,
user_data_dir=user_data_dir,
)
self.history_window = history_window
self.web_agent_model = web_agent_model
self.planning_agent_model = planning_agent_model
self.output_language = output_language
self.browser.web_agent_model = web_agent_model
self.history: list[Any] = []
self.web_agent, self.planning_agent = self._initialize_agent()
def _reset(self):
self.web_agent.reset()
self.planning_agent.reset()
self.history = []
os.makedirs(self.browser.cache_dir, exist_ok=True)
def _initialize_agent(self) -> Tuple["ChatAgent", "ChatAgent"]:
r"""Initialize the planning and web agents."""
from camel.agents.chat_agent import ChatAgent
if self.web_agent_model is None:
web_agent_model = ModelFactory.create(
model_platform=ModelPlatformType.OPENAI,
model_type=ModelType.GPT_4_1,
model_config_dict={"temperature": 0, "top_p": 1},
)
else:
web_agent_model = self.web_agent_model
if self.planning_agent_model is None:
planning_model = ModelFactory.create(
model_platform=ModelPlatformType.OPENAI,
model_type=ModelType.O3_MINI,
)
else:
planning_model = self.planning_agent_model
system_prompt = WEB_AGENT_SYSTEM_PROMPT
web_agent = ChatAgent(
system_message=system_prompt,
model=web_agent_model,
output_language=self.output_language,
)
planning_system_prompt = PLANNING_AGENT_SYSTEM_PROMPT
planning_agent = ChatAgent(
system_message=planning_system_prompt,
model=planning_model,
output_language=self.output_language,
)
return web_agent, planning_agent
async def async_observe(
self, task_prompt: str, detailed_plan: Optional[str] = None
) -> Tuple[str, str, str]:
r"""Let agent observe the current environment, and get the next
action."""
detailed_plan_prompt_str = ""
if detailed_plan is not None:
detailed_plan_prompt_str = f"""
Here is a plan about how to solve the task step-by-step which you must follow:
<detailed_plan>{detailed_plan}</detailed_plan>
"""
observe_prompt = OBSERVE_PROMPT_TEMPLATE.format(
task_prompt=task_prompt,
detailed_plan_prompt=detailed_plan_prompt_str,
AVAILABLE_ACTIONS_PROMPT=AVAILABLE_ACTIONS_PROMPT,
history_window=self.history_window,
history=self.history[-self.history_window :],
)
# get current state
som_screenshot, _ = await self.browser.async_get_som_screenshot(
save_image=True
)
img = _reload_image(som_screenshot)
message = BaseMessage.make_user_message(
role_name='user', content=observe_prompt, image_list=[img]
)
# Reset the history message of web_agent.
self.web_agent.reset()
resp = await self.web_agent.astep(message)
resp_content = resp.msgs[0].content
resp_dict = _parse_json_output(resp_content, logger)
observation_result: str = resp_dict.get("observation", "")
reasoning_result: str = resp_dict.get("reasoning", "")
action_code: str = resp_dict.get("action_code", "")
if action_code and "(" in action_code and ")" not in action_code:
action_match = re.search(
r'"action_code"\s*:\s*[`"]([^`"]*\([^)]*\))[`"]', resp_content
)
if action_match:
action_code = action_match.group(1)
else:
logger.warning(
f"Incomplete action_code detected: {action_code}"
)
if action_code.startswith("fill_input_id("):
parts = action_code.split(",", 1)
if len(parts) > 1:
id_part = (
parts[0].replace("fill_input_id(", "").strip()
)
action_code = (
f"fill_input_id({id_part}, "
f"'Please fill the text here.')"
)
action_code = action_code.replace("`", "").strip()
return observation_result, reasoning_result, action_code
async def async_act(self, action_code: str) -> Tuple[bool, str]:
r"""Let agent act based on the given action code.
Args:
action_code (str): The action code to act.
Returns:
Tuple[bool, str]: A tuple containing a boolean indicating whether
the action was successful, and the information to be returned.
"""
def _check_if_with_feedback(action_code: str) -> bool:
r"""Check if the action code needs feedback."""
for action_with_feedback in ACTION_WITH_FEEDBACK_LIST:
if action_with_feedback in action_code:
return True
return False
def _fix_action_code(action_code: str) -> str:
r"""Fix potential missing quotes in action code"""
match = re.match(r'(\w+)\((.*)\)', action_code)
if not match:
return action_code
func_name, args_str = match.groups()
args = []
current_arg = ""
in_quotes = False
quote_char = None
for char in args_str:
if char in ['"', "'"]:
if not in_quotes:
in_quotes = True
quote_char = char
current_arg += char
elif char == quote_char:
in_quotes = False
quote_char = None
current_arg += char
else:
current_arg += char
elif char == ',' and not in_quotes:
args.append(current_arg.strip())
current_arg = ""
else:
current_arg += char
if current_arg:
args.append(current_arg.strip())
fixed_args = []
for arg in args:
if (
(arg.startswith('"') and arg.endswith('"'))
or (arg.startswith("'") and arg.endswith("'"))
or re.match(r'^-?\d+(\.\d+)?$', arg)
or re.match(r'^-?\d+\.?\d*[eE][-+]?\d+$', arg)
or re.match(r'^0[xX][0-9a-fA-F]+$', arg)
):
fixed_args.append(arg)
else:
fixed_args.append(f"'{arg}'")
return f"{func_name}({', '.join(fixed_args)})"
action_code = _fix_action_code(action_code)
prefix = "self.browser."
code = f"{prefix}{action_code}"
async_flag = extract_function_name(action_code) in ASYNC_ACTIONS
feedback_flag = _check_if_with_feedback(action_code)
try:
result = "Action was successful."
if async_flag:
temp_coroutine = eval(code)
if feedback_flag:
result = await temp_coroutine
else:
await temp_coroutine
await asyncio.sleep(1)
return True, result
else:
if feedback_flag:
result = eval(code)
else:
exec(code)
await asyncio.sleep(1)
return True, result
except Exception as e:
await asyncio.sleep(1)
return (
False,
f"Error while executing the action {action_code}: {e}. "
f"If timeout, please recheck whether you have provided the "
f"correct identifier.",
)
async def _async_get_final_answer(self, task_prompt: str) -> str:
r"""Generate the final answer based on the task prompt."""
final_answer_prompt = GET_FINAL_ANSWER_PROMPT_TEMPLATE.format(
task_prompt=task_prompt, history=self.history
)
response = await self.planning_agent.astep(final_answer_prompt)
if response.msgs is None or len(response.msgs) == 0:
raise RuntimeError("Got empty final answer from planning agent.")
return response.msgs[0].content
async def _async_task_planning(
self, task_prompt: str, start_url: str
) -> str:
r"""Generate a detailed plan for the given task."""
planning_prompt = TASK_PLANNING_PROMPT_TEMPLATE.format(
task_prompt=task_prompt, start_url=start_url
)
response = await self.planning_agent.astep(planning_prompt)
if response.msgs is None or len(response.msgs) == 0:
raise RuntimeError("Got empty plan from planning agent.")
return response.msgs[0].content
async def _async_task_replanning(
self, task_prompt: str, detailed_plan: str
) -> Tuple[bool, str]:
r"""Replan the task based on the given task prompt.
Args:
task_prompt (str): The original task prompt.
detailed_plan (str): The detailed plan to replan.
Returns:
Tuple[bool, str]: A tuple containing a boolean indicating
whether the task needs to be replanned, and the replanned
schema.
"""
# Here are the available browser functions we can
# use: {AVAILABLE_ACTIONS_PROMPT}
replanning_prompt = TASK_REPLANNING_PROMPT_TEMPLATE.format(
task_prompt=task_prompt,
detailed_plan=detailed_plan,
history=self.history[-self.history_window :],
)
# Reset the history message of planning_agent.
self.planning_agent.reset()
resp = await self.planning_agent.astep(replanning_prompt)
resp_dict = _parse_json_output(resp.msgs[0].content, logger)
if_need_replan = resp_dict.get("if_need_replan", False)
replanned_schema = resp_dict.get("replanned_schema", "")
if if_need_replan:
return True, replanned_schema
else:
return False, replanned_schema
@dependencies_required("playwright")
async def browse_url(
self, task_prompt: str, start_url: str, round_limit: int = 12
) -> str:
r"""A powerful toolkit which can simulate the browser interaction to
solve the task which needs multi-step actions.
Args:
task_prompt (str): The task prompt to solve.
start_url (str): The start URL to visit.
round_limit (int): The round limit to solve the task.
(default: :obj:`12`).
Returns:
str: The simulation result to the task.
"""
self._reset()
task_completed = False
detailed_plan = await self._async_task_planning(task_prompt, start_url)
logger.debug(f"Detailed plan: {detailed_plan}")
await self.browser.async_init()
await self.browser.visit_page(start_url)
for i in range(round_limit):
observation, reasoning, action_code = await self.async_observe(
task_prompt, detailed_plan
)
logger.debug(f"Observation: {observation}")
logger.debug(f"Reasoning: {reasoning}")
logger.debug(f"Action code: {action_code}")
if "stop" in action_code:
task_completed = True
trajectory_info = {
"round": i,
"observation": observation,
"thought": reasoning,
"action": action_code,
"action_if_success": True,
"info": None,
"current_url": self.browser.get_url(),
}
self.history.append(trajectory_info)
break
else:
success, info = await self.async_act(action_code)
if not success:
logger.warning(f"Error while executing the action: {info}")
trajectory_info = {
"round": i,
"observation": observation,
"thought": reasoning,
"action": action_code,
"action_if_success": success,
"info": info,
"current_url": self.browser.get_url(),
}
self.history.append(trajectory_info)
# replan the task if necessary
(
if_need_replan,
replanned_schema,
# ruff: noqa: E501
) = await self._async_task_replanning(
task_prompt, detailed_plan
)
if if_need_replan:
detailed_plan = replanned_schema
logger.debug(f"Replanned schema: {replanned_schema}")
if not task_completed:
simulation_result = f"""
The task is not completed within the round limit. Please check
the last round {self.history_window} information to see if
there is any useful information:
<history>{self.history[-self.history_window:]}</history>
"""
else:
simulation_result = await self._async_get_final_answer(task_prompt)
await self.browser.close()
return simulation_result
def get_tools(self) -> List[FunctionTool]:
return [FunctionTool(self.browse_url)]