eigent/backend/camel/toolkits/browser_toolkit.py
2026-03-31 17:20:08 +08:00

1257 lines
44 KiB
Python

# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2026 @ CAMEL-AI.org. All Rights Reserved. =========
# Enables postponed evaluation of annotations (for string-based type hints)
from __future__ import annotations
import datetime
import io
import os
import re
import shutil
import time
import urllib.parse
from copy import deepcopy
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Literal,
Optional,
Tuple,
Union,
cast,
)
from PIL import Image
from camel.logger import get_logger
from camel.messages import BaseMessage
from camel.models import BaseModelBackend, ModelFactory
from camel.toolkits.base import BaseToolkit
from camel.toolkits.function_tool import FunctionTool
from camel.toolkits.video_analysis_toolkit import VideoAnalysisToolkit
from camel.types import ModelPlatformType, ModelType
from camel.utils import (
dependencies_required,
retry_on_error,
sanitize_filename,
)
# Import shared components from browser_toolkit_commons
from .browser_toolkit_commons import (
ACTION_WITH_FEEDBACK_LIST,
AVAILABLE_ACTIONS_PROMPT,
GET_FINAL_ANSWER_PROMPT_TEMPLATE,
OBSERVE_PROMPT_TEMPLATE,
PLANNING_AGENT_SYSTEM_PROMPT,
TASK_PLANNING_PROMPT_TEMPLATE,
TASK_REPLANNING_PROMPT_TEMPLATE,
WEB_AGENT_SYSTEM_PROMPT,
InteractiveRegion,
VisualViewport,
_add_set_of_mark,
_parse_json_output,
_reload_image,
interactive_region_from_dict,
visual_viewport_from_dict,
)
if TYPE_CHECKING:
from playwright.sync_api import (
Browser,
BrowserContext,
FloatRect,
Page,
Playwright,
)
from camel.agents import ChatAgent
# Module-level logger, named after this module.
logger = get_logger(__name__)
# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably a pixel margin at the top of a screenshot in which no labels
# are drawn — confirm against the set-of-mark helper before relying on it.
TOP_NO_LABEL_ZONE = 20
def _get_str(d: Any, k: str) -> str:
r"""Safely retrieve a string value from a dictionary."""
if k not in d:
raise KeyError(f"Missing required key: '{k}'")
val = d[k]
if isinstance(val, str):
return val
raise TypeError(
f"Expected a string for key '{k}', but got {type(val).__name__}"
)
def _get_number(d: Any, k: str) -> Union[int, float]:
r"""Safely retrieve a number (int or float) from a dictionary"""
val = d[k]
if isinstance(val, (int, float)):
return val
raise TypeError(
f"Expected a number (int/float) for key "
f"'{k}', but got {type(val).__name__}"
)
def _get_bool(d: Any, k: str) -> bool:
r"""Safely retrieve a boolean value from a dictionary."""
val = d[k]
if isinstance(val, bool):
return val
raise TypeError(
f"Expected a boolean for key '{k}', but got {type(val).__name__}"
)
class BaseBrowser:
def __init__(
    self,
    headless: bool = True,
    cache_dir: Optional[str] = None,
    channel: Literal["chrome", "msedge", "chromium"] = "chromium",
    cookie_json_path: Optional[str] = None,
    user_data_dir: Optional[str] = None,
):
    r"""Initialize the WebBrowser instance.

    Args:
        headless (bool): Whether to run the browser in headless mode.
        cache_dir (Union[str, None]): The directory to store cache files.
        channel (Literal["chrome", "msedge", "chromium"]): The browser
            channel to use. Must be one of "chrome", "msedge", or
            "chromium".
        cookie_json_path (Optional[str]): Path to a JSON file containing
            authentication cookies and browser storage state. If provided
            and the file exists, the browser will load this state to
            maintain authenticated sessions. This is primarily used when
            `user_data_dir` is not set.
        user_data_dir (Optional[str]): The directory to store user data
            for persistent context. If None, a fresh browser instance
            is used without saving data. (default: :obj:`None`)

    Raises:
        FileNotFoundError: If `page_script.js` is missing next to this
            module.
    """
    self.history: List[Any] = []
    self.headless = headless
    self.channel = channel
    self._ensure_browser_installed()
    # lazy initialization - playwright is started in init() method
    self.playwright: Optional[Playwright] = None
    # stores the history of visited pages
    self.page_history: List[str] = []
    self.cookie_json_path = cookie_json_path
    self.user_data_dir = user_data_dir

    # Set the cache directory
    self.cache_dir = "tmp/" if cache_dir is None else cache_dir
    os.makedirs(self.cache_dir, exist_ok=True)

    # Create user data directory only if specified
    if self.user_data_dir:
        os.makedirs(self.user_data_dir, exist_ok=True)

    # Load the page script that lives next to this module
    abs_dir_path = os.path.dirname(os.path.abspath(__file__))
    page_script_path = os.path.join(abs_dir_path, "page_script.js")
    try:
        # The context manager closes the file; no explicit close() needed.
        with open(page_script_path, "r", encoding='utf-8') as f:
            self.page_script = f.read()
    except FileNotFoundError as e:
        raise FileNotFoundError(
            f"Page script file not found at path: {page_script_path}"
        ) from e

    # Populated by init()
    self.browser: Optional[Browser] = None
    self.context: Optional[BrowserContext] = None
    self.page: Optional[Page] = None
    # Set by visit_page(); used to name saved screenshots
    self.page_url: Optional[str] = None
    self.web_agent_model: Optional[BaseModelBackend] = None
