Hydrate shared Ceph storage usage before alerts

rcourtman 2026-04-05 19:48:27 +01:00
parent d2f8800fc3
commit 1430b52b97
3 changed files with 243 additions and 27 deletions
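When Ceph-backed storage is detected, the poller now fetches the Ceph status/DF pair up front and fills each matching storage row's Used, Free, Total and Usage from the pool stats before metrics are recorded and alerts are checked. A rough, self-contained illustration of the arithmetic the new hydrateCephStorageUsageFromDF helper applies; the struct and the safePercentage stand-in below are sketches only (the real helper's divide-by-zero guard is assumed, not shown in this diff):

package main

import "fmt"

// Illustrative stand-in for the proxmox.CephDFPoolStat fields the real helper reads.
type poolStats struct {
	BytesUsed int64
	MaxAvail  int64
}

// Assumption about the real safePercentage: divide, guarding against an empty pool.
func safePercentage(part, whole float64) float64 {
	if whole <= 0 {
		return 0
	}
	return part / whole * 100
}

func main() {
	stats := poolStats{BytesUsed: 200, MaxAvail: 800} // values mirror the test fixture below
	used := stats.BytesUsed
	free := stats.MaxAvail
	total := used + free // ceph df reports used and available; capacity is derived
	fmt.Printf("used=%d free=%d total=%d usage=%.1f%%\n",
		used, free, total, safePercentage(float64(used), float64(total)))
	// Output: used=200 free=800 total=1000 usage=20.0%
}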

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"slices"
"strings"
"time"
@@ -22,6 +23,101 @@ func isCephStorageType(storageType string) bool {
}
}
func cephPollContext(ctx context.Context) (context.Context, context.CancelFunc) {
if deadline, ok := ctx.Deadline(); ok && time.Until(deadline) <= 15*time.Second {
return ctx, func() {}
}
return context.WithTimeout(ctx, 15*time.Second)
}
func fetchCephClusterData(ctx context.Context, instanceName string, client PVEClientInterface) (*proxmox.CephStatus, *proxmox.CephDF, error) {
cephCtx, cancel := cephPollContext(ctx)
defer cancel()
status, err := client.GetCephStatus(cephCtx)
if err != nil {
log.Debug().Err(err).Str("instance", instanceName).Msg("Ceph status unavailable - preserving previous Ceph state")
return nil, nil, err
}
if status == nil {
return nil, nil, nil
}
df, err := client.GetCephDF(cephCtx)
if err != nil {
log.Debug().Err(err).Str("instance", instanceName).Msg("Ceph DF unavailable - continuing with status-only data")
}
return status, df, nil
}
func normalizeCephPoolKey(value string) string {
return strings.ToLower(strings.TrimSpace(value))
}
func cephPoolLookupCandidates(storage models.Storage) []string {
candidates := make([]string, 0, 2)
if pool := normalizeCephPoolKey(storage.Pool); pool != "" {
candidates = append(candidates, pool)
}
if name := normalizeCephPoolKey(storage.Name); name != "" {
candidates = append(candidates, name)
}
return slices.Compact(candidates)
}
func hydrateCephStorageUsageFromDF(storage []models.Storage, df *proxmox.CephDF) bool {
if len(storage) == 0 || df == nil || len(df.Data.Pools) == 0 {
return false
}
poolsByName := make(map[string]proxmox.CephDFPool, len(df.Data.Pools))
for _, pool := range df.Data.Pools {
key := normalizeCephPoolKey(pool.Name)
if key == "" {
continue
}
poolsByName[key] = pool
}
updated := false
for idx := range storage {
if !isCephStorageType(storage[idx].Type) {
continue
}
var pool proxmox.CephDFPool
found := false
for _, candidate := range cephPoolLookupCandidates(storage[idx]) {
match, ok := poolsByName[candidate]
if !ok {
continue
}
pool = match
found = true
break
}
if !found {
continue
}
used := int64(pool.Stats.BytesUsed)
free := int64(pool.Stats.MaxAvail)
total := used + free
if total <= 0 {
continue
}
storage[idx].Used = used
storage[idx].Free = free
storage[idx].Total = total
storage[idx].Usage = safePercentage(float64(used), float64(total))
updated = true
}
return updated
}
// pollCephCluster gathers Ceph cluster information when Ceph-backed storage is detected.
func (m *Monitor) pollCephCluster(ctx context.Context, instanceName string, client PVEClientInterface, cephDetected bool) {
if !cephDetected {
@@ -30,29 +126,16 @@ func (m *Monitor) pollCephCluster(ctx context.Context, instanceName string, clie
return
}
cephCtx := ctx
if deadline, ok := ctx.Deadline(); !ok || time.Until(deadline) > 15*time.Second {
var cancel context.CancelFunc
cephCtx, cancel = context.WithTimeout(ctx, 15*time.Second)
defer cancel()
}
status, err := client.GetCephStatus(cephCtx)
status, df, err := fetchCephClusterData(ctx, instanceName, client)
if err != nil {
log.Debug().Err(err).Str("instance", instanceName).Msg("Ceph status unavailable preserving previous Ceph state")
return
}
if status == nil {
log.Debug().Str("instance", instanceName).Msg("Ceph status response empty clearing cached Ceph state")
log.Debug().Str("instance", instanceName).Msg("Ceph status response empty - clearing cached Ceph state")
m.state.UpdateCephClustersForInstance(instanceName, []models.CephCluster{})
return
}
df, err := client.GetCephDF(cephCtx)
if err != nil {
log.Debug().Err(err).Str("instance", instanceName).Msg("Ceph DF unavailable continuing with status-only data")
}
cluster := buildCephClusterModel(instanceName, status, df)
if cluster.ID == "" {
// Ensure the cluster has a stable identifier; fall back to instance name.

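The deadline handling that previously lived inline in pollCephCluster is now shared through cephPollContext, so the cluster poll and the new storage-hydration path use the same 15-second bound. A minimal standalone sketch of that pattern, using only the standard library (names below are illustrative, not the Pulse code):

package main

import (
	"context"
	"fmt"
	"time"
)

// boundedContext mirrors cephPollContext: keep the caller's context when its
// deadline is already at or under the cap, otherwise derive a capped child.
func boundedContext(ctx context.Context, limit time.Duration) (context.Context, context.CancelFunc) {
	if deadline, ok := ctx.Deadline(); ok && time.Until(deadline) <= limit {
		return ctx, func() {} // no-op cancel keeps "defer cancel()" valid at every call site
	}
	return context.WithTimeout(ctx, limit)
}

func main() {
	parent, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// The parent deadline (5s) is already tighter than the 15s cap, so it is reused as-is.
	child, childCancel := boundedContext(parent, 15*time.Second)
	defer childCancel()

	deadline, _ := child.Deadline()
	fmt.Println("effective timeout:", time.Until(deadline).Round(time.Second))
}

Returning a no-op cancel when the parent is reused keeps every call site free to defer cancel() unconditionally.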

@@ -1965,6 +1965,30 @@ func (m *Monitor) pollStorageWithNodes(ctx context.Context, instanceName string,
}
}
if !cephDetected {
for _, storage := range allStorage {
if isCephStorageType(storage.Type) {
cephDetected = true
break
}
}
}
var cephStatus *proxmox.CephStatus
var cephDF *proxmox.CephDF
cephFetchAttempted := false
cephFetchFailed := false
if (instanceCfg == nil || !instanceCfg.DisableCeph) && cephDetected {
cephFetchAttempted = true
var err error
cephStatus, cephDF, err = fetchCephClusterData(ctx, instanceName, client)
if err != nil {
cephFetchFailed = true
} else if cephStatus != nil {
hydrateCephStorageUsageFromDF(allStorage, cephDF)
}
}
// Record metrics and check alerts for all storage devices
for _, storage := range allStorage {
if m.metricsHistory != nil {
@@ -1988,21 +2012,25 @@ func (m *Monitor) pollStorageWithNodes(ctx context.Context, instanceName string,
}
}
if !cephDetected {
for _, storage := range allStorage {
if isCephStorageType(storage.Type) {
cephDetected = true
break
}
}
}
// Update state with all storage
m.state.UpdateStorageForInstance(storageInstanceName, allStorage)
// Poll Ceph cluster data after refreshing storage information
if instanceCfg == nil || !instanceCfg.DisableCeph {
m.pollCephCluster(ctx, instanceName, client, cephDetected)
switch {
case !cephDetected:
m.state.UpdateCephClustersForInstance(instanceName, []models.CephCluster{})
case cephFetchAttempted && cephFetchFailed:
// Preserve previous Ceph state when the refresh fails.
case cephFetchAttempted && cephStatus == nil:
m.state.UpdateCephClustersForInstance(instanceName, []models.CephCluster{})
default:
cluster := buildCephClusterModel(instanceName, cephStatus, cephDF)
if cluster.ID == "" {
cluster.ID = instanceName
}
m.state.UpdateCephClustersForInstance(instanceName, []models.CephCluster{cluster})
}
}
duration := time.Since(startTime)

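Hydration only touches rows whose pool appears in the ceph df response; the lookup key is the normalized storage pool, with the storage name as a fallback and duplicates compacted. A small self-contained sketch of that matching (function names are illustrative; slices.Compact needs Go 1.21+):

package main

import (
	"fmt"
	"slices"
	"strings"
)

func normalizeKey(v string) string { return strings.ToLower(strings.TrimSpace(v)) }

// lookupCandidates mirrors cephPoolLookupCandidates: explicit pool first,
// storage name second; with at most two entries any duplicate is adjacent,
// so slices.Compact is enough to de-duplicate.
func lookupCandidates(pool, name string) []string {
	candidates := make([]string, 0, 2)
	if p := normalizeKey(pool); p != "" {
		candidates = append(candidates, p)
	}
	if n := normalizeKey(name); n != "" {
		candidates = append(candidates, n)
	}
	return slices.Compact(candidates)
}

func main() {
	fmt.Println(lookupCandidates(" Ceph-Pool ", "ceph-pool")) // [ceph-pool]
	fmt.Println(lookupCandidates("", "Ceph-Shared"))          // [ceph-shared]
}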

@@ -17,6 +17,8 @@ type fakeStorageClient struct {
allStorage []proxmox.Storage
storageByNode map[string][]proxmox.Storage
zfsPoolsByNode map[string][]proxmox.ZFSPoolInfo
cephStatus *proxmox.CephStatus
cephDF *proxmox.CephDF
}
func (f *fakeStorageClient) GetNodes(ctx context.Context) ([]proxmox.Node, error) {
@@ -137,11 +139,11 @@ func (f *fakeStorageClient) GetDisks(ctx context.Context, node string) ([]proxmo
}
func (f *fakeStorageClient) GetCephStatus(ctx context.Context) (*proxmox.CephStatus, error) {
return nil, nil
return f.cephStatus, nil
}
func (f *fakeStorageClient) GetCephDF(ctx context.Context) (*proxmox.CephDF, error) {
return nil, nil
return f.cephDF, nil
}
func (f *fakeStorageClient) GetNodePendingUpdates(ctx context.Context, node string) ([]proxmox.AptPackage, error) {
@@ -436,6 +438,109 @@ func TestPollStorageWithNodesOptimizedSynthesizesSharedClusterOnlyStorage(t *tes
}
}
func TestPollStorageWithNodesOptimizedHydratesSharedCephStorageFromDF(t *testing.T) {
t.Setenv("PULSE_DATA_DIR", t.TempDir())
monitor := &Monitor{
state: &models.State{},
metricsHistory: NewMetricsHistory(16, time.Hour),
alertManager: alerts.NewManager(),
}
t.Cleanup(func() {
monitor.alertManager.Stop()
})
cfg := monitor.alertManager.GetConfig()
cfg.MinimumDelta = 0
if cfg.TimeThresholds == nil {
cfg.TimeThresholds = make(map[string]int)
}
cfg.TimeThresholds["storage"] = 0
cfg.StorageDefault = alerts.HysteresisThreshold{Trigger: 10, Clear: 5}
monitor.alertManager.UpdateConfig(cfg)
cephStorage := proxmox.Storage{
Storage: "ceph-shared",
Type: "rbd",
Content: "images,rootdir",
Nodes: "pve1,pve2",
Pool: "ceph-pool",
}
client := &fakeStorageClient{
allStorage: []proxmox.Storage{cephStorage},
storageByNode: map[string][]proxmox.Storage{
"pve1": {},
"pve2": {},
},
cephStatus: &proxmox.CephStatus{
FSID: "ceph-fsid",
PGMap: proxmox.CephPGMap{
BytesTotal: 1000,
BytesUsed: 200,
BytesAvail: 800,
},
},
cephDF: &proxmox.CephDF{
Data: proxmox.CephDFData{
Pools: []proxmox.CephDFPool{
{
ID: 1,
Name: "ceph-pool",
Stats: proxmox.CephDFPoolStat{
BytesUsed: 200,
MaxAvail: 800,
PercentUsed: 20,
},
},
},
},
},
}
nodes := []proxmox.Node{
{Node: "pve1", Status: "online"},
{Node: "pve2", Status: "online"},
}
monitor.pollStorageWithNodes(context.Background(), "inst1", client, nodes)
var shared *models.Storage
for i := range monitor.state.Storage {
if monitor.state.Storage[i].ID == "inst1-cluster-ceph-shared" {
shared = &monitor.state.Storage[i]
break
}
}
if shared == nil {
t.Fatal("expected synthesized shared Ceph storage entry")
}
if shared.Total != 1000 {
t.Fatalf("shared storage total = %d, want 1000", shared.Total)
}
if shared.Used != 200 {
t.Fatalf("shared storage used = %d, want 200", shared.Used)
}
if shared.Free != 800 {
t.Fatalf("shared storage free = %d, want 800", shared.Free)
}
if diff := math.Abs(shared.Usage - 20); diff > 0.001 {
t.Fatalf("shared storage usage = %.4f, want 20", shared.Usage)
}
alerts := monitor.alertManager.GetActiveAlerts()
found := false
for _, alert := range alerts {
if alert.ID == "inst1-cluster-ceph-shared-usage" {
found = true
break
}
}
if !found {
t.Fatal("expected shared Ceph storage usage alert to be active")
}
}
func TestPollStorageWithNodesOptimizedAttachesZFSPoolForDirStorageOnDatasetPath(t *testing.T) {
t.Setenv("PULSE_DATA_DIR", t.TempDir())