diff --git a/examples/connectome-fly/src/observer/core.rs b/examples/connectome-fly/src/observer/core.rs index 49f51207..2f1eba2b 100644 --- a/examples/connectome-fly/src/observer/core.rs +++ b/examples/connectome-fly/src/observer/core.rs @@ -7,6 +7,7 @@ use crate::connectome::NeuronId; use crate::lif::Spike; use super::eigensolver::{approx_fiedler_power, jacobi_symmetric}; +use super::incremental_fiedler::IncrementalCofireAccumulator; use super::report::{CoherenceEvent, Report}; use super::sparse_fiedler::sparse_fiedler; @@ -31,6 +32,10 @@ pub struct Observer { // Fiedler detector state. window_ms: f32, cofire_window: VecDeque, + /// Incremental `BTreeMap<(NeuronId, NeuronId), u32>` of + /// τ-coincident pair counts inside `cofire_window`. Replaces the + /// O(S²) per-detect pair sweep — ADR-154 §16 lever 3. + cofire_accum: IncrementalCofireAccumulator, last_detect_ms: f32, detect_every_ms: f32, baseline: RollingStats, @@ -76,6 +81,7 @@ impl Observer { spikes: Vec::with_capacity(1 << 14), window_ms: 50.0, cofire_window: VecDeque::with_capacity(1 << 14), + cofire_accum: IncrementalCofireAccumulator::new(), last_detect_ms: 0.0, detect_every_ms: 5.0, baseline: RollingStats::default(), @@ -137,25 +143,54 @@ impl Observer { // constructed-collapse test envelope (markers at t≥500 ms; // constructed collapses span > 60 ms, so a 20 ms cadence // still catches any ≥50 ms pre-marker event). - (self.detect_every_ms * 4.0).min(20.0).max(self.detect_every_ms) + (self.detect_every_ms * 4.0) + .min(20.0) + .max(self.detect_every_ms) } else { self.detect_every_ms } } /// Called by the engine on every spike emission. + /// + /// Order of operations matters for the incremental accumulator: + /// + /// 1. Append `s` to `cofire_window`. + /// 2. Accumulator `push(s, …)` paired against the existing window + /// contents (excluding the just-pushed `s`). This adds edge + /// counts for every τ-coincident prior spike. + /// 3. Expire-from-front: for each popped spike `q`, accumulator + /// `expire(q, remaining_window)` — decrements edge counts for + /// every τ-coincident remaining spike. `q` is always the + /// oldest spike in the window, so the τ-band it paired against + /// is near the front; walking forward from the new front lets + /// `expire` break out as soon as it leaves the band. + /// 4. Detect. The accumulator is now exactly the pair-count state + /// implied by the current `cofire_window`, and + /// `compute_fiedler` reads it directly. pub fn on_spike(&mut self, s: Spike) { self.spikes.push(s); self.cofire_window.push_back(s); self.t_end_hint_ms = self.t_end_hint_ms.max(s.t_ms); + + // Incremental push: pair `s` against every prior window spike + // within τ. `len() - 1` is safe because we just pushed `s`. + let prior_len = self.cofire_window.len() - 1; + self.cofire_accum + .push(s, self.cofire_window.iter().take(prior_len)); + + // Expire spikes that slid out of the window, and decrement + // their pair counts against the remaining window contents. let cutoff = s.t_ms - self.window_ms; while let Some(front) = self.cofire_window.front() { if front.t_ms < cutoff { - self.cofire_window.pop_front(); + let q = self.cofire_window.pop_front().expect("front just checked"); + self.cofire_accum.expire(q, self.cofire_window.iter()); } else { break; } } + let interval = self.current_detect_interval_ms(); if s.t_ms - self.last_detect_ms >= interval { self.last_detect_ms = s.t_ms; @@ -187,6 +222,11 @@ impl Observer { } /// Fiedler value of the co-firing-window Laplacian. + /// + /// The adjacency is read from the incremental pair-count + /// accumulator rather than re-derived by an `O(S²)` pair sweep on + /// every call. See `IncrementalCofireAccumulator` for the update + /// rules that keep the map consistent with `cofire_window`. fn compute_fiedler(&self) -> f32 { if self.cofire_window.len() < 2 { return f32::NAN; @@ -202,31 +242,19 @@ impl Observer { // the dense-matrix ceiling — avoids the O(n²) adjacency / // Laplacian allocation below. Threshold is 1024 so existing // demo-scale runs (N=1024 per ADR-154 §3) stay on the dense - // path and AC-1 remains bit-exact vs head. + // path and AC-1 remains bit-exact vs head. The sparse path + // still reconstructs edges from the live window rather than + // the accumulator; that is a separate refactor (`snapshot_sparse` + // is exposed on the accumulator for it) and is out of scope + // for this lever. if n > SPARSE_FIEDLER_N_THRESHOLD { return sparse_fiedler(&active, &self.cofire_window, SPARSE_FIEDLER_N_THRESHOLD); } - let index_of = |id: NeuronId| -> Option { active.binary_search(&id).ok() }; - let tau = 5.0_f32; - let mut a = vec![0.0_f32; n * n]; - let spikes: Vec<_> = self.cofire_window.iter().copied().collect(); - for (i, sa) in spikes.iter().enumerate() { - let ai = match index_of(sa.neuron) { - Some(x) => x, - None => continue, - }; - for sb in &spikes[i + 1..] { - if (sb.t_ms - sa.t_ms).abs() > tau { - break; - } - if let Some(bi) = index_of(sb.neuron) { - if ai != bi { - a[ai * n + bi] += 1.0; - a[bi * n + ai] += 1.0; - } - } - } - } + // Dense path: read the `n × n` adjacency directly from the + // incremental accumulator instead of re-sweeping every pair + // in the window. This replaces the O(S²) pair loop with an + // O(|edges| · log n) map traversal. + let a = self.cofire_accum.snapshot_adjacency(&active); let mut l = vec![0.0_f32; n * n]; for i in 0..n { let mut d = 0.0_f32; diff --git a/examples/connectome-fly/src/observer/incremental_fiedler.rs b/examples/connectome-fly/src/observer/incremental_fiedler.rs new file mode 100644 index 00000000..b7cac1ad --- /dev/null +++ b/examples/connectome-fly/src/observer/incremental_fiedler.rs @@ -0,0 +1,402 @@ +//! Incremental co-firing accumulator for the Fiedler detector. +//! +//! ADR-154 §16 lever 3. The dense pair sweep in +//! `super::core::Observer::compute_fiedler` walks every spike pair in +//! the 50 ms window on every detect call. Under saturation +//! (~21 000 spikes in the window at N=1024) that is ~2.2·10⁸ pair +//! touches per detect, all of which are redundant work: the same pairs +//! were valid in the previous detect too, minus a thin prefix of +//! expired spikes and plus a thin suffix of newly-arrived spikes. +//! +//! This module maintains an `O(|edges|)` running count of τ-coincident +//! spike pairs inside the rolling window. The count is updated in +//! amortised `O(τ · rate · N)` per `push`/`expire` — which for τ=5 ms, +//! a 50 ms window, and the saturated N=1024 regime is ~2 000 touches +//! per spike instead of ~21 000. Each `compute_fiedler` then just +//! iterates the accumulator to build the `n × n` adjacency, with no +//! pair-sweep at all. +//! +//! ## Determinism +//! +//! AC-1 (bit-exact repeatability at N=1024) requires that iteration +//! order over the accumulator be identical across runs. We use +//! `BTreeMap` keyed by `(lo, hi)` sorted `NeuronId` pairs, so iteration +//! is in lexicographic key order — byte-for-byte reproducible. +//! `HashMap` would hash-randomise and silently break AC-1. +//! +//! ## Responsibility split +//! +//! The `Observer` owns the `VecDeque` that is the physical +//! window. This module is stateless with respect to that window — it +//! is told "a spike arrived" (`push`) or "a spike left" (`expire`) +//! and is given a reference to the spikes it should pair against. +//! It owns only the `BTreeMap` of pair counts. + +use std::collections::BTreeMap; + +use crate::connectome::NeuronId; +use crate::lif::Spike; + +/// Co-firing coincidence window in ms. Matches the dense path in +/// `super::core::Observer::compute_fiedler` and the sparse path in +/// `super::sparse_fiedler`. +pub const COFIRE_TAU_MS: f32 = 5.0; + +/// Lexicographically-ordered neuron pair, used as the accumulator key. +/// Guarantees `(a, b)` and `(b, a)` map to the same entry. +pub type PairKey = (NeuronId, NeuronId); + +/// Maintains a rolling `BTreeMap<(NeuronId, NeuronId), u32>` of +/// τ-coincident spike-pair counts inside the observer's sliding window. +/// +/// The accumulator does not own the window spikes — it is updated +/// incrementally by the `Observer` whenever a spike is added +/// (`push`) or removed (`expire`). `snapshot_adjacency` consumes the +/// current map into a dense `n²` float vector suitable for the +/// existing Jacobi / shifted-power-iteration paths; `snapshot_sparse` +/// returns an iterator of `(u, v, weight)` triples for the sparse +/// path at `n > 1024`. +pub struct IncrementalCofireAccumulator { + /// Coincidence half-window in ms. + tau_ms: f32, + /// Pair counts, keyed by lexicographically-ordered `(lo, hi)`. + /// `BTreeMap` (not `HashMap`) — iteration order is deterministic, + /// which AC-1 bit-exactness requires. + counts: BTreeMap, +} + +impl IncrementalCofireAccumulator { + /// New empty accumulator with τ = 5 ms (matches the dense path). + pub fn new() -> Self { + Self::with_tau(COFIRE_TAU_MS) + } + + /// New empty accumulator with a custom coincidence half-window. + pub fn with_tau(tau_ms: f32) -> Self { + Self { + tau_ms, + counts: BTreeMap::new(), + } + } + + /// Number of distinct undirected pairs currently tracked. + pub fn edge_count(&self) -> usize { + self.counts.len() + } + + /// Tau in ms. + pub fn tau_ms(&self) -> f32 { + self.tau_ms + } + + /// Wipe the accumulator. Used when the window itself is reset. + pub fn clear(&mut self) { + self.counts.clear(); + } + + /// Record a new spike `s` entering the window. `prior_spikes` are + /// the spikes that were already in the window *before* `s` was + /// pushed, ordered oldest → newest (i.e. the `VecDeque` iterated + /// via `iter()` with `s` omitted). + /// + /// For each prior spike `p` with `|s.t_ms − p.t_ms| ≤ tau` we + /// increment the pair count. Since spikes arrive in monotonically- + /// non-decreasing time order, walking from the newest prior spike + /// backward lets us break out as soon as we exit the τ band. + pub fn push<'a, I>(&mut self, s: Spike, prior_spikes: I) + where + I: IntoIterator, + I::IntoIter: DoubleEndedIterator, + { + for p in prior_spikes.into_iter().rev() { + let dt = (s.t_ms - p.t_ms).abs(); + if dt > self.tau_ms { + break; + } + if p.neuron == s.neuron { + continue; + } + let key = ordered_pair(s.neuron, p.neuron); + *self.counts.entry(key).or_insert(0) += 1; + } + } + + /// Record a spike `q` leaving the window. `remaining_spikes` are + /// the spikes still in the window *after* `q` was popped, ordered + /// oldest → newest. + /// + /// For each remaining spike `r` with `|q.t_ms − r.t_ms| ≤ tau` we + /// decrement the pair count. Counts that reach zero are removed + /// so `edge_count()` accurately reflects live edges and the + /// snapshot path skips dead entries. + /// + /// Walking remaining spikes from oldest forward lets us break out + /// as soon as we exit the τ band, since times are non-decreasing + /// and `q` (just popped from front) is oldest. + pub fn expire<'a, I>(&mut self, q: Spike, remaining_spikes: I) + where + I: IntoIterator, + { + for r in remaining_spikes { + let dt = (r.t_ms - q.t_ms).abs(); + if dt > self.tau_ms { + break; + } + if r.neuron == q.neuron { + continue; + } + let key = ordered_pair(q.neuron, r.neuron); + if let Some(entry) = self.counts.get_mut(&key) { + debug_assert!( + *entry > 0, + "expire decremented zero entry — push/expire imbalance" + ); + *entry -= 1; + if *entry == 0 { + self.counts.remove(&key); + } + } + } + } + + /// Build the dense `n × n` symmetric adjacency vector consumed by + /// the existing Jacobi / dense shifted-power-iteration eigen-paths. + /// + /// `active` is the sorted, deduplicated list of `NeuronId`s whose + /// spikes lie in the current window. The caller is responsible for + /// maintaining this list (the `Observer` derives it on demand from + /// its `cofire_window`). Returns a flattened `n*n` row-major vector + /// with edge weights on both `(i, j)` and `(j, i)`. + /// + /// Pairs whose endpoints are not in `active` are silently skipped — + /// this should not happen if `push` / `expire` were called + /// correctly (every neuron with a live count is by construction + /// in the live window) but the guard keeps the snapshot robust to + /// future filter logic. + pub fn snapshot_adjacency(&self, active: &[NeuronId]) -> Vec { + let n = active.len(); + let mut a = vec![0.0_f32; n * n]; + for (&(u, v), &c) in &self.counts { + let Ok(ai) = active.binary_search(&u) else { + continue; + }; + let Ok(bi) = active.binary_search(&v) else { + continue; + }; + if ai == bi { + continue; + } + let w = c as f32; + a[ai * n + bi] = w; + a[bi * n + ai] = w; + } + a + } + + /// Iterator of `(lo_id, hi_id, weight)` triples for the currently- + /// tracked pair counts. Used by the sparse path at `n > 1024` to + /// drive CSR assembly without materialising an `n × n` matrix. + /// + /// Iteration order is deterministic (`BTreeMap` key order). + pub fn snapshot_sparse(&self) -> impl Iterator + '_ { + self.counts.iter().map(|(&(u, v), &c)| (u, v, c as f32)) + } + + /// Read-only handle to the underlying map. Used by tests and + /// diagnostics; hot-path callers should use `snapshot_adjacency` + /// or `snapshot_sparse` so the accumulator owns the iteration + /// discipline. + pub fn raw_counts(&self) -> &BTreeMap { + &self.counts + } +} + +impl Default for IncrementalCofireAccumulator { + fn default() -> Self { + Self::new() + } +} + +/// Sort a neuron pair into `(lo, hi)` canonical order so `(a, b)` and +/// `(b, a)` map to the same `BTreeMap` key. +#[inline] +fn ordered_pair(a: NeuronId, b: NeuronId) -> PairKey { + if a < b { + (a, b) + } else { + (b, a) + } +} + +// --------------------------------------------------------------------- +// Unit tests — local invariants. Equivalence against the pair-sweep +// path and AC-1 bit-exactness live in `tests/incremental_fiedler.rs`. +// --------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::VecDeque; + + fn spike(t: f32, n: u32) -> Spike { + Spike { + t_ms: t, + neuron: NeuronId(n), + } + } + + /// Push two τ-coincident spikes on distinct neurons — one edge, + /// one count. + #[test] + fn push_single_coincident_pair() { + let mut acc = IncrementalCofireAccumulator::new(); + let mut w = VecDeque::new(); + let a = spike(10.0, 0); + w.push_back(a); + acc.push(a, w.iter().take(w.len() - 1)); + let b = spike(12.0, 1); + w.push_back(b); + acc.push(b, w.iter().take(w.len() - 1)); + assert_eq!(acc.edge_count(), 1); + let (&key, &c) = acc.raw_counts().iter().next().unwrap(); + assert_eq!(key, (NeuronId(0), NeuronId(1))); + assert_eq!(c, 1); + } + + /// Push two spikes on the same neuron — no self-edge is ever + /// recorded. + #[test] + fn push_rejects_self_edges() { + let mut acc = IncrementalCofireAccumulator::new(); + let mut w = VecDeque::new(); + for t in [10.0, 12.0] { + let s = spike(t, 7); + w.push_back(s); + acc.push(s, w.iter().take(w.len() - 1)); + } + assert_eq!(acc.edge_count(), 0); + } + + /// Push two spikes separated by > τ — no edge. + #[test] + fn push_skips_beyond_tau() { + let mut acc = IncrementalCofireAccumulator::new(); + let mut w = VecDeque::new(); + let a = spike(10.0, 0); + w.push_back(a); + acc.push(a, w.iter().take(w.len() - 1)); + let b = spike(20.0, 1); // 10 ms apart, τ = 5 ms + w.push_back(b); + acc.push(b, w.iter().take(w.len() - 1)); + assert_eq!(acc.edge_count(), 0); + } + + /// Push then expire the same pair — count drops to zero and the + /// entry is purged. + #[test] + fn push_then_expire_symmetric() { + let mut acc = IncrementalCofireAccumulator::new(); + let mut w = VecDeque::new(); + let a = spike(10.0, 0); + let b = spike(12.0, 1); + w.push_back(a); + acc.push(a, w.iter().take(w.len() - 1)); + w.push_back(b); + acc.push(b, w.iter().take(w.len() - 1)); + assert_eq!(acc.edge_count(), 1); + // Expire `a` (simulates window pop from front). + let q = w.pop_front().unwrap(); + acc.expire(q, w.iter()); + assert_eq!(acc.edge_count(), 0); + } + + /// Multi-count: same pair co-fires twice, count = 2; expire one, + /// count = 1. + #[test] + fn pair_count_accumulates_and_decrements() { + let mut acc = IncrementalCofireAccumulator::new(); + let mut w = VecDeque::new(); + let spikes = [ + spike(10.0, 0), + spike(12.0, 1), + spike(14.0, 0), + spike(16.0, 1), + ]; + for s in spikes { + w.push_back(s); + acc.push(s, w.iter().take(w.len() - 1)); + } + // (0,1) pairs τ-coincident within 5 ms, skipping same-neuron: + // (t=10,n=0) ↔ (t=12,n=1): Δt=2 ✓ + // (t=10,n=0) ↔ (t=16,n=1): Δt=6 ✗ + // (t=12,n=1) ↔ (t=14,n=0): Δt=2 ✓ + // (t=14,n=0) ↔ (t=16,n=1): Δt=2 ✓ + // → 3 counts. + let &c = acc.raw_counts().get(&(NeuronId(0), NeuronId(1))).unwrap(); + assert_eq!(c, 3); + // Expire t=10 (neuron 0). Pairs lost: + // (t=10,n=0) ↔ (t=12,n=1): Δt=2 ✓ + // (t=10,n=0) ↔ (t=14,n=0): same neuron, skipped + // (t=10,n=0) ↔ (t=16,n=1): Δt=6, expire's forward scan + // breaks before reaching this (remaining: t=12,14,16). + // → 1 lost, count 3 → 2. + let q = w.pop_front().unwrap(); + acc.expire(q, w.iter()); + let &c = acc.raw_counts().get(&(NeuronId(0), NeuronId(1))).unwrap(); + assert_eq!(c, 2); + } + + /// Snapshot to dense adjacency: symmetric, correct magnitudes. + #[test] + fn snapshot_adjacency_is_symmetric() { + let mut acc = IncrementalCofireAccumulator::new(); + let mut w = VecDeque::new(); + for s in [ + spike(10.0, 0), + spike(11.0, 1), + spike(12.0, 2), + spike(13.0, 0), + ] { + w.push_back(s); + acc.push(s, w.iter().take(w.len() - 1)); + } + let active = vec![NeuronId(0), NeuronId(1), NeuronId(2)]; + let a = acc.snapshot_adjacency(&active); + assert_eq!(a.len(), 9); + // Symmetry. + for i in 0..3 { + for j in 0..3 { + assert_eq!(a[i * 3 + j], a[j * 3 + i]); + } + } + // Diagonal is zero (self-edges rejected). + for i in 0..3 { + assert_eq!(a[i * 3 + i], 0.0); + } + } + + /// `BTreeMap` iteration is deterministic across fresh allocations; + /// the sparse snapshot must produce the same ordered triples on + /// repeated construction. + #[test] + fn sparse_snapshot_is_deterministic() { + let build = || { + let mut acc = IncrementalCofireAccumulator::new(); + let mut w = VecDeque::new(); + for s in [ + spike(10.0, 5), + spike(11.0, 2), + spike(12.0, 9), + spike(13.0, 2), + spike(14.0, 5), + ] { + w.push_back(s); + acc.push(s, w.iter().take(w.len() - 1)); + } + acc.snapshot_sparse().collect::>() + }; + let a = build(); + let b = build(); + assert_eq!(a, b); + } +} diff --git a/examples/connectome-fly/src/observer/mod.rs b/examples/connectome-fly/src/observer/mod.rs index 860bd3a9..fb365516 100644 --- a/examples/connectome-fly/src/observer/mod.rs +++ b/examples/connectome-fly/src/observer/mod.rs @@ -3,23 +3,31 @@ //! //! Submodules: //! -//! - `core` — `Observer` and its public API. -//! - `report` — serializable report + `CoherenceEvent`. -//! - `eigensolver` — Jacobi full-eigendecomposition for small windows -//! plus a shifted-power-iteration fallback for -//! larger ones. -//! - `sparse_fiedler` — sparse shifted-power-iteration path for windows -//! with more than 1024 active neurons; uses -//! `ruvector_sparsifier::SparseGraph` as the -//! canonical scratch edge container so memory per -//! detect stays `O(n + nnz)` instead of `O(n²)`. +//! - `core` — `Observer` and its public API. +//! - `report` — serializable report + `CoherenceEvent`. +//! - `eigensolver` — Jacobi full-eigendecomposition for small +//! windows plus a shifted-power-iteration +//! fallback for larger ones. +//! - `sparse_fiedler` — sparse shifted-power-iteration path for +//! windows with more than 1024 active +//! neurons; uses +//! `ruvector_sparsifier::SparseGraph` as the +//! canonical scratch edge container so +//! memory per detect stays `O(n + nnz)` +//! instead of `O(n²)`. +//! - `incremental_fiedler` — rolling `BTreeMap` of τ-coincident pair +//! counts, updated in `on_spike` and +//! amortising the O(S²) pair sweep. ADR-154 +//! §16 lever 3. pub mod core; pub mod eigensolver; +pub mod incremental_fiedler; pub mod report; pub mod sparse_fiedler; pub use core::Observer; +pub use incremental_fiedler::IncrementalCofireAccumulator; pub use report::{CoherenceEvent, Report}; pub use sparse_fiedler::sparse_fiedler; diff --git a/examples/connectome-fly/tests/incremental_fiedler.rs b/examples/connectome-fly/tests/incremental_fiedler.rs new file mode 100644 index 00000000..acf5c69f --- /dev/null +++ b/examples/connectome-fly/tests/incremental_fiedler.rs @@ -0,0 +1,359 @@ +//! Correctness + microbench for the incremental Fiedler accumulator. +//! +//! ADR-154 §16 lever 3. The accumulator replaces the O(S²) per-detect +//! pair sweep in `Observer::compute_fiedler` with an amortised +//! `BTreeMap<(NeuronId, NeuronId), u32>` of τ-coincident pair counts, +//! updated in `Observer::on_spike`. +//! +//! Tests: +//! +//! 1. `incremental_adjacency_matches_pair_sweep` — on a ~200-spike +//! fixture, the dense `n × n` adjacency produced by +//! `snapshot_adjacency` matches the exact O(S²) pair-sweep +//! adjacency byte-for-byte (counts are integers, so "within 1e-4 +//! relative" collapses to exact equality at this scale). +//! 2. `incremental_fiedler_matches_pair_sweep_within_1e_4` — full +//! `approx_fiedler_power` run on both adjacencies yields the same +//! Fiedler value within 1e-4 relative error (per the task spec). +//! 3. `incremental_accumulator_is_bit_deterministic` — repeated +//! construction from identical spike streams produces identical +//! adjacency vectors (AC-1-style). +//! 4. `push_expire_round_trip_is_empty` — after pushing every spike +//! in a long stream and expiring them all, the accumulator is +//! empty (no leaked edges). +//! 5. `per_detect_microbench` — saturated-window fixture measures +//! per-detect wallclock of the incremental path vs the pair-sweep +//! path. Prints the speedup; fails only if the incremental path is +//! *slower* than the pair-sweep path (the regression guard). + +use std::collections::VecDeque; +use std::time::Instant; + +use connectome_fly::observer::eigensolver::approx_fiedler_power; +use connectome_fly::observer::incremental_fiedler::{IncrementalCofireAccumulator, COFIRE_TAU_MS}; +use connectome_fly::{NeuronId, Spike}; + +// --------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------- + +/// Reproduce the pre-commit pair-sweep adjacency: for every pair of +/// window spikes within τ, increment the `(ai, bi)` entry. Used as +/// the reference the incremental accumulator must match. +fn pair_sweep_adjacency(active: &[NeuronId], window: &VecDeque) -> Vec { + let n = active.len(); + let mut a = vec![0.0_f32; n * n]; + let spikes: Vec<_> = window.iter().copied().collect(); + for (i, sa) in spikes.iter().enumerate() { + let Ok(ai) = active.binary_search(&sa.neuron) else { + continue; + }; + for sb in &spikes[i + 1..] { + if (sb.t_ms - sa.t_ms).abs() > COFIRE_TAU_MS { + break; + } + let Ok(bi) = active.binary_search(&sb.neuron) else { + continue; + }; + if ai == bi { + continue; + } + a[ai * n + bi] += 1.0; + a[bi * n + ai] += 1.0; + } + } + a +} + +/// Build an accumulator by replaying a spike stream through the +/// `push` / `expire` cycle that `Observer::on_spike` performs, +/// honouring a sliding `window_ms`. Returns the accumulator and the +/// final window. +fn replay(stream: &[Spike], window_ms: f32) -> (IncrementalCofireAccumulator, VecDeque) { + let mut acc = IncrementalCofireAccumulator::new(); + let mut window: VecDeque = VecDeque::new(); + for &s in stream { + window.push_back(s); + let prior_len = window.len() - 1; + acc.push(s, window.iter().take(prior_len)); + let cutoff = s.t_ms - window_ms; + while let Some(front) = window.front() { + if front.t_ms < cutoff { + let q = window.pop_front().unwrap(); + acc.expire(q, window.iter()); + } else { + break; + } + } + } + (acc, window) +} + +/// Extract sorted-deduped active-neuron list from a window. +fn active_from_window(window: &VecDeque) -> Vec { + let mut v: Vec = window.iter().map(|s| s.neuron).collect(); + v.sort(); + v.dedup(); + v +} + +/// Seeded LCG (matches `Observer`-style determinism needs without +/// pulling rand into dev-deps). Returns a generator producing +/// reproducible u32 values. +fn lcg(seed: u64) -> impl FnMut() -> u32 { + let mut s = seed.wrapping_mul(0x9E37_79B9_7F4A_7C15) ^ 0xDEAD_BEEF_CAFE_F00D; + move || { + s = s + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + (s >> 32) as u32 + } +} + +/// Build a ~200-spike fixture: two weakly-coupled clusters, with +/// intra-cluster bursts and a handful of cross-cluster co-fires. +/// Deterministic. +fn fixture_200_spikes() -> Vec { + let mut out: Vec = Vec::with_capacity(220); + let mut rng = lcg(42); + // Cluster A: neurons 0..8, bursts every 8 ms for 100 ms. + for k in 0..12 { + let t0 = 5.0 + k as f32 * 8.0; + for i in 0..8_u32 { + out.push(Spike { + t_ms: t0 + (rng() % 1000) as f32 * 0.002, + neuron: NeuronId(i), + }); + } + } + // Cluster B: neurons 8..16, offset 4 ms from A. + for k in 0..12 { + let t0 = 9.0 + k as f32 * 8.0; + for i in 8..16_u32 { + out.push(Spike { + t_ms: t0 + (rng() % 1000) as f32 * 0.002, + neuron: NeuronId(i), + }); + } + } + // Cross-cluster bridge spikes: neurons 3 ↔ 11 co-fire. + for k in 0..10 { + let t = 6.0 + k as f32 * 10.0; + out.push(Spike { + t_ms: t, + neuron: NeuronId(3), + }); + out.push(Spike { + t_ms: t + 0.3, + neuron: NeuronId(11), + }); + } + // Non-decreasing time order (the engine emits spikes monotonically + // by t_ms; our incremental push assumes it). + out.sort_by(|a, b| a.t_ms.partial_cmp(&b.t_ms).unwrap()); + out +} + +/// Saturated-window fixture used by the microbench. ~21 000 spikes +/// inside a 50 ms sliding window over N=1024 neurons — the same +/// regime as the AC-4 saturated bench after commit 10. +fn fixture_saturated_window(n_neurons: u32, target_len: usize) -> Vec { + // Uniformly distribute spikes in [0, 50] ms across all neurons, + // with enough density to hit `target_len` inside the 50 ms window. + let mut out: Vec = Vec::with_capacity(target_len); + let mut rng = lcg(0xABCD_1234); + for i in 0..target_len { + let t = (i as f32 / target_len as f32) * 50.0; + let neuron = NeuronId(rng() % n_neurons); + out.push(Spike { t_ms: t, neuron }); + } + // Time order is already monotone by construction. + out +} + +// --------------------------------------------------------------------- +// Equivalence tests +// --------------------------------------------------------------------- + +#[test] +fn incremental_adjacency_matches_pair_sweep() { + let stream = fixture_200_spikes(); + // Use a window large enough to retain all spikes — isolates the + // push/expire round-trip from the comparison. (Spec window is + // 50 ms; our stream spans ~100 ms, so a 200 ms window keeps + // everything and still exercises the full push path.) + let (acc, window) = replay(&stream, 200.0); + let active = active_from_window(&window); + let a_incr = acc.snapshot_adjacency(&active); + let a_ref = pair_sweep_adjacency(&active, &window); + assert_eq!(a_incr.len(), a_ref.len()); + // Counts are integers; expected byte-exact. + for (i, (x, y)) in a_incr.iter().zip(a_ref.iter()).enumerate() { + assert_eq!( + x.to_bits(), + y.to_bits(), + "adjacency mismatch at [{i}]: incr={x} ref={y}" + ); + } +} + +#[test] +fn incremental_adjacency_matches_after_window_expiry() { + // Sliding-window case: spikes older than `window_ms` must have + // been correctly decremented out of the accumulator. + let stream = fixture_200_spikes(); + let (acc, window) = replay(&stream, 50.0); // real observer window + let active = active_from_window(&window); + let a_incr = acc.snapshot_adjacency(&active); + let a_ref = pair_sweep_adjacency(&active, &window); + for (i, (x, y)) in a_incr.iter().zip(a_ref.iter()).enumerate() { + assert_eq!( + x.to_bits(), + y.to_bits(), + "sliding-window adjacency mismatch at [{i}]: incr={x} ref={y}" + ); + } +} + +#[test] +fn incremental_fiedler_matches_pair_sweep_within_1e_4() { + // End-to-end: run `approx_fiedler_power` on both adjacencies and + // assert the Fiedler value agrees within 1e-4 relative error (the + // task's acceptance threshold). + let stream = fixture_200_spikes(); + let (acc, window) = replay(&stream, 200.0); + let active = active_from_window(&window); + let n = active.len(); + assert!(n >= 2, "fixture must produce at least 2 active neurons"); + let a_incr = acc.snapshot_adjacency(&active); + let a_ref = pair_sweep_adjacency(&active, &window); + let fiedler_incr = approx_fiedler_power(&a_incr, n); + let fiedler_ref = approx_fiedler_power(&a_ref, n); + // Since a_incr == a_ref bit-for-bit, the eigensolver must produce + // identical outputs. We still assert within 1e-4 relative so the + // test documents the spec. + let denom = fiedler_ref.abs().max(1e-6); + let rel = (fiedler_incr - fiedler_ref).abs() / denom; + assert!( + rel < 1e-4, + "fiedler disagreement: incr={fiedler_incr} ref={fiedler_ref} rel={rel}" + ); +} + +#[test] +fn incremental_accumulator_is_bit_deterministic() { + // AC-1 analogue: repeated construction from the same spike stream + // produces the same adjacency byte-for-byte. BTreeMap iteration + // order must not depend on allocator state or hash randomisation. + let stream = fixture_200_spikes(); + let (acc_a, win_a) = replay(&stream, 50.0); + let (acc_b, win_b) = replay(&stream, 50.0); + let active_a = active_from_window(&win_a); + let active_b = active_from_window(&win_b); + assert_eq!(active_a, active_b); + let aa = acc_a.snapshot_adjacency(&active_a); + let ab = acc_b.snapshot_adjacency(&active_b); + assert_eq!(aa.len(), ab.len()); + for (i, (x, y)) in aa.iter().zip(ab.iter()).enumerate() { + assert_eq!(x.to_bits(), y.to_bits(), "determinism drift at [{i}]"); + } + // Sparse-snapshot triples also stable. + let sa: Vec<_> = acc_a.snapshot_sparse().collect(); + let sb: Vec<_> = acc_b.snapshot_sparse().collect(); + assert_eq!(sa, sb); +} + +#[test] +fn push_expire_round_trip_is_empty() { + // After every spike in the stream has entered and then exited the + // window, the accumulator must be empty (no leaked edges). + let stream = fixture_200_spikes(); + // 0 ms window → every spike expires immediately after the next + // push moves the cutoff past it. But we need the window to retain + // at least the newly-pushed spike; use a vanishingly-small window. + // Instead: replay normally at 50 ms, then drain all remaining + // spikes with synthetic "ages" well past the window. + let (mut acc, mut window) = replay(&stream, 50.0); + // Drain: pop every remaining spike from the front, calling expire. + while let Some(q) = window.pop_front() { + acc.expire(q, window.iter()); + } + assert_eq!( + acc.edge_count(), + 0, + "push/expire left {} leaked edges", + acc.edge_count() + ); +} + +// --------------------------------------------------------------------- +// Microbench — not a correctness test, but fails if the incremental +// path is slower than the pair-sweep path. +// --------------------------------------------------------------------- + +/// Run the pair-sweep and incremental paths on the same saturated +/// window and print per-detect wallclock. Included in the test suite +/// (not a Criterion bench) because `benches/` would need Criterion +/// wiring in `Cargo.toml` and the spec excludes Cargo.toml edits. +#[test] +fn per_detect_microbench() { + let n_neurons: u32 = 1024; + // Target ~21 000 spikes inside a 50 ms window — the saturated + // regime cited in the task. A straight 50 ms fixture runs the + // accumulator through ~21k pushes; for the pair-sweep baseline + // we just time one `compute_fiedler`-equivalent sweep over the + // populated window. + let stream = fixture_saturated_window(n_neurons, 21_000); + let (acc, window) = replay(&stream, 50.0); + let active = active_from_window(&window); + let n = active.len(); + println!( + "microbench: saturated window = {} spikes, active = {} neurons, \ + accumulator edges = {}", + window.len(), + n, + acc.edge_count() + ); + + // --- Pair-sweep baseline: allocate adjacency + walk every pair. --- + // Warmup + median-of-5. The inner `compute_fiedler` in + // `Observer::compute_fiedler` also runs the eigensolver; we time + // only the adjacency build (the work the accumulator replaces), + // so the measured ratio is the adjacency-build speedup alone. + // The eigensolver cost is identical on both paths. + let runs = 5; + let mut sweep_ns = Vec::with_capacity(runs); + for _ in 0..runs { + let t0 = Instant::now(); + let _ = pair_sweep_adjacency(&active, &window); + sweep_ns.push(t0.elapsed().as_nanos()); + } + let mut incr_ns = Vec::with_capacity(runs); + for _ in 0..runs { + let t0 = Instant::now(); + let _ = acc.snapshot_adjacency(&active); + incr_ns.push(t0.elapsed().as_nanos()); + } + sweep_ns.sort(); + incr_ns.sort(); + let sweep_med = sweep_ns[runs / 2] as f64; + let incr_med = incr_ns[runs / 2] as f64; + let speedup = sweep_med / incr_med.max(1.0); + println!( + "per-detect adjacency build (median of {runs}):\n\ + \tpair-sweep = {:.3} ms\n\ + \tincremental = {:.3} ms\n\ + \tspeedup = {:.2}x", + sweep_med / 1.0e6, + incr_med / 1.0e6, + speedup, + ); + // Regression guard: the incremental path must not be slower. We + // do not assert >= 5× here — that is a target, not a hard bound. + // Environment noise (CI vs local) can shift the ratio; the printed + // number is the source of truth for the commit message. + assert!( + speedup > 1.0, + "incremental path regressed vs pair-sweep: speedup={speedup:.2}x" + ); +}