feat: Wire coherence gate, epoch tracker, and metrics series into TieredStore

- Add CoherenceCheck, EpochTracker, MetricsSeries fields to TieredStore
- put() now records write epochs for staleness detection
- tick() auto-records metrics snapshots for trend analysis
- Add enable_coherence()/disable_coherence() + accessor methods
- Add coherence_check() convenience method on TieredStore
- 4 new integration tests verify wiring

https://claude.ai/code/session_01Ksy165BL5nGpVoWaAfTE7t
This commit is contained in:
Claude 2026-02-08 04:50:33 +00:00
parent f0cac3bef2
commit 441ba45296

View file

@ -425,6 +425,13 @@ pub struct TieredStore {
/// Witness log for auditing tiering decisions.
witness_log: crate::metrics::WitnessLog,
/// Optional coherence checker for read-after-write validation.
coherence: Option<crate::coherence::CoherenceCheck>,
/// Epoch tracker for staleness detection.
epoch_tracker: crate::coherence::EpochTracker,
/// Metrics time-series for trend analysis.
metrics_series: crate::metrics::MetricsSeries,
}
/// Smoothing constant for the exponential moving average of access rate.
@ -443,6 +450,9 @@ impl TieredStore {
tier2_keys: Vec::new(),
tier3_keys: Vec::new(),
witness_log: crate::metrics::WitnessLog::new(10_000),
coherence: None,
epoch_tracker: crate::coherence::EpochTracker::new(),
metrics_series: crate::metrics::MetricsSeries::new(256),
}
}
@ -462,6 +472,55 @@ impl TieredStore {
&mut self.witness_log
}
/// Enable coherence checking with the given configuration.
///
/// When enabled, every `put()` records a write epoch in the epoch tracker.
/// Callers can use [`coherence_check()`](Self::coherence_check) to validate
/// read-after-write consistency.
pub fn enable_coherence(&mut self, check: crate::coherence::CoherenceCheck) {
self.coherence = Some(check);
}
/// Disable coherence checking.
pub fn disable_coherence(&mut self) {
self.coherence = None;
}
/// Access the epoch tracker.
pub fn epoch_tracker(&self) -> &crate::coherence::EpochTracker {
&self.epoch_tracker
}
/// Access the epoch tracker mutably.
pub fn epoch_tracker_mut(&mut self) -> &mut crate::coherence::EpochTracker {
&mut self.epoch_tracker
}
/// Access the metrics time-series.
pub fn metrics_series(&self) -> &crate::metrics::MetricsSeries {
&self.metrics_series
}
/// Access the metrics time-series mutably.
pub fn metrics_series_mut(&mut self) -> &mut crate::metrics::MetricsSeries {
&mut self.metrics_series
}
/// Perform a coherence check on a recently written block.
///
/// Returns `None` if coherence checking is not enabled.
/// Returns `Some(Err(...))` if the block doesn't exist or is evicted.
/// Returns `Some(Ok(result))` with the coherence result.
pub fn coherence_check(
&mut self,
key: BlockKey,
original_data: &[f32],
now: u64,
) -> Option<Result<crate::coherence::CoherenceResult, StoreError>> {
let check = self.coherence.clone()?;
Some(check.check_coherence(self, key, original_data, now))
}
/// Compute current aggregate metrics.
pub fn metrics(&self) -> crate::metrics::StoreMetrics {
let mut m = crate::metrics::StoreMetrics::new();
@ -548,6 +607,9 @@ impl TieredStore {
tier,
});
// Record write epoch for staleness detection.
self.epoch_tracker.record_write(key);
Ok(())
}
@ -939,6 +1001,10 @@ impl TieredStore {
},
);
// Auto-record a metrics snapshot for trend analysis.
let snapshot_metrics = self.metrics();
self.metrics_series.record(now, snapshot_metrics);
result
}
@ -1791,4 +1857,81 @@ mod tests {
let result = store.score_block(make_key(99, 0), &config, 100);
assert_eq!(result, None);
}
// -----------------------------------------------------------------------
// Coherence integration
// -----------------------------------------------------------------------
#[test]
fn test_epoch_tracker_wired_into_put() {
let mut store = TieredStore::new(4096);
let key = BlockKey { tensor_id: 1, block_index: 0 };
let data = vec![1.0f32; 64];
assert_eq!(store.epoch_tracker().check_epoch(key), None);
store.put(key, &data, Tier::Tier1, 0).unwrap();
assert!(store.epoch_tracker().check_epoch(key).is_some());
let epoch1 = store.epoch_tracker().check_epoch(key).unwrap();
store.put(key, &data, Tier::Tier1, 1).unwrap();
let epoch2 = store.epoch_tracker().check_epoch(key).unwrap();
assert!(epoch2 > epoch1, "epoch should increment on overwrite");
}
#[test]
fn test_coherence_disabled_by_default() {
let mut store = TieredStore::new(4096);
let key = BlockKey { tensor_id: 1, block_index: 0 };
let data = vec![1.0f32; 64];
store.put(key, &data, Tier::Tier1, 0).unwrap();
assert!(store.coherence_check(key, &data, 1).is_none());
}
#[test]
fn test_coherence_enabled_passes() {
let mut store = TieredStore::new(4096);
store.enable_coherence(crate::coherence::CoherenceCheck::default());
let key = BlockKey { tensor_id: 1, block_index: 0 };
let data: Vec<f32> = (0..64).map(|i| (i as f32 + 1.0) * 0.25).collect();
store.put(key, &data, Tier::Tier1, 0).unwrap();
let result = store.coherence_check(key, &data, 1).unwrap().unwrap();
assert!(result.passed, "Tier1 coherence should pass; err={}", result.max_error);
}
// -----------------------------------------------------------------------
// MetricsSeries integration
// -----------------------------------------------------------------------
#[test]
fn test_metrics_series_wired_into_tick() {
use crate::tiering::TierConfig;
let mut store = TieredStore::new(4096);
let config = TierConfig::default();
// Put a few blocks.
for i in 0..5u128 {
let key = BlockKey { tensor_id: i, block_index: 0 };
store.put(key, &vec![1.0f32; 64], Tier::Tier1, 0).unwrap();
}
assert!(store.metrics_series().is_empty());
// Run a tick -- should auto-record a metrics snapshot.
store.tick(&config, 100, 1_000_000, 100);
assert_eq!(store.metrics_series().len(), 1);
// Run another tick.
store.tick(&config, 200, 1_000_000, 100);
assert_eq!(store.metrics_series().len(), 2);
// Latest snapshot should reflect current state.
let (ts, m) = store.metrics_series().latest().unwrap();
assert_eq!(*ts, 200);
assert_eq!(m.total_blocks, 5);
}
}