Refactor notification system

This commit is contained in:
Patrick Pacher 2020-09-15 15:07:32 +02:00
parent 903447b65f
commit 4665ae8dca
No known key found for this signature in database
GPG key ID: E8CD2DA160925A6D
3 changed files with 134 additions and 108 deletions

View file

@ -7,62 +7,44 @@ import (
"github.com/safing/portbase/log" "github.com/safing/portbase/log"
) )
//nolint:unparam // must conform to interface
func cleaner(ctx context.Context) error { func cleaner(ctx context.Context) error {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
L:
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil break L
case <-time.After(5 * time.Second): case <-ticker.C:
cleanNotifications() deleteExpiredNotifs()
} }
} }
return nil
} }
func cleanNotifications() { func deleteExpiredNotifs() {
now := time.Now().Unix() now := time.Now().Unix()
finishedThreshhold := time.Now().Add(-10 * time.Second).Unix()
executionTimelimit := time.Now().Add(-24 * time.Hour).Unix()
fallbackTimelimit := time.Now().Add(-72 * time.Hour).Unix()
notsLock.Lock() notsLock.Lock()
defer notsLock.Unlock() defer notsLock.Unlock()
toDelete := make([]*Notification, 0, len(nots))
for _, n := range nots { for _, n := range nots {
n.Lock() n.Lock()
switch { if now > n.Expires {
case n.Executed != 0: // notification was fully handled toDelete = append(toDelete, n)
// wait for a short time before deleting
if n.Executed < finishedThreshhold {
go deleteNotification(n)
}
case n.Responded != 0:
// waiting for execution
if n.Responded < executionTimelimit {
go deleteNotification(n)
}
case n.Expires != 0:
// expired without response
if n.Expires < now {
go deleteNotification(n)
}
case n.Created != 0:
// fallback: delete after 3 days after creation
if n.Created < fallbackTimelimit {
go deleteNotification(n)
}
default:
// invalid, impossible to determine cleanup timeframe, delete now
go deleteNotification(n)
} }
n.Unlock() n.Unlock()
} }
}
func deleteNotification(n *Notification) { for _, n := range toDelete {
err := n.Delete() n.Lock()
if err != nil { err := n.delete(true)
log.Debugf("notifications: failed to delete %s: %s", n.ID, err) n.Unlock()
if err != nil {
log.Debugf("notifications: failed to delete %s: %s", n.EventID, err)
}
} }
} }

View file

