diff --git a/Self_Improving_Search.py b/Self_Improving_Search.py index 07da533..05dcf9b 100644 --- a/Self_Improving_Search.py +++ b/Self_Improving_Search.py @@ -1,22 +1,26 @@ -import sys -import msvcrt -import os -from colorama import init, Fore, Style -import logging import time +import re +import os +from typing import List, Dict, Tuple, Union, Optional +from colorama import Fore, Style, init +import logging +import sys from io import StringIO +from web_scraper import get_web_content, can_fetch from llm_config import get_llm_config from llm_response_parser import UltimateLLMResponseParser from llm_wrapper import LLMWrapper -from strategic_analysis_parser import StrategicAnalysisParser -from research_manager import ResearchManager +from urllib.parse import urlparse, quote_plus +import requests +from bs4 import BeautifulSoup +import json +from datetime import datetime, timedelta +import threading +from queue import Queue +import concurrent.futures # Initialize colorama -if os.name != 'nt': - print("This version is Windows-specific. Please use the Unix version for other operating systems.") - sys.exit(1) - -init() # Initialize colorama +init() # Set up logging log_directory = 'logs' @@ -25,289 +29,347 @@ if not os.path.exists(log_directory): logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) -log_file = os.path.join(log_directory, 'web_llm.log') +log_file = os.path.join(log_directory, 'search.log') file_handler = logging.FileHandler(log_file) formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) -logger.handlers = [] logger.addHandler(file_handler) -logger.propagate = False -# Disable other loggers -for name in logging.root.manager.loggerDict: - if name != __name__: - logging.getLogger(name).disabled = True +class SearchResult: + def __init__(self, title: str, url: str, snippet: str, score: float = 0.0): + self.title = title + self.url = url + self.snippet = snippet + self.score = score + self.content: Optional[str] = None + self.processed = False + self.error = None -class OutputRedirector: - def __init__(self, stream=None): - self.stream = stream or StringIO() - self.original_stdout = sys.stdout - self.original_stderr = sys.stderr + def to_dict(self) -> Dict: + return { + 'title': self.title, + 'url': self.url, + 'snippet': self.snippet, + 'score': self.score, + 'has_content': bool(self.content), + 'processed': self.processed, + 'error': str(self.error) if self.error else None + } - def __enter__(self): - sys.stdout = self.stream - sys.stderr = self.stream - return self.stream +class EnhancedSelfImprovingSearch: + def __init__(self, llm: LLMWrapper, parser: UltimateLLMResponseParser, max_attempts: int = 5): + self.llm = llm + self.parser = parser + self.max_attempts = max_attempts + self.llm_config = get_llm_config() + self.last_query = "" + self.last_time_range = "" + self.search_cache = {} + self.content_cache = {} + self.max_cache_size = 100 + self.max_concurrent_requests = 5 + self.request_timeout = 15 + self.headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' + } - def __exit__(self, exc_type, exc_val, exc_tb): - sys.stdout = self.original_stdout - sys.stderr = self.original_stderr - -def print_header(): - print(Fore.CYAN + Style.BRIGHT + """ - ╔══════════════════════════════════════════════════════════╗ - ║ 🌐 Advanced Research Assistant 🤖 ║ - ╚══════════════════════════════════════════════════════════╝ - """ + Style.RESET_ALL) - print(Fore.YELLOW + """ - Welcome to the Advanced Research Assistant! - - Commands: - - For web search: start message with '/' - Example: "/latest news on AI advancements" - - - For research mode: start message with '@' - Example: "@analyze the impact of AI on healthcare" - - Press CTRL+Z to submit input. - """ + Style.RESET_ALL) - -def get_multiline_input() -> str: - """Windows-compatible multiline input handler with improved reliability""" - print(f"{Fore.GREEN}📝 Enter your message (Press CTRL+Z to submit):{Style.RESET_ALL}") - lines = [] - current_line = "" - - try: - while True: - if msvcrt.kbhit(): - char = msvcrt.getch() - - # Convert bytes to string for comparison - char_code = ord(char) - - # CTRL+Z detection (Windows EOF) - if char_code == 26: # ASCII code for CTRL+Z - print() # New line - if current_line: - lines.append(current_line) - return ' '.join(lines).strip() or "q" - - # Enter key - elif char in [b'\r', b'\n']: - print() # New line - lines.append(current_line) - current_line = "" - - # Backspace - elif char_code == 8: # ASCII code for backspace - if current_line: - current_line = current_line[:-1] - print('\b \b', end='', flush=True) - - # Regular character input - elif 32 <= char_code <= 126: # Printable ASCII range - try: - char_str = char.decode('utf-8') - current_line += char_str - print(char_str, end='', flush=True) - except UnicodeDecodeError: - continue - - time.sleep(0.01) # Prevent high CPU usage - - except KeyboardInterrupt: - print("\nInput interrupted") - return "q" - except Exception as e: - logger.error(f"Input error: {str(e)}") - return "q" - -def initialize_system(): - """Initialize system with enhanced error checking and recovery""" - try: - print(Fore.YELLOW + "Initializing system..." + Style.RESET_ALL) - - # Load configuration - llm_config = get_llm_config() - - # Validate Ollama connection - if llm_config['llm_type'] == 'ollama': - import requests - max_retries = 3 - retry_delay = 2 - - for attempt in range(max_retries): - try: - response = requests.get(llm_config['base_url'], timeout=5) - if response.status_code == 200: - break - elif attempt < max_retries - 1: - print(f"{Fore.YELLOW}Retrying Ollama connection ({attempt + 1}/{max_retries})...{Style.RESET_ALL}") - time.sleep(retry_delay) - else: - raise ConnectionError("Cannot connect to Ollama server") - except requests.exceptions.RequestException as e: - if attempt == max_retries - 1: - raise ConnectionError( - "\nCannot connect to Ollama server!" - "\nPlease ensure:" - "\n1. Ollama is installed" - "\n2. Ollama server is running (try 'ollama serve')" - "\n3. The model specified in llm_config.py is pulled" - ) - time.sleep(retry_delay) - - # Initialize components with output redirection - with OutputRedirector() as output: - llm_wrapper = LLMWrapper() - parser = UltimateLLMResponseParser() - search_engine = EnhancedSelfImprovingSearch(llm_wrapper, parser) - research_manager = ResearchManager(llm_wrapper, parser, search_engine) - - # Validate LLM - test_response = llm_wrapper.generate("Test", max_tokens=10) - if not test_response: - raise ConnectionError("LLM failed to generate response") - - print(Fore.GREEN + "System initialized successfully." + Style.RESET_ALL) - return llm_wrapper, parser, search_engine, research_manager - - except Exception as e: - logger.error(f"Error initializing system: {str(e)}", exc_info=True) - print(Fore.RED + f"System initialization failed: {str(e)}" + Style.RESET_ALL) - return None, None, None, None - -def handle_search_mode(search_engine, query): - """Handles web search operations""" - print(f"{Fore.CYAN}Initiating web search...{Style.RESET_ALL}") - try: - # Change search() to search_and_improve() which is the correct method name - results = search_engine.search_and_improve(query) - print(f"\n{Fore.GREEN}Search Results:{Style.RESET_ALL}") - print(results) - except Exception as e: - logger.error(f"Search error: {str(e)}") - print(f"{Fore.RED}Search failed: {str(e)}{Style.RESET_ALL}") - -def handle_research_mode(research_manager, query): - """Handles research mode operations""" - print(f"{Fore.CYAN}Initiating research mode...{Style.RESET_ALL}") - - try: - # Start the research - research_manager.start_research(query) - - submit_key = "CTRL+Z" if os.name == 'nt' else "CTRL+D" - print(f"\n{Fore.YELLOW}Research Running. Available Commands:{Style.RESET_ALL}") - print(f"Type command and press {submit_key}:") - print("'s' = Show status") - print("'f' = Show focus") - print("'q' = Quit research") - - while research_manager.is_active(): - try: - command = get_multiline_input().strip().lower() - if command == 's': - print("\n" + research_manager.get_progress()) - elif command == 'f': - if research_manager.current_focus: - print(f"\n{Fore.CYAN}Current Focus:{Style.RESET_ALL}") - print(f"Area: {research_manager.current_focus.area}") - print(f"Priority: {research_manager.current_focus.priority}") - print(f"Reasoning: {research_manager.current_focus.reasoning}") - else: - print(f"\n{Fore.YELLOW}No current focus area{Style.RESET_ALL}") - elif command == 'q': - break - except KeyboardInterrupt: - break - - # Get final summary first - summary = research_manager.terminate_research() - - # Ensure research UI is fully cleaned up - research_manager._cleanup_research_ui() - - # Now in main terminal, show summary - print(f"\n{Fore.GREEN}Research Summary:{Style.RESET_ALL}") - print(summary) - - # Only NOW start conversation mode if we have a valid summary - if hasattr(research_manager, 'research_complete') and \ - hasattr(research_manager, 'research_summary') and \ - research_manager.research_complete and \ - research_manager.research_summary: - time.sleep(0.5) # Small delay to ensure clean transition - research_manager.start_conversation_mode() - - return - - except KeyboardInterrupt: - print(f"\n{Fore.YELLOW}Research interrupted.{Style.RESET_ALL}") - research_manager.terminate_research() - except Exception as e: - logger.error(f"Research error: {str(e)}") - print(f"\n{Fore.RED}Research error: {str(e)}{Style.RESET_ALL}") - research_manager.terminate_research() - -def main(): - init() # Initialize colorama - print_header() - - try: - components = initialize_system() - if not all(components): - sys.exit(1) - - llm, parser, search_engine, research_manager = components - - while True: - try: - user_input = get_multiline_input() - - # Skip empty inputs - if not user_input: - continue - - # Handle exit commands - if user_input.lower() in ["@quit", "quit", "q"]: - break - - # Handle help command - if user_input.lower() == 'help': - print_header() - continue - - # Process commands - if user_input.startswith('/'): - handle_search_mode(search_engine, user_input[1:].strip()) - elif user_input.startswith('@'): - handle_research_mode(research_manager, user_input[1:].strip()) - else: - print(f"{Fore.YELLOW}Please start with '/' for search or '@' for research.{Style.RESET_ALL}") - - except KeyboardInterrupt: - print(f"\n{Fore.YELLOW}Use 'q' to quit or continue with new input.{Style.RESET_ALL}") - continue - except Exception as e: - logger.error(f"Error processing input: {str(e)}") - print(f"{Fore.RED}Error: {str(e)}{Style.RESET_ALL}") - continue - - except KeyboardInterrupt: - print(f"\n{Fore.YELLOW}Program terminated by user.{Style.RESET_ALL}") - except Exception as e: - logger.critical(f"Critical error: {str(e)}") - print(f"{Fore.RED}Critical error: {str(e)}{Style.RESET_ALL}") - finally: + def search_and_improve(self, query: str, time_range: str = "auto") -> str: + """Main search method that includes self-improvement""" try: - if 'research_manager' in locals() and research_manager: - research_manager.cleanup() + logger.info(f"Starting search for query: {query}") + self.last_query = query + self.last_time_range = time_range + + # Check cache first + cache_key = f"{query}_{time_range}" + if cache_key in self.search_cache: + logger.info("Returning cached results") + return self.search_cache[cache_key] + + # Perform initial search + results = self.perform_search(query, time_range) + if not results: + return "No results found." + + # Enhance results with content fetching + enhanced_results = self.enhance_search_results(results) + + # Generate improved summary + summary = self.generate_enhanced_summary(enhanced_results, query) + + # Cache the results + self.cache_results(cache_key, summary) + + return summary + except Exception as e: - logger.error(f"Cleanup error: {str(e)}") - print(Fore.YELLOW + "\nGoodbye!" + Style.RESET_ALL) - sys.exit(0) + logger.error(f"Search and improve error: {str(e)}", exc_info=True) + return f"Error during search: {str(e)}" + + def perform_search(self, query: str, time_range: str) -> List[SearchResult]: + """Performs web search with improved error handling and retry logic""" + if not query: + return [] + + results = [] + retries = 3 + delay = 2 + + for attempt in range(retries): + try: + encoded_query = quote_plus(query) + search_url = f"https://html.duckduckgo.com/html/?q={encoded_query}" + + response = requests.get(search_url, headers=self.headers, timeout=self.request_timeout) + response.raise_for_status() + + soup = BeautifulSoup(response.text, 'html.parser') + + for i, result in enumerate(soup.select('.result'), 1): + if i > 15: # Increased limit for better coverage + break + + title_elem = result.select_one('.result__title') + snippet_elem = result.select_one('.result__snippet') + link_elem = result.select_one('.result__url') + + if title_elem and link_elem: + title = title_elem.get_text(strip=True) + snippet = snippet_elem.get_text(strip=True) if snippet_elem else "" + url = link_elem.get('href', '') + + # Basic result scoring + score = self.calculate_result_score(title, snippet, query) + + results.append(SearchResult(title, url, snippet, score)) + + if results: + # Sort results by score + results.sort(key=lambda x: x.score, reverse=True) + return results + + if attempt < retries - 1: + logger.warning(f"No results found, retrying ({attempt + 1}/{retries})...") + time.sleep(delay) + + except Exception as e: + logger.error(f"Search attempt {attempt + 1} failed: {str(e)}") + if attempt < retries - 1: + time.sleep(delay) + else: + raise + + return results + + def calculate_result_score(self, title: str, snippet: str, query: str) -> float: + """Calculate relevance score for search result""" + score = 0.0 + query_terms = query.lower().split() + + # Title matching + title_lower = title.lower() + for term in query_terms: + if term in title_lower: + score += 2.0 + + # Snippet matching + snippet_lower = snippet.lower() + for term in query_terms: + if term in snippet_lower: + score += 1.0 + + # Exact phrase matching + if query.lower() in title_lower: + score += 3.0 + if query.lower() in snippet_lower: + score += 1.5 + + return score + + def enhance_search_results(self, results: List[SearchResult]) -> List[SearchResult]: + """Enhance search results with parallel content fetching""" + enhanced_results = [] + + with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_concurrent_requests) as executor: + future_to_result = { + executor.submit(self.fetch_and_process_content, result): result + for result in results[:10] # Limit to top 10 results + } + + for future in concurrent.futures.as_completed(future_to_result): + result = future_to_result[future] + try: + content = future.result() + if content: + result.content = content + result.processed = True + enhanced_results.append(result) + except Exception as e: + logger.error(f"Error processing {result.url}: {str(e)}") + result.error = e + + return enhanced_results + + def fetch_and_process_content(self, result: SearchResult) -> Optional[str]: + """Fetch and process content for a search result""" + try: + # Check cache first + if result.url in self.content_cache: + return self.content_cache[result.url] + + # Check if we can fetch the content + if not can_fetch(result.url): + logger.warning(f"Cannot fetch content from {result.url}") + return None + + content = get_web_content(result.url) + if content: + # Process and clean content + cleaned_content = self.clean_content(content) + + # Cache the content + self.cache_content(result.url, cleaned_content) + + return cleaned_content + + except Exception as e: + logger.error(f"Error fetching content from {result.url}: {str(e)}") + return None + + def clean_content(self, content: str) -> str: + """Clean and normalize web content""" + # Remove HTML tags if any remained + content = re.sub(r'<[^>]+>', '', content) + + # Remove extra whitespace + content = re.sub(r'\s+', ' ', content) + + # Remove special characters + content = re.sub(r'[^\w\s.,!?-]', '', content) + + # Truncate if too long + max_length = 5000 + if len(content) > max_length: + content = content[:max_length] + "..." + + return content.strip() + + def generate_enhanced_summary(self, results: List[SearchResult], query: str) -> str: + """Generate an enhanced summary using LLM with improved context""" + try: + # Prepare context from enhanced results + context = self.prepare_summary_context(results, query) + + prompt = f""" + Based on the following comprehensive search results for "{query}", + provide a detailed analysis that: + 1. Synthesizes key information from multiple sources + 2. Highlights important findings and patterns + 3. Maintains factual accuracy and cites sources + 4. Presents a balanced view of different perspectives + 5. Identifies any gaps or limitations in the available information + + Context: + {context} + + Please provide a well-structured analysis: + """ + + summary = self.llm.generate(prompt, max_tokens=1500) + return self.format_summary(summary) + + except Exception as e: + logger.error(f"Summary generation error: {str(e)}") + return f"Error generating summary: {str(e)}" + + def prepare_summary_context(self, results: List[SearchResult], query: str) -> str: + """Prepare context for summary generation""" + context = f"Query: {query}\n\n" + + for i, result in enumerate(results, 1): + context += f"Source {i}:\n" + context += f"Title: {result.title}\n" + context += f"URL: {result.url}\n" + + if result.content: + # Include relevant excerpts from content + excerpts = self.extract_relevant_excerpts(result.content, query) + context += f"Key Excerpts:\n{excerpts}\n" + else: + context += f"Summary: {result.snippet}\n" + + context += "\n" + + return context + + def extract_relevant_excerpts(self, content: str, query: str, max_excerpts: int = 3) -> str: + """Extract relevant excerpts from content""" + sentences = re.split(r'[.!?]+', content) + scored_sentences = [] + + query_terms = set(query.lower().split()) + + for sentence in sentences: + sentence = sentence.strip() + if not sentence: + continue + + score = sum(1 for term in query_terms if term in sentence.lower()) + if score > 0: + scored_sentences.append((sentence, score)) + + # Sort by relevance score and take top excerpts + scored_sentences.sort(key=lambda x: x[1], reverse=True) + excerpts = [sentence for sentence, _ in scored_sentences[:max_excerpts]] + + return "\n".join(f"- {excerpt}" for excerpt in excerpts) + + def format_summary(self, summary: str) -> str: + """Format the final summary for better readability""" + # Add section headers if not present + if not re.search(r'^Key Findings:', summary, re.MULTILINE): + summary = "Key Findings:\n" + summary + + # Add source attribution if not present + if not re.search(r'^Sources:', summary, re.MULTILINE): + summary += "\n\nSources: Based on analysis of search results" + + # Add formatting + summary = summary.replace('Key Findings:', f"{Fore.CYAN}Key Findings:{Style.RESET_ALL}") + summary = summary.replace('Sources:', f"\n{Fore.CYAN}Sources:{Style.RESET_ALL}") + + return summary + + def cache_results(self, key: str, value: str) -> None: + """Cache search results with size limit""" + if len(self.search_cache) >= self.max_cache_size: + # Remove oldest entry + oldest_key = next(iter(self.search_cache)) + del self.search_cache[oldest_key] + + self.search_cache[key] = value + + def cache_content(self, url: str, content: str) -> None: + """Cache web content with size limit""" + if len(self.content_cache) >= self.max_cache_size: + # Remove oldest entry + oldest_key = next(iter(self.content_cache)) + del self.content_cache[oldest_key] + + self.content_cache[url] = content + + def clear_cache(self) -> None: + """Clear all caches""" + self.search_cache.clear() + self.content_cache.clear() + + def get_last_query(self) -> str: + """Returns the last executed query""" + return self.last_query + + def get_last_time_range(self) -> str: + """Returns the last used time range""" + return self.last_time_range if __name__ == "__main__": - main() + pass