Add subsystems package and make modules able to be turned off and on

Daniel 2020-02-18 15:40:36 +01:00
parent 432743ccc2
commit b4f014574b
17 changed files with 1488 additions and 391 deletions

View file

@@ -17,7 +17,9 @@ type eventHook struct {
// TriggerEvent executes all hook functions registered to the specified event.
func (m *Module) TriggerEvent(event string, data interface{}) {
go m.processEventTrigger(event, data)
if m.OnlineSoon() {
go m.processEventTrigger(event, data)
}
}
func (m *Module) processEventTrigger(event string, data interface{}) {
@@ -31,18 +33,35 @@ func (m *Module) processEventTrigger(event string, data interface{}) {
}
for _, hook := range hooks {
if !hook.hookingModule.ShutdownInProgress() {
if hook.hookingModule.OnlineSoon() {
go m.runEventHook(hook, event, data)
}
}
}
func (m *Module) runEventHook(hook *eventHook, event string, data interface{}) {
if !hook.hookingModule.Started.IsSet() {
// check if source module is ready for handling
if m.Status() != StatusOnline {
// source module has not yet fully started, wait until start is complete
select {
case <-startCompleteSignal:
case <-shutdownSignal:
case <-m.StartCompleted():
// continue with hook execution
case <-hook.hookingModule.Stopping():
return
case <-m.Stopping():
return
}
}
// check if destination module is ready for handling
if hook.hookingModule.Status() != StatusOnline {
// target module has not yet fully started, wait until start is complete
select {
case <-hook.hookingModule.StartCompleted():
// continue with hook execution
case <-hook.hookingModule.Stopping():
return
case <-m.Stopping():
return
}
}
@@ -69,7 +88,7 @@ func (m *Module) RegisterEvent(event string) {
}
}
// RegisterEventHook registers a hook function with (another) modules' event. Whenever a hook is triggered and the receiving module has not yet fully started, hook execution will be delayed until all modules completed starting.
// RegisterEventHook registers a hook function with (another) module's event. Whenever a hook is triggered and the receiving module has not yet fully started, hook execution will be delayed until the module has completed starting.
func (m *Module) RegisterEventHook(module string, event string, description string, fn func(context.Context, interface{}) error) error {
// get target module
var eventModule *Module
@@ -77,9 +96,7 @@ func (m *Module) RegisterEventHook(module string, event string, description stri
eventModule = m
} else {
var ok bool
modulesLock.RLock()
eventModule, ok = modules[module]
modulesLock.RUnlock()
if !ok {
return fmt.Errorf(`module "%s" does not exist`, module)
}
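For reference, a minimal usage sketch of the hook API after this change (module names, event name, and handler are hypothetical). A hook now only runs while both the triggering and the hooking module are online; otherwise it waits on StartCompleted() or aborts via Stopping():

import (
    "context"

    "github.com/safing/portbase/log"
    "github.com/safing/portbase/modules"
)

func prepUI(ui *modules.Module) error {
    // Hypothetical example: react to the config module's "config change" event.
    return ui.RegisterEventHook(
        "config", "config change", "reload settings",
        func(ctx context.Context, data interface{}) error {
            // Delayed until both modules are online; skipped if
            // either module is stopping (see runEventHook above).
            log.Infof("ui: config changed: %+v", data)
            return nil
        },
    )
}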

View file

@@ -12,7 +12,6 @@ func init() {
}
func parseFlags() error {
// parse flags
flag.Parse()

107
modules/mgmt.go Normal file
View file

@@ -0,0 +1,107 @@
package modules
import (
"context"
"github.com/safing/portbase/log"
"github.com/tevino/abool"
)
var (
moduleMgmtEnabled = abool.NewBool(false)
modulesChangeNotifyFn func(*Module)
)
// Enable enables the module. Only has an effect if module management is enabled.
func (m *Module) Enable() (changed bool) {
return m.enabled.SetToIf(false, true)
}
// Disable disables the module. Only has an effect if module management is enabled.
func (m *Module) Disable() (changed bool) {
return m.enabled.SetToIf(true, false)
}
// SetEnabled sets the module to the desired enabled state. Only has an effect if module management is enabled.
func (m *Module) SetEnabled(enable bool) (changed bool) {
if enable {
return m.Enable()
}
return m.Disable()
}
// Enabled returns whether or not the module is currently enabled.
func (m *Module) Enabled() bool {
return m.enabled.IsSet()
}
// EnableModuleManagement enables the module management functionality within modules. The supplied notify function will be called whenever the status of a module changes, with the affected module as its parameter. You will need to enable modules manually, otherwise nothing will start.
func EnableModuleManagement(changeNotifyFn func(*Module)) {
if moduleMgmtEnabled.SetToIf(false, true) {
modulesChangeNotifyFn = changeNotifyFn
}
}
func (m *Module) notifyOfChange() {
if moduleMgmtEnabled.IsSet() && modulesChangeNotifyFn != nil {
m.StartWorker("notify of change", func(ctx context.Context) error {
modulesChangeNotifyFn(m)
return nil
})
}
}
// ManageModules triggers the module manager to react to recent changes of enabled modules.
func ManageModules() error {
// check if enabled
if !moduleMgmtEnabled.IsSet() {
return nil
}
// lock mgmt
mgmtLock.Lock()
defer mgmtLock.Unlock()
log.Info("modules: managing changes")
// build new dependency tree
buildEnabledTree()
// stop unneeded modules
lastErr := stopModules()
if lastErr != nil {
log.Warning(lastErr.Error())
}
// start needed modules
err := startModules()
if err != nil {
log.Warning(err.Error())
lastErr = err
}
log.Info("modules: finished managing")
return lastErr
}
func buildEnabledTree() {
// reset marked dependencies
for _, m := range modules {
m.enabledAsDependency.UnSet()
}
// mark dependencies
for _, m := range modules {
if m.enabled.IsSet() {
m.markDependencies()
}
}
}
func (m *Module) markDependencies() {
for _, dep := range m.depModules {
if dep.enabledAsDependency.SetToIf(false, true) {
dep.markDependencies()
}
}
}
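A hedged sketch of how this management flow is driven (module names and the notify function are examples; the subsystems package below uses the mechanism the same way):

import (
    "github.com/safing/portbase/log"
    "github.com/safing/portbase/modules"
)

func example() error {
    // Must be called before Start(); modules then require manual enabling.
    modules.EnableModuleManagement(func(m *modules.Module) {
        log.Infof("modules: %s changed", m.Name)
    })

    core := modules.Register("core", nil, nil, nil)
    feature := modules.Register("feature", nil, nil, nil, "core")
    core.Enable()

    // Starts only "core"; "feature" is neither enabled nor a dependency.
    if err := modules.Start(); err != nil {
        return err
    }

    // Later: enable the feature and apply the change.
    if feature.SetEnabled(true) {
        return modules.ManageModules()
    }
    return nil
}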

165
modules/mgmt_test.go Normal file
View file

@@ -0,0 +1,165 @@
package modules
import (
"testing"
)
func testModuleMgmt(t *testing.T) {
// enable module management
EnableModuleManagement(nil)
registerTestModule(t, "base")
registerTestModule(t, "feature1", "base")
registerTestModule(t, "base2", "base")
registerTestModule(t, "feature2", "base2")
registerTestModule(t, "sub-feature", "base")
registerTestModule(t, "feature3", "sub-feature")
registerTestModule(t, "feature4", "sub-feature")
// enable core module
core := modules["base"]
core.Enable()
// start and check order
err := Start()
if err != nil {
t.Error(err)
}
if changeHistory != " on:base" {
t.Errorf("order mismatch, was %s", changeHistory)
}
changeHistory = ""
// enable feature1
feature1 := modules["feature1"]
feature1.Enable()
// manage modules and check
err = ManageModules()
if err != nil {
t.Fatal(err)
return
}
if changeHistory != " on:feature1" {
t.Errorf("order mismatch, was %s", changeHistory)
}
changeHistory = ""
// enable feature2
feature2 := modules["feature2"]
feature2.Enable()
// manage modules and check
err = ManageModules()
if err != nil {
t.Fatal(err)
return
}
if changeHistory != " on:base2 on:feature2" {
t.Errorf("order mismatch, was %s", changeHistory)
}
changeHistory = ""
// enable feature3
feature3 := modules["feature3"]
feature3.Enable()
// manage modules and check
err = ManageModules()
if err != nil {
t.Fatal(err)
return
}
if changeHistory != " on:sub-feature on:feature3" {
t.Errorf("order mismatch, was %s", changeHistory)
}
changeHistory = ""
// enable feature4
feature4 := modules["feature4"]
feature4.Enable()
// manage modules and check
err = ManageModules()
if err != nil {
t.Fatal(err)
return
}
if changeHistory != " on:feature4" {
t.Errorf("order mismatch, was %s", changeHistory)
}
changeHistory = ""
// disable feature1
feature1.Disable()
// manage modules and check
err = ManageModules()
if err != nil {
t.Fatal(err)
return
}
if changeHistory != " off:feature1" {
t.Errorf("order mismatch, was %s", changeHistory)
}
changeHistory = ""
// disable feature3
feature3.Disable()
// manage modules and check
err = ManageModules()
if err != nil {
t.Fatal(err)
return
}
// disable feature4
feature4.Disable()
// manage modules and check
err = ManageModules()
if err != nil {
t.Fatal(err)
return
}
if changeHistory != " off:feature3 off:feature4 off:sub-feature" {
t.Errorf("order mismatch, was %s", changeHistory)
}
changeHistory = ""
// enable feature4
feature4.Enable()
// manage modules and check
err = ManageModules()
if err != nil {
t.Fatal(err)
return
}
if changeHistory != " on:sub-feature on:feature4" {
t.Errorf("order mismatch, was %s", changeHistory)
}
changeHistory = ""
// disable feature4
feature4.Disable()
// manage modules and check
err = ManageModules()
if err != nil {
t.Fatal(err)
return
}
if changeHistory != " off:feature4 off:sub-feature" {
t.Errorf("order mismatch, was %s", changeHistory)
}
changeHistory = ""
err = Shutdown()
if err != nil {
t.Error(err)
}
if changeHistory != " off:feature2 off:base2 off:base" {
t.Errorf("order mismatch, was %s", changeHistory)
}
// reset history
changeHistory = ""
// disable module management
moduleMgmtEnabled.UnSet()
resetTestEnvironment()
}

View file

@@ -172,7 +172,7 @@ func microTaskScheduler() {
microTaskManageLoop:
for {
if shutdownSignalClosed.IsSet() {
if shutdownFlag.IsSet() {
close(mediumPriorityClearance)
close(lowPriorityClearance)
return

View file

@@ -13,32 +13,44 @@ import (
)
var (
modulesLock sync.RWMutex
modules = make(map[string]*Module)
modules = make(map[string]*Module)
mgmtLock sync.Mutex
// lock modules when starting
modulesLocked = abool.New()
// ErrCleanExit is returned by Start() when the program is interrupted before starting. This can happen, for example, when using the "--help" flag.
ErrCleanExit = errors.New("clean exit requested")
)
// Module represents a module.
type Module struct {
type Module struct { //nolint:maligned // not worth the effort
sync.RWMutex
Name string
// lifecycle mgmt
Prepped *abool.AtomicBool
Started *abool.AtomicBool
Stopped *abool.AtomicBool
inTransition *abool.AtomicBool
// status mgmt
enabled *abool.AtomicBool
enabledAsDependency *abool.AtomicBool
status uint8
// failure status
failureStatus uint8
failureID string
failureMsg string
// lifecycle callback functions
prep func() error
start func() error
stop func() error
prepFn func() error
startFn func() error
stopFn func() error
// shutdown mgmt
Ctx context.Context
cancelCtx func()
shutdownFlag *abool.AtomicBool
// lifecycle mgmt
// start
startComplete chan struct{}
// stop
Ctx context.Context
cancelCtx func()
stopFlag *abool.AtomicBool
// workers/tasks
workerCnt *int32
@@ -56,30 +68,177 @@ type Module struct {
depReverse []*Module
}
// ShutdownInProgress returns whether the module has started shutting down. In most cases, you should use ShuttingDown instead.
func (m *Module) ShutdownInProgress() bool {
return m.shutdownFlag.IsSet()
// StartCompleted returns a channel that is closed when the module has finished starting.
func (m *Module) StartCompleted() <-chan struct{} {
m.RLock()
defer m.RUnlock()
return m.startComplete
}
// ShuttingDown lets you listen for the shutdown signal.
func (m *Module) ShuttingDown() <-chan struct{} {
// Stopping returns a channel that is closed when the module has initiated the stop procedure.
func (m *Module) Stopping() <-chan struct{} {
m.RLock()
defer m.RUnlock()
return m.Ctx.Done()
}
func (m *Module) shutdown() error {
// signal shutdown
m.shutdownFlag.Set()
m.cancelCtx()
// IsStopping returns whether the module has started shutting down. In most cases, you should use Stopping instead.
func (m *Module) IsStopping() bool {
return m.stopFlag.IsSet()
}
// start shutdown function
m.waitGroup.Add(1)
stopFnError := make(chan error, 1)
// Dependencies returns the module's dependencies.
func (m *Module) Dependencies() []*Module {
m.RLock()
defer m.RUnlock()
return m.depModules
}
func (m *Module) prep(reports chan *report) {
// check and set intermediate status
m.Lock()
if m.status != StatusDead {
m.Unlock()
go func() {
reports <- &report{
module: m,
err: fmt.Errorf("module already prepped"),
}
}()
return
}
m.status = StatusPreparing
m.Unlock()
// run prep function
go func() {
stopFnError <- m.runCtrlFn("stop module", m.stop)
m.waitGroup.Done()
var err error
if m.prepFn != nil {
// execute function
err = m.runCtrlFnWithTimeout(
"prep module",
10*time.Second,
m.prepFn,
)
}
// set status
if err != nil {
m.Error(
"module-failed-prep",
fmt.Sprintf("failed to prep module: %s", err.Error()),
)
} else {
m.Lock()
m.status = StatusOffline
m.Unlock()
m.notifyOfChange()
}
// send report
reports <- &report{
module: m,
err: err,
}
}()
}
// wait for workers
func (m *Module) start(reports chan *report) {
// check and set intermediate status
m.Lock()
if m.status != StatusOffline {
m.Unlock()
go func() {
reports <- &report{
module: m,
err: fmt.Errorf("module not offline"),
}
}()
return
}
m.status = StatusStarting
// reset stop management
if m.cancelCtx != nil {
// trigger cancel just to be sure
m.cancelCtx()
}
m.Ctx, m.cancelCtx = context.WithCancel(context.Background())
m.stopFlag.UnSet()
m.Unlock()
// run start function
go func() {
var err error
if m.startFn != nil {
// execute function
err = m.runCtrlFnWithTimeout(
"start module",
10*time.Second,
m.startFn,
)
}
// set status
if err != nil {
m.Error(
"module-failed-start",
fmt.Sprintf("failed to start module: %s", err.Error()),
)
} else {
m.Lock()
m.status = StatusOnline
// init start management
close(m.startComplete)
m.Unlock()
m.notifyOfChange()
}
// send report
reports <- &report{
module: m,
err: err,
}
}()
}
func (m *Module) stop(reports chan *report) {
// check and set intermediate status
m.Lock()
if m.status != StatusOnline {
m.Unlock()
go func() {
reports <- &report{
module: m,
err: fmt.Errorf("module not online"),
}
}()
return
}
m.status = StatusStopping
// reset start management
m.startComplete = make(chan struct{})
// init stop management
m.cancelCtx()
m.stopFlag.Set()
m.Unlock()
go m.stopAllTasks(reports)
}
func (m *Module) stopAllTasks(reports chan *report) {
// start shutdown function
stopFnFinished := abool.NewBool(false)
var stopFnError error
if m.stopFn != nil {
m.waitGroup.Add(1)
go func() {
stopFnError = m.runCtrlFn("stop module", m.stopFn)
stopFnFinished.Set()
m.waitGroup.Done()
}()
}
// wait for workers and stop fn
done := make(chan struct{})
go func() {
m.waitGroup.Wait()
@@ -91,8 +250,9 @@ func (m *Module) shutdown() error {
case <-done:
case <-time.After(30 * time.Second):
log.Warningf(
"%s: timed out while waiting for workers/tasks to finish: workers=%d tasks=%d microtasks=%d, continuing shutdown...",
"%s: timed out while waiting for stopfn/workers/tasks to finish: stopFn=%v workers=%d tasks=%d microtasks=%d, continuing shutdown...",
m.Name,
stopFnFinished.IsSet(),
atomic.LoadInt32(m.workerCnt),
atomic.LoadInt32(m.taskCnt),
atomic.LoadInt32(m.microTaskCnt),
@@ -100,24 +260,37 @@ func (m *Module) shutdown() error {
}
// collect error
select {
case err := <-stopFnError:
return err
default:
log.Warningf(
"%s: timed out while waiting for stop function to finish, continuing shutdown...",
m.Name,
var err error
if stopFnFinished.IsSet() && stopFnError != nil {
err = stopFnError
}
// set status
if err != nil {
m.Error(
"module-failed-stop",
fmt.Sprintf("failed to stop module: %s", err.Error()),
)
return nil
} else {
m.Lock()
m.status = StatusOffline
m.Unlock()
m.notifyOfChange()
}
// send report
reports <- &report{
module: m,
err: err,
}
}
// Register registers a new module. The control functions `prep`, `start` and `stop` are technically optional. `stop` is called _after_ all added module workers have finished.
func Register(name string, prep, start, stop func() error, dependencies ...string) *Module {
if modulesLocked.IsSet() {
return nil
}
newModule := initNewModule(name, prep, start, stop, dependencies...)
modulesLock.Lock()
defer modulesLock.Unlock()
// check for already existing module
_, ok := modules[name]
if ok {
@@ -136,23 +309,22 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
var microTaskCnt int32
newModule := &Module{
Name: name,
Prepped: abool.NewBool(false),
Started: abool.NewBool(false),
Stopped: abool.NewBool(false),
inTransition: abool.NewBool(false),
Ctx: ctx,
cancelCtx: cancelCtx,
shutdownFlag: abool.NewBool(false),
waitGroup: sync.WaitGroup{},
workerCnt: &workerCnt,
taskCnt: &taskCnt,
microTaskCnt: &microTaskCnt,
prep: prep,
start: start,
stop: stop,
eventHooks: make(map[string][]*eventHook),
depNames: dependencies,
Name: name,
enabled: abool.NewBool(false),
enabledAsDependency: abool.NewBool(false),
prepFn: prep,
startFn: start,
stopFn: stop,
startComplete: make(chan struct{}),
Ctx: ctx,
cancelCtx: cancelCtx,
stopFlag: abool.NewBool(false),
workerCnt: &workerCnt,
taskCnt: &taskCnt,
microTaskCnt: &microTaskCnt,
waitGroup: sync.WaitGroup{},
eventHooks: make(map[string][]*eventHook),
depNames: dependencies,
}
return newModule
@@ -177,49 +349,3 @@ func initDependencies() error {
return nil
}
// ReadyToPrep returns whether all dependencies are ready for this module to prep.
func (m *Module) ReadyToPrep() bool {
if m.inTransition.IsSet() || m.Prepped.IsSet() {
return false
}
for _, dep := range m.depModules {
if !dep.Prepped.IsSet() {
return false
}
}
return true
}
// ReadyToStart returns whether all dependencies are ready for this module to start.
func (m *Module) ReadyToStart() bool {
if m.inTransition.IsSet() || m.Started.IsSet() {
return false
}
for _, dep := range m.depModules {
if !dep.Started.IsSet() {
return false
}
}
return true
}
// ReadyToStop returns whether all dependencies are ready for this module to stop.
func (m *Module) ReadyToStop() bool {
if !m.Started.IsSet() || m.inTransition.IsSet() || m.Stopped.IsSet() {
return false
}
for _, revDep := range m.depReverse {
// not ready if a reverse dependency was started, but not yet stopped
if revDep.Started.IsSet() && !revDep.Stopped.IsSet() {
return false
}
}
return true
}
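The new StartCompleted and Stopping channels replace the old Prepped/Started/Stopped flags for synchronization. A minimal sketch of a goroutine using them (the work loop itself is hypothetical):

import "github.com/safing/portbase/modules"

func waitAndWork(m *modules.Module) {
    select {
    case <-m.StartCompleted():
        // module reached StatusOnline
    case <-m.Stopping():
        return // module is stopping or was never started
    }
    for !m.IsStopping() {
        // ... do one unit of work ...
    }
}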

View file

@@ -5,40 +5,36 @@ import (
"fmt"
"sync"
"testing"
"time"
)
var (
orderLock sync.Mutex
startOrder string
shutdownOrder string
changeHistoryLock sync.Mutex
changeHistory string
)
func testPrep(t *testing.T, name string) func() error {
return func() error {
t.Logf("prep %s\n", name)
return nil
}
}
func testStart(t *testing.T, name string) func() error {
return func() error {
orderLock.Lock()
defer orderLock.Unlock()
t.Logf("start %s\n", name)
startOrder = fmt.Sprintf("%s>%s", startOrder, name)
return nil
}
}
func testStop(t *testing.T, name string) func() error {
return func() error {
orderLock.Lock()
defer orderLock.Unlock()
t.Logf("stop %s\n", name)
shutdownOrder = fmt.Sprintf("%s>%s", shutdownOrder, name)
return nil
}
func registerTestModule(t *testing.T, name string, dependencies ...string) {
Register(
name,
func() error {
t.Logf("prep %s\n", name)
return nil
},
func() error {
changeHistoryLock.Lock()
defer changeHistoryLock.Unlock()
t.Logf("start %s\n", name)
changeHistory = fmt.Sprintf("%s on:%s", changeHistory, name)
return nil
},
func() error {
changeHistoryLock.Lock()
defer changeHistoryLock.Unlock()
t.Logf("stop %s\n", name)
changeHistory = fmt.Sprintf("%s off:%s", changeHistory, name)
return nil
},
dependencies...,
)
}
func testFail() error {
@@ -53,135 +49,94 @@ func TestModules(t *testing.T) {
t.Parallel() // Not really, just a workaround for running these tests last.
t.Run("TestModuleOrder", testModuleOrder)
t.Run("TestModuleMgmt", testModuleMgmt)
t.Run("TestModuleErrors", testModuleErrors)
}
func testModuleOrder(t *testing.T) {
Register("database", testPrep(t, "database"), testStart(t, "database"), testStop(t, "database"))
Register("stats", testPrep(t, "stats"), testStart(t, "stats"), testStop(t, "stats"), "database")
Register("service", testPrep(t, "service"), testStart(t, "service"), testStop(t, "service"), "database")
Register("analytics", testPrep(t, "analytics"), testStart(t, "analytics"), testStop(t, "analytics"), "stats", "database")
registerTestModule(t, "database")
registerTestModule(t, "stats", "database")
registerTestModule(t, "service", "database")
registerTestModule(t, "analytics", "stats", "database")
err := Start()
if err != nil {
t.Error(err)
}
if startOrder != ">database>service>stats>analytics" &&
startOrder != ">database>stats>service>analytics" &&
startOrder != ">database>stats>analytics>service" {
t.Errorf("start order mismatch, was %s", startOrder)
if changeHistory != " on:database on:service on:stats on:analytics" &&
changeHistory != " on:database on:stats on:service on:analytics" &&
changeHistory != " on:database on:stats on:analytics on:service" {
t.Errorf("start order mismatch, was %s", changeHistory)
}
changeHistory = ""
var wg sync.WaitGroup
wg.Add(1)
go func() {
select {
case <-ShuttingDown():
case <-time.After(1 * time.Second):
t.Error("did not receive shutdown signal")
}
wg.Done()
}()
err = Shutdown()
if err != nil {
t.Error(err)
}
if shutdownOrder != ">analytics>service>stats>database" &&
shutdownOrder != ">analytics>stats>service>database" &&
shutdownOrder != ">service>analytics>stats>database" {
t.Errorf("shutdown order mismatch, was %s", shutdownOrder)
if changeHistory != " off:analytics off:service off:stats off:database" &&
changeHistory != " off:analytics off:stats off:service off:database" &&
changeHistory != " off:service off:analytics off:stats off:database" {
t.Errorf("shutdown order mismatch, was %s", changeHistory)
}
changeHistory = ""
wg.Wait()
printAndRemoveModules()
}
func printAndRemoveModules() {
modulesLock.Lock()
defer modulesLock.Unlock()
fmt.Printf("All %d modules:\n", len(modules))
for _, m := range modules {
fmt.Printf("module %s: %+v\n", m.Name, m)
}
modules = make(map[string]*Module)
resetTestEnvironment()
}
func testModuleErrors(t *testing.T) {
// reset modules
modules = make(map[string]*Module)
startComplete.UnSet()
startCompleteSignal = make(chan struct{})
// test prep error
Register("prepfail", testFail, testStart(t, "prepfail"), testStop(t, "prepfail"))
Register("prepfail", testFail, nil, nil)
err := Start()
if err == nil {
t.Error("should fail")
}
// reset modules
modules = make(map[string]*Module)
startComplete.UnSet()
startCompleteSignal = make(chan struct{})
resetTestEnvironment()
// test prep clean exit
Register("prepcleanexit", testCleanExit, testStart(t, "prepcleanexit"), testStop(t, "prepcleanexit"))
Register("prepcleanexit", testCleanExit, nil, nil)
err = Start()
if err != ErrCleanExit {
t.Error("should fail with clean exit")
}
// reset modules
modules = make(map[string]*Module)
startComplete.UnSet()
startCompleteSignal = make(chan struct{})
resetTestEnvironment()
// test invalid dependency
Register("database", nil, testStart(t, "database"), testStop(t, "database"), "invalid")
Register("database", nil, nil, nil, "invalid")
err = Start()
if err == nil {
t.Error("should fail")
}
// reset modules
modules = make(map[string]*Module)
startComplete.UnSet()
startCompleteSignal = make(chan struct{})
resetTestEnvironment()
// test dependency loop
Register("database", nil, testStart(t, "database"), testStop(t, "database"), "helper")
Register("helper", nil, testStart(t, "helper"), testStop(t, "helper"), "database")
registerTestModule(t, "database", "helper")
registerTestModule(t, "helper", "database")
err = Start()
if err == nil {
t.Error("should fail")
}
// reset modules
modules = make(map[string]*Module)
startComplete.UnSet()
startCompleteSignal = make(chan struct{})
resetTestEnvironment()
// test failing module start
Register("startfail", nil, testFail, testStop(t, "startfail"))
Register("startfail", nil, testFail, nil)
err = Start()
if err == nil {
t.Error("should fail")
}
// reset modules
modules = make(map[string]*Module)
startComplete.UnSet()
startCompleteSignal = make(chan struct{})
resetTestEnvironment()
// test failing module stop
Register("stopfail", nil, testStart(t, "stopfail"), testFail)
Register("stopfail", nil, nil, testFail)
err = Start()
if err != nil {
t.Error("should not fail")
@@ -191,10 +146,7 @@ func testModuleErrors(t *testing.T) {
t.Error("should fail")
}
// reset modules
modules = make(map[string]*Module)
startComplete.UnSet()
startCompleteSignal = make(chan struct{})
resetTestEnvironment()
// test help flag
HelpFlag = true
@@ -204,4 +156,20 @@ func testModuleErrors(t *testing.T) {
}
HelpFlag = false
resetTestEnvironment()
}
func printModules() { //nolint:unused,deadcode
fmt.Printf("All %d modules:\n", len(modules))
for _, m := range modules {
fmt.Printf("module %s: %+v\n", m.Name, m)
}
}
func resetTestEnvironment() {
modules = make(map[string]*Module)
shutdownSignal = make(chan struct{})
shutdownCompleteSignal = make(chan struct{})
shutdownFlag.UnSet()
modulesLocked.UnSet()
}

View file

@@ -1,34 +1,38 @@
package modules
import (
"errors"
"fmt"
"os"
"runtime"
"time"
"github.com/tevino/abool"
"github.com/safing/portbase/log"
"github.com/tevino/abool"
)
var (
startComplete = abool.NewBool(false)
startCompleteSignal = make(chan struct{})
initialStartCompleted = abool.NewBool(false)
globalPrepFn func() error
)
// StartCompleted returns whether starting has completed.
func StartCompleted() bool {
return startComplete.IsSet()
}
// WaitForStartCompletion returns as soon as starting has completed.
func WaitForStartCompletion() <-chan struct{} {
return startCompleteSignal
// SetGlobalPrepFn sets a global prep function that is run before all modules. This can be used to pre-initialize modules, such as setting the data root or database path.
// SetGlobalPrepFn sets a global prep function that is run before all modules.
func SetGlobalPrepFn(fn func() error) {
if globalPrepFn == nil {
globalPrepFn = fn
}
}
// Start starts all modules in the correct order. In case of an error, it will automatically shutdown again.
func Start() error {
modulesLock.RLock()
defer modulesLock.RUnlock()
if !modulesLocked.SetToIf(false, true) {
return errors.New("module system already started")
}
// lock mgmt
mgmtLock.Lock()
defer mgmtLock.Unlock()
// start microtask scheduler
go microTaskScheduler()
@@ -48,6 +52,17 @@ func Start() error {
return err
}
// execute global prep fn
if globalPrepFn != nil {
err = globalPrepFn()
if err != nil {
if err != ErrCleanExit {
fmt.Fprintf(os.Stderr, "CRITICAL ERROR: %s\n", err)
}
return err
}
}
// prep modules
err = prepareModules()
if err != nil {
@@ -65,6 +80,9 @@ func Start() error {
return err
}
// build dependency tree
buildEnabledTree()
// start modules
log.Info("modules: initiating...")
err = startModules()
@@ -75,13 +93,11 @@ func Start() error {
// complete startup
log.Infof("modules: started %d modules", len(modules))
if startComplete.SetToIf(false, true) {
close(startCompleteSignal)
}
go taskQueueHandler()
go taskScheduleHandler()
initialStartCompleted.Set()
return nil
}
@@ -97,45 +113,36 @@ func prepareModules() error {
reportCnt := 0
for {
waiting := 0
// find modules to exec
for _, m := range modules {
if m.ReadyToPrep() {
switch m.readyToPrep() {
case statusNothingToDo:
case statusWaiting:
waiting++
case statusReady:
execCnt++
m.inTransition.Set()
execM := m
go func() {
reports <- &report{
module: execM,
err: execM.runCtrlFnWithTimeout(
"prep module",
10*time.Second,
execM.prep,
),
}
}()
m.prep(reports)
}
}
// check for dep loop
if execCnt == reportCnt {
return fmt.Errorf("modules: dependency loop detected, cannot continue")
}
// wait for reports
rep = <-reports
rep.module.inTransition.UnSet()
if rep.err != nil {
if rep.err == ErrCleanExit {
return rep.err
if reportCnt < execCnt {
// wait for reports
rep = <-reports
if rep.err != nil {
if rep.err == ErrCleanExit {
return rep.err
}
return fmt.Errorf("failed to prep module %s: %s", rep.module.Name, rep.err)
}
reportCnt++
} else {
// finished
if waiting > 0 {
// check for dep loop
return fmt.Errorf("modules: dependency loop detected, cannot continue")
}
return fmt.Errorf("failed to prep module %s: %s", rep.module.Name, rep.err)
}
reportCnt++
rep.module.Prepped.Set()
// exit if done
if reportCnt == len(modules) {
return nil
}
@@ -149,45 +156,36 @@ func startModules() error {
reportCnt := 0
for {
waiting := 0
// find modules to exec
for _, m := range modules {
if m.ReadyToStart() {
switch m.readyToStart() {
case statusNothingToDo:
case statusWaiting:
waiting++
case statusReady:
execCnt++
m.inTransition.Set()
execM := m
go func() {
reports <- &report{
module: execM,
err: execM.runCtrlFnWithTimeout(
"start module",
60*time.Second,
execM.start,
),
}
}()
m.start(reports)
}
}
// check for dep loop
if execCnt == reportCnt {
return fmt.Errorf("modules: dependency loop detected, cannot continue")
}
// wait for reports
rep = <-reports
rep.module.inTransition.UnSet()
if rep.err != nil {
return fmt.Errorf("modules: could not start module %s: %s", rep.module.Name, rep.err)
}
reportCnt++
rep.module.Started.Set()
log.Infof("modules: started %s", rep.module.Name)
// exit if done
if reportCnt == len(modules) {
if reportCnt < execCnt {
// wait for reports
rep = <-reports
if rep.err != nil {
return fmt.Errorf("modules: could not start module %s: %s", rep.module.Name, rep.err)
}
reportCnt++
log.Infof("modules: started %s", rep.module.Name)
} else {
// finished
if waiting > 0 {
// check for dep loop
return fmt.Errorf("modules: dependency loop detected, cannot continue")
}
// return last error
return nil
}
}
}
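A sketch of the intended use of the new global prep hook; the data root path is an example value (dataroot.Initialize is used the same way in the subsystems test below):

import (
    "fmt"
    "os"

    "github.com/safing/portbase/dataroot"
    "github.com/safing/portbase/modules"
)

func main() {
    modules.SetGlobalPrepFn(func() error {
        // runs once, before any module's prep function
        return dataroot.Initialize("/var/lib/example", 0755)
    })
    err := modules.Start()
    if err != nil && err != modules.ErrCleanExit {
        fmt.Fprintf(os.Stderr, "failed to start: %s\n", err)
        os.Exit(1)
    }
}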

171
modules/status.go Normal file
View file

@@ -0,0 +1,171 @@
package modules
// Module Status Values
const (
StatusDead uint8 = 0 // not prepared, not started
StatusPreparing uint8 = 1
StatusOffline uint8 = 2 // prepared, not started
StatusStopping uint8 = 3
StatusStarting uint8 = 4
StatusOnline uint8 = 5 // online and running
)
// Module Failure Status Values
const (
FailureNone uint8 = 0
FailureHint uint8 = 1
FailureWarning uint8 = 2
FailureError uint8 = 3
)
// ready status
const (
statusWaiting uint8 = iota
statusReady
statusNothingToDo
)
// Online returns whether the module is online.
func (m *Module) Online() bool {
return m.Status() == StatusOnline
}
// OnlineSoon returns whether the module is or is about to be online.
func (m *Module) OnlineSoon() bool {
if moduleMgmtEnabled.IsSet() &&
!m.enabled.IsSet() &&
!m.enabledAsDependency.IsSet() {
return false
}
return !m.stopFlag.IsSet()
}
// Status returns the current module status.
func (m *Module) Status() uint8 {
m.RLock()
defer m.RUnlock()
return m.status
}
// FailureStatus returns the current failure status, ID and message.
func (m *Module) FailureStatus() (failureStatus uint8, failureID, failureMsg string) {
m.RLock()
defer m.RUnlock()
return m.failureStatus, m.failureID, m.failureMsg
}
// Hint sets failure status to hint. This is a somewhat special failure status, as the module is believed to be working correctly, but there is important module-specific information to convey. The supplied failureID is for improved automatic handling within connected systems, the failureMsg is for humans.
func (m *Module) Hint(failureID, failureMsg string) {
m.Lock()
defer m.Unlock()
m.failureStatus = FailureHint
m.failureID = failureID
m.failureMsg = failureMsg
m.notifyOfChange()
}
// Warning sets failure status to warning. The supplied failureID is for improved automatic handling within connected systems, the failureMsg is for humans.
func (m *Module) Warning(failureID, failureMsg string) {
m.Lock()
defer m.Unlock()
m.failureStatus = FailureWarning
m.failureID = failureID
m.failureMsg = failureMsg
m.notifyOfChange()
}
// Error sets failure status to error. The supplied failureID is for improved automatic handling within connected systems, the failureMsg is for humans.
func (m *Module) Error(failureID, failureMsg string) {
m.Lock()
defer m.Unlock()
m.failureStatus = FailureError
m.failureID = failureID
m.failureMsg = failureMsg
m.notifyOfChange()
}
// Resolve removes the failure state from the module if the given failureID matches the current failure ID. If the given failureID is an empty string, Resolve removes any failure state.
func (m *Module) Resolve(failureID string) {
m.Lock()
defer m.Unlock()
if failureID == "" || failureID == m.failureID {
m.failureStatus = FailureNone
m.failureID = ""
m.failureMsg = ""
}
m.notifyOfChange()
}
// readyToPrep returns whether all dependencies are ready for this module to prep.
func (m *Module) readyToPrep() uint8 {
// check if valid state for prepping
if m.Status() != StatusDead {
return statusNothingToDo
}
for _, dep := range m.depModules {
if dep.Status() < StatusOffline {
return statusWaiting
}
}
return statusReady
}
// readyToStart returns whether all dependencies are ready for this module to start.
func (m *Module) readyToStart() uint8 {
// check if start is wanted
if moduleMgmtEnabled.IsSet() {
if !m.enabled.IsSet() && !m.enabledAsDependency.IsSet() {
return statusNothingToDo
}
}
// check if valid state for starting
if m.Status() != StatusOffline {
return statusNothingToDo
}
// check if all dependencies are ready
for _, dep := range m.depModules {
if dep.Status() < StatusOnline {
return statusWaiting
}
}
return statusReady
}
// readyToStop returns whether all dependencies are ready for this module to stop.
func (m *Module) readyToStop() uint8 {
// check if stop is wanted
if moduleMgmtEnabled.IsSet() && !shutdownFlag.IsSet() {
if m.enabled.IsSet() || m.enabledAsDependency.IsSet() {
return statusNothingToDo
}
}
// check if valid state for stopping
if m.Status() != StatusOnline {
return statusNothingToDo
}
for _, revDep := range m.depReverse {
// not ready if a reverse dependency was started, but not yet stopped
if revDep.Status() > StatusOffline {
return statusWaiting
}
}
return statusReady
}
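A short sketch of the failure-status API from a module's point of view (the failure ID and the checked error are examples); Resolve clears the state only if the given ID matches or is empty:

import (
    "fmt"

    "github.com/safing/portbase/modules"
)

func reportCheck(m *modules.Module, err error) {
    if err != nil {
        m.Warning(
            "upstream-unreachable",
            fmt.Sprintf("Upstream is unreachable: %s", err),
        )
        return
    }
    m.Resolve("upstream-unreachable")
}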

View file

@@ -10,8 +10,8 @@ import (
)
var (
shutdownSignal = make(chan struct{})
shutdownSignalClosed = abool.NewBool(false)
shutdownSignal = make(chan struct{})
shutdownFlag = abool.NewBool(false)
shutdownCompleteSignal = make(chan struct{})
)
@@ -23,18 +23,19 @@ func ShuttingDown() <-chan struct{} {
// Shutdown stops all modules in the correct order.
func Shutdown() error {
// lock mgmt
mgmtLock.Lock()
defer mgmtLock.Unlock()
if shutdownSignalClosed.SetToIf(false, true) {
if shutdownFlag.SetToIf(false, true) {
close(shutdownSignal)
} else {
// shutdown was already issued
return errors.New("shutdown already initiated")
}
if startComplete.IsSet() {
if initialStartCompleted.IsSet() {
log.Warning("modules: starting shutdown...")
modulesLock.Lock()
defer modulesLock.Unlock()
} else {
log.Warning("modules: aborting, shutting down...")
}
@@ -61,46 +62,42 @@ func stopModules() error {
// get number of started modules
startedCnt := 0
for _, m := range modules {
if m.Started.IsSet() {
if m.Status() >= StatusStarting {
startedCnt++
}
}
for {
waiting := 0
// find modules to exec
for _, m := range modules {
if m.ReadyToStop() {
switch m.readyToStop() {
case statusNothingToDo:
case statusWaiting:
waiting++
case statusReady:
execCnt++
m.inTransition.Set()
execM := m
go func() {
reports <- &report{
module: execM,
err: execM.shutdown(),
}
}()
m.stop(reports)
}
}
// check for dep loop
if execCnt == reportCnt {
return fmt.Errorf("modules: dependency loop detected, cannot continue")
}
// wait for reports
rep = <-reports
rep.module.inTransition.UnSet()
if rep.err != nil {
lastErr = rep.err
log.Warningf("modules: could not stop module %s: %s", rep.module.Name, rep.err)
}
reportCnt++
rep.module.Stopped.Set()
log.Infof("modules: stopped %s", rep.module.Name)
// exit if done
if reportCnt == startedCnt {
if reportCnt < execCnt {
// wait for reports
rep = <-reports
if rep.err != nil {
lastErr = rep.err
log.Warningf("modules: could not stop module %s: %s", rep.module.Name, rep.err)
}
reportCnt++
log.Infof("modules: stopped %s", rep.module.Name)
} else {
// finished
if waiting > 0 {
// check for dep loop
return fmt.Errorf("modules: dependency loop detected, cannot continue")
}
// return last error
return lastErr
}
}

View file

@@ -0,0 +1,83 @@
package subsystems
import (
"strings"
"github.com/safing/portbase/database"
_ "github.com/safing/portbase/database/dbmodule" // database module is required
"github.com/safing/portbase/modules"
)
const (
configChangeEvent = "config change"
subsystemsStatusChange = "status change"
)
var (
module *modules.Module
databaseKeySpace string
db = database.NewInterface(nil)
)
func init() {
// enable partial starting
modules.EnableModuleManagement(handleModuleChanges)
// register module and enable it for starting
module = modules.Register("subsystems", prep, start, nil, "config", "database")
module.Enable()
// register event for changes in the subsystem
module.RegisterEvent(subsystemsStatusChange)
}
func prep() error {
return module.RegisterEventHook("config", configChangeEvent, "control subsystems", handleConfigChanges)
}
func start() error {
// lock registration
subsystemsLocked.Set()
// lock slice and map
subsystemsLock.Lock()
// go through all dependencies
seen := make(map[string]struct{})
for _, sub := range subsystems {
// add main module
sub.Dependencies = append(sub.Dependencies, statusFromModule(sub.module))
// add dependencies
sub.addDependencies(sub.module, seen)
}
// unlock
subsystemsLock.Unlock()
// apply config
return handleConfigChanges(module.Ctx, nil)
}
func (sub *Subsystem) addDependencies(module *modules.Module, seen map[string]struct{}) {
for _, dep := range module.Dependencies() {
_, ok := seen[dep.Name]
if !ok {
// add dependency to modules
sub.Dependencies = append(sub.Dependencies, statusFromModule(dep))
// mark as seen
seen[dep.Name] = struct{}{}
// add further dependencies
sub.addDependencies(dep, seen)
}
}
}
// SetDatabaseKeySpace sets a key space where subsystem status is stored.
func SetDatabaseKeySpace(keySpace string) {
if databaseKeySpace == "" {
databaseKeySpace = keySpace
if !strings.HasSuffix(databaseKeySpace, "/") {
databaseKeySpace += "/"
}
}
}
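Example of wiring up status persistence; the key space value and the import path are assumptions, and a trailing slash is appended automatically if missing:

import "github.com/safing/portbase/modules/subsystems"

func init() {
    // example key space, hypothetical value
    subsystems.SetDatabaseKeySpace("core:status/subsystems")
}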

View file

@@ -0,0 +1,117 @@
package subsystems
import (
"sync"
"github.com/safing/portbase/config"
"github.com/safing/portbase/database/record"
"github.com/safing/portbase/log"
"github.com/safing/portbase/modules"
)
// Subsystem describes a subset of modules that represent a part of a service or program to the user.
type Subsystem struct { //nolint:maligned // not worth the effort
record.Base
sync.Mutex
Name string
Description string
module *modules.Module
Status *ModuleStatus
Dependencies []*ModuleStatus
FailureStatus uint8
ToggleOptionKey string // empty == forced on
toggleOption *config.Option
toggleValue func() bool
ExpertiseLevel uint8 // copied from toggleOption
ReleaseLevel uint8 // copied from toggleOption
ConfigKeySpace string
}
// ModuleStatus describes the status of a module.
type ModuleStatus struct {
Name string
module *modules.Module
// status mgmt
Enabled bool
Status uint8
// failure status
FailureStatus uint8
FailureID string
FailureMsg string
}
// Save saves the Subsystem Status to the database.
func (sub *Subsystem) Save() {
if databaseKeySpace != "" {
// sub.SetKey() // FIXME
err := db.Put(sub)
if err != nil {
log.Errorf("subsystems: could not save subsystem status to database: %s", err)
}
}
}
func statusFromModule(module *modules.Module) *ModuleStatus {
status := &ModuleStatus{
Name: module.Name,
module: module,
Enabled: module.Enabled(),
Status: module.Status(),
}
status.FailureStatus, status.FailureID, status.FailureMsg = module.FailureStatus()
return status
}
func compareAndUpdateStatus(module *modules.Module, status *ModuleStatus) (changed bool) {
// check if enabled
enabled := module.Enabled()
if status.Enabled != enabled {
status.Enabled = enabled
changed = true
}
// check status
statusLvl := module.Status()
if status.Status != statusLvl {
status.Status = statusLvl
changed = true
}
// check failure status
failureStatus, failureID, failureMsg := module.FailureStatus()
if status.FailureStatus != failureStatus ||
status.FailureID != failureID {
status.FailureStatus = failureStatus
status.FailureID = failureID
status.FailureMsg = failureMsg
changed = true
}
return
}
func (sub *Subsystem) makeSummary() {
// find worst failing module
worstFailing := &ModuleStatus{}
if sub.Status.FailureStatus > worstFailing.FailureStatus {
worstFailing = sub.Status
}
for _, depStatus := range sub.Dependencies {
if depStatus.FailureStatus > worstFailing.FailureStatus {
worstFailing = depStatus
}
}
// worstFailing is never nil here; it defaults to an empty status
sub.FailureStatus = worstFailing.FailureStatus
}

View file

@@ -0,0 +1,158 @@
package subsystems
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/tevino/abool"
"github.com/safing/portbase/config"
"github.com/safing/portbase/modules"
)
var (
subsystems []*Subsystem
subsystemsMap = make(map[string]*Subsystem)
subsystemsLock sync.Mutex
subsystemsLocked = abool.New()
handlingConfigChanges = abool.New()
)
// Register registers a new subsystem. The given option must be a bool option. Should be called in the module's prep function. The config option must not yet be registered and will be registered for you.
func Register(name, description string, module *modules.Module, configKeySpace string, option *config.Option) error {
// lock slice and map
subsystemsLock.Lock()
defer subsystemsLock.Unlock()
// check if registration is closed
if subsystemsLocked.IsSet() {
return errors.New("subsystems can only be registered in prep phase")
}
// check if already registered
_, ok := subsystemsMap[name]
if ok {
return fmt.Errorf(`subsystem "%s" already registered`, name)
}
// create new
sub := &Subsystem{
Name: name,
Description: description,
module: module,
Status: statusFromModule(module),
toggleOption: option,
ConfigKeySpace: configKeySpace,
}
if sub.toggleOption != nil {
sub.ToggleOptionKey = sub.toggleOption.Key
sub.ExpertiseLevel = sub.toggleOption.ExpertiseLevel
sub.ReleaseLevel = sub.toggleOption.ReleaseLevel
}
// register config
if option != nil {
err := config.Register(option)
if err != nil {
return fmt.Errorf("failed to register config: %s", err)
}
}
sub.toggleValue = config.GetAsBool(sub.ToggleOptionKey, false)
// add to lists
subsystemsMap[name] = sub
subsystems = append(subsystems, sub)
return nil
}
func handleModuleChanges(m *modules.Module) {
// check if ready
if !subsystemsLocked.IsSet() {
return
}
// find module status
var moduleSubsystem *Subsystem
var moduleStatus *ModuleStatus
subsystemLoop:
for _, subsystem := range subsystems {
if m.Name == subsystem.Status.Name {
moduleSubsystem = subsystem
moduleStatus = subsystem.Status
break subsystemLoop
}
for _, status := range subsystem.Dependencies {
if m.Name == status.Name {
moduleSubsystem = subsystem
moduleStatus = status
break subsystemLoop
}
}
}
// abort if not found
if moduleSubsystem == nil || moduleStatus == nil {
return
}
// update status
moduleSubsystem.Lock()
changed := compareAndUpdateStatus(m, moduleStatus)
if changed {
moduleSubsystem.makeSummary()
}
moduleSubsystem.Unlock()
// save
if changed {
moduleSubsystem.Save()
}
}
func handleConfigChanges(ctx context.Context, data interface{}) error {
// check if ready
if !subsystemsLocked.IsSet() {
return nil
}
// potentially catch multiple changes
if handlingConfigChanges.SetToIf(false, true) {
time.Sleep(100 * time.Millisecond)
handlingConfigChanges.UnSet()
} else {
return nil
}
// only run one instance at any time
subsystemsLock.Lock()
defer subsystemsLock.Unlock()
var changed bool
for _, subsystem := range subsystems {
if subsystem.module.SetEnabled(subsystem.toggleValue()) {
// if changed
changed = true
}
}
// trigger module management if any setting was changed
if changed {
err := modules.ManageModules()
if err != nil {
module.Error(
"modulemgmt-failed",
fmt.Sprintf("The subsystem framework failed to start or stop one or more modules.\nError: %s\nCheck logs for more information.", err),
)
} else {
module.Resolve("modulemgmt-failed")
}
}
return nil
}
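A sketch of registering a subsystem from a module's prep function, mirroring the test below (names and the config key are examples):

import (
    "github.com/safing/portbase/config"
    "github.com/safing/portbase/modules"
    "github.com/safing/portbase/modules/subsystems"
)

func prepFeature(featureModule *modules.Module) error {
    return subsystems.Register(
        "Example Feature",
        "Provides the example feature",
        featureModule, // the *modules.Module backing this subsystem
        "config:example/",
        &config.Option{
            Name:         "Enable Example Feature",
            Key:          "config:subsystems/example",
            Description:  "This option enables the example feature",
            OptType:      config.OptTypeBool,
            DefaultValue: false,
        },
    )
}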

View file

@@ -0,0 +1,132 @@
package subsystems
import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/safing/portbase/config"
"github.com/safing/portbase/dataroot"
"github.com/safing/portbase/modules"
)
func TestSubsystems(t *testing.T) {
// tmp dir for data root (db & config)
tmpDir, err := ioutil.TempDir("", "portbase-testing-")
// initialize data dir
if err == nil {
err = dataroot.Initialize(tmpDir, 0755)
}
// handle setup error
if err != nil {
t.Fatal(err)
}
// register
baseModule := modules.Register("base", nil, nil, nil)
err = Register(
"Base",
"Framework Groundwork",
baseModule,
"config:base",
nil,
)
if err != nil {
t.Fatal(err)
return
}
feature1 := modules.Register("feature1", nil, nil, nil)
err = Register(
"Feature One",
"Provides feature one",
feature1,
"config:feature1",
&config.Option{
Name: "Enable Feature One",
Key: "config:subsystems/feature1",
Description: "This option enables feature 1",
OptType: config.OptTypeBool,
DefaultValue: false,
},
)
if err != nil {
t.Fatal(err)
return
}
sub1 := subsystemsMap["Feature One"]
feature2 := modules.Register("feature2", nil, nil, nil)
err = Register(
"Feature Two",
"Provides feature two",
feature2,
"config:feature2",
&config.Option{
Name: "Enable Feature One",
Key: "config:subsystems/feature2",
Description: "This option enables feature 2",
OptType: config.OptTypeBool,
DefaultValue: false,
},
)
if err != nil {
t.Fatal(err)
return
}
// start
err = modules.Start()
if err != nil {
t.Fatal(err)
}
// test
// let module fail
feature1.Error("test-fail", "Testing Fail")
time.Sleep(10 * time.Millisecond)
if sub1.FailureStatus != modules.FailureError {
t.Fatal("error did not propagate")
}
// resolve
feature1.Resolve("test-fail")
time.Sleep(10 * time.Millisecond)
if sub1.FailureStatus != modules.FailureNone {
t.Fatal("error resolving did not propagate")
}
// update settings
err = config.SetConfigOption("config:subsystems/feature2", true)
if err != nil {
t.Fatal(err)
return
}
time.Sleep(200 * time.Millisecond)
if !feature2.Enabled() {
t.Fatal("failed to enable feature2")
}
if feature2.Status() != modules.StatusOnline {
t.Fatal("feature2 did not start")
}
// update settings
err = config.SetConfigOption("config:subsystems/feature2", false)
if err != nil {
t.Fatal(err)
return
}
time.Sleep(200 * time.Millisecond)
if feature2.Enabled() {
t.Fatal("failed to disable feature2")
}
if feature2.Status() != modules.StatusOffline {
t.Fatal("feature2 did not stop")
}
// clean up and exit
os.RemoveAll(tmpDir)
}

View file

@@ -8,21 +8,24 @@ import (
"sync/atomic"
"time"
"github.com/tevino/abool"
"github.com/safing/portbase/log"
"github.com/tevino/abool"
)
// Task is managed task bound to a module.
type Task struct {
name string
module *Module
taskFn func(context.Context, *Task)
taskFn func(context.Context, *Task) error
queued bool
canceled bool
executing bool
cancelFunc func()
queued bool
canceled bool
executing bool
// these are populated at task creation
// ctx is canceled when the module is shut down -> all its tasks become canceled
ctx context.Context
cancelCtx func()
executeAt time.Time
repeat time.Duration
@@ -59,25 +62,46 @@ const (
)
// NewTask creates a new task with a descriptive name (non-unique), an optional deadline, and the task function to be executed. You must call one of Queue, Prioritize, StartASAP, Schedule or Repeat in order to have the Task executed.
func (m *Module) NewTask(name string, fn func(context.Context, *Task)) *Task {
func (m *Module) NewTask(name string, fn func(context.Context, *Task) error) *Task {
if m == nil {
log.Errorf(`modules: cannot create task "%s" with nil module`, name)
return &Task{
name: name,
module: &Module{Name: "[NONE]"},
canceled: true,
}
}
m.Lock()
defer m.Unlock()
if m.Ctx == nil || !m.OnlineSoon() {
log.Errorf(`modules: tasks should only be started when the module is online or starting`)
return &Task{
name: name,
module: m,
canceled: true,
}
}
return &Task{
// create new task
t := &Task{
name: name,
module: m,
taskFn: fn,
maxDelay: defaultMaxDelay,
}
// create context
t.ctx, t.cancelCtx = context.WithCancel(m.Ctx)
return t
}
func (t *Task) isActive() bool {
if t.module == nil {
if t.canceled {
return false
}
return !t.canceled && !t.module.ShutdownInProgress()
return t.module.OnlineSoon()
}
func (t *Task) prepForQueueing() (ok bool) {
@@ -197,45 +221,15 @@ func (t *Task) Repeat(interval time.Duration) *Task {
func (t *Task) Cancel() {
t.lock.Lock()
t.canceled = true
if t.cancelFunc != nil {
t.cancelFunc()
if t.cancelCtx != nil {
t.cancelCtx()
}
t.lock.Unlock()
}
func (t *Task) runWithLocking() {
if t.module == nil {
return
}
// wait for good timeslot regarding microtasks
select {
case <-taskTimeslot:
case <-time.After(maxTimeslotWait):
}
t.lock.Lock()
// check state, return if already executing or inactive
if t.executing || !t.isActive() {
t.lock.Unlock()
return
}
t.executing = true
// get list elements
queueElement := t.queueElement
prioritizedQueueElement := t.prioritizedQueueElement
scheduleListElement := t.scheduleListElement
// create context
var taskCtx context.Context
taskCtx, t.cancelFunc = context.WithCancel(t.module.Ctx)
t.lock.Unlock()
func (t *Task) removeFromQueues() {
// remove from lists
if queueElement != nil {
if t.queueElement != nil {
queuesLock.Lock()
taskQueue.Remove(t.queueElement)
queuesLock.Unlock()
@@ -243,7 +237,7 @@ func (t *Task) runWithLocking() {
t.queueElement = nil
t.lock.Unlock()
}
if prioritizedQueueElement != nil {
if t.prioritizedQueueElement != nil {
queuesLock.Lock()
prioritizedTaskQueue.Remove(t.prioritizedQueueElement)
queuesLock.Unlock()
@@ -251,7 +245,7 @@ func (t *Task) runWithLocking() {
t.prioritizedQueueElement = nil
t.lock.Unlock()
}
if scheduleListElement != nil {
if t.scheduleListElement != nil {
scheduleLock.Lock()
taskSchedule.Remove(t.scheduleListElement)
scheduleLock.Unlock()
@@ -259,14 +253,62 @@ func (t *Task) runWithLocking() {
t.scheduleListElement = nil
t.lock.Unlock()
}
}
func (t *Task) runWithLocking() {
t.lock.Lock()
// check if task is already executing
if t.executing {
t.lock.Unlock()
return
}
// check if task is active
if !t.isActive() {
t.removeFromQueues()
t.lock.Unlock()
return
}
// check if module was stopped
select {
case <-t.ctx.Done(): // check if module is stopped
t.removeFromQueues()
t.lock.Unlock()
return
default:
}
t.executing = true
t.lock.Unlock()
// wait for good timeslot regarding microtasks
select {
case <-taskTimeslot:
case <-time.After(maxTimeslotWait):
}
// wait for module start
if !t.module.Online() {
if t.module.OnlineSoon() {
// wait
<-t.module.StartCompleted()
} else {
t.lock.Lock()
t.removeFromQueues()
t.lock.Unlock()
return
}
}
// add to queue workgroup
queueWg.Add(1)
go t.executeWithLocking(taskCtx, t.cancelFunc)
go t.executeWithLocking()
go func() {
select {
case <-taskCtx.Done():
case <-t.ctx.Done():
case <-time.After(maxExecutionWait):
}
// complete queue worker (early) to allow next worker
@@ -274,7 +316,7 @@ func (t *Task) runWithLocking() {
}()
}
func (t *Task) executeWithLocking(ctx context.Context, cancelFunc func()) {
func (t *Task) executeWithLocking() {
// start for module
// hint: only queueWg global var is important for scheduling, others can be set here
atomic.AddInt32(t.module.taskCnt, 1)
@@ -306,11 +348,16 @@ func (t *Task) executeWithLocking(ctx context.Context, cancelFunc func()) {
t.lock.Unlock()
// notify that we finished
cancelFunc()
if t.cancelCtx != nil {
t.cancelCtx()
}
}()
// run
t.taskFn(ctx, t)
err := t.taskFn(t.ctx, t)
if err != nil {
log.Errorf("%s: task %s failed: %s", t.module.Name, t.name, err)
}
}
func (t *Task) getExecuteAtWithLocking() time.Time {
@@ -320,6 +367,10 @@ func (t *Task) getExecuteAtWithLocking() time.Time {
}
func (t *Task) addToSchedule() {
if !t.isActive() {
return
}
scheduleLock.Lock()
defer scheduleLock.Unlock()
// defer printTaskList(taskSchedule) // for debugging
@@ -395,7 +446,7 @@ func taskQueueHandler() {
queueWg.Wait()
// check for shutdown
if shutdownSignalClosed.IsSet() {
if shutdownFlag.IsSet() {
return
}
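With the new signature, task functions return errors and the framework logs failures; a minimal usage sketch (task name and interval are examples):

import (
    "context"
    "time"

    "github.com/safing/portbase/modules"
)

func scheduleCleanup(m *modules.Module) {
    m.NewTask("cleanup", func(ctx context.Context, t *modules.Task) error {
        // a returned error is logged as a failed task
        return nil // ... do cleanup, return the failure if any ...
    }).Repeat(10 * time.Minute)
}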

View file

@@ -35,22 +35,29 @@ func init() {
var qtWg sync.WaitGroup
var qtOutputChannel chan string
var qtSleepDuration time.Duration
var qtModule = initNewModule("task test module", nil, nil, nil)
var qtModule *Module
func init() {
qtModule = initNewModule("task test module", nil, nil, nil)
qtModule.status = StatusOnline
}
// functions
func queuedTaskTester(s string) {
qtModule.NewTask(s, func(ctx context.Context, t *Task) {
qtModule.NewTask(s, func(ctx context.Context, t *Task) error {
time.Sleep(qtSleepDuration * 2)
qtOutputChannel <- s
qtWg.Done()
return nil
}).Queue()
}
func prioritizedTaskTester(s string) {
qtModule.NewTask(s, func(ctx context.Context, t *Task) {
qtModule.NewTask(s, func(ctx context.Context, t *Task) error {
time.Sleep(qtSleepDuration * 2)
qtOutputChannel <- s
qtWg.Done()
return nil
}).Prioritize()
}
@@ -109,10 +116,11 @@ var stWaitCh chan bool
// functions
func scheduledTaskTester(s string, sched time.Time) {
qtModule.NewTask(s, func(ctx context.Context, t *Task) {
qtModule.NewTask(s, func(ctx context.Context, t *Task) error {
time.Sleep(stSleepDuration)
stOutputChannel <- s
stWg.Done()
return nil
}).Schedule(sched)
}

View file

@@ -73,7 +73,7 @@ func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn
lastFail := time.Now()
for {
if m.ShutdownInProgress() {
if m.IsStopping() {
return
}
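The rename from ShutdownInProgress to IsStopping applies to all worker loops; a hedged service-worker sketch under the new API (the input channel and its handling are hypothetical):

import (
    "context"

    "github.com/safing/portbase/log"
    "github.com/safing/portbase/modules"
)

func startService(m *modules.Module, requests <-chan string) {
    m.StartWorker("example service", func(ctx context.Context) error {
        for !m.IsStopping() {
            select {
            case <-ctx.Done():
                return nil
            case req := <-requests:
                log.Infof("service: handling %s", req)
            }
        }
        return nil
    })
}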