diff --git a/modules/microtasks.go b/modules/microtasks.go
index f874e4f..67866fe 100644
--- a/modules/microtasks.go
+++ b/modules/microtasks.go
@@ -47,12 +47,22 @@ func SetMaxConcurrentMicroTasks(n int) {
 
 // StartMicroTask starts a new MicroTask with high priority. It will start immediately. The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
 func (m *Module) StartMicroTask(name *string, fn func(context.Context) error) error {
+	if m == nil {
+		log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
+		return errNoModule
+	}
+
 	atomic.AddInt32(microTasks, 1)
 	return m.runMicroTask(name, fn)
 }
 
 // StartMediumPriorityMicroTask starts a new MicroTask with medium priority. It will wait until given a go (max 3 seconds). The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
 func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Context) error) error {
+	if m == nil {
+		log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
+		return errNoModule
+	}
+
 	// check if we can go immediately
 	select {
 	case <-mediumPriorityClearance:
@@ -68,6 +78,11 @@ func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Cont
 
 // StartLowPriorityMicroTask starts a new MicroTask with low priority. It will wait until given a go (max 15 seconds). The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
 func (m *Module) StartLowPriorityMicroTask(name *string, fn func(context.Context) error) error {
+	if m == nil {
+		log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
+		return errNoModule
+	}
+
 	// check if we can go immediately
 	select {
 	case <-lowPriorityClearance:
diff --git a/modules/modules.go b/modules/modules.go
index d58e315..9594cff 100644
--- a/modules/modules.go
+++ b/modules/modules.go
@@ -67,14 +67,26 @@ func (m *Module) shutdown() error {
 	m.shutdownFlag.Set()
 	m.cancelCtx()
 
+	// start shutdown function
+	// The result channel is buffered so that the goroutine can always deliver
+	// its result and call Done(), even when nobody is receiving anymore.
+	m.waitGroup.Add(1)
+	stopFnError := make(chan error, 1)
+	go func() {
+		stopFnError <- m.runModuleCtrlFn("stop module", m.stop)
+		m.waitGroup.Done()
+	}()
+
 	// wait for workers
 	done := make(chan struct{})
 	go func() {
 		m.waitGroup.Wait()
 		close(done)
 	}()
+
+	// wait for results
 	select {
 	case <-done:
 	case <-time.After(3 * time.Second):
 		log.Warningf(
 			"%s: timed out while waiting for workers/tasks to finish: workers=%d tasks=%d microtasks=%d, continuing shutdown...",
@@ -84,9 +96,15 @@ func (m *Module) shutdown() error {
 		atomic.LoadInt32(m.microTaskCnt),
 	)
 	}
 
-	// call shutdown function
-	return m.stop()
+	// collect the result of the stop function without blocking
+	select {
+	case err := <-stopFnError:
+		return err
+	default:
+		log.Warningf("%s: timed out while waiting for stop function to finish, continuing shutdown...", m.Name)
+		return nil
+	}
 }
 
 func dummyAction() error {
diff --git a/modules/start.go b/modules/start.go
index bfbe36d..3c3e544 100644
--- a/modules/start.go
+++ b/modules/start.go
@@ -106,7 +106,7 @@ func prepareModules() error {
 				go func() {
 					reports <- &report{
 						module: execM,
-						err:    execM.prep(),
+						err:    execM.runModuleCtrlFn("prep module", execM.prep),
 					}
 				}()
 			}
@@ -154,7 +154,7 @@ func startModules() error {
 				go func() {
 					reports <- &report{
 						module: execM,
-						err:    execM.start(),
+						err:    execM.runModuleCtrlFn("start module", execM.start),
 					}
 				}()
 			}
diff --git a/modules/tasks.go b/modules/tasks.go
index 7e695d8..2af4fb2 100644
--- a/modules/tasks.go
+++ b/modules/tasks.go
@@ -3,6 +3,7 @@ package modules
 import (
 	"container/list"
 	"context"
+	"fmt"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -59,6 +60,10 @@ const (
 
 // NewTask creates a new task with a descriptive name (non-unique), a optional deadline, and the task function to be executed. You must call one of Queue, Prioritize, StartASAP, Schedule or Repeat in order to have the Task executed.
 func (m *Module) NewTask(name string, fn func(context.Context, *Task)) *Task {
+	if m == nil {
+		log.Errorf(`modules: cannot create task "%s" with nil module`, name)
+	}
+
 	return &Task{
 		name:   name,
 		module: m,
@@ -68,6 +73,10 @@ func (m *Module) NewTask(name string, fn func(context.Context, *Task)) *Task {
 }
 
 func (t *Task) isActive() bool {
+	if t.module == nil {
+		return false
+	}
+
 	return !t.canceled && !t.module.ShutdownInProgress()
 }
 
@@ -178,6 +187,7 @@ func (t *Task) Repeat(interval time.Duration) *Task {
 	t.lock.Lock()
 	t.repeat = interval
 	t.executeAt = time.Now().Add(t.repeat)
+	t.addToSchedule()
 	t.lock.Unlock()
 
 	return t
@@ -194,6 +204,10 @@ func (t *Task) Cancel() {
 }
 
 func (t *Task) runWithLocking() {
+	if t.module == nil {
+		return
+	}
+
 	// wait for good timeslot regarding microtasks
 	select {
 	case <-taskTimeslot:
@@ -308,6 +322,7 @@ func (t *Task) getExecuteAtWithLocking() time.Time {
 func (t *Task) addToSchedule() {
 	scheduleLock.Lock()
 	defer scheduleLock.Unlock()
+	// defer printTaskList(taskSchedule) // for debugging
 
 	// notify scheduler
 	defer func() {
@@ -439,3 +454,33 @@ func taskScheduleHandler() {
 		}
 	}
 }
+
+// printTaskList prints every Task found in taskList to stdout, one per line.
+// Task fields are read without holding the task lock.
+func printTaskList(taskList *list.List) { //nolint:unused,deadcode // for debugging, NOT production use
+	fmt.Println("Modules Task List:")
+	for e := taskList.Front(); e != nil; e = e.Next() {
+		t, ok := e.Value.(*Task)
+		if !ok {
+			continue
+		}
+
+		// NewTask accepts a nil module, so t.module must not be dereferenced blindly.
+		moduleName := "<nil>"
+		if t.module != nil {
+			moduleName = t.module.Name
+		}
+
+		fmt.Printf(
+			"%s:%s qu=%v ca=%v exec=%v at=%s rep=%s delay=%s\n",
+			moduleName,
+			t.name,
+			t.queued,
+			t.canceled,
+			t.executing,
+			t.executeAt,
+			t.repeat,
+			t.maxDelay,
+		)
+	}
+}
diff --git a/modules/worker.go b/modules/worker.go
index c385c6d..951e91d 100644
--- a/modules/worker.go
+++ b/modules/worker.go
@@ -2,6 +2,7 @@ package modules
 
 import (
 	"context"
+	"errors"
 	"sync/atomic"
 	"time"
 
@@ -13,8 +14,18 @@ const (
 	DefaultBackoffDuration = 2 * time.Second
 )
 
+var (
+	// errNoModule is returned when a worker or microtask is started on a nil *Module.
+	errNoModule = errors.New("missing module (is nil!)")
+)
+
 // RunWorker directly runs a generic worker that does not fit to be a Task or MicroTask, such as long running (and possibly mostly idle) sessions. A call to RunWorker blocks until the worker is finished.
 func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
+	if m == nil {
+		log.Errorf(`modules: cannot start worker "%s" with nil module`, name)
+		return errNoModule
+	}
+
 	atomic.AddInt32(m.workerCnt, 1)
 	m.waitGroup.Add(1)
 	defer func() {
@@ -27,6 +38,11 @@ func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
 
 // StartServiceWorker starts a generic worker, which is automatically restarted in case of an error. A call to StartServiceWorker runs the service-worker in a new goroutine and returns immediately. `backoffDuration` specifies how to long to wait before restarts, multiplied by the number of failed attempts. Pass `0` for the default backoff duration. For custom error remediation functionality, build your own error handling procedure using calls to RunWorker.
 func (m *Module) StartServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) {
+	if m == nil {
+		log.Errorf(`modules: cannot start service worker "%s" with nil module`, name)
+		return
+	}
+
 	go m.runServiceWorker(name, backoffDuration, fn)
 }
 
@@ -42,6 +58,7 @@ func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn
 		backoffDuration = DefaultBackoffDuration
 	}
 	failCnt := 0
+	lastFail := time.Now()
 
 	for {
 		if m.ShutdownInProgress() {
@@ -50,11 +67,18 @@ func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn
 
 		err := m.runWorker(name, fn)
 		if err != nil {
-			// log error and restart
+			// reset fail counter if running without error for some time
+			if time.Since(lastFail) > 5*time.Minute {
+				failCnt = 0
+			}
+			// increase fail counter and set last failed time
 			failCnt++
+			lastFail = time.Now()
+			// log error
 			sleepFor := time.Duration(failCnt) * backoffDuration
 			log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor)
 			time.Sleep(sleepFor)
+			// loop to restart
 		} else {
 			// finish
 			return
@@ -77,3 +101,21 @@ func (m *Module) runWorker(name string, fn func(context.Context) error) (err err
 	err = fn(m.Ctx)
 	return
 }
+
+// runModuleCtrlFn runs the given module control function (prep, start or
+// stop) and converts a panic into an error that is reported and returned.
+func (m *Module) runModuleCtrlFn(name string, fn func() error) (err error) {
+	defer func() {
+		// recover from panic
+		panicVal := recover()
+		if panicVal != nil {
+			me := m.NewPanicError(name, "module-control", panicVal)
+			me.Report()
+			err = me
+		}
+	}()
+
+	// run
+	err = fn()
+	return
+}