def init(self) -> None:
    r"""Start Playwright if necessary and open the working page.

    With ``user_data_dir`` set, a persistent context is launched and its
    first already-open page (if any) is reused. Otherwise a fresh browser
    and context are created, loading ``cookie_json_path`` as storage state
    when that file exists.
    """
    # Playwright is started here, on first use, rather than in __init__.
    if self.playwright is None:
        from playwright.sync_api import sync_playwright

        self.playwright = sync_playwright().start()

    # Basic stealth
    launch_args = ["--disable-blink-features=AutomationControlled"]
    user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/91.0.4472.124 Safari/537.36"
    )

    if not self.user_data_dir:
        # Fresh browser instance with its own context.
        self.browser = self.playwright.chromium.launch(
            headless=self.headless,
            channel=self.channel,
            args=launch_args,
        )
        context_options: Dict[str, Any] = {
            "accept_downloads": True,
            "user_agent": user_agent,
            "java_script_enabled": True,
        }
        if self.cookie_json_path and os.path.exists(self.cookie_json_path):
            context_options["storage_state"] = self.cookie_json_path
        self.context = self.browser.new_context(**context_options)
        self.page = self.context.new_page()
    else:
        self.context = self.playwright.chromium.launch_persistent_context(
            user_data_dir=self.user_data_dir,
            headless=self.headless,
            channel=self.channel,
            accept_downloads=True,
            user_agent=user_agent,
            java_script_enabled=True,
            args=launch_args,
        )
        # No separate browser object exists in persistent mode.
        self.browser = None
        # A persistent context might reopen pages from the last session.
        if len(self.context.pages) > 0:
            self.page = self.context.pages[0]
        else:
            self.page = self.context.new_page()

    assert self.context is not None
    assert self.page is not None
def clean_cache(self) -> None:
    r"""Remove the cache directory tree; do nothing if it does not exist."""
    cache_dir = self.cache_dir
    if not os.path.exists(cache_dir):
        return
    shutil.rmtree(cache_dir)
def _wait_for_load(self, timeout: int = 20) -> None:
    r"""Block until the page reaches the ``load`` state, then pause.

    Args:
        timeout (int): Maximum time to wait for the load state, in
            seconds. (default: :obj:`20`)
    """
    assert self.page is not None
    # Playwright expects milliseconds.
    self.page.wait_for_load_state("load", timeout=timeout * 1000)
    # TODO: check if this is needed
    time.sleep(2)
def click_blank_area(self) -> None:
    r"""Click the page's top-left corner to unfocus the current element."""
    page = self.page
    assert page is not None
    page.mouse.click(0, 0)
    self._wait_for_load()
@retry_on_error()
def visit_page(self, url: str) -> None:
    r"""Navigate the current page to ``url`` and record it as ``page_url``.

    Args:
        url (str): The address to open.
    """
    page = self.page
    assert page is not None
    page.goto(url)
    self._wait_for_load()
    # Remembered so that saved screenshots can be named after the URL path.
    self.page_url = url
def ask_question_about_video(self, question: str) -> str:
    r"""Ask a question about the video on the current page,
    such as YouTube video.

    The user is asked on standard input to confirm first, because the
    analysis may take a long time.

    Args:
        question (str): The question to ask.

    Returns:
        str: The answer to the question, or a cancellation notice when the
            user does not confirm.
    """
    current_url = self.get_url()

    reply = input(
        f"Do you want to analyze the video on the current "
        f"page({current_url})? This operation may take a long time.(y/n): "
    )
    if reply.lower() not in ('y', 'yes'):
        return "User cancelled the video analysis."

    # Use the web agent's model when one is attached, otherwise None.
    model = getattr(self, 'web_agent_model', None)
    analyzer = VideoAnalysisToolkit(model=model)
    return analyzer.ask_question_about_video(current_url, question)
@retry_on_error()
def get_screenshot(
    self, save_image: bool = False
) -> Tuple[Image.Image, Union[str, None]]:
    r"""Get a screenshot of the current page.

    Args:
        save_image (bool): Whether to save the image to the cache
            directory.

    Returns:
        Tuple[Image.Image, Union[str, None]]: A tuple containing the
            screenshot image and the path to the image file if saved,
            otherwise :obj:`None`.
    """
    assert self.page is not None
    image_data = self.page.screenshot(timeout=60000)
    image = Image.open(io.BytesIO(image_data))

    file_path = None
    if save_image:
        # Get url name to form a file name
        # Use urlparser for a safer extraction the url name
        assert self.page_url is not None
        parsed_url = urllib.parse.urlparse(self.page_url)
        # Max length is set to 241 as there are 10 characters for the
        # timestamp and 4 characters for the file extension:
        url_name = sanitize_filename(str(parsed_url.path), max_length=241)
        timestamp = datetime.datetime.now().strftime("%m%d%H%M%S")
        file_path = os.path.join(
            self.cache_dir, f"{url_name}_{timestamp}.png"
        )
        # The context manager closes the file; no explicit close() needed.
        with open(file_path, "wb") as f:
            image.save(f, "PNG")

    return image, file_path
def capture_full_page_screenshots(
    self, scroll_ratio: float = 0.8
) -> List[str]:
    r"""Screenshot the page piece by piece while scrolling down.

    Each step scrolls by ``scroll_ratio`` of the viewport height, so
    consecutive screenshots overlap. Scrolling stops once a step moves the
    page by less than 10% of the viewport height.

    Args:
        scroll_ratio (float): The ratio of viewport height to scroll each
            step. (default: :obj:`0.8`)

    Returns:
        List[str]: A list of paths to the screenshot files.
    """
    assert self.page is not None
    page_height = cast(
        float, self.page.evaluate("document.body.scrollHeight")
    )
    assert self.page.viewport_size is not None
    viewport_height = self.page.viewport_size["height"]
    current_scroll = cast(float, self.page.evaluate("window.scrollY"))

    # Only reported in the debug log below.
    max_height = page_height - viewport_height
    scroll_step = int(viewport_height * scroll_ratio)
    min_progress = viewport_height * 0.1

    saved_paths: List[str] = []
    previous_scroll = 0.0
    finished = False
    while not finished:
        logger.debug(
            f"Current scroll: {current_scroll}, max_height: "
            f"{max_height}, step: {scroll_step}"
        )

        _, saved_path = self.get_screenshot(save_image=True)
        if saved_path is not None:
            saved_paths.append(saved_path)

        self.page.evaluate(f"window.scrollBy(0, {scroll_step})")
        # Allow time for content to load
        time.sleep(0.5)

        current_scroll = cast(float, self.page.evaluate("window.scrollY"))
        # Stop when there is no significant scroll
        finished = abs(current_scroll - previous_scroll) < min_progress
        previous_scroll = current_scroll

    return saved_paths
def get_visual_viewport(self) -> VisualViewport:
    r"""Get the visual viewport of the current page.

    The page script is (re-)evaluated first; a failure there is only
    logged as a warning.

    Returns:
        VisualViewport: The visual viewport of the current page.
    """
    assert self.page is not None
    try:
        self.page.evaluate(self.page_script)
    except Exception as e:
        logger.warning(f"Error evaluating page script: {e}")

    raw_viewport = cast(
        Dict[str, Any],
        self.page.evaluate("MultimodalWebSurfer.getVisualViewport();"),
    )
    return visual_viewport_from_dict(raw_viewport)
def get_interactive_elements(self) -> Dict[str, InteractiveRegion]:
    r"""Get the interactive elements of the current page.

    The page script is (re-)evaluated first; a failure there is only
    logged as a warning.

    Returns:
        Dict[str, InteractiveRegion]: A dictionary of interactive elements.
    """
    assert self.page is not None
    try:
        self.page.evaluate(self.page_script)
    except Exception as e:
        logger.warning(f"Error evaluating page script: {e}")

    raw_regions = cast(
        Dict[str, Dict[str, Any]],
        self.page.evaluate("MultimodalWebSurfer.getInteractiveRects();"),
    )
    return {
        key: interactive_region_from_dict(raw)
        for key, raw in raw_regions.items()
    }
def get_som_screenshot(
    self,
    save_image: bool = False,
) -> Tuple[Image.Image, Union[str, None]]:
    r"""Get a screenshot of the current viewport with interactive elements
    marked.

    Args:
        save_image (bool): Whether to save the image to the cache
            directory.

    Returns:
        Tuple[Image.Image, Union[str, None]]: A tuple containing the
            screenshot image and an optional path to the image file if
            saved, otherwise :obj:`None`.
    """
    self._wait_for_load()
    screenshot, _ = self.get_screenshot(save_image=False)
    rects = self.get_interactive_elements()

    file_path: Optional[str] = None
    # Only the composed image is needed; the other three values returned by
    # the helper are ignored.
    comp, _, _, _ = _add_set_of_mark(
        screenshot,
        rects,
    )
    if save_image:
        assert self.page_url is not None
        parsed_url = urllib.parse.urlparse(self.page_url)
        # Max length is set to 241 as there are 10 characters for the
        # timestamp and 4 characters for the file extension:
        url_name = sanitize_filename(str(parsed_url.path), max_length=241)
        timestamp = datetime.datetime.now().strftime("%m%d%H%M%S")
        file_path = os.path.join(
            self.cache_dir, f"{url_name}_{timestamp}.png"
        )
        # The context manager closes the file; no explicit close() needed.
        with open(file_path, "wb") as f:
            comp.save(f, "PNG")

    return comp, file_path
def scroll_up(self) -> None:
    r"""Scroll the page up by sending a ``PageUp`` key press."""
    page = self.page
    assert page is not None
    page.keyboard.press("PageUp")
def scroll_down(self) -> None:
    r"""Scroll the page down by sending a ``PageDown`` key press."""
    page = self.page
    assert page is not None
    page.keyboard.press("PageDown")
def get_url(self) -> str:
    r"""Return the URL currently reported by the active page."""
    page = self.page
    assert page is not None
    return page.url
