mirror of
https://github.com/ruvnet/RuVector.git
synced 2026-05-31 21:49:52 +00:00
New subcrate at crates/rvAgent/rvagent-a2a/ implementing all four
ADR-159 milestones (M1-M4) plus the rvagent-cli a2a subcommand.
Library scope (~7500 LoC + ~1500 LoC of tests):
- Core types: AgentCard, Task, Message, Part, Artifact, TaskSpec, plus
TaskStatusUpdateEvent / TaskArtifactUpdateEvent SSE events
- Server: axum-based JSON-RPC 2.0 with tasks/{send, get, cancel,
sendSubscribe, resubscribe, pushNotification/{set,get}}; bounded
broadcast; SSE replay from task history with Last-Event-Id support
- Client: discovery with ETag cache + signature verification, retry
with exponential backoff, streaming
- Identity (r2): AgentID = SHAKE-256(ed25519_pubkey), JCS-canonical
signed AgentCards, verify-on-discover
- Policy (r2): TaskPolicy + PolicyGuard with concurrency tickets,
per-task max_tokens / max_cost_usd / max_duration_ms / allowed_skills
- Executor (r2): unified Local(TaskRunner) / Remote(Peer) abstraction
- Artifacts (r2+r3): #[non_exhaustive] ArtifactKind with
Text/StructuredJson/VectorRef/RuLakeWitness/Raw + version negotiation
- Routing (r2): PeerSelector trait + 4 stock impls (CheapestUnderLatency,
LowestLatency, RoundRobin, CapabilityMatch) + ChainedSelector +
PeerRegistry with 3-strike circuit breaker; live peer-forwarding
wired through tasks/send dispatch chain
- Budget (r3): GlobalBudget + BudgetLedger with parking_lot::Mutex,
100ms lazy eviction, uncapped fast-path (442 M ops/s), Shed/Queue
overflow policies (custom deserializer accepts both bare-string and
tagged-table TOML forms)
- Context (r3): TaskContext with W3C trace_id, parent_task_id, depth,
visited_agents propagated as metadata.ruvector.context
- Recursion guard (r3): RecursionPolicy depth + revisit cycle detection
- Config (r3): TOML loader for routing/budget/policy/recursion sections
- Push webhooks (M4): HMAC-SHA256 + optional Ed25519 (feature-gated),
3-attempt exponential retry on 5xx, no-retry on 4xx, registry per
task_id
Dispatch chain (server/json_rpc.rs tasks/send):
budget → recursion → policy → router (peer-forward) → local executor
CLI integration (crates/rvAgent/rvagent-cli/src/a2a.rs):
rvagent a2a serve [--bind] [--config] [--generate-key]
rvagent a2a discover <URL>
rvagent a2a send-task <URL> --skill <id> [--input ...]
End-to-end smoke test in tests/a2a_cli.rs spawns the binary, asserts
serve → discover → send-task roundtrip with signed AgentCard.
Verification:
- 136/136 tests passing on default features
- 137/137 with `--features ed25519-webhooks`
- Three-point ADR-159 acceptance test all green:
- executor_remote: local ≡ remote PASS
- witness_handoff: 765-byte body for 100k-vector payload (≤ 2 KiB)
- dispatch_order + recursion_guard + budget_guard: cost bounded PASS
Workspace member registration for rvagent-a2a + examples/a2a-swarm
included in this commit.
Refs: ADR-159
Co-Authored-By: claude-flow <ruv@ruv.net>
204 lines
7.7 KiB
Rust
204 lines
7.7 KiB
Rust
//! ADR-159 M3 r2 — `executor_remote.rs` (acceptance test #1).
//!
//! "A remote agent call is indistinguishable from a local call." We dispatch
//! the same `TaskSpec` through `Executor::Local(InMemoryRunner)` and
//! `Executor::Remote(Peer)` and assert the result payload is equivalent
//! modulo generated IDs + timestamps.
//!
//! Wiring approach: bind a plain `tokio::net::TcpListener` on 127.0.0.1:0,
//! mount the `A2aServer` router behind `axum::serve` in a background task,
//! and aim a live `A2aClient` (real reqwest, real TCP) at that ephemeral
//! port. This avoids the in-process vs. reqwest mismatch that
//! `axum_test::TestServer` has historically exhibited.
|
|
|
|
use std::sync::Arc;
|
|
|
|
use rvagent_a2a::budget::{BudgetLedger, GlobalBudget};
|
|
use rvagent_a2a::client::A2aClient;
|
|
use rvagent_a2a::context::TaskContext;
|
|
use rvagent_a2a::executor::{Executor, InMemoryRunner, Peer};
|
|
use rvagent_a2a::identity::{agent_id_from_pubkey, sign_card, AgentID};
|
|
use rvagent_a2a::server::{A2aServer, A2aServerConfig};
|
|
use rvagent_a2a::types::{
|
|
AgentCapabilities, AgentCard, AgentProvider, AgentSkill, AuthScheme, Message, Part, Role,
|
|
TaskSpec, TaskState,
|
|
};
|
|
|
|
use ed25519_dalek::SigningKey;
|
|
use rand_core::OsRng;
|
|
use serde_json::json;
|
|
|
|
fn random_agent_id() -> AgentID {
|
|
agent_id_from_pubkey(&SigningKey::generate(&mut OsRng).verifying_key())
|
|
}
|
|
|
|
fn spec(id: &str) -> TaskSpec {
|
|
TaskSpec {
|
|
id: id.into(),
|
|
skill: "rag.query".into(),
|
|
message: Message {
|
|
role: Role::User,
|
|
parts: vec![Part::Text {
|
|
text: "ping".into(),
|
|
}],
|
|
metadata: serde_json::Value::Null,
|
|
},
|
|
policy: None,
|
|
context: TaskContext::new_root(random_agent_id()),
|
|
metadata: serde_json::Value::Null,
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn local_executor_reaches_completed() {
|
|
// The Local half of the acceptance — kept even after the Remote half
|
|
// is wired so regression failures narrow cleanly to one side.
|
|
let executor = Executor::Local(Arc::new(InMemoryRunner::new()));
|
|
let task = executor.run(spec("ex-local")).await.expect("local run");
|
|
assert_eq!(task.status.state, TaskState::Completed);
|
|
assert!(
|
|
!task.artifacts.is_empty(),
|
|
"local runner produced 0 artifacts"
|
|
);
|
|
}
|
|
|
|
/// Build a fresh signed [`AgentCard`] for a server at `base_url`. Embeds the
|
|
/// Ed25519 signature under `metadata.ruvector.signature` so the client's
|
|
/// `fetch_card` verify path is exercised end-to-end.
|
|
fn signed_card(sk: &SigningKey, base_url: &str) -> AgentCard {
|
|
let card = AgentCard {
|
|
name: "exec-remote-test".into(),
|
|
description: "executor_remote.rs fixture".into(),
|
|
url: base_url.to_string(),
|
|
provider: AgentProvider {
|
|
organization: "ruvector".into(),
|
|
url: None,
|
|
},
|
|
version: "0.1.0".into(),
|
|
capabilities: AgentCapabilities::default(),
|
|
skills: vec![AgentSkill {
|
|
id: "rag.query".into(),
|
|
name: "RAG".into(),
|
|
description: "x".into(),
|
|
tags: vec![],
|
|
input_modes: vec![],
|
|
output_modes: vec![],
|
|
}],
|
|
authentication: AuthScheme {
|
|
schemes: vec!["bearer".into()],
|
|
},
|
|
metadata: json!({ "ruvector": {} }),
|
|
};
|
|
let sig = sign_card(&card, sk).expect("sign card");
|
|
// Re-inject the signature under metadata.ruvector.signature — same
|
|
// pattern as card_signature.rs fixtures.
|
|
let mut v = serde_json::to_value(&card).unwrap();
|
|
let meta = v
|
|
.get_mut("metadata")
|
|
.and_then(|m| m.as_object_mut())
|
|
.unwrap();
|
|
let ruvector = meta
|
|
.entry("ruvector")
|
|
.or_insert_with(|| json!({}))
|
|
.as_object_mut()
|
|
.unwrap();
|
|
ruvector.insert("signature".into(), serde_json::to_value(&sig).unwrap());
|
|
serde_json::from_value(v).expect("re-parse signed card")
|
|
}
|
|
|
|
/// ADR-159 acceptance test #1 — "indistinguishable." Runs the same
|
|
/// `TaskSpec` through `Executor::Local(InMemoryRunner)` and
|
|
/// `Executor::Remote(Peer)` (backed by a real-TCP `A2aClient` ↔ `A2aServer`
|
|
/// pair) and asserts the result shapes match modulo timestamps.
|
|
#[tokio::test]
|
|
async fn remote_executor_matches_local_shape() {
|
|
// 1. Bind an ephemeral TCP port. Reqwest can always reach this.
|
|
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
|
|
.await
|
|
.expect("bind ephemeral port");
|
|
let addr = listener.local_addr().expect("local_addr");
|
|
let base_url = format!("http://{}", addr);
|
|
|
|
// 2. Build the server: signed AgentCard + InMemoryRunner executor +
|
|
// unlimited budget.
|
|
let sk = SigningKey::generate(&mut OsRng);
|
|
let card = signed_card(&sk, &base_url);
|
|
let card_bytes = serde_json::to_vec(&card).expect("card bytes");
|
|
let executor = Arc::new(Executor::Local(Arc::new(InMemoryRunner::new())));
|
|
let budget = Arc::new(BudgetLedger::new(GlobalBudget::default()));
|
|
let server = A2aServer::new(
|
|
card.clone(),
|
|
card_bytes,
|
|
executor,
|
|
budget,
|
|
A2aServerConfig::default(),
|
|
);
|
|
let router = server.router();
|
|
|
|
// 3. Mount behind `axum::serve` in a background task. Kept for the
|
|
// lifetime of the test via the tokio runtime's shutdown on drop —
|
|
// this is the most reliable cross-platform pattern.
|
|
tokio::spawn(async move {
|
|
axum::serve(listener, router).await.expect("axum serve");
|
|
});
|
|
// Give the server a moment to start accepting connections — ~10 ms is
|
|
// ample on a loopback interface.
|
|
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
|
|
|
// 4. Build the client + Peer.
|
|
let client = A2aClient::new().expect("client");
|
|
// Exercise discovery once so fetch_card() is covered + the signed-card
|
|
// path actually runs through `verify_card`.
|
|
let fetched_card = client.fetch_card(&base_url).await.expect("fetch card");
|
|
assert_eq!(fetched_card.url, base_url);
|
|
|
|
let peer = Peer {
|
|
id: agent_id_from_pubkey(&sk.verifying_key()),
|
|
card: fetched_card,
|
|
base_url: url::Url::parse(&base_url).expect("url"),
|
|
client: Arc::new(client),
|
|
};
|
|
|
|
let local = Executor::Local(Arc::new(InMemoryRunner::new()));
|
|
let remote = Executor::Remote(Box::new(peer));
|
|
|
|
// 5. Dispatch the same TaskSpec through both sides.
|
|
let local_spec = spec("ex-local-side");
|
|
let remote_spec = spec("ex-remote-side");
|
|
let local_task = local.run(local_spec).await.expect("local run");
|
|
let remote_task = remote.run(remote_spec).await.expect("remote run");
|
|
|
|
// 6. Both reached Completed.
|
|
assert_eq!(local_task.status.state, TaskState::Completed);
|
|
assert_eq!(remote_task.status.state, TaskState::Completed);
|
|
|
|
// 7. Same artifact count.
|
|
assert_eq!(
|
|
local_task.artifacts.len(),
|
|
remote_task.artifacts.len(),
|
|
"artifact counts diverge",
|
|
);
|
|
|
|
// 8. Structural equality of artifact Part shapes — compare the
|
|
// `type` discriminator of each Part, which is what a downstream
|
|
// consumer actually branches on. Ignores generated ids + timestamps
|
|
// (which live only on the Task, not on the artifact Parts).
|
|
fn part_kind(p: &Part) -> &'static str {
|
|
match p {
|
|
Part::Text { .. } => "text",
|
|
Part::File { .. } => "file",
|
|
Part::Data { .. } => "data",
|
|
}
|
|
}
|
|
for (l, r) in local_task
|
|
.artifacts
|
|
.iter()
|
|
.zip(remote_task.artifacts.iter())
|
|
{
|
|
let l_kinds: Vec<&str> = l.parts.iter().map(part_kind).collect();
|
|
let r_kinds: Vec<&str> = r.parts.iter().map(part_kind).collect();
|
|
assert_eq!(l_kinds, r_kinds, "part-kind shapes diverge");
|
|
assert_eq!(l.append, r.append);
|
|
assert_eq!(l.last_chunk, r.last_chunk);
|
|
}
|
|
}
|