From 84b587a03b15c185e4880eec8cbcab64f3f02a6e Mon Sep 17 00:00:00 2001
From: James
Date: Wed, 20 Nov 2024 17:56:34 +1000
Subject: [PATCH] Add files via upload

---
 LICENSE                      |    2 +-
 README.md                    |  132 +++
 Self_Improving_Search.py     |  434 ++++++++++
 Web-LLM.py                   |  305 +++++++
 llm_config.py                |   40 +
 llm_response_parser.py       |  240 ++++++
 llm_wrapper.py               |   80 ++
 requirements.txt             |   11 +
 research_manager.py          | 1481 ++++++++++++++++++++++++++++++++++
 strategic_analysis_parser.py |  219 +++++
 web_scraper.py               |  149 ++++
 11 files changed, 3092 insertions(+), 1 deletion(-)
 create mode 100644 README.md
 create mode 100644 Self_Improving_Search.py
 create mode 100644 Web-LLM.py
 create mode 100644 llm_config.py
 create mode 100644 llm_response_parser.py
 create mode 100644 llm_wrapper.py
 create mode 100644 requirements.txt
 create mode 100644 research_manager.py
 create mode 100644 strategic_analysis_parser.py
 create mode 100644 web_scraper.py

diff --git a/LICENSE b/LICENSE
index c5076ca..c46e6f6 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,6 +1,6 @@
 MIT License
 
-Copyright (c) 2024 James
+Copyright (c) 2024 James Warburton
 
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..113c7b8
--- /dev/null
+++ b/README.md
@@ -0,0 +1,132 @@
+# Automated-AI-Web-Researcher-Ollama
+
+## Description
+Automated-AI-Web-Researcher is an innovative research assistant that leverages locally run large language models through Ollama to conduct thorough, automated online research on any topic or question. Unlike traditional LLM interactions, this tool actually performs structured research: it breaks your query down into focused research areas, systematically investigates each area by searching the web and scraping relevant websites, and compiles its findings into a text document that is saved automatically, complete with a link to the source of every piece of content. Whenever you want the research to stop, you can enter a command; the research then terminates, the LLM reviews all of the content it found, and it provides a comprehensive final summary of your original topic or question. Afterwards, you can also ask the LLM questions about its research findings.
+
+Here's how it works:
+
+1. You provide a research query (e.g., "What year will global population begin to decrease rather than increase according to research?")
+2. The LLM analyzes your query and generates 5 specific research focus areas, each with an assigned priority based on its relevance to the topic or question.
+3. Starting with the highest-priority area, the LLM:
+   - Formulates targeted search queries
+   - Performs web searches
+   - Analyzes the search results and selects the most relevant web pages
+   - Scrapes and extracts relevant information from the selected web pages
+   - Documents all content it has found during the research session into a research file, including links to the websites the content was retrieved from
+4. After investigating all focus areas, the LLM generates new focus areas based on the information found so far and repeats its research cycle. Earlier findings often suggest new, relevant focus areas, leading in some cases to interesting and novel research directions.
+5. You can let the research run for as long as you like; at any time you can enter the quit command, which stops the research and has the LLM review all of the collected content in full and generate a comprehensive summary responding to your original query or topic.
+6. The LLM then enters a conversation mode where you can ask specific questions about the research findings if desired.
+
+The key distinction is that this isn't just a chatbot - it's an automated research assistant that methodically investigates topics and maintains a documented research trail, all from a single question or topic of your choosing. Depending on your system and model, it can perform over a hundred searches and content retrievals in a relatively short amount of time. You can leave it running and come back to a full text document containing over a hundred pieces of content from relevant websites, have it summarise the findings, and then even ask it questions about what it found.
+
+## Features
+- Automated research planning with prioritized focus areas
+- Systematic web searching and content analysis
+- All research content and source URLs saved into a detailed text document
+- Research summary generation
+- Post-research Q&A capability about findings
+- Self-improving search mechanism
+- Rich console output with status indicators
+- Comprehensive answer synthesis using web-sourced information
+- Research conversation mode for exploring findings
+
+## Installation
+
+1. Clone the repository:
+
+git clone https://github.com/YourUsername/Automated-AI-Web-Researcher-Ollama
+cd Automated-AI-Web-Researcher-Ollama
+
+2. Create and activate a virtual environment:
+
+python -m venv venv
+source venv/bin/activate  # On Windows, use venv\Scripts\activate
+
+3. Install dependencies:
+
+pip install -r requirements.txt
+
+4. Install and configure Ollama:
+- Install Ollama following the instructions at https://ollama.ai
+- Create a custom variant of your chosen model with the required context length
+  (phi3:3.8b-mini-128k-instruct or phi3:14b-medium-128k-instruct are recommended)
+
+Create a file named `modelfile` with these exact contents:
+
+FROM your-model-name
+PARAMETER num_ctx 38000
+
+Replace "your-model-name" with your chosen model (e.g., phi3:3.8b-mini-128k-instruct).
+
+Then create the model:
+
+ollama create research-phi3 -f modelfile
+
+Note: This step is necessary because recent Ollama versions have reduced the context window of models like phi3:3.8b-mini-128k-instruct, despite the name suggesting a high context length. The research process feeds the model a large amount of information, so the modelfile above restores a sufficiently large context window.
+
+## Usage
+
+1. Start Ollama:
+
+ollama serve
+
+2. Run the researcher:
+
+python Web-LLM.py
+
+3. Start a research session:
+- Type @ followed by your research query
+- Press CTRL+D (or CTRL+Z on Windows) to submit
+- Example: "@What year is global population projected to start declining?"
+
+4. During research, you can use the following commands by typing the letter associated with each and submitting with CTRL+D:
+- Use 's' to show status.
+- Use 'f' to show current focus.
+- Use 'p' to pause and assess research progress. The LLM reviews everything collected so far and reports whether it can answer your query with the content gathered to date; it then waits for one of two commands: 'c' to continue the research, or 'q' to terminate it, which produces a summary just as if you had quit without pausing.
+- Use 'q' to quit research.
+
+5. After research completes:
+- Wait for the summary to be generated, and review the LLM's findings.
+- Enter conversation mode to ask specific questions about the findings.
+- Access the detailed research content, available in a research session text file that appears in the program's directory, which includes:
+  * All retrieved content
+  * Source URLs for all information
+  * Focus areas investigated
+  * Generated summary
+
+A minimal example of driving the research components directly from Python is included at the end of this README.
+
+## Configuration
+
+The LLM settings can be modified in `llm_config.py`. You must specify your model name in the configuration for the researcher to function. The default configuration is optimized for research tasks with the specified Phi-3 model.
+
+## Current Status
+This is a prototype that demonstrates functional automated research capabilities. While still in development, it successfully performs structured research tasks. It is currently tested and working well with the phi3:3.8b-mini-128k-instruct model when the context length is set as advised above.
+
+## Dependencies
+- Ollama
+- Python packages listed in requirements.txt
+- Recommended model: phi3:3.8b-mini-128k-instruct or phi3:14b-medium-128k-instruct (with custom context length as specified)
+
+## Contributing
+Contributions are welcome! This is a prototype with room for improvements and new features.
+
+## License
+This project is licensed under the MIT License - see the [LICENSE] file for details.
+
+## Acknowledgments
+- Ollama team for their local LLM runtime
+- DuckDuckGo for their search API
+
+## Personal Note
+This tool represents an attempt to bridge the gap between simple LLM interactions and genuine research capabilities. By structuring the research process and maintaining documentation, it aims to provide more thorough and verifiable results than traditional LLM conversations. It also builds on my previous project, 'Web-LLM-Assistant-Llamacpp-Ollama', which simply gave LLMs the ability to search and scrape websites to answer questions. Unlike its predecessor, this program takes that capability and uses it in a novel and genuinely useful way - the most advanced and useful way I could conceive of building on my previous work. As a very new programmer (this being my second ever program), I feel very good about the result, and I hope it hits the mark! Having now used it a great deal myself, I find that, unlike the previous program, which felt more like a novelty than an actual tool, this one is actually quite useful and unique - but I am quite biased!
+
+Please enjoy, and feel free to submit any suggestions for improvements so that we can make this automated AI researcher even more capable.
+
+## Disclaimer
+This project is for educational purposes only. Ensure you comply with the terms of service of all APIs and services used.
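+## Example: Programmatic Use
+
+The interactive entry point is Web-LLM.py, but the main components can also be driven directly from Python. The sketch below is an illustration rather than part of the shipped interface; it assumes Ollama is running and that `llm_config.py` points at your model, and it runs a single self-improving search cycle end to end:
+
+from llm_wrapper import LLMWrapper
+from llm_response_parser import UltimateLLMResponseParser
+from Self_Improving_Search import EnhancedSelfImprovingSearch
+
+llm = LLMWrapper()                      # reads settings from llm_config.py
+parser = UltimateLLMResponseParser()
+searcher = EnhancedSelfImprovingSearch(llm, parser, max_attempts=5)
+
+# Formulates a search query, searches DuckDuckGo, scrapes the selected
+# pages, and either answers or refines the search, up to max_attempts times.
+print(searcher.search_and_improve("What year is global population projected to start declining?"))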
diff --git a/Self_Improving_Search.py b/Self_Improving_Search.py new file mode 100644 index 0000000..2f3bd76 --- /dev/null +++ b/Self_Improving_Search.py @@ -0,0 +1,434 @@ +import time +import re +import os +from typing import List, Dict, Tuple, Union +from colorama import Fore, Style +import logging +import sys +from io import StringIO +from web_scraper import get_web_content, can_fetch +from llm_config import get_llm_config +from llm_response_parser import UltimateLLMResponseParser +from llm_wrapper import LLMWrapper +from urllib.parse import urlparse + +# Set up logging +log_directory = 'logs' +if not os.path.exists(log_directory): + os.makedirs(log_directory) + +# Configure logger +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +log_file = os.path.join(log_directory, 'llama_output.log') +file_handler = logging.FileHandler(log_file) +formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') +file_handler.setFormatter(formatter) +logger.handlers = [] +logger.addHandler(file_handler) +logger.propagate = False + +# Suppress other loggers +for name in ['root', 'duckduckgo_search', 'requests', 'urllib3']: + logging.getLogger(name).setLevel(logging.WARNING) + logging.getLogger(name).handlers = [] + logging.getLogger(name).propagate = False + +class OutputRedirector: + def __init__(self, stream=None): + self.stream = stream or StringIO() + self.original_stdout = sys.stdout + self.original_stderr = sys.stderr + + def __enter__(self): + sys.stdout = self.stream + sys.stderr = self.stream + return self.stream + + def __exit__(self, exc_type, exc_val, exc_tb): + sys.stdout = self.original_stdout + sys.stderr = self.original_stderr + +class EnhancedSelfImprovingSearch: + def __init__(self, llm: LLMWrapper, parser: UltimateLLMResponseParser, max_attempts: int = 5): + self.llm = llm + self.parser = parser + self.max_attempts = max_attempts + self.llm_config = get_llm_config() + + @staticmethod + def initialize_llm(): + llm_wrapper = LLMWrapper() + return llm_wrapper + + def print_thinking(self): + print(Fore.MAGENTA + "šŸ§  Thinking..." + Style.RESET_ALL) + + def print_searching(self): + print(Fore.MAGENTA + "šŸ“ Searching..." + Style.RESET_ALL) + + def search_and_improve(self, user_query: str) -> str: + attempt = 0 + while attempt < self.max_attempts: + print(f"\n{Fore.CYAN}Search attempt {attempt + 1}:{Style.RESET_ALL}") + self.print_searching() + + try: + formulated_query, time_range = self.formulate_query(user_query, attempt) + + print(f"{Fore.YELLOW}Original query: {user_query}{Style.RESET_ALL}") + print(f"{Fore.YELLOW}Formulated query: {formulated_query}{Style.RESET_ALL}") + print(f"{Fore.YELLOW}Time range: {time_range}{Style.RESET_ALL}") + + if not formulated_query: + print(f"{Fore.RED}Error: Empty search query. Retrying...{Style.RESET_ALL}") + attempt += 1 + continue + + search_results = self.perform_search(formulated_query, time_range) + + if not search_results: + print(f"{Fore.RED}No results found. Retrying with a different query...{Style.RESET_ALL}") + attempt += 1 + continue + + self.display_search_results(search_results) + + selected_urls = self.select_relevant_pages(search_results, user_query) + + if not selected_urls: + print(f"{Fore.RED}No relevant URLs found. Retrying...{Style.RESET_ALL}") + attempt += 1 + continue + + print(Fore.MAGENTA + "āš™ļø Scraping selected pages..." 
+ Style.RESET_ALL) + # Scraping is done without OutputRedirector to ensure messages are visible + scraped_content = self.scrape_content(selected_urls) + + if not scraped_content: + print(f"{Fore.RED}Failed to scrape content. Retrying...{Style.RESET_ALL}") + attempt += 1 + continue + + self.display_scraped_content(scraped_content) + + self.print_thinking() + + with OutputRedirector() as output: + evaluation, decision = self.evaluate_scraped_content(user_query, scraped_content) + llm_output = output.getvalue() + logger.info(f"LLM Output in evaluate_scraped_content:\n{llm_output}") + + print(f"{Fore.MAGENTA}Evaluation: {evaluation}{Style.RESET_ALL}") + print(f"{Fore.MAGENTA}Decision: {decision}{Style.RESET_ALL}") + + if decision == "answer": + return self.generate_final_answer(user_query, scraped_content) + elif decision == "refine": + print(f"{Fore.YELLOW}Refining search...{Style.RESET_ALL}") + attempt += 1 + else: + print(f"{Fore.RED}Unexpected decision. Proceeding to answer.{Style.RESET_ALL}") + return self.generate_final_answer(user_query, scraped_content) + + except Exception as e: + print(f"{Fore.RED}An error occurred during search attempt. Check the log file for details.{Style.RESET_ALL}") + logger.error(f"An error occurred during search: {str(e)}", exc_info=True) + attempt += 1 + + return self.synthesize_final_answer(user_query) + + def evaluate_scraped_content(self, user_query: str, scraped_content: Dict[str, str]) -> Tuple[str, str]: + user_query_short = user_query[:200] + prompt = f""" +Evaluate if the following scraped content contains sufficient information to answer the user's question comprehensively: + +User's question: "{user_query_short}" + +Scraped Content: +{self.format_scraped_content(scraped_content)} + +Your task: +1. Determine if the scraped content provides enough relevant and detailed information to answer the user's question thoroughly. +2. If the information is sufficient, decide to 'answer'. If more information or clarification is needed, decide to 'refine' the search. + +Respond using EXACTLY this format: +Evaluation: [Your evaluation of the scraped content] +Decision: [ONLY 'answer' if content is sufficient, or 'refine' if more information is needed] +""" + max_retries = 3 + for attempt in range(max_retries): + try: + response_text = self.llm.generate(prompt, max_tokens=200, stop=None) + evaluation, decision = self.parse_evaluation_response(response_text) + if decision in ['answer', 'refine']: + return evaluation, decision + except Exception as e: + logger.warning(f"Error in evaluate_scraped_content (attempt {attempt + 1}): {str(e)}") + + logger.warning("Failed to get a valid decision in evaluate_scraped_content. Defaulting to 'refine'.") + return "Failed to evaluate content.", "refine" + + def parse_evaluation_response(self, response: str) -> Tuple[str, str]: + evaluation = "" + decision = "" + for line in response.strip().split('\n'): + if line.startswith('Evaluation:'): + evaluation = line.split(':', 1)[1].strip() + elif line.startswith('Decision:'): + decision = line.split(':', 1)[1].strip().lower() + return evaluation, decision + + def formulate_query(self, user_query: str, attempt: int) -> Tuple[str, str]: + user_query_short = user_query[:200] + prompt = f""" +Based on the following user question, formulate a concise and effective search query: +"{user_query_short}" +Your task: +1. Create a search query of 2-5 words that will yield relevant results. +2. Determine if a specific time range is needed for the search. 
+Time range options:
+- 'd': Limit results to the past day. Use for very recent events or rapidly changing information.
+- 'w': Limit results to the past week. Use for recent events or topics with frequent updates.
+- 'm': Limit results to the past month. Use for relatively recent information or ongoing events.
+- 'y': Limit results to the past year. Use for annual events or information that changes yearly.
+- 'none': No time limit. Use for historical information or topics not tied to a specific time frame.
+Respond in the following format:
+Search query: [Your 2-5 word query]
+Time range: [d/w/m/y/none]
+Do not provide any additional information or explanation.
+"""
+        max_retries = 3
+        for retry in range(max_retries):
+            with OutputRedirector() as output:
+                response_text = self.llm.generate(prompt, max_tokens=50, stop=None)
+                llm_output = output.getvalue()
+            logger.info(f"LLM Output in formulate_query:\n{llm_output}")
+            query, time_range = self.parse_query_response(response_text)
+            if query and time_range:
+                return query, time_range
+        return self.fallback_query(user_query), "none"
+
+    def parse_query_response(self, response: str) -> Tuple[str, str]:
+        query = ""
+        time_range = "none"
+        for line in response.strip().split('\n'):
+            if ":" in line:
+                key, value = line.split(":", 1)
+                key = key.strip().lower()
+                value = value.strip()
+                if "query" in key:
+                    query = self.clean_query(value)
+                elif "time" in key or "range" in key:
+                    time_range = self.validate_time_range(value)
+        return query, time_range
+
+    def clean_query(self, query: str) -> str:
+        query = re.sub(r'["\'\[\]]', '', query)
+        query = re.sub(r'\s+', ' ', query)
+        return query.strip()[:100]
+
+    def validate_time_range(self, time_range: str) -> str:
+        valid_ranges = ['d', 'w', 'm', 'y', 'none']
+        time_range = time_range.lower()
+        return time_range if time_range in valid_ranges else 'none'
+
+    def fallback_query(self, user_query: str) -> str:
+        words = user_query.split()
+        return " ".join(words[:5])
+
+    def perform_search(self, query: str, time_range: str) -> List[Dict]:
+        if not query:
+            return []
+
+        # Record the query details so display_search_results can report them
+        # (these attributes were read there but never set anywhere).
+        self.last_query = query
+        self.last_time_range = time_range
+
+        from duckduckgo_search import DDGS
+
+        with DDGS() as ddgs:
+            try:
+                with OutputRedirector() as output:
+                    if time_range and time_range != 'none':
+                        results = list(ddgs.text(query, timelimit=time_range, max_results=10))
+                    else:
+                        results = list(ddgs.text(query, max_results=10))
+                ddg_output = output.getvalue()
+                logger.info(f"DDG Output in perform_search:\n{ddg_output}")
+                return [{'number': i+1, **result} for i, result in enumerate(results)]
+            except Exception as e:
+                print(f"{Fore.RED}Search error: {str(e)}{Style.RESET_ALL}")
+                return []
+
+    def display_search_results(self, results: List[Dict]) -> None:
+        """Display search results with minimal output"""
+        try:
+            if not results:
+                return
+
+            # Only show search success status
+            print(f"\nSearch query sent to DuckDuckGo: {self.last_query}")
+            print(f"Time range sent to DuckDuckGo: {self.last_time_range}")
+            print(f"Number of results: {len(results)}")
+
+        except Exception as e:
+            logger.error(f"Error displaying search results: {str(e)}")
+
+    def select_relevant_pages(self, search_results: List[Dict], user_query: str) -> List[str]:
+        prompt = f"""
+Given the following search results for the user's question: "{user_query}"
+Select the 2 most relevant results to scrape and analyze. Explain your reasoning for each selection.
+
+Search Results:
+{self.format_results(search_results)}
+
+Instructions:
+1. You MUST select exactly 2 result numbers from the search results.
+2. 
Choose the results that are most likely to contain comprehensive and relevant information to answer the user's question. +3. Provide a brief reason for each selection. + +You MUST respond using EXACTLY this format and nothing else: + +Selected Results: [Two numbers corresponding to the selected results] +Reasoning: [Your reasoning for the selections] +""" + + max_retries = 3 + for retry in range(max_retries): + with OutputRedirector() as output: + response_text = self.llm.generate(prompt, max_tokens=200, stop=None) + llm_output = output.getvalue() + logger.info(f"LLM Output in select_relevant_pages:\n{llm_output}") + + parsed_response = self.parse_page_selection_response(response_text) + if parsed_response and self.validate_page_selection_response(parsed_response, len(search_results)): + selected_urls = [result['href'] for result in search_results if result['number'] in parsed_response['selected_results']] + + allowed_urls = [url for url in selected_urls if can_fetch(url)] + if allowed_urls: + return allowed_urls + else: + print(f"{Fore.YELLOW}Warning: All selected URLs are disallowed by robots.txt. Retrying selection.{Style.RESET_ALL}") + else: + print(f"{Fore.YELLOW}Warning: Invalid page selection. Retrying.{Style.RESET_ALL}") + + print(f"{Fore.YELLOW}Warning: All attempts to select relevant pages failed. Falling back to top allowed results.{Style.RESET_ALL}") + allowed_urls = [result['href'] for result in search_results if can_fetch(result['href'])][:2] + return allowed_urls + + def parse_page_selection_response(self, response: str) -> Dict[str, Union[List[int], str]]: + lines = response.strip().split('\n') + parsed = {} + for line in lines: + if line.startswith('Selected Results:'): + parsed['selected_results'] = [int(num.strip()) for num in re.findall(r'\d+', line)] + elif line.startswith('Reasoning:'): + parsed['reasoning'] = line.split(':', 1)[1].strip() + return parsed if 'selected_results' in parsed and 'reasoning' in parsed else None + + def validate_page_selection_response(self, parsed_response: Dict[str, Union[List[int], str]], num_results: int) -> bool: + if len(parsed_response['selected_results']) != 2: + return False + if any(num < 1 or num > num_results for num in parsed_response['selected_results']): + return False + return True + + def format_results(self, results: List[Dict]) -> str: + formatted_results = [] + for result in results: + formatted_result = f"{result['number']}. 
Title: {result.get('title', 'N/A')}\n"
+            formatted_result += f" Snippet: {result.get('body', 'N/A')[:200]}...\n"
+            formatted_result += f" URL: {result.get('href', 'N/A')}\n"
+            formatted_results.append(formatted_result)
+        return "\n".join(formatted_results)
+
+    def scrape_content(self, urls: List[str]) -> Dict[str, str]:
+        scraped_content = {}
+        blocked_urls = []
+        for url in urls:
+            robots_allowed = can_fetch(url)
+            if robots_allowed:
+                content = get_web_content([url])
+                if content:
+                    scraped_content.update(content)
+                    print(Fore.YELLOW + f"Successfully scraped: {url}" + Style.RESET_ALL)
+                    logger.info(f"Successfully scraped: {url}")
+                else:
+                    # Robots.txt allowed this URL; the fetch itself failed.
+                    print(Fore.RED + f"Failed to scrape content from {url}" + Style.RESET_ALL)
+                    logger.warning(f"Failed to scrape content from {url}")
+            else:
+                blocked_urls.append(url)
+                print(Fore.RED + f"Warning: Robots.txt disallows scraping of {url}" + Style.RESET_ALL)
+                logger.warning(f"Robots.txt disallows scraping of {url}")
+
+        print(Fore.CYAN + f"Scraped content received for {len(scraped_content)} URLs" + Style.RESET_ALL)
+        logger.info(f"Scraped content received for {len(scraped_content)} URLs")
+
+        if blocked_urls:
+            print(Fore.RED + f"Warning: {len(blocked_urls)} URL(s) were not scraped due to robots.txt restrictions." + Style.RESET_ALL)
+            logger.warning(f"{len(blocked_urls)} URL(s) were not scraped due to robots.txt restrictions: {', '.join(blocked_urls)}")
+
+        return scraped_content
+
+    def display_scraped_content(self, scraped_content: Dict[str, str]):
+        print(f"\n{Fore.CYAN}Scraped Content:{Style.RESET_ALL}")
+        for url, content in scraped_content.items():
+            print(f"{Fore.GREEN}URL: {url}{Style.RESET_ALL}")
+            print(f"Content: {content[:4000]}...\n")
+
+    def generate_final_answer(self, user_query: str, scraped_content: Dict[str, str]) -> str:
+        user_query_short = user_query[:200]
+        prompt = f"""
+You are an AI assistant. Provide a comprehensive and detailed answer to the following question using ONLY the information provided in the scraped content. Do not include any references or mention any sources. Answer directly and thoroughly.
+
+Question: "{user_query_short}"
+
+Scraped Content:
+{self.format_scraped_content(scraped_content)}
+
+Important Instructions:
+1. Do not use phrases like "Based on the absence of selected results" or similar.
+2. If the scraped content does not contain enough information to answer the question, say so explicitly and explain what information is missing.
+3. Provide as much relevant detail as possible from the scraped content.
+
+Answer:
+"""
+        max_retries = 3
+        for attempt in range(max_retries):
+            with OutputRedirector() as output:
+                response_text = self.llm.generate(prompt, max_tokens=1024, stop=None)
+                llm_output = output.getvalue()
+            logger.info(f"LLM Output in generate_final_answer:\n{llm_output}")
+            if response_text:
+                logger.info(f"LLM Response:\n{response_text}")
+                return response_text
+
+        error_message = "I apologize, but I couldn't generate a satisfactory answer based on the available information."
+        logger.warning(f"Failed to generate a response after {max_retries} attempts. 
Returning error message.") + return error_message + + def format_scraped_content(self, scraped_content: Dict[str, str]) -> str: + formatted_content = [] + for url, content in scraped_content.items(): + content = re.sub(r'\s+', ' ', content) + formatted_content.append(f"Content from {url}:\n{content}\n") + return "\n".join(formatted_content) + + def synthesize_final_answer(self, user_query: str) -> str: + prompt = f""" +After multiple search attempts, we couldn't find a fully satisfactory answer to the user's question: "{user_query}" + +Please provide the best possible answer you can, acknowledging any limitations or uncertainties. +If appropriate, suggest ways the user might refine their question or where they might find more information. + +Respond in a clear, concise, and informative manner. +""" + try: + with OutputRedirector() as output: + response_text = self.llm.generate(prompt, max_tokens=self.llm_config.get('max_tokens', 1024), stop=self.llm_config.get('stop', None)) + llm_output = output.getvalue() + logger.info(f"LLM Output in synthesize_final_answer:\n{llm_output}") + if response_text: + return response_text.strip() + except Exception as e: + logger.error(f"Error in synthesize_final_answer: {str(e)}", exc_info=True) + return "I apologize, but after multiple attempts, I wasn't able to find a satisfactory answer to your question. Please try rephrasing your question or breaking it down into smaller, more specific queries." + +# End of EnhancedSelfImprovingSearch class diff --git a/Web-LLM.py b/Web-LLM.py new file mode 100644 index 0000000..dd3a445 --- /dev/null +++ b/Web-LLM.py @@ -0,0 +1,305 @@ +import sys +import os +from colorama import init, Fore, Style +import logging +import time +from io import StringIO +from Self_Improving_Search import EnhancedSelfImprovingSearch +from llm_config import get_llm_config +from llm_response_parser import UltimateLLMResponseParser +from llm_wrapper import LLMWrapper +from strategic_analysis_parser import StrategicAnalysisParser +from research_manager import ResearchManager + +# Initialize colorama +if os.name == 'nt': # Windows-specific initialization + init(convert=True, strip=False, wrap=True) +else: + init() + +# Set up logging +log_directory = 'logs' +if not os.path.exists(log_directory): + os.makedirs(log_directory) + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +log_file = os.path.join(log_directory, 'web_llm.log') +file_handler = logging.FileHandler(log_file) +formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') +file_handler.setFormatter(formatter) +logger.handlers = [] +logger.addHandler(file_handler) +logger.propagate = False + +# Disable other loggers +for name in logging.root.manager.loggerDict: + if name != __name__: + logging.getLogger(name).disabled = True + +class OutputRedirector: + def __init__(self, stream=None): + self.stream = stream or StringIO() + self.original_stdout = sys.stdout + self.original_stderr = sys.stderr + + def __enter__(self): + sys.stdout = self.stream + sys.stderr = self.stream + return self.stream + + def __exit__(self, exc_type, exc_val, exc_tb): + sys.stdout = self.original_stdout + sys.stderr = self.original_stderr + +def print_header(): + print(Fore.CYAN + Style.BRIGHT + """ + ā•”ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•— + ā•‘ šŸŒ Advanced Research Assistant šŸ¤– ā•‘ + 
ā•šā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā• + """ + Style.RESET_ALL) + print(Fore.YELLOW + """ + Welcome to the Advanced Research Assistant! + + Commands: + - For web search: start message with '/' + Example: "/latest news on AI advancements" + + - For research mode: start message with '@' + Example: "@analyze the impact of AI on healthcare" + + Press CTRL+D (Linux/Mac) or CTRL+Z (Windows) to submit input. + """ + Style.RESET_ALL) + +def get_multiline_input() -> str: + """Get multiline input using raw terminal mode for reliable CTRL+D handling""" + print(f"{Fore.GREEN}šŸ“ Enter your message (Press CTRL+D to submit):{Style.RESET_ALL}") + lines = [] + + import termios + import tty + import sys + + # Save original terminal settings + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + + try: + # Set terminal to raw mode + tty.setraw(fd) + + current_line = [] + while True: + # Read one character at a time + char = sys.stdin.read(1) + + # CTRL+D detection + if not char or ord(char) == 4: # EOF or CTRL+D + sys.stdout.write('\n') # New line for clean display + if current_line: + lines.append(''.join(current_line)) + return ' '.join(lines).strip() + + # Handle special characters + elif ord(char) == 13: # Enter + sys.stdout.write('\n') + lines.append(''.join(current_line)) + current_line = [] + + elif ord(char) == 127: # Backspace + if current_line: + current_line.pop() + sys.stdout.write('\b \b') # Erase character + + elif ord(char) == 3: # CTRL+C + sys.stdout.write('\n') + return 'q' + + # Normal character + elif 32 <= ord(char) <= 126: # Printable characters + current_line.append(char) + sys.stdout.write(char) + + # Flush output + sys.stdout.flush() + + finally: + # Restore terminal settings + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + print() # New line for clean display + +def initialize_system(): + """Initialize system with proper error checking""" + try: + print(Fore.YELLOW + "Initializing system..." + Style.RESET_ALL) + + llm_config = get_llm_config() + if llm_config['llm_type'] == 'ollama': + import requests + try: + response = requests.get(llm_config['base_url'], timeout=5) + if response.status_code != 200: + raise ConnectionError("Cannot connect to Ollama server") + except requests.exceptions.RequestException: + raise ConnectionError( + "\nCannot connect to Ollama server!" + "\nPlease ensure:" + "\n1. Ollama is installed" + "\n2. Ollama server is running (try 'ollama serve')" + "\n3. The model specified in llm_config.py is pulled" + ) + elif llm_config['llm_type'] == 'llama_cpp': + model_path = llm_config.get('model_path') + if not model_path or not os.path.exists(model_path): + raise FileNotFoundError( + f"\nLLama.cpp model not found at: {model_path}" + "\nPlease ensure model path in llm_config.py is correct" + ) + + with OutputRedirector() as output: + llm_wrapper = LLMWrapper() + try: + test_response = llm_wrapper.generate("Test", max_tokens=10) + if not test_response: + raise ConnectionError("LLM failed to generate response") + except Exception as e: + raise ConnectionError(f"LLM test failed: {str(e)}") + + parser = UltimateLLMResponseParser() + search_engine = EnhancedSelfImprovingSearch(llm_wrapper, parser) + research_manager = ResearchManager(llm_wrapper, parser, search_engine) + + print(Fore.GREEN + "System initialized successfully." 
+ Style.RESET_ALL)
+        return llm_wrapper, parser, search_engine, research_manager
+    except Exception as e:
+        logger.error(f"Error initializing system: {str(e)}", exc_info=True)
+        print(Fore.RED + f"System initialization failed: {str(e)}" + Style.RESET_ALL)
+        return None, None, None, None
+
+def handle_research_mode(research_manager, query):
+    """Handles research mode operations"""
+    print(f"{Fore.CYAN}Initiating research mode...{Style.RESET_ALL}")
+
+    try:
+        # Start the research
+        research_manager.start_research(query)
+
+        submit_key = "CTRL+Z" if os.name == 'nt' else "CTRL+D"
+        print(f"\n{Fore.YELLOW}Research Running. Available Commands:{Style.RESET_ALL}")
+        print(f"Type command and press {submit_key}:")
+        print("'s' = Show status")
+        print("'f' = Show focus")
+        print("'q' = Quit research")
+
+        while research_manager.is_active():
+            try:
+                command = get_multiline_input().strip().lower()
+                if command == 's':
+                    print("\n" + research_manager.get_progress())
+                elif command == 'f':
+                    if research_manager.current_focus:
+                        print(f"\n{Fore.CYAN}Current Focus:{Style.RESET_ALL}")
+                        print(f"Area: {research_manager.current_focus.area}")
+                        print(f"Priority: {research_manager.current_focus.priority}")
+                        print(f"Reasoning: {research_manager.current_focus.reasoning}")
+                    else:
+                        print(f"\n{Fore.YELLOW}No current focus area{Style.RESET_ALL}")
+                elif command == 'q':
+                    break
+            except KeyboardInterrupt:
+                break
+
+        # Get final summary first
+        summary = research_manager.terminate_research()
+
+        # Ensure research UI is fully cleaned up
+        research_manager._cleanup_research_ui()
+
+        # Now in main terminal, show summary
+        print(f"\n{Fore.GREEN}Research Summary:{Style.RESET_ALL}")
+        print(summary)
+
+        # Only NOW start conversation mode if we have a valid summary
+        if research_manager.research_complete and research_manager.research_summary:
+            time.sleep(0.5)  # Small delay to ensure clean transition
+            research_manager.start_conversation_mode()
+
+        return
+
+    except KeyboardInterrupt:
+        print(f"\n{Fore.YELLOW}Research interrupted.{Style.RESET_ALL}")
+        research_manager.terminate_research()
+    except Exception as e:
+        print(f"\n{Fore.RED}Research error: {str(e)}{Style.RESET_ALL}")
+        research_manager.terminate_research()
+
+def handle_search_mode(search_engine, query):
+    """Handles single-shot web search mode (the '/' command).
+
+    Minimal handler: runs one self-improving search cycle and prints the
+    resulting answer. main() calls this, but no definition was present.
+    """
+    print(f"{Fore.CYAN}Initiating web search...{Style.RESET_ALL}")
+    try:
+        answer = search_engine.search_and_improve(query)
+        print(f"\n{Fore.GREEN}Answer:{Style.RESET_ALL}")
+        print(answer)
+    except KeyboardInterrupt:
+        print(f"\n{Fore.YELLOW}Search interrupted.{Style.RESET_ALL}")
+    except Exception as e:
+        logger.error(f"Search error: {str(e)}", exc_info=True)
+        print(f"{Fore.RED}Search error: {str(e)}{Style.RESET_ALL}")
+
+def main():
+    print_header()
+    try:
+        llm, parser, search_engine, research_manager = initialize_system()
+        if not all([llm, parser, search_engine, research_manager]):
+            return
+
+        while True:
+            try:
+                # Get input with improved CTRL+D handling
+                user_input = get_multiline_input()
+
+                # Handle immediate CTRL+D (empty input)
+                if user_input == "":
+                    user_input = "@quit"  # Convert empty CTRL+D to quit command
+
+                user_input = user_input.strip()
+
+                # Check for special quit markers
+                if user_input in ["@quit", "quit", "q"]:
+                    print(Fore.YELLOW + "\nGoodbye!" 
+ Style.RESET_ALL) + break + + if not user_input: + continue + + if user_input.lower() == 'help': + print_header() + continue + + if user_input.startswith('/'): + search_query = user_input[1:].strip() + handle_search_mode(search_engine, search_query) + + elif user_input.startswith('@'): + research_query = user_input[1:].strip() + handle_research_mode(research_manager, research_query) + + else: + print(f"{Fore.RED}Please start with '/' for search or '@' for research.{Style.RESET_ALL}") + + except KeyboardInterrupt: + print(f"\n{Fore.YELLOW}Exiting program...{Style.RESET_ALL}") + break + + except Exception as e: + logger.error(f"Error in main loop: {str(e)}") + print(f"{Fore.RED}An error occurred: {str(e)}{Style.RESET_ALL}") + continue + + except KeyboardInterrupt: + print(f"\n{Fore.YELLOW}Program terminated by user.{Style.RESET_ALL}") + + except Exception as e: + logger.critical(f"Critical error: {str(e)}") + print(f"{Fore.RED}Critical error: {str(e)}{Style.RESET_ALL}") + + finally: + # Ensure proper cleanup on exit + try: + if 'research_manager' in locals() and research_manager: + if hasattr(research_manager, 'ui'): + research_manager.ui.cleanup() + curses.endwin() + except: + pass + os._exit(0) + +if __name__ == "__main__": + main() diff --git a/llm_config.py b/llm_config.py new file mode 100644 index 0000000..b5f99c5 --- /dev/null +++ b/llm_config.py @@ -0,0 +1,40 @@ +# llm_config.py + +LLM_TYPE = "ollama" # Options: 'llama_cpp', 'ollama' + +# LLM settings for llama_cpp +MODEL_PATH = "/home/james/llama.cpp/models/gemma-2-9b-it-Q6_K.gguf" # Replace with your llama.cpp models filepath + +LLM_CONFIG_LLAMA_CPP = { + "llm_type": "llama_cpp", + "model_path": MODEL_PATH, + "n_ctx": 20000, # context size + "n_gpu_layers": 0, # number of layers to offload to GPU (-1 for all, 0 for none) + "n_threads": 8, # number of threads to use + "temperature": 0.7, # temperature for sampling + "top_p": 0.9, # top p for sampling + "top_k": 40, # top k for sampling + "repeat_penalty": 1.1, # repeat penalty + "max_tokens": 1024, # max tokens to generate + "stop": ["User:", "\n\n"] # stop sequences +} + +# LLM settings for Ollama +LLM_CONFIG_OLLAMA = { + "llm_type": "ollama", + "base_url": "http://localhost:11434", # default Ollama server URL + "model_name": "custom-phi3-32k-Q4_K_M", # Replace with your Ollama model name + "temperature": 0.7, + "top_p": 0.9, + "n_ctx": 55000, + "context_length": 55000, + "stop": ["User:", "\n\n"] +} + +def get_llm_config(): + if LLM_TYPE == "llama_cpp": + return LLM_CONFIG_LLAMA_CPP + elif LLM_TYPE == "ollama": + return LLM_CONFIG_OLLAMA + else: + raise ValueError(f"Invalid LLM_TYPE: {LLM_TYPE}") diff --git a/llm_response_parser.py b/llm_response_parser.py new file mode 100644 index 0000000..e65266e --- /dev/null +++ b/llm_response_parser.py @@ -0,0 +1,240 @@ +import re +from typing import Dict, List, Union, Optional +import logging +import json +from strategic_analysis_parser import StrategicAnalysisParser, AnalysisResult, ResearchFocus + +# Set up logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class UltimateLLMResponseParser: + def __init__(self): + self.decision_keywords = { + 'refine': ['refine', 'need more info', 'insufficient', 'unclear', 'more research', 'additional search'], + 'answer': ['answer', 'sufficient', 'enough info', 'can respond', 'adequate', 'comprehensive'] + } + self.section_identifiers = [ + ('decision', r'(?i)decision\s*:'), + ('reasoning', r'(?i)reasoning\s*:'), 
+ ('selected_results', r'(?i)selected results\s*:'), + ('response', r'(?i)response\s*:') + ] + # Initialize strategic analysis parser + self.strategic_parser = StrategicAnalysisParser() + + def parse_llm_response(self, response: str, mode: str = 'search') -> Dict[str, Union[str, List[int], AnalysisResult]]: + """ + Parse LLM response based on mode + + Args: + response (str): The LLM's response text + mode (str): 'search' for web search, 'research' for strategic analysis + + Returns: + Dict containing parsed response + """ + logger.info(f"Starting to parse LLM response in {mode} mode") + + if mode == 'research': + return self._parse_research_response(response) + + # Original search mode parsing + result = { + 'decision': None, + 'reasoning': None, + 'selected_results': [], + 'response': None + } + + parsing_strategies = [ + self._parse_structured_response, + self._parse_json_response, + self._parse_unstructured_response, + self._parse_implicit_response + ] + + for strategy in parsing_strategies: + try: + parsed_result = strategy(response) + if self._is_valid_result(parsed_result): + result.update(parsed_result) + logger.info(f"Successfully parsed using strategy: {strategy.__name__}") + break + except Exception as e: + logger.warning(f"Error in parsing strategy {strategy.__name__}: {str(e)}") + + if not self._is_valid_result(result): + logger.warning("All parsing strategies failed. Using fallback parsing.") + result = self._fallback_parsing(response) + + result = self._post_process_result(result) + + logger.info("Finished parsing LLM response") + return result + + def _parse_research_response(self, response: str) -> Dict[str, Union[str, AnalysisResult]]: + """Handle research mode specific parsing""" + try: + analysis_result = self.strategic_parser.parse_analysis(response) + if analysis_result: + return { + 'mode': 'research', + 'analysis_result': analysis_result, + 'error': None + } + else: + logger.error("Failed to parse strategic analysis") + return { + 'mode': 'research', + 'analysis_result': None, + 'error': 'Failed to parse strategic analysis' + } + except Exception as e: + logger.error(f"Error in research response parsing: {str(e)}") + return { + 'mode': 'research', + 'analysis_result': None, + 'error': str(e) + } + + def parse_search_query(self, query_response: str) -> Dict[str, str]: + """Parse search query formulation response""" + try: + lines = query_response.strip().split('\n') + result = { + 'query': '', + 'time_range': 'none' + } + + for line in lines: + if ':' in line: + key, value = line.split(':', 1) + key = key.strip().lower() + value = value.strip() + + if 'query' in key: + result['query'] = self._clean_query(value) + elif 'time' in key or 'range' in key: + result['time_range'] = self._validate_time_range(value) + + return result + except Exception as e: + logger.error(f"Error parsing search query: {str(e)}") + return {'query': '', 'time_range': 'none'} + + def _parse_structured_response(self, response: str) -> Dict[str, Union[str, List[int]]]: + result = {} + for key, pattern in self.section_identifiers: + match = re.search(f'{pattern}(.*?)(?={"|".join([p for k, p in self.section_identifiers if k != key])}|$)', + response, re.IGNORECASE | re.DOTALL) + if match: + result[key] = match.group(1).strip() + + if 'selected_results' in result: + result['selected_results'] = self._extract_numbers(result['selected_results']) + + return result + + def _parse_json_response(self, response: str) -> Dict[str, Union[str, List[int]]]: + try: + json_match = re.search(r'\{.*\}', response, 
re.DOTALL) + if json_match: + json_str = json_match.group(0) + parsed_json = json.loads(json_str) + return {k: v for k, v in parsed_json.items() + if k in ['decision', 'reasoning', 'selected_results', 'response']} + except json.JSONDecodeError: + pass + return {} + + def _parse_unstructured_response(self, response: str) -> Dict[str, Union[str, List[int]]]: + result = {} + lines = response.split('\n') + current_section = None + + for line in lines: + section_match = re.match(r'(.+?)[:.-](.+)', line) + if section_match: + key = self._match_section_to_key(section_match.group(1)) + if key: + current_section = key + result[key] = section_match.group(2).strip() + elif current_section: + result[current_section] += ' ' + line.strip() + + if 'selected_results' in result: + result['selected_results'] = self._extract_numbers(result['selected_results']) + + return result + + def _parse_implicit_response(self, response: str) -> Dict[str, Union[str, List[int]]]: + result = {} + + decision = self._infer_decision(response) + if decision: + result['decision'] = decision + + numbers = self._extract_numbers(response) + if numbers: + result['selected_results'] = numbers + + if not result: + result['response'] = response.strip() + + return result + + def _fallback_parsing(self, response: str) -> Dict[str, Union[str, List[int]]]: + return { + 'decision': self._infer_decision(response), + 'reasoning': None, + 'selected_results': self._extract_numbers(response), + 'response': response.strip() + } + + def _post_process_result(self, result: Dict[str, Union[str, List[int]]]) -> Dict[str, Union[str, List[int]]]: + if result['decision'] not in ['refine', 'answer']: + result['decision'] = self._infer_decision(str(result)) + + if not isinstance(result['selected_results'], list): + result['selected_results'] = self._extract_numbers(str(result['selected_results'])) + + result['selected_results'] = result['selected_results'][:2] + + if not result['reasoning']: + result['reasoning'] = f"Based on the {'presence' if result['selected_results'] else 'absence'} of selected results and the overall content." 
+ + if not result['response']: + result['response'] = result.get('reasoning', 'No clear response found.') + + return result + + def _match_section_to_key(self, section: str) -> Optional[str]: + for key, pattern in self.section_identifiers: + if re.search(pattern, section, re.IGNORECASE): + return key + return None + + def _extract_numbers(self, text: str) -> List[int]: + return [int(num) for num in re.findall(r'\b(?:10|[1-9])\b', text)] + + def _infer_decision(self, text: str) -> str: + text = text.lower() + refine_score = sum(text.count(keyword) for keyword in self.decision_keywords['refine']) + answer_score = sum(text.count(keyword) for keyword in self.decision_keywords['answer']) + return 'refine' if refine_score > answer_score else 'answer' + + def _is_valid_result(self, result: Dict[str, Union[str, List[int]]]) -> bool: + return bool(result.get('decision') or result.get('response') or result.get('selected_results')) + + def _clean_query(self, query: str) -> str: + """Clean and validate search query""" + query = re.sub(r'["\'\[\]]', '', query) + query = re.sub(r'\s+', ' ', query) + return query.strip()[:100] + + def _validate_time_range(self, time_range: str) -> str: + """Validate time range value""" + valid_ranges = ['d', 'w', 'm', 'y', 'none'] + time_range = time_range.lower() + return time_range if time_range in valid_ranges else 'none' diff --git a/llm_wrapper.py b/llm_wrapper.py new file mode 100644 index 0000000..f8b97c0 --- /dev/null +++ b/llm_wrapper.py @@ -0,0 +1,80 @@ +from llama_cpp import Llama +import requests +import json +from llm_config import get_llm_config + +class LLMWrapper: + def __init__(self): + self.llm_config = get_llm_config() + self.llm_type = self.llm_config.get('llm_type', 'llama_cpp') + if self.llm_type == 'llama_cpp': + self.llm = self._initialize_llama_cpp() + elif self.llm_type == 'ollama': + self.base_url = self.llm_config.get('base_url', 'http://localhost:11434') + self.model_name = self.llm_config.get('model_name', 'your_model_name') + else: + raise ValueError(f"Unsupported LLM type: {self.llm_type}") + + def _initialize_llama_cpp(self): + return Llama( + model_path=self.llm_config.get('model_path'), + n_ctx=self.llm_config.get('n_ctx', 55000), + n_gpu_layers=self.llm_config.get('n_gpu_layers', 0), + n_threads=self.llm_config.get('n_threads', 8), + verbose=False + ) + + def generate(self, prompt, **kwargs): + if self.llm_type == 'llama_cpp': + llama_kwargs = self._prepare_llama_kwargs(kwargs) + response = self.llm(prompt, **llama_kwargs) + return response['choices'][0]['text'].strip() + elif self.llm_type == 'ollama': + return self._ollama_generate(prompt, **kwargs) + else: + raise ValueError(f"Unsupported LLM type: {self.llm_type}") + + def _ollama_generate(self, prompt, **kwargs): + url = f"{self.base_url}/api/generate" + data = { + 'model': self.model_name, + 'prompt': prompt, + 'options': { + 'temperature': kwargs.get('temperature', self.llm_config.get('temperature', 0.7)), + 'top_p': kwargs.get('top_p', self.llm_config.get('top_p', 0.9)), + 'stop': kwargs.get('stop', self.llm_config.get('stop', [])), + 'num_predict': kwargs.get('max_tokens', self.llm_config.get('max_tokens', 55000)), + 'context_length': self.llm_config.get('n_ctx', 55000) + } + } + response = requests.post(url, json=data, stream=True) + if response.status_code != 200: + raise Exception(f"Ollama API request failed with status {response.status_code}: {response.text}") + text = ''.join(json.loads(line)['response'] for line in response.iter_lines() if line) + return text.strip() + + 
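+    # Note on the Ollama endpoint used above: /api/generate streams
+    # newline-delimited JSON by default, one object per generated chunk,
+    # each carrying a partial 'response' field (hence the iter_lines()
+    # join above). Setting 'stream': False in the request body returns a
+    # single JSON object with the complete response instead, e.g.:
+    #
+    #   resp = requests.post(f"{self.base_url}/api/generate",
+    #                        json={'model': self.model_name,
+    #                              'prompt': prompt,
+    #                              'stream': False})
+    #   text = resp.json()['response']
+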
def _cleanup(self):
+        """Force terminate any running LLM processes"""
+        if self.llm_type == 'ollama':
+            try:
+                # Force terminate Ollama process
+                requests.post(f"{self.base_url}/api/terminate")
+            except:
+                pass
+
+            try:
+                # Also try to terminate via subprocess if needed
+                import subprocess
+                subprocess.run(['pkill', '-f', 'ollama'], capture_output=True)
+            except:
+                pass
+
+    def _prepare_llama_kwargs(self, kwargs):
+        llama_kwargs = {
+            'max_tokens': kwargs.get('max_tokens', self.llm_config.get('max_tokens', 55000)),
+            'temperature': kwargs.get('temperature', self.llm_config.get('temperature', 0.7)),
+            'top_p': kwargs.get('top_p', self.llm_config.get('top_p', 0.9)),
+            'stop': kwargs.get('stop', self.llm_config.get('stop', [])),
+            'echo': False,
+        }
+        return llama_kwargs

diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..491c599
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,11 @@
+llama-cpp-python
+duckduckgo-search
+colorama
+requests
+beautifulsoup4
+trafilatura
+readchar
+keyboard
+windows-curses; sys_platform == 'win32'
+tqdm
+urllib3

diff --git a/research_manager.py b/research_manager.py
new file mode 100644
index 0000000..e95f873
--- /dev/null
+++ b/research_manager.py
@@ -0,0 +1,1481 @@
+import os
+import sys
+import threading
+import time
+import re
+import json
+import logging
+import curses
+import signal
+from typing import List, Dict, Set, Optional, Tuple, Union
+from dataclasses import dataclass
+from queue import Queue
+from datetime import datetime
+from io import StringIO
+from colorama import init, Fore, Style
+import select
+import termios
+import tty
+from threading import Event
+from urllib.parse import urlparse
+from pathlib import Path
+
+# Initialize colorama for cross-platform color support
+if os.name == 'nt':  # Windows-specific initialization
+    init(convert=True, strip=False, wrap=True)
+else:
+    init()
+
+# Set up logging
+log_directory = 'logs'
+if not os.path.exists(log_directory):
+    os.makedirs(log_directory)
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+log_file = os.path.join(log_directory, 'research_llm.log')
+file_handler = logging.FileHandler(log_file)
+formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
+file_handler.setFormatter(formatter)
+logger.handlers = []
+logger.addHandler(file_handler)
+logger.propagate = False
+
+# Suppress other loggers
+for name in logging.root.manager.loggerDict:
+    if name != __name__:
+        logging.getLogger(name).disabled = True
+
+@dataclass
+class ResearchFocus:
+    """Represents a specific area of research focus"""
+    area: str
+    priority: int
+    source_query: str = ""
+    timestamp: str = ""
+    search_queries: List[str] = None
+
+    def __post_init__(self):
+        if not self.timestamp:
+            self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        if self.search_queries is None:
+            self.search_queries = []
+
+@dataclass
+class AnalysisResult:
+    """Contains the complete analysis result"""
+    original_question: str
+    focus_areas: List[ResearchFocus]
+    raw_response: str
+    timestamp: str = ""
+
+    def __post_init__(self):
+        if not self.timestamp:
+            self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+class StrategicAnalysisParser:
+    def __init__(self, llm=None):
+        self.llm = llm
+        self.logger = logging.getLogger(__name__)
+        # Simplify patterns to match exactly what we expect
+        self.patterns = {
+            'priority': [
+                r"Priority:\s*(\d+)",  # Match exactly what's in our prompt
+            ]
+        }
+
+    def strategic_analysis(self, original_query: str) -> 
Optional[AnalysisResult]: + """Generate and process research areas with retries until success""" + max_retries = 3 + try: + self.logger.info("Starting strategic analysis...") + prompt = f""" +You must select exactly 5 areas to investigate in order to explore and gather information to answer the research question: +"{original_query}" + +You MUST provide exactly 5 areas numbered 1-5. Each must have a priority, YOU MUST ensure that you only assign one priority per area. +Assign priority based on the likelihood of a focus area being investigated to provide information that directly will allow you to respond to "{original_query}" with 5 being most likely and 1 being least. +Follow this EXACT format without any deviations or additional text: + +1. [First research topic] +Priority: [number 1-5] + +2. [Second research topic] +Priority: [number 1-5] + +3. [Third research topic] +Priority: [number 1-5] + +4. [Fourth research topic] +Priority: [number 1-5] + +5. [Fifth research topic] +Priority: [number 1-5] +""" + for attempt in range(max_retries): + response = self.llm.generate(prompt, max_tokens=1000) + focus_areas = self._extract_research_areas(response) + + if focus_areas: # If we got any valid areas + # Sort by priority (highest first) + focus_areas.sort(key=lambda x: x.priority, reverse=True) + + return AnalysisResult( + original_question=original_query, + focus_areas=focus_areas, + raw_response=response, + timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ) + else: + self.logger.warning(f"Attempt {attempt + 1}: No valid areas generated, retrying...") + print(f"\nRetrying research area generation (Attempt {attempt + 1}/{max_retries})...") + + # If all retries failed, try one final time with a stronger prompt + prompt += "\n\nIMPORTANT: You MUST provide exactly 5 research areas with priorities. This is crucial." + response = self.llm.generate(prompt, max_tokens=1000) + focus_areas = self._extract_research_areas(response) + + if focus_areas: + focus_areas.sort(key=lambda x: x.priority, reverse=True) + return AnalysisResult( + original_question=original_query, + focus_areas=focus_areas, + raw_response=response, + timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ) + + self.logger.error("Failed to generate any valid research areas after all attempts") + return None + + except Exception as e: + self.logger.error(f"Error in strategic analysis: {str(e)}") + return None + + def _extract_research_areas(self, text: str) -> List[ResearchFocus]: + """Extract research areas with enhanced parsing to handle priorities in various formats.""" + areas = [] + lines = text.strip().split('\n') + + current_area = None + current_priority = None + + for i in range(len(lines)): + line = lines[i].strip() + if not line: + continue + + # Check for numbered items (e.g., '1. 
Area Name') + number_match = re.match(r'^(\d+)\.\s*(.*)', line) + if number_match: + # If we have a previous area, add it to our list + if current_area is not None: + areas.append(ResearchFocus( + area=current_area.strip(' -:'), + priority=current_priority or 3, + )) + # Start a new area + area_line = number_match.group(2) + + # Search for 'priority' followed by a number, anywhere in the area_line + priority_inline_match = re.search( + r'(?i)\bpriority\b\s*(?:[:=]?\s*)?(\d+)', area_line) + if priority_inline_match: + # Extract and set the priority + try: + current_priority = int(priority_inline_match.group(1)) + current_priority = max(1, min(5, current_priority)) + except ValueError: + current_priority = 3 # Default priority if parsing fails + # Remove the 'priority' portion from area_line + area_line = area_line[:priority_inline_match.start()] + area_line[priority_inline_match.end():] + area_line = area_line.strip(' -:') + else: + current_priority = None # Priority might be on the next line + + current_area = area_line.strip() + + elif re.match(r'(?i)^priority\s*(?:[:=]?\s*)?(\d+)', line): + # Extract priority from the line following the area + try: + priority_match = re.match(r'(?i)^priority\s*(?:[:=]?\s*)?(\d+)', line) + current_priority = int(priority_match.group(1)) + current_priority = max(1, min(5, current_priority)) + except (ValueError, IndexError): + current_priority = 3 # Default priority if parsing fails + + # Check if this is the last line or the next line is a new area + next_line_is_new_area = (i + 1 < len(lines)) and re.match(r'^\d+\.', lines[i + 1].strip()) + if next_line_is_new_area or i + 1 == len(lines): + if current_area is not None: + # Append the current area and priority to the list + areas.append(ResearchFocus( + area=current_area.strip(' -:'), + priority=current_priority or 3, + )) + current_area = None + current_priority = None + + return areas + + def _clean_text(self, text: str) -> str: + """Clean and normalize text""" + text = re.sub(r'\s+', ' ', text) + text = re.sub(r'(\d+\))', r'\1.', text) + text = re.sub(r'(?i)priority:', 'P:', text) + return text.strip() + + def _add_area(self, areas: List[ResearchFocus], area: str, priority: Optional[int]): + """Add area with basic validation""" + if not area or len(area.split()) < 3: # Basic validation + return + + areas.append(ResearchFocus( + area=area, + priority=priority or 3, + timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + search_queries=[] + )) + + def _normalize_focus_areas(self, areas: List[ResearchFocus]) -> List[ResearchFocus]: + """Normalize and prepare final list of areas""" + if not areas: + return [] + + # Sort by priority + areas.sort(key=lambda x: x.priority, reverse=True) + + # Ensure priorities are properly spread + for i, area in enumerate(areas): + area.priority = max(1, min(5, area.priority)) + + return areas[:5] + + def format_analysis_result(self, result: AnalysisResult) -> str: + """Format the results for display""" + if not result: + return "No valid analysis result generated." + + formatted = [ + f"\nResearch Areas for: {result.original_question}\n" + ] + + for i, focus in enumerate(result.focus_areas, 1): + formatted.extend([ + f"\n{i}. 
{focus.area}", + f" Priority: {focus.priority}" + ]) + + return "\n".join(formatted) + +class OutputRedirector: + """Redirects stdout and stderr to a string buffer""" + def __init__(self, stream=None): + self.stream = stream or StringIO() + self.original_stdout = sys.stdout + self.original_stderr = sys.stderr + + def __enter__(self): + sys.stdout = self.stream + sys.stderr = self.stream + return self.stream + + def __exit__(self, exc_type, exc_val, exc_tb): + sys.stdout = self.original_stdout + sys.stderr = self.original_stderr + +class TerminalUI: + """Manages terminal display with fixed input area at bottom""" + def __init__(self): + self.stdscr = None + self.input_win = None + self.output_win = None + self.status_win = None + self.max_y = 0 + self.max_x = 0 + self.input_buffer = "" + self.is_setup = False + self.old_terminal_settings = None + self.should_terminate = Event() + self.shutdown_event = Event() + self.research_thread = None + self.last_display_height = 0 # Track display height for corruption fix + + + def setup(self): + """Initialize the terminal UI""" + if self.is_setup: + return + + # Save terminal settings + if not os.name == 'nt': # Unix-like systems + self.old_terminal_settings = termios.tcgetattr(sys.stdin.fileno()) + + self.stdscr = curses.initscr() + curses.start_color() + curses.noecho() + curses.cbreak() + self.stdscr.keypad(True) + + # Enable only scroll wheel events, not all mouse events + # curses.mousemask(curses.BUTTON4_PRESSED | curses.BUTTON5_PRESSED) + + # Remove this line that was causing the spam + # print('\033[?1003h') # We don't want mouse movement events + + # Get terminal dimensions + self.max_y, self.max_x = self.stdscr.getmaxyx() + + # Create windows + self.output_win = curses.newwin(self.max_y - 4, self.max_x, 0, 0) + self.status_win = curses.newwin(1, self.max_x, self.max_y - 4, 0) + self.input_win = curses.newwin(3, self.max_x, self.max_y - 3, 0) + + # Setup colors + curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) + curses.init_pair(2, curses.COLOR_CYAN, curses.COLOR_BLACK) + curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK) + + # Enable scrolling + self.output_win.scrollok(True) + self.output_win.idlok(True) + self.input_win.scrollok(True) + + self.is_setup = True + self._refresh_input_prompt() + + def cleanup(self): + """Public cleanup method with enhanced terminal restoration""" + if not self.is_setup: + return + try: + # Ensure all windows are properly closed + for win in [self.input_win, self.output_win, self.status_win]: + if win: + win.clear() + win.refresh() + + # Restore terminal state + if self.stdscr: + self.stdscr.keypad(False) + curses.nocbreak() + curses.echo() + curses.endwin() + + # Restore original terminal settings + if self.old_terminal_settings and not os.name == 'nt': + termios.tcsetattr( + sys.stdin.fileno(), + termios.TCSADRAIN, + self.old_terminal_settings + ) + except Exception as e: + logger.error(f"Error during terminal cleanup: {str(e)}") + finally: + self.is_setup = False + self.stdscr = None + self.input_win = None + self.output_win = None + self.status_win = None + + def _cleanup(self): + """Enhanced resource cleanup with better process handling""" + self.should_terminate.set() + + # Handle research thread with improved termination + if self.research_thread and self.research_thread.is_alive(): + try: + self.research_thread.join(timeout=1.0) + if self.research_thread.is_alive(): + import ctypes + ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(self.research_thread.ident), + 
ctypes.py_object(SystemExit)) + time.sleep(0.1) # Give thread time to exit + if self.research_thread.is_alive(): # Double-check + ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(self.research_thread.ident), + 0) # Reset exception + except Exception as e: + logger.error(f"Error terminating research thread: {str(e)}") + + # Clean up LLM with improved error handling + if hasattr(self, 'llm') and hasattr(self.llm, '_cleanup'): + try: + self.llm.cleanup() + except Exception as e: + logger.error(f"Error cleaning up LLM: {str(e)}") + + # Ensure terminal is restored + try: + curses.endwin() + except: + pass + + # Final cleanup of UI + self.cleanup() + + def _refresh_input_prompt(self, prompt="Enter command: "): + """Refresh the fixed input prompt at bottom with display fix""" + if not self.is_setup: + return + + try: + # Clear the entire input window first + self.input_win.clear() + + # Calculate proper cursor position + cursor_y = 0 + cursor_x = len(prompt) + len(self.input_buffer) + + # Add the prompt and buffer + self.input_win.addstr(0, 0, f"{prompt}{self.input_buffer}", curses.color_pair(1)) + + # Position cursor correctly + try: + self.input_win.move(cursor_y, cursor_x) + except curses.error: + pass # Ignore if cursor would be off-screen + + self.input_win.refresh() + except curses.error: + pass + + def update_output(self, text: str): + """Update output window with display corruption fix""" + if not self.is_setup: + return + + try: + # Clean ANSI escape codes + clean_text = re.sub(r'\x1b\[[0-9;]*[mK]', '', text) + + # Store current position + current_y, _ = self.output_win.getyx() + + # Clear any potential corruption + if current_y > self.last_display_height: + self.output_win.clear() + + self.output_win.addstr(clean_text + "\n", curses.color_pair(2)) + new_y, _ = self.output_win.getyx() + self.last_display_height = new_y + + self.output_win.refresh() + self._refresh_input_prompt() + except curses.error: + pass + + def update_status(self, text: str): + """Update the status line above input area""" + if not self.is_setup: + return + + try: + self.status_win.clear() + self.status_win.addstr(0, 0, text, curses.color_pair(3)) + self.status_win.refresh() + self._refresh_input_prompt() # Ensure prompt is refreshed after status update + except curses.error: + pass + + def get_input(self, prompt: Optional[str] = None) -> Optional[str]: + """Enhanced input handling with mouse scroll support""" + try: + if prompt: + self.update_status(prompt) + if not self.is_setup: + self.setup() + self.input_buffer = "" + self._refresh_input_prompt() + + while True: + if self.should_terminate.is_set(): + return None + + try: + ch = self.input_win.getch() + + if ch == curses.KEY_MOUSE: + try: + mouse_event = curses.getmouse() + # Ignore mouse events entirely for now + continue + except curses.error: + continue + + if ch == 4: # Ctrl+D + result = self.input_buffer.strip() + self.input_buffer = "" + if not result: + self.cleanup() + return "@quit" + return result + + elif ch == 3: # Ctrl+C + self.should_terminate.set() + self.cleanup() + return "@quit" + + elif ch == ord('\n'): # Enter + result = self.input_buffer.strip() + if result: + self.input_buffer = "" + return result + continue + + elif ch == curses.KEY_BACKSPACE or ch == 127: # Backspace + if self.input_buffer: + self.input_buffer = self.input_buffer[:-1] + self._refresh_input_prompt() + + elif 32 <= ch <= 126: # Printable characters + self.input_buffer += chr(ch) + self._refresh_input_prompt() + + except KeyboardInterrupt: + 
self.should_terminate.set() + self.cleanup() + return "@quit" + except curses.error: + self._refresh_input_prompt() + + except Exception as e: + logger.error(f"Error in get_input: {str(e)}") + self.should_terminate.set() + self.cleanup() + return "@quit" + + def force_exit(self): + """Force immediate exit with enhanced cleanup""" + try: + self.should_terminate.set() + self.shutdown_event.set() + self._cleanup() # Call private cleanup first + self.cleanup() # Then public cleanup + curses.endwin() # Final attempt to restore terminal + except: + pass + finally: + os._exit(0) # Ensure exit + +class NonBlockingInput: + """Handles non-blocking keyboard input for Unix-like systems""" + def __init__(self): + self.old_settings = None + + def __enter__(self): + if os.name == 'nt': # Windows + return self + self.old_settings = termios.tcgetattr(sys.stdin) + tty.setcbreak(sys.stdin.fileno()) + return self + + def __exit__(self, type, value, traceback): + if os.name != 'nt': # Unix-like + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings) + + def check_input(self, timeout=0.1): + """Check for input without blocking, cross-platform""" + if os.name == 'nt': # Windows + import msvcrt + if msvcrt.kbhit(): + return msvcrt.getch().decode('utf-8') + return None + else: # Unix-like + ready_to_read, _, _ = select.select([sys.stdin], [], [], timeout) + if ready_to_read: + return sys.stdin.read(1) + return None + +class ResearchManager: + """Manages the research process including analysis, search, and documentation""" + def __init__(self, llm_wrapper, parser, search_engine, max_searches_per_cycle: int = 5): + self.llm = llm_wrapper + self.parser = parser + self.search_engine = search_engine + self.max_searches = max_searches_per_cycle + self.should_terminate = threading.Event() + self.shutdown_event = Event() + self.research_started = threading.Event() + self.research_thread = None + self.thinking = False + self.stop_words = { + 'the', 'be', 'to', 'of', 'and', 'a', 'in', 'that', 'have', 'i', + 'it', 'for', 'not', 'on', 'with', 'he', 'as', 'you', 'do', 'at' + } + + # State tracking + self.searched_urls: Set[str] = set() + self.current_focus: Optional[ResearchFocus] = None + self.original_query: str = "" + self.focus_areas: List[ResearchFocus] = [] + self.is_running = False + + # New conversation mode attributes + self.research_complete = False + self.research_summary = "" + self.conversation_active = False + self.research_content = "" + + # Initialize document paths + self.document_path = None + self.session_files = [] + + # Initialize UI and parser + self.ui = TerminalUI() + self.strategic_parser = StrategicAnalysisParser(llm=self.llm) + + # Initialize new flags for pausing and assessment + self.research_paused = False + self.awaiting_user_decision = False + + # Setup signal handlers + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + + def _signal_handler(self, signum, frame): + """Handle interrupt signals""" + self.shutdown_event.set() + self.should_terminate.set() + self._cleanup() + + def print_thinking(self): + """Display thinking indicator to user""" + self.ui.update_output("šŸ§  Thinking...") + + @staticmethod + def get_initial_input() -> str: + """Get the initial research query from user""" + print(f"{Fore.GREEN}šŸ“ Enter your message (Press CTRL+D to submit):{Style.RESET_ALL}") + lines = [] + try: + while True: + line = input() + if line: # Only add non-empty lines + lines.append(line) + if not line: # Empty line (just Enter 
pressed)
+                    break
+        except EOFError:  # Ctrl+D pressed
+            pass
+        except KeyboardInterrupt:  # Ctrl+C pressed
+            print("\nOperation cancelled")
+            sys.exit(0)
+
+        return " ".join(lines).strip()
+
+    def formulate_search_queries(self, focus_area: ResearchFocus) -> List[str]:
+        """Generate search queries for a focus area"""
+        try:
+            self.print_thinking()
+
+            prompt = f"""
+In order to research this query/topic:
+
+Context: {self.original_query}
+
+Formulate a search query to investigate the following research focus, which is related to the original query/topic:
+
+Area: {focus_area.area}
+
+Create a search query that will yield specific search results that are directly relevant to your focus area.
+Format your response EXACTLY like this:
+
+Search query: [Your 2-5 word query]
+Time range: [d/w/m/y/none]
+
+Do not provide any additional information or explanation. The time range limits results to a period: d is within the last day, w the last week, m the last month, y the last year, and none returns results from any time. Select exactly one option, using only the corresponding letter as indicated in the response format. Use your judgement: many searches will not require a time range, while some will, depending on the research focus.
+"""
+            response_text = self.llm.generate(prompt, max_tokens=50, stop=None)
+            # parse_search_query returns a dict with 'query' and 'time_range' keys
+            parsed = self.parse_search_query(response_text)
+            query, time_range = parsed['query'], parsed['time_range']
+
+            if not query:
+                self.ui.update_output(f"{Fore.RED}Error: Empty search query. Using focus area as query...{Style.RESET_ALL}")
+                return [focus_area.area]
+
+            self.ui.update_output(f"{Fore.YELLOW}Original focus: {focus_area.area}{Style.RESET_ALL}")
+            self.ui.update_output(f"{Fore.YELLOW}Formulated query: {query}{Style.RESET_ALL}")
+            self.ui.update_output(f"{Fore.YELLOW}Time range: {time_range}{Style.RESET_ALL}")
+
+            return [query]
+
+        except Exception as e:
+            logger.error(f"Error formulating query: {str(e)}")
+            return [focus_area.area]
+
+    def parse_search_query(self, query_response: str) -> Dict[str, str]:
+        """Parse search query formulation response with improved time range detection"""
+        try:
+            lines = query_response.strip().split('\n')
+            result = {
+                'query': '',
+                'time_range': 'none'
+            }
+
+            # First try to find standard format
+            for line in lines:
+                if ':' in line:
+                    key, value = line.split(':', 1)
+                    key = key.strip().lower()
+                    value = value.strip()
+
+                    if 'query' in key:
+                        # Strip stray whitespace and surrounding quotes from the query
+                        result['query'] = value.strip().strip('"\'')
+                    elif ('time' in key or 'range' in key) and value.strip().lower() in ['d', 'w', 'm', 'y', 'none']:
+                        result['time_range'] = value.strip().lower()
+
+            # If no time range found, look for individual characters
+            if result['time_range'] == 'none':
+                # Get all text except the query itself
+                full_text = query_response.lower()
+                if result['query']:
+                    full_text = full_text.replace(result['query'].lower(), '')
+
+                # Look for isolated d, w, m, or y characters
+                time_chars = set()
+                for char in ['d', 'w', 'm', 'y']:
+                    # Check if char exists by itself (not part of another word)
+                    matches = re.finditer(r'\b' + char + r'\b', full_text)
+                    for match in matches:
+                        # Verify it's not part of a word
+                        start, end = match.span()
+                        if (start == 0 or not full_text[start-1].isalpha()) and \
+                           (end == len(full_text) or not full_text[end].isalpha()):
+                            time_chars.add(char)
+
+                # If exactly one time char found, use it
+                if len(time_chars) == 1:
+                    result['time_range'] = time_chars.pop()
+
+            return result
+        except Exception as e:
+            logger.error(f"Error parsing 
search query: {str(e)}") + return {'query': '', 'time_range': 'none'} + + def _cleanup(self): + """Enhanced cleanup to handle conversation mode""" + self.conversation_active = False + self.should_terminate.set() + + if self.research_thread and self.research_thread.is_alive(): + try: + self.research_thread.join(timeout=1.0) + if self.research_thread.is_alive(): + import ctypes + ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(self.research_thread.ident), + ctypes.py_object(SystemExit) + ) + except Exception as e: + logger.error(f"Error terminating research thread: {str(e)}") + + if hasattr(self.llm, 'cleanup'): + try: + self.llm.cleanup() + except Exception as e: + logger.error(f"Error cleaning up LLM: {str(e)}") + + if hasattr(self.ui, 'cleanup'): + self.ui.cleanup() + + def _initialize_document(self): + """Initialize research session document""" + try: + # Get all existing research session files + self.session_files = [] + for file in os.listdir(): + if file.startswith("research_session_") and file.endswith(".txt"): + try: + num = int(file.split("_")[2].split(".")[0]) + self.session_files.append(num) + except ValueError: + continue + + # Determine next session number + next_session = 1 if not self.session_files else max(self.session_files) + 1 + self.document_path = f"research_session_{next_session}.txt" + + # Initialize the new document + with open(self.document_path, 'w', encoding='utf-8') as f: + f.write(f"Research Session {next_session}\n") + f.write(f"Topic: {self.original_query}\n") + f.write(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + f.write("="*80 + "\n\n") + f.flush() + + except Exception as e: + logger.error(f"Error initializing document: {str(e)}") + self.document_path = "research_findings.txt" + with open(self.document_path, 'w', encoding='utf-8') as f: + f.write("Research Findings:\n\n") + f.flush() + + def add_to_document(self, content: str, source_url: str, focus_area: str): + """Add research findings to current session document""" + try: + with open(self.document_path, 'a', encoding='utf-8') as f: + if source_url not in self.searched_urls: + f.write(f"\n{'='*80}\n") + f.write(f"Research Focus: {focus_area}\n") + f.write(f"Source: {source_url}\n") + f.write(f"Content:\n{content}\n") + f.write(f"{'='*80}\n") + f.flush() + self.searched_urls.add(source_url) + self.ui.update_output(f"Added content from: {source_url}") + except Exception as e: + logger.error(f"Error adding to document: {str(e)}") + self.ui.update_output(f"Error saving content: {str(e)}") + + def _process_search_results(self, results: Dict[str, str], focus_area: str): + """Process and store search results""" + if not results: + return + + for url, content in results.items(): + if url not in self.searched_urls: + self.add_to_document(content, url, focus_area) + + def _research_loop(self): + """Main research loop with comprehensive functionality""" + self.is_running = True + try: + self.research_started.set() + + while not self.should_terminate.is_set() and not self.shutdown_event.is_set(): + # Check if research is paused + if self.research_paused: + time.sleep(1) + continue + + self.ui.update_output("\nAnalyzing research progress...") + + # Generate focus areas + self.ui.update_output("\nGenerating research focus areas...") + analysis_result = self.strategic_parser.strategic_analysis(self.original_query) + + if not analysis_result: + self.ui.update_output("\nFailed to generate analysis result. 
Retrying...") + continue + + focus_areas = analysis_result.focus_areas + if not focus_areas: + self.ui.update_output("\nNo valid focus areas generated. Retrying...") + continue + + self.ui.update_output(f"\nGenerated {len(focus_areas)} research areas:") + for i, focus in enumerate(focus_areas, 1): + self.ui.update_output(f"\nArea {i}: {focus.area}") + self.ui.update_output(f"Priority: {focus.priority}") + + # Process each focus area in priority order + for focus_area in focus_areas: + if self.should_terminate.is_set(): + break + + # Check if research is paused + while self.research_paused and not self.should_terminate.is_set(): + time.sleep(1) + + if self.should_terminate.is_set(): + break + + self.current_focus = focus_area + self.ui.update_output(f"\nInvestigating: {focus_area.area}") + + queries = self.formulate_search_queries(focus_area) + if not queries: + continue + + for query in queries: + if self.should_terminate.is_set(): + break + + # Check if research is paused + while self.research_paused and not self.should_terminate.is_set(): + time.sleep(1) + + if self.should_terminate.is_set(): + break + + try: + self.ui.update_output(f"\nSearching: {query}") + results = self.search_engine.perform_search(query, time_range='none') + + if results: + # self.search_engine.display_search_results(results) + selected_urls = self.search_engine.select_relevant_pages(results, query) + + if selected_urls: + self.ui.update_output("\nāš™ļø Scraping selected pages...") + scraped_content = self.search_engine.scrape_content(selected_urls) + if scraped_content: + for url, content in scraped_content.items(): + if url not in self.searched_urls: + self.add_to_document(content, url, focus_area.area) + + except Exception as e: + logger.error(f"Error in search: {str(e)}") + self.ui.update_output(f"Error during search: {str(e)}") + + if self.check_document_size(): + self.ui.update_output("\nDocument size limit reached. Finalizing research.") + return + + # After processing all areas, cycle back to generate new ones + self.ui.update_output("\nAll current focus areas investigated. 
Generating new areas...") + + except Exception as e: + logger.error(f"Error in research loop: {str(e)}") + self.ui.update_output(f"Error in research process: {str(e)}") + finally: + self.is_running = False + + def start_research(self, topic: str): + """Start research with new session document""" + try: + self.ui.setup() + self.original_query = topic + self._initialize_document() + + self.ui.update_output(f"Starting research on: {topic}") + self.ui.update_output(f"Session document: {self.document_path}") + self.ui.update_output("\nCommands available during research:") + self.ui.update_output("'s' = Show status") + self.ui.update_output("'f' = Show current focus") + self.ui.update_output("'p' = Pause and assess the research progress") # New command + self.ui.update_output("'q' = Quit research\n") + + # Reset events + self.should_terminate.clear() + self.research_started.clear() + self.research_paused = False # Ensure research is not paused at the start + self.awaiting_user_decision = False + + # Start research thread + self.research_thread = threading.Thread(target=self._research_loop, daemon=True) + self.research_thread.start() + + # Wait for research to actually start + if not self.research_started.wait(timeout=10): + self.ui.update_output("Error: Research failed to start within timeout period") + self.should_terminate.set() + return + + while not self.should_terminate.is_set(): + cmd = self.ui.get_input("Enter command: ") + if cmd is None or self.shutdown_event.is_set(): + if self.should_terminate.is_set() and not self.research_complete: + self.ui.update_output("\nGenerating research summary... please wait...") + summary = self.terminate_research() + self.ui.update_output("\nFinal Research Summary:") + self.ui.update_output(summary) + break + if cmd: + self._handle_command(cmd) + + except Exception as e: + logger.error(f"Error in research process: {str(e)}") + finally: + self._cleanup() + + def check_document_size(self) -> bool: + """Check if document size is approaching context limit""" + try: + with open(self.document_path, 'r', encoding='utf-8') as f: + content = f.read() + estimated_tokens = len(content.split()) * 1.3 + max_tokens = self.llm.llm_config.get('n_ctx', 2048) + current_ratio = estimated_tokens / max_tokens + + if current_ratio > 0.8: + logger.warning(f"Document size at {current_ratio*100:.1f}% of context limit") + self.ui.update_output(f"Warning: Document size at {current_ratio*100:.1f}% of context limit") + + return current_ratio > 0.9 + except Exception as e: + logger.error(f"Error checking document size: {str(e)}") + return True + + def _handle_command(self, cmd: str): + """Handle user commands during research""" + if cmd.lower() == 's': + self.ui.update_output(self.get_progress()) + elif cmd.lower() == 'f': + if self.current_focus: + self.ui.update_output("\nCurrent Focus:") + self.ui.update_output(f"Area: {self.current_focus.area}") + self.ui.update_output(f"Priority: {self.current_focus.priority}") + else: + self.ui.update_output("\nNo current focus area") + elif cmd.lower() == 'p': + self.pause_and_assess() + elif cmd.lower() == 'q': + self.ui.update_output("\nInitiating research termination...") + self.should_terminate.set() + self.ui.update_output("\nGenerating research summary... 
please wait...") + summary = self.terminate_research() + self.ui.update_output("\nFinal Research Summary:") + self.ui.update_output(summary) + + def pause_and_assess(self): + """Pause the research and assess if the collected content is sufficient.""" + try: + # Pause the research thread + self.ui.update_output("\nPausing research for assessment...") + self.research_paused = True + + # Start progress indicator in a separate thread + self.summary_ready = False + indicator_thread = threading.Thread( + target=self.show_progress_indicator, + args=("Assessing the researched information...",) + ) + indicator_thread.daemon = True + indicator_thread.start() + + # Read the current research content + if not os.path.exists(self.document_path): + self.summary_ready = True + indicator_thread.join() + self.ui.update_output("No research data found to assess.") + self.research_paused = False + return + + with open(self.document_path, 'r', encoding='utf-8') as f: + content = f.read().strip() + + if not content: + self.summary_ready = True + indicator_thread.join() + self.ui.update_output("No research data was collected to assess.") + self.research_paused = False + return + + # Prepare the prompt for the AI assessment + assessment_prompt = f""" +Based on the following research content, please assess whether the original query "{self.original_query}" can be answered sufficiently with the collected information. + +Research Content: +{content} + +Instructions: +1. If the research content provides enough information to answer the original query in detail, respond with: "The research is sufficient to answer the query." +2. If not, respond with: "The research is insufficient and it would be advisable to continue gathering information." +3. Do not provide any additional information or details. + +Assessment: +""" + + # Generate the assessment + assessment = self.llm.generate(assessment_prompt, max_tokens=200) + + # Stop the progress indicator + self.summary_ready = True + indicator_thread.join() + + # Display the assessment + self.ui.update_output("\nAssessment Result:") + self.ui.update_output(assessment.strip()) + + # Provide user with options to continue or quit + self.ui.update_output("\nEnter 'c' to continue the research or 'q' to terminate and generate the summary.") + self.awaiting_user_decision = True # Flag to indicate we are waiting for user's decision + + while self.awaiting_user_decision: + cmd = self.ui.get_input("Enter command ('c' to continue, 'q' to quit): ") + if cmd is None: + continue # Ignore invalid inputs + cmd = cmd.strip().lower() + if cmd == 'c': + self.ui.update_output("\nResuming research...") + self.research_paused = False + self.awaiting_user_decision = False + elif cmd == 'q': + self.ui.update_output("\nTerminating research and generating summary...") + self.awaiting_user_decision = False + self.should_terminate.set() + summary = self.terminate_research() + self.ui.update_output("\nFinal Research Summary:") + self.ui.update_output(summary) + break + else: + self.ui.update_output("Invalid command. 
Please enter 'c' to continue or 'q' to quit.") + + except Exception as e: + logger.error(f"Error during pause and assess: {str(e)}") + self.ui.update_output(f"Error during assessment: {str(e)}") + self.research_paused = False + finally: + self.summary_ready = True # Ensure the indicator thread can exit + + def get_progress(self) -> str: + """Get current research progress""" + return f""" +Research Progress: +- Original Query: {self.original_query} +- Sources analyzed: {len(self.searched_urls)} +- Status: {'Active' if self.is_running else 'Stopped'} +- Current focus: {self.current_focus.area if self.current_focus else 'Initializing'} +""" + + def is_active(self) -> bool: + """Check if research is currently active""" + return self.is_running and self.research_thread and self.research_thread.is_alive() + + def terminate_research(self) -> str: + """Terminate research and return to main terminal""" + try: + print("Initiating research termination...") + sys.stdout.flush() + + # Start progress indicator in a separate thread immediately + indicator_thread = threading.Thread(target=self.show_progress_indicator) + indicator_thread.daemon = True + indicator_thread.start() + + if not os.path.exists(self.document_path): + self.summary_ready = True + indicator_thread.join(timeout=1.0) + self._cleanup() + return "No research data found to summarize." + + with open(self.document_path, 'r', encoding='utf-8') as f: + content = f.read().strip() + self.research_content = content # Store for conversation mode + + if not content or content == "Research Findings:\n\n": + self.summary_ready = True + indicator_thread.join(timeout=1.0) + self._cleanup() + return "No research data was collected to summarize." + + try: + # Generate summary using LLM + summary_prompt = f""" + Analyze the following content to provide a comprehensive research summary and a response to the user's original query "{self.original_query}" ensuring that you conclusively answer the query in detail: + + Research Content: + {content} + + Important Instructions: + > Summarize the research findings that are relevant to the Original topic/question: "{self.original_query}" + > Ensure that in your summary you directly answer the original question/topic conclusively to the best of your ability in detail. 
+                > Read the original topic/question "{self.original_query}" again and follow exactly any additional instructions it contains in your summary; if it contains no specific instructions, simply provide the summary normally.
+
+                Summary:
+                """
+
+                summary = self.llm.generate(summary_prompt, max_tokens=4000)
+
+                # Signal that summary is complete to stop the progress indicator
+                self.summary_ready = True
+                indicator_thread.join(timeout=1.0)
+
+                # Store summary and mark research as complete
+                self.research_summary = summary
+                self.research_complete = True
+
+                # Format summary
+                formatted_summary = f"""
+                {'='*80}
+                RESEARCH SUMMARY
+                {'='*80}
+
+                Original Query: {self.original_query}
+                Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
+
+                {summary}
+
+                {'='*80}
+                End of Summary
+                {'='*80}
+                """
+
+                # Write to document
+                with open(self.document_path, 'a', encoding='utf-8') as f:
+                    f.write("\n\n" + formatted_summary)
+
+                # Clean up research UI
+                if hasattr(self, 'ui') and self.ui:
+                    self.ui.cleanup()
+
+                return formatted_summary
+
+            except Exception as e:
+                self.summary_ready = True
+                indicator_thread.join(timeout=1.0)
+                raise e
+
+        except Exception as e:
+            error_msg = f"Error generating summary: {str(e)}"
+            logger.error(error_msg)
+            return error_msg
+
+        finally:
+            # Clean up research UI
+            self._cleanup_research_ui()
+
+    def show_progress_indicator(self, message="Generating summary, please wait..."):
+        """Show a rotating progress indicator until the summary is ready."""
+        symbols = ['|', '/', '-', '\\']
+        idx = 0
+        self.summary_ready = False  # Track whether the summary is complete
+        while not self.summary_ready:
+            sys.stdout.write(f"\r{message} {symbols[idx]}")
+            sys.stdout.flush()
+            idx = (idx + 1) % len(symbols)
+            time.sleep(0.2)  # Adjust the speed of the rotation if needed
+        sys.stdout.write("\r" + " " * (len(message) + 2) + "\r")  # Clear the line when done
+
+    def _cleanup_research_ui(self):
+        """Clean up just the research UI components"""
+        if hasattr(self, 'ui') and self.ui:
+            self.ui.cleanup()
+
+    def show_thinking_indicator(self, message: str, stop_flag_name: str):
+        """Show a rotating thinking indicator with custom message"""
+        symbols = ['|', '/', '-', '\\']
+        idx = 0
+        while getattr(self, stop_flag_name):  # Use dynamic attribute lookup
+            sys.stdout.write(f"\r{message} {symbols[idx]}")
+            sys.stdout.flush()
+            idx = (idx + 1) % len(symbols)
+            time.sleep(0.2)
+        sys.stdout.write("\r" + " " * (len(message) + 2) + "\r")  # Clear the line when done
+
+    def start_conversation_mode(self):
+        """Start interactive conversation mode with CTRL+D input handling and thinking indicator"""
+        self.conversation_active = True
+        self.thinking = False
+
+        # Print header with clear instructions
+        print("\n" + "="*80)
+        print(Fore.CYAN + "Research Conversation Mode" + Style.RESET_ALL)
+        print("="*80)
+        print(Fore.YELLOW + "\nInstructions:")
+        print("- Type your question and press CTRL+D to submit")
+        print("- Type 'quit' and press CTRL+D to exit")
+        print("- Your messages appear in green")
+        print("- AI responses appear in cyan" + Style.RESET_ALL + "\n")
+
+        while self.conversation_active:
+            try:
+                # Show prompt with user input in green
+                print(Fore.GREEN + "Your question (Press CTRL+D to submit):" + Style.RESET_ALL)
+                user_input = self.get_multiline_conversation_input()
+
+                # Handle exit commands
+                if not user_input or user_input.lower() in ['quit', 'exit', 'q']:
+                    print(Fore.YELLOW + "\nExiting conversation mode..." + Style.RESET_ALL)
+                    self.conversation_active = False
+                    break
+
+                # Skip empty input
+                if not user_input.strip():
+                    continue
+
+                # Echo the submitted question for clarity
+                print(Fore.GREEN + "Submitted question:" + Style.RESET_ALL)
+                print(Fore.GREEN + user_input + Style.RESET_ALL + "\n")
+
+                # Start thinking indicator in a separate thread
+                self.thinking = True  # Set flag before starting thread
+                thinking_thread = threading.Thread(
+                    target=self.show_thinking_indicator,
+                    args=("Thinking...", "thinking")
+                )
+                thinking_thread.daemon = True
+                thinking_thread.start()
+
+                try:
+                    # Generate response
+                    response = self._generate_conversation_response(user_input)
+
+                    # Stop thinking indicator
+                    self.thinking = False
+                    thinking_thread.join()
+
+                    # Display response in cyan
+                    print(Fore.CYAN + "AI Response:" + Style.RESET_ALL)
+                    print(f"{Fore.CYAN}{response}{Style.RESET_ALL}\n")
+                    print("-" * 80 + "\n")  # Separator between QA pairs
+
+                except Exception as e:
+                    self.thinking = False  # Ensure thinking indicator stops
+                    thinking_thread.join()
+                    raise e
+
+            except KeyboardInterrupt:
+                self.thinking = False  # Ensure thinking indicator stops
+                print(Fore.YELLOW + "\nOperation cancelled. Submit 'quit' to exit." + Style.RESET_ALL)
+            except Exception as e:
+                logger.error(f"Error in conversation mode: {str(e)}")
+                print(Fore.RED + f"Error processing question: {str(e)}" + Style.RESET_ALL)
+
+    def _generate_conversation_response(self, user_query: str) -> str:
+        """Generate contextual responses with improved context handling"""
+        try:
+            # Add debug logging to verify content
+            logger.info(f"Research summary length: {len(self.research_summary) if self.research_summary else 0}")
+            logger.info(f"Research content length: {len(self.research_content) if self.research_content else 0}")
+
+            # First verify we have content
+            if not self.research_content and not self.research_summary:
+                # Try to reload from file if available
+                try:
+                    if os.path.exists(self.document_path):
+                        with open(self.document_path, 'r', encoding='utf-8') as f:
+                            self.research_content = f.read().strip()
+                except Exception as e:
+                    logger.error(f"Failed to reload research content: {str(e)}")
+
+            # Prepare context, ensuring we have content
+            context = f"""
+Research Content:
+{self.research_content}
+
+Research Summary:
+{self.research_summary if self.research_summary else 'No summary available'}
+"""
+
+            prompt = f"""
+Based on the following research content and summary, please answer this question:
+
+{context}
+
+Question: {user_query}
+
+You have two sets of instructions, the applied set and the unapplied set. Follow the applied set when the question relates directly to the research content; for anything other than direct questions about the content of the research, follow the unapplied set instead.
+
+Applied:
+
+Instructions:
+1. Answer based ONLY on the research content provided above if asked a question about your research or that content.
+2. If the information requested isn't in the research, clearly state that it isn't in the content you gathered.
+3. Be direct and specific in your response. DO NOT directly cite the research unless specifically asked to; be concise and give direct answers to questions based on the research, unless instructed otherwise.
+
+Unapplied:
+
+Instructions:
+
+1. Do not make up anything that isn't actually true.
+2. Respond directly to the user's question in an honest and thoughtful manner.
+3. 
disregard rules in the applied set for queries not DIRECTLY related to the research, including queries about the research process or what you remember about the research should result in the unapplied ruleset being used. + +Answer: +""" + + response = self.llm.generate( + prompt, + max_tokens=1000, # Increased for more detailed responses + temperature=0.7 + ) + + if not response or not response.strip(): + return "I apologize, but I cannot find relevant information in the research content to answer your question." + + return response.strip() + + except Exception as e: + logger.error(f"Error generating response: {str(e)}") + return f"I apologize, but I encountered an error processing your question: {str(e)}" + + def get_multiline_conversation_input(self) -> str: + """Get multiline input with CTRL+D handling for conversation mode""" + buffer = [] + + # Save original terminal settings + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + + try: + # Set terminal to raw mode + tty.setraw(fd) + + current_line = [] + while True: + char = sys.stdin.read(1) + + # CTRL+D detection + if not char or ord(char) == 4: # EOF or CTRL+D + sys.stdout.write('\n') + if current_line: + buffer.append(''.join(current_line)) + return ' '.join(buffer).strip() + + # Handle special characters + elif ord(char) == 13: # Enter + sys.stdout.write('\n') + buffer.append(''.join(current_line)) + current_line = [] + + elif ord(char) == 127: # Backspace + if current_line: + current_line.pop() + sys.stdout.write('\b \b') + + elif ord(char) == 3: # CTRL+C + sys.stdout.write('\n') + return 'quit' + + # Normal character + elif 32 <= ord(char) <= 126: # Printable characters + current_line.append(char) + sys.stdout.write(char) + + sys.stdout.flush() + + finally: + # Restore terminal settings + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + print() # New line for clean display + +if __name__ == "__main__": + from llm_wrapper import LLMWrapper + from llm_response_parser import UltimateLLMResponseParser + from Self_Improving_Search import EnhancedSelfImprovingSearch + + try: + print(f"{Fore.CYAN}Initializing Research System...{Style.RESET_ALL}") + llm = LLMWrapper() + parser = UltimateLLMResponseParser() + search_engine = EnhancedSelfImprovingSearch(llm, parser) + manager = ResearchManager(llm, parser, search_engine) + + print(f"{Fore.GREEN}System initialized. Enter your research topic or 'quit' to exit.{Style.RESET_ALL}") + while True: + try: + topic = ResearchManager.get_initial_input() + if topic.lower() == 'quit': + break + + if not topic: + continue + + if not topic.startswith('@'): + print(f"{Fore.YELLOW}Please start your research query with '@'{Style.RESET_ALL}") + continue + + topic = topic[1:] # Remove @ prefix + manager.start_research(topic) + summary = manager.terminate_research() + print(f"\n{Fore.GREEN}Research Summary:{Style.RESET_ALL}") + print(summary) + print(f"\n{Fore.GREEN}Research completed. Ready for next topic.{Style.RESET_ALL}\n") + + except KeyboardInterrupt: + print(f"\n{Fore.YELLOW}Operation cancelled. 
Ready for next topic.{Style.RESET_ALL}") + if 'manager' in locals(): + manager.terminate_research() + continue + + except KeyboardInterrupt: + print(f"\n{Fore.YELLOW}Research system shutting down.{Style.RESET_ALL}") + if 'manager' in locals(): + manager.terminate_research() + except Exception as e: + print(f"{Fore.RED}Critical error: {str(e)}{Style.RESET_ALL}") + logger.error("Critical error in main loop", exc_info=True) + + if os.name == 'nt': + print(f"{Fore.YELLOW}Running on Windows - Some features may be limited{Style.RESET_ALL}") diff --git a/strategic_analysis_parser.py b/strategic_analysis_parser.py new file mode 100644 index 0000000..58e57b2 --- /dev/null +++ b/strategic_analysis_parser.py @@ -0,0 +1,219 @@ +from typing import List, Dict, Optional, Union +import re +import logging +from dataclasses import dataclass +from datetime import datetime + +@dataclass +class ResearchFocus: + """Represents a specific area of research focus""" + area: str + priority: int + source_query: str = "" + timestamp: str = "" + search_queries: List[str] = None + + def __post_init__(self): + if not self.timestamp: + self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + if self.search_queries is None: + self.search_queries = [] + +@dataclass +class AnalysisResult: + """Contains the complete analysis result""" + original_question: str + focus_areas: List[ResearchFocus] + raw_response: str + timestamp: str = "" + confidence_score: float = 0.0 + + def __post_init__(self): + if not self.timestamp: + self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + +# Set up logging +logger = logging.getLogger(__name__) + +class StrategicAnalysisParser: + """Enhanced parser with improved pattern matching and validation""" + def __init__(self): + self.patterns = { + 'original_question': [ + r"(?i)original question analysis:\s*(.*?)(?=research gap|$)", + r"(?i)original query:\s*(.*?)(?=research gap|$)", + r"(?i)research question:\s*(.*?)(?=research gap|$)", + r"(?i)topic analysis:\s*(.*?)(?=research gap|$)" + ], + 'research_gaps': [ + r"(?i)research gaps?:\s*", + r"(?i)gaps identified:\s*", + r"(?i)areas for research:\s*", + r"(?i)investigation areas:\s*" + ], + 'priority': [ + r"(?i)priority:\s*(\d+)", + r"(?i)priority level:\s*(\d+)", + r"(?i)\(priority:\s*(\d+)\)", + r"(?i)importance:\s*(\d+)" + ] + } + self.logger = logging.getLogger(__name__) + + def parse_analysis(self, llm_response: str) -> Optional[AnalysisResult]: + """Main parsing method with improved validation""" + try: + # Clean and normalize the response + cleaned_response = self._clean_text(llm_response) + + # Extract original question with validation + original_question = self._extract_original_question(cleaned_response) + if not original_question: + self.logger.warning("Failed to extract original question") + original_question = "Original question extraction failed" + + # Extract and validate research areas + focus_areas = self._extract_research_areas(cleaned_response) + focus_areas = self._normalize_focus_areas(focus_areas) + + # Calculate confidence score + confidence_score = self._calculate_confidence_score(original_question, focus_areas) + + return AnalysisResult( + original_question=original_question, + focus_areas=focus_areas, + raw_response=llm_response, + confidence_score=confidence_score + ) + + except Exception as e: + self.logger.error(f"Error in parse_analysis: {str(e)}") + return None + + def _clean_text(self, text: str) -> str: + """Clean and normalize text for parsing""" + text = re.sub(r'\n{3,}', '\n\n', text) + text = 
re.sub(r'\s{2,}', ' ', text) + text = re.sub(r'(\d+\))', r'\1.', text) + return text.strip() + + def _extract_original_question(self, text: str) -> str: + """Extract original question with improved matching""" + for pattern in self.patterns['original_question']: + match = re.search(pattern, text, re.DOTALL) + if match: + return self._clean_text(match.group(1)) + return "" + + def _extract_research_areas(self, text: str) -> List[ResearchFocus]: + """Extract research areas with enhanced validation""" + areas = [] + for pattern in self.patterns['research_gaps']: + gap_match = re.search(pattern, text) + if gap_match: + sections = re.split(r'\n\s*\d+[\.)]\s+', text[gap_match.end():]) + sections = [s for s in sections if s.strip()] + + for section in sections: + focus = self._parse_research_focus(section) + if focus and self._is_valid_focus(focus): + areas.append(focus) + break + return areas + + def _parse_research_focus(self, text: str) -> Optional[ResearchFocus]: + """Parse research focus with improved validation without reasoning.""" + try: + # Extract area + area = text.split('\n')[0].strip() + + # Extract and validate priority + priority = self._extract_priority(text) + + # Return ResearchFocus without reasoning + return ResearchFocus( + area=area, + priority=priority + ) + + except Exception as e: + self.logger.error(f"Error parsing research focus: {str(e)}") + return None + + def _extract_priority(self, text: str) -> int: + """Extract priority with validation""" + for pattern in self.patterns['priority']: + priority_match = re.search(pattern, text) + if priority_match: + try: + priority = int(priority_match.group(1)) + return max(1, min(5, priority)) + except ValueError: + continue + return 3 # Default priority + + def _is_valid_focus(self, focus: ResearchFocus) -> bool: + """Validate research focus completeness and quality""" + if not focus.area: # Only check if area exists and isn't empty + return False + if focus.priority < 1 or focus.priority > 5: + return False + return True + + def _normalize_focus_areas(self, areas: List[ResearchFocus]) -> List[ResearchFocus]: + """Normalize and validate focus areas""" + normalized = [] + for area in areas: + if not area.area.strip(): + continue + + area.priority = max(1, min(5, area.priority)) + + if self._is_valid_focus(area): + normalized.append(area) + + # Sort by priority (highest first) but don't add any filler areas + normalized.sort(key=lambda x: x.priority, reverse=True) + + return normalized + + def _calculate_confidence_score(self, question: str, areas: List[ResearchFocus]) -> float: + """Calculate confidence score for analysis quality""" + score = 0.0 + + # Question quality (0.3) + if question and len(question.split()) >= 3: + score += 0.3 + + # Areas quality (0.7) + if areas: + # Valid areas ratio (0.35) - now based on proportion that are valid vs total + num_areas = len(areas) + if num_areas > 0: # Avoid division by zero + valid_areas = sum(1 for a in areas if self._is_valid_focus(a)) + score += 0.35 * (valid_areas / num_areas) + + # Priority distribution (0.35) - now based on having different priorities + if num_areas > 0: # Avoid division by zero + unique_priorities = len(set(a.priority for a in areas)) + score += 0.35 * (unique_priorities / num_areas) + + return round(score, 2) + + def format_analysis_result(self, result: AnalysisResult) -> str: + """Format analysis result for display without reasoning.""" + formatted = [ + "Strategic Analysis Result", + "=" * 80, + f"\nOriginal Question Analysis:\n{result.original_question}\n", + 
f"Analysis Confidence Score: {result.confidence_score}", + "\nResearch Focus Areas:" + ] + + for i, focus in enumerate(result.focus_areas, 1): + formatted.extend([ + f"\n{i}. {focus.area}", + f" Priority: {focus.priority}" + ]) + + return "\n".join(formatted) diff --git a/web_scraper.py b/web_scraper.py new file mode 100644 index 0000000..2317c91 --- /dev/null +++ b/web_scraper.py @@ -0,0 +1,149 @@ +import requests +from bs4 import BeautifulSoup +from urllib.robotparser import RobotFileParser +from urllib.parse import urlparse, urljoin +import time +import logging +from concurrent.futures import ThreadPoolExecutor, as_completed +import re + +# Set up logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class WebScraper: + def __init__(self, user_agent="WebLLMAssistant/1.0 (+https://github.com/YourUsername/Web-LLM-Assistant-Llama-cpp)", + rate_limit=1, timeout=10, max_retries=3): + self.session = requests.Session() + self.session.headers.update({"User-Agent": user_agent}) + self.robot_parser = RobotFileParser() + self.rate_limit = rate_limit + self.timeout = timeout + self.max_retries = max_retries + self.last_request_time = {} + + def can_fetch(self, url): + parsed_url = urlparse(url) + robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt" + self.robot_parser.set_url(robots_url) + try: + self.robot_parser.read() + return self.robot_parser.can_fetch(self.session.headers["User-Agent"], url) + except Exception as e: + logger.warning(f"Error reading robots.txt for {url}: {e}") + return True # Assume allowed if robots.txt can't be read + + def respect_rate_limit(self, url): + domain = urlparse(url).netloc + current_time = time.time() + if domain in self.last_request_time: + time_since_last_request = current_time - self.last_request_time[domain] + if time_since_last_request < self.rate_limit: + time.sleep(self.rate_limit - time_since_last_request) + self.last_request_time[domain] = time.time() + + def scrape_page(self, url): + if not self.can_fetch(url): + logger.info(f"Robots.txt disallows scraping: {url}") + return None + + for attempt in range(self.max_retries): + try: + self.respect_rate_limit(url) + response = self.session.get(url, timeout=self.timeout) + response.raise_for_status() + return self.extract_content(response.text, url) + except requests.RequestException as e: + logger.warning(f"Error scraping {url} (attempt {attempt + 1}/{self.max_retries}): {e}") + if attempt == self.max_retries - 1: + logger.error(f"Failed to scrape {url} after {self.max_retries} attempts") + return None + time.sleep(2 ** attempt) # Exponential backoff + + def extract_content(self, html, url): + soup = BeautifulSoup(html, 'html.parser') + + # Remove unwanted elements + for element in soup(["script", "style", "nav", "footer", "header"]): + element.decompose() + + # Extract title + title = soup.title.string if soup.title else "" + + # Try to find main content + main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content') + + if main_content: + paragraphs = main_content.find_all('p') + else: + paragraphs = soup.find_all('p') + + # Extract text from paragraphs + text = ' '.join([p.get_text().strip() for p in paragraphs]) + + # If no paragraphs found, get all text + if not text: + text = soup.get_text() + + # Clean up whitespace + text = re.sub(r'\s+', ' ', text).strip() + + # Extract and resolve links + links = [urljoin(url, a['href']) for a in soup.find_all('a', href=True)] + 
+ return { + "url": url, + "title": title, + "content": text[:2400], # Limit to first 2400 characters + "links": links[:10] # Limit to first 10 links + } + +def scrape_multiple_pages(urls, max_workers=5): + scraper = WebScraper() + results = {} + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_url = {executor.submit(scraper.scrape_page, url): url for url in urls} + for future in as_completed(future_to_url): + url = future_to_url[future] + try: + data = future.result() + if data: + results[url] = data + logger.info(f"Successfully scraped: {url}") + else: + logger.warning(f"Failed to scrape: {url}") + except Exception as exc: + logger.error(f"{url} generated an exception: {exc}") + + return results + +# Function to integrate with your main system +def get_web_content(urls): + scraped_data = scrape_multiple_pages(urls) + return {url: data['content'] for url, data in scraped_data.items() if data} + +# Standalone can_fetch function +def can_fetch(url): + parsed_url = urlparse(url) + robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt" + rp = RobotFileParser() + rp.set_url(robots_url) + try: + rp.read() + return rp.can_fetch("*", url) + except Exception as e: + logger.warning(f"Error reading robots.txt for {url}: {e}") + return True # Assume allowed if robots.txt can't be read + +if __name__ == "__main__": + test_urls = [ + "https://en.wikipedia.org/wiki/Web_scraping", + "https://example.com", + "https://www.python.org" + ] + scraped_content = get_web_content(test_urls) + for url, content in scraped_content.items(): + print(f"Content from {url}:") + print(content[:500]) # Print first 500 characters + print("\n---\n")