mirror of
https://github.com/agent0ai/agent-zero.git
synced 2026-05-17 12:31:20 +00:00
Teach the Browser content helper to ignore global/delegated framework event bindings so snapshots surface the actual actionable controls instead of broad wrapper elements. Add an accessible name to the Browser address bar for more reliable capture output. Allow agents to use selector-based reference actions, coordinate click fallbacks, focused-field typing, and string key chords such as CTRL+A across the browser tool, container runtime, and host connector runtime. Cover the behavior with browser regression and host connector tests.
531 lines
20 KiB
Python
531 lines
20 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
import time
|
|
import uuid
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from helpers import files
|
|
from helpers.print_style import PrintStyle
|
|
from helpers.tool import Response, Tool
|
|
from plugins._browser.helpers.selector import get_tool_runtime
|
|
|
|
|
|
HISTORY_SCREENSHOT_QUALITY = 62
|
|
HISTORY_SCREENSHOT_ACTION_DENYLIST = {"close", "close_all"}
|
|
|
|
|
|
async def get_runtime(context_id: str, create: bool = True, agent: Any | None = None):
    """Return the browser runtime to use for a context or an agent.

    Args:
        context_id: Context identifier handed to the container runtime lookup.
            Not used when ``agent`` is given.
        create: Forwarded as ``create=`` to the container runtime lookup.
            Not used when ``agent`` is given.
        agent: When not ``None``, the runtime is obtained from
            ``get_tool_runtime(agent)`` instead.

    Returns:
        Whatever the selected runtime lookup returns.
    """
    if agent is None:
        # The container runtime helper is imported only on the path that uses it.
        from plugins._browser.helpers.runtime import get_runtime as container_get_runtime

        return await container_get_runtime(context_id, create=create)

    return await get_tool_runtime(agent)
