from __future__ import annotations import json import re import time import uuid from pathlib import Path from typing import Any from helpers import files from helpers.print_style import PrintStyle from helpers.tool import Response, Tool from plugins._browser.helpers.selector import get_tool_runtime HISTORY_SCREENSHOT_QUALITY = 62 HISTORY_SCREENSHOT_ACTION_DENYLIST = {"close", "close_all"} async def get_runtime(context_id: str, create: bool = True, agent: Any | None = None): if agent is not None: return await get_tool_runtime(agent) from plugins._browser.helpers.runtime import get_runtime as get_container_runtime return await get_container_runtime(context_id, create=create) class Browser(Tool): async def execute( self, action: str = "", browser_id: int | str | None = None, url: str = "", ref: int | str | None = None, target_ref: int | str | None = None, text: str = "", selector: str = "", selectors: list[str] | None = None, script: str = "", modifiers: list[str] | str | None = None, keys: list[str] | None = None, key: str = "", include_content: bool = False, focus_popup: bool | None = None, event_type: str = "", x: float = 0.0, y: float = 0.0, to_x: float = 0.0, to_y: float = 0.0, offset_x: float = 0.0, offset_y: float = 0.0, target_offset_x: float = 0.0, target_offset_y: float = 0.0, delta_x: float = 0.0, delta_y: float = 0.0, button: str = "left", quality: int = 80, full_page: bool = False, path: str = "", paths: list[str] | None = None, value: str = "", values: list[str] | None = None, checked: bool | None = None, width: int = 0, height: int = 0, calls: list[dict[str, Any]] | None = None, **kwargs: Any, ) -> Response: method_action = str(self.method or "").strip().lower().replace("-", "_") requested_action = str(action or "").strip().lower().replace("-", "_") clipboard_action = "" if method_action == "clipboard" and requested_action in {"copy", "cut", "paste"}: clipboard_action = requested_action action = "clipboard" else: action = str(action or self.method or "state").strip().lower().replace("-", "_") try: runtime = await get_runtime(self.agent.context.id, agent=self.agent) except Exception as exc: return Response(message=f"Browser runtime unavailable: {exc}", break_loop=False) if isinstance(modifiers, str): modifiers = [modifiers] if modifiers else None elif isinstance(modifiers, list) and not modifiers: modifiers = None keys = self._normalize_keys(keys) try: if action == "open": result = await runtime.call("open", url or "") elif action == "screenshot": if not path: path = self._history_screenshot_path(action) result = await runtime.call( "screenshot_file", browser_id, quality=quality, full_page=full_page, path=path, ) elif action == "list": result = await runtime.call("list", include_content=bool(include_content)) elif action == "state": result = await runtime.call("state", browser_id) elif action in {"set_active", "setactive", "activate", "focus"}: result = await runtime.call("set_active", browser_id) elif action == "navigate": result = await runtime.call("navigate", browser_id, url) elif action == "back": result = await runtime.call("back", browser_id) elif action == "forward": result = await runtime.call("forward", browser_id) elif action == "reload": result = await runtime.call("reload", browser_id) elif action == "content": payload = self._selector_payload(selector, selectors) result = await runtime.call("content", browser_id, payload) elif action == "detail": result = await runtime.call( "detail", browser_id, await self._resolve_ref(runtime, browser_id, ref, selector, action), ) elif action == "click": resolved_ref = await self._resolve_ref( runtime, browser_id, ref, selector, action, required=not self._has_coordinates(x, y), ) if resolved_ref is None and self._has_coordinates(x, y): result = await runtime.call( "mouse", browser_id, "click", x, y, button=button or "left", modifiers=modifiers, ) elif modifiers: result = await runtime.call( "click", browser_id, resolved_ref, modifiers=modifiers, focus_popup=focus_popup, ) else: result = await runtime.call("click", browser_id, resolved_ref) elif action == "type": resolved_ref = await self._resolve_ref( runtime, browser_id, ref, selector, action, required=False, ) if resolved_ref is None: result = await runtime.call("keyboard", browser_id, key="", text=text) else: result = await runtime.call("type", browser_id, resolved_ref, text) elif action == "submit": result = await runtime.call( "submit", browser_id, await self._resolve_ref(runtime, browser_id, ref, selector, action), ) elif action in {"type_submit", "typesubmit"}: result = await runtime.call( "type_submit", browser_id, await self._resolve_ref(runtime, browser_id, ref, selector, action), text, ) elif action == "scroll": result = await runtime.call( "scroll", browser_id, await self._resolve_ref(runtime, browser_id, ref, selector, action), ) elif action == "evaluate": result = await runtime.call("evaluate", browser_id, script) elif action in {"key_chord", "keychord"}: if not keys: raise ValueError("key_chord requires non-empty 'keys' list") result = await runtime.call("key_chord", browser_id, keys) elif action == "hover": result = await runtime.call( "hover", browser_id, ref=ref, x=x, y=y, offset_x=offset_x, offset_y=offset_y, ) elif action == "double_click": result = await runtime.call( "double_click", browser_id, ref=ref, x=x, y=y, button=button or "left", modifiers=modifiers, offset_x=offset_x, offset_y=offset_y, ) elif action == "right_click": result = await runtime.call( "right_click", browser_id, ref=ref, x=x, y=y, modifiers=modifiers, offset_x=offset_x, offset_y=offset_y, ) elif action == "drag": result = await runtime.call( "drag", browser_id, ref=ref, target_ref=target_ref, x=x, y=y, to_x=to_x, to_y=to_y, offset_x=offset_x, offset_y=offset_y, target_offset_x=target_offset_x, target_offset_y=target_offset_y, ) elif action == "wheel": result = await runtime.call( "wheel", browser_id, x, y, delta_x, delta_y, ) elif action == "keyboard": result = await runtime.call( "keyboard", browser_id, key=key, text=text, ) elif action == "clipboard": normalized_clipboard_action = clipboard_action or str( kwargs.get("clipboard_action") or kwargs.get("operation") or event_type or "" ).strip().lower() result = await runtime.call( "clipboard", browser_id, action=normalized_clipboard_action, text=text, ) elif action in {"copy", "cut", "paste"}: result = await runtime.call( "clipboard", browser_id, action=action, text=text, ) elif action == "set_viewport": result = await runtime.call("set_viewport", browser_id, width, height) elif action == "select_option": result = await runtime.call( "select_option", browser_id, await self._resolve_ref(runtime, browser_id, ref, selector, action), value=value, values=values, ) elif action == "set_checked": result = await runtime.call( "set_checked", browser_id, await self._resolve_ref(runtime, browser_id, ref, selector, action), checked=True if checked is None else bool(checked), ) elif action == "upload_file": result = await runtime.call( "upload_file", browser_id, await self._resolve_ref(runtime, browser_id, ref, selector, action), path=path, paths=paths, ) elif action == "mouse": result = await runtime.call( "mouse", browser_id, event_type or "click", x, y, button=button or "left", modifiers=modifiers, ) elif action == "multi": if not calls: raise ValueError("multi requires non-empty 'calls' list") result = await runtime.call("multi", list(calls)) elif action == "close": result = await runtime.call("close_browser", browser_id) elif action == "close_all": result = await runtime.call("close_all_browsers") else: return Response( message=f"Unknown browser action: {action}", break_loop=False, ) await self._record_history_screenshot(runtime, action, result, browser_id) except Exception as exc: return Response(message=f"Browser {action} failed: {exc}", break_loop=False) return Response(message=self._format_result(action, result), break_loop=False) def get_log_object(self): return self.agent.context.log.log( type="tool", heading=f"icon://captive_portal {self.agent.agent_name}: Using browser", content="", kvps=self.args, _tool_name=self.name, ) @staticmethod def _require_ref(ref: int | str | None) -> int | str: if ref is None or str(ref).strip() == "": raise ValueError("ref is required for this browser action") return ref @staticmethod def _has_ref(ref: int | str | None) -> bool: return ref is not None and str(ref).strip() != "" @staticmethod def _has_coordinates(x: float, y: float) -> bool: return bool(float(x or 0) or float(y or 0)) @classmethod async def _resolve_ref( cls, runtime: Any, browser_id: int | str | None, ref: int | str | None, selector: str = "", action: str = "action", *, required: bool = True, ) -> int | str | None: if cls._has_ref(ref): return ref selector = str(selector or "").strip() if selector: content = await runtime.call("content", browser_id, {"selector": selector}) resolved = cls._first_ref_from_content(content, selector) if resolved is not None: return resolved raise ValueError( f"{action} could not resolve selector {selector!r} to a browser ref" ) if required: return cls._require_ref(ref) return None @staticmethod def _first_ref_from_content(content: Any, selector: str = "") -> str | None: if isinstance(content, dict): values: list[Any] = [] if selector and selector in content: values.append(content.get(selector)) values.extend(value for key, value in content.items() if key != selector) text = "\n".join(str(value or "") for value in values) else: text = str(content or "") match = re.search(r"\[[^\]\n]*?\b(\d+)\]", text) return match.group(1) if match else None @staticmethod def _normalize_keys(keys: list[str] | str | None) -> list[str]: if keys is None: return [] if isinstance(keys, str): raw = re.split(r"\s*\+\s*|\s*,\s*", keys.strip()) elif isinstance(keys, list): raw = keys else: raw = [str(keys)] aliases = { "cmd": "Meta", "command": "Meta", "control": "Control", "ctrl": "Control", "escape": "Escape", "esc": "Escape", "meta": "Meta", "option": "Alt", "return": "Enter", "space": "Space", } normalized: list[str] = [] for key in raw: value = str(key or "").strip() if not value: continue normalized.append(aliases.get(value.lower(), value.upper() if len(value) == 1 and value.isalpha() else value)) return normalized @staticmethod def _selector_payload(selector: str = "", selectors: list[str] | None = None) -> dict | None: if selectors: return {"selectors": selectors} if selector: return {"selector": selector} return None async def _record_history_screenshot( self, runtime: Any, action: str, result: Any, requested_browser_id: int | str | None = None, ) -> None: if not getattr(self, "log", None): return if action in HISTORY_SCREENSHOT_ACTION_DENYLIST: return screenshot = result if action == "screenshot" and isinstance(result, dict) else None if not self._screenshot_has_path(screenshot): target_browser_id = self._browser_id_from_result(result) or requested_browser_id output_path = self._history_screenshot_path(action) if not output_path: return try: screenshot = await runtime.call( "screenshot_file", target_browser_id, quality=HISTORY_SCREENSHOT_QUALITY, full_page=False, path=output_path, ) except Exception as exc: PrintStyle.debug( "Browser history screenshot capture failed:", f"browser_id={target_browser_id}", f"quality={HISTORY_SCREENSHOT_QUALITY}", f"path={output_path}", f"error={exc}", ) return if not self._screenshot_has_path(screenshot): return local_path = str(screenshot.get("path") or files.fix_dev_path(str(screenshot.get("a0_path") or ""))) if not local_path: return uri = f"img://{local_path}&t={time.time()}" state = screenshot.get("state") if isinstance(screenshot.get("state"), dict) else {} self.log.update( Screenshot=uri, browser_snapshot={ "uri": uri, "path": local_path, "a0_path": screenshot.get("a0_path") or files.normalize_a0_path(local_path), "mime": screenshot.get("mime") or "image/jpeg", "browser_id": screenshot.get("browser_id") or state.get("id") or requested_browser_id, "context_id": screenshot.get("context_id") or state.get("context_id") or "", }, ) def _history_screenshot_path(self, action: str) -> str: if not getattr(self, "agent", None) or not getattr(self.agent, "context", None): return "" context_id = str(getattr(self.agent.context, "id", "") or "").strip() if not context_id: return "" from helpers import persist_chat token = str(getattr(getattr(self, "log", None), "id", "") or uuid.uuid4()) safe_action = files.safe_file_name(str(action or "browser")) safe_token = files.safe_file_name(token) timestamp = time.strftime("%Y%m%d-%H%M%S") return str( Path(persist_chat.get_chat_folder_path(context_id)) / "browser" / "screenshots" / f"{timestamp}-{safe_action}-{safe_token}.jpg" ) @staticmethod def _browser_id_from_result(result: Any) -> Any: if not isinstance(result, dict): return None browsers = result.get("browsers") if isinstance(result.get("browsers"), list) else [] last_interacted_id = result.get("last_interacted_browser_id") listed_browser = None if last_interacted_id is not None: listed_browser = next( ( browser for browser in browsers if isinstance(browser, dict) and str(browser.get("id")) == str(last_interacted_id) ), None, ) if listed_browser is None and browsers: listed_browser = next((browser for browser in browsers if isinstance(browser, dict)), None) state = result.get("state") if isinstance(result.get("state"), dict) else {} return ( result.get("id") or result.get("browser_id") or state.get("id") or last_interacted_id or (listed_browser or {}).get("id") ) @staticmethod def _screenshot_has_path(screenshot: Any) -> bool: return isinstance(screenshot, dict) and bool(screenshot.get("path") or screenshot.get("a0_path")) @staticmethod def _format_result(action: str, result: Any) -> str: if action == "content" and isinstance(result, dict): if set(result.keys()) == {"document"}: return str(result.get("document") or "") return json.dumps(result, indent=2, ensure_ascii=False) return json.dumps(result, indent=2, ensure_ascii=False, default=str)