Pulse/internal/monitoring/monitor_optimized.go
Pulse Monitor 2a2e281b10 fix: resolve storage display issues (addresses #410, #411, #385)
- Fixed shared storage jumping between nodes on each update by using consistent deduplication
- Shared storage now displays with node="cluster" to indicate it's cluster-wide
- Improved error logging when storage API calls fail to help diagnose permission issues
- Added specific warning when all nodes fail to retrieve storage (helps with #385, #411)

The jumping storage issue (#410) was caused by a race condition where parallel goroutines
would report shared storage under whichever node completed first, causing it to randomly
"jump" between nodes on each polling cycle.
2025-09-03 12:00:52 +00:00
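
A minimal sketch of the keying approach described above (illustrative helper name; the actual logic lives in pollStorageWithNodesOptimized below). Shared storage is deduplicated by storage name rather than by reporting node, so the surviving entry no longer depends on which goroutine finishes first:

func dedupeSharedStorage(reported []models.Storage) []models.Storage {
	best := make(map[string]models.Storage) // one entry per shared storage name
	var out []models.Storage
	for _, s := range reported {
		if !s.Shared {
			out = append(out, s) // node-local storage passes through unchanged
			continue
		}
		// Keep the most complete entry (highest reported usage) for each name
		if existing, ok := best[s.Name]; !ok || s.Used > existing.Used {
			s.Node = "cluster" // cluster-wide, not owned by a single node
			best[s.Name] = s
		}
	}
	for _, s := range best {
		out = append(out, s)
	}
	return out
}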

package monitoring

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/rcourtman/pulse-go-rewrite/internal/errors"
"github.com/rcourtman/pulse-go-rewrite/internal/models"
"github.com/rcourtman/pulse-go-rewrite/pkg/proxmox"
"github.com/rs/zerolog/log"
)

// pollVMsWithNodesOptimized polls VMs from all nodes in parallel using goroutines
func (m *Monitor) pollVMsWithNodesOptimized(ctx context.Context, instanceName string, client PVEClientInterface, nodes []proxmox.Node) {
startTime := time.Now()
// Channel to collect VM results from each node
type nodeResult struct {
node string
vms []models.VM
err error
}
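// resultChan is buffered with one slot per node so worker goroutines never block on send,
// even though offline nodes are skipped and never produce a result.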
resultChan := make(chan nodeResult, len(nodes))
var wg sync.WaitGroup
// Count online nodes for logging
onlineNodes := 0
for _, node := range nodes {
if node.Status == "online" {
onlineNodes++
}
}
log.Info().
Str("instance", instanceName).
Int("totalNodes", len(nodes)).
Int("onlineNodes", onlineNodes).
Msg("Starting parallel VM polling")
// Launch a goroutine for each online node
for _, node := range nodes {
// Skip offline nodes
if node.Status != "online" {
log.Debug().
Str("node", node.Node).
Str("status", node.Status).
Msg("Skipping offline node for VM polling")
continue
}
wg.Add(1)
go func(n proxmox.Node) {
defer wg.Done()
nodeStart := time.Now()
// Fetch VMs for this node
vms, err := client.GetVMs(ctx, n.Node)
if err != nil {
monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_vms", instanceName, err).WithNode(n.Node)
log.Error().Err(monErr).Str("node", n.Node).Msg("Failed to get VMs")
resultChan <- nodeResult{node: n.Node, err: err}
return
}
var nodeVMs []models.VM
// Process each VM
for _, vm := range vms {
// Skip templates
if vm.Template == 1 {
continue
}
// Parse tags
var tags []string
if vm.Tags != "" {
tags = strings.Split(vm.Tags, ";")
}
// Create guest ID
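// When the instance name matches the node name (standalone host), use the short node-vmid form;
// otherwise prefix the instance name so IDs stay unique across instances.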
var guestID string
if instanceName == n.Node {
guestID = fmt.Sprintf("%s-%d", n.Node, vm.VMID)
} else {
guestID = fmt.Sprintf("%s-%s-%d", instanceName, n.Node, vm.VMID)
}
// Calculate I/O rates
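// The rate tracker diffs these cumulative counters against the previous sample for this guest ID to produce I/O rates.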
currentMetrics := IOMetrics{
DiskRead: int64(vm.DiskRead),
DiskWrite: int64(vm.DiskWrite),
NetworkIn: int64(vm.NetIn),
NetworkOut: int64(vm.NetOut),
Timestamp: time.Now(),
}
diskReadRate, diskWriteRate, netInRate, netOutRate := m.rateTracker.CalculateRates(guestID, currentMetrics)
// Get memory info for running VMs
memUsed := uint64(0)
memTotal := vm.MaxMem
if vm.Status == "running" {
// Try to get detailed VM status (but don't wait too long)
statusCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
if vmStatus, err := client.GetVMStatus(statusCtx, n.Node, vm.VMID); err == nil {
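// When ballooning is active (balloon target below max memory), treat the balloon value as the effective memory total.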
if vmStatus.Balloon > 0 && vmStatus.Balloon < vmStatus.MaxMem {
memTotal = vmStatus.Balloon
}
if vmStatus.FreeMem > 0 {
memUsed = memTotal - vmStatus.FreeMem
} else if vmStatus.Mem > 0 {
memUsed = vmStatus.Mem
}
}
cancel()
// Guard against unsigned underflow if the reported usage exceeds the effective total
if memUsed > memTotal {
memUsed = memTotal
}
}
// Set CPU to 0 for non-running VMs
cpuUsage := safeFloat(vm.CPU)
if vm.Status != "running" {
cpuUsage = 0
}
// Calculate disk usage
diskUsed := uint64(vm.Disk)
diskTotal := uint64(vm.MaxDisk)
diskFree := diskTotal - diskUsed
diskUsage := safePercentage(float64(diskUsed), float64(diskTotal))
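// A running VM that reports zero disk usage usually means usage isn't available from the API
// (e.g. no guest agent data), so flag it as unknown rather than 0%.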
if diskUsed == 0 && diskTotal > 0 && vm.Status == "running" {
diskUsage = -1 // Unknown
}
// Create VM model
modelVM := models.VM{
ID: guestID,
VMID: vm.VMID,
Name: vm.Name,
Node: n.Node,
Instance: instanceName,
Status: vm.Status,
Type: "qemu",
CPU: cpuUsage,
CPUs: int(vm.CPUs),
Memory: models.Memory{
Total: int64(memTotal),
Used: int64(memUsed),
Free: int64(memTotal - memUsed),
Usage: safePercentage(float64(memUsed), float64(memTotal)),
},
Disk: models.Disk{
Total: int64(diskTotal),
Used: int64(diskUsed),
Free: int64(diskFree),
Usage: diskUsage,
},
NetworkIn: maxInt64(0, int64(netInRate)),
NetworkOut: maxInt64(0, int64(netOutRate)),
DiskRead: maxInt64(0, int64(diskReadRate)),
DiskWrite: maxInt64(0, int64(diskWriteRate)),
Uptime: int64(vm.Uptime),
Template: vm.Template == 1,
LastSeen: time.Now(),
Tags: tags,
}
// Zero out metrics for non-running VMs
if vm.Status != "running" {
modelVM.CPU = 0
modelVM.Memory.Usage = 0
modelVM.Disk.Usage = 0
modelVM.NetworkIn = 0
modelVM.NetworkOut = 0
modelVM.DiskRead = 0
modelVM.DiskWrite = 0
}
nodeVMs = append(nodeVMs, modelVM)
// Check alerts
m.alertManager.CheckGuest(modelVM, instanceName)
}
nodeDuration := time.Since(nodeStart)
log.Debug().
Str("node", n.Node).
Int("vms", len(nodeVMs)).
Dur("duration", nodeDuration).
Msg("Node VM polling completed")
resultChan <- nodeResult{node: n.Node, vms: nodeVMs}
}(node)
}
// Close channel when all goroutines complete
go func() {
wg.Wait()
close(resultChan)
}()
// Collect results from all nodes
var allVMs []models.VM
successfulNodes := 0
failedNodes := 0
for result := range resultChan {
if result.err != nil {
failedNodes++
} else {
successfulNodes++
allVMs = append(allVMs, result.vms...)
}
}
// Update state with all VMs
m.state.UpdateVMsForInstance(instanceName, allVMs)
duration := time.Since(startTime)
log.Info().
Str("instance", instanceName).
Int("totalVMs", len(allVMs)).
Int("successfulNodes", successfulNodes).
Int("failedNodes", failedNodes).
Dur("duration", duration).
Msg("Parallel VM polling completed")
}

// pollContainersWithNodesOptimized polls containers from all nodes in parallel using goroutines
func (m *Monitor) pollContainersWithNodesOptimized(ctx context.Context, instanceName string, client PVEClientInterface, nodes []proxmox.Node) {
startTime := time.Now()
// Channel to collect container results from each node
type nodeResult struct {
node string
containers []models.Container
err error
}
resultChan := make(chan nodeResult, len(nodes))
var wg sync.WaitGroup
// Count online nodes for logging
onlineNodes := 0
for _, node := range nodes {
if node.Status == "online" {
onlineNodes++
}
}
log.Info().
Str("instance", instanceName).
Int("totalNodes", len(nodes)).
Int("onlineNodes", onlineNodes).
Msg("Starting parallel container polling")
// Launch a goroutine for each online node
for _, node := range nodes {
// Skip offline nodes
if node.Status != "online" {
log.Debug().
Str("node", node.Node).
Str("status", node.Status).
Msg("Skipping offline node for container polling")
continue
}
wg.Add(1)
go func(n proxmox.Node) {
defer wg.Done()
nodeStart := time.Now()
// Fetch containers for this node
containers, err := client.GetContainers(ctx, n.Node)
if err != nil {
monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_containers", instanceName, err).WithNode(n.Node)
log.Error().Err(monErr).Str("node", n.Node).Msg("Failed to get containers")
resultChan <- nodeResult{node: n.Node, err: err}
return
}
var nodeContainers []models.Container
// Process each container
for _, container := range containers {
// Skip templates
if container.Template == 1 {
continue
}
// Parse tags
var tags []string
if container.Tags != "" {
tags = strings.Split(container.Tags, ";")
}
// Create guest ID
var guestID string
if instanceName == n.Node {
guestID = fmt.Sprintf("%s-%d", n.Node, container.VMID)
} else {
guestID = fmt.Sprintf("%s-%s-%d", instanceName, n.Node, container.VMID)
}
// Calculate I/O rates
currentMetrics := IOMetrics{
DiskRead: int64(container.DiskRead),
DiskWrite: int64(container.DiskWrite),
NetworkIn: int64(container.NetIn),
NetworkOut: int64(container.NetOut),
Timestamp: time.Now(),
}
diskReadRate, diskWriteRate, netInRate, netOutRate := m.rateTracker.CalculateRates(guestID, currentMetrics)
// Set CPU to 0 for non-running containers
cpuUsage := safeFloat(container.CPU)
if container.Status != "running" {
cpuUsage = 0
}
// Create container model
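// Unlike VMs, containers report usable memory and disk figures directly in the list response,
// so no extra per-guest status call is needed here.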
modelContainer := models.Container{
ID: guestID,
VMID: int(container.VMID),
Name: container.Name,
Node: n.Node,
Instance: instanceName,
Status: container.Status,
Type: "lxc",
CPU: cpuUsage,
CPUs: int(container.CPUs),
Memory: models.Memory{
Total: int64(container.MaxMem),
Used: int64(container.Mem),
Free: int64(container.MaxMem - container.Mem),
Usage: safePercentage(float64(container.Mem), float64(container.MaxMem)),
},
Disk: models.Disk{
Total: int64(container.MaxDisk),
Used: int64(container.Disk),
Free: int64(container.MaxDisk - container.Disk),
Usage: safePercentage(float64(container.Disk), float64(container.MaxDisk)),
},
NetworkIn: maxInt64(0, int64(netInRate)),
NetworkOut: maxInt64(0, int64(netOutRate)),
DiskRead: maxInt64(0, int64(diskReadRate)),
DiskWrite: maxInt64(0, int64(diskWriteRate)),
Uptime: int64(container.Uptime),
Template: container.Template == 1,
LastSeen: time.Now(),
Tags: tags,
}
// Zero out metrics for non-running containers
if container.Status != "running" {
modelContainer.CPU = 0
modelContainer.Memory.Usage = 0
modelContainer.Disk.Usage = 0
modelContainer.NetworkIn = 0
modelContainer.NetworkOut = 0
modelContainer.DiskRead = 0
modelContainer.DiskWrite = 0
}
nodeContainers = append(nodeContainers, modelContainer)
// Check alerts
m.alertManager.CheckGuest(modelContainer, instanceName)
}
nodeDuration := time.Since(nodeStart)
log.Debug().
Str("node", n.Node).
Int("containers", len(nodeContainers)).
Dur("duration", nodeDuration).
Msg("Node container polling completed")
resultChan <- nodeResult{node: n.Node, containers: nodeContainers}
}(node)
}
// Close channel when all goroutines complete
go func() {
wg.Wait()
close(resultChan)
}()
// Collect results from all nodes
var allContainers []models.Container
successfulNodes := 0
failedNodes := 0
for result := range resultChan {
if result.err != nil {
failedNodes++
} else {
successfulNodes++
allContainers = append(allContainers, result.containers...)
}
}
// Update state with all containers
m.state.UpdateContainersForInstance(instanceName, allContainers)
duration := time.Since(startTime)
log.Info().
Str("instance", instanceName).
Int("totalContainers", len(allContainers)).
Int("successfulNodes", successfulNodes).
Int("failedNodes", failedNodes).
Dur("duration", duration).
Msg("Parallel container polling completed")
}

// pollStorageWithNodesOptimized polls storage from all nodes in parallel using goroutines
func (m *Monitor) pollStorageWithNodesOptimized(ctx context.Context, instanceName string, client PVEClientInterface, nodes []proxmox.Node) {
startTime := time.Now()
// Get cluster storage configuration first (single call)
clusterStorages, err := client.GetAllStorage(ctx)
clusterStorageAvailable := err == nil
if err != nil {
log.Warn().Err(err).Str("instance", instanceName).Msg("Failed to get cluster storage config - will continue with node storage only")
}
// Create a map for quick lookup of cluster storage config
clusterStorageMap := make(map[string]proxmox.Storage)
if clusterStorageAvailable {
for _, cs := range clusterStorages {
clusterStorageMap[cs.Storage] = cs
}
}
// Channel to collect storage results from each node
type nodeResult struct {
node string
storage []models.Storage
err error
}
resultChan := make(chan nodeResult, len(nodes))
var wg sync.WaitGroup
// Count online nodes for logging
onlineNodes := 0
for _, node := range nodes {
if node.Status == "online" {
onlineNodes++
}
}
log.Info().
Str("instance", instanceName).
Int("totalNodes", len(nodes)).
Int("onlineNodes", onlineNodes).
Msg("Starting parallel storage polling")
// Launch a goroutine for each online node
for _, node := range nodes {
// Skip offline nodes
if node.Status != "online" {
log.Debug().
Str("node", node.Node).
Str("status", node.Status).
Msg("Skipping offline node for storage polling")
continue
}
wg.Add(1)
go func(n proxmox.Node) {
defer wg.Done()
nodeStart := time.Now()
// Fetch storage for this node
nodeStorage, err := client.GetStorage(ctx, n.Node)
if err != nil {
// Log more details about the failure
log.Error().
Err(err).
Str("node", n.Node).
Str("instance", instanceName).
Msg("Failed to get node storage - check API permissions")
resultChan <- nodeResult{node: n.Node, err: err}
return
}
var nodeStorageList []models.Storage
// Process each storage
for _, storage := range nodeStorage {
// Create storage ID
var storageID string
if instanceName == n.Node {
storageID = fmt.Sprintf("%s-%s", n.Node, storage.Storage)
} else {
storageID = fmt.Sprintf("%s-%s-%s", instanceName, n.Node, storage.Storage)
}
// Get cluster config for this storage
clusterConfig, hasClusterConfig := clusterStorageMap[storage.Storage]
// Determine if shared
shared := hasClusterConfig && clusterConfig.Shared == 1
// Create storage model
modelStorage := models.Storage{
ID: storageID,
Name: storage.Storage,
Node: n.Node,
Instance: instanceName,
Type: storage.Type,
Status: "available",
Total: int64(storage.Total),
Used: int64(storage.Used),
Free: int64(storage.Available),
Usage: safePercentage(float64(storage.Used), float64(storage.Total)),
Content: sortContent(storage.Content),
Shared: shared,
Enabled: true,
Active: true,
}
// Override with cluster config if available
if hasClusterConfig {
modelStorage.Enabled = clusterConfig.Enabled == 1
modelStorage.Active = clusterConfig.Active == 1
}
// Determine status based on active/enabled flags
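// Prefer the node-reported active flag; otherwise fall back to the cluster config:
// enabled but inactive storage shows as inactive, disabled storage as disabled.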
if storage.Active == 1 || modelStorage.Active {
modelStorage.Status = "available"
} else if modelStorage.Enabled {
modelStorage.Status = "inactive"
} else {
modelStorage.Status = "disabled"
}
nodeStorageList = append(nodeStorageList, modelStorage)
}
nodeDuration := time.Since(nodeStart)
log.Debug().
Str("node", n.Node).
Int("storage", len(nodeStorageList)).
Dur("duration", nodeDuration).
Msg("Node storage polling completed")
resultChan <- nodeResult{node: n.Node, storage: nodeStorageList}
}(node)
}
// Close channel when all goroutines complete
go func() {
wg.Wait()
close(resultChan)
}()
// Collect results from all nodes
var allStorage []models.Storage
sharedStorageMap := make(map[string]models.Storage) // Map to keep best shared storage entry
successfulNodes := 0
failedNodes := 0
for result := range resultChan {
if result.err != nil {
failedNodes++
} else {
successfulNodes++
for _, storage := range result.storage {
if storage.Shared {
// For shared storage, use just the storage name as key
// This ensures consistent deduplication regardless of which node reports first
key := storage.Name
// Keep the entry with the most complete data (highest usage)
// or the first one if all are equal
if existing, exists := sharedStorageMap[key]; !exists || storage.Used > existing.Used {
// Update the Node field to indicate it's shared across the cluster
storage.Node = "cluster"
sharedStorageMap[key] = storage
}
} else {
// Non-shared storage goes directly to results
allStorage = append(allStorage, storage)
}
}
}
}
// Add deduplicated shared storage to results
for _, storage := range sharedStorageMap {
allStorage = append(allStorage, storage)
}
// Check alerts for all storage devices
for _, storage := range allStorage {
m.alertManager.CheckStorage(storage)
}
// Update state with all storage
m.state.UpdateStorageForInstance(instanceName, allStorage)
duration := time.Since(startTime)
// Warn if all nodes failed to get storage
if successfulNodes == 0 && failedNodes > 0 {
log.Error().
Str("instance", instanceName).
Int("failedNodes", failedNodes).
Msg("All nodes failed to retrieve storage - check Proxmox API permissions for Datastore.Audit on all storage")
} else {
log.Info().
Str("instance", instanceName).
Int("totalStorage", len(allStorage)).
Int("successfulNodes", successfulNodes).
Int("failedNodes", failedNodes).
Dur("duration", duration).
Msg("Parallel storage polling completed")
}
}