mirror of
https://github.com/TheBlewish/Automated-AI-Web-Researcher-Ollama.git
synced 2025-01-18 16:37:47 +00:00
1482 lines
58 KiB
Python
1482 lines
58 KiB
Python
|
import os
|
||
|
import sys
|
||
|
import threading
|
||
|
import time
|
||
|
import re
|
||
|
import json
|
||
|
import logging
|
||
|
import curses
|
||
|
import signal
|
||
|
from typing import List, Dict, Set, Optional, Tuple, Union
|
||
|
from dataclasses import dataclass
|
||
|
from queue import Queue
|
||
|
from datetime import datetime
|
||
|
from io import StringIO
|
||
|
from colorama import init, Fore, Style
|
||
|
import select
|
||
|
import termios
|
||
|
import tty
|
||
|
from threading import Event
|
||
|
from urllib.parse import urlparse
|
||
|
from pathlib import Path
|
||
|
|
||
|
# Initialize colorama for cross-platform color support.
# On Windows, ask colorama to convert ANSI sequences to Win32 calls (convert,
# wrap) without stripping them; on other platforms call init() with defaults.
if os.name != 'nt':
    init()
else:
    init(convert=True, strip=False, wrap=True)
|
||
|
|
||
|
# Set up logging: this module's records go only to logs/research_llm.log.
log_directory = 'logs'
# exist_ok avoids a race between a separate existence check and the create.
os.makedirs(log_directory, exist_ok=True)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
log_file = os.path.join(log_directory, 'research_llm.log')
# Explicit UTF-8: messages can contain scraped/LLM text, which would otherwise
# hit encode errors where the locale's default encoding is not UTF-8.
file_handler = logging.FileHandler(log_file, encoding='utf-8')
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
# Replace (not extend) any handlers already attached to this logger.
logger.handlers = []
logger.addHandler(file_handler)
logger.propagate = False

# Suppress other loggers that exist at import time.
# Iterate over a snapshot because getLogger() may insert entries into loggerDict.
for name in list(logging.root.manager.loggerDict):
    if name != __name__:
        logging.getLogger(name).disabled = True
|
||
|
|
||
|
@dataclass
class ResearchFocus:
    """Represents a specific area of research focus.

    Attributes:
        area: Description of the topic to investigate.
        priority: Integer priority (the parser clamps it to 1..5, 5 = highest).
        source_query: Query this area was derived from; may be empty.
        timestamp: "YYYY-MM-DD HH:MM:SS"; filled with the current time if empty.
        search_queries: Queries issued for this area; normalised to [] if None.
    """
    area: str
    priority: int
    source_query: str = ""
    timestamp: str = ""
    search_queries: Optional[List[str]] = None

    def __post_init__(self):
        # Normalise here (not via default_factory) so an explicit None from
        # callers is also turned into a fresh list.
        if not self.timestamp:
            self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        if self.search_queries is None:
            self.search_queries = []


@dataclass
class AnalysisResult:
    """Contains the complete analysis result.

    Attributes:
        original_question: The research question that was analysed.
        focus_areas: Parsed focus areas (callers in this module sort them by priority).
        raw_response: The unparsed LLM reply.
        timestamp: "YYYY-MM-DD HH:MM:SS"; filled with the current time if empty.
    """
    original_question: str
    focus_areas: List[ResearchFocus]
    raw_response: str
    timestamp: str = ""

    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")


class StrategicAnalysisParser:
    """Asks an LLM for prioritised research areas and parses its reply."""

    # Priority used when the reply omits one.
    DEFAULT_PRIORITY = 3
    _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
    # "3. Some topic" -> group(2) is the text after the item number.
    _NUMBERED_LINE = re.compile(r'^(\d+)\.\s*(.*)')
    # "priority: 4" / "Priority = 4" / "priority 4" anywhere in a line.
    _INLINE_PRIORITY = re.compile(r'(?i)\bpriority\b\s*(?:[:=]?\s*)?(\d+)')
    # The same, anchored at the start of a line.
    _LEADING_PRIORITY = re.compile(r'(?i)^priority\s*(?:[:=]?\s*)?(\d+)')
    # Start of a numbered item (used to look ahead one line).
    _NEW_AREA = re.compile(r'^\d+\.')

    def __init__(self, llm=None):
        """Args:
            llm: Object with ``generate(prompt, max_tokens=...) -> str``.
        """
        self.llm = llm
        self.logger = logging.getLogger(__name__)
        # Simplify patterns to match exactly what we expect
        self.patterns = {
            'priority': [
                r"Priority:\s*(\d+)",  # Match exactly what's in our prompt
            ]
        }

    def strategic_analysis(self, original_query: str) -> Optional[AnalysisResult]:
        """Ask the LLM for five prioritised research areas for *original_query*.

        Makes up to 3 attempts with the base prompt, then one more with an
        emphasised prompt. Returns the first result that yields at least one
        area (areas sorted by priority, highest first), or None if every
        attempt fails or any exception is raised (the error is logged).
        """
        max_retries = 3
        try:
            self.logger.info("Starting strategic analysis...")
            prompt = f"""
You must select exactly 5 areas to investigate in order to explore and gather information to answer the research question:
"{original_query}"

You MUST provide exactly 5 areas numbered 1-5. Each must have a priority, YOU MUST ensure that you only assign one priority per area.
Assign priority based on the likelihood of a focus area being investigated to provide information that directly will allow you to respond to "{original_query}" with 5 being most likely and 1 being least.
Follow this EXACT format without any deviations or additional text:

1. [First research topic]
Priority: [number 1-5]

2. [Second research topic]
Priority: [number 1-5]

3. [Third research topic]
Priority: [number 1-5]

4. [Fourth research topic]
Priority: [number 1-5]

5. [Fifth research topic]
Priority: [number 1-5]
"""
            for attempt in range(max_retries):
                result = self._request_areas(original_query, prompt)
                if result is not None:
                    return result
                self.logger.warning(f"Attempt {attempt + 1}: No valid areas generated, retrying...")
                print(f"\nRetrying research area generation (Attempt {attempt + 1}/{max_retries})...")

            # If all retries failed, try one final time with a stronger prompt
            prompt += "\n\nIMPORTANT: You MUST provide exactly 5 research areas with priorities. This is crucial."
            result = self._request_areas(original_query, prompt)
            if result is not None:
                return result

            self.logger.error("Failed to generate any valid research areas after all attempts")
            return None

        except Exception as e:
            self.logger.error(f"Error in strategic analysis: {str(e)}")
            return None

    def _request_areas(self, original_query: str, prompt: str) -> Optional[AnalysisResult]:
        """Run one LLM round-trip; return a priority-sorted result, or None if no areas parsed."""
        response = self.llm.generate(prompt, max_tokens=1000)
        focus_areas = self._extract_research_areas(response)
        if not focus_areas:
            return None
        # Highest priority first.
        focus_areas.sort(key=lambda area: area.priority, reverse=True)
        return AnalysisResult(
            original_question=original_query,
            focus_areas=focus_areas,
            raw_response=response,
            timestamp=datetime.now().strftime(self._TIMESTAMP_FORMAT),
        )

    @classmethod
    def _clamp_priority(cls, raw: str) -> int:
        """Convert a captured priority to int and clamp it to 1..5."""
        try:
            value = int(raw)
        except ValueError:
            return cls.DEFAULT_PRIORITY
        return max(1, min(5, value))

    def _extract_research_areas(self, text: str) -> List[ResearchFocus]:
        """Parse numbered research areas and their priorities from *text*.

        Accepts the priority either inline on the numbered line
        ("2. Topic - Priority: 4") or on a following "Priority: N" line.
        Missing priorities default to DEFAULT_PRIORITY; values are clamped
        to 1..5. Returns [] for empty or None input.
        """
        areas: List[ResearchFocus] = []
        if not text:
            return areas

        lines = text.strip().split('\n')
        current_area: Optional[str] = None
        current_priority: Optional[int] = None

        def flush() -> None:
            """Append the pending area (if any) and reset the pending state."""
            nonlocal current_area, current_priority
            if current_area is not None:
                areas.append(ResearchFocus(
                    area=current_area.strip(' -:'),
                    priority=current_priority or self.DEFAULT_PRIORITY,
                ))
            current_area = None
            current_priority = None

        for index, raw_line in enumerate(lines):
            line = raw_line.strip()
            if not line:
                continue

            number_match = self._NUMBERED_LINE.match(line)
            if number_match:
                # A new numbered item closes the previous one.
                flush()
                area_line = number_match.group(2)
                inline = self._INLINE_PRIORITY.search(area_line)
                if inline:
                    current_priority = self._clamp_priority(inline.group(1))
                    # Drop the "priority N" fragment from the area text.
                    area_line = (area_line[:inline.start()] + area_line[inline.end():]).strip(' -:')
                current_area = area_line.strip()
                continue

            priority_match = self._LEADING_PRIORITY.match(line)
            if priority_match:
                current_priority = self._clamp_priority(priority_match.group(1))
                is_last = index + 1 == len(lines)
                next_is_area = not is_last and self._NEW_AREA.match(lines[index + 1].strip())
                if is_last or next_is_area:
                    flush()

        # The final item is not followed by another numbered line, so it would
        # otherwise be lost (e.g. inline priority on the last item, or trailing
        # commentary after its "Priority:" line).
        flush()
        return areas

    def _clean_text(self, text: str) -> str:
        """Collapse whitespace, turn "1)" into "1).", and abbreviate "priority:" to "P:"."""
        text = re.sub(r'\s+', ' ', text)
        text = re.sub(r'(\d+\))', r'\1.', text)
        text = re.sub(r'(?i)priority:', 'P:', text)
        return text.strip()

    def _add_area(self, areas: List[ResearchFocus], area: str, priority: Optional[int]):
        """Append *area* to *areas* unless it is empty or shorter than three words."""
        if not area or len(area.split()) < 3:  # Basic validation
            return
        areas.append(ResearchFocus(
            area=area,
            priority=priority or self.DEFAULT_PRIORITY,
            timestamp=datetime.now().strftime(self._TIMESTAMP_FORMAT),
            search_queries=[],
        ))

    def _normalize_focus_areas(self, areas: List[ResearchFocus]) -> List[ResearchFocus]:
        """Sort *areas* in place by priority (highest first), clamp priorities to 1..5, keep at most 5."""
        if not areas:
            return []
        areas.sort(key=lambda x: x.priority, reverse=True)
        for area in areas:
            area.priority = max(1, min(5, area.priority))
        return areas[:5]

    def format_analysis_result(self, result: AnalysisResult) -> str:
        """Render *result* as numbered areas with priorities for display."""
        if not result:
            return "No valid analysis result generated."

        formatted = [
            f"\nResearch Areas for: {result.original_question}\n"
        ]
        for i, focus in enumerate(result.focus_areas, 1):
            formatted.extend([
                f"\n{i}. {focus.area}",
                f"   Priority: {focus.priority}",
            ])
        return "\n".join(formatted)
