Survey agent persona (#8)

* Add human-in-the-loop survey helper

Co-authored-by: nic <nicsins@users.noreply.github.com>

* Make survey helper launcher robust and add CLI fallback

Co-authored-by: nic <nicsins@users.noreply.github.com>

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: nic <nicsins@users.noreply.github.com>
This commit is contained in:
nic 2026-03-17 01:55:24 -05:00 committed by GitHub
parent a0ae24d36b
commit 6a248a57cc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 978 additions and 0 deletions

View file

@ -0,0 +1,9 @@
"""
Survey assistant helpers.
This package intentionally focuses on:
- Extracting survey/form questions from rendered HTML
- Suggesting answers from a user-provided profile
- Keeping the human in control (no auto-submission)
"""

View file

@ -0,0 +1,90 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from pathlib import Path
from playwright.async_api import async_playwright
@dataclass
class RenderResult:
    """Snapshot of a page after it has been loaded in a headless browser."""

    # Address that was passed to the renderer.
    url: str
    # Address reported by the page once navigation finished.
    final_url: str
    # Document title; the renderer stores "" when the page has none.
    title: str
    # Serialized HTML content of the page.
    html: str
    # Path the caller asked the screenshot to be written to, if any.
    screenshot_path: str | None = None
async def _render_url_async(
    url: str,
    *,
    screenshot_path: str | None = None,
    viewport_width: int = 1200,
    viewport_height: int = 900,
    timeout_ms: int = 30000,
) -> RenderResult:
    """
    Load *url* in headless Chromium and capture its rendered state.

    Args:
        url: Page to navigate to.
        screenshot_path: When given, a viewport-sized (not full-page) screenshot
            is written there; missing parent directories are created.
        viewport_width: Browser viewport width in pixels.
        viewport_height: Browser viewport height in pixels.
        timeout_ms: Navigation timeout passed to ``page.goto``.

    Returns:
        A RenderResult with the requested URL, the page's URL after navigation,
        its title ("" if empty), its HTML, and the *screenshot_path* argument
        echoed back unchanged.

    Raises:
        Whatever Playwright raises on launch/navigation failure (for example a
        timeout while waiting for the network to go idle). The context and
        browser are closed before the exception propagates.
    """
    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True, args=["--disable-http2"])
        try:
            context = await browser.new_context(
                viewport={"width": viewport_width, "height": viewport_height},
                user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125 Safari/537.36",
            )
            try:
                page = await context.new_page()
                await page.goto(url, wait_until="networkidle", timeout=timeout_ms)
                title = (await page.title()) or ""
                final_url = page.url
                html = await page.content()

                if screenshot_path:
                    p = Path(screenshot_path)
                    p.parent.mkdir(parents=True, exist_ok=True)
                    await page.screenshot(path=str(p), full_page=False)
            finally:
                # Release the context even when navigation or capture fails.
                await context.close()
        finally:
            # Always shut the browser process down, success or failure.
            await browser.close()

    return RenderResult(
        url=url,
        final_url=final_url,
        title=title,
        html=html,
        screenshot_path=screenshot_path,
    )
async def render_url_async(
    url: str,
    *,
    screenshot_path: str | None = None,
    viewport_width: int = 1200,
    viewport_height: int = 900,
    timeout_ms: int = 30000,
) -> RenderResult:
    """
    Public coroutine for rendering a page; forwards every argument unchanged
    to ``_render_url_async`` and returns its RenderResult.
    """
    render_options = {
        "screenshot_path": screenshot_path,
        "viewport_width": viewport_width,
        "viewport_height": viewport_height,
        "timeout_ms": timeout_ms,
    }
    rendered = await _render_url_async(url, **render_options)
    return rendered
def render_url(
    url: str,
    *,
    screenshot_path: str | None = None,
    viewport_width: int = 1200,
    viewport_height: int = 900,
    timeout_ms: int = 30000,
) -> RenderResult:
    """
    Blocking counterpart of the async renderer, meant for GUI/CLI callers.

    Builds the rendering coroutine and drives it to completion with
    ``asyncio.run``, returning the resulting RenderResult.
    """
    render_coro = _render_url_async(
        url,
        screenshot_path=screenshot_path,
        viewport_width=viewport_width,
        viewport_height=viewport_height,
        timeout_ms=timeout_ms,
    )
    return asyncio.run(render_coro)

