mirror of
https://github.com/intari/search2_chatgpt.git
synced 2025-04-25 16:09:11 +00:00
Refactor tests in test_app.py and test_indexer.py to enhance mocking strategies and improve error handling. Update mock configurations for session and file reading tests, ensuring accurate simulation of responses. Adjust health check tests to verify proper handling of Meilisearch availability. Improve clarity and consistency in argument passing across various test cases.
This commit is contained in:
parent
b48c6bc3ca
commit
fbb514f2dd
2 changed files with 214 additions and 164 deletions
backend/tests
|
@ -5,47 +5,40 @@ from unittest.mock import patch, MagicMock
|
|||
import os
|
||||
import requests # <-- Добавлен импорт
|
||||
|
||||
# Мокирование load_dotenv (остается)
|
||||
# Мокирование load_dotenv (выполняется до импорта app)
|
||||
patcher_dotenv_app = patch('dotenv.load_dotenv', return_value=True)
|
||||
patcher_dotenv_app.start()
|
||||
|
||||
# Импортируем app и get_search_session ПОСЛЕ старта патчера dotenv
|
||||
from backend.app import app, get_search_session
|
||||
|
||||
# Фикстура для мокированной сессии (теперь будет использоваться в dependency_overrides)
|
||||
# Фикстура для мокированной сессии (scope="function" по умолчанию)
|
||||
@pytest.fixture
|
||||
def mock_search_session_fixture():
|
||||
"""Создает мок сессии requests и функцию для переопределения зависимости."""
|
||||
mock_session_instance = MagicMock(spec=requests.Session)
|
||||
|
||||
# Настраиваем мок для POST запроса на /search
|
||||
# Настраиваем моки ответов Meilisearch
|
||||
mock_response_search_ok = MagicMock(spec=Response)
|
||||
mock_response_search_ok.status_code = 200
|
||||
mock_response_search_ok.json.return_value = {
|
||||
"hits": [
|
||||
{"id": "test.txt", "_formatted": {"id": "test.txt", "content": "Это <em>тест</em>"}},
|
||||
{"id": "another.pdf", "_formatted": {"id": "another.pdf", "content": "Еще один <em>тест</em>овый файл"}}
|
||||
], "estimatedTotalHits": 2 # Добавим обязательные поля для Meili v1+
|
||||
],
|
||||
"query": "тест", # Добавим обязательные поля для Meili v1+
|
||||
"processingTimeMs": 10,
|
||||
"limit": 20,
|
||||
"offset": 0,
|
||||
"estimatedTotalHits": 2
|
||||
}
|
||||
mock_response_search_ok.raise_for_status.return_value = None # Успешный запрос не вызывает исключений
|
||||
mock_response_search_ok.raise_for_status.return_value = None # Успешный запрос
|
||||
|
||||
# Настраиваем мок для GET запроса на /health
|
||||
mock_response_health_ok = MagicMock(spec=Response)
|
||||
mock_response_health_ok.status_code = 200
|
||||
mock_response_health_ok.json.return_value = {"status": "available"}
|
||||
mock_response_health_ok.raise_for_status.return_value = None
|
||||
|
||||
# Мок для ошибки Meili при поиске
|
||||
mock_response_search_err = MagicMock(spec=Response)
|
||||
mock_response_search_err.status_code = 503 # Или 500, в зависимости от ошибки Meili
|
||||
http_error = requests.exceptions.RequestException("Simulated Meili Connection Error")
|
||||
mock_response_search_err.raise_for_status.side_effect = http_error
|
||||
|
||||
# Мок для ошибки Meili при health check
|
||||
mock_response_health_err = MagicMock(spec=Response)
|
||||
mock_response_health_err.status_code = 503
|
||||
health_http_error = requests.exceptions.RequestException("Simulated Meili Health Error")
|
||||
mock_response_health_err.raise_for_status.side_effect = health_http_error
|
||||
|
||||
|
||||
# Используем side_effect для разных URL и методов
|
||||
def side_effect_post(*args, **kwargs):
|
||||
|
@ -53,42 +46,54 @@ def mock_search_session_fixture():
|
|||
if 'search' in url:
|
||||
query = kwargs.get('json', {}).get('q')
|
||||
if query == "ошибка_сети": # Имитируем ошибку сети при поиске
|
||||
# Имитируем ошибку RequestException, которая должна привести к 503 в app.py
|
||||
raise requests.exceptions.RequestException("Simulated network error during search")
|
||||
else: # Успешный поиск
|
||||
return mock_response_search_ok
|
||||
return MagicMock(status_code=404) # Поведение по умолчанию
|
||||
# Возвращаем 404 для любых других POST запросов к моку
|
||||
default_resp = MagicMock(status_code=404)
|
||||
default_resp.raise_for_status.side_effect = requests.exceptions.HTTPError("Not Found in Mock")
|
||||
return default_resp
|
||||
|
||||
def side_effect_get(*args, **kwargs):
|
||||
# Этот side_effect будет переопределен в тестах health_check
|
||||
url = args[0]
|
||||
if 'health' in url:
|
||||
# Имитируем ситуацию, когда health check падает с ошибкой сети
|
||||
# Чтобы тест test_health_check проверял статус 'недоступен'
|
||||
# raise requests.exceptions.RequestException("Simulated network error during health")
|
||||
# ИЛИ Имитируем успешный ответ, чтобы тест проверял статус 'доступен'
|
||||
return mock_response_health_ok # <--- ИЗМЕНИ ЭТУ СТРОКУ, если хочешь проверить другой сценарий health
|
||||
return MagicMock(status_code=404)
|
||||
# По умолчанию имитируем успех для health
|
||||
return mock_response_health_ok
|
||||
# Возвращаем 404 для других GET
|
||||
default_resp = MagicMock(status_code=404)
|
||||
default_resp.raise_for_status.side_effect = requests.exceptions.HTTPError("Not Found in Mock")
|
||||
return default_resp
|
||||
|
||||
mock_session_instance.post.side_effect = side_effect_post
|
||||
mock_session_instance.get.side_effect = side_effect_get
|
||||
|
||||
# Возвращаем функцию, которая будет вызвана FastAPI вместо get_search_session
|
||||
# Функция, которая будет возвращать наш мок вместо реальной сессии
|
||||
def override_get_search_session():
|
||||
return mock_session_instance
|
||||
|
||||
yield override_get_search_session, mock_session_instance # Возвращаем и функцию, и сам мок для проверок вызовов
|
||||
# Возвращаем и функцию переопределения, и сам мок для проверок
|
||||
yield override_get_search_session, mock_session_instance
|
||||
|
||||
# Останавливаем патчер dotenv после тестов модуля
|
||||
patcher_dotenv_app.stop()
|
||||
# Останавливаем патчер dotenv один раз после всех тестов модуля
|
||||
# (Технически, лучше бы это было в фикстуре с scope="module", но пока оставим так)
|
||||
# Важно: это остановит ПАТЧЕР, а не фикстуру
|
||||
try:
|
||||
patcher_dotenv_app.stop()
|
||||
except RuntimeError: # Если уже остановлен
|
||||
pass
|
||||
|
||||
|
||||
# Фикстура для тестового клиента с переопределенной зависимостью
|
||||
@pytest.fixture(scope="module")
|
||||
# Фикстура для тестового клиента (scope="function" по умолчанию)
|
||||
@pytest.fixture
|
||||
def client(mock_search_session_fixture) -> TestClient:
|
||||
"""Создает TestClient с переопределенной зависимостью сессии."""
|
||||
override_func, _ = mock_search_session_fixture
|
||||
# Переопределяем зависимость перед созданием клиента
|
||||
app.dependency_overrides[get_search_session] = override_func
|
||||
# Создаем клиент для теста
|
||||
yield TestClient(app)
|
||||
# Очищаем переопределение после тестов
|
||||
# Очищаем переопределение после теста, чтобы не влиять на другие
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
|
@ -96,36 +101,43 @@ def client(mock_search_session_fixture) -> TestClient:
|
|||
|
||||
def test_search_success(client: TestClient, mock_search_session_fixture):
|
||||
"""Тестирует успешный поиск."""
|
||||
_, mock_session = mock_search_session_fixture
|
||||
_, mock_session = mock_search_session_fixture # Получаем сам мок для assert'ов
|
||||
response = client.get("/search?q=тест")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "results" in data
|
||||
assert len(data["results"]) == 2
|
||||
assert data["results"][0]["id"] == "test.txt"
|
||||
# Проверяем, что был вызван POST к MeiliSearch
|
||||
# Проверяем, что был вызван POST к MeiliSearch с правильными параметрами
|
||||
mock_session.post.assert_called_once()
|
||||
# Можно проверить и аргументы вызова, если нужно
|
||||
call_args, call_kwargs = mock_session.post.call_args
|
||||
assert call_args[0].endswith("/indexes/documents/search")
|
||||
assert call_kwargs['json']['q'] == 'тест'
|
||||
|
||||
|
||||
def test_search_empty_query(client: TestClient):
|
||||
"""Тестирует поиск с пустым запросом (FastAPI вернет 422)."""
|
||||
# Теперь не должно быть сетевого вызова, FastAPI вернет ошибку валидации
|
||||
# Здесь не должно быть вызова мока сессии, так как FastAPI отклонит запрос раньше
|
||||
response = client.get("/search?q=")
|
||||
assert response.status_code == 422 # Ошибка валидации FastAPI
|
||||
|
||||
|
||||
def test_search_meili_error(client: TestClient, mock_search_session_fixture):
|
||||
"""Тестирует обработку ошибки сети при обращении к Meilisearch."""
|
||||
# Используем 'ошибка_сети', чтобы вызвать RequestException в моке
|
||||
_, mock_session = mock_search_session_fixture
|
||||
# Используем 'ошибка_сети', чтобы вызвать RequestException в моке side_effect_post
|
||||
response = client.get("/search?q=ошибка_сети")
|
||||
# Ожидаем 503, так как app.py ловит RequestException
|
||||
# Ожидаем 503, так как app.py ловит RequestException и возвращает Service Unavailable
|
||||
assert response.status_code == 503
|
||||
assert "Сервис поиска временно недоступен" in response.json()["detail"]
|
||||
# Проверяем, что post был вызван
|
||||
mock_session.post.assert_called_once()
|
||||
|
||||
|
||||
def test_get_file_not_found(client: TestClient):
|
||||
"""Тестирует запрос несуществующего файла."""
|
||||
# Мокируем os.path.exists, чтобы имитировать отсутствие файла
|
||||
# Мокируем os.path.exists внутри эндпоинта /files/{filename}
|
||||
with patch("backend.app.os.path.exists", return_value=False):
|
||||
response = client.get("/files/nonexistent.txt")
|
||||
assert response.status_code == 404
|
||||
|
@ -133,36 +145,35 @@ def test_get_file_not_found(client: TestClient):
|
|||
|
||||
|
||||
def test_get_file_invalid_name_slash(client: TestClient):
|
||||
"""Тестирует запрос файла с '/' в имени."""
|
||||
# Эта проверка должна срабатывать в FastAPI
|
||||
"""Тестирует запрос файла с '/' в имени (должен вернуть 400)."""
|
||||
# FastAPI/Starlette должны вернуть ошибку маршрутизации 404,
|
||||
# но наша проверка в app.py должна отловить это раньше и вернуть 400.
|
||||
response = client.get("/files/subdir/secret.txt")
|
||||
# Ожидаем 400 из-за проверки "/" in filename
|
||||
assert response.status_code == 400
|
||||
assert response.json()["detail"] == "Некорректное имя файла"
|
||||
|
||||
def test_get_file_invalid_name_dotdot(client: TestClient):
|
||||
"""Тестирует запрос файла с '..' в имени."""
|
||||
# Попробуем этот запрос, он все еще может нормализоваться клиентом,
|
||||
# но если нет - проверка FastAPI должна вернуть 400.
|
||||
# Если клиент нормализует, ожидаем 404 или 403 в зависимости от дальнейшей логики.
|
||||
"""Тестирует запрос файла с '..' в имени (должен вернуть 400)."""
|
||||
# Ожидаем 400 из-за проверки ".." in filename в app.py
|
||||
response = client.get("/files/../secret.txt")
|
||||
# Ожидаем 400 от FastAPI проверки, если она сработает
|
||||
# или 404 если имя нормализовалось и файл не найден (без мока os.path)
|
||||
# или 403 если имя нормализовалось и вышло за пределы корня (этот тест не проверяет)
|
||||
assert response.status_code in [400, 404] # Допускаем оба варианта из-за неопределенности нормализации
|
||||
assert response.status_code == 400
|
||||
assert response.json()["detail"] == "Некорректное имя файла"
|
||||
|
||||
|
||||
def test_health_check_meili_ok(client: TestClient, mock_search_session_fixture):
|
||||
"""Тестирует эндпоинт /health, когда Meilisearch доступен."""
|
||||
override_func, mock_session = mock_search_session_fixture
|
||||
|
||||
# Убедимся, что мок GET возвращает успешный ответ
|
||||
# Явно настраиваем мок GET для возврата успешного ответа health
|
||||
mock_response_health_ok = MagicMock(spec=Response)
|
||||
mock_response_health_ok.status_code = 200
|
||||
mock_response_health_ok.json.return_value = {"status": "available"}
|
||||
mock_response_health_ok.raise_for_status.return_value = None
|
||||
mock_session.get.side_effect = lambda *args, **kwargs: mock_response_health_ok if 'health' in args[0] else MagicMock(status_code=404)
|
||||
|
||||
# Переопределяем зависимость для этого теста
|
||||
# Переопределяем зависимость *только для этого теста*, используя фикстуру client
|
||||
# Фикстура client сама очистит override после теста
|
||||
app.dependency_overrides[get_search_session] = override_func
|
||||
|
||||
response = client.get("/health")
|
||||
|
@ -171,18 +182,17 @@ def test_health_check_meili_ok(client: TestClient, mock_search_session_fixture):
|
|||
assert data["status"] == "ok"
|
||||
assert data["meilisearch_status"] == "доступен" # Ожидаем "доступен"
|
||||
mock_session.get.assert_called_once()
|
||||
|
||||
app.dependency_overrides.clear() # Очищаем после теста
|
||||
# app.dependency_overrides.clear() <-- Очистка происходит в фикстуре client
|
||||
|
||||
|
||||
def test_health_check_meili_fail(client: TestClient, mock_search_session_fixture):
|
||||
"""Тестирует эндпоинт /health, когда Meilisearch недоступен."""
|
||||
override_func, mock_session = mock_search_session_fixture
|
||||
|
||||
# Убедимся, что мок GET вызывает ошибку сети
|
||||
# Явно настраиваем мок GET для вызова ошибки RequestException
|
||||
mock_session.get.side_effect = requests.exceptions.RequestException("Simulated health check network error")
|
||||
|
||||
# Переопределяем зависимость для этого теста
|
||||
# Переопределяем зависимость *только для этого теста*
|
||||
app.dependency_overrides[get_search_session] = override_func
|
||||
|
||||
response = client.get("/health")
|
||||
|
@ -190,5 +200,4 @@ def test_health_check_meili_fail(client: TestClient, mock_search_session_fixture
|
|||
data = response.json()
|
||||
assert data["status"] == "ok"
|
||||
assert data["meilisearch_status"] == "недоступен" # Ожидаем "недоступен"
|
||||
|
||||
app.dependency_overrides.clear() # Очищаем после теста
|
||||
# app.dependency_overrides.clear() <-- Очистка происходит в фикстуре client
|
||||
|
|
|
@ -2,17 +2,23 @@ import pytest
|
|||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock, mock_open
|
||||
import os
|
||||
import time # Добавим для мокирования time.time
|
||||
|
||||
# Мокирование load_dotenv (остается)
|
||||
# Мокирование load_dotenv (выполняется до импорта indexer)
|
||||
patcher_dotenv_indexer = patch('dotenv.load_dotenv', return_value=True)
|
||||
patcher_dotenv_indexer.start()
|
||||
|
||||
# Импортируем модуль indexer ПОСЛЕ старта патчера dotenv
|
||||
from backend import indexer
|
||||
|
||||
# Фикстура для остановки патчера dotenv после всех тестов модуля
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
def stop_indexer_patch():
|
||||
yield
|
||||
patcher_dotenv_indexer.stop()
|
||||
try:
|
||||
patcher_dotenv_indexer.stop()
|
||||
except RuntimeError: # Если уже остановлен
|
||||
pass
|
||||
|
||||
|
||||
# --- Тесты для функций извлечения текста ---
|
||||
|
@ -20,177 +26,198 @@ def stop_indexer_patch():
|
|||
def test_extract_text_from_txt_success():
|
||||
"""Тестирует успешное чтение UTF-8 TXT файла."""
|
||||
mock_content = "Привет, мир!"
|
||||
# Патчим open, используемый внутри read_text
|
||||
m_open = mock_open(read_data=mock_content.encode('utf-8'))
|
||||
with patch('pathlib.Path.open', m_open):
|
||||
# Теперь Path("dummy.txt").read_text() будет использовать наш мок open
|
||||
result = indexer.extract_text_from_txt(Path("dummy.txt"))
|
||||
# Мокируем напрямую read_text у класса Path
|
||||
with patch.object(Path, 'read_text', return_value=mock_content) as mock_read:
|
||||
# Создаем фиктивный объект Path для вызова функции
|
||||
dummy_path = Path("dummy.txt")
|
||||
result = indexer.extract_text_from_txt(dummy_path)
|
||||
assert result == mock_content
|
||||
# Проверяем, что open был вызван с правильными параметрами для read_text
|
||||
m_open.assert_called_once_with('r', encoding='utf-8', errors=None) # read_text вызывает open с 'r'
|
||||
|
||||
# Проверяем, что read_text был вызван с кодировкой utf-8
|
||||
mock_read.assert_called_once_with(encoding='utf-8')
|
||||
|
||||
def test_extract_text_from_txt_cp1251():
|
||||
"""Тестирует чтение TXT файла в CP1251 после неудачи с UTF-8."""
|
||||
mock_content_cp1251 = "Тест CP1251".encode('cp1251')
|
||||
# Имитируем ошибки и успешное чтение через side_effect для open
|
||||
m_open = mock_open()
|
||||
handle_mock = m_open() # Получаем мок файлового дескриптора
|
||||
|
||||
# Определяем поведение read() для разных кодировок
|
||||
def read_side_effect(*args, **kwargs):
|
||||
# Первый вызов (UTF-8) должен вызвать ошибку при decode
|
||||
# Второй вызов (CP1251) должен вернуть байты
|
||||
# Третий (Latin-1) - тоже байты на всякий случай
|
||||
if handle_mock.encoding == 'utf-8':
|
||||
# Чтобы вызвать UnicodeDecodeError внутри read_text, вернем байты, которые не декодируются
|
||||
return b'\xd2\xe5\xf1\xf2' # "Тест" в cp1251, вызовет ошибку в utf-8
|
||||
elif handle_mock.encoding == 'cp1251':
|
||||
return mock_content_cp1251
|
||||
else: # latin-1
|
||||
return mock_content_cp1251
|
||||
|
||||
handle_mock.read.side_effect = read_side_effect
|
||||
|
||||
# Патчим сам open
|
||||
with patch('pathlib.Path.open', m_open):
|
||||
result = indexer.extract_text_from_txt(Path("dummy_cp1251.txt"))
|
||||
assert result == "Тест CP1251"
|
||||
# Проверяем, что open вызывался дважды (для utf-8 и cp1251)
|
||||
assert m_open.call_count == 2
|
||||
assert m_open.call_args_list[0][1]['encoding'] == 'utf-8'
|
||||
assert m_open.call_args_list[1][1]['encoding'] == 'cp1251'
|
||||
mock_content_cp1251_str = "Тест CP1251"
|
||||
# Настраиваем side_effect для имитации попыток чтения
|
||||
mock_read_text = MagicMock(name='read_text_mock')
|
||||
mock_read_text.side_effect = [
|
||||
# Ошибка при первой попытке (UTF-8)
|
||||
UnicodeDecodeError('utf-8', b'\xd2\xe5\xf1\xf2', 0, 1, 'invalid start byte'),
|
||||
# Успешный результат при второй попытке (CP1251)
|
||||
mock_content_cp1251_str,
|
||||
# Результат для третьей попытки (Latin-1), на всякий случай
|
||||
mock_content_cp1251_str
|
||||
]
|
||||
with patch.object(Path, 'read_text', mock_read_text):
|
||||
dummy_path = Path("dummy_cp1251.txt")
|
||||
result = indexer.extract_text_from_txt(dummy_path)
|
||||
assert result == mock_content_cp1251_str
|
||||
# Проверяем, что было две попытки вызова read_text
|
||||
assert mock_read_text.call_count == 2
|
||||
# Проверяем аргументы кодировки для каждой попытки
|
||||
assert mock_read_text.call_args_list[0][1]['encoding'] == 'utf-8'
|
||||
assert mock_read_text.call_args_list[1][1]['encoding'] == 'cp1251'
|
||||
|
||||
|
||||
def test_extract_text_from_txt_unknown_encoding():
|
||||
"""Тестирует случай, когда ни одна кодировка не подходит."""
|
||||
m_open = mock_open()
|
||||
handle_mock = m_open()
|
||||
# Имитируем ошибку чтения для всех кодировок
|
||||
handle_mock.read.return_value = b'\xff\xfe' # BOM UTF-16, который не пройдет наши проверки
|
||||
|
||||
with patch('pathlib.Path.open', m_open), \
|
||||
pytest.raises(ValueError, match="Unknown encoding for dummy_unknown.txt"):
|
||||
indexer.extract_text_from_txt(Path("dummy_unknown.txt"))
|
||||
assert m_open.call_count == 3 # Попытки utf-8, cp1251, latin-1
|
||||
# Имитируем ошибку UnicodeDecodeError для всех трех попыток
|
||||
mock_read_text = MagicMock(name='read_text_mock_fail')
|
||||
mock_read_text.side_effect = [
|
||||
UnicodeDecodeError('utf-8', b'\xff\xfe', 0, 1, 'invalid bom'),
|
||||
UnicodeDecodeError('cp1251', b'\xff\xfe', 0, 1, 'invalid sequence'),
|
||||
UnicodeDecodeError('latin-1', b'\xff\xfe', 0, 1, 'invalid sequence')
|
||||
]
|
||||
with patch.object(Path, 'read_text', mock_read_text), \
|
||||
pytest.raises(ValueError, match="Unknown encoding for dummy_unknown.txt"): # Ожидаем ValueError
|
||||
dummy_path = Path("dummy_unknown.txt")
|
||||
indexer.extract_text_from_txt(dummy_path)
|
||||
# Проверяем, что было сделано 3 попытки вызова read_text
|
||||
assert mock_read_text.call_count == 3
|
||||
|
||||
|
||||
# ... (тесты для PDF и EPUB остаются как были, но можно проверить аргументы) ...
|
||||
@patch('backend.indexer.pdf_extract_text', return_value="PDF content here")
|
||||
def test_extract_text_from_pdf_success(mock_pdf_extract):
|
||||
result = indexer.extract_text_from_pdf(Path("dummy.pdf"))
|
||||
"""Тестирует успешное извлечение текста из PDF."""
|
||||
dummy_path = Path("dummy.pdf")
|
||||
result = indexer.extract_text_from_pdf(dummy_path)
|
||||
assert result == "PDF content here"
|
||||
mock_pdf_extract.assert_called_once_with(str(Path("dummy.pdf"))) # Убедимся, что передается строка
|
||||
# Убедимся, что pdf_extract_text вызывается со строковым представлением пути
|
||||
mock_pdf_extract.assert_called_once_with(str(dummy_path))
|
||||
|
||||
|
||||
@patch('backend.indexer.pdf_extract_text', side_effect=indexer.PDFSyntaxError("Bad PDF"))
|
||||
def test_extract_text_from_pdf_syntax_error(mock_pdf_extract):
|
||||
"""Тестирует обработку PDFSyntaxError."""
|
||||
dummy_path = Path("dummy.pdf")
|
||||
with pytest.raises(ValueError, match="Ошибка синтаксиса PDF: dummy.pdf"):
|
||||
indexer.extract_text_from_pdf(Path("dummy.pdf"))
|
||||
indexer.extract_text_from_pdf(dummy_path)
|
||||
|
||||
|
||||
@patch('backend.indexer.epub.read_epub')
|
||||
def test_extract_text_from_epub_success(mock_read_epub):
|
||||
"""Тестирует успешное извлечение текста из EPUB."""
|
||||
# Создаем моки для epub объектов
|
||||
mock_item1 = MagicMock(); mock_item1.get_type.return_value = indexer.ITEM_DOCUMENT
|
||||
mock_item1.content = b"<html><body><p>First paragraph.</p></body></html>"
|
||||
mock_item2 = MagicMock(); mock_item2.get_type.return_value = indexer.ITEM_DOCUMENT
|
||||
mock_item2.content = b"<html><body><style>.test{}</style><p>Second.</p><script>alert();</script><div>Third</div></body></html>"
|
||||
mock_book = MagicMock(); mock_book.get_items_of_type.return_value = [mock_item1, mock_item2]
|
||||
mock_read_epub.return_value = mock_book
|
||||
result = indexer.extract_text_from_epub(Path("dummy.epub"))
|
||||
assert result == "First paragraph.\n\nSecond.\nThird" # Проверяем результат с учетом \n и strip
|
||||
mock_read_epub.assert_called_once_with(str(Path("dummy.epub"))) # Убедимся, что передается строка
|
||||
mock_read_epub.return_value = mock_book # Настраиваем мок read_epub
|
||||
|
||||
dummy_path = Path("dummy.epub")
|
||||
result = indexer.extract_text_from_epub(dummy_path)
|
||||
# Проверяем результат с учетом удаления тегов и добавления переносов строк
|
||||
assert result == "First paragraph.\n\nSecond.\nThird"
|
||||
# Убедимся, что read_epub вызывается со строковым представлением пути
|
||||
mock_read_epub.assert_called_once_with(str(dummy_path))
|
||||
mock_book.get_items_of_type.assert_called_once_with(indexer.ITEM_DOCUMENT)
|
||||
|
||||
|
||||
@patch('backend.indexer.epub.read_epub', side_effect=Exception("EPUB Read Error"))
|
||||
def test_extract_text_from_epub_error(mock_read_epub):
|
||||
"""Тестирует обработку общей ошибки при чтении EPUB."""
|
||||
dummy_path = Path("dummy.epub")
|
||||
with pytest.raises(IOError, match="Не удалось обработать EPUB файл dummy.epub"):
|
||||
indexer.extract_text_from_epub(Path("dummy.epub"))
|
||||
indexer.extract_text_from_epub(dummy_path)
|
||||
|
||||
|
||||
# --- Тесты для process_file ---
|
||||
|
||||
# Патчим stat() вместо getmtime, так как process_file был изменен
|
||||
@patch('pathlib.Path.stat')
|
||||
@patch('backend.indexer.extract_text_from_txt', return_value="Текстовый контент")
|
||||
# Патчим open, чтобы read_text внутри extract_text_from_txt работал
|
||||
@patch('pathlib.Path.open', new_callable=mock_open, read_data=b'content')
|
||||
def test_process_file_txt_success(mock_file_open, mock_extract, mock_stat):
|
||||
# Мокируем зависимости для process_file
|
||||
@patch('pathlib.Path.stat') # Используется для получения mtime
|
||||
@patch('backend.indexer.extract_text_from_txt') # Мокируем функцию извлечения
|
||||
@patch('backend.indexer.time.time', return_value=99999.99) # Мокируем текущее время
|
||||
def test_process_file_txt_success(mock_time, mock_extract, mock_stat):
|
||||
"""Тестирует успешную обработку TXT файла."""
|
||||
mock_stat_result = MagicMock()
|
||||
mock_stat_result.st_mtime = 12345.67
|
||||
# Настройка моков
|
||||
mock_stat_result = MagicMock(); mock_stat_result.st_mtime = 12345.67
|
||||
mock_stat.return_value = mock_stat_result
|
||||
mock_extract.return_value="Текстовый контент" # Что вернет функция извлечения
|
||||
|
||||
p = Path("file.txt")
|
||||
# Мокаем suffix напрямую у экземпляра Path, который будет создан
|
||||
with patch.object(Path, 'suffix', '.txt'):
|
||||
# Мокаем и name для единообразия
|
||||
with patch.object(Path, 'name', "file.txt"):
|
||||
result = indexer.process_file(p)
|
||||
# Создаем мок объекта Path для передачи в process_file
|
||||
p = MagicMock(spec=Path)
|
||||
p.name = "file.txt"
|
||||
p.suffix = ".txt"
|
||||
# Важно: нужно чтобы str(p) возвращало имя файла для логов и т.п.
|
||||
p.__str__.return_value = "file.txt"
|
||||
|
||||
result = indexer.process_file(p)
|
||||
|
||||
assert result is not None
|
||||
assert result["id"] == "file.txt"
|
||||
assert result["content"] == "Текстовый контент"
|
||||
assert result["file_mtime"] == 12345.67
|
||||
mock_extract.assert_called_once_with(p) # Проверяем вызов extract
|
||||
mock_stat.assert_called_once() # Проверяем вызов stat
|
||||
assert result["indexed_at"] == 99999.99 # Проверяем мок времени
|
||||
# Проверяем, что были вызваны нужные функции/методы
|
||||
mock_extract.assert_called_once_with(p)
|
||||
p.stat.assert_called_once() # Проверяем вызов stat у мока Path
|
||||
|
||||
|
||||
@patch('pathlib.Path.stat') # Нужен для вызова внутри process_file
|
||||
@patch('pathlib.Path.stat') # stat не должен вызваться
|
||||
@patch('backend.indexer.extract_text_from_pdf', side_effect=IOError("PDF read failed"))
|
||||
def test_process_file_pdf_error(mock_extract, mock_stat):
|
||||
"""Тестирует обработку ошибки при извлечении текста из PDF."""
|
||||
# Мокируем stat, хотя он может не вызваться из-за раннего return
|
||||
mock_stat_result = MagicMock(); mock_stat_result.st_mtime = 12345.67
|
||||
mock_stat.return_value = mock_stat_result
|
||||
p = MagicMock(spec=Path)
|
||||
p.name = "broken.pdf"
|
||||
p.suffix = ".pdf"
|
||||
p.__str__.return_value = "broken.pdf"
|
||||
|
||||
p = Path("broken.pdf")
|
||||
with patch.object(Path, 'suffix', '.pdf'):
|
||||
with patch.object(Path, 'name', "broken.pdf"):
|
||||
result = indexer.process_file(p)
|
||||
result = indexer.process_file(p)
|
||||
|
||||
assert result is None # Ожидаем None при ошибке извлечения
|
||||
mock_extract.assert_called_once_with(p)
|
||||
mock_stat.assert_not_called() # stat не должен вызваться, так как ошибка была раньше
|
||||
mock_extract.assert_called_once_with(p) # Проверяем, что попытка извлечения была
|
||||
mock_stat.assert_not_called() # stat не должен вызваться, т.к. функция вышла раньше
|
||||
|
||||
|
||||
# МОК для stat НЕ НУЖЕН, так как функция выйдет раньше
|
||||
@patch('backend.indexer.extract_text_from_txt') # Патчим на всякий случай
|
||||
# Мокируем все функции извлечения, чтобы убедиться, что они НЕ вызываются
|
||||
@patch('backend.indexer.extract_text_from_txt')
|
||||
@patch('backend.indexer.extract_text_from_pdf')
|
||||
@patch('backend.indexer.extract_text_from_epub')
|
||||
def test_process_file_unsupported_extension(mock_epub, mock_pdf, mock_txt):
|
||||
@patch('pathlib.Path.stat') # stat тоже не должен вызваться
|
||||
def test_process_file_unsupported_extension(mock_stat, mock_epub, mock_pdf, mock_txt):
|
||||
"""Тестирует обработку файла с неподдерживаемым расширением."""
|
||||
p = Path("image.jpg")
|
||||
# Мокаем только suffix
|
||||
with patch.object(Path, 'suffix', '.jpg'):
|
||||
with patch.object(Path, 'name', "image.jpg"):
|
||||
result = indexer.process_file(p)
|
||||
p = MagicMock(spec=Path)
|
||||
p.name = "image.jpg"
|
||||
p.suffix = ".jpg"
|
||||
p.__str__.return_value = "image.jpg"
|
||||
|
||||
result = indexer.process_file(p)
|
||||
|
||||
assert result is None
|
||||
# Убедимся, что функции извлечения не вызывались
|
||||
# Убедимся, что ни одна функция извлечения не вызывалась
|
||||
mock_txt.assert_not_called()
|
||||
mock_pdf.assert_not_called()
|
||||
mock_epub.assert_not_called()
|
||||
mock_stat.assert_not_called()
|
||||
|
||||
|
||||
# --- Тесты для основной логики (scan_and_index_files) ---
|
||||
# (Остается как был, т.к. он мокирует process_file целиком)
|
||||
@patch('backend.indexer.Path')
|
||||
# Мокируем все внешние зависимости scan_and_index_files
|
||||
@patch('backend.indexer.Path') # Мокируем сам класс Path
|
||||
@patch('backend.indexer.get_meili_client')
|
||||
@patch('backend.indexer.get_indexed_files')
|
||||
@patch('backend.indexer.update_meili_index')
|
||||
@patch('backend.indexer.delete_from_meili_index')
|
||||
@patch('backend.indexer.process_file')
|
||||
@patch('backend.indexer.process_file') # Мокируем обработку отдельного файла
|
||||
def test_scan_and_index_new_file(
|
||||
mock_process_file, mock_delete, mock_update, mock_get_indexed, mock_client_func, MockPath
|
||||
):
|
||||
"""Тестирует сценарий добавления нового файла."""
|
||||
# Настройка моков
|
||||
# 1. Настройка моков
|
||||
# Мок клиента Meilisearch
|
||||
mock_meili_client_instance = MagicMock()
|
||||
mock_client_func.return_value = mock_meili_client_instance
|
||||
mock_get_indexed.return_value = {} # Индекс пуст
|
||||
# Мок ответа от Meilisearch (индекс пуст)
|
||||
mock_get_indexed.return_value = {}
|
||||
|
||||
# Имитация экземпляра Path для директории
|
||||
# Мок объекта Path, представляющего директорию
|
||||
mock_target_dir_instance = MagicMock(spec=Path)
|
||||
mock_target_dir_instance.is_dir.return_value = True
|
||||
# Настроим конструктор Path, чтобы он возвращал наш мок директории
|
||||
MockPath.return_value = mock_target_dir_instance
|
||||
|
||||
# Имитация файла на диске, возвращаемого rglob
|
||||
# Мок объекта Path, представляющего новый файл
|
||||
new_file_path_mock = MagicMock(spec=Path)
|
||||
new_file_path_mock.name = "new.txt"
|
||||
new_file_path_mock.is_file.return_value = True
|
||||
|
@ -201,26 +228,40 @@ def test_scan_and_index_new_file(
|
|||
# Настроим rglob на возврат этого мок-файла
|
||||
mock_target_dir_instance.rglob.return_value = [new_file_path_mock]
|
||||
|
||||
# Настроим конструктор Path
|
||||
MockPath.return_value = mock_target_dir_instance
|
||||
|
||||
# Имитация успешной обработки файла process_file
|
||||
# Мок результата успешной обработки файла
|
||||
mock_process_file.return_value = {"id": "new.txt", "content": "new file", "file_mtime": 100.0, "indexed_at": 101.0}
|
||||
|
||||
# Запуск функции
|
||||
# 2. Запуск тестируемой функции
|
||||
indexer.scan_and_index_files()
|
||||
|
||||
# Проверки (остаются как были)
|
||||
# 3. Проверки вызовов
|
||||
# Проверяем, что был создан Path для нужной директории
|
||||
MockPath.assert_called_once_with(indexer.FILES_DIR)
|
||||
# Проверяем, что была вызвана проверка is_dir
|
||||
mock_target_dir_instance.is_dir.assert_called_once()
|
||||
# Проверяем получение клиента Meili
|
||||
mock_client_func.assert_called_once()
|
||||
# Проверяем получение состояния индекса
|
||||
mock_get_indexed.assert_called_once_with(mock_meili_client_instance)
|
||||
# Проверяем сканирование файлов
|
||||
mock_target_dir_instance.rglob.assert_called_once_with('*')
|
||||
# Проверяем вызов stat для найденного файла
|
||||
new_file_path_mock.stat.assert_called_once()
|
||||
# Проверяем вызов обработки файла
|
||||
mock_process_file.assert_called_once_with(new_file_path_mock)
|
||||
# Проверяем вызов обновления индекса
|
||||
mock_update.assert_called_once()
|
||||
# Проверяем аргументы, переданные в update_meili_index
|
||||
call_args, _ = mock_update.call_args
|
||||
assert call_args[0] is mock_meili_client_instance
|
||||
assert len(call_args[1]) == 1
|
||||
assert call_args[1][0]["id"] == "new.txt"
|
||||
assert call_args[0] is mock_meili_client_instance # Проверяем переданный клиент
|
||||
assert len(call_args[1]) == 1 # Проверяем, что передан один документ
|
||||
assert call_args[1][0]["id"] == "new.txt" # Проверяем id документа
|
||||
# Проверяем, что удаление не вызывалось
|
||||
mock_delete.assert_not_called()
|
||||
|
||||
# TODO: Добавить больше тестов для scan_and_index_files, покрывающих другие сценарии:
|
||||
# - Обновление существующего файла
|
||||
# - Удаление файла
|
||||
# - Файл не изменился
|
||||
# - Ошибка при обработке файла (process_file возвращает None)
|
||||
# - Ошибки при взаимодействии с Meilisearch (в get_indexed_files, update, delete)
|
||||
|
|
Loading…
Add table
Reference in a new issue