Update Self_Improving_Search.py

This commit is contained in:
Hafeez 2024-11-21 11:32:15 +05:30 committed by GitHub
parent 161698a228
commit 816ea37293
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,8 +1,8 @@
import time import time
import re import re
import os import os
from typing import List, Dict, Tuple, Union, Optional from typing import List, Dict, Tuple, Union
from colorama import Fore, Style, init from colorama import Fore, Style
import logging import logging
import sys import sys
from io import StringIO from io import StringIO
@ -10,365 +10,425 @@ from web_scraper import get_web_content, can_fetch
from llm_config import get_llm_config from llm_config import get_llm_config
from llm_response_parser import UltimateLLMResponseParser from llm_response_parser import UltimateLLMResponseParser
from llm_wrapper import LLMWrapper from llm_wrapper import LLMWrapper
from urllib.parse import urlparse, quote_plus from urllib.parse import urlparse
import requests
from bs4 import BeautifulSoup
import json
from datetime import datetime, timedelta
import threading
from queue import Queue
import concurrent.futures
# Initialize colorama
init()
# Set up logging # Set up logging
log_directory = 'logs' log_directory = 'logs'
if not os.path.exists(log_directory): if not os.path.exists(log_directory):
os.makedirs(log_directory) os.makedirs(log_directory)
# Configure logger
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
log_file = os.path.join(log_directory, 'search.log') log_file = os.path.join(log_directory, 'llama_output.log')
file_handler = logging.FileHandler(log_file) file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter) file_handler.setFormatter(formatter)
logger.handlers = []
logger.addHandler(file_handler) logger.addHandler(file_handler)
logger.propagate = False
class SearchResult: # Suppress other loggers
def __init__(self, title: str, url: str, snippet: str, score: float = 0.0): for name in ['root', 'duckduckgo_search', 'requests', 'urllib3']:
self.title = title logging.getLogger(name).setLevel(logging.WARNING)
self.url = url logging.getLogger(name).handlers = []
self.snippet = snippet logging.getLogger(name).propagate = False
self.score = score
self.content: Optional[str] = None
self.processed = False
self.error = None
def to_dict(self) -> Dict: class OutputRedirector:
return { def __init__(self, stream=None):
'title': self.title, self.stream = stream or StringIO()
'url': self.url, self.original_stdout = sys.stdout
'snippet': self.snippet, self.original_stderr = sys.stderr
'score': self.score,
'has_content': bool(self.content), def __enter__(self):
'processed': self.processed, sys.stdout = self.stream
'error': str(self.error) if self.error else None sys.stderr = self.stream
} return self.stream
def __exit__(self, exc_type, exc_val, exc_tb):
sys.stdout = self.original_stdout
sys.stderr = self.original_stderr
class EnhancedSelfImprovingSearch: class EnhancedSelfImprovingSearch:
def __init__(self, llm: LLMWrapper, parser: UltimateLLMResponseParser, max_attempts: int = 5): def __init__(self, llm: LLMWrapper, parser: UltimateLLMResponseParser, max_attempts: int = 5):
self.llm = llm self.llm = llm
self.parser = parser self.parser = parser
self.max_attempts = max_attempts self.max_attempts = max_attempts
self.llm_config = get_llm_config() self.llm_config = get_llm_config()
self.last_query = ""
self.last_time_range = ""
self.search_cache = {}
self.content_cache = {}
self.max_cache_size = 100
self.max_concurrent_requests = 5
self.request_timeout = 15
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
def search_and_improve(self, query: str, time_range: str = "auto") -> str: @staticmethod
"""Main search method that includes self-improvement""" def initialize_llm():
try: llm_wrapper = LLMWrapper()
logger.info(f"Starting search for query: {query}") return llm_wrapper
self.last_query = query
self.last_time_range = time_range
# Check cache first def print_thinking(self):
cache_key = f"{query}_{time_range}" print(Fore.MAGENTA + "🧠 Thinking..." + Style.RESET_ALL)
if cache_key in self.search_cache:
logger.info("Returning cached results")
return self.search_cache[cache_key]
# Perform initial search def print_searching(self):
results = self.perform_search(query, time_range) print(Fore.MAGENTA + "📝 Searching..." + Style.RESET_ALL)
if not results:
return "No results found."
# Enhance results with content fetching def search_and_improve(self, user_query: str) -> str:
enhanced_results = self.enhance_search_results(results) attempt = 0
while attempt < self.max_attempts:
# Generate improved summary print(f"\n{Fore.CYAN}Search attempt {attempt + 1}:{Style.RESET_ALL}")
summary = self.generate_enhanced_summary(enhanced_results, query) self.print_searching()
# Cache the results
self.cache_results(cache_key, summary)
return summary
except Exception as e: try:
logger.error(f"Search and improve error: {str(e)}", exc_info=True) formulated_query, time_range = self.formulate_query(user_query, attempt)
return f"Error during search: {str(e)}"
def perform_search(self, query: str, time_range: str) -> List[SearchResult]: print(f"{Fore.YELLOW}Original query: {user_query}{Style.RESET_ALL}")
"""Performs web search with improved error handling and retry logic""" print(f"{Fore.YELLOW}Formulated query: {formulated_query}{Style.RESET_ALL}")
if not query: print(f"{Fore.YELLOW}Time range: {time_range}{Style.RESET_ALL}")
return []
results = [] if not formulated_query:
retries = 3 print(f"{Fore.RED}Error: Empty search query. Retrying...{Style.RESET_ALL}")
delay = 2 attempt += 1
continue
for attempt in range(retries): search_results = self.perform_search(formulated_query, time_range)
try:
encoded_query = quote_plus(query)
search_url = f"https://html.duckduckgo.com/html/?q={encoded_query}"
response = requests.get(search_url, headers=self.headers, timeout=self.request_timeout)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
for i, result in enumerate(soup.select('.result'), 1):
if i > 15: # Increased limit for better coverage
break
title_elem = result.select_one('.result__title')
snippet_elem = result.select_one('.result__snippet')
link_elem = result.select_one('.result__url')
if title_elem and link_elem:
title = title_elem.get_text(strip=True)
snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""
url = link_elem.get('href', '')
# Basic result scoring
score = self.calculate_result_score(title, snippet, query)
results.append(SearchResult(title, url, snippet, score))
if results: if not search_results:
# Sort results by score print(f"{Fore.RED}No results found. Retrying with a different query...{Style.RESET_ALL}")
results.sort(key=lambda x: x.score, reverse=True) attempt += 1
return results continue
if attempt < retries - 1:
logger.warning(f"No results found, retrying ({attempt + 1}/{retries})...")
time.sleep(delay)
except Exception as e:
logger.error(f"Search attempt {attempt + 1} failed: {str(e)}")
if attempt < retries - 1:
time.sleep(delay)
else:
raise
return results self.display_search_results(search_results)
def calculate_result_score(self, title: str, snippet: str, query: str) -> float: selected_urls = self.select_relevant_pages(search_results, user_query)
"""Calculate relevance score for search result"""
score = 0.0
query_terms = query.lower().split()
# Title matching
title_lower = title.lower()
for term in query_terms:
if term in title_lower:
score += 2.0
# Snippet matching
snippet_lower = snippet.lower()
for term in query_terms:
if term in snippet_lower:
score += 1.0
# Exact phrase matching
if query.lower() in title_lower:
score += 3.0
if query.lower() in snippet_lower:
score += 1.5
return score
def enhance_search_results(self, results: List[SearchResult]) -> List[SearchResult]: if not selected_urls:
"""Enhance search results with parallel content fetching""" print(f"{Fore.RED}No relevant URLs found. Retrying...{Style.RESET_ALL}")
enhanced_results = [] attempt += 1
continue
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_concurrent_requests) as executor:
future_to_result = {
executor.submit(self.fetch_and_process_content, result): result
for result in results[:10] # Limit to top 10 results
}
for future in concurrent.futures.as_completed(future_to_result):
result = future_to_result[future]
try:
content = future.result()
if content:
result.content = content
result.processed = True
enhanced_results.append(result)
except Exception as e:
logger.error(f"Error processing {result.url}: {str(e)}")
result.error = e
return enhanced_results
def fetch_and_process_content(self, result: SearchResult) -> Optional[str]: print(Fore.MAGENTA + "⚙️ Scraping selected pages..." + Style.RESET_ALL)
"""Fetch and process content for a search result""" # Scraping is done without OutputRedirector to ensure messages are visible
try: scraped_content = self.scrape_content(selected_urls)
# Check cache first
if result.url in self.content_cache:
return self.content_cache[result.url]
# Check if we can fetch the content if not scraped_content:
if not can_fetch(result.url): print(f"{Fore.RED}Failed to scrape content. Retrying...{Style.RESET_ALL}")
logger.warning(f"Cannot fetch content from {result.url}") attempt += 1
return None continue
content = get_web_content(result.url) self.display_scraped_content(scraped_content)
if content:
# Process and clean content
cleaned_content = self.clean_content(content)
# Cache the content
self.cache_content(result.url, cleaned_content)
return cleaned_content
except Exception as e:
logger.error(f"Error fetching content from {result.url}: {str(e)}")
return None
def clean_content(self, content: str) -> str: self.print_thinking()
"""Clean and normalize web content"""
# Remove HTML tags if any remained
content = re.sub(r'<[^>]+>', '', content)
# Remove extra whitespace
content = re.sub(r'\s+', ' ', content)
# Remove special characters
content = re.sub(r'[^\w\s.,!?-]', '', content)
# Truncate if too long
max_length = 5000
if len(content) > max_length:
content = content[:max_length] + "..."
return content.strip()
def generate_enhanced_summary(self, results: List[SearchResult], query: str) -> str: with OutputRedirector() as output:
"""Generate an enhanced summary using LLM with improved context""" evaluation, decision = self.evaluate_scraped_content(user_query, scraped_content)
try: llm_output = output.getvalue()
# Prepare context from enhanced results logger.info(f"LLM Output in evaluate_scraped_content:\n{llm_output}")
context = self.prepare_summary_context(results, query)
prompt = f"""
Based on the following comprehensive search results for "{query}",
provide a detailed analysis that:
1. Synthesizes key information from multiple sources
2. Highlights important findings and patterns
3. Maintains factual accuracy and cites sources
4. Presents a balanced view of different perspectives
5. Identifies any gaps or limitations in the available information
Context: print(f"{Fore.MAGENTA}Evaluation: {evaluation}{Style.RESET_ALL}")
{context} print(f"{Fore.MAGENTA}Decision: {decision}{Style.RESET_ALL}")
Please provide a well-structured analysis: if decision == "answer":
""" return self.generate_final_answer(user_query, scraped_content)
elif decision == "refine":
print(f"{Fore.YELLOW}Refining search...{Style.RESET_ALL}")
attempt += 1
else:
print(f"{Fore.RED}Unexpected decision. Proceeding to answer.{Style.RESET_ALL}")
return self.generate_final_answer(user_query, scraped_content)
summary = self.llm.generate(prompt, max_tokens=1500) except Exception as e:
return self.format_summary(summary) print(f"{Fore.RED}An error occurred during search attempt. Check the log file for details.{Style.RESET_ALL}")
logger.error(f"An error occurred during search: {str(e)}", exc_info=True)
attempt += 1
except Exception as e: return self.synthesize_final_answer(user_query)
logger.error(f"Summary generation error: {str(e)}")
return f"Error generating summary: {str(e)}"
def prepare_summary_context(self, results: List[SearchResult], query: str) -> str: def evaluate_scraped_content(self, user_query: str, scraped_content: Dict[str, str]) -> Tuple[str, str]:
"""Prepare context for summary generation""" user_query_short = user_query[:200]
context = f"Query: {query}\n\n" prompt = f"""
Evaluate if the following scraped content contains sufficient information to answer the user's question comprehensively:
for i, result in enumerate(results, 1):
context += f"Source {i}:\n"
context += f"Title: {result.title}\n"
context += f"URL: {result.url}\n"
if result.content:
# Include relevant excerpts from content
excerpts = self.extract_relevant_excerpts(result.content, query)
context += f"Key Excerpts:\n{excerpts}\n"
else:
context += f"Summary: {result.snippet}\n"
context += "\n"
return context
def extract_relevant_excerpts(self, content: str, query: str, max_excerpts: int = 3) -> str: User's question: "{user_query_short}"
"""Extract relevant excerpts from content"""
sentences = re.split(r'[.!?]+', content)
scored_sentences = []
query_terms = set(query.lower().split())
for sentence in sentences:
sentence = sentence.strip()
if not sentence:
continue
score = sum(1 for term in query_terms if term in sentence.lower())
if score > 0:
scored_sentences.append((sentence, score))
# Sort by relevance score and take top excerpts
scored_sentences.sort(key=lambda x: x[1], reverse=True)
excerpts = [sentence for sentence, _ in scored_sentences[:max_excerpts]]
return "\n".join(f"- {excerpt}" for excerpt in excerpts)
def format_summary(self, summary: str) -> str: Scraped Content:
"""Format the final summary for better readability""" {self.format_scraped_content(scraped_content)}
# Add section headers if not present
if not re.search(r'^Key Findings:', summary, re.MULTILINE):
summary = "Key Findings:\n" + summary
# Add source attribution if not present
if not re.search(r'^Sources:', summary, re.MULTILINE):
summary += "\n\nSources: Based on analysis of search results"
# Add formatting
summary = summary.replace('Key Findings:', f"{Fore.CYAN}Key Findings:{Style.RESET_ALL}")
summary = summary.replace('Sources:', f"\n{Fore.CYAN}Sources:{Style.RESET_ALL}")
return summary
def cache_results(self, key: str, value: str) -> None: Your task:
"""Cache search results with size limit""" 1. Determine if the scraped content provides enough relevant and detailed information to answer the user's question thoroughly.
if len(self.search_cache) >= self.max_cache_size: 2. If the information is sufficient, decide to 'answer'. If more information or clarification is needed, decide to 'refine' the search.
# Remove oldest entry
oldest_key = next(iter(self.search_cache))
del self.search_cache[oldest_key]
self.search_cache[key] = value
def cache_content(self, url: str, content: str) -> None: Respond using EXACTLY this format:
"""Cache web content with size limit""" Evaluation: [Your evaluation of the scraped content]
if len(self.content_cache) >= self.max_cache_size: Decision: [ONLY 'answer' if content is sufficient, or 'refine' if more information is needed]
# Remove oldest entry """
oldest_key = next(iter(self.content_cache)) max_retries = 3
del self.content_cache[oldest_key] for attempt in range(max_retries):
try:
self.content_cache[url] = content response_text = self.llm.generate(prompt, max_tokens=200, stop=None)
evaluation, decision = self.parse_evaluation_response(response_text)
if decision in ['answer', 'refine']:
return evaluation, decision
except Exception as e:
logger.warning(f"Error in evaluate_scraped_content (attempt {attempt + 1}): {str(e)}")
def clear_cache(self) -> None: logger.warning("Failed to get a valid decision in evaluate_scraped_content. Defaulting to 'refine'.")
"""Clear all caches""" return "Failed to evaluate content.", "refine"
self.search_cache.clear()
self.content_cache.clear()
def get_last_query(self) -> str: def parse_evaluation_response(self, response: str) -> Tuple[str, str]:
"""Returns the last executed query""" evaluation = ""
return self.last_query decision = ""
for line in response.strip().split('\n'):
if line.startswith('Evaluation:'):
evaluation = line.split(':', 1)[1].strip()
elif line.startswith('Decision:'):
decision = line.split(':', 1)[1].strip().lower()
return evaluation, decision
def get_last_time_range(self) -> str: def formulate_query(self, user_query: str, attempt: int) -> Tuple[str, str]:
"""Returns the last used time range""" user_query_short = user_query[:200]
return self.last_time_range prompt = f"""
Based on the following user question, formulate a concise and effective search query:
"{user_query_short}"
Your task:
1. Create a search query of 2-5 words that will yield relevant results.
2. Determine if a specific time range is needed for the search.
Time range options:
- 'd': Limit results to the past day. Use for very recent events or rapidly changing information.
- 'w': Limit results to the past week. Use for recent events or topics with frequent updates.
- 'm': Limit results to the past month. Use for relatively recent information or ongoing events.
- 'y': Limit results to the past year. Use for annual events or information that changes yearly.
- 'none': No time limit. Use for historical information or topics not tied to a specific time frame.
Respond in the following format:
Search query: [Your 2-5 word query]
Time range: [d/w/m/y/none]
Do not provide any additional information or explanation.
"""
max_retries = 3
for retry in range(max_retries):
with OutputRedirector() as output:
response_text = self.llm.generate(prompt, max_tokens=50, stop=None)
llm_output = output.getvalue()
logger.info(f"LLM Output in formulate_query:\n{llm_output}")
query, time_range = self.parse_query_response(response_text)
if query and time_range:
return query, time_range
return self.fallback_query(user_query), "none"
if __name__ == "__main__": def parse_query_response(self, response: str) -> Tuple[str, str]:
pass query = ""
time_range = "none"
for line in response.strip().split('\n'):
if ":" in line:
key, value = line.split(":", 1)
key = key.strip().lower()
value = value.strip()
if "query" in key:
query = self.clean_query(value)
elif "time" in key or "range" in key:
time_range = self.validate_time_range(value)
return query, time_range
def clean_query(self, query: str) -> str:
query = re.sub(r'["\'\[\]]', '', query)
query = re.sub(r'\s+', ' ', query)
return query.strip()[:100]
def validate_time_range(self, time_range: str) -> str:
valid_ranges = ['d', 'w', 'm', 'y', 'none']
time_range = time_range.lower()
return time_range if time_range in valid_ranges else 'none'
def fallback_query(self, user_query: str) -> str:
words = user_query.split()
return " ".join(words[:5])
def perform_search(self, query: str, time_range: str) -> List[Dict]:
if not query:
return []
from duckduckgo_search import DDGS
with DDGS() as ddgs:
try:
with OutputRedirector() as output:
if time_range and time_range != 'none':
results = list(ddgs.text(query, timelimit=time_range, max_results=10))
else:
results = list(ddgs.text(query, max_results=10))
ddg_output = output.getvalue()
logger.info(f"DDG Output in perform_search:\n{ddg_output}")
return [{'number': i+1, **result} for i, result in enumerate(results)]
except Exception as e:
print(f"{Fore.RED}Search error: {str(e)}{Style.RESET_ALL}")
return []
def display_search_results(self, results: List[Dict]) -> None:
"""Display search results with minimal output"""
try:
if not results:
return
# Only show search success status
print(f"\nSearch query sent to DuckDuckGo: {self.last_query}")
print(f"Time range sent to DuckDuckGo: {self.last_time_range}")
print(f"Number of results: {len(results)}")
except Exception as e:
logger.error(f"Error displaying search results: {str(e)}")
def select_relevant_pages(self, search_results: List[Dict], user_query: str) -> List[str]:
prompt = f"""
Given the following search results for the user's question: "{user_query}"
Select the 2 most relevant results to scrape and analyze. Explain your reasoning for each selection.
Search Results:
{self.format_results(search_results)}
Instructions:
1. You MUST select exactly 2 result numbers from the search results.
2. Choose the results that are most likely to contain comprehensive and relevant information to answer the user's question.
3. Provide a brief reason for each selection.
You MUST respond using EXACTLY this format and nothing else:
Selected Results: [Two numbers corresponding to the selected results]
Reasoning: [Your reasoning for the selections]
"""
max_retries = 3
for retry in range(max_retries):
with OutputRedirector() as output:
response_text = self.llm.generate(prompt, max_tokens=200, stop=None)
llm_output = output.getvalue()
logger.info(f"LLM Output in select_relevant_pages:\n{llm_output}")
parsed_response = self.parse_page_selection_response(response_text)
if parsed_response and self.validate_page_selection_response(parsed_response, len(search_results)):
selected_urls = [result['href'] for result in search_results if result['number'] in parsed_response['selected_results']]
allowed_urls = [url for url in selected_urls if can_fetch(url)]
if allowed_urls:
return allowed_urls
else:
print(f"{Fore.YELLOW}Warning: All selected URLs are disallowed by robots.txt. Retrying selection.{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}Warning: Invalid page selection. Retrying.{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Warning: All attempts to select relevant pages failed. Falling back to top allowed results.{Style.RESET_ALL}")
allowed_urls = [result['href'] for result in search_results if can_fetch(result['href'])][:2]
return allowed_urls
def parse_page_selection_response(self, response: str) -> Dict[str, Union[List[int], str]]:
lines = response.strip().split('\n')
parsed = {}
for line in lines:
if line.startswith('Selected Results:'):
parsed['selected_results'] = [int(num.strip()) for num in re.findall(r'\d+', line)]
elif line.startswith('Reasoning:'):
parsed['reasoning'] = line.split(':', 1)[1].strip()
return parsed if 'selected_results' in parsed and 'reasoning' in parsed else None
def validate_page_selection_response(self, parsed_response: Dict[str, Union[List[int], str]], num_results: int) -> bool:
if len(parsed_response['selected_results']) != 2:
return False
if any(num < 1 or num > num_results for num in parsed_response['selected_results']):
return False
return True
def format_results(self, results: List[Dict]) -> str:
formatted_results = []
for result in results:
formatted_result = f"{result['number']}. Title: {result.get('title', 'N/A')}\n"
formatted_result += f" Snippet: {result.get('body', 'N/A')[:200]}...\n"
formatted_result += f" URL: {result.get('href', 'N/A')}\n"
formatted_results.append(formatted_result)
return "\n".join(formatted_results)
def scrape_content(self, urls: List[str]) -> Dict[str, str]:
scraped_content = {}
blocked_urls = []
for url in urls:
robots_allowed = can_fetch(url)
if robots_allowed:
content = get_web_content([url])
if content:
scraped_content.update(content)
print(Fore.YELLOW + f"Successfully scraped: {url}" + Style.RESET_ALL)
logger.info(f"Successfully scraped: {url}")
else:
print(Fore.RED + f"Robots.txt disallows scraping of {url}" + Style.RESET_ALL)
logger.warning(f"Robots.txt disallows scraping of {url}")
else:
blocked_urls.append(url)
print(Fore.RED + f"Warning: Robots.txt disallows scraping of {url}" + Style.RESET_ALL)
logger.warning(f"Robots.txt disallows scraping of {url}")
print(Fore.CYAN + f"Scraped content received for {len(scraped_content)} URLs" + Style.RESET_ALL)
logger.info(f"Scraped content received for {len(scraped_content)} URLs")
if blocked_urls:
print(Fore.RED + f"Warning: {len(blocked_urls)} URL(s) were not scraped due to robots.txt restrictions." + Style.RESET_ALL)
logger.warning(f"{len(blocked_urls)} URL(s) were not scraped due to robots.txt restrictions: {', '.join(blocked_urls)}")
return scraped_content
def display_scraped_content(self, scraped_content: Dict[str, str]):
print(f"\n{Fore.CYAN}Scraped Content:{Style.RESET_ALL}")
for url, content in scraped_content.items():
print(f"{Fore.GREEN}URL: {url}{Style.RESET_ALL}")
print(f"Content: {content[:4000]}...\n")
def generate_final_answer(self, user_query: str, scraped_content: Dict[str, str]) -> str:
user_query_short = user_query[:200]
prompt = f"""
You are an AI assistant. Provide a comprehensive and detailed answer to the following question using ONLY the information provided in the scraped content. Do not include any references or mention any sources. Answer directly and thoroughly.
Question: "{user_query_short}"
Scraped Content:
{self.format_scraped_content(scraped_content)}
Important Instructions:
1. Do not use phrases like "Based on the absence of selected results" or similar.
2. If the scraped content does not contain enough information to answer the question, say so explicitly and explain what information is missing.
3. Provide as much relevant detail as possible from the scraped content.
Answer:
"""
max_retries = 3
for attempt in range(max_retries):
with OutputRedirector() as output:
response_text = self.llm.generate(prompt, max_tokens=1024, stop=None)
llm_output = output.getvalue()
logger.info(f"LLM Output in generate_final_answer:\n{llm_output}")
if response_text:
logger.info(f"LLM Response:\n{response_text}")
return response_text
error_message = "I apologize, but I couldn't generate a satisfactory answer based on the available information."
logger.warning(f"Failed to generate a response after {max_retries} attempts. Returning error message.")
return error_message
def format_scraped_content(self, scraped_content: Dict[str, str]) -> str:
formatted_content = []
for url, content in scraped_content.items():
content = re.sub(r'\s+', ' ', content)
formatted_content.append(f"Content from {url}:\n{content}\n")
return "\n".join(formatted_content)
def synthesize_final_answer(self, user_query: str) -> str:
prompt = f"""
After multiple search attempts, we couldn't find a fully satisfactory answer to the user's question: "{user_query}"
Please provide the best possible answer you can, acknowledging any limitations or uncertainties.
If appropriate, suggest ways the user might refine their question or where they might find more information.
Respond in a clear, concise, and informative manner.
"""
try:
with OutputRedirector() as output:
response_text = self.llm.generate(prompt, max_tokens=self.llm_config.get('max_tokens', 1024), stop=self.llm_config.get('stop', None))
llm_output = output.getvalue()
logger.info(f"LLM Output in synthesize_final_answer:\n{llm_output}")
if response_text:
return response_text.strip()
except Exception as e:
logger.error(f"Error in synthesize_final_answer: {str(e)}", exc_info=True)
return "I apologize, but after multiple attempts, I wasn't able to find a satisfactory answer to your question. Please try rephrasing your question or breaking it down into smaller, more specific queries."
# End of EnhancedSelfImprovingSearch class