Update Self_Improving_Search.py for Windows

This commit is contained in:
Hafeez 2024-11-20 22:29:43 +05:30 committed by GitHub
parent ab194f62a0
commit 1029885bd0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,26 +1,32 @@
import time
import re
import os
from typing import List, Dict, Tuple, Union
from colorama import Fore, Style
import logging
import sys
import msvcrt
import os
from colorama import init, Fore, Style
import logging
import time
from io import StringIO
from web_scraper import get_web_content, can_fetch
from Self_Improving_Search import EnhancedSelfImprovingSearch
from llm_config import get_llm_config
from llm_response_parser import UltimateLLMResponseParser
from llm_wrapper import LLMWrapper
from urllib.parse import urlparse
from strategic_analysis_parser import StrategicAnalysisParser
from research_manager import ResearchManager
# Initialize colorama
if os.name != 'nt':
print("This version is Windows-specific. Please use the Unix version for other operating systems.")
sys.exit(1)
init() # Initialize colorama
# Set up logging
log_directory = 'logs'
if not os.path.exists(log_directory):
os.makedirs(log_directory)
# Configure logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
log_file = os.path.join(log_directory, 'llama_output.log')
log_file = os.path.join(log_directory, 'web_llm.log')
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
@ -28,14 +34,12 @@ logger.handlers = []
logger.addHandler(file_handler)
logger.propagate = False
# Suppress other loggers
for name in ['root', 'duckduckgo_search', 'requests', 'urllib3']:
logging.getLogger(name).setLevel(logging.WARNING)
logging.getLogger(name).handlers = []
logging.getLogger(name).propagate = False
# Disable other loggers
for name in logging.root.manager.loggerDict:
if name != __name__:
logging.getLogger(name).disabled = True
class OutputRedirector:
"""Windows-compatible output redirection"""
def __init__(self, stream=None):
self.stream = stream or StringIO()
self.original_stdout = sys.stdout
@ -50,386 +54,261 @@ class OutputRedirector:
sys.stdout = self.original_stdout
sys.stderr = self.original_stderr
class EnhancedSelfImprovingSearch:
def __init__(self, llm: LLMWrapper, parser: UltimateLLMResponseParser, max_attempts: int = 5):
    """Store the collaborators and load the LLM configuration.

    Args:
        llm: Wrapper whose ``generate`` method the other methods call.
        parser: Response parser kept on the instance.
        max_attempts: Upper bound on search attempts (defaults to 5).
    """
    # The assignments are independent of one another; configuration is
    # fetched once here and cached on the instance.
    self.llm_config = get_llm_config()
    self.max_attempts = max_attempts
    self.parser = parser
    self.llm = llm
def print_header():
print(Fore.CYAN + Style.BRIGHT + """
🌐 Advanced Research Assistant 🤖
""" + Style.RESET_ALL)
print(Fore.YELLOW + """
Welcome to the Advanced Research Assistant!
@staticmethod
def initialize_llm():
llm_wrapper = LLMWrapper()
return llm_wrapper
Commands:
- For web search: start message with '/'
Example: "/latest news on AI advancements"
def print_thinking(self):
print(Fore.MAGENTA + "🧠 Thinking..." + Style.RESET_ALL)
- For research mode: start message with '@'
Example: "@analyze the impact of AI on healthcare"
def print_searching(self):
print(Fore.MAGENTA + "📝 Searching..." + Style.RESET_ALL)
Press CTRL+Z to submit input.
""" + Style.RESET_ALL)
def search_and_improve(self, user_query: str) -> str:
attempt = 0
while attempt < self.max_attempts:
print(f"\n{Fore.CYAN}Search attempt {attempt + 1}:{Style.RESET_ALL}")
self.print_searching()
def get_multiline_input() -> str:
"""Windows-compatible multiline input handler with improved reliability"""
print(f"{Fore.GREEN}📝 Enter your message (Press CTRL+Z to submit):{Style.RESET_ALL}")
lines = []
current_line = ""
try:
formulated_query, time_range = self.formulate_query(user_query, attempt)
while True:
if msvcrt.kbhit():
char = msvcrt.getch()
print(f"{Fore.YELLOW}Original query: {user_query}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Formulated query: {formulated_query}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Time range: {time_range}{Style.RESET_ALL}")
# Convert bytes to string for comparison
char_code = ord(char)
if not formulated_query:
print(f"{Fore.RED}Error: Empty search query. Retrying...{Style.RESET_ALL}")
attempt += 1
# CTRL+Z detection (Windows EOF)
if char_code == 26: # ASCII code for CTRL+Z
print() # New line
if current_line:
lines.append(current_line)
return ' '.join(lines).strip() or "q"
# Enter key
elif char in [b'\r', b'\n']:
print() # New line
lines.append(current_line)
current_line = ""
# Backspace
elif char_code == 8: # ASCII code for backspace
if current_line:
current_line = current_line[:-1]
print('\b \b', end='', flush=True)
# Regular character input
elif 32 <= char_code <= 126: # Printable ASCII range
try:
char_str = char.decode('utf-8')
current_line += char_str
print(char_str, end='', flush=True)
except UnicodeDecodeError:
continue
search_results = self.perform_search(formulated_query, time_range)
if not search_results:
print(f"{Fore.RED}No results found. Retrying with a different query...{Style.RESET_ALL}")
attempt += 1
continue
self.display_search_results(search_results)
selected_urls = self.select_relevant_pages(search_results, user_query)
if not selected_urls:
print(f"{Fore.RED}No relevant URLs found. Retrying...{Style.RESET_ALL}")
attempt += 1
continue
print(Fore.MAGENTA + "⚙️ Scraping selected pages..." + Style.RESET_ALL)
# Scraping is done without OutputRedirector to ensure messages are visible
scraped_content = self.scrape_content(selected_urls)
if not scraped_content:
print(f"{Fore.RED}Failed to scrape content. Retrying...{Style.RESET_ALL}")
attempt += 1
continue
self.display_scraped_content(scraped_content)
self.print_thinking()
with OutputRedirector() as output:
evaluation, decision = self.evaluate_scraped_content(user_query, scraped_content)
llm_output = output.getvalue()
logger.info(f"LLM Output in evaluate_scraped_content:\n{llm_output}")
print(f"{Fore.MAGENTA}Evaluation: {evaluation}{Style.RESET_ALL}")
print(f"{Fore.MAGENTA}Decision: {decision}{Style.RESET_ALL}")
if decision == "answer":
return self.generate_final_answer(user_query, scraped_content)
elif decision == "refine":
print(f"{Fore.YELLOW}Refining search...{Style.RESET_ALL}")
attempt += 1
else:
print(f"{Fore.RED}Unexpected decision. Proceeding to answer.{Style.RESET_ALL}")
return self.generate_final_answer(user_query, scraped_content)
time.sleep(0.01) # Prevent high CPU usage
except KeyboardInterrupt:
print("\nInput interrupted")
return "q"
except Exception as e:
print(f"{Fore.RED}An error occurred during search attempt. Check the log file for details.{Style.RESET_ALL}")
logger.error(f"An error occurred during search: {str(e)}", exc_info=True)
attempt += 1
logger.error(f"Input error: {str(e)}")
return "q"
return self.synthesize_final_answer(user_query)
def initialize_system():
"""Initialize system with enhanced error checking and recovery"""
try:
print(Fore.YELLOW + "Initializing system..." + Style.RESET_ALL)
def evaluate_scraped_content(self, user_query: str, scraped_content: Dict[str, str]) -> Tuple[str, str]:
user_query_short = user_query[:200]
prompt = f"""
Evaluate if the following scraped content contains sufficient information to answer the user's question comprehensively:
# Load configuration
llm_config = get_llm_config()
User's question: "{user_query_short}"
Scraped Content:
{self.format_scraped_content(scraped_content)}
Your task:
1. Determine if the scraped content provides enough relevant and detailed information to answer the user's question thoroughly.
2. If the information is sufficient, decide to 'answer'. If more information or clarification is needed, decide to 'refine' the search.
Respond using EXACTLY this format:
Evaluation: [Your evaluation of the scraped content]
Decision: [ONLY 'answer' if content is sufficient, or 'refine' if more information is needed]
"""
# Validate Ollama connection
if llm_config['llm_type'] == 'ollama':
import requests
max_retries = 3
retry_delay = 2
for attempt in range(max_retries):
try:
response_text = self.llm.generate(prompt, max_tokens=200, stop=None)
evaluation, decision = self.parse_evaluation_response(response_text)
if decision in ['answer', 'refine']:
return evaluation, decision
except Exception as e:
logger.warning(f"Error in evaluate_scraped_content (attempt {attempt + 1}): {str(e)}")
logger.warning("Failed to get a valid decision in evaluate_scraped_content. Defaulting to 'refine'.")
return "Failed to evaluate content.", "refine"
def parse_evaluation_response(self, response: str) -> Tuple[str, str]:
    """Extract the evaluation text and the decision keyword from an LLM reply.

    The reply is expected to contain ``Evaluation: ...`` and
    ``Decision: ...`` lines. Matching is tolerant of leading whitespace,
    letter case and simple markdown decoration on the label, and the
    decision is stripped of surrounding quotes, brackets and punctuation
    so that replies such as ``Decision: 'Answer'.`` are recognised.

    Args:
        response: Raw text returned by the LLM. ``None`` or an empty
            string is treated as a reply with no usable lines.

    Returns:
        ``(evaluation, decision)``; either element is ``""`` when the
        corresponding line is missing. ``decision`` is lower-cased. When a
        label appears more than once, the last occurrence wins.
    """
    evaluation = ""
    decision = ""
    for raw_line in (response or "").splitlines():
        label, separator, value = raw_line.strip().partition(':')
        if not separator:
            continue  # not a "label: value" line
        # Drop markdown emphasis/bullets around the label (e.g. "**Decision").
        label = label.strip(" \t*#-").lower()
        if label == 'evaluation':
            evaluation = value.strip().lstrip('*').strip()
        elif label == 'decision':
            # The prompt shows the choices in quotes/brackets, and models
            # often echo that decoration back.
            decision = value.strip().strip(" \t'\"[]().*`").lower()
    return evaluation, decision
