feat: 4 sensing examples — sleep apnea, stress, room environment

examples/sleep/apnea_screener.py — detects breathing cessation
events (>10s), computes AHI score, classifies OSA severity.

examples/stress/hrv_stress_monitor.py — real-time SDNN/RMSSD
from mmWave HR, stress level with visual bar.

examples/environment/room_monitor.py — dual-sensor (CSI + mmWave)
room awareness: occupancy, light, RF fingerprint, activity events.

examples/README.md — index with hardware table and quick start.

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-03-15 16:50:04 -04:00
parent ef582b4429
commit 24ea88cbe0
4 changed files with 503 additions and 0 deletions

35
examples/README.md Normal file
View file

@ -0,0 +1,35 @@
# Examples
Real-time sensing applications built on the RuView platform.
| Example | Sensors | What It Does |
|---------|---------|-------------|
| [Medical: Blood Pressure](medical/) | mmWave (COM4) | Contactless BP estimation from HRV |
| [Sleep: Apnea Screener](sleep/) | mmWave (COM4) | Detects breathing cessation events, computes AHI |
| [Stress: HRV Monitor](stress/) | mmWave (COM4) | Real-time stress level from heart rate variability |
| [Environment: Room Monitor](environment/) | CSI (COM7) + mmWave (COM4) | Occupancy, light, RF fingerprint, activity events |
## Hardware Required
| Port | Device | Cost |
|------|--------|------|
| COM7 | ESP32-S3 (WiFi CSI) | ~$9 |
| COM4 | ESP32-C6 + Seeed MR60BHA2 (60 GHz mmWave) | ~$15 |
## Quick Start
```bash
pip install pyserial numpy
# Blood pressure
python examples/medical/bp_estimator.py --port COM4
# Sleep apnea screening
python examples/sleep/apnea_screener.py --port COM4 --duration 3600
# Stress monitoring
python examples/stress/hrv_stress_monitor.py --port COM4
# Full room monitor (both sensors)
python examples/environment/room_monitor.py --csi-port COM7 --mmwave-port COM4
```

View file

