mirror of
https://github.com/safing/portbase
synced 2025-04-17 07:59:09 +00:00
Improve events and expose them via the runtime database
This commit is contained in:
parent
ca19f4a44b
commit
f18c0c5564
4 changed files with 139 additions and 8 deletions
|
@ -32,7 +32,7 @@ func SetDataRoot(root *utils.DirStructure) {
|
|||
|
||||
func init() {
|
||||
module = modules.Register("config", prep, start, nil, "database")
|
||||
module.RegisterEvent(configChangeEvent)
|
||||
module.RegisterEvent(configChangeEvent, true)
|
||||
|
||||
flag.BoolVar(&exportConfig, "export-config-options", false, "export configuration registry and exit")
|
||||
}
|
||||
|
|
|
@ -6,8 +6,18 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/safing/portbase/log"
|
||||
"github.com/tevino/abool"
|
||||
)
|
||||
|
||||
type eventHooks struct {
|
||||
// hooks holds all registed hooks for the event.
|
||||
hooks []*eventHook
|
||||
|
||||
// internal signifies that the event and it's data may not be exposed and may
|
||||
// only be propagated internally.
|
||||
internal bool
|
||||
}
|
||||
|
||||
type eventHookFn func(context.Context, interface{}) error
|
||||
|
||||
type eventHook struct {
|
||||
|
@ -27,17 +37,26 @@ func (m *Module) processEventTrigger(event string, data interface{}) {
|
|||
m.eventHooksLock.RLock()
|
||||
defer m.eventHooksLock.RUnlock()
|
||||
|
||||
hooks, ok := m.eventHooks[event]
|
||||
eventHooks, ok := m.eventHooks[event]
|
||||
if !ok {
|
||||
log.Warningf(`%s: tried to trigger non-existent event "%s"`, m.Name, event)
|
||||
return
|
||||
}
|
||||
|
||||
for _, hook := range hooks {
|
||||
for _, hook := range eventHooks.hooks {
|
||||
if hook.hookingModule.OnlineSoon() {
|
||||
go m.runEventHook(hook, event, data)
|
||||
}
|
||||
}
|
||||
|
||||
// Call subscription function, if set.
|
||||
if eventSubscriptionFuncReady.IsSet() {
|
||||
m.StartWorker("event subscription", func(context.Context) error {
|
||||
// Only use data in worker that won't change anymore.
|
||||
eventSubscriptionFunc(m.Name, event, eventHooks.internal, data)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// InjectEvent triggers an event from a foreign module and executes all hook functions registered to that event.
|
||||
|
@ -63,12 +82,21 @@ func (m *Module) InjectEvent(sourceEventName, targetModuleName, targetEventName
|
|||
return fmt.Errorf(`module "%s" has no event named "%s"`, targetModuleName, targetEventName)
|
||||
}
|
||||
|
||||
for _, hook := range targetHooks {
|
||||
for _, hook := range targetHooks.hooks {
|
||||
if hook.hookingModule.OnlineSoon() {
|
||||
go m.runEventHook(hook, sourceEventName, data)
|
||||
}
|
||||
}
|
||||
|
||||
// Call subscription function, if set.
|
||||
if eventSubscriptionFuncReady.IsSet() {
|
||||
m.StartWorker("event subscription", func(context.Context) error {
|
||||
// Only use data in worker that won't change anymore.
|
||||
eventSubscriptionFunc(targetModule.Name, targetEventName, targetHooks.internal, data)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -111,13 +139,20 @@ func (m *Module) runEventHook(hook *eventHook, event string, data interface{}) {
|
|||
}
|
||||
|
||||
// RegisterEvent registers a new event to allow for registering hooks.
|
||||
func (m *Module) RegisterEvent(event string) {
|
||||
// The expose argument controls whether these events and the attached data may
|
||||
// be received by external components via APIs. If not exposed, the database
|
||||
// record that carries the event and it's data will be marked as secret and as
|
||||
// a crown jewel. Enforcement is left to the database layer.
|
||||
func (m *Module) RegisterEvent(event string, expose bool) {
|
||||
m.eventHooksLock.Lock()
|
||||
defer m.eventHooksLock.Unlock()
|
||||
|
||||
_, ok := m.eventHooks[event]
|
||||
if !ok {
|
||||
m.eventHooks[event] = make([]*eventHook, 0, 1)
|
||||
m.eventHooks[event] = &eventHooks{
|
||||
hooks: make([]*eventHook, 0, 1),
|
||||
internal: !expose,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -138,16 +173,34 @@ func (m *Module) RegisterEventHook(module string, event string, description stri
|
|||
// get target event
|
||||
eventModule.eventHooksLock.Lock()
|
||||
defer eventModule.eventHooksLock.Unlock()
|
||||
hooks, ok := eventModule.eventHooks[event]
|
||||
eventHooks, ok := eventModule.eventHooks[event]
|
||||
if !ok {
|
||||
return fmt.Errorf(`event "%s/%s" does not exist`, eventModule.Name, event)
|
||||
}
|
||||
|
||||
// add hook
|
||||
eventModule.eventHooks[event] = append(hooks, &eventHook{
|
||||
eventHooks.hooks = append(eventHooks.hooks, &eventHook{
|
||||
description: description,
|
||||
hookingModule: m,
|
||||
hookFn: fn,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// Subscribe to all events
|
||||
|
||||
var (
|
||||
eventSubscriptionFunc func(moduleName, eventName string, internal bool, data interface{})
|
||||
eventSubscriptionFuncEnabled = abool.NewBool(false)
|
||||
eventSubscriptionFuncReady = abool.NewBool(false)
|
||||
)
|
||||
|
||||
// SetEventSubscriptionFunc
|
||||
func SetEventSubscriptionFunc(fn func(moduleName, eventName string, internal bool, data interface{})) bool {
|
||||
if eventSubscriptionFuncEnabled.SetToIf(false, true) {
|
||||
eventSubscriptionFunc = fn
|
||||
eventSubscriptionFuncReady.Set()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package runtime
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/safing/portbase/database"
|
||||
"github.com/safing/portbase/modules"
|
||||
)
|
||||
|
@ -30,6 +32,10 @@ func startModule() error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := startModulesIntegration(); err != nil {
|
||||
return fmt.Errorf("failed to start modules integration: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
72
runtime/modules_integration.go
Normal file
72
runtime/modules_integration.go
Normal file
|
@ -0,0 +1,72 @@
|
|||
package runtime
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/safing/portbase/database"
|
||||
"github.com/safing/portbase/database/record"
|
||||
"github.com/safing/portbase/log"
|
||||
"github.com/safing/portbase/modules"
|
||||
)
|
||||
|
||||
var (
|
||||
modulesIntegrationUpdatePusher func(...record.Record)
|
||||
)
|
||||
|
||||
func startModulesIntegration() (err error) {
|
||||
modulesIntegrationUpdatePusher, err = Register("modules/", &ModulesIntegration{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !modules.SetEventSubscriptionFunc(pushModuleEvent) {
|
||||
log.Warningf("runtime: failed to register the modules event subscription function")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type ModulesIntegration struct{}
|
||||
|
||||
// Set is called when the value is set from outside.
|
||||
// If the runtime value is considered read-only ErrReadOnly
|
||||
// should be returned. It is guaranteed that the key of
|
||||
// the record passed to Set is prefixed with the key used
|
||||
// to register the value provider.
|
||||
func (mi *ModulesIntegration) Set(record.Record) (record.Record, error) {
|
||||
return nil, ErrReadOnly
|
||||
}
|
||||
|
||||
// Get should return one or more records that match keyOrPrefix.
|
||||
// keyOrPrefix is guaranteed to be at least the prefix used to
|
||||
// register the ValueProvider.
|
||||
func (mi *ModulesIntegration) Get(keyOrPrefix string) ([]record.Record, error) {
|
||||
return nil, database.ErrNotFound
|
||||
}
|
||||
|
||||
type eventData struct { //nolint:unused // This is a cascading false positive.
|
||||
record.Base
|
||||
sync.Mutex
|
||||
Data interface{}
|
||||
}
|
||||
|
||||
func pushModuleEvent(moduleName, eventName string, internal bool, data interface{}) { //nolint:unused // This is a false positive, the function is provided to modules.SetEventSubscriptionFunc().
|
||||
// Create event record and set key.
|
||||
eventRecord := &eventData{
|
||||
Data: data,
|
||||
}
|
||||
eventRecord.SetKey(fmt.Sprintf(
|
||||
"runtime:modules/%s/event/%s",
|
||||
moduleName,
|
||||
eventName,
|
||||
))
|
||||
eventRecord.UpdateMeta()
|
||||
if internal {
|
||||
eventRecord.Meta().MakeSecret()
|
||||
eventRecord.Meta().MakeCrownJewel()
|
||||
}
|
||||
|
||||
// Push event to database subscriptions.
|
||||
modulesIntegrationUpdatePusher(eventRecord)
|
||||
}
|
Loading…
Add table
Reference in a new issue