Update research_manager.py - patch update 2.0

This commit is contained in:
Hafeez 2024-11-21 18:02:39 +05:30 committed by GitHub
parent 555ddf951e
commit 3e439cc593
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -20,10 +20,10 @@ from urllib.parse import urlparse
from pathlib import Path
# Initialize colorama for cross-platform color support
if os.name == 'nt':
import msvcrt
if os.name == 'nt': # Windows-specific initialization
init(convert=True, strip=False, wrap=True)
else:
raise ImportError("This version is Windows-specific. Please use the Unix version for other operating systems.")
init()
# Set up logging
log_directory = 'logs'
@ -53,6 +53,7 @@ class ResearchFocus:
source_query: str = ""
timestamp: str = ""
search_queries: List[str] = None
reasoning: Optional[str] = None
def __post_init__(self):
if not self.timestamp:
@ -299,14 +300,13 @@ class TerminalUI:
self.research_thread = None
self.last_display_height = 0 # Track display height for corruption fix
def setup(self):
"""Initialize the terminal UI"""
if self.is_setup:
return
# Save terminal settings
if not os.name == 'nt': # Unix-like systems
if os.name != 'nt': # Unix-like systems
self.old_terminal_settings = termios.tcgetattr(sys.stdin.fileno())
self.stdscr = curses.initscr()
@ -315,12 +315,6 @@ class TerminalUI:
curses.cbreak()
self.stdscr.keypad(True)
# Enable only scroll wheel events, not all mouse events
# curses.mousemask(curses.BUTTON4_PRESSED | curses.BUTTON5_PRESSED)
# Remove this line that was causing the spam
# print('\033[?1003h') # We don't want mouse movement events
# Get terminal dimensions
self.max_y, self.max_x = self.stdscr.getmaxyx()
@ -361,7 +355,8 @@ class TerminalUI:
curses.endwin()
# Restore original terminal settings
if self.old_terminal_settings and not os.name == 'nt':
if self.old_terminal_settings and os.name != 'nt':
import termios
termios.tcsetattr(
sys.stdin.fileno(),
termios.TCSADRAIN,
@ -543,20 +538,40 @@ class TerminalUI:
os._exit(0) # Ensure exit
class NonBlockingInput:
"""Windows-compatible non-blocking input"""
"""Handles non-blocking keyboard input for Windows systems."""
def __init__(self):
pass
"""Initialize NonBlockingInput (no special setup required on Windows)."""
if os.name != 'nt':
raise EnvironmentError("NonBlockingInput is designed for Windows only.")
def __enter__(self):
"""Enter the context (no-op for Windows)."""
return self
def __exit__(self, type, value, traceback):
"""Exit the context (no-op for Windows)."""
pass
def check_input(self, timeout=0.1):
"""Check for input without blocking"""
"""
Check for keyboard input without blocking.
Args:
timeout (float): Time in seconds to wait before returning if no input.
Returns:
str or None: The input character as a string, or None if no input is detected.
"""
start_time = time.time()
while True:
if msvcrt.kbhit():
try:
return msvcrt.getch().decode('utf-8')
except UnicodeDecodeError:
# Handle non-ASCII characters gracefully
return None
if time.time() - start_time > timeout:
return None
class ResearchManager:
@ -809,7 +824,7 @@ Do not provide any additional information or explanation, note that the time ran
char = msvcrt.getch()
# CTRL+D or CTRL+Z detection
if char in [b'\x04', b'\x1a']:
if char in [b'\x04', b'\x1a']: # CTRL+D (Unix) and CTRL+Z (Windows)
sys.stdout.write('\n')
if current_line:
buffer.append(''.join(current_line))
@ -830,8 +845,22 @@ Do not provide any additional information or explanation, note that the time ran
sys.stdout.write('\n')
return 'quit'
# Normal character
elif 32 <= ord(char[0]) <= 126:
# Handle command inputs like 'p', 's', 'f', 'q'
elif char == b's': # Show status command
buffer.append('s')
sys.stdout.write('s') # Print 's' to indicate the command
elif char == b'f': # Show focus command
buffer.append('f')
sys.stdout.write('f') # Print 'f' to indicate the command
elif char == b'p': # Pause command
buffer.append('p')
sys.stdout.write('p') # Print 'p' to indicate the command
elif char == b'q': # Quit command
buffer.append('q')
sys.stdout.write('q') # Print 'q' to indicate the command
# Normal character input
elif 32 <= ord(char[0]) <= 126: # Printable characters
current_line.append(char.decode('utf-8'))
sys.stdout.write(char.decode('utf-8'))
@ -840,7 +869,6 @@ Do not provide any additional information or explanation, note that the time ran
except Exception as e:
logger.error(f"Error in multiline input: {str(e)}")
return 'quit'
def _process_search_results(self, results: Dict[str, str], focus_area: str):
"""Process and store search results"""
if not results:
@ -854,74 +882,91 @@ Do not provide any additional information or explanation, note that the time ran
"""Main research loop with comprehensive functionality"""
self.is_running = True
try:
logging.debug("Research loop started.")
self.research_started.set()
while not self.should_terminate.is_set() and not self.shutdown_event.is_set():
# Check if research is paused
if self.research_paused:
logging.debug("Research is paused.")
time.sleep(1)
continue
self.ui.update_output("\nAnalyzing research progress...")
logging.debug("Analyzing research progress.")
# Generate focus areas
self.ui.update_output("\nGenerating research focus areas...")
logging.debug("Generating research focus areas.")
analysis_result = self.strategic_parser.strategic_analysis(self.original_query)
if not analysis_result:
self.ui.update_output("\nFailed to generate analysis result. Retrying...")
logging.warning("Failed to generate analysis result. Retrying...")
continue
focus_areas = analysis_result.focus_areas
if not focus_areas:
self.ui.update_output("\nNo valid focus areas generated. Retrying...")
logging.warning("No valid focus areas generated. Retrying...")
continue
self.ui.update_output(f"\nGenerated {len(focus_areas)} research areas:")
logging.debug(f"Generated {len(focus_areas)} research areas.")
for i, focus in enumerate(focus_areas, 1):
self.ui.update_output(f"\nArea {i}: {focus.area}")
self.ui.update_output(f"Priority: {focus.priority}")
logging.debug(f"Area {i}: {focus.area}, Priority: {focus.priority}")
# Process each focus area in priority order
for focus_area in focus_areas:
if self.should_terminate.is_set():
logging.debug("Termination signal received. Exiting focus area processing.")
break
# Check if research is paused
while self.research_paused and not self.should_terminate.is_set():
logging.debug("Research is paused during focus area processing.")
time.sleep(1)
if self.should_terminate.is_set():
logging.debug("Termination signal received. Exiting focus area processing.")
break
self.current_focus = focus_area
self.ui.update_output(f"\nInvestigating: {focus_area.area}")
logging.debug(f"Investigating focus area: {focus_area.area}")
queries = self.formulate_search_queries(focus_area)
if not queries:
logging.warning("No queries formulated for focus area.")
continue
for query in queries:
if self.should_terminate.is_set():
logging.debug("Termination signal received. Exiting query processing.")
break
# Check if research is paused
while self.research_paused and not self.should_terminate.is_set():
logging.debug("Research is paused during query processing.")
time.sleep(1)
if self.should_terminate.is_set():
logging.debug("Termination signal received. Exiting query processing.")
break
try:
self.ui.update_output(f"\nSearching: {query}")
logging.debug(f"Performing search for query: {query}")
results = self.search_engine.perform_search(query, time_range='none')
if results:
# self.search_engine.display_search_results(results)
selected_urls = self.search_engine.select_relevant_pages(results, query)
if selected_urls:
self.ui.update_output("\n⚙️ Scraping selected pages...")
logging.debug("Scraping selected pages.")
scraped_content = self.search_engine.scrape_content(selected_urls)
if scraped_content:
for url, content in scraped_content.items():
@ -934,20 +979,23 @@ Do not provide any additional information or explanation, note that the time ran
if self.check_document_size():
self.ui.update_output("\nDocument size limit reached. Finalizing research.")
logging.info("Document size limit reached. Finalizing research.")
return
# After processing all areas, cycle back to generate new ones
self.ui.update_output("\nAll current focus areas investigated. Generating new areas...")
logging.debug("All current focus areas investigated. Generating new areas.")
except Exception as e:
logger.error(f"Error in research loop: {str(e)}")
self.ui.update_output(f"Error in research process: {str(e)}")
finally:
self.is_running = False
logging.debug("Research loop ended.")
def start_research(self, topic: str):
"""Start research with new session document"""
try:
logging.debug("Setting up UI and initializing document.")
self.ui.setup()
self.original_query = topic
self._initialize_document()
@ -961,21 +1009,25 @@ Do not provide any additional information or explanation, note that the time ran
self.ui.update_output("'q' = Quit research\n")
# Reset events
logging.debug("Resetting events and flags.")
self.should_terminate.clear()
self.research_started.clear()
self.research_paused = False # Ensure research is not paused at the start
self.awaiting_user_decision = False
# Start research thread
logging.debug("Starting research thread.")
self.research_thread = threading.Thread(target=self._research_loop, daemon=True)
self.research_thread.start()
# Wait for research to actually start
if not self.research_started.wait(timeout=10):
self.ui.update_output("Error: Research failed to start within timeout period")
logging.error("Research failed to start within timeout period.")
self.should_terminate.set()
return
logging.debug("Entering command loop.")
while not self.should_terminate.is_set():
cmd = self.ui.get_input("Enter command: ")
if cmd is None or self.shutdown_event.is_set():
@ -989,8 +1041,9 @@ Do not provide any additional information or explanation, note that the time ran
self._handle_command(cmd)
except Exception as e:
logger.error(f"Error in research process: {str(e)}")
logging.error(f"Error in research process: {str(e)}")
finally:
logging.debug("Cleaning up resources.")
self._cleanup()
def check_document_size(self) -> bool:
@ -1032,6 +1085,175 @@ Do not provide any additional information or explanation, note that the time ran
self.ui.update_output("\nFinal Research Summary:")
self.ui.update_output(summary)
def show_progress_indicator(self, message="Generating summary, please wait..."):
"""Show a rotating progress indicator until the summary is ready."""
symbols = ['|', '/', '-', '\\']
idx = 0
self.summary_ready = False # Track whether the summary is complete
try:
while not self.summary_ready:
sys.stdout.write(f"\r{message} {symbols[idx]}")
sys.stdout.flush()
idx = (idx + 1) % len(symbols)
time.sleep(0.2) # Adjust the speed of the rotation if needed
except KeyboardInterrupt:
sys.stdout.write("\rOperation interrupted.\n")
self.summary_ready = True
finally:
sys.stdout.write("\r" + " " * (len(message) + 2) + "\r") # Clear the line when done
sys.stdout.flush()
def _cleanup_research_ui(self):
"""Clean up just the research UI components"""
if hasattr(self, 'ui') and self.ui:
self.ui.cleanup()
def show_thinking_indicator(self, message: str, stop_flag_name: str):
"""Show a rotating thinking indicator with custom message"""
symbols = ['|', '/', '-', '\\']
idx = 0
while getattr(self, stop_flag_name): # Use dynamic attribute lookup
sys.stdout.write(f"\r{message} {symbols[idx]}")
sys.stdout.flush()
idx = (idx + 1) % len(symbols)
time.sleep(0.2)
sys.stdout.write("\r" + " " * (len(message) + 2) + "\r") # Clear the line when done
def start_conversation_mode(self):
"""Start interactive conversation mode with CTRL+D input handling and thinking indicator"""
self.conversation_active = True
self.thinking = False
# Print header with clear instructions
print("\n" + "="*80)
print(Fore.CYAN + "Research Conversation Mode" + Style.RESET_ALL)
print("="*80)
print(Fore.YELLOW + "\nInstructions:")
print("- Type your question and press CTRL+D to submit")
print("- Type 'quit' and press CTRL+D to exit")
print("- Your messages appear in green")
print("- AI responses appear in cyan" + Style.RESET_ALL + "\n")
while self.conversation_active:
try:
# Show prompt with user input in green
print(Fore.GREEN + "Your question (Press CTRL+D to submit):" + Style.RESET_ALL)
user_input = self.get_multiline_conversation_input()
# Handle exit commands
if not user_input or user_input.lower() in ['quit', 'exit', 'q']:
print(Fore.YELLOW + "\nExiting conversation mode..." + Style.RESET_ALL)
self.conversation_active = False
break
# Skip empty input
if not user_input.strip():
continue
# Echo the submitted question for clarity
print(Fore.GREEN + "Submitted question:" + Style.RESET_ALL)
print(Fore.GREEN + user_input + Style.RESET_ALL + "\n")
# Start thinking indicator in a separate thread
self.thinking = True # Set flag before starting thread
thinking_thread = threading.Thread(
target=self.show_thinking_indicator,
args=("Thinking...", "thinking")
)
thinking_thread.daemon = True
thinking_thread.start()
try:
# Generate response
response = self._generate_conversation_response(user_input)
# Stop thinking indicator
self.thinking = False
thinking_thread.join()
# Display response in cyan
print(Fore.CYAN + "AI Response:" + Style.RESET_ALL)
print(f"{Fore.CYAN}{response}{Style.RESET_ALL}\n")
print("-" * 80 + "\n") # Separator between QA pairs
except Exception as e:
self.thinking = False # Ensure thinking indicator stops
thinking_thread.join()
raise e
except KeyboardInterrupt:
self.thinking = False # Ensure thinking indicator stops
print(Fore.YELLOW + "\nOperation cancelled. Submit 'quit' to exit." + Style.RESET_ALL)
except Exception as e:
logger.error(f"Error in conversation mode: {str(e)}")
print(Fore.RED + f"Error processing question: {str(e)}" + Style.RESET_ALL)
def _generate_conversation_response(self, user_query: str) -> str:
"""Generate contextual responses with improved context handling"""
try:
# Add debug logging to verify content
logger.info(f"Research summary length: {len(self.research_summary) if self.research_summary else 0}")
logger.info(f"Research content length: {len(self.research_content) if self.research_content else 0}")
# First verify we have content
if not self.research_content and not self.research_summary:
# Try to reload from file if available
try:
if os.path.exists(self.document_path):
with open(self.document_path, 'r', encoding='utf-8') as f:
self.research_content = f.read().strip()
except Exception as e:
logger.error(f"Failed to reload research content: {str(e)}")
# Prepare context, ensuring we have content
context = f"""
Research Content:
{self.research_content}
Research Summary:
{self.research_summary if self.research_summary else 'No summary available'}
"""
prompt = f"""
Based on the following research content and summary, please answer this question:
{context}
Question: {user_query}
you have 2 sets of instructions the applied set and the unapplied set, the applied set should be followed if the question is directly relating to the research content whereas anything else other then direct questions about the content of the research will result in you instead following the unapplied ruleset
Applied:
Instructions:
1. Answer based ONLY on the research content provided above if asked a question about your research or that content.
2. If the information requested isn't in the research, clearly state that it isn't in the content you gathered.
3. Be direct and specific in your response, DO NOT directly cite research unless specifically asked to, be concise and give direct answers to questions based on the research, unless instructed otherwise.
Unapplied:
Instructions:
1. Do not make up anything that isn't actually true.
2. Respond directly to the user's question in an honest and thoughtful manner.
3. disregard rules in the applied set for queries not DIRECTLY related to the research, including queries about the research process or what you remember about the research should result in the unapplied ruleset being used.
Answer:
"""
response = self.llm.generate(
prompt,
max_tokens=1000, # Increased for more detailed responses
temperature=0.7
)
if not response or not response.strip():
return "I apologize, but I cannot find relevant information in the research content to answer your question."
return response.strip()
except Exception as e:
logger.error(f"Error generating response: {str(e)}")
return f"I apologize, but I encountered an error processing your question: {str(e)}"
def pause_and_assess(self):
"""Pause the research and assess if the collected content is sufficient."""
try:
@ -1230,170 +1452,178 @@ Research Progress:
# Clean up research UI
self._cleanup_research_ui()
def show_progress_indicator(self, message="Generating summary, please wait..."):
"""Show a rotating progress indicator until the summary is ready."""
symbols = ['|', '/', '-', '\\']
idx = 0
self.summary_ready = False # Track whether the summary is complete
while not self.summary_ready:
sys.stdout.write(f"\r{message} {symbols[idx]}")
sys.stdout.flush()
idx = (idx + 1) % len(symbols)
time.sleep(0.2) # Adjust the speed of the rotation if needed
sys.stdout.write("\r" + " " * (len(message) + 2) + "\r") # Clear the line when done
# def show_progress_indicator(self, message="Generating summary, please wait..."):
# """Show a rotating progress indicator until the summary is ready."""
# symbols = ['|', '/', '-', '\\']
# idx = 0
# self.summary_ready = False # Track whether the summary is complete
def _cleanup_research_ui(self):
"""Clean up just the research UI components"""
if hasattr(self, 'ui') and self.ui:
self.ui.cleanup()
# try:
# while not self.summary_ready:
# sys.stdout.write(f"\r{message} {symbols[idx]}")
# sys.stdout.flush()
# idx = (idx + 1) % len(symbols)
# time.sleep(0.2) # Adjust the speed of the rotation if needed
# except KeyboardInterrupt:
# sys.stdout.write("\rOperation interrupted.\n")
# self.summary_ready = True
# finally:
# sys.stdout.write("\r" + " " * (len(message) + 2) + "\r") # Clear the line when done
# sys.stdout.flush()
# Clear the line when done
def show_thinking_indicator(self, message: str, stop_flag_name: str):
"""Show a rotating thinking indicator with custom message"""
symbols = ['|', '/', '-', '\\']
idx = 0
while getattr(self, stop_flag_name): # Use dynamic attribute lookup
sys.stdout.write(f"\r{message} {symbols[idx]}")
sys.stdout.flush()
idx = (idx + 1) % len(symbols)
time.sleep(0.2)
sys.stdout.write("\r" + " " * (len(message) + 2) + "\r") # Clear the line when done
# def _cleanup_research_ui(self):
# """Clean up just the research UI components"""
# if hasattr(self, 'ui') and self.ui:
# self.ui.cleanup()
def start_conversation_mode(self):
"""Start interactive conversation mode with CTRL+D input handling and thinking indicator"""
self.conversation_active = True
self.thinking = False
# def show_thinking_indicator(self, message: str, stop_flag_name: str):
# """Show a rotating thinking indicator with custom message"""
# symbols = ['|', '/', '-', '\\']
# idx = 0
# while getattr(self, stop_flag_name): # Use dynamic attribute lookup
# sys.stdout.write(f"\r{message} {symbols[idx]}")
# sys.stdout.flush()
# idx = (idx + 1) % len(symbols)
# time.sleep(0.2)
# sys.stdout.write("\r" + " " * (len(message) + 2) + "\r") # Clear the line when done
# Print header with clear instructions
print("\n" + "="*80)
print(Fore.CYAN + "Research Conversation Mode" + Style.RESET_ALL)
print("="*80)
print(Fore.YELLOW + "\nInstructions:")
print("- Type your question and press CTRL+D to submit")
print("- Type 'quit' and press CTRL+D to exit")
print("- Your messages appear in green")
print("- AI responses appear in cyan" + Style.RESET_ALL + "\n")
# def start_conversation_mode(self):
# """Start interactive conversation mode with CTRL+D input handling and thinking indicator"""
# self.conversation_active = True
# self.thinking = False
while self.conversation_active:
try:
# Show prompt with user input in green
print(Fore.GREEN + "Your question (Press CTRL+D to submit):" + Style.RESET_ALL)
user_input = self.get_multiline_conversation_input()
# # Print header with clear instructions
# print("\n" + "="*80)
# print(Fore.CYAN + "Research Conversation Mode" + Style.RESET_ALL)
# print("="*80)
# print(Fore.YELLOW + "\nInstructions:")
# print("- Type your question and press CTRL+D to submit")
# print("- Type 'quit' and press CTRL+D to exit")
# print("- Your messages appear in green")
# print("- AI responses appear in cyan" + Style.RESET_ALL + "\n")
# Handle exit commands
if not user_input or user_input.lower() in ['quit', 'exit', 'q']:
print(Fore.YELLOW + "\nExiting conversation mode..." + Style.RESET_ALL)
self.conversation_active = False
break
# while self.conversation_active:
# try:
# # Show prompt with user input in green
# print(Fore.GREEN + "Your question (Press CTRL+D to submit):" + Style.RESET_ALL)
# user_input = self.get_multiline_conversation_input()
# Skip empty input
if not user_input.strip():
continue
# # Handle exit commands
# if not user_input or user_input.lower() in ['quit', 'exit', 'q']:
# print(Fore.YELLOW + "\nExiting conversation mode..." + Style.RESET_ALL)
# self.conversation_active = False
# break
# Echo the submitted question for clarity
print(Fore.GREEN + "Submitted question:" + Style.RESET_ALL)
print(Fore.GREEN + user_input + Style.RESET_ALL + "\n")
# # Skip empty input
# if not user_input.strip():
# continue
# Start thinking indicator in a separate thread
self.thinking = True # Set flag before starting thread
thinking_thread = threading.Thread(
target=self.show_thinking_indicator,
args=("Thinking...", "thinking")
)
thinking_thread.daemon = True
thinking_thread.start()
# # Echo the submitted question for clarity
# print(Fore.GREEN + "Submitted question:" + Style.RESET_ALL)
# print(Fore.GREEN + user_input + Style.RESET_ALL + "\n")
try:
# Generate response
response = self._generate_conversation_response(user_input)
# # Start thinking indicator in a separate thread
# self.thinking = True # Set flag before starting thread
# thinking_thread = threading.Thread(
# target=self.show_thinking_indicator,
# args=("Thinking...", "thinking")
# )
# thinking_thread.daemon = True
# thinking_thread.start()
# Stop thinking indicator
self.thinking = False
thinking_thread.join()
# try:
# # Generate response
# response = self._generate_conversation_response(user_input)
# Display response in cyan
print(Fore.CYAN + "AI Response:" + Style.RESET_ALL)
print(f"{Fore.CYAN}{response}{Style.RESET_ALL}\n")
print("-" * 80 + "\n") # Separator between QA pairs
# # Stop thinking indicator
# self.thinking = False
# thinking_thread.join()
except Exception as e:
self.thinking = False # Ensure thinking indicator stops
thinking_thread.join()
raise e
# # Display response in cyan
# print(Fore.CYAN + "AI Response:" + Style.RESET_ALL)
# print(f"{Fore.CYAN}{response}{Style.RESET_ALL}\n")
# print("-" * 80 + "\n") # Separator between QA pairs
except KeyboardInterrupt:
self.thinking = False # Ensure thinking indicator stops
print(Fore.YELLOW + "\nOperation cancelled. Submit 'quit' to exit." + Style.RESET_ALL)
except Exception as e:
logger.error(f"Error in conversation mode: {str(e)}")
print(Fore.RED + f"Error processing question: {str(e)}" + Style.RESET_ALL)
# except Exception as e:
# self.thinking = False # Ensure thinking indicator stops
# thinking_thread.join()
# raise e
def _generate_conversation_response(self, user_query: str) -> str:
"""Generate contextual responses with improved context handling"""
try:
# Add debug logging to verify content
logger.info(f"Research summary length: {len(self.research_summary) if self.research_summary else 0}")
logger.info(f"Research content length: {len(self.research_content) if self.research_content else 0}")
# except KeyboardInterrupt:
# self.thinking = False # Ensure thinking indicator stops
# print(Fore.YELLOW + "\nOperation cancelled. Submit 'quit' to exit." + Style.RESET_ALL)
# except Exception as e:
# logger.error(f"Error in conversation mode: {str(e)}")
# print(Fore.RED + f"Error processing question: {str(e)}" + Style.RESET_ALL)
# First verify we have content
if not self.research_content and not self.research_summary:
# Try to reload from file if available
try:
if os.path.exists(self.document_path):
with open(self.document_path, 'r', encoding='utf-8') as f:
self.research_content = f.read().strip()
except Exception as e:
logger.error(f"Failed to reload research content: {str(e)}")
# def _generate_conversation_response(self, user_query: str) -> str:
# """Generate contextual responses with improved context handling"""
# try:
# # Add debug logging to verify content
# logger.info(f"Research summary length: {len(self.research_summary) if self.research_summary else 0}")
# logger.info(f"Research content length: {len(self.research_content) if self.research_content else 0}")
# Prepare context, ensuring we have content
context = f"""
Research Content:
{self.research_content}
# # First verify we have content
# if not self.research_content and not self.research_summary:
# # Try to reload from file if available
# try:
# if os.path.exists(self.document_path):
# with open(self.document_path, 'r', encoding='utf-8') as f:
# self.research_content = f.read().strip()
# except Exception as e:
# logger.error(f"Failed to reload research content: {str(e)}")
Research Summary:
{self.research_summary if self.research_summary else 'No summary available'}
"""
# # Prepare context, ensuring we have content
# context = f"""
# Research Content:
# {self.research_content}
prompt = f"""
Based on the following research content and summary, please answer this question:
# Research Summary:
# {self.research_summary if self.research_summary else 'No summary available'}
# """
{context}
# prompt = f"""
# Based on the following research content and summary, please answer this question:
Question: {user_query}
# {context}
you have 2 sets of instructions the applied set and the unapplied set, the applied set should be followed if the question is directly relating to the research content whereas anything else other then direct questions about the content of the research will result in you instead following the unapplied ruleset
# Question: {user_query}
Applied:
# you have 2 sets of instructions the applied set and the unapplied set, the applied set should be followed if the question is directly relating to the research content whereas anything else other then direct questions about the content of the research will result in you instead following the unapplied ruleset
Instructions:
1. Answer based ONLY on the research content provided above if asked a question about your research or that content.
2. If the information requested isn't in the research, clearly state that it isn't in the content you gathered.
3. Be direct and specific in your response, DO NOT directly cite research unless specifically asked to, be concise and give direct answers to questions based on the research, unless instructed otherwise.
# Applied:
Unapplied:
# Instructions:
# 1. Answer based ONLY on the research content provided above if asked a question about your research or that content.
# 2. If the information requested isn't in the research, clearly state that it isn't in the content you gathered.
# 3. Be direct and specific in your response, DO NOT directly cite research unless specifically asked to, be concise and give direct answers to questions based on the research, unless instructed otherwise.
Instructions:
# Unapplied:
1. Do not make up anything that isn't actually true.
2. Respond directly to the user's question in an honest and thoughtful manner.
3. disregard rules in the applied set for queries not DIRECTLY related to the research, including queries about the research process or what you remember about the research should result in the unapplied ruleset being used.
# Instructions:
Answer:
"""
# 1. Do not make up anything that isn't actually true.
# 2. Respond directly to the user's question in an honest and thoughtful manner.
# 3. disregard rules in the applied set for queries not DIRECTLY related to the research, including queries about the research process or what you remember about the research should result in the unapplied ruleset being used.
response = self.llm.generate(
prompt,
max_tokens=1000, # Increased for more detailed responses
temperature=0.7
)
# Answer:
# """
if not response or not response.strip():
return "I apologize, but I cannot find relevant information in the research content to answer your question."
# response = self.llm.generate(
# prompt,
# max_tokens=1000, # Increased for more detailed responses
# temperature=0.7
# )
return response.strip()
# if not response or not response.strip():
# return "I apologize, but I cannot find relevant information in the research content to answer your question."
except Exception as e:
logger.error(f"Error generating response: {str(e)}")
return f"I apologize, but I encountered an error processing your question: {str(e)}"
# return response.strip()
# except Exception as e:
# logger.error(f"Error generating response: {str(e)}")
# return f"I apologize, but I encountered an error processing your question: {str(e)}"
if __name__ == "__main__":