mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-05-11 21:28:15 +00:00
- Replace all 'any' types with proper TypeScript types throughout the codebase - Fix Record<string, any> to use specific types (AlertThresholds, unknown) - Update logger methods to use 'unknown' instead of 'any' for parameters - Fix type assertions to use proper types instead of 'as any' - Update generic type defaults from 'any' to 'unknown' - Fix WebSocket message types to use 'unknown' for optional data - Move global Toast declaration to top level to fix TypeScript errors - Comment out legacy PBS backup code that referenced non-existent fields - Ensure all code follows TypeScript standards as documented in CLAUDE.md All TypeScript compilation errors have been resolved and the codebase now adheres to strict typing standards with no 'any' types remaining.
1769 lines
54 KiB
Go
1769 lines
54 KiB
Go
package monitoring
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/alerts"
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/config"
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/errors"
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/models"
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/notifications"
|
|
"github.com/rcourtman/pulse-go-rewrite/internal/websocket"
|
|
"github.com/rcourtman/pulse-go-rewrite/pkg/proxmox"
|
|
"github.com/rcourtman/pulse-go-rewrite/pkg/pbs"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// PVEClientInterface defines the interface for PVE clients (both regular and cluster)
type PVEClientInterface interface {
	// GetNodes lists all nodes visible to this instance.
	GetNodes(ctx context.Context) ([]proxmox.Node, error)
	// GetNodeStatus fetches detailed status (load, rootfs, CPU info) for one node.
	GetNodeStatus(ctx context.Context, node string) (*proxmox.NodeStatus, error)
	// GetVMs lists QEMU virtual machines on the given node.
	GetVMs(ctx context.Context, node string) ([]proxmox.VM, error)
	// GetContainers lists LXC containers on the given node.
	GetContainers(ctx context.Context, node string) ([]proxmox.Container, error)
	// GetStorage lists storages visible from a single node (includes capacity).
	GetStorage(ctx context.Context, node string) ([]proxmox.Storage, error)
	// GetAllStorage lists the cluster-wide storage configuration.
	GetAllStorage(ctx context.Context) ([]proxmox.Storage, error)
	// GetBackupTasks lists backup-related tasks.
	GetBackupTasks(ctx context.Context) ([]proxmox.Task, error)
	// GetStorageContent lists the content (e.g. backup volumes) of one storage.
	GetStorageContent(ctx context.Context, node, storage string) ([]proxmox.StorageContent, error)
	// GetVMSnapshots lists snapshots for a QEMU guest.
	GetVMSnapshots(ctx context.Context, node string, vmid int) ([]proxmox.Snapshot, error)
	// GetContainerSnapshots lists snapshots for an LXC guest.
	GetContainerSnapshots(ctx context.Context, node string, vmid int) ([]proxmox.Snapshot, error)
}
|
|
|
|
// Monitor handles all monitoring operations
type Monitor struct {
	config          *config.Config                // application configuration (instances, intervals)
	state           *models.State                 // shared in-memory state broadcast to clients
	pveClients      map[string]PVEClientInterface // PVE API clients keyed by instance name
	pbsClients      map[string]*pbs.Client        // PBS API clients keyed by instance name
	mu              sync.RWMutex                  // guards the client maps and pollCounter
	startTime       time.Time                     // process start, used for uptime
	rateTracker     *RateTracker                  // computes I/O rates from cumulative counters
	metricsHistory  *MetricsHistory               // ring buffer of recent metric samples
	alertManager    *alerts.Manager               // threshold evaluation and alert lifecycle
	notificationMgr *notifications.NotificationManager // email/webhook delivery
	configPersist   *config.ConfigPersistence     // loads persisted alert/email/webhook settings
	activePollCount int32                         // Number of active polling operations
	pollCounter     int64                         // Counter for polling cycles
	authFailures    map[string]int                // Track consecutive auth failures per node
	lastAuthAttempt map[string]time.Time          // Track last auth attempt time
}
|
|
|
|
// safePercentage converts used/total into a percentage. It guards against a
// zero divisor and against non-finite results, returning 0 in both cases so
// downstream metrics are always well-defined numbers.
func safePercentage(used, total float64) float64 {
	if total == 0 {
		return 0
	}
	if pct := used / total * 100; !math.IsNaN(pct) && !math.IsInf(pct, 0) {
		return pct
	}
	return 0
}
|
|
|
|
// safeFloat clamps non-finite values (NaN, +Inf, -Inf) to zero so that
// serialized metrics always carry a finite number.
func safeFloat(val float64) float64 {
	if !math.IsNaN(val) && !math.IsInf(val, 0) {
		return val
	}
	return 0
}
|
|
|
|
// sortContent normalizes a comma-separated content list (e.g. Proxmox storage
// "content" values) by sorting its entries alphabetically, so equal sets always
// render identically in the UI.
func sortContent(content string) string {
	if content == "" {
		return ""
	}
	values := strings.Split(content, ",")
	sort.Slice(values, func(i, j int) bool { return values[i] < values[j] })
	return strings.Join(values, ",")
}
|
|
|
|
// GetConnectionStatuses returns the current connection status for all nodes
|
|
func (m *Monitor) GetConnectionStatuses() map[string]bool {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
statuses := make(map[string]bool)
|
|
|
|
// Check PVE clients
|
|
for name, client := range m.pveClients {
|
|
// Simple check - if we have a client, consider it connected
|
|
// In reality, you'd want to check if recent API calls succeeded
|
|
statuses["pve-"+name] = client != nil
|
|
}
|
|
|
|
// Check PBS clients
|
|
for name, client := range m.pbsClients {
|
|
statuses["pbs-"+name] = client != nil
|
|
}
|
|
|
|
return statuses
|
|
}
|
|
|
|
// New creates a new Monitor instance
|
|
func New(cfg *config.Config) (*Monitor, error) {
|
|
m := &Monitor{
|
|
config: cfg,
|
|
state: models.NewState(),
|
|
pveClients: make(map[string]PVEClientInterface),
|
|
pbsClients: make(map[string]*pbs.Client),
|
|
startTime: time.Now(),
|
|
rateTracker: NewRateTracker(),
|
|
metricsHistory: NewMetricsHistory(1000, 24*time.Hour), // Keep up to 1000 points or 24 hours
|
|
alertManager: alerts.NewManager(),
|
|
notificationMgr: notifications.NewNotificationManager(),
|
|
configPersist: config.NewConfigPersistence(""),
|
|
authFailures: make(map[string]int),
|
|
lastAuthAttempt: make(map[string]time.Time),
|
|
}
|
|
|
|
// Load saved configurations
|
|
if alertConfig, err := m.configPersist.LoadAlertConfig(); err == nil {
|
|
m.alertManager.UpdateConfig(*alertConfig)
|
|
// Apply schedule settings to notification manager
|
|
if alertConfig.Schedule.Cooldown > 0 {
|
|
m.notificationMgr.SetCooldown(alertConfig.Schedule.Cooldown)
|
|
}
|
|
if alertConfig.Schedule.GroupingWindow > 0 {
|
|
m.notificationMgr.SetGroupingWindow(alertConfig.Schedule.GroupingWindow)
|
|
} else if alertConfig.Schedule.Grouping.Window > 0 {
|
|
m.notificationMgr.SetGroupingWindow(alertConfig.Schedule.Grouping.Window)
|
|
}
|
|
m.notificationMgr.SetGroupingOptions(
|
|
alertConfig.Schedule.Grouping.ByNode,
|
|
alertConfig.Schedule.Grouping.ByGuest,
|
|
)
|
|
} else {
|
|
log.Warn().Err(err).Msg("Failed to load alert configuration")
|
|
}
|
|
|
|
if emailConfig, err := m.configPersist.LoadEmailConfig(); err == nil {
|
|
m.notificationMgr.SetEmailConfig(*emailConfig)
|
|
} else {
|
|
log.Warn().Err(err).Msg("Failed to load email configuration")
|
|
}
|
|
|
|
if webhooks, err := m.configPersist.LoadWebhooks(); err == nil {
|
|
for _, webhook := range webhooks {
|
|
m.notificationMgr.AddWebhook(webhook)
|
|
}
|
|
} else {
|
|
log.Warn().Err(err).Msg("Failed to load webhook configuration")
|
|
}
|
|
|
|
// Initialize PVE clients
|
|
log.Info().Int("count", len(cfg.PVEInstances)).Msg("Initializing PVE clients")
|
|
for _, pve := range cfg.PVEInstances {
|
|
log.Info().
|
|
Str("name", pve.Name).
|
|
Str("host", pve.Host).
|
|
Str("user", pve.User).
|
|
Bool("hasToken", pve.TokenName != "").
|
|
Msg("Configuring PVE instance")
|
|
|
|
// Check if this is a cluster
|
|
if pve.IsCluster && len(pve.ClusterEndpoints) > 0 {
|
|
// Create cluster client
|
|
endpoints := make([]string, 0, len(pve.ClusterEndpoints))
|
|
for _, ep := range pve.ClusterEndpoints {
|
|
// Use IP if available, otherwise use host
|
|
host := ep.IP
|
|
if host == "" {
|
|
host = ep.Host
|
|
}
|
|
// Ensure we have the full URL
|
|
if !strings.HasPrefix(host, "http") {
|
|
if pve.VerifySSL {
|
|
host = fmt.Sprintf("https://%s:8006", host)
|
|
} else {
|
|
host = fmt.Sprintf("https://%s:8006", host)
|
|
}
|
|
}
|
|
endpoints = append(endpoints, host)
|
|
}
|
|
|
|
log.Info().
|
|
Str("cluster", pve.ClusterName).
|
|
Strs("endpoints", endpoints).
|
|
Msg("Creating cluster-aware client")
|
|
|
|
clusterClient := proxmox.NewClusterClient(
|
|
pve.Name,
|
|
proxmox.ClientConfig{
|
|
User: pve.User,
|
|
Password: pve.Password,
|
|
TokenName: pve.TokenName,
|
|
TokenValue: pve.TokenValue,
|
|
Fingerprint: pve.Fingerprint,
|
|
VerifySSL: pve.VerifySSL,
|
|
Timeout: cfg.ConnectionTimeout,
|
|
},
|
|
endpoints,
|
|
)
|
|
m.pveClients[pve.Name] = clusterClient
|
|
log.Info().
|
|
Str("instance", pve.Name).
|
|
Str("cluster", pve.ClusterName).
|
|
Int("endpoints", len(endpoints)).
|
|
Msg("Cluster client created successfully")
|
|
} else {
|
|
// Create regular client
|
|
client, err := proxmox.NewClient(proxmox.ClientConfig{
|
|
Host: pve.Host,
|
|
User: pve.User,
|
|
Password: pve.Password,
|
|
TokenName: pve.TokenName,
|
|
TokenValue: pve.TokenValue,
|
|
Fingerprint: pve.Fingerprint,
|
|
VerifySSL: pve.VerifySSL,
|
|
Timeout: cfg.ConnectionTimeout,
|
|
})
|
|
if err != nil {
|
|
monErr := errors.WrapConnectionError("create_pve_client", pve.Name, err)
|
|
log.Error().Err(monErr).Str("instance", pve.Name).Msg("Failed to create PVE client")
|
|
continue
|
|
}
|
|
m.pveClients[pve.Name] = client
|
|
log.Info().Str("instance", pve.Name).Msg("PVE client created successfully")
|
|
}
|
|
}
|
|
|
|
// Initialize PBS clients
|
|
log.Info().Int("count", len(cfg.PBSInstances)).Msg("Initializing PBS clients")
|
|
for _, pbsInst := range cfg.PBSInstances {
|
|
log.Info().
|
|
Str("name", pbsInst.Name).
|
|
Str("host", pbsInst.Host).
|
|
Str("user", pbsInst.User).
|
|
Bool("hasToken", pbsInst.TokenName != "").
|
|
Msg("Configuring PBS instance")
|
|
|
|
client, err := pbs.NewClient(pbs.ClientConfig{
|
|
Host: pbsInst.Host,
|
|
User: pbsInst.User,
|
|
Password: pbsInst.Password,
|
|
TokenName: pbsInst.TokenName,
|
|
TokenValue: pbsInst.TokenValue,
|
|
Fingerprint: pbsInst.Fingerprint,
|
|
VerifySSL: pbsInst.VerifySSL,
|
|
Timeout: cfg.ConnectionTimeout,
|
|
})
|
|
if err != nil {
|
|
monErr := errors.WrapConnectionError("create_pbs_client", pbsInst.Name, err)
|
|
log.Error().Err(monErr).Str("instance", pbsInst.Name).Msg("Failed to create PBS client")
|
|
continue
|
|
}
|
|
m.pbsClients[pbsInst.Name] = client
|
|
log.Info().Str("instance", pbsInst.Name).Msg("PBS client created successfully")
|
|
}
|
|
|
|
// Initialize state stats
|
|
m.state.Stats = models.Stats{
|
|
StartTime: m.startTime,
|
|
Version: "2.0.0-go",
|
|
}
|
|
|
|
return m, nil
|
|
}
|
|
|
|
|
|
// Start begins the monitoring loop
//
// It wires the alert manager's callbacks to the WebSocket hub and notification
// manager, kicks off an immediate poll, and then loops forever: polling and
// broadcasting on separate tickers (both currently fire at PollingInterval)
// until ctx is cancelled. Blocks until ctx.Done(); run it in its own goroutine.
func (m *Monitor) Start(ctx context.Context, wsHub *websocket.Hub) {
	log.Info().
		Dur("pollingInterval", m.config.PollingInterval).
		Msg("Starting monitoring loop")

	// Set up alert callbacks
	m.alertManager.SetAlertCallback(func(alert *alerts.Alert) {
		wsHub.BroadcastAlert(alert)
		// Send notifications asynchronously so alert delivery never blocks
		// the alert manager.
		go m.notificationMgr.SendAlert(alert)
	})
	m.alertManager.SetResolvedCallback(func(alertID string) {
		wsHub.BroadcastAlertResolved(alertID)
		// Broadcast updated state immediately so frontend gets the new activeAlerts list
		state := m.GetState()
		wsHub.BroadcastState(state)
	})
	m.alertManager.SetEscalateCallback(func(alert *alerts.Alert, level int) {
		log.Info().
			Str("alertID", alert.ID).
			Int("level", level).
			Msg("Alert escalated - sending notifications")

		// Get escalation config. NOTE: this local shadows the imported
		// "config" package for the rest of the closure.
		config := m.alertManager.GetConfig()
		// level is 1-based; ignore out-of-range escalation levels.
		if level <= 0 || level > len(config.Schedule.Escalation.Levels) {
			return
		}

		escalationLevel := config.Schedule.Escalation.Levels[level-1]

		// Send notifications based on escalation level.
		// NOTE(review): SendAlert appears to send through every configured
		// channel; the "email"/"webhook" cases only gate *whether* it is
		// called, not which channel it uses — confirm against
		// NotificationManager.SendAlert.
		switch escalationLevel.Notify {
		case "email":
			// Only send email
			if emailConfig := m.notificationMgr.GetEmailConfig(); emailConfig.Enabled {
				m.notificationMgr.SendAlert(alert)
			}
		case "webhook":
			// Only send webhooks — one SendAlert call if any webhook is enabled.
			for _, webhook := range m.notificationMgr.GetWebhooks() {
				if webhook.Enabled {
					m.notificationMgr.SendAlert(alert)
					break
				}
			}
		case "all":
			// Send all notifications
			m.notificationMgr.SendAlert(alert)
		}

		// Update WebSocket with escalation
		wsHub.BroadcastAlert(alert)
	})

	// Create separate tickers for polling and broadcasting. Both use
	// PollingInterval today; keeping them separate allows the two rates to
	// diverge later without restructuring the loop.
	pollTicker := time.NewTicker(m.config.PollingInterval)
	defer pollTicker.Stop()

	broadcastTicker := time.NewTicker(m.config.PollingInterval)
	defer broadcastTicker.Stop()

	// Do an immediate poll on start so the UI has data before the first tick.
	go m.poll(ctx, wsHub)

	for {
		select {
		case <-pollTicker.C:
			// Start polling in a goroutine so it doesn't block the ticker;
			// poll() itself caps the number of concurrent polls.
			go m.poll(ctx, wsHub)

		case <-broadcastTicker.C:
			// Broadcast current state regardless of polling status
			state := m.state.GetSnapshot()
			log.Info().
				Int("nodes", len(state.Nodes)).
				Int("vms", len(state.VMs)).
				Int("containers", len(state.Containers)).
				Int("pbsBackups", len(state.PBSBackups)).
				Msg("Broadcasting state update (ticker)")
			wsHub.BroadcastState(state)

		case <-ctx.Done():
			log.Info().Msg("Monitoring loop stopped")
			return
		}
	}
}
|
|
|
|
// poll fetches data from all configured instances
//
// A single polling cycle: gathers data from every PVE/PBS instance (either
// concurrently or sequentially per config), then updates performance stats,
// syncs alert state into the shared state, and bumps the cycle counter. An
// atomic counter caps concurrent invocations at 2 — excess calls return
// immediately rather than queue.
func (m *Monitor) poll(ctx context.Context, wsHub *websocket.Hub) {
	// Limit concurrent polls to 2 to prevent resource exhaustion.
	// Increment-then-check keeps the guard race-free: losers decrement and bail.
	currentCount := atomic.AddInt32(&m.activePollCount, 1)
	if currentCount > 2 {
		atomic.AddInt32(&m.activePollCount, -1)
		log.Debug().Int32("activePolls", currentCount-1).Msg("Too many concurrent polls, skipping")
		return
	}
	defer atomic.AddInt32(&m.activePollCount, -1)

	log.Debug().Msg("Starting polling cycle")
	startTime := time.Now()

	if m.config.ConcurrentPolling {
		// Use concurrent polling
		m.pollConcurrent(ctx)
	} else {
		m.pollSequential(ctx)
	}

	// Update performance metrics.
	// NOTE(review): these fields are written directly without holding m.mu;
	// if State does not synchronize internally, concurrent polls (up to 2)
	// could race here — confirm against models.State.
	m.state.Performance.LastPollDuration = time.Since(startTime).Seconds()
	m.state.Stats.PollingCycles++
	m.state.Stats.Uptime = int64(time.Since(m.startTime).Seconds())
	m.state.Stats.WebSocketClients = wsHub.GetClientCount()

	// Sync active alerts to state, converting alert-manager records to the
	// model type the frontend consumes.
	activeAlerts := m.alertManager.GetActiveAlerts()
	modelAlerts := make([]models.Alert, 0, len(activeAlerts))
	for _, alert := range activeAlerts {
		modelAlerts = append(modelAlerts, models.Alert{
			ID:           alert.ID,
			Type:         alert.Type,
			Level:        string(alert.Level),
			ResourceID:   alert.ResourceID,
			ResourceName: alert.ResourceName,
			Node:         alert.Node,
			Instance:     alert.Instance,
			Message:      alert.Message,
			Value:        alert.Value,
			Threshold:    alert.Threshold,
			StartTime:    alert.StartTime,
			Acknowledged: alert.Acknowledged,
		})
	}
	m.state.UpdateActiveAlerts(modelAlerts)

	// Sync recently resolved alerts
	recentlyResolved := m.alertManager.GetRecentlyResolved()
	if len(recentlyResolved) > 0 {
		log.Info().Int("count", len(recentlyResolved)).Msg("Syncing recently resolved alerts")
	}
	m.state.UpdateRecentlyResolved(recentlyResolved)

	// Increment poll counter (drives the backup-polling cadence in
	// pollPVEInstance).
	m.mu.Lock()
	m.pollCounter++
	m.mu.Unlock()

	log.Debug().Dur("duration", time.Since(startTime)).Msg("Polling cycle completed")

	// Broadcasting is now handled by the timer in Start()
}
|
|
|
|
// pollConcurrent polls all instances concurrently
//
// One goroutine is launched per PVE/PBS instance, all sharing a derived
// cancellable context. The function returns when every goroutine has finished
// or, on early cancellation, after waiting for them to drain gracefully.
func (m *Monitor) pollConcurrent(ctx context.Context) {
	var wg sync.WaitGroup
	// Derive a local context so we can cancel all in-flight instance polls
	// together when the parent is cancelled.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Poll PVE instances
	for name, client := range m.pveClients {
		// Check if context is already cancelled before starting
		select {
		case <-ctx.Done():
			return
		default:
		}

		wg.Add(1)
		// name/client are passed as arguments to avoid the pre-Go-1.22
		// loop-variable capture problem.
		go func(instanceName string, c PVEClientInterface) {
			defer wg.Done()
			// Pass context to ensure cancellation propagates
			m.pollPVEInstance(ctx, instanceName, c)
		}(name, client)
	}

	// Poll PBS instances
	for name, client := range m.pbsClients {
		// Check if context is already cancelled before starting
		select {
		case <-ctx.Done():
			return
		default:
		}

		wg.Add(1)
		go func(instanceName string, c *pbs.Client) {
			defer wg.Done()
			// Pass context to ensure cancellation propagates
			m.pollPBSInstance(ctx, instanceName, c)
		}(name, client)
	}

	// Wait for all goroutines to complete or context cancellation.
	// The helper goroutine converts wg.Wait into a selectable channel.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		// All goroutines completed normally
	case <-ctx.Done():
		// Context cancelled, cancel all operations (redundant with the
		// deferred cancel, but makes the intent explicit).
		cancel()
		// Still wait for goroutines to finish gracefully
		wg.Wait()
	}
}
|
|
|
|
// pollSequential polls all instances sequentially
|
|
func (m *Monitor) pollSequential(ctx context.Context) {
|
|
// Poll PVE instances
|
|
for name, client := range m.pveClients {
|
|
// Check context before each instance
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
m.pollPVEInstance(ctx, name, client)
|
|
}
|
|
|
|
// Poll PBS instances
|
|
for name, client := range m.pbsClients {
|
|
// Check context before each instance
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
m.pollPBSInstance(ctx, name, client)
|
|
}
|
|
}
|
|
|
|
// pollPVEInstance polls a single PVE instance
//
// Order of operations: fetch nodes (tracking auth failures / connection
// health), convert them to model nodes enriched with detailed status, push
// them to state, optionally compute a storage-based disk fallback, record
// metrics history and check alert thresholds, then poll VMs, containers,
// storage, and (on a reduced cadence) backups/snapshots as enabled by the
// instance config.
func (m *Monitor) pollPVEInstance(ctx context.Context, instanceName string, client PVEClientInterface) {
	// Check if context is cancelled
	select {
	case <-ctx.Done():
		log.Debug().Str("instance", instanceName).Msg("Polling cancelled")
		return
	default:
	}

	log.Debug().Str("instance", instanceName).Msg("Polling PVE instance")

	// Look up this instance's config by name; bail out if it has been removed.
	var instanceCfg *config.PVEInstance
	for _, cfg := range m.config.PVEInstances {
		if cfg.Name == instanceName {
			// Taking the address of the loop variable is safe here because we
			// break immediately after the match.
			instanceCfg = &cfg
			break
		}
	}
	if instanceCfg == nil {
		return
	}

	// Poll nodes
	nodes, err := client.GetNodes(ctx)
	if err != nil {
		monErr := errors.WrapConnectionError("poll_nodes", instanceName, err)
		log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get nodes")
		m.state.SetConnectionHealth(instanceName, false)

		// Track auth failure if it's an authentication error
		if errors.IsAuthError(err) {
			m.recordAuthFailure(instanceName, "pve")
		}
		return
	}

	// Reset auth failures on successful connection
	m.resetAuthFailures(instanceName, "pve")
	m.state.SetConnectionHealth(instanceName, true)

	// Convert API nodes to model nodes.
	var modelNodes []models.Node
	for _, node := range nodes {
		modelNode := models.Node{
			ID:       instanceName + "-" + node.Node,
			Name:     node.Node,
			Instance: instanceName,
			Status:   node.Status,
			Type:     "node",
			CPU:      safeFloat(node.CPU), // Already in percentage
			Memory: models.Memory{
				Total: int64(node.MaxMem),
				Used:  int64(node.Mem),
				Free:  int64(node.MaxMem - node.Mem),
				Usage: safePercentage(float64(node.Mem), float64(node.MaxMem)),
			},
			Disk: models.Disk{
				Total: int64(node.MaxDisk),
				Used:  int64(node.Disk),
				Free:  int64(node.MaxDisk - node.Disk),
				Usage: safePercentage(float64(node.Disk), float64(node.MaxDisk)),
			},
			Uptime:           int64(node.Uptime),
			LoadAverage:      []float64{},
			LastSeen:         time.Now(),
			ConnectionHealth: "healthy",
		}

		// Debug logging for disk metrics - note that these values can fluctuate
		// due to thin provisioning and dynamic allocation
		log.Debug().
			Str("node", node.Node).
			Uint64("disk", node.Disk).
			Uint64("maxDisk", node.MaxDisk).
			Float64("diskUsage", safePercentage(float64(node.Disk), float64(node.MaxDisk))).
			Msg("Node disk metrics (raw from Proxmox)")

		// Get detailed node info if available; failures here are non-fatal —
		// we keep the coarse data from GetNodes.
		if nodeInfo, err := client.GetNodeStatus(ctx, node.Node); err == nil {
			// Convert LoadAvg from interface{} to float64 — Proxmox returns
			// load averages as either numbers or strings depending on version.
			loadAvg := make([]float64, 0, len(nodeInfo.LoadAvg))
			for _, val := range nodeInfo.LoadAvg {
				switch v := val.(type) {
				case float64:
					loadAvg = append(loadAvg, v)
				case string:
					if f, err := strconv.ParseFloat(v, 64); err == nil {
						loadAvg = append(loadAvg, f)
					}
				}
			}
			modelNode.LoadAverage = loadAvg
			modelNode.KernelVersion = nodeInfo.KernelVersion
			modelNode.PVEVersion = nodeInfo.PVEVersion

			// Use rootfs data if available for more stable disk metrics
			if nodeInfo.RootFS != nil && nodeInfo.RootFS.Total > 0 {
				modelNode.Disk = models.Disk{
					Total: int64(nodeInfo.RootFS.Total),
					Used:  int64(nodeInfo.RootFS.Used),
					Free:  int64(nodeInfo.RootFS.Free),
					Usage: safePercentage(float64(nodeInfo.RootFS.Used), float64(nodeInfo.RootFS.Total)),
				}
				log.Debug().
					Str("node", node.Node).
					Uint64("rootfsUsed", nodeInfo.RootFS.Used).
					Uint64("rootfsTotal", nodeInfo.RootFS.Total).
					Float64("rootfsUsage", modelNode.Disk.Usage).
					Msg("Using rootfs for disk metrics")
			}

			if nodeInfo.CPUInfo != nil {
				// Use MaxCPU from node data for logical CPU count (includes hyperthreading)
				// If MaxCPU is not available or 0, fall back to physical cores
				logicalCores := node.MaxCPU
				if logicalCores == 0 {
					logicalCores = nodeInfo.CPUInfo.Cores
				}

				log.Debug().
					Str("node", node.Node).
					Str("model", nodeInfo.CPUInfo.Model).
					Int("cores", nodeInfo.CPUInfo.Cores).
					Int("logicalCores", logicalCores).
					Int("sockets", nodeInfo.CPUInfo.Sockets).
					Str("mhz", nodeInfo.CPUInfo.MHz).
					Msg("Node CPU info from Proxmox")
				modelNode.CPUInfo = models.CPUInfo{
					Model:   nodeInfo.CPUInfo.Model,
					Cores:   logicalCores, // Use logical cores for display
					Sockets: nodeInfo.CPUInfo.Sockets,
					MHz:     nodeInfo.CPUInfo.MHz,
				}
			}
		} else {
			log.Debug().Err(err).Str("node", node.Node).Msg("Failed to get node status")
		}

		modelNodes = append(modelNodes, modelNode)
	}

	// Update state first so we have nodes available
	m.state.UpdateNodesForInstance(instanceName, modelNodes)

	// Now get storage data to use as fallback for disk metrics if needed.
	// "local"/"local-lvm" storages tend to be the most stable indicator of a
	// node's root disk usage.
	storageByNode := make(map[string]models.Disk)
	if instanceCfg.MonitorStorage {
		// NOTE(review): the GetAllStorage result is discarded and only gates
		// the per-node loop on success — confirm whether it is needed at all.
		_, err := client.GetAllStorage(ctx)
		if err == nil {
			for _, node := range nodes {
				nodeStorages, err := client.GetStorage(ctx, node.Node)
				if err == nil {
					// Look for local or local-lvm storage as most stable disk metric
					for _, storage := range nodeStorages {
						if storage.Storage == "local" || storage.Storage == "local-lvm" {
							disk := models.Disk{
								Total: int64(storage.Total),
								Used:  int64(storage.Used),
								Free:  int64(storage.Available),
								Usage: safePercentage(float64(storage.Used), float64(storage.Total)),
							}
							// Prefer "local" over "local-lvm"
							if _, exists := storageByNode[node.Node]; !exists || storage.Storage == "local" {
								storageByNode[node.Node] = disk
								log.Debug().
									Str("node", node.Node).
									Str("storage", storage.Storage).
									Float64("usage", disk.Usage).
									Msg("Using storage for disk metrics fallback")
							}
						}
					}
				}
			}
		}
	}

	// Update nodes with storage fallback if rootfs was not available
	for i := range modelNodes {
		if modelNodes[i].Disk.Total == 0 {
			if disk, exists := storageByNode[modelNodes[i].Name]; exists {
				modelNodes[i].Disk = disk
				log.Debug().
					Str("node", modelNodes[i].Name).
					Float64("usage", disk.Usage).
					Msg("Applied storage fallback for disk metrics")
			}
		}

		// Record node metrics history (CPU scaled to 0-100).
		now := time.Now()
		m.metricsHistory.AddNodeMetric(modelNodes[i].ID, "cpu", modelNodes[i].CPU*100, now)
		m.metricsHistory.AddNodeMetric(modelNodes[i].ID, "memory", modelNodes[i].Memory.Usage, now)
		m.metricsHistory.AddNodeMetric(modelNodes[i].ID, "disk", modelNodes[i].Disk.Usage, now)

		// Check thresholds for alerts
		m.alertManager.CheckNode(modelNodes[i])
	}

	// Update state again with corrected disk metrics
	m.state.UpdateNodesForInstance(instanceName, modelNodes)

	// Poll VMs if enabled
	if instanceCfg.MonitorVMs {
		select {
		case <-ctx.Done():
			return
		default:
			m.pollVMs(ctx, instanceName, client)
		}
	}

	// Poll containers if enabled
	if instanceCfg.MonitorContainers {
		select {
		case <-ctx.Done():
			return
		default:
			m.pollContainers(ctx, instanceName, client)
		}
	}

	// Poll storage if enabled
	if instanceCfg.MonitorStorage {
		select {
		case <-ctx.Done():
			return
		default:
			m.pollStorage(ctx, instanceName, client)
		}
	}

	// Poll backups if enabled - using configurable cycle count
	// This prevents slow backup/snapshot queries from blocking real-time stats
	// Also poll on first cycle (pollCounter == 1) to ensure data loads quickly
	backupCycles := 10 // default
	if m.config.BackupPollingCycles > 0 {
		backupCycles = m.config.BackupPollingCycles
	}
	// NOTE(review): m.pollCounter is read here without holding m.mu while
	// poll() increments it under the lock — a benign-looking but real data
	// race; confirm and consider an atomic counter.
	if instanceCfg.MonitorBackups && (m.pollCounter%int64(backupCycles) == 0 || m.pollCounter == 1) {
		select {
		case <-ctx.Done():
			return
		default:
			// Run backup polling in a separate goroutine to not block main polling
			go func() {
				log.Info().Str("instance", instanceName).Msg("Starting background backup/snapshot polling")
				// Create a separate context with longer timeout for backup
				// operations — detached from ctx so an in-progress backup scan
				// survives a polling-cycle cancellation.
				backupCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
				defer cancel()

				// Poll backup tasks
				m.pollBackupTasks(backupCtx, instanceName, client)

				// Poll storage backups
				m.pollStorageBackups(backupCtx, instanceName, client)

				// Poll guest snapshots
				m.pollGuestSnapshots(backupCtx, instanceName, client)

				log.Info().Str("instance", instanceName).Msg("Completed background backup/snapshot polling")
			}()
		}
	}
}
|
|
|
|
// pollVMs polls VMs from a PVE instance
//
// For every node it fetches the QEMU guest list, skips templates, derives I/O
// rates from the cumulative counters via the rate tracker, converts each guest
// to a models.VM, records metrics history, and checks alert thresholds. The
// complete list replaces the instance's VMs in shared state at the end.
func (m *Monitor) pollVMs(ctx context.Context, instanceName string, client PVEClientInterface) {
	log.Debug().Str("instance", instanceName).Msg("Polling VMs")

	// Get all nodes first
	nodes, err := client.GetNodes(ctx)
	if err != nil {
		monErr := errors.WrapConnectionError("get_nodes_for_vms", instanceName, err)
		log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get nodes for VM polling")
		return
	}

	var allVMs []models.VM
	for _, node := range nodes {
		vms, err := client.GetVMs(ctx, node.Node)
		if err != nil {
			// A single node failing should not abort the whole instance.
			monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_vms", instanceName, err).WithNode(node.Node)
			log.Error().Err(monErr).Str("node", node.Node).Msg("Failed to get VMs")
			continue
		}

		for _, vm := range vms {
			// Skip templates if configured
			if vm.Template == 1 {
				continue
			}

			// Parse tags (Proxmox stores them semicolon-separated).
			var tags []string
			if vm.Tags != "" {
				tags = strings.Split(vm.Tags, ";")
			}

			// Calculate I/O rates from the cumulative byte counters; the rate
			// tracker keys on a globally unique guest ID.
			guestID := fmt.Sprintf("%s-%s-%d", instanceName, node.Node, vm.VMID)
			currentMetrics := IOMetrics{
				DiskRead:   int64(vm.DiskRead),
				DiskWrite:  int64(vm.DiskWrite),
				NetworkIn:  int64(vm.NetIn),
				NetworkOut: int64(vm.NetOut),
				Timestamp:  time.Now(),
			}
			diskReadRate, diskWriteRate, netInRate, netOutRate := m.rateTracker.CalculateRates(guestID, currentMetrics)

			modelVM := models.VM{
				ID:       guestID,
				VMID:     vm.VMID,
				Name:     vm.Name,
				Node:     node.Node,
				Instance: instanceName,
				Status:   vm.Status,
				Type:     "qemu",
				CPU:      safeFloat(vm.CPU), // Already in percentage
				CPUs:     vm.CPUs,
				Memory: models.Memory{
					Total: int64(vm.MaxMem),
					Used:  int64(vm.Mem),
					Free:  int64(vm.MaxMem - vm.Mem),
					Usage: safePercentage(float64(vm.Mem), float64(vm.MaxMem)),
				},
				Disk: models.Disk{
					Total: int64(vm.MaxDisk),
					Used:  int64(vm.Disk),
					Free:  int64(vm.MaxDisk - vm.Disk),
					Usage: safePercentage(float64(vm.Disk), float64(vm.MaxDisk)),
				},
				NetworkIn:  int64(netInRate),
				NetworkOut: int64(netOutRate),
				DiskRead:   int64(diskReadRate),
				DiskWrite:  int64(diskWriteRate),
				Uptime:     int64(vm.Uptime),
				Template:   vm.Template == 1,
				Tags:       tags,
				Lock:       vm.Lock,
				LastSeen:   time.Now(),
			}
			allVMs = append(allVMs, modelVM)

			// Record metrics history (CPU scaled to 0-100).
			now := time.Now()
			m.metricsHistory.AddGuestMetric(modelVM.ID, "cpu", modelVM.CPU*100, now)
			m.metricsHistory.AddGuestMetric(modelVM.ID, "memory", modelVM.Memory.Usage, now)
			m.metricsHistory.AddGuestMetric(modelVM.ID, "disk", modelVM.Disk.Usage, now)
			m.metricsHistory.AddGuestMetric(modelVM.ID, "diskread", float64(modelVM.DiskRead), now)
			m.metricsHistory.AddGuestMetric(modelVM.ID, "diskwrite", float64(modelVM.DiskWrite), now)
			m.metricsHistory.AddGuestMetric(modelVM.ID, "netin", float64(modelVM.NetworkIn), now)
			m.metricsHistory.AddGuestMetric(modelVM.ID, "netout", float64(modelVM.NetworkOut), now)

			// Check thresholds for alerts
			m.alertManager.CheckGuest(modelVM, instanceName)
		}
	}

	m.state.UpdateVMsForInstance(instanceName, allVMs)
}
|
|
|
|
// pollContainers polls containers from a PVE instance
//
// Mirrors pollVMs for LXC guests: per node, fetch containers, skip templates,
// compute I/O rates, convert to models.Container, record metrics history, and
// check alert thresholds. The full list replaces the instance's containers in
// shared state at the end.
func (m *Monitor) pollContainers(ctx context.Context, instanceName string, client PVEClientInterface) {
	log.Debug().Str("instance", instanceName).Msg("Polling containers")

	// Get all nodes first
	nodes, err := client.GetNodes(ctx)
	if err != nil {
		monErr := errors.WrapConnectionError("get_nodes_for_containers", instanceName, err)
		log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get nodes for container polling")
		return
	}

	var allContainers []models.Container
	for _, node := range nodes {
		containers, err := client.GetContainers(ctx, node.Node)
		if err != nil {
			// A single node failing should not abort the whole instance.
			monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_containers", instanceName, err).WithNode(node.Node)
			log.Error().Err(monErr).Str("node", node.Node).Msg("Failed to get containers")
			continue
		}

		for _, ct := range containers {
			// Skip templates if configured
			if ct.Template == 1 {
				continue
			}

			// Parse tags (semicolon-separated in Proxmox).
			var tags []string
			if ct.Tags != "" {
				tags = strings.Split(ct.Tags, ";")
			}

			// Calculate I/O rates from the cumulative counters.
			guestID := fmt.Sprintf("%s-%s-%d", instanceName, node.Node, ct.VMID)
			currentMetrics := IOMetrics{
				DiskRead:   int64(ct.DiskRead),
				DiskWrite:  int64(ct.DiskWrite),
				NetworkIn:  int64(ct.NetIn),
				NetworkOut: int64(ct.NetOut),
				Timestamp:  time.Now(),
			}
			diskReadRate, diskWriteRate, netInRate, netOutRate := m.rateTracker.CalculateRates(guestID, currentMetrics)

			// Convert -1 to nil for I/O metrics when VM is not running
			// We'll use -1 to indicate "no data" which will be converted to null for the frontend
			modelCT := models.Container{
				ID:       guestID,
				VMID:     ct.VMID,
				Name:     ct.Name,
				Node:     node.Node,
				Instance: instanceName,
				Status:   ct.Status,
				Type:     "lxc",
				CPU:      safeFloat(ct.CPU), // Already in percentage
				CPUs:     ct.CPUs,
				Memory: models.Memory{
					Total: int64(ct.MaxMem),
					Used:  int64(ct.Mem),
					Free:  int64(ct.MaxMem - ct.Mem),
					Usage: safePercentage(float64(ct.Mem), float64(ct.MaxMem)),
				},
				Disk: models.Disk{
					Total: int64(ct.MaxDisk),
					Used:  int64(ct.Disk),
					Free:  int64(ct.MaxDisk - ct.Disk),
					Usage: safePercentage(float64(ct.Disk), float64(ct.MaxDisk)),
				},
				NetworkIn:  int64(netInRate),
				NetworkOut: int64(netOutRate),
				DiskRead:   int64(diskReadRate),
				DiskWrite:  int64(diskWriteRate),
				Uptime:     int64(ct.Uptime),
				Template:   ct.Template == 1,
				Tags:       tags,
				Lock:       ct.Lock,
				LastSeen:   time.Now(),
			}
			allContainers = append(allContainers, modelCT)

			// Record metrics history (CPU scaled to 0-100).
			now := time.Now()
			m.metricsHistory.AddGuestMetric(modelCT.ID, "cpu", modelCT.CPU*100, now)
			m.metricsHistory.AddGuestMetric(modelCT.ID, "memory", modelCT.Memory.Usage, now)
			m.metricsHistory.AddGuestMetric(modelCT.ID, "disk", modelCT.Disk.Usage, now)
			m.metricsHistory.AddGuestMetric(modelCT.ID, "diskread", float64(modelCT.DiskRead), now)
			m.metricsHistory.AddGuestMetric(modelCT.ID, "diskwrite", float64(modelCT.DiskWrite), now)
			m.metricsHistory.AddGuestMetric(modelCT.ID, "netin", float64(modelCT.NetworkIn), now)
			m.metricsHistory.AddGuestMetric(modelCT.ID, "netout", float64(modelCT.NetworkOut), now)

			// Check thresholds for alerts
			m.alertManager.CheckGuest(modelCT, instanceName)
		}
	}

	m.state.UpdateContainersForInstance(instanceName, allContainers)
}
|
|
|
|
// pollStorage polls storage from a PVE instance
|
|
func (m *Monitor) pollStorage(ctx context.Context, instanceName string, client PVEClientInterface) {
|
|
log.Debug().Str("instance", instanceName).Msg("Polling storage")
|
|
|
|
// Get all nodes first
|
|
nodes, err := client.GetNodes(ctx)
|
|
if err != nil {
|
|
monErr := errors.WrapConnectionError("get_nodes_for_storage", instanceName, err)
|
|
log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get nodes for storage polling")
|
|
return
|
|
}
|
|
|
|
// Get cluster storage configuration for shared/enabled status
|
|
clusterStorages, err := client.GetAllStorage(ctx)
|
|
if err != nil {
|
|
monErr := errors.WrapAPIError("get_cluster_storage", instanceName, err, 0)
|
|
log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get cluster storage")
|
|
}
|
|
|
|
// Create a map for quick lookup of cluster storage config
|
|
clusterStorageMap := make(map[string]proxmox.Storage)
|
|
for _, cs := range clusterStorages {
|
|
clusterStorageMap[cs.Storage] = cs
|
|
}
|
|
|
|
var allStorage []models.Storage
|
|
seenStorage := make(map[string]bool)
|
|
|
|
// Get storage from each node (this includes capacity info)
|
|
for _, node := range nodes {
|
|
nodeStorage, err := client.GetStorage(ctx, node.Node)
|
|
if err != nil {
|
|
monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_node_storage", instanceName, err).WithNode(node.Node)
|
|
log.Error().Err(monErr).Str("node", node.Node).Msg("Failed to get node storage")
|
|
continue
|
|
}
|
|
|
|
for _, storage := range nodeStorage {
|
|
// Get cluster config for this storage
|
|
clusterConfig, hasClusterConfig := clusterStorageMap[storage.Storage]
|
|
|
|
// Determine if shared
|
|
shared := hasClusterConfig && clusterConfig.Shared == 1
|
|
|
|
// For shared storage, only include it once
|
|
storageKey := storage.Storage
|
|
if shared {
|
|
if seenStorage[storageKey] {
|
|
continue
|
|
}
|
|
seenStorage[storageKey] = true
|
|
}
|
|
|
|
// Use appropriate node name
|
|
nodeID := node.Node
|
|
if shared {
|
|
nodeID = "shared"
|
|
}
|
|
|
|
modelStorage := models.Storage{
|
|
ID: fmt.Sprintf("%s-%s-%s", instanceName, nodeID, storage.Storage),
|
|
Name: storage.Storage,
|
|
Node: nodeID,
|
|
Instance: instanceName,
|
|
Type: storage.Type,
|
|
Status: "available",
|
|
Total: int64(storage.Total),
|
|
Used: int64(storage.Used),
|
|
Free: int64(storage.Available),
|
|
Usage: 0,
|
|
Content: sortContent(storage.Content),
|
|
Shared: shared,
|
|
Enabled: true,
|
|
Active: true,
|
|
}
|
|
|
|
// Override with cluster config if available
|
|
if hasClusterConfig {
|
|
// Sort content values for consistent display
|
|
if clusterConfig.Content != "" {
|
|
contentParts := strings.Split(clusterConfig.Content, ",")
|
|
sort.Strings(contentParts)
|
|
modelStorage.Content = strings.Join(contentParts, ",")
|
|
} else {
|
|
modelStorage.Content = clusterConfig.Content
|
|
}
|
|
modelStorage.Enabled = clusterConfig.Enabled == 1
|
|
modelStorage.Active = clusterConfig.Active == 1
|
|
}
|
|
|
|
// Calculate usage percentage
|
|
if modelStorage.Total > 0 {
|
|
modelStorage.Usage = safePercentage(float64(modelStorage.Used), float64(modelStorage.Total))
|
|
}
|
|
|
|
// Determine status based on active/enabled flags
|
|
if storage.Active == 1 || modelStorage.Active {
|
|
modelStorage.Status = "available"
|
|
} else if modelStorage.Enabled {
|
|
modelStorage.Status = "inactive"
|
|
} else {
|
|
modelStorage.Status = "disabled"
|
|
}
|
|
|
|
allStorage = append(allStorage, modelStorage)
|
|
|
|
// Record storage metrics history
|
|
now := time.Now()
|
|
m.metricsHistory.AddStorageMetric(modelStorage.ID, "usage", modelStorage.Usage, now)
|
|
m.metricsHistory.AddStorageMetric(modelStorage.ID, "used", float64(modelStorage.Used), now)
|
|
m.metricsHistory.AddStorageMetric(modelStorage.ID, "total", float64(modelStorage.Total), now)
|
|
m.metricsHistory.AddStorageMetric(modelStorage.ID, "avail", float64(modelStorage.Free), now)
|
|
|
|
// Check thresholds for alerts
|
|
m.alertManager.CheckStorage(modelStorage)
|
|
}
|
|
}
|
|
|
|
// Update storage for this instance only
|
|
var instanceStorage []models.Storage
|
|
for _, st := range allStorage {
|
|
st.Instance = instanceName
|
|
instanceStorage = append(instanceStorage, st)
|
|
}
|
|
m.state.UpdateStorageForInstance(instanceName, instanceStorage)
|
|
}
|
|
|
|
// pollBackupTasks polls backup tasks from a PVE instance
|
|
func (m *Monitor) pollBackupTasks(ctx context.Context, instanceName string, client PVEClientInterface) {
|
|
log.Debug().Str("instance", instanceName).Msg("Polling backup tasks")
|
|
|
|
tasks, err := client.GetBackupTasks(ctx)
|
|
if err != nil {
|
|
monErr := errors.WrapAPIError("get_backup_tasks", instanceName, err, 0)
|
|
log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get backup tasks")
|
|
return
|
|
}
|
|
|
|
var backupTasks []models.BackupTask
|
|
for _, task := range tasks {
|
|
// Extract VMID from task ID (format: "UPID:node:pid:starttime:type:vmid:user@realm:")
|
|
vmid := 0
|
|
if task.ID != "" {
|
|
if vmidInt, err := strconv.Atoi(task.ID); err == nil {
|
|
vmid = vmidInt
|
|
}
|
|
}
|
|
|
|
taskID := fmt.Sprintf("%s-%s", instanceName, task.UPID)
|
|
|
|
backupTask := models.BackupTask{
|
|
ID: taskID,
|
|
Node: task.Node,
|
|
Type: task.Type,
|
|
VMID: vmid,
|
|
Status: task.Status,
|
|
StartTime: time.Unix(task.StartTime, 0),
|
|
}
|
|
|
|
if task.EndTime > 0 {
|
|
backupTask.EndTime = time.Unix(task.EndTime, 0)
|
|
}
|
|
|
|
backupTasks = append(backupTasks, backupTask)
|
|
}
|
|
|
|
// Update state with new backup tasks for this instance
|
|
m.state.UpdateBackupTasksForInstance(instanceName, backupTasks)
|
|
}
|
|
|
|
// pollPBSInstance polls a single PBS instance
|
|
func (m *Monitor) pollPBSInstance(ctx context.Context, instanceName string, client *pbs.Client) {
|
|
// Check if context is cancelled
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Debug().Str("instance", instanceName).Msg("Polling cancelled")
|
|
return
|
|
default:
|
|
}
|
|
|
|
log.Debug().Str("instance", instanceName).Msg("Polling PBS instance")
|
|
|
|
// Get instance config
|
|
var instanceCfg *config.PBSInstance
|
|
for _, cfg := range m.config.PBSInstances {
|
|
if cfg.Name == instanceName {
|
|
instanceCfg = &cfg
|
|
break
|
|
}
|
|
}
|
|
if instanceCfg == nil {
|
|
return
|
|
}
|
|
|
|
// Get version/status
|
|
version, err := client.GetVersion(ctx)
|
|
if err != nil {
|
|
monErr := errors.WrapConnectionError("get_pbs_version", instanceName, err)
|
|
log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get PBS version")
|
|
m.state.SetConnectionHealth("pbs-"+instanceName, false)
|
|
|
|
// Track auth failure if it's an authentication error
|
|
if errors.IsAuthError(err) {
|
|
m.recordAuthFailure(instanceName, "pbs")
|
|
}
|
|
return
|
|
}
|
|
|
|
// Reset auth failures on successful connection
|
|
m.resetAuthFailures(instanceName, "pbs")
|
|
m.state.SetConnectionHealth("pbs-"+instanceName, true)
|
|
|
|
pbsInst := models.PBSInstance{
|
|
ID: "pbs-" + instanceName,
|
|
Name: instanceName,
|
|
Host: instanceCfg.Host,
|
|
Status: "online",
|
|
Version: version.Version,
|
|
ConnectionHealth: "healthy",
|
|
LastSeen: time.Now(),
|
|
}
|
|
|
|
// Poll datastores if enabled
|
|
if instanceCfg.MonitorDatastores {
|
|
datastores, err := client.GetDatastores(ctx)
|
|
if err != nil {
|
|
monErr := errors.WrapAPIError("get_datastores", instanceName, err, 0)
|
|
log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get datastores")
|
|
} else {
|
|
for _, ds := range datastores {
|
|
modelDS := models.PBSDatastore{
|
|
Name: ds.Store,
|
|
Total: ds.Total,
|
|
Used: ds.Used,
|
|
Free: ds.Avail,
|
|
Usage: safePercentage(float64(ds.Used), float64(ds.Total)),
|
|
Status: "available",
|
|
}
|
|
|
|
// Discover namespaces for this datastore
|
|
namespaces, err := client.ListNamespaces(ctx, ds.Store, "", 0)
|
|
if err != nil {
|
|
log.Warn().Err(err).
|
|
Str("instance", instanceName).
|
|
Str("datastore", ds.Store).
|
|
Msg("Failed to list namespaces")
|
|
} else {
|
|
// Convert PBS namespaces to model namespaces
|
|
for _, ns := range namespaces {
|
|
nsPath := ns.NS
|
|
if nsPath == "" {
|
|
nsPath = ns.Path
|
|
}
|
|
if nsPath == "" {
|
|
nsPath = ns.Name
|
|
}
|
|
|
|
modelNS := models.PBSNamespace{
|
|
Path: nsPath,
|
|
Parent: ns.Parent,
|
|
Depth: strings.Count(nsPath, "/"),
|
|
}
|
|
modelDS.Namespaces = append(modelDS.Namespaces, modelNS)
|
|
}
|
|
|
|
// Always include root namespace
|
|
hasRoot := false
|
|
for _, ns := range modelDS.Namespaces {
|
|
if ns.Path == "" {
|
|
hasRoot = true
|
|
break
|
|
}
|
|
}
|
|
if !hasRoot {
|
|
modelDS.Namespaces = append([]models.PBSNamespace{{Path: "", Depth: 0}}, modelDS.Namespaces...)
|
|
}
|
|
}
|
|
|
|
pbsInst.Datastores = append(pbsInst.Datastores, modelDS)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Update state
|
|
m.state.UpdatePBSInstances([]models.PBSInstance{pbsInst})
|
|
|
|
// Poll backups if enabled
|
|
if instanceCfg.MonitorBackups {
|
|
log.Info().
|
|
Str("instance", instanceName).
|
|
Int("datastores", len(pbsInst.Datastores)).
|
|
Msg("Polling PBS backups")
|
|
m.pollPBSBackups(ctx, instanceName, client, pbsInst.Datastores)
|
|
} else {
|
|
log.Debug().
|
|
Str("instance", instanceName).
|
|
Msg("PBS backup monitoring disabled")
|
|
}
|
|
}
|
|
|
|
// GetState returns a point-in-time snapshot of everything the monitor is
// currently tracking (nodes, guests, storage, backups, connection health).
func (m *Monitor) GetState() models.StateSnapshot {
	return m.state.GetSnapshot()
}
|
|
|
|
// GetStartTime returns the time at which this Monitor instance was started.
func (m *Monitor) GetStartTime() time.Time {
	return m.startTime
}
|
|
|
|
|
|
// GetGuestMetrics returns historical metrics for a guest over the most recent
// duration window, keyed by metric name (e.g. "cpu", "memory", "disk",
// "netin", "netout").
func (m *Monitor) GetGuestMetrics(guestID string, duration time.Duration) map[string][]MetricPoint {
	return m.metricsHistory.GetAllGuestMetrics(guestID, duration)
}
|
|
|
|
// GetNodeMetrics returns historical metrics of a single metricType for a node
// over the most recent duration window.
func (m *Monitor) GetNodeMetrics(nodeID string, metricType string, duration time.Duration) []MetricPoint {
	return m.metricsHistory.GetNodeMetrics(nodeID, metricType, duration)
}
|
|
|
|
// GetStorageMetrics returns historical metrics for a storage pool over the
// most recent duration window, keyed by metric name (e.g. "usage", "used",
// "total", "avail").
func (m *Monitor) GetStorageMetrics(storageID string, duration time.Duration) map[string][]MetricPoint {
	return m.metricsHistory.GetAllStorageMetrics(storageID, duration)
}
|
|
|
|
// GetAlertManager returns the monitor's alert manager.
func (m *Monitor) GetAlertManager() *alerts.Manager {
	return m.alertManager
}
|
|
|
|
// GetNotificationManager returns the monitor's notification manager.
func (m *Monitor) GetNotificationManager() *notifications.NotificationManager {
	return m.notificationMgr
}
|
|
|
|
// GetConfigPersistence returns the monitor's config persistence manager.
func (m *Monitor) GetConfigPersistence() *config.ConfigPersistence {
	return m.configPersist
}
|
|
|
|
// pollStorageBackups polls backup files from storage
|
|
func (m *Monitor) pollStorageBackups(ctx context.Context, instanceName string, client PVEClientInterface) {
|
|
log.Debug().Str("instance", instanceName).Msg("Polling storage backups")
|
|
|
|
// Get all nodes
|
|
nodes, err := client.GetNodes(ctx)
|
|
if err != nil {
|
|
monErr := errors.WrapConnectionError("get_nodes_for_backups", instanceName, err)
|
|
log.Error().Err(monErr).Str("instance", instanceName).Msg("Failed to get nodes for backup polling")
|
|
return
|
|
}
|
|
|
|
var allBackups []models.StorageBackup
|
|
seenVolids := make(map[string]bool) // Track seen volume IDs to avoid duplicates
|
|
|
|
// For each node, get storage and check content
|
|
for _, node := range nodes {
|
|
if node.Status != "online" {
|
|
continue
|
|
}
|
|
|
|
// Get storage for this node
|
|
storages, err := client.GetStorage(ctx, node.Node)
|
|
if err != nil {
|
|
monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_storage_for_backups", instanceName, err).WithNode(node.Node)
|
|
log.Error().Err(monErr).Str("node", node.Node).Msg("Failed to get storage")
|
|
continue
|
|
}
|
|
|
|
// For each storage that can contain backups
|
|
for _, storage := range storages {
|
|
// Check if storage supports backup content
|
|
if !strings.Contains(storage.Content, "backup") {
|
|
continue
|
|
}
|
|
|
|
// Get storage content
|
|
contents, err := client.GetStorageContent(ctx, node.Node, storage.Storage)
|
|
if err != nil {
|
|
monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_storage_content", instanceName, err).WithNode(node.Node)
|
|
log.Debug().Err(monErr).
|
|
Str("node", node.Node).
|
|
Str("storage", storage.Storage).
|
|
Msg("Failed to get storage content")
|
|
continue
|
|
}
|
|
|
|
// Convert to models
|
|
for _, content := range contents {
|
|
// Skip if we've already seen this backup (shared storage duplicate)
|
|
if seenVolids[content.Volid] {
|
|
continue
|
|
}
|
|
seenVolids[content.Volid] = true
|
|
|
|
// Determine type from volid
|
|
backupType := "unknown"
|
|
if strings.Contains(content.Volid, "qemu") {
|
|
backupType = "qemu"
|
|
} else if strings.Contains(content.Volid, "lxc") {
|
|
backupType = "lxc"
|
|
}
|
|
|
|
// For shared storage (like PBS), use the storage name as node
|
|
// to avoid confusion about which node the backup is on
|
|
backupNode := node.Node
|
|
isPBSStorage := strings.HasPrefix(storage.Storage, "pbs-") || storage.Type == "pbs"
|
|
if isPBSStorage || storage.Shared == 1 {
|
|
backupNode = storage.Storage // Use storage name for shared storage
|
|
}
|
|
|
|
// Check verification status for PBS backups
|
|
verified := false
|
|
verificationInfo := ""
|
|
if isPBSStorage {
|
|
// Check if verified flag is set
|
|
if content.Verified > 0 {
|
|
verified = true
|
|
}
|
|
// Also check verification map if available
|
|
if content.Verification != nil {
|
|
if state, ok := content.Verification["state"].(string); ok {
|
|
verified = (state == "ok")
|
|
verificationInfo = state
|
|
}
|
|
}
|
|
}
|
|
|
|
backup := models.StorageBackup{
|
|
ID: fmt.Sprintf("%s-%s", instanceName, content.Volid),
|
|
Storage: storage.Storage,
|
|
Node: backupNode,
|
|
Type: backupType,
|
|
VMID: content.VMID,
|
|
Time: time.Unix(content.CTime, 0),
|
|
CTime: content.CTime,
|
|
Size: int64(content.Size),
|
|
Format: content.Format,
|
|
Notes: content.Notes,
|
|
Protected: content.Protected > 0,
|
|
Volid: content.Volid,
|
|
IsPBS: isPBSStorage,
|
|
Verified: verified,
|
|
Verification: verificationInfo,
|
|
}
|
|
|
|
allBackups = append(allBackups, backup)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Update state with storage backups for this instance
|
|
m.state.UpdateStorageBackupsForInstance(instanceName, allBackups)
|
|
|
|
log.Debug().
|
|
Str("instance", instanceName).
|
|
Int("count", len(allBackups)).
|
|
Msg("Storage backups polled")
|
|
}
|
|
|
|
// pollGuestSnapshots polls snapshots for all VMs and containers
|
|
func (m *Monitor) pollGuestSnapshots(ctx context.Context, instanceName string, client PVEClientInterface) {
|
|
log.Debug().Str("instance", instanceName).Msg("Polling guest snapshots")
|
|
|
|
// Create a separate context with a longer timeout for snapshot queries
|
|
// Snapshot queries can be slow, especially with many VMs/containers
|
|
snapshotCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
|
defer cancel()
|
|
|
|
// Get current VMs and containers from state
|
|
m.mu.RLock()
|
|
vms := append([]models.VM{}, m.state.VMs...)
|
|
containers := append([]models.Container{}, m.state.Containers...)
|
|
m.mu.RUnlock()
|
|
|
|
var allSnapshots []models.GuestSnapshot
|
|
|
|
// Poll VM snapshots
|
|
for _, vm := range vms {
|
|
// Skip templates
|
|
if vm.Template {
|
|
continue
|
|
}
|
|
|
|
snapshots, err := client.GetVMSnapshots(snapshotCtx, vm.Node, vm.VMID)
|
|
if err != nil {
|
|
// This is common for VMs without snapshots, so use debug level
|
|
monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_vm_snapshots", instanceName, err).WithNode(vm.Node)
|
|
log.Debug().
|
|
Err(monErr).
|
|
Str("node", vm.Node).
|
|
Int("vmid", vm.VMID).
|
|
Msg("Failed to get VM snapshots")
|
|
continue
|
|
}
|
|
|
|
for _, snap := range snapshots {
|
|
snapshot := models.GuestSnapshot{
|
|
ID: fmt.Sprintf("%s-%s-%d-%s", instanceName, vm.Node, vm.VMID, snap.Name),
|
|
Name: snap.Name,
|
|
Node: vm.Node,
|
|
Type: "qemu",
|
|
VMID: vm.VMID,
|
|
Time: time.Unix(snap.SnapTime, 0),
|
|
Description: snap.Description,
|
|
Parent: snap.Parent,
|
|
VMState: false, // TODO: Add VM state support if needed
|
|
}
|
|
|
|
allSnapshots = append(allSnapshots, snapshot)
|
|
}
|
|
}
|
|
|
|
// Poll container snapshots
|
|
for _, ct := range containers {
|
|
// Skip templates
|
|
if ct.Template {
|
|
continue
|
|
}
|
|
|
|
snapshots, err := client.GetContainerSnapshots(snapshotCtx, ct.Node, ct.VMID)
|
|
if err != nil {
|
|
// This is common for containers without snapshots, so use debug level
|
|
monErr := errors.NewMonitorError(errors.ErrorTypeAPI, "get_container_snapshots", instanceName, err).WithNode(ct.Node)
|
|
log.Debug().
|
|
Err(monErr).
|
|
Str("node", ct.Node).
|
|
Int("vmid", ct.VMID).
|
|
Msg("Failed to get container snapshots")
|
|
continue
|
|
}
|
|
|
|
for _, snap := range snapshots {
|
|
snapshot := models.GuestSnapshot{
|
|
ID: fmt.Sprintf("%s-%s-%d-%s", instanceName, ct.Node, ct.VMID, snap.Name),
|
|
Name: snap.Name,
|
|
Node: ct.Node,
|
|
Type: "lxc",
|
|
VMID: ct.VMID,
|
|
Time: time.Unix(snap.SnapTime, 0),
|
|
Description: snap.Description,
|
|
Parent: snap.Parent,
|
|
VMState: false,
|
|
}
|
|
|
|
allSnapshots = append(allSnapshots, snapshot)
|
|
}
|
|
}
|
|
|
|
// Update state with guest snapshots for this instance
|
|
m.state.UpdateGuestSnapshotsForInstance(instanceName, allSnapshots)
|
|
|
|
log.Debug().
|
|
Str("instance", instanceName).
|
|
Int("count", len(allSnapshots)).
|
|
Msg("Guest snapshots polled")
|
|
}
|
|
|
|
// Stop gracefully stops the monitor
|
|
func (m *Monitor) Stop() {
|
|
log.Info().Msg("Stopping monitor")
|
|
|
|
// Stop the alert manager to save history
|
|
if m.alertManager != nil {
|
|
m.alertManager.Stop()
|
|
}
|
|
|
|
// Stop notification manager
|
|
if m.notificationMgr != nil {
|
|
m.notificationMgr.Stop()
|
|
}
|
|
|
|
log.Info().Msg("Monitor stopped")
|
|
}
|
|
|
|
// recordAuthFailure records an authentication failure for a node
|
|
func (m *Monitor) recordAuthFailure(instanceName string, nodeType string) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
nodeID := instanceName
|
|
if nodeType != "" {
|
|
nodeID = nodeType + "-" + instanceName
|
|
}
|
|
|
|
// Increment failure count
|
|
m.authFailures[nodeID]++
|
|
m.lastAuthAttempt[nodeID] = time.Now()
|
|
|
|
log.Warn().
|
|
Str("node", nodeID).
|
|
Int("failures", m.authFailures[nodeID]).
|
|
Msg("Authentication failure recorded")
|
|
|
|
// If we've exceeded the threshold, remove the node
|
|
const maxAuthFailures = 5
|
|
if m.authFailures[nodeID] >= maxAuthFailures {
|
|
log.Error().
|
|
Str("node", nodeID).
|
|
Int("failures", m.authFailures[nodeID]).
|
|
Msg("Maximum authentication failures reached, removing node from state")
|
|
|
|
// Remove from state based on type
|
|
if nodeType == "pve" {
|
|
m.removeFailedPVENode(instanceName)
|
|
} else if nodeType == "pbs" {
|
|
m.removeFailedPBSNode(instanceName)
|
|
}
|
|
|
|
// Reset the counter since we've removed the node
|
|
delete(m.authFailures, nodeID)
|
|
delete(m.lastAuthAttempt, nodeID)
|
|
}
|
|
}
|
|
|
|
// resetAuthFailures resets the failure count for a node after successful auth
|
|
func (m *Monitor) resetAuthFailures(instanceName string, nodeType string) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
nodeID := instanceName
|
|
if nodeType != "" {
|
|
nodeID = nodeType + "-" + instanceName
|
|
}
|
|
|
|
if count, exists := m.authFailures[nodeID]; exists && count > 0 {
|
|
log.Info().
|
|
Str("node", nodeID).
|
|
Int("previousFailures", count).
|
|
Msg("Authentication succeeded, resetting failure count")
|
|
|
|
delete(m.authFailures, nodeID)
|
|
delete(m.lastAuthAttempt, nodeID)
|
|
}
|
|
}
|
|
|
|
// removeFailedPVENode updates a PVE node to show failed authentication status
|
|
func (m *Monitor) removeFailedPVENode(instanceName string) {
|
|
// Create a failed node entry to show in UI with error status
|
|
failedNode := models.Node{
|
|
ID: instanceName + "-failed",
|
|
Name: instanceName,
|
|
Instance: instanceName,
|
|
Status: "offline",
|
|
Type: "node",
|
|
ConnectionHealth: "error",
|
|
LastSeen: time.Now(),
|
|
// Set other fields to zero values to indicate no data
|
|
CPU: 0,
|
|
Memory: models.Memory{},
|
|
Disk: models.Disk{},
|
|
}
|
|
|
|
// Update with just the failed node
|
|
m.state.UpdateNodesForInstance(instanceName, []models.Node{failedNode})
|
|
|
|
// Remove all other resources associated with this instance
|
|
m.state.UpdateVMsForInstance(instanceName, []models.VM{})
|
|
m.state.UpdateContainersForInstance(instanceName, []models.Container{})
|
|
m.state.UpdateStorageForInstance(instanceName, []models.Storage{})
|
|
m.state.UpdateBackupTasksForInstance(instanceName, []models.BackupTask{})
|
|
m.state.UpdateStorageBackupsForInstance(instanceName, []models.StorageBackup{})
|
|
m.state.UpdateGuestSnapshotsForInstance(instanceName, []models.GuestSnapshot{})
|
|
|
|
// Set connection health to false
|
|
m.state.SetConnectionHealth(instanceName, false)
|
|
}
|
|
|
|
// removeFailedPBSNode removes a PBS node and all its resources from state
|
|
func (m *Monitor) removeFailedPBSNode(instanceName string) {
|
|
// Remove PBS instance by passing empty array
|
|
currentInstances := m.state.PBSInstances
|
|
var updatedInstances []models.PBSInstance
|
|
for _, inst := range currentInstances {
|
|
if inst.Name != instanceName {
|
|
updatedInstances = append(updatedInstances, inst)
|
|
}
|
|
}
|
|
m.state.UpdatePBSInstances(updatedInstances)
|
|
|
|
// Remove PBS backups
|
|
m.state.UpdatePBSBackups(instanceName, []models.PBSBackup{})
|
|
|
|
// Set connection health to false
|
|
m.state.SetConnectionHealth("pbs-"+instanceName, false)
|
|
}
|
|
|
|
|
|
// pollPBSBackups fetches all backups from PBS datastores
|
|
func (m *Monitor) pollPBSBackups(ctx context.Context, instanceName string, client *pbs.Client, datastores []models.PBSDatastore) {
|
|
log.Debug().Str("instance", instanceName).Msg("Polling PBS backups")
|
|
|
|
var allBackups []models.PBSBackup
|
|
|
|
// Process each datastore
|
|
for _, ds := range datastores {
|
|
// Get namespace paths
|
|
namespacePaths := make([]string, 0, len(ds.Namespaces))
|
|
for _, ns := range ds.Namespaces {
|
|
namespacePaths = append(namespacePaths, ns.Path)
|
|
}
|
|
|
|
log.Info().
|
|
Str("instance", instanceName).
|
|
Str("datastore", ds.Name).
|
|
Int("namespaces", len(namespacePaths)).
|
|
Strs("namespace_paths", namespacePaths).
|
|
Msg("Processing datastore namespaces")
|
|
|
|
// Fetch backups from all namespaces concurrently
|
|
backupsMap, err := client.ListAllBackups(ctx, ds.Name, namespacePaths)
|
|
if err != nil {
|
|
log.Error().Err(err).
|
|
Str("instance", instanceName).
|
|
Str("datastore", ds.Name).
|
|
Msg("Failed to fetch PBS backups")
|
|
continue
|
|
}
|
|
|
|
// Convert PBS backups to model backups
|
|
for namespace, snapshots := range backupsMap {
|
|
for _, snapshot := range snapshots {
|
|
backupTime := time.Unix(snapshot.BackupTime, 0)
|
|
|
|
// Generate unique ID
|
|
id := fmt.Sprintf("pbs-%s-%s-%s-%s-%s-%d",
|
|
instanceName, ds.Name, namespace,
|
|
snapshot.BackupType, snapshot.BackupID,
|
|
snapshot.BackupTime)
|
|
|
|
// Extract file names from files (which can be strings or objects)
|
|
var fileNames []string
|
|
for _, file := range snapshot.Files {
|
|
switch f := file.(type) {
|
|
case string:
|
|
fileNames = append(fileNames, f)
|
|
case map[string]interface{}:
|
|
if filename, ok := f["filename"].(string); ok {
|
|
fileNames = append(fileNames, filename)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Extract verification status
|
|
verified := false
|
|
if snapshot.Verification != nil {
|
|
switch v := snapshot.Verification.(type) {
|
|
case string:
|
|
verified = v == "ok"
|
|
case map[string]interface{}:
|
|
if state, ok := v["state"].(string); ok {
|
|
verified = state == "ok"
|
|
}
|
|
}
|
|
|
|
// Debug log verification data
|
|
log.Debug().
|
|
Str("vmid", snapshot.BackupID).
|
|
Int64("time", snapshot.BackupTime).
|
|
Interface("verification", snapshot.Verification).
|
|
Bool("verified", verified).
|
|
Msg("PBS backup verification status")
|
|
}
|
|
|
|
backup := models.PBSBackup{
|
|
ID: id,
|
|
Instance: instanceName,
|
|
Datastore: ds.Name,
|
|
Namespace: namespace,
|
|
BackupType: snapshot.BackupType,
|
|
VMID: snapshot.BackupID,
|
|
BackupTime: backupTime,
|
|
Size: snapshot.Size,
|
|
Protected: snapshot.Protected,
|
|
Verified: verified,
|
|
Comment: snapshot.Comment,
|
|
Files: fileNames,
|
|
}
|
|
|
|
allBackups = append(allBackups, backup)
|
|
}
|
|
}
|
|
}
|
|
|
|
log.Info().
|
|
Str("instance", instanceName).
|
|
Int("count", len(allBackups)).
|
|
Msg("PBS backups fetched")
|
|
|
|
// Update state
|
|
m.state.UpdatePBSBackups(instanceName, allBackups)
|
|
}
|