diff --git a/notifications/database.go b/notifications/database.go index 58b455c..1bc3cb6 100644 --- a/notifications/database.go +++ b/notifications/database.go @@ -19,9 +19,6 @@ var ( notsLock sync.RWMutex dbController *database.Controller - dbInterface *database.Interface - - persistentBasePath string ) // Storage interface errors @@ -31,13 +28,6 @@ var ( ErrNoDelete = errors.New("notifications may not be deleted, they must be handled") ) -// SetPersistenceBasePath sets the base path for persisting persistent notifications. -func SetPersistenceBasePath(dbBasePath string) { - if persistentBasePath == "" { - persistentBasePath = dbBasePath - } -} - // StorageInterface provices a storage.Interface to the configuration manager. type StorageInterface struct { storage.InjectBase @@ -69,18 +59,17 @@ func (s *StorageInterface) Get(key string) (record.Record, error) { defer notsLock.RUnlock() // transform key - if strings.HasPrefix(key, "all/") { - key = strings.TrimPrefix(key, "all/") - } else { + if !strings.HasPrefix(key, "all/") { return nil, storage.ErrNotFound } + key = strings.TrimPrefix(key, "all/") // get notification not, ok := nots[key] - if ok { - return not, nil + if !ok { + return nil, storage.ErrNotFound } - return nil, storage.ErrNotFound + return not, nil } // Query returns a an iterator for the supplied query. @@ -103,7 +92,12 @@ func (s *StorageInterface) processQuery(q *query.Query, it *iterator.Iterator) { } if q.MatchesKey(n.DatabaseKey()) && q.MatchesRecord(n) { - it.Next <- n + select { + case it.Next <- n: + case <-it.Done: + // make sure we don't leak this goroutine if the iterator get's cancelled + return + } } } @@ -127,56 +121,68 @@ func (s *StorageInterface) Put(r record.Record) (record.Record, error) { return nil, ErrInvalidPath } - // continue in goroutine - go UpdateNotification(n, key) - - return n, nil + return applyUpdate(n, key) } -// UpdateNotification updates a notification with input from a database action. Notification will not be saved/propagated if there is no valid change. -func UpdateNotification(n *Notification, key string) { - n.Lock() - defer n.Unlock() - +func applyUpdate(n *Notification, key string) (*Notification, error) { // separate goroutine in order to correctly lock notsLock notsLock.RLock() - origN, ok := nots[key] + existing, ok := nots[key] notsLock.RUnlock() + // ignore if already deleted + + if !ok || existing.Meta().IsDeleted() { + // this is a completely new notification + // we pass pushUpdate==false because the storage + // controller will push an update on put anyway. + n.save(false) + return n, nil + } + + existing.Lock() + defer existing.Unlock() + + // A notification can only be updated to select and execute the + // notification action. If the existing one already has an + // Executed timestamp this update request is invalid + if existing.Executed > 0 { + return existing, fmt.Errorf("action already executed at %d", existing.Executed) + } + save := false - // ignore if already deleted - if ok && origN.Meta().IsDeleted() { - ok = false - } - - if ok { - // existing notification - // only update select attributes - origN.Lock() - defer origN.Unlock() - } else { - // new notification (from external source): old == new - origN = n - save = true - } - - switch { - case n.SelectedActionID != "" && n.Responded == 0: - // select action, if not yet already handled - log.Tracef("notifications: selected action for %s: %s", n.ID, n.SelectedActionID) - origN.selectAndExecuteAction(n.SelectedActionID) - save = true - case origN.Executed == 0 && n.Executed != 0: + // check if the notification has been marked as + // "executed externally". + if n.Executed > 0 { log.Tracef("notifications: action for %s executed externally", n.ID) - origN.Executed = n.Executed + existing.Executed = n.Executed + save = true + + // in case the action has been executed immediately by the + // sender we may need to update the SelectedActionID and the + // Responded timestamp as well. Though, we guard the assignments + // with value checks so partial updates that only change the + // Executed property do not overwrite existing values. + if n.SelectedActionID != "" { + existing.SelectedActionID = n.SelectedActionID + } + if n.Responded != 0 { + existing.Responded = n.Responded + } + } + + if n.SelectedActionID != "" && existing.Responded == 0 { + log.Tracef("notifications: selected action for %s: %s", n.ID, n.SelectedActionID) + existing.selectAndExecuteAction(n.SelectedActionID) save = true } if save { - // we may be locking - go origN.Save() + existing.save(false) } + + return existing, nil } // Delete deletes a record from the database. diff --git a/notifications/notification.go b/notifications/notification.go index 78a8790..b867e08 100644 --- a/notifications/notification.go +++ b/notifications/notification.go @@ -5,17 +5,19 @@ import ( "sync" "time" - "github.com/safing/portbase/database" "github.com/safing/portbase/database/record" "github.com/safing/portbase/log" "github.com/safing/portbase/utils" ) +// Type describes the type of a notification. +type Type uint8 + // Notification types const ( - Info uint8 = 0 - Warning uint8 = 1 - Prompt uint8 = 2 + Info Type = 0 + Warning Type = 1 + Prompt Type = 2 ) // Notification represents a notification that is to be delivered to the user. @@ -29,13 +31,12 @@ type Notification struct { // MessageTemplate string // MessageData []string DataSubject sync.Locker - Type uint8 + Type Type - Persistent bool // this notification persists until it is handled and survives restarts - Created int64 // creation timestamp, notification "starts" - Expires int64 // expiry timestamp, notification is expected to be canceled at this time and may be cleaned up afterwards - Responded int64 // response timestamp, notification "ends" - Executed int64 // execution timestamp, notification will be deleted soon + Created int64 // creation timestamp, notification "starts" + Expires int64 // expiry timestamp, notification is expected to be canceled at this time and may be cleaned up afterwards + Responded int64 // response timestamp, notification "ends" + Executed int64 // execution timestamp, notification will be deleted soon AvailableActions []*Action SelectedActionID string @@ -52,8 +53,7 @@ type Action struct { Text string } -func noOpAction(n *Notification) { -} +func noOpAction(n *Notification) {} // Get returns the notification identifed by the given id or nil if it doesn't exist. func Get(id string) *Notification { @@ -87,7 +87,7 @@ func NotifyPrompt(id, msg string, actions ...Action) *Notification { return notify(Prompt, id, msg, actions...) } -func notify(nType uint8, id string, msg string, actions ...Action) *Notification { +func notify(nType Type, id string, msg string, actions ...Action) *Notification { acts := make([]*Action, len(actions)) for idx := range actions { a := actions[idx] @@ -110,11 +110,13 @@ func notify(nType uint8, id string, msg string, actions ...Action) *Notification // Save saves the notification and returns it. func (n *Notification) Save() *Notification { - notsLock.Lock() - defer notsLock.Unlock() n.Lock() defer n.Unlock() + return n.save(true) +} + +func (n *Notification) save(pushUpdate bool) *Notification { // initialize if n.Created == 0 { n.Created = time.Now().Unix() @@ -139,39 +141,16 @@ func (n *Notification) Save() *Notification { n.SetKey(fmt.Sprintf("notifications:all/%s", n.ID)) } - // update meta n.UpdateMeta() - // assign to data map + // store the notification inside or map + notsLock.Lock() nots[n.ID] = n + notsLock.Unlock() - // push update - log.Tracef("notifications: pushing update for %s to subscribers", n.Key()) - dbController.PushUpdate(n) - - // persist - if n.Persistent && persistentBasePath != "" { - duplicate := &Notification{ - ID: n.ID, - Message: n.Message, - DataSubject: n.DataSubject, - AvailableActions: duplicateActions(n.AvailableActions), - SelectedActionID: n.SelectedActionID, - Persistent: n.Persistent, - Created: n.Created, - Expires: n.Expires, - Responded: n.Responded, - Executed: n.Executed, - } - duplicate.SetMeta(n.Meta().Duplicate()) - key := fmt.Sprintf("%s/%s", persistentBasePath, n.ID) - duplicate.SetKey(key) - go func() { - err := dbInterface.Put(duplicate) - if err != nil { - log.Warningf("notifications: failed to persist notification %s: %s", key, err) - } - }() + if pushUpdate { + log.Tracef("notifications: pushing update for %s to subscribers", n.Key()) + dbController.PushUpdate(n) } return n @@ -200,17 +179,12 @@ func (n *Notification) Response() <-chan string { // Update updates/resends a notification if it was not already responded to. func (n *Notification) Update(expires int64) { - responded := true n.lock.Lock() - if n.Responded == 0 { - responded = false - n.Expires = expires - } - n.lock.Unlock() + defer n.lock.Unlock() - // save if not yet responded - if !responded { - n.Save() + if n.Responded == 0 { + n.Expires = expires + n.save(true) } } @@ -236,15 +210,6 @@ func (n *Notification) Delete() error { // push update dbController.PushUpdate(n) - // delete from persistent storage - if n.Persistent && persistentBasePath != "" { - key := fmt.Sprintf("%s/%s", persistentBasePath, n.ID) - err := dbInterface.Delete(key) - if err != nil && err != database.ErrNotFound { - return fmt.Errorf("failed to delete persisted notification %s from database: %s", key, err) - } - } - return nil }