def click_id(self, identifier: Union[str, int]) -> None:
    r"""Click an element with the given identifier.

    If the click opens a popup, the popup becomes the active page and the
    previous page's URL is pushed onto ``page_history``.

    Args:
        identifier (Union[str, int]): The ``__elementId`` of the element
            to click.

    Raises:
        ValueError: If the element does not show up within 5 seconds.
    """
    assert self.page is not None
    if isinstance(identifier, int):
        identifier = str(identifier)
    target = self.page.locator(f"[__elementId='{identifier}']")

    try:
        target.wait_for(timeout=5000)
    except Exception as e:  # Consider using playwright specific
        # TimeoutError
        logger.debug(f"Error during click operation: {e}")
        raise ValueError("No such element.") from None

    target.scroll_into_view_if_needed()

    try:
        # Resolve the click position before waiting for a popup, so that a
        # missing bounding box returns right away instead of leaving through
        # the popup wait and its timeout.
        box: Optional[FloatRect] = target.bounding_box()
        if box is None:
            logger.warning(
                f"Bounding box not found for element '{identifier}'. "
                f"Cannot click."
            )
            return

        with self.page.expect_event("popup", timeout=1000) as page_info:
            self.page.mouse.click(
                box["x"] + box["width"] / 2, box["y"] + box["height"] / 2
            )
        new_page = page_info.value

        # If a new page is opened, switch to it
        if new_page:
            self.page_history.append(self.page.url)
            self.page = new_page
    except Exception as e:  # Consider using playwright specific
        # TimeoutError
        # Reached on every click that opens no popup (the wait times out),
        # so this is deliberately best-effort.
        logger.debug(f"Error during click operation: {e}")

    self._wait_for_load()
def extract_url_content(self) -> str:
    r"""Return the markup of the current page as given by ``page.content()``."""
    page = self.page
    assert page is not None
    return page.content()
def download_file_id(self, identifier: Union[str, int]) -> str:
    r"""Download a file with the given selector.

    Args:
        identifier (Union[str, int]): The identifier of the file to
            download.

    Returns:
        str: The result of the action.
    """
    assert self.page is not None
    if isinstance(identifier, int):
        identifier = str(identifier)

    target = self.page.locator(f"[__elementId='{identifier}']")
    try:
        # locator() is lazy and does not fail for a missing element, so
        # wait for the element explicitly (same 5 s budget as click_id) to
        # make the "not found" result reachable.
        target.wait_for(timeout=5000)
    except Exception as e:  # Consider using playwright specific
        # TimeoutError
        logger.debug(f"Error during download operation: {e}")
        logger.warning(
            f"Element with identifier '{identifier}' not found."
        )
        return f"Element with identifier '{identifier}' not found."

    target.scroll_into_view_if_needed()
    self._wait_for_load()

    try:
        with self.page.expect_download() as download_info:
            target.click()
        download = download_info.value
        # Store the file in the cache directory under the suggested name.
        file_path_val = os.path.join(
            self.cache_dir, download.suggested_filename
        )
        download.save_as(file_path_val)
        return f"Downloaded file to path '{file_path_val}'."
    except Exception as e:  # Consider using playwright specific
        # TimeoutError
        logger.debug(f"Error during download operation: {e}")
        return f"Failed to download file with identifier '{identifier}'."
def fill_input_id(self, identifier: Union[str, int], text: str) -> str:
    r"""Fill an input field with the given text, and then press Enter.

    Args:
        identifier (Union[str, int]): The identifier of the input field.
        text (str): The text to fill.

    Returns:
        str: The result of the action.
    """
    assert self.page is not None
    if isinstance(identifier, int):
        identifier = str(identifier)

    target = self.page.locator(f"[__elementId='{identifier}']")
    try:
        # locator() is lazy and does not fail for a missing element, so
        # wait for the element explicitly (same 5 s budget as click_id) to
        # make the "not found" result reachable.
        target.wait_for(timeout=5000)
    except Exception as e:  # Consider using playwright specific
        # TimeoutError
        logger.debug(f"Error during fill operation: {e}")
        logger.warning(
            f"Element with identifier '{identifier}' not found."
        )
        return f"Element with identifier '{identifier}' not found."

    target.scroll_into_view_if_needed()
    target.focus()
    try:
        target.fill(text)
    except Exception as e:  # Consider using playwright specific
        # TimeoutError
        # Fall back to typing key by key when fill() does not work.
        logger.debug(f"Error during fill operation: {e}")
        target.press_sequentially(text)

    target.press("Enter")
    self._wait_for_load()
    return (
        f"Filled input field '{identifier}' with text '{text}' "
        f"and pressed Enter."
    )
def scroll_to_bottom(self) -> str:
    r"""Scroll to the bottom of the page and wait for it to load.

    Returns:
        str: A message confirming the action.
    """
    page = self.page
    assert page is not None
    page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
    self._wait_for_load()
    return "Scrolled to the bottom of the page."
def scroll_to_top(self) -> str:
    r"""Scroll to the top of the page and wait for it to load.

    Returns:
        str: A message confirming the action.
    """
    page = self.page
    assert page is not None
    page.evaluate("window.scrollTo(0, 0);")
    self._wait_for_load()
    return "Scrolled to the top of the page."
def hover_id(self, identifier: Union[str, int]) -> str:
    r"""Hover over an element with the given identifier.

    Args:
        identifier (Union[str, int]): The identifier of the element to
            hover over.

    Returns:
        str: The result of the action.
    """
    assert self.page is not None
    if isinstance(identifier, int):
        identifier = str(identifier)

    target = self.page.locator(f"[__elementId='{identifier}']")
    try:
        # locator() is lazy and does not fail for a missing element, so
        # wait for the element explicitly (same 5 s budget as click_id) to
        # make the "not found" result reachable.
        target.wait_for(timeout=5000)
    except Exception as e:  # Consider using playwright specific
        # TimeoutError
        logger.debug(f"Error during hover operation: {e}")
        logger.warning(
            f"Element with identifier '{identifier}' not found."
        )
        return f"Element with identifier '{identifier}' not found."

    target.scroll_into_view_if_needed()
    target.hover()
    self._wait_for_load()
    return f"Hovered over element with identifier '{identifier}'."
def find_text_on_page(self, search_text: str) -> str:
    r"""Find the next given text on the page, and scroll the page to the
    targeted text. It is equivalent to pressing Ctrl + F and searching for
    the text.

    Args:
        search_text (str): The text to look for.

    Returns:
        str: A message telling whether the text was found.
    """
    assert self.page is not None
    # The text is handed to the page as an argument rather than spliced
    # into the script, so quotes, backslashes or newlines in it cannot
    # break (or inject into) the JavaScript. The selector is kept on one
    # line: a JS string literal may not contain a raw line break.
    script = """
    (text) => {
        if (window.find(text)) {
            return true;
        }
        const elements = document.querySelectorAll(
            "*:not(script):not(style)"
        );
        for (const el of elements) {
            if (el.innerText && el.innerText.includes(text)) {
                el.scrollIntoView({behavior: "smooth", block: "center"});
                el.style.backgroundColor = "yellow";
                el.style.border = '2px solid red';
                return true;
            }
        }
        return false;
    }
    """
    found = bool(self.page.evaluate(script, search_text))
    self._wait_for_load()
    if found:
        return f"Found text '{search_text}' on the page."
    else:
        return f"Text '{search_text}' not found on the page."
def back(self):
    r"""Navigate back to the previous page.

    If going back lands on ``about:blank``, the page that was open before
    is visited again. If the URL did not change, the most recent entry of
    ``page_history`` (if any) is visited instead.
    """
    assert self.page is not None
    url_before = self.page.url
    self.page.go_back()
    url_after = self.page.url

    if url_after == "about:blank":
        self.visit_page(url_before)

    # If the page is not changed, try to use the history
    if url_before == url_after and len(self.page_history) > 0:
        self.visit_page(self.page_history.pop())

    time.sleep(1)
    self._wait_for_load()
def close(self):
    r"""Close the context and browser, stop Playwright, and drop the
    handles.

    All handles are reset to ``None`` afterwards, so a later ``init()``
    starts a new Playwright instance instead of reusing a stopped one.
    The reset happens even if one of the close calls raises.
    """
    try:
        if self.context is not None:
            self.context.close()
        # Only close browser if it was launched separately
        if self.browser is not None:
            self.browser.close()
    finally:
        try:
            if self.playwright:
                self.playwright.stop()  # Stop playwright instance
        finally:
            self.page = None
            self.context = None
            self.browser = None
            self.playwright = None
def show_interactive_elements(self):
    r"""Show simple interactive elements on the current page.

    Evaluates the page script, then draws a red border around links,
    buttons, form controls, focusable and editable elements.
    """
    assert self.page is not None
    self.page.evaluate(self.page_script)
    # Built in Python and passed as an argument: a JS string literal may
    # not span several lines, which the previous inline selector did.
    selector = ", ".join(
        [
            "a",
            "button",
            "input",
            "select",
            "textarea",
            '[tabindex]:not([tabindex="-1"])',
            '[contenteditable="true"]',
        ]
    )
    self.page.evaluate(
        """
        (selector) => {
            document.querySelectorAll(selector).forEach(el => {
                el.style.border = '2px solid red';
            });
        }
        """,
        selector,
    )
@retry_on_error()
def get_webpage_content(self) -> str:
    r"""Return the current page converted from HTML to text by `html2text`.

    Returns:
        str: The converted page content.
    """
    from html2text import html2text

    assert self.page is not None
    self._wait_for_load()
    return html2text(self.page.content())
def _ensure_browser_installed(self) -> None:
    r"""Ensure the browser is installed.

    A trial launch of the configured channel is attempted. If anything
    goes wrong, the browser is installed through ``python -m playwright
    install`` (plus ``install-deps`` on Linux).

    Raises:
        RuntimeError: If one of the installation commands fails.
    """
    import platform
    import subprocess
    import sys

    try:
        from playwright.sync_api import sync_playwright

        with sync_playwright() as p:
            browser = p.chromium.launch(channel=self.channel)
            browser.close()
    except Exception:
        # Best effort: any failure of the trial launch triggers an install.
        logger.info("Installing Chromium browser...")
        try:
            subprocess.run(
                [
                    sys.executable,
                    "-m",
                    "playwright",
                    "install",
                    self.channel,
                ],
                check=True,
                capture_output=True,
            )
            if platform.system().lower() == "linux":
                subprocess.run(
                    [
                        sys.executable,
                        "-m",
                        "playwright",
                        "install-deps",
                        self.channel,
                    ],
                    check=True,
                    capture_output=True,
                )
            logger.info("Chromium browser installation completed")
        except subprocess.CalledProcessError as e:
            # capture_output without text mode yields bytes; decode them so
            # the message is readable rather than a b'...' repr.
            stderr = e.stderr
            if isinstance(stderr, bytes):
                stderr = stderr.decode(errors="replace")
            raise RuntimeError(
                f"Failed to install browser: {stderr}"
            ) from e