|
||
|
|
||
|
class OutputRedirector:
    """Context manager that sends both sys.stdout and sys.stderr to one stream.

    The streams to restore are captured at construction time and put back
    on exit. Exceptions raised inside the ``with`` block are not suppressed.
    """

    def __init__(self, stream=None):
        self.stream = stream or StringIO()
        self.original_stdout, self.original_stderr = sys.stdout, sys.stderr

    def __enter__(self):
        sys.stdout = sys.stderr = self.stream
        return self.stream

    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.stdout, sys.stderr = self.original_stdout, self.original_stderr
|
||
|
|
||
|
class TerminalUI:
    """Curses UI with a scrolling output pane, a status line and a fixed input area.

    Layout, top to bottom: output window (all rows except the last 4), a
    one-row status window, and a three-row input window.
    """

    # Key codes get_input() treats as "submit the line".
    _ENTER_KEYS = (ord('\n'), ord('\r'), curses.KEY_ENTER)
    # Key codes get_input() treats as backspace (terminals differ: DEL or Ctrl+H).
    _BACKSPACE_KEYS = (curses.KEY_BACKSPACE, 127, 8)

    def __init__(self):
        self.stdscr = None
        self.input_win = None
        self.output_win = None
        self.status_win = None
        self.max_y = 0
        self.max_x = 0
        self.input_buffer = ""
        self.is_setup = False
        self.old_terminal_settings = None
        self.should_terminate = Event()
        self.shutdown_event = Event()
        self.research_thread = None
        self.last_display_height = 0  # Output cursor row after the last write (corruption check)

    def setup(self):
        """Initialise curses, create the three windows and draw the input prompt.

        No-op if already set up. On non-Windows systems the current tty
        attributes are saved so cleanup() can restore them.
        """
        if self.is_setup:
            return

        if os.name != 'nt':  # Unix-like systems
            self.old_terminal_settings = termios.tcgetattr(sys.stdin.fileno())

        self.stdscr = curses.initscr()
        curses.start_color()
        curses.noecho()
        curses.cbreak()
        self.stdscr.keypad(True)
        # Mouse reporting is intentionally not enabled: all-motion tracking
        # ('\033[?1003h') flooded the input with events.

        self.max_y, self.max_x = self.stdscr.getmaxyx()

        self.output_win = curses.newwin(self.max_y - 4, self.max_x, 0, 0)
        self.status_win = curses.newwin(1, self.max_x, self.max_y - 4, 0)
        self.input_win = curses.newwin(3, self.max_x, self.max_y - 3, 0)

        curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK)   # input prompt
        curses.init_pair(2, curses.COLOR_CYAN, curses.COLOR_BLACK)    # output text
        curses.init_pair(3, curses.COLOR_YELLOW, curses.COLOR_BLACK)  # status line

        self.output_win.scrollok(True)
        self.output_win.idlok(True)
        self.input_win.scrollok(True)

        self.is_setup = True
        self._refresh_input_prompt()

    def cleanup(self):
        """Clear the windows, leave curses mode and restore saved tty settings.

        No-op unless setup() has run. Errors are logged; the window
        references are always reset and ``is_setup`` becomes False.
        """
        if not self.is_setup:
            return
        try:
            for win in (self.input_win, self.output_win, self.status_win):
                if win:
                    win.clear()
                    win.refresh()

            if self.stdscr:
                self.stdscr.keypad(False)
                curses.nocbreak()
                curses.echo()
                curses.endwin()

            if self.old_terminal_settings and os.name != 'nt':
                termios.tcsetattr(
                    sys.stdin.fileno(),
                    termios.TCSADRAIN,
                    self.old_terminal_settings,
                )
        except Exception as e:
            logger.error(f"Error during terminal cleanup: {str(e)}")
        finally:
            self.is_setup = False
            self.stdscr = None
            self.input_win = None
            self.output_win = None
            self.status_win = None

    @staticmethod
    def _raise_in_thread(thread, exc_type):
        """Ask CPython to raise *exc_type* asynchronously in *thread*.

        PyThreadState_SetAsyncExc returns how many thread states it modified.
        If more than one was touched, the request is revoked by calling it
        again with NULL, as the C-API documentation prescribes. The exception
        stays pending (not revoked) if the thread is blocked in C code.
        """
        import ctypes
        thread_id = ctypes.c_ulong(thread.ident)
        modified = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            thread_id, ctypes.py_object(exc_type))
        if modified > 1:
            ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)

    def _cleanup(self):
        """Stop the research thread, release the LLM (if any) and restore the terminal."""
        self.should_terminate.set()

        thread = self.research_thread
        if thread and thread.is_alive():
            try:
                # Give the thread a chance to notice should_terminate first.
                thread.join(timeout=1.0)
                if thread.is_alive():
                    self._raise_in_thread(thread, SystemExit)
            except Exception as e:
                logger.error(f"Error terminating research thread: {str(e)}")

        # This class does not create ``llm`` itself; only clean it up if one was attached.
        llm = getattr(self, 'llm', None)
        if hasattr(llm, 'cleanup'):
            try:
                llm.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up LLM: {str(e)}")

        # endwin() raises curses.error if curses was never initialised.
        try:
            curses.endwin()
        except curses.error:
            pass

        self.cleanup()

    def _refresh_input_prompt(self, prompt="Enter command: "):
        """Redraw *prompt* plus the current input buffer and place the cursor after it."""
        if not self.is_setup:
            return
        try:
            self.input_win.clear()
            line = f"{prompt}{self.input_buffer}"
            self.input_win.addstr(0, 0, line, curses.color_pair(1))

            # addstr wraps at the window width, so map the end offset onto
            # (row, column) instead of assuming everything fits on row 0.
            width = max(1, self.input_win.getmaxyx()[1])
            cursor_y, cursor_x = divmod(len(line), width)
            try:
                self.input_win.move(cursor_y, cursor_x)
            except curses.error:
                pass  # Outside the window; leave the cursor where addstr put it.

            self.input_win.refresh()
        except curses.error:
            pass

    def update_output(self, text: str):
        """Append *text* (ANSI colour codes stripped) to the output window."""
        if not self.is_setup:
            return
        try:
            clean_text = re.sub(r'\x1b\[[0-9;]*[mK]', '', text)

            # If the cursor moved below where our last write ended, something
            # else drew into the window; start from a clean slate.
            current_y, _ = self.output_win.getyx()
            if current_y > self.last_display_height:
                self.output_win.clear()

            self.output_win.addstr(clean_text + "\n", curses.color_pair(2))
            new_y, _ = self.output_win.getyx()
            self.last_display_height = new_y

            self.output_win.refresh()
            self._refresh_input_prompt()
        except curses.error:
            pass

    def update_status(self, text: str):
        """Replace the status line above the input area with *text*."""
        if not self.is_setup:
            return
        try:
            self.status_win.clear()
            self.status_win.addstr(0, 0, text, curses.color_pair(3))
            self.status_win.refresh()
            self._refresh_input_prompt()  # Status redraw can move the cursor.
        except curses.error:
            pass

    def get_input(self, prompt: Optional[str] = None) -> Optional[str]:
        """Read one line of input from the input window.

        Args:
            prompt: Optional text shown on the status line while waiting.

        Returns:
            The stripped line on Enter (empty lines are ignored) or on Ctrl+D
            with text in the buffer; "@quit" on Ctrl+D with an empty buffer,
            Ctrl+C, KeyboardInterrupt or an unexpected error (after cleanup);
            None once ``should_terminate`` is set.
        """
        try:
            # Set up first so the status prompt is not silently dropped.
            if not self.is_setup:
                self.setup()
            if prompt:
                self.update_status(prompt)
            self.input_buffer = ""
            self._refresh_input_prompt()

            while True:
                if self.should_terminate.is_set():
                    return None

                try:
                    ch = self.input_win.getch()

                    if ch == curses.KEY_MOUSE:
                        # Consume and ignore mouse events.
                        try:
                            curses.getmouse()
                        except curses.error:
                            pass
                        continue

                    if ch == 4:  # Ctrl+D
                        result = self.input_buffer.strip()
                        self.input_buffer = ""
                        if not result:
                            self.cleanup()
                            return "@quit"
                        return result

                    if ch == 3:  # Ctrl+C (only delivered as a key in raw mode)
                        self.should_terminate.set()
                        self.cleanup()
                        return "@quit"

                    if ch in self._ENTER_KEYS:
                        result = self.input_buffer.strip()
                        if result:
                            self.input_buffer = ""
                            return result
                        continue

                    if ch in self._BACKSPACE_KEYS:
                        if self.input_buffer:
                            self.input_buffer = self.input_buffer[:-1]
                            self._refresh_input_prompt()
                    elif 32 <= ch <= 126:  # Printable ASCII
                        self.input_buffer += chr(ch)
                        self._refresh_input_prompt()

                except KeyboardInterrupt:
                    self.should_terminate.set()
                    self.cleanup()
                    return "@quit"
                except curses.error:
                    self._refresh_input_prompt()

        except Exception as e:
            logger.error(f"Error in get_input: {str(e)}")
            self.should_terminate.set()
            self.cleanup()
            return "@quit"

    def force_exit(self):
        """Best-effort cleanup, then terminate the process immediately with status 0."""
        try:
            self.should_terminate.set()
            self.shutdown_event.set()
            self._cleanup()   # Stops the thread and restores the terminal
            self.cleanup()    # No-op if _cleanup already ran it
            curses.endwin()   # Final attempt to restore terminal
        except Exception:
            pass  # Exiting regardless; cleanup failures must not block it.
        finally:
            os._exit(0)
|
||
|
|
||
|
class NonBlockingInput:
    """Context manager for single-key, non-blocking keyboard reads.

    On Unix-like systems, entering switches stdin to cbreak mode (restored
    on exit) and check_input() polls with select(). On Windows ('nt') the
    terminal is left untouched and msvcrt is used instead.
    """

    def __init__(self):
        self.old_settings = None

    def __enter__(self):
        if os.name == 'nt':  # Windows
            return self
        self.old_settings = termios.tcgetattr(sys.stdin)
        tty.setcbreak(sys.stdin.fileno())
        return self

    def __exit__(self, type, value, traceback):
        # Restore only what __enter__ actually saved; a stray or repeated
        # __exit__ would otherwise pass None to tcsetattr.
        if os.name != 'nt' and self.old_settings is not None:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings)
            self.old_settings = None

    def check_input(self, timeout=0.1):
        """Return one pending character, or None if nothing arrives within *timeout* seconds.

        On Windows the check is instantaneous and *timeout* is ignored.
        """
        if os.name == 'nt':  # Windows
            import msvcrt
            if msvcrt.kbhit():
                # Special keys arrive as a b'\xe0' prefix byte (and other bytes
                # may be non-UTF-8); replace instead of raising UnicodeDecodeError.
                return msvcrt.getch().decode('utf-8', errors='replace')
            return None

        # Unix-like
        ready_to_read, _, _ = select.select([sys.stdin], [], [], timeout)
        if ready_to_read:
            return sys.stdin.read(1)
        return None
|
||
|
|
||
|
class ResearchManager:
|
||
|
"""Manages the research process including analysis, search, and documentation"""
|
||
|
def __init__(self, llm_wrapper, parser, search_engine, max_searches_per_cycle: int = 5):
    """Store collaborators, initialise research state and install signal handlers.

    Args:
        llm_wrapper: LLM client; also handed to the StrategicAnalysisParser.
        parser: Caller-supplied parser, stored as ``self.parser``.
        search_engine: Object used for searching, page selection and scraping.
        max_searches_per_cycle: Stored as ``self.max_searches``.
    """
    # Collaborators
    self.llm = llm_wrapper
    self.parser = parser
    self.search_engine = search_engine
    self.max_searches = max_searches_per_cycle

    # Thread coordination
    self.should_terminate = threading.Event()
    self.shutdown_event = Event()
    self.research_started = threading.Event()
    self.research_thread = None
    self.thinking = False
    self.stop_words = set(
        "the be to of and a in that have i "
        "it for not on with he as you do at".split()
    )

    # Research progress
    self.searched_urls: Set[str] = set()
    self.current_focus: Optional[ResearchFocus] = None
    self.original_query: str = ""
    self.focus_areas: List[ResearchFocus] = []
    self.is_running = False

    # Conversation mode
    self.research_complete = False
    self.research_summary = ""
    self.conversation_active = False
    self.research_content = ""

    # Session document (chosen later by _initialize_document)
    self.document_path = None
    self.session_files = []

    # UI and strategic planner
    self.ui = TerminalUI()
    self.strategic_parser = StrategicAnalysisParser(llm=self.llm)

    # Pause / assessment flags
    self.research_paused = False
    self.awaiting_user_decision = False

    # Installed last so the handler only ever sees a fully initialised object.
    signal.signal(signal.SIGINT, self._signal_handler)
    signal.signal(signal.SIGTERM, self._signal_handler)
