mirror of
https://github.com/ruvnet/RuView.git
synced 2026-04-28 05:59:32 +00:00
- Implemented the WiFi DensePose model in PyTorch, including CSI phase processing, modality translation, and DensePose prediction heads. - Added a comprehensive training utility for the model, including loss functions and training steps. - Created a CSV file to document hardware specifications, architecture details, training parameters, performance metrics, and advantages of the model.
202 lines
No EOL
7.5 KiB
Python
202 lines
No EOL
7.5 KiB
Python
# WiFi DensePose Implementation - Core Neural Network Architecture
|
|
# Based on "DensePose From WiFi" by Carnegie Mellon University
|
|
|
|
import torch
|
|
import torch.nn as nn
|
|
import torch.nn.functional as F
|
|
import numpy as np
|
|
import math
|
|
from typing import Dict, List, Tuple, Optional
|
|
from collections import OrderedDict
|
|
|
|
# CSI Phase Sanitization Module
|
|
class CSIPhaseProcessor:
|
|
"""
|
|
Processes raw CSI phase data through unwrapping, filtering, and linear fitting
|
|
Based on the phase sanitization methodology from the paper
|
|
"""
|
|
|
|
def __init__(self, num_subcarriers: int = 30):
|
|
self.num_subcarriers = num_subcarriers
|
|
|
|
def unwrap_phase(self, phase_data: np.ndarray) -> np.ndarray:
|
|
"""
|
|
Unwrap phase values to handle discontinuities
|
|
Args:
|
|
phase_data: Raw phase data of shape (samples, frequencies, antennas, antennas)
|
|
Returns:
|
|
Unwrapped phase data
|
|
"""
|
|
unwrapped = np.copy(phase_data)
|
|
|
|
for i in range(1, phase_data.shape[1]): # Along frequency dimension
|
|
diff = unwrapped[:, i] - unwrapped[:, i-1]
|
|
|
|
# Apply unwrapping logic
|
|
unwrapped[:, i] = np.where(diff > np.pi,
|
|
unwrapped[:, i-1] + diff - 2*np.pi,
|
|
unwrapped[:, i])
|
|
unwrapped[:, i] = np.where(diff < -np.pi,
|
|
unwrapped[:, i-1] + diff + 2*np.pi,
|
|
unwrapped[:, i])
|
|
|
|
return unwrapped
|
|
|
|
def apply_filters(self, phase_data: np.ndarray) -> np.ndarray:
|
|
"""
|
|
Apply median and uniform filters to eliminate outliers
|
|
"""
|
|
# Simple moving average as approximation for filters
|
|
filtered = np.copy(phase_data)
|
|
|
|
# Apply simple smoothing in time dimension
|
|
for i in range(1, phase_data.shape[0]-1):
|
|
filtered[i] = (phase_data[i-1] + phase_data[i] + phase_data[i+1]) / 3
|
|
|
|
# Apply smoothing in frequency dimension
|
|
for i in range(1, phase_data.shape[1]-1):
|
|
filtered[:, i] = (filtered[:, i-1] + filtered[:, i] + filtered[:, i+1]) / 3
|
|
|
|
return filtered
|
|
|
|
def linear_fitting(self, phase_data: np.ndarray) -> np.ndarray:
|
|
"""
|
|
Apply linear fitting to remove systematic phase drift
|
|
"""
|
|
fitted_data = np.copy(phase_data)
|
|
F = self.num_subcarriers
|
|
|
|
for sample_idx in range(phase_data.shape[0]):
|
|
for ant_i in range(phase_data.shape[2]):
|
|
for ant_j in range(phase_data.shape[3]):
|
|
phase_seq = phase_data[sample_idx, :, ant_i, ant_j]
|
|
|
|
# Calculate linear coefficients
|
|
alpha1 = (phase_seq[-1] - phase_seq[0]) / (2 * np.pi * F)
|
|
alpha0 = np.mean(phase_seq)
|
|
|
|
# Apply linear fitting
|
|
frequencies = np.arange(1, F + 1)
|
|
linear_trend = alpha1 * frequencies + alpha0
|
|
fitted_data[sample_idx, :, ant_i, ant_j] = phase_seq - linear_trend
|
|
|
|
return fitted_data
|
|
|
|
def sanitize_phase(self, raw_phase: np.ndarray) -> np.ndarray:
|
|
"""
|
|
Complete phase sanitization pipeline
|
|
"""
|
|
# Step 1: Unwrap phase
|
|
unwrapped = self.unwrap_phase(raw_phase)
|
|
|
|
# Step 2: Apply filters
|
|
filtered = self.apply_filters(unwrapped)
|
|
|
|
# Step 3: Linear fitting
|
|
sanitized = self.linear_fitting(filtered)
|
|
|
|
return sanitized
|
|
|
|
# Modality Translation Network
|
|
class ModalityTranslationNetwork(nn.Module):
|
|
"""
|
|
Translates CSI domain features to spatial domain features
|
|
Input: 150x3x3 amplitude and phase tensors
|
|
Output: 3x720x1280 feature map
|
|
"""
|
|
|
|
def __init__(self, input_dim: int = 1350, hidden_dim: int = 512, output_height: int = 720, output_width: int = 1280):
|
|
super(ModalityTranslationNetwork, self).__init__()
|
|
|
|
self.input_dim = input_dim
|
|
self.output_height = output_height
|
|
self.output_width = output_width
|
|
|
|
# Amplitude encoder
|
|
self.amplitude_encoder = nn.Sequential(
|
|
nn.Linear(input_dim, hidden_dim),
|
|
nn.ReLU(),
|
|
nn.Linear(hidden_dim, hidden_dim//2),
|
|
nn.ReLU(),
|
|
nn.Linear(hidden_dim//2, hidden_dim//4),
|
|
nn.ReLU()
|
|
)
|
|
|
|
# Phase encoder
|
|
self.phase_encoder = nn.Sequential(
|
|
nn.Linear(input_dim, hidden_dim),
|
|
nn.ReLU(),
|
|
nn.Linear(hidden_dim, hidden_dim//2),
|
|
nn.ReLU(),
|
|
nn.Linear(hidden_dim//2, hidden_dim//4),
|
|
nn.ReLU()
|
|
)
|
|
|
|
# Feature fusion
|
|
self.fusion_mlp = nn.Sequential(
|
|
nn.Linear(hidden_dim//2, hidden_dim//4),
|
|
nn.ReLU(),
|
|
nn.Linear(hidden_dim//4, 24*24), # Reshape to 24x24
|
|
nn.ReLU()
|
|
)
|
|
|
|
# Spatial processing
|
|
self.spatial_conv = nn.Sequential(
|
|
nn.Conv2d(1, 64, kernel_size=3, padding=1),
|
|
nn.ReLU(),
|
|
nn.Conv2d(64, 128, kernel_size=3, padding=1),
|
|
nn.ReLU(),
|
|
nn.AdaptiveAvgPool2d((6, 6)) # Compress to 6x6
|
|
)
|
|
|
|
# Upsampling to target resolution
|
|
self.upsample = nn.Sequential(
|
|
nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1), # 12x12
|
|
nn.ReLU(),
|
|
nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1), # 24x24
|
|
nn.ReLU(),
|
|
nn.ConvTranspose2d(32, 16, kernel_size=4, stride=2, padding=1), # 48x48
|
|
nn.ReLU(),
|
|
nn.ConvTranspose2d(16, 8, kernel_size=4, stride=2, padding=1), # 96x96
|
|
nn.ReLU(),
|
|
)
|
|
|
|
# Final upsampling to target size
|
|
self.final_upsample = nn.ConvTranspose2d(8, 3, kernel_size=1)
|
|
|
|
def forward(self, amplitude_tensor: torch.Tensor, phase_tensor: torch.Tensor) -> torch.Tensor:
|
|
batch_size = amplitude_tensor.shape[0]
|
|
|
|
# Flatten input tensors
|
|
amplitude_flat = amplitude_tensor.view(batch_size, -1) # [B, 1350]
|
|
phase_flat = phase_tensor.view(batch_size, -1) # [B, 1350]
|
|
|
|
# Encode features
|
|
amp_features = self.amplitude_encoder(amplitude_flat) # [B, 128]
|
|
phase_features = self.phase_encoder(phase_flat) # [B, 128]
|
|
|
|
# Fuse features
|
|
fused_features = torch.cat([amp_features, phase_features], dim=1) # [B, 256]
|
|
spatial_features = self.fusion_mlp(fused_features) # [B, 576]
|
|
|
|
# Reshape to 2D feature map
|
|
spatial_map = spatial_features.view(batch_size, 1, 24, 24) # [B, 1, 24, 24]
|
|
|
|
# Apply spatial convolutions
|
|
conv_features = self.spatial_conv(spatial_map) # [B, 128, 6, 6]
|
|
|
|
# Upsample
|
|
upsampled = self.upsample(conv_features) # [B, 8, 96, 96]
|
|
|
|
# Final upsampling using interpolation to reach target size
|
|
final_features = self.final_upsample(upsampled) # [B, 3, 96, 96]
|
|
|
|
# Interpolate to target resolution
|
|
output = F.interpolate(final_features, size=(self.output_height, self.output_width),
|
|
mode='bilinear', align_corners=False)
|
|
|
|
return output
|
|
|
|
print("Modality Translation Network implementation completed!")
|
|
print("Input: 150x3x3 amplitude and phase tensors")
|
|
print("Output: 3x720x1280 feature map") |