diff --git a/crates/ruvector-rabitq/src/kernel.rs b/crates/ruvector-rabitq/src/kernel.rs new file mode 100644 index 00000000..bd3b37e4 --- /dev/null +++ b/crates/ruvector-rabitq/src/kernel.rs @@ -0,0 +1,205 @@ +//! `VectorKernel` trait — the pluggable execution backend for RaBitQ +//! scan + rerank. Defined here (ADR-157 §"Where each piece lives") +//! because kernels are RaBitQ primitives; the cache is a consumer. +//! +//! Ships with one implementation — `CpuKernel` — which delegates to +//! the existing `RabitqPlusIndex::search` / `search_with_rerank`. GPU / SIMD / +//! WASM kernels live in separate crates (`ruvector-rabitq-cuda` etc.) +//! and register themselves with the caller (e.g. `ruvector-rulake`'s +//! dispatcher) as optional accelerators. +//! +//! ## Determinism contract +//! +//! Scan-phase output (top-k by 1-bit Hamming distance) must be +//! bit-reproducible across every kernel. Rerank-phase output (exact +//! L2²) may differ in the last ulp on reduction-order-sensitive +//! kernels (GPU with float reduction reorder); these set +//! `caps().deterministic = false`, and the caller's dispatch policy +//! filters them out of `Consistency::Fresh` / `Consistency::Frozen` +//! paths. +//! +//! The witness chain is NOT recomputed per kernel; it stays anchored +//! on `(data_ref, dim, rotation_seed, rerank_factor, generation)`. +//! Kernel identity is surfaced in caps + stats, not in the witness. + +use crate::index::{AnnIndex, RabitqPlusIndex, SearchResult}; +use crate::RabitqError; + +/// Capability advertisement for a vector kernel. The caller's +/// dispatch policy compares these against the request to pick the +/// best kernel for a given batch + determinism requirement. +#[derive(Debug, Clone)] +pub struct KernelCaps { + /// Symbolic accelerator label: "cpu", "cpu-simd", "cuda", + /// "metal", "rocm", "wasm-simd", etc. Surfaced in stats. + pub accelerator: &'static str, + /// Minimum batch size at which this kernel is ever chosen. 
CPU + /// kernels report 1; GPU kernels typically ≥ 64. + pub min_batch: usize, + /// Maximum dimensionality the kernel supports without falling + /// back to a slower path. `usize::MAX` means "no constraint". + pub max_dim: usize, + /// Does the kernel produce byte-identical output (scan + rerank) + /// vs the reference CPU kernel? Only deterministic kernels can + /// feed witness-sealed outputs under Fresh/Frozen consistency. + pub deterministic: bool, +} + +impl KernelCaps { + /// Default CPU caps: available always, deterministic, no dim cap. + pub const fn cpu_default() -> Self { + Self { + accelerator: "cpu", + min_batch: 1, + max_dim: usize::MAX, + deterministic: true, + } + } +} + +/// A batch of query vectors against a single index. The index is +/// borrowed by reference so GPU kernels don't need to own its +/// lifetime — the cache holds the authoritative copy. +pub struct ScanRequest<'a> { + pub index: &'a RabitqPlusIndex, + pub queries: &'a [Vec], + pub k: usize, + /// Optional per-call rerank factor. `None` uses the index's stored + /// default. Used by `ruvector-rulake` to divide rerank cost + /// across K shards (ADR-155 federation path). + pub rerank_factor: Option, +} + +/// Batched top-k results, one `Vec` per query. Order +/// matches the input `queries`. +pub type ScanResponse = Vec>; + +/// A vector kernel executes scan + exact rerank for one or more +/// queries against a compressed RaBitQ index. +/// +/// Implementations are stateless w.r.t. the index — they receive it +/// by reference on every call, so a single kernel instance can serve +/// many caches / collections concurrently. Concrete GPU kernels may +/// carry a driver handle or stream object; that's kernel state, not +/// index state. +pub trait VectorKernel: Send + Sync { + /// Stable identifier surfaced in stats + logs. Must be unique per + /// kernel type (e.g. `"cpu"`, `"cuda:0"`, `"metal"`). + fn id(&self) -> &str; + + /// Capability advertisement — what this kernel can do. 
Return a + /// fresh struct (not a static reference) so kernels can narrow + /// caps at runtime (e.g. GPU-down → `min_batch = usize::MAX`). + fn caps(&self) -> KernelCaps; + + /// Run the scan + rerank for every query in `req`. Returns one + /// `Vec` per query, in the input order. + fn scan(&self, req: ScanRequest<'_>) -> Result; +} + +/// Reference CPU kernel. Wraps `RabitqPlusIndex::search` / `search_with_rerank`. +/// Deterministic by construction (integer popcount scan + stable +/// exact L2² rerank with total-order tie break via position). +/// +/// This is the default kernel every consumer gets for free; GPU / +/// SIMD implementations plug in alongside it via registration. +#[derive(Debug, Default, Clone, Copy)] +pub struct CpuKernel; + +impl CpuKernel { + pub const fn new() -> Self { + Self + } +} + +impl VectorKernel for CpuKernel { + fn id(&self) -> &str { + "cpu" + } + + fn caps(&self) -> KernelCaps { + KernelCaps::cpu_default() + } + + fn scan(&self, req: ScanRequest<'_>) -> Result { + let mut out = Vec::with_capacity(req.queries.len()); + for q in req.queries { + let hits = match req.rerank_factor { + None => req.index.search(q, req.k)?, + Some(rf) => req.index.search_with_rerank(q, req.k, rf)?, + }; + out.push(hits); + } + Ok(out) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn tiny_index() -> RabitqPlusIndex { + let d = 8; + let mut idx = RabitqPlusIndex::new(d, 42, 5); + for i in 0..16 { + let v: Vec = (0..d).map(|j| (i + j) as f32).collect(); + idx.add(i, v).unwrap(); + } + idx + } + + #[test] + fn cpu_kernel_matches_direct_search() { + let idx = tiny_index(); + let kernel = CpuKernel::new(); + let q: Vec = vec![2.0; 8]; + let direct = idx.search(&q, 4).unwrap(); + let batched = kernel + .scan(ScanRequest { + index: &idx, + queries: std::slice::from_ref(&q), + k: 4, + rerank_factor: None, + }) + .unwrap(); + assert_eq!(batched.len(), 1); + let batch = &batched[0]; + assert_eq!(batch.len(), direct.len()); + for (a, b) in 
batch.iter().zip(direct.iter()) { + assert_eq!(a.id, b.id); + assert!((a.score - b.score).abs() < 1e-5); + } + } + + #[test] + fn cpu_kernel_respects_rerank_override() { + let idx = tiny_index(); + let kernel = CpuKernel::new(); + let q: Vec = vec![2.0; 8]; + // Override with a smaller rerank factor — results should still + // be sorted (only batch shape and ordering are asserted here). + let out = kernel + .scan(ScanRequest { + index: &idx, + queries: &[q.clone(), q.clone()], + k: 3, + rerank_factor: Some(2), + }) + .unwrap(); + assert_eq!(out.len(), 2, "one result vec per input query"); + for v in &out { + for w in v.windows(2) { + assert!(w[0].score <= w[1].score, "hits must be sorted"); + } + } + } + + #[test] + fn cpu_caps_are_deterministic_and_unbounded() { + let c = CpuKernel::new().caps(); + assert_eq!(c.accelerator, "cpu"); + assert_eq!(c.min_batch, 1); + assert_eq!(c.max_dim, usize::MAX); + assert!(c.deterministic); + } +} diff --git a/crates/ruvector-rabitq/src/lib.rs b/crates/ruvector-rabitq/src/lib.rs index c4e96706..286db5b5 100644 --- a/crates/ruvector-rabitq/src/lib.rs +++ b/crates/ruvector-rabitq/src/lib.rs @@ -44,6 +44,7 @@ pub mod error; pub mod index; +pub mod kernel; pub mod quantize; pub mod rotation; @@ -51,5 +52,6 @@ pub use error::RabitqError; pub use index::{ AnnIndex, FlatF32Index, RabitqAsymIndex, RabitqIndex, RabitqPlusIndex, SearchResult, }; +pub use kernel::{CpuKernel, KernelCaps, ScanRequest, ScanResponse, VectorKernel}; pub use quantize::{pack_bits, unpack_bits, BinaryCode}; pub use rotation::RandomRotation; diff --git a/crates/ruvector-rulake/examples/sidecar_daemon.rs b/crates/ruvector-rulake/examples/sidecar_daemon.rs new file mode 100644 index 00000000..32e98cf0 --- /dev/null +++ b/crates/ruvector-rulake/examples/sidecar_daemon.rs @@ -0,0 +1,134 @@ +//! Minimal cache-sidecar daemon demonstrating the bundle protocol +//! (ADR-155 §Consequences). +//! +//! Watches a publish directory for updates to `table.rulake.json` +//! 
and calls `RuLake::refresh_from_bundle_dir` to keep the reader +//! cache coherent with whatever the publisher has signed off. +//! +//! Run with: +//! cargo run --release -p ruvector-rulake --example sidecar_daemon +//! +//! The example simulates a full deployment in one process: +//! - A "publisher" backend with a collection of vectors +//! - A "reader" RuLake serving queries against the same collection +//! - The publisher updates the backend and emits a new bundle +//! - The reader's daemon polls, detects the witness change, +//! invalidates, and the next query re-primes automatically +//! +//! A real deployment replaces the polling loop with an inotify +//! watcher or GCS object-change notification; the shape is identical. + +use std::sync::Arc; +use std::thread; +use std::time::{Duration, Instant}; + +use ruvector_rulake::{cache::Consistency, LocalBackend, RefreshResult, RuLake}; + +fn main() { + println!("=== ruLake sidecar daemon demo ===\n"); + + // Shared publish directory. In production this would be a GCS + // mount, an S3FS prefix, or an NFS share. 
+ let publish_dir = + std::env::temp_dir().join(format!("rulake-sidecar-demo-{}", std::process::id())); + std::fs::create_dir_all(&publish_dir).unwrap(); + println!("Publish directory: {}", publish_dir.display()); + + // --- Publisher side --- + let backend = Arc::new(LocalBackend::new("publisher")); + backend + .put_collection( + "memories", + 8, + (0..100).collect(), + (0..100) + .map(|i| (0..8).map(|j| (i + j) as f32 * 0.1).collect()) + .collect(), + ) + .unwrap(); + + let publisher_lake = RuLake::new(20, 42); + publisher_lake.register_backend(backend.clone()).unwrap(); + let key = ("publisher".to_string(), "memories".to_string()); + + publisher_lake.publish_bundle(&key, &publish_dir).unwrap(); + println!("[publisher] emitted initial table.rulake.json"); + + // --- Reader side --- + let reader_lake = + Arc::new(RuLake::new(20, 42).with_consistency(Consistency::Eventual { ttl_ms: 60_000 })); + reader_lake.register_backend(backend.clone()).unwrap(); + + // --- Daemon thread (polling-based) --- + let daemon_lake = Arc::clone(&reader_lake); + let daemon_dir = publish_dir.clone(); + let daemon_key = key.clone(); + let daemon_stop = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let stop_handle = Arc::clone(&daemon_stop); + let daemon = thread::spawn(move || { + let poll_every = Duration::from_millis(100); + let deadline = Instant::now() + Duration::from_secs(5); + while !stop_handle.load(std::sync::atomic::Ordering::Relaxed) && Instant::now() < deadline { + match daemon_lake.refresh_from_bundle_dir(&daemon_key, &daemon_dir) { + Ok(RefreshResult::Invalidated) => { + println!("[daemon] bundle rotated — cache invalidated"); + } + Ok(RefreshResult::UpToDate) => {} // quiet + Ok(RefreshResult::BundleMissing) => { + eprintln!("[daemon] sidecar missing at publish dir"); + } + Err(e) => eprintln!("[daemon] refresh error: {e}"), + } + thread::sleep(poll_every); + } + }); + + // --- Workload: read a bunch, then publisher mutates, then read more --- + let q = 
vec![0.5f32; 8]; + + for _ in 0..10 { + let _ = reader_lake + .search_one("publisher", "memories", &q, 3) + .unwrap(); + } + let stats1 = reader_lake.cache_stats(); + println!( + "[reader] after 10 warm queries: hit_rate={:.3} primes={}", + stats1.hit_rate().unwrap_or(0.0), + stats1.primes + ); + + // Publisher side-effect: the backend data changed. Re-publish. + backend.append("memories", 999, vec![9.0; 8]).unwrap(); + publisher_lake.publish_bundle(&key, &publish_dir).unwrap(); + println!("[publisher] mutated backend + re-published bundle"); + + // Give the daemon a few poll cycles to see it. + thread::sleep(Duration::from_millis(300)); + + // Next queries: first one re-primes (daemon dropped the pointer), + // the rest are warm again. + for _ in 0..10 { + let _ = reader_lake + .search_one("publisher", "memories", &q, 3) + .unwrap(); + } + let stats2 = reader_lake.cache_stats(); + println!( + "[reader] after mutation: hit_rate={:.3} primes={} invalidations={}", + stats2.hit_rate().unwrap_or(0.0), + stats2.primes, + stats2.invalidations + ); + + assert!( + stats2.primes > stats1.primes, + "daemon should have triggered a re-prime" + ); + + daemon_stop.store(true, std::sync::atomic::Ordering::Relaxed); + daemon.join().unwrap(); + + let _ = std::fs::remove_dir_all(&publish_dir); + println!("\n✓ daemon + reader stayed coherent through the bundle rotation"); +} diff --git a/crates/ruvector-rulake/src/bundle.rs b/crates/ruvector-rulake/src/bundle.rs index 9fe76fff..ff1808f1 100644 --- a/crates/ruvector-rulake/src/bundle.rs +++ b/crates/ruvector-rulake/src/bundle.rs @@ -119,6 +119,19 @@ pub struct RuLakeBundle { /// OpenLineage job id that produced this bundle. pub lineage_id: Option, + + /// Caller-defined memory-class tag (ADR-156 substrate framing). 
+ /// Agent systems tag bundles with cognitive labels like + /// `"episodic"` / `"semantic"` / `"procedural"` / `"identity"`; + /// ruLake stores the string and surfaces it through bundles and + /// stats but never interprets it. Opaque by design — brain + /// systems own the semantics, substrate owns the persistence. + /// + /// Not part of the witness: changing the memory-class tag on the + /// same vectors does not invalidate a cache entry. Two bundles + /// with identical data but different classes share the cache. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub memory_class: Option, } impl RuLakeBundle { @@ -144,6 +157,7 @@ impl RuLakeBundle { rvf_witness: witness, pii_policy: None, lineage_id: None, + memory_class: None, } } @@ -191,6 +205,13 @@ impl RuLakeBundle { self } + /// Tag this bundle with a memory class (ADR-156). Opaque to + /// ruLake; meaningful to the consuming brain system. + pub fn with_memory_class(mut self, class: impl Into) -> Self { + self.memory_class = Some(class.into()); + self + } + /// Canonical filename for the on-disk bundle sidecar. pub const SIDECAR_FILENAME: &'static str = "table.rulake.json"; @@ -431,6 +452,27 @@ mod tests { let _ = std::fs::remove_dir_all(&dir); } + #[test] + fn memory_class_roundtrips_and_does_not_affect_witness() { + let plain = RuLakeBundle::new("x", 8, 1, 5, Generation::Num(1)); + let tagged = + RuLakeBundle::new("x", 8, 1, 5, Generation::Num(1)).with_memory_class("episodic"); + // Witnesses match — class is not part of the digest. + assert_eq!(plain.rvf_witness, tagged.rvf_witness); + // Serde roundtrip preserves the tag. + let s = tagged.to_json().unwrap(); + let parsed = RuLakeBundle::from_json(&s).unwrap(); + assert_eq!(parsed.memory_class.as_deref(), Some("episodic")); + // Old-format (no memory_class field) still parses. 
+ let old_format = r#"{"format_version":1,"data_ref":"x","dim":8,"rotation_seed":1,"rerank_factor":5,"generation":1,"rvf_witness":""#; + let legacy = format!( + "{}{}\",\"pii_policy\":null,\"lineage_id\":null}}", + old_format, plain.rvf_witness + ); + let loaded = RuLakeBundle::from_json(&legacy).unwrap(); + assert_eq!(loaded.memory_class, None); + } + #[test] fn fs_write_is_atomic_under_crash_simulation() { // Simulate a crash between tmp-create and rename by writing a diff --git a/crates/ruvector-rulake/src/cache.rs b/crates/ruvector-rulake/src/cache.rs index 5c78e785..e23d4d64 100644 --- a/crates/ruvector-rulake/src/cache.rs +++ b/crates/ruvector-rulake/src/cache.rs @@ -185,12 +185,18 @@ struct CacheState { /// Per-backend counters. Populated lazily on the first event per /// backend id. per_backend: HashMap, + /// Per-`(backend, collection)` counters — same events as + /// `per_backend`, attributed one level finer. + per_collection: HashMap, } impl CacheState { fn per_backend_mut(&mut self, backend: &str) -> &mut PerBackendStats { self.per_backend.entry(backend.to_string()).or_default() } + fn per_collection_mut(&mut self, key: &CacheKey) -> &mut PerBackendStats { + self.per_collection.entry(key.clone()).or_default() + } } impl VectorCache { @@ -202,6 +208,7 @@ impl VectorCache { last_checked: HashMap::new(), stats: CacheStats::default(), per_backend: HashMap::new(), + per_collection: HashMap::new(), })), rerank_factor, rotation_seed, @@ -239,6 +246,13 @@ impl VectorCache { self.inner.lock().unwrap().per_backend.clone() } + /// Per-`(backend, collection)` counters. One level finer than + /// `stats_by_backend`: operators who want to know "which + /// collection is hot?" get exact attribution. + pub fn stats_by_collection(&self) -> HashMap { + self.inner.lock().unwrap().per_collection.clone() + } + /// Compress a pulled batch into a RaBitQ index and associate it with /// the target witness. 
Bookkeeping happens under the lock; the /// heavy O(n·D) compression runs BEFORE acquiring the lock. @@ -286,6 +300,7 @@ impl VectorCache { inner.entries.insert(witness.clone(), entry); inner.stats.primes += 1; inner.per_backend_mut(&key.0).primes += 1; + inner.per_collection_mut(&key).primes += 1; let prime_ms = prime_start.elapsed().as_secs_f64() * 1000.0; inner.stats.total_prime_ms += prime_ms; inner.stats.last_prime_ms = prime_ms; @@ -332,6 +347,7 @@ impl VectorCache { shared: bool, ) -> crate::Result<()> { let backend_id = key.0.clone(); + let key_attrib = key.clone(); // If this key already points somewhere, decrement the old entry. if let Some(old_w) = inner.pointers.remove(&key) { if let Some(e) = inner.entries.get_mut(&old_w) { @@ -340,6 +356,7 @@ impl VectorCache { inner.entries.remove(&old_w); inner.stats.invalidations += 1; inner.per_backend_mut(&backend_id).invalidations += 1; + inner.per_collection_mut(&key_attrib).invalidations += 1; } } } @@ -352,6 +369,7 @@ impl VectorCache { if shared { inner.stats.shared_hits += 1; inner.per_backend_mut(&backend_id).shared_hits += 1; + inner.per_collection_mut(&key_attrib).shared_hits += 1; } Ok(()) } @@ -370,6 +388,7 @@ impl VectorCache { } inner.stats.invalidations += 1; inner.per_backend_mut(&key.0).invalidations += 1; + inner.per_collection_mut(key).invalidations += 1; } inner.last_checked.remove(key); } @@ -410,11 +429,13 @@ impl VectorCache { let mut inner = self.inner.lock().unwrap(); inner.stats.hits += 1; inner.per_backend_mut(&key.0).hits += 1; + inner.per_collection_mut(key).hits += 1; } pub(crate) fn mark_miss(&self, key: &CacheKey) { let mut inner = self.inner.lock().unwrap(); inner.stats.misses += 1; inner.per_backend_mut(&key.0).misses += 1; + inner.per_collection_mut(key).misses += 1; } /// Run the search against the cached entry for `key`. 
Caller must diff --git a/crates/ruvector-rulake/src/lake.rs b/crates/ruvector-rulake/src/lake.rs index a194bf6f..547ec523 100644 --- a/crates/ruvector-rulake/src/lake.rs +++ b/crates/ruvector-rulake/src/lake.rs @@ -110,6 +110,15 @@ impl RuLake { self.cache.stats_by_backend() } + /// Per-`(backend, collection)` cache stats. One level finer than + /// `cache_stats_by_backend`. Operators use this to identify hot + /// collections for per-collection LRU pinning or eviction. + pub fn cache_stats_by_collection( + &self, + ) -> std::collections::HashMap { + self.cache.stats_by_collection() + } + /// What witness resolves from a `(backend, collection)` pair? /// Useful for diagnostics and cross-backend cache-sharing tests. pub fn cache_witness_of(&self, key: &CacheKey) -> Option { @@ -190,11 +199,21 @@ impl RuLake { } let bundle = crate::RuLakeBundle::read_from_dir(dir)?; let current = self.cache.witness_of(key); - if current.as_deref() == Some(bundle.rvf_witness.as_str()) { - Ok(RefreshResult::UpToDate) - } else { - self.cache.invalidate(key); - Ok(RefreshResult::Invalidated) + match current.as_deref() { + Some(w) if w == bundle.rvf_witness.as_str() => Ok(RefreshResult::UpToDate), + Some(_) => { + // Live cache pointer, but it disagrees with the + // published bundle → rotate it out. + self.cache.invalidate(key); + Ok(RefreshResult::Invalidated) + } + None => { + // Cache pointer is empty. Nothing to invalidate; the + // next search will prime transparently. Report + // UpToDate so daemons don't re-fire for every poll + // between "we invalidated" and "somebody queried". + Ok(RefreshResult::UpToDate) + } } }