View file

@ -0,0 +1,191 @@
from __future__ import annotations
from dataclasses import dataclass, asdict, field
from typing import Any, Iterable
from bs4 import BeautifulSoup, Tag
@dataclass
class ExtractedField:
    """One form control, or one radio/checkbox group, found in a page."""

    kind: str  # input|textarea|select
    input_type: str | None = None  # text|radio|checkbox|...
    # Raw ``name`` attribute of the control (shared by all members of a group).
    name: str | None = None
    # Raw ``id`` attribute of the control.
    id: str | None = None
    # Human-readable label inferred for the control, if any.
    label: str | None = None
    # True when the control carries ``required`` or ``aria-required="true"``.
    required: bool = False
    options: list[dict[str, Any]] = field(default_factory=list)  # [{label,value}]

    def to_dict(self) -> dict[str, Any]:
        """Return the field as a plain dict built with ``dataclasses.asdict``."""
        return asdict(self)
def _text(el: Tag | None) -> str:
    """Return the element's text with all whitespace runs collapsed to single spaces.

    A missing (falsy) element yields the empty string.
    """
    if el:
        raw = el.get_text(" ", strip=True)
        return " ".join(raw.split())
    return ""
def _first_non_empty(*vals: str | None) -> str | None:
    """Return the first value that is non-blank after whitespace normalization.

    ``None`` entries are skipped; every other value is stringified, stripped and
    has internal whitespace runs collapsed. Returns ``None`` when nothing
    survives.
    """
    normalized = (" ".join(str(v).strip().split()) for v in vals if v is not None)
    return next((candidate for candidate in normalized if candidate), None)
def _is_hidden_input(tag: Tag) -> bool:
    """Tell whether *tag* is an ``<input>`` whose type is ``hidden`` (case-insensitive)."""
    return tag.name == "input" and (tag.get("type") or "").lower() == "hidden"
def _is_ignorable_control(tag: Tag) -> bool:
    """Tell whether *tag* is a control that carries no answerable question.

    That covers every ``<button>`` plus ``<input>`` elements of type submit,
    button, reset, image, file or hidden. An input without a type is treated
    as ``text`` and therefore kept.
    """
    if tag.name == "button":
        return True
    if tag.name != "input":
        return False
    skipped_types = {"submit", "button", "reset", "image", "file", "hidden"}
    declared_type = (tag.get("type") or "text").lower()
    return declared_type in skipped_types
def _infer_label(soup: BeautifulSoup, field_tag: Tag) -> str | None:
    """Work out a human-readable label for a form control.

    Sources are tried in order and the first non-empty one wins:
    a ``<label for=...>`` matching the control's id, a ``<label>`` that wraps
    the control, then the ``aria-label``, ``placeholder`` and ``name``
    attributes. Returns ``None`` when all of them are empty.
    """
    # Explicit association through the control's id.
    control_id = field_tag.get("id")
    if control_id:
        explicit = soup.find("label", attrs={"for": control_id})
        explicit_text = _text(explicit) if isinstance(explicit, Tag) else ""
        if explicit_text:
            return explicit_text

    # Implicit association: the control sits inside a <label>.
    wrapper = field_tag.find_parent("label")
    wrapper_text = _text(wrapper) if isinstance(wrapper, Tag) else ""
    if wrapper_text:
        return wrapper_text

    # Attribute fallbacks, most descriptive first.
    fallbacks = [field_tag.get(attr) for attr in ("aria-label", "placeholder", "name")]
    return _first_non_empty(*fallbacks)
def _iter_controls(soup: BeautifulSoup) -> Iterable[Tag]:
    """Yield, in document order, the input/textarea/select tags worth reporting.

    Ignorable controls (buttons, submit/file/hidden inputs, ...) are dropped,
    as are elements carrying the ``hidden`` attribute or ``aria-hidden="true"``.
    """
    for candidate in soup.find_all(["input", "textarea", "select"]):
        if not isinstance(candidate, Tag) or _is_ignorable_control(candidate):
            continue
        # Elements explicitly marked as not visible.
        marked_hidden = candidate.has_attr("hidden") or candidate.get("aria-hidden") == "true"
        if marked_hidden or _is_hidden_input(candidate):
            continue
        yield candidate
def _option_label_text(soup: BeautifulSoup, opt: Tag) -> str | None:
    """Return the text of the <label> tied to *opt* via ``for=`` or by wrapping it."""
    opt_id = opt.get("id")
    if opt_id:
        explicit = soup.find("label", attrs={"for": opt_id})
        if isinstance(explicit, Tag):
            explicit_text = _text(explicit)
            if explicit_text:
                return explicit_text
    # Many survey builders emit <label><input type="radio"> Text</label>.
    wrapper = opt.find_parent("label")
    if isinstance(wrapper, Tag):
        return _text(wrapper) or None
    return None


def _group_options(soup: BeautifulSoup, name: str) -> list[dict[str, Any]]:
    """Collect ``{label, value}`` entries for every visible input sharing *name*."""
    options: list[dict[str, Any]] = []
    for opt in soup.find_all("input", attrs={"name": name}):
        if not isinstance(opt, Tag):
            continue
        if _is_ignorable_control(opt) or _is_hidden_input(opt):
            continue
        value = opt.get("value")
        options.append(
            {
                # Fall back to the raw value, then the id, when no label text exists.
                "label": _first_non_empty(_option_label_text(soup, opt), value, opt.get("id")),
                "value": value,
            }
        )
    return options


def extract_form_fields(html: str, *, max_fields: int = 200) -> list[ExtractedField]:
    """
    Extract likely survey/form fields from HTML into a normalized structure.

    This is intentionally "best-effort" and works across many survey builders.

    Args:
        html: Page markup; ``None``/empty is treated as an empty document.
        max_fields: Upper bound on the number of raw controls inspected.
            Zero or a negative value yields an empty result.

    Returns:
        One ExtractedField per text-like control or ``<select>``, and one per
        named radio/checkbox group (its members become ``options``), in
        document order.
    """
    soup = BeautifulSoup(html or "", "html.parser")

    # First pass: gather raw controls in order, never exceeding the limit.
    controls: list[Tag] = []
    for tag in _iter_controls(soup):
        if len(controls) >= max_fields:
            break
        controls.append(tag)

    # Group radio/checkbox by name so they show up as one question.
    seen_group_names: set[str] = set()
    out: list[ExtractedField] = []

    for c in controls:
        kind = c.name
        name = c.get("name")
        input_type = (c.get("type") or "text").lower() if kind == "input" else None

        options: list[dict[str, Any]] = []
        if kind == "input" and input_type in {"radio", "checkbox"} and name:
            if name in seen_group_names:
                continue
            seen_group_names.add(name)
            options = _group_options(soup, name)
        elif kind == "select":
            options = [
                {"label": _text(opt) or opt.get("value"), "value": opt.get("value")}
                for opt in c.find_all("option")
                if isinstance(opt, Tag)
            ]

        out.append(
            ExtractedField(
                kind=str(kind),
                input_type=input_type,
                name=name,
                id=c.get("id"),
                # NOTE(review): for a group this is the label of its first member,
                # which may be an option caption rather than the question text.
                label=_infer_label(soup, c),
                required=bool(c.has_attr("required") or (c.get("aria-required") == "true")),
                options=options,
            )
        )

    return out

View file

@ -0,0 +1,66 @@
from __future__ import annotations
import json
from urllib import request as url_request
from urllib import error as url_error
def _post_json(url: str, payload: dict, headers: dict[str, str] | None = None, method: str = "POST") -> dict:
    """
    Send a JSON request and return the decoded JSON response.

    Args:
        url: Endpoint to call.
        payload: Object serialized as the request body; ignored when *method*
            is ``"GET"`` (no body is sent then).
        headers: Extra headers merged over the default JSON content type.
        method: HTTP method to use.

    Returns:
        The parsed response body, or ``{}`` when the body is empty.

    Raises:
        RuntimeError: On an HTTP error status, a connection/timeout failure,
            or a response body that is not valid UTF-8 JSON.
    """
    data = None if method == "GET" else json.dumps(payload).encode("utf-8")
    req_headers = {"Content-Type": "application/json"}
    if headers:
        req_headers.update(headers)

    req = url_request.Request(url, data=data, headers=req_headers, method=method)
    try:
        with url_request.urlopen(req, timeout=60) as resp:
            raw = resp.read()
    except url_error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc
    except url_error.URLError as exc:
        raise RuntimeError(f"Connection error: {exc}") from exc
    except OSError as exc:
        # Timeouts or resets while reading the body are not wrapped in URLError.
        raise RuntimeError(f"Connection error: {exc}") from exc

    if not raw:
        return {}
    try:
        # UnicodeDecodeError and JSONDecodeError are both ValueError subclasses.
        return json.loads(raw.decode("utf-8"))
    except ValueError as exc:
        raise RuntimeError(f"Invalid JSON response: {exc}") from exc
