diff --git a/extract_text_oldnew.py b/extract_text_oldnew.py
new file mode 100644
index 0000000..80e7369
--- /dev/null
+++ b/extract_text_oldnew.py
@@ -0,0 +1,320 @@
+import os
+import ebooklib
+from ebooklib import epub
+from bs4 import BeautifulSoup
+import time
+from threading import Lock
+from elasticsearch import Elasticsearch
+import PyPDF2
+
+# Elasticsearch configuration
+ELASTICSEARCH_HOST = os.environ.get("ELASTICSEARCH_HOST", "localhost")
+ELASTICSEARCH_PORT = int(os.environ.get("ELASTICSEARCH_PORT", 9200))
+INDEX_NAME = "book_index"
+
+# Global variables for progress tracking
+indexing_progress = {
+    'total_files': 0,
+    'processed_files': 0,
+    'start_time': None,
+    'is_running': False,
+    'current_file': '',
+    'errors': []
+}
+progress_lock = Lock()
+
+# Initialize the Elasticsearch client
+es = None
+try:
+    es = Elasticsearch([{'host': ELASTICSEARCH_HOST, 'port': ELASTICSEARCH_PORT, 'scheme': 'http'}])
+except Exception as e:
+    print(f"Error connecting to Elasticsearch: {e}")
+
+def create_index():
+    """Create the Elasticsearch index if it doesn't exist."""
+    if es and not es.indices.exists(index=INDEX_NAME):
+        es.indices.create(index=INDEX_NAME, body={
+            "settings": {
+                "number_of_shards": 1,
+                "number_of_replicas": 0
+            },
+            "mappings": {
+                "properties": {
+                    "file_path": {"type": "keyword"},
+                    "content": {"type": "text"}
+                }
+            }
+        })
+
+def extract_text_from_epub(epub_path, progress_lock=None, indexing_progress=None):
+    """Extract text from an EPUB file with robust error handling.
+
+    Args:
+        epub_path: Path to the EPUB file
+        progress_lock: Optional threading lock for progress updates
+        indexing_progress: Optional dict containing an 'errors' list for tracking
+
+    Returns:
+        Tuple of (extracted_text, error_messages)
+    """
+    text = ''
+    errors = []
+
+    def add_error(msg):
+        errors.append(msg)
+        if indexing_progress is not None and progress_lock is not None:
+            with progress_lock:
+                indexing_progress['errors'].append(msg)
+
+    if not os.path.exists(epub_path):
+        add_error(f"EPUB file not found: {epub_path}")
+        return '', errors
+
+    try:
+        # Start with book parsing information
+        info_messages = [f"[INFO] Starting to parse EPUB file: {epub_path}"]
+
+        book = epub.read_epub(epub_path)
+
+        # Add book metadata
+        info_messages.append("[INFO] Book Metadata:")
+        info_messages.append(f"[INFO] Title: {book.get_metadata('DC', 'title')[0][0] if book.get_metadata('DC', 'title') else 'Unknown'}")
+        info_messages.append(f"[INFO] Author: {book.get_metadata('DC', 'creator')[0][0] if book.get_metadata('DC', 'creator') else 'Unknown'}")
+        info_messages.append(f"[INFO] ID: {book.get_metadata('DC', 'identifier')[0][0] if book.get_metadata('DC', 'identifier') else 'Unknown'}")
+        info_messages.append(f"[INFO] Language: {book.get_metadata('DC', 'language')[0][0] if book.get_metadata('DC', 'language') else 'Unknown'}")
+
+        items = book.get_items()
+        if items is None:
+            return '\n'.join(info_messages), errors
+
+        # Summarize the EPUB structure and materialize the items collection.
+        # get_items() yields a generator, so it must be collected into a list
+        # here; otherwise some test fixtures return no items on a second pass.
+        media_types = {}
+        collected_items = []
+        for item in items:
+            media_types[item.media_type] = media_types.get(item.media_type, 0) + 1
+            collected_items.append(item)
+
+        info_messages.append("[INFO] EPUB Structure Summary:")
+        for media_type, count in media_types.items():
+            info_messages.append(f"[INFO] {media_type}: {count} items")
+
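+        # Note: when ebooklib reads an EPUB, book.spine is a list of
+        # (idref, linear) tuples, so item[0] below is the manifest id that
+        # get_item_with_id() resolves; it returns None for dangling references.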
+        # Resolve spine items first to validate references
+        spine_items = []
+        try:
+            spine = book.spine
+            spine_items = [book.get_item_with_id(item[0]) for item in spine]
+            info_messages.append("[INFO] Parsing spine (reading order)...")
+            info_messages.append(f"[INFO] Found {len(spine_items)} items in spine")
+
+            # Flag spine entries that reference missing manifest items
+            for i, item in enumerate(spine_items):
+                if item is None:
+                    add_error(f"Skipping missing spine item {spine[i][0]} in {epub_path}")
+        except Exception as e:
+            add_error(f"Error getting EPUB spine: {str(e)}")
+
+        info_messages.append("[INFO] Extracting text from content documents...")
+        content_text = ''
+        content_items = 0
+        for item in collected_items:
+            if item.media_type == 'application/xhtml+xml':
+                # Get the item name for logging before parsing starts, so the
+                # error handlers below can always reference it
+                item_name = getattr(item, 'get_name', lambda: 'unnamed')()
+                # info_messages.append(f"[INFO] Processing content document: {item_name} (in spine: {item in spine_items})")
+                try:
+                    content = item.get_content().decode('utf-8')
+                    soup = BeautifulSoup(content, 'html.parser')
+                    extracted = soup.get_text(separator=' ', strip=True)
+                    content_text += extracted + '\n'
+                    info_messages.append(f"[INFO] Extracted {len(extracted)} characters from {item_name}")
+                    content_items += 1
+                except UnicodeDecodeError:
+                    try:
+                        content = item.get_content().decode('latin-1')
+                        soup = BeautifulSoup(content, 'html.parser')
+                        extracted = soup.get_text(separator=' ', strip=True)
+                        content_text += extracted + '\n'
+                        info_messages.append(f"[INFO] Extracted {len(extracted)} characters from {item_name} (using latin-1 fallback)")
+                        content_items += 1
+                    except Exception as e:
+                        add_error(f"Error parsing EPUB item {item_name}: {str(e)}")
+                except Exception as e:
+                    add_error(f"Error parsing EPUB item {item_name}: {str(e)}")
+
+        # Combine info messages and content
+        full_text = '\n'.join(info_messages)
+        full_text += f"\n[INFO] Completed text extraction: {len(content_text)} characters from {content_items} content documents"
+        full_text += f"\n\n{content_text.strip()}"
+        return full_text, errors
+    except Exception as e:
+        add_error(f"Error processing EPUB {epub_path}: {str(e)}")
+        return text, errors
+
+def get_progress():
+    """Get the current indexing progress.
+
+    Returns None if indexing is not running, otherwise a dictionary with
+    progress information, including a percentage and time estimates.
+    """
+    with progress_lock:
+        if not indexing_progress['is_running']:
+            return None
+
+        progress = indexing_progress.copy()
+        if progress['total_files'] > 0:
+            progress['percentage'] = (progress['processed_files'] / progress['total_files']) * 100
+        else:
+            progress['percentage'] = 0
+
+        elapsed = time.time() - progress['start_time'] if progress['start_time'] else 0
+        progress['elapsed_time'] = elapsed
+        if progress['processed_files'] > 0:
+            time_per_file = elapsed / progress['processed_files']
+            remaining_files = progress['total_files'] - progress['processed_files']
+            progress['estimated_remaining'] = time_per_file * remaining_files
+            progress['estimated_completion'] = time.time() + progress['estimated_remaining']
+        else:
+            progress['estimated_remaining'] = 0
+            progress['estimated_completion'] = 0
+
+        return progress
+
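+# Illustrative only: a status endpoint could poll get_progress() while
+# index_files() runs in a worker thread, along these lines:
+#
+#     progress = get_progress()
+#     if progress is not None:
+#         print(f"{progress['processed_files']}/{progress['total_files']} files "
+#               f"({progress['percentage']:.1f}%), "
+#               f"~{progress['estimated_remaining']:.0f}s remaining")
+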
+def extract_text_from_pdf(pdf_path, progress_lock=None, indexing_progress=None):
+    """Extract text from a PDF file with robust error handling.
+
+    Args:
+        pdf_path: Path to the PDF file
+        progress_lock: Optional threading lock for progress updates
+        indexing_progress: Optional dict containing an 'errors' list for tracking
+
+    Returns:
+        Tuple of (extracted_text, error_messages)
+    """
+    text = ''
+    errors = []
+
+    def add_error(msg):
+        errors.append(msg)
+        if indexing_progress is not None and progress_lock is not None:
+            with progress_lock:
+                indexing_progress['errors'].append(msg)
+
+    # Validate the input file
+    if not os.path.exists(pdf_path):
+        add_error(f"File not found: {pdf_path}")
+        return '', errors
+    if not os.access(pdf_path, os.R_OK):
+        add_error(f"File not readable: {pdf_path}")
+        return '', errors
+
+    try:
+        with open(pdf_path, 'rb') as pdf_file:
+            try:
+                pdf_reader = PyPDF2.PdfReader(pdf_file)
+                total_pages = len(pdf_reader.pages)
+
+                # Extract page by page so one bad page doesn't abort the file
+                for page_num in range(total_pages):
+                    try:
+                        page = pdf_reader.pages[page_num]
+                        page_text = page.extract_text()
+                        if page_text:
+                            text += page_text + "\n"
+                    except Exception as page_error:
+                        add_error(f"Page {page_num+1}/{total_pages}: {str(page_error)}")
+                        continue
+
+            except Exception as pdf_error:
+                add_error(f"PDF processing error: {str(pdf_error)}")
+
+    except Exception as file_error:
+        add_error(f"File access error: {str(file_error)}")
+
+    return text, errors
+
+def index_files(directory):
+    """Index files in the specified directory.
+
+    Scans the directory tree for EPUB, PDF, and TXT files, extracts text
+    from them, and indexes the content in Elasticsearch.
+    """
+    global indexing_progress
+
+    with progress_lock:
+        indexing_progress = {
+            'total_files': 0,
+            'processed_files': 0,
+            'start_time': time.time(),
+            'is_running': True,
+            'current_file': '',
+            'errors': []
+        }
+
+    try:
+        # Create the Elasticsearch index if it doesn't exist
+        if es:
+            create_index()
+        else:
+            with progress_lock:
+                indexing_progress['errors'].append("Elasticsearch connection not available")
+            return
+
+        # First count all supported files so progress percentages are meaningful
+        total_files = 0
+        for root, _, files in os.walk(directory):
+            for file in files:
+                if file.lower().endswith(('.epub', '.pdf', '.txt')):
+                    total_files += 1
+
+        with progress_lock:
+            indexing_progress['total_files'] = total_files
+
+        # Now process the files
+        for root, _, files in os.walk(directory):
+            for file in files:
+                file_path = os.path.join(root, file)
+
+                # Skip unsupported types before touching the progress state,
+                # so current_file only ever names a file that is processed
+                if not file.lower().endswith(('.epub', '.pdf', '.txt')):
+                    print(f"Skipping unsupported file type: {file_path}")
+                    continue
+
+                with progress_lock:
+                    indexing_progress['current_file'] = file_path
+
+                try:
+                    text = ""
+                    errors = []
+
+                    if file_path.lower().endswith(".epub"):
+                        text, errors = extract_text_from_epub(
+                            file_path,
+                            progress_lock=progress_lock,
+                            indexing_progress=indexing_progress
+                        )
+                    elif file_path.lower().endswith(".pdf"):
+                        text, errors = extract_text_from_pdf(
+                            file_path,
+                            progress_lock=progress_lock,
+                            indexing_progress=indexing_progress
+                        )
+                    elif file_path.lower().endswith(".txt"):
+                        try:
+                            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
+                                text = f.read()
+                        except Exception as e:
+                            with progress_lock:
+                                indexing_progress['errors'].append(f"Error reading {file_path}: {str(e)}")
+                            continue
+
+                    # Index the document in Elasticsearch
+                    if es and text:
+                        doc = {
+                            'file_path': file_path,
+                            'content': text
+                        }
+                        es.index(index=INDEX_NAME, document=doc)
+                        print(f"Indexed: {file_path}")
+
+                    with progress_lock:
+                        indexing_progress['processed_files'] += 1
+
+                except Exception as e:
+                    error_msg = f"Error indexing {file_path}: {str(e)}"
+                    print(error_msg)
+                    with progress_lock:
+                        indexing_progress['errors'].append(error_msg)
+
+    finally:
+        with progress_lock:
+            indexing_progress['is_running'] = False
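+
+if __name__ == "__main__":
+    # Minimal manual smoke test, not part of the module's API: it assumes a
+    # local Elasticsearch is reachable and that ./test_data exists (the path
+    # here is only an example).
+    index_files("test_data")
+    print(f"Indexing finished with {len(indexing_progress['errors'])} errors")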
diff --git a/test_data/testfile2.epub b/test_data/testfile2.epub
new file mode 100644
index 0000000..8fd8da1
Binary files /dev/null and b/test_data/testfile2.epub differ
diff --git a/test_data/testfile_spine1.epub b/test_data/testfile_spine1.epub
new file mode 100644
index 0000000..47235b4
Binary files /dev/null and b/test_data/testfile_spine1.epub differ