Queued Messages

This commit is contained in:
keyboardstaff 2026-01-29 21:22:41 -08:00
parent 680cfbf017
commit a8595e1575
14 changed files with 659 additions and 32 deletions

View file

@ -1,11 +1,10 @@
from agent import AgentContext, UserMessage
from python.helpers.api import ApiHandler, Request, Response
from python.helpers import files, extension
from python.helpers import files, extension, message_queue as mq
import os
from werkzeug.utils import secure_filename
from python.helpers.defer import DeferredTask
from python.helpers.print_style import PrintStyle
class Message(ApiHandler):
@ -64,30 +63,7 @@ class Message(ApiHandler):
# Store attachments in agent data
# context.agent0.set_data("attachments", attachment_paths)
# Prepare attachment filenames for logging
attachment_filenames = (
[os.path.basename(path) for path in attachment_paths]
if attachment_paths
else []
)
# Print to console and log
PrintStyle(
background_color="#6C3483", font_color="white", bold=True, padding=True
).print(f"User message:")
PrintStyle(font_color="white", padding=False).print(f"> {message}")
if attachment_filenames:
PrintStyle(font_color="white", padding=False).print("Attachments:")
for filename in attachment_filenames:
PrintStyle(font_color="white", padding=False).print(f"- {filename}")
# Log the message with message_id and attachments
context.log.log(
type="user",
heading="",
content=message,
kvps={"attachments": attachment_filenames},
id=message_id,
)
# Log to console and UI using helper function
mq.log_user_message(context, message, attachment_paths, message_id)
return context.communicate(UserMessage(message, attachment_paths)), context

View file

@ -0,0 +1,21 @@
from python.helpers.api import ApiHandler, Request, Response
from python.helpers import message_queue as mq
from agent import AgentContext
class MessageQueueAdd(ApiHandler):
"""Add a message to the queue."""
async def process(self, input: dict, request: Request) -> dict | Response:
context = AgentContext.get(input.get("context", ""))
if not context:
return Response("Context not found", status=404)
text = input.get("text", "").strip()
attachments = input.get("attachments", []) # filenames from /upload API
if not text and not attachments:
return Response("Empty message", status=400)
item = mq.add(context, text, attachments)
return {"ok": True, "item_id": item["id"], "queue_length": len(mq.get_queue(context))}

View file

@ -0,0 +1,16 @@
from python.helpers.api import ApiHandler, Request, Response
from python.helpers import message_queue as mq
from agent import AgentContext
class MessageQueueRemove(ApiHandler):
"""Remove message(s) from queue."""
async def process(self, input: dict, request: Request) -> dict | Response:
context = AgentContext.get(input.get("context", ""))
if not context:
return Response("Context not found", status=404)
item_id = input.get("item_id") # None means clear all
remaining = mq.remove(context, item_id)
return {"ok": True, "remaining": remaining}

View file

@ -0,0 +1,30 @@
from python.helpers.api import ApiHandler, Request, Response
from python.helpers import message_queue as mq
from agent import AgentContext
class MessageQueueSend(ApiHandler):
"""Send queued message(s) immediately."""
async def process(self, input: dict, request: Request) -> dict | Response:
context = AgentContext.get(input.get("context", ""))
if not context:
return Response("Context not found", status=404)
if not mq.has_queue(context):
return {"ok": True, "message": "Queue empty"}
item_id = input.get("item_id")
send_all = input.get("send_all", False)
if send_all:
count = mq.send_all_aggregated(context)
return {"ok": True, "sent_count": count}
# Send single item
item = mq.pop_item(context, item_id) if item_id else mq.pop_first(context)
if not item:
return Response("Item not found", status=404)
mq.send_message(context, item)
return {"ok": True, "sent_item_id": item["id"]}

View file

@ -122,4 +122,5 @@ class Poll(ApiHandler):
"notifications": notifications,
"notifications_guid": notification_manager.guid,
"notifications_version": len(notification_manager.updates),
"message_queue": context.output_data.get("message_queue", []) if context else [],
}

View file

@ -14,7 +14,7 @@ class UploadFile(ApiHandler):
for file in file_list:
if file and self.allowed_file(file.filename): # Check file type
filename = secure_filename(file.filename) # type: ignore
file.save(files.get_abs_path("tmp/upload", filename))
file.save(files.get_abs_path("tmp/uploads", filename))
saved_filenames.append(filename)
return {"filenames": saved_filenames} # Return saved filenames

View file

@ -0,0 +1,33 @@
import asyncio
from python.helpers.extension import Extension
from python.helpers import message_queue as mq
from agent import LoopData
class ProcessQueue(Extension):
"""Process queued messages after monologue ends."""
async def execute(self, loop_data: LoopData = LoopData(), **kwargs):
# Only process for agent0 (main agent)
if self.agent.number != 0:
return
context = self.agent.context
# Check if there are queued messages
if mq.has_queue(context):
# Schedule delayed task to send next queued message
# This allows current monologue to fully complete first
asyncio.create_task(self._delayed_send(context))
async def _delayed_send(self, context):
"""Wait for task to complete, then send next queued message."""
# Small delay to ensure monologue fully completes
await asyncio.sleep(0.1)
# Wait for current task to finish
while context.task and context.task.is_alive():
await asyncio.sleep(0.1)
# Send next queued message
mq.send_next(context)

