feat(server): per-node state pipeline for multi-node sensing (ADR-068, #249)

Replaces the single shared state pipeline with per-node HashMap<u8, NodeState>.
Each ESP32 node now gets independent:
- frame_history (temporal analysis)
- smoothed_person_score / prev_person_count
- smoothed_motion / baseline / debounce state
- vital sign detector + smoothing buffers
- RSSI history

Multi-node aggregation:
- Person count = sum of per-node counts for active nodes (seen <10s)
- SensingUpdate.nodes includes all active nodes
- estimated_persons reflects cross-node aggregate

Single-node deployments behave identically (HashMap has one entry).
Simulated data path unchanged for backward compatibility.

Closes #249
Refs #237, #276, #282

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-03-27 17:51:43 -04:00
parent 635c152e61
commit 8f927aaedb

View file

@ -16,7 +16,7 @@ mod vital_signs;
// Training pipeline modules (exposed via lib.rs)
use wifi_densepose_sensing_server::{graph_transformer, trainer, dataset, embedding};
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
@ -275,6 +275,59 @@ struct BoundingBox {
height: f64,
}
/// Per-node sensing state for multi-node deployments (issue #249).
/// Each ESP32 node gets its own frame history, smoothing buffers, and vital
/// sign detector so that data from different nodes is never mixed.
struct NodeState {
    /// Recent CSI amplitude frames for temporal feature extraction
    /// (capped to FRAME_HISTORY_CAPACITY at the push site).
    frame_history: VecDeque<Vec<f64>>,
    /// EMA-smoothed person score (updated with alpha = 0.10 per frame).
    smoothed_person_score: f64,
    /// Last committed person count for this node; used as hysteresis input
    /// to `score_to_person_count` and summed across active nodes for the
    /// cross-node aggregate.
    prev_person_count: usize,
    /// EMA of baseline-adjusted motion energy.
    smoothed_motion: f64,
    /// Committed motion label ("absent", etc.) after debouncing.
    current_motion_level: String,
    /// Consecutive frames the pending candidate label has been observed.
    debounce_counter: u32,
    /// Motion label pending commitment once it survives DEBOUNCE_FRAMES.
    debounce_candidate: String,
    /// Ambient motion floor, EMA-tracked during quiet frames.
    baseline_motion: f64,
    /// Frames processed so far; below BASELINE_WARMUP the baseline
    /// converges with a faster EMA.
    baseline_frames: u64,
    /// Smoothed heart rate in BPM; values < 1.0 mean "not yet seeded".
    smoothed_hr: f64,
    /// Smoothed breathing rate in BPM; values < 1.0 mean "not yet seeded".
    smoothed_br: f64,
    /// Slow EMA (0.92/0.08) of the heartbeat confidence.
    smoothed_hr_conf: f64,
    /// Slow EMA (0.92/0.08) of the breathing confidence.
    smoothed_br_conf: f64,
    /// Accepted heart-rate samples for the trimmed mean
    /// (capped to VITAL_MEDIAN_WINDOW).
    hr_buffer: VecDeque<f64>,
    /// Accepted breathing-rate samples for the trimmed mean
    /// (capped to VITAL_MEDIAN_WINDOW).
    br_buffer: VecDeque<f64>,
    /// Recent RSSI values in dBm, capped to 60 samples at the push sites.
    rssi_history: VecDeque<f64>,
    /// Per-node vital-sign detector so nodes never share FFT/history state.
    vital_detector: VitalSignDetector,
    /// Last smoothed vitals produced for this node.
    latest_vitals: VitalSigns,
    /// When the last frame from this node arrived; nodes seen within the
    /// last 10 s are considered active for aggregation.
    last_frame_time: Option<std::time::Instant>,
    /// Last vitals packet computed on the ESP32 edge firmware, if any.
    edge_vitals: Option<Esp32VitalsPacket>,
}
impl NodeState {
    /// Create a fresh per-node state: all smoothing accumulators zeroed,
    /// vitals unset (sentinel 0.0), and the motion label seeded to "absent".
    fn new() -> Self {
        Self {
            // Capped to FRAME_HISTORY_CAPACITY at the push site.
            frame_history: VecDeque::new(),
            smoothed_person_score: 0.0,
            prev_person_count: 0,
            smoothed_motion: 0.0,
            current_motion_level: "absent".to_string(),
            debounce_counter: 0,
            debounce_candidate: "absent".to_string(),
            baseline_motion: 0.0,
            baseline_frames: 0,
            smoothed_hr: 0.0,
            smoothed_br: 0.0,
            smoothed_hr_conf: 0.0,
            smoothed_br_conf: 0.0,
            hr_buffer: VecDeque::with_capacity(8),
            br_buffer: VecDeque::with_capacity(8),
            // Every push site caps this history at 60 entries, so allocate
            // the full window once instead of growing through reallocations.
            rssi_history: VecDeque::with_capacity(60),
            // 10.0 is presumably the CSI sample rate in Hz — TODO confirm
            // against VitalSignDetector::new.
            vital_detector: VitalSignDetector::new(10.0),
            latest_vitals: VitalSigns::default(),
            last_frame_time: None,
            edge_vitals: None,
        }
    }
}

/// Delegates to [`NodeState::new`] so call sites can use the idiomatic
/// `node_states.entry(id).or_default()`.
impl Default for NodeState {
    fn default() -> Self {
        Self::new()
    }
}
/// Shared application state
struct AppStateInner {
latest_update: Option<SensingUpdate>,
@ -364,6 +417,10 @@ struct AppStateInner {
// ── Adaptive classifier (environment-tuned) ──────────────────────────
/// Trained adaptive model (loaded from data/adaptive_model.json or trained at runtime).
adaptive_model: Option<adaptive_classifier::AdaptiveModel>,
// ── Per-node state (issue #249) ─────────────────────────────────────
/// Per-node sensing state for multi-node deployments.
/// Keyed by `node_id` from the ESP32 frame header.
node_states: HashMap<u8, NodeState>,
}
/// If no ESP32 frame arrives within this duration, source reverts to offline.
@ -964,6 +1021,44 @@ fn smooth_and_classify(state: &mut AppStateInner, raw: &mut ClassificationInfo,
raw.confidence = (0.4 + sm * 0.6).clamp(0.0, 1.0);
}
/// Per-node variant of `smooth_and_classify` that operates on a `NodeState`
/// instead of `AppStateInner` (issue #249).
///
/// Pipeline: baseline tracking → baseline-subtracted motion EMA →
/// debounced motion-label transition → write results into `raw`.
fn smooth_and_classify_node(ns: &mut NodeState, raw: &mut ClassificationInfo, raw_motion: f64) {
    // ── Baseline tracking ────────────────────────────────────────────────
    ns.baseline_frames += 1;
    if ns.baseline_frames < BASELINE_WARMUP {
        // Warmup: converge quickly onto the ambient motion floor.
        ns.baseline_motion = ns.baseline_motion * 0.9 + raw_motion * 0.1;
    } else if raw_motion < ns.smoothed_motion + 0.05 {
        // After warmup, only absorb quiet frames into the baseline so real
        // motion doesn't inflate the floor.
        ns.baseline_motion = ns.baseline_motion * (1.0 - BASELINE_EMA_ALPHA)
            + raw_motion * BASELINE_EMA_ALPHA;
    }

    // ── Baseline-subtracted motion EMA ───────────────────────────────────
    let adjusted = (raw_motion - ns.baseline_motion * 0.7).max(0.0);
    ns.smoothed_motion = ns.smoothed_motion * (1.0 - MOTION_EMA_ALPHA)
        + adjusted * MOTION_EMA_ALPHA;
    let motion = ns.smoothed_motion;

    // ── Debounced label transition ───────────────────────────────────────
    let proposed = raw_classify(motion);
    if proposed == ns.current_motion_level {
        // Already at this level: cancel any pending transition.
        ns.debounce_counter = 0;
        ns.debounce_candidate = proposed;
    } else if proposed != ns.debounce_candidate {
        // A different candidate appeared: restart the debounce count.
        ns.debounce_candidate = proposed;
        ns.debounce_counter = 1;
    } else {
        // Same pending candidate: one frame closer to committing it.
        ns.debounce_counter += 1;
        if ns.debounce_counter >= DEBOUNCE_FRAMES {
            ns.current_motion_level = proposed;
            ns.debounce_counter = 0;
        }
    }

    // ── Publish into the classification struct ───────────────────────────
    raw.motion_level = ns.current_motion_level.clone();
    raw.presence = motion > 0.03;
    raw.confidence = (0.4 + motion * 0.6).clamp(0.0, 1.0);
}
/// If an adaptive model is loaded, override the classification with the
/// model's prediction. Uses the full 15-feature vector for higher accuracy.
fn adaptive_override(state: &AppStateInner, features: &FeatureInfo, classification: &mut ClassificationInfo) {
@ -1064,6 +1159,55 @@ fn smooth_vitals(state: &mut AppStateInner, raw: &VitalSigns) -> VitalSigns {
}
}
/// Per-node variant of `smooth_vitals` that operates on a `NodeState` (issue #249).
fn smooth_vitals_node(ns: &mut NodeState, raw: &VitalSigns) -> VitalSigns {
let raw_hr = raw.heart_rate_bpm.unwrap_or(0.0);
let raw_br = raw.breathing_rate_bpm.unwrap_or(0.0);
let hr_ok = ns.smoothed_hr < 1.0 || (raw_hr - ns.smoothed_hr).abs() < HR_MAX_JUMP;
let br_ok = ns.smoothed_br < 1.0 || (raw_br - ns.smoothed_br).abs() < BR_MAX_JUMP;
if hr_ok && raw_hr > 0.0 {
ns.hr_buffer.push_back(raw_hr);
if ns.hr_buffer.len() > VITAL_MEDIAN_WINDOW { ns.hr_buffer.pop_front(); }
}
if br_ok && raw_br > 0.0 {
ns.br_buffer.push_back(raw_br);
if ns.br_buffer.len() > VITAL_MEDIAN_WINDOW { ns.br_buffer.pop_front(); }
}
let trimmed_hr = trimmed_mean(&ns.hr_buffer);
let trimmed_br = trimmed_mean(&ns.br_buffer);
if trimmed_hr > 0.0 {
if ns.smoothed_hr < 1.0 {
ns.smoothed_hr = trimmed_hr;
} else if (trimmed_hr - ns.smoothed_hr).abs() > HR_DEAD_BAND {
ns.smoothed_hr = ns.smoothed_hr * (1.0 - VITAL_EMA_ALPHA)
+ trimmed_hr * VITAL_EMA_ALPHA;
}
}
if trimmed_br > 0.0 {
if ns.smoothed_br < 1.0 {
ns.smoothed_br = trimmed_br;
} else if (trimmed_br - ns.smoothed_br).abs() > BR_DEAD_BAND {
ns.smoothed_br = ns.smoothed_br * (1.0 - VITAL_EMA_ALPHA)
+ trimmed_br * VITAL_EMA_ALPHA;
}
}
ns.smoothed_hr_conf = ns.smoothed_hr_conf * 0.92 + raw.heartbeat_confidence * 0.08;
ns.smoothed_br_conf = ns.smoothed_br_conf * 0.92 + raw.breathing_confidence * 0.08;
VitalSigns {
breathing_rate_bpm: if ns.smoothed_br > 1.0 { Some(ns.smoothed_br) } else { None },
heart_rate_bpm: if ns.smoothed_hr > 1.0 { Some(ns.smoothed_hr) } else { None },
breathing_confidence: ns.smoothed_br_conf,
heartbeat_confidence: ns.smoothed_hr_conf,
signal_quality: raw.signal_quality,
}
}
/// Trimmed mean: sort, drop top/bottom 25%, average the middle 50%.
/// More robust than median (uses more data) and less noisy than raw mean.
fn trimmed_mean(buf: &VecDeque<f64>) -> f64 {
@ -2827,6 +2971,23 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
// "no detection" because it only renders sensing_update msgs.
s.source = "esp32".to_string();
s.last_esp32_frame = Some(std::time::Instant::now());
// ── Per-node state for edge vitals (issue #249) ──────
let node_id = vitals.node_id;
let ns = s.node_states.entry(node_id).or_insert_with(NodeState::new);
ns.last_frame_time = Some(std::time::Instant::now());
ns.edge_vitals = Some(vitals.clone());
ns.rssi_history.push_back(vitals.rssi as f64);
if ns.rssi_history.len() > 60 { ns.rssi_history.pop_front(); }
// Store per-node person count from edge vitals.
let node_est = if vitals.presence {
(vitals.n_persons as usize).max(1)
} else {
0
};
ns.prev_person_count = node_est;
s.tick += 1;
let tick = s.tick;
@ -2836,11 +2997,25 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
let motion_score = if vitals.motion { 0.8 }
else if vitals.presence { 0.3 }
else { 0.05 };
let est_persons = if vitals.presence {
(vitals.n_persons as usize).max(1)
} else {
0
};
// Aggregate person count across all active nodes.
let now = std::time::Instant::now();
let total_persons: usize = s.node_states.values()
.filter(|n| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
.map(|n| n.prev_person_count)
.sum();
// Build nodes array with all active nodes.
let active_nodes: Vec<NodeInfo> = s.node_states.iter()
.filter(|(_, n)| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
.map(|(&id, n)| NodeInfo {
node_id: id,
rssi_dbm: n.rssi_history.back().copied().unwrap_or(0.0),
position: [2.0, 0.0, 1.5],
amplitude: vec![],
subcarrier_count: 0,
})
.collect();
let features = FeatureInfo {
mean_rssi: vitals.rssi as f64,
@ -2866,13 +3041,7 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
timestamp: chrono::Utc::now().timestamp_millis() as f64 / 1000.0,
source: "esp32".to_string(),
tick,
nodes: vec![NodeInfo {
node_id: vitals.node_id,
rssi_dbm: vitals.rssi as f64,
position: [2.0, 0.0, 1.5],
amplitude: vec![],
subcarrier_count: 0,
}],
nodes: active_nodes,
features: features.clone(),
classification,
signal_field,
@ -2892,7 +3061,7 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
pose_keypoints: None,
model_status: None,
persons: None,
estimated_persons: if est_persons > 0 { Some(est_persons) } else { None },
estimated_persons: if total_persons > 0 { Some(total_persons) } else { None },
};
let persons = derive_pose_from_sensing(&update);
@ -2935,24 +3104,90 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
s.source = "esp32".to_string();
s.last_esp32_frame = Some(std::time::Instant::now());
// Append current amplitudes to history before extracting features so
// that temporal analysis includes the most recent frame.
// Also maintain global frame_history for backward compat
// (simulation path, REST endpoints, etc.).
s.frame_history.push_back(frame.amplitudes.clone());
if s.frame_history.len() > FRAME_HISTORY_CAPACITY {
s.frame_history.pop_front();
}
let sample_rate_hz = 1000.0 / 500.0_f64; // default tick; ESP32 frames arrive as fast as they come
let (features, mut classification, breathing_rate_hz, sub_variances, raw_motion) =
extract_features_from_frame(&frame, &s.frame_history, sample_rate_hz);
smooth_and_classify(&mut s, &mut classification, raw_motion);
adaptive_override(&s, &features, &mut classification);
// ── Per-node processing (issue #249) ──────────────────
// Process entirely within per-node state so different
// ESP32 nodes never mix their smoothing/vitals buffers.
// We scope the mutable borrow of node_states so we can
// access other AppStateInner fields afterward.
let node_id = frame.node_id;
let adaptive_model_ref = s.adaptive_model.as_ref().map(|m| m as *const _);
let ns = s.node_states.entry(node_id).or_insert_with(NodeState::new);
ns.last_frame_time = Some(std::time::Instant::now());
ns.frame_history.push_back(frame.amplitudes.clone());
if ns.frame_history.len() > FRAME_HISTORY_CAPACITY {
ns.frame_history.pop_front();
}
let sample_rate_hz = 1000.0 / 500.0_f64;
let (features, mut classification, breathing_rate_hz, sub_variances, raw_motion) =
extract_features_from_frame(&frame, &ns.frame_history, sample_rate_hz);
smooth_and_classify_node(ns, &mut classification, raw_motion);
// SAFETY: adaptive_model_ref points into s which we hold
// via write lock; the model is not mutated here. We use a
// raw pointer to break the borrow-checker deadlock between
// node_states and adaptive_model (both inside s).
if let Some(model_ptr) = adaptive_model_ref {
let model: &adaptive_classifier::AdaptiveModel = unsafe { &*model_ptr };
let amps = ns.frame_history.back()
.map(|v| v.as_slice())
.unwrap_or(&[]);
let feat_arr = adaptive_classifier::features_from_runtime(
&serde_json::json!({
"variance": features.variance,
"motion_band_power": features.motion_band_power,
"breathing_band_power": features.breathing_band_power,
"spectral_power": features.spectral_power,
"dominant_freq_hz": features.dominant_freq_hz,
"change_points": features.change_points,
"mean_rssi": features.mean_rssi,
}),
amps,
);
let (label, conf) = model.classify(&feat_arr);
classification.motion_level = label.to_string();
classification.presence = label != "absent";
classification.confidence = (conf * 0.7 + classification.confidence * 0.3).clamp(0.0, 1.0);
}
ns.rssi_history.push_back(features.mean_rssi);
if ns.rssi_history.len() > 60 {
ns.rssi_history.pop_front();
}
let raw_vitals = ns.vital_detector.process_frame(
&frame.amplitudes,
&frame.phases,
);
let vitals = smooth_vitals_node(ns, &raw_vitals);
ns.latest_vitals = vitals.clone();
let raw_score = compute_person_score(&features);
ns.smoothed_person_score = ns.smoothed_person_score * 0.90 + raw_score * 0.10;
if classification.presence {
let count = score_to_person_count(ns.smoothed_person_score, ns.prev_person_count);
ns.prev_person_count = count;
} else {
ns.prev_person_count = 0;
}
// Done with per-node mutable borrow; now read aggregated
// state from all nodes (the borrow of `ns` ends here).
// (We re-borrow node_states immutably via `s` below.)
// Update RSSI history
s.rssi_history.push_back(features.mean_rssi);
if s.rssi_history.len() > 60 {
s.rssi_history.pop_front();
}
s.latest_vitals = vitals.clone();
s.tick += 1;
let tick = s.tick;
@ -2961,37 +3196,33 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
else if classification.motion_level == "present_still" { 0.3 }
else { 0.05 };
let raw_vitals = s.vital_detector.process_frame(
&frame.amplitudes,
&frame.phases,
);
let vitals = smooth_vitals(&mut s, &raw_vitals);
s.latest_vitals = vitals.clone();
// Aggregate person count across all active nodes.
let now = std::time::Instant::now();
let total_persons: usize = s.node_states.values()
.filter(|n| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
.map(|n| n.prev_person_count)
.sum();
// Multi-person estimation with temporal smoothing (EMA α=0.10).
let raw_score = compute_person_score(&features);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count);
s.prev_person_count = count;
count
} else {
s.prev_person_count = 0;
0
};
// Build nodes array with all active nodes.
let active_nodes: Vec<NodeInfo> = s.node_states.iter()
.filter(|(_, n)| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
.map(|(&id, n)| NodeInfo {
node_id: id,
rssi_dbm: n.rssi_history.back().copied().unwrap_or(0.0),
position: [2.0, 0.0, 1.5],
amplitude: n.frame_history.back()
.map(|a| a.iter().take(56).cloned().collect())
.unwrap_or_default(),
subcarrier_count: n.frame_history.back().map_or(0, |a| a.len()),
})
.collect();
let mut update = SensingUpdate {
msg_type: "sensing_update".to_string(),
timestamp: chrono::Utc::now().timestamp_millis() as f64 / 1000.0,
source: "esp32".to_string(),
tick,
nodes: vec![NodeInfo {
node_id: frame.node_id,
rssi_dbm: features.mean_rssi,
position: [2.0, 0.0, 1.5],
amplitude: frame.amplitudes.iter().take(56).cloned().collect(),
subcarrier_count: frame.n_subcarriers as usize,
}],
nodes: active_nodes,
features: features.clone(),
classification,
signal_field: generate_signal_field(
@ -3008,7 +3239,7 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
pose_keypoints: None,
model_status: None,
persons: None,
estimated_persons: if est_persons > 0 { Some(est_persons) } else { None },
estimated_persons: if total_persons > 0 { Some(total_persons) } else { None },
};
let persons = derive_pose_from_sensing(&update);
@ -3760,6 +3991,7 @@ async fn main() {
m.trained_frames, m.training_accuracy * 100.0);
m
}),
node_states: HashMap::new(),
}));
// Start background tasks based on source