# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= import asyncio import weakref from datetime import datetime, timedelta from unittest.mock import patch import pytest from camel.tasks import Task from app.exception.exception import ProgramException from app.model.chat import Status, SupplementChat, TaskContent, UpdateData from app.service.task import ( Action, ActionAskData, ActionCreateAgentData, ActionImproveData, ActionNewAgent, ActionStartData, ActionSupplementData, ActionTakeControl, ActionTaskStateData, ActionUpdateTaskData, Agents, ImprovePayload, TaskLock, create_task_lock, delete_task_lock, get_camel_task, get_task_lock, process_task, set_process_task, task_index, task_locks, ) @pytest.mark.unit class TestTaskServiceModels: """Test cases for task service data models.""" def test_action_improve_data_creation(self): """Test ActionImproveData model creation.""" payload = ImprovePayload(question="Improve this code") data = ActionImproveData(data=payload) assert data.action == Action.improve assert data.data.question == "Improve this code" assert data.data.attaches == [] assert data.new_task_id is None def test_action_improve_data_with_new_task_id(self): """Test ActionImproveData model creation with new_task_id.""" payload = ImprovePayload(question="Improve this code") data = ActionImproveData(data=payload, new_task_id="task_123") assert data.action == Action.improve assert data.data.question == "Improve this code" assert data.new_task_id == "task_123" def test_action_start_data_creation(self): """Test ActionStartData model creation.""" data = ActionStartData() assert data.action == Action.start def test_action_update_task_data_creation(self): """Test ActionUpdateTaskData model creation.""" update_data = UpdateData( task=[TaskContent(id="task_1", content="Updated content")] ) data = ActionUpdateTaskData(data=update_data) assert data.action == Action.update_task assert len(data.data.task) == 1 assert data.data.task[0].content == "Updated content" def test_action_task_state_data_creation(self): """Test ActionTaskStateData model creation.""" state_data = { "task_id": "test_123", "content": "Test content", "state": "RUNNING", "result": "In progress", "failure_count": 0, } data = ActionTaskStateData(data=state_data) assert data.action == Action.task_state assert data.data["task_id"] == "test_123" assert data.data["failure_count"] == 0 def test_action_ask_data_creation(self): """Test ActionAskData model creation.""" ask_data = { "question": "What should I do next?", "agent": "test_agent", } data = ActionAskData(data=ask_data) assert data.action == Action.ask assert data.data["question"] == "What should I do next?" assert data.data["agent"] == "test_agent" def test_action_create_agent_data_creation(self): """Test ActionCreateAgentData model creation.""" agent_data = { "agent_name": "TestAgent", "agent_id": "agent_123", "tools": ["search", "code"], } data = ActionCreateAgentData(data=agent_data) assert data.action == Action.create_agent assert data.data["agent_name"] == "TestAgent" assert data.data["tools"] == ["search", "code"] def test_action_supplement_data_creation(self): """Test ActionSupplementData model creation.""" supplement = SupplementChat(question="Add more details") data = ActionSupplementData(data=supplement) assert data.action == Action.supplement assert data.data.question == "Add more details" def test_action_take_control_pause(self): """Test ActionTakeControl with pause action.""" data = ActionTakeControl(action=Action.pause) assert data.action == Action.pause def test_action_take_control_resume(self): """Test ActionTakeControl with resume action.""" data = ActionTakeControl(action=Action.resume) assert data.action == Action.resume def test_action_new_agent_creation(self): """Test ActionNewAgent model creation.""" data = ActionNewAgent( name="New Agent", description="A new agent", tools=["search", "code"], mcp_tools={"mcpServers": {"test": {"config": "value"}}}, ) assert data.action == Action.new_agent assert data.name == "New Agent" assert data.description == "A new agent" assert data.tools == ["search", "code"] assert data.mcp_tools is not None def test_agents_enum_values(self): """Test Agents enum contains expected values.""" expected_agents = [ "task_agent", "coordinator_agent", "new_worker_agent", "developer_agent", "browser_agent", "document_agent", "multi_modal_agent", "social_media_agent", "mcp_agent", ] for agent in expected_agents: assert hasattr(Agents, agent) assert Agents[agent].value == agent @pytest.mark.unit class TestTaskLock: """Test cases for TaskLock class.""" def setup_method(self): """Clean up task_locks before each test.""" task_locks.clear() def test_task_lock_creation(self): """Test TaskLock instance creation.""" queue = asyncio.Queue() human_input = {} task_lock = TaskLock("test_123", queue, human_input) assert task_lock.id == "test_123" assert task_lock.status == Status.confirming assert task_lock.active_agent == "" assert task_lock.queue is queue assert task_lock.human_input is human_input assert isinstance(task_lock.created_at, datetime) assert isinstance(task_lock.last_accessed, datetime) assert len(task_lock.background_tasks) == 0 @pytest.mark.asyncio async def test_task_lock_put_queue(self): """Test putting data into task lock queue.""" queue = asyncio.Queue() task_lock = TaskLock("test_123", queue, {}) data = ActionStartData() initial_time = task_lock.last_accessed await asyncio.sleep(0.001) # Small delay to ensure time difference await task_lock.put_queue(data) # Should update last_accessed time assert task_lock.last_accessed > initial_time # Should be able to get the data from queue retrieved_data = await task_lock.get_queue() assert retrieved_data == data @pytest.mark.asyncio async def test_task_lock_get_queue(self): """Test getting data from task lock queue.""" queue = asyncio.Queue() task_lock = TaskLock("test_123", queue, {}) data = ActionStartData() # Put data first await queue.put(data) initial_time = task_lock.last_accessed await asyncio.sleep(0.001) # Small delay to ensure time difference retrieved_data = await task_lock.get_queue() # Should update last_accessed time assert task_lock.last_accessed > initial_time assert retrieved_data == data @pytest.mark.asyncio async def test_task_lock_human_input_operations(self): """Test human input operations.""" task_lock = TaskLock("test_123", asyncio.Queue(), {}) agent_name = "test_agent" # Add human input listener task_lock.add_human_input_listen(agent_name) assert agent_name in task_lock.human_input # Put and get human input await task_lock.put_human_input(agent_name, "user response") response = await task_lock.get_human_input(agent_name) assert response == "user response" @pytest.mark.asyncio async def test_task_lock_background_task_management(self): """Test background task management.""" task_lock = TaskLock("test_123", asyncio.Queue(), {}) async def dummy_task(): await asyncio.sleep(0.1) return "completed" task = asyncio.create_task(dummy_task()) task_lock.add_background_task(task) # Task should be in background_tasks assert task in task_lock.background_tasks # Wait for task to complete await task # Task should be automatically removed after completion # Note: This might need a small delay for the callback to execute await asyncio.sleep(0.01) assert task not in task_lock.background_tasks @pytest.mark.asyncio async def test_task_lock_cleanup(self): """Test task lock cleanup functionality.""" task_lock = TaskLock("test_123", asyncio.Queue(), {}) # Create some background tasks async def long_running_task(): await asyncio.sleep(10) # Long running task task1 = asyncio.create_task(long_running_task()) task2 = asyncio.create_task(long_running_task()) task_lock.add_background_task(task1) task_lock.add_background_task(task2) assert len(task_lock.background_tasks) == 2 # Cleanup should cancel all tasks await task_lock.cleanup() assert len(task_lock.background_tasks) == 0 assert task1.cancelled() assert task2.cancelled() @pytest.mark.unit class TestTaskLockManagement: """Test cases for task lock management functions.""" def setup_method(self): """Clean up task_locks before each test.""" task_locks.clear() def test_create_task_lock_success(self): """Test successful task lock creation.""" task_id = "test_123" task_lock = create_task_lock(task_id) assert task_lock.id == task_id assert task_id in task_locks assert task_locks[task_id] is task_lock def test_create_task_lock_already_exists(self): """Test creating task lock that already exists.""" task_id = "test_123" create_task_lock(task_id) # Should raise exception when trying to create duplicate with pytest.raises(ProgramException, match="Task already exists"): create_task_lock(task_id) def test_get_task_lock_success(self): """Test successful task lock retrieval.""" task_id = "test_123" created_lock = create_task_lock(task_id) retrieved_lock = get_task_lock(task_id) assert retrieved_lock is created_lock def test_get_task_lock_not_found(self): """Test getting task lock that doesn't exist.""" with pytest.raises(ProgramException, match="Task not found"): get_task_lock("nonexistent_task") @pytest.mark.asyncio async def test_delete_task_lock_success(self): """Test successful task lock deletion.""" task_id = "test_123" task_lock = create_task_lock(task_id) # Add some background tasks async def dummy_task(): await asyncio.sleep(1) task = asyncio.create_task(dummy_task()) task_lock.add_background_task(task) # Delete should clean up and remove await delete_task_lock(task_id) assert task_id not in task_locks assert task.cancelled() @pytest.mark.asyncio async def test_delete_task_lock_not_found(self): """Test deleting task lock that doesn't exist.""" with pytest.raises(ProgramException, match="Task not found"): await delete_task_lock("nonexistent_task") @pytest.mark.unit class TestCamelTaskManagement: """Test cases for CAMEL task management functions.""" def setup_method(self): """Clean up task_index before each test.""" task_index.clear() def test_get_camel_task_direct_match(self): """Test getting CAMEL task with direct ID match.""" task = Task(content="Test task", id="test_123") tasks = [task] result = get_camel_task("test_123", tasks) assert result is task def test_get_camel_task_in_subtasks(self): """Test getting CAMEL task from subtasks.""" subtask = Task(content="Subtask", id="subtask_123") parent_task = Task(content="Parent task", id="parent_123") parent_task.add_subtask(subtask) tasks = [parent_task] result = get_camel_task("subtask_123", tasks) assert result is subtask def test_get_camel_task_not_found(self): """Test getting CAMEL task that doesn't exist.""" task = Task(content="Test task", id="test_123") tasks = [task] result = get_camel_task("nonexistent_task", tasks) assert result is None def test_get_camel_task_from_cache(self): """Test getting CAMEL task from weak reference cache.""" task = Task(content="Test task", id="test_123") task_index["test_123"] = weakref.ref(task) result = get_camel_task("test_123", []) assert result is task def test_get_camel_task_dead_reference(self): """Test getting CAMEL task with dead weak reference.""" task = Task(content="Test task", id="test_123") task_ref = weakref.ref(task) task_index["test_123"] = task_ref # Delete the original task to make the weak reference dead del task # Should rebuild index and return None since task is not in tasks list result = get_camel_task("test_123", []) assert result is None assert "test_123" not in task_index def test_get_camel_task_rebuilds_index(self): """Test that get_camel_task rebuilds the index.""" task1 = Task(content="Task 1", id="task_1") task2 = Task(content="Task 2", id="task_2") tasks = [task1, task2] # Index should be empty initially assert len(task_index) == 0 # Getting a task should rebuild the index result = get_camel_task("task_2", tasks) assert result is task2 assert len(task_index) == 2 assert "task_1" in task_index assert "task_2" in task_index @pytest.mark.unit class TestProcessTaskContext: """Test cases for process task context management.""" def test_set_process_task_context(self): """Test setting process task context.""" process_task_id = "test_task_123" with set_process_task(process_task_id): assert process_task.get() == process_task_id def test_process_task_context_reset(self): """Test that process task context is reset after exiting.""" process_task_id = "test_task_123" # Set initial context initial_token = process_task.set("initial_task") try: with set_process_task(process_task_id): assert process_task.get() == process_task_id # Should be reset to initial value assert process_task.get() == "initial_task" finally: process_task.reset(initial_token) def test_nested_process_task_context(self): """Test nested process task contexts.""" with set_process_task("outer_task"): assert process_task.get() == "outer_task" with set_process_task("inner_task"): assert process_task.get() == "inner_task" # Should restore outer context assert process_task.get() == "outer_task" @pytest.mark.unit class TestPeriodicCleanup: """Test cases for periodic cleanup functionality.""" def setup_method(self): """Clean up task_locks before each test.""" task_locks.clear() @pytest.mark.asyncio async def test_periodic_cleanup_removes_stale_tasks(self): """Test that periodic cleanup removes stale task locks.""" # Create a task lock with old last_accessed time task_lock = create_task_lock("stale_task") task_lock.last_accessed = datetime.now() - timedelta(hours=3) # Create a fresh task lock fresh_lock = create_task_lock("fresh_task") fresh_lock.last_accessed = datetime.now() assert len(task_locks) == 2 # Directly call the cleanup logic once # instead of using the periodic function cutoff_time = datetime.now() - timedelta( hours=2 ) # Tasks older than 2 hours are stale to_delete = [] for task_id, lock in list(task_locks.items()): if lock.last_accessed < cutoff_time: to_delete.append(task_id) for task_id in to_delete: await delete_task_lock(task_id) # Stale task should be removed, fresh task should remain assert "stale_task" not in task_locks assert "fresh_task" in task_locks @pytest.mark.asyncio async def test_periodic_cleanup_handles_exceptions(self): """Test that periodic cleanup handles exceptions gracefully.""" import app.service.task as task_module # Create a stale task lock task_lock = create_task_lock("test_task") task_lock.last_accessed = datetime.now() - timedelta(hours=3) # Mock delete_task_lock to raise exception and call through module with ( patch.object( task_module, "delete_task_lock", side_effect=Exception("Test error"), ), patch.object(task_module, "logger") as mock_logger, ): # Simulate what _periodic_cleanup does when encountering an error try: await task_module.delete_task_lock("test_task") except Exception as e: task_module.logger.error(f"Error in periodic cleanup: {e}") # Should have logged the error mock_logger.error.assert_called() @pytest.mark.integration class TestTaskServiceIntegration: """Integration tests for task service components.""" def setup_method(self): """Clean up before each test.""" task_locks.clear() task_index.clear() @pytest.mark.asyncio async def test_full_task_lifecycle(self): """Test complete task lifecycle from creation to deletion.""" task_id = "integration_test_123" # Create task lock task_lock = create_task_lock(task_id) assert task_lock.id == task_id # Add human input listener agent_name = "test_agent" task_lock.add_human_input_listen(agent_name) # Test queue operations improve_data = ActionImproveData( data=ImprovePayload(question="Improve this") ) await task_lock.put_queue(improve_data) retrieved_data = await task_lock.get_queue() assert retrieved_data.action == Action.improve assert retrieved_data.data.question == "Improve this" # Test human input operations await task_lock.put_human_input(agent_name, "User response") user_response = await task_lock.get_human_input(agent_name) assert user_response == "User response" # Test background task management async def test_background_task(): await asyncio.sleep(0.1) return "done" bg_task = asyncio.create_task(test_background_task()) task_lock.add_background_task(bg_task) await bg_task # Clean up await delete_task_lock(task_id) assert task_id not in task_locks @pytest.mark.asyncio async def test_multiple_task_locks_management(self): """Test managing multiple task locks simultaneously.""" task_ids = ["task_1", "task_2", "task_3"] # Create multiple task locks task_locks_created = [] for task_id in task_ids: task_lock = create_task_lock(task_id) task_locks_created.append(task_lock) assert len(task_locks) == 3 # Test each task lock independently for i, task_id in enumerate(task_ids): task_lock = get_task_lock(task_id) assert task_lock is task_locks_created[i] # Test queue operations data = ActionStartData() await task_lock.put_queue(data) retrieved_data = await task_lock.get_queue() assert retrieved_data.action == Action.start # Clean up all task locks for task_id in task_ids: await delete_task_lock(task_id) assert len(task_locks) == 0 def test_complex_camel_task_hierarchy(self): """Test CAMEL task retrieval in complex hierarchy.""" # Create complex task hierarchy root_task = Task(content="Root task", id="root") level1_task1 = Task(content="Level 1 Task 1", id="level1_1") level1_task2 = Task(content="Level 1 Task 2", id="level1_2") level2_task1 = Task(content="Level 2 Task 1", id="level2_1") level2_task2 = Task(content="Level 2 Task 2", id="level2_2") root_task.add_subtask(level1_task1) root_task.add_subtask(level1_task2) level1_task1.add_subtask(level2_task1) level1_task2.add_subtask(level2_task2) tasks = [root_task] # Test retrieval at different levels assert get_camel_task("root", tasks) is root_task assert get_camel_task("level1_1", tasks) is level1_task1 assert get_camel_task("level1_2", tasks) is level1_task2 assert get_camel_task("level2_1", tasks) is level2_task1 assert get_camel_task("level2_2", tasks) is level2_task2 # Test non-existent task assert get_camel_task("nonexistent", tasks) is None @pytest.mark.model_backend class TestTaskServiceWithLLM: """Tests that require LLM backend (marked for selective running).""" @pytest.mark.asyncio async def test_task_with_real_camel_tasks(self): """Test task service with real CAMEL task integration.""" # This test would use real CAMEL task objects and workflows # Marked as model_backend test for selective execution assert True # Placeholder @pytest.mark.very_slow async def test_full_workflow_with_cleanup(self): """Test complete workflow including periodic cleanup (very slow test).""" # This test would run the complete workflow including periodic cleanup # Marked as very_slow for execution only in full test mode assert True # Placeholder