Add notifications system

This commit is contained in:
Daniel 2019-03-08 23:19:26 +01:00
parent 601b1384d1
commit 73154af3f9
5 changed files with 508 additions and 0 deletions

40
notifications/cleaner.go Normal file
View file

@ -0,0 +1,40 @@
package notifications
import (
"time"
)
func cleaner() {
shutdownWg.Add(1)
select {
case <-shutdownSignal:
shutdownWg.Done()
return
case <-time.After(1 * time.Minute):
cleanNotifications()
}
}
func cleanNotifications() {
threshold := time.Now().Add(-2 * time.Minute).Unix()
maxThreshold := time.Now().Add(-72 * time.Hour).Unix()
notsLock.Lock()
defer notsLock.Unlock()
for _, n := range nots {
n.Lock()
if n.Expires != 0 && n.Expires < threshold ||
n.Executed != 0 && n.Executed < threshold ||
n.Created < maxThreshold {
// delete
n.Meta().Delete()
delete(nots, n.ID)
// save (ie. propagate delete)
go n.Save()
}
n.Unlock()
}
}

173
notifications/database.go Normal file
View file

@ -0,0 +1,173 @@
package notifications
import (
"errors"
"fmt"
"strings"
"sync"
"github.com/Safing/portbase/database"
"github.com/Safing/portbase/database/iterator"
"github.com/Safing/portbase/database/query"
"github.com/Safing/portbase/database/record"
"github.com/Safing/portbase/database/storage"
)
var (
nots = make(map[string]*Notification)
notsLock sync.RWMutex
dbController *database.Controller
dbInterface *database.Interface
persistentBasePath string
)
// Storage interface errors
var (
ErrInvalidData = errors.New("invalid data, must be a notification object")
ErrInvalidPath = errors.New("invalid path")
ErrNoDelete = errors.New("notifications may not be deleted, they must be handled")
)
// SetPersistenceBasePath sets the base path for persisting persistent notifications.
func SetPersistenceBasePath(dbBasePath string) {
if persistentBasePath == "" {
persistentBasePath = dbBasePath
}
}
// StorageInterface provices a storage.Interface to the configuration manager.
type StorageInterface struct {
storage.InjectBase
}
// Get returns a database record.
func (s *StorageInterface) Get(key string) (record.Record, error) {
notsLock.Lock()
defer notsLock.Unlock()
// transform key
if strings.HasPrefix(key, "all/") {
key = strings.TrimPrefix(key, "all/")
} else {
return nil, storage.ErrNotFound
}
// get notification
not, ok := nots[key]
if ok {
return not, nil
}
return nil, storage.ErrNotFound
}
// Query returns a an iterator for the supplied query.
func (s *StorageInterface) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
it := iterator.New()
go s.processQuery(q, it)
// TODO: check local and internal
return it, nil
}
func (s *StorageInterface) processQuery(q *query.Query, it *iterator.Iterator) {
notsLock.Lock()
defer notsLock.Unlock()
// send all notifications
for _, n := range nots {
if q.MatchesKey(n.DatabaseKey()) && q.MatchesRecord(n) {
it.Next <- n
}
}
it.Finish(nil)
}
func registerAsDatabase() error {
_, err := database.Register(&database.Database{
Name: "notifications",
Description: "Notifications",
StorageType: "injected",
PrimaryAPI: "",
})
if err != nil {
return err
}
controller, err := database.InjectDatabase("notifications", &StorageInterface{})
if err != nil {
return err
}
dbController = controller
return nil
}
// Put stores a record in the database.
func (s *StorageInterface) Put(r record.Record) error {
r.Lock()
key := r.DatabaseKey()
n, err := EnsureNotification(r)
r.Unlock()
if err != nil {
return ErrInvalidData
}
// transform key
if strings.HasPrefix(key, "all/") {
key = strings.TrimPrefix(key, "all/")
} else {
return ErrInvalidPath
}
notsLock.Lock()
origN, ok := nots[key]
notsLock.Unlock()
if ok {
n.Lock()
defer n.Unlock()
go origN.SelectAndExecuteAction(n.SelectedActionID)
} else {
// accept new notification as is
notsLock.Lock()
nots[key] = n
notsLock.Unlock()
}
return nil
}
// Delete deletes a record from the database.
func (s *StorageInterface) Delete(key string) error {
return ErrNoDelete
}
// ReadOnly returns whether the database is read only.
func (s *StorageInterface) ReadOnly() bool {
return false
}
// EnsureNotification ensures that the given record is a Notification and returns it.
func EnsureNotification(r record.Record) (*Notification, error) {
// unwrap
if r.IsWrapped() {
// only allocate a new struct, if we need it
new := &Notification{}
err := record.Unwrap(r, new)
if err != nil {
return nil, err
}
return new, nil
}
// or adjust type
new, ok := r.(*Notification)
if !ok {
return nil, fmt.Errorf("record not of type *Example, but %T", r)
}
return new, nil
}

27
notifications/doc.go Normal file
View file

@ -0,0 +1,27 @@
/*
Package notifications provides a notification system.
Notification Lifecycle
1. Create Notification with an ID and Message.
2. Set possible actions and save it.
3. When the user responds, the action is executed.
Example
// create notification
n := notifications.New("update-available", "A new update is available. Restart to upgrade.")
// set actions and save
n.AddAction("later", "Later").AddAction("restart", "Restart now!").Save()
// wait for user action
selectedAction := <-n.Response()
switch selectedAction {
case "later":
log.Infof("user wants to upgrade later.")
case "restart":
log.Infof("user wants to restart now.")
}
*/
package notifications

32
notifications/module.go Normal file
View file

