optimize code

This commit is contained in:
puzhen 2026-02-17 00:41:14 +00:00
parent 2d5f002ccc
commit be695fcb13
7 changed files with 858 additions and 855 deletions

View file

@ -36,6 +36,11 @@ from app.service.task import Agents
from app.utils.file_utils import get_working_directory
def _get_browser_port(browser: dict) -> int:
"""Extract port from a browser config dict, with fallback to env default."""
return int(browser.get("port", env("browser_port", "9222")))
class CdpBrowserPoolManager:
"""Manages CDP browser pool occupation to ensure
parallel tasks use different browsers."""
@ -87,6 +92,13 @@ class CdpBrowserPoolManager:
f"{list(self._occupied_ports.keys())}"
)
def clear_all(self):
"""Force-clear all occupied ports (safety net for task cleanup)."""
with self._lock:
count = len(self._occupied_ports)
self._occupied_ports.clear()
return count
def get_occupied_ports(self) -> list[int]:
"""Get list of currently occupied ports."""
with self._lock:
@ -119,9 +131,7 @@ def browser_agent(options: Chat):
options.cdp_browsers, toolkit_session_id
)
if selected_browser:
selected_port = selected_browser.get(
"port", env("browser_port", "9222")
)
selected_port = _get_browser_port(selected_browser)
selected_is_external = selected_browser.get("isExternal", False)
logger.info(
f"Acquired CDP browser from pool (initial): "
@ -129,9 +139,7 @@ def browser_agent(options: Chat):
f"session_id={toolkit_session_id}"
)
else:
selected_port = options.cdp_browsers[0].get(
"port", env("browser_port", "9222")
)
selected_port = _get_browser_port(options.cdp_browsers[0])
selected_is_external = options.cdp_browsers[0].get(
"isExternal", False
)
@ -259,12 +267,10 @@ def browser_agent(options: Chat):
options.cdp_browsers, session_id
)
if selected:
agent_instance._cdp_port = selected.get(
"port", env("browser_port", "9222")
)
agent_instance._cdp_port = _get_browser_port(selected)
else:
agent_instance._cdp_port = options.cdp_browsers[0].get(
"port", env("browser_port", "9222")
agent_instance._cdp_port = _get_browser_port(
options.cdp_browsers[0]
)
agent_instance._cdp_session_id = session_id
logger.info(

View file

@ -15,6 +15,7 @@
import asyncio
import json
import logging
import threading
from collections.abc import Callable
from threading import Event
from typing import Any
@ -52,6 +53,8 @@ logger = logging.getLogger("agent")
class ListenChatAgent(ChatAgent):
_cdp_clone_lock = threading.Lock() # Protects CDP URL mutation during clone
def __init__(
self,
api_task_id: str,
@ -700,10 +703,12 @@ class ListenChatAgent(ChatAgent):
getattr(self, "_cdp_acquire_callback", None)
)
need_cdp_clone = False
if has_cdp and hasattr(self, "_cdp_options"):
options = self._cdp_options
cdp_browsers = getattr(options, "cdp_browsers", [])
if cdp_browsers and hasattr(self, "_browser_toolkit"):
need_cdp_clone = True
import uuid as _uuid
from app.agent.factory.browser import _cdp_pool_manager
@ -712,32 +717,40 @@ class ListenChatAgent(ChatAgent):
selected = _cdp_pool_manager.acquire_browser(
cdp_browsers, new_cdp_session
)
from app.component.environment import env
from app.agent.factory.browser import _get_browser_port
if selected:
new_cdp_port = selected.get(
"port", env("browser_port", "9222")
)
new_cdp_port = _get_browser_port(selected)
else:
new_cdp_port = cdp_browsers[0].get(
"port", env("browser_port", "9222")
)
new_cdp_port = _get_browser_port(cdp_browsers[0])
# Temporarily override the browser toolkit's CDP URL
toolkit = self._browser_toolkit
if need_cdp_clone:
# Temporarily override the browser toolkit's CDP URL.
# Lock prevents concurrent clones from clobbering each
# other's cdp_url on the shared parent toolkit.
toolkit = self._browser_toolkit
with ListenChatAgent._cdp_clone_lock:
original_cdp_url = (
toolkit.config_loader.get_browser_config().cdp_url
)
toolkit.config_loader.get_browser_config().cdp_url = (
f"http://localhost:{new_cdp_port}"
)
# Clone tools and collect toolkits that need registration
cloned_tools, toolkits_to_register = self._clone_tools()
# Restore original CDP URL in parent toolkit
if new_cdp_port is not None and hasattr(self, "_browser_toolkit"):
self._browser_toolkit.config_loader.get_browser_config().cdp_url = original_cdp_url
try:
cloned_tools, toolkits_to_register = (
self._clone_tools()
)
except Exception:
_cdp_pool_manager.release_browser(
new_cdp_port, new_cdp_session
)
raise
finally:
toolkit.config_loader.get_browser_config().cdp_url = (
original_cdp_url
)
else:
cloned_tools, toolkits_to_register = self._clone_tools()
new_agent = ListenChatAgent(
api_task_id=self.api_task_id,

View file

@ -456,7 +456,7 @@ class HybridBrowserToolkit(BaseHybridBrowserToolkit, AbstractToolkit):
page_stability_timeout: int | None = None,
dom_content_loaded_timeout: int | None = None,
viewport_limit: bool = False,
connect_over_cdp: bool = True,
connect_over_cdp: bool = True, # Deprecated: auto-set to True when cdp_url is provided, kept for compatibility
cdp_url: str | None = "http://localhost:9222",
cdp_keep_current_page: bool = False,
full_visual_mode: bool = False,

View file

@ -92,7 +92,7 @@ class SingleAgentWorker(BaseSingleAgentWorker):
if len(task.content) > 100
else task.content
)
logger.info(
logger.debug(
f"[TASK REQUEST] Requesting agent for task_id={task.id}, content_preview='{task_content_preview}'"
)

View file

@ -968,7 +968,7 @@ class Workforce(BaseWorkforce):
try:
from app.agent.factory.browser import _cdp_pool_manager
_cdp_pool_manager._occupied_ports.clear()
_cdp_pool_manager.clear_all()
except Exception as e:
logger.error(f"[WF-CLEANUP] Error clearing CDP pool: {e}")

View file

@ -100,6 +100,19 @@ let cdp_browser_pool: CdpBrowser[] = [];
let cdp_browser_processes: Map<number, ChildProcessWithoutNullStreams> =
new Map();
/** Remove a non-external browser from the pool by port (used on process error/exit). */
function removeFromPoolByPort(port: number, reason: string): void {
const idx = cdp_browser_pool.findIndex(
(b) => b.port === port && !b.isExternal
);
if (idx !== -1) {
const removed = cdp_browser_pool.splice(idx, 1)[0];
log.warn(
`[CDP POOL] Auto-removed port=${port} (${reason}), id=${removed.id}, pool_size=${cdp_browser_pool.length}`
);
}
}
// Protocol URL queue for handling URLs before window is ready
let protocolUrlQueue: string[] = [];
let isWindowReady = false;
@ -424,56 +437,25 @@ function registerIpcHandlers() {
// Get all browsers in the pool
ipcMain.handle('get-cdp-browsers', () => {
log.info(`[CDP POOL GET] ========================================`);
log.info(
`[CDP POOL GET] Getting CDP browser pool at ${new Date().toISOString()}`
);
log.info(`[CDP POOL GET] Pool size: ${cdp_browser_pool.length}`);
if (cdp_browser_pool.length > 0) {
cdp_browser_pool.forEach((b, idx) => {
log.info(
`[CDP POOL GET] Browser ${idx + 1}: port=${b.port}, isExternal=${b.isExternal}, name="${b.name}", id=${b.id}`
);
});
} else {
log.warn(`[CDP POOL GET] ⚠️ Pool is EMPTY - no browsers configured`);
}
log.info(`[CDP POOL GET] ========================================`);
log.debug(`[CDP POOL] GET pool (size=${cdp_browser_pool.length})`);
return cdp_browser_pool;
});
// Get running browser processes
ipcMain.handle('get-running-browser-ports', () => {
const runningPorts = Array.from(cdp_browser_processes.keys());
log.info(`Getting running browser ports: ${runningPorts.join(', ')}`);
return runningPorts;
return Array.from(cdp_browser_processes.keys());
});
// Add browser to pool
ipcMain.handle(
'add-cdp-browser',
(event, port: number, isExternal: boolean, name?: string) => {
log.info(`[CDP POOL ADD] ========================================`);
log.info(
`[CDP POOL ADD] Request to add browser at ${new Date().toISOString()}`
);
log.info(`[CDP POOL ADD] Port: ${port}`);
log.info(`[CDP POOL ADD] Is External: ${isExternal}`);
log.info(`[CDP POOL ADD] Name: "${name}"`);
log.info(`[CDP POOL ADD] Current pool size: ${cdp_browser_pool.length}`);
// Check if browser with this port already exists
const existing = cdp_browser_pool.find((b) => b.port === port);
if (existing) {
log.warn(
`[CDP POOL ADD] ❌ REJECTED - Browser with port ${port} already exists in pool`
`[CDP POOL] ADD rejected: port ${port} already exists (id=${existing.id})`
);
log.warn(
`[CDP POOL ADD] Existing browser: id=${existing.id}, name="${existing.name}"`
);
log.info(`[CDP POOL ADD] ========================================`);
return {
success: false,
error: 'Browser with this port already exists',
@ -489,13 +471,9 @@ function registerIpcHandlers() {
};
cdp_browser_pool.push(newBrowser);
log.info(`[CDP POOL ADD] ✅ SUCCESS - Browser added to pool`);
log.info(`[CDP POOL ADD] Browser ID: ${newBrowser.id}`);
log.info(`[CDP POOL ADD] New pool size: ${cdp_browser_pool.length}`);
log.info(
`[CDP POOL ADD] All ports in pool: [${cdp_browser_pool.map((b) => b.port).join(', ')}]`
`[CDP POOL] ADD: port=${port}, isExternal=${isExternal}, id=${newBrowser.id}, pool_size=${cdp_browser_pool.length}`
);
log.info(`[CDP POOL ADD] ========================================`);
return { success: true, browser: newBrowser };
}
@ -503,43 +481,30 @@ function registerIpcHandlers() {
// Remove browser from pool
ipcMain.handle('remove-cdp-browser', (event, browserId: string) => {
log.info(`[CDP POOL REMOVE] ========================================`);
log.info(`[CDP POOL REMOVE] Request to remove browser: ${browserId}`);
const index = cdp_browser_pool.findIndex((b) => b.id === browserId);
if (index === -1) {
log.warn(`[CDP POOL REMOVE] ❌ Browser not found: ${browserId}`);
log.info(`[CDP POOL REMOVE] ========================================`);
log.warn(`[CDP POOL] REMOVE: browser not found: ${browserId}`);
return { success: false, error: 'Browser not found' };
}
const removed = cdp_browser_pool.splice(index, 1)[0];
log.info(
`[CDP POOL REMOVE] Removed browser: port=${removed.port}, name="${removed.name}"`
);
// If it's a launched browser, kill the process
if (!removed.isExternal && cdp_browser_processes.has(removed.port)) {
log.info(
`[CDP POOL REMOVE] Killing launched browser process on port ${removed.port}`
);
try {
const process = cdp_browser_processes.get(removed.port);
process?.kill();
cdp_browser_processes.delete(removed.port);
log.info(`[CDP POOL REMOVE] Browser process killed successfully`);
} catch (error) {
log.warn(
`[CDP POOL REMOVE] Failed to kill browser process on port ${removed.port}: ${error}`
`[CDP POOL] Failed to kill browser process on port ${removed.port}: ${error}`
);
}
}
log.info(
`[CDP POOL REMOVE] ✅ SUCCESS - Remaining pool size: ${cdp_browser_pool.length}`
`[CDP POOL] REMOVE: port=${removed.port}, id=${removed.id}, pool_size=${cdp_browser_pool.length}`
);
log.info(`[CDP POOL REMOVE] ========================================`);
return { success: true, browser: removed };
});
@ -595,11 +560,7 @@ function registerIpcHandlers() {
// Launch CDP browser with custom port
ipcMain.handle('launch-cdp-browser', async (event, port: number) => {
log.info(`[CDP LAUNCH] ========================================`);
log.info(
`[CDP LAUNCH] Request to launch browser at ${new Date().toISOString()}`
);
log.info(`[CDP LAUNCH] Target port: ${port}`);
log.info(`[CDP LAUNCH] Launching browser on port ${port}`);
try {
const platform = process.platform;
@ -782,10 +743,7 @@ function registerIpcHandlers() {
// Check if browser on this port is already running
if (cdp_browser_processes.has(port)) {
log.warn(
`[CDP LAUNCH] ❌ Browser process already exists on port ${port}`
);
log.info(`[CDP LAUNCH] ========================================`);
log.warn(`[CDP LAUNCH] Browser process already exists on port ${port}`);
return {
success: false,
error: `Browser already running on port ${port}`,
@ -802,9 +760,7 @@ function registerIpcHandlers() {
'about:blank',
];
log.info(`[CDP LAUNCH] Spawning Chrome process...`);
log.info(`[CDP LAUNCH] Executable: ${chromeExecutable}`);
log.info(`[CDP LAUNCH] Args: ${args.join(' ')}`);
log.info(`[CDP LAUNCH] Spawning: ${chromeExecutable} on port ${port}`);
// Spawn Chrome process
const browserProcess = spawn(chromeExecutable, args, {
@ -817,21 +773,7 @@ function registerIpcHandlers() {
`[CDP LAUNCH] Browser process error on port ${port}: ${error}`
);
cdp_browser_processes.delete(port);
// Also remove from pool if it was added
const browserInPool = cdp_browser_pool.find(
(b) => b.port === port && !b.isExternal
);
if (browserInPool) {
const index = cdp_browser_pool.indexOf(browserInPool);
cdp_browser_pool.splice(index, 1);
log.warn(
`[CDP POOL AUTO-REMOVE] Browser on port ${port} removed from pool due to process error`
);
log.info(
`[CDP POOL AUTO-REMOVE] New pool size: ${cdp_browser_pool.length}`
);
}
removeFromPoolByPort(port, 'process error');
});
browserProcess.on('exit', (code) => {
@ -839,30 +781,7 @@ function registerIpcHandlers() {
`[CDP LAUNCH] Browser process on port ${port} exited with code ${code}`
);
cdp_browser_processes.delete(port);
// Also remove from pool if it was added
const browserInPool = cdp_browser_pool.find(
(b) => b.port === port && !b.isExternal
);
if (browserInPool) {
const index = cdp_browser_pool.indexOf(browserInPool);
cdp_browser_pool.splice(index, 1);
log.warn(
`[CDP POOL AUTO-REMOVE] Browser on port ${port} removed from pool due to process exit`
);
log.info(`[CDP POOL AUTO-REMOVE] Exited with code: ${code}`);
log.info(
`[CDP POOL AUTO-REMOVE] Browser ID: ${browserInPool.id}, Name: "${browserInPool.name}"`
);
log.info(
`[CDP POOL AUTO-REMOVE] New pool size: ${cdp_browser_pool.length}`
);
if (cdp_browser_pool.length > 0) {
log.info(
`[CDP POOL AUTO-REMOVE] Remaining ports: [${cdp_browser_pool.map((b) => b.port).join(', ')}]`
);
}
}
removeFromPoolByPort(port, `exit code ${code}`);
});
// Store the process in the Map
@ -905,10 +824,6 @@ function registerIpcHandlers() {
log.info(
`[CDP LAUNCH] ⚠️ NOTE: Browser launched but NOT added to pool yet`
);
log.info(
`[CDP LAUNCH] ⚠️ The UI must call 'add-cdp-browser' to add it to the pool`
);
log.info(`[CDP LAUNCH] ========================================`);
// This is our own launched browser, not external
use_external_cdp = false;
return {
@ -932,21 +847,22 @@ function registerIpcHandlers() {
}
// If we get here, browser didn't respond within max wait time
// Kill the orphaned process to avoid resource leak
const proc = cdp_browser_processes.get(port);
if (proc) {
proc.kill();
cdp_browser_processes.delete(port);
}
const totalTime = Date.now() - startTime;
log.warn(
`[CDP LAUNCH] ❌ Verification failed after ${totalTime}ms (${attempt} attempts)`
`[CDP LAUNCH] Verification failed after ${totalTime}ms (${attempt} attempts), last error: ${lastError?.code || lastError?.message || 'Unknown'}`
);
log.warn(
`[CDP LAUNCH] Last error: ${lastError?.code || lastError?.message || 'Unknown'}`
);
log.info(`[CDP LAUNCH] ========================================`);
return {
success: false,
error: `Browser launched but not responding on CDP port after ${totalTime}ms`,
};
} catch (error: any) {
log.error(`[CDP LAUNCH] ❌ FAILED to launch browser: ${error}`);
log.info(`[CDP LAUNCH] ========================================`);
log.error(`[CDP LAUNCH] Failed to launch browser: ${error}`);
return {
success: false,
error: error.message,

File diff suppressed because it is too large Load diff