class BrowserToolkit(BaseToolkit):
r"""A class for browsing the web and interacting with web pages.
This class provides methods for browsing the web and interacting with web
pages.
"""
def __init__(
    self,
    headless: bool = False,
    cache_dir: Optional[str] = None,
    channel: Literal["chrome", "msedge", "chromium"] = "chromium",
    history_window: int = 5,
    web_agent_model: Optional[BaseModelBackend] = None,
    planning_agent_model: Optional[BaseModelBackend] = None,
    output_language: str = "en",
    cookie_json_path: Optional[str] = None,
    user_data_dir: Optional[str] = None,
):
    r"""Initialize the BrowserToolkit instance.

    Args:
        headless (bool): Whether to run the browser in headless mode.
            When running inside a CAMEL runtime container, this is
            automatically set to True since containers typically don't
            have a display.
        cache_dir (Union[str, None]): The directory to store cache files.
        channel (Literal["chrome", "msedge", "chromium"]): The browser
            channel to use. Must be one of "chrome", "msedge", or
            "chromium".
        history_window (int): The window size for storing the history of
            actions.
        web_agent_model (Optional[BaseModelBackend]): The model backend
            for the web agent.
        planning_agent_model (Optional[BaseModelBackend]): The model
            backend for the planning agent.
        output_language (str): The language to use for output.
            (default: :obj:`"en"`)
        cookie_json_path (Optional[str]): Path to a JSON file containing
            authentication cookies and browser storage state. If provided
            and the file exists, the browser will load this state to
            maintain authenticated sessions without requiring manual
            login. (default: :obj:`None`)
        user_data_dir (Optional[str]): The directory to store user data
            for persistent context. If None, a fresh browser instance
            is used without saving data. (default: :obj:`None`)
    """
    super().__init__()

    # Inside a CAMEL runtime container there is typically no display, so
    # headless mode is forced there.
    running_in_runtime = (
        os.environ.get("CAMEL_RUNTIME", "").lower() == "true"
    )
    if running_in_runtime and not headless:
        logger.info(
            "Detected CAMEL_RUNTIME environment - enabling headless mode "
            "since containers typically don't have a display"
        )
        headless = True

    browser = BaseBrowser(
        headless=headless,
        cache_dir=cache_dir,
        channel=channel,
        cookie_json_path=cookie_json_path,
        user_data_dir=user_data_dir,
    )
    # The browser needs the model for its video question answering.
    browser.web_agent_model = web_agent_model
    self.browser = browser

    self.history_window = history_window
    self.web_agent_model = web_agent_model
    self.planning_agent_model = planning_agent_model
    # Must be set before the agents are built; they read it.
    self.output_language = output_language
    self.history: List[Dict[str, Any]] = []

    agents: Tuple[ChatAgent, ChatAgent] = self._initialize_agent(
        web_agent_model, planning_agent_model
    )
    self.web_agent, self.planning_agent = agents
def _reset(self):
self.web_agent.reset()
self.planning_agent.reset()
self.history = []
os.makedirs(self.browser.cache_dir, exist_ok=True)
def _initialize_agent(
    self,
    web_agent_model_backend: Optional[BaseModelBackend],
    planning_agent_model_backend: Optional[BaseModelBackend],
) -> Tuple[ChatAgent, ChatAgent]:
    r"""Build the web agent and the planning agent.

    Args:
        web_agent_model_backend (Optional[BaseModelBackend]): Model for the
            web agent. When None, the default model is created with
            temperature 0 and top_p 1.
        planning_agent_model_backend (Optional[BaseModelBackend]): Model
            for the planning agent. When None, the default model is
            created.

    Returns:
        Tuple[ChatAgent, ChatAgent]: The web agent and the planning agent.
    """
    from camel.agents import ChatAgent

    web_model = web_agent_model_backend
    if web_model is None:
        web_model = ModelFactory.create(
            model_platform=ModelPlatformType.DEFAULT,
            model_type=ModelType.DEFAULT,
            model_config_dict={"temperature": 0, "top_p": 1},
        )

    planning_model = planning_agent_model_backend
    if planning_model is None:
        planning_model = ModelFactory.create(
            model_platform=ModelPlatformType.DEFAULT,
            model_type=ModelType.DEFAULT,
        )

    web_agent = ChatAgent(
        system_message=WEB_AGENT_SYSTEM_PROMPT,
        model=web_model,
        output_language=self.output_language,
    )
    planning_agent = ChatAgent(
        system_message=PLANNING_AGENT_SYSTEM_PROMPT,
        model=planning_model,
        output_language=self.output_language,
    )
    return web_agent, planning_agent
def _observe(
    self, task_prompt: str, detailed_plan: Optional[str] = None
) -> Tuple[str, str, str]:
    r"""Let agent observe the current environment, and get the next
    action.

    Args:
        task_prompt (str): The task being solved.
        detailed_plan (Optional[str]): A step-by-step plan to include in
            the prompt. (default: :obj:`None`)

    Returns:
        Tuple[str, str, str]: The observation, the reasoning and the
            action code taken from the web agent's JSON reply.
    """
    detailed_plan_prompt_str = ""
    if detailed_plan is not None:
        # The closing tag previously lacked its slash, leaving the plan
        # section unterminated in the prompt.
        detailed_plan_prompt_str = f"""
Here is a plan about how to solve the task step-by-step which you must follow:
<detailed_plan>{detailed_plan}</detailed_plan>
        """

    observe_prompt = OBSERVE_PROMPT_TEMPLATE.format(
        task_prompt=task_prompt,
        detailed_plan_prompt=detailed_plan_prompt_str,
        AVAILABLE_ACTIONS_PROMPT=AVAILABLE_ACTIONS_PROMPT,
        history_window=self.history_window,
        history=self.history[-self.history_window :],
    )

    # get current state
    som_screenshot, _ = self.browser.get_som_screenshot(save_image=True)
    img = _reload_image(som_screenshot)
    message = BaseMessage.make_user_message(
        role_name='user', content=observe_prompt, image_list=[img]
    )

    # Reset the history message of web_agent.
    self.web_agent.reset()
    resp = self.web_agent.step(message)
    resp_content = resp.msgs[0].content

    resp_dict = _parse_json_output(resp_content, logger)
    observation_result: str = resp_dict.get("observation", "")
    reasoning_result: str = resp_dict.get("reasoning", "")
    action_code: str = resp_dict.get("action_code", "")

    # The parsed action has an opening but no closing parenthesis: try to
    # recover the complete call from the raw reply.
    if action_code and "(" in action_code and ")" not in action_code:
        action_match = re.search(
            r'"action_code"\s*:\s*[`"]([^`"]*\([^)]*\))[`"]', resp_content
        )
        if action_match:
            action_code = action_match.group(1)
        else:
            logger.warning(
                f"Incomplete action_code detected: {action_code}"
            )
            # For a cut-off fill_input_id call, keep the identifier and
            # substitute a placeholder text.
            if action_code.startswith("fill_input_id("):
                parts = action_code.split(",", 1)
                if len(parts) > 1:
                    id_part = (
                        parts[0].replace("fill_input_id(", "").strip()
                    )
                    action_code = (
                        f"fill_input_id({id_part}, 'Please "
                        f"fill the text here.')"
                    )

    action_code = action_code.replace("`", "").strip()

    return observation_result, reasoning_result, action_code
def _act(self, action_code: str) -> Tuple[bool, str]:
    r"""Let agent act based on the given action code.

    Args:
        action_code (str): The action code to act.

    Returns:
        Tuple[bool, str]: A tuple containing a boolean indicating whether
            the action was successful, and the information to be returned.
    """

    def _check_if_with_feedback(action_code: str) -> bool:
        r"""Check if the action code needs feedback."""
        return any(
            action_with_feedback in action_code
            for action_with_feedback in ACTION_WITH_FEEDBACK_LIST
        )

    def _fix_action_code(action_code: str) -> str:
        r"""Fix potential missing quotes in action code"""
        match = re.match(r'(\w+)\((.*)\)', action_code)
        if not match:
            return action_code

        func_name, args_str = match.groups()

        # Split the arguments on commas that are outside of quotes.
        args = []
        current_arg = ""
        in_quotes = False
        quote_char = None
        for char in args_str:
            if char == ',' and not in_quotes:
                args.append(current_arg.strip())
                current_arg = ""
                continue
            if char in ('"', "'"):
                if not in_quotes:
                    in_quotes, quote_char = True, char
                elif char == quote_char:
                    in_quotes, quote_char = False, None
            current_arg += char
        if current_arg:
            args.append(current_arg.strip())

        fixed_args = []
        for arg in args:
            if (
                (arg.startswith('"') and arg.endswith('"'))
                or (arg.startswith("'") and arg.endswith("'"))
                or re.match(r'^-?\d+(\.\d+)?$', arg)
                or re.match(r'^-?\d+\.?\d*[eE][-+]?\d+$', arg)
                or re.match(r'^0[xX][0-9a-fA-F]+$', arg)
            ):
                # Already a quoted string or a numeric literal.
                fixed_args.append(arg)
            else:
                # repr() yields a valid string literal even when the text
                # contains quotes, backslashes or newlines; wrapping it in
                # plain '...' did not.
                fixed_args.append(repr(arg))

        return f"{func_name}({', '.join(fixed_args)})"

    action_code = _fix_action_code(action_code)
    prefix = "self.browser."
    code = f"{prefix}{action_code}"

    # SECURITY NOTE(review): `code` is built from model output and is run
    # with eval()/exec(), so a crafted reply can execute arbitrary Python.
    # Consider dispatching to an allow-list of BaseBrowser methods instead.
    try:
        if _check_if_with_feedback(action_code):
            # execute code, and get the executed result
            result = eval(code)
            time.sleep(1)
            return True, result
        else:
            exec(code)
            time.sleep(1)
            return True, "Action was successful."
    except Exception as e:
        time.sleep(1)
        return (
            False,
            f"Error while executing the action {action_code}: {e}. "
            f"If timeout, please recheck whether you have provided the "
            f"correct identifier.",
        )
def _get_final_answer(self, task_prompt: str) -> str:
    r"""Ask the web agent for the final answer to the task.

    The prompt is built from the task prompt and the whole action history.
    It is used when the agent thinks that the task can be completed
    without any further action.

    Args:
        task_prompt (str): The task being solved.

    Returns:
        str: The content of the web agent's reply.
    """
    final_prompt = GET_FINAL_ANSWER_PROMPT_TEMPLATE.format(
        history=self.history, task_prompt=task_prompt
    )
    request = BaseMessage.make_user_message(
        role_name='user',
        content=final_prompt,
    )
    # Start from a clean conversation.
    self.web_agent.reset()
    reply = self.web_agent.step(request)
    return reply.msgs[0].content
def _task_planning(self, task_prompt: str, start_url: str) -> str:
    r"""Ask the planning agent for a plan for the task.

    Args:
        task_prompt (str): The task being solved.
        start_url (str): The URL the browsing will start from.

    Returns:
        str: The content of the planning agent's reply.
    """
    request = BaseMessage.make_user_message(
        role_name='user',
        content=TASK_PLANNING_PROMPT_TEMPLATE.format(
            task_prompt=task_prompt, start_url=start_url
        ),
    )
    # Start from a clean conversation.
    self.planning_agent.reset()
    reply = self.planning_agent.step(request)
    return reply.msgs[0].content
def _task_replanning(
    self, task_prompt: str, detailed_plan: str
) -> Tuple[bool, str]:
    r"""Replan the task based on the given task prompt.

    Args:
        task_prompt (str): The original task prompt.
        detailed_plan (str): The detailed plan to replan.

    Returns:
        Tuple[bool, str]: A tuple containing a boolean indicating
            whether the task needs to be replanned, and the replanned
            schema.
    """
    replanning_prompt = TASK_REPLANNING_PROMPT_TEMPLATE.format(
        task_prompt=task_prompt,
        detailed_plan=detailed_plan,
        history_window=self.history_window,
        history=self.history[-self.history_window :],
    )
    # Reset the history message of planning_agent.
    self.planning_agent.reset()
    resp = self.planning_agent.step(replanning_prompt)
    resp_dict = _parse_json_output(resp.msgs[0].content, logger)

    raw_flag = resp_dict.get("if_need_replan", False)
    if isinstance(raw_flag, str):
        # A model may reply with "false"/"true" as strings; any non-empty
        # string is truthy, so interpret the text instead.
        if_need_replan = raw_flag.strip().lower() in ("true", "yes", "1")
    else:
        if_need_replan = bool(raw_flag)

    replanned_schema: str = resp_dict.get("replanned_schema", "")
    return if_need_replan, replanned_schema
@dependencies_required("playwright")
def browse_url(
    self, task_prompt: str, start_url: str, round_limit: int = 12
) -> str:
    r"""A powerful toolkit which can simulate the browser interaction to
    solve the task which needs multi-step actions.

    Args:
        task_prompt (str): The task prompt to solve.
        start_url (str): The start URL to visit.
        round_limit (int): The round limit to solve the task.
            (default: :obj:`12`).

    Returns:
        str: The simulation result to the task.
    """
    # NOTE: the docstring above is kept verbatim because this method is
    # wrapped in a FunctionTool by get_tools().
    self._reset()
    task_completed = False
    detailed_plan = self._task_planning(task_prompt, start_url)
    logger.debug(f"Detailed plan: {detailed_plan}")

    simulation_result: str
    try:
        self.browser.init()
        self.browser.visit_page(start_url)

        for i in range(round_limit):
            observation, reasoning, action_code = self._observe(
                task_prompt, detailed_plan
            )
            logger.debug(f"Observation: {observation}")
            logger.debug(f"Reasoning: {reasoning}")
            logger.debug(f"Action code: {action_code}")

            trajectory_info: Dict[str, Any]
            # Only look at the part before the first "(": an argument such
            # as 'bus stop' must not end the task.
            if "stop" in action_code.split("(", 1)[0]:
                task_completed = True
                trajectory_info = {
                    "round": i,
                    "observation": observation,
                    "thought": reasoning,
                    "action": action_code,
                    "action_if_success": True,
                    "info": None,
                    "current_url": self.browser.get_url(),
                }
                self.history.append(trajectory_info)
                break

            else:
                success, info = self._act(action_code)
                if not success:
                    logger.warning(
                        f"Error while executing the action: {info}"
                    )

                trajectory_info = {
                    "round": i,
                    "observation": observation,
                    "thought": reasoning,
                    "action": action_code,
                    "action_if_success": success,
                    "info": info,
                    "current_url": self.browser.get_url(),
                }
                self.history.append(trajectory_info)

                # Replan the task if necessary
                if_need_replan, replanned_schema = self._task_replanning(
                    task_prompt, detailed_plan
                )
                if if_need_replan:
                    detailed_plan = replanned_schema
                    logger.debug(f"Replanned schema: {replanned_schema}")

        if not task_completed:
            simulation_result = f"""
                The task is not completed within the round limit. Please 
                check the last round {self.history_window} information to 
                see if there is any useful information:
                <history>{self.history[-self.history_window :]}</history>
            """
        else:
            simulation_result = self._get_final_answer(task_prompt)
    finally:
        # Close the browser on success, on hitting the round limit, and
        # also when a round raises, so no browser process is left behind.
        self.browser.close()

    return simulation_result
def get_tools(self) -> List[FunctionTool]:
    r"""Return the toolkit's tools: `browse_url` wrapped in a FunctionTool.

    Returns:
        List[FunctionTool]: A single-element list.
    """
    browse_tool = FunctionTool(self.browse_url)
    return [browse_tool]