|
|
|
|
|
|
class Browser(Tool):
|
|
async def execute(
    self,
    action: str = "",
    browser_id: int | str | None = None,
    url: str = "",
    ref: int | str | None = None,
    target_ref: int | str | None = None,
    text: str = "",
    selector: str = "",
    selectors: list[str] | None = None,
    script: str = "",
    modifiers: list[str] | str | None = None,
    keys: list[str] | str | None = None,
    key: str = "",
    include_content: bool = False,
    focus_popup: bool | None = None,
    event_type: str = "",
    x: float = 0.0,
    y: float = 0.0,
    to_x: float = 0.0,
    to_y: float = 0.0,
    offset_x: float = 0.0,
    offset_y: float = 0.0,
    target_offset_x: float = 0.0,
    target_offset_y: float = 0.0,
    delta_x: float = 0.0,
    delta_y: float = 0.0,
    button: str = "left",
    quality: int = 80,
    full_page: bool = False,
    path: str = "",
    paths: list[str] | None = None,
    value: str = "",
    values: list[str] | None = None,
    checked: bool | None = None,
    width: int = 0,
    height: int = 0,
    calls: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> Response:
    """Dispatch one browser tool call to the browser runtime.

    The action name is stripped, lower-cased and has hyphens replaced by
    underscores. When ``action`` is empty, ``self.method`` is used, and
    ``"state"`` when both are empty. If the tool method is ``clipboard`` and
    ``action`` is ``copy``/``cut``/``paste``, the call is routed to the
    clipboard operation of that name.

    Args:
        action: Name of the operation to run.
        browser_id: Browser to operate on; forwarded to the runtime as given.
        url: Address for ``open`` and ``navigate``.
        ref: Element reference for element-targeted actions.
        target_ref: Destination element reference for ``drag``.
        text: Text for ``type``, ``type_submit``, ``keyboard`` and clipboard
            operations.
        selector: Selector for ``content``; for ``detail``, ``click``,
            ``type``, ``submit``, ``type_submit``, ``scroll``,
            ``select_option``, ``set_checked`` and ``upload_file`` it is
            resolved to a ref when ``ref`` is missing.
        selectors: Several selectors for ``content``; takes precedence over
            ``selector``.
        script: Script for ``evaluate``.
        modifiers: Modifier key name(s). A single string is wrapped in a
            list; empty values become ``None``.
        keys: Key chord for ``key_chord``, either a list of key names or a
            string such as ``"CTRL+A"`` (see ``_normalize_keys``).
        key: Key name for ``keyboard``.
        include_content: Forwarded to ``list``.
        focus_popup: Forwarded to ``click`` when modifiers are present.
        event_type: Mouse event name for ``mouse`` (defaults to ``"click"``);
            also the last fallback for the clipboard operation name.
        x, y: Coordinates for pointer actions. A ``click`` without ref or
            selector falls back to a mouse click here when either is non-zero.
        to_x, to_y: Destination coordinates for ``drag``.
        offset_x, offset_y: Offsets forwarded to ``hover``, ``double_click``,
            ``right_click`` and ``drag``.
        target_offset_x, target_offset_y: Destination offsets for ``drag``.
        delta_x, delta_y: Deltas for ``wheel``.
        button: Mouse button name; an empty value is replaced by ``"left"``.
        quality: Quality for ``screenshot``.
        full_page: Forwarded to ``screenshot``.
        path: Output file for ``screenshot`` (a history path is generated
            when empty) or the file for ``upload_file``.
        paths: Several files for ``upload_file``.
        value, values: Option value(s) for ``select_option``.
        checked: Desired state for ``set_checked``; ``None`` means ``True``.
        width, height: Size for ``set_viewport``.
        calls: Batched calls for ``multi``.
        **kwargs: ``clipboard_action`` or ``operation`` may name the
            clipboard operation; other entries are ignored.

    Returns:
        A ``Response`` with ``break_loop=False``. Its message is the formatted
        runtime result, or a description of the failure. Errors are reported
        in the message rather than raised.
    """
    method_action = str(self.method or "").strip().lower().replace("-", "_")
    requested_action = str(action or "").strip().lower().replace("-", "_")
    clipboard_action = ""
    if method_action == "clipboard" and requested_action in {"copy", "cut", "paste"}:
        clipboard_action = requested_action
        action = "clipboard"
    else:
        action = str(action or self.method or "state").strip().lower().replace("-", "_")

    try:
        runtime = await get_runtime(self.agent.context.id, agent=self.agent)
    except Exception as exc:
        return Response(message=f"Browser runtime unavailable: {exc}", break_loop=False)

    if isinstance(modifiers, str):
        modifiers = [modifiers] if modifiers else None
    elif isinstance(modifiers, list) and not modifiers:
        modifiers = None
    keys = self._normalize_keys(keys)

    async def resolve_target(*, required: bool = True) -> int | str | None:
        # Shared ref/selector resolution for every element-targeted action.
        return await self._resolve_ref(
            runtime, browser_id, ref, selector, action, required=required
        )

    try:
        if action == "open":
            result = await runtime.call("open", url or "")
        elif action == "screenshot":
            if not path:
                path = self._history_screenshot_path(action)
            result = await runtime.call(
                "screenshot_file",
                browser_id,
                quality=quality,
                full_page=full_page,
                path=path,
            )
        elif action == "list":
            result = await runtime.call("list", include_content=bool(include_content))
        elif action == "state":
            result = await runtime.call("state", browser_id)
        elif action in {"set_active", "setactive", "activate", "focus"}:
            result = await runtime.call("set_active", browser_id)
        elif action == "navigate":
            result = await runtime.call("navigate", browser_id, url)
        elif action in {"back", "forward", "reload"}:
            # The runtime operation has the same name as the action.
            result = await runtime.call(action, browser_id)
        elif action == "content":
            payload = self._selector_payload(selector, selectors)
            result = await runtime.call("content", browser_id, payload)
        elif action == "detail":
            result = await runtime.call("detail", browser_id, await resolve_target())
        elif action == "click":
            has_point = self._has_coordinates(x, y)
            # A ref is only mandatory when there is no coordinate fallback.
            resolved_ref = await resolve_target(required=not has_point)
            if resolved_ref is None and has_point:
                result = await runtime.call(
                    "mouse",
                    browser_id,
                    "click",
                    x,
                    y,
                    button=button or "left",
                    modifiers=modifiers,
                )
            elif modifiers:
                result = await runtime.call(
                    "click",
                    browser_id,
                    resolved_ref,
                    modifiers=modifiers,
                    focus_popup=focus_popup,
                )
            else:
                # NOTE(review): focus_popup is not forwarded without modifiers;
                # confirm against the runtime whether that is intended.
                result = await runtime.call("click", browser_id, resolved_ref)
        elif action == "type":
            resolved_ref = await resolve_target(required=False)
            if resolved_ref is None:
                # No target element: send the text through the keyboard call.
                result = await runtime.call("keyboard", browser_id, key="", text=text)
            else:
                result = await runtime.call("type", browser_id, resolved_ref, text)
        elif action == "submit":
            result = await runtime.call("submit", browser_id, await resolve_target())
        elif action in {"type_submit", "typesubmit"}:
            result = await runtime.call(
                "type_submit", browser_id, await resolve_target(), text
            )
        elif action == "scroll":
            result = await runtime.call("scroll", browser_id, await resolve_target())
        elif action == "evaluate":
            result = await runtime.call("evaluate", browser_id, script)
        elif action in {"key_chord", "keychord"}:
            if not keys:
                raise ValueError("key_chord requires non-empty 'keys' list")
            result = await runtime.call("key_chord", browser_id, keys)
        elif action == "hover":
            result = await runtime.call(
                "hover",
                browser_id,
                ref=ref,
                x=x,
                y=y,
                offset_x=offset_x,
                offset_y=offset_y,
            )
        elif action == "double_click":
            result = await runtime.call(
                "double_click",
                browser_id,
                ref=ref,
                x=x,
                y=y,
                button=button or "left",
                modifiers=modifiers,
                offset_x=offset_x,
                offset_y=offset_y,
            )
        elif action == "right_click":
            result = await runtime.call(
                "right_click",
                browser_id,
                ref=ref,
                x=x,
                y=y,
                modifiers=modifiers,
                offset_x=offset_x,
                offset_y=offset_y,
            )
        elif action == "drag":
            result = await runtime.call(
                "drag",
                browser_id,
                ref=ref,
                target_ref=target_ref,
                x=x,
                y=y,
                to_x=to_x,
                to_y=to_y,
                offset_x=offset_x,
                offset_y=offset_y,
                target_offset_x=target_offset_x,
                target_offset_y=target_offset_y,
            )
        elif action == "wheel":
            result = await runtime.call("wheel", browser_id, x, y, delta_x, delta_y)
        elif action == "keyboard":
            result = await runtime.call("keyboard", browser_id, key=key, text=text)
        elif action == "clipboard":
            normalized_clipboard_action = clipboard_action or str(
                kwargs.get("clipboard_action")
                or kwargs.get("operation")
                or event_type
                or ""
            ).strip().lower()
            result = await runtime.call(
                "clipboard",
                browser_id,
                action=normalized_clipboard_action,
                text=text,
            )
        elif action in {"copy", "cut", "paste"}:
            result = await runtime.call(
                "clipboard",
                browser_id,
                action=action,
                text=text,
            )
        elif action == "set_viewport":
            result = await runtime.call("set_viewport", browser_id, width, height)
        elif action == "select_option":
            result = await runtime.call(
                "select_option",
                browser_id,
                await resolve_target(),
                value=value,
                values=values,
            )
        elif action == "set_checked":
            result = await runtime.call(
                "set_checked",
                browser_id,
                await resolve_target(),
                checked=True if checked is None else bool(checked),
            )
        elif action == "upload_file":
            result = await runtime.call(
                "upload_file",
                browser_id,
                await resolve_target(),
                path=path,
                paths=paths,
            )
        elif action == "mouse":
            result = await runtime.call(
                "mouse",
                browser_id,
                event_type or "click",
                x,
                y,
                button=button or "left",
                modifiers=modifiers,
            )
        elif action == "multi":
            if not calls:
                raise ValueError("multi requires non-empty 'calls' list")
            result = await runtime.call("multi", list(calls))
        elif action == "close":
            result = await runtime.call("close_browser", browser_id)
        elif action == "close_all":
            result = await runtime.call("close_all_browsers")
        else:
            return Response(
                message=f"Unknown browser action: {action}",
                break_loop=False,
            )
    except Exception as exc:
        return Response(message=f"Browser {action} failed: {exc}", break_loop=False)

    # The history screenshot is best-effort. The action above has already
    # completed, so a failure here must not be reported as the action failing.
    try:
        await self._record_history_screenshot(runtime, action, result, browser_id)
    except Exception as exc:
        PrintStyle.debug(
            "Browser history screenshot update failed:",
            f"action={action}",
            f"error={exc}",
        )

    return Response(message=self._format_result(action, result), break_loop=False)
|
|
|
|
def get_log_object(self):
    """Create and return the log entry for this browser tool call.

    The entry is created through ``self.agent.context.log.log`` with type
    ``"tool"``, a heading naming the agent, empty content, the tool arguments
    as key/value pairs and the tool name.
    """
    create_entry = self.agent.context.log.log
    heading = f"icon://captive_portal {self.agent.agent_name}: Using browser"
    return create_entry(
        type="tool",
        heading=heading,
        content="",
        kvps=self.args,
        _tool_name=self.name,
    )
|
|
|
|
@staticmethod
def _require_ref(ref: int | str | None) -> int | str:
    """Return ``ref`` unchanged, rejecting a missing or blank reference.

    Raises:
        ValueError: If ``ref`` is ``None`` or its string form is empty after
            stripping whitespace.
    """
    is_blank = ref is None or not str(ref).strip()
    if is_blank:
        raise ValueError("ref is required for this browser action")
    return ref
|
|
|
|
@staticmethod
def _has_ref(ref: int | str | None) -> bool:
    """Tell whether ``ref`` carries a usable reference.

    ``None`` and values whose string form is empty after stripping whitespace
    count as missing; everything else (including ``0``) counts as present.
    """
    if ref is None:
        return False
    return bool(str(ref).strip())
|
|
|
|
@staticmethod
def _has_coordinates(x: float, y: float) -> bool:
    """Tell whether a coordinate pair was supplied.

    Both values are converted with ``float`` (falsy values count as ``0``).
    The pair counts as supplied when either value is non-zero, so the point
    ``(0, 0)`` is indistinguishable from "no coordinates given".
    """
    if float(x or 0):
        # y is deliberately not converted once x already decides the answer.
        return True
    return bool(float(y or 0))
|
|
|
|
@classmethod
async def _resolve_ref(
    cls,
    runtime: Any,
    browser_id: int | str | None,
    ref: int | str | None,
    selector: str = "",
    action: str = "action",
    *,
    required: bool = True,
) -> int | str | None:
    """Work out the element reference an action should target.

    An explicit ``ref`` wins. Otherwise a non-blank ``selector`` is looked up
    through the runtime's ``content`` call and the first reference found in
    that content is used.

    Args:
        runtime: Object providing an awaitable ``call`` method.
        browser_id: Browser passed to the ``content`` call.
        ref: Explicit element reference, if any.
        selector: Selector to resolve when ``ref`` is missing.
        action: Action name, used only in the error message.
        required: When true, a missing ref with no selector is an error;
            when false, ``None`` is returned instead.

    Returns:
        The reference to use, or ``None`` when nothing was given and
        ``required`` is false.

    Raises:
        ValueError: If the selector yields no reference, or if a reference is
            required and neither ``ref`` nor ``selector`` was given.
    """
    if cls._has_ref(ref):
        return ref

    query = str(selector or "").strip()
    if not query:
        return cls._require_ref(ref) if required else None

    page_content = await runtime.call("content", browser_id, {"selector": query})
    found = cls._first_ref_from_content(page_content, query)
    if found is None:
        raise ValueError(
            f"{action} could not resolve selector {query!r} to a browser ref"
        )
    return found
|
|
|
|
@staticmethod
def _first_ref_from_content(content: Any, selector: str = "") -> str | None:
    """Extract the first bracketed numeric reference from content output.

    A reference is the trailing run of digits inside square brackets on a
    single line, e.g. the ``12`` in ``[button 12]``.

    Args:
        content: Either a dict of content values or any other value, which is
            converted with ``str``.
        selector: When ``content`` is a dict, the value stored under this key
            is searched before the other values.

    Returns:
        The digits as a string, or ``None`` when no reference is present.
    """
    if isinstance(content, dict):
        preferred = [content[selector]] if selector and selector in content else []
        others = [item for name, item in content.items() if name != selector]
        haystack = "\n".join(str(item or "") for item in preferred + others)
    else:
        haystack = str(content or "")

    found = re.search(r"\[[^\]\n]*?\b(\d+)\]", haystack)
    if found is None:
        return None
    return found.group(1)
|
|
|
|
@staticmethod
def _normalize_keys(keys: list[str] | str | None) -> list[str]:
    """Turn a key chord into a list of canonical key names.

    Args:
        keys: ``None``, a chord string whose parts are separated by ``+`` or
            ``,`` (for example ``"CTRL+A"``), or a list/tuple of key names.
            Any other value is treated as a single key via ``str``.

    Returns:
        The key names in order. Blank parts are dropped, known aliases are
        mapped case-insensitively to their canonical spelling
        (``ctrl`` -> ``Control``, ``shift`` -> ``Shift``, ...), and single
        letters are upper-cased. Other names are passed through unchanged.
    """
    if keys is None:
        return []
    if isinstance(keys, str):
        parts = re.split(r"\s*\+\s*|\s*,\s*", keys.strip())
    elif isinstance(keys, (list, tuple)):
        # Tuples are handled like lists instead of being stringified whole.
        parts = list(keys)
    else:
        parts = [str(keys)]

    # Lower-case spelling -> canonical key name. Modifier and editing keys
    # are listed so that "SHIFT" or "alt" are canonicalised the same way
    # "ctrl" and "control" already are.
    aliases = {
        "alt": "Alt",
        "backspace": "Backspace",
        "cmd": "Meta",
        "command": "Meta",
        "control": "Control",
        "ctrl": "Control",
        "del": "Delete",
        "delete": "Delete",
        "enter": "Enter",
        "escape": "Escape",
        "esc": "Escape",
        "meta": "Meta",
        "option": "Alt",
        "return": "Enter",
        "shift": "Shift",
        "space": "Space",
        "tab": "Tab",
    }

    normalized: list[str] = []
    for part in parts:
        name = str(part or "").strip()
        if not name:
            continue
        if len(name) == 1 and name.isalpha():
            fallback = name.upper()
        else:
            fallback = name
        normalized.append(aliases.get(name.lower(), fallback))
    return normalized
|
|
|
|
@staticmethod
def _selector_payload(selector: str = "", selectors: list[str] | None = None) -> dict | None:
    """Build the selector argument for the runtime's ``content`` call.

    Returns:
        ``{"selectors": selectors}`` when ``selectors`` is non-empty,
        otherwise ``{"selector": selector}`` when ``selector`` is non-empty,
        otherwise ``None``.
    """
    if selectors:
        return {"selectors": selectors}
    return {"selector": selector} if selector else None
|
|
|
|
async def _record_history_screenshot(
    self,
    runtime: Any,
    action: str,
    result: Any,
    requested_browser_id: int | str | None = None,
) -> None:
    """Attach a browser screenshot to this tool call's log entry.

    Does nothing when the tool has no ``log`` or the action is listed in
    ``HISTORY_SCREENSHOT_ACTION_DENYLIST``. The result of a ``screenshot``
    action is reused when it already names a file; otherwise a new screenshot
    is requested from the runtime at ``HISTORY_SCREENSHOT_QUALITY``.

    Args:
        runtime: Object providing an awaitable ``call`` method.
        action: Normalized action name that was just executed.
        result: Value returned by the runtime for that action.
        requested_browser_id: Browser id from the tool call, used when the
            result does not identify a browser.
    """
    if not getattr(self, "log", None) or action in HISTORY_SCREENSHOT_ACTION_DENYLIST:
        return

    # A screenshot action has already produced an image; reuse it if possible.
    shot = result if (action == "screenshot" and isinstance(result, dict)) else None

    if not self._screenshot_has_path(shot):
        capture_browser_id = self._browser_id_from_result(result) or requested_browser_id
        capture_path = self._history_screenshot_path(action)
        if not capture_path:
            return
        try:
            shot = await runtime.call(
                "screenshot_file",
                capture_browser_id,
                quality=HISTORY_SCREENSHOT_QUALITY,
                full_page=False,
                path=capture_path,
            )
        except Exception as exc:
            # Capturing is best-effort: report the failure and carry on.
            PrintStyle.debug(
                "Browser history screenshot capture failed:",
                f"browser_id={capture_browser_id}",
                f"quality={HISTORY_SCREENSHOT_QUALITY}",
                f"path={capture_path}",
                f"error={exc}",
            )
            return
        if not self._screenshot_has_path(shot):
            return

    image_path = shot.get("path")
    if not image_path:
        image_path = files.fix_dev_path(str(shot.get("a0_path") or ""))
    image_path = str(image_path)
    if not image_path:
        return

    # The current time is appended so each URI is distinct.
    uri = f"img://{image_path}&t={time.time()}"
    raw_state = shot.get("state")
    state = raw_state if isinstance(raw_state, dict) else {}
    snapshot = {
        "uri": uri,
        "path": image_path,
        "a0_path": shot.get("a0_path") or files.normalize_a0_path(image_path),
        "mime": shot.get("mime") or "image/jpeg",
        "browser_id": shot.get("browser_id") or state.get("id") or requested_browser_id,
        "context_id": shot.get("context_id") or state.get("context_id") or "",
    }
    self.log.update(Screenshot=uri, browser_snapshot=snapshot)
|
|
|
|
def _history_screenshot_path(self, action: str) -> str:
    """Build the file path for a history screenshot of ``action``.

    The path is ``<chat folder>/browser/screenshots/<timestamp>-<action>-<token>.jpg``,
    where the chat folder comes from ``persist_chat.get_chat_folder_path`` and
    the token is the log entry id, or a random UUID when there is none.

    Returns:
        The path as a string, or ``""`` when the tool has no agent, no agent
        context, or the context has no id.
    """
    agent = getattr(self, "agent", None)
    if not agent:
        return ""
    context = getattr(agent, "context", None)
    if not context:
        return ""
    context_id = str(getattr(context, "id", "") or "").strip()
    if not context_id:
        return ""

    from helpers import persist_chat

    log_entry = getattr(self, "log", None)
    token = str(getattr(log_entry, "id", "") or uuid.uuid4())
    action_part = files.safe_file_name(str(action or "browser"))
    token_part = files.safe_file_name(token)
    stamp = time.strftime("%Y%m%d-%H%M%S")

    folder = Path(persist_chat.get_chat_folder_path(context_id)) / "browser" / "screenshots"
    return str(folder / f"{stamp}-{action_part}-{token_part}.jpg")
|
|
|
|
@staticmethod
def _browser_id_from_result(result: Any) -> Any:
    """Pick the browser id a runtime result refers to.

    Candidates are tried in this order and the first truthy one wins:
    ``result["id"]``, ``result["browser_id"]``, ``result["state"]["id"]``,
    ``result["last_interacted_browser_id"]``, and the id of a browser from
    ``result["browsers"]`` (the one matching the last interacted id, else the
    first dict entry).

    Returns:
        The chosen id, ``None`` when ``result`` is not a dict, or the last
        candidate's value when none of them is truthy.
    """
    if not isinstance(result, dict):
        return None

    raw_browsers = result.get("browsers")
    browsers = raw_browsers if isinstance(raw_browsers, list) else []
    entries = [entry for entry in browsers if isinstance(entry, dict)]
    last_id = result.get("last_interacted_browser_id")

    chosen = None
    if last_id is not None:
        # Ids are compared as strings so 1 and "1" match.
        wanted = str(last_id)
        chosen = next((entry for entry in entries if str(entry.get("id")) == wanted), None)
    if chosen is None and entries:
        chosen = entries[0]

    raw_state = result.get("state")
    state = raw_state if isinstance(raw_state, dict) else {}

    candidates = (
        result.get("id"),
        result.get("browser_id"),
        state.get("id"),
        last_id,
        (chosen or {}).get("id"),
    )
    # Mirrors an `or` chain: first truthy value, else the final operand.
    return next((candidate for candidate in candidates if candidate), candidates[-1])
|
|
|
|
@staticmethod
def _screenshot_has_path(screenshot: Any) -> bool:
    """Tell whether a screenshot result names a file.

    True only for a dict with a truthy ``"path"`` or ``"a0_path"`` entry.
    """
    if not isinstance(screenshot, dict):
        return False
    return bool(screenshot.get("path") or screenshot.get("a0_path"))
|
|
|
|
@staticmethod
def _format_result(action: str, result: Any) -> str:
    """Render a runtime result as the text of the tool response.

    A ``content`` result that is a dict with the single key ``"document"`` is
    returned as that document's text (empty string when it is falsy). Every
    other result is rendered as indented JSON with non-ASCII characters kept
    as-is.

    Values that JSON cannot represent are converted with ``str`` for all
    actions, so an unexpected value in a ``content`` result cannot raise here.
    """
    document_only = (
        action == "content"
        and isinstance(result, dict)
        and set(result) == {"document"}
    )
    if document_only:
        return str(result.get("document") or "")

    return json.dumps(result, indent=2, ensure_ascii=False, default=str)
|