mirror of
https://github.com/lfnovo/open-notebook.git
synced 2026-04-29 20:10:07 +00:00
reorg content graph
This commit is contained in:
parent
3f997aa22c
commit
669891617b
4 changed files with 281 additions and 0 deletions
136
open_notebook/graphs/content_processing/__init__.py
Normal file
136
open_notebook/graphs/content_processing/__init__.py
Normal file
|
|
@ -0,0 +1,136 @@
|
|||
import os
|
||||
|
||||
import magic
|
||||
from langgraph.graph import END, START, StateGraph
|
||||
from loguru import logger
|
||||
|
||||
from open_notebook.exceptions import UnsupportedTypeException
|
||||
from open_notebook.graphs.content_processing.audio import extract_audio
|
||||
from open_notebook.graphs.content_processing.office import (
|
||||
SUPPORTED_OFFICE_TYPES,
|
||||
extract_office_content,
|
||||
)
|
||||
from open_notebook.graphs.content_processing.pdf import (
|
||||
SUPPORTED_FITZ_TYPES,
|
||||
extract_pdf,
|
||||
)
|
||||
from open_notebook.graphs.content_processing.state import SourceState
|
||||
from open_notebook.graphs.content_processing.text import extract_txt
|
||||
from open_notebook.graphs.content_processing.url import extract_url, url_provider
|
||||
from open_notebook.graphs.content_processing.video import extract_best_audio_from_video
|
||||
from open_notebook.graphs.content_processing.youtube import extract_youtube_transcript
|
||||
|
||||
|
||||
def source_identification(state: SourceState):
    """
    Classify the incoming source as raw text, a local file, or a URL.

    Checks the state keys in priority order (content, file_path, url) and
    returns the first match as a partial state update.

    Raises:
        ValueError: if none of the three source keys is present/truthy.
    """
    for key, doc_type in (("content", "text"), ("file_path", "file"), ("url", "url")):
        if state.get(key):
            return {"source_type": doc_type}
    raise ValueError("No source provided.")
|
||||
|
||||
|
||||
def file_type(state: SourceState):
    """
    Detect the MIME type of the state's file using python-magic.

    Returns a partial state update with ``identified_type`` set, or an
    empty dict when no ``file_path`` is present.
    """
    path = state.get("file_path")
    if path is None:
        return {}
    return {"identified_type": magic.from_file(path, mime=True)}
|
||||
|
||||
|
||||
# def _get_title(url):
|
||||
# """
|
||||
# Get the content of a URL
|
||||
# """
|
||||
# response = extract_url(dict(url=url))
|
||||
# if "title" in response:
|
||||
# return response["title"]
|
||||
|
||||
|
||||
def file_type_edge(data: SourceState):
    """
    Route an identified file to the matching extraction node name.

    Expects ``identified_type`` (a MIME string) to already be populated
    by the ``file_type`` node.

    Raises:
        UnsupportedTypeException: for MIME types with no extractor.
    """
    assert data.get("identified_type"), "Type not identified"
    mime = data["identified_type"]

    if mime == "text/plain":
        return "extract_txt"
    if mime in SUPPORTED_FITZ_TYPES:
        return "extract_pdf"
    if mime in SUPPORTED_OFFICE_TYPES:
        return "extract_office_content"
    if mime.startswith("video"):
        return "extract_best_audio_from_video"
    if mime.startswith("audio"):
        return "extract_audio"
    raise UnsupportedTypeException(
        f"Unsupported file type: {data.get('identified_type')}"
    )
|
||||
|
||||
|
||||
def delete_file(data: SourceState):
    """
    Remove the source file from disk when ``delete_source`` is set.

    Returns a partial update clearing ``file_path`` on successful removal;
    returns None (no state change) otherwise.
    """
    if not data.get("delete_source"):
        logger.debug("Not deleting file")
        return

    logger.debug(f"Deleting file: {data.get('file_path')}")
    path = data.get("file_path")
    if path is None:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        logger.warning(f"File not found while trying to delete: {path}")
        return
    return {"file_path": None}
|
||||
|
||||
|
||||
# Content-processing graph: identify the source kind (text / file / url),
# detect the concrete type, extract content with the matching node, then
# optionally delete the source file.
workflow = StateGraph(SourceState)
workflow.add_node("source", source_identification)
workflow.add_node("url_provider", url_provider)
workflow.add_node("file_type", file_type)
workflow.add_node("extract_txt", extract_txt)
workflow.add_node("extract_pdf", extract_pdf)
workflow.add_node("extract_url", extract_url)
workflow.add_node("extract_office_content", extract_office_content)
workflow.add_node("extract_best_audio_from_video", extract_best_audio_from_video)
workflow.add_node("extract_audio", extract_audio)
workflow.add_node("extract_youtube_transcript", extract_youtube_transcript)
workflow.add_node("delete_file", delete_file)
workflow.add_edge(START, "source")
# Raw text needs no extraction at all, so "text" goes straight to END.
workflow.add_conditional_edges(
    "source",
    lambda x: x.get("source_type"),
    {
        "url": "url_provider",
        "file": "file_type",
        "text": END,
    },
)
# file_type_edge returns the extractor node name directly (no path map).
workflow.add_conditional_edges(
    "file_type",
    file_type_edge,
)
workflow.add_conditional_edges(
    "url_provider",
    lambda x: x.get("identified_type"),
    {"article": "extract_url", "youtube": "extract_youtube_transcript"},
)
# NOTE(review): "url_provider" and "file_type" each have BOTH conditional
# edges and an unconditional edge to END; in langgraph that fans out to
# both targets rather than acting as a fallback -- confirm this is intended.
workflow.add_edge("url_provider", END)
workflow.add_edge("file_type", END)
workflow.add_edge("extract_url", END)
workflow.add_edge("extract_txt", END)
workflow.add_edge("extract_youtube_transcript", END)

# Extractors that consume a temp file funnel through delete_file; video is
# first converted to audio, then transcribed.
workflow.add_edge("extract_pdf", "delete_file")
workflow.add_edge("extract_office_content", "delete_file")
workflow.add_edge("extract_best_audio_from_video", "extract_audio")
workflow.add_edge("extract_audio", "delete_file")
workflow.add_edge("delete_file", END)
graph = workflow.compile()
|
||||
104
open_notebook/graphs/content_processing/audio.py
Normal file
104
open_notebook/graphs/content_processing/audio.py
Normal file
|
|
@ -0,0 +1,104 @@
|
|||
import os
|
||||
from math import ceil
|
||||
|
||||
from loguru import logger
|
||||
from pydub import AudioSegment
|
||||
|
||||
from open_notebook.graphs.content_processing.state import SourceState
|
||||
|
||||
# todo: add a speechtotext model to the config
|
||||
# future: parallelize the transcription process
|
||||
|
||||
|
||||
def split_audio(input_file, segment_length_minutes=15, output_prefix=None, output_dir=None):
    """
    Split an audio file into fixed-length MP3 segments.

    Args:
        input_file (str): Path to the input audio file
        segment_length_minutes (int): Length of each segment in minutes
        output_prefix (str): Prefix for output files (defaults to input filename)
        output_dir (str): Directory to save the segments (defaults to input file's directory)

    Returns:
        list: List of paths to the created segment files
    """
    # Convert input file to absolute path so dirname() below is meaningful
    input_file = os.path.abspath(input_file)

    # The original docstring documented an output_dir option but the
    # parameter was missing from the signature; the default preserves
    # the old behavior (write next to the input file).
    if output_dir is None:
        output_dir = os.path.dirname(input_file)
    os.makedirs(output_dir, exist_ok=True)

    # Set up output prefix
    if output_prefix is None:
        output_prefix = os.path.splitext(os.path.basename(input_file))[0]

    # Load the audio file
    audio = AudioSegment.from_file(input_file)

    # Calculate segment length in milliseconds
    segment_length_ms = segment_length_minutes * 60 * 1000

    # Calculate number of segments
    total_segments = ceil(len(audio) / segment_length_ms)
    logger.debug(f"Splitting file: {input_file} into {total_segments} segments")

    # List to store output file paths
    output_files = []

    # Split the audio into segments
    for i in range(total_segments):
        start_time = i * segment_length_ms
        end_time = min((i + 1) * segment_length_ms, len(audio))

        # Extract segment
        segment = audio[start_time:end_time]

        # Format: prefix_001.mp3 (zero-padding keeps lexical == temporal order)
        output_filename = f"{output_prefix}_{str(i+1).zfill(3)}.mp3"
        output_path = os.path.join(output_dir, output_filename)

        # Export segment
        segment.export(output_path, format="mp3")

        output_files.append(output_path)

        # Optional progress indication
        logger.debug(f"Exported segment {i+1}/{total_segments}: {output_filename}")

    return output_files