def formulate_query(self, user_query: str, attempt: int) -> Tuple[str, str]:
    """Ask the LLM for a short search query and a time range.

    The question is cut to its first 200 characters before it is placed
    in the prompt. The LLM is asked up to three times; the first reply
    that parses to a non-empty query is returned. If none does, the
    result of ``fallback_query`` is returned with the time range
    ``"none"``.

    Args:
        user_query: The user's original question.
        attempt: Index of the current search attempt (not used in the body).

    Returns:
        ``(query, time_range)``.
    """
    truncated_question = user_query[:200]
    prompt = f"""
Based on the following user question, formulate a concise and effective search query:
"{truncated_question}"
Your task:
1. Create a search query of 2-5 words that will yield relevant results.
2. Determine if a specific time range is needed for the search.
Time range options:
- 'd': Limit results to the past day. Use for very recent events or rapidly changing information.
- 'w': Limit results to the past week. Use for recent events or topics with frequent updates.
- 'm': Limit results to the past month. Use for relatively recent information or ongoing events.
- 'y': Limit results to the past year. Use for annual events or information that changes yearly.
- 'none': No time limit. Use for historical information or topics not tied to a specific time frame.
Respond in the following format:
Search query: [Your 2-5 word query]
Time range: [d/w/m/y/none]
Do not provide any additional information or explanation.
"""
    tries_left = 3
    while tries_left > 0:
        tries_left -= 1
        # Capture anything the LLM backend prints so it lands in the log
        # instead of the console.
        with OutputRedirector() as captured:
            reply = self.llm.generate(prompt, max_tokens=50, stop=None)
        logger.info(f"LLM Output in formulate_query:\n{captured.getvalue()}")
        parsed_query, parsed_range = self.parse_query_response(reply)
        if not (parsed_query and parsed_range):
            continue
        return parsed_query, parsed_range
    return self.fallback_query(user_query), "none"
def parse_query_response(self, response: str) -> Tuple[str, str]:
    """Pull the search query and time range out of an LLM reply.

    Each line containing a colon is split at the first colon into a label
    and a value. A label containing ``"query"`` sets the query (passed
    through ``clean_query``); otherwise a label containing ``"time"`` or
    ``"range"`` sets the time range (passed through
    ``validate_time_range``).

    Returns:
        ``(query, time_range)``; defaults are ``""`` and ``"none"``.
    """
    query, time_range = "", "none"
    for entry in response.strip().split('\n'):
        label, found, payload = entry.partition(":")
        if not found:
            continue  # line has no "label: value" shape
        label = label.strip().lower()
        payload = payload.strip()
        if "query" in label:
            query = self.clean_query(payload)
        elif "time" in label or "range" in label:
            time_range = self.validate_time_range(payload)
    return query, time_range
def clean_query(self, query: str) -> str:
    """Normalise a search query produced by the LLM.

    Removes double quotes, single quotes and square brackets, collapses
    every whitespace run to a single space, trims the ends and limits the
    result to 100 characters.

    Args:
        query: Raw query text.

    Returns:
        The cleaned query, at most 100 characters long and never ending
        in whitespace.
    """
    cleaned = re.sub(r'["\'\[\]]', '', query)
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    # Trim again after truncation: the cut can land right after a space,
    # which would otherwise leave a trailing blank in the query.
    return cleaned[:100].rstrip()
def validate_time_range(self, time_range: str) -> str:
    """Map an LLM-supplied time range onto one of the accepted codes.

    Accepted codes are ``'d'``, ``'w'``, ``'m'``, ``'y'`` and ``'none'``.
    Surrounding whitespace, quotes, brackets and trailing punctuation are
    ignored, so replies such as ``[d]`` or ``'W'.`` are recognised instead
    of silently degrading to ``'none'``.

    Args:
        time_range: Raw value taken from the reply; ``None`` is tolerated.

    Returns:
        The normalised code, or ``'none'`` when the value is not one of
        the accepted codes.
    """
    valid_ranges = ('d', 'w', 'm', 'y', 'none')
    normalized = (time_range or '').strip(" \t\r\n'\"[]().`*").lower()
    return normalized if normalized in valid_ranges else 'none'
def fallback_query(self, user_query: str, max_words: int = 5) -> str:
    """Build a search query from the leading words of the user's question.

    Args:
        user_query: The user's original question; ``None`` is treated as
            an empty string.
        max_words: Maximum number of whitespace-separated words to keep.
            Defaults to 5; values below zero are treated as zero.

    Returns:
        The first ``max_words`` words joined by single spaces.
    """
    words = (user_query or "").split()
    # Clamp so a negative limit cannot turn into a "drop from the end" slice.
    return " ".join(words[:max(max_words, 0)])
