Ruview/v1/src/sensing/backend.py
ruv b7e0f07e6e feat: Sensing-only UI mode with Gaussian splat visualization and Rust migration ADR
- Add Python WebSocket sensing server (ws_server.py) with ESP32 UDP CSI
  and Windows RSSI auto-detect collectors on port 8765
- Add Three.js Gaussian splat renderer with custom GLSL shaders for
  real-time WiFi signal field visualization (blue→green→red gradient)
- Add SensingTab component with RSSI sparkline, feature meters, and
  motion classification badge
- Add sensing.service.js WebSocket client with reconnect and simulation fallback
- Implement sensing-only mode: suppress all DensePose API calls when
  FastAPI backend (port 8000) is not running, clean console output
- ADR-019: Document sensing-only UI architecture and data flow
- ADR-020: Migrate AI/model inference to Rust with RuVector ONNX Runtime,
  replacing ~2.7GB Python stack with ~50MB static binary
- Add ruvnet/ruvector as upstream remote for RuVector crate ecosystem

Co-Authored-By: claude-flow <ruv@ruv.net>
2026-02-28 14:37:29 -05:00

165 lines
5.3 KiB
Python

"""
Common sensing backend interface.
Defines the ``SensingBackend`` protocol and the ``CommodityBackend`` concrete
implementation that wires together the RSSI collector, feature extractor, and
classifier into a single coherent pipeline.
The ``Capability`` enum enumerates all possible sensing capabilities. The
``CommodityBackend`` honestly reports that it supports only PRESENCE and MOTION.
"""
from __future__ import annotations
import logging
from enum import Enum, auto
from typing import List, Optional, Protocol, Set, runtime_checkable
from v1.src.sensing.classifier import MotionLevel, PresenceClassifier, SensingResult
from v1.src.sensing.feature_extractor import RssiFeatureExtractor, RssiFeatures
from v1.src.sensing.rssi_collector import (
LinuxWifiCollector,
SimulatedCollector,
WindowsWifiCollector,
WifiCollector,
WifiSample,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Capability enum
# ---------------------------------------------------------------------------
class Capability(Enum):
"""All possible sensing capabilities across backend tiers."""
PRESENCE = auto()
MOTION = auto()
RESPIRATION = auto()
LOCATION = auto()
POSE = auto()
# ---------------------------------------------------------------------------
# Backend protocol
# ---------------------------------------------------------------------------
@runtime_checkable
class SensingBackend(Protocol):
"""Protocol that all sensing backends must implement."""
def get_features(self) -> RssiFeatures:
"""Extract current features from the sensing pipeline."""
...
def get_capabilities(self) -> Set[Capability]:
"""Return the set of capabilities this backend supports."""
...
# ---------------------------------------------------------------------------
# Commodity backend
# ---------------------------------------------------------------------------
class CommodityBackend:
"""
RSSI-based commodity sensing backend.
Wires together:
- A WiFi collector (real or simulated)
- An RSSI feature extractor
- A presence/motion classifier
Capabilities: PRESENCE and MOTION only.
Parameters
----------
collector : WifiCollector-compatible object
The data source (LinuxWifiCollector or SimulatedCollector).
extractor : RssiFeatureExtractor, optional
Feature extractor (created with defaults if not provided).
classifier : PresenceClassifier, optional
Classifier (created with defaults if not provided).
"""
SUPPORTED_CAPABILITIES: Set[Capability] = frozenset(
{Capability.PRESENCE, Capability.MOTION}
)
def __init__(
self,
collector: LinuxWifiCollector | SimulatedCollector | WindowsWifiCollector,
extractor: Optional[RssiFeatureExtractor] = None,
classifier: Optional[PresenceClassifier] = None,
) -> None:
self._collector = collector
self._extractor = extractor or RssiFeatureExtractor()
self._classifier = classifier or PresenceClassifier()
@property
def collector(self) -> LinuxWifiCollector | SimulatedCollector | WindowsWifiCollector:
return self._collector
@property
def extractor(self) -> RssiFeatureExtractor:
return self._extractor
@property
def classifier(self) -> PresenceClassifier:
return self._classifier
# -- SensingBackend protocol ---------------------------------------------
def get_features(self) -> RssiFeatures:
"""
Get current features from the latest collected samples.
Uses the extractor's window_seconds to determine how many samples
to pull from the collector's ring buffer.
"""
window = self._extractor.window_seconds
sample_rate = self._collector.sample_rate_hz
n_needed = int(window * sample_rate)
samples = self._collector.get_samples(n=n_needed)
return self._extractor.extract(samples)
def get_capabilities(self) -> Set[Capability]:
"""CommodityBackend supports PRESENCE and MOTION only."""
return set(self.SUPPORTED_CAPABILITIES)
# -- convenience methods -------------------------------------------------
def get_result(self) -> SensingResult:
"""
Run the full pipeline: collect -> extract -> classify.
Returns
-------
SensingResult
Classification result with motion level and confidence.
"""
features = self.get_features()
return self._classifier.classify(features)
def start(self) -> None:
"""Start the underlying collector."""
self._collector.start()
logger.info(
"CommodityBackend started (capabilities: %s)",
", ".join(c.name for c in self.SUPPORTED_CAPABILITIES),
)
def stop(self) -> None:
"""Stop the underlying collector."""
self._collector.stop()
logger.info("CommodityBackend stopped")
def is_capable(self, capability: Capability) -> bool:
"""Check whether this backend supports a specific capability."""
return capability in self.SUPPORTED_CAPABILITIES
def __repr__(self) -> str:
caps = ", ".join(c.name for c in sorted(self.SUPPORTED_CAPABILITIES, key=lambda c: c.value))
return f"CommodityBackend(capabilities=[{caps}])"