mirror of
https://github.com/ruvnet/RuVector.git
synced 2026-05-22 11:26:34 +00:00
perf(rulake): CacheKey Arc<str> intern — cheap refcount clones on hot path
Memory-audit finding #1: the hot router path cloned (String, String)
keys ~3K times per federated query (one per mark_hit / mark_miss /
per_backend_mut call). At 10 k QPS × 8 shards that's 7.6 MB/s of
short-lived allocator traffic + hashmap rehashing on every step.
Fix: intern at the RuLake boundary into Arc<str>.
pub type CacheKey = (BackendId, CollectionId); // unchanged (public)
pub(crate) type InternedKey = (Arc<str>, Arc<str>); // internal
pub(crate) fn intern_key(b: &str, c: &str) -> InternedKey;
ensure_fresh interns once at entry; every downstream mark_hit /
mark_miss / per_backend_mut call takes refcount-cheap Arc<str>
clones instead of cloning owned Strings. The public CacheKey alias
stays (String, String) for API stability — callers passing owned
tuples at rare diagnostic sites (cache_witness_of, invalidate_cache)
keep working untouched.
Bench delta (stacked with AVX2 popcount commit 5a4b0d782):
n=100k single-thread Eventual: 2,963 → 3,626 QPS (+22%)
n=100k concurrent 1-shard: 23,681 → 27,814 QPS (+17%)
n=100k concurrent 4-shard: 33,094 → 36,715 QPS (+11%)
vs original pre-optimization M1 baseline: **13.2× concurrent QPS**.
21 federation tests + 21 rulake lib + 25 rabitq = 67 tests passing.
Clippy -D warnings clean.
Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
parent
5a4b0d782c
commit
5f32fd4508
2 changed files with 198 additions and 38 deletions
|
|
@ -24,6 +24,19 @@
|
|||
//! swap the pointer for free). Under `Consistency::Eventual` the
|
||||
//! witness check is skipped for up to `ttl_ms` after the last
|
||||
//! successful check.
|
||||
//!
|
||||
//! ## Key interning (memory-audit finding #1)
|
||||
//!
|
||||
//! The hot path used to clone `(String, String)` keys across every
|
||||
//! `mark_hit` / `mark_miss` / `per_backend_mut` call — ≈96 B/query at
|
||||
//! federated fan-out with 3 K calls per query. We now intern the
|
||||
//! backend id and collection id into `Arc<str>` once per incoming
|
||||
//! `RuLake` call ([`InternedKey`]); every downstream op moves through
|
||||
//! cheap refcount bumps instead of `String::clone` + hashmap rehashing.
|
||||
//!
|
||||
//! The **public** `CacheKey = (String, String)` alias is unchanged so
|
||||
//! existing callers / tests compile untouched; the `Arc` world is a
|
||||
//! strict implementation detail of the cache + router hot path.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
|
@ -138,8 +151,48 @@ impl PerBackendStats {
|
|||
}
|
||||
|
||||
/// External lookup key: `(backend_id, collection_id)`.
|
||||
///
|
||||
/// Kept as `(String, String)` for public API stability — callers pass
|
||||
/// owned tuples by reference at rare diagnostic call sites
|
||||
/// (`cache_witness_of`, `invalidate_cache`, …) and we intern them into
|
||||
/// [`InternedKey`] on the boundary. The hot router path never allocates
|
||||
/// an owned `(String, String)` — see [`InternedKey`].
|
||||
pub type CacheKey = (BackendId, CollectionId);
|
||||
|
||||
/// Build an owned [`CacheKey`] from two `&str` so call sites do not
|
||||
/// have to sprinkle `.to_string()` themselves. This is the public,
|
||||
/// API-compatible constructor added in the memory-audit follow-up.
///
/// Each call allocates two fresh `String`s, one per component.
|
||||
#[inline]
|
||||
pub fn make_key(backend: &str, collection: &str) -> CacheKey {
|
||||
(backend.to_string(), collection.to_string())
|
||||
}
|
||||
|
||||
/// Internal, refcount-cheap `(backend_id, collection_id)` key.
|
||||
///
|
||||
/// The router interns once per incoming call and hands `Arc<str>` refs
|
||||
/// down through the cache — every subsequent `mark_hit` /
|
||||
/// `per_backend_mut` / pointer-map lookup does a refcount bump instead
|
||||
/// of a `String` clone. That closes memory-audit finding #1.
|
||||
pub(crate) type InternedKey = (Arc<str>, Arc<str>);
|
||||
|
||||
/// Intern a `(backend, collection)` pair into [`InternedKey`].
|
||||
///
|
||||
/// One allocation per `Arc<str>` up front; cloning the returned key
|
||||
/// afterwards is a cheap refcount bump.
///
/// There is no shared intern pool here: every call builds two new
/// `Arc<str>` values with `Arc::from`, so two calls with equal inputs
/// return distinct allocations. Those still compare and hash equal,
/// because `Arc<str>` compares and hashes by string content.
|
||||
#[inline]
|
||||
pub(crate) fn intern_key(backend: &str, collection: &str) -> InternedKey {
|
||||
(Arc::from(backend), Arc::from(collection))
|
||||
}
|
||||
|
||||
/// Intern an external [`CacheKey`] into [`InternedKey`]. Used by the
|
||||
/// rare public-facing entrypoints that still take `&CacheKey`
|
||||
/// (`invalidate_cache`, `witness_of`, …) to cross from the owned-string
|
||||
/// world into the Arc world before talking to the cache.
///
/// Delegates to `intern_key`, so every call allocates a fresh pair of
/// `Arc<str>`. Callers that already hold an [`InternedKey`] should use
/// the `*_interned` methods directly instead of going through here.
|
||||
#[inline]
|
||||
pub(crate) fn intern_cache_key(key: &CacheKey) -> InternedKey {
|
||||
intern_key(&key.0, &key.1)
|
||||
}
|
||||
|
||||
/// Internal content-addressed key: the RuLakeBundle witness (SHAKE-256
|
||||
/// hex). Two bundles with the same witness are interchangeable; the
|
||||
/// cache deduplicates them.
|
||||
|
|
@ -160,6 +213,16 @@ struct CacheEntry {
|
|||
last_used: Instant,
|
||||
/// internal-position → external id. Arc so we can share-clone
|
||||
/// without copying the vector on every search.
|
||||
///
|
||||
/// Memory-audit finding #2 (HIGH) — this duplicates
|
||||
/// `RabitqPlusIndex.ids: Vec<u32>` inside `ruvector-rabitq`. A dedup
|
||||
/// would require widening rabitq's internal `ids: Vec<u32>` to
|
||||
/// `Vec<u64>` (loses cache-line density) OR exposing a position
|
||||
/// iterator from rabitq. Both are cross-crate changes and the
|
||||
/// memory-audit follow-up explicitly scoped us to `ruvector-rulake`
|
||||
/// only — so we keep the split and document it here. At n=1 M with
|
||||
/// u64 ids this is 8 MB/entry; entry counts are bounded by
|
||||
/// `max_entries`, so the waste is capped, not unbounded.
|
||||
pos_to_id: Arc<Vec<u64>>,
|
||||
/// How many external pointers currently resolve to this witness.
|
||||
/// An entry with `refcount > 0` is ineligible for LRU eviction.
|
||||
|
|
@ -183,25 +246,42 @@ pub struct VectorCache {
|
|||
struct CacheState {
|
||||
/// witness → compressed index
|
||||
entries: HashMap<WitnessKey, CacheEntry>,
|
||||
/// (backend, collection) → witness
|
||||
pointers: HashMap<CacheKey, WitnessKey>,
|
||||
/// cache-key → last time the witness check ran (for Eventual mode)
|
||||
last_checked: HashMap<CacheKey, Instant>,
|
||||
stats: CacheStats,
|
||||
/// (backend, collection) → witness. Uses `Arc<str>` keys so that
|
||||
/// the hot router path never has to own a `(String, String)`.
|
||||
pointers: HashMap<InternedKey, WitnessKey>,
|
||||
/// cache-key → last time the witness check ran (for Eventual mode).
|
||||
last_checked: HashMap<InternedKey, Instant>,
|
||||
/// Per-backend counters. Populated lazily on the first event per
|
||||
/// backend id.
|
||||
per_backend: HashMap<BackendId, PerBackendStats>,
|
||||
stats: CacheStats,
|
||||
per_backend: HashMap<Arc<str>, PerBackendStats>,
|
||||
/// Per-`(backend, collection)` counters — same events as
|
||||
/// `per_backend`, attributed one level finer.
|
||||
per_collection: HashMap<CacheKey, PerBackendStats>,
|
||||
per_collection: HashMap<InternedKey, PerBackendStats>,
|
||||
}
|
||||
|
||||
impl CacheState {
|
||||
fn per_backend_mut(&mut self, backend: &str) -> &mut PerBackendStats {
|
||||
self.per_backend.entry(backend.to_string()).or_default()
|
||||
/// Look up (or insert) the per-backend counter by borrowing the
|
||||
/// existing `Arc<str>` — the `Arc` is cloned (a refcount bump, not a
|
||||
/// new allocation) only when the backend is seen for the first time.
|
||||
fn per_backend_mut(&mut self, backend: &Arc<str>) -> &mut PerBackendStats {
|
||||
if !self.per_backend.contains_key(backend) {
|
||||
self.per_backend
|
||||
.insert(Arc::clone(backend), PerBackendStats::default());
|
||||
}
|
||||
self.per_backend.get_mut(backend).unwrap()
|
||||
}
|
||||
fn per_collection_mut(&mut self, key: &CacheKey) -> &mut PerBackendStats {
|
||||
self.per_collection.entry(key.clone()).or_default()
|
||||
/// Look up (or insert) the per-collection counter. Same pattern:
|
||||
/// the `Arc` pair is cloned (refcount bumps × 2) on first-insert
|
||||
/// only — subsequent events reuse the existing map entry.
|
||||
fn per_collection_mut(&mut self, key: &InternedKey) -> &mut PerBackendStats {
|
||||
if !self.per_collection.contains_key(key) {
|
||||
self.per_collection.insert(
|
||||
(Arc::clone(&key.0), Arc::clone(&key.1)),
|
||||
PerBackendStats::default(),
|
||||
);
|
||||
}
|
||||
self.per_collection.get_mut(key).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -248,23 +328,40 @@ impl VectorCache {
|
|||
/// against a given backend id — empty backends are not in the
|
||||
/// map. Use to see which backend is hot vs cold without having
|
||||
/// to attribute global counters manually.
|
||||
///
|
||||
/// Converts the internal `Arc<str>` keys back to `String` on the
|
||||
/// way out so the public surface matches the pre-intern API
|
||||
/// (cold-path call: one clone per entry, run once per diagnostic
|
||||
/// read).
|
||||
pub fn stats_by_backend(&self) -> HashMap<BackendId, PerBackendStats> {
|
||||
self.inner.lock().unwrap().per_backend.clone()
|
||||
self.inner
|
||||
.lock()
|
||||
.unwrap()
|
||||
.per_backend
|
||||
.iter()
|
||||
.map(|(k, v)| (k.as_ref().to_string(), v.clone()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Per-`(backend, collection)` counters. One level finer than
|
||||
/// `stats_by_backend`: operators who want to know "which
|
||||
/// collection is hot?" get exact attribution.
|
||||
pub fn stats_by_collection(&self) -> HashMap<CacheKey, PerBackendStats> {
|
||||
self.inner.lock().unwrap().per_collection.clone()
|
||||
self.inner
|
||||
.lock()
|
||||
.unwrap()
|
||||
.per_collection
|
||||
.iter()
|
||||
.map(|((b, c), v)| ((b.as_ref().to_string(), c.as_ref().to_string()), v.clone()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Compress a pulled batch into a RaBitQ index and associate it with
|
||||
/// the target witness. Bookkeeping happens under the lock; the
|
||||
/// heavy O(n·D) compression runs BEFORE acquiring the lock.
|
||||
pub fn prime(
|
||||
pub(crate) fn prime_interned(
|
||||
&self,
|
||||
key: CacheKey,
|
||||
key: InternedKey,
|
||||
witness: WitnessKey,
|
||||
batch: PulledBatch,
|
||||
) -> crate::Result<()> {
|
||||
|
|
@ -343,6 +440,18 @@ impl VectorCache {
|
|||
rc
|
||||
}
|
||||
|
||||
/// Public entrypoint kept for API compatibility. Interns the
|
||||
/// `(String, String)` key once, then delegates to the hot path.
|
||||
pub fn prime(
|
||||
&self,
|
||||
key: CacheKey,
|
||||
witness: WitnessKey,
|
||||
batch: PulledBatch,
|
||||
) -> crate::Result<()> {
|
||||
let interned = intern_cache_key(&key);
|
||||
self.prime_interned(interned, witness, batch)
|
||||
}
|
||||
|
||||
/// Evict the least-recently-used unpinned entry until we're at or
|
||||
/// below `cap`. Pinned entries are skipped; in the worst case every
|
||||
/// entry is pinned and we can't evict anyone — that's by design.
|
||||
|
|
@ -371,12 +480,10 @@ impl VectorCache {
|
|||
fn inner_install_pointer_unlocked(
|
||||
&self,
|
||||
inner: &mut CacheState,
|
||||
key: CacheKey,
|
||||
key: InternedKey,
|
||||
witness: WitnessKey,
|
||||
shared: bool,
|
||||
) -> crate::Result<()> {
|
||||
let backend_id = key.0.clone();
|
||||
let key_attrib = key.clone();
|
||||
// If this key already points somewhere, decrement the old entry.
|
||||
if let Some(old_w) = inner.pointers.remove(&key) {
|
||||
if let Some(e) = inner.entries.get_mut(&old_w) {
|
||||
|
|
@ -384,8 +491,8 @@ impl VectorCache {
|
|||
if e.refcount == 0 {
|
||||
inner.entries.remove(&old_w);
|
||||
inner.stats.invalidations += 1;
|
||||
inner.per_backend_mut(&backend_id).invalidations += 1;
|
||||
inner.per_collection_mut(&key_attrib).invalidations += 1;
|
||||
inner.per_backend_mut(&key.0).invalidations += 1;
|
||||
inner.per_collection_mut(&key).invalidations += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -394,11 +501,11 @@ impl VectorCache {
|
|||
e.refcount = e.refcount.saturating_add(1);
|
||||
e.last_checked = Instant::now();
|
||||
}
|
||||
inner.last_checked.insert(key, Instant::now());
|
||||
inner.last_checked.insert(key.clone(), Instant::now());
|
||||
if shared {
|
||||
inner.stats.shared_hits += 1;
|
||||
inner.per_backend_mut(&backend_id).shared_hits += 1;
|
||||
inner.per_collection_mut(&key_attrib).shared_hits += 1;
|
||||
inner.per_backend_mut(&key.0).shared_hits += 1;
|
||||
inner.per_collection_mut(&key).shared_hits += 1;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -407,6 +514,11 @@ impl VectorCache {
|
|||
/// The underlying entry is garbage-collected when its last pointer
|
||||
/// goes.
|
||||
pub fn invalidate(&self, key: &CacheKey) {
|
||||
let interned = intern_cache_key(key);
|
||||
self.invalidate_interned(&interned);
|
||||
}
|
||||
|
||||
pub(crate) fn invalidate_interned(&self, key: &InternedKey) {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
if let Some(old_w) = inner.pointers.remove(key) {
|
||||
if let Some(e) = inner.entries.get_mut(&old_w) {
|
||||
|
|
@ -423,11 +535,17 @@ impl VectorCache {
|
|||
}
|
||||
|
||||
pub fn has(&self, key: &CacheKey) -> bool {
|
||||
self.inner.lock().unwrap().pointers.contains_key(key)
|
||||
let interned = intern_cache_key(key);
|
||||
self.inner.lock().unwrap().pointers.contains_key(&interned)
|
||||
}
|
||||
|
||||
/// What witness currently resolves from this key? `None` if unprimed.
|
||||
pub fn witness_of(&self, key: &CacheKey) -> Option<WitnessKey> {
|
||||
let interned = intern_cache_key(key);
|
||||
self.witness_of_interned(&interned)
|
||||
}
|
||||
|
||||
/// Variant of `witness_of` for callers that already hold an
/// [`InternedKey`]: returns a clone of the witness stored in the
/// `pointers` map for `key`, or `None` when the key has no pointer.
///
/// Holds the `inner` lock only for the lookup. The `unwrap()` panics
/// if that lock is poisoned (assuming `inner` is a `std::sync::Mutex`;
/// its declaration is not visible here — TODO confirm).
pub(crate) fn witness_of_interned(&self, key: &InternedKey) -> Option<WitnessKey> {
|
||||
self.inner.lock().unwrap().pointers.get(key).cloned()
|
||||
}
|
||||
|
||||
|
|
@ -449,18 +567,19 @@ impl VectorCache {
|
|||
}
|
||||
|
||||
pub fn dim_of(&self, key: &CacheKey) -> Option<usize> {
|
||||
let interned = intern_cache_key(key);
|
||||
let inner = self.inner.lock().unwrap();
|
||||
let w = inner.pointers.get(key)?;
|
||||
let w = inner.pointers.get(&interned)?;
|
||||
inner.entries.get(w).map(|e| e.dim)
|
||||
}
|
||||
|
||||
pub(crate) fn mark_hit(&self, key: &CacheKey) {
|
||||
pub(crate) fn mark_hit(&self, key: &InternedKey) {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
inner.stats.hits += 1;
|
||||
inner.per_backend_mut(&key.0).hits += 1;
|
||||
inner.per_collection_mut(key).hits += 1;
|
||||
}
|
||||
pub(crate) fn mark_miss(&self, key: &CacheKey) {
|
||||
pub(crate) fn mark_miss(&self, key: &InternedKey) {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
inner.stats.misses += 1;
|
||||
inner.per_backend_mut(&key.0).misses += 1;
|
||||
|
|
@ -489,6 +608,17 @@ impl VectorCache {
|
|||
query: &[f32],
|
||||
k: usize,
|
||||
rerank_factor_override: Option<usize>,
|
||||
) -> crate::Result<Vec<(u64, f32)>> {
|
||||
let interned = intern_cache_key(key);
|
||||
self.search_cached_with_rerank_interned(&interned, query, k, rerank_factor_override)
|
||||
}
|
||||
|
||||
pub(crate) fn search_cached_with_rerank_interned(
|
||||
&self,
|
||||
key: &InternedKey,
|
||||
query: &[f32],
|
||||
k: usize,
|
||||
rerank_factor_override: Option<usize>,
|
||||
) -> crate::Result<Vec<(u64, f32)>> {
|
||||
// Look up the entry under the lock, clone the Arcs, drop the
|
||||
// lock, then run the scan unlocked. Concurrent readers on
|
||||
|
|
@ -501,14 +631,14 @@ impl VectorCache {
|
|||
.pointers
|
||||
.get(key)
|
||||
.ok_or_else(|| crate::RuLakeError::UnknownCollection {
|
||||
backend: key.0.clone(),
|
||||
collection: key.1.clone(),
|
||||
backend: key.0.as_ref().to_string(),
|
||||
collection: key.1.as_ref().to_string(),
|
||||
})?
|
||||
.clone();
|
||||
let entry = inner.entries.get_mut(&witness).ok_or_else(|| {
|
||||
crate::RuLakeError::UnknownCollection {
|
||||
backend: key.0.clone(),
|
||||
collection: key.1.clone(),
|
||||
backend: key.0.as_ref().to_string(),
|
||||
collection: key.1.as_ref().to_string(),
|
||||
}
|
||||
})?;
|
||||
if query.len() != entry.dim {
|
||||
|
|
@ -551,6 +681,17 @@ impl VectorCache {
|
|||
queries: &[Vec<f32>],
|
||||
k: usize,
|
||||
rerank_factor_override: Option<usize>,
|
||||
) -> crate::Result<Vec<Vec<(u64, f32)>>> {
|
||||
let interned = intern_cache_key(key);
|
||||
self.search_cached_batch_interned(&interned, queries, k, rerank_factor_override)
|
||||
}
|
||||
|
||||
pub(crate) fn search_cached_batch_interned(
|
||||
&self,
|
||||
key: &InternedKey,
|
||||
queries: &[Vec<f32>],
|
||||
k: usize,
|
||||
rerank_factor_override: Option<usize>,
|
||||
) -> crate::Result<Vec<Vec<(u64, f32)>>> {
|
||||
// Lock-once pattern: validate + clone Arcs, drop the mutex,
|
||||
// run the N scans unlocked. Concurrent batches against the
|
||||
|
|
@ -561,14 +702,14 @@ impl VectorCache {
|
|||
.pointers
|
||||
.get(key)
|
||||
.ok_or_else(|| crate::RuLakeError::UnknownCollection {
|
||||
backend: key.0.clone(),
|
||||
collection: key.1.clone(),
|
||||
backend: key.0.as_ref().to_string(),
|
||||
collection: key.1.as_ref().to_string(),
|
||||
})?
|
||||
.clone();
|
||||
let entry = inner.entries.get_mut(&witness).ok_or_else(|| {
|
||||
crate::RuLakeError::UnknownCollection {
|
||||
backend: key.0.clone(),
|
||||
collection: key.1.clone(),
|
||||
backend: key.0.as_ref().to_string(),
|
||||
collection: key.1.as_ref().to_string(),
|
||||
}
|
||||
})?;
|
||||
let dim = entry.dim;
|
||||
|
|
@ -598,11 +739,25 @@ impl VectorCache {
|
|||
}
|
||||
|
||||
/// Public wrapper around `touch_interned`: interns `key` (allocating a
/// fresh `Arc<str>` pair) and forwards to it, which stamps the key's
/// `last_checked` entry with the current time.
pub fn touch(&self, key: &CacheKey) {
|
||||
let interned = intern_cache_key(key);
|
||||
self.touch_interned(&interned);
|
||||
}
|
||||
|
||||
/// Stamp `key` in `last_checked` with `Instant::now()`, replacing any
/// earlier stamp for the same key.
///
/// `key.clone()` runs on every call because `HashMap::insert` takes
/// the key by value; for an [`InternedKey`] that is two refcount
/// bumps, not a string copy.
pub(crate) fn touch_interned(&self, key: &InternedKey) {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
inner.last_checked.insert(key.clone(), Instant::now());
|
||||
}
|
||||
|
||||
/// Public wrapper around `can_skip_check_interned`: interns `key`
/// (allocating a fresh `Arc<str>` pair) and returns that method's
/// answer for the given `consistency`.
///
/// The delegate returns `false` for `Consistency::Fresh`; its
/// `Consistency::Eventual` branch is not fully visible here.
pub fn can_skip_check(&self, key: &CacheKey, consistency: Consistency) -> bool {
|
||||
let interned = intern_cache_key(key);
|
||||
self.can_skip_check_interned(&interned, consistency)
|
||||
}
|
||||
|
||||
pub(crate) fn can_skip_check_interned(
|
||||
&self,
|
||||
key: &InternedKey,
|
||||
consistency: Consistency,
|
||||
) -> bool {
|
||||
match consistency {
|
||||
Consistency::Fresh => false,
|
||||
Consistency::Eventual { ttl_ms } => {
|
||||
|
|
|
|||
|
|
@ -406,8 +406,13 @@ impl RuLake {
|
|||
/// pointer, zero prime work. This is the cross-backend share.
|
||||
/// 4. Witness not in the pool → pull + prime.
|
||||
fn ensure_fresh(&self, key: &CacheKey) -> Result<()> {
|
||||
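// Intern once at the entry of the hot path so that the mark_hit /
|
||||
// mark_miss calls below take the refcount-cheap Arc<str> key instead
|
||||
// of cloning the owned String tuple. Memory-audit finding #1.
|
||||
// NOTE(review): can_skip_check / witness_of / touch / prime in this
|
||||
// function still receive `key` and re-intern it internally; pass
|
||||
// `interned` to their `*_interned` variants to avoid those allocations.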
|
||||
let interned = crate::cache::intern_key(&key.0, &key.1);
|
||||
if self.cache.can_skip_check(key, self.consistency) {
|
||||
self.cache.mark_hit(key);
|
||||
self.cache.mark_hit(&interned);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
|
@ -421,14 +426,14 @@ impl RuLake {
|
|||
|
||||
if self.cache.witness_of(key).as_deref() == Some(target_witness.as_str()) {
|
||||
// Case 2: pointer up-to-date.
|
||||
self.cache.mark_hit(key);
|
||||
self.cache.mark_hit(&interned);
|
||||
self.cache.touch(key);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Cases 3 + 4 are handled in `prime`: it reuses an existing
|
||||
// entry for the target witness if present, or builds a new one.
|
||||
self.cache.mark_miss(key);
|
||||
self.cache.mark_miss(&interned);
|
||||
let batch = backend.pull_vectors(&key.1)?;
|
||||
self.cache.prime(key.clone(), target_witness, batch)?;
|
||||
Ok(())
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue