Mirror of https://github.com/MODSetter/SurfSense.git
Updated Streaming Service to efficiently stream content

- Earlier, the whole message (with all annotations included) was streamed for each chunk, leading to extremely large data lengths. Fixed to stream only the new chunk.
- Updated the ANSWER part to be streamed as message content (following Vercel's Stream Protocol).
- Fixed the "yeild_value" → "yield_value" typo in writer payloads.
This commit is contained in:
parent d5aae6b229
commit 92781e726c

4 changed files with 638 additions and 256 deletions
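For context, a minimal sketch of what this changes on the wire. It assumes Vercel's data stream framing of `TYPE:JSON` per line (`0:` text parts, `3:` error parts, `8:` message annotations); the `8:` framing is visible in `_format_annotations` at the end of this diff, and the route below sets the `x-vercel-ai-data-stream: v1` header. The exact payload shapes here are illustrative assumptions, not the project's actual schema.

Before (each chunk re-serialized the full annotation state, so frames grew with the message):

    8:[{"type": "TERMINAL_INFO", "content": [/* every terminal line so far */]}, {"type": "ANSWER", "content": [/* the entire answer so far */]}]

After (only the new delta is framed, and the ANSWER travels as ordinary message content on the text part):

    8:[{"type": "TERMINAL_INFO", "content": [{"text": "✍️ Writing answer... (42 words)"}]}]
    0:"next few words of the answer"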
@@ -267,8 +267,13 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str
     streaming_service = state.streaming_service

-    streaming_service.only_update_terminal("🔍 Generating answer outline...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🔍 Generating answer outline..."
+            )
+        }
+    )

     # Get configuration from runnable config
     configuration = Configuration.from_runnable_config(config)
     reformulated_query = state.reformulated_query

@@ -276,15 +281,19 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str
     num_sections = configuration.num_sections
     user_id = configuration.user_id

-    streaming_service.only_update_terminal(f"🤔 Planning research approach for: \"{user_query[:100]}...\"")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                f'🤔 Planning research approach for: "{user_query[:100]}..."'
+            )
+        }
+    )

     # Get user's strategic LLM
     llm = await get_user_strategic_llm(state.db_session, user_id)
     if not llm:
         error_message = f"No strategic LLM configured for user {user_id}"
-        streaming_service.only_update_terminal(f"❌ {error_message}", "error")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service.format_error(error_message)})
         raise RuntimeError(error_message)

     # Create the human message content

@@ -311,8 +320,13 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str
     Your output MUST be valid JSON in exactly this format. Do not include any other text or explanation.
     """

-    streaming_service.only_update_terminal("📝 Designing structured outline with AI...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "📝 Designing structured outline with AI..."
+            )
+        }
+    )

     # Create messages for the LLM
     messages = [

@@ -321,8 +335,13 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str
     ]

     # Call the LLM directly without using structured output
-    streaming_service.only_update_terminal("⚙️ Processing answer structure...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "⚙️ Processing answer structure..."
+            )
+        }
+    )

     response = await llm.ainvoke(messages)

@@ -344,25 +363,33 @@ async def write_answer_outline(state: State, config: RunnableConfig, writer: Str
             answer_outline = AnswerOutline(**parsed_data)

             total_questions = sum(len(section.questions) for section in answer_outline.answer_outline)
-            streaming_service.only_update_terminal(f"✅ Successfully generated outline with {len(answer_outline.answer_outline)} sections and {total_questions} research questions!")
-            writer({"yeild_value": streaming_service._format_annotations()})
-
-            print(f"Successfully generated answer outline with {len(answer_outline.answer_outline)} sections")
+            writer(
+                {
+                    "yield_value": streaming_service.format_terminal_info_delta(
+                        f"✅ Successfully generated outline with {len(answer_outline.answer_outline)} sections and {total_questions} research questions!"
+                    )
+                }
+            )
+
+            print(
+                f"Successfully generated answer outline with {len(answer_outline.answer_outline)} sections"
+            )

             # Return state update
             return {"answer_outline": answer_outline}
         else:
             # If JSON structure not found, raise a clear error
-            error_message = f"Could not find valid JSON in LLM response. Raw response: {content}"
-            streaming_service.only_update_terminal(f"❌ {error_message}", "error")
-            writer({"yeild_value": streaming_service._format_annotations()})
+            error_message = (
+                f"Could not find valid JSON in LLM response. Raw response: {content}"
+            )
+            writer({"yield_value": streaming_service.format_error(error_message)})
             raise ValueError(error_message)

     except (json.JSONDecodeError, ValueError) as e:
         # Log the error and re-raise it
         error_message = f"Error parsing LLM response: {str(e)}"
-        streaming_service.only_update_terminal(f"❌ {error_message}", "error")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service.format_error(error_message)})

         print(f"Error parsing LLM response: {str(e)}")
         print(f"Raw response: {response.content}")
@@ -414,8 +441,13 @@ async def fetch_relevant_documents(
     if streaming_service and writer:
         connector_names = [get_connector_friendly_name(connector) for connector in connectors_to_search]
         connector_names_str = ", ".join(connector_names)
-        streaming_service.only_update_terminal(f"🔎 Starting research on {len(research_questions)} questions using {connector_names_str} data sources")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer(
+            {
+                "yield_value": streaming_service.format_terminal_info_delta(
+                    f"🔎 Starting research on {len(research_questions)} questions using {connector_names_str} data sources"
+                )
+            }
+        )

     all_raw_documents = []  # Store all raw documents
     all_sources = []  # Store all sources

@@ -423,8 +455,13 @@ async def fetch_relevant_documents(
     for i, user_query in enumerate(research_questions):
         # Stream question being researched
         if streaming_service and writer:
-            streaming_service.only_update_terminal(f"🧠 Researching question {i+1}/{len(research_questions)}: \"{user_query[:100]}...\"")
-            writer({"yeild_value": streaming_service._format_annotations()})
+            writer(
+                {
+                    "yield_value": streaming_service.format_terminal_info_delta(
+                        f'🧠 Researching question {i + 1}/{len(research_questions)}: "{user_query[:100]}..."'
+                    )
+                }
+            )

         # Use original research question as the query
         reformulated_query = user_query

@@ -435,8 +472,13 @@ async def fetch_relevant_documents(
             if streaming_service and writer:
                 connector_emoji = get_connector_emoji(connector)
                 friendly_name = get_connector_friendly_name(connector)
-                streaming_service.only_update_terminal(f"{connector_emoji} Searching {friendly_name} for relevant information...")
-                writer({"yeild_value": streaming_service._format_annotations()})
+                writer(
+                    {
+                        "yield_value": streaming_service.format_terminal_info_delta(
+                            f"{connector_emoji} Searching {friendly_name} for relevant information..."
+                        )
+                    }
+                )

             try:
                 if connector == "YOUTUBE_VIDEO":

@@ -455,8 +497,13 @@ async def fetch_relevant_documents(

                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"📹 Found {len(youtube_chunks)} YouTube chunks related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"📹 Found {len(youtube_chunks)} YouTube chunks related to your query"
+                                )
+                            }
+                        )

                 elif connector == "EXTENSION":
                     source_object, extension_chunks = await connector_service.search_extension(

@@ -474,8 +521,13 @@ async def fetch_relevant_documents(

                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"🧩 Found {len(extension_chunks)} Browser Extension chunks related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"🧩 Found {len(extension_chunks)} Browser Extension chunks related to your query"
+                                )
+                            }
+                        )

                 elif connector == "CRAWLED_URL":
                     source_object, crawled_urls_chunks = await connector_service.search_crawled_urls(

@@ -493,8 +545,13 @@ async def fetch_relevant_documents(

                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"🌐 Found {len(crawled_urls_chunks)} Web Pages chunks related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"🌐 Found {len(crawled_urls_chunks)} Web Pages chunks related to your query"
+                                )
+                            }
+                        )

                 elif connector == "FILE":
                     source_object, files_chunks = await connector_service.search_files(

@@ -512,9 +569,13 @@ async def fetch_relevant_documents(

                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"📄 Found {len(files_chunks)} Files chunks related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"📄 Found {len(files_chunks)} Files chunks related to your query"
+                                )
+                            }
+                        )

                 elif connector == "SLACK_CONNECTOR":
                     source_object, slack_chunks = await connector_service.search_slack(

@@ -532,8 +593,13 @@ async def fetch_relevant_documents(

                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"💬 Found {len(slack_chunks)} Slack messages related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"💬 Found {len(slack_chunks)} Slack messages related to your query"
+                                )
+                            }
+                        )

                 elif connector == "NOTION_CONNECTOR":
                     source_object, notion_chunks = await connector_service.search_notion(

@@ -551,8 +617,13 @@ async def fetch_relevant_documents(

                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"📘 Found {len(notion_chunks)} Notion pages/blocks related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"📘 Found {len(notion_chunks)} Notion pages/blocks related to your query"
+                                )
+                            }
+                        )

                 elif connector == "GITHUB_CONNECTOR":
                     source_object, github_chunks = await connector_service.search_github(

@@ -570,8 +641,13 @@ async def fetch_relevant_documents(

                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"🐙 Found {len(github_chunks)} GitHub files/issues related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"🐙 Found {len(github_chunks)} GitHub files/issues related to your query"
+                                )
+                            }
+                        )

                 elif connector == "LINEAR_CONNECTOR":
                     source_object, linear_chunks = await connector_service.search_linear(

@@ -589,8 +665,13 @@ async def fetch_relevant_documents(

                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"📊 Found {len(linear_chunks)} Linear issues related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"📊 Found {len(linear_chunks)} Linear issues related to your query"
+                                )
+                            }
+                        )

                 elif connector == "TAVILY_API":
                     source_object, tavily_chunks = await connector_service.search_tavily(

@@ -606,8 +687,13 @@ async def fetch_relevant_documents(

                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"🔍 Found {len(tavily_chunks)} Web Search results related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"🔍 Found {len(tavily_chunks)} Web Search results related to your query"
+                                )
+                            }
+                        )

                 elif connector == "LINKUP_API":
                     if top_k > 10:

@@ -628,8 +714,13 @@ async def fetch_relevant_documents(

                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"🔗 Found {len(linkup_chunks)} Linkup results related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"🔗 Found {len(linkup_chunks)} Linkup results related to your query"
+                                )
+                            }
+                        )

                 elif connector == "DISCORD_CONNECTOR":
                     source_object, discord_chunks = await connector_service.search_discord(

@@ -645,9 +736,13 @@ async def fetch_relevant_documents(
                     all_raw_documents.extend(discord_chunks)
                     # Stream found document count
                     if streaming_service and writer:
-                        streaming_service.only_update_terminal(f"🗨️ Found {len(discord_chunks)} Discord messages related to your query")
-                        writer({"yeild_value": streaming_service._format_annotations()})
+                        writer(
+                            {
+                                "yield_value": streaming_service.format_terminal_info_delta(
+                                    f"🗨️ Found {len(discord_chunks)} Discord messages related to your query"
+                                )
+                            }
+                        )

             except Exception as e:
                 error_message = f"Error searching connector {connector}: {str(e)}"

@@ -656,8 +751,13 @@ async def fetch_relevant_documents(
                 # Stream error message
                 if streaming_service and writer:
                     friendly_name = get_connector_friendly_name(connector)
-                    streaming_service.only_update_terminal(f"⚠️ Error searching {friendly_name}: {str(e)}", "error")
-                    writer({"yeild_value": streaming_service._format_annotations()})
+                    writer(
+                        {
+                            "yield_value": streaming_service.format_error(
+                                f"Error searching {friendly_name}: {str(e)}"
+                            )
+                        }
+                    )

                 # Continue with other connectors on error
                 continue

@@ -700,13 +800,18 @@ async def fetch_relevant_documents(
     if streaming_service and writer:
         user_source_count = len(user_selected_sources) if user_selected_sources else 0
         connector_source_count = len(deduplicated_sources) - user_source_count
-        streaming_service.only_update_terminal(f"📚 Collected {len(deduplicated_sources)} total sources ({user_source_count} user-selected + {connector_source_count} from connectors)")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer(
+            {
+                "yield_value": streaming_service.format_terminal_info_delta(
+                    f"📚 Collected {len(deduplicated_sources)} total sources ({user_source_count} user-selected + {connector_source_count} from connectors)"
+                )
+            }
+        )

     # After all sources are collected and deduplicated, stream them
     if streaming_service and writer:
         streaming_service.only_update_sources(deduplicated_sources)
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service._format_annotations()})

     # Deduplicate raw documents based on chunk_id or content
     seen_chunk_ids = set()

@@ -730,8 +835,13 @@ async def fetch_relevant_documents(

     # Stream info about deduplicated documents
     if streaming_service and writer:
-        streaming_service.only_update_terminal(f"🧹 Found {len(deduplicated_docs)} unique document chunks after removing duplicates")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer(
+            {
+                "yield_value": streaming_service.format_terminal_info_delta(
+                    f"🧹 Found {len(deduplicated_docs)} unique document chunks after removing duplicates"
+                )
+            }
+        )

     # Return deduplicated documents
     return deduplicated_docs
@@ -757,14 +867,19 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
     # This is used to maintain section content while streaming multiple sections
     section_contents = {}

-    streaming_service.only_update_terminal(f"🚀 Starting to process research sections...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🚀 Starting to process research sections..."
+            )
+        }
+    )

     print(f"Processing sections from outline: {answer_outline is not None}")

     if not answer_outline:
-        streaming_service.only_update_terminal("❌ Error: No answer outline was provided. Cannot generate report.", "error")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        error_message = "No answer outline was provided. Cannot generate report."
+        writer({"yield_value": streaming_service.format_error(error_message)})
         return {
             "final_written_report": "No answer outline was provided. Cannot generate final report."
         }

@@ -775,12 +890,22 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
         all_questions.extend(section.questions)

     print(f"Collected {len(all_questions)} questions from all sections")
-    streaming_service.only_update_terminal(f"🧩 Found {len(all_questions)} research questions across {len(answer_outline.answer_outline)} sections")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                f"🧩 Found {len(all_questions)} research questions across {len(answer_outline.answer_outline)} sections"
+            )
+        }
+    )

     # Fetch relevant documents once for all questions
-    streaming_service.only_update_terminal("🔍 Searching for relevant information across all connectors...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🔍 Searching for relevant information across all connectors..."
+            )
+        }
+    )

     if configuration.num_sections == 1:
         TOP_K = 10

@@ -798,8 +923,13 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
     try:
         # First, fetch user-selected documents if any
         if configuration.document_ids_to_add_in_context:
-            streaming_service.only_update_terminal(f"📋 Including {len(configuration.document_ids_to_add_in_context)} user-selected documents...")
-            writer({"yeild_value": streaming_service._format_annotations()})
+            writer(
+                {
+                    "yield_value": streaming_service.format_terminal_info_delta(
+                        f"📋 Including {len(configuration.document_ids_to_add_in_context)} user-selected documents..."
+                    )
+                }
+            )

             user_selected_sources, user_selected_documents = await fetch_documents_by_ids(
                 document_ids=configuration.document_ids_to_add_in_context,

@@ -808,8 +938,13 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
             )

             if user_selected_documents:
-                streaming_service.only_update_terminal(f"✅ Successfully added {len(user_selected_documents)} user-selected documents to context")
-                writer({"yeild_value": streaming_service._format_annotations()})
+                writer(
+                    {
+                        "yield_value": streaming_service.format_terminal_info_delta(
+                            f"✅ Successfully added {len(user_selected_documents)} user-selected documents to context"
+                        )
+                    }
+                )

         # Create connector service using state db_session
         connector_service = ConnectorService(state.db_session, user_id=configuration.user_id)

@@ -831,8 +966,7 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
     except Exception as e:
         error_message = f"Error fetching relevant documents: {str(e)}"
         print(error_message)
-        streaming_service.only_update_terminal(f"❌ {error_message}", "error")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service.format_error(error_message)})

         # Log the error and continue with an empty list of documents
         # This allows the process to continue, but the report might lack information
         relevant_documents = []

@@ -844,13 +978,23 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
     print(f"Added {len(user_selected_documents)} user-selected documents for all sections")
     print(f"Total documents for sections: {len(all_documents)}")

-    streaming_service.only_update_terminal(f"✨ Starting to draft {len(answer_outline.answer_outline)} sections using {len(all_documents)} total document chunks ({len(user_selected_documents)} user-selected + {len(relevant_documents)} connector-found)")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                f"✨ Starting to draft {len(answer_outline.answer_outline)} sections using {len(all_documents)} total document chunks ({len(user_selected_documents)} user-selected + {len(relevant_documents)} connector-found)"
+            )
+        }
+    )

     # Create tasks to process each section in parallel with the same document set
     section_tasks = []
-    streaming_service.only_update_terminal("⚙️ Creating processing tasks for each section...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "⚙️ Creating processing tasks for each section..."
+            )
+        }
+    )

     for i, section in enumerate(answer_outline.answer_outline):
         if i == 0:

@@ -885,14 +1029,24 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW

     # Run all section processing tasks in parallel
     print(f"Running {len(section_tasks)} section processing tasks in parallel")
-    streaming_service.only_update_terminal(f"⏳ Processing {len(section_tasks)} sections simultaneously...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                f"⏳ Processing {len(section_tasks)} sections simultaneously..."
+            )
+        }
+    )

     section_results = await asyncio.gather(*section_tasks, return_exceptions=True)

     # Handle any exceptions in the results
-    streaming_service.only_update_terminal("🧵 Combining section results into final report...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🧵 Combining section results into final report..."
+            )
+        }
+    )

     processed_results = []
     for i, result in enumerate(section_results):

@@ -900,8 +1054,7 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
             section_title = answer_outline.answer_outline[i].section_title
             error_message = f"Error processing section '{section_title}': {str(result)}"
             print(error_message)
-            streaming_service.only_update_terminal(f"⚠️ {error_message}", "error")
-            writer({"yeild_value": streaming_service._format_annotations()})
+            writer({"yield_value": streaming_service.format_error(error_message)})
             processed_results.append(error_message)
         else:
             processed_results.append(result)

@@ -918,11 +1071,13 @@ async def process_sections(state: State, config: RunnableConfig, writer: StreamW
     final_written_report = "\n".join(final_report)
     print(f"Generated final report with {len(final_report)} parts")

-    streaming_service.only_update_terminal("🎉 Final research report generated successfully!")
-    writer({"yeild_value": streaming_service._format_annotations()})
-    # Skip the final update since we've been streaming incremental updates
-    # The final answer from each section is already shown in the UI
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🎉 Final research report generated successfully!"
+            )
+        }
+    )

     # Use the shared documents for further question generation
     # Since all sections used the same document pool, we can use it directly
@@ -969,15 +1124,25 @@ async def process_section_with_documents(

     # Send status update via streaming if available
     if state and state.streaming_service and writer:
-        state.streaming_service.only_update_terminal(f"📝 Writing section: \"{section_title}\" with {len(section_questions)} research questions")
-        writer({"yeild_value": state.streaming_service._format_annotations()})
+        writer(
+            {
+                "yield_value": state.streaming_service.format_terminal_info_delta(
+                    f'📝 Writing section: "{section_title}" with {len(section_questions)} research questions'
+                )
+            }
+        )

     # Fallback if no documents found
     if not documents_to_use:
         print(f"No relevant documents found for section: {section_title}")
         if state and state.streaming_service and writer:
-            state.streaming_service.only_update_terminal(f"⚠️ Warning: No relevant documents found for section: \"{section_title}\"", "warning")
-            writer({"yeild_value": state.streaming_service._format_annotations()})
+            writer(
+                {
+                    "yield_value": state.streaming_service.format_error(
+                        f'Warning: No relevant documents found for section: "{section_title}"'
+                    )
+                }
+            )

         documents_to_use = [
             {"content": f"No specific information was found for: {question}"}

@@ -993,7 +1158,7 @@ async def process_section_with_documents(
             "user_query": user_query,
             "relevant_documents": documents_to_use,
             "user_id": user_id,
-            "search_space_id": search_space_id
+            "search_space_id": search_space_id,
         }
     }

@@ -1006,8 +1171,13 @@ async def process_section_with_documents(
     # Invoke the sub-section writer graph with streaming
     print(f"Invoking sub_section_writer for: {section_title}")
     if state and state.streaming_service and writer:
-        state.streaming_service.only_update_terminal(f"🧠 Analyzing information and drafting content for section: \"{section_title}\"")
-        writer({"yeild_value": state.streaming_service._format_annotations()})
+        writer(
+            {
+                "yield_value": state.streaming_service.format_terminal_info_delta(
+                    f'🧠 Analyzing information and drafting content for section: "{section_title}"'
+                )
+            }
+        )

     # Variables to track streaming state
     complete_content = ""  # Tracks the complete content received so far

@@ -1025,7 +1195,13 @@ async def process_section_with_documents(
                 # Only stream if there's actual new content
                 if delta and state and state.streaming_service and writer:
                     # Update terminal with real-time progress indicator
-                    state.streaming_service.only_update_terminal(f"✍️ Writing section {section_id+1}... ({len(complete_content.split())} words)")
+                    writer(
+                        {
+                            "yield_value": state.streaming_service.format_terminal_info_delta(
+                                f"✍️ Writing section {section_id + 1}... ({len(complete_content.split())} words)"
+                            )
+                        }
+                    )

                     # Update section_contents with just the new delta
                     section_contents[section_id]["content"] += delta

@@ -1045,7 +1221,11 @@ async def process_section_with_documents(

                     # Update answer in UI in real-time
                     state.streaming_service.only_update_answer(complete_answer)
-                    writer({"yeild_value": state.streaming_service._format_annotations()})
+                    writer(
+                        {
+                            "yield_value": state.streaming_service._format_annotations()
+                        }
+                    )

         # Set default if no content was received
         if not complete_content:

@@ -1054,8 +1234,13 @@ async def process_section_with_documents(

         # Final terminal update
         if state and state.streaming_service and writer:
-            state.streaming_service.only_update_terminal(f"✅ Completed section: \"{section_title}\"")
-            writer({"yeild_value": state.streaming_service._format_annotations()})
+            writer(
+                {
+                    "yield_value": state.streaming_service.format_terminal_info_delta(
+                        f'✅ Completed section: "{section_title}"'
+                    )
+                }
+            )

         return complete_content
     except Exception as e:

@@ -1063,8 +1248,13 @@ async def process_section_with_documents(

         # Send error update via streaming if available
         if state and state.streaming_service and writer:
-            state.streaming_service.only_update_terminal(f"❌ Error processing section \"{section_title}\": {str(e)}", "error")
-            writer({"yeild_value": state.streaming_service._format_annotations()})
+            writer(
+                {
+                    "yield_value": state.streaming_service.format_error(
+                        f'Error processing section "{section_title}": {str(e)}'
+                    )
+                }
+            )

         return f"Error processing section: {section_title}. Details: {str(e)}"
@@ -1103,15 +1293,30 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
     reformulated_query = state.reformulated_query
     user_query = configuration.user_query

-    streaming_service.only_update_terminal("🤔 Starting Q&A research workflow...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🤔 Starting Q&A research workflow..."
+            )
+        }
+    )

-    streaming_service.only_update_terminal(f"🔍 Researching: \"{user_query[:100]}...\"")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                f'🔍 Researching: "{user_query[:100]}..."'
+            )
+        }
+    )

     # Fetch relevant documents for the QNA query
-    streaming_service.only_update_terminal("🔍 Searching for relevant information across all connectors...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                "🔍 Searching for relevant information across all connectors..."
+            )
+        }
+    )

     # Use a reasonable top_k for QNA - not too many documents to avoid overwhelming the LLM
     TOP_K = 15

@@ -1123,8 +1328,13 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
     try:
         # First, fetch user-selected documents if any
         if configuration.document_ids_to_add_in_context:
-            streaming_service.only_update_terminal(f"📋 Including {len(configuration.document_ids_to_add_in_context)} user-selected documents...")
-            writer({"yeild_value": streaming_service._format_annotations()})
+            writer(
+                {
+                    "yield_value": streaming_service.format_terminal_info_delta(
+                        f"📋 Including {len(configuration.document_ids_to_add_in_context)} user-selected documents..."
+                    )
+                }
+            )

             user_selected_sources, user_selected_documents = await fetch_documents_by_ids(
                 document_ids=configuration.document_ids_to_add_in_context,

@@ -1133,8 +1343,13 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
             )

             if user_selected_documents:
-                streaming_service.only_update_terminal(f"✅ Successfully added {len(user_selected_documents)} user-selected documents to context")
-                writer({"yeild_value": streaming_service._format_annotations()})
+                writer(
+                    {
+                        "yield_value": streaming_service.format_terminal_info_delta(
+                            f"✅ Successfully added {len(user_selected_documents)} user-selected documents to context"
+                        )
+                    }
+                )

         # Create connector service using state db_session
         connector_service = ConnectorService(state.db_session, user_id=configuration.user_id)

@@ -1159,8 +1374,7 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
     except Exception as e:
         error_message = f"Error fetching relevant documents for QNA: {str(e)}"
         print(error_message)
-        streaming_service.only_update_terminal(f"❌ {error_message}", "error")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service.format_error(error_message)})

         # Continue with empty documents - the QNA agent will handle this gracefully
         relevant_documents = []

@@ -1171,8 +1385,13 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
     print(f"Added {len(user_selected_documents)} user-selected documents for QNA")
     print(f"Total documents for QNA: {len(all_documents)}")

-    streaming_service.only_update_terminal(f"🧠 Generating comprehensive answer using {len(all_documents)} total sources ({len(user_selected_documents)} user-selected + {len(relevant_documents)} connector-found)...")
-    writer({"yeild_value": streaming_service._format_annotations()})
+    writer(
+        {
+            "yield_value": streaming_service.format_terminal_info_delta(
+                f"🧠 Generating comprehensive answer using {len(all_documents)} total sources ({len(user_selected_documents)} user-selected + {len(relevant_documents)} connector-found)..."
+            )
+        }
+    )

     # Prepare configuration for the QNA agent
     qna_config = {

@@ -1192,8 +1411,13 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
     }

     try:
-        streaming_service.only_update_terminal("✍️ Writing comprehensive answer with citations...")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer(
+            {
+                "yield_value": streaming_service.format_terminal_info_delta(
+                    "✍️ Writing comprehensive answer with citations..."
+                )
+            }
+        )

         # Track streaming content for real-time updates
         complete_content = ""

@@ -1212,12 +1436,17 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
                 if delta:
                     # Update terminal with progress
                     word_count = len(complete_content.split())
-                    streaming_service.only_update_terminal(f"✍️ Writing answer... ({word_count} words)")
+                    writer(
+                        {
+                            "yield_value": streaming_service.format_terminal_info_delta(
+                                f"✍️ Writing answer... ({word_count} words)"
+                            )
+                        }
+                    )

-                    # Update the answer in real-time
-                    answer_lines = complete_content.split("\n")
-                    streaming_service.only_update_answer(answer_lines)
-                    writer({"yeild_value": streaming_service._format_annotations()})
+                    writer(
+                        {"yield_value": streaming_service.format_text_chunk(delta)}
+                    )

             # Capture reranked documents from QNA agent for further question generation
             if "reranked_documents" in chunk:

@@ -1227,8 +1456,13 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
         if not complete_content:
             complete_content = "I couldn't find relevant information in your knowledge base to answer this question."

-        streaming_service.only_update_terminal("🎉 Q&A answer generated successfully!")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer(
+            {
+                "yield_value": streaming_service.format_terminal_info_delta(
+                    "🎉 Q&A answer generated successfully!"
+                )
+            }
+        )

         # Return the final answer and captured reranked documents for further question generation
         return {

@@ -1239,12 +1473,9 @@ async def handle_qna_workflow(state: State, config: RunnableConfig, writer: Stre
     except Exception as e:
         error_message = f"Error generating QNA answer: {str(e)}"
         print(error_message)
-        streaming_service.only_update_terminal(f"❌ {error_message}", "error")
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service.format_error(error_message)})

-        return {
-            "final_written_report": f"Error generating answer: {str(e)}"
-        }
+        return {"final_written_report": f"Error generating answer: {str(e)}"}


 async def generate_further_questions(state: State, config: RunnableConfig, writer: StreamWriter) -> Dict[str, Any]:
@ -1269,19 +1500,23 @@ async def generate_further_questions(state: State, config: RunnableConfig, write
|
||||||
# Get reranked documents from the state (will be populated by sub-agents)
|
# Get reranked documents from the state (will be populated by sub-agents)
|
||||||
reranked_documents = getattr(state, 'reranked_documents', None) or []
|
reranked_documents = getattr(state, 'reranked_documents', None) or []
|
||||||
|
|
||||||
streaming_service.only_update_terminal("🤔 Generating follow-up questions...")
|
writer(
|
||||||
writer({"yeild_value": streaming_service._format_annotations()})
|
{
|
||||||
|
"yield_value": streaming_service.format_terminal_info_delta(
|
||||||
|
"🤔 Generating follow-up questions..."
|
||||||
|
)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
# Get user's fast LLM
|
# Get user's fast LLM
|
||||||
llm = await get_user_fast_llm(state.db_session, user_id)
|
llm = await get_user_fast_llm(state.db_session, user_id)
|
||||||
if not llm:
|
if not llm:
|
||||||
error_message = f"No fast LLM configured for user {user_id}"
|
error_message = f"No fast LLM configured for user {user_id}"
|
||||||
print(error_message)
|
print(error_message)
|
||||||
streaming_service.only_update_terminal(f"❌ {error_message}", "error")
|
writer({"yield_value": streaming_service.format_error(error_message)})
|
||||||
|
|
||||||
# Stream empty further questions to UI
|
# Stream empty further questions to UI
|
||||||
streaming_service.only_update_further_questions([])
|
writer({"yield_value": streaming_service.format_further_questions_delta([])})
|
||||||
writer({"yeild_value": streaming_service._format_annotations()})
|
|
||||||
return {"further_questions": []}
|
return {"further_questions": []}
|
||||||
|
|
||||||
# Format chat history for the prompt
|
# Format chat history for the prompt
|
||||||
|
@ -1339,8 +1574,13 @@ async def generate_further_questions(state: State, config: RunnableConfig, write
|
||||||
Do not include any other text or explanation. Only return the JSON.
|
Do not include any other text or explanation. Only return the JSON.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
streaming_service.only_update_terminal("🧠 Analyzing conversation context to suggest relevant questions...")
|
writer(
|
||||||
writer({"yeild_value": streaming_service._format_annotations()})
|
{
|
||||||
|
"yield_value": streaming_service.format_terminal_info_delta(
|
||||||
|
"🧠 Analyzing conversation context to suggest relevant questions..."
|
||||||
|
)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
# Create messages for the LLM
     # Create messages for the LLM
     messages = [
@@ -1367,46 +1607,66 @@ async def generate_further_questions(state: State, config: RunnableConfig, write
             # Extract the further_questions array
             further_questions = parsed_data.get("further_questions", [])
 
-            streaming_service.only_update_terminal(f"✅ Generated {len(further_questions)} contextual follow-up questions!")
+            writer(
+                {
+                    "yield_value": streaming_service.format_terminal_info_delta(
+                        f"✅ Generated {len(further_questions)} contextual follow-up questions!"
+                    )
+                }
+            )
 
             # Stream the further questions to the UI
-            streaming_service.only_update_further_questions(further_questions)
-            writer({"yeild_value": streaming_service._format_annotations()})
+            writer(
+                {
+                    "yield_value": streaming_service.format_further_questions_delta(
+                        further_questions
+                    )
+                }
+            )
 
             print(f"Successfully generated {len(further_questions)} further questions")
 
             return {"further_questions": further_questions}
         else:
             # If JSON structure not found, return empty list
-            error_message = "Could not find valid JSON in LLM response for further questions"
+            error_message = (
+                "Could not find valid JSON in LLM response for further questions"
+            )
             print(error_message)
-            streaming_service.only_update_terminal(f"⚠️ {error_message}", "warning")
+            writer(
+                {
+                    "yield_value": streaming_service.format_error(
+                        f"Warning: {error_message}"
+                    )
+                }
+            )
 
             # Stream empty further questions to UI
-            streaming_service.only_update_further_questions([])
-            writer({"yeild_value": streaming_service._format_annotations()})
+            writer(
+                {"yield_value": streaming_service.format_further_questions_delta([])}
+            )
             return {"further_questions": []}
 
     except (json.JSONDecodeError, ValueError) as e:
         # Log the error and return empty list
         error_message = f"Error parsing further questions response: {str(e)}"
         print(error_message)
-        streaming_service.only_update_terminal(f"⚠️ {error_message}", "warning")
+        writer(
+            {"yield_value": streaming_service.format_error(f"Warning: {error_message}")}
+        )
 
         # Stream empty further questions to UI
-        streaming_service.only_update_further_questions([])
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service.format_further_questions_delta([])})
 
         return {"further_questions": []}
 
     except Exception as e:
         # Handle any other errors
         error_message = f"Error generating further questions: {str(e)}"
         print(error_message)
-        streaming_service.only_update_terminal(f"⚠️ {error_message}", "warning")
+        writer(
+            {"yield_value": streaming_service.format_error(f"Warning: {error_message}")}
+        )
 
         # Stream empty further questions to UI
-        streaming_service.only_update_further_questions([])
-        writer({"yeild_value": streaming_service._format_annotations()})
+        writer({"yield_value": streaming_service.format_further_questions_delta([])})
 
         return {"further_questions": []}
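Each writer() call above now emits one self-contained annotation frame instead of re-sending the accumulated state. A minimal sketch (plain Python; the names are illustrative, not SurfSense APIs) of why the delta form matters for payload size:

    import json

    # Hypothetical terminal log after 50 status updates.
    messages = [{"id": i, "text": f"step {i}", "type": "info"} for i in range(50)]

    # Old behaviour: every chunk re-serialized the full annotation state.
    full = f"8:{json.dumps([{'type': 'TERMINAL_INFO', 'content': messages}])}\n"

    # New behaviour: each chunk carries only the newest message.
    delta = f"8:{json.dumps([{'type': 'TERMINAL_INFO', 'content': [messages[-1]]}])}\n"

    # The full frame grows linearly with the update count; the delta stays small.
    print(len(full), len(delta))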
@@ -54,32 +54,23 @@ async def handle_chat_data(
         if message['role'] == "user":
             langchain_chat_history.append(HumanMessage(content=message['content']))
         elif message['role'] == "assistant":
-            # Find the last "ANSWER" annotation specifically
-            answer_annotation = None
-            for annotation in reversed(message['annotations']):
-                if annotation['type'] == "ANSWER":
-                    answer_annotation = annotation
-                    break
-
-            if answer_annotation:
-                answer_text = answer_annotation['content']
-                # If content is a list, join it into a single string
-                if isinstance(answer_text, list):
-                    answer_text = "\n".join(answer_text)
-                langchain_chat_history.append(AIMessage(content=answer_text))
+            langchain_chat_history.append(AIMessage(content=message['content']))
 
-    response = StreamingResponse(stream_connector_search_results(
-        user_query,
-        user.id,
-        search_space_id, # Already converted to int in lines 32-37
-        session,
-        research_mode,
-        selected_connectors,
-        langchain_chat_history,
-        search_mode_str,
-        document_ids_to_add_in_context
-    ))
-    response.headers['x-vercel-ai-data-stream'] = 'v1'
+    response = StreamingResponse(
+        stream_connector_search_results(
+            user_query,
+            user.id,
+            search_space_id,
+            session,
+            research_mode,
+            selected_connectors,
+            langchain_chat_history,
+            search_mode_str,
+            document_ids_to_add_in_context,
+        )
+    )
+
+    response.headers["x-vercel-ai-data-stream"] = "v1"
     return response
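For context, the x-vercel-ai-data-stream: v1 header is what tells a Vercel AI SDK client to parse the response body as data stream protocol frames. A minimal sketch of the wiring, assuming FastAPI; the route and generator names are placeholders, not part of this repo:

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse

    app = FastAPI()

    async def demo_frames():
        # Text parts ("0:") become the assistant message content on the client.
        yield '0:"Hello, "\n'
        yield '0:"world."\n'
        # The finish part ("d:") ends the message with usage metadata.
        yield 'd:{"finishReason": "stop", "usage": {"promptTokens": 1, "completionTokens": 2}}\n'

    @app.post("/demo-chat")
    async def demo_chat():
        response = StreamingResponse(demo_frames())
        response.headers["x-vercel-ai-data-stream"] = "v1"
        return response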
@@ -23,17 +23,138 @@ class StreamingService:
                 "content": []
             }
         ]
-    # It is used to send annotations to the frontend
+
+    # DEPRECATED: This sends the full annotation array every time (inefficient)
     def _format_annotations(self) -> str:
         """
         Format the annotations as a string
 
+        DEPRECATED: This method sends the full annotation state every time.
+        Use the delta formatters instead for optimal streaming.
+
         Returns:
             str: The formatted annotations string
         """
         return f'8:{json.dumps(self.message_annotations)}\n'
 
-    # It is used to end Streaming
+    def format_terminal_info_delta(self, text: str, message_type: str = "info") -> str:
+        """
+        Format a single terminal info message as a delta annotation
+
+        Args:
+            text: The terminal message text
+            message_type: The message type (info, error, success, etc.)
+
+        Returns:
+            str: The formatted annotation delta string
+        """
+        message = {"id": self.terminal_idx, "text": text, "type": message_type}
+        self.terminal_idx += 1
+
+        # Update internal state for reference
+        self.message_annotations[0]["content"].append(message)
+
+        # Return only the delta annotation
+        annotation = {"type": "TERMINAL_INFO", "content": [message]}
+        return f"8:[{json.dumps(annotation)}]\n"
+
+    def format_sources_delta(self, sources: List[Dict[str, Any]]) -> str:
+        """
+        Format sources as a delta annotation
+
+        Args:
+            sources: List of source objects
+
+        Returns:
+            str: The formatted annotation delta string
+        """
+        # Update internal state
+        self.message_annotations[1]["content"] = sources
+
+        # Return only the delta annotation
+        annotation = {"type": "SOURCES", "content": sources}
+        return f"8:[{json.dumps(annotation)}]\n"
+
+    def format_answer_delta(self, answer_chunk: str) -> str:
+        """
+        Format a single answer chunk as a delta annotation
+
+        Args:
+            answer_chunk: The new answer chunk to add
+
+        Returns:
+            str: The formatted annotation delta string
+        """
+        # Update internal state by appending the chunk
+        if isinstance(self.message_annotations[2]["content"], list):
+            self.message_annotations[2]["content"].append(answer_chunk)
+        else:
+            self.message_annotations[2]["content"] = [answer_chunk]
+
+        # Return only the delta annotation with the new chunk
+        annotation = {"type": "ANSWER", "content": [answer_chunk]}
+        return f"8:[{json.dumps(annotation)}]\n"
+
+    def format_answer_annotation(self, answer_lines: List[str]) -> str:
+        """
+        Format the complete answer as a replacement annotation
+
+        Args:
+            answer_lines: Complete list of answer lines
+
+        Returns:
+            str: The formatted annotation string
+        """
+        # Update internal state
+        self.message_annotations[2]["content"] = answer_lines
+
+        # Return the full answer annotation
+        annotation = {"type": "ANSWER", "content": answer_lines}
+        return f"8:[{json.dumps(annotation)}]\n"
+
+    def format_further_questions_delta(
+        self, further_questions: List[Dict[str, Any]]
+    ) -> str:
+        """
+        Format further questions as a delta annotation
+
+        Args:
+            further_questions: List of further question objects
+
+        Returns:
+            str: The formatted annotation delta string
+        """
+        # Update internal state
+        self.message_annotations[3]["content"] = further_questions
+
+        # Return only the delta annotation
+        annotation = {"type": "FURTHER_QUESTIONS", "content": further_questions}
+        return f"8:[{json.dumps(annotation)}]\n"
+
+    def format_text_chunk(self, text: str) -> str:
+        """
+        Format a text chunk using the text stream part
+
+        Args:
+            text: The text chunk to stream
+
+        Returns:
+            str: The formatted text part string
+        """
+        return f"0:{json.dumps(text)}\n"
+
+    def format_error(self, error_message: str) -> str:
+        """
+        Format an error using the error stream part
+
+        Args:
+            error_message: The error message
+
+        Returns:
+            str: The formatted error part string
+        """
+        return f"3:{json.dumps(error_message)}\n"
+
     def format_completion(self, prompt_tokens: int = 156, completion_tokens: int = 204) -> str:
         """
         Format a completion message
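Every formatter above emits one protocol frame of the shape TYPE:JSON followed by a newline — 0: for text parts, 8: for message annotations, 3: for errors, d: for the finish message. A small sketch of a client-side splitter for these frames, written against the framing above rather than any official SDK API:

    import json

    PART_TYPES = {"0": "text", "8": "annotations", "3": "error", "d": "finish"}

    def parse_frame(line: str):
        """Split one 'TYPE:JSON' frame into a (kind, payload) pair."""
        prefix, _, payload = line.rstrip("\n").partition(":")
        return PART_TYPES.get(prefix, "unknown"), json.loads(payload)

    assert parse_frame('0:"chunk"\n') == ("text", "chunk")
    assert parse_frame('3:"something broke"\n') == ("error", "something broke")
    assert parse_frame('8:[{"type": "SOURCES", "content": []}]\n')[1][0]["type"] == "SOURCES"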
@@ -56,7 +177,12 @@ class StreamingService:
         }
         return f'd:{json.dumps(completion_data)}\n'
 
+
+    # DEPRECATED METHODS: Keep for backward compatibility but mark as deprecated
     def only_update_terminal(self, text: str, message_type: str = "info") -> str:
+        """
+        DEPRECATED: Use format_terminal_info_delta() instead for optimal streaming
+        """
         self.message_annotations[0]["content"].append({
             "id": self.terminal_idx,
             "text": text,
@@ -66,16 +192,22 @@ class StreamingService:
         return self.message_annotations
 
     def only_update_sources(self, sources: List[Dict[str, Any]]) -> str:
+        """
+        DEPRECATED: Use format_sources_delta() instead for optimal streaming
+        """
         self.message_annotations[1]["content"] = sources
         return self.message_annotations
 
     def only_update_answer(self, answer: List[str]) -> str:
+        """
+        DEPRECATED: Use format_answer_delta() or format_answer_annotation() instead for optimal streaming
+        """
         self.message_annotations[2]["content"] = answer
         return self.message_annotations
 
     def only_update_further_questions(self, further_questions: List[Dict[str, Any]]) -> str:
         """
-        Update the further questions annotation
+        DEPRECATED: Use format_further_questions_delta() instead for optimal streaming
 
         Args:
             further_questions: List of further question objects with id and question fields
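The old only_update_* methods are kept as shims so existing call sites keep working. One possible hardening, not part of this diff, is a runtime DeprecationWarning so stragglers surface during testing; a sketch for one of the shims:

    import warnings

    def only_update_sources(self, sources):
        # Hypothetical variant; the shim in this commit just mutates and returns state.
        warnings.warn(
            "only_update_sources is deprecated; use format_sources_delta instead",
            DeprecationWarning,
            stacklevel=2,
        )
        self.message_annotations[1]["content"] = sources
        return self.message_annotations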
@@ -83,9 +83,8 @@ async def stream_connector_search_results(
         config=config,
         stream_mode="custom",
     ):
-        # If the chunk contains a 'yeild_value' key, print its value
-        # Note: there's a typo in 'yeild_value' in the code, but we need to match it
-        if isinstance(chunk, dict) and 'yeild_value' in chunk:
-            yield chunk['yeild_value']
+        if isinstance(chunk, dict):
+            if "yield_value" in chunk:
+                yield chunk["yield_value"]
 
     yield streaming_service.format_completion()
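With the yeild_value → yield_value typo fixed on both the producer and consumer sides, the contract is symmetric again. A condensed sketch of the whole consumer loop, assuming a LangGraph graph streamed with stream_mode="custom"; the argument names are placeholders:

    async def stream_results(graph, initial_state, config, streaming_service):
        # Custom-mode chunks are exactly what each node handed to its writer().
        async for chunk in graph.astream(initial_state, config=config, stream_mode="custom"):
            if isinstance(chunk, dict) and "yield_value" in chunk:
                yield chunk["yield_value"]
        # Close the stream with a finish frame so the client stops reading.
        yield streaming_service.format_completion()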