eigent/backend/tests/unit/utils/test_agent.py
Ahmed Awelkair A 33829f2c1e
feat: copyright header pre-commit (#1053)
Co-authored-by: 4pmtong <web_chentong@163.com>
Co-authored-by: Wendong-Fan <133094783+Wendong-Fan@users.noreply.github.com>
Co-authored-by: Wendong-Fan <w3ndong.fan@gmail.com>
2026-01-24 08:22:20 +08:00

1059 lines
47 KiB
Python

# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
import asyncio
from threading import Event
from typing import List
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import uuid
from camel.agents import ChatAgent
from camel.agents._types import ToolCallRequest
from camel.messages import BaseMessage
from camel.models import BaseModelBackend
from camel.responses import ChatAgentResponse
from camel.toolkits import FunctionTool
from camel.types.agents import ToolCallingRecord
from app.utils.agent import (
ListenChatAgent,
agent_model,
question_confirm_agent,
task_summary_agent,
developer_agent,
browser_agent,
document_agent,
multi_modal_agent,
social_medium_agent,
mcp_agent,
get_toolkits,
get_mcp_tools
)
from app.model.chat import Chat, McpServers
from app.service.task import ActionActivateAgentData, ActionDeactivateAgentData
@pytest.mark.unit
class TestListenChatAgent:
    """Unit tests for the ``ListenChatAgent`` class.

    Each test patches ``camel.models.ModelFactory.create`` so that no real
    model backend is built, and patches ``app.utils.agent.get_task_lock`` so
    the agent is handed a mock task lock.  Mock construction that used to be
    copy-pasted into every test lives in the private ``_make_*`` helpers;
    their names start with an underscore so pytest does not collect them.
    """

    API_TASK_ID = "test_api_task_123"
    AGENT_NAME = "TestAgent"

    # ------------------------------------------------------------------ #
    # Shared mock builders
    # ------------------------------------------------------------------ #
    @staticmethod
    def _make_backend():
        """Return a mock model backend whose model type is ``"gpt-4"``."""
        backend = MagicMock()
        backend.model_type = "gpt-4"
        backend.current_model = MagicMock()
        backend.current_model.model_type = "gpt-4"
        return backend

    @classmethod
    def _make_agent(cls, **extra):
        """Build a ``ListenChatAgent`` with the standard ids and model name.

        Must be called while ``ModelFactory.create`` and ``get_task_lock``
        are patched.  ``extra`` is forwarded to the constructor unchanged.
        """
        return ListenChatAgent(
            api_task_id=cls.API_TASK_ID,
            agent_name=cls.AGENT_NAME,
            model="gpt-4",  # Use string instead of mock
            **extra,
        )

    @staticmethod
    def _make_response(content, spec=None):
        """Return a mock response carrying ``content`` and a usage record.

        ``spec`` is passed straight to ``MagicMock``; ``None`` yields an
        unconstrained mock.
        """
        response = MagicMock(spec=spec)
        response.msg = MagicMock()
        response.msg.content = content
        response.info = {"usage": {"total_tokens": 100}}
        return response

    @staticmethod
    def _make_tool(func, result):
        """Return a ``FunctionTool``-spec'd mock wrapping ``func``."""
        tool = MagicMock(spec=FunctionTool)
        tool.func = func
        tool.return_value = result
        return tool

    @staticmethod
    def _make_tool_call_request(tool_name, call_id):
        """Return a ``ToolCallRequest``-spec'd mock for ``tool_name``."""
        request = MagicMock(spec=ToolCallRequest)
        request.tool_name = tool_name
        request.id = call_id
        request.tool_call_id = call_id
        request.args = {"arg1": "value1"}
        return request

    # ------------------------------------------------------------------ #
    # Tests
    # ------------------------------------------------------------------ #
    def test_listen_chat_agent_initialization(self):
        """Constructor stores the task id and agent name on a ChatAgent."""
        with patch('app.utils.agent.get_task_lock') as mock_get_lock, \
                patch('camel.models.ModelFactory.create') as mock_create_model:
            mock_get_lock.return_value = MagicMock()
            mock_create_model.return_value = self._make_backend()

            agent = self._make_agent(
                system_message="You are a helpful assistant",
                tools=[],
                agent_id="test_agent_123",
            )

            assert agent.api_task_id == self.API_TASK_ID
            assert agent.agent_name == self.AGENT_NAME
            assert isinstance(agent, ChatAgent)

    def test_listen_chat_agent_step_with_string_input(self, mock_task_lock):
        """``step`` with a plain string delegates to ``ChatAgent.step``."""
        with patch('app.utils.agent.get_task_lock', return_value=mock_task_lock), \
                patch('camel.models.ModelFactory.create') as mock_create_model, \
                patch('asyncio.create_task'):
            mock_create_model.return_value = self._make_backend()
            agent = self._make_agent()
            agent.process_task_id = "test_process_task"

            mock_response = self._make_response(
                "Test response content", spec=ChatAgentResponse
            )
            with patch.object(ChatAgent, 'step', return_value=mock_response) as mock_parent_step:
                result = agent.step("Test input message")

                assert result is mock_response
                # Only the first positional argument is checked; the
                # response_format parameter is deliberately not asserted on.
                mock_parent_step.assert_called_once()
                args, _kwargs = mock_parent_step.call_args
                assert args[0] == "Test input message"
                # The test expects a notification to have been queued.
                mock_task_lock.put_queue.assert_called()

    def test_listen_chat_agent_step_with_base_message_input(self, mock_task_lock):
        """``step`` with a ``BaseMessage`` forwards that same object."""
        with patch('app.utils.agent.get_task_lock', return_value=mock_task_lock), \
                patch('camel.models.ModelFactory.create') as mock_create_model, \
                patch('asyncio.create_task'):
            mock_create_model.return_value = self._make_backend()
            agent = self._make_agent()
            agent.agent_id = "test_agent_456"
            agent.process_task_id = "test_process_task"

            mock_message = MagicMock(spec=BaseMessage)
            mock_message.content = "Base message content"

            mock_response = self._make_response(
                "Test response content", spec=ChatAgentResponse
            )
            with patch.object(ChatAgent, 'step', return_value=mock_response) as mock_parent_step:
                result = agent.step(mock_message)

                assert result is mock_response
                # Only the first positional argument is checked; the
                # response_format parameter is deliberately not asserted on.
                mock_parent_step.assert_called_once()
                args, _kwargs = mock_parent_step.call_args
                assert args[0] is mock_message
                # Only the fact that put_queue was called is verified, not
                # the structure of the queued data.
                mock_task_lock.put_queue.assert_called()

    @pytest.mark.asyncio
    async def test_listen_chat_agent_astep(self, mock_task_lock):
        """``astep`` delegates to ``ChatAgent.astep`` and returns its result."""
        with patch('app.utils.agent.get_task_lock', return_value=mock_task_lock), \
                patch('camel.models.ModelFactory.create') as mock_create_model, \
                patch('asyncio.create_task'):
            mock_create_model.return_value = self._make_backend()
            agent = self._make_agent()
            agent.process_task_id = "test_process_task"

            mock_response = self._make_response("Test response message")
            with patch.object(ChatAgent, 'astep', return_value=mock_response) as mock_parent_astep:
                result = await agent.astep("Test async input")

                assert result is mock_response
                # Only the first positional argument is checked; the
                # response_format parameter is deliberately not asserted on.
                mock_parent_astep.assert_called_once()
                args, _kwargs = mock_parent_astep.call_args
                assert args[0] == "Test async input"
                mock_task_lock.put_queue.assert_called()

    def test_listen_chat_agent_execute_tool(self, mock_task_lock):
        """``_execute_tool`` returns the record from ``_record_tool_calling``."""
        with patch('app.utils.agent.get_task_lock', return_value=mock_task_lock), \
                patch('camel.models.ModelFactory.create') as mock_create_model, \
                patch('asyncio.create_task'):
            mock_create_model.return_value = self._make_backend()
            agent = self._make_agent()

            # Register the mock tool under the name the request will use.
            agent._internal_tools = {
                "test_tool": self._make_tool(MagicMock(), "test_result"),
            }
            tool_call_request = self._make_tool_call_request(
                "test_tool", "tool_call_123"
            )

            mock_record = MagicMock(spec=ToolCallingRecord)
            with patch.object(agent, '_record_tool_calling', return_value=mock_record) as mock_record_func:
                result = agent._execute_tool(tool_call_request)

                assert result is mock_record
                mock_record_func.assert_called_once()
                # Should queue toolkit activation and deactivation notifications
                assert mock_task_lock.put_queue.call_count >= 2

    @pytest.mark.asyncio
    async def test_listen_chat_agent_aexecute_tool(self, mock_task_lock):
        """``_aexecute_tool`` returns the record from ``_record_tool_calling``."""
        with patch('app.utils.agent.get_task_lock', return_value=mock_task_lock), \
                patch('camel.models.ModelFactory.create') as mock_create_model:
            mock_create_model.return_value = self._make_backend()
            agent = self._make_agent()

            # The wrapped function is an AsyncMock for the async path.
            agent._internal_tools = {
                "test_async_tool": self._make_tool(AsyncMock(), "test_async_result"),
            }
            tool_call_request = self._make_tool_call_request(
                "test_async_tool", "async_tool_call_123"
            )

            mock_record = MagicMock(spec=ToolCallingRecord)
            with patch.object(agent, '_record_tool_calling', return_value=mock_record) as mock_record_func:
                result = await agent._aexecute_tool(tool_call_request)

                assert result is mock_record
                mock_record_func.assert_called_once()
                # Should queue toolkit activation and deactivation notifications
                assert mock_task_lock.put_queue.call_count >= 2

    def test_listen_chat_agent_clone(self, mock_task_lock):
        """``clone`` builds its copy through the ``ListenChatAgent`` constructor."""
        with patch('app.utils.agent.get_task_lock', return_value=mock_task_lock), \
                patch('camel.models.ModelFactory.create') as mock_create_model:
            mock_backend = self._make_backend()
            mock_backend.models = "gpt-4"  # String instead of list to avoid list processing
            mock_backend.scheduling_strategy = MagicMock()
            mock_backend.scheduling_strategy.__name__ = "round_robin"
            mock_create_model.return_value = mock_backend

            # Object the patched constructor hands back for the clone call.
            cloned_agent = MagicMock()
            cloned_agent.process_task_id = "test_process_task"

            # The original agent is built with the real constructor.
            agent = self._make_agent()

            # Set up necessary attributes for cloning
            agent._original_system_message = "test system message"
            agent.memory = MagicMock()
            agent.memory.window_size = 10
            agent.memory.get_context_creator = MagicMock()
            agent.memory.get_context_creator.return_value.token_limit = 4000
            agent._output_language = "en"
            agent._external_tool_schemas = {}
            agent.response_terminators = []
            agent.max_iteration = None
            agent.agent_id = "test_agent_id"
            agent.stop_event = None
            agent.tool_execution_timeout = None
            agent.mask_tool_output = False
            agent.pause_event = None
            agent.prune_tool_calls_from_memory = False

            # Only now is the constructor replaced, so clone() receives the mock.
            with patch('app.utils.agent.ListenChatAgent', return_value=cloned_agent) as mock_clone_constructor, \
                    patch.object(agent, '_clone_tools', return_value=([], [])):
                result = agent.clone(with_memory=True)

                assert result is cloned_agent
                mock_clone_constructor.assert_called_once()

    def test_listen_chat_agent_with_tools(self, mock_task_lock):
        """Constructing an agent with tools queues nothing on the task lock."""
        mock_tool = MagicMock(spec=FunctionTool)
        tools = [mock_tool]

        with patch('app.utils.agent.get_task_lock', return_value=mock_task_lock), \
                patch('camel.models.ModelFactory.create') as mock_create_model:
            mock_create_model.return_value = self._make_backend()
            agent = self._make_agent(tools=tools)

            # NOTE(review): function_list is assigned by the test itself, so
            # the length assertion below only checks that assignment.
            agent.function_list = [mock_tool]
            assert len(agent.function_list) == 1  # Should have the tool
            mock_task_lock.put_queue.assert_not_called()  # No immediate action for tool setup

    def test_listen_chat_agent_with_pause_event(self, mock_task_lock):
        """A ``pause_event`` passed to the constructor is kept on the agent."""
        pause_event = asyncio.Event()

        with patch('app.utils.agent.get_task_lock', return_value=mock_task_lock), \
                patch('camel.models.ModelFactory.create') as mock_create_model:
            mock_create_model.return_value = self._make_backend()
            agent = self._make_agent(pause_event=pause_event)

            assert agent.pause_event is pause_event
@pytest.mark.unit
class TestAgentFactoryFunctions:
    """Unit tests for the agent factory functions in ``app.utils.agent``.

    Each factory is called with its collaborators patched out; the tests
    check that the factory returns whatever the patched constructor (or
    ``agent_model``) returns and inspect the positional arguments it was
    called with.

    A mock task lock is registered for ``options.task_id`` for the duration
    of each test through ``_task_lock_registered``, which uses ``patch.dict``
    so the global ``task_locks`` registry is restored afterwards instead of
    leaking entries into later tests.
    """

    @staticmethod
    def _task_lock_registered(options):
        """Return a context manager registering a mock lock for ``options``.

        While active, ``task_locks[options.task_id]`` is a ``MagicMock``; on
        exit the registry is restored to its previous contents.
        """
        from app.service.task import task_locks
        return patch.dict(task_locks, {options.task_id: MagicMock()})

    def test_agent_model_creation(self, sample_chat_data):
        """``agent_model`` returns the agent built by ``ListenChatAgent``."""
        options = Chat(**sample_chat_data)
        agent_name = "TestAgent"
        system_prompt = "You are a helpful assistant"

        with self._task_lock_registered(options), \
                patch('app.utils.agent.ListenChatAgent') as mock_listen_agent, \
                patch('app.utils.agent.ModelFactory.create') as mock_model_factory, \
                patch('app.utils.agent.HumanToolkit.get_can_use_tools', return_value=[]), \
                patch('asyncio.create_task'):
            mock_agent = MagicMock()
            mock_listen_agent.return_value = mock_agent
            mock_model_factory.return_value = MagicMock()

            result = agent_model(agent_name, system_prompt, options, [])

            assert result is mock_agent
            mock_listen_agent.assert_called_once()

    def test_question_confirm_agent_creation(self, sample_chat_data):
        """``question_confirm_agent`` calls ``agent_model`` with its prompt."""
        options = Chat(**sample_chat_data)

        with self._task_lock_registered(options), \
                patch('app.utils.agent.agent_model') as mock_agent_model, \
                patch('asyncio.create_task'):
            mock_agent = MagicMock()
            mock_agent_model.return_value = mock_agent

            result = question_confirm_agent(options)

            assert result is mock_agent
            mock_agent_model.assert_called_once()
            # call_args[0] is the tuple of positional arguments.
            call_args = mock_agent_model.call_args
            assert "question_confirm_agent" in call_args[0][0]  # agent_name
            assert "analyze a user's request" in call_args[0][1]  # system_prompt

    def test_task_summary_agent_creation(self, sample_chat_data):
        """``task_summary_agent`` calls ``agent_model`` with its prompt."""
        options = Chat(**sample_chat_data)

        with self._task_lock_registered(options), \
                patch('app.utils.agent.agent_model') as mock_agent_model, \
                patch('asyncio.create_task'):
            mock_agent = MagicMock()
            mock_agent_model.return_value = mock_agent

            result = task_summary_agent(options)

            assert result is mock_agent
            mock_agent_model.assert_called_once()
            call_args = mock_agent_model.call_args
            assert "task_summary_agent" in call_args[0][0]  # agent_name
            assert "task assistant" in call_args[0][1].lower()  # system_prompt

    @pytest.mark.asyncio
    async def test_developer_agent_creation(self, sample_chat_data):
        """``developer_agent`` passes its name and a tool list to ``agent_model``."""
        options = Chat(**sample_chat_data)

        with self._task_lock_registered(options), \
                patch('app.utils.agent.agent_model') as mock_agent_model, \
                patch('app.utils.agent.get_toolkits') as mock_get_toolkits, \
                patch('asyncio.create_task'), \
                patch('app.utils.agent.HumanToolkit') as mock_human_toolkit, \
                patch('app.utils.agent.NoteTakingToolkit') as mock_note_toolkit, \
                patch('app.utils.agent.WebDeployToolkit') as mock_web_toolkit, \
                patch('app.utils.agent.ScreenshotToolkit') as mock_screenshot_toolkit, \
                patch('app.utils.agent.TerminalToolkit') as mock_terminal_toolkit, \
                patch('app.utils.agent.ToolkitMessageIntegration'):
            # Every patched toolkit contributes no tools.
            mock_human_toolkit.get_can_use_tools.return_value = []
            mock_note_toolkit.return_value.get_tools.return_value = []
            mock_web_toolkit.return_value.get_tools.return_value = []
            mock_screenshot_toolkit.return_value.get_tools.return_value = []
            mock_terminal_toolkit.return_value.get_tools.return_value = []

            mock_agent = MagicMock()
            mock_agent_model.return_value = mock_agent
            mock_get_toolkits.return_value = []

            result = await developer_agent(options)

            assert result is mock_agent
            mock_agent_model.assert_called_once()
            call_args = mock_agent_model.call_args
            assert "developer_agent" in str(call_args[0][0])  # agent_name (enum contains this value)
            tools_arg = call_args[0][3]  # tools argument
            assert isinstance(tools_arg, list)

    def test_browser_agent_creation(self, sample_chat_data):
        """``browser_agent`` passes a search-related prompt to ``agent_model``."""
        options = Chat(**sample_chat_data)

        with self._task_lock_registered(options), \
                patch('app.utils.agent.agent_model') as mock_agent_model, \
                patch('asyncio.create_task'), \
                patch('app.utils.agent.HumanToolkit') as mock_human_toolkit, \
                patch('app.utils.agent.HybridBrowserToolkit') as mock_browser_toolkit, \
                patch('app.utils.agent.TerminalToolkit') as mock_terminal_toolkit, \
                patch('app.utils.agent.NoteTakingToolkit') as mock_note_toolkit, \
                patch('app.utils.agent.SearchToolkit') as mock_search_toolkit, \
                patch('app.utils.agent.ToolkitMessageIntegration'), \
                patch('uuid.uuid4') as mock_uuid:
            mock_human_toolkit.get_can_use_tools.return_value = []
            mock_browser_toolkit.return_value.get_tools.return_value = []

            # Terminal and search toolkit instances expose the individual
            # methods as plain mocks.
            mock_terminal_instance = MagicMock()
            mock_terminal_instance.shell_exec = MagicMock()
            mock_terminal_toolkit.return_value = mock_terminal_instance

            mock_note_toolkit.return_value.get_tools.return_value = []

            mock_search_instance = MagicMock()
            mock_search_instance.search_google = MagicMock()
            mock_search_toolkit.return_value = mock_search_instance

            # Any indexing/slicing of the patched uuid4() result yields a
            # fixed string.
            mock_uuid.return_value.__getitem__ = lambda self, key: "test_session"

            mock_agent = MagicMock()
            mock_agent_model.return_value = mock_agent

            result = browser_agent(options)

            assert result is mock_agent
            mock_agent_model.assert_called_once()
            call_args = mock_agent_model.call_args
            assert "browser_agent" in str(call_args[0][0])  # agent_name (enum contains this value)

            # The system prompt may be a message object or a plain string.
            system_message = call_args[0][1]
            if hasattr(system_message, 'content'):
                assert "search" in system_message.content.lower()
            else:
                assert "search" in str(system_message).lower()  # system_prompt contains search

    @pytest.mark.asyncio
    async def test_document_agent_creation(self, sample_chat_data):
        """``document_agent`` passes its name to ``agent_model``."""
        options = Chat(**sample_chat_data)

        with self._task_lock_registered(options), \
                patch('app.utils.agent.agent_model') as mock_agent_model, \
                patch('app.utils.agent.get_toolkits') as mock_get_toolkits, \
                patch('asyncio.create_task'), \
                patch('app.utils.agent.HumanToolkit') as mock_human_toolkit, \
                patch('app.utils.agent.FileToolkit') as mock_file_toolkit, \
                patch('app.utils.agent.PPTXToolkit') as mock_pptx_toolkit, \
                patch('app.utils.agent.MarkItDownToolkit') as mock_markdown_toolkit, \
                patch('app.utils.agent.ExcelToolkit') as mock_excel_toolkit, \
                patch('app.utils.agent.NoteTakingToolkit') as mock_note_toolkit, \
                patch('app.utils.agent.TerminalToolkit') as mock_terminal_toolkit, \
                patch('app.utils.agent.GoogleDriveMCPToolkit') as mock_gdrive_toolkit, \
                patch('app.utils.agent.ToolkitMessageIntegration'):
            # Every patched toolkit contributes no tools.
            mock_human_toolkit.get_can_use_tools.return_value = []
            mock_file_toolkit.return_value.get_tools.return_value = []
            mock_pptx_toolkit.return_value.get_tools.return_value = []
            mock_markdown_toolkit.return_value.get_tools.return_value = []
            mock_excel_toolkit.return_value.get_tools.return_value = []
            mock_note_toolkit.return_value.get_tools.return_value = []
            mock_terminal_toolkit.return_value.get_tools.return_value = []
            # Configured as awaitable via AsyncMock.
            mock_gdrive_toolkit.get_can_use_tools = AsyncMock(return_value=[])

            mock_agent = MagicMock()
            mock_agent_model.return_value = mock_agent
            mock_get_toolkits.return_value = []

            result = await document_agent(options)

            assert result is mock_agent
            mock_agent_model.assert_called_once()
            call_args = mock_agent_model.call_args
            assert "document_agent" in str(call_args[0][0])  # agent_name (enum contains this value)

    def test_multi_modal_agent_creation(self, sample_chat_data):
        """``multi_modal_agent`` passes its name to ``agent_model``."""
        options = Chat(**sample_chat_data)

        with self._task_lock_registered(options), \
                patch('app.utils.agent.agent_model') as mock_agent_model, \
                patch('asyncio.create_task'), \
                patch('app.utils.agent.HumanToolkit') as mock_human_toolkit, \
                patch('app.utils.agent.VideoDownloaderToolkit') as mock_video_toolkit, \
                patch('app.utils.agent.ImageAnalysisToolkit') as mock_image_toolkit, \
                patch('app.utils.agent.TerminalToolkit') as mock_terminal_toolkit, \
                patch('app.utils.agent.NoteTakingToolkit') as mock_note_toolkit, \
                patch('app.utils.agent.ToolkitMessageIntegration'):
            # Every patched toolkit contributes no tools.
            mock_human_toolkit.get_can_use_tools.return_value = []
            mock_video_toolkit.return_value.get_tools.return_value = []
            mock_image_toolkit.return_value.get_tools.return_value = []
            mock_terminal_toolkit.return_value.get_tools.return_value = []
            mock_note_toolkit.return_value.get_tools.return_value = []

            mock_agent = MagicMock()
            mock_agent_model.return_value = mock_agent

            result = multi_modal_agent(options)

            assert result is mock_agent
            mock_agent_model.assert_called_once()
            call_args = mock_agent_model.call_args
            assert "multi_modal_agent" in str(call_args[0][0])  # agent_name (enum contains this value)

    @pytest.mark.asyncio
    async def test_social_medium_agent_creation(self, sample_chat_data):
        """``social_medium_agent`` returns the agent built by ``agent_model``."""
        options = Chat(**sample_chat_data)

        with self._task_lock_registered(options), \
                patch('app.utils.agent.agent_model') as mock_agent_model, \
                patch('app.utils.agent.get_toolkits') as mock_get_toolkits, \
                patch('asyncio.create_task'), \
                patch('app.utils.agent.WhatsAppToolkit') as mock_whatsapp_toolkit, \
                patch('app.utils.agent.TwitterToolkit') as mock_twitter_toolkit, \
                patch('app.utils.agent.LinkedInToolkit') as mock_linkedin_toolkit, \
                patch('app.utils.agent.RedditToolkit') as mock_reddit_toolkit, \
                patch('app.utils.agent.NotionMCPToolkit') as mock_notion_mcp_toolkit, \
                patch('app.utils.agent.GoogleGmailMCPToolkit') as mock_gmail_toolkit, \
                patch('app.utils.agent.GoogleCalendarToolkit') as mock_calendar_toolkit, \
                patch('app.utils.agent.HumanToolkit') as mock_human_toolkit, \
                patch('app.utils.agent.TerminalToolkit') as mock_terminal_toolkit, \
                patch('app.utils.agent.NoteTakingToolkit') as mock_note_toolkit:
            # Every patched toolkit contributes no tools; the two MCP
            # toolkits are configured as awaitable via AsyncMock.
            mock_whatsapp_toolkit.get_can_use_tools.return_value = []
            mock_twitter_toolkit.get_can_use_tools.return_value = []
            mock_linkedin_toolkit.get_can_use_tools.return_value = []
            mock_reddit_toolkit.get_can_use_tools.return_value = []
            mock_notion_mcp_toolkit.get_can_use_tools = AsyncMock(return_value=[])
            mock_gmail_toolkit.get_can_use_tools = AsyncMock(return_value=[])
            mock_calendar_toolkit.get_can_use_tools.return_value = []
            mock_human_toolkit.get_can_use_tools.return_value = []
            mock_terminal_toolkit.return_value.get_tools.return_value = []
            mock_note_toolkit.return_value.get_tools.return_value = []

            mock_agent = MagicMock()
            mock_agent_model.return_value = mock_agent
            mock_get_toolkits.return_value = []

            result = await social_medium_agent(options)

            assert result is mock_agent
            mock_agent_model.assert_called_once()

    @pytest.mark.asyncio
    async def test_mcp_agent_creation(self, sample_chat_data):
        """``mcp_agent`` builds a ``ListenChatAgent`` named after itself."""
        options = Chat(**sample_chat_data)

        with self._task_lock_registered(options), \
                patch('app.utils.agent.ListenChatAgent') as mock_listen_agent, \
                patch('app.utils.agent.ModelFactory.create') as mock_model_factory, \
                patch('asyncio.create_task'), \
                patch('app.utils.agent.McpSearchToolkit') as mock_mcp_search_toolkit, \
                patch('app.utils.agent.get_mcp_tools') as mock_get_mcp_tools:
            mock_mcp_search_toolkit.return_value.get_tools.return_value = []
            mock_get_mcp_tools.return_value = []

            mock_agent = MagicMock()
            mock_listen_agent.return_value = mock_agent
            mock_model_factory.return_value = MagicMock()

            result = await mcp_agent(options)

            assert result is mock_agent
            mock_listen_agent.assert_called_once()
            # Here the agent name is the second positional argument.
            call_args = mock_listen_agent.call_args
            assert "mcp_agent" in str(call_args[0][1])  # agent_name (enum contains this value)
@pytest.mark.unit
class TestToolkitFunctions:
    """Unit tests for the ``get_toolkits`` and ``get_mcp_tools`` helpers."""

    @pytest.mark.asyncio
    async def test_get_toolkits_with_known_tools(self):
        """``get_toolkits`` returns a list when given known toolkit names."""
        requested = ["search_toolkit", "terminal_toolkit", "file_write_toolkit"]
        agent_name = "TestAgent"
        api_task_id = "test_task_123"

        def wire_up(toolkit_cls, tool_count):
            """Make the patched class and its instance both report mock tools."""
            produced = [MagicMock() for _ in range(tool_count)]
            instance = MagicMock()
            instance.agent_name = agent_name
            instance.get_can_use_tools.return_value = produced
            toolkit_cls.return_value = instance
            # The same tools are also reported by the class-level call.
            toolkit_cls.get_can_use_tools = MagicMock(return_value=produced)

        with patch('app.utils.agent.SearchToolkit') as search_cls, \
                patch('app.utils.agent.TerminalToolkit') as terminal_cls, \
                patch('app.utils.agent.FileToolkit') as file_cls:
            wire_up(search_cls, 2)
            wire_up(terminal_cls, 1)
            wire_up(file_cls, 1)

            toolkits = await get_toolkits(requested, agent_name, api_task_id)

            assert isinstance(toolkits, list)
            # NOTE(review): this bound holds for every list, so it does not
            # constrain how many tools are returned.
            assert len(toolkits) >= 0

    @pytest.mark.asyncio
    async def test_get_toolkits_with_unknown_tool(self):
        """An unrecognised toolkit name still yields a list."""
        toolkits = await get_toolkits(["unknown_tool"], "TestAgent", "test_task_123")

        assert isinstance(toolkits, list)

    @pytest.mark.asyncio
    async def test_get_toolkits_empty_tools(self):
        """No requested toolkits yields an empty list."""
        toolkits = await get_toolkits([], "TestAgent", "test_task_123")

        assert toolkits == []

    @pytest.mark.asyncio
    async def test_get_mcp_tools_success(self):
        """``get_mcp_tools`` connects the toolkit and returns its tools."""
        server_config: McpServers = {
            "mcpServers": {
                "notion": {
                    "command": "npx",
                    "args": ["@modelcontextprotocol/server-notion"]
                }
            }
        }
        expected_tools = [MagicMock(), MagicMock()]

        with patch('app.utils.agent.MCPToolkit') as toolkit_cls:
            # A plain MagicMock instance; only connect() is awaitable.
            toolkit = MagicMock()
            toolkit.connect = AsyncMock()
            toolkit.get_tools.return_value = expected_tools
            toolkit_cls.return_value = toolkit

            tools = await get_mcp_tools(server_config)

            assert len(tools) == 2
            assert tools == expected_tools
            toolkit_cls.assert_called_once()
            toolkit.connect.assert_called_once()

    @pytest.mark.asyncio
    async def test_get_mcp_tools_empty_servers(self):
        """An empty ``mcpServers`` mapping yields an empty list."""
        server_config: McpServers = {"mcpServers": {}}

        tools = await get_mcp_tools(server_config)

        assert tools == []

    @pytest.mark.asyncio
    async def test_get_mcp_tools_connection_failure(self):
        """A failing ``MCPToolkit`` constructor surfaces as an exception."""
        server_config: McpServers = {
            "mcpServers": {
                "failing_server": {
                    "command": "invalid_command"
                }
            }
        }
        failing_toolkit = patch(
            'app.utils.agent.MCPToolkit',
            side_effect=Exception("Connection failed"),
        )

        with failing_toolkit:
            # The test expects the error to reach the caller.
            with pytest.raises(Exception):
                await get_mcp_tools(server_config)
@pytest.mark.integration
class TestAgentIntegration:
"""Integration tests for agent utilities."""
def setup_method(self):
    """Empty the shared ``task_locks`` registry before each test runs."""
    from app.service.task import task_locks as registry
    registry.clear()
@pytest.mark.asyncio
async def test_full_agent_workflow(self, sample_chat_data):
    """Create an agent through ``agent_model`` and run one mocked step."""
    from app.service.task import task_locks

    options = Chat(**sample_chat_data)
    api_task_id = options.task_id

    # Register a mock lock under the task id taken from the chat options.
    task_locks[api_task_id] = MagicMock()

    with patch('app.utils.agent.ModelFactory.create') as model_factory, \
            patch('asyncio.create_task'), \
            patch('app.utils.agent.ListenChatAgent') as listen_agent_cls:
        model_factory.return_value = MagicMock()

        built_agent = MagicMock()
        built_agent.api_task_id = api_task_id
        listen_agent_cls.return_value = built_agent

        agent = agent_model("IntegrationAgent", "Test system prompt", options, [])

        assert agent is built_agent
        assert agent.api_task_id == api_task_id

        # step() is itself replaced with a mock, so this only checks that
        # the configured response object comes back.
        canned_response = MagicMock()
        canned_response.msg = MagicMock()
        canned_response.msg.content = "Test response"
        canned_response.info = {"usage": {"total_tokens": 50}}
        agent.step = MagicMock(return_value=canned_response)

        outcome = agent.step("Test message")

        assert outcome is canned_response
@pytest.mark.asyncio
async def test_agent_with_multiple_toolkits(self, sample_chat_data):
"""Test agent creation with multiple toolkits."""
options = Chat(**sample_chat_data)
tools = ["search_toolkit", "terminal_toolkit", "file_write_toolkit"]
# Setup task lock in the registry before calling agent function
from app.service.task import task_locks
mock_task_lock = MagicMock()
task_locks[options.task_id] = mock_task_lock
with patch('app.utils.agent.agent_model') as mock_agent_model, \
patch('app.utils.agent.get_toolkits') as mock_get_toolkits, \
patch('asyncio.create_task'), \
patch('app.utils.agent.HumanToolkit') as mock_human_toolkit, \
patch('app.utils.agent.NoteTakingToolkit') as mock_note_toolkit, \
patch('app.utils.agent.WebDeployToolkit') as mock_web_toolkit, \
patch('app.utils.agent.ScreenshotToolkit') as mock_screenshot_toolkit, \
patch('app.utils.agent.TerminalToolkit') as mock_terminal_toolkit, \
patch('app.utils.agent.ToolkitMessageIntegration'):
# Mock all toolkit instances
mock_human_toolkit.get_can_use_tools.return_value = []
mock_note_toolkit.return_value.get_tools.return_value = []
mock_web_toolkit.return_value.get_tools.return_value = []
mock_screenshot_toolkit.return_value.get_tools.return_value = []
mock_terminal_toolkit.return_value.get_tools.return_value = []
mock_tools = [MagicMock() for _ in range(5)] # Mock multiple tools
mock_get_toolkits.return_value = mock_tools
mock_agent = MagicMock()
mock_agent_model.return_value = mock_agent
result = await developer_agent(options)
assert result is mock_agent
mock_get_toolkits.assert_not_called() # developer_agent doesn't call get_toolkits
@pytest.mark.model_backend
class TestAgentWithLLM:
    """Placeholder tests reserved for runs against a real LLM backend.

    The class-level ``model_backend`` marker allows these tests to be
    selected or deselected as a group.
    """

    @pytest.mark.asyncio
    async def test_agent_with_real_model(self, sample_chat_data):
        """Placeholder for creating an agent with a real LLM model.

        Args:
            sample_chat_data: Fixture providing keyword arguments for ``Chat``.
        """
        # Building Chat at least exercises the fixture against the model class.
        options = Chat(**sample_chat_data)

        # This test would use real model backends
        # Marked as model_backend test for selective execution
        assert True  # Placeholder

    @pytest.mark.very_slow
    @pytest.mark.asyncio
    async def test_full_agent_conversation_workflow(self, sample_chat_data):
        """Placeholder for a complete agent conversation workflow.

        Args:
            sample_chat_data: Fixture providing keyword arguments for ``Chat``.
        """
        # The asyncio marker is required for this coroutine test to be
        # awaited when pytest-asyncio runs in strict mode; it matches the
        # sibling async test in this class.
        options = Chat(**sample_chat_data)

        # This test would run complete conversation workflow
        # Marked as very_slow for execution only in full test mode
        assert True  # Placeholder
@pytest.mark.unit
class TestAgentErrorCases:
    """Failure-path and edge-condition tests for the agent utilities."""

    def test_listen_chat_agent_with_invalid_model(self):
        """Constructing ListenChatAgent must raise ValueError for a bad model.

        ``camel.models.ModelFactory.create`` is patched to raise
        ``ValueError("Invalid model")``.
        """
        lock_patch = patch('app.utils.agent.get_task_lock', return_value=MagicMock())
        factory_patch = patch(
            'camel.models.ModelFactory.create',
            side_effect=ValueError("Invalid model"),
        )

        with lock_patch, factory_patch, pytest.raises(ValueError):
            ListenChatAgent(
                api_task_id="error_test_123",
                agent_name="ErrorAgent",
                model="invalid_model_string",  # Invalid model type
            )

    def test_agent_model_with_missing_options(self):
        """Passing ``None`` as the options must raise AttributeError or KeyError."""
        with pytest.raises((AttributeError, KeyError)):
            agent_model("ErrorAgent", "Test prompt", None, [])

    @pytest.mark.asyncio
    async def test_get_toolkits_with_toolkit_initialization_error(self):
        """get_toolkits must still return a list when SearchToolkit fails.

        SearchToolkit is patched to raise ``Exception("Toolkit init failed")``
        on construction; only the type of the result is checked.
        """
        failing_search = patch(
            'app.utils.agent.SearchToolkit',
            side_effect=Exception("Toolkit init failed"),
        )

        with failing_search:
            result = await get_toolkits(["search"], "ErrorAgent", "error_test_123")
            # Should return what it can or empty list
            assert isinstance(result, list)

    def test_listen_chat_agent_step_with_task_lock_error(self):
        """``step`` must raise when get_task_lock raises.

        get_task_lock is patched to raise ``Exception("Task lock not found")``
        and ModelFactory.create returns a mock backend reporting "gpt-4".
        """
        with patch('app.utils.agent.get_task_lock', side_effect=Exception("Task lock not found")), \
                patch('camel.models.ModelFactory.create') as mock_create_model:
            # Stand-in backend exposing model_type directly and via current_model.
            mock_create_model.return_value = MagicMock(
                model_type="gpt-4",
                current_model=MagicMock(model_type="gpt-4"),
            )

            agent = ListenChatAgent(
                api_task_id="error_test_123",
                agent_name="ErrorAgent",
                model="gpt-4",
            )

            with pytest.raises(Exception):
                agent.step("Test message")

    @pytest.mark.asyncio
    async def test_get_mcp_tools_with_malformed_config(self):
        """A config without the ``mcpServers`` key must raise KeyError or TypeError."""
        malformed = {"invalid_key": "invalid_value"}  # Malformed structure

        with pytest.raises((KeyError, TypeError)):
            await get_mcp_tools(malformed)