From 2282c6bb71885f1110d6976348d4bea77d0076f7 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 9 Oct 2019 16:35:33 +0200 Subject: [PATCH] Improve and fix module startup and shutdown procedures as well as error reporting --- modules/error.go | 51 +++++++++++++++++++++++++++++++++++++++++++--- modules/modules.go | 47 ++++++++++++++++++------------------------ modules/start.go | 17 ++++++++++++---- modules/stop.go | 3 +++ modules/worker.go | 23 ++++++++++++++++++++- 5 files changed, 106 insertions(+), 35 deletions(-) diff --git a/modules/error.go b/modules/error.go index 80d025e..a22cd9c 100644 --- a/modules/error.go +++ b/modules/error.go @@ -2,11 +2,16 @@ package modules import ( "fmt" + "os" "runtime/debug" + "sync" + "time" ) var ( errorReportingChannel chan *ModuleError + reportToStdErr bool + reportingLock sync.RWMutex ) // ModuleError wraps a panic, error or message into an error that can be reported. @@ -64,12 +69,43 @@ func (me *ModuleError) Error() string { // Report reports the error through the configured reporting channel. func (me *ModuleError) Report() { + reportingLock.RLock() + defer reportingLock.RUnlock() + if errorReportingChannel != nil { select { case errorReportingChannel <- me: default: } } + + if reportToStdErr { + // default to writing to stderr + fmt.Fprintf( + os.Stderr, + `===== Error Report ===== +Message: %s +Timestamp: %s +ModuleName: %s +TaskName: %s +TaskType: %s +Severity: %s +PanicValue: %s +StackTrace: + +%s +===== End of Report ===== +`, + me.Message, + time.Now(), + me.ModuleName, + me.TaskName, + me.TaskType, + me.Severity, + me.PanicValue, + me.StackTrace, + ) + } } // IsPanic returns whether the given error is a wrapped panic by the modules package and additionally returns it, if true. @@ -84,7 +120,16 @@ func IsPanic(err error) (bool, *ModuleError) { // SetErrorReportingChannel sets the channel to report module errors through. By default only panics are reported, all other errors need to be manually wrapped into a *ModuleError and reported. func SetErrorReportingChannel(reportingChannel chan *ModuleError) { - if errorReportingChannel == nil { - errorReportingChannel = reportingChannel - } + reportingLock.Lock() + defer reportingLock.Unlock() + + errorReportingChannel = reportingChannel +} + +// SetStdErrReporting controls error reporting to stderr. +func SetStdErrReporting(on bool) { + reportingLock.Lock() + defer reportingLock.Unlock() + + reportToStdErr = on } diff --git a/modules/modules.go b/modules/modules.go index 9594cff..dbe2375 100644 --- a/modules/modules.go +++ b/modules/modules.go @@ -13,7 +13,7 @@ import ( ) var ( - modulesLock sync.Mutex + modulesLock sync.RWMutex modules = make(map[string]*Module) // ErrCleanExit is returned by Start() when the program is interrupted before starting. This can happen for example, when using the "--help" flag. @@ -46,6 +46,10 @@ type Module struct { microTaskCnt *int32 waitGroup sync.WaitGroup + // events + eventHooks map[string][]*eventHook + eventHooksLock sync.RWMutex + // dependency mgmt depNames []string depModules []*Module @@ -69,9 +73,9 @@ func (m *Module) shutdown() error { // start shutdown function m.waitGroup.Add(1) - stopFnError := make(chan error) + stopFnError := make(chan error, 1) go func() { - stopFnError <- m.runModuleCtrlFn("stop module", m.stop) + stopFnError <- m.runCtrlFn("stop module", m.stop) m.waitGroup.Done() }() @@ -84,16 +88,8 @@ func (m *Module) shutdown() error { // wait for results select { - case err := <-stopFnError: - return err case <-done: - select { - case err := <-stopFnError: - return err - default: - return nil - } - case <-time.After(3 * time.Second): + case <-time.After(30 * time.Second): log.Warningf( "%s: timed out while waiting for workers/tasks to finish: workers=%d tasks=%d microtasks=%d, continuing shutdown...", m.Name, @@ -102,11 +98,18 @@ func (m *Module) shutdown() error { atomic.LoadInt32(m.microTaskCnt), ) } - return nil -} -func dummyAction() error { - return nil + // collect error + select { + case err := <-stopFnError: + return err + default: + log.Warningf( + "%s: timed out while waiting for stop function to finish, continuing shutdown...", + m.Name, + ) + return nil + } } // Register registers a new module. The control functions `prep`, `start` and `stop` are technically optional. `stop` is called _after_ all added module workers finished. @@ -141,20 +144,10 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ... prep: prep, start: start, stop: stop, + eventHooks: make(map[string][]*eventHook), depNames: dependencies, } - // replace nil arguments with dummy action - if newModule.prep == nil { - newModule.prep = dummyAction - } - if newModule.start == nil { - newModule.start = dummyAction - } - if newModule.stop == nil { - newModule.stop = dummyAction - } - return newModule } diff --git a/modules/start.go b/modules/start.go index 3c3e544..6c945f3 100644 --- a/modules/start.go +++ b/modules/start.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "runtime" + "time" "github.com/safing/portbase/log" "github.com/tevino/abool" @@ -26,8 +27,8 @@ func WaitForStartCompletion() <-chan struct{} { // Start starts all modules in the correct order. In case of an error, it will automatically shutdown again. func Start() error { - modulesLock.Lock() - defer modulesLock.Unlock() + modulesLock.RLock() + defer modulesLock.RUnlock() // start microtask scheduler go microTaskScheduler() @@ -106,7 +107,11 @@ func prepareModules() error { go func() { reports <- &report{ module: execM, - err: execM.runModuleCtrlFn("prep module", execM.prep), + err: execM.runCtrlFnWithTimeout( + "prep module", + 10*time.Second, + execM.prep, + ), } }() } @@ -154,7 +159,11 @@ func startModules() error { go func() { reports <- &report{ module: execM, - err: execM.runModuleCtrlFn("start module", execM.start), + err: execM.runCtrlFnWithTimeout( + "start module", + 60*time.Second, + execM.start, + ), } }() } diff --git a/modules/stop.go b/modules/stop.go index ad1a79e..17e8b7b 100644 --- a/modules/stop.go +++ b/modules/stop.go @@ -12,6 +12,8 @@ import ( var ( shutdownSignal = make(chan struct{}) shutdownSignalClosed = abool.NewBool(false) + + shutdownCompleteSignal = make(chan struct{}) ) // ShuttingDown returns a channel read on the global shutdown signal. @@ -45,6 +47,7 @@ func Shutdown() error { } log.Shutdown() + close(shutdownCompleteSignal) return err } diff --git a/modules/worker.go b/modules/worker.go index 951e91d..b9d6567 100644 --- a/modules/worker.go +++ b/modules/worker.go @@ -3,6 +3,7 @@ package modules import ( "context" "errors" + "fmt" "sync/atomic" "time" @@ -101,7 +102,27 @@ func (m *Module) runWorker(name string, fn func(context.Context) error) (err err return } -func (m *Module) runModuleCtrlFn(name string, fn func() error) (err error) { +func (m *Module) runCtrlFnWithTimeout(name string, timeout time.Duration, fn func() error) error { + + stopFnError := make(chan error) + go func() { + stopFnError <- m.runCtrlFn(name, fn) + }() + + // wait for results + select { + case err := <-stopFnError: + return err + case <-time.After(timeout): + return fmt.Errorf("timed out (%s)", timeout) + } +} + +func (m *Module) runCtrlFn(name string, fn func() error) (err error) { + if fn == nil { + return + } + defer func() { // recover from panic panicVal := recover()