def perform_search(self, query: str, time_range: str) -> List[Dict]:
if not query:
return []
from duckduckgo_search import DDGS
with DDGS() as ddgs:
try:
with OutputRedirector() as output:
if time_range and time_range != 'none':
results = list(ddgs.text(query, timelimit=time_range, max_results=10))
response = requests.get(llm_config['base_url'], timeout=5)
if response.status_code == 200:
break
elif attempt < max_retries - 1:
print(f"{Fore.YELLOW}Retrying Ollama connection ({attempt + 1}/{max_retries})...{Style.RESET_ALL}")
time.sleep(retry_delay)
else:
results = list(ddgs.text(query, max_results=10))
ddg_output = output.getvalue()
logger.info(f"DDG Output in perform_search:\n{ddg_output}")
return [{'number': i+1, **result} for i, result in enumerate(results)]
except Exception as e:
print(f"{Fore.RED}Search error: {str(e)}{Style.RESET_ALL}")
return []
raise ConnectionError("Cannot connect to Ollama server")
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise ConnectionError(
"\nCannot connect to Ollama server!"
"\nPlease ensure:"
"\n1. Ollama is installed"
"\n2. Ollama server is running (try 'ollama serve')"
"\n3. The model specified in llm_config.py is pulled"
)
time.sleep(retry_delay)
def display_search_results(self, results: List[Dict]) -> None:
"""Display search results with minimal output"""
# Initialize components with output redirection
with OutputRedirector() as output:
llm_wrapper = LLMWrapper()
parser = UltimateLLMResponseParser()
search_engine = EnhancedSelfImprovingSearch(llm_wrapper, parser)
research_manager = ResearchManager(llm_wrapper, parser, search_engine)
# Validate LLM
test_response = llm_wrapper.generate("Test", max_tokens=10)
if not test_response:
raise ConnectionError("LLM failed to generate response")
print(Fore.GREEN + "System initialized successfully." + Style.RESET_ALL)
return llm_wrapper, parser, search_engine, research_manager
except Exception as e:
logger.error(f"Error initializing system: {str(e)}", exc_info=True)
print(Fore.RED + f"System initialization failed: {str(e)}" + Style.RESET_ALL)
return None, None, None, None
def handle_search_mode(search_engine, query):
"""Handles web search operations"""
print(f"{Fore.CYAN}Initiating web search...{Style.RESET_ALL}")
try:
if not results:
# Change search() to search_and_improve() which is the correct method name
results = search_engine.search_and_improve(query)
print(f"\n{Fore.GREEN}Search Results:{Style.RESET_ALL}")
print(results)
except Exception as e:
logger.error(f"Search error: {str(e)}")
print(f"{Fore.RED}Search failed: {str(e)}{Style.RESET_ALL}")
def handle_research_mode(research_manager, query):
"""Handles research mode operations"""
print(f"{Fore.CYAN}Initiating research mode...{Style.RESET_ALL}")
try:
# Start the research
research_manager.start_research(query)
submit_key = "CTRL+Z" if os.name == 'nt' else "CTRL+D"
print(f"\n{Fore.YELLOW}Research Running. Available Commands:{Style.RESET_ALL}")
print(f"Type command and press {submit_key}:")
print("'s' = Show status")
print("'f' = Show focus")
print("'q' = Quit research")
while research_manager.is_active():
try:
command = get_multiline_input().strip().lower()
if command == 's':
print("\n" + research_manager.get_progress())
elif command == 'f':
if research_manager.current_focus:
print(f"\n{Fore.CYAN}Current Focus:{Style.RESET_ALL}")
print(f"Area: {research_manager.current_focus.area}")
print(f"Priority: {research_manager.current_focus.priority}")
print(f"Reasoning: {research_manager.current_focus.reasoning}")
else:
print(f"\n{Fore.YELLOW}No current focus area{Style.RESET_ALL}")
elif command == 'q':
break
except KeyboardInterrupt:
break
# Get final summary first
summary = research_manager.terminate_research()
# Ensure research UI is fully cleaned up
research_manager._cleanup_research_ui()
# Now in main terminal, show summary
print(f"\n{Fore.GREEN}Research Summary:{Style.RESET_ALL}")
print(summary)
# Only NOW start conversation mode if we have a valid summary
if hasattr(research_manager, 'research_complete') and \
hasattr(research_manager, 'research_summary') and \
research_manager.research_complete and \
research_manager.research_summary:
time.sleep(0.5) # Small delay to ensure clean transition
research_manager.start_conversation_mode()
return
# Only show search success status
print(f"\nSearch query sent to DuckDuckGo: {self.last_query}")
print(f"Time range sent to DuckDuckGo: {self.last_time_range}")
print(f"Number of results: {len(results)}")
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Research interrupted.{Style.RESET_ALL}")
research_manager.terminate_research()
except Exception as e:
logger.error(f"Error displaying search results: {str(e)}")
logger.error(f"Research error: {str(e)}")
print(f"\n{Fore.RED}Research error: {str(e)}{Style.RESET_ALL}")
research_manager.terminate_research()
def select_relevant_pages(self, search_results: List[Dict], user_query: str) -> List[str]:
prompt = f"""
Given the following search results for the user's question: "{user_query}"
Select the 2 most relevant results to scrape and analyze. Explain your reasoning for each selection.
def main():
init() # Initialize colorama
print_header()
Search Results:
{self.format_results(search_results)}
Instructions:
1. You MUST select exactly 2 result numbers from the search results.
2. Choose the results that are most likely to contain comprehensive and relevant information to answer the user's question.
3. Provide a brief reason for each selection.
You MUST respond using EXACTLY this format and nothing else:
Selected Results: [Two numbers corresponding to the selected results]
Reasoning: [Your reasoning for the selections]
"""
max_retries = 3
for retry in range(max_retries):
with OutputRedirector() as output:
response_text = self.llm.generate(prompt, max_tokens=200, stop=None)
llm_output = output.getvalue()
logger.info(f"LLM Output in select_relevant_pages:\n{llm_output}")
parsed_response = self.parse_page_selection_response(response_text)
if parsed_response and self.validate_page_selection_response(parsed_response, len(search_results)):
selected_urls = [result['href'] for result in search_results if result['number'] in parsed_response['selected_results']]
allowed_urls = [url for url in selected_urls if can_fetch(url)]
if allowed_urls:
return allowed_urls
else:
print(f"{Fore.YELLOW}Warning: All selected URLs are disallowed by robots.txt. Retrying selection.{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}Warning: Invalid page selection. Retrying.{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Warning: All attempts to select relevant pages failed. Falling back to top allowed results.{Style.RESET_ALL}")
allowed_urls = [result['href'] for result in search_results if can_fetch(result['href'])][:2]
return allowed_urls
def parse_page_selection_response(self, response: str) -> Union[Dict[str, Union[List[int], str]], None]:
    """Parse the LLM's page-selection reply.

    Looks for a ``Selected Results:`` line (every integer on it is
    collected) and a ``Reasoning:`` line. Labels are matched regardless of
    leading whitespace, letter case or simple markdown decoration.

    Args:
        response: Raw text returned by the LLM; ``None`` or an empty
            string yields ``None``.

    Returns:
        ``{'selected_results': [...], 'reasoning': '...'}`` when both
        lines are present, otherwise ``None``.
    """
    parsed = {}
    for raw_line in (response or "").splitlines():
        label, separator, value = raw_line.strip().partition(':')
        if not separator:
            continue  # not a "label: value" line
        label = label.strip(" \t*#-").lower()
        if label == 'selected results':
            parsed['selected_results'] = [int(num) for num in re.findall(r'\d+', value)]
        elif label == 'reasoning':
            # Only the text on the label's own line is kept.
            parsed['reasoning'] = value.strip().lstrip('*').strip()
    if 'selected_results' in parsed and 'reasoning' in parsed:
        return parsed
    return None