|
||
|
|
||
|
def _signal_handler(self, signum, frame):
|
||
|
"""Handle interrupt signals"""
|
||
|
self.shutdown_event.set()
|
||
|
self.should_terminate.set()
|
||
|
self._cleanup()
|
||
|
|
||
|
def print_thinking(self):
|
||
|
"""Display thinking indicator to user"""
|
||
|
self.ui.update_output("🧠 Thinking...")
|
||
|
|
||
|
@staticmethod
def get_initial_input() -> str:
    """Read the research query from standard input.

    Non-empty lines are collected until an empty line or EOF (Ctrl+D), then
    joined with single spaces and stripped. Ctrl+C prints a message and
    exits the process with status 0.
    """
    print(f"{Fore.GREEN}📝 Enter your message (Press CTRL+D to submit):{Style.RESET_ALL}")
    collected = []
    try:
        while True:
            entry = input()
            if not entry:  # Empty line (just Enter pressed) ends input
                break
            collected.append(entry)
    except EOFError:  # Ctrl+D pressed
        pass
    except KeyboardInterrupt:  # Ctrl+C pressed
        print("\nOperation cancelled")
        sys.exit(0)

    return " ".join(collected).strip()
|
||
|
|
||
|
def formulate_search_queries(self, focus_area: ResearchFocus) -> List[str]:
    """Ask the LLM for one search query for *focus_area*.

    Returns a one-element list containing the LLM's query. Falls back to
    ``[focus_area.area]`` when the parsed query is empty or any exception
    occurs (the error is logged).
    """
    try:
        self.print_thinking()

        prompt = f"""
In order to research this query/topic:

Context: {self.original_query}

Base a search query to investigate the following research focus, which is related to the original query/topic:

Area: {focus_area.area}

Create a search query that will yield specific, search results thare are directly relevant to your focus area.
Format your response EXACTLY like this:

Search query: [Your 2-5 word query]
Time range: [d/w/m/y/none]

Do not provide any additional information or explanation, note that the time range allows you to see results within a time range (d is within the last day, w is within the last week, m is within the last month, y is within the last year, and none is results from anytime, only select one, using only the corresponding letter for whichever of these options you select as indicated in the response format) use your judgement as many searches will not require a time range and some may depending on what the research focus is.
"""
        response_text = self.llm.generate(prompt, max_tokens=50, stop=None)
        query, time_range = self.parse_query_response(response_text)

        if query:
            self.ui.update_output(f"{Fore.YELLOW}Original focus: {focus_area.area}{Style.RESET_ALL}")
            self.ui.update_output(f"{Fore.YELLOW}Formulated query: {query}{Style.RESET_ALL}")
            self.ui.update_output(f"{Fore.YELLOW}Time range: {time_range}{Style.RESET_ALL}")
            return [query]

        self.ui.update_output(f"{Fore.RED}Error: Empty search query. Using focus area as query...{Style.RESET_ALL}")
        return [focus_area.area]

    except Exception as e:
        logger.error(f"Error formulating query: {str(e)}")
        return [focus_area.area]
