Add files via upload

This commit is contained in:
James 2024-11-20 17:56:34 +10:00 committed by GitHub
parent 7a221ee448
commit 84b587a03b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 3092 additions and 1 deletions

View file

@ -1,6 +1,6 @@
MIT License
-Copyright (c) 2024 James
+Copyright (c) 2024 James Warburton
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

132
README.md Normal file
View file

@ -0,0 +1,132 @@
# Automated-AI-Web-Researcher-Ollama
## Description
Automated-AI-Web-Researcher is an innovative research assistant that leverages locally run large language models through Ollama to conduct thorough, automated online research on any given topic or question. Unlike traditional LLM interactions, this tool actually performs structured research: it breaks your query down into focused research areas, systematically investigates each one by searching the web and scraping relevant websites, and automatically saves its findings to a text document containing all the content found along with a link to the source of each item. Whenever you want the research to stop, you can enter a command; the research then terminates, the LLM reviews all the content it found, and it provides a comprehensive final summary of your original topic or question. Afterwards, you can also ask the LLM questions about its research findings if you would like.
Here's how it works:
1. You provide a research query (e.g., "What year will global population begin to decrease rather than increase according to research?")
2. The LLM analyzes your query and generates 5 specific research focus areas, each with assigned priorities based on relevance to the topic or question.
3. Starting with the highest priority area, the LLM:
- Formulates targeted search queries
- Performs web searches
- Analyzes search results, selecting the most relevant web pages
- Scrapes and extracts relevant information from the selected web pages
- Documents all content found during the research session in a research file, including links to the websites the content was retrieved from
4. After investigating all focus areas, the LLM generates new focus areas based on the information found so far and repeats its research cycle. Findings from earlier rounds often suggest new, relevant focus areas, leading in some cases to interesting and novel research directions.
5. You can let it research for as long as you like; at any time you can enter a quit command, which stops the research and prompts the LLM to review all of the content collected so far in full and generate a comprehensive summary in response to your original query or topic.
6. The LLM then enters a conversation mode in which you can ask specific questions about the research findings if desired.
The key distinction is that this isn't just a chatbot: it's an automated research assistant that methodically investigates topics and maintains a documented research trail, all from a single question or topic of your choosing. Depending on your system and model, it can perform over a hundred searches and content retrievals in a relatively short amount of time. You can leave it running, come back to a full text document with over a hundred pieces of content from relevant websites, have it summarise the findings, and then ask it questions about what it found.
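In pseudocode, one full research cycle looks roughly like this (an illustrative sketch only; the function and variable names below are hypothetical placeholders, not the program's actual API):

```python
# Illustrative sketch of the research cycle described above; all names here
# are hypothetical, not the real implementation.
focus_areas = llm.generate_focus_areas(user_query)             # step 2: 5 prioritized areas
while not user_requested_quit():
    for focus in sorted(focus_areas, key=lambda f: f.priority, reverse=True):
        search_query = llm.formulate_query(focus)              # step 3: targeted query
        results = web_search(search_query)
        pages = llm.select_relevant_pages(results, user_query)
        content = scrape(pages)
        research_file.append(content, sources=pages)           # documented research trail
    focus_areas = llm.generate_new_focus_areas(research_file)  # step 4: new areas from findings
summary = llm.summarize(research_file)                         # step 5: final summary
```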
## Features
- Automated research planning with prioritized focus areas
- Systematic web searching and content analysis
- All research content and source URLs saved into a detailed text document
- Research summary generation
- Post-research Q&A capability about findings
- Self-improving search mechanism
- Rich console output with status indicators
- Comprehensive answer synthesis using web-sourced information
- Research conversation mode for exploring findings
## Installation
1. Clone the repository:
git clone https://github.com/YourUsername/Automated-AI-Web-Researcher-Ollama
cd Automated-AI-Web-Researcher-Ollama
2. Create and activate a virtual environment:
python -m venv venv
source venv/bin/activate # On Windows, use venv\Scripts\activate
3. Install dependencies:
pip install -r requirements.txt
4. Install and Configure Ollama:
- Install Ollama following instructions at https://ollama.ai
- Using your selected model file, create a custom model variant with the required context length
(phi3:3.8b-mini-128k-instruct or phi3:14b-medium-128k-instruct are recommended)
Create a file named `modelfile` with these exact contents:
FROM your-model-name
PARAMETER num_ctx 38000
Replace "your-model-name" with your chosen model (e.g., phi3:3.8b-mini-128k-instruct).
Then create the model:
ollama create research-phi3 -f modelfile
Note: This step is necessary because recent Ollama versions have reduced the context window on models like phi3:3.8b-mini-128k-instruct, despite the name suggesting a high context length. Since the research process pushes a large amount of information through the model, the modelfile above is needed to set a sufficiently large context.
## Usage
1. Start Ollama:
ollama serve
2. Run the researcher:
python Web-LLM.py
3. Start a research session:
- Type @ followed by your research query
- Press CTRL+D to submit
- Example: "@What year is global population projected to start declining?"
4. During research you can use the following commands by typing the letter associated with each and submitting with CTRL+D:
- Use 's' to show status.
- Use 'f' to show current focus.
- Use 'p' to pause and assess research progress. The LLM reviews the entire research content collected so far and reports whether it can answer your query with it. It then waits for one of two commands: 'c' to continue the research, or 'q' to terminate it, which produces a summary just as if you had quit without pausing.
- Use 'q' to quit research.
5. After research completes:
- Wait for the summary to be generated, and review the LLM's findings.
- Enter conversation mode to ask specific questions about the findings.
- Access the detailed research content, available in a research session text file that appears in the program's directory, which includes:
* All retrieved content
* Source URLs for all information
* Focus areas investigated
* Generated summary
## Configuration
The LLM settings can be modified in `llm_config.py`. You must specify your model name in the configuration for the researcher to function. The default configuration is optimized for research tasks with the specified Phi-3 model.
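For example, to point the researcher at the custom model created during installation, set the Ollama model name in `llm_config.py`. The excerpt below is based on the `llm_config.py` included in this commit, with `model_name` swapped to the `research-phi3` example from step 4:

```python
# llm_config.py (excerpt)
LLM_TYPE = "ollama"  # Options: 'llama_cpp', 'ollama'

LLM_CONFIG_OLLAMA = {
    "llm_type": "ollama",
    "base_url": "http://localhost:11434",  # default Ollama server URL
    "model_name": "research-phi3",         # the model created with `ollama create research-phi3 -f modelfile`
    "temperature": 0.7,
    "top_p": 0.9,
    "n_ctx": 55000,
    "stop": ["User:", "\n\n"]
}
```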
## Current Status
This is a prototype that demonstrates functional automated research capabilities. While still in development, it successfully performs structured research tasks. Currently tested and working well with the phi3:3.8b-mini-128k-instruct model when the context is set as advised previously.
## Dependencies
- Ollama
- Python packages listed in requirements.txt
- Recommended model: phi3:3.8b-mini-128k-instruct or phi3:14b-medium-128k-instruct (with custom context length as specified)
## Contributing
Contributions are welcome! This is a prototype with room for improvements and new features.
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## Acknowledgments
- Ollama team for their local LLM runtime
- DuckDuckGo for their search API
## Personal Note
This tool represents an attempt to bridge the gap between simple LLM interactions and genuine research capabilities. By structuring the research process and maintaining documentation, it aims to provide more thorough and verifiable results than traditional LLM conversations. It also represents an attempt to improve on my previous project, 'Web-LLM-Assistant-Llamacpp-Ollama', which simply gave LLMs the ability to search and scrape websites to answer questions. Unlike its predecessor, this program takes that capability and uses it in a novel and genuinely useful way; it is the most advanced and useful way I could conceive of building on my previous work. As a very new programmer, this being only my second ever program, I feel very good about the result, and I hope it hits the mark!
Given how much I have been using it myself, this one feels genuinely useful and unique, unlike the previous program, which felt more like a novelty than an actual tool. But I am quite biased!
Please enjoy, and feel free to submit any suggestions for improvements so that we can make this automated AI researcher even more capable!
## Disclaimer
This project is for educational purposes only. Ensure you comply with the terms of service of all APIs and services used.

434
Self_Improving_Search.py Normal file
View file

@ -0,0 +1,434 @@
import time
import re
import os
from typing import List, Dict, Tuple, Union
from colorama import Fore, Style
import logging
import sys
from io import StringIO
from web_scraper import get_web_content, can_fetch
from llm_config import get_llm_config
from llm_response_parser import UltimateLLMResponseParser
from llm_wrapper import LLMWrapper
from urllib.parse import urlparse
# Set up logging
log_directory = 'logs'
if not os.path.exists(log_directory):
os.makedirs(log_directory)
# Configure logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
log_file = os.path.join(log_directory, 'llama_output.log')
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.handlers = []
logger.addHandler(file_handler)
logger.propagate = False
# Suppress other loggers
for name in ['root', 'duckduckgo_search', 'requests', 'urllib3']:
logging.getLogger(name).setLevel(logging.WARNING)
logging.getLogger(name).handlers = []
logging.getLogger(name).propagate = False
class OutputRedirector:
def __init__(self, stream=None):
self.stream = stream or StringIO()
self.original_stdout = sys.stdout
self.original_stderr = sys.stderr
def __enter__(self):
sys.stdout = self.stream
sys.stderr = self.stream
return self.stream
def __exit__(self, exc_type, exc_val, exc_tb):
sys.stdout = self.original_stdout
sys.stderr = self.original_stderr
class EnhancedSelfImprovingSearch:
def __init__(self, llm: LLMWrapper, parser: UltimateLLMResponseParser, max_attempts: int = 5):
self.llm = llm
self.parser = parser
self.max_attempts = max_attempts
self.llm_config = get_llm_config()
@staticmethod
def initialize_llm():
llm_wrapper = LLMWrapper()
return llm_wrapper
def print_thinking(self):
print(Fore.MAGENTA + "🧠 Thinking..." + Style.RESET_ALL)
def print_searching(self):
print(Fore.MAGENTA + "📝 Searching..." + Style.RESET_ALL)
def search_and_improve(self, user_query: str) -> str:
attempt = 0
while attempt < self.max_attempts:
print(f"\n{Fore.CYAN}Search attempt {attempt + 1}:{Style.RESET_ALL}")
self.print_searching()
try:
formulated_query, time_range = self.formulate_query(user_query, attempt)
print(f"{Fore.YELLOW}Original query: {user_query}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Formulated query: {formulated_query}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Time range: {time_range}{Style.RESET_ALL}")
if not formulated_query:
print(f"{Fore.RED}Error: Empty search query. Retrying...{Style.RESET_ALL}")
attempt += 1
continue
search_results = self.perform_search(formulated_query, time_range)
if not search_results:
print(f"{Fore.RED}No results found. Retrying with a different query...{Style.RESET_ALL}")
attempt += 1
continue
self.display_search_results(search_results)
selected_urls = self.select_relevant_pages(search_results, user_query)
if not selected_urls:
print(f"{Fore.RED}No relevant URLs found. Retrying...{Style.RESET_ALL}")
attempt += 1
continue
print(Fore.MAGENTA + "⚙️ Scraping selected pages..." + Style.RESET_ALL)
# Scraping is done without OutputRedirector to ensure messages are visible
scraped_content = self.scrape_content(selected_urls)
if not scraped_content:
print(f"{Fore.RED}Failed to scrape content. Retrying...{Style.RESET_ALL}")
attempt += 1
continue
self.display_scraped_content(scraped_content)
self.print_thinking()
with OutputRedirector() as output:
evaluation, decision = self.evaluate_scraped_content(user_query, scraped_content)
llm_output = output.getvalue()
logger.info(f"LLM Output in evaluate_scraped_content:\n{llm_output}")
print(f"{Fore.MAGENTA}Evaluation: {evaluation}{Style.RESET_ALL}")
print(f"{Fore.MAGENTA}Decision: {decision}{Style.RESET_ALL}")
if decision == "answer":
return self.generate_final_answer(user_query, scraped_content)
elif decision == "refine":
print(f"{Fore.YELLOW}Refining search...{Style.RESET_ALL}")
attempt += 1
else:
print(f"{Fore.RED}Unexpected decision. Proceeding to answer.{Style.RESET_ALL}")
return self.generate_final_answer(user_query, scraped_content)
except Exception as e:
print(f"{Fore.RED}An error occurred during search attempt. Check the log file for details.{Style.RESET_ALL}")
logger.error(f"An error occurred during search: {str(e)}", exc_info=True)
attempt += 1
return self.synthesize_final_answer(user_query)
def evaluate_scraped_content(self, user_query: str, scraped_content: Dict[str, str]) -> Tuple[str, str]:
user_query_short = user_query[:200]
prompt = f"""
Evaluate if the following scraped content contains sufficient information to answer the user's question comprehensively:
User's question: "{user_query_short}"
Scraped Content:
{self.format_scraped_content(scraped_content)}
Your task:
1. Determine if the scraped content provides enough relevant and detailed information to answer the user's question thoroughly.
2. If the information is sufficient, decide to 'answer'. If more information or clarification is needed, decide to 'refine' the search.
Respond using EXACTLY this format:
Evaluation: [Your evaluation of the scraped content]
Decision: [ONLY 'answer' if content is sufficient, or 'refine' if more information is needed]
"""
max_retries = 3
for attempt in range(max_retries):
try:
response_text = self.llm.generate(prompt, max_tokens=200, stop=None)
evaluation, decision = self.parse_evaluation_response(response_text)
if decision in ['answer', 'refine']:
return evaluation, decision
except Exception as e:
logger.warning(f"Error in evaluate_scraped_content (attempt {attempt + 1}): {str(e)}")
logger.warning("Failed to get a valid decision in evaluate_scraped_content. Defaulting to 'refine'.")
return "Failed to evaluate content.", "refine"
def parse_evaluation_response(self, response: str) -> Tuple[str, str]:
evaluation = ""
decision = ""
for line in response.strip().split('\n'):
if line.startswith('Evaluation:'):
evaluation = line.split(':', 1)[1].strip()
elif line.startswith('Decision:'):
decision = line.split(':', 1)[1].strip().lower()
return evaluation, decision
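# Example response this parser expects, matching the prompt format above:
#   Evaluation: The scraped pages cover the question in sufficient detail.
#   Decision: answer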
def formulate_query(self, user_query: str, attempt: int) -> Tuple[str, str]:
user_query_short = user_query[:200]
prompt = f"""
Based on the following user question, formulate a concise and effective search query:
"{user_query_short}"
Your task:
1. Create a search query of 2-5 words that will yield relevant results.
2. Determine if a specific time range is needed for the search.
Time range options:
- 'd': Limit results to the past day. Use for very recent events or rapidly changing information.
- 'w': Limit results to the past week. Use for recent events or topics with frequent updates.
- 'm': Limit results to the past month. Use for relatively recent information or ongoing events.
- 'y': Limit results to the past year. Use for annual events or information that changes yearly.
- 'none': No time limit. Use for historical information or topics not tied to a specific time frame.
Respond in the following format:
Search query: [Your 2-5 word query]
Time range: [d/w/m/y/none]
Do not provide any additional information or explanation.
"""
max_retries = 3
for retry in range(max_retries):
with OutputRedirector() as output:
response_text = self.llm.generate(prompt, max_tokens=50, stop=None)
llm_output = output.getvalue()
logger.info(f"LLM Output in formulate_query:\n{llm_output}")
query, time_range = self.parse_query_response(response_text)
if query and time_range:
return query, time_range
return self.fallback_query(user_query), "none"
def parse_query_response(self, response: str) -> Tuple[str, str]:
query = ""
time_range = "none"
for line in response.strip().split('\n'):
if ":" in line:
key, value = line.split(":", 1)
key = key.strip().lower()
value = value.strip()
if "query" in key:
query = self.clean_query(value)
elif "time" in key or "range" in key:
time_range = self.validate_time_range(value)
return query, time_range
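# Example response parsed here, matching the format requested in formulate_query:
#   Search query: global population decline year
#   Time range: none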
def clean_query(self, query: str) -> str:
query = re.sub(r'["\'\[\]]', '', query)
query = re.sub(r'\s+', ' ', query)
return query.strip()[:100]
def validate_time_range(self, time_range: str) -> str:
valid_ranges = ['d', 'w', 'm', 'y', 'none']
time_range = time_range.lower()
return time_range if time_range in valid_ranges else 'none'
def fallback_query(self, user_query: str) -> str:
words = user_query.split()
return " ".join(words[:5])
def perform_search(self, query: str, time_range: str) -> List[Dict]:
if not query:
return []
# Record the query and time range so display_search_results can report them
# (they were otherwise never set, causing an AttributeError there)
self.last_query = query
self.last_time_range = time_range
from duckduckgo_search import DDGS
with DDGS() as ddgs:
try:
with OutputRedirector() as output:
if time_range and time_range != 'none':
results = list(ddgs.text(query, timelimit=time_range, max_results=10))
else:
results = list(ddgs.text(query, max_results=10))
ddg_output = output.getvalue()
logger.info(f"DDG Output in perform_search:\n{ddg_output}")
return [{'number': i+1, **result} for i, result in enumerate(results)]
except Exception as e:
print(f"{Fore.RED}Search error: {str(e)}{Style.RESET_ALL}")
return []
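# Each DDGS result is a dict with 'title', 'href', and 'body' keys; the
# 1-based 'number' key added above lets the LLM reference results by number.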
def display_search_results(self, results: List[Dict]) -> None:
"""Display search results with minimal output"""
try:
if not results:
return
# Only show search success status
print(f"\nSearch query sent to DuckDuckGo: {self.last_query}")
print(f"Time range sent to DuckDuckGo: {self.last_time_range}")
print(f"Number of results: {len(results)}")
except Exception as e:
logger.error(f"Error displaying search results: {str(e)}")
def select_relevant_pages(self, search_results: List[Dict], user_query: str) -> List[str]:
prompt = f"""
Given the following search results for the user's question: "{user_query}"
Select the 2 most relevant results to scrape and analyze. Explain your reasoning for each selection.
Search Results:
{self.format_results(search_results)}
Instructions:
1. You MUST select exactly 2 result numbers from the search results.
2. Choose the results that are most likely to contain comprehensive and relevant information to answer the user's question.
3. Provide a brief reason for each selection.
You MUST respond using EXACTLY this format and nothing else:
Selected Results: [Two numbers corresponding to the selected results]
Reasoning: [Your reasoning for the selections]
"""
max_retries = 3
for retry in range(max_retries):
with OutputRedirector() as output:
response_text = self.llm.generate(prompt, max_tokens=200, stop=None)
llm_output = output.getvalue()
logger.info(f"LLM Output in select_relevant_pages:\n{llm_output}")
parsed_response = self.parse_page_selection_response(response_text)
if parsed_response and self.validate_page_selection_response(parsed_response, len(search_results)):
selected_urls = [result['href'] for result in search_results if result['number'] in parsed_response['selected_results']]
allowed_urls = [url for url in selected_urls if can_fetch(url)]
if allowed_urls:
return allowed_urls
else:
print(f"{Fore.YELLOW}Warning: All selected URLs are disallowed by robots.txt. Retrying selection.{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}Warning: Invalid page selection. Retrying.{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Warning: All attempts to select relevant pages failed. Falling back to top allowed results.{Style.RESET_ALL}")
allowed_urls = [result['href'] for result in search_results if can_fetch(result['href'])][:2]
return allowed_urls
def parse_page_selection_response(self, response: str) -> Dict[str, Union[List[int], str]]:
lines = response.strip().split('\n')
parsed = {}
for line in lines:
if line.startswith('Selected Results:'):
parsed['selected_results'] = [int(num.strip()) for num in re.findall(r'\d+', line)]
elif line.startswith('Reasoning:'):
parsed['reasoning'] = line.split(':', 1)[1].strip()
return parsed if 'selected_results' in parsed and 'reasoning' in parsed else None
def validate_page_selection_response(self, parsed_response: Dict[str, Union[List[int], str]], num_results: int) -> bool:
if len(parsed_response['selected_results']) != 2:
return False
if any(num < 1 or num > num_results for num in parsed_response['selected_results']):
return False
return True
def format_results(self, results: List[Dict]) -> str:
formatted_results = []
for result in results:
formatted_result = f"{result['number']}. Title: {result.get('title', 'N/A')}\n"
formatted_result += f" Snippet: {result.get('body', 'N/A')[:200]}...\n"
formatted_result += f" URL: {result.get('href', 'N/A')}\n"
formatted_results.append(formatted_result)
return "\n".join(formatted_results)
def scrape_content(self, urls: List[str]) -> Dict[str, str]:
scraped_content = {}
blocked_urls = []
for url in urls:
robots_allowed = can_fetch(url)
if robots_allowed:
content = get_web_content([url])
if content:
scraped_content.update(content)
print(Fore.YELLOW + f"Successfully scraped: {url}" + Style.RESET_ALL)
logger.info(f"Successfully scraped: {url}")
else:
print(Fore.RED + f"Failed to retrieve content from {url}" + Style.RESET_ALL)
logger.warning(f"Failed to retrieve content from {url}")
else:
blocked_urls.append(url)
print(Fore.RED + f"Warning: Robots.txt disallows scraping of {url}" + Style.RESET_ALL)
logger.warning(f"Robots.txt disallows scraping of {url}")
print(Fore.CYAN + f"Scraped content received for {len(scraped_content)} URLs" + Style.RESET_ALL)
logger.info(f"Scraped content received for {len(scraped_content)} URLs")
if blocked_urls:
print(Fore.RED + f"Warning: {len(blocked_urls)} URL(s) were not scraped due to robots.txt restrictions." + Style.RESET_ALL)
logger.warning(f"{len(blocked_urls)} URL(s) were not scraped due to robots.txt restrictions: {', '.join(blocked_urls)}")
return scraped_content
def display_scraped_content(self, scraped_content: Dict[str, str]):
print(f"\n{Fore.CYAN}Scraped Content:{Style.RESET_ALL}")
for url, content in scraped_content.items():
print(f"{Fore.GREEN}URL: {url}{Style.RESET_ALL}")
print(f"Content: {content[:4000]}...\n")
def generate_final_answer(self, user_query: str, scraped_content: Dict[str, str]) -> str:
user_query_short = user_query[:200]
prompt = f"""
You are an AI assistant. Provide a comprehensive and detailed answer to the following question using ONLY the information provided in the scraped content. Do not include any references or mention any sources. Answer directly and thoroughly.
Question: "{user_query_short}"
Scraped Content:
{self.format_scraped_content(scraped_content)}
Important Instructions:
1. Do not use phrases like "Based on the absence of selected results" or similar.
2. If the scraped content does not contain enough information to answer the question, say so explicitly and explain what information is missing.
3. Provide as much relevant detail as possible from the scraped content.
Answer:
"""
max_retries = 3
for attempt in range(max_retries):
with OutputRedirector() as output:
response_text = self.llm.generate(prompt, max_tokens=1024, stop=None)
llm_output = output.getvalue()
logger.info(f"LLM Output in generate_final_answer:\n{llm_output}")
if response_text:
logger.info(f"LLM Response:\n{response_text}")
return response_text
error_message = "I apologize, but I couldn't generate a satisfactory answer based on the available information."
logger.warning(f"Failed to generate a response after {max_retries} attempts. Returning error message.")
return error_message
def format_scraped_content(self, scraped_content: Dict[str, str]) -> str:
formatted_content = []
for url, content in scraped_content.items():
content = re.sub(r'\s+', ' ', content)
formatted_content.append(f"Content from {url}:\n{content}\n")
return "\n".join(formatted_content)
def synthesize_final_answer(self, user_query: str) -> str:
prompt = f"""
After multiple search attempts, we couldn't find a fully satisfactory answer to the user's question: "{user_query}"
Please provide the best possible answer you can, acknowledging any limitations or uncertainties.
If appropriate, suggest ways the user might refine their question or where they might find more information.
Respond in a clear, concise, and informative manner.
"""
try:
with OutputRedirector() as output:
response_text = self.llm.generate(prompt, max_tokens=self.llm_config.get('max_tokens', 1024), stop=self.llm_config.get('stop', None))
llm_output = output.getvalue()
logger.info(f"LLM Output in synthesize_final_answer:\n{llm_output}")
if response_text:
return response_text.strip()
except Exception as e:
logger.error(f"Error in synthesize_final_answer: {str(e)}", exc_info=True)
return "I apologize, but after multiple attempts, I wasn't able to find a satisfactory answer to your question. Please try rephrasing your question or breaking it down into smaller, more specific queries."
# End of EnhancedSelfImprovingSearch class
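# Minimal usage sketch (assumes Ollama is running and llm_config.py is configured):
#   from llm_wrapper import LLMWrapper
#   from llm_response_parser import UltimateLLMResponseParser
#   searcher = EnhancedSelfImprovingSearch(LLMWrapper(), UltimateLLMResponseParser())
#   answer = searcher.search_and_improve("What year is global population projected to peak?")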

305
Web-LLM.py Normal file
View file

@ -0,0 +1,305 @@
import sys
import os
from colorama import init, Fore, Style
import logging
import time
from io import StringIO
from Self_Improving_Search import EnhancedSelfImprovingSearch
from llm_config import get_llm_config
from llm_response_parser import UltimateLLMResponseParser
from llm_wrapper import LLMWrapper
from strategic_analysis_parser import StrategicAnalysisParser
from research_manager import ResearchManager
# Initialize colorama
if os.name == 'nt': # Windows-specific initialization
init(convert=True, strip=False, wrap=True)
else:
init()
# Set up logging
log_directory = 'logs'
if not os.path.exists(log_directory):
os.makedirs(log_directory)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
log_file = os.path.join(log_directory, 'web_llm.log')
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.handlers = []
logger.addHandler(file_handler)
logger.propagate = False
# Disable other loggers
for name in logging.root.manager.loggerDict:
if name != __name__:
logging.getLogger(name).disabled = True
class OutputRedirector:
def __init__(self, stream=None):
self.stream = stream or StringIO()
self.original_stdout = sys.stdout
self.original_stderr = sys.stderr
def __enter__(self):
sys.stdout = self.stream
sys.stderr = self.stream
return self.stream
def __exit__(self, exc_type, exc_val, exc_tb):
sys.stdout = self.original_stdout
sys.stderr = self.original_stderr
def print_header():
print(Fore.CYAN + Style.BRIGHT + """
🌐 Advanced Research Assistant 🤖
""" + Style.RESET_ALL)
print(Fore.YELLOW + """
Welcome to the Advanced Research Assistant!
Commands:
- For web search: start message with '/'
Example: "/latest news on AI advancements"
- For research mode: start message with '@'
Example: "@analyze the impact of AI on healthcare"
Press CTRL+D (Linux/Mac) or CTRL+Z (Windows) to submit input.
""" + Style.RESET_ALL)
def get_multiline_input() -> str:
"""Get multiline input using raw terminal mode for reliable CTRL+D handling"""
print(f"{Fore.GREEN}📝 Enter your message (Press CTRL+D to submit):{Style.RESET_ALL}")
if os.name == 'nt':
# termios/tty below are POSIX-only; on Windows, read until EOF (CTRL+Z then Enter)
return ' '.join(sys.stdin.read().splitlines()).strip()
lines = []
import termios
import tty
# Save original terminal settings
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
# Set terminal to raw mode
tty.setraw(fd)
current_line = []
while True:
# Read one character at a time
char = sys.stdin.read(1)
# CTRL+D detection
if not char or ord(char) == 4: # EOF or CTRL+D
sys.stdout.write('\n') # New line for clean display
if current_line:
lines.append(''.join(current_line))
return ' '.join(lines).strip()
# Handle special characters
elif ord(char) == 13: # Enter
sys.stdout.write('\n')
lines.append(''.join(current_line))
current_line = []
elif ord(char) == 127: # Backspace
if current_line:
current_line.pop()
sys.stdout.write('\b \b') # Erase character
elif ord(char) == 3: # CTRL+C
sys.stdout.write('\n')
return 'q'
# Normal character
elif 32 <= ord(char) <= 126: # Printable characters
current_line.append(char)
sys.stdout.write(char)
# Flush output
sys.stdout.flush()
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
print() # New line for clean display
def initialize_system():
"""Initialize system with proper error checking"""
try:
print(Fore.YELLOW + "Initializing system..." + Style.RESET_ALL)
llm_config = get_llm_config()
if llm_config['llm_type'] == 'ollama':
import requests
try:
response = requests.get(llm_config['base_url'], timeout=5)
if response.status_code != 200:
raise ConnectionError("Cannot connect to Ollama server")
except requests.exceptions.RequestException:
raise ConnectionError(
"\nCannot connect to Ollama server!"
"\nPlease ensure:"
"\n1. Ollama is installed"
"\n2. Ollama server is running (try 'ollama serve')"
"\n3. The model specified in llm_config.py is pulled"
)
elif llm_config['llm_type'] == 'llama_cpp':
model_path = llm_config.get('model_path')
if not model_path or not os.path.exists(model_path):
raise FileNotFoundError(
f"\nLLama.cpp model not found at: {model_path}"
"\nPlease ensure model path in llm_config.py is correct"
)
with OutputRedirector() as output:
llm_wrapper = LLMWrapper()
try:
test_response = llm_wrapper.generate("Test", max_tokens=10)
if not test_response:
raise ConnectionError("LLM failed to generate response")
except Exception as e:
raise ConnectionError(f"LLM test failed: {str(e)}")
parser = UltimateLLMResponseParser()
search_engine = EnhancedSelfImprovingSearch(llm_wrapper, parser)
research_manager = ResearchManager(llm_wrapper, parser, search_engine)
print(Fore.GREEN + "System initialized successfully." + Style.RESET_ALL)
return llm_wrapper, parser, search_engine, research_manager
except Exception as e:
logger.error(f"Error initializing system: {str(e)}", exc_info=True)
print(Fore.RED + f"System initialization failed: {str(e)}" + Style.RESET_ALL)
return None, None, None, None
def handle_research_mode(research_manager, query):
"""Handles research mode operations"""
print(f"{Fore.CYAN}Initiating research mode...{Style.RESET_ALL}")
try:
# Start the research
research_manager.start_research(query)
submit_key = "CTRL+Z" if os.name == 'nt' else "CTRL+D"
print(f"\n{Fore.YELLOW}Research Running. Available Commands:{Style.RESET_ALL}")
print(f"Type command and press {submit_key}:")
print("'s' = Show status")
print("'f' = Show focus")
print("'q' = Quit research")
while research_manager.is_active():
try:
command = get_multiline_input().strip().lower()
if command == 's':
print("\n" + research_manager.get_progress())
elif command == 'f':
if research_manager.current_focus:
print(f"\n{Fore.CYAN}Current Focus:{Style.RESET_ALL}")
print(f"Area: {research_manager.current_focus.area}")
print(f"Priority: {research_manager.current_focus.priority}")
print(f"Reasoning: {research_manager.current_focus.reasoning}")
else:
print(f"\n{Fore.YELLOW}No current focus area{Style.RESET_ALL}")
elif command == 'q':
break
except KeyboardInterrupt:
break
# Get final summary first
summary = research_manager.terminate_research()
# Ensure research UI is fully cleaned up
research_manager._cleanup_research_ui()
# Now in main terminal, show summary
print(f"\n{Fore.GREEN}Research Summary:{Style.RESET_ALL}")
print(summary)
# Only NOW start conversation mode if we have a valid summary
if research_manager.research_complete and research_manager.research_summary:
time.sleep(0.5) # Small delay to ensure clean transition
research_manager.start_conversation_mode()
return
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Research interrupted.{Style.RESET_ALL}")
research_manager.terminate_research()
except Exception as e:
print(f"\n{Fore.RED}Research error: {str(e)}{Style.RESET_ALL}")
research_manager.terminate_research()
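# main() dispatches '/'-prefixed input to handle_search_mode, but no definition
# was included in this upload. The sketch below is a minimal reconstruction that
# assumes a thin wrapper around EnhancedSelfImprovingSearch.search_and_improve;
# the original implementation may differ.
def handle_search_mode(search_engine, query):
"""Handles search mode operations (minimal reconstruction)"""
print(f"{Fore.CYAN}Initiating web search...{Style.RESET_ALL}")
try:
answer = search_engine.search_and_improve(query)
print(f"\n{Fore.GREEN}Answer:{Style.RESET_ALL}\n{answer}")
except Exception as e:
logger.error(f"Error in search mode: {str(e)}")
print(f"{Fore.RED}Search error: {str(e)}{Style.RESET_ALL}")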
def main():
print_header()
try:
llm, parser, search_engine, research_manager = initialize_system()
if not all([llm, parser, search_engine, research_manager]):
return
while True:
try:
# Get input with improved CTRL+D handling
user_input = get_multiline_input()
# Handle immediate CTRL+D (empty input)
if user_input == "":
user_input = "@quit" # Convert empty CTRL+D to quit command
user_input = user_input.strip()
# Check for special quit markers
if user_input in ["@quit", "quit", "q"]:
print(Fore.YELLOW + "\nGoodbye!" + Style.RESET_ALL)
break
if not user_input:
continue
if user_input.lower() == 'help':
print_header()
continue
if user_input.startswith('/'):
search_query = user_input[1:].strip()
handle_search_mode(search_engine, search_query)
elif user_input.startswith('@'):
research_query = user_input[1:].strip()
handle_research_mode(research_manager, research_query)
else:
print(f"{Fore.RED}Please start with '/' for search or '@' for research.{Style.RESET_ALL}")
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Exiting program...{Style.RESET_ALL}")
break
except Exception as e:
logger.error(f"Error in main loop: {str(e)}")
print(f"{Fore.RED}An error occurred: {str(e)}{Style.RESET_ALL}")
continue
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Program terminated by user.{Style.RESET_ALL}")
except Exception as e:
logger.critical(f"Critical error: {str(e)}")
print(f"{Fore.RED}Critical error: {str(e)}{Style.RESET_ALL}")
finally:
# Ensure proper cleanup on exit
try:
if 'research_manager' in locals() and research_manager:
if hasattr(research_manager, 'ui'):
research_manager.ui.cleanup()
import curses  # imported here so the cleanup works; curses is not imported at module level
curses.endwin()
except:
pass
os._exit(0)
if __name__ == "__main__":
main()

40
llm_config.py Normal file
View file

@ -0,0 +1,40 @@
# llm_config.py
LLM_TYPE = "ollama" # Options: 'llama_cpp', 'ollama'
# LLM settings for llama_cpp
MODEL_PATH = "/home/james/llama.cpp/models/gemma-2-9b-it-Q6_K.gguf" # Replace with your llama.cpp models filepath
LLM_CONFIG_LLAMA_CPP = {
"llm_type": "llama_cpp",
"model_path": MODEL_PATH,
"n_ctx": 20000, # context size
"n_gpu_layers": 0, # number of layers to offload to GPU (-1 for all, 0 for none)
"n_threads": 8, # number of threads to use
"temperature": 0.7, # temperature for sampling
"top_p": 0.9, # top p for sampling
"top_k": 40, # top k for sampling
"repeat_penalty": 1.1, # repeat penalty
"max_tokens": 1024, # max tokens to generate
"stop": ["User:", "\n\n"] # stop sequences
}
# LLM settings for Ollama
LLM_CONFIG_OLLAMA = {
"llm_type": "ollama",
"base_url": "http://localhost:11434", # default Ollama server URL
"model_name": "custom-phi3-32k-Q4_K_M", # Replace with your Ollama model name
"temperature": 0.7,
"top_p": 0.9,
"n_ctx": 55000,
"context_length": 55000,
"stop": ["User:", "\n\n"]
}
def get_llm_config():
if LLM_TYPE == "llama_cpp":
return LLM_CONFIG_LLAMA_CPP
elif LLM_TYPE == "ollama":
return LLM_CONFIG_OLLAMA
else:
raise ValueError(f"Invalid LLM_TYPE: {LLM_TYPE}")

240
llm_response_parser.py Normal file
View file

@ -0,0 +1,240 @@
import re
from typing import Dict, List, Union, Optional
import logging
import json
from strategic_analysis_parser import StrategicAnalysisParser, AnalysisResult, ResearchFocus
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class UltimateLLMResponseParser:
def __init__(self):
self.decision_keywords = {
'refine': ['refine', 'need more info', 'insufficient', 'unclear', 'more research', 'additional search'],
'answer': ['answer', 'sufficient', 'enough info', 'can respond', 'adequate', 'comprehensive']
}
# Case-insensitivity is applied via re.IGNORECASE at the call sites; inline
# '(?i)' flags would raise on Python 3.11+ when these patterns are embedded
# mid-expression in _parse_structured_response
self.section_identifiers = [
('decision', r'decision\s*:'),
('reasoning', r'reasoning\s*:'),
('selected_results', r'selected results\s*:'),
('response', r'response\s*:')
]
# Initialize strategic analysis parser
self.strategic_parser = StrategicAnalysisParser()
def parse_llm_response(self, response: str, mode: str = 'search') -> Dict[str, Union[str, List[int], AnalysisResult]]:
"""
Parse LLM response based on mode
Args:
response (str): The LLM's response text
mode (str): 'search' for web search, 'research' for strategic analysis
Returns:
Dict containing parsed response
"""
logger.info(f"Starting to parse LLM response in {mode} mode")
if mode == 'research':
return self._parse_research_response(response)
# Original search mode parsing
result = {
'decision': None,
'reasoning': None,
'selected_results': [],
'response': None
}
parsing_strategies = [
self._parse_structured_response,
self._parse_json_response,
self._parse_unstructured_response,
self._parse_implicit_response
]
for strategy in parsing_strategies:
try:
parsed_result = strategy(response)
if self._is_valid_result(parsed_result):
result.update(parsed_result)
logger.info(f"Successfully parsed using strategy: {strategy.__name__}")
break
except Exception as e:
logger.warning(f"Error in parsing strategy {strategy.__name__}: {str(e)}")
if not self._is_valid_result(result):
logger.warning("All parsing strategies failed. Using fallback parsing.")
result = self._fallback_parsing(response)
result = self._post_process_result(result)
logger.info("Finished parsing LLM response")
return result
def _parse_research_response(self, response: str) -> Dict[str, Union[str, AnalysisResult]]:
"""Handle research mode specific parsing"""
try:
analysis_result = self.strategic_parser.parse_analysis(response)
if analysis_result:
return {
'mode': 'research',
'analysis_result': analysis_result,
'error': None
}
else:
logger.error("Failed to parse strategic analysis")
return {
'mode': 'research',
'analysis_result': None,
'error': 'Failed to parse strategic analysis'
}
except Exception as e:
logger.error(f"Error in research response parsing: {str(e)}")
return {
'mode': 'research',
'analysis_result': None,
'error': str(e)
}
def parse_search_query(self, query_response: str) -> Dict[str, str]:
"""Parse search query formulation response"""
try:
lines = query_response.strip().split('\n')
result = {
'query': '',
'time_range': 'none'
}
for line in lines:
if ':' in line:
key, value = line.split(':', 1)
key = key.strip().lower()
value = value.strip()
if 'query' in key:
result['query'] = self._clean_query(value)
elif 'time' in key or 'range' in key:
result['time_range'] = self._validate_time_range(value)
return result
except Exception as e:
logger.error(f"Error parsing search query: {str(e)}")
return {'query': '', 'time_range': 'none'}
def _parse_structured_response(self, response: str) -> Dict[str, Union[str, List[int]]]:
result = {}
for key, pattern in self.section_identifiers:
match = re.search(f'{pattern}(.*?)(?={"|".join([p for k, p in self.section_identifiers if k != key])}|$)',
response, re.IGNORECASE | re.DOTALL)
if match:
result[key] = match.group(1).strip()
if 'selected_results' in result:
result['selected_results'] = self._extract_numbers(result['selected_results'])
return result
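# Example structured response handled by this strategy:
#   Decision: refine
#   Reasoning: The snippets do not contain the projection data needed.
#   Selected Results: 1, 3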
def _parse_json_response(self, response: str) -> Dict[str, Union[str, List[int]]]:
try:
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
json_str = json_match.group(0)
parsed_json = json.loads(json_str)
return {k: v for k, v in parsed_json.items()
if k in ['decision', 'reasoning', 'selected_results', 'response']}
except json.JSONDecodeError:
pass
return {}
def _parse_unstructured_response(self, response: str) -> Dict[str, Union[str, List[int]]]:
result = {}
lines = response.split('\n')
current_section = None
for line in lines:
section_match = re.match(r'(.+?)[:.-](.+)', line)
if section_match:
key = self._match_section_to_key(section_match.group(1))
if key:
current_section = key
result[key] = section_match.group(2).strip()
elif current_section:
result[current_section] += ' ' + line.strip()
if 'selected_results' in result:
result['selected_results'] = self._extract_numbers(result['selected_results'])
return result
def _parse_implicit_response(self, response: str) -> Dict[str, Union[str, List[int]]]:
result = {}
decision = self._infer_decision(response)
if decision:
result['decision'] = decision
numbers = self._extract_numbers(response)
if numbers:
result['selected_results'] = numbers
if not result:
result['response'] = response.strip()
return result
def _fallback_parsing(self, response: str) -> Dict[str, Union[str, List[int]]]:
return {
'decision': self._infer_decision(response),
'reasoning': None,
'selected_results': self._extract_numbers(response),
'response': response.strip()
}
def _post_process_result(self, result: Dict[str, Union[str, List[int]]]) -> Dict[str, Union[str, List[int]]]:
if result['decision'] not in ['refine', 'answer']:
result['decision'] = self._infer_decision(str(result))
if not isinstance(result['selected_results'], list):
result['selected_results'] = self._extract_numbers(str(result['selected_results']))
result['selected_results'] = result['selected_results'][:2]
if not result['reasoning']:
result['reasoning'] = f"Based on the {'presence' if result['selected_results'] else 'absence'} of selected results and the overall content."
if not result['response']:
result['response'] = result.get('reasoning', 'No clear response found.')
return result
def _match_section_to_key(self, section: str) -> Optional[str]:
for key, pattern in self.section_identifiers:
if re.search(pattern, section, re.IGNORECASE):
return key
return None
def _extract_numbers(self, text: str) -> List[int]:
return [int(num) for num in re.findall(r'\b(?:10|[1-9])\b', text)]
def _infer_decision(self, text: str) -> str:
text = text.lower()
refine_score = sum(text.count(keyword) for keyword in self.decision_keywords['refine'])
answer_score = sum(text.count(keyword) for keyword in self.decision_keywords['answer'])
return 'refine' if refine_score > answer_score else 'answer'
def _is_valid_result(self, result: Dict[str, Union[str, List[int]]]) -> bool:
return bool(result.get('decision') or result.get('response') or result.get('selected_results'))
def _clean_query(self, query: str) -> str:
"""Clean and validate search query"""
query = re.sub(r'["\'\[\]]', '', query)
query = re.sub(r'\s+', ' ', query)
return query.strip()[:100]
def _validate_time_range(self, time_range: str) -> str:
"""Validate time range value"""
valid_ranges = ['d', 'w', 'm', 'y', 'none']
time_range = time_range.lower()
return time_range if time_range in valid_ranges else 'none'

80
llm_wrapper.py Normal file
View file

@ -0,0 +1,80 @@
from llama_cpp import Llama
import requests
import json
from llm_config import get_llm_config
class LLMWrapper:
def __init__(self):
self.llm_config = get_llm_config()
self.llm_type = self.llm_config.get('llm_type', 'llama_cpp')
if self.llm_type == 'llama_cpp':
self.llm = self._initialize_llama_cpp()
elif self.llm_type == 'ollama':
self.base_url = self.llm_config.get('base_url', 'http://localhost:11434')
self.model_name = self.llm_config.get('model_name', 'your_model_name')
else:
raise ValueError(f"Unsupported LLM type: {self.llm_type}")
def _initialize_llama_cpp(self):
return Llama(
model_path=self.llm_config.get('model_path'),
n_ctx=self.llm_config.get('n_ctx', 55000),
n_gpu_layers=self.llm_config.get('n_gpu_layers', 0),
n_threads=self.llm_config.get('n_threads', 8),
verbose=False
)
def generate(self, prompt, **kwargs):
if self.llm_type == 'llama_cpp':
llama_kwargs = self._prepare_llama_kwargs(kwargs)
response = self.llm(prompt, **llama_kwargs)
return response['choices'][0]['text'].strip()
elif self.llm_type == 'ollama':
return self._ollama_generate(prompt, **kwargs)
else:
raise ValueError(f"Unsupported LLM type: {self.llm_type}")
def _ollama_generate(self, prompt, **kwargs):
url = f"{self.base_url}/api/generate"
data = {
'model': self.model_name,
'prompt': prompt,
'options': {
'temperature': kwargs.get('temperature', self.llm_config.get('temperature', 0.7)),
'top_p': kwargs.get('top_p', self.llm_config.get('top_p', 0.9)),
'stop': kwargs.get('stop', self.llm_config.get('stop', [])),
'num_predict': kwargs.get('max_tokens', self.llm_config.get('max_tokens', 55000)),
'num_ctx': self.llm_config.get('n_ctx', 55000)  # Ollama reads the context size from 'num_ctx'
}
}
response = requests.post(url, json=data, stream=True)
if response.status_code != 200:
raise Exception(f"Ollama API request failed with status {response.status_code}: {response.text}")
text = ''.join(json.loads(line)['response'] for line in response.iter_lines() if line)
return text.strip()
def _cleanup(self):
"""Force terminate any running LLM processes"""
if self.llm_type == 'ollama':
try:
# Best-effort attempt to stop the model. Note: '/api/terminate' is not a
# documented Ollama endpoint, so this request may simply fail and be ignored.
requests.post(f"{self.base_url}/api/terminate")
except:
pass
try:
# Also try to terminate via subprocess if needed
import subprocess
subprocess.run(['pkill', '-f', 'ollama'], capture_output=True)
except:
pass
def _prepare_llama_kwargs(self, kwargs):
llama_kwargs = {
'max_tokens': kwargs.get('max_tokens', self.llm_config.get('max_tokens', 55000)),
'temperature': kwargs.get('temperature', self.llm_config.get('temperature', 0.7)),
'top_p': kwargs.get('top_p', self.llm_config.get('top_p', 0.9)),
'stop': kwargs.get('stop', self.llm_config.get('stop', [])),
'echo': False,
}
return llama_kwargs
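# Minimal usage sketch (assumes llm_config.py is configured for your backend):
#   llm = LLMWrapper()
#   print(llm.generate("Briefly explain what Ollama is.", max_tokens=50))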

11
requirements.txt Normal file
View file

@ -0,0 +1,11 @@
llama-cpp-python
duckduckgo-search
colorama
requests
beautifulsoup4
trafilatura
readchar
keyboard
windows-curses; sys_platform == 'win32'
tqdm
urllib3

1481
research_manager.py Normal file

File diff suppressed because it is too large Load diff

219
strategic_analysis_parser.py Normal file
View file

@ -0,0 +1,219 @@
from typing import List, Dict, Optional, Union
import re
import logging
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ResearchFocus:
"""Represents a specific area of research focus"""
area: str
priority: int
source_query: str = ""
timestamp: str = ""
search_queries: List[str] = None
def __post_init__(self):
if not self.timestamp:
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if self.search_queries is None:
self.search_queries = []
@dataclass
class AnalysisResult:
"""Contains the complete analysis result"""
original_question: str
focus_areas: List[ResearchFocus]
raw_response: str
timestamp: str = ""
confidence_score: float = 0.0
def __post_init__(self):
if not self.timestamp:
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Set up logging
logger = logging.getLogger(__name__)
class StrategicAnalysisParser:
"""Enhanced parser with improved pattern matching and validation"""
def __init__(self):
self.patterns = {
'original_question': [
r"(?i)original question analysis:\s*(.*?)(?=research gap|$)",
r"(?i)original query:\s*(.*?)(?=research gap|$)",
r"(?i)research question:\s*(.*?)(?=research gap|$)",
r"(?i)topic analysis:\s*(.*?)(?=research gap|$)"
],
'research_gaps': [
r"(?i)research gaps?:\s*",
r"(?i)gaps identified:\s*",
r"(?i)areas for research:\s*",
r"(?i)investigation areas:\s*"
],
'priority': [
r"(?i)priority:\s*(\d+)",
r"(?i)priority level:\s*(\d+)",
r"(?i)\(priority:\s*(\d+)\)",
r"(?i)importance:\s*(\d+)"
]
}
self.logger = logging.getLogger(__name__)
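# Example analysis text these patterns are designed to match:
#   Original Question Analysis: <restatement and analysis of the user's question>
#   Research Gaps:
#   1. <focus area> (Priority: 4)
#   2. <focus area> (Priority: 2)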
def parse_analysis(self, llm_response: str) -> Optional[AnalysisResult]:
"""Main parsing method with improved validation"""
try:
# Clean and normalize the response
cleaned_response = self._clean_text(llm_response)
# Extract original question with validation
original_question = self._extract_original_question(cleaned_response)
if not original_question:
self.logger.warning("Failed to extract original question")
original_question = "Original question extraction failed"
# Extract and validate research areas
focus_areas = self._extract_research_areas(cleaned_response)
focus_areas = self._normalize_focus_areas(focus_areas)
# Calculate confidence score
confidence_score = self._calculate_confidence_score(original_question, focus_areas)
return AnalysisResult(
original_question=original_question,
focus_areas=focus_areas,
raw_response=llm_response,
confidence_score=confidence_score
)
except Exception as e:
self.logger.error(f"Error in parse_analysis: {str(e)}")
return None
def _clean_text(self, text: str) -> str:
"""Clean and normalize text for parsing"""
text = re.sub(r'\n{3,}', '\n\n', text)
text = re.sub(r'\s{2,}', ' ', text)
text = re.sub(r'(\d+)\)', r'\1.', text)  # normalize "1)" style list markers to "1."
return text.strip()
def _extract_original_question(self, text: str) -> str:
"""Extract original question with improved matching"""
for pattern in self.patterns['original_question']:
match = re.search(pattern, text, re.DOTALL)
if match:
return self._clean_text(match.group(1))
return ""
def _extract_research_areas(self, text: str) -> List[ResearchFocus]:
"""Extract research areas with enhanced validation"""
areas = []
for pattern in self.patterns['research_gaps']:
gap_match = re.search(pattern, text)
if gap_match:
sections = re.split(r'\n\s*\d+[\.)]\s+', text[gap_match.end():])
sections = [s for s in sections if s.strip()]
for section in sections:
focus = self._parse_research_focus(section)
if focus and self._is_valid_focus(focus):
areas.append(focus)
break
return areas
def _parse_research_focus(self, text: str) -> Optional[ResearchFocus]:
"""Parse research focus with improved validation without reasoning."""
try:
# Extract area
area = text.split('\n')[0].strip()
# Extract and validate priority
priority = self._extract_priority(text)
# Return ResearchFocus without reasoning
return ResearchFocus(
area=area,
priority=priority
)
except Exception as e:
self.logger.error(f"Error parsing research focus: {str(e)}")
return None
def _extract_priority(self, text: str) -> int:
"""Extract priority with validation"""
for pattern in self.patterns['priority']:
priority_match = re.search(pattern, text)
if priority_match:
try:
priority = int(priority_match.group(1))
return max(1, min(5, priority))
except ValueError:
continue
return 3 # Default priority
def _is_valid_focus(self, focus: ResearchFocus) -> bool:
"""Validate research focus completeness and quality"""
if not focus.area: # Only check if area exists and isn't empty
return False
if focus.priority < 1 or focus.priority > 5:
return False
return True
def _normalize_focus_areas(self, areas: List[ResearchFocus]) -> List[ResearchFocus]:
"""Normalize and validate focus areas"""
normalized = []
for area in areas:
if not area.area.strip():
continue
area.priority = max(1, min(5, area.priority))
if self._is_valid_focus(area):
normalized.append(area)
# Sort by priority (highest first) but don't add any filler areas
normalized.sort(key=lambda x: x.priority, reverse=True)
return normalized
def _calculate_confidence_score(self, question: str, areas: List[ResearchFocus]) -> float:
"""Calculate confidence score for analysis quality"""
score = 0.0
# Question quality (0.3)
if question and len(question.split()) >= 3:
score += 0.3
# Areas quality (0.7)
if areas:
# Valid areas ratio (0.35) - now based on proportion that are valid vs total
num_areas = len(areas)
if num_areas > 0: # Avoid division by zero
valid_areas = sum(1 for a in areas if self._is_valid_focus(a))
score += 0.35 * (valid_areas / num_areas)
# Priority distribution (0.35) - now based on having different priorities
if num_areas > 0: # Avoid division by zero
unique_priorities = len(set(a.priority for a in areas))
score += 0.35 * (unique_priorities / num_areas)
return round(score, 2)
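# Worked example: a well-formed question plus three valid focus areas with
# priorities {5, 3, 3} scores 0.3 + 0.35*(3/3) + 0.35*(2/3) ≈ 0.88.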
def format_analysis_result(self, result: AnalysisResult) -> str:
"""Format analysis result for display without reasoning."""
formatted = [
"Strategic Analysis Result",
"=" * 80,
f"\nOriginal Question Analysis:\n{result.original_question}\n",
f"Analysis Confidence Score: {result.confidence_score}",
"\nResearch Focus Areas:"
]
for i, focus in enumerate(result.focus_areas, 1):
formatted.extend([
f"\n{i}. {focus.area}",
f" Priority: {focus.priority}"
])
return "\n".join(formatted)

149
web_scraper.py Normal file
View file

@ -0,0 +1,149 @@
import requests
from bs4 import BeautifulSoup
from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse, urljoin
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import re
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class WebScraper:
def __init__(self, user_agent="WebLLMAssistant/1.0 (+https://github.com/YourUsername/Web-LLM-Assistant-Llama-cpp)",
rate_limit=1, timeout=10, max_retries=3):
self.session = requests.Session()
self.session.headers.update({"User-Agent": user_agent})
self.robot_parser = RobotFileParser()
self.rate_limit = rate_limit
self.timeout = timeout
self.max_retries = max_retries
self.last_request_time = {}
def can_fetch(self, url):
parsed_url = urlparse(url)
robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
self.robot_parser.set_url(robots_url)
try:
self.robot_parser.read()
return self.robot_parser.can_fetch(self.session.headers["User-Agent"], url)
except Exception as e:
logger.warning(f"Error reading robots.txt for {url}: {e}")
return True # Assume allowed if robots.txt can't be read
def respect_rate_limit(self, url):
domain = urlparse(url).netloc
current_time = time.time()
if domain in self.last_request_time:
time_since_last_request = current_time - self.last_request_time[domain]
if time_since_last_request < self.rate_limit:
time.sleep(self.rate_limit - time_since_last_request)
self.last_request_time[domain] = time.time()
def scrape_page(self, url):
if not self.can_fetch(url):
logger.info(f"Robots.txt disallows scraping: {url}")
return None
for attempt in range(self.max_retries):
try:
self.respect_rate_limit(url)
response = self.session.get(url, timeout=self.timeout)
response.raise_for_status()
return self.extract_content(response.text, url)
except requests.RequestException as e:
logger.warning(f"Error scraping {url} (attempt {attempt + 1}/{self.max_retries}): {e}")
if attempt == self.max_retries - 1:
logger.error(f"Failed to scrape {url} after {self.max_retries} attempts")
return None
time.sleep(2 ** attempt) # Exponential backoff
def extract_content(self, html, url):
soup = BeautifulSoup(html, 'html.parser')
# Remove unwanted elements
for element in soup(["script", "style", "nav", "footer", "header"]):
element.decompose()
# Extract title
title = soup.title.string if soup.title else ""
# Try to find main content
main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content')
if main_content:
paragraphs = main_content.find_all('p')
else:
paragraphs = soup.find_all('p')
# Extract text from paragraphs
text = ' '.join([p.get_text().strip() for p in paragraphs])
# If no paragraphs found, get all text
if not text:
text = soup.get_text()
# Clean up whitespace
text = re.sub(r'\s+', ' ', text).strip()
# Extract and resolve links
links = [urljoin(url, a['href']) for a in soup.find_all('a', href=True)]
return {
"url": url,
"title": title,
"content": text[:2400], # Limit to first 2400 characters
"links": links[:10] # Limit to first 10 links
}
def scrape_multiple_pages(urls, max_workers=5):
scraper = WebScraper()
results = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_url = {executor.submit(scraper.scrape_page, url): url for url in urls}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
if data:
results[url] = data
logger.info(f"Successfully scraped: {url}")
else:
logger.warning(f"Failed to scrape: {url}")
except Exception as exc:
logger.error(f"{url} generated an exception: {exc}")
return results
# Function to integrate with your main system
def get_web_content(urls):
scraped_data = scrape_multiple_pages(urls)
return {url: data['content'] for url, data in scraped_data.items() if data}
# Standalone can_fetch function
def can_fetch(url):
parsed_url = urlparse(url)
robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
rp = RobotFileParser()
rp.set_url(robots_url)
try:
rp.read()
return rp.can_fetch("*", url)
except Exception as e:
logger.warning(f"Error reading robots.txt for {url}: {e}")
return True # Assume allowed if robots.txt can't be read
if __name__ == "__main__":
test_urls = [
"https://en.wikipedia.org/wiki/Web_scraping",
"https://example.com",
"https://www.python.org"
]
scraped_content = get_web_content(test_urls)
for url, content in scraped_content.items():
print(f"Content from {url}:")
print(content[:500]) # Print first 500 characters
print("\n---\n")