def validate_page_selection_response(self, parsed_response: Dict[str, Union[List[int], str]], num_results: int) -> bool:
    """Check that a parsed selection names two distinct, in-range results.

    Args:
        parsed_response: Mapping holding the chosen result numbers under
            ``'selected_results'``.
        num_results: Number of search results that were offered; valid
            numbers are ``1..num_results`` inclusive.

    Returns:
        ``True`` only when exactly two different numbers were selected and
        both are within range.
    """
    selected = parsed_response.get('selected_results') or []
    # A repeated number (e.g. [2, 2]) has length 2 but identifies only one
    # page, so it must not count as a valid two-page selection.
    if len(selected) != 2 or len(set(selected)) != 2:
        return False
    return all(1 <= num <= num_results for num in selected)
def format_results(self, results: List[Dict]) -> str:
    """Render search results as numbered text for inclusion in a prompt.

    Each entry shows the result number, its title, the first 200
    characters of its snippet followed by ``...``, and its URL. A field
    that is missing, ``None`` or empty is shown as ``N/A`` rather than
    raising or printing ``None``.

    Args:
        results: Result dicts; each must carry a ``'number'`` key and may
            carry ``'title'``, ``'body'`` and ``'href'``.

    Returns:
        The entries joined by a newline; ``""`` for an empty list.
    """
    formatted_results = []
    for result in results:
        title = result.get('title') or 'N/A'
        snippet = (result.get('body') or 'N/A')[:200]
        url = result.get('href') or 'N/A'
        formatted_results.append(
            f"{result['number']}. Title: {title}\n"
            f" Snippet: {snippet}...\n"
            f" URL: {url}\n"
        )
    return "\n".join(formatted_results)
def scrape_content(self, urls: List[str]) -> Dict[str, str]:
    """Fetch page content for each URL that ``can_fetch`` allows.

    URLs rejected by ``can_fetch`` are skipped and reported as blocked by
    robots.txt. URLs that are allowed but for which ``get_web_content``
    returns nothing are reported as failed scrapes. Progress is printed
    to the console and written to the log.

    Args:
        urls: Candidate page URLs.

    Returns:
        The merged mappings returned by ``get_web_content`` for the URLs
        that were scraped successfully.
    """
    scraped_content = {}
    blocked_urls = []
    for url in urls:
        if not can_fetch(url):
            blocked_urls.append(url)
            print(Fore.RED + f"Warning: Robots.txt disallows scraping of {url}" + Style.RESET_ALL)
            logger.warning(f"Robots.txt disallows scraping of {url}")
            continue

        content = get_web_content([url])
        if content:
            scraped_content.update(content)
            print(Fore.YELLOW + f"Successfully scraped: {url}" + Style.RESET_ALL)
            logger.info(f"Successfully scraped: {url}")
        else:
            # robots.txt permitted this URL, so an empty result is a fetch
            # failure; blaming robots.txt here would be misleading.
            print(Fore.RED + f"Failed to scrape content from {url}" + Style.RESET_ALL)
            logger.warning(f"Failed to scrape content from {url}")

    print(Fore.CYAN + f"Scraped content received for {len(scraped_content)} URLs" + Style.RESET_ALL)
    logger.info(f"Scraped content received for {len(scraped_content)} URLs")
    if blocked_urls:
        print(Fore.RED + f"Warning: {len(blocked_urls)} URL(s) were not scraped due to robots.txt restrictions." + Style.RESET_ALL)
        logger.warning(f"{len(blocked_urls)} URL(s) were not scraped due to robots.txt restrictions: {', '.join(blocked_urls)}")
    return scraped_content
def display_scraped_content(self, scraped_content: Dict[str, str]):
    """Print each scraped URL with the first 4000 characters of its content.

    Args:
        scraped_content: Mapping of URL to page text.
    """
    print(f"\n{Fore.CYAN}Scraped Content:{Style.RESET_ALL}")
    for source_url in scraped_content:
        preview = scraped_content[source_url][:4000]
        print(f"{Fore.GREEN}URL: {source_url}{Style.RESET_ALL}")
        # The ellipsis is printed unconditionally, even for short content.
        print(f"Content: {preview}...\n")
def generate_final_answer(self, user_query: str, scraped_content: Dict[str, str]) -> str:
    """Ask the LLM to answer the question from the scraped content only.

    The question is cut to its first 200 characters. The LLM is asked up
    to three times and the first non-empty reply is returned as-is.

    Args:
        user_query: The user's original question.
        scraped_content: Mapping of URL to page text, rendered into the
            prompt via ``format_scraped_content``.

    Returns:
        The LLM's reply, or a fixed apology message when every try
        produced an empty reply.
    """
    truncated_question = user_query[:200]
    rendered_sources = self.format_scraped_content(scraped_content)
    prompt = f"""
You are an AI assistant. Provide a comprehensive and detailed answer to the following question using ONLY the information provided in the scraped content. Do not include any references or mention any sources. Answer directly and thoroughly.
Question: "{truncated_question}"
Scraped Content:
{rendered_sources}
Important Instructions:
1. Do not use phrases like "Based on the absence of selected results" or similar.
2. If the scraped content does not contain enough information to answer the question, say so explicitly and explain what information is missing.
3. Provide as much relevant detail as possible from the scraped content.
Answer:
"""
    max_retries = 3
    tries_done = 0
    while tries_done < max_retries:
        tries_done += 1
        # Capture backend console output so it goes to the log only.
        with OutputRedirector() as captured:
            reply = self.llm.generate(prompt, max_tokens=1024, stop=None)
        logger.info(f"LLM Output in generate_final_answer:\n{captured.getvalue()}")
        if not reply:
            continue
        logger.info(f"LLM Response:\n{reply}")
        return reply
    logger.warning(f"Failed to generate a response after {max_retries} attempts. Returning error message.")
    return "I apologize, but I couldn't generate a satisfactory answer based on the available information."
