From 441ba45296fc71ae11042f8bfc9aaaccfac86bdc Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 8 Feb 2026 04:50:33 +0000 Subject: [PATCH] feat: Wire coherence gate, epoch tracker, and metrics series into TieredStore - Add CoherenceCheck, EpochTracker, MetricsSeries fields to TieredStore - put() now records write epochs for staleness detection - tick() auto-records metrics snapshots for trend analysis - Add enable_coherence()/disable_coherence() + accessor methods - Add coherence_check() convenience method on TieredStore - 4 new integration tests verify wiring https://claude.ai/code/session_01Ksy165BL5nGpVoWaAfTE7t --- crates/ruvector-temporal-tensor/src/store.rs | 143 +++++++++++++++++++ 1 file changed, 143 insertions(+) diff --git a/crates/ruvector-temporal-tensor/src/store.rs b/crates/ruvector-temporal-tensor/src/store.rs index 4973f195..a2d09e7b 100644 --- a/crates/ruvector-temporal-tensor/src/store.rs +++ b/crates/ruvector-temporal-tensor/src/store.rs @@ -425,6 +425,13 @@ pub struct TieredStore { /// Witness log for auditing tiering decisions. witness_log: crate::metrics::WitnessLog, + + /// Optional coherence checker for read-after-write validation. + coherence: Option, + /// Epoch tracker for staleness detection. + epoch_tracker: crate::coherence::EpochTracker, + /// Metrics time-series for trend analysis. + metrics_series: crate::metrics::MetricsSeries, } /// Smoothing constant for the exponential moving average of access rate. @@ -443,6 +450,9 @@ impl TieredStore { tier2_keys: Vec::new(), tier3_keys: Vec::new(), witness_log: crate::metrics::WitnessLog::new(10_000), + coherence: None, + epoch_tracker: crate::coherence::EpochTracker::new(), + metrics_series: crate::metrics::MetricsSeries::new(256), } } @@ -462,6 +472,55 @@ impl TieredStore { &mut self.witness_log } + /// Enable coherence checking with the given configuration. + /// + /// When enabled, every `put()` records a write epoch in the epoch tracker. + /// Callers can use [`coherence_check()`](Self::coherence_check) to validate + /// read-after-write consistency. + pub fn enable_coherence(&mut self, check: crate::coherence::CoherenceCheck) { + self.coherence = Some(check); + } + + /// Disable coherence checking. + pub fn disable_coherence(&mut self) { + self.coherence = None; + } + + /// Access the epoch tracker. + pub fn epoch_tracker(&self) -> &crate::coherence::EpochTracker { + &self.epoch_tracker + } + + /// Access the epoch tracker mutably. + pub fn epoch_tracker_mut(&mut self) -> &mut crate::coherence::EpochTracker { + &mut self.epoch_tracker + } + + /// Access the metrics time-series. + pub fn metrics_series(&self) -> &crate::metrics::MetricsSeries { + &self.metrics_series + } + + /// Access the metrics time-series mutably. + pub fn metrics_series_mut(&mut self) -> &mut crate::metrics::MetricsSeries { + &mut self.metrics_series + } + + /// Perform a coherence check on a recently written block. + /// + /// Returns `None` if coherence checking is not enabled. + /// Returns `Some(Err(...))` if the block doesn't exist or is evicted. + /// Returns `Some(Ok(result))` with the coherence result. + pub fn coherence_check( + &mut self, + key: BlockKey, + original_data: &[f32], + now: u64, + ) -> Option> { + let check = self.coherence.clone()?; + Some(check.check_coherence(self, key, original_data, now)) + } + /// Compute current aggregate metrics. pub fn metrics(&self) -> crate::metrics::StoreMetrics { let mut m = crate::metrics::StoreMetrics::new(); @@ -548,6 +607,9 @@ impl TieredStore { tier, }); + // Record write epoch for staleness detection. + self.epoch_tracker.record_write(key); + Ok(()) } @@ -939,6 +1001,10 @@ impl TieredStore { }, ); + // Auto-record a metrics snapshot for trend analysis. + let snapshot_metrics = self.metrics(); + self.metrics_series.record(now, snapshot_metrics); + result } @@ -1791,4 +1857,81 @@ mod tests { let result = store.score_block(make_key(99, 0), &config, 100); assert_eq!(result, None); } + + // ----------------------------------------------------------------------- + // Coherence integration + // ----------------------------------------------------------------------- + + #[test] + fn test_epoch_tracker_wired_into_put() { + let mut store = TieredStore::new(4096); + let key = BlockKey { tensor_id: 1, block_index: 0 }; + let data = vec![1.0f32; 64]; + + assert_eq!(store.epoch_tracker().check_epoch(key), None); + + store.put(key, &data, Tier::Tier1, 0).unwrap(); + assert!(store.epoch_tracker().check_epoch(key).is_some()); + + let epoch1 = store.epoch_tracker().check_epoch(key).unwrap(); + store.put(key, &data, Tier::Tier1, 1).unwrap(); + let epoch2 = store.epoch_tracker().check_epoch(key).unwrap(); + assert!(epoch2 > epoch1, "epoch should increment on overwrite"); + } + + #[test] + fn test_coherence_disabled_by_default() { + let mut store = TieredStore::new(4096); + let key = BlockKey { tensor_id: 1, block_index: 0 }; + let data = vec![1.0f32; 64]; + store.put(key, &data, Tier::Tier1, 0).unwrap(); + + assert!(store.coherence_check(key, &data, 1).is_none()); + } + + #[test] + fn test_coherence_enabled_passes() { + let mut store = TieredStore::new(4096); + store.enable_coherence(crate::coherence::CoherenceCheck::default()); + + let key = BlockKey { tensor_id: 1, block_index: 0 }; + let data: Vec = (0..64).map(|i| (i as f32 + 1.0) * 0.25).collect(); + store.put(key, &data, Tier::Tier1, 0).unwrap(); + + let result = store.coherence_check(key, &data, 1).unwrap().unwrap(); + assert!(result.passed, "Tier1 coherence should pass; err={}", result.max_error); + } + + // ----------------------------------------------------------------------- + // MetricsSeries integration + // ----------------------------------------------------------------------- + + #[test] + fn test_metrics_series_wired_into_tick() { + use crate::tiering::TierConfig; + + let mut store = TieredStore::new(4096); + let config = TierConfig::default(); + + // Put a few blocks. + for i in 0..5u128 { + let key = BlockKey { tensor_id: i, block_index: 0 }; + store.put(key, &vec![1.0f32; 64], Tier::Tier1, 0).unwrap(); + } + + assert!(store.metrics_series().is_empty()); + + // Run a tick -- should auto-record a metrics snapshot. + store.tick(&config, 100, 1_000_000, 100); + assert_eq!(store.metrics_series().len(), 1); + + // Run another tick. + store.tick(&config, 200, 1_000_000, 100); + assert_eq!(store.metrics_series().len(), 2); + + // Latest snapshot should reflect current state. + let (ts, m) = store.metrics_series().latest().unwrap(); + assert_eq!(*ts, 200); + assert_eq!(m.total_blocks, 5); + } }