feat(server): accuracy sprint 001 — Kalman tracker, multi-node fusion, eigenvalue counting

Original work by @taylorjdawson (PR #341). Merged with v0.5.5 firmware
preserved (ADR-069 feature vectors, ADR-073 channel hopping, batch-limited
watchdog from #266 fix).

New server features:
- Kalman tracker bridge for temporal smoothing
- Multi-node CSI fusion with field model
- Eigenvalue-based person counting
- Calibration endpoints (start/stop/status)
- Node positions parsing
- Adaptive classifier enhancements

Co-Authored-By: taylorjdawson <taylor@users.noreply.github.com>
Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-04-03 08:59:17 -04:00
commit b7650b5243
16 changed files with 152671 additions and 176 deletions

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,15 @@
{
"id": "pretrain-1775182186",
"name": "pretrain-1775182186",
"label": "mixed-activity",
"started_at": "2026-04-03T02:09:46Z",
"ended_at": "2026-04-03T02:11:46Z",
"duration_secs": 120,
"frame_count": 5783,
"file_size_bytes": 2580539,
"file_path": "data/recordings\\pretrain-1775182186.csi.jsonl",
"nodes": {
"2": 2886,
"1": 2897
}
}

View file

@ -0,0 +1,9 @@
@echo off
REM Smoke-test the ESP-IDF build environment on this Windows dev box.
REM Each stage appends a marker line to C:\Users\ruv\idf_test.txt so that a
REM failed run shows exactly how far it got before dying.
echo STARTING > C:\Users\ruv\idf_test.txt
REM Point IDF_PATH at the ESP-IDF v5.4 checkout and prepend the Espressif
REM toolchain (python venv, xtensa-elf gcc, cmake, ninja, idf-exe) to PATH.
set IDF_PATH=C:\Users\ruv\esp\v5.4\esp-idf
set PATH=C:\Espressif\tools\python\v5.4\venv\Scripts;C:\Espressif\tools\xtensa-esp-elf\esp-14.2.0_20241119\xtensa-esp-elf\bin;C:\Espressif\tools\cmake\3.30.2\bin;C:\Espressif\tools\ninja\1.12.1;C:\Espressif\tools\idf-exe\1.0.3;%PATH%
echo PATH_SET >> C:\Users\ruv\idf_test.txt
REM Build the ESP32 CSI node firmware from its project directory; /d also
REM switches drive if needed. stdout+stderr both go into the log.
cd /d C:\Users\ruv\Projects\wifi-densepose\firmware\esp32-csi-node
echo CD_DONE >> C:\Users\ruv\idf_test.txt
python %IDF_PATH%\tools\idf.py build >> C:\Users\ruv\idf_test.txt 2>&1
REM Record the build's exit code as the final log line (RC=0 means success).
echo RC=%ERRORLEVEL% >> C:\Users\ruv\idf_test.txt

View file

@ -7769,6 +7769,7 @@ dependencies = [
"chrono",
"clap",
"futures-util",
"ruvector-mincut",
"serde",
"serde_json",
"tempfile",

View file

@ -43,8 +43,8 @@ clap = { workspace = true }
# Multi-BSSID WiFi scanning pipeline (ADR-022 Phase 3)
wifi-densepose-wifiscan = { version = "0.3.0", path = "../wifi-densepose-wifiscan" }
# RuVector graph min-cut for person separation (ADR-068)
ruvector-mincut = { workspace = true }
# Signal processing with RuvSense pose tracker (accuracy sprint)
wifi-densepose-signal = { version = "0.3.0", path = "../wifi-densepose-signal" }
[dev-dependencies]
tempfile = "3.10"

View file

@ -10,6 +10,10 @@
//!
//! The trained model is serialised as JSON and hot-loaded at runtime so that
//! the classification thresholds adapt to the specific room and ESP32 placement.
//!
//! Classes are discovered dynamically from training data filenames instead of
//! being hardcoded, so new activity classes can be added just by recording data
//! with the appropriate filename convention.
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@ -20,9 +24,8 @@ use std::path::{Path, PathBuf};
/// Extended feature vector: 7 server features + 8 subcarrier-derived features = 15.
const N_FEATURES: usize = 15;
/// Activity classes we recognise.
pub const CLASSES: &[&str] = &["absent", "present_still", "present_moving", "active"];
const N_CLASSES: usize = 4;
/// Default class names for backward compatibility with old saved models.
const DEFAULT_CLASSES: &[&str] = &["absent", "present_still", "present_moving", "active"];
/// Extract extended feature vector from a JSONL frame (features + raw amplitudes).
pub fn features_from_frame(frame: &serde_json::Value) -> [f64; N_FEATURES] {
@ -124,8 +127,9 @@ pub struct ClassStats {
pub struct AdaptiveModel {
/// Per-class feature statistics (centroid + spread).
pub class_stats: Vec<ClassStats>,
/// Logistic regression weights: [N_CLASSES x (N_FEATURES + 1)] (last = bias).
pub weights: Vec<[f64; N_FEATURES + 1]>,
/// Logistic regression weights: [n_classes x (N_FEATURES + 1)] (last = bias).
/// Dynamic: the outer Vec length equals the number of discovered classes.
pub weights: Vec<Vec<f64>>,
/// Global feature normalisation: mean and stddev across all training data.
pub global_mean: [f64; N_FEATURES],
pub global_std: [f64; N_FEATURES],
@ -133,27 +137,38 @@ pub struct AdaptiveModel {
pub trained_frames: usize,
pub training_accuracy: f64,
pub version: u32,
/// Dynamically discovered class names (in index order).
#[serde(default = "default_class_names")]
pub class_names: Vec<String>,
}
/// Backward-compatible fallback for models saved without class_names.
fn default_class_names() -> Vec<String> {
DEFAULT_CLASSES.iter().map(|s| s.to_string()).collect()
}
impl Default for AdaptiveModel {
fn default() -> Self {
let n_classes = DEFAULT_CLASSES.len();
Self {
class_stats: Vec::new(),
weights: vec![[0.0; N_FEATURES + 1]; N_CLASSES],
weights: vec![vec![0.0; N_FEATURES + 1]; n_classes],
global_mean: [0.0; N_FEATURES],
global_std: [1.0; N_FEATURES],
trained_frames: 0,
training_accuracy: 0.0,
version: 1,
class_names: default_class_names(),
}
}
}
impl AdaptiveModel {
/// Classify a raw feature vector. Returns (class_label, confidence).
pub fn classify(&self, raw_features: &[f64; N_FEATURES]) -> (&'static str, f64) {
if self.weights.is_empty() || self.class_stats.is_empty() {
return ("present_still", 0.5);
pub fn classify(&self, raw_features: &[f64; N_FEATURES]) -> (String, f64) {
let n_classes = self.weights.len();
if n_classes == 0 || self.class_stats.is_empty() {
return ("present_still".to_string(), 0.5);
}
// Normalise features.
@ -163,8 +178,8 @@ impl AdaptiveModel {
}
// Compute logits: w·x + b for each class.
let mut logits = [0.0f64; N_CLASSES];
for c in 0..N_CLASSES.min(self.weights.len()) {
let mut logits: Vec<f64> = vec![0.0; n_classes];
for c in 0..n_classes {
let w = &self.weights[c];
let mut z = w[N_FEATURES]; // bias
for i in 0..N_FEATURES {
@ -176,8 +191,8 @@ impl AdaptiveModel {
// Softmax.
let max_logit = logits.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
let exp_sum: f64 = logits.iter().map(|z| (z - max_logit).exp()).sum();
let mut probs = [0.0f64; N_CLASSES];
for c in 0..N_CLASSES {
let mut probs: Vec<f64> = vec![0.0; n_classes];
for c in 0..n_classes {
probs[c] = ((logits[c] - max_logit).exp()) / exp_sum;
}
@ -185,7 +200,11 @@ impl AdaptiveModel {
let (best_c, best_p) = probs.iter().enumerate()
.max_by(|a, b| a.1.partial_cmp(b.1).unwrap())
.unwrap();
let label = if best_c < CLASSES.len() { CLASSES[best_c] } else { "present_still" };
let label = if best_c < self.class_names.len() {
self.class_names[best_c].clone()
} else {
"present_still".to_string()
};
(label, *best_p)
}
@ -228,40 +247,80 @@ fn load_recording(path: &Path, class_idx: usize) -> Vec<Sample> {
}).collect()
}
/// Map a recording filename to a class index.
fn classify_recording_name(name: &str) -> Option<usize> {
/// Map a recording filename to a class name (String).
/// Returns the discovered class name for the file, or None if it cannot be determined.
fn classify_recording_name(name: &str) -> Option<String> {
let lower = name.to_lowercase();
if lower.contains("empty") || lower.contains("absent") { Some(0) }
else if lower.contains("still") || lower.contains("sitting") || lower.contains("standing") { Some(1) }
else if lower.contains("walking") || lower.contains("moving") { Some(2) }
else if lower.contains("active") || lower.contains("exercise") || lower.contains("running") { Some(3) }
else { None }
// Strip "train_" prefix and ".jsonl" suffix, then extract the class label.
// Convention: train_<class>_<description>.jsonl
// The class is the first segment after "train_" that matches a known pattern,
// or the entire middle portion if no pattern matches.
// Check common patterns first for backward compat
if lower.contains("empty") || lower.contains("absent") { return Some("absent".into()); }
if lower.contains("still") || lower.contains("sitting") || lower.contains("standing") { return Some("present_still".into()); }
if lower.contains("walking") || lower.contains("moving") { return Some("present_moving".into()); }
if lower.contains("active") || lower.contains("exercise") || lower.contains("running") { return Some("active".into()); }
// Fallback: extract class from filename structure train_<class>_*.jsonl
let stem = lower.trim_start_matches("train_").trim_end_matches(".jsonl");
let class_name = stem.split('_').next().unwrap_or(stem);
if !class_name.is_empty() {
Some(class_name.to_string())
} else {
None
}
}
/// Train a model from labeled JSONL recordings in a directory.
///
/// Recordings are matched to classes by filename pattern:
/// - `*empty*` / `*absent*` → absent (0)
/// - `*still*` / `*sitting*` → present_still (1)
/// - `*walking*` / `*moving*` → present_moving (2)
/// - `*active*` / `*exercise*`→ active (3)
/// Recordings are matched to classes by filename pattern. Classes are discovered
/// dynamically from the training data filenames:
/// - `*empty*` / `*absent*` → absent
/// - `*still*` / `*sitting*` → present_still
/// - `*walking*` / `*moving*` → present_moving
/// - `*active*` / `*exercise*`→ active
/// - Any other `train_<class>_*.jsonl` → <class>
pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, String> {
// Scan for train_* files.
let mut samples: Vec<Sample> = Vec::new();
let entries = std::fs::read_dir(recordings_dir)
.map_err(|e| format!("Cannot read {}: {}", recordings_dir.display(), e))?;
// First pass: scan filenames to discover all unique class names.
let entries: Vec<_> = std::fs::read_dir(recordings_dir)
.map_err(|e| format!("Cannot read {}: {}", recordings_dir.display(), e))?
.flatten()
.collect();
for entry in entries.flatten() {
let mut class_map: HashMap<String, usize> = HashMap::new();
let mut class_names: Vec<String> = Vec::new();
// Collect (entry, class_name) pairs for files that match.
let mut file_classes: Vec<(PathBuf, String, String)> = Vec::new(); // (path, fname, class_name)
for entry in &entries {
let fname = entry.file_name().to_string_lossy().to_string();
if !fname.starts_with("train_") || !fname.ends_with(".jsonl") {
continue;
}
if let Some(class_idx) = classify_recording_name(&fname) {
let loaded = load_recording(&entry.path(), class_idx);
eprintln!(" Loaded {}: {} frames → class '{}'",
fname, loaded.len(), CLASSES[class_idx]);
samples.extend(loaded);
if let Some(class_name) = classify_recording_name(&fname) {
if !class_map.contains_key(&class_name) {
let idx = class_names.len();
class_map.insert(class_name.clone(), idx);
class_names.push(class_name.clone());
}
file_classes.push((entry.path(), fname, class_name));
}
}
let n_classes = class_names.len();
if n_classes == 0 {
return Err("No training samples found. Record data with train_* prefix.".into());
}
// Second pass: load recordings with the discovered class indices.
let mut samples: Vec<Sample> = Vec::new();
for (path, fname, class_name) in &file_classes {
let class_idx = class_map[class_name];
let loaded = load_recording(path, class_idx);
eprintln!(" Loaded {}: {} frames → class '{}'",
fname, loaded.len(), class_name);
samples.extend(loaded);
}
if samples.is_empty() {
@ -269,7 +328,7 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, Str
}
let n = samples.len();
eprintln!("Total training samples: {n}");
eprintln!("Total training samples: {n} across {n_classes} classes: {:?}", class_names);
// ── Compute global normalisation stats ──
let mut global_mean = [0.0f64; N_FEATURES];
@ -289,9 +348,9 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, Str
}
// ── Compute per-class statistics ──
let mut class_sums = vec![[0.0f64; N_FEATURES]; N_CLASSES];
let mut class_sq = vec![[0.0f64; N_FEATURES]; N_CLASSES];
let mut class_counts = vec![0usize; N_CLASSES];
let mut class_sums = vec![[0.0f64; N_FEATURES]; n_classes];
let mut class_sq = vec![[0.0f64; N_FEATURES]; n_classes];
let mut class_counts = vec![0usize; n_classes];
for s in &samples {
let c = s.class_idx;
class_counts[c] += 1;
@ -302,7 +361,7 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, Str
}
let mut class_stats = Vec::new();
for c in 0..N_CLASSES {
for c in 0..n_classes {
let cnt = class_counts[c].max(1) as f64;
let mut mean = [0.0; N_FEATURES];
let mut stddev = [0.0; N_FEATURES];
@ -311,7 +370,7 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, Str
stddev[i] = ((class_sq[c][i] / cnt) - mean[i] * mean[i]).max(0.0).sqrt();
}
class_stats.push(ClassStats {
label: CLASSES[c].to_string(),
label: class_names[c].clone(),
count: class_counts[c],
mean,
stddev,
@ -328,7 +387,7 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, Str
}).collect();
// ── Train logistic regression via mini-batch SGD ──
let mut weights = vec![[0.0f64; N_FEATURES + 1]; N_CLASSES];
let mut weights: Vec<Vec<f64>> = vec![vec![0.0f64; N_FEATURES + 1]; n_classes];
let lr = 0.1;
let epochs = 200;
let batch_size = 32;
@ -348,19 +407,19 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, Str
}
let mut epoch_loss = 0.0f64;
let mut batch_count = 0;
let mut _batch_count = 0;
for batch_start in (0..norm_samples.len()).step_by(batch_size) {
let batch_end = (batch_start + batch_size).min(norm_samples.len());
let batch = &norm_samples[batch_start..batch_end];
// Accumulate gradients.
let mut grad = vec![[0.0f64; N_FEATURES + 1]; N_CLASSES];
let mut grad: Vec<Vec<f64>> = vec![vec![0.0f64; N_FEATURES + 1]; n_classes];
for (x, target) in batch {
// Forward: softmax.
let mut logits = [0.0f64; N_CLASSES];
for c in 0..N_CLASSES {
let mut logits: Vec<f64> = vec![0.0; n_classes];
for c in 0..n_classes {
logits[c] = weights[c][N_FEATURES]; // bias
for i in 0..N_FEATURES {
logits[c] += weights[c][i] * x[i];
@ -368,8 +427,8 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, Str
}
let max_l = logits.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
let exp_sum: f64 = logits.iter().map(|z| (z - max_l).exp()).sum();
let mut probs = [0.0f64; N_CLASSES];
for c in 0..N_CLASSES {
let mut probs: Vec<f64> = vec![0.0; n_classes];
for c in 0..n_classes {
probs[c] = ((logits[c] - max_l).exp()) / exp_sum;
}
@ -377,7 +436,7 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, Str
epoch_loss += -(probs[*target].max(1e-15)).ln();
// Gradient: prob - one_hot(target).
for c in 0..N_CLASSES {
for c in 0..n_classes {
let delta = probs[c] - if c == *target { 1.0 } else { 0.0 };
for i in 0..N_FEATURES {
grad[c][i] += delta * x[i];
@ -389,12 +448,12 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, Str
// Update weights.
let bs = batch.len() as f64;
let current_lr = lr * (1.0 - epoch as f64 / epochs as f64); // linear decay
for c in 0..N_CLASSES {
for c in 0..n_classes {
for i in 0..=N_FEATURES {
weights[c][i] -= current_lr * grad[c][i] / bs;
}
}
batch_count += 1;
_batch_count += 1;
}
if epoch % 50 == 0 || epoch == epochs - 1 {
@ -406,8 +465,8 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, Str
// ── Evaluate accuracy ──
let mut correct = 0;
for (x, target) in &norm_samples {
let mut logits = [0.0f64; N_CLASSES];
for c in 0..N_CLASSES {
let mut logits: Vec<f64> = vec![0.0; n_classes];
for c in 0..n_classes {
logits[c] = weights[c][N_FEATURES];
for i in 0..N_FEATURES {
logits[c] += weights[c][i] * x[i];
@ -422,12 +481,12 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, Str
eprintln!("Training accuracy: {correct}/{n} = {accuracy:.1}%");
// ── Per-class accuracy ──
let mut class_correct = vec![0usize; N_CLASSES];
let mut class_total = vec![0usize; N_CLASSES];
let mut class_correct = vec![0usize; n_classes];
let mut class_total = vec![0usize; n_classes];
for (x, target) in &norm_samples {
class_total[*target] += 1;
let mut logits = [0.0f64; N_CLASSES];
for c in 0..N_CLASSES {
let mut logits: Vec<f64> = vec![0.0; n_classes];
for c in 0..n_classes {
logits[c] = weights[c][N_FEATURES];
for i in 0..N_FEATURES {
logits[c] += weights[c][i] * x[i];
@ -438,9 +497,9 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, Str
.unwrap().0;
if pred == *target { class_correct[*target] += 1; }
}
for c in 0..N_CLASSES {
for c in 0..n_classes {
let tot = class_total[c].max(1);
eprintln!(" {}: {}/{} ({:.0}%)", CLASSES[c], class_correct[c], tot,
eprintln!(" {}: {}/{} ({:.0}%)", class_names[c], class_correct[c], tot,
class_correct[c] as f64 / tot as f64 * 100.0);
}
@ -452,6 +511,7 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, Str
trained_frames: n,
training_accuracy: accuracy,
version: 1,
class_names,
})
}

View file

@ -0,0 +1,161 @@
//! Bridge between sensing-server frame data and signal crate FieldModel
//! for eigenvalue-based person counting.
//!
//! The FieldModel decomposes CSI observations into environmental drift and
//! body perturbation via SVD eigenmodes. When calibrated, perturbation energy
//! provides a physics-grounded occupancy estimate that supplements the
//! score-based heuristic in `score_to_person_count`.
use std::collections::VecDeque;
use wifi_densepose_signal::ruvsense::field_model::{CalibrationStatus, FieldModel, FieldModelConfig};
use super::score_to_person_count;
/// Number of recent frames to feed into perturbation extraction.
const OCCUPANCY_WINDOW: usize = 50;
/// Perturbation energy threshold for detecting a second person.
const ENERGY_THRESH_2: f64 = 12.0;
/// Perturbation energy threshold for detecting a third person.
const ENERGY_THRESH_3: f64 = 25.0;
/// Build a `FieldModelConfig` for single-link operation (one ESP32 node
/// equals one link).
///
/// Keeping `n_links` at 1 prevents the DimensionMismatch error that would
/// otherwise occur when single-frame observations are fed to the model.
pub fn single_link_config() -> FieldModelConfig {
    let mut config = FieldModelConfig::default();
    config.n_links = 1;
    config
}
/// Estimate how many people are present, preferring the calibrated
/// FieldModel and degrading gracefully to the score heuristic.
///
/// Order of preference:
/// 1. `estimate_occupancy()` — eigenvalue-based, best accuracy — when the
///    calibration is usable and frames are available.
/// 2. Perturbation-energy thresholds on the newest frame.
/// 3. The `score_to_person_count` heuristic (also used when uncalibrated).
pub fn occupancy_or_fallback(
    field: &FieldModel,
    frame_history: &VecDeque<Vec<f64>>,
    smoothed_score: f64,
    prev_count: usize,
) -> usize {
    // Shared fallback for every bail-out path below.
    let heuristic = || score_to_person_count(smoothed_score, prev_count);

    // Only a Fresh or Stale calibration is trustworthy enough to use.
    if !matches!(
        field.status(),
        CalibrationStatus::Fresh | CalibrationStatus::Stale
    ) {
        return heuristic();
    }

    // Most recent OCCUPANCY_WINDOW frames, newest first.
    let frames: Vec<Vec<f64>> = frame_history
        .iter()
        .rev()
        .take(OCCUPANCY_WINDOW)
        .cloned()
        .collect();
    if frames.is_empty() {
        return heuristic();
    }

    // Best case: eigenvalue-based occupancy estimate.
    if let Ok(count) = field.estimate_occupancy(&frames) {
        return count;
    }

    // Fallback: threshold the perturbation energy of the newest frame.
    // FieldModel expects [n_links][n_subcarriers] — this deployment uses n_links=1.
    let observation = vec![frames[0].clone()];
    match field.extract_perturbation(&observation) {
        Ok(perturbation) => {
            let energy = perturbation.total_energy;
            if energy > ENERGY_THRESH_3 {
                3
            } else if energy > ENERGY_THRESH_2 {
                2
            } else if energy > 1.0 {
                1
            } else {
                0
            }
        }
        Err(_) => heuristic(),
    }
}
/// Feed the newest frame to the FieldModel while calibration is collecting.
///
/// Does nothing unless the model status is `Collecting` and the history has
/// at least one frame. The frame is wrapped as a single-link observation
/// (`[1][n_subcarriers]`) before feeding; feed errors are logged at debug
/// level rather than propagated.
pub fn maybe_feed_calibration(field: &mut FieldModel, frame_history: &VecDeque<Vec<f64>>) {
    let collecting = field.status() == CalibrationStatus::Collecting;
    if let (true, Some(latest)) = (collecting, frame_history.back()) {
        // Single-link observation shape: [1][n_subcarriers].
        let observations = vec![latest.clone()];
        if let Err(e) = field.feed_calibration(&observations) {
            tracing::debug!("FieldModel calibration feed: {e}");
        }
    }
}
/// Parse node positions from a semicolon-delimited string.
///
/// Format: `"x,y,z;x,y,z;..."` where each coordinate is an `f32`.
/// Whitespace around coordinates is tolerated (`"0, 0, 1.5"` parses), and
/// empty segments — e.g. from a trailing `;` — are skipped silently.
/// Genuinely malformed entries are skipped with a warning log.
pub fn parse_node_positions(input: &str) -> Vec<[f32; 3]> {
    if input.is_empty() {
        return Vec::new();
    }
    input
        .split(';')
        .enumerate()
        .filter_map(|(idx, triplet)| {
            // A trailing semicolon produces one empty segment; that is not
            // worth a warning, so drop it quietly.
            if triplet.trim().is_empty() {
                return None;
            }
            // Trim each coordinate so "0, 0, 1.5" is accepted as well.
            let parts: Vec<&str> = triplet.split(',').map(str::trim).collect();
            if parts.len() != 3 {
                tracing::warn!("Skipping malformed node position entry {idx}: '{triplet}' (expected x,y,z)");
                return None;
            }
            match (parts[0].parse::<f32>(), parts[1].parse::<f32>(), parts[2].parse::<f32>()) {
                (Ok(x), Ok(y), Ok(z)) => Some([x, y, z]),
                _ => {
                    tracing::warn!("Skipping unparseable node position entry {idx}: '{triplet}'");
                    None
                }
            }
        })
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_node_positions() {
        // Three well-formed triplets parse in order.
        let positions = parse_node_positions("0,0,1.5;3,0,1.5;1.5,3,1.5");
        assert_eq!(
            positions,
            vec![[0.0, 0.0, 1.5], [3.0, 0.0, 1.5], [1.5, 3.0, 1.5]]
        );
    }

    #[test]
    fn test_parse_node_positions_empty() {
        // An empty input string yields no positions at all.
        assert_eq!(parse_node_positions(""), Vec::<[f32; 3]>::new());
    }

    #[test]
    fn test_parse_node_positions_invalid() {
        // Non-numeric entries are dropped; the valid triplet survives.
        assert_eq!(parse_node_positions("abc;1,2,3"), vec![[1.0, 2.0, 3.0]]);
    }

    #[test]
    fn test_parse_node_positions_partial_triplet() {
        // Entries with fewer than three coordinates are dropped.
        assert_eq!(parse_node_positions("1,2;3,4,5"), vec![[3.0, 4.0, 5.0]]);
    }
}

View file

@ -9,8 +9,11 @@
//! Replaces both ws_server.py and the Python HTTP server.
mod adaptive_classifier;
mod field_bridge;
mod multistatic_bridge;
mod rvf_container;
mod rvf_pipeline;
mod tracker_bridge;
mod vital_signs;
// Training pipeline modules (exposed via lib.rs)
@ -53,6 +56,11 @@ use wifi_densepose_wifiscan::{
};
use wifi_densepose_wifiscan::parse_netsh_output as parse_netsh_bssid_output;
// Accuracy sprint: Kalman tracker, multistatic fusion, field model
use wifi_densepose_signal::ruvsense::pose_tracker::PoseTracker;
use wifi_densepose_signal::ruvsense::multistatic::{MultistaticFuser, MultistaticConfig};
use wifi_densepose_signal::ruvsense::field_model::{FieldModel, CalibrationStatus};
// ── CLI ──────────────────────────────────────────────────────────────────────
#[derive(Parser, Debug)]
@ -145,6 +153,14 @@ struct Args {
/// Build fingerprint index from embeddings (env|activity|temporal|person)
#[arg(long, value_name = "TYPE")]
build_index: Option<String>,
/// Node positions for multistatic fusion (format: "x,y,z;x,y,z;...")
#[arg(long, env = "SENSING_NODE_POSITIONS")]
node_positions: Option<String>,
/// Start field model calibration on boot (empty room required)
#[arg(long)]
calibrate: bool,
}
// ── Data types ───────────────────────────────────────────────────────────────
@ -213,6 +229,9 @@ struct SensingUpdate {
/// Estimated person count from CSI feature heuristics (1-3 for single ESP32).
#[serde(skip_serializing_if = "Option::is_none")]
estimated_persons: Option<usize>,
/// Per-node feature breakdown for multi-node deployments.
#[serde(skip_serializing_if = "Option::is_none")]
node_features: Option<Vec<PerNodeFeatureInfo>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -280,9 +299,9 @@ struct BoundingBox {
/// Each ESP32 node gets its own frame history, smoothing buffers, and vital
/// sign detector so that data from different nodes is never mixed.
struct NodeState {
frame_history: VecDeque<Vec<f64>>,
pub(crate) frame_history: VecDeque<Vec<f64>>,
smoothed_person_score: f64,
prev_person_count: usize,
pub(crate) prev_person_count: usize,
smoothed_motion: f64,
current_motion_level: String,
debounce_counter: u32,
@ -298,7 +317,7 @@ struct NodeState {
rssi_history: VecDeque<f64>,
vital_detector: VitalSignDetector,
latest_vitals: VitalSigns,
last_frame_time: Option<std::time::Instant>,
pub(crate) last_frame_time: Option<std::time::Instant>,
edge_vitals: Option<Esp32VitalsPacket>,
/// Latest extracted features for cross-node fusion.
latest_features: Option<FeatureInfo>,
@ -325,7 +344,7 @@ const MAX_BONE_CHANGE_RATIO: f64 = 0.20;
const COHERENCE_WINDOW: usize = 20;
impl NodeState {
fn new() -> Self {
pub(crate) fn new() -> Self {
Self {
frame_history: VecDeque::new(),
smoothed_person_score: 0.0,
@ -389,6 +408,18 @@ impl NodeState {
}
}
/// Per-node feature info for WebSocket broadcasts (multi-node support).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PerNodeFeatureInfo {
node_id: u8,
features: FeatureInfo,
classification: ClassificationInfo,
rssi_dbm: f64,
last_seen_ms: u64,
frame_rate_hz: f64,
stale: bool,
}
/// Shared application state
struct AppStateInner {
latest_update: Option<SensingUpdate>,
@ -482,6 +513,15 @@ struct AppStateInner {
/// Per-node sensing state for multi-node deployments.
/// Keyed by `node_id` from the ESP32 frame header.
node_states: HashMap<u8, NodeState>,
// ── Accuracy sprint: Kalman tracker, multistatic fusion, eigenvalue counting ──
/// Global Kalman-based pose tracker for stable person IDs and smoothed keypoints.
pose_tracker: PoseTracker,
/// Instant of last tracker update (for computing dt).
last_tracker_instant: Option<std::time::Instant>,
/// Attention-weighted multi-node CSI fusion engine.
multistatic_fuser: MultistaticFuser,
/// SVD-based room field model for eigenvalue person counting (None until calibration).
field_model: Option<FieldModel>,
}
/// If no ESP32 frame arrives within this duration, source reverts to offline.
@ -491,6 +531,31 @@ impl AppStateInner {
/// Return the effective data source, accounting for ESP32 frame timeout.
/// If the source is "esp32" but no frame has arrived in 5 seconds, returns
/// "esp32:offline" so the UI can distinguish active vs stale connections.
/// Person count: eigenvalue-based if field model is calibrated, else heuristic.
/// Uses global frame_history if populated, otherwise the freshest per-node history.
fn person_count(&self) -> usize {
match self.field_model.as_ref() {
Some(fm) => {
// Prefer global frame_history (populated by wifi/simulate paths).
// Fall back to freshest per-node history (populated by ESP32 paths).
let history = if !self.frame_history.is_empty() {
&self.frame_history
} else {
// Find the node with the most recent frame
self.node_states.values()
.filter(|ns| !ns.frame_history.is_empty())
.max_by_key(|ns| ns.last_frame_time)
.map(|ns| &ns.frame_history)
.unwrap_or(&self.frame_history)
};
field_bridge::occupancy_or_fallback(
fm, history, self.smoothed_person_score, self.prev_person_count,
)
}
None => score_to_person_count(self.smoothed_person_score, self.prev_person_count),
}
}
fn effective_source(&self) -> String {
if self.source == "esp32" {
if let Some(last) = self.last_esp32_frame {
@ -639,12 +704,13 @@ fn parse_esp32_frame(buf: &[u8]) -> Option<Esp32Frame> {
// [20..] I/Q data
let node_id = buf[4];
let n_antennas = buf[5];
let n_subcarriers_u16 = u16::from_le_bytes([buf[6], buf[7]]);
let n_subcarriers = n_subcarriers_u16 as u8; // truncate to u8 for Esp32Frame compat
let freq_mhz = u16::from_le_bytes([buf[8], buf[9]]); // low 16 bits of u32
let sequence = u32::from_le_bytes([buf[12], buf[13], buf[14], buf[15]]);
let rssi = buf[16] as i8; // #332: was buf[14], 2 bytes off
let noise_floor = buf[17] as i8; // #332: was buf[15], 2 bytes off
let n_subcarriers = buf[6];
let freq_mhz = u16::from_le_bytes([buf[8], buf[9]]);
let sequence = u32::from_le_bytes([buf[10], buf[11], buf[12], buf[13]]);
let rssi_raw = buf[14] as i8;
// Fix RSSI sign: ensure it's always negative (dBm convention).
let rssi = if rssi_raw > 0 { rssi_raw.saturating_neg() } else { rssi_raw };
let noise_floor = buf[15] as i8;
let iq_start = 20;
let n_pairs = n_antennas as usize * n_subcarriers as usize;
@ -1546,7 +1612,7 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) {
let raw_score = compute_person_score(&features);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count);
let count = s.person_count();
s.prev_person_count = count;
count
} else {
@ -1583,12 +1649,16 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) {
model_status: None,
persons: None,
estimated_persons: if est_persons > 0 { Some(est_persons) } else { None },
node_features: None,
};
// Populate persons from the sensing update.
let persons = derive_pose_from_sensing(&update);
if !persons.is_empty() {
update.persons = Some(persons);
// Populate persons from the sensing update (Kalman-smoothed via tracker).
let raw_persons = derive_pose_from_sensing(&update);
let tracked = tracker_bridge::tracker_update(
&mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons,
);
if !tracked.is_empty() {
update.persons = Some(tracked);
}
if let Ok(json) = serde_json::to_string(&update) {
@ -1679,7 +1749,7 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) {
let raw_score = compute_person_score(&features);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count);
let count = s.person_count();
s.prev_person_count = count;
count
} else {
@ -1716,11 +1786,15 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) {
model_status: None,
persons: None,
estimated_persons: if est_persons > 0 { Some(est_persons) } else { None },
node_features: None,
};
let persons = derive_pose_from_sensing(&update);
if !persons.is_empty() {
update.persons = Some(persons);
let raw_persons = derive_pose_from_sensing(&update);
let tracked = tracker_bridge::tracker_update(
&mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons,
);
if !tracked.is_empty() {
update.persons = Some(tracked);
}
if let Ok(json) = serde_json::to_string(&update) {
@ -1897,9 +1971,13 @@ async fn handle_ws_pose_client(mut socket: WebSocket, state: SharedState) {
keypoints,
zone: "zone_1".into(),
}]
}).unwrap_or_else(|| derive_pose_from_sensing(&sensing))
}).unwrap_or_else(|| {
// Prefer tracked persons from broadcast if available
sensing.persons.clone().unwrap_or_else(|| derive_pose_from_sensing(&sensing))
})
} else {
derive_pose_from_sensing(&sensing)
// Prefer tracked persons from broadcast if available
sensing.persons.clone().unwrap_or_else(|| derive_pose_from_sensing(&sensing))
};
let pose_msg = serde_json::json!({
@ -2598,7 +2676,7 @@ async fn api_info(State(state): State<SharedState>) -> Json<serde_json::Value> {
async fn pose_current(State(state): State<SharedState>) -> Json<serde_json::Value> {
let s = state.read().await;
let persons = match &s.latest_update {
Some(update) => derive_pose_from_sensing(update),
Some(update) => update.persons.clone().unwrap_or_else(|| derive_pose_from_sensing(update)),
None => vec![],
};
Json(serde_json::json!({
@ -3149,6 +3227,88 @@ async fn adaptive_unload(State(state): State<SharedState>) -> Json<serde_json::V
Json(serde_json::json!({ "success": true, "message": "Adaptive model unloaded." }))
}
// ── Field model calibration endpoints (eigenvalue person counting) ──────────
async fn calibration_start(State(state): State<SharedState>) -> Json<serde_json::Value> {
let mut s = state.write().await;
// Guard: don't discard an in-progress or fresh calibration
if let Some(ref fm) = s.field_model {
match fm.status() {
CalibrationStatus::Collecting => {
return Json(serde_json::json!({
"success": false,
"error": "Calibration already in progress. Call /calibration/stop first.",
"frame_count": fm.calibration_frame_count(),
}));
}
CalibrationStatus::Fresh => {
return Json(serde_json::json!({
"success": false,
"error": "A fresh calibration already exists. Call /calibration/stop or wait for expiry.",
}));
}
_ => {} // Stale/Expired/Uncalibrated — ok to recalibrate
}
}
match FieldModel::new(field_bridge::single_link_config()) {
Ok(fm) => {
s.field_model = Some(fm);
Json(serde_json::json!({
"success": true,
"message": "Calibration started — keep room empty while frames accumulate.",
}))
}
Err(e) => Json(serde_json::json!({
"success": false,
"error": format!("{e}"),
})),
}
}
async fn calibration_stop(State(state): State<SharedState>) -> Json<serde_json::Value> {
let mut s = state.write().await;
if let Some(ref mut fm) = s.field_model {
let ts = chrono::Utc::now().timestamp_micros() as u64;
match fm.finalize_calibration(ts, 0) {
Ok(modes) => {
let baseline = modes.baseline_eigenvalue_count;
let variance_explained = modes.variance_explained;
info!("Field model calibrated: baseline_eigenvalues={baseline}, variance_explained={variance_explained:.2}");
Json(serde_json::json!({
"success": true,
"baseline_eigenvalue_count": baseline,
"variance_explained": variance_explained,
"frame_count": fm.calibration_frame_count(),
}))
}
Err(e) => Json(serde_json::json!({
"success": false,
"error": format!("{e}"),
})),
}
} else {
Json(serde_json::json!({
"success": false,
"error": "No field model active — call /calibration/start first.",
}))
}
}
/// GET /api/v1/calibration/status — report whether a field model exists and
/// how far its calibration has progressed.
async fn calibration_status(State(state): State<SharedState>) -> Json<serde_json::Value> {
    let s = state.read().await;
    let body = if let Some(fm) = s.field_model.as_ref() {
        serde_json::json!({
            "active": true,
            // Debug-formatted enum variant, e.g. "Collecting" / "Fresh".
            "status": format!("{:?}", fm.status()),
            "frame_count": fm.calibration_frame_count(),
        })
    } else {
        serde_json::json!({
            "active": false,
            "status": "none",
        })
    };
    Json(body)
}
/// Generate a simple timestamp string (epoch seconds) for recording IDs.
fn chrono_timestamp() -> u64 {
std::time::SystemTime::now()
@ -3295,6 +3455,34 @@ async fn sona_activate(
}
}
/// GET /api/v1/nodes — per-node health and feature info.
///
/// Reports, for every known node, how long ago it last sent a frame, a
/// stale/active flag, the most recent RSSI sample, the current motion level
/// and the last per-node person count.
async fn nodes_endpoint(State(state): State<SharedState>) -> Json<serde_json::Value> {
    let s = state.read().await;
    let now = std::time::Instant::now();
    let mut nodes: Vec<serde_json::Value> = Vec::with_capacity(s.node_states.len());
    for (&id, ns) in s.node_states.iter() {
        // Sentinel age for nodes that have never reported; anything quieter
        // than 5 s is flagged as stale.
        let elapsed_ms = match ns.last_frame_time {
            Some(t) => now.duration_since(t).as_millis() as u64,
            None => 999999,
        };
        let status = if elapsed_ms > 5000 { "stale" } else { "active" };
        // Latest RSSI sample, with a floor value when no history exists.
        let rssi = ns.rssi_history.back().copied().unwrap_or(-90.0);
        nodes.push(serde_json::json!({
            "node_id": id,
            "status": status,
            "last_seen_ms": elapsed_ms,
            "rssi_dbm": rssi,
            "motion_level": &ns.current_motion_level,
            "person_count": ns.prev_person_count,
        }));
    }
    Json(serde_json::json!({
        "nodes": nodes,
        "total": nodes.len(),
    }))
}
async fn info_page() -> Html<String> {
Html(format!(
"<html><body>\
@ -3386,15 +3574,33 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
else if vitals.presence { 0.3 }
else { 0.05 };
// Aggregate person count across all active nodes.
// Use max (not sum) because nodes in the same room see the
// same people — summing would double-count.
// Aggregate person count: gate on presence first (matching WiFi path).
let now = std::time::Instant::now();
let total_persons: usize = s.node_states.values()
.filter(|n| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
.map(|n| n.prev_person_count)
.max()
.unwrap_or(0);
let total_persons = if vitals.presence {
let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback(
&s.multistatic_fuser, &s.node_states,
);
match fused {
Some(ref f) => {
let score = multistatic_bridge::compute_person_score_from_amplitudes(&f.fused_amplitude);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + score * 0.10;
let count = s.person_count();
s.prev_person_count = count;
count.max(1) // presence=true => at least 1
}
None => fallback_count.unwrap_or(0).max(1),
}
} else {
s.prev_person_count = 0;
0
};
// Feed field model calibration if active (use per-node history for ESP32).
if let Some(ref mut fm) = s.field_model {
if let Some(ns) = s.node_states.get(&node_id) {
field_bridge::maybe_feed_calibration(fm, &ns.frame_history);
}
}
// Build nodes array with all active nodes.
let active_nodes: Vec<NodeInfo> = s.node_states.iter()
@ -3471,17 +3677,15 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
model_status: None,
persons: None,
estimated_persons: if total_persons > 0 { Some(total_persons) } else { None },
node_features: None,
};
let mut persons = derive_pose_from_sensing(&update);
// RuVector Phase 2: temporal smoothing + coherence gating
{
let ns = s.node_states.entry(node_id).or_insert_with(NodeState::new);
ns.update_coherence(vitals.motion_energy as f64);
apply_temporal_smoothing(&mut persons, ns);
}
if !persons.is_empty() {
update.persons = Some(persons);
let raw_persons = derive_pose_from_sensing(&update);
let tracked = tracker_bridge::tracker_update(
&mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons,
);
if !tracked.is_empty() {
update.persons = Some(tracked);
}
if let Ok(json) = serde_json::to_string(&update) {
@ -3618,23 +3822,32 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
else if classification.motion_level == "present_still" { 0.3 }
else { 0.05 };
// Aggregate person count across all active nodes.
// Use max (not sum) because nodes in the same room see the
// same people — summing would double-count.
// Aggregate person count: gate on presence first (matching WiFi path).
let now = std::time::Instant::now();
let total_persons: usize = s.node_states.values()
.filter(|n| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
.map(|n| n.prev_person_count)
.max()
.unwrap_or(0);
let total_persons = if classification.presence {
let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback(
&s.multistatic_fuser, &s.node_states,
);
match fused {
Some(ref f) => {
let score = multistatic_bridge::compute_person_score_from_amplitudes(&f.fused_amplitude);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + score * 0.10;
let count = s.person_count();
s.prev_person_count = count;
count.max(1)
}
None => fallback_count.unwrap_or(0).max(1),
}
} else {
s.prev_person_count = 0;
0
};
// Boost classification confidence with multi-node coverage.
let n_active = s.node_states.values()
.filter(|ns| ns.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
.count();
if n_active > 1 {
classification.confidence = (classification.confidence
* (1.0 + 0.15 * (n_active as f64 - 1.0))).clamp(0.0, 1.0);
// Feed field model calibration if active (use per-node history for ESP32).
if let Some(ref mut fm) = s.field_model {
if let Some(ns) = s.node_states.get(&node_id) {
field_bridge::maybe_feed_calibration(fm, &ns.frame_history);
}
}
// Build nodes array with all active nodes.
@ -3674,17 +3887,15 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
model_status: None,
persons: None,
estimated_persons: if total_persons > 0 { Some(total_persons) } else { None },
node_features: None,
};
let mut persons = derive_pose_from_sensing(&update);
// RuVector Phase 2: temporal smoothing + coherence gating
{
let ns = s.node_states.entry(node_id).or_insert_with(NodeState::new);
ns.update_coherence(features.motion_band_power);
apply_temporal_smoothing(&mut persons, ns);
}
if !persons.is_empty() {
update.persons = Some(persons);
let raw_persons = derive_pose_from_sensing(&update);
let tracked = tracker_bridge::tracker_update(
&mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons,
);
if !tracked.is_empty() {
update.persons = Some(tracked);
}
if let Ok(json) = serde_json::to_string(&update) {
@ -3764,7 +3975,7 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) {
let raw_score = compute_person_score(&features);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count);
let count = s.person_count();
s.prev_person_count = count;
count
} else {
@ -3811,12 +4022,16 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) {
},
persons: None,
estimated_persons: if est_persons > 0 { Some(est_persons) } else { None },
node_features: None,
};
// Populate persons from the sensing update.
let persons = derive_pose_from_sensing(&update);
if !persons.is_empty() {
update.persons = Some(persons);
// Populate persons from the sensing update (Kalman-smoothed via tracker).
let raw_persons = derive_pose_from_sensing(&update);
let tracked = tracker_bridge::tracker_update(
&mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons,
);
if !tracked.is_empty() {
update.persons = Some(tracked);
}
if update.classification.presence {
@ -4445,6 +4660,29 @@ async fn main() {
m
}),
node_states: HashMap::new(),
// Accuracy sprint
pose_tracker: PoseTracker::new(),
last_tracker_instant: None,
multistatic_fuser: {
let mut fuser = MultistaticFuser::with_config(MultistaticConfig {
min_nodes: 1, // single-node passthrough
..Default::default()
});
if let Some(ref pos_str) = args.node_positions {
let positions = field_bridge::parse_node_positions(pos_str);
if !positions.is_empty() {
info!("Configured {} node positions for multistatic fusion", positions.len());
fuser.set_node_positions(positions);
}
}
fuser
},
field_model: if args.calibrate {
info!("Field model calibration enabled — room should be empty during startup");
FieldModel::new(field_bridge::single_link_config()).ok()
} else {
None
},
}));
// Start background tasks based on source
@ -4498,6 +4736,8 @@ async fn main() {
.route("/api/v1/metrics", get(health_metrics))
// Sensing endpoints
.route("/api/v1/sensing/latest", get(latest))
// Per-node health endpoint
.route("/api/v1/nodes", get(nodes_endpoint))
// Vital sign endpoints
.route("/api/v1/vital-signs", get(vital_signs_endpoint))
.route("/api/v1/edge-vitals", get(edge_vitals_endpoint))
@ -4539,6 +4779,10 @@ async fn main() {
.route("/api/v1/adaptive/train", post(adaptive_train))
.route("/api/v1/adaptive/status", get(adaptive_status))
.route("/api/v1/adaptive/unload", post(adaptive_unload))
// Field model calibration (eigenvalue-based person counting)
.route("/api/v1/calibration/start", post(calibration_start))
.route("/api/v1/calibration/stop", post(calibration_stop))
.route("/api/v1/calibration/status", get(calibration_status))
// Static UI files
.nest_service("/ui", ServeDir::new(&ui_path))
.layer(SetResponseHeaderLayer::overriding(

View file

@ -0,0 +1,264 @@
//! Bridge between sensing-server per-node state and the signal crate's
//! `MultistaticFuser` for attention-weighted CSI fusion across ESP32 nodes.
//!
//! This module converts the server's `NodeState` (f64 amplitude history) into
//! `MultiBandCsiFrame`s that the multistatic fusion pipeline expects, then
//! drives `MultistaticFuser::fuse` with a graceful fallback when fusion fails
//! (e.g. insufficient nodes or timestamp spread).
use std::collections::HashMap;
use std::sync::LazyLock;
use std::time::{Duration, Instant};
use wifi_densepose_signal::hardware_norm::{CanonicalCsiFrame, HardwareType};
use wifi_densepose_signal::ruvsense::multiband::MultiBandCsiFrame;
use wifi_densepose_signal::ruvsense::multistatic::{FusedSensingFrame, MultistaticFuser};
use super::NodeState;
/// Maximum age for a node frame to be considered active (10 seconds).
const STALE_THRESHOLD: Duration = Duration::from_secs(10);
/// Default WiFi channel frequency (MHz) used for single-channel frames.
const DEFAULT_FREQ_MHZ: u32 = 2437; // Channel 6
/// Monotonic reference point for timestamp generation. All node timestamps
/// are relative to this instant, avoiding wall-clock/monotonic mixing issues.
static EPOCH: LazyLock<Instant> = LazyLock::new(Instant::now);
/// Convert a single `NodeState` into a `MultiBandCsiFrame` suitable for
/// multistatic fusion.
///
/// Returns `None` when the node has no frame history, an empty latest frame,
/// or no recorded `last_frame_time`.
pub fn node_frame_from_state(node_id: u8, ns: &NodeState) -> Option<MultiBandCsiFrame> {
    let last_time = ns.last_frame_time.as_ref()?;
    let latest = ns.frame_history.back()?;
    if latest.is_empty() {
        return None;
    }
    // Server stores f64 amplitudes; the canonical frame wants f32. Phase is
    // not available here, so it is zero-filled to the same length.
    let amplitude: Vec<f32> = latest.iter().map(|&v| v as f32).collect();
    let phase = vec![0.0_f32; amplitude.len()];
    // Monotonic timestamp: microseconds since a shared process-local epoch.
    // All nodes use the same reference so the fuser's guard_interval_us check
    // compares apples to apples. No wall-clock mixing (immune to NTP jumps).
    let timestamp_us = last_time.duration_since(*EPOCH).as_micros() as u64;
    Some(MultiBandCsiFrame {
        node_id,
        timestamp_us,
        channel_frames: vec![CanonicalCsiFrame {
            amplitude,
            phase,
            hardware_type: HardwareType::Esp32S3,
        }],
        frequencies_mhz: vec![DEFAULT_FREQ_MHZ],
        coherence: 1.0, // single-channel, perfect self-coherence
    })
}
/// Collect `MultiBandCsiFrame`s from all active nodes.
///
/// A node counts as active when its `last_frame_time` lies within
/// [`STALE_THRESHOLD`] of now; nodes with no timestamp or no usable
/// history are skipped.
pub fn node_frames_from_states(node_states: &HashMap<u8, NodeState>) -> Vec<MultiBandCsiFrame> {
    let now = Instant::now();
    node_states
        .iter()
        // Drop nodes that never reported or whose last frame is too old.
        .filter(|(_, ns)| {
            ns.last_frame_time
                .map_or(false, |t| now.duration_since(t) <= STALE_THRESHOLD)
        })
        // Conversion itself can still decline (empty history) — skip those too.
        .filter_map(|(&node_id, ns)| node_frame_from_state(node_id, ns))
        .collect()
}
/// Attempt multistatic fusion; fall back to max per-node person count on failure.
///
/// Returns `(fused_frame, fallback_person_count)`. When fusion succeeds,
/// `fallback_person_count` is `None` — the caller must compute count from
/// the fused amplitudes. On failure, returns the maximum per-node count
/// (not the sum, to avoid double-counting overlapping coverage).
pub fn fuse_or_fallback(
    fuser: &MultistaticFuser,
    node_states: &HashMap<u8, NodeState>,
) -> (Option<FusedSensingFrame>, Option<usize>) {
    let frames = node_frames_from_states(node_states);
    if frames.is_empty() {
        return (None, Some(0));
    }
    // Happy path: hand the fused frame back; person count is the caller's job.
    let e = match fuser.fuse(&frames) {
        Ok(fused) => return (Some(fused), None),
        Err(e) => e,
    };
    tracing::debug!("Multistatic fusion failed ({e}), using per-node max fallback");
    // Use max (not sum) to avoid double-counting when nodes have overlapping coverage.
    let max_count = node_states
        .values()
        .filter(|ns| {
            ns.last_frame_time
                .map(|t| t.elapsed() <= STALE_THRESHOLD)
                .unwrap_or(false)
        })
        .map(|ns| ns.prev_person_count)
        .max()
        .unwrap_or(0);
    (None, Some(max_count))
}
/// Compute a person-presence score from fused amplitude data.
///
/// Uses the squared coefficient of variation (variance / mean^2) as a
/// lightweight proxy for body-induced CSI perturbation. A flat amplitude
/// vector (no person) yields a score near zero; a vector with high variance
/// relative to its mean (person moving) yields a score approaching 1.0.
/// An empty slice scores exactly 0.0.
pub fn compute_person_score_from_amplitudes(amplitudes: &[f32]) -> f64 {
    if amplitudes.is_empty() {
        return 0.0;
    }
    // Promote once to f64 so mean/variance accumulate in double precision.
    let values: Vec<f64> = amplitudes.iter().map(|&a| f64::from(a)).collect();
    let n = values.len() as f64;
    let mean = values.iter().sum::<f64>() / n;
    let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / n;
    // Epsilon in the denominator keeps a near-zero mean from dividing by 0;
    // the clamp bounds the pathological high-variance/low-mean case.
    (variance / (mean * mean + 1e-10)).clamp(0.0, 1.0)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;
    /// Helper: build a minimal NodeState for testing. Uses `NodeState::new()`
    /// then mutates the `pub(crate)` fields the bridge needs.
    fn make_node_state(
        frame_history: VecDeque<Vec<f64>>,
        last_frame_time: Option<Instant>,
        prev_person_count: usize,
    ) -> NodeState {
        let mut ns = NodeState::new();
        ns.frame_history = frame_history;
        ns.last_frame_time = last_frame_time;
        ns.prev_person_count = prev_person_count;
        ns
    }
    // Empty history -> no frame can be produced, even with a timestamp.
    #[test]
    fn test_node_frame_from_empty_state() {
        let ns = make_node_state(VecDeque::new(), Some(Instant::now()), 0);
        assert!(node_frame_from_state(1, &ns).is_none());
    }
    // Missing last_frame_time -> no frame, even with history present.
    #[test]
    fn test_node_frame_from_state_no_time() {
        let mut history = VecDeque::new();
        history.push_back(vec![1.0, 2.0, 3.0]);
        let ns = make_node_state(history, None, 0);
        assert!(node_frame_from_state(1, &ns).is_none());
    }
    // Happy path: amplitudes copied f64->f32, phase zero-filled, ESP32-S3 tag.
    #[test]
    fn test_node_frame_conversion() {
        let mut history = VecDeque::new();
        history.push_back(vec![10.0, 20.0, 30.5]);
        let ns = make_node_state(history, Some(Instant::now()), 0);
        let frame = node_frame_from_state(42, &ns).expect("should produce a frame");
        assert_eq!(frame.node_id, 42);
        assert_eq!(frame.channel_frames.len(), 1);
        let ch = &frame.channel_frames[0];
        assert_eq!(ch.amplitude.len(), 3);
        assert!((ch.amplitude[0] - 10.0_f32).abs() < f32::EPSILON);
        assert!((ch.amplitude[1] - 20.0_f32).abs() < f32::EPSILON);
        assert!((ch.amplitude[2] - 30.5_f32).abs() < f32::EPSILON);
        // Phase should be all zeros
        assert!(ch.phase.iter().all(|&p| p == 0.0));
        assert_eq!(ch.hardware_type, HardwareType::Esp32S3);
    }
    // Nodes older than STALE_THRESHOLD must be dropped from the frame set.
    #[test]
    fn test_stale_node_excluded() {
        let mut states: HashMap<u8, NodeState> = HashMap::new();
        // Active node: frame just received
        let mut active_history = VecDeque::new();
        active_history.push_back(vec![1.0, 2.0]);
        states.insert(1, make_node_state(active_history, Some(Instant::now()), 1));
        // Stale node: frame 20 seconds ago
        let mut stale_history = VecDeque::new();
        stale_history.push_back(vec![3.0, 4.0]);
        let stale_time = Instant::now() - Duration::from_secs(20);
        states.insert(2, make_node_state(stale_history, Some(stale_time), 1));
        let frames = node_frames_from_states(&states);
        assert_eq!(frames.len(), 1, "stale node should be excluded");
        assert_eq!(frames[0].node_id, 1);
    }
    // Empty slice -> score is exactly zero.
    #[test]
    fn test_compute_person_score_empty() {
        assert!((compute_person_score_from_amplitudes(&[]) - 0.0).abs() < f64::EPSILON);
    }
    #[test]
    fn test_compute_person_score_flat() {
        // Constant amplitude => variance = 0 => score ~ 0
        let flat = vec![5.0_f32; 64];
        let score = compute_person_score_from_amplitudes(&flat);
        assert!(score < 0.001, "flat signal should have near-zero score, got {score}");
    }
    #[test]
    fn test_compute_person_score_varied() {
        // High variance relative to mean should produce a positive score
        let varied: Vec<f32> = (0..64).map(|i| if i % 2 == 0 { 1.0 } else { 10.0 }).collect();
        let score = compute_person_score_from_amplitudes(&varied);
        assert!(score > 0.1, "varied signal should have positive score, got {score}");
        assert!(score <= 1.0, "score should be clamped to 1.0, got {score}");
    }
    #[test]
    fn test_compute_person_score_clamped() {
        // Near-zero mean with non-zero variance => would blow up without clamp
        let vals = vec![0.0_f32, 0.0, 0.0, 0.001];
        let score = compute_person_score_from_amplitudes(&vals);
        assert!(score <= 1.0, "score must be clamped to 1.0");
    }
    // No nodes at all: fusion is skipped entirely and the fallback count is 0.
    #[test]
    fn test_fuse_or_fallback_empty() {
        let fuser = MultistaticFuser::new();
        let states: HashMap<u8, NodeState> = HashMap::new();
        let (fused, count) = fuse_or_fallback(&fuser, &states);
        assert!(fused.is_none());
        assert_eq!(count, Some(0));
    }
}

View file

@ -0,0 +1,409 @@
//! Bridge between sensing-server PersonDetection types and signal crate PoseTracker.
//!
//! The sensing server uses f64 types (PersonDetection, PoseKeypoint, BoundingBox)
//! while the signal crate's PoseTracker operates on f32 Kalman states. This module
//! provides conversion functions and a single `tracker_update` entry point that
//! accepts server-side detections and returns tracker-smoothed results.
use std::time::Instant;
use wifi_densepose_signal::ruvsense::{
self, KeypointState, PoseTrack, TrackLifecycleState, TrackId, NUM_KEYPOINTS,
};
use wifi_densepose_signal::ruvsense::pose_tracker::PoseTracker;
use super::{BoundingBox, PersonDetection, PoseKeypoint};
/// COCO-17 keypoint names in index order.
const COCO_NAMES: [&str; 17] = [
    "nose",
    "left_eye",
    "right_eye",
    "left_ear",
    "right_ear",
    "left_shoulder",
    "right_shoulder",
    "left_elbow",
    "right_elbow",
    "left_wrist",
    "right_wrist",
    "left_hip",
    "right_hip",
    "left_knee",
    "right_knee",
    "left_ankle",
    "right_ankle",
];
/// Map a keypoint name (matched case-insensitively, ASCII only) to its
/// COCO-17 index, or `None` for names outside the COCO skeleton.
fn keypoint_name_to_coco_index(name: &str) -> Option<usize> {
    COCO_NAMES
        .iter()
        .enumerate()
        .find_map(|(idx, candidate)| candidate.eq_ignore_ascii_case(name).then_some(idx))
}
/// Convert server-side PersonDetection slices into tracker-compatible keypoint arrays.
///
/// For each person, maps named keypoints to COCO-17 positions. Unmapped slots are
/// filled with the centroid of the mapped keypoints so the Kalman filter has a
/// reasonable initial value rather than zeros (or the origin when nothing maps).
fn detections_to_tracker_keypoints(persons: &[PersonDetection]) -> Vec<[[f32; 3]; 17]> {
    persons
        .iter()
        .map(|person| {
            let mut kps = [[0.0_f32; 3]; 17];
            let mut mapped = [false; 17];
            let mut mapped_count = 0u32;
            let (mut cx, mut cy, mut cz) = (0.0_f32, 0.0_f32, 0.0_f32);
            // Single pass: place mapped keypoints, record which COCO slots
            // were hit, and accumulate the centroid of the mapped positions.
            // (The original did a second pass re-running the name lookup for
            // every keypoint just to rebuild this bitmap.)
            for kp in &person.keypoints {
                if let Some(idx) = keypoint_name_to_coco_index(&kp.name) {
                    kps[idx] = [kp.x as f32, kp.y as f32, kp.z as f32];
                    mapped[idx] = true;
                    cx += kp.x as f32;
                    cy += kp.y as f32;
                    cz += kp.z as f32;
                    mapped_count += 1;
                }
            }
            // Centroid of mapped keypoints; origin if nothing mapped.
            let centroid = if mapped_count > 0 {
                let n = mapped_count as f32;
                [cx / n, cy / n, cz / n]
            } else {
                [0.0, 0.0, 0.0]
            };
            // Fill unmapped slots with the centroid so the filter starts
            // from a plausible body position instead of zeros.
            for (slot, was_mapped) in kps.iter_mut().zip(mapped.iter()) {
                if !was_mapped {
                    *slot = centroid;
                }
            }
            kps
        })
        .collect()
}
/// Convert active PoseTracker tracks back into server-side PersonDetection values.
///
/// Only tracks whose lifecycle `is_alive()` are included (per `active_tracks()`).
/// Confidence is a fixed band derived from the lifecycle state, not a
/// per-keypoint statistic; keypoint positions come from the Kalman state.
pub fn tracker_to_person_detections(tracker: &PoseTracker) -> Vec<PersonDetection> {
    tracker
        .active_tracks()
        .into_iter()
        .map(|track| {
            // NOTE(review): TrackId.0 is truncated to u32 here — assumes ids
            // stay below 2^32; confirm against TrackId's underlying width.
            let id = track.id.0 as u32;
            // Fixed confidence band per lifecycle state.
            let confidence = match track.lifecycle {
                TrackLifecycleState::Active => 0.9,
                TrackLifecycleState::Tentative => 0.5,
                TrackLifecycleState::Lost => 0.3,
                TrackLifecycleState::Terminated => 0.0,
            };
            // Build keypoints from Kalman state
            let keypoints: Vec<PoseKeypoint> = (0..NUM_KEYPOINTS)
                .map(|i| {
                    let pos = track.keypoints[i].position();
                    PoseKeypoint {
                        name: COCO_NAMES[i].to_string(),
                        x: pos[0] as f64,
                        y: pos[1] as f64,
                        z: pos[2] as f64,
                        confidence: track.keypoints[i].confidence as f64,
                    }
                })
                .collect();
            // Compute bounding box from observed keypoints only (confidence > 0).
            // Unobserved slots (centroid-filled) collapse the bbox over time.
            let mut min_x = f64::MAX;
            let mut min_y = f64::MAX;
            let mut max_x = f64::MIN;
            let mut max_y = f64::MIN;
            let mut observed = 0;
            for kp in &keypoints {
                if kp.confidence > 0.0 {
                    if kp.x < min_x { min_x = kp.x; }
                    if kp.y < min_y { min_y = kp.y; }
                    if kp.x > max_x { max_x = kp.x; }
                    if kp.y > max_y { max_y = kp.y; }
                    observed += 1;
                }
            }
            // Degenerate boxes are clamped to a small positive extent.
            let bbox = if observed > 0 {
                BoundingBox {
                    x: min_x,
                    y: min_y,
                    width: (max_x - min_x).max(0.01),
                    height: (max_y - min_y).max(0.01),
                }
            } else {
                // No observed keypoints — use a default bbox at centroid
                let cx = keypoints.iter().map(|k| k.x).sum::<f64>() / keypoints.len() as f64;
                let cy = keypoints.iter().map(|k| k.y).sum::<f64>() / keypoints.len() as f64;
                BoundingBox { x: cx - 0.3, y: cy - 0.5, width: 0.6, height: 1.0 }
            };
            PersonDetection {
                id,
                confidence,
                keypoints,
                bbox,
                zone: "tracked".to_string(),
            }
        })
        .collect()
}
/// Run one tracker cycle: predict, match detections, update, prune.
///
/// This is the main entry point called each sensing frame. It:
/// 1. Computes dt from the previous call instant
/// 2. Predicts all existing tracks forward
/// 3. Greedily assigns detections to tracks by Mahalanobis cost
/// 4. Updates matched tracks, creates new tracks for unmatched detections
/// 5. Prunes terminated tracks
/// 6. Returns smoothed PersonDetection values from the tracker state
pub fn tracker_update(
    tracker: &mut PoseTracker,
    last_instant: &mut Option<Instant>,
    persons: Vec<PersonDetection>,
) -> Vec<PersonDetection> {
    let now = Instant::now();
    // First call has no previous instant; assume a nominal 10 Hz frame gap.
    let dt = last_instant.map_or(0.1_f32, |prev| now.duration_since(prev).as_secs_f32());
    *last_instant = Some(now);
    // Predict all tracks forward
    tracker.predict_all(dt);
    if persons.is_empty() {
        // No detections this frame: tracks coast on prediction alone and
        // terminated ones are removed.
        tracker.prune_terminated();
        return tracker_to_person_detections(tracker);
    }
    // Convert detections to f32 keypoint arrays
    let all_keypoints = detections_to_tracker_keypoints(&persons);
    // Centroid of each detection drives the assignment cost below.
    let centroids: Vec<[f32; 3]> = all_keypoints
        .iter()
        .map(|kps| {
            let mut c = [0.0_f32; 3];
            for kp in kps {
                c[0] += kp[0];
                c[1] += kp[1];
                c[2] += kp[2];
            }
            let n = NUM_KEYPOINTS as f32;
            [c[0] / n, c[1] / n, c[2] / n]
        })
        .collect();
    // Snapshot the active track ids once. The tracker is not mutated during
    // the assignment loop, so `active_tracks()` keeps a stable ordering and
    // these ids line up with the per-iteration track indices below.
    // (The previous version also precomputed a per-track centroid here that
    // was never read — only the id was used.)
    let active_ids: Vec<TrackId> = tracker.active_tracks().iter().map(|t| t.id).collect();
    let mut used_tracks: Vec<bool> = vec![false; active_ids.len()];
    let mut matched: Vec<Option<TrackId>> = vec![None; persons.len()];
    // Greedy assignment: each detection claims the cheapest unused track.
    for det_idx in 0..persons.len() {
        let mut best_cost = f32::MAX;
        let mut best_track_idx = None;
        for (track_idx, track) in tracker.active_tracks().iter().enumerate() {
            if used_tracks[track_idx] {
                continue;
            }
            let cost = tracker.assignment_cost(track, &centroids[det_idx], &[]);
            if cost < best_cost {
                best_cost = cost;
                best_track_idx = Some(track_idx);
            }
        }
        // Mahalanobis gate: 9.0 (default TrackerConfig)
        if best_cost < 9.0 {
            if let Some(tidx) = best_track_idx {
                matched[det_idx] = Some(active_ids[tidx]);
                used_tracks[tidx] = true;
            }
        }
    }
    // Timestamp for new/updated tracks (microseconds since UNIX epoch)
    let timestamp_us = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_micros() as u64)
        .unwrap_or(0);
    // Update matched tracks (uses update_keypoints for proper lifecycle transitions)
    for (det_idx, track_id_opt) in matched.iter().enumerate() {
        if let Some(track_id) = track_id_opt {
            if let Some(track) = tracker.find_track_mut(*track_id) {
                track.update_keypoints(&all_keypoints[det_idx], 0.08, 1.0, timestamp_us);
            }
        }
    }
    // Create new tracks for unmatched detections
    for (det_idx, track_id_opt) in matched.iter().enumerate() {
        if track_id_opt.is_none() {
            tracker.create_track(&all_keypoints[det_idx], timestamp_us);
        }
    }
    tracker.prune_terminated();
    tracker_to_person_detections(tracker)
}
#[cfg(test)]
mod tests {
    use super::*;
    /// Helper: build a PoseKeypoint with a fixed 0.9 confidence.
    fn make_keypoint(name: &str, x: f64, y: f64, z: f64) -> PoseKeypoint {
        PoseKeypoint {
            name: name.to_string(),
            x,
            y,
            z,
            confidence: 0.9,
        }
    }
    /// Helper: build a PersonDetection with a unit bbox and fixed confidence.
    fn make_person(id: u32, keypoints: Vec<PoseKeypoint>) -> PersonDetection {
        PersonDetection {
            id,
            confidence: 0.8,
            keypoints,
            bbox: BoundingBox {
                x: 0.0,
                y: 0.0,
                width: 1.0,
                height: 1.0,
            },
            zone: "test".to_string(),
        }
    }
    // Exhaustive check of the COCO-17 name -> index mapping, including
    // the unknown-name and case-insensitivity paths.
    #[test]
    fn test_keypoint_name_to_coco_index() {
        assert_eq!(keypoint_name_to_coco_index("nose"), Some(0));
        assert_eq!(keypoint_name_to_coco_index("left_eye"), Some(1));
        assert_eq!(keypoint_name_to_coco_index("right_eye"), Some(2));
        assert_eq!(keypoint_name_to_coco_index("left_ear"), Some(3));
        assert_eq!(keypoint_name_to_coco_index("right_ear"), Some(4));
        assert_eq!(keypoint_name_to_coco_index("left_shoulder"), Some(5));
        assert_eq!(keypoint_name_to_coco_index("right_shoulder"), Some(6));
        assert_eq!(keypoint_name_to_coco_index("left_elbow"), Some(7));
        assert_eq!(keypoint_name_to_coco_index("right_elbow"), Some(8));
        assert_eq!(keypoint_name_to_coco_index("left_wrist"), Some(9));
        assert_eq!(keypoint_name_to_coco_index("right_wrist"), Some(10));
        assert_eq!(keypoint_name_to_coco_index("left_hip"), Some(11));
        assert_eq!(keypoint_name_to_coco_index("right_hip"), Some(12));
        assert_eq!(keypoint_name_to_coco_index("left_knee"), Some(13));
        assert_eq!(keypoint_name_to_coco_index("right_knee"), Some(14));
        assert_eq!(keypoint_name_to_coco_index("left_ankle"), Some(15));
        assert_eq!(keypoint_name_to_coco_index("right_ankle"), Some(16));
        assert_eq!(keypoint_name_to_coco_index("unknown"), None);
        // Case insensitive
        assert_eq!(keypoint_name_to_coco_index("NOSE"), Some(0));
        assert_eq!(keypoint_name_to_coco_index("Left_Eye"), Some(1));
    }
    // Mapped names land at their COCO slots; every unmapped slot receives
    // the centroid of the mapped keypoints.
    #[test]
    fn test_detections_to_tracker_keypoints() {
        let person = make_person(
            1,
            vec![
                make_keypoint("nose", 1.0, 2.0, 0.5),
                make_keypoint("left_shoulder", 0.8, 2.5, 0.4),
                make_keypoint("right_shoulder", 1.2, 2.5, 0.6),
            ],
        );
        let result = detections_to_tracker_keypoints(&[person]);
        assert_eq!(result.len(), 1);
        let kps = &result[0];
        // Mapped keypoints should have correct values
        assert!((kps[0][0] - 1.0).abs() < 1e-5); // nose x
        assert!((kps[0][1] - 2.0).abs() < 1e-5); // nose y
        assert!((kps[0][2] - 0.5).abs() < 1e-5); // nose z
        assert!((kps[5][0] - 0.8).abs() < 1e-5); // left_shoulder x
        assert!((kps[6][0] - 1.2).abs() < 1e-5); // right_shoulder x
        // Unmapped keypoints should be at centroid of mapped keypoints
        // centroid = ((1.0+0.8+1.2)/3, (2.0+2.5+2.5)/3, (0.5+0.4+0.6)/3)
        let cx = (1.0 + 0.8 + 1.2) / 3.0;
        let cy = (2.0 + 2.5 + 2.5) / 3.0;
        let cz = (0.5 + 0.4 + 0.6) / 3.0;
        // left_eye (index 1) should be at centroid
        assert!((kps[1][0] - cx).abs() < 1e-4);
        assert!((kps[1][1] - cy).abs() < 1e-4);
        assert!((kps[1][2] - cz).abs() < 1e-4);
    }
    // Feeding the same detection repeatedly must keep the same track ID —
    // the core identity-stability property of the tracker bridge.
    #[test]
    fn test_tracker_update_stable_ids() {
        let mut tracker = PoseTracker::new();
        let mut last_instant: Option<Instant> = None;
        let person = make_person(
            0,
            vec![
                make_keypoint("nose", 1.0, 2.0, 0.0),
                make_keypoint("left_shoulder", 0.8, 2.5, 0.0),
                make_keypoint("right_shoulder", 1.2, 2.5, 0.0),
                make_keypoint("left_hip", 0.9, 3.5, 0.0),
                make_keypoint("right_hip", 1.1, 3.5, 0.0),
            ],
        );
        // First update: creates a new track
        let result1 = tracker_update(&mut tracker, &mut last_instant, vec![person.clone()]);
        assert_eq!(result1.len(), 1);
        let id1 = result1[0].id;
        // Second update: should match the existing track
        let result2 = tracker_update(&mut tracker, &mut last_instant, vec![person.clone()]);
        assert_eq!(result2.len(), 1);
        let id2 = result2[0].id;
        // Third update: same track ID should persist
        let result3 = tracker_update(&mut tracker, &mut last_instant, vec![person.clone()]);
        assert_eq!(result3.len(), 1);
        let id3 = result3[0].id;
        // All three updates should return the same track ID
        assert_eq!(id1, id2, "Track ID should be stable across updates");
        assert_eq!(id2, id3, "Track ID should be stable across updates");
    }
}

View file

@ -11,6 +11,12 @@ keywords = ["wifi", "csi", "signal-processing", "densepose", "rust"]
categories = ["science", "computer-vision"]
readme = "README.md"
[features]
default = ["eigenvalue"]
## Enable eigenvalue-based person counting (requires BLAS via ndarray-linalg).
## Disable with --no-default-features to use the diagonal fallback instead.
eigenvalue = ["ndarray-linalg"]
[dependencies]
# Core utilities
thiserror.workspace = true
@ -20,6 +26,7 @@ chrono = { version = "0.4", features = ["serde"] }
# Signal processing
ndarray = { workspace = true }
ndarray-linalg = { workspace = true, optional = true }
rustfft.workspace = true
num-complex.workspace = true
num-traits.workspace = true

View file

@ -17,6 +17,12 @@
//! of Squares and Products." Technometrics.
//! - ADR-030: RuvSense Persistent Field Model
use ndarray::Array2;
#[cfg(feature = "eigenvalue")]
use ndarray_linalg::Eigh;
#[cfg(feature = "eigenvalue")]
use ndarray_linalg::UPLO;
// ---------------------------------------------------------------------------
// Error types
// ---------------------------------------------------------------------------
@ -47,6 +53,14 @@ pub enum FieldModelError {
/// Invalid configuration parameter.
#[error("Invalid configuration: {0}")]
InvalidConfig(String),
/// Model has not been calibrated yet.
#[error("Field model not calibrated")]
NotCalibrated,
/// Not enough data for the requested operation.
#[error("Insufficient data: need {need}, have {have}")]
InsufficientData { need: usize, have: usize },
}
// ---------------------------------------------------------------------------
@ -260,6 +274,8 @@ pub struct FieldNormalMode {
pub calibrated_at_us: u64,
/// Hash of mesh geometry at calibration time.
pub geometry_hash: u64,
/// Baseline eigenvalue count above Marcenko-Pastur threshold (empty-room).
pub baseline_eigenvalue_count: usize,
}
/// Body perturbation extracted from a CSI observation.
@ -310,6 +326,60 @@ pub struct FieldModel {
status: CalibrationStatus,
/// Timestamp of last calibration completion (microseconds).
last_calibration_us: u64,
/// Running outer-product sum for full covariance SVD: [n_sub x n_sub].
covariance_sum: Option<Array2<f64>>,
/// Number of frames accumulated into covariance_sum.
covariance_count: u64,
}
/// Diagonal variance fallback for when full covariance SVD is unavailable.
///
/// Returns `(mode_energies, environmental_modes, baseline_eigenvalue_count)`.
fn diagonal_fallback(
    link_stats: &[LinkBaselineStats],
    n_sc: usize,
    n_modes: usize,
) -> (Vec<f64>, Vec<Vec<f64>>, usize) {
    // Diagonal approximation: average each subcarrier's variance across links.
    let mut avg_variance = vec![0.0_f64; n_sc];
    for stats in link_stats {
        let var = stats.variance_vector();
        for (i, v) in var.iter().enumerate() {
            avg_variance[i] += v;
        }
    }
    if !link_stats.is_empty() {
        let inv = 1.0 / link_stats.len() as f64;
        for acc in avg_variance.iter_mut() {
            *acc *= inv;
        }
    }

    // Rank subcarriers by variance, highest first, and keep the top K.
    let mut order: Vec<usize> = (0..n_sc).collect();
    order.sort_by(|&a, &b| {
        avg_variance[b]
            .partial_cmp(&avg_variance[a])
            .unwrap_or(std::cmp::Ordering::Equal)
    });

    let k = n_modes.min(n_sc);
    let mut environmental_modes = Vec::with_capacity(k);
    let mut mode_energies = Vec::with_capacity(k);
    for &idx in order.iter().take(k) {
        // Each "mode" is a standard-basis vector along one subcarrier, so the
        // eigenmode set degenerates to the axes sorted by variance.
        let mut mode = vec![0.0_f64; n_sc];
        mode[idx] = 1.0;
        mode_energies.push(avg_variance[idx]);
        environmental_modes.push(mode);
    }

    // Crude baseline proxy: count subcarriers whose variance exceeds twice
    // the mean variance (stands in for the Marcenko-Pastur eigenvalue count).
    let mean_var = if n_sc > 0 {
        avg_variance.iter().sum::<f64>() / n_sc as f64
    } else {
        0.0
    };
    let baseline_count = avg_variance.iter().filter(|&&v| v > mean_var * 2.0).count();

    (mode_energies, environmental_modes, baseline_count)
}
impl FieldModel {
@ -339,6 +409,8 @@ impl FieldModel {
modes: None,
status: CalibrationStatus::Uncalibrated,
last_calibration_us: 0,
covariance_sum: None,
covariance_count: 0,
})
}
@ -375,6 +447,30 @@ impl FieldModel {
if self.status == CalibrationStatus::Uncalibrated {
self.status = CalibrationStatus::Collecting;
}
// Accumulate raw outer products for SVD covariance (no centering here —
// mean subtraction is deferred to finalize_calibration to avoid bias).
// We average across links so covariance_count tracks frames, not links.
let n = self.config.n_subcarriers;
let cov = self.covariance_sum.get_or_insert_with(|| Array2::zeros((n, n)));
let n_links = observations.len();
for obs in observations {
if obs.len() >= n {
// Rank-1 update: cov += obs * obs^T (raw, un-centered)
for i in 0..n {
for j in i..n {
let val = obs[i] * obs[j];
cov[[i, j]] += val;
if i != j {
cov[[j, i]] += val;
}
}
}
}
}
// Count once per frame (not per link) for correct MP ratio
self.covariance_count += 1;
Ok(())
}
@ -396,58 +492,134 @@ impl FieldModel {
});
}
// Build covariance matrix from per-link variance data.
// We average the variance vectors across all links to get the
// covariance diagonal, then compute eigenmodes via power iteration.
let n_sc = self.config.n_subcarriers;
let n_modes = self.config.n_modes.min(n_sc);
// Collect per-link baselines
let baseline: Vec<Vec<f64>> = self.link_stats.iter().map(|ls| ls.mean_vector()).collect();
// Average covariance across links (diagonal approximation)
let mut avg_variance = vec![0.0_f64; n_sc];
// --- True eigenvalue decomposition (with diagonal fallback) ---
let (mode_energies, environmental_modes, baseline_eig_count) =
if let Some(ref cov_sum) = self.covariance_sum {
if self.covariance_count > 1 {
// Compute sample covariance from raw outer products:
// cov = (sum_xx / N - mean * mean^T) * N / (N-1)
// where sum_xx accumulated obs * obs^T across all links per frame.
// We average per-link means for centering.
let n_frames = self.covariance_count as f64;
let n_links = self.config.n_links as f64;
// Average mean across all links
let mut avg_mean = vec![0.0f64; n_sc];
for ls in &self.link_stats {
let var = ls.variance_vector();
for (i, v) in var.iter().enumerate() {
avg_variance[i] += v;
let m = ls.mean_vector();
for i in 0..n_sc { avg_mean[i] += m[i]; }
}
for i in 0..n_sc { avg_mean[i] /= n_links; }
// cov = sum_xx / (N * n_links) - mean * mean^T, then Bessel correction
let total_obs = n_frames * n_links;
let mut covariance = cov_sum / total_obs;
for i in 0..n_sc {
for j in 0..n_sc {
covariance[[i, j]] -= avg_mean[i] * avg_mean[j];
}
}
let n_links_f = self.config.n_links as f64;
for v in avg_variance.iter_mut() {
*v /= n_links_f;
}
// Bessel's correction: multiply by N/(N-1) where N = total observations
let bessel = total_obs / (total_obs - 1.0);
covariance *= bessel;
// Extract modes via simplified power iteration on the diagonal
// covariance. Since we use a diagonal approximation, the eigenmodes
// are aligned with the standard basis, sorted by variance.
let total_variance: f64 = avg_variance.iter().sum();
// Sort subcarrier indices by variance (descending) to pick top-K modes
let mut indices: Vec<usize> = (0..n_sc).collect();
indices.sort_by(|&a, &b| {
avg_variance[b]
.partial_cmp(&avg_variance[a])
// Symmetric eigendecomposition (requires eigenvalue feature / BLAS)
#[cfg(feature = "eigenvalue")]
match covariance.eigh(UPLO::Upper) {
Ok((eigenvalues, eigenvectors)) => {
// eigenvalues are in ascending order from ndarray-linalg
// Reverse to get descending
let len = eigenvalues.len();
let mut sorted_indices: Vec<usize> = (0..len).collect();
sorted_indices.sort_by(|&a, &b| {
eigenvalues[b]
.partial_cmp(&eigenvalues[a])
.unwrap_or(std::cmp::Ordering::Equal)
});
let mut environmental_modes = Vec::with_capacity(n_modes);
let mut mode_energies = Vec::with_capacity(n_modes);
let mut explained = 0.0_f64;
// Extract top n_modes
let modes: Vec<Vec<f64>> = sorted_indices
.iter()
.take(n_modes)
.map(|&idx| eigenvectors.column(idx).to_vec())
.collect();
let energies: Vec<f64> = sorted_indices
.iter()
.take(n_modes)
.map(|&idx| eigenvalues[idx].max(0.0))
.collect();
for k in 0..n_modes {
let idx = indices[k];
// Create a unit vector along the highest-variance subcarrier
let mut mode = vec![0.0_f64; n_sc];
mode[idx] = 1.0;
let energy = avg_variance[idx];
environmental_modes.push(mode);
mode_energies.push(energy);
explained += energy;
// Marcenko-Pastur noise estimate: median of POSITIVE
// eigenvalues in the bottom half. Excludes zeros from
// rank-deficient matrices (when p > n).
let noise_var = {
let mut positive: Vec<f64> = eigenvalues
.iter().copied().filter(|&e| e > 1e-10).collect();
positive.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
if positive.len() >= 4 {
let half = positive.len() / 2;
positive[..half].iter().sum::<f64>() / half as f64
} else if !positive.is_empty() {
positive[0]
} else {
1e-10
}
};
// MP ratio: p/n where n = total observations (frames * links)
let total_obs_mp = self.covariance_count as f64 * self.config.n_links as f64;
let ratio = n_sc as f64 / total_obs_mp;
let mp_threshold = noise_var * (1.0 + ratio.sqrt()).powi(2);
let baseline_count = eigenvalues
.iter()
.filter(|&&ev| ev > mp_threshold)
.count();
(energies, modes, baseline_count)
}
Err(_) => {
// Fallback to diagonal approximation on SVD failure
diagonal_fallback(&self.link_stats, n_sc, n_modes)
}
}
// When eigenvalue feature is disabled, use diagonal fallback
#[cfg(not(feature = "eigenvalue"))]
{ diagonal_fallback(&self.link_stats, n_sc, n_modes) }
} else {
diagonal_fallback(&self.link_stats, n_sc, n_modes)
}
} else {
diagonal_fallback(&self.link_stats, n_sc, n_modes)
};
// Compute variance explained using the same centered covariance as modes.
// total_variance = trace(centered_covariance) = sum of ALL eigenvalues.
let total_energy: f64 = mode_energies.iter().sum();
let total_variance = if let Some(ref cov_sum) = self.covariance_sum {
if self.covariance_count > 1 {
let n_links_f = self.config.n_links as f64;
let total_obs = self.covariance_count as f64 * n_links_f;
// Centered trace: E[x^2] - E[x]^2, with Bessel correction
let mut avg_mean = vec![0.0f64; n_sc];
for ls in &self.link_stats {
let m = ls.mean_vector();
for i in 0..n_sc { avg_mean[i] += m[i]; }
}
for i in 0..n_sc { avg_mean[i] /= n_links_f; }
let raw_trace: f64 = (0..n_sc).map(|i| cov_sum[[i, i]] / total_obs).sum();
let mean_sq: f64 = avg_mean.iter().map(|m| m * m).sum();
(raw_trace - mean_sq).max(0.0) * total_obs / (total_obs - 1.0)
} else {
total_energy
}
} else {
total_energy
};
let variance_explained = if total_variance > 1e-15 {
explained / total_variance
total_energy / total_variance
} else {
0.0
};
@ -459,6 +631,7 @@ impl FieldModel {
variance_explained,
calibrated_at_us: timestamp_us,
geometry_hash,
baseline_eigenvalue_count: baseline_eig_count,
};
self.modes = Some(field_mode);
@ -541,6 +714,100 @@ impl FieldModel {
})
}
/// Estimate room occupancy from eigenvalue analysis of recent CSI frames.
///
/// `recent_frames`: sliding window of amplitude vectors (recommend 50 frames
/// ~ 2.5s at 20 Hz). Returns estimated person count (0 = empty room).
///
/// Requires the `eigenvalue` feature (BLAS). Returns `NotCalibrated` when
/// the feature is disabled.
///
/// # Errors
/// - `NotCalibrated` if `finalize_calibration` has not completed.
/// - `InsufficientData` if fewer than 10 frames are supplied.
#[cfg(feature = "eigenvalue")]
pub fn estimate_occupancy(&self, recent_frames: &[Vec<f64>]) -> Result<usize, FieldModelError> {
    // Calibration supplies the empty-room eigenvalue count subtracted below.
    let modes = self.modes.as_ref().ok_or(FieldModelError::NotCalibrated)?;
    let n = self.config.n_subcarriers;
    if recent_frames.len() < 10 {
        return Err(FieldModelError::InsufficientData {
            need: 10,
            have: recent_frames.len(),
        });
    }
    // Sample mean over frames carrying at least n subcarriers; shorter frames
    // are skipped so the indexing below stays in bounds.
    let mut mean = vec![0.0f64; n];
    let mut count = 0usize;
    for frame in recent_frames {
        if frame.len() >= n {
            for i in 0..n {
                mean[i] += frame[i];
            }
            count += 1;
        }
    }
    if count < 2 {
        // Fewer than two usable frames: covariance undefined — report empty.
        return Ok(0);
    }
    for m in &mut mean {
        *m /= count as f64;
    }
    // Centered covariance, accumulated over the upper triangle and mirrored.
    let mut cov = Array2::<f64>::zeros((n, n));
    for frame in recent_frames {
        if frame.len() >= n {
            for i in 0..n {
                let ci = frame[i] - mean[i];
                for j in i..n {
                    let val = ci * (frame[j] - mean[j]);
                    cov[[i, j]] += val;
                    if i != j {
                        cov[[j, i]] += val;
                    }
                }
            }
        }
    }
    // Bessel's correction: sample covariance divides by (count - 1).
    let scale = 1.0 / (count as f64 - 1.0);
    cov *= scale;
    // Symmetric eigendecomposition (BLAS-backed via ndarray-linalg).
    let eigenvalues = match cov.eigh(UPLO::Upper) {
        Ok((evals, _)) => evals,
        Err(_) => return Ok(0), // eigendecomposition failed — can't estimate
    };
    // Marcenko-Pastur noise estimate: median of POSITIVE eigenvalues
    // in the bottom half. Excludes zeros from rank-deficient matrices
    // (common when n_subcarriers > n_frames, e.g. 56 subcarriers / 50 frames).
    let noise_var = {
        let mut positive: Vec<f64> = eigenvalues.iter()
            .copied()
            .filter(|&e| e > 1e-10)
            .collect();
        positive.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
        if positive.len() >= 4 {
            // Mean of the bottom half — a robust stand-in for the median
            // noise eigenvalue.
            let half = positive.len() / 2;
            positive[..half].iter().sum::<f64>() / half as f64
        } else if !positive.is_empty() {
            positive[0]
        } else {
            return Ok(0); // All zero eigenvalues — can't estimate
        }
    };
    // MP upper edge: sigma^2 * (1 + sqrt(p/n))^2 with p = subcarriers,
    // n = usable frame count.
    let ratio = n as f64 / count as f64;
    let mp_threshold = noise_var * (1.0 + ratio.sqrt()).powi(2);
    let significant = eigenvalues.iter().filter(|&&ev| ev > mp_threshold).count();
    // Subtract the empty-room baseline learned during calibration.
    let occupancy = significant.saturating_sub(modes.baseline_eigenvalue_count);
    Ok(occupancy.min(10)) // Cap at 10 persons
}
/// Stub when the `eigenvalue` feature is disabled — always returns
/// `NotCalibrated` so callers can degrade gracefully without a BLAS backend.
#[cfg(not(feature = "eigenvalue"))]
pub fn estimate_occupancy(&self, _recent_frames: &[Vec<f64>]) -> Result<usize, FieldModelError> {
    Err(FieldModelError::NotCalibrated)
}
/// Check calibration freshness against a given timestamp.
pub fn check_freshness(&self, current_us: u64) -> CalibrationStatus {
if self.modes.is_none() {
@ -563,6 +830,8 @@ impl FieldModel {
.collect();
self.modes = None;
self.status = CalibrationStatus::Uncalibrated;
self.covariance_sum = None;
self.covariance_count = 0;
}
}
@ -873,6 +1142,179 @@ mod tests {
}
}
#[test]
fn test_covariance_accumulation() {
    let mut model = FieldModel::new(make_config(2, 4, 5)).unwrap();

    // Stream ten calibration frames of slowly increasing amplitude.
    for step in 0..10 {
        let frame = make_observations(2, 4, 1.0 + 0.1 * step as f64);
        model.feed_calibration(&frame).unwrap();
    }

    // The accumulator must now exist and have counted frames.
    assert!(model.covariance_sum.is_some());
    assert!(model.covariance_count > 0);

    let cov = model.covariance_sum.as_ref().unwrap();
    assert_eq!(cov.shape(), &[4, 4]);

    // Diagonal entries are sums of squares, hence non-negative.
    for row in 0..4 {
        assert!(cov[[row, row]] >= 0.0, "Diagonal covariance entry must be >= 0");
    }
    // Accumulation mirrors the upper triangle, so the matrix is symmetric.
    for row in 0..4 {
        for col in 0..4 {
            assert!(
                (cov[[row, col]] - cov[[col, row]]).abs() < 1e-10,
                "Covariance matrix must be symmetric"
            );
        }
    }
}
#[test]
fn test_svd_finalize_produces_orthonormal_modes() {
    let config = FieldModelConfig {
        n_links: 1,
        n_subcarriers: 8,
        n_modes: 3,
        min_calibration_frames: 20,
        baseline_expiry_s: 86_400.0,
    };
    let mut model = FieldModel::new(config).unwrap();

    // Correlated sinusoidal subcarrier patterns yield non-trivial eigenmodes.
    let frame_at = |t: f64| {
        vec![
            1.0 + t.sin(),
            2.0 + t.cos(),
            3.0 + 0.5 * t.sin(),
            4.0 + 0.3 * t.cos(),
            5.0 + 0.1 * t,
            6.0,
            7.0 + 0.2 * (2.0 * t).sin(),
            8.0 + 0.1 * (2.0 * t).cos(),
        ]
    };
    for i in 0..50 {
        let obs = vec![frame_at(i as f64 * 0.1)];
        model.feed_calibration(&obs).unwrap();
    }
    model.finalize_calibration(1_000_000, 0).unwrap();

    let env = &model.modes().unwrap().environmental_modes;

    // Every mode must be (approximately) unit length.
    for (k, mode) in env.iter().enumerate() {
        let norm = mode.iter().map(|x| x * x).sum::<f64>().sqrt();
        assert!(
            (norm - 1.0).abs() < 0.01,
            "Mode {} has norm {} (expected ~1.0)",
            k,
            norm
        );
    }
    // Distinct modes must be (approximately) mutually orthogonal.
    for i in 0..env.len() {
        for j in (i + 1)..env.len() {
            let dot: f64 = env[i].iter().zip(env[j].iter()).map(|(a, b)| a * b).sum();
            assert!(
                dot.abs() < 0.05,
                "Modes {} and {} have dot product {} (expected ~0)",
                i,
                j,
                dot
            );
        }
    }
}
/// Noise-only frames (tiny sinusoidal wiggle around a static baseline) must
/// yield an occupancy estimate of zero: no eigenvalue should cross the
/// Marcenko-Pastur threshold beyond the calibrated baseline count.
///
/// Gated on the `eigenvalue` feature: the `cfg(not(...))` stub of
/// `estimate_occupancy` always returns `Err(NotCalibrated)`, so without the
/// gate this test's `unwrap()` panics under `--no-default-features`.
#[cfg(feature = "eigenvalue")]
#[test]
fn test_estimate_occupancy_noise_only() {
    let config = FieldModelConfig {
        n_links: 1,
        n_subcarriers: 8,
        n_modes: 3,
        min_calibration_frames: 20,
        baseline_expiry_s: 86_400.0,
    };
    let mut model = FieldModel::new(config).unwrap();

    // Deterministic low-amplitude pattern shared by calibration and query,
    // so the query frames look statistically identical to the baseline.
    let noise_frame = |t: f64| {
        vec![
            1.0 + 0.01 * t.sin(),
            2.0 + 0.01 * t.cos(),
            3.0 + 0.01 * (2.0 * t).sin(),
            4.0 + 0.01 * (2.0 * t).cos(),
            5.0 + 0.01 * (3.0 * t).sin(),
            6.0 + 0.01 * (3.0 * t).cos(),
            7.0 + 0.01 * (4.0 * t).sin(),
            8.0 + 0.01 * (4.0 * t).cos(),
        ]
    };

    // Calibrate on 50 noise-only frames.
    for i in 0..50 {
        let obs = vec![noise_frame(i as f64 * 0.1)];
        model.feed_calibration(&obs).unwrap();
    }
    model.finalize_calibration(1_000_000, 0).unwrap();

    // Query with 20 further frames drawn from the same noise process.
    let frames: Vec<Vec<f64>> = (0..20)
        .map(|i| noise_frame((i + 50) as f64 * 0.1))
        .collect();
    let occupancy = model.estimate_occupancy(&frames).unwrap();
    assert_eq!(occupancy, 0, "Noise-only frames should yield 0 occupancy");
}
#[test]
fn test_baseline_eigenvalue_count_stored() {
    let config = FieldModelConfig {
        n_links: 1,
        n_subcarriers: 8,
        n_modes: 3,
        min_calibration_frames: 20,
        baseline_expiry_s: 86_400.0,
    };
    let mut model = FieldModel::new(config).unwrap();

    // Structured variance: a few moving subcarriers, the rest constant,
    // so the eigenvalue spectrum is meaningful rather than flat.
    for i in 0..50 {
        let t = i as f64 * 0.1;
        let obs = vec![vec![
            1.0 + t.sin(),
            2.0 + t.cos(),
            3.0 + 0.5 * t.sin(),
            4.0 + 0.3 * t.cos(),
            5.0 + 0.1 * t,
            6.0,
            7.0,
            8.0,
        ]];
        model.feed_calibration(&obs).unwrap();
    }

    let modes = model.finalize_calibration(1_000_000, 0).unwrap();
    // The stored baseline count can never exceed the number of subcarriers
    // (usize, so it is trivially >= 0).
    assert!(
        modes.baseline_eigenvalue_count <= 8,
        "baseline_eigenvalue_count should be <= n_subcarriers"
    );
}
#[test]
fn test_environmental_projection_removes_drift() {
let config = make_config(1, 4, 10);

View file

@ -110,12 +110,18 @@ export class SensingTab {
<div class="sensing-card-title">About This Data</div>
<p class="sensing-about-text">
Metrics are computed from WiFi Channel State Information (CSI).
With <strong>1 ESP32</strong> you get presence detection, breathing
With <strong><span id="sensingNodeCount">0</span> ESP32 node(s)</strong> you get presence detection, breathing
estimation, and gross motion. Add <strong>3-4+ ESP32 nodes</strong>
around the room for spatial resolution and limb-level tracking.
</p>
</div>
<!-- Node Status -->
<div class="sensing-card" id="sensingNodeCards">
<div class="sensing-card-title">NODE STATUS</div>
<div id="nodeStatusContainer"></div>
</div>
<!-- Extra info -->
<div class="sensing-card">
<div class="sensing-card-title">Details</div>
@ -193,6 +199,9 @@ export class SensingTab {
// Update HUD
this._updateHUD(data);
// Update per-node panels
this._updateNodePanels(data);
}
_onStateChange(state) {
@ -233,6 +242,11 @@ export class SensingTab {
const f = data.features || {};
const c = data.classification || {};
// Node count
const nodeCount = (data.nodes || []).length;
const countEl = this.container.querySelector('#sensingNodeCount');
if (countEl) countEl.textContent = String(nodeCount);
// RSSI
this._setText('sensingRssi', `${(f.mean_rssi || -80).toFixed(1)} dBm`);
this._setText('sensingSource', data.source || '');
@ -309,6 +323,57 @@ export class SensingTab {
ctx.stroke();
}
// ---- Per-node panels ---------------------------------------------------
_updateNodePanels(data) {
const container = this.container.querySelector('#nodeStatusContainer');
if (!container) return;
const nodeFeatures = data.node_features || [];
if (nodeFeatures.length === 0) {
container.textContent = '';
const msg = document.createElement('div');
msg.style.cssText = 'color:#888;font-size:12px;padding:8px;';
msg.textContent = 'No nodes detected';
container.appendChild(msg);
return;
}
const NODE_COLORS = ['#00ccff', '#ff6600', '#00ff88', '#ff00cc', '#ffcc00', '#8800ff', '#00ffcc', '#ff0044'];
container.textContent = '';
for (const nf of nodeFeatures) {
const color = NODE_COLORS[nf.node_id % NODE_COLORS.length];
const statusColor = nf.stale ? '#888' : '#0f0';
const row = document.createElement('div');
row.style.cssText = `display:flex;align-items:center;gap:8px;padding:6px 8px;margin-bottom:4px;background:rgba(255,255,255,0.03);border-radius:6px;border-left:3px solid ${color};`;
const idCol = document.createElement('div');
idCol.style.minWidth = '50px';
const nameEl = document.createElement('div');
nameEl.style.cssText = `font-size:11px;font-weight:600;color:${color};`;
nameEl.textContent = 'Node ' + nf.node_id;
const statusEl = document.createElement('div');
statusEl.style.cssText = `font-size:9px;color:${statusColor};`;
statusEl.textContent = nf.stale ? 'STALE' : 'ACTIVE';
idCol.appendChild(nameEl);
idCol.appendChild(statusEl);
const metricsCol = document.createElement('div');
metricsCol.style.cssText = 'flex:1;font-size:10px;color:#aaa;';
metricsCol.textContent = (nf.rssi_dbm || -80).toFixed(0) + ' dBm · var ' + (nf.features?.variance || 0).toFixed(1);
const classCol = document.createElement('div');
classCol.style.cssText = 'font-size:10px;font-weight:600;color:#ccc;';
const motion = (nf.classification?.motion_level || 'absent').toUpperCase();
const conf = ((nf.classification?.confidence || 0) * 100).toFixed(0);
classCol.textContent = motion + ' ' + conf + '%';
row.appendChild(idCol);
row.appendChild(metricsCol);
row.appendChild(classCol);
container.appendChild(row);
}
}
// ---- Resize ------------------------------------------------------------
_setupResize() {

View file

@ -66,6 +66,10 @@ function valueToColor(v) {
return [r, g, b];
}
// ---- Node marker color palette -------------------------------------------
const NODE_MARKER_COLORS = [0x00ccff, 0xff6600, 0x00ff88, 0xff00cc, 0xffcc00, 0x8800ff, 0x00ffcc, 0xff0044];
// ---- GaussianSplatRenderer -----------------------------------------------
export class GaussianSplatRenderer {
@ -108,6 +112,10 @@ export class GaussianSplatRenderer {
// Node markers (ESP32 / router positions)
this._createNodeMarkers(THREE);
// Dynamic per-node markers (multi-node support)
this.nodeMarkers = new Map(); // nodeId -> THREE.Mesh
this._THREE = THREE;
// Body disruption blob
this._createBodyBlob(THREE);
@ -369,11 +377,43 @@ export class GaussianSplatRenderer {
bGeo.attributes.splatSize.needsUpdate = true;
}
// -- Update node positions ---------------------------------------------
// -- Update node positions (legacy single-node) ------------------------
if (nodes.length > 0 && nodes[0].position) {
const pos = nodes[0].position;
this.nodeMarker.position.set(pos[0], 0.5, pos[2]);
}
// -- Update dynamic per-node markers (multi-node support) --------------
if (nodes && nodes.length > 0 && this.scene) {
const THREE = this._THREE || window.THREE;
if (THREE) {
const activeIds = new Set();
for (const node of nodes) {
activeIds.add(node.node_id);
if (!this.nodeMarkers.has(node.node_id)) {
const geo = new THREE.SphereGeometry(0.25, 16, 16);
const mat = new THREE.MeshBasicMaterial({
color: NODE_MARKER_COLORS[node.node_id % NODE_MARKER_COLORS.length],
transparent: true,
opacity: 0.8,
});
const marker = new THREE.Mesh(geo, mat);
this.scene.add(marker);
this.nodeMarkers.set(node.node_id, marker);
}
const marker = this.nodeMarkers.get(node.node_id);
const pos = node.position || [0, 0, 0];
marker.position.set(pos[0], 0.5, pos[2]);
}
// Remove stale markers
for (const [id, marker] of this.nodeMarkers) {
if (!activeIds.has(id)) {
this.scene.remove(marker);
this.nodeMarkers.delete(id);
}
}
}
}
}
// ---- Render loop -------------------------------------------------------

View file

@ -84,6 +84,11 @@ class SensingService {
return [...this._rssiHistory];
}
/** Get per-node RSSI history (object keyed by node_id). */
getPerNodeRssiHistory() {
return { ...(this._perNodeRssiHistory || {}) };
}
/** Current connection state. */
get state() {
return this._state;
@ -327,6 +332,20 @@ class SensingService {
}
}
// Per-node RSSI tracking
if (!this._perNodeRssiHistory) this._perNodeRssiHistory = {};
if (data.node_features) {
for (const nf of data.node_features) {
if (!this._perNodeRssiHistory[nf.node_id]) {
this._perNodeRssiHistory[nf.node_id] = [];
}
this._perNodeRssiHistory[nf.node_id].push(nf.rssi_dbm);
if (this._perNodeRssiHistory[nf.node_id].length > this._maxHistory) {
this._perNodeRssiHistory[nf.node_id].shift();
}
}
}
// Notify all listeners
for (const cb of this._listeners) {
try {