From c53dddc45728337b9b189de01d180a376efe6f8f Mon Sep 17 00:00:00 2001 From: Hafeez <90968109+hafeezhmha@users.noreply.github.com> Date: Wed, 20 Nov 2024 21:46:46 +0530 Subject: [PATCH] Update research_manager.py for windows Replaced termios-based input handling with msvcrt Modified TerminalUI class Added Windows-specific NonBlockingInput class Updated get_multiline_conversation_input method Removed Unix-specific imports --- research_manager.py | 159 +++++++++++++++++--------------------------- 1 file changed, 62 insertions(+), 97 deletions(-) diff --git a/research_manager.py b/research_manager.py index e95f873..dd77ffb 100644 --- a/research_manager.py +++ b/research_manager.py @@ -1,3 +1,4 @@ +import msvcrt import os import sys import threading @@ -14,17 +15,15 @@ from datetime import datetime from io import StringIO from colorama import init, Fore, Style import select -import termios -import tty from threading import Event from urllib.parse import urlparse from pathlib import Path # Initialize colorama for cross-platform color support -if os.name == 'nt': # Windows-specific initialization - init(convert=True, strip=False, wrap=True) +if os.name == 'nt': + import msvcrt else: - init() + raise ImportError("This version is Windows-specific. Please use the Unix version for other operating systems.") # Set up logging log_directory = 'logs' @@ -479,7 +478,7 @@ class TerminalUI: pass def get_input(self, prompt: Optional[str] = None) -> Optional[str]: - """Enhanced input handling with mouse scroll support""" + """Windows-compatible input handling""" try: if prompt: self.update_status(prompt) @@ -492,18 +491,10 @@ class TerminalUI: if self.should_terminate.is_set(): return None - try: - ch = self.input_win.getch() - - if ch == curses.KEY_MOUSE: - try: - mouse_event = curses.getmouse() - # Ignore mouse events entirely for now - continue - except curses.error: - continue - - if ch == 4: # Ctrl+D + if msvcrt.kbhit(): + ch = msvcrt.getch() + + if ch == b'\x04': # Ctrl+D result = self.input_buffer.strip() self.input_buffer = "" if not result: @@ -511,39 +502,32 @@ class TerminalUI: return "@quit" return result - elif ch == 3: # Ctrl+C + elif ch == b'\x03': # Ctrl+C self.should_terminate.set() self.cleanup() return "@quit" - elif ch == ord('\n'): # Enter + elif ch == b'\r': # Enter result = self.input_buffer.strip() if result: self.input_buffer = "" return result continue - elif ch == curses.KEY_BACKSPACE or ch == 127: # Backspace + elif ch == b'\x08': # Backspace if self.input_buffer: self.input_buffer = self.input_buffer[:-1] self._refresh_input_prompt() - elif 32 <= ch <= 126: # Printable characters - self.input_buffer += chr(ch) + elif 32 <= ord(ch[0]) <= 126: # Printable characters + self.input_buffer += ch.decode('utf-8') self._refresh_input_prompt() - except KeyboardInterrupt: - self.should_terminate.set() - self.cleanup() - return "@quit" - except curses.error: - self._refresh_input_prompt() - except Exception as e: logger.error(f"Error in get_input: {str(e)}") self.should_terminate.set() self.cleanup() - return "@quit" + return "@quit" def force_exit(self): """Force immediate exit with enhanced cleanup""" @@ -559,33 +543,21 @@ class TerminalUI: os._exit(0) # Ensure exit class NonBlockingInput: - """Handles non-blocking keyboard input for Unix-like systems""" - def __init__(self): - self.old_settings = None + """Windows-compatible non-blocking input""" + def __init__(self): + pass - def __enter__(self): - if os.name == 'nt': # Windows - return self - self.old_settings = termios.tcgetattr(sys.stdin) - tty.setcbreak(sys.stdin.fileno()) - return self + def __enter__(self): + return self - def __exit__(self, type, value, traceback): - if os.name != 'nt': # Unix-like - termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings) + def __exit__(self, type, value, traceback): + pass - def check_input(self, timeout=0.1): - """Check for input without blocking, cross-platform""" - if os.name == 'nt': # Windows - import msvcrt - if msvcrt.kbhit(): - return msvcrt.getch().decode('utf-8') - return None - else: # Unix-like - ready_to_read, _, _ = select.select([sys.stdin], [], [], timeout) - if ready_to_read: - return sys.stdin.read(1) - return None + def check_input(self, timeout=0.1): + """Check for input without blocking""" + if msvcrt.kbhit(): + return msvcrt.getch().decode('utf-8') + return None class ResearchManager: """Manages the research process including analysis, search, and documentation""" @@ -1380,56 +1352,49 @@ Answer: logger.error(f"Error generating response: {str(e)}") return f"I apologize, but I encountered an error processing your question: {str(e)}" - def get_multiline_conversation_input(self) -> str: - """Get multiline input with CTRL+D handling for conversation mode""" - buffer = [] +def get_multiline_conversation_input(self) -> str: + """Windows-compatible multiline input""" + buffer = [] + current_line = [] - # Save original terminal settings - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) + try: + while True: + if msvcrt.kbhit(): + char = msvcrt.getch() - try: - # Set terminal to raw mode - tty.setraw(fd) + # CTRL+D or CTRL+Z detection + if char in [b'\x04', b'\x1a']: + sys.stdout.write('\n') + if current_line: + buffer.append(''.join(current_line)) + return ' '.join(buffer).strip() - current_line = [] - while True: - char = sys.stdin.read(1) + # Handle special characters + elif char == b'\r': # Enter + sys.stdout.write('\n') + buffer.append(''.join(current_line)) + current_line = [] - # CTRL+D detection - if not char or ord(char) == 4: # EOF or CTRL+D - sys.stdout.write('\n') - if current_line: - buffer.append(''.join(current_line)) - return ' '.join(buffer).strip() + elif char == b'\x08': # Backspace + if current_line: + current_line.pop() + sys.stdout.write('\b \b') - # Handle special characters - elif ord(char) == 13: # Enter - sys.stdout.write('\n') - buffer.append(''.join(current_line)) - current_line = [] + elif char == b'\x03': # CTRL+C + sys.stdout.write('\n') + return 'quit' - elif ord(char) == 127: # Backspace - if current_line: - current_line.pop() - sys.stdout.write('\b \b') + # Normal character + elif 32 <= ord(char[0]) <= 126: + current_line.append(char.decode('utf-8')) + sys.stdout.write(char.decode('utf-8')) - elif ord(char) == 3: # CTRL+C - sys.stdout.write('\n') - return 'quit' - - # Normal character - elif 32 <= ord(char) <= 126: # Printable characters - current_line.append(char) - sys.stdout.write(char) - - sys.stdout.flush() - - finally: - # Restore terminal settings - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - print() # New line for clean display + sys.stdout.flush() + except Exception as e: + logger.error(f"Error in multiline input: {str(e)}") + return 'quit' + if __name__ == "__main__": from llm_wrapper import LLMWrapper from llm_response_parser import UltimateLLMResponseParser