From 50a10485e1299d16f15094ee70574f4d6d8f302c Mon Sep 17 00:00:00 2001 From: Patrick Pacher Date: Mon, 21 Sep 2020 15:18:54 +0200 Subject: [PATCH 1/4] Fix race condition in database package. Simplify db locking --- database/controller.go | 62 ++++++++++++++++++---------------------- database/hook.go | 12 +++----- database/subscription.go | 6 ++-- 3 files changed, 34 insertions(+), 46 deletions(-) diff --git a/database/controller.go b/database/controller.go index 4f7ae3e..6b97239 100644 --- a/database/controller.go +++ b/database/controller.go @@ -21,12 +21,7 @@ type Controller struct { hooks []*RegisteredHook subscriptions []*Subscription - writeLock sync.RWMutex - // Lock: nobody may write - // RLock: concurrent writing - readLock sync.RWMutex - // Lock: nobody may read - // RLock: concurrent reading + exclusiveAccess sync.RWMutex migrating *abool.AtomicBool // TODO hibernating *abool.AtomicBool // TODO @@ -53,8 +48,8 @@ func (c *Controller) Injected() bool { // Get return the record with the given key. func (c *Controller) Get(key string) (record.Record, error) { - c.readLock.RLock() - defer c.readLock.RUnlock() + c.exclusiveAccess.RLock() + defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { return nil, ErrShuttingDown @@ -101,8 +96,8 @@ func (c *Controller) Get(key string) (record.Record, error) { // Put saves a record in the database. func (c *Controller) Put(r record.Record) (err error) { - c.writeLock.RLock() - defer c.writeLock.RUnlock() + c.exclusiveAccess.RLock() + defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { return ErrShuttingDown @@ -145,8 +140,8 @@ func (c *Controller) Put(r record.Record) (err error) { // PutMany stores many records in the database. func (c *Controller) PutMany() (chan<- record.Record, <-chan error) { - c.writeLock.RLock() - defer c.writeLock.RUnlock() + c.exclusiveAccess.RLock() + defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { errs := make(chan error, 1) @@ -171,16 +166,16 @@ func (c *Controller) PutMany() (chan<- record.Record, <-chan error) { // Query executes the given query on the database. func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { - c.readLock.RLock() + c.exclusiveAccess.RLock() if shuttingDown.IsSet() { - c.readLock.RUnlock() + c.exclusiveAccess.RUnlock() return nil, ErrShuttingDown } it, err := c.storage.Query(q, local, internal) if err != nil { - c.readLock.RUnlock() + c.exclusiveAccess.RUnlock() return nil, err } @@ -191,15 +186,19 @@ func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iter // PushUpdate pushes a record update to subscribers. func (c *Controller) PushUpdate(r record.Record) { if c != nil { - c.readLock.RLock() - defer c.readLock.RUnlock() + c.exclusiveAccess.RLock() + defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { return } for _, sub := range c.subscriptions { - if r.Meta().CheckPermission(sub.local, sub.internal) && sub.q.Matches(r) { + r.Lock() + push := r.Meta().CheckPermission(sub.local, sub.internal) && sub.q.Matches(r) + r.Unlock() + + if push { select { case sub.Feed <- r: default: @@ -210,10 +209,8 @@ func (c *Controller) PushUpdate(r record.Record) { } func (c *Controller) addSubscription(sub *Subscription) { - c.readLock.Lock() - defer c.readLock.Unlock() - c.writeLock.Lock() - defer c.writeLock.Unlock() + c.exclusiveAccess.Lock() + defer c.exclusiveAccess.Unlock() if shuttingDown.IsSet() { return @@ -223,14 +220,14 @@ func (c *Controller) addSubscription(sub *Subscription) { } func (c *Controller) readUnlockerAfterQuery(it *iterator.Iterator) { + defer c.exclusiveAccess.RUnlock() <-it.Done - c.readLock.RUnlock() } // Maintain runs the Maintain method on the storage. func (c *Controller) Maintain(ctx context.Context) error { - c.writeLock.RLock() - defer c.writeLock.RUnlock() + c.exclusiveAccess.RLock() + defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { return nil @@ -241,8 +238,8 @@ func (c *Controller) Maintain(ctx context.Context) error { // MaintainThorough runs the MaintainThorough method on the storage. func (c *Controller) MaintainThorough(ctx context.Context) error { - c.writeLock.RLock() - defer c.writeLock.RUnlock() + c.exclusiveAccess.RLock() + defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { return nil @@ -253,8 +250,8 @@ func (c *Controller) MaintainThorough(ctx context.Context) error { // MaintainRecordStates runs the record state lifecycle maintenance on the storage. func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { - c.writeLock.RLock() - defer c.writeLock.RUnlock() + c.exclusiveAccess.RLock() + defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { return nil @@ -265,11 +262,8 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor // Shutdown shuts down the storage. func (c *Controller) Shutdown() error { - // acquire full locks - c.readLock.Lock() - defer c.readLock.Unlock() - c.writeLock.Lock() - defer c.writeLock.Unlock() + c.exclusiveAccess.Lock() + defer c.exclusiveAccess.Unlock() return c.storage.Shutdown() } diff --git a/database/hook.go b/database/hook.go index e6649b2..43a1eaf 100644 --- a/database/hook.go +++ b/database/hook.go @@ -35,10 +35,8 @@ func RegisterHook(q *query.Query, hook Hook) (*RegisteredHook, error) { return nil, err } - c.readLock.Lock() - defer c.readLock.Unlock() - c.writeLock.Lock() - defer c.writeLock.Unlock() + c.exclusiveAccess.Lock() + defer c.exclusiveAccess.Unlock() rh := &RegisteredHook{ q: q, @@ -55,10 +53,8 @@ func (h *RegisteredHook) Cancel() error { return err } - c.readLock.Lock() - defer c.readLock.Unlock() - c.writeLock.Lock() - defer c.writeLock.Unlock() + c.exclusiveAccess.Lock() + defer c.exclusiveAccess.Unlock() for key, hook := range c.hooks { if hook.q == h.q { diff --git a/database/subscription.go b/database/subscription.go index ffe516a..923b7f8 100644 --- a/database/subscription.go +++ b/database/subscription.go @@ -22,10 +22,8 @@ func (s *Subscription) Cancel() error { return err } - c.readLock.Lock() - defer c.readLock.Unlock() - c.writeLock.Lock() - defer c.writeLock.Unlock() + c.exclusiveAccess.Lock() + defer c.exclusiveAccess.Unlock() if s.canceled { return nil From 511cb227595e17214ffcad6e35a8008e69542798 Mon Sep 17 00:00:00 2001 From: Patrick Pacher Date: Mon, 21 Sep 2020 15:19:31 +0200 Subject: [PATCH 2/4] Fix incorrect type of SimpleValue(S)(G)etter --- runtime/provider.go | 12 ++++++++---- runtime/registry.go | 12 ++++++++++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/runtime/provider.go b/runtime/provider.go index 94cdbd5..806ac84 100644 --- a/runtime/provider.go +++ b/runtime/provider.go @@ -38,7 +38,7 @@ type ( // SimpleValueSetterFunc is a convenience type for implementing a // write-only value provider. - SimpleValueSetterFunc func(record.Record) error + SimpleValueSetterFunc func(record.Record) (record.Record, error) // SimpleValueGetterFunc is a convenience type for implementing a // read-only value provider. @@ -46,7 +46,7 @@ type ( ) // Set implements ValueProvider.Set and calls fn. -func (fn SimpleValueSetterFunc) Set(r record.Record) error { +func (fn SimpleValueSetterFunc) Set(r record.Record) (record.Record, error) { return fn(r) } @@ -56,11 +56,15 @@ func (SimpleValueSetterFunc) Get(_ string) ([]record.Record, error) { } // Set implements ValueProvider.Set and returns ErrReadOnly. -func (SimpleValueGetterFunc) Set(r record.Record) error { - return ErrReadOnly +func (SimpleValueGetterFunc) Set(r record.Record) (record.Record, error) { + return nil, ErrReadOnly } // Get implements ValueProvider.Get and calls fn. func (fn SimpleValueGetterFunc) Get(keyOrPrefix string) ([]record.Record, error) { return fn(keyOrPrefix) } + +// compile time checks +var _ ValueProvider = SimpleValueGetterFunc(nil) +var _ ValueProvider = SimpleValueSetterFunc(nil) diff --git a/runtime/registry.go b/runtime/registry.go index 445185a..1afc7fe 100644 --- a/runtime/registry.go +++ b/runtime/registry.go @@ -40,6 +40,7 @@ type Registry struct { l sync.RWMutex providers *radix.Tree dbController *database.Controller + dbName string } // keyedValueProvider simply wraps a value provider with it's @@ -60,6 +61,16 @@ func isPrefixKey(key string) bool { return strings.HasSuffix(key, "/") } +// DatabaseName returns the name of the database where the +// registry has been injected. It returns an empty string +// if InjectAsDatabase has not been called. +func (r *Registry) DatabaseName() string { + r.l.RLock() + defer r.l.RUnlock() + + return r.dbName +} + // InjectAsDatabase injects the registry as the storage // database for name. func (r *Registry) InjectAsDatabase(name string) error { @@ -75,6 +86,7 @@ func (r *Registry) InjectAsDatabase(name string) error { return err } + r.dbName = name r.dbController = ctrl return nil From 1b55c735591ba971ceea682efe8260208b5faee0 Mon Sep 17 00:00:00 2001 From: Patrick Pacher Date: Mon, 21 Sep 2020 15:31:21 +0200 Subject: [PATCH 3/4] Refactor subsystem package --- modules/doc.go | 44 +++-- modules/mgmt.go | 38 +++- modules/subsystems/module.go | 128 +++++------- modules/subsystems/registry.go | 267 ++++++++++++++++++++++++++ modules/subsystems/subsystem.go | 20 +- modules/subsystems/subsystems.go | 158 --------------- modules/subsystems/subsystems_test.go | 2 +- runtime/registry.go | 14 +- 8 files changed, 393 insertions(+), 278 deletions(-) create mode 100644 modules/subsystems/registry.go delete mode 100644 modules/subsystems/subsystems.go diff --git a/modules/doc.go b/modules/doc.go index 238ab35..ddd3c56 100644 --- a/modules/doc.go +++ b/modules/doc.go @@ -1,18 +1,28 @@ -/* -Package modules provides a full module and task management ecosystem to cleanly put all big and small moving parts of a service together. - -Modules are started in a multi-stage process and may depend on other modules: -- Go's init(): register flags -- prep: check flags, register config variables -- start: start actual work, access config -- stop: gracefully shut down - -Workers: A simple function that is run by the module while catching panics and reporting them. Ideal for long running (possibly) idle goroutines. Can be automatically restarted if execution ends with an error. - -Tasks: Functions that take somewhere between a couple seconds and a couple minutes to execute and should be queued, scheduled or repeated. - -MicroTasks: Functions that take less than a second to execute, but require lots of resources. Running such functions as MicroTasks will reduce concurrent execution and shall improve performance. - -Ideally, _any_ execution by a module is done through these methods. This will not only ensure that all panics are caught, but will also give better insights into how your service performs. -*/ +// Package modules provides a full module and task management ecosystem to +// cleanly put all big and small moving parts of a service together. +// +// Modules are started in a multi-stage process and may depend on other +// modules: +// - Go's init(): register flags +// - prep: check flags, register config variables +// - start: start actual work, access config +// - stop: gracefully shut down +// +// **Workers** +// A simple function that is run by the module while catching +// panics and reporting them. Ideal for long running (possibly) idle goroutines. +// Can be automatically restarted if execution ends with an error. +// +// **Tasks** +// Functions that take somewhere between a couple seconds and a couple +// minutes to execute and should be queued, scheduled or repeated. +// +// **MicroTasks** +// Functions that take less than a second to execute, but require +// lots of resources. Running such functions as MicroTasks will reduce concurrent +// execution and shall improve performance. +// +// Ideally, _any_ execution by a module is done through these methods. This will +// not only ensure that all panics are caught, but will also give better insights +// into how your service performs. package modules diff --git a/modules/mgmt.go b/modules/mgmt.go index 4081741..42a255f 100644 --- a/modules/mgmt.go +++ b/modules/mgmt.go @@ -12,17 +12,20 @@ var ( modulesChangeNotifyFn func(*Module) ) -// Enable enables the module. Only has an effect if module management is enabled. +// Enable enables the module. Only has an effect if module management +// is enabled. func (m *Module) Enable() (changed bool) { return m.enabled.SetToIf(false, true) } -// Disable disables the module. Only has an effect if module management is enabled. +// Disable disables the module. Only has an effect if module management +// is enabled. func (m *Module) Disable() (changed bool) { return m.enabled.SetToIf(true, false) } -// SetEnabled sets the module to the desired enabled state. Only has an effect if module management is enabled. +// SetEnabled sets the module to the desired enabled state. Only has +// an effect if module management is enabled. func (m *Module) SetEnabled(enable bool) (changed bool) { if enable { return m.Enable() @@ -35,16 +38,36 @@ func (m *Module) Enabled() bool { return m.enabled.IsSet() } -// EnabledAsDependency returns whether or not the module is currently enabled as a dependency. +// EnabledAsDependency returns whether or not the module is currently +// enabled as a dependency. func (m *Module) EnabledAsDependency() bool { return m.enabledAsDependency.IsSet() } -// EnableModuleManagement enables the module management functionality within modules. The supplied notify function will be called whenever the status of a module changes. The affected module will be in the parameter. You will need to manually enable modules, else nothing will start. -func EnableModuleManagement(changeNotifyFn func(*Module)) { +// EnableModuleManagement enables the module management functionality +// within modules. The supplied notify function will be called whenever +// the status of a module changes. The affected module will be in the +// parameter. You will need to manually enable modules, else nothing +// will start. +// EnableModuleManagement returns true if changeNotifyFn has been set +// and it has been called for the first time. +// +// Example: +// +// EnableModuleManagement(func(m *modules.Module) { +// // some module has changed ... +// // do what ever you like +// +// // Run the built-in module management +// modules.ManageModules() +// }) +// +func EnableModuleManagement(changeNotifyFn func(*Module)) bool { if moduleMgmtEnabled.SetToIf(false, true) { modulesChangeNotifyFn = changeNotifyFn + return true } + return false } func (m *Module) notifyOfChange() { @@ -56,7 +79,8 @@ func (m *Module) notifyOfChange() { } } -// ManageModules triggers the module manager to react to recent changes of enabled modules. +// ManageModules triggers the module manager to react to recent changes of +// enabled modules. func ManageModules() error { // check if enabled if !moduleMgmtEnabled.IsSet() { diff --git a/modules/subsystems/module.go b/modules/subsystems/module.go index 2ce233f..406ed53 100644 --- a/modules/subsystems/module.go +++ b/modules/subsystems/module.go @@ -4,27 +4,32 @@ import ( "context" "flag" "fmt" - "strings" "github.com/safing/portbase/config" - "github.com/safing/portbase/database" _ "github.com/safing/portbase/database/dbmodule" // database module is required "github.com/safing/portbase/modules" + "github.com/safing/portbase/runtime" ) -const ( - configChangeEvent = "config change" - subsystemsStatusChange = "status change" -) +const configChangeEvent = "config change" var ( + // DefaultManager is the default subsystem registry. + DefaultManager *Manager + module *modules.Module printGraphFlag bool - - databaseKeySpace string - db = database.NewInterface(nil) ) +// Register registeres a new subsystem. It's like Manager.Register +// but uses DefaultManager and panics on error. +func Register(id, name, description string, module *modules.Module, configKeySpace string, option *config.Option) { + err := DefaultManager.Register(id, name, description, module, configKeySpace, option) + if err != nil { + panic(err) + } +} + func init() { // The subsystem layer takes over module management. Note that // no one must have called EnableModuleManagement. Otherwise @@ -32,20 +37,30 @@ func init() { // dependencies! // TODO(ppacher): we SHOULD panic here! // TASK(#1431) - modules.EnableModuleManagement(handleModuleChanges) - module = modules.Register("subsystems", prep, start, nil, "config", "database", "base") + modules.EnableModuleManagement(func(m *modules.Module) { + if DefaultManager == nil { + return + } + DefaultManager.handleModuleUpdate(m) + }) + + module = modules.Register("subsystems", prep, start, nil, "config", "database", "runtime", "base") module.Enable() - // register event for changes in the subsystem - module.RegisterEvent(subsystemsStatusChange) + // TODO(ppacher): can we create the default registry during prep phase? + var err error + DefaultManager, err = NewManager(runtime.DefaultRegistry) + if err != nil { + panic("Failed to create default registry: " + err.Error()) + } flag.BoolVar(&printGraphFlag, "print-subsystem-graph", false, "print the subsystem module dependency graph") } func prep() error { if printGraphFlag { - printGraph() + DefaultManager.PrintGraph() return modules.ErrCleanExit } @@ -56,7 +71,18 @@ func prep() error { "config", configChangeEvent, "control subsystems", - handleConfigChanges, + func(ctx context.Context, _ interface{}) error { + err := DefaultManager.CheckConfig(ctx) + if err != nil { + module.Error( + "modulemgmt-failed", + fmt.Sprintf("The subsystem framework failed to start or stop one or more modules.\nError: %s\nCheck logs for more information.", err), + ) + return nil + } + module.Resolve("modulemgmt-failed") + return nil + }, ); err != nil { return fmt.Errorf("register event hook: %w", err) } @@ -68,87 +94,37 @@ func start() error { // Registration of subsystems is only allowed during // preperation. Make sure any further call to Register() // panics. - subsystemsLocked.Set() - - subsystemsLock.Lock() - defer subsystemsLock.Unlock() - - seen := make(map[string]struct{}, len(subsystems)) - configKeyPrefixes := make(map[string]*Subsystem, len(subsystems)) - // mark all sub-systems as seen. This prevents sub-systems - // from being added as a sub-systems dependency in addAndMarkDependencies. - for _, sub := range subsystems { - seen[sub.module.Name] = struct{}{} - configKeyPrefixes[sub.ConfigKeySpace] = sub + if err := DefaultManager.Start(); err != nil { + return err } - // aggregate all modules dependencies (and the subsystem module itself) - // into the Modules slice. Configuration options form dependened modules - // will be marked using config.SubsystemAnnotation if not already set. - for _, sub := range subsystems { - sub.Modules = append(sub.Modules, statusFromModule(sub.module)) - sub.addDependencies(sub.module, seen) - } + module.StartWorker("initial subsystem configuration", DefaultManager.CheckConfig) - // Annotate all configuration options with their respective subsystem. - config.ForEachOption(func(opt *config.Option) error { - subsys, ok := configKeyPrefixes[opt.Key] - if !ok { - return nil - } - - // Add a new subsystem annotation is it is not already set! - opt.AddAnnotation(config.SubsystemAnnotation, subsys.ID) - - return nil - }) - - // apply config - module.StartWorker("initial subsystem configuration", func(ctx context.Context) error { - return handleConfigChanges(module.Ctx, nil) - }) return nil } -func (sub *Subsystem) addDependencies(module *modules.Module, seen map[string]struct{}) { - for _, module := range module.Dependencies() { - if _, ok := seen[module.Name]; !ok { - seen[module.Name] = struct{}{} +// PrintGraph prints the subsystem and module graph. +func (reg *Manager) PrintGraph() { + reg.l.RLock() + defer reg.l.RUnlock() - sub.Modules = append(sub.Modules, statusFromModule(module)) - sub.addDependencies(module, seen) - } - } -} - -// SetDatabaseKeySpace sets a key space where subsystem status -func SetDatabaseKeySpace(keySpace string) { - if databaseKeySpace == "" { - databaseKeySpace = keySpace - - if !strings.HasSuffix(databaseKeySpace, "/") { - databaseKeySpace += "/" - } - } -} - -func printGraph() { fmt.Println("subsystems dependency graph:") // unmark subsystems module module.Disable() + // mark roots - for _, sub := range subsystems { + for _, sub := range reg.subsys { sub.module.Enable() // mark as tree root } - for _, sub := range subsystems { + for _, sub := range reg.subsys { printModuleGraph("", sub.module, true) } fmt.Println("\nsubsystem module groups:") _ = start() // no errors for what we need here - for _, sub := range subsystems { + for _, sub := range reg.subsys { fmt.Printf("├── %s\n", sub.Name) for _, mod := range sub.Modules[1:] { fmt.Printf("│ ├── %s\n", mod.Name) diff --git a/modules/subsystems/registry.go b/modules/subsystems/registry.go new file mode 100644 index 0000000..bc0c7cb --- /dev/null +++ b/modules/subsystems/registry.go @@ -0,0 +1,267 @@ +package subsystems + +import ( + "context" + "errors" + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/safing/portbase/config" + "github.com/safing/portbase/database/record" + "github.com/safing/portbase/modules" + "github.com/safing/portbase/runtime" + "github.com/tevino/abool" +) + +var ( + // ErrManagerStarted is returned when subsystem registration attempt + // occurs after the manager has been started. + ErrManagerStarted = errors.New("subsystem manager already started") + // ErrDuplicateSubsystem is returned when the subsystem to be registered + // is alreadey known (duplicated subsystem ID). + ErrDuplicateSubsystem = errors.New("subsystem is already registered") +) + +// Manager manages subsystems, provides access via a runtime +// value providers and can takeover module management. +type Manager struct { + l sync.RWMutex + subsys map[string]*Subsystem + pushUpdate runtime.PushFunc + immutable *abool.AtomicBool + debounceUpdate *abool.AtomicBool + runtime *runtime.Registry +} + +// NewManager returns a new subsystem manager that registeres +// itself at rtReg. +func NewManager(rtReg *runtime.Registry) (*Manager, error) { + mng := &Manager{ + subsys: make(map[string]*Subsystem), + immutable: abool.New(), + debounceUpdate: abool.New(), + } + + push, err := rtReg.Register("subsystems/", runtime.SimpleValueGetterFunc(mng.Get)) + if err != nil { + return nil, err + } + + mng.pushUpdate = push + mng.runtime = rtReg + + return mng, nil +} + +// Start starts managing subsystems. Note that it's not possible +// to define new subsystems once Start() has been called. +func (mng *Manager) Start() error { + mng.immutable.Set() + + seen := make(map[string]struct{}, len(mng.subsys)) + configKeyPrefixes := make(map[string]*Subsystem, len(mng.subsys)) + // mark all sub-systems as seen. This prevents sub-systems + // from being added as a sub-systems dependency in addAndMarkDependencies. + for _, sub := range mng.subsys { + seen[sub.module.Name] = struct{}{} + configKeyPrefixes[sub.ConfigKeySpace] = sub + } + + // aggmngate all modules dependencies (and the subsystem module itself) + // into the Modules slice. Configuration options form dependened modules + // will be marked using config.SubsystemAnnotation if not already set. + for _, sub := range mng.subsys { + sub.Modules = append(sub.Modules, statusFromModule(sub.module)) + sub.addDependencies(sub.module, seen) + } + + // Annotate all configuration options with their respective subsystem. + _ = config.ForEachOption(func(opt *config.Option) error { + subsys, ok := configKeyPrefixes[opt.Key] + if !ok { + return nil + } + + // Add a new subsystem annotation is it is not already set! + opt.AddAnnotation(config.SubsystemAnnotation, subsys.ID) + + return nil + }) + + return nil +} + +// Get implements runtime.ValueProvider +func (mng *Manager) Get(keyOrPrefix string) ([]record.Record, error) { + mng.l.RLock() + defer mng.l.RUnlock() + + dbName := mng.runtime.DatabaseName() + + records := make([]record.Record, 0, len(mng.subsys)) + for _, subsys := range mng.subsys { + subsys.Lock() + if !subsys.KeyIsSet() { + subsys.SetKey(dbName + ":subsystems/" + subsys.ID) + } + + if strings.HasPrefix(subsys.DatabaseKey(), keyOrPrefix) { + records = append(records, subsys) + } + subsys.Unlock() + } + + // make sure the order is always the same + sort.Sort(bySubsystemID(records)) + + return records, nil +} + +// Register registeres a new subsystem. The given option must be a bool option. +// Should be called in init() directly after the modules.Register() function. +// The config option must not yet be registered and will be registered for +// you. Pass a nil option to force enable. +// +// TODO(ppacher): IMHO the subsystem package is not responsible of registering +// the "toggle option". This would also remove runtime +// dependency to the config package. Users should either pass +// the BoolOptionFunc and the expertise/release level directly +// or just pass the configuration key so those information can +// be looked up by the registry. +func (mng *Manager) Register(id, name, description string, module *modules.Module, configKeySpace string, option *config.Option) error { + mng.l.Lock() + defer mng.l.Unlock() + + if mng.immutable.IsSet() { + return ErrManagerStarted + } + + if _, ok := mng.subsys[id]; ok { + return ErrDuplicateSubsystem + } + + s := &Subsystem{ + ID: id, + Name: name, + Description: description, + ConfigKeySpace: configKeySpace, + module: module, + toggleOption: option, + } + + if s.toggleOption != nil { + s.ToggleOptionKey = s.toggleOption.Key + s.ExpertiseLevel = s.toggleOption.ExpertiseLevel + s.ReleaseLevel = s.toggleOption.ReleaseLevel + + if err := config.Register(s.toggleOption); err != nil { + return fmt.Errorf("failed to register subsystem option: %w", err) + } + + s.toggleValue = config.GetAsBool(s.ToggleOptionKey, false) + } else { + s.toggleValue = func() bool { return true } + } + + mng.subsys[id] = s + + return nil +} + +func (mng *Manager) shouldServeUpdates() bool { + if !mng.immutable.IsSet() { + // the manager must be marked as immutable before we + // are going to handle any module changes. + return false + } + if modules.IsShuttingDown() { + // we don't care if we are shutting down anyway + return false + } + return true +} + +// CheckConfig checks subsystem configuration values and enables +// or disables subsystems and their dependencies as required. +func (mng *Manager) CheckConfig(ctx context.Context) error { + return mng.handleConfigChanges(ctx) +} + +func (mng *Manager) handleModuleUpdate(m *modules.Module) { + if !mng.shouldServeUpdates() { + return + } + + // Read lock is fine as the subsystems are write-locked on their own + mng.l.RLock() + defer mng.l.RUnlock() + + subsys, ms := mng.findParentSubsystem(m) + if subsys == nil { + // the updated module is not handled by any + // subsystem. We're done here. + return + } + + subsys.Lock() + updated := compareAndUpdateStatus(m, ms) + if updated { + subsys.makeSummary() + } + subsys.Unlock() + + if updated { + mng.pushUpdate(subsys) + } +} + +func (mng *Manager) handleConfigChanges(_ context.Context) error { + if !mng.shouldServeUpdates() { + return nil + } + + if mng.debounceUpdate.SetToIf(false, true) { + time.Sleep(100 * time.Millisecond) + mng.debounceUpdate.UnSet() + } else { + return nil + } + + mng.l.RLock() + defer mng.l.RUnlock() + + var changed bool + for _, subsystem := range mng.subsys { + if subsystem.module.SetEnabled(subsystem.toggleValue()) { + changed = true + } + } + if !changed { + return nil + } + + return modules.ManageModules() +} + +func (mng *Manager) findParentSubsystem(m *modules.Module) (*Subsystem, *ModuleStatus) { + for _, subsys := range mng.subsys { + for _, ms := range subsys.Modules { + if ms.Name == m.Name { + return subsys, ms + } + } + } + return nil, nil +} + +// helper type to sort a slice of []*Subsystem (casted as []record.Record) by +// id. Only use if it's guaranteed that all record.Records are *Subsystem. +// Otherwise Less() will panic. +type bySubsystemID []record.Record + +func (sl bySubsystemID) Less(i, j int) bool { return sl[i].(*Subsystem).ID < sl[j].(*Subsystem).ID } +func (sl bySubsystemID) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] } +func (sl bySubsystemID) Len() int { return len(sl) } diff --git a/modules/subsystems/subsystem.go b/modules/subsystems/subsystem.go index 5178c71..f317c6b 100644 --- a/modules/subsystems/subsystem.go +++ b/modules/subsystems/subsystem.go @@ -5,7 +5,6 @@ import ( "github.com/safing/portbase/config" "github.com/safing/portbase/database/record" - "github.com/safing/portbase/log" "github.com/safing/portbase/modules" ) @@ -66,19 +65,14 @@ type ModuleStatus struct { FailureMsg string } -// Save saves the Subsystem Status to the database. -func (sub *Subsystem) Save() { - if databaseKeySpace == "" { - return - } +func (sub *Subsystem) addDependencies(module *modules.Module, seen map[string]struct{}) { + for _, module := range module.Dependencies() { + if _, ok := seen[module.Name]; !ok { + seen[module.Name] = struct{}{} - if !sub.KeyIsSet() { - sub.SetKey(databaseKeySpace + sub.ID) - } - - err := db.Put(sub) - if err != nil { - log.Errorf("subsystems: could not save subsystem status to database: %s", err) + sub.Modules = append(sub.Modules, statusFromModule(module)) + sub.addDependencies(module, seen) + } } } diff --git a/modules/subsystems/subsystems.go b/modules/subsystems/subsystems.go deleted file mode 100644 index 1502a17..0000000 --- a/modules/subsystems/subsystems.go +++ /dev/null @@ -1,158 +0,0 @@ -package subsystems - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/tevino/abool" - - "github.com/safing/portbase/config" - "github.com/safing/portbase/modules" -) - -var ( - subsystemsLock sync.Mutex - subsystems []*Subsystem - subsystemsMap = make(map[string]*Subsystem) - subsystemsLocked = abool.New() - - handlingConfigChanges = abool.New() -) - -// Register registers a new subsystem. The given option must be a bool option. -// Should be called in init() directly after the modules.Register() function. -// The config option must not yet be registered and will be registered for -// you. Pass a nil option to force enable. -func Register(id, name, description string, module *modules.Module, configKeySpace string, option *config.Option) { - // lock slice and map - subsystemsLock.Lock() - defer subsystemsLock.Unlock() - - // check if registration is closed - if subsystemsLocked.IsSet() { - panic("subsystems can only be registered in prep phase or earlier") - } - - // check if already registered - if _, ok := subsystemsMap[name]; ok { - panic(fmt.Sprintf(`subsystem "%s" already registered`, name)) - } - - // create new - new := &Subsystem{ - ID: id, - Name: name, - Description: description, - module: module, - toggleOption: option, - ConfigKeySpace: configKeySpace, - } - if new.toggleOption != nil { - new.ToggleOptionKey = new.toggleOption.Key - new.ExpertiseLevel = new.toggleOption.ExpertiseLevel - new.ReleaseLevel = new.toggleOption.ReleaseLevel - } - - // register config - if option != nil { - err := config.Register(option) - if err != nil { - panic(fmt.Sprintf("failed to register config: %s", err)) - } - new.toggleValue = config.GetAsBool(new.ToggleOptionKey, false) - } else { - // force enabled - new.toggleValue = func() bool { return true } - } - - subsystemsMap[name] = new - subsystems = append(subsystems, new) -} - -func handleModuleChanges(m *modules.Module) { - // check if ready - if !subsystemsLocked.IsSet() { - return - } - - // check if shutting down - if modules.IsShuttingDown() { - return - } - - // find module status - var moduleSubsystem *Subsystem - var moduleStatus *ModuleStatus -subsystemLoop: - for _, subsystem := range subsystems { - for _, status := range subsystem.Modules { - if m.Name == status.Name { - moduleSubsystem = subsystem - moduleStatus = status - break subsystemLoop - } - } - } - // abort if not found - if moduleSubsystem == nil || moduleStatus == nil { - return - } - - // update status - moduleSubsystem.Lock() - changed := compareAndUpdateStatus(m, moduleStatus) - if changed { - moduleSubsystem.makeSummary() - } - moduleSubsystem.Unlock() - - // save - if changed { - moduleSubsystem.Save() - } -} - -func handleConfigChanges(ctx context.Context, _ interface{}) error { - // bail out early if we haven't started yet or are already - // shutting down - if !subsystemsLocked.IsSet() || modules.IsShuttingDown() { - return nil - } - - // potentially catch multiple changes - if handlingConfigChanges.SetToIf(false, true) { - time.Sleep(100 * time.Millisecond) - handlingConfigChanges.UnSet() - } else { - return nil - } - - // only run one instance at any time - subsystemsLock.Lock() - defer subsystemsLock.Unlock() - - var changed bool - for _, subsystem := range subsystems { - if subsystem.module.SetEnabled(subsystem.toggleValue()) { - // if changed - changed = true - } - } - if !changed { - return nil - } - - err := modules.ManageModules() - if err != nil { - module.Error( - "modulemgmt-failed", - fmt.Sprintf("The subsystem framework failed to start or stop one or more modules.\nError: %s\nCheck logs for more information.", err), - ) - } else { - module.Resolve("modulemgmt-failed") - } - - return nil -} diff --git a/modules/subsystems/subsystems_test.go b/modules/subsystems/subsystems_test.go index 601b668..3333df6 100644 --- a/modules/subsystems/subsystems_test.go +++ b/modules/subsystems/subsystems_test.go @@ -50,7 +50,7 @@ func TestSubsystems(t *testing.T) { DefaultValue: false, }, ) - sub1 := subsystemsMap["Feature One"] + sub1 := DefaultManager.subsys["feature-one"] feature2 := modules.Register("feature2", nil, nil, nil) Register( diff --git a/runtime/registry.go b/runtime/registry.go index 1afc7fe..f87281b 100644 --- a/runtime/registry.go +++ b/runtime/registry.go @@ -215,15 +215,17 @@ func (r *Registry) Query(q *query.Query, local, internal bool) (*iterator.Iterat for _, r := range records { r.Lock() - if q.MatchesKey(r.DatabaseKey()) || - !r.Meta().CheckValidity() || - !r.Meta().CheckPermission(local, internal) || - !q.MatchesRecord(r) { + var ( + matchesKey = q.MatchesKey(r.DatabaseKey()) + isValid = r.Meta().CheckValidity() + isAllowed = r.Meta().CheckPermission(local, internal) + matchesRecord = q.MatchesRecord(r) + ) + r.Unlock() - r.Unlock() + if !(matchesKey && isValid && isAllowed && matchesRecord) { continue } - r.Unlock() select { case iter.Next <- r: From 0464f5be487cbec8f4d81cf73a17d22ac718b463 Mon Sep 17 00:00:00 2001 From: Patrick Pacher Date: Tue, 22 Sep 2020 16:51:26 +0200 Subject: [PATCH 4/4] Review related changes --- modules/subsystems/registry.go | 2 +- runtime/registry.go | 14 +++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/modules/subsystems/registry.go b/modules/subsystems/registry.go index bc0c7cb..98092c4 100644 --- a/modules/subsystems/registry.go +++ b/modules/subsystems/registry.go @@ -70,7 +70,7 @@ func (mng *Manager) Start() error { configKeyPrefixes[sub.ConfigKeySpace] = sub } - // aggmngate all modules dependencies (and the subsystem module itself) + // aggregate all modules dependencies (and the subsystem module itself) // into the Modules slice. Configuration options form dependened modules // will be marked using config.SubsystemAnnotation if not already set. for _, sub := range mng.subsys { diff --git a/runtime/registry.go b/runtime/registry.go index f87281b..643d46c 100644 --- a/runtime/registry.go +++ b/runtime/registry.go @@ -216,14 +216,18 @@ func (r *Registry) Query(q *query.Query, local, internal bool) (*iterator.Iterat for _, r := range records { r.Lock() var ( - matchesKey = q.MatchesKey(r.DatabaseKey()) - isValid = r.Meta().CheckValidity() - isAllowed = r.Meta().CheckPermission(local, internal) - matchesRecord = q.MatchesRecord(r) + matchesKey = q.MatchesKey(r.DatabaseKey()) + isValid = r.Meta().CheckValidity() + isAllowed = r.Meta().CheckPermission(local, internal) + + allowed = matchesKey && isValid && isAllowed ) + if allowed { + allowed = q.MatchesRecord(r) + } r.Unlock() - if !(matchesKey && isValid && isAllowed && matchesRecord) { + if !allowed { continue }