From 7bfb1e9b21000d36e6c24bb303ebe5581b9989d2 Mon Sep 17 00:00:00 2001 From: Maksim Ivanov Date: Wed, 8 Jan 2025 18:14:38 +0100 Subject: [PATCH] Define browser manager API (#1497) Co-authored-by: Shuchang Zheng --- scripts/test_persistent_browsers.py | 232 ++++++++++++++++++ skyvern/forge/app.py | 2 + skyvern/forge/sdk/db/client.py | 174 ++++++++++++- skyvern/forge/sdk/routes/agent_protocol.py | 95 +++++++ .../schemas/persistent_browser_sessions.py | 15 ++ skyvern/webeye/browser_factory.py | 36 ++- skyvern/webeye/persistent_sessions_manager.py | 191 ++++++++++++++ skyvern/webeye/schemas.py | 29 +++ 8 files changed, 764 insertions(+), 10 deletions(-) create mode 100644 scripts/test_persistent_browsers.py create mode 100644 skyvern/forge/sdk/schemas/persistent_browser_sessions.py create mode 100644 skyvern/webeye/persistent_sessions_manager.py create mode 100644 skyvern/webeye/schemas.py diff --git a/scripts/test_persistent_browsers.py b/scripts/test_persistent_browsers.py new file mode 100644 index 00000000..b67c51b1 --- /dev/null +++ b/scripts/test_persistent_browsers.py @@ -0,0 +1,232 @@ +import json +import os +from typing import Any, Optional + +import requests +from dotenv import load_dotenv + +from skyvern.forge import app + +load_dotenv("./skyvern-frontend/.env") +API_KEY = os.getenv("VITE_SKYVERN_API_KEY") + +API_BASE_URL = "http://localhost:8000/api/v1" +HEADERS = {"x-api-key": API_KEY, "Content-Type": "application/json"} + + +def make_request(method: str, endpoint: str, data: Optional[dict[str, Any]] = None) -> requests.Response: + """Helper function to make API requests""" + url = f"{API_BASE_URL}{endpoint}" + try: + response = requests.request(method=method, url=url, headers=HEADERS, json=data) + response.raise_for_status() + return response + except requests.exceptions.RequestException as e: + print(f"\nRequest failed: {method} {url}") + if hasattr(e, "response") and e.response is not None: + print(f"Status code: {e.response.status_code}") + try: + error_detail = e.response.json() if e.response is not None else str(e) + print(f"Error details: {json.dumps(error_detail, indent=2)}") + except json.JSONDecodeError: + print(f"Raw error response: {e.response.text}") + else: + print("Status code: N/A") + print(f"Error details: {str(e)}") + raise + + +def list_sessions() -> None: + """List all active browser sessions""" + try: + response = make_request("GET", "/browser_sessions") + sessions = response.json() + print("\nActive browser sessions:") + if not sessions: + print(" No active sessions found") + return + for session in sessions: + try: + print(json.dumps(session, indent=2)) + print(" ---") + except Exception as e: + print(f" Error parsing session data: {session}") + print(f" Error: {str(e)}") + except Exception as e: + print(f"Error listing sessions: {str(e)}") + + +def create_session() -> Optional[str]: + """Create a new browser session""" + try: + response = make_request("POST", "/browser_sessions") + session = response.json() + print("\nCreated new browser session:") + try: + print(f" ID: {session.get('browser_session_id', 'N/A')}") + print(f" Status: {session.get('status', 'N/A')}") + print(f"Full response: {json.dumps(session, indent=2)}") + return session.get("browser_session_id") + except Exception as e: + print(f"Error parsing response: {session}") + print(f"Error: {str(e)}") + return None + except Exception as e: + print(f"Error creating session: {str(e)}") + return None + + +def get_session(session_id: str) -> None: + """Get details of a specific browser session""" + try: + response = make_request("GET", f"/browser_sessions/{session_id}") + session = response.json() + print("\nBrowser session details:") + print(json.dumps(session, indent=2)) + except Exception as e: + print(f"Error getting session: {str(e)}") + + +def close_all_sessions() -> None: + """Close all active browser sessions""" + try: + response = make_request("POST", "/browser_sessions/close") + print("\nClosed all browser sessions") + print(f"Response: {response.json()}") + except Exception as e: + print(f"Error closing sessions: {str(e)}") + + +async def direct_get_network_info(session_id: str) -> None: + """Get network info directly from PersistentSessionsManager""" + try: + manager = app.PERSISTENT_SESSIONS_MANAGER + cdp_port, ip_address = await manager.get_network_info(session_id) + print("\nNetwork info:") + print(f" CDP Port: {cdp_port}") + print(f" IP Address: {ip_address}") + except Exception as e: + print(f"Error getting network info: {str(e)}") + + +async def direct_list_sessions(organization_id: str) -> None: + """List sessions directly from PersistentSessionsManager""" + try: + manager = app.PERSISTENT_SESSIONS_MANAGER + sessions = await manager.get_active_sessions(organization_id) + print("\nActive browser sessions (direct):") + if not sessions: + print(" No active sessions found") + return + for session in sessions: + print(json.dumps(session.model_dump(), indent=2)) + print(" ---") + except Exception as e: + print(f"Error listing sessions directly: {str(e)}") + + +def print_direct_help() -> None: + """Print available direct commands""" + print("\nAvailable direct commands:") + print(" direct_list - List all active browser sessions directly") + print(" direct_network - Get network info directly") + print(" help_direct - Show this help message") + + +async def handle_direct_command(cmd: str, args: list[str]) -> None: + """Handle direct method calls""" + if cmd == "help_direct": + print_direct_help() + elif cmd == "direct_network": + if not args: + print("Error: session_id required") + return + await direct_get_network_info(args[0]) + elif cmd == "direct_list": + if not args: + print("Error: organization_id required") + return + await direct_list_sessions(args[0]) + else: + print(f"Unknown direct command: {cmd}") + print("Type 'help_direct' for available direct commands") + + +def close_session(session_id: str) -> None: + """Close a specific browser session""" + try: + response = make_request("POST", f"/browser_sessions/{session_id}/close") + print("\nClosed browser session:") + print(f"Response: {response.json()}") + except Exception as e: + print(f"Error closing session: {str(e)}") + + +def print_help() -> None: + """Print available commands""" + print("\nHTTP API Commands:") + print(" list - List all active browser sessions") + print(" create - Create a new browser session") + print(" get - Get details of a specific session") + print(" close - Close a specific session") + print(" close_all - Close all active browser sessions") + print(" help - Show this help message") + print("\nDirect Method Commands:") + print(" direct_list - List sessions directly") + print(" direct_network - Get network info directly") + print(" help_direct - Show direct command help") + print("\nOther Commands:") + print(" exit - Exit the program") + + +async def main() -> None: + print("Browser Sessions Testing CLI") + print("Type 'help' for available commands") + + while True: + try: + command = input("\n> ").strip() + + if command == "": + continue + + parts = command.split() + cmd = parts[0] + args = parts[1:] + + if cmd == "exit": + break + elif cmd.startswith("direct_") or cmd == "help_direct": + await handle_direct_command(cmd, args) + elif cmd == "help": + print_help() + elif cmd == "list": + list_sessions() + elif cmd == "create": + create_session() + elif cmd == "get": + if not args: + print("Error: session_id required") + continue + get_session(args[0]) + elif cmd == "close": + if not args: + print("Error: session_id required") + continue + close_session(args[0]) + elif cmd == "close_all": + close_all_sessions() + else: + print(f"Unknown command: {cmd}") + print("Type 'help' for available commands") + + except KeyboardInterrupt: + print("\nUse 'exit' to quit") + except Exception as e: + print(f"Error: {str(e)}") + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) diff --git a/skyvern/forge/app.py b/skyvern/forge/app.py index 49f5e45a..926e6349 100644 --- a/skyvern/forge/app.py +++ b/skyvern/forge/app.py @@ -16,6 +16,7 @@ from skyvern.forge.sdk.settings_manager import SettingsManager from skyvern.forge.sdk.workflow.context_manager import WorkflowContextManager from skyvern.forge.sdk.workflow.service import WorkflowService from skyvern.webeye.browser_manager import BrowserManager +from skyvern.webeye.persistent_sessions_manager import PersistentSessionsManager from skyvern.webeye.scraper.scraper import ScrapeExcludeFunc SETTINGS_MANAGER = SettingsManager.get_settings() @@ -37,6 +38,7 @@ SECONDARY_LLM_API_HANDLER = LLMAPIHandlerFactory.get_llm_api_handler( WORKFLOW_CONTEXT_MANAGER = WorkflowContextManager() WORKFLOW_SERVICE = WorkflowService() AGENT_FUNCTION = AgentFunction() +PERSISTENT_SESSIONS_MANAGER = PersistentSessionsManager(database=DATABASE) scrape_exclude: ScrapeExcludeFunc | None = None authentication_function: Callable[[str], Awaitable[Organization]] | None = None setup_api_app: Callable[[FastAPI], None] | None = None diff --git a/skyvern/forge/sdk/db/client.py b/skyvern/forge/sdk/db/client.py index 3f69e821..d1468a7c 100644 --- a/skyvern/forge/sdk/db/client.py +++ b/skyvern/forge/sdk/db/client.py @@ -1,6 +1,6 @@ import json from datetime import datetime, timedelta -from typing import Any, Sequence +from typing import Any, List, Optional, Sequence import structlog from sqlalchemy import and_, delete, func, select, update @@ -24,6 +24,7 @@ from skyvern.forge.sdk.db.models import ( OrganizationAuthTokenModel, OrganizationModel, OutputParameterModel, + PersistentBrowserSessionModel, StepModel, TaskGenerationModel, TaskModel, @@ -62,6 +63,7 @@ from skyvern.forge.sdk.schemas.observers import ( ObserverThoughtType, ) from skyvern.forge.sdk.schemas.organizations import Organization, OrganizationAuthToken +from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession from skyvern.forge.sdk.schemas.task_generations import TaskGeneration from skyvern.forge.sdk.schemas.tasks import OrderBy, ProxyLocation, SortDirection, Task, TaskStatus from skyvern.forge.sdk.schemas.totp_codes import TOTPCode @@ -2251,3 +2253,173 @@ class AgentDB: convert_to_workflow_run_block(workflow_run_block, task=tasks_dict.get(workflow_run_block.task_id)) for workflow_run_block in workflow_run_blocks ] + + async def get_active_persistent_browser_sessions(self, organization_id: str) -> List[PersistentBrowserSession]: + """Get all active persistent browser sessions for an organization.""" + try: + async with self.Session() as session: + result = await session.execute( + select(PersistentBrowserSessionModel) + .filter_by(organization_id=organization_id) + .filter_by(deleted_at=None) + ) + sessions = result.scalars().all() + return [PersistentBrowserSession.model_validate(session) for session in sessions] + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise + + async def get_persistent_browser_session( + self, session_id: str, organization_id: str + ) -> Optional[PersistentBrowserSessionModel]: + """Get a specific persistent browser session.""" + try: + async with self.Session() as session: + persistent_browser_session = ( + await session.scalars( + select(PersistentBrowserSessionModel) + .filter_by(persistent_browser_session_id=session_id) + .filter_by(organization_id=organization_id) + .filter_by(deleted_at=None) + ) + ).first() + if persistent_browser_session: + return PersistentBrowserSession.model_validate(persistent_browser_session) + raise NotFoundError(f"PersistentBrowserSession {session_id} not found") + except NotFoundError: + LOG.error("NotFoundError", exc_info=True) + raise + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise + + async def create_persistent_browser_session( + self, + organization_id: str, + runnable_type: str | None = None, + runnable_id: str | None = None, + ) -> PersistentBrowserSessionModel: + """Create a new persistent browser session.""" + try: + async with self.Session() as session: + browser_session = PersistentBrowserSessionModel( + organization_id=organization_id, + runnable_type=runnable_type, + runnable_id=runnable_id, + ) + session.add(browser_session) + await session.commit() + await session.refresh(browser_session) + return PersistentBrowserSession.model_validate(browser_session) + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise + + async def mark_persistent_browser_session_deleted(self, session_id: str, organization_id: str) -> None: + """Mark a persistent browser session as deleted.""" + try: + async with self.Session() as session: + persistent_browser_session = ( + await session.scalars( + select(PersistentBrowserSessionModel) + .filter_by(persistent_browser_session_id=session_id) + .filter_by(organization_id=organization_id) + ) + ).first() + if persistent_browser_session: + persistent_browser_session.deleted_at = datetime.utcnow() + await session.commit() + await session.refresh(persistent_browser_session) + else: + raise NotFoundError(f"PersistentBrowserSession {session_id} not found") + except NotFoundError: + LOG.error("NotFoundError", exc_info=True) + raise + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise + + async def occupy_persistent_browser_session( + self, session_id: str, runnable_type: str, runnable_id: str, organization_id: str + ) -> None: + """Occupy a specific persistent browser session.""" + try: + async with self.Session() as session: + persistent_browser_session = ( + await session.scalars( + select(PersistentBrowserSessionModel) + .filter_by(persistent_browser_session_id=session_id) + .filter_by(organization_id=organization_id) + .filter_by(deleted_at=None) + ) + ).first() + if persistent_browser_session: + persistent_browser_session.runnable_type = runnable_type + persistent_browser_session.runnable_id = runnable_id + await session.commit() + await session.refresh(persistent_browser_session) + else: + raise NotFoundError(f"PersistentBrowserSession {session_id} not found") + except NotFoundError: + LOG.error("NotFoundError", exc_info=True) + raise + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise + + async def release_persistent_browser_session(self, session_id: str, organization_id: str) -> None: + """Release a specific persistent browser session.""" + try: + async with self.Session() as session: + persistent_browser_session = ( + await session.scalars( + select(PersistentBrowserSessionModel) + .filter_by(persistent_browser_session_id=session_id) + .filter_by(organization_id=organization_id) + .filter_by(deleted_at=None) + ) + ).first() + if persistent_browser_session: + persistent_browser_session.runnable_type = None + persistent_browser_session.runnable_id = None + await session.commit() + await session.refresh(persistent_browser_session) + else: + raise NotFoundError(f"PersistentBrowserSession {session_id} not found") + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except NotFoundError: + LOG.error("NotFoundError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise + + async def get_all_active_persistent_browser_sessions(self) -> List[PersistentBrowserSessionModel]: + """Get all active persistent browser sessions across all organizations.""" + try: + async with self.Session() as session: + result = await session.execute(select(PersistentBrowserSessionModel).filter_by(deleted_at=None)) + return result.scalars().all() + except SQLAlchemyError: + LOG.error("SQLAlchemyError", exc_info=True) + raise + except Exception: + LOG.error("UnexpectedError", exc_info=True) + raise diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 466fc988..644fddc3 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -69,6 +69,7 @@ from skyvern.forge.sdk.workflow.models.workflow import ( ) from skyvern.forge.sdk.workflow.models.yaml import WorkflowCreateYAMLRequest from skyvern.webeye.actions.actions import Action +from skyvern.webeye.schemas import BrowserSessionResponse base_router = APIRouter() @@ -1123,3 +1124,97 @@ async def get_observer_cruise( if not observer_cruise: raise HTTPException(status_code=404, detail=f"Observer cruise {observer_cruise_id} not found") return observer_cruise + + +@base_router.get( + "/browser_sessions/{browser_session_id}", + response_model=BrowserSessionResponse, +) +@base_router.get( + "/browser_sessions/{browser_session_id}/", + response_model=BrowserSessionResponse, + include_in_schema=False, +) +async def get_browser_session_by_id( + browser_session_id: str, + current_org: Organization = Depends(org_auth_service.get_current_org), +) -> BrowserSessionResponse: + analytics.capture("skyvern-oss-agent-workflow-run-get") + browser_session = await app.PERSISTENT_SESSIONS_MANAGER.get_session( + browser_session_id, + current_org.organization_id, + ) + if not browser_session: + raise HTTPException(status_code=404, detail=f"Browser session {browser_session_id} not found") + return BrowserSessionResponse.from_browser_session(browser_session) + + +@base_router.get( + "/browser_sessions", + response_model=list[BrowserSessionResponse], +) +@base_router.get( + "/browser_sessions/", + response_model=list[BrowserSessionResponse], + include_in_schema=False, +) +async def get_browser_sessions( + current_org: Organization = Depends(org_auth_service.get_current_org), +) -> list[BrowserSessionResponse]: + """Get all active browser sessions for the organization""" + analytics.capture("skyvern-oss-agent-browser-sessions-get") + browser_sessions = await app.PERSISTENT_SESSIONS_MANAGER.get_active_sessions(current_org.organization_id) + return [BrowserSessionResponse.from_browser_session(browser_session) for browser_session in browser_sessions] + + +@base_router.post( + "/browser_sessions", + response_model=BrowserSessionResponse, +) +@base_router.post( + "/browser_sessions/", + response_model=BrowserSessionResponse, + include_in_schema=False, +) +async def create_browser_session( + current_org: Organization = Depends(org_auth_service.get_current_org), +) -> BrowserSessionResponse: + browser_session, _ = await app.PERSISTENT_SESSIONS_MANAGER.create_session(current_org.organization_id) + return BrowserSessionResponse.from_browser_session(browser_session) + + +@base_router.post( + "/browser_sessions/close", +) +@base_router.post( + "/browser_sessions/close/", + include_in_schema=False, +) +async def close_browser_sessions( + current_org: Organization = Depends(org_auth_service.get_current_org), +) -> ORJSONResponse: + await app.PERSISTENT_SESSIONS_MANAGER.close_all_sessions(current_org.organization_id) + return ORJSONResponse( + content={"message": "All browser sessions closed"}, + status_code=200, + media_type="application/json", + ) + + +@base_router.post( + "/browser_sessions/{session_id}/close", +) +@base_router.post( + "/browser_sessions/{session_id}/close/", + include_in_schema=False, +) +async def close_browser_session( + session_id: str, + current_org: Organization = Depends(org_auth_service.get_current_org), +) -> ORJSONResponse: + await app.PERSISTENT_SESSIONS_MANAGER.close_session(current_org.organization_id, session_id) + return ORJSONResponse( + content={"message": "Browser session closed"}, + status_code=200, + media_type="application/json", + ) diff --git a/skyvern/forge/sdk/schemas/persistent_browser_sessions.py b/skyvern/forge/sdk/schemas/persistent_browser_sessions.py new file mode 100644 index 00000000..44b7f03d --- /dev/null +++ b/skyvern/forge/sdk/schemas/persistent_browser_sessions.py @@ -0,0 +1,15 @@ +from datetime import datetime + +from pydantic import BaseModel, ConfigDict + + +class PersistentBrowserSession(BaseModel): + model_config = ConfigDict(from_attributes=True) + + persistent_browser_session_id: str + organization_id: str + runnable_type: str | None = None + runnable_id: str | None = None + created_at: datetime + modified_at: datetime + deleted_at: datetime | None = None diff --git a/skyvern/webeye/browser_factory.py b/skyvern/webeye/browser_factory.py index 594f5a2d..ae4ff065 100644 --- a/skyvern/webeye/browser_factory.py +++ b/skyvern/webeye/browser_factory.py @@ -161,20 +161,26 @@ class BrowserContextFactory: f.write(preference_file_content) @staticmethod - def build_browser_args(proxy_location: ProxyLocation | None = None) -> dict[str, Any]: + def build_browser_args(proxy_location: ProxyLocation | None = None, cdp_port: int | None = None) -> dict[str, Any]: video_dir = f"{settings.VIDEO_PATH}/{datetime.utcnow().strftime('%Y-%m-%d')}" har_dir = ( f"{settings.HAR_PATH}/{datetime.utcnow().strftime('%Y-%m-%d')}/{BrowserContextFactory.get_subdir()}.har" ) + + browser_args = [ + "--disable-blink-features=AutomationControlled", + "--disk-cache-size=1", + "--start-maximized", + "--kiosk-printing", + ] + + if cdp_port: + browser_args.append(f"--remote-debugging-port={cdp_port}") + args = { "locale": settings.BROWSER_LOCALE, "color_scheme": "no-preference", - "args": [ - "--disable-blink-features=AutomationControlled", - "--disk-cache-size=1", - "--start-maximized", - "--kiosk-printing", - ], + "args": browser_args, "ignore_default_args": [ "--enable-automation", ], @@ -287,6 +293,16 @@ class BrowserArtifacts(BaseModel): return await f.read() +def _get_cdp_port(kwargs: dict) -> int | None: + raw_cdp_port = kwargs.get("cdp_port") + if isinstance(raw_cdp_port, (int, str)): + try: + return int(raw_cdp_port) + except (ValueError, TypeError): + return None + return None + + async def _create_headless_chromium( playwright: Playwright, proxy_location: ProxyLocation | None = None, **kwargs: dict ) -> tuple[BrowserContext, BrowserArtifacts, BrowserCleanupFunc]: @@ -296,7 +312,8 @@ async def _create_headless_chromium( user_data_dir=user_data_dir, download_dir=download_dir, ) - browser_args = BrowserContextFactory.build_browser_args(proxy_location=proxy_location) + cdp_port: int | None = _get_cdp_port(kwargs) + browser_args = BrowserContextFactory.build_browser_args(proxy_location=proxy_location, cdp_port=cdp_port) browser_args.update( { "user_data_dir": user_data_dir, @@ -318,7 +335,8 @@ async def _create_headful_chromium( user_data_dir=user_data_dir, download_dir=download_dir, ) - browser_args = BrowserContextFactory.build_browser_args(proxy_location=proxy_location) + cdp_port: int | None = _get_cdp_port(kwargs) + browser_args = BrowserContextFactory.build_browser_args(proxy_location=proxy_location, cdp_port=cdp_port) browser_args.update( { "user_data_dir": user_data_dir, diff --git a/skyvern/webeye/persistent_sessions_manager.py b/skyvern/webeye/persistent_sessions_manager.py new file mode 100644 index 00000000..721be1fd --- /dev/null +++ b/skyvern/webeye/persistent_sessions_manager.py @@ -0,0 +1,191 @@ +from __future__ import annotations + +import asyncio +import socket +from dataclasses import dataclass +from typing import Dict, List, Optional, Tuple + +import structlog +from playwright.async_api import async_playwright + +from skyvern.forge.sdk.db.client import AgentDB +from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession +from skyvern.forge.sdk.schemas.tasks import ProxyLocation +from skyvern.webeye.browser_factory import BrowserContextFactory, BrowserState + +LOG = structlog.get_logger() + + +@dataclass +class BrowserSession: + browser_state: BrowserState + cdp_port: int + cdp_host: str = "localhost" + + +class PersistentSessionsManager: + instance = None + _browser_sessions: Dict[str, BrowserSession] = dict() + database: AgentDB + + def __new__(cls, database: AgentDB) -> PersistentSessionsManager: + if cls.instance is None: + cls.instance = super().__new__(cls) + cls.instance.database = database + return cls.instance + + async def get_active_sessions(self, organization_id: str) -> List[PersistentBrowserSession]: + """Get all active sessions for an organization.""" + return await self.database.get_active_persistent_browser_sessions(organization_id) + + def get_browser_state(self, session_id: str) -> BrowserState | None: + """Get a specific browser session's state by session ID.""" + browser_session = self._browser_sessions.get(session_id) + return browser_session.browser_state if browser_session else None + + async def get_session(self, session_id: str, organization_id: str) -> Optional[PersistentBrowserSession]: + """Get a specific browser session by session ID.""" + return await self.database.get_persistent_browser_session(session_id, organization_id) + + async def create_session( + self, + organization_id: str, + proxy_location: ProxyLocation | None = None, + url: str | None = None, + runnable_id: str | None = None, + runnable_type: str | None = None, + ) -> Tuple[PersistentBrowserSession, BrowserState]: + """Create a new browser session for an organization and return its ID with the browser state.""" + + LOG.info( + "Creating new browser session", + organization_id=organization_id, + ) + + browser_session_db = await self.database.create_persistent_browser_session( + organization_id=organization_id, + runnable_type=runnable_type, + runnable_id=runnable_id, + ) + + cdp_port = None + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + cdp_port = s.getsockname()[1] + + session_id = browser_session_db.persistent_browser_session_id + + pw = await async_playwright().start() + browser_context, browser_artifacts, browser_cleanup = await BrowserContextFactory.create_browser_context( + pw, + proxy_location=proxy_location, + url=url, + organization_id=organization_id, + cdp_port=cdp_port, + ) + + async def on_context_close() -> None: + await self._clean_up_on_session_close(session_id, organization_id) + + browser_context.on("close", lambda: asyncio.create_task(on_context_close())) + + browser_state = BrowserState( + pw=pw, + browser_context=browser_context, + page=None, + browser_artifacts=browser_artifacts, + browser_cleanup=browser_cleanup, + ) + + browser_session = BrowserSession( + browser_state=browser_state, + cdp_port=cdp_port, + ) + LOG.info( + "Created new browser session", + session_id=session_id, + cdp_port=cdp_port, + cdp_host="localhost", + ) + self._browser_sessions[session_id] = browser_session + + if url: + await browser_state.get_or_create_page( + url=url, + proxy_location=proxy_location, + organization_id=organization_id, + ) + + return browser_session_db, browser_state + + async def occupy_browser_session( + self, + session_id: str, + runnable_type: str, + runnable_id: str, + organization_id: str, + ) -> None: + """Occupy a specific browser session.""" + await self.database.occupy_persistent_browser_session( + session_id=session_id, + runnable_type=runnable_type, + runnable_id=runnable_id, + organization_id=organization_id, + ) + + async def get_network_info(self, session_id: str) -> Tuple[Optional[int], Optional[str]]: + """Returns cdp port and ip address of the browser session""" + browser_session = self._browser_sessions.get(session_id) + if browser_session: + return ( + browser_session.cdp_port, + browser_session.cdp_host, + ) + return None, None + + async def release_browser_session(self, session_id: str, organization_id: str) -> None: + """Release a specific browser session.""" + await self.database.release_persistent_browser_session(session_id, organization_id) + + async def _clean_up_on_session_close(self, session_id: str, organization_id: str) -> None: + """Clean up session data when browser session is closed""" + browser_session = self._browser_sessions.get(session_id) + if browser_session: + await self.database.mark_persistent_browser_session_deleted(session_id, organization_id) + self._browser_sessions.pop(session_id, None) + + async def close_session(self, organization_id: str, session_id: str) -> None: + """Close a specific browser session.""" + browser_session = self._browser_sessions.get(session_id) + if browser_session: + LOG.info( + "Closing browser session", + organization_id=organization_id, + session_id=session_id, + ) + self._browser_sessions.pop(session_id, None) + await browser_session.browser_state.close() + else: + LOG.info( + "Browser session not found in memory, marking as deleted in database", + organization_id=organization_id, + session_id=session_id, + ) + + await self.database.mark_persistent_browser_session_deleted(session_id, organization_id) + + async def close_all_sessions(self, organization_id: str) -> None: + """Close all browser sessions for an organization.""" + browser_sessions = await self.database.get_active_persistent_browser_sessions(organization_id) + for browser_session in browser_sessions: + await self.close_session(organization_id, browser_session.persistent_browser_session_id) + + @classmethod + async def close(cls) -> None: + """Close all browser sessions across all organizations.""" + LOG.info("Closing PersistentSessionsManager") + if cls.instance: + active_sessions = await cls.instance.database.get_all_active_persistent_browser_sessions() + for db_session in active_sessions: + await cls.instance.close_session(db_session.organization_id, db_session.persistent_browser_session_id) + LOG.info("PersistentSessionsManager is closed") diff --git a/skyvern/webeye/schemas.py b/skyvern/webeye/schemas.py new file mode 100644 index 00000000..57d0a7bf --- /dev/null +++ b/skyvern/webeye/schemas.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel + +from skyvern.forge.sdk.schemas.persistent_browser_sessions import PersistentBrowserSession + + +class BrowserSessionResponse(BaseModel): + session_id: str + organization_id: str + runnable_type: str | None = None + runnable_id: str | None = None + created_at: datetime + modified_at: datetime + deleted_at: datetime | None = None + + @classmethod + def from_browser_session(cls, browser_session: PersistentBrowserSession) -> BrowserSessionResponse: + return cls( + session_id=browser_session.persistent_browser_session_id, + organization_id=browser_session.organization_id, + runnable_type=browser_session.runnable_type, + runnable_id=browser_session.runnable_id, + created_at=browser_session.created_at, + modified_at=browser_session.modified_at, + deleted_at=browser_session.deleted_at, + )