mirror of
https://github.com/safing/portbase
synced 2025-09-02 18:50:14 +00:00
commit
2cd0412e18
32 changed files with 1733 additions and 479 deletions
|
@ -7,5 +7,11 @@ linters:
|
||||||
- funlen
|
- funlen
|
||||||
- whitespace
|
- whitespace
|
||||||
- wsl
|
- wsl
|
||||||
- godox
|
- gomnd
|
||||||
|
|
||||||
|
linters-settings:
|
||||||
|
godox:
|
||||||
|
# report any comments starting with keywords, this is useful for TODO or FIXME comments that
|
||||||
|
# might be left in the code accidentally and should be resolved before merging
|
||||||
|
keywords:
|
||||||
|
- FIXME
|
||||||
|
|
|
@ -243,8 +243,8 @@ func (api *DatabaseAPI) handleGet(opID []byte, key string) {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
data, err = r.Marshal(r, record.JSON)
|
data, err = r.Marshal(r, record.JSON)
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err != nil {
|
||||||
api.send(opID, dbMsgTypeError, err.Error(), nil) //nolint:nilness // FIXME: possibly false positive (golangci-lint govet/nilness)
|
api.send(opID, dbMsgTypeError, err.Error(), nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
api.send(opID, dbMsgTypeOk, r.Key(), data)
|
api.send(opID, dbMsgTypeOk, r.Key(), data)
|
||||||
|
@ -435,13 +435,13 @@ func (api *DatabaseAPI) handlePut(opID []byte, key string, data []byte, create b
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: remove transition code
|
// TODO - staged for deletion: remove transition code
|
||||||
if data[0] != record.JSON {
|
// if data[0] != record.JSON {
|
||||||
typedData := make([]byte, len(data)+1)
|
// typedData := make([]byte, len(data)+1)
|
||||||
typedData[0] = record.JSON
|
// typedData[0] = record.JSON
|
||||||
copy(typedData[1:], data)
|
// copy(typedData[1:], data)
|
||||||
data = typedData
|
// data = typedData
|
||||||
}
|
// }
|
||||||
|
|
||||||
r, err := record.NewWrapper(key, nil, data[0], data[1:])
|
r, err := record.NewWrapper(key, nil, data[0], data[1:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -5,9 +5,9 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/safing/portbase/dataroot"
|
||||||
"github.com/safing/portbase/modules"
|
"github.com/safing/portbase/modules"
|
||||||
"github.com/safing/portbase/utils"
|
"github.com/safing/portbase/utils"
|
||||||
"github.com/safing/portmaster/core/structure"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -27,12 +27,12 @@ func SetDataRoot(root *utils.DirStructure) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
module = modules.Register("config", prep, start, nil, "base", "database")
|
module = modules.Register("config", prep, start, nil, "database")
|
||||||
module.RegisterEvent(configChangeEvent)
|
module.RegisterEvent(configChangeEvent)
|
||||||
}
|
}
|
||||||
|
|
||||||
func prep() error {
|
func prep() error {
|
||||||
SetDataRoot(structure.Root())
|
SetDataRoot(dataroot.Root())
|
||||||
if dataRoot == nil {
|
if dataRoot == nil {
|
||||||
return errors.New("data root is not set")
|
return errors.New("data root is not set")
|
||||||
}
|
}
|
||||||
|
|
|
@ -128,12 +128,12 @@ func TestDatabaseSystem(t *testing.T) {
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
testDir, err := ioutil.TempDir("", "testing-")
|
testDir, err := ioutil.TempDir("", "portbase-database-testing-")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = Initialize(testDir, nil)
|
err = InitializeWithPath(testDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,41 +4,44 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"github.com/safing/portbase/database"
|
"github.com/safing/portbase/database"
|
||||||
|
"github.com/safing/portbase/dataroot"
|
||||||
"github.com/safing/portbase/modules"
|
"github.com/safing/portbase/modules"
|
||||||
"github.com/safing/portbase/utils"
|
"github.com/safing/portbase/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
databasePath string
|
|
||||||
databaseStructureRoot *utils.DirStructure
|
databaseStructureRoot *utils.DirStructure
|
||||||
|
|
||||||
module *modules.Module
|
module *modules.Module
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
module = modules.Register("database", prep, start, stop, "base")
|
module = modules.Register("database", prep, start, stop)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetDatabaseLocation sets the location of the database for initialization. Supply either a path or dir structure.
|
// SetDatabaseLocation sets the location of the database for initialization. Supply either a path or dir structure.
|
||||||
func SetDatabaseLocation(dirPath string, dirStructureRoot *utils.DirStructure) {
|
func SetDatabaseLocation(dirStructureRoot *utils.DirStructure) {
|
||||||
databasePath = dirPath
|
if databaseStructureRoot == nil {
|
||||||
databaseStructureRoot = dirStructureRoot
|
databaseStructureRoot = dirStructureRoot
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func prep() error {
|
func prep() error {
|
||||||
if databasePath == "" && databaseStructureRoot == nil {
|
SetDatabaseLocation(dataroot.Root())
|
||||||
return errors.New("no database location specified")
|
if databaseStructureRoot == nil {
|
||||||
|
return errors.New("database location not specified")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func start() error {
|
func start() error {
|
||||||
err := database.Initialize(databasePath, databaseStructureRoot)
|
err := database.Initialize(databaseStructureRoot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
registerMaintenanceTasks()
|
startMaintenanceTasks()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,33 +5,23 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/safing/portbase/database"
|
"github.com/safing/portbase/database"
|
||||||
"github.com/safing/portbase/log"
|
|
||||||
"github.com/safing/portbase/modules"
|
"github.com/safing/portbase/modules"
|
||||||
)
|
)
|
||||||
|
|
||||||
func registerMaintenanceTasks() {
|
func startMaintenanceTasks() {
|
||||||
module.NewTask("basic maintenance", maintainBasic).Repeat(10 * time.Minute).MaxDelay(10 * time.Minute)
|
module.NewTask("basic maintenance", maintainBasic).Repeat(10 * time.Minute).MaxDelay(10 * time.Minute)
|
||||||
module.NewTask("thorough maintenance", maintainThorough).Repeat(1 * time.Hour).MaxDelay(1 * time.Hour)
|
module.NewTask("thorough maintenance", maintainThorough).Repeat(1 * time.Hour).MaxDelay(1 * time.Hour)
|
||||||
module.NewTask("record maintenance", maintainRecords).Repeat(1 * time.Hour).MaxDelay(1 * time.Hour)
|
module.NewTask("record maintenance", maintainRecords).Repeat(1 * time.Hour).MaxDelay(1 * time.Hour)
|
||||||
}
|
}
|
||||||
|
|
||||||
func maintainBasic(ctx context.Context, task *modules.Task) {
|
func maintainBasic(ctx context.Context, task *modules.Task) error {
|
||||||
err := database.Maintain()
|
return database.Maintain()
|
||||||
if err != nil {
|
|
||||||
log.Errorf("database: maintenance error: %s", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func maintainThorough(ctx context.Context, task *modules.Task) {
|
func maintainThorough(ctx context.Context, task *modules.Task) error {
|
||||||
err := database.MaintainThorough()
|
return database.MaintainThorough()
|
||||||
if err != nil {
|
|
||||||
log.Errorf("database: thorough maintenance error: %s", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func maintainRecords(ctx context.Context, task *modules.Task) {
|
func maintainRecords(ctx context.Context, task *modules.Task) error {
|
||||||
err := database.MaintainRecordStates()
|
return database.MaintainRecordStates()
|
||||||
if err != nil {
|
|
||||||
log.Errorf("database: record states maintenance error: %s", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,15 +23,15 @@ var (
|
||||||
databasesStructure *utils.DirStructure
|
databasesStructure *utils.DirStructure
|
||||||
)
|
)
|
||||||
|
|
||||||
// Initialize initializes the database at the specified location. Supply either a path or dir structure.
|
// InitializeWithPath initializes the database at the specified location using a path.
|
||||||
func Initialize(dirPath string, dirStructureRoot *utils.DirStructure) error {
|
func InitializeWithPath(dirPath string) error {
|
||||||
if initialized.SetToIf(false, true) {
|
return Initialize(utils.NewDirStructure(dirPath, 0755))
|
||||||
|
}
|
||||||
|
|
||||||
if dirStructureRoot != nil {
|
// Initialize initializes the database at the specified location using a dir structure.
|
||||||
rootStructure = dirStructureRoot
|
func Initialize(dirStructureRoot *utils.DirStructure) error {
|
||||||
} else {
|
if initialized.SetToIf(false, true) {
|
||||||
rootStructure = utils.NewDirStructure(dirPath, 0755)
|
rootStructure = dirStructureRoot
|
||||||
}
|
|
||||||
|
|
||||||
// ensure root and databases dirs
|
// ensure root and databases dirs
|
||||||
databasesStructure = rootStructure.ChildDir(databasesSubDir, 0700)
|
databasesStructure = rootStructure.ChildDir(databasesSubDir, 0700)
|
||||||
|
|
|
@ -139,7 +139,7 @@ func saveRegistry(lock bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// write file
|
// write file
|
||||||
// FIXME: write atomically (best effort)
|
// TODO: write atomically (best effort)
|
||||||
filePath := path.Join(rootStructure.Path, registryFileName)
|
filePath := path.Join(rootStructure.Path, registryFileName)
|
||||||
return ioutil.WriteFile(filePath, data, 0600)
|
return ioutil.WriteFile(filePath, data, 0600)
|
||||||
}
|
}
|
||||||
|
|
27
dataroot/root.go
Normal file
27
dataroot/root.go
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
package dataroot
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/safing/portbase/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
root *utils.DirStructure
|
||||||
|
)
|
||||||
|
|
||||||
|
// Initialize initializes the data root directory
|
||||||
|
func Initialize(rootDir string, perm os.FileMode) error {
|
||||||
|
if root != nil {
|
||||||
|
return errors.New("already initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
root = utils.NewDirStructure(rootDir, perm)
|
||||||
|
return root.Ensure()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Root returns the data root directory.
|
||||||
|
func Root() *utils.DirStructure {
|
||||||
|
return root
|
||||||
|
}
|
|
@ -12,7 +12,7 @@ func log(level Severity, msg string, tracer *ContextTracer) {
|
||||||
|
|
||||||
if !started.IsSet() {
|
if !started.IsSet() {
|
||||||
// a bit resource intense, but keeps logs before logging started.
|
// a bit resource intense, but keeps logs before logging started.
|
||||||
// FIXME: create option to disable logging
|
// TODO: create option to disable logging
|
||||||
go func() {
|
go func() {
|
||||||
<-startedSignal
|
<-startedSignal
|
||||||
log(level, msg, tracer)
|
log(level, msg, tracer)
|
||||||
|
|
|
@ -82,6 +82,7 @@ var (
|
||||||
logsWaiting = make(chan struct{}, 4)
|
logsWaiting = make(chan struct{}, 4)
|
||||||
logsWaitingFlag = abool.NewBool(false)
|
logsWaitingFlag = abool.NewBool(false)
|
||||||
|
|
||||||
|
shutdownFlag = abool.NewBool(false)
|
||||||
shutdownSignal = make(chan struct{})
|
shutdownSignal = make(chan struct{})
|
||||||
shutdownWaitGroup sync.WaitGroup
|
shutdownWaitGroup sync.WaitGroup
|
||||||
|
|
||||||
|
@ -179,6 +180,8 @@ func Start() (err error) {
|
||||||
|
|
||||||
// Shutdown writes remaining log lines and then stops the log system.
|
// Shutdown writes remaining log lines and then stops the log system.
|
||||||
func Shutdown() {
|
func Shutdown() {
|
||||||
close(shutdownSignal)
|
if shutdownFlag.SetToIf(false, true) {
|
||||||
|
close(shutdownSignal)
|
||||||
|
}
|
||||||
shutdownWaitGroup.Wait()
|
shutdownWaitGroup.Wait()
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,7 +89,7 @@ func (tracer *ContextTracer) Submit() {
|
||||||
|
|
||||||
if !started.IsSet() {
|
if !started.IsSet() {
|
||||||
// a bit resource intense, but keeps logs before logging started.
|
// a bit resource intense, but keeps logs before logging started.
|
||||||
// FIXME: create option to disable logging
|
// TODO: create option to disable logging
|
||||||
go func() {
|
go func() {
|
||||||
<-startedSignal
|
<-startedSignal
|
||||||
tracer.Submit()
|
tracer.Submit()
|
||||||
|
|
|
@ -17,7 +17,9 @@ type eventHook struct {
|
||||||
|
|
||||||
// TriggerEvent executes all hook functions registered to the specified event.
|
// TriggerEvent executes all hook functions registered to the specified event.
|
||||||
func (m *Module) TriggerEvent(event string, data interface{}) {
|
func (m *Module) TriggerEvent(event string, data interface{}) {
|
||||||
go m.processEventTrigger(event, data)
|
if m.OnlineSoon() {
|
||||||
|
go m.processEventTrigger(event, data)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Module) processEventTrigger(event string, data interface{}) {
|
func (m *Module) processEventTrigger(event string, data interface{}) {
|
||||||
|
@ -31,18 +33,35 @@ func (m *Module) processEventTrigger(event string, data interface{}) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, hook := range hooks {
|
for _, hook := range hooks {
|
||||||
if !hook.hookingModule.ShutdownInProgress() {
|
if hook.hookingModule.OnlineSoon() {
|
||||||
go m.runEventHook(hook, event, data)
|
go m.runEventHook(hook, event, data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Module) runEventHook(hook *eventHook, event string, data interface{}) {
|
func (m *Module) runEventHook(hook *eventHook, event string, data interface{}) {
|
||||||
if !hook.hookingModule.Started.IsSet() {
|
// check if source module is ready for handling
|
||||||
|
if m.Status() != StatusOnline {
|
||||||
// target module has not yet fully started, wait until start is complete
|
// target module has not yet fully started, wait until start is complete
|
||||||
select {
|
select {
|
||||||
case <-startCompleteSignal:
|
case <-m.StartCompleted():
|
||||||
case <-shutdownSignal:
|
// continue with hook execution
|
||||||
|
case <-hook.hookingModule.Stopping():
|
||||||
|
return
|
||||||
|
case <-m.Stopping():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if destionation module is ready for handling
|
||||||
|
if hook.hookingModule.Status() != StatusOnline {
|
||||||
|
// target module has not yet fully started, wait until start is complete
|
||||||
|
select {
|
||||||
|
case <-hook.hookingModule.StartCompleted():
|
||||||
|
// continue with hook execution
|
||||||
|
case <-hook.hookingModule.Stopping():
|
||||||
|
return
|
||||||
|
case <-m.Stopping():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -69,7 +88,7 @@ func (m *Module) RegisterEvent(event string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterEventHook registers a hook function with (another) modules' event. Whenever a hook is triggered and the receiving module has not yet fully started, hook execution will be delayed until all modules completed starting.
|
// RegisterEventHook registers a hook function with (another) modules' event. Whenever a hook is triggered and the receiving module has not yet fully started, hook execution will be delayed until the modules completed starting.
|
||||||
func (m *Module) RegisterEventHook(module string, event string, description string, fn func(context.Context, interface{}) error) error {
|
func (m *Module) RegisterEventHook(module string, event string, description string, fn func(context.Context, interface{}) error) error {
|
||||||
// get target module
|
// get target module
|
||||||
var eventModule *Module
|
var eventModule *Module
|
||||||
|
@ -77,9 +96,7 @@ func (m *Module) RegisterEventHook(module string, event string, description stri
|
||||||
eventModule = m
|
eventModule = m
|
||||||
} else {
|
} else {
|
||||||
var ok bool
|
var ok bool
|
||||||
modulesLock.RLock()
|
|
||||||
eventModule, ok = modules[module]
|
eventModule, ok = modules[module]
|
||||||
modulesLock.RUnlock()
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf(`module "%s" does not exist`, module)
|
return fmt.Errorf(`module "%s" does not exist`, module)
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,6 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseFlags() error {
|
func parseFlags() error {
|
||||||
|
|
||||||
// parse flags
|
// parse flags
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
|
|
107
modules/mgmt.go
Normal file
107
modules/mgmt.go
Normal file
|
@ -0,0 +1,107 @@
|
||||||
|
package modules
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/safing/portbase/log"
|
||||||
|
"github.com/tevino/abool"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
moduleMgmtEnabled = abool.NewBool(false)
|
||||||
|
modulesChangeNotifyFn func(*Module)
|
||||||
|
)
|
||||||
|
|
||||||
|
// Enable enables the module. Only has an effect if module management is enabled.
|
||||||
|
func (m *Module) Enable() (changed bool) {
|
||||||
|
return m.enabled.SetToIf(false, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Disable disables the module. Only has an effect if module management is enabled.
|
||||||
|
func (m *Module) Disable() (changed bool) {
|
||||||
|
return m.enabled.SetToIf(true, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetEnabled sets the module to the desired enabled state. Only has an effect if module management is enabled.
|
||||||
|
func (m *Module) SetEnabled(enable bool) (changed bool) {
|
||||||
|
if enable {
|
||||||
|
return m.Enable()
|
||||||
|
}
|
||||||
|
return m.Disable()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enabled returns wether or not the module is currently enabled.
|
||||||
|
func (m *Module) Enabled() bool {
|
||||||
|
return m.enabled.IsSet()
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnableModuleManagement enables the module management functionality within modules. The supplied notify function will be called whenever the status of a module changes. The affected module will be in the parameter. You will need to manually enable modules, else nothing will start.
|
||||||
|
func EnableModuleManagement(changeNotifyFn func(*Module)) {
|
||||||
|
if moduleMgmtEnabled.SetToIf(false, true) {
|
||||||
|
modulesChangeNotifyFn = changeNotifyFn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Module) notifyOfChange() {
|
||||||
|
if moduleMgmtEnabled.IsSet() && modulesChangeNotifyFn != nil {
|
||||||
|
m.StartWorker("notify of change", func(ctx context.Context) error {
|
||||||
|
modulesChangeNotifyFn(m)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ManageModules triggers the module manager to react to recent changes of enabled modules.
|
||||||
|
func ManageModules() error {
|
||||||
|
// check if enabled
|
||||||
|
if !moduleMgmtEnabled.IsSet() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// lock mgmt
|
||||||
|
mgmtLock.Lock()
|
||||||
|
defer mgmtLock.Unlock()
|
||||||
|
|
||||||
|
log.Info("modules: managing changes")
|
||||||
|
|
||||||
|
// build new dependency tree
|
||||||
|
buildEnabledTree()
|
||||||
|
|
||||||
|
// stop unneeded modules
|
||||||
|
lastErr := stopModules()
|
||||||
|
if lastErr != nil {
|
||||||
|
log.Warning(lastErr.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// start needed modules
|
||||||
|
err := startModules()
|
||||||
|
if err != nil {
|
||||||
|
log.Warning(err.Error())
|
||||||
|
lastErr = err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("modules: finished managing")
|
||||||
|
return lastErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildEnabledTree() {
|
||||||
|
// reset marked dependencies
|
||||||
|
for _, m := range modules {
|
||||||
|
m.enabledAsDependency.UnSet()
|
||||||
|
}
|
||||||
|
|
||||||
|
// mark dependencies
|
||||||
|
for _, m := range modules {
|
||||||
|
if m.enabled.IsSet() {
|
||||||
|
m.markDependencies()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Module) markDependencies() {
|
||||||
|
for _, dep := range m.depModules {
|
||||||
|
if dep.enabledAsDependency.SetToIf(false, true) {
|
||||||
|
dep.markDependencies()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
165
modules/mgmt_test.go
Normal file
165
modules/mgmt_test.go
Normal file
|
@ -0,0 +1,165 @@
|
||||||
|
package modules
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func testModuleMgmt(t *testing.T) {
|
||||||
|
|
||||||
|
// enable module management
|
||||||
|
EnableModuleManagement(nil)
|
||||||
|
|
||||||
|
registerTestModule(t, "base")
|
||||||
|
registerTestModule(t, "feature1", "base")
|
||||||
|
registerTestModule(t, "base2", "base")
|
||||||
|
registerTestModule(t, "feature2", "base2")
|
||||||
|
registerTestModule(t, "sub-feature", "base")
|
||||||
|
registerTestModule(t, "feature3", "sub-feature")
|
||||||
|
registerTestModule(t, "feature4", "sub-feature")
|
||||||
|
|
||||||
|
// enable core module
|
||||||
|
core := modules["base"]
|
||||||
|
core.Enable()
|
||||||
|
|
||||||
|
// start and check order
|
||||||
|
err := Start()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if changeHistory != " on:base" {
|
||||||
|
t.Errorf("order mismatch, was %s", changeHistory)
|
||||||
|
}
|
||||||
|
changeHistory = ""
|
||||||
|
|
||||||
|
// enable feature1
|
||||||
|
feature1 := modules["feature1"]
|
||||||
|
feature1.Enable()
|
||||||
|
// manage modules and check
|
||||||
|
err = ManageModules()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if changeHistory != " on:feature1" {
|
||||||
|
t.Errorf("order mismatch, was %s", changeHistory)
|
||||||
|
}
|
||||||
|
changeHistory = ""
|
||||||
|
|
||||||
|
// enable feature2
|
||||||
|
feature2 := modules["feature2"]
|
||||||
|
feature2.Enable()
|
||||||
|
// manage modules and check
|
||||||
|
err = ManageModules()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if changeHistory != " on:base2 on:feature2" {
|
||||||
|
t.Errorf("order mismatch, was %s", changeHistory)
|
||||||
|
}
|
||||||
|
changeHistory = ""
|
||||||
|
|
||||||
|
// enable feature3
|
||||||
|
feature3 := modules["feature3"]
|
||||||
|
feature3.Enable()
|
||||||
|
// manage modules and check
|
||||||
|
err = ManageModules()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if changeHistory != " on:sub-feature on:feature3" {
|
||||||
|
t.Errorf("order mismatch, was %s", changeHistory)
|
||||||
|
}
|
||||||
|
changeHistory = ""
|
||||||
|
|
||||||
|
// enable feature4
|
||||||
|
feature4 := modules["feature4"]
|
||||||
|
feature4.Enable()
|
||||||
|
// manage modules and check
|
||||||
|
err = ManageModules()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if changeHistory != " on:feature4" {
|
||||||
|
t.Errorf("order mismatch, was %s", changeHistory)
|
||||||
|
}
|
||||||
|
changeHistory = ""
|
||||||
|
|
||||||
|
// disable feature1
|
||||||
|
feature1.Disable()
|
||||||
|
// manage modules and check
|
||||||
|
err = ManageModules()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if changeHistory != " off:feature1" {
|
||||||
|
t.Errorf("order mismatch, was %s", changeHistory)
|
||||||
|
}
|
||||||
|
changeHistory = ""
|
||||||
|
|
||||||
|
// disable feature3
|
||||||
|
feature3.Disable()
|
||||||
|
// manage modules and check
|
||||||
|
err = ManageModules()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// disable feature4
|
||||||
|
feature4.Disable()
|
||||||
|
// manage modules and check
|
||||||
|
err = ManageModules()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if changeHistory != " off:feature3 off:feature4 off:sub-feature" {
|
||||||
|
t.Errorf("order mismatch, was %s", changeHistory)
|
||||||
|
}
|
||||||
|
changeHistory = ""
|
||||||
|
|
||||||
|
// enable feature4
|
||||||
|
feature4.Enable()
|
||||||
|
// manage modules and check
|
||||||
|
err = ManageModules()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if changeHistory != " on:sub-feature on:feature4" {
|
||||||
|
t.Errorf("order mismatch, was %s", changeHistory)
|
||||||
|
}
|
||||||
|
changeHistory = ""
|
||||||
|
|
||||||
|
// disable feature4
|
||||||
|
feature4.Disable()
|
||||||
|
// manage modules and check
|
||||||
|
err = ManageModules()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if changeHistory != " off:feature4 off:sub-feature" {
|
||||||
|
t.Errorf("order mismatch, was %s", changeHistory)
|
||||||
|
}
|
||||||
|
changeHistory = ""
|
||||||
|
|
||||||
|
err = Shutdown()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
if changeHistory != " off:feature2 off:base2 off:base" {
|
||||||
|
t.Errorf("order mismatch, was %s", changeHistory)
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset history
|
||||||
|
changeHistory = ""
|
||||||
|
|
||||||
|
// disable module management
|
||||||
|
moduleMgmtEnabled.UnSet()
|
||||||
|
|
||||||
|
resetTestEnvironment()
|
||||||
|
}
|
|
@ -172,7 +172,7 @@ func microTaskScheduler() {
|
||||||
|
|
||||||
microTaskManageLoop:
|
microTaskManageLoop:
|
||||||
for {
|
for {
|
||||||
if shutdownSignalClosed.IsSet() {
|
if shutdownFlag.IsSet() {
|
||||||
close(mediumPriorityClearance)
|
close(mediumPriorityClearance)
|
||||||
close(lowPriorityClearance)
|
close(lowPriorityClearance)
|
||||||
return
|
return
|
||||||
|
|
|
@ -13,32 +13,44 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
modulesLock sync.RWMutex
|
modules = make(map[string]*Module)
|
||||||
modules = make(map[string]*Module)
|
mgmtLock sync.Mutex
|
||||||
|
|
||||||
|
// lock modules when starting
|
||||||
|
modulesLocked = abool.New()
|
||||||
|
|
||||||
// ErrCleanExit is returned by Start() when the program is interrupted before starting. This can happen for example, when using the "--help" flag.
|
// ErrCleanExit is returned by Start() when the program is interrupted before starting. This can happen for example, when using the "--help" flag.
|
||||||
ErrCleanExit = errors.New("clean exit requested")
|
ErrCleanExit = errors.New("clean exit requested")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Module represents a module.
|
// Module represents a module.
|
||||||
type Module struct {
|
type Module struct { //nolint:maligned // not worth the effort
|
||||||
|
sync.RWMutex
|
||||||
|
|
||||||
Name string
|
Name string
|
||||||
|
|
||||||
// lifecycle mgmt
|
// status mgmt
|
||||||
Prepped *abool.AtomicBool
|
enabled *abool.AtomicBool
|
||||||
Started *abool.AtomicBool
|
enabledAsDependency *abool.AtomicBool
|
||||||
Stopped *abool.AtomicBool
|
status uint8
|
||||||
inTransition *abool.AtomicBool
|
|
||||||
|
// failure status
|
||||||
|
failureStatus uint8
|
||||||
|
failureID string
|
||||||
|
failureMsg string
|
||||||
|
|
||||||
// lifecycle callback functions
|
// lifecycle callback functions
|
||||||
prep func() error
|
prepFn func() error
|
||||||
start func() error
|
startFn func() error
|
||||||
stop func() error
|
stopFn func() error
|
||||||
|
|
||||||
// shutdown mgmt
|
// lifecycle mgmt
|
||||||
Ctx context.Context
|
// start
|
||||||
cancelCtx func()
|
startComplete chan struct{}
|
||||||
shutdownFlag *abool.AtomicBool
|
// stop
|
||||||
|
Ctx context.Context
|
||||||
|
cancelCtx func()
|
||||||
|
stopFlag *abool.AtomicBool
|
||||||
|
|
||||||
// workers/tasks
|
// workers/tasks
|
||||||
workerCnt *int32
|
workerCnt *int32
|
||||||
|
@ -56,30 +68,177 @@ type Module struct {
|
||||||
depReverse []*Module
|
depReverse []*Module
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShutdownInProgress returns whether the module has started shutting down. In most cases, you should use ShuttingDown instead.
|
// StartCompleted returns a channel read that triggers when the module has finished starting.
|
||||||
func (m *Module) ShutdownInProgress() bool {
|
func (m *Module) StartCompleted() <-chan struct{} {
|
||||||
return m.shutdownFlag.IsSet()
|
m.RLock()
|
||||||
|
defer m.RUnlock()
|
||||||
|
return m.startComplete
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShuttingDown lets you listen for the shutdown signal.
|
// Stopping returns a channel read that triggers when the module has initiated the stop procedure.
|
||||||
func (m *Module) ShuttingDown() <-chan struct{} {
|
func (m *Module) Stopping() <-chan struct{} {
|
||||||
|
m.RLock()
|
||||||
|
defer m.RUnlock()
|
||||||
return m.Ctx.Done()
|
return m.Ctx.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Module) shutdown() error {
|
// IsStopping returns whether the module has started shutting down. In most cases, you should use Stopping instead.
|
||||||
// signal shutdown
|
func (m *Module) IsStopping() bool {
|
||||||
m.shutdownFlag.Set()
|
return m.stopFlag.IsSet()
|
||||||
m.cancelCtx()
|
}
|
||||||
|
|
||||||
// start shutdown function
|
// Dependencies returns the module's dependencies.
|
||||||
m.waitGroup.Add(1)
|
func (m *Module) Dependencies() []*Module {
|
||||||
stopFnError := make(chan error, 1)
|
m.RLock()
|
||||||
|
defer m.RUnlock()
|
||||||
|
return m.depModules
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Module) prep(reports chan *report) {
|
||||||
|
// check and set intermediate status
|
||||||
|
m.Lock()
|
||||||
|
if m.status != StatusDead {
|
||||||
|
m.Unlock()
|
||||||
|
go func() {
|
||||||
|
reports <- &report{
|
||||||
|
module: m,
|
||||||
|
err: fmt.Errorf("module already prepped"),
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.status = StatusPreparing
|
||||||
|
m.Unlock()
|
||||||
|
|
||||||
|
// run prep function
|
||||||
go func() {
|
go func() {
|
||||||
stopFnError <- m.runCtrlFn("stop module", m.stop)
|
var err error
|
||||||
m.waitGroup.Done()
|
if m.prepFn != nil {
|
||||||
|
// execute function
|
||||||
|
err = m.runCtrlFnWithTimeout(
|
||||||
|
"prep module",
|
||||||
|
10*time.Second,
|
||||||
|
m.prepFn,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
// set status
|
||||||
|
if err != nil {
|
||||||
|
m.Error(
|
||||||
|
"module-failed-prep",
|
||||||
|
fmt.Sprintf("failed to prep module: %s", err.Error()),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
m.Lock()
|
||||||
|
m.status = StatusOffline
|
||||||
|
m.Unlock()
|
||||||
|
m.notifyOfChange()
|
||||||
|
}
|
||||||
|
// send report
|
||||||
|
reports <- &report{
|
||||||
|
module: m,
|
||||||
|
err: err,
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
// wait for workers
|
func (m *Module) start(reports chan *report) {
|
||||||
|
// check and set intermediate status
|
||||||
|
m.Lock()
|
||||||
|
if m.status != StatusOffline {
|
||||||
|
m.Unlock()
|
||||||
|
go func() {
|
||||||
|
reports <- &report{
|
||||||
|
module: m,
|
||||||
|
err: fmt.Errorf("module not offline"),
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.status = StatusStarting
|
||||||
|
|
||||||
|
// reset stop management
|
||||||
|
if m.cancelCtx != nil {
|
||||||
|
// trigger cancel just to be sure
|
||||||
|
m.cancelCtx()
|
||||||
|
}
|
||||||
|
m.Ctx, m.cancelCtx = context.WithCancel(context.Background())
|
||||||
|
m.stopFlag.UnSet()
|
||||||
|
|
||||||
|
m.Unlock()
|
||||||
|
|
||||||
|
// run start function
|
||||||
|
go func() {
|
||||||
|
var err error
|
||||||
|
if m.startFn != nil {
|
||||||
|
// execute function
|
||||||
|
err = m.runCtrlFnWithTimeout(
|
||||||
|
"start module",
|
||||||
|
10*time.Second,
|
||||||
|
m.startFn,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
// set status
|
||||||
|
if err != nil {
|
||||||
|
m.Error(
|
||||||
|
"module-failed-start",
|
||||||
|
fmt.Sprintf("failed to start module: %s", err.Error()),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
m.Lock()
|
||||||
|
m.status = StatusOnline
|
||||||
|
// init start management
|
||||||
|
close(m.startComplete)
|
||||||
|
m.Unlock()
|
||||||
|
m.notifyOfChange()
|
||||||
|
}
|
||||||
|
// send report
|
||||||
|
reports <- &report{
|
||||||
|
module: m,
|
||||||
|
err: err,
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Module) stop(reports chan *report) {
|
||||||
|
// check and set intermediate status
|
||||||
|
m.Lock()
|
||||||
|
if m.status != StatusOnline {
|
||||||
|
m.Unlock()
|
||||||
|
go func() {
|
||||||
|
reports <- &report{
|
||||||
|
module: m,
|
||||||
|
err: fmt.Errorf("module not online"),
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.status = StatusStopping
|
||||||
|
|
||||||
|
// reset start management
|
||||||
|
m.startComplete = make(chan struct{})
|
||||||
|
// init stop management
|
||||||
|
m.cancelCtx()
|
||||||
|
m.stopFlag.Set()
|
||||||
|
|
||||||
|
m.Unlock()
|
||||||
|
|
||||||
|
go m.stopAllTasks(reports)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Module) stopAllTasks(reports chan *report) {
|
||||||
|
// start shutdown function
|
||||||
|
stopFnFinished := abool.NewBool(false)
|
||||||
|
var stopFnError error
|
||||||
|
if m.stopFn != nil {
|
||||||
|
m.waitGroup.Add(1)
|
||||||
|
go func() {
|
||||||
|
stopFnError = m.runCtrlFn("stop module", m.stopFn)
|
||||||
|
stopFnFinished.Set()
|
||||||
|
m.waitGroup.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for workers and stop fn
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
m.waitGroup.Wait()
|
m.waitGroup.Wait()
|
||||||
|
@ -91,8 +250,9 @@ func (m *Module) shutdown() error {
|
||||||
case <-done:
|
case <-done:
|
||||||
case <-time.After(30 * time.Second):
|
case <-time.After(30 * time.Second):
|
||||||
log.Warningf(
|
log.Warningf(
|
||||||
"%s: timed out while waiting for workers/tasks to finish: workers=%d tasks=%d microtasks=%d, continuing shutdown...",
|
"%s: timed out while waiting for stopfn/workers/tasks to finish: stopFn=%v workers=%d tasks=%d microtasks=%d, continuing shutdown...",
|
||||||
m.Name,
|
m.Name,
|
||||||
|
stopFnFinished.IsSet(),
|
||||||
atomic.LoadInt32(m.workerCnt),
|
atomic.LoadInt32(m.workerCnt),
|
||||||
atomic.LoadInt32(m.taskCnt),
|
atomic.LoadInt32(m.taskCnt),
|
||||||
atomic.LoadInt32(m.microTaskCnt),
|
atomic.LoadInt32(m.microTaskCnt),
|
||||||
|
@ -100,24 +260,37 @@ func (m *Module) shutdown() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// collect error
|
// collect error
|
||||||
select {
|
var err error
|
||||||
case err := <-stopFnError:
|
if stopFnFinished.IsSet() && stopFnError != nil {
|
||||||
return err
|
err = stopFnError
|
||||||
default:
|
}
|
||||||
log.Warningf(
|
// set status
|
||||||
"%s: timed out while waiting for stop function to finish, continuing shutdown...",
|
if err != nil {
|
||||||
m.Name,
|
m.Error(
|
||||||
|
"module-failed-stop",
|
||||||
|
fmt.Sprintf("failed to stop module: %s", err.Error()),
|
||||||
)
|
)
|
||||||
return nil
|
} else {
|
||||||
|
m.Lock()
|
||||||
|
m.status = StatusOffline
|
||||||
|
m.Unlock()
|
||||||
|
m.notifyOfChange()
|
||||||
|
}
|
||||||
|
// send report
|
||||||
|
reports <- &report{
|
||||||
|
module: m,
|
||||||
|
err: err,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register registers a new module. The control functions `prep`, `start` and `stop` are technically optional. `stop` is called _after_ all added module workers finished.
|
// Register registers a new module. The control functions `prep`, `start` and `stop` are technically optional. `stop` is called _after_ all added module workers finished.
|
||||||
func Register(name string, prep, start, stop func() error, dependencies ...string) *Module {
|
func Register(name string, prep, start, stop func() error, dependencies ...string) *Module {
|
||||||
|
if modulesLocked.IsSet() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
newModule := initNewModule(name, prep, start, stop, dependencies...)
|
newModule := initNewModule(name, prep, start, stop, dependencies...)
|
||||||
|
|
||||||
modulesLock.Lock()
|
|
||||||
defer modulesLock.Unlock()
|
|
||||||
// check for already existing module
|
// check for already existing module
|
||||||
_, ok := modules[name]
|
_, ok := modules[name]
|
||||||
if ok {
|
if ok {
|
||||||
|
@ -136,23 +309,22 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
|
||||||
var microTaskCnt int32
|
var microTaskCnt int32
|
||||||
|
|
||||||
newModule := &Module{
|
newModule := &Module{
|
||||||
Name: name,
|
Name: name,
|
||||||
Prepped: abool.NewBool(false),
|
enabled: abool.NewBool(false),
|
||||||
Started: abool.NewBool(false),
|
enabledAsDependency: abool.NewBool(false),
|
||||||
Stopped: abool.NewBool(false),
|
prepFn: prep,
|
||||||
inTransition: abool.NewBool(false),
|
startFn: start,
|
||||||
Ctx: ctx,
|
stopFn: stop,
|
||||||
cancelCtx: cancelCtx,
|
startComplete: make(chan struct{}),
|
||||||
shutdownFlag: abool.NewBool(false),
|
Ctx: ctx,
|
||||||
waitGroup: sync.WaitGroup{},
|
cancelCtx: cancelCtx,
|
||||||
workerCnt: &workerCnt,
|
stopFlag: abool.NewBool(false),
|
||||||
taskCnt: &taskCnt,
|
workerCnt: &workerCnt,
|
||||||
microTaskCnt: µTaskCnt,
|
taskCnt: &taskCnt,
|
||||||
prep: prep,
|
microTaskCnt: µTaskCnt,
|
||||||
start: start,
|
waitGroup: sync.WaitGroup{},
|
||||||
stop: stop,
|
eventHooks: make(map[string][]*eventHook),
|
||||||
eventHooks: make(map[string][]*eventHook),
|
depNames: dependencies,
|
||||||
depNames: dependencies,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return newModule
|
return newModule
|
||||||
|
@ -177,49 +349,3 @@ func initDependencies() error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadyToPrep returns whether all dependencies are ready for this module to prep.
|
|
||||||
func (m *Module) ReadyToPrep() bool {
|
|
||||||
if m.inTransition.IsSet() || m.Prepped.IsSet() {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, dep := range m.depModules {
|
|
||||||
if !dep.Prepped.IsSet() {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadyToStart returns whether all dependencies are ready for this module to start.
|
|
||||||
func (m *Module) ReadyToStart() bool {
|
|
||||||
if m.inTransition.IsSet() || m.Started.IsSet() {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, dep := range m.depModules {
|
|
||||||
if !dep.Started.IsSet() {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadyToStop returns whether all dependencies are ready for this module to stop.
|
|
||||||
func (m *Module) ReadyToStop() bool {
|
|
||||||
if !m.Started.IsSet() || m.inTransition.IsSet() || m.Stopped.IsSet() {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, revDep := range m.depReverse {
|
|
||||||
// not ready if a reverse dependency was started, but not yet stopped
|
|
||||||
if revDep.Started.IsSet() && !revDep.Stopped.IsSet() {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
|
@ -5,40 +5,36 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
orderLock sync.Mutex
|
changeHistoryLock sync.Mutex
|
||||||
startOrder string
|
changeHistory string
|
||||||
shutdownOrder string
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func testPrep(t *testing.T, name string) func() error {
|
func registerTestModule(t *testing.T, name string, dependencies ...string) {
|
||||||
return func() error {
|
Register(
|
||||||
t.Logf("prep %s\n", name)
|
name,
|
||||||
return nil
|
func() error {
|
||||||
}
|
t.Logf("prep %s\n", name)
|
||||||
}
|
return nil
|
||||||
|
},
|
||||||
func testStart(t *testing.T, name string) func() error {
|
func() error {
|
||||||
return func() error {
|
changeHistoryLock.Lock()
|
||||||
orderLock.Lock()
|
defer changeHistoryLock.Unlock()
|
||||||
defer orderLock.Unlock()
|
t.Logf("start %s\n", name)
|
||||||
t.Logf("start %s\n", name)
|
changeHistory = fmt.Sprintf("%s on:%s", changeHistory, name)
|
||||||
startOrder = fmt.Sprintf("%s>%s", startOrder, name)
|
return nil
|
||||||
return nil
|
},
|
||||||
}
|
func() error {
|
||||||
}
|
changeHistoryLock.Lock()
|
||||||
|
defer changeHistoryLock.Unlock()
|
||||||
func testStop(t *testing.T, name string) func() error {
|
t.Logf("stop %s\n", name)
|
||||||
return func() error {
|
changeHistory = fmt.Sprintf("%s off:%s", changeHistory, name)
|
||||||
orderLock.Lock()
|
return nil
|
||||||
defer orderLock.Unlock()
|
},
|
||||||
t.Logf("stop %s\n", name)
|
dependencies...,
|
||||||
shutdownOrder = fmt.Sprintf("%s>%s", shutdownOrder, name)
|
)
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func testFail() error {
|
func testFail() error {
|
||||||
|
@ -53,135 +49,94 @@ func TestModules(t *testing.T) {
|
||||||
t.Parallel() // Not really, just a workaround for running these tests last.
|
t.Parallel() // Not really, just a workaround for running these tests last.
|
||||||
|
|
||||||
t.Run("TestModuleOrder", testModuleOrder)
|
t.Run("TestModuleOrder", testModuleOrder)
|
||||||
|
t.Run("TestModuleMgmt", testModuleMgmt)
|
||||||
t.Run("TestModuleErrors", testModuleErrors)
|
t.Run("TestModuleErrors", testModuleErrors)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testModuleOrder(t *testing.T) {
|
func testModuleOrder(t *testing.T) {
|
||||||
|
|
||||||
Register("database", testPrep(t, "database"), testStart(t, "database"), testStop(t, "database"))
|
registerTestModule(t, "database")
|
||||||
Register("stats", testPrep(t, "stats"), testStart(t, "stats"), testStop(t, "stats"), "database")
|
registerTestModule(t, "stats", "database")
|
||||||
Register("service", testPrep(t, "service"), testStart(t, "service"), testStop(t, "service"), "database")
|
registerTestModule(t, "service", "database")
|
||||||
Register("analytics", testPrep(t, "analytics"), testStart(t, "analytics"), testStop(t, "analytics"), "stats", "database")
|
registerTestModule(t, "analytics", "stats", "database")
|
||||||
|
|
||||||
err := Start()
|
err := Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if startOrder != ">database>service>stats>analytics" &&
|
if changeHistory != " on:database on:service on:stats on:analytics" &&
|
||||||
startOrder != ">database>stats>service>analytics" &&
|
changeHistory != " on:database on:stats on:service on:analytics" &&
|
||||||
startOrder != ">database>stats>analytics>service" {
|
changeHistory != " on:database on:stats on:analytics on:service" {
|
||||||
t.Errorf("start order mismatch, was %s", startOrder)
|
t.Errorf("start order mismatch, was %s", changeHistory)
|
||||||
}
|
}
|
||||||
|
changeHistory = ""
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
select {
|
|
||||||
case <-ShuttingDown():
|
|
||||||
case <-time.After(1 * time.Second):
|
|
||||||
t.Error("did not receive shutdown signal")
|
|
||||||
}
|
|
||||||
wg.Done()
|
|
||||||
}()
|
|
||||||
err = Shutdown()
|
err = Shutdown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if shutdownOrder != ">analytics>service>stats>database" &&
|
if changeHistory != " off:analytics off:service off:stats off:database" &&
|
||||||
shutdownOrder != ">analytics>stats>service>database" &&
|
changeHistory != " off:analytics off:stats off:service off:database" &&
|
||||||
shutdownOrder != ">service>analytics>stats>database" {
|
changeHistory != " off:service off:analytics off:stats off:database" {
|
||||||
t.Errorf("shutdown order mismatch, was %s", shutdownOrder)
|
t.Errorf("shutdown order mismatch, was %s", changeHistory)
|
||||||
}
|
}
|
||||||
|
changeHistory = ""
|
||||||
|
|
||||||
wg.Wait()
|
resetTestEnvironment()
|
||||||
|
|
||||||
printAndRemoveModules()
|
|
||||||
}
|
|
||||||
|
|
||||||
func printAndRemoveModules() {
|
|
||||||
modulesLock.Lock()
|
|
||||||
defer modulesLock.Unlock()
|
|
||||||
|
|
||||||
fmt.Printf("All %d modules:\n", len(modules))
|
|
||||||
for _, m := range modules {
|
|
||||||
fmt.Printf("module %s: %+v\n", m.Name, m)
|
|
||||||
}
|
|
||||||
|
|
||||||
modules = make(map[string]*Module)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func testModuleErrors(t *testing.T) {
|
func testModuleErrors(t *testing.T) {
|
||||||
|
|
||||||
// reset modules
|
|
||||||
modules = make(map[string]*Module)
|
|
||||||
startComplete.UnSet()
|
|
||||||
startCompleteSignal = make(chan struct{})
|
|
||||||
|
|
||||||
// test prep error
|
// test prep error
|
||||||
Register("prepfail", testFail, testStart(t, "prepfail"), testStop(t, "prepfail"))
|
Register("prepfail", testFail, nil, nil)
|
||||||
err := Start()
|
err := Start()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Error("should fail")
|
t.Error("should fail")
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset modules
|
resetTestEnvironment()
|
||||||
modules = make(map[string]*Module)
|
|
||||||
startComplete.UnSet()
|
|
||||||
startCompleteSignal = make(chan struct{})
|
|
||||||
|
|
||||||
// test prep clean exit
|
// test prep clean exit
|
||||||
Register("prepcleanexit", testCleanExit, testStart(t, "prepcleanexit"), testStop(t, "prepcleanexit"))
|
Register("prepcleanexit", testCleanExit, nil, nil)
|
||||||
err = Start()
|
err = Start()
|
||||||
if err != ErrCleanExit {
|
if err != ErrCleanExit {
|
||||||
t.Error("should fail with clean exit")
|
t.Error("should fail with clean exit")
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset modules
|
resetTestEnvironment()
|
||||||
modules = make(map[string]*Module)
|
|
||||||
startComplete.UnSet()
|
|
||||||
startCompleteSignal = make(chan struct{})
|
|
||||||
|
|
||||||
// test invalid dependency
|
// test invalid dependency
|
||||||
Register("database", nil, testStart(t, "database"), testStop(t, "database"), "invalid")
|
Register("database", nil, nil, nil, "invalid")
|
||||||
err = Start()
|
err = Start()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Error("should fail")
|
t.Error("should fail")
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset modules
|
resetTestEnvironment()
|
||||||
modules = make(map[string]*Module)
|
|
||||||
startComplete.UnSet()
|
|
||||||
startCompleteSignal = make(chan struct{})
|
|
||||||
|
|
||||||
// test dependency loop
|
// test dependency loop
|
||||||
Register("database", nil, testStart(t, "database"), testStop(t, "database"), "helper")
|
registerTestModule(t, "database", "helper")
|
||||||
Register("helper", nil, testStart(t, "helper"), testStop(t, "helper"), "database")
|
registerTestModule(t, "helper", "database")
|
||||||
err = Start()
|
err = Start()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Error("should fail")
|
t.Error("should fail")
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset modules
|
resetTestEnvironment()
|
||||||
modules = make(map[string]*Module)
|
|
||||||
startComplete.UnSet()
|
|
||||||
startCompleteSignal = make(chan struct{})
|
|
||||||
|
|
||||||
// test failing module start
|
// test failing module start
|
||||||
Register("startfail", nil, testFail, testStop(t, "startfail"))
|
Register("startfail", nil, testFail, nil)
|
||||||
err = Start()
|
err = Start()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Error("should fail")
|
t.Error("should fail")
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset modules
|
resetTestEnvironment()
|
||||||
modules = make(map[string]*Module)
|
|
||||||
startComplete.UnSet()
|
|
||||||
startCompleteSignal = make(chan struct{})
|
|
||||||
|
|
||||||
// test failing module stop
|
// test failing module stop
|
||||||
Register("stopfail", nil, testStart(t, "stopfail"), testFail)
|
Register("stopfail", nil, nil, testFail)
|
||||||
err = Start()
|
err = Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error("should not fail")
|
t.Error("should not fail")
|
||||||
|
@ -191,10 +146,7 @@ func testModuleErrors(t *testing.T) {
|
||||||
t.Error("should fail")
|
t.Error("should fail")
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset modules
|
resetTestEnvironment()
|
||||||
modules = make(map[string]*Module)
|
|
||||||
startComplete.UnSet()
|
|
||||||
startCompleteSignal = make(chan struct{})
|
|
||||||
|
|
||||||
// test help flag
|
// test help flag
|
||||||
HelpFlag = true
|
HelpFlag = true
|
||||||
|
@ -204,4 +156,20 @@ func testModuleErrors(t *testing.T) {
|
||||||
}
|
}
|
||||||
HelpFlag = false
|
HelpFlag = false
|
||||||
|
|
||||||
|
resetTestEnvironment()
|
||||||
|
}
|
||||||
|
|
||||||
|
func printModules() { //nolint:unused,deadcode
|
||||||
|
fmt.Printf("All %d modules:\n", len(modules))
|
||||||
|
for _, m := range modules {
|
||||||
|
fmt.Printf("module %s: %+v\n", m.Name, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func resetTestEnvironment() {
|
||||||
|
modules = make(map[string]*Module)
|
||||||
|
shutdownSignal = make(chan struct{})
|
||||||
|
shutdownCompleteSignal = make(chan struct{})
|
||||||
|
shutdownFlag.UnSet()
|
||||||
|
modulesLocked.UnSet()
|
||||||
}
|
}
|
||||||
|
|
160
modules/start.go
160
modules/start.go
|
@ -1,34 +1,38 @@
|
||||||
package modules
|
package modules
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
"time"
|
|
||||||
|
"github.com/tevino/abool"
|
||||||
|
|
||||||
"github.com/safing/portbase/log"
|
"github.com/safing/portbase/log"
|
||||||
"github.com/tevino/abool"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
startComplete = abool.NewBool(false)
|
initialStartCompleted = abool.NewBool(false)
|
||||||
startCompleteSignal = make(chan struct{})
|
globalPrepFn func() error
|
||||||
)
|
)
|
||||||
|
|
||||||
// StartCompleted returns whether starting has completed.
|
// SetGlobalPrepFn sets a global prep function that is run before all modules. This can be used to pre-initialize modules, such as setting the data root or database path.
|
||||||
func StartCompleted() bool {
|
// SetGlobalPrepFn sets a global prep function that is run before all modules.
|
||||||
return startComplete.IsSet()
|
func SetGlobalPrepFn(fn func() error) {
|
||||||
}
|
if globalPrepFn == nil {
|
||||||
|
globalPrepFn = fn
|
||||||
// WaitForStartCompletion returns as soon as starting has completed.
|
}
|
||||||
func WaitForStartCompletion() <-chan struct{} {
|
|
||||||
return startCompleteSignal
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts all modules in the correct order. In case of an error, it will automatically shutdown again.
|
// Start starts all modules in the correct order. In case of an error, it will automatically shutdown again.
|
||||||
func Start() error {
|
func Start() error {
|
||||||
modulesLock.RLock()
|
if !modulesLocked.SetToIf(false, true) {
|
||||||
defer modulesLock.RUnlock()
|
return errors.New("module system already started")
|
||||||
|
}
|
||||||
|
|
||||||
|
// lock mgmt
|
||||||
|
mgmtLock.Lock()
|
||||||
|
defer mgmtLock.Unlock()
|
||||||
|
|
||||||
// start microtask scheduler
|
// start microtask scheduler
|
||||||
go microTaskScheduler()
|
go microTaskScheduler()
|
||||||
|
@ -48,6 +52,17 @@ func Start() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// execute global prep fn
|
||||||
|
if globalPrepFn != nil {
|
||||||
|
err = globalPrepFn()
|
||||||
|
if err != nil {
|
||||||
|
if err != ErrCleanExit {
|
||||||
|
fmt.Fprintf(os.Stderr, "CRITICAL ERROR: %s\n", err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// prep modules
|
// prep modules
|
||||||
err = prepareModules()
|
err = prepareModules()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -65,6 +80,9 @@ func Start() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// build dependency tree
|
||||||
|
buildEnabledTree()
|
||||||
|
|
||||||
// start modules
|
// start modules
|
||||||
log.Info("modules: initiating...")
|
log.Info("modules: initiating...")
|
||||||
err = startModules()
|
err = startModules()
|
||||||
|
@ -75,13 +93,11 @@ func Start() error {
|
||||||
|
|
||||||
// complete startup
|
// complete startup
|
||||||
log.Infof("modules: started %d modules", len(modules))
|
log.Infof("modules: started %d modules", len(modules))
|
||||||
if startComplete.SetToIf(false, true) {
|
|
||||||
close(startCompleteSignal)
|
|
||||||
}
|
|
||||||
|
|
||||||
go taskQueueHandler()
|
go taskQueueHandler()
|
||||||
go taskScheduleHandler()
|
go taskScheduleHandler()
|
||||||
|
|
||||||
|
initialStartCompleted.Set()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,45 +113,36 @@ func prepareModules() error {
|
||||||
reportCnt := 0
|
reportCnt := 0
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
waiting := 0
|
||||||
|
|
||||||
// find modules to exec
|
// find modules to exec
|
||||||
for _, m := range modules {
|
for _, m := range modules {
|
||||||
if m.ReadyToPrep() {
|
switch m.readyToPrep() {
|
||||||
|
case statusNothingToDo:
|
||||||
|
case statusWaiting:
|
||||||
|
waiting++
|
||||||
|
case statusReady:
|
||||||
execCnt++
|
execCnt++
|
||||||
m.inTransition.Set()
|
m.prep(reports)
|
||||||
|
|
||||||
execM := m
|
|
||||||
go func() {
|
|
||||||
reports <- &report{
|
|
||||||
module: execM,
|
|
||||||
err: execM.runCtrlFnWithTimeout(
|
|
||||||
"prep module",
|
|
||||||
10*time.Second,
|
|
||||||
execM.prep,
|
|
||||||
),
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check for dep loop
|
if reportCnt < execCnt {
|
||||||
if execCnt == reportCnt {
|
// wait for reports
|
||||||
return fmt.Errorf("modules: dependency loop detected, cannot continue")
|
rep = <-reports
|
||||||
}
|
if rep.err != nil {
|
||||||
|
if rep.err == ErrCleanExit {
|
||||||
// wait for reports
|
return rep.err
|
||||||
rep = <-reports
|
}
|
||||||
rep.module.inTransition.UnSet()
|
return fmt.Errorf("failed to prep module %s: %s", rep.module.Name, rep.err)
|
||||||
if rep.err != nil {
|
}
|
||||||
if rep.err == ErrCleanExit {
|
reportCnt++
|
||||||
return rep.err
|
} else {
|
||||||
|
// finished
|
||||||
|
if waiting > 0 {
|
||||||
|
// check for dep loop
|
||||||
|
return fmt.Errorf("modules: dependency loop detected, cannot continue")
|
||||||
}
|
}
|
||||||
return fmt.Errorf("failed to prep module %s: %s", rep.module.Name, rep.err)
|
|
||||||
}
|
|
||||||
reportCnt++
|
|
||||||
rep.module.Prepped.Set()
|
|
||||||
|
|
||||||
// exit if done
|
|
||||||
if reportCnt == len(modules) {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,45 +156,36 @@ func startModules() error {
|
||||||
reportCnt := 0
|
reportCnt := 0
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
waiting := 0
|
||||||
|
|
||||||
// find modules to exec
|
// find modules to exec
|
||||||
for _, m := range modules {
|
for _, m := range modules {
|
||||||
if m.ReadyToStart() {
|
switch m.readyToStart() {
|
||||||
|
case statusNothingToDo:
|
||||||
|
case statusWaiting:
|
||||||
|
waiting++
|
||||||
|
case statusReady:
|
||||||
execCnt++
|
execCnt++
|
||||||
m.inTransition.Set()
|
m.start(reports)
|
||||||
|
|
||||||
execM := m
|
|
||||||
go func() {
|
|
||||||
reports <- &report{
|
|
||||||
module: execM,
|
|
||||||
err: execM.runCtrlFnWithTimeout(
|
|
||||||
"start module",
|
|
||||||
60*time.Second,
|
|
||||||
execM.start,
|
|
||||||
),
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check for dep loop
|
if reportCnt < execCnt {
|
||||||
if execCnt == reportCnt {
|
// wait for reports
|
||||||
return fmt.Errorf("modules: dependency loop detected, cannot continue")
|
rep = <-reports
|
||||||
}
|
if rep.err != nil {
|
||||||
|
return fmt.Errorf("modules: could not start module %s: %s", rep.module.Name, rep.err)
|
||||||
// wait for reports
|
}
|
||||||
rep = <-reports
|
reportCnt++
|
||||||
rep.module.inTransition.UnSet()
|
log.Infof("modules: started %s", rep.module.Name)
|
||||||
if rep.err != nil {
|
} else {
|
||||||
return fmt.Errorf("modules: could not start module %s: %s", rep.module.Name, rep.err)
|
// finished
|
||||||
}
|
if waiting > 0 {
|
||||||
reportCnt++
|
// check for dep loop
|
||||||
rep.module.Started.Set()
|
return fmt.Errorf("modules: dependency loop detected, cannot continue")
|
||||||
log.Infof("modules: started %s", rep.module.Name)
|
}
|
||||||
|
// return last error
|
||||||
// exit if done
|
|
||||||
if reportCnt == len(modules) {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
171
modules/status.go
Normal file
171
modules/status.go
Normal file
|
@ -0,0 +1,171 @@
|
||||||
|
package modules
|
||||||
|
|
||||||
|
// Module Status Values
|
||||||
|
const (
|
||||||
|
StatusDead uint8 = 0 // not prepared, not started
|
||||||
|
StatusPreparing uint8 = 1
|
||||||
|
StatusOffline uint8 = 2 // prepared, not started
|
||||||
|
StatusStopping uint8 = 3
|
||||||
|
StatusStarting uint8 = 4
|
||||||
|
StatusOnline uint8 = 5 // online and running
|
||||||
|
)
|
||||||
|
|
||||||
|
// Module Failure Status Values
|
||||||
|
const (
|
||||||
|
FailureNone uint8 = 0
|
||||||
|
FailureHint uint8 = 1
|
||||||
|
FailureWarning uint8 = 2
|
||||||
|
FailureError uint8 = 3
|
||||||
|
)
|
||||||
|
|
||||||
|
// ready status
|
||||||
|
const (
|
||||||
|
statusWaiting uint8 = iota
|
||||||
|
statusReady
|
||||||
|
statusNothingToDo
|
||||||
|
)
|
||||||
|
|
||||||
|
// Online returns whether the module is online.
|
||||||
|
func (m *Module) Online() bool {
|
||||||
|
return m.Status() == StatusOnline
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnlineSoon returns whether the module is or is about to be online.
|
||||||
|
func (m *Module) OnlineSoon() bool {
|
||||||
|
if moduleMgmtEnabled.IsSet() &&
|
||||||
|
!m.enabled.IsSet() &&
|
||||||
|
!m.enabledAsDependency.IsSet() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return !m.stopFlag.IsSet()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Status returns the current module status.
|
||||||
|
func (m *Module) Status() uint8 {
|
||||||
|
m.RLock()
|
||||||
|
defer m.RUnlock()
|
||||||
|
|
||||||
|
return m.status
|
||||||
|
}
|
||||||
|
|
||||||
|
// FailureStatus returns the current failure status, ID and message.
|
||||||
|
func (m *Module) FailureStatus() (failureStatus uint8, failureID, failureMsg string) {
|
||||||
|
m.RLock()
|
||||||
|
defer m.RUnlock()
|
||||||
|
|
||||||
|
return m.failureStatus, m.failureID, m.failureMsg
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hint sets failure status to hint. This is a somewhat special failure status, as the module is believed to be working correctly, but there is an important module specific information to convey. The supplied failureID is for improved automatic handling within connected systems, the failureMsg is for humans.
|
||||||
|
func (m *Module) Hint(failureID, failureMsg string) {
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
|
||||||
|
m.failureStatus = FailureHint
|
||||||
|
m.failureID = failureID
|
||||||
|
m.failureMsg = failureMsg
|
||||||
|
|
||||||
|
m.notifyOfChange()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Warning sets failure status to warning. The supplied failureID is for improved automatic handling within connected systems, the failureMsg is for humans.
|
||||||
|
func (m *Module) Warning(failureID, failureMsg string) {
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
|
||||||
|
m.failureStatus = FailureWarning
|
||||||
|
m.failureID = failureID
|
||||||
|
m.failureMsg = failureMsg
|
||||||
|
|
||||||
|
m.notifyOfChange()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error sets failure status to error. The supplied failureID is for improved automatic handling within connected systems, the failureMsg is for humans.
|
||||||
|
func (m *Module) Error(failureID, failureMsg string) {
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
|
||||||
|
m.failureStatus = FailureError
|
||||||
|
m.failureID = failureID
|
||||||
|
m.failureMsg = failureMsg
|
||||||
|
|
||||||
|
m.notifyOfChange()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolve removes the failure state from the module if the given failureID matches the current failure ID. If the given failureID is an empty string, Resolve removes any failure state.
|
||||||
|
func (m *Module) Resolve(failureID string) {
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
|
||||||
|
if failureID == "" || failureID == m.failureID {
|
||||||
|
m.failureStatus = FailureNone
|
||||||
|
m.failureID = ""
|
||||||
|
m.failureMsg = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
m.notifyOfChange()
|
||||||
|
}
|
||||||
|
|
||||||
|
// readyToPrep returns whether all dependencies are ready for this module to prep.
|
||||||
|
func (m *Module) readyToPrep() uint8 {
|
||||||
|
// check if valid state for prepping
|
||||||
|
if m.Status() != StatusDead {
|
||||||
|
return statusNothingToDo
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, dep := range m.depModules {
|
||||||
|
if dep.Status() < StatusOffline {
|
||||||
|
return statusWaiting
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return statusReady
|
||||||
|
}
|
||||||
|
|
||||||
|
// readyToStart returns whether all dependencies are ready for this module to start.
|
||||||
|
func (m *Module) readyToStart() uint8 {
|
||||||
|
// check if start is wanted
|
||||||
|
if moduleMgmtEnabled.IsSet() {
|
||||||
|
if !m.enabled.IsSet() && !m.enabledAsDependency.IsSet() {
|
||||||
|
return statusNothingToDo
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if valid state for starting
|
||||||
|
if m.Status() != StatusOffline {
|
||||||
|
return statusNothingToDo
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if all dependencies are ready
|
||||||
|
for _, dep := range m.depModules {
|
||||||
|
if dep.Status() < StatusOnline {
|
||||||
|
return statusWaiting
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return statusReady
|
||||||
|
}
|
||||||
|
|
||||||
|
// readyToStop returns whether all dependencies are ready for this module to stop.
|
||||||
|
func (m *Module) readyToStop() uint8 {
|
||||||
|
// check if stop is wanted
|
||||||
|
if moduleMgmtEnabled.IsSet() && !shutdownFlag.IsSet() {
|
||||||
|
if m.enabled.IsSet() || m.enabledAsDependency.IsSet() {
|
||||||
|
return statusNothingToDo
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if valid state for stopping
|
||||||
|
if m.Status() != StatusOnline {
|
||||||
|
return statusNothingToDo
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, revDep := range m.depReverse {
|
||||||
|
// not ready if a reverse dependency was started, but not yet stopped
|
||||||
|
if revDep.Status() > StatusOffline {
|
||||||
|
return statusWaiting
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return statusReady
|
||||||
|
}
|
|
@ -10,8 +10,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
shutdownSignal = make(chan struct{})
|
shutdownSignal = make(chan struct{})
|
||||||
shutdownSignalClosed = abool.NewBool(false)
|
shutdownFlag = abool.NewBool(false)
|
||||||
|
|
||||||
shutdownCompleteSignal = make(chan struct{})
|
shutdownCompleteSignal = make(chan struct{})
|
||||||
)
|
)
|
||||||
|
@ -23,18 +23,19 @@ func ShuttingDown() <-chan struct{} {
|
||||||
|
|
||||||
// Shutdown stops all modules in the correct order.
|
// Shutdown stops all modules in the correct order.
|
||||||
func Shutdown() error {
|
func Shutdown() error {
|
||||||
|
// lock mgmt
|
||||||
|
mgmtLock.Lock()
|
||||||
|
defer mgmtLock.Unlock()
|
||||||
|
|
||||||
if shutdownSignalClosed.SetToIf(false, true) {
|
if shutdownFlag.SetToIf(false, true) {
|
||||||
close(shutdownSignal)
|
close(shutdownSignal)
|
||||||
} else {
|
} else {
|
||||||
// shutdown was already issued
|
// shutdown was already issued
|
||||||
return errors.New("shutdown already initiated")
|
return errors.New("shutdown already initiated")
|
||||||
}
|
}
|
||||||
|
|
||||||
if startComplete.IsSet() {
|
if initialStartCompleted.IsSet() {
|
||||||
log.Warning("modules: starting shutdown...")
|
log.Warning("modules: starting shutdown...")
|
||||||
modulesLock.Lock()
|
|
||||||
defer modulesLock.Unlock()
|
|
||||||
} else {
|
} else {
|
||||||
log.Warning("modules: aborting, shutting down...")
|
log.Warning("modules: aborting, shutting down...")
|
||||||
}
|
}
|
||||||
|
@ -61,46 +62,42 @@ func stopModules() error {
|
||||||
// get number of started modules
|
// get number of started modules
|
||||||
startedCnt := 0
|
startedCnt := 0
|
||||||
for _, m := range modules {
|
for _, m := range modules {
|
||||||
if m.Started.IsSet() {
|
if m.Status() >= StatusStarting {
|
||||||
startedCnt++
|
startedCnt++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
waiting := 0
|
||||||
|
|
||||||
// find modules to exec
|
// find modules to exec
|
||||||
for _, m := range modules {
|
for _, m := range modules {
|
||||||
if m.ReadyToStop() {
|
switch m.readyToStop() {
|
||||||
|
case statusNothingToDo:
|
||||||
|
case statusWaiting:
|
||||||
|
waiting++
|
||||||
|
case statusReady:
|
||||||
execCnt++
|
execCnt++
|
||||||
m.inTransition.Set()
|
m.stop(reports)
|
||||||
|
|
||||||
execM := m
|
|
||||||
go func() {
|
|
||||||
reports <- &report{
|
|
||||||
module: execM,
|
|
||||||
err: execM.shutdown(),
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check for dep loop
|
if reportCnt < execCnt {
|
||||||
if execCnt == reportCnt {
|
// wait for reports
|
||||||
return fmt.Errorf("modules: dependency loop detected, cannot continue")
|
rep = <-reports
|
||||||
}
|
if rep.err != nil {
|
||||||
|
lastErr = rep.err
|
||||||
// wait for reports
|
log.Warningf("modules: could not stop module %s: %s", rep.module.Name, rep.err)
|
||||||
rep = <-reports
|
}
|
||||||
rep.module.inTransition.UnSet()
|
reportCnt++
|
||||||
if rep.err != nil {
|
log.Infof("modules: stopped %s", rep.module.Name)
|
||||||
lastErr = rep.err
|
} else {
|
||||||
log.Warningf("modules: could not stop module %s: %s", rep.module.Name, rep.err)
|
// finished
|
||||||
}
|
if waiting > 0 {
|
||||||
reportCnt++
|
// check for dep loop
|
||||||
rep.module.Stopped.Set()
|
return fmt.Errorf("modules: dependency loop detected, cannot continue")
|
||||||
log.Infof("modules: stopped %s", rep.module.Name)
|
}
|
||||||
|
// return last error
|
||||||
// exit if done
|
|
||||||
if reportCnt == startedCnt {
|
|
||||||
return lastErr
|
return lastErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
83
modules/subsystems/module.go
Normal file
83
modules/subsystems/module.go
Normal file
|
@ -0,0 +1,83 @@
|
||||||
|
package subsystems
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/safing/portbase/database"
|
||||||
|
_ "github.com/safing/portbase/database/dbmodule" // database module is required
|
||||||
|
"github.com/safing/portbase/modules"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
configChangeEvent = "config change"
|
||||||
|
subsystemsStatusChange = "status change"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
module *modules.Module
|
||||||
|
|
||||||
|
databaseKeySpace string
|
||||||
|
db = database.NewInterface(nil)
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// enable partial starting
|
||||||
|
modules.EnableModuleManagement(handleModuleChanges)
|
||||||
|
|
||||||
|
// register module and enable it for starting
|
||||||
|
module = modules.Register("subsystems", prep, start, nil, "config", "database")
|
||||||
|
module.Enable()
|
||||||
|
|
||||||
|
// register event for changes in the subsystem
|
||||||
|
module.RegisterEvent(subsystemsStatusChange)
|
||||||
|
}
|
||||||
|
|
||||||
|
func prep() error {
|
||||||
|
return module.RegisterEventHook("config", configChangeEvent, "control subsystems", handleConfigChanges)
|
||||||
|
}
|
||||||
|
|
||||||
|
func start() error {
|
||||||
|
// lock registration
|
||||||
|
subsystemsLocked.Set()
|
||||||
|
|
||||||
|
// lock slice and map
|
||||||
|
subsystemsLock.Lock()
|
||||||
|
// go through all dependencies
|
||||||
|
seen := make(map[string]struct{})
|
||||||
|
for _, sub := range subsystems {
|
||||||
|
// add main module
|
||||||
|
sub.Dependencies = append(sub.Dependencies, statusFromModule(sub.module))
|
||||||
|
// add dependencies
|
||||||
|
sub.addDependencies(sub.module, seen)
|
||||||
|
}
|
||||||
|
// unlock
|
||||||
|
subsystemsLock.Unlock()
|
||||||
|
|
||||||
|
// apply config
|
||||||
|
return handleConfigChanges(module.Ctx, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sub *Subsystem) addDependencies(module *modules.Module, seen map[string]struct{}) {
|
||||||
|
for _, module := range module.Dependencies() {
|
||||||
|
_, ok := seen[module.Name]
|
||||||
|
if !ok {
|
||||||
|
// add dependency to modules
|
||||||
|
sub.Dependencies = append(sub.Dependencies, statusFromModule(module))
|
||||||
|
// mark as seen
|
||||||
|
seen[module.Name] = struct{}{}
|
||||||
|
// add further dependencies
|
||||||
|
sub.addDependencies(module, seen)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetDatabaseKeySpace sets a key space where subsystem status
|
||||||
|
func SetDatabaseKeySpace(keySpace string) {
|
||||||
|
if databaseKeySpace == "" {
|
||||||
|
databaseKeySpace = keySpace
|
||||||
|
|
||||||
|
if !strings.HasSuffix(databaseKeySpace, "/") {
|
||||||
|
databaseKeySpace += "/"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
117
modules/subsystems/subsystem.go
Normal file
117
modules/subsystems/subsystem.go
Normal file
|
@ -0,0 +1,117 @@
|
||||||
|
package subsystems
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/safing/portbase/config"
|
||||||
|
"github.com/safing/portbase/database/record"
|
||||||
|
"github.com/safing/portbase/log"
|
||||||
|
"github.com/safing/portbase/modules"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Subsystem describes a subset of modules that represent a part of a service or program to the user.
|
||||||
|
type Subsystem struct { //nolint:maligned // not worth the effort
|
||||||
|
record.Base
|
||||||
|
sync.Mutex
|
||||||
|
|
||||||
|
Name string
|
||||||
|
Description string
|
||||||
|
module *modules.Module
|
||||||
|
|
||||||
|
Status *ModuleStatus
|
||||||
|
Dependencies []*ModuleStatus
|
||||||
|
FailureStatus uint8
|
||||||
|
|
||||||
|
ToggleOptionKey string // empty == forced on
|
||||||
|
toggleOption *config.Option
|
||||||
|
toggleValue func() bool
|
||||||
|
ExpertiseLevel uint8 // copied from toggleOption
|
||||||
|
ReleaseLevel uint8 // copied from toggleOption
|
||||||
|
|
||||||
|
ConfigKeySpace string
|
||||||
|
}
|
||||||
|
|
||||||
|
// ModuleStatus describes the status of a module.
|
||||||
|
type ModuleStatus struct {
|
||||||
|
Name string
|
||||||
|
module *modules.Module
|
||||||
|
|
||||||
|
// status mgmt
|
||||||
|
Enabled bool
|
||||||
|
Status uint8
|
||||||
|
|
||||||
|
// failure status
|
||||||
|
FailureStatus uint8
|
||||||
|
FailureID string
|
||||||
|
FailureMsg string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save saves the Subsystem Status to the database.
|
||||||
|
func (sub *Subsystem) Save() {
|
||||||
|
if databaseKeySpace != "" {
|
||||||
|
// sub.SetKey() // FIXME
|
||||||
|
err := db.Put(sub)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("subsystems: could not save subsystem status to database: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func statusFromModule(module *modules.Module) *ModuleStatus {
|
||||||
|
status := &ModuleStatus{
|
||||||
|
Name: module.Name,
|
||||||
|
module: module,
|
||||||
|
Enabled: module.Enabled(),
|
||||||
|
Status: module.Status(),
|
||||||
|
}
|
||||||
|
status.FailureStatus, status.FailureID, status.FailureMsg = module.FailureStatus()
|
||||||
|
|
||||||
|
return status
|
||||||
|
}
|
||||||
|
|
||||||
|
func compareAndUpdateStatus(module *modules.Module, status *ModuleStatus) (changed bool) {
|
||||||
|
// check if enabled
|
||||||
|
enabled := module.Enabled()
|
||||||
|
if status.Enabled != enabled {
|
||||||
|
status.Enabled = enabled
|
||||||
|
changed = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// check status
|
||||||
|
statusLvl := module.Status()
|
||||||
|
if status.Status != statusLvl {
|
||||||
|
status.Status = statusLvl
|
||||||
|
changed = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// check failure status
|
||||||
|
failureStatus, failureID, failureMsg := module.FailureStatus()
|
||||||
|
if status.FailureStatus != failureStatus ||
|
||||||
|
status.FailureID != failureID {
|
||||||
|
status.FailureStatus = failureStatus
|
||||||
|
status.FailureID = failureID
|
||||||
|
status.FailureMsg = failureMsg
|
||||||
|
changed = true
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sub *Subsystem) makeSummary() {
|
||||||
|
// find worst failing module
|
||||||
|
worstFailing := &ModuleStatus{}
|
||||||
|
if sub.Status.FailureStatus > worstFailing.FailureStatus {
|
||||||
|
worstFailing = sub.Status
|
||||||
|
}
|
||||||
|
for _, depStatus := range sub.Dependencies {
|
||||||
|
if depStatus.FailureStatus > worstFailing.FailureStatus {
|
||||||
|
worstFailing = depStatus
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if worstFailing != nil {
|
||||||
|
sub.FailureStatus = worstFailing.FailureStatus
|
||||||
|
} else {
|
||||||
|
sub.FailureStatus = 0
|
||||||
|
}
|
||||||
|
}
|
158
modules/subsystems/subsystems.go
Normal file
158
modules/subsystems/subsystems.go
Normal file
|
@ -0,0 +1,158 @@
|
||||||
|
package subsystems
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/tevino/abool"
|
||||||
|
|
||||||
|
"github.com/safing/portbase/config"
|
||||||
|
"github.com/safing/portbase/modules"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
subsystems []*Subsystem
|
||||||
|
subsystemsMap = make(map[string]*Subsystem)
|
||||||
|
subsystemsLock sync.Mutex
|
||||||
|
subsystemsLocked = abool.New()
|
||||||
|
|
||||||
|
handlingConfigChanges = abool.New()
|
||||||
|
)
|
||||||
|
|
||||||
|
// Register registers a new subsystem. The given option must be a bool option. Should be called in the module's prep function. The config option must not yet be registered and will be registered for you.
|
||||||
|
func Register(name, description string, module *modules.Module, configKeySpace string, option *config.Option) error {
|
||||||
|
// lock slice and map
|
||||||
|
subsystemsLock.Lock()
|
||||||
|
defer subsystemsLock.Unlock()
|
||||||
|
|
||||||
|
// check if registration is closed
|
||||||
|
if subsystemsLocked.IsSet() {
|
||||||
|
return errors.New("subsystems can only be registered in prep phase")
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if already registered
|
||||||
|
_, ok := subsystemsMap[name]
|
||||||
|
if ok {
|
||||||
|
return fmt.Errorf(`subsystem "%s" already registered`, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// create new
|
||||||
|
new := &Subsystem{
|
||||||
|
Name: name,
|
||||||
|
Description: description,
|
||||||
|
module: module,
|
||||||
|
|
||||||
|
Status: statusFromModule(module),
|
||||||
|
|
||||||
|
toggleOption: option,
|
||||||
|
ConfigKeySpace: configKeySpace,
|
||||||
|
}
|
||||||
|
if new.toggleOption != nil {
|
||||||
|
new.ToggleOptionKey = new.toggleOption.Key
|
||||||
|
new.ExpertiseLevel = new.toggleOption.ExpertiseLevel
|
||||||
|
new.ReleaseLevel = new.toggleOption.ReleaseLevel
|
||||||
|
}
|
||||||
|
|
||||||
|
// register config
|
||||||
|
if option != nil {
|
||||||
|
err := config.Register(option)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to register config: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
new.toggleValue = config.GetAsBool(new.ToggleOptionKey, false)
|
||||||
|
|
||||||
|
// add to lists
|
||||||
|
subsystemsMap[name] = new
|
||||||
|
subsystems = append(subsystems, new)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleModuleChanges(m *modules.Module) {
|
||||||
|
// check if ready
|
||||||
|
if !subsystemsLocked.IsSet() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// find module status
|
||||||
|
var moduleSubsystem *Subsystem
|
||||||
|
var moduleStatus *ModuleStatus
|
||||||
|
subsystemLoop:
|
||||||
|
for _, subsystem := range subsystems {
|
||||||
|
if m.Name == subsystem.Status.Name {
|
||||||
|
moduleSubsystem = subsystem
|
||||||
|
moduleStatus = subsystem.Status
|
||||||
|
break subsystemLoop
|
||||||
|
}
|
||||||
|
for _, status := range subsystem.Dependencies {
|
||||||
|
if m.Name == status.Name {
|
||||||
|
moduleSubsystem = subsystem
|
||||||
|
moduleStatus = status
|
||||||
|
break subsystemLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// abort if not found
|
||||||
|
if moduleSubsystem == nil || moduleStatus == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// update status
|
||||||
|
moduleSubsystem.Lock()
|
||||||
|
changed := compareAndUpdateStatus(m, moduleStatus)
|
||||||
|
if changed {
|
||||||
|
moduleSubsystem.makeSummary()
|
||||||
|
}
|
||||||
|
moduleSubsystem.Unlock()
|
||||||
|
|
||||||
|
// save
|
||||||
|
if changed {
|
||||||
|
moduleSubsystem.Save()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleConfigChanges(ctx context.Context, data interface{}) error {
|
||||||
|
// check if ready
|
||||||
|
if !subsystemsLocked.IsSet() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// potentially catch multiple changes
|
||||||
|
if handlingConfigChanges.SetToIf(false, true) {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
handlingConfigChanges.UnSet()
|
||||||
|
} else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// only run one instance at any time
|
||||||
|
subsystemsLock.Lock()
|
||||||
|
defer subsystemsLock.Unlock()
|
||||||
|
|
||||||
|
var changed bool
|
||||||
|
for _, subsystem := range subsystems {
|
||||||
|
if subsystem.module.SetEnabled(subsystem.toggleValue()) {
|
||||||
|
// if changed
|
||||||
|
changed = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// trigger module management if any setting was changed
|
||||||
|
if changed {
|
||||||
|
err := modules.ManageModules()
|
||||||
|
if err != nil {
|
||||||
|
module.Error(
|
||||||
|
"modulemgmt-failed",
|
||||||
|
fmt.Sprintf("The subsystem framework failed to start or stop one or more modules.\nError: %s\nCheck logs for more information.", err),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
module.Resolve("modulemgmt-failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
132
modules/subsystems/subsystems_test.go
Normal file
132
modules/subsystems/subsystems_test.go
Normal file
|
@ -0,0 +1,132 @@
|
||||||
|
package subsystems
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/safing/portbase/config"
|
||||||
|
"github.com/safing/portbase/dataroot"
|
||||||
|
"github.com/safing/portbase/modules"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSubsystems(t *testing.T) {
|
||||||
|
// tmp dir for data root (db & config)
|
||||||
|
tmpDir, err := ioutil.TempDir("", "portbase-testing-")
|
||||||
|
// initialize data dir
|
||||||
|
if err == nil {
|
||||||
|
err = dataroot.Initialize(tmpDir, 0755)
|
||||||
|
}
|
||||||
|
// handle setup error
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// register
|
||||||
|
|
||||||
|
baseModule := modules.Register("base", nil, nil, nil)
|
||||||
|
err = Register(
|
||||||
|
"Base",
|
||||||
|
"Framework Groundwork",
|
||||||
|
baseModule,
|
||||||
|
"config:base",
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
feature1 := modules.Register("feature1", nil, nil, nil)
|
||||||
|
err = Register(
|
||||||
|
"Feature One",
|
||||||
|
"Provides feature one",
|
||||||
|
feature1,
|
||||||
|
"config:feature1",
|
||||||
|
&config.Option{
|
||||||
|
Name: "Enable Feature One",
|
||||||
|
Key: "config:subsystems/feature1",
|
||||||
|
Description: "This option enables feature 1",
|
||||||
|
OptType: config.OptTypeBool,
|
||||||
|
DefaultValue: false,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sub1 := subsystemsMap["Feature One"]
|
||||||
|
|
||||||
|
feature2 := modules.Register("feature2", nil, nil, nil)
|
||||||
|
err = Register(
|
||||||
|
"Feature Two",
|
||||||
|
"Provides feature two",
|
||||||
|
feature2,
|
||||||
|
"config:feature2",
|
||||||
|
&config.Option{
|
||||||
|
Name: "Enable Feature One",
|
||||||
|
Key: "config:subsystems/feature2",
|
||||||
|
Description: "This option enables feature 2",
|
||||||
|
OptType: config.OptTypeBool,
|
||||||
|
DefaultValue: false,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// start
|
||||||
|
err = modules.Start()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// test
|
||||||
|
|
||||||
|
// let module fail
|
||||||
|
feature1.Error("test-fail", "Testing Fail")
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
if sub1.FailureStatus != modules.FailureError {
|
||||||
|
t.Fatal("error did not propagate")
|
||||||
|
}
|
||||||
|
|
||||||
|
// resolve
|
||||||
|
feature1.Resolve("test-fail")
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
if sub1.FailureStatus != modules.FailureNone {
|
||||||
|
t.Fatal("error resolving did not propagate")
|
||||||
|
}
|
||||||
|
|
||||||
|
// update settings
|
||||||
|
err = config.SetConfigOption("config:subsystems/feature2", true)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
if !feature2.Enabled() {
|
||||||
|
t.Fatal("failed to enable feature2")
|
||||||
|
}
|
||||||
|
if feature2.Status() != modules.StatusOnline {
|
||||||
|
t.Fatal("feature2 did not start")
|
||||||
|
}
|
||||||
|
|
||||||
|
// update settings
|
||||||
|
err = config.SetConfigOption("config:subsystems/feature2", false)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
if feature2.Enabled() {
|
||||||
|
t.Fatal("failed to disable feature2")
|
||||||
|
}
|
||||||
|
if feature2.Status() != modules.StatusOffline {
|
||||||
|
t.Fatal("feature2 did not stop")
|
||||||
|
}
|
||||||
|
|
||||||
|
// clean up and exit
|
||||||
|
os.RemoveAll(tmpDir)
|
||||||
|
}
|
159
modules/tasks.go
159
modules/tasks.go
|
@ -8,21 +8,24 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tevino/abool"
|
|
||||||
|
|
||||||
"github.com/safing/portbase/log"
|
"github.com/safing/portbase/log"
|
||||||
|
"github.com/tevino/abool"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Task is managed task bound to a module.
|
// Task is managed task bound to a module.
|
||||||
type Task struct {
|
type Task struct {
|
||||||
name string
|
name string
|
||||||
module *Module
|
module *Module
|
||||||
taskFn func(context.Context, *Task)
|
taskFn func(context.Context, *Task) error
|
||||||
|
|
||||||
queued bool
|
queued bool
|
||||||
canceled bool
|
canceled bool
|
||||||
executing bool
|
executing bool
|
||||||
cancelFunc func()
|
|
||||||
|
// these are populated at task creation
|
||||||
|
// ctx is canceled when task is shutdown -> all tasks become canceled
|
||||||
|
ctx context.Context
|
||||||
|
cancelCtx func()
|
||||||
|
|
||||||
executeAt time.Time
|
executeAt time.Time
|
||||||
repeat time.Duration
|
repeat time.Duration
|
||||||
|
@ -59,25 +62,46 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewTask creates a new task with a descriptive name (non-unique), a optional deadline, and the task function to be executed. You must call one of Queue, Prioritize, StartASAP, Schedule or Repeat in order to have the Task executed.
|
// NewTask creates a new task with a descriptive name (non-unique), a optional deadline, and the task function to be executed. You must call one of Queue, Prioritize, StartASAP, Schedule or Repeat in order to have the Task executed.
|
||||||
func (m *Module) NewTask(name string, fn func(context.Context, *Task)) *Task {
|
func (m *Module) NewTask(name string, fn func(context.Context, *Task) error) *Task {
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
|
||||||
if m == nil {
|
if m == nil {
|
||||||
log.Errorf(`modules: cannot create task "%s" with nil module`, name)
|
log.Errorf(`modules: cannot create task "%s" with nil module`, name)
|
||||||
|
return &Task{
|
||||||
|
name: name,
|
||||||
|
module: &Module{Name: "[NONE]"},
|
||||||
|
canceled: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if m.Ctx == nil || !m.OnlineSoon() {
|
||||||
|
log.Errorf(`modules: tasks should only be started when the module is online or starting`)
|
||||||
|
return &Task{
|
||||||
|
name: name,
|
||||||
|
module: m,
|
||||||
|
canceled: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Task{
|
// create new task
|
||||||
|
new := &Task{
|
||||||
name: name,
|
name: name,
|
||||||
module: m,
|
module: m,
|
||||||
taskFn: fn,
|
taskFn: fn,
|
||||||
maxDelay: defaultMaxDelay,
|
maxDelay: defaultMaxDelay,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create context
|
||||||
|
new.ctx, new.cancelCtx = context.WithCancel(m.Ctx)
|
||||||
|
|
||||||
|
return new
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task) isActive() bool {
|
func (t *Task) isActive() bool {
|
||||||
if t.module == nil {
|
if t.canceled {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
return t.module.OnlineSoon()
|
||||||
return !t.canceled && !t.module.ShutdownInProgress()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task) prepForQueueing() (ok bool) {
|
func (t *Task) prepForQueueing() (ok bool) {
|
||||||
|
@ -197,45 +221,15 @@ func (t *Task) Repeat(interval time.Duration) *Task {
|
||||||
func (t *Task) Cancel() {
|
func (t *Task) Cancel() {
|
||||||
t.lock.Lock()
|
t.lock.Lock()
|
||||||
t.canceled = true
|
t.canceled = true
|
||||||
if t.cancelFunc != nil {
|
if t.cancelCtx != nil {
|
||||||
t.cancelFunc()
|
t.cancelCtx()
|
||||||
}
|
}
|
||||||
t.lock.Unlock()
|
t.lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task) runWithLocking() {
|
func (t *Task) removeFromQueues() {
|
||||||
if t.module == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for good timeslot regarding microtasks
|
|
||||||
select {
|
|
||||||
case <-taskTimeslot:
|
|
||||||
case <-time.After(maxTimeslotWait):
|
|
||||||
}
|
|
||||||
|
|
||||||
t.lock.Lock()
|
|
||||||
|
|
||||||
// check state, return if already executing or inactive
|
|
||||||
if t.executing || !t.isActive() {
|
|
||||||
t.lock.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
t.executing = true
|
|
||||||
|
|
||||||
// get list elements
|
|
||||||
queueElement := t.queueElement
|
|
||||||
prioritizedQueueElement := t.prioritizedQueueElement
|
|
||||||
scheduleListElement := t.scheduleListElement
|
|
||||||
|
|
||||||
// create context
|
|
||||||
var taskCtx context.Context
|
|
||||||
taskCtx, t.cancelFunc = context.WithCancel(t.module.Ctx)
|
|
||||||
|
|
||||||
t.lock.Unlock()
|
|
||||||
|
|
||||||
// remove from lists
|
// remove from lists
|
||||||
if queueElement != nil {
|
if t.queueElement != nil {
|
||||||
queuesLock.Lock()
|
queuesLock.Lock()
|
||||||
taskQueue.Remove(t.queueElement)
|
taskQueue.Remove(t.queueElement)
|
||||||
queuesLock.Unlock()
|
queuesLock.Unlock()
|
||||||
|
@ -243,7 +237,7 @@ func (t *Task) runWithLocking() {
|
||||||
t.queueElement = nil
|
t.queueElement = nil
|
||||||
t.lock.Unlock()
|
t.lock.Unlock()
|
||||||
}
|
}
|
||||||
if prioritizedQueueElement != nil {
|
if t.prioritizedQueueElement != nil {
|
||||||
queuesLock.Lock()
|
queuesLock.Lock()
|
||||||
prioritizedTaskQueue.Remove(t.prioritizedQueueElement)
|
prioritizedTaskQueue.Remove(t.prioritizedQueueElement)
|
||||||
queuesLock.Unlock()
|
queuesLock.Unlock()
|
||||||
|
@ -251,7 +245,7 @@ func (t *Task) runWithLocking() {
|
||||||
t.prioritizedQueueElement = nil
|
t.prioritizedQueueElement = nil
|
||||||
t.lock.Unlock()
|
t.lock.Unlock()
|
||||||
}
|
}
|
||||||
if scheduleListElement != nil {
|
if t.scheduleListElement != nil {
|
||||||
scheduleLock.Lock()
|
scheduleLock.Lock()
|
||||||
taskSchedule.Remove(t.scheduleListElement)
|
taskSchedule.Remove(t.scheduleListElement)
|
||||||
scheduleLock.Unlock()
|
scheduleLock.Unlock()
|
||||||
|
@ -259,14 +253,62 @@ func (t *Task) runWithLocking() {
|
||||||
t.scheduleListElement = nil
|
t.scheduleListElement = nil
|
||||||
t.lock.Unlock()
|
t.lock.Unlock()
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Task) runWithLocking() {
|
||||||
|
t.lock.Lock()
|
||||||
|
|
||||||
|
// check if task is already executing
|
||||||
|
if t.executing {
|
||||||
|
t.lock.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if task is active
|
||||||
|
if !t.isActive() {
|
||||||
|
t.removeFromQueues()
|
||||||
|
t.lock.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if module was stopped
|
||||||
|
select {
|
||||||
|
case <-t.ctx.Done(): // check if module is stopped
|
||||||
|
t.removeFromQueues()
|
||||||
|
t.lock.Unlock()
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
t.executing = true
|
||||||
|
t.lock.Unlock()
|
||||||
|
|
||||||
|
// wait for good timeslot regarding microtasks
|
||||||
|
select {
|
||||||
|
case <-taskTimeslot:
|
||||||
|
case <-time.After(maxTimeslotWait):
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for module start
|
||||||
|
if !t.module.Online() {
|
||||||
|
if t.module.OnlineSoon() {
|
||||||
|
// wait
|
||||||
|
<-t.module.StartCompleted()
|
||||||
|
} else {
|
||||||
|
t.lock.Lock()
|
||||||
|
t.removeFromQueues()
|
||||||
|
t.lock.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// add to queue workgroup
|
// add to queue workgroup
|
||||||
queueWg.Add(1)
|
queueWg.Add(1)
|
||||||
|
|
||||||
go t.executeWithLocking(taskCtx, t.cancelFunc)
|
go t.executeWithLocking()
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
select {
|
||||||
case <-taskCtx.Done():
|
case <-t.ctx.Done():
|
||||||
case <-time.After(maxExecutionWait):
|
case <-time.After(maxExecutionWait):
|
||||||
}
|
}
|
||||||
// complete queue worker (early) to allow next worker
|
// complete queue worker (early) to allow next worker
|
||||||
|
@ -274,7 +316,7 @@ func (t *Task) runWithLocking() {
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task) executeWithLocking(ctx context.Context, cancelFunc func()) {
|
func (t *Task) executeWithLocking() {
|
||||||
// start for module
|
// start for module
|
||||||
// hint: only queueWg global var is important for scheduling, others can be set here
|
// hint: only queueWg global var is important for scheduling, others can be set here
|
||||||
atomic.AddInt32(t.module.taskCnt, 1)
|
atomic.AddInt32(t.module.taskCnt, 1)
|
||||||
|
@ -306,11 +348,16 @@ func (t *Task) executeWithLocking(ctx context.Context, cancelFunc func()) {
|
||||||
t.lock.Unlock()
|
t.lock.Unlock()
|
||||||
|
|
||||||
// notify that we finished
|
// notify that we finished
|
||||||
cancelFunc()
|
if t.cancelCtx != nil {
|
||||||
|
t.cancelCtx()
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// run
|
// run
|
||||||
t.taskFn(ctx, t)
|
err := t.taskFn(t.ctx, t)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("%s: task %s failed: %s", t.module.Name, t.name, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task) getExecuteAtWithLocking() time.Time {
|
func (t *Task) getExecuteAtWithLocking() time.Time {
|
||||||
|
@ -320,6 +367,10 @@ func (t *Task) getExecuteAtWithLocking() time.Time {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task) addToSchedule() {
|
func (t *Task) addToSchedule() {
|
||||||
|
if !t.isActive() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
scheduleLock.Lock()
|
scheduleLock.Lock()
|
||||||
defer scheduleLock.Unlock()
|
defer scheduleLock.Unlock()
|
||||||
// defer printTaskList(taskSchedule) // for debugging
|
// defer printTaskList(taskSchedule) // for debugging
|
||||||
|
@ -395,7 +446,7 @@ func taskQueueHandler() {
|
||||||
queueWg.Wait()
|
queueWg.Wait()
|
||||||
|
|
||||||
// check for shutdown
|
// check for shutdown
|
||||||
if shutdownSignalClosed.IsSet() {
|
if shutdownFlag.IsSet() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,22 +35,29 @@ func init() {
|
||||||
var qtWg sync.WaitGroup
|
var qtWg sync.WaitGroup
|
||||||
var qtOutputChannel chan string
|
var qtOutputChannel chan string
|
||||||
var qtSleepDuration time.Duration
|
var qtSleepDuration time.Duration
|
||||||
var qtModule = initNewModule("task test module", nil, nil, nil)
|
var qtModule *Module
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
qtModule = initNewModule("task test module", nil, nil, nil)
|
||||||
|
qtModule.status = StatusOnline
|
||||||
|
}
|
||||||
|
|
||||||
// functions
|
// functions
|
||||||
func queuedTaskTester(s string) {
|
func queuedTaskTester(s string) {
|
||||||
qtModule.NewTask(s, func(ctx context.Context, t *Task) {
|
qtModule.NewTask(s, func(ctx context.Context, t *Task) error {
|
||||||
time.Sleep(qtSleepDuration * 2)
|
time.Sleep(qtSleepDuration * 2)
|
||||||
qtOutputChannel <- s
|
qtOutputChannel <- s
|
||||||
qtWg.Done()
|
qtWg.Done()
|
||||||
|
return nil
|
||||||
}).Queue()
|
}).Queue()
|
||||||
}
|
}
|
||||||
|
|
||||||
func prioritizedTaskTester(s string) {
|
func prioritizedTaskTester(s string) {
|
||||||
qtModule.NewTask(s, func(ctx context.Context, t *Task) {
|
qtModule.NewTask(s, func(ctx context.Context, t *Task) error {
|
||||||
time.Sleep(qtSleepDuration * 2)
|
time.Sleep(qtSleepDuration * 2)
|
||||||
qtOutputChannel <- s
|
qtOutputChannel <- s
|
||||||
qtWg.Done()
|
qtWg.Done()
|
||||||
|
return nil
|
||||||
}).Prioritize()
|
}).Prioritize()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,10 +116,11 @@ var stWaitCh chan bool
|
||||||
|
|
||||||
// functions
|
// functions
|
||||||
func scheduledTaskTester(s string, sched time.Time) {
|
func scheduledTaskTester(s string, sched time.Time) {
|
||||||
qtModule.NewTask(s, func(ctx context.Context, t *Task) {
|
qtModule.NewTask(s, func(ctx context.Context, t *Task) error {
|
||||||
time.Sleep(stSleepDuration)
|
time.Sleep(stSleepDuration)
|
||||||
stOutputChannel <- s
|
stOutputChannel <- s
|
||||||
stWg.Done()
|
stWg.Done()
|
||||||
|
return nil
|
||||||
}).Schedule(sched)
|
}).Schedule(sched)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -73,7 +73,7 @@ func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn
|
||||||
lastFail := time.Now()
|
lastFail := time.Now()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if m.ShutdownInProgress() {
|
if m.IsStopping() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
40
portbase.go
40
portbase.go
|
@ -1,49 +1,19 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
|
||||||
"syscall"
|
|
||||||
|
|
||||||
"github.com/safing/portbase/info"
|
"github.com/safing/portbase/info"
|
||||||
"github.com/safing/portbase/log"
|
"github.com/safing/portbase/run"
|
||||||
"github.com/safing/portbase/modules"
|
|
||||||
// include packages here
|
// include packages here
|
||||||
|
_ "github.com/safing/portbase/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|
||||||
// Set Info
|
// Set Info
|
||||||
info.Set("Portbase", "0.0.1", "GPLv3", false)
|
info.Set("Portbase", "0.0.1", "GPLv3", false)
|
||||||
|
|
||||||
// Start
|
// Run
|
||||||
err := modules.Start()
|
os.Exit(run.Run())
|
||||||
if err != nil {
|
|
||||||
if err == modules.ErrCleanExit {
|
|
||||||
os.Exit(0)
|
|
||||||
} else {
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Shutdown
|
|
||||||
// catch interrupt for clean shutdown
|
|
||||||
signalCh := make(chan os.Signal, 3)
|
|
||||||
signal.Notify(
|
|
||||||
signalCh,
|
|
||||||
os.Interrupt,
|
|
||||||
syscall.SIGHUP,
|
|
||||||
syscall.SIGINT,
|
|
||||||
syscall.SIGTERM,
|
|
||||||
syscall.SIGQUIT,
|
|
||||||
)
|
|
||||||
select {
|
|
||||||
case <-signalCh:
|
|
||||||
fmt.Println(" <INTERRUPT>")
|
|
||||||
log.Warning("main: program was interrupted, shutting down.")
|
|
||||||
_ = modules.Shutdown()
|
|
||||||
case <-modules.ShuttingDown():
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
116
template/module.go
Normal file
116
template/module.go
Normal file
|
@ -0,0 +1,116 @@
|
||||||
|
package template
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/safing/portbase/config"
|
||||||
|
"github.com/safing/portbase/modules"
|
||||||
|
"github.com/safing/portbase/modules/subsystems"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
eventStateUpdate = "state update"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
module *modules.Module
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// register module
|
||||||
|
module = modules.Register("template", prep, start, stop) // add dependencies...
|
||||||
|
|
||||||
|
// register events that other modules can subscribe to
|
||||||
|
module.RegisterEvent(eventStateUpdate)
|
||||||
|
}
|
||||||
|
|
||||||
|
func prep() error {
|
||||||
|
// register module as subsystem
|
||||||
|
err := subsystems.Register(
|
||||||
|
"Template Subsystem", // name
|
||||||
|
"This subsystem is a template for quick setup", // description
|
||||||
|
module,
|
||||||
|
"config:template", // key space for configuration options registered
|
||||||
|
&config.Option{
|
||||||
|
Name: "Enable Template Subsystem",
|
||||||
|
Key: "config:subsystems/template",
|
||||||
|
Description: "This option enables the Template Subsystem [TEMPLATE]",
|
||||||
|
OptType: config.OptTypeBool,
|
||||||
|
DefaultValue: false,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// register options
|
||||||
|
err = config.Register(&config.Option{
|
||||||
|
Name: "language",
|
||||||
|
Key: "config:template/language",
|
||||||
|
Description: "Sets the language for the template [TEMPLATE]",
|
||||||
|
OptType: config.OptTypeString,
|
||||||
|
ExpertiseLevel: config.ExpertiseLevelUser, // default
|
||||||
|
ReleaseLevel: config.ReleaseLevelStable, // default
|
||||||
|
RequiresRestart: false, // default
|
||||||
|
DefaultValue: "en",
|
||||||
|
ValidationRegex: "^[a-z]{2}$",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// register event hooks
|
||||||
|
// do this in prep() and not in start(), as we don't want to register again if module is turned off and on again
|
||||||
|
err = module.RegisterEventHook(
|
||||||
|
"template", // event source module name
|
||||||
|
"state update", // event source name
|
||||||
|
"react to state changes", // description of hook function
|
||||||
|
eventHandler, // hook function
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// hint: event hooks and tasks will not be run if module isn't online
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func start() error {
|
||||||
|
// register tasks
|
||||||
|
module.NewTask("do something", taskFn).Queue()
|
||||||
|
|
||||||
|
// start service worker
|
||||||
|
module.StartServiceWorker("do something", 0, serviceWorker)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func stop() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func serviceWorker(ctx context.Context) error {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
err := do()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func taskFn(ctx context.Context, task *modules.Task) error {
|
||||||
|
return do()
|
||||||
|
}
|
||||||
|
|
||||||
|
func eventHandler(ctx context.Context, data interface{}) error {
|
||||||
|
return do()
|
||||||
|
}
|
||||||
|
|
||||||
|
func do() error {
|
||||||
|
return nil
|
||||||
|
}
|
42
template/module_test.go
Normal file
42
template/module_test.go
Normal file
|
@ -0,0 +1,42 @@
|
||||||
|
package template
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/safing/portbase/dataroot"
|
||||||
|
"github.com/safing/portbase/modules"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
// tmp dir for data root (db & config)
|
||||||
|
tmpDir, err := ioutil.TempDir("", "portbase-testing-")
|
||||||
|
// initialize data dir
|
||||||
|
if err == nil {
|
||||||
|
err = dataroot.Initialize(tmpDir, 0755)
|
||||||
|
}
|
||||||
|
// start modules
|
||||||
|
if err == nil {
|
||||||
|
err = modules.Start()
|
||||||
|
}
|
||||||
|
// handle setup error
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "failed to setup test: %s", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// run tests
|
||||||
|
exitCode := m.Run()
|
||||||
|
|
||||||
|
// shutdown
|
||||||
|
_ = modules.Shutdown()
|
||||||
|
if modules.GetExitStatusCode() != 0 {
|
||||||
|
exitCode = modules.GetExitStatusCode()
|
||||||
|
fmt.Fprintf(os.Stderr, "failed to cleanly shutdown test: %s", err)
|
||||||
|
}
|
||||||
|
// clean up and exit
|
||||||
|
os.RemoveAll(tmpDir)
|
||||||
|
os.Exit(exitCode)
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue