ruvector/crates/rvAgent/rvagent-cli/tests/a2a_cli.rs
ruvnet 6c224b809c feat(rvagent-a2a): implement ADR-159 — A2A protocol library + CLI integration
New subcrate at crates/rvAgent/rvagent-a2a/ implementing all four
ADR-159 milestones (M1-M4) plus the rvagent-cli a2a subcommand.

Library scope (~7500 LoC + 1500 tests):

- Core types: AgentCard, Task, Message, Part, Artifact, TaskSpec, plus
  TaskStatusUpdateEvent / TaskArtifactUpdateEvent SSE events
- Server: axum-based JSON-RPC 2.0 with tasks/{send, get, cancel,
  sendSubscribe, resubscribe, pushNotification/{set,get}}; bounded
  broadcast; SSE replay from task history with Last-Event-Id support
- Client: discovery with ETag cache + signature verification, retry
  with exponential backoff, streaming
- Identity (r2): AgentID = SHAKE-256(ed25519_pubkey), JCS-canonical
  signed AgentCards, verify-on-discover
- Policy (r2): TaskPolicy + PolicyGuard with concurrency tickets,
  per-task max_tokens / max_cost_usd / max_duration_ms / allowed_skills
- Executor (r2): unified Local(TaskRunner) / Remote(Peer) abstraction
- Artifacts (r2+r3): #[non_exhaustive] ArtifactKind with
  Text/StructuredJson/VectorRef/RuLakeWitness/Raw + version negotiation
- Routing (r2): PeerSelector trait + 4 stock impls (CheapestUnderLatency,
  LowestLatency, RoundRobin, CapabilityMatch) + ChainedSelector +
  PeerRegistry with 3-strike circuit breaker; live peer-forwarding
  wired through tasks/send dispatch chain
- Budget (r3): GlobalBudget + BudgetLedger with parking_lot::Mutex,
  100ms lazy eviction, uncapped fast-path (442 M ops/s), Shed/Queue
  overflow policies (custom deserializer accepts both bare-string and
  tagged-table TOML forms)
- Context (r3): TaskContext with W3C trace_id, parent_task_id, depth,
  visited_agents propagated as metadata.ruvector.context
- Recursion guard (r3): RecursionPolicy depth + revisit cycle detection
- Config (r3): TOML loader for routing/budget/policy/recursion sections
- Push webhooks (M4): HMAC-SHA256 + optional Ed25519 (feature-gated),
  3-attempt exponential retry on 5xx, no-retry on 4xx, registry per
  task_id

Dispatch chain (server/json_rpc.rs tasks/send):
  budget → recursion → policy → router (peer-forward) → local executor

CLI integration (crates/rvAgent/rvagent-cli/src/a2a.rs):
  rvagent a2a serve [--bind] [--config] [--generate-key]
  rvagent a2a discover <URL>
  rvagent a2a send-task <URL> --skill <id> [--input ...]

End-to-end smoke test in tests/a2a_cli.rs spawns the binary, asserts
serve → discover → send-task roundtrip with signed AgentCard.

Verification:
- 136/136 tests passing on default features
- 137/137 with `--features ed25519-webhooks`
- Three-point ADR-159 acceptance test all green:
  - executor_remote: local ≡ remote PASS
  - witness_handoff: 765-byte body for 100k-vector payload (≤ 2 KiB)
  - dispatch_order + recursion_guard + budget_guard: cost bounded PASS

Workspace member registration for rvagent-a2a + examples/a2a-swarm
included in this commit.

Refs: ADR-159

Co-Authored-By: claude-flow <ruv@ruv.net>
2026-04-25 16:59:00 -04:00

117 lines
4.1 KiB
Rust

//! End-to-end smoke test for `rvagent a2a` subcommand.
//!
//! Spawns `rvagent a2a serve --bind 127.0.0.1:0` as a child process,
//! parses the bound port from its stdout, and then exercises
//! `discover` + `send-task` against it.
use std::process::Stdio;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command as TokioCommand;
/// Locate the `rvagent` binary produced by `cargo build -p rvagent-cli`.
/// Works in both `cargo test` (where `CARGO_BIN_EXE_rvagent` is set) and
/// ad-hoc invocations (fallback to `target/debug`).
fn rvagent_bin() -> std::path::PathBuf {
if let Some(p) = option_env!("CARGO_BIN_EXE_rvagent") {
return std::path::PathBuf::from(p);
}
let mut p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
for _ in 0..3 {
p = p.parent().unwrap().to_path_buf();
}
p.join("target").join("debug").join("rvagent")
}
#[tokio::test]
async fn a2a_serve_discover_and_send_task() {
let bin = rvagent_bin();
assert!(bin.exists(), "rvagent binary not found at {:?}", bin);
// -- 1) Spawn the server on an ephemeral port.
let mut server = TokioCommand::new(&bin)
.args(["a2a", "serve", "--bind", "127.0.0.1:0", "--generate-key"])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()
.expect("spawn rvagent a2a serve");
let stdout = server.stdout.take().expect("server stdout piped");
let mut reader = BufReader::new(stdout).lines();
// -- 2) Parse "listening on 127.0.0.1:<port>" from the first line.
//
// Give the server up to 20s to bind + print; CI under load is slower
// than local.
let line = tokio::time::timeout(Duration::from_secs(20), reader.next_line())
.await
.expect("server listening line timed out")
.expect("server stdout read error")
.expect("server closed before emitting listening line");
let addr = line
.strip_prefix("listening on ")
.unwrap_or_else(|| panic!("unexpected first-line stdout from server: {:?}", line))
.trim()
.to_string();
let base_url = format!("http://{}", addr);
// -- 3) Run `discover` against the live server.
let out = TokioCommand::new(&bin)
.args(["a2a", "discover", &base_url])
.output()
.await
.expect("spawn rvagent a2a discover");
assert!(
out.status.success(),
"discover failed: status={:?} stdout={} stderr={}",
out.status,
String::from_utf8_lossy(&out.stdout),
String::from_utf8_lossy(&out.stderr),
);
let stdout = String::from_utf8(out.stdout).expect("discover stdout utf8");
// Must be parseable JSON with an `agentCard` envelope.
let v: serde_json::Value =
serde_json::from_str(&stdout).expect("discover stdout is valid JSON");
assert!(
v.get("agentCard").is_some(),
"discover response missing agentCard: {}",
stdout
);
// -- 4) Run `send-task` — skill `echo` is advertised by the built-in
// InMemoryRunner, which always completes synchronously.
let out = TokioCommand::new(&bin)
.args([
"a2a",
"send-task",
&base_url,
"--skill",
"echo",
"--input",
"hello",
])
.output()
.await
.expect("spawn rvagent a2a send-task");
assert!(
out.status.success(),
"send-task failed: status={:?} stdout={} stderr={}",
out.status,
String::from_utf8_lossy(&out.stdout),
String::from_utf8_lossy(&out.stderr),
);
let stdout = String::from_utf8(out.stdout).expect("send-task stdout utf8");
// Task JSON must report completed state. Match both kebab-case
// (on-wire) and struct-field forms conservatively.
assert!(
stdout.contains("\"state\": \"completed\"") || stdout.contains("\"state\":\"completed\""),
"send-task response did not include completed state:\n{}",
stdout
);
// -- 5) Tear the server down.
let _ = server.start_kill();
let _ = server.wait().await;
}