fix(server): resolve adversarial review findings C1-C5, H1-H3, H5, M1-M2

Critical fixes:
- C1: FieldModel created with n_links=1 (single_link_config) so
  feed_calibration/extract_perturbation no longer get DimensionMismatch
- C2: variance_explained now uses centered covariance trace (E[x²]-E[x]²)
  matching mode_energies normalization
- C3: MP ratio uses total_obs = frames * links for consistent threshold
  between calibration and runtime
- C4: Noise estimator filters to positive eigenvalues only, preventing
  collapse to ~0 on rank-deficient matrices (p > n)
- C5: ESP32 paths gate total_persons on presence — empty room reports 0

High fixes:
- H1: Bounding box computed from observed keypoints only (confidence > 0),
  preventing collapse from centroid-filled unobserved slots
- H2: fuse_or_fallback returns Option<usize> instead of sentinel 0,
  eliminating type ambiguity between "fusion succeeded" and "zero people"
- H3: Monotonic epoch-relative timestamps replace wall-clock/Instant mixing,
  preventing spurious TimestampMismatch on NTP steps
- H5: ndarray-linalg gated behind "eigenvalue" feature flag (default=on),
  diagonal fallback used with --no-default-features

Moderate fixes:
- M1: calibration_start guards against replacing Fresh calibration
- M2: parse_node_positions logs warning for malformed entries

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
rUv 2026-03-31 18:50:00 +00:00
parent 74e0ebbd41
commit a23bd2ec01
6 changed files with 180 additions and 92 deletions

View file

