mirror of
https://github.com/intari/search2_chatgpt.git
synced 2025-04-19 13:19:09 +00:00
306 lines
16 KiB
Python
306 lines
16 KiB
Python
import os
|
||
import requests
|
||
import time
|
||
import logging
|
||
from pathlib import Path
|
||
from typing import Optional, List, Dict, Any, Tuple, Set
|
||
from pdfminer.high_level import extract_text as pdf_extract_text
|
||
from pdfminer.pdfparser import PDFSyntaxError
|
||
from ebooklib import epub, ITEM_DOCUMENT
|
||
from bs4 import BeautifulSoup
|
||
from dotenv import load_dotenv
|
||
|
||
# Загрузка переменных окружения
|
||
load_dotenv()
|
||
|
||
# Настройка логирования
|
||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Конфигурация
|
||
FILES_DIR: str = os.getenv("LOCAL_STORAGE_PATH", "/mnt/storage")
|
||
SEARCH_ENGINE_URL: str = os.getenv("MEILI_URL", "http://meilisearch:7700")
|
||
MEILI_API_KEY: Optional[str] = os.getenv("MEILI_MASTER_KEY") # Используйте Master Key или Index API Key
|
||
INDEX_NAME: str = "documents"
|
||
BATCH_SIZE: int = 100 # Количество документов для отправки в Meilisearch за раз
|
||
|
||
# --- Функции извлечения текста ---
|
||
|
||
def extract_text_from_txt(file_path: Path) -> str:
|
||
"""Извлекает текст из TXT файла, пробуя разные кодировки."""
|
||
encodings_to_try = ['utf-8', 'cp1251', 'latin-1']
|
||
for encoding in encodings_to_try:
|
||
try:
|
||
return file_path.read_text(encoding=encoding)
|
||
except UnicodeDecodeError:
|
||
continue
|
||
except Exception as e:
|
||
raise IOError(f"Не удалось прочитать TXT файл {file_path} даже после попыток смены кодировки.") from e
|
||
# Если ни одна кодировка не подошла
|
||
logger.warning(f"Не удалось определить кодировку для TXT файла: {file_path.name}. Пропускаем.")
|
||
raise ValueError(f"Unknown encoding for {file_path.name}")
|
||
|
||
|
||
def extract_text_from_pdf(file_path: Path) -> str:
|
||
"""Извлекает текст из PDF файла."""
|
||
try:
|
||
return pdf_extract_text(str(file_path))
|
||
except PDFSyntaxError as e:
|
||
raise ValueError(f"Ошибка синтаксиса PDF: {file_path.name}") from e
|
||
except Exception as e:
|
||
# Ловим другие возможные ошибки pdfminer
|
||
raise IOError(f"Не удалось обработать PDF файл {file_path.name}") from e
|
||
|
||
def extract_text_from_epub(file_path: Path) -> str:
|
||
"""Извлекает текст из EPUB файла."""
|
||
try:
|
||
book = epub.read_epub(str(file_path))
|
||
text_parts: List[str] = []
|
||
for item in book.get_items_of_type(ITEM_DOCUMENT):
|
||
soup = BeautifulSoup(item.content, "html.parser")
|
||
# Удаляем скрипты и стили, чтобы не индексировать их содержимое
|
||
for script_or_style in soup(["script", "style"]):
|
||
script_or_style.decompose()
|
||
# Получаем текст, разделяя блоки параграфами для лучшей читаемости
|
||
# Используем strip=True для удаления лишних пробелов по краям
|
||
block_text = soup.get_text(separator='\n', strip=True)
|
||
if block_text:
|
||
text_parts.append(block_text)
|
||
return "\n\n".join(text_parts) # Разделяем контент разных HTML-файлов двойным переносом строки
|
||
except KeyError as e:
|
||
# Иногда возникает при проблемах с оглавлением или структурой epub
|
||
raise ValueError(f"Ошибка структуры EPUB файла: {file_path.name}, KeyError: {e}") from e
|
||
except Exception as e:
|
||
raise IOError(f"Не удалось обработать EPUB файл {file_path.name}") from e
|
||
|
||
# --- Функции взаимодействия с Meilisearch ---
|
||
|
||
def get_meili_client() -> requests.Session:
|
||
"""Создает и настраивает HTTP клиент для Meilisearch."""
|
||
session = requests.Session()
|
||
headers = {}
|
||
if MEILI_API_KEY:
|
||
headers['Authorization'] = f'Bearer {MEILI_API_KEY}'
|
||
session.headers.update(headers)
|
||
return session
|
||
|
||
def get_indexed_files(client: requests.Session) -> Dict[str, float]:
|
||
"""Получает список ID и время модификации проиндексированных файлов из Meilisearch."""
|
||
indexed_files: Dict[str, float] = {}
|
||
offset = 0
|
||
limit = 1000 # Получаем по 1000 за раз
|
||
url = f"{SEARCH_ENGINE_URL}/indexes/{INDEX_NAME}/documents"
|
||
params = {"limit": limit, "fields": "id,file_mtime"}
|
||
|
||
while True:
|
||
try:
|
||
params["offset"] = offset
|
||
response = client.get(url, params=params)
|
||
response.raise_for_status()
|
||
data = response.json()
|
||
results = data.get("results", [])
|
||
if not results:
|
||
break # Больше нет документов
|
||
|
||
for doc in results:
|
||
# Убедимся, что file_mtime существует и является числом
|
||
mtime = doc.get("file_mtime")
|
||
if isinstance(mtime, (int, float)):
|
||
indexed_files[doc['id']] = float(mtime)
|
||
else:
|
||
# Если времени модификации нет, считаем, что файл нужно переиндексировать
|
||
indexed_files[doc['id']] = 0.0
|
||
|
||
offset += len(results)
|
||
|
||
# Защита от бесконечного цикла, если API вернет некорректные данные
|
||
if len(results) < limit:
|
||
break
|
||
|
||
except requests.exceptions.HTTPError as e:
|
||
# Если индекс не найден (404), это нормально при первом запуске
|
||
if e.response.status_code == 404:
|
||
logger.info(f"Индекс '{INDEX_NAME}' не найден. Будет создан при первой индексации.")
|
||
return {} # Возвращаем пустой словарь
|
||
else:
|
||
logger.error(f"Ошибка получения документов из Meilisearch: {e}")
|
||
raise # Передаем ошибку дальше, т.к. не можем продолжить
|
||
except requests.exceptions.RequestException as e:
|
||
logger.error(f"Ошибка соединения с Meilisearch ({url}): {e}")
|
||
raise
|
||
logger.info(f"Найдено {len(indexed_files)} документов в индексе '{INDEX_NAME}'.")
|
||
return indexed_files
|
||
|
||
def update_meili_index(client: requests.Session, documents: List[Dict[str, Any]]) -> None:
|
||
"""Отправляет пакет документов в Meilisearch для добавления/обновления."""
|
||
if not documents:
|
||
return
|
||
url = f"{SEARCH_ENGINE_URL}/indexes/{INDEX_NAME}/documents"
|
||
try:
|
||
# Отправляем частями (батчами)
|
||
for i in range(0, len(documents), BATCH_SIZE):
|
||
batch = documents[i:i + BATCH_SIZE]
|
||
response = client.post(url, json=batch)
|
||
response.raise_for_status()
|
||
task_info = response.json()
|
||
logger.info(f"Отправлено {len(batch)} документов на индексацию. Task UID: {task_info.get('taskUid', 'N/A')}")
|
||
# В продакшене можно добавить мониторинг статуса задачи Meilisearch
|
||
time.sleep(0.1) # Небольшая пауза между батчами
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
logger.error(f"Ошибка при отправке документов в Meilisearch: {e}")
|
||
# Можно добавить логику повторной попытки или сохранения неудавшихся батчей
|
||
|
||
def delete_from_meili_index(client: requests.Session, file_ids: List[str]) -> None:
|
||
"""Удаляет документы из Meilisearch по списку ID."""
|
||
if not file_ids:
|
||
return
|
||
url = f"{SEARCH_ENGINE_URL}/indexes/{INDEX_NAME}/documents/delete-batch"
|
||
try:
|
||
# Удаляем частями (батчами)
|
||
for i in range(0, len(file_ids), BATCH_SIZE):
|
||
batch_ids = file_ids[i:i + BATCH_SIZE]
|
||
response = client.post(url, json=batch_ids)
|
||
response.raise_for_status()
|
||
task_info = response.json()
|
||
logger.info(f"Отправлено {len(batch_ids)} ID на удаление. Task UID: {task_info.get('taskUid', 'N/A')}")
|
||
time.sleep(0.1) # Небольшая пауза
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
logger.error(f"Ошибка при удалении документов из Meilisearch: {e}")
|
||
|
||
# --- Основная логика индексации ---
|
||
|
||
def process_file(file_path: Path) -> Optional[Dict[str, Any]]:
|
||
"""Обрабатывает один файл: извлекает текст и формирует документ для Meilisearch."""
|
||
filename = file_path.name
|
||
content: Optional[str] = None
|
||
file_ext = file_path.suffix.lower()
|
||
processed = False # Флаг, что файл был обработан (извлечен текст)
|
||
|
||
try:
|
||
logger.debug(f"Обработка файла: {filename}")
|
||
|
||
# --- Сначала извлекаем текст ---
|
||
if file_ext == ".txt":
|
||
content = extract_text_from_txt(file_path)
|
||
processed = True
|
||
elif file_ext == ".pdf":
|
||
content = extract_text_from_pdf(file_path)
|
||
processed = True
|
||
elif file_ext == ".epub":
|
||
content = extract_text_from_epub(file_path)
|
||
processed = True
|
||
else:
|
||
logger.debug(f"Неподдерживаемый формат файла: {filename}. Пропускаем.")
|
||
return None # Неподдерживаемый формат
|
||
|
||
# --- Если текст не извлечен или пуст ---
|
||
if not processed or content is None or not content.strip():
|
||
logger.warning(f"Не удалось извлечь текст или текст пуст: {filename}")
|
||
return None
|
||
|
||
# --- Только если текст успешно извлечен, получаем mtime ---
|
||
file_mtime = file_path.stat().st_mtime # Используем stat() как более надежный способ
|
||
|
||
# Формируем документ для Meilisearch
|
||
document = {
|
||
"id": filename, # Используем имя файла как уникальный ID
|
||
"content": content.strip(),
|
||
"file_mtime": file_mtime, # Сохраняем время модификации
|
||
"indexed_at": time.time() # Время последней индексации
|
||
}
|
||
return document
|
||
|
||
except (ValueError, IOError, FileNotFoundError, Exception) as e: # Добавим FileNotFoundError на всякий случай
|
||
# Ловим ошибки чтения, парсинга или другие проблемы с файлом
|
||
logger.error(f"❌ Ошибка обработки файла {filename}: {e}")
|
||
return None # Пропускаем этот файл
|
||
|
||
def scan_and_index_files() -> None:
|
||
"""Сканирует директорию, сравнивает с индексом и обновляет Meilisearch."""
|
||
logger.info(f"🚀 Запуск сканирования директории: {FILES_DIR}")
|
||
target_dir = Path(FILES_DIR)
|
||
if not target_dir.is_dir():
|
||
logger.error(f"Директория не найдена: {FILES_DIR}")
|
||
return
|
||
|
||
client = get_meili_client()
|
||
|
||
# 1. Получаем состояние индекса
|
||
try:
|
||
indexed_files_mtimes: Dict[str, float] = get_indexed_files(client)
|
||
except Exception as e:
|
||
logger.error(f"Не удалось получить состояние индекса. Прерывание: {e}")
|
||
return
|
||
|
||
# 2. Сканируем локальные файлы
|
||
local_files_mtimes: Dict[str, float] = {}
|
||
files_to_process: List[Path] = []
|
||
processed_extensions = {".txt", ".pdf", ".epub"}
|
||
|
||
for item in target_dir.rglob('*'): # Рекурсивно обходим все файлы
|
||
if item.is_file() and item.suffix.lower() in processed_extensions:
|
||
try:
|
||
local_files_mtimes[item.name] = item.stat().st_mtime
|
||
files_to_process.append(item)
|
||
except FileNotFoundError:
|
||
logger.warning(f"Файл был удален во время сканирования: {item.name}")
|
||
continue # Пропускаем, если файл исчез между листингом и stat()
|
||
|
||
logger.info(f"Найдено {len(local_files_mtimes)} поддерживаемых файлов локально.")
|
||
|
||
# 3. Определяем изменения
|
||
local_filenames: Set[str] = set(local_files_mtimes.keys())
|
||
indexed_filenames: Set[str] = set(indexed_files_mtimes.keys())
|
||
|
||
files_to_add: Set[str] = local_filenames - indexed_filenames
|
||
files_to_delete: Set[str] = indexed_filenames - local_filenames
|
||
files_to_check_for_update: Set[str] = local_filenames.intersection(indexed_filenames)
|
||
|
||
files_to_update: Set[str] = {
|
||
fname for fname in files_to_check_for_update
|
||
if local_files_mtimes[fname] > indexed_files_mtimes.get(fname, 0.0) # Сравниваем время модификации
|
||
}
|
||
|
||
logger.info(f"К добавлению: {len(files_to_add)}, к обновлению: {len(files_to_update)}, к удалению: {len(files_to_delete)}")
|
||
|
||
# 4. Обрабатываем и отправляем добавления/обновления
|
||
docs_for_meili: List[Dict[str, Any]] = []
|
||
files_requiring_processing: Set[str] = files_to_add.union(files_to_update)
|
||
|
||
processed_count = 0
|
||
skipped_count = 0
|
||
error_count = 0
|
||
|
||
for file_path in files_to_process:
|
||
if file_path.name in files_requiring_processing:
|
||
processed_count +=1
|
||
document = process_file(file_path)
|
||
if document:
|
||
docs_for_meili.append(document)
|
||
else:
|
||
error_count += 1 # Ошибка или не удалось извлечь текст
|
||
else:
|
||
skipped_count +=1 # Файл не изменился
|
||
|
||
logger.info(f"Обработано файлов: {processed_count} (пропущено без изменений: {skipped_count}, ошибки: {error_count})")
|
||
|
||
if docs_for_meili:
|
||
logger.info(f"Отправка {len(docs_for_meili)} документов в Meilisearch...")
|
||
update_meili_index(client, docs_for_meili)
|
||
else:
|
||
logger.info("Нет новых или обновленных файлов для индексации.")
|
||
|
||
# 5. Удаляем устаревшие документы
|
||
if files_to_delete:
|
||
logger.info(f"Удаление {len(files_to_delete)} устаревших документов из Meilisearch...")
|
||
delete_from_meili_index(client, list(files_to_delete))
|
||
else:
|
||
logger.info("Нет файлов для удаления из индекса.")
|
||
|
||
logger.info("✅ Индексация завершена.")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
scan_and_index_files()
|