mirror of
https://github.com/safing/portbase
synced 2025-09-01 10:09:50 +00:00
commit
3be9f93001
16 changed files with 265 additions and 77 deletions
|
@ -7,13 +7,17 @@ import (
|
||||||
"github.com/safing/portbase/modules"
|
"github.com/safing/portbase/modules"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
module *modules.Module
|
||||||
|
)
|
||||||
|
|
||||||
// API Errors
|
// API Errors
|
||||||
var (
|
var (
|
||||||
ErrAuthenticationAlreadySet = errors.New("the authentication function has already been set")
|
ErrAuthenticationAlreadySet = errors.New("the authentication function has already been set")
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
modules.Register("api", prep, start, stop, "base", "database", "config")
|
module = modules.Register("api", prep, start, stop, "base", "database", "config")
|
||||||
}
|
}
|
||||||
|
|
||||||
func prep() error {
|
func prep() error {
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
|
|
||||||
|
@ -56,8 +58,20 @@ func Serve() {
|
||||||
|
|
||||||
// start serving
|
// start serving
|
||||||
log.Infof("api: starting to listen on %s", server.Addr)
|
log.Infof("api: starting to listen on %s", server.Addr)
|
||||||
// TODO: retry if failed
|
backoffDuration := 10 * time.Second
|
||||||
log.Errorf("api: failed to listen on %s: %s", server.Addr, server.ListenAndServe())
|
for {
|
||||||
|
// always returns an error
|
||||||
|
err := module.RunWorker("http endpoint", func(ctx context.Context) error {
|
||||||
|
return server.ListenAndServe()
|
||||||
|
})
|
||||||
|
// return on shutdown error
|
||||||
|
if err == http.ErrServerClosed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// log error and restart
|
||||||
|
log.Errorf("api: http endpoint failed: %s - restarting in %s", err, backoffDuration)
|
||||||
|
time.Sleep(backoffDuration)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMuxVars wraps github.com/gorilla/mux.Vars in order to mitigate context key issues in multi-repo projects.
|
// GetMuxVars wraps github.com/gorilla/mux.Vars in order to mitigate context key issues in multi-repo projects.
|
||||||
|
|
2
config/doc.go
Normal file
2
config/doc.go
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
// Package config provides a versatile configuration management system.
|
||||||
|
package config
|
72
config/expertise.go
Normal file
72
config/expertise.go
Normal file
|
@ -0,0 +1,72 @@
|
||||||
|
// Package config ... (linter fix)
|
||||||
|
//nolint:dupl
|
||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Expertise Level constants
|
||||||
|
const (
|
||||||
|
ExpertiseLevelUser uint8 = 0
|
||||||
|
ExpertiseLevelExpert uint8 = 1
|
||||||
|
ExpertiseLevelDeveloper uint8 = 2
|
||||||
|
|
||||||
|
ExpertiseLevelNameUser = "user"
|
||||||
|
ExpertiseLevelNameExpert = "expert"
|
||||||
|
ExpertiseLevelNameDeveloper = "developer"
|
||||||
|
|
||||||
|
expertiseLevelKey = "core/expertiseLevel"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
expertiseLevel *int32
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
var expertiseLevelVal int32
|
||||||
|
expertiseLevel = &expertiseLevelVal
|
||||||
|
|
||||||
|
registerExpertiseLevelOption()
|
||||||
|
}
|
||||||
|
|
||||||
|
func registerExpertiseLevelOption() {
|
||||||
|
err := Register(&Option{
|
||||||
|
Name: "Expertise Level",
|
||||||
|
Key: expertiseLevelKey,
|
||||||
|
Description: "The Expertise Level controls the perceived complexity. Higher settings will show you more complex settings and information. This might also affect various other things relying on this setting. Modified settings in higher expertise levels stay in effect when switching back. (Unlike the Release Level)",
|
||||||
|
|
||||||
|
OptType: OptTypeString,
|
||||||
|
ExpertiseLevel: ExpertiseLevelUser,
|
||||||
|
ReleaseLevel: ExpertiseLevelUser,
|
||||||
|
|
||||||
|
RequiresRestart: false,
|
||||||
|
DefaultValue: ExpertiseLevelNameUser,
|
||||||
|
|
||||||
|
ExternalOptType: "string list",
|
||||||
|
ValidationRegex: fmt.Sprintf("^(%s|%s|%s)$", ExpertiseLevelNameUser, ExpertiseLevelNameExpert, ExpertiseLevelNameDeveloper),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateExpertiseLevel() {
|
||||||
|
new := findStringValue(expertiseLevelKey, "")
|
||||||
|
switch new {
|
||||||
|
case ExpertiseLevelNameUser:
|
||||||
|
atomic.StoreInt32(expertiseLevel, int32(ExpertiseLevelUser))
|
||||||
|
case ExpertiseLevelNameExpert:
|
||||||
|
atomic.StoreInt32(expertiseLevel, int32(ExpertiseLevelExpert))
|
||||||
|
case ExpertiseLevelNameDeveloper:
|
||||||
|
atomic.StoreInt32(expertiseLevel, int32(ExpertiseLevelDeveloper))
|
||||||
|
default:
|
||||||
|
atomic.StoreInt32(expertiseLevel, int32(ExpertiseLevelUser))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetExpertiseLevel returns the current active expertise level.
|
||||||
|
func GetExpertiseLevel() uint8 {
|
||||||
|
return uint8(atomic.LoadInt32(expertiseLevel))
|
||||||
|
}
|
|
@ -81,21 +81,7 @@ func findValue(key string) interface{} {
|
||||||
option.Lock()
|
option.Lock()
|
||||||
defer option.Unlock()
|
defer option.Unlock()
|
||||||
|
|
||||||
// check if option is active
|
if option.ReleaseLevel <= getReleaseLevel() && option.activeValue != nil {
|
||||||
optionActive := true
|
|
||||||
switch getReleaseLevel() {
|
|
||||||
case ReleaseLevelStable:
|
|
||||||
// In stable, only stable is active
|
|
||||||
optionActive = option.ReleaseLevel == ReleaseLevelStable
|
|
||||||
case ReleaseLevelBeta:
|
|
||||||
// In beta, only stable and beta are active
|
|
||||||
optionActive = option.ReleaseLevel == ReleaseLevelStable || option.ReleaseLevel == ReleaseLevelBeta
|
|
||||||
case ReleaseLevelExperimental:
|
|
||||||
// In experimental, everything is active
|
|
||||||
optionActive = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if optionActive && option.activeValue != nil {
|
|
||||||
return option.activeValue
|
return option.activeValue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -160,21 +160,21 @@ func TestReleaseLevel(t *testing.T) {
|
||||||
|
|
||||||
// test option level stable
|
// test option level stable
|
||||||
subsystemOption.ReleaseLevel = ReleaseLevelStable
|
subsystemOption.ReleaseLevel = ReleaseLevelStable
|
||||||
err = SetConfigOption(releaseLevelKey, ReleaseLevelStable)
|
err = SetConfigOption(releaseLevelKey, ReleaseLevelNameStable)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if !testSubsystem() {
|
if !testSubsystem() {
|
||||||
t.Error("should be active")
|
t.Error("should be active")
|
||||||
}
|
}
|
||||||
err = SetConfigOption(releaseLevelKey, ReleaseLevelBeta)
|
err = SetConfigOption(releaseLevelKey, ReleaseLevelNameBeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if !testSubsystem() {
|
if !testSubsystem() {
|
||||||
t.Error("should be active")
|
t.Error("should be active")
|
||||||
}
|
}
|
||||||
err = SetConfigOption(releaseLevelKey, ReleaseLevelExperimental)
|
err = SetConfigOption(releaseLevelKey, ReleaseLevelNameExperimental)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -184,21 +184,21 @@ func TestReleaseLevel(t *testing.T) {
|
||||||
|
|
||||||
// test option level beta
|
// test option level beta
|
||||||
subsystemOption.ReleaseLevel = ReleaseLevelBeta
|
subsystemOption.ReleaseLevel = ReleaseLevelBeta
|
||||||
err = SetConfigOption(releaseLevelKey, ReleaseLevelStable)
|
err = SetConfigOption(releaseLevelKey, ReleaseLevelNameStable)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if testSubsystem() {
|
if testSubsystem() {
|
||||||
t.Errorf("should be inactive: opt=%s system=%s", subsystemOption.ReleaseLevel, releaseLevel)
|
t.Errorf("should be inactive: opt=%d system=%d", subsystemOption.ReleaseLevel, getReleaseLevel())
|
||||||
}
|
}
|
||||||
err = SetConfigOption(releaseLevelKey, ReleaseLevelBeta)
|
err = SetConfigOption(releaseLevelKey, ReleaseLevelNameBeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if !testSubsystem() {
|
if !testSubsystem() {
|
||||||
t.Error("should be active")
|
t.Error("should be active")
|
||||||
}
|
}
|
||||||
err = SetConfigOption(releaseLevelKey, ReleaseLevelExperimental)
|
err = SetConfigOption(releaseLevelKey, ReleaseLevelNameExperimental)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -208,21 +208,21 @@ func TestReleaseLevel(t *testing.T) {
|
||||||
|
|
||||||
// test option level experimental
|
// test option level experimental
|
||||||
subsystemOption.ReleaseLevel = ReleaseLevelExperimental
|
subsystemOption.ReleaseLevel = ReleaseLevelExperimental
|
||||||
err = SetConfigOption(releaseLevelKey, ReleaseLevelStable)
|
err = SetConfigOption(releaseLevelKey, ReleaseLevelNameStable)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if testSubsystem() {
|
if testSubsystem() {
|
||||||
t.Error("should be inactive")
|
t.Error("should be inactive")
|
||||||
}
|
}
|
||||||
err = SetConfigOption(releaseLevelKey, ReleaseLevelBeta)
|
err = SetConfigOption(releaseLevelKey, ReleaseLevelNameBeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if testSubsystem() {
|
if testSubsystem() {
|
||||||
t.Error("should be inactive")
|
t.Error("should be inactive")
|
||||||
}
|
}
|
||||||
err = SetConfigOption(releaseLevelKey, ReleaseLevelExperimental)
|
err = SetConfigOption(releaseLevelKey, ReleaseLevelNameExperimental)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,14 +17,6 @@ const (
|
||||||
OptTypeStringArray uint8 = 2
|
OptTypeStringArray uint8 = 2
|
||||||
OptTypeInt uint8 = 3
|
OptTypeInt uint8 = 3
|
||||||
OptTypeBool uint8 = 4
|
OptTypeBool uint8 = 4
|
||||||
|
|
||||||
ExpertiseLevelUser uint8 = 1
|
|
||||||
ExpertiseLevelExpert uint8 = 2
|
|
||||||
ExpertiseLevelDeveloper uint8 = 3
|
|
||||||
|
|
||||||
ReleaseLevelStable = "stable"
|
|
||||||
ReleaseLevelBeta = "beta"
|
|
||||||
ReleaseLevelExperimental = "experimental"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func getTypeName(t uint8) string {
|
func getTypeName(t uint8) string {
|
||||||
|
@ -50,9 +42,9 @@ type Option struct {
|
||||||
Key string // in path format: category/sub/key
|
Key string // in path format: category/sub/key
|
||||||
Description string
|
Description string
|
||||||
|
|
||||||
ReleaseLevel string
|
|
||||||
ExpertiseLevel uint8
|
|
||||||
OptType uint8
|
OptType uint8
|
||||||
|
ExpertiseLevel uint8
|
||||||
|
ReleaseLevel uint8
|
||||||
|
|
||||||
RequiresRestart bool
|
RequiresRestart bool
|
||||||
DefaultValue interface{}
|
DefaultValue interface{}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"regexp"
|
"regexp"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -10,21 +9,21 @@ import (
|
||||||
var (
|
var (
|
||||||
optionsLock sync.RWMutex
|
optionsLock sync.RWMutex
|
||||||
options = make(map[string]*Option)
|
options = make(map[string]*Option)
|
||||||
|
|
||||||
// ErrIncompleteCall is return when RegisterOption is called with empty mandatory values.
|
|
||||||
ErrIncompleteCall = errors.New("could not register config option: all fields, except for the validationRegex are mandatory")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Register registers a new configuration option.
|
// Register registers a new configuration option.
|
||||||
func Register(option *Option) error {
|
func Register(option *Option) error {
|
||||||
|
if option.Name == "" {
|
||||||
if option.Name == "" ||
|
return fmt.Errorf("failed to register option: please set option.Name")
|
||||||
option.Key == "" ||
|
}
|
||||||
option.Description == "" ||
|
if option.Key == "" {
|
||||||
option.OptType == 0 ||
|
return fmt.Errorf("failed to register option: please set option.Key")
|
||||||
option.ExpertiseLevel == 0 ||
|
}
|
||||||
option.ReleaseLevel == "" {
|
if option.Description == "" {
|
||||||
return ErrIncompleteCall
|
return fmt.Errorf("failed to register option: please set option.Description")
|
||||||
|
}
|
||||||
|
if option.OptType == 0 {
|
||||||
|
return fmt.Errorf("failed to register option: please set option.OptType")
|
||||||
}
|
}
|
||||||
|
|
||||||
if option.ValidationRegex != "" {
|
if option.ValidationRegex != "" {
|
||||||
|
@ -37,7 +36,6 @@ func Register(option *Option) error {
|
||||||
|
|
||||||
optionsLock.Lock()
|
optionsLock.Lock()
|
||||||
defer optionsLock.Unlock()
|
defer optionsLock.Unlock()
|
||||||
|
|
||||||
options[option.Key] = option
|
options[option.Key] = option
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -1,38 +1,51 @@
|
||||||
|
// Package config ... (linter fix)
|
||||||
|
//nolint:dupl
|
||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Release Level constants
|
||||||
const (
|
const (
|
||||||
releaseLevelKey = "core/release_level"
|
ReleaseLevelStable uint8 = 0
|
||||||
|
ReleaseLevelBeta uint8 = 1
|
||||||
|
ReleaseLevelExperimental uint8 = 2
|
||||||
|
|
||||||
|
ReleaseLevelNameStable = "stable"
|
||||||
|
ReleaseLevelNameBeta = "beta"
|
||||||
|
ReleaseLevelNameExperimental = "experimental"
|
||||||
|
|
||||||
|
releaseLevelKey = "core/releaseLevel"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
releaseLevel = ReleaseLevelStable
|
releaseLevel *int32
|
||||||
releaseLevelLock sync.Mutex
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
var releaseLevelVal int32
|
||||||
|
releaseLevel = &releaseLevelVal
|
||||||
|
|
||||||
registerReleaseLevelOption()
|
registerReleaseLevelOption()
|
||||||
}
|
}
|
||||||
|
|
||||||
func registerReleaseLevelOption() {
|
func registerReleaseLevelOption() {
|
||||||
err := Register(&Option{
|
err := Register(&Option{
|
||||||
Name: "Release Selection",
|
Name: "Release Level",
|
||||||
Key: releaseLevelKey,
|
Key: releaseLevelKey,
|
||||||
Description: "Select maturity level of features that should be available",
|
Description: "The Release Level changes which features are available to you. Some beta or experimental features are also available in the stable release channel. Unavailable settings are set to the default value.",
|
||||||
|
|
||||||
OptType: OptTypeString,
|
OptType: OptTypeString,
|
||||||
ExpertiseLevel: ExpertiseLevelExpert,
|
ExpertiseLevel: ExpertiseLevelExpert,
|
||||||
ReleaseLevel: ReleaseLevelStable,
|
ReleaseLevel: ReleaseLevelStable,
|
||||||
|
|
||||||
RequiresRestart: false,
|
RequiresRestart: false,
|
||||||
DefaultValue: ReleaseLevelStable,
|
DefaultValue: ReleaseLevelNameStable,
|
||||||
|
|
||||||
ExternalOptType: "string list",
|
ExternalOptType: "string list",
|
||||||
ValidationRegex: fmt.Sprintf("^(%s|%s|%s)$", ReleaseLevelStable, ReleaseLevelBeta, ReleaseLevelExperimental),
|
ValidationRegex: fmt.Sprintf("^(%s|%s|%s)$", ReleaseLevelNameStable, ReleaseLevelNameBeta, ReleaseLevelNameExperimental),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -41,17 +54,18 @@ func registerReleaseLevelOption() {
|
||||||
|
|
||||||
func updateReleaseLevel() {
|
func updateReleaseLevel() {
|
||||||
new := findStringValue(releaseLevelKey, "")
|
new := findStringValue(releaseLevelKey, "")
|
||||||
releaseLevelLock.Lock()
|
switch new {
|
||||||
if new == "" {
|
case ReleaseLevelNameStable:
|
||||||
releaseLevel = ReleaseLevelStable
|
atomic.StoreInt32(releaseLevel, int32(ReleaseLevelStable))
|
||||||
} else {
|
case ReleaseLevelNameBeta:
|
||||||
releaseLevel = new
|
atomic.StoreInt32(releaseLevel, int32(ReleaseLevelBeta))
|
||||||
|
case ReleaseLevelNameExperimental:
|
||||||
|
atomic.StoreInt32(releaseLevel, int32(ReleaseLevelExperimental))
|
||||||
|
default:
|
||||||
|
atomic.StoreInt32(releaseLevel, int32(ReleaseLevelStable))
|
||||||
}
|
}
|
||||||
releaseLevelLock.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getReleaseLevel() string {
|
func getReleaseLevel() uint8 {
|
||||||
releaseLevelLock.Lock()
|
return uint8(atomic.LoadInt32(releaseLevel))
|
||||||
defer releaseLevelLock.Unlock()
|
|
||||||
return releaseLevel
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,8 +36,9 @@ func Changed() <-chan struct{} {
|
||||||
}
|
}
|
||||||
|
|
||||||
func signalChanges() {
|
func signalChanges() {
|
||||||
// refetch and save release level
|
// refetch and save release level and expertise level
|
||||||
updateReleaseLevel()
|
updateReleaseLevel()
|
||||||
|
updateExpertiseLevel()
|
||||||
|
|
||||||
// reset validity flag
|
// reset validity flag
|
||||||
validityFlagLock.Lock()
|
validityFlagLock.Lock()
|
||||||
|
|
|
@ -47,12 +47,22 @@ func SetMaxConcurrentMicroTasks(n int) {
|
||||||
|
|
||||||
// StartMicroTask starts a new MicroTask with high priority. It will start immediately. The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
|
// StartMicroTask starts a new MicroTask with high priority. It will start immediately. The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
|
||||||
func (m *Module) StartMicroTask(name *string, fn func(context.Context) error) error {
|
func (m *Module) StartMicroTask(name *string, fn func(context.Context) error) error {
|
||||||
|
if m == nil {
|
||||||
|
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
|
||||||
|
return errNoModule
|
||||||
|
}
|
||||||
|
|
||||||
atomic.AddInt32(microTasks, 1)
|
atomic.AddInt32(microTasks, 1)
|
||||||
return m.runMicroTask(name, fn)
|
return m.runMicroTask(name, fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartMediumPriorityMicroTask starts a new MicroTask with medium priority. It will wait until given a go (max 3 seconds). The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
|
// StartMediumPriorityMicroTask starts a new MicroTask with medium priority. It will wait until given a go (max 3 seconds). The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
|
||||||
func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Context) error) error {
|
func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Context) error) error {
|
||||||
|
if m == nil {
|
||||||
|
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
|
||||||
|
return errNoModule
|
||||||
|
}
|
||||||
|
|
||||||
// check if we can go immediately
|
// check if we can go immediately
|
||||||
select {
|
select {
|
||||||
case <-mediumPriorityClearance:
|
case <-mediumPriorityClearance:
|
||||||
|
@ -68,6 +78,11 @@ func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Cont
|
||||||
|
|
||||||
// StartLowPriorityMicroTask starts a new MicroTask with low priority. It will wait until given a go (max 15 seconds). The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
|
// StartLowPriorityMicroTask starts a new MicroTask with low priority. It will wait until given a go (max 15 seconds). The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
|
||||||
func (m *Module) StartLowPriorityMicroTask(name *string, fn func(context.Context) error) error {
|
func (m *Module) StartLowPriorityMicroTask(name *string, fn func(context.Context) error) error {
|
||||||
|
if m == nil {
|
||||||
|
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
|
||||||
|
return errNoModule
|
||||||
|
}
|
||||||
|
|
||||||
// check if we can go immediately
|
// check if we can go immediately
|
||||||
select {
|
select {
|
||||||
case <-lowPriorityClearance:
|
case <-lowPriorityClearance:
|
||||||
|
|
|
@ -67,14 +67,32 @@ func (m *Module) shutdown() error {
|
||||||
m.shutdownFlag.Set()
|
m.shutdownFlag.Set()
|
||||||
m.cancelCtx()
|
m.cancelCtx()
|
||||||
|
|
||||||
|
// start shutdown function
|
||||||
|
m.waitGroup.Add(1)
|
||||||
|
stopFnError := make(chan error)
|
||||||
|
go func() {
|
||||||
|
stopFnError <- m.runModuleCtrlFn("stop module", m.stop)
|
||||||
|
m.waitGroup.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
// wait for workers
|
// wait for workers
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
m.waitGroup.Wait()
|
m.waitGroup.Wait()
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// wait for results
|
||||||
select {
|
select {
|
||||||
|
case err := <-stopFnError:
|
||||||
|
return err
|
||||||
case <-done:
|
case <-done:
|
||||||
|
select {
|
||||||
|
case err := <-stopFnError:
|
||||||
|
return err
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
case <-time.After(3 * time.Second):
|
case <-time.After(3 * time.Second):
|
||||||
log.Warningf(
|
log.Warningf(
|
||||||
"%s: timed out while waiting for workers/tasks to finish: workers=%d tasks=%d microtasks=%d, continuing shutdown...",
|
"%s: timed out while waiting for workers/tasks to finish: workers=%d tasks=%d microtasks=%d, continuing shutdown...",
|
||||||
|
@ -84,9 +102,7 @@ func (m *Module) shutdown() error {
|
||||||
atomic.LoadInt32(m.microTaskCnt),
|
atomic.LoadInt32(m.microTaskCnt),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
// call shutdown function
|
|
||||||
return m.stop()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func dummyAction() error {
|
func dummyAction() error {
|
||||||
|
|
|
@ -106,7 +106,7 @@ func prepareModules() error {
|
||||||
go func() {
|
go func() {
|
||||||
reports <- &report{
|
reports <- &report{
|
||||||
module: execM,
|
module: execM,
|
||||||
err: execM.prep(),
|
err: execM.runModuleCtrlFn("prep module", execM.prep),
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -154,7 +154,7 @@ func startModules() error {
|
||||||
go func() {
|
go func() {
|
||||||
reports <- &report{
|
reports <- &report{
|
||||||
module: execM,
|
module: execM,
|
||||||
err: execM.start(),
|
err: execM.runModuleCtrlFn("start module", execM.start),
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package modules
|
||||||
import (
|
import (
|
||||||
"container/list"
|
"container/list"
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
@ -59,6 +60,10 @@ const (
|
||||||
|
|
||||||
// NewTask creates a new task with a descriptive name (non-unique), a optional deadline, and the task function to be executed. You must call one of Queue, Prioritize, StartASAP, Schedule or Repeat in order to have the Task executed.
|
// NewTask creates a new task with a descriptive name (non-unique), a optional deadline, and the task function to be executed. You must call one of Queue, Prioritize, StartASAP, Schedule or Repeat in order to have the Task executed.
|
||||||
func (m *Module) NewTask(name string, fn func(context.Context, *Task)) *Task {
|
func (m *Module) NewTask(name string, fn func(context.Context, *Task)) *Task {
|
||||||
|
if m == nil {
|
||||||
|
log.Errorf(`modules: cannot create task "%s" with nil module`, name)
|
||||||
|
}
|
||||||
|
|
||||||
return &Task{
|
return &Task{
|
||||||
name: name,
|
name: name,
|
||||||
module: m,
|
module: m,
|
||||||
|
@ -68,6 +73,10 @@ func (m *Module) NewTask(name string, fn func(context.Context, *Task)) *Task {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task) isActive() bool {
|
func (t *Task) isActive() bool {
|
||||||
|
if t.module == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
return !t.canceled && !t.module.ShutdownInProgress()
|
return !t.canceled && !t.module.ShutdownInProgress()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -178,6 +187,7 @@ func (t *Task) Repeat(interval time.Duration) *Task {
|
||||||
t.lock.Lock()
|
t.lock.Lock()
|
||||||
t.repeat = interval
|
t.repeat = interval
|
||||||
t.executeAt = time.Now().Add(t.repeat)
|
t.executeAt = time.Now().Add(t.repeat)
|
||||||
|
t.addToSchedule()
|
||||||
t.lock.Unlock()
|
t.lock.Unlock()
|
||||||
|
|
||||||
return t
|
return t
|
||||||
|
@ -194,6 +204,10 @@ func (t *Task) Cancel() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task) runWithLocking() {
|
func (t *Task) runWithLocking() {
|
||||||
|
if t.module == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// wait for good timeslot regarding microtasks
|
// wait for good timeslot regarding microtasks
|
||||||
select {
|
select {
|
||||||
case <-taskTimeslot:
|
case <-taskTimeslot:
|
||||||
|
@ -308,6 +322,7 @@ func (t *Task) getExecuteAtWithLocking() time.Time {
|
||||||
func (t *Task) addToSchedule() {
|
func (t *Task) addToSchedule() {
|
||||||
scheduleLock.Lock()
|
scheduleLock.Lock()
|
||||||
defer scheduleLock.Unlock()
|
defer scheduleLock.Unlock()
|
||||||
|
// defer printTaskList(taskSchedule) // for debugging
|
||||||
|
|
||||||
// notify scheduler
|
// notify scheduler
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -439,3 +454,23 @@ func taskScheduleHandler() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func printTaskList(*list.List) { //nolint:unused,deadcode // for debugging, NOT production use
|
||||||
|
fmt.Println("Modules Task List:")
|
||||||
|
for e := taskSchedule.Front(); e != nil; e = e.Next() {
|
||||||
|
t, ok := e.Value.(*Task)
|
||||||
|
if ok {
|
||||||
|
fmt.Printf(
|
||||||
|
"%s:%s qu=%v ca=%v exec=%v at=%s rep=%s delay=%s\n",
|
||||||
|
t.module.Name,
|
||||||
|
t.name,
|
||||||
|
t.queued,
|
||||||
|
t.canceled,
|
||||||
|
t.executing,
|
||||||
|
t.executeAt,
|
||||||
|
t.repeat,
|
||||||
|
t.maxDelay,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package modules
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -13,8 +14,17 @@ const (
|
||||||
DefaultBackoffDuration = 2 * time.Second
|
DefaultBackoffDuration = 2 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
errNoModule = errors.New("missing module (is nil!)")
|
||||||
|
)
|
||||||
|
|
||||||
// RunWorker directly runs a generic worker that does not fit to be a Task or MicroTask, such as long running (and possibly mostly idle) sessions. A call to RunWorker blocks until the worker is finished.
|
// RunWorker directly runs a generic worker that does not fit to be a Task or MicroTask, such as long running (and possibly mostly idle) sessions. A call to RunWorker blocks until the worker is finished.
|
||||||
func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
|
func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
|
||||||
|
if m == nil {
|
||||||
|
log.Errorf(`modules: cannot start worker "%s" with nil module`, name)
|
||||||
|
return errNoModule
|
||||||
|
}
|
||||||
|
|
||||||
atomic.AddInt32(m.workerCnt, 1)
|
atomic.AddInt32(m.workerCnt, 1)
|
||||||
m.waitGroup.Add(1)
|
m.waitGroup.Add(1)
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -27,6 +37,11 @@ func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
|
||||||
|
|
||||||
// StartServiceWorker starts a generic worker, which is automatically restarted in case of an error. A call to StartServiceWorker runs the service-worker in a new goroutine and returns immediately. `backoffDuration` specifies how to long to wait before restarts, multiplied by the number of failed attempts. Pass `0` for the default backoff duration. For custom error remediation functionality, build your own error handling procedure using calls to RunWorker.
|
// StartServiceWorker starts a generic worker, which is automatically restarted in case of an error. A call to StartServiceWorker runs the service-worker in a new goroutine and returns immediately. `backoffDuration` specifies how to long to wait before restarts, multiplied by the number of failed attempts. Pass `0` for the default backoff duration. For custom error remediation functionality, build your own error handling procedure using calls to RunWorker.
|
||||||
func (m *Module) StartServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) {
|
func (m *Module) StartServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) {
|
||||||
|
if m == nil {
|
||||||
|
log.Errorf(`modules: cannot start service worker "%s" with nil module`, name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
go m.runServiceWorker(name, backoffDuration, fn)
|
go m.runServiceWorker(name, backoffDuration, fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -42,6 +57,7 @@ func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn
|
||||||
backoffDuration = DefaultBackoffDuration
|
backoffDuration = DefaultBackoffDuration
|
||||||
}
|
}
|
||||||
failCnt := 0
|
failCnt := 0
|
||||||
|
lastFail := time.Now()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if m.ShutdownInProgress() {
|
if m.ShutdownInProgress() {
|
||||||
|
@ -50,11 +66,18 @@ func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn
|
||||||
|
|
||||||
err := m.runWorker(name, fn)
|
err := m.runWorker(name, fn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// log error and restart
|
// reset fail counter if running without error for some time
|
||||||
|
if time.Now().Add(-5 * time.Minute).After(lastFail) {
|
||||||
|
failCnt = 0
|
||||||
|
}
|
||||||
|
// increase fail counter and set last failed time
|
||||||
failCnt++
|
failCnt++
|
||||||
|
lastFail = time.Now()
|
||||||
|
// log error
|
||||||
sleepFor := time.Duration(failCnt) * backoffDuration
|
sleepFor := time.Duration(failCnt) * backoffDuration
|
||||||
log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor)
|
log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor)
|
||||||
time.Sleep(sleepFor)
|
time.Sleep(sleepFor)
|
||||||
|
// loop to restart
|
||||||
} else {
|
} else {
|
||||||
// finish
|
// finish
|
||||||
return
|
return
|
||||||
|
@ -77,3 +100,19 @@ func (m *Module) runWorker(name string, fn func(context.Context) error) (err err
|
||||||
err = fn(m.Ctx)
|
err = fn(m.Ctx)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Module) runModuleCtrlFn(name string, fn func() error) (err error) {
|
||||||
|
defer func() {
|
||||||
|
// recover from panic
|
||||||
|
panicVal := recover()
|
||||||
|
if panicVal != nil {
|
||||||
|
me := m.NewPanicError(name, "module-control", panicVal)
|
||||||
|
me.Report()
|
||||||
|
err = me
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// run
|
||||||
|
err = fn()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
modules.Register("notifications", nil, start, nil, "base", "database")
|
module = modules.Register("notifications", nil, start, nil, "base", "database")
|
||||||
}
|
}
|
||||||
|
|
||||||
func start() error {
|
func start() error {
|
||||||
|
|
Loading…
Add table
Reference in a new issue