Improve stopping of modules

This commit is contained in:
Daniel 2023-10-13 11:46:17 +02:00
parent 3dbffd9c1a
commit 3b22f8497d
2 changed files with 78 additions and 67 deletions

View file

@ -57,10 +57,11 @@ type Module struct { //nolint:maligned
// start // start
startComplete chan struct{} startComplete chan struct{}
// stop // stop
Ctx context.Context Ctx context.Context
cancelCtx func() cancelCtx func()
stopFlag *abool.AtomicBool stopFlag *abool.AtomicBool
stopComplete chan struct{} stopCompleted *abool.AtomicBool
stopComplete chan struct{}
// workers/tasks // workers/tasks
ctrlFuncRunning *abool.AtomicBool ctrlFuncRunning *abool.AtomicBool
@ -255,12 +256,10 @@ func (m *Module) checkIfStopComplete() {
atomic.LoadInt32(m.taskCnt) == 0 && atomic.LoadInt32(m.taskCnt) == 0 &&
atomic.LoadInt32(m.microTaskCnt) == 0 { atomic.LoadInt32(m.microTaskCnt) == 0 {
m.Lock() if m.stopCompleted.SetToIf(false, true) {
defer m.Unlock() m.Lock()
defer m.Unlock()
if m.stopComplete != nil {
close(m.stopComplete) close(m.stopComplete)
m.stopComplete = nil
} }
} }
} }
@ -283,60 +282,56 @@ func (m *Module) stop(reports chan *report) {
// Reset start/stop signal channels. // Reset start/stop signal channels.
m.startComplete = make(chan struct{}) m.startComplete = make(chan struct{})
m.stopComplete = make(chan struct{}) m.stopComplete = make(chan struct{})
m.stopCompleted.SetTo(false)
// Make a copy of the stop channel. // Set status.
stopComplete := m.stopComplete
// Set status and cancel context.
m.status = StatusStopping m.status = StatusStopping
m.stopFlag.Set()
m.cancelCtx()
go m.stopAllTasks(reports, stopComplete) go m.stopAllTasks(reports)
} }
func (m *Module) stopAllTasks(reports chan *report, stopComplete chan struct{}) { func (m *Module) stopAllTasks(reports chan *report) {
// start shutdown function // Manually set the control function flag in order to stop completion by race
var stopFnError error // condition before stop function has even started.
stopFuncRunning := abool.New() m.ctrlFuncRunning.Set()
if m.stopFn != nil {
stopFuncRunning.Set() // Set stop flag for everyone checking this flag before we activate any stop trigger.
go func() { m.stopFlag.Set()
stopFnError = m.runCtrlFn("stop module", m.stopFn)
stopFuncRunning.UnSet() // Cancel the context to notify all workers and tasks.
m.checkIfStopComplete() m.cancelCtx()
}()
} else { // Start stop function.
m.checkIfStopComplete() stopFnError := m.startCtrlFn("stop module", m.stopFn)
}
// wait for results // wait for results
select { select {
case <-stopComplete: case <-m.stopComplete:
// case <-time.After(moduleStopTimeout): // Complete!
case <-time.After(moduleStopTimeout): case <-time.After(moduleStopTimeout):
log.Warningf( log.Warningf(
"%s: timed out while waiting for stopfn/workers/tasks to finish: stopFn=%v/%v workers=%d tasks=%d microtasks=%d, continuing shutdown...", "%s: timed out while waiting for stopfn/workers/tasks to finish: stopFn=%v workers=%d tasks=%d microtasks=%d, continuing shutdown...",
m.Name, m.Name,
stopFuncRunning.IsSet(), m.ctrlFuncRunning.IsSet(), m.ctrlFuncRunning.IsSet(),
atomic.LoadInt32(m.workerCnt), atomic.LoadInt32(m.workerCnt),
atomic.LoadInt32(m.taskCnt), atomic.LoadInt32(m.taskCnt),
atomic.LoadInt32(m.microTaskCnt), atomic.LoadInt32(m.microTaskCnt),
) )
} }
// collect error // Check for stop fn status.
var err error var err error
if stopFuncRunning.IsNotSet() && stopFnError != nil { select {
err = stopFnError case err = <-stopFnError:
} if err != nil {
// set status // Set error as module error.
if err != nil { m.Error(
m.Error( fmt.Sprintf("%s:stop-failed", m.Name),
fmt.Sprintf("%s:stop-failed", m.Name), fmt.Sprintf("Stopping module %s failed", m.Name),
fmt.Sprintf("Stopping module %s failed", m.Name), fmt.Sprintf("Failed to stop module: %s", err.Error()),
fmt.Sprintf("Failed to stop module: %s", err.Error()), )
) }
default:
} }
// Always set to offline in order to let other modules shutdown in order. // Always set to offline in order to let other modules shutdown in order.
@ -384,7 +379,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
Name: name, Name: name,
enabled: abool.NewBool(false), enabled: abool.NewBool(false),
enabledAsDependency: abool.NewBool(false), enabledAsDependency: abool.NewBool(false),
sleepMode: abool.NewBool(true), sleepMode: abool.NewBool(true), // Change (for init) is triggered below.
sleepWaitingChannel: make(chan time.Time), sleepWaitingChannel: make(chan time.Time),
prepFn: prep, prepFn: prep,
startFn: start, startFn: start,
@ -393,6 +388,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
Ctx: ctx, Ctx: ctx,
cancelCtx: cancelCtx, cancelCtx: cancelCtx,
stopFlag: abool.NewBool(false), stopFlag: abool.NewBool(false),
stopCompleted: abool.NewBool(true),
ctrlFuncRunning: abool.NewBool(false), ctrlFuncRunning: abool.NewBool(false),
workerCnt: &workerCnt, workerCnt: &workerCnt,
taskCnt: &taskCnt, taskCnt: &taskCnt,
@ -401,7 +397,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
depNames: dependencies, depNames: dependencies,
} }
// Sleep mode is disabled by default // Sleep mode is disabled by default.
newModule.Sleep(false) newModule.Sleep(false)
return newModule return newModule

View file

@ -135,10 +135,7 @@ func (m *Module) runWorker(name string, fn func(context.Context) error) (err err
} }
func (m *Module) runCtrlFnWithTimeout(name string, timeout time.Duration, fn func() error) error { func (m *Module) runCtrlFnWithTimeout(name string, timeout time.Duration, fn func() error) error {
stopFnError := make(chan error) stopFnError := m.startCtrlFn(name, fn)
go func() {
stopFnError <- m.runCtrlFn(name, fn)
}()
// wait for results // wait for results
select { select {
@ -149,26 +146,44 @@ func (m *Module) runCtrlFnWithTimeout(name string, timeout time.Duration, fn fun
} }
} }
func (m *Module) runCtrlFn(name string, fn func() error) (err error) { func (m *Module) startCtrlFn(name string, fn func() error) chan error {
ctrlFnError := make(chan error, 1)
// If no function is given, still act as if it was run.
if fn == nil { if fn == nil {
return // Signal finish.
m.ctrlFuncRunning.UnSet()
m.checkIfStopComplete()
// Report nil error and return.
ctrlFnError <- nil
return ctrlFnError
} }
if m.ctrlFuncRunning.SetToIf(false, true) { // Signal that a control function is running.
defer m.ctrlFuncRunning.SetToIf(true, false) m.ctrlFuncRunning.Set()
}
defer func() { // Start control function in goroutine.
// recover from panic go func() {
panicVal := recover() // Recover from panic and reset control function signal.
if panicVal != nil { defer func() {
me := m.NewPanicError(name, "module-control", panicVal) // recover from panic
me.Report() panicVal := recover()
err = me if panicVal != nil {
} me := m.NewPanicError(name, "module-control", panicVal)
me.Report()
ctrlFnError <- fmt.Errorf("panic: %s", panicVal)
}
// Signal finish.
m.ctrlFuncRunning.UnSet()
m.checkIfStopComplete()
}()
// Run control function and report error.
err := fn()
ctrlFnError <- err
}() }()
// run return ctrlFnError
err = fn()
return
} }