View file

@ -0,0 +1,182 @@
import os
import uuid
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from agent import AgentContext
from python.helpers.print_style import PrintStyle
QUEUE_KEY = "message_queue"
QUEUE_SEQ_KEY = "message_queue_seq"
UPLOAD_FOLDER = "/a0/tmp/uploads"
def get_queue(context: "AgentContext") -> list:
"""Get current queue from context.data."""
return context.get_data(QUEUE_KEY) or []
def _get_next_seq(context: "AgentContext") -> int:
"""Get next sequence number."""
seq = context.get_data(QUEUE_SEQ_KEY) or 0
seq += 1
context.set_data(QUEUE_SEQ_KEY, seq)
return seq
def _sync_output(context: "AgentContext"):
"""Sync queue to output_data for frontend polling."""
queue = get_queue(context)
# Truncate text for frontend display
truncated = []
for item in queue:
truncated.append({
"id": item["id"],
"seq": item.get("seq", 0),
"text": item["text"][:100] + "..." if len(item["text"]) > 100 else item["text"],
"attachments": [a.split("/")[-1] for a in item.get("attachments", [])],
"attachment_count": len(item.get("attachments", [])),
})
context.set_output_data(QUEUE_KEY, truncated)
def add(context: "AgentContext", text: str, attachments: list[str] | None = None) -> dict:
"""Add message to queue. Attachments should be filenames, will be converted to full paths."""
queue = get_queue(context)
# Convert filenames to full paths
full_paths = []
for att in (attachments or []):
if att.startswith("/"):
full_paths.append(att)
else:
full_paths.append(f"{UPLOAD_FOLDER}/{att}")
item = {
"id": str(uuid.uuid4())[:8],
"seq": _get_next_seq(context),
"text": text,
"attachments": full_paths,
}
queue.append(item)
context.set_data(QUEUE_KEY, queue)
_sync_output(context)
return item
def remove(context: "AgentContext", item_id: str | None = None) -> int:
"""Remove item(s). If item_id is None, clears all. Returns remaining count."""
if not item_id:
context.set_data(QUEUE_KEY, [])
context.set_output_data(QUEUE_KEY, [])
return 0
queue = [i for i in get_queue(context) if i["id"] != item_id]
context.set_data(QUEUE_KEY, queue)
_sync_output(context)
return len(queue)
def pop_first(context: "AgentContext") -> dict | None:
"""Remove and return first item."""
queue = get_queue(context)
if not queue:
return None
item = queue.pop(0)
context.set_data(QUEUE_KEY, queue)
_sync_output(context)
return item
def pop_item(context: "AgentContext", item_id: str) -> dict | None:
"""Remove and return specific item."""
queue = get_queue(context)
for i, item in enumerate(queue):
if item["id"] == item_id:
queue.pop(i)
context.set_data(QUEUE_KEY, queue)
_sync_output(context)
return item
return None
def has_queue(context: "AgentContext") -> bool:
"""Check if queue has items."""
return len(get_queue(context)) > 0
def log_user_message(
context: "AgentContext",
message: str,
attachment_paths: list[str],
message_id: str | None = None,
source: str = "",
):
"""Log user message to console and UI. Used by message API and queue processing."""
# Prepare attachment filenames for logging
attachment_filenames = (
[os.path.basename(path) for path in attachment_paths]
if attachment_paths
else []
)
# Print to console
label = f"User message{source}:"
PrintStyle(
background_color="#6C3483", font_color="white", bold=True, padding=True
).print(label)
PrintStyle(font_color="white", padding=False).print(f"> {message}")
if attachment_filenames:
PrintStyle(font_color="white", padding=False).print("Attachments:")
for filename in attachment_filenames:
PrintStyle(font_color="white", padding=False).print(f"- {filename}")
# Log to UI
context.log.log(
type="user",
heading="",
content=message,
kvps={"attachments": attachment_filenames},
id=message_id,
)
def send_message(context: "AgentContext", item: dict, source: str = " (from queue)"):
"""Send a single queued message (log + communicate)."""
from agent import UserMessage # Import here to avoid circular import
message = item.get("text", "")
attachments = item.get("attachments", [])
log_user_message(context, message, attachments, source=source)
context.communicate(UserMessage(message, attachments))
def send_next(context: "AgentContext") -> bool:
"""Send next queued message. Returns True if sent, False if queue empty."""
if not has_queue(context):
return False
item = pop_first(context)
if item:
send_message(context, item)
return True
return False
def send_all_aggregated(context: "AgentContext") -> int:
"""Aggregate and send all queued messages as one. Returns count of items sent."""
from agent import UserMessage # Import here to avoid circular import
if not has_queue(context):
return 0
items = []
while has_queue(context):
items.append(pop_first(context))
# Combine texts with separator
text = "\n\n---\n\n".join(i["text"] for i in items if i["text"])
attachments = [a for i in items for a in i.get("attachments", [])]
log_user_message(context, text, attachments, source=" (queued batch)")
context.communicate(UserMessage(text, attachments))
return len(items)