def ollama_available(base_url: str = "http://localhost:11434") -> bool:
    """Probe ``{base_url}/api/tags`` with a GET and report whether it succeeded.

    Any failure at all (connection, HTTP status, bad payload) counts as
    "not available"; this function never raises.
    """
    try:
        _post_json(f"{base_url}/api/tags", payload={}, method="GET")
    except Exception:
        # Deliberately broad: this is a best-effort reachability check.
        return False
    else:
        return True
def suggest_answers_with_ollama(
    *,
    model: str,
    questions_text: str,
    profile_json: str,
    base_url: str = "http://localhost:11434",
) -> str:
    """
    Ask an Ollama chat model for survey answer suggestions and return them as plain text.

    The instructions, the profile JSON and the question list are combined into
    a single user message posted to ``{base_url}/api/chat`` with streaming
    disabled. The reply's ``message.content`` is returned stripped, or "" when
    it is absent.

    Safety intent: do not fabricate user details; answer UNKNOWN if the profile doesn't support it.
    """
    system = "".join(
        (
            "You are a survey helper. Help the user answer honestly and consistently.\n",
            "- Do NOT invent personal facts.\n",
            "- If a question requires information not present in the profile, answer: UNKNOWN.\n",
            "- Keep answers concise.\n",
        )
    )
    prompt_parts = [
        f"{system}\n",
        f"USER_PROFILE_JSON:\n{profile_json}\n\n",
        f"SURVEY_QUESTIONS:\n{questions_text}\n\n",
        "Return suggested answers in a readable bullet list.",
    ]
    chat_request = {
        "model": model,
        "messages": [{"role": "user", "content": "".join(prompt_parts)}],
        "stream": False,
    }

    reply = _post_json(f"{base_url}/api/chat", payload=chat_request)
    if not isinstance(reply, dict):
        return ""
    message = reply.get("message", {})
    return str(message.get("content", "")).strip()

View file

@ -0,0 +1,68 @@
from __future__ import annotations
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
DEFAULT_PROFILE_PATH = Path("memory") / "survey_profile.json"


@dataclass
class SurveyProfile:
    """
    A user-owned profile used to answer surveys honestly and consistently.

    This is not a "made-up persona" generator. It's meant to store real, user-approved facts
    and preferences that can be reused across surveys.
    """

    demographics: dict[str, Any] = field(default_factory=dict)
    preferences: dict[str, Any] = field(default_factory=dict)
    writing_style: dict[str, Any] = field(default_factory=dict)
    misc: dict[str, Any] = field(default_factory=dict)

    # Section names in serialization order. Left un-annotated so the dataclass
    # machinery treats it as a plain class attribute, not a field.
    _SECTIONS = ("demographics", "preferences", "writing_style", "misc")

    @classmethod
    def load(cls, path: str | Path = DEFAULT_PROFILE_PATH) -> "SurveyProfile":
        """
        Read a profile from *path*.

        Returns an empty profile when the file does not exist or its top-level
        JSON value is not an object. A section that is missing or is not a JSON
        object is loaded as an empty dict.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        p = Path(path)
        if not p.exists():
            return cls()
        data = json.loads(p.read_text(encoding="utf-8"))
        if not isinstance(data, dict):
            return cls()
        sections: dict[str, dict[str, Any]] = {}
        for key in cls._SECTIONS:
            value = data.get(key)
            # Copy so the profile never aliases the parsed document.
            sections[key] = dict(value) if isinstance(value, dict) else {}
        return cls(**sections)

    def save(self, path: str | Path = DEFAULT_PROFILE_PATH) -> None:
        """Write the profile to *path* as indented UTF-8 JSON, creating parent directories."""
        p = Path(path)
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(json.dumps(self.as_dict(), indent=2, ensure_ascii=False) + "\n", encoding="utf-8")

    def as_dict(self) -> dict[str, Any]:
        """Return the four sections keyed by name (the section dicts are not copied)."""
        return {key: getattr(self, key) for key in self._SECTIONS}

    def merge_updates(self, updates: dict[str, Any]) -> None:
        """
        Shallow-merge updates into the profile. Intended for user-approved updates.

        Only known sections whose update value is a dict are applied; everything
        else in *updates* is ignored.
        """
        for key in self._SECTIONS:
            val = updates.get(key)
            if isinstance(val, dict):
                getattr(self, key).update(val)

View file

@ -0,0 +1,76 @@
import json
from python.helpers.tool import Tool, Response
from python.survey_assistant.browser_render import render_url_async
from python.survey_assistant.extract import extract_form_fields
from python.survey_assistant.profile import SurveyProfile
from python.survey_assistant.llm import ollama_available, suggest_answers_with_ollama
def _format_survey_questions(fields) -> str:
    """Render extracted fields as a numbered question list (options capped at 30 each)."""
    lines = []
    for i, f in enumerate(fields, start=1):
        label = f.label or f.name or f.id or "(unlabeled)"
        t = f.input_type or f.kind
        req = " (required)" if f.required else ""
        # Keep the control type visually separate from the label text.
        lines.append(f"{i}. {label} [{t}]{req}")
        if f.options:
            lines.extend(f"   - {opt.get('label')}" for opt in f.options[:30])
            if len(f.options) > 30:
                lines.append("   - ...")
    return "\n".join(lines)


class SurveyHelper(Tool):
    """
    Extract survey/form questions and optionally suggest answers from a saved user profile.

    Safety: this tool does not fill or submit forms. It only extracts and suggests.
    """

    async def execute(
        self,
        url: str = "",
        html: str = "",
        include_suggestions: bool = False,
        ollama_model: str = "llama3",
        **kwargs,
    ) -> Response:
        """
        Extract form fields from a page and optionally ask Ollama for answers.

        Args:
            url: Page to render; when given, its rendered HTML replaces *html*.
            html: Raw markup to analyse when no *url* is supplied.
            include_suggestions: When true, add LLM suggestions based on the
                saved profile (or a ``suggestions_error`` explaining why not).
            ollama_model: Model name passed to the Ollama chat endpoint.
            **kwargs: Accepted and ignored.

        Returns:
            A Response whose message is a JSON document with ``url``, ``title``,
            ``field_count`` and ``fields``, plus ``suggestions`` or
            ``suggestions_error`` when suggestions were requested.
        """
        if not url and not html:
            return Response(
                message="Error: Provide either 'url' or 'html'.",
                break_loop=False,
            )

        page_title = ""
        final_url = url
        if url:
            rr = await render_url_async(url, screenshot_path=None)
            html = rr.html
            page_title = rr.title
            final_url = rr.final_url

        fields = extract_form_fields(html or "")
        payload = {
            "url": final_url,
            "title": page_title,
            "field_count": len(fields),
            "fields": [f.to_dict() for f in fields],
        }

        if include_suggestions:
            import asyncio
            import functools

            # The Ollama helpers use blocking urllib calls; run them in the
            # default executor so the event loop stays responsive.
            loop = asyncio.get_running_loop()
            if await loop.run_in_executor(None, ollama_available):
                try:
                    # Loaded inside the try so a broken profile file is reported
                    # as a suggestions error instead of discarding the fields.
                    profile = SurveyProfile.load()
                    payload["suggestions"] = await loop.run_in_executor(
                        None,
                        functools.partial(
                            suggest_answers_with_ollama,
                            model=ollama_model,
                            questions_text=_format_survey_questions(fields),
                            profile_json=json.dumps(profile.as_dict(), indent=2, ensure_ascii=False),
                        ),
                    )
                except Exception as exc:
                    payload["suggestions_error"] = str(exc)
            else:
                payload["suggestions_error"] = (
                    "Ollama not available at http://localhost:11434"
                )

        return Response(message=json.dumps(payload, indent=2, ensure_ascii=False), break_loop=False)