Use module counters instead of a WaitGroup when shutting down

Daniel 2021-09-26 13:42:26 +02:00
parent 92169d826d
commit 483cbad600
4 changed files with 57 additions and 38 deletions
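
This change drops the module's sync.WaitGroup and instead derives shutdown completion from the counters the module already keeps: stop() creates a stopComplete channel, and every finishing worker, task, microtask, and control function calls the new checkIfStopComplete(), which closes that channel once the stop flag is set, no control function is running, and all counters have returned to zero.

A minimal, self-contained sketch of that pattern, with simplified and partly hypothetical names (module, runWorker); the real code below also tracks tasks, microtasks, and a control-function flag, and uses the abool package rather than atomic.Bool:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type module struct {
	sync.Mutex
	stopFlag     atomic.Bool   // set once stop() has been called
	workerCnt    int32         // number of live workers
	stopComplete chan struct{} // closed exactly once when all work is done
}

// checkIfStopComplete mirrors the commit's logic: close the channel only
// when we are stopping and every counter has reached zero.
func (m *module) checkIfStopComplete() {
	if m.stopFlag.Load() && atomic.LoadInt32(&m.workerCnt) == 0 {
		m.Lock()
		defer m.Unlock()
		if m.stopComplete != nil {
			close(m.stopComplete) // wake the waiter in stop()
			m.stopComplete = nil  // make later calls no-ops (no double close)
		}
	}
}

func (m *module) runWorker(fn func()) {
	atomic.AddInt32(&m.workerCnt, 1)
	go func() {
		defer func() {
			atomic.AddInt32(&m.workerCnt, -1)
			m.checkIfStopComplete() // a finishing worker may complete the stop
		}()
		fn()
	}()
}

func (m *module) stop() {
	m.Lock()
	m.stopComplete = make(chan struct{})
	stopComplete := m.stopComplete // keep a copy; the field is nil-ed on close
	m.stopFlag.Store(true)
	m.Unlock()

	m.checkIfStopComplete() // handles the case of zero running workers

	select {
	case <-stopComplete:
		fmt.Println("all workers finished")
	case <-time.After(3 * time.Second):
		fmt.Println("timed out, continuing shutdown")
	}
}

func main() {
	m := &module{}
	m.runWorker(func() { time.Sleep(100 * time.Millisecond) })
	m.stop()
}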

@@ -130,7 +130,6 @@ func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err
 	// start for module
 	// hint: only microTasks global var is important for scheduling, others can be set here
 	atomic.AddInt32(m.microTaskCnt, 1)
-	m.waitGroup.Add(1)

 	// set up recovery
 	defer func() {
@@ -145,7 +144,7 @@ func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err
 		// finish for module
 		atomic.AddInt32(m.microTaskCnt, -1)
-		m.waitGroup.Done()
+		m.checkIfStopComplete()

 		// finish and possibly trigger next task
 		atomic.AddInt32(microTasks, -1)
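
With the WaitGroup gone, the microtask counter alone tracks running microtasks, and the finishing microtask itself calls checkIfStopComplete() (introduced in the next file), so the last unit of work to exit is what unblocks a pending stop.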

@@ -52,15 +52,16 @@ type Module struct { //nolint:maligned // not worth the effort
 	// start
 	startComplete chan struct{}
 	// stop
-	Ctx       context.Context
-	cancelCtx func()
-	stopFlag  *abool.AtomicBool
+	Ctx          context.Context
+	cancelCtx    func()
+	stopFlag     *abool.AtomicBool
+	stopComplete chan struct{}

 	// workers/tasks
-	workerCnt    *int32
-	taskCnt      *int32
-	microTaskCnt *int32
-	waitGroup    sync.WaitGroup
+	ctrlFuncRunning *abool.AtomicBool
+	workerCnt       *int32
+	taskCnt         *int32
+	microTaskCnt    *int32

 	// events
 	eventHooks map[string]*eventHooks
@@ -205,6 +206,23 @@ func (m *Module) start(reports chan *report) {
 	}()
 }

+func (m *Module) checkIfStopComplete() {
+	if m.stopFlag.IsSet() &&
+		m.ctrlFuncRunning.IsNotSet() &&
+		atomic.LoadInt32(m.workerCnt) == 0 &&
+		atomic.LoadInt32(m.taskCnt) == 0 &&
+		atomic.LoadInt32(m.microTaskCnt) == 0 {
+
+		m.Lock()
+		defer m.Unlock()
+
+		if m.stopComplete != nil {
+			close(m.stopComplete)
+			m.stopComplete = nil
+		}
+	}
+}
+
 func (m *Module) stop(reports chan *report) {
 	// check and set intermediate status
 	m.Lock()
@@ -218,47 +236,48 @@ func (m *Module) stop(reports chan *report) {
 		}()
 		return
 	}
-	m.status = StatusStopping

-	// reset start management
+	// Reset start/stop signal channels.
 	m.startComplete = make(chan struct{})
-	// init stop management
-	m.cancelCtx()
+	m.stopComplete = make(chan struct{})
+
+	// Make a copy of the stop channel.
+	stopComplete := m.stopComplete
+
+	// Set status and cancel context.
+	m.status = StatusStopping
 	m.stopFlag.Set()
+	m.cancelCtx()
 	m.Unlock()

-	go m.stopAllTasks(reports)
+	go m.stopAllTasks(reports, stopComplete)
 }

-func (m *Module) stopAllTasks(reports chan *report) {
+func (m *Module) stopAllTasks(reports chan *report, stopComplete chan struct{}) {
 	// start shutdown function
-	stopFnFinished := abool.NewBool(false)
 	var stopFnError error
+	stopFuncRunning := abool.New()
 	if m.stopFn != nil {
-		m.waitGroup.Add(1)
+		stopFuncRunning.Set()
 		go func() {
 			stopFnError = m.runCtrlFn("stop module", m.stopFn)
-			stopFnFinished.Set()
-			m.waitGroup.Done()
+			stopFuncRunning.UnSet()
+			m.checkIfStopComplete()
 		}()
+	} else {
+		m.checkIfStopComplete()
 	}

-	// wait for workers and stop fn
-	done := make(chan struct{})
-	go func() {
-		m.waitGroup.Wait()
-		close(done)
-	}()
-
 	// wait for results
 	select {
-	case <-done:
-	case <-time.After(moduleStopTimeout):
+	case <-stopComplete:
+	// case <-time.After(moduleStopTimeout):
+	case <-time.After(3 * time.Second):
 		log.Warningf(
-			"%s: timed out while waiting for stopfn/workers/tasks to finish: stopFn=%v workers=%d tasks=%d microtasks=%d, continuing shutdown...",
+			"%s: timed out while waiting for stopfn/workers/tasks to finish: stopFn=%v/%v workers=%d tasks=%d microtasks=%d, continuing shutdown...",
 			m.Name,
-			stopFnFinished.IsSet(),
+			stopFuncRunning.IsSet(), m.ctrlFuncRunning.IsSet(),
 			atomic.LoadInt32(m.workerCnt),
 			atomic.LoadInt32(m.taskCnt),
 			atomic.LoadInt32(m.microTaskCnt),
@@ -267,7 +286,7 @@ func (m *Module) stopAllTasks(reports chan *report) {
 	// collect error
 	var err error
-	if stopFnFinished.IsSet() && stopFnError != nil {
+	if stopFuncRunning.IsNotSet() && stopFnError != nil {
 		err = stopFnError
 	}

 	// set status
@@ -328,10 +347,10 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
 		Ctx:       ctx,
 		cancelCtx: cancelCtx,
 		stopFlag:  abool.NewBool(false),
+		ctrlFuncRunning: abool.NewBool(false),
 		workerCnt:    &workerCnt,
 		taskCnt:      &taskCnt,
 		microTaskCnt: &microTaskCnt,
-		waitGroup:    sync.WaitGroup{},
 		eventHooks: make(map[string]*eventHooks),
 		depNames:   dependencies,
 	}
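
Two details worth noting above: stop() hands stopAllTasks() a copy of m.stopComplete taken while the lock is held, because checkIfStopComplete() sets the field to nil after closing it, so the waiter needs its own reference to select on. And closing the channel broadcasts to all waiters at once, while the nil guard turns every later checkIfStopComplete() call into a no-op, which is what makes it safe to attempt the close from every finishing worker, task, and microtask.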

@@ -330,7 +330,6 @@ func (t *Task) executeWithLocking() {
 	// start for module
 	// hint: only queueWg global var is important for scheduling, others can be set here
 	atomic.AddInt32(t.module.taskCnt, 1)
-	t.module.waitGroup.Add(1)

 	defer func() {
 		// recover from panic
@@ -343,7 +342,7 @@ func (t *Task) executeWithLocking() {
 		// finish for module
 		atomic.AddInt32(t.module.taskCnt, -1)
-		t.module.waitGroup.Done()
+		t.module.checkIfStopComplete()

 		t.lock.Lock()

@@ -39,10 +39,9 @@ func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
 	}

 	atomic.AddInt32(m.workerCnt, 1)
-	m.waitGroup.Add(1)
 	defer func() {
 		atomic.AddInt32(m.workerCnt, -1)
-		m.waitGroup.Done()
+		m.checkIfStopComplete()
 	}()

 	return m.runWorker(name, fn)
@@ -60,10 +59,9 @@ func (m *Module) StartServiceWorker(name string, backoffDuration time.Duration,
 func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) {
 	atomic.AddInt32(m.workerCnt, 1)
-	m.waitGroup.Add(1)
 	defer func() {
 		atomic.AddInt32(m.workerCnt, -1)
-		m.waitGroup.Done()
+		m.checkIfStopComplete()
 	}()

 	if backoffDuration == 0 {
@@ -143,6 +141,10 @@ func (m *Module) runCtrlFn(name string, fn func() error) (err error) {
 		return
 	}

+	if m.ctrlFuncRunning.SetToIf(false, true) {
+		defer m.ctrlFuncRunning.SetToIf(true, false)
+	}
+
 	defer func() {
 		// recover from panic
 		panicVal := recover()
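
The guard added to runCtrlFn relies on abool's compare-and-swap: SetToIf(false, true) flips the flag only if it is currently unset and reports whether this call did the flipping, so only that caller registers the deferred reset. A small standalone sketch of the idiom (same abool package as the diff; the surrounding names are made up):

package main

import (
	"fmt"

	"github.com/tevino/abool"
)

func main() {
	ctrlFuncRunning := abool.New() // starts unset (false)

	outer := func() {
		// Flip the flag only if no other ctrl fn holds it; only the
		// caller that flipped it registers the deferred reset.
		if ctrlFuncRunning.SetToIf(false, true) {
			defer ctrlFuncRunning.SetToIf(true, false)
		}
		fmt.Println("outer sees flag set:", ctrlFuncRunning.IsSet()) // true

		// A second attempt while the flag is held does not flip it and
		// therefore must not reset it either.
		fmt.Println("nested flip succeeded:", ctrlFuncRunning.SetToIf(false, true)) // false
	}

	outer()
	fmt.Println("flag after outer returned:", ctrlFuncRunning.IsSet()) // false
}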