@ -0,0 +1,32 @@
package notifications
import (
"sync"
"github.com/Safing/portbase/modules"
)
var (
shutdownSignal = make(chan struct{})
shutdownWg sync.WaitGroup
)
func init() {
modules.Register("notifications", nil, start, nil, "core")
}
func start() error {
err := registerAsDatabase()
if err != nil {
return err
}
go cleaner()
return nil
}
func stop() error {
close(shutdownSignal)
shutdownWg.Wait()
return nil
}

View file

@ -0,0 +1,236 @@
package notifications
import (
"fmt"
"sync"
"time"
"github.com/Safing/portbase/database/record"
"github.com/Safing/portbase/log"
)
// Notification types
const (
Info uint8 = 0
Warning uint8 = 1
Prompt uint8 = 2
)
// Notification represents a notification that is to be delivered to the user.
type Notification struct {
record.Base
ID string
Message string
// MessageTemplate string
// MessageData []string
DataSubject sync.Locker
Type uint8
AvailableActions []*Action
SelectedActionID string
Persistent bool // this notification persists until it is handled and survives restarts
Created int64 // creation timestamp, notification "starts"
Expires int64 // expiry timestamp, notification is expected to be canceled at this time and may be cleaned up afterwards
Responded int64 // response timestamp, notification "ends"
Executed int64 // execution timestamp, notification will be deleted soon
lock sync.Mutex
actionFunction func(*Notification) // call function to process action
actionTrigger chan string // and/or send to a channel
}
// Action describes an action that can be taken for a notification.
type Action struct {
ID string
Text string
}
func noOpAction(n *Notification) {
return
}
// Get returns the notification identifed by the given id or nil if it doesn't exist.
func Get(id string) *Notification {
notsLock.Lock()
defer notsLock.Unlock()
n, ok := nots[id]
if ok {
return n
}
return nil
}
// New returns a new Notification
func (n *Notification) Init() *Notification {
n.Created = time.Now().Unix()
return n
}
// Save saves the notification
func (n *Notification) Save() *Notification {
notsLock.Lock()
defer notsLock.Unlock()
n.Lock()
defer n.Unlock()
// check key
if n.DatabaseKey() == "" {
n.SetKey(fmt.Sprintf("notifications:all/%s", n.ID))
}
// update meta
n.UpdateMeta()
// assign to data map
nots[n.ID] = n
// push update
dbController.PushUpdate(n)
// persist
if n.Persistent && persistentBasePath != "" {
duplicate := &Notification{
ID: n.ID,
Message: n.Message,
DataSubject: n.DataSubject,
AvailableActions: duplicateActions(n.AvailableActions),
SelectedActionID: n.SelectedActionID,
Persistent: n.Persistent,
Created: n.Created,
Expires: n.Expires,
Responded: n.Responded,
Executed: n.Executed,
}
duplicate.SetMeta(n.Meta().Duplicate())
duplicate.SetKey(fmt.Sprintf("%s/%s", persistentBasePath, n.ID))
go func() {
err := dbInterface.Put(duplicate)
if err != nil {
log.Warningf("notifications: failed to persist notification %s: %s", n.Key(), err)
}
}()
}
return n
}
// SetActionFunction sets a trigger function to be executed when the user reacted on the notification.
// The provided funtion will be started as its own goroutine and will have to lock everything it accesses, even the provided notification.
func (n *Notification) SetActionFunction(fn func(*Notification)) *Notification {
n.Lock()
defer n.Unlock()
n.actionFunction = fn
return n
}
// MakeAck sets a default "OK" action and a no-op action function.
func (n *Notification) MakeAck() *Notification {
n.Lock()
defer n.Unlock()
n.AvailableActions = []*Action{
&Action{
ID: "ack",
Text: "OK",
},
}
n.Type = Info
n.actionFunction = noOpAction
return n
}
// Response waits for the user to respond to the notification and returns the selected action.
func (n *Notification) Response() <-chan string {
n.Lock()
defer n.Unlock()
if n.actionTrigger == nil {
n.actionTrigger = make(chan string)
}
return n.actionTrigger
}
// Cancel (prematurely) destroys a notification.
func (n *Notification) Cancel() {
notsLock.Lock()
defer notsLock.Unlock()
n.Lock()
defer n.Unlock()
// delete
n.Meta().Delete()
delete(nots, n.ID)
// save (ie. propagate delete)
go n.Save()
}
// SelectAndExecuteAction sets the user response and executes/triggers the action, if possible.
func (n *Notification) SelectAndExecuteAction(id string) {
n.Lock()
defer n.Unlock()
// update selection
if n.Executed != 0 {
// we already executed
return
}
n.SelectedActionID = id
n.Responded = time.Now().Unix()
// execute
executed := false
if n.actionFunction != nil {
go n.actionFunction(n)
executed = true
}
if n.actionTrigger != nil {
// satisfy all listeners
for {
select {
case n.actionTrigger <- n.SelectedActionID:
executed = true
default:
break
}
}
}
// save execution time
if executed {
n.Executed = time.Now().Unix()
}
go n.Save()
}
// Lock locks the Notification and the DataSubject, if available.
func (n *Notification) Lock() {
n.lock.Lock()
if n.DataSubject != nil {
n.DataSubject.Lock()
}
}
// Unlock unlocks the Notification and the DataSubject, if available.
func (n *Notification) Unlock() {
n.lock.Unlock()
if n.DataSubject != nil {
n.DataSubject.Unlock()
}
}
func duplicateActions(original []*Action) (duplicate []*Action) {
duplicate = make([]*Action, len(original))
for _, action := range original {
duplicate = append(duplicate, &Action{
ID: action.ID,
Text: action.Text,
})
}
return
}