Clean up notification storage

This commit is contained in:
Daniel 2020-10-29 13:59:12 +01:00
parent dbbe556f3c
commit 1038e875da
3 changed files with 154 additions and 112 deletions

View file

@ -3,48 +3,49 @@ package notifications
import ( import (
"context" "context"
"time" "time"
"github.com/safing/portbase/log"
) )
func cleaner(ctx context.Context) error { func cleaner(ctx context.Context) error { //nolint:unparam // Conforms to worker interface
ticker := time.NewTicker(5 * time.Second) ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop() defer ticker.Stop()
L:
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
break L return nil
case <-ticker.C: case <-ticker.C:
deleteExpiredNotifs() deleteExpiredNotifs()
} }
} }
return nil
} }
func deleteExpiredNotifs() { func deleteExpiredNotifs() {
now := time.Now().Unix() // Get a copy of the notification map.
notsCopy := getNotsCopy()
notsLock.Lock() // Delete all expired notifications.
defer notsLock.Unlock() for _, n := range notsCopy {
if n.isExpired() {
toDelete := make([]*Notification, 0, len(nots)) n.delete(true)
for _, n := range nots {
n.Lock()
if now > n.Expires {
toDelete = append(toDelete, n)
}
n.Unlock()
}
for _, n := range toDelete {
n.Lock()
err := n.delete(true)
n.Unlock()
if err != nil {
log.Debugf("notifications: failed to delete %s: %s", n.EventID, err)
} }
} }
} }
func (n *Notification) isExpired() bool {
n.Lock()
defer n.Unlock()
return n.Expires > 0 && n.Expires < time.Now().Unix()
}
func getNotsCopy() []*Notification {
notsLock.RLock()
defer notsLock.RUnlock()
notsCopy := make([]*Notification, 0, len(nots))
for _, n := range nots {
notsCopy = append(notsCopy, n)
}
return notsCopy
}

View file

