mirror of
https://github.com/safing/portbase
synced 2025-09-01 10:09:50 +00:00
Clean up / revamp notification handling
This commit is contained in:
parent
3890bf68f3
commit
090669728d
3 changed files with 133 additions and 54 deletions
|
@ -2,6 +2,8 @@ package notifications
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/safing/portbase/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
func cleaner() {
|
func cleaner() {
|
||||||
|
@ -10,31 +12,55 @@ func cleaner() {
|
||||||
case <-shutdownSignal:
|
case <-shutdownSignal:
|
||||||
shutdownWg.Done()
|
shutdownWg.Done()
|
||||||
return
|
return
|
||||||
case <-time.After(1 * time.Minute):
|
case <-time.After(5 * time.Second):
|
||||||
cleanNotifications()
|
cleanNotifications()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func cleanNotifications() {
|
func cleanNotifications() {
|
||||||
threshold := time.Now().Add(-2 * time.Minute).Unix()
|
now := time.Now().Unix()
|
||||||
maxThreshold := time.Now().Add(-72 * time.Hour).Unix()
|
finishedThreshhold := time.Now().Add(-10 * time.Second).Unix()
|
||||||
|
executionTimelimit := time.Now().Add(-24 * time.Hour).Unix()
|
||||||
|
fallbackTimelimit := time.Now().Add(-72 * time.Hour).Unix()
|
||||||
|
|
||||||
notsLock.Lock()
|
notsLock.Lock()
|
||||||
defer notsLock.Unlock()
|
defer notsLock.Unlock()
|
||||||
|
|
||||||
for _, n := range nots {
|
for _, n := range nots {
|
||||||
n.Lock()
|
n.Lock()
|
||||||
if n.Expires != 0 && n.Expires < threshold ||
|
switch {
|
||||||
n.Executed != 0 && n.Executed < threshold ||
|
case n.Executed != 0: // notification was fully handled
|
||||||
n.Created < maxThreshold {
|
// wait for a short time before deleting
|
||||||
|
if n.Executed < finishedThreshhold {
|
||||||
|
go deleteNotification(n)
|
||||||
|
}
|
||||||
|
case n.Responded != 0:
|
||||||
|
// waiting for execution
|
||||||
|
if n.Responded < executionTimelimit {
|
||||||
|
go deleteNotification(n)
|
||||||
|
}
|
||||||
|
case n.Expires != 0:
|
||||||
|
// expired without response
|
||||||
|
if n.Expires < now {
|
||||||
|
go deleteNotification(n)
|
||||||
|
}
|
||||||
|
case n.Created != 0:
|
||||||
|
// fallback: delete after 3 days after creation
|
||||||
|
if n.Created < fallbackTimelimit {
|
||||||
|
go deleteNotification(n)
|
||||||
|
|
||||||
// delete
|
}
|
||||||
n.Meta().Delete()
|
default:
|
||||||
delete(nots, n.ID)
|
// invalid, impossible to determine cleanup timeframe, delete now
|
||||||
|
go deleteNotification(n)
|
||||||
// save (ie. propagate delete)
|
|
||||||
go n.Save()
|
|
||||||
}
|
}
|
||||||
n.Unlock()
|
n.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func deleteNotification(n *Notification) {
|
||||||
|
err := n.Delete()
|
||||||
|
if err != nil {
|
||||||
|
log.Debugf("notifications: failed to delete %s: %s", n.ID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -128,45 +128,59 @@ func (s *StorageInterface) Put(r record.Record) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// continue in goroutine
|
// continue in goroutine
|
||||||
go updateNotificationFromDatabasePut(n, key)
|
go UpdateNotification(n, key)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateNotificationFromDatabasePut(n *Notification, key string) {
|
// UpdateNotification updates a notification with input from a database action. Notification will not be saved/propagated if there is no valid change.
|
||||||
|
func UpdateNotification(n *Notification, key string) {
|
||||||
|
n.Lock()
|
||||||
|
defer n.Unlock()
|
||||||
|
|
||||||
// seperate goroutine in order to correctly lock notsLock
|
// seperate goroutine in order to correctly lock notsLock
|
||||||
notsLock.RLock()
|
notsLock.RLock()
|
||||||
origN, ok := nots[key]
|
origN, ok := nots[key]
|
||||||
notsLock.RUnlock()
|
notsLock.RUnlock()
|
||||||
|
|
||||||
|
save := false
|
||||||
|
|
||||||
// ignore if already deleted
|
// ignore if already deleted
|
||||||
if ok && origN.Meta().IsDeleted() {
|
if ok && origN.Meta().IsDeleted() {
|
||||||
ok = false
|
ok = false
|
||||||
}
|
}
|
||||||
|
|
||||||
if ok {
|
if ok {
|
||||||
// existing notification, update selected action ID only
|
// existing notification
|
||||||
n.Lock()
|
// only update select attributes
|
||||||
defer n.Unlock()
|
origN.Lock()
|
||||||
|
defer origN.Unlock()
|
||||||
} else {
|
} else {
|
||||||
// accept new notification as is
|
// new notification (from external source): old == new
|
||||||
n.Save()
|
|
||||||
// set var for action processing
|
|
||||||
origN = n
|
origN = n
|
||||||
|
save = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// select action, if not yet already handled
|
switch {
|
||||||
if n.SelectedActionID != "" && n.Responded == 0 {
|
case n.SelectedActionID != "" && n.Responded == 0:
|
||||||
log.Tracef("notifications: user selected action for %s: %s", n.ID, n.SelectedActionID)
|
// select action, if not yet already handled
|
||||||
origN.SelectAndExecuteAction(n.SelectedActionID)
|
log.Tracef("notifications: selected action for %s: %s", n.ID, n.SelectedActionID)
|
||||||
|
origN.selectAndExecuteAction(n.SelectedActionID)
|
||||||
|
save = true
|
||||||
|
case origN.Executed == 0 && n.Executed != 0:
|
||||||
|
log.Tracef("notifications: action for %s executed externally", n.ID)
|
||||||
|
origN.Executed = n.Executed
|
||||||
|
save = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if save {
|
||||||
|
// we may be locking
|
||||||
|
go origN.Save()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete deletes a record from the database.
|
// Delete deletes a record from the database.
|
||||||
func (s *StorageInterface) Delete(key string) error {
|
func (s *StorageInterface) Delete(key string) error {
|
||||||
notsLock.Lock()
|
|
||||||
defer notsLock.Unlock()
|
|
||||||
|
|
||||||
// transform key
|
// transform key
|
||||||
if strings.HasPrefix(key, "all/") {
|
if strings.HasPrefix(key, "all/") {
|
||||||
key = strings.TrimPrefix(key, "all/")
|
key = strings.TrimPrefix(key, "all/")
|
||||||
|
@ -175,18 +189,14 @@ func (s *StorageInterface) Delete(key string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// get notification
|
// get notification
|
||||||
|
notsLock.Lock()
|
||||||
n, ok := nots[key]
|
n, ok := nots[key]
|
||||||
|
notsLock.Unlock()
|
||||||
if !ok {
|
if !ok {
|
||||||
return storage.ErrNotFound
|
return storage.ErrNotFound
|
||||||
}
|
}
|
||||||
n.Lock()
|
|
||||||
defer n.Unlock()
|
|
||||||
|
|
||||||
// delete
|
// delete
|
||||||
n.Meta().Delete()
|
return n.Delete()
|
||||||
delete(nots, n.ID)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadOnly returns whether the database is read only.
|
// ReadOnly returns whether the database is read only.
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/safing/portbase/database"
|
||||||
"github.com/safing/portbase/database/record"
|
"github.com/safing/portbase/database/record"
|
||||||
"github.com/safing/portbase/log"
|
"github.com/safing/portbase/log"
|
||||||
|
|
||||||
|
@ -43,6 +44,7 @@ type Notification struct {
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
actionFunction func(*Notification) // call function to process action
|
actionFunction func(*Notification) // call function to process action
|
||||||
actionTrigger chan string // and/or send to a channel
|
actionTrigger chan string // and/or send to a channel
|
||||||
|
expiredTrigger chan struct{} // closed on expire
|
||||||
}
|
}
|
||||||
|
|
||||||
// Action describes an action that can be taken for a notification.
|
// Action describes an action that can be taken for a notification.
|
||||||
|
@ -80,7 +82,6 @@ func (n *Notification) Save() *Notification {
|
||||||
if n.GUID == "" {
|
if n.GUID == "" {
|
||||||
n.GUID = uuid.NewV4().String()
|
n.GUID = uuid.NewV4().String()
|
||||||
}
|
}
|
||||||
|
|
||||||
// check key
|
// check key
|
||||||
if n.DatabaseKey() == "" {
|
if n.DatabaseKey() == "" {
|
||||||
n.SetKey(fmt.Sprintf("notifications:all/%s", n.ID))
|
n.SetKey(fmt.Sprintf("notifications:all/%s", n.ID))
|
||||||
|
@ -110,11 +111,12 @@ func (n *Notification) Save() *Notification {
|
||||||
Executed: n.Executed,
|
Executed: n.Executed,
|
||||||
}
|
}
|
||||||
duplicate.SetMeta(n.Meta().Duplicate())
|
duplicate.SetMeta(n.Meta().Duplicate())
|
||||||
duplicate.SetKey(fmt.Sprintf("%s/%s", persistentBasePath, n.ID))
|
key := fmt.Sprintf("%s/%s", persistentBasePath, n.ID)
|
||||||
|
duplicate.SetKey(key)
|
||||||
go func() {
|
go func() {
|
||||||
err := dbInterface.Put(duplicate)
|
err := dbInterface.Put(duplicate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warningf("notifications: failed to persist notification %s: %s", n.Key(), err)
|
log.Warningf("notifications: failed to persist notification %s: %s", key, err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -151,42 +153,85 @@ func (n *Notification) MakeAck() *Notification {
|
||||||
// Response waits for the user to respond to the notification and returns the selected action.
|
// Response waits for the user to respond to the notification and returns the selected action.
|
||||||
func (n *Notification) Response() <-chan string {
|
func (n *Notification) Response() <-chan string {
|
||||||
n.lock.Lock()
|
n.lock.Lock()
|
||||||
defer n.lock.Unlock()
|
|
||||||
|
|
||||||
if n.actionTrigger == nil {
|
if n.actionTrigger == nil {
|
||||||
n.actionTrigger = make(chan string)
|
n.actionTrigger = make(chan string)
|
||||||
}
|
}
|
||||||
|
n.lock.Unlock()
|
||||||
|
|
||||||
return n.actionTrigger
|
return n.actionTrigger
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cancel (prematurely) destroys a notification.
|
// Update updates/resends a notification if it was not already responded to.
|
||||||
func (n *Notification) Cancel() {
|
func (n *Notification) Update(expires int64) {
|
||||||
|
responded := true
|
||||||
|
n.lock.Lock()
|
||||||
|
if n.Responded == 0 {
|
||||||
|
responded = false
|
||||||
|
n.Expires = expires
|
||||||
|
}
|
||||||
|
n.lock.Unlock()
|
||||||
|
|
||||||
|
// save if not yet responded
|
||||||
|
if !responded {
|
||||||
|
n.Save()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete (prematurely) cancels and deletes a notification.
|
||||||
|
func (n *Notification) Delete() error {
|
||||||
notsLock.Lock()
|
notsLock.Lock()
|
||||||
defer notsLock.Unlock()
|
defer notsLock.Unlock()
|
||||||
n.Lock()
|
n.Lock()
|
||||||
defer n.Unlock()
|
defer n.Unlock()
|
||||||
|
|
||||||
// delete
|
// mark as deleted
|
||||||
n.Meta().Delete()
|
n.Meta().Delete()
|
||||||
|
|
||||||
|
// delete from internal storage
|
||||||
delete(nots, n.ID)
|
delete(nots, n.ID)
|
||||||
|
|
||||||
// save (ie. propagate delete)
|
// close expired
|
||||||
go n.Save()
|
if n.expiredTrigger != nil {
|
||||||
|
close(n.expiredTrigger)
|
||||||
|
n.expiredTrigger = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// push update
|
||||||
|
dbController.PushUpdate(n)
|
||||||
|
|
||||||
|
// delete from persistent storage
|
||||||
|
if n.Persistent && persistentBasePath != "" {
|
||||||
|
key := fmt.Sprintf("%s/%s", persistentBasePath, n.ID)
|
||||||
|
err := dbInterface.Delete(key)
|
||||||
|
if err != nil && err != database.ErrNotFound {
|
||||||
|
return fmt.Errorf("failed to delete persisted notification %s from database: %s", key, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SelectAndExecuteAction sets the user response and executes/triggers the action, if possible.
|
// Expired notifies the caller when the notification has expired.
|
||||||
func (n *Notification) SelectAndExecuteAction(id string) {
|
func (n *Notification) Expired() <-chan struct{} {
|
||||||
n.Lock()
|
n.lock.Lock()
|
||||||
defer n.Unlock()
|
if n.expiredTrigger == nil {
|
||||||
|
n.expiredTrigger = make(chan struct{})
|
||||||
|
}
|
||||||
|
n.lock.Unlock()
|
||||||
|
|
||||||
// update selection
|
return n.expiredTrigger
|
||||||
|
}
|
||||||
|
|
||||||
|
// selectAndExecuteAction sets the user response and executes/triggers the action, if possible.
|
||||||
|
func (n *Notification) selectAndExecuteAction(id string) {
|
||||||
|
// abort if already executed
|
||||||
if n.Executed != 0 {
|
if n.Executed != 0 {
|
||||||
// we already executed
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
n.SelectedActionID = id
|
|
||||||
|
// set response
|
||||||
n.Responded = time.Now().Unix()
|
n.Responded = time.Now().Unix()
|
||||||
|
n.SelectedActionID = id
|
||||||
|
|
||||||
// execute
|
// execute
|
||||||
executed := false
|
executed := false
|
||||||
|
@ -201,7 +246,7 @@ func (n *Notification) SelectAndExecuteAction(id string) {
|
||||||
select {
|
select {
|
||||||
case n.actionTrigger <- n.SelectedActionID:
|
case n.actionTrigger <- n.SelectedActionID:
|
||||||
executed = true
|
executed = true
|
||||||
default:
|
case <-time.After(100 * time.Millisecond): // mitigate race conditions
|
||||||
break triggerAll
|
break triggerAll
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -211,8 +256,6 @@ func (n *Notification) SelectAndExecuteAction(id string) {
|
||||||
if executed {
|
if executed {
|
||||||
n.Executed = time.Now().Unix()
|
n.Executed = time.Now().Unix()
|
||||||
}
|
}
|
||||||
|
|
||||||
go n.Save()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddDataSubject adds the data subject to the notification. This is the only way how a data subject should be added - it avoids locking problems.
|
// AddDataSubject adds the data subject to the notification. This is the only way how a data subject should be added - it avoids locking problems.
|
||||||
|
|
Loading…
Add table
Reference in a new issue