Refactor subsystem package

This commit is contained in:
Patrick Pacher 2020-09-21 15:31:21 +02:00
parent 511cb22759
commit 1b55c73559
No known key found for this signature in database
GPG key ID: E8CD2DA160925A6D
8 changed files with 393 additions and 278 deletions

View file

@ -1,18 +1,28 @@
/*
Package modules provides a full module and task management ecosystem to cleanly put all big and small moving parts of a service together.
Modules are started in a multi-stage process and may depend on other modules:
- Go's init(): register flags
- prep: check flags, register config variables
- start: start actual work, access config
- stop: gracefully shut down
Workers: A simple function that is run by the module while catching panics and reporting them. Ideal for long running (possibly) idle goroutines. Can be automatically restarted if execution ends with an error.
Tasks: Functions that take somewhere between a couple seconds and a couple minutes to execute and should be queued, scheduled or repeated.
MicroTasks: Functions that take less than a second to execute, but require lots of resources. Running such functions as MicroTasks will reduce concurrent execution and shall improve performance.
Ideally, _any_ execution by a module is done through these methods. This will not only ensure that all panics are caught, but will also give better insights into how your service performs.
*/
// Package modules provides a full module and task management ecosystem to
// cleanly put all big and small moving parts of a service together.
//
// Modules are started in a multi-stage process and may depend on other
// modules:
// - Go's init(): register flags
// - prep: check flags, register config variables
// - start: start actual work, access config
// - stop: gracefully shut down
//
// **Workers**
// A simple function that is run by the module while catching
// panics and reporting them. Ideal for long running (possibly) idle goroutines.
// Can be automatically restarted if execution ends with an error.
//
// **Tasks**
// Functions that take somewhere between a couple seconds and a couple
// minutes to execute and should be queued, scheduled or repeated.
//
// **MicroTasks**
// Functions that take less than a second to execute, but require
// lots of resources. Running such functions as MicroTasks will reduce concurrent
// execution and shall improve performance.
//
// Ideally, _any_ execution by a module is done through these methods. This will
// not only ensure that all panics are caught, but will also give better insights
// into how your service performs.
package modules

View file

