remove non-partial indexes (#5226)

This commit is contained in:
Shuchang Zheng 2026-03-24 17:33:19 -07:00 committed by GitHub
parent 07a8e654df
commit 17c5492c14
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 668 additions and 10 deletions

View file

@ -0,0 +1,35 @@
"""remove non-partial indexes
Revision ID: 01dbfbf87496
Revises: 6e966003a58e
Create Date: 2026-03-25 00:21:21.764132+00:00
"""
from typing import Sequence, Union
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "01dbfbf87496"
down_revision: Union[str, None] = "6e966003a58e"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f("ix_artifacts_observer_cruise_id"), table_name="artifacts")
op.drop_index(op.f("ix_artifacts_observer_thought_id"), table_name="artifacts")
op.drop_index(op.f("ix_artifacts_run_id"), table_name="artifacts")
op.drop_index(op.f("ix_artifacts_workflow_run_block_id"), table_name="artifacts")
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_index(op.f("ix_artifacts_workflow_run_block_id"), "artifacts", ["workflow_run_block_id"], unique=False)
op.create_index(op.f("ix_artifacts_run_id"), "artifacts", ["run_id"], unique=False)
op.create_index(op.f("ix_artifacts_observer_thought_id"), "artifacts", ["observer_thought_id"], unique=False)
op.create_index(op.f("ix_artifacts_observer_cruise_id"), "artifacts", ["observer_cruise_id"], unique=False)
# ### end Alembic commands ###

View file

@ -1,5 +1,6 @@
from __future__ import annotations
from collections import deque
from contextlib import asynccontextmanager
from contextvars import ContextVar
from dataclasses import dataclass, field
@ -23,9 +24,14 @@ class SessionState:
browser: SkyvernBrowser | None = None
context: BrowserContext | None = None
api_key_hash: str | None = None
console_messages: list[dict[str, Any]] = field(default_factory=list)
console_messages: deque[dict[str, Any]] = field(default_factory=lambda: deque(maxlen=1000))
network_requests: deque[dict[str, Any]] = field(default_factory=lambda: deque(maxlen=1000))
dialog_events: deque[dict[str, Any]] = field(default_factory=lambda: deque(maxlen=1000))
tracing_active: bool = False
har_enabled: bool = False
_hooked_page_id: int | None = None
_hooked_raw_page: Any = None
_hooked_handlers: dict[str, Any] = field(default_factory=dict)
_current_session: ContextVar[SessionState | None] = ContextVar("mcp_session", default=None)
@ -195,6 +201,16 @@ async def get_page(
"""Get the working page from the current or specified browser session."""
browser, ctx = await resolve_browser(session_id=session_id, cdp_url=cdp_url)
page = await browser.get_working_page()
# Register inspection hooks (console, network, dialog) on the underlying
# Playwright page. Idempotent — only registers when the page object changes
# (e.g., after tab switch). Import here to avoid circular imports.
from skyvern.cli.mcp_tools.inspection import ensure_hooks_registered
state = get_current_session()
if state is not None:
ensure_hooks_registered(state, page)
return page, ctx

View file

@ -40,6 +40,11 @@ from .folder import (
skyvern_folder_list,
skyvern_folder_update,
)
from .inspection import (
skyvern_console_messages,
skyvern_handle_dialog,
skyvern_network_requests,
)
from .prompts import build_workflow, debug_automation, extract_data, qa_test
from .session import (
skyvern_browser_session_close,
@ -135,16 +140,14 @@ targeted test cases, open a browser against the dev server, and report pass/fail
- **Cloud browsers with proxies** skyvern_browser_session_create launches cloud browsers with geographic proxy support.
## When to Use Playwright Instead of Skyvern
For capabilities that Skyvern does not wrap, fall back to Playwright MCP tools. These are the ONLY cases where Playwright tools are appropriate:
- browser_console_messages reading console logs
- browser_network_requests inspecting network traffic
- browser_handle_dialog JavaScript alert/confirm/prompt dialogs
For capabilities that Skyvern does not yet wrap, fall back to Playwright MCP tools. These are the ONLY cases where Playwright tools are appropriate:
- browser_file_upload file chooser uploads
- browser_tabs managing multiple tabs
- browser_run_code raw Playwright code snippets
- browser_drag drag-and-drop
For ALL other browser interactions, use Skyvern.
Skyvern now handles console logs (skyvern_console_messages), network inspection (skyvern_network_requests), \
and dialog handling (skyvern_handle_dialog) natively. For ALL other browser interactions, use Skyvern.
## Tool Modes (precision tools)
skyvern_click, skyvern_hover, skyvern_type, skyvern_select_option, skyvern_scroll, skyvern_press_key, skyvern_wait support three modes. When unsure, use intent. For multiple actions, prefer skyvern_act.
@ -286,6 +289,11 @@ mcp.tool()(skyvern_select_option)
mcp.tool()(skyvern_press_key)
mcp.tool()(skyvern_wait)
# -- Inspection tools (console, network, dialog) --
mcp.tool()(skyvern_console_messages)
mcp.tool()(skyvern_network_requests)
mcp.tool()(skyvern_handle_dialog)
# -- Block discovery + validation (no browser needed) --
mcp.tool()(skyvern_block_schema)
mcp.tool()(skyvern_block_validate)
@ -344,6 +352,10 @@ __all__ = [
"skyvern_select_option",
"skyvern_press_key",
"skyvern_wait",
# Inspection (console, network, dialog)
"skyvern_console_messages",
"skyvern_network_requests",
"skyvern_handle_dialog",
# Block discovery + validation
"skyvern_block_schema",
"skyvern_block_validate",

View file

@ -0,0 +1,377 @@
from __future__ import annotations
import asyncio
import re
import time
from typing import Annotated, Any
import structlog
from pydantic import Field
from ._common import ErrorCode, make_error, make_result
from ._session import BrowserNotAvailableError, get_current_session, get_page, no_browser_error
# Query param keys whose values are redacted from captured URLs.
_SECRET_QUERY_PARAMS = frozenset(
{
"token",
"api_key",
"apikey",
"api-key",
"access_token",
"secret",
"password",
"key",
"x-amz-signature",
"x-amz-credential",
"x-amz-security-token",
"sig",
"signature",
"authorization",
"auth",
}
)
_STATELESS_ERROR_MSG = (
"Inspection tools are not supported in stateless HTTP mode. "
"Event buffers are not persisted across requests in this transport. "
"Use stdio transport (Claude Code, gstack) for browser inspection, "
"or use skyvern_evaluate to run JavaScript that reads console/network state directly."
)
_STATELESS_HINT = (
"Connect via stdio transport: `skyvern mcp` (default). "
"Cloud-hosted inspection support is planned — see cloud_docs/mcp-inspection/TODOS.md"
)
LOG = structlog.get_logger(__name__)
# Network entries only capture content-type and content-length inline (not a full
# headers dict), so credential headers (Authorization, Cookie, Set-Cookie) are
# never included. If headers are added later, use an allowlist approach.
def _redact_url(url: str) -> str:
"""Strip secret values from URL query parameters.
Params like ?token=xxx, ?api_key=xxx, and AWS signed URL params are replaced
with ?token=REDACTED. Non-secret params are left intact.
"""
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit
parts = urlsplit(url)
if not parts.query:
return url
params = parse_qs(parts.query, keep_blank_values=True)
redacted = False
for key in params:
if key.lower() in _SECRET_QUERY_PARAMS:
params[key] = ["REDACTED"]
redacted = True
if not redacted:
return url
new_query = urlencode(params, doseq=True)
return urlunsplit((parts.scheme, parts.netloc, parts.path, new_query, parts.fragment))
def ensure_hooks_registered(state: Any, page: Any) -> None:
"""Register console/network/dialog event listeners on the Playwright page.
Idempotent: only registers when the underlying Playwright Page object changes
(tracked by id). On page switch, removes listeners from the old page to prevent
listener leaks and stale events mixing into the buffers.
"""
raw_page = page.page # SkyvernPage stores raw Playwright Page as self.page
page_id = id(raw_page)
if state._hooked_page_id == page_id:
return
# Remove listeners from the old page to prevent leaks
old_page = state._hooked_raw_page
old_handlers = state._hooked_handlers
if old_page is not None and old_handlers:
for event_name, handler in old_handlers.items():
try:
old_page.remove_listener(event_name, handler)
except Exception:
pass # Old page may already be closed
state._hooked_raw_page = None
state._hooked_handlers = {}
def _on_console(msg: Any) -> None:
try:
state.console_messages.append(
{
"level": msg.type,
"text": msg.text,
"timestamp": time.time(),
"page_url": raw_page.url,
"source_url": msg.location.get("url", "") if hasattr(msg, "location") and msg.location else "",
"line_number": msg.location.get("lineNumber", 0)
if hasattr(msg, "location") and msg.location
else 0,
}
)
except Exception:
pass # Never let a listener error crash the tool pipeline
def _on_response(response: Any) -> None:
try:
timing = 0.0
try:
timing_obj = response.request.timing
if isinstance(timing_obj, dict):
timing = timing_obj.get("responseEnd", 0)
except Exception:
pass
content_length = response.headers.get("content-length")
state.network_requests.append(
{
"url": _redact_url(response.url),
"method": response.request.method,
"status": response.status,
"content_type": response.headers.get("content-type", ""),
"timing_ms": round(timing, 1),
"response_size": int(content_length) if content_length is not None else None,
"page_url": raw_page.url,
}
)
except Exception:
pass
def _on_dialog(dialog: Any) -> None:
try:
event_record: dict[str, Any] = {
"type": dialog.type,
"message": dialog.message,
"default_value": dialog.default_value if hasattr(dialog, "default_value") else None,
"action_taken": "dismiss_pending",
"timestamp": time.time(),
"page_url": raw_page.url,
}
state.dialog_events.append(event_record)
# Auto-dismiss to match Playwright defaults and prevent page lockup.
# dialog.dismiss() is async — schedule it and track the outcome.
task = asyncio.create_task(dialog.dismiss())
task.add_done_callback(lambda t: _dismiss_done(t, event_record))
except Exception:
pass
def _dismiss_done(task: asyncio.Task[None], event_record: dict[str, Any]) -> None:
if task.cancelled():
event_record["action_taken"] = "dismiss_cancelled"
elif task.exception() is not None:
event_record["action_taken"] = "dismiss_failed"
LOG.warning("Dialog dismiss failed", error=str(task.exception()))
else:
event_record["action_taken"] = "dismissed"
raw_page.on("console", _on_console)
raw_page.on("response", _on_response)
raw_page.on("dialog", _on_dialog)
state._hooked_page_id = page_id
state._hooked_raw_page = raw_page
state._hooked_handlers = {"console": _on_console, "response": _on_response, "dialog": _on_dialog}
async def skyvern_console_messages(
session_id: Annotated[str | None, Field(description="Browser session ID (pbs_...)")] = None,
cdp_url: Annotated[str | None, Field(description="CDP WebSocket URL")] = None,
level: Annotated[
str | None,
Field(description="Filter by level: log, info, warning, error, debug. Omit for all."),
] = None,
text: Annotated[
str | None,
Field(description="Filter by substring match in message text. Case-insensitive."),
] = None,
clear: Annotated[
bool,
Field(description="Clear the buffer after reading. Default false."),
] = False,
) -> dict[str, Any]:
"""Read console log messages from the browser. Captures console.log, console.error, console.warn, etc.
Messages are buffered automatically call this anytime to see what the page has logged.
Use level='error' to find JavaScript errors. Use text='...' to search for specific messages.
"""
# Inline import: session_manager → inspection (ensure_hooks_registered) creates a
# circular import if these are at module level. See session_manager.py:get_page().
from skyvern.cli.core.session_manager import is_stateless_http_mode
if is_stateless_http_mode():
return make_result(
"skyvern_console_messages",
ok=False,
error=make_error(ErrorCode.ACTION_FAILED, _STATELESS_ERROR_MSG, _STATELESS_HINT),
)
try:
page, ctx = await get_page(session_id=session_id, cdp_url=cdp_url)
except BrowserNotAvailableError:
return make_result("skyvern_console_messages", ok=False, error=no_browser_error())
state = get_current_session()
has_filter = level is not None or text is not None
entries = list(state.console_messages)
if level:
entries = [e for e in entries if e.get("level") == level]
if text:
text_lower = text.lower()
entries = [e for e in entries if text_lower in e.get("text", "").lower()]
if clear:
if has_filter:
# Only remove matched entries — keep unmatched ones in the buffer
matched = {id(e) for e in entries}
state.console_messages = type(state.console_messages)(
(e for e in state.console_messages if id(e) not in matched),
maxlen=state.console_messages.maxlen,
)
else:
state.console_messages.clear()
return make_result(
"skyvern_console_messages",
browser_context=ctx,
data={
"messages": entries,
"count": len(entries),
"buffer_size": len(state.console_messages),
},
)
async def skyvern_network_requests(
session_id: Annotated[str | None, Field(description="Browser session ID (pbs_...)")] = None,
cdp_url: Annotated[str | None, Field(description="CDP WebSocket URL")] = None,
url_pattern: Annotated[
str | None,
Field(description="Filter by URL regex pattern. Example: 'api/v1' or '\\.json$'"),
] = None,
status_code: Annotated[
int | None,
Field(description="Filter by exact HTTP status code. Example: 404"),
] = None,
method: Annotated[
str | None,
Field(description="Filter by HTTP method: GET, POST, PUT, DELETE, etc."),
] = None,
clear: Annotated[
bool,
Field(description="Clear the buffer after reading. Default false."),
] = False,
) -> dict[str, Any]:
"""Read network requests/responses from the browser. Captures all HTTP traffic the page generates.
Each entry includes: url, method, status, content_type, timing_ms, response_size, and page_url.
No response headers dict or response bodies are captured credential headers (Authorization,
Cookie, Set-Cookie) are never exposed. Use skyvern_evaluate with fetch() if you need body content.
"""
# Inline import: session_manager → inspection (ensure_hooks_registered) creates a
# circular import if these are at module level. See session_manager.py:get_page().
from skyvern.cli.core.session_manager import is_stateless_http_mode
if is_stateless_http_mode():
return make_result(
"skyvern_network_requests",
ok=False,
error=make_error(ErrorCode.ACTION_FAILED, _STATELESS_ERROR_MSG, _STATELESS_HINT),
)
try:
page, ctx = await get_page(session_id=session_id, cdp_url=cdp_url)
except BrowserNotAvailableError:
return make_result("skyvern_network_requests", ok=False, error=no_browser_error())
state = get_current_session()
has_filter = url_pattern is not None or status_code is not None or method is not None
entries = list(state.network_requests)
if url_pattern:
try:
pattern = re.compile(url_pattern)
entries = [e for e in entries if pattern.search(e.get("url", ""))]
except re.error:
return make_result(
"skyvern_network_requests",
ok=False,
browser_context=ctx,
error=make_error(
ErrorCode.INVALID_INPUT,
f"Invalid regex pattern: {url_pattern}",
"Provide a valid Python regex pattern",
),
)
if status_code is not None:
entries = [e for e in entries if e.get("status") == status_code]
if method:
method_upper = method.upper()
entries = [e for e in entries if e.get("method") == method_upper]
if clear:
if has_filter:
matched = {id(e) for e in entries}
state.network_requests = type(state.network_requests)(
(e for e in state.network_requests if id(e) not in matched),
maxlen=state.network_requests.maxlen,
)
else:
state.network_requests.clear()
return make_result(
"skyvern_network_requests",
browser_context=ctx,
data={
"requests": entries,
"count": len(entries),
"buffer_size": len(state.network_requests),
},
)
async def skyvern_handle_dialog(
session_id: Annotated[str | None, Field(description="Browser session ID (pbs_...)")] = None,
cdp_url: Annotated[str | None, Field(description="CDP WebSocket URL")] = None,
clear: Annotated[
bool,
Field(description="Clear the dialog history after reading. Default false."),
] = False,
) -> dict[str, Any]:
"""Read the history of JavaScript dialogs (alert, confirm, prompt) that appeared on the page.
Dialogs are automatically dismissed by default to prevent page lockup.
This tool lets you see what dialogs appeared and what action was taken.
"""
# Inline import: session_manager → inspection (ensure_hooks_registered) creates a
# circular import if these are at module level. See session_manager.py:get_page().
from skyvern.cli.core.session_manager import is_stateless_http_mode
if is_stateless_http_mode():
return make_result(
"skyvern_handle_dialog",
ok=False,
error=make_error(ErrorCode.ACTION_FAILED, _STATELESS_ERROR_MSG, _STATELESS_HINT),
)
try:
page, ctx = await get_page(session_id=session_id, cdp_url=cdp_url)
except BrowserNotAvailableError:
return make_result("skyvern_handle_dialog", ok=False, error=no_browser_error())
state = get_current_session()
entries = list(state.dialog_events)
if clear:
state.dialog_events.clear()
return make_result(
"skyvern_handle_dialog",
browser_context=ctx,
data={
"dialogs": entries,
"count": len(entries),
},
)

View file

@ -232,16 +232,16 @@ class ArtifactModel(Base):
artifact_id = Column(String, primary_key=True, default=generate_artifact_id)
organization_id = Column(String, ForeignKey("organizations.organization_id"))
workflow_run_id = Column(String, index=True)
workflow_run_block_id = Column(String, index=True)
observer_cruise_id = Column(String, index=True)
observer_thought_id = Column(String, index=True)
workflow_run_block_id = Column(String)
observer_cruise_id = Column(String)
observer_thought_id = Column(String)
ai_suggestion_id = Column(String)
task_id = Column(String)
step_id = Column(String, index=True)
artifact_type = Column(String)
uri = Column(String)
bundle_key = Column(String, nullable=True)
run_id = Column(String, nullable=True, index=True)
run_id = Column(String, nullable=True)
created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
modified_at = Column(
DateTime,

View file

@ -0,0 +1,218 @@
from __future__ import annotations
from collections import deque
from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
from skyvern.cli.core.result import BrowserContext
from skyvern.cli.core.session_manager import SessionState
from skyvern.cli.mcp_tools.inspection import (
_redact_url,
ensure_hooks_registered,
skyvern_console_messages,
skyvern_handle_dialog,
skyvern_network_requests,
)
def _make_state() -> SessionState:
return SessionState(
console_messages=deque(maxlen=1000),
network_requests=deque(maxlen=1000),
dialog_events=deque(maxlen=1000),
)
def _make_page(raw: MagicMock | None = None) -> SimpleNamespace:
if raw is None:
raw = MagicMock()
raw.on = MagicMock()
return SimpleNamespace(page=raw)
def _patch(monkeypatch: pytest.MonkeyPatch, state: SessionState) -> None:
raw = MagicMock()
raw.on = MagicMock()
async def fake_get_page(**kwargs):
return _make_page(raw), BrowserContext(mode="local")
monkeypatch.setattr("skyvern.cli.mcp_tools.inspection.get_page", fake_get_page)
monkeypatch.setattr("skyvern.cli.mcp_tools.inspection.get_current_session", lambda: state)
def _console_entry(level: str = "log", text: str = "msg") -> dict:
return {"level": level, "text": text, "timestamp": 1.0, "source_url": "", "page_url": "", "line_number": 0}
def _network_entry(url: str = "https://a.com", method: str = "GET", status: int = 200) -> dict:
return {"url": url, "method": method, "status": status, "content_type": "", "timing_ms": 0, "response_size": 0}
# --- Hook registration ---
class TestEnsureHooks:
def test_registers_three_listeners(self) -> None:
state = _make_state()
raw = MagicMock()
raw.on = MagicMock()
ensure_hooks_registered(state, _make_page(raw))
assert raw.on.call_count == 3
assert {c.args[0] for c in raw.on.call_args_list} == {"console", "response", "dialog"}
def test_idempotent(self) -> None:
state = _make_state()
raw = MagicMock()
raw.on = MagicMock()
page = _make_page(raw)
ensure_hooks_registered(state, page)
ensure_hooks_registered(state, page)
assert raw.on.call_count == 3
def test_removes_old_listeners_on_switch(self) -> None:
state = _make_state()
raw1 = MagicMock()
raw1.on = MagicMock()
raw1.remove_listener = MagicMock()
raw2 = MagicMock()
raw2.on = MagicMock()
ensure_hooks_registered(state, _make_page(raw1))
ensure_hooks_registered(state, _make_page(raw2))
assert raw1.remove_listener.call_count == 3
assert raw2.on.call_count == 3
# --- Console messages ---
class TestConsoleMessages:
@pytest.mark.asyncio
async def test_returns_and_filters(self, monkeypatch: pytest.MonkeyPatch) -> None:
state = _make_state()
state.console_messages.append(_console_entry("log", "hello"))
state.console_messages.append(_console_entry("error", "fail"))
_patch(monkeypatch, state)
all_result = await skyvern_console_messages()
assert all_result["data"]["count"] == 2
by_level = await skyvern_console_messages(level="error")
assert by_level["data"]["count"] == 1
by_text = await skyvern_console_messages(text="hel")
assert by_text["data"]["count"] == 1
@pytest.mark.asyncio
async def test_clear_with_filter_preserves_unmatched(self, monkeypatch: pytest.MonkeyPatch) -> None:
state = _make_state()
state.console_messages.append(_console_entry("error", "fail"))
state.console_messages.append(_console_entry("log", "keep"))
_patch(monkeypatch, state)
result = await skyvern_console_messages(level="error", clear=True)
assert result["data"]["count"] == 1
assert len(state.console_messages) == 1
assert state.console_messages[0]["text"] == "keep"
@pytest.mark.asyncio
async def test_no_browser(self, monkeypatch: pytest.MonkeyPatch) -> None:
from skyvern.cli.core.session_manager import BrowserNotAvailableError
async def raise_err(**kw):
raise BrowserNotAvailableError()
monkeypatch.setattr("skyvern.cli.mcp_tools.inspection.get_page", raise_err)
result = await skyvern_console_messages()
assert result["ok"] is False
# --- Network requests ---
class TestNetworkRequests:
@pytest.mark.asyncio
async def test_returns_and_filters(self, monkeypatch: pytest.MonkeyPatch) -> None:
state = _make_state()
state.network_requests.append(_network_entry("https://api.com/v1", "GET", 200))
state.network_requests.append(_network_entry("https://cdn.com/img.png", "POST", 404))
_patch(monkeypatch, state)
by_url = await skyvern_network_requests(url_pattern="api")
assert by_url["data"]["count"] == 1
by_status = await skyvern_network_requests(status_code=404)
assert by_status["data"]["count"] == 1
by_method = await skyvern_network_requests(method="post")
assert by_method["data"]["count"] == 1
@pytest.mark.asyncio
async def test_invalid_regex(self, monkeypatch: pytest.MonkeyPatch) -> None:
_patch(monkeypatch, _make_state())
result = await skyvern_network_requests(url_pattern="[invalid")
assert result["ok"] is False
assert result["error"]["code"] == "INVALID_INPUT"
# --- Dialog ---
class TestDialog:
@pytest.mark.asyncio
async def test_returns_history(self, monkeypatch: pytest.MonkeyPatch) -> None:
state = _make_state()
state.dialog_events.append(
{"type": "alert", "message": "Hi", "default_value": None, "action_taken": "dismissed", "timestamp": 1.0}
)
_patch(monkeypatch, state)
result = await skyvern_handle_dialog()
assert result["data"]["count"] == 1
# --- URL redaction ---
@pytest.mark.parametrize(
"url,expected_missing",
[
("https://a.com/path", None), # no params — unchanged
("https://a.com?q=hello", None), # safe param — unchanged
("https://a.com?token=secret123", "secret123"),
("https://s3.aws.com/obj?X-Amz-Signature=abc", "abc"),
("https://a.com?api_key=my-key&page=1", "my-key"),
],
)
def test_redact_url(url: str, expected_missing: str | None) -> None:
result = _redact_url(url)
if expected_missing is None:
assert result == url
else:
assert expected_missing not in result
# --- Stateless HTTP error ---
class TestStatelessError:
@pytest.mark.asyncio
async def test_all_tools_error_in_stateless_mode(self, monkeypatch: pytest.MonkeyPatch) -> None:
_patch(monkeypatch, _make_state())
monkeypatch.setattr("skyvern.cli.core.session_manager._stateless_http_mode", True)
for tool in (skyvern_console_messages, skyvern_network_requests, skyvern_handle_dialog):
result = await tool()
assert result["ok"] is False, f"{tool.__name__} should error in stateless mode"
# --- Deque eviction ---
def test_deque_evicts_oldest() -> None:
state = _make_state()
for i in range(1001):
state.console_messages.append(_console_entry(text=f"msg-{i}"))
assert len(state.console_messages) == 1000
assert state.console_messages[0]["text"] == "msg-1"