Automated-AI-Web-Researcher.../research_manager.py

1482 lines
58 KiB
Python
Raw Permalink Normal View History

2024-11-26 02:25:04 +00:00
import os
import sys
import threading
import time
import re
import json
import logging
import curses
import signal
from typing import List, Dict, Set, Optional, Tuple, Union
from dataclasses import dataclass
from queue import Queue
from datetime import datetime
from io import StringIO
from colorama import init, Fore, Style
import select
import termios
import tty
from threading import Event
from urllib.parse import urlparse
from pathlib import Path
# Initialize colorama for cross-platform color support
if os.name == 'nt': # Windows-specific initialization
init(convert=True, strip=False, wrap=True)
else:
init()
# Set up logging
log_directory = 'logs'
if not os.path.exists(log_directory):
os.makedirs(log_directory)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
log_file = os.path.join(log_directory, 'research_llm.log')
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.handlers = []
logger.addHandler(file_handler)
logger.propagate = False
# Suppress other loggers
for name in logging.root.manager.loggerDict:
if name != __name__:
logging.getLogger(name).disabled = True
@dataclass
class ResearchFocus:
"""Represents a specific area of research focus"""
area: str
priority: int
source_query: str = ""
timestamp: str = ""
search_queries: List[str] = None
def __post_init__(self):
if not self.timestamp:
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if self.search_queries is None:
self.search_queries = []
@dataclass
class AnalysisResult:
"""Contains the complete analysis result"""
original_question: str
focus_areas: List[ResearchFocus]
raw_response: str
timestamp: str = ""
def __post_init__(self):
if not self.timestamp:
self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
class StrategicAnalysisParser:
def __init__(self, llm=None):
self.llm = llm
self.logger = logging.getLogger(__name__)
# Simplify patterns to match exactly what we expect
self.patterns = {
'priority': [
r"Priority:\s*(\d+)", # Match exactly what's in our prompt
]
}
def strategic_analysis(self, original_query: str) -> Optional[AnalysisResult]:
"""Generate and process research areas with retries until success"""
max_retries = 3
try:
self.logger.info("Starting strategic analysis...")
prompt = f"""
You must select exactly 5 areas to investigate in order to explore and gather information to answer the research question:
"{original_query}"
You MUST provide exactly 5 areas numbered 1-5. Each must have a priority, YOU MUST ensure that you only assign one priority per area.
Assign priority based on the likelihood of a focus area being investigated to provide information that directly will allow you to respond to "{original_query}" with 5 being most likely and 1 being least.
Follow this EXACT format without any deviations or additional text:
1. [First research topic]
Priority: [number 1-5]
2. [Second research topic]
Priority: [number 1-5]
3. [Third research topic]
Priority: [number 1-5]
4. [Fourth research topic]
Priority: [number 1-5]
5. [Fifth research topic]
Priority: [number 1-5]
"""
for attempt in range(max_retries):
response = self.llm.generate(prompt, max_tokens=1000)
focus_areas = self._extract_research_areas(response)
if focus_areas: # If we got any valid areas
# Sort by priority (highest first)
focus_areas.sort(key=lambda x: x.priority, reverse=True)
return AnalysisResult(
original_question=original_query,
focus_areas=focus_areas,
raw_response=response,
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
)
else:
self.logger.warning(f"Attempt {attempt + 1}: No valid areas generated, retrying...")
print(f"\nRetrying research area generation (Attempt {attempt + 1}/{max_retries})...")
# If all retries failed, try one final time with a stronger prompt
prompt += "\n\nIMPORTANT: You MUST provide exactly 5 research areas with priorities. This is crucial."
response = self.llm.generate(prompt, max_tokens=1000)
focus_areas = self._extract_research_areas(response)
if focus_areas:
focus_areas.sort(key=lambda x: x.priority, reverse=True)
return AnalysisResult(
original_question=original_query,
focus_areas=focus_areas,
raw_response=response,
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
)
self.logger.error("Failed to generate any valid research areas after all attempts")
return None
except Exception as e:
self.logger.error(f"Error in strategic analysis: {str(e)}")
return None
def _extract_research_areas(self, text: str) -> List[ResearchFocus]:
"""Extract research areas with enhanced parsing to handle priorities in various formats."""
areas = []
lines = text.strip().split('\n')
current_area = None
current_priority = None
for i in range(len(lines)):
line = lines[i].strip()
if not line:
continue
# Check for numbered items (e.g., '1. Area Name')
number_match = re.match(r'^(\d+)\.\s*(.*)', line)
if number_match:
# If we have a previous area, add it to our list
if current_area is not None:
areas.append(ResearchFocus(
area=current_area.strip(' -:'),
priority=current_priority or 3,
))
# Start a new area
area_line = number_match.group(2)
# Search for 'priority' followed by a number, anywhere in the area_line
priority_inline_match = re.search(
r'(?i)\bpriority\b\s*(?:[:=]?\s*)?(\d+)', area_line)
if priority_inline_match:
# Extract and set the priority
try:
current_priority = int(priority_inline_match.group(1))
current_priority = max(1, min(5, current_priority))
except ValueError:
current_priority = 3 # Default priority if parsing fails
# Remove the 'priority' portion from area_line
area_line = area_line[:priority_inline_match.start()] + area_line[priority_inline_match.end():]
area_line = area_line.strip(' -:')
else:
current_priority = None # Priority might be on the next line
current_area = area_line.strip()
elif re.match(r'(?i)^priority\s*(?:[:=]?\s*)?(\d+)', line):
# Extract priority from the line following the area
try:
priority_match = re.match(r'(?i)^priority\s*(?:[:=]?\s*)?(\d+)', line)
current_priority = int(priority_match.group(1))
current_priority = max(1, min(5, current_priority))
except (ValueError, IndexError):
current_priority = 3 # Default priority if parsing fails
# Check if this is the last line or the next line is a new area
next_line_is_new_area = (i + 1 < len(lines)) and re.match(r'^\d+\.', lines[i + 1].strip())
if next_line_is_new_area or i + 1 == len(lines):
if current_area is not None:
# Append the current area and priority to the list
areas.append(ResearchFocus(
area=current_area.strip(' -:'),
priority=current_priority or 3,
))
current_area = None
current_priority = None
return areas
def _clean_text(self, text: str) -> str:
"""Clean and normalize text"""
text = re.sub(r'\s+', ' ', text)
text = re.sub(r'(\d+\))', r'\1.', text)
text = re.sub(r'(?i)priority:', 'P:', text)
return text.strip()
def _add_area(self, areas: List[ResearchFocus], area: str, priority: Optional[int]):
"""Add area with basic validation"""
if not area or len(area.split()) < 3: # Basic validation
return
areas.append(ResearchFocus(
area=area,
priority=priority or 3,
timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
search_queries=[]
))
def _normalize_focus_areas(self, areas: List[ResearchFocus]) -> List[ResearchFocus]:
"""Normalize and prepare final list of areas"""
if not areas:
return []
# Sort by priority
areas.sort(key=lambda x: x.priority, reverse=True)
# Ensure priorities are properly spread
for i, area in enumerate(areas):
area.priority = max(1, min(5, area.priority))
return areas[:5]
def format_analysis_result(self, result: AnalysisResult) -> str:
"""Format the results for display"""
if not result:
return "No valid analysis result generated."
formatted = [
f"\nResearch Areas for: {result.original_question}\n"
]
for i, focus in enumerate(result.focus_areas, 1):
formatted.extend([
f"\n{i}. {focus.area}",
f" Priority: {focus.priority}"
])
return "\n".join(formatted)
class OutputRedirector:
"""Redirects stdout and stderr to a string buffer"""
def __init__(self, stream=None):
self.stream = stream or StringIO()
self.original_stdout = sys.stdout
self.original_stderr = sys.stderr
def __enter__(self):
sys.stdout = self.stream
sys.stderr = self.stream
return self.stream
def __exit__(self, exc_type, exc_val, exc_tb):
sys.stdout = self.original_stdout
sys.stderr = self.original_stderr
class TerminalUI:
"""Manages terminal display with fixed input area at bottom"""
def __init__(self):
self.stdscr = None
self.input_win = None
self.output_win = None
self.status_win = None
self.max_y = 0
self.max_x = 0
self.input_buffer = ""
self.is_setup = False
self.old_terminal_settings = None
self.should_terminate = Event()
self.shutdown_event = Event()
self.research_thread = None
self.last_display_height = 0 # Track display height for corruption fix
def setup(self):
"""Initialize the terminal UI"""
if self.is_setup:
return
# Save terminal settings
if not os.name == 'nt': # Unix-like systems
self.old_terminal_settings = termios.tcgetattr(sys.stdin.fileno())
self.stdscr = curses.initscr()
curses.start_color()
curses.noecho()
curses.cbreak()
self.stdscr.keypad(True)
# Enable only scroll wheel events, not all mouse events
# curses.mousemask(curses.BUTTON4_PRESSED | curses.BUTTON5_PRESSED)
# Remove this line that was causing the spam
# print('\033[?1003h') # We don't want mouse movement events
# Get terminal dimensions
self.max_y, self.max_x = self.stdscr.getmaxyx()
# Create windows
self.output_win = curses.newwin(self.max_y - 4, self.max_x, 0, 0)
self.status_win = curses.newwin(1, self.max_x, self.max_y - 4, 0)
self.input_win = curses.newwin(3, self.max_x, self.max_y - 3, 0)
# Setup colors
curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_CYAN, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK)
# Enable scrolling
self.output_win.scrollok(True)
self.output_win.idlok(True)
self.input_win.scrollok(True)
self.is_setup = True
self._refresh_input_prompt()
def cleanup(self):
"""Public cleanup method with enhanced terminal restoration"""
if not self.is_setup:
return
try:
# Ensure all windows are properly closed
for win in [self.input_win, self.output_win, self.status_win]:
if win:
win.clear()
win.refresh()
# Restore terminal state
if self.stdscr:
self.stdscr.keypad(False)
curses.nocbreak()
curses.echo()
curses.endwin()
# Restore original terminal settings
if self.old_terminal_settings and not os.name == 'nt':
termios.tcsetattr(
sys.stdin.fileno(),
termios.TCSADRAIN,
self.old_terminal_settings
)
except Exception as e:
logger.error(f"Error during terminal cleanup: {str(e)}")
finally:
self.is_setup = False
self.stdscr = None
self.input_win = None
self.output_win = None
self.status_win = None
def _cleanup(self):
"""Enhanced resource cleanup with better process handling"""
self.should_terminate.set()
# Handle research thread with improved termination
if self.research_thread and self.research_thread.is_alive():
try:
self.research_thread.join(timeout=1.0)
if self.research_thread.is_alive():
import ctypes
ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(self.research_thread.ident),
ctypes.py_object(SystemExit))
time.sleep(0.1) # Give thread time to exit
if self.research_thread.is_alive(): # Double-check
ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(self.research_thread.ident),
0) # Reset exception
except Exception as e:
logger.error(f"Error terminating research thread: {str(e)}")
# Clean up LLM with improved error handling
if hasattr(self, 'llm') and hasattr(self.llm, '_cleanup'):
try:
self.llm.cleanup()
except Exception as e:
logger.error(f"Error cleaning up LLM: {str(e)}")
# Ensure terminal is restored
try:
curses.endwin()
except:
pass
# Final cleanup of UI
self.cleanup()
def _refresh_input_prompt(self, prompt="Enter command: "):
"""Refresh the fixed input prompt at bottom with display fix"""
if not self.is_setup:
return
try:
# Clear the entire input window first
self.input_win.clear()
# Calculate proper cursor position
cursor_y = 0
cursor_x = len(prompt) + len(self.input_buffer)
# Add the prompt and buffer
self.input_win.addstr(0, 0, f"{prompt}{self.input_buffer}", curses.color_pair(1))
# Position cursor correctly
try:
self.input_win.move(cursor_y, cursor_x)
except curses.error:
pass # Ignore if cursor would be off-screen
self.input_win.refresh()
except curses.error:
pass
def update_output(self, text: str):
"""Update output window with display corruption fix"""
if not self.is_setup:
return
try:
# Clean ANSI escape codes
clean_text = re.sub(r'\x1b\[[0-9;]*[mK]', '', text)
# Store current position
current_y, _ = self.output_win.getyx()
# Clear any potential corruption
if current_y > self.last_display_height:
self.output_win.clear()
self.output_win.addstr(clean_text + "\n", curses.color_pair(2))
new_y, _ = self.output_win.getyx()
self.last_display_height = new_y
self.output_win.refresh()
self._refresh_input_prompt()
except curses.error:
pass
def update_status(self, text: str):
"""Update the status line above input area"""
if not self.is_setup:
return
try:
self.status_win.clear()
self.status_win.addstr(0, 0, text, curses.color_pair(3))
self.status_win.refresh()
self._refresh_input_prompt() # Ensure prompt is refreshed after status update
except curses.error:
pass
def get_input(self, prompt: Optional[str] = None) -> Optional[str]:
"""Enhanced input handling with mouse scroll support"""
try:
if prompt:
self.update_status(prompt)
if not self.is_setup:
self.setup()
self.input_buffer = ""
self._refresh_input_prompt()
while True:
if self.should_terminate.is_set():
return None
try:
ch = self.input_win.getch()
if ch == curses.KEY_MOUSE:
try:
mouse_event = curses.getmouse()
# Ignore mouse events entirely for now
continue
except curses.error:
continue
if ch == 4: # Ctrl+D
result = self.input_buffer.strip()
self.input_buffer = ""
if not result:
self.cleanup()
return "@quit"
return result
elif ch == 3: # Ctrl+C
self.should_terminate.set()
self.cleanup()
return "@quit"
elif ch == ord('\n'): # Enter
result = self.input_buffer.strip()
if result:
self.input_buffer = ""
return result
continue
elif ch == curses.KEY_BACKSPACE or ch == 127: # Backspace
if self.input_buffer:
self.input_buffer = self.input_buffer[:-1]
self._refresh_input_prompt()
elif 32 <= ch <= 126: # Printable characters
self.input_buffer += chr(ch)
self._refresh_input_prompt()
except KeyboardInterrupt:
self.should_terminate.set()
self.cleanup()
return "@quit"
except curses.error:
self._refresh_input_prompt()
except Exception as e:
logger.error(f"Error in get_input: {str(e)}")
self.should_terminate.set()
self.cleanup()
return "@quit"
def force_exit(self):
"""Force immediate exit with enhanced cleanup"""
try:
self.should_terminate.set()
self.shutdown_event.set()
self._cleanup() # Call private cleanup first
self.cleanup() # Then public cleanup
curses.endwin() # Final attempt to restore terminal
except:
pass
finally:
os._exit(0) # Ensure exit
class NonBlockingInput:
"""Handles non-blocking keyboard input for Unix-like systems"""
def __init__(self):
self.old_settings = None
def __enter__(self):
if os.name == 'nt': # Windows
return self
self.old_settings = termios.tcgetattr(sys.stdin)
tty.setcbreak(sys.stdin.fileno())
return self
def __exit__(self, type, value, traceback):
if os.name != 'nt': # Unix-like
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings)
def check_input(self, timeout=0.1):
"""Check for input without blocking, cross-platform"""
if os.name == 'nt': # Windows
import msvcrt
if msvcrt.kbhit():
return msvcrt.getch().decode('utf-8')
return None
else: # Unix-like
ready_to_read, _, _ = select.select([sys.stdin], [], [], timeout)
if ready_to_read:
return sys.stdin.read(1)
return None
class ResearchManager:
"""Manages the research process including analysis, search, and documentation"""
def __init__(self, llm_wrapper, parser, search_engine, max_searches_per_cycle: int = 5):
self.llm = llm_wrapper
self.parser = parser
self.search_engine = search_engine
self.max_searches = max_searches_per_cycle
self.should_terminate = threading.Event()
self.shutdown_event = Event()
self.research_started = threading.Event()
self.research_thread = None
self.thinking = False
self.stop_words = {
'the', 'be', 'to', 'of', 'and', 'a', 'in', 'that', 'have', 'i',
'it', 'for', 'not', 'on', 'with', 'he', 'as', 'you', 'do', 'at'
}
# State tracking
self.searched_urls: Set[str] = set()
self.current_focus: Optional[ResearchFocus] = None
self.original_query: str = ""
self.focus_areas: List[ResearchFocus] = []
self.is_running = False
# New conversation mode attributes
self.research_complete = False
self.research_summary = ""
self.conversation_active = False
self.research_content = ""
# Initialize document paths
self.document_path = None
self.session_files = []
# Initialize UI and parser
self.ui = TerminalUI()
self.strategic_parser = StrategicAnalysisParser(llm=self.llm)
# Initialize new flags for pausing and assessment
self.research_paused = False
self.awaiting_user_decision = False
# Setup signal handlers
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""Handle interrupt signals"""
self.shutdown_event.set()
self.should_terminate.set()
self._cleanup()
def print_thinking(self):
"""Display thinking indicator to user"""
self.ui.update_output("🧠 Thinking...")
@staticmethod
def get_initial_input() -> str:
"""Get the initial research query from user"""
print(f"{Fore.GREEN}📝 Enter your message (Press CTRL+D to submit):{Style.RESET_ALL}")
lines = []
try:
while True:
line = input()
if line: # Only add non-empty lines
lines.append(line)
if not line: # Empty line (just Enter pressed)
break
except EOFError: # Ctrl+D pressed
pass
except KeyboardInterrupt: # Ctrl+C pressed
print("\nOperation cancelled")
sys.exit(0)
return " ".join(lines).strip()
def formulate_search_queries(self, focus_area: ResearchFocus) -> List[str]:
"""Generate search queries for a focus area"""
try:
self.print_thinking()
prompt = f"""
In order to research this query/topic:
Context: {self.original_query}
Base a search query to investigate the following research focus, which is related to the original query/topic:
Area: {focus_area.area}
Create a search query that will yield specific, search results thare are directly relevant to your focus area.
Format your response EXACTLY like this:
Search query: [Your 2-5 word query]
Time range: [d/w/m/y/none]
Do not provide any additional information or explanation, note that the time range allows you to see results within a time range (d is within the last day, w is within the last week, m is within the last month, y is within the last year, and none is results from anytime, only select one, using only the corresponding letter for whichever of these options you select as indicated in the response format) use your judgement as many searches will not require a time range and some may depending on what the research focus is.
"""
response_text = self.llm.generate(prompt, max_tokens=50, stop=None)
query, time_range = self.parse_query_response(response_text)
if not query:
self.ui.update_output(f"{Fore.RED}Error: Empty search query. Using focus area as query...{Style.RESET_ALL}")
return [focus_area.area]
self.ui.update_output(f"{Fore.YELLOW}Original focus: {focus_area.area}{Style.RESET_ALL}")
self.ui.update_output(f"{Fore.YELLOW}Formulated query: {query}{Style.RESET_ALL}")
self.ui.update_output(f"{Fore.YELLOW}Time range: {time_range}{Style.RESET_ALL}")
return [query]
except Exception as e:
logger.error(f"Error formulating query: {str(e)}")
return [focus_area.area]
def parse_search_query(self, query_response: str) -> Dict[str, str]:
"""Parse search query formulation response with improved time range detection"""
try:
lines = query_response.strip().split('\n')
result = {
'query': '',
'time_range': 'none'
}
# First try to find standard format
for line in lines:
if ':' in line:
key, value = line.split(':', 1)
key = key.strip().lower()
value = value.strip()
if 'query' in key:
result['query'] = self._clean_query(value)
elif ('time' in key or 'range' in key) and value.strip().lower() in ['d', 'w', 'm', 'y', 'none']:
result['time_range'] = value.strip().lower()
# If no time range found, look for individual characters
if result['time_range'] == 'none':
# Get all text except the query itself
full_text = query_response.lower()
if result['query']:
full_text = full_text.replace(result['query'].lower(), '')
# Look for isolated d, w, m, or y characters
time_chars = set()
for char in ['d', 'w', 'm', 'y']:
# Check if char exists by itself (not part of another word)
matches = re.finditer(r'\b' + char + r'\b', full_text)
for match in matches:
# Verify it's not part of a word
start, end = match.span()
if (start == 0 or not full_text[start-1].isalpha()) and \
(end == len(full_text) or not full_text[end].isalpha()):
time_chars.add(char)
# If exactly one time char found, use it
if len(time_chars) == 1:
result['time_range'] = time_chars.pop()
return result
except Exception as e:
logger.error(f"Error parsing search query: {str(e)}")
return {'query': '', 'time_range': 'none'}
def _cleanup(self):
"""Enhanced cleanup to handle conversation mode"""
self.conversation_active = False
self.should_terminate.set()
if self.research_thread and self.research_thread.is_alive():
try:
self.research_thread.join(timeout=1.0)
if self.research_thread.is_alive():
import ctypes
ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(self.research_thread.ident),
ctypes.py_object(SystemExit)
)
except Exception as e:
logger.error(f"Error terminating research thread: {str(e)}")
if hasattr(self.llm, 'cleanup'):
try:
self.llm.cleanup()
except Exception as e:
logger.error(f"Error cleaning up LLM: {str(e)}")
if hasattr(self.ui, 'cleanup'):
self.ui.cleanup()
def _initialize_document(self):
"""Initialize research session document"""
try:
# Get all existing research session files
self.session_files = []
for file in os.listdir():
if file.startswith("research_session_") and file.endswith(".txt"):
try:
num = int(file.split("_")[2].split(".")[0])
self.session_files.append(num)
except ValueError:
continue
# Determine next session number
next_session = 1 if not self.session_files else max(self.session_files) + 1
self.document_path = f"research_session_{next_session}.txt"
# Initialize the new document
with open(self.document_path, 'w', encoding='utf-8') as f:
f.write(f"Research Session {next_session}\n")
f.write(f"Topic: {self.original_query}\n")
f.write(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("="*80 + "\n\n")
f.flush()
except Exception as e:
logger.error(f"Error initializing document: {str(e)}")
self.document_path = "research_findings.txt"
with open(self.document_path, 'w', encoding='utf-8') as f:
f.write("Research Findings:\n\n")
f.flush()
def add_to_document(self, content: str, source_url: str, focus_area: str):
"""Add research findings to current session document"""
try:
with open(self.document_path, 'a', encoding='utf-8') as f:
if source_url not in self.searched_urls:
f.write(f"\n{'='*80}\n")
f.write(f"Research Focus: {focus_area}\n")
f.write(f"Source: {source_url}\n")
f.write(f"Content:\n{content}\n")
f.write(f"{'='*80}\n")
f.flush()
self.searched_urls.add(source_url)
self.ui.update_output(f"Added content from: {source_url}")
except Exception as e:
logger.error(f"Error adding to document: {str(e)}")
self.ui.update_output(f"Error saving content: {str(e)}")
def _process_search_results(self, results: Dict[str, str], focus_area: str):
"""Process and store search results"""
if not results:
return
for url, content in results.items():
if url not in self.searched_urls:
self.add_to_document(content, url, focus_area)
def _research_loop(self):
"""Main research loop with comprehensive functionality"""
self.is_running = True
try:
self.research_started.set()
while not self.should_terminate.is_set() and not self.shutdown_event.is_set():
# Check if research is paused
if self.research_paused:
time.sleep(1)
continue
self.ui.update_output("\nAnalyzing research progress...")
# Generate focus areas
self.ui.update_output("\nGenerating research focus areas...")
analysis_result = self.strategic_parser.strategic_analysis(self.original_query)
if not analysis_result:
self.ui.update_output("\nFailed to generate analysis result. Retrying...")
continue
focus_areas = analysis_result.focus_areas
if not focus_areas:
self.ui.update_output("\nNo valid focus areas generated. Retrying...")
continue
self.ui.update_output(f"\nGenerated {len(focus_areas)} research areas:")
for i, focus in enumerate(focus_areas, 1):
self.ui.update_output(f"\nArea {i}: {focus.area}")
self.ui.update_output(f"Priority: {focus.priority}")
# Process each focus area in priority order
for focus_area in focus_areas:
if self.should_terminate.is_set():
break
# Check if research is paused
while self.research_paused and not self.should_terminate.is_set():
time.sleep(1)
if self.should_terminate.is_set():
break
self.current_focus = focus_area
self.ui.update_output(f"\nInvestigating: {focus_area.area}")
queries = self.formulate_search_queries(focus_area)
if not queries:
continue
for query in queries:
if self.should_terminate.is_set():
break
# Check if research is paused
while self.research_paused and not self.should_terminate.is_set():
time.sleep(1)
if self.should_terminate.is_set():
break
try:
self.ui.update_output(f"\nSearching: {query}")
results = self.search_engine.perform_search(query, time_range='none')
if results:
# self.search_engine.display_search_results(results)
selected_urls = self.search_engine.select_relevant_pages(results, query)
if selected_urls:
self.ui.update_output("\n⚙️ Scraping selected pages...")
scraped_content = self.search_engine.scrape_content(selected_urls)
if scraped_content:
for url, content in scraped_content.items():
if url not in self.searched_urls:
self.add_to_document(content, url, focus_area.area)
except Exception as e:
logger.error(f"Error in search: {str(e)}")
self.ui.update_output(f"Error during search: {str(e)}")
if self.check_document_size():
self.ui.update_output("\nDocument size limit reached. Finalizing research.")
return
# After processing all areas, cycle back to generate new ones
self.ui.update_output("\nAll current focus areas investigated. Generating new areas...")
except Exception as e:
logger.error(f"Error in research loop: {str(e)}")
self.ui.update_output(f"Error in research process: {str(e)}")
finally:
self.is_running = False
def start_research(self, topic: str):
"""Start research with new session document"""
try:
self.ui.setup()
self.original_query = topic
self._initialize_document()
self.ui.update_output(f"Starting research on: {topic}")
self.ui.update_output(f"Session document: {self.document_path}")
self.ui.update_output("\nCommands available during research:")
self.ui.update_output("'s' = Show status")
self.ui.update_output("'f' = Show current focus")
self.ui.update_output("'p' = Pause and assess the research progress") # New command
self.ui.update_output("'q' = Quit research\n")
# Reset events
self.should_terminate.clear()
self.research_started.clear()
self.research_paused = False # Ensure research is not paused at the start
self.awaiting_user_decision = False
# Start research thread
self.research_thread = threading.Thread(target=self._research_loop, daemon=True)
self.research_thread.start()
# Wait for research to actually start
if not self.research_started.wait(timeout=10):
self.ui.update_output("Error: Research failed to start within timeout period")
self.should_terminate.set()
return
while not self.should_terminate.is_set():
cmd = self.ui.get_input("Enter command: ")
if cmd is None or self.shutdown_event.is_set():
if self.should_terminate.is_set() and not self.research_complete:
self.ui.update_output("\nGenerating research summary... please wait...")
summary = self.terminate_research()
self.ui.update_output("\nFinal Research Summary:")
self.ui.update_output(summary)
break
if cmd:
self._handle_command(cmd)
except Exception as e:
logger.error(f"Error in research process: {str(e)}")
finally:
self._cleanup()
def check_document_size(self) -> bool:
"""Check if document size is approaching context limit"""
try:
with open(self.document_path, 'r', encoding='utf-8') as f:
content = f.read()
estimated_tokens = len(content.split()) * 1.3
max_tokens = self.llm.llm_config.get('n_ctx', 2048)
current_ratio = estimated_tokens / max_tokens
if current_ratio > 0.8:
logger.warning(f"Document size at {current_ratio*100:.1f}% of context limit")
self.ui.update_output(f"Warning: Document size at {current_ratio*100:.1f}% of context limit")
return current_ratio > 0.9
except Exception as e:
logger.error(f"Error checking document size: {str(e)}")
return True
def _handle_command(self, cmd: str):
"""Handle user commands during research"""
if cmd.lower() == 's':
self.ui.update_output(self.get_progress())
elif cmd.lower() == 'f':
if self.current_focus:
self.ui.update_output("\nCurrent Focus:")
self.ui.update_output(f"Area: {self.current_focus.area}")
self.ui.update_output(f"Priority: {self.current_focus.priority}")
else:
self.ui.update_output("\nNo current focus area")
elif cmd.lower() == 'p':
self.pause_and_assess()
elif cmd.lower() == 'q':
self.ui.update_output("\nInitiating research termination...")
self.should_terminate.set()
self.ui.update_output("\nGenerating research summary... please wait...")
summary = self.terminate_research()
self.ui.update_output("\nFinal Research Summary:")
self.ui.update_output(summary)
def pause_and_assess(self):
"""Pause the research and assess if the collected content is sufficient."""
try:
# Pause the research thread
self.ui.update_output("\nPausing research for assessment...")
self.research_paused = True
# Start progress indicator in a separate thread
self.summary_ready = False
indicator_thread = threading.Thread(
target=self.show_progress_indicator,
args=("Assessing the researched information...",)
)
indicator_thread.daemon = True
indicator_thread.start()
# Read the current research content
if not os.path.exists(self.document_path):
self.summary_ready = True
indicator_thread.join()
self.ui.update_output("No research data found to assess.")
self.research_paused = False
return
with open(self.document_path, 'r', encoding='utf-8') as f:
content = f.read().strip()
if not content:
self.summary_ready = True
indicator_thread.join()
self.ui.update_output("No research data was collected to assess.")
self.research_paused = False
return
# Prepare the prompt for the AI assessment
assessment_prompt = f"""
Based on the following research content, please assess whether the original query "{self.original_query}" can be answered sufficiently with the collected information.
Research Content:
{content}
Instructions:
1. If the research content provides enough information to answer the original query in detail, respond with: "The research is sufficient to answer the query."
2. If not, respond with: "The research is insufficient and it would be advisable to continue gathering information."
3. Do not provide any additional information or details.
Assessment:
"""
# Generate the assessment
assessment = self.llm.generate(assessment_prompt, max_tokens=200)
# Stop the progress indicator
self.summary_ready = True
indicator_thread.join()
# Display the assessment
self.ui.update_output("\nAssessment Result:")
self.ui.update_output(assessment.strip())
# Provide user with options to continue or quit
self.ui.update_output("\nEnter 'c' to continue the research or 'q' to terminate and generate the summary.")
self.awaiting_user_decision = True # Flag to indicate we are waiting for user's decision
while self.awaiting_user_decision:
cmd = self.ui.get_input("Enter command ('c' to continue, 'q' to quit): ")
if cmd is None:
continue # Ignore invalid inputs
cmd = cmd.strip().lower()
if cmd == 'c':
self.ui.update_output("\nResuming research...")
self.research_paused = False
self.awaiting_user_decision = False
elif cmd == 'q':
self.ui.update_output("\nTerminating research and generating summary...")
self.awaiting_user_decision = False
self.should_terminate.set()
summary = self.terminate_research()
self.ui.update_output("\nFinal Research Summary:")
self.ui.update_output(summary)
break
else:
self.ui.update_output("Invalid command. Please enter 'c' to continue or 'q' to quit.")
except Exception as e:
logger.error(f"Error during pause and assess: {str(e)}")
self.ui.update_output(f"Error during assessment: {str(e)}")
self.research_paused = False
finally:
self.summary_ready = True # Ensure the indicator thread can exit
def get_progress(self) -> str:
"""Get current research progress"""
return f"""
Research Progress:
- Original Query: {self.original_query}
- Sources analyzed: {len(self.searched_urls)}
- Status: {'Active' if self.is_running else 'Stopped'}
- Current focus: {self.current_focus.area if self.current_focus else 'Initializing'}
"""
def is_active(self) -> bool:
"""Check if research is currently active"""
return self.is_running and self.research_thread and self.research_thread.is_alive()
def terminate_research(self) -> str:
"""Terminate research and return to main terminal"""
try:
print("Initiating research termination...")
sys.stdout.flush()
# Start progress indicator in a separate thread immediately
indicator_thread = threading.Thread(target=self.show_progress_indicator)
indicator_thread.daemon = True
indicator_thread.start()
if not os.path.exists(self.document_path):
self.summary_ready = True
indicator_thread.join(timeout=1.0)
self._cleanup()
return "No research data found to summarize."
with open(self.document_path, 'r', encoding='utf-8') as f:
content = f.read().strip()
self.research_content = content # Store for conversation mode
if not content or content == "Research Findings:\n\n":
self.summary_ready = True
indicator_thread.join(timeout=1.0)
self._cleanup()
return "No research data was collected to summarize."
try:
# Generate summary using LLM
summary_prompt = f"""
Analyze the following content to provide a comprehensive research summary and a response to the user's original query "{self.original_query}" ensuring that you conclusively answer the query in detail:
Research Content:
{content}
Important Instructions:
> Summarize the research findings that are relevant to the Original topic/question: "{self.original_query}"
> Ensure that in your summary you directly answer the original question/topic conclusively to the best of your ability in detail.
> Read the original topic/question again "{self.original_query}" and abide by any additional instructions that it contains, exactly as instructed in your summary otherwise provide it normally should it not have any specific instructions
Summary:
"""
summary = self.llm.generate(summary_prompt, max_tokens=4000)
# Signal that summary is complete to stop the progress indicator
self.summary_ready = True
indicator_thread.join(timeout=1.0)
# Store summary and mark research as complete
self.research_summary = summary
self.research_complete = True
# Format summary
formatted_summary = f"""
{'='*80}
RESEARCH SUMMARY
{'='*80}
Original Query: {self.original_query}
Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
{summary}
{'='*80}
End of Summary
{'='*80}
"""
# Write to document
with open(self.document_path, 'a', encoding='utf-8') as f:
f.write("\n\n" + formatted_summary)
# Clean up research UI
if hasattr(self, 'ui') and self.ui:
self.ui.cleanup()
return formatted_summary
except Exception as e:
self.summary_ready = True
indicator_thread.join(timeout=1.0)
raise e
except Exception as e:
error_msg = f"Error generating summary: {str(e)}"
logger.error(error_msg)
return error_msg
finally:
# Clean up research UI
self._cleanup_research_ui()
def show_progress_indicator(self, message="Generating summary, please wait..."):
"""Show a rotating progress indicator until the summary is ready."""
symbols = ['|', '/', '-', '\\']
idx = 0
self.summary_ready = False # Track whether the summary is complete
while not self.summary_ready:
sys.stdout.write(f"\r{message} {symbols[idx]}")
sys.stdout.flush()
idx = (idx + 1) % len(symbols)
time.sleep(0.2) # Adjust the speed of the rotation if needed
sys.stdout.write("\r" + " " * (len(message) + 2) + "\r") # Clear the line when done
def _cleanup_research_ui(self):
"""Clean up just the research UI components"""
if hasattr(self, 'ui') and self.ui:
self.ui.cleanup()
def show_thinking_indicator(self, message: str, stop_flag_name: str):
"""Show a rotating thinking indicator with custom message"""
symbols = ['|', '/', '-', '\\']
idx = 0
while getattr(self, stop_flag_name): # Use dynamic attribute lookup
sys.stdout.write(f"\r{message} {symbols[idx]}")
sys.stdout.flush()
idx = (idx + 1) % len(symbols)
time.sleep(0.2)
sys.stdout.write("\r" + " " * (len(message) + 2) + "\r") # Clear the line when done
def start_conversation_mode(self):
"""Start interactive conversation mode with CTRL+D input handling and thinking indicator"""
self.conversation_active = True
self.thinking = False
# Print header with clear instructions
print("\n" + "="*80)
print(Fore.CYAN + "Research Conversation Mode" + Style.RESET_ALL)
print("="*80)
print(Fore.YELLOW + "\nInstructions:")
print("- Type your question and press CTRL+D to submit")
print("- Type 'quit' and press CTRL+D to exit")
print("- Your messages appear in green")
print("- AI responses appear in cyan" + Style.RESET_ALL + "\n")
while self.conversation_active:
try:
# Show prompt with user input in green
print(Fore.GREEN + "Your question (Press CTRL+D to submit):" + Style.RESET_ALL)
user_input = self.get_multiline_conversation_input()
# Handle exit commands
if not user_input or user_input.lower() in ['quit', 'exit', 'q']:
print(Fore.YELLOW + "\nExiting conversation mode..." + Style.RESET_ALL)
self.conversation_active = False
break
# Skip empty input
if not user_input.strip():
continue
# Echo the submitted question for clarity
print(Fore.GREEN + "Submitted question:" + Style.RESET_ALL)
print(Fore.GREEN + user_input + Style.RESET_ALL + "\n")
# Start thinking indicator in a separate thread
self.thinking = True # Set flag before starting thread
thinking_thread = threading.Thread(
target=self.show_thinking_indicator,
args=("Thinking...", "thinking")
)
thinking_thread.daemon = True
thinking_thread.start()
try:
# Generate response
response = self._generate_conversation_response(user_input)
# Stop thinking indicator
self.thinking = False
thinking_thread.join()
# Display response in cyan
print(Fore.CYAN + "AI Response:" + Style.RESET_ALL)
print(f"{Fore.CYAN}{response}{Style.RESET_ALL}\n")
print("-" * 80 + "\n") # Separator between QA pairs
except Exception as e:
self.thinking = False # Ensure thinking indicator stops
thinking_thread.join()
raise e
except KeyboardInterrupt:
self.thinking = False # Ensure thinking indicator stops
print(Fore.YELLOW + "\nOperation cancelled. Submit 'quit' to exit." + Style.RESET_ALL)
except Exception as e:
logger.error(f"Error in conversation mode: {str(e)}")
print(Fore.RED + f"Error processing question: {str(e)}" + Style.RESET_ALL)
def _generate_conversation_response(self, user_query: str) -> str:
"""Generate contextual responses with improved context handling"""
try:
# Add debug logging to verify content
logger.info(f"Research summary length: {len(self.research_summary) if self.research_summary else 0}")
logger.info(f"Research content length: {len(self.research_content) if self.research_content else 0}")
# First verify we have content
if not self.research_content and not self.research_summary:
# Try to reload from file if available
try:
if os.path.exists(self.document_path):
with open(self.document_path, 'r', encoding='utf-8') as f:
self.research_content = f.read().strip()
except Exception as e:
logger.error(f"Failed to reload research content: {str(e)}")
# Prepare context, ensuring we have content
context = f"""
Research Content:
{self.research_content}
Research Summary:
{self.research_summary if self.research_summary else 'No summary available'}
"""
prompt = f"""
Based on the following research content and summary, please answer this question:
{context}
Question: {user_query}
you have 2 sets of instructions the applied set and the unapplied set, the applied set should be followed if the question is directly relating to the research content whereas anything else other then direct questions about the content of the research will result in you instead following the unapplied ruleset
Applied:
Instructions:
1. Answer based ONLY on the research content provided above if asked a question about your research or that content.
2. If the information requested isn't in the research, clearly state that it isn't in the content you gathered.
3. Be direct and specific in your response, DO NOT directly cite research unless specifically asked to, be concise and give direct answers to questions based on the research, unless instructed otherwise.
Unapplied:
Instructions:
1. Do not make up anything that isn't actually true.
2. Respond directly to the user's question in an honest and thoughtful manner.
3. disregard rules in the applied set for queries not DIRECTLY related to the research, including queries about the research process or what you remember about the research should result in the unapplied ruleset being used.
Answer:
"""
response = self.llm.generate(
prompt,
max_tokens=1000, # Increased for more detailed responses
temperature=0.7
)
if not response or not response.strip():
return "I apologize, but I cannot find relevant information in the research content to answer your question."
return response.strip()
except Exception as e:
logger.error(f"Error generating response: {str(e)}")
return f"I apologize, but I encountered an error processing your question: {str(e)}"
def get_multiline_conversation_input(self) -> str:
"""Get multiline input with CTRL+D handling for conversation mode"""
buffer = []
# Save original terminal settings
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
# Set terminal to raw mode
tty.setraw(fd)
current_line = []
while True:
char = sys.stdin.read(1)
# CTRL+D detection
if not char or ord(char) == 4: # EOF or CTRL+D
sys.stdout.write('\n')
if current_line:
buffer.append(''.join(current_line))
return ' '.join(buffer).strip()
# Handle special characters
elif ord(char) == 13: # Enter
sys.stdout.write('\n')
buffer.append(''.join(current_line))
current_line = []
elif ord(char) == 127: # Backspace
if current_line:
current_line.pop()
sys.stdout.write('\b \b')
elif ord(char) == 3: # CTRL+C
sys.stdout.write('\n')
return 'quit'
# Normal character
elif 32 <= ord(char) <= 126: # Printable characters
current_line.append(char)
sys.stdout.write(char)
sys.stdout.flush()
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
print() # New line for clean display
if __name__ == "__main__":
from llm_wrapper import LLMWrapper
from llm_response_parser import UltimateLLMResponseParser
from Self_Improving_Search import EnhancedSelfImprovingSearch
try:
print(f"{Fore.CYAN}Initializing Research System...{Style.RESET_ALL}")
llm = LLMWrapper()
parser = UltimateLLMResponseParser()
search_engine = EnhancedSelfImprovingSearch(llm, parser)
manager = ResearchManager(llm, parser, search_engine)
print(f"{Fore.GREEN}System initialized. Enter your research topic or 'quit' to exit.{Style.RESET_ALL}")
while True:
try:
topic = ResearchManager.get_initial_input()
if topic.lower() == 'quit':
break
if not topic:
continue
if not topic.startswith('@'):
print(f"{Fore.YELLOW}Please start your research query with '@'{Style.RESET_ALL}")
continue
topic = topic[1:] # Remove @ prefix
manager.start_research(topic)
summary = manager.terminate_research()
print(f"\n{Fore.GREEN}Research Summary:{Style.RESET_ALL}")
print(summary)
print(f"\n{Fore.GREEN}Research completed. Ready for next topic.{Style.RESET_ALL}\n")
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Operation cancelled. Ready for next topic.{Style.RESET_ALL}")
if 'manager' in locals():
manager.terminate_research()
continue
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Research system shutting down.{Style.RESET_ALL}")
if 'manager' in locals():
manager.terminate_research()
except Exception as e:
print(f"{Fore.RED}Critical error: {str(e)}{Style.RESET_ALL}")
logger.error("Critical error in main loop", exc_info=True)
if os.name == 'nt':
print(f"{Fore.YELLOW}Running on Windows - Some features may be limited{Style.RESET_ALL}")