@ -0,0 +1,190 @@
#!/usr/bin/env python3
"""
Room Environment Monitor WiFi CSI + mmWave + Light Sensor Fusion
Combines all available sensors to build a real-time room awareness picture:
- WiFi CSI (COM7): Presence, motion energy, room RF fingerprint
- mmWave (COM4): Occupancy count, distance, HR/BR of nearest person
- BH1750 (COM4): Ambient light level
Detects: occupancy changes, lighting anomalies, activity patterns,
room RF fingerprint drift (door/window state changes).
Usage:
python examples/environment/room_monitor.py --csi-port COM7 --mmwave-port COM4
"""
import argparse
import collections
import math
import re
import serial
import sys
import threading
import time
RE_HR = re.compile(r"'Real-time heart rate'.*?(\d+\.?\d*)\s*bpm", re.IGNORECASE)
RE_BR = re.compile(r"'Real-time respiratory rate'.*?(\d+\.?\d*)", re.IGNORECASE)
RE_PRES = re.compile(r"'Person Information'.*?state\s+(ON|OFF)", re.IGNORECASE)
RE_DIST = re.compile(r"'Distance to detection object'.*?(\d+\.?\d*)\s*cm", re.IGNORECASE)
RE_LUX = re.compile(r"'Seeed MR60BHA2 Illuminance'.*?(\d+\.?\d*)\s*lx", re.IGNORECASE)
RE_TARGETS = re.compile(r"'Target Number'.*?(\d+\.?\d*)", re.IGNORECASE)
RE_CSI_CB = re.compile(r"CSI cb #(\d+).*?len=(\d+).*?rssi=(-?\d+)")
RE_ANSI = re.compile(r"\x1b\[[0-9;]*m")
# Light categories
def light_category(lux):
if lux < 1: return "Dark"
if lux < 10: return "Dim"
if lux < 50: return "Low"
if lux < 200: return "Normal"
if lux < 500: return "Bright"
return "Very Bright"
def main():
parser = argparse.ArgumentParser(description="Room Environment Monitor")
parser.add_argument("--csi-port", default="COM7")
parser.add_argument("--mmwave-port", default="COM4")
parser.add_argument("--duration", type=int, default=120)
args = parser.parse_args()
# Shared state
state = {
"hr": 0.0, "br": 0.0, "presence_mw": False, "distance": 0.0,
"lux": 0.0, "targets": 0, "rssi": 0, "csi_frames": 0,
"mw_frames": 0, "events": [],
}
rssi_history = collections.deque(maxlen=60)
lux_history = collections.deque(maxlen=60)
lock = threading.Lock()
stop = threading.Event()
def read_mmwave():
try:
ser = serial.Serial(args.mmwave_port, 115200, timeout=1)
except Exception:
return
while not stop.is_set():
line = ser.readline().decode("utf-8", errors="replace")
clean = RE_ANSI.sub("", line)
with lock:
m = RE_HR.search(clean)
if m: state["hr"] = float(m.group(1)); state["mw_frames"] += 1
m = RE_BR.search(clean)
if m: state["br"] = float(m.group(1))
m = RE_PRES.search(clean)
if m:
new_pres = m.group(1) == "ON"
if new_pres != state["presence_mw"]:
event = f"Person {'arrived' if new_pres else 'left'} (mmWave)"
state["events"].append((time.time(), event))
state["presence_mw"] = new_pres
m = RE_DIST.search(clean)
if m: state["distance"] = float(m.group(1))
m = RE_LUX.search(clean)
if m:
lux = float(m.group(1))
old_cat = light_category(state["lux"])
new_cat = light_category(lux)
if old_cat != new_cat and state["lux"] > 0:
state["events"].append((time.time(), f"Light: {old_cat} -> {new_cat} ({lux:.1f} lx)"))
state["lux"] = lux
lux_history.append(lux)
m = RE_TARGETS.search(clean)
if m: state["targets"] = int(float(m.group(1)))
ser.close()
def read_csi():
try:
ser = serial.Serial(args.csi_port, 115200, timeout=1)
except Exception:
return
while not stop.is_set():
line = ser.readline().decode("utf-8", errors="replace")
m = RE_CSI_CB.search(line)
if m:
with lock:
state["csi_frames"] = int(m.group(1))
state["rssi"] = int(m.group(3))
rssi_history.append(int(m.group(3)))
ser.close()
t1 = threading.Thread(target=read_mmwave, daemon=True)
t2 = threading.Thread(target=read_csi, daemon=True)
t1.start()
t2.start()
print()
print("=" * 70)
print(" Room Environment Monitor (WiFi CSI + mmWave + Light)")
print("=" * 70)
print()
start_time = time.time()
last_print = 0
try:
while time.time() - start_time < args.duration:
time.sleep(1)
elapsed = int(time.time() - start_time)
if elapsed <= last_print or elapsed % 5 != 0:
continue
last_print = elapsed
with lock:
s = dict(state)
events = list(state["events"][-3:])
# RSSI stability (RF fingerprint drift)
rssi_std = 0
if len(rssi_history) >= 5:
vals = list(rssi_history)
mean = sum(vals) / len(vals)
rssi_std = math.sqrt(sum((x - mean)**2 for x in vals) / len(vals))
rf_status = "Stable" if rssi_std < 3 else "Shifting" if rssi_std < 6 else "Volatile"
pres = "YES" if s["presence_mw"] else "no"
lcat = light_category(s["lux"])
print(f" {elapsed:>4}s | Pres:{pres:>3} Dist:{s['distance']:>4.0f}cm | "
f"HR:{s['hr']:>3.0f} BR:{s['br']:>2.0f} | "
f"Light:{s['lux']:>5.1f}lx ({lcat:<6}) | "
f"RSSI:{s['rssi']:>3}dBm RF:{rf_status:<8} | "
f"CSI:{s['csi_frames']} MW:{s['mw_frames']}")
for ts, event in events:
age = elapsed - int(ts - start_time)
if age < 10:
print(f" ** EVENT: {event}")
except KeyboardInterrupt:
pass
stop.set()
time.sleep(1)
print()
print("=" * 70)
print(" ROOM SUMMARY")
print("=" * 70)
with lock:
print(f" Duration: {time.time()-start_time:.0f}s")
print(f" CSI frames: {state['csi_frames']}")
print(f" mmWave data: {state['mw_frames']} readings")
print(f" Last HR: {state['hr']:.0f} bpm")
print(f" Last BR: {state['br']:.0f}/min")
print(f" Light: {state['lux']:.1f} lux ({light_category(state['lux'])})")
if lux_history:
print(f" Light range: {min(lux_history):.1f} - {max(lux_history):.1f} lux")
if rssi_history:
print(f" RSSI range: {min(rssi_history)} to {max(rssi_history)} dBm (std={rssi_std:.1f})")
print(f" Events: {len(state['events'])}")
for ts, event in state["events"]:
print(f" [{int(ts-start_time):>4}s] {event}")
print()
if __name__ == "__main__":
main()

View file

