diff --git a/crates/ruvector-hailo-cluster/tests/dos_gates.rs b/crates/ruvector-hailo-cluster/tests/dos_gates.rs index 9e8b43a25..d14fe4e73 100644 --- a/crates/ruvector-hailo-cluster/tests/dos_gates.rs +++ b/crates/ruvector-hailo-cluster/tests/dos_gates.rs @@ -163,6 +163,92 @@ impl Embedding for OversizedResponseMockWorker { } } +/// Iter 195 — companion fixture for iter-182's `Server::timeout`. +/// Stands up a worker whose `embed` handler sleeps `handler_sleep_ms` +/// before returning, so the test can drive the timeout middleware +/// without an actual hang. +#[derive(Clone)] +struct SlowMockWorker { + handler_sleep_ms: u64, +} + +#[tonic::async_trait] +impl Embedding for SlowMockWorker { + async fn embed( + &self, + _request: Request, + ) -> Result, Status> { + tokio::time::sleep(Duration::from_millis(self.handler_sleep_ms)).await; + Ok(Response::new(EmbedResponse { + vector: vec![0.0; 384], + dim: 384, + latency_us: (self.handler_sleep_ms * 1_000) as i64, + })) + } + + async fn health( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(HealthResponse { + version: "slow-mock".into(), + device_id: "slow:0".into(), + model_fingerprint: "fp:slow".into(), + ready: true, + npu_temp_ts0_celsius: 0.0, + npu_temp_ts1_celsius: 0.0, + })) + } + + type EmbedStreamStream = Pin< + Box> + Send + 'static>, + >; + + async fn embed_stream( + &self, + _request: Request, + ) -> Result, Status> { + let (tx, rx) = tokio::sync::mpsc::channel::>(1); + drop(tx); + Ok(Response::new(Box::pin( + tokio_stream::wrappers::ReceiverStream::new(rx), + ))) + } + + async fn get_stats( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(StatsResponse::default())) + } +} + +/// Iter 195 — stand up a server with `Server::timeout(timeout_ms)` +/// wrapping a `SlowMockWorker(handler_sleep_ms)`. When +/// handler_sleep > timeout the tonic tower-timeout middleware fires +/// and the client sees `Status::cancelled`. When handler_sleep < +/// timeout the request completes normally. +async fn start_timeout_server( + timeout_ms: u64, + handler_sleep_ms: u64, +) -> SocketAddr { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let incoming = TcpListenerStream::new(listener); + + let svc = EmbeddingServer::new(SlowMockWorker { handler_sleep_ms }); + tokio::spawn(async move { + Server::builder() + .timeout(Duration::from_millis(timeout_ms)) + .add_service(svc) + .serve_with_incoming(incoming) + .await + .ok(); + }); + tokio::time::sleep(Duration::from_millis(50)).await; + addr +} + /// Iter 194 — stand up an EmbeddingServer with the encoding cap set /// (mirrors `start_capped_server` for the iter-180 byte cap). Returns /// the bound addr once the listener is accepting. @@ -319,3 +405,82 @@ async fn embed_response_under_encoding_cap_succeeds() { assert_eq!(body.dim, 4_000, "oversized mock returns dim=4000"); assert_eq!(body.vector.len(), 4_000, "vector length matches dim"); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn embed_handler_exceeding_timeout_returns_cancelled() { + // Iter 182's Server::timeout. Stand up a server with a 200 ms + // RPC bound. Handler sleeps 1 s — well past the cap. Expect the + // tower-timeout middleware to drop the future at the first await + // point past the deadline; tonic surfaces this to the client as + // `Status::cancelled`. The end-to-end path covers the full + // tonic + tower interaction so a regression in either layer trips + // this test. + let timeout_ms = 200; + let handler_sleep_ms = 1_000; + let addr = start_timeout_server(timeout_ms, handler_sleep_ms).await; + let endpoint = tonic::transport::Endpoint::from_shared(format!("http://{}", addr)) + .unwrap() + .connect_timeout(Duration::from_secs(2)) + // Generous client-side request timeout so we measure the + // server-side bound, not the channel's. We expect the server + // to fail this in ~200 ms anyway. + .timeout(Duration::from_secs(5)); + let channel = endpoint.connect().await.expect("connect"); + let mut client = EmbeddingClient::new(channel); + + let req = tonic::Request::new(EmbedRequest { + text: "slow".into(), + max_seq: 16, + request_id: "dos-timeout-test".into(), + }); + + let started = std::time::Instant::now(); + let err = client + .embed(req) + .await + .expect_err("slow handler should be killed by Server::timeout"); + let elapsed = started.elapsed(); + + assert_eq!( + err.code(), + Code::Cancelled, + "tonic tower-timeout middleware surfaces as Cancelled; got \ + {:?} with message {:?}", + err.code(), + err.message(), + ); + + // Belt-and-suspenders: if the cap fired correctly, we should land + // well under the handler's 1 s sleep. Allow generous slack for + // CI scheduler jitter (3× the timeout). + assert!( + elapsed < Duration::from_millis(timeout_ms * 3), + "request returned in {:?}, expected < {:?} (handler would have \ + taken {:?} without the cap)", + elapsed, + Duration::from_millis(timeout_ms * 3), + Duration::from_millis(handler_sleep_ms), + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn embed_handler_within_timeout_succeeds() { + // Counterpart: 1 s server timeout, 50 ms handler sleep. The + // happy path completes well under the cap and returns a normal + // response. + let addr = start_timeout_server(1_000, 50).await; + let endpoint = tonic::transport::Endpoint::from_shared(format!("http://{}", addr)) + .unwrap() + .connect_timeout(Duration::from_secs(2)); + let channel = endpoint.connect().await.expect("connect"); + let mut client = EmbeddingClient::new(channel); + + let req = tonic::Request::new(EmbedRequest { + text: "fast".into(), + max_seq: 16, + request_id: "dos-timeout-ok".into(), + }); + + let resp = client.embed(req).await.expect("fast handler should complete"); + assert_eq!(resp.into_inner().dim, 384); +}