mirror of
https://github.com/agent0ai/agent-zero.git
synced 2026-05-08 09:59:53 +00:00
Automated bundler for preinstalled packages. Bundling python executable and dependencies, imports project files respecting .gitignore.
114 lines
No EOL
4.9 KiB
Python
114 lines
No EOL
4.9 KiB
Python
import asyncio
|
|
import sys
|
|
import threading, time, models, os
|
|
from ansio import application_keypad, mouse_input, raw_input
|
|
from ansio.input import InputEvent, get_input_event
|
|
from agent import AgentContext
|
|
from python.helpers.print_style import PrintStyle
|
|
from python.helpers.files import read_file
|
|
from python.helpers import files
|
|
import python.helpers.timed_input as timed_input
|
|
from initialize import initialize
|
|
from python.helpers.dotenv import load_dotenv
|
|
|
|
|
|
context: AgentContext = None # type: ignore
|
|
input_lock = threading.Lock()
|
|
|
|
|
|
# Main conversation loop
|
|
async def chat(context: AgentContext):
|
|
|
|
# start the conversation loop
|
|
while True:
|
|
# ask user for message
|
|
with input_lock:
|
|
timeout = context.agent0.get_data("timeout") # how long the agent is willing to wait
|
|
if not timeout: # if agent wants to wait for user input forever
|
|
PrintStyle(background_color="#6C3483", font_color="white", bold=True, padding=True).print(f"User message ('e' to leave):")
|
|
if sys.platform != "win32": import readline # this fixes arrow keys in terminal
|
|
user_input = input("> ")
|
|
PrintStyle(font_color="white", padding=False, log_only=True).print(f"> {user_input}")
|
|
|
|
else: # otherwise wait for user input with a timeout
|
|
PrintStyle(background_color="#6C3483", font_color="white", bold=True, padding=True).print(f"User message ({timeout}s timeout, 'w' to wait, 'e' to leave):")
|
|
if sys.platform != "win32": import readline # this fixes arrow keys in terminal
|
|
# user_input = timed_input("> ", timeout=timeout)
|
|
user_input = timeout_input("> ", timeout=timeout)
|
|
|
|
if not user_input:
|
|
user_input = context.agent0.read_prompt("fw.msg_timeout.md")
|
|
PrintStyle(font_color="white", padding=False).stream(f"{user_input}")
|
|
else:
|
|
user_input = user_input.strip()
|
|
if user_input.lower()=="w": # the user needs more time
|
|
user_input = input("> ").strip()
|
|
PrintStyle(font_color="white", padding=False, log_only=True).print(f"> {user_input}")
|
|
|
|
|
|
|
|
# exit the conversation when the user types 'exit'
|
|
if user_input.lower() == 'e': break
|
|
|
|
# send message to agent0,
|
|
assistant_response = await context.communicate(user_input).result()
|
|
|
|
# print agent0 response
|
|
PrintStyle(font_color="white",background_color="#1D8348", bold=True, padding=True).print(f"{context.agent0.agent_name}: reponse:")
|
|
PrintStyle(font_color="white").print(f"{assistant_response}")
|
|
|
|
|
|
# User intervention during agent streaming
|
|
def intervention():
|
|
if context.streaming_agent and not context.paused:
|
|
context.paused = True # stop agent streaming
|
|
PrintStyle(background_color="#6C3483", font_color="white", bold=True, padding=True).print(f"User intervention ('e' to leave, empty to continue):")
|
|
|
|
if sys.platform != "win32": import readline # this fixes arrow keys in terminal
|
|
user_input = input("> ").strip()
|
|
PrintStyle(font_color="white", padding=False, log_only=True).print(f"> {user_input}")
|
|
|
|
if user_input.lower() == 'e': os._exit(0) # exit the conversation when the user types 'exit'
|
|
if user_input: context.streaming_agent.intervention_message = user_input # set intervention message if non-empty
|
|
context.paused = False # continue agent streaming
|
|
|
|
|
|
# Capture keyboard input to trigger user intervention
|
|
def capture_keys():
|
|
global input_lock
|
|
intervent=False
|
|
while True:
|
|
if intervent: intervention()
|
|
intervent = False
|
|
time.sleep(0.1)
|
|
|
|
if context.streaming_agent:
|
|
# with raw_input, application_keypad, mouse_input:
|
|
with input_lock, raw_input, application_keypad:
|
|
event: InputEvent | None = get_input_event(timeout=0.1)
|
|
if event and (event.shortcut.isalpha() or event.shortcut.isspace()):
|
|
intervent=True
|
|
continue
|
|
|
|
# User input with timeout
|
|
def timeout_input(prompt, timeout=10):
|
|
return timed_input.timeout_input(prompt=prompt, timeout=timeout)
|
|
|
|
def run():
|
|
print("Initializing framework...")
|
|
|
|
#load env vars
|
|
load_dotenv()
|
|
|
|
# initialize context
|
|
config = initialize()
|
|
context = AgentContext(config)
|
|
|
|
# Start the key capture thread for user intervention during agent streaming
|
|
threading.Thread(target=capture_keys, daemon=True).start()
|
|
|
|
#start the chat
|
|
asyncio.run(chat(context))
|
|
|
|
if __name__ == "__main__":
|
|
run() |