# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= import os from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, patch import pytest from fastapi import Response from fastapi.responses import StreamingResponse from fastapi.testclient import TestClient from pydantic import ValidationError from app.controller.chat_controller import ( human_reply, improve, install_mcp, post, stop, supplement, ) from app.exception.exception import UserException from app.model.chat import Chat, HumanReply, McpServers, Status, SupplementChat @pytest.mark.unit class TestChatController: """Test cases for chat controller endpoints.""" @pytest.mark.asyncio async def test_post_chat_endpoint_success( self, sample_chat_data, mock_request, mock_task_lock, mock_environment_variables, ): """Test successful chat initialization.""" chat_data = Chat(**sample_chat_data) with ( patch( "app.controller.chat_controller.get_or_create_task_lock", return_value=mock_task_lock, ), patch( "app.controller.chat_controller.step_solve" ) as mock_step_solve, patch("app.controller.chat_controller.load_dotenv"), patch("app.controller.chat_controller.set_current_task_id"), patch("pathlib.Path.mkdir"), patch("pathlib.Path.home", return_value=MagicMock()), ): # Mock async generator async def mock_generator(): yield "data: test_response\n\n" yield "data: test_response_2\n\n" mock_step_solve.return_value = mock_generator() response = await post(chat_data, mock_request) assert isinstance(response, StreamingResponse) assert response.media_type == "text/event-stream" @pytest.mark.asyncio async def test_post_chat_sets_environment_variables( self, sample_chat_data, mock_request, mock_task_lock ): """Test that environment variables are properly set.""" chat_data = Chat(**sample_chat_data) with ( patch( "app.controller.chat_controller.get_or_create_task_lock", return_value=mock_task_lock, ), patch( "app.controller.chat_controller.step_solve" ) as mock_step_solve, patch("app.controller.chat_controller.load_dotenv"), patch("app.controller.chat_controller.set_current_task_id"), patch( "app.controller.chat_controller._prepare_browser_for_request", return_value=True, ), patch("pathlib.Path.mkdir"), patch("pathlib.Path.home", return_value=MagicMock()), patch.dict(os.environ, {}, clear=True), ): async def mock_generator(): yield "data: test_response\n\n" mock_step_solve.return_value = mock_generator() await post(chat_data, mock_request) # Check environment variables were set assert os.environ.get("OPENAI_API_KEY") == "test_key" assert ( os.environ.get("OPENAI_API_BASE_URL") == "https://api.openai.com/v1" ) assert os.environ.get("CAMEL_MODEL_LOG_ENABLED") == "true" assert os.environ.get("browser_port") == "8080" @pytest.mark.asyncio async def test_post_chat_sets_cdp_url_when_browser_ready( self, sample_chat_data, mock_request, mock_task_lock ): """Web mode should set EIGENT_CDP_URL after successful browser ensure.""" chat_data = Chat(**sample_chat_data) mock_request.state = SimpleNamespace() with ( patch( "app.controller.chat_controller.get_or_create_task_lock", return_value=mock_task_lock, ), patch( "app.controller.chat_controller.step_solve" ) as mock_step_solve, patch( "app.controller.chat_controller.is_cdp_url_available", return_value=False, ), patch( "app.controller.chat_controller.ensure_cdp_browser_endpoint", return_value="http://127.0.0.1:8080", ), patch("app.controller.chat_controller.load_dotenv"), patch("app.controller.chat_controller.set_current_task_id"), patch("pathlib.Path.mkdir"), patch("pathlib.Path.home", return_value=MagicMock()), patch.dict(os.environ, {}, clear=True), ): async def mock_generator(): yield "data: test_response\n\n" mock_step_solve.return_value = mock_generator() await post(chat_data, mock_request) assert os.environ.get("EIGENT_CDP_URL") == "http://127.0.0.1:8080" assert os.environ.get("browser_port") == "8080" assert mock_request.state.browser_available is True @pytest.mark.asyncio async def test_post_chat_clears_cdp_url_when_browser_unavailable( self, sample_chat_data, mock_request, mock_task_lock ): """Web mode should mark browser unavailable and clear EIGENT_CDP_URL.""" chat_data = Chat(**sample_chat_data) mock_request.state = SimpleNamespace() with ( patch( "app.controller.chat_controller.get_or_create_task_lock", return_value=mock_task_lock, ), patch( "app.controller.chat_controller.step_solve" ) as mock_step_solve, patch( "app.controller.chat_controller.is_cdp_url_available", return_value=False, ), patch( "app.controller.chat_controller.ensure_cdp_browser_endpoint", return_value=None, ), patch("app.controller.chat_controller.load_dotenv"), patch("app.controller.chat_controller.set_current_task_id"), patch("pathlib.Path.mkdir"), patch("pathlib.Path.home", return_value=MagicMock()), patch.dict( os.environ, {"EIGENT_CDP_URL": "http://127.0.0.1:9222"}, clear=True, ), ): async def mock_generator(): yield "data: test_response\n\n" mock_step_solve.return_value = mock_generator() await post(chat_data, mock_request) assert "EIGENT_CDP_URL" not in os.environ assert mock_request.state.browser_available is False @pytest.mark.asyncio async def test_post_chat_preserves_existing_cdp_url( self, sample_chat_data, mock_request, mock_task_lock ): chat_data = Chat(**sample_chat_data) mock_request.state = SimpleNamespace() with ( patch( "app.controller.chat_controller.get_or_create_task_lock", return_value=mock_task_lock, ), patch( "app.controller.chat_controller.step_solve" ) as mock_step_solve, patch( "app.controller.chat_controller.is_cdp_url_available", return_value=True, ), patch( "app.controller.chat_controller.ensure_cdp_browser_endpoint", ) as mock_ensure_browser, patch("app.controller.chat_controller.load_dotenv"), patch("app.controller.chat_controller.set_current_task_id"), patch("pathlib.Path.mkdir"), patch("pathlib.Path.home", return_value=MagicMock()), patch.dict( os.environ, {"EIGENT_CDP_URL": "http://worker-17:9222"}, clear=True, ), ): async def mock_generator(): yield "data: test_response\n\n" mock_step_solve.return_value = mock_generator() await post(chat_data, mock_request) assert os.environ.get("EIGENT_CDP_URL") == "http://worker-17:9222" assert os.environ.get("browser_port") == "9222" assert mock_request.state.browser_available is True mock_ensure_browser.assert_not_called() def test_improve_chat_success(self, mock_task_lock, mock_request): """Test successful chat improvement.""" task_id = "test_task_123" supplement_data = SupplementChat(question="Improve this code") mock_task_lock.status = Status.processing with ( patch( "app.controller.chat_controller.get_task_lock", return_value=mock_task_lock, ), patch("asyncio.run") as mock_run, ): mock_run.side_effect = lambda coro: coro.close() response = improve(task_id, supplement_data, mock_request) assert isinstance(response, Response) assert response.status_code == 201 assert mock_run.call_count == 2 # put_queue is invoked when creating the coroutine passed to asyncio.run mock_task_lock.put_queue.assert_called_once() def test_improve_chat_task_done_resets_to_confirming( self, mock_task_lock, mock_request ): """Test improvement when task is done resets status to confirming.""" task_id = "test_task_123" supplement_data = SupplementChat(question="Improve this code") mock_task_lock.status = Status.done with ( patch( "app.controller.chat_controller.get_task_lock", return_value=mock_task_lock, ), patch("asyncio.run") as mock_run, ): mock_run.side_effect = lambda coro: coro.close() response = improve(task_id, supplement_data, mock_request) assert mock_task_lock.status == Status.confirming assert isinstance(response, Response) assert response.status_code == 201 def test_supplement_chat_success(self, mock_task_lock): """Test successful chat supplementation.""" task_id = "test_task_123" supplement_data = SupplementChat(question="Add more details") mock_task_lock.status = Status.done with ( patch( "app.controller.chat_controller.get_task_lock", return_value=mock_task_lock, ), patch("asyncio.run") as mock_run, ): response = supplement(task_id, supplement_data) assert isinstance(response, Response) assert response.status_code == 201 mock_run.assert_called_once() def test_supplement_chat_task_not_done_error(self, mock_task_lock): """Test supplementation fails when task is not done.""" task_id = "test_task_123" supplement_data = SupplementChat(question="Add more details") mock_task_lock.status = Status.processing with patch( "app.controller.chat_controller.get_task_lock", return_value=mock_task_lock, ): with pytest.raises(UserException): supplement(task_id, supplement_data) def test_stop_chat_success(self, mock_task_lock): """Test successful chat stopping.""" task_id = "test_task_123" with ( patch( "app.controller.chat_controller.get_task_lock_if_exists", return_value=mock_task_lock, ), patch("asyncio.run") as mock_run, ): mock_run.side_effect = lambda coro: coro.close() response = stop(task_id) assert isinstance(response, Response) assert response.status_code == 204 mock_run.assert_called_once() def test_human_reply_success(self, mock_task_lock): """Test successful human reply.""" task_id = "test_task_123" reply_data = HumanReply(agent="test_agent", reply="This is my reply") with ( patch( "app.controller.chat_controller.get_task_lock", return_value=mock_task_lock, ), patch("asyncio.run") as mock_run, ): response = human_reply(task_id, reply_data) assert isinstance(response, Response) assert response.status_code == 201 mock_run.assert_called_once() def test_install_mcp_success(self, mock_task_lock): """Test successful MCP installation.""" task_id = "test_task_123" mcp_data: McpServers = { "mcpServers": {"test_server": {"config": "test"}} } with ( patch( "app.controller.chat_controller.get_task_lock", return_value=mock_task_lock, ), patch("asyncio.run") as mock_run, ): response = install_mcp(task_id, mcp_data) assert isinstance(response, Response) assert response.status_code == 201 mock_run.assert_called_once() @pytest.mark.integration class TestChatControllerIntegration: """Integration tests for chat controller.""" def test_chat_endpoint_integration( self, client: TestClient, sample_chat_data ): """Test chat endpoint through FastAPI test client.""" with ( patch( "app.controller.chat_controller.get_or_create_task_lock" ) as mock_create_lock, patch( "app.controller.chat_controller.step_solve" ) as mock_step_solve, patch("app.controller.chat_controller.load_dotenv"), patch("app.controller.chat_controller.set_current_task_id"), patch("pathlib.Path.mkdir"), patch("pathlib.Path.home", return_value=MagicMock()), ): mock_task_lock = MagicMock() mock_task_lock.put_queue = AsyncMock() mock_create_lock.return_value = mock_task_lock async def mock_generator(): yield "data: test_response\n\n" mock_step_solve.return_value = mock_generator() response = client.post("/chat", json=sample_chat_data) assert response.status_code == 200 assert ( response.headers["content-type"] == "text/event-stream; charset=utf-8" ) def test_improve_chat_endpoint_integration(self, client: TestClient): """Test improve chat endpoint through FastAPI test client.""" task_id = "test_task_123" supplement_data = {"question": "Improve this code"} with ( patch( "app.controller.chat_controller.get_task_lock" ) as mock_get_lock, patch("asyncio.run"), ): mock_task_lock = MagicMock() mock_task_lock.status = Status.processing mock_get_lock.return_value = mock_task_lock response = client.post(f"/chat/{task_id}", json=supplement_data) assert response.status_code == 201 def test_supplement_chat_endpoint_integration(self, client: TestClient): """Test supplement chat endpoint through FastAPI test client.""" task_id = "test_task_123" supplement_data = {"question": "Add more details"} with ( patch( "app.controller.chat_controller.get_task_lock" ) as mock_get_lock, patch("asyncio.run"), ): mock_task_lock = MagicMock() mock_task_lock.status = Status.done mock_get_lock.return_value = mock_task_lock response = client.put(f"/chat/{task_id}", json=supplement_data) assert response.status_code == 201 def test_stop_chat_endpoint_integration(self, client: TestClient): """Test stop chat endpoint through FastAPI test client.""" task_id = "test_task_123" with ( patch( "app.controller.chat_controller.get_task_lock" ) as mock_get_lock, patch("asyncio.run"), ): mock_task_lock = MagicMock() mock_get_lock.return_value = mock_task_lock response = client.delete(f"/chat/{task_id}") assert response.status_code == 204 def test_human_reply_endpoint_integration(self, client: TestClient): """Test human reply endpoint through FastAPI test client.""" task_id = "test_task_123" reply_data = {"agent": "test_agent", "reply": "This is my reply"} with ( patch( "app.controller.chat_controller.get_task_lock" ) as mock_get_lock, patch("asyncio.run"), ): mock_task_lock = MagicMock() mock_get_lock.return_value = mock_task_lock response = client.post( f"/chat/{task_id}/human-reply", json=reply_data ) assert response.status_code == 201 def test_install_mcp_endpoint_integration(self, client: TestClient): """Test install MCP endpoint through FastAPI test client.""" task_id = "test_task_123" mcp_data = {"mcpServers": {"test_server": {"config": "test"}}} with ( patch( "app.controller.chat_controller.get_task_lock" ) as mock_get_lock, patch("asyncio.run"), ): mock_task_lock = MagicMock() mock_get_lock.return_value = mock_task_lock response = client.post( f"/chat/{task_id}/install-mcp", json=mcp_data ) assert response.status_code == 201 @pytest.mark.model_backend class TestChatControllerWithLLM: """Tests that require LLM backend (marked for selective running).""" @pytest.mark.asyncio async def test_post_with_real_llm_model( self, sample_chat_data, mock_request ): """Test chat endpoint with real LLM model (slow test).""" # This test would use actual LLM models and should be marked accordingly Chat(**sample_chat_data) # Test implementation would involve real model calls # This is marked as model_backend test for selective execution assert True # Placeholder @pytest.mark.very_slow async def test_full_chat_workflow_with_llm( self, sample_chat_data, mock_request ): """Test complete chat workflow with LLM (very slow test).""" # This test would run the complete workflow including actual agent interactions # Marked as very_slow for execution only in full test mode assert True # Placeholder @pytest.mark.unit class TestChatControllerErrorCases: """Test error cases and edge conditions.""" @pytest.mark.asyncio async def test_post_with_invalid_data(self, mock_request): """Test chat endpoint with invalid data.""" # Construction itself should raise a validation error due to multiple invalid fields with pytest.raises((ValueError, TypeError, ValidationError)): Chat( task_id="", # Invalid empty task_id email="invalid_email", # Invalid email format question="", # Empty question attaches=[], model="invalid_model", # Field not defined in model -> triggers error model_platform="invalid_platform", api_key="", api_url="invalid_url", new_agents=[], env_path="nonexistent.env", browser_port=-1, # Invalid port summary_prompt="", ) # If future validation moves to endpoint level, keep logic placeholder below. # (Intentionally not calling post with invalid Chat object since creation fails.) def test_improve_with_nonexistent_task(self): """Test improve endpoint with nonexistent task.""" task_id = "nonexistent_task" supplement_data = SupplementChat(question="Improve this code") request = SimpleNamespace() with patch( "app.controller.chat_controller.get_task_lock", side_effect=KeyError("Task not found"), ): with pytest.raises(KeyError): improve(task_id, supplement_data, request) def test_supplement_with_empty_question(self, mock_task_lock): """Test supplement endpoint with empty question.""" task_id = "test_task_123" supplement_data = SupplementChat(question="") mock_task_lock.status = Status.done with ( patch( "app.controller.chat_controller.get_task_lock", return_value=mock_task_lock, ), patch("asyncio.run"), ): # Should handle empty question gracefully or raise appropriate error response = supplement(task_id, supplement_data) assert response.status_code == 201 # Or should it be an error? @pytest.mark.asyncio async def test_post_environment_setup_failure( self, sample_chat_data, mock_request ): """Test chat endpoint when environment setup fails.""" chat_data = Chat(**sample_chat_data) with ( patch( "app.controller.chat_controller.get_or_create_task_lock" ) as mock_create_lock, patch( "app.controller.chat_controller.sanitize_env_path", return_value="/tmp/fake.env", ), patch( "app.controller.chat_controller.load_dotenv", side_effect=Exception("Env load failed"), ), patch( "pathlib.Path.mkdir", side_effect=Exception("Directory creation failed"), ), ): mock_task_lock = MagicMock() mock_create_lock.return_value = mock_task_lock # Should handle environment setup failures gracefully with pytest.raises(Exception): await post(chat_data, mock_request)