diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index 59f25c13..a92a166e 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -30,21 +30,31 @@ jobs: run: uv sync --group dev - name: Run pre-commit - run: >- - uv run pre-commit run --files - app/agent/README.md - app/agent/__init__.py - app/agent/factory/__init__.py - app/agent/factory/social_media.py - app/service/chat_service.py - app/service/task.py - app/utils/toolkit/google_calendar_toolkit.py - app/utils/toolkit/google_gmail_mcp_toolkit.py - app/utils/toolkit/linkedin_toolkit.py - app/utils/toolkit/reddit_toolkit.py - app/utils/toolkit/slack_toolkit.py - app/utils/toolkit/twitter_toolkit.py - app/utils/toolkit/whatsapp_toolkit.py - tests/app/agent/factory/test_social_media.py + run: | + uv run pre-commit run --files \ + $(find \ + app/agent \ + app/controller \ + app/exception \ + app/middleware \ + app/model \ + app/service \ + tests/app \ + -type f ! -path '*__pycache__*') \ + app/__init__.py \ + app/router.py \ + app/component/__init__.py \ + app/component/pydantic/__init__.py \ + app/utils/listen/__init__.py \ + app/utils/server/__init__.py \ + app/utils/toolkit/__init__.py \ + app/utils/toolkit/google_calendar_toolkit.py \ + app/utils/toolkit/google_gmail_mcp_toolkit.py \ + app/utils/toolkit/linkedin_toolkit.py \ + app/utils/toolkit/reddit_toolkit.py \ + app/utils/toolkit/slack_toolkit.py \ + app/utils/toolkit/twitter_toolkit.py \ + app/utils/toolkit/whatsapp_toolkit.py \ + tests/conftest.py env: SKIP: no-commit-to-branch diff --git a/backend/app/__init__.py b/backend/app/__init__.py index 70d933d2..4345ae6a 100644 --- a/backend/app/__init__.py +++ b/backend/app/__init__.py @@ -20,5 +20,9 @@ api = FastAPI(title="Eigent Multi-Agent System API") # Add CORS middleware api.add_middleware( - CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"] + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"] ) diff --git a/backend/app/agent/agent_model.py b/backend/app/agent/agent_model.py index 08c1f3d1..2cc9e1a0 100644 --- a/backend/app/agent/agent_model.py +++ b/backend/app/agent/agent_model.py @@ -19,21 +19,23 @@ import uuid from threading import Lock from typing import Any, Callable -from app.agent.listen_chat_agent import ListenChatAgent, logger -from app.model.chat import AgentModelConfig, Chat -from app.service.task import ActionCreateAgentData, Agents, get_task_lock from camel.messages import BaseMessage from camel.models import ModelFactory from camel.toolkits import FunctionTool, RegisteredAgentToolkit from camel.types import ModelPlatformType +from app.agent.listen_chat_agent import ListenChatAgent, logger +from app.model.chat import AgentModelConfig, Chat +from app.service.task import ActionCreateAgentData, Agents, get_task_lock + # Thread-safe reference to main event loop using contextvars # This ensures each request has its own event loop reference, # avoiding race conditions _main_event_loop_var: contextvars.ContextVar[asyncio.AbstractEventLoop | None] = contextvars.ContextVar( "_main_event_loop", - default=None) + default=None + ) # Global fallback for main event loop reference # Used when contextvars don't propagate to worker threads @@ -77,10 +79,12 @@ def _schedule_async_task(coro): asyncio.run_coroutine_threadsafe(coro, main_loop) else: # This should not happen in normal operation - log error and skip - logging.error("No event loop available for async task " - "scheduling, task skipped. Ensure " - "set_main_event_loop() is called " - "before parallel agent creation.") + logging.error( + "No event loop available for async task " + "scheduling, task skipped. Ensure " + "set_main_event_loop() is called " + "before parallel agent creation." + ) def agent_model( @@ -96,8 +100,10 @@ def agent_model( ): task_lock = get_task_lock(options.project_id) agent_id = str(uuid.uuid4()) - logger.info(f"Creating agent: {agent_name} with id: {agent_id} " - f"for project: {options.project_id}") + logger.info( + f"Creating agent: {agent_name} with id: {agent_id} " + f"for project: {options.project_id}" + ) # Use thread-safe scheduling to support parallel agent creation _schedule_async_task( task_lock.put_queue( @@ -106,7 +112,10 @@ def agent_model( "agent_name": agent_name, "agent_id": agent_id, "tools": tool_names or [], - }))) + } + ) + ) + ) # Determine model configuration - use custom config if provided, # otherwise use task defaults @@ -117,11 +126,14 @@ def agent_model( for attr in config_attrs: effective_config[attr] = getattr(custom_model_config, attr, None) or getattr(options, attr) - extra_params = (custom_model_config.extra_params - or options.extra_params or {}) - logger.info(f"Agent {agent_name} using custom model config: " - f"platform={effective_config['model_platform']}, " - f"type={effective_config['model_type']}") + extra_params = ( + custom_model_config.extra_params or options.extra_params or {} + ) + logger.info( + f"Agent {agent_name} using custom model config: " + f"platform={effective_config['model_platform']}, " + f"type={effective_config['model_type']}" + ) else: for attr in config_attrs: effective_config[attr] = getattr(options, attr) @@ -163,13 +175,14 @@ def agent_model( if agent_name == Agents.browser_agent: try: model_platform_enum = ModelPlatformType( - effective_config["model_platform"].lower()) + effective_config["model_platform"].lower() + ) if model_platform_enum in { - ModelPlatformType.OPENAI, - ModelPlatformType.AZURE, - ModelPlatformType.OPENAI_COMPATIBLE_MODEL, - ModelPlatformType.LITELLM, - ModelPlatformType.OPENROUTER, + ModelPlatformType.OPENAI, + ModelPlatformType.AZURE, + ModelPlatformType.OPENAI_COMPATIBLE_MODEL, + ModelPlatformType.LITELLM, + ModelPlatformType.OPENROUTER, }: model_config["parallel_tool_calls"] = False except (ValueError, AttributeError): diff --git a/backend/app/agent/factory/browser.py b/backend/app/agent/factory/browser.py index 07091169..eed38e11 100644 --- a/backend/app/agent/factory/browser.py +++ b/backend/app/agent/factory/browser.py @@ -15,6 +15,9 @@ import platform import uuid +from camel.messages import BaseMessage +from camel.toolkits import ToolkitMessageIntegration + from app.agent.agent_model import agent_model from app.agent.listen_chat_agent import logger from app.agent.prompt import BROWSER_SYS_PROMPT @@ -25,21 +28,23 @@ from app.service.task import Agents from app.utils.file_utils import get_working_directory from app.utils.toolkit.human_toolkit import HumanToolkit from app.utils.toolkit.hybrid_browser_toolkit import HybridBrowserToolkit + # TODO: Remove NoteTakingToolkit and use TerminalToolkit instead from app.utils.toolkit.note_taking_toolkit import NoteTakingToolkit from app.utils.toolkit.search_toolkit import SearchToolkit from app.utils.toolkit.terminal_toolkit import TerminalToolkit -from camel.messages import BaseMessage -from camel.toolkits import ToolkitMessageIntegration def browser_agent(options: Chat): working_directory = get_working_directory(options) - logger.info(f"Creating browser agent for project: {options.project_id} " - f"in directory: {working_directory}") + logger.info( + f"Creating browser agent for project: {options.project_id} " + f"in directory: {working_directory}" + ) message_integration = ToolkitMessageIntegration( - message_handler=HumanToolkit( - options.project_id, Agents.browser_agent).send_message_to_user) + message_handler=HumanToolkit(options.project_id, Agents.browser_agent + ).send_message_to_user + ) web_toolkit_custom = HybridBrowserToolkit( options.project_id, @@ -70,7 +75,8 @@ def browser_agent(options: Chat): # Save reference before registering for toolkits_to_register_agent web_toolkit_for_agent_registration = web_toolkit_custom web_toolkit_custom = message_integration.register_toolkits( - web_toolkit_custom) + web_toolkit_custom + ) terminal_toolkit = TerminalToolkit( options.project_id, @@ -80,11 +86,14 @@ def browser_agent(options: Chat): clone_current_env=True, ) terminal_toolkit = message_integration.register_functions( - [terminal_toolkit.shell_exec]) + [terminal_toolkit.shell_exec] + ) - note_toolkit = NoteTakingToolkit(options.project_id, - Agents.browser_agent, - working_directory=working_directory) + note_toolkit = NoteTakingToolkit( + options.project_id, + Agents.browser_agent, + working_directory=working_directory + ) note_toolkit = message_integration.register_toolkits(note_toolkit) search_tools = SearchToolkit.get_can_use_tools(options.project_id) @@ -94,8 +103,8 @@ def browser_agent(options: Chat): search_tools = [] tools = [ - *HumanToolkit.get_can_use_tools(options.project_id, - Agents.browser_agent), + *HumanToolkit. + get_can_use_tools(options.project_id, Agents.browser_agent), *web_toolkit_custom.get_tools(), *terminal_toolkit, *note_toolkit.get_tools(), diff --git a/backend/app/agent/factory/developer.py b/backend/app/agent/factory/developer.py index dc7bb5d1..94a6c62f 100644 --- a/backend/app/agent/factory/developer.py +++ b/backend/app/agent/factory/developer.py @@ -14,6 +14,9 @@ import platform +from camel.messages import BaseMessage +from camel.toolkits import ToolkitMessageIntegration + from app.agent.agent_model import agent_model from app.agent.listen_chat_agent import logger from app.agent.prompt import DEVELOPER_SYS_PROMPT @@ -22,22 +25,25 @@ from app.model.chat import Chat from app.service.task import Agents from app.utils.file_utils import get_working_directory from app.utils.toolkit.human_toolkit import HumanToolkit + # TODO: Remove NoteTakingToolkit and use TerminalToolkit instead from app.utils.toolkit.note_taking_toolkit import NoteTakingToolkit from app.utils.toolkit.screenshot_toolkit import ScreenshotToolkit from app.utils.toolkit.terminal_toolkit import TerminalToolkit from app.utils.toolkit.web_deploy_toolkit import WebDeployToolkit -from camel.messages import BaseMessage -from camel.toolkits import ToolkitMessageIntegration async def developer_agent(options: Chat): working_directory = get_working_directory(options) - logger.info(f"Creating developer agent for project: {options.project_id} " - f"in directory: {working_directory}") + logger.info( + f"Creating developer agent for project: {options.project_id} " + f"in directory: {working_directory}" + ) message_integration = ToolkitMessageIntegration( message_handler=HumanToolkit( - options.project_id, Agents.developer_agent).send_message_to_user) + options.project_id, Agents.developer_agent + ).send_message_to_user + ) note_toolkit = NoteTakingToolkit( api_task_id=options.project_id, agent_name=Agents.developer_agent, @@ -46,11 +52,14 @@ async def developer_agent(options: Chat): note_toolkit = message_integration.register_toolkits(note_toolkit) web_deploy_toolkit = WebDeployToolkit(api_task_id=options.project_id) web_deploy_toolkit = message_integration.register_toolkits( - web_deploy_toolkit) - screenshot_toolkit = ScreenshotToolkit(options.project_id, - working_directory=working_directory) + web_deploy_toolkit + ) + screenshot_toolkit = ScreenshotToolkit( + options.project_id, working_directory=working_directory + ) screenshot_toolkit = message_integration.register_toolkits( - screenshot_toolkit) + screenshot_toolkit + ) terminal_toolkit = TerminalToolkit( options.project_id, @@ -62,8 +71,8 @@ async def developer_agent(options: Chat): terminal_toolkit = message_integration.register_toolkits(terminal_toolkit) tools = [ - *HumanToolkit.get_can_use_tools(options.project_id, - Agents.developer_agent), + *HumanToolkit. + get_can_use_tools(options.project_id, Agents.developer_agent), *note_toolkit.get_tools(), *web_deploy_toolkit.get_tools(), *terminal_toolkit.get_tools(), diff --git a/backend/app/agent/factory/document.py b/backend/app/agent/factory/document.py index a8677310..b9b01c71 100644 --- a/backend/app/agent/factory/document.py +++ b/backend/app/agent/factory/document.py @@ -13,6 +13,9 @@ # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= import platform +from camel.messages import BaseMessage +from camel.toolkits import ToolkitMessageIntegration + from app.agent.agent_model import agent_model from app.agent.listen_chat_agent import logger from app.agent.prompt import DOCUMENT_SYS_PROMPT @@ -25,36 +28,44 @@ from app.utils.toolkit.file_write_toolkit import FileToolkit from app.utils.toolkit.google_drive_mcp_toolkit import GoogleDriveMCPToolkit from app.utils.toolkit.human_toolkit import HumanToolkit from app.utils.toolkit.markitdown_toolkit import MarkItDownToolkit + # TODO: Remove NoteTakingToolkit and use TerminalToolkit instead from app.utils.toolkit.note_taking_toolkit import NoteTakingToolkit from app.utils.toolkit.pptx_toolkit import PPTXToolkit from app.utils.toolkit.terminal_toolkit import TerminalToolkit -from camel.messages import BaseMessage -from camel.toolkits import ToolkitMessageIntegration async def document_agent(options: Chat): working_directory = get_working_directory(options) - logger.info(f"Creating document agent for project: {options.project_id} " - f"in directory: {working_directory}") + logger.info( + f"Creating document agent for project: {options.project_id} " + f"in directory: {working_directory}" + ) message_integration = ToolkitMessageIntegration( - message_handler=HumanToolkit(options.project_id, - Agents.task_agent).send_message_to_user) - file_write_toolkit = FileToolkit(options.project_id, - working_directory=working_directory) - pptx_toolkit = PPTXToolkit(options.project_id, - working_directory=working_directory) + message_handler=HumanToolkit(options.project_id, Agents.task_agent + ).send_message_to_user + ) + file_write_toolkit = FileToolkit( + options.project_id, working_directory=working_directory + ) + pptx_toolkit = PPTXToolkit( + options.project_id, working_directory=working_directory + ) pptx_toolkit = message_integration.register_toolkits(pptx_toolkit) mark_it_down_toolkit = MarkItDownToolkit(options.project_id) mark_it_down_toolkit = message_integration.register_toolkits( - mark_it_down_toolkit) - excel_toolkit = ExcelToolkit(options.project_id, - working_directory=working_directory) + mark_it_down_toolkit + ) + excel_toolkit = ExcelToolkit( + options.project_id, working_directory=working_directory + ) excel_toolkit = message_integration.register_toolkits(excel_toolkit) - note_toolkit = NoteTakingToolkit(options.project_id, - Agents.document_agent, - working_directory=working_directory) + note_toolkit = NoteTakingToolkit( + options.project_id, + Agents.document_agent, + working_directory=working_directory + ) note_toolkit = message_integration.register_toolkits(note_toolkit) terminal_toolkit = TerminalToolkit( @@ -67,13 +78,14 @@ async def document_agent(options: Chat): terminal_toolkit = message_integration.register_toolkits(terminal_toolkit) google_drive_tools = await GoogleDriveMCPToolkit.get_can_use_tools( - options.project_id, options.get_bun_env()) + options.project_id, options.get_bun_env() + ) tools = [ *file_write_toolkit.get_tools(), *pptx_toolkit.get_tools(), - *HumanToolkit.get_can_use_tools(options.project_id, - Agents.document_agent), + *HumanToolkit. + get_can_use_tools(options.project_id, Agents.document_agent), *mark_it_down_toolkit.get_tools(), *excel_toolkit.get_tools(), *note_toolkit.get_tools(), diff --git a/backend/app/agent/factory/mcp.py b/backend/app/agent/factory/mcp.py index 1800bfce..c8f3c40e 100644 --- a/backend/app/agent/factory/mcp.py +++ b/backend/app/agent/factory/mcp.py @@ -20,7 +20,7 @@ from app.agent.listen_chat_agent import ListenChatAgent, logger from app.agent.prompt import MCP_SYS_PROMPT from app.agent.tools import get_mcp_tools from app.model.chat import Chat -from app.service.task import Agents, ActionCreateAgentData, get_task_lock +from app.service.task import ActionCreateAgentData, Agents, get_task_lock from app.utils.toolkit.mcp_search_toolkit import McpSearchToolkit @@ -30,19 +30,22 @@ async def mcp_agent(options: Chat): f"with {len(options.installed_mcp['mcpServers'])} MCP servers" ) tools = [ - # *HumanToolkit.get_can_use_tools(options.project_id, Agents.mcp_agent), *McpSearchToolkit(options.project_id).get_tools(), ] if len(options.installed_mcp["mcpServers"]) > 0: try: mcp_tools = await get_mcp_tools(options.installed_mcp) logger.info( - f"Retrieved {len(mcp_tools)} MCP tools for task {options.project_id}" + f"Retrieved {len(mcp_tools)} MCP tools " + f"for task {options.project_id}" ) if mcp_tools: - tool_names = [(tool.get_function_name() if hasattr( - tool, "get_function_name") else str(tool)) - for tool in mcp_tools] + tool_names = [ + ( + tool.get_function_name() + if hasattr(tool, "get_function_name") else str(tool) + ) for tool in mcp_tools + ] logger.debug(f"MCP tools: {tool_names}") tools = [*tools, *mcp_tools] except Exception as e: @@ -51,7 +54,8 @@ async def mcp_agent(options: Chat): task_lock = get_task_lock(options.project_id) agent_id = str(uuid.uuid4()) logger.info( - f"Creating MCP agent: {Agents.mcp_agent} with id: {agent_id} for task: {options.project_id}" + f"Creating MCP agent: {Agents.mcp_agent} with id: " + f"{agent_id} for task: {options.project_id}" ) asyncio.create_task( task_lock.put_queue( @@ -65,7 +69,10 @@ async def mcp_agent(options: Chat): key for key in options.installed_mcp["mcpServers"].keys() ], - }))) + } + ) + ) + ) return ListenChatAgent( options.project_id, Agents.mcp_agent, @@ -75,9 +82,11 @@ async def mcp_agent(options: Chat): model_type=options.model_type, api_key=options.api_key, url=options.api_url, - model_config_dict=({ - "user": str(options.project_id), - } if options.is_cloud() else None), + model_config_dict=( + { + "user": str(options.project_id), + } if options.is_cloud() else None + ), timeout=600, # 10 minutes **{ k: v diff --git a/backend/app/agent/factory/multi_modal.py b/backend/app/agent/factory/multi_modal.py index 79860777..85bb973a 100644 --- a/backend/app/agent/factory/multi_modal.py +++ b/backend/app/agent/factory/multi_modal.py @@ -13,6 +13,11 @@ # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= import platform +from camel.messages import BaseMessage +from camel.models import OpenAIAudioModels +from camel.toolkits import ToolkitMessageIntegration +from camel.types import ModelPlatformType + from app.agent.agent_model import agent_model from app.agent.listen_chat_agent import logger from app.agent.prompt import MULTI_MODAL_SYS_PROMPT @@ -23,34 +28,37 @@ from app.utils.file_utils import get_working_directory from app.utils.toolkit.audio_analysis_toolkit import AudioAnalysisToolkit from app.utils.toolkit.human_toolkit import HumanToolkit from app.utils.toolkit.image_analysis_toolkit import ImageAnalysisToolkit + # TODO: Remove NoteTakingToolkit and use TerminalToolkit instead from app.utils.toolkit.note_taking_toolkit import NoteTakingToolkit from app.utils.toolkit.openai_image_toolkit import OpenAIImageToolkit from app.utils.toolkit.search_toolkit import SearchToolkit from app.utils.toolkit.terminal_toolkit import TerminalToolkit from app.utils.toolkit.video_download_toolkit import VideoDownloaderToolkit -from camel.messages import BaseMessage -from camel.models import OpenAIAudioModels -from camel.toolkits import ToolkitMessageIntegration -from camel.types import ModelPlatformType def multi_modal_agent(options: Chat): working_directory = get_working_directory(options) logger.info( f"Creating multi-modal agent for project: {options.project_id} " - f"in directory: {working_directory}") + f"in directory: {working_directory}" + ) message_integration = ToolkitMessageIntegration( message_handler=HumanToolkit( - options.project_id, Agents.multi_modal_agent).send_message_to_user) + options.project_id, Agents.multi_modal_agent + ).send_message_to_user + ) video_download_toolkit = VideoDownloaderToolkit( - options.project_id, working_directory=working_directory) + options.project_id, working_directory=working_directory + ) video_download_toolkit = message_integration.register_toolkits( - video_download_toolkit) + video_download_toolkit + ) image_analysis_toolkit = ImageAnalysisToolkit(options.project_id) image_analysis_toolkit = message_integration.register_toolkits( - image_analysis_toolkit) + image_analysis_toolkit + ) terminal_toolkit = TerminalToolkit( options.project_id, @@ -70,8 +78,8 @@ def multi_modal_agent(options: Chat): tools = [ *video_download_toolkit.get_tools(), *image_analysis_toolkit.get_tools(), - *HumanToolkit.get_can_use_tools(options.project_id, - Agents.multi_modal_agent), + *HumanToolkit. + get_can_use_tools(options.project_id, Agents.multi_modal_agent), *terminal_toolkit.get_tools(), *note_toolkit.get_tools(), ] @@ -88,7 +96,8 @@ def multi_modal_agent(options: Chat): url=options.api_url, ) open_ai_image_toolkit = message_integration.register_toolkits( - open_ai_image_toolkit) + open_ai_image_toolkit + ) tools = [ *tools, *open_ai_image_toolkit.get_tools(), @@ -109,7 +118,8 @@ def multi_modal_agent(options: Chat): ), ) audio_analysis_toolkit = message_integration.register_toolkits( - audio_analysis_toolkit) + audio_analysis_toolkit + ) tools.extend(audio_analysis_toolkit.get_tools()) system_message = MULTI_MODAL_SYS_PROMPT.format( diff --git a/backend/app/agent/listen_chat_agent.py b/backend/app/agent/listen_chat_agent.py index 9f1a8f9d..c4a806e6 100644 --- a/backend/app/agent/listen_chat_agent.py +++ b/backend/app/agent/listen_chat_agent.py @@ -18,15 +18,12 @@ import logging from threading import Event from typing import Any, Callable, Dict, List, Tuple -from app.service.task import (Action, ActionActivateAgentData, - ActionActivateToolkitData, ActionBudgetNotEnough, - ActionDeactivateAgentData, - ActionDeactivateToolkitData, get_task_lock, - set_process_task) from camel.agents import ChatAgent from camel.agents._types import ToolCallRequest -from camel.agents.chat_agent import (AsyncStreamingChatAgentResponse, - StreamingChatAgentResponse) +from camel.agents.chat_agent import ( + AsyncStreamingChatAgentResponse, + StreamingChatAgentResponse, +) from camel.memories import AgentMemory from camel.messages import BaseMessage from camel.models import BaseModelBackend, ModelManager, ModelProcessingError @@ -37,6 +34,17 @@ from camel.types import ModelPlatformType, ModelType from camel.types.agents import ToolCallingRecord from pydantic import BaseModel +from app.service.task import ( + Action, + ActionActivateAgentData, + ActionActivateToolkitData, + ActionBudgetNotEnough, + ActionDeactivateAgentData, + ActionDeactivateToolkitData, + get_task_lock, + set_process_task, +) + # Logger for agent tracking logger = logging.getLogger("agent") @@ -48,26 +56,30 @@ class ListenChatAgent(ChatAgent): api_task_id: str, agent_name: str, system_message: BaseMessage | str | None = None, - model: (BaseModelBackend - | ModelManager - | Tuple[str, str] - | str - | ModelType - | Tuple[ModelPlatformType, ModelType] - | List[BaseModelBackend] - | List[str] - | List[ModelType] - | List[Tuple[str, str]] - | List[Tuple[ModelPlatformType, ModelType]] - | None) = None, + model: ( + BaseModelBackend + | ModelManager + | Tuple[str, str] + | str + | ModelType + | Tuple[ModelPlatformType, ModelType] + | List[BaseModelBackend] + | List[str] + | List[ModelType] + | List[Tuple[str, str]] + | List[Tuple[ModelPlatformType, ModelType]] + | None + ) = None, memory: AgentMemory | None = None, message_window_size: int | None = None, token_limit: int | None = None, output_language: str | None = None, tools: List[FunctionTool | Callable[..., Any]] | None = None, toolkits_to_register_agent: List[RegisteredAgentToolkit] | None = None, - external_tools: (List[FunctionTool | Callable[..., Any] - | Dict[str, Any]] | None) = None, + external_tools: ( + List[FunctionTool | Callable[..., Any] + | Dict[str, Any]] | None + ) = None, response_terminators: List[ResponseTerminator] | None = None, scheduling_strategy: str = "round_robin", max_iteration: int | None = None, @@ -117,23 +129,33 @@ class ListenChatAgent(ChatAgent): task_lock = get_task_lock(self.api_task_id) asyncio.create_task( task_lock.put_queue( - ActionActivateAgentData(data={ - "agent_name": - self.agent_name, - "process_task_id": - self.process_task_id, - "agent_id": - self.agent_id, - "message": (input_message.content if isinstance( - input_message, BaseMessage) else input_message), - }, ))) + ActionActivateAgentData( + data={ + "agent_name": + self.agent_name, + "process_task_id": + self.process_task_id, + "agent_id": + self.agent_id, + "message": ( + input_message.content + if isinstance(input_message, BaseMessage) else + input_message + ), + }, + ) + ) + ) error_info = None message = None res = None - msg = (input_message.content - if isinstance(input_message, BaseMessage) else input_message) + msg = ( + input_message.content + if isinstance(input_message, BaseMessage) else input_message + ) logger.info( - f"Agent {self.agent_name} starting step with message: {msg}") + f"Agent {self.agent_name} starting step with message: {msg}" + ) try: res = super().step(input_message, response_format) except ModelProcessingError as e: @@ -143,18 +165,21 @@ class ListenChatAgent(ChatAgent): message = "Budget has been exceeded" logger.warning(f"Agent {self.agent_name} budget exceeded") asyncio.create_task( - task_lock.put_queue(ActionBudgetNotEnough())) + task_lock.put_queue(ActionBudgetNotEnough()) + ) else: message = str(e) logger.error( - f"Agent {self.agent_name} model processing error: {e}") + f"Agent {self.agent_name} model processing error: {e}" + ) total_tokens = 0 except Exception as e: res = None error_info = e logger.error( f"Agent {self.agent_name} unexpected error in step: {e}", - exc_info=True) + exc_info=True + ) message = f"Error processing message: {e!s}" total_tokens = 0 @@ -177,47 +202,55 @@ class ListenChatAgent(ChatAgent): total_tokens = 0 if last_response: usage_info = last_response.info.get( - "usage") or last_response.info.get( - "token_usage") or {} + "usage" + ) or last_response.info.get("token_usage") or {} if usage_info: total_tokens = usage_info.get( - "total_tokens", 0) + "total_tokens", 0 + ) asyncio.create_task( task_lock.put_queue( - ActionDeactivateAgentData(data={ - "agent_name": - self.agent_name, - "process_task_id": - self.process_task_id, - "agent_id": - self.agent_id, - "message": - accumulated_content, - "tokens": - total_tokens, - }, ))) + ActionDeactivateAgentData( + data={ + "agent_name": self.agent_name, + "process_task_id": + self.process_task_id, + "agent_id": self.agent_id, + "message": accumulated_content, + "tokens": total_tokens, + }, + ) + ) + ) return StreamingChatAgentResponse(_stream_with_deactivate()) message = res.msg.content if res.msg else "" - usage_info = res.info.get("usage") or res.info.get( - "token_usage") or {} - total_tokens = usage_info.get("total_tokens", - 0) if usage_info else 0 - logger.info(f"Agent {self.agent_name} completed step, " - f"tokens used: {total_tokens}") + usage_info = res.info.get("usage") or res.info.get("token_usage" + ) or {} + total_tokens = usage_info.get( + "total_tokens", 0 + ) if usage_info else 0 + logger.info( + f"Agent {self.agent_name} completed step, " + f"tokens used: {total_tokens}" + ) assert message is not None asyncio.create_task( task_lock.put_queue( - ActionDeactivateAgentData(data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "agent_id": self.agent_id, - "message": message, - "tokens": total_tokens, - }, ))) + ActionDeactivateAgentData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "message": message, + "tokens": total_tokens, + }, + ) + ) + ) if error_info is not None: raise error_info @@ -240,18 +273,26 @@ class ListenChatAgent(ChatAgent): self.process_task_id, "agent_id": self.agent_id, - "message": (input_message.content if isinstance( - input_message, BaseMessage) else input_message), + "message": ( + input_message.content + if isinstance(input_message, BaseMessage) else + input_message + ), }, - )) + ) + ) error_info = None message = None res = None - msg = (input_message.content - if isinstance(input_message, BaseMessage) else input_message) - logger.debug(f"Agent {self.agent_name} starting async step " - f"with message: {msg}") + msg = ( + input_message.content + if isinstance(input_message, BaseMessage) else input_message + ) + logger.debug( + f"Agent {self.agent_name} starting async step " + f"with message: {msg}" + ) try: res = await super().astep(input_message, response_format) @@ -264,46 +305,56 @@ class ListenChatAgent(ChatAgent): message = "Budget has been exceeded" logger.warning(f"Agent {self.agent_name} budget exceeded") asyncio.create_task( - task_lock.put_queue(ActionBudgetNotEnough())) + task_lock.put_queue(ActionBudgetNotEnough()) + ) else: message = str(e) logger.error( - f"Agent {self.agent_name} model processing error: {e}") + f"Agent {self.agent_name} model processing error: {e}" + ) total_tokens = 0 except Exception as e: res = None error_info = e logger.error( f"Agent {self.agent_name} unexpected error in async step: {e}", - exc_info=True) + exc_info=True + ) message = f"Error processing message: {e!s}" total_tokens = 0 if res is not None: message = res.msg.content if res.msg else "" total_tokens = res.info["usage"]["total_tokens"] - logger.info(f"Agent {self.agent_name} completed step, " - f"tokens used: {total_tokens}") + logger.info( + f"Agent {self.agent_name} completed step, " + f"tokens used: {total_tokens}" + ) assert message is not None asyncio.create_task( task_lock.put_queue( - ActionDeactivateAgentData(data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "agent_id": self.agent_id, - "message": message, - "tokens": total_tokens, - }, ))) + ActionDeactivateAgentData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "agent_id": self.agent_id, + "message": message, + "tokens": total_tokens, + }, + ) + ) + ) if error_info is not None: raise error_info assert res is not None return res - def _execute_tool(self, - tool_call_request: ToolCallRequest) -> ToolCallingRecord: + def _execute_tool( + self, tool_call_request: ToolCallRequest + ) -> ToolCallingRecord: func_name = tool_call_request.tool_name tool: FunctionTool = self._internal_tools[func_name] # Route async functions to async execution @@ -327,28 +378,31 @@ class ListenChatAgent(ChatAgent): task_lock = get_task_lock(self.api_task_id) toolkit_name = getattr(tool, "_toolkit_name") if hasattr( - tool, "_toolkit_name") else "mcp_toolkit" - logger.debug(f"Agent {self.agent_name} executing tool: " - f"{func_name} from toolkit: {toolkit_name} " - f"with args: {json.dumps(args, ensure_ascii=False)}") + tool, "_toolkit_name" + ) else "mcp_toolkit" + logger.debug( + f"Agent {self.agent_name} executing tool: " + f"{func_name} from toolkit: {toolkit_name} " + f"with args: {json.dumps(args, ensure_ascii=False)}" + ) # Only send activate event if tool is # NOT wrapped by @listen_toolkit if not has_listen_decorator: asyncio.create_task( task_lock.put_queue( - ActionActivateToolkitData(data={ - "agent_name": - self.agent_name, - "process_task_id": - self.process_task_id, - "toolkit_name": - toolkit_name, - "method_name": - func_name, - "message": - json.dumps(args, ensure_ascii=False), - }, ))) + ActionActivateToolkitData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "toolkit_name": toolkit_name, + "method_name": func_name, + "message": + json.dumps(args, ensure_ascii=False), + }, + ) + ) + ) # Set process_task context for all tool executions with set_process_task(self.process_task_id): raw_result = tool(**args) @@ -357,7 +411,8 @@ class ListenChatAgent(ChatAgent): self._secure_result_store[tool_call_id] = raw_result result = ( "[The tool has been executed successfully, but the output" - " from the tool is masked. You can move forward]") + " from the tool is masked. You can move forward]" + ) mask_flag = True else: result = raw_result @@ -369,30 +424,39 @@ class ListenChatAgent(ChatAgent): result_str = repr(result) MAX_RESULT_LENGTH = 500 if len(result_str) > MAX_RESULT_LENGTH: - result_msg = (result_str[:MAX_RESULT_LENGTH] + - (f"... (truncated, total length: " - f"{len(result_str)} chars)")) + result_msg = ( + result_str[:MAX_RESULT_LENGTH] + ( + f"... (truncated, total length: " + f"{len(result_str)} chars)" + ) + ) else: result_msg = result_str - # Only send deactivate event if tool is NOT wrapped by @listen_toolkit + # Only send deactivate event if tool is + # NOT wrapped by @listen_toolkit if not has_listen_decorator: asyncio.create_task( task_lock.put_queue( - ActionDeactivateToolkitData(data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "toolkit_name": toolkit_name, - "method_name": func_name, - "message": result_msg, - }, ))) + ActionDeactivateToolkitData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "toolkit_name": toolkit_name, + "method_name": func_name, + "message": result_msg, + }, + ) + ) + ) except Exception as e: # Capture the error message to prevent framework crash error_msg = f"Error executing tool '{func_name}': {e!s}" result = f"Tool execution failed: {error_msg}" mask_flag = False - logger.error(f"Tool execution failed for {func_name}: {e}", - exc_info=True) + logger.error( + f"Tool execution failed for {func_name}: {e}", exc_info=True + ) return self._record_tool_calling( func_name, @@ -404,7 +468,8 @@ class ListenChatAgent(ChatAgent): ) async def _aexecute_tool( - self, tool_call_request: ToolCallRequest) -> ToolCallingRecord: + self, tool_call_request: ToolCallRequest + ) -> ToolCallingRecord: func_name = tool_call_request.tool_name tool: FunctionTool = self._internal_tools[func_name] @@ -420,21 +485,24 @@ class ListenChatAgent(ChatAgent): if hasattr(tool, "_toolkit_name"): toolkit_name = tool._toolkit_name - # Method 2: For MCP tools, check if func has __self__ (the toolkit instance) - if not toolkit_name and hasattr(tool, "func") and hasattr( - tool.func, "__self__"): + # Method 2: For MCP tools, check if func + # has __self__ (the toolkit instance) + if not toolkit_name and hasattr(tool, "func" + ) and hasattr(tool.func, "__self__"): toolkit_instance = tool.func.__self__ if hasattr(toolkit_instance, "toolkit_name") and callable( - toolkit_instance.toolkit_name): + toolkit_instance.toolkit_name + ): toolkit_name = toolkit_instance.toolkit_name() # Method 3: Check if tool.func is a bound method with toolkit if not toolkit_name and hasattr(tool, "func"): - if hasattr(tool.func, "func") and hasattr(tool.func.func, - "__self__"): + if hasattr(tool.func, + "func") and hasattr(tool.func.func, "__self__"): toolkit_instance = tool.func.func.__self__ if hasattr(toolkit_instance, "toolkit_name") and callable( - toolkit_instance.toolkit_name): + toolkit_instance.toolkit_name + ): toolkit_name = toolkit_instance.toolkit_name() # Default fallback @@ -442,7 +510,11 @@ class ListenChatAgent(ChatAgent): toolkit_name = "mcp_toolkit" logger.info( - f"Agent {self.agent_name} executing async tool: {func_name} from toolkit: {toolkit_name} with args: {json.dumps(args, ensure_ascii=False)}" + f"Agent {self.agent_name} executing" + f" async tool: {func_name}" + f" from toolkit: {toolkit_name}" + " with args:" + f" {json.dumps(args, ensure_ascii=False)}" ) # Check if tool is wrapped by @listen_toolkit decorator @@ -452,25 +524,24 @@ class ListenChatAgent(ChatAgent): # Only send activate event if tool is NOT wrapped by @listen_toolkit if not has_listen_decorator: await task_lock.put_queue( - ActionActivateToolkitData(data={ - "agent_name": - self.agent_name, - "process_task_id": - self.process_task_id, - "toolkit_name": - toolkit_name, - "method_name": - func_name, - "message": - json.dumps(args, ensure_ascii=False), - }, )) + ActionActivateToolkitData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "toolkit_name": toolkit_name, + "method_name": func_name, + "message": json.dumps(args, ensure_ascii=False), + }, + ) + ) try: # Set process_task context for all tool executions with set_process_task(self.process_task_id): # Try different invocation paths in order of preference if hasattr(tool, "func") and hasattr(tool.func, "async_call"): # Case: FunctionTool wrapping an MCP tool - # Check if the wrapped tool is sync to avoid run_in_executor + # Check if the wrapped tool is sync + # to avoid run_in_executor if hasattr(tool, "is_async") and not tool.is_async: # Sync tool: call directly to preserve ContextVar result = tool(**args) @@ -482,11 +553,14 @@ class ListenChatAgent(ChatAgent): elif hasattr(tool, "async_call") and callable(tool.async_call): # Case: tool itself has async_call - # Check if this is a sync tool to avoid run_in_executor (which breaks ContextVar) + # Check if this is a sync tool to avoid + # run_in_executor (breaks ContextVar) if hasattr(tool, "is_async") and not tool.is_async: - # Sync tool: call directly to preserve ContextVar in same thread + # Sync tool: call directly to preserve + # ContextVar in same thread result = tool(**args) - # Handle case where synchronous call returns a coroutine + # Handle case where synchronous call + # returns a coroutine if asyncio.iscoroutine(result): result = await result else: @@ -494,7 +568,8 @@ class ListenChatAgent(ChatAgent): result = await tool.async_call(**args) elif hasattr(tool, "func") and asyncio.iscoroutinefunction( - tool.func): + tool.func + ): # Case: tool wraps a direct async function result = await tool.func(**args) @@ -503,7 +578,8 @@ class ListenChatAgent(ChatAgent): result = await tool(**args) else: - # Fallback: synchronous call - call directly in current context + # Fallback: synchronous call - call + # directly in current context # DO NOT use run_in_executor to preserve ContextVar result = tool(**args) # Handle case where synchronous call returns a coroutine @@ -514,8 +590,10 @@ class ListenChatAgent(ChatAgent): # Capture the error message to prevent framework crash error_msg = f"Error executing async tool '{func_name}': {e!s}" result = {"error": error_msg} - logger.error(f"Async tool execution failed for {func_name}: {e}", - exc_info=True) + logger.error( + f"Async tool execution failed for {func_name}: {e}", + exc_info=True + ) # Prepare result message with truncation if isinstance(result, str): @@ -524,20 +602,27 @@ class ListenChatAgent(ChatAgent): result_str = repr(result) MAX_RESULT_LENGTH = 500 if len(result_str) > MAX_RESULT_LENGTH: - result_msg = result_str[:MAX_RESULT_LENGTH] + f"... (truncated, total length: {len(result_str)} chars)" + result_msg = ( + result_str[:MAX_RESULT_LENGTH] + "... (truncated, total" + f" length: {len(result_str)}" + " chars)" + ) else: result_msg = result_str # Only send deactivate event if tool is NOT wrapped by @listen_toolkit if not has_listen_decorator: await task_lock.put_queue( - ActionDeactivateToolkitData(data={ - "agent_name": self.agent_name, - "process_task_id": self.process_task_id, - "toolkit_name": toolkit_name, - "method_name": func_name, - "message": result_msg, - }, )) + ActionDeactivateToolkitData( + data={ + "agent_name": self.agent_name, + "process_task_id": self.process_task_id, + "toolkit_name": toolkit_name, + "method_name": func_name, + "message": result_msg, + }, + ) + ) return self._record_tool_calling( func_name, args, @@ -560,8 +645,9 @@ class ListenChatAgent(ChatAgent): model=self.model_backend.models, # Pass the existing model_backend memory=None, # clone memory later message_window_size=getattr(self.memory, "window_size", None), - token_limit=getattr(self.memory.get_context_creator(), - "token_limit", None), + token_limit=getattr( + self.memory.get_context_creator(), "token_limit", None + ), output_language=self._output_language, tools=cloned_tools, toolkits_to_register_agent=toolkits_to_register, diff --git a/backend/app/agent/prompt.py b/backend/app/agent/prompt.py index c13eaf36..ae09a00f 100644 --- a/backend/app/agent/prompt.py +++ b/backend/app/agent/prompt.py @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= +# flake8: noqa SOCIAL_MEDIA_SYS_PROMPT = """\ You are a Social Media Management Assistant with comprehensive capabilities @@ -595,3 +596,17 @@ Your approach depends on available search tools: - When encountering verification challenges (like login, CAPTCHAs or robot checks), you MUST request help using the human toolkit. """ + +DEFAULT_SUMMARY_PROMPT = ( + "After completing the task, please generate" + " a summary of the entire task completion. " + "The summary must be enclosed in" + " tags and include:\n" + "1. A confirmation of task completion," + " referencing the original goal.\n" + "2. A high-level overview of the work" + " performed and the final outcome.\n" + "3. A bulleted list of key results" + " or accomplishments.\n" + "Adopt a confident and professional tone." +) diff --git a/backend/app/agent/tools.py b/backend/app/agent/tools.py index 1a10b68a..957e06ed 100644 --- a/backend/app/agent/tools.py +++ b/backend/app/agent/tools.py @@ -16,6 +16,8 @@ import asyncio import logging import os +from camel.toolkits import MCPToolkit + from app.component.environment import env from app.model.chat import McpServers from app.utils.toolkit.abstract_toolkit import AbstractToolkit @@ -41,14 +43,15 @@ from app.utils.toolkit.twitter_toolkit import TwitterToolkit from app.utils.toolkit.video_analysis_toolkit import VideoAnalysisToolkit from app.utils.toolkit.video_download_toolkit import VideoDownloaderToolkit from app.utils.toolkit.whatsapp_toolkit import WhatsAppToolkit -from camel.toolkits import MCPToolkit logger = logging.getLogger(__name__) async def get_toolkits(tools: list[str], agent_name: str, api_task_id: str): - logger.info(f"Getting toolkits for agent: {agent_name}, " - f"task: {api_task_id}, tools: {tools}") + logger.info( + f"Getting toolkits for agent: {agent_name}, " + f"task: {api_task_id}, tools: {tools}" + ) toolkits = { "audio_analysis_toolkit": AudioAnalysisToolkit, "openai_image_toolkit": OpenAIImageToolkit, @@ -80,7 +83,8 @@ async def get_toolkits(tools: list[str], agent_name: str, api_task_id: str): toolkit.agent_name = agent_name toolkit_tools = toolkit.get_can_use_tools(api_task_id) toolkit_tools = await toolkit_tools if asyncio.iscoroutine( - toolkit_tools) else toolkit_tools + toolkit_tools + ) else toolkit_tools res.extend(toolkit_tools) else: logger.warning(f"Toolkit {item} not found for agent {agent_name}") @@ -89,7 +93,8 @@ async def get_toolkits(tools: list[str], agent_name: str, api_task_id: str): async def get_mcp_tools(mcp_server: McpServers): logger.info( - f"Getting MCP tools for {len(mcp_server['mcpServers'])} servers") + f"Getting MCP tools for {len(mcp_server['mcpServers'])} servers" + ) if len(mcp_server["mcpServers"]) == 0: return [] @@ -102,19 +107,26 @@ async def get_mcp_tools(mcp_server: McpServers): # Set global auth directory to persist authentication across tasks if "MCP_REMOTE_CONFIG_DIR" not in server_config["env"]: server_config["env"]["MCP_REMOTE_CONFIG_DIR"] = env( - "MCP_REMOTE_CONFIG_DIR", os.path.expanduser("~/.mcp-auth")) + "MCP_REMOTE_CONFIG_DIR", os.path.expanduser("~/.mcp-auth") + ) mcp_toolkit = None try: mcp_toolkit = MCPToolkit(config_dict=config_dict, timeout=180) await mcp_toolkit.connect() - logger.info(f"Successfully connected to MCP toolkit with " - f"{len(mcp_server['mcpServers'])} servers") + logger.info( + f"Successfully connected to MCP toolkit with " + f"{len(mcp_server['mcpServers'])} servers" + ) tools = mcp_toolkit.get_tools() if tools: - tool_names = [(tool.get_function_name() if hasattr( - tool, "get_function_name") else str(tool)) for tool in tools] + tool_names = [ + ( + tool.get_function_name() + if hasattr(tool, "get_function_name") else str(tool) + ) for tool in tools + ] logging.debug(f"MCP tool names: {tool_names}") return tools except asyncio.CancelledError: diff --git a/backend/app/component/__init__.py b/backend/app/component/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/app/component/debug.py b/backend/app/component/debug.py deleted file mode 100644 index a8da8b94..00000000 --- a/backend/app/component/debug.py +++ /dev/null @@ -1,29 +0,0 @@ -# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= - -import inspect - - -def dump_class(obj, max_val_len=1000): - cls = obj.__class__ - print(f"Class: {cls.__name__}") - print("Attributes:") - for name, val in vars(obj).items(): - val_str = repr(val) - if len(val_str) > max_val_len: - val_str = val_str[:max_val_len] + "... [truncated]" - print(f" {name} = {val_str}") - # print("Methods:") - # for name, method in inspect.getmembers(cls, predicate=inspect.isfunction): - # print(f" {name}()") diff --git a/backend/app/component/pydantic/__init__.py b/backend/app/component/pydantic/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/app/controller/__init__.py b/backend/app/controller/__init__.py index 3a4d90c0..fa7455a0 100644 --- a/backend/app/controller/__init__.py +++ b/backend/app/controller/__init__.py @@ -11,4 +11,3 @@ # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= - diff --git a/backend/app/controller/chat_controller.py b/backend/app/controller/chat_controller.py index 04490f98..a379a636 100644 --- a/backend/app/controller/chat_controller.py +++ b/backend/app/controller/chat_controller.py @@ -13,37 +13,44 @@ # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= import asyncio +import logging import os import re import time from pathlib import Path + from dotenv import load_dotenv -from fastapi import APIRouter, HTTPException, Request, Response +from fastapi import APIRouter, Request, Response from fastapi.responses import StreamingResponse -import logging + from app.component import code +from app.component.environment import sanitize_env_path, set_user_env_path from app.exception.exception import UserException -from app.model.chat import Chat, HumanReply, McpServers, Status, SupplementChat, AddTaskRequest, sse_json +from app.model.chat import ( + AddTaskRequest, + Chat, + HumanReply, + McpServers, + Status, + SupplementChat, + sse_json, +) from app.service.chat_service import step_solve from app.service.task import ( Action, + ActionAddTaskData, ActionImproveData, ActionInstallMcpData, - ActionStopData, - ActionSupplementData, - ActionAddTaskData, ActionRemoveTaskData, ActionSkipTaskData, + ActionStopData, + ActionSupplementData, + delete_task_lock, get_or_create_task_lock, get_task_lock, set_current_task_id, - delete_task_lock, task_locks, ) -from app.component.environment import set_user_env_path, sanitize_env_path -from app.utils.workforce import Workforce -from camel.tasks.task import Task - router = APIRouter() @@ -69,23 +76,37 @@ async def _cleanup_task_lock_safe(task_lock, reason: str) -> bool: # Check if task_lock still exists before attempting cleanup if task_lock.id not in task_locks: - chat_logger.debug(f"[{reason}] Task lock already removed, skipping cleanup", - extra={"task_id": task_lock.id}) + chat_logger.debug( + f"[{reason}] Task lock already removed, skipping cleanup", + extra={"task_id": task_lock.id} + ) return False try: task_lock.status = Status.done await delete_task_lock(task_lock.id) - chat_logger.info(f"[{reason}] Task lock cleanup completed", - extra={"task_id": task_lock.id}) + chat_logger.info( + f"[{reason}] Task lock cleanup completed", + extra={"task_id": task_lock.id} + ) return True except Exception as e: - chat_logger.error(f"[{reason}] Failed to cleanup task lock", - extra={"task_id": task_lock.id, "error": str(e)}, exc_info=True) + chat_logger.error( + f"[{reason}] Failed to cleanup task lock", + extra={ + "task_id": task_lock.id, + "error": str(e) + }, + exc_info=True + ) return False -async def timeout_stream_wrapper(stream_generator, timeout_seconds: int = SSE_TIMEOUT_SECONDS, task_lock=None): +async def timeout_stream_wrapper( + stream_generator, + timeout_seconds: int = SSE_TIMEOUT_SECONDS, + task_lock=None +): """Wraps a stream generator with timeout handling. Closes the SSE connection if no data is received within the timeout period. @@ -101,26 +122,45 @@ async def timeout_stream_wrapper(stream_generator, timeout_seconds: int = SSE_TI remaining_timeout = timeout_seconds - elapsed try: - data = await asyncio.wait_for(generator.__anext__(), timeout=remaining_timeout) + data = await asyncio.wait_for( + generator.__anext__(), timeout=remaining_timeout + ) last_data_time = time.time() yield data except asyncio.TimeoutError: - chat_logger.warning("SSE timeout: No data received, closing connection", - extra={"timeout_seconds": timeout_seconds}) - yield sse_json("error", {"message": f"Connection timeout: No data received for {timeout_seconds // 60} minutes"}) - cleanup_triggered = await _cleanup_task_lock_safe(task_lock, "TIMEOUT") + chat_logger.warning( + "SSE timeout: No data received, closing connection", + extra={"timeout_seconds": timeout_seconds} + ) + timeout_min = timeout_seconds // 60 + yield sse_json( + "error", { + "message": + "Connection timeout: No data" + f" received for {timeout_min}" + " minutes" + } + ) + cleanup_triggered = await _cleanup_task_lock_safe( + task_lock, "TIMEOUT" + ) break except StopAsyncIteration: break except asyncio.CancelledError: - chat_logger.info("[STREAM-CANCELLED] Stream cancelled, triggering cleanup") + chat_logger.info( + "[STREAM-CANCELLED] Stream cancelled, triggering cleanup" + ) if not cleanup_triggered: await _cleanup_task_lock_safe(task_lock, "CANCELLED") raise except Exception as e: - chat_logger.error("[STREAM-ERROR] Unexpected error in stream wrapper", - extra={"error": str(e)}, exc_info=True) + chat_logger.error( + "[STREAM-ERROR] Unexpected error in stream wrapper", + extra={"error": str(e)}, + exc_info=True + ) if not cleanup_triggered: await _cleanup_task_lock_safe(task_lock, "ERROR") raise @@ -130,7 +170,11 @@ async def timeout_stream_wrapper(stream_generator, timeout_seconds: int = SSE_TI async def post(data: Chat, request: Request): chat_logger.info( "Starting new chat session", - extra={"project_id": data.project_id, "task_id": data.task_id, "user": data.email} + extra={ + "project_id": data.project_id, + "task_id": data.task_id, + "user": data.email + } ) task_lock = get_or_create_task_lock(data.project_id) @@ -145,7 +189,8 @@ async def post(data: Chat, request: Request): os.environ["file_save_path"] = data.file_save_path() os.environ["browser_port"] = str(data.browser_port) os.environ["OPENAI_API_KEY"] = data.api_key - os.environ["OPENAI_API_BASE_URL"] = data.api_url or "https://api.openai.com/v1" + os.environ["OPENAI_API_BASE_URL" + ] = data.api_url or "https://api.openai.com/v1" os.environ["CAMEL_MODEL_LOG_ENABLED"] = "true" # Set user-specific search engine configuration if provided @@ -153,16 +198,17 @@ async def post(data: Chat, request: Request): for key, value in data.search_config.items(): if value: os.environ[key] = value - chat_logger.debug(f"Set search config: {key}", extra={"project_id": data.project_id}) + chat_logger.debug( + f"Set search config: {key}", + extra={"project_id": data.project_id} + ) - email_sanitized = re.sub(r'[\\/*?:"<>|\s]', "_", data.email.split("@")[0]).strip(".") + email_sanitized = re.sub(r'[\\/*?:"<>|\s]', "_", + data.email.split("@")[0]).strip(".") camel_log = ( - Path.home() - / ".eigent" - / email_sanitized - / ("project_" + data.project_id) - / ("task_" + data.task_id) - / "camel_logs" + Path.home() / ".eigent" / email_sanitized / + ("project_" + data.project_id) / ("task_" + data.task_id) / + "camel_logs" ) camel_log.mkdir(parents=True, exist_ok=True) @@ -175,20 +221,35 @@ async def post(data: Chat, request: Request): set_current_task_id(data.project_id, data.task_id) # Put initial action in queue to start processing - await task_lock.put_queue(ActionImproveData(data=data.question, new_task_id=data.task_id)) + await task_lock.put_queue( + ActionImproveData(data=data.question, new_task_id=data.task_id) + ) chat_logger.info( "Chat session initialized", - extra={"project_id": data.project_id, "task_id": data.task_id, "log_dir": str(camel_log)}, + extra={ + "project_id": data.project_id, + "task_id": data.task_id, + "log_dir": str(camel_log) + }, ) return StreamingResponse( - timeout_stream_wrapper(step_solve(data, request, task_lock), task_lock=task_lock), media_type="text/event-stream" + timeout_stream_wrapper( + step_solve(data, request, task_lock), task_lock=task_lock + ), + media_type="text/event-stream" ) @router.post("/chat/{id}", name="improve chat") def improve(id: str, data: SupplementChat): - chat_logger.info("Chat improvement requested", extra={"task_id": id, "question_length": len(data.question)}) + chat_logger.info( + "Chat improvement requested", + extra={ + "task_id": id, + "question_length": len(data.question) + } + ) task_lock = get_task_lock(id) # Allow continuing conversation even after task is done @@ -203,11 +264,21 @@ def improve(id: str, data: SupplementChat): # Log context preservation if hasattr(task_lock, "conversation_history"): - chat_logger.info(f"[CONTEXT] Preserved {len(task_lock.conversation_history)} conversation entries") + hist_len = len(task_lock.conversation_history) + chat_logger.info( + "[CONTEXT] Preserved" + f" {hist_len} conversation entries" + ) if hasattr(task_lock, "last_task_result"): - chat_logger.info(f"[CONTEXT] Preserved task result: {len(task_lock.last_task_result)} chars") + result_len = len(task_lock.last_task_result) + chat_logger.info( + "[CONTEXT] Preserved task" + f" result: {result_len} chars" + ) - # If task_id is provided, optimistically update file_save_path (will be destroyed if task is not complex) + # If task_id is provided, optimistically update + # file_save_path (will be destroyed if task is + # not complex) # this is because a NEW workforce instance may be created for this task new_folder_path = None if data.task_id: @@ -224,24 +295,49 @@ def improve(id: str, data: SupplementChat): if eigent_index + 1 < len(path_parts): current_email = path_parts[eigent_index + 1] - # If we have the necessary information, update the file_save_path + # If we have the necessary info, update + # the file_save_path if current_email and id: - # Create new path using the existing pattern: email/project_{project_id}/task_{task_id} - new_folder_path = Path.home() / "eigent" / current_email / f"project_{id}" / f"task_{data.task_id}" + # Create new path using the existing + # pattern: email/project_{id}/task_{id} + new_folder_path = ( + Path.home() / "eigent" / current_email / f"project_{id}" / + f"task_{data.task_id}" + ) new_folder_path.mkdir(parents=True, exist_ok=True) os.environ["file_save_path"] = str(new_folder_path) - chat_logger.info(f"Updated file_save_path to: {new_folder_path}") + chat_logger.info( + f"Updated file_save_path to: {new_folder_path}" + ) - # Store the new folder path in task_lock for potential cleanup and persistence + # Store the new folder path in task_lock + # for potential cleanup and persistence task_lock.new_folder_path = new_folder_path else: - chat_logger.warning(f"Could not update file_save_path - email: {current_email}, project_id: {id}") + chat_logger.warning( + "Could not update" + " file_save_path -" + f" email: {current_email}," + f" project_id: {id}" + ) except Exception as e: - chat_logger.error(f"Error updating file path for project_id: {id}, task_id: {data.task_id}: {e}") + chat_logger.error( + "Error updating file path for" + f" project_id: {id}," + f" task_id: {data.task_id}:" + f" {e}" + ) - asyncio.run(task_lock.put_queue(ActionImproveData(data=data.question, new_task_id=data.task_id))) - chat_logger.info("Improvement request queued with preserved context", extra={"project_id": id}) + asyncio.run( + task_lock.put_queue( + ActionImproveData(data=data.question, new_task_id=data.task_id) + ) + ) + chat_logger.info( + "Improvement request queued with preserved context", + extra={"project_id": id} + ) return Response(status_code=201) @@ -260,24 +356,50 @@ def supplement(id: str, data: SupplementChat): def stop(id: str): """stop the task""" chat_logger.info("=" * 80) - chat_logger.info("🛑 [STOP-BUTTON] DELETE /chat/{id} request received from frontend") + chat_logger.info( + "🛑 [STOP-BUTTON] DELETE /chat/{id} request received from frontend" + ) chat_logger.info(f"[STOP-BUTTON] project_id/task_id: {id}") chat_logger.info("=" * 80) try: task_lock = get_task_lock(id) - chat_logger.info(f"[STOP-BUTTON] Task lock retrieved, task_lock.id: {task_lock.id}, task_lock.status: {task_lock.status}") - chat_logger.info(f"[STOP-BUTTON] Queueing ActionStopData(Action.stop) to task_lock queue") + chat_logger.info( + "[STOP-BUTTON] Task lock retrieved," + f" task_lock.id: {task_lock.id}," + f" task_lock.status: {task_lock.status}" + ) + chat_logger.info( + "[STOP-BUTTON] Queueing" + " ActionStopData(Action.stop)" + " to task_lock queue" + ) asyncio.run(task_lock.put_queue(ActionStopData(action=Action.stop))) - chat_logger.info(f"[STOP-BUTTON] ✅ ActionStopData queued successfully, this will trigger workforce.stop_gracefully()") + chat_logger.info( + "[STOP-BUTTON] ActionStopData queued" + " successfully, this will trigger" + " workforce.stop_gracefully()" + ) except Exception as e: - # Task lock may not exist if task is already finished or never started - chat_logger.warning(f"[STOP-BUTTON] ⚠️ Task lock not found or already stopped, task_id: {id}, error: {str(e)}") + # Task lock may not exist if task is already + # finished or never started + chat_logger.warning( + "[STOP-BUTTON] Task lock not found" + " or already stopped," + f" task_id: {id}," + f" error: {str(e)}" + ) return Response(status_code=204) @router.post("/chat/{id}/human-reply") def human_reply(id: str, data: HumanReply): - chat_logger.info("Human reply received", extra={"task_id": id, "reply_length": len(data.reply)}) + chat_logger.info( + "Human reply received", + extra={ + "task_id": id, + "reply_length": len(data.reply) + } + ) task_lock = get_task_lock(id) asyncio.run(task_lock.put_human_input(data.agent, data.reply)) chat_logger.debug("Human reply processed", extra={"task_id": id}) @@ -286,9 +408,19 @@ def human_reply(id: str, data: HumanReply): @router.post("/chat/{id}/install-mcp") def install_mcp(id: str, data: McpServers): - chat_logger.info("Installing MCP servers", extra={"task_id": id, "servers_count": len(data.get("mcpServers", {}))}) + chat_logger.info( + "Installing MCP servers", + extra={ + "task_id": id, + "servers_count": len(data.get("mcpServers", {})) + } + ) task_lock = get_task_lock(id) - asyncio.run(task_lock.put_queue(ActionInstallMcpData(action=Action.install_mcp, data=data))) + asyncio.run( + task_lock.put_queue( + ActionInstallMcpData(action=Action.install_mcp, data=data) + ) + ) chat_logger.info("MCP installation queued", extra={"task_id": id}) return Response(status_code=201) @@ -296,7 +428,11 @@ def install_mcp(id: str, data: McpServers): @router.post("/chat/{id}/add-task", name="add task to workforce") def add_task(id: str, data: AddTaskRequest): """Add a new task to the workforce""" - chat_logger.info(f"Adding task to workforce for task_id: {id}, content: {data.content[:100]}...") + chat_logger.info( + "Adding task to workforce for" + f" task_id: {id}," + f" content: {data.content[:100]}..." + ) task_lock = get_task_lock(id) try: @@ -316,22 +452,35 @@ def add_task(id: str, data: AddTaskRequest): raise UserException(code.error, f"Failed to add task: {str(e)}") -@router.delete("/chat/{project_id}/remove-task/{task_id}", name="remove task from workforce") +@router.delete( + "/chat/{project_id}/remove-task/{task_id}", + name="remove task from workforce" +) def remove_task(project_id: str, task_id: str): """Remove a task from the workforce""" - chat_logger.info(f"Removing task {task_id} from workforce for project_id: {project_id}") + chat_logger.info( + f"Removing task {task_id} from workforce for project_id: {project_id}" + ) task_lock = get_task_lock(project_id) try: # Queue the remove task action - remove_task_action = ActionRemoveTaskData(task_id=task_id, project_id=project_id) + remove_task_action = ActionRemoveTaskData( + task_id=task_id, project_id=project_id + ) asyncio.run(task_lock.put_queue(remove_task_action)) - chat_logger.info(f"Task removal request queued for project_id: {project_id}, removing task: {task_id}") + chat_logger.info( + "Task removal request queued for" + f" project_id: {project_id}," + f" removing task: {task_id}" + ) return Response(status_code=204) except Exception as e: - chat_logger.error(f"Error removing task {task_id} for project_id: {project_id}: {e}") + chat_logger.error( + f"Error removing task {task_id} for project_id: {project_id}: {e}" + ) raise UserException(code.error, f"Failed to remove task: {str(e)}") @@ -349,21 +498,45 @@ def skip_task(project_id: str): - Keeps SSE connection alive for multi-turn conversation """ chat_logger.info("=" * 80) - chat_logger.info(f"🛑 [STOP-BUTTON] SKIP-TASK request received from frontend (User clicked Stop)") + chat_logger.info( + "[STOP-BUTTON] SKIP-TASK request" + " received from frontend" + " (User clicked Stop)" + ) chat_logger.info(f"[STOP-BUTTON] project_id: {project_id}") chat_logger.info("=" * 80) task_lock = get_task_lock(project_id) - chat_logger.info(f"[STOP-BUTTON] Task lock retrieved, task_lock.id: {task_lock.id}, task_lock.status: {task_lock.status}") + chat_logger.info( + "[STOP-BUTTON] Task lock retrieved," + f" task_lock.id: {task_lock.id}," + " task_lock.status:" + f" {task_lock.status}" + ) try: - # Queue the skip task action - this will preserve context for multi-turn + # Queue the skip task action - this will + # preserve context for multi-turn skip_task_action = ActionSkipTaskData(project_id=project_id) - chat_logger.info(f"[STOP-BUTTON] Queueing ActionSkipTaskData (preserves context, marks as done)") + chat_logger.info( + "[STOP-BUTTON] Queueing" + " ActionSkipTaskData" + " (preserves context," + " marks as done)" + ) asyncio.run(task_lock.put_queue(skip_task_action)) - chat_logger.info(f"[STOP-BUTTON] ✅ Skip request queued - task will stop gracefully and preserve context") + chat_logger.info( + "[STOP-BUTTON] Skip request" + " queued - task will stop" + " gracefully and preserve context" + ) return Response(status_code=201) except Exception as e: - chat_logger.error(f"[STOP-BUTTON] Error skipping task for project_id: {project_id}: {e}") + chat_logger.error( + "[STOP-BUTTON] Error skipping" + " task for" + f" project_id: {project_id}:" + f" {e}" + ) raise UserException(code.error, f"Failed to skip task: {str(e)}") diff --git a/backend/app/controller/health_controller.py b/backend/app/controller/health_controller.py index f093f963..83c0480a 100644 --- a/backend/app/controller/health_controller.py +++ b/backend/app/controller/health_controller.py @@ -12,9 +12,10 @@ # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= +import logging + from fastapi import APIRouter from pydantic import BaseModel -import logging logger = logging.getLogger("health_controller") @@ -28,9 +29,15 @@ class HealthResponse(BaseModel): @router.get("/health", name="health check", response_model=HealthResponse) async def health_check(): - """Health check endpoint for verifying backend is ready to accept requests.""" + """Health check endpoint for verifying backend + is ready to accept requests.""" logger.debug("Health check requested") response = HealthResponse(status="ok", service="eigent") - logger.debug("Health check completed", extra={"status": response.status, "service": response.service}) + logger.debug( + "Health check completed", + extra={ + "status": response.status, + "service": response.service + } + ) return response - diff --git a/backend/app/controller/model_controller.py b/backend/app/controller/model_controller.py index 3a5aa008..d444f6d0 100644 --- a/backend/app/controller/model_controller.py +++ b/backend/app/controller/model_controller.py @@ -12,16 +12,16 @@ # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= -from fastapi import APIRouter, HTTPException -from pydantic import BaseModel, Field, field_validator -from app.component.model_validation import create_agent -from app.model.chat import PLATFORM_MAPPING -from camel.types import ModelType -from app.component.error_format import normalize_error_to_openai_format import logging -logger = logging.getLogger("model_controller") +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel, Field, field_validator +from app.component.error_format import normalize_error_to_openai_format +from app.component.model_validation import create_agent +from app.model.chat import PLATFORM_MAPPING + +logger = logging.getLogger("model_controller") router = APIRouter() @@ -31,8 +31,12 @@ class ValidateModelRequest(BaseModel): model_type: str = Field("GPT_4O_MINI", description="Model type") api_key: str | None = Field(None, description="API key") url: str | None = Field(None, description="Model URL") - model_config_dict: dict | None = Field(None, description="Model config dict") - extra_params: dict | None = Field(None, description="Extra model parameters") + model_config_dict: dict | None = Field( + None, description="Model config dict" + ) + extra_params: dict | None = Field( + None, description="Extra model parameters" + ) @field_validator("model_platform") @classmethod @@ -56,11 +60,25 @@ async def validate_model(request: ValidateModelRequest): has_custom_url = request.url is not None has_config = request.model_config_dict is not None - logger.info("Model validation started", extra={"platform": platform, "model_type": model_type, "has_url": has_custom_url, "has_config": has_config}) + logger.info( + "Model validation started", + extra={ + "platform": platform, + "model_type": model_type, + "has_url": has_custom_url, + "has_config": has_config + } + ) # API key validation if request.api_key is not None and str(request.api_key).strip() == "": - logger.warning("Model validation failed: empty API key", extra={"platform": platform, "model_type": model_type}) + logger.warning( + "Model validation failed: empty API key", + extra={ + "platform": platform, + "model_type": model_type + } + ) raise HTTPException( status_code=400, detail={ @@ -77,7 +95,13 @@ async def validate_model(request: ValidateModelRequest): try: extra = request.extra_params or {} - logger.debug("Creating agent for validation", extra={"platform": platform, "model_type": model_type}) + logger.debug( + "Creating agent for validation", + extra={ + "platform": platform, + "model_type": model_type + } + ) agent = create_agent( platform, model_type, @@ -87,7 +111,13 @@ async def validate_model(request: ValidateModelRequest): **extra, ) - logger.debug("Agent created, executing test step", extra={"platform": platform, "model_type": model_type}) + logger.debug( + "Agent created, executing test step", + extra={ + "platform": platform, + "model_type": model_type + } + ) response = agent.step( input_message=""" Get the content of https://www.camel-ai.org, @@ -97,10 +127,17 @@ async def validate_model(request: ValidateModelRequest): """ ) - except Exception as e: # Normalize error to OpenAI-style error structure - logger.error("Model validation failed", extra={"platform": platform, "model_type": model_type, "error": str(e)}, exc_info=True) + logger.error( + "Model validation failed", + extra={ + "platform": platform, + "model_type": model_type, + "error": str(e) + }, + exc_info=True + ) message, error_code, error_obj = normalize_error_to_openai_format(e) raise HTTPException( @@ -111,7 +148,7 @@ async def validate_model(request: ValidateModelRequest): "error": error_obj, } ) - + # Check validation results is_valid = bool(response) is_tool_calls = False @@ -119,21 +156,35 @@ async def validate_model(request: ValidateModelRequest): if response and hasattr(response, "info") and response.info: tool_calls = response.info.get("tool_calls", []) if tool_calls and len(tool_calls) > 0: - is_tool_calls = ( - tool_calls[0].result - == "Tool execution completed successfully for https://www.camel-ai.org, Website Content: Welcome to CAMEL AI!" + expected = ( + "Tool execution completed" + " successfully for" + " https://www.camel-ai.org," + " Website Content:" + " Welcome to CAMEL AI!" ) + is_tool_calls = (tool_calls[0].result == expected) + no_tool_msg = ( + "This model doesn't support tool calls." + " please try with another model." + ) result = ValidateModelResponse( is_valid=is_valid, is_tool_calls=is_tool_calls, - message="Validation Success" - if is_tool_calls - else "This model doesn't support tool calls. please try with another model.", + message="Validation Success" if is_tool_calls else no_tool_msg, error_code=None, error=None, ) - logger.info("Model validation completed", extra={"platform": platform, "model_type": model_type, "is_valid": is_valid, "is_tool_calls": is_tool_calls}) + logger.info( + "Model validation completed", + extra={ + "platform": platform, + "model_type": model_type, + "is_valid": is_valid, + "is_tool_calls": is_tool_calls + } + ) return result diff --git a/backend/app/controller/task_controller.py b/backend/app/controller/task_controller.py index c4bec86b..90b4793b 100644 --- a/backend/app/controller/task_controller.py +++ b/backend/app/controller/task_controller.py @@ -12,28 +12,29 @@ # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= +import asyncio +import logging from typing import Literal + from dotenv import load_dotenv from fastapi import APIRouter, Response from pydantic import BaseModel + +from app.component.environment import sanitize_env_path, set_user_env_path from app.model.chat import NewAgent, UpdateData from app.service.task import ( Action, ActionNewAgent, + ActionStartData, ActionStopData, ActionTakeControl, - ActionStartData, ActionUpdateTaskData, get_task_lock, task_locks, ) -import asyncio -from app.component.environment import set_user_env_path, sanitize_env_path -import logging logger = logging.getLogger("task_controller") - router = APIRouter() @@ -48,10 +49,26 @@ def start(id: str): @router.put("/task/{id}", name="update task") def put(id: str, data: UpdateData): - logger.info("Updating task", extra={"task_id": id, "task_items_count": len(data.task)}) - logger.debug("Update task data", extra={"task_id": id, "data": data.model_dump_json()}) + logger.info( + "Updating task", + extra={ + "task_id": id, + "task_items_count": len(data.task) + } + ) + logger.debug( + "Update task data", + extra={ + "task_id": id, + "data": data.model_dump_json() + } + ) task_lock = get_task_lock(id) - asyncio.run(task_lock.put_queue(ActionUpdateTaskData(action=Action.update_task, data=data))) + asyncio.run( + task_lock.put_queue( + ActionUpdateTaskData(action=Action.update_task, data=data) + ) + ) logger.info("Task updated successfully", extra={"task_id": id}) return Response(status_code=201) @@ -62,25 +79,55 @@ class TakeControl(BaseModel): @router.put("/task/{id}/take-control", name="take control pause or resume") def take_control(id: str, data: TakeControl): - logger.info("Task control action", extra={"task_id": id, "action": data.action}) + logger.info( + "Task control action", extra={ + "task_id": id, + "action": data.action + } + ) task_lock = get_task_lock(id) asyncio.run(task_lock.put_queue(ActionTakeControl(action=data.action))) - logger.info("Task control action completed", extra={"task_id": id, "action": data.action}) + logger.info( + "Task control action completed", + extra={ + "task_id": id, + "action": data.action + } + ) return Response(status_code=204) @router.post("/task/{id}/add-agent", name="add new agent") def add_agent(id: str, data: NewAgent): - logger.info("Adding new agent to task", extra={"task_id": id, "agent_name": data.name}) - logger.debug("New agent data", extra={"task_id": id, "agent_data": data.model_dump_json()}) + logger.info( + "Adding new agent to task", + extra={ + "task_id": id, + "agent_name": data.name + } + ) + logger.debug( + "New agent data", + extra={ + "task_id": id, + "agent_data": data.model_dump_json() + } + ) # Set user-specific environment path for this thread set_user_env_path(data.env_path) # Load environment with validated path safe_env_path = sanitize_env_path(data.env_path) if safe_env_path: load_dotenv(dotenv_path=safe_env_path) - asyncio.run(get_task_lock(id).put_queue(ActionNewAgent(**data.model_dump()))) - logger.info("Agent added to task", extra={"task_id": id, "agent_name": data.name}) + asyncio.run( + get_task_lock(id).put_queue(ActionNewAgent(**data.model_dump())) + ) + logger.info( + "Agent added to task", extra={ + "task_id": id, + "agent_name": data.name + } + ) return Response(status_code=204) diff --git a/backend/app/controller/tool_controller.py b/backend/app/controller/tool_controller.py index c34cc22a..573800b2 100644 --- a/backend/app/controller/tool_controller.py +++ b/backend/app/controller/tool_controller.py @@ -12,25 +12,19 @@ # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= -from typing import Optional -from fastapi import APIRouter, HTTPException -from pydantic import BaseModel -from app.utils.toolkit.notion_mcp_toolkit import NotionMCPToolkit -from app.utils.toolkit.google_calendar_toolkit import GoogleCalendarToolkit -from app.utils.toolkit.linkedin_toolkit import LinkedInToolkit -from app.utils.oauth_state_manager import oauth_state_manager import logging - - -from camel.toolkits.hybrid_browser_toolkit.hybrid_browser_toolkit_ts import ( - HybridBrowserToolkit as BaseHybridBrowserToolkit, -) -from app.utils.cookie_manager import CookieManager - - import os import time -import uuid +from typing import Optional + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +from app.utils.cookie_manager import CookieManager +from app.utils.oauth_state_manager import oauth_state_manager +from app.utils.toolkit.google_calendar_toolkit import GoogleCalendarToolkit +from app.utils.toolkit.linkedin_toolkit import LinkedInToolkit +from app.utils.toolkit.notion_mcp_toolkit import NotionMCPToolkit class LinkedInTokenRequest(BaseModel): @@ -58,7 +52,8 @@ async def install_tool(tool: str): """ if tool == "notion": try: - # Use a dummy task_id for installation, as this is just for pre-authentication + # Use a dummy task_id for installation, + # as this is just for pre-authentication toolkit = NotionMCPToolkit("install_auth") try: @@ -66,10 +61,15 @@ async def install_tool(tool: str): await toolkit.connect() # Get available tools to verify connection - tools = [tool_func.func.__name__ for tool_func in - toolkit.get_tools()] + tools = [ + tool_func.func.__name__ + for tool_func in toolkit.get_tools() + ] logger.info( - f"Successfully pre-instantiated {tool} toolkit with {len(tools)} tools") + "Successfully pre-instantiated" + f" {tool} toolkit with" + f" {len(tools)} tools" + ) # Disconnect, authentication info is saved await toolkit.disconnect() @@ -77,35 +77,54 @@ async def install_tool(tool: str): return { "success": True, "tools": tools, - "message": f"Successfully installed and authenticated {tool} toolkit", + "message": + f"Successfully installed and authenticated {tool} toolkit", "count": len(tools), "toolkit_name": "NotionMCPToolkit" } except Exception as connect_error: logger.warning( - f"Could not connect to {tool} MCP server: {connect_error}") - # Even if connection fails, mark as installed so user can use it later + f"Could not connect to {tool} MCP server: {connect_error}" + ) + # Even if connection fails, mark as + # installed so user can use it later return { - "success": True, + "success": + True, "tools": [], - "message": f"{tool} toolkit installed but not connected. Will connect when needed.", - "count": 0, - "toolkit_name": "NotionMCPToolkit", - "warning": "Could not connect to Notion MCP server. You may need to authenticate when using the tool." + "message": + f"{tool} toolkit installed but" + " not connected. Will connect" + " when needed.", + "count": + 0, + "toolkit_name": + "NotionMCPToolkit", + "warning": + "Could not connect to Notion" + " MCP server. You may need to" + " authenticate when using" + " the tool." } except Exception as e: logger.error(f"Failed to install {tool} toolkit: {e}") raise HTTPException( - status_code=500, - detail=f"Failed to install {tool}: {str(e)}" + status_code=500, detail=f"Failed to install {tool}: {str(e)}" ) elif tool == "google_calendar": try: # Try to initialize toolkit - will succeed if credentials exist try: toolkit = GoogleCalendarToolkit("install_auth") - tools = [tool_func.func.__name__ for tool_func in toolkit.get_tools()] - logger.info(f"Successfully initialized Google Calendar toolkit with {len(tools)} tools") + tools = [ + tool_func.func.__name__ + for tool_func in toolkit.get_tools() + ] + logger.info( + "Successfully initialized Google" + " Calendar toolkit with" + f" {len(tools)} tools" + ) return { "success": True, @@ -116,24 +135,36 @@ async def install_tool(tool: str): } except ValueError as auth_error: # No credentials - need authorization - logger.info(f"No credentials found, starting authorization: {auth_error}") + logger.info( + "No credentials found, starting" + f" authorization: {auth_error}" + ) # Start background authorization in a new thread - logger.info("Starting background Google Calendar authorization") + logger.info( + "Starting background Google Calendar authorization" + ) GoogleCalendarToolkit.start_background_auth("install_auth") return { - "success": False, - "status": "authorizing", - "message": "Authorization required. Browser should open automatically. Complete authorization and try installing again.", - "toolkit_name": "GoogleCalendarToolkit", - "requires_auth": True + "success": + False, + "status": + "authorizing", + "message": + "Authorization required. Browser" + " should open automatically." + " Complete authorization and" + " try installing again.", + "toolkit_name": + "GoogleCalendarToolkit", + "requires_auth": + True } except Exception as e: logger.error(f"Failed to install {tool} toolkit: {e}") raise HTTPException( - status_code=500, - detail=f"Failed to install {tool}: {str(e)}" + status_code=500, detail=f"Failed to install {tool}: {str(e)}" ) elif tool == "linkedin": try: @@ -143,17 +174,28 @@ async def install_tool(tool: str): if LinkedInToolkit.is_token_expired(): logger.info("LinkedIn token has expired") return { - "success": False, - "status": "token_expired", - "message": "LinkedIn token has expired. Please re-authenticate via OAuth.", - "toolkit_name": "LinkedInToolkit", - "requires_auth": True, - "oauth_url": "/api/oauth/linkedin/login" + "success": + False, + "status": + "token_expired", + "message": + "LinkedIn token has expired." + " Please re-authenticate" + " via OAuth.", + "toolkit_name": + "LinkedInToolkit", + "requires_auth": + True, + "oauth_url": + "/api/oauth/linkedin/login" } try: toolkit = LinkedInToolkit("install_auth") - tools = [tool_func.func.__name__ for tool_func in toolkit.get_tools()] + tools = [ + tool_func.func.__name__ + for tool_func in toolkit.get_tools() + ] # Try to get profile to verify token is valid profile = toolkit.get_profile_safe() @@ -163,10 +205,22 @@ async def install_tool(tool: str): if LinkedInToolkit.is_token_expiring_soon(): token_info = LinkedInToolkit.get_token_info() if token_info and token_info.get("expires_at"): - days_remaining = (token_info["expires_at"] - int(time.time())) // (24 * 60 * 60) - token_warning = f"Token expires in {days_remaining} days. Consider re-authenticating soon." + days_remaining = ( + token_info["expires_at"] - int(time.time()) + ) // (24 * 60 * 60) + token_warning = ( + "Token expires in" + f" {days_remaining}" + " days. Consider" + " re-authenticating" + " soon." + ) - logger.info(f"Successfully initialized LinkedIn toolkit with {len(tools)} tools") + logger.info( + "Successfully initialized" + " LinkedIn toolkit with" + f" {len(tools)} tools" + ) result = { "success": True, "tools": tools, @@ -182,12 +236,20 @@ async def install_tool(tool: str): logger.warning(f"LinkedIn token may be invalid: {e}") # Token exists but may be expired/invalid return { - "success": False, - "status": "token_invalid", - "message": "LinkedIn token may be expired or invalid. Please re-authenticate via OAuth.", - "toolkit_name": "LinkedInToolkit", - "requires_auth": True, - "oauth_url": "/api/oauth/linkedin/login" + "success": + False, + "status": + "token_invalid", + "message": + "LinkedIn token may be expired" + " or invalid. Please" + " re-authenticate via OAuth.", + "toolkit_name": + "LinkedInToolkit", + "requires_auth": + True, + "oauth_url": + "/api/oauth/linkedin/login" } else: # No credentials - need OAuth authorization @@ -195,7 +257,8 @@ async def install_tool(tool: str): return { "success": False, "status": "not_configured", - "message": "LinkedIn OAuth required. Redirect user to OAuth login.", + "message": + "LinkedIn OAuth required. Redirect user to OAuth login.", "toolkit_name": "LinkedInToolkit", "requires_auth": True, "oauth_url": "/api/oauth/linkedin/login" @@ -203,13 +266,18 @@ async def install_tool(tool: str): except Exception as e: logger.error(f"Failed to install {tool} toolkit: {e}") raise HTTPException( - status_code=500, - detail=f"Failed to install {tool}: {str(e)}" + status_code=500, detail=f"Failed to install {tool}: {str(e)}" ) else: raise HTTPException( status_code=404, - detail=f"Tool '{tool}' not found. Available tools: ['notion', 'google_calendar', 'linkedin']" + detail=( + f"Tool '{tool}' not found." + " Available tools:" + " ['notion'," + " 'google_calendar'," + " 'linkedin']" + ) ) @@ -224,26 +292,47 @@ async def list_available_tools(): return { "tools": [ { - "name": "notion", - "display_name": "Notion MCP", - "description": "Notion workspace integration for reading and managing Notion pages", - "toolkit_class": "NotionMCPToolkit", - "requires_auth": True - }, - { - "name": "google_calendar", - "display_name": "Google Calendar", - "description": "Google Calendar integration for managing events and schedules", - "toolkit_class": "GoogleCalendarToolkit", - "requires_auth": True - }, - { - "name": "linkedin", - "display_name": "LinkedIn", - "description": "LinkedIn integration for creating posts, managing profile, and social media automation", - "toolkit_class": "LinkedInToolkit", - "requires_auth": True, - "oauth_url": "/api/oauth/linkedin/login" + "name": + "notion", + "display_name": + "Notion MCP", + "description": + "Notion workspace integration" + " for reading and managing" + " Notion pages", + "toolkit_class": + "NotionMCPToolkit", + "requires_auth": + True + }, { + "name": + "google_calendar", + "display_name": + "Google Calendar", + "description": + "Google Calendar integration" + " for managing events" + " and schedules", + "toolkit_class": + "GoogleCalendarToolkit", + "requires_auth": + True + }, { + "name": + "linkedin", + "display_name": + "LinkedIn", + "description": + "LinkedIn integration for" + " creating posts, managing" + " profile, and social media" + " automation", + "toolkit_class": + "LinkedInToolkit", + "requires_auth": + True, + "oauth_url": + "/api/oauth/linkedin/login" } ] } @@ -323,8 +412,8 @@ async def uninstall_tool(tool: str): if tool == "notion": try: - import hashlib import glob + import hashlib # Calculate the hash for Notion MCP URL # mcp-remote uses MD5 hash of the URL to generate file names @@ -348,13 +437,21 @@ async def uninstall_tool(tool: str): try: os.remove(file_path) deleted_files.append(file_path) - logger.info(f"Removed Notion auth file: {file_path}") + logger.info( + f"Removed Notion auth file: {file_path}" + ) except Exception as e: - logger.warning(f"Failed to remove {file_path}: {e}") + logger.warning( + f"Failed to remove {file_path}: {e}" + ) message = f"Successfully uninstalled {tool}" if deleted_files: - message += f" and cleaned up {len(deleted_files)} authentication file(s)" + message += ( + " and cleaned up" + f" {len(deleted_files)}" + " authentication file(s)" + ) return { "success": True, @@ -373,16 +470,31 @@ async def uninstall_tool(tool: str): # Clean up Google Calendar token directories (user-scoped + legacy) token_dirs = set() try: - token_dirs.add(os.path.dirname(GoogleCalendarToolkit._build_canonical_token_path())) + token_dirs.add( + os.path.dirname( + GoogleCalendarToolkit._build_canonical_token_path() + ) + ) except Exception as e: - logger.warning(f"Failed to resolve canonical Google Calendar token path: {e}") + logger.warning( + "Failed to resolve canonical" + " Google Calendar token" + f" path: {e}" + ) - token_dirs.add(os.path.join(os.path.expanduser("~"), ".eigent", "tokens", "google_calendar")) + token_dirs.add( + os.path.join( + os.path.expanduser("~"), ".eigent", "tokens", + "google_calendar" + ) + ) for token_dir in token_dirs: if os.path.exists(token_dir): shutil.rmtree(token_dir) - logger.info(f"Removed Google Calendar token directory: {token_dir}") + logger.info( + f"Removed Google Calendar token directory: {token_dir}" + ) # Clear OAuth state manager cache (this is the key fix!) # This removes the cached credentials from memory @@ -390,14 +502,20 @@ async def uninstall_tool(tool: str): if state: if state.status in ["pending", "authorizing"]: state.cancel() - logger.info("Cancelled ongoing Google Calendar authorization") + logger.info( + "Cancelled ongoing Google Calendar authorization" + ) # Clear the state completely to remove cached credentials oauth_state_manager._states.pop("google_calendar", None) logger.info("Cleared Google Calendar OAuth state cache") return { - "success": True, - "message": f"Successfully uninstalled {tool} and cleaned up authentication tokens" + "success": + True, + "message": + "Successfully uninstalled" + f" {tool} and cleaned up" + " authentication tokens" } except Exception as e: logger.error(f"Failed to uninstall {tool}: {e}") @@ -412,13 +530,18 @@ async def uninstall_tool(tool: str): if success: return { - "success": True, - "message": f"Successfully uninstalled {tool} and cleaned up authentication tokens" + "success": + True, + "message": + "Successfully uninstalled" + f" {tool} and cleaned up" + " authentication tokens" } else: return { "success": True, - "message": f"Uninstalled {tool} (no tokens found to clean up)" + "message": + f"Uninstalled {tool} (no tokens found to clean up)" } except Exception as e: logger.error(f"Failed to uninstall {tool}: {e}") @@ -429,7 +552,13 @@ async def uninstall_tool(tool: str): else: raise HTTPException( status_code=404, - detail=f"Tool '{tool}' not found. Available tools: ['notion', 'google_calendar', 'linkedin']" + detail=( + f"Tool '{tool}' not found." + " Available tools:" + " ['notion'," + " 'google_calendar'," + " 'linkedin']" + ) ) @@ -438,7 +567,8 @@ async def save_linkedin_token(token_request: LinkedInTokenRequest): r"""Save LinkedIn OAuth token after successful authorization. Args: - token_request: Token data containing access_token and optionally refresh_token + token_request: Token data containing + access_token and optionally refresh_token Returns: Save result with tool information @@ -453,7 +583,10 @@ async def save_linkedin_token(token_request: LinkedInTokenRequest): # Verify the token works by initializing toolkit try: toolkit = LinkedInToolkit("install_auth") - tools = [tool_func.func.__name__ for tool_func in toolkit.get_tools()] + tools = [ + tool_func.func.__name__ + for tool_func in toolkit.get_tools() + ] profile = toolkit.get_profile_safe() return { @@ -472,16 +605,14 @@ async def save_linkedin_token(token_request: LinkedInTokenRequest): } else: raise HTTPException( - status_code=500, - detail="Failed to save LinkedIn token" + status_code=500, detail="Failed to save LinkedIn token" ) except HTTPException: raise except Exception as e: logger.error(f"Failed to save LinkedIn token: {e}") raise HTTPException( - status_code=500, - detail=f"Failed to save token: {str(e)}" + status_code=500, detail=f"Failed to save token: {str(e)}" ) @@ -508,8 +639,11 @@ async def get_linkedin_status(): is_expiring_soon = LinkedInToolkit.is_token_expiring_soon() result = { - "authenticated": True, - "status": "expired" if is_expired else ("expiring_soon" if is_expiring_soon else "valid"), + "authenticated": + True, + "status": + "expired" if is_expired else + ("expiring_soon" if is_expiring_soon else "valid"), } if token_info: @@ -528,7 +662,12 @@ async def get_linkedin_status(): result["message"] = "Token has expired. Please re-authenticate." result["oauth_url"] = "/api/oauth/linkedin/login" elif is_expiring_soon: - result["message"] = f"Token expires in {result.get('days_remaining', 'unknown')} days. Consider re-authenticating." + days = result.get('days_remaining', 'unknown') + result["message"] = ( + f"Token expires in {days}" + " days. Consider" + " re-authenticating." + ) result["oauth_url"] = "/api/oauth/linkedin/login" else: result["message"] = "LinkedIn is connected and token is valid." @@ -537,45 +676,51 @@ async def get_linkedin_status(): except Exception as e: logger.error(f"Failed to get LinkedIn status: {e}") raise HTTPException( - status_code=500, - detail=f"Failed to get status: {str(e)}" + status_code=500, detail=f"Failed to get status: {str(e)}" ) @router.post("/browser/login", name="open browser for login") async def open_browser_login(): """ - Open an Electron-based Chrome browser for user login with a dedicated user data directory + Open an Electron-based Chrome browser for + user login with a dedicated user data directory Returns: Browser session information """ try: - import subprocess - import platform import socket - import json - + import subprocess + # Use fixed profile name for persistent logins (no port suffix) session_id = "user_login" cdp_port = 9223 # IMPORTANT: Use dedicated profile for tool_controller browser # This is the SOURCE OF TRUTH for login data - # On Eigent startup, this data will be copied to WebView partition (one-way sync) - browser_profiles_base = os.path.expanduser("~/.eigent/browser_profiles") - user_data_dir = os.path.join(browser_profiles_base, "profile_user_login") + # On Eigent startup, this data will be copied + # to WebView partition (one-way sync) + browser_profiles_base = os.path.expanduser( + "~/.eigent/browser_profiles" + ) + user_data_dir = os.path.join( + browser_profiles_base, "profile_user_login" + ) os.makedirs(user_data_dir, exist_ok=True) logger.info( - f"Creating browser session {session_id} with profile at: {user_data_dir}") - + "Creating browser session" + f" {session_id} with profile" + f" at: {user_data_dir}" + ) + # Check if browser is already running on this port def is_port_in_use(port): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: return s.connect_ex(('localhost', port)) == 0 - + if is_port_in_use(cdp_port): logger.info(f"Browser already running on port {cdp_port}") return { @@ -583,31 +728,38 @@ async def open_browser_login(): "session_id": session_id, "user_data_dir": user_data_dir, "cdp_port": cdp_port, - "message": "Browser already running. Use existing window to log in.", + "message": + "Browser already running. Use existing window to log in.", "note": "Your login data will be saved in the profile." } - + # Use static Electron browser script - electron_script_path = os.path.join(os.path.dirname(__file__), "electron_browser.cjs") + electron_script_path = os.path.join( + os.path.dirname(__file__), "electron_browser.cjs" + ) # Verify script exists if not os.path.exists(electron_script_path): - raise FileNotFoundError(f"Electron browser script not found: {electron_script_path}") + raise FileNotFoundError( + f"Electron browser script not found: {electron_script_path}" + ) electron_cmd = "npx" electron_args = [ - electron_cmd, - "electron", - electron_script_path, - user_data_dir, - str(cdp_port), - "https://www.google.com" + electron_cmd, "electron", electron_script_path, user_data_dir, + str(cdp_port), "https://www.google.com" ] - + # Get the app's directory to run npx in the right context - app_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) - - logger.info(f"[PROFILE USER LOGIN] Launching Electron browser with CDP on port {cdp_port}") + app_dir = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + ) + + logger.info( + "[PROFILE USER LOGIN] Launching" + " Electron browser with CDP" + f" on port {cdp_port}" + ) logger.info(f"[PROFILE USER LOGIN] Working directory: {app_dir}") logger.info(f"[PROFILE USER LOGIN] userData path: {user_data_dir}") logger.info(f"[PROFILE USER LOGIN] Electron args: {electron_args}") @@ -632,29 +784,43 @@ async def open_browser_login(): import asyncio asyncio.create_task(log_electron_output()) - + # Wait a bit for Electron to start import asyncio await asyncio.sleep(3) - logger.info(f"[PROFILE USER LOGIN] Electron browser launched with PID {process.pid}") + logger.info( + "[PROFILE USER LOGIN] Electron" + " browser launched with" + f" PID {process.pid}" + ) return { - "success": True, - "session_id": session_id, - "user_data_dir": user_data_dir, - "cdp_port": cdp_port, - "pid": process.pid, - "chrome_version": "130.0.6723.191", # Electron 33's Chrome version - "message": "Electron browser opened successfully. Please log in to your accounts.", - "note": "The browser will remain open for you to log in. Your login data will be saved in the profile." + "success": + True, + "session_id": + session_id, + "user_data_dir": + user_data_dir, + "cdp_port": + cdp_port, + "pid": + process.pid, + "chrome_version": + "130.0.6723.191", # Electron 33's Chrome version + "message": + "Electron browser opened successfully." + " Please log in to your accounts.", + "note": + "The browser will remain open for" + " you to log in. Your login data" + " will be saved in the profile." } except Exception as e: logger.error(f"Failed to open Electron browser for login: {e}") raise HTTPException( - status_code=500, - detail=f"Failed to open browser: {str(e)}" + status_code=500, detail=f"Failed to open browser: {str(e)}" ) @@ -674,21 +840,37 @@ async def list_cookie_domains(search: str = None): user_data_base = os.path.expanduser("~/.eigent/browser_profiles") user_data_dir = os.path.join(user_data_base, "profile_user_login") - logger.info(f"[COOKIES CHECK] Tool controller user_data_dir: {user_data_dir}") - logger.info(f"[COOKIES CHECK] Tool controller user_data_dir exists: {os.path.exists(user_data_dir)}") + logger.info( + f"[COOKIES CHECK] Tool controller user_data_dir: {user_data_dir}" + ) + logger.info( + "[COOKIES CHECK] Tool controller" + " user_data_dir exists:" + f" {os.path.exists(user_data_dir)}" + ) # Check partition path - partition_path = os.path.join(user_data_dir, "Partitions", "user_login") + partition_path = os.path.join( + user_data_dir, "Partitions", "user_login" + ) logger.info(f"[COOKIES CHECK] partition path: {partition_path}") - logger.info(f"[COOKIES CHECK] partition exists: {os.path.exists(partition_path)}") + logger.info( + "[COOKIES CHECK] partition" + f" exists: {os.path.exists(partition_path)}" + ) # Check cookies file cookies_file = os.path.join(partition_path, "Cookies") logger.info(f"[COOKIES CHECK] cookies file: {cookies_file}") - logger.info(f"[COOKIES CHECK] cookies file exists: {os.path.exists(cookies_file)}") + logger.info( + "[COOKIES CHECK] cookies file" + f" exists: {os.path.exists(cookies_file)}" + ) if os.path.exists(cookies_file): stat = os.stat(cookies_file) - logger.info(f"[COOKIES CHECK] cookies file size: {stat.st_size} bytes") + logger.info( + f"[COOKIES CHECK] cookies file size: {stat.st_size} bytes" + ) # Try to read actual cookie count try: @@ -697,16 +879,24 @@ async def list_cookie_domains(search: str = None): cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM cookies") count = cursor.fetchone()[0] - logger.info(f"[COOKIES CHECK] actual cookie count in database: {count}") + logger.info( + f"[COOKIES CHECK] actual cookie count in database: {count}" + ) conn.close() except Exception as e: - logger.error(f"[COOKIES CHECK] failed to read cookie count: {e}") + logger.error( + f"[COOKIES CHECK] failed to read cookie count: {e}" + ) if not os.path.exists(user_data_dir): return { - "success": True, + "success": + True, "domains": [], - "message": "No browser profile found. Please login first using /browser/login." + "message": + "No browser profile found." + " Please login first" + " using /browser/login." } cookie_manager = CookieManager(user_data_dir) @@ -726,8 +916,7 @@ async def list_cookie_domains(search: str = None): except Exception as e: logger.error(f"Failed to list cookie domains: {e}") raise HTTPException( - status_code=500, - detail=f"Failed to list cookies: {str(e)}" + status_code=500, detail=f"Failed to list cookies: {str(e)}" ) @@ -749,7 +938,11 @@ async def get_domain_cookies(domain: str): if not os.path.exists(user_data_dir): raise HTTPException( status_code=404, - detail="No browser profile found. Please login first using /browser/login." + detail=( + "No browser profile found." + " Please login first using" + " /browser/login." + ) ) cookie_manager = CookieManager(user_data_dir) @@ -767,8 +960,7 @@ async def get_domain_cookies(domain: str): except Exception as e: logger.error(f"Failed to get cookies for domain {domain}: {e}") raise HTTPException( - status_code=500, - detail=f"Failed to get cookies: {str(e)}" + status_code=500, detail=f"Failed to get cookies: {str(e)}" ) @@ -790,7 +982,11 @@ async def delete_domain_cookies(domain: str): if not os.path.exists(user_data_dir): raise HTTPException( status_code=404, - detail="No browser profile found. Please login first using /browser/login." + detail=( + "No browser profile found." + " Please login first using" + " /browser/login." + ) ) cookie_manager = CookieManager(user_data_dir) @@ -812,8 +1008,7 @@ async def delete_domain_cookies(domain: str): except Exception as e: logger.error(f"Failed to delete cookies for domain {domain}: {e}") raise HTTPException( - status_code=500, - detail=f"Failed to delete cookies: {str(e)}" + status_code=500, detail=f"Failed to delete cookies: {str(e)}" ) @@ -831,8 +1026,7 @@ async def delete_all_cookies(): if not os.path.exists(user_data_dir): raise HTTPException( - status_code=404, - detail="No browser profile found." + status_code=404, detail="No browser profile found." ) cookie_manager = CookieManager(user_data_dir) @@ -845,8 +1039,7 @@ async def delete_all_cookies(): } else: raise HTTPException( - status_code=500, - detail="Failed to delete all cookies" + status_code=500, detail="Failed to delete all cookies" ) except HTTPException: @@ -854,6 +1047,5 @@ async def delete_all_cookies(): except Exception as e: logger.error(f"Failed to delete all cookies: {e}") raise HTTPException( - status_code=500, - detail=f"Failed to delete cookies: {str(e)}" + status_code=500, detail=f"Failed to delete cookies: {str(e)}" ) diff --git a/backend/app/exception/__init__.py b/backend/app/exception/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/app/exception/exception.py b/backend/app/exception/exception.py index d51a5281..4c1137ab 100644 --- a/backend/app/exception/exception.py +++ b/backend/app/exception/exception.py @@ -12,23 +12,28 @@ # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= + class UserException(Exception): + def __init__(self, code: int, description: str): self.code = code self.description = description class TokenException(Exception): + def __init__(self, code: int, text: str): self.code = code self.text = text class NoPermissionException(Exception): + def __init__(self, text: str): self.text = text class ProgramException(Exception): + def __init__(self, text: str): self.text = text diff --git a/backend/app/exception/handler.py b/backend/app/exception/handler.py index bfa95c43..12371519 100644 --- a/backend/app/exception/handler.py +++ b/backend/app/exception/handler.py @@ -12,17 +12,22 @@ # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= -import traceback +import logging + from fastapi import Request from fastapi.encoders import jsonable_encoder from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse + from app import api from app.component import code -from app.exception.exception import NoPermissionException, ProgramException, TokenException -from app.component.pydantic.i18n import trans, get_language -from app.exception.exception import UserException -import logging +from app.component.pydantic.i18n import get_language, trans +from app.exception.exception import ( + NoPermissionException, + ProgramException, + TokenException, + UserException, +) logger = logging.getLogger("exception_handler") @@ -32,11 +37,13 @@ async def request_exception(request: Request, e: RequestValidationError): if (lang := get_language(request.headers.get("Accept-Language"))) is None: lang = "en_US" logger.warning(f"Validation error on {request.url.path}: {e.errors()}") - + return JSONResponse( content={ - "code": code.form_error, - "error": jsonable_encoder(trans.translate(list(e.errors()), locale=lang)), + "code": + code.form_error, + "error": + jsonable_encoder(trans.translate(list(e.errors()), locale=lang)), } ) @@ -58,16 +65,27 @@ async def no_permission(request: Request, exception: NoPermissionException): logger.warning(f"No permission on {request.url.path}: {exception.text}") return JSONResponse( status_code=200, - content={"code": code.no_permission_error, "text": exception.text}, + content={ + "code": code.no_permission_error, + "text": exception.text + }, ) @api.exception_handler(ProgramException) -async def program_exception(request: Request, exception: NoPermissionException): - logger.error(f"Program exception on {request.url.path}: {exception.text}", exc_info=True) +async def program_exception( + request: Request, exception: NoPermissionException +): + logger.error( + f"Program exception on {request.url.path}: {exception.text}", + exc_info=True + ) return JSONResponse( status_code=200, - content={"code": code.program_error, "text": exception.text}, + content={ + "code": code.program_error, + "text": exception.text + }, ) diff --git a/backend/app/middleware/__init__.py b/backend/app/middleware/__init__.py index cb43c349..75732f94 100644 --- a/backend/app/middleware/__init__.py +++ b/backend/app/middleware/__init__.py @@ -12,9 +12,9 @@ # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= -from app import api -from app.component.babel import babel_configs from fastapi_babel import BabelMiddleware +from app import api +from app.component.babel import babel_configs api.add_middleware(BabelMiddleware, babel_configs=babel_configs) diff --git a/backend/app/model/__init__.py b/backend/app/model/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/app/model/chat.py b/backend/app/model/chat.py index c1e53efa..c2976f1d 100644 --- a/backend/app/model/chat.py +++ b/backend/app/model/chat.py @@ -12,14 +12,17 @@ # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= -from enum import Enum import json -from pathlib import Path -import re -from typing import Literal -from pydantic import BaseModel, Field, field_validator -from camel.types import ModelType, RoleType import logging +import re +from enum import Enum +from pathlib import Path +from typing import Literal + +from camel.types import ModelType, RoleType +from pydantic import BaseModel, Field, field_validator + +from app.agent.prompt import DEFAULT_SUMMARY_PROMPT logger = logging.getLogger("chat_model") @@ -42,7 +45,8 @@ class QuestionAnalysisResult(BaseModel): ) answer: str | None = Field( default=None, - description="Direct answer for simple questions. None for complex tasks." + description="Direct answer for simple questions." + " None for complex tasks." ) @@ -53,6 +57,7 @@ PLATFORM_MAPPING = { "ModelArk": "openai-compatible-model", } + class Chat(BaseModel): task_id: str project_id: str @@ -62,7 +67,8 @@ class Chat(BaseModel): model_platform: str model_type: str api_key: str - api_url: str | None = None # for cloud version, user don't need to set api_url + # for cloud version, user don't need to set api_url + api_url: str | None = None language: str = "en" browser_port: int = 9222 max_retries: int = 3 @@ -71,17 +77,13 @@ class Chat(BaseModel): bun_mirror: str = "" uvx_mirror: str = "" env_path: str | None = None - summary_prompt: str = ( - "After completing the task, please generate a summary of the entire task completion. " - "The summary must be enclosed in tags and include:\n" - "1. A confirmation of task completion, referencing the original goal.\n" - "2. A high-level overview of the work performed and the final outcome.\n" - "3. A bulleted list of key results or accomplishments.\n" - "Adopt a confident and professional tone." - ) + summary_prompt: str = DEFAULT_SUMMARY_PROMPT new_agents: list["NewAgent"] = [] - extra_params: dict | None = None # For provider-specific parameters like Azure - search_config: dict[str, str] | None = None # User-specific search engine configurations (e.g., GOOGLE_API_KEY, SEARCH_ENGINE_ID) + # For provider-specific parameters like Azure + extra_params: dict | None = None + # User-specific search engine configurations + # (e.g., GOOGLE_API_KEY, SEARCH_ENGINE_ID) + search_config: dict[str, str] | None = None @field_validator("model_platform") @classmethod @@ -99,18 +101,27 @@ class Chat(BaseModel): return model_type def get_bun_env(self) -> dict[str, str]: - return {"NPM_CONFIG_REGISTRY": self.bun_mirror} if self.bun_mirror else {} + return { + "NPM_CONFIG_REGISTRY": self.bun_mirror + } if self.bun_mirror else {} def get_uvx_env(self) -> dict[str, str]: - return {"UV_DEFAULT_INDEX": self.uvx_mirror, "PIP_INDEX_URL": self.uvx_mirror} if self.uvx_mirror else {} + return { + "UV_DEFAULT_INDEX": self.uvx_mirror, + "PIP_INDEX_URL": self.uvx_mirror + } if self.uvx_mirror else {} def is_cloud(self): return self.api_url is not None and "44.247.171.124" in self.api_url def file_save_path(self, path: str | None = None): - email = re.sub(r'[\\/*?:"<>|\s]', "_", self.email.split("@")[0]).strip(".") + email = re.sub(r'[\\/*?:"<>|\s]', "_", + self.email.split("@")[0]).strip(".") # Use project-based structure: project_{project_id}/task_{task_id} - save_path = Path.home() / "eigent" / email / f"project_{self.project_id}" / f"task_{self.task_id}" + save_path = ( + Path.home() / "eigent" / email / f"project_{self.project_id}" / + f"task_{self.task_id}" + ) if path is not None: save_path = save_path / path save_path.mkdir(parents=True, exist_ok=True) @@ -138,7 +149,8 @@ class UpdateData(BaseModel): class AgentModelConfig(BaseModel): - """Optional per-agent model configuration to override the default task model.""" + """Optional per-agent model configuration + to override the default task model.""" model_platform: str | None = None model_type: str | None = None api_key: str | None = None @@ -147,13 +159,15 @@ class AgentModelConfig(BaseModel): def has_custom_config(self) -> bool: """Check if any custom model configuration is set.""" - return any([ - self.model_platform is not None, - self.model_type is not None, - self.api_key is not None, - self.api_url is not None, - self.extra_params is not None, - ]) + return any( + [ + self.model_platform is not None, + self.model_type is not None, + self.api_key is not None, + self.api_url is not None, + self.extra_params is not None, + ] + ) class NewAgent(BaseModel): @@ -177,6 +191,7 @@ class AddTaskRequest(BaseModel): class RemoveTaskRequest(BaseModel): task_id: str + def sse_json(step: str, data): res_format = {"step": step, "data": data} return f"data: {json.dumps(res_format, ensure_ascii=False)}\n\n" diff --git a/backend/app/router.py b/backend/app/router.py index e1515266..b73ab2f2 100644 --- a/backend/app/router.py +++ b/backend/app/router.py @@ -11,27 +11,35 @@ # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= - """ Centralized router registration for the Eigent API. -All routers are explicitly registered here for better visibility and maintainability. +All routers are explicitly registered here +for better visibility and maintainability. """ -from fastapi import FastAPI -from app.controller import chat_controller, model_controller, task_controller, tool_controller, health_controller import logging +from fastapi import FastAPI + +from app.controller import ( + chat_controller, + health_controller, + model_controller, + task_controller, + tool_controller, +) + logger = logging.getLogger("router") def register_routers(app: FastAPI, prefix: str = "") -> None: """ Register all API routers with their respective prefixes and tags. - + This replaces the auto-discovery mechanism for better: - Visibility: See all routes in one place - Maintainability: Easy to add/remove routes - Debugging: Clear registration order and configuration - + Args: app: FastAPI application instance prefix: Optional global prefix for all routes (e.g., "/api") @@ -43,9 +51,11 @@ def register_routers(app: FastAPI, prefix: str = "") -> None: "description": "Health check endpoint for service readiness" }, { - "router": chat_controller.router, + "router": + chat_controller.router, "tags": ["chat"], - "description": "Chat session management, improvements, and human interactions" + "description": + "Chat session management, improvements, and human interactions" }, { "router": model_controller.router, @@ -53,26 +63,28 @@ def register_routers(app: FastAPI, prefix: str = "") -> None: "description": "Model validation and configuration" }, { - "router": task_controller.router, + "router": + task_controller.router, "tags": ["task"], - "description": "Task lifecycle management (start, stop, update, control)" + "description": + "Task lifecycle management (start, stop, update, control)" }, { "router": tool_controller.router, - "tags": ["tool"], + "tags": ["tool"], "description": "Tool installation and management" }, ] - + for config in routers_config: app.include_router( - config["router"], - prefix=prefix, - tags=config["tags"] + config["router"], prefix=prefix, tags=config["tags"] ) route_count = len(config["router"].routes) logger.info( - f"Registered {config['tags'][0]} router: {route_count} routes - {config['description']}" + f"Registered {config['tags'][0]} router:" + f" {route_count} routes -" + f" {config['description']}" ) - - logger.info(f"Total routers registered: {len(routers_config)}") \ No newline at end of file + + logger.info(f"Total routers registered: {len(routers_config)}") diff --git a/backend/app/service/__init__.py b/backend/app/service/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/app/utils/listen/__init__.py b/backend/app/utils/listen/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/app/utils/server/__init__.py b/backend/app/utils/server/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/app/utils/toolkit/__init__.py b/backend/app/utils/toolkit/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index c8f5b5ea..56df4c8a 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -54,7 +54,9 @@ def pytest_addoption(parser: pytest.Parser) -> None: ) -def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None: +def pytest_collection_modifyitems( + config: pytest.Config, items: list[pytest.Item] +) -> None: if config.getoption("--llm-test-only"): skip_fast = pytest.mark.skip(reason="Skipped for llm test only") for item in items: @@ -118,13 +120,13 @@ def mock_openai_api(): with patch("openai.OpenAI") as mock_openai: mock_client = MagicMock() mock_openai.return_value = mock_client - + # Mock chat completion mock_response = MagicMock() mock_response.choices = [MagicMock()] mock_response.choices[0].message.content = "Test response" mock_response.usage.total_tokens = 100 - + mock_client.chat.completions.create.return_value = mock_response yield mock_client @@ -148,19 +150,21 @@ def mock_camel_agent(): agent = MagicMock() # Use MagicMock instead of AsyncMock agent.role_name = "test_agent" agent.agent_id = "test_agent_123" - + # Make step method return proper structure with both .msg and .msgs[0] mock_response = MagicMock() mock_message = MagicMock() mock_message.content = "Test agent response" mock_message.parsed = None - + mock_response.msg = mock_message - mock_response.msgs = [mock_message] # msgs[0] should point to the same content + mock_response.msgs = [ + mock_message + ] # msgs[0] should point to the same content mock_response.info = {"usage": {"total_tokens": 50}} - + agent.step.return_value = mock_response - + agent.astep = AsyncMock() agent.astep.return_value.msg.content = "Test async agent response" agent.astep.return_value.msg.parsed = None @@ -196,17 +200,18 @@ def mock_request(): def app() -> FastAPI: """Create FastAPI test application.""" from fastapi import FastAPI + from app.controller.chat_controller import router as chat_router from app.controller.model_controller import router as model_router from app.controller.task_controller import router as task_router from app.controller.tool_controller import router as tool_router - + app = FastAPI() app.include_router(chat_router) app.include_router(model_router) app.include_router(task_router) app.include_router(tool_router) - + return app @@ -253,14 +258,14 @@ def mock_worker_with_agent(): worker.agent_id = "test_agent_123" worker.astep = AsyncMock() worker.step = MagicMock() - + # Mock response structure mock_response = MagicMock() mock_response.msg = MagicMock() mock_response.msg.content = "Test worker response" mock_response.msg.parsed = {"result": "test"} mock_response.info = {"usage": {"total_tokens": 50}} - + worker.astep.return_value = mock_response worker.step.return_value = mock_response return worker @@ -285,7 +290,7 @@ def mock_environment_variables(): "file_save_path": "/tmp/test_files", "browser_port": "8080" } - + with patch.dict(os.environ, env_vars, clear=False): yield env_vars @@ -327,15 +332,15 @@ async def async_mock_agent() -> AsyncGenerator[AsyncMock, None]: agent = AsyncMock() agent.role_name = "async_test_agent" agent.agent_id = "async_test_agent_456" - + # Mock async step method mock_response = MagicMock() mock_response.msg.content = "Async test response" mock_response.msg.parsed = {"test": "data"} mock_response.info = {"usage": {"total_tokens": 75}} - + agent.astep.return_value = mock_response - + yield agent @@ -349,7 +354,8 @@ def pytest_configure(config): "markers", "model_backend: mark test as requiring model backend" ) config.addinivalue_line( - "markers", "very_slow: mark test as very slow (requires full test mode)" + "markers", + "very_slow: mark test as very slow (requires full test mode)" ) config.addinivalue_line( "markers", "optional: mark test as optional (skipped in fast mode)" @@ -357,6 +363,4 @@ def pytest_configure(config): config.addinivalue_line( "markers", "integration: mark test as integration test" ) - config.addinivalue_line( - "markers", "unit: mark test as unit test" - ) + config.addinivalue_line("markers", "unit: mark test as unit test")