@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""
Sleep Apnea Screener Contactless via 60 GHz mmWave
Monitors breathing rate from MR60BHA2 and detects apnea events
(breathing cessation > 10 seconds). Clinical threshold: > 5 events/hour
= Obstructive Sleep Apnea (mild), > 15 = moderate, > 30 = severe.
Usage:
python examples/sleep/apnea_screener.py --port COM4
python examples/sleep/apnea_screener.py --port COM4 --duration 3600 # 1 hour
"""
import argparse
import collections
import re
import serial
import sys
import time
RE_BR = re.compile(r"'Real-time respiratory rate'.*?(\d+\.?\d*)", re.IGNORECASE)
RE_HR = re.compile(r"'Real-time heart rate'.*?(\d+\.?\d*)", re.IGNORECASE)
RE_PRES = re.compile(r"'Person Information'.*?state\s+(ON|OFF)", re.IGNORECASE)
RE_ANSI = re.compile(r"\x1b\[[0-9;]*m")
APNEA_THRESHOLD_SEC = 10 # Breathing absent for >10s = apnea event
HYPOPNEA_BR = 6.0 # BR < 6/min = hypopnea (shallow breathing)
def main():
parser = argparse.ArgumentParser(description="Sleep Apnea Screener (mmWave)")
parser.add_argument("--port", default="COM4")
parser.add_argument("--baud", type=int, default=115200)
parser.add_argument("--duration", type=int, default=120, help="Duration in seconds")
args = parser.parse_args()
ser = serial.Serial(args.port, args.baud, timeout=1)
print()
print("=" * 60)
print(" Sleep Apnea Screener (60 GHz mmWave)")
print(" Lie still within 1m of sensor. Monitoring breathing.")
print("=" * 60)
print()
br_history = collections.deque(maxlen=600)
apnea_events = []
hypopnea_events = []
last_br_time = time.time()
last_br_value = 0.0
last_hr = 0.0
in_apnea = False
apnea_start = 0.0
start = time.time()
last_print = 0
try:
while time.time() - start < args.duration:
line = ser.readline().decode("utf-8", errors="replace")
clean = RE_ANSI.sub("", line)
m = RE_BR.search(clean)
if m:
br = float(m.group(1))
br_history.append((time.time(), br))
if br > 0:
last_br_time = time.time()
last_br_value = br
if in_apnea:
duration = time.time() - apnea_start
apnea_events.append(duration)
print(f" ** APNEA EVENT ENDED: {duration:.1f}s **")
in_apnea = False
if br < HYPOPNEA_BR and br > 0:
hypopnea_events.append(br)
elif br == 0 and not in_apnea:
gap = time.time() - last_br_time
if gap >= APNEA_THRESHOLD_SEC:
in_apnea = True
apnea_start = last_br_time
print(f" ** APNEA DETECTED at {int(time.time()-start)}s (no breath for {gap:.0f}s) **")
m = RE_HR.search(clean)
if m:
last_hr = float(m.group(1))
elapsed = int(time.time() - start)
if elapsed > last_print and elapsed % 10 == 0:
last_print = elapsed
gap = time.time() - last_br_time
status = "APNEA" if in_apnea else ("OK" if gap < 5 else f"gap {gap:.0f}s")
print(f" {elapsed:>4}s | BR {last_br_value:>4.0f}/min | HR {last_hr:>4.0f} | "
f"Apneas: {len(apnea_events)} | Hypopneas: {len(hypopnea_events)} | {status}")
except KeyboardInterrupt:
pass
ser.close()
duration_hr = (time.time() - start) / 3600.0
print()
print("=" * 60)
print(" APNEA SCREENING RESULTS")
print("=" * 60)
ahi = (len(apnea_events) + len(hypopnea_events)) / max(duration_hr, 0.01)
print(f" Duration: {time.time()-start:.0f}s ({duration_hr*60:.1f} min)")
print(f" Apnea events: {len(apnea_events)} (breathing absent > {APNEA_THRESHOLD_SEC}s)")
print(f" Hypopneas: {len(hypopnea_events)} (BR < {HYPOPNEA_BR}/min)")
print(f" AHI estimate: {ahi:.1f} events/hour")
print()
if ahi < 5:
print(" Classification: Normal (AHI < 5)")
elif ahi < 15:
print(" Classification: Mild OSA (AHI 5-14)")
elif ahi < 30:
print(" Classification: Moderate OSA (AHI 15-29)")
else:
print(" Classification: Severe OSA (AHI >= 30)")
print()
print(" NOT A MEDICAL DEVICE. Consult a sleep specialist for diagnosis.")
print()
if __name__ == "__main__":
main()

View file

@ -0,0 +1,149 @@
#!/usr/bin/env python3
"""
Real-Time Stress Monitor via Heart Rate Variability (HRV)
Reads heart rate from MR60BHA2 mmWave radar and computes HRV metrics
to estimate stress level continuously.
HRV Science:
- SDNN < 50ms = high stress / low parasympathetic tone
- SDNN 50-100ms = moderate
- SDNN > 100ms = relaxed / high vagal tone
- RMSSD: successive difference metric, more sensitive to acute stress
Usage:
python examples/stress/hrv_stress_monitor.py --port COM4
"""
import argparse
import collections
import math
import re
import serial
import sys
import time
RE_HR = re.compile(r"'Real-time heart rate'.*?(\d+\.?\d*)\s*bpm", re.IGNORECASE)
RE_ANSI = re.compile(r"\x1b\[[0-9;]*m")
def compute_hrv(hr_values):
"""Compute HRV metrics from HR time series."""
if len(hr_values) < 5:
return {"sdnn": 0, "rmssd": 0, "mean_hr": 0, "stress": ""}
rr = [60000.0 / h for h in hr_values if h > 0]
if len(rr) < 5:
return {"sdnn": 0, "rmssd": 0, "mean_hr": 0, "stress": ""}
mean_rr = sum(rr) / len(rr)
sdnn = math.sqrt(sum((x - mean_rr) ** 2 for x in rr) / len(rr))
# RMSSD: root mean square of successive differences
diffs = [(rr[i+1] - rr[i]) ** 2 for i in range(len(rr) - 1)]
rmssd = math.sqrt(sum(diffs) / len(diffs)) if diffs else 0
mean_hr = sum(hr_values) / len(hr_values)
if sdnn < 30:
stress = "HIGH STRESS"
elif sdnn < 50:
stress = "Moderate Stress"
elif sdnn < 80:
stress = "Mild Stress"
elif sdnn < 100:
stress = "Relaxed"
else:
stress = "Very Relaxed"
return {"sdnn": sdnn, "rmssd": rmssd, "mean_hr": mean_hr, "stress": stress}
def stress_bar(sdnn, width=30):
"""Visual stress bar: more filled = more stressed."""
level = max(0, min(1, 1.0 - sdnn / 120.0))
filled = int(level * width)
bar = "#" * filled + "." * (width - filled)
return f"[{bar}] {level*100:.0f}%"
def main():
parser = argparse.ArgumentParser(description="HRV Stress Monitor (mmWave)")
parser.add_argument("--port", default="COM4")
parser.add_argument("--baud", type=int, default=115200)
parser.add_argument("--duration", type=int, default=120)
parser.add_argument("--window", type=int, default=60, help="HRV window in seconds")
args = parser.parse_args()
ser = serial.Serial(args.port, args.baud, timeout=1)
print()
print("=" * 60)
print(" Real-Time Stress Monitor (mmWave HRV)")
print(" Sit still within 1m. Lower stress = higher HRV.")
print("=" * 60)
print()
hr_buffer = collections.deque(maxlen=args.window)
start = time.time()
last_print = 0
min_stress = 999.0
max_stress = 0.0
readings = []
try:
while time.time() - start < args.duration:
line = ser.readline().decode("utf-8", errors="replace")
clean = RE_ANSI.sub("", line)
m = RE_HR.search(clean)
if m:
hr = float(m.group(1))
if 30 < hr < 200:
hr_buffer.append(hr)
elapsed = int(time.time() - start)
if elapsed > last_print and elapsed % 5 == 0 and len(hr_buffer) >= 3:
last_print = elapsed
hrv = compute_hrv(list(hr_buffer))
bar = stress_bar(hrv["sdnn"])
readings.append(hrv)
if hrv["sdnn"] > 0:
min_stress = min(min_stress, hrv["sdnn"])
max_stress = max(max_stress, hrv["sdnn"])
print(f" {elapsed:>4}s | HR {hrv['mean_hr']:>4.0f} | "
f"SDNN {hrv['sdnn']:>5.1f}ms | RMSSD {hrv['rmssd']:>5.1f}ms | "
f"{hrv['stress']:<16} | {bar}")
except KeyboardInterrupt:
pass
ser.close()
print()
print("=" * 60)
print(" STRESS SESSION SUMMARY")
print("=" * 60)
if readings:
avg_sdnn = sum(r["sdnn"] for r in readings) / len(readings)
avg_rmssd = sum(r["rmssd"] for r in readings) / len(readings)
avg_hr = sum(r["mean_hr"] for r in readings) / len(readings)
final_stress = readings[-1]["stress"]
print(f" Duration: {time.time()-start:.0f}s")
print(f" Avg HR: {avg_hr:.0f} bpm")
print(f" Avg SDNN: {avg_sdnn:.1f} ms {'(low — consider a break)' if avg_sdnn < 50 else '(healthy range)' if avg_sdnn > 70 else ''}")
print(f" Avg RMSSD: {avg_rmssd:.1f} ms")
print(f" SDNN range: {min_stress:.0f} - {max_stress:.0f} ms")
print(f" Assessment: {final_stress}")
print()
print(" SDNN Guide: <30=high stress, 30-50=moderate, 50-100=normal, >100=relaxed")
else:
print(" No data collected. Ensure person is in range.")
print()
if __name__ == "__main__":
main()