Web Search Providers update

This commit is contained in:
Graham V 2024-11-22 19:55:46 -05:00
parent 8457fc9d32
commit a361b005c9
18 changed files with 1698 additions and 223 deletions

View file

@ -15,7 +15,7 @@ Click the image above to watch the demonstration of My Project.
2. The LLM analyzes your query and generates 5 specific research focus areas, each with assigned priorities based on relevance to the topic or question.
3. Starting with the highest priority area, the LLM:
- Formulates targeted search queries
- Performs web searches
- Performs web searches using multiple providers (Tavily, Brave, Bing, Exa)
- Analyzes the search results and selects the most relevant web pages
- Scrapes and extracts relevant information from the selected web pages
- Documents everything found during the research session in a research text file, including links to the websites the content was retrieved from (see the sketch of this loop below)
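Roughly, each search-and-improve pass has the shape sketched below (an illustrative outline only: the helper methods belong to the `EnhancedSelfImprovingSearch` class, while the wrapper function and its wiring are hypothetical):
```
# Illustrative sketch of one research pass; the helper methods are real, the wrapper is hypothetical.
def research_pass(searcher, user_query: str, attempt: int):
    query, time_range = searcher.formulate_query(user_query, attempt)
    results = searcher.perform_search(query, time_range)      # normalized dict from SearchManager
    if not results.get('success') or not results.get('results'):
        return None                                           # caller retries with a refined query
    urls = searcher.select_relevant_pages(results['results'], user_query)
    content = searcher.scrape_content(urls)
    evaluation, decision = searcher.evaluate_scraped_content(user_query, content)
    if decision == "answer":
        ai_answer = results.get('answer', '') if results.get('provider') == 'tavily' else ''
        return searcher.generate_final_answer(user_query, content, ai_answer)
    return None  # "refine": caller loops again with attempt + 1
```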
@ -27,15 +27,44 @@ The key distinction is that this isn't just a chatbot - it's an automated resear
## Features
- Automated research planning with prioritized focus areas
- Multi-provider search system with automatic fallback
- Systematic web searching and content analysis
- All research content and source URLs saved into a detailed text document
- Research summary generation
- Post-research Q&A capability about findings
- Self-improving search mechanism
- Self-improving search mechanism with provider optimization
- Rich console output with status indicators
- Comprehensive answer synthesis using web-sourced information
- Research conversation mode for exploring findings
## Search Providers
The system supports multiple search providers with automatic fallback:
1. Tavily (Primary)
- AI-powered search with relevance scoring
- Includes AI-generated summaries
- Optimized for research queries
2. Brave Search
- High-quality web results
- Built-in relevance scoring
- Real-time indexing
3. Bing
- Comprehensive web coverage
- News and recent content
- Academic results
4. Exa
- Specialized search capabilities
- High-precision results
- Content highlighting
5. DuckDuckGo (Fallback)
- Privacy-focused results
- No API key required
- Reliable fallback option
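Provider selection, fallback, and result normalization are handled by the `SearchManager` class. A minimal usage sketch (illustrative; it assumes the relevant API keys are already set in your environment):
```
from search_manager import SearchManager

manager = SearchManager()                  # providers with valid API keys are initialized
print(manager.get_available_providers())   # e.g. ['tavily', 'brave', 'duckduckgo']

results = manager.search("history of the transistor", max_results=5)
if results['success']:
    print(f"Provider used: {results['provider']}")
    for r in results['results']:
        print(r['title'], r['url'], r.get('score'))
else:
    print(f"Search failed: {results['error']}")
```
Results come back in a single normalized format (title, url, content, score, published_date) regardless of which provider answered, so the rest of the pipeline never has to care which provider was used.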
## Installation
1. Clone the repository:
@ -58,7 +87,17 @@ source venv/bin/activate # On Windows, use venv\Scripts\activate
pip install -r requirements.txt
```
4. Install and Configure Ollama:
4. Set up API keys:
Create a `.env` file in the project root with your API keys:
```
TAVILY_API_KEY=your_tavily_key
BRAVE_API_KEY=your_brave_key
BING_API_KEY=your_bing_key
EXA_API_KEY=your_exa_key
```
Note: The system works with any combination of configured providers; if a provider's API key is not set, that provider is skipped in the fallback chain. You do not need all of the keys, and if none are configured, DuckDuckGo is used as the fallback.
5. Install and Configure Ollama:
- Install Ollama following instructions at https://ollama.ai
- Using your selected model file, create a custom model variant with the required context length
(phi3:3.8b-mini-128k-instruct or phi3:14b-medium-128k-instruct are recommended)
@ -117,7 +156,14 @@ python Web-LLM.py
## Configuration
The LLM settings can be modified in `llm_config.py`. You must specify your model name in the configuration for the researcher to function. The default configuration is optimized for research tasks with the specified Phi-3 model.
The system can be configured through several files:
1. `llm_config.py`: LLM settings and model configuration
2. `system_config.py`: Search provider settings, including:
- Default provider selection
- Fallback order
- Provider-specific parameters
- Rate limiting controls
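For reference, the values read from `system_config.py` look roughly like the sketch below. The key names mirror what the code accesses via `get_search_config()` and `get_research_config()`, but every value shown is an illustrative placeholder, not the project's actual default:
```
# Illustrative only: key names match what the code reads, values are placeholders.
SEARCH_CONFIG = {
    "default_provider": "tavily",
    "fallback_order": ["tavily", "brave", "bing", "exa", "duckduckgo"],
    "provider_settings": {
        "tavily": {"max_results": 10},
        "brave": {"max_results": 10},
    },
    "rate_limiting": {"cooldown_period": 30},
}

RESEARCH_CONFIG = {
    "rate_limiting": {"requests_per_minute": 10, "concurrent_requests": 3, "cooldown_period": 60},
    "search": {"max_results_per_search": 10, "min_relevance_score": 0.5},
    "storage": {"auto_save": True, "auto_save_interval": 300, "max_backups": 5},
    "content": {"max_document_size": 1000000},
}

def get_search_config():
    return SEARCH_CONFIG

def get_research_config():
    return RESEARCH_CONFIG
```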
## Current Status
This is a prototype that demonstrates functional automated research capabilities. While still in development, it successfully performs structured research tasks. It is currently tested and works well with the phi3:3.8b-mini-128k-instruct model when the context length is set as described above.
@ -126,6 +172,7 @@ This is a prototype that demonstrates functional automated research capabilities
- Ollama
- Python packages listed in requirements.txt
- Recommended model: phi3:3.8b-mini-128k-instruct or phi3:14b-medium-128k-instruct (with custom context length as specified)
- API keys for desired search providers
## Contributing
Contributions are welcome! This is a prototype with room for improvements and new features.
@ -135,7 +182,8 @@ This project is licensed under the MIT License - see the [LICENSE] file for deta
## Acknowledgments
- Ollama team for their local LLM runtime
- DuckDuckGo for their search API
- Search providers: Tavily, Brave, Bing, Exa, and DuckDuckGo
- Contributors to the Python packages used in this project
## Personal Note
This tool represents an attempt to bridge the gap between simple LLM interactions and genuine research capabilities. By structuring the research process and maintaining documentation, it aims to provide more thorough and verifiable results than traditional LLM conversations. It also builds on my previous project, 'Web-LLM-Assistant-Llamacpp-Ollama', which simply gave LLMs the ability to search and scrape websites to answer questions. Unlike its predecessor, this program takes that capability and applies it in a novel and genuinely useful way; it is the most advanced and useful way I could conceive of building on my previous work. As a very new programmer, and this being only my second program, I feel very good about the result and hope it hits the mark!

View file

@ -1,7 +1,10 @@
"""
Enhanced search functionality with multiple providers and self-improving capabilities.
"""
import time
import re
import os
from typing import List, Dict, Tuple, Union
from typing import List, Dict, Tuple, Union, Any
from colorama import Fore, Style
import logging
import sys
@ -10,7 +13,9 @@ from web_scraper import get_web_content, can_fetch
from llm_config import get_llm_config
from llm_response_parser import UltimateLLMResponseParser
from llm_wrapper import LLMWrapper
from search_manager import SearchManager
from urllib.parse import urlparse
from system_config import RESEARCH_CONFIG
# Set up logging
log_directory = 'logs'
@ -55,6 +60,19 @@ class EnhancedSelfImprovingSearch:
self.parser = parser
self.max_attempts = max_attempts
self.llm_config = get_llm_config()
self.search_manager = SearchManager()
# Rate limiting configuration
self.requests_per_minute = RESEARCH_CONFIG['rate_limiting']['requests_per_minute']
self.concurrent_requests = RESEARCH_CONFIG['rate_limiting']['concurrent_requests']
self.cooldown_period = RESEARCH_CONFIG['rate_limiting']['cooldown_period']
self.last_request_time = 0
self.request_count = 0
self.last_query = None
self.last_time_range = None
self.WHITESPACE_PATTERN = r'\s+'
@staticmethod
def initialize_llm():
@ -75,6 +93,8 @@ class EnhancedSelfImprovingSearch:
try:
formulated_query, time_range = self.formulate_query(user_query, attempt)
self.last_query = formulated_query
self.last_time_range = time_range
print(f"{Fore.YELLOW}Original query: {user_query}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Formulated query: {formulated_query}{Style.RESET_ALL}")
@ -86,15 +106,19 @@ class EnhancedSelfImprovingSearch:
continue
search_results = self.perform_search(formulated_query, time_range)
if not isinstance(search_results, dict):
print(f"{Fore.RED}Error: Invalid search results format. Expected dict, got {type(search_results)}{Style.RESET_ALL}")
attempt += 1
continue
if not search_results:
if not search_results.get('success') or not search_results.get('results'):
print(f"{Fore.RED}No results found. Retrying with a different query...{Style.RESET_ALL}")
attempt += 1
continue
self.display_search_results(search_results)
selected_urls = self.select_relevant_pages(search_results, user_query)
selected_urls = self.select_relevant_pages(search_results['results'], user_query)
if not selected_urls:
print(f"{Fore.RED}No relevant URLs found. Retrying...{Style.RESET_ALL}")
@ -102,7 +126,6 @@ class EnhancedSelfImprovingSearch:
continue
print(Fore.MAGENTA + "⚙️ Scraping selected pages..." + Style.RESET_ALL)
# Scraping is done without OutputRedirector to ensure messages are visible
scraped_content = self.scrape_content(selected_urls)
if not scraped_content:
@ -123,7 +146,9 @@ class EnhancedSelfImprovingSearch:
print(f"{Fore.MAGENTA}Decision: {decision}{Style.RESET_ALL}")
if decision == "answer":
return self.generate_final_answer(user_query, scraped_content)
# If Tavily provided an AI answer, include it in the final answer generation
ai_answer = search_results.get('answer', '') if search_results.get('provider') == 'tavily' else ''
return self.generate_final_answer(user_query, scraped_content, ai_answer)
elif decision == "refine":
print(f"{Fore.YELLOW}Refining search...{Style.RESET_ALL}")
attempt += 1
@ -138,157 +163,81 @@ class EnhancedSelfImprovingSearch:
return self.synthesize_final_answer(user_query)
def evaluate_scraped_content(self, user_query: str, scraped_content: Dict[str, str]) -> Tuple[str, str]:
user_query_short = user_query[:200]
prompt = f"""
Evaluate if the following scraped content contains sufficient information to answer the user's question comprehensively:
def formulate_query(self, query: str, attempt: int) -> Tuple[str, str]:
"""Placeholder for query formulation - returns original query and default time range."""
return query, 'none'
User's question: "{user_query_short}"
Scraped Content:
{self.format_scraped_content(scraped_content)}
Your task:
1. Determine if the scraped content provides enough relevant and detailed information to answer the user's question thoroughly.
2. If the information is sufficient, decide to 'answer'. If more information or clarification is needed, decide to 'refine' the search.
Respond using EXACTLY this format:
Evaluation: [Your evaluation of the scraped content]
Decision: [ONLY 'answer' if content is sufficient, or 'refine' if more information is needed]
"""
max_retries = 3
for attempt in range(max_retries):
try:
response_text = self.llm.generate(prompt, max_tokens=200, stop=None)
evaluation, decision = self.parse_evaluation_response(response_text)
if decision in ['answer', 'refine']:
return evaluation, decision
except Exception as e:
logger.warning(f"Error in evaluate_scraped_content (attempt {attempt + 1}): {str(e)}")
logger.warning("Failed to get a valid decision in evaluate_scraped_content. Defaulting to 'refine'.")
return "Failed to evaluate content.", "refine"
def parse_evaluation_response(self, response: str) -> Tuple[str, str]:
evaluation = ""
decision = ""
for line in response.strip().split('\n'):
if line.startswith('Evaluation:'):
evaluation = line.split(':', 1)[1].strip()
elif line.startswith('Decision:'):
decision = line.split(':', 1)[1].strip().lower()
return evaluation, decision
def formulate_query(self, user_query: str, attempt: int) -> Tuple[str, str]:
user_query_short = user_query[:200]
prompt = f"""
Based on the following user question, formulate a concise and effective search query:
"{user_query_short}"
Your task:
1. Create a search query of 2-5 words that will yield relevant results.
2. Determine if a specific time range is needed for the search.
Time range options:
- 'd': Limit results to the past day. Use for very recent events or rapidly changing information.
- 'w': Limit results to the past week. Use for recent events or topics with frequent updates.
- 'm': Limit results to the past month. Use for relatively recent information or ongoing events.
- 'y': Limit results to the past year. Use for annual events or information that changes yearly.
- 'none': No time limit. Use for historical information or topics not tied to a specific time frame.
Respond in the following format:
Search query: [Your 2-5 word query]
Time range: [d/w/m/y/none]
Do not provide any additional information or explanation.
"""
max_retries = 3
for retry in range(max_retries):
with OutputRedirector() as output:
response_text = self.llm.generate(prompt, max_tokens=50, stop=None)
llm_output = output.getvalue()
logger.info(f"LLM Output in formulate_query:\n{llm_output}")
query, time_range = self.parse_query_response(response_text)
if query and time_range:
return query, time_range
return self.fallback_query(user_query), "none"
def parse_query_response(self, response: str) -> Tuple[str, str]:
query = ""
time_range = "none"
for line in response.strip().split('\n'):
if ":" in line:
key, value = line.split(":", 1)
key = key.strip().lower()
value = value.strip()
if "query" in key:
query = self.clean_query(value)
elif "time" in key or "range" in key:
time_range = self.validate_time_range(value)
return query, time_range
def clean_query(self, query: str) -> str:
query = re.sub(r'["\'\[\]]', '', query)
query = re.sub(r'\s+', ' ', query)
return query.strip()[:100]
def validate_time_range(self, time_range: str) -> str:
valid_ranges = ['d', 'w', 'm', 'y', 'none']
time_range = time_range.lower()
return time_range if time_range in valid_ranges else 'none'
def fallback_query(self, user_query: str) -> str:
words = user_query.split()
return " ".join(words[:5])
def perform_search(self, query: str, time_range: str) -> List[Dict]:
def perform_search(self, query: str, time_range: str) -> Dict[str, Any]:
"""
Perform search using SearchManager with time range adaptation and rate limiting.
"""
if not query:
return []
return {'success': False, 'error': 'Empty query', 'results': [], 'provider': None}
# Rate limiting check
current_time = time.time()
time_since_last_request = current_time - self.last_request_time
# Check if we need to cool down
if self.request_count >= self.requests_per_minute:
if time_since_last_request < self.cooldown_period:
logger.warning(f"Rate limit reached. Cooling down for {self.cooldown_period - time_since_last_request:.1f} seconds")
time.sleep(self.cooldown_period - time_since_last_request)
self.request_count = 0
# Update rate limiting trackers
self.last_request_time = time.time()
self.request_count += 1
search_params = {
'max_results': RESEARCH_CONFIG['search']['max_results_per_search'],
'min_relevance_score': RESEARCH_CONFIG['search']['min_relevance_score']
}
# Add time range parameters if specified
time_params = {
'd': {'days': 1},
'w': {'days': 7},
'm': {'days': 30},
'y': {'days': 365},
'none': {}
}
search_params.update(time_params.get(time_range.lower(), {}))
return self.search_manager.search(query, **search_params)
from duckduckgo_search import DDGS
with DDGS() as ddgs:
try:
with OutputRedirector() as output:
if time_range and time_range != 'none':
results = list(ddgs.text(query, timelimit=time_range, max_results=10))
else:
results = list(ddgs.text(query, max_results=10))
ddg_output = output.getvalue()
logger.info(f"DDG Output in perform_search:\n{ddg_output}")
return [{'number': i+1, **result} for i, result in enumerate(results)]
except Exception as e:
print(f"{Fore.RED}Search error: {str(e)}{Style.RESET_ALL}")
return []
def display_search_results(self, results: List[Dict]) -> None:
"""Display search results with minimal output"""
def display_search_results(self, results: Dict[str, Any]) -> None:
"""Display search results with provider information"""
try:
if not results:
if not results['success']:
print(f"{Fore.RED}Search failed: {results.get('error', 'Unknown error')}{Style.RESET_ALL}")
return
# Only show search success status
print(f"\nSearch query sent to DuckDuckGo: {self.last_query}")
print(f"Time range sent to DuckDuckGo: {self.last_time_range}")
print(f"Number of results: {len(results)}")
print(f"\n{Fore.CYAN}Search Results from {results['provider'].upper()}:{Style.RESET_ALL}")
print(f"Query: {self.last_query}")
print(f"Time range: {self.last_time_range}")
print(f"Number of results: {len(results['results'])}")
if results.get('answer'):
print(f"\n{Fore.GREEN}AI-Generated Summary:{Style.RESET_ALL}")
print(results['answer'])
except Exception as e:
logger.error(f"Error displaying search results: {str(e)}")
def select_relevant_pages(self, search_results: List[Dict], user_query: str) -> List[str]:
prompt = f"""
Given the following search results for the user's question: "{user_query}"
Select the 2 most relevant results to scrape and analyze. Explain your reasoning for each selection.
Search Results:
{self.format_results(search_results)}
Instructions:
1. You MUST select exactly 2 result numbers from the search results.
2. Choose the results that are most likely to contain comprehensive and relevant information to answer the user's question.
3. Provide a brief reason for each selection.
You MUST respond using EXACTLY this format and nothing else:
Selected Results: [Two numbers corresponding to the selected results]
Reasoning: [Your reasoning for the selections]
"""
prompt = (
f"Given the following search results for the user's question: \"{user_query}\"\n"
"Select the 2 most relevant results to scrape and analyze. Explain your reasoning for each selection.\n\n"
f"Search Results:\n{self.format_results(search_results)}\n\n"
"Instructions:\n"
"1. You MUST select exactly 2 result numbers from the search results.\n"
"2. Choose the results that are most likely to contain comprehensive and relevant information to answer the user's question.\n"
"3. Provide a brief reason for each selection.\n\n"
"You MUST respond using EXACTLY this format and nothing else:\n\n"
"Selected Results: [Two numbers corresponding to the selected results]\n"
"Reasoning: [Your reasoning for the selections]"
)
max_retries = 3
for retry in range(max_retries):
@ -297,45 +246,31 @@ Reasoning: [Your reasoning for the selections]
llm_output = output.getvalue()
logger.info(f"LLM Output in select_relevant_pages:\n{llm_output}")
parsed_response = self.parse_page_selection_response(response_text)
if parsed_response and self.validate_page_selection_response(parsed_response, len(search_results)):
selected_urls = [result['href'] for result in search_results if result['number'] in parsed_response['selected_results']]
# Extract result numbers (single digits) from the LLM's response and map them to result URLs
parsed_response = {int(char) for char in response_text[:40] if char.isdigit()}
selected_urls = [search_results[i-1]['url'] for i in parsed_response if 0 < i <= len(search_results)]
allowed_urls = [url for url in selected_urls if can_fetch(url)]
if allowed_urls:
return allowed_urls
else:
print(f"{Fore.YELLOW}Warning: All selected URLs are disallowed by robots.txt. Retrying selection.{Style.RESET_ALL}")
allowed_urls = [url for url in selected_urls if can_fetch(url)]
if allowed_urls:
return allowed_urls
else:
print(f"{Fore.YELLOW}Warning: Invalid page selection. Retrying.{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Warning: All selected URLs are disallowed by robots.txt. Retrying selection.{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Warning: All attempts to select relevant pages failed. Falling back to top allowed results.{Style.RESET_ALL}")
allowed_urls = [result['href'] for result in search_results if can_fetch(result['href'])][:2]
allowed_urls = [result['url'] for result in search_results if can_fetch(result['url'])][:2]
return allowed_urls
def parse_page_selection_response(self, response: str) -> Dict[str, Union[List[int], str]]:
lines = response.strip().split('\n')
parsed = {}
for line in lines:
if line.startswith('Selected Results:'):
parsed['selected_results'] = [int(num.strip()) for num in re.findall(r'\d+', line)]
elif line.startswith('Reasoning:'):
parsed['reasoning'] = line.split(':', 1)[1].strip()
return parsed if 'selected_results' in parsed and 'reasoning' in parsed else None
def validate_page_selection_response(self, parsed_response: Dict[str, Union[List[int], str]], num_results: int) -> bool:
if len(parsed_response['selected_results']) != 2:
return False
if any(num < 1 or num > num_results for num in parsed_response['selected_results']):
return False
return True
def format_results(self, results: List[Dict]) -> str:
formatted_results = []
for result in results:
formatted_result = f"{result['number']}. Title: {result.get('title', 'N/A')}\n"
formatted_result += f" Snippet: {result.get('body', 'N/A')[:200]}...\n"
formatted_result += f" URL: {result.get('href', 'N/A')}\n"
for i, result in enumerate(results, 1):
formatted_result = f"{i}. Title: {result.get('title', 'N/A')}\n"
formatted_result += f" Snippet: {result.get('content', 'N/A')[:200]}...\n"
formatted_result += f" URL: {result.get('url', 'N/A')}\n"
if result.get('published_date'):
formatted_result += f" Published: {result['published_date']}\n"
if result.get('score'):
formatted_result += f" Relevance Score: {result['score']}\n"
formatted_results.append(formatted_result)
return "\n".join(formatted_results)
@ -373,27 +308,30 @@ Reasoning: [Your reasoning for the selections]
print(f"{Fore.GREEN}URL: {url}{Style.RESET_ALL}")
print(f"Content: {content[:4000]}...\n")
def generate_final_answer(self, user_query: str, scraped_content: Dict[str, str]) -> str:
def generate_final_answer(self, user_query: str, scraped_content: Dict[str, str], ai_answer: str = '') -> str:
user_query_short = user_query[:200]
prompt = f"""
You are an AI assistant. Provide a comprehensive and detailed answer to the following question using ONLY the information provided in the scraped content. Do not include any references or mention any sources. Answer directly and thoroughly.
ai_summary = f"AI-Generated Summary:\n{ai_answer}\n\n" if ai_answer else ""
prompt = (
f"You are an AI assistant. Provide a comprehensive and detailed answer to the following question "
f"using the provided information. Do not include any references or mention any sources. "
f"Answer directly and thoroughly.\n\n"
f"Question: \"{user_query_short}\"\n\n"
f"{ai_summary}"
f"Scraped Content:\n{self.format_scraped_content(scraped_content)}\n\n"
f"Important Instructions:\n"
f"1. Do not use phrases like \"Based on the absence of selected results\" or similar.\n"
f"2. If the scraped content does not contain enough information to answer the question, "
f"say so explicitly and explain what information is missing.\n"
f"3. Provide as much relevant detail as possible from the scraped content.\n"
f"4. If an AI-generated summary is provided, use it to enhance your answer but don't rely on it exclusively.\n\n"
f"Answer:"
)
Question: "{user_query_short}"
Scraped Content:
{self.format_scraped_content(scraped_content)}
Important Instructions:
1. Do not use phrases like "Based on the absence of selected results" or similar.
2. If the scraped content does not contain enough information to answer the question, say so explicitly and explain what information is missing.
3. Provide as much relevant detail as possible from the scraped content.
Answer:
"""
max_retries = 3
for attempt in range(max_retries):
with OutputRedirector() as output:
response_text = self.llm.generate(prompt, max_tokens=1024, stop=None)
response_text = self.llm.generate(prompt, max_tokens=4096, stop=None)
llm_output = output.getvalue()
logger.info(f"LLM Output in generate_final_answer:\n{llm_output}")
if response_text:
@ -407,19 +345,18 @@ Answer:
def format_scraped_content(self, scraped_content: Dict[str, str]) -> str:
formatted_content = []
for url, content in scraped_content.items():
content = re.sub(r'\s+', ' ', content)
formatted_content.append(f"Content from {url}:\n{content}\n")
content = re.sub(self.WHITESPACE_PATTERN, ' ', content)
formatted_content.append(f"Content from {url}:{content}")
return "\n".join(formatted_content)
def synthesize_final_answer(self, user_query: str) -> str:
prompt = f"""
After multiple search attempts, we couldn't find a fully satisfactory answer to the user's question: "{user_query}"
Please provide the best possible answer you can, acknowledging any limitations or uncertainties.
If appropriate, suggest ways the user might refine their question or where they might find more information.
Respond in a clear, concise, and informative manner.
"""
prompt = (
f"After multiple search attempts, we couldn't find a fully satisfactory answer to the user's question: "
f"\"{user_query}\"\n\n"
f"Please provide the best possible answer you can, acknowledging any limitations or uncertainties.\n"
f"If appropriate, suggest ways the user might refine their question or where they might find more information.\n\n"
f"Respond in a clear, concise, and informative manner."
)
try:
with OutputRedirector() as output:
response_text = self.llm.generate(prompt, max_tokens=self.llm_config.get('max_tokens', 1024), stop=self.llm_config.get('stop', None))

View file

@ -269,7 +269,7 @@ def main():
handle_research_mode(research_manager, research_query)
else:
print(f"{Fore.RED}Please start with '/' for search or '@' for research.{Style.RESET_ALL}")
print(f"{Fore.RED}Please start with '@' for research.{Style.RESET_ALL}")
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Exiting program...{Style.RESET_ALL}")

6
example.env Normal file
View file

@ -0,0 +1,6 @@
TAVILY_API_KEY="tvly-"
BING_API_KEY="495..."
BRAVE_API_KEY="BSAZ..."
EXA_API_KEY="85e199c6-.."
OPENAI_API_KEY="sk-proj-.."
ANTHROPIC_API_KEY="sk-ant-.."

View file

@ -1,6 +1,6 @@
# llm_config.py
LLM_TYPE = "anthropic" # Options: 'llama_cpp', 'ollama', 'openai', 'anthropic'
LLM_TYPE = "openai" # Options: 'llama_cpp', 'ollama', 'openai', 'anthropic'
# LLM settings for llama_cpp
MODEL_PATH = "/home/james/llama.cpp/models/gemma-2-9b-it-Q6_K.gguf" # Replace with your llama.cpp models filepath
@ -37,9 +37,10 @@ LLM_CONFIG_OPENAI = {
"api_key": "", # Set via environment variable OPENAI_API_KEY
"base_url": None, # Optional: Set to use alternative OpenAI-compatible endpoints
"model_name": "gpt-4o", # Required: Specify the model to use
"messages": [], # Placeholder for conversation history
"temperature": 0.7,
"top_p": 0.9,
"max_tokens": 4096,
"max_tokens": 32000,
"stop": ["User:", "\n\n"],
"presence_penalty": 0,
"frequency_penalty": 0

View file

@ -151,4 +151,4 @@ class LLMWrapper:
'stop': kwargs.get('stop', self.llm_config.get('stop', [])),
'echo': False,
}
return llama_kwargs
return llama_kwargs

View file

@ -11,3 +11,5 @@ tqdm
urllib3
openai>=1.0.0
anthropic>=0.7.0
tavily-python
exa-py

View file

@ -19,6 +19,7 @@ import tty
from threading import Event
from urllib.parse import urlparse
from pathlib import Path
from system_config import get_research_config
# Initialize colorama for cross-platform color support
if os.name == 'nt': # Windows-specific initialization
@ -46,6 +47,7 @@ for name in logging.root.manager.loggerDict:
if name != __name__:
logging.getLogger(name).disabled = True
RESEARCH_CONFIG = get_research_config()
@dataclass
class ResearchFocus:
"""Represents a specific area of research focus"""
@ -378,9 +380,17 @@ class TerminalUI:
self.status_win = None
def _cleanup(self):
"""Enhanced resource cleanup with better process handling"""
"""Enhanced cleanup to handle conversation mode and auto-save"""
self.conversation_active = False
self.should_terminate.set()
# Wait for auto-save thread to finish if it exists
if hasattr(self, 'auto_save_thread') and self.auto_save_thread and self.auto_save_thread.is_alive():
try:
self.auto_save_thread.join(timeout=1.0)
except Exception as e:
logger.error(f"Error cleaning up auto-save thread: {str(e)}")
# Handle research thread with improved termination
if self.research_thread and self.research_thread.is_alive():
try:
@ -752,10 +762,18 @@ Do not provide any additional information or explanation, note that the time ran
return {'query': '', 'time_range': 'none'}
def _cleanup(self):
"""Enhanced cleanup to handle conversation mode"""
"""Enhanced cleanup to handle conversation mode and auto-save"""
self.conversation_active = False
self.should_terminate.set()
# Wait for auto-save thread to finish if it exists
if hasattr(self, 'auto_save_thread') and self.auto_save_thread and self.auto_save_thread.is_alive():
try:
self.auto_save_thread.join(timeout=1.0)
except Exception as e:
logger.error(f"Error cleaning up auto-save thread: {str(e)}")
# Handle research thread with improved termination
if self.research_thread and self.research_thread.is_alive():
try:
self.research_thread.join(timeout=1.0)
@ -778,7 +796,7 @@ Do not provide any additional information or explanation, note that the time ran
self.ui.cleanup()
def _initialize_document(self):
"""Initialize research session document"""
"""Initialize research session document with auto-backup"""
try:
# Get all existing research session files
self.session_files = []
@ -802,6 +820,10 @@ Do not provide any additional information or explanation, note that the time ran
f.write("="*80 + "\n\n")
f.flush()
# Setup auto-save if enabled
if RESEARCH_CONFIG['storage']['auto_save']:
self._start_auto_save()
except Exception as e:
logger.error(f"Error initializing document: {str(e)}")
self.document_path = "research_findings.txt"
@ -983,15 +1005,11 @@ Do not provide any additional information or explanation, note that the time ran
try:
with open(self.document_path, 'r', encoding='utf-8') as f:
content = f.read()
estimated_tokens = len(content.split()) * 1.3
max_tokens = self.llm.llm_config.get('n_ctx', 2048)
current_ratio = estimated_tokens / max_tokens
if current_ratio > 0.8:
logger.warning(f"Document size at {current_ratio*100:.1f}% of context limit")
self.ui.update_output(f"Warning: Document size at {current_ratio*100:.1f}% of context limit")
return current_ratio > 0.9
if len(content) >= RESEARCH_CONFIG['content']['max_document_size']:
logger.warning(f"Document size exceeded configured limit of {RESEARCH_CONFIG['content']['max_document_size']} characters")
self.ui.update_output(f"Warning: Document size exceeded configured limit")
return True
return False
except Exception as e:
logger.error(f"Error checking document size: {str(e)}")
return True
@ -1165,7 +1183,7 @@ Research Progress:
Summary:
"""
summary = self.llm.generate(summary_prompt, max_tokens=4000)
summary = self.llm.generate(summary_prompt, max_tokens=16384)
# Signal that summary is complete to stop the progress indicator
self.summary_ready = True
@ -1430,6 +1448,36 @@ Answer:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
print() # New line for clean display
def _start_auto_save(self):
"""Start auto-save thread to periodically save research progress"""
def auto_save_loop():
while not self.should_terminate.is_set():
try:
# Create backup file name with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = f"{self.document_path}.{timestamp}.bak"
# Copy current document to backup
if os.path.exists(self.document_path):
with open(self.document_path, 'r', encoding='utf-8') as src:
content = src.read()
with open(backup_path, 'w', encoding='utf-8') as dst:
dst.write(content)
# Keep only the most recent backups, up to the configured max_backups limit
backups = sorted([f for f in os.listdir() if f.startswith(f"{self.document_path}.") and f.endswith(".bak")])
while len(backups) > RESEARCH_CONFIG['storage']['max_backups']:
os.remove(backups.pop(0))
time.sleep(RESEARCH_CONFIG['storage']['auto_save_interval'])
except Exception as e:
logger.error(f"Error in auto-save: {str(e)}")
time.sleep(60) # Wait a minute before retrying if there's an error
# Start auto-save thread
self.auto_save_thread = threading.Thread(target=auto_save_loop, daemon=True)
self.auto_save_thread.start()
if __name__ == "__main__":
from llm_wrapper import LLMWrapper
from llm_response_parser import UltimateLLMResponseParser

219
search_manager.py Normal file
View file

@ -0,0 +1,219 @@
"""
SearchManager handles search provider selection, fallback, and result normalization.
"""
import logging
from typing import Dict, List, Any, Optional
from time import sleep
from system_config import get_search_config
from search_providers.factory import SearchProviderFactory
logger = logging.getLogger(__name__)
class SearchManager:
"""
Manages multiple search providers with fallback support and result normalization.
"""
def __init__(self, tavily_api_key=None, brave_api_key=None, bing_api_key=None, exa_api_key=None):
"""Initialize SearchManager with configuration and providers."""
self.config = get_search_config()
self.factory = SearchProviderFactory()
self.providers = self._initialize_providers(tavily_api_key, brave_api_key, bing_api_key, exa_api_key)
self.current_provider = self.config["default_provider"]
def _initialize_providers(self, tavily_api_key=None, brave_api_key=None, bing_api_key=None, exa_api_key=None) -> Dict[str, Any]:
"""Initialize all configured search providers."""
providers = {}
for provider_name in self.config["fallback_order"]:
try:
if provider_name == 'tavily':
provider = self.factory.get_provider(provider_name, api_key=tavily_api_key)
elif provider_name == 'brave':
provider = self.factory.get_provider(provider_name, api_key=brave_api_key)
elif provider_name == 'bing':
provider = self.factory.get_provider(provider_name, api_key=bing_api_key)
elif provider_name == 'exa':
provider = self.factory.get_provider(provider_name, api_key=exa_api_key)
else:
provider = self.factory.get_provider(provider_name)
if provider.is_configured():
providers[provider_name] = provider
logger.info(f"Successfully initialized {provider_name} provider")
else:
logger.warning(f"Provider {provider_name} not properly configured")
except Exception as e:
logger.error(f"Failed to initialize {provider_name} provider: {str(e)}")
return providers
def _normalize_results(self, results: Dict[str, Any], provider: str) -> Dict[str, Any]:
"""
Normalize search results to a standard format regardless of provider.
Standard format:
{
'success': bool,
'error': Optional[str],
'results': List[{
'title': str,
'url': str,
'content': str,
'score': float,
'published_date': Optional[str]
}],
'answer': Optional[str], # For providers that support AI-generated answers
'provider': str
}
"""
if not isinstance(results, dict):
return {
'success': False,
'error': f'Invalid results format from {provider}',
'results': [],
'provider': provider
}
if 'error' in results:
return {
'success': False,
'error': results['error'],
'results': [],
'provider': provider
}
normalized = {
'success': True,
'error': None,
'provider': provider,
'results': []
}
# Handle Tavily's AI answer if present
if 'answer' in results:
normalized['answer'] = results['answer']
# Normalize results based on provider
if provider == 'tavily':
# Handle both general and news results from Tavily
if 'articles' in results:
normalized['results'] = [{
'title': r.get('title', ''),
'url': r.get('url', ''),
'content': r.get('content', '')[:500],
'score': float(r.get('score', 0.0)),
'published_date': r.get('published_date')
} for r in results.get('articles', [])]
else:
normalized['results'] = results.get('results', [])
elif provider == 'brave':
normalized['results'] = [{
'title': r.get('title', ''),
'url': r.get('url', ''),
'content': r.get('content', '')[:500],
'score': float(r.get('score', 0.0)),
'published_date': r.get('published_date')
} for r in results.get('results', [])]
elif provider == 'bing':
normalized['results'] = [{
'title': r.get('title', ''),
'url': r.get('url', ''),
'content': r.get('content', '')[:500],
'score': 1.0, # Bing doesn't provide relevance scores
'published_date': None
} for r in results.get('results', [])]
elif provider == 'exa':
normalized['results'] = [{
'title': r.get('title', ''),
'url': r.get('url', ''),
'content': r.get('summary', '')[:500],
'score': float(r.get('score', 0.0)),
'published_date': r.get('published_date')
} for r in results.get('results', [])]
elif provider == 'duckduckgo':
if not isinstance(results, list):
results = []
normalized['results'] = [{
'title': r.get('title', ''),
'url': r.get('link', ''),
'content': r.get('snippet', '')[:500],
'score': 1.0, # DuckDuckGo doesn't provide relevance scores
'published_date': None
} for r in results]
return normalized
def search(self, query: str, **kwargs) -> Dict[str, Any]:
"""
Perform a search using configured providers with fallback support.
"""
tried_providers = set()
# First try the default provider
if self.current_provider in self.providers:
try:
provider = self.providers[self.current_provider]
provider_settings = self.config["provider_settings"].get(self.current_provider, {})
search_params = {**provider_settings, **kwargs}
results = provider.search(query, **search_params)
normalized_results = self._normalize_results(results, self.current_provider)
if normalized_results['success']:
return normalized_results
logger.warning(
f"Search with default provider {self.current_provider} failed: {normalized_results.get('error')}"
)
except Exception as e:
logger.error(f"Error using default provider {self.current_provider}: {str(e)}")
tried_providers.add(self.current_provider)
# Then try providers in fallback order
for provider_name in self.config["fallback_order"]:
if provider_name not in self.providers or provider_name in tried_providers:
continue
tried_providers.add(provider_name)
provider = self.providers[provider_name]
try:
# Get provider-specific settings
provider_settings = self.config["provider_settings"].get(provider_name, {})
search_params = {**provider_settings, **kwargs}
# Perform search
results = provider.search(query, **search_params)
normalized_results = self._normalize_results(results, provider_name)
# If search was successful, update current provider and return results
if normalized_results['success']:
self.current_provider = provider_name
return normalized_results
logger.warning(
f"Search with {provider_name} failed: {normalized_results.get('error')}"
)
except Exception as e:
logger.error(f"Error using {provider_name} provider: {str(e)}")
# Apply rate limiting before trying next provider
sleep(self.config["rate_limiting"]["cooldown_period"] / len(self.providers))
# If all providers failed, return error
return {
'success': False,
'error': 'All search providers failed',
'results': [],
'provider': None
}
def get_current_provider(self) -> str:
"""Get the name of the currently active search provider."""
return self.current_provider
def get_available_providers(self) -> List[str]:
"""Get list of available (properly configured) search providers."""
return list(self.providers.keys())

View file

@ -0,0 +1,5 @@
from .base_provider import BaseSearchProvider
from .tavily_provider import TavilySearchProvider
from .factory import SearchProviderFactory
__all__ = ['BaseSearchProvider', 'TavilySearchProvider', 'SearchProviderFactory']

View file

@ -0,0 +1,42 @@
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
class BaseSearchProvider(ABC):
"""
Abstract base class for search providers.
All search providers must implement these methods.
"""
@abstractmethod
def __init__(self, api_key: Optional[str] = None):
"""
Initialize the search provider.
Args:
api_key: Optional API key for the search provider
"""
pass
@abstractmethod
def search(self, query: str, **kwargs) -> Dict[str, Any]:
"""
Perform a search using the provider.
Args:
query: The search query string
**kwargs: Additional search parameters specific to the provider
Returns:
Dict containing the search results or error information
"""
pass
@abstractmethod
def is_configured(self) -> bool:
"""
Check if the provider is properly configured (e.g., has valid API key).
Returns:
bool indicating if the provider is ready to use
"""
pass
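# Illustrative example (not part of this file): a new provider plugs in by subclassing
# BaseSearchProvider and implementing the three methods above, e.g.:
#
#     import os
#
#     class MySearchProvider(BaseSearchProvider):
#         def __init__(self, api_key: Optional[str] = None):
#             self.api_key = api_key or os.getenv("MYSEARCH_API_KEY")  # hypothetical env var
#
#         def is_configured(self) -> bool:
#             return bool(self.api_key)
#
#         def search(self, query: str, **kwargs) -> Dict[str, Any]:
#             # return {'results': [...]} on success or {'error': '...'} on failure
#             return {'results': []}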

View file

@ -0,0 +1,200 @@
from typing import Dict, Any, Optional
import os
import sys
from pathlib import Path
import requests
from datetime import datetime, timedelta
import json
# Add parent directory to path for imports when running as script
if __name__ == "__main__":
sys.path.append(str(Path(__file__).parent.parent))
from search_providers.base_provider import BaseSearchProvider
else:
from .base_provider import BaseSearchProvider
class BingSearchProvider(BaseSearchProvider):
"""
Bing implementation of the search provider interface.
Handles both web and news-specific searches using Bing's APIs.
"""
WEB_SEARCH_ENDPOINT = "https://api.bing.microsoft.com/v7.0/search"
NEWS_SEARCH_ENDPOINT = "https://api.bing.microsoft.com/v7.0/news/search"
def __init__(self, api_key: Optional[str] = None):
"""
Initialize the Bing search provider.
Args:
api_key: Optional Bing API key. If not provided, will try to get from environment.
"""
self.api_key = api_key or os.getenv("BING_API_KEY")
self.headers = {
'Ocp-Apim-Subscription-Key': self.api_key,
'Accept': 'application/json'
} if self.api_key else None
# Load trusted news sources
self.trusted_sources = self._load_trusted_sources()
def _load_trusted_sources(self) -> list:
"""Load first 5 trusted news sources from JSON file."""
try:
json_path = Path(__file__).parent / "trusted_news_sources.json"
with open(json_path) as f:
data = json.load(f)
# Only load the first 16 sources as per MSFT limits
return data.get("trusted_sources", [])[:16]
except Exception as e:
print(f"Warning: Could not load trusted news sources: {e}")
return []
def is_configured(self) -> bool:
"""Check if Bing API is properly configured."""
return self.headers is not None
def search(self, query: str, **kwargs) -> Dict[str, Any]:
"""
Perform a search using Bing API.
Args:
query: The search query string
**kwargs: Additional search parameters:
- topic: Optional search topic (e.g., "news")
- max_results: Maximum number of results (default: 10)
- market: Market code (default: "en-US")
- days: Number of days to look back (for news searches)
Returns:
Dict containing search results or error information
"""
if not self.is_configured():
return {'error': 'Bing API key not configured'}
try:
# Set default search parameters
search_params = {
'count': str(kwargs.get('max_results', 10)), # Changed default from 5 to 10
'mkt': kwargs.get('market', 'en-US'),
'textFormat': 'Raw'
}
# Determine if this is a news search
if kwargs.get('topic') == 'news':
# Add freshness parameter for news if days specified
if 'days' in kwargs:
# Bing API expects 'day', 'week', or 'month'
search_params['freshness'] = 'week' if kwargs['days'] >1 else 'day'
# Add site: operators for trusted sources
if self.trusted_sources:
site_operators = " OR ".join(f'site:{source}' for source in self.trusted_sources)
search_params['q'] = f"({query}) ({site_operators})"
else:
search_params['q'] = f"latest headlines about the topic: {query}"
response = requests.get(
self.NEWS_SEARCH_ENDPOINT,
headers=self.headers,
params=search_params
)
else:
search_params['q'] = query
response = requests.get(
self.WEB_SEARCH_ENDPOINT,
headers=self.headers,
params=search_params
)
if response.status_code != 200:
return {'error': f'API request failed with status {response.status_code}: {response.text}'}
response_data = response.json()
# Process results based on search type
if kwargs.get('topic') == 'news':
return self._process_news_results(
response_data,
days=kwargs.get('days', 3),
topic=query
)
else:
return self._process_general_results(response_data)
except requests.exceptions.RequestException as e:
return {'error': f'API request failed: {str(e)}'}
except Exception as e:
return {'error': f'An unexpected error occurred: {str(e)}'}
def _process_general_results(self, response: Dict[str, Any]) -> Dict[str, Any]:
"""Process results for general web searches."""
webpages = response.get('webPages', {}).get('value', [])
return {
'results': [{
'title': result.get('name', ''),
'url': result.get('url', ''),
'content': result.get('snippet', ''),
'score': 1.0 # Bing doesn't provide relevance scores
} for result in webpages[:10]] # Changed from 3 to 10
}
def _process_news_results(self, response: Dict[str, Any], days: int, topic: str) -> Dict[str, Any]:
"""Process results for news-specific searches."""
articles = response.get('value', [])
return {
'articles': [{
'title': article.get('name', ''),
'url': article.get('url', ''),
'published_date': article.get('datePublished', ''),
'content': article.get('description', ''),
'score': 1.0 # Bing doesn't provide relevance scores
} for article in articles],
'time_period': f"Past {days} days",
'topic': topic
}
if __name__ == "__main__":
# Test code using actual API
provider = BingSearchProvider()
if not provider.is_configured():
print("Error: Bing API key not configured")
exit(1)
# Print loaded trusted sources
print("\n=== Loaded Trusted Sources ===")
print(provider.trusted_sources)
# Test general search
print("\n=== Testing General Search ===")
general_result = provider.search(
"What is artificial intelligence?",
max_results=10 # Changed from 3 to 10
)
if 'error' in general_result:
print(f"Error in general search: {general_result['error']}")
else:
print("\nTop Results:")
for idx, result in enumerate(general_result['results'], 1):
print(f"\n{idx}. {result['title']}")
print(f" URL: {result['url']}")
print(f" Preview: {result['content'][:400]}...")
# Test news search
print("\n\n=== Testing News Search ===")
news_result = provider.search(
"mike tyson fight",
topic="news",
days=3
)
if 'error' in news_result:
print(f"Error in news search: {news_result['error']}")
else:
print("\nRecent Articles:")
for idx, article in enumerate(news_result['articles'], 1):
print(f"\n{idx}. {article['title']}")
print(f" Published: {article['published_date']}")
print(f" URL: {article['url']}")
print(f" Preview: {article['content'][:400]}...")

View file

@ -0,0 +1,308 @@
from typing import Dict, Any, Optional
import os
import sys
from pathlib import Path
import requests
from datetime import datetime, timedelta
import json
from concurrent.futures import ThreadPoolExecutor
# Add parent directory to path for imports when running as script
if __name__ == "__main__":
sys.path.append(str(Path(__file__).parent.parent))
from search_providers.base_provider import BaseSearchProvider
else:
from .base_provider import BaseSearchProvider
class BraveSearchProvider(BaseSearchProvider):
"""
Brave implementation of the search provider interface.
Handles both web and news-specific searches using Brave's APIs.
"""
WEB_SEARCH_ENDPOINT = "https://api.search.brave.com/res/v1/web/search"
NEWS_SEARCH_ENDPOINT = "https://api.search.brave.com/res/v1/news/search"
SUMMARIZER_ENDPOINT = "https://api.search.brave.com/res/v1/summarizer/search"
def __init__(self, api_key: Optional[str] = None):
"""
Initialize the Brave search provider.
Args:
api_key: Optional Brave API key. If not provided, will try to get from environment.
"""
self.api_key = api_key or os.getenv("BRAVE_API_KEY")
self.pro_api_key = os.getenv("BRAVE_AI_PRO_API_KEY") #Optional, used for AI summary requests
self.headers = {
'X-Subscription-Token': self.api_key,
'Accept': 'application/json'
} if self.api_key else None
self.proheaders = {
'X-Subscription-Token': self.pro_api_key,
'Accept': 'application/json'
} if self.pro_api_key else None
def is_configured(self) -> bool:
"""Check if Brave API is properly configured."""
return self.headers is not None
def get_brave_summary(self, query):
# Query parameters
params = {
"q": query,
"summary": 1
}
# Make the initial web search request to get summarizer key
search_response = requests.get(self.WEB_SEARCH_ENDPOINT, headers=self.proheaders, params=params)
if search_response.status_code == 200:
data = search_response.json()
if "summarizer" in data and "key" in data["summarizer"]:
summarizer_key = data["summarizer"]["key"]
# Make request to summarizer endpoint
summarizer_params = {
"key": summarizer_key,
"entity_info": 1
}
summary_response = requests.get(
self.SUMMARIZER_ENDPOINT,
headers=self.proheaders,
params=summarizer_params
)
if summary_response.status_code == 200:
summary_data = summary_response.json()
try:
return summary_data['summary'][0]['data']
except (KeyError, IndexError):
return None
return None
def search(self, query: str, **kwargs) -> Dict[str, Any]:
"""
Perform a search using Brave API.
Args:
query: The search query string
**kwargs: Additional search parameters:
- topic: Optional search topic (e.g., "news")
- max_results: Maximum number of results (default: 10)
- market: Market code (default: "en-US")
- days: Number of days to look back (for news searches)
Returns:
Dict containing search results or error information
"""
if not self.is_configured():
return {'error': 'Brave API key not configured'}
try:
# Set default search parameters
search_params = {
'count': str(kwargs.get('max_results', 10)),
'country': kwargs.get('market', 'us'), # Brave uses country code
'q': query
}
# Determine if this is a news search
if kwargs.get('topic') == 'news':
# Add freshness parameter for news if days specified
if 'days' in kwargs:
days = kwargs['days']
if days <= 1:
search_params['freshness'] = 'pd' # past day
elif days <= 7:
search_params['freshness'] = 'pw' # past week
else:
search_params['freshness'] = 'pm' # past month
response = requests.get(
self.NEWS_SEARCH_ENDPOINT,
headers=self.headers,
params=search_params
)
response_data = response.json()
result = self._process_news_results(response_data, days=kwargs.get('days', 3), topic=query)
else:
response = requests.get(
self.WEB_SEARCH_ENDPOINT,
headers=self.headers,
params=search_params
)
response_data = response.json()
result = self._process_general_results(response_data)
# Include summarizer response if it exists
summary_response = self.get_brave_summary(query)
if summary_response:
result['summarizer'] = summary_response
return result
except requests.exceptions.RequestException as e:
return {'error': f'API request failed: {str(e)}'}
except Exception as e:
return {'error': f'An unexpected error occurred: {str(e)}'}
def _process_general_results(self, response: Dict[str, Any]) -> Dict[str, Any]:
"""Process results for general web searches."""
web_results = response.get('web', {}).get('results', [])
with ThreadPoolExecutor() as executor:
# Use index as key instead of the result dictionary
futures = {i: executor.submit(self.get_brave_summary, result.get('title', ''))
for i, result in enumerate(web_results[:2])}
results = []
for i, result in enumerate(web_results):
summary = None
if i < 2:
try:
summary = futures[i].result()
except Exception as e:
print(f"Error getting summary: {e}")
processed_result = {
'title': result.get('title', ''),
'url': result.get('url', ''),
'content': result.get('description', ''),
'score': result.get('score', 1.0),
'extra_snippets': None,
'summary': None
}
if summary:
processed_result['summary'] = summary
else:
processed_result['extra_snippets'] = result.get('extra_snippets', [])
results.append(processed_result)
return {'results': results}
def _process_news_results(self, response: Dict[str, Any], days: int, topic: str) -> Dict[str, Any]:
"""Process results for news-specific searches."""
news_results = response.get('results', [])
def convert_age_to_minutes(age_str: str) -> int:
"""
Convert age string to minutes.
Args:
age_str: Age string in the format of "X minutes", "X hours", "X days"
Returns:
Age in minutes
"""
age_value = int(age_str.split()[0])
age_unit = age_str.split()[1]
if age_unit == 'minutes':
return age_value
elif age_unit == 'hours':
return age_value * 60
elif age_unit == 'days':
return age_value * 1440 # 24 hours * 60 minutes
else:
return 0 # Default to 0 if unknown unit
# Sort news results based on the age field
news_results.sort(key=lambda x: convert_age_to_minutes(x.get('age', '0 minutes')))
with ThreadPoolExecutor() as executor:
# Use enumerate to create futures with index as key
futures = {i: executor.submit(self.get_brave_summary, article_data.get('title', ''))
for i, article_data in enumerate(news_results)}
articles = []
for i, article_data in enumerate(news_results):
try:
summary = futures[i].result()
except Exception as e:
print(f"Error getting summary: {e}")
summary = None
article = {
'title': article_data.get('title', ''),
'url': article_data.get('url', ''),
'published_date': article_data.get('age', ''),
'breaking' : article_data.get('breaking', False),
'content': article_data.get('description', ''),
'extra_snippets': None,
'summary': None,
'score': article_data.get('score', 1.0)
}
if summary:
article['summary'] = summary
else:
article['extra_snippets'] = article_data.get('extra_snippets', [])
articles.append(article)
return {
'articles': articles,
'time_period': f"Past {days} days",
'topic': topic
}
if __name__ == "__main__":
# Test code using actual API
provider = BraveSearchProvider()
if not provider.is_configured():
print("Error: Brave API key not configured")
exit(1)
# Test general search
print("\n=== Testing General Search ===")
general_result = provider.search(
"What is artificial intelligence?",
max_results=1 # Increased max_results to test summary limiting
)
if 'error' in general_result:
print(f"Error in general search: {general_result['error']}")
else:
print("\nTop Results:")
for idx, result in enumerate(general_result['results'], 1):
print(f"\n{idx}. {result['title']}")
print(f" URL: {result['url']}")
print(f" Preview: {result['content']}...")
print(f" Score: {result['score']}")
if result['extra_snippets']:
print(" Extra Snippets:")
for snippet in result['extra_snippets']:
print(f" - {snippet}")
if result['summary']: # Check if summary exists before printing
print(f" Summary: {result.get('summary', '')}...")
import time
time.sleep(1)
# Test news search
print("\n\n=== Testing News Search ===")
import time
start_time = time.time()
news_result = provider.search(
"mike tyson fight",
topic="news",
days=3,
max_results=1
)
end_time = time.time()
if 'error' in news_result:
print(f"Error in news search: {news_result['error']}")
else:
print("\nRecent Articles:")
for idx, article in enumerate(news_result['articles'], 1):
print(f"\n{idx}. {article['title']}")
print(f" Published: {article['published_date']}")
print(f" Breaking: {article['breaking']}")
print(f" URL: {article['url']}")
print(f" Preview: {article['content'][:400]}...")
if article['extra_snippets']:
print(" Extra Snippets:")
for snippet in article['extra_snippets']:
print(f" - {snippet}")
if article['summary']:
print(f" Summary: {article.get('summary', '')}...")
print(f"Execution time: {round(end_time - start_time, 1)} seconds")

View file

@ -0,0 +1,231 @@
from typing import Dict, Any, Optional
import os
import sys
import json
from pathlib import Path
import requests
from datetime import datetime, timedelta
# Add parent directory to path for imports when running as script
if __name__ == "__main__":
sys.path.append(str(Path(__file__).parent.parent))
from search_providers.base_provider import BaseSearchProvider
else:
from .base_provider import BaseSearchProvider
class ExaSearchProvider(BaseSearchProvider):
"""
Exa.ai implementation of the search provider interface.
Handles web searches with optional full page content retrieval.
"""
def __init__(self, api_key: Optional[str] = None):
"""
Initialize the Exa search provider.
Args:
api_key: Optional Exa API key. If not provided, will try to get from environment.
"""
self.api_key = api_key or os.getenv("EXA_API_KEY")
self.base_url = "https://api.exa.ai/search"
self.trusted_sources = self._load_trusted_sources()
def _load_trusted_sources(self) -> list:
"""Load trusted news sources from JSON file."""
try:
json_path = Path(__file__).parent / 'trusted_news_sources.json'
with open(json_path) as f:
data = json.load(f)
return data.get('trusted_sources', [])
except Exception as e:
print(f"Warning: Could not load trusted sources: {e}")
return []
def is_configured(self) -> bool:
"""Check if Exa client is properly configured."""
return bool(self.api_key)
def search(self, query: str, **kwargs) -> Dict[str, Any]:
"""
Perform a search using Exa API.
Args:
query: The search query string
**kwargs: Additional search parameters:
- include_content: Whether to retrieve full page contents (default: False)
- max_results: Maximum number of results (default: 3)
- days: Number of days to look back (for news searches)
Returns:
Dict containing search results or error information
"""
if not self.is_configured():
return {'error': 'Exa API key not configured'}
try:
# Set default search parameters
search_params = {
'query': query,
'type': 'neural',
'useAutoprompt': True,
'numResults': kwargs.get('max_results', 3),
}
# Add optional parameters
if kwargs.get('include_content'):
search_params['contents'] = {
"highlights": True,
"summary": True
}
if kwargs.get('days'):
# Convert days to timestamp for time-based filtering
date_limit = datetime.now() - timedelta(days=kwargs['days'])
search_params['startPublishedTime'] = date_limit.isoformat()
# Add trusted domains for news searches
if kwargs.get('topic') == 'news' and self.trusted_sources:
search_params['includeDomains'] = self.trusted_sources
# Make API request
headers = {
'x-api-key': self.api_key,
'Content-Type': 'application/json',
'accept': 'application/json'
}
response = requests.post(
self.base_url,
headers=headers,
json=search_params
)
response.raise_for_status()
data = response.json()
# Process results based on whether it's a news search
if kwargs.get('topic') == 'news':
return self._process_news_results(
data,
days=kwargs.get('days', 3),
topic=query
)
else:
return self._process_general_results(data)
except requests.exceptions.RequestException as e:
if e.response and e.response.status_code == 401:
return {'error': 'Invalid Exa API key'}
elif e.response and e.response.status_code == 429:
return {'error': 'Exa API rate limit exceeded'}
else:
return {'error': f'An error occurred while making the request: {str(e)}'}
except Exception as e:
return {'error': f'An unexpected error occurred: {str(e)}'}
def _process_general_results(self, response: Dict[str, Any]) -> Dict[str, Any]:
"""Process results for general searches."""
results = []
for result in response.get('results', []):
processed_result = {
'title': result.get('title', ''),
'url': result.get('url', ''),
'highlights': result.get('highlights', []),
'summary': result.get('summary', ''),
'score': result.get('score', 0.0)
}
results.append(processed_result)
return {
'results': results,
'autoprompt': response.get('autopromptString', '')
}
def _process_news_results(self, response: Dict[str, Any], days: int, topic: str) -> Dict[str, Any]:
"""Process results for news-specific searches."""
articles = []
for article in response.get('results', []):
processed_article = {
'title': article.get('title', ''),
'url': article.get('url', ''),
'published_date': article.get('publishedDate', ''),
'highlights': article.get('highlights', []),
'summary': article.get('summary', ''),
'score': article.get('score', 0.0)
}
articles.append(processed_article)
return {
'articles': articles,
'time_period': f"Past {days} days",
'topic': topic,
'autoprompt': response.get('autopromptString', '')
}
if __name__ == "__main__":
# Test code for the Exa provider
provider = ExaSearchProvider()
if not provider.is_configured():
print("Error: Exa API key not configured")
exit(1)
# Test general search
print("\n=== Testing General Search ===")
import time
start_time = time.time()
general_result = provider.search(
"What is artificial intelligence?",
max_results=3,
include_content=True
)
end_time = time.time()
if 'error' in general_result:
print("Error:", general_result['error'])
else:
print("\nTop Results:")
print(f"Autoprompt: {general_result.get('autoprompt', '')}")
for idx, result in enumerate(general_result['results'], 1):
print(f"\n{idx}. {result['title']}")
print(f" URL: {result['url']}")
print(f" Score: {result['score']}")
print(f" Summary: {result['summary']}")
if result['highlights']:
print(" Highlights:")
for highlight in result['highlights']:
print(f" - {highlight}")
print(f"\n\nTime taken for general search: {end_time - start_time} seconds")
# Test news search
print("\n\n=== Testing News Search ===")
start_time = time.time()
news_result = provider.search(
"Latest developments in AI",
topic="news",
days=3,
max_results=3,
include_content=True
)
end_time = time.time()
if 'error' in news_result:
print("Error:", news_result['error'])
else:
print("\nRecent Articles:")
print(f"Autoprompt: {news_result.get('autoprompt', '')}")
for idx, article in enumerate(news_result['articles'], 1):
print(f"\n{idx}. {article['title']}")
print(f" Published: {article['published_date']}")
print(f" URL: {article['url']}")
print(f" Score: {article['score']}")
print(f" Summary: {article['summary']}")
if article['highlights']:
print(" Highlights:")
for highlight in article['highlights']:
print(f" - {highlight}")
print(f"\n\nTime taken for news search: {end_time - start_time} seconds")
# Test error handling
print("\n\n=== Testing Error Handling ===")
bad_provider = ExaSearchProvider(api_key="invalid_key")
error_result = bad_provider.search("test query")
print("\nExpected error with invalid API key:", error_result['error'])

View file

@ -0,0 +1,50 @@
"""Factory for creating search providers based on configuration."""
from typing import Type, Dict, Any
from search_providers.base_provider import BaseSearchProvider
from search_providers.bing_provider import BingSearchProvider
from search_providers.brave_provider import BraveSearchProvider
from search_providers.exa_provider import ExaSearchProvider
from search_providers.tavily_provider import TavilySearchProvider
from system_config import get_search_config
class SearchProviderFactory:
"""
Factory class for creating instances of search providers.
"""
_providers: Dict[str, Type[BaseSearchProvider]] = {
"bing": BingSearchProvider,
"brave": BraveSearchProvider,
"exa": ExaSearchProvider,
"tavily": TavilySearchProvider,
}
@classmethod
def get_provider(cls, provider_type: str, **kwargs) -> BaseSearchProvider:
"""
Get an instance of the specified search provider.
Args:
            provider_type: The type of search provider to create (e.g., "bing", "exa", "tavily").
            **kwargs: Additional keyword arguments to pass to the provider's constructor.
        Returns:
            An instance of the requested search provider.
        Raises:
            ValueError: If the provider type is not one of the registered providers.
"""
provider_class = cls._providers.get(provider_type.lower())
if not provider_class:
raise ValueError(f"Invalid search provider type: {provider_type}")
return provider_class(**kwargs)
@classmethod
def get_available_providers(cls) -> Dict[str, Type[BaseSearchProvider]]:
"""
Get a dictionary of available search provider types and their corresponding classes.
Returns:
            A dictionary where keys are provider types (e.g., "bing", "exa") and values are
the corresponding search provider classes.
"""
return cls._providers
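
The factory only constructs a single provider; the automatic fallback described in the README is expected to be driven by the `fallback_order` in `SEARCH_CONFIG`. Below is a minimal sketch of how the two could be wired together. The `search_with_fallback` helper and the `search_providers.search_provider_factory` module path are illustrative assumptions rather than part of this commit, and DuckDuckGo (which has no factory entry) is skipped.

```python
# Hypothetical glue code, not part of this commit. Adjust the factory import
# to the real module path in the repository.
from search_providers.search_provider_factory import SearchProviderFactory
from system_config import get_search_config

def search_with_fallback(query: str, **kwargs):
    """Try the default provider first, then walk the configured fallback order."""
    config = get_search_config()
    candidates = [config["default_provider"]] + config["fallback_order"]
    seen = set()
    for name in candidates:
        if name in seen or name not in SearchProviderFactory.get_available_providers():
            seen.add(name)
            continue  # Skip duplicates and providers the factory does not know (e.g. duckduckgo)
        seen.add(name)
        provider = SearchProviderFactory.get_provider(name)
        if not provider.is_configured():
            continue  # API key missing for this provider; move on to the next one
        settings = config["provider_settings"].get(name, {})
        result = provider.search(query, **{**settings, **kwargs})
        if "error" not in result:
            return result
    return {"error": "All configured search providers failed"}
```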

View file

@ -0,0 +1,160 @@
from typing import Dict, Any, Optional
import os
import sys
from pathlib import Path
# Add parent directory to path for imports when running as script
if __name__ == "__main__":
sys.path.append(str(Path(__file__).parent.parent))
from search_providers.base_provider import BaseSearchProvider
else:
from .base_provider import BaseSearchProvider
from tavily import TavilyClient, MissingAPIKeyError, InvalidAPIKeyError, UsageLimitExceededError
class TavilySearchProvider(BaseSearchProvider):
"""
Tavily implementation of the search provider interface.
Handles both general and news-specific searches.
"""
def __init__(self, api_key: Optional[str] = None):
"""
Initialize the Tavily search provider.
Args:
api_key: Optional Tavily API key. If not provided, will try to get from environment.
"""
self.api_key = api_key or os.getenv("TAVILY_API_KEY")
try:
self.client = TavilyClient(api_key=self.api_key) if self.api_key else None
except MissingAPIKeyError:
self.client = None
def is_configured(self) -> bool:
"""Check if Tavily client is properly configured."""
return self.client is not None
def search(self, query: str, **kwargs) -> Dict[str, Any]:
"""
Perform a search using Tavily API.
Args:
query: The search query string
**kwargs: Additional search parameters:
- search_depth: "basic" or "advanced" (default: "basic")
- topic: Optional search topic (e.g., "news")
- max_results: Maximum number of results (default: 5)
- include_answer: Whether to include AI-generated answer (default: True)
- include_images: Whether to include images (default: False)
- days: Number of days to look back (for news searches)
Returns:
Dict containing search results or error information
"""
if not self.is_configured():
return {'error': 'Tavily API key not configured'}
try:
# Set default search parameters
search_params = {
'search_depth': "basic",
'max_results': 5,
'include_answer': True,
'include_images': False
}
# Update with any provided parameters
search_params.update(kwargs)
# Execute search
response = self.client.search(query, **search_params)
# Process results based on whether it's a news search
if kwargs.get('topic') == 'news':
return self._process_news_results(
response,
days=kwargs.get('days', 3),
topic=query
)
else:
return self._process_general_results(response)
except InvalidAPIKeyError:
return {'error': 'Invalid Tavily API key'}
except UsageLimitExceededError:
return {'error': 'Tavily API usage limit exceeded'}
except Exception as e:
return {'error': f'An unexpected error occurred: {e}'}
def _process_general_results(self, response: Dict[str, Any]) -> Dict[str, Any]:
"""Process results for general searches."""
return {
'answer': response.get('answer', ''),
'results': [{
'title': result.get('title', ''),
'url': result.get('url', ''),
'content': result.get('content', '')[:500] + '...' if result.get('content') else '',
'score': result.get('score', 0.0)
} for result in response.get('results', [])]
}
def _process_news_results(self, response: Dict[str, Any], days: int, topic: str) -> Dict[str, Any]:
"""Process results for news-specific searches."""
return {
'answer': response.get('answer', ''),
'articles': [{
'title': article.get('title', ''),
'url': article.get('url', ''),
'published_date': article.get('published_date', ''),
'content': article.get('content', '')[:500] + '...' if article.get('content') else '',
'score': article.get('score', 0.0)
} for article in response.get('results', [])],
'time_period': f"Past {days} days",
'topic': topic
}
if __name__ == "__main__":
# Test code for the Tavily provider
provider = TavilySearchProvider()
if not provider.is_configured():
print("Error: Tavily API key not configured")
exit(1)
# Test general search
print("\n=== Testing General Search ===")
general_result = provider.search(
"What is artificial intelligence?",
search_depth="advanced",
max_results=3
)
print("\nQuery Answer:", general_result['answer'])
print("\nTop Results:")
for idx, result in enumerate(general_result['results'], 1):
print(f"\n{idx}. {result['title']}")
print(f" URL: {result['url']}")
print(f" Score: {result['score']}")
print(f" Preview: {result['content'][:200]}...")
# Test news search
print("\n\n=== Testing News Search ===")
news_result = provider.search(
"Latest developments in AI",
topic="news",
days=3,
search_depth="advanced"
)
print("\nNews Summary:", news_result['answer'])
print("\nRecent Articles:")
for idx, article in enumerate(news_result['articles'], 1):
print(f"\n{idx}. {article['title']}")
print(f" Published: {article['published_date']}")
print(f" URL: {article['url']}")
print(f" Score: {article['score']}")
print(f" Preview: {article['content'][:400]}...")
# Test error handling
print("\n\n=== Testing Error Handling ===")
bad_provider = TavilySearchProvider(api_key="invalid_key")
error_result = bad_provider.search("test query")
print("\nExpected error with invalid API key:", error_result['error'])

View file

@ -0,0 +1,71 @@
{
"trusted_sources": [
"apnews.com",
"reuters.com",
"bbc.com",
"wsj.com",
"nytimes.com",
"economist.com",
"bloomberg.com",
"ft.com",
"aljazeera.com",
"afp.com",
"techcrunch.com",
"wired.com",
"arstechnica.com",
"theverge.com",
"cnet.com",
"theguardian.com",
"businessinsider.com",
"dw.com",
"time.com",
"afp.com",
"pbs.org",
"npr.org",
"cnbc.com",
"forbes.com",
"thehill.com",
"politico.com",
"axios.com",
"euronews.com",
"japantimes.co.jp",
"scmp.com",
"straitstimes.com",
"themoscowtimes.com",
"haaretz.com",
"timesofindia.com",
"globeandmail.com",
"abc.net.au",
"rte.ie",
"swissinfo.ch",
"thelocal.fr",
"thelocal.de",
"thelocal.se",
"kyivpost.com",
"arabnews.com",
"koreatimes.co.kr",
"bangkokpost.com",
"zdnet.com",
"cnet.com",
"engadget.com",
"gizmodo.com",
"thenextweb.com",
"venturebeat.com",
"techradar.com",
"tomshardware.com",
"anandtech.com",
"slashdot.org",
"techspot.com",
"phoronix.com",
"404media.co",
"theregister.com",
"techdirt.com",
"techrepublic.com",
"mit.edu",
"protocol.com",
"theinformation.com",
"restofworld.org",
"news.ycombinator.com"
]
}

system_config.py Normal file
View file

@ -0,0 +1,147 @@
"""
System-wide configuration settings for Web Scraper, Logging, and Research components
"""
import logging
import logging.handlers
# Web Scraper Configuration
SCRAPER_CONFIG = {
"user_agent": "WebLLMAssistant/1.0 (+https://github.com/YourUsername/Web-LLM-Assistant-Llama-cpp)",
"rate_limit": 1, # Seconds between requests to same domain
"timeout": 10, # Request timeout in seconds
"max_retries": 3, # Number of retry attempts for failed requests
"max_workers": 5, # Maximum number of concurrent scraping threads
"content_limits": {
"max_content_length": 2400, # Maximum characters to extract from content
"max_links": 10 # Maximum number of links to extract
},
"respect_robots_txt": False # Whether to respect robots.txt
}
# Search Provider Configuration
SEARCH_CONFIG = {
"default_provider": "duckduckgo", # Default search provider to use
"fallback_order": [ # Order of providers to try if default fails
"exa",
"bing",
"brave",
"tavily",
"duckduckgo" # Keep DuckDuckGo as final fallback
],
"provider_settings": {
"tavily": {
"search_depth": "basic",
"max_results": 5,
"include_answer": True,
"include_images": False
},
"brave": {
"max_results": 10
},
"bing": {
"max_results": 10,
"freshness": "Month" # Time range for results
},
"exa": {
"max_results": 10,
"use_highlights": True
},
"duckduckgo": {
"max_results": 10,
"region": "wt-wt", # Worldwide results
"safesearch": "off"
}
},
"rate_limiting": {
"requests_per_minute": 10,
"cooldown_period": 60 # Seconds to wait after hitting rate limit
}
}
# System-wide Logging Configuration
LOGGING_CONFIG = {
"level": logging.INFO,
"format": "%(asctime)s - %(levelname)s - %(message)s",
"handlers": {
"console": {
"enabled": True,
"level": logging.INFO
},
"file": {
"enabled": True,
"level": logging.DEBUG,
"filename": "web_llm.log",
"max_bytes": 1024 * 1024, # 1MB
"backup_count": 3
}
}
}
# Research Configuration
RESEARCH_CONFIG = {
"search": {
"max_searches_per_cycle": 5,
"max_results_per_search": 10,
"min_relevance_score": 0.6
},
"content": {
"max_document_size": 12000, # Maximum size of research document in characters
"max_chunk_size": 2000, # Maximum size of content chunks for processing
"min_chunk_size": 100 # Minimum size of content chunks to process
},
"storage": {
"auto_save": True,
"auto_save_interval": 150, # Auto-save interval in seconds
"backup_enabled": True,
"max_backups": 2
},
"rate_limiting": {
"requests_per_minute": 60,
"concurrent_requests": 5,
"cooldown_period": 60 # Seconds to wait after hitting rate limit
}
}
def setup_logging():
"""Configure logging based on LOGGING_CONFIG settings"""
logging.basicConfig(
level=LOGGING_CONFIG["level"],
format=LOGGING_CONFIG["format"]
)
logger = logging.getLogger()
# Clear existing handlers
logger.handlers.clear()
# Console handler
if LOGGING_CONFIG["handlers"]["console"]["enabled"]:
console_handler = logging.StreamHandler()
console_handler.setLevel(LOGGING_CONFIG["handlers"]["console"]["level"])
console_handler.setFormatter(logging.Formatter(LOGGING_CONFIG["format"]))
logger.addHandler(console_handler)
# File handler
if LOGGING_CONFIG["handlers"]["file"]["enabled"]:
file_handler = logging.handlers.RotatingFileHandler(
LOGGING_CONFIG["handlers"]["file"]["filename"],
maxBytes=LOGGING_CONFIG["handlers"]["file"]["max_bytes"],
backupCount=LOGGING_CONFIG["handlers"]["file"]["backup_count"]
)
file_handler.setLevel(LOGGING_CONFIG["handlers"]["file"]["level"])
file_handler.setFormatter(logging.Formatter(LOGGING_CONFIG["format"]))
logger.addHandler(file_handler)
return logger
def get_scraper_config():
"""Get the web scraper configuration"""
return SCRAPER_CONFIG
def get_research_config():
"""Get the research configuration"""
return RESEARCH_CONFIG
def get_search_config():
"""Get the search provider configuration"""
return SEARCH_CONFIG
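
Neither `SEARCH_CONFIG["rate_limiting"]` nor `RESEARCH_CONFIG["rate_limiting"]` is consumed by code in this commit. A minimal sketch of how a caller might honour those fields is shown below; the `SimpleRateLimiter` class is illustrative only and not part of the codebase.

```python
import time

class SimpleRateLimiter:
    """Illustrative sketch (not part of this commit) of how the rate_limiting
    settings could be enforced by whatever code issues search requests."""

    def __init__(self, requests_per_minute: int, cooldown_period: int):
        self.min_interval = 60.0 / max(requests_per_minute, 1)  # Seconds between requests
        self.cooldown_period = cooldown_period                   # Back-off after a rate-limit error
        self._last_request = 0.0

    def wait(self) -> None:
        """Block just long enough to stay within requests_per_minute."""
        elapsed = time.time() - self._last_request
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self._last_request = time.time()

    def backoff(self) -> None:
        """Sleep for the configured cooldown after a provider reports a rate limit."""
        time.sleep(self.cooldown_period)

# Usage sketch:
# limiter = SimpleRateLimiter(**get_search_config()["rate_limiting"])
# limiter.wait(); result = provider.search(query)
```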