# Mirror of https://github.com/agent0ai/agent-zero.git
# Synced 2026-05-16 19:50:43 +00:00 - 134 lines - 5 KiB - Python
from typing import Any
|
|
|
|
from helpers.tool import Tool, Response
|
|
from helpers.print_style import PrintStyle
|
|
from helpers.fasta2a_client import connect_to_agent, is_client_available
|
|
|
|
|
|
# Message returned by the tool when the remote task completes but no assistant
# text can be extracted from it, so an empty reply is reported as a failure
# instead of being passed along as a successful (blank) answer.
A2A_EMPTY_RESPONSE_ERROR = (
    "A2A chat failed: the remote task completed but no assistant text was found. "
    "Expected final.result.history to include an assistant message with a text "
    "part, or a text artifact/status message. Treat this as a failed remote "
    "response, not success."
)
|
|
|
|
|
|
def _session_key(agent_url: str) -> str:
|
|
"""Keep root and explicit /a2a URLs in the same conversation cache."""
|
|
normalized = agent_url.rstrip("/")
|
|
if normalized.endswith("/a2a"):
|
|
return normalized[:-4].rstrip("/")
|
|
return normalized
|
|
|
|
|
|
def _text_from_part(part: Any) -> str:
|
|
if not isinstance(part, dict):
|
|
return ""
|
|
for key in ("text", "content"):
|
|
value = part.get(key)
|
|
if isinstance(value, str) and value.strip():
|
|
return value.strip()
|
|
return ""
|
|
|
|
|
|
def _text_from_message(message: Any) -> str:
|
|
if isinstance(message, str):
|
|
return message.strip()
|
|
if not isinstance(message, dict):
|
|
return ""
|
|
|
|
parts = message.get("parts")
|
|
if isinstance(parts, list):
|
|
texts = [_text_from_part(part) for part in parts]
|
|
text = "\n".join(text for text in texts if text)
|
|
if text:
|
|
return text
|
|
|
|
for key in ("text", "content", "message", "output"):
|
|
value = message.get(key)
|
|
if isinstance(value, str) and value.strip():
|
|
return value.strip()
|
|
|
|
return ""
|
|
|
|
|
|
def _extract_latest_assistant_text(task_response: Any) -> str:
|
|
if not isinstance(task_response, dict):
|
|
return ""
|
|
|
|
result = task_response.get("result", task_response)
|
|
if not isinstance(result, dict):
|
|
return ""
|
|
|
|
history = result.get("history")
|
|
if isinstance(history, list):
|
|
for message in reversed(history):
|
|
if isinstance(message, dict) and message.get("role") == "user":
|
|
continue
|
|
text = _text_from_message(message)
|
|
if text:
|
|
return text
|
|
|
|
status = result.get("status")
|
|
if isinstance(status, dict):
|
|
text = _text_from_message(status.get("message"))
|
|
if text:
|
|
return text
|
|
|
|
artifacts = result.get("artifacts")
|
|
if isinstance(artifacts, list):
|
|
for artifact in reversed(artifacts):
|
|
text = _text_from_message(artifact)
|
|
if text:
|
|
return text
|
|
|
|
return _text_from_message(result)
|
|
|
|
|
|
class A2AChatTool(Tool):
    """Communicate with another FastA2A-compatible agent.

    Conversations are keyed by the normalized agent URL; the remote context id
    of each one is cached in the agent's ``_a2a_sessions`` data so follow-up
    calls continue the same conversation.
    """

    async def execute(self, **kwargs):
        """Send a message to a remote agent and return its latest reply.

        Keyword Args:
            agent_url: URL of the remote agent (required, non-empty string).
            message: Text to send (required, non-empty string).
            attachments: Optional attachments forwarded unchanged to the
                client's ``send_message``.
            reset: When truthy, drop the cached context for this agent and
                start a fresh conversation.

        Returns:
            A ``Response`` with ``break_loop=False`` whose message is the
            remote assistant text, or a description of what went wrong
            (missing argument, client unavailable, task not created, empty
            remote reply, or any exception raised while talking to the agent).
        """
        if not is_client_available():
            return Response(message="FastA2A client not available on this instance.", break_loop=False)

        agent_url: str | None = kwargs.get("agent_url")  # required
        user_message: str | None = kwargs.get("message")  # required
        attachments = kwargs.get("attachments", None)  # optional list[str]

        reset_arg = kwargs.get("reset", False)
        if isinstance(reset_arg, str):
            # bool("false") is True; honor string-encoded negatives explicitly.
            reset = reset_arg.strip().lower() not in {"", "0", "false", "no", "off", "none", "null"}
        else:
            reset = bool(reset_arg)

        if not agent_url or not isinstance(agent_url, str):
            return Response(message="agent_url argument missing", break_loop=False)
        if not user_message or not isinstance(user_message, str):
            return Response(message="message argument missing", break_loop=False)

        # Retrieve or create session cache on the Agent instance
        sessions: dict[str, str] = self.agent.get_data("_a2a_sessions") or {}
        cache_key = _session_key(agent_url)

        # Handle reset flag: start fresh conversation
        if reset:
            sessions.pop(cache_key, None)

        context_id = None if reset else sessions.get(cache_key)
        try:
            async with await connect_to_agent(agent_url) as conn:
                task_resp = await conn.send_message(user_message, attachments=attachments, context_id=context_id)
                # A missing or null "result" means the task was not created.
                created = task_resp.get("result") if isinstance(task_resp, dict) else None
                task_id = created.get("id") if isinstance(created, dict) else None
                if not task_id:
                    return Response(message="Remote agent failed to create task.", break_loop=False)

                final = await conn.wait_for_completion(task_id)
                # Tolerate a missing "result" wrapper here, as the text
                # extraction below does, instead of failing with a KeyError.
                final_result = final.get("result") if isinstance(final, dict) else None
                new_context_id = final_result.get("context_id") if isinstance(final_result, dict) else None
                if isinstance(new_context_id, str):
                    sessions[cache_key] = new_context_id
                # Persist the cache (new context id and/or a reset) back to
                # agent data.
                self.agent.set_data("_a2a_sessions", sessions)

                assistant_text = _extract_latest_assistant_text(final)
                if not assistant_text:
                    return Response(
                        message=A2A_EMPTY_RESPONSE_ERROR,
                        break_loop=False,
                    )
                return Response(message=assistant_text, break_loop=False)
        except Exception as e:
            # Tool boundary: report the failure to the calling agent rather
            # than letting the exception propagate.
            PrintStyle.error(f"A2A chat error: {e}")
            return Response(message=f"A2A chat error: {e}", break_loop=False)
|