@ -143,37 +143,31 @@ func applyUpdate(n *Notification, key string) (*Notification, error) {
existing.Lock() existing.Lock()
defer existing.Unlock() defer existing.Unlock()
// A notification can only be updated to select and execute the if existing.State == Executed {
// notification action. If the existing one already has an return existing, fmt.Errorf("action already executed")
// Executed timestamp this update request is invalid
if existing.Executed > 0 {
return existing, fmt.Errorf("action already executed at %d", existing.Executed)
} }
save := false save := false
// check if the notification has been marked as // check if the notification has been marked as
// "executed externally". // "executed externally".
if n.Executed > 0 { if n.State == Executed {
log.Tracef("notifications: action for %s executed externally", n.ID) log.Tracef("notifications: action for %s executed externally", n.EventID)
existing.Executed = n.Executed existing.State = Executed
save = true save = true
// in case the action has been executed immediately by the // in case the action has been executed immediately by the
// sender we may need to update the SelectedActionID and the // sender we may need to update the SelectedActionID.
// Responded timestamp as well. Though, we guard the assignments // Though, we guard the assignments with value check
// with value checks so partial updates that only change the // so partial updates that only change the
// Executed property do not overwrite existing values. // State property do not overwrite existing values.
if n.SelectedActionID != "" { if n.SelectedActionID != "" {
existing.SelectedActionID = n.SelectedActionID existing.SelectedActionID = n.SelectedActionID
} }
if n.Responded != 0 {
existing.Responded = n.Responded
}
} }
if n.SelectedActionID != "" && existing.Responded == 0 { if n.SelectedActionID != "" && existing.State == Active {
log.Tracef("notifications: selected action for %s: %s", n.ID, n.SelectedActionID) log.Tracef("notifications: selected action for %s: %s", n.EventID, n.SelectedActionID)
existing.selectAndExecuteAction(n.SelectedActionID) existing.selectAndExecuteAction(n.SelectedActionID)
save = true save = true
} }
@ -188,21 +182,23 @@ func applyUpdate(n *Notification, key string) (*Notification, error) {
// Delete deletes a record from the database. // Delete deletes a record from the database.
func (s *StorageInterface) Delete(key string) error { func (s *StorageInterface) Delete(key string) error {
// transform key // transform key
if strings.HasPrefix(key, "all/") { if !strings.HasPrefix(key, "all/") {
key = strings.TrimPrefix(key, "all/")
} else {
return storage.ErrNotFound return storage.ErrNotFound
} }
key = strings.TrimPrefix(key, "all/")
// get notification
notsLock.Lock() notsLock.Lock()
defer notsLock.Unlock()
n, ok := nots[key] n, ok := nots[key]
notsLock.Unlock()
if !ok { if !ok {
return storage.ErrNotFound return storage.ErrNotFound
} }
// delete
return n.Delete() n.Lock()
defer n.Unlock()
return n.delete(true)
} }
// ReadOnly returns whether the database is read only. // ReadOnly returns whether the database is read only.

View file

@ -20,25 +20,64 @@ const (
Prompt Type = 2 Prompt Type = 2
) )
// State describes the state of a notification.
type State string
// Possible notification states.
// State transitions can only happen from top to bottom.
const (
// Active describes a notification that is active, no expired and,
// if actions are available, still waits for the user to select an
// action.
Active State = "active"
// Responded describes a notification where the user has already
// selected which action to take but that action is still to be
// performed.
Responded State = "responded"
// Executes describes a notification where the user has selected
// and action and that action has been performed.
Executed State = "executed"
)
// Notification represents a notification that is to be delivered to the user. // Notification represents a notification that is to be delivered to the user.
type Notification struct { type Notification struct {
record.Base record.Base
// EventID is used to identify a specific notification. It consists of
ID string // the module name and a per-module unique event id.
// The following format is recommended:
// <module-id>:<event-id>
EventID string
// GUID is a unique identifier for each notification instance. That is
// two notifications with the same EventID must still have unique GUIDs.
// The GUID is mainly used for system (Windows) integration and is
// automatically populated by the notification package. Average users
// don't need to care about this field.
GUID string GUID string
// Type is the notification type. It can be one of Info, Warning or Prompt.
Type Type
// Message is the default message shown to the user if no localized version
// of the notification is available. Note that the message should already
// have any paramerized values replaced.
Message string Message string
// MessageTemplate string // EventData contains an additional payload for the notification. This payload
// MessageData []string // may contain contextual data and may be used by a localization framework
DataSubject sync.Locker // to populate the notification message template.
Type Type // If EventData implements sync.Locker it will be locked and unlocked together with the
// notification. Otherwise, EventData is expected to be immutable once the
Created int64 // creation timestamp, notification "starts" // notification has been saved and handed over to the notification or database package.
Expires int64 // expiry timestamp, notification is expected to be canceled at this time and may be cleaned up afterwards EventData interface{}
Responded int64 // response timestamp, notification "ends" // Expires holds the unix epoch timestamp at which the notification expires
Executed int64 // execution timestamp, notification will be deleted soon // and can be cleaned up.
// Users can safely ignore expired notifications and should handle expiry the
// same as deletion.
Expires int64
// State describes the current state of a notification. See State for
// a list of available values and their meaning.
State State
// AvailableActions defines a list of actions that a user can choose from.
AvailableActions []*Action AvailableActions []*Action
// SelectedActionID is updated to match the ID of one of the AvailableActions
// based on the user selection.
SelectedActionID string SelectedActionID string
lock sync.Mutex lock sync.Mutex
@ -99,7 +138,7 @@ func notify(nType Type, id string, msg string, actions ...Action) *Notification
} }
n := Notification{ n := Notification{
ID: id, EventID: id,
Message: msg, Message: msg,
Type: nType, Type: nType,
AvailableActions: acts, AvailableActions: acts,
@ -117,12 +156,8 @@ func (n *Notification) Save() *Notification {
} }
func (n *Notification) save(pushUpdate bool) *Notification { func (n *Notification) save(pushUpdate bool) *Notification {
// initialize
if n.Created == 0 {
n.Created = time.Now().Unix()
}
if n.GUID == "" { if n.GUID == "" {
n.GUID = utils.RandomUUID(n.ID).String() n.GUID = utils.RandomUUID(n.EventID).String()
} }
// make ack notification if there are no defined actions // make ack notification if there are no defined actions
@ -136,16 +171,21 @@ func (n *Notification) save(pushUpdate bool) *Notification {
n.actionFunction = noOpAction n.actionFunction = noOpAction
} }
// Make sure we always have a reasonable expiration set.
if n.Expires == 0 {
n.Expires = time.Now().Add(72 * time.Hour).Unix()
}
// check key // check key
if n.DatabaseKey() == "" { if n.DatabaseKey() == "" {
n.SetKey(fmt.Sprintf("notifications:all/%s", n.ID)) n.SetKey(fmt.Sprintf("notifications:all/%s", n.EventID))
} }
n.UpdateMeta() n.UpdateMeta()
// store the notification inside or map // store the notification inside or map
notsLock.Lock() notsLock.Lock()
nots[n.ID] = n nots[n.EventID] = n
notsLock.Unlock() notsLock.Unlock()
if pushUpdate { if pushUpdate {
@ -182,7 +222,7 @@ func (n *Notification) Update(expires int64) {
n.lock.Lock() n.lock.Lock()
defer n.lock.Unlock() defer n.lock.Unlock()
if n.Responded == 0 { if n.State == Active {
n.Expires = expires n.Expires = expires
n.save(true) n.save(true)
} }
@ -195,11 +235,15 @@ func (n *Notification) Delete() error {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
return n.delete(true)
}
func (n *Notification) delete(pushUpdate bool) error {
// mark as deleted // mark as deleted
n.Meta().Delete() n.Meta().Delete()
// delete from internal storage // delete from internal storage
delete(nots, n.ID) delete(nots, n.EventID)
// close expired // close expired
if n.expiredTrigger != nil { if n.expiredTrigger != nil {
@ -207,8 +251,9 @@ func (n *Notification) Delete() error {
n.expiredTrigger = nil n.expiredTrigger = nil
} }
// push update if pushUpdate {
dbController.PushUpdate(n) dbController.PushUpdate(n)
}
return nil return nil
} }
@ -227,23 +272,27 @@ func (n *Notification) Expired() <-chan struct{} {
// selectAndExecuteAction sets the user response and executes/triggers the action, if possible. // selectAndExecuteAction sets the user response and executes/triggers the action, if possible.
func (n *Notification) selectAndExecuteAction(id string) { func (n *Notification) selectAndExecuteAction(id string) {
// abort if already executed if n.State != Active {
if n.Executed != 0 {
return return
} }
// set response n.State = Responded
n.Responded = time.Now().Unix()
n.SelectedActionID = id n.SelectedActionID = id
// execute
executed := false executed := false
if n.actionFunction != nil { if n.actionFunction != nil {
go n.actionFunction(n) go n.actionFunction(n)
executed = true executed = true
} }
if n.actionTrigger != nil { if n.actionTrigger != nil {
// satisfy all listeners // satisfy all listeners (if they are listening)
// TODO(ppacher): if we miss to notify the waiter here (because
// nobody is listeing on actionTrigger) we wil likely
// never be able to execute the action again (simply because
// we won't try). May consider replacing the single actionTrigger
// channel with a per-listener (buffered) one so we just send
// the value and close the channel.
triggerAll: triggerAll:
for { for {
select { select {
@ -255,32 +304,31 @@ func (n *Notification) selectAndExecuteAction(id string) {
} }
} }
// save execution time
if executed { if executed {
n.Executed = time.Now().Unix() n.State = Executed
} }
} }
// AddDataSubject adds the data subject to the notification. This is the only way how a data subject should be added - it avoids locking problems. // Lock locks the Notification. If EventData is set and
func (n *Notification) AddDataSubject(ds sync.Locker) { // implements sync.Locker it is locked as well. Users that
n.lock.Lock() // want to replace the EventData on a notification must
defer n.lock.Unlock() // ensure to unlock the current value on their own. If the
n.DataSubject = ds // new EventData implements sync.Locker as well, it must
} // be locked prior to unlocking the notification.
// Lock locks the Notification and the DataSubject, if available.
func (n *Notification) Lock() { func (n *Notification) Lock() {
n.lock.Lock() n.lock.Lock()
if n.DataSubject != nil { if locker, ok := n.EventData.(sync.Locker); ok {
n.DataSubject.Lock() locker.Lock()
} }
} }
// Unlock unlocks the Notification and the DataSubject, if available. // Unlock unlocks the Notification and the EventData, if
// it implements sync.Locker. See Lock() for more information
// on how to replace and work with EventData.
func (n *Notification) Unlock() { func (n *Notification) Unlock() {
n.lock.Unlock() n.lock.Unlock()
if n.DataSubject != nil { if locker, ok := n.EventData.(sync.Locker); ok {
n.DataSubject.Unlock() locker.Unlock()
} }
} }