mirror of
https://github.com/ruvnet/RuVector.git
synced 2026-05-27 17:23:34 +00:00
New subcrate at crates/rvAgent/rvagent-a2a/ implementing all four
ADR-159 milestones (M1-M4) plus the rvagent-cli a2a subcommand.
Library scope (~7500 LoC + 1500 tests):
- Core types: AgentCard, Task, Message, Part, Artifact, TaskSpec, plus
TaskStatusUpdateEvent / TaskArtifactUpdateEvent SSE events
- Server: axum-based JSON-RPC 2.0 with tasks/{send, get, cancel,
sendSubscribe, resubscribe, pushNotification/{set,get}}; bounded
broadcast; SSE replay from task history with Last-Event-Id support
- Client: discovery with ETag cache + signature verification, retry
with exponential backoff, streaming
- Identity (r2): AgentID = SHAKE-256(ed25519_pubkey), JCS-canonical
signed AgentCards, verify-on-discover
- Policy (r2): TaskPolicy + PolicyGuard with concurrency tickets,
per-task max_tokens / max_cost_usd / max_duration_ms / allowed_skills
- Executor (r2): unified Local(TaskRunner) / Remote(Peer) abstraction
- Artifacts (r2+r3): #[non_exhaustive] ArtifactKind with
Text/StructuredJson/VectorRef/RuLakeWitness/Raw + version negotiation
- Routing (r2): PeerSelector trait + 4 stock impls (CheapestUnderLatency,
LowestLatency, RoundRobin, CapabilityMatch) + ChainedSelector +
PeerRegistry with 3-strike circuit breaker; live peer-forwarding
wired through tasks/send dispatch chain
- Budget (r3): GlobalBudget + BudgetLedger with parking_lot::Mutex,
100ms lazy eviction, uncapped fast-path (442 M ops/s), Shed/Queue
overflow policies (custom deserializer accepts both bare-string and
tagged-table TOML forms)
- Context (r3): TaskContext with W3C trace_id, parent_task_id, depth,
visited_agents propagated as metadata.ruvector.context
- Recursion guard (r3): RecursionPolicy depth + revisit cycle detection
- Config (r3): TOML loader for routing/budget/policy/recursion sections
- Push webhooks (M4): HMAC-SHA256 + optional Ed25519 (feature-gated),
3-attempt exponential retry on 5xx, no-retry on 4xx, registry per
task_id
Dispatch chain (server/json_rpc.rs tasks/send):
budget → recursion → policy → router (peer-forward) → local executor
CLI integration (crates/rvAgent/rvagent-cli/src/a2a.rs):
rvagent a2a serve [--bind] [--config] [--generate-key]
rvagent a2a discover <URL>
rvagent a2a send-task <URL> --skill <id> [--input ...]
End-to-end smoke test in tests/a2a_cli.rs spawns the binary, asserts
serve → discover → send-task roundtrip with signed AgentCard.
Verification:
- 136/136 tests passing on default features
- 137/137 with `--features ed25519-webhooks`
- All three ADR-159 acceptance checks are green:
- executor_remote: local ≡ remote PASS
- witness_handoff: 765-byte body for 100k-vector payload (≤ 2 KiB)
- dispatch_order + recursion_guard + budget_guard: cost bounded PASS
Workspace member registration for rvagent-a2a + examples/a2a-swarm
included in this commit.
Refs: ADR-159
Co-Authored-By: claude-flow <ruv@ruv.net>
117 lines
4.1 KiB
Rust
117 lines
4.1 KiB
Rust
//! End-to-end smoke test for `rvagent a2a` subcommand.
|
|
//!
|
|
//! Spawns `rvagent a2a serve --bind 127.0.0.1:0` as a child process,
|
|
//! parses the bound address from the first line of its stdout, and then exercises
|
|
//! `discover` + `send-task` against it.
|
|
|
|
use std::process::Stdio;
|
|
use std::time::Duration;
|
|
|
|
use tokio::io::{AsyncBufReadExt, BufReader};
|
|
use tokio::process::Command as TokioCommand;
|
|
|
|
/// Locate the `rvagent` binary produced by `cargo build -p rvagent-cli`.
|
|
/// Works in both `cargo test` (where `CARGO_BIN_EXE_rvagent` is set) and
|
|
/// ad-hoc invocations (fallback to `target/debug`).
|
|
fn rvagent_bin() -> std::path::PathBuf {
|
|
if let Some(p) = option_env!("CARGO_BIN_EXE_rvagent") {
|
|
return std::path::PathBuf::from(p);
|
|
}
|
|
let mut p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
|
|
for _ in 0..3 {
|
|
p = p.parent().unwrap().to_path_buf();
|
|
}
|
|
p.join("target").join("debug").join("rvagent")
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn a2a_serve_discover_and_send_task() {
|
|
let bin = rvagent_bin();
|
|
assert!(bin.exists(), "rvagent binary not found at {:?}", bin);
|
|
|
|
// -- 1) Spawn the server on an ephemeral port.
|
|
let mut server = TokioCommand::new(&bin)
|
|
.args(["a2a", "serve", "--bind", "127.0.0.1:0", "--generate-key"])
|
|
.stdout(Stdio::piped())
|
|
.stderr(Stdio::piped())
|
|
.kill_on_drop(true)
|
|
.spawn()
|
|
.expect("spawn rvagent a2a serve");
|
|
|
|
let stdout = server.stdout.take().expect("server stdout piped");
|
|
let mut reader = BufReader::new(stdout).lines();
|
|
|
|
// -- 2) Parse "listening on 127.0.0.1:<port>" from the first line.
|
|
//
|
|
// Give the server up to 20s to bind + print; CI under load is slower
|
|
// than local.
|
|
let line = tokio::time::timeout(Duration::from_secs(20), reader.next_line())
|
|
.await
|
|
.expect("server listening line timed out")
|
|
.expect("server stdout read error")
|
|
.expect("server closed before emitting listening line");
|
|
let addr = line
|
|
.strip_prefix("listening on ")
|
|
.unwrap_or_else(|| panic!("unexpected first-line stdout from server: {:?}", line))
|
|
.trim()
|
|
.to_string();
|
|
let base_url = format!("http://{}", addr);
|
|
|
|
// -- 3) Run `discover` against the live server.
|
|
let out = TokioCommand::new(&bin)
|
|
.args(["a2a", "discover", &base_url])
|
|
.output()
|
|
.await
|
|
.expect("spawn rvagent a2a discover");
|
|
assert!(
|
|
out.status.success(),
|
|
"discover failed: status={:?} stdout={} stderr={}",
|
|
out.status,
|
|
String::from_utf8_lossy(&out.stdout),
|
|
String::from_utf8_lossy(&out.stderr),
|
|
);
|
|
let stdout = String::from_utf8(out.stdout).expect("discover stdout utf8");
|
|
// Must be parseable JSON with an `agentCard` envelope.
|
|
let v: serde_json::Value =
|
|
serde_json::from_str(&stdout).expect("discover stdout is valid JSON");
|
|
assert!(
|
|
v.get("agentCard").is_some(),
|
|
"discover response missing agentCard: {}",
|
|
stdout
|
|
);
|
|
|
|
// -- 4) Run `send-task` — skill `echo` is advertised by the built-in
|
|
// InMemoryRunner, which always completes synchronously.
|
|
let out = TokioCommand::new(&bin)
|
|
.args([
|
|
"a2a",
|
|
"send-task",
|
|
&base_url,
|
|
"--skill",
|
|
"echo",
|
|
"--input",
|
|
"hello",
|
|
])
|
|
.output()
|
|
.await
|
|
.expect("spawn rvagent a2a send-task");
|
|
assert!(
|
|
out.status.success(),
|
|
"send-task failed: status={:?} stdout={} stderr={}",
|
|
out.status,
|
|
String::from_utf8_lossy(&out.stdout),
|
|
String::from_utf8_lossy(&out.stderr),
|
|
);
|
|
let stdout = String::from_utf8(out.stdout).expect("send-task stdout utf8");
|
|
// Task JSON must report completed state. Match both kebab-case
|
|
// (on-wire) and struct-field forms conservatively.
|
|
assert!(
|
|
stdout.contains("\"state\": \"completed\"") || stdout.contains("\"state\":\"completed\""),
|
|
"send-task response did not include completed state:\n{}",
|
|
stdout
|
|
);
|
|
|
|
// -- 5) Tear the server down.
|
|
let _ = server.start_kill();
|
|
let _ = server.wait().await;
|
|
}
|