Pulse/cmd/pulse-sensor-proxy/config_cmd.go
rcourtman 0565781655 feat(sensor-proxy): Phase 2 - atomic config management with CLI
Implements bullet-proof configuration management to completely eliminate
allowed_nodes corruption by design. This builds on Phase 1 (file-only mode)
by replacing all shell/Python config manipulation with proper Go tooling.

**New Features:**
- `pulse-sensor-proxy config validate` - parse and validate config files
- `pulse-sensor-proxy config set-allowed-nodes` - atomic node list updates
- File locking via flock prevents concurrent write races
- Atomic writes (temp file + rename) ensure consistency
- systemd ExecStartPre validation prevents startup with bad config

**Architectural Changes:**
1. Installer now calls config CLI instead of embedded Python/shell scripts
2. All config mutations go through single authoritative writer
3. Deduplication and normalization handled in Go (reuses existing logic)
4. Sanitizer kept as noisy failsafe (warns if corruption still occurs)

**Implementation Details:**
- New cmd/pulse-sensor-proxy/config_cmd.go with cobra commands
- withLockedFile() wrapper ensures exclusive access
- atomicWriteFile() uses temp + rename pattern
- Installer update_allowed_nodes() simplified to CLI calls
- Both systemd service modes include ExecStartPre validation

**Why This Works:**
- Single code path for all writes (no shell/Python divergence)
- File locking serializes self-heal timer + manual installer runs
- Validation gate prevents proxy from starting with corrupt config
- CLI uses same YAML parser as the daemon (guaranteed compatibility)

**Phase 2 Benefits:**
- Corruption impossible by design (not just detected and fixed)
- No more Python dependency for config management
- Atomic operations prevent partial writes
- Clear error messages on validation failures

The defensive sanitizer remains active but now logs loudly if triggered,
allowing us to confirm Phase 2 eliminates corruption in production before
removing the safety net entirely.

This completes the fix for the recurring temperature monitoring outages.

Related to Phase 1 commit 53dec6010
2025-11-19 09:37:49 +00:00

331 lines
9 KiB
Go