@ -12,17 +12,20 @@ var (
modulesChangeNotifyFn func(*Module)
)
// Enable enables the module. Only has an effect if module management is enabled.
// Enable enables the module. Only has an effect if module management
// is enabled.
func (m *Module) Enable() (changed bool) {
return m.enabled.SetToIf(false, true)
}
// Disable disables the module. Only has an effect if module management is enabled.
// Disable disables the module. Only has an effect if module management
// is enabled.
func (m *Module) Disable() (changed bool) {
return m.enabled.SetToIf(true, false)
}
// SetEnabled sets the module to the desired enabled state. Only has an effect if module management is enabled.
// SetEnabled sets the module to the desired enabled state. Only has
// an effect if module management is enabled.
func (m *Module) SetEnabled(enable bool) (changed bool) {
if enable {
return m.Enable()
@ -35,16 +38,36 @@ func (m *Module) Enabled() bool {
return m.enabled.IsSet()
}
// EnabledAsDependency returns whether or not the module is currently enabled as a dependency.
// EnabledAsDependency returns whether or not the module is currently
// enabled as a dependency.
func (m *Module) EnabledAsDependency() bool {
return m.enabledAsDependency.IsSet()
}
// EnableModuleManagement enables the module management functionality within modules. The supplied notify function will be called whenever the status of a module changes. The affected module will be in the parameter. You will need to manually enable modules, else nothing will start.
func EnableModuleManagement(changeNotifyFn func(*Module)) {
// EnableModuleManagement enables the module management functionality
// within modules. The supplied notify function will be called whenever
// the status of a module changes. The affected module will be in the
// parameter. You will need to manually enable modules, else nothing
// will start.
// EnableModuleManagement returns true if changeNotifyFn has been set
// and it has been called for the first time.
//
// Example:
//
// EnableModuleManagement(func(m *modules.Module) {
// // some module has changed ...
// // do what ever you like
//
// // Run the built-in module management
// modules.ManageModules()
// })
//
func EnableModuleManagement(changeNotifyFn func(*Module)) bool {
if moduleMgmtEnabled.SetToIf(false, true) {
modulesChangeNotifyFn = changeNotifyFn
return true
}
return false
}
func (m *Module) notifyOfChange() {
@ -56,7 +79,8 @@ func (m *Module) notifyOfChange() {
}
}
// ManageModules triggers the module manager to react to recent changes of enabled modules.
// ManageModules triggers the module manager to react to recent changes of
// enabled modules.
func ManageModules() error {
// check if enabled
if !moduleMgmtEnabled.IsSet() {

View file

@ -4,27 +4,32 @@ import (
"context"
"flag"
"fmt"
"strings"
"github.com/safing/portbase/config"
"github.com/safing/portbase/database"
_ "github.com/safing/portbase/database/dbmodule" // database module is required
"github.com/safing/portbase/modules"
"github.com/safing/portbase/runtime"
)
const (
configChangeEvent = "config change"
subsystemsStatusChange = "status change"
)
const configChangeEvent = "config change"
var (
// DefaultManager is the default subsystem registry.
DefaultManager *Manager
module *modules.Module
printGraphFlag bool
databaseKeySpace string
db = database.NewInterface(nil)
)
// Register registeres a new subsystem. It's like Manager.Register
// but uses DefaultManager and panics on error.
func Register(id, name, description string, module *modules.Module, configKeySpace string, option *config.Option) {
err := DefaultManager.Register(id, name, description, module, configKeySpace, option)
if err != nil {
panic(err)
}
}
func init() {
// The subsystem layer takes over module management. Note that
// no one must have called EnableModuleManagement. Otherwise
@ -32,20 +37,30 @@ func init() {
// dependencies!
// TODO(ppacher): we SHOULD panic here!
// TASK(#1431)
modules.EnableModuleManagement(handleModuleChanges)
module = modules.Register("subsystems", prep, start, nil, "config", "database", "base")
modules.EnableModuleManagement(func(m *modules.Module) {
if DefaultManager == nil {
return
}
DefaultManager.handleModuleUpdate(m)
})
module = modules.Register("subsystems", prep, start, nil, "config", "database", "runtime", "base")
module.Enable()
// register event for changes in the subsystem
module.RegisterEvent(subsystemsStatusChange)
// TODO(ppacher): can we create the default registry during prep phase?
var err error
DefaultManager, err = NewManager(runtime.DefaultRegistry)
if err != nil {
panic("Failed to create default registry: " + err.Error())
}
flag.BoolVar(&printGraphFlag, "print-subsystem-graph", false, "print the subsystem module dependency graph")
}
func prep() error {
if printGraphFlag {
printGraph()
DefaultManager.PrintGraph()
return modules.ErrCleanExit
}
@ -56,7 +71,18 @@ func prep() error {
"config",
configChangeEvent,
"control subsystems",
handleConfigChanges,
func(ctx context.Context, _ interface{}) error {
err := DefaultManager.CheckConfig(ctx)
if err != nil {
module.Error(
"modulemgmt-failed",
fmt.Sprintf("The subsystem framework failed to start or stop one or more modules.\nError: %s\nCheck logs for more information.", err),
)
return nil
}
module.Resolve("modulemgmt-failed")
return nil
},
); err != nil {
return fmt.Errorf("register event hook: %w", err)
}
@ -68,87 +94,37 @@ func start() error {
// Registration of subsystems is only allowed during
// preperation. Make sure any further call to Register()
// panics.
subsystemsLocked.Set()
subsystemsLock.Lock()
defer subsystemsLock.Unlock()
seen := make(map[string]struct{}, len(subsystems))
configKeyPrefixes := make(map[string]*Subsystem, len(subsystems))
// mark all sub-systems as seen. This prevents sub-systems
// from being added as a sub-systems dependency in addAndMarkDependencies.
for _, sub := range subsystems {
seen[sub.module.Name] = struct{}{}
configKeyPrefixes[sub.ConfigKeySpace] = sub
if err := DefaultManager.Start(); err != nil {
return err
}
// aggregate all modules dependencies (and the subsystem module itself)
// into the Modules slice. Configuration options form dependened modules
// will be marked using config.SubsystemAnnotation if not already set.
for _, sub := range subsystems {
sub.Modules = append(sub.Modules, statusFromModule(sub.module))
sub.addDependencies(sub.module, seen)
}
module.StartWorker("initial subsystem configuration", DefaultManager.CheckConfig)
// Annotate all configuration options with their respective subsystem.
config.ForEachOption(func(opt *config.Option) error {
subsys, ok := configKeyPrefixes[opt.Key]
if !ok {
return nil
}
// Add a new subsystem annotation is it is not already set!
opt.AddAnnotation(config.SubsystemAnnotation, subsys.ID)
return nil
})
// apply config
module.StartWorker("initial subsystem configuration", func(ctx context.Context) error {
return handleConfigChanges(module.Ctx, nil)
})
return nil
}
func (sub *Subsystem) addDependencies(module *modules.Module, seen map[string]struct{}) {
for _, module := range module.Dependencies() {
if _, ok := seen[module.Name]; !ok {
seen[module.Name] = struct{}{}
// PrintGraph prints the subsystem and module graph.
func (reg *Manager) PrintGraph() {
reg.l.RLock()
defer reg.l.RUnlock()
sub.Modules = append(sub.Modules, statusFromModule(module))
sub.addDependencies(module, seen)
}
}
}
// SetDatabaseKeySpace sets a key space where subsystem status
func SetDatabaseKeySpace(keySpace string) {
if databaseKeySpace == "" {
databaseKeySpace = keySpace
if !strings.HasSuffix(databaseKeySpace, "/") {
databaseKeySpace += "/"
}
}
}
func printGraph() {
fmt.Println("subsystems dependency graph:")
// unmark subsystems module
module.Disable()
// mark roots
for _, sub := range subsystems {
for _, sub := range reg.subsys {
sub.module.Enable() // mark as tree root
}
for _, sub := range subsystems {
for _, sub := range reg.subsys {
printModuleGraph("", sub.module, true)
}
fmt.Println("\nsubsystem module groups:")
_ = start() // no errors for what we need here
for _, sub := range subsystems {
for _, sub := range reg.subsys {
fmt.Printf("├── %s\n", sub.Name)
for _, mod := range sub.Modules[1:] {
fmt.Printf("│ ├── %s\n", mod.Name)

View file

@ -0,0 +1,267 @@
package subsystems
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/safing/portbase/config"
"github.com/safing/portbase/database/record"
"github.com/safing/portbase/modules"
"github.com/safing/portbase/runtime"
"github.com/tevino/abool"
)
var (
// ErrManagerStarted is returned when subsystem registration attempt
// occurs after the manager has been started.
ErrManagerStarted = errors.New("subsystem manager already started")
// ErrDuplicateSubsystem is returned when the subsystem to be registered
// is alreadey known (duplicated subsystem ID).
ErrDuplicateSubsystem = errors.New("subsystem is already registered")
)
// Manager manages subsystems, provides access via a runtime
// value providers and can takeover module management.
type Manager struct {
l sync.RWMutex
subsys map[string]*Subsystem
pushUpdate runtime.PushFunc
immutable *abool.AtomicBool
debounceUpdate *abool.AtomicBool
runtime *runtime.Registry
}
// NewManager returns a new subsystem manager that registeres
// itself at rtReg.
func NewManager(rtReg *runtime.Registry) (*Manager, error) {
mng := &Manager{
subsys: make(map[string]*Subsystem),
immutable: abool.New(),
debounceUpdate: abool.New(),
}
push, err := rtReg.Register("subsystems/", runtime.SimpleValueGetterFunc(mng.Get))
if err != nil {
return nil, err
}
mng.pushUpdate = push
mng.runtime = rtReg
return mng, nil
}
// Start starts managing subsystems. Note that it's not possible
// to define new subsystems once Start() has been called.
func (mng *Manager) Start() error {
mng.immutable.Set()
seen := make(map[string]struct{}, len(mng.subsys))
configKeyPrefixes := make(map[string]*Subsystem, len(mng.subsys))
// mark all sub-systems as seen. This prevents sub-systems
// from being added as a sub-systems dependency in addAndMarkDependencies.
for _, sub := range mng.subsys {
seen[sub.module.Name] = struct{}{}
configKeyPrefixes[sub.ConfigKeySpace] = sub
}
// aggmngate all modules dependencies (and the subsystem module itself)
// into the Modules slice. Configuration options form dependened modules
// will be marked using config.SubsystemAnnotation if not already set.
for _, sub := range mng.subsys {
sub.Modules = append(sub.Modules, statusFromModule(sub.module))
sub.addDependencies(sub.module, seen)
}
// Annotate all configuration options with their respective subsystem.
_ = config.ForEachOption(func(opt *config.Option) error {
subsys, ok := configKeyPrefixes[opt.Key]
if !ok {
return nil
}
// Add a new subsystem annotation is it is not already set!
opt.AddAnnotation(config.SubsystemAnnotation, subsys.ID)
return nil
})
return nil
}
// Get implements runtime.ValueProvider
func (mng *Manager) Get(keyOrPrefix string) ([]record.Record, error) {
mng.l.RLock()
defer mng.l.RUnlock()
dbName := mng.runtime.DatabaseName()
records := make([]record.Record, 0, len(mng.subsys))
for _, subsys := range mng.subsys {
subsys.Lock()
if !subsys.KeyIsSet() {
subsys.SetKey(dbName + ":subsystems/" + subsys.ID)
}
if strings.HasPrefix(subsys.DatabaseKey(), keyOrPrefix) {
records = append(records, subsys)
}
subsys.Unlock()
}
// make sure the order is always the same
sort.Sort(bySubsystemID(records))
return records, nil
}
// Register registeres a new subsystem. The given option must be a bool option.
// Should be called in init() directly after the modules.Register() function.
// The config option must not yet be registered and will be registered for
// you. Pass a nil option to force enable.
//
// TODO(ppacher): IMHO the subsystem package is not responsible of registering
// the "toggle option". This would also remove runtime
// dependency to the config package. Users should either pass
// the BoolOptionFunc and the expertise/release level directly
// or just pass the configuration key so those information can
// be looked up by the registry.
func (mng *Manager) Register(id, name, description string, module *modules.Module, configKeySpace string, option *config.Option) error {
mng.l.Lock()
defer mng.l.Unlock()
if mng.immutable.IsSet() {
return ErrManagerStarted
}
if _, ok := mng.subsys[id]; ok {
return ErrDuplicateSubsystem
}
s := &Subsystem{
ID: id,
Name: name,
Description: description,
ConfigKeySpace: configKeySpace,
module: module,
toggleOption: option,
}
if s.toggleOption != nil {
s.ToggleOptionKey = s.toggleOption.Key
s.ExpertiseLevel = s.toggleOption.ExpertiseLevel
s.ReleaseLevel = s.toggleOption.ReleaseLevel
if err := config.Register(s.toggleOption); err != nil {
return fmt.Errorf("failed to register subsystem option: %w", err)
}
s.toggleValue = config.GetAsBool(s.ToggleOptionKey, false)
} else {
s.toggleValue = func() bool { return true }
}
mng.subsys[id] = s
return nil
}
func (mng *Manager) shouldServeUpdates() bool {
if !mng.immutable.IsSet() {
// the manager must be marked as immutable before we
// are going to handle any module changes.
return false
}
if modules.IsShuttingDown() {
// we don't care if we are shutting down anyway
return false
}
return true
}
// CheckConfig checks subsystem configuration values and enables
// or disables subsystems and their dependencies as required.
func (mng *Manager) CheckConfig(ctx context.Context) error {
return mng.handleConfigChanges(ctx)
}
func (mng *Manager) handleModuleUpdate(m *modules.Module) {
if !mng.shouldServeUpdates() {
return
}
// Read lock is fine as the subsystems are write-locked on their own
mng.l.RLock()
defer mng.l.RUnlock()
subsys, ms := mng.findParentSubsystem(m)
if subsys == nil {
// the updated module is not handled by any
// subsystem. We're done here.
return
}
subsys.Lock()
updated := compareAndUpdateStatus(m, ms)
if updated {
subsys.makeSummary()
}
subsys.Unlock()
if updated {
mng.pushUpdate(subsys)
}
}
func (mng *Manager) handleConfigChanges(_ context.Context) error {
if !mng.shouldServeUpdates() {
return nil
}
if mng.debounceUpdate.SetToIf(false, true) {
time.Sleep(100 * time.Millisecond)
mng.debounceUpdate.UnSet()
} else {
return nil
}
mng.l.RLock()
defer mng.l.RUnlock()
var changed bool
for _, subsystem := range mng.subsys {
if subsystem.module.SetEnabled(subsystem.toggleValue()) {
changed = true
}
}
if !changed {
return nil
}
return modules.ManageModules()
}
func (mng *Manager) findParentSubsystem(m *modules.Module) (*Subsystem, *ModuleStatus) {
for _, subsys := range mng.subsys {
for _, ms := range subsys.Modules {
if ms.Name == m.Name {
return subsys, ms
}
}
}
return nil, nil
}
// helper type to sort a slice of []*Subsystem (casted as []record.Record) by
// id. Only use if it's guaranteed that all record.Records are *Subsystem.
// Otherwise Less() will panic.
type bySubsystemID []record.Record
func (sl bySubsystemID) Less(i, j int) bool { return sl[i].(*Subsystem).ID < sl[j].(*Subsystem).ID }
func (sl bySubsystemID) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] }
func (sl bySubsystemID) Len() int { return len(sl) }

View file

@ -5,7 +5,6 @@ import (
"github.com/safing/portbase/config"
"github.com/safing/portbase/database/record"
"github.com/safing/portbase/log"
"github.com/safing/portbase/modules"
)
@ -66,19 +65,14 @@ type ModuleStatus struct {
FailureMsg string
}
// Save saves the Subsystem Status to the database.
func (sub *Subsystem) Save() {
if databaseKeySpace == "" {
return
}
func (sub *Subsystem) addDependencies(module *modules.Module, seen map[string]struct{}) {
for _, module := range module.Dependencies() {
if _, ok := seen[module.Name]; !ok {
seen[module.Name] = struct{}{}
if !sub.KeyIsSet() {
sub.SetKey(databaseKeySpace + sub.ID)
}
err := db.Put(sub)
if err != nil {
log.Errorf("subsystems: could not save subsystem status to database: %s", err)
sub.Modules = append(sub.Modules, statusFromModule(module))
sub.addDependencies(module, seen)
}
}
}

View file

@ -1,158 +0,0 @@
package subsystems
import (
"context"
"fmt"
"sync"
"time"
"github.com/tevino/abool"
"github.com/safing/portbase/config"
"github.com/safing/portbase/modules"
)
var (
subsystemsLock sync.Mutex
subsystems []*Subsystem
subsystemsMap = make(map[string]*Subsystem)
subsystemsLocked = abool.New()
handlingConfigChanges = abool.New()
)
// Register registers a new subsystem. The given option must be a bool option.
// Should be called in init() directly after the modules.Register() function.
// The config option must not yet be registered and will be registered for
// you. Pass a nil option to force enable.
func Register(id, name, description string, module *modules.Module, configKeySpace string, option *config.Option) {
// lock slice and map
subsystemsLock.Lock()
defer subsystemsLock.Unlock()
// check if registration is closed
if subsystemsLocked.IsSet() {
panic("subsystems can only be registered in prep phase or earlier")
}
// check if already registered
if _, ok := subsystemsMap[name]; ok {
panic(fmt.Sprintf(`subsystem "%s" already registered`, name))
}
// create new
new := &Subsystem{
ID: id,
Name: name,
Description: description,
module: module,
toggleOption: option,
ConfigKeySpace: configKeySpace,
}
if new.toggleOption != nil {
new.ToggleOptionKey = new.toggleOption.Key
new.ExpertiseLevel = new.toggleOption.ExpertiseLevel
new.ReleaseLevel = new.toggleOption.ReleaseLevel
}
// register config
if option != nil {
err := config.Register(option)
if err != nil {
panic(fmt.Sprintf("failed to register config: %s", err))
}
new.toggleValue = config.GetAsBool(new.ToggleOptionKey, false)
} else {
// force enabled
new.toggleValue = func() bool { return true }
}
subsystemsMap[name] = new
subsystems = append(subsystems, new)
}
func handleModuleChanges(m *modules.Module) {
// check if ready
if !subsystemsLocked.IsSet() {
return
}
// check if shutting down
if modules.IsShuttingDown() {
return
}
// find module status
var moduleSubsystem *Subsystem
var moduleStatus *ModuleStatus
subsystemLoop:
for _, subsystem := range subsystems {
for _, status := range subsystem.Modules {
if m.Name == status.Name {
moduleSubsystem = subsystem
moduleStatus = status
break subsystemLoop
}
}
}
// abort if not found
if moduleSubsystem == nil || moduleStatus == nil {
return
}
// update status
moduleSubsystem.Lock()
changed := compareAndUpdateStatus(m, moduleStatus)
if changed {
moduleSubsystem.makeSummary()
}
moduleSubsystem.Unlock()
// save
if changed {
moduleSubsystem.Save()
}
}
func handleConfigChanges(ctx context.Context, _ interface{}) error {
// bail out early if we haven't started yet or are already
// shutting down
if !subsystemsLocked.IsSet() || modules.IsShuttingDown() {
return nil
}
// potentially catch multiple changes
if handlingConfigChanges.SetToIf(false, true) {
time.Sleep(100 * time.Millisecond)
handlingConfigChanges.UnSet()
} else {
return nil
}
// only run one instance at any time
subsystemsLock.Lock()
defer subsystemsLock.Unlock()
var changed bool
for _, subsystem := range subsystems {
if subsystem.module.SetEnabled(subsystem.toggleValue()) {
// if changed
changed = true
}
}
if !changed {
return nil
}
err := modules.ManageModules()
if err != nil {
module.Error(
"modulemgmt-failed",
fmt.Sprintf("The subsystem framework failed to start or stop one or more modules.\nError: %s\nCheck logs for more information.", err),
)
} else {
module.Resolve("modulemgmt-failed")
}
return nil
}

View file

@ -50,7 +50,7 @@ func TestSubsystems(t *testing.T) {
DefaultValue: false,
},
)
sub1 := subsystemsMap["Feature One"]
sub1 := DefaultManager.subsys["feature-one"]
feature2 := modules.Register("feature2", nil, nil, nil)
Register(

View file

@ -215,15 +215,17 @@ func (r *Registry) Query(q *query.Query, local, internal bool) (*iterator.Iterat
for _, r := range records {
r.Lock()
if q.MatchesKey(r.DatabaseKey()) ||
!r.Meta().CheckValidity() ||
!r.Meta().CheckPermission(local, internal) ||
!q.MatchesRecord(r) {
var (
matchesKey = q.MatchesKey(r.DatabaseKey())
isValid = r.Meta().CheckValidity()
isAllowed = r.Meta().CheckPermission(local, internal)
matchesRecord = q.MatchesRecord(r)
)
r.Unlock()
r.Unlock()
if !(matchesKey && isValid && isAllowed && matchesRecord) {
continue
}
r.Unlock()
select {
case iter.Next <- r: