free-claude-code/api/web_tools/parsers.py
Alishahryar1 f3a7528d49
Some checks are pending
CI / checks (push) Waiting to run
Major refactor: API, providers, messaging, and Anthropic protocol
Consolidates the incremental refactor work into a single change set: modular web tools (api/web_tools), native Anthropic request building and SSE block policy, OpenAI conversion and error handling, provider transports and rate limiting, messaging handler and tree queue, safe logging, smoke tests, and broad test coverage.
2026-04-26 03:01:14 -07:00

104 lines
3.2 KiB
Python

"""HTML parsing for web_search / web_fetch."""
from __future__ import annotations
import html
import re
from html.parser import HTMLParser
from typing import Any
from urllib.parse import parse_qs, unquote, urlparse
class SearchResultParser(HTMLParser):
    """Extract result links and titles from DuckDuckGo lite HTML.

    Only ``<a>`` elements whose ``href`` carries a ``uddg`` query parameter
    are treated as results; the parameter value is the target URL. Collected
    entries are stored in ``results`` as ``{"title": ..., "url": ...}`` dicts,
    in document order, with at most one entry per URL.
    """

    def __init__(self) -> None:
        super().__init__()
        self.results: list[dict[str, str]] = []
        # Target URL of the result anchor currently open, or None outside one.
        self._href: str | None = None
        # Text chunks seen inside the currently open result anchor.
        self._title_parts: list[str] = []

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        """Start collecting a title when a result anchor opens."""
        if tag != "a":
            return
        href = dict(attrs).get("href")
        if not href or "uddg=" not in href:
            return
        # parse_qs percent-decodes values, so the target URL is used as-is;
        # decoding it a second time would corrupt escapes that belong to the
        # target itself (e.g. "%26" inside its query string).
        targets = parse_qs(urlparse(href).query).get("uddg")
        if not targets:
            return
        self._href = targets[0]
        self._title_parts = []

    def handle_data(self, data: str) -> None:
        """Accumulate text that appears inside an open result anchor."""
        if self._href is not None:
            self._title_parts.append(data)

    def handle_endtag(self, tag: str) -> None:
        """Finish the open result anchor and record it if new and titled."""
        if tag != "a" or self._href is None:
            return
        # Collapse runs of whitespace. The text is not unescaped again:
        # HTMLParser (convert_charrefs=True by default) has already converted
        # character references before calling handle_data.
        title = " ".join("".join(self._title_parts).split())
        if title and not any(result["url"] == self._href for result in self.results):
            self.results.append({"title": title, "url": self._href})
        self._href = None
        self._title_parts = []
class HTMLTextParser(HTMLParser):
    """Collect a page title and text for fetch previews, skipping scripts/styles.

    ``title`` holds the whitespace-normalized text of the first ``<title>``
    element. ``text_parts`` holds every other non-empty text chunk that is
    not inside ``<script>``, ``<style>`` or ``<noscript>``. Later ``<title>``
    elements (for example inside inline SVG) do not alter ``title``; their
    text is collected like any other text.
    """

    _SKIPPED_TAGS = frozenset({"script", "style", "noscript"})

    def __init__(self) -> None:
        super().__init__()
        self.title = ""
        self.text_parts: list[str] = []
        self._in_title = False
        # Set once the first <title> element has been closed.
        self._title_done = False
        # Number of currently open skipped elements.
        self._skip_depth = 0

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        """Track entry into skipped elements and into the first title."""
        if tag in self._SKIPPED_TAGS:
            self._skip_depth += 1
        elif tag == "title" and not self._title_done:
            self._in_title = True

    def handle_endtag(self, tag: str) -> None:
        """Track exit from skipped elements and from the title."""
        if tag in self._SKIPPED_TAGS:
            # Guard against stray closing tags driving the depth negative.
            if self._skip_depth:
                self._skip_depth -= 1
        elif tag == "title" and self._in_title:
            self._in_title = False
            self._title_done = True

    def handle_data(self, data: str) -> None:
        """Route a whitespace-normalized text chunk to the title or the body."""
        text = " ".join(data.split())
        if not text:
            return
        if self._in_title:
            # Title text may arrive in several chunks; join them with spaces.
            self.title = f"{self.title} {text}".strip()
        elif not self._skip_depth:
            self.text_parts.append(text)
def content_text(content: Any) -> str:
    """Flatten message content into plain text.

    A string is returned unchanged. For a list, the ``text`` value of each
    item (dict key for dicts, attribute otherwise) is collected and the
    non-empty values are joined with newlines. ``None`` yields an empty
    string. Any other value is converted with ``str``.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for item in content:
            if isinstance(item, dict):
                text = item.get("text")
            else:
                text = getattr(item, "text", None)
            # A missing or None text must not turn into the literal "None".
            if text is not None:
                parts.append(str(text))
        return "\n".join(part for part in parts if part)
    return str(content)
def extract_query(text: str) -> str:
    """Pull the search query out of free-form text.

    Everything following the first case-insensitive ``query:`` marker (which
    may span several lines) is returned with surrounding whitespace and then
    surrounding quote characters removed. Without a marker, the whole text is
    returned stripped of surrounding whitespace.
    """
    found = re.search(r"query:\s*(.+)", text, flags=re.IGNORECASE | re.DOTALL)
    if found is None:
        return text.strip()
    captured = found.group(1)
    return captured.strip().strip("\"'")
def extract_url(text: str) -> str:
    """Return the first http(s) URL found in ``text``.

    Trailing sentence punctuation (``.`` and ``,``) is removed. A trailing
    ``)`` or ``]`` is removed only when it has no matching opening bracket
    inside the URL, so URLs that legitimately end in a bracket (such as
    ``.../Python_(programming_language)`` or ``http://[::1]``) stay intact.
    If no URL is present, the stripped input is returned.
    """
    match = re.search(r"https?://\S+", text)
    if match is None:
        return text.strip()
    url = match.group(0)
    openers = {")": "(", "]": "["}
    while url:
        last = url[-1]
        if last in ".,":
            url = url[:-1]
        elif last in openers and url.count(last) > url.count(openers[last]):
            # Unbalanced closer: it belongs to the surrounding prose.
            url = url[:-1]
        else:
            break
    return url