Update Web-LLM.py for windows

This commit is contained in:
Hafeez 2024-11-20 22:30:22 +05:30 committed by GitHub
parent 1029885bd0
commit a70e37841d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -17,10 +17,12 @@ if os.name != 'nt':
print("This version is Windows-specific. Please use the Unix version for other operating systems.") print("This version is Windows-specific. Please use the Unix version for other operating systems.")
sys.exit(1) sys.exit(1)
init() # Initialize colorama
# Set up logging # Set up logging
log_directory = 'logs' log_directory = 'logs'
if not os.path.exists(log_directory): if not os.path.exists(log_directory):
os.makedirs(log_directory) os.makedirs(log_directory)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
@ -34,23 +36,23 @@ logger.propagate = False
# Disable other loggers # Disable other loggers
for name in logging.root.manager.loggerDict: for name in logging.root.manager.loggerDict:
if name != __name__: if name != __name__:
logging.getLogger(name).disabled = True logging.getLogger(name).disabled = True
class OutputRedirector: class OutputRedirector:
def __init__(self, stream=None): def __init__(self, stream=None):
self.stream = stream or StringIO() self.stream = stream or StringIO()
self.original_stdout = sys.stdout self.original_stdout = sys.stdout
self.original_stderr = sys.stderr self.original_stderr = sys.stderr
def __enter__(self): def __enter__(self):
sys.stdout = self.stream sys.stdout = self.stream
sys.stderr = self.stream sys.stderr = self.stream
return self.stream return self.stream
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
sys.stdout = self.original_stdout sys.stdout = self.original_stdout
sys.stderr = self.original_stderr sys.stderr = self.original_stderr
def print_header(): def print_header():
print(Fore.CYAN + Style.BRIGHT + """ print(Fore.CYAN + Style.BRIGHT + """
@ -72,220 +74,241 @@ def print_header():
""" + Style.RESET_ALL) """ + Style.RESET_ALL)
def get_multiline_input() -> str: def get_multiline_input() -> str:
"""Windows-compatible multiline input handler""" """Windows-compatible multiline input handler with improved reliability"""
print(f"{Fore.GREEN}📝 Enter your message (Press CTRL+Z to submit):{Style.RESET_ALL}") print(f"{Fore.GREEN}📝 Enter your message (Press CTRL+Z to submit):{Style.RESET_ALL}")
lines = [] lines = []
current_line = [] current_line = ""
try: try:
while True: while True:
if msvcrt.kbhit(): if msvcrt.kbhit():
char = msvcrt.getch() char = msvcrt.getch()
# Convert bytes to string for comparison
char_code = ord(char)
# CTRL+Z detection (Windows EOF)
if char_code == 26: # ASCII code for CTRL+Z
print() # New line
if current_line:
lines.append(current_line)
return ' '.join(lines).strip() or "q"
# Enter key
elif char in [b'\r', b'\n']:
print() # New line
lines.append(current_line)
current_line = ""
# Backspace
elif char_code == 8: # ASCII code for backspace
if current_line:
current_line = current_line[:-1]
print('\b \b', end='', flush=True)
# Regular character input
elif 32 <= char_code <= 126: # Printable ASCII range
try:
char_str = char.decode('utf-8')
current_line += char_str
print(char_str, end='', flush=True)
except UnicodeDecodeError:
continue
# CTRL+Z detection (Windows equivalent of CTRL+D) time.sleep(0.01) # Prevent high CPU usage
if char in [b'\x1a', b'\x04']: # CTRL+Z or CTRL+D
sys.stdout.write('\n')
if current_line:
lines.append(''.join(current_line))
return ' '.join(lines).strip()
# Handle special characters except KeyboardInterrupt:
elif char == b'\r': # Enter print("\nInput interrupted")
sys.stdout.write('\n') return "q"
lines.append(''.join(current_line)) except Exception as e:
current_line = [] logger.error(f"Input error: {str(e)}")
return "q"
elif char == b'\x08': # Backspace
if current_line:
current_line.pop()
sys.stdout.write('\b \b')
elif char == b'\x03': # CTRL+C
sys.stdout.write('\n')
return 'q'
# Normal character
elif 32 <= ord(char[0]) <= 126:
current_line.append(char.decode('utf-8'))
sys.stdout.write(char.decode('utf-8'))
sys.stdout.flush()
except Exception as e:
logger.error(f"Error in multiline input: {str(e)}")
return 'q'
def initialize_system(): def initialize_system():
"""Initialize system with proper error checking""" """Initialize system with enhanced error checking and recovery"""
try: try:
print(Fore.YELLOW + "Initializing system..." + Style.RESET_ALL) print(Fore.YELLOW + "Initializing system..." + Style.RESET_ALL)
llm_config = get_llm_config() # Load configuration
if llm_config['llm_type'] == 'ollama': llm_config = get_llm_config()
import requests
try: # Validate Ollama connection
response = requests.get(llm_config['base_url'], timeout=5) if llm_config['llm_type'] == 'ollama':
if response.status_code != 200: import requests
raise ConnectionError("Cannot connect to Ollama server") max_retries = 3
except requests.exceptions.RequestException: retry_delay = 2
raise ConnectionError(
"\nCannot connect to Ollama server!" for attempt in range(max_retries):
"\nPlease ensure:" try:
"\n1. Ollama is installed" response = requests.get(llm_config['base_url'], timeout=5)
"\n2. Ollama server is running (try 'ollama serve')" if response.status_code == 200:
"\n3. The model specified in llm_config.py is pulled" break
) elif attempt < max_retries - 1:
elif llm_config['llm_type'] == 'llama_cpp': print(f"{Fore.YELLOW}Retrying Ollama connection ({attempt + 1}/{max_retries})...{Style.RESET_ALL}")
model_path = llm_config.get('model_path') time.sleep(retry_delay)
if not model_path or not os.path.exists(model_path): else:
raise FileNotFoundError( raise ConnectionError("Cannot connect to Ollama server")
f"\nLLama.cpp model not found at: {model_path}" except requests.exceptions.RequestException as e:
"\nPlease ensure model path in llm_config.py is correct" if attempt == max_retries - 1:
) raise ConnectionError(
"\nCannot connect to Ollama server!"
"\nPlease ensure:"
"\n1. Ollama is installed"
"\n2. Ollama server is running (try 'ollama serve')"
"\n3. The model specified in llm_config.py is pulled"
)
time.sleep(retry_delay)
with OutputRedirector() as output: # Initialize components with output redirection
llm_wrapper = LLMWrapper() with OutputRedirector() as output:
try: llm_wrapper = LLMWrapper()
test_response = llm_wrapper.generate("Test", max_tokens=10) parser = UltimateLLMResponseParser()
if not test_response: search_engine = EnhancedSelfImprovingSearch(llm_wrapper, parser)
raise ConnectionError("LLM failed to generate response") research_manager = ResearchManager(llm_wrapper, parser, search_engine)
except Exception as e:
raise ConnectionError(f"LLM test failed: {str(e)}")
parser = UltimateLLMResponseParser() # Validate LLM
search_engine = EnhancedSelfImprovingSearch(llm_wrapper, parser) test_response = llm_wrapper.generate("Test", max_tokens=10)
research_manager = ResearchManager(llm_wrapper, parser, search_engine) if not test_response:
raise ConnectionError("LLM failed to generate response")
print(Fore.GREEN + "System initialized successfully." + Style.RESET_ALL) print(Fore.GREEN + "System initialized successfully." + Style.RESET_ALL)
return llm_wrapper, parser, search_engine, research_manager return llm_wrapper, parser, search_engine, research_manager
except Exception as e:
logger.error(f"Error initializing system: {str(e)}", exc_info=True) except Exception as e:
print(Fore.RED + f"System initialization failed: {str(e)}" + Style.RESET_ALL) logger.error(f"Error initializing system: {str(e)}", exc_info=True)
return None, None, None, None print(Fore.RED + f"System initialization failed: {str(e)}" + Style.RESET_ALL)
return None, None, None, None
def handle_search_mode(search_engine, query):
"""Handles web search operations"""
print(f"{Fore.CYAN}Initiating web search...{Style.RESET_ALL}")
try:
# Change search() to search_and_improve() which is the correct method name
results = search_engine.search_and_improve(query)
print(f"\n{Fore.GREEN}Search Results:{Style.RESET_ALL}")
print(results)
except Exception as e:
logger.error(f"Search error: {str(e)}")
print(f"{Fore.RED}Search failed: {str(e)}{Style.RESET_ALL}")
def handle_research_mode(research_manager, query): def handle_research_mode(research_manager, query):
"""Handles research mode operations""" """Handles research mode operations"""
print(f"{Fore.CYAN}Initiating research mode...{Style.RESET_ALL}") print(f"{Fore.CYAN}Initiating research mode...{Style.RESET_ALL}")
try: try:
# Start the research # Start the research
research_manager.start_research(query) research_manager.start_research(query)
submit_key = "CTRL+Z" if os.name == 'nt' else "CTRL+D" submit_key = "CTRL+Z" if os.name == 'nt' else "CTRL+D"
print(f"\n{Fore.YELLOW}Research Running. Available Commands:{Style.RESET_ALL}") print(f"\n{Fore.YELLOW}Research Running. Available Commands:{Style.RESET_ALL}")
print(f"Type command and press {submit_key}:") print(f"Type command and press {submit_key}:")
print("'s' = Show status") print("'s' = Show status")
print("'f' = Show focus") print("'f' = Show focus")
print("'q' = Quit research") print("'q' = Quit research")
while research_manager.is_active(): while research_manager.is_active():
try: try:
command = get_multiline_input().strip().lower() command = get_multiline_input().strip().lower()
if command == 's': if command == 's':
print("\n" + research_manager.get_progress()) print("\n" + research_manager.get_progress())
elif command == 'f': elif command == 'f':
if research_manager.current_focus: if research_manager.current_focus:
print(f"\n{Fore.CYAN}Current Focus:{Style.RESET_ALL}") print(f"\n{Fore.CYAN}Current Focus:{Style.RESET_ALL}")
print(f"Area: {research_manager.current_focus.area}") print(f"Area: {research_manager.current_focus.area}")
print(f"Priority: {research_manager.current_focus.priority}") print(f"Priority: {research_manager.current_focus.priority}")
print(f"Reasoning: {research_manager.current_focus.reasoning}") print(f"Reasoning: {research_manager.current_focus.reasoning}")
else: else:
print(f"\n{Fore.YELLOW}No current focus area{Style.RESET_ALL}") print(f"\n{Fore.YELLOW}No current focus area{Style.RESET_ALL}")
elif command == 'q': elif command == 'q':
break break
except KeyboardInterrupt: except KeyboardInterrupt:
break break
# Get final summary first # Get final summary first
summary = research_manager.terminate_research() summary = research_manager.terminate_research()
# Ensure research UI is fully cleaned up # Ensure research UI is fully cleaned up
research_manager._cleanup_research_ui() research_manager._cleanup_research_ui()
# Now in main terminal, show summary # Now in main terminal, show summary
print(f"\n{Fore.GREEN}Research Summary:{Style.RESET_ALL}") print(f"\n{Fore.GREEN}Research Summary:{Style.RESET_ALL}")
print(summary) print(summary)
# Only NOW start conversation mode if we have a valid summary # Only NOW start conversation mode if we have a valid summary
if research_manager.research_complete and research_manager.research_summary: if hasattr(research_manager, 'research_complete') and \
time.sleep(0.5) # Small delay to ensure clean transition hasattr(research_manager, 'research_summary') and \
research_manager.start_conversation_mode() research_manager.research_complete and \
research_manager.research_summary:
time.sleep(0.5) # Small delay to ensure clean transition
research_manager.start_conversation_mode()
return return
except KeyboardInterrupt: except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Research interrupted.{Style.RESET_ALL}") print(f"\n{Fore.YELLOW}Research interrupted.{Style.RESET_ALL}")
research_manager.terminate_research() research_manager.terminate_research()
except Exception as e: except Exception as e:
print(f"\n{Fore.RED}Research error: {str(e)}{Style.RESET_ALL}") logger.error(f"Research error: {str(e)}")
research_manager.terminate_research() print(f"\n{Fore.RED}Research error: {str(e)}{Style.RESET_ALL}")
research_manager.terminate_research()
def main(): def main():
print_header() init() # Initialize colorama
try: print_header()
llm, parser, search_engine, research_manager = initialize_system()
if not all([llm, parser, search_engine, research_manager]): try:
return components = initialize_system()
if not all(components):
sys.exit(1)
llm, parser, search_engine, research_manager = components
while True: while True:
try: try:
# Get input with improved CTRL+D handling user_input = get_multiline_input()
user_input = get_multiline_input()
# Skip empty inputs
if not user_input:
continue
# Handle exit commands
if user_input.lower() in ["@quit", "quit", "q"]:
break
# Handle immediate CTRL+D (empty input) # Handle help command
if user_input == "": if user_input.lower() == 'help':
user_input = "@quit" # Convert empty CTRL+D to quit command print_header()
continue
user_input = user_input.strip() # Process commands
if user_input.startswith('/'):
handle_search_mode(search_engine, user_input[1:].strip())
elif user_input.startswith('@'):
handle_research_mode(research_manager, user_input[1:].strip())
else:
print(f"{Fore.YELLOW}Please start with '/' for search or '@' for research.{Style.RESET_ALL}")
# Check for special quit markers except KeyboardInterrupt:
if user_input in ["@quit", "quit", "q"]: print(f"\n{Fore.YELLOW}Use 'q' to quit or continue with new input.{Style.RESET_ALL}")
print(Fore.YELLOW + "\nGoodbye!" + Style.RESET_ALL) continue
break except Exception as e:
logger.error(f"Error processing input: {str(e)}")
print(f"{Fore.RED}Error: {str(e)}{Style.RESET_ALL}")
continue
if not user_input: except KeyboardInterrupt:
continue print(f"\n{Fore.YELLOW}Program terminated by user.{Style.RESET_ALL}")
except Exception as e:
if user_input.lower() == 'help': logger.critical(f"Critical error: {str(e)}")
print_header() print(f"{Fore.RED}Critical error: {str(e)}{Style.RESET_ALL}")
continue finally:
try:
if user_input.startswith('/'): if 'research_manager' in locals() and research_manager:
search_query = user_input[1:].strip() research_manager.cleanup()
handle_search_mode(search_engine, search_query) except Exception as e:
logger.error(f"Cleanup error: {str(e)}")
elif user_input.startswith('@'): print(Fore.YELLOW + "\nGoodbye!" + Style.RESET_ALL)
research_query = user_input[1:].strip() sys.exit(0)
handle_research_mode(research_manager, research_query)
else:
print(f"{Fore.RED}Please start with '/' for search or '@' for research.{Style.RESET_ALL}")
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Exiting program...{Style.RESET_ALL}")
break
except Exception as e:
logger.error(f"Error in main loop: {str(e)}")
print(f"{Fore.RED}An error occurred: {str(e)}{Style.RESET_ALL}")
continue
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Program terminated by user.{Style.RESET_ALL}")
except Exception as e:
logger.critical(f"Critical error: {str(e)}")
print(f"{Fore.RED}Critical error: {str(e)}{Style.RESET_ALL}")
finally:
# Ensure proper cleanup on exit
try:
if 'research_manager' in locals() and research_manager:
if hasattr(research_manager, 'ui'):
research_manager.ui.cleanup()
except:
pass
os._exit(0)
if __name__ == "__main__": if __name__ == "__main__":
main() main()