|
||
|
|
||
|
def parse_search_query(self, query_response: str) -> Dict[str, str]:
|
||
|
"""Parse search query formulation response with improved time range detection"""
|
||
|
try:
|
||
|
lines = query_response.strip().split('\n')
|
||
|
result = {
|
||
|
'query': '',
|
||
|
'time_range': 'none'
|
||
|
}
|
||
|
|
||
|
# First try to find standard format
|
||
|
for line in lines:
|
||
|
if ':' in line:
|
||
|
key, value = line.split(':', 1)
|
||
|
key = key.strip().lower()
|
||
|
value = value.strip()
|
||
|
|
||
|
if 'query' in key:
|
||
|
result['query'] = self._clean_query(value)
|
||
|
elif ('time' in key or 'range' in key) and value.strip().lower() in ['d', 'w', 'm', 'y', 'none']:
|
||
|
result['time_range'] = value.strip().lower()
|
||
|
|
||
|
# If no time range found, look for individual characters
|
||
|
if result['time_range'] == 'none':
|
||
|
# Get all text except the query itself
|
||
|
full_text = query_response.lower()
|
||
|
if result['query']:
|
||
|
full_text = full_text.replace(result['query'].lower(), '')
|
||
|
|
||
|
# Look for isolated d, w, m, or y characters
|
||
|
time_chars = set()
|
||
|
for char in ['d', 'w', 'm', 'y']:
|
||
|
# Check if char exists by itself (not part of another word)
|
||
|
matches = re.finditer(r'\b' + char + r'\b', full_text)
|
||
|
for match in matches:
|
||
|
# Verify it's not part of a word
|
||
|
start, end = match.span()
|
||
|
if (start == 0 or not full_text[start-1].isalpha()) and \
|
||
|
(end == len(full_text) or not full_text[end].isalpha()):
|
||
|
time_chars.add(char)
|
||
|
|
||
|
# If exactly one time char found, use it
|
||
|
if len(time_chars) == 1:
|
||
|
result['time_range'] = time_chars.pop()
|
||
|
|
||
|
return result
|
||
|
except Exception as e:
|
||
|
logger.error(f"Error parsing search query: {str(e)}")
|
||
|
return {'query': '', 'time_range': 'none'}
|
||
|
|
||
|
def _cleanup(self):
|
||
|
"""Enhanced cleanup to handle conversation mode"""
|
||
|
self.conversation_active = False
|
||
|
self.should_terminate.set()
|
||
|
|
||
|
if self.research_thread and self.research_thread.is_alive():
|
||
|
try:
|
||
|
self.research_thread.join(timeout=1.0)
|
||
|
if self.research_thread.is_alive():
|
||
|
import ctypes
|
||
|
ctypes.pythonapi.PyThreadState_SetAsyncExc(
|
||
|
ctypes.c_long(self.research_thread.ident),
|
||
|
ctypes.py_object(SystemExit)
|
||
|
)
|
||
|
except Exception as e:
|
||
|
logger.error(f"Error terminating research thread: {str(e)}")
|
||
|
|
||
|
if hasattr(self.llm, 'cleanup'):
|
||
|
try:
|
||
|
self.llm.cleanup()
|
||
|
except Exception as e:
|
||
|
logger.error(f"Error cleaning up LLM: {str(e)}")
|
||
|
|
||
|
if hasattr(self.ui, 'cleanup'):
|
||
|
self.ui.cleanup()
|
||
|
|
||
|
def _initialize_document(self):
|
||
|
"""Initialize research session document"""
|
||
|
try:
|
||
|
# Get all existing research session files
|
||
|
self.session_files = []
|
||
|
for file in os.listdir():
|
||
|
if file.startswith("research_session_") and file.endswith(".txt"):
|
||
|
try:
|
||
|
num = int(file.split("_")[2].split(".")[0])
|
||
|
self.session_files.append(num)
|
||
|
except ValueError:
|
||
|
continue
|
||
|
|
||
|
# Determine next session number
|
||
|
next_session = 1 if not self.session_files else max(self.session_files) + 1
|
||
|
self.document_path = f"research_session_{next_session}.txt"
|
||
|
|
||
|
# Initialize the new document
|
||
|
with open(self.document_path, 'w', encoding='utf-8') as f:
|
||
|
f.write(f"Research Session {next_session}\n")
|
||
|
f.write(f"Topic: {self.original_query}\n")
|
||
|
f.write(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||
|
f.write("="*80 + "\n\n")
|
||
|
f.flush()
|
||
|
|
||
|
except Exception as e:
|
||
|
logger.error(f"Error initializing document: {str(e)}")
|
||
|
self.document_path = "research_findings.txt"
|
||
|
with open(self.document_path, 'w', encoding='utf-8') as f:
|
||
|
f.write("Research Findings:\n\n")
|
||
|
f.flush()
|
||
|
|
||
|
def add_to_document(self, content: str, source_url: str, focus_area: str):
|
||
|
"""Add research findings to current session document"""
|
||
|
try:
|
||
|
with open(self.document_path, 'a', encoding='utf-8') as f:
|
||
|
if source_url not in self.searched_urls:
|
||
|
f.write(f"\n{'='*80}\n")
|
||
|
f.write(f"Research Focus: {focus_area}\n")
|
||
|
f.write(f"Source: {source_url}\n")
|
||
|
f.write(f"Content:\n{content}\n")
|
||
|
f.write(f"{'='*80}\n")
|
||
|
f.flush()
|
||
|
self.searched_urls.add(source_url)
|
||
|
self.ui.update_output(f"Added content from: {source_url}")
|
||
|
except Exception as e:
|
||
|
logger.error(f"Error adding to document: {str(e)}")
|
||
|
self.ui.update_output(f"Error saving content: {str(e)}")
|
||
|
|
||
|
def _process_search_results(self, results: Dict[str, str], focus_area: str):
|
||
|
"""Process and store search results"""
|
||
|
if not results:
|
||
|
return
|
||
|
|
||
|
for url, content in results.items():
|
||
|
if url not in self.searched_urls:
|
||
|
self.add_to_document(content, url, focus_area)
|
||
|
|
||
|
def _research_loop(self):
|
||
|
"""Main research loop with comprehensive functionality"""
|
||
|
self.is_running = True
|
||
|
try:
|
||
|
self.research_started.set()
|
||
|
|
||
|
while not self.should_terminate.is_set() and not self.shutdown_event.is_set():
|
||
|
# Check if research is paused
|
||
|
if self.research_paused:
|
||
|
time.sleep(1)
|
||
|
continue
|
||
|
|
||
|
self.ui.update_output("\nAnalyzing research progress...")
|
||
|
|
||
|
# Generate focus areas
|
||
|
self.ui.update_output("\nGenerating research focus areas...")
|
||
|
analysis_result = self.strategic_parser.strategic_analysis(self.original_query)
|
||
|
|
||
|
if not analysis_result:
|
||
|
self.ui.update_output("\nFailed to generate analysis result. Retrying...")
|
||
|
continue
|
||
|
|
||
|
focus_areas = analysis_result.focus_areas
|
||
|
if not focus_areas:
|
||
|
self.ui.update_output("\nNo valid focus areas generated. Retrying...")
|
||
|
continue
|
||
|
|
||
|
self.ui.update_output(f"\nGenerated {len(focus_areas)} research areas:")
|
||
|
for i, focus in enumerate(focus_areas, 1):
|
||
|
self.ui.update_output(f"\nArea {i}: {focus.area}")
|
||
|
self.ui.update_output(f"Priority: {focus.priority}")
|
||
|
|
||
|
# Process each focus area in priority order
|
||
|
for focus_area in focus_areas:
|
||
|
if self.should_terminate.is_set():
|
||
|
break
|
||
|
|
||
|
# Check if research is paused
|
||
|
while self.research_paused and not self.should_terminate.is_set():
|
||
|
time.sleep(1)
|
||
|
|
||
|
if self.should_terminate.is_set():
|
||
|
break
|
||
|
|
||
|
self.current_focus = focus_area
|
||
|
self.ui.update_output(f"\nInvestigating: {focus_area.area}")
|
||
|
|
||
|
queries = self.formulate_search_queries(focus_area)
|
||
|
if not queries:
|
||
|
continue
|
||
|
|
||
|
for query in queries:
|
||
|
if self.should_terminate.is_set():
|
||
|
break
|
||
|
|
||
|
# Check if research is paused
|
||
|
while self.research_paused and not self.should_terminate.is_set():
|
||
|
time.sleep(1)
|
||
|
|
||
|
if self.should_terminate.is_set():
|
||
|
break
|
||
|
|
||
|
try:
|
||
|
self.ui.update_output(f"\nSearching: {query}")
|
||
|
results = self.search_engine.perform_search(query, time_range='none')
|
||
|
|
||
|
if results:
|
||
|
# self.search_engine.display_search_results(results)
|
||
|
selected_urls = self.search_engine.select_relevant_pages(results, query)
|
||
|
|
||
|
if selected_urls:
|
||
|
self.ui.update_output("\n⚙️ Scraping selected pages...")
|
||
|
scraped_content = self.search_engine.scrape_content(selected_urls)
|
||
|
if scraped_content:
|
||
|
for url, content in scraped_content.items():
|
||
|
if url not in self.searched_urls:
|
||
|
self.add_to_document(content, url, focus_area.area)
|
||
|
|
||
|
except Exception as e:
|
||
|
logger.error(f"Error in search: {str(e)}")
|
||
|
self.ui.update_output(f"Error during search: {str(e)}")
|
||
|
|
||
|
if self.check_document_size():
|
||
|
self.ui.update_output("\nDocument size limit reached. Finalizing research.")
|
||
|
return
|
||
|
|
||
|
# After processing all areas, cycle back to generate new ones
|
||
|
self.ui.update_output("\nAll current focus areas investigated. Generating new areas...")
|
||
|
|
||
|
except Exception as e:
|
||
|
logger.error(f"Error in research loop: {str(e)}")
|
||
|
self.ui.update_output(f"Error in research process: {str(e)}")
|
||
|
finally:
|
||
|
self.is_running = False
|
||
|
|
||
|
def start_research(self, topic: str):
    """Start a research session on *topic* and run the interactive command loop.

    Sets up the UI, creates a fresh session document, resets the per-session
    state, starts ``_research_loop`` in a daemon thread and then reads commands
    via ``ui.get_input`` until ``should_terminate`` is set.  A ``None`` command or
    a set ``shutdown_event`` ends the loop (generating the summary first if
    termination was requested and no summary exists yet).  ``_cleanup()`` always
    runs on exit; exceptions are logged, not propagated.

    Args:
        topic: The user's research question.
    """
    try:
        self.ui.setup()
        self.original_query = topic
        self._initialize_document()

        self.ui.update_output(f"Starting research on: {topic}")
        self.ui.update_output(f"Session document: {self.document_path}")
        self.ui.update_output("\nCommands available during research:")
        self.ui.update_output("'s' = Show status")
        self.ui.update_output("'f' = Show current focus")
        self.ui.update_output("'p' = Pause and assess the research progress")
        self.ui.update_output("'q' = Quit research\n")

        # Reset per-session state; the same manager is reused for every topic.
        self.should_terminate.clear()
        self.research_started.clear()
        self.research_paused = False  # Ensure research is not paused at the start
        self.awaiting_user_decision = False
        # A previous session leaves research_complete=True; without this reset the
        # shutdown branch below would skip generating this session's summary.
        self.research_complete = False

        # Start research thread
        self.research_thread = threading.Thread(target=self._research_loop, daemon=True)
        self.research_thread.start()

        # Wait for research to actually start
        if not self.research_started.wait(timeout=10):
            self.ui.update_output("Error: Research failed to start within timeout period")
            self.should_terminate.set()
            return

        while not self.should_terminate.is_set():
            cmd = self.ui.get_input("Enter command: ")
            if cmd is None or self.shutdown_event.is_set():
                if self.should_terminate.is_set() and not self.research_complete:
                    self.ui.update_output("\nGenerating research summary... please wait...")
                    summary = self.terminate_research()
                    self.ui.update_output("\nFinal Research Summary:")
                    self.ui.update_output(summary)
                break
            if cmd:
                self._handle_command(cmd)

    except Exception as e:
        # logger.exception keeps the ERROR level and adds the traceback.
        logger.exception(f"Error in research process: {str(e)}")
    finally:
        self._cleanup()
|
||
|
|
||
|
def check_document_size(self) -> bool:
    """Report whether the session document is close to the model's context window.

    Tokens are approximated as 1.3 per whitespace-separated word and compared with
    ``self.llm.llm_config['n_ctx']`` (2048 when the key is absent).  Above 80% a
    warning is logged and shown in the UI.  The method returns True only above 90%.
    Any error (unreadable document, bad config) is logged and reported as True so
    the caller stops adding content.
    """
    try:
        with open(self.document_path, 'r', encoding='utf-8') as handle:
            word_count = len(handle.read().split())
        context_limit = self.llm.llm_config.get('n_ctx', 2048)
        ratio = (word_count * 1.3) / context_limit

        if ratio > 0.8:
            percent = f"{ratio*100:.1f}%"
            logger.warning(f"Document size at {percent} of context limit")
            self.ui.update_output(f"Warning: Document size at {percent} of context limit")

        return ratio > 0.9
    except Exception as e:
        logger.error(f"Error checking document size: {str(e)}")
        return True
|
||
|
|
||
|
def _handle_command(self, cmd: str):
|
||
|
"""Handle user commands during research"""
|
||
|
if cmd.lower() == 's':
|
||
|
self.ui.update_output(self.get_progress())
|
||
|
elif cmd.lower() == 'f':
|
||
|
if self.current_focus:
|
||
|
self.ui.update_output("\nCurrent Focus:")
|
||
|
self.ui.update_output(f"Area: {self.current_focus.area}")
|
||
|
self.ui.update_output(f"Priority: {self.current_focus.priority}")
|
||
|
else:
|
||
|
self.ui.update_output("\nNo current focus area")
|
||
|
elif cmd.lower() == 'p':
|
||
|
self.pause_and_assess()
|
||
|
elif cmd.lower() == 'q':
|
||
|
self.ui.update_output("\nInitiating research termination...")
|
||
|
self.should_terminate.set()
|
||
|
self.ui.update_output("\nGenerating research summary... please wait...")
|
||
|
summary = self.terminate_research()
|
||
|
self.ui.update_output("\nFinal Research Summary:")
|
||
|
self.ui.update_output(summary)
|
||
|
|
||
|
def pause_and_assess(self):
    """Pause the research loop and ask the LLM whether enough has been collected.

    Sets ``research_paused`` so the research thread idles, runs
    ``show_progress_indicator`` in a daemon thread while the LLM assesses the
    session document, prints the verdict, then reads commands until the user
    types 'c' (resume) or 'q' (set ``should_terminate`` and print the summary from
    ``terminate_research()``).  If there is no data, or on any error, research is
    resumed.  The spinner is always stopped and ``awaiting_user_decision``
    cleared on exit.
    """
    indicator_thread = None

    def stop_indicator():
        # Signal the spinner to exit.  The join is bounded so a spinner that
        # missed the flag can never hang the command loop.
        self.summary_ready = True
        if indicator_thread is not None:
            indicator_thread.join(timeout=1.0)

    try:
        # Pause the research thread
        self.ui.update_output("\nPausing research for assessment...")
        self.research_paused = True

        # Arm the stop flag *before* the spinner thread starts.
        self.summary_ready = False
        indicator_thread = threading.Thread(
            target=self.show_progress_indicator,
            args=("Assessing the researched information...",),
            daemon=True,
        )
        indicator_thread.start()

        # Read the current research content
        if not os.path.exists(self.document_path):
            stop_indicator()
            self.ui.update_output("No research data found to assess.")
            self.research_paused = False
            return

        with open(self.document_path, 'r', encoding='utf-8') as f:
            content = f.read().strip()

        if not content:
            stop_indicator()
            self.ui.update_output("No research data was collected to assess.")
            self.research_paused = False
            return

        assessment_prompt = f"""
Based on the following research content, please assess whether the original query "{self.original_query}" can be answered sufficiently with the collected information.

Research Content:
{content}

Instructions:
1. If the research content provides enough information to answer the original query in detail, respond with: "The research is sufficient to answer the query."
2. If not, respond with: "The research is insufficient and it would be advisable to continue gathering information."
3. Do not provide any additional information or details.

Assessment:
"""

        assessment = self.llm.generate(assessment_prompt, max_tokens=200)
        stop_indicator()

        self.ui.update_output("\nAssessment Result:")
        self.ui.update_output(assessment.strip())

        # Provide user with options to continue or quit
        self.ui.update_output("\nEnter 'c' to continue the research or 'q' to terminate and generate the summary.")
        self.awaiting_user_decision = True

        while self.awaiting_user_decision:
            cmd = self.ui.get_input("Enter command ('c' to continue, 'q' to quit): ")
            if cmd is None:
                # start_research() treats None plus a shutdown as "stop".  Mirror
                # that here instead of spinning on get_input() forever.
                if self.shutdown_event.is_set() or self.should_terminate.is_set():
                    break
                continue  # Ignore invalid inputs
            cmd = cmd.strip().lower()
            if cmd == 'c':
                self.ui.update_output("\nResuming research...")
                self.research_paused = False
                self.awaiting_user_decision = False
            elif cmd == 'q':
                self.ui.update_output("\nTerminating research and generating summary...")
                self.awaiting_user_decision = False
                self.should_terminate.set()
                summary = self.terminate_research()
                self.ui.update_output("\nFinal Research Summary:")
                self.ui.update_output(summary)
                break
            else:
                self.ui.update_output("Invalid command. Please enter 'c' to continue or 'q' to quit.")

    except Exception as e:
        logger.error(f"Error during pause and assess: {str(e)}")
        self.ui.update_output(f"Error during assessment: {str(e)}")
        self.research_paused = False
    finally:
        # Ensure the indicator thread exits and no stale "waiting" state remains.
        stop_indicator()
        self.awaiting_user_decision = False
|
||
|
|
||
|
def get_progress(self) -> str:
    """Build the multi-line status report shown by the 's' command.

    The report lists the original query, the number of searched URLs, whether
    the research is running, and the current focus area ('Initializing' when
    no focus has been chosen yet).  It starts and ends with a newline.
    """
    status = 'Active' if self.is_running else 'Stopped'
    if self.current_focus:
        focus_label = self.current_focus.area
    else:
        focus_label = 'Initializing'

    report_lines = [
        "",
        "Research Progress:",
        f"- Original Query: {self.original_query}",
        f"- Sources analyzed: {len(self.searched_urls)}",
        f"- Status: {status}",
        f"- Current focus: {focus_label}",
        "",
    ]
    return "\n".join(report_lines)
|
||
|
|
||
|
def is_active(self) -> bool:
    """Return True while research is flagged as running in a live worker thread.

    The result is always a real ``bool``.  The bare ``and`` chain used to leak
    ``None`` when no thread had been created yet.
    """
    return bool(self.is_running and self.research_thread and self.research_thread.is_alive())
|
||
|
|
||
|
def terminate_research(self) -> str:
    """Generate the final research summary, append it to the session document and return it.

    While the LLM summarises the session document against ``self.original_query``,
    ``show_progress_indicator`` runs in a daemon thread.  On success the document
    text is stored in ``self.research_content`` (used by conversation mode), the
    raw summary in ``self.research_summary``, ``self.research_complete`` is set,
    and a formatted summary block is appended to the document.  The research UI
    is cleaned up on every exit path.

    The method may be called more than once for the same session (for example,
    the 'q' command and then the script's main loop).  If the document is
    unchanged since the last summary was written, the cached formatted summary
    is returned.  This avoids a second LLM call and a duplicate summary in the
    document.

    Returns:
        The formatted summary, a "no data" message, or an
        "Error generating summary: ..." message if anything fails.
    """
    indicator_thread = None

    def stop_indicator():
        # Tell the spinner thread to exit and give it a moment to clear its line.
        self.summary_ready = True
        if indicator_thread is not None:
            indicator_thread.join(timeout=1.0)

    try:
        cached = getattr(self, '_summary_cache', None)
        if cached is not None and os.path.exists(self.document_path):
            current_key = (self.document_path, self.original_query,
                           os.path.getsize(self.document_path))
            if cached[0] == current_key:
                return cached[1]

        print("Initiating research termination...")
        sys.stdout.flush()

        # Arm the stop flag *before* the spinner starts so a fast early return
        # below cannot be missed by the thread.
        self.summary_ready = False
        indicator_thread = threading.Thread(target=self.show_progress_indicator)
        indicator_thread.daemon = True
        indicator_thread.start()

        if not os.path.exists(self.document_path):
            stop_indicator()
            self._cleanup()
            return "No research data found to summarize."

        with open(self.document_path, 'r', encoding='utf-8') as f:
            content = f.read().strip()
        self.research_content = content  # Store for conversation mode

        # `content` is already stripped, so a header-only document reads as
        # "Research Findings:" and never as "Research Findings:\n\n".
        # NOTE(review): assumes _initialize_document writes that header; confirm.
        if content in ("", "Research Findings:"):
            stop_indicator()
            self._cleanup()
            return "No research data was collected to summarize."

        summary_prompt = f"""
Analyze the following content to provide a comprehensive research summary and a response to the user's original query "{self.original_query}" ensuring that you conclusively answer the query in detail:

Research Content:
{content}

Important Instructions:
> Summarize the research findings that are relevant to the Original topic/question: "{self.original_query}"
> Ensure that in your summary you directly answer the original question/topic conclusively to the best of your ability in detail.
> Read the original topic/question again "{self.original_query}" and abide by any additional instructions that it contains, exactly as instructed in your summary otherwise provide it normally should it not have any specific instructions

Summary:
"""

        summary = self.llm.generate(summary_prompt, max_tokens=4000)

        # Signal that summary is complete to stop the progress indicator
        stop_indicator()

        # Store summary and mark research as complete
        self.research_summary = summary
        self.research_complete = True

        formatted_summary = f"""
{'='*80}
RESEARCH SUMMARY
{'='*80}

Original Query: {self.original_query}
Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

{summary}

{'='*80}
End of Summary
{'='*80}
"""

        with open(self.document_path, 'a', encoding='utf-8') as f:
            f.write("\n\n" + formatted_summary)

        # Remember exactly what was written so a repeat call can reuse it while
        # the document stays unchanged.
        self._summary_cache = (
            (self.document_path, self.original_query, os.path.getsize(self.document_path)),
            formatted_summary,
        )
        return formatted_summary

    except Exception as e:
        error_msg = f"Error generating summary: {str(e)}"
        logger.error(error_msg)
        return error_msg

    finally:
        # Whatever happened above, never leave the spinner running, and clean
        # up the research UI exactly once per call.
        stop_indicator()
        self._cleanup_research_ui()
|
||
|
|
||
|
def show_progress_indicator(self, message="Generating summary, please wait..."):
    """Animate a spinner after *message* until ``self.summary_ready`` becomes truthy.

    Intended to run in a background thread.  Callers must set
    ``self.summary_ready = False`` *before* starting that thread and set it to
    True to stop it.  The flag is deliberately not reset here: doing so raced
    with callers that had already signalled completion, leaving the spinner
    running forever and any un-timed ``join()`` hung.

    When it stops, the spinner line is blanked and the cursor returned to column 0.
    """
    symbols = ['|', '/', '-', '\\']
    idx = 0
    # getattr default: treat a never-initialised flag as "not ready yet".
    while not getattr(self, 'summary_ready', False):
        sys.stdout.write(f"\r{message} {symbols[idx]}")
        sys.stdout.flush()
        idx = (idx + 1) % len(symbols)
        time.sleep(0.2)  # Adjust the speed of the rotation if needed
    sys.stdout.write("\r" + " " * (len(message) + 2) + "\r")  # Clear the line when done
    sys.stdout.flush()
|
||
|
|
||
|
def _cleanup_research_ui(self):
|
||
|
"""Clean up just the research UI components"""
|
||
|
if hasattr(self, 'ui') and self.ui:
|
||
|
self.ui.cleanup()
|
||
|
|
||
|
def show_thinking_indicator(self, message: str, stop_flag_name: str):
    """Animate a spinner after *message* while attribute *stop_flag_name* is truthy.

    The attribute is looked up by name on every tick.  Another thread stops the
    animation by setting that attribute on ``self`` to a falsy value.  The line
    is then blanked and the cursor returned to column 0.
    """
    spinner = "|/-\\"
    tick = 0
    while getattr(self, stop_flag_name):
        sys.stdout.write(f"\r{message} {spinner[tick % len(spinner)]}")
        sys.stdout.flush()
        tick += 1
        time.sleep(0.2)
    # Overwrite the spinner text with blanks, then return to column 0.
    sys.stdout.write("\r" + " " * (len(message) + 2) + "\r")
|
||
|
|
||
|
def start_conversation_mode(self):
    """Run an interactive question/answer loop over the collected research.

    Each question is read with ``get_multiline_conversation_input()`` (submitted
    with CTRL+D).  An empty submission or 'quit' / 'exit' / 'q' (any case) ends
    the loop.  While ``_generate_conversation_response()`` runs, a daemon thread
    animates ``show_thinking_indicator`` driven by the ``thinking`` attribute.
    KeyboardInterrupt prints a hint and keeps the loop alive.  Other errors are
    logged, reported in red, and the loop continues.
    """
    self.conversation_active = True
    self.thinking = False

    # Header with usage instructions
    rule = "=" * 80
    print("\n" + rule)
    print(Fore.CYAN + "Research Conversation Mode" + Style.RESET_ALL)
    print(rule)
    print(Fore.YELLOW + "\nInstructions:")
    for hint in (
        "- Type your question and press CTRL+D to submit",
        "- Type 'quit' and press CTRL+D to exit",
        "- Your messages appear in green",
    ):
        print(hint)
    print("- AI responses appear in cyan" + Style.RESET_ALL + "\n")

    exit_words = ('quit', 'exit', 'q')

    while self.conversation_active:
        try:
            print(Fore.GREEN + "Your question (Press CTRL+D to submit):" + Style.RESET_ALL)
            question = self.get_multiline_conversation_input()

            if not question or question.lower() in exit_words:
                print(Fore.YELLOW + "\nExiting conversation mode..." + Style.RESET_ALL)
                self.conversation_active = False
                break

            # Whitespace-only input is skipped rather than sent to the LLM.
            if not question.strip():
                continue

            # Echo the submitted question for clarity
            print(Fore.GREEN + "Submitted question:" + Style.RESET_ALL)
            print(Fore.GREEN + question + Style.RESET_ALL + "\n")

            # The flag must be set before the spinner thread starts looking at it.
            self.thinking = True
            spinner = threading.Thread(
                target=self.show_thinking_indicator,
                args=("Thinking...", "thinking"),
            )
            spinner.daemon = True
            spinner.start()

            try:
                answer = self._generate_conversation_response(question)
            except Exception:
                self.thinking = False  # Ensure thinking indicator stops
                spinner.join()
                raise
            self.thinking = False
            spinner.join()

            print(Fore.CYAN + "AI Response:" + Style.RESET_ALL)
            print(f"{Fore.CYAN}{answer}{Style.RESET_ALL}\n")
            print("-" * 80 + "\n")  # Separator between QA pairs

        except KeyboardInterrupt:
            self.thinking = False  # Ensure thinking indicator stops
            print(Fore.YELLOW + "\nOperation cancelled. Submit 'quit' to exit." + Style.RESET_ALL)
        except Exception as e:
            logger.error(f"Error in conversation mode: {str(e)}")
            print(Fore.RED + f"Error processing question: {str(e)}" + Style.RESET_ALL)
|
||
|
|
||
|
def _generate_conversation_response(self, user_query: str) -> str:
    """Answer *user_query* from the stored research content and summary.

    If neither ``research_content`` nor ``research_summary`` is set, the session
    document is re-read (best effort, and failures are only logged).  The prompt
    asks the LLM to answer strictly from the research for research questions and
    freely but honestly otherwise.  An empty LLM reply is replaced by an apology.
    Errors are logged and returned as an apology message instead of being raised.
    """
    try:
        # Debug logging to verify what context is available
        logger.info(f"Research summary length: {len(self.research_summary) if self.research_summary else 0}")
        logger.info(f"Research content length: {len(self.research_content) if self.research_content else 0}")

        have_nothing = not (self.research_content or self.research_summary)
        if have_nothing:
            # Try to reload from file if available
            try:
                if os.path.exists(self.document_path):
                    with open(self.document_path, 'r', encoding='utf-8') as doc:
                        self.research_content = doc.read().strip()
            except Exception as e:
                logger.error(f"Failed to reload research content: {str(e)}")

        summary_text = self.research_summary if self.research_summary else 'No summary available'
        context = f"""
Research Content:
{self.research_content}

Research Summary:
{summary_text}
"""

        prompt = f"""
Based on the following research content and summary, please answer this question:

{context}

Question: {user_query}

you have 2 sets of instructions the applied set and the unapplied set, the applied set should be followed if the question is directly relating to the research content whereas anything else other then direct questions about the content of the research will result in you instead following the unapplied ruleset

Applied:

Instructions:
1. Answer based ONLY on the research content provided above if asked a question about your research or that content.
2. If the information requested isn't in the research, clearly state that it isn't in the content you gathered.
3. Be direct and specific in your response, DO NOT directly cite research unless specifically asked to, be concise and give direct answers to questions based on the research, unless instructed otherwise.

Unapplied:

Instructions:

1. Do not make up anything that isn't actually true.
2. Respond directly to the user's question in an honest and thoughtful manner.
3. disregard rules in the applied set for queries not DIRECTLY related to the research, including queries about the research process or what you remember about the research should result in the unapplied ruleset being used.

Answer:
"""

        reply = self.llm.generate(prompt, max_tokens=1000, temperature=0.7)
        cleaned = reply.strip() if reply else ""
        if not cleaned:
            return "I apologize, but I cannot find relevant information in the research content to answer your question."
        return cleaned

    except Exception as e:
        logger.error(f"Error generating response: {str(e)}")
        return f"I apologize, but I encountered an error processing your question: {str(e)}"
|
||
|
|
||
|
def get_multiline_conversation_input(self) -> str:
    """Read a multi-line question from the terminal; CTRL+D submits it.

    On a TTY the terminal is switched to raw mode with ``tty.setraw`` and
    characters are echoed manually.  Enter starts a new line, Backspace or
    CTRL+H deletes, and CTRL+C returns 'quit'.  The original settings are
    restored in ``finally``.  When stdin is not a TTY (piped input, some IDE
    consoles) termios cannot be used, so all remaining input is read instead.
    Without this fallback every call would raise and the conversation loop
    would never end.

    Returns:
        The entered lines joined with single spaces and stripped, 'quit' for
        CTRL+C, or '' for an empty submission or end of input.
    """
    if not sys.stdin.isatty():
        return ' '.join(sys.stdin.read().splitlines()).strip()

    buffer = []

    # Save original terminal settings
    fd = sys.stdin.fileno()
    old_settings = termios.tcgetattr(fd)

    try:
        tty.setraw(fd)
        current_line = []
        while True:
            char = sys.stdin.read(1)
            code = ord(char) if char else None

            # Raw mode disables output post-processing (OPOST), so a bare '\n'
            # would not return the cursor to column 0. Always echo '\r\n'.
            if code is None or code == 4:  # EOF or CTRL+D
                sys.stdout.write('\r\n')
                if current_line:
                    buffer.append(''.join(current_line))
                return ' '.join(buffer).strip()

            elif code in (10, 13):  # Enter (CR) or a pasted LF
                sys.stdout.write('\r\n')
                buffer.append(''.join(current_line))
                current_line = []

            elif code in (8, 127):  # CTRL+H / Backspace
                if current_line:
                    current_line.pop()
                    sys.stdout.write('\b \b')

            elif code == 3:  # CTRL+C
                sys.stdout.write('\r\n')
                return 'quit'

            elif char.isprintable():  # ASCII and non-ASCII printable text
                current_line.append(char)
                sys.stdout.write(char)

            sys.stdout.flush()

    finally:
        # Restore terminal settings
        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
        print()  # New line for clean display
|
||
|
|
||
|
if __name__ == "__main__":
    # Project-local collaborators are imported only when run as a script.
    from llm_wrapper import LLMWrapper
    from llm_response_parser import UltimateLLMResponseParser
    from Self_Improving_Search import EnhancedSelfImprovingSearch

    try:
        print(f"{Fore.CYAN}Initializing Research System...{Style.RESET_ALL}")
        llm = LLMWrapper()
        parser = UltimateLLMResponseParser()
        search_engine = EnhancedSelfImprovingSearch(llm, parser)
        manager = ResearchManager(llm, parser, search_engine)

        print(f"{Fore.GREEN}System initialized. Enter your research topic or 'quit' to exit.{Style.RESET_ALL}")
        while True:
            try:
                topic = ResearchManager.get_initial_input()
                if topic.lower() == 'quit':
                    break

                if not topic:
                    continue

                if not topic.startswith('@'):
                    print(f"{Fore.YELLOW}Please start your research query with '@'{Style.RESET_ALL}")
                    continue

                # Remove the '@' prefix and any whitespace after it ("@ topic").
                topic = topic[1:].strip()
                if not topic:
                    # A bare "@" would otherwise start a research run on an empty query.
                    print(f"{Fore.YELLOW}Please enter a research topic after '@'{Style.RESET_ALL}")
                    continue

                manager.start_research(topic)
                # Print the summary on the plain terminal once start_research()
                # has returned (its finally block runs _cleanup()).
                summary = manager.terminate_research()
                print(f"\n{Fore.GREEN}Research Summary:{Style.RESET_ALL}")
                print(summary)
                print(f"\n{Fore.GREEN}Research completed. Ready for next topic.{Style.RESET_ALL}\n")

            except KeyboardInterrupt:
                print(f"\n{Fore.YELLOW}Operation cancelled. Ready for next topic.{Style.RESET_ALL}")
                if 'manager' in locals():
                    manager.terminate_research()
                continue

    except KeyboardInterrupt:
        print(f"\n{Fore.YELLOW}Research system shutting down.{Style.RESET_ALL}")
        if 'manager' in locals():
            manager.terminate_research()
    except Exception as e:
        print(f"{Fore.RED}Critical error: {str(e)}{Style.RESET_ALL}")
        logger.error("Critical error in main loop", exc_info=True)

    # NOTE(review): termios/tty are imported at module top and are Unix-only,
    # so on Windows the module import likely fails before this line; confirm.
    if os.name == 'nt':
        print(f"{Fore.YELLOW}Running on Windows - Some features may be limited{Style.RESET_ALL}")
|