# OmniRoute A2A Server (ๆ—ฅๆœฌ่ชž) ๐ŸŒ **Languages:** ๐Ÿ‡บ๐Ÿ‡ธ [English](../../../../../../src/lib/a2a/README.md) ยท ๐Ÿ‡ช๐Ÿ‡ธ [es](../../../../es/src/lib/a2a/README.md) ยท ๐Ÿ‡ซ๐Ÿ‡ท [fr](../../../../fr/src/lib/a2a/README.md) ยท ๐Ÿ‡ฉ๐Ÿ‡ช [de](../../../../de/src/lib/a2a/README.md) ยท ๐Ÿ‡ฎ๐Ÿ‡น [it](../../../../it/src/lib/a2a/README.md) ยท ๐Ÿ‡ท๐Ÿ‡บ [ru](../../../../ru/src/lib/a2a/README.md) ยท ๐Ÿ‡จ๐Ÿ‡ณ [zh-CN](../../../../zh-CN/src/lib/a2a/README.md) ยท ๐Ÿ‡ฏ๐Ÿ‡ต [ja](../../../../ja/src/lib/a2a/README.md) ยท ๐Ÿ‡ฐ๐Ÿ‡ท [ko](../../../../ko/src/lib/a2a/README.md) ยท ๐Ÿ‡ธ๐Ÿ‡ฆ [ar](../../../../ar/src/lib/a2a/README.md) ยท ๐Ÿ‡ฎ๐Ÿ‡ณ [hi](../../../../hi/src/lib/a2a/README.md) ยท ๐Ÿ‡ฎ๐Ÿ‡ณ [in](../../../../in/src/lib/a2a/README.md) ยท ๐Ÿ‡น๐Ÿ‡ญ [th](../../../../th/src/lib/a2a/README.md) ยท ๐Ÿ‡ป๐Ÿ‡ณ [vi](../../../../vi/src/lib/a2a/README.md) ยท ๐Ÿ‡ฎ๐Ÿ‡ฉ [id](../../../../id/src/lib/a2a/README.md) ยท ๐Ÿ‡ฒ๐Ÿ‡พ [ms](../../../../ms/src/lib/a2a/README.md) ยท ๐Ÿ‡ณ๐Ÿ‡ฑ [nl](../../../../nl/src/lib/a2a/README.md) ยท ๐Ÿ‡ต๐Ÿ‡ฑ [pl](../../../../pl/src/lib/a2a/README.md) ยท ๐Ÿ‡ธ๐Ÿ‡ช [sv](../../../../sv/src/lib/a2a/README.md) ยท ๐Ÿ‡ณ๐Ÿ‡ด [no](../../../../no/src/lib/a2a/README.md) ยท ๐Ÿ‡ฉ๐Ÿ‡ฐ [da](../../../../da/src/lib/a2a/README.md) ยท ๐Ÿ‡ซ๐Ÿ‡ฎ [fi](../../../../fi/src/lib/a2a/README.md) ยท ๐Ÿ‡ต๐Ÿ‡น [pt](../../../../pt/src/lib/a2a/README.md) ยท ๐Ÿ‡ท๐Ÿ‡ด [ro](../../../../ro/src/lib/a2a/README.md) ยท ๐Ÿ‡ญ๐Ÿ‡บ [hu](../../../../hu/src/lib/a2a/README.md) ยท ๐Ÿ‡ง๐Ÿ‡ฌ [bg](../../../../bg/src/lib/a2a/README.md) ยท ๐Ÿ‡ธ๐Ÿ‡ฐ [sk](../../../../sk/src/lib/a2a/README.md) ยท ๐Ÿ‡บ๐Ÿ‡ฆ [uk-UA](../../../../uk-UA/src/lib/a2a/README.md) ยท ๐Ÿ‡ฎ๐Ÿ‡ฑ [he](../../../../he/src/lib/a2a/README.md) ยท ๐Ÿ‡ต๐Ÿ‡ญ [phi](../../../../phi/src/lib/a2a/README.md) ยท ๐Ÿ‡ง๐Ÿ‡ท [pt-BR](../../../../pt-BR/src/lib/a2a/README.md) ยท ๐Ÿ‡จ๐Ÿ‡ฟ [cs](../../../../cs/src/lib/a2a/README.md) ยท ๐Ÿ‡น๐Ÿ‡ท [tr](../../../../tr/src/lib/a2a/README.md) --- > **Agent-to-Agent Protocol v0.3** โ€” Enables any AI agent to use OmniRoute as an intelligent routing agent via JSON-RPC 2.0. The A2A Server exposes OmniRoute as a **first-class agent** that other agents can discover, delegate tasks to, and collaborate with using the [A2A Protocol](https://google.github.io/A2A/). --- ## ใ‚ขใƒผใ‚ญใƒ†ใ‚ฏใƒใƒฃ ``` โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ Orchestrator Agent โ”‚ โ”‚ (LangChain, CrewAI, AutoGen, Custom Agent) โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ 1. GET /.well-known/agent.json (discover) โ”‚ 2. POST /a2a (JSON-RPC 2.0) โ–ผ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ OmniRoute A2A Server โ”‚ โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ โ”‚ โ”‚ Task Manager โ”‚ โ”‚ Skill Engine โ”‚ โ”‚ SSE Streaming โ”‚ โ”‚ โ”‚ โ”‚ (lifecycle) โ”‚โ”€โ”€โ”‚ (registry) โ”‚โ”€โ”€โ”‚ (real-time) โ”‚ โ”‚ โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ โ”‚ โ”‚ โ”‚ โ”‚ Skills: โ”‚ โ”‚ โ”‚ โ”œโ”€ smart-routing โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ โ”‚ โ””โ”€ quota-management โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ Routing Decision Logger โ”‚ โ”‚ โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ โ–ผ OmniRoute Gateway (internal) /v1/chat/completions, /api/combos, /api/usage/quota ``` --- ## ใ‚ฏใ‚คใƒƒใ‚ฏใ‚นใ‚ฟใƒผใƒˆ ### Agent Discovery Every A2A-compatible agent exposes an **Agent Card** at `/.well-known/agent.json`: ```bash curl http://localhost:20128/.well-known/agent.json ``` **Response:** ```json { "name": "OmniRoute", "description": "Intelligent AI gateway with auto-routing across 50+ providers", "url": "http://localhost:20128/a2a", "version": "1.8.1", "capabilities": { "streaming": true, "pushNotifications": false }, "skills": [ { "id": "smart-routing", "name": "Smart Routing", "description": "Routes prompts through OmniRoute intelligent pipeline", "tags": ["routing", "llm", "multi-provider", "cost-optimization"], "examples": [ "Write a hello world in Python", "Explain quantum computing using the cheapest provider" ] }, { "id": "quota-management", "name": "Quota Management", "description": "Natural-language queries about provider quotas", "tags": ["quota", "analytics", "cost"], "examples": [ "Which provider has the most quota remaining?", "Suggest a free combo for coding" ] } ], "authentication": { "schemes": ["bearer"], "apiKeyHeader": "Authorization" } } ``` --- ## JSON-RPC 2.0 Methods ### `message/send` โ€” Synchronous Execution Send a message to a skill and receive the complete response. ```bash curl -X POST http://localhost:20128/a2a \ -H "Content-Type: application/json" \ -H "Authorization: Bearer YOUR_KEY" \ -d '{ "jsonrpc": "2.0", "id": "1", "method": "message/send", "params": { "skill": "smart-routing", "messages": [{"role": "user", "content": "Write a Python hello world"}], "metadata": {"model": "auto", "combo": "fast-coding"} } }' ``` **Response:** ```json { "jsonrpc": "2.0", "id": "1", "result": { "task": { "id": "a1b2c3d4-...", "state": "completed" }, "artifacts": [{ "type": "text", "content": "print('Hello, World!')" }], "metadata": { "routing_explanation": "Selected claude-sonnet via provider \"anthropic\" (latency: 1200ms, cost: $0.0030)", "cost_envelope": { "estimated": 0.005, "actual": 0.003, "currency": "USD" }, "resilience_trace": [ { "event": "primary_selected", "provider": "anthropic", "timestamp": "2026-03-04T..." } ], "policy_verdict": { "allowed": true, "reason": "within budget and quota limits" } } } } ``` ### `message/stream` โ€” SSE Streaming Same as `message/send` but returns Server-Sent Events for real-time streaming. ```bash curl -N -X POST http://localhost:20128/a2a \ -H "Content-Type: application/json" \ -H "Authorization: Bearer YOUR_KEY" \ -d '{ "jsonrpc": "2.0", "id": "1", "method": "message/stream", "params": { "skill": "smart-routing", "messages": [{"role": "user", "content": "Explain quantum computing"}] } }' ``` **SSE Events:** ``` data: {"jsonrpc":"2.0","method":"message/stream","params":{"task":{"id":"...","state":"working"},"chunk":{"type":"text","content":"Quantum computing..."}}} : heartbeat 2026-03-04T21:00:00Z data: {"jsonrpc":"2.0","method":"message/stream","params":{"task":{"id":"...","state":"completed"},"metadata":{...}}} ``` ### `tasks/get` โ€” Query Task Status ```bash curl -X POST http://localhost:20128/a2a \ -H "Content-Type: application/json" \ -H "Authorization: Bearer YOUR_KEY" \ -d '{"jsonrpc":"2.0","id":"2","method":"tasks/get","params":{"taskId":"TASK_UUID"}}' ``` ### `tasks/cancel` โ€” Cancel a Running Task ```bash curl -X POST http://localhost:20128/a2a \ -H "Content-Type: application/json" \ -H "Authorization: Bearer YOUR_KEY" \ -d '{"jsonrpc":"2.0","id":"3","method":"tasks/cancel","params":{"taskId":"TASK_UUID"}}' ``` --- ## Skills Reference ### `smart-routing` Routes prompts through OmniRoute's intelligent pipeline with full observability. **Parameters (in `metadata`):** | Parameter | Type | Default | Description | | --------- | -------- | ------------ | ---------------------------------------------------------------------------------------- | | `model` | `string` | `"auto"` | Target model (e.g., `claude-sonnet-4`, `gpt-4o`, `auto`) | | `combo` | `string` | active combo | Specific combo to route through | | `budget` | `number` | none | Maximum cost in USD for this request | | `role` | `string` | none | Task role hint: `coding`, `review`, `planning`, `analysis`, `debugging`, `documentation` | **Returns:** | Field | Description | | ------------------------------ | --------------------------------------------------------- | | `artifacts[].content` | The LLM response text | | `metadata.routing_explanation` | Human-readable explanation of routing decision | | `metadata.cost_envelope` | Estimated vs actual cost with currency | | `metadata.resilience_trace` | Array of events (primary_selected, fallback_needed, etc.) | | `metadata.policy_verdict` | Whether the request was allowed and why | ### `quota-management` Answers natural-language queries about provider quotas. **Query types (inferred from message content):** | Query Pattern | Response Type | | ---------------------------------------------- | -------------------------------------------------------- | | Contains `"ranking"`, `"most quota"`, `"best"` | Providers ranked by remaining quota | | Contains `"free"`, `"suggest"` | Lists free combos or suggests free-tier providers | | Default | Full quota summary with warnings for low-quota providers | --- ## Task Lifecycle ``` submitted โ”€โ”€โ†’ working โ”€โ”€โ†’ completed โ”€โ”€โ†’ failed โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ†’ cancelled ``` | State | Description | | ----------- | ----------------------------------------------------- | | `submitted` | Task created, queued for execution | | `working` | Skill handler is executing | | `completed` | Execution succeeded, artifacts available | | `failed` | Execution failed or task expired (TTL: 5 min default) | | `cancelled` | Cancelled by client via `tasks/cancel` | - Terminal states: `completed`, `failed`, `cancelled` (no further transitions) - Expired tasks in `submitted` or `working` are auto-marked as `failed` - Tasks are garbage-collected after 2ร— TTL --- ## Client Examples ### Python โ€” Orchestrator Agent ```python """ A2A Client โ€” Python example. Discovers OmniRoute agent, sends a task, and processes the result. """ import requests import json BASE_URL = "http://localhost:20128" API_KEY = "your-api-key" HEADERS = { "Content-Type": "application/json", "Authorization": f"Bearer {API_KEY}", } # 1. Discover agent capabilities agent_card = requests.get(f"{BASE_URL}/.well-known/agent.json").json() print(f"Agent: {agent_card['name']} v{agent_card['version']}") print(f"Skills: {[s['id'] for s in agent_card['skills']]}") # 2. Send a smart-routing task response = requests.post(f"{BASE_URL}/a2a", headers=HEADERS, json={ "jsonrpc": "2.0", "id": "task-1", "method": "message/send", "params": { "skill": "smart-routing", "messages": [{"role": "user", "content": "Write a Python quicksort implementation"}], "metadata": { "model": "auto", "combo": "fast-coding", "budget": 0.10, } } }) result = response.json()["result"] print(f"\n๐Ÿ“ Response: {result['artifacts'][0]['content'][:200]}...") print(f"๐Ÿ”€ Routing: {result['metadata']['routing_explanation']}") print(f"๐Ÿ’ฐ Cost: ${result['metadata']['cost_envelope']['actual']}") print(f"๐Ÿ›ก๏ธ Policy: {result['metadata']['policy_verdict']['reason']}") # 3. Query quota status quota_resp = requests.post(f"{BASE_URL}/a2a", headers=HEADERS, json={ "jsonrpc": "2.0", "id": "task-2", "method": "message/send", "params": { "skill": "quota-management", "messages": [{"role": "user", "content": "Which provider has the most quota remaining?"}], } }) quota_result = quota_resp.json()["result"] print(f"\n๐Ÿ“Š Quota: {quota_result['artifacts'][0]['content']}") ``` ### TypeScript โ€” Multi-Agent Orchestrator ```typescript /** * A2A Client โ€” TypeScript example. * Shows agent discovery, task delegation, and streaming. */ const BASE_URL = "http://localhost:20128"; const API_KEY = "your-api-key"; interface JsonRpcResponse { jsonrpc: "2.0"; id: string | number; result?: T; error?: { code: number; message: string }; } async function a2aCall(method: string, params: Record): Promise { const resp = await fetch(`${BASE_URL}/a2a`, { method: "POST", headers: { "Content-Type": "application/json", Authorization: `Bearer ${API_KEY}`, }, body: JSON.stringify({ jsonrpc: "2.0", id: `${method}-${Date.now()}`, method, params, }), }); const json: JsonRpcResponse = await resp.json(); if (json.error) throw new Error(`[${json.error.code}] ${json.error.message}`); return json.result!; } // โ”€โ”€ Agent Discovery โ”€โ”€ const agentCard = await fetch(`${BASE_URL}/.well-known/agent.json`).then((r) => r.json()); console.log(`Connected to: ${agentCard.name} (${agentCard.skills.length} skills)`); // โ”€โ”€ Smart Routing: Send a coding task โ”€โ”€ const routingResult = await a2aCall("message/send", { skill: "smart-routing", messages: [{ role: "user", content: "Implement a Redis cache wrapper in TypeScript" }], metadata: { model: "claude-sonnet-4", role: "coding" }, }); console.log("Response:", routingResult.artifacts[0].content); console.log("Provider:", routingResult.metadata.routing_explanation); // โ”€โ”€ Quota Management: Find free alternatives โ”€โ”€ const quotaResult = await a2aCall("message/send", { skill: "quota-management", messages: [{ role: "user", content: "Suggest free combos for documentation" }], }); console.log("Free combos:", quotaResult.artifacts[0].content); // โ”€โ”€ Streaming: Real-time response โ”€โ”€ const streamResp = await fetch(`${BASE_URL}/a2a`, { method: "POST", headers: { "Content-Type": "application/json", Authorization: `Bearer ${API_KEY}`, }, body: JSON.stringify({ jsonrpc: "2.0", id: "stream-1", method: "message/stream", params: { skill: "smart-routing", messages: [{ role: "user", content: "Explain microservices architecture" }], }, }), }); const reader = streamResp.body!.getReader(); const decoder = new TextDecoder(); while (true) { const { done, value } = await reader.read(); if (done) break; const chunk = decoder.decode(value); for (const line of chunk.split("\n")) { if (line.startsWith("data: ")) { const event = JSON.parse(line.slice(6)); if (event.params.chunk) { process.stdout.write(event.params.chunk.content); } if (event.params.task.state === "completed") { console.log("\nโœ… Stream completed"); } } } } ``` ### Python โ€” LangChain A2A Integration ```python """ LangChain integration โ€” Use OmniRoute A2A as a custom LLM. """ from langchain.llms.base import BaseLLM from langchain.schema import LLMResult, Generation import requests from typing import List, Optional class OmniRouteA2A(BaseLLM): base_url: str = "http://localhost:20128" api_key: str = "" model: str = "auto" combo: Optional[str] = None @property def _llm_type(self) -> str: return "omniroute-a2a" def _call(self, prompt: str, stop: Optional[List[str]] = None, **kwargs) -> str: response = requests.post( f"{self.base_url}/a2a", headers={ "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}", }, json={ "jsonrpc": "2.0", "id": "langchain-1", "method": "message/send", "params": { "skill": "smart-routing", "messages": [{"role": "user", "content": prompt}], "metadata": { "model": self.model, **({"combo": self.combo} if self.combo else {}), }, }, }, ) result = response.json()["result"] return result["artifacts"][0]["content"] def _generate(self, prompts: List[str], stop=None, **kwargs) -> LLMResult: return LLMResult( generations=[[Generation(text=self._call(p, stop))] for p in prompts] ) # Usage llm = OmniRouteA2A( base_url="http://localhost:20128", api_key="your-key", model="auto", combo="fast-coding", ) result = llm("Write a Python function to merge two sorted lists") print(result) ``` ### Go โ€” A2A Client ```go package main import ( "bytes" "encoding/json" "fmt" "io" "net/http" ) const baseURL = "http://localhost:20128" const apiKey = "your-api-key" type JsonRpcRequest struct { Jsonrpc string `json:"jsonrpc"` ID string `json:"id"` Method string `json:"method"` Params interface{} `json:"params"` } type JsonRpcResponse struct { Jsonrpc string `json:"jsonrpc"` ID string `json:"id"` Result interface{} `json:"result"` Error *struct { Code int `json:"code"` Message string `json:"message"` } `json:"error"` } func a2aCall(method string, params interface{}) (*JsonRpcResponse, error) { body, _ := json.Marshal(JsonRpcRequest{ Jsonrpc: "2.0", ID: "go-1", Method: method, Params: params, }) req, _ := http.NewRequest("POST", baseURL+"/a2a", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer "+apiKey) resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() data, _ := io.ReadAll(resp.Body) var result JsonRpcResponse json.Unmarshal(data, &result) return &result, nil } func main() { // Discover agent resp, _ := http.Get(baseURL + "/.well-known/agent.json") defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) fmt.Println("Agent Card:", string(body)) // Send smart-routing task result, _ := a2aCall("message/send", map[string]interface{}{ "skill": "smart-routing", "messages": []map[string]string{{"role": "user", "content": "Hello from Go!"}}, "metadata": map[string]interface{}{"model": "auto"}, }) out, _ := json.MarshalIndent(result.Result, "", " ") fmt.Println("Result:", string(out)) } ``` --- ## Use Cases ### ๐Ÿค– Use Case 1: Multi-Agent Coding Pipeline An orchestrator agent delegates code generation to OmniRoute, then passes the output to a review agent. ```python def coding_pipeline(task: str): # Step 1: Generate code via OmniRoute A2A code_result = a2a_send("smart-routing", [ {"role": "user", "content": f"Write production-quality code: {task}"} ], metadata={"model": "auto", "role": "coding"}) code = code_result["artifacts"][0]["content"] # Step 2: Review the code via OmniRoute A2A (different model) review_result = a2a_send("smart-routing", [ {"role": "user", "content": f"Review this code for bugs and improvements:\n\n{code}"} ], metadata={"model": "auto", "role": "review"}) review = review_result["artifacts"][0]["content"] # Step 3: Check costs print(f"Code cost: ${code_result['metadata']['cost_envelope']['actual']}") print(f"Review cost: ${review_result['metadata']['cost_envelope']['actual']}") return {"code": code, "review": review} ``` ### ๐Ÿ’ก Use Case 2: Quota-Aware Agent Swarm Multiple agents share quota through OmniRoute, using the quota skill to coordinate. ```python async def quota_aware_agent(agent_name: str, task: str): # Check quota before starting quota = a2a_send("quota-management", [ {"role": "user", "content": "Which provider has the most quota remaining?"} ]) print(f"[{agent_name}] {quota['artifacts'][0]['content']}") # Send request with budget constraint result = a2a_send("smart-routing", [ {"role": "user", "content": task} ], metadata={"budget": 0.05}) policy = result["metadata"]["policy_verdict"] if not policy["allowed"]: print(f"[{agent_name}] โš ๏ธ Budget exceeded: {policy['reason']}") # Fall back to free combo quota = a2a_send("quota-management", [ {"role": "user", "content": "Suggest free combos"} ]) print(f"[{agent_name}] Free alternatives: {quota['artifacts'][0]['content']}") return result ``` ### ๐Ÿ“Š Use Case 3: Real-Time Streaming Dashboard A monitoring agent streams responses and displays progress in real-time. ```typescript async function streamingDashboard(prompt: string) { const response = await fetch(`${BASE_URL}/a2a`, { method: "POST", headers: { "Content-Type": "application/json", Authorization: `Bearer ${API_KEY}` }, body: JSON.stringify({ jsonrpc: "2.0", id: "dash-1", method: "message/stream", params: { skill: "smart-routing", messages: [{ role: "user", content: prompt }] }, }), }); let totalChunks = 0; const reader = response.body!.getReader(); const decoder = new TextDecoder(); while (true) { const { done, value } = await reader.read(); if (done) break; for (const line of decoder.decode(value).split("\n")) { if (line.startsWith("data: ")) { const event = JSON.parse(line.slice(6)); const state = event.params.task.state; if (state === "working" && event.params.chunk) { totalChunks++; process.stdout.write( `\r[Chunk ${totalChunks}] ${event.params.chunk.content.slice(0, 50)}...` ); } if (state === "completed") { const meta = event.params.metadata; console.log( `\nโœ… Done | Cost: $${meta?.cost_envelope?.actual || 0} | Route: ${meta?.routing_explanation || "N/A"}` ); } if (state === "failed") { console.error(`\nโŒ Failed: ${event.params.metadata?.error}`); } } } } } ``` ### ๐Ÿ” Use Case 4: Task Polling Pattern For long-running tasks, poll the task status instead of waiting synchronously. ```python import time def poll_task(task_id: str, timeout: int = 60): """Poll task status until completion or timeout.""" start = time.time() while time.time() - start < timeout: result = requests.post(f"{BASE_URL}/a2a", headers=HEADERS, json={ "jsonrpc": "2.0", "id": "poll-1", "method": "tasks/get", "params": {"taskId": task_id}, }).json() task = result["result"]["task"] state = task["state"] print(f" Task {task_id[:8]}... state={state}") if state in ("completed", "failed", "cancelled"): return task time.sleep(2) # Timeout โ€” cancel the task requests.post(f"{BASE_URL}/a2a", headers=HEADERS, json={ "jsonrpc": "2.0", "id": "cancel-1", "method": "tasks/cancel", "params": {"taskId": task_id}, }) raise TimeoutError(f"Task {task_id} timed out after {timeout}s") ``` --- ## Error Codes | Code | Constant | Meaning | | ------ | ------------------------ | ---------------------------------------- | | -32700 | โ€” | Parse error (invalid JSON) | | -32600 | `INVALID_REQUEST` | Invalid JSON-RPC request or unauthorized | | -32601 | `METHOD_NOT_FOUND` | Unknown method or skill | | -32602 | `INVALID_PARAMS` | Missing or invalid parameters | | -32603 | `INTERNAL_ERROR` | Skill execution failed | | -32001 | `TASK_NOT_FOUND` | Task ID not found | | -32002 | `TASK_ALREADY_COMPLETED` | Cannot modify a completed task | | -32003 | `UNAUTHORIZED` | Invalid or missing API key | | -32004 | `BUDGET_EXCEEDED` | Request exceeds configured budget | | -32005 | `PROVIDER_UNAVAILABLE` | No available providers | --- ## Authentication All `/a2a` requests require a Bearer token via the `Authorization` header: ``` Authorization: Bearer YOUR_OMNIROUTE_API_KEY ``` If no API key is configured on the server (`OMNIROUTE_API_KEY` is empty), authentication is bypassed. --- ## File Structure ``` src/lib/a2a/ โ”œโ”€โ”€ taskManager.ts # Task lifecycle (create/update/cancel/list), TTL, cleanup โ”œโ”€โ”€ taskExecution.ts # Generic task executor with state management โ”œโ”€โ”€ streaming.ts # SSE stream formatting, heartbeat, chunk/completion events โ”œโ”€โ”€ routingLogger.ts # Routing decision logger (stats, history, retention) โ””โ”€โ”€ skills/ โ”œโ”€โ”€ smartRouting.ts # Smart routing skill (routes via /v1/chat/completions) โ””โ”€โ”€ quotaManagement.ts # Quota management skill (natural-language quota queries) src/app/a2a/ โ””โ”€โ”€ route.ts # Next.js API route handler (JSON-RPC 2.0 dispatch) open-sse/mcp-server/ โ””โ”€โ”€ schemas/a2a.ts # Zod schemas (AgentCard, Task, JSON-RPC, SSE events) ``` --- ## Comparison: MCP vs A2A | Feature | MCP Server | A2A Server | | ----------------- | ---------------------------- | ------------------------------------------------- | | **Protocol** | Model Context Protocol | Agent-to-Agent Protocol v0.3 | | **Transport** | stdio / HTTP | HTTP (JSON-RPC 2.0) | | **Discovery** | Tool listing via MCP | `/.well-known/agent.json` | | **Granularity** | 16 individual tools | 2 high-level skills | | **Best for** | IDE agents (Cursor, VS Code) | Multi-agent systems (LangChain, CrewAI) | | **Streaming** | Not supported | SSE via `message/stream` | | **Task tracking** | No | Full lifecycle (submitted โ†’ completed) | | **Observability** | Audit log per tool call | Cost envelope + resilience trace + policy verdict | --- ## ใƒฉใ‚คใ‚ปใƒณใ‚น Part of [OmniRoute](https://github.com/diegosouzapw/OmniRoute) โ€” MIT License.