feat(rabitq,rulake): VectorKernel + memory_class + per-collection stats + sidecar example

Four in-scope M1 items from the remaining backlog, landed together
because they cross-cut cleanly.

Iter 23 (rabitq): VectorKernel trait + CpuKernel default
  - Trait: id(), caps() → KernelCaps, scan(ScanRequest) → ScanResponse.
    Scan-phase determinism is the hard contract; rerank-phase
    nondeterminism is declared via caps().deterministic = false, and
    the caller's dispatch policy filters those kernels out of
    Fresh/Frozen paths (ADR-157).
  - CpuKernel wraps RabitqPlusIndex::search_with_rerank, always
    available, unbounded dim, deterministic.
  - Tests: CPU kernel matches direct search (same ids, scores within
    1e-5) + respects per-call rerank override + caps advertised
    correctly.

Iter 24 (rulake): memory_class on RuLakeBundle (ADR-156)
  - Opaque caller-defined tag — agent systems write "episodic" /
    "semantic" / etc; ruLake stores but never interprets.
  - Not part of the witness: two bundles with identical data but
    different memory_class share the cache.
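  - Serde default + skip_serializing_if = "Option::is_none" keeps
    the format backward-compatible: old bundles without the field
    still parse, and untagged bundles serialize without it.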
  - Test: roundtrip + witness-unchanged + legacy bundles without the
    field still parse.
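
A minimal sketch of the "not part of the witness" rule, assuming a
simplified `Bundle` stand-in and `DefaultHasher` in place of the real
digest (both are illustrative, not ruLake's actual types). The hashed
fields follow the witness anchor named in the kernel module docs:
(data_ref, dim, rotation_seed, rerank_factor, generation).

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Simplified stand-in for `RuLakeBundle`; field names follow the commit.
#[derive(Debug, Clone)]
pub struct Bundle {
    pub data_ref: String,
    pub dim: usize,
    pub rotation_seed: u64,
    pub rerank_factor: usize,
    pub generation: u64,
    pub memory_class: Option<String>,
}

impl Bundle {
    /// Illustrative witness: hashes only the cache-relevant fields.
    /// `DefaultHasher` is an assumption for this sketch, not the real digest.
    pub fn witness(&self) -> u64 {
        let mut h = DefaultHasher::new();
        self.data_ref.hash(&mut h);
        self.dim.hash(&mut h);
        self.rotation_seed.hash(&mut h);
        self.rerank_factor.hash(&mut h);
        self.generation.hash(&mut h);
        // memory_class is deliberately left out of the digest.
        h.finish()
    }

    /// Builder-style tag setter, same shape as the one in this commit.
    pub fn with_memory_class(mut self, class: impl Into<String>) -> Self {
        self.memory_class = Some(class.into());
        self
    }
}

fn main() {
    let plain = Bundle {
        data_ref: "x".into(),
        dim: 8,
        rotation_seed: 1,
        rerank_factor: 5,
        generation: 1,
        memory_class: None,
    };
    let tagged = plain.clone().with_memory_class("episodic");
    println!("same witness: {}", plain.witness() == tagged.witness());
}
```

Because the tag never reaches the hasher, retagging a bundle cannot
change its witness, which is why two differently tagged bundles can
share one cache entry.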

Iter 25 (rulake): examples/sidecar_daemon.rs
  - Runnable demo of publish_bundle / refresh_from_bundle_dir pair.
  - Publisher mutates backend + re-publishes; daemon poll loop
    detects witness change, invalidates; next query re-primes.
  - Includes a bug fix in refresh_from_bundle_dir: when the cache
    pointer is None (already invalidated), report UpToDate instead
    of Invalidated so daemons don't re-fire on every poll between
    "we invalidated" and "somebody queried."

Iter 26 (rulake): CacheStats::stats_by_collection
  - Per-(backend, collection) counters, one level finer than
    stats_by_backend. Operators can identify which specific
    collection is hot and pin it in LRU or increase its shard count.
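
A rough sketch of the double attribution described above, assuming
simplified stand-ins (`Key`, `Counters`, `Stats`) and a hypothetical
`hottest` helper; none of these names are the crate's own. Every event
bumps both the per-backend and the per-collection counter.

```rust
use std::collections::HashMap;

/// Stand-in for `CacheKey`: (backend id, collection name).
pub type Key = (String, String);

/// Trimmed-down counter set; the real stats carry more fields.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Counters {
    pub hits: u64,
    pub misses: u64,
}

#[derive(Debug, Default)]
pub struct Stats {
    pub per_backend: HashMap<String, Counters>,
    pub per_collection: HashMap<Key, Counters>,
}

impl Stats {
    /// Record a hit at both attribution levels.
    pub fn mark_hit(&mut self, key: &Key) {
        self.per_backend.entry(key.0.clone()).or_default().hits += 1;
        self.per_collection.entry(key.clone()).or_default().hits += 1;
    }

    /// Record a miss at both attribution levels.
    pub fn mark_miss(&mut self, key: &Key) {
        self.per_backend.entry(key.0.clone()).or_default().misses += 1;
        self.per_collection.entry(key.clone()).or_default().misses += 1;
    }

    /// Hypothetical helper: the collection with the most hits so far.
    pub fn hottest(&self) -> Option<&Key> {
        self.per_collection
            .iter()
            .max_by_key(|(_, c)| c.hits)
            .map(|(k, _)| k)
    }
}

fn main() {
    let mut stats = Stats::default();
    let hot = ("b1".to_string(), "hot".to_string());
    let cold = ("b1".to_string(), "cold".to_string());
    for _ in 0..5 {
        stats.mark_hit(&hot);
    }
    stats.mark_miss(&cold);
    println!("hottest: {:?}", stats.hottest());
}
```

The per-backend view alone would report one busy backend here; only
the per-collection map shows which of its collections takes the load.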

21 federation + 11 bundle + 3 fs_backend + 3 kernel = 38 tests
passing across both crates. Clippy -D warnings clean. Example runs
end-to-end.

Co-Authored-By: claude-flow <ruv@ruv.net>
ruvnet 2026-04-23 21:27:04 -04:00
parent 39110f09d9
commit 2bdfd342e3
6 changed files with 428 additions and 5 deletions


@ -0,0 +1,205 @@
//! `VectorKernel` trait — the pluggable execution backend for RaBitQ
//! scan + rerank. Defined here (ADR-157 §"Where each piece lives")
//! because kernels are RaBitQ primitives; the cache is a consumer.
//!
//! Ships with one implementation — `CpuKernel` — which delegates to
//! the existing `RabitqPlusIndex::search_with_rerank`. GPU / SIMD /
//! WASM kernels live in separate crates (`ruvector-rabitq-cuda` etc.)
//! and register themselves with the caller (e.g. `ruvector-rulake`'s
//! dispatcher) as optional accelerators.
//!
//! ## Determinism contract
//!
//! Scan-phase output (top-k by 1-bit Hamming distance) must be
//! bit-reproducible across every kernel. Rerank-phase output (exact
//! L2²) may differ in the last ulp on reduction-order-sensitive
//! kernels (GPU with float reduction reorder); these set
//! `caps().deterministic = false`, and the caller's dispatch policy
//! filters them out of `Consistency::Fresh` / `Consistency::Frozen`
//! paths.
//!
//! The witness chain is NOT recomputed per kernel; it stays anchored
//! on `(data_ref, dim, rotation_seed, rerank_factor, generation)`.
//! Kernel identity is surfaced in caps + stats, not in the witness.
use crate::index::{AnnIndex, RabitqPlusIndex, SearchResult};
use crate::RabitqError;
/// Capability advertisement for a vector kernel. The caller's
/// dispatch policy compares these against the request to pick the
/// best kernel for a given batch + determinism requirement.
#[derive(Debug, Clone)]
pub struct KernelCaps {
/// Symbolic accelerator label: "cpu", "cpu-simd", "cuda",
/// "metal", "rocm", "wasm-simd", etc. Surfaced in stats.
pub accelerator: &'static str,
/// Minimum batch size at which this kernel is ever chosen. CPU
/// kernels report 1; GPU kernels typically ≥ 64.
pub min_batch: usize,
/// Maximum dimensionality the kernel supports without falling
/// back to a slower path. `usize::MAX` means "no constraint".
pub max_dim: usize,
/// Does the kernel produce byte-identical output (scan + rerank)
/// vs the reference CPU kernel? Only deterministic kernels can
/// feed witness-sealed outputs under Fresh/Frozen consistency.
pub deterministic: bool,
}
impl KernelCaps {
/// Default CPU caps: available always, deterministic, no dim cap.
pub const fn cpu_default() -> Self {
Self {
accelerator: "cpu",
min_batch: 1,
max_dim: usize::MAX,
deterministic: true,
}
}
}
/// A batch of query vectors against a single index. The index is
/// borrowed by reference so GPU kernels don't need to own its
/// lifetime — the cache holds the authoritative copy.
pub struct ScanRequest<'a> {
pub index: &'a RabitqPlusIndex,
pub queries: &'a [Vec<f32>],
pub k: usize,
/// Optional per-call rerank factor. `None` uses the index's stored
/// default. Used by `ruvector-rulake` to divide rerank cost
/// across K shards (ADR-155 federation path).
pub rerank_factor: Option<usize>,
}
/// Batched top-k results, one `Vec<SearchResult>` per query. Order
/// matches the input `queries`.
pub type ScanResponse = Vec<Vec<SearchResult>>;
/// A vector kernel executes scan + exact rerank for one or more
/// queries against a compressed RaBitQ index.
///
/// Implementations are stateless w.r.t. the index — they receive it
/// by reference on every call, so a single kernel instance can serve
/// many caches / collections concurrently. Concrete GPU kernels may
/// carry a driver handle or stream object; that's kernel state, not
/// index state.
pub trait VectorKernel: Send + Sync {
/// Stable identifier surfaced in stats + logs. Must be unique per
/// kernel type (e.g. `"cpu"`, `"cuda:0"`, `"metal"`).
fn id(&self) -> &str;
/// Capability advertisement — what this kernel can do. Return a
/// fresh struct (not a static reference) so kernels can narrow
/// caps at runtime (e.g. GPU-down → `min_batch = usize::MAX`).
fn caps(&self) -> KernelCaps;
/// Run the scan + rerank for every query in `req`. Returns one
/// `Vec<SearchResult>` per query, in the input order.
fn scan(&self, req: ScanRequest<'_>) -> Result<ScanResponse, RabitqError>;
}
/// Reference CPU kernel. Wraps `RabitqPlusIndex::search_with_rerank`.
/// Deterministic by construction (integer popcount scan + stable
/// exact L2² rerank with total-order tie break via position).
///
/// This is the default kernel every consumer gets for free; GPU /
/// SIMD implementations plug in alongside it via registration.
#[derive(Debug, Default, Clone, Copy)]
pub struct CpuKernel;
impl CpuKernel {
pub const fn new() -> Self {
Self
}
}
impl VectorKernel for CpuKernel {
fn id(&self) -> &str {
"cpu"
}
fn caps(&self) -> KernelCaps {
KernelCaps::cpu_default()
}
fn scan(&self, req: ScanRequest<'_>) -> Result<ScanResponse, RabitqError> {
let mut out = Vec::with_capacity(req.queries.len());
for q in req.queries {
let hits = match req.rerank_factor {
None => req.index.search(q, req.k)?,
Some(rf) => req.index.search_with_rerank(q, req.k, rf)?,
};
out.push(hits);
}
Ok(out)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn tiny_index() -> RabitqPlusIndex {
let d = 8;
let mut idx = RabitqPlusIndex::new(d, 42, 5);
for i in 0..16 {
let v: Vec<f32> = (0..d).map(|j| (i + j) as f32).collect();
idx.add(i, v).unwrap();
}
idx
}
#[test]
fn cpu_kernel_matches_direct_search() {
let idx = tiny_index();
let kernel = CpuKernel::new();
let q: Vec<f32> = vec![2.0; 8];
let direct = idx.search(&q, 4).unwrap();
let batched = kernel
.scan(ScanRequest {
index: &idx,
queries: std::slice::from_ref(&q),
k: 4,
rerank_factor: None,
})
.unwrap();
assert_eq!(batched.len(), 1);
let batch = &batched[0];
assert_eq!(batch.len(), direct.len());
for (a, b) in batch.iter().zip(direct.iter()) {
assert_eq!(a.id, b.id);
assert!((a.score - b.score).abs() < 1e-5);
}
}
#[test]
fn cpu_kernel_respects_rerank_override() {
let idx = tiny_index();
let kernel = CpuKernel::new();
let q: Vec<f32> = vec![2.0; 8];
// Override with a smaller rerank factor than the index default (5).
// Each query must still get its own result vec, sorted by score.
let out = kernel
.scan(ScanRequest {
index: &idx,
queries: &[q.clone(), q.clone()],
k: 3,
rerank_factor: Some(2),
})
.unwrap();
assert_eq!(out.len(), 2, "one result vec per input query");
for v in &out {
for w in v.windows(2) {
assert!(w[0].score <= w[1].score, "hits must be sorted");
}
}
}
#[test]
fn cpu_caps_are_deterministic_and_unbounded() {
let c = CpuKernel::new().caps();
assert_eq!(c.accelerator, "cpu");
assert_eq!(c.min_batch, 1);
assert_eq!(c.max_dim, usize::MAX);
assert!(c.deterministic);
}
}
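
The module docs leave the dispatch policy to the caller. A minimal
sketch of one such policy, assuming a hypothetical `pick_kernel` helper
and local stand-ins: `Caps` mirrors the `KernelCaps` fields above, and
`Consistency` is simplified (the real `Eventual` carries a `ttl_ms`).
The tie-break rule is an assumption, not something this commit defines.

```rust
/// Stand-in mirroring the `KernelCaps` fields from this commit.
#[derive(Debug, Clone)]
pub struct Caps {
    pub accelerator: &'static str,
    pub min_batch: usize,
    pub max_dim: usize,
    pub deterministic: bool,
}

/// Simplified consistency modes; only the names come from the docs above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Consistency {
    Fresh,
    Frozen,
    Eventual,
}

/// Hypothetical policy: keep kernels that can serve the request, then
/// prefer the one with the largest `min_batch` (assumed here to be the
/// most specialised accelerator). Returns an index into `kernels`.
pub fn pick_kernel(
    kernels: &[Caps],
    batch: usize,
    dim: usize,
    consistency: Consistency,
) -> Option<usize> {
    // Fresh/Frozen paths may only use deterministic kernels.
    let needs_determinism = matches!(consistency, Consistency::Fresh | Consistency::Frozen);
    kernels
        .iter()
        .enumerate()
        .filter(|(_, c)| batch >= c.min_batch && dim <= c.max_dim)
        .filter(|(_, c)| c.deterministic || !needs_determinism)
        .max_by_key(|(_, c)| c.min_batch)
        .map(|(i, _)| i)
}

fn main() {
    let kernels = vec![
        Caps { accelerator: "cpu", min_batch: 1, max_dim: usize::MAX, deterministic: true },
        Caps { accelerator: "cuda", min_batch: 64, max_dim: 4096, deterministic: false },
    ];
    let chosen = pick_kernel(&kernels, 128, 768, Consistency::Eventual);
    println!("{:?}", chosen.map(|i| kernels[i].accelerator));
}
```

With these two entries, a 128-query batch under `Eventual` selects the
CUDA entry, while the same batch under `Fresh` falls back to the CPU
entry because the CUDA caps report `deterministic = false`. A kernel
that narrows itself to `min_batch = usize::MAX` drops out of the first
filter for any realistic batch size.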


@ -44,6 +44,7 @@
pub mod error;
pub mod index;
pub mod kernel;
pub mod quantize;
pub mod rotation;
@ -51,5 +52,6 @@ pub use error::RabitqError;
pub use index::{
AnnIndex, FlatF32Index, RabitqAsymIndex, RabitqIndex, RabitqPlusIndex, SearchResult,
};
pub use kernel::{CpuKernel, KernelCaps, ScanRequest, ScanResponse, VectorKernel};
pub use quantize::{pack_bits, unpack_bits, BinaryCode};
pub use rotation::RandomRotation;


@ -0,0 +1,134 @@
//! Minimal cache-sidecar daemon demonstrating the bundle protocol
//! (ADR-155 §Consequences).
//!
//! Watches a publish directory for updates to `table.rulake.json`
//! and calls `RuLake::refresh_from_bundle_dir` to keep the reader
//! cache coherent with whatever the publisher has signed off.
//!
//! Run with:
//! cargo run --release -p ruvector-rulake --example sidecar_daemon
//!
//! The example simulates a full deployment in one process:
//! - A "publisher" backend with a collection of vectors
//! - A "reader" RuLake serving queries against the same collection
//! - The publisher updates the backend and emits a new bundle
//! - The reader's daemon polls, detects the witness change,
//! invalidates, and the next query re-primes automatically
//!
//! A real deployment replaces the polling loop with an inotify
//! watcher or GCS object-change notification; the shape is identical.
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use ruvector_rulake::{cache::Consistency, LocalBackend, RefreshResult, RuLake};
fn main() {
println!("=== ruLake sidecar daemon demo ===\n");
// Shared publish directory. In production this would be a GCS
// mount, an S3FS prefix, or an NFS share.
let publish_dir =
std::env::temp_dir().join(format!("rulake-sidecar-demo-{}", std::process::id()));
std::fs::create_dir_all(&publish_dir).unwrap();
println!("Publish directory: {}", publish_dir.display());
// --- Publisher side ---
let backend = Arc::new(LocalBackend::new("publisher"));
backend
.put_collection(
"memories",
8,
(0..100).collect(),
(0..100)
.map(|i| (0..8).map(|j| (i + j) as f32 * 0.1).collect())
.collect(),
)
.unwrap();
let publisher_lake = RuLake::new(20, 42);
publisher_lake.register_backend(backend.clone()).unwrap();
let key = ("publisher".to_string(), "memories".to_string());
publisher_lake.publish_bundle(&key, &publish_dir).unwrap();
println!("[publisher] emitted initial table.rulake.json");
// --- Reader side ---
let reader_lake =
Arc::new(RuLake::new(20, 42).with_consistency(Consistency::Eventual { ttl_ms: 60_000 }));
reader_lake.register_backend(backend.clone()).unwrap();
// --- Daemon thread (polling-based) ---
let daemon_lake = Arc::clone(&reader_lake);
let daemon_dir = publish_dir.clone();
let daemon_key = key.clone();
let daemon_stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
let stop_handle = Arc::clone(&daemon_stop);
let daemon = thread::spawn(move || {
let poll_every = Duration::from_millis(100);
let deadline = Instant::now() + Duration::from_secs(5);
while !stop_handle.load(std::sync::atomic::Ordering::Relaxed) && Instant::now() < deadline {
match daemon_lake.refresh_from_bundle_dir(&daemon_key, &daemon_dir) {
Ok(RefreshResult::Invalidated) => {
println!("[daemon] bundle rotated — cache invalidated");
}
Ok(RefreshResult::UpToDate) => {} // quiet
Ok(RefreshResult::BundleMissing) => {
eprintln!("[daemon] sidecar missing at publish dir");
}
Err(e) => eprintln!("[daemon] refresh error: {e}"),
}
thread::sleep(poll_every);
}
});
// --- Workload: read a bunch, then publisher mutates, then read more ---
let q = vec![0.5f32; 8];
for _ in 0..10 {
let _ = reader_lake
.search_one("publisher", "memories", &q, 3)
.unwrap();
}
let stats1 = reader_lake.cache_stats();
println!(
"[reader] after 10 warm queries: hit_rate={:.3} primes={}",
stats1.hit_rate().unwrap_or(0.0),
stats1.primes
);
// Publisher side-effect: the backend data changed. Re-publish.
backend.append("memories", 999, vec![9.0; 8]).unwrap();
publisher_lake.publish_bundle(&key, &publish_dir).unwrap();
println!("[publisher] mutated backend + re-published bundle");
// Give the daemon a few poll cycles to see it.
thread::sleep(Duration::from_millis(300));
// Next queries: first one re-primes (daemon dropped the pointer),
// the rest are warm again.
for _ in 0..10 {
let _ = reader_lake
.search_one("publisher", "memories", &q, 3)
.unwrap();
}
let stats2 = reader_lake.cache_stats();
println!(
"[reader] after mutation: hit_rate={:.3} primes={} invalidations={}",
stats2.hit_rate().unwrap_or(0.0),
stats2.primes,
stats2.invalidations
);
assert!(
stats2.primes > stats1.primes,
"daemon should have triggered a re-prime"
);
daemon_stop.store(true, std::sync::atomic::Ordering::Relaxed);
daemon.join().unwrap();
let _ = std::fs::remove_dir_all(&publish_dir);
println!("\n✓ daemon + reader stayed coherent through the bundle rotation");
}


@ -119,6 +119,19 @@ pub struct RuLakeBundle {
/// OpenLineage job id that produced this bundle.
pub lineage_id: Option<String>,
/// Caller-defined memory-class tag (ADR-156 substrate framing).
/// Agent systems tag bundles with cognitive labels like
/// `"episodic"` / `"semantic"` / `"procedural"` / `"identity"`;
/// ruLake stores the string and surfaces it through bundles and
/// stats but never interprets it. Opaque by design — brain
/// systems own the semantics, substrate owns the persistence.
///
/// Not part of the witness: changing the memory-class tag on the
/// same vectors does not invalidate a cache entry. Two bundles
/// with identical data but different classes share the cache.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub memory_class: Option<String>,
}
impl RuLakeBundle {
@ -144,6 +157,7 @@ impl RuLakeBundle {
rvf_witness: witness,
pii_policy: None,
lineage_id: None,
memory_class: None,
}
}
@ -191,6 +205,13 @@ impl RuLakeBundle {
self
}
/// Tag this bundle with a memory class (ADR-156). Opaque to
/// ruLake; meaningful to the consuming brain system.
pub fn with_memory_class(mut self, class: impl Into<String>) -> Self {
self.memory_class = Some(class.into());
self
}
/// Canonical filename for the on-disk bundle sidecar.
pub const SIDECAR_FILENAME: &'static str = "table.rulake.json";
@ -431,6 +452,27 @@ mod tests {
let _ = std::fs::remove_dir_all(&dir);
}
#[test]
fn memory_class_roundtrips_and_does_not_affect_witness() {
let plain = RuLakeBundle::new("x", 8, 1, 5, Generation::Num(1));
let tagged =
RuLakeBundle::new("x", 8, 1, 5, Generation::Num(1)).with_memory_class("episodic");
// Witnesses match — class is not part of the digest.
assert_eq!(plain.rvf_witness, tagged.rvf_witness);
// Serde roundtrip preserves the tag.
let s = tagged.to_json().unwrap();
let parsed = RuLakeBundle::from_json(&s).unwrap();
assert_eq!(parsed.memory_class.as_deref(), Some("episodic"));
// Old-format (no memory_class field) still parses.
let old_format = r#"{"format_version":1,"data_ref":"x","dim":8,"rotation_seed":1,"rerank_factor":5,"generation":1,"rvf_witness":""#;
let legacy = format!(
"{}{}\",\"pii_policy\":null,\"lineage_id\":null}}",
old_format, plain.rvf_witness
);
let loaded = RuLakeBundle::from_json(&legacy).unwrap();
assert_eq!(loaded.memory_class, None);
}
#[test]
fn fs_write_is_atomic_under_crash_simulation() {
// Simulate a crash between tmp-create and rename by writing a


@ -185,12 +185,18 @@ struct CacheState {
/// Per-backend counters. Populated lazily on the first event per
/// backend id.
per_backend: HashMap<BackendId, PerBackendStats>,
/// Per-`(backend, collection)` counters — same events as
/// `per_backend`, attributed one level finer.
per_collection: HashMap<CacheKey, PerBackendStats>,
}
impl CacheState {
fn per_backend_mut(&mut self, backend: &str) -> &mut PerBackendStats {
self.per_backend.entry(backend.to_string()).or_default()
}
fn per_collection_mut(&mut self, key: &CacheKey) -> &mut PerBackendStats {
self.per_collection.entry(key.clone()).or_default()
}
}
impl VectorCache {
@ -202,6 +208,7 @@ impl VectorCache {
last_checked: HashMap::new(),
stats: CacheStats::default(),
per_backend: HashMap::new(),
per_collection: HashMap::new(),
})),
rerank_factor,
rotation_seed,
@ -239,6 +246,13 @@ impl VectorCache {
self.inner.lock().unwrap().per_backend.clone()
}
/// Per-`(backend, collection)` counters. One level finer than
/// `stats_by_backend`: operators who want to know "which
/// collection is hot?" get exact attribution.
pub fn stats_by_collection(&self) -> HashMap<CacheKey, PerBackendStats> {
self.inner.lock().unwrap().per_collection.clone()
}
/// Compress a pulled batch into a RaBitQ index and associate it with
/// the target witness. Bookkeeping happens under the lock; the
/// heavy O(n·D) compression runs BEFORE acquiring the lock.
@ -286,6 +300,7 @@ impl VectorCache {
inner.entries.insert(witness.clone(), entry);
inner.stats.primes += 1;
inner.per_backend_mut(&key.0).primes += 1;
inner.per_collection_mut(&key).primes += 1;
let prime_ms = prime_start.elapsed().as_secs_f64() * 1000.0;
inner.stats.total_prime_ms += prime_ms;
inner.stats.last_prime_ms = prime_ms;
@ -332,6 +347,7 @@ impl VectorCache {
shared: bool,
) -> crate::Result<()> {
let backend_id = key.0.clone();
let key_attrib = key.clone();
// If this key already points somewhere, decrement the old entry.
if let Some(old_w) = inner.pointers.remove(&key) {
if let Some(e) = inner.entries.get_mut(&old_w) {
@ -340,6 +356,7 @@ impl VectorCache {
inner.entries.remove(&old_w);
inner.stats.invalidations += 1;
inner.per_backend_mut(&backend_id).invalidations += 1;
inner.per_collection_mut(&key_attrib).invalidations += 1;
}
}
}
@ -352,6 +369,7 @@ impl VectorCache {
if shared {
inner.stats.shared_hits += 1;
inner.per_backend_mut(&backend_id).shared_hits += 1;
inner.per_collection_mut(&key_attrib).shared_hits += 1;
}
Ok(())
}
@ -370,6 +388,7 @@ impl VectorCache {
}
inner.stats.invalidations += 1;
inner.per_backend_mut(&key.0).invalidations += 1;
inner.per_collection_mut(key).invalidations += 1;
}
inner.last_checked.remove(key);
}
@ -410,11 +429,13 @@ impl VectorCache {
let mut inner = self.inner.lock().unwrap();
inner.stats.hits += 1;
inner.per_backend_mut(&key.0).hits += 1;
inner.per_collection_mut(key).hits += 1;
}
pub(crate) fn mark_miss(&self, key: &CacheKey) {
let mut inner = self.inner.lock().unwrap();
inner.stats.misses += 1;
inner.per_backend_mut(&key.0).misses += 1;
inner.per_collection_mut(key).misses += 1;
}
/// Run the search against the cached entry for `key`. Caller must


@ -110,6 +110,15 @@ impl RuLake {
self.cache.stats_by_backend()
}
/// Per-`(backend, collection)` cache stats. One level finer than
/// `cache_stats_by_backend`. Operators use this to identify hot
/// collections for per-collection LRU pinning or eviction.
pub fn cache_stats_by_collection(
&self,
) -> std::collections::HashMap<crate::cache::CacheKey, crate::cache::PerBackendStats> {
self.cache.stats_by_collection()
}
/// What witness resolves from a `(backend, collection)` pair?
/// Useful for diagnostics and cross-backend cache-sharing tests.
pub fn cache_witness_of(&self, key: &CacheKey) -> Option<String> {
@ -190,11 +199,21 @@ impl RuLake {
}
let bundle = crate::RuLakeBundle::read_from_dir(dir)?;
let current = self.cache.witness_of(key);
-if current.as_deref() == Some(bundle.rvf_witness.as_str()) {
-Ok(RefreshResult::UpToDate)
-} else {
-self.cache.invalidate(key);
-Ok(RefreshResult::Invalidated)
+match current.as_deref() {
+Some(w) if w == bundle.rvf_witness.as_str() => Ok(RefreshResult::UpToDate),
+Some(_) => {
+// Live cache pointer, but it disagrees with the
+// published bundle → rotate it out.
+self.cache.invalidate(key);
+Ok(RefreshResult::Invalidated)
+}
+None => {
+// Cache pointer is empty. Nothing to invalidate; the
+// next search will prime transparently. Report
+// UpToDate so daemons don't re-fire for every poll
+// between "we invalidated" and "somebody queried".
+Ok(RefreshResult::UpToDate)
+}
}
}
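
The three-way decision in this hunk can be read as a pure function of
the cached witness and the published one. A small sketch, assuming a
hypothetical `decide` function and a local `Refresh` enum standing in
for `RefreshResult` (the `BundleMissing` case is handled earlier in the
real function and is omitted here):

```rust
/// Stand-in for the `RefreshResult` variants used in this hunk.
#[derive(Debug, PartialEq, Eq)]
pub enum Refresh {
    UpToDate,
    Invalidated,
}

/// Hypothetical pure form of the decision: `cached` is the witness the
/// cache pointer currently resolves to, `published` is the bundle's.
pub fn decide(cached: Option<&str>, published: &str) -> Refresh {
    match cached {
        // Pointer agrees with the published bundle.
        Some(w) if w == published => Refresh::UpToDate,
        // Live pointer that disagrees: rotate it out.
        Some(_) => Refresh::Invalidated,
        // Already empty: nothing to invalidate, next search re-primes.
        None => Refresh::UpToDate,
    }
}

fn main() {
    // Poll sequence: warm cache, bundle rotates, then two idle polls.
    let mut cached: Option<String> = Some("w1".to_string());
    for published in ["w1", "w2", "w2", "w2"] {
        let result = decide(cached.as_deref(), published);
        if result == Refresh::Invalidated {
            cached = None; // models cache.invalidate(key)
        }
        println!("{}: {:?}", published, result);
    }
}
```

In the simulated sequence only the first poll after the rotation
reports `Invalidated`; the following polls see an empty pointer and
stay quiet, which is the behaviour the fix is after.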