Update research_manager.py for windows

Replaced termios-based input handling with msvcrt
Modified TerminalUI class
Added Windows-specific NonBlockingInput class
Updated get_multiline_conversation_input method
Removed Unix-specific imports
This commit is contained in:
Hafeez 2024-11-20 21:46:46 +05:30 committed by GitHub
parent aa73019d70
commit c53dddc457
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,3 +1,4 @@
import msvcrt
import os
import sys
import threading
@ -14,17 +15,15 @@ from datetime import datetime
from io import StringIO
from colorama import init, Fore, Style
import select
import termios
import tty
from threading import Event
from urllib.parse import urlparse
from pathlib import Path
# Initialize colorama for cross-platform color support
if os.name == 'nt': # Windows-specific initialization
init(convert=True, strip=False, wrap=True)
if os.name == 'nt':
import msvcrt
else:
init()
raise ImportError("This version is Windows-specific. Please use the Unix version for other operating systems.")
# Set up logging
log_directory = 'logs'
@ -479,7 +478,7 @@ class TerminalUI:
pass
def get_input(self, prompt: Optional[str] = None) -> Optional[str]:
"""Enhanced input handling with mouse scroll support"""
"""Windows-compatible input handling"""
try:
if prompt:
self.update_status(prompt)
@ -492,18 +491,10 @@ class TerminalUI:
if self.should_terminate.is_set():
return None
try:
ch = self.input_win.getch()
if ch == curses.KEY_MOUSE:
try:
mouse_event = curses.getmouse()
# Ignore mouse events entirely for now
continue
except curses.error:
continue
if ch == 4: # Ctrl+D
if msvcrt.kbhit():
ch = msvcrt.getch()
if ch == b'\x04': # Ctrl+D
result = self.input_buffer.strip()
self.input_buffer = ""
if not result:
@ -511,39 +502,32 @@ class TerminalUI:
return "@quit"
return result
elif ch == 3: # Ctrl+C
elif ch == b'\x03': # Ctrl+C
self.should_terminate.set()
self.cleanup()
return "@quit"
elif ch == ord('\n'): # Enter
elif ch == b'\r': # Enter
result = self.input_buffer.strip()
if result:
self.input_buffer = ""
return result
continue
elif ch == curses.KEY_BACKSPACE or ch == 127: # Backspace
elif ch == b'\x08': # Backspace
if self.input_buffer:
self.input_buffer = self.input_buffer[:-1]
self._refresh_input_prompt()
elif 32 <= ch <= 126: # Printable characters
self.input_buffer += chr(ch)
elif 32 <= ord(ch[0]) <= 126: # Printable characters
self.input_buffer += ch.decode('utf-8')
self._refresh_input_prompt()
except KeyboardInterrupt:
self.should_terminate.set()
self.cleanup()
return "@quit"
except curses.error:
self._refresh_input_prompt()
except Exception as e:
logger.error(f"Error in get_input: {str(e)}")
self.should_terminate.set()
self.cleanup()
return "@quit"
return "@quit"
def force_exit(self):
"""Force immediate exit with enhanced cleanup"""
@ -559,33 +543,21 @@ class TerminalUI:
os._exit(0) # Ensure exit
class NonBlockingInput:
"""Handles non-blocking keyboard input for Unix-like systems"""
def __init__(self):
self.old_settings = None
"""Windows-compatible non-blocking input"""
def __init__(self):
pass
def __enter__(self):
if os.name == 'nt': # Windows
return self
self.old_settings = termios.tcgetattr(sys.stdin)
tty.setcbreak(sys.stdin.fileno())
return self
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
if os.name != 'nt': # Unix-like
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings)
def __exit__(self, type, value, traceback):
pass
def check_input(self, timeout=0.1):
"""Check for input without blocking, cross-platform"""
if os.name == 'nt': # Windows
import msvcrt
if msvcrt.kbhit():
return msvcrt.getch().decode('utf-8')
return None
else: # Unix-like
ready_to_read, _, _ = select.select([sys.stdin], [], [], timeout)
if ready_to_read:
return sys.stdin.read(1)
return None
def check_input(self, timeout=0.1):
"""Check for input without blocking"""
if msvcrt.kbhit():
return msvcrt.getch().decode('utf-8')
return None
class ResearchManager:
"""Manages the research process including analysis, search, and documentation"""
@ -1380,56 +1352,49 @@ Answer:
logger.error(f"Error generating response: {str(e)}")
return f"I apologize, but I encountered an error processing your question: {str(e)}"
def get_multiline_conversation_input(self) -> str:
"""Get multiline input with CTRL+D handling for conversation mode"""
buffer = []
def get_multiline_conversation_input(self) -> str:
"""Windows-compatible multiline input"""
buffer = []
current_line = []
# Save original terminal settings
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
while True:
if msvcrt.kbhit():
char = msvcrt.getch()
try:
# Set terminal to raw mode
tty.setraw(fd)
# CTRL+D or CTRL+Z detection
if char in [b'\x04', b'\x1a']:
sys.stdout.write('\n')
if current_line:
buffer.append(''.join(current_line))
return ' '.join(buffer).strip()
current_line = []
while True:
char = sys.stdin.read(1)
# Handle special characters
elif char == b'\r': # Enter
sys.stdout.write('\n')
buffer.append(''.join(current_line))
current_line = []
# CTRL+D detection
if not char or ord(char) == 4: # EOF or CTRL+D
sys.stdout.write('\n')
if current_line:
buffer.append(''.join(current_line))
return ' '.join(buffer).strip()
elif char == b'\x08': # Backspace
if current_line:
current_line.pop()
sys.stdout.write('\b \b')
# Handle special characters
elif ord(char) == 13: # Enter
sys.stdout.write('\n')
buffer.append(''.join(current_line))
current_line = []
elif char == b'\x03': # CTRL+C
sys.stdout.write('\n')
return 'quit'
elif ord(char) == 127: # Backspace
if current_line:
current_line.pop()
sys.stdout.write('\b \b')
# Normal character
elif 32 <= ord(char[0]) <= 126:
current_line.append(char.decode('utf-8'))
sys.stdout.write(char.decode('utf-8'))
elif ord(char) == 3: # CTRL+C
sys.stdout.write('\n')
return 'quit'
# Normal character
elif 32 <= ord(char) <= 126: # Printable characters
current_line.append(char)
sys.stdout.write(char)
sys.stdout.flush()
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
print() # New line for clean display
sys.stdout.flush()
except Exception as e:
logger.error(f"Error in multiline input: {str(e)}")
return 'quit'
if __name__ == "__main__":
from llm_wrapper import LLMWrapper
from llm_response_parser import UltimateLLMResponseParser