diff --git a/scripts/record-csi-udp.py b/scripts/record-csi-udp.py new file mode 100644 index 00000000..2c0bdb11 --- /dev/null +++ b/scripts/record-csi-udp.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 +""" +Lightweight ESP32 CSI UDP recorder (ADR-079). + +Captures raw CSI packets from ESP32 nodes over UDP and writes to JSONL. +Runs alongside collect-ground-truth.py for synchronized capture. + +Usage: + python scripts/record-csi-udp.py --duration 300 --output data/recordings +""" + +import argparse +import json +import os +import socket +import struct +import time + + +def parse_csi_packet(data): + """Parse ADR-018 binary CSI packet into dict.""" + if len(data) < 8: + return None + + # ADR-018 header: [magic(2), len(2), node_id(1), seq(1), rssi(1), channel(1), iq_data...] + # Simplified: extract what we can from the raw packet + node_id = data[4] if len(data) > 4 else 0 + rssi = struct.unpack('b', bytes([data[6]]))[0] if len(data) > 6 else 0 + channel = data[7] if len(data) > 7 else 0 + + # IQ data starts at offset 8 + iq_data = data[8:] if len(data) > 8 else b'' + n_subcarriers = len(iq_data) // 2 # I,Q pairs + + # Compute amplitudes + amplitudes = [] + for i in range(0, len(iq_data) - 1, 2): + I = struct.unpack('b', bytes([iq_data[i]]))[0] + Q = struct.unpack('b', bytes([iq_data[i + 1]]))[0] + amplitudes.append(round((I * I + Q * Q) ** 0.5, 2)) + + return { + "type": "raw_csi", + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S.") + f"{int(time.time() * 1000) % 1000:03d}Z", + "ts_ns": time.time_ns(), + "node_id": node_id, + "rssi": rssi, + "channel": channel, + "subcarriers": n_subcarriers, + "amplitudes": amplitudes, + "iq_hex": iq_data.hex(), + } + + +def main(): + parser = argparse.ArgumentParser(description="Record ESP32 CSI over UDP") + parser.add_argument("--port", type=int, default=5005, help="UDP port (default: 5005)") + parser.add_argument("--duration", type=int, default=300, help="Duration in seconds (default: 300)") + parser.add_argument("--output", default="data/recordings", help="Output directory") + args = parser.parse_args() + + os.makedirs(args.output, exist_ok=True) + filename = f"csi-{int(time.time())}.csi.jsonl" + filepath = os.path.join(args.output, filename) + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(("0.0.0.0", args.port)) + sock.settimeout(1) + + print(f"Recording CSI on UDP :{args.port} for {args.duration}s") + print(f"Output: {filepath}") + + count = 0 + start = time.time() + nodes_seen = set() + + with open(filepath, "w") as f: + try: + while time.time() - start < args.duration: + try: + data, addr = sock.recvfrom(4096) + frame = parse_csi_packet(data) + if frame: + f.write(json.dumps(frame) + "\n") + count += 1 + nodes_seen.add(frame["node_id"]) + + if count % 500 == 0: + elapsed = time.time() - start + rate = count / elapsed + print(f" {count} frames | {rate:.0f} fps | " + f"nodes: {sorted(nodes_seen)} | " + f"{elapsed:.0f}s / {args.duration}s") + except socket.timeout: + continue + except KeyboardInterrupt: + print("\nStopped by user") + + sock.close() + elapsed = time.time() - start + print(f"\n=== CSI Recording Complete ===") + print(f" Frames: {count}") + print(f" Duration: {elapsed:.0f}s") + print(f" Rate: {count / max(elapsed, 1):.0f} fps") + print(f" Nodes: {sorted(nodes_seen)}") + print(f" Output: {filepath}") + + +if __name__ == "__main__": + main()