feat: replace content processing engine with content-core

LUIS NOVO 2025-05-30 13:35:46 -03:00
parent 77b104c071
commit 36e928eb75
12 changed files with 2214 additions and 2243 deletions


@@ -1,145 +0,0 @@
import os
from typing import Any, Dict

import magic
from langgraph.graph import END, START, StateGraph
from loguru import logger

from open_notebook.exceptions import UnsupportedTypeException
from open_notebook.graphs.content_processing.audio import extract_audio
from open_notebook.graphs.content_processing.office import (
    SUPPORTED_OFFICE_TYPES,
    extract_office_content,
)
from open_notebook.graphs.content_processing.pdf import (
    SUPPORTED_FITZ_TYPES,
    extract_pdf,
)
from open_notebook.graphs.content_processing.state import ContentState
from open_notebook.graphs.content_processing.text import extract_txt
from open_notebook.graphs.content_processing.url import extract_url, url_provider
from open_notebook.graphs.content_processing.video import extract_best_audio_from_video
from open_notebook.graphs.content_processing.youtube import extract_youtube_transcript


async def source_identification(state: ContentState) -> Dict[str, str]:
    """
    Identify the content source based on parameters
    """
    if state.get("content"):
        doc_type = "text"
    elif state.get("file_path"):
        doc_type = "file"
    elif state.get("url"):
        doc_type = "url"
    else:
        raise ValueError("No source provided.")
    return {"source_type": doc_type}


async def file_type(state: ContentState) -> Dict[str, Any]:
    """
    Identify the file using python-magic
    """
    return_dict = {}
    file_path = state.get("file_path")
    if file_path is not None:
        return_dict["identified_type"] = magic.from_file(file_path, mime=True)
        return_dict["title"] = os.path.basename(file_path)
    return return_dict


async def file_type_edge(data: ContentState) -> str:
    assert data.get("identified_type"), "Type not identified"
    identified_type = data["identified_type"]
    if identified_type == "text/plain":
        return "extract_txt"
    elif identified_type in SUPPORTED_FITZ_TYPES:
        return "extract_pdf"
    elif identified_type in SUPPORTED_OFFICE_TYPES:
        return "extract_office_content"
    elif identified_type.startswith("video"):
        return "extract_best_audio_from_video"
    elif identified_type.startswith("audio"):
        return "extract_audio"
    else:
        raise UnsupportedTypeException(
            f"Unsupported file type: {data.get('identified_type')}"
        )


async def delete_file(data: ContentState) -> Dict[str, Any]:
    if data.get("delete_source"):
        logger.debug(f"Deleting file: {data.get('file_path')}")
        file_path = data.get("file_path")
        if file_path is not None:
            try:
                os.remove(file_path)
                return {"file_path": None}
            except FileNotFoundError:
                logger.warning(f"File not found while trying to delete: {file_path}")
    else:
        logger.debug("Not deleting file")
    return {}


async def url_type_router(x: ContentState) -> str:
    return x.get("identified_type", "")


async def source_type_router(x: ContentState) -> str:
    return x.get("source_type", "")


# Create workflow
workflow = StateGraph(ContentState)

# Add nodes
workflow.add_node("source", source_identification)
workflow.add_node("url_provider", url_provider)
workflow.add_node("file_type", file_type)
workflow.add_node("extract_txt", extract_txt)
workflow.add_node("extract_pdf", extract_pdf)
workflow.add_node("extract_url", extract_url)
workflow.add_node("extract_office_content", extract_office_content)
workflow.add_node("extract_best_audio_from_video", extract_best_audio_from_video)
workflow.add_node("extract_audio", extract_audio)
workflow.add_node("extract_youtube_transcript", extract_youtube_transcript)
workflow.add_node("delete_file", delete_file)

# Add edges
workflow.add_edge(START, "source")
workflow.add_conditional_edges(
    "source",
    source_type_router,
    {
        "url": "url_provider",
        "file": "file_type",
        "text": END,
    },
)
workflow.add_conditional_edges(
    "file_type",
    file_type_edge,
)
workflow.add_conditional_edges(
    "url_provider",
    url_type_router,
    {"article": "extract_url", "youtube": "extract_youtube_transcript"},
)
workflow.add_edge("url_provider", END)
workflow.add_edge("file_type", END)
workflow.add_edge("extract_url", END)
workflow.add_edge("extract_txt", END)
workflow.add_edge("extract_youtube_transcript", END)
workflow.add_edge("extract_pdf", "delete_file")
workflow.add_edge("extract_office_content", "delete_file")
workflow.add_edge("extract_best_audio_from_video", "extract_audio")
workflow.add_edge("extract_audio", "delete_file")
workflow.add_edge("delete_file", END)

# Compile graph
graph = workflow.compile()
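
For reference, this compiled graph was driven through LangGraph's standard async entry point. A minimal sketch; the import path for the compiled graph object and the URL are illustrative, not taken from this diff:

    import asyncio

    # Hypothetical import path; adjust to wherever workflow.compile() actually lived.
    from open_notebook.graphs.content_processing.graph import graph


    async def main():
        # Exactly one of content / file_path / url selects the processing branch.
        result = await graph.ainvoke({"url": "https://example.com/post"})
        print(result.get("title"), (result.get("content") or "")[:200])


    asyncio.run(main())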


@@ -1,114 +0,0 @@
import asyncio
import os
from functools import partial
from math import ceil

from loguru import logger
from pydub import AudioSegment

from open_notebook.domain.models import model_manager
from open_notebook.graphs.content_processing.state import ContentState

# todo: remove reference to model_manager
# future: parallelize the transcription process


async def split_audio(input_file, segment_length_minutes=15, output_prefix=None):
    """
    Split an audio file into segments asynchronously.
    """

    def _split(input_file, segment_length_minutes, output_prefix):
        # Convert input file to absolute path
        input_file_abs = os.path.abspath(input_file)
        output_dir = os.path.dirname(input_file_abs)
        os.makedirs(output_dir, exist_ok=True)

        # Set up output prefix
        if output_prefix is None:
            output_prefix = os.path.splitext(os.path.basename(input_file_abs))[0]

        # Load the audio file
        audio = AudioSegment.from_file(input_file_abs)

        # Calculate segment length in milliseconds
        segment_length_ms = segment_length_minutes * 60 * 1000

        # Calculate number of segments
        total_segments = ceil(len(audio) / segment_length_ms)
        logger.debug(f"Splitting file: {input_file_abs} into {total_segments} segments")

        output_files = []
        # Split the audio into segments
        for i in range(total_segments):
            start_time = i * segment_length_ms
            end_time = min((i + 1) * segment_length_ms, len(audio))

            # Extract segment
            segment = audio[start_time:end_time]

            # Generate output filename
            output_filename = f"{output_prefix}_{str(i+1).zfill(3)}.mp3"
            output_path = os.path.join(output_dir, output_filename)

            # Export segment
            segment.export(output_path, format="mp3")
            output_files.append(output_path)
            logger.debug(f"Exported segment {i+1}/{total_segments}: {output_filename}")

        return output_files

    # Run CPU-bound audio processing in thread pool
    return await asyncio.get_event_loop().run_in_executor(
        None, partial(_split, input_file, segment_length_minutes, output_prefix)
    )


async def transcribe_audio_segment(audio_file, model):
    """Transcribe a single audio segment asynchronously"""

    def _transcribe(audio_file, model):
        return model.transcribe(audio_file)

    return await asyncio.get_event_loop().run_in_executor(
        None, partial(_transcribe, audio_file, model)
    )


async def extract_audio(data: ContentState):
    SPEECH_TO_TEXT_MODEL = model_manager.speech_to_text
    input_audio_path = data.get("file_path")
    audio_files = []
    try:
        # Split audio into segments
        audio_files = await split_audio(input_audio_path)

        # Transcribe all segments concurrently
        transcribe_tasks = [
            transcribe_audio_segment(audio_file, SPEECH_TO_TEXT_MODEL)
            for audio_file in audio_files
        ]
        transcriptions = await asyncio.gather(*transcribe_tasks)

        return {"content": " ".join(transcriptions)}
    except Exception as e:
        logger.error(f"Error transcribing audio: {str(e)}")
        logger.exception(e)
        raise
    finally:
        # Clean up temporary files
        def _cleanup(files):
            for file in files:
                try:
                    os.remove(file)
                except OSError as e:
                    logger.error(f"Error removing temporary file {file}: {str(e)}")

        await asyncio.get_event_loop().run_in_executor(
            None, partial(_cleanup, audio_files)
        )
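
A minimal usage sketch for the splitter; the path and segment length are illustrative:

    import asyncio

    from open_notebook.graphs.content_processing.audio import split_audio


    async def demo():
        # Splits the input into 10-minute MP3 segments next to the source file.
        files = await split_audio("/tmp/interview.mp3", segment_length_minutes=10)
        # e.g. ['/tmp/interview_001.mp3', '/tmp/interview_002.mp3', ...]
        print(files)


    asyncio.run(demo())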


@@ -1,323 +0,0 @@
import asyncio
from functools import partial

from docx import Document
from loguru import logger
from openpyxl import load_workbook
from pptx import Presentation

from open_notebook.graphs.content_processing.state import ContentState

SUPPORTED_OFFICE_TYPES = [
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
]


async def extract_docx_content_detailed(file_path):
    """Extract content from DOCX file"""

    def _extract():
        try:
            doc = Document(file_path)
            content = []
            for paragraph in doc.paragraphs:
                if not paragraph.text.strip():
                    continue
                style = paragraph.style.name if paragraph.style else "Normal"
                text = paragraph.text.strip()

                # Get paragraph formatting
                p_format = paragraph.paragraph_format
                indent = p_format.left_indent or 0

                # Convert indent to spaces (1 level = 4 spaces)
                indent_level = 0
                if hasattr(indent, "pt"):
                    indent_level = int(indent.pt / 72)  # 72 points = 1 inch
                indent_spaces = " " * (indent_level * 4)

                # Handle different types of formatting
                if "Heading" in style:
                    level = style[-1] if style[-1].isdigit() else "1"
                    heading_marks = "#" * int(level)
                    content.append(f"\n{heading_marks} {text}\n")
                # Handle bullet points
                elif (
                    paragraph.style
                    and hasattr(paragraph.style, "name")
                    and paragraph.style.name.startswith("List")
                ):
                    # Numbered list
                    if (
                        hasattr(paragraph._p, "pPr")
                        and paragraph._p.pPr is not None
                        and hasattr(paragraph._p.pPr, "numPr")
                        and paragraph._p.pPr.numPr is not None
                    ):
                        # Try to get the actual number
                        try:
                            if (
                                hasattr(paragraph._p.pPr.numPr, "numId")
                                and paragraph._p.pPr.numPr.numId is not None
                                and hasattr(paragraph._p.pPr.numPr.numId, "val")
                            ):
                                number = paragraph._p.pPr.numPr.numId.val
                                content.append(f"{indent_spaces}{number}. {text}")
                            else:
                                content.append(f"{indent_spaces}1. {text}")
                        except Exception:
                            content.append(f"{indent_spaces}1. {text}")
                    # Bullet list
                    else:
                        content.append(f"{indent_spaces}* {text}")
                else:
                    # Handle text formatting
                    formatted_text = []
                    for run in paragraph.runs:
                        if run.bold:
                            formatted_text.append(f"**{run.text}**")
                        elif run.italic:
                            formatted_text.append(f"*{run.text}*")
                        else:
                            formatted_text.append(run.text)
                    content.append(f"{indent_spaces}{''.join(formatted_text)}")
            return "\n\n".join(content)
        except Exception as e:
            logger.error(f"Failed to extract DOCX content: {e}")
            return None

    return await asyncio.get_event_loop().run_in_executor(None, _extract)


async def get_docx_info(file_path):
    """Get DOCX metadata and content"""

    def _get_info():
        try:
            doc = Document(file_path)
            # Extract core properties if available
            core_props = {
                "author": doc.core_properties.author,
                "created": doc.core_properties.created,
                "modified": doc.core_properties.modified,
                "title": doc.core_properties.title,
                "subject": doc.core_properties.subject,
                "keywords": doc.core_properties.keywords,
                "category": doc.core_properties.category,
                "comments": doc.core_properties.comments,
            }
            # Get document statistics
            stats = {
                "paragraph_count": len(doc.paragraphs),
                "word_count": sum(
                    len(p.text.split()) for p in doc.paragraphs if p.text.strip()
                ),
                "character_count": sum(
                    len(p.text) for p in doc.paragraphs if p.text.strip()
                ),
            }
            return {"metadata": core_props, "statistics": stats}
        except Exception as e:
            logger.error(f"Failed to get DOCX info: {e}")
            return None

    info = await asyncio.get_event_loop().run_in_executor(None, _get_info)
    if info is not None:
        # Get document content (awaited here; calling the async extractor inside
        # the sync helper would only produce an unawaited coroutine)
        info["content"] = await extract_docx_content_detailed(file_path)
    return info


async def extract_pptx_content(file_path):
    """Extract content from PPTX file"""

    def _extract():
        try:
            prs = Presentation(file_path)
            content = []
            for slide_number, slide in enumerate(prs.slides, 1):
                content.append(f"\n# Slide {slide_number}\n")
                # Extract title
                if slide.shapes.title:
                    content.append(f"## {slide.shapes.title.text}\n")
                # Extract text from all shapes
                for shape in slide.shapes:
                    if hasattr(shape, "text") and shape.text.strip():
                        if shape != slide.shapes.title:  # Skip title as it's already added
                            content.append(shape.text.strip())
            return "\n\n".join(content)
        except Exception as e:
            logger.error(f"Failed to extract PPTX content: {e}")
            return None

    return await asyncio.get_event_loop().run_in_executor(None, _extract)


async def extract_xlsx_content(file_path, max_rows=10000, max_cols=100):
    """Extract content from XLSX file"""

    def _extract():
        try:
            wb = load_workbook(file_path, data_only=True)
            content = []
            for sheet in wb.sheetnames:
                ws = wb[sheet]
                content.append(f"\n# Sheet: {sheet}\n")
                # Get the maximum row and column with data
                max_row = min(ws.max_row, max_rows)
                max_col = min(ws.max_column, max_cols)
                # Create markdown table header
                headers = []
                for col in range(1, max_col + 1):
                    cell_value = ws.cell(row=1, column=col).value
                    headers.append(str(cell_value) if cell_value is not None else "")
                content.append("| " + " | ".join(headers) + " |")
                content.append("| " + " | ".join(["---"] * len(headers)) + " |")
                # Add table content
                for row in range(2, max_row + 1):
                    row_data = []
                    for col in range(1, max_col + 1):
                        cell_value = ws.cell(row=row, column=col).value
                        row_data.append(
                            str(cell_value) if cell_value is not None else ""
                        )
                    content.append("| " + " | ".join(row_data) + " |")
            return "\n".join(content)
        except Exception as e:
            logger.error(f"Failed to extract XLSX content: {e}")
            return None

    return await asyncio.get_event_loop().run_in_executor(None, partial(_extract))


async def get_pptx_info(file_path):
    """Get PPTX metadata and content"""

    def _get_info():
        try:
            prs = Presentation(file_path)
            # Extract basic properties
            props = {
                "slide_count": len(prs.slides),
                "title": "",  # PowerPoint doesn't have built-in metadata like Word
            }
            # Get presentation statistics
            stats = {
                "slide_count": len(prs.slides),
                "shape_count": sum(len(slide.shapes) for slide in prs.slides),
                "text_frame_count": sum(
                    sum(1 for shape in slide.shapes if hasattr(shape, "text"))
                    for slide in prs.slides
                ),
            }
            return {"metadata": props, "statistics": stats}
        except Exception as e:
            logger.error(f"Failed to get PPTX info: {e}")
            return None

    info = await asyncio.get_event_loop().run_in_executor(None, _get_info)
    if info is not None:
        # Get presentation content (awaited outside the thread-pool helper)
        info["content"] = await extract_pptx_content(file_path)
    return info


async def get_xlsx_info(file_path):
    """Get XLSX metadata and content"""

    def _get_info():
        try:
            wb = load_workbook(file_path, data_only=True)
            # Extract basic properties
            props = {
                "sheet_count": len(wb.sheetnames),
                "sheets": wb.sheetnames,
                "title": wb.properties.title,
                "creator": wb.properties.creator,
                "created": wb.properties.created,
                "modified": wb.properties.modified,
            }
            # Get workbook statistics
            stats = {
                "sheet_count": len(wb.sheetnames),
                "total_rows": sum(sheet.max_row for sheet in wb.worksheets),
                "total_columns": sum(sheet.max_column for sheet in wb.worksheets),
            }
            return {"metadata": props, "statistics": stats}
        except Exception as e:
            logger.error(f"Failed to get XLSX info: {e}")
            return None

    info = await asyncio.get_event_loop().run_in_executor(None, _get_info)
    if info is not None:
        # Get workbook content (awaited outside the thread-pool helper)
        info["content"] = await extract_xlsx_content(file_path)
    return info


async def extract_office_content(state: ContentState):
    """Universal function to extract content from Office files"""
    assert state.get("file_path"), "No file path provided"
    assert (
        state.get("identified_type") in SUPPORTED_OFFICE_TYPES
    ), "Unsupported File Type"
    file_path = state["file_path"]
    doc_type = state["identified_type"]

    if (
        doc_type
        == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    ):
        logger.debug("Extracting content from DOCX file")
        content = await extract_docx_content_detailed(file_path)
        info = await get_docx_info(file_path)
    elif (
        doc_type
        == "application/vnd.openxmlformats-officedocument.presentationml.presentation"
    ):
        logger.debug("Extracting content from PPTX file")
        content = await extract_pptx_content(file_path)
        info = await get_pptx_info(file_path)
    elif (
        doc_type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    ):
        logger.debug("Extracting content from XLSX file")
        content = await extract_xlsx_content(file_path)
        info = await get_xlsx_info(file_path)
    else:
        raise Exception(f"Unsupported file format: {doc_type}")

    # Content is returned at the top level; drop the duplicate copy from info
    if info is not None:
        info.pop("content", None)
    return {"content": content, "metadata": info}


@@ -1,170 +0,0 @@
import asyncio
import re
import unicodedata

import fitz  # type: ignore
from loguru import logger

from open_notebook.graphs.content_processing.state import ContentState

# todo: find tables - https://pymupdf.readthedocs.io/en/latest/the-basics.html#extracting-tables-from-a-page
# todo: what else can we do to make the text more readable?
# todo: try to fix encoding for some PDF that is still breaking

# def _extract_text_from_pdf(pdf_path):
#     doc = fitz.open(pdf_path)
#     text = ""
#     logger.debug(f"Found {len(doc)} pages in PDF")
#     for page in doc:
#         # Use encode/decode if you need to clean up any encoding issues
#         text += page.get_text().encode('utf-8').decode('utf-8')
#     doc.close()
#     return text

SUPPORTED_FITZ_TYPES = [
    "application/pdf",
    "application/epub+zip",
]


def clean_pdf_text(text):
    """
    Clean text extracted from PDFs with enhanced space handling.
    Preserves special characters like (, ), %, = that are valid in code/math.

    Args:
        text (str): The raw text extracted from a PDF

    Returns:
        str: Cleaned text with minimal necessary spacing
    """
    if not text:
        return text

    # Step 1: Normalize Unicode characters
    text = unicodedata.normalize("NFKC", text)

    # Step 2: Replace common PDF artifacts
    replacements = {
        # Common ligatures
        "\ufb01": "fi",
        "\ufb02": "fl",
        "\ufb00": "ff",
        "\ufb03": "ffi",
        "\ufb04": "ffl",
        # Quotation marks and apostrophes
        "\u2018": "'",
        "\u2019": "'",
        "\u201c": '"',
        "\u201d": '"',
        "\u201a": ",",
        "\u201e": '"',
        # Dashes and hyphens
        "\u2013": "-",
        "\u2014": "-",
        "\u2012": "-",
        "\u2015": "-",
        # Other common replacements
        "\u2026": "...",
        "\u2022": "*",
        "°": " degrees ",
        "¹": "1",
        "²": "2",
        "³": "3",
        "©": "(c)",
        "®": "(R)",
        "\u2122": "(TM)",
    }
    for old, new in replacements.items():
        text = text.replace(old, new)

    # Step 3: Clean control characters while preserving essential whitespace and special chars
    text = "".join(
        char
        for char in text
        if unicodedata.category(char)[0] != "C"
        or char in "\n\t "
        or char in "()%=[]{}#$@!?.,;:+-*/^<>&|~"
    )

    # Step 4: Enhanced space cleaning
    text = re.sub(r"[ \t]+", " ", text)  # Consolidate horizontal whitespace
    text = re.sub(r" +\n", "\n", text)  # Remove spaces before newlines
    text = re.sub(r"\n +", "\n", text)  # Remove spaces after newlines
    text = re.sub(r"\n\t+", "\n", text)  # Remove tabs at start of lines
    text = re.sub(r"\t+\n", "\n", text)  # Remove tabs at end of lines
    text = re.sub(r"\t+", " ", text)  # Replace tabs with single space

    # Step 5: Remove empty lines while preserving paragraph structure
    text = re.sub(r"\n{3,}", "\n\n", text)  # Max two consecutive newlines
    text = re.sub(r"^\s+", "", text)  # Remove leading whitespace
    text = re.sub(r"\s+$", "", text)  # Remove trailing whitespace

    # Step 6: Clean up around punctuation
    text = re.sub(r"\s+([.,;:!?)])", r"\1", text)  # Remove spaces before punctuation
    text = re.sub(r"(\()\s+", r"\1", text)  # Remove spaces after opening parenthesis
    text = re.sub(
        r"\s+([.,])\s+", r"\1 ", text
    )  # Ensure single space after periods and commas

    # Step 7: Remove zero-width and invisible characters
    text = re.sub(r"[\u200b\u200c\u200d\ufeff\u200e\u200f]", "", text)

    # Step 8: Fix hyphenation and line breaks
    text = re.sub(
        r"(?<=\w)-\s*\n\s*(?=\w)", "", text
    )  # Remove hyphenation at line breaks

    return text.strip()


async def _extract_text_from_pdf(pdf_path):
    """Extract text from PDF asynchronously"""

    def _extract():
        doc = fitz.open(pdf_path)
        try:
            text = ""
            logger.debug(f"Found {len(doc)} pages in PDF")
            for page in doc:
                text += page.get_text()
            return clean_pdf_text(text)
        finally:
            doc.close()

    # Run CPU-bound PDF processing in a thread pool
    return await asyncio.get_event_loop().run_in_executor(None, _extract)


async def extract_pdf(state: ContentState):
    """
    Parse the PDF file and extract its content asynchronously.
    """
    return_dict = {}
    assert state.get("file_path"), "No file path provided"
    assert state.get("identified_type") in SUPPORTED_FITZ_TYPES, "Unsupported File Type"
    if (
        state.get("file_path") is not None
        and state.get("identified_type") in SUPPORTED_FITZ_TYPES
    ):
        file_path = state.get("file_path")
        try:
            text = await _extract_text_from_pdf(file_path)
            return_dict["content"] = text
        except FileNotFoundError:
            raise FileNotFoundError(f"File not found at {file_path}")
        except Exception as e:
            raise Exception(f"An error occurred: {e}")
    return return_dict
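
The cleaner's effect is easiest to see on a small sample; this sketch exercises the ligature, spacing, and hyphenation steps:

    from open_notebook.graphs.content_processing.pdf import clean_pdf_text

    # "\ufb01" is the fi ligature; the trailing "-\n" simulates a hyphenated line break.
    raw = "The \ufb01rst   exam-\nple  works."
    print(clean_pdf_text(raw))  # -> The first example works.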


@@ -1,13 +0,0 @@
from typing_extensions import TypedDict


# total=False: sources start with only a subset of these keys and the graph's
# nodes fill in the rest, so every field is optional (TypedDict fields cannot
# carry default values).
class ContentState(TypedDict, total=False):
    content: str
    file_path: str
    url: str
    title: str
    source_type: str
    identified_type: str
    identified_provider: str
    metadata: dict
    delete_source: bool
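
A sketch of a minimal initial state; downstream nodes fill in the remaining keys:

    from open_notebook.graphs.content_processing.state import ContentState

    # A URL source: only the keys relevant to the entry node need to be present.
    initial: ContentState = {"url": "https://example.com/post", "delete_source": False}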


@@ -1,40 +0,0 @@
import asyncio

from loguru import logger

from open_notebook.graphs.content_processing.state import ContentState


async def extract_txt(state: ContentState):
    """
    Parse the text file and extract its content asynchronously.
    """
    return_dict = {}
    if (
        state.get("file_path") is not None
        and state.get("identified_type") == "text/plain"
    ):
        logger.debug(f"Extracting text from {state.get('file_path')}")
        file_path = state.get("file_path")
        if file_path is not None:
            try:

                def _read_file():
                    with open(file_path, "r", encoding="utf-8") as file:
                        return file.read()

                # Run file I/O in thread pool
                content = await asyncio.get_event_loop().run_in_executor(
                    None, _read_file
                )
                logger.debug(f"Extracted: {content[:100]}")
                return_dict["content"] = content
            except FileNotFoundError:
                raise FileNotFoundError(f"File not found at {file_path}")
            except Exception as e:
                raise Exception(f"An error occurred: {e}")
    return return_dict
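
A usage sketch; the filename is illustrative, and for non-text inputs the node returns an empty dict:

    import asyncio

    from open_notebook.graphs.content_processing.text import extract_txt


    async def demo():
        state = {"file_path": "notes.txt", "identified_type": "text/plain"}
        result = await extract_txt(state)  # {"content": "...file contents..."}
        print(result)


    asyncio.run(demo())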


@@ -1,191 +0,0 @@
import re
from urllib.parse import urlparse

import aiohttp
from bs4 import BeautifulSoup, Comment
from loguru import logger

from open_notebook.graphs.content_processing.state import ContentState

# future: better extraction methods
# https://github.com/buriy/python-readability
# also try readability: from readability import Document


def url_provider(state: ContentState):
    """
    Identify the provider
    """
    return_dict = {}
    url = state.get("url")
    if url:
        if "youtube.com" in url or "youtu.be" in url:
            return_dict["identified_type"] = "youtube"  # future: playlists, channels
        else:
            return_dict["identified_type"] = "article"
            # future: more article providers
    return return_dict


async def extract_url_bs4(url: str):
    """
    Get the title and content of a URL using bs4
    """
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }
        # If URL is actually HTML content
        if url.startswith("<!DOCTYPE html>") or url.startswith("<html"):
            html_content = url
        else:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers, timeout=10) as response:
                    response.raise_for_status()
                    html_content = await response.text()

        soup = BeautifulSoup(html_content, "html.parser")

        # Remove unwanted elements
        for element in soup.find_all(
            ["script", "style", "nav", "footer", "iframe", "noscript", "ad"]
        ):
            element.decompose()

        # Remove comments
        for comment in soup.find_all(text=lambda text: isinstance(text, Comment)):
            comment.extract()

        # Get title
        title = None
        title_tags = [
            soup.find("meta", property="og:title"),
            soup.find("meta", property="twitter:title"),
            soup.find("title"),
            soup.find("h1"),
        ]
        for tag in title_tags:
            if tag:
                if tag.string:
                    title = tag.string
                elif tag.get("content"):
                    title = tag.get("content")
                break

        # Clean up title
        if title:
            title = " ".join(title.split())
            title = re.sub(r"\s*\|.*$", "", title)
            title = re.sub(r"\s*-.*$", "", title)

        # Get content
        content = []

        # Look for main article content
        main_content = None
        content_tags = [
            soup.find("article"),
            soup.find("main"),
            soup.find(class_=re.compile(r"article|post|content|entry|document")),
            soup.find(id=re.compile(r"article|post|content|entry|main")),
        ]
        for tag in content_tags:
            if tag:
                main_content = tag
                break
        if not main_content:
            main_content = soup

        # Process content
        for element in main_content.find_all(
            ["p", "h1", "h2", "h3", "h4", "h5", "h6", "pre", "div"]
        ):
            # Handle code blocks
            if element.name == "pre" or "highlight" in element.get("class", []):
                code_text = element.get_text().strip()
                if code_text:
                    content.append("\n```\n" + code_text + "\n```\n")
                continue

            # Handle regular text
            text = element.get_text().strip()
            if text:
                # Skip if text matches common patterns for navigation/footer
                if re.search(
                    r"copyright|all rights reserved|privacy policy|terms of use",
                    text.lower(),
                ):
                    continue
                content.append(text)

        # Join content with proper spacing
        final_content = "\n\n".join(content)

        # Clean up content
        final_content = re.sub(
            r"\n\s*\n\s*\n", "\n\n", final_content
        )  # Remove extra newlines
        final_content = re.sub(r" +", " ", final_content)  # Normalize whitespace
        final_content = final_content.strip()

        return {
            "title": title,
            "content": final_content,
            "domain": urlparse(url).netloc
            if not url.startswith("<!DOCTYPE html>")
            else None,
            "url": url if not url.startswith("<!DOCTYPE html>") else None,
        }
    except aiohttp.ClientError as e:
        logger.error(f"Failed to fetch URL {url}: {e}")
        return None
    except Exception as e:
        logger.error(f"Failed to process content: {e}")
        return None


async def extract_url_jina(url: str):
    """
    Get the content of a URL using Jina
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://r.jina.ai/{url}") as response:
            text = await response.text()
            if text.startswith("Title:") and "\n" in text:
                title_end = text.index("\n")
                title = text[6:title_end].strip()
                content = text[title_end + 1 :].strip()
                logger.debug(
                    f"Processed url: {url}, found title: {title}, content: {content[:100]}..."
                )
                return {"title": title, "content": content}
            else:
                logger.debug(
                    f"Processed url: {url}, does not have Title prefix, returning full content: {text[:100]}..."
                )
                return {"content": text}


async def extract_url(state: ContentState):
    assert state.get("url"), "No URL provided"
    url = state["url"]
    try:
        result = await extract_url_bs4(url)
        if not result or not result.get("content"):
            logger.debug(
                f"BS4 extraction failed for url {url}, falling back to Jina extractor"
            )
            result = await extract_url_jina(url)
        return result
    except Exception as e:
        logger.error(f"URL extraction failed for URL: {url}")
        logger.exception(e)
        return None
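
Provider detection is a plain string check and runs before any network call; a quick sketch with illustrative URLs:

    from open_notebook.graphs.content_processing.url import url_provider

    print(url_provider({"url": "https://youtu.be/dQw4w9WgXcQ"}))  # {'identified_type': 'youtube'}
    print(url_provider({"url": "https://example.com/post"}))      # {'identified_type': 'article'}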


@@ -1,167 +0,0 @@
import asyncio
import json
import os
import subprocess
from functools import partial

from loguru import logger

from open_notebook.graphs.content_processing.state import ContentState


async def extract_audio_from_video(input_file, output_file, stream_index):
    """
    Extract the specified audio stream to MP3 format asynchronously
    """

    def _extract(input_file, output_file, stream_index):
        try:
            cmd = [
                "ffmpeg",
                "-i",
                input_file,
                "-map",
                f"0:a:{stream_index}",  # Select specific audio stream
                "-codec:a",
                "libmp3lame",  # Use MP3 codec
                "-q:a",
                "2",  # High quality setting
                "-y",  # Overwrite output file if exists
                output_file,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                raise Exception(f"FFmpeg failed: {result.stderr}")
            return True
        except Exception as e:
            logger.error(f"Error extracting audio: {str(e)}")
            return False

    return await asyncio.get_event_loop().run_in_executor(
        None, partial(_extract, input_file, output_file, stream_index)
    )


async def get_audio_streams(input_file):
    """
    Analyze video file and return information about all audio streams asynchronously
    """

    def _analyze(input_file):
        logger.debug(f"Analyzing video file {input_file} for audio streams")
        try:
            cmd = [
                "ffprobe",
                "-v",
                "quiet",
                "-print_format",
                "json",
                "-show_streams",
                "-select_streams",
                "a",
                input_file,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                raise Exception(f"FFprobe failed: {result.stderr}")
            data = json.loads(result.stdout)
            return data.get("streams", [])
        except Exception as e:
            logger.error(f"Error analyzing file: {str(e)}")
            return []

    return await asyncio.get_event_loop().run_in_executor(
        None, partial(_analyze, input_file)
    )


async def select_best_audio_stream(streams):
    """
    Select the best audio stream based on various quality metrics
    """

    def _select(streams):
        if not streams:
            logger.debug("No audio streams found")
            return None
        else:
            logger.debug(f"Found {len(streams)} audio streams")

        # Score each stream based on various factors
        scored_streams = []
        for stream in streams:
            score = 0

            # Prefer higher bit rates
            bit_rate = stream.get("bit_rate")
            if bit_rate:
                score += int(int(bit_rate) / 1000000)  # Convert to Mbps and ensure int

            # Prefer more channels (stereo over mono)
            channels = stream.get("channels", 0)
            score += channels * 10

            # Prefer higher sample rates
            sample_rate = stream.get("sample_rate", "0")
            score += int(int(sample_rate) / 48000)

            scored_streams.append((score, stream))

        # Return the stream with highest score
        return max(scored_streams, key=lambda x: x[0])[1]

    return await asyncio.get_event_loop().run_in_executor(
        None, partial(_select, streams)
    )


async def extract_best_audio_from_video(data: ContentState):
    """
    Main function to extract the best audio stream from a video file asynchronously
    """
    input_file = data.get("file_path")
    assert input_file is not None, "Input file path must be provided"

    def _check_file(path):
        return os.path.exists(path)

    file_exists = await asyncio.get_event_loop().run_in_executor(
        None, partial(_check_file, input_file)
    )
    if not file_exists:
        logger.critical(f"Input file not found: {input_file}")
        return False

    base_name = os.path.splitext(input_file)[0]
    output_file = f"{base_name}_audio.mp3"

    # Get all audio streams
    streams = await get_audio_streams(input_file)
    if not streams:
        logger.debug("No audio streams found in the file")
        return False

    # Select best stream
    best_stream = await select_best_audio_stream(streams)
    if not best_stream:
        logger.error("Could not determine best audio stream")
        return False

    # Extract the selected stream
    stream_index = streams.index(best_stream)
    success = await extract_audio_from_video(input_file, output_file, stream_index)
    if success:
        logger.debug(f"Successfully extracted audio to: {output_file}")
        logger.debug(f"- Channels: {best_stream.get('channels', 'unknown')}")
        logger.debug(f"- Sample rate: {best_stream.get('sample_rate', 'unknown')} Hz")
        logger.debug(f"- Bit rate: {best_stream.get('bit_rate', 'unknown')} bits/s")
    return {"file_path": output_file, "identified_type": "audio/mp3"}


@@ -1,159 +0,0 @@
import re
import ssl

import aiohttp
from bs4 import BeautifulSoup
from loguru import logger
from youtube_transcript_api import YouTubeTranscriptApi  # type: ignore
from youtube_transcript_api.formatters import TextFormatter  # type: ignore

from open_notebook.config import CONFIG
from open_notebook.exceptions import NoTranscriptFound
from open_notebook.graphs.content_processing.state import ContentState

ssl._create_default_https_context = ssl._create_unverified_context


async def get_video_title(video_id):
    try:
        url = f"https://www.youtube.com/watch?v={video_id}"
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                html = await response.text()
                # BeautifulSoup doesn't support async operations
                soup = BeautifulSoup(html, "html.parser")
                # YouTube stores title in a meta tag
                title = soup.find("meta", property="og:title")["content"]
                return title
    except Exception as e:
        logger.error(f"Failed to get video title: {e}")
        return None


def _extract_youtube_id(url):
    """
    Extract the YouTube video ID from a given URL using regular expressions.

    Args:
        url (str): The YouTube URL from which to extract the video ID.

    Returns:
        str: The extracted YouTube video ID, or None if no valid ID is found.
    """
    # Define a regular expression pattern to capture the YouTube video ID
    youtube_regex = (
        r"(?:https?://)?"  # Optional scheme
        r"(?:www\.)?"  # Optional www.
        r"(?:"
        r"youtu\.be/"  # Shortened URL
        r"|youtube\.com"  # Main URL
        r"(?:"  # Group start
        r"/embed/"  # Embed URL
        r"|/v/"  # Older video URL
        r"|/watch\?v="  # Standard watch URL
        r"|/watch\?.+&v="  # Other watch URL
        r")"  # Group end
        r")"  # End main group
        r"([\w-]{11})"  # 11 characters (YouTube video ID)
    )
    # Search the URL for the pattern
    match = re.search(youtube_regex, url)
    # Return the video ID if a match is found
    return match.group(1) if match else None


async def get_best_transcript(video_id, preferred_langs=["en", "es", "pt"]):
    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

        # First try: manual transcripts in preferred languages
        manual_transcripts = []
        try:
            for transcript in transcript_list:
                if not transcript.is_generated and not transcript.is_translatable:
                    manual_transcripts.append(transcript)
            if manual_transcripts:
                # Sort based on preferred language order
                for lang in preferred_langs:
                    for transcript in manual_transcripts:
                        if transcript.language_code == lang:
                            return transcript.fetch()
                # If no preferred language found, return first manual transcript
                return manual_transcripts[0].fetch()
        except NoTranscriptFound:
            pass

        # Second try: auto-generated transcripts in preferred languages
        generated_transcripts = []
        try:
            for transcript in transcript_list:
                if transcript.is_generated and not transcript.is_translatable:
                    generated_transcripts.append(transcript)
            if generated_transcripts:
                # Sort based on preferred language order
                for lang in preferred_langs:
                    for transcript in generated_transcripts:
                        if transcript.language_code == lang:
                            return transcript.fetch()
                # If no preferred language found, return first generated transcript
                return generated_transcripts[0].fetch()
        except NoTranscriptFound:
            pass

        # Last try: translated transcripts in preferred languages
        translated_transcripts = []
        try:
            for transcript in transcript_list:
                if transcript.is_translatable:
                    translated_transcripts.append(transcript)
            if translated_transcripts:
                # Sort based on preferred language order
                for lang in preferred_langs:
                    for transcript in translated_transcripts:
                        if transcript.language_code == lang:
                            return transcript.fetch()
                # If no preferred language found, translate into the first preferred language
                translation = translated_transcripts[0].translate(preferred_langs[0])
                return translation.fetch()
        except NoTranscriptFound:
            pass

        raise Exception("No suitable transcript found")
    except Exception as e:
        logger.error(f"Failed to get transcript for video {video_id}: {e}")
        return None


async def extract_youtube_transcript(state: ContentState):
    """
    Extract the transcript of a YouTube video and format it as text.
    """
    languages = CONFIG.get("youtube_transcripts", {}).get(
        "preferred_languages", ["en", "es", "pt"]
    )
    video_id = _extract_youtube_id(state.get("url"))
    transcript = await get_best_transcript(video_id, languages)
    logger.debug(f"Found transcript: {transcript}")
    formatter = TextFormatter()
    try:
        title = await get_video_title(video_id)
    except Exception as e:
        logger.critical(f"Failed to get video title for video_id: {video_id}")
        logger.exception(e)
        title = None
    return {
        "content": formatter.format_transcript(transcript),
        "title": title,
    }
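
A sketch of the ID extraction on the two common URL shapes; the video ID is illustrative, and note the helper is module-private:

    from open_notebook.graphs.content_processing.youtube import _extract_youtube_id

    print(_extract_youtube_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ"))  # dQw4w9WgXcQ
    print(_extract_youtube_id("https://youtu.be/dQw4w9WgXcQ"))                 # dQw4w9WgXcQ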