diff --git a/crates/ruvector-rulake/src/cache.rs b/crates/ruvector-rulake/src/cache.rs index fb7848835..83ed58139 100644 --- a/crates/ruvector-rulake/src/cache.rs +++ b/crates/ruvector-rulake/src/cache.rs @@ -24,6 +24,19 @@ //! swap the pointer for free). Under `Consistency::Eventual` the //! witness check is skipped for up to `ttl_ms` after the last //! successful check. +//! +//! ## Key interning (memory-audit finding #1) +//! +//! The hot path used to clone `(String, String)` keys across every +//! `mark_hit` / `mark_miss` / `per_backend_mut` call — ≈96 B/query at +//! federated fan-out with 3 K calls per query. We now intern the +//! backend id and collection id into `Arc` once per incoming +//! `RuLake` call ([`InternedKey`]); every downstream op moves through +//! cheap refcount bumps instead of `String::clone` + hashmap rehashing. +//! +//! The **public** `CacheKey = (String, String)` alias is unchanged so +//! existing callers / tests compile untouched; the `Arc` world is a +//! strict implementation detail of the cache + router hot path. use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -138,8 +151,48 @@ impl PerBackendStats { } /// External lookup key: `(backend_id, collection_id)`. +/// +/// Kept as `(String, String)` for public API stability — callers pass +/// owned tuples by reference at rare diagnostic call sites +/// (`cache_witness_of`, `invalidate_cache`, …) and we intern them into +/// [`InternedKey`] on the boundary. The hot router path never allocates +/// an owned `(String, String)` — see [`InternedKey`]. pub type CacheKey = (BackendId, CollectionId); +/// Build a [`CacheKey`] from two `&str` without sprinkling +/// `.to_string()` across call sites. The public API-compat name from +/// the memory-audit follow-up. +#[inline] +pub fn make_key(backend: &str, collection: &str) -> CacheKey { + (backend.to_string(), collection.to_string()) +} + +/// Internal, refcount-cheap `(backend_id, collection_id)` key. +/// +/// The router interns once per incoming call and hands `Arc` refs +/// down through the cache — every subsequent `mark_hit` / +/// `per_backend_mut` / pointer-map lookup does a refcount bump instead +/// of a `String` clone. That closes memory-audit finding #1. +pub(crate) type InternedKey = (Arc, Arc); + +/// Intern a `(backend, collection)` pair into [`InternedKey`]. +/// +/// One allocation per `Arc` up front; every downstream op is a +/// cheap refcount bump. +#[inline] +pub(crate) fn intern_key(backend: &str, collection: &str) -> InternedKey { + (Arc::from(backend), Arc::from(collection)) +} + +/// Intern an external [`CacheKey`] into [`InternedKey`]. Used by the +/// rare public-facing entrypoints that still take `&CacheKey` +/// (`invalidate_cache`, `witness_of`, …) to cross from the owned-string +/// world into the Arc world before talking to the cache. +#[inline] +pub(crate) fn intern_cache_key(key: &CacheKey) -> InternedKey { + intern_key(&key.0, &key.1) +} + /// Internal content-addressed key: the RuLakeBundle witness (SHAKE-256 /// hex). Two bundles with the same witness are interchangeable; the /// cache deduplicates them. @@ -160,6 +213,16 @@ struct CacheEntry { last_used: Instant, /// internal-position → external id. Arc so we can share-clone /// without copying the vector on every search. + /// + /// Memory-audit finding #2 (HIGH) — this duplicates + /// `RabitqPlusIndex.ids: Vec` inside `ruvector-rabitq`. A dedup + /// would require widening rabitq's internal `ids: Vec` to + /// `Vec` (loses cache-line density) OR exposing a position + /// iterator from rabitq. Both are cross-crate changes and the + /// memory-audit follow-up explicitly scoped us to `ruvector-rulake` + /// only — so we keep the split and document it here. At n=1 M with + /// u64 ids this is 8 MB/entry; entry counts are bounded by + /// `max_entries`, so the waste is capped, not unbounded. pos_to_id: Arc>, /// How many external pointers currently resolve to this witness. /// An entry with `refcount > 0` is ineligible for LRU eviction. @@ -183,25 +246,42 @@ pub struct VectorCache { struct CacheState { /// witness → compressed index entries: HashMap, - /// (backend, collection) → witness - pointers: HashMap, - /// cache-key → last time the witness check ran (for Eventual mode) - last_checked: HashMap, - stats: CacheStats, + /// (backend, collection) → witness. Uses `Arc` keys so that + /// the hot router path never has to own a `(String, String)`. + pointers: HashMap, + /// cache-key → last time the witness check ran (for Eventual mode). + last_checked: HashMap, /// Per-backend counters. Populated lazily on the first event per /// backend id. - per_backend: HashMap, + stats: CacheStats, + per_backend: HashMap, PerBackendStats>, /// Per-`(backend, collection)` counters — same events as /// `per_backend`, attributed one level finer. - per_collection: HashMap, + per_collection: HashMap, } impl CacheState { - fn per_backend_mut(&mut self, backend: &str) -> &mut PerBackendStats { - self.per_backend.entry(backend.to_string()).or_default() + /// Look up (or insert) the per-backend counter by borrowing the + /// existing `Arc` — only creates a new `Arc` allocation + /// when the backend is seen for the first time. + fn per_backend_mut(&mut self, backend: &Arc) -> &mut PerBackendStats { + if !self.per_backend.contains_key(backend) { + self.per_backend + .insert(Arc::clone(backend), PerBackendStats::default()); + } + self.per_backend.get_mut(backend).unwrap() } - fn per_collection_mut(&mut self, key: &CacheKey) -> &mut PerBackendStats { - self.per_collection.entry(key.clone()).or_default() + /// Look up (or insert) the per-collection counter. Same pattern: + /// the `Arc` pair is cloned (refcount bumps × 2) on first-insert + /// only — subsequent events reuse the existing map entry. + fn per_collection_mut(&mut self, key: &InternedKey) -> &mut PerBackendStats { + if !self.per_collection.contains_key(key) { + self.per_collection.insert( + (Arc::clone(&key.0), Arc::clone(&key.1)), + PerBackendStats::default(), + ); + } + self.per_collection.get_mut(key).unwrap() } } @@ -248,23 +328,40 @@ impl VectorCache { /// against a given backend id — empty backends are not in the /// map. Use to see which backend is hot vs cold without having /// to attribute global counters manually. + /// + /// Converts the internal `Arc` keys back to `String` on the + /// way out so the public surface matches the pre-intern API + /// (cold-path call: one clone per entry, run once per diagnostic + /// read). pub fn stats_by_backend(&self) -> HashMap { - self.inner.lock().unwrap().per_backend.clone() + self.inner + .lock() + .unwrap() + .per_backend + .iter() + .map(|(k, v)| (k.as_ref().to_string(), v.clone())) + .collect() } /// Per-`(backend, collection)` counters. One level finer than /// `stats_by_backend`: operators who want to know "which /// collection is hot?" get exact attribution. pub fn stats_by_collection(&self) -> HashMap { - self.inner.lock().unwrap().per_collection.clone() + self.inner + .lock() + .unwrap() + .per_collection + .iter() + .map(|((b, c), v)| ((b.as_ref().to_string(), c.as_ref().to_string()), v.clone())) + .collect() } /// Compress a pulled batch into a RaBitQ index and associate it with /// the target witness. Bookkeeping happens under the lock; the /// heavy O(n·D) compression runs BEFORE acquiring the lock. - pub fn prime( + pub(crate) fn prime_interned( &self, - key: CacheKey, + key: InternedKey, witness: WitnessKey, batch: PulledBatch, ) -> crate::Result<()> { @@ -343,6 +440,18 @@ impl VectorCache { rc } + /// Public entrypoint kept for API compatibility. Interns the + /// `(String, String)` key once, then delegates to the hot path. + pub fn prime( + &self, + key: CacheKey, + witness: WitnessKey, + batch: PulledBatch, + ) -> crate::Result<()> { + let interned = intern_cache_key(&key); + self.prime_interned(interned, witness, batch) + } + /// Evict the least-recently-used unpinned entry until we're at or /// below `cap`. Pinned entries are skipped; in the worst case every /// entry is pinned and we can't evict anyone — that's by design. @@ -371,12 +480,10 @@ impl VectorCache { fn inner_install_pointer_unlocked( &self, inner: &mut CacheState, - key: CacheKey, + key: InternedKey, witness: WitnessKey, shared: bool, ) -> crate::Result<()> { - let backend_id = key.0.clone(); - let key_attrib = key.clone(); // If this key already points somewhere, decrement the old entry. if let Some(old_w) = inner.pointers.remove(&key) { if let Some(e) = inner.entries.get_mut(&old_w) { @@ -384,8 +491,8 @@ impl VectorCache { if e.refcount == 0 { inner.entries.remove(&old_w); inner.stats.invalidations += 1; - inner.per_backend_mut(&backend_id).invalidations += 1; - inner.per_collection_mut(&key_attrib).invalidations += 1; + inner.per_backend_mut(&key.0).invalidations += 1; + inner.per_collection_mut(&key).invalidations += 1; } } } @@ -394,11 +501,11 @@ impl VectorCache { e.refcount = e.refcount.saturating_add(1); e.last_checked = Instant::now(); } - inner.last_checked.insert(key, Instant::now()); + inner.last_checked.insert(key.clone(), Instant::now()); if shared { inner.stats.shared_hits += 1; - inner.per_backend_mut(&backend_id).shared_hits += 1; - inner.per_collection_mut(&key_attrib).shared_hits += 1; + inner.per_backend_mut(&key.0).shared_hits += 1; + inner.per_collection_mut(&key).shared_hits += 1; } Ok(()) } @@ -407,6 +514,11 @@ impl VectorCache { /// The underlying entry is garbage-collected when its last pointer /// goes. pub fn invalidate(&self, key: &CacheKey) { + let interned = intern_cache_key(key); + self.invalidate_interned(&interned); + } + + pub(crate) fn invalidate_interned(&self, key: &InternedKey) { let mut inner = self.inner.lock().unwrap(); if let Some(old_w) = inner.pointers.remove(key) { if let Some(e) = inner.entries.get_mut(&old_w) { @@ -423,11 +535,17 @@ impl VectorCache { } pub fn has(&self, key: &CacheKey) -> bool { - self.inner.lock().unwrap().pointers.contains_key(key) + let interned = intern_cache_key(key); + self.inner.lock().unwrap().pointers.contains_key(&interned) } /// What witness currently resolves from this key? `None` if unprimed. pub fn witness_of(&self, key: &CacheKey) -> Option { + let interned = intern_cache_key(key); + self.witness_of_interned(&interned) + } + + pub(crate) fn witness_of_interned(&self, key: &InternedKey) -> Option { self.inner.lock().unwrap().pointers.get(key).cloned() } @@ -449,18 +567,19 @@ impl VectorCache { } pub fn dim_of(&self, key: &CacheKey) -> Option { + let interned = intern_cache_key(key); let inner = self.inner.lock().unwrap(); - let w = inner.pointers.get(key)?; + let w = inner.pointers.get(&interned)?; inner.entries.get(w).map(|e| e.dim) } - pub(crate) fn mark_hit(&self, key: &CacheKey) { + pub(crate) fn mark_hit(&self, key: &InternedKey) { let mut inner = self.inner.lock().unwrap(); inner.stats.hits += 1; inner.per_backend_mut(&key.0).hits += 1; inner.per_collection_mut(key).hits += 1; } - pub(crate) fn mark_miss(&self, key: &CacheKey) { + pub(crate) fn mark_miss(&self, key: &InternedKey) { let mut inner = self.inner.lock().unwrap(); inner.stats.misses += 1; inner.per_backend_mut(&key.0).misses += 1; @@ -489,6 +608,17 @@ impl VectorCache { query: &[f32], k: usize, rerank_factor_override: Option, + ) -> crate::Result> { + let interned = intern_cache_key(key); + self.search_cached_with_rerank_interned(&interned, query, k, rerank_factor_override) + } + + pub(crate) fn search_cached_with_rerank_interned( + &self, + key: &InternedKey, + query: &[f32], + k: usize, + rerank_factor_override: Option, ) -> crate::Result> { // Look up the entry under the lock, clone the Arcs, drop the // lock, then run the scan unlocked. Concurrent readers on @@ -501,14 +631,14 @@ impl VectorCache { .pointers .get(key) .ok_or_else(|| crate::RuLakeError::UnknownCollection { - backend: key.0.clone(), - collection: key.1.clone(), + backend: key.0.as_ref().to_string(), + collection: key.1.as_ref().to_string(), })? .clone(); let entry = inner.entries.get_mut(&witness).ok_or_else(|| { crate::RuLakeError::UnknownCollection { - backend: key.0.clone(), - collection: key.1.clone(), + backend: key.0.as_ref().to_string(), + collection: key.1.as_ref().to_string(), } })?; if query.len() != entry.dim { @@ -551,6 +681,17 @@ impl VectorCache { queries: &[Vec], k: usize, rerank_factor_override: Option, + ) -> crate::Result>> { + let interned = intern_cache_key(key); + self.search_cached_batch_interned(&interned, queries, k, rerank_factor_override) + } + + pub(crate) fn search_cached_batch_interned( + &self, + key: &InternedKey, + queries: &[Vec], + k: usize, + rerank_factor_override: Option, ) -> crate::Result>> { // Lock-once pattern: validate + clone Arcs, drop the mutex, // run the N scans unlocked. Concurrent batches against the @@ -561,14 +702,14 @@ impl VectorCache { .pointers .get(key) .ok_or_else(|| crate::RuLakeError::UnknownCollection { - backend: key.0.clone(), - collection: key.1.clone(), + backend: key.0.as_ref().to_string(), + collection: key.1.as_ref().to_string(), })? .clone(); let entry = inner.entries.get_mut(&witness).ok_or_else(|| { crate::RuLakeError::UnknownCollection { - backend: key.0.clone(), - collection: key.1.clone(), + backend: key.0.as_ref().to_string(), + collection: key.1.as_ref().to_string(), } })?; let dim = entry.dim; @@ -598,11 +739,25 @@ impl VectorCache { } pub fn touch(&self, key: &CacheKey) { + let interned = intern_cache_key(key); + self.touch_interned(&interned); + } + + pub(crate) fn touch_interned(&self, key: &InternedKey) { let mut inner = self.inner.lock().unwrap(); inner.last_checked.insert(key.clone(), Instant::now()); } pub fn can_skip_check(&self, key: &CacheKey, consistency: Consistency) -> bool { + let interned = intern_cache_key(key); + self.can_skip_check_interned(&interned, consistency) + } + + pub(crate) fn can_skip_check_interned( + &self, + key: &InternedKey, + consistency: Consistency, + ) -> bool { match consistency { Consistency::Fresh => false, Consistency::Eventual { ttl_ms } => { diff --git a/crates/ruvector-rulake/src/lake.rs b/crates/ruvector-rulake/src/lake.rs index 32ff54234..b099172f4 100644 --- a/crates/ruvector-rulake/src/lake.rs +++ b/crates/ruvector-rulake/src/lake.rs @@ -406,8 +406,13 @@ impl RuLake { /// pointer, zero prime work. This is the cross-backend share. /// 4. Witness not in the pool → pull + prime. fn ensure_fresh(&self, key: &CacheKey) -> Result<()> { + // Intern once at the entry of the hot path — every downstream + // mark_hit / mark_miss / per_backend_mut call then takes + // refcount-cheap Arc clones instead of cloning the owned + // String tuple. Memory-audit finding #1. + let interned = crate::cache::intern_key(&key.0, &key.1); if self.cache.can_skip_check(key, self.consistency) { - self.cache.mark_hit(key); + self.cache.mark_hit(&interned); return Ok(()); } @@ -421,14 +426,14 @@ impl RuLake { if self.cache.witness_of(key).as_deref() == Some(target_witness.as_str()) { // Case 2: pointer up-to-date. - self.cache.mark_hit(key); + self.cache.mark_hit(&interned); self.cache.touch(key); return Ok(()); } // Cases 3 + 4 are handled in `prime`: it reuses an existing // entry for the target witness if present, or builds a new one. - self.cache.mark_miss(key); + self.cache.mark_miss(&interned); let batch = backend.pull_vectors(&key.1)?; self.cache.prime(key.clone(), target_witness, batch)?; Ok(())