mirror of
https://github.com/agent0ai/agent-zero.git
synced 2026-05-08 01:41:42 +00:00
Merge branch 'development' into filename_fix
This commit is contained in:
commit
eb50861689
37 changed files with 2610 additions and 2503 deletions
|
|
@ -1,11 +1,10 @@
|
|||
from agent import AgentContext, UserMessage
|
||||
from python.helpers.api import ApiHandler, Request, Response
|
||||
|
||||
from python.helpers import files, extension
|
||||
from python.helpers import files, extension, message_queue as mq
|
||||
import os
|
||||
from python.helpers.security import safe_filename
|
||||
from python.helpers.defer import DeferredTask
|
||||
from python.helpers.print_style import PrintStyle
|
||||
|
||||
|
||||
class Message(ApiHandler):
|
||||
|
|
@ -66,30 +65,7 @@ class Message(ApiHandler):
|
|||
# Store attachments in agent data
|
||||
# context.agent0.set_data("attachments", attachment_paths)
|
||||
|
||||
# Prepare attachment filenames for logging
|
||||
attachment_filenames = (
|
||||
[os.path.basename(path) for path in attachment_paths]
|
||||
if attachment_paths
|
||||
else []
|
||||
)
|
||||
|
||||
# Print to console and log
|
||||
PrintStyle(
|
||||
background_color="#6C3483", font_color="white", bold=True, padding=True
|
||||
).print(f"User message:")
|
||||
PrintStyle(font_color="white", padding=False).print(f"> {message}")
|
||||
if attachment_filenames:
|
||||
PrintStyle(font_color="white", padding=False).print("Attachments:")
|
||||
for filename in attachment_filenames:
|
||||
PrintStyle(font_color="white", padding=False).print(f"- {filename}")
|
||||
|
||||
# Log the message with message_id and attachments
|
||||
context.log.log(
|
||||
type="user",
|
||||
heading="",
|
||||
content=message,
|
||||
kvps={"attachments": attachment_filenames},
|
||||
id=message_id,
|
||||
)
|
||||
# Log to console and UI using helper function
|
||||
mq.log_user_message(context, message, attachment_paths, message_id)
|
||||
|
||||
return context.communicate(UserMessage(message, attachment_paths)), context
|
||||
|
|
|
|||
21
python/api/message_queue_add.py
Normal file
21
python/api/message_queue_add.py
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
from python.helpers.api import ApiHandler, Request, Response
|
||||
from python.helpers import message_queue as mq
|
||||
from agent import AgentContext
|
||||
|
||||
|
||||
class MessageQueueAdd(ApiHandler):
|
||||
"""Add a message to the queue."""
|
||||
|
||||
async def process(self, input: dict, request: Request) -> dict | Response:
|
||||
context = AgentContext.get(input.get("context", ""))
|
||||
if not context:
|
||||
return Response("Context not found", status=404)
|
||||
|
||||
text = input.get("text", "").strip()
|
||||
attachments = input.get("attachments", []) # filenames from /upload API
|
||||
|
||||
if not text and not attachments:
|
||||
return Response("Empty message", status=400)
|
||||
|
||||
item = mq.add(context, text, attachments)
|
||||
return {"ok": True, "item_id": item["id"], "queue_length": len(mq.get_queue(context))}
|
||||
16
python/api/message_queue_remove.py
Normal file
16
python/api/message_queue_remove.py
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
from python.helpers.api import ApiHandler, Request, Response
|
||||
from python.helpers import message_queue as mq
|
||||
from agent import AgentContext
|
||||
|
||||
|
||||
class MessageQueueRemove(ApiHandler):
|
||||
"""Remove message(s) from queue."""
|
||||
|
||||
async def process(self, input: dict, request: Request) -> dict | Response:
|
||||
context = AgentContext.get(input.get("context", ""))
|
||||
if not context:
|
||||
return Response("Context not found", status=404)
|
||||
|
||||
item_id = input.get("item_id") # None means clear all
|
||||
remaining = mq.remove(context, item_id)
|
||||
return {"ok": True, "remaining": remaining}
|
||||
30
python/api/message_queue_send.py
Normal file
30
python/api/message_queue_send.py
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
from python.helpers.api import ApiHandler, Request, Response
|
||||
from python.helpers import message_queue as mq
|
||||
from agent import AgentContext
|
||||
|
||||
|
||||
class MessageQueueSend(ApiHandler):
|
||||
"""Send queued message(s) immediately."""
|
||||
|
||||
async def process(self, input: dict, request: Request) -> dict | Response:
|
||||
context = AgentContext.get(input.get("context", ""))
|
||||
if not context:
|
||||
return Response("Context not found", status=404)
|
||||
|
||||
if not mq.has_queue(context):
|
||||
return {"ok": True, "message": "Queue empty"}
|
||||
|
||||
item_id = input.get("item_id")
|
||||
send_all = input.get("send_all", False)
|
||||
|
||||
if send_all:
|
||||
count = mq.send_all_aggregated(context)
|
||||
return {"ok": True, "sent_count": count}
|
||||
|
||||
# Send single item
|
||||
item = mq.pop_item(context, item_id) if item_id else mq.pop_first(context)
|
||||
if not item:
|
||||
return Response("Item not found", status=404)
|
||||
|
||||
mq.send_message(context, item)
|
||||
return {"ok": True, "sent_item_id": item["id"]}
|
||||
|
|
@ -122,4 +122,6 @@ class Poll(ApiHandler):
|
|||
"notifications": notifications,
|
||||
"notifications_guid": notification_manager.guid,
|
||||
"notifications_version": len(notification_manager.updates),
|
||||
"message_queue": context.output_data.get("message_queue", []) if context else [],
|
||||
"running": context.is_running() if context else False,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ class UploadFile(ApiHandler):
|
|||
filename = safe_filename(file.filename)
|
||||
if not filename:
|
||||
continue
|
||||
file.save(files.get_abs_path("tmp/upload", filename))
|
||||
file.save(files.get_abs_path("tmp/uploads", filename))
|
||||
saved_filenames.append(filename)
|
||||
|
||||
return {"filenames": saved_filenames} # Return saved filenames
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ class LogForStream(Extension):
|
|||
def build_heading(agent, text: str, icon: str = "network_intelligence"):
|
||||
# Include agent identifier for all agents (A0:, A1:, A2:, etc.)
|
||||
agent_prefix = f"{agent.agent_name}: "
|
||||
return f"icon://{icon} {agent_prefix}{text}"
|
||||
return f"{agent_prefix}{text}"
|
||||
|
||||
def build_default_heading(agent):
|
||||
return build_heading(agent, "Generating...")
|
||||
return build_heading(agent, "Calling LLM...")
|
||||
33
python/extensions/monologue_end/_95_process_queue.py
Normal file
33
python/extensions/monologue_end/_95_process_queue.py
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
import asyncio
|
||||
from python.helpers.extension import Extension
|
||||
from python.helpers import message_queue as mq
|
||||
from agent import LoopData
|
||||
|
||||
|
||||
class ProcessQueue(Extension):
|
||||
"""Process queued messages after monologue ends."""
|
||||
|
||||
async def execute(self, loop_data: LoopData = LoopData(), **kwargs):
|
||||
# Only process for agent0 (main agent)
|
||||
if self.agent.number != 0:
|
||||
return
|
||||
|
||||
context = self.agent.context
|
||||
|
||||
# Check if there are queued messages
|
||||
if mq.has_queue(context):
|
||||
# Schedule delayed task to send next queued message
|
||||
# This allows current monologue to fully complete first
|
||||
asyncio.create_task(self._delayed_send(context))
|
||||
|
||||
async def _delayed_send(self, context):
|
||||
"""Wait for task to complete, then send next queued message."""
|
||||
# Small delay to ensure monologue fully completes
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# Wait for current task to finish
|
||||
while context.task and context.task.is_alive():
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# Send next queued message
|
||||
mq.send_next(context)
|
||||
|
|
@ -13,7 +13,7 @@ class LogFromStream(Extension):
|
|||
|
||||
# thought length indicator
|
||||
pipes = "|" * math.ceil(math.sqrt(len(text)))
|
||||
heading = build_heading(self.agent, f"Reasoning.. {pipes}")
|
||||
heading = build_heading(self.agent, f"Reasoning... {pipes}")
|
||||
|
||||
# create log message and store it in loop data temporary params
|
||||
if "log_item_generating" not in loop_data.params_temporary:
|
||||
|
|
@ -21,9 +21,10 @@ class LogFromStream(Extension):
|
|||
self.agent.context.log.log(
|
||||
type="agent",
|
||||
heading=heading,
|
||||
step="Reasoning..."
|
||||
)
|
||||
)
|
||||
|
||||
# update log message
|
||||
log_item = loop_data.params_temporary["log_item_generating"]
|
||||
log_item.update(heading=heading, reasoning=text)
|
||||
log_item.update(heading=heading, reasoning=text, step="Reasoning...")
|
||||
|
|
|
|||
|
|
@ -24,7 +24,11 @@ class LogFromStream(Extension):
|
|||
elif "tool_name" in parsed:
|
||||
heading = build_heading(self.agent, f"Using {parsed['tool_name']}") # if the llm skipped headline
|
||||
elif "thoughts" in parsed:
|
||||
heading = build_default_heading(self.agent)
|
||||
# thought length indicator
|
||||
pipes = "|" * math.ceil(math.sqrt(len(text)))
|
||||
heading = build_heading(self.agent, f"Thinking... {pipes}")
|
||||
else:
|
||||
heading = build_heading(self.agent, "Receiving...")
|
||||
|
||||
# create log message and store it in loop data temporary params
|
||||
if "log_item_generating" not in loop_data.params_temporary:
|
||||
|
|
@ -42,7 +46,25 @@ class LogFromStream(Extension):
|
|||
kvps = {}
|
||||
if log_item.kvps is not None and "reasoning" in log_item.kvps:
|
||||
kvps["reasoning"] = log_item.kvps["reasoning"]
|
||||
|
||||
# step description for UI - using tool XY, writing Python code, etc.
|
||||
if parsed is not None and "tool_name" in parsed and parsed["tool_name"]:
|
||||
kvps["step"] = f"Using {parsed['tool_name']}..." # using tool XY
|
||||
if parsed["tool_name"]=="code_execution_tool":
|
||||
if "tool_args" in parsed and "runtime" in parsed["tool_args"]:
|
||||
pipes = ""
|
||||
if "code" in parsed["tool_args"]:
|
||||
pipes = "|" * math.ceil(math.sqrt(len(parsed["tool_args"]["code"])))
|
||||
kvps["step"] = f"Writing code... {pipes}"
|
||||
if parsed["tool_args"]["runtime"] == "python":
|
||||
kvps["step"] = f"Writing Python code... {pipes}"
|
||||
elif parsed["tool_args"]["runtime"] == "nodejs":
|
||||
kvps["step"] = f"Writing Node.js code... {pipes}"
|
||||
elif parsed["tool_args"]["runtime"] == "terminal":
|
||||
kvps["step"] = f"Writing terminal command... {pipes}"
|
||||
kvps.update(parsed)
|
||||
|
||||
|
||||
|
||||
# update the log item
|
||||
log_item.update(heading=heading, content=text, kvps=kvps)
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
from python.helpers import persist_chat, tokens
|
||||
from python.helpers.extension import Extension
|
||||
from agent import LoopData
|
||||
import asyncio
|
||||
from python.helpers.log import LogItem
|
||||
from python.helpers import log
|
||||
import math
|
||||
from python.extensions.before_main_llm_call._10_log_for_stream import build_heading, build_default_heading
|
||||
|
||||
|
||||
class LogFromStream(Extension):
|
||||
|
||||
async def execute(
|
||||
self,
|
||||
loop_data: LoopData = LoopData(),
|
||||
text: str = "",
|
||||
parsed: dict = {},
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
# get log item from loop data temporary params
|
||||
log_item = loop_data.params_temporary["log_item_generating"]
|
||||
if log_item is None:
|
||||
return
|
||||
|
||||
# remove step parameter when done
|
||||
if log_item.kvps is not None and "step" in log_item.kvps:
|
||||
del log_item.kvps["step"]
|
||||
|
||||
# update the log item
|
||||
log_item.update(kvps=log_item.kvps)
|
||||
|
|
@ -21,12 +21,14 @@ Type = Literal[
|
|||
"agent",
|
||||
"browser",
|
||||
"code_exe",
|
||||
"subagent",
|
||||
"error",
|
||||
"hint",
|
||||
"info",
|
||||
"progress",
|
||||
"response",
|
||||
"tool",
|
||||
"mcp",
|
||||
"input",
|
||||
"user",
|
||||
"util",
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ from datetime import timedelta
|
|||
import json
|
||||
from python.helpers import errors
|
||||
from python.helpers import settings
|
||||
from python.helpers.log import LogItem
|
||||
|
||||
import httpx
|
||||
|
||||
|
|
@ -100,6 +101,14 @@ def initialize_mcp(mcp_servers_config: str):
|
|||
class MCPTool(Tool):
|
||||
"""MCP Tool wrapper"""
|
||||
|
||||
def get_log_object(self) -> LogItem:
|
||||
return self.agent.context.log.log(
|
||||
type="mcp",
|
||||
heading=f"icon://extension {self.agent.agent_name}: Using MCP tool '{self.name}'",
|
||||
content="",
|
||||
kvps={"tool_name": self.name, **self.args},
|
||||
)
|
||||
|
||||
async def execute(self, **kwargs: Any):
|
||||
error = ""
|
||||
try:
|
||||
|
|
@ -1070,9 +1079,9 @@ class MCPClientRemote(MCPClientBase):
|
|||
server: MCPServerRemote = cast(MCPServerRemote, self.server)
|
||||
set = settings.get_settings()
|
||||
|
||||
# Use lower timeouts for faster failure detection
|
||||
init_timeout = min(server.init_timeout or set["mcp_client_init_timeout"], 5)
|
||||
tool_timeout = min(server.tool_timeout or set["mcp_client_tool_timeout"], 10)
|
||||
# Resolve timeout: check server config first, then settings, defaulting to 5s/10s
|
||||
init_timeout = server.init_timeout or set["mcp_client_init_timeout"] or 5
|
||||
tool_timeout = server.tool_timeout or set["mcp_client_tool_timeout"] or 10
|
||||
|
||||
client_factory = CustomHTTPClientFactory(verify=server.verify)
|
||||
# Check if this is a streaming HTTP type
|
||||
|
|
|
|||
182
python/helpers/message_queue.py
Normal file
182
python/helpers/message_queue.py
Normal file
|
|
@ -0,0 +1,182 @@
|
|||
import os
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from agent import AgentContext
|
||||
|
||||
from python.helpers.print_style import PrintStyle
|
||||
|
||||
QUEUE_KEY = "message_queue"
|
||||
QUEUE_SEQ_KEY = "message_queue_seq"
|
||||
UPLOAD_FOLDER = "/a0/tmp/uploads"
|
||||
|
||||
|
||||
def get_queue(context: "AgentContext") -> list:
|
||||
"""Get current queue from context.data."""
|
||||
return context.get_data(QUEUE_KEY) or []
|
||||
|
||||
|
||||
def _get_next_seq(context: "AgentContext") -> int:
|
||||
"""Get next sequence number."""
|
||||
seq = context.get_data(QUEUE_SEQ_KEY) or 0
|
||||
seq += 1
|
||||
context.set_data(QUEUE_SEQ_KEY, seq)
|
||||
return seq
|
||||
|
||||
|
||||
def _sync_output(context: "AgentContext"):
|
||||
"""Sync queue to output_data for frontend polling."""
|
||||
queue = get_queue(context)
|
||||
# Truncate text for frontend display
|
||||
truncated = []
|
||||
for item in queue:
|
||||
truncated.append({
|
||||
"id": item["id"],
|
||||
"seq": item.get("seq", 0),
|
||||
"text": item["text"][:100] + "..." if len(item["text"]) > 100 else item["text"],
|
||||
"attachments": [a.split("/")[-1] for a in item.get("attachments", [])],
|
||||
"attachment_count": len(item.get("attachments", [])),
|
||||
})
|
||||
context.set_output_data(QUEUE_KEY, truncated)
|
||||
|
||||
|
||||
def add(context: "AgentContext", text: str, attachments: list[str] | None = None) -> dict:
|
||||
"""Add message to queue. Attachments should be filenames, will be converted to full paths."""
|
||||
queue = get_queue(context)
|
||||
|
||||
# Convert filenames to full paths
|
||||
full_paths = []
|
||||
for att in (attachments or []):
|
||||
if att.startswith("/"):
|
||||
full_paths.append(att)
|
||||
else:
|
||||
full_paths.append(f"{UPLOAD_FOLDER}/{att}")
|
||||
|
||||
item = {
|
||||
"id": str(uuid.uuid4())[:8],
|
||||
"seq": _get_next_seq(context),
|
||||
"text": text,
|
||||
"attachments": full_paths,
|
||||
}
|
||||
queue.append(item)
|
||||
context.set_data(QUEUE_KEY, queue)
|
||||
_sync_output(context)
|
||||
return item
|
||||
|
||||
|
||||
def remove(context: "AgentContext", item_id: str | None = None) -> int:
|
||||
"""Remove item(s). If item_id is None, clears all. Returns remaining count."""
|
||||
if not item_id:
|
||||
context.set_data(QUEUE_KEY, [])
|
||||
context.set_output_data(QUEUE_KEY, [])
|
||||
return 0
|
||||
queue = [i for i in get_queue(context) if i["id"] != item_id]
|
||||
context.set_data(QUEUE_KEY, queue)
|
||||
_sync_output(context)
|
||||
return len(queue)
|
||||
|
||||
|
||||
def pop_first(context: "AgentContext") -> dict | None:
|
||||
"""Remove and return first item."""
|
||||
queue = get_queue(context)
|
||||
if not queue:
|
||||
return None
|
||||
item = queue.pop(0)
|
||||
context.set_data(QUEUE_KEY, queue)
|
||||
_sync_output(context)
|
||||
return item
|
||||
|
||||
|
||||
def pop_item(context: "AgentContext", item_id: str) -> dict | None:
|
||||
"""Remove and return specific item."""
|
||||
queue = get_queue(context)
|
||||
for i, item in enumerate(queue):
|
||||
if item["id"] == item_id:
|
||||
queue.pop(i)
|
||||
context.set_data(QUEUE_KEY, queue)
|
||||
_sync_output(context)
|
||||
return item
|
||||
return None
|
||||
|
||||
|
||||
def has_queue(context: "AgentContext") -> bool:
|
||||
"""Check if queue has items."""
|
||||
return len(get_queue(context)) > 0
|
||||
|
||||
|
||||
def log_user_message(
|
||||
context: "AgentContext",
|
||||
message: str,
|
||||
attachment_paths: list[str],
|
||||
message_id: str | None = None,
|
||||
source: str = "",
|
||||
):
|
||||
"""Log user message to console and UI. Used by message API and queue processing."""
|
||||
# Prepare attachment filenames for logging
|
||||
attachment_filenames = (
|
||||
[os.path.basename(path) for path in attachment_paths]
|
||||
if attachment_paths
|
||||
else []
|
||||
)
|
||||
|
||||
# Print to console
|
||||
label = f"User message{source}:"
|
||||
PrintStyle(
|
||||
background_color="#6C3483", font_color="white", bold=True, padding=True
|
||||
).print(label)
|
||||
PrintStyle(font_color="white", padding=False).print(f"> {message}")
|
||||
if attachment_filenames:
|
||||
PrintStyle(font_color="white", padding=False).print("Attachments:")
|
||||
for filename in attachment_filenames:
|
||||
PrintStyle(font_color="white", padding=False).print(f"- {filename}")
|
||||
|
||||
# Log to UI
|
||||
context.log.log(
|
||||
type="user",
|
||||
heading="",
|
||||
content=message,
|
||||
kvps={"attachments": attachment_filenames},
|
||||
id=message_id,
|
||||
)
|
||||
|
||||
|
||||
def send_message(context: "AgentContext", item: dict, source: str = " (from queue)"):
|
||||
"""Send a single queued message (log + communicate)."""
|
||||
from agent import UserMessage # Import here to avoid circular import
|
||||
|
||||
message = item.get("text", "")
|
||||
attachments = item.get("attachments", [])
|
||||
log_user_message(context, message, attachments, source=source)
|
||||
context.communicate(UserMessage(message, attachments))
|
||||
|
||||
|
||||
def send_next(context: "AgentContext") -> bool:
|
||||
"""Send next queued message. Returns True if sent, False if queue empty."""
|
||||
if not has_queue(context):
|
||||
return False
|
||||
item = pop_first(context)
|
||||
if item:
|
||||
send_message(context, item)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def send_all_aggregated(context: "AgentContext") -> int:
|
||||
"""Aggregate and send all queued messages as one. Returns count of items sent."""
|
||||
from agent import UserMessage # Import here to avoid circular import
|
||||
|
||||
if not has_queue(context):
|
||||
return 0
|
||||
|
||||
items = []
|
||||
while has_queue(context):
|
||||
items.append(pop_first(context))
|
||||
|
||||
# Combine texts with separator
|
||||
text = "\n\n---\n\n".join(i["text"] for i in items if i["text"])
|
||||
attachments = [a for i in items for a in i.get("attachments", [])]
|
||||
|
||||
log_user_message(context, text, attachments, source=" (queued batch)")
|
||||
context.communicate(UserMessage(text, attachments))
|
||||
return len(items)
|
||||
|
|
@ -45,7 +45,7 @@ class Delegation(Tool):
|
|||
|
||||
def get_log_object(self):
|
||||
return self.agent.context.log.log(
|
||||
type="tool",
|
||||
type="subagent",
|
||||
heading=f"icon://communication {self.agent.agent_name}: Calling Subordinate Agent",
|
||||
content="",
|
||||
kvps=self.args,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue