Merge pull request #194 from MODSetter/dev

feat(BACKEND): Added task logging for podcast generation
This commit is contained in:
Rohan Verma 2025-07-17 15:13:46 +05:30 committed by GitHub
commit 51875e5b8e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 181 additions and 75 deletions

View file

@ -30,7 +30,7 @@ async def add_crawled_url_document(
task_name="crawl_url_document", task_name="crawl_url_document",
source="background_task", source="background_task",
message=f"Starting URL crawling process for: {url}", message=f"Starting URL crawling process for: {url}",
metadata={"url": url, "user_id": user_id} metadata={"url": url, "user_id": str(user_id)}
) )
try: try:
@ -259,7 +259,7 @@ async def add_extension_received_document(
metadata={ metadata={
"url": content.metadata.VisitedWebPageURL, "url": content.metadata.VisitedWebPageURL,
"title": content.metadata.VisitedWebPageTitle, "title": content.metadata.VisitedWebPageTitle,
"user_id": user_id "user_id": str(user_id)
} }
) )
@ -392,7 +392,7 @@ async def add_received_markdown_file_document(
task_name="markdown_file_document", task_name="markdown_file_document",
source="background_task", source="background_task",
message=f"Processing markdown file: {file_name}", message=f"Processing markdown file: {file_name}",
metadata={"filename": file_name, "user_id": user_id, "content_length": len(file_in_markdown)} metadata={"filename": file_name, "user_id": str(user_id), "content_length": len(file_in_markdown)}
) )
try: try:
@ -667,7 +667,7 @@ async def add_youtube_video_document(
task_name="youtube_video_document", task_name="youtube_video_document",
source="background_task", source="background_task",
message=f"Starting YouTube video processing for: {url}", message=f"Starting YouTube video processing for: {url}",
metadata={"url": url, "user_id": user_id} metadata={"url": url, "user_id": str(user_id)}
) )
try: try:

View file

@ -50,7 +50,7 @@ async def index_slack_messages(
task_name="slack_messages_indexing", task_name="slack_messages_indexing",
source="connector_indexing_task", source="connector_indexing_task",
message=f"Starting Slack messages indexing for connector {connector_id}", message=f"Starting Slack messages indexing for connector {connector_id}",
metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date} metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
) )
try: try:
@ -412,7 +412,7 @@ async def index_notion_pages(
task_name="notion_pages_indexing", task_name="notion_pages_indexing",
source="connector_indexing_task", source="connector_indexing_task",
message=f"Starting Notion pages indexing for connector {connector_id}", message=f"Starting Notion pages indexing for connector {connector_id}",
metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date} metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
) )
try: try:
@ -763,7 +763,7 @@ async def index_github_repos(
task_name="github_repos_indexing", task_name="github_repos_indexing",
source="connector_indexing_task", source="connector_indexing_task",
message=f"Starting GitHub repositories indexing for connector {connector_id}", message=f"Starting GitHub repositories indexing for connector {connector_id}",
metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date} metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
) )
documents_processed = 0 documents_processed = 0
@ -1007,7 +1007,7 @@ async def index_linear_issues(
task_name="linear_issues_indexing", task_name="linear_issues_indexing",
source="connector_indexing_task", source="connector_indexing_task",
message=f"Starting Linear issues indexing for connector {connector_id}", message=f"Starting Linear issues indexing for connector {connector_id}",
metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date} metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
) )
try: try:
@ -1315,7 +1315,7 @@ async def index_discord_messages(
task_name="discord_messages_indexing", task_name="discord_messages_indexing",
source="connector_indexing_task", source="connector_indexing_task",
message=f"Starting Discord messages indexing for connector {connector_id}", message=f"Starting Discord messages indexing for connector {connector_id}",
metadata={"connector_id": connector_id, "user_id": user_id, "start_date": start_date, "end_date": end_date} metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
) )
try: try:

View file

@ -2,8 +2,10 @@
from app.agents.podcaster.graph import graph as podcaster_graph from app.agents.podcaster.graph import graph as podcaster_graph
from app.agents.podcaster.state import State from app.agents.podcaster.state import State
from app.db import Chat, Podcast from app.db import Chat, Podcast
from app.services.task_logging_service import TaskLoggingService
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import SQLAlchemyError
async def generate_document_podcast( async def generate_document_podcast(
@ -24,73 +26,177 @@ async def generate_chat_podcast(
podcast_title: str, podcast_title: str,
user_id: int user_id: int
): ):
# Fetch the chat with the specified ID task_logger = TaskLoggingService(session, search_space_id)
query = select(Chat).filter(
Chat.id == chat_id,
Chat.search_space_id == search_space_id
)
result = await session.execute(query) # Log task start
chat = result.scalars().first() log_entry = await task_logger.log_task_start(
task_name="generate_chat_podcast",
if not chat: source="podcast_task",
raise ValueError(f"Chat with id {chat_id} not found in search space {search_space_id}") message=f"Starting podcast generation for chat {chat_id}",
metadata={
# Create chat history structure "chat_id": chat_id,
chat_history_str = "<chat_history>" "search_space_id": search_space_id,
"podcast_title": podcast_title,
for message in chat.messages: "user_id": str(user_id)
if message["role"] == "user":
chat_history_str += f"<user_message>{message['content']}</user_message>"
elif message["role"] == "assistant":
# Last annotation type will always be "ANSWER" here
answer_annotation = message["annotations"][-1]
answer_text = ""
if answer_annotation["type"] == "ANSWER":
answer_text = answer_annotation["content"]
# If content is a list, join it into a single string
if isinstance(answer_text, list):
answer_text = "\n".join(answer_text)
chat_history_str += f"<assistant_message>{answer_text}</assistant_message>"
chat_history_str += "</chat_history>"
# Pass it to the SurfSense Podcaster
config = {
"configurable": {
"podcast_title": "SurfSense",
"user_id": str(user_id),
} }
}
# Initialize state with database session and streaming service
initial_state = State(
source_content=chat_history_str,
db_session=session
) )
# Run the graph directly try:
result = await podcaster_graph.ainvoke(initial_state, config=config) # Fetch the chat with the specified ID
await task_logger.log_task_progress(
log_entry,
f"Fetching chat {chat_id} from database",
{"stage": "fetch_chat"}
)
# Convert podcast transcript entries to serializable format query = select(Chat).filter(
serializable_transcript = [] Chat.id == chat_id,
for entry in result["podcast_transcript"]: Chat.search_space_id == search_space_id
serializable_transcript.append({ )
"speaker_id": entry.speaker_id,
"dialog": entry.dialog
})
# Create a new podcast entry result = await session.execute(query)
podcast = Podcast( chat = result.scalars().first()
title=f"{podcast_title}",
podcast_transcript=serializable_transcript,
file_location=result["final_podcast_file_path"],
search_space_id=search_space_id
)
# Add to session and commit if not chat:
session.add(podcast) await task_logger.log_task_failure(
await session.commit() log_entry,
await session.refresh(podcast) f"Chat with id {chat_id} not found in search space {search_space_id}",
"Chat not found",
{"error_type": "ChatNotFound"}
)
raise ValueError(f"Chat with id {chat_id} not found in search space {search_space_id}")
return podcast # Create chat history structure
await task_logger.log_task_progress(
log_entry,
f"Processing chat history for chat {chat_id}",
{"stage": "process_chat_history", "message_count": len(chat.messages)}
)
chat_history_str = "<chat_history>"
processed_messages = 0
for message in chat.messages:
if message["role"] == "user":
chat_history_str += f"<user_message>{message['content']}</user_message>"
processed_messages += 1
elif message["role"] == "assistant":
# Last annotation type will always be "ANSWER" here
answer_annotation = message["annotations"][-1]
answer_text = ""
if answer_annotation["type"] == "ANSWER":
answer_text = answer_annotation["content"]
# If content is a list, join it into a single string
if isinstance(answer_text, list):
answer_text = "\n".join(answer_text)
chat_history_str += f"<assistant_message>{answer_text}</assistant_message>"
processed_messages += 1
chat_history_str += "</chat_history>"
# Pass it to the SurfSense Podcaster
await task_logger.log_task_progress(
log_entry,
f"Initializing podcast generation for chat {chat_id}",
{"stage": "initialize_podcast_generation", "processed_messages": processed_messages, "content_length": len(chat_history_str)}
)
config = {
"configurable": {
"podcast_title": "SurfSense",
"user_id": str(user_id),
}
}
# Initialize state with database session and streaming service
initial_state = State(
source_content=chat_history_str,
db_session=session
)
# Run the graph directly
await task_logger.log_task_progress(
log_entry,
f"Running podcast generation graph for chat {chat_id}",
{"stage": "run_podcast_graph"}
)
result = await podcaster_graph.ainvoke(initial_state, config=config)
# Convert podcast transcript entries to serializable format
await task_logger.log_task_progress(
log_entry,
f"Processing podcast transcript for chat {chat_id}",
{"stage": "process_transcript", "transcript_entries": len(result["podcast_transcript"])}
)
serializable_transcript = []
for entry in result["podcast_transcript"]:
serializable_transcript.append({
"speaker_id": entry.speaker_id,
"dialog": entry.dialog
})
# Create a new podcast entry
await task_logger.log_task_progress(
log_entry,
f"Creating podcast database entry for chat {chat_id}",
{"stage": "create_podcast_entry", "file_location": result.get("final_podcast_file_path")}
)
podcast = Podcast(
title=f"{podcast_title}",
podcast_transcript=serializable_transcript,
file_location=result["final_podcast_file_path"],
search_space_id=search_space_id
)
# Add to session and commit
session.add(podcast)
await session.commit()
await session.refresh(podcast)
# Log success
await task_logger.log_task_success(
log_entry,
f"Successfully generated podcast for chat {chat_id}",
{
"podcast_id": podcast.id,
"podcast_title": podcast_title,
"transcript_entries": len(serializable_transcript),
"file_location": result.get("final_podcast_file_path"),
"processed_messages": processed_messages,
"content_length": len(chat_history_str)
}
)
return podcast
except ValueError as ve:
# ValueError is already logged above for chat not found
if "not found" not in str(ve):
await task_logger.log_task_failure(
log_entry,
f"Value error during podcast generation for chat {chat_id}",
str(ve),
{"error_type": "ValueError"}
)
raise ve
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during podcast generation for chat {chat_id}",
str(db_error),
{"error_type": "SQLAlchemyError"}
)
raise db_error
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Unexpected error during podcast generation for chat {chat_id}",
str(e),
{"error_type": type(e).__name__}
)
raise RuntimeError(f"Failed to generate podcast for chat {chat_id}: {str(e)}")

View file

@ -329,7 +329,7 @@ export default function ChatsPageClient({ searchSpaceId }: ChatsPageClientProps)
// Helper to finish the podcast generation process // Helper to finish the podcast generation process
const finishPodcastGeneration = () => { const finishPodcastGeneration = () => {
toast.success("All podcasts are being generated! Check the podcasts tab to see them when ready."); toast.success("All podcasts are being generated! Check the logs tab to see their status.");
setPodcastDialogOpen(false); setPodcastDialogOpen(false);
setSelectedChats([]); setSelectedChats([]);
setSelectionMode(false); setSelectionMode(false);