test(hailo): lock in iter-182 RPC timeout behavior (iter 195)

Adds two cases to dos_gates.rs to lock in the iter-182
`Server::timeout` middleware behavior. iter-182 picked tonic's
tower-timeout cap to bound slow-loris attacks and any handler that
hangs past its budget. Without a regression test, a future change
that removes the timeout would silently let the worker accumulate
stuck handlers again.

  embed_handler_exceeding_timeout_returns_cancelled
    Server::timeout(200 ms), handler sleeps 1 s. Asserts:
      * status code = Cancelled (tonic's tower-timeout middleware
        wraps tower's Elapsed error in Status::cancelled, per the
        iter-182 commit message)
      * elapsed wall time < 600 ms (3× timeout) — proves the cap
        actually fired rather than the request completing some
        other way

  embed_handler_within_timeout_succeeds
    Server::timeout(1 s), handler sleeps 50 ms. Confirms the cap
    doesn't accidentally block legitimate fast traffic — guards
    against a future "tighten the timeout to 10 ms" change that
    would break every embed.

dos_gates.rs now has six cases covering three of the six gates:
  byte cap (iter 180)        : 2/2
  encoding cap (iter 190)    : 2/2
  RPC timeout (iter 182)     : 2/2 ← new

Validated:
  - dos_gates suite: 6/6 pass in 0.25 s
  - full integration sweep: 1 pre-existing flake unrelated to this
    iter (`cluster_load_distribution::p2c_ewma_biases_toward_fast_worker_under_load`,
    confirmed flaky 1/5 — depends on tokio scheduler timing for
    a 2:1 EWMA dispatch ratio, intermittent across the session)

Pi worker untouched; pure test-suite addition.

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruvnet 2026-05-03 18:58:36 -04:00
parent 01a7588b9d
commit 952bc9b85f

View file

@ -163,6 +163,92 @@ impl Embedding for OversizedResponseMockWorker {
}
}
/// Iter 195 — slow-handler mock that accompanies iter-182's
/// `Server::timeout`.
///
/// The worker's `embed` handler pauses for `handler_sleep_ms` before it
/// answers, which lets a test trip the timeout middleware without
/// needing a handler that genuinely hangs.
#[derive(Clone)]
struct SlowMockWorker {
    /// Delay, in milliseconds, applied by `embed` before it responds.
    handler_sleep_ms: u64,
}
// `Embedding` implementation for the slow mock. Only `embed` is delayed;
// `health`, `embed_stream` and `get_stats` answer immediately with canned
// data so the fixture satisfies the full service trait.
#[tonic::async_trait]
impl Embedding for SlowMockWorker {
    /// Boxed response stream handed back by `embed_stream`.
    type EmbedStreamStream = Pin<
        Box<dyn futures_core::Stream<Item = Result<EmbedStreamResponse, Status>> + Send + 'static>,
    >;

    /// Sleeps for `handler_sleep_ms` milliseconds, then returns a fixed
    /// 384-element all-zero vector. The request contents are ignored.
    async fn embed(
        &self,
        _request: Request<EmbedRequest>,
    ) -> Result<Response<EmbedResponse>, Status> {
        tokio::time::sleep(Duration::from_millis(self.handler_sleep_ms)).await;

        // Report the configured sleep as the latency, converted ms -> µs.
        // Saturate instead of overflowing the multiply or wrapping in the
        // u64 -> i64 conversion when an extreme sleep value is configured.
        let latency_us =
            i64::try_from(self.handler_sleep_ms.saturating_mul(1_000)).unwrap_or(i64::MAX);

        Ok(Response::new(EmbedResponse {
            vector: vec![0.0; 384],
            dim: 384,
            latency_us,
        }))
    }

    /// Always reports a ready worker with fixed placeholder identifiers
    /// and zeroed temperature readings.
    async fn health(
        &self,
        _request: Request<HealthRequest>,
    ) -> Result<Response<HealthResponse>, Status> {
        Ok(Response::new(HealthResponse {
            version: "slow-mock".into(),
            device_id: "slow:0".into(),
            model_fingerprint: "fp:slow".into(),
            ready: true,
            npu_temp_ts0_celsius: 0.0,
            npu_temp_ts1_celsius: 0.0,
        }))
    }

    /// Returns a stream that yields nothing: the sender half of the
    /// channel is dropped before the receiver is handed out, so the
    /// stream ends as soon as it is polled.
    async fn embed_stream(
        &self,
        _request: Request<EmbedBatchRequest>,
    ) -> Result<Response<Self::EmbedStreamStream>, Status> {
        let (tx, rx) = tokio::sync::mpsc::channel::<Result<EmbedStreamResponse, Status>>(1);
        drop(tx);
        Ok(Response::new(Box::pin(
            tokio_stream::wrappers::ReceiverStream::new(rx),
        )))
    }

    /// Returns default-valued statistics.
    async fn get_stats(
        &self,
        _request: Request<StatsRequest>,
    ) -> Result<Response<StatsResponse>, Status> {
        Ok(Response::new(StatsResponse::default()))
    }
}
/// Iter 195 — stand up a server with `Server::timeout(timeout_ms)`
/// wrapping a `SlowMockWorker` whose `embed` handler sleeps for
/// `handler_sleep_ms`.
///
/// Both arguments are in milliseconds. The tests in this file expect
/// that when `handler_sleep_ms` exceeds `timeout_ms` the client sees
/// `Status::cancelled`, and that when it is below `timeout_ms` the
/// request completes normally.
///
/// Returns the loopback address the server is listening on. Panics if
/// an ephemeral port cannot be bound or its address cannot be read.
async fn start_timeout_server(timeout_ms: u64, handler_sleep_ms: u64) -> SocketAddr {
    // Port 0 asks the OS for any free port, so parallel tests don't collide.
    let listener = TcpListener::bind("127.0.0.1:0")
        .await
        .expect("bind ephemeral loopback port for timeout server");
    let addr = listener
        .local_addr()
        .expect("read local address of timeout server listener");
    let incoming = TcpListenerStream::new(listener);
    let svc = EmbeddingServer::new(SlowMockWorker { handler_sleep_ms });

    tokio::spawn(async move {
        let served = Server::builder()
            .timeout(Duration::from_millis(timeout_ms))
            .add_service(svc)
            .serve_with_incoming(incoming)
            .await;
        // Don't panic inside the detached task, but don't hide the failure
        // either: a dead server otherwise only shows up as a confusing
        // client-side error in the calling test.
        if let Err(err) = served {
            eprintln!(
                "start_timeout_server: server on {} exited with error: {}",
                addr, err
            );
        }
    });

    // The socket is already bound above; this short pause just gives the
    // spawned task time to start serving before the caller connects.
    tokio::time::sleep(Duration::from_millis(50)).await;
    addr
}
/// Iter 194 — stand up an EmbeddingServer with the encoding cap set
/// (mirrors `start_capped_server` for the iter-180 byte cap). Returns
/// the bound addr once the listener is accepting.
@ -319,3 +405,82 @@ async fn embed_response_under_encoding_cap_succeeds() {
assert_eq!(body.dim, 4_000, "oversized mock returns dim=4000");
assert_eq!(body.vector.len(), 4_000, "vector length matches dim");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn embed_handler_exceeding_timeout_returns_cancelled() {
    // Regression guard for iter 182's `Server::timeout`. The server gets
    // a 200 ms per-RPC bound while the handler sleeps for 1 s, so the
    // call must be cut short by the server. This test expects the
    // failure to reach the client as `Code::Cancelled`. Going through a
    // real client/server pair means a regression in either the tonic or
    // the tower layer fails this test.
    let timeout_ms = 200;
    let handler_sleep_ms = 1_000;
    // Upper bound on observed wall time: 3× the server timeout, leaving
    // slack for CI scheduler jitter while staying far below the 1 s sleep.
    let max_elapsed = Duration::from_millis(timeout_ms * 3);

    let addr = start_timeout_server(timeout_ms, handler_sleep_ms).await;

    // The client-side request timeout is deliberately generous (5 s) so
    // that the bound being measured is the server's, not the channel's.
    let channel = tonic::transport::Endpoint::from_shared(format!("http://{}", addr))
        .unwrap()
        .connect_timeout(Duration::from_secs(2))
        .timeout(Duration::from_secs(5))
        .connect()
        .await
        .expect("connect");
    let mut client = EmbeddingClient::new(channel);

    let request = tonic::Request::new(EmbedRequest {
        text: "slow".into(),
        max_seq: 16,
        request_id: "dos-timeout-test".into(),
    });

    let clock = std::time::Instant::now();
    let outcome = client.embed(request).await;
    let elapsed = clock.elapsed();

    let status = outcome.expect_err("slow handler should be killed by Server::timeout");
    assert_eq!(
        status.code(),
        Code::Cancelled,
        "tonic tower-timeout middleware surfaces as Cancelled; got \
         {:?} with message {:?}",
        status.code(),
        status.message(),
    );

    // Second line of defence: a call that really was cut off by the cap
    // must come back long before the handler's own sleep would have ended.
    assert!(
        elapsed < max_elapsed,
        "request returned in {:?}, expected < {:?} (handler would have \
         taken {:?} without the cap)",
        elapsed,
        max_elapsed,
        Duration::from_millis(handler_sleep_ms),
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn embed_handler_within_timeout_succeeds() {
    // Counterpart to the timeout-exceeded case: 1 s server timeout,
    // 50 ms handler sleep. The happy path finishes well under the cap
    // and must return the mock's normal response.
    let addr = start_timeout_server(1_000, 50).await;
    let endpoint = tonic::transport::Endpoint::from_shared(format!("http://{}", addr))
        .unwrap()
        .connect_timeout(Duration::from_secs(2));
    let channel = endpoint.connect().await.expect("connect");
    let mut client = EmbeddingClient::new(channel);

    let req = tonic::Request::new(EmbedRequest {
        text: "fast".into(),
        max_seq: 16,
        request_id: "dos-timeout-ok".into(),
    });
    let body = client
        .embed(req)
        .await
        .expect("fast handler should complete")
        .into_inner();

    // Check the payload as well as the declared dimension, so a truncated
    // or empty vector cannot pass as a successful response.
    assert_eq!(body.dim, 384, "slow mock returns dim=384");
    assert_eq!(body.vector.len(), 384, "vector length matches dim");
}