From 952bc9b85fe9f142b2a6dbbc238d266527760565 Mon Sep 17 00:00:00 2001 From: ruvnet Date: Sun, 3 May 2026 18:58:36 -0400 Subject: [PATCH] test(hailo): lock in iter-182 RPC timeout behavior (iter 195) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds two cases to dos_gates.rs to lock in the iter-182 `Server::timeout` middleware behavior. iter-182 picked tonic's tower-timeout cap to bound slow-loris attacks and any handler that hangs past its budget; without a regression test, a future change that unbinds the timeout silently lets the worker accumulate stuck handlers again. embed_handler_exceeding_timeout_returns_cancelled Server::timeout(200 ms), handler sleeps 1 s. Asserts: * status code = Cancelled (tonic's tower-timeout middleware wraps tower's Elapsed error in Status::cancelled, per the iter-182 commit message) * elapsed wall time < 600 ms (3× timeout) — proves the cap actually fired rather than the request completing some other way embed_handler_within_timeout_succeeds Server::timeout(1 s), handler sleeps 50 ms. Confirms the cap doesn't accidentally block legitimate fast traffic — guards against a future "tighten the timeout to 10 ms" change that would break every embed. dos_gates.rs now has six cases covering three of the six gates: byte cap (iter 180) : 2/2 encoding cap (iter 190) : 2/2 RPC timeout (iter 182) : 2/2 ← new Validated: - dos_gates suite: 6/6 pass in 0.25 s - full integration sweep: 1 pre-existing flake unrelated to this iter (`cluster_load_distribution::p2c_ewma_biases_toward_fast_worker_under_load`, confirmed flaky 1/5 — depends on tokio scheduler timing for a 2:1 EWMA dispatch ratio, intermittent across the session) Pi worker untouched; pure test-suite addition. Co-Authored-By: claude-flow --- .../ruvector-hailo-cluster/tests/dos_gates.rs | 165 ++++++++++++++++++ 1 file changed, 165 insertions(+) diff --git a/crates/ruvector-hailo-cluster/tests/dos_gates.rs b/crates/ruvector-hailo-cluster/tests/dos_gates.rs index 9e8b43a25..d14fe4e73 100644 --- a/crates/ruvector-hailo-cluster/tests/dos_gates.rs +++ b/crates/ruvector-hailo-cluster/tests/dos_gates.rs @@ -163,6 +163,92 @@ impl Embedding for OversizedResponseMockWorker { } } +/// Iter 195 — companion fixture for iter-182's `Server::timeout`. +/// Stands up a worker whose `embed` handler sleeps `handler_sleep_ms` +/// before returning, so the test can drive the timeout middleware +/// without an actual hang. +#[derive(Clone)] +struct SlowMockWorker { + handler_sleep_ms: u64, +} + +#[tonic::async_trait] +impl Embedding for SlowMockWorker { + async fn embed( + &self, + _request: Request, + ) -> Result, Status> { + tokio::time::sleep(Duration::from_millis(self.handler_sleep_ms)).await; + Ok(Response::new(EmbedResponse { + vector: vec![0.0; 384], + dim: 384, + latency_us: (self.handler_sleep_ms * 1_000) as i64, + })) + } + + async fn health( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(HealthResponse { + version: "slow-mock".into(), + device_id: "slow:0".into(), + model_fingerprint: "fp:slow".into(), + ready: true, + npu_temp_ts0_celsius: 0.0, + npu_temp_ts1_celsius: 0.0, + })) + } + + type EmbedStreamStream = Pin< + Box> + Send + 'static>, + >; + + async fn embed_stream( + &self, + _request: Request, + ) -> Result, Status> { + let (tx, rx) = tokio::sync::mpsc::channel::>(1); + drop(tx); + Ok(Response::new(Box::pin( + tokio_stream::wrappers::ReceiverStream::new(rx), + ))) + } + + async fn get_stats( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(StatsResponse::default())) + } +} + +/// Iter 195 — stand up a server with `Server::timeout(timeout_ms)` +/// wrapping a `SlowMockWorker(handler_sleep_ms)`. When +/// handler_sleep > timeout the tonic tower-timeout middleware fires +/// and the client sees `Status::cancelled`. When handler_sleep < +/// timeout the request completes normally. +async fn start_timeout_server( + timeout_ms: u64, + handler_sleep_ms: u64, +) -> SocketAddr { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let incoming = TcpListenerStream::new(listener); + + let svc = EmbeddingServer::new(SlowMockWorker { handler_sleep_ms }); + tokio::spawn(async move { + Server::builder() + .timeout(Duration::from_millis(timeout_ms)) + .add_service(svc) + .serve_with_incoming(incoming) + .await + .ok(); + }); + tokio::time::sleep(Duration::from_millis(50)).await; + addr +} + /// Iter 194 — stand up an EmbeddingServer with the encoding cap set /// (mirrors `start_capped_server` for the iter-180 byte cap). Returns /// the bound addr once the listener is accepting. @@ -319,3 +405,82 @@ async fn embed_response_under_encoding_cap_succeeds() { assert_eq!(body.dim, 4_000, "oversized mock returns dim=4000"); assert_eq!(body.vector.len(), 4_000, "vector length matches dim"); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn embed_handler_exceeding_timeout_returns_cancelled() { + // Iter 182's Server::timeout. Stand up a server with a 200 ms + // RPC bound. Handler sleeps 1 s — well past the cap. Expect the + // tower-timeout middleware to drop the future at the first await + // point past the deadline; tonic surfaces this to the client as + // `Status::cancelled`. The end-to-end path covers the full + // tonic + tower interaction so a regression in either layer trips + // this test. + let timeout_ms = 200; + let handler_sleep_ms = 1_000; + let addr = start_timeout_server(timeout_ms, handler_sleep_ms).await; + let endpoint = tonic::transport::Endpoint::from_shared(format!("http://{}", addr)) + .unwrap() + .connect_timeout(Duration::from_secs(2)) + // Generous client-side request timeout so we measure the + // server-side bound, not the channel's. We expect the server + // to fail this in ~200 ms anyway. + .timeout(Duration::from_secs(5)); + let channel = endpoint.connect().await.expect("connect"); + let mut client = EmbeddingClient::new(channel); + + let req = tonic::Request::new(EmbedRequest { + text: "slow".into(), + max_seq: 16, + request_id: "dos-timeout-test".into(), + }); + + let started = std::time::Instant::now(); + let err = client + .embed(req) + .await + .expect_err("slow handler should be killed by Server::timeout"); + let elapsed = started.elapsed(); + + assert_eq!( + err.code(), + Code::Cancelled, + "tonic tower-timeout middleware surfaces as Cancelled; got \ + {:?} with message {:?}", + err.code(), + err.message(), + ); + + // Belt-and-suspenders: if the cap fired correctly, we should land + // well under the handler's 1 s sleep. Allow generous slack for + // CI scheduler jitter (3× the timeout). + assert!( + elapsed < Duration::from_millis(timeout_ms * 3), + "request returned in {:?}, expected < {:?} (handler would have \ + taken {:?} without the cap)", + elapsed, + Duration::from_millis(timeout_ms * 3), + Duration::from_millis(handler_sleep_ms), + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn embed_handler_within_timeout_succeeds() { + // Counterpart: 1 s server timeout, 50 ms handler sleep. The + // happy path completes well under the cap and returns a normal + // response. + let addr = start_timeout_server(1_000, 50).await; + let endpoint = tonic::transport::Endpoint::from_shared(format!("http://{}", addr)) + .unwrap() + .connect_timeout(Duration::from_secs(2)); + let channel = endpoint.connect().await.expect("connect"); + let mut client = EmbeddingClient::new(channel); + + let req = tonic::Request::new(EmbedRequest { + text: "fast".into(), + max_seq: 16, + request_id: "dos-timeout-ok".into(), + }); + + let resp = client.embed(req).await.expect("fast handler should complete"); + assert_eq!(resp.into_inner().dim, 384); +}