Mirror of https://github.com/rcourtman/Pulse.git, synced 2026-05-09 19:32:24 +00:00
Improve PBS backup polling performance (#502)
parent f61cbb3508
commit 3fcdba014a
2 changed files with 106 additions and 16 deletions
@@ -71,6 +71,7 @@ type Monitor struct {
 	lastAuthAttempt  map[string]time.Time      // Track last auth attempt time
 	lastClusterCheck map[string]time.Time      // Track last cluster check for standalone nodes
 	persistence      *config.ConfigPersistence // Add persistence for saving updated configs
+	pbsBackupPollers map[string]bool           // Track PBS backup polling goroutines per instance
 	runtimeCtx       context.Context           // Context used while monitor is running
 	wsHub            *websocket.Hub            // Hub used for broadcasting state
 }
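The new pbsBackupPollers map is the bookkeeping behind the deduplication logic added further down: the monitor records which instances have a backup poll in flight, guarded by the monitor's existing mutex, and declines to launch a second one. It is initialized in the constructor: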
@@ -431,6 +432,7 @@ func New(cfg *config.Config) (*Monitor, error) {
 		lastAuthAttempt:  make(map[string]time.Time),
 		lastClusterCheck: make(map[string]time.Time),
 		persistence:      config.NewConfigPersistence(cfg.DataPath),
+		pbsBackupPollers: make(map[string]bool),
 	}

 	// Load saved configurations
@@ -3258,11 +3260,57 @@ func (m *Monitor) pollPBSInstance(ctx context.Context, instanceName string, clie

 	// Poll backups if enabled
 	if instanceCfg.MonitorBackups {
-		log.Info().
-			Str("instance", instanceName).
-			Int("datastores", len(pbsInst.Datastores)).
-			Msg("Polling PBS backups")
-		m.pollPBSBackups(ctx, instanceName, client, pbsInst.Datastores)
+		if len(pbsInst.Datastores) == 0 {
+			log.Debug().
+				Str("instance", instanceName).
+				Msg("No PBS datastores available for backup polling")
+		} else {
+			backupCycles := 10
+			if m.config.BackupPollingCycles > 0 {
+				backupCycles = m.config.BackupPollingCycles
+			}
+
+			shouldPoll := m.pollCounter%int64(backupCycles) == 0 || m.pollCounter == 1
+			if !shouldPoll {
+				log.Debug().
+					Str("instance", instanceName).
+					Int64("cycle", m.pollCounter).
+					Int("backupCycles", backupCycles).
+					Msg("Skipping PBS backup polling this cycle")
+			} else {
+				m.mu.Lock()
+				if m.pbsBackupPollers[instanceName] {
+					m.mu.Unlock()
+					log.Debug().
+						Str("instance", instanceName).
+						Msg("PBS backup polling already in progress")
+				} else {
+					m.pbsBackupPollers[instanceName] = true
+					m.mu.Unlock()
+
+					datastoreSnapshot := make([]models.PBSDatastore, len(pbsInst.Datastores))
+					copy(datastoreSnapshot, pbsInst.Datastores)
+
+					go func(ds []models.PBSDatastore) {
+						defer func() {
+							m.mu.Lock()
+							delete(m.pbsBackupPollers, instanceName)
+							m.mu.Unlock()
+						}()
+
+						log.Info().
+							Str("instance", instanceName).
+							Int("datastores", len(ds)).
+							Msg("Starting background PBS backup polling")
+
+						backupCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+						defer cancel()
+
+						m.pollPBSBackups(backupCtx, instanceName, client, ds)
+					}(datastoreSnapshot)
+				}
+			}
+		}
 	} else {
 		log.Debug().
 			Str("instance", instanceName).
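The hunk above layers two guards in front of the expensive backup poll: a cycle gate (backups are fetched only every backupCycles monitor cycles, defaulting to 10, with the first cycle always polled), and the per-instance in-flight flag so a slow poll can never stack up behind the next one. The work itself moves to a goroutine with a detached 5-minute context, operating on a copied datastore slice. A minimal standalone sketch of the same pattern follows; the names (poller, maybePoll, job) and the demo loop are illustrative, not Pulse's actual API:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// poller demonstrates the two gates used in the commit: run the expensive
// job only every `cycles` ticks, and never run two jobs for the same key
// at once. Assumes maybePoll is called from a single monitoring loop.
type poller struct {
    mu       sync.Mutex
    inFlight map[string]bool
    counter  int64
    cycles   int64
}

func (p *poller) maybePoll(key string, job func(ctx context.Context)) {
    p.counter++
    // Gate 1: skip most cycles, but always poll on the very first one.
    if p.counter%p.cycles != 0 && p.counter != 1 {
        return
    }

    // Gate 2: per-key deduplication under the mutex.
    p.mu.Lock()
    if p.inFlight[key] {
        p.mu.Unlock()
        return // the previous poll for this key is still running
    }
    p.inFlight[key] = true
    p.mu.Unlock()

    go func() {
        defer func() {
            p.mu.Lock()
            delete(p.inFlight, key)
            p.mu.Unlock()
        }()
        // Detach from the caller's context but cap total runtime,
        // mirroring the 5-minute timeout in the commit.
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
        defer cancel()
        job(ctx)
    }()
}

func main() {
    p := &poller{inFlight: make(map[string]bool), cycles: 10}
    for i := 0; i < 30; i++ {
        p.maybePoll("pbs-main", func(ctx context.Context) {
            fmt.Println("polling backups...")
            time.Sleep(50 * time.Millisecond)
        })
        time.Sleep(10 * time.Millisecond)
    }
    time.Sleep(200 * time.Millisecond) // let the last background job finish
}

Copying the datastore slice before launching the goroutine is the detail that keeps the background worker from reading state the monitor loop may overwrite mid-poll.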
@@ -822,22 +822,64 @@ func (c *Client) ListAllBackups(ctx context.Context, datastore string, namespace
 		Int("groups", len(groups)).
 		Msg("Found backup groups")

-	var allSnapshots []BackupSnapshot
+	var (
+		allSnapshots []BackupSnapshot
+		snapshotsMu  sync.Mutex
+	)

-	// For each group, get snapshots
+	groupSem := make(chan struct{}, 5)
+	var groupWG sync.WaitGroup
+
+	// For each group, get snapshots concurrently with a small worker limit to avoid hammering PBS
 	for _, group := range groups {
-		snapshots, err := c.ListBackupSnapshots(ctx, datastore, namespace, group.BackupType, group.BackupID)
-		if err != nil {
-			log.Error().
+		if ctx.Err() != nil {
+			log.Debug().
 				Str("datastore", datastore).
 				Str("namespace", namespace).
-				Str("type", group.BackupType).
-				Str("id", group.BackupID).
-				Err(err).
-				Msg("Failed to list snapshots")
-			continue
+				Msg("Context cancelled before completing snapshot fetch")
+			break
 		}
-		allSnapshots = append(allSnapshots, snapshots...)
+
+		group := group
+
+		groupWG.Add(1)
+		go func() {
+			defer groupWG.Done()
+
+			select {
+			case groupSem <- struct{}{}:
+			case <-ctx.Done():
+				return
+			}
+			defer func() { <-groupSem }()
+
+			snapshots, err := c.ListBackupSnapshots(ctx, datastore, namespace, group.BackupType, group.BackupID)
+			if err != nil {
+				log.Error().
+					Str("datastore", datastore).
+					Str("namespace", namespace).
+					Str("type", group.BackupType).
+					Str("id", group.BackupID).
+					Err(err).
+					Msg("Failed to list snapshots")
+				return
+			}
+
+			if len(snapshots) == 0 {
+				return
+			}
+
+			snapshotsMu.Lock()
+			allSnapshots = append(allSnapshots, snapshots...)
+			snapshotsMu.Unlock()
+		}()
 	}

+	groupWG.Wait()
+
+	if ctx.Err() != nil {
+		resultCh <- namespaceResult{namespace: namespace, err: ctx.Err()}
+		return
+	}
+
 	resultCh <- namespaceResult{
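In the PBS client, ListAllBackups stops fetching snapshot groups serially and fans out instead: a buffered channel acts as a counting semaphore capping concurrency at five in-flight requests, a sync.WaitGroup joins the workers, and a mutex serializes appends to the shared result slice. A self-contained sketch of that bounded fan-out, assuming a placeholder fetch function in place of ListBackupSnapshots and string items in place of BackupSnapshot:

package main

import (
    "context"
    "fmt"
    "sync"
)

// fetchAll runs fetch for every group with at most maxWorkers in flight,
// mirroring the semaphore + WaitGroup + mutex shape in the commit.
func fetchAll(ctx context.Context, groups []string, maxWorkers int,
    fetch func(ctx context.Context, group string) ([]string, error)) []string {

    var (
        results []string
        mu      sync.Mutex
        wg      sync.WaitGroup
    )
    sem := make(chan struct{}, maxWorkers) // counting semaphore

    for _, group := range groups {
        if ctx.Err() != nil {
            break // stop scheduling new work once cancelled
        }
        group := group // capture loop variable (pre-Go 1.22 idiom, as in the diff)
        wg.Add(1)
        go func() {
            defer wg.Done()
            // Acquire a semaphore slot, or bail out on cancellation.
            select {
            case sem <- struct{}{}:
            case <-ctx.Done():
                return
            }
            defer func() { <-sem }()

            items, err := fetch(ctx, group)
            if err != nil || len(items) == 0 {
                return // the real code logs the error; the sketch just skips
            }
            mu.Lock()
            results = append(results, items...)
            mu.Unlock()
        }()
    }

    wg.Wait()
    return results
}

func main() {
    groups := []string{"vm/100", "vm/101", "ct/200"}
    out := fetchAll(context.Background(), groups, 5,
        func(ctx context.Context, g string) ([]string, error) {
            return []string{g + "/snap1", g + "/snap2"}, nil
        })
    fmt.Println(len(out), "snapshots")
}

The five-slot semaphore bounds load on the PBS API while still overlapping network round-trips, and because cancellation is checked both before scheduling each worker and while waiting for a slot, a cancelled context stops new and queued work alike.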