roocodetests_1/extract_text_oldnew.py

import os
import time
from threading import Lock

from ebooklib import epub
from bs4 import BeautifulSoup
from elasticsearch import Elasticsearch
import PyPDF2

# Elasticsearch Configuration
ELASTICSEARCH_HOST = os.environ.get("ELASTICSEARCH_HOST", "localhost")
ELASTICSEARCH_PORT = int(os.environ.get("ELASTICSEARCH_PORT", 9200))
INDEX_NAME = "book_index"

# Global variables for progress tracking
indexing_progress = {
    'total_files': 0,
    'processed_files': 0,
    'start_time': None,
    'is_running': False,
    'current_file': '',
    'errors': []
}
progress_lock = Lock()

# Initialize Elasticsearch client
es = None
try:
    es = Elasticsearch([{'host': ELASTICSEARCH_HOST, 'port': ELASTICSEARCH_PORT, 'scheme': 'http'}])
except Exception as e:
    print(f"Error connecting to Elasticsearch: {e}")


def create_index():
    """Create the Elasticsearch index if it doesn't exist."""
    if es and not es.indices.exists(index=INDEX_NAME):
        es.indices.create(index=INDEX_NAME, body={
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0
            },
            "mappings": {
                "properties": {
                    "file_path": {"type": "keyword"},
                    "content": {"type": "text"}
                }
            }
        })
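
# Sketch of how the index above might be queried once documents are in it,
# assuming an elasticsearch-py 8.x client; "whale" is a placeholder search
# term and this call is not part of the module's own flow:
#
#   create_index()
#   hits = es.search(index=INDEX_NAME,
#                    query={"match": {"content": "whale"}})["hits"]["hits"]
#   for hit in hits:
#       print(hit["_source"]["file_path"])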


def extract_text_from_epub(epub_path, progress_lock=None, indexing_progress=None):
    """Extract text from EPUB with robust error handling.

    Args:
        epub_path: Path to EPUB file
        progress_lock: Optional threading lock for progress updates
        indexing_progress: Optional dict containing 'errors' list for tracking

    Returns:
        Tuple of (extracted_text, error_messages).
    """
    text = ''
    errors = []

    def add_error(msg):
        # Record the error locally and, when progress tracking is wired in,
        # mirror it into the shared progress dict under the lock.
        errors.append(msg)
        if indexing_progress is not None and progress_lock is not None:
            with progress_lock:
                indexing_progress['errors'].append(msg)

    if not os.path.exists(epub_path):
        add_error(f"EPUB file not found: {epub_path}")
        return '', errors

    try:
        # Start with book parsing information
        info_messages = []
        info_messages.append(f"[INFO] Starting to parse EPUB file: {epub_path}")
        book = epub.read_epub(epub_path)

        # Add book metadata
        info_messages.append("[INFO] Book Metadata:")
        info_messages.append(f"[INFO] Title: {book.get_metadata('DC', 'title')[0][0] if book.get_metadata('DC', 'title') else 'Unknown'}")
        info_messages.append(f"[INFO] Author: {book.get_metadata('DC', 'creator')[0][0] if book.get_metadata('DC', 'creator') else 'Unknown'}")
        info_messages.append(f"[INFO] ID: {book.get_metadata('DC', 'identifier')[0][0] if book.get_metadata('DC', 'identifier') else 'Unknown'}")
        info_messages.append(f"[INFO] Language: {book.get_metadata('DC', 'language')[0][0] if book.get_metadata('DC', 'language') else 'Unknown'}")

        items = book.get_items()
        if items is None:
            return '\n'.join(info_messages), errors

        # Build the EPUB structure summary while materializing the items into
        # a list; some test fixtures only yield items on the first iteration,
        # so the collection cannot be traversed twice.
        media_types = {}
        collected_items = []
        for item in items:
            media_types[item.media_type] = media_types.get(item.media_type, 0) + 1
            collected_items.append(item)
        info_messages.append("[INFO] EPUB Structure Summary:")
        for media_type, count in media_types.items():
            info_messages.append(f"[INFO] {media_type}: {count} items")

        # Get spine items first to validate references
        spine_items = []
        try:
            spine = book.spine
            spine_items = [book.get_item_with_id(item[0]) for item in spine]
            info_messages.append("[INFO] Parsing spine (reading order)...")
            info_messages.append(f"[INFO] Found {len(spine_items)} items in spine")
            # Handle None items in spine
            for i, item in enumerate(spine_items):
                if item is None:
                    add_error(f"Skipping missing spine item {spine[i][0]} in {epub_path}")
        except Exception as e:
            add_error(f"Error getting EPUB spine: {str(e)}")

        info_messages.append("[INFO] Extracting text from content documents...")
        content_text = ''
        content_items = 0
        for item in collected_items:
            if item.media_type == 'application/xhtml+xml':
                try:
                    # Safely get item name for logging
                    item_name = getattr(item, 'get_name', lambda: 'unnamed')()
                    # info_messages.append(f"[INFO] Processing content document: {item_name} (in spine: {item in spine_items})")
                    content = item.get_content().decode('utf-8')
                    soup = BeautifulSoup(content, 'html.parser')
                    extracted = soup.get_text(separator=' ', strip=True)
                    content_text += extracted + '\n'
                    info_messages.append(f"[INFO] {item_name} contains {len(extracted)} characters")
                    content_items += 1
                except UnicodeDecodeError:
                    # Retry with latin-1, which accepts any byte sequence.
                    try:
                        content = item.get_content().decode('latin-1')
                        soup = BeautifulSoup(content, 'html.parser')
                        extracted = soup.get_text(separator=' ', strip=True)
                        content_text += extracted + '\n'
                        info_messages.append(f"[INFO] Extracted {len(extracted)} characters from {item_name} (using latin-1 fallback)")
                        content_items += 1
                    except Exception as e:
                        add_error(f"Error parsing EPUB item {item_name}: {str(e)}")
                except Exception as e:
                    add_error(f"Error parsing EPUB item {item_name}: {str(e)}")

        # Combine info messages and content
        full_text = '\n'.join(info_messages)
        full_text += f"\n[INFO] Completed text extraction: {len(content_text)} characters from {content_items} content documents"
        full_text += f"\n[INFO] Total extracted text length: {len(content_text)} characters"
        full_text += f"\n\n{content_text.strip()}"
        return full_text, errors
    except Exception as e:
        add_error(f"Error processing EPUB {epub_path}: {str(e)}")
        return text, errors
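
# Sketch: standalone extraction with no shared progress state (both optional
# arguments default to None, so errors are only returned, not mirrored into a
# progress dict). "sample.epub" is a placeholder path; extract_text_from_pdf
# below has the same calling convention.
#
#   extracted, problems = extract_text_from_epub("sample.epub")
#   print(extracted[:500], problems)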


def get_progress():
    """Get the current indexing progress.

    Returns None if indexing is not running, otherwise returns a dictionary
    with progress information.
    """
    with progress_lock:
        if not indexing_progress['is_running']:
            return None
        progress = indexing_progress.copy()
        if progress['total_files'] > 0:
            progress['percentage'] = (progress['processed_files'] / progress['total_files']) * 100
        else:
            progress['percentage'] = 0
        elapsed = time.time() - progress['start_time'] if progress['start_time'] else 0
        progress['elapsed_time'] = elapsed
        if progress['processed_files'] > 0:
            # Naive linear estimate: assume remaining files take as long,
            # on average, as the ones processed so far.
            time_per_file = elapsed / progress['processed_files']
            remaining_files = progress['total_files'] - progress['processed_files']
            progress['estimated_remaining'] = time_per_file * remaining_files
            progress['estimated_completion'] = time.time() + progress['estimated_remaining']
        else:
            progress['estimated_remaining'] = 0
            progress['estimated_completion'] = 0
        return progress
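
# Illustrative shape of a snapshot returned while a run is active (all values
# below are made up, not real output):
#
#   {'total_files': 10, 'processed_files': 4, 'start_time': 1700000000.0,
#    'is_running': True, 'current_file': 'books/a.pdf', 'errors': [],
#    'percentage': 40.0, 'elapsed_time': 12.3,
#    'estimated_remaining': 18.4, 'estimated_completion': 1700000030.7}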


def extract_text_from_pdf(pdf_path, progress_lock=None, indexing_progress=None):
    """Extract text from PDF with robust error handling.

    Args:
        pdf_path: Path to PDF file
        progress_lock: Optional threading lock for progress updates
        indexing_progress: Optional dict containing 'errors' list for tracking

    Returns:
        Tuple of (extracted_text, error_messages).
    """
    text = ''
    errors = []

    def add_error(msg):
        errors.append(msg)
        if indexing_progress is not None and progress_lock is not None:
            with progress_lock:
                indexing_progress['errors'].append(msg)

    # Validate input file
    if not os.path.exists(pdf_path):
        add_error(f"File not found: {pdf_path}")
        return '', errors
    if not os.access(pdf_path, os.R_OK):
        add_error(f"File not readable: {pdf_path}")
        return '', errors

    try:
        with open(pdf_path, 'rb') as pdf_file:
            try:
                pdf_reader = PyPDF2.PdfReader(pdf_file)
                total_pages = len(pdf_reader.pages)
                for page_num in range(total_pages):
                    try:
                        page = pdf_reader.pages[page_num]
                        page_text = page.extract_text()
                        if page_text:
                            text += page_text + "\n"
                    except Exception as page_error:
                        # A bad page should not abort the whole document.
                        add_error(f"Page {page_num + 1}/{total_pages}: {str(page_error)}")
                        continue
            except Exception as pdf_error:
                add_error(f"PDF processing error: {str(pdf_error)}")
    except Exception as file_error:
        add_error(f"File access error: {str(file_error)}")
    return text, errors


def index_files(directory):
    """Index files in the specified directory.

    This function scans the directory for EPUB, PDF, and TXT files,
    extracts text from them, and indexes the content in Elasticsearch.
    """
    global indexing_progress
    with progress_lock:
        indexing_progress = {
            'total_files': 0,
            'processed_files': 0,
            'start_time': time.time(),
            'is_running': True,
            'current_file': '',
            'errors': []
        }
    try:
        # Create the Elasticsearch index if it doesn't exist
        if es:
            create_index()
        else:
            with progress_lock:
                indexing_progress['errors'].append("Elasticsearch connection not available")
            return

        # First count all files
        total_files = 0
        for root, _, files in os.walk(directory):
            for file in files:
                if file.lower().endswith(('.epub', '.pdf', '.txt')):
                    total_files += 1
        with progress_lock:
            indexing_progress['total_files'] = total_files

        # Now process files
        for root, _, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(root, file)
                with progress_lock:
                    indexing_progress['current_file'] = file_path
                try:
                    text = ""
                    errors = []
                    if file_path.lower().endswith(".epub"):
                        text, errors = extract_text_from_epub(
                            file_path,
                            progress_lock=progress_lock,
                            indexing_progress=indexing_progress
                        )
                    elif file_path.lower().endswith(".pdf"):
                        text, errors = extract_text_from_pdf(
                            file_path,
                            progress_lock=progress_lock,
                            indexing_progress=indexing_progress
                        )
                    elif file_path.lower().endswith(".txt"):
                        try:
                            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                                text = f.read()
                        except Exception as e:
                            with progress_lock:
                                indexing_progress['errors'].append(f"Error reading {file_path}: {str(e)}")
                                # Still count the file so the progress
                                # percentage can reach 100%.
                                indexing_progress['processed_files'] += 1
                            continue
                    else:
                        print(f"Skipping unsupported file type: {file_path}")
                        continue

                    # Index the document in Elasticsearch
                    if es and text:
                        doc = {
                            'file_path': file_path,
                            'content': text
                        }
                        es.index(index=INDEX_NAME, document=doc)
                        print(f"Indexed: {file_path}")
                    with progress_lock:
                        indexing_progress['processed_files'] += 1
                except Exception as e:
                    error_msg = f"Error indexing {file_path}: {str(e)}"
                    print(error_msg)
                    with progress_lock:
                        indexing_progress['errors'].append(error_msg)
                        # Count failed files too; otherwise the progress
                        # percentage never reaches 100%.
                        indexing_progress['processed_files'] += 1
    finally:
        with progress_lock:
            indexing_progress['is_running'] = False
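

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: run the indexer
    # in a background thread and poll progress once a second. The "./books"
    # directory is a placeholder, and a reachable Elasticsearch node at
    # ELASTICSEARCH_HOST:ELASTICSEARCH_PORT is assumed.
    from threading import Thread

    worker = Thread(target=index_files, args=("./books",), daemon=True)
    worker.start()
    while worker.is_alive():
        snapshot = get_progress()
        if snapshot:
            print(f"{snapshot['processed_files']}/{snapshot['total_files']} "
                  f"({snapshot['percentage']:.1f}%) - {snapshot['current_file']}")
        time.sleep(1)
    worker.join()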