package main
import (
"fmt"
"os"
"path/filepath"
"github.com/spf13/cobra"
"golang.org/x/sys/unix"
"gopkg.in/yaml.v3"
)
var (
	// Config command flags, bound in init() below.
	configPathFlag       string   // --config: path to config.yaml (validate only)
	allowedNodesPathFlag string   // --allowed-nodes: path to allowed_nodes.yaml (shared by both subcommands; only one runs per invocation)
	mergeNodesFlag       []string // --merge: node names/addresses to merge or replace (repeatable)
	replaceMode          bool     // --replace: replace the whole list instead of merging
)
// configCmd is the parent "config" subcommand. It carries no action of
// its own; it only groups the validate and set-allowed-nodes subcommands
// registered in init().
var configCmd = &cobra.Command{
	Use:   "config",
	Short: "Manage sensor proxy configuration",
	Long:  `Atomic configuration management for pulse-sensor-proxy`,
}
// validateCmd parses config.yaml (and allowed_nodes.yaml when present)
// and reports the first problem found. It is intended for use as a
// systemd ExecStartPre gate so the proxy never starts with a bad config.
var validateCmd = &cobra.Command{
	Use:   "validate",
	Short: "Validate configuration files",
	Long:  `Parse and validate config.yaml and allowed_nodes.yaml files`,
	RunE: func(cmd *cobra.Command, args []string) error {
		// Resolve the config path, falling back to the daemon default.
		cfgPath := defaultConfigPath
		if configPathFlag != "" {
			cfgPath = configPathFlag
		}
		// allowed_nodes.yaml defaults to living next to config.yaml.
		nodesPath := allowedNodesPathFlag
		if nodesPath == "" {
			nodesPath = filepath.Join(filepath.Dir(cfgPath), "allowed_nodes.yaml")
		}
		if err := validateConfigFile(cfgPath); err != nil {
			fmt.Fprintf(os.Stderr, "Config validation failed: %v\n", err)
			return err
		}
		// The allowed-nodes file is optional; only validate it when it exists.
		if _, statErr := os.Stat(nodesPath); statErr == nil {
			if err := validateAllowedNodesFile(nodesPath); err != nil {
				fmt.Fprintf(os.Stderr, "Allowed nodes validation failed: %v\n", err)
				return err
			}
		}
		fmt.Println("Configuration valid")
		return nil
	},
}
// setAllowedNodesCmd atomically merges or replaces the allowed-nodes
// list on disk, delegating the locking/atomic-write work to
// setAllowedNodes.
var setAllowedNodesCmd = &cobra.Command{
	Use:   "set-allowed-nodes",
	Short: "Atomically update allowed_nodes.yaml",
	Long: `Merge or replace allowed nodes with atomic writes and file locking.
Examples:
# Merge new nodes into existing list
pulse-sensor-proxy config set-allowed-nodes --merge 192.168.0.1 --merge node1.local
# Replace entire list
pulse-sensor-proxy config set-allowed-nodes --replace --merge 192.168.0.1 --merge 192.168.0.2
`,
	RunE: func(cmd *cobra.Command, args []string) error {
		// Refuse to run without at least one --merge entry.
		if len(mergeNodesFlag) == 0 {
			return fmt.Errorf("no nodes specified (use --merge flag)")
		}
		path := allowedNodesPathFlag
		if path == "" {
			// Default to /etc/pulse-sensor-proxy/allowed_nodes.yaml
			path = "/etc/pulse-sensor-proxy/allowed_nodes.yaml"
		}
		if err := setAllowedNodes(path, mergeNodesFlag, replaceMode); err != nil {
			fmt.Fprintf(os.Stderr, "Failed to update allowed nodes: %v\n", err)
			return err
		}
		// Report what happened in the mode-appropriate wording.
		if replaceMode {
			fmt.Printf("Replaced allowed nodes with %d entries\n", len(mergeNodesFlag))
		} else {
			fmt.Printf("Merged %d nodes into allowed nodes list\n", len(mergeNodesFlag))
		}
		return nil
	},
}
// init wires the config command tree into the root command and binds
// the package-level flag variables. Registration order matters only for
// help output; flag binding must happen before Execute() parses argv.
func init() {
	// Add subcommands to config command
	configCmd.AddCommand(validateCmd)
	configCmd.AddCommand(setAllowedNodesCmd)
	// Validate command flags
	validateCmd.Flags().StringVar(&configPathFlag, "config", "", "Path to config.yaml (default: /etc/pulse-sensor-proxy/config.yaml)")
	validateCmd.Flags().StringVar(&allowedNodesPathFlag, "allowed-nodes", "", "Path to allowed_nodes.yaml (default: same dir as config)")
	// Set-allowed-nodes command flags.
	// NOTE(review): --allowed-nodes on both subcommands binds the same
	// allowedNodesPathFlag variable; harmless today since cobra runs one
	// subcommand per invocation, but worth confirming if commands are
	// ever composed programmatically.
	setAllowedNodesCmd.Flags().StringVar(&allowedNodesPathFlag, "allowed-nodes", "", "Path to allowed_nodes.yaml (default: /etc/pulse-sensor-proxy/allowed_nodes.yaml)")
	setAllowedNodesCmd.Flags().StringSliceVar(&mergeNodesFlag, "merge", []string{}, "Node to merge (can be specified multiple times)")
	setAllowedNodesCmd.Flags().BoolVar(&replaceMode, "replace", false, "Replace entire list instead of merging")
	// Add config command to root
	rootCmd.AddCommand(configCmd)
}
// validateConfigFile parses and validates the main config file
func validateConfigFile(path string) error {
data, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("failed to read config file: %w", err)
}
// Check for duplicate allowed_nodes blocks (the issue we're fixing)
sanitized, cleanData := sanitizeDuplicateAllowedNodesBlocks("", data)
if sanitized {
return fmt.Errorf("config contains duplicate allowed_nodes blocks (would auto-fix on service start)")
}
// Parse YAML
cfg := &Config{}
if err := yaml.Unmarshal(cleanData, cfg); err != nil {
return fmt.Errorf("failed to parse config YAML: %w", err)
}
// Validate required fields
if cfg.ReadTimeout <= 0 {
return fmt.Errorf("read_timeout must be positive")
}
if cfg.WriteTimeout <= 0 {
return fmt.Errorf("write_timeout must be positive")
}
return nil
}
// validateAllowedNodesFile parses and validates the allowed_nodes.yaml file
func validateAllowedNodesFile(path string) error {
data, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("failed to read allowed_nodes file: %w", err)
}
// Parse YAML - can be either a dict with allowed_nodes key or a list
var result interface{}
if err := yaml.Unmarshal(data, &result); err != nil {
return fmt.Errorf("failed to parse allowed_nodes YAML: %w", err)
}
// Extract nodes
var nodes []string
switch v := result.(type) {
case map[string]interface{}:
if nodeList, ok := v["allowed_nodes"]; ok {
if list, ok := nodeList.([]interface{}); ok {
for _, item := range list {
if s, ok := item.(string); ok && s != "" {
nodes = append(nodes, s)
}
}
}
}
case []interface{}:
for _, item := range v {
if s, ok := item.(string); ok && s != "" {
nodes = append(nodes, s)
}
}
}
if len(nodes) == 0 {
return fmt.Errorf("allowed_nodes file is empty or invalid")
}
return nil
}
// setAllowedNodes atomically updates the allowed_nodes.yaml file with file locking
func setAllowedNodes(path string, newNodes []string, replace bool) error {
// Create directory if it doesn't exist
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create config directory: %w", err)
}
// Use file locking to prevent concurrent writes
return withLockedFile(path, func(f *os.File) error {
var existing []string
// Read existing nodes if not in replace mode
if !replace {
if data, err := os.ReadFile(path); err == nil {
existing = extractNodesFromYAML(data)
}
}
// Merge and deduplicate
merged := normalizeNodes(append(existing, newNodes...))
// Serialize to YAML
output := map[string]interface{}{
"allowed_nodes": merged,
}
data, err := yaml.Marshal(output)
if err != nil {
return fmt.Errorf("failed to marshal YAML: %w", err)
}
// Add header comment
header := "# Managed by pulse-sensor-proxy config CLI\n# Do not edit manually while service is running\n"
finalData := []byte(header + string(data))
// Write atomically
return atomicWriteFile(path, finalData, 0644)
})
}
// withLockedFile opens a file with exclusive locking and runs a callback
func withLockedFile(path string, fn func(f *os.File) error) error {
// Open or create the file
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer f.Close()
// Acquire exclusive lock
if err := unix.Flock(int(f.Fd()), unix.LOCK_EX); err != nil {
return fmt.Errorf("failed to acquire file lock: %w", err)
}
defer unix.Flock(int(f.Fd()), unix.LOCK_UN) //nolint:errcheck
// Run callback
return fn(f)
}
// atomicWriteFile writes data to a file atomically using temp file + rename
func atomicWriteFile(path string, data []byte, perm os.FileMode) error {
dir := filepath.Dir(path)
// Create temp file in same directory
tmp, err := os.CreateTemp(dir, ".tmp-*")
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
tmpPath := tmp.Name()
// Clean up temp file on error
defer func() {
if tmpPath != "" {
os.Remove(tmpPath)
}
}()
// Write data
if _, err := tmp.Write(data); err != nil {
tmp.Close()
return fmt.Errorf("failed to write temp file: %w", err)
}
// Sync to disk
if err := tmp.Sync(); err != nil {
tmp.Close()
return fmt.Errorf("failed to sync temp file: %w", err)
}
// Close temp file
if err := tmp.Close(); err != nil {
return fmt.Errorf("failed to close temp file: %w", err)
}
// Set permissions
if err := os.Chmod(tmpPath, perm); err != nil {
return fmt.Errorf("failed to set permissions: %w", err)
}
// Atomic rename
if err := os.Rename(tmpPath, path); err != nil {
return fmt.Errorf("failed to rename temp file: %w", err)
}
// Mark temp file as successfully moved (don't delete in defer)
tmpPath = ""
// Sync directory to ensure rename is persisted
dirFile, err := os.Open(dir)
if err == nil {
dirFile.Sync() //nolint:errcheck
dirFile.Close()
}
return nil
}
// extractNodesFromYAML extracts node list from YAML data
func extractNodesFromYAML(data []byte) []string {
var result interface{}
if err := yaml.Unmarshal(data, &result); err != nil {
return nil
}
var nodes []string
switch v := result.(type) {
case map[string]interface{}:
if nodeList, ok := v["allowed_nodes"]; ok {
if list, ok := nodeList.([]interface{}); ok {
for _, item := range list {
if s, ok := item.(string); ok && s != "" {
nodes = append(nodes, s)
}
}
}
}
case []interface{}:
for _, item := range v {
if s, ok := item.(string); ok && s != "" {
nodes = append(nodes, s)
}
}
}
return nodes
}