diff --git a/surfsense_backend/alembic/versions/12_add_logs_table.py b/surfsense_backend/alembic/versions/12_add_logs_table.py
new file mode 100644
index 0000000..0b2cc13
--- /dev/null
+++ b/surfsense_backend/alembic/versions/12_add_logs_table.py
@@ -0,0 +1,71 @@
+"""Add LogLevel and LogStatus enums and logs table
+
+Revision ID: 12
+Revises: 11
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects.postgresql import JSON
+
+
+# revision identifiers, used by Alembic.
+revision: str = "12"
+down_revision: Union[str, None] = "11"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+ """Upgrade schema - add LogLevel and LogStatus enums and logs table."""
+
+ # Create LogLevel enum
+ op.execute("""
+ CREATE TYPE loglevel AS ENUM ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
+ """)
+
+ # Create LogStatus enum
+ op.execute("""
+ CREATE TYPE logstatus AS ENUM ('IN_PROGRESS', 'SUCCESS', 'FAILED')
+ """)
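+    # The lowercase type names line up with the SQLAlchemyEnum(LogLevel) and
+    # SQLAlchemyEnum(LogStatus) columns defined on the Log model in app/db.py.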
+
+ # Create logs table
+ op.execute("""
+ CREATE TABLE logs (
+ id SERIAL PRIMARY KEY,
+ created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
+ level loglevel NOT NULL,
+ status logstatus NOT NULL,
+ message TEXT NOT NULL,
+ source VARCHAR(200),
+ log_metadata JSONB DEFAULT '{}',
+ search_space_id INTEGER NOT NULL REFERENCES searchspaces(id) ON DELETE CASCADE
+ )
+ """)
+
+ # Create indexes
+ op.create_index(op.f('ix_logs_id'), 'logs', ['id'], unique=False)
+ op.create_index(op.f('ix_logs_created_at'), 'logs', ['created_at'], unique=False)
+ op.create_index(op.f('ix_logs_level'), 'logs', ['level'], unique=False)
+ op.create_index(op.f('ix_logs_status'), 'logs', ['status'], unique=False)
+ op.create_index(op.f('ix_logs_source'), 'logs', ['source'], unique=False)
+
+
+def downgrade() -> None:
+ """Downgrade schema - remove logs table and enums."""
+
+ # Drop indexes
+ op.drop_index(op.f('ix_logs_source'), table_name='logs')
+ op.drop_index(op.f('ix_logs_status'), table_name='logs')
+ op.drop_index(op.f('ix_logs_level'), table_name='logs')
+ op.drop_index(op.f('ix_logs_created_at'), table_name='logs')
+ op.drop_index(op.f('ix_logs_id'), table_name='logs')
+
+ # Drop logs table
+ op.drop_table('logs')
+
+ # Drop enums
+ op.execute("DROP TYPE IF EXISTS logstatus")
+ op.execute("DROP TYPE IF EXISTS loglevel")
\ No newline at end of file
diff --git a/surfsense_backend/app/agents/researcher/nodes.py b/surfsense_backend/app/agents/researcher/nodes.py
index d8c5ac1..cbcd44f 100644
--- a/surfsense_backend/app/agents/researcher/nodes.py
+++ b/surfsense_backend/app/agents/researcher/nodes.py
@@ -266,27 +266,36 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str
from app.db import get_async_session
streaming_service = state.streaming_service
-
- streaming_service.only_update_terminal("๐ Generating answer outline...")
- writer({"yeild_value": streaming_service._format_annotations()})
+
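+    # format_terminal_info_delta returns a ready-to-stream annotation payload,
+    # replacing the previous only_update_terminal + _format_annotations pair.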
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ "๐ Generating answer outline..."
+ )
+ }
+ )
# Get configuration from runnable config
configuration = Configuration.from_runnable_config(config)
reformulated_query = state.reformulated_query
user_query = configuration.user_query
num_sections = configuration.num_sections
user_id = configuration.user_id
-
- streaming_service.only_update_terminal(f"🤔 Planning research approach for: \"{user_query[:100]}...\"")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f'🤔 Planning research approach for: "{user_query[:100]}..."'
+ )
+ }
+ )
+
# Get user's strategic LLM
llm = await get_user_strategic_llm(state.db_session, user_id)
if not llm:
error_message = f"No strategic LLM configured for user {user_id}"
- streaming_service.only_update_terminal(f"โ {error_message}", "error")
- writer({"yeild_value": streaming_service._format_annotations()})
+ writer({"yield_value": streaming_service.format_error(error_message)})
raise RuntimeError(error_message)
-
+
# Create the human message content
human_message_content = f"""
Now Please create an answer outline for the following query:
@@ -310,10 +319,15 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str
Your output MUST be valid JSON in exactly this format. Do not include any other text or explanation.
"""
-
- streaming_service.only_update_terminal("๐ Designing structured outline with AI...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ "๐ Designing structured outline with AI..."
+ )
+ }
+ )
+
# Create messages for the LLM
messages = [
SystemMessage(content=get_answer_outline_system_prompt()),
@@ -321,9 +335,14 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str
]
# Call the LLM directly without using structured output
- streaming_service.only_update_terminal("โ๏ธ Processing answer structure...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ "โ๏ธ Processing answer structure..."
+ )
+ }
+ )
+
response = await llm.ainvoke(messages)
# Parse the JSON response manually
@@ -344,26 +363,34 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str
answer_outline = AnswerOutline(**parsed_data)
total_questions = sum(len(section.questions) for section in answer_outline.answer_outline)
- streaming_service.only_update_terminal(f"✅ Successfully generated outline with {len(answer_outline.answer_outline)} sections and {total_questions} research questions!")
- writer({"yeild_value": streaming_service._format_annotations()})
-
- print(f"Successfully generated answer outline with {len(answer_outline.answer_outline)} sections")
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"✅ Successfully generated outline with {len(answer_outline.answer_outline)} sections and {total_questions} research questions!"
+ )
+ }
+ )
+
+ print(
+ f"Successfully generated answer outline with {len(answer_outline.answer_outline)} sections"
+ )
+
# Return state update
return {"answer_outline": answer_outline}
else:
# If JSON structure not found, raise a clear error
- error_message = f"Could not find valid JSON in LLM response. Raw response: {content}"
- streaming_service.only_update_terminal(f"โ {error_message}", "error")
- writer({"yeild_value": streaming_service._format_annotations()})
+ error_message = (
+ f"Could not find valid JSON in LLM response. Raw response: {content}"
+ )
+ writer({"yield_value": streaming_service.format_error(error_message)})
raise ValueError(error_message)
-
+
except (json.JSONDecodeError, ValueError) as e:
# Log the error and re-raise it
error_message = f"Error parsing LLM response: {str(e)}"
- streaming_service.only_update_terminal(f"โ {error_message}", "error")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer({"yield_value": streaming_service.format_error(error_message)})
+
print(f"Error parsing LLM response: {str(e)}")
print(f"Raw response: {response.content}")
raise
@@ -414,8 +441,13 @@ async def fetch_relevant_documents(
if streaming_service and writer:
connector_names = [get_connector_friendly_name(connector) for connector in connectors_to_search]
connector_names_str = ", ".join(connector_names)
- streaming_service.only_update_terminal(f"๐ Starting research on {len(research_questions)} questions using {connector_names_str} data sources")
- writer({"yeild_value": streaming_service._format_annotations()})
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"๐ Starting research on {len(research_questions)} questions using {connector_names_str} data sources"
+ )
+ }
+ )
all_raw_documents = [] # Store all raw documents
all_sources = [] # Store all sources
@@ -423,9 +455,14 @@ async def fetch_relevant_documents(
for i, user_query in enumerate(research_questions):
# Stream question being researched
if streaming_service and writer:
- streaming_service.only_update_terminal(f"🧠 Researching question {i+1}/{len(research_questions)}: \"{user_query[:100]}...\"")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f'🧠 Researching question {i + 1}/{len(research_questions)}: "{user_query[:100]}..."'
+ )
+ }
+ )
+
# Use original research question as the query
reformulated_query = user_query
@@ -435,9 +472,14 @@ async def fetch_relevant_documents(
if streaming_service and writer:
connector_emoji = get_connector_emoji(connector)
friendly_name = get_connector_friendly_name(connector)
- streaming_service.only_update_terminal(f"{connector_emoji} Searching {friendly_name} for relevant information...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"{connector_emoji} Searching {friendly_name} for relevant information..."
+ )
+ }
+ )
+
try:
if connector == "YOUTUBE_VIDEO":
source_object, youtube_chunks = await connector_service.search_youtube(
@@ -455,9 +497,14 @@ async def fetch_relevant_documents(
# Stream found document count
if streaming_service and writer:
- streaming_service.only_update_terminal(f"📹 Found {len(youtube_chunks)} YouTube chunks related to your query")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"📹 Found {len(youtube_chunks)} YouTube chunks related to your query"
+ )
+ }
+ )
+
elif connector == "EXTENSION":
source_object, extension_chunks = await connector_service.search_extension(
user_query=reformulated_query,
@@ -474,9 +521,14 @@ async def fetch_relevant_documents(
# Stream found document count
if streaming_service and writer:
- streaming_service.only_update_terminal(f"🧩 Found {len(extension_chunks)} Browser Extension chunks related to your query")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"🧩 Found {len(extension_chunks)} Browser Extension chunks related to your query"
+ )
+ }
+ )
+
elif connector == "CRAWLED_URL":
source_object, crawled_urls_chunks = await connector_service.search_crawled_urls(
user_query=reformulated_query,
@@ -493,9 +545,14 @@ async def fetch_relevant_documents(
# Stream found document count
if streaming_service and writer:
- streaming_service.only_update_terminal(f"๐ Found {len(crawled_urls_chunks)} Web Pages chunks related to your query")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"๐ Found {len(crawled_urls_chunks)} Web Pages chunks related to your query"
+ )
+ }
+ )
+
elif connector == "FILE":
source_object, files_chunks = await connector_service.search_files(
user_query=reformulated_query,
@@ -512,10 +569,14 @@ async def fetch_relevant_documents(
# Stream found document count
if streaming_service and writer:
- streaming_service.only_update_terminal(f"๐ Found {len(files_chunks)} Files chunks related to your query")
- writer({"yeild_value": streaming_service._format_annotations()})
-
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"๐ Found {len(files_chunks)} Files chunks related to your query"
+ )
+ }
+ )
+
elif connector == "SLACK_CONNECTOR":
source_object, slack_chunks = await connector_service.search_slack(
user_query=reformulated_query,
@@ -532,9 +593,14 @@ async def fetch_relevant_documents(
# Stream found document count
if streaming_service and writer:
- streaming_service.only_update_terminal(f"💬 Found {len(slack_chunks)} Slack messages related to your query")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"💬 Found {len(slack_chunks)} Slack messages related to your query"
+ )
+ }
+ )
+
elif connector == "NOTION_CONNECTOR":
source_object, notion_chunks = await connector_service.search_notion(
user_query=reformulated_query,
@@ -551,9 +617,14 @@ async def fetch_relevant_documents(
# Stream found document count
if streaming_service and writer:
- streaming_service.only_update_terminal(f"๐ Found {len(notion_chunks)} Notion pages/blocks related to your query")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"๐ Found {len(notion_chunks)} Notion pages/blocks related to your query"
+ )
+ }
+ )
+
elif connector == "GITHUB_CONNECTOR":
source_object, github_chunks = await connector_service.search_github(
user_query=reformulated_query,
@@ -570,9 +641,14 @@ async def fetch_relevant_documents(
# Stream found document count
if streaming_service and writer:
- streaming_service.only_update_terminal(f"๐ Found {len(github_chunks)} GitHub files/issues related to your query")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"๐ Found {len(github_chunks)} GitHub files/issues related to your query"
+ )
+ }
+ )
+
elif connector == "LINEAR_CONNECTOR":
source_object, linear_chunks = await connector_service.search_linear(
user_query=reformulated_query,
@@ -589,9 +665,14 @@ async def fetch_relevant_documents(
# Stream found document count
if streaming_service and writer:
- streaming_service.only_update_terminal(f"๐ Found {len(linear_chunks)} Linear issues related to your query")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"๐ Found {len(linear_chunks)} Linear issues related to your query"
+ )
+ }
+ )
+
elif connector == "TAVILY_API":
source_object, tavily_chunks = await connector_service.search_tavily(
user_query=reformulated_query,
@@ -606,9 +687,14 @@ async def fetch_relevant_documents(
# Stream found document count
if streaming_service and writer:
- streaming_service.only_update_terminal(f"๐ Found {len(tavily_chunks)} Web Search results related to your query")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"๐ Found {len(tavily_chunks)} Web Search results related to your query"
+ )
+ }
+ )
+
elif connector == "LINKUP_API":
if top_k > 10:
linkup_mode = "deep"
@@ -628,9 +714,14 @@ async def fetch_relevant_documents(
# Stream found document count
if streaming_service and writer:
- streaming_service.only_update_terminal(f"๐ Found {len(linkup_chunks)} Linkup results related to your query")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"๐ Found {len(linkup_chunks)} Linkup results related to your query"
+ )
+ }
+ )
+
elif connector == "DISCORD_CONNECTOR":
source_object, discord_chunks = await connector_service.search_discord(
user_query=reformulated_query,
@@ -645,9 +736,13 @@ async def fetch_relevant_documents(
all_raw_documents.extend(discord_chunks)
# Stream found document count
if streaming_service and writer:
- streaming_service.only_update_terminal(f"🗨️ Found {len(discord_chunks)} Discord messages related to your query")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"🗨️ Found {len(discord_chunks)} Discord messages related to your query"
+ )
+ }
+ )
except Exception as e:
error_message = f"Error searching connector {connector}: {str(e)}"
@@ -656,9 +751,14 @@ async def fetch_relevant_documents(
# Stream error message
if streaming_service and writer:
friendly_name = get_connector_friendly_name(connector)
- streaming_service.only_update_terminal(f"⚠️ Error searching {friendly_name}: {str(e)}", "error")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_error(
+ f"Error searching {friendly_name}: {str(e)}"
+ )
+ }
+ )
+
# Continue with other connectors on error
continue
@@ -700,14 +800,19 @@ async def fetch_relevant_documents(
if streaming_service and writer:
user_source_count = len(user_selected_sources) if user_selected_sources else 0
connector_source_count = len(deduplicated_sources) - user_source_count
- streaming_service.only_update_terminal(f"๐ Collected {len(deduplicated_sources)} total sources ({user_source_count} user-selected + {connector_source_count} from connectors)")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"๐ Collected {len(deduplicated_sources)} total sources ({user_source_count} user-selected + {connector_source_count} from connectors)"
+ )
+ }
+ )
+
# After all sources are collected and deduplicated, stream them
if streaming_service and writer:
streaming_service.only_update_sources(deduplicated_sources)
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer({"yield_value": streaming_service._format_annotations()})
+
# Deduplicate raw documents based on chunk_id or content
seen_chunk_ids = set()
seen_content_hashes = set()
@@ -730,9 +835,14 @@ async def fetch_relevant_documents(
# Stream info about deduplicated documents
if streaming_service and writer:
- streaming_service.only_update_terminal(f"🧹 Found {len(deduplicated_docs)} unique document chunks after removing duplicates")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"🧹 Found {len(deduplicated_docs)} unique document chunks after removing duplicates"
+ )
+ }
+ )
+
# Return deduplicated documents
return deduplicated_docs
@@ -756,15 +866,20 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
# Initialize a dictionary to track content for all sections
# This is used to maintain section content while streaming multiple sections
section_contents = {}
-
- streaming_service.only_update_terminal(f"๐ Starting to process research sections...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ "๐ Starting to process research sections..."
+ )
+ }
+ )
+
print(f"Processing sections from outline: {answer_outline is not None}")
if not answer_outline:
- streaming_service.only_update_terminal("โ Error: No answer outline was provided. Cannot generate report.", "error")
- writer({"yeild_value": streaming_service._format_annotations()})
+ error_message = "No answer outline was provided. Cannot generate report."
+ writer({"yield_value": streaming_service.format_error(error_message)})
return {
"final_written_report": "No answer outline was provided. Cannot generate final report."
}
@@ -775,13 +890,23 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
all_questions.extend(section.questions)
print(f"Collected {len(all_questions)} questions from all sections")
- streaming_service.only_update_terminal(f"🧩 Found {len(all_questions)} research questions across {len(answer_outline.answer_outline)} sections")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"🧩 Found {len(all_questions)} research questions across {len(answer_outline.answer_outline)} sections"
+ )
+ }
+ )
+
# Fetch relevant documents once for all questions
- streaming_service.only_update_terminal("๐ Searching for relevant information across all connectors...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ "๐ Searching for relevant information across all connectors..."
+ )
+ }
+ )
+
if configuration.num_sections == 1:
TOP_K = 10
elif configuration.num_sections == 3:
@@ -798,9 +923,14 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
try:
# First, fetch user-selected documents if any
if configuration.document_ids_to_add_in_context:
- streaming_service.only_update_terminal(f"๐ Including {len(configuration.document_ids_to_add_in_context)} user-selected documents...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"๐ Including {len(configuration.document_ids_to_add_in_context)} user-selected documents..."
+ )
+ }
+ )
+
user_selected_sources, user_selected_documents = await fetch_documents_by_ids(
document_ids=configuration.document_ids_to_add_in_context,
user_id=configuration.user_id,
@@ -808,9 +938,14 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
)
if user_selected_documents:
- streaming_service.only_update_terminal(f"✅ Successfully added {len(user_selected_documents)} user-selected documents to context")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"✅ Successfully added {len(user_selected_documents)} user-selected documents to context"
+ )
+ }
+ )
+
# Create connector service using state db_session
connector_service = ConnectorService(state.db_session, user_id=configuration.user_id)
await connector_service.initialize_counter()
@@ -831,8 +966,7 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
except Exception as e:
error_message = f"Error fetching relevant documents: {str(e)}"
print(error_message)
- streaming_service.only_update_terminal(f"โ {error_message}", "error")
- writer({"yeild_value": streaming_service._format_annotations()})
+ writer({"yield_value": streaming_service.format_error(error_message)})
# Log the error and continue with an empty list of documents
# This allows the process to continue, but the report might lack information
relevant_documents = []
@@ -843,15 +977,25 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
print(f"Fetched {len(relevant_documents)} relevant documents for all sections")
print(f"Added {len(user_selected_documents)} user-selected documents for all sections")
print(f"Total documents for sections: {len(all_documents)}")
-
- streaming_service.only_update_terminal(f"✨ Starting to draft {len(answer_outline.answer_outline)} sections using {len(all_documents)} total document chunks ({len(user_selected_documents)} user-selected + {len(relevant_documents)} connector-found)")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"✨ Starting to draft {len(answer_outline.answer_outline)} sections using {len(all_documents)} total document chunks ({len(user_selected_documents)} user-selected + {len(relevant_documents)} connector-found)"
+ )
+ }
+ )
+
# Create tasks to process each section in parallel with the same document set
section_tasks = []
- streaming_service.only_update_terminal("โ๏ธ Creating processing tasks for each section...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ "โ๏ธ Creating processing tasks for each section..."
+ )
+ }
+ )
+
for i, section in enumerate(answer_outline.answer_outline):
if i == 0:
sub_section_type = SubSectionType.START
@@ -885,23 +1029,32 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
# Run all section processing tasks in parallel
print(f"Running {len(section_tasks)} section processing tasks in parallel")
- streaming_service.only_update_terminal(f"⏳ Processing {len(section_tasks)} sections simultaneously...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"⏳ Processing {len(section_tasks)} sections simultaneously..."
+ )
+ }
+ )
+
section_results = await asyncio.gather(*section_tasks, return_exceptions=True)
# Handle any exceptions in the results
- streaming_service.only_update_terminal("🧵 Combining section results into final report...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ "🧵 Combining section results into final report..."
+ )
+ }
+ )
+
processed_results = []
for i, result in enumerate(section_results):
if isinstance(result, Exception):
section_title = answer_outline.answer_outline[i].section_title
error_message = f"Error processing section '{section_title}': {str(result)}"
print(error_message)
- streaming_service.only_update_terminal(f"⚠️ {error_message}", "error")
- writer({"yeild_value": streaming_service._format_annotations()})
+ writer({"yield_value": streaming_service.format_error(error_message)})
processed_results.append(error_message)
else:
processed_results.append(result)
@@ -912,18 +1065,27 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
# Skip adding the section header since the content already contains the title
final_report.append(content)
final_report.append("\n")
+
+ # Stream each section with its title
+ writer(
+ {
+ "yield_value": state.streaming_service.format_text_chunk(f"# {section.section_title}\n\n{content}")
+ }
+ )
# Join all sections with newlines
final_written_report = "\n".join(final_report)
print(f"Generated final report with {len(final_report)} parts")
-
- streaming_service.only_update_terminal("๐ Final research report generated successfully!")
- writer({"yeild_value": streaming_service._format_annotations()})
-
- # Skip the final update since we've been streaming incremental updates
- # The final answer from each section is already shown in the UI
-
+
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ "๐ Final research report generated successfully!"
+ )
+ }
+ )
+
# Use the shared documents for further question generation
# Since all sections used the same document pool, we can use it directly
return {
@@ -969,16 +1131,26 @@ async def process_section_with_documents(
# Send status update via streaming if available
if state and state.streaming_service and writer:
- state.streaming_service.only_update_terminal(f"๐ Writing section: \"{section_title}\" with {len(section_questions)} research questions")
- writer({"yeild_value": state.streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": state.streaming_service.format_terminal_info_delta(
+ f'๐ Writing section: "{section_title}" with {len(section_questions)} research questions'
+ )
+ }
+ )
+
# Fallback if no documents found
if not documents_to_use:
print(f"No relevant documents found for section: {section_title}")
if state and state.streaming_service and writer:
- state.streaming_service.only_update_terminal(f"⚠️ Warning: No relevant documents found for section: \"{section_title}\"", "warning")
- writer({"yeild_value": state.streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": state.streaming_service.format_error(
+ f'Warning: No relevant documents found for section: "{section_title}"'
+ )
+ }
+ )
+
documents_to_use = [
{"content": f"No specific information was found for: {question}"}
for question in section_questions
@@ -993,7 +1165,7 @@ async def process_section_with_documents(
"user_query": user_query,
"relevant_documents": documents_to_use,
"user_id": user_id,
- "search_space_id": search_space_id
+ "search_space_id": search_space_id,
}
}
@@ -1006,9 +1178,14 @@ async def process_section_with_documents(
# Invoke the sub-section writer graph with streaming
print(f"Invoking sub_section_writer for: {section_title}")
if state and state.streaming_service and writer:
- state.streaming_service.only_update_terminal(f"🧠 Analyzing information and drafting content for section: \"{section_title}\"")
- writer({"yeild_value": state.streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": state.streaming_service.format_terminal_info_delta(
+ f'🧠 Analyzing information and drafting content for section: "{section_title}"'
+ )
+ }
+ )
+
# Variables to track streaming state
complete_content = "" # Tracks the complete content received so far
@@ -1025,8 +1202,14 @@ async def process_section_with_documents(
# Only stream if there's actual new content
if delta and state and state.streaming_service and writer:
# Update terminal with real-time progress indicator
- state.streaming_service.only_update_terminal(f"โ๏ธ Writing section {section_id+1}... ({len(complete_content.split())} words)")
-
+ writer(
+ {
+ "yield_value": state.streaming_service.format_terminal_info_delta(
+ f"โ๏ธ Writing section {section_id + 1}... ({len(complete_content.split())} words)"
+ )
+ }
+ )
+
# Update section_contents with just the new delta
section_contents[section_id]["content"] += delta
@@ -1043,10 +1226,7 @@ async def process_section_with_documents(
complete_answer.extend(content_lines)
complete_answer.append("") # Empty line after content
- # Update answer in UI in real-time
- state.streaming_service.only_update_answer(complete_answer)
- writer({"yeild_value": state.streaming_service._format_annotations()})
-
+
# Set default if no content was received
if not complete_content:
complete_content = "No content was generated for this section."
@@ -1054,18 +1234,28 @@ async def process_section_with_documents(
# Final terminal update
if state and state.streaming_service and writer:
- state.streaming_service.only_update_terminal(f"✅ Completed section: \"{section_title}\"")
- writer({"yeild_value": state.streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": state.streaming_service.format_terminal_info_delta(
+ f'✅ Completed section: "{section_title}"'
+ )
+ }
+ )
+
return complete_content
except Exception as e:
print(f"Error processing section '{section_title}': {str(e)}")
# Send error update via streaming if available
if state and state.streaming_service and writer:
- state.streaming_service.only_update_terminal(f"โ Error processing section \"{section_title}\": {str(e)}", "error")
- writer({"yeild_value": state.streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": state.streaming_service.format_error(
+ f'Error processing section "{section_title}": {str(e)}'
+ )
+ }
+ )
+
return f"Error processing section: {section_title}. Details: {str(e)}"
@@ -1102,17 +1292,32 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
reformulated_query = state.reformulated_query
user_query = configuration.user_query
-
- streaming_service.only_update_terminal("🤔 Starting Q&A research workflow...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
- streaming_service.only_update_terminal(f"๐ Researching: \"{user_query[:100]}...\"")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+
+ writer(
+ {
+ "🤔 Starting Q&A research workflow..."
+ "๐ค Starting Q&A research workflow..."
+ )
+ }
+ )
+
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f'๐ Researching: "{user_query[:100]}..."'
+ )
+ }
+ )
+
# Fetch relevant documents for the QNA query
- streaming_service.only_update_terminal("๐ Searching for relevant information across all connectors...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ "๐ Searching for relevant information across all connectors..."
+ )
+ }
+ )
+
# Use a reasonable top_k for QNA - not too many documents to avoid overwhelming the LLM
TOP_K = 15
@@ -1123,9 +1328,14 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
try:
# First, fetch user-selected documents if any
if configuration.document_ids_to_add_in_context:
- streaming_service.only_update_terminal(f"๐ Including {len(configuration.document_ids_to_add_in_context)} user-selected documents...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"๐ Including {len(configuration.document_ids_to_add_in_context)} user-selected documents..."
+ )
+ }
+ )
+
user_selected_sources, user_selected_documents = await fetch_documents_by_ids(
document_ids=configuration.document_ids_to_add_in_context,
user_id=configuration.user_id,
@@ -1133,9 +1343,14 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
)
if user_selected_documents:
- streaming_service.only_update_terminal(f"✅ Successfully added {len(user_selected_documents)} user-selected documents to context")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"✅ Successfully added {len(user_selected_documents)} user-selected documents to context"
+ )
+ }
+ )
+
# Create connector service using state db_session
connector_service = ConnectorService(state.db_session, user_id=configuration.user_id)
await connector_service.initialize_counter()
@@ -1159,8 +1374,7 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
except Exception as e:
error_message = f"Error fetching relevant documents for QNA: {str(e)}"
print(error_message)
- streaming_service.only_update_terminal(f"โ {error_message}", "error")
- writer({"yeild_value": streaming_service._format_annotations()})
+ writer({"yield_value": streaming_service.format_error(error_message)})
# Continue with empty documents - the QNA agent will handle this gracefully
relevant_documents = []
@@ -1170,10 +1384,15 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
print(f"Fetched {len(relevant_documents)} relevant documents for QNA")
print(f"Added {len(user_selected_documents)} user-selected documents for QNA")
print(f"Total documents for QNA: {len(all_documents)}")
-
- streaming_service.only_update_terminal(f"🧠 Generating comprehensive answer using {len(all_documents)} total sources ({len(user_selected_documents)} user-selected + {len(relevant_documents)} connector-found)...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"🧠 Generating comprehensive answer using {len(all_documents)} total sources ({len(user_selected_documents)} user-selected + {len(relevant_documents)} connector-found)..."
+ )
+ }
+ )
+
# Prepare configuration for the QNA agent
qna_config = {
"configurable": {
@@ -1192,9 +1411,14 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
}
try:
- streaming_service.only_update_terminal("โ๏ธ Writing comprehensive answer with citations...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ "โ๏ธ Writing comprehensive answer with citations..."
+ )
+ }
+ )
+
# Track streaming content for real-time updates
complete_content = ""
captured_reranked_documents = []
@@ -1212,13 +1436,18 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
if delta:
# Update terminal with progress
word_count = len(complete_content.split())
- streaming_service.only_update_terminal(f"โ๏ธ Writing answer... ({word_count} words)")
-
- # Update the answer in real-time
- answer_lines = complete_content.split("\n")
- streaming_service.only_update_answer(answer_lines)
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"โ๏ธ Writing answer... ({word_count} words)"
+ )
+ }
+ )
+
+ writer(
+ {"yield_value": streaming_service.format_text_chunk(delta)}
+ )
+
# Capture reranked documents from QNA agent for further question generation
if "reranked_documents" in chunk:
captured_reranked_documents = chunk["reranked_documents"]
@@ -1226,10 +1455,15 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
# Set default if no content was received
if not complete_content:
complete_content = "I couldn't find relevant information in your knowledge base to answer this question."
-
- streaming_service.only_update_terminal("๐ Q&A answer generated successfully!")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ "๐ Q&A answer generated successfully!"
+ )
+ }
+ )
+
# Return the final answer and captured reranked documents for further question generation
return {
"final_written_report": complete_content,
@@ -1239,12 +1473,9 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
except Exception as e:
error_message = f"Error generating QNA answer: {str(e)}"
print(error_message)
- streaming_service.only_update_terminal(f"โ {error_message}", "error")
- writer({"yeild_value": streaming_service._format_annotations()})
-
- return {
- "final_written_report": f"Error generating answer: {str(e)}"
- }
+ writer({"yield_value": streaming_service.format_error(error_message)})
+
+ return {"final_written_report": f"Error generating answer: {str(e)}"}
async def generate_further_questions(state: State, config: RunnableConfig, writer: StreamWriter) -> Dict[str, Any]:
@@ -1268,20 +1499,24 @@ async def generate_further_questions(state: State, config: RunnableConfig, write
# Get reranked documents from the state (will be populated by sub-agents)
reranked_documents = getattr(state, 'reranked_documents', None) or []
-
- streaming_service.only_update_terminal("🤔 Generating follow-up questions...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+
+ writer(
+ {
+ "🤔 Generating follow-up questions..."
+ "๐ค Generating follow-up questions..."
+ )
+ }
+ )
+
# Get user's fast LLM
llm = await get_user_fast_llm(state.db_session, user_id)
if not llm:
error_message = f"No fast LLM configured for user {user_id}"
print(error_message)
- streaming_service.only_update_terminal(f"โ {error_message}", "error")
-
+ writer({"yield_value": streaming_service.format_error(error_message)})
+
# Stream empty further questions to UI
- streaming_service.only_update_further_questions([])
- writer({"yeild_value": streaming_service._format_annotations()})
+ writer({"yield_value": streaming_service.format_further_questions_delta([])})
return {"further_questions": []}
# Format chat history for the prompt
@@ -1338,10 +1573,15 @@ async def generate_further_questions(state: State, config: RunnableConfig, write
Do not include any other text or explanation. Only return the JSON.
"""
-
- streaming_service.only_update_terminal("🧠 Analyzing conversation context to suggest relevant questions...")
- writer({"yeild_value": streaming_service._format_annotations()})
-
+
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ "🧠 Analyzing conversation context to suggest relevant questions..."
+ )
+ }
+ )
+
# Create messages for the LLM
messages = [
SystemMessage(content=get_further_questions_system_prompt()),
@@ -1366,47 +1606,67 @@ async def generate_further_questions(state: State, config: RunnableConfig, write
# Extract the further_questions array
further_questions = parsed_data.get("further_questions", [])
-
- streaming_service.only_update_terminal(f"✅ Generated {len(further_questions)} contextual follow-up questions!")
-
+
+ writer(
+ {
+ "yield_value": streaming_service.format_terminal_info_delta(
+ f"✅ Generated {len(further_questions)} contextual follow-up questions!"
+ )
+ }
+ )
+
# Stream the further questions to the UI
- streaming_service.only_update_further_questions(further_questions)
- writer({"yeild_value": streaming_service._format_annotations()})
-
+ writer(
+ {
+ "yield_value": streaming_service.format_further_questions_delta(
+ further_questions
+ )
+ }
+ )
+
print(f"Successfully generated {len(further_questions)} further questions")
return {"further_questions": further_questions}
else:
# If JSON structure not found, return empty list
- error_message = "Could not find valid JSON in LLM response for further questions"
+ error_message = (
+ "Could not find valid JSON in LLM response for further questions"
+ )
print(error_message)
- streaming_service.only_update_terminal(f"⚠️ {error_message}", "warning")
-
+ writer(
+ {
+ "yield_value": streaming_service.format_error(
+ f"Warning: {error_message}"
+ )
+ }
+ )
+
# Stream empty further questions to UI
- streaming_service.only_update_further_questions([])
- writer({"yeild_value": streaming_service._format_annotations()})
+ writer(
+ {"yield_value": streaming_service.format_further_questions_delta([])}
+ )
return {"further_questions": []}
except (json.JSONDecodeError, ValueError) as e:
# Log the error and return empty list
error_message = f"Error parsing further questions response: {str(e)}"
print(error_message)
- streaming_service.only_update_terminal(f"⚠️ {error_message}", "warning")
-
+ writer(
+ {"yield_value": streaming_service.format_error(f"Warning: {error_message}")}
+ )
+
# Stream empty further questions to UI
- streaming_service.only_update_further_questions([])
- writer({"yeild_value": streaming_service._format_annotations()})
+ writer({"yield_value": streaming_service.format_further_questions_delta([])})
return {"further_questions": []}
except Exception as e:
# Handle any other errors
error_message = f"Error generating further questions: {str(e)}"
print(error_message)
- streaming_service.only_update_terminal(f"⚠️ {error_message}", "warning")
-
+ writer(
+ {"yield_value": streaming_service.format_error(f"Warning: {error_message}")}
+ )
+
# Stream empty further questions to UI
- streaming_service.only_update_further_questions([])
- writer({"yeild_value": streaming_service._format_annotations()})
+ writer({"yield_value": streaming_service.format_further_questions_delta([])})
return {"further_questions": []}
-
-
diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py
index 0c6311a..7caf365 100644
--- a/surfsense_backend/app/db.py
+++ b/surfsense_backend/app/db.py
@@ -91,6 +91,18 @@ class LiteLLMProvider(str, Enum):
ALEPH_ALPHA = "ALEPH_ALPHA"
PETALS = "PETALS"
CUSTOM = "CUSTOM"
+
+class LogLevel(str, Enum):
+ DEBUG = "DEBUG"
+ INFO = "INFO"
+ WARNING = "WARNING"
+ ERROR = "ERROR"
+ CRITICAL = "CRITICAL"
+
+class LogStatus(str, Enum):
+ IN_PROGRESS = "IN_PROGRESS"
+ SUCCESS = "SUCCESS"
+ FAILED = "FAILED"
class Base(DeclarativeBase):
pass
@@ -163,6 +175,7 @@ class SearchSpace(BaseModel, TimestampMixin):
documents = relationship("Document", back_populates="search_space", order_by="Document.id", cascade="all, delete-orphan")
podcasts = relationship("Podcast", back_populates="search_space", order_by="Podcast.id", cascade="all, delete-orphan")
chats = relationship('Chat', back_populates='search_space', order_by='Chat.id', cascade="all, delete-orphan")
+ logs = relationship("Log", back_populates="search_space", order_by="Log.id", cascade="all, delete-orphan")
class SearchSourceConnector(BaseModel, TimestampMixin):
__tablename__ = "search_source_connectors"
@@ -196,6 +209,18 @@ class LLMConfig(BaseModel, TimestampMixin):
user_id = Column(UUID(as_uuid=True), ForeignKey("user.id", ondelete='CASCADE'), nullable=False)
user = relationship("User", back_populates="llm_configs", foreign_keys=[user_id])
+class Log(BaseModel, TimestampMixin):
+ __tablename__ = "logs"
+
+ level = Column(SQLAlchemyEnum(LogLevel), nullable=False, index=True)
+ status = Column(SQLAlchemyEnum(LogStatus), nullable=False, index=True)
+ message = Column(Text, nullable=False)
+ source = Column(String(200), nullable=True, index=True) # Service/component that generated the log
+ log_metadata = Column(JSON, nullable=True, default={}) # Additional context data
+
+ search_space_id = Column(Integer, ForeignKey("searchspaces.id", ondelete='CASCADE'), nullable=False)
+ search_space = relationship("SearchSpace", back_populates="logs")
+
if config.AUTH_TYPE == "GOOGLE":
class OAuthAccount(SQLAlchemyBaseOAuthAccountTableUUID, Base):
pass
diff --git a/surfsense_backend/app/routes/__init__.py b/surfsense_backend/app/routes/__init__.py
index 3420f35..dd6be9b 100644
--- a/surfsense_backend/app/routes/__init__.py
+++ b/surfsense_backend/app/routes/__init__.py
@@ -5,6 +5,7 @@ from .podcasts_routes import router as podcasts_router
from .chats_routes import router as chats_router
from .search_source_connectors_routes import router as search_source_connectors_router
from .llm_config_routes import router as llm_config_router
+from .logs_routes import router as logs_router
router = APIRouter()
@@ -14,3 +15,4 @@ router.include_router(podcasts_router)
router.include_router(chats_router)
router.include_router(search_source_connectors_router)
router.include_router(llm_config_router)
+router.include_router(logs_router)
diff --git a/surfsense_backend/app/routes/chats_routes.py b/surfsense_backend/app/routes/chats_routes.py
index 9db77f4..dc7c126 100644
--- a/surfsense_backend/app/routes/chats_routes.py
+++ b/surfsense_backend/app/routes/chats_routes.py
@@ -54,32 +54,23 @@ async def handle_chat_data(
if message['role'] == "user":
langchain_chat_history.append(HumanMessage(content=message['content']))
elif message['role'] == "assistant":
- # Find the last "ANSWER" annotation specifically
- answer_annotation = None
- for annotation in reversed(message['annotations']):
- if annotation['type'] == "ANSWER":
- answer_annotation = annotation
- break
-
- if answer_annotation:
- answer_text = answer_annotation['content']
- # If content is a list, join it into a single string
- if isinstance(answer_text, list):
- answer_text = "\n".join(answer_text)
- langchain_chat_history.append(AIMessage(content=answer_text))
+ langchain_chat_history.append(AIMessage(content=message['content']))
- response = StreamingResponse(stream_connector_search_results(
- user_query,
- user.id,
- search_space_id, # Already converted to int in lines 32-37
- session,
- research_mode,
- selected_connectors,
- langchain_chat_history,
- search_mode_str,
- document_ids_to_add_in_context
- ))
- response.headers['x-vercel-ai-data-stream'] = 'v1'
+ response = StreamingResponse(
+ stream_connector_search_results(
+ user_query,
+ user.id,
+ search_space_id,
+ session,
+ research_mode,
+ selected_connectors,
+ langchain_chat_history,
+ search_mode_str,
+ document_ids_to_add_in_context,
+ )
+ )
+
+ response.headers["x-vercel-ai-data-stream"] = "v1"
return response
diff --git a/surfsense_backend/app/routes/documents_routes.py b/surfsense_backend/app/routes/documents_routes.py
index e9725be..5c2a0a0 100644
--- a/surfsense_backend/app/routes/documents_routes.py
+++ b/surfsense_backend/app/routes/documents_routes.py
@@ -135,11 +135,19 @@ async def process_file_in_background(
filename: str,
search_space_id: int,
user_id: str,
- session: AsyncSession
+ session: AsyncSession,
+ task_logger: 'TaskLoggingService',
+ log_entry: 'Log'
):
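+    # task_logger and log_entry come from the caller (e.g.
+    # process_file_in_background_with_new_session below), so progress, success and
+    # failure updates for this file are recorded against a single shared Log row.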
try:
# Check if the file is a markdown or text file
if filename.lower().endswith(('.md', '.markdown', '.txt')):
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing markdown/text file: {filename}",
+ {"file_type": "markdown", "processing_stage": "reading_file"}
+ )
+
# For markdown files, read the content directly
with open(file_path, 'r', encoding='utf-8') as f:
markdown_content = f.read()
@@ -151,16 +159,42 @@ async def process_file_in_background(
except:
pass
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Creating document from markdown content: {filename}",
+ {"processing_stage": "creating_document", "content_length": len(markdown_content)}
+ )
+
# Process markdown directly through specialized function
- await add_received_markdown_file_document(
+ result = await add_received_markdown_file_document(
session,
filename,
markdown_content,
search_space_id,
user_id
)
+
+ if result:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed markdown file: {filename}",
+ {"document_id": result.id, "content_hash": result.content_hash, "file_type": "markdown"}
+ )
+ else:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Markdown file already exists (duplicate): {filename}",
+ {"duplicate_detected": True, "file_type": "markdown"}
+ )
+
# Check if the file is an audio file
elif filename.lower().endswith(('.mp3', '.mp4', '.mpeg', '.mpga', '.m4a', '.wav', '.webm')):
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing audio file for transcription: {filename}",
+ {"file_type": "audio", "processing_stage": "starting_transcription"}
+ )
+
# Open the audio file for transcription
with open(file_path, "rb") as audio_file:
# Use LiteLLM for audio transcription
@@ -184,6 +218,12 @@ async def process_file_in_background(
# Add metadata about the transcription
transcribed_text = f"# Transcription of {filename}\n\n{transcribed_text}"
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Transcription completed, creating document: {filename}",
+ {"processing_stage": "transcription_complete", "transcript_length": len(transcribed_text)}
+ )
+
# Clean up the temp file
try:
os.unlink(file_path)
@@ -191,15 +231,35 @@ async def process_file_in_background(
pass
# Process transcription as markdown document
- await add_received_markdown_file_document(
+ result = await add_received_markdown_file_document(
session,
filename,
transcribed_text,
search_space_id,
user_id
)
+
+ if result:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully transcribed and processed audio file: {filename}",
+ {"document_id": result.id, "content_hash": result.content_hash, "file_type": "audio", "transcript_length": len(transcribed_text)}
+ )
+ else:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Audio file transcript already exists (duplicate): {filename}",
+ {"duplicate_detected": True, "file_type": "audio"}
+ )
+
else:
if app_config.ETL_SERVICE == "UNSTRUCTURED":
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing file with Unstructured ETL: {filename}",
+ {"file_type": "document", "etl_service": "UNSTRUCTURED", "processing_stage": "loading"}
+ )
+
from langchain_unstructured import UnstructuredLoader
# Process the file
@@ -215,6 +275,12 @@ async def process_file_in_background(
docs = await loader.aload()
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Unstructured ETL completed, creating document: {filename}",
+ {"processing_stage": "etl_complete", "elements_count": len(docs)}
+ )
+
# Clean up the temp file
import os
try:
@@ -223,14 +289,34 @@ async def process_file_in_background(
pass
# Pass the documents to the existing background task
- await add_received_file_document_using_unstructured(
+ result = await add_received_file_document_using_unstructured(
session,
filename,
docs,
search_space_id,
user_id
)
+
+ if result:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed file with Unstructured: {filename}",
+ {"document_id": result.id, "content_hash": result.content_hash, "file_type": "document", "etl_service": "UNSTRUCTURED"}
+ )
+ else:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Document already exists (duplicate): {filename}",
+ {"duplicate_detected": True, "file_type": "document", "etl_service": "UNSTRUCTURED"}
+ )
+
elif app_config.ETL_SERVICE == "LLAMACLOUD":
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing file with LlamaCloud ETL: {filename}",
+ {"file_type": "document", "etl_service": "LLAMACLOUD", "processing_stage": "parsing"}
+ )
+
from llama_cloud_services import LlamaParse
from llama_cloud_services.parse.utils import ResultType
@@ -257,19 +343,45 @@ async def process_file_in_background(
# Get markdown documents from the result
markdown_documents = await result.aget_markdown_documents(split_by_page=False)
+ await task_logger.log_task_progress(
+ log_entry,
+ f"LlamaCloud parsing completed, creating documents: {filename}",
+ {"processing_stage": "parsing_complete", "documents_count": len(markdown_documents)}
+ )
+
for doc in markdown_documents:
# Extract text content from the markdown documents
markdown_content = doc.text
# Process the documents using our LlamaCloud background task
- await add_received_file_document_using_llamacloud(
+ doc_result = await add_received_file_document_using_llamacloud(
session,
filename,
llamacloud_markdown_document=markdown_content,
search_space_id=search_space_id,
user_id=user_id
)
+
+ if doc_result:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed file with LlamaCloud: {filename}",
+ {"document_id": doc_result.id, "content_hash": doc_result.content_hash, "file_type": "document", "etl_service": "LLAMACLOUD"}
+ )
+ else:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Document already exists (duplicate): {filename}",
+ {"duplicate_detected": True, "file_type": "document", "etl_service": "LLAMACLOUD"}
+ )
+
elif app_config.ETL_SERVICE == "DOCLING":
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing file with Docling ETL: {filename}",
+ {"file_type": "document", "etl_service": "DOCLING", "processing_stage": "parsing"}
+ )
+
# Use Docling service for document processing
from app.services.document_processing.docling_service import create_docling_service
@@ -286,17 +398,43 @@ async def process_file_in_background(
except:
pass
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Docling parsing completed, creating document: {filename}",
+ {"processing_stage": "parsing_complete", "content_length": len(result['content'])}
+ )
+
# Process the document using our Docling background task
- await add_received_file_document_using_docling(
+ doc_result = await add_received_file_document_using_docling(
session,
filename,
docling_markdown_document=result['content'],
search_space_id=search_space_id,
user_id=user_id
)
+
+ if doc_result:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed file with Docling: {filename}",
+ {"document_id": doc_result.id, "content_hash": doc_result.content_hash, "file_type": "document", "etl_service": "DOCLING"}
+ )
+ else:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Document already exists (duplicate): {filename}",
+ {"duplicate_detected": True, "file_type": "document", "etl_service": "DOCLING"}
+ )
except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to process file: {filename}",
+ str(e),
+ {"error_type": type(e).__name__, "filename": filename}
+ )
import logging
logging.error(f"Error processing file in background: {str(e)}")
+ raise # Re-raise so the wrapper can also handle it
@router.get("/documents/", response_model=List[DocumentRead])
@@ -467,11 +605,47 @@ async def process_extension_document_with_new_session(
):
"""Create a new session and process extension document."""
from app.db import async_session_maker
+ from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
+ # Initialize task logging service
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="process_extension_document",
+ source="document_processor",
+ message=f"Starting processing of extension document from {individual_document.metadata.VisitedWebPageTitle}",
+ metadata={
+ "document_type": "EXTENSION",
+ "url": individual_document.metadata.VisitedWebPageURL,
+ "title": individual_document.metadata.VisitedWebPageTitle,
+ "user_id": user_id
+ }
+ )
+
try:
- await add_extension_received_document(session, individual_document, search_space_id, user_id)
+ result = await add_extension_received_document(session, individual_document, search_space_id, user_id)
+
+ if result:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed extension document: {individual_document.metadata.VisitedWebPageTitle}",
+ {"document_id": result.id, "content_hash": result.content_hash}
+ )
+ else:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Extension document already exists (duplicate): {individual_document.metadata.VisitedWebPageTitle}",
+ {"duplicate_detected": True}
+ )
except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to process extension document: {individual_document.metadata.VisitedWebPageTitle}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
import logging
logging.error(f"Error processing extension document: {str(e)}")
@@ -483,11 +657,46 @@ async def process_crawled_url_with_new_session(
):
"""Create a new session and process crawled URL."""
from app.db import async_session_maker
+ from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
+ # Initialize task logging service
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="process_crawled_url",
+ source="document_processor",
+ message=f"Starting URL crawling and processing for: {url}",
+ metadata={
+ "document_type": "CRAWLED_URL",
+ "url": url,
+ "user_id": user_id
+ }
+ )
+
try:
- await add_crawled_url_document(session, url, search_space_id, user_id)
+ result = await add_crawled_url_document(session, url, search_space_id, user_id)
+
+ if result:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully crawled and processed URL: {url}",
+ {"document_id": result.id, "title": result.title, "content_hash": result.content_hash}
+ )
+ else:
+ await task_logger.log_task_success(
+ log_entry,
+ f"URL document already exists (duplicate): {url}",
+ {"duplicate_detected": True}
+ )
except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to crawl URL: {url}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
import logging
logging.error(f"Error processing crawled URL: {str(e)}")
@@ -500,9 +709,38 @@ async def process_file_in_background_with_new_session(
):
"""Create a new session and process file."""
from app.db import async_session_maker
+ from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
- await process_file_in_background(file_path, filename, search_space_id, user_id, session)
+ # Initialize task logging service
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="process_file_upload",
+ source="document_processor",
+ message=f"Starting file processing for: {filename}",
+ metadata={
+ "document_type": "FILE",
+ "filename": filename,
+ "file_path": file_path,
+ "user_id": user_id
+ }
+ )
+
+ try:
+ await process_file_in_background(file_path, filename, search_space_id, user_id, session, task_logger, log_entry)
+
+ # Note: success/failure logging is handled within process_file_in_background
+ except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to process file: {filename}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
+ import logging
+ logging.error(f"Error processing file: {str(e)}")
async def process_youtube_video_with_new_session(
@@ -512,11 +750,46 @@ async def process_youtube_video_with_new_session(
):
"""Create a new session and process YouTube video."""
from app.db import async_session_maker
+ from app.services.task_logging_service import TaskLoggingService
async with async_session_maker() as session:
+ # Initialize task logging service
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="process_youtube_video",
+ source="document_processor",
+ message=f"Starting YouTube video processing for: {url}",
+ metadata={
+ "document_type": "YOUTUBE_VIDEO",
+ "url": url,
+ "user_id": user_id
+ }
+ )
+
try:
- await add_youtube_video_document(session, url, search_space_id, user_id)
+ result = await add_youtube_video_document(session, url, search_space_id, user_id)
+
+ if result:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed YouTube video: {result.title}",
+ {"document_id": result.id, "video_id": result.document_metadata.get("video_id"), "content_hash": result.content_hash}
+ )
+ else:
+ await task_logger.log_task_success(
+ log_entry,
+ f"YouTube video document already exists (duplicate): {url}",
+ {"duplicate_detected": True}
+ )
except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to process YouTube video: {url}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
import logging
logging.error(f"Error processing YouTube video: {str(e)}")
diff --git a/surfsense_backend/app/routes/logs_routes.py b/surfsense_backend/app/routes/logs_routes.py
new file mode 100644
index 0000000..65e33ec
--- /dev/null
+++ b/surfsense_backend/app/routes/logs_routes.py
@@ -0,0 +1,280 @@
+from fastapi import APIRouter, Depends, HTTPException
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.future import select
+from sqlalchemy import and_, desc
+from typing import List, Optional
+from datetime import datetime, timedelta, timezone
+
+from app.db import get_async_session, User, SearchSpace, Log, LogLevel, LogStatus
+from app.schemas import LogCreate, LogUpdate, LogRead
+from app.users import current_active_user
+from app.utils.check_ownership import check_ownership
+
+router = APIRouter()
+
+@router.post("/logs/", response_model=LogRead)
+async def create_log(
+ log: LogCreate,
+ session: AsyncSession = Depends(get_async_session),
+ user: User = Depends(current_active_user)
+):
+ """Create a new log entry."""
+ try:
+ # Check if the user owns the search space
+ await check_ownership(session, SearchSpace, log.search_space_id, user)
+
+ db_log = Log(**log.model_dump())
+ session.add(db_log)
+ await session.commit()
+ await session.refresh(db_log)
+ return db_log
+ except HTTPException:
+ raise
+ except Exception as e:
+ await session.rollback()
+ raise HTTPException(
+ status_code=500,
+ detail=f"Failed to create log: {str(e)}"
+ )
+
+@router.get("/logs/", response_model=List[LogRead])
+async def read_logs(
+ skip: int = 0,
+ limit: int = 100,
+ search_space_id: Optional[int] = None,
+ level: Optional[LogLevel] = None,
+ status: Optional[LogStatus] = None,
+ source: Optional[str] = None,
+ start_date: Optional[datetime] = None,
+ end_date: Optional[datetime] = None,
+ session: AsyncSession = Depends(get_async_session),
+ user: User = Depends(current_active_user)
+):
+ """Get logs with optional filtering."""
+ try:
+ # Build base query - only logs from user's search spaces
+ query = (
+ select(Log)
+ .join(SearchSpace)
+ .filter(SearchSpace.user_id == user.id)
+ .order_by(desc(Log.created_at)) # Most recent first
+ )
+
+ # Apply filters
+ filters = []
+
+ if search_space_id is not None:
+ await check_ownership(session, SearchSpace, search_space_id, user)
+ filters.append(Log.search_space_id == search_space_id)
+
+ if level is not None:
+ filters.append(Log.level == level)
+
+ if status is not None:
+ filters.append(Log.status == status)
+
+ if source is not None:
+ filters.append(Log.source.ilike(f"%{source}%"))
+
+ if start_date is not None:
+ filters.append(Log.created_at >= start_date)
+
+ if end_date is not None:
+ filters.append(Log.created_at <= end_date)
+
+ if filters:
+ query = query.filter(and_(*filters))
+
+ # Apply pagination
+ result = await session.execute(query.offset(skip).limit(limit))
+ return result.scalars().all()
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ raise HTTPException(
+ status_code=500,
+ detail=f"Failed to fetch logs: {str(e)}"
+ )
+
+@router.get("/logs/{log_id}", response_model=LogRead)
+async def read_log(
+ log_id: int,
+ session: AsyncSession = Depends(get_async_session),
+ user: User = Depends(current_active_user)
+):
+ """Get a specific log by ID."""
+ try:
+ # Get log and verify user owns the search space
+ result = await session.execute(
+ select(Log)
+ .join(SearchSpace)
+ .filter(Log.id == log_id, SearchSpace.user_id == user.id)
+ )
+ log = result.scalars().first()
+
+ if not log:
+ raise HTTPException(status_code=404, detail="Log not found")
+
+ return log
+ except HTTPException:
+ raise
+ except Exception as e:
+ raise HTTPException(
+ status_code=500,
+ detail=f"Failed to fetch log: {str(e)}"
+ )
+
+@router.put("/logs/{log_id}", response_model=LogRead)
+async def update_log(
+ log_id: int,
+ log_update: LogUpdate,
+ session: AsyncSession = Depends(get_async_session),
+ user: User = Depends(current_active_user)
+):
+ """Update a log entry."""
+ try:
+ # Get log and verify user owns the search space
+ result = await session.execute(
+ select(Log)
+ .join(SearchSpace)
+ .filter(Log.id == log_id, SearchSpace.user_id == user.id)
+ )
+ db_log = result.scalars().first()
+
+ if not db_log:
+ raise HTTPException(status_code=404, detail="Log not found")
+
+ # Update only provided fields
+ update_data = log_update.model_dump(exclude_unset=True)
+ for field, value in update_data.items():
+ setattr(db_log, field, value)
+
+ await session.commit()
+ await session.refresh(db_log)
+ return db_log
+ except HTTPException:
+ raise
+ except Exception as e:
+ await session.rollback()
+ raise HTTPException(
+ status_code=500,
+ detail=f"Failed to update log: {str(e)}"
+ )
+
+@router.delete("/logs/{log_id}")
+async def delete_log(
+ log_id: int,
+ session: AsyncSession = Depends(get_async_session),
+ user: User = Depends(current_active_user)
+):
+ """Delete a log entry."""
+ try:
+ # Get log and verify user owns the search space
+ result = await session.execute(
+ select(Log)
+ .join(SearchSpace)
+ .filter(Log.id == log_id, SearchSpace.user_id == user.id)
+ )
+ db_log = result.scalars().first()
+
+ if not db_log:
+ raise HTTPException(status_code=404, detail="Log not found")
+
+ await session.delete(db_log)
+ await session.commit()
+ return {"message": "Log deleted successfully"}
+ except HTTPException:
+ raise
+ except Exception as e:
+ await session.rollback()
+ raise HTTPException(
+ status_code=500,
+ detail=f"Failed to delete log: {str(e)}"
+ )
+
+@router.get("/logs/search-space/{search_space_id}/summary")
+async def get_logs_summary(
+ search_space_id: int,
+ hours: int = 24,
+ session: AsyncSession = Depends(get_async_session),
+ user: User = Depends(current_active_user)
+):
+ """Get a summary of logs for a search space in the last X hours."""
+ try:
+ # Check ownership
+ await check_ownership(session, SearchSpace, search_space_id, user)
+
+ # Calculate time window
+ since = datetime.now(timezone.utc) - timedelta(hours=hours)  # aware UTC to match the timestamptz column
+
+ # Get logs from the time window
+ result = await session.execute(
+ select(Log)
+ .filter(
+ and_(
+ Log.search_space_id == search_space_id,
+ Log.created_at >= since
+ )
+ )
+ .order_by(desc(Log.created_at))
+ )
+ logs = result.scalars().all()
+
+ # Create summary
+ summary = {
+ "total_logs": len(logs),
+ "time_window_hours": hours,
+ "by_status": {},
+ "by_level": {},
+ "by_source": {},
+ "active_tasks": [],
+ "recent_failures": []
+ }
+
+ # Count by status and level
+ for log in logs:
+ # Status counts
+ status_str = log.status.value
+ summary["by_status"][status_str] = summary["by_status"].get(status_str, 0) + 1
+
+ # Level counts
+ level_str = log.level.value
+ summary["by_level"][level_str] = summary["by_level"].get(level_str, 0) + 1
+
+ # Source counts
+ if log.source:
+ summary["by_source"][log.source] = summary["by_source"].get(log.source, 0) + 1
+
+ # Active tasks (IN_PROGRESS)
+ if log.status == LogStatus.IN_PROGRESS:
+ task_name = log.log_metadata.get("task_name", "Unknown") if log.log_metadata else "Unknown"
+ summary["active_tasks"].append({
+ "id": log.id,
+ "task_name": task_name,
+ "message": log.message,
+ "started_at": log.created_at,
+ "source": log.source
+ })
+
+ # Recent failures
+ if log.status == LogStatus.FAILED and len(summary["recent_failures"]) < 10:
+ task_name = log.log_metadata.get("task_name", "Unknown") if log.log_metadata else "Unknown"
+ summary["recent_failures"].append({
+ "id": log.id,
+ "task_name": task_name,
+ "message": log.message,
+ "failed_at": log.created_at,
+ "source": log.source,
+ "error_details": log.log_metadata.get("error_details") if log.log_metadata else None
+ })
+
+ return summary
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ raise HTTPException(
+ status_code=500,
+ detail=f"Failed to generate logs summary: {str(e)}"
+ )
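+
+# Illustrative usage sketch (comments only, not executed): one way a client might call
+# the summary endpoint above. The base URL, bearer token, and search space id are
+# placeholders/assumptions; the path and query parameter come from this router.
+#
+#   import httpx
+#
+#   async def fetch_logs_summary(token: str) -> dict:
+#       async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
+#           response = await client.get(
+#               "/logs/search-space/1/summary",
+#               params={"hours": 24},
+#               headers={"Authorization": f"Bearer {token}"},
+#           )
+#           response.raise_for_status()
+#           return response.json()  # {"total_logs": ..., "by_status": {...}, ...}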
\ No newline at end of file
diff --git a/surfsense_backend/app/schemas/__init__.py b/surfsense_backend/app/schemas/__init__.py
index f62172a..89525c9 100644
--- a/surfsense_backend/app/schemas/__init__.py
+++ b/surfsense_backend/app/schemas/__init__.py
@@ -14,6 +14,7 @@ from .podcasts import PodcastBase, PodcastCreate, PodcastUpdate, PodcastRead, Po
from .chats import ChatBase, ChatCreate, ChatUpdate, ChatRead, AISDKChatRequest
from .search_source_connector import SearchSourceConnectorBase, SearchSourceConnectorCreate, SearchSourceConnectorUpdate, SearchSourceConnectorRead
from .llm_config import LLMConfigBase, LLMConfigCreate, LLMConfigUpdate, LLMConfigRead
+from .logs import LogBase, LogCreate, LogUpdate, LogRead, LogFilter
__all__ = [
"AISDKChatRequest",
@@ -53,4 +54,9 @@ __all__ = [
"LLMConfigCreate",
"LLMConfigUpdate",
"LLMConfigRead",
+ "LogBase",
+ "LogCreate",
+ "LogUpdate",
+ "LogRead",
+ "LogFilter",
]
\ No newline at end of file
diff --git a/surfsense_backend/app/schemas/logs.py b/surfsense_backend/app/schemas/logs.py
new file mode 100644
index 0000000..1d9d7e7
--- /dev/null
+++ b/surfsense_backend/app/schemas/logs.py
@@ -0,0 +1,44 @@
+from datetime import datetime
+from typing import Optional, Dict, Any
+from pydantic import BaseModel, ConfigDict
+from .base import IDModel, TimestampModel
+from app.db import LogLevel, LogStatus
+
+class LogBase(BaseModel):
+ level: LogLevel
+ status: LogStatus
+ message: str
+ source: Optional[str] = None
+ log_metadata: Optional[Dict[str, Any]] = None
+
+class LogCreate(BaseModel):
+ level: LogLevel
+ status: LogStatus
+ message: str
+ source: Optional[str] = None
+ log_metadata: Optional[Dict[str, Any]] = None
+ search_space_id: int
+
+class LogUpdate(BaseModel):
+ level: Optional[LogLevel] = None
+ status: Optional[LogStatus] = None
+ message: Optional[str] = None
+ source: Optional[str] = None
+ log_metadata: Optional[Dict[str, Any]] = None
+
+class LogRead(LogBase, IDModel, TimestampModel):
+ id: int
+ created_at: datetime
+ search_space_id: int
+
+ model_config = ConfigDict(from_attributes=True)
+
+class LogFilter(BaseModel):
+ level: Optional[LogLevel] = None
+ status: Optional[LogStatus] = None
+ source: Optional[str] = None
+ search_space_id: Optional[int] = None
+ start_date: Optional[datetime] = None
+ end_date: Optional[datetime] = None
+
+ model_config = ConfigDict(from_attributes=True)
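+
+# Illustrative sketch (comments only): constructing a POST /logs/ payload with the
+# schemas above. The search_space_id value is a placeholder.
+#
+#   new_log = LogCreate(
+#       level=LogLevel.INFO,
+#       status=LogStatus.SUCCESS,
+#       message="Manual smoke-test entry",
+#       source="manual_test",
+#       log_metadata={"task_name": "smoke_test"},
+#       search_space_id=1,
+#   )
+#   payload = new_log.model_dump()  # ready to send as the JSON body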
\ No newline at end of file
diff --git a/surfsense_backend/app/services/streaming_service.py b/surfsense_backend/app/services/streaming_service.py
index 514e76b..ce1188a 100644
--- a/surfsense_backend/app/services/streaming_service.py
+++ b/surfsense_backend/app/services/streaming_service.py
@@ -23,17 +23,138 @@ class StreamingService:
"content": []
}
]
- # It is used to send annotations to the frontend
+
+ # DEPRECATED: This sends the full annotation array every time (inefficient)
def _format_annotations(self) -> str:
"""
Format the annotations as a string
-
+
+ DEPRECATED: This method sends the full annotation state every time.
+ Use the delta formatters instead for optimal streaming.
+
Returns:
str: The formatted annotations string
"""
return f'8:{json.dumps(self.message_annotations)}\n'
-
- # It is used to end Streaming
+
+ def format_terminal_info_delta(self, text: str, message_type: str = "info") -> str:
+ """
+ Format a single terminal info message as a delta annotation
+
+ Args:
+ text: The terminal message text
+ message_type: The message type (info, error, success, etc.)
+
+ Returns:
+ str: The formatted annotation delta string
+ """
+ message = {"id": self.terminal_idx, "text": text, "type": message_type}
+ self.terminal_idx += 1
+
+ # Update internal state for reference
+ self.message_annotations[0]["content"].append(message)
+
+ # Return only the delta annotation
+ annotation = {"type": "TERMINAL_INFO", "content": [message]}
+ return f"8:[{json.dumps(annotation)}]\n"
+
+ def format_sources_delta(self, sources: List[Dict[str, Any]]) -> str:
+ """
+ Format sources as a delta annotation
+
+ Args:
+ sources: List of source objects
+
+ Returns:
+ str: The formatted annotation delta string
+ """
+ # Update internal state
+ self.message_annotations[1]["content"] = sources
+
+ # Return only the delta annotation
+ annotation = {"type": "SOURCES", "content": sources}
+ return f"8:[{json.dumps(annotation)}]\n"
+
+ def format_answer_delta(self, answer_chunk: str) -> str:
+ """
+ Format a single answer chunk as a delta annotation
+
+ Args:
+ answer_chunk: The new answer chunk to add
+
+ Returns:
+ str: The formatted annotation delta string
+ """
+ # Update internal state by appending the chunk
+ if isinstance(self.message_annotations[2]["content"], list):
+ self.message_annotations[2]["content"].append(answer_chunk)
+ else:
+ self.message_annotations[2]["content"] = [answer_chunk]
+
+ # Return only the delta annotation with the new chunk
+ annotation = {"type": "ANSWER", "content": [answer_chunk]}
+ return f"8:[{json.dumps(annotation)}]\n"
+
+ def format_answer_annotation(self, answer_lines: List[str]) -> str:
+ """
+ Format the complete answer as a replacement annotation
+
+ Args:
+ answer_lines: Complete list of answer lines
+
+ Returns:
+ str: The formatted annotation string
+ """
+ # Update internal state
+ self.message_annotations[2]["content"] = answer_lines
+
+ # Return the full answer annotation
+ annotation = {"type": "ANSWER", "content": answer_lines}
+ return f"8:[{json.dumps(annotation)}]\n"
+
+ def format_further_questions_delta(
+ self, further_questions: List[Dict[str, Any]]
+ ) -> str:
+ """
+ Format further questions as a delta annotation
+
+ Args:
+ further_questions: List of further question objects
+
+ Returns:
+ str: The formatted annotation delta string
+ """
+ # Update internal state
+ self.message_annotations[3]["content"] = further_questions
+
+ # Return only the delta annotation
+ annotation = {"type": "FURTHER_QUESTIONS", "content": further_questions}
+ return f"8:[{json.dumps(annotation)}]\n"
+
+ def format_text_chunk(self, text: str) -> str:
+ """
+ Format a text chunk using the text stream part
+
+ Args:
+ text: The text chunk to stream
+
+ Returns:
+ str: The formatted text part string
+ """
+ return f"0:{json.dumps(text)}\n"
+
+ def format_error(self, error_message: str) -> str:
+ """
+ Format an error using the error stream part
+
+ Args:
+ error_message: The error message
+
+ Returns:
+ str: The formatted error part string
+ """
+ return f"3:{json.dumps(error_message)}\n"
+
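+ # Illustrative sketch of the emitted stream parts (comments only). The exact strings
+ # follow from the formatters above, assuming a fresh instance whose terminal_idx is 0.
+ #
+ #   svc.format_text_chunk("Hello")  # -> '0:"Hello"\n'
+ #   svc.format_error("boom")        # -> '3:"boom"\n'
+ #   svc.format_terminal_info_delta("Crawling...")
+ #   # -> '8:[{"type": "TERMINAL_INFO", "content": [{"id": 0, "text": "Crawling...", "type": "info"}]}]\n'
+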
def format_completion(self, prompt_tokens: int = 156, completion_tokens: int = 204) -> str:
"""
Format a completion message
@@ -56,7 +177,12 @@ class StreamingService:
}
return f'd:{json.dumps(completion_data)}\n'
+
+ # DEPRECATED METHODS: Keep for backward compatibility but mark as deprecated
def only_update_terminal(self, text: str, message_type: str = "info") -> str:
+ """
+ DEPRECATED: Use format_terminal_info_delta() instead for optimal streaming
+ """
self.message_annotations[0]["content"].append({
"id": self.terminal_idx,
"text": text,
@@ -66,17 +192,23 @@ class StreamingService:
return self.message_annotations
def only_update_sources(self, sources: List[Dict[str, Any]]) -> str:
+ """
+ DEPRECATED: Use format_sources_delta() instead for optimal streaming
+ """
self.message_annotations[1]["content"] = sources
return self.message_annotations
def only_update_answer(self, answer: List[str]) -> str:
+ """
+ DEPRECATED: Use format_answer_delta() or format_answer_annotation() instead for optimal streaming
+ """
self.message_annotations[2]["content"] = answer
return self.message_annotations
-
+
def only_update_further_questions(self, further_questions: List[Dict[str, Any]]) -> str:
"""
- Update the further questions annotation
-
+ DEPRECATED: Use format_further_questions_delta() instead for optimal streaming
+
Args:
further_questions: List of further question objects with id and question fields
diff --git a/surfsense_backend/app/services/task_logging_service.py b/surfsense_backend/app/services/task_logging_service.py
new file mode 100644
index 0000000..c50e420
--- /dev/null
+++ b/surfsense_backend/app/services/task_logging_service.py
@@ -0,0 +1,204 @@
+from typing import Optional, Dict, Any
+from sqlalchemy.ext.asyncio import AsyncSession
+from app.db import Log, LogLevel, LogStatus
+import logging
+from datetime import datetime
+
+logger = logging.getLogger(__name__)
+
+class TaskLoggingService:
+ """Service for logging background tasks using the database Log model"""
+
+ def __init__(self, session: AsyncSession, search_space_id: int):
+ self.session = session
+ self.search_space_id = search_space_id
+
+ async def log_task_start(
+ self,
+ task_name: str,
+ source: str,
+ message: str,
+ metadata: Optional[Dict[str, Any]] = None
+ ) -> Log:
+ """
+ Log the start of a task with IN_PROGRESS status
+
+ Args:
+ task_name: Name/identifier of the task
+ source: Source service/component (e.g., 'document_processor', 'slack_indexer')
+ message: Human-readable message about the task
+ metadata: Additional context data
+
+ Returns:
+ Log: The created log entry
+ """
+ log_metadata = metadata or {}
+ log_metadata.update({
+ "task_name": task_name,
+ "started_at": datetime.utcnow().isoformat()
+ })
+
+ log_entry = Log(
+ level=LogLevel.INFO,
+ status=LogStatus.IN_PROGRESS,
+ message=message,
+ source=source,
+ log_metadata=log_metadata,
+ search_space_id=self.search_space_id
+ )
+
+ self.session.add(log_entry)
+ await self.session.commit()
+ await self.session.refresh(log_entry)
+
+ logger.info(f"Started task {task_name}: {message}")
+ return log_entry
+
+ async def log_task_success(
+ self,
+ log_entry: Log,
+ message: str,
+ additional_metadata: Optional[Dict[str, Any]] = None
+ ) -> Log:
+ """
+ Update a log entry to SUCCESS status
+
+ Args:
+ log_entry: The original log entry to update
+ message: Success message
+ additional_metadata: Additional metadata to merge
+
+ Returns:
+ Log: The updated log entry
+ """
+ # Update the existing log entry
+ log_entry.status = LogStatus.SUCCESS
+ log_entry.message = message
+
+ # Merge additional metadata and reassign the dict so the JSONB change is
+ # detected even if the column is not wrapped in a mutable type.
+ updated_metadata = dict(log_entry.log_metadata or {})
+ if additional_metadata:
+ updated_metadata.update(additional_metadata)
+ updated_metadata["completed_at"] = datetime.utcnow().isoformat()
+ log_entry.log_metadata = updated_metadata
+
+ await self.session.commit()
+ await self.session.refresh(log_entry)
+
+ task_name = log_entry.log_metadata.get("task_name", "unknown") if log_entry.log_metadata else "unknown"
+ logger.info(f"Completed task {task_name}: {message}")
+ return log_entry
+
+ async def log_task_failure(
+ self,
+ log_entry: Log,
+ error_message: str,
+ error_details: Optional[str] = None,
+ additional_metadata: Optional[Dict[str, Any]] = None
+ ) -> Log:
+ """
+ Update a log entry to FAILED status
+
+ Args:
+ log_entry: The original log entry to update
+ error_message: Error message
+ error_details: Detailed error information
+ additional_metadata: Additional metadata to merge
+
+ Returns:
+ Log: The updated log entry
+ """
+ # Update the existing log entry
+ log_entry.status = LogStatus.FAILED
+ log_entry.level = LogLevel.ERROR
+ log_entry.message = error_message
+
+ # Merge error details and any additional metadata, reassigning the dict so the
+ # JSONB change is detected even if the column is not wrapped in a mutable type.
+ updated_metadata = dict(log_entry.log_metadata or {})
+ updated_metadata.update({
+ "failed_at": datetime.utcnow().isoformat(),
+ "error_details": error_details
+ })
+ if additional_metadata:
+ updated_metadata.update(additional_metadata)
+ log_entry.log_metadata = updated_metadata
+
+ await self.session.commit()
+ await self.session.refresh(log_entry)
+
+ task_name = log_entry.log_metadata.get("task_name", "unknown") if log_entry.log_metadata else "unknown"
+ logger.error(f"Failed task {task_name}: {error_message}")
+ if error_details:
+ logger.error(f"Error details: {error_details}")
+
+ return log_entry
+
+ async def log_task_progress(
+ self,
+ log_entry: Log,
+ progress_message: str,
+ progress_metadata: Optional[Dict[str, Any]] = None
+ ) -> Log:
+ """
+ Update a log entry with progress information while keeping IN_PROGRESS status
+
+ Args:
+ log_entry: The log entry to update
+ progress_message: Progress update message
+ progress_metadata: Additional progress metadata
+
+ Returns:
+ Log: The updated log entry
+ """
+ log_entry.message = progress_message
+
+ # Merge progress metadata and reassign the dict so the JSONB change is detected.
+ updated_metadata = dict(log_entry.log_metadata or {})
+ if progress_metadata:
+ updated_metadata.update(progress_metadata)
+ updated_metadata["last_progress_update"] = datetime.utcnow().isoformat()
+ log_entry.log_metadata = updated_metadata
+
+ await self.session.commit()
+ await self.session.refresh(log_entry)
+
+ task_name = log_entry.log_metadata.get("task_name", "unknown") if log_entry.log_metadata else "unknown"
+ logger.info(f"Progress update for task {task_name}: {progress_message}")
+ return log_entry
+
+ async def log_simple_event(
+ self,
+ level: LogLevel,
+ source: str,
+ message: str,
+ metadata: Optional[Dict[str, Any]] = None
+ ) -> Log:
+ """
+ Log a simple event (not a long-running task)
+
+ Args:
+ level: Log level
+ source: Source service/component
+ message: Log message
+ metadata: Additional context data
+
+ Returns:
+ Log: The created log entry
+ """
+ log_entry = Log(
+ level=level,
+ status=LogStatus.SUCCESS, # Simple events are immediately complete
+ message=message,
+ source=source,
+ log_metadata=metadata or {},
+ search_space_id=self.search_space_id
+ )
+
+ self.session.add(log_entry)
+ await self.session.commit()
+ await self.session.refresh(log_entry)
+
+ logger.info(f"Logged event from {source}: {message}")
+ return log_entry
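+
+# Illustrative lifecycle sketch (comments only): a background task creates one
+# IN_PROGRESS entry, optionally reports progress, then resolves it to SUCCESS or
+# FAILED. The session and search_space_id values are placeholders.
+#
+#   task_logger = TaskLoggingService(session, search_space_id=1)
+#   log_entry = await task_logger.log_task_start(
+#       task_name="demo_task",
+#       source="background_task",
+#       message="Starting demo task",
+#       metadata={"user_id": "..."},
+#   )
+#   try:
+#       await task_logger.log_task_progress(log_entry, "Halfway there", {"stage": "midpoint"})
+#       await task_logger.log_task_success(log_entry, "Demo task finished", {"items": 42})
+#   except Exception as exc:
+#       await task_logger.log_task_failure(log_entry, "Demo task failed", str(exc))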
\ No newline at end of file
diff --git a/surfsense_backend/app/tasks/background_tasks.py b/surfsense_backend/app/tasks/background_tasks.py
index a1ed40c..d405861 100644
--- a/surfsense_backend/app/tasks/background_tasks.py
+++ b/surfsense_backend/app/tasks/background_tasks.py
@@ -8,6 +8,7 @@ from app.config import config
from app.prompts import SUMMARY_PROMPT_TEMPLATE
from app.utils.document_converters import convert_document_to_markdown, generate_content_hash
from app.services.llm_service import get_user_long_context_llm
+from app.services.task_logging_service import TaskLoggingService
from langchain_core.documents import Document as LangChainDocument
from langchain_community.document_loaders import FireCrawlLoader, AsyncChromiumLoader
from langchain_community.document_transformers import MarkdownifyTransformer
@@ -22,10 +23,34 @@ md = MarkdownifyTransformer()
async def add_crawled_url_document(
session: AsyncSession, url: str, search_space_id: int, user_id: str
) -> Optional[Document]:
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="crawl_url_document",
+ source="background_task",
+ message=f"Starting URL crawling process for: {url}",
+ metadata={"url": url, "user_id": str(user_id)}
+ )
+
try:
+ # URL validation step
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Validating URL: {url}",
+ {"stage": "validation"}
+ )
+
if not validators.url(url):
raise ValueError(f"Url {url} is not a valid URL address")
+ # Set up crawler
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Setting up crawler for URL: {url}",
+ {"stage": "crawler_setup", "firecrawl_available": bool(config.FIRECRAWL_API_KEY)}
+ )
+
if config.FIRECRAWL_API_KEY:
crawl_loader = FireCrawlLoader(
url=url,
@@ -39,6 +64,13 @@ async def add_crawled_url_document(
else:
crawl_loader = AsyncChromiumLoader(urls=[url], headless=True)
+ # Perform crawling
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Crawling URL content: {url}",
+ {"stage": "crawling", "crawler_type": type(crawl_loader).__name__}
+ )
+
url_crawled = await crawl_loader.aload()
if type(crawl_loader) == FireCrawlLoader:
@@ -46,6 +78,13 @@ async def add_crawled_url_document(
elif type(crawl_loader) == AsyncChromiumLoader:
content_in_markdown = md.transform_documents(url_crawled)[0].page_content
+ # Format document
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing crawled content from: {url}",
+ {"stage": "content_processing", "content_length": len(content_in_markdown)}
+ )
+
# Format document metadata in a more maintainable way
metadata_sections = [
(
@@ -74,6 +113,13 @@ async def add_crawled_url_document(
combined_document_string = "\n".join(document_parts)
content_hash = generate_content_hash(combined_document_string, search_space_id)
+ # Check for duplicates
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Checking for duplicate content: {url}",
+ {"stage": "duplicate_check", "content_hash": content_hash}
+ )
+
# Check if document with this content hash already exists
existing_doc_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
@@ -81,15 +127,33 @@ async def add_crawled_url_document(
existing_document = existing_doc_result.scalars().first()
if existing_document:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Document already exists for URL: {url}",
+ {"duplicate_detected": True, "existing_document_id": existing_document.id}
+ )
logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
return existing_document
+ # Get LLM for summary generation
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Preparing for summary generation: {url}",
+ {"stage": "llm_setup"}
+ )
+
# Get user's long context LLM
user_llm = await get_user_long_context_llm(session, user_id)
if not user_llm:
raise RuntimeError(f"No long context LLM configured for user {user_id}")
# Generate summary
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Generating summary for URL content: {url}",
+ {"stage": "summary_generation"}
+ )
+
summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
summary_result = await summary_chain.ainvoke(
{"document": combined_document_string}
@@ -98,6 +162,12 @@ async def add_crawled_url_document(
summary_embedding = config.embedding_model_instance.embed(summary_content)
# Process chunks
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing content chunks for URL: {url}",
+ {"stage": "chunk_processing"}
+ )
+
chunks = [
Chunk(
content=chunk.text,
@@ -107,6 +177,12 @@ async def add_crawled_url_document(
]
# Create and store document
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Creating document in database for URL: {url}",
+ {"stage": "document_creation", "chunks_count": len(chunks)}
+ )
+
document = Document(
search_space_id=search_space_id,
title=url_crawled[0].metadata["title"]
@@ -124,13 +200,38 @@ async def add_crawled_url_document(
await session.commit()
await session.refresh(document)
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully crawled and processed URL: {url}",
+ {
+ "document_id": document.id,
+ "title": document.title,
+ "content_hash": content_hash,
+ "chunks_count": len(chunks),
+ "summary_length": len(summary_content)
+ }
+ )
+
return document
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error while processing URL: {url}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError"}
+ )
raise db_error
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to crawl URL: {url}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
raise RuntimeError(f"Failed to crawl URL: {str(e)}")
@@ -148,6 +249,20 @@ async def add_extension_received_document(
Returns:
Document object if successful, None if failed
"""
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="extension_document",
+ source="background_task",
+ message=f"Processing extension document: {content.metadata.VisitedWebPageTitle}",
+ metadata={
+ "url": content.metadata.VisitedWebPageURL,
+ "title": content.metadata.VisitedWebPageTitle,
+ "user_id": str(user_id)
+ }
+ )
+
try:
# Format document metadata in a more maintainable way
metadata_sections = [
@@ -188,6 +303,11 @@ async def add_extension_received_document(
existing_document = existing_doc_result.scalars().first()
if existing_document:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Extension document already exists: {content.metadata.VisitedWebPageTitle}",
+ {"duplicate_detected": True, "existing_document_id": existing_document.id}
+ )
logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
return existing_document
@@ -229,19 +349,52 @@ async def add_extension_received_document(
await session.commit()
await session.refresh(document)
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed extension document: {content.metadata.VisitedWebPageTitle}",
+ {
+ "document_id": document.id,
+ "content_hash": content_hash,
+ "url": content.metadata.VisitedWebPageURL
+ }
+ )
+
return document
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error processing extension document: {content.metadata.VisitedWebPageTitle}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError"}
+ )
raise db_error
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to process extension document: {content.metadata.VisitedWebPageTitle}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
raise RuntimeError(f"Failed to process extension document: {str(e)}")
async def add_received_markdown_file_document(
session: AsyncSession, file_name: str, file_in_markdown: str, search_space_id: int, user_id: str
) -> Optional[Document]:
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="markdown_file_document",
+ source="background_task",
+ message=f"Processing markdown file: {file_name}",
+ metadata={"filename": file_name, "user_id": str(user_id), "content_length": len(file_in_markdown)}
+ )
+
try:
content_hash = generate_content_hash(file_in_markdown, search_space_id)
@@ -252,6 +405,11 @@ async def add_received_markdown_file_document(
existing_document = existing_doc_result.scalars().first()
if existing_document:
+ await task_logger.log_task_success(
+ log_entry,
+ f"Markdown file document already exists: {file_name}",
+ {"duplicate_detected": True, "existing_document_id": existing_document.id}
+ )
logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
return existing_document
@@ -293,12 +451,36 @@ async def add_received_markdown_file_document(
await session.commit()
await session.refresh(document)
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed markdown file: {file_name}",
+ {
+ "document_id": document.id,
+ "content_hash": content_hash,
+ "chunks_count": len(chunks),
+ "summary_length": len(summary_content)
+ }
+ )
+
return document
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error processing markdown file: {file_name}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError"}
+ )
raise db_error
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to process markdown file: {file_name}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
raise RuntimeError(f"Failed to process file document: {str(e)}")
@@ -566,8 +748,24 @@ async def add_youtube_video_document(
SQLAlchemyError: If there's a database error
RuntimeError: If the video processing fails
"""
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="youtube_video_document",
+ source="background_task",
+ message=f"Starting YouTube video processing for: {url}",
+ metadata={"url": url, "user_id": str(user_id)}
+ )
+
try:
# Extract video ID from URL
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Extracting video ID from URL: {url}",
+ {"stage": "video_id_extraction"}
+ )
+
def get_youtube_video_id(url: str):
parsed_url = urlparse(url)
hostname = parsed_url.hostname
@@ -589,7 +787,19 @@ async def add_youtube_video_document(
if not video_id:
raise ValueError(f"Could not extract video ID from URL: {url}")
- # Get video metadata using async HTTP client
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Video ID extracted: {video_id}",
+ {"stage": "video_id_extracted", "video_id": video_id}
+ )
+
+ # Get video metadata
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Fetching video metadata for: {video_id}",
+ {"stage": "metadata_fetch"}
+ )
+
params = {
"format": "json",
"url": f"https://www.youtube.com/watch?v={video_id}",
@@ -600,7 +810,19 @@ async def add_youtube_video_document(
async with http_session.get(oembed_url, params=params) as response:
video_data = await response.json()
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Video metadata fetched: {video_data.get('title', 'Unknown')}",
+ {"stage": "metadata_fetched", "title": video_data.get('title'), "author": video_data.get('author_name')}
+ )
+
# Get video transcript
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Fetching transcript for video: {video_id}",
+ {"stage": "transcript_fetch"}
+ )
+
try:
captions = YouTubeTranscriptApi.get_transcript(video_id)
# Include complete caption information with timestamps
@@ -612,8 +834,26 @@ async def add_youtube_video_document(
timestamp = f"[{start_time:.2f}s-{start_time + duration:.2f}s]"
transcript_segments.append(f"{timestamp} {text}")
transcript_text = "\n".join(transcript_segments)
+
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Transcript fetched successfully: {len(captions)} segments",
+ {"stage": "transcript_fetched", "segments_count": len(captions), "transcript_length": len(transcript_text)}
+ )
except Exception as e:
transcript_text = f"No captions available for this video. Error: {str(e)}"
+ await task_logger.log_task_progress(
+ log_entry,
+ f"No transcript available for video: {video_id}",
+ {"stage": "transcript_unavailable", "error": str(e)}
+ )
+
+ # Format document
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing video content: {video_data.get('title', 'YouTube Video')}",
+ {"stage": "content_processing"}
+ )
# Format document metadata in a more maintainable way
metadata_sections = [
@@ -646,6 +886,13 @@ async def add_youtube_video_document(
combined_document_string = "\n".join(document_parts)
content_hash = generate_content_hash(combined_document_string, search_space_id)
+ # Check for duplicates
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Checking for duplicate video content: {video_id}",
+ {"stage": "duplicate_check", "content_hash": content_hash}
+ )
+
# Check if document with this content hash already exists
existing_doc_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
@@ -653,15 +900,33 @@ async def add_youtube_video_document(
existing_document = existing_doc_result.scalars().first()
if existing_document:
+ await task_logger.log_task_success(
+ log_entry,
+ f"YouTube video document already exists: {video_data.get('title', 'YouTube Video')}",
+ {"duplicate_detected": True, "existing_document_id": existing_document.id, "video_id": video_id}
+ )
logging.info(f"Document with content hash {content_hash} already exists. Skipping processing.")
return existing_document
+ # Get LLM for summary generation
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Preparing for summary generation: {video_data.get('title', 'YouTube Video')}",
+ {"stage": "llm_setup"}
+ )
+
# Get user's long context LLM
user_llm = await get_user_long_context_llm(session, user_id)
if not user_llm:
raise RuntimeError(f"No long context LLM configured for user {user_id}")
# Generate summary
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Generating summary for video: {video_data.get('title', 'YouTube Video')}",
+ {"stage": "summary_generation"}
+ )
+
summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
summary_result = await summary_chain.ainvoke(
{"document": combined_document_string}
@@ -670,6 +935,12 @@ async def add_youtube_video_document(
summary_embedding = config.embedding_model_instance.embed(summary_content)
# Process chunks
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Processing content chunks for video: {video_data.get('title', 'YouTube Video')}",
+ {"stage": "chunk_processing"}
+ )
+
chunks = [
Chunk(
content=chunk.text,
@@ -679,6 +950,11 @@ async def add_youtube_video_document(
]
# Create document
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Creating YouTube video document in database: {video_data.get('title', 'YouTube Video')}",
+ {"stage": "document_creation", "chunks_count": len(chunks)}
+ )
document = Document(
title=video_data.get("title", "YouTube Video"),
@@ -701,11 +977,38 @@ async def add_youtube_video_document(
await session.commit()
await session.refresh(document)
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully processed YouTube video: {video_data.get('title', 'YouTube Video')}",
+ {
+ "document_id": document.id,
+ "video_id": video_id,
+ "title": document.title,
+ "content_hash": content_hash,
+ "chunks_count": len(chunks),
+ "summary_length": len(summary_content),
+ "has_transcript": "No captions available" not in transcript_text
+ }
+ )
+
return document
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error while processing YouTube video: {url}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError", "video_id": video_id if 'video_id' in locals() else None}
+ )
raise db_error
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to process YouTube video: {url}",
+ str(e),
+ {"error_type": type(e).__name__, "video_id": video_id if 'video_id' in locals() else None}
+ )
logging.error(f"Failed to process YouTube video: {str(e)}")
raise
diff --git a/surfsense_backend/app/tasks/connectors_indexing_tasks.py b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
index c25a7f4..e0b3cd1 100644
--- a/surfsense_backend/app/tasks/connectors_indexing_tasks.py
+++ b/surfsense_backend/app/tasks/connectors_indexing_tasks.py
@@ -7,6 +7,7 @@ from app.db import Document, DocumentType, Chunk, SearchSourceConnector, SearchS
from app.config import config
from app.prompts import SUMMARY_PROMPT_TEMPLATE
from app.services.llm_service import get_user_long_context_llm
+from app.services.task_logging_service import TaskLoggingService
from app.connectors.slack_history import SlackHistory
from app.connectors.notion_history import NotionHistoryConnector
from app.connectors.github_connector import GitHubConnector
@@ -42,8 +43,24 @@ async def index_slack_messages(
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="slack_messages_indexing",
+ source="connector_indexing_task",
+ message=f"Starting Slack messages indexing for connector {connector_id}",
+ metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
+ )
+
try:
# Get the connector
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Retrieving Slack connector {connector_id} from database",
+ {"stage": "connector_retrieval"}
+ )
+
result = await session.execute(
select(SearchSourceConnector)
.filter(
@@ -54,17 +71,41 @@ async def index_slack_messages(
connector = result.scalars().first()
if not connector:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Connector with ID {connector_id} not found or is not a Slack connector",
+ "Connector not found",
+ {"error_type": "ConnectorNotFound"}
+ )
return 0, f"Connector with ID {connector_id} not found or is not a Slack connector"
# Get the Slack token from the connector config
slack_token = connector.config.get("SLACK_BOT_TOKEN")
if not slack_token:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Slack token not found in connector config for connector {connector_id}",
+ "Missing Slack token",
+ {"error_type": "MissingToken"}
+ )
return 0, "Slack token not found in connector config"
# Initialize Slack client
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Initializing Slack client for connector {connector_id}",
+ {"stage": "client_initialization"}
+ )
+
slack_client = SlackHistory(token=slack_token)
# Calculate date range
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Calculating date range for Slack indexing",
+ {"stage": "date_calculation", "provided_start_date": start_date, "provided_end_date": end_date}
+ )
+
if start_date is None or end_date is None:
# Fall back to calculating dates based on last_indexed_at
calculated_end_date = datetime.now()
@@ -95,13 +136,30 @@ async def index_slack_messages(
logger.info(f"Indexing Slack messages from {start_date_str} to {end_date_str}")
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Fetching Slack channels from {start_date_str} to {end_date_str}",
+ {"stage": "fetch_channels", "start_date": start_date_str, "end_date": end_date_str}
+ )
+
# Get all channels
try:
channels = slack_client.get_all_channels()
except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to get Slack channels for connector {connector_id}",
+ str(e),
+ {"error_type": "ChannelFetchError"}
+ )
return 0, f"Failed to get Slack channels: {str(e)}"
if not channels:
+ await task_logger.log_task_success(
+ log_entry,
+ f"No Slack channels found for connector {connector_id}",
+ {"channels_found": 0}
+ )
return 0, "No Slack channels found"
# Track the number of documents indexed
@@ -109,6 +167,12 @@ async def index_slack_messages(
documents_skipped = 0
skipped_channels = []
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Starting to process {len(channels)} Slack channels",
+ {"stage": "process_channels", "total_channels": len(channels)}
+ )
+
# Process each channel
for channel_obj in channels: # Modified loop to iterate over list of channel objects
channel_id = channel_obj["id"]
@@ -283,15 +347,40 @@ async def index_slack_messages(
else:
result_message = f"Processed {total_processed} channels."
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully completed Slack indexing for connector {connector_id}",
+ {
+ "channels_processed": total_processed,
+ "documents_indexed": documents_indexed,
+ "documents_skipped": documents_skipped,
+ "skipped_channels_count": len(skipped_channels),
+ "result_message": result_message
+ }
+ )
+
logger.info(f"Slack indexing completed: {documents_indexed} new channels, {documents_skipped} skipped")
return total_processed, result_message
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error during Slack indexing for connector {connector_id}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError"}
+ )
logger.error(f"Database error: {str(db_error)}")
return 0, f"Database error: {str(db_error)}"
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to index Slack messages for connector {connector_id}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
logger.error(f"Failed to index Slack messages: {str(e)}")
return 0, f"Failed to index Slack messages: {str(e)}"
@@ -316,8 +405,24 @@ async def index_notion_pages(
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="notion_pages_indexing",
+ source="connector_indexing_task",
+ message=f"Starting Notion pages indexing for connector {connector_id}",
+ metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
+ )
+
try:
# Get the connector
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Retrieving Notion connector {connector_id} from database",
+ {"stage": "connector_retrieval"}
+ )
+
result = await session.execute(
select(SearchSourceConnector)
.filter(
@@ -328,14 +433,32 @@ async def index_notion_pages(
connector = result.scalars().first()
if not connector:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Connector with ID {connector_id} not found or is not a Notion connector",
+ "Connector not found",
+ {"error_type": "ConnectorNotFound"}
+ )
return 0, f"Connector with ID {connector_id} not found or is not a Notion connector"
# Get the Notion token from the connector config
notion_token = connector.config.get("NOTION_INTEGRATION_TOKEN")
if not notion_token:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Notion integration token not found in connector config for connector {connector_id}",
+ "Missing Notion token",
+ {"error_type": "MissingToken"}
+ )
return 0, "Notion integration token not found in connector config"
# Initialize Notion client
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Initializing Notion client for connector {connector_id}",
+ {"stage": "client_initialization"}
+ )
+
logger.info(f"Initializing Notion client for connector {connector_id}")
notion_client = NotionHistoryConnector(token=notion_token)
@@ -364,15 +487,32 @@ async def index_notion_pages(
logger.info(f"Fetching Notion pages from {start_date_iso} to {end_date_iso}")
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Fetching Notion pages from {start_date_iso} to {end_date_iso}",
+ {"stage": "fetch_pages", "start_date": start_date_iso, "end_date": end_date_iso}
+ )
+
# Get all pages
try:
pages = notion_client.get_all_pages(start_date=start_date_iso, end_date=end_date_iso)
logger.info(f"Found {len(pages)} Notion pages")
except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to get Notion pages for connector {connector_id}",
+ str(e),
+ {"error_type": "PageFetchError"}
+ )
logger.error(f"Error fetching Notion pages: {str(e)}", exc_info=True)
return 0, f"Failed to get Notion pages: {str(e)}"
if not pages:
+ await task_logger.log_task_success(
+ log_entry,
+ f"No Notion pages found for connector {connector_id}",
+ {"pages_found": 0}
+ )
logger.info("No Notion pages found to index")
return 0, "No Notion pages found"
@@ -381,6 +521,12 @@ async def index_notion_pages(
documents_skipped = 0
skipped_pages = []
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Starting to process {len(pages)} Notion pages",
+ {"stage": "process_pages", "total_pages": len(pages)}
+ )
+
# Process each page
for page in pages:
try:
@@ -552,15 +698,40 @@ async def index_notion_pages(
else:
result_message = f"Processed {total_processed} pages."
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully completed Notion indexing for connector {connector_id}",
+ {
+ "pages_processed": total_processed,
+ "documents_indexed": documents_indexed,
+ "documents_skipped": documents_skipped,
+ "skipped_pages_count": len(skipped_pages),
+ "result_message": result_message
+ }
+ )
+
logger.info(f"Notion indexing completed: {documents_indexed} new pages, {documents_skipped} skipped")
return total_processed, result_message
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error during Notion indexing for connector {connector_id}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError"}
+ )
logger.error(f"Database error during Notion indexing: {str(db_error)}", exc_info=True)
return 0, f"Database error: {str(db_error)}"
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to index Notion pages for connector {connector_id}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
logger.error(f"Failed to index Notion pages: {str(e)}", exc_info=True)
return 0, f"Failed to index Notion pages: {str(e)}"
@@ -585,11 +756,27 @@ async def index_github_repos(
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="github_repos_indexing",
+ source="connector_indexing_task",
+ message=f"Starting GitHub repositories indexing for connector {connector_id}",
+ metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
+ )
+
documents_processed = 0
errors = []
try:
# 1. Get the GitHub connector from the database
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Retrieving GitHub connector {connector_id} from database",
+ {"stage": "connector_retrieval"}
+ )
+
result = await session.execute(
select(SearchSourceConnector)
.filter(
@@ -600,6 +787,12 @@ async def index_github_repos(
connector = result.scalars().first()
if not connector:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Connector with ID {connector_id} not found or is not a GitHub connector",
+ "Connector not found",
+ {"error_type": "ConnectorNotFound"}
+ )
return 0, f"Connector with ID {connector_id} not found or is not a GitHub connector"
# 2. Get the GitHub PAT and selected repositories from the connector config
@@ -607,20 +800,50 @@ async def index_github_repos(
repo_full_names_to_index = connector.config.get("repo_full_names")
if not github_pat:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"GitHub Personal Access Token (PAT) not found in connector config for connector {connector_id}",
+ "Missing GitHub PAT",
+ {"error_type": "MissingToken"}
+ )
return 0, "GitHub Personal Access Token (PAT) not found in connector config"
if not repo_full_names_to_index or not isinstance(repo_full_names_to_index, list):
- return 0, "'repo_full_names' not found or is not a list in connector config"
+ await task_logger.log_task_failure(
+ log_entry,
+ f"'repo_full_names' not found or is not a list in connector config for connector {connector_id}",
+ "Invalid repo configuration",
+ {"error_type": "InvalidConfiguration"}
+ )
+ return 0, "'repo_full_names' not found or is not a list in connector config"
# 3. Initialize GitHub connector client
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Initializing GitHub client for connector {connector_id}",
+ {"stage": "client_initialization", "repo_count": len(repo_full_names_to_index)}
+ )
+
try:
github_client = GitHubConnector(token=github_pat)
except ValueError as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to initialize GitHub client for connector {connector_id}",
+ str(e),
+ {"error_type": "ClientInitializationError"}
+ )
return 0, f"Failed to initialize GitHub client: {str(e)}"
# 4. Validate selected repositories
# For simplicity, we'll proceed with the list provided.
# If a repo is inaccessible, get_repository_files will likely fail gracefully later.
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Starting indexing for {len(repo_full_names_to_index)} selected repositories",
+ {"stage": "repo_processing", "repo_count": len(repo_full_names_to_index), "start_date": start_date, "end_date": end_date}
+ )
+
logger.info(f"Starting indexing for {len(repo_full_names_to_index)} selected repositories.")
if start_date and end_date:
logger.info(f"Date range requested: {start_date} to {end_date} (Note: GitHub indexing processes all files regardless of dates)")
@@ -719,13 +942,36 @@ async def index_github_repos(
await session.commit()
logger.info(f"Finished GitHub indexing for connector {connector_id}. Processed {documents_processed} files.")
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully completed GitHub indexing for connector {connector_id}",
+ {
+ "documents_processed": documents_processed,
+ "errors_count": len(errors),
+ "repo_count": len(repo_full_names_to_index)
+ }
+ )
+
except SQLAlchemyError as db_err:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error during GitHub indexing for connector {connector_id}",
+ str(db_err),
+ {"error_type": "SQLAlchemyError"}
+ )
logger.error(f"Database error during GitHub indexing for connector {connector_id}: {db_err}")
errors.append(f"Database error: {db_err}")
return documents_processed, "; ".join(errors) if errors else str(db_err)
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Unexpected error during GitHub indexing for connector {connector_id}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
logger.error(f"Unexpected error during GitHub indexing for connector {connector_id}: {e}", exc_info=True)
errors.append(f"Unexpected error: {e}")
return documents_processed, "; ".join(errors) if errors else str(e)
@@ -754,8 +1000,24 @@ async def index_linear_issues(
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="linear_issues_indexing",
+ source="connector_indexing_task",
+ message=f"Starting Linear issues indexing for connector {connector_id}",
+ metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
+ )
+
try:
# Get the connector
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Retrieving Linear connector {connector_id} from database",
+ {"stage": "connector_retrieval"}
+ )
+
result = await session.execute(
select(SearchSourceConnector)
.filter(
@@ -766,14 +1028,32 @@ async def index_linear_issues(
connector = result.scalars().first()
if not connector:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Connector with ID {connector_id} not found or is not a Linear connector",
+ "Connector not found",
+ {"error_type": "ConnectorNotFound"}
+ )
return 0, f"Connector with ID {connector_id} not found or is not a Linear connector"
# Get the Linear token from the connector config
linear_token = connector.config.get("LINEAR_API_KEY")
if not linear_token:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Linear API token not found in connector config for connector {connector_id}",
+ "Missing Linear token",
+ {"error_type": "MissingToken"}
+ )
return 0, "Linear API token not found in connector config"
# Initialize Linear client
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Initializing Linear client for connector {connector_id}",
+ {"stage": "client_initialization"}
+ )
+
linear_client = LinearConnector(token=linear_token)
# Calculate date range
@@ -807,6 +1087,12 @@ async def index_linear_issues(
logger.info(f"Fetching Linear issues from {start_date_str} to {end_date_str}")
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Fetching Linear issues from {start_date_str} to {end_date_str}",
+ {"stage": "fetch_issues", "start_date": start_date_str, "end_date": end_date_str}
+ )
+
# Get issues within date range
try:
issues, error = linear_client.get_issues_by_date_range(
@@ -855,6 +1141,12 @@ async def index_linear_issues(
documents_skipped = 0
skipped_issues = []
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Starting to process {len(issues)} Linear issues",
+ {"stage": "process_issues", "total_issues": len(issues)}
+ )
+
# Process each issue
for issue in issues:
try:
@@ -959,16 +1251,39 @@ async def index_linear_issues(
await session.commit()
logger.info(f"Successfully committed all Linear document changes to database")
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully completed Linear indexing for connector {connector_id}",
+ {
+ "issues_processed": total_processed,
+ "documents_indexed": documents_indexed,
+ "documents_skipped": documents_skipped,
+ "skipped_issues_count": len(skipped_issues)
+ }
+ )
logger.info(f"Linear indexing completed: {documents_indexed} new issues, {documents_skipped} skipped")
return total_processed, None # Return None as the error message to indicate success
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error during Linear indexing for connector {connector_id}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError"}
+ )
logger.error(f"Database error: {str(db_error)}", exc_info=True)
return 0, f"Database error: {str(db_error)}"
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to index Linear issues for connector {connector_id}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
logger.error(f"Failed to index Linear issues: {str(e)}", exc_info=True)
return 0, f"Failed to index Linear issues: {str(e)}"
@@ -993,8 +1308,24 @@ async def index_discord_messages(
Returns:
Tuple containing (number of documents indexed, error message or None)
"""
+ task_logger = TaskLoggingService(session, search_space_id)
+
+ # Log task start
+ log_entry = await task_logger.log_task_start(
+ task_name="discord_messages_indexing",
+ source="connector_indexing_task",
+ message=f"Starting Discord messages indexing for connector {connector_id}",
+ metadata={"connector_id": connector_id, "user_id": str(user_id), "start_date": start_date, "end_date": end_date}
+ )
+
try:
# Get the connector
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Retrieving Discord connector {connector_id} from database",
+ {"stage": "connector_retrieval"}
+ )
+
result = await session.execute(
select(SearchSourceConnector)
.filter(
@@ -1005,16 +1336,34 @@ async def index_discord_messages(
connector = result.scalars().first()
if not connector:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Connector with ID {connector_id} not found or is not a Discord connector",
+ "Connector not found",
+ {"error_type": "ConnectorNotFound"}
+ )
return 0, f"Connector with ID {connector_id} not found or is not a Discord connector"
# Get the Discord token from the connector config
discord_token = connector.config.get("DISCORD_BOT_TOKEN")
if not discord_token:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Discord token not found in connector config for connector {connector_id}",
+ "Missing Discord token",
+ {"error_type": "MissingToken"}
+ )
return 0, "Discord token not found in connector config"
logger.info(f"Starting Discord indexing for connector {connector_id}")
# Initialize Discord client
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Initializing Discord client for connector {connector_id}",
+ {"stage": "client_initialization"}
+ )
+
discord_client = DiscordConnector(token=discord_token)
# Calculate date range
@@ -1054,6 +1403,12 @@ async def index_discord_messages(
skipped_channels = []
try:
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Starting Discord bot and fetching guilds for connector {connector_id}",
+ {"stage": "fetch_guilds"}
+ )
+
logger.info("Starting Discord bot to fetch guilds")
discord_client._bot_task = asyncio.create_task(discord_client.start_bot())
await discord_client._wait_until_ready()
@@ -1062,15 +1417,32 @@ async def index_discord_messages(
guilds = await discord_client.get_guilds()
logger.info(f"Found {len(guilds)} guilds")
except Exception as e:
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to get Discord guilds for connector {connector_id}",
+ str(e),
+ {"error_type": "GuildFetchError"}
+ )
logger.error(f"Failed to get Discord guilds: {str(e)}", exc_info=True)
await discord_client.close_bot()
return 0, f"Failed to get Discord guilds: {str(e)}"
if not guilds:
+ await task_logger.log_task_success(
+ log_entry,
+ f"No Discord guilds found for connector {connector_id}",
+ {"guilds_found": 0}
+ )
logger.info("No Discord guilds found to index")
await discord_client.close_bot()
return 0, "No Discord guilds found"
# Process each guild and channel
+ await task_logger.log_task_progress(
+ log_entry,
+ f"Starting to process {len(guilds)} Discord guilds",
+ {"stage": "process_guilds", "total_guilds": len(guilds)}
+ )
+
for guild in guilds:
guild_id = guild["id"]
guild_name = guild["name"]
@@ -1242,14 +1614,40 @@ async def index_discord_messages(
else:
result_message = f"Processed {documents_indexed} channels."
+ # Log success
+ await task_logger.log_task_success(
+ log_entry,
+ f"Successfully completed Discord indexing for connector {connector_id}",
+ {
+ "channels_processed": documents_indexed,
+ "documents_indexed": documents_indexed,
+ "documents_skipped": documents_skipped,
+ "skipped_channels_count": len(skipped_channels),
+ "guilds_processed": len(guilds),
+ "result_message": result_message
+ }
+ )
+
logger.info(f"Discord indexing completed: {documents_indexed} new channels, {documents_skipped} skipped")
return documents_indexed, result_message
except SQLAlchemyError as db_error:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Database error during Discord indexing for connector {connector_id}",
+ str(db_error),
+ {"error_type": "SQLAlchemyError"}
+ )
logger.error(f"Database error during Discord indexing: {str(db_error)}", exc_info=True)
return 0, f"Database error: {str(db_error)}"
except Exception as e:
await session.rollback()
+ await task_logger.log_task_failure(
+ log_entry,
+ f"Failed to index Discord messages for connector {connector_id}",
+ str(e),
+ {"error_type": type(e).__name__}
+ )
logger.error(f"Failed to index Discord messages: {str(e)}", exc_info=True)
return 0, f"Failed to index Discord messages: {str(e)}"
diff --git a/surfsense_backend/app/tasks/podcast_tasks.py b/surfsense_backend/app/tasks/podcast_tasks.py
index a6be546..f4907af 100644
--- a/surfsense_backend/app/tasks/podcast_tasks.py
+++ b/surfsense_backend/app/tasks/podcast_tasks.py
@@ -2,8 +2,10 @@
from app.agents.podcaster.graph import graph as podcaster_graph
from app.agents.podcaster.state import State
from app.db import Chat, Podcast
+from app.services.task_logging_service import TaskLoggingService
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.exc import SQLAlchemyError
async def generate_document_podcast(
@@ -24,73 +26,177 @@ async def generate_chat_podcast(
podcast_title: str,
user_id: int
):
- # Fetch the chat with the specified ID
- query = select(Chat).filter(
- Chat.id == chat_id,
- Chat.search_space_id == search_space_id
- )
+ task_logger = TaskLoggingService(session, search_space_id)
- result = await session.execute(query)
- chat = result.scalars().first()
-
- if not chat:
- raise ValueError(f"Chat with id {chat_id} not found in search space {search_space_id}")
-
- # Create chat history structure
- chat_history_str = "
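As an illustration only, the same logging calls could wrap the rest of the chat-podcast flow roughly as follows; the chat fetch, transcript construction, and podcaster graph invocation are simplified placeholders, and the task and source names are assumptions rather than values taken from this diff:

```python
from app.services.task_logging_service import TaskLoggingService


async def generate_chat_podcast_sketch(session, chat_id, search_space_id, podcast_title, user_id):
    """Illustrative only: one plausible way the chat-podcast task could use the same logging calls."""
    task_logger = TaskLoggingService(session, search_space_id)

    log_entry = await task_logger.log_task_start(
        task_name="chat_podcast_generation",  # assumed task name, not taken from the diff
        source="podcast_task",                # assumed source label, not taken from the diff
        message=f"Starting podcast generation for chat {chat_id}",
        metadata={"chat_id": chat_id, "podcast_title": podcast_title, "user_id": str(user_id)},
    )

    try:
        # Fetch the chat, build the transcript state, and run the podcaster graph here.
        await session.commit()
        await task_logger.log_task_success(
            log_entry,
            f"Successfully generated podcast for chat {chat_id}",
            {"podcast_title": podcast_title},
        )
    except Exception as e:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Failed to generate podcast for chat {chat_id}",
            str(e),
            {"error_type": type(e).__name__},
        )
        raise
```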