From 3fcdba014a792cc18a260a473815efbc390b0805 Mon Sep 17 00:00:00 2001 From: rcourtman Date: Fri, 3 Oct 2025 15:31:30 +0000 Subject: [PATCH] Improve PBS backup polling performance (#502) --- internal/monitoring/monitor.go | 58 +++++++++++++++++++++++++++--- pkg/pbs/client.go | 64 ++++++++++++++++++++++++++++------ 2 files changed, 106 insertions(+), 16 deletions(-) diff --git a/internal/monitoring/monitor.go b/internal/monitoring/monitor.go index 452d2992d..cd0086f92 100644 --- a/internal/monitoring/monitor.go +++ b/internal/monitoring/monitor.go @@ -71,6 +71,7 @@ type Monitor struct { lastAuthAttempt map[string]time.Time // Track last auth attempt time lastClusterCheck map[string]time.Time // Track last cluster check for standalone nodes persistence *config.ConfigPersistence // Add persistence for saving updated configs + pbsBackupPollers map[string]bool // Track PBS backup polling goroutines per instance runtimeCtx context.Context // Context used while monitor is running wsHub *websocket.Hub // Hub used for broadcasting state } @@ -431,6 +432,7 @@ func New(cfg *config.Config) (*Monitor, error) { lastAuthAttempt: make(map[string]time.Time), lastClusterCheck: make(map[string]time.Time), persistence: config.NewConfigPersistence(cfg.DataPath), + pbsBackupPollers: make(map[string]bool), } // Load saved configurations @@ -3258,11 +3260,57 @@ func (m *Monitor) pollPBSInstance(ctx context.Context, instanceName string, clie // Poll backups if enabled if instanceCfg.MonitorBackups { - log.Info(). - Str("instance", instanceName). - Int("datastores", len(pbsInst.Datastores)). - Msg("Polling PBS backups") - m.pollPBSBackups(ctx, instanceName, client, pbsInst.Datastores) + if len(pbsInst.Datastores) == 0 { + log.Debug(). + Str("instance", instanceName). + Msg("No PBS datastores available for backup polling") + } else { + backupCycles := 10 + if m.config.BackupPollingCycles > 0 { + backupCycles = m.config.BackupPollingCycles + } + + shouldPoll := m.pollCounter%int64(backupCycles) == 0 || m.pollCounter == 1 + if !shouldPoll { + log.Debug(). + Str("instance", instanceName). + Int64("cycle", m.pollCounter). + Int("backupCycles", backupCycles). + Msg("Skipping PBS backup polling this cycle") + } else { + m.mu.Lock() + if m.pbsBackupPollers[instanceName] { + m.mu.Unlock() + log.Debug(). + Str("instance", instanceName). + Msg("PBS backup polling already in progress") + } else { + m.pbsBackupPollers[instanceName] = true + m.mu.Unlock() + + datastoreSnapshot := make([]models.PBSDatastore, len(pbsInst.Datastores)) + copy(datastoreSnapshot, pbsInst.Datastores) + + go func(ds []models.PBSDatastore) { + defer func() { + m.mu.Lock() + delete(m.pbsBackupPollers, instanceName) + m.mu.Unlock() + }() + + log.Info(). + Str("instance", instanceName). + Int("datastores", len(ds)). + Msg("Starting background PBS backup polling") + + backupCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + m.pollPBSBackups(backupCtx, instanceName, client, ds) + }(datastoreSnapshot) + } + } + } } else { log.Debug(). Str("instance", instanceName). diff --git a/pkg/pbs/client.go b/pkg/pbs/client.go index c926f011d..4f3169bf5 100644 --- a/pkg/pbs/client.go +++ b/pkg/pbs/client.go @@ -822,22 +822,64 @@ func (c *Client) ListAllBackups(ctx context.Context, datastore string, namespace Int("groups", len(groups)). Msg("Found backup groups") - var allSnapshots []BackupSnapshot + var ( + allSnapshots []BackupSnapshot + snapshotsMu sync.Mutex + ) - // For each group, get snapshots + groupSem := make(chan struct{}, 5) + var groupWG sync.WaitGroup + + // For each group, get snapshots concurrently with a small worker limit to avoid hammering PBS for _, group := range groups { - snapshots, err := c.ListBackupSnapshots(ctx, datastore, namespace, group.BackupType, group.BackupID) - if err != nil { - log.Error(). + if ctx.Err() != nil { + log.Debug(). Str("datastore", datastore). Str("namespace", namespace). - Str("type", group.BackupType). - Str("id", group.BackupID). - Err(err). - Msg("Failed to list snapshots") - continue + Msg("Context cancelled before completing snapshot fetch") + break } - allSnapshots = append(allSnapshots, snapshots...) + + group := group + + groupWG.Add(1) + go func() { + defer groupWG.Done() + + select { + case groupSem <- struct{}{}: + case <-ctx.Done(): + return + } + defer func() { <-groupSem }() + + snapshots, err := c.ListBackupSnapshots(ctx, datastore, namespace, group.BackupType, group.BackupID) + if err != nil { + log.Error(). + Str("datastore", datastore). + Str("namespace", namespace). + Str("type", group.BackupType). + Str("id", group.BackupID). + Err(err). + Msg("Failed to list snapshots") + return + } + + if len(snapshots) == 0 { + return + } + + snapshotsMu.Lock() + allSnapshots = append(allSnapshots, snapshots...) + snapshotsMu.Unlock() + }() + } + + groupWG.Wait() + + if ctx.Err() != nil { + resultCh <- namespaceResult{namespace: namespace, err: ctx.Err()} + return } resultCh <- namespaceResult{