diff --git a/notifications/cleaner.go b/notifications/cleaner.go index 2befef1..efd0408 100644 --- a/notifications/cleaner.go +++ b/notifications/cleaner.go @@ -2,6 +2,8 @@ package notifications import ( "time" + + "github.com/safing/portbase/log" ) func cleaner() { @@ -10,31 +12,55 @@ func cleaner() { case <-shutdownSignal: shutdownWg.Done() return - case <-time.After(1 * time.Minute): + case <-time.After(5 * time.Second): cleanNotifications() } } func cleanNotifications() { - threshold := time.Now().Add(-2 * time.Minute).Unix() - maxThreshold := time.Now().Add(-72 * time.Hour).Unix() + now := time.Now().Unix() + finishedThreshhold := time.Now().Add(-10 * time.Second).Unix() + executionTimelimit := time.Now().Add(-24 * time.Hour).Unix() + fallbackTimelimit := time.Now().Add(-72 * time.Hour).Unix() notsLock.Lock() defer notsLock.Unlock() for _, n := range nots { n.Lock() - if n.Expires != 0 && n.Expires < threshold || - n.Executed != 0 && n.Executed < threshold || - n.Created < maxThreshold { + switch { + case n.Executed != 0: // notification was fully handled + // wait for a short time before deleting + if n.Executed < finishedThreshhold { + go deleteNotification(n) + } + case n.Responded != 0: + // waiting for execution + if n.Responded < executionTimelimit { + go deleteNotification(n) + } + case n.Expires != 0: + // expired without response + if n.Expires < now { + go deleteNotification(n) + } + case n.Created != 0: + // fallback: delete after 3 days after creation + if n.Created < fallbackTimelimit { + go deleteNotification(n) - // delete - n.Meta().Delete() - delete(nots, n.ID) - - // save (ie. propagate delete) - go n.Save() + } + default: + // invalid, impossible to determine cleanup timeframe, delete now + go deleteNotification(n) } n.Unlock() } } + +func deleteNotification(n *Notification) { + err := n.Delete() + if err != nil { + log.Debugf("notifications: failed to delete %s: %s", n.ID, err) + } +} diff --git a/notifications/database.go b/notifications/database.go index 927ef72..2cfd29f 100644 --- a/notifications/database.go +++ b/notifications/database.go @@ -128,45 +128,59 @@ func (s *StorageInterface) Put(r record.Record) error { } // continue in goroutine - go updateNotificationFromDatabasePut(n, key) + go UpdateNotification(n, key) return nil } -func updateNotificationFromDatabasePut(n *Notification, key string) { +// UpdateNotification updates a notification with input from a database action. Notification will not be saved/propagated if there is no valid change. +func UpdateNotification(n *Notification, key string) { + n.Lock() + defer n.Unlock() + // seperate goroutine in order to correctly lock notsLock notsLock.RLock() origN, ok := nots[key] notsLock.RUnlock() + save := false + // ignore if already deleted if ok && origN.Meta().IsDeleted() { ok = false } if ok { - // existing notification, update selected action ID only - n.Lock() - defer n.Unlock() + // existing notification + // only update select attributes + origN.Lock() + defer origN.Unlock() } else { - // accept new notification as is - n.Save() - // set var for action processing + // new notification (from external source): old == new origN = n + save = true } - // select action, if not yet already handled - if n.SelectedActionID != "" && n.Responded == 0 { - log.Tracef("notifications: user selected action for %s: %s", n.ID, n.SelectedActionID) - origN.SelectAndExecuteAction(n.SelectedActionID) + switch { + case n.SelectedActionID != "" && n.Responded == 0: + // select action, if not yet already handled + log.Tracef("notifications: selected action for %s: %s", n.ID, n.SelectedActionID) + origN.selectAndExecuteAction(n.SelectedActionID) + save = true + case origN.Executed == 0 && n.Executed != 0: + log.Tracef("notifications: action for %s executed externally", n.ID) + origN.Executed = n.Executed + save = true + } + + if save { + // we may be locking + go origN.Save() } } // Delete deletes a record from the database. func (s *StorageInterface) Delete(key string) error { - notsLock.Lock() - defer notsLock.Unlock() - // transform key if strings.HasPrefix(key, "all/") { key = strings.TrimPrefix(key, "all/") @@ -175,18 +189,14 @@ func (s *StorageInterface) Delete(key string) error { } // get notification + notsLock.Lock() n, ok := nots[key] + notsLock.Unlock() if !ok { return storage.ErrNotFound } - n.Lock() - defer n.Unlock() - // delete - n.Meta().Delete() - delete(nots, n.ID) - - return nil + return n.Delete() } // ReadOnly returns whether the database is read only. diff --git a/notifications/notification.go b/notifications/notification.go index 5d51543..bb5fe58 100644 --- a/notifications/notification.go +++ b/notifications/notification.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/safing/portbase/database" "github.com/safing/portbase/database/record" "github.com/safing/portbase/log" @@ -43,6 +44,7 @@ type Notification struct { lock sync.Mutex actionFunction func(*Notification) // call function to process action actionTrigger chan string // and/or send to a channel + expiredTrigger chan struct{} // closed on expire } // Action describes an action that can be taken for a notification. @@ -80,7 +82,6 @@ func (n *Notification) Save() *Notification { if n.GUID == "" { n.GUID = uuid.NewV4().String() } - // check key if n.DatabaseKey() == "" { n.SetKey(fmt.Sprintf("notifications:all/%s", n.ID)) @@ -110,11 +111,12 @@ func (n *Notification) Save() *Notification { Executed: n.Executed, } duplicate.SetMeta(n.Meta().Duplicate()) - duplicate.SetKey(fmt.Sprintf("%s/%s", persistentBasePath, n.ID)) + key := fmt.Sprintf("%s/%s", persistentBasePath, n.ID) + duplicate.SetKey(key) go func() { err := dbInterface.Put(duplicate) if err != nil { - log.Warningf("notifications: failed to persist notification %s: %s", n.Key(), err) + log.Warningf("notifications: failed to persist notification %s: %s", key, err) } }() } @@ -151,42 +153,85 @@ func (n *Notification) MakeAck() *Notification { // Response waits for the user to respond to the notification and returns the selected action. func (n *Notification) Response() <-chan string { n.lock.Lock() - defer n.lock.Unlock() - if n.actionTrigger == nil { n.actionTrigger = make(chan string) } + n.lock.Unlock() return n.actionTrigger } -// Cancel (prematurely) destroys a notification. -func (n *Notification) Cancel() { +// Update updates/resends a notification if it was not already responded to. +func (n *Notification) Update(expires int64) { + responded := true + n.lock.Lock() + if n.Responded == 0 { + responded = false + n.Expires = expires + } + n.lock.Unlock() + + // save if not yet responded + if !responded { + n.Save() + } +} + +// Delete (prematurely) cancels and deletes a notification. +func (n *Notification) Delete() error { notsLock.Lock() defer notsLock.Unlock() n.Lock() defer n.Unlock() - // delete + // mark as deleted n.Meta().Delete() + + // delete from internal storage delete(nots, n.ID) - // save (ie. propagate delete) - go n.Save() + // close expired + if n.expiredTrigger != nil { + close(n.expiredTrigger) + n.expiredTrigger = nil + } + + // push update + dbController.PushUpdate(n) + + // delete from persistent storage + if n.Persistent && persistentBasePath != "" { + key := fmt.Sprintf("%s/%s", persistentBasePath, n.ID) + err := dbInterface.Delete(key) + if err != nil && err != database.ErrNotFound { + return fmt.Errorf("failed to delete persisted notification %s from database: %s", key, err) + } + } + + return nil } -// SelectAndExecuteAction sets the user response and executes/triggers the action, if possible. -func (n *Notification) SelectAndExecuteAction(id string) { - n.Lock() - defer n.Unlock() +// Expired notifies the caller when the notification has expired. +func (n *Notification) Expired() <-chan struct{} { + n.lock.Lock() + if n.expiredTrigger == nil { + n.expiredTrigger = make(chan struct{}) + } + n.lock.Unlock() - // update selection + return n.expiredTrigger +} + +// selectAndExecuteAction sets the user response and executes/triggers the action, if possible. +func (n *Notification) selectAndExecuteAction(id string) { + // abort if already executed if n.Executed != 0 { - // we already executed return } - n.SelectedActionID = id + + // set response n.Responded = time.Now().Unix() + n.SelectedActionID = id // execute executed := false @@ -201,7 +246,7 @@ func (n *Notification) SelectAndExecuteAction(id string) { select { case n.actionTrigger <- n.SelectedActionID: executed = true - default: + case <-time.After(100 * time.Millisecond): // mitigate race conditions break triggerAll } } @@ -211,8 +256,6 @@ func (n *Notification) SelectAndExecuteAction(id string) { if executed { n.Executed = time.Now().Unix() } - - go n.Save() } // AddDataSubject adds the data subject to the notification. This is the only way how a data subject should be added - it avoids locking problems.