Implement queue cancellation and atomic DB operations (P1 fixes)

Queue cancellation mechanism:
- Add CancelByAlertIDs method to mark queued notifications as cancelled when alerts resolve
- Update CancelAlert to cancel queued notifications containing resolved alert IDs
- Skip cancelled notifications in queue processor
- Prevents resolved alerts from triggering notifications after they clear

Atomic DB operations:
- Add IncrementAttemptAndSetStatus to atomically update attempt counter and status
- Replace separate IncrementAttempt + UpdateStatus calls with single atomic operation
- Prevents orphaned queue entries when crashes occur between operations
- Eliminates race condition where rows get stuck in "pending" or "sending" status

These fixes ensure queued notifications are properly cancelled when alerts resolve
and prevent database inconsistencies during crash scenarios.
This commit is contained in:
rcourtman 2025-11-07 08:33:09 +00:00
parent c6a69e525c
commit 7ee11105f5
2 changed files with 104 additions and 6 deletions

View file

@ -618,6 +618,13 @@ func (n *NotificationManager) CancelAlert(alertID string) {
// Clean up cooldown record for resolved alert
delete(n.lastNotified, alertID)
// Cancel any queued notifications containing this alert
if n.queue != nil {
if err := n.queue.CancelByAlertIDs([]string{alertID}); err != nil {
log.Error().Err(err).Str("alertID", alertID).Msg("Failed to cancel queued notifications")
}
}
log.Debug().
Str("alertID", alertID).
Int("remaining", len(n.pendingAlerts)).

View file

@ -278,6 +278,25 @@ func (nq *NotificationQueue) IncrementAttempt(id string) error {
return nil
}
// IncrementAttemptAndSetStatus bumps the attempt counter of the queue row
// identified by id, stores the given status, and stamps last_attempt with the
// current Unix time. All three columns are written by one UPDATE statement,
// executed while holding nq.mu, so the counter and the status cannot be left
// out of step with each other by a failure between two separate writes.
//
// It returns a wrapped error when the statement fails and nil otherwise.
func (nq *NotificationQueue) IncrementAttemptAndSetStatus(id string, status NotificationQueueStatus) error {
	const stmt = `
UPDATE notification_queue
SET attempts = attempts + 1,
status = ?,
last_attempt = ?
WHERE id = ?
`

	nq.mu.Lock()
	defer nq.mu.Unlock()

	// Placeholder order: status, last_attempt, id.
	attemptedAt := time.Now().Unix()
	if _, execErr := nq.db.Exec(stmt, status, attemptedAt, id); execErr != nil {
		return fmt.Errorf("failed to increment attempt and set status: %w", execErr)
	}
	return nil
}
// GetPending returns notifications ready for processing
func (nq *NotificationQueue) GetPending(limit int) ([]*QueuedNotification, error) {
nq.mu.RLock()
@ -555,15 +574,15 @@ func (nq *NotificationQueue) processBatch() {
// processNotification processes a single notification
func (nq *NotificationQueue) processNotification(notif *QueuedNotification) {
// Increment attempt counter once at the start of processing
if err := nq.IncrementAttempt(notif.ID); err != nil {
log.Error().Err(err).Str("id", notif.ID).Msg("Failed to increment attempt counter")
// Skip cancelled notifications
if notif.Status == QueueStatusCancelled {
log.Debug().Str("id", notif.ID).Msg("Skipping cancelled notification")
return
}
// Update status to sending (without incrementing attempts again)
if err := nq.UpdateStatus(notif.ID, QueueStatusSending, ""); err != nil {
log.Error().Err(err).Str("id", notif.ID).Msg("Failed to update notification status to sending")
// Atomically increment attempt counter and set status to sending
if err := nq.IncrementAttemptAndSetStatus(notif.ID, QueueStatusSending); err != nil {
log.Error().Err(err).Str("id", notif.ID).Msg("Failed to increment attempt and set status")
return
}
@ -709,3 +728,75 @@ func calculateBackoff(attempt int) time.Duration {
}
return backoff
}
// CancelByAlertIDs marks every queued notification that is still pending or
// sending, and that contains at least one of the given alert IDs, as cancelled.
//
// The work is split into two phases so that nq.mu is never held while calling
// UpdateStatus:
//
//  1. findQueuedByAlertIDs reads the candidate rows under a read lock and
//     returns the IDs of the notifications that match.
//  2. Each matching notification is cancelled through UpdateStatus with the
//     reason "Alert resolved".
//
// NOTE(review): UpdateStatus is not visible in this part of the file; it is
// assumed to take nq.mu itself, like the other mutators (for example
// IncrementAttemptAndSetStatus). Go mutexes are not reentrant, so calling it
// with nq.mu already held would block forever — confirm against UpdateStatus.
// The trade-off is that a notification may change state between the two
// phases; cancellation remains best-effort.
//
// An empty alertIDs slice is a no-op. Failures to run or iterate the query are
// returned wrapped. Failures to cancel an individual notification are logged
// and skipped so that one bad row does not block the rest; in that case the
// method still returns nil.
func (nq *NotificationQueue) CancelByAlertIDs(alertIDs []string) error {
	if len(alertIDs) == 0 {
		return nil
	}

	toCancelIDs, err := nq.findQueuedByAlertIDs(alertIDs)
	if err != nil {
		return err
	}

	// Count only the rows that were actually updated so the log line below
	// does not over-report when some updates fail.
	cancelled := 0
	for _, notifID := range toCancelIDs {
		if err := nq.UpdateStatus(notifID, QueueStatusCancelled, "Alert resolved"); err != nil {
			log.Error().Err(err).Str("notifID", notifID).Msg("Failed to mark notification as cancelled")
			continue
		}
		cancelled++
	}

	if cancelled > 0 {
		log.Info().
			Int("count", cancelled).
			Strs("alertIDs", alertIDs).
			Msg("Cancelled queued notifications for resolved alerts")
	}
	return nil
}

// findQueuedByAlertIDs returns the IDs of pending or sending notifications
// whose stored alert list contains at least one of alertIDs. Rows that cannot
// be scanned or whose alert JSON cannot be decoded are logged and skipped.
// The result set is fully consumed and closed before the method returns, so
// callers are free to issue further statements afterwards.
func (nq *NotificationQueue) findQueuedByAlertIDs(alertIDs []string) ([]string, error) {
	alertIDSet := make(map[string]struct{}, len(alertIDs))
	for _, id := range alertIDs {
		alertIDSet[id] = struct{}{}
	}

	// Only reads happen here, so a read lock (as GetPending uses) is enough.
	nq.mu.RLock()
	defer nq.mu.RUnlock()

	// Query pending/sending notifications
	query := `
SELECT id, alerts
FROM notification_queue
WHERE status IN ('pending', 'sending')
`
	rows, err := nq.db.Query(query)
	if err != nil {
		return nil, fmt.Errorf("failed to query notifications for cancellation: %w", err)
	}
	defer rows.Close()

	var matched []string
	for rows.Next() {
		var notifID string
		var alertsJSON []byte
		if err := rows.Scan(&notifID, &alertsJSON); err != nil {
			log.Error().Err(err).Msg("Failed to scan notification for cancellation")
			continue
		}

		// Named "queued" rather than "alerts" to avoid shadowing the alerts package.
		var queued []*alerts.Alert
		if err := json.Unmarshal(alertsJSON, &queued); err != nil {
			log.Error().Err(err).Str("notifID", notifID).Msg("Failed to unmarshal alerts for cancellation check")
			continue
		}

		// One matching alert is enough to cancel the whole notification.
		for _, a := range queued {
			if a == nil {
				// A JSON null element decodes to a nil pointer; skip it instead of panicking.
				continue
			}
			if _, exists := alertIDSet[a.ID]; exists {
				matched = append(matched, notifID)
				break
			}
		}
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating notifications for cancellation: %w", err)
	}
	return matched, nil
}