eigent/backend/tests/unit/controller/test_task_controller.py
Ahmed Awelkair A 33829f2c1e
feat: copyright header pre-commit (#1053)
Co-authored-by: 4pmtong <web_chentong@163.com>
Co-authored-by: Wendong-Fan <133094783+Wendong-Fan@users.noreply.github.com>
Co-authored-by: Wendong-Fan <w3ndong.fan@gmail.com>
2026-01-24 08:22:20 +08:00

363 lines
14 KiB
Python

# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
from unittest.mock import MagicMock, patch
import pytest
from fastapi import Response
from fastapi.testclient import TestClient
from app.controller.task_controller import start, put, take_control, add_agent, TakeControl
from app.model.chat import NewAgent, UpdateData, TaskContent
from app.service.task import Action
@pytest.mark.unit
class TestTaskController:
"""Test cases for task controller endpoints."""
def test_start_task_success(self, mock_task_lock):
"""Test successful task start."""
task_id = "test_task_123"
with patch("app.controller.task_controller.get_task_lock", return_value=mock_task_lock), \
patch("asyncio.run") as mock_run:
response = start(task_id)
assert isinstance(response, Response)
assert response.status_code == 201
mock_run.assert_called_once()
def test_update_task_success(self, mock_task_lock):
"""Test successful task update."""
task_id = "test_task_123"
update_data = UpdateData(
task=[
TaskContent(id="subtask_1", content="Updated content 1"),
TaskContent(id="subtask_2", content="Updated content 2")
]
)
with patch("app.controller.task_controller.get_task_lock", return_value=mock_task_lock), \
patch("asyncio.run") as mock_run:
response = put(task_id, update_data)
assert isinstance(response, Response)
assert response.status_code == 201
mock_run.assert_called_once()
def test_take_control_pause_success(self, mock_task_lock):
"""Test successful task pause control."""
task_id = "test_task_123"
control_data = TakeControl(action=Action.pause)
with patch("app.controller.task_controller.get_task_lock", return_value=mock_task_lock), \
patch("asyncio.run") as mock_run:
response = take_control(task_id, control_data)
assert isinstance(response, Response)
assert response.status_code == 204
mock_run.assert_called_once()
def test_take_control_resume_success(self, mock_task_lock):
"""Test successful task resume control."""
task_id = "test_task_123"
control_data = TakeControl(action=Action.resume)
with patch("app.controller.task_controller.get_task_lock", return_value=mock_task_lock), \
patch("asyncio.run") as mock_run:
response = take_control(task_id, control_data)
assert isinstance(response, Response)
assert response.status_code == 204
mock_run.assert_called_once()
def test_add_agent_success(self, mock_task_lock):
"""Test successful agent addition."""
task_id = "test_task_123"
new_agent = NewAgent(
name="Test Agent",
description="A test agent",
tools=["search", "code"],
mcp_tools=None,
env_path=".env"
)
with patch("app.controller.task_controller.get_task_lock", return_value=mock_task_lock), \
patch("app.controller.task_controller.load_dotenv"), \
patch("asyncio.run") as mock_run:
response = add_agent(task_id, new_agent)
assert isinstance(response, Response)
assert response.status_code == 204
mock_run.assert_called_once()
def test_start_task_nonexistent_task(self):
"""Test start task with nonexistent task ID."""
task_id = "nonexistent_task"
with patch("app.controller.task_controller.get_task_lock", side_effect=KeyError("Task not found")):
with pytest.raises(KeyError):
start(task_id)
def test_update_task_empty_data(self, mock_task_lock):
"""Test update task with empty task list."""
task_id = "test_task_123"
update_data = UpdateData(task=[])
with patch("app.controller.task_controller.get_task_lock", return_value=mock_task_lock), \
patch("asyncio.run") as mock_run:
response = put(task_id, update_data)
assert isinstance(response, Response)
assert response.status_code == 201
mock_run.assert_called_once()
def test_add_agent_with_mcp_tools(self, mock_task_lock):
"""Test adding agent with MCP tools."""
task_id = "test_task_123"
new_agent = NewAgent(
name="MCP Agent",
description="An agent with MCP tools",
tools=["search"],
mcp_tools={"mcpServers": {"notion": {"config": "test"}}},
env_path=".env"
)
with patch("app.controller.task_controller.get_task_lock", return_value=mock_task_lock), \
patch("app.controller.task_controller.load_dotenv"), \
patch("asyncio.run") as mock_run:
response = add_agent(task_id, new_agent)
assert isinstance(response, Response)
assert response.status_code == 204
mock_run.assert_called_once()
@pytest.mark.integration
class TestTaskControllerIntegration:
"""Integration tests for task controller."""
def test_start_task_endpoint_integration(self, client: TestClient):
"""Test start task endpoint through FastAPI test client."""
task_id = "test_task_123"
with patch("app.controller.task_controller.get_task_lock") as mock_get_lock, \
patch("asyncio.run"):
mock_task_lock = MagicMock()
mock_get_lock.return_value = mock_task_lock
response = client.post(f"/task/{task_id}/start")
assert response.status_code == 201
def test_update_task_endpoint_integration(self, client: TestClient):
"""Test update task endpoint through FastAPI test client."""
task_id = "test_task_123"
update_data = {
"task": [
{"id": "subtask_1", "content": "Updated content 1"},
{"id": "subtask_2", "content": "Updated content 2"}
]
}
with patch("app.controller.task_controller.get_task_lock") as mock_get_lock, \
patch("asyncio.run"):
mock_task_lock = MagicMock()
mock_get_lock.return_value = mock_task_lock
response = client.put(f"/task/{task_id}", json=update_data)
assert response.status_code == 201
def test_take_control_pause_endpoint_integration(self, client: TestClient):
"""Test take control pause endpoint through FastAPI test client."""
task_id = "test_task_123"
control_data = {"action": "pause"}
with patch("app.controller.task_controller.get_task_lock") as mock_get_lock, \
patch("asyncio.run"):
mock_task_lock = MagicMock()
mock_get_lock.return_value = mock_task_lock
response = client.put(f"/task/{task_id}/take-control", json=control_data)
assert response.status_code == 204
def test_take_control_resume_endpoint_integration(self, client: TestClient):
"""Test take control resume endpoint through FastAPI test client."""
task_id = "test_task_123"
control_data = {"action": "resume"}
with patch("app.controller.task_controller.get_task_lock") as mock_get_lock, \
patch("asyncio.run"):
mock_task_lock = MagicMock()
mock_get_lock.return_value = mock_task_lock
response = client.put(f"/task/{task_id}/take-control", json=control_data)
assert response.status_code == 204
def test_add_agent_endpoint_integration(self, client: TestClient):
"""Test add agent endpoint through FastAPI test client."""
task_id = "test_task_123"
agent_data = {
"name": "Test Agent",
"description": "A test agent",
"tools": ["search", "code"],
"mcp_tools": None,
"env_path": ".env"
}
with patch("app.controller.task_controller.get_task_lock") as mock_get_lock, \
patch("app.controller.task_controller.load_dotenv"), \
patch("asyncio.run"):
mock_task_lock = MagicMock()
mock_get_lock.return_value = mock_task_lock
response = client.post(f"/task/{task_id}/add-agent", json=agent_data)
assert response.status_code == 204
@pytest.mark.unit
class TestTaskControllerErrorCases:
"""Test error cases and edge conditions for task controller."""
def test_start_task_async_error(self, mock_task_lock):
"""Test start task when async operation fails."""
task_id = "test_task_123"
with patch("app.controller.task_controller.get_task_lock", return_value=mock_task_lock), \
patch("asyncio.run", side_effect=Exception("Async error")):
with pytest.raises(Exception, match="Async error"):
start(task_id)
def test_update_task_with_invalid_task_content(self, mock_task_lock):
"""Test update task with invalid task content."""
task_id = "test_task_123"
# Create invalid update data that might cause validation errors
update_data = UpdateData(task=[
TaskContent(id="", content=""), # Empty ID and content
TaskContent(id="valid_id", content="Valid content")
])
with patch("app.controller.task_controller.get_task_lock", return_value=mock_task_lock), \
patch("asyncio.run") as mock_run:
# Should handle invalid data gracefully or raise appropriate error
response = put(task_id, update_data)
assert response.status_code == 201
def test_take_control_invalid_action(self):
"""Test take control with invalid action value."""
task_id = "test_task_123"
# This should be caught by Pydantic validation
with pytest.raises((ValueError, TypeError)):
TakeControl(action="invalid_action")
def test_add_agent_env_load_failure(self, mock_task_lock):
"""Test add agent when environment loading fails."""
task_id = "test_task_123"
new_agent = NewAgent(
name="Test Agent",
description="A test agent",
tools=["search"],
mcp_tools=None,
env_path="nonexistent.env"
)
with patch("app.controller.task_controller.get_task_lock", return_value=mock_task_lock), \
patch("app.controller.task_controller.load_dotenv", side_effect=Exception("Env load failed")), \
patch("asyncio.run"):
# Should handle environment load failure gracefully or raise error
with pytest.raises(Exception, match="Env load failed"):
add_agent(task_id, new_agent)
def test_add_agent_with_empty_name(self, mock_task_lock):
"""Test add agent with empty name."""
task_id = "test_task_123"
new_agent = NewAgent(
name="", # Empty name
description="A test agent",
tools=["search"],
mcp_tools=None,
env_path=".env"
)
with patch("app.controller.task_controller.get_task_lock", return_value=mock_task_lock), \
patch("app.controller.task_controller.load_dotenv"), \
patch("asyncio.run"):
# Should handle empty name appropriately
response = add_agent(task_id, new_agent)
assert response.status_code == 204
def test_task_operations_with_concurrent_access(self, mock_task_lock):
"""Test task operations with concurrent access scenarios."""
task_id = "test_task_123"
# Simulate concurrent access by having the task lock be modified during operation
def side_effect():
mock_task_lock.status = "modified_during_operation"
return None
mock_task_lock.put_queue.side_effect = side_effect
with patch("app.controller.task_controller.get_task_lock", return_value=mock_task_lock), \
patch("asyncio.run") as mock_run:
response = start(task_id)
assert response.status_code == 201
@pytest.mark.model_backend
class TestTaskControllerWithLLM:
"""Tests that require LLM backend (marked for selective running)."""
def test_add_agent_with_real_model_integration(self, mock_task_lock):
"""Test adding an agent that requires real model integration."""
task_id = "test_task_123"
new_agent = NewAgent(
name="Real Model Agent",
description="An agent that uses real models",
tools=["search", "code"],
mcp_tools=None,
env_path=".env"
)
# This test would involve real model creation and configuration
# Marked as model_backend test for selective execution
assert True # Placeholder
@pytest.mark.very_slow
def test_full_task_workflow_integration(self):
"""Test complete task workflow from start to completion (very slow test)."""
# This test would run a complete task workflow including agent interactions
# Marked as very_slow for execution only in full test mode
assert True # Placeholder