add async content processing

LUIS NOVO 2024-11-11 17:32:35 -03:00
parent ac2ea9e554
commit 00f070a644
10 changed files with 541 additions and 395 deletions


@@ -1,4 +1,5 @@
import os
from typing import Any, Dict
import magic
from langgraph.graph import END, START, StateGraph
@@ -21,7 +22,7 @@ from open_notebook.graphs.content_processing.video import extract_best_audio_fro
from open_notebook.graphs.content_processing.youtube import extract_youtube_transcript
def source_identification(state: ContentState):
async def source_identification(state: ContentState) -> Dict[str, str]:
"""
Identify the content source based on parameters
"""
@@ -37,7 +38,7 @@ def source_identification(state: ContentState):
return {"source_type": doc_type}
def file_type(state: ContentState):
async def file_type(state: ContentState) -> Dict[str, Any]:
"""
Identify the file using python-magic
"""
@@ -49,7 +50,7 @@ def file_type(state: ContentState):
return return_dict
def file_type_edge(data: ContentState):
async def file_type_edge(data: ContentState) -> str:
assert data.get("identified_type"), "Type not identified"
identified_type = data["identified_type"]
@@ -69,7 +70,7 @@ def file_type_edge(data: ContentState):
)
def delete_file(data: ContentState):
async def delete_file(data: ContentState) -> Dict[str, Any]:
if data.get("delete_source"):
logger.debug(f"Deleting file: {data.get('file_path')}")
file_path = data.get("file_path")
@@ -81,9 +82,21 @@ def delete_file(data: ContentState):
logger.warning(f"File not found while trying to delete: {file_path}")
else:
logger.debug("Not deleting file")
return {}
async def url_type_router(x: ContentState) -> str:
return x.get("identified_type", "")
async def source_type_router(x: ContentState) -> str:
return x.get("source_type", "")
# Create workflow
workflow = StateGraph(ContentState)
# Add nodes
workflow.add_node("source", source_identification)
workflow.add_node("url_provider", url_provider)
workflow.add_node("file_type", file_type)
@@ -95,10 +108,12 @@ workflow.add_node("extract_best_audio_from_video", extract_best_audio_from_video
workflow.add_node("extract_audio", extract_audio)
workflow.add_node("extract_youtube_transcript", extract_youtube_transcript)
workflow.add_node("delete_file", delete_file)
# Add edges
workflow.add_edge(START, "source")
workflow.add_conditional_edges(
"source",
lambda x: x.get("source_type"),
source_type_router,
{
"url": "url_provider",
"file": "file_type",
@@ -111,7 +126,7 @@ workflow.add_conditional_edges(
)
workflow.add_conditional_edges(
"url_provider",
lambda x: x.get("identified_type"),
url_type_router,
{"article": "extract_url", "youtube": "extract_youtube_transcript"},
)
workflow.add_edge("url_provider", END)
@@ -125,4 +140,6 @@ workflow.add_edge("extract_office_content", "delete_file")
workflow.add_edge("extract_best_audio_from_video", "extract_audio")
workflow.add_edge("extract_audio", "delete_file")
workflow.add_edge("delete_file", END)
# Compile graph
graph = workflow.compile()
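With every node now defined as a coroutine, the compiled graph is driven through LangGraph's async entry points. A minimal usage sketch (the input keys mirror the ContentState fields used above; the file path is illustrative, not part of this commit):
import asyncio
async def process_example():
    # Hypothetical input; ContentState keys taken from the nodes above.
    state = {"file_path": "/tmp/example.pdf", "delete_source": False}
    result = await graph.ainvoke(state)
    print(result.get("content", "")[:200])
asyncio.run(process_example())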


@@ -1,4 +1,6 @@
import asyncio
import os
from functools import partial
from math import ceil
from loguru import logger
@@ -11,90 +13,102 @@ from open_notebook.graphs.content_processing.state import ContentState
# future: parallelize the transcription process
def split_audio(input_file, segment_length_minutes=15, output_prefix=None):
async def split_audio(input_file, segment_length_minutes=15, output_prefix=None):
"""
Split an audio file into segments of specified length.
Args:
input_file (str): Path to the input audio file
segment_length_minutes (int): Length of each segment in minutes
output_dir (str): Directory to save the segments (defaults to input file's directory)
output_prefix (str): Prefix for output files (defaults to input filename)
Returns:
list: List of paths to the created segment files
Split an audio file into segments asynchronously.
"""
# Convert input file to absolute path
input_file = os.path.abspath(input_file)
output_dir = os.path.dirname(input_file)
os.makedirs(output_dir, exist_ok=True)
def _split(input_file, segment_length_minutes, output_prefix):
# Convert input file to absolute path
input_file_abs = os.path.abspath(input_file)
output_dir = os.path.dirname(input_file_abs)
os.makedirs(output_dir, exist_ok=True)
# Set up output prefix
if output_prefix is None:
output_prefix = os.path.splitext(os.path.basename(input_file))[0]
# Set up output prefix
if output_prefix is None:
output_prefix = os.path.splitext(os.path.basename(input_file_abs))[0]
# Load the audio file
audio = AudioSegment.from_file(input_file)
# Load the audio file
audio = AudioSegment.from_file(input_file_abs)
# Calculate segment length in milliseconds
segment_length_ms = segment_length_minutes * 60 * 1000
# Calculate segment length in milliseconds
segment_length_ms = segment_length_minutes * 60 * 1000
# Calculate number of segments
total_segments = ceil(len(audio) / segment_length_ms)
logger.debug(f"Splitting file: {input_file} into {total_segments} segments")
# Calculate number of segments
total_segments = ceil(len(audio) / segment_length_ms)
logger.debug(f"Splitting file: {input_file_abs} into {total_segments} segments")
# List to store output file paths
output_files = []
output_files = []
# Split the audio into segments
for i in range(total_segments):
# Calculate start and end times for this segment
start_time = i * segment_length_ms
end_time = min((i + 1) * segment_length_ms, len(audio))
# Split the audio into segments
for i in range(total_segments):
start_time = i * segment_length_ms
end_time = min((i + 1) * segment_length_ms, len(audio))
# Extract segment
segment = audio[start_time:end_time]
# Extract segment
segment = audio[start_time:end_time]
# Generate output filename
# Format: prefix_001.mp3 (padding with zeros ensures correct ordering)
output_filename = f"{output_prefix}_{str(i+1).zfill(3)}.mp3"
output_path = os.path.join(output_dir, output_filename)
# Generate output filename
output_filename = f"{output_prefix}_{str(i+1).zfill(3)}.mp3"
output_path = os.path.join(output_dir, output_filename)
# Export segment
segment.export(output_path, format="mp3")
# Export segment
segment.export(output_path, format="mp3")
output_files.append(output_path)
output_files.append(output_path)
logger.debug(f"Exported segment {i+1}/{total_segments}: {output_filename}")
# Optional progress indication
logger.debug(f"Exported segment {i+1}/{total_segments}: {output_filename}")
return output_files
return output_files
# Run CPU-bound audio processing in thread pool
return await asyncio.get_event_loop().run_in_executor(
None, partial(_split, input_file, segment_length_minutes, output_prefix)
)
def extract_audio(data: ContentState):
async def transcribe_audio_segment(audio_file, model):
"""Transcribe a single audio segment asynchronously"""
def _transcribe(audio_file, model):
return model.transcribe(audio_file)
return await asyncio.get_event_loop().run_in_executor(
None, partial(_transcribe, audio_file, model)
)
async def extract_audio(data: ContentState):
SPEECH_TO_TEXT_MODEL = model_manager.speech_to_text
input_audio_path = data.get("file_path")
audio_files = []
try:
audio_files = split_audio(input_audio_path)
transcriptions = []
# Split audio into segments
audio_files = await split_audio(input_audio_path)
for audio_file in audio_files:
transcriptions.append(SPEECH_TO_TEXT_MODEL.transcribe(audio_file))
# Transcribe all segments concurrently
transcribe_tasks = [
transcribe_audio_segment(audio_file, SPEECH_TO_TEXT_MODEL)
for audio_file in audio_files
]
transcriptions = await asyncio.gather(*transcribe_tasks)
return {"content": " ".join(transcriptions)}
except Exception as e:
logger.error(f"Error transcribing audio: {str(e)}")
logger.exception(e)
raise # Re-raise the exception after logging
raise
finally:
for file in audio_files:
try:
os.remove(file)
except OSError as e:
logger.error(f"Error removing temporary file {file}: {str(e)}")
# Clean up temporary files
def _cleanup(files):
for file in files:
try:
os.remove(file)
except OSError as e:
logger.error(f"Error removing temporary file {file}: {str(e)}")
await asyncio.get_event_loop().run_in_executor(
None, partial(_cleanup, audio_files)
)
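The asyncio.gather call above starts every segment transcription at once; if the speech-to-text backend rate-limits, one bounded variant is possible. A sketch reusing transcribe_audio_segment, with an arbitrary cap of 4 that is not part of this commit:
semaphore = asyncio.Semaphore(4)  # arbitrary concurrency cap
async def transcribe_bounded(audio_file, model):
    # Limit how many segments are transcribed at the same time.
    async with semaphore:
        return await transcribe_audio_segment(audio_file, model)
# transcriptions = await asyncio.gather(
#     *(transcribe_bounded(f, SPEECH_TO_TEXT_MODEL) for f in audio_files)
# )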


@ -1,3 +1,6 @@
import asyncio
from functools import partial
from docx import Document
from loguru import logger
from openpyxl import load_workbook
@@ -12,252 +15,284 @@ SUPPORTED_OFFICE_TYPES = [
]
def extract_docx_content_detailed(file_path):
try:
doc = Document(file_path)
content = []
async def extract_docx_content_detailed(file_path):
"""Extract content from DOCX file"""
for paragraph in doc.paragraphs:
if not paragraph.text.strip():
continue
def _extract():
try:
doc = Document(file_path)
content = []
style = paragraph.style.name if paragraph.style else "Normal"
text = paragraph.text.strip()
for paragraph in doc.paragraphs:
if not paragraph.text.strip():
continue
# Get paragraph formatting
p_format = paragraph.paragraph_format
indent = p_format.left_indent or 0
style = paragraph.style.name if paragraph.style else "Normal"
text = paragraph.text.strip()
# Convert indent to spaces (1 level = 4 spaces)
indent_level = 0
if hasattr(indent, "pt"):
indent_level = int(indent.pt / 72) # 72 points = 1 inch
indent_spaces = " " * (indent_level * 4)
# Get paragraph formatting
p_format = paragraph.paragraph_format
indent = p_format.left_indent or 0
# Handle different types of formatting
if "Heading" in style:
level = style[-1] if style[-1].isdigit() else "1"
heading_marks = "#" * int(level)
content.append(f"\n{heading_marks} {text}\n")
# Convert indent to spaces (1 level = 4 spaces)
indent_level = 0
if hasattr(indent, "pt"):
indent_level = int(indent.pt / 72) # 72 points = 1 inch
indent_spaces = " " * (indent_level * 4)
# Handle bullet points
elif (
paragraph.style
and hasattr(paragraph.style, "name")
and paragraph.style.name.startswith("List")
):
# Numbered list
if (
hasattr(paragraph._p, "pPr")
and paragraph._p.pPr is not None
and hasattr(paragraph._p.pPr, "numPr")
and paragraph._p.pPr.numPr is not None
# Handle different types of formatting
if "Heading" in style:
level = style[-1] if style[-1].isdigit() else "1"
heading_marks = "#" * int(level)
content.append(f"\n{heading_marks} {text}\n")
# Handle bullet points
elif (
paragraph.style
and hasattr(paragraph.style, "name")
and paragraph.style.name.startswith("List")
):
# Try to get the actual number
try:
if (
hasattr(paragraph._p.pPr.numPr, "numId")
and paragraph._p.pPr.numPr.numId is not None
and hasattr(paragraph._p.pPr.numPr.numId, "val")
):
number = paragraph._p.pPr.numPr.numId.val
content.append(f"{indent_spaces}{number}. {text}")
else:
# Numbered list
if (
hasattr(paragraph._p, "pPr")
and paragraph._p.pPr is not None
and hasattr(paragraph._p.pPr, "numPr")
and paragraph._p.pPr.numPr is not None
):
# Try to get the actual number
try:
if (
hasattr(paragraph._p.pPr.numPr, "numId")
and paragraph._p.pPr.numPr.numId is not None
and hasattr(paragraph._p.pPr.numPr.numId, "val")
):
number = paragraph._p.pPr.numPr.numId.val
content.append(f"{indent_spaces}{number}. {text}")
else:
content.append(f"{indent_spaces}1. {text}")
except Exception:
content.append(f"{indent_spaces}1. {text}")
except Exception:
content.append(f"{indent_spaces}1. {text}")
# Bullet list
else:
content.append(f"{indent_spaces}* {text}")
else:
# Handle text formatting
formatted_text = []
for run in paragraph.runs:
if run.bold:
formatted_text.append(f"**{run.text}**")
elif run.italic:
formatted_text.append(f"*{run.text}*")
# Bullet list
else:
formatted_text.append(run.text)
content.append(f"{indent_spaces}* {text}")
content.append(f"{indent_spaces}{''.join(formatted_text)}")
else:
# Handle text formatting
formatted_text = []
for run in paragraph.runs:
if run.bold:
formatted_text.append(f"**{run.text}**")
elif run.italic:
formatted_text.append(f"*{run.text}*")
else:
formatted_text.append(run.text)
return "\n\n".join(content)
content.append(f"{indent_spaces}{''.join(formatted_text)}")
except Exception as e:
logger.error(f"Failed to extract DOCX content: {e}")
return None
return "\n\n".join(content)
except Exception as e:
logger.error(f"Failed to extract DOCX content: {e}")
return None
return await asyncio.get_event_loop().run_in_executor(None, _extract)
# Example of usage with metadata
def get_docx_info(file_path):
try:
doc = Document(file_path)
async def get_docx_info(file_path):
"""Get DOCX metadata and content"""
# Extract core properties if available
core_props = {
"author": doc.core_properties.author,
"created": doc.core_properties.created,
"modified": doc.core_properties.modified,
"title": doc.core_properties.title,
"subject": doc.core_properties.subject,
"keywords": doc.core_properties.keywords,
"category": doc.core_properties.category,
"comments": doc.core_properties.comments,
}
def _get_info():
try:
doc = Document(file_path)
# Get document content
content = extract_docx_content_detailed(file_path)
# Extract core properties if available
core_props = {
"author": doc.core_properties.author,
"created": doc.core_properties.created,
"modified": doc.core_properties.modified,
"title": doc.core_properties.title,
"subject": doc.core_properties.subject,
"keywords": doc.core_properties.keywords,
"category": doc.core_properties.category,
"comments": doc.core_properties.comments,
}
# Get document statistics
stats = {
"paragraph_count": len(doc.paragraphs),
"word_count": sum(
len(p.text.split()) for p in doc.paragraphs if p.text.strip()
),
"character_count": sum(
len(p.text) for p in doc.paragraphs if p.text.strip()
),
}
# Get document content
content = extract_docx_content_detailed(file_path)
return {"metadata": core_props, "content": content, "statistics": stats}
# Get document statistics
stats = {
"paragraph_count": len(doc.paragraphs),
"word_count": sum(
len(p.text.split()) for p in doc.paragraphs if p.text.strip()
),
"character_count": sum(
len(p.text) for p in doc.paragraphs if p.text.strip()
),
}
except Exception as e:
logger.error(f"Failed to get DOCX info: {e}")
return None
return {"metadata": core_props, "content": content, "statistics": stats}
except Exception as e:
logger.error(f"Failed to get DOCX info: {e}")
return None
return await asyncio.get_event_loop().run_in_executor(None, _get_info)
def extract_pptx_content(file_path):
try:
prs = Presentation(file_path)
content = []
async def extract_pptx_content(file_path):
"""Extract content from PPTX file"""
for slide_number, slide in enumerate(prs.slides, 1):
content.append(f"\n# Slide {slide_number}\n")
def _extract():
try:
prs = Presentation(file_path)
content = []
# Extract title
if slide.shapes.title:
content.append(f"## {slide.shapes.title.text}\n")
for slide_number, slide in enumerate(prs.slides, 1):
content.append(f"\n# Slide {slide_number}\n")
# Extract text from all shapes
for shape in slide.shapes:
if hasattr(shape, "text") and shape.text.strip():
if shape != slide.shapes.title: # Skip title as it's already added
content.append(shape.text.strip())
# Extract title
if slide.shapes.title:
content.append(f"## {slide.shapes.title.text}\n")
return "\n\n".join(content)
# Extract text from all shapes
for shape in slide.shapes:
if hasattr(shape, "text") and shape.text.strip():
if (
shape != slide.shapes.title
): # Skip title as it's already added
content.append(shape.text.strip())
except Exception as e:
logger.error(f"Failed to extract PPTX content: {e}")
return None
return "\n\n".join(content)
except Exception as e:
logger.error(f"Failed to extract PPTX content: {e}")
return None
return await asyncio.get_event_loop().run_in_executor(None, _extract)
def extract_xlsx_content(file_path, max_rows=1000, max_cols=100):
try:
wb = load_workbook(file_path, data_only=True)
content = []
async def extract_xlsx_content(file_path, max_rows=10000, max_cols=100):
"""Extract content from XLSX file"""
for sheet in wb.sheetnames:
ws = wb[sheet]
content.append(f"\n# Sheet: {sheet}\n")
def _extract():
try:
wb = load_workbook(file_path, data_only=True)
content = []
# Get the maximum row and column with data
max_row = min(ws.max_row, max_rows)
max_col = min(ws.max_column, max_cols)
for sheet in wb.sheetnames:
ws = wb[sheet]
content.append(f"\n# Sheet: {sheet}\n")
# Create markdown table header
headers = []
for col in range(1, max_col + 1):
cell_value = ws.cell(row=1, column=col).value
headers.append(str(cell_value) if cell_value is not None else "")
# Get the maximum row and column with data
max_row = min(ws.max_row, max_rows)
max_col = min(ws.max_column, max_cols)
content.append("| " + " | ".join(headers) + " |")
content.append("| " + " | ".join(["---"] * len(headers)) + " |")
# Add table content
for row in range(2, max_row + 1):
row_data = []
# Create markdown table header
headers = []
for col in range(1, max_col + 1):
cell_value = ws.cell(row=row, column=col).value
row_data.append(str(cell_value) if cell_value is not None else "")
content.append("| " + " | ".join(row_data) + " |")
cell_value = ws.cell(row=1, column=col).value
headers.append(str(cell_value) if cell_value is not None else "")
return "\n".join(content)
content.append("| " + " | ".join(headers) + " |")
content.append("| " + " | ".join(["---"] * len(headers)) + " |")
except Exception as e:
logger.error(f"Failed to extract XLSX content: {e}")
return None
# Add table content
for row in range(2, max_row + 1):
row_data = []
for col in range(1, max_col + 1):
cell_value = ws.cell(row=row, column=col).value
row_data.append(
str(cell_value) if cell_value is not None else ""
)
content.append("| " + " | ".join(row_data) + " |")
return "\n".join(content)
except Exception as e:
logger.error(f"Failed to extract XLSX content: {e}")
return None
return await asyncio.get_event_loop().run_in_executor(None, partial(_extract))
def get_pptx_info(file_path):
try:
prs = Presentation(file_path)
async def get_pptx_info(file_path):
"""Get PPTX metadata and content"""
# Extract basic properties
props = {
"slide_count": len(prs.slides),
"title": "", # PowerPoint doesn't have built-in metadata like Word
}
def _get_info():
try:
prs = Presentation(file_path)
# Get document content
content = extract_pptx_content(file_path)
# Extract basic properties
props = {
"slide_count": len(prs.slides),
"title": "", # PowerPoint doesn't have built-in metadata like Word
}
# Get presentation statistics
stats = {
"slide_count": len(prs.slides),
"shape_count": sum(len(slide.shapes) for slide in prs.slides),
"text_frame_count": sum(
sum(1 for shape in slide.shapes if hasattr(shape, "text"))
for slide in prs.slides
),
}
# Get document content
content = extract_pptx_content(file_path)
return {"metadata": props, "content": content, "statistics": stats}
# Get presentation statistics
stats = {
"slide_count": len(prs.slides),
"shape_count": sum(len(slide.shapes) for slide in prs.slides),
"text_frame_count": sum(
sum(1 for shape in slide.shapes if hasattr(shape, "text"))
for slide in prs.slides
),
}
except Exception as e:
logger.error(f"Failed to get PPTX info: {e}")
return None
return {"metadata": props, "content": content, "statistics": stats}
except Exception as e:
logger.error(f"Failed to get PPTX info: {e}")
return None
return await asyncio.get_event_loop().run_in_executor(None, _get_info)
def get_xlsx_info(file_path):
try:
wb = load_workbook(file_path, data_only=True)
async def get_xlsx_info(file_path):
"""Get XLSX metadata and content"""
# Extract basic properties
props = {
"sheet_count": len(wb.sheetnames),
"sheets": wb.sheetnames,
"title": wb.properties.title,
"creator": wb.properties.creator,
"created": wb.properties.created,
"modified": wb.properties.modified,
}
def _get_info():
try:
wb = load_workbook(file_path, data_only=True)
# Get document content
content = extract_xlsx_content(file_path)
# Extract basic properties
props = {
"sheet_count": len(wb.sheetnames),
"sheets": wb.sheetnames,
"title": wb.properties.title,
"creator": wb.properties.creator,
"created": wb.properties.created,
"modified": wb.properties.modified,
}
# Get workbook statistics
stats = {
"sheet_count": len(wb.sheetnames),
"total_rows": sum(sheet.max_row for sheet in wb.worksheets),
"total_columns": sum(sheet.max_column for sheet in wb.worksheets),
}
# Get document content
content = extract_xlsx_content(file_path)
return {"metadata": props, "content": content, "statistics": stats}
# Get workbook statistics
stats = {
"sheet_count": len(wb.sheetnames),
"total_rows": sum(sheet.max_row for sheet in wb.worksheets),
"total_columns": sum(sheet.max_column for sheet in wb.worksheets),
}
except Exception as e:
logger.error(f"Failed to get XLSX info: {e}")
return None
return {"metadata": props, "content": content, "statistics": stats}
except Exception as e:
logger.error(f"Failed to get XLSX info: {e}")
return None
return await asyncio.get_event_loop().run_in_executor(None, _get_info)
def extract_office_content(state: ContentState):
async def extract_office_content(state: ContentState):
"""Universal function to extract content from Office files"""
assert state.get("file_path"), "No file path provided"
assert (
state.get("identified_type") in SUPPORTED_OFFICE_TYPES
), "Unsupported File Type"
file_path = state["file_path"]
doc_type = state["identified_type"]
@@ -266,24 +301,23 @@ def extract_office_content(state: ContentState):
== "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
):
logger.debug("Extracting content from DOCX file")
content = extract_docx_content_detailed(file_path)
info = get_docx_info(file_path)
content = await extract_docx_content_detailed(file_path)
info = await get_docx_info(file_path)
elif (
doc_type
== "application/vnd.openxmlformats-officedocument.presentationml.presentation"
):
logger.debug("Extracting content from PPTX file")
content = extract_pptx_content(file_path)
info = get_pptx_info(file_path)
content = await extract_pptx_content(file_path)
info = await get_pptx_info(file_path)
elif (
doc_type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
):
logger.debug("Extracting content from XLSX file")
content = extract_xlsx_content(file_path)
info = get_xlsx_info(file_path)
content = await extract_xlsx_content(file_path)
info = await get_xlsx_info(file_path)
else:
raise Exception(f"Unsupported file format: {doc_type}")
del info["content"]
return {"content": content, "metadata": info}


@@ -1,3 +1,4 @@
import asyncio
import re
import unicodedata
@@ -114,7 +115,7 @@ def clean_pdf_text(text):
return text.strip()
def _extract_text_from_pdf(pdf_path):
async def _extract_text_from_pdf(pdf_path):
doc = fitz.open(pdf_path)
try:
text = ""
@@ -127,20 +128,39 @@ def _extract_text_from_pdf(pdf_path):
doc.close()
def extract_pdf(state: ContentState):
async def _extract_text_from_pdf(pdf_path):
"""Extract text from PDF asynchronously"""
def _extract():
doc = fitz.open(pdf_path)
try:
text = ""
logger.debug(f"Found {len(doc)} pages in PDF")
for page in doc:
text += page.get_text()
return clean_pdf_text(text)
finally:
doc.close()
# Run CPU-bound PDF processing in a thread pool
return await asyncio.get_event_loop().run_in_executor(None, _extract)
async def extract_pdf(state: ContentState):
"""
Parse the text file and print its content.
Parse the PDF file and extract its content asynchronously.
"""
return_dict = {}
assert state.get("file_path"), "No file path provided"
assert state.get("identified_type") in SUPPORTED_FITZ_TYPES, "Unsupported File Type"
if (
state.get("file_path") is not None
and state.get("identified_type") in SUPPORTED_FITZ_TYPES
):
file_path = state.get("file_path")
try:
text = _extract_text_from_pdf(file_path)
text = await _extract_text_from_pdf(file_path)
return_dict["content"] = text
except FileNotFoundError:
raise FileNotFoundError(f"File not found at {file_path}")


@@ -1,11 +1,13 @@
import asyncio
from loguru import logger
from open_notebook.graphs.content_processing.state import ContentState
def extract_txt(state: ContentState):
async def extract_txt(state: ContentState):
"""
Parse the text file and print its content.
Parse the text file and extract its content asynchronously.
"""
return_dict = {}
if (
@@ -14,12 +16,22 @@ def extract_txt(state: ContentState):
):
logger.debug(f"Extracting text from {state.get('file_path')}")
file_path = state.get("file_path")
if file_path is not None:
try:
with open(file_path, "r", encoding="utf-8") as file:
content = file.read()
logger.debug(f"Extracted: {content[:100]}")
return_dict["content"] = content
def _read_file():
with open(file_path, "r", encoding="utf-8") as file:
return file.read()
# Run file I/O in thread pool
content = await asyncio.get_event_loop().run_in_executor(
None, _read_file
)
logger.debug(f"Extracted: {content[:100]}")
return_dict["content"] = content
except FileNotFoundError:
raise FileNotFoundError(f"File not found at {file_path}")
except Exception as e:
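The thread-pool read keeps the dependency footprint unchanged; an alternative sketch, assuming the optional aiofiles package (not used by this commit), reads the file natively async:
import aiofiles  # hypothetical optional dependency
async def read_text(path: str) -> str:
    # Equivalent to the run_in_executor read above, without a worker thread.
    async with aiofiles.open(path, "r", encoding="utf-8") as f:
        return await f.read()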


@@ -1,7 +1,7 @@
import re
from urllib.parse import urlparse
import requests # type: ignore
import aiohttp
from bs4 import BeautifulSoup, Comment
from loguru import logger
@@ -29,7 +29,7 @@ def url_provider(state: ContentState):
return return_dict
def extract_url_bs4(url: str):
async def extract_url_bs4(url: str):
"""
Get the title and content of a URL using bs4
"""
@@ -42,9 +42,10 @@ def extract_url_bs4(url: str):
if url.startswith("<!DOCTYPE html>") or url.startswith("<html"):
html_content = url
else:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
html_content = response.text
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, timeout=10) as response:
response.raise_for_status()
html_content = await response.text()
soup = BeautifulSoup(html_content, "html.parser")
@@ -143,7 +144,7 @@ def extract_url_bs4(url: str):
"url": url if not url.startswith("<!DOCTYPE html>") else None,
}
except requests.exceptions.RequestException as e:
except aiohttp.ClientError as e:
logger.error(f"Failed to fetch URL {url}: {e}")
return None
except Exception as e:
@@ -151,38 +152,38 @@ def extract_url_bs4(url: str):
return None
def extract_url_jina(url: str):
async def extract_url_jina(url: str):
"""
Get the content of a URL using Jina
"""
response = requests.get(f"https://r.jina.ai/{url}")
text = response.text
if text.startswith("Title:") and "\n" in text:
title_end = text.index("\n")
title = text[6:title_end].strip()
content = text[title_end + 1 :].strip()
logger.debug(
f"Processed url: {url}, found title: {title}, content: {content[:100]}..."
)
return {"title": title, "content": content}
else:
content = text
logger.debug(
f"Processed url: {url}, does not have Title prefix, returning full content: {content[:100]}..."
)
return {"content": text}
async with aiohttp.ClientSession() as session:
async with session.get(f"https://r.jina.ai/{url}") as response:
text = await response.text()
if text.startswith("Title:") and "\n" in text:
title_end = text.index("\n")
title = text[6:title_end].strip()
content = text[title_end + 1 :].strip()
logger.debug(
f"Processed url: {url}, found title: {title}, content: {content[:100]}..."
)
return {"title": title, "content": content}
else:
logger.debug(
f"Processed url: {url}, does not have Title prefix, returning full content: {text[:100]}..."
)
return {"content": text}
def extract_url(state: ContentState):
async def extract_url(state: ContentState):
assert state.get("url"), "No URL provided"
url = state["url"]
try:
result = extract_url_bs4(url)
result = await extract_url_bs4(url)
if not result or not result.get("content"):
logger.debug(
f"BS4 extraction failed for url {url}, falling back to Jina extractor"
)
result = extract_url_jina(url)
result = await extract_url_jina(url)
return result
except Exception as e:
logger.error(f"URL extraction failed for URL: {url}")


@@ -1,114 +1,141 @@
import asyncio
import json
import os
import subprocess
from functools import partial
from loguru import logger
from open_notebook.graphs.content_processing.state import ContentState
def extract_audio_from_video(input_file, output_file, stream_index):
async def extract_audio_from_video(input_file, output_file, stream_index):
"""
Extract the specified audio stream to MP3 format
Extract the specified audio stream to MP3 format asynchronously
"""
try:
cmd = [
"ffmpeg",
"-i",
input_file,
"-map",
f"0:a:{stream_index}", # Select specific audio stream
"-codec:a",
"libmp3lame", # Use MP3 codec
"-q:a",
"2", # High quality setting
"-y", # Overwrite output file if exists
output_file,
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"FFmpeg failed: {result.stderr}")
def _extract(input_file, output_file, stream_index):
try:
cmd = [
"ffmpeg",
"-i",
input_file,
"-map",
f"0:a:{stream_index}", # Select specific audio stream
"-codec:a",
"libmp3lame", # Use MP3 codec
"-q:a",
"2", # High quality setting
"-y", # Overwrite output file if exists
output_file,
]
return True
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"FFmpeg failed: {result.stderr}")
except Exception as e:
print(f"Error extracting audio: {str(e)}")
return False
return True
except Exception as e:
logger.error(f"Error extracting audio: {str(e)}")
return False
return await asyncio.get_event_loop().run_in_executor(
None, partial(_extract, input_file, output_file, stream_index)
)
def get_audio_streams(input_file):
async def get_audio_streams(input_file):
"""
Analyze video file and return information about all audio streams
Analyze video file and return information about all audio streams asynchronously
"""
logger.debug(f"Analyzing video file {input_file} for audio streams")
try:
# Get stream information in JSON format
cmd = [
"ffprobe",
"-v",
"quiet",
"-print_format",
"json",
"-show_streams",
"-select_streams",
"a",
input_file,
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"FFprobe failed: {result.stderr}")
def _analyze(input_file):
logger.debug(f"Analyzing video file {input_file} for audio streams")
try:
cmd = [
"ffprobe",
"-v",
"quiet",
"-print_format",
"json",
"-show_streams",
"-select_streams",
"a",
input_file,
]
data = json.loads(result.stdout)
return data.get("streams", [])
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"FFprobe failed: {result.stderr}")
except Exception as e:
print(f"Error analyzing file: {str(e)}")
return []
data = json.loads(result.stdout)
return data.get("streams", [])
except Exception as e:
logger.error(f"Error analyzing file: {str(e)}")
return []
return await asyncio.get_event_loop().run_in_executor(
None, partial(_analyze, input_file)
)
def select_best_audio_stream(streams):
async def select_best_audio_stream(streams):
"""
Select the best audio stream based on various quality metrics
"""
if not streams:
logger.debug("No audio streams found")
return None
else:
logger.debug(f"Found {len(streams)} audio streams")
# Score each stream based on various factors
scored_streams = []
for stream in streams:
score = 0
def _select(streams):
if not streams:
logger.debug("No audio streams found")
return None
else:
logger.debug(f"Found {len(streams)} audio streams")
# Prefer higher bit rates
bit_rate = stream.get("bit_rate")
if bit_rate:
score += int(int(bit_rate) / 1000000) # Convert to Mbps and ensure int
# Score each stream based on various factors
scored_streams = []
for stream in streams:
score = 0
# Prefer more channels (stereo over mono)
channels = stream.get("channels", 0)
score += channels * 10
# Prefer higher bit rates
bit_rate = stream.get("bit_rate")
if bit_rate:
score += int(int(bit_rate) / 1000000) # Convert to Mbps and ensure int
# Prefer higher sample rates
sample_rate = stream.get("sample_rate", "0")
score += int(int(sample_rate) / 48000)
# Prefer more channels (stereo over mono)
channels = stream.get("channels", 0)
score += channels * 10
scored_streams.append((score, stream))
# Prefer higher sample rates
sample_rate = stream.get("sample_rate", "0")
score += int(int(sample_rate) / 48000)
# Return the stream with highest score
return max(scored_streams, key=lambda x: x[0])[1]
scored_streams.append((score, stream))
# Return the stream with highest score
return max(scored_streams, key=lambda x: x[0])[1]
return await asyncio.get_event_loop().run_in_executor(
None, partial(_select, streams)
)
def extract_best_audio_from_video(data: ContentState):
async def extract_best_audio_from_video(data: ContentState):
"""
Main function to extract the best audio stream from a video file
Main function to extract the best audio stream from a video file asynchronously
"""
input_file = data.get("file_path")
assert input_file is not None, "Input file path must be provided"
if not os.path.exists(input_file):
def _check_file(path):
return os.path.exists(path)
file_exists = await asyncio.get_event_loop().run_in_executor(
None, partial(_check_file, input_file)
)
if not file_exists:
logger.critical(f"Input file not found: {input_file}")
return False
@@ -116,20 +143,20 @@ def extract_best_audio_from_video(data: ContentState):
output_file = f"{base_name}_audio.mp3"
# Get all audio streams
streams = get_audio_streams(input_file)
streams = await get_audio_streams(input_file)
if not streams:
logger.debug("No audio streams found in the file")
return False
# Select best stream
best_stream = select_best_audio_stream(streams)
best_stream = await select_best_audio_stream(streams)
if not best_stream:
logger.error("Could not determine best audio stream")
return False
# Extract the selected stream
stream_index = streams.index(best_stream)
success = extract_audio_from_video(input_file, output_file, stream_index)
success = await extract_audio_from_video(input_file, output_file, stream_index)
if success:
logger.debug(f"Successfully extracted audio to: {output_file}")


@@ -1,7 +1,7 @@
import re
import ssl
import requests
import aiohttp
from bs4 import BeautifulSoup
from loguru import logger
from youtube_transcript_api import YouTubeTranscriptApi # type: ignore
@@ -14,11 +14,15 @@ from open_notebook.graphs.content_processing.state import ContentState
ssl._create_default_https_context = ssl._create_unverified_context
def get_video_title(video_id):
async def get_video_title(video_id):
try:
url = f"https://www.youtube.com/watch?v={video_id}"
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
html = await response.text()
# BeautifulSoup doesn't support async operations
soup = BeautifulSoup(html, "html.parser")
# YouTube stores title in a meta tag
title = soup.find("meta", property="og:title")["content"]
@@ -63,7 +67,7 @@ def _extract_youtube_id(url):
return match.group(1) if match else None
def get_best_transcript(video_id, preferred_langs=["en", "es", "pt"]):
async def get_best_transcript(video_id, preferred_langs=["en", "es", "pt"]):
try:
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
@@ -129,7 +133,7 @@ def get_best_transcript(video_id, preferred_langs=["en", "es", "pt"]):
return None
def extract_youtube_transcript(state: ContentState):
async def extract_youtube_transcript(state: ContentState):
"""
Extract the transcript of a YouTube video.
"""
@@ -139,12 +143,12 @@ def extract_youtube_transcript(state: ContentState):
)
video_id = _extract_youtube_id(state.get("url"))
transcript = get_best_transcript(video_id, languages)
transcript = await get_best_transcript(video_id, languages)
logger.debug(f"Found transcript: {transcript}")
formatter = TextFormatter()
try:
title = get_video_title(video_id)
title = await get_video_title(video_id)
except Exception as e:
logger.critical(f"Failed to get video title for video_id: {video_id}")
logger.exception(e)
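The transcript lookup in get_best_transcript still calls YouTubeTranscriptApi synchronously; the same executor pattern used in the other modules could offload it. A sketch, not part of this diff, which would also need an asyncio import in this module:
async def list_transcripts_async(video_id):
    # Run the blocking YouTube API call in the default thread pool.
    return await asyncio.get_event_loop().run_in_executor(
        None, YouTubeTranscriptApi.list_transcripts, video_id
    )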