def format_scraped_content(self, scraped_content: Dict[str, str]) -> str:
    """Render scraped pages as prompt text, one section per URL.

    Every whitespace run in a page's content is collapsed to a single
    space. A ``None`` content value is rendered as an empty section
    instead of raising inside ``re.sub``.

    Args:
        scraped_content: Mapping of URL to page text.

    Returns:
        Sections of the form ``"Content from <url>:\\n<text>\\n"`` joined
        by a newline; ``""`` for an empty mapping.
    """
    sections = []
    for url, content in scraped_content.items():
        collapsed = re.sub(r'\s+', ' ', content or '')
        sections.append(f"Content from {url}:\n{collapsed}\n")
    return "\n".join(sections)
def synthesize_final_answer(self, user_query: str) -> str:
    """Produce a best-effort answer without scraped content.

    Asks the LLM once, using ``max_tokens`` and ``stop`` from
    ``self.llm_config`` (defaults 1024 and ``None``). Any exception is
    logged and swallowed.

    Args:
        user_query: The user's original question.

    Returns:
        The stripped LLM reply, or a fixed apology message when the reply
        is empty or an exception occurred.
    """
    prompt = f"""
After multiple search attempts, we couldn't find a fully satisfactory answer to the user's question: "{user_query}"
Please provide the best possible answer you can, acknowledging any limitations or uncertainties.
If appropriate, suggest ways the user might refine their question or where they might find more information.
Respond in a clear, concise, and informative manner.
"""
    apology = "I apologize, but after multiple attempts, I wasn't able to find a satisfactory answer to your question. Please try rephrasing your question or breaking it down into smaller, more specific queries."
    try:
        # Capture backend console output so it goes to the log only.
        with OutputRedirector() as captured:
            reply = self.llm.generate(
                prompt,
                max_tokens=self.llm_config.get('max_tokens', 1024),
                stop=self.llm_config.get('stop', None),
            )
        logger.info(f"LLM Output in synthesize_final_answer:\n{captured.getvalue()}")
        if reply:
            return reply.strip()
    except Exception as e:
        logger.error(f"Error in synthesize_final_answer: {str(e)}", exc_info=True)
    return apology
components = initialize_system()
if not all(components):
sys.exit(1)
# End of EnhancedSelfImprovingSearch class
llm, parser, search_engine, research_manager = components
while True:
try:
user_input = get_multiline_input()
# Skip empty inputs
if not user_input:
continue
# Handle exit commands
if user_input.lower() in ["@quit", "quit", "q"]:
break
# Handle help command
if user_input.lower() == 'help':
print_header()
continue
# Process commands
if user_input.startswith('/'):
handle_search_mode(search_engine, user_input[1:].strip())
elif user_input.startswith('@'):
handle_research_mode(research_manager, user_input[1:].strip())
else:
print(f"{Fore.YELLOW}Please start with '/' for search or '@' for research.{Style.RESET_ALL}")
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Use 'q' to quit or continue with new input.{Style.RESET_ALL}")
continue
except Exception as e:
logger.error(f"Error processing input: {str(e)}")
print(f"{Fore.RED}Error: {str(e)}{Style.RESET_ALL}")
continue
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Program terminated by user.{Style.RESET_ALL}")
except Exception as e:
logger.critical(f"Critical error: {str(e)}")
print(f"{Fore.RED}Critical error: {str(e)}{Style.RESET_ALL}")
finally:
try:
if 'research_manager' in locals() and research_manager:
research_manager.cleanup()
except Exception as e:
logger.error(f"Cleanup error: {str(e)}")
print(Fore.YELLOW + "\nGoodbye!" + Style.RESET_ALL)
sys.exit(0)
# Script entry point: call main() only when this file is executed directly.
if __name__ == "__main__":
main()