|
||||
|
||||
|
||||
def extract_audio(data: SourceState):
    """
    Transcribe the state's audio file with OpenAI Whisper.

    Splits the audio into segments, transcribes each one, joins the text
    into a single ``content`` update, and always removes the temporary
    segment files afterwards.
    """
    source_path = data.get("file_path")
    from openai import OpenAI

    client = OpenAI()
    segment_paths = []

    try:
        segment_paths = split_audio(source_path)
        texts = []

        for segment_path in segment_paths:
            with open(segment_path, "rb") as segment:
                result = client.audio.transcriptions.create(
                    model="whisper-1", file=segment
                )
            texts.append(result.text)

        return {"content": " ".join(texts)}

    except Exception as e:
        logger.error(f"Error transcribing audio: {str(e)}")
        logger.exception(e)
        raise  # Re-raise the exception after logging

    finally:
        # Best-effort cleanup of every segment produced by split_audio.
        for file in segment_paths:
            try:
                os.remove(file)
            except OSError as e:
                logger.error(f"Error removing temporary file {file}: {str(e)}")
|
||||
13
open_notebook/graphs/content_processing/state.py
Normal file
13
open_notebook/graphs/content_processing/state.py
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
from typing_extensions import TypedDict
|
||||
|
||||
|
||||
class SourceState(TypedDict, total=False):
    """Shared state for the content-processing graph.

    All keys are optional (``total=False``): every node reads them via
    ``.get()`` and returns partial updates.

    NOTE: TypedDict does not support default values -- the original
    ``delete_source: bool = False`` assignment was silently ignored at
    runtime, so the default was removed; an absent key means "do not
    delete the source file".
    """

    content: str
    file_path: str
    url: str
    title: str
    source_type: str
    identified_type: str
    identified_provider: str
    metadata: dict
    delete_source: bool
|
||||
28
open_notebook/graphs/content_processing/text.py
Normal file
28
open_notebook/graphs/content_processing/text.py
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
from loguru import logger
|
||||
|
||||
from open_notebook.graphs.content_processing.state import SourceState
|
||||
|
||||
|
||||
def extract_txt(state: SourceState):
    """
    Read a plain-text file referenced by the state into ``content``.

    Only acts when ``file_path`` is set and ``identified_type`` is
    ``text/plain``; otherwise returns an empty update.

    Raises:
        FileNotFoundError: if the file disappeared between detection and read.
        Exception: wrapping any other read error (original kept as __cause__).
    """
    return_dict = {}
    file_path = state.get("file_path")
    # Single guard replaces the original redundant double None-check.
    if file_path is not None and state.get("identified_type") == "text/plain":
        logger.debug(f"Extracting text from {state.get('file_path')}")
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                content = file.read()
                logger.debug(f"Extracted: {content[:100]}")
                return_dict["content"] = content
        except FileNotFoundError:
            raise FileNotFoundError(f"File not found at {file_path}")
        except Exception as e:
            # Chain the original exception so the traceback keeps the cause.
            raise Exception(f"An error occurred: {e}") from e

    return return_dict
|
||||
Loading…
Add table
Add a link
Reference in a new issue