Unclutter Module struct and refactor events

This commit is contained in:
Patrick Pacher 2020-09-21 21:24:10 +02:00
parent 19f75bb6ca
commit 5a2554ed57
No known key found for this signature in database
GPG key ID: E8CD2DA160925A6D
10 changed files with 260 additions and 194 deletions

View file

@ -4,43 +4,80 @@ import (
"context"
"errors"
"fmt"
"sync"
"github.com/safing/portbase/log"
)
type eventHookFn func(context.Context, interface{}) error
type (
// EventObserverFunc can be registered for one or more event types
// and will be called with the event payload.
// Any error returned from the observer function will be logged.
EventObserverFunc func(context.Context, interface{}) error
type eventHook struct {
description string
hookingModule *Module
hookFn eventHookFn
}
// TriggerEvent executes all hook functions registered to the specified event.
func (m *Module) TriggerEvent(event string, data interface{}) {
if m.OnlineSoon() {
go m.processEventTrigger(event, data)
}
}
func (m *Module) processEventTrigger(event string, data interface{}) {
m.eventHooksLock.RLock()
defer m.eventHooksLock.RUnlock()
hooks, ok := m.eventHooks[event]
if !ok {
log.Warningf(`%s: tried to trigger non-existent event "%s"`, m.Name, event)
return
// eventHooks keeps track of registered event subscriptions.
eventHooks struct {
sync.RWMutex
subscriptions map[string][]*subscription
}
for _, hook := range hooks {
if hook.hookingModule.OnlineSoon() {
go m.runEventHook(hook, event, data)
// subscription defines the subscription to an observable.
// Any time the observable emits an event the subscriptions
// callback is called.
subscription struct {
// description is a human readable description of the
// subscription purpose. This is mainly used for logging
// purposes.
description string
// subscriber is a reference to the module that placed
// the subscription.
subscriber *Module
// target holds a reference to the module that is
// observed by this subscription
target *Module
// callback is the function to execute when the observed
// event occurs.
callback EventObserverFunc
}
)
// RegisterEvent registers a new event to allow for registering hooks.
func (m *Module) RegisterEvent(event string) {
m.events.defineEvent(event)
}
// RegisterEventHook registers a hook function with (another) modules'
// event. Whenever a hook is triggered and the receiving module has not
// yet fully started, hook execution will be delayed until the modules
// completed starting.
func (m *Module) RegisterEventHook(module, event, description string, fn EventObserverFunc) error {
targetModule := m
if module != m.Name {
var ok bool
// TODO(ppacher): accessing modules[module] here without any
// kind of protection seems wrong.... Check with
// @dhaavi.
targetModule, ok = modules[module]
if !ok {
return fmt.Errorf(`module "%s" does not exist`, module)
}
}
return targetModule.events.addSubscription(targetModule, m, event, description, fn)
}
// InjectEvent triggers an event from a foreign module and executes all hook functions registered to that event.
// TriggerEvent executes all hook functions registered to the
// specified event.
func (m *Module) TriggerEvent(event string, data interface{}) {
if m.OnlineSoon() {
go m.processEventTrigger(event, event, data)
}
}
// InjectEvent triggers an event from a foreign module and executes
// all hook functions registered to that event.
// Note that sourceEventName is only used for logging purposes while
// targetModuleName and targetEventName must actually exist.
func (m *Module) InjectEvent(sourceEventName, targetModuleName, targetEventName string, data interface{}) error {
if !m.OnlineSoon() {
return errors.New("module not yet started")
@ -55,99 +92,105 @@ func (m *Module) InjectEvent(sourceEventName, targetModuleName, targetEventName
return fmt.Errorf(`module "%s" does not exist`, targetModuleName)
}
targetModule.eventHooksLock.RLock()
defer targetModule.eventHooksLock.RUnlock()
targetHooks, ok := targetModule.eventHooks[targetEventName]
if !ok {
return fmt.Errorf(`module "%s" has no event named "%s"`, targetModuleName, targetEventName)
}
for _, hook := range targetHooks {
if hook.hookingModule.OnlineSoon() {
go m.runEventHook(hook, sourceEventName, data)
}
}
targetModule.processEventTrigger(targetEventName, sourceEventName, data)
return nil
}
func (m *Module) runEventHook(hook *eventHook, event string, data interface{}) {
// check if source module is ready for handling
if m.Status() != StatusOnline {
// target module has not yet fully started, wait until start is complete
select {
case <-m.StartCompleted():
// continue with hook execution
case <-hook.hookingModule.Stopping():
return
case <-m.Stopping():
return
}
func (m *Module) processEventTrigger(eventID, eventName string, data interface{}) {
m.events.RLock()
defer m.events.RUnlock()
hooks, ok := m.events.subscriptions[eventID]
if !ok {
log.Warningf(`%s: tried to trigger non-existent event "%s"`, m.Name, eventID)
return
}
// check if destionation module is ready for handling
if hook.hookingModule.Status() != StatusOnline {
// target module has not yet fully started, wait until start is complete
select {
case <-hook.hookingModule.StartCompleted():
// continue with hook execution
case <-hook.hookingModule.Stopping():
return
case <-m.Stopping():
return
for _, hook := range hooks {
if hook.subscriber.OnlineSoon() {
go hook.runEventHook(eventName, data)
}
}
}
err := hook.hookingModule.RunWorker(
fmt.Sprintf("event hook %s/%s -> %s/%s", m.Name, event, hook.hookingModule.Name, hook.description),
func (hook *subscription) Name(event string) string {
return fmt.Sprintf("event hook %s/%s -> %s/%s", hook.target.Name, event, hook.subscriber.Name, hook.description)
}
func waitForModule(ctx context.Context, m *Module) bool {
select {
case <-ctx.Done():
return false
case <-m.StartCompleted():
return true
}
}
func (hook *subscription) runEventHook(event string, data interface{}) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-hook.subscriber.Stopping():
cancel()
case <-hook.target.Stopping():
cancel()
case <-ctx.Done():
}
}()
// wait for both modules to become online (or shutdown)
if !waitForModule(ctx, hook.target) || !waitForModule(ctx, hook.subscriber) {
return
}
err := hook.subscriber.RunWorker(
hook.Name(event),
func(ctx context.Context) error {
return hook.hookFn(ctx, data)
return hook.callback(ctx, data)
},
)
if err != nil {
log.Warningf("%s: failed to execute event hook %s/%s -> %s/%s: %s", hook.hookingModule.Name, m.Name, event, hook.hookingModule.Name, hook.description, err)
log.Warningf("%s: failed to execute %s: %s", hook.target.Name, hook.Name(event), err)
}
}
// RegisterEvent registers a new event to allow for registering hooks.
func (m *Module) RegisterEvent(event string) {
m.eventHooksLock.Lock()
defer m.eventHooksLock.Unlock()
func (hooks *eventHooks) addSubscription(target, subscriber *Module, event, descr string, fn EventObserverFunc) error {
hooks.Lock()
defer hooks.Unlock()
_, ok := m.eventHooks[event]
if !ok {
m.eventHooks[event] = make([]*eventHook, 0, 1)
}
}
// RegisterEventHook registers a hook function with (another) modules' event. Whenever a hook is triggered and the receiving module has not yet fully started, hook execution will be delayed until the modules completed starting.
func (m *Module) RegisterEventHook(module string, event string, description string, fn func(context.Context, interface{}) error) error {
// get target module
var eventModule *Module
if module == m.Name {
eventModule = m
} else {
var ok bool
eventModule, ok = modules[module]
if !ok {
return fmt.Errorf(`module "%s" does not exist`, module)
}
if hooks.subscriptions == nil {
return fmt.Errorf("unknown event %q", event)
}
// get target event
eventModule.eventHooksLock.Lock()
defer eventModule.eventHooksLock.Unlock()
hooks, ok := eventModule.eventHooks[event]
if !ok {
return fmt.Errorf(`event "%s/%s" does not exist`, eventModule.Name, event)
if _, ok := hooks.subscriptions[event]; !ok {
return fmt.Errorf("unknown event %q", event)
}
// add hook
eventModule.eventHooks[event] = append(hooks, &eventHook{
description: description,
hookingModule: m,
hookFn: fn,
})
hooks.subscriptions[event] = append(
hooks.subscriptions[event],
&subscription{
description: descr,
subscriber: subscriber,
target: target,
callback: fn,
},
)
return nil
}
func (hooks *eventHooks) defineEvent(event string) {
hooks.Lock()
defer hooks.Unlock()
if hooks.subscriptions == nil {
hooks.subscriptions = make(map[string][]*subscription)
}
if _, ok := hooks.subscriptions[event]; !ok {
hooks.subscriptions[event] = make([]*subscription, 0, 1)
}
}

View file

@ -36,7 +36,7 @@ func parseFlags() error {
func printGraph() {
// mark roots
for _, module := range modules {
if len(module.depReverse) == 0 {
if len(module.dependencies.reverse) == 0 {
// is root, dont print deps in dep tree
module.stopFlag.Set()
}

View file

@ -104,7 +104,7 @@ func buildEnabledTree() {
}
func (m *Module) markDependencies() {
for _, dep := range m.depModules {
for _, dep := range m.dependencies.modules {
if dep.enabledAsDependency.SetToIf(false, true) {
dep.markDependencies()
}

View file

@ -129,7 +129,7 @@ func (m *Module) RunLowPriorityMicroTask(name *string, fn func(context.Context)
func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err error) {
// start for module
// hint: only microTasks global var is important for scheduling, others can be set here
atomic.AddInt32(m.microTaskCnt, 1)
atomic.AddInt32(m.stats.microTaskCnt, 1)
m.waitGroup.Add(1)
// set up recovery
@ -144,7 +144,7 @@ func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err
}
// finish for module
atomic.AddInt32(m.microTaskCnt, -1)
atomic.AddInt32(m.stats.microTaskCnt, -1)
m.waitGroup.Done()
// finish and possibly trigger next task

View file

@ -21,55 +21,78 @@ var (
moduleStartTimeout = 2 * time.Minute
moduleStopTimeout = 1 * time.Minute
)
// ErrCleanExit is returned by Start() when the program is interrupted before starting. This can happen for example, when using the "--help" flag.
var (
// ErrCleanExit is returned by Start() when the program is
// interrupted before starting. This can happen for example,
// when using the "--help" flag.
ErrCleanExit = errors.New("clean exit requested")
)
// Module represents a module.
type Module struct { //nolint:maligned // not worth the effort
sync.RWMutex
type (
// LiveCycleFunc is called at specific times during the live-cycle
// of a module.
LiveCycleFunc func() error
Name string
liveCycleHooks struct {
prepFunc LiveCycleFunc
startFunc LiveCycleFunc
stopFunc LiveCycleFunc
}
// status mgmt
enabled *abool.AtomicBool
enabledAsDependency *abool.AtomicBool
status uint8
depInfo struct {
names []string
modules []*Module
reverse []*Module
}
// failure status
failureStatus uint8
failureID string
failureMsg string
failureInfo struct {
failureStatus uint8
failureID string
failureMsg string
}
// lifecycle callback functions
prepFn func() error
startFn func() error
stopFn func() error
taskStats struct {
workerCnt *int32
taskCnt *int32
microTaskCnt *int32
}
// lifecycle mgmt
// start
startComplete chan struct{}
// stop
Ctx context.Context
cancelCtx func()
stopFlag *abool.AtomicBool
// Module represents a module.
Module struct { //nolint:maligned // not worth the effort
sync.RWMutex
// workers/tasks
workerCnt *int32
taskCnt *int32
microTaskCnt *int32
waitGroup sync.WaitGroup
// Name is the name of the module and is meant to be
// used for presentation purposes (i.e. user interface).
// Module names must be unique otherwise bad things will
// happen!
Name string
// events
eventHooks map[string][]*eventHook
eventHooksLock sync.RWMutex
// status mgmt
enabled *abool.AtomicBool
enabledAsDependency *abool.AtomicBool
status uint8
// dependency mgmt
depNames []string
depModules []*Module
depReverse []*Module
}
failureInfo
hooks liveCycleHooks
// lifecycle mgmt
// start
startComplete chan struct{}
// stop
Ctx context.Context
cancelCtx func()
stopFlag *abool.AtomicBool
// workers/tasks
waitGroup sync.WaitGroup
stats taskStats
events eventHooks
dependencies depInfo
}
)
// StartCompleted returns a channel read that triggers when the module has finished starting.
func (m *Module) StartCompleted() <-chan struct{} {
@ -94,7 +117,7 @@ func (m *Module) IsStopping() bool {
func (m *Module) Dependencies() []*Module {
m.RLock()
defer m.RUnlock()
return m.depModules
return m.dependencies.modules
}
func (m *Module) prep(reports chan *report) {
@ -116,12 +139,12 @@ func (m *Module) prep(reports chan *report) {
// run prep function
go func() {
var err error
if m.prepFn != nil {
if m.hooks.prepFunc != nil {
// execute function
err = m.runCtrlFnWithTimeout(
"prep module",
moduleStartTimeout,
m.prepFn,
m.hooks.prepFunc,
)
}
// set status
@ -172,12 +195,12 @@ func (m *Module) start(reports chan *report) {
// run start function
go func() {
var err error
if m.startFn != nil {
if m.hooks.startFunc != nil {
// execute function
err = m.runCtrlFnWithTimeout(
"start module",
moduleStartTimeout,
m.startFn,
m.hooks.startFunc,
)
}
// set status
@ -232,10 +255,10 @@ func (m *Module) stopAllTasks(reports chan *report) {
// start shutdown function
stopFnFinished := abool.NewBool(false)
var stopFnError error
if m.stopFn != nil {
if m.hooks.stopFunc != nil {
m.waitGroup.Add(1)
go func() {
stopFnError = m.runCtrlFn("stop module", m.stopFn)
stopFnError = m.runCtrlFn("stop module", m.hooks.stopFunc)
stopFnFinished.Set()
m.waitGroup.Done()
}()
@ -256,9 +279,9 @@ func (m *Module) stopAllTasks(reports chan *report) {
"%s: timed out while waiting for stopfn/workers/tasks to finish: stopFn=%v workers=%d tasks=%d microtasks=%d, continuing shutdown...",
m.Name,
stopFnFinished.IsSet(),
atomic.LoadInt32(m.workerCnt),
atomic.LoadInt32(m.taskCnt),
atomic.LoadInt32(m.microTaskCnt),
atomic.LoadInt32(m.stats.workerCnt),
atomic.LoadInt32(m.stats.taskCnt),
atomic.LoadInt32(m.stats.microTaskCnt),
)
}
@ -307,27 +330,29 @@ func Register(name string, prep, start, stop func() error, dependencies ...strin
func initNewModule(name string, prep, start, stop func() error, dependencies ...string) *Module {
ctx, cancelCtx := context.WithCancel(context.Background())
var workerCnt int32
var taskCnt int32
var microTaskCnt int32
newModule := &Module{
Name: name,
enabled: abool.NewBool(false),
enabledAsDependency: abool.NewBool(false),
prepFn: prep,
startFn: start,
stopFn: stop,
startComplete: make(chan struct{}),
Ctx: ctx,
cancelCtx: cancelCtx,
stopFlag: abool.NewBool(false),
workerCnt: &workerCnt,
taskCnt: &taskCnt,
microTaskCnt: &microTaskCnt,
waitGroup: sync.WaitGroup{},
eventHooks: make(map[string][]*eventHook),
depNames: dependencies,
hooks: liveCycleHooks{
prepFunc: prep,
startFunc: start,
stopFunc: stop,
},
startComplete: make(chan struct{}),
Ctx: ctx,
cancelCtx: cancelCtx,
stopFlag: abool.NewBool(false),
stats: taskStats{
workerCnt: new(int32),
taskCnt: new(int32),
microTaskCnt: new(int32),
},
waitGroup: sync.WaitGroup{},
dependencies: depInfo{
names: dependencies,
},
}
return newModule
@ -335,7 +360,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
func initDependencies() error {
for _, m := range modules {
for _, depName := range m.depNames {
for _, depName := range m.dependencies.names {
// get dependency
depModule, ok := modules[depName]
@ -344,9 +369,8 @@ func initDependencies() error {
}
// link together
m.depModules = append(m.depModules, depModule)
depModule.depReverse = append(depModule.depReverse, m)
m.dependencies.modules = append(m.dependencies.modules, depModule)
depModule.dependencies.reverse = append(depModule.dependencies.reverse, m)
}
}

View file

@ -113,7 +113,7 @@ func (m *Module) readyToPrep() uint8 {
return statusNothingToDo
}
for _, dep := range m.depModules {
for _, dep := range m.dependencies.modules {
if dep.Status() < StatusOffline {
return statusWaiting
}
@ -137,7 +137,7 @@ func (m *Module) readyToStart() uint8 {
}
// check if all dependencies are ready
for _, dep := range m.depModules {
for _, dep := range m.dependencies.modules {
if dep.Status() < StatusOnline {
return statusWaiting
}
@ -160,7 +160,7 @@ func (m *Module) readyToStop() uint8 {
return statusNothingToDo
}
for _, revDep := range m.depReverse {
for _, revDep := range m.dependencies.reverse {
// not ready if a reverse dependency was started, but not yet stopped
if revDep.Status() > StatusOffline {
return statusWaiting

View file

@ -274,11 +274,9 @@ func (t *Task) runWithLocking() {
}
// check if module was stopped
select {
case <-t.ctx.Done():
if t.ctx.Err() != nil {
t.lock.Unlock()
return
default:
}
// enter executing state
@ -291,20 +289,16 @@ func (t *Task) runWithLocking() {
case <-time.After(maxTimeslotWait):
}
// wait for module start
if !t.module.Online() {
if t.module.OnlineSoon() {
// wait
<-t.module.StartCompleted()
} else {
// abort, module will not come online
t.lock.Lock()
t.executing = false
t.lock.Unlock()
return
}
if !t.module.OnlineSoon() {
// abort, module will not come online
t.lock.Lock()
t.executing = false
t.lock.Unlock()
return
}
<-t.module.StartCompleted()
// add to queue workgroup
queueWg.Add(1)
@ -322,7 +316,7 @@ func (t *Task) runWithLocking() {
func (t *Task) executeWithLocking() {
// start for module
// hint: only queueWg global var is important for scheduling, others can be set here
atomic.AddInt32(t.module.taskCnt, 1)
atomic.AddInt32(t.module.stats.taskCnt, 1)
t.module.waitGroup.Add(1)
defer func() {
@ -335,7 +329,7 @@ func (t *Task) executeWithLocking() {
}
// finish for module
atomic.AddInt32(t.module.taskCnt, -1)
atomic.AddInt32(t.module.stats.taskCnt, -1)
t.module.waitGroup.Done()
t.lock.Lock()

View file

@ -40,6 +40,7 @@ var qtModule *Module
func init() {
qtModule = initNewModule("task test module", nil, nil, nil)
qtModule.status = StatusOnline
close(qtModule.startComplete)
}
// functions

View file

@ -38,10 +38,10 @@ func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
return errNoModule
}
atomic.AddInt32(m.workerCnt, 1)
atomic.AddInt32(m.stats.workerCnt, 1)
m.waitGroup.Add(1)
defer func() {
atomic.AddInt32(m.workerCnt, -1)
atomic.AddInt32(m.stats.workerCnt, -1)
m.waitGroup.Done()
}()
@ -59,10 +59,10 @@ func (m *Module) StartServiceWorker(name string, backoffDuration time.Duration,
}
func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) {
atomic.AddInt32(m.workerCnt, 1)
atomic.AddInt32(m.stats.workerCnt, 1)
m.waitGroup.Add(1)
defer func() {
atomic.AddInt32(m.workerCnt, -1)
atomic.AddInt32(m.stats.workerCnt, -1)
m.waitGroup.Done()
}()

View file

@ -171,6 +171,10 @@ func (n *Notification) save(pushUpdate bool) *Notification {
n.actionFunction = noOpAction
}
if n.State == "" {
n.State = Active
}
// Make sure we always have a reasonable expiration set.
if n.Expires == 0 {
n.Expires = time.Now().Add(72 * time.Hour).Unix()