From 1029885bd088d9458be964683423272389df633e Mon Sep 17 00:00:00 2001 From: Hafeez <90968109+hafeezhmha@users.noreply.github.com> Date: Wed, 20 Nov 2024 22:29:43 +0530 Subject: [PATCH] Update Self_Improving_Search.py for windows --- Self_Improving_Search.py | 681 ++++++++++++++++----------------------- 1 file changed, 280 insertions(+), 401 deletions(-) diff --git a/Self_Improving_Search.py b/Self_Improving_Search.py index 03cac8f..ea097de 100644 --- a/Self_Improving_Search.py +++ b/Self_Improving_Search.py @@ -1,26 +1,32 @@ -import time -import re -import os -from typing import List, Dict, Tuple, Union -from colorama import Fore, Style -import logging import sys +import msvcrt +import os +from colorama import init, Fore, Style +import logging +import time from io import StringIO -from web_scraper import get_web_content, can_fetch +from Self_Improving_Search import EnhancedSelfImprovingSearch from llm_config import get_llm_config from llm_response_parser import UltimateLLMResponseParser from llm_wrapper import LLMWrapper -from urllib.parse import urlparse +from strategic_analysis_parser import StrategicAnalysisParser +from research_manager import ResearchManager + +# Initialize colorama +if os.name != 'nt': + print("This version is Windows-specific. Please use the Unix version for other operating systems.") + sys.exit(1) + +init() # Initialize colorama # Set up logging log_directory = 'logs' if not os.path.exists(log_directory): - os.makedirs(log_directory) + os.makedirs(log_directory) -# Configure logger logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) -log_file = os.path.join(log_directory, 'llama_output.log') +log_file = os.path.join(log_directory, 'web_llm.log') file_handler = logging.FileHandler(log_file) formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) @@ -28,19 +34,17 @@ logger.handlers = [] logger.addHandler(file_handler) logger.propagate = False -# Suppress other loggers -for name in ['root', 'duckduckgo_search', 'requests', 'urllib3']: - logging.getLogger(name).setLevel(logging.WARNING) - logging.getLogger(name).handlers = [] - logging.getLogger(name).propagate = False +# Disable other loggers +for name in logging.root.manager.loggerDict: + if name != __name__: + logging.getLogger(name).disabled = True class OutputRedirector: - """Windows-compatible output redirection""" def __init__(self, stream=None): self.stream = stream or StringIO() self.original_stdout = sys.stdout self.original_stderr = sys.stderr - + def __enter__(self): sys.stdout = self.stream sys.stderr = self.stream @@ -50,386 +54,261 @@ class OutputRedirector: sys.stdout = self.original_stdout sys.stderr = self.original_stderr -class EnhancedSelfImprovingSearch: - def __init__(self, llm: LLMWrapper, parser: UltimateLLMResponseParser, max_attempts: int = 5): - self.llm = llm - self.parser = parser - self.max_attempts = max_attempts - self.llm_config = get_llm_config() - - @staticmethod - def initialize_llm(): - llm_wrapper = LLMWrapper() - return llm_wrapper - - def print_thinking(self): - print(Fore.MAGENTA + "🧠 Thinking..." + Style.RESET_ALL) - - def print_searching(self): - print(Fore.MAGENTA + "📝 Searching..." + Style.RESET_ALL) - - def search_and_improve(self, user_query: str) -> str: - attempt = 0 - while attempt < self.max_attempts: - print(f"\n{Fore.CYAN}Search attempt {attempt + 1}:{Style.RESET_ALL}") - self.print_searching() - - try: - formulated_query, time_range = self.formulate_query(user_query, attempt) - - print(f"{Fore.YELLOW}Original query: {user_query}{Style.RESET_ALL}") - print(f"{Fore.YELLOW}Formulated query: {formulated_query}{Style.RESET_ALL}") - print(f"{Fore.YELLOW}Time range: {time_range}{Style.RESET_ALL}") - - if not formulated_query: - print(f"{Fore.RED}Error: Empty search query. Retrying...{Style.RESET_ALL}") - attempt += 1 - continue - - search_results = self.perform_search(formulated_query, time_range) - - if not search_results: - print(f"{Fore.RED}No results found. Retrying with a different query...{Style.RESET_ALL}") - attempt += 1 - continue - - self.display_search_results(search_results) - - selected_urls = self.select_relevant_pages(search_results, user_query) - - if not selected_urls: - print(f"{Fore.RED}No relevant URLs found. Retrying...{Style.RESET_ALL}") - attempt += 1 - continue - - print(Fore.MAGENTA + "⚙️ Scraping selected pages..." + Style.RESET_ALL) - # Scraping is done without OutputRedirector to ensure messages are visible - scraped_content = self.scrape_content(selected_urls) - - if not scraped_content: - print(f"{Fore.RED}Failed to scrape content. Retrying...{Style.RESET_ALL}") - attempt += 1 - continue - - self.display_scraped_content(scraped_content) - - self.print_thinking() - - with OutputRedirector() as output: - evaluation, decision = self.evaluate_scraped_content(user_query, scraped_content) - llm_output = output.getvalue() - logger.info(f"LLM Output in evaluate_scraped_content:\n{llm_output}") - - print(f"{Fore.MAGENTA}Evaluation: {evaluation}{Style.RESET_ALL}") - print(f"{Fore.MAGENTA}Decision: {decision}{Style.RESET_ALL}") - - if decision == "answer": - return self.generate_final_answer(user_query, scraped_content) - elif decision == "refine": - print(f"{Fore.YELLOW}Refining search...{Style.RESET_ALL}") - attempt += 1 - else: - print(f"{Fore.RED}Unexpected decision. Proceeding to answer.{Style.RESET_ALL}") - return self.generate_final_answer(user_query, scraped_content) - - except Exception as e: - print(f"{Fore.RED}An error occurred during search attempt. Check the log file for details.{Style.RESET_ALL}") - logger.error(f"An error occurred during search: {str(e)}", exc_info=True) - attempt += 1 - - return self.synthesize_final_answer(user_query) - - def evaluate_scraped_content(self, user_query: str, scraped_content: Dict[str, str]) -> Tuple[str, str]: - user_query_short = user_query[:200] - prompt = f""" -Evaluate if the following scraped content contains sufficient information to answer the user's question comprehensively: - -User's question: "{user_query_short}" - -Scraped Content: -{self.format_scraped_content(scraped_content)} - -Your task: -1. Determine if the scraped content provides enough relevant and detailed information to answer the user's question thoroughly. -2. If the information is sufficient, decide to 'answer'. If more information or clarification is needed, decide to 'refine' the search. - -Respond using EXACTLY this format: -Evaluation: [Your evaluation of the scraped content] -Decision: [ONLY 'answer' if content is sufficient, or 'refine' if more information is needed] -""" - max_retries = 3 - for attempt in range(max_retries): - try: - response_text = self.llm.generate(prompt, max_tokens=200, stop=None) - evaluation, decision = self.parse_evaluation_response(response_text) - if decision in ['answer', 'refine']: - return evaluation, decision - except Exception as e: - logger.warning(f"Error in evaluate_scraped_content (attempt {attempt + 1}): {str(e)}") - - logger.warning("Failed to get a valid decision in evaluate_scraped_content. Defaulting to 'refine'.") - return "Failed to evaluate content.", "refine" - - def parse_evaluation_response(self, response: str) -> Tuple[str, str]: - evaluation = "" - decision = "" - for line in response.strip().split('\n'): - if line.startswith('Evaluation:'): - evaluation = line.split(':', 1)[1].strip() - elif line.startswith('Decision:'): - decision = line.split(':', 1)[1].strip().lower() - return evaluation, decision - - def formulate_query(self, user_query: str, attempt: int) -> Tuple[str, str]: - user_query_short = user_query[:200] - prompt = f""" -Based on the following user question, formulate a concise and effective search query: -"{user_query_short}" -Your task: -1. Create a search query of 2-5 words that will yield relevant results. -2. Determine if a specific time range is needed for the search. -Time range options: -- 'd': Limit results to the past day. Use for very recent events or rapidly changing information. -- 'w': Limit results to the past week. Use for recent events or topics with frequent updates. -- 'm': Limit results to the past month. Use for relatively recent information or ongoing events. -- 'y': Limit results to the past year. Use for annual events or information that changes yearly. -- 'none': No time limit. Use for historical information or topics not tied to a specific time frame. -Respond in the following format: -Search query: [Your 2-5 word query] -Time range: [d/w/m/y/none] -Do not provide any additional information or explanation. -""" - max_retries = 3 - for retry in range(max_retries): - with OutputRedirector() as output: - response_text = self.llm.generate(prompt, max_tokens=50, stop=None) - llm_output = output.getvalue() - logger.info(f"LLM Output in formulate_query:\n{llm_output}") - query, time_range = self.parse_query_response(response_text) - if query and time_range: - return query, time_range - return self.fallback_query(user_query), "none" - - def parse_query_response(self, response: str) -> Tuple[str, str]: - query = "" - time_range = "none" - for line in response.strip().split('\n'): - if ":" in line: - key, value = line.split(":", 1) - key = key.strip().lower() - value = value.strip() - if "query" in key: - query = self.clean_query(value) - elif "time" in key or "range" in key: - time_range = self.validate_time_range(value) - return query, time_range - - def clean_query(self, query: str) -> str: - query = re.sub(r'["\'\[\]]', '', query) - query = re.sub(r'\s+', ' ', query) - return query.strip()[:100] - - def validate_time_range(self, time_range: str) -> str: - valid_ranges = ['d', 'w', 'm', 'y', 'none'] - time_range = time_range.lower() - return time_range if time_range in valid_ranges else 'none' - - def fallback_query(self, user_query: str) -> str: - words = user_query.split() - return " ".join(words[:5]) - - def perform_search(self, query: str, time_range: str) -> List[Dict]: - if not query: - return [] - - from duckduckgo_search import DDGS - - with DDGS() as ddgs: - try: - with OutputRedirector() as output: - if time_range and time_range != 'none': - results = list(ddgs.text(query, timelimit=time_range, max_results=10)) - else: - results = list(ddgs.text(query, max_results=10)) - ddg_output = output.getvalue() - logger.info(f"DDG Output in perform_search:\n{ddg_output}") - return [{'number': i+1, **result} for i, result in enumerate(results)] - except Exception as e: - print(f"{Fore.RED}Search error: {str(e)}{Style.RESET_ALL}") - return [] - - def display_search_results(self, results: List[Dict]) -> None: - """Display search results with minimal output""" - try: - if not results: - return - - # Only show search success status - print(f"\nSearch query sent to DuckDuckGo: {self.last_query}") - print(f"Time range sent to DuckDuckGo: {self.last_time_range}") - print(f"Number of results: {len(results)}") - - except Exception as e: - logger.error(f"Error displaying search results: {str(e)}") - - def select_relevant_pages(self, search_results: List[Dict], user_query: str) -> List[str]: - prompt = f""" -Given the following search results for the user's question: "{user_query}" -Select the 2 most relevant results to scrape and analyze. Explain your reasoning for each selection. - -Search Results: -{self.format_results(search_results)} - -Instructions: -1. You MUST select exactly 2 result numbers from the search results. -2. Choose the results that are most likely to contain comprehensive and relevant information to answer the user's question. -3. Provide a brief reason for each selection. - -You MUST respond using EXACTLY this format and nothing else: - -Selected Results: [Two numbers corresponding to the selected results] -Reasoning: [Your reasoning for the selections] -""" - - max_retries = 3 - for retry in range(max_retries): - with OutputRedirector() as output: - response_text = self.llm.generate(prompt, max_tokens=200, stop=None) - llm_output = output.getvalue() - logger.info(f"LLM Output in select_relevant_pages:\n{llm_output}") - - parsed_response = self.parse_page_selection_response(response_text) - if parsed_response and self.validate_page_selection_response(parsed_response, len(search_results)): - selected_urls = [result['href'] for result in search_results if result['number'] in parsed_response['selected_results']] - - allowed_urls = [url for url in selected_urls if can_fetch(url)] - if allowed_urls: - return allowed_urls - else: - print(f"{Fore.YELLOW}Warning: All selected URLs are disallowed by robots.txt. Retrying selection.{Style.RESET_ALL}") - else: - print(f"{Fore.YELLOW}Warning: Invalid page selection. Retrying.{Style.RESET_ALL}") - - print(f"{Fore.YELLOW}Warning: All attempts to select relevant pages failed. Falling back to top allowed results.{Style.RESET_ALL}") - allowed_urls = [result['href'] for result in search_results if can_fetch(result['href'])][:2] - return allowed_urls - - def parse_page_selection_response(self, response: str) -> Dict[str, Union[List[int], str]]: - lines = response.strip().split('\n') - parsed = {} - for line in lines: - if line.startswith('Selected Results:'): - parsed['selected_results'] = [int(num.strip()) for num in re.findall(r'\d+', line)] - elif line.startswith('Reasoning:'): - parsed['reasoning'] = line.split(':', 1)[1].strip() - return parsed if 'selected_results' in parsed and 'reasoning' in parsed else None - - def validate_page_selection_response(self, parsed_response: Dict[str, Union[List[int], str]], num_results: int) -> bool: - if len(parsed_response['selected_results']) != 2: - return False - if any(num < 1 or num > num_results for num in parsed_response['selected_results']): - return False - return True - - def format_results(self, results: List[Dict]) -> str: - formatted_results = [] - for result in results: - formatted_result = f"{result['number']}. Title: {result.get('title', 'N/A')}\n" - formatted_result += f" Snippet: {result.get('body', 'N/A')[:200]}...\n" - formatted_result += f" URL: {result.get('href', 'N/A')}\n" - formatted_results.append(formatted_result) - return "\n".join(formatted_results) - - def scrape_content(self, urls: List[str]) -> Dict[str, str]: - scraped_content = {} - blocked_urls = [] - for url in urls: - robots_allowed = can_fetch(url) - if robots_allowed: - content = get_web_content([url]) - if content: - scraped_content.update(content) - print(Fore.YELLOW + f"Successfully scraped: {url}" + Style.RESET_ALL) - logger.info(f"Successfully scraped: {url}") - else: - print(Fore.RED + f"Robots.txt disallows scraping of {url}" + Style.RESET_ALL) - logger.warning(f"Robots.txt disallows scraping of {url}") - else: - blocked_urls.append(url) - print(Fore.RED + f"Warning: Robots.txt disallows scraping of {url}" + Style.RESET_ALL) - logger.warning(f"Robots.txt disallows scraping of {url}") - - print(Fore.CYAN + f"Scraped content received for {len(scraped_content)} URLs" + Style.RESET_ALL) - logger.info(f"Scraped content received for {len(scraped_content)} URLs") - - if blocked_urls: - print(Fore.RED + f"Warning: {len(blocked_urls)} URL(s) were not scraped due to robots.txt restrictions." + Style.RESET_ALL) - logger.warning(f"{len(blocked_urls)} URL(s) were not scraped due to robots.txt restrictions: {', '.join(blocked_urls)}") - - return scraped_content - - def display_scraped_content(self, scraped_content: Dict[str, str]): - print(f"\n{Fore.CYAN}Scraped Content:{Style.RESET_ALL}") - for url, content in scraped_content.items(): - print(f"{Fore.GREEN}URL: {url}{Style.RESET_ALL}") - print(f"Content: {content[:4000]}...\n") - - def generate_final_answer(self, user_query: str, scraped_content: Dict[str, str]) -> str: - user_query_short = user_query[:200] - prompt = f""" -You are an AI assistant. Provide a comprehensive and detailed answer to the following question using ONLY the information provided in the scraped content. Do not include any references or mention any sources. Answer directly and thoroughly. - -Question: "{user_query_short}" - -Scraped Content: -{self.format_scraped_content(scraped_content)} - -Important Instructions: -1. Do not use phrases like "Based on the absence of selected results" or similar. -2. If the scraped content does not contain enough information to answer the question, say so explicitly and explain what information is missing. -3. Provide as much relevant detail as possible from the scraped content. - -Answer: -""" - max_retries = 3 - for attempt in range(max_retries): - with OutputRedirector() as output: - response_text = self.llm.generate(prompt, max_tokens=1024, stop=None) - llm_output = output.getvalue() - logger.info(f"LLM Output in generate_final_answer:\n{llm_output}") - if response_text: - logger.info(f"LLM Response:\n{response_text}") - return response_text - - error_message = "I apologize, but I couldn't generate a satisfactory answer based on the available information." - logger.warning(f"Failed to generate a response after {max_retries} attempts. Returning error message.") - return error_message - - def format_scraped_content(self, scraped_content: Dict[str, str]) -> str: - formatted_content = [] - for url, content in scraped_content.items(): - content = re.sub(r'\s+', ' ', content) - formatted_content.append(f"Content from {url}:\n{content}\n") - return "\n".join(formatted_content) - - def synthesize_final_answer(self, user_query: str) -> str: - prompt = f""" -After multiple search attempts, we couldn't find a fully satisfactory answer to the user's question: "{user_query}" - -Please provide the best possible answer you can, acknowledging any limitations or uncertainties. -If appropriate, suggest ways the user might refine their question or where they might find more information. - -Respond in a clear, concise, and informative manner. -""" - try: - with OutputRedirector() as output: - response_text = self.llm.generate(prompt, max_tokens=self.llm_config.get('max_tokens', 1024), stop=self.llm_config.get('stop', None)) - llm_output = output.getvalue() - logger.info(f"LLM Output in synthesize_final_answer:\n{llm_output}") - if response_text: - return response_text.strip() - except Exception as e: - logger.error(f"Error in synthesize_final_answer: {str(e)}", exc_info=True) - return "I apologize, but after multiple attempts, I wasn't able to find a satisfactory answer to your question. Please try rephrasing your question or breaking it down into smaller, more specific queries." - -# End of EnhancedSelfImprovingSearch class +def print_header(): + print(Fore.CYAN + Style.BRIGHT + """ + ╔══════════════════════════════════════════════════════════╗ + ║ 🌐 Advanced Research Assistant 🤖 ║ + ╚══════════════════════════════════════════════════════════╝ + """ + Style.RESET_ALL) + print(Fore.YELLOW + """ + Welcome to the Advanced Research Assistant! + + Commands: + - For web search: start message with '/' + Example: "/latest news on AI advancements" + + - For research mode: start message with '@' + Example: "@analyze the impact of AI on healthcare" + + Press CTRL+Z to submit input. + """ + Style.RESET_ALL) + +def get_multiline_input() -> str: + """Windows-compatible multiline input handler with improved reliability""" + print(f"{Fore.GREEN}📝 Enter your message (Press CTRL+Z to submit):{Style.RESET_ALL}") + lines = [] + current_line = "" + + try: + while True: + if msvcrt.kbhit(): + char = msvcrt.getch() + + # Convert bytes to string for comparison + char_code = ord(char) + + # CTRL+Z detection (Windows EOF) + if char_code == 26: # ASCII code for CTRL+Z + print() # New line + if current_line: + lines.append(current_line) + return ' '.join(lines).strip() or "q" + + # Enter key + elif char in [b'\r', b'\n']: + print() # New line + lines.append(current_line) + current_line = "" + + # Backspace + elif char_code == 8: # ASCII code for backspace + if current_line: + current_line = current_line[:-1] + print('\b \b', end='', flush=True) + + # Regular character input + elif 32 <= char_code <= 126: # Printable ASCII range + try: + char_str = char.decode('utf-8') + current_line += char_str + print(char_str, end='', flush=True) + except UnicodeDecodeError: + continue + + time.sleep(0.01) # Prevent high CPU usage + + except KeyboardInterrupt: + print("\nInput interrupted") + return "q" + except Exception as e: + logger.error(f"Input error: {str(e)}") + return "q" + +def initialize_system(): + """Initialize system with enhanced error checking and recovery""" + try: + print(Fore.YELLOW + "Initializing system..." + Style.RESET_ALL) + + # Load configuration + llm_config = get_llm_config() + + # Validate Ollama connection + if llm_config['llm_type'] == 'ollama': + import requests + max_retries = 3 + retry_delay = 2 + + for attempt in range(max_retries): + try: + response = requests.get(llm_config['base_url'], timeout=5) + if response.status_code == 200: + break + elif attempt < max_retries - 1: + print(f"{Fore.YELLOW}Retrying Ollama connection ({attempt + 1}/{max_retries})...{Style.RESET_ALL}") + time.sleep(retry_delay) + else: + raise ConnectionError("Cannot connect to Ollama server") + except requests.exceptions.RequestException as e: + if attempt == max_retries - 1: + raise ConnectionError( + "\nCannot connect to Ollama server!" + "\nPlease ensure:" + "\n1. Ollama is installed" + "\n2. Ollama server is running (try 'ollama serve')" + "\n3. The model specified in llm_config.py is pulled" + ) + time.sleep(retry_delay) + + # Initialize components with output redirection + with OutputRedirector() as output: + llm_wrapper = LLMWrapper() + parser = UltimateLLMResponseParser() + search_engine = EnhancedSelfImprovingSearch(llm_wrapper, parser) + research_manager = ResearchManager(llm_wrapper, parser, search_engine) + + # Validate LLM + test_response = llm_wrapper.generate("Test", max_tokens=10) + if not test_response: + raise ConnectionError("LLM failed to generate response") + + print(Fore.GREEN + "System initialized successfully." + Style.RESET_ALL) + return llm_wrapper, parser, search_engine, research_manager + + except Exception as e: + logger.error(f"Error initializing system: {str(e)}", exc_info=True) + print(Fore.RED + f"System initialization failed: {str(e)}" + Style.RESET_ALL) + return None, None, None, None + +def handle_search_mode(search_engine, query): + """Handles web search operations""" + print(f"{Fore.CYAN}Initiating web search...{Style.RESET_ALL}") + try: + # Change search() to search_and_improve() which is the correct method name + results = search_engine.search_and_improve(query) + print(f"\n{Fore.GREEN}Search Results:{Style.RESET_ALL}") + print(results) + except Exception as e: + logger.error(f"Search error: {str(e)}") + print(f"{Fore.RED}Search failed: {str(e)}{Style.RESET_ALL}") + +def handle_research_mode(research_manager, query): + """Handles research mode operations""" + print(f"{Fore.CYAN}Initiating research mode...{Style.RESET_ALL}") + + try: + # Start the research + research_manager.start_research(query) + + submit_key = "CTRL+Z" if os.name == 'nt' else "CTRL+D" + print(f"\n{Fore.YELLOW}Research Running. Available Commands:{Style.RESET_ALL}") + print(f"Type command and press {submit_key}:") + print("'s' = Show status") + print("'f' = Show focus") + print("'q' = Quit research") + + while research_manager.is_active(): + try: + command = get_multiline_input().strip().lower() + if command == 's': + print("\n" + research_manager.get_progress()) + elif command == 'f': + if research_manager.current_focus: + print(f"\n{Fore.CYAN}Current Focus:{Style.RESET_ALL}") + print(f"Area: {research_manager.current_focus.area}") + print(f"Priority: {research_manager.current_focus.priority}") + print(f"Reasoning: {research_manager.current_focus.reasoning}") + else: + print(f"\n{Fore.YELLOW}No current focus area{Style.RESET_ALL}") + elif command == 'q': + break + except KeyboardInterrupt: + break + + # Get final summary first + summary = research_manager.terminate_research() + + # Ensure research UI is fully cleaned up + research_manager._cleanup_research_ui() + + # Now in main terminal, show summary + print(f"\n{Fore.GREEN}Research Summary:{Style.RESET_ALL}") + print(summary) + + # Only NOW start conversation mode if we have a valid summary + if hasattr(research_manager, 'research_complete') and \ + hasattr(research_manager, 'research_summary') and \ + research_manager.research_complete and \ + research_manager.research_summary: + time.sleep(0.5) # Small delay to ensure clean transition + research_manager.start_conversation_mode() + + return + + except KeyboardInterrupt: + print(f"\n{Fore.YELLOW}Research interrupted.{Style.RESET_ALL}") + research_manager.terminate_research() + except Exception as e: + logger.error(f"Research error: {str(e)}") + print(f"\n{Fore.RED}Research error: {str(e)}{Style.RESET_ALL}") + research_manager.terminate_research() + +def main(): + init() # Initialize colorama + print_header() + + try: + components = initialize_system() + if not all(components): + sys.exit(1) + + llm, parser, search_engine, research_manager = components + + while True: + try: + user_input = get_multiline_input() + + # Skip empty inputs + if not user_input: + continue + + # Handle exit commands + if user_input.lower() in ["@quit", "quit", "q"]: + break + + # Handle help command + if user_input.lower() == 'help': + print_header() + continue + + # Process commands + if user_input.startswith('/'): + handle_search_mode(search_engine, user_input[1:].strip()) + elif user_input.startswith('@'): + handle_research_mode(research_manager, user_input[1:].strip()) + else: + print(f"{Fore.YELLOW}Please start with '/' for search or '@' for research.{Style.RESET_ALL}") + + except KeyboardInterrupt: + print(f"\n{Fore.YELLOW}Use 'q' to quit or continue with new input.{Style.RESET_ALL}") + continue + except Exception as e: + logger.error(f"Error processing input: {str(e)}") + print(f"{Fore.RED}Error: {str(e)}{Style.RESET_ALL}") + continue + + except KeyboardInterrupt: + print(f"\n{Fore.YELLOW}Program terminated by user.{Style.RESET_ALL}") + except Exception as e: + logger.critical(f"Critical error: {str(e)}") + print(f"{Fore.RED}Critical error: {str(e)}{Style.RESET_ALL}") + finally: + try: + if 'research_manager' in locals() and research_manager: + research_manager.cleanup() + except Exception as e: + logger.error(f"Cleanup error: {str(e)}") + print(Fore.YELLOW + "\nGoodbye!" + Style.RESET_ALL) + sys.exit(0) + +if __name__ == "__main__": + main()