Mirror of https://github.com/rcourtman/Pulse.git, synced 2026-04-28 03:20:11 +00:00
Migrate guest alerts across node moves
parent 387a79bc2d
commit 65118b4fc4
4 changed files with 348 additions and 3 deletions
@@ -5341,6 +5341,36 @@ func BuildGuestKey(instance, node string, vmid int) string {
 	return fmt.Sprintf("%s:%s:%d", instance, node, vmid)
 }
 
+func isGuestMetricResourceType(resourceType string) bool {
+	switch strings.TrimSpace(resourceType) {
+	case "VM", "Container":
+		return true
+	default:
+		return false
+	}
+}
+
+func parseGuestAlertVMID(resourceID string) (int, bool) {
+	resourceID = strings.TrimSpace(resourceID)
+	if resourceID == "" {
+		return 0, false
+	}
+
+	if idx := strings.LastIndex(resourceID, ":"); idx >= 0 && idx < len(resourceID)-1 {
+		if vmid, err := strconv.Atoi(resourceID[idx+1:]); err == nil {
+			return vmid, true
+		}
+	}
+
+	if idx := strings.LastIndex(resourceID, "-"); idx >= 0 && idx < len(resourceID)-1 {
+		if vmid, err := strconv.Atoi(resourceID[idx+1:]); err == nil {
+			return vmid, true
+		}
+	}
+
+	return 0, false
+}
+
 func storageOverrideLookupKeys(storage models.Storage) []string {
 	keys := make([]string, 0, 1+len(storage.NodeIDs)+len(storage.Nodes))
 	seen := make(map[string]struct{})

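Note: parseGuestAlertVMID accepts both resource-ID shapes that appear in this diff, the colon-delimited BuildGuestKey form ("instance:node:vmid") and the dash-delimited, node-free form ("instance-vmid") used by the threshold override later in this diff. A minimal sketch of the expected results (the IDs are illustrative):

	vmid, ok := parseGuestAlertVMID("pve1:node1:101") // 101, true  (BuildGuestKey form)
	vmid, ok = parseGuestAlertVMID("pve1-101")        // 101, true  (dash-delimited form)
	vmid, ok = parseGuestAlertVMID("pve1")            // 0, false   (no numeric suffix)
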
@@ -6688,6 +6718,14 @@ func (m *Manager) checkMetric(resourceID, resourceName, node, instance, resource
 	defer m.mu.Unlock()
 
 	existingAlert, exists := m.activeAlerts[alertID]
+	migratedAlertIdentity := false
+	if !exists && isGuestMetricResourceType(resourceType) {
+		if migrated := m.migrateGuestMetricAlertNoLock(alertID, resourceID, resourceName, node, instance, metricType); migrated != nil {
+			existingAlert = migrated
+			exists = true
+			migratedAlertIdentity = true
+		}
+	}
 	monitorOnly := opts != nil && opts.MonitorOnly
 
 	// Check for suppression

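Note: the fallback exists because guest alert IDs embed the node. Per BuildGuestKey plus the "-<metric>" suffix used in the tests below, the same guest and metric yield a different alertID once the guest moves, so the plain map lookup misses. A sketch (VMID 101 is illustrative):

	oldID := BuildGuestKey("pve1", "node1", 101) + "-cpu" // "pve1:node1:101-cpu"
	newID := BuildGuestKey("pve1", "node2", 101) + "-cpu" // "pve1:node2:101-cpu"
	// m.activeAlerts[newID] misses although the guest and metric are unchanged,
	// so checkMetric now falls back to migrateGuestMetricAlertNoLock instead of
	// effectively starting a fresh alert alongside the stale one.
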
@@ -7018,6 +7056,100 @@ func (m *Manager) checkMetric(resourceID, resourceName, node, instance, resource
 			}
 		}
 	}
+
+	if migratedAlertIdentity {
+		m.saveActiveAlertsAsync("guest-alert-node-move")
+	}
 }
+
+func (m *Manager) migrateGuestMetricAlertNoLock(alertID, resourceID, resourceName, node, instance, metricType string) *Alert {
+	currentVMID, ok := parseGuestAlertVMID(resourceID)
+	if !ok {
+		return nil
+	}
+
+	normalizedInstance := strings.TrimSpace(instance)
+	for existingID, alert := range m.activeAlerts {
+		if existingID == alertID || alert == nil {
+			continue
+		}
+		if alert.Type != metricType {
+			continue
+		}
+		if strings.TrimSpace(alert.Instance) != normalizedInstance {
+			continue
+		}
+
+		alertVMID, ok := parseGuestAlertVMID(alert.ResourceID)
+		if !ok || alertVMID != currentVMID {
+			continue
+		}
+
+		oldAlertID := alert.ID
+		delete(m.activeAlerts, existingID)
+
+		alert.ID = alertID
+		alert.ResourceID = resourceID
+		alert.ResourceName = resourceName
+		alert.Node = node
+		alert.Instance = instance
+		if dn := m.resolveNodeDisplayName(instance, node); dn != "" {
+			alert.NodeDisplayName = dn
+		} else {
+			alert.NodeDisplayName = ""
+		}
+
+		m.activeAlerts[alertID] = alert
+		m.moveAlertTrackingStateNoLock(oldAlertID, alertID, alert)
+
+		log.Info().
+			Str("oldAlertID", oldAlertID).
+			Str("newAlertID", alertID).
+			Str("resource", resourceName).
+			Str("metric", metricType).
+			Msg("Migrated guest alert to current node identity")
+
+		return alert
+	}
+
+	return nil
+}
+
+func (m *Manager) moveAlertTrackingStateNoLock(oldAlertID, newAlertID string, alert *Alert) {
+	if oldAlertID == "" || newAlertID == "" || oldAlertID == newAlertID || alert == nil {
+		return
+	}
+
+	if pending, exists := m.pendingAlerts[oldAlertID]; exists {
+		delete(m.pendingAlerts, oldAlertID)
+		m.pendingAlerts[newAlertID] = pending
+	}
+	if recent, exists := m.recentAlerts[oldAlertID]; exists {
+		delete(m.recentAlerts, oldAlertID)
+		m.recentAlerts[newAlertID] = recent
+	}
+	if until, exists := m.suppressedUntil[oldAlertID]; exists {
+		delete(m.suppressedUntil, oldAlertID)
+		m.suppressedUntil[newAlertID] = until
+	}
+	if rateLimit, exists := m.alertRateLimit[oldAlertID]; exists {
+		delete(m.alertRateLimit, oldAlertID)
+		m.alertRateLimit[newAlertID] = rateLimit
+	}
+	if ack, exists := m.ackState[oldAlertID]; exists {
+		delete(m.ackState, oldAlertID)
+		m.ackState[newAlertID] = ack
+	}
+	if flapping, exists := m.flappingHistory[oldAlertID]; exists {
+		delete(m.flappingHistory, oldAlertID)
+		m.flappingHistory[newAlertID] = flapping
+	}
+	if active, exists := m.flappingActive[oldAlertID]; exists {
+		delete(m.flappingActive, oldAlertID)
+		m.flappingActive[newAlertID] = active
+	}
+
+	m.historyManager.MigrateActiveAlert(oldAlertID, *alert)
+}
 
 func sanitizeAlertKey(label string) string {

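Note: moveAlertTrackingStateNoLock applies the same re-key step to seven tracking maps. A generic restatement of that step (a hypothetical helper, not part of this commit):

	func moveKey[V any](m map[string]V, oldKey, newKey string) {
		// Move the entry under oldKey, if any, to newKey.
		if v, exists := m[oldKey]; exists {
			delete(m, oldKey)
			m[newKey] = v
		}
	}

	// e.g. moveKey(m.pendingAlerts, oldAlertID, newAlertID)
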
@@ -148,6 +148,22 @@ func (hm *HistoryManager) UpdateAlertLastSeen(alertID string, lastSeen time.Time
 	}
 }
 
+// MigrateActiveAlert updates the most recent history entry for an in-flight
+// alert when its runtime identity changes (for example, a VM moves nodes and
+// its node-scoped alert ID changes). This preserves a single history record so
+// the eventual resolution updates the correct entry.
+func (hm *HistoryManager) MigrateActiveAlert(oldAlertID string, updated Alert) {
+	hm.mu.Lock()
+	defer hm.mu.Unlock()
+
+	for i := len(hm.history) - 1; i >= 0; i-- {
+		if hm.history[i].Alert.ID == oldAlertID {
+			hm.history[i].Alert = *updated.Clone()
+			return
+		}
+	}
+}
+
 // GetHistory returns alert history within the specified time range
 func (hm *HistoryManager) GetHistory(since time.Time, limit int) []Alert {
 	hm.mu.RLock()

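Note: the reverse scan matters when the same alert ID recurs in history, because an earlier, already-resolved episode must stay untouched while the in-flight entry is rewritten. A sketch (illustrative entries; updated stands for the already-rewritten Alert passed in by the manager):

	hm.history = []HistoryEntry{
		{Alert: Alert{ID: "pve1:node1:100-cpu"}}, // earlier episode, already resolved
		{Alert: Alert{ID: "pve1:node1:100-cpu"}}, // in-flight episode
	}
	hm.MigrateActiveAlert("pve1:node1:100-cpu", updated)
	// Only the newest match (index 1) is rewritten; the scan returns before it
	// reaches index 0. Clone() stores a copy so the history entry does not
	// alias the live *Alert held by the manager.
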
@@ -157,6 +157,55 @@ func TestOnAlert(t *testing.T) {
 	}
 }
 
+func TestMigrateActiveAlert(t *testing.T) {
+	hm := newTestHistoryManager(t)
+
+	start := time.Now().Add(-10 * time.Minute)
+	hm.history = []HistoryEntry{
+		{
+			Alert: Alert{
+				ID:         "older-alert",
+				ResourceID: "pve1:node0:100",
+				Node:       "node0",
+				Instance:   "pve1",
+				StartTime:  start.Add(-5 * time.Minute),
+			},
+			Timestamp: start.Add(-5 * time.Minute),
+		},
+		{
+			Alert: Alert{
+				ID:         "pve1:node1:100-cpu",
+				ResourceID: "pve1:node1:100",
+				Node:       "node1",
+				Instance:   "pve1",
+				StartTime:  start,
+			},
+			Timestamp: start,
+		},
+	}
+
+	hm.MigrateActiveAlert("pve1:node1:100-cpu", Alert{
+		ID:         "pve1:node2:100-cpu",
+		ResourceID: "pve1:node2:100",
+		Node:       "node2",
+		Instance:   "pve1",
+		StartTime:  start,
+	})
+
+	if hm.history[0].Alert.ID != "older-alert" {
+		t.Fatalf("expected unrelated history to remain unchanged, got %q", hm.history[0].Alert.ID)
+	}
+	if hm.history[1].Alert.ID != "pve1:node2:100-cpu" {
+		t.Fatalf("expected most recent alert ID to be migrated, got %q", hm.history[1].Alert.ID)
+	}
+	if hm.history[1].Alert.ResourceID != "pve1:node2:100" {
+		t.Fatalf("expected history resource ID to migrate, got %q", hm.history[1].Alert.ResourceID)
+	}
+	if hm.history[1].Alert.Node != "node2" {
+		t.Fatalf("expected history node to migrate, got %q", hm.history[1].Alert.Node)
+	}
+}
+
 func TestGetHistory_WithLimit(t *testing.T) {
 	// t.Parallel()
 

@@ -219,6 +219,154 @@ func TestReevaluateActiveAlertsStillAboveThreshold(t *testing.T) {
 	}
 }
 
+func TestCheckMetricMigratesGuestAlertAcrossNodeMove(t *testing.T) {
+	manager := newTestManager(t)
+	manager.ClearActiveAlerts()
+
+	oldResourceID := BuildGuestKey("pve1", "node1", 101)
+	newResourceID := BuildGuestKey("pve1", "node2", 101)
+	oldAlertID := oldResourceID + "-cpu"
+	newAlertID := newResourceID + "-cpu"
+	start := time.Now().Add(-10 * time.Minute)
+	ackTime := start.Add(1 * time.Minute)
+
+	alert := &Alert{
+		ID:           oldAlertID,
+		Type:         "cpu",
+		Level:        AlertLevelWarning,
+		ResourceID:   oldResourceID,
+		ResourceName: "vm101",
+		Node:         "node1",
+		Instance:     "pve1",
+		Message:      "VM cpu at 95%",
+		Value:        95,
+		Threshold:    80,
+		StartTime:    start,
+		LastSeen:     start.Add(5 * time.Minute),
+		Acknowledged: true,
+		AckUser:      "tester",
+		AckTime:      &ackTime,
+	}
+
+	manager.mu.Lock()
+	manager.activeAlerts[oldAlertID] = alert
+	manager.recentAlerts[oldAlertID] = alert
+	manager.suppressedUntil[oldAlertID] = start.Add(2 * time.Minute)
+	manager.alertRateLimit[oldAlertID] = []time.Time{start.Add(30 * time.Second)}
+	manager.ackState[oldAlertID] = ackRecord{
+		acknowledged: true,
+		user:         "tester",
+		time:         ackTime,
+	}
+	manager.flappingHistory[oldAlertID] = []time.Time{start.Add(45 * time.Second)}
+	manager.flappingActive[oldAlertID] = true
+	manager.historyManager.AddAlert(*alert)
+	manager.mu.Unlock()
+
+	manager.checkMetric(newResourceID, "vm101", "node2", "pve1", "VM", "cpu", 92, &HysteresisThreshold{Trigger: 80, Clear: 70}, nil)
+
+	manager.mu.RLock()
+	migrated, exists := manager.activeAlerts[newAlertID]
+	_, oldExists := manager.activeAlerts[oldAlertID]
+	_, oldAckExists := manager.ackState[oldAlertID]
+	newAck, newAckExists := manager.ackState[newAlertID]
+	_, oldSuppressed := manager.suppressedUntil[oldAlertID]
+	_, newSuppressed := manager.suppressedUntil[newAlertID]
+	_, oldRateLimit := manager.alertRateLimit[oldAlertID]
+	_, newRateLimit := manager.alertRateLimit[newAlertID]
+	_, oldFlapping := manager.flappingHistory[oldAlertID]
+	_, newFlapping := manager.flappingHistory[newAlertID]
+	manager.mu.RUnlock()
+
+	if oldExists {
+		t.Fatal("expected old node-scoped alert to be removed")
+	}
+	if !exists {
+		t.Fatal("expected alert to migrate to the current node-scoped ID")
+	}
+	if migrated.Node != "node2" || migrated.ResourceID != newResourceID {
+		t.Fatalf("expected migrated alert to target node2, got node=%q resource=%q", migrated.Node, migrated.ResourceID)
+	}
+	if !migrated.Acknowledged || migrated.AckUser != "tester" {
+		t.Fatalf("expected migrated alert acknowledgment to be preserved, got %#v", migrated)
+	}
+	if migrated.StartTime != start {
+		t.Fatalf("expected migrated alert start time to be preserved, got %v want %v", migrated.StartTime, start)
+	}
+	if oldAckExists || !newAckExists || !newAck.acknowledged {
+		t.Fatalf("expected ack state to move to new alert ID, old=%v new=%v", oldAckExists, newAckExists)
+	}
+	if oldSuppressed || !newSuppressed {
+		t.Fatalf("expected suppression window to move to new alert ID, old=%v new=%v", oldSuppressed, newSuppressed)
+	}
+	if oldRateLimit || !newRateLimit {
+		t.Fatalf("expected rate limit state to move to new alert ID, old=%v new=%v", oldRateLimit, newRateLimit)
+	}
+	if oldFlapping || !newFlapping {
+		t.Fatalf("expected flapping state to move to new alert ID, old=%v new=%v", oldFlapping, newFlapping)
+	}
+
+	history := manager.historyManager.GetAllHistory(1)
+	if len(history) != 1 || history[0].ID != newAlertID {
+		t.Fatalf("expected history entry to follow migrated alert ID, got %#v", history)
+	}
+}
+
+func TestCheckMetricResolvesGuestAlertAfterNodeMove(t *testing.T) {
+	manager := newTestManager(t)
+	manager.ClearActiveAlerts()
+
+	oldResourceID := BuildGuestKey("pve1", "node1", 202)
+	newResourceID := BuildGuestKey("pve1", "node2", 202)
+	oldAlertID := oldResourceID + "-cpu"
+	newAlertID := newResourceID + "-cpu"
+	start := time.Now().Add(-15 * time.Minute)
+
+	alert := &Alert{
+		ID:           oldAlertID,
+		Type:         "cpu",
+		Level:        AlertLevelWarning,
+		ResourceID:   oldResourceID,
+		ResourceName: "vm202",
+		Node:         "node1",
+		Instance:     "pve1",
+		Message:      "VM cpu at 95%",
+		Value:        95,
+		Threshold:    80,
+		StartTime:    start,
+		LastSeen:     start.Add(10 * time.Minute),
+	}
+
+	manager.mu.Lock()
+	manager.activeAlerts[oldAlertID] = alert
+	manager.historyManager.AddAlert(*alert)
+	manager.mu.Unlock()
+
+	manager.checkMetric(newResourceID, "vm202", "node2", "pve1", "VM", "cpu", 5, &HysteresisThreshold{Trigger: 80, Clear: 70}, nil)
+
+	manager.mu.RLock()
+	_, oldExists := manager.activeAlerts[oldAlertID]
+	_, newExists := manager.activeAlerts[newAlertID]
+	manager.mu.RUnlock()
+
+	if oldExists || newExists {
+		t.Fatalf("expected migrated guest alert to resolve after node move, old=%v new=%v", oldExists, newExists)
+	}
+
+	resolved := manager.GetResolvedAlert(newAlertID)
+	if resolved == nil || resolved.Alert == nil {
+		t.Fatal("expected resolved alert to be recorded under the current node-scoped ID")
+	}
+	if resolved.Alert.ResourceID != newResourceID {
+		t.Fatalf("expected resolved alert resource ID %q, got %q", newResourceID, resolved.Alert.ResourceID)
+	}
+
+	history := manager.historyManager.GetAllHistory(1)
+	if len(history) != 1 || history[0].ID != newAlertID {
+		t.Fatalf("expected history entry to resolve under migrated alert ID, got %#v", history)
+	}
+}
+
 func TestReevaluateActiveStorageAlertsOnThresholdChange(t *testing.T) {
 	manager := NewManager()
 

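Note: both tests drive checkMetric with the same hysteresis thresholds (Trigger 80, Clear 70). A value of 92 keeps the migrated alert firing, while 5 falls below Clear and resolves it. A hypothetical restatement of that rule (the exact boundary comparisons inside checkMetric may differ):

	func nextFiring(firing bool, value float64, t HysteresisThreshold) bool {
		if firing {
			return value > t.Clear // stay firing until the value drops to Clear or below
		}
		return value >= t.Trigger // start firing once the value reaches Trigger
	}
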
@@ -345,9 +493,9 @@ func TestReevaluateActiveAlertsGuestUsesStableClusterOverrideAcrossNodeMove(t *t
 		GuestDefaults: ThresholdConfig{
 			Memory: &HysteresisThreshold{Trigger: 85, Clear: 80},
 		},
-		NodeDefaults:   ThresholdConfig{},
-		StorageDefault: HysteresisThreshold{Trigger: 85, Clear: 80},
-		Overrides:      map[string]ThresholdConfig{"pve1-101": {Memory: &HysteresisThreshold{Trigger: 95, Clear: 90}}},
+		NodeDefaults:   ThresholdConfig{},
+		StorageDefault: HysteresisThreshold{Trigger: 85, Clear: 80},
+		Overrides:      map[string]ThresholdConfig{"pve1-101": {Memory: &HysteresisThreshold{Trigger: 95, Clear: 90}}},
 	}
 	manager.UpdateConfig(config)
 
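Note: the override key "pve1-101" in this fixture is the dash-delimited, node-free form that parseGuestAlertVMID also parses; because it omits the node, the override keeps matching after the guest moves, which is what the surrounding test asserts. A sketch of the presumed key shape (the helper is hypothetical, not part of this commit):

	func guestOverrideKey(instance string, vmid int) string {
		return fmt.Sprintf("%s-%d", instance, vmid) // e.g. "pve1-101"
	}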