mirror of
https://github.com/safing/portbase
synced 2025-09-01 18:19:57 +00:00
Drop support for peristent notifications. Fixes #71
This path drops support for persistent notifications as they were always broken and not used anyway (see #71). As a result, the locking strategy of the injected notification backend has been updated and should improve database push updates (i.e. the "upd" is now correct and no additional "new" updates with partial data will be sent anymore)
This commit is contained in:
parent
c479430d46
commit
1810855f64
2 changed files with 86 additions and 115 deletions
|
@ -19,9 +19,6 @@ var (
|
|||
notsLock sync.RWMutex
|
||||
|
||||
dbController *database.Controller
|
||||
dbInterface *database.Interface
|
||||
|
||||
persistentBasePath string
|
||||
)
|
||||
|
||||
// Storage interface errors
|
||||
|
@ -31,13 +28,6 @@ var (
|
|||
ErrNoDelete = errors.New("notifications may not be deleted, they must be handled")
|
||||
)
|
||||
|
||||
// SetPersistenceBasePath sets the base path for persisting persistent notifications.
|
||||
func SetPersistenceBasePath(dbBasePath string) {
|
||||
if persistentBasePath == "" {
|
||||
persistentBasePath = dbBasePath
|
||||
}
|
||||
}
|
||||
|
||||
// StorageInterface provices a storage.Interface to the configuration manager.
|
||||
type StorageInterface struct {
|
||||
storage.InjectBase
|
||||
|
@ -69,19 +59,18 @@ func (s *StorageInterface) Get(key string) (record.Record, error) {
|
|||
defer notsLock.RUnlock()
|
||||
|
||||
// transform key
|
||||
if strings.HasPrefix(key, "all/") {
|
||||
key = strings.TrimPrefix(key, "all/")
|
||||
} else {
|
||||
if !strings.HasPrefix(key, "all/") {
|
||||
return nil, storage.ErrNotFound
|
||||
}
|
||||
key = strings.TrimPrefix(key, "all/")
|
||||
|
||||
// get notification
|
||||
not, ok := nots[key]
|
||||
if ok {
|
||||
return not, nil
|
||||
}
|
||||
if !ok {
|
||||
return nil, storage.ErrNotFound
|
||||
}
|
||||
return not, nil
|
||||
}
|
||||
|
||||
// Query returns a an iterator for the supplied query.
|
||||
func (s *StorageInterface) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
|
||||
|
@ -103,7 +92,12 @@ func (s *StorageInterface) processQuery(q *query.Query, it *iterator.Iterator) {
|
|||
}
|
||||
|
||||
if q.MatchesKey(n.DatabaseKey()) && q.MatchesRecord(n) {
|
||||
it.Next <- n
|
||||
select {
|
||||
case it.Next <- n:
|
||||
case <-it.Done:
|
||||
// make sure we don't leak this goroutine if the iterator get's cancelled
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -127,56 +121,68 @@ func (s *StorageInterface) Put(r record.Record) (record.Record, error) {
|
|||
return nil, ErrInvalidPath
|
||||
}
|
||||
|
||||
// continue in goroutine
|
||||
go UpdateNotification(n, key)
|
||||
return applyUpdate(n, key)
|
||||
}
|
||||
|
||||
func applyUpdate(n *Notification, key string) (*Notification, error) {
|
||||
// separate goroutine in order to correctly lock notsLock
|
||||
notsLock.RLock()
|
||||
existing, ok := nots[key]
|
||||
notsLock.RUnlock()
|
||||
|
||||
// ignore if already deleted
|
||||
|
||||
if !ok || existing.Meta().IsDeleted() {
|
||||
// this is a completely new notification
|
||||
// we pass pushUpdate==false because the storage
|
||||
// controller will push an update on put anyway.
|
||||
n.save(false)
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// UpdateNotification updates a notification with input from a database action. Notification will not be saved/propagated if there is no valid change.
|
||||
func UpdateNotification(n *Notification, key string) {
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
existing.Lock()
|
||||
defer existing.Unlock()
|
||||
|
||||
// separate goroutine in order to correctly lock notsLock
|
||||
notsLock.RLock()
|
||||
origN, ok := nots[key]
|
||||
notsLock.RUnlock()
|
||||
// A notification can only be updated to select and execute the
|
||||
// notification action. If the existing one already has an
|
||||
// Executed timestamp this update request is invalid
|
||||
if existing.Executed > 0 {
|
||||
return existing, fmt.Errorf("action already executed at %d", existing.Executed)
|
||||
}
|
||||
|
||||
save := false
|
||||
|
||||
// ignore if already deleted
|
||||
if ok && origN.Meta().IsDeleted() {
|
||||
ok = false
|
||||
}
|
||||
|
||||
if ok {
|
||||
// existing notification
|
||||
// only update select attributes
|
||||
origN.Lock()
|
||||
defer origN.Unlock()
|
||||
} else {
|
||||
// new notification (from external source): old == new
|
||||
origN = n
|
||||
save = true
|
||||
}
|
||||
|
||||
switch {
|
||||
case n.SelectedActionID != "" && n.Responded == 0:
|
||||
// select action, if not yet already handled
|
||||
log.Tracef("notifications: selected action for %s: %s", n.ID, n.SelectedActionID)
|
||||
origN.selectAndExecuteAction(n.SelectedActionID)
|
||||
save = true
|
||||
case origN.Executed == 0 && n.Executed != 0:
|
||||
// check if the notification has been marked as
|
||||
// "executed externally".
|
||||
if n.Executed > 0 {
|
||||
log.Tracef("notifications: action for %s executed externally", n.ID)
|
||||
origN.Executed = n.Executed
|
||||
existing.Executed = n.Executed
|
||||
save = true
|
||||
|
||||
// in case the action has been executed immediately by the
|
||||
// sender we may need to update the SelectedActionID and the
|
||||
// Responded timestamp as well. Though, we guard the assignments
|
||||
// with value checks so partial updates that only change the
|
||||
// Executed property do not overwrite existing values.
|
||||
if n.SelectedActionID != "" {
|
||||
existing.SelectedActionID = n.SelectedActionID
|
||||
}
|
||||
if n.Responded != 0 {
|
||||
existing.Responded = n.Responded
|
||||
}
|
||||
}
|
||||
|
||||
if n.SelectedActionID != "" && existing.Responded == 0 {
|
||||
log.Tracef("notifications: selected action for %s: %s", n.ID, n.SelectedActionID)
|
||||
existing.selectAndExecuteAction(n.SelectedActionID)
|
||||
save = true
|
||||
}
|
||||
|
||||
if save {
|
||||
// we may be locking
|
||||
go origN.Save()
|
||||
existing.save(false)
|
||||
}
|
||||
|
||||
return existing, nil
|
||||
}
|
||||
|
||||
// Delete deletes a record from the database.
|
||||
|
|
|
@ -5,17 +5,19 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/safing/portbase/database"
|
||||
"github.com/safing/portbase/database/record"
|
||||
"github.com/safing/portbase/log"
|
||||
"github.com/safing/portbase/utils"
|
||||
)
|
||||
|
||||
// Type describes the type of a notification.
|
||||
type Type uint8
|
||||
|
||||
// Notification types
|
||||
const (
|
||||
Info uint8 = 0
|
||||
Warning uint8 = 1
|
||||
Prompt uint8 = 2
|
||||
Info Type = 0
|
||||
Warning Type = 1
|
||||
Prompt Type = 2
|
||||
)
|
||||
|
||||
// Notification represents a notification that is to be delivered to the user.
|
||||
|
@ -29,9 +31,8 @@ type Notification struct {
|
|||
// MessageTemplate string
|
||||
// MessageData []string
|
||||
DataSubject sync.Locker
|
||||
Type uint8
|
||||
Type Type
|
||||
|
||||
Persistent bool // this notification persists until it is handled and survives restarts
|
||||
Created int64 // creation timestamp, notification "starts"
|
||||
Expires int64 // expiry timestamp, notification is expected to be canceled at this time and may be cleaned up afterwards
|
||||
Responded int64 // response timestamp, notification "ends"
|
||||
|
@ -52,8 +53,7 @@ type Action struct {
|
|||
Text string
|
||||
}
|
||||
|
||||
func noOpAction(n *Notification) {
|
||||
}
|
||||
func noOpAction(n *Notification) {}
|
||||
|
||||
// Get returns the notification identifed by the given id or nil if it doesn't exist.
|
||||
func Get(id string) *Notification {
|
||||
|
@ -87,7 +87,7 @@ func NotifyPrompt(id, msg string, actions ...Action) *Notification {
|
|||
return notify(Prompt, id, msg, actions...)
|
||||
}
|
||||
|
||||
func notify(nType uint8, id string, msg string, actions ...Action) *Notification {
|
||||
func notify(nType Type, id string, msg string, actions ...Action) *Notification {
|
||||
acts := make([]*Action, len(actions))
|
||||
for idx := range actions {
|
||||
a := actions[idx]
|
||||
|
@ -110,11 +110,13 @@ func notify(nType uint8, id string, msg string, actions ...Action) *Notification
|
|||
|
||||
// Save saves the notification and returns it.
|
||||
func (n *Notification) Save() *Notification {
|
||||
notsLock.Lock()
|
||||
defer notsLock.Unlock()
|
||||
n.Lock()
|
||||
defer n.Unlock()
|
||||
|
||||
return n.save(true)
|
||||
}
|
||||
|
||||
func (n *Notification) save(pushUpdate bool) *Notification {
|
||||
// initialize
|
||||
if n.Created == 0 {
|
||||
n.Created = time.Now().Unix()
|
||||
|
@ -139,39 +141,16 @@ func (n *Notification) Save() *Notification {
|
|||
n.SetKey(fmt.Sprintf("notifications:all/%s", n.ID))
|
||||
}
|
||||
|
||||
// update meta
|
||||
n.UpdateMeta()
|
||||
|
||||
// assign to data map
|
||||
// store the notification inside or map
|
||||
notsLock.Lock()
|
||||
nots[n.ID] = n
|
||||
notsLock.Unlock()
|
||||
|
||||
// push update
|
||||
if pushUpdate {
|
||||
log.Tracef("notifications: pushing update for %s to subscribers", n.Key())
|
||||
dbController.PushUpdate(n)
|
||||
|
||||
// persist
|
||||
if n.Persistent && persistentBasePath != "" {
|
||||
duplicate := &Notification{
|
||||
ID: n.ID,
|
||||
Message: n.Message,
|
||||
DataSubject: n.DataSubject,
|
||||
AvailableActions: duplicateActions(n.AvailableActions),
|
||||
SelectedActionID: n.SelectedActionID,
|
||||
Persistent: n.Persistent,
|
||||
Created: n.Created,
|
||||
Expires: n.Expires,
|
||||
Responded: n.Responded,
|
||||
Executed: n.Executed,
|
||||
}
|
||||
duplicate.SetMeta(n.Meta().Duplicate())
|
||||
key := fmt.Sprintf("%s/%s", persistentBasePath, n.ID)
|
||||
duplicate.SetKey(key)
|
||||
go func() {
|
||||
err := dbInterface.Put(duplicate)
|
||||
if err != nil {
|
||||
log.Warningf("notifications: failed to persist notification %s: %s", key, err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return n
|
||||
|
@ -200,17 +179,12 @@ func (n *Notification) Response() <-chan string {
|
|||
|
||||
// Update updates/resends a notification if it was not already responded to.
|
||||
func (n *Notification) Update(expires int64) {
|
||||
responded := true
|
||||
n.lock.Lock()
|
||||
if n.Responded == 0 {
|
||||
responded = false
|
||||
n.Expires = expires
|
||||
}
|
||||
n.lock.Unlock()
|
||||
defer n.lock.Unlock()
|
||||
|
||||
// save if not yet responded
|
||||
if !responded {
|
||||
n.Save()
|
||||
if n.Responded == 0 {
|
||||
n.Expires = expires
|
||||
n.save(true)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -236,15 +210,6 @@ func (n *Notification) Delete() error {
|
|||
// push update
|
||||
dbController.PushUpdate(n)
|
||||
|
||||
// delete from persistent storage
|
||||
if n.Persistent && persistentBasePath != "" {
|
||||
key := fmt.Sprintf("%s/%s", persistentBasePath, n.ID)
|
||||
err := dbInterface.Delete(key)
|
||||
if err != nil && err != database.ErrNotFound {
|
||||
return fmt.Errorf("failed to delete persisted notification %s from database: %s", key, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue