diff --git a/skyvern/cli/commands/_state.py b/skyvern/cli/commands/_state.py index 96482c85d..a19e4448d 100644 --- a/skyvern/cli/commands/_state.py +++ b/skyvern/cli/commands/_state.py @@ -17,6 +17,9 @@ class CLIState: cdp_url: str | None = None mode: str | None = None # "cloud", "local", or "cdp" created_at: str | None = None + frame_selector: str | None = None + frame_name: str | None = None + frame_index: int | None = None def save_state(state: CLIState) -> None: diff --git a/skyvern/cli/commands/browser.py b/skyvern/cli/commands/browser.py index 5c452731a..8ad64f954 100644 --- a/skyvern/cli/commands/browser.py +++ b/skyvern/cli/commands/browser.py @@ -17,7 +17,15 @@ import typer from skyvern.cli.commands._output import console, output, output_error from skyvern.cli.commands._state import CLIState, clear_state, load_state, save_state from skyvern.cli.core.artifacts import save_artifact -from skyvern.cli.core.browser_ops import do_act, do_extract, do_navigate, do_screenshot +from skyvern.cli.core.browser_ops import ( + do_act, + do_extract, + do_frame_list, + do_frame_main, + do_frame_switch, + do_navigate, + do_screenshot, +) from skyvern.cli.core.client import get_skyvern from skyvern.cli.core.guards import ( CREDENTIAL_HINT, @@ -37,7 +45,9 @@ from skyvern.cli.mcp_tools.browser import skyvern_run_task as tool_run_task browser_app = typer.Typer(help="Browser automation commands.", no_args_is_help=True) session_app = typer.Typer(help="Manage browser sessions.", no_args_is_help=True) +frame_app = typer.Typer(help="Manage iframe context.", no_args_is_help=True) browser_app.add_typer(session_app, name="session") +browser_app.add_typer(frame_app, name="frame") @dataclass(frozen=True) @@ -96,6 +106,31 @@ def _resolve_ai_target(selector: str | None, intent: str | None, *, operation: s return ai_mode +async def _apply_cli_frame_state(page: Any) -> None: + """Re-apply saved frame state from CLIState to a fresh SkyvernBrowserPage. + + CLI commands get a new page object each invocation. If the user previously + ran ``skyvern browser frame switch``, the target frame is persisted in + CLIState and must be re-entered before executing the action. + """ + state = load_state() + if not state: + return + selector = state.frame_selector + name = state.frame_name + index = state.frame_index + if selector is None and name is None and index is None: + return + try: + await do_frame_switch(page, selector=selector, name=name, index=index) + except Exception as e: + console.print(f"[yellow]Warning: saved frame state is stale, clearing ({e})[/yellow]") + state.frame_selector = None + state.frame_name = None + state.frame_index = None + save_state(state) + + def _validate_wait_state(state: str) -> None: if state not in VALID_ELEMENT_STATES: raise GuardError(f"Invalid state: {state}", "Use visible, hidden, attached, or detached") @@ -294,6 +329,12 @@ def navigate( browser = await _connect_browser(connection) page = await browser.get_working_page() result = await do_navigate(page, url, timeout=timeout, wait_until=wait_until) + cli_state = load_state() + if cli_state: + cli_state.frame_selector = None + cli_state.frame_name = None + cli_state.frame_index = None + save_state(cli_state) return {"url": result.url, "title": result.title} try: @@ -322,6 +363,7 @@ def screenshot( connection = _resolve_connection(session, cdp) browser = await _connect_browser(connection) page = await browser.get_working_page() + await _apply_cli_frame_state(page) result = await do_screenshot(page, full_page=full_page, selector=selector) if output_path: @@ -365,6 +407,7 @@ def evaluate( connection = _resolve_connection(session, cdp) browser = await _connect_browser(connection) page = await browser.get_working_page() + await _apply_cli_frame_state(page) result = await page.evaluate(expression) return {"result": result} @@ -398,6 +441,7 @@ def click( connection = _resolve_connection(session, cdp) browser = await _connect_browser(connection) page = await browser.get_working_page() + await _apply_cli_frame_state(page) kwargs: dict[str, Any] = {"timeout": timeout} if button: @@ -443,6 +487,7 @@ def hover( connection = _resolve_connection(session, cdp) browser = await _connect_browser(connection) page = await browser.get_working_page() + await _apply_cli_frame_state(page) if ai_mode is not None: locator = page.locator(selector=selector, prompt=intent, ai=ai_mode) # type: ignore[arg-type] @@ -489,6 +534,7 @@ def type_text( connection = _resolve_connection(session, cdp) browser = await _connect_browser(connection) page = await browser.get_working_page() + await _apply_cli_frame_state(page) if selector: try: @@ -554,6 +600,7 @@ def scroll( connection = _resolve_connection(session, cdp) browser = await _connect_browser(connection) page = await browser.get_working_page() + await _apply_cli_frame_state(page) if intent: ai_mode = "fallback" if selector else "proactive" @@ -601,6 +648,7 @@ def select( connection = _resolve_connection(session, cdp) browser = await _connect_browser(connection) page = await browser.get_working_page() + await _apply_cli_frame_state(page) if ai_mode is not None: await page.select_option(selector=selector, value=value, prompt=intent, ai=ai_mode, timeout=timeout) # type: ignore[arg-type] @@ -639,6 +687,7 @@ def press_key( connection = _resolve_connection(session, cdp) browser = await _connect_browser(connection) page = await browser.get_working_page() + await _apply_cli_frame_state(page) if intent or selector: ai_mode, err = resolve_ai_mode(selector, intent) @@ -694,6 +743,7 @@ def wait( connection = _resolve_connection(session, cdp) browser = await _connect_browser(connection) page = await browser.get_working_page() + await _apply_cli_frame_state(page) waited_for = "" if time_ms is not None: @@ -751,6 +801,7 @@ def act( connection = _resolve_connection(session, cdp) browser = await _connect_browser(connection) page = await browser.get_working_page() + await _apply_cli_frame_state(page) result = await do_act(page, prompt) return {"prompt": result.prompt, "completed": result.completed} @@ -779,6 +830,7 @@ def extract( connection = _resolve_connection(session, cdp) browser = await _connect_browser(connection) page = await browser.get_working_page() + await _apply_cli_frame_state(page) result = await do_extract(page, prompt, schema=schema) return {"prompt": prompt, "extracted": result.extracted} @@ -806,6 +858,7 @@ def validate( connection = _resolve_connection(session, cdp) browser = await _connect_browser(connection) page = await browser.get_working_page() + await _apply_cli_frame_state(page) valid = await page.validate(prompt) return {"prompt": prompt, "valid": valid} @@ -1378,3 +1431,91 @@ def _print_serve_instructions_unified(result: dict[str, Any], browser_path: str) console.print("[bold]Press Ctrl+C to stop.[/bold]") console.print() + + +# --------------------------------------------------------------------------- +# Frame commands (iframe switching) +# --------------------------------------------------------------------------- + + +@frame_app.command("switch") +def frame_switch( + selector: str | None = typer.Option(None, "--selector", "-s", help="CSS selector for the iframe element."), + name: str | None = typer.Option(None, "--name", "-n", help="Frame name attribute."), + index: int | None = typer.Option(None, "--index", "-i", help="Frame index (0 = main)."), + session: str | None = typer.Option(None, help="Browser session ID."), + cdp: str | None = typer.Option(None, "--cdp", help="CDP WebSocket URL."), + json_output: bool = typer.Option(False, "--json", help="Output as JSON."), +) -> None: + """Switch into an iframe for subsequent commands.""" + + async def _run() -> dict: + connection = _resolve_connection(session, cdp) + browser = await _connect_browser(connection) + page = await browser.get_working_page() + result = await do_frame_switch(page, selector=selector, name=name, index=index) + state = load_state() + if state: + state.frame_selector = selector + state.frame_name = name + state.frame_index = index + save_state(state) + return {"frame_name": result.name, "frame_url": result.url} + + try: + data = asyncio.run(_run()) + output(data, action="frame_switch", json_mode=json_output) + except (ValueError, GuardError) as e: + output_error(str(e), hint="Use 'skyvern browser frame list' to find frames.", json_mode=json_output) + except Exception as e: + output_error(str(e), json_mode=json_output) + + +@frame_app.command("main") +def frame_main_cmd( + session: str | None = typer.Option(None, help="Browser session ID."), + cdp: str | None = typer.Option(None, "--cdp", help="CDP WebSocket URL."), + json_output: bool = typer.Option(False, "--json", help="Output as JSON."), +) -> None: + """Switch back to the main page frame.""" + + async def _run() -> dict: + connection = _resolve_connection(session, cdp) + browser = await _connect_browser(connection) + page = await browser.get_working_page() + do_frame_main(page) + state = load_state() + if state: + state.frame_selector = None + state.frame_name = None + state.frame_index = None + save_state(state) + return {"status": "switched_to_main_frame"} + + try: + data = asyncio.run(_run()) + output(data, action="frame_main", json_mode=json_output) + except Exception as e: + output_error(str(e), json_mode=json_output) + + +@frame_app.command("list") +def frame_list_cmd( + session: str | None = typer.Option(None, help="Browser session ID."), + cdp: str | None = typer.Option(None, "--cdp", help="CDP WebSocket URL."), + json_output: bool = typer.Option(False, "--json", help="Output as JSON."), +) -> None: + """List all frames on the current page.""" + + async def _run() -> list: + connection = _resolve_connection(session, cdp) + browser = await _connect_browser(connection) + page = await browser.get_working_page() + frames = await do_frame_list(page) + return [{"index": f.index, "name": f.name, "url": f.url, "is_main": f.is_main} for f in frames] + + try: + data = asyncio.run(_run()) + output(data, action="frame_list", json_mode=json_output) + except Exception as e: + output_error(str(e), json_mode=json_output) diff --git a/skyvern/cli/core/browser_ops.py b/skyvern/cli/core/browser_ops.py index e034bad5f..e922593f7 100644 --- a/skyvern/cli/core/browser_ops.py +++ b/skyvern/cli/core/browser_ops.py @@ -85,3 +85,49 @@ async def do_extract( parsed_schema = parse_extract_schema(schema) extracted = await page.extract(prompt=prompt, schema=parsed_schema) return ExtractResult(extracted=extracted) + + +# -- Frame operations -- + + +@dataclass +class FrameInfo: + index: int + name: str + url: str + is_main: bool + + +@dataclass +class FrameSwitchResult: + name: str | None + url: str | None + selector: str | None = None + requested_name: str | None = None + index: int | None = None + + +async def do_frame_switch( + page: Any, + *, + selector: str | None = None, + name: str | None = None, + index: int | None = None, +) -> FrameSwitchResult: + result = await page.frame_switch(selector=selector, name=name, index=index) + return FrameSwitchResult( + name=result.get("name"), + url=result.get("url"), + selector=selector, + requested_name=name, + index=index, + ) + + +def do_frame_main(page: Any) -> None: + page.frame_main() + + +async def do_frame_list(page: Any) -> list[FrameInfo]: + frames = await page.frame_list() + return [FrameInfo(index=f["index"], name=f["name"], url=f["url"], is_main=f["is_main"]) for f in frames] diff --git a/skyvern/cli/core/session_manager.py b/skyvern/cli/core/session_manager.py index 4b50d0e3b..ca234edbf 100644 --- a/skyvern/cli/core/session_manager.py +++ b/skyvern/cli/core/session_manager.py @@ -17,7 +17,7 @@ from .result import BrowserContext, ErrorCode, make_error LOG = structlog.get_logger(__name__) if TYPE_CHECKING: - from playwright.async_api import Page + from playwright.async_api import Frame, Page from skyvern.library.skyvern_browser import SkyvernBrowser from skyvern.library.skyvern_browser_page import SkyvernBrowserPage @@ -42,6 +42,8 @@ class SessionState: # -- Multi-page inspection hooks -- _hooked_page_ids: set[int] = field(default_factory=set) _hooked_handlers_map: dict[int, dict[str, Any]] = field(default_factory=dict) + # -- Iframe frame context -- + _working_frame: Frame | None = None _current_session: ContextVar[SessionState | None] = ContextVar("mcp_session", default=None) @@ -242,6 +244,20 @@ async def get_page( # Install page event listener for tab_wait_for_new (once per session) _install_page_event_listener(state, browser) + # Propagate iframe frame context from session state to the page + if state._working_frame is not None: + # Guard against stale (detached) frame references + detached = False + try: + detached = state._working_frame.is_detached() + except AttributeError: + pass # frame object doesn't support is_detached (e.g., test mocks) + if detached: + LOG.debug("Clearing detached _working_frame from session state") + state._working_frame = None + else: + page._working_frame = state._working_frame + return page, ctx diff --git a/skyvern/cli/mcp_tools/__init__.py b/skyvern/cli/mcp_tools/__init__.py index 5cff5fe9a..a4210e1f4 100644 --- a/skyvern/cli/mcp_tools/__init__.py +++ b/skyvern/cli/mcp_tools/__init__.py @@ -19,6 +19,9 @@ from .browser import ( skyvern_evaluate, skyvern_extract, skyvern_file_upload, + skyvern_frame_list, + skyvern_frame_main, + skyvern_frame_switch, skyvern_hover, skyvern_login, skyvern_navigate, @@ -383,6 +386,11 @@ mcp.tool(tags={"tab_management"}, annotations=_MUT)(skyvern_tab_switch) mcp.tool(tags={"tab_management"}, annotations=_DEST)(skyvern_tab_close) mcp.tool(tags={"tab_management"}, annotations=_RO)(skyvern_tab_wait_for_new) +# -- Frame management (iframe switching) -- +mcp.tool(tags={"browser_primitive"}, annotations=_MUT)(skyvern_frame_switch) +mcp.tool(tags={"browser_primitive"}, annotations=_MUT)(skyvern_frame_main) +mcp.tool(tags={"browser_primitive"}, annotations=_RO)(skyvern_frame_list) + # -- Inspection tools (console, network, dialog) -- mcp.tool(tags={"inspection"}, annotations=_RO)(skyvern_console_messages) mcp.tool(tags={"inspection"}, annotations=_RO)(skyvern_network_requests) @@ -461,6 +469,10 @@ __all__ = [ "skyvern_tab_switch", "skyvern_tab_close", "skyvern_tab_wait_for_new", + # Frame management (iframe switching) + "skyvern_frame_switch", + "skyvern_frame_main", + "skyvern_frame_list", # Inspection (console, network, dialog) "skyvern_console_messages", "skyvern_network_requests", diff --git a/skyvern/cli/mcp_tools/browser.py b/skyvern/cli/mcp_tools/browser.py index e5948686f..e26e9511f 100644 --- a/skyvern/cli/mcp_tools/browser.py +++ b/skyvern/cli/mcp_tools/browser.py @@ -11,7 +11,16 @@ import structlog from playwright.async_api import TimeoutError as PlaywrightTimeoutError from pydantic import Field -from skyvern.cli.core.browser_ops import do_act, do_extract, do_navigate, do_screenshot, parse_extract_schema +from skyvern.cli.core.browser_ops import ( + do_act, + do_extract, + do_frame_list, + do_frame_main, + do_frame_switch, + do_navigate, + do_screenshot, + parse_extract_schema, +) from skyvern.cli.core.guards import ( CREDENTIAL_HINT, JS_PASSWORD_PATTERN, @@ -33,7 +42,7 @@ from ._common import ( save_artifact, ) from ._localhost import is_localhost_url -from ._session import BrowserNotAvailableError, get_page, no_browser_error +from ._session import BrowserNotAvailableError, get_current_session, get_page, no_browser_error LOG = structlog.get_logger(__name__) @@ -92,6 +101,11 @@ async def skyvern_navigate( ), ) + # Any navigation attempt may destroy iframes — clear frame state upfront + # (even failed navigations can partially load and destroy existing frames) + state = get_current_session() + state._working_frame = None + with Timer() as timer: try: result = await do_navigate(page, url, timeout=timeout, wait_until=wait_until) @@ -1636,3 +1650,148 @@ async def skyvern_login( }, timing_ms=timer.timing_ms, ) + + +async def skyvern_frame_switch( + session_id: Annotated[str | None, Field(description="Browser session ID (pbs_...)")] = None, + cdp_url: Annotated[str | None, Field(description="CDP WebSocket URL")] = None, + selector: Annotated[ + str | None, + Field(description="CSS selector for the iframe element (e.g., '#payment-frame', 'iframe[name=checkout]')"), + ] = None, + name: Annotated[str | None, Field(description="Frame name attribute")] = None, + index: Annotated[ + int | None, Field(description="Frame index (0 = main). Use skyvern_frame_list to find indices") + ] = None, +) -> dict[str, Any]: + """Switch into an iframe so subsequent browser actions (click, type, extract, etc.) target elements inside it. + + Use this for embedded payment forms, embedded widgets, or any content inside an + + + + +""" + + +class _NoopAi(SkyvernPageAi): + """Stub AI that raises if called — e2e tests use direct selectors only.""" + + def __init__(self) -> None: + pass + + async def ai_click(self, **kwargs: Any) -> Any: + raise NotImplementedError("AI should not be called in e2e tests") + + async def ai_input_text(self, **kwargs: Any) -> Any: + raise NotImplementedError("AI should not be called in e2e tests") + + async def ai_act(self, prompt: str) -> Any: + raise NotImplementedError("AI should not be called in e2e tests") + + +class _TestPage(SkyvernBrowserPage): + """SkyvernBrowserPage that doesn't need a full SkyvernBrowser.""" + + def __init__(self, page: Any) -> None: + # Use SkyvernPage.__init__ directly to avoid SkyvernBrowser dependency + from skyvern.core.script_generations.skyvern_page import SkyvernPage + + SkyvernPage.__init__(self, page, _NoopAi()) + self._browser = MagicMock() + self.agent = MagicMock() + + +@pytest_asyncio.fixture +async def browser_page(): + """Launch a real headless Chromium and set up the test page.""" + async with async_playwright() as p: + browser = await p.chromium.launch(headless=True) + context = await browser.new_context() + pw_page = await context.new_page() + await pw_page.set_content(MAIN_HTML) + # Wait for iframes to load + await pw_page.wait_for_selector("#payment-frame") + await asyncio.sleep(0.3) + + page = _TestPage(pw_page) + yield page + + await context.close() + await browser.close() + + +# --------------------------------------------------------------------------- +# E2E: _locator_scope with real iframes +# --------------------------------------------------------------------------- + + +class TestLocatorScopeE2E: + @pytest.mark.asyncio + async def test_locator_scope_defaults_to_page(self, browser_page: _TestPage) -> None: + heading = await browser_page._locator_scope.locator("#main-heading").text_content() + assert heading == "Main Page" + + @pytest.mark.asyncio + async def test_locator_scope_scopes_to_frame(self, browser_page: _TestPage) -> None: + # Switch to iframe + await browser_page.frame_switch(selector="#payment-frame") + heading = await browser_page._locator_scope.locator("#iframe-heading").text_content() + assert heading == "Payment Form" + + @pytest.mark.asyncio + async def test_main_page_element_not_found_in_iframe(self, browser_page: _TestPage) -> None: + await browser_page.frame_switch(selector="#payment-frame") + count = await browser_page._locator_scope.locator("#main-heading").count() + assert count == 0 + + @pytest.mark.asyncio + async def test_iframe_element_not_found_on_main_page(self, browser_page: _TestPage) -> None: + count = await browser_page._locator_scope.locator("#card-number").count() + assert count == 0 + + +# --------------------------------------------------------------------------- +# E2E: frame_switch / frame_main / frame_list +# --------------------------------------------------------------------------- + + +class TestFrameSwitchE2E: + @pytest.mark.asyncio + async def test_switch_by_selector(self, browser_page: _TestPage) -> None: + result = await browser_page.frame_switch(selector="#payment-frame") + assert result["name"] == "payment" + assert browser_page._working_frame is not None + + @pytest.mark.asyncio + async def test_switch_by_name(self, browser_page: _TestPage) -> None: + result = await browser_page.frame_switch(name="payment") + assert result["name"] == "payment" + assert browser_page._working_frame is not None + + @pytest.mark.asyncio + async def test_switch_by_index(self, browser_page: _TestPage) -> None: + # Index 0 = main frame, 1 = payment iframe, 2 = empty iframe + result = await browser_page.frame_switch(index=1) + assert result["name"] == "payment" + + @pytest.mark.asyncio + async def test_switch_back_to_main(self, browser_page: _TestPage) -> None: + await browser_page.frame_switch(selector="#payment-frame") + assert browser_page._working_frame is not None + + browser_page.frame_main() + assert browser_page._working_frame is None + + heading = await browser_page._locator_scope.locator("#main-heading").text_content() + assert heading == "Main Page" + + @pytest.mark.asyncio + async def test_switch_to_different_iframe(self, browser_page: _TestPage) -> None: + await browser_page.frame_switch(name="payment") + heading = await browser_page._locator_scope.locator("#iframe-heading").text_content() + assert heading == "Payment Form" + + await browser_page.frame_switch(name="empty") + text = await browser_page._locator_scope.locator("p").text_content() + assert text == "Empty" + + +class TestFrameListE2E: + @pytest.mark.asyncio + async def test_lists_main_and_iframes(self, browser_page: _TestPage) -> None: + frames = await browser_page.frame_list() + assert len(frames) >= 3 # main + payment + empty + assert frames[0]["is_main"] is True + names = [f["name"] for f in frames] + assert "payment" in names + assert "empty" in names + + +# --------------------------------------------------------------------------- +# E2E: interactions INSIDE an iframe +# --------------------------------------------------------------------------- + + +class TestIframeInteractionE2E: + @pytest.mark.asyncio + async def test_fill_inside_iframe(self, browser_page: _TestPage) -> None: + await browser_page.frame_switch(selector="#payment-frame") + + locator = browser_page._locator_scope.locator("#card-number") + await locator.fill("4242424242424242") + + value = await locator.input_value() + assert value == "4242424242424242" + + @pytest.mark.asyncio + async def test_click_inside_iframe(self, browser_page: _TestPage) -> None: + await browser_page.frame_switch(selector="#payment-frame") + + await browser_page._locator_scope.locator("#pay-btn").click() + value = await browser_page._locator_scope.locator("#card-number").input_value() + assert value == "paid" + + @pytest.mark.asyncio + async def test_fill_main_page_unaffected_by_iframe(self, browser_page: _TestPage) -> None: + # Fill in iframe + await browser_page.frame_switch(selector="#payment-frame") + await browser_page._locator_scope.locator("#card-number").fill("1234") + + # Switch back and verify main page + browser_page.frame_main() + main_value = await browser_page._locator_scope.locator("#main-input").input_value() + assert main_value == "" # untouched + + @pytest.mark.asyncio + async def test_click_main_page_after_iframe(self, browser_page: _TestPage) -> None: + await browser_page.frame_switch(selector="#payment-frame") + await browser_page._locator_scope.locator("#card-number").fill("4242") + browser_page.frame_main() + + await browser_page._locator_scope.locator("#main-btn").click() + value = await browser_page._locator_scope.locator("#main-input").input_value() + assert value == "clicked" + + +# --------------------------------------------------------------------------- +# E2E: SkyvernPage interaction methods inside iframe +# --------------------------------------------------------------------------- + + +class TestSkyvernPageMethodsInIframe: + @pytest.mark.asyncio + async def test_click_method_scopes_to_iframe(self, browser_page: _TestPage) -> None: + """Test that SkyvernPage.click() with ai=None uses _locator_scope.""" + await browser_page.frame_switch(selector="#payment-frame") + # Use click with ai=None and mode=direct to go through the direct path + await browser_page.click("#pay-btn", ai=None, mode="direct") + value = await browser_page._locator_scope.locator("#card-number").input_value() + assert value == "paid" + + @pytest.mark.asyncio + async def test_fill_method_scopes_to_iframe(self, browser_page: _TestPage) -> None: + """Test that SkyvernPage.fill() with mode=direct uses _locator_scope.""" + await browser_page.frame_switch(selector="#payment-frame") + await browser_page.fill("#card-name", "John Doe", ai=None, mode="direct") + value = await browser_page._locator_scope.locator("#card-name").input_value() + assert value == "John Doe" + + @pytest.mark.asyncio + async def test_hover_method_scopes_to_iframe(self, browser_page: _TestPage) -> None: + """Test that SkyvernPage.hover() uses _locator_scope.""" + await browser_page.frame_switch(selector="#payment-frame") + # hover shouldn't throw — just verifying it resolves within frame + await browser_page.hover("#pay-btn") + + +# --------------------------------------------------------------------------- +# E2E: CLI frame state persistence round-trip +# +# These tests exercise the actual _apply_cli_frame_state code path: +# save frame identifiers to CLIState (file-backed), create a FRESH page +# (simulating a new CLI invocation), call _apply_cli_frame_state, then +# verify actions target the iframe — not the main page. +# --------------------------------------------------------------------------- + + +class TestCLIFrameStatePersistence: + @pytest.mark.asyncio + async def test_cli_frame_state_reapplied_by_selector(self, browser_page: _TestPage, tmp_path: Any) -> None: + """frame_switch by selector → save to CLIState → fresh page → _apply_cli_frame_state → in iframe.""" + import skyvern.cli.commands._state as state_mod + from skyvern.cli.commands._state import CLIState, save_state + from skyvern.cli.commands.browser import _apply_cli_frame_state + + # Patch STATE_FILE to a temp location so we don't touch the user's real state + orig_file = state_mod.STATE_FILE + state_mod.STATE_FILE = tmp_path / "state.json" + try: + # Simulate: user ran `skyvern browser frame switch --selector "#payment-frame"` + save_state(CLIState(session_id="test", mode="cloud", frame_selector="#payment-frame")) + + # Simulate: new CLI invocation gets a FRESH page (no _working_frame) + assert browser_page._working_frame is None + + # This is the code under test + await _apply_cli_frame_state(browser_page) + + # Verify: page is now scoped to the iframe + assert browser_page._working_frame is not None + heading = await browser_page._locator_scope.locator("#iframe-heading").text_content() + assert heading == "Payment Form" + + # Verify: main page elements are NOT visible from the iframe scope + count = await browser_page._locator_scope.locator("#main-heading").count() + assert count == 0 + finally: + state_mod.STATE_FILE = orig_file + + @pytest.mark.asyncio + async def test_cli_frame_state_reapplied_by_name(self, browser_page: _TestPage, tmp_path: Any) -> None: + """frame_switch by name → save to CLIState → fresh page → _apply_cli_frame_state → in iframe.""" + import skyvern.cli.commands._state as state_mod + from skyvern.cli.commands._state import CLIState, save_state + from skyvern.cli.commands.browser import _apply_cli_frame_state + + orig_file = state_mod.STATE_FILE + state_mod.STATE_FILE = tmp_path / "state.json" + try: + save_state(CLIState(session_id="test", mode="cloud", frame_name="payment")) + + assert browser_page._working_frame is None + await _apply_cli_frame_state(browser_page) + + assert browser_page._working_frame is not None + heading = await browser_page._locator_scope.locator("#iframe-heading").text_content() + assert heading == "Payment Form" + finally: + state_mod.STATE_FILE = orig_file + + @pytest.mark.asyncio + async def test_cli_frame_state_reapplied_by_index(self, browser_page: _TestPage, tmp_path: Any) -> None: + """frame_switch by index → save to CLIState → fresh page → _apply_cli_frame_state → in iframe.""" + import skyvern.cli.commands._state as state_mod + from skyvern.cli.commands._state import CLIState, save_state + from skyvern.cli.commands.browser import _apply_cli_frame_state + + orig_file = state_mod.STATE_FILE + state_mod.STATE_FILE = tmp_path / "state.json" + try: + # Index 1 = payment iframe (0 = main frame) + save_state(CLIState(session_id="test", mode="cloud", frame_index=1)) + + assert browser_page._working_frame is None + await _apply_cli_frame_state(browser_page) + + assert browser_page._working_frame is not None + heading = await browser_page._locator_scope.locator("#iframe-heading").text_content() + assert heading == "Payment Form" + finally: + state_mod.STATE_FILE = orig_file + + @pytest.mark.asyncio + async def test_cli_frame_state_noop_when_no_frame(self, browser_page: _TestPage, tmp_path: Any) -> None: + """No frame state in CLIState → _apply_cli_frame_state is a no-op.""" + import skyvern.cli.commands._state as state_mod + from skyvern.cli.commands._state import CLIState, save_state + from skyvern.cli.commands.browser import _apply_cli_frame_state + + orig_file = state_mod.STATE_FILE + state_mod.STATE_FILE = tmp_path / "state.json" + try: + save_state(CLIState(session_id="test", mode="cloud")) + + await _apply_cli_frame_state(browser_page) + + assert browser_page._working_frame is None + heading = await browser_page._locator_scope.locator("#main-heading").text_content() + assert heading == "Main Page" + finally: + state_mod.STATE_FILE = orig_file + + @pytest.mark.asyncio + async def test_cli_frame_state_clears_on_stale_selector(self, browser_page: _TestPage, tmp_path: Any) -> None: + """Stale selector in CLIState → _apply_cli_frame_state clears state and stays on main page.""" + import skyvern.cli.commands._state as state_mod + from skyvern.cli.commands._state import CLIState, load_state, save_state + from skyvern.cli.commands.browser import _apply_cli_frame_state + + orig_file = state_mod.STATE_FILE + state_mod.STATE_FILE = tmp_path / "state.json" + try: + save_state(CLIState(session_id="test", mode="cloud", frame_selector="#nonexistent-frame")) + + await _apply_cli_frame_state(browser_page) + + # Frame state should be cleared since the selector doesn't exist + assert browser_page._working_frame is None + reloaded = load_state() + assert reloaded is not None + assert reloaded.frame_selector is None + assert reloaded.frame_name is None + assert reloaded.frame_index is None + finally: + state_mod.STATE_FILE = orig_file + + @pytest.mark.asyncio + async def test_cli_fill_inside_iframe_after_state_restore(self, browser_page: _TestPage, tmp_path: Any) -> None: + """Full round-trip: save frame state → fresh page → restore → fill inside iframe.""" + import skyvern.cli.commands._state as state_mod + from skyvern.cli.commands._state import CLIState, save_state + from skyvern.cli.commands.browser import _apply_cli_frame_state + + orig_file = state_mod.STATE_FILE + state_mod.STATE_FILE = tmp_path / "state.json" + try: + save_state(CLIState(session_id="test", mode="cloud", frame_selector="#payment-frame")) + + await _apply_cli_frame_state(browser_page) + + # Now do what a CLI `type` command would do after _apply_cli_frame_state + await browser_page._locator_scope.locator("#card-number").fill("4242424242424242") + value = await browser_page._locator_scope.locator("#card-number").input_value() + assert value == "4242424242424242" + + # Main page input should be untouched + main_value = await browser_page.page.locator("#main-input").input_value() + assert main_value == "" + finally: + state_mod.STATE_FILE = orig_file diff --git a/tests/unit/test_iframe_mcp_e2e.py b/tests/unit/test_iframe_mcp_e2e.py new file mode 100644 index 000000000..2f319808e --- /dev/null +++ b/tests/unit/test_iframe_mcp_e2e.py @@ -0,0 +1,230 @@ +"""E2E test for iframe MCP tools with a real browser. + +Exercises the MCP tool chain (frame_list, frame_switch, frame_main) through +real Playwright + SessionState wiring, without requiring Skyvern's local +browser launcher infrastructure. + +Skipped in CI when Playwright browsers are not installed. +""" + +from __future__ import annotations + +import asyncio +from pathlib import Path +from typing import Any + +import pytest +import pytest_asyncio +from playwright.async_api import async_playwright + +from skyvern.cli.core.result import BrowserContext +from skyvern.cli.core.session_manager import SessionState, get_current_session, set_current_session +from skyvern.cli.mcp_tools.browser import skyvern_frame_list, skyvern_frame_main, skyvern_frame_switch + + +def _has_playwright_browser() -> bool: + """Check that Playwright's chromium binary exists for the current installed version.""" + try: + from playwright.sync_api import sync_playwright # noqa: PLC0415 + + with sync_playwright() as p: + return Path(p.chromium.executable_path).exists() + except Exception: + return False + + +_skip_no_browser = pytest.mark.skipif( + not _has_playwright_browser(), + reason="Requires Playwright browsers installed (run: playwright install chromium)", +) + +pytestmark = _skip_no_browser + +MAIN_HTML = """\ + + + +

Main Page

+ + + + +""" + + +class _FakeBrowserContext: + """Minimal browser context to satisfy get_page() hooks from tab management.""" + + def __init__(self, page: Any) -> None: + self.pages = [page] + + def on(self, event: str, handler: Any) -> None: + pass # No-op for tests + + +class _FakeBrowser: + """Minimal SkyvernBrowser substitute that wraps a real Playwright page.""" + + def __init__(self, page: Any) -> None: + self._page = page + self._browser_context = _FakeBrowserContext(page) + + async def get_working_page(self) -> Any: + # Return a lightweight wrapper that has the frame methods + return _WrappedPage(self._page) + + +class _WrappedPage: + """Thin wrapper around Playwright Page to add frame_switch/main/list.""" + + def __init__(self, page: Any) -> None: + self.page = page + self._working_frame = None + + @property + def _locator_scope(self) -> Any: + if self._working_frame is not None: + return self._working_frame + return self.page + + async def frame_switch( + self, *, selector: str | None = None, name: str | None = None, index: int | None = None + ) -> dict[str, Any]: + params = sum(p is not None for p in (selector, name, index)) + if params != 1: + raise ValueError("Exactly one of selector, name, or index is required.") + + frame = None + if selector is not None: + element = await self.page.query_selector(selector) + if element is None: + raise ValueError(f"Selector '{selector}' did not match any element.") + frame = await element.content_frame() + if frame is None: + raise ValueError(f"Selector '{selector}' did not resolve to an iframe.") + elif name is not None: + frame = self.page.frame(name=name) + if frame is None: + raise ValueError(f"No frame found with name '{name}'.") + elif index is not None: + frames = self.page.frames + if index < 0 or index >= len(frames): + raise ValueError(f"Frame index {index} out of range.") + frame = frames[index] + + self._working_frame = frame + return {"name": frame.name if frame else None, "url": frame.url if frame else None} + + def frame_main(self) -> dict[str, str]: + self._working_frame = None + return {"status": "switched_to_main_frame"} + + async def frame_list(self) -> list[dict[str, Any]]: + return [ + {"index": i, "name": f.name, "url": f.url, "is_main": f == self.page.main_frame} + for i, f in enumerate(self.page.frames) + ] + + def __getattr__(self, name: str) -> Any: + return getattr(self.page, name) + + +@pytest_asyncio.fixture +async def mcp_session(): + """Set up a real Playwright browser and wire it into SessionState.""" + async with async_playwright() as p: + try: + browser = await p.chromium.launch(headless=True) + except Exception: + pytest.skip("Playwright chromium binary not available") + context = await browser.new_context() + pw_page = await context.new_page() + await pw_page.set_content(MAIN_HTML) + await asyncio.sleep(0.3) + + fake_browser = _FakeBrowser(pw_page) + ctx = BrowserContext(mode="local") + state = SessionState(browser=fake_browser, context=ctx) # type: ignore[arg-type] + set_current_session(state) + + yield state + + set_current_session(SessionState()) + await context.close() + await browser.close() + + +# --------------------------------------------------------------------------- +# MCP tool e2e tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_mcp_frame_list_real_browser(mcp_session: SessionState) -> None: + result = await skyvern_frame_list() + assert result["ok"] is True + frames = result["data"]["frames"] + assert len(frames) >= 2 + names = [f["name"] for f in frames] + assert "payment" in names + assert result["data"]["count"] >= 2 + + +@pytest.mark.asyncio +async def test_mcp_frame_switch_by_selector(mcp_session: SessionState) -> None: + result = await skyvern_frame_switch(selector="#pay-frame") + assert result["ok"] is True + assert result["data"]["frame_name"] == "payment" + assert result["data"]["switched_by"] == "selector" + + # Verify SessionState was updated + assert mcp_session._working_frame is not None + + +@pytest.mark.asyncio +async def test_mcp_frame_switch_by_name(mcp_session: SessionState) -> None: + result = await skyvern_frame_switch(name="payment") + assert result["ok"] is True + assert result["data"]["switched_by"] == "name" + assert mcp_session._working_frame is not None + + +@pytest.mark.asyncio +async def test_mcp_frame_main_clears_state(mcp_session: SessionState) -> None: + # Switch in first + await skyvern_frame_switch(selector="#pay-frame") + assert mcp_session._working_frame is not None + + # Switch back + result = await skyvern_frame_main() + assert result["ok"] is True + assert mcp_session._working_frame is None + + +@pytest.mark.asyncio +async def test_mcp_frame_switch_invalid_selector(mcp_session: SessionState) -> None: + result = await skyvern_frame_switch(selector="#nonexistent") + assert result["ok"] is False + + +@pytest.mark.asyncio +async def test_mcp_frame_switch_persists_across_calls(mcp_session: SessionState) -> None: + """Frame state set by frame_switch persists across subsequent get_page() calls.""" + # Switch into iframe + await skyvern_frame_switch(selector="#pay-frame") + + # Simulate a subsequent MCP call — get_page() reads _working_frame from SessionState + state = get_current_session() + assert state._working_frame is not None + + # The next get_page() call would set page._working_frame from state._working_frame + # Verify the state is there for the propagation + frame = state._working_frame + heading = await frame.locator("#frame-heading").text_content() + assert heading == "Payment" diff --git a/tests/unit/test_iframe_support.py b/tests/unit/test_iframe_support.py new file mode 100644 index 000000000..35c27b4f0 --- /dev/null +++ b/tests/unit/test_iframe_support.py @@ -0,0 +1,496 @@ +"""Unit tests for iframe support: _locator_scope, frame_switch, frame_main, frame_list.""" + +from __future__ import annotations + +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from skyvern.cli.core.browser_ops import do_frame_list, do_frame_main, do_frame_switch + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_fake_frame(*, name: str = "", url: str = "about:blank") -> MagicMock: + frame = MagicMock() + frame.name = name + frame.url = url + frame.locator = MagicMock(return_value=MagicMock()) + return frame + + +def _make_fake_page(frames: list[MagicMock] | None = None) -> MagicMock: + """Build a mock Playwright Page with .locator(), .frames, .main_frame, .frame().""" + page = MagicMock() + all_frames = frames or [_make_fake_frame(name="main", url="https://example.com")] + page.frames = all_frames + page.main_frame = all_frames[0] + page.locator = MagicMock(return_value=MagicMock()) + page.frame = MagicMock(return_value=None) + return page + + +class FakeSkyvernPage: + """Minimal stand-in for SkyvernPage to test _locator_scope without Playwright.""" + + def __init__(self, page: Any, working_frame: Any = None) -> None: + self.page = page + self._working_frame = working_frame + + @property + def _locator_scope(self) -> Any: + frame = object.__getattribute__(self, "_working_frame") + if frame is not None: + return frame + return object.__getattribute__(self, "page") + + +class FakeSkyvernBrowserPage(FakeSkyvernPage): + """Minimal stand-in for SkyvernBrowserPage to test frame methods.""" + + async def frame_switch( + self, *, selector: str | None = None, name: str | None = None, index: int | None = None + ) -> dict[str, Any]: + params = sum(p is not None for p in (selector, name, index)) + if params != 1: + raise ValueError("Exactly one of selector, name, or index is required.") + + frame = None + + if selector is not None: + element = await self.page.query_selector(selector) + if element is None: + raise ValueError(f"Selector '{selector}' did not match any element.") + frame = await element.content_frame() + if frame is None: + raise ValueError(f"Selector '{selector}' did not resolve to an iframe.") + elif name is not None: + frame = self.page.frame(name=name) + if frame is None: + raise ValueError(f"No frame found with name '{name}'.") + elif index is not None: + frames = self.page.frames + if index < 0 or index >= len(frames): + raise ValueError(f"Frame index {index} out of range (0-{len(frames) - 1}).") + frame = frames[index] + + self._working_frame = frame + return { + "name": frame.name if frame else None, + "url": frame.url if frame else None, + "selector": selector, + "frame_name": name, + "index": index, + } + + def frame_main(self) -> dict[str, str]: + self._working_frame = None + return {"status": "switched_to_main_frame"} + + async def frame_list(self) -> list[dict[str, Any]]: + frames = self.page.frames + return [ + {"index": i, "name": f.name, "url": f.url, "is_main": f == self.page.main_frame} + for i, f in enumerate(frames) + ] + + +# --------------------------------------------------------------------------- +# _locator_scope tests +# --------------------------------------------------------------------------- + + +class TestLocatorScope: + def test_returns_page_when_no_frame(self) -> None: + page = _make_fake_page() + sp = FakeSkyvernPage(page) + assert sp._locator_scope is page + + def test_returns_frame_when_set(self) -> None: + page = _make_fake_page() + frame = _make_fake_frame(name="iframe1") + sp = FakeSkyvernPage(page, working_frame=frame) + assert sp._locator_scope is frame + + def test_returns_page_after_clearing_frame(self) -> None: + page = _make_fake_page() + frame = _make_fake_frame() + sp = FakeSkyvernPage(page, working_frame=frame) + assert sp._locator_scope is frame + sp._working_frame = None + assert sp._locator_scope is page + + def test_locator_call_delegates_to_frame(self) -> None: + page = _make_fake_page() + frame = _make_fake_frame() + sp = FakeSkyvernPage(page, working_frame=frame) + sp._locator_scope.locator("#btn") + frame.locator.assert_called_once_with("#btn") + page.locator.assert_not_called() + + def test_locator_call_delegates_to_page_when_no_frame(self) -> None: + page = _make_fake_page() + sp = FakeSkyvernPage(page) + sp._locator_scope.locator("#btn") + page.locator.assert_called_once_with("#btn") + + +# --------------------------------------------------------------------------- +# frame_switch tests +# --------------------------------------------------------------------------- + + +class TestFrameSwitch: + @pytest.mark.asyncio + async def test_switch_by_selector(self) -> None: + iframe = _make_fake_frame(name="payment", url="https://payment.example.com/v3") + element_mock = MagicMock() + element_mock.content_frame = AsyncMock(return_value=iframe) + + page = _make_fake_page() + page.query_selector = AsyncMock(return_value=element_mock) + + sp = FakeSkyvernBrowserPage(page) + result = await sp.frame_switch(selector="#payment-frame") + + assert sp._working_frame is iframe + assert result["name"] == "payment" + assert result["url"] == "https://payment.example.com/v3" + + @pytest.mark.asyncio + async def test_switch_by_name(self) -> None: + iframe = _make_fake_frame(name="checkout", url="https://checkout.com") + page = _make_fake_page() + page.frame = MagicMock(return_value=iframe) + + sp = FakeSkyvernBrowserPage(page) + result = await sp.frame_switch(name="checkout") + + assert sp._working_frame is iframe + assert result["frame_name"] == "checkout" + + @pytest.mark.asyncio + async def test_switch_by_index(self) -> None: + main = _make_fake_frame(name="main", url="https://example.com") + iframe = _make_fake_frame(name="embed", url="https://embed.com") + page = _make_fake_page([main, iframe]) + + sp = FakeSkyvernBrowserPage(page) + result = await sp.frame_switch(index=1) + + assert sp._working_frame is iframe + assert result["index"] == 1 + + @pytest.mark.asyncio + async def test_switch_no_params_raises(self) -> None: + page = _make_fake_page() + sp = FakeSkyvernBrowserPage(page) + with pytest.raises(ValueError, match="Exactly one"): + await sp.frame_switch() + + @pytest.mark.asyncio + async def test_switch_multiple_params_raises(self) -> None: + page = _make_fake_page() + sp = FakeSkyvernBrowserPage(page) + with pytest.raises(ValueError, match="Exactly one"): + await sp.frame_switch(selector="#x", name="y") + + @pytest.mark.asyncio + async def test_switch_selector_not_iframe_raises(self) -> None: + element_mock = MagicMock() + element_mock.content_frame = AsyncMock(return_value=None) + + page = _make_fake_page() + page.query_selector = AsyncMock(return_value=element_mock) + + sp = FakeSkyvernBrowserPage(page) + with pytest.raises(ValueError, match="did not resolve to an iframe"): + await sp.frame_switch(selector="#not-iframe") + + @pytest.mark.asyncio + async def test_switch_name_not_found_raises(self) -> None: + page = _make_fake_page() + page.frame = MagicMock(return_value=None) + + sp = FakeSkyvernBrowserPage(page) + with pytest.raises(ValueError, match="No frame found"): + await sp.frame_switch(name="nonexistent") + + @pytest.mark.asyncio + async def test_switch_index_out_of_range_raises(self) -> None: + page = _make_fake_page([_make_fake_frame()]) + sp = FakeSkyvernBrowserPage(page) + with pytest.raises(ValueError, match="out of range"): + await sp.frame_switch(index=5) + + +# --------------------------------------------------------------------------- +# frame_main tests +# --------------------------------------------------------------------------- + + +class TestFrameMain: + def test_clears_working_frame(self) -> None: + page = _make_fake_page() + frame = _make_fake_frame() + sp = FakeSkyvernBrowserPage(page, working_frame=frame) + assert sp._working_frame is frame + sp.frame_main() + assert sp._working_frame is None + assert sp._locator_scope is page + + +# --------------------------------------------------------------------------- +# frame_list tests +# --------------------------------------------------------------------------- + + +class TestFrameList: + @pytest.mark.asyncio + async def test_lists_all_frames(self) -> None: + main = _make_fake_frame(name="", url="https://example.com") + iframe1 = _make_fake_frame(name="ads", url="https://ads.com") + iframe2 = _make_fake_frame(name="payment", url="https://payment.example.com") + page = _make_fake_page([main, iframe1, iframe2]) + + sp = FakeSkyvernBrowserPage(page) + frames = await sp.frame_list() + + assert len(frames) == 3 + assert frames[0]["is_main"] is True + assert frames[1]["name"] == "ads" + assert frames[2]["name"] == "payment" + + +# --------------------------------------------------------------------------- +# MCP tool tests +# --------------------------------------------------------------------------- + + +class TestMCPFrameTools: + @pytest.mark.asyncio + async def test_frame_switch_invalid_params_preflight(self, monkeypatch: pytest.MonkeyPatch) -> None: + from skyvern.cli.mcp_tools import browser as mcp_browser + + get_page = AsyncMock(side_effect=AssertionError("get_page should not be called")) + monkeypatch.setattr(mcp_browser, "get_page", get_page) + + result = await mcp_browser.skyvern_frame_switch() + assert result["ok"] is False + assert result["error"]["code"] == mcp_browser.ErrorCode.INVALID_INPUT + get_page.assert_not_awaited() + + @pytest.mark.asyncio + async def test_frame_switch_multiple_params_preflight(self, monkeypatch: pytest.MonkeyPatch) -> None: + from skyvern.cli.mcp_tools import browser as mcp_browser + + get_page = AsyncMock(side_effect=AssertionError("get_page should not be called")) + monkeypatch.setattr(mcp_browser, "get_page", get_page) + + result = await mcp_browser.skyvern_frame_switch(selector="#x", name="y") + assert result["ok"] is False + get_page.assert_not_awaited() + + @pytest.mark.asyncio + async def test_frame_list_no_browser(self, monkeypatch: pytest.MonkeyPatch) -> None: + from skyvern.cli.mcp_tools import browser as mcp_browser + from skyvern.cli.mcp_tools._session import BrowserNotAvailableError + + monkeypatch.setattr(mcp_browser, "get_page", AsyncMock(side_effect=BrowserNotAvailableError())) + + result = await mcp_browser.skyvern_frame_list() + assert result["ok"] is False + assert result["error"]["code"] == mcp_browser.ErrorCode.NO_ACTIVE_BROWSER + + @pytest.mark.asyncio + async def test_frame_main_no_browser(self, monkeypatch: pytest.MonkeyPatch) -> None: + from skyvern.cli.mcp_tools import browser as mcp_browser + from skyvern.cli.mcp_tools._session import BrowserNotAvailableError + + monkeypatch.setattr(mcp_browser, "get_page", AsyncMock(side_effect=BrowserNotAvailableError())) + + result = await mcp_browser.skyvern_frame_main() + assert result["ok"] is False + assert result["error"]["code"] == mcp_browser.ErrorCode.NO_ACTIVE_BROWSER + + @pytest.mark.asyncio + async def test_navigate_clears_working_frame(self, monkeypatch: pytest.MonkeyPatch) -> None: + """skyvern_navigate must clear _working_frame to prevent stale frame references.""" + from skyvern.cli.core.session_manager import SessionState + from skyvern.cli.mcp_tools import browser as mcp_browser + + fake_page = MagicMock() + fake_page.goto = AsyncMock() + fake_page.url = "https://example.com/new" + fake_page.title = AsyncMock(return_value="New Page") + fake_ctx = MagicMock() + fake_ctx.mode = "local" + + monkeypatch.setattr(mcp_browser, "get_page", AsyncMock(return_value=(fake_page, fake_ctx))) + + # Pre-set a working frame on the session state + state = SessionState() + state._working_frame = MagicMock() # simulates an active iframe + monkeypatch.setattr(mcp_browser, "get_current_session", lambda: state) + + result = await mcp_browser.skyvern_navigate(url="https://example.com/new") + assert result["ok"] is True + assert state._working_frame is None + + @pytest.mark.asyncio + async def test_navigate_clears_frame_on_failure(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Frame state must be cleared even when navigation fails (partial load may destroy iframes).""" + from skyvern.cli.core.session_manager import SessionState + from skyvern.cli.mcp_tools import browser as mcp_browser + + fake_page = MagicMock() + fake_page.goto = AsyncMock(side_effect=TimeoutError("Navigation timeout")) + fake_ctx = MagicMock() + fake_ctx.mode = "local" + + monkeypatch.setattr(mcp_browser, "get_page", AsyncMock(return_value=(fake_page, fake_ctx))) + + state = SessionState() + state._working_frame = MagicMock() + monkeypatch.setattr(mcp_browser, "get_current_session", lambda: state) + + result = await mcp_browser.skyvern_navigate(url="https://example.com/timeout") + assert result["ok"] is False + assert state._working_frame is None # cleared despite failure + + @pytest.mark.asyncio + async def test_tab_switch_clears_working_frame(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Switching tabs must clear _working_frame to prevent stale cross-tab frame references.""" + import skyvern.cli.core.session_manager as sm_mod + from skyvern.cli.core.session_manager import SessionState + from skyvern.cli.mcp_tools import tabs as mcp_tabs + + fake_page = MagicMock() + fake_page.is_closed = MagicMock(return_value=False) + fake_ctx = MagicMock() + fake_ctx.mode = "local" + monkeypatch.setattr(mcp_tabs, "get_page", AsyncMock(return_value=(fake_page, fake_ctx))) + monkeypatch.setattr(sm_mod, "is_stateless_http_mode", lambda: False) + + target_page = MagicMock() + target_page.is_closed = MagicMock(return_value=False) + target_page.url = "https://other-tab.com" + target_page.title = AsyncMock(return_value="Other Tab") + target_page.bring_to_front = AsyncMock() + + state = SessionState() + state.browser = MagicMock() + state.browser._browser_context.pages = [fake_page, target_page] + state._working_frame = MagicMock() # stale frame from previous tab + monkeypatch.setattr(mcp_tabs, "get_current_session", lambda: state) + + result = await mcp_tabs.skyvern_tab_switch(index=1) + assert result["ok"] is True + assert state._working_frame is None + + @pytest.mark.asyncio + async def test_tab_new_clears_working_frame(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Opening a new tab must clear _working_frame.""" + from skyvern.cli.core.session_manager import SessionState + from skyvern.cli.mcp_tools import tabs as mcp_tabs + + fake_page = MagicMock() + fake_ctx = MagicMock() + fake_ctx.mode = "local" + monkeypatch.setattr(mcp_tabs, "get_page", AsyncMock(return_value=(fake_page, fake_ctx))) + + new_page = MagicMock() + new_page.url = "about:blank" + new_page.title = AsyncMock(return_value="") + + state = SessionState() + state.browser = MagicMock() + state.browser._browser_context.new_page = AsyncMock(return_value=new_page) + state.browser._browser_context.pages = [fake_page, new_page] + state._working_frame = MagicMock() # stale frame + monkeypatch.setattr(mcp_tabs, "get_current_session", lambda: state) + + result = await mcp_tabs.skyvern_tab_new() + assert result["ok"] is True + assert state._working_frame is None + + +# --------------------------------------------------------------------------- +# CLIState frame persistence tests +# --------------------------------------------------------------------------- + + +class TestCLIStateFrame: + def test_frame_fields_default_none(self) -> None: + from skyvern.cli.commands._state import CLIState + + state = CLIState() + assert state.frame_selector is None + assert state.frame_name is None + assert state.frame_index is None + + def test_frame_fields_roundtrip(self, tmp_path: Any) -> None: + import skyvern.cli.commands._state as state_mod + from skyvern.cli.commands._state import CLIState, load_state, save_state + + # Point to temp dir to avoid polluting real state + original_dir = state_mod.STATE_DIR + original_file = state_mod.STATE_FILE + state_mod.STATE_DIR = tmp_path + state_mod.STATE_FILE = tmp_path / "state.json" + try: + state = CLIState( + session_id="pbs_123", + mode="cloud", + frame_selector="#payment-frame", + ) + save_state(state) + loaded = load_state() + assert loaded is not None + assert loaded.frame_selector == "#payment-frame" + assert loaded.frame_name is None + assert loaded.frame_index is None + finally: + state_mod.STATE_DIR = original_dir + state_mod.STATE_FILE = original_file + + +# --------------------------------------------------------------------------- +# browser_ops tests +# --------------------------------------------------------------------------- + + +class TestBrowserOps: + @pytest.mark.asyncio + async def test_do_frame_switch_delegates(self) -> None: + page = MagicMock() + page.frame_switch = AsyncMock(return_value={"name": "pay", "url": "https://pay.com"}) + + result = await do_frame_switch(page, selector="#iframe") + page.frame_switch.assert_awaited_once_with(selector="#iframe", name=None, index=None) + assert result.name == "pay" + + def test_do_frame_main_delegates(self) -> None: + page = MagicMock() + page.frame_main = MagicMock(return_value={"status": "switched_to_main_frame"}) + + do_frame_main(page) + page.frame_main.assert_called_once() + + @pytest.mark.asyncio + async def test_do_frame_list_delegates(self) -> None: + page = MagicMock() + page.frame_list = AsyncMock( + return_value=[ + {"index": 0, "name": "", "url": "https://example.com", "is_main": True}, + {"index": 1, "name": "embed", "url": "https://embed.com", "is_main": False}, + ] + ) + + result = await do_frame_list(page) + assert len(result) == 2 + assert result[0].is_main is True + assert result[1].name == "embed"