fix(brain): SSE connection limiter, pipeline rate limit, Firestore pagination fallback (ADR-130)

Three fixes for recurring pi.ruv.io outages:

1. SSE connection limiter (max 50) — prevents MCP reconnect storms from
   exhausting Cloud Run concurrency slots. Tracks active count with
   AtomicUsize, rejects excess with 429.

2. Pipeline optimize rate limiter — max 1 concurrent request with 30s
   cooldown. Prevents scheduler thundering herd from CPU-saturating
   the instance.

3. Firestore pagination offset fallback — when page tokens go stale
   after OOM restart (400 Bad Request), switches to offset-based
   pagination to load all documents instead of stopping at first batch.

Also adds /v1/ready lightweight probe (zero-cost, no state access)
for Cloud Run health checks.

ADR-130 documents the full decoupling architecture (SSE service split).

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
rUv 2026-03-30 11:59:35 +00:00
parent f7dd9b8865
commit a09251c75c
4 changed files with 385 additions and 8 deletions

View file

@ -276,6 +276,9 @@ pub async fn create_router() -> (Router, AppState) {
notifier: crate::notify::ResendNotifier::from_env(),
cached_status: Arc::new(parking_lot::RwLock::new(None)),
gist_publisher: crate::gist::GistPublisher::from_env().map(Arc::new),
optimize_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
last_optimize_completed: Arc::new(parking_lot::RwLock::new(None)),
sse_connections: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
};
let router = Router::new()
@ -287,6 +290,7 @@ pub async fn create_router() -> (Router, AppState) {
.route("/.well-known/agent-guide.md", get(agent_guide))
.route("/origin", get(origin_page))
.route("/v1/health", get(health))
.route("/v1/ready", get(ready))
.route("/v1/challenge", get(issue_challenge))
.route("/v1/memories", post(share_memory))
.route("/v1/memories/search", get(search_memories))
@ -1139,6 +1143,16 @@ async fn health(State(state): State<AppState>) -> Json<HealthResponse> {
})
}
/// GET /v1/ready — lightweight readiness probe (ADR-130).
/// Returns 200 immediately. No computation, no state access.
async fn ready() -> StatusCode {
StatusCode::OK
}
/// Maximum concurrent SSE connections per instance (ADR-130 Phase 1).
/// Prevents reconnect storms from exhausting Cloud Run concurrency slots.
const MAX_SSE_CONNECTIONS: usize = 50;
/// Issue a challenge nonce for replay protection.
/// Clients must include this nonce in write requests.
/// Nonces are single-use and expire after 5 minutes.
@ -3378,6 +3392,9 @@ async fn pipeline_metrics_handler(
}
/// POST /v1/pipeline/optimize — trigger optimization actions
///
/// Rate-limited: max 1 concurrent, 30s cooldown between runs.
/// Prevents scheduler thundering herd from saturating the instance.
async fn pipeline_optimize(
State(state): State<AppState>,
_contributor: AuthenticatedContributor,
@ -3385,6 +3402,27 @@ async fn pipeline_optimize(
) -> Result<Json<OptimizeResponse>, (StatusCode, String)> {
check_read_only(&state)?;
// Enforce 30-second cooldown between optimize runs
{
let last = state.last_optimize_completed.read();
if let Some(ts) = *last {
if ts.elapsed() < std::time::Duration::from_secs(30) {
let wait = 30 - ts.elapsed().as_secs();
return Err((
StatusCode::TOO_MANY_REQUESTS,
format!("Pipeline optimize cooldown: retry in {wait}s"),
));
}
}
}
// Only 1 concurrent optimize — reject others immediately
let _permit = state.optimize_semaphore.try_acquire()
.map_err(|_| (
StatusCode::TOO_MANY_REQUESTS,
"Pipeline optimize already in progress".to_string(),
))?;
let all_actions = vec![
"train", "drift_check", "transfer_all", "rebuild_graph", "cleanup", "attractor_analysis",
];
@ -3492,6 +3530,7 @@ async fn pipeline_optimize(
}
state.pipeline_metrics.optimization_cycles.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
*state.last_optimize_completed.write() = Some(std::time::Instant::now());
Ok(Json(OptimizeResponse {
results,
@ -4566,19 +4605,32 @@ async fn origin_page() -> (
/// SSE handler — client connects here, receives event stream
async fn sse_handler(
State(state): State<AppState>,
) -> Sse<impl tokio_stream::Stream<Item = Result<Event, std::convert::Infallible>>> {
) -> Result<Sse<impl tokio_stream::Stream<Item = Result<Event, std::convert::Infallible>>>, (StatusCode, String)> {
// ADR-130 Phase 1: reject new SSE connections when at capacity
let current = state.sse_connections.load(Ordering::Relaxed);
if current >= MAX_SSE_CONNECTIONS {
tracing::warn!("SSE connection limit reached ({}/{}), rejecting", current, MAX_SSE_CONNECTIONS);
return Err((
StatusCode::TOO_MANY_REQUESTS,
format!("SSE connection limit reached ({MAX_SSE_CONNECTIONS}). Retry-After: 10"),
));
}
state.sse_connections.fetch_add(1, Ordering::Relaxed);
let session_id = Uuid::new_v4().to_string();
let (tx, rx) = tokio::sync::mpsc::channel::<String>(64);
// Store sender for this session
state.sessions.insert(session_id.clone(), tx);
tracing::info!("SSE session started: {}", session_id);
tracing::info!("SSE session started: {} (active: {})", session_id,
state.sse_connections.load(Ordering::Relaxed));
// Build SSE stream: first event is the endpoint, then stream messages
let initial_event = format!("/messages?sessionId={session_id}");
let session_id_cleanup = session_id.clone();
let sessions_cleanup = state.sessions.clone();
let sse_counter = state.sse_connections.clone();
let stream = async_stream::stream! {
// Send endpoint event first
@ -4590,9 +4642,13 @@ async fn sse_handler(
yield Ok(Event::default().event("message").data(msg));
}
// Decrement connection counter on disconnect
sse_counter.fetch_sub(1, Ordering::Relaxed);
// Clean up session on disconnect — grace period lets clients reconnect
// without losing the session (e.g. MCP SDK's EventSource polyfill)
tracing::info!("SSE stream closed for session: {}, starting 30s grace period", session_id_cleanup);
tracing::info!("SSE stream closed for session: {}, starting 30s grace period (active: {})",
session_id_cleanup, sse_counter.load(Ordering::Relaxed));
tokio::spawn({
let sessions = sessions_cleanup.clone();
let sid = session_id_cleanup.clone();
@ -4609,7 +4665,7 @@ async fn sse_handler(
});
};
Sse::new(stream).keep_alive(KeepAlive::default())
Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
}
/// Query params for /messages endpoint

View file

@ -314,10 +314,19 @@ impl FirestoreClient {
let mut all_docs = Vec::new();
let mut page_token: Option<String> = None;
let mut consecutive_errors: usize = 0;
// Track whether we're using offset-based fallback (after stale page token)
let mut use_offset_fallback = false;
loop {
let mut url = format!("{base}/{collection}?pageSize=300");
if let Some(ref token) = page_token {
if use_offset_fallback {
// Stale page token fallback: use offset to skip already-loaded docs
url.push_str(&format!("&offset={}", all_docs.len()));
tracing::info!(
"Firestore LIST {collection}: using offset fallback at {} docs",
all_docs.len()
);
} else if let Some(ref token) = page_token {
url.push_str(&format!("&pageToken={}", urlencoding::encode(token)));
}
@ -329,6 +338,7 @@ impl FirestoreClient {
let resp = match result {
Ok(resp) if resp.status().is_success() => {
consecutive_errors = 0;
use_offset_fallback = false; // back on happy path
resp
}
Ok(resp) if resp.status().as_u16() == 401 => {
@ -367,11 +377,24 @@ impl FirestoreClient {
}
}
Ok(resp) => {
let status = resp.status().as_u16();
consecutive_errors += 1;
tracing::warn!(
"Firestore LIST {collection} returned {} (error {}/{})",
resp.status(), consecutive_errors, Self::MAX_PAGE_ERRORS
status, consecutive_errors, Self::MAX_PAGE_ERRORS
);
// 400 Bad Request with a page token means the token is stale
// (e.g. after OOM restart). Switch to offset-based pagination.
if status == 400 && page_token.is_some() && !use_offset_fallback {
tracing::warn!(
"Firestore LIST {collection}: stale page token at {} docs, switching to offset fallback",
all_docs.len()
);
page_token = None;
use_offset_fallback = true;
consecutive_errors = 0; // reset — this is a recovery, not repeated failure
continue;
}
if consecutive_errors >= Self::MAX_PAGE_ERRORS { break; }
continue;
}
@ -418,8 +441,18 @@ impl FirestoreClient {
// Check for next page
match body.get("nextPageToken").and_then(|t| t.as_str()) {
Some(token) => page_token = Some(token.to_string()),
None => break,
Some(token) if !use_offset_fallback => {
page_token = Some(token.to_string());
}
_ if use_offset_fallback => {
// In offset mode, check if we got any docs this page
// (no docs = we've exhausted the collection)
if body.get("documents").and_then(|d| d.as_array()).map_or(true, |d| d.is_empty()) {
break;
}
// Otherwise continue with incremented offset (all_docs.len() grows each iteration)
}
_ => break,
}
}

View file

@ -1341,6 +1341,12 @@ pub struct AppState {
pub cached_status: std::sync::Arc<parking_lot::RwLock<Option<(std::time::Instant, StatusResponse)>>>,
/// GitHub Gist publisher for autonomous discoveries — None if GITHUB_GIST_PAT not set
pub gist_publisher: Option<std::sync::Arc<crate::gist::GistPublisher>>,
/// Semaphore to limit concurrent pipeline/optimize requests (prevents scheduler thundering herd)
pub optimize_semaphore: std::sync::Arc<tokio::sync::Semaphore>,
/// Timestamp of last completed pipeline/optimize run (for cooldown enforcement)
pub last_optimize_completed: std::sync::Arc<parking_lot::RwLock<Option<std::time::Instant>>>,
/// Active SSE connection count (ADR-130 Phase 1 — prevents SSE reconnect storms)
pub sse_connections: std::sync::Arc<std::sync::atomic::AtomicUsize>,
}
// ──────────────────────────────────────────────────────────────────────

View file

@ -0,0 +1,282 @@
# ADR-130: MCP SSE Decoupling via Midstream Queue Architecture
## Status
Proposed
## Date
2026-03-29
## Context
pi.ruv.io has experienced three outages in two days, all traced to the same root cause: **MCP SSE transport sharing the Cloud Run concurrency pool with the REST API**. Each SSE connection holds an open HTTP stream on a Cloud Run request slot indefinitely. When MCP clients disconnect and reconnect in loops (IDE restarts, network blips, SSE polyfill reconnects), they create a reconnect storm that exhausts all concurrency slots, returning 429 to every request — including health checks, scheduler jobs, and the REST API.
### Failure Timeline
| Date | Incident | Root Cause |
|------|----------|------------|
| 2026-03-28 AM | 504 Gateway Timeout | Scheduler thundering herd saturated CPU |
| 2026-03-28 PM | 503 "Service is disabled" | GFE marked service unavailable after cascading timeouts |
| 2026-03-29 AM | OOM crash → partial data load | 5,898 memories + 12.5M edge graph exceeded 2GB |
| 2026-03-29 PM | 429 Rate Exceeded (all endpoints) | SSE reconnect storm consumed all concurrency slots |
### Current Architecture (broken)
```
MCP Clients ──SSE──┐
Health Checks ─────┤
Scheduler Jobs ────┤── Cloud Run (single service, shared concurrency)
REST API ──────────┤ ruvbrain: 2 CPU, 4GB, concurrency=250
Browser/UI ────────┘
```
All traffic types compete for the same concurrency slots. SSE connections are long-lived (minutes to hours). REST requests are short-lived (milliseconds). Mixing them on the same concurrency pool is fundamentally broken at scale.
## Decision
### Split into three decoupled services with a Rust midstream queue
```
┌─────────────────────┐
MCP Clients ──SSE──▶│ ruvbrain-sse │──push──▶┌──────────────┐
│ (Cloud Run) │ │ │
│ Concurrency: 500 │◀─poll───│ Midstream │
│ CPU: 1, Mem: 512MB │ │ Queue │
└─────────────────────┘ │ (in-process) │
│ │
Health/Scheduler ──▶┌─────────────────────┐ │ Ring buffer │
REST API ──────────▶│ ruvbrain-api │──push──▶│ per session │
Browser/UI ────────▶│ (Cloud Run) │ │ │
│ Concurrency: 80 │◀─poll───│ │
│ CPU: 2, Mem: 4GB │ └──────────────┘
└─────────────────────┘
┌─────────────────────┐
Scheduler Jobs ───▶│ ruvbrain-worker │
│ (Cloud Run Jobs) │
│ GPU: L4 (optional) │
│ Timeout: 1hr │
└─────────────────────┘
```
### Service Separation
| Service | Purpose | Concurrency | CPU | Memory | Cost/month |
|---------|---------|-------------|-----|--------|------------|
| `ruvbrain-api` | REST API, health, status | 80 | 2 | 4 GB | ~$30 |
| `ruvbrain-sse` | MCP SSE transport only | 500 | 1 | 512 MB | ~$10 |
| `ruvbrain-worker` | Scheduler jobs (train, drift, transfer) | 1 | 2 | 4 GB | ~$15 (job-based) |
### Midstream Queue (Rust, in-process)
The queue bridges SSE and API services. It runs inside `ruvbrain-api` as a Rust module — no external dependencies (no Pub/Sub, no Redis).
```rust
/// Midstream message queue for SSE decoupling.
/// Each MCP session gets a bounded ring buffer.
/// API writes responses into the buffer; SSE service polls via internal endpoint.
pub struct MidstreamQueue {
/// Session ID → bounded ring buffer of JSON-RPC responses
sessions: DashMap<String, SessionBuffer>,
/// Maximum sessions before evicting oldest idle
max_sessions: usize,
/// Maximum messages per session buffer
buffer_capacity: usize,
}
pub struct SessionBuffer {
messages: VecDeque<String>,
created_at: Instant,
last_poll: Instant,
capacity: usize,
}
impl MidstreamQueue {
pub fn new(max_sessions: usize, buffer_capacity: usize) -> Self {
Self {
sessions: DashMap::new(),
max_sessions,
buffer_capacity,
}
}
/// Called by API when processing a JSON-RPC request for a session.
/// Pushes the response into the session's ring buffer.
pub fn push(&self, session_id: &str, message: String) -> Result<(), QueueError> {
let mut entry = self.sessions
.entry(session_id.to_string())
.or_insert_with(|| SessionBuffer::new(self.buffer_capacity));
entry.push(message);
Ok(())
}
/// Called by SSE service to drain pending messages for a session.
/// Returns all buffered messages and clears the buffer.
pub fn drain(&self, session_id: &str) -> Vec<String> {
if let Some(mut entry) = self.sessions.get_mut(session_id) {
entry.last_poll = Instant::now();
entry.messages.drain(..).collect()
} else {
Vec::new()
}
}
/// Evict sessions idle for > timeout (called periodically).
pub fn evict_idle(&self, timeout: Duration) {
let now = Instant::now();
self.sessions.retain(|_, buf| now.duration_since(buf.last_poll) < timeout);
}
}
```
### SSE Service Protocol
The `ruvbrain-sse` service is a thin proxy:
1. **Client connects** via `GET /sse` → SSE service creates session, sends `endpoint` event
2. **Client sends** JSON-RPC via `POST /messages?sessionId=X` → SSE service forwards to `ruvbrain-api` internal endpoint
3. **API processes** request, pushes response into midstream queue
4. **SSE service polls** `ruvbrain-api` at `/internal/queue/drain?sessionId=X` every 100ms (or uses Server-Sent Events from API → SSE via internal endpoint)
5. **SSE service streams** response to client
The SSE service has **no business logic** — it only manages WebSocket/SSE transport. All brain logic stays in `ruvbrain-api`.
### Worker Service (Scheduler Isolation)
Heavy scheduler jobs move to `ruvbrain-worker` as Cloud Run Jobs:
| Job | Current | New |
|-----|---------|-----|
| `brain-train` | POST to API (every 5m) | Cloud Run Job (every 5m) — reads Firestore directly |
| `brain-transfer` | POST to API (every 30m) | Cloud Run Job (every 30m) — reads Firestore directly |
| `brain-attractor` | POST to API (every 20m) | Cloud Run Job (every 20m) |
| `brain-graph` | POST to API (hourly) | Cloud Run Job (hourly) — rebuilds graph, writes back |
| `brain-drift` | POST to API (every 15m) | Stays on API (lightweight, read-only) |
| `brain-cleanup` | POST to API (daily) | Cloud Run Job (daily) |
Workers read from and write to Firestore directly. They share the same Rust crates (`mcp-brain-server::store`, `mcp-brain-server::graph`, `sona`) compiled into a separate binary.
### Internal API Endpoints (not exposed externally)
Added to `ruvbrain-api` for SSE service communication:
```
POST /internal/queue/push — SSE service forwards JSON-RPC here
GET /internal/queue/drain — SSE service polls for responses
POST /internal/session/create — SSE service registers new session
DELETE /internal/session/:id — SSE service cleans up on disconnect
```
These are authenticated via an internal service account token (Cloud Run service-to-service auth).
## Implementation Plan
### Phase 1: SSE Connection Limiting (immediate, no new services)
Add server-side SSE connection limits to the existing monolith to stop the bleeding:
```rust
/// Maximum concurrent SSE connections per instance
const MAX_SSE_CONNECTIONS: usize = 50;
/// SSE idle timeout — disconnect clients that haven't sent a message in 5 minutes
const SSE_IDLE_TIMEOUT: Duration = Duration::from_secs(300);
/// Backoff header sent on 429 to slow reconnect storms
const SSE_RETRY_AFTER: u32 = 10; // seconds
```
1. Track active SSE count with `AtomicUsize`
2. Reject new SSE connections with `429 + Retry-After: 10` when at capacity
3. Add idle timeout — disconnect SSE sessions with no activity for 5 minutes
4. Add exponential backoff hint in SSE retry field
### Phase 2: Midstream Queue Module (week 1)
Implement `MidstreamQueue` as a module in `crates/mcp-brain-server/src/midstream_queue.rs`:
1. Ring buffer per session (capacity: 64 messages)
2. Idle session eviction (5 min timeout)
3. Max 200 concurrent sessions
4. Internal drain endpoint for future SSE service
### Phase 3: Service Split (week 2)
1. Create `ruvbrain-sse` Dockerfile (thin, no graph/embeddings — just SSE + HTTP proxy)
2. Create `ruvbrain-worker` binary (scheduler jobs, shares store/graph crates)
3. Deploy both alongside existing `ruvbrain-api`
4. Update Cloud Scheduler to target worker jobs instead of API
5. Update DNS/domain mapping
### Phase 4: Worker Migration (week 3)
1. Convert `brain-train`, `brain-transfer`, `brain-attractor`, `brain-graph` to direct Firestore workers
2. Remove `/v1/pipeline/optimize` endpoint from API (workers don't need it)
3. Add write-back protocol: worker writes results to Firestore, API reads on next request
## SSE Connection Limiting (Phase 1 — ship immediately)
This is the minimum fix to stop outages while the full decoupling is built:
```rust
// In AppState:
pub sse_connections: Arc<AtomicUsize>,
// In sse_handler:
async fn sse_handler(State(state): State<AppState>) -> Result<Sse<...>, (StatusCode, String)> {
let current = state.sse_connections.load(Ordering::Relaxed);
if current >= MAX_SSE_CONNECTIONS {
return Err((
StatusCode::TOO_MANY_REQUESTS,
"SSE connection limit reached. Retry-After: 10".into(),
));
}
state.sse_connections.fetch_add(1, Ordering::Relaxed);
// ... existing SSE logic ...
// On stream close:
state.sse_connections.fetch_sub(1, Ordering::Relaxed);
}
```
## Alternatives Considered
1. **Google Cloud Pub/Sub**: External message queue between SSE and API. Adds latency (~50ms), cost ($0.40/million messages), and operational complexity. The in-process ring buffer is simpler and faster for this scale.
2. **Cloud Run WebSocket support**: Cloud Run supports WebSockets but with the same concurrency model. Doesn't solve the slot exhaustion problem.
3. **Separate domain for SSE** (`sse.pi.ruv.io`): Routes SSE to a different Cloud Run service but still shares the same codebase/binary. Partial solution — helps with concurrency isolation but doesn't decouple the business logic.
4. **Cloud Run min-instances scaling**: Set `min-instances=3` to absorb SSE load across more instances. Increases cost 3x without solving the architectural issue. Band-aid.
5. **Move to GKE**: Full Kubernetes with separate deployments per concern. Correct architecture but massive operational overhead for a single-developer project.
6. **Redis Pub/Sub**: External broker. Adds a managed service dependency. Overkill for session-scoped message passing where the in-process queue suffices.
## Cost Impact
| Component | Current | After Split |
|-----------|---------|-------------|
| API (ruvbrain) | ~$40/mo (2 CPU, 4GB, always-on) | ~$30/mo (same but less load) |
| SSE service | included above | ~$10/mo (1 CPU, 512MB) |
| Worker jobs | included above | ~$15/mo (on-demand, job-based) |
| **Total** | **~$40/mo** | **~$55/mo** |
$15/month increase for elimination of all concurrency-related outages.
## Risks
| Risk | Impact | Mitigation |
|------|--------|------------|
| SSE ↔ API latency | 100ms polling adds latency to MCP responses | Use internal SSE stream instead of polling; fallback to 50ms poll |
| Worker ↔ API cache coherence | Worker writes to Firestore; API has stale in-memory cache | API refreshes from Firestore on cache miss; add TTL to cached data |
| Increased deploy complexity | 3 services instead of 1 | Shared Dockerfile base; single `deploy_all.sh` script |
| SSE service statelessness | Session affinity needed for SSE reconnects | Cloud Run session affinity on SSE service |
## References
- [ADR-066: SSE MCP Transport](./ADR-066-sse-mcp-transport.md)
- [ADR-077: Midstream Brain Integration](./ADR-077-midstream-brain-integration.md)
- [Cloud Run concurrency model](https://cloud.google.com/run/docs/about-concurrency)
- [MCP SSE Transport specification](https://modelcontextprotocol.io/docs/concepts/transports#sse)