From 7ee11105f592b3254f02cf5227d0318cd2bd3cf3 Mon Sep 17 00:00:00 2001 From: rcourtman Date: Fri, 7 Nov 2025 08:33:09 +0000 Subject: [PATCH] Implement queue cancellation and atomic DB operations (P1 fixes) Queue cancellation mechanism: - Add CancelByAlertIDs method to mark queued notifications as cancelled when alerts resolve - Update CancelAlert to cancel queued notifications containing resolved alert IDs - Skip cancelled notifications in queue processor - Prevents resolved alerts from triggering notifications after they clear Atomic DB operations: - Add IncrementAttemptAndSetStatus to atomically update attempt counter and status - Replace separate IncrementAttempt + UpdateStatus calls with single atomic operation - Prevents orphaned queue entries when crashes occur between operations - Eliminates race condition where rows get stuck in "pending" or "sending" status These fixes ensure queued notifications are properly cancelled when alerts resolve and prevent database inconsistencies during crash scenarios. --- internal/notifications/notifications.go | 7 ++ internal/notifications/queue.go | 103 ++++++++++++++++++++++-- 2 files changed, 104 insertions(+), 6 deletions(-) diff --git a/internal/notifications/notifications.go b/internal/notifications/notifications.go index f4060494f..17f50d576 100644 --- a/internal/notifications/notifications.go +++ b/internal/notifications/notifications.go @@ -618,6 +618,13 @@ func (n *NotificationManager) CancelAlert(alertID string) { // Clean up cooldown record for resolved alert delete(n.lastNotified, alertID) + // Cancel any queued notifications containing this alert + if n.queue != nil { + if err := n.queue.CancelByAlertIDs([]string{alertID}); err != nil { + log.Error().Err(err).Str("alertID", alertID).Msg("Failed to cancel queued notifications") + } + } + log.Debug(). Str("alertID", alertID). Int("remaining", len(n.pendingAlerts)). 
diff --git a/internal/notifications/queue.go b/internal/notifications/queue.go index 047a89c2d..c2437a580 100644 --- a/internal/notifications/queue.go +++ b/internal/notifications/queue.go @@ -278,6 +278,25 @@ func (nq *NotificationQueue) IncrementAttempt(id string) error { return nil } +// IncrementAttemptAndSetStatus atomically increments attempt counter and sets status in a single operation +func (nq *NotificationQueue) IncrementAttemptAndSetStatus(id string, status NotificationQueueStatus) error { + nq.mu.Lock() + defer nq.mu.Unlock() + + query := ` + UPDATE notification_queue + SET attempts = attempts + 1, + status = ?, + last_attempt = ? + WHERE id = ? + ` + _, err := nq.db.Exec(query, status, time.Now().Unix(), id) + if err != nil { + return fmt.Errorf("failed to increment attempt and set status: %w", err) + } + return nil +} + // GetPending returns notifications ready for processing func (nq *NotificationQueue) GetPending(limit int) ([]*QueuedNotification, error) { nq.mu.RLock() @@ -555,15 +574,15 @@ func (nq *NotificationQueue) processBatch() { // processNotification processes a single notification func (nq *NotificationQueue) processNotification(notif *QueuedNotification) { - // Increment attempt counter once at the start of processing - if err := nq.IncrementAttempt(notif.ID); err != nil { - log.Error().Err(err).Str("id", notif.ID).Msg("Failed to increment attempt counter") + // Skip cancelled notifications + if notif.Status == QueueStatusCancelled { + log.Debug().Str("id", notif.ID).Msg("Skipping cancelled notification") return } - // Update status to sending (without incrementing attempts again) - if err := nq.UpdateStatus(notif.ID, QueueStatusSending, ""); err != nil { - log.Error().Err(err).Str("id", notif.ID).Msg("Failed to update notification status to sending") + // Atomically increment attempt counter and set status to sending + if err := nq.IncrementAttemptAndSetStatus(notif.ID, QueueStatusSending); err != nil { + log.Error().Err(err).Str("id", 
notif.ID).Msg("Failed to increment attempt and set status") return } @@ -709,3 +728,75 @@ func calculateBackoff(attempt int) time.Duration { } return backoff } + +// CancelByAlertIDs marks all queued notifications containing any of the given alert IDs as cancelled +func (nq *NotificationQueue) CancelByAlertIDs(alertIDs []string) error { + if len(alertIDs) == 0 { + return nil + } + + nq.mu.Lock() + defer nq.mu.Unlock() + + // Query pending/sending notifications + query := ` + SELECT id, alerts + FROM notification_queue + WHERE status IN ('pending', 'sending') + ` + + rows, err := nq.db.Query(query) + if err != nil { + return fmt.Errorf("failed to query notifications for cancellation: %w", err) + } + defer rows.Close() + + var toCancelIDs []string + alertIDSet := make(map[string]struct{}) + for _, id := range alertIDs { + alertIDSet[id] = struct{}{} + } + + for rows.Next() { + var notifID string + var alertsJSON []byte + if err := rows.Scan(&notifID, &alertsJSON); err != nil { + log.Error().Err(err).Msg("Failed to scan notification for cancellation") + continue + } + + var alerts []*alerts.Alert + if err := json.Unmarshal(alertsJSON, &alerts); err != nil { + log.Error().Err(err).Str("notifID", notifID).Msg("Failed to unmarshal alerts for cancellation check") + continue + } + + // Check if any alert in this notification matches + for _, alert := range alerts { + if _, exists := alertIDSet[alert.ID]; exists { + toCancelIDs = append(toCancelIDs, notifID) + break + } + } + } + + if err := rows.Err(); err != nil { + return fmt.Errorf("error iterating notifications for cancellation: %w", err) + } + + // Cancel the matched notifications + if len(toCancelIDs) > 0 { + for _, notifID := range toCancelIDs { + if err := nq.UpdateStatus(notifID, QueueStatusCancelled, "Alert resolved"); err != nil { + log.Error().Err(err).Str("notifID", notifID).Msg("Failed to mark notification as cancelled") + } + } + + log.Info(). + Int("count", len(toCancelIDs)). + Strs("alertIDs", alertIDs). 
+ Msg("Cancelled queued notifications for resolved alerts") + } + + return nil +}