eigent/backend/tests/app/controller/test_chat_controller.py
Tong Chen 6c827a3d06
refactor: establish Brain-centered architecture and frontend/backend separation foundations (#1597)
Co-authored-by: Douglas <douglas.ym.lai@gmail.com>
Co-authored-by: Douglas Lai <115660088+Douglasymlai@users.noreply.github.com>
2026-05-01 17:03:33 +08:00

628 lines
22 KiB
Python

# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. =========
import os
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastapi import Response
from fastapi.responses import StreamingResponse
from fastapi.testclient import TestClient
from pydantic import ValidationError
from app.controller.chat_controller import (
human_reply,
improve,
install_mcp,
post,
stop,
supplement,
)
from app.exception.exception import UserException
from app.model.chat import Chat, HumanReply, McpServers, Status, SupplementChat
@pytest.mark.unit
class TestChatController:
    """Test cases for chat controller endpoints.

    The controller functions are called directly; their collaborators
    (task lock lookup, ``step_solve``, ``asyncio.run`` ...) are patched.
    """

    @staticmethod
    async def _sse_stream(*chunks):
        """Yield ``chunks`` in order, standing in for ``step_solve`` output."""
        for chunk in chunks:
            yield chunk

    @staticmethod
    def _discard_coroutine(coro):
        """Close the object handed to the patched ``asyncio.run``.

        With ``asyncio.run`` mocked out nothing ever awaits the coroutine
        the controller created, which would otherwise surface as a
        "coroutine ... was never awaited" RuntimeWarning. Objects without a
        ``close`` method are left untouched.
        """
        close = getattr(coro, "close", None)
        if close is not None:
            close()

    @pytest.mark.asyncio
    async def test_post_chat_endpoint_success(
        self,
        sample_chat_data,
        mock_request,
        mock_task_lock,
        mock_environment_variables,
    ):
        """Test successful chat initialization."""
        chat_data = Chat(**sample_chat_data)

        with (
            patch(
                "app.controller.chat_controller.get_or_create_task_lock",
                return_value=mock_task_lock,
            ),
            patch(
                "app.controller.chat_controller.step_solve"
            ) as mock_step_solve,
            patch("app.controller.chat_controller.load_dotenv"),
            patch("app.controller.chat_controller.set_current_task_id"),
            patch("pathlib.Path.mkdir"),
            patch("pathlib.Path.home", return_value=MagicMock()),
        ):
            # Stand-in for the async generator returned by step_solve
            mock_step_solve.return_value = self._sse_stream(
                "data: test_response\n\n",
                "data: test_response_2\n\n",
            )

            response = await post(chat_data, mock_request)

            assert isinstance(response, StreamingResponse)
            assert response.media_type == "text/event-stream"

    @pytest.mark.asyncio
    async def test_post_chat_sets_environment_variables(
        self, sample_chat_data, mock_request, mock_task_lock
    ):
        """Test that environment variables are properly set."""
        chat_data = Chat(**sample_chat_data)

        with (
            patch(
                "app.controller.chat_controller.get_or_create_task_lock",
                return_value=mock_task_lock,
            ),
            patch(
                "app.controller.chat_controller.step_solve"
            ) as mock_step_solve,
            patch("app.controller.chat_controller.load_dotenv"),
            patch("app.controller.chat_controller.set_current_task_id"),
            patch(
                "app.controller.chat_controller._prepare_browser_for_request",
                return_value=True,
            ),
            patch("pathlib.Path.mkdir"),
            patch("pathlib.Path.home", return_value=MagicMock()),
            patch.dict(os.environ, {}, clear=True),
        ):
            mock_step_solve.return_value = self._sse_stream(
                "data: test_response\n\n"
            )

            await post(chat_data, mock_request)

            # Check environment variables were set. This must happen while
            # patch.dict is still active; it restores os.environ on exit.
            assert os.environ.get("OPENAI_API_KEY") == "test_key"
            assert (
                os.environ.get("OPENAI_API_BASE_URL")
                == "https://api.openai.com/v1"
            )
            assert os.environ.get("CAMEL_MODEL_LOG_ENABLED") == "true"
            assert os.environ.get("browser_port") == "8080"

    @pytest.mark.asyncio
    async def test_post_chat_sets_cdp_url_when_browser_ready(
        self, sample_chat_data, mock_request, mock_task_lock
    ):
        """Web mode should set EIGENT_CDP_URL after successful browser ensure."""
        chat_data = Chat(**sample_chat_data)
        mock_request.state = SimpleNamespace()

        with (
            patch(
                "app.controller.chat_controller.get_or_create_task_lock",
                return_value=mock_task_lock,
            ),
            patch(
                "app.controller.chat_controller.step_solve"
            ) as mock_step_solve,
            patch(
                "app.controller.chat_controller.is_cdp_url_available",
                return_value=False,
            ),
            patch(
                "app.controller.chat_controller.ensure_cdp_browser_endpoint",
                return_value="http://127.0.0.1:8080",
            ),
            patch("app.controller.chat_controller.load_dotenv"),
            patch("app.controller.chat_controller.set_current_task_id"),
            patch("pathlib.Path.mkdir"),
            patch("pathlib.Path.home", return_value=MagicMock()),
            patch.dict(os.environ, {}, clear=True),
        ):
            mock_step_solve.return_value = self._sse_stream(
                "data: test_response\n\n"
            )

            await post(chat_data, mock_request)

            assert os.environ.get("EIGENT_CDP_URL") == "http://127.0.0.1:8080"
            assert os.environ.get("browser_port") == "8080"
            assert mock_request.state.browser_available is True

    @pytest.mark.asyncio
    async def test_post_chat_clears_cdp_url_when_browser_unavailable(
        self, sample_chat_data, mock_request, mock_task_lock
    ):
        """Web mode should mark browser unavailable and clear EIGENT_CDP_URL."""
        chat_data = Chat(**sample_chat_data)
        mock_request.state = SimpleNamespace()

        with (
            patch(
                "app.controller.chat_controller.get_or_create_task_lock",
                return_value=mock_task_lock,
            ),
            patch(
                "app.controller.chat_controller.step_solve"
            ) as mock_step_solve,
            patch(
                "app.controller.chat_controller.is_cdp_url_available",
                return_value=False,
            ),
            patch(
                "app.controller.chat_controller.ensure_cdp_browser_endpoint",
                return_value=None,
            ),
            patch("app.controller.chat_controller.load_dotenv"),
            patch("app.controller.chat_controller.set_current_task_id"),
            patch("pathlib.Path.mkdir"),
            patch("pathlib.Path.home", return_value=MagicMock()),
            patch.dict(
                os.environ,
                {"EIGENT_CDP_URL": "http://127.0.0.1:9222"},
                clear=True,
            ),
        ):
            mock_step_solve.return_value = self._sse_stream(
                "data: test_response\n\n"
            )

            await post(chat_data, mock_request)

            assert "EIGENT_CDP_URL" not in os.environ
            assert mock_request.state.browser_available is False

    @pytest.mark.asyncio
    async def test_post_chat_preserves_existing_cdp_url(
        self, sample_chat_data, mock_request, mock_task_lock
    ):
        """A reachable EIGENT_CDP_URL is kept and no browser is ensured."""
        chat_data = Chat(**sample_chat_data)
        mock_request.state = SimpleNamespace()

        with (
            patch(
                "app.controller.chat_controller.get_or_create_task_lock",
                return_value=mock_task_lock,
            ),
            patch(
                "app.controller.chat_controller.step_solve"
            ) as mock_step_solve,
            patch(
                "app.controller.chat_controller.is_cdp_url_available",
                return_value=True,
            ),
            patch(
                "app.controller.chat_controller.ensure_cdp_browser_endpoint",
            ) as mock_ensure_browser,
            patch("app.controller.chat_controller.load_dotenv"),
            patch("app.controller.chat_controller.set_current_task_id"),
            patch("pathlib.Path.mkdir"),
            patch("pathlib.Path.home", return_value=MagicMock()),
            patch.dict(
                os.environ,
                {"EIGENT_CDP_URL": "http://worker-17:9222"},
                clear=True,
            ),
        ):
            mock_step_solve.return_value = self._sse_stream(
                "data: test_response\n\n"
            )

            await post(chat_data, mock_request)

            assert os.environ.get("EIGENT_CDP_URL") == "http://worker-17:9222"
            assert os.environ.get("browser_port") == "9222"
            assert mock_request.state.browser_available is True
            mock_ensure_browser.assert_not_called()

    def test_improve_chat_success(self, mock_task_lock, mock_request):
        """Test successful chat improvement."""
        task_id = "test_task_123"
        supplement_data = SupplementChat(question="Improve this code")
        mock_task_lock.status = Status.processing

        with (
            patch(
                "app.controller.chat_controller.get_task_lock",
                return_value=mock_task_lock,
            ),
            patch("asyncio.run") as mock_run,
        ):
            mock_run.side_effect = self._discard_coroutine

            response = improve(task_id, supplement_data, mock_request)

            assert isinstance(response, Response)
            assert response.status_code == 201
            assert mock_run.call_count == 2
            # put_queue is invoked when creating the coroutine passed to
            # asyncio.run
            mock_task_lock.put_queue.assert_called_once()

    def test_improve_chat_task_done_resets_to_confirming(
        self, mock_task_lock, mock_request
    ):
        """Test improvement when task is done resets status to confirming."""
        task_id = "test_task_123"
        supplement_data = SupplementChat(question="Improve this code")
        mock_task_lock.status = Status.done

        with (
            patch(
                "app.controller.chat_controller.get_task_lock",
                return_value=mock_task_lock,
            ),
            patch("asyncio.run") as mock_run,
        ):
            mock_run.side_effect = self._discard_coroutine

            response = improve(task_id, supplement_data, mock_request)

            assert mock_task_lock.status == Status.confirming
            assert isinstance(response, Response)
            assert response.status_code == 201

    def test_supplement_chat_success(self, mock_task_lock):
        """Test successful chat supplementation."""
        task_id = "test_task_123"
        supplement_data = SupplementChat(question="Add more details")
        mock_task_lock.status = Status.done

        with (
            patch(
                "app.controller.chat_controller.get_task_lock",
                return_value=mock_task_lock,
            ),
            patch("asyncio.run") as mock_run,
        ):
            mock_run.side_effect = self._discard_coroutine

            response = supplement(task_id, supplement_data)

            assert isinstance(response, Response)
            assert response.status_code == 201
            mock_run.assert_called_once()

    def test_supplement_chat_task_not_done_error(self, mock_task_lock):
        """Test supplementation fails when task is not done."""
        task_id = "test_task_123"
        supplement_data = SupplementChat(question="Add more details")
        mock_task_lock.status = Status.processing

        with patch(
            "app.controller.chat_controller.get_task_lock",
            return_value=mock_task_lock,
        ):
            with pytest.raises(UserException):
                supplement(task_id, supplement_data)

    def test_stop_chat_success(self, mock_task_lock):
        """Test successful chat stopping."""
        task_id = "test_task_123"

        with (
            patch(
                "app.controller.chat_controller.get_task_lock_if_exists",
                return_value=mock_task_lock,
            ),
            patch("asyncio.run") as mock_run,
        ):
            mock_run.side_effect = self._discard_coroutine

            response = stop(task_id)

            assert isinstance(response, Response)
            assert response.status_code == 204
            mock_run.assert_called_once()

    def test_human_reply_success(self, mock_task_lock):
        """Test successful human reply."""
        task_id = "test_task_123"
        reply_data = HumanReply(agent="test_agent", reply="This is my reply")

        with (
            patch(
                "app.controller.chat_controller.get_task_lock",
                return_value=mock_task_lock,
            ),
            patch("asyncio.run") as mock_run,
        ):
            mock_run.side_effect = self._discard_coroutine

            response = human_reply(task_id, reply_data)

            assert isinstance(response, Response)
            assert response.status_code == 201
            mock_run.assert_called_once()

    def test_install_mcp_success(self, mock_task_lock):
        """Test successful MCP installation."""
        task_id = "test_task_123"
        mcp_data: McpServers = {
            "mcpServers": {"test_server": {"config": "test"}}
        }

        with (
            patch(
                "app.controller.chat_controller.get_task_lock",
                return_value=mock_task_lock,
            ),
            patch("asyncio.run") as mock_run,
        ):
            mock_run.side_effect = self._discard_coroutine

            response = install_mcp(task_id, mcp_data)

            assert isinstance(response, Response)
            assert response.status_code == 201
            mock_run.assert_called_once()
@pytest.mark.integration
class TestChatControllerIntegration:
    """Integration tests for chat controller.

    Requests go through the FastAPI ``TestClient`` so routing, request
    parsing and response codes are exercised; task-lock lookup and
    ``asyncio.run`` are still patched.
    """

    def test_chat_endpoint_integration(
        self, client: TestClient, sample_chat_data
    ):
        """Test chat endpoint through FastAPI test client."""
        with (
            patch(
                "app.controller.chat_controller.get_or_create_task_lock"
            ) as mock_create_lock,
            patch(
                "app.controller.chat_controller.step_solve"
            ) as mock_step_solve,
            patch("app.controller.chat_controller.load_dotenv"),
            patch("app.controller.chat_controller.set_current_task_id"),
            patch("pathlib.Path.mkdir"),
            patch("pathlib.Path.home", return_value=MagicMock()),
        ):
            mock_task_lock = MagicMock()
            mock_task_lock.put_queue = AsyncMock()
            mock_create_lock.return_value = mock_task_lock

            async def mock_generator():
                yield "data: test_response\n\n"

            mock_step_solve.return_value = mock_generator()

            response = client.post("/chat", json=sample_chat_data)

            assert response.status_code == 200
            assert (
                response.headers["content-type"]
                == "text/event-stream; charset=utf-8"
            )

    def test_improve_chat_endpoint_integration(self, client: TestClient):
        """Test improve chat endpoint through FastAPI test client."""
        task_id = "test_task_123"
        supplement_data = {"question": "Improve this code"}

        with (
            patch(
                "app.controller.chat_controller.get_task_lock"
            ) as mock_get_lock,
            patch("asyncio.run"),
        ):
            mock_task_lock = MagicMock()
            mock_task_lock.status = Status.processing
            mock_get_lock.return_value = mock_task_lock

            response = client.post(f"/chat/{task_id}", json=supplement_data)

            assert response.status_code == 201

    def test_supplement_chat_endpoint_integration(self, client: TestClient):
        """Test supplement chat endpoint through FastAPI test client."""
        task_id = "test_task_123"
        supplement_data = {"question": "Add more details"}

        with (
            patch(
                "app.controller.chat_controller.get_task_lock"
            ) as mock_get_lock,
            patch("asyncio.run"),
        ):
            mock_task_lock = MagicMock()
            mock_task_lock.status = Status.done
            mock_get_lock.return_value = mock_task_lock

            response = client.put(f"/chat/{task_id}", json=supplement_data)

            assert response.status_code == 201

    def test_stop_chat_endpoint_integration(self, client: TestClient):
        """Test stop chat endpoint through FastAPI test client."""
        task_id = "test_task_123"

        with (
            # Patch the same lookup the unit test for stop() patches, so
            # the mocked task lock is the one the endpoint actually sees.
            patch(
                "app.controller.chat_controller.get_task_lock_if_exists"
            ) as mock_get_lock,
            patch("asyncio.run"),
        ):
            mock_task_lock = MagicMock()
            mock_get_lock.return_value = mock_task_lock

            response = client.delete(f"/chat/{task_id}")

            assert response.status_code == 204

    def test_human_reply_endpoint_integration(self, client: TestClient):
        """Test human reply endpoint through FastAPI test client."""
        task_id = "test_task_123"
        reply_data = {"agent": "test_agent", "reply": "This is my reply"}

        with (
            patch(
                "app.controller.chat_controller.get_task_lock"
            ) as mock_get_lock,
            patch("asyncio.run"),
        ):
            mock_task_lock = MagicMock()
            mock_get_lock.return_value = mock_task_lock

            response = client.post(
                f"/chat/{task_id}/human-reply", json=reply_data
            )

            assert response.status_code == 201

    def test_install_mcp_endpoint_integration(self, client: TestClient):
        """Test install MCP endpoint through FastAPI test client."""
        task_id = "test_task_123"
        mcp_data = {"mcpServers": {"test_server": {"config": "test"}}}

        with (
            patch(
                "app.controller.chat_controller.get_task_lock"
            ) as mock_get_lock,
            patch("asyncio.run"),
        ):
            mock_task_lock = MagicMock()
            mock_get_lock.return_value = mock_task_lock

            response = client.post(
                f"/chat/{task_id}/install-mcp", json=mcp_data
            )

            assert response.status_code == 201
@pytest.mark.model_backend
class TestChatControllerWithLLM:
    """Tests that require LLM backend (marked for selective running).

    Both tests are currently placeholders: neither calls a model yet.
    """

    @pytest.mark.asyncio
    async def test_post_with_real_llm_model(
        self, sample_chat_data, mock_request
    ):
        """Test chat endpoint with real LLM model (slow test)."""
        # This test would use actual LLM models and should be marked
        # accordingly. For now it only checks that the sample payload
        # builds a Chat without raising.
        Chat(**sample_chat_data)

        # Test implementation would involve real model calls
        # This is marked as model_backend test for selective execution
        assert True  # Placeholder

    @pytest.mark.asyncio
    @pytest.mark.very_slow
    async def test_full_chat_workflow_with_llm(
        self, sample_chat_data, mock_request
    ):
        """Test complete chat workflow with LLM (very slow test)."""
        # The asyncio marker matches every other async test in this
        # module; without it pytest-asyncio's strict mode would not run
        # this coroutine.
        # This test would run the complete workflow including actual
        # agent interactions.
        # Marked as very_slow for execution only in full test mode
        assert True  # Placeholder
@pytest.mark.unit
class TestChatControllerErrorCases:
    """Test error cases and edge conditions."""

    @staticmethod
    def _discard_coroutine(coro):
        """Close the object handed to the patched ``asyncio.run``.

        Prevents a "coroutine ... was never awaited" RuntimeWarning when
        the controller builds a coroutine that the mock never runs.
        Objects without a ``close`` method are left untouched.
        """
        close = getattr(coro, "close", None)
        if close is not None:
            close()

    @pytest.mark.asyncio
    async def test_post_with_invalid_data(self, mock_request):
        """Test chat endpoint with invalid data."""
        # Construction itself should raise a validation error due to
        # multiple invalid fields
        with pytest.raises((ValueError, TypeError, ValidationError)):
            Chat(
                task_id="",  # Invalid empty task_id
                email="invalid_email",  # Invalid email format
                question="",  # Empty question
                attaches=[],
                # Field not defined in model -> triggers error
                model="invalid_model",
                model_platform="invalid_platform",
                api_key="",
                api_url="invalid_url",
                new_agents=[],
                env_path="nonexistent.env",
                browser_port=-1,  # Invalid port
                summary_prompt="",
            )
        # If future validation moves to endpoint level, keep logic
        # placeholder below.
        # (Intentionally not calling post with invalid Chat object since
        # creation fails.)

    def test_improve_with_nonexistent_task(self):
        """Test improve endpoint with nonexistent task."""
        task_id = "nonexistent_task"
        supplement_data = SupplementChat(question="Improve this code")
        request = SimpleNamespace()

        with patch(
            "app.controller.chat_controller.get_task_lock",
            side_effect=KeyError("Task not found"),
        ):
            with pytest.raises(KeyError):
                improve(task_id, supplement_data, request)

    def test_supplement_with_empty_question(self, mock_task_lock):
        """Test supplement endpoint with empty question."""
        task_id = "test_task_123"
        supplement_data = SupplementChat(question="")
        mock_task_lock.status = Status.done

        with (
            patch(
                "app.controller.chat_controller.get_task_lock",
                return_value=mock_task_lock,
            ),
            patch("asyncio.run") as mock_run,
        ):
            mock_run.side_effect = self._discard_coroutine

            # Should handle empty question gracefully or raise appropriate
            # error
            response = supplement(task_id, supplement_data)

            assert response.status_code == 201  # Or should it be an error?

    @pytest.mark.asyncio
    async def test_post_environment_setup_failure(
        self, sample_chat_data, mock_request
    ):
        """Test chat endpoint when environment setup fails."""
        chat_data = Chat(**sample_chat_data)

        with (
            patch(
                "app.controller.chat_controller.get_or_create_task_lock"
            ) as mock_create_lock,
            patch(
                "app.controller.chat_controller.sanitize_env_path",
                return_value="/tmp/fake.env",
            ),
            patch(
                "app.controller.chat_controller.load_dotenv",
                side_effect=Exception("Env load failed"),
            ),
            patch(
                "pathlib.Path.mkdir",
                side_effect=Exception("Directory creation failed"),
            ),
        ):
            mock_task_lock = MagicMock()
            mock_create_lock.return_value = mock_task_lock

            # A failure during environment setup is expected to propagate
            # out of post().
            # NOTE(review): pytest.raises(Exception) accepts any error,
            # including unrelated ones; narrow it (e.g. with ``match=``)
            # once it is confirmed which patched failure surfaces.
            with pytest.raises(Exception):
                await post(chat_data, mock_request)