feat(youtube): integrate YouTube video processing connector

- Added support for processing YouTube videos, including transcript extraction and document creation.
- Implemented a new background task for adding YouTube video documents.
- Enhanced the connector service to search for YouTube videos and return relevant results.
- Updated frontend components to include YouTube video options in the dashboard and connector sources.
- Added necessary dependencies for YouTube transcript API.
This commit is contained in:
DESKTOP-RTLN3BA\$punk 2025-04-11 15:05:17 -07:00
parent 753f40dfea
commit b43272a115
13 changed files with 608 additions and 18 deletions

View file

@ -244,3 +244,142 @@ async def add_received_file_document(
except Exception as e:
await session.rollback()
raise RuntimeError(f"Failed to process file document: {str(e)}")
async def add_youtube_video_document(
session: AsyncSession,
url: str,
search_space_id: int
):
"""
Process a YouTube video URL, extract transcripts, and add as document.
"""
try:
from youtube_transcript_api import YouTubeTranscriptApi
# Extract video ID from URL
def get_youtube_video_id(url: str):
from urllib.parse import urlparse, parse_qs
parsed_url = urlparse(url)
hostname = parsed_url.hostname
if hostname == "youtu.be":
return parsed_url.path[1:]
if hostname in ("www.youtube.com", "youtube.com"):
if parsed_url.path == "/watch":
query_params = parse_qs(parsed_url.query)
return query_params.get("v", [None])[0]
if parsed_url.path.startswith("/embed/"):
return parsed_url.path.split("/")[2]
if parsed_url.path.startswith("/v/"):
return parsed_url.path.split("/")[2]
return None
# Get video ID
video_id = get_youtube_video_id(url)
if not video_id:
raise ValueError(f"Could not extract video ID from URL: {url}")
# Get video metadata
import json
from urllib.parse import urlencode
from urllib.request import urlopen
params = {"format": "json", "url": f"https://www.youtube.com/watch?v={video_id}"}
oembed_url = "https://www.youtube.com/oembed"
query_string = urlencode(params)
full_url = oembed_url + "?" + query_string
with urlopen(full_url) as response:
response_text = response.read()
video_data = json.loads(response_text.decode())
# Get video transcript
try:
captions = YouTubeTranscriptApi.get_transcript(video_id)
# Include complete caption information with timestamps
transcript_segments = []
for line in captions:
start_time = line.get("start", 0)
duration = line.get("duration", 0)
text = line.get("text", "")
timestamp = f"[{start_time:.2f}s-{start_time + duration:.2f}s]"
transcript_segments.append(f"{timestamp} {text}")
transcript_text = "\n".join(transcript_segments)
except Exception as e:
transcript_text = f"No captions available for this video. Error: {str(e)}"
# Format document metadata in a more maintainable way
metadata_sections = [
("METADATA", [
f"TITLE: {video_data.get('title', 'YouTube Video')}",
f"URL: {url}",
f"VIDEO_ID: {video_id}",
f"AUTHOR: {video_data.get('author_name', 'Unknown')}",
f"THUMBNAIL: {video_data.get('thumbnail_url', '')}"
]),
("CONTENT", [
"FORMAT: transcript",
"TEXT_START",
transcript_text,
"TEXT_END"
])
]
# Build the document string more efficiently
document_parts = []
document_parts.append("<DOCUMENT>")
for section_title, section_content in metadata_sections:
document_parts.append(f"<{section_title}>")
document_parts.extend(section_content)
document_parts.append(f"</{section_title}>")
document_parts.append("</DOCUMENT>")
combined_document_string = '\n'.join(document_parts)
# Generate summary
summary_chain = SUMMARY_PROMPT_TEMPLATE | config.long_context_llm_instance
summary_result = await summary_chain.ainvoke({"document": combined_document_string})
summary_content = summary_result.content
summary_embedding = config.embedding_model_instance.embed(summary_content)
# Process chunks
chunks = [
Chunk(content=chunk.text, embedding=chunk.embedding)
for chunk in config.chunker_instance.chunk(transcript_text)
]
# Create document
from app.db import Document, DocumentType
document = Document(
title=video_data.get("title", "YouTube Video"),
document_type=DocumentType.YOUTUBE_VIDEO,
document_metadata={
"url": url,
"video_id": video_id,
"video_title": video_data.get("title", "YouTube Video"),
"author": video_data.get("author_name", "Unknown"),
"thumbnail": video_data.get("thumbnail_url", "")
},
content=summary_content,
embedding=summary_embedding,
chunks=chunks,
search_space_id=search_space_id
)
session.add(document)
await session.commit()
await session.refresh(document)
return document
except SQLAlchemyError as db_error:
await session.rollback()
raise db_error
except Exception as e:
await session.rollback()
import logging
logging.error(f"Failed to process YouTube video: {str(e)}")
raise