Fix and improve module tasks and workers

Daniel 2019-10-04 23:50:23 +02:00
parent 807095f9bc
commit 233ca05158
5 changed files with 111 additions and 6 deletions

View file

@@ -47,12 +47,22 @@ func SetMaxConcurrentMicroTasks(n int) {
// StartMicroTask starts a new MicroTask with high priority. It will start immediately. The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
func (m *Module) StartMicroTask(name *string, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
return errNoModule
}
atomic.AddInt32(microTasks, 1)
return m.runMicroTask(name, fn)
}
// StartMediumPriorityMicroTask starts a new MicroTask with medium priority. It will wait until given a go (max 3 seconds). The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
return errNoModule
}
// check if we can go immediately
select {
case <-mediumPriorityClearance:
@@ -68,6 +78,11 @@ func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Cont
// StartLowPriorityMicroTask starts a new MicroTask with low priority. It will wait until given a go (max 15 seconds). The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
func (m *Module) StartLowPriorityMicroTask(name *string, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
return errNoModule
}
// check if we can go immediately
select {
case <-lowPriorityClearance:
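
For orientation, a minimal usage sketch of the three starters above. The wrapper function, the task name, and the *Module value m are illustrative assumptions, not part of this commit:

var cacheCleanupName = "cache cleanup" // must stay constant, only the pointer is passed around

func startCacheCleanup(m *Module) {
	// high priority: starts immediately
	_ = m.StartMicroTask(&cacheCleanupName, func(ctx context.Context) error {
		// do a small, bounded unit of work; honor ctx for cancellation
		return nil
	})
	// medium priority: waits up to 3 seconds for clearance before running
	_ = m.StartMediumPriorityMicroTask(&cacheCleanupName, func(ctx context.Context) error {
		return nil
	})
	// low priority: waits up to 15 seconds for clearance before running
	_ = m.StartLowPriorityMicroTask(&cacheCleanupName, func(ctx context.Context) error {
		return nil
	})
}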

View file

@@ -67,14 +67,32 @@ func (m *Module) shutdown() error {
m.shutdownFlag.Set()
m.cancelCtx()
// start shutdown function
m.waitGroup.Add(1)
stopFnError := make(chan error)
go func() {
stopFnError <- m.runModuleCtrlFn("stop module", m.stop)
m.waitGroup.Done()
}()
// wait for workers
done := make(chan struct{})
go func() {
m.waitGroup.Wait()
close(done)
}()
// wait for results
select {
case err := <-stopFnError:
return err
case <-done:
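// all tracked workers are done; also return the stop function's error if one is already waiting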
select {
case err := <-stopFnError:
return err
default:
return nil
}
case <-time.After(3 * time.Second):
log.Warningf(
"%s: timed out while waiting for workers/tasks to finish: workers=%d tasks=%d microtasks=%d, continuing shutdown...",
@@ -84,9 +102,7 @@ func (m *Module) shutdown() error {
atomic.LoadInt32(m.microTaskCnt),
)
}
// call shutdown function
return m.stop()
return nil
}
func dummyAction() error {

View file

@@ -106,7 +106,7 @@ func prepareModules() error {
go func() {
reports <- &report{
module: execM,
err: execM.prep(),
err: execM.runModuleCtrlFn("prep module", execM.prep),
}
}()
}
@@ -154,7 +154,7 @@ func startModules() error {
go func() {
reports <- &report{
module: execM,
err: execM.start(),
err: execM.runModuleCtrlFn("start module", execM.start),
}
}()
}

View file

@@ -3,6 +3,7 @@ package modules
import (
"container/list"
"context"
"fmt"
"sync"
"sync/atomic"
"time"
@@ -59,6 +60,10 @@ const (
// NewTask creates a new task with a descriptive name (non-unique), an optional deadline, and the task function to be executed. You must call one of Queue, Prioritize, StartASAP, Schedule or Repeat in order to have the Task executed.
func (m *Module) NewTask(name string, fn func(context.Context, *Task)) *Task {
if m == nil {
log.Errorf(`modules: cannot create task "%s" with nil module`, name)
}
return &Task{
name: name,
module: m,
@@ -68,6 +73,10 @@ func (m *Module) NewTask(name string, fn func(context.Context, *Task)) *Task {
}
func (t *Task) isActive() bool {
if t.module == nil {
return false
}
return !t.canceled && !t.module.ShutdownInProgress()
}
@@ -178,6 +187,7 @@ func (t *Task) Repeat(interval time.Duration) *Task {
t.lock.Lock()
t.repeat = interval
t.executeAt = time.Now().Add(t.repeat)
t.addToSchedule()
t.lock.Unlock()
return t
@@ -194,6 +204,10 @@ func (t *Task) Cancel() {
}
func (t *Task) runWithLocking() {
if t.module == nil {
return
}
// wait for good timeslot regarding microtasks
select {
case <-taskTimeslot:
@@ -308,6 +322,7 @@ func (t *Task) getExecuteAtWithLocking() time.Time {
func (t *Task) addToSchedule() {
scheduleLock.Lock()
defer scheduleLock.Unlock()
// defer printTaskList(taskSchedule) // for debugging
// notify scheduler
defer func() {
@@ -439,3 +454,23 @@ func taskScheduleHandler() {
}
}
}
func printTaskList(l *list.List) { //nolint:unused,deadcode // for debugging, NOT production use
fmt.Println("Modules Task List:")
for e := l.Front(); e != nil; e = e.Next() {
t, ok := e.Value.(*Task)
if ok {
fmt.Printf(
"%s:%s qu=%v ca=%v exec=%v at=%s rep=%s delay=%s\n",
t.module.Name,
t.name,
t.queued,
t.canceled,
t.executing,
t.executeAt,
t.repeat,
t.maxDelay,
)
}
}
}
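
As a usage sketch for the Task API above: create the task with NewTask, then activate it with one of Queue, Prioritize, StartASAP, Schedule or Repeat. The wrapper function, the task name, and the interval are illustrative assumptions, not part of this commit:

func scheduleCacheCleanup(m *Module) {
	// Repeat sets the interval and puts the task on the schedule right away
	m.NewTask("cache cleanup", func(ctx context.Context, t *Task) {
		// periodic maintenance goes here; honor ctx to stop early on shutdown
	}).Repeat(10 * time.Minute)
}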

View file

@@ -2,6 +2,7 @@ package modules
import (
"context"
"errors"
"sync/atomic"
"time"
@@ -13,8 +14,17 @@
DefaultBackoffDuration = 2 * time.Second
)
var (
errNoModule = errors.New("missing module (is nil!)")
)
// RunWorker directly runs a generic worker that does not fit being a Task or MicroTask, such as long-running (and possibly mostly idle) sessions. A call to RunWorker blocks until the worker is finished.
func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start worker "%s" with nil module`, name)
return errNoModule
}
atomic.AddInt32(m.workerCnt, 1)
m.waitGroup.Add(1)
defer func() {
@@ -27,6 +37,11 @@ func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
// StartServiceWorker starts a generic worker, which is automatically restarted in case of an error. A call to StartServiceWorker runs the service-worker in a new goroutine and returns immediately. `backoffDuration` specifies how long to wait before restarts, multiplied by the number of failed attempts. Pass `0` for the default backoff duration. For custom error remediation functionality, build your own error handling procedure using calls to RunWorker.
func (m *Module) StartServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) {
if m == nil {
log.Errorf(`modules: cannot start service worker "%s" with nil module`, name)
return
}
go m.runServiceWorker(name, backoffDuration, fn)
}
@@ -42,6 +57,7 @@ func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn
backoffDuration = DefaultBackoffDuration
}
failCnt := 0
lastFail := time.Now()
for {
if m.ShutdownInProgress() {
@@ -50,11 +66,18 @@ func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn
err := m.runWorker(name, fn)
if err != nil {
// log error and restart
// reset fail counter if running without error for some time
if time.Now().Add(-5 * time.Minute).After(lastFail) {
failCnt = 0
}
// increase fail counter and set last failed time
failCnt++
lastFail = time.Now()
// log error
sleepFor := time.Duration(failCnt) * backoffDuration
log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor)
time.Sleep(sleepFor)
// loop to restart
} else {
// finish
return
@@ -77,3 +100,19 @@ func (m *Module) runWorker(name string, fn func(context.Context) error) (err err
err = fn(m.Ctx)
return
}
func (m *Module) runModuleCtrlFn(name string, fn func() error) (err error) {
defer func() {
// recover from panic
panicVal := recover()
if panicVal != nil {
me := m.NewPanicError(name, "module-control", panicVal)
me.Report()
err = me
}
}()
// run
err = fn()
return
}
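
By way of illustration, a small sketch of the two worker entry points; the wrapper function, worker names, and helper are assumptions for this example, not part of the commit. StartServiceWorker detaches and supervises the function, restarting it on error after failCnt * backoffDuration (with the 2 second default: 2s, 4s, 6s, ...); if the worker then runs for more than 5 minutes before failing again, the counter resets. RunWorker blocks, so it is typically called from a goroutine the caller already owns:

func startNetworker(m *Module) {
	// supervised worker: restarted with growing backoff on error; 0 selects the default backoff
	m.StartServiceWorker("packet feeder", 0, feedPackets)

	// blocking worker: run from an existing goroutine, returns when fn returns
	go func() {
		_ = m.RunWorker("session handler", func(ctx context.Context) error {
			<-ctx.Done() // mostly idle session; ends on shutdown
			return nil
		})
	}()
}

func feedPackets(ctx context.Context) error {
	// do work until the module context is canceled
	<-ctx.Done()
	return nil
}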