diff --git a/prompts/agent.system.tool.a2a_chat.md b/prompts/agent.system.tool.a2a_chat.md new file mode 100644 index 000000000..de9d192fc --- /dev/null +++ b/prompts/agent.system.tool.a2a_chat.md @@ -0,0 +1,73 @@ +### a2a_chat: +This tool lets Agent Zero chat with any other FastA2A-compatible agent. +It automatically keeps conversation **context** (so each subsequent call +continues the same dialogue) and supports optional file attachments. + +#### What the tool can do +* Start a brand-new conversation with a remote agent. +* Continue an existing conversation transparently (context handled for you). +* Send text plus optional file URIs (images, docs, etc.). +* Receive the assistant’s reply as plain text. + +#### Arguments +* `agent_url` (string, required) – Base URL of the *remote* agent. + • Accepts `host:port`, `http://host:port`, or full path ending in `/a2a`. +* `message` (string, required) – The text you want to send. +* `attachments` (list[string], optional) – URIs pointing to files you want + to send along with the message (can be http(s):// or file path). +* `reset` (boolean, optional) – Set to `true` to start a **new** conversation + with the same `agent_url` (clears stored context). Default `false`. + +> Leave **context_id** out – the tool handles it internally. + +#### Usage – first message +##### Request +```json +{ + "thoughts": [ + "I want to ask the weather-bot for today’s forecast." + ], + "headline": "Ask remote agent (weather-bot)", + "tool_name": "a2a_chat", + "tool_args": { + "agent_url": "http://weather.example.com:8000/a2a", + "message": "Hello! What’s the forecast for Berlin today?", + "attachments": [], + "reset": false + } +} +``` +##### Response (assistant-side) +```plaintext +☀️ It will be sunny with a high of 22 °C. +``` + +#### Usage – follow-up (context automatically preserved) +##### Request +```json +{ + "thoughts": [ + "Need tomorrow’s forecast too." + ], + "headline": "Follow-up question", + "tool_name": "a2a_chat", + "tool_args": { + "agent_url": "http://weather.example.com:8000/a2a", + "message": "And tomorrow?", + "attachments": [], + "reset": false + } +} +``` +##### Response +```plaintext +🌦️ Partly cloudy with showers, high 18 °C. +``` + +#### Notes +1. **New conversation** – omit previous `agent_url` or use a *different* URL. +2. **Attachments** – supply absolute URIs ("http://…", "file:/…"). +3. The tool stores session IDs per `agent_url` inside the current + `AgentContext` – no manual handling required. +4. Use `"reset": true` to forget previous context and start a new chat. +5. The remote agent must implement FastA2A v0.2+ protocol. diff --git a/python/helpers/fasta2a_client.py b/python/helpers/fasta2a_client.py new file mode 100644 index 000000000..94f511d4c --- /dev/null +++ b/python/helpers/fasta2a_client.py @@ -0,0 +1,209 @@ +import uuid +from typing import Any, Dict, List, Optional +from python.helpers.print_style import PrintStyle + +try: + from fasta2a.client import A2AClient # type: ignore + import httpx # type: ignore + FASTA2A_CLIENT_AVAILABLE = True +except ImportError: + FASTA2A_CLIENT_AVAILABLE = False + PrintStyle.warning("FastA2A client not available. Agent-to-agent communication disabled.") + +_PRINTER = PrintStyle(italic=True, font_color="cyan", padding=False) + + +class AgentConnection: + """Helper class for connecting to and communicating with other Agent Zero instances via FastA2A.""" + + def __init__(self, agent_url: str, timeout: int = 30, token: Optional[str] = None): + """Initialize connection to an agent. + + Args: + agent_url: The base URL of the agent (e.g., "https://agent.example.com") + timeout: Request timeout in seconds + """ + if not FASTA2A_CLIENT_AVAILABLE: + raise RuntimeError("FastA2A client not available") + + # Ensure scheme is present + if not agent_url.startswith(('http://', 'https://')): + agent_url = 'http://' + agent_url + + self.agent_url = agent_url.rstrip('/') + self.timeout = timeout + # Auth headers + if token is None: + import os + token = os.getenv("A2A_TOKEN") + headers = {} + if token: + headers["Authorization"] = f"Bearer {token}" + headers["X-API-KEY"] = token + self._http_client = httpx.AsyncClient(timeout=timeout, headers=headers) # type: ignore + self._a2a_client = A2AClient(base_url=self.agent_url, http_client=self._http_client) # type: ignore + self._agent_card: Optional[Dict[str, Any]] = None + # Track conversation context automatically + self._context_id: Optional[str] = None + + async def get_agent_card(self) -> Dict[str, Any]: + """Retrieve the agent card from the remote agent.""" + if self._agent_card is None: + try: + response = await self._http_client.get(f"{self.agent_url}/.well-known/agent.json") + response.raise_for_status() + self._agent_card = response.json() + _PRINTER.print(f"Retrieved agent card from {self.agent_url}") + _PRINTER.print(f"Agent: {self._agent_card.get('name', 'Unknown')}") + _PRINTER.print(f"Description: {self._agent_card.get('description', 'No description')}") + except Exception as e: + # Fallback: if URL contains '/a2a', try root path without it + if "/a2a" in self.agent_url: + root_url = self.agent_url.split("/a2a", 1)[0] + try: + response = await self._http_client.get(f"{root_url}/.well-known/agent.json") + response.raise_for_status() + self._agent_card = response.json() + _PRINTER.print(f"Retrieved agent card from {root_url}") + except Exception: + pass # swallow, will re-raise below + _PRINTER.print(f"[!] Could not connect to {self.agent_url}\n → Ensure the server is running and reachable.\n → Full error: {e}") + raise RuntimeError(f"Could not retrieve agent card: {e}") + + return self._agent_card # type: ignore + + async def send_message( + self, + message: str, + attachments: Optional[List[str]] = None, + context_id: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """Send a message to the remote agent and return task response.""" + if not self._agent_card: + await self.get_agent_card() + + # Re-use context automatically if caller did not supply one + if context_id is None: + context_id = self._context_id + + # Build message parts + parts = [{'kind': 'text', 'text': message}] + + if attachments: + for attachment in attachments: + file_part = {'kind': 'file', 'file': {'uri': attachment}} + parts.append(file_part) # type: ignore + + # Construct A2A message + a2a_message = { + 'role': 'user', + 'parts': parts, + 'kind': 'message', + 'message_id': str(uuid.uuid4()) + } + + if context_id is not None: + a2a_message['context_id'] = context_id + + # Send using the message/send method (not send_task) + try: + response = await self._a2a_client.send_message( + message=a2a_message, # type: ignore + metadata=metadata, + configuration={'accepted_output_modes': ['application/json', 'text/plain'], 'blocking': True} # type: ignore + ) + + # Persist context id for subsequent calls + try: + ctx = response.get('result', {}).get('context_id') # type: ignore[index] + if isinstance(ctx, str): + self._context_id = ctx + except Exception: + pass # ignore if structure differs + return response # type: ignore + except Exception as e: + _PRINTER.print(f"[A2A] Error sending message: {e}") + raise + + async def get_task(self, task_id: str) -> Dict[str, Any]: + """Get the status and results of a task. + + Args: + task_id: The ID of the task to query + + Returns: + Dictionary containing the task information + """ + try: + response = await self._a2a_client.get_task(task_id) # type: ignore + return response # type: ignore + except Exception as e: + _PRINTER.print(f"Failed to get task {task_id}: {e}") + raise RuntimeError(f"Failed to get task: {e}") + + async def wait_for_completion(self, task_id: str, poll_interval: int = 2, max_wait: int = 300) -> Dict[str, Any]: + """Wait for a task to complete and return the final result. + + Args: + task_id: The ID of the task to wait for + poll_interval: How often to check task status (seconds) + max_wait: Maximum time to wait (seconds) + + Returns: + Dictionary containing the completed task information + """ + import asyncio + + waited = 0 + while waited < max_wait: + task_info = await self.get_task(task_id) + + if 'result' in task_info: + task = task_info['result'] + status = task.get('status', {}) + state = status.get('state', 'unknown') + + if state in ['completed', 'failed', 'canceled']: + _PRINTER.print(f"Task {task_id} finished with state: {state}") + return task_info + else: + _PRINTER.print(f"Task {task_id} status: {state}") + + await asyncio.sleep(poll_interval) + waited += poll_interval + + raise TimeoutError(f"Task {task_id} did not complete within {max_wait} seconds") + + async def close(self): + """Close the HTTP client connection.""" + await self._http_client.aclose() + + async def __aenter__(self): + """Async context manager entry.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit.""" + await self.close() + + +async def connect_to_agent(agent_url: str, timeout: int = 30) -> AgentConnection: + """Create a connection to a remote agent. + + Args: + agent_url: The base URL of the agent + timeout: Request timeout in seconds + + Returns: + AgentConnection instance + """ + connection = AgentConnection(agent_url, timeout) + # Verify connection by retrieving agent card + await connection.get_agent_card() + return connection + + +def is_client_available() -> bool: + """Check if FastA2A client is available.""" + return FASTA2A_CLIENT_AVAILABLE diff --git a/python/helpers/fasta2a_server.py b/python/helpers/fasta2a_server.py new file mode 100644 index 000000000..53250be8e --- /dev/null +++ b/python/helpers/fasta2a_server.py @@ -0,0 +1,484 @@ +# noqa: D401 (docstrings) – internal helper +import asyncio +import uuid +import atexit +from typing import Any, List +import contextlib +import threading + +from python.helpers import settings +from starlette.requests import Request + +# Local imports +from python.helpers.print_style import PrintStyle +from agent import AgentContext, UserMessage +from initialize import initialize_agent + +# Import FastA2A +try: + from fasta2a import Worker, FastA2A # type: ignore + from fasta2a.broker import InMemoryBroker # type: ignore + from fasta2a.storage import InMemoryStorage # type: ignore + from fasta2a.schema import Message, Artifact, AgentProvider, Skill # type: ignore + FASTA2A_AVAILABLE = True +except ImportError: # pragma: no cover – library not installed + FASTA2A_AVAILABLE = False + # Minimal stubs for type checkers when FastA2A is not available + + class Worker: # type: ignore + def __init__(self, **kwargs): + pass + + async def run_task(self, params): + pass + + async def cancel_task(self, params): + pass + + def build_message_history(self, history): + return [] + + def build_artifacts(self, result): + return [] + + class FastA2A: # type: ignore + def __init__(self, **kwargs): + pass + + async def __call__(self, scope, receive, send): + pass + + class InMemoryBroker: # type: ignore + pass + + class InMemoryStorage: # type: ignore + async def update_task(self, **kwargs): + pass + + Message = Artifact = AgentProvider = Skill = Any # type: ignore + +_PRINTER = PrintStyle(italic=True, font_color="purple", padding=False) + + +class AgentZeroWorker(Worker): # type: ignore[misc] + """Agent Zero implementation of FastA2A Worker.""" + + def __init__(self, broker, storage): + super().__init__(broker=broker, storage=storage) + self.storage = storage + + async def run_task(self, params: Any) -> None: # params: TaskSendParams + """Execute a task by processing the message through Agent Zero.""" + try: + task_id = params['id'] + context_id = params['context_id'] + message = params['message'] + + _PRINTER.print(f"[A2A] Processing task {task_id} in context {context_id}") + + # Convert A2A message to Agent Zero format + agent_message = self._convert_message(message) + + # Get or create Agent Zero context + context = AgentContext.get(context_id) + if not context: + # Create new context for this A2A conversation + cfg = initialize_agent() + context = AgentContext(cfg, id=context_id) + + # Log user message so it appears instantly in UI chat window + context.log.log( + type="user", # type: ignore[arg-type] + heading="Remote user message", + content=agent_message.message, + kvps={"from": "A2A"}, + temp=False, + ) + + # Process message through Agent Zero (includes response) + task = context.communicate(agent_message) + result_text = await task.result() + + # Build A2A message from result + response_message: Message = { # type: ignore + 'role': 'agent', + 'parts': [{'kind': 'text', 'text': str(result_text)}], + 'kind': 'message', + 'message_id': str(uuid.uuid4()) + } + + await self.storage.update_task( # type: ignore[attr-defined] + task_id=task_id, + state='completed', + new_messages=[response_message] + ) + + _PRINTER.print(f"[A2A] Completed task {task_id}") + + except Exception as e: + _PRINTER.print(f"[A2A] Error processing task {params.get('id', 'unknown')}: {e}") + await self.storage.update_task( + task_id=params.get('id', 'unknown'), + state='failed' + ) + + async def cancel_task(self, params: Any) -> None: # params: TaskIdParams + """Cancel a running task.""" + task_id = params['id'] + _PRINTER.print(f"[A2A] Cancelling task {task_id}") + await self.storage.update_task(task_id=task_id, state='canceled') # type: ignore[attr-defined] + + def build_message_history(self, history: List[Any]) -> List[Message]: # type: ignore + # Not used in this simplified implementation + return [] + + def build_artifacts(self, result: Any) -> List[Artifact]: # type: ignore + # No artifacts for now + return [] + + def _convert_message(self, a2a_message: Message) -> UserMessage: # type: ignore + """Convert A2A message to Agent Zero UserMessage.""" + # Extract text from message parts + text_parts = [part.get('text', '') for part in a2a_message.get('parts', []) if part.get('kind') == 'text'] + message_text = '\n'.join(text_parts) + + # Extract file attachments + attachments = [] + for part in a2a_message.get('parts', []): + if part.get('kind') == 'file': + file_info = part.get('file', {}) + if 'uri' in file_info: + attachments.append(file_info['uri']) + + return UserMessage( + message=message_text, + attachments=attachments + ) + + +class DynamicA2AProxy: + """Dynamic proxy for FastA2A server that allows reconfiguration.""" + + _instance = None + + def __init__(self): + self.app = None + self.token = "" + self._lock = threading.Lock() # Use threading.Lock instead of asyncio.Lock + self._startup_done: bool = False + self._worker_bg_task: asyncio.Task | None = None + self._reconfigure_needed: bool = False # Flag for deferred reconfiguration + + if FASTA2A_AVAILABLE: + # Initialize with default token + cfg = settings.get_settings() + self.token = cfg.get("mcp_server_token", "") + self._configure() + self._register_shutdown() + else: + _PRINTER.print("[A2A] FastA2A not available, server will return 503") + + @staticmethod + def get_instance(): + if DynamicA2AProxy._instance is None: + DynamicA2AProxy._instance = DynamicA2AProxy() + return DynamicA2AProxy._instance + + def reconfigure(self, token: str): + """Reconfigure the FastA2A server with new token.""" + self.token = token + if FASTA2A_AVAILABLE: + with self._lock: + # Mark that reconfiguration is needed - will be done on next request + self._reconfigure_needed = True + self._startup_done = False # Force restart on next request + _PRINTER.print("[A2A] Reconfiguration scheduled for next request") + + def _configure(self): + """Configure the FastA2A application with Agent Zero integration.""" + try: + storage = InMemoryStorage() # type: ignore[arg-type] + broker = InMemoryBroker() # type: ignore[arg-type] + + # Define Agent Zero's skills + skills: List[Skill] = [{ # type: ignore + "id": "general_assistance", + "name": "General AI Assistant", + "description": "Provides general AI assistance including code execution, file management, web browsing, and problem solving", + "tags": ["ai", "assistant", "code", "files", "web", "automation"], + "examples": [ + "Write and execute Python code", + "Manage files and directories", + "Browse the web and extract information", + "Solve complex problems step by step", + "Install software and manage systems" + ], + "input_modes": ["text/plain", "application/octet-stream"], + "output_modes": ["text/plain", "application/json"] + }] + + provider: AgentProvider = { # type: ignore + "organization": "Agent Zero", + "url": "https://github.com/frdel/agent-zero" + } + + # Create new FastA2A app with proper thread safety + new_app = FastA2A( # type: ignore + storage=storage, + broker=broker, + name="Agent Zero", + description=( + "A general AI assistant that can execute code, manage files, browse the web, and " + "solve complex problems in an isolated Linux environment." + ), + version="1.0.0", + provider=provider, + skills=skills, + lifespan=None, # We manage lifespan manually + middleware=[], # No middleware - we handle auth in wrapper + ) + + # Store for later lazy startup (needs active event-loop) + self._storage = storage # type: ignore[attr-defined] + self._broker = broker # type: ignore[attr-defined] + self._worker = AgentZeroWorker(broker=broker, storage=storage) # type: ignore[attr-defined] + + # Atomic update of the app + self.app = new_app + + _PRINTER.print("[A2A] FastA2A server configured successfully") + + except Exception as e: + _PRINTER.print(f"[A2A] Failed to configure FastA2A server: {e}") + self.app = None + raise + + # --------------------------------------------------------------------- + # Shutdown handling + # --------------------------------------------------------------------- + + def _register_shutdown(self): + """Register an atexit hook to gracefully stop worker & task manager.""" + + def _sync_shutdown(): + try: + if not self._startup_done or not FASTA2A_AVAILABLE: + return + loop = asyncio.new_event_loop() + loop.run_until_complete(self._async_shutdown()) + loop.close() + except Exception: + pass # ignore errors during interpreter shutdown + + atexit.register(_sync_shutdown) + + async def _async_shutdown(self): + """Async shutdown: cancel worker task & close task manager.""" + if self._worker_bg_task and not self._worker_bg_task.done(): + self._worker_bg_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._worker_bg_task + try: + if hasattr(self, 'app') and self.app: + await self.app.task_manager.__aexit__(None, None, None) # type: ignore[attr-defined] + except Exception: + pass + + async def _async_reconfigure(self): + """Perform async reconfiguration with proper lifecycle management.""" + _PRINTER.print("[A2A] Starting async reconfiguration") + + # Shutdown existing components + await self._async_shutdown() + + # Reset startup state + self._startup_done = False + self._worker_bg_task = None + + # Reconfigure with new token + self._configure() + + # Restart components + await self._startup() + + # Clear reconfiguration flag + self._reconfigure_needed = False + + _PRINTER.print("[A2A] Async reconfiguration completed") + + async def _startup(self): + """Ensure TaskManager and Worker are running inside current event-loop.""" + if self._startup_done or not FASTA2A_AVAILABLE: + return + self._startup_done = True + + # Start task manager + await self.app.task_manager.__aenter__() # type: ignore[attr-defined] + + async def _worker_loop(): + async with self._worker.run(): # type: ignore[attr-defined] + await asyncio.Event().wait() + + # fire-and-forget background task – keep reference + self._worker_bg_task = asyncio.create_task(_worker_loop()) + _PRINTER.print("[A2A] Worker & TaskManager started") + + async def __call__(self, scope, receive, send): + """ASGI application interface with token-based routing.""" + if not FASTA2A_AVAILABLE: + # FastA2A not available, return 503 + response = b'HTTP/1.1 503 Service Unavailable\r\n\r\nFastA2A not available' + await send({ + 'type': 'http.response.start', + 'status': 503, + 'headers': [[b'content-type', b'text/plain']], + }) + await send({ + 'type': 'http.response.body', + 'body': response, + }) + return + + # Check if reconfiguration is needed + if self._reconfigure_needed: + try: + await self._async_reconfigure() + except Exception as e: + _PRINTER.print(f"[A2A] Error during reconfiguration: {e}") + # Return 503 if reconfiguration failed + await send({ + 'type': 'http.response.start', + 'status': 503, + 'headers': [[b'content-type', b'text/plain']], + }) + await send({ + 'type': 'http.response.body', + 'body': b'FastA2A reconfiguration failed', + }) + return + + if self.app is None: + # FastA2A not configured, return 503 + response = b'HTTP/1.1 503 Service Unavailable\r\n\r\nFastA2A not configured' + await send({ + 'type': 'http.response.start', + 'status': 503, + 'headers': [[b'content-type', b'text/plain']], + }) + await send({ + 'type': 'http.response.body', + 'body': response, + }) + return + + # Lazy-start background components the first time we get a request + if not self._startup_done: + try: + _PRINTER.print("[A2A] Starting up FastA2A components") + await self._startup() + except Exception as e: + _PRINTER.print(f"[A2A] Error during startup: {e}") + # Return 503 if startup failed + await send({ + 'type': 'http.response.start', + 'status': 503, + 'headers': [[b'content-type', b'text/plain']], + }) + await send({ + 'type': 'http.response.body', + 'body': b'FastA2A startup failed', + }) + return + + # Handle token-based routing: /a2a/t-{token}/... or /t-{token}/... + path = scope.get('path', '') + + # Strip /a2a prefix if present (DispatcherMiddleware doesn't always strip it) + if path.startswith('/a2a'): + path = path[4:] # Remove '/a2a' prefix + + # Check if path matches token pattern /t-{token}/ + if path.startswith('/t-') and '/' in path[3:]: + # Extract token from path + path_parts = path[3:].split('/', 1) # Remove '/t-' prefix + request_token = path_parts[0] + remaining_path = '/' + path_parts[1] if len(path_parts) > 1 else '/' + + # Validate token + cfg = settings.get_settings() + expected_token = cfg.get("mcp_server_token") + + if expected_token and request_token != expected_token: + # Invalid token, return 401 + await send({ + 'type': 'http.response.start', + 'status': 401, + 'headers': [[b'content-type', b'text/plain']], + }) + await send({ + 'type': 'http.response.body', + 'body': b'Unauthorized', + }) + return + + # Update scope with cleaned path + scope = dict(scope) + scope['path'] = remaining_path + else: + # No token in path, check other auth methods + request = Request(scope, receive=receive) + + cfg = settings.get_settings() + expected = cfg.get("mcp_server_token") + + if expected: + auth_header = request.headers.get("Authorization", "") + api_key = request.headers.get("X-API-KEY") or request.query_params.get("api_key") + + is_authorized = ( + (auth_header.startswith("Bearer ") and auth_header.split(" ", 1)[1] == expected) or + (api_key == expected) + ) + + if not is_authorized: + # No valid auth, return 401 + await send({ + 'type': 'http.response.start', + 'status': 401, + 'headers': [[b'content-type', b'text/plain']], + }) + await send({ + 'type': 'http.response.body', + 'body': b'Unauthorized', + }) + return + else: + _PRINTER.print("[A2A] No expected token found in settings") + + # Delegate to FastA2A app with cleaned scope + with self._lock: + app = self.app + if app: + await app(scope, receive, send) + else: + # App not configured, return 503 + await send({ + 'type': 'http.response.start', + 'status': 503, + 'headers': [[b'content-type', b'text/plain']], + }) + await send({ + 'type': 'http.response.body', + 'body': b'FastA2A app not configured', + }) + return + + +def is_available(): + """Check if FastA2A is available and properly configured.""" + return FASTA2A_AVAILABLE and DynamicA2AProxy.get_instance().app is not None + + +def get_proxy(): + """Get the FastA2A proxy instance.""" + return DynamicA2AProxy.get_instance() diff --git a/python/helpers/settings.py b/python/helpers/settings.py index 9c842630d..9ef32703c 100644 --- a/python/helpers/settings.py +++ b/python/helpers/settings.py @@ -70,7 +70,7 @@ class Settings(TypedDict): memory_memorize_enabled: bool memory_memorize_consolidation: bool memory_memorize_replace_threshold: float - + api_keys: dict[str, str] @@ -526,6 +526,25 @@ def convert_out(settings: Settings) -> SettingsOutput: } ) + # -------- A2A Section -------- + a2a_fields: list[SettingsField] = [ + { + "id": "show_a2a_connection", + "title": "Show A2A connection info", + "description": "Display the URL (including token) other agents can use to connect via FastA2A.", + "type": "button", + "value": "Show", + } + ] + + a2a_section: SettingsSection = { + "id": "a2a_server", + "title": "A2A Connection", + "description": "Share this connection string with other agents.", + "fields": a2a_fields, + "tab": "external", + } + if runtime.is_dockerized(): auth_fields.append( { @@ -913,7 +932,7 @@ def convert_out(settings: Settings) -> SettingsOutput: # TTS fields tts_fields: list[SettingsField] = [] - + tts_fields.append( { "id": "tts_kokoro", @@ -1061,6 +1080,7 @@ def convert_out(settings: Settings) -> SettingsOutput: speech_section, api_keys_section, auth_section, + a2a_section, mcp_client_section, mcp_server_section, backup_section, @@ -1369,6 +1389,18 @@ def _apply_settings(previous: Settings | None): update_mcp_token, current_token ) # TODO overkill, replace with background task + # update token in a2a server + if not previous or current_token != previous["mcp_server_token"]: + + async def update_a2a_token(token: str): + from python.helpers.fasta2a_server import DynamicA2AProxy + + DynamicA2AProxy.get_instance().reconfigure(token=token) + + task4 = defer.DeferredTask().start_task( + update_a2a_token, current_token + ) # TODO overkill, replace with background task + def _env_to_dict(data: str): env_dict = {} diff --git a/python/tools/a2a_chat.py b/python/tools/a2a_chat.py new file mode 100644 index 000000000..6ee664449 --- /dev/null +++ b/python/tools/a2a_chat.py @@ -0,0 +1,57 @@ +from python.helpers.tool import Tool, Response +from python.helpers.print_style import PrintStyle + +try: + from python.helpers.fasta2a_client import connect_to_agent, is_client_available # type: ignore +except ImportError: # pragma: no cover – client helper missing + is_client_available = lambda: False # type: ignore + + +class A2AChatTool(Tool): + """Communicate with another FastA2A-compatible agent.""" + + async def execute(self, **kwargs): + if not is_client_available(): + return Response(message="FastA2A client not available on this instance.", break_loop=False) + + agent_url: str | None = kwargs.get("agent_url") # required + user_message: str | None = kwargs.get("message") # required + attachments = kwargs.get("attachments", None) # optional list[str] + reset = bool(kwargs.get("reset", False)) + if not agent_url or not isinstance(agent_url, str): + return Response(message="agent_url argument missing", break_loop=False) + if not user_message or not isinstance(user_message, str): + return Response(message="message argument missing", break_loop=False) + + # Retrieve or create session cache on the Agent instance + sessions: dict[str, str] = self.agent.get_data("_a2a_sessions") or {} + + # Handle reset flag – start fresh conversation + if reset and agent_url in sessions: + sessions.pop(agent_url, None) + + context_id = None if reset else sessions.get(agent_url) + try: + async with await connect_to_agent(agent_url) as conn: + task_resp = await conn.send_message(user_message, attachments=attachments, context_id=context_id) + task_id = task_resp.get("result", {}).get("id") # type: ignore[index] + if not task_id: + return Response(message="Remote agent failed to create task.", break_loop=False) + final = await conn.wait_for_completion(task_id) + new_context_id = final["result"].get("context_id") # type: ignore[index] + if isinstance(new_context_id, str): + sessions[agent_url] = new_context_id + # persist back to agent data + self.agent.set_data("_a2a_sessions", sessions) + # Extract latest assistant text + history = final["result"].get("history", []) + assistant_text = "" + if history: + last_parts = history[-1].get("parts", []) + assistant_text = "\n".join( + p.get("text", "") for p in last_parts if p.get("kind") == "text" + ) + return Response(message=assistant_text or "(no response)", break_loop=False) + except Exception as e: + PrintStyle.error(f"A2A chat error: {e}") + return Response(message=f"A2A chat error: {e}", break_loop=False) diff --git a/requirements.txt b/requirements.txt index 738145f1b..799335b10 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ docker==7.1.0 duckduckgo-search==6.1.12 faiss-cpu==1.11.0 fastmcp==2.3.4 +fasta2a==0.5.0 flask[async]==3.0.3 flask-basicauth==0.2.0 flaredantic==0.1.4 diff --git a/run_ui.py b/run_ui.py index 07c57da9e..1818da5f1 100644 --- a/run_ui.py +++ b/run_ui.py @@ -1,18 +1,15 @@ from datetime import timedelta import os import secrets -import sys import time import socket import struct from functools import wraps import threading -import signal -from typing import override from flask import Flask, request, Response, session from flask_basicauth import BasicAuth import initialize -from python.helpers import errors, files, git, mcp_server +from python.helpers import files, git, mcp_server, fasta2a_server from python.helpers.files import get_abs_path from python.helpers import runtime, dotenv, process from python.helpers.extract_tools import load_classes_from_folder @@ -22,6 +19,7 @@ from python.helpers.print_style import PrintStyle # Set the new timezone to 'UTC' os.environ["TZ"] = "UTC" +os.environ["TOKENIZERS_PARALLELISM"] = "false" # Apply the timezone change if hasattr(time, 'tzset'): time.tzset() @@ -173,7 +171,7 @@ def run(): from werkzeug.serving import WSGIRequestHandler from werkzeug.serving import make_server from werkzeug.middleware.dispatcher import DispatcherMiddleware - from a2wsgi import ASGIMiddleware, WSGIMiddleware + from a2wsgi import ASGIMiddleware PrintStyle().print("Starting server...") @@ -216,15 +214,16 @@ def run(): for handler in handlers: register_api_handler(webapp, handler) - # add the webapp and mcp to the app - app = DispatcherMiddleware( - webapp, - { - "/mcp": ASGIMiddleware(app=mcp_server.DynamicMcpProxy.get_instance()), # type: ignore - }, - ) + # add the webapp, mcp, and a2a to the app + middleware_routes = { + "/mcp": ASGIMiddleware(app=mcp_server.DynamicMcpProxy.get_instance()), # type: ignore + "/a2a": ASGIMiddleware(app=fasta2a_server.DynamicA2AProxy.get_instance()), # type: ignore + } + + app = DispatcherMiddleware(webapp, middleware_routes) # type: ignore PrintStyle().debug(f"Starting server at http://{host}:{port} ...") + PrintStyle().debug("FastA2A mounted at: /a2a") server = make_server( host=host, diff --git a/test_fasta2a_client.py b/test_fasta2a_client.py new file mode 100644 index 000000000..541576ba6 --- /dev/null +++ b/test_fasta2a_client.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +""" +Test script to verify FastA2A agent card routing and authentication. +""" + +import asyncio +import sys +from python.helpers import settings + + +def get_test_urls(): + """Get the URLs to test based on current settings.""" + try: + cfg = settings.get_settings() + token = cfg.get("mcp_server_token", "") + + if not token: + print("❌ No mcp_server_token found in settings") + return None + + base_url = "http://localhost:50101" + + urls = { + "token_based": f"{base_url}/a2a/t-{token}/.well-known/agent.json", + "bearer_auth": f"{base_url}/a2a/.well-known/agent.json", + "api_key_header": f"{base_url}/a2a/.well-known/agent.json", + "api_key_query": f"{base_url}/a2a/.well-known/agent.json?api_key={token}" + } + + return {"token": token, "urls": urls} + + except Exception as e: + print(f"❌ Error getting settings: {e}") + return None + + +def print_test_commands(): + """Print curl commands to test FastA2A authentication.""" + data = get_test_urls() + if not data: + return + + token = data["token"] + urls = data["urls"] + + print("🚀 FastA2A Agent Card Testing Commands") + print("=" * 60) + print(f"Current token: {token}") + print() + + print("1️⃣ Token-based URL (recommended):") + print(f" curl -v '{urls['token_based']}'") + print() + + print("2️⃣ Bearer authentication:") + print(f" curl -v -H 'Authorization: Bearer {token}' '{urls['bearer_auth']}'") + print() + + print("3️⃣ API key header:") + print(f" curl -v -H 'X-API-KEY: {token}' '{urls['api_key_header']}'") + print() + + print("4️⃣ API key query parameter:") + print(f" curl -v '{urls['api_key_query']}'") + print() + + print("Expected response (if working):") + print(" HTTP/1.1 200 OK") + print(" Content-Type: application/json") + print(" {") + print(' "name": "Agent Zero",') + print(' "version": "1.0.0",') + print(' "skills": [...]') + print(" }") + print() + + print("Expected error (if auth fails):") + print(" HTTP/1.1 401 Unauthorized") + print(" Unauthorized") + print() + + +def print_troubleshooting(): + """Print troubleshooting information.""" + print("🔧 Troubleshooting FastA2A Issues") + print("=" * 40) + print() + print("1. Server not running:") + print(" - Make sure Agent Zero is running: python run_ui.py") + print(" - Check the correct port (default: 50101)") + print() + + print("2. Authentication failures:") + print(" - Verify token matches in settings") + print(" - Check token format (should be 16 characters)") + print(" - Try different auth methods") + print() + + print("3. FastA2A not available:") + print(" - Install FastA2A: pip install fasta2a") + print(" - Check server logs for FastA2A configuration errors") + print() + + print("4. Routing issues:") + print(" - Verify /a2a prefix is working") + print(" - Check DispatcherMiddleware configuration") + print(" - Look for FastA2A startup messages in logs") + print() + + +def validate_token_format(): + """Validate that the token format is correct.""" + try: + cfg = settings.get_settings() + token = cfg.get("mcp_server_token", "") + + print("🔍 Token Validation") + print("=" * 25) + + if not token: + print("❌ No token found") + return False + + print(f"✅ Token found: {token}") + print(f"✅ Token length: {len(token)} characters") + + if len(token) != 16: + print("⚠️ Warning: Expected token length is 16 characters") + + # Check token characters + if token.isalnum(): + print("✅ Token contains only alphanumeric characters") + else: + print("⚠️ Warning: Token contains non-alphanumeric characters") + + return True + + except Exception as e: + print(f"❌ Error validating token: {e}") + return False + + +async def test_server_connectivity(): + """Test basic server connectivity.""" + try: + import httpx + + print("🌐 Server Connectivity Test") + print("=" * 30) + + async with httpx.AsyncClient() as client: + try: + # Test basic server + await client.get("http://localhost:50101/", timeout=5.0) + print("✅ Agent Zero server is running") + return True + except httpx.ConnectError: + print("❌ Cannot connect to Agent Zero server") + print(" Make sure the server is running: python run_ui.py") + return False + except Exception as e: + print(f"❌ Server connectivity error: {e}") + return False + + except ImportError: + print("ℹ️ httpx not available, skipping connectivity test") + print(" Install with: pip install httpx") + return None + + +def main(): + """Main test function.""" + print("🧪 FastA2A Agent Card Testing Utility") + print("=" * 45) + print() + + # Validate token + if not validate_token_format(): + print() + print_troubleshooting() + return 1 + + print() + + # Test connectivity if possible + try: + connectivity = asyncio.run(test_server_connectivity()) + print() + + if connectivity is False: + print_troubleshooting() + return 1 + + except Exception as e: + print(f"Error testing connectivity: {e}") + print() + + # Print test commands + print_test_commands() + + print("📋 Next Steps:") + print("1. Start Agent Zero server if not running") + print("2. Run one of the curl commands above") + print("3. Check for successful 200 response with agent card JSON") + print("4. If issues occur, see troubleshooting section") + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/webui/components/settings/external/a2a-connection.html b/webui/components/settings/external/a2a-connection.html new file mode 100644 index 000000000..fbc39885d --- /dev/null +++ b/webui/components/settings/external/a2a-connection.html @@ -0,0 +1,47 @@ + + + + Connection to A0 A2A Server + + + +
+

Agent Zero A2A Server enables FastA2A protocol communication with other agents.

+

Other agents can connect using the URL below (replace host if needed):

+ +

A2A Connection URL

+
+ + +
+ + + + + + diff --git a/webui/js/settings.js b/webui/js/settings.js index f58cb1c97..7b82cb58c 100644 --- a/webui/js/settings.js +++ b/webui/js/settings.js @@ -1,4 +1,3 @@ - const settingsModalProxy = { isOpen: false, settings: {}, @@ -291,6 +290,9 @@ const settingsModalProxy = { openModal("settings/backup/backup.html"); } else if (field.id === "backup_restore") { openModal("settings/backup/restore.html"); + } else if (field.id === "show_a2a_connection") { + console.log('Opening A2A connection modal...'); + openModal("settings/external/a2a-connection.html"); } } }; diff --git a/webui/public/a2a_server.svg b/webui/public/a2a_server.svg new file mode 100644 index 000000000..626a60668 --- /dev/null +++ b/webui/public/a2a_server.svg @@ -0,0 +1,4 @@ + + + +