Pulse/internal/monitoring/monitor.go
Pulse Monitor 8492b0932d fix: dashboard now uses actual configured host URLs for node links
Addresses #306. The dashboard and storage views hardcoded port 8006 for node
links; they now use the host URLs from the node configuration, so clicking a
node name redirects to the correct URL, respecting any custom ports and
protocols configured in the settings.

- Added host field to Node struct in Go models
- Updated monitor.go to populate host field from instance config
- Added host field to TypeScript Node interface
- Modified Dashboard and Storage components to use nodeHostMap for correct URLs
- Falls back to old behavior if host field is not available
2025-08-12 14:28:19 +00:00


package monitoring
import (
"context"
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/rcourtman/pulse-go-rewrite/internal/alerts"
"github.com/rcourtman/pulse-go-rewrite/internal/config"
"github.com/rcourtman/pulse-go-rewrite/internal/discovery"
"github.com/rcourtman/pulse-go-rewrite/internal/errors"
"github.com/rcourtman/pulse-go-rewrite/internal/models"
"github.com/rcourtman/pulse-go-rewrite/internal/notifications"
"github.com/rcourtman/pulse-go-rewrite/internal/websocket"
"github.com/rcourtman/pulse-go-rewrite/pkg/pbs"
"github.com/rcourtman/pulse-go-rewrite/pkg/proxmox"
"github.com/rs/zerolog/log"
)
// PVEClientInterface defines the interface for PVE clients (both regular and cluster)
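// Both the regular proxmox.Client and the cluster-aware proxmox.ClusterClient
// created in New are expected to satisfy this interface.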
type PVEClientInterface interface {
GetNodes(ctx context.Context) ([]proxmox.Node, error)
GetNodeStatus(ctx context.Context, node string) (*proxmox.NodeStatus, error)
GetVMs(ctx context.Context, node string) ([]proxmox.VM, error)
GetContainers(ctx context.Context, node string) ([]proxmox.Container, error)
GetStorage(ctx context.Context, node string) ([]proxmox.Storage, error)
GetAllStorage(ctx context.Context) ([]proxmox.Storage, error)
GetBackupTasks(ctx context.Context) ([]proxmox.Task, error)
GetStorageContent(ctx context.Context, node, storage string) ([]proxmox.StorageContent, error)
GetVMSnapshots(ctx context.Context, node string, vmid int) ([]proxmox.Snapshot, error)
GetContainerSnapshots(ctx context.Context, node string, vmid int) ([]proxmox.Snapshot, error)
GetVMStatus(ctx context.Context, node string, vmid int) (*proxmox.VMStatus, error)
}
// Monitor handles all monitoring operations
type Monitor struct {
config *config.Config
state *models.State
pveClients map[string]PVEClientInterface
pbsClients map[string]*pbs.Client
mu sync.RWMutex
startTime time.Time
rateTracker *RateTracker
metricsHistory *MetricsHistory
alertManager *alerts.Manager
notificationMgr *notifications.NotificationManager
configPersist *config.ConfigPersistence
discoveryService *discovery.Service // Background discovery service
activePollCount int32 // Number of active polling operations
pollCounter int64 // Counter for polling cycles
authFailures map[string]int // Track consecutive auth failures per node
lastAuthAttempt map[string]time.Time // Track last auth attempt time
}
// safePercentage calculates percentage safely, returning 0 if divisor is 0
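//
// For example, safePercentage(50, 200) returns 25, while safePercentage(1, 0)
// returns 0 instead of NaN or +Inf.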
func safePercentage(used, total float64) float64 {
if total == 0 {
return 0
}
result := used / total * 100
if math.IsNaN(result) || math.IsInf(result, 0) {
return 0
}
return result
}
// safeFloat ensures a float value is not NaN or Inf
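//
// For example, safeFloat(math.NaN()) and safeFloat(math.Inf(1)) both return 0.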
func safeFloat(val float64) float64 {
if math.IsNaN(val) || math.IsInf(val, 0) {
return 0
}
return val
}
// sortContent sorts comma-separated content values for consistent display
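//
// For example, sortContent("images,iso,backup") returns "backup,images,iso".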
func sortContent(content string) string {
if content == "" {
return ""
}
parts := strings.Split(content, ",")
sort.Strings(parts)
return strings.Join(parts, ",")
}
// GetConnectionStatuses returns the current connection status for all nodes
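//
// Keys are prefixed with the client type, e.g. "pve-homelab" or
// "pbs-backup01" (hypothetical instance names); a value is true when a
// client exists for that instance.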
func (m *Monitor) GetConnectionStatuses() map[string]bool {
m.mu.RLock()
defer m.mu.RUnlock()
statuses := make(map[string]bool)
// Check PVE clients
for name, client := range m.pveClients {
// Simple check - if we have a client, consider it connected
// In reality, you'd want to check if recent API calls succeeded
statuses["pve-"+name] = client != nil
}
// Check PBS clients
for name, client := range m.pbsClients {
statuses["pbs-"+name] = client != nil
}
return statuses
}
// New creates a new Monitor instance
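//
// A minimal usage sketch (cfg is assumed to have been loaded elsewhere via
// the config package):
//
//	m, err := monitoring.New(cfg)
//	if err != nil {
//		log.Fatal().Err(err).Msg("failed to create monitor")
//	}
//	defer m.Stop()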
func New(cfg *config.Config) (*Monitor, error) {
m := &Monitor{
config: cfg,
state: models.NewState(),
pveClients: make(map[string]PVEClientInterface),
pbsClients: make(map[string]*pbs.Client),
startTime: time.Now(),
rateTracker: NewRateTracker(),
metricsHistory: NewMetricsHistory(1000, 24*time.Hour), // Keep up to 1000 points or 24 hours
alertManager: alerts.NewManager(),
notificationMgr: notifications.NewNotificationManager(),
configPersist: config.NewConfigPersistence(cfg.DataPath),
discoveryService: nil, // Will be initialized in Start()
authFailures: make(map[string]int),
lastAuthAttempt: make(map[string]time.Time),
}
// Load saved configurations
if alertConfig, err := m.configPersist.LoadAlertConfig(); err == nil {
m.alertManager.UpdateConfig(*alertConfig)
// Apply schedule settings to notification manager
if alertConfig.Schedule.Cooldown > 0 {
m.notificationMgr.SetCooldown(alertConfig.Schedule.Cooldown)
}
if alertConfig.Schedule.GroupingWindow > 0 {
m.notificationMgr.SetGroupingWindow(alertConfig.Schedule.GroupingWindow)
} else if alertConfig.Schedule.Grouping.Window > 0 {
m.notificationMgr.SetGroupingWindow(alertConfig.Schedule.Grouping.Window)
}
m.notificationMgr.SetGroupingOptions(
alertConfig.Schedule.Grouping.ByNode,
alertConfig.Schedule.Grouping.ByGuest,
)
} else {
log.Warn().Err(err).Msg("Failed to load alert configuration")
}
if emailConfig, err := m.configPersist.LoadEmailConfig(); err == nil {
m.notificationMgr.SetEmailConfig(*emailConfig)
} else {
log.Warn().Err(err).Msg("Failed to load email configuration")
}
if webhooks, err := m.configPersist.LoadWebhooks(); err == nil {
for _, webhook := range webhooks {
m.notificationMgr.AddWebhook(webhook)
}
} else {
log.Warn().Err(err).Msg("Failed to load webhook configuration")
}
// Initialize PVE clients
log.Info().Int("count", len(cfg.PVEInstances)).Msg("Initializing PVE clients")
for _, pve := range cfg.PVEInstances {
log.Info().
Str("name", pve.Name).
Str("host", pve.Host).
Str("user", pve.User).
Bool("hasToken", pve.TokenName != "").
Msg("Configuring PVE instance")
// Check if this is a cluster
if pve.IsCluster && len(pve.ClusterEndpoints) > 0 {
// Create cluster client
endpoints := make([]string, 0, len(pve.ClusterEndpoints))
for _, ep := range pve.ClusterEndpoints {
// Use IP if available, otherwise use host
host := ep.IP
if host == "" {
host = ep.Host
}
// Skip if no host information
if host == "" {
log.Warn().
Str("node", ep.NodeName).
Msg("Skipping cluster endpoint with no host/IP")
continue
}
// Ensure we have a full URL; default to HTTPS on the standard port
// (TLS verification is governed by the client config, not the URL)
if !strings.HasPrefix(host, "http") {
host = fmt.Sprintf("https://%s:8006", host)
}
endpoints = append(endpoints, host)
}
// If no valid endpoints, fall back to single node mode
if len(endpoints) == 0 {
log.Warn().
Str("instance", pve.Name).
Msg("No valid cluster endpoints found, falling back to single node mode")
endpoints = []string{pve.Host}
if !strings.HasPrefix(endpoints[0], "http") {
endpoints[0] = fmt.Sprintf("https://%s:8006", endpoints[0])
}
}
log.Info().
Str("cluster", pve.ClusterName).
Strs("endpoints", endpoints).
Msg("Creating cluster-aware client")
clientConfig := config.CreateProxmoxConfig(&pve)
clientConfig.Timeout = cfg.ConnectionTimeout
clusterClient := proxmox.NewClusterClient(
pve.Name,
clientConfig,
endpoints,
)
m.pveClients[pve.Name] = clusterClient
log.Info().
Str("instance", pve.Name).
Str("cluster", pve.ClusterName).
Int("endpoints", len(endpoints)).
Msg("Cluster client created successfully")
} else {
// Create regular client
clientConfig := config.CreateProxmoxConfig(&pve)
clientConfig.Timeout = cfg.ConnectionTimeout
client, err := proxmox.NewClient(clientConfig)
if err != nil {
monErr := errors.WrapConnectionError("create_pve_client", pve.Name, err)
log.Error().Err(monErr).Str("instance", pve.Name).Msg("Failed to create PVE client")
continue
}
m.pveClients[pve.Name] = client
log.Info().Str("instance", pve.Name).Msg("PVE client created successfully")
}
}
// Initialize PBS clients
log.Info().Int("count", len(cfg.PBSInstances)).Msg("Initializing PBS clients")
for _, pbsInst := range cfg.PBSInstances {
log.Info().
Str("name", pbsInst.Name).
Str("host", pbsInst.Host).
Str("user", pbsInst.User).
Bool("hasToken", pbsInst.TokenName != "").
Msg("Configuring PBS instance")
clientConfig := config.CreatePBSConfig(&pbsInst)
clientConfig.Timeout = 60 * time.Second // Very generous timeout for slow PBS servers
client, err := pbs.NewClient(clientConfig)
if err != nil {
monErr := errors.WrapConnectionError("create_pbs_client", pbsInst.Name, err)
log.Error().Err(monErr).Str("instance", pbsInst.Name).Msg("Failed to create PBS client")
continue
}
m.pbsClients[pbsInst.Name] = client
log.Info().Str("instance", pbsInst.Name).Msg("PBS client created successfully")
}
// Initialize state stats
m.state.Stats = models.Stats{
StartTime: m.startTime,
Version: "2.0.0-go",
}
return m, nil
}
// Start begins the monitoring loop
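//
// Start blocks until ctx is cancelled, so callers typically run it in its own
// goroutine (a sketch; wsHub construction is assumed to happen elsewhere):
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	go m.Start(ctx, wsHub)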
func (m *Monitor) Start(ctx context.Context, wsHub *websocket.Hub) {
log.Info().
Dur("pollingInterval", m.config.PollingInterval).
Msg("Starting monitoring loop")
// Initialize and start discovery service
discoverySubnet := m.config.DiscoverySubnet
if discoverySubnet == "" {
discoverySubnet = "auto"
}
m.discoveryService = discovery.NewService(wsHub, 5*time.Minute, discoverySubnet)
if m.discoveryService != nil {
m.discoveryService.Start(ctx)
log.Info().Msg("Discovery service initialized and started")
} else {
log.Error().Msg("Failed to initialize discovery service")
}
// Set up alert callbacks
m.alertManager.SetAlertCallback(func(alert *alerts.Alert) {
wsHub.BroadcastAlert(alert)
// Send notifications
go m.notificationMgr.SendAlert(alert)
})
m.alertManager.SetResolvedCallback(func(alertID string) {
wsHub.BroadcastAlertResolved(alertID)
// Broadcast updated state immediately so frontend gets the new activeAlerts list
state := m.GetState()
wsHub.BroadcastState(state)
})
m.alertManager.SetEscalateCallback(func(alert *alerts.Alert, level int) {
log.Info().
Str("alertID", alert.ID).
Int("level", level).
Msg("Alert escalated - sending notifications")
// Get escalation config
alertCfg := m.alertManager.GetConfig()
if level <= 0 || level > len(alertCfg.Schedule.Escalation.Levels) {
return
}
escalationLevel := alertCfg.Schedule.Escalation.Levels[level-1]
// Send notifications based on escalation level
switch escalationLevel.Notify {
case "email":
// Only send email
if emailConfig := m.notificationMgr.GetEmailConfig(); emailConfig.Enabled {
m.notificationMgr.SendAlert(alert)
}
case "webhook":
// Only send webhooks
for _, webhook := range m.notificationMgr.GetWebhooks() {
if webhook.Enabled {
m.notificationMgr.SendAlert(alert)
break
}
}
case "all":
// Send all notifications
m.notificationMgr.SendAlert(alert)
}
// Update WebSocket with escalation
wsHub.BroadcastAlert(alert)
})
// Create separate tickers for polling and broadcasting
pollTicker := time.NewTicker(m.config.PollingInterval)
defer pollTicker.Stop()
broadcastTicker := time.NewTicker(m.config.PollingInterval)
defer broadcastTicker.Stop()
// Do an immediate poll on start
go m.poll(ctx, wsHub)
for {
select {
case <-pollTicker.C:
// Start polling in a goroutine so it doesn't block the ticker
go m.poll(ctx, wsHub)
case <-broadcastTicker.C:
// Broadcast current state regardless of polling status
state := m.state.GetSnapshot()
log.Info().
Int("nodes", len(state.Nodes)).
Int("vms", len(state.VMs)).
Int("containers", len(state.Containers)).
Int("pbs", len(state.PBSInstances)).
Int("pbsBackups", len(state.PBSBackups)).
Msg("Broadcasting state update (ticker)")
wsHub.BroadcastState(state)
case <-ctx.Done():
log.Info().Msg("Monitoring loop stopped")
return
}
}
}
// poll fetches data from all configured instances
func (m *Monitor) poll(ctx context.Context, wsHub *websocket.Hub) {
// Limit concurrent polls to 2 to prevent resource exhaustion
currentCount := atomic.AddInt32(&m.activePollCount, 1)
if currentCount > 2 {
atomic.AddInt32(&m.activePollCount, -1)
log.Debug().Int32("activePolls", currentCount-1).Msg("Too many concurrent polls, skipping")
return
}
defer atomic.AddInt32(&m.activePollCount, -1)
log.Debug().Msg("Starting polling cycle")
startTime := time.Now()
if m.config.ConcurrentPolling {
// Use concurrent polling
m.pollConcurrent(ctx)
} else {
m.pollSequential(ctx)
}
// Update performance metrics
m.state.Performance.LastPollDuration = time.Since(startTime).Seconds()
m.state.Stats.PollingCycles++
m.state.Stats.Uptime = int64(time.Since(m.startTime).Seconds())
m.state.Stats.WebSocketClients = wsHub.GetClientCount()
// Sync active alerts to state
activeAlerts := m.alertManager.GetActiveAlerts()
modelAlerts := make([]models.Alert, 0, len(activeAlerts))
for _, alert := range activeAlerts {
modelAlerts = append(modelAlerts, models.Alert{
ID: alert.ID,
Type: alert.Type,
Level: string(alert.Level),
ResourceID: alert.ResourceID,
ResourceName: alert.ResourceName,
Node: alert.Node,
Instance: alert.Instance,
Message: alert.Message,
Value: alert.Value,
Threshold: alert.Threshold,
StartTime: alert.StartTime,
Acknowledged: alert.Acknowledged,
})
}
m.state.UpdateActiveAlerts(modelAlerts)
// Sync recently resolved alerts
recentlyResolved := m.alertManager.GetRecentlyResolved()
if len(recentlyResolved) > 0 {
log.Info().Int("count", len(recentlyResolved)).Msg("Syncing recently resolved alerts")
}
m.state.UpdateRecentlyResolved(recentlyResolved)
// Increment poll counter
m.mu.Lock()
m.pollCounter++
m.mu.Unlock()
log.Debug().Dur("duration", time.Since(startTime)).Msg("Polling cycle completed")
// Broadcasting is now handled by the timer in Start()
}
// pollConcurrent polls all instances concurrently
func (m *Monitor) pollConcurrent(ctx context.Context) {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Poll PVE instances
for name, client := range m.pveClients {
// Check if context is already cancelled before starting
select {
case <-ctx.Done():
return
default:
}
wg.Add(1)
go func(instanceName string, c PVEClientInterface) {
defer wg.Done()
// Pass context to ensure cancellation propagates
m.pollPVEInstance(ctx, instanceName, c)
}(name, client)
}
// Poll PBS instances
for name, client := range m.pbsClients {
// Check if context is already cancelled before starting
select {
case <-ctx.Done():
return
default:
}
wg.Add(1)
go func(instanceName string, c *pbs.Client) {
defer wg.Done()
// Pass context to ensure cancellation propagates
m.pollPBSInstance(ctx, instanceName, c)
}(name, client)
}
// Wait for all goroutines to complete or context cancellation
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
// All goroutines completed normally
case <-ctx.Done():
// Context cancelled, cancel all operations
cancel()
// Still wait for goroutines to finish gracefully
wg.Wait()
}
}
// pollSequential polls all instances sequentially
func (m *Monitor) pollSequential(ctx context.Context) {
// Poll PVE instances
for name, client := range m.pveClients {
// Check context before each instance
select {
case <-ctx.Done():
return
default:
}
m.pollPVEInstance(ctx, name, client)
}
// Poll PBS instances
for name, client := range m.pbsClients {
// Check context before each instance
select {
case <-ctx.Done():
return
default:
}
m.pollPBSInstance(ctx, name, client)
}
}
// pollPVEInstance polls a single PVE instance
func (m *Monitor) pollPVEInstance(ctx context.Context, instanceName string, client PVEClientInterface) {
// Check if context is cancelled
select {
case <-ctx.Done():
log.Debug().Str("instance", instanceName).Msg("Polling cancelled")
return
default:
}
log.Debug().Str("instance", instanceName).Msg("Polling PVE instance")
// Get instance config
var instanceCfg *config.PVEInstance
for _, cfg := range m.config.PVEInstances {
if cfg.Name == instanceName {
instanceCfg = &cfg
break
}
}
if instanceCfg == nil {
return
}
// Poll nodes
nodes, err := client.GetNodes(ctx)
if err != nil {
monErr := errors.WrapConnectionError("poll_nodes", instanceName, err)
log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get nodes")
m.state.SetConnectionHealth(instanceName, false)
// Track auth failure if it's an authentication error
if errors.IsAuthError(err) {
m.recordAuthFailure(instanceName, "pve")
}
return
}
// Reset auth failures on successful connection
m.resetAuthFailures(instanceName, "pve")
m.state.SetConnectionHealth(instanceName, true)
// Convert to models
var modelNodes []models.Node
for _, node := range nodes {
modelNode := models.Node{
ID: instanceName + "-" + node.Node,
Name: node.Node,
Instance: instanceName,
Host: instanceCfg.Host, // Add the actual host URL
Status: node.Status,
Type: "node",
CPU: safeFloat(node.CPU), // Fraction 0-1 as reported by Proxmox; scaled to percent where recorded
Memory: models.Memory{
Total: int64(node.MaxMem),
Used: int64(node.Mem),
Free: int64(node.MaxMem - node.Mem),
Usage: safePercentage(float64(node.Mem), float64(node.MaxMem)),
},
Disk: models.Disk{
Total: int64(node.MaxDisk),
Used: int64(node.Disk),
Free: int64(node.MaxDisk - node.Disk),
Usage: safePercentage(float64(node.Disk), float64(node.MaxDisk)),
},
Uptime: int64(node.Uptime),
LoadAverage: []float64{},
LastSeen: time.Now(),
ConnectionHealth: "healthy",
}
// Debug logging for disk metrics - note that these values can fluctuate
// due to thin provisioning and dynamic allocation
log.Debug().
Str("node", node.Node).
Uint64("disk", node.Disk).
Uint64("maxDisk", node.MaxDisk).
Float64("diskUsage", safePercentage(float64(node.Disk), float64(node.MaxDisk))).
Msg("Node disk metrics (raw from Proxmox)")
// Get detailed node info if available
if nodeInfo, err := client.GetNodeStatus(ctx, node.Node); err == nil {
// Convert LoadAvg from interface{} to float64
loadAvg := make([]float64, 0, len(nodeInfo.LoadAvg))
for _, val := range nodeInfo.LoadAvg {
switch v := val.(type) {
case float64:
loadAvg = append(loadAvg, v)
case string:
if f, err := strconv.ParseFloat(v, 64); err == nil {
loadAvg = append(loadAvg, f)
}
}
}
modelNode.LoadAverage = loadAvg
modelNode.KernelVersion = nodeInfo.KernelVersion
modelNode.PVEVersion = nodeInfo.PVEVersion
// Use rootfs data if available for more stable disk metrics
if nodeInfo.RootFS != nil && nodeInfo.RootFS.Total > 0 {
modelNode.Disk = models.Disk{
Total: int64(nodeInfo.RootFS.Total),
Used: int64(nodeInfo.RootFS.Used),
Free: int64(nodeInfo.RootFS.Free),
Usage: safePercentage(float64(nodeInfo.RootFS.Used), float64(nodeInfo.RootFS.Total)),
}
log.Debug().
Str("node", node.Node).
Uint64("rootfsUsed", nodeInfo.RootFS.Used).
Uint64("rootfsTotal", nodeInfo.RootFS.Total).
Float64("rootfsUsage", modelNode.Disk.Usage).
Msg("Using rootfs for disk metrics")
}
if nodeInfo.CPUInfo != nil {
// Use MaxCPU from node data for logical CPU count (includes hyperthreading)
// If MaxCPU is not available or 0, fall back to physical cores
logicalCores := node.MaxCPU
if logicalCores == 0 {
logicalCores = nodeInfo.CPUInfo.Cores
}
mhzStr := nodeInfo.CPUInfo.GetMHzString()
log.Debug().
Str("node", node.Node).
Str("model", nodeInfo.CPUInfo.Model).
Int("cores", nodeInfo.CPUInfo.Cores).
Int("logicalCores", logicalCores).
Int("sockets", nodeInfo.CPUInfo.Sockets).
Str("mhz", mhzStr).
Msg("Node CPU info from Proxmox")
modelNode.CPUInfo = models.CPUInfo{
Model: nodeInfo.CPUInfo.Model,
Cores: logicalCores, // Use logical cores for display
Sockets: nodeInfo.CPUInfo.Sockets,
MHz: mhzStr,
}
}
} else {
log.Debug().Err(err).Str("node", node.Node).Msg("Failed to get node status")
}
modelNodes = append(modelNodes, modelNode)
}
// Update state first so we have nodes available
m.state.UpdateNodesForInstance(instanceName, modelNodes)
// Now get storage data to use as fallback for disk metrics if needed
storageByNode := make(map[string]models.Disk)
if instanceCfg.MonitorStorage {
_, err := client.GetAllStorage(ctx)
if err == nil {
for _, node := range nodes {
nodeStorages, err := client.GetStorage(ctx, node.Node)
if err == nil {
// Look for local or local-lvm storage as most stable disk metric
for _, storage := range nodeStorages {
if storage.Storage == "local" || storage.Storage == "local-lvm" {
disk := models.Disk{
Total: int64(storage.Total),
Used: int64(storage.Used),
Free: int64(storage.Available),
Usage: safePercentage(float64(storage.Used), float64(storage.Total)),
}
// Prefer "local" over "local-lvm"
if _, exists := storageByNode[node.Node]; !exists || storage.Storage == "local" {
storageByNode[node.Node] = disk
log.Debug().
Str("node", node.Node).
Str("storage", storage.Storage).
Float64("usage", disk.Usage).
Msg("Using storage for disk metrics fallback")
}
}
}
}
}
}
}
// Update nodes with storage fallback if rootfs was not available
for i := range modelNodes {
if modelNodes[i].Disk.Total == 0 {
if disk, exists := storageByNode[modelNodes[i].Name]; exists {
modelNodes[i].Disk = disk
log.Debug().
Str("node", modelNodes[i].Name).
Float64("usage", disk.Usage).
Msg("Applied storage fallback for disk metrics")
}
}
// Record node metrics history
now := time.Now()
m.metricsHistory.AddNodeMetric(modelNodes[i].ID, "cpu", modelNodes[i].CPU*100, now)
m.metricsHistory.AddNodeMetric(modelNodes[i].ID, "memory", modelNodes[i].Memory.Usage, now)
m.metricsHistory.AddNodeMetric(modelNodes[i].ID, "disk", modelNodes[i].Disk.Usage, now)
// Check thresholds for alerts
m.alertManager.CheckNode(modelNodes[i])
}
// Update state again with corrected disk metrics
m.state.UpdateNodesForInstance(instanceName, modelNodes)
// Poll VMs if enabled
if instanceCfg.MonitorVMs {
select {
case <-ctx.Done():
return
default:
m.pollVMs(ctx, instanceName, client)
}
}
// Poll containers if enabled
if instanceCfg.MonitorContainers {
select {
case <-ctx.Done():
return
default:
m.pollContainers(ctx, instanceName, client)
}
}
// Poll storage if enabled
if instanceCfg.MonitorStorage {
select {
case <-ctx.Done():
return
default:
m.pollStorage(ctx, instanceName, client)
}
}
// Poll backups if enabled - using configurable cycle count
// This prevents slow backup/snapshot queries from blocking real-time stats
// Also poll on first cycle (pollCounter == 1) to ensure data loads quickly
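// For example, with a 30-second polling interval and the default of 10
// cycles, backup data refreshes roughly every 5 minutes.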
backupCycles := 10 // default
if m.config.BackupPollingCycles > 0 {
backupCycles = m.config.BackupPollingCycles
}
if instanceCfg.MonitorBackups && (m.pollCounter%int64(backupCycles) == 0 || m.pollCounter == 1) {
select {
case <-ctx.Done():
return
default:
// Run backup polling in a separate goroutine to not block main polling
go func() {
log.Info().Str("instance", instanceName).Msg("Starting background backup/snapshot polling")
// Create a separate context with longer timeout for backup operations
backupCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
// Poll backup tasks
m.pollBackupTasks(backupCtx, instanceName, client)
// Poll storage backups
m.pollStorageBackups(backupCtx, instanceName, client)
// Poll guest snapshots
m.pollGuestSnapshots(backupCtx, instanceName, client)
log.Info().Str("instance", instanceName).Msg("Completed background backup/snapshot polling")
}()
}
}
}
// pollVMs polls VMs from a PVE instance
func (m *Monitor) pollVMs(ctx context.Context, instanceName string, client PVEClientInterface) {
log.Debug().Str("instance", instanceName).Msg("Polling VMs")
// Get all nodes first
nodes, err := client.GetNodes(ctx)
if err != nil {
monErr := errors.WrapConnectionError("get_nodes_for_vms", instanceName, err)
log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get nodes for VM polling")
return
}
var allVMs []models.VM
for _, node := range nodes {
vms, err := client.GetVMs(ctx, node.Node)
if err != nil {
monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_vms", instanceName, err).WithNode(node.Node)
log.Error().Err(monErr).Str("node", node.Node).Msg("Failed to get VMs")
continue
}
for _, vm := range vms {
// Skip templates if configured
if vm.Template == 1 {
continue
}
// Parse tags
var tags []string
if vm.Tags != "" {
tags = strings.Split(vm.Tags, ";")
}
// Calculate I/O rates
guestID := fmt.Sprintf("%s-%s-%d", instanceName, node.Node, vm.VMID)
currentMetrics := IOMetrics{
DiskRead: int64(vm.DiskRead),
DiskWrite: int64(vm.DiskWrite),
NetworkIn: int64(vm.NetIn),
NetworkOut: int64(vm.NetOut),
Timestamp: time.Now(),
}
diskReadRate, diskWriteRate, netInRate, netOutRate := m.rateTracker.CalculateRates(guestID, currentMetrics)
// For running VMs, try to get detailed status with balloon info
memUsed := uint64(0)
memTotal := vm.MaxMem
if vm.Status == "running" {
// Try to get detailed VM status for more accurate memory reporting
if vmStatus, err := client.GetVMStatus(ctx, node.Node, vm.VMID); err == nil {
// If balloon is enabled, use balloon as the total available memory
if vmStatus.Balloon > 0 && vmStatus.Balloon < vmStatus.MaxMem {
memTotal = vmStatus.Balloon
}
// If we have free memory from guest agent, calculate actual usage
if vmStatus.FreeMem > 0 {
// Guest agent reports free memory, so calculate used
memUsed = memTotal - vmStatus.FreeMem
} else if vmStatus.Mem > 0 {
// No guest agent free memory data, but we have actual memory usage
// Use the reported memory usage from Proxmox
memUsed = vmStatus.Mem
} else {
// No memory data available at all - show 0% usage
memUsed = 0
}
} else {
// Failed to get detailed status - show 0% usage
memUsed = 0
}
} else {
// VM is not running, show 0 usage
memUsed = 0
}
// Set CPU to 0 for non-running VMs to avoid false alerts
// VMs can have status: running, stopped, paused, suspended
cpuUsage := safeFloat(vm.CPU)
if vm.Status != "running" {
cpuUsage = 0
}
modelVM := models.VM{
ID: guestID,
VMID: vm.VMID,
Name: vm.Name,
Node: node.Node,
Instance: instanceName,
Status: vm.Status,
Type: "qemu",
CPU: cpuUsage, // Fraction 0-1 as reported by Proxmox
CPUs: vm.CPUs,
Memory: models.Memory{
Total: int64(memTotal),
Used: int64(memUsed),
Free: int64(memTotal - memUsed),
Usage: safePercentage(float64(memUsed), float64(memTotal)),
},
Disk: models.Disk{
Total: int64(vm.MaxDisk),
Used: int64(vm.Disk),
Free: int64(vm.MaxDisk - vm.Disk),
Usage: safePercentage(float64(vm.Disk), float64(vm.MaxDisk)),
},
NetworkIn: int64(netInRate),
NetworkOut: int64(netOutRate),
DiskRead: int64(diskReadRate),
DiskWrite: int64(diskWriteRate),
Uptime: int64(vm.Uptime),
Template: vm.Template == 1,
Tags: tags,
Lock: vm.Lock,
LastSeen: time.Now(),
}
allVMs = append(allVMs, modelVM)
// Record metrics history
now := time.Now()
m.metricsHistory.AddGuestMetric(modelVM.ID, "cpu", modelVM.CPU*100, now)
m.metricsHistory.AddGuestMetric(modelVM.ID, "memory", modelVM.Memory.Usage, now)
m.metricsHistory.AddGuestMetric(modelVM.ID, "disk", modelVM.Disk.Usage, now)
m.metricsHistory.AddGuestMetric(modelVM.ID, "diskread", float64(modelVM.DiskRead), now)
m.metricsHistory.AddGuestMetric(modelVM.ID, "diskwrite", float64(modelVM.DiskWrite), now)
m.metricsHistory.AddGuestMetric(modelVM.ID, "netin", float64(modelVM.NetworkIn), now)
m.metricsHistory.AddGuestMetric(modelVM.ID, "netout", float64(modelVM.NetworkOut), now)
// Check thresholds for alerts
m.alertManager.CheckGuest(modelVM, instanceName)
}
}
m.state.UpdateVMsForInstance(instanceName, allVMs)
}
// pollContainers polls containers from a PVE instance
func (m *Monitor) pollContainers(ctx context.Context, instanceName string, client PVEClientInterface) {
log.Debug().Str("instance", instanceName).Msg("Polling containers")
// Get all nodes first
nodes, err := client.GetNodes(ctx)
if err != nil {
monErr := errors.WrapConnectionError("get_nodes_for_containers", instanceName, err)
log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get nodes for container polling")
return
}
var allContainers []models.Container
for _, node := range nodes {
containers, err := client.GetContainers(ctx, node.Node)
if err != nil {
monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_containers", instanceName, err).WithNode(node.Node)
log.Error().Err(monErr).Str("node", node.Node).Msg("Failed to get containers")
continue
}
for _, ct := range containers {
// Skip templates if configured
if ct.Template == 1 {
continue
}
// Parse tags
var tags []string
if ct.Tags != "" {
tags = strings.Split(ct.Tags, ";")
}
// Calculate I/O rates
guestID := fmt.Sprintf("%s-%s-%d", instanceName, node.Node, ct.VMID)
currentMetrics := IOMetrics{
DiskRead: int64(ct.DiskRead),
DiskWrite: int64(ct.DiskWrite),
NetworkIn: int64(ct.NetIn),
NetworkOut: int64(ct.NetOut),
Timestamp: time.Now(),
}
diskReadRate, diskWriteRate, netInRate, netOutRate := m.rateTracker.CalculateRates(guestID, currentMetrics)
// Set CPU to 0 for non-running containers to avoid false alerts
// Containers can have status: running, stopped, paused, suspended
cpuUsage := safeFloat(ct.CPU)
if ct.Status != "running" {
cpuUsage = 0
}
// For containers, memory reporting is more accurate than VMs
// ct.Mem shows actual usage for running containers
memUsed := uint64(0)
memTotal := ct.MaxMem
if ct.Status == "running" {
// For running containers, ct.Mem is actual usage
memUsed = ct.Mem
}
modelCT := models.Container{
ID: guestID,
VMID: ct.VMID,
Name: ct.Name,
Node: node.Node,
Instance: instanceName,
Status: ct.Status,
Type: "lxc",
CPU: cpuUsage, // Fraction 0-1 as reported by Proxmox
CPUs: int(ct.CPUs),
Memory: models.Memory{
Total: int64(memTotal),
Used: int64(memUsed),
Free: int64(memTotal - memUsed),
Usage: safePercentage(float64(memUsed), float64(memTotal)),
},
Disk: models.Disk{
Total: int64(ct.MaxDisk),
Used: int64(ct.Disk),
Free: int64(ct.MaxDisk - ct.Disk),
Usage: safePercentage(float64(ct.Disk), float64(ct.MaxDisk)),
},
NetworkIn: int64(netInRate),
NetworkOut: int64(netOutRate),
DiskRead: int64(diskReadRate),
DiskWrite: int64(diskWriteRate),
Uptime: int64(ct.Uptime),
Template: ct.Template == 1,
Tags: tags,
Lock: ct.Lock,
LastSeen: time.Now(),
}
allContainers = append(allContainers, modelCT)
// Record metrics history
now := time.Now()
m.metricsHistory.AddGuestMetric(modelCT.ID, "cpu", modelCT.CPU*100, now)
m.metricsHistory.AddGuestMetric(modelCT.ID, "memory", modelCT.Memory.Usage, now)
m.metricsHistory.AddGuestMetric(modelCT.ID, "disk", modelCT.Disk.Usage, now)
m.metricsHistory.AddGuestMetric(modelCT.ID, "diskread", float64(modelCT.DiskRead), now)
m.metricsHistory.AddGuestMetric(modelCT.ID, "diskwrite", float64(modelCT.DiskWrite), now)
m.metricsHistory.AddGuestMetric(modelCT.ID, "netin", float64(modelCT.NetworkIn), now)
m.metricsHistory.AddGuestMetric(modelCT.ID, "netout", float64(modelCT.NetworkOut), now)
// Check thresholds for alerts
m.alertManager.CheckGuest(modelCT, instanceName)
}
}
m.state.UpdateContainersForInstance(instanceName, allContainers)
}
// pollStorage polls storage from a PVE instance
func (m *Monitor) pollStorage(ctx context.Context, instanceName string, client PVEClientInterface) {
log.Debug().Str("instance", instanceName).Msg("Polling storage")
// Get all nodes first
nodes, err := client.GetNodes(ctx)
if err != nil {
monErr := errors.WrapConnectionError("get_nodes_for_storage", instanceName, err)
log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get nodes for storage polling")
return
}
// Get cluster storage configuration for shared/enabled status
clusterStorages, err := client.GetAllStorage(ctx)
if err != nil {
monErr := errors.WrapAPIError("get_cluster_storage", instanceName, err, 0)
log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get cluster storage")
}
// Create a map for quick lookup of cluster storage config
clusterStorageMap := make(map[string]proxmox.Storage)
for _, cs := range clusterStorages {
clusterStorageMap[cs.Storage] = cs
}
var allStorage []models.Storage
seenStorage := make(map[string]bool)
// Get storage from each node (this includes capacity info)
for _, node := range nodes {
nodeStorage, err := client.GetStorage(ctx, node.Node)
if err != nil {
monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_node_storage", instanceName, err).WithNode(node.Node)
log.Error().Err(monErr).Str("node", node.Node).Msg("Failed to get node storage")
continue
}
for _, storage := range nodeStorage {
// Get cluster config for this storage
clusterConfig, hasClusterConfig := clusterStorageMap[storage.Storage]
// Determine if shared
shared := hasClusterConfig && clusterConfig.Shared == 1
// For shared storage, only include it once
storageKey := storage.Storage
if shared {
if seenStorage[storageKey] {
continue
}
seenStorage[storageKey] = true
}
// Use appropriate node name
nodeID := node.Node
storageID := fmt.Sprintf("%s-%s-%s", instanceName, nodeID, storage.Storage)
if shared {
nodeID = "shared"
// Use a consistent ID for shared storage across all instances
storageID = fmt.Sprintf("shared-%s", storage.Storage)
}
modelStorage := models.Storage{
ID: storageID,
Name: storage.Storage,
Node: nodeID,
Instance: instanceName,
Type: storage.Type,
Status: "available",
Total: int64(storage.Total),
Used: int64(storage.Used),
Free: int64(storage.Available),
Usage: 0,
Content: sortContent(storage.Content),
Shared: shared,
Enabled: true,
Active: true,
}
// Override with cluster config if available
if hasClusterConfig {
// Sort content values for consistent display
if clusterConfig.Content != "" {
contentParts := strings.Split(clusterConfig.Content, ",")
sort.Strings(contentParts)
modelStorage.Content = strings.Join(contentParts, ",")
} else {
modelStorage.Content = clusterConfig.Content
}
modelStorage.Enabled = clusterConfig.Enabled == 1
modelStorage.Active = clusterConfig.Active == 1
}
// Calculate usage percentage
if modelStorage.Total > 0 {
modelStorage.Usage = safePercentage(float64(modelStorage.Used), float64(modelStorage.Total))
}
// Determine status based on active/enabled flags
if storage.Active == 1 || modelStorage.Active {
modelStorage.Status = "available"
} else if modelStorage.Enabled {
modelStorage.Status = "inactive"
} else {
modelStorage.Status = "disabled"
}
allStorage = append(allStorage, modelStorage)
// Record storage metrics history
now := time.Now()
m.metricsHistory.AddStorageMetric(modelStorage.ID, "usage", modelStorage.Usage, now)
m.metricsHistory.AddStorageMetric(modelStorage.ID, "used", float64(modelStorage.Used), now)
m.metricsHistory.AddStorageMetric(modelStorage.ID, "total", float64(modelStorage.Total), now)
m.metricsHistory.AddStorageMetric(modelStorage.ID, "avail", float64(modelStorage.Free), now)
// Check thresholds for alerts
m.alertManager.CheckStorage(modelStorage)
}
}
// Update storage for this instance only
var instanceStorage []models.Storage
for _, st := range allStorage {
st.Instance = instanceName
instanceStorage = append(instanceStorage, st)
}
m.state.UpdateStorageForInstance(instanceName, instanceStorage)
}
// pollBackupTasks polls backup tasks from a PVE instance
func (m *Monitor) pollBackupTasks(ctx context.Context, instanceName string, client PVEClientInterface) {
log.Debug().Str("instance", instanceName).Msg("Polling backup tasks")
tasks, err := client.GetBackupTasks(ctx)
if err != nil {
monErr := errors.WrapAPIError("get_backup_tasks", instanceName, err, 0)
log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get backup tasks")
return
}
var backupTasks []models.BackupTask
for _, task := range tasks {
// task.ID carries the VMID field extracted from the UPID (format: "UPID:node:pid:starttime:type:vmid:user@realm:")
vmid := 0
if task.ID != "" {
if vmidInt, err := strconv.Atoi(task.ID); err == nil {
vmid = vmidInt
}
}
taskID := fmt.Sprintf("%s-%s", instanceName, task.UPID)
backupTask := models.BackupTask{
ID: taskID,
Node: task.Node,
Type: task.Type,
VMID: vmid,
Status: task.Status,
StartTime: time.Unix(task.StartTime, 0),
}
if task.EndTime > 0 {
backupTask.EndTime = time.Unix(task.EndTime, 0)
}
backupTasks = append(backupTasks, backupTask)
}
// Update state with new backup tasks for this instance
m.state.UpdateBackupTasksForInstance(instanceName, backupTasks)
}
// pollPBSInstance polls a single PBS instance
func (m *Monitor) pollPBSInstance(ctx context.Context, instanceName string, client *pbs.Client) {
// Check if context is cancelled
select {
case <-ctx.Done():
log.Debug().Str("instance", instanceName).Msg("Polling cancelled")
return
default:
}
log.Debug().Str("instance", instanceName).Msg("Polling PBS instance")
// Get instance config
var instanceCfg *config.PBSInstance
for _, cfg := range m.config.PBSInstances {
if cfg.Name == instanceName {
instanceCfg = &cfg
log.Debug().
Str("instance", instanceName).
Bool("monitorDatastores", cfg.MonitorDatastores).
Msg("Found PBS instance config")
break
}
}
if instanceCfg == nil {
log.Error().Str("instance", instanceName).Msg("PBS instance config not found")
return
}
// Initialize PBS instance with default values
pbsInst := models.PBSInstance{
ID: "pbs-" + instanceName,
Name: instanceName,
Host: instanceCfg.Host,
Status: "offline",
Version: "unknown",
ConnectionHealth: "unhealthy",
LastSeen: time.Now(),
}
// Try to get version first
version, versionErr := client.GetVersion(ctx)
if versionErr == nil {
// Version succeeded - PBS is online
pbsInst.Status = "online"
pbsInst.Version = version.Version
pbsInst.ConnectionHealth = "healthy"
m.resetAuthFailures(instanceName, "pbs")
m.state.SetConnectionHealth("pbs-"+instanceName, true)
log.Debug().
Str("instance", instanceName).
Str("version", version.Version).
Bool("monitorDatastores", instanceCfg.MonitorDatastores).
Msg("PBS version retrieved successfully")
} else {
log.Debug().Err(versionErr).Str("instance", instanceName).Msg("Failed to get PBS version, trying fallback")
// Version failed, try datastores as fallback (like test connection does)
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel2()
_, datastoreErr := client.GetDatastores(ctx2)
if datastoreErr == nil {
// Datastores succeeded - PBS is online but version unavailable
pbsInst.Status = "online"
pbsInst.Version = "connected"
pbsInst.ConnectionHealth = "healthy"
m.resetAuthFailures(instanceName, "pbs")
m.state.SetConnectionHealth("pbs-"+instanceName, true)
log.Info().
Str("instance", instanceName).
Msg("PBS connected (version unavailable but datastores accessible)")
} else {
// Both failed - PBS is offline
monErr := errors.WrapConnectionError("get_pbs_version", instanceName, versionErr)
log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to connect to PBS")
m.state.SetConnectionHealth("pbs-"+instanceName, false)
// Track auth failure if it's an authentication error
if errors.IsAuthError(versionErr) || errors.IsAuthError(datastoreErr) {
m.recordAuthFailure(instanceName, "pbs")
// Don't continue if auth failed
return
}
}
}
// Get node status (CPU, memory, etc.)
// Note: This requires Sys.Audit permission on PBS which read-only tokens often don't have
nodeStatus, err := client.GetNodeStatus(ctx)
if err != nil {
// Log as debug instead of error since this is often a permission issue
log.Debug().Err(err).Str("instance", instanceName).Msg("Could not get PBS node status (may need Sys.Audit permission)")
} else {
pbsInst.CPU = nodeStatus.CPU
if nodeStatus.Memory.Total > 0 {
pbsInst.Memory = float64(nodeStatus.Memory.Used) / float64(nodeStatus.Memory.Total) * 100
pbsInst.MemoryUsed = nodeStatus.Memory.Used
pbsInst.MemoryTotal = nodeStatus.Memory.Total
}
pbsInst.Uptime = nodeStatus.Uptime
log.Debug().
Str("instance", instanceName).
Float64("cpu", pbsInst.CPU).
Float64("memory", pbsInst.Memory).
Int64("uptime", pbsInst.Uptime).
Msg("PBS node status retrieved")
}
// Poll datastores if enabled
log.Debug().Bool("monitorDatastores", instanceCfg.MonitorDatastores).Str("instance", instanceName).Msg("Checking if datastore monitoring is enabled")
if instanceCfg.MonitorDatastores {
datastores, err := client.GetDatastores(ctx)
if err != nil {
monErr := errors.WrapAPIError("get_datastores", instanceName, err, 0)
log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get datastores")
} else {
log.Info().
Str("instance", instanceName).
Int("count", len(datastores)).
Msg("Got PBS datastores")
for _, ds := range datastores {
// Use whichever fields are populated
total := ds.Total
if total == 0 && ds.TotalSpace > 0 {
total = ds.TotalSpace
}
used := ds.Used
if used == 0 && ds.UsedSpace > 0 {
used = ds.UsedSpace
}
avail := ds.Avail
if avail == 0 && ds.AvailSpace > 0 {
avail = ds.AvailSpace
}
// If total is still 0, derive it from used + available
if total == 0 && used > 0 && avail > 0 {
total = used + avail
}
log.Debug().
Str("store", ds.Store).
Int64("total", total).
Int64("used", used).
Int64("avail", avail).
Int64("orig_total", ds.Total).
Int64("orig_total_space", ds.TotalSpace).
Msg("PBS datastore details")
modelDS := models.PBSDatastore{
Name: ds.Store,
Total: total,
Used: used,
Free: avail,
Usage: safePercentage(float64(used), float64(total)),
Status: "available",
}
// Discover namespaces for this datastore
namespaces, err := client.ListNamespaces(ctx, ds.Store, "", 0)
if err != nil {
log.Warn().Err(err).
Str("instance", instanceName).
Str("datastore", ds.Store).
Msg("Failed to list namespaces")
} else {
// Convert PBS namespaces to model namespaces
for _, ns := range namespaces {
nsPath := ns.NS
if nsPath == "" {
nsPath = ns.Path
}
if nsPath == "" {
nsPath = ns.Name
}
modelNS := models.PBSNamespace{
Path: nsPath,
Parent: ns.Parent,
Depth: strings.Count(nsPath, "/"),
}
modelDS.Namespaces = append(modelDS.Namespaces, modelNS)
}
// Always include root namespace
hasRoot := false
for _, ns := range modelDS.Namespaces {
if ns.Path == "" {
hasRoot = true
break
}
}
if !hasRoot {
modelDS.Namespaces = append([]models.PBSNamespace{{Path: "", Depth: 0}}, modelDS.Namespaces...)
}
}
pbsInst.Datastores = append(pbsInst.Datastores, modelDS)
}
}
}
// Update state - merge with existing instances
m.state.UpdatePBSInstance(pbsInst)
log.Info().
Str("instance", instanceName).
Str("id", pbsInst.ID).
Int("datastores", len(pbsInst.Datastores)).
Msg("PBS instance updated in state")
// Poll backups if enabled
if instanceCfg.MonitorBackups {
log.Info().
Str("instance", instanceName).
Int("datastores", len(pbsInst.Datastores)).
Msg("Polling PBS backups")
m.pollPBSBackups(ctx, instanceName, client, pbsInst.Datastores)
} else {
log.Debug().
Str("instance", instanceName).
Msg("PBS backup monitoring disabled")
}
}
// GetState returns the current state
func (m *Monitor) GetState() models.StateSnapshot {
return m.state.GetSnapshot()
}
// GetStartTime returns the monitor start time
func (m *Monitor) GetStartTime() time.Time {
return m.startTime
}
// GetDiscoveryService returns the discovery service
func (m *Monitor) GetDiscoveryService() *discovery.Service {
return m.discoveryService
}
// GetGuestMetrics returns historical metrics for a guest
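//
// Guest IDs follow the "<instance>-<node>-<vmid>" format used during polling.
// A sketch with hypothetical names (keys of the returned map are metric names
// such as "cpu", "memory", "disk"):
//
//	points := m.GetGuestMetrics("homelab-pve1-101", time.Hour)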
func (m *Monitor) GetGuestMetrics(guestID string, duration time.Duration) map[string][]MetricPoint {
return m.metricsHistory.GetAllGuestMetrics(guestID, duration)
}
// GetNodeMetrics returns historical metrics for a node
func (m *Monitor) GetNodeMetrics(nodeID string, metricType string, duration time.Duration) []MetricPoint {
return m.metricsHistory.GetNodeMetrics(nodeID, metricType, duration)
}
// GetStorageMetrics returns historical metrics for storage
func (m *Monitor) GetStorageMetrics(storageID string, duration time.Duration) map[string][]MetricPoint {
return m.metricsHistory.GetAllStorageMetrics(storageID, duration)
}
// GetAlertManager returns the alert manager
func (m *Monitor) GetAlertManager() *alerts.Manager {
return m.alertManager
}
// GetNotificationManager returns the notification manager
func (m *Monitor) GetNotificationManager() *notifications.NotificationManager {
return m.notificationMgr
}
// GetConfigPersistence returns the config persistence manager
func (m *Monitor) GetConfigPersistence() *config.ConfigPersistence {
return m.configPersist
}
// pollStorageBackups polls backup files from storage
func (m *Monitor) pollStorageBackups(ctx context.Context, instanceName string, client PVEClientInterface) {
log.Debug().Str("instance", instanceName).Msg("Polling storage backups")
// Get all nodes
nodes, err := client.GetNodes(ctx)
if err != nil {
monErr := errors.WrapConnectionError("get_nodes_for_backups", instanceName, err)
log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get nodes for backup polling")
return
}
var allBackups []models.StorageBackup
seenVolids := make(map[string]bool) // Track seen volume IDs to avoid duplicates
// For each node, get storage and check content
for _, node := range nodes {
if node.Status != "online" {
continue
}
// Get storage for this node
storages, err := client.GetStorage(ctx, node.Node)
if err != nil {
monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_storage_for_backups", instanceName, err).WithNode(node.Node)
log.Error().Err(monErr).Str("node", node.Node).Msg("Failed to get storage")
continue
}
// For each storage that can contain backups or templates
for _, storage := range storages {
// Check if storage supports backup content
if !strings.Contains(storage.Content, "backup") {
continue
}
// Get storage content
contents, err := client.GetStorageContent(ctx, node.Node, storage.Storage)
if err != nil {
monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_storage_content", instanceName, err).WithNode(node.Node)
log.Debug().Err(monErr).
Str("node", node.Node).
Str("storage", storage.Storage).
Msg("Failed to get storage content")
continue
}
// Convert to models
for _, content := range contents {
// Skip if we've already seen this item (shared storage duplicate)
if seenVolids[content.Volid] {
continue
}
seenVolids[content.Volid] = true
// Skip templates and ISOs - they're not backups
if content.Content == "vztmpl" || content.Content == "iso" {
continue
}
// Determine type from content type and volid
backupType := "unknown"
if strings.Contains(content.Volid, "/vm/") || strings.Contains(content.Volid, "qemu") {
backupType = "qemu"
} else if strings.Contains(content.Volid, "/ct/") || strings.Contains(content.Volid, "lxc") {
backupType = "lxc"
} else if strings.Contains(content.Format, "pbs-ct") {
// PBS format check as fallback
backupType = "lxc"
} else if strings.Contains(content.Format, "pbs-vm") {
// PBS format check as fallback
backupType = "qemu"
}
// For shared storage (like PBS), use the storage name as node
// to avoid confusion about which node the backup is on
backupNode := node.Node
isPBSStorage := strings.HasPrefix(storage.Storage, "pbs-") || storage.Type == "pbs"
if isPBSStorage || storage.Shared == 1 {
backupNode = storage.Storage // Use storage name for shared storage
}
// Check verification status for PBS backups
verified := false
verificationInfo := ""
if isPBSStorage {
// Check if verified flag is set
if content.Verified > 0 {
verified = true
}
// Also check verification map if available
if content.Verification != nil {
if state, ok := content.Verification["state"].(string); ok {
verified = (state == "ok")
verificationInfo = state
}
}
}
backup := models.StorageBackup{
ID: fmt.Sprintf("%s-%s", instanceName, content.Volid),
Storage: storage.Storage,
Node: backupNode,
Type: backupType,
VMID: content.VMID,
Time: time.Unix(content.CTime, 0),
CTime: content.CTime,
Size: int64(content.Size),
Format: content.Format,
Notes: content.Notes,
Protected: content.Protected > 0,
Volid: content.Volid,
IsPBS: isPBSStorage,
Verified: verified,
Verification: verificationInfo,
}
allBackups = append(allBackups, backup)
}
}
}
// Update state with storage backups for this instance
m.state.UpdateStorageBackupsForInstance(instanceName, allBackups)
log.Debug().
Str("instance", instanceName).
Int("count", len(allBackups)).
Msg("Storage backups polled")
}
// pollGuestSnapshots polls snapshots for all VMs and containers
func (m *Monitor) pollGuestSnapshots(ctx context.Context, instanceName string, client PVEClientInterface) {
log.Debug().Str("instance", instanceName).Msg("Polling guest snapshots")
// Create a separate context with a longer timeout for snapshot queries
// Snapshot queries can be slow, especially with many VMs/containers
snapshotCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
// Get current VMs and containers from state
m.mu.RLock()
vms := append([]models.VM{}, m.state.VMs...)
containers := append([]models.Container{}, m.state.Containers...)
m.mu.RUnlock()
var allSnapshots []models.GuestSnapshot
// Poll VM snapshots
for _, vm := range vms {
// Skip templates
if vm.Template {
continue
}
snapshots, err := client.GetVMSnapshots(snapshotCtx, vm.Node, vm.VMID)
if err != nil {
// This is common for VMs without snapshots, so use debug level
monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_vm_snapshots", instanceName, err).WithNode(vm.Node)
log.Debug().
Err(monErr).
Str("node", vm.Node).
Int("vmid", vm.VMID).
Msg("Failed to get VM snapshots")
continue
}
for _, snap := range snapshots {
snapshot := models.GuestSnapshot{
ID: fmt.Sprintf("%s-%s-%d-%s", instanceName, vm.Node, vm.VMID, snap.Name),
Name: snap.Name,
Node: vm.Node,
Type: "qemu",
VMID: vm.VMID,
Time: time.Unix(snap.SnapTime, 0),
Description: snap.Description,
Parent: snap.Parent,
VMState: true, // VM state support enabled
}
allSnapshots = append(allSnapshots, snapshot)
}
}
// Poll container snapshots
for _, ct := range containers {
// Skip templates
if ct.Template {
continue
}
snapshots, err := client.GetContainerSnapshots(snapshotCtx, ct.Node, ct.VMID)
if err != nil {
// API error 596 means snapshots not supported/available - this is expected for many containers
errStr := err.Error()
if strings.Contains(errStr, "596") || strings.Contains(errStr, "not available") {
// Silently skip containers without snapshot support
continue
}
// Log other errors at debug level
monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_container_snapshots", instanceName, err).WithNode(ct.Node)
log.Debug().
Err(monErr).
Str("node", ct.Node).
Int("vmid", ct.VMID).
Msg("Failed to get container snapshots")
continue
}
for _, snap := range snapshots {
snapshot := models.GuestSnapshot{
ID: fmt.Sprintf("%s-%s-%d-%s", instanceName, ct.Node, ct.VMID, snap.Name),
Name: snap.Name,
Node: ct.Node,
Type: "lxc",
VMID: ct.VMID,
Time: time.Unix(snap.SnapTime, 0),
Description: snap.Description,
Parent: snap.Parent,
VMState: false,
}
allSnapshots = append(allSnapshots, snapshot)
}
}
// Update state with guest snapshots for this instance
m.state.UpdateGuestSnapshotsForInstance(instanceName, allSnapshots)
log.Debug().
Str("instance", instanceName).
Int("count", len(allSnapshots)).
Msg("Guest snapshots polled")
}
// Stop gracefully stops the monitor
func (m *Monitor) Stop() {
log.Info().Msg("Stopping monitor")
// Stop the alert manager to save history
if m.alertManager != nil {
m.alertManager.Stop()
}
// Stop notification manager
if m.notificationMgr != nil {
m.notificationMgr.Stop()
}
log.Info().Msg("Monitor stopped")
}
// recordAuthFailure records an authentication failure for a node
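//
// Keys are prefixed with the node type (e.g. "pve-homelab"); once
// maxAuthFailures consecutive failures accumulate, the node is removed from
// state and its counters are cleared.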
func (m *Monitor) recordAuthFailure(instanceName string, nodeType string) {
m.mu.Lock()
defer m.mu.Unlock()
nodeID := instanceName
if nodeType != "" {
nodeID = nodeType + "-" + instanceName
}
// Increment failure count
m.authFailures[nodeID]++
m.lastAuthAttempt[nodeID] = time.Now()
log.Warn().
Str("node", nodeID).
Int("failures", m.authFailures[nodeID]).
Msg("Authentication failure recorded")
// If we've exceeded the threshold, remove the node
const maxAuthFailures = 5
if m.authFailures[nodeID] >= maxAuthFailures {
log.Error().
Str("node", nodeID).
Int("failures", m.authFailures[nodeID]).
Msg("Maximum authentication failures reached, removing node from state")
// Remove from state based on type
if nodeType == "pve" {
m.removeFailedPVENode(instanceName)
} else if nodeType == "pbs" {
m.removeFailedPBSNode(instanceName)
}
// Reset the counter since we've removed the node
delete(m.authFailures, nodeID)
delete(m.lastAuthAttempt, nodeID)
}
}
// resetAuthFailures resets the failure count for a node after successful auth
func (m *Monitor) resetAuthFailures(instanceName string, nodeType string) {
m.mu.Lock()
defer m.mu.Unlock()
nodeID := instanceName
if nodeType != "" {
nodeID = nodeType + "-" + instanceName
}
if count, exists := m.authFailures[nodeID]; exists && count > 0 {
log.Info().
Str("node", nodeID).
Int("previousFailures", count).
Msg("Authentication succeeded, resetting failure count")
delete(m.authFailures, nodeID)
delete(m.lastAuthAttempt, nodeID)
}
}
// removeFailedPVENode updates a PVE node to show failed authentication status
func (m *Monitor) removeFailedPVENode(instanceName string) {
// Get instance config to get host URL
var hostURL string
for _, cfg := range m.config.PVEInstances {
if cfg.Name == instanceName {
hostURL = cfg.Host
break
}
}
// Create a failed node entry to show in UI with error status
failedNode := models.Node{
ID: instanceName + "-failed",
Name: instanceName,
Instance: instanceName,
Host: hostURL, // Include host URL even for failed nodes
Status: "offline",
Type: "node",
ConnectionHealth: "error",
LastSeen: time.Now(),
// Set other fields to zero values to indicate no data
CPU: 0,
Memory: models.Memory{},
Disk: models.Disk{},
}
// Update with just the failed node
m.state.UpdateNodesForInstance(instanceName, []models.Node{failedNode})
// Remove all other resources associated with this instance
m.state.UpdateVMsForInstance(instanceName, []models.VM{})
m.state.UpdateContainersForInstance(instanceName, []models.Container{})
m.state.UpdateStorageForInstance(instanceName, []models.Storage{})
m.state.UpdateBackupTasksForInstance(instanceName, []models.BackupTask{})
m.state.UpdateStorageBackupsForInstance(instanceName, []models.StorageBackup{})
m.state.UpdateGuestSnapshotsForInstance(instanceName, []models.GuestSnapshot{})
// Set connection health to false
m.state.SetConnectionHealth(instanceName, false)
}
// removeFailedPBSNode removes a PBS node and all its resources from state
func (m *Monitor) removeFailedPBSNode(instanceName string) {
// Filter the failed instance out of the PBS instance list
currentInstances := m.state.PBSInstances
var updatedInstances []models.PBSInstance
for _, inst := range currentInstances {
if inst.Name != instanceName {
updatedInstances = append(updatedInstances, inst)
}
}
m.state.UpdatePBSInstances(updatedInstances)
// Remove PBS backups
m.state.UpdatePBSBackups(instanceName, []models.PBSBackup{})
// Set connection health to false
m.state.SetConnectionHealth("pbs-"+instanceName, false)
}
// pollPBSBackups fetches all backups from PBS datastores
func (m *Monitor) pollPBSBackups(ctx context.Context, instanceName string, client *pbs.Client, datastores []models.PBSDatastore) {
log.Debug().Str("instance", instanceName).Msg("Polling PBS backups")
var allBackups []models.PBSBackup
// Process each datastore
for _, ds := range datastores {
// Get namespace paths
namespacePaths := make([]string, 0, len(ds.Namespaces))
for _, ns := range ds.Namespaces {
namespacePaths = append(namespacePaths, ns.Path)
}
log.Info().
Str("instance", instanceName).
Str("datastore", ds.Name).
Int("namespaces", len(namespacePaths)).
Strs("namespace_paths", namespacePaths).
Msg("Processing datastore namespaces")
// Fetch backups from all namespaces concurrently
backupsMap, err := client.ListAllBackups(ctx, ds.Name, namespacePaths)
if err != nil {
log.Error().Err(err).
Str("instance", instanceName).
Str("datastore", ds.Name).
Msg("Failed to fetch PBS backups")
continue
}
// Convert PBS backups to model backups
for namespace, snapshots := range backupsMap {
for _, snapshot := range snapshots {
backupTime := time.Unix(snapshot.BackupTime, 0)
// Generate unique ID
id := fmt.Sprintf("pbs-%s-%s-%s-%s-%s-%d",
instanceName, ds.Name, namespace,
snapshot.BackupType, snapshot.BackupID,
snapshot.BackupTime)
// Extract file names from files (which can be strings or objects)
var fileNames []string
for _, file := range snapshot.Files {
switch f := file.(type) {
case string:
fileNames = append(fileNames, f)
case map[string]interface{}:
if filename, ok := f["filename"].(string); ok {
fileNames = append(fileNames, filename)
}
}
}
// Extract verification status
verified := false
if snapshot.Verification != nil {
switch v := snapshot.Verification.(type) {
case string:
verified = v == "ok"
case map[string]interface{}:
if state, ok := v["state"].(string); ok {
verified = state == "ok"
}
}
// Debug log verification data
log.Debug().
Str("vmid", snapshot.BackupID).
Int64("time", snapshot.BackupTime).
Interface("verification", snapshot.Verification).
Bool("verified", verified).
Msg("PBS backup verification status")
}
backup := models.PBSBackup{
ID: id,
Instance: instanceName,
Datastore: ds.Name,
Namespace: namespace,
BackupType: snapshot.BackupType,
VMID: snapshot.BackupID,
BackupTime: backupTime,
Size: snapshot.Size,
Protected: snapshot.Protected,
Verified: verified,
Comment: snapshot.Comment,
Files: fileNames,
}
allBackups = append(allBackups, backup)
}
}
}
log.Info().
Str("instance", instanceName).
Int("count", len(allBackups)).
Msg("PBS backups fetched")
// Update state
m.state.UpdatePBSBackups(instanceName, allBackups)
}