@ -7,7 +7,7 @@
//! score-based heuristic in `score_to_person_count`.
use std::collections::VecDeque;
use wifi_densepose_signal::ruvsense::field_model::{CalibrationStatus, FieldModel};
use wifi_densepose_signal::ruvsense::field_model::{CalibrationStatus, FieldModel, FieldModelConfig};
use super::score_to_person_count;
@ -19,12 +19,21 @@ const ENERGY_THRESH_2: f64 = 12.0;
/// Perturbation energy threshold for detecting a third person.
const ENERGY_THRESH_3: f64 = 25.0;
/// Create a FieldModelConfig for single-link mode (one ESP32 node = one link).
/// This avoids the DimensionMismatch error when feeding single-frame observations.
pub fn single_link_config() -> FieldModelConfig {
FieldModelConfig {
n_links: 1,
..FieldModelConfig::default()
}
}
/// Estimate occupancy using the FieldModel when calibrated, falling back
/// to the score-based heuristic otherwise.
///
/// When the field model is `Fresh` or `Stale`, we extract body perturbation
/// from the most recent frames and map total energy to a person count.
/// On any error or when uncalibrated, we fall through to `score_to_person_count`.
/// Prefers `estimate_occupancy()` (eigenvalue-based) when the model is
/// calibrated and enough frames are available. Falls back to perturbation
/// energy thresholds, then to the score heuristic.
pub fn occupancy_or_fallback(
field: &FieldModel,
frame_history: &VecDeque<Vec<f64>>,
@ -44,9 +53,14 @@ pub fn occupancy_or_fallback(
return score_to_person_count(smoothed_score, prev_count);
}
// Use the most recent frame as the observation for perturbation
// extraction. The FieldModel expects [n_links][n_subcarriers],
// so we wrap the single frame as a single-link observation.
// Try eigenvalue-based occupancy first (best accuracy).
match field.estimate_occupancy(&frames) {
Ok(count) => return count,
Err(_) => {} // fall through to perturbation energy
}
// Fallback: perturbation energy thresholds.
// FieldModel expects [n_links][n_subcarriers] — we use n_links=1.
let observation = vec![frames[0].clone()];
match field.extract_perturbation(&observation) {
Ok(perturbation) => {
@ -54,14 +68,13 @@ pub fn occupancy_or_fallback(
3
} else if perturbation.total_energy > ENERGY_THRESH_2 {
2
} else {
} else if perturbation.total_energy > 1.0 {
1
} else {
0
}
}
Err(e) => {
tracing::warn!("FieldModel perturbation failed, using fallback: {e}");
score_to_person_count(smoothed_score, prev_count)
}
Err(_) => score_to_person_count(smoothed_score, prev_count),
}
}
_ => score_to_person_count(smoothed_score, prev_count),
@ -71,15 +84,16 @@ pub fn occupancy_or_fallback(
/// Feed the latest frame to the FieldModel during calibration collection.
///
/// Only acts when the model status is `Collecting`. Wraps the latest frame
/// as a single-link observation and feeds it; errors are logged and ignored.
/// as a single-link observation (n_links=1) and feeds it.
pub fn maybe_feed_calibration(field: &mut FieldModel, frame_history: &VecDeque<Vec<f64>>) {
if field.status() != CalibrationStatus::Collecting {
return;
}
if let Some(latest) = frame_history.back() {
// Single-link observation: [1][n_subcarriers]
let observations = vec![latest.clone()];
if let Err(e) = field.feed_calibration(&observations) {
tracing::warn!("FieldModel calibration feed error: {e}");
tracing::debug!("FieldModel calibration feed: {e}");
}
}
}
@ -87,22 +101,27 @@ pub fn maybe_feed_calibration(field: &mut FieldModel, frame_history: &VecDeque<V
/// Parse node positions from a semicolon-delimited string.
///
/// Format: `"x,y,z;x,y,z;..."` where each coordinate is an `f32`.
/// Entries that fail to parse are silently skipped.
/// Malformed entries are skipped with a warning log.
pub fn parse_node_positions(input: &str) -> Vec<[f32; 3]> {
if input.is_empty() {
return Vec::new();
}
input
.split(';')
.filter_map(|triplet| {
.enumerate()
.filter_map(|(idx, triplet)| {
let parts: Vec<&str> = triplet.split(',').collect();
if parts.len() != 3 {
tracing::warn!("Skipping malformed node position entry {idx}: '{triplet}' (expected x,y,z)");
return None;
}
let x = parts[0].parse::<f32>().ok()?;
let y = parts[1].parse::<f32>().ok()?;
let z = parts[2].parse::<f32>().ok()?;
Some([x, y, z])
match (parts[0].parse::<f32>(), parts[1].parse::<f32>(), parts[2].parse::<f32>()) {
(Ok(x), Ok(y), Ok(z)) => Some([x, y, z]),
_ => {
tracing::warn!("Skipping unparseable node position entry {idx}: '{triplet}'");
None
}
}
})
.collect()
}

View file

@ -58,7 +58,7 @@ use wifi_densepose_wifiscan::parse_netsh_output as parse_netsh_bssid_output;
// Accuracy sprint: Kalman tracker, multistatic fusion, field model
use wifi_densepose_signal::ruvsense::pose_tracker::PoseTracker;
use wifi_densepose_signal::ruvsense::multistatic::{MultistaticFuser, MultistaticConfig};
use wifi_densepose_signal::ruvsense::field_model::{FieldModel, FieldModelConfig, CalibrationStatus};
use wifi_densepose_signal::ruvsense::field_model::{FieldModel, CalibrationStatus};
// ── CLI ──────────────────────────────────────────────────────────────────────
@ -2844,17 +2844,26 @@ async fn adaptive_unload(State(state): State<SharedState>) -> Json<serde_json::V
async fn calibration_start(State(state): State<SharedState>) -> Json<serde_json::Value> {
let mut s = state.write().await;
// Guard: don't discard an in-progress calibration
// Guard: don't discard an in-progress or fresh calibration
if let Some(ref fm) = s.field_model {
if fm.status() == CalibrationStatus::Collecting {
return Json(serde_json::json!({
"success": false,
"error": "Calibration already in progress. Call /calibration/stop first.",
"frame_count": fm.calibration_frame_count(),
}));
match fm.status() {
CalibrationStatus::Collecting => {
return Json(serde_json::json!({
"success": false,
"error": "Calibration already in progress. Call /calibration/stop first.",
"frame_count": fm.calibration_frame_count(),
}));
}
CalibrationStatus::Fresh => {
return Json(serde_json::json!({
"success": false,
"error": "A fresh calibration already exists. Call /calibration/stop or wait for expiry.",
}));
}
_ => {} // Stale/Expired/Uncalibrated — ok to recalibrate
}
}
match FieldModel::new(FieldModelConfig::default()) {
match FieldModel::new(field_bridge::single_link_config()) {
Ok(fm) => {
s.field_model = Some(fm);
Json(serde_json::json!({
@ -3178,9 +3187,9 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
else if vitals.presence { 0.3 }
else { 0.05 };
// Aggregate person count: attention-weighted fusion or max-per-node fallback.
// Aggregate person count: gate on presence first (matching WiFi path).
let now = std::time::Instant::now();
let total_persons = {
let total_persons = if vitals.presence {
let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback(
&s.multistatic_fuser, &s.node_states,
);
@ -3190,10 +3199,13 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
s.smoothed_person_score = s.smoothed_person_score * 0.90 + score * 0.10;
let count = s.person_count();
s.prev_person_count = count;
count
count.max(1) // presence=true => at least 1
}
None => fallback_count,
None => fallback_count.unwrap_or(0).max(1),
}
} else {
s.prev_person_count = 0;
0
};
// Feed field model calibration if active (use per-node history for ESP32).
@ -3398,9 +3410,9 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
else if classification.motion_level == "present_still" { 0.3 }
else { 0.05 };
// Aggregate person count: attention-weighted fusion or naive sum fallback.
// Aggregate person count: gate on presence first (matching WiFi path).
let now = std::time::Instant::now();
let total_persons = {
let total_persons = if classification.presence {
let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback(
&s.multistatic_fuser, &s.node_states,
);
@ -3410,10 +3422,13 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
s.smoothed_person_score = s.smoothed_person_score * 0.90 + score * 0.10;
let count = s.person_count();
s.prev_person_count = count;
count
count.max(1)
}
None => fallback_count,
None => fallback_count.unwrap_or(0).max(1),
}
} else {
s.prev_person_count = 0;
0
};
// Feed field model calibration if active (use per-node history for ESP32).
@ -4239,7 +4254,7 @@ async fn main() {
},
field_model: if args.calibrate {
info!("Field model calibration enabled — room should be empty during startup");
FieldModel::new(FieldModelConfig::default()).ok()
FieldModel::new(field_bridge::single_link_config()).ok()
} else {
None
},

View file

@ -7,6 +7,7 @@
//! (e.g. insufficient nodes or timestamp spread).
use std::collections::HashMap;
use std::sync::LazyLock;
use std::time::{Duration, Instant};
use wifi_densepose_signal::hardware_norm::{CanonicalCsiFrame, HardwareType};
@ -21,6 +22,10 @@ const STALE_THRESHOLD: Duration = Duration::from_secs(10);
/// Default WiFi channel frequency (MHz) used for single-channel frames.
const DEFAULT_FREQ_MHZ: u32 = 2437; // Channel 6
/// Monotonic reference point for timestamp generation. All node timestamps
/// are relative to this instant, avoiding wall-clock/monotonic mixing issues.
static EPOCH: LazyLock<Instant> = LazyLock::new(Instant::now);
/// Convert a single `NodeState` into a `MultiBandCsiFrame` suitable for
/// multistatic fusion.
///
@ -37,14 +42,10 @@ pub fn node_frame_from_state(node_id: u8, ns: &NodeState) -> Option<MultiBandCsi
let n_sub = amplitude.len();
let phase = vec![0.0_f32; n_sub];
// Derive a monotonic timestamp: use wall-clock time minus elapsed since
// last frame to approximate when the frame was actually received.
let wall_us = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_micros() as u64)
.unwrap_or(0);
let age_us = last_time.elapsed().as_micros() as u64;
let timestamp_us = wall_us.saturating_sub(age_us);
// Monotonic timestamp: microseconds since a shared process-local epoch.
// All nodes use the same reference so the fuser's guard_interval_us check
// compares apples to apples. No wall-clock mixing (immune to NTP jumps).
let timestamp_us = last_time.duration_since(*EPOCH).as_micros() as u64;
let canonical = CanonicalCsiFrame {
amplitude,
@ -89,23 +90,23 @@ pub fn node_frames_from_states(node_states: &HashMap<u8, NodeState>) -> Vec<Mult
/// Attempt multistatic fusion; fall back to max per-node person count on failure.
///
/// Returns `(fused_frame, fallback_person_count)`. When fusion succeeds, the
/// caller should compute person count from the fused amplitudes (the returned
/// fallback count is 0 as a sentinel). On failure, returns the maximum
/// per-node count (not the sum, to avoid double-counting overlapping coverage).
/// Returns `(fused_frame, fallback_person_count)`. When fusion succeeds,
/// `fallback_person_count` is `None` — the caller must compute count from
/// the fused amplitudes. On failure, returns the maximum per-node count
/// (not the sum, to avoid double-counting overlapping coverage).
pub fn fuse_or_fallback(
fuser: &MultistaticFuser,
node_states: &HashMap<u8, NodeState>,
) -> (Option<FusedSensingFrame>, usize) {
) -> (Option<FusedSensingFrame>, Option<usize>) {
let frames = node_frames_from_states(node_states);
if frames.is_empty() {
return (None, 0);
return (None, Some(0));
}
match fuser.fuse(&frames) {
Ok(fused) => {
// Return 0 as sentinel — caller must compute count from fused amplitudes.
(Some(fused), 0)
// Caller must compute person count from fused amplitudes.
(Some(fused), None)
}
Err(e) => {
tracing::debug!("Multistatic fusion failed ({e}), using per-node max fallback");
@ -120,7 +121,7 @@ pub fn fuse_or_fallback(
.map(|ns| ns.prev_person_count)
.max()
.unwrap_or(0);
(None, max_count)
(None, Some(max_count))
}
}
}
@ -258,6 +259,6 @@ mod tests {
let states: HashMap<u8, NodeState> = HashMap::new();
let (fused, count) = fuse_or_fallback(&fuser, &states);
assert!(fused.is_none());
assert_eq!(count, 0);
assert_eq!(count, Some(0));
}
}

View file

@ -123,23 +123,35 @@ pub fn tracker_to_person_detections(tracker: &PoseTracker) -> Vec<PersonDetectio
})
.collect();
// Compute bounding box from keypoint min/max
// Compute bounding box from observed keypoints only (confidence > 0).
// Unobserved slots (centroid-filled) collapse the bbox over time.
let mut min_x = f64::MAX;
let mut min_y = f64::MAX;
let mut max_x = f64::MIN;
let mut max_y = f64::MIN;
let mut observed = 0;
for kp in &keypoints {
if kp.x < min_x { min_x = kp.x; }
if kp.y < min_y { min_y = kp.y; }
if kp.x > max_x { max_x = kp.x; }
if kp.y > max_y { max_y = kp.y; }
if kp.confidence > 0.0 {
if kp.x < min_x { min_x = kp.x; }
if kp.y < min_y { min_y = kp.y; }
if kp.x > max_x { max_x = kp.x; }
if kp.y > max_y { max_y = kp.y; }
observed += 1;
}
}
let bbox = BoundingBox {
x: min_x,
y: min_y,
width: max_x - min_x,
height: max_y - min_y,
let bbox = if observed > 0 {
BoundingBox {
x: min_x,
y: min_y,
width: (max_x - min_x).max(0.01),
height: (max_y - min_y).max(0.01),
}
} else {
// No observed keypoints — use a default bbox at centroid
let cx = keypoints.iter().map(|k| k.x).sum::<f64>() / keypoints.len() as f64;
let cy = keypoints.iter().map(|k| k.y).sum::<f64>() / keypoints.len() as f64;
BoundingBox { x: cx - 0.3, y: cy - 0.5, width: 0.6, height: 1.0 }
};
PersonDetection {

View file

@ -11,6 +11,12 @@ keywords = ["wifi", "csi", "signal-processing", "densepose", "rust"]
categories = ["science", "computer-vision"]
readme = "README.md"
[features]
default = ["eigenvalue"]
## Enable eigenvalue-based person counting (requires BLAS via ndarray-linalg).
## Disable with --no-default-features to use the diagonal fallback instead.
eigenvalue = ["ndarray-linalg"]
[dependencies]
# Core utilities
thiserror.workspace = true
@ -20,7 +26,7 @@ chrono = { version = "0.4", features = ["serde"] }
# Signal processing
ndarray = { workspace = true }
ndarray-linalg = { workspace = true }
ndarray-linalg = { workspace = true, optional = true }
rustfft.workspace = true
num-complex.workspace = true
num-traits.workspace = true

View file

@ -18,7 +18,9 @@
//! - ADR-030: RuvSense Persistent Field Model
use ndarray::Array2;
#[cfg(feature = "eigenvalue")]
use ndarray_linalg::Eigh;
#[cfg(feature = "eigenvalue")]
use ndarray_linalg::UPLO;
// ---------------------------------------------------------------------------
@ -525,7 +527,8 @@ impl FieldModel {
let bessel = total_obs / (total_obs - 1.0);
covariance *= bessel;
// Symmetric eigendecomposition
// Symmetric eigendecomposition (requires eigenvalue feature / BLAS)
#[cfg(feature = "eigenvalue")]
match covariance.eigh(UPLO::Upper) {
Ok((eigenvalues, eigenvectors)) => {
// eigenvalues are in ascending order from ndarray-linalg
@ -550,21 +553,25 @@ impl FieldModel {
.map(|&idx| eigenvalues[idx].max(0.0))
.collect();
// Marcenko-Pastur threshold for baseline eigenvalue count.
// Use median of bottom half as robust noise estimate
// (consistent with estimate_occupancy).
// Marcenko-Pastur noise estimate: median of POSITIVE
// eigenvalues in the bottom half. Excludes zeros from
// rank-deficient matrices (when p > n).
let noise_var = {
let mut sorted_eigs: Vec<f64> = eigenvalues
.iter().copied().map(|e| e.max(0.0)).collect();
sorted_eigs.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let half = sorted_eigs.len() / 2;
if half > 0 {
sorted_eigs[..half].iter().sum::<f64>() / half as f64
let mut positive: Vec<f64> = eigenvalues
.iter().copied().filter(|&e| e > 1e-10).collect();
positive.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
if positive.len() >= 4 {
let half = positive.len() / 2;
positive[..half].iter().sum::<f64>() / half as f64
} else if !positive.is_empty() {
positive[0]
} else {
sorted_eigs.iter().sum::<f64>() / sorted_eigs.len().max(1) as f64
1e-10
}
};
let ratio = n_sc as f64 / self.covariance_count as f64;
// MP ratio: p/n where n = total observations (frames * links)
let total_obs_mp = self.covariance_count as f64 * self.config.n_links as f64;
let ratio = n_sc as f64 / total_obs_mp;
let mp_threshold = noise_var * (1.0 + ratio.sqrt()).powi(2);
let baseline_count = eigenvalues
.iter()
@ -578,6 +585,9 @@ impl FieldModel {
diagonal_fallback(&self.link_stats, n_sc, n_modes)
}
}
// When eigenvalue feature is disabled, use diagonal fallback
#[cfg(not(feature = "eigenvalue"))]
{ diagonal_fallback(&self.link_stats, n_sc, n_modes) }
} else {
diagonal_fallback(&self.link_stats, n_sc, n_modes)
}
@ -585,14 +595,23 @@ impl FieldModel {
diagonal_fallback(&self.link_stats, n_sc, n_modes)
};
// Compute variance explained
// Compute variance explained using the same centered covariance as modes.
// total_variance = trace(centered_covariance) = sum of ALL eigenvalues.
let total_energy: f64 = mode_energies.iter().sum();
// For variance_explained, we need total variance across all subcarriers.
// Use the sum of all eigenvalues (== trace of covariance == total variance).
let total_variance = if let Some(ref cov_sum) = self.covariance_sum {
if self.covariance_count > 1 {
let scale = 1.0 / (self.covariance_count as f64 - 1.0);
(0..n_sc).map(|i| (cov_sum[[i, i]] * scale).max(0.0)).sum::<f64>()
let n_links_f = self.config.n_links as f64;
let total_obs = self.covariance_count as f64 * n_links_f;
// Centered trace: E[x^2] - E[x]^2, with Bessel correction
let mut avg_mean = vec![0.0f64; n_sc];
for ls in &self.link_stats {
let m = ls.mean_vector();
for i in 0..n_sc { avg_mean[i] += m[i]; }
}
for i in 0..n_sc { avg_mean[i] /= n_links_f; }
let raw_trace: f64 = (0..n_sc).map(|i| cov_sum[[i, i]] / total_obs).sum();
let mean_sq: f64 = avg_mean.iter().map(|m| m * m).sum();
(raw_trace - mean_sq).max(0.0) * total_obs / (total_obs - 1.0)
} else {
total_energy
}
@ -699,6 +718,10 @@ impl FieldModel {
///
/// `recent_frames`: sliding window of amplitude vectors (recommend 50 frames
/// ~ 2.5s at 20 Hz). Returns estimated person count (0 = empty room).
///
/// Requires the `eigenvalue` feature (BLAS). Returns `NotCalibrated` when
/// the feature is disabled.
#[cfg(feature = "eigenvalue")]
pub fn estimate_occupancy(&self, recent_frames: &[Vec<f64>]) -> Result<usize, FieldModelError> {
let modes = self.modes.as_ref().ok_or(FieldModelError::NotCalibrated)?;
@ -752,16 +775,22 @@ impl FieldModel {
Err(_) => return Ok(0), // SVD failure = can't estimate
};
// Marcenko-Pastur noise threshold
// Marcenko-Pastur noise estimate: median of POSITIVE eigenvalues
// in the bottom half. Excludes zeros from rank-deficient matrices
// (common when n_subcarriers > n_frames, e.g. 56 subcarriers / 50 frames).
let noise_var = {
let mut sorted: Vec<f64> = eigenvalues.iter().copied().collect();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
// Median of bottom half as robust noise estimate
let half = sorted.len() / 2;
if half > 0 {
sorted[..half].iter().sum::<f64>() / half as f64
let mut positive: Vec<f64> = eigenvalues.iter()
.copied()
.filter(|&e| e > 1e-10)
.collect();
positive.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
if positive.len() >= 4 {
let half = positive.len() / 2;
positive[..half].iter().sum::<f64>() / half as f64
} else if !positive.is_empty() {
positive[0]
} else {
1.0
return Ok(0); // All zero eigenvalues — can't estimate
}
};
let ratio = n as f64 / count as f64;
@ -773,6 +802,12 @@ impl FieldModel {
Ok(occupancy.min(10)) // Cap at 10 persons
}
/// Stub when eigenvalue feature is disabled — always returns NotCalibrated.
#[cfg(not(feature = "eigenvalue"))]
pub fn estimate_occupancy(&self, _recent_frames: &[Vec<f64>]) -> Result<usize, FieldModelError> {
Err(FieldModelError::NotCalibrated)
}
/// Check calibration freshness against a given timestamp.
pub fn check_freshness(&self, current_us: u64) -> CalibrationStatus {
if self.modes.is_none() {