mirror of
https://github.com/safing/portbase
synced 2025-09-02 02:29:59 +00:00
Merge pull request #96 from safing/feature/ui-revamp-ingest
Improvements for changes for new UI
This commit is contained in:
commit
a9ac02f68e
17 changed files with 205 additions and 146 deletions
|
@ -41,7 +41,7 @@ func registerConfig() error {
|
||||||
err := config.Register(&config.Option{
|
err := config.Register(&config.Option{
|
||||||
Name: "API Address",
|
Name: "API Address",
|
||||||
Key: CfgDefaultListenAddressKey,
|
Key: CfgDefaultListenAddressKey,
|
||||||
Description: "Define on which IP and port the API should listen on.",
|
Description: "Defines the IP address and port for the internal API.",
|
||||||
OptType: config.OptTypeString,
|
OptType: config.OptTypeString,
|
||||||
ExpertiseLevel: config.ExpertiseLevelDeveloper,
|
ExpertiseLevel: config.ExpertiseLevelDeveloper,
|
||||||
ReleaseLevel: config.ReleaseLevelStable,
|
ReleaseLevel: config.ReleaseLevelStable,
|
||||||
|
|
|
@ -39,9 +39,9 @@ func init() {
|
||||||
|
|
||||||
func registerExpertiseLevelOption() {
|
func registerExpertiseLevelOption() {
|
||||||
expertiseLevelOption = &Option{
|
expertiseLevelOption = &Option{
|
||||||
Name: "Expertise Level",
|
Name: "UI Mode",
|
||||||
Key: expertiseLevelKey,
|
Key: expertiseLevelKey,
|
||||||
Description: "The Expertise Level controls the perceived complexity. Higher settings will show you more complex settings and information. This might also affect various other things relying on this setting. Modified settings in higher expertise levels stay in effect when switching back. (Unlike the Release Level)",
|
Description: "Controls the amount of settings and information shown. Hidden settings are still in effect - unlike with the Release Level.",
|
||||||
OptType: OptTypeString,
|
OptType: OptTypeString,
|
||||||
ExpertiseLevel: ExpertiseLevelUser,
|
ExpertiseLevel: ExpertiseLevelUser,
|
||||||
ReleaseLevel: ReleaseLevelStable,
|
ReleaseLevel: ReleaseLevelStable,
|
||||||
|
@ -52,14 +52,14 @@ func registerExpertiseLevelOption() {
|
||||||
},
|
},
|
||||||
PossibleValues: []PossibleValue{
|
PossibleValues: []PossibleValue{
|
||||||
{
|
{
|
||||||
Name: "Easy",
|
Name: "Simple",
|
||||||
Value: ExpertiseLevelNameUser,
|
Value: ExpertiseLevelNameUser,
|
||||||
Description: "Easy application mode by hidding complex settings.",
|
Description: "Hide complex settings and information.",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "Expert",
|
Name: "Advanced",
|
||||||
Value: ExpertiseLevelNameExpert,
|
Value: ExpertiseLevelNameExpert,
|
||||||
Description: "Expert application mode. Allows access to almost all configuration options.",
|
Description: "Show technical details.",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "Developer",
|
Name: "Developer",
|
||||||
|
|
|
@ -16,6 +16,7 @@ type OptionType uint8
|
||||||
|
|
||||||
// Various attribute options. Use ExternalOptType for extended types in the frontend.
|
// Various attribute options. Use ExternalOptType for extended types in the frontend.
|
||||||
const (
|
const (
|
||||||
|
optTypeAny OptionType = 0
|
||||||
OptTypeString OptionType = 1
|
OptTypeString OptionType = 1
|
||||||
OptTypeStringArray OptionType = 2
|
OptTypeStringArray OptionType = 2
|
||||||
OptTypeInt OptionType = 3
|
OptTypeInt OptionType = 3
|
||||||
|
@ -24,6 +25,8 @@ const (
|
||||||
|
|
||||||
func getTypeName(t OptionType) string {
|
func getTypeName(t OptionType) string {
|
||||||
switch t {
|
switch t {
|
||||||
|
case optTypeAny:
|
||||||
|
return "any"
|
||||||
case OptTypeString:
|
case OptTypeString:
|
||||||
return "string"
|
return "string"
|
||||||
case OptTypeStringArray:
|
case OptTypeStringArray:
|
||||||
|
@ -84,6 +87,12 @@ const (
|
||||||
// SubsystemAnnotation can be used to mark an option as part
|
// SubsystemAnnotation can be used to mark an option as part
|
||||||
// of a module subsystem.
|
// of a module subsystem.
|
||||||
SubsystemAnnotation = "safing/portbase:module:subsystem"
|
SubsystemAnnotation = "safing/portbase:module:subsystem"
|
||||||
|
// StackableAnnotation can be set on configuration options that
|
||||||
|
// stack on top of the default (or otherwise related) options.
|
||||||
|
// The value of StackableAnnotaiton is expected to be a boolean but
|
||||||
|
// may be extended to hold references to other options in the
|
||||||
|
// future.
|
||||||
|
StackableAnnotation = "safing/portbase:options:stackable"
|
||||||
// QuickSettingAnnotation can be used to add quick settings to
|
// QuickSettingAnnotation can be used to add quick settings to
|
||||||
// a configuration option. A quick setting can support the user
|
// a configuration option. A quick setting can support the user
|
||||||
// by switching between pre-configured values.
|
// by switching between pre-configured values.
|
||||||
|
@ -171,7 +180,7 @@ type Option struct {
|
||||||
// been created.
|
// been created.
|
||||||
Description string
|
Description string
|
||||||
// Help may hold a long version of the description providing
|
// Help may hold a long version of the description providing
|
||||||
// assistence with the configuration option.
|
// assistance with the configuration option.
|
||||||
// Help is considered immutable after the option has
|
// Help is considered immutable after the option has
|
||||||
// been created.
|
// been created.
|
||||||
Help string
|
Help string
|
||||||
|
|
|
@ -75,7 +75,7 @@ func (p *Perspective) getPerspectiveValueCache(name string, requestedType Option
|
||||||
}
|
}
|
||||||
|
|
||||||
// check type
|
// check type
|
||||||
if requestedType != pOption.option.OptType {
|
if requestedType != pOption.option.OptType && requestedType != optTypeAny {
|
||||||
log.Errorf("config: bad type: requested %s as %s, but is %s", name, getTypeName(requestedType), getTypeName(pOption.option.OptType))
|
log.Errorf("config: bad type: requested %s as %s, but is %s", name, getTypeName(requestedType), getTypeName(pOption.option.OptType))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -88,6 +88,12 @@ func (p *Perspective) getPerspectiveValueCache(name string, requestedType Option
|
||||||
return pOption.valueCache
|
return pOption.valueCache
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Has returns whether the given option is set in the perspective.
|
||||||
|
func (p *Perspective) Has(name string) bool {
|
||||||
|
valueCache := p.getPerspectiveValueCache(name, optTypeAny)
|
||||||
|
return valueCache != nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetAsString returns a function that returns the wanted string with high performance.
|
// GetAsString returns a function that returns the wanted string with high performance.
|
||||||
func (p *Perspective) GetAsString(name string) (value string, ok bool) {
|
func (p *Perspective) GetAsString(name string) (value string, ok bool) {
|
||||||
valueCache := p.getPerspectiveValueCache(name, OptTypeString)
|
valueCache := p.getPerspectiveValueCache(name, OptTypeString)
|
||||||
|
|
|
@ -39,7 +39,7 @@ func registerReleaseLevelOption() {
|
||||||
releaseLevelOption = &Option{
|
releaseLevelOption = &Option{
|
||||||
Name: "Release Level",
|
Name: "Release Level",
|
||||||
Key: releaseLevelKey,
|
Key: releaseLevelKey,
|
||||||
Description: "The Release Level changes which features are available to you. Some beta or experimental features are also available in the stable release channel. Unavailable settings are set to the default value.",
|
Description: "Controls the amount of available settings. Hidden settings revert to default - unlike with the UI Mode.",
|
||||||
OptType: OptTypeString,
|
OptType: OptTypeString,
|
||||||
ExpertiseLevel: ExpertiseLevelExpert,
|
ExpertiseLevel: ExpertiseLevelExpert,
|
||||||
ReleaseLevel: ReleaseLevelStable,
|
ReleaseLevel: ReleaseLevelStable,
|
||||||
|
@ -62,7 +62,7 @@ func registerReleaseLevelOption() {
|
||||||
{
|
{
|
||||||
Name: "Experimental",
|
Name: "Experimental",
|
||||||
Value: ReleaseLevelNameExperimental,
|
Value: ReleaseLevelNameExperimental,
|
||||||
Description: "Show experimental features",
|
Description: "Show all features",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,7 @@ func isAllowedPossibleValue(opt *Option, value interface{}) error {
|
||||||
compareAgainst := val.Value
|
compareAgainst := val.Value
|
||||||
valueType := reflect.TypeOf(value)
|
valueType := reflect.TypeOf(value)
|
||||||
|
|
||||||
// loading int's from the configuration JSON does not perserve the correct type
|
// loading int's from the configuration JSON does not preserve the correct type
|
||||||
// as we get float64 instead. Make sure to convert them before.
|
// as we get float64 instead. Make sure to convert them before.
|
||||||
if reflect.TypeOf(val.Value).ConvertibleTo(valueType) {
|
if reflect.TypeOf(val.Value).ConvertibleTo(valueType) {
|
||||||
compareAgainst = reflect.ValueOf(val.Value).Convert(valueType).Interface()
|
compareAgainst = reflect.ValueOf(val.Value).Convert(valueType).Interface()
|
||||||
|
|
|
@ -17,7 +17,7 @@ type Hook interface {
|
||||||
// the underlying storage. A PreGet hookd may be used to
|
// the underlying storage. A PreGet hookd may be used to
|
||||||
// implement more advanced access control on database keys.
|
// implement more advanced access control on database keys.
|
||||||
PreGet(dbKey string) error
|
PreGet(dbKey string) error
|
||||||
// UsesPostGet should returnd true if the hook's PostGet
|
// UsesPostGet should return true if the hook's PostGet
|
||||||
// should be called after loading a database record from
|
// should be called after loading a database record from
|
||||||
// the underlying storage.
|
// the underlying storage.
|
||||||
UsesPostGet() bool
|
UsesPostGet() bool
|
||||||
|
|
|
@ -21,7 +21,7 @@ var (
|
||||||
printGraphFlag bool
|
printGraphFlag bool
|
||||||
)
|
)
|
||||||
|
|
||||||
// Register registeres a new subsystem. It's like Manager.Register
|
// Register registers a new subsystem. It's like Manager.Register
|
||||||
// but uses DefaultManager and panics on error.
|
// but uses DefaultManager and panics on error.
|
||||||
func Register(id, name, description string, module *modules.Module, configKeySpace string, option *config.Option) {
|
func Register(id, name, description string, module *modules.Module, configKeySpace string, option *config.Option) {
|
||||||
err := DefaultManager.Register(id, name, description, module, configKeySpace, option)
|
err := DefaultManager.Register(id, name, description, module, configKeySpace, option)
|
||||||
|
@ -92,7 +92,7 @@ func prep() error {
|
||||||
|
|
||||||
func start() error {
|
func start() error {
|
||||||
// Registration of subsystems is only allowed during
|
// Registration of subsystems is only allowed during
|
||||||
// preperation. Make sure any further call to Register()
|
// preparation. Make sure any further call to Register()
|
||||||
// panics.
|
// panics.
|
||||||
if err := DefaultManager.Start(); err != nil {
|
if err := DefaultManager.Start(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -104,9 +104,9 @@ func start() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// PrintGraph prints the subsystem and module graph.
|
// PrintGraph prints the subsystem and module graph.
|
||||||
func (reg *Manager) PrintGraph() {
|
func (mng *Manager) PrintGraph() {
|
||||||
reg.l.RLock()
|
mng.l.RLock()
|
||||||
defer reg.l.RUnlock()
|
defer mng.l.RUnlock()
|
||||||
|
|
||||||
fmt.Println("subsystems dependency graph:")
|
fmt.Println("subsystems dependency graph:")
|
||||||
|
|
||||||
|
@ -114,17 +114,17 @@ func (reg *Manager) PrintGraph() {
|
||||||
module.Disable()
|
module.Disable()
|
||||||
|
|
||||||
// mark roots
|
// mark roots
|
||||||
for _, sub := range reg.subsys {
|
for _, sub := range mng.subsys {
|
||||||
sub.module.Enable() // mark as tree root
|
sub.module.Enable() // mark as tree root
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, sub := range reg.subsys {
|
for _, sub := range mng.subsys {
|
||||||
printModuleGraph("", sub.module, true)
|
printModuleGraph("", sub.module, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("\nsubsystem module groups:")
|
fmt.Println("\nsubsystem module groups:")
|
||||||
_ = start() // no errors for what we need here
|
_ = start() // no errors for what we need here
|
||||||
for _, sub := range reg.subsys {
|
for _, sub := range mng.subsys {
|
||||||
fmt.Printf("├── %s\n", sub.Name)
|
fmt.Printf("├── %s\n", sub.Name)
|
||||||
for _, mod := range sub.Modules[1:] {
|
for _, mod := range sub.Modules[1:] {
|
||||||
fmt.Printf("│ ├── %s\n", mod.Name)
|
fmt.Printf("│ ├── %s\n", mod.Name)
|
||||||
|
|
|
@ -36,7 +36,7 @@ type Manager struct {
|
||||||
runtime *runtime.Registry
|
runtime *runtime.Registry
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewManager returns a new subsystem manager that registeres
|
// NewManager returns a new subsystem manager that registers
|
||||||
// itself at rtReg.
|
// itself at rtReg.
|
||||||
func NewManager(rtReg *runtime.Registry) (*Manager, error) {
|
func NewManager(rtReg *runtime.Registry) (*Manager, error) {
|
||||||
mng := &Manager{
|
mng := &Manager{
|
||||||
|
@ -71,7 +71,7 @@ func (mng *Manager) Start() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// aggregate all modules dependencies (and the subsystem module itself)
|
// aggregate all modules dependencies (and the subsystem module itself)
|
||||||
// into the Modules slice. Configuration options form dependened modules
|
// into the Modules slice. Configuration options form dependent modules
|
||||||
// will be marked using config.SubsystemAnnotation if not already set.
|
// will be marked using config.SubsystemAnnotation if not already set.
|
||||||
for _, sub := range mng.subsys {
|
for _, sub := range mng.subsys {
|
||||||
sub.Modules = append(sub.Modules, statusFromModule(sub.module))
|
sub.Modules = append(sub.Modules, statusFromModule(sub.module))
|
||||||
|
@ -118,7 +118,7 @@ func (mng *Manager) Get(keyOrPrefix string) ([]record.Record, error) {
|
||||||
return records, nil
|
return records, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register registeres a new subsystem. The given option must be a bool option.
|
// Register registers a new subsystem. The given option must be a bool option.
|
||||||
// Should be called in init() directly after the modules.Register() function.
|
// Should be called in init() directly after the modules.Register() function.
|
||||||
// The config option must not yet be registered and will be registered for
|
// The config option must not yet be registered and will be registered for
|
||||||
// you. Pass a nil option to force enable.
|
// you. Pass a nil option to force enable.
|
||||||
|
|
|
@ -28,7 +28,7 @@ type Subsystem struct { //nolint:maligned // not worth the effort
|
||||||
// FailureStatus is the worst failure status that is currently
|
// FailureStatus is the worst failure status that is currently
|
||||||
// set in one of the subsystem's dependencies.
|
// set in one of the subsystem's dependencies.
|
||||||
FailureStatus uint8
|
FailureStatus uint8
|
||||||
// ToggleOptionKey holds the key of the configuraiton option
|
// ToggleOptionKey holds the key of the configuration option
|
||||||
// that is used to completely enable or disable this subsystem.
|
// that is used to completely enable or disable this subsystem.
|
||||||
ToggleOptionKey string
|
ToggleOptionKey string
|
||||||
// ExpertiseLevel defines the complexity of the subsystem and is
|
// ExpertiseLevel defines the complexity of the subsystem and is
|
||||||
|
|
|
@ -3,48 +3,49 @@ package notifications
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/safing/portbase/log"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func cleaner(ctx context.Context) error {
|
func cleaner(ctx context.Context) error { //nolint:unparam // Conforms to worker interface
|
||||||
ticker := time.NewTicker(5 * time.Second)
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
L:
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
break L
|
return nil
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
deleteExpiredNotifs()
|
deleteExpiredNotifs()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteExpiredNotifs() {
|
func deleteExpiredNotifs() {
|
||||||
now := time.Now().Unix()
|
// Get a copy of the notification map.
|
||||||
|
notsCopy := getNotsCopy()
|
||||||
|
|
||||||
notsLock.Lock()
|
// Delete all expired notifications.
|
||||||
defer notsLock.Unlock()
|
for _, n := range notsCopy {
|
||||||
|
if n.isExpired() {
|
||||||
toDelete := make([]*Notification, 0, len(nots))
|
n.delete(true)
|
||||||
for _, n := range nots {
|
|
||||||
n.Lock()
|
|
||||||
if now > n.Expires {
|
|
||||||
toDelete = append(toDelete, n)
|
|
||||||
}
|
|
||||||
n.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, n := range toDelete {
|
|
||||||
n.Lock()
|
|
||||||
err := n.delete(true)
|
|
||||||
n.Unlock()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Debugf("notifications: failed to delete %s: %s", n.EventID, err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *Notification) isExpired() bool {
|
||||||
|
n.Lock()
|
||||||
|
defer n.Unlock()
|
||||||
|
|
||||||
|
return n.Expires > 0 && n.Expires < time.Now().Unix()
|
||||||
|
}
|
||||||
|
|
||||||
|
func getNotsCopy() []*Notification {
|
||||||
|
notsLock.RLock()
|
||||||
|
defer notsLock.RUnlock()
|
||||||
|
|
||||||
|
notsCopy := make([]*Notification, 0, len(nots))
|
||||||
|
for _, n := range nots {
|
||||||
|
notsCopy = append(notsCopy, n)
|
||||||
|
}
|
||||||
|
|
||||||
|
return notsCopy
|
||||||
|
}
|
||||||
|
|
|
@ -54,21 +54,27 @@ func registerAsDatabase() error {
|
||||||
|
|
||||||
// Get returns a database record.
|
// Get returns a database record.
|
||||||
func (s *StorageInterface) Get(key string) (record.Record, error) {
|
func (s *StorageInterface) Get(key string) (record.Record, error) {
|
||||||
notsLock.RLock()
|
// Get EventID from key.
|
||||||
defer notsLock.RUnlock()
|
|
||||||
|
|
||||||
// transform key
|
|
||||||
if !strings.HasPrefix(key, "all/") {
|
if !strings.HasPrefix(key, "all/") {
|
||||||
return nil, storage.ErrNotFound
|
return nil, storage.ErrNotFound
|
||||||
}
|
}
|
||||||
key = strings.TrimPrefix(key, "all/")
|
key = strings.TrimPrefix(key, "all/")
|
||||||
|
|
||||||
// get notification
|
// Get notification from storage.
|
||||||
not, ok := nots[key]
|
n, ok := getNotification(key)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, storage.ErrNotFound
|
return nil, storage.ErrNotFound
|
||||||
}
|
}
|
||||||
return not, nil
|
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getNotification(eventID string) (n *Notification, ok bool) {
|
||||||
|
notsLock.RLock()
|
||||||
|
defer notsLock.RUnlock()
|
||||||
|
|
||||||
|
n, ok = nots[eventID]
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query returns a an iterator for the supplied query.
|
// Query returns a an iterator for the supplied query.
|
||||||
|
@ -81,16 +87,12 @@ func (s *StorageInterface) Query(q *query.Query, local, internal bool) (*iterato
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageInterface) processQuery(q *query.Query, it *iterator.Iterator) {
|
func (s *StorageInterface) processQuery(q *query.Query, it *iterator.Iterator) {
|
||||||
notsLock.RLock()
|
// Get a copy of the notification map.
|
||||||
defer notsLock.RUnlock()
|
notsCopy := getNotsCopy()
|
||||||
|
|
||||||
// send all notifications
|
// send all notifications
|
||||||
for _, n := range nots {
|
for _, n := range notsCopy {
|
||||||
if n.Meta().IsDeleted() {
|
if inQuery(n, q) {
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if q.MatchesKey(n.DatabaseKey()) && q.MatchesRecord(n) {
|
|
||||||
select {
|
select {
|
||||||
case it.Next <- n:
|
case it.Next <- n:
|
||||||
case <-it.Done:
|
case <-it.Done:
|
||||||
|
@ -103,6 +105,22 @@ func (s *StorageInterface) processQuery(q *query.Query, it *iterator.Iterator) {
|
||||||
it.Finish(nil)
|
it.Finish(nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func inQuery(n *Notification, q *query.Query) bool {
|
||||||
|
n.lock.Lock()
|
||||||
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case n.Meta().IsDeleted():
|
||||||
|
return false
|
||||||
|
case !q.MatchesKey(n.DatabaseKey()):
|
||||||
|
return false
|
||||||
|
case !q.MatchesRecord(n):
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// Put stores a record in the database.
|
// Put stores a record in the database.
|
||||||
func (s *StorageInterface) Put(r record.Record) (record.Record, error) {
|
func (s *StorageInterface) Put(r record.Record) (record.Record, error) {
|
||||||
// record is already locked!
|
// record is already locked!
|
||||||
|
@ -125,12 +143,9 @@ func (s *StorageInterface) Put(r record.Record) (record.Record, error) {
|
||||||
|
|
||||||
func applyUpdate(n *Notification, key string) (*Notification, error) {
|
func applyUpdate(n *Notification, key string) (*Notification, error) {
|
||||||
// separate goroutine in order to correctly lock notsLock
|
// separate goroutine in order to correctly lock notsLock
|
||||||
notsLock.RLock()
|
existing, ok := getNotification(key)
|
||||||
existing, ok := nots[key]
|
|
||||||
notsLock.RUnlock()
|
|
||||||
|
|
||||||
// ignore if already deleted
|
// ignore if already deleted
|
||||||
|
|
||||||
if !ok || existing.Meta().IsDeleted() {
|
if !ok || existing.Meta().IsDeleted() {
|
||||||
// this is a completely new notification
|
// this is a completely new notification
|
||||||
// we pass pushUpdate==false because the storage
|
// we pass pushUpdate==false because the storage
|
||||||
|
@ -139,6 +154,14 @@ func applyUpdate(n *Notification, key string) (*Notification, error) {
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Save when we're finished, if needed.
|
||||||
|
save := false
|
||||||
|
defer func() {
|
||||||
|
if save {
|
||||||
|
existing.save(false)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
existing.Lock()
|
existing.Lock()
|
||||||
defer existing.Unlock()
|
defer existing.Unlock()
|
||||||
|
|
||||||
|
@ -146,8 +169,6 @@ func applyUpdate(n *Notification, key string) (*Notification, error) {
|
||||||
return existing, fmt.Errorf("action already executed")
|
return existing, fmt.Errorf("action already executed")
|
||||||
}
|
}
|
||||||
|
|
||||||
save := false
|
|
||||||
|
|
||||||
// check if the notification has been marked as
|
// check if the notification has been marked as
|
||||||
// "executed externally".
|
// "executed externally".
|
||||||
if n.State == Executed {
|
if n.State == Executed {
|
||||||
|
@ -171,33 +192,25 @@ func applyUpdate(n *Notification, key string) (*Notification, error) {
|
||||||
save = true
|
save = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if save {
|
|
||||||
existing.save(false)
|
|
||||||
}
|
|
||||||
|
|
||||||
return existing, nil
|
return existing, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete deletes a record from the database.
|
// Delete deletes a record from the database.
|
||||||
func (s *StorageInterface) Delete(key string) error {
|
func (s *StorageInterface) Delete(key string) error {
|
||||||
// transform key
|
// Get EventID from key.
|
||||||
if !strings.HasPrefix(key, "all/") {
|
if !strings.HasPrefix(key, "all/") {
|
||||||
return storage.ErrNotFound
|
return storage.ErrNotFound
|
||||||
}
|
}
|
||||||
key = strings.TrimPrefix(key, "all/")
|
key = strings.TrimPrefix(key, "all/")
|
||||||
|
|
||||||
notsLock.Lock()
|
// Get notification from storage.
|
||||||
defer notsLock.Unlock()
|
n, ok := getNotification(key)
|
||||||
|
|
||||||
n, ok := nots[key]
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return storage.ErrNotFound
|
return storage.ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
n.Lock()
|
n.delete(true)
|
||||||
defer n.Unlock()
|
return nil
|
||||||
|
|
||||||
return n.delete(true)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadOnly returns whether the database is read only.
|
// ReadOnly returns whether the database is read only.
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package notifications
|
package notifications
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -13,7 +14,7 @@ import (
|
||||||
// Type describes the type of a notification.
|
// Type describes the type of a notification.
|
||||||
type Type uint8
|
type Type uint8
|
||||||
|
|
||||||
// Notification types
|
// Notification types.
|
||||||
const (
|
const (
|
||||||
Info Type = 0
|
Info Type = 0
|
||||||
Warning Type = 1
|
Warning Type = 1
|
||||||
|
@ -23,6 +24,10 @@ const (
|
||||||
// State describes the state of a notification.
|
// State describes the state of a notification.
|
||||||
type State string
|
type State string
|
||||||
|
|
||||||
|
// NotificationActionFn defines the function signature for notification action
|
||||||
|
// functions.
|
||||||
|
type NotificationActionFn func(context.Context, *Notification) error
|
||||||
|
|
||||||
// Possible notification states.
|
// Possible notification states.
|
||||||
// State transitions can only happen from top to bottom.
|
// State transitions can only happen from top to bottom.
|
||||||
const (
|
const (
|
||||||
|
@ -81,9 +86,9 @@ type Notification struct {
|
||||||
SelectedActionID string
|
SelectedActionID string
|
||||||
|
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
actionFunction func(*Notification) // call function to process action
|
actionFunction NotificationActionFn // call function to process action
|
||||||
actionTrigger chan string // and/or send to a channel
|
actionTrigger chan string // and/or send to a channel
|
||||||
expiredTrigger chan struct{} // closed on expire
|
expiredTrigger chan struct{} // closed on expire
|
||||||
}
|
}
|
||||||
|
|
||||||
// Action describes an action that can be taken for a notification.
|
// Action describes an action that can be taken for a notification.
|
||||||
|
@ -92,8 +97,6 @@ type Action struct {
|
||||||
Text string
|
Text string
|
||||||
}
|
}
|
||||||
|
|
||||||
func noOpAction(n *Notification) {}
|
|
||||||
|
|
||||||
// Get returns the notification identifed by the given id or nil if it doesn't exist.
|
// Get returns the notification identifed by the given id or nil if it doesn't exist.
|
||||||
func Get(id string) *Notification {
|
func Get(id string) *Notification {
|
||||||
notsLock.RLock()
|
notsLock.RLock()
|
||||||
|
@ -149,18 +152,36 @@ func notify(nType Type, id string, msg string, actions ...Action) *Notification
|
||||||
|
|
||||||
// Save saves the notification and returns it.
|
// Save saves the notification and returns it.
|
||||||
func (n *Notification) Save() *Notification {
|
func (n *Notification) Save() *Notification {
|
||||||
n.Lock()
|
|
||||||
defer n.Unlock()
|
|
||||||
|
|
||||||
return n.save(true)
|
return n.save(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// save saves the notification to the internal storage. It locks the
|
||||||
|
// notification, so it must not be locked when save is called.
|
||||||
func (n *Notification) save(pushUpdate bool) *Notification {
|
func (n *Notification) save(pushUpdate bool) *Notification {
|
||||||
|
var id string
|
||||||
|
|
||||||
|
// Delete notification after processing deletion.
|
||||||
|
defer func() {
|
||||||
|
// Lock and save to notification storage.
|
||||||
|
notsLock.Lock()
|
||||||
|
defer notsLock.Unlock()
|
||||||
|
nots[id] = n
|
||||||
|
}()
|
||||||
|
|
||||||
|
// We do not access EventData here, so it is enough to just lock the
|
||||||
|
// notification itself.
|
||||||
|
n.lock.Lock()
|
||||||
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
|
// Save ID for deletion
|
||||||
|
id = n.EventID
|
||||||
|
|
||||||
|
// Generate random GUID if not set.
|
||||||
if n.GUID == "" {
|
if n.GUID == "" {
|
||||||
n.GUID = utils.RandomUUID(n.EventID).String()
|
n.GUID = utils.RandomUUID(n.EventID).String()
|
||||||
}
|
}
|
||||||
|
|
||||||
// make ack notification if there are no defined actions
|
// Make ack notification if there are no defined actions.
|
||||||
if len(n.AvailableActions) == 0 {
|
if len(n.AvailableActions) == 0 {
|
||||||
n.AvailableActions = []*Action{
|
n.AvailableActions = []*Action{
|
||||||
{
|
{
|
||||||
|
@ -168,12 +189,6 @@ func (n *Notification) save(pushUpdate bool) *Notification {
|
||||||
Text: "OK",
|
Text: "OK",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
n.actionFunction = noOpAction
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure we always have a reasonable expiration set.
|
|
||||||
if n.Expires == 0 {
|
|
||||||
n.Expires = time.Now().Add(72 * time.Hour).Unix()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure we always have a notification state assigned.
|
// Make sure we always have a notification state assigned.
|
||||||
|
@ -182,17 +197,14 @@ func (n *Notification) save(pushUpdate bool) *Notification {
|
||||||
}
|
}
|
||||||
|
|
||||||
// check key
|
// check key
|
||||||
if n.DatabaseKey() == "" {
|
if !n.KeyIsSet() {
|
||||||
n.SetKey(fmt.Sprintf("notifications:all/%s", n.EventID))
|
n.SetKey(fmt.Sprintf("notifications:all/%s", n.EventID))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update meta data.
|
||||||
n.UpdateMeta()
|
n.UpdateMeta()
|
||||||
|
|
||||||
// store the notification inside or map
|
// Push update via the database system if needed.
|
||||||
notsLock.Lock()
|
|
||||||
nots[n.EventID] = n
|
|
||||||
notsLock.Unlock()
|
|
||||||
|
|
||||||
if pushUpdate {
|
if pushUpdate {
|
||||||
log.Tracef("notifications: pushing update for %s to subscribers", n.Key())
|
log.Tracef("notifications: pushing update for %s to subscribers", n.Key())
|
||||||
dbController.PushUpdate(n)
|
dbController.PushUpdate(n)
|
||||||
|
@ -203,7 +215,7 @@ func (n *Notification) save(pushUpdate bool) *Notification {
|
||||||
|
|
||||||
// SetActionFunction sets a trigger function to be executed when the user reacted on the notification.
|
// SetActionFunction sets a trigger function to be executed when the user reacted on the notification.
|
||||||
// The provided function will be started as its own goroutine and will have to lock everything it accesses, even the provided notification.
|
// The provided function will be started as its own goroutine and will have to lock everything it accesses, even the provided notification.
|
||||||
func (n *Notification) SetActionFunction(fn func(*Notification)) *Notification {
|
func (n *Notification) SetActionFunction(fn NotificationActionFn) *Notification {
|
||||||
n.lock.Lock()
|
n.lock.Lock()
|
||||||
defer n.lock.Unlock()
|
defer n.lock.Unlock()
|
||||||
n.actionFunction = fn
|
n.actionFunction = fn
|
||||||
|
@ -224,43 +236,72 @@ func (n *Notification) Response() <-chan string {
|
||||||
|
|
||||||
// Update updates/resends a notification if it was not already responded to.
|
// Update updates/resends a notification if it was not already responded to.
|
||||||
func (n *Notification) Update(expires int64) {
|
func (n *Notification) Update(expires int64) {
|
||||||
|
// Save when we're finished, if needed.
|
||||||
|
save := false
|
||||||
|
defer func() {
|
||||||
|
if save {
|
||||||
|
n.save(true)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
n.lock.Lock()
|
n.lock.Lock()
|
||||||
defer n.lock.Unlock()
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
if n.State == Active {
|
// Don't update if notification isn't active.
|
||||||
n.Expires = expires
|
if n.State != Active {
|
||||||
n.save(true)
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Don't update too quickly.
|
||||||
|
if n.Meta().Modified > time.Now().Add(-10*time.Second).Unix() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update expiry and save.
|
||||||
|
n.Expires = expires
|
||||||
|
save = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete (prematurely) cancels and deletes a notification.
|
// Delete (prematurely) cancels and deletes a notification.
|
||||||
func (n *Notification) Delete() error {
|
func (n *Notification) Delete() error {
|
||||||
notsLock.Lock()
|
n.delete(true)
|
||||||
defer notsLock.Unlock()
|
return nil
|
||||||
n.Lock()
|
|
||||||
defer n.Unlock()
|
|
||||||
|
|
||||||
return n.delete(true)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Notification) delete(pushUpdate bool) error {
|
// delete deletes the notification from the internal storage. It locks the
|
||||||
// mark as deleted
|
// notification, so it must not be locked when delete is called.
|
||||||
|
func (n *Notification) delete(pushUpdate bool) {
|
||||||
|
var id string
|
||||||
|
|
||||||
|
// Delete notification after processing deletion.
|
||||||
|
defer func() {
|
||||||
|
// Lock and delete from notification storage.
|
||||||
|
notsLock.Lock()
|
||||||
|
defer notsLock.Unlock()
|
||||||
|
delete(nots, id)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// We do not access EventData here, so it is enough to just lock the
|
||||||
|
// notification itself.
|
||||||
|
n.lock.Lock()
|
||||||
|
defer n.lock.Unlock()
|
||||||
|
|
||||||
|
// Save ID for deletion
|
||||||
|
id = n.EventID
|
||||||
|
|
||||||
|
// Mark notification as deleted.
|
||||||
n.Meta().Delete()
|
n.Meta().Delete()
|
||||||
|
|
||||||
// delete from internal storage
|
// Close expiry channel if available.
|
||||||
delete(nots, n.EventID)
|
|
||||||
|
|
||||||
// close expired
|
|
||||||
if n.expiredTrigger != nil {
|
if n.expiredTrigger != nil {
|
||||||
close(n.expiredTrigger)
|
close(n.expiredTrigger)
|
||||||
n.expiredTrigger = nil
|
n.expiredTrigger = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Push update via the database system if needed.
|
||||||
if pushUpdate {
|
if pushUpdate {
|
||||||
dbController.PushUpdate(n)
|
dbController.PushUpdate(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Expired notifies the caller when the notification has expired.
|
// Expired notifies the caller when the notification has expired.
|
||||||
|
@ -286,7 +327,9 @@ func (n *Notification) selectAndExecuteAction(id string) {
|
||||||
|
|
||||||
executed := false
|
executed := false
|
||||||
if n.actionFunction != nil {
|
if n.actionFunction != nil {
|
||||||
go n.actionFunction(n)
|
module.StartWorker("notification action execution", func(ctx context.Context) error {
|
||||||
|
return n.actionFunction(ctx, n)
|
||||||
|
})
|
||||||
executed = true
|
executed = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -336,14 +379,3 @@ func (n *Notification) Unlock() {
|
||||||
locker.Unlock()
|
locker.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func duplicateActions(original []*Action) (duplicate []*Action) {
|
|
||||||
duplicate = make([]*Action, len(original))
|
|
||||||
for _, action := range original {
|
|
||||||
duplicate = append(duplicate, &Action{
|
|
||||||
ID: action.ID,
|
|
||||||
Text: action.Text,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
|
@ -9,12 +9,10 @@ var (
|
||||||
// DefaultRegistry is the default registry
|
// DefaultRegistry is the default registry
|
||||||
// that is used by the module-level API.
|
// that is used by the module-level API.
|
||||||
DefaultRegistry = NewRegistry()
|
DefaultRegistry = NewRegistry()
|
||||||
|
|
||||||
module *modules.Module
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
module = modules.Register("runtime", nil, startModule, nil, "database")
|
modules.Register("runtime", nil, startModule, nil, "database")
|
||||||
}
|
}
|
||||||
|
|
||||||
func startModule() error {
|
func startModule() error {
|
||||||
|
|
|
@ -36,7 +36,7 @@ var (
|
||||||
// package but may consider creating a dedicated
|
// package but may consider creating a dedicated
|
||||||
// runtime registry on their own. Registry uses
|
// runtime registry on their own. Registry uses
|
||||||
// a radix tree for value providers and their
|
// a radix tree for value providers and their
|
||||||
// choosen database key/prefix.
|
// chosen database key/prefix.
|
||||||
type Registry struct {
|
type Registry struct {
|
||||||
l sync.RWMutex
|
l sync.RWMutex
|
||||||
providers *radix.Tree
|
providers *radix.Tree
|
||||||
|
|
|
@ -43,7 +43,7 @@ func getTestRegistry(t *testing.T) *Registry {
|
||||||
r := NewRegistry()
|
r := NewRegistry()
|
||||||
|
|
||||||
providers := []testProvider{
|
providers := []testProvider{
|
||||||
testProvider{
|
{
|
||||||
k: "p1/",
|
k: "p1/",
|
||||||
r: []record.Record{
|
r: []record.Record{
|
||||||
makeTestRecord("p1/f1/v1", "p1.1"),
|
makeTestRecord("p1/f1/v1", "p1.1"),
|
||||||
|
@ -51,7 +51,7 @@ func getTestRegistry(t *testing.T) *Registry {
|
||||||
makeTestRecord("p1/v3", "p1.3"),
|
makeTestRecord("p1/v3", "p1.3"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
testProvider{
|
{
|
||||||
k: "p2/f1",
|
k: "p2/f1",
|
||||||
r: []record.Record{
|
r: []record.Record{
|
||||||
makeTestRecord("p2/f1/v1", "p2.1"),
|
makeTestRecord("p2/f1/v1", "p2.1"),
|
||||||
|
@ -104,7 +104,7 @@ func TestRegistryQuery(t *testing.T) {
|
||||||
iter, err := reg.Query(q, true, true)
|
iter, err := reg.Query(q, true, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotNil(t, iter)
|
require.NotNil(t, iter)
|
||||||
var records []record.Record
|
var records []record.Record //nolint:prealloc
|
||||||
for r := range iter.Next {
|
for r := range iter.Next {
|
||||||
records = append(records, r)
|
records = append(records, r)
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,7 @@ func init() {
|
||||||
module,
|
module,
|
||||||
"config:template", // key space for configuration options registered
|
"config:template", // key space for configuration options registered
|
||||||
&config.Option{
|
&config.Option{
|
||||||
Name: "Enable Template Subsystem",
|
Name: "Template Subsystem",
|
||||||
Key: "config:subsystems/template",
|
Key: "config:subsystems/template",
|
||||||
Description: "This option enables the Template Subsystem [TEMPLATE]",
|
Description: "This option enables the Template Subsystem [TEMPLATE]",
|
||||||
OptType: config.OptTypeBool,
|
OptType: config.OptTypeBool,
|
||||||
|
|
Loading…
Add table
Reference in a new issue