mirror of
https://github.com/ruvnet/RuVector.git
synced 2026-05-29 11:13:33 +00:00
Four in-scope M1 items from the remaining backlog, landed together
because they cross-cut cleanly.
Iter 23 (rabitq): VectorKernel trait + CpuKernel default
- Trait: id(), caps() → KernelCaps, scan(ScanRequest) → ScanResponse.
Scan-phase determinism is the hard contract; rerank-phase nondet
is declared via caps().deterministic = false and the caller's
dispatch policy filters those out of Fresh/Frozen paths (ADR-157).
- CpuKernel wraps RabitqPlusIndex::search_with_rerank, always
available, unbounded dim, deterministic.
- Tests: CPU kernel matches direct search byte-exactly + respects
per-call rerank override + caps advertised correctly.
Iter 24 (rulake): memory_class on RuLakeBundle (ADR-156)
- Opaque caller-defined tag — agent systems write "episodic" /
"semantic" / etc; ruLake stores but never interprets.
- Not part of the witness: two bundles with identical data but
different memory_class share the cache.
- Serde default+skip_if_none keeps old bundles forward-compatible.
- Test: roundtrip + witness-unchanged + legacy bundles without the
field still parse.
Iter 25 (rulake): examples/sidecar_daemon.rs
- Runnable demo of publish_bundle / refresh_from_bundle_dir pair.
- Publisher mutates backend + re-publishes; daemon poll loop
detects witness change, invalidates; next query re-primes.
- Includes a bug fix in refresh_from_bundle_dir: when the cache
pointer is None (already invalidated), report UpToDate instead
of Invalidated so daemons don't re-fire on every poll between
"we invalidated" and "somebody queried."
Iter 26 (rulake): CacheStats::stats_by_collection
- Per-(backend, collection) counters, one level finer than
stats_by_backend. Operators can identify which specific
collection is hot and pin it in LRU or increase its shard count.
21 federation + 11 bundle + 3 fs_backend + 3 kernel = 38 tests
passing across both crates. Clippy -D warnings clean. Example runs
end-to-end.
Co-Authored-By: claude-flow <ruv@ruv.net>
134 lines
5 KiB
Rust
134 lines
5 KiB
Rust
//! Minimal cache-sidecar daemon demonstrating the bundle protocol
|
|
//! (ADR-155 §Consequences).
|
|
//!
|
|
//! Watches a publish directory for updates to `table.rulake.json`
|
|
//! and calls `RuLake::refresh_from_bundle_dir` to keep the reader
|
|
//! cache coherent with whatever the publisher has signed off.
|
|
//!
|
|
//! Run with:
|
|
//! cargo run --release -p ruvector-rulake --example sidecar_daemon
|
|
//!
|
|
//! The example simulates a full deployment in one process:
|
|
//! - A "publisher" backend with a collection of vectors
|
|
//! - A "reader" RuLake serving queries against the same collection
|
|
//! - The publisher updates the backend and emits a new bundle
|
|
//! - The reader's daemon polls, detects the witness change,
|
|
//! invalidates, and the next query re-primes automatically
|
|
//!
|
|
//! A real deployment replaces the polling loop with an inotify
|
|
//! watcher or GCS object-change notification; the shape is identical.
|
|
|
|
use std::sync::Arc;
|
|
use std::thread;
|
|
use std::time::{Duration, Instant};
|
|
|
|
use ruvector_rulake::{cache::Consistency, LocalBackend, RefreshResult, RuLake};
|
|
|
|
fn main() {
|
|
println!("=== ruLake sidecar daemon demo ===\n");
|
|
|
|
// Shared publish directory. In production this would be a GCS
|
|
// mount, an S3FS prefix, or an NFS share.
|
|
let publish_dir =
|
|
std::env::temp_dir().join(format!("rulake-sidecar-demo-{}", std::process::id()));
|
|
std::fs::create_dir_all(&publish_dir).unwrap();
|
|
println!("Publish directory: {}", publish_dir.display());
|
|
|
|
// --- Publisher side ---
|
|
let backend = Arc::new(LocalBackend::new("publisher"));
|
|
backend
|
|
.put_collection(
|
|
"memories",
|
|
8,
|
|
(0..100).collect(),
|
|
(0..100)
|
|
.map(|i| (0..8).map(|j| (i + j) as f32 * 0.1).collect())
|
|
.collect(),
|
|
)
|
|
.unwrap();
|
|
|
|
let publisher_lake = RuLake::new(20, 42);
|
|
publisher_lake.register_backend(backend.clone()).unwrap();
|
|
let key = ("publisher".to_string(), "memories".to_string());
|
|
|
|
publisher_lake.publish_bundle(&key, &publish_dir).unwrap();
|
|
println!("[publisher] emitted initial table.rulake.json");
|
|
|
|
// --- Reader side ---
|
|
let reader_lake =
|
|
Arc::new(RuLake::new(20, 42).with_consistency(Consistency::Eventual { ttl_ms: 60_000 }));
|
|
reader_lake.register_backend(backend.clone()).unwrap();
|
|
|
|
// --- Daemon goroutine (polling-based) ---
|
|
let daemon_lake = Arc::clone(&reader_lake);
|
|
let daemon_dir = publish_dir.clone();
|
|
let daemon_key = key.clone();
|
|
let daemon_stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
|
|
let stop_handle = Arc::clone(&daemon_stop);
|
|
let daemon = thread::spawn(move || {
|
|
let poll_every = Duration::from_millis(100);
|
|
let deadline = Instant::now() + Duration::from_secs(5);
|
|
while !stop_handle.load(std::sync::atomic::Ordering::Relaxed) && Instant::now() < deadline {
|
|
match daemon_lake.refresh_from_bundle_dir(&daemon_key, &daemon_dir) {
|
|
Ok(RefreshResult::Invalidated) => {
|
|
println!("[daemon] bundle rotated — cache invalidated");
|
|
}
|
|
Ok(RefreshResult::UpToDate) => {} // quiet
|
|
Ok(RefreshResult::BundleMissing) => {
|
|
eprintln!("[daemon] sidecar missing at publish dir");
|
|
}
|
|
Err(e) => eprintln!("[daemon] refresh error: {e}"),
|
|
}
|
|
thread::sleep(poll_every);
|
|
}
|
|
});
|
|
|
|
// --- Workload: read a bunch, then publisher mutates, then read more ---
|
|
let q = vec![0.5f32; 8];
|
|
|
|
for _ in 0..10 {
|
|
let _ = reader_lake
|
|
.search_one("publisher", "memories", &q, 3)
|
|
.unwrap();
|
|
}
|
|
let stats1 = reader_lake.cache_stats();
|
|
println!(
|
|
"[reader] after 10 warm queries: hit_rate={:.3} primes={}",
|
|
stats1.hit_rate().unwrap_or(0.0),
|
|
stats1.primes
|
|
);
|
|
|
|
// Publisher side-effect: the backend data changed. Re-publish.
|
|
backend.append("memories", 999, vec![9.0; 8]).unwrap();
|
|
publisher_lake.publish_bundle(&key, &publish_dir).unwrap();
|
|
println!("[publisher] mutated backend + re-published bundle");
|
|
|
|
// Give the daemon a few poll cycles to see it.
|
|
thread::sleep(Duration::from_millis(300));
|
|
|
|
// Next queries: first one re-primes (daemon dropped the pointer),
|
|
// the rest are warm again.
|
|
for _ in 0..10 {
|
|
let _ = reader_lake
|
|
.search_one("publisher", "memories", &q, 3)
|
|
.unwrap();
|
|
}
|
|
let stats2 = reader_lake.cache_stats();
|
|
println!(
|
|
"[reader] after mutation: hit_rate={:.3} primes={} invalidations={}",
|
|
stats2.hit_rate().unwrap_or(0.0),
|
|
stats2.primes,
|
|
stats2.invalidations
|
|
);
|
|
|
|
assert!(
|
|
stats2.primes > stats1.primes,
|
|
"daemon should have triggered a re-prime"
|
|
);
|
|
|
|
daemon_stop.store(true, std::sync::atomic::Ordering::Relaxed);
|
|
daemon.join().unwrap();
|
|
|
|
let _ = std::fs::remove_dir_all(&publish_dir);
|
|
println!("\n✓ daemon + reader stayed coherent through the bundle rotation");
|
|
}
|