Add CDP proxy authentication via Fetch.authRequired (#4936)
Some checks are pending
Run tests and pre-commit / Frontend Lint and Build (push) Waiting to run
Run tests and pre-commit / Run tests and pre-commit hooks (push) Waiting to run
Publish Fern Docs / run (push) Waiting to run

This commit is contained in:
LawyZheng 2026-03-01 17:25:53 +08:00 committed by GitHub
parent 5da370a813
commit 6a0c5ebdf1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 330 additions and 4 deletions

View file

@ -203,11 +203,21 @@ class CDPDownloadInterceptor:
- Download extract body save to disk Fetch.fulfillRequest (block browser save)
"""
def __init__(self, output_dir: str | None = None) -> None:
def __init__(
self,
output_dir: str | None = None,
proxy_username: str | None = None,
proxy_password: str | None = None,
) -> None:
self._output_dir: Path | None = Path(output_dir) if output_dir else None
self._proxy_username: str | None = proxy_username
self._proxy_password: str | None = proxy_password
self._cdp_sessions: list[CDPSession] = []
self._enabled = False
self._download_index = 0
# Track auth attempts per requestId to prevent infinite retry loops
# when proxy credentials are rejected (407 → ProvideCredentials → 407 → …)
self._auth_attempts: dict[str, int] = {}
def set_download_dir(self, download_dir: str) -> None:
"""Set or update the download directory. Can be called after init when run_id becomes available."""
@ -221,9 +231,22 @@ class CDPDownloadInterceptor:
# Capture cdp_session in the closure so the handler uses the correct session
# (requestId is scoped to the session that fired Fetch.requestPaused).
cdp_session.on("Fetch.requestPaused", lambda event: self._on_request_paused(event, cdp_session))
# When proxy credentials are provided, also handle Fetch.authRequired events.
# This solves the problem where Playwright's new_context(proxy=...) cannot handle
# proxy authentication (407) on remote CDP browsers.
has_proxy_auth = bool(self._proxy_username and self._proxy_password)
if has_proxy_auth:
cdp_session.on("Fetch.authRequired", lambda event: self._on_auth_required(event, cdp_session))
# Note: Fetch.authRequired events are controlled solely by handleAuthRequests,
# independent of the patterns array (which only affects Fetch.requestPaused).
await cdp_session.send(
"Fetch.enable",
{"patterns": [{"requestStage": "Response"}]},
{
"patterns": [{"requestStage": "Response"}],
"handleAuthRequests": has_proxy_auth,
},
)
self._cdp_sessions.append(cdp_session)
self._enabled = True
@ -232,6 +255,7 @@ class CDPDownloadInterceptor:
page_url=page.url,
session_count=len(self._cdp_sessions),
output_dir=str(self._output_dir),
proxy_auth_enabled=has_proxy_auth,
)
async def disable(self) -> None:
@ -254,6 +278,74 @@ class CDPDownloadInterceptor:
"""Handle Fetch.requestPaused — schedule async handler with the originating session."""
asyncio.ensure_future(self._handle_request_paused(event, cdp_session))
def _on_auth_required(self, event: dict[str, Any], cdp_session: CDPSession) -> None:
"""Handle Fetch.authRequired — schedule async handler with the originating session."""
asyncio.ensure_future(self._handle_auth_required(event, cdp_session))
async def _handle_auth_required(self, event: dict[str, Any], cdp_session: CDPSession) -> None:
"""Handle proxy 407 auth challenges via CDP Fetch.continueWithAuth.
Only responds to proxy auth challenges (source == "Proxy") when credentials are available
and the request hasn't already been retried (to prevent infinite loops when credentials
are rejected). All other auth challenges are cancelled to prevent hanging.
"""
try:
request_id = event["requestId"]
auth_challenge = event.get("authChallenge", {})
source = auth_challenge.get("source", "")
url = event.get("request", {}).get("url", "<unknown>")
# Defensive: this handler is only registered when credentials are present,
# but we still check to guard against future refactors.
attempts = self._auth_attempts.get(request_id, 0)
if source == "Proxy" and self._proxy_username and self._proxy_password and attempts < 1:
self._auth_attempts[request_id] = attempts + 1
LOG.info(
"CDP proxy auth challenge received, providing credentials",
url=url,
origin=auth_challenge.get("origin", ""),
)
await cdp_session.send(
"Fetch.continueWithAuth",
{
"requestId": request_id,
"authChallengeResponse": {
"response": "ProvideCredentials",
"username": self._proxy_username,
"password": self._proxy_password,
},
},
)
else:
# Clean up attempt tracking for this request
self._auth_attempts.pop(request_id, None)
if attempts >= 1:
LOG.warning(
"CDP proxy auth credentials rejected, cancelling to prevent retry loop",
url=url,
source=source,
attempts=attempts,
)
else:
LOG.warning(
"CDP auth challenge received, cancelling (non-proxy or no credentials)",
url=url,
source=source,
)
await cdp_session.send(
"Fetch.continueWithAuth",
{
"requestId": request_id,
"authChallengeResponse": {"response": "CancelAuth"},
},
)
except Exception as e:
LOG.error(
"Error handling CDP auth challenge",
error=str(e),
exc_info=True,
)
async def _handle_request_paused(self, event: dict[str, Any], cdp_session: CDPSession) -> None:
"""Async handler for paused requests."""
request_id = event["requestId"]

View file

@ -1,8 +1,11 @@
"""Unit tests for CDPDownloadInterceptor pure functions (no browser needed)."""
"""Unit tests for CDPDownloadInterceptor pure functions and proxy auth handling."""
import time
from unittest.mock import AsyncMock, MagicMock
from skyvern.webeye.cdp_download_interceptor import extract_filename, is_download_response
import pytest
from skyvern.webeye.cdp_download_interceptor import CDPDownloadInterceptor, extract_filename, is_download_response
class TestIsDownloadResponse:
@ -225,3 +228,234 @@ class TestExtractFilename:
# extract_filename returns the raw name; sanitization is done in _handle_download.
# But verify the raw output so tests document the behavior.
assert result == "../../etc/cron.d/evil"
class TestCDPDownloadInterceptorProxyAuth:
"""Tests for CDP proxy authentication handling (Fetch.authRequired + continueWithAuth)."""
def _make_interceptor(
self,
proxy_username: str | None = None,
proxy_password: str | None = None,
) -> CDPDownloadInterceptor:
return CDPDownloadInterceptor(
output_dir="/tmp/test_downloads",
proxy_username=proxy_username,
proxy_password=proxy_password,
)
def _make_cdp_session(self) -> MagicMock:
session = MagicMock()
session.send = AsyncMock()
return session
@pytest.mark.asyncio
async def test_proxy_auth_provides_credentials(self) -> None:
"""Proxy 407 challenge should respond with ProvideCredentials when credentials are available."""
interceptor = self._make_interceptor(proxy_username="user1", proxy_password="pass1")
cdp_session = self._make_cdp_session()
event = {
"requestId": "req-1",
"authChallenge": {"source": "Proxy", "origin": "http://proxy.example.com"},
"request": {"url": "https://example.com/page"},
}
await interceptor._handle_auth_required(event, cdp_session)
cdp_session.send.assert_called_once_with(
"Fetch.continueWithAuth",
{
"requestId": "req-1",
"authChallengeResponse": {
"response": "ProvideCredentials",
"username": "user1",
"password": "pass1",
},
},
)
@pytest.mark.asyncio
async def test_non_proxy_auth_cancels(self) -> None:
"""Non-proxy auth challenges (e.g., HTTP Basic from origin) should be cancelled."""
interceptor = self._make_interceptor(proxy_username="user1", proxy_password="pass1")
cdp_session = self._make_cdp_session()
event = {
"requestId": "req-2",
"authChallenge": {"source": "Server", "origin": "https://example.com"},
"request": {"url": "https://example.com/protected"},
}
await interceptor._handle_auth_required(event, cdp_session)
cdp_session.send.assert_called_once_with(
"Fetch.continueWithAuth",
{
"requestId": "req-2",
"authChallengeResponse": {"response": "CancelAuth"},
},
)
@pytest.mark.asyncio
async def test_no_credentials_cancels_proxy_auth(self) -> None:
"""Proxy auth challenge without credentials should be cancelled."""
interceptor = self._make_interceptor() # No credentials
cdp_session = self._make_cdp_session()
event = {
"requestId": "req-3",
"authChallenge": {"source": "Proxy", "origin": "http://proxy.example.com"},
"request": {"url": "https://example.com/page"},
}
await interceptor._handle_auth_required(event, cdp_session)
cdp_session.send.assert_called_once_with(
"Fetch.continueWithAuth",
{
"requestId": "req-3",
"authChallengeResponse": {"response": "CancelAuth"},
},
)
@pytest.mark.asyncio
async def test_partial_credentials_cancels(self) -> None:
"""Proxy auth with only username (no password) should cancel."""
interceptor = self._make_interceptor(proxy_username="user1")
cdp_session = self._make_cdp_session()
event = {
"requestId": "req-4",
"authChallenge": {"source": "Proxy", "origin": "http://proxy.example.com"},
"request": {"url": "https://example.com/page"},
}
await interceptor._handle_auth_required(event, cdp_session)
cdp_session.send.assert_called_once_with(
"Fetch.continueWithAuth",
{
"requestId": "req-4",
"authChallengeResponse": {"response": "CancelAuth"},
},
)
@pytest.mark.asyncio
async def test_auth_error_does_not_raise(self) -> None:
"""Errors during auth handling should be caught, not raised."""
interceptor = self._make_interceptor(proxy_username="user1", proxy_password="pass1")
cdp_session = self._make_cdp_session()
cdp_session.send.side_effect = Exception("CDP connection lost")
event = {
"requestId": "req-5",
"authChallenge": {"source": "Proxy", "origin": "http://proxy.example.com"},
"request": {"url": "https://example.com/page"},
}
# Should not raise
await interceptor._handle_auth_required(event, cdp_session)
def test_init_stores_proxy_credentials(self) -> None:
"""Constructor should store proxy credentials."""
interceptor = self._make_interceptor(proxy_username="user", proxy_password="pass")
assert interceptor._proxy_username == "user"
assert interceptor._proxy_password == "pass"
def test_init_no_proxy_credentials(self) -> None:
"""Constructor without credentials should store None."""
interceptor = self._make_interceptor()
assert interceptor._proxy_username is None
assert interceptor._proxy_password is None
@pytest.mark.asyncio
async def test_enable_for_page_with_proxy_auth(self) -> None:
"""enable_for_page should set handleAuthRequests=True and register authRequired handler."""
interceptor = self._make_interceptor(proxy_username="user", proxy_password="pass")
mock_cdp_session = self._make_cdp_session()
mock_page = MagicMock()
mock_page.url = "about:blank"
mock_page.context.new_cdp_session = AsyncMock(return_value=mock_cdp_session)
await interceptor.enable_for_page(mock_page)
# Verify Fetch.enable was called with handleAuthRequests=True
mock_cdp_session.send.assert_called_once_with(
"Fetch.enable",
{
"patterns": [{"requestStage": "Response"}],
"handleAuthRequests": True,
},
)
# Verify both event handlers were registered
event_names = [call.args[0] for call in mock_cdp_session.on.call_args_list]
assert "Fetch.requestPaused" in event_names
assert "Fetch.authRequired" in event_names
@pytest.mark.asyncio
async def test_enable_for_page_without_proxy_auth(self) -> None:
"""enable_for_page without credentials should set handleAuthRequests=False."""
interceptor = self._make_interceptor()
mock_cdp_session = self._make_cdp_session()
mock_page = MagicMock()
mock_page.url = "about:blank"
mock_page.context.new_cdp_session = AsyncMock(return_value=mock_cdp_session)
await interceptor.enable_for_page(mock_page)
# Verify Fetch.enable was called with handleAuthRequests=False
mock_cdp_session.send.assert_called_once_with(
"Fetch.enable",
{
"patterns": [{"requestStage": "Response"}],
"handleAuthRequests": False,
},
)
# Verify only requestPaused handler was registered (not authRequired)
event_names = [call.args[0] for call in mock_cdp_session.on.call_args_list]
assert "Fetch.requestPaused" in event_names
assert "Fetch.authRequired" not in event_names
@pytest.mark.asyncio
async def test_malformed_event_missing_request_id(self) -> None:
"""Malformed event without requestId should be caught, not raise."""
interceptor = self._make_interceptor(proxy_username="user1", proxy_password="pass1")
cdp_session = self._make_cdp_session()
event: dict = {
"authChallenge": {"source": "Proxy", "origin": "http://proxy.example.com"},
"request": {"url": "https://example.com/page"},
}
# Should not raise — KeyError is caught by the try/except
await interceptor._handle_auth_required(event, cdp_session)
cdp_session.send.assert_not_called()
@pytest.mark.asyncio
async def test_retry_loop_prevention(self) -> None:
"""Second auth attempt for the same requestId should CancelAuth to prevent infinite loop."""
interceptor = self._make_interceptor(proxy_username="user1", proxy_password="pass1")
cdp_session = self._make_cdp_session()
event = {
"requestId": "req-retry",
"authChallenge": {"source": "Proxy", "origin": "http://proxy.example.com"},
"request": {"url": "https://example.com/page"},
}
# First attempt: should provide credentials
await interceptor._handle_auth_required(event, cdp_session)
first_call = cdp_session.send.call_args
assert first_call.args[1]["authChallengeResponse"]["response"] == "ProvideCredentials"
cdp_session.send.reset_mock()
# Second attempt (credentials rejected): should cancel
await interceptor._handle_auth_required(event, cdp_session)
second_call = cdp_session.send.call_args
assert second_call.args[1]["authChallengeResponse"]["response"] == "CancelAuth"