mirror of
https://github.com/rcourtman/Pulse.git
synced 2026-04-28 03:20:11 +00:00
fix(notifications): route escalation notifications to selected channels only (#1259)
Escalation was calling SendAlert() which always sends to all enabled channels, ignoring the per-level channel selection (email/webhook/all). Add SendAlertToChannels() that snapshots only the requested channel configs and uses a distinct "_escalation" queue type so the dequeue handler skips cooldown writes — preventing interference with the alert manager's own re-notify cadence.
This commit is contained in:
parent
c213e0ce30
commit
eb2397d99a
3 changed files with 169 additions and 23 deletions
|
|
@ -4621,25 +4621,8 @@ func (m *Monitor) Start(ctx context.Context, wsHub *websocket.Hub) {
|
|||
|
||||
escalationLevel := config.Schedule.Escalation.Levels[level-1]
|
||||
|
||||
// Send notifications based on escalation level
|
||||
switch escalationLevel.Notify {
|
||||
case "email":
|
||||
// Only send email
|
||||
if emailConfig := m.notificationMgr.GetEmailConfig(); emailConfig.Enabled {
|
||||
m.notificationMgr.SendAlert(alert)
|
||||
}
|
||||
case "webhook":
|
||||
// Only send webhooks
|
||||
for _, webhook := range m.notificationMgr.GetWebhooks() {
|
||||
if webhook.Enabled {
|
||||
m.notificationMgr.SendAlert(alert)
|
||||
break
|
||||
}
|
||||
}
|
||||
case "all":
|
||||
// Send all notifications
|
||||
m.notificationMgr.SendAlert(alert)
|
||||
}
|
||||
// Send notifications only to the channels specified in the escalation level
|
||||
m.notificationMgr.SendAlertToChannels(alert, escalationLevel.Notify)
|
||||
|
||||
// Update WebSocket with escalation
|
||||
wsHub.BroadcastAlert(alert)
|
||||
|
|
|
|||
|
|
@ -292,6 +292,24 @@ func TestNormalizeQueueType(t *testing.T) {
|
|||
expectedType: "",
|
||||
expectedEvent: eventResolved,
|
||||
},
|
||||
{
|
||||
name: "email_escalation type",
|
||||
notifType: "email_escalation",
|
||||
expectedType: "email",
|
||||
expectedEvent: eventEscalation,
|
||||
},
|
||||
{
|
||||
name: "webhook_escalation type",
|
||||
notifType: "webhook_escalation",
|
||||
expectedType: "webhook",
|
||||
expectedEvent: eventEscalation,
|
||||
},
|
||||
{
|
||||
name: "apprise_escalation type",
|
||||
notifType: "apprise_escalation",
|
||||
expectedType: "apprise",
|
||||
expectedEvent: eventEscalation,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
|
|
|
|||
|
|
@ -44,15 +44,17 @@ const (
|
|||
)
|
||||
|
||||
const (
|
||||
queueTypeSuffixResolved = "_resolved"
|
||||
metadataResolvedAt = "resolvedAt"
|
||||
queueTypeSuffixResolved = "_resolved"
|
||||
queueTypeSuffixEscalation = "_escalation"
|
||||
metadataResolvedAt = "resolvedAt"
|
||||
)
|
||||
|
||||
type notificationEvent string
|
||||
|
||||
const (
|
||||
eventAlert notificationEvent = "alert"
|
||||
eventResolved notificationEvent = "resolved"
|
||||
eventAlert notificationEvent = "alert"
|
||||
eventResolved notificationEvent = "resolved"
|
||||
eventEscalation notificationEvent = "escalation"
|
||||
)
|
||||
|
||||
// createSecureWebhookClient creates an HTTP client with security controls
|
||||
|
|
@ -674,6 +676,84 @@ func (n *NotificationManager) SendAlert(alert *alerts.Alert) {
|
|||
}
|
||||
}
|
||||
|
||||
// SendAlertToChannels sends an alert to specific notification channels only,
|
||||
// bypassing both alert grouping and the regular notification cooldown. This is
|
||||
// used for escalation notifications where the escalation level specifies which
|
||||
// channels to notify — escalation has its own timing/level progression so the
|
||||
// regular cooldown should not suppress it.
|
||||
// Valid channel values: "email", "webhook", "apprise", "all".
|
||||
func (n *NotificationManager) SendAlertToChannels(alert *alerts.Alert, channels ...string) {
|
||||
n.mu.Lock()
|
||||
|
||||
if !n.enabled {
|
||||
n.mu.Unlock()
|
||||
log.Debug().Msg("Notifications disabled, skipping channel-filtered send")
|
||||
return
|
||||
}
|
||||
|
||||
// Build channel filter (normalize to lowercase for robustness)
|
||||
channelSet := make(map[string]bool, len(channels))
|
||||
for _, ch := range channels {
|
||||
channelSet[strings.ToLower(strings.TrimSpace(ch))] = true
|
||||
}
|
||||
sendAll := channelSet["all"] || len(channels) == 0
|
||||
|
||||
// Snapshot only the requested channel configs
|
||||
var emailConfig EmailConfig
|
||||
if sendAll || channelSet["email"] {
|
||||
emailConfig = copyEmailConfig(n.emailConfig)
|
||||
}
|
||||
|
||||
var webhooks []WebhookConfig
|
||||
if sendAll || channelSet["webhook"] {
|
||||
webhooks = copyWebhookConfigs(n.webhooks)
|
||||
}
|
||||
|
||||
var appriseConfig AppriseConfig
|
||||
if sendAll || channelSet["apprise"] {
|
||||
appriseConfig = copyAppriseConfig(n.appriseConfig)
|
||||
}
|
||||
|
||||
// Check that at least one requested channel is actually enabled
|
||||
hasEnabled := (emailConfig.Enabled) ||
|
||||
(appriseConfig.Enabled) ||
|
||||
func() bool {
|
||||
for _, w := range webhooks {
|
||||
if w.Enabled {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}()
|
||||
|
||||
if !hasEnabled {
|
||||
n.mu.Unlock()
|
||||
log.Info().
|
||||
Str("alertID", alert.ID).
|
||||
Strs("channels", channels).
|
||||
Msg("No enabled channels match escalation target - skipping")
|
||||
return
|
||||
}
|
||||
|
||||
n.mu.Unlock()
|
||||
|
||||
log.Info().
|
||||
Str("alertID", alert.ID).
|
||||
Strs("channels", channels).
|
||||
Msg("Sending escalation notification to filtered channels")
|
||||
|
||||
alertsToSend := []*alerts.Alert{alert}
|
||||
|
||||
// Use persistent queue with escalation-typed entries so sends get retried
|
||||
// on failure, but the dequeue handler skips cooldown writes (it only marks
|
||||
// cooldown for eventAlert, not eventEscalation).
|
||||
if n.queue != nil {
|
||||
n.enqueueEscalationNotifications(emailConfig, webhooks, appriseConfig, alertsToSend)
|
||||
} else {
|
||||
n.sendNotificationsDirect(emailConfig, webhooks, appriseConfig, alertsToSend)
|
||||
}
|
||||
}
|
||||
|
||||
// SendResolvedAlert delivers notifications for a resolved alert immediately.
|
||||
func (n *NotificationManager) SendResolvedAlert(resolved *alerts.ResolvedAlert) {
|
||||
if resolved == nil || resolved.Alert == nil {
|
||||
|
|
@ -901,6 +981,68 @@ func (n *NotificationManager) enqueueNotifications(emailConfig EmailConfig, webh
|
|||
}
|
||||
}
|
||||
|
||||
// enqueueEscalationNotifications adds escalation notifications to the persistent
|
||||
// queue using the "_escalation" type suffix. The dequeue handler only marks
|
||||
// cooldown for eventAlert, so escalation sends won't interfere with the alert
|
||||
// manager's own re-notify cadence.
|
||||
func (n *NotificationManager) enqueueEscalationNotifications(emailConfig EmailConfig, webhooks []WebhookConfig, appriseConfig AppriseConfig, alertsToSend []*alerts.Alert) {
|
||||
if emailConfig.Enabled {
|
||||
configJSON, err := json.Marshal(emailConfig)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to marshal email config for escalation queue")
|
||||
} else {
|
||||
notif := &QueuedNotification{
|
||||
Type: "email" + queueTypeSuffixEscalation,
|
||||
Alerts: alertsToSend,
|
||||
Config: configJSON,
|
||||
MaxAttempts: 3,
|
||||
}
|
||||
if err := n.queue.Enqueue(notif); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to enqueue escalation email - falling back to direct send")
|
||||
go n.sendGroupedEmail(emailConfig, alertsToSend)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, webhook := range webhooks {
|
||||
if webhook.Enabled {
|
||||
configJSON, err := json.Marshal(webhook)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Str("webhookName", webhook.Name).Msg("Failed to marshal webhook config for escalation queue")
|
||||
} else {
|
||||
notif := &QueuedNotification{
|
||||
Type: "webhook" + queueTypeSuffixEscalation,
|
||||
Alerts: alertsToSend,
|
||||
Config: configJSON,
|
||||
MaxAttempts: 3,
|
||||
}
|
||||
if err := n.queue.Enqueue(notif); err != nil {
|
||||
log.Error().Err(err).Str("webhookName", webhook.Name).Msg("Failed to enqueue escalation webhook - falling back to direct send")
|
||||
go n.sendGroupedWebhook(webhook, alertsToSend)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if appriseConfig.Enabled {
|
||||
configJSON, err := json.Marshal(appriseConfig)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to marshal apprise config for escalation queue")
|
||||
} else {
|
||||
notif := &QueuedNotification{
|
||||
Type: "apprise" + queueTypeSuffixEscalation,
|
||||
Alerts: alertsToSend,
|
||||
Config: configJSON,
|
||||
MaxAttempts: 3,
|
||||
}
|
||||
if err := n.queue.Enqueue(notif); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to enqueue escalation apprise - falling back to direct send")
|
||||
go n.sendGroupedApprise(appriseConfig, alertsToSend)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// enqueueResolvedNotifications adds resolved notifications to the persistent queue.
|
||||
func (n *NotificationManager) enqueueResolvedNotifications(queue *NotificationQueue, emailConfig EmailConfig, webhooks []WebhookConfig, appriseConfig AppriseConfig, alertsToSend []*alerts.Alert, resolvedAt time.Time) {
|
||||
if queue == nil {
|
||||
|
|
@ -3093,6 +3235,9 @@ func normalizeQueueType(notifType string) (string, notificationEvent) {
|
|||
if strings.HasSuffix(notifType, queueTypeSuffixResolved) {
|
||||
return strings.TrimSuffix(notifType, queueTypeSuffixResolved), eventResolved
|
||||
}
|
||||
if strings.HasSuffix(notifType, queueTypeSuffixEscalation) {
|
||||
return strings.TrimSuffix(notifType, queueTypeSuffixEscalation), eventEscalation
|
||||
}
|
||||
return notifType, eventAlert
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue