test(hailo): Pi-gated integration test locks in iter-163 throughput (iter 172)

Iter-165 leftover #4 closed. New
crates/ruvector-hailo-cluster/tests/pi_hardware_integration.rs
runs three end-to-end tests against a real Pi worker, gated on
RUVECTOR_TEST_PI_HOST being set. Without the env var all three
tests skip cleanly so default cargo test is unaffected.

Tests:
  pi_worker_returns_real_semantic_vectors
    Embeds the same three reference phrases the iter-167 worker
    self-test uses; asserts sim(dog,puppy) > sim(dog,kafka) with
    a margin > 0.10. Catches encoder degeneration that iter-167's
    in-process check would miss (e.g. corrupt model in a deploy
    push that bypassed install.sh).

  pi_worker_throughput_above_floor
    Sequentially embeds 30 sentences, asserts >= 5 embeds/sec.
    Floor lets a Pi 4 (~3-4/sec estimated) fail loudly while
    Pi 5 cpu-fallback (7/sec) and NPU (67/sec) pass.

  pi_worker_handles_padding_and_truncation
    Empty string + 200-repeat long string both produce finite
    384-dim vectors. Shape contract regression gate.

Run live against cognitum-v0 (Pi 5 + AI HAT+ NPU worker on 50051):

  Pi cognitum-v0:50051: sim(dog,puppy)=0.5019 sim(dog,kafka)=0.2692 Δ=+0.2327
  Pi cognitum-v0:50051: 30 embeds in 1.36s = 22.0 embeds/sec
  test result: ok. 3 passed; 0 failed; 0 ignored

The 22/sec is single-threaded sequential (no client concurrency);
matches the iter-163 single-thread profile. Concurrent dispatch
hits the iter-163 67.3/sec ceiling.

Default cargo test on x86 dev box: 3 tests skip cleanly with the
"set RUVECTOR_TEST_PI_HOST" message — CI safe.

Iter 172 closes the agreed "Clean Exit" sprint. Remaining items
(mask-aware HEF, sysroot cross-build, real calibration corpus,
multi-network HEF) are research / strategic decisions left as
future work.

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruvnet 2026-05-03 16:51:17 -04:00
parent 6318096af5
commit 46ae11531c

View file

@ -0,0 +1,141 @@
//! Pi-gated end-to-end integration test for the NPU + cpu-fallback
//! workers running on real hardware.
//!
//! Iter 172 (ADR-176 follow-up). Locks in the iter-163 / iter-149
//! throughput numbers as regression gates by running the same
//! cluster-bench-style workload from this test process. Skips
//! entirely when `RUVECTOR_TEST_PI_HOST` is unset so CI / dev-box
//! `cargo test` is unaffected.
//!
//! Usage:
//! RUVECTOR_TEST_PI_HOST=cognitum-v0:50051 \
//! cargo test -p ruvector-hailo-cluster --test pi_hardware_integration \
//! -- --nocapture --test-threads=1
use ruvector_hailo_cluster::transport::{EmbeddingTransport, WorkerEndpoint};
use ruvector_hailo_cluster::{GrpcTransport, HailoClusterEmbedder};
use std::sync::Arc;
use std::time::Instant;
fn pi_host() -> Option<String> {
std::env::var("RUVECTOR_TEST_PI_HOST").ok()
}
fn cluster(addr: &str) -> HailoClusterEmbedder {
let workers = vec![WorkerEndpoint::new("pi", addr)];
let transport: Arc<dyn EmbeddingTransport + Send + Sync> = Arc::new(
GrpcTransport::new().expect("GrpcTransport::new"),
);
HailoClusterEmbedder::new(workers, transport, 384, "")
.expect("HailoClusterEmbedder::new")
}
fn cos(a: &[f32], b: &[f32]) -> f32 {
a.iter().zip(b.iter()).map(|(x, y)| x * y).sum()
}
#[test]
fn pi_worker_returns_real_semantic_vectors() {
let Some(addr) = pi_host() else {
eprintln!("skipping — set RUVECTOR_TEST_PI_HOST=<host>:<port>");
return;
};
let c = cluster(&addr);
// Three reference phrases — same set the iter-167 worker
// self-test uses. If we get the same ranking from the cluster
// side, we know:
// * worker is up
// * NPU/cpu-fallback path is loaded
// * tokenizer + embeddings + encoder + pool agree
let v0 = c.embed_one_blocking("the quick brown fox jumps over the lazy dog").unwrap();
let v1 = c.embed_one_blocking("a puppy sprints across the meadow").unwrap();
let v2 = c.embed_one_blocking("kafka topic partition rebalancing strategy").unwrap();
assert_eq!(v0.len(), 384);
assert_eq!(v1.len(), 384);
assert_eq!(v2.len(), 384);
let sim_close = cos(&v0, &v1);
let sim_far = cos(&v0, &v2);
eprintln!(
"Pi {}: sim(dog,puppy)={:.4} sim(dog,kafka)={:.4} Δ={:+.4}",
addr,
sim_close,
sim_far,
sim_close - sim_far
);
assert!(
sim_close > sim_far,
"ranking violation: sim(dog,puppy)={:.4} <= sim(dog,kafka)={:.4}",
sim_close,
sim_far
);
assert!(
sim_close - sim_far > 0.10,
"ranking margin too thin: Δ={:+.4} (encoder may be degenerate)",
sim_close - sim_far
);
}
#[test]
fn pi_worker_throughput_above_floor() {
let Some(addr) = pi_host() else {
return;
};
let c = cluster(&addr);
// iter-149 cpu-fallback baseline = 7 / sec
// iter-163 NPU = 67 / sec
// Floor is 5 / sec — catches a regression that would drop the
// cpu-fallback path below useful, while still allowing the much
// weaker Pi 4 (~3-4 / sec estimated) to fail loudly.
const FLOOR_EMBEDS_PER_SEC: f64 = 5.0;
const SAMPLES: usize = 30;
// Warm up so the first-call model load doesn't skew the bench.
let _ = c.embed_one_blocking("warm-up").unwrap();
let t0 = Instant::now();
for i in 0..SAMPLES {
let s = format!("benchmark sentence number {} of {}", i, SAMPLES);
let v = c.embed_one_blocking(&s).unwrap();
assert_eq!(v.len(), 384);
}
let elapsed = t0.elapsed();
let rate = SAMPLES as f64 / elapsed.as_secs_f64();
eprintln!(
"Pi {}: {} embeds in {:.2}s = {:.1} embeds/sec",
addr,
SAMPLES,
elapsed.as_secs_f64(),
rate
);
assert!(
rate >= FLOOR_EMBEDS_PER_SEC,
"throughput {:.1} / sec below floor {:.1} (regression?)",
rate,
FLOOR_EMBEDS_PER_SEC
);
}
#[test]
fn pi_worker_handles_padding_and_truncation() {
let Some(addr) = pi_host() else {
return;
};
let c = cluster(&addr);
// Empty string → tokenizer emits [CLS][SEP] → encoder runs on
// 2 attended positions, 126 PAD. Output should still be a
// finite unit vector.
let v_empty = c.embed_one_blocking("").unwrap();
assert_eq!(v_empty.len(), 384);
assert!(v_empty.iter().all(|x| x.is_finite()));
// Long input → tokenizer truncates to seq=128. Should still work.
let long: String = "lorem ipsum dolor sit amet ".repeat(200);
let v_long = c.embed_one_blocking(&long).unwrap();
assert_eq!(v_long.len(), 384);
assert!(v_long.iter().all(|x| x.is_finite()));
}