diff --git a/notifications/cleaner.go b/notifications/cleaner.go new file mode 100644 index 0000000..2befef1 --- /dev/null +++ b/notifications/cleaner.go @@ -0,0 +1,40 @@ +package notifications + +import ( + "time" +) + +func cleaner() { + shutdownWg.Add(1) + select { + case <-shutdownSignal: + shutdownWg.Done() + return + case <-time.After(1 * time.Minute): + cleanNotifications() + } +} + +func cleanNotifications() { + threshold := time.Now().Add(-2 * time.Minute).Unix() + maxThreshold := time.Now().Add(-72 * time.Hour).Unix() + + notsLock.Lock() + defer notsLock.Unlock() + + for _, n := range nots { + n.Lock() + if n.Expires != 0 && n.Expires < threshold || + n.Executed != 0 && n.Executed < threshold || + n.Created < maxThreshold { + + // delete + n.Meta().Delete() + delete(nots, n.ID) + + // save (ie. propagate delete) + go n.Save() + } + n.Unlock() + } +} diff --git a/notifications/database.go b/notifications/database.go new file mode 100644 index 0000000..6e8fcfe --- /dev/null +++ b/notifications/database.go @@ -0,0 +1,173 @@ +package notifications + +import ( + "errors" + "fmt" + "strings" + "sync" + + "github.com/Safing/portbase/database" + "github.com/Safing/portbase/database/iterator" + "github.com/Safing/portbase/database/query" + "github.com/Safing/portbase/database/record" + "github.com/Safing/portbase/database/storage" +) + +var ( + nots = make(map[string]*Notification) + notsLock sync.RWMutex + + dbController *database.Controller + dbInterface *database.Interface + + persistentBasePath string +) + +// Storage interface errors +var ( + ErrInvalidData = errors.New("invalid data, must be a notification object") + ErrInvalidPath = errors.New("invalid path") + ErrNoDelete = errors.New("notifications may not be deleted, they must be handled") +) + +// SetPersistenceBasePath sets the base path for persisting persistent notifications. +func SetPersistenceBasePath(dbBasePath string) { + if persistentBasePath == "" { + persistentBasePath = dbBasePath + } +} + +// StorageInterface provices a storage.Interface to the configuration manager. +type StorageInterface struct { + storage.InjectBase +} + +// Get returns a database record. +func (s *StorageInterface) Get(key string) (record.Record, error) { + notsLock.Lock() + defer notsLock.Unlock() + + // transform key + if strings.HasPrefix(key, "all/") { + key = strings.TrimPrefix(key, "all/") + } else { + return nil, storage.ErrNotFound + } + + // get notification + not, ok := nots[key] + if ok { + return not, nil + } + return nil, storage.ErrNotFound +} + +// Query returns a an iterator for the supplied query. +func (s *StorageInterface) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { + it := iterator.New() + go s.processQuery(q, it) + // TODO: check local and internal + + return it, nil +} + +func (s *StorageInterface) processQuery(q *query.Query, it *iterator.Iterator) { + notsLock.Lock() + defer notsLock.Unlock() + + // send all notifications + for _, n := range nots { + if q.MatchesKey(n.DatabaseKey()) && q.MatchesRecord(n) { + it.Next <- n + } + } + + it.Finish(nil) +} + +func registerAsDatabase() error { + _, err := database.Register(&database.Database{ + Name: "notifications", + Description: "Notifications", + StorageType: "injected", + PrimaryAPI: "", + }) + if err != nil { + return err + } + + controller, err := database.InjectDatabase("notifications", &StorageInterface{}) + if err != nil { + return err + } + + dbController = controller + return nil +} + +// Put stores a record in the database. +func (s *StorageInterface) Put(r record.Record) error { + r.Lock() + key := r.DatabaseKey() + n, err := EnsureNotification(r) + r.Unlock() + + if err != nil { + return ErrInvalidData + } + + // transform key + if strings.HasPrefix(key, "all/") { + key = strings.TrimPrefix(key, "all/") + } else { + return ErrInvalidPath + } + + notsLock.Lock() + origN, ok := nots[key] + notsLock.Unlock() + + if ok { + n.Lock() + defer n.Unlock() + go origN.SelectAndExecuteAction(n.SelectedActionID) + } else { + // accept new notification as is + notsLock.Lock() + nots[key] = n + notsLock.Unlock() + } + + return nil +} + +// Delete deletes a record from the database. +func (s *StorageInterface) Delete(key string) error { + return ErrNoDelete +} + +// ReadOnly returns whether the database is read only. +func (s *StorageInterface) ReadOnly() bool { + return false +} + +// EnsureNotification ensures that the given record is a Notification and returns it. +func EnsureNotification(r record.Record) (*Notification, error) { + // unwrap + if r.IsWrapped() { + // only allocate a new struct, if we need it + new := &Notification{} + err := record.Unwrap(r, new) + if err != nil { + return nil, err + } + return new, nil + } + + // or adjust type + new, ok := r.(*Notification) + if !ok { + return nil, fmt.Errorf("record not of type *Example, but %T", r) + } + return new, nil +} diff --git a/notifications/doc.go b/notifications/doc.go new file mode 100644 index 0000000..29c3b53 --- /dev/null +++ b/notifications/doc.go @@ -0,0 +1,27 @@ +/* +Package notifications provides a notification system. + +Notification Lifecycle + +1. Create Notification with an ID and Message. +2. Set possible actions and save it. +3. When the user responds, the action is executed. + +Example + + // create notification + n := notifications.New("update-available", "A new update is available. Restart to upgrade.") + // set actions and save + n.AddAction("later", "Later").AddAction("restart", "Restart now!").Save() + + // wait for user action + selectedAction := <-n.Response() + switch selectedAction { + case "later": + log.Infof("user wants to upgrade later.") + case "restart": + log.Infof("user wants to restart now.") + } + +*/ +package notifications diff --git a/notifications/module.go b/notifications/module.go new file mode 100644 index 0000000..f8599ae --- /dev/null +++ b/notifications/module.go @@ -0,0 +1,32 @@ +package notifications + +import ( + "sync" + + "github.com/Safing/portbase/modules" +) + +var ( + shutdownSignal = make(chan struct{}) + shutdownWg sync.WaitGroup +) + +func init() { + modules.Register("notifications", nil, start, nil, "core") +} + +func start() error { + err := registerAsDatabase() + if err != nil { + return err + } + + go cleaner() + return nil +} + +func stop() error { + close(shutdownSignal) + shutdownWg.Wait() + return nil +} diff --git a/notifications/notification.go b/notifications/notification.go new file mode 100644 index 0000000..005d427 --- /dev/null +++ b/notifications/notification.go @@ -0,0 +1,236 @@ +package notifications + +import ( + "fmt" + "sync" + "time" + + "github.com/Safing/portbase/database/record" + "github.com/Safing/portbase/log" +) + +// Notification types +const ( + Info uint8 = 0 + Warning uint8 = 1 + Prompt uint8 = 2 +) + +// Notification represents a notification that is to be delivered to the user. +type Notification struct { + record.Base + + ID string + Message string + // MessageTemplate string + // MessageData []string + DataSubject sync.Locker + Type uint8 + + AvailableActions []*Action + SelectedActionID string + + Persistent bool // this notification persists until it is handled and survives restarts + Created int64 // creation timestamp, notification "starts" + Expires int64 // expiry timestamp, notification is expected to be canceled at this time and may be cleaned up afterwards + Responded int64 // response timestamp, notification "ends" + Executed int64 // execution timestamp, notification will be deleted soon + + lock sync.Mutex + actionFunction func(*Notification) // call function to process action + actionTrigger chan string // and/or send to a channel +} + +// Action describes an action that can be taken for a notification. +type Action struct { + ID string + Text string +} + +func noOpAction(n *Notification) { + return +} + +// Get returns the notification identifed by the given id or nil if it doesn't exist. +func Get(id string) *Notification { + notsLock.Lock() + defer notsLock.Unlock() + n, ok := nots[id] + if ok { + return n + } + return nil +} + +// New returns a new Notification +func (n *Notification) Init() *Notification { + n.Created = time.Now().Unix() + return n +} + +// Save saves the notification +func (n *Notification) Save() *Notification { + notsLock.Lock() + defer notsLock.Unlock() + n.Lock() + defer n.Unlock() + + // check key + if n.DatabaseKey() == "" { + n.SetKey(fmt.Sprintf("notifications:all/%s", n.ID)) + } + + // update meta + n.UpdateMeta() + + // assign to data map + nots[n.ID] = n + + // push update + dbController.PushUpdate(n) + + // persist + if n.Persistent && persistentBasePath != "" { + duplicate := &Notification{ + ID: n.ID, + Message: n.Message, + DataSubject: n.DataSubject, + AvailableActions: duplicateActions(n.AvailableActions), + SelectedActionID: n.SelectedActionID, + Persistent: n.Persistent, + Created: n.Created, + Expires: n.Expires, + Responded: n.Responded, + Executed: n.Executed, + } + duplicate.SetMeta(n.Meta().Duplicate()) + duplicate.SetKey(fmt.Sprintf("%s/%s", persistentBasePath, n.ID)) + go func() { + err := dbInterface.Put(duplicate) + if err != nil { + log.Warningf("notifications: failed to persist notification %s: %s", n.Key(), err) + } + }() + } + + return n +} + +// SetActionFunction sets a trigger function to be executed when the user reacted on the notification. +// The provided funtion will be started as its own goroutine and will have to lock everything it accesses, even the provided notification. +func (n *Notification) SetActionFunction(fn func(*Notification)) *Notification { + n.Lock() + defer n.Unlock() + n.actionFunction = fn + return n +} + +// MakeAck sets a default "OK" action and a no-op action function. +func (n *Notification) MakeAck() *Notification { + n.Lock() + defer n.Unlock() + + n.AvailableActions = []*Action{ + &Action{ + ID: "ack", + Text: "OK", + }, + } + n.Type = Info + n.actionFunction = noOpAction + + return n +} + +// Response waits for the user to respond to the notification and returns the selected action. +func (n *Notification) Response() <-chan string { + n.Lock() + defer n.Unlock() + + if n.actionTrigger == nil { + n.actionTrigger = make(chan string) + } + + return n.actionTrigger +} + +// Cancel (prematurely) destroys a notification. +func (n *Notification) Cancel() { + notsLock.Lock() + defer notsLock.Unlock() + n.Lock() + defer n.Unlock() + + // delete + n.Meta().Delete() + delete(nots, n.ID) + + // save (ie. propagate delete) + go n.Save() +} + +// SelectAndExecuteAction sets the user response and executes/triggers the action, if possible. +func (n *Notification) SelectAndExecuteAction(id string) { + n.Lock() + defer n.Unlock() + + // update selection + if n.Executed != 0 { + // we already executed + return + } + n.SelectedActionID = id + n.Responded = time.Now().Unix() + + // execute + executed := false + if n.actionFunction != nil { + go n.actionFunction(n) + executed = true + } + if n.actionTrigger != nil { + // satisfy all listeners + for { + select { + case n.actionTrigger <- n.SelectedActionID: + executed = true + default: + break + } + } + } + + // save execution time + if executed { + n.Executed = time.Now().Unix() + } + + go n.Save() +} + +// Lock locks the Notification and the DataSubject, if available. +func (n *Notification) Lock() { + n.lock.Lock() + if n.DataSubject != nil { + n.DataSubject.Lock() + } +} + +// Unlock unlocks the Notification and the DataSubject, if available. +func (n *Notification) Unlock() { + n.lock.Unlock() + if n.DataSubject != nil { + n.DataSubject.Unlock() + } +} + +func duplicateActions(original []*Action) (duplicate []*Action) { + duplicate = make([]*Action, len(original)) + for _, action := range original { + duplicate = append(duplicate, &Action{ + ID: action.ID, + Text: action.Text, + }) + } + return +}