diff --git a/notifications/cleaner.go b/notifications/cleaner.go index 436a9c9..f9e2ffd 100644 --- a/notifications/cleaner.go +++ b/notifications/cleaner.go @@ -3,48 +3,49 @@ package notifications import ( "context" "time" - - "github.com/safing/portbase/log" ) -func cleaner(ctx context.Context) error { - ticker := time.NewTicker(5 * time.Second) +func cleaner(ctx context.Context) error { //nolint:unparam // Conforms to worker interface + ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() -L: for { select { case <-ctx.Done(): - break L + return nil case <-ticker.C: deleteExpiredNotifs() } } - return nil } func deleteExpiredNotifs() { - now := time.Now().Unix() + // Get a copy of the notification map. + notsCopy := getNotsCopy() - notsLock.Lock() - defer notsLock.Unlock() - - toDelete := make([]*Notification, 0, len(nots)) - for _, n := range nots { - n.Lock() - if now > n.Expires { - toDelete = append(toDelete, n) - } - n.Unlock() - } - - for _, n := range toDelete { - n.Lock() - err := n.delete(true) - n.Unlock() - - if err != nil { - log.Debugf("notifications: failed to delete %s: %s", n.EventID, err) + // Delete all expired notifications. + for _, n := range notsCopy { + if n.isExpired() { + n.delete(true) } } } + +func (n *Notification) isExpired() bool { + n.Lock() + defer n.Unlock() + + return n.Expires > 0 && n.Expires < time.Now().Unix() +} + +func getNotsCopy() []*Notification { + notsLock.RLock() + defer notsLock.RUnlock() + + notsCopy := make([]*Notification, 0, len(nots)) + for _, n := range nots { + notsCopy = append(notsCopy, n) + } + + return notsCopy +} diff --git a/notifications/database.go b/notifications/database.go index a06c2f3..f9d8be7 100644 --- a/notifications/database.go +++ b/notifications/database.go @@ -54,21 +54,27 @@ func registerAsDatabase() error { // Get returns a database record. func (s *StorageInterface) Get(key string) (record.Record, error) { - notsLock.RLock() - defer notsLock.RUnlock() - - // transform key + // Get EventID from key. if !strings.HasPrefix(key, "all/") { return nil, storage.ErrNotFound } key = strings.TrimPrefix(key, "all/") - // get notification - not, ok := nots[key] + // Get notification from storage. + n, ok := getNotification(key) if !ok { return nil, storage.ErrNotFound } - return not, nil + + return n, nil +} + +func getNotification(eventID string) (n *Notification, ok bool) { + notsLock.RLock() + defer notsLock.RUnlock() + + n, ok = nots[eventID] + return } // Query returns a an iterator for the supplied query. @@ -81,16 +87,12 @@ func (s *StorageInterface) Query(q *query.Query, local, internal bool) (*iterato } func (s *StorageInterface) processQuery(q *query.Query, it *iterator.Iterator) { - notsLock.RLock() - defer notsLock.RUnlock() + // Get a copy of the notification map. + notsCopy := getNotsCopy() // send all notifications - for _, n := range nots { - if n.Meta().IsDeleted() { - continue - } - - if q.MatchesKey(n.DatabaseKey()) && q.MatchesRecord(n) { + for _, n := range notsCopy { + if inQuery(n, q) { select { case it.Next <- n: case <-it.Done: @@ -103,6 +105,22 @@ func (s *StorageInterface) processQuery(q *query.Query, it *iterator.Iterator) { it.Finish(nil) } +func inQuery(n *Notification, q *query.Query) bool { + n.lock.Lock() + defer n.lock.Unlock() + + switch { + case n.Meta().IsDeleted(): + return false + case !q.MatchesKey(n.DatabaseKey()): + return false + case !q.MatchesRecord(n): + return false + } + + return true +} + // Put stores a record in the database. func (s *StorageInterface) Put(r record.Record) (record.Record, error) { // record is already locked! @@ -125,12 +143,9 @@ func (s *StorageInterface) Put(r record.Record) (record.Record, error) { func applyUpdate(n *Notification, key string) (*Notification, error) { // separate goroutine in order to correctly lock notsLock - notsLock.RLock() - existing, ok := nots[key] - notsLock.RUnlock() + existing, ok := getNotification(key) // ignore if already deleted - if !ok || existing.Meta().IsDeleted() { // this is a completely new notification // we pass pushUpdate==false because the storage @@ -139,6 +154,14 @@ func applyUpdate(n *Notification, key string) (*Notification, error) { return n, nil } + // Save when we're finished, if needed. + save := false + defer func() { + if save { + existing.save(false) + } + }() + existing.Lock() defer existing.Unlock() @@ -146,8 +169,6 @@ func applyUpdate(n *Notification, key string) (*Notification, error) { return existing, fmt.Errorf("action already executed") } - save := false - // check if the notification has been marked as // "executed externally". if n.State == Executed { @@ -171,33 +192,25 @@ func applyUpdate(n *Notification, key string) (*Notification, error) { save = true } - if save { - existing.save(false) - } - return existing, nil } // Delete deletes a record from the database. func (s *StorageInterface) Delete(key string) error { - // transform key + // Get EventID from key. if !strings.HasPrefix(key, "all/") { return storage.ErrNotFound } key = strings.TrimPrefix(key, "all/") - notsLock.Lock() - defer notsLock.Unlock() - - n, ok := nots[key] + // Get notification from storage. + n, ok := getNotification(key) if !ok { return storage.ErrNotFound } - n.Lock() - defer n.Unlock() - - return n.delete(true) + n.delete(true) + return nil } // ReadOnly returns whether the database is read only. diff --git a/notifications/notification.go b/notifications/notification.go index 1b3cf52..aa6484c 100644 --- a/notifications/notification.go +++ b/notifications/notification.go @@ -1,6 +1,7 @@ package notifications import ( + "context" "fmt" "sync" "time" @@ -23,6 +24,10 @@ const ( // State describes the state of a notification. type State string +// NotificationActionFn defines the function signature for notification action +// functions. +type NotificationActionFn func(context.Context, *Notification) error + // Possible notification states. // State transitions can only happen from top to bottom. const ( @@ -81,9 +86,9 @@ type Notification struct { SelectedActionID string lock sync.Mutex - actionFunction func(*Notification) // call function to process action - actionTrigger chan string // and/or send to a channel - expiredTrigger chan struct{} // closed on expire + actionFunction NotificationActionFn // call function to process action + actionTrigger chan string // and/or send to a channel + expiredTrigger chan struct{} // closed on expire } // Action describes an action that can be taken for a notification. @@ -92,8 +97,6 @@ type Action struct { Text string } -func noOpAction(n *Notification) {} - // Get returns the notification identifed by the given id or nil if it doesn't exist. func Get(id string) *Notification { notsLock.RLock() @@ -149,18 +152,34 @@ func notify(nType Type, id string, msg string, actions ...Action) *Notification // Save saves the notification and returns it. func (n *Notification) Save() *Notification { - n.Lock() - defer n.Unlock() - return n.save(true) } func (n *Notification) save(pushUpdate bool) *Notification { + var id string + + // Delete notification after processing deletion. + defer func() { + // Lock and delete from notification storage. + notsLock.Lock() + defer notsLock.Unlock() + nots[id] = n + }() + + // We do not access EventData here, so it is enough to just lock the + // notification itself. + n.lock.Lock() + defer n.lock.Unlock() + + // Save ID for deletion + id = n.EventID + + // Generate random GUID if not set. if n.GUID == "" { n.GUID = utils.RandomUUID(n.EventID).String() } - // make ack notification if there are no defined actions + // Make ack notification if there are no defined actions. if len(n.AvailableActions) == 0 { n.AvailableActions = []*Action{ { @@ -168,12 +187,6 @@ func (n *Notification) save(pushUpdate bool) *Notification { Text: "OK", }, } - n.actionFunction = noOpAction - } - - // Make sure we always have a reasonable expiration set. - if n.Expires == 0 { - n.Expires = time.Now().Add(72 * time.Hour).Unix() } // Make sure we always have a notification state assigned. @@ -182,17 +195,14 @@ func (n *Notification) save(pushUpdate bool) *Notification { } // check key - if n.DatabaseKey() == "" { + if !n.KeyIsSet() { n.SetKey(fmt.Sprintf("notifications:all/%s", n.EventID)) } + // Update meta data. n.UpdateMeta() - // store the notification inside or map - notsLock.Lock() - nots[n.EventID] = n - notsLock.Unlock() - + // Push update via the database system if needed. if pushUpdate { log.Tracef("notifications: pushing update for %s to subscribers", n.Key()) dbController.PushUpdate(n) @@ -203,7 +213,7 @@ func (n *Notification) save(pushUpdate bool) *Notification { // SetActionFunction sets a trigger function to be executed when the user reacted on the notification. // The provided function will be started as its own goroutine and will have to lock everything it accesses, even the provided notification. -func (n *Notification) SetActionFunction(fn func(*Notification)) *Notification { +func (n *Notification) SetActionFunction(fn NotificationActionFn) *Notification { n.lock.Lock() defer n.lock.Unlock() n.actionFunction = fn @@ -224,43 +234,70 @@ func (n *Notification) Response() <-chan string { // Update updates/resends a notification if it was not already responded to. func (n *Notification) Update(expires int64) { + // Save when we're finished, if needed. + save := false + defer func() { + if save { + n.save(true) + } + }() + n.lock.Lock() defer n.lock.Unlock() - if n.State == Active { - n.Expires = expires - n.save(true) + // Don't update if notification isn't active. + if n.State != Active { + return } + + // Don't update too quickly. + if n.Meta().Modified > time.Now().Add(-10*time.Second).Unix() { + return + } + + // Update expiry and save. + n.Expires = expires + save = true } // Delete (prematurely) cancels and deletes a notification. func (n *Notification) Delete() error { - notsLock.Lock() - defer notsLock.Unlock() - n.Lock() - defer n.Unlock() - - return n.delete(true) + n.delete(true) + return nil } -func (n *Notification) delete(pushUpdate bool) error { - // mark as deleted +func (n *Notification) delete(pushUpdate bool) { + var id string + + // Delete notification after processing deletion. + defer func() { + // Lock and delete from notification storage. + notsLock.Lock() + defer notsLock.Unlock() + delete(nots, id) + }() + + // We do not access EventData here, so it is enough to just lock the + // notification itself. + n.lock.Lock() + defer n.lock.Unlock() + + // Save ID for deletion + id = n.EventID + + // Mark notification as deleted. n.Meta().Delete() - // delete from internal storage - delete(nots, n.EventID) - - // close expired + // Close expiry channel if available. if n.expiredTrigger != nil { close(n.expiredTrigger) n.expiredTrigger = nil } + // Push update via the database system if needed. if pushUpdate { dbController.PushUpdate(n) } - - return nil } // Expired notifies the caller when the notification has expired. @@ -286,7 +323,9 @@ func (n *Notification) selectAndExecuteAction(id string) { executed := false if n.actionFunction != nil { - go n.actionFunction(n) + module.StartWorker("notification action execution", func(ctx context.Context) error { + return n.actionFunction(ctx, n) + }) executed = true } @@ -336,14 +375,3 @@ func (n *Notification) Unlock() { locker.Unlock() } } - -func duplicateActions(original []*Action) (duplicate []*Action) { - duplicate = make([]*Action, len(original)) - for _, action := range original { - duplicate = append(duplicate, &Action{ - ID: action.ID, - Text: action.Text, - }) - } - return -}