@ -54,21 +54,27 @@ func registerAsDatabase() error {
// Get returns a database record. // Get returns a database record.
func (s *StorageInterface) Get(key string) (record.Record, error) { func (s *StorageInterface) Get(key string) (record.Record, error) {
notsLock.RLock() // Get EventID from key.
defer notsLock.RUnlock()
// transform key
if !strings.HasPrefix(key, "all/") { if !strings.HasPrefix(key, "all/") {
return nil, storage.ErrNotFound return nil, storage.ErrNotFound
} }
key = strings.TrimPrefix(key, "all/") key = strings.TrimPrefix(key, "all/")
// get notification // Get notification from storage.
not, ok := nots[key] n, ok := getNotification(key)
if !ok { if !ok {
return nil, storage.ErrNotFound return nil, storage.ErrNotFound
} }
return not, nil
return n, nil
}
func getNotification(eventID string) (n *Notification, ok bool) {
notsLock.RLock()
defer notsLock.RUnlock()
n, ok = nots[eventID]
return
} }
// Query returns a an iterator for the supplied query. // Query returns a an iterator for the supplied query.
@ -81,16 +87,12 @@ func (s *StorageInterface) Query(q *query.Query, local, internal bool) (*iterato
} }
func (s *StorageInterface) processQuery(q *query.Query, it *iterator.Iterator) { func (s *StorageInterface) processQuery(q *query.Query, it *iterator.Iterator) {
notsLock.RLock() // Get a copy of the notification map.
defer notsLock.RUnlock() notsCopy := getNotsCopy()
// send all notifications // send all notifications
for _, n := range nots { for _, n := range notsCopy {
if n.Meta().IsDeleted() { if inQuery(n, q) {
continue
}
if q.MatchesKey(n.DatabaseKey()) && q.MatchesRecord(n) {
select { select {
case it.Next <- n: case it.Next <- n:
case <-it.Done: case <-it.Done:
@ -103,6 +105,22 @@ func (s *StorageInterface) processQuery(q *query.Query, it *iterator.Iterator) {
it.Finish(nil) it.Finish(nil)
} }
func inQuery(n *Notification, q *query.Query) bool {
n.lock.Lock()
defer n.lock.Unlock()
switch {
case n.Meta().IsDeleted():
return false
case !q.MatchesKey(n.DatabaseKey()):
return false
case !q.MatchesRecord(n):
return false
}
return true
}
// Put stores a record in the database. // Put stores a record in the database.
func (s *StorageInterface) Put(r record.Record) (record.Record, error) { func (s *StorageInterface) Put(r record.Record) (record.Record, error) {
// record is already locked! // record is already locked!
@ -125,12 +143,9 @@ func (s *StorageInterface) Put(r record.Record) (record.Record, error) {
func applyUpdate(n *Notification, key string) (*Notification, error) { func applyUpdate(n *Notification, key string) (*Notification, error) {
// separate goroutine in order to correctly lock notsLock // separate goroutine in order to correctly lock notsLock
notsLock.RLock() existing, ok := getNotification(key)
existing, ok := nots[key]
notsLock.RUnlock()
// ignore if already deleted // ignore if already deleted
if !ok || existing.Meta().IsDeleted() { if !ok || existing.Meta().IsDeleted() {
// this is a completely new notification // this is a completely new notification
// we pass pushUpdate==false because the storage // we pass pushUpdate==false because the storage
@ -139,6 +154,14 @@ func applyUpdate(n *Notification, key string) (*Notification, error) {
return n, nil return n, nil
} }
// Save when we're finished, if needed.
save := false
defer func() {
if save {
existing.save(false)
}
}()
existing.Lock() existing.Lock()
defer existing.Unlock() defer existing.Unlock()
@ -146,8 +169,6 @@ func applyUpdate(n *Notification, key string) (*Notification, error) {
return existing, fmt.Errorf("action already executed") return existing, fmt.Errorf("action already executed")
} }
save := false
// check if the notification has been marked as // check if the notification has been marked as
// "executed externally". // "executed externally".
if n.State == Executed { if n.State == Executed {
@ -171,33 +192,25 @@ func applyUpdate(n *Notification, key string) (*Notification, error) {
save = true save = true
} }
if save {
existing.save(false)
}
return existing, nil return existing, nil
} }
// Delete deletes a record from the database. // Delete deletes a record from the database.
func (s *StorageInterface) Delete(key string) error { func (s *StorageInterface) Delete(key string) error {
// transform key // Get EventID from key.
if !strings.HasPrefix(key, "all/") { if !strings.HasPrefix(key, "all/") {
return storage.ErrNotFound return storage.ErrNotFound
} }
key = strings.TrimPrefix(key, "all/") key = strings.TrimPrefix(key, "all/")
notsLock.Lock() // Get notification from storage.
defer notsLock.Unlock() n, ok := getNotification(key)
n, ok := nots[key]
if !ok { if !ok {
return storage.ErrNotFound return storage.ErrNotFound
} }
n.Lock() n.delete(true)
defer n.Unlock() return nil
return n.delete(true)
} }
// ReadOnly returns whether the database is read only. // ReadOnly returns whether the database is read only.

View file

@ -1,6 +1,7 @@
package notifications package notifications
import ( import (
"context"
"fmt" "fmt"
"sync" "sync"
"time" "time"
@ -23,6 +24,10 @@ const (
// State describes the state of a notification. // State describes the state of a notification.
type State string type State string
// NotificationActionFn defines the function signature for notification action
// functions.
type NotificationActionFn func(context.Context, *Notification) error
// Possible notification states. // Possible notification states.
// State transitions can only happen from top to bottom. // State transitions can only happen from top to bottom.
const ( const (
@ -81,9 +86,9 @@ type Notification struct {
SelectedActionID string SelectedActionID string
lock sync.Mutex lock sync.Mutex
actionFunction func(*Notification) // call function to process action actionFunction NotificationActionFn // call function to process action
actionTrigger chan string // and/or send to a channel actionTrigger chan string // and/or send to a channel
expiredTrigger chan struct{} // closed on expire expiredTrigger chan struct{} // closed on expire
} }
// Action describes an action that can be taken for a notification. // Action describes an action that can be taken for a notification.
@ -92,8 +97,6 @@ type Action struct {
Text string Text string
} }
func noOpAction(n *Notification) {}
// Get returns the notification identifed by the given id or nil if it doesn't exist. // Get returns the notification identifed by the given id or nil if it doesn't exist.
func Get(id string) *Notification { func Get(id string) *Notification {
notsLock.RLock() notsLock.RLock()
@ -149,18 +152,34 @@ func notify(nType Type, id string, msg string, actions ...Action) *Notification
// Save saves the notification and returns it. // Save saves the notification and returns it.
func (n *Notification) Save() *Notification { func (n *Notification) Save() *Notification {
n.Lock()
defer n.Unlock()
return n.save(true) return n.save(true)
} }
func (n *Notification) save(pushUpdate bool) *Notification { func (n *Notification) save(pushUpdate bool) *Notification {
var id string
// Delete notification after processing deletion.
defer func() {
// Lock and delete from notification storage.
notsLock.Lock()
defer notsLock.Unlock()
nots[id] = n
}()
// We do not access EventData here, so it is enough to just lock the
// notification itself.
n.lock.Lock()
defer n.lock.Unlock()
// Save ID for deletion
id = n.EventID
// Generate random GUID if not set.
if n.GUID == "" { if n.GUID == "" {
n.GUID = utils.RandomUUID(n.EventID).String() n.GUID = utils.RandomUUID(n.EventID).String()
} }
// make ack notification if there are no defined actions // Make ack notification if there are no defined actions.
if len(n.AvailableActions) == 0 { if len(n.AvailableActions) == 0 {
n.AvailableActions = []*Action{ n.AvailableActions = []*Action{
{ {
@ -168,12 +187,6 @@ func (n *Notification) save(pushUpdate bool) *Notification {
Text: "OK", Text: "OK",
}, },
} }
n.actionFunction = noOpAction
}
// Make sure we always have a reasonable expiration set.
if n.Expires == 0 {
n.Expires = time.Now().Add(72 * time.Hour).Unix()
} }
// Make sure we always have a notification state assigned. // Make sure we always have a notification state assigned.
@ -182,17 +195,14 @@ func (n *Notification) save(pushUpdate bool) *Notification {
} }
// check key // check key
if n.DatabaseKey() == "" { if !n.KeyIsSet() {
n.SetKey(fmt.Sprintf("notifications:all/%s", n.EventID)) n.SetKey(fmt.Sprintf("notifications:all/%s", n.EventID))
} }
// Update meta data.
n.UpdateMeta() n.UpdateMeta()
// store the notification inside or map // Push update via the database system if needed.
notsLock.Lock()
nots[n.EventID] = n
notsLock.Unlock()
if pushUpdate { if pushUpdate {
log.Tracef("notifications: pushing update for %s to subscribers", n.Key()) log.Tracef("notifications: pushing update for %s to subscribers", n.Key())
dbController.PushUpdate(n) dbController.PushUpdate(n)
@ -203,7 +213,7 @@ func (n *Notification) save(pushUpdate bool) *Notification {
// SetActionFunction sets a trigger function to be executed when the user reacted on the notification. // SetActionFunction sets a trigger function to be executed when the user reacted on the notification.
// The provided function will be started as its own goroutine and will have to lock everything it accesses, even the provided notification. // The provided function will be started as its own goroutine and will have to lock everything it accesses, even the provided notification.
func (n *Notification) SetActionFunction(fn func(*Notification)) *Notification { func (n *Notification) SetActionFunction(fn NotificationActionFn) *Notification {
n.lock.Lock() n.lock.Lock()
defer n.lock.Unlock() defer n.lock.Unlock()
n.actionFunction = fn n.actionFunction = fn
@ -224,43 +234,70 @@ func (n *Notification) Response() <-chan string {
// Update updates/resends a notification if it was not already responded to. // Update updates/resends a notification if it was not already responded to.
func (n *Notification) Update(expires int64) { func (n *Notification) Update(expires int64) {
// Save when we're finished, if needed.
save := false
defer func() {
if save {
n.save(true)
}
}()
n.lock.Lock() n.lock.Lock()
defer n.lock.Unlock() defer n.lock.Unlock()
if n.State == Active { // Don't update if notification isn't active.
n.Expires = expires if n.State != Active {
n.save(true) return
} }
// Don't update too quickly.
if n.Meta().Modified > time.Now().Add(-10*time.Second).Unix() {
return
}
// Update expiry and save.
n.Expires = expires
save = true
} }
// Delete (prematurely) cancels and deletes a notification. // Delete (prematurely) cancels and deletes a notification.
func (n *Notification) Delete() error { func (n *Notification) Delete() error {
notsLock.Lock() n.delete(true)
defer notsLock.Unlock() return nil
n.Lock()
defer n.Unlock()
return n.delete(true)
} }
func (n *Notification) delete(pushUpdate bool) error { func (n *Notification) delete(pushUpdate bool) {
// mark as deleted var id string
// Delete notification after processing deletion.
defer func() {
// Lock and delete from notification storage.
notsLock.Lock()
defer notsLock.Unlock()
delete(nots, id)
}()
// We do not access EventData here, so it is enough to just lock the
// notification itself.
n.lock.Lock()
defer n.lock.Unlock()
// Save ID for deletion
id = n.EventID
// Mark notification as deleted.
n.Meta().Delete() n.Meta().Delete()
// delete from internal storage // Close expiry channel if available.
delete(nots, n.EventID)
// close expired
if n.expiredTrigger != nil { if n.expiredTrigger != nil {
close(n.expiredTrigger) close(n.expiredTrigger)
n.expiredTrigger = nil n.expiredTrigger = nil
} }
// Push update via the database system if needed.
if pushUpdate { if pushUpdate {
dbController.PushUpdate(n) dbController.PushUpdate(n)
} }
return nil
} }
// Expired notifies the caller when the notification has expired. // Expired notifies the caller when the notification has expired.
@ -286,7 +323,9 @@ func (n *Notification) selectAndExecuteAction(id string) {
executed := false executed := false
if n.actionFunction != nil { if n.actionFunction != nil {
go n.actionFunction(n) module.StartWorker("notification action execution", func(ctx context.Context) error {
return n.actionFunction(ctx, n)
})
executed = true executed = true
} }
@ -336,14 +375,3 @@ func (n *Notification) Unlock() {
locker.Unlock() locker.Unlock()
} }
} }
func duplicateActions(original []*Action) (duplicate []*Action) {
duplicate = make([]*Action, len(original))
for _, action := range original {
duplicate = append(duplicate, &Action{
ID: action.ID,
Text: action.Text,
})
}
return
}