Improve modules package

This commit is contained in:
Daniel 2019-10-25 13:38:37 +02:00
parent 00f545c3a4
commit 2508d28465
6 changed files with 91 additions and 33 deletions

View file

@ -10,7 +10,7 @@ import (
var (
errorReportingChannel chan *ModuleError
reportToStdErr bool
reportToStdErr = true
reportingLock sync.RWMutex
)

View file

@ -26,12 +26,14 @@ func (m *Module) processEventTrigger(event string, data interface{}) {
hooks, ok := m.eventHooks[event]
if !ok {
log.Warningf("%s: tried to trigger non-existent event %s", m.Name, event)
log.Warningf(`%s: tried to trigger non-existent event "%s"`, m.Name, event)
return
}
for _, hook := range hooks {
go m.runEventHook(hook, event, data)
if !hook.hookingModule.ShutdownInProgress() {
go m.runEventHook(hook, event, data)
}
}
}
@ -79,7 +81,7 @@ func (m *Module) RegisterEventHook(module string, event string, description stri
eventModule, ok = modules[module]
modulesLock.RUnlock()
if !ok {
return fmt.Errorf("module %s does not exist", module)
return fmt.Errorf(`module "%s" does not exist`, module)
}
}
@ -88,7 +90,7 @@ func (m *Module) RegisterEventHook(module string, event string, description stri
defer eventModule.eventHooksLock.Unlock()
hooks, ok := eventModule.eventHooks[event]
if !ok {
return fmt.Errorf("module %s event %s does not exist", eventModule.Name, event)
return fmt.Errorf(`event "%s/%s" does not exist`, eventModule.Name, event)
}
// add hook

View file

@ -45,19 +45,49 @@ func SetMaxConcurrentMicroTasks(n int) {
}
}
// StartMicroTask starts a new MicroTask with high priority. It will start immediately. The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
func (m *Module) StartMicroTask(name *string, fn func(context.Context) error) error {
// StartMicroTask starts a new MicroTask with high priority. It will start immediately. The call starts a new goroutine and returns immediately. The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) StartMicroTask(name *string, fn func(context.Context) error) {
go func() {
err := m.RunMicroTask(name, fn)
if err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err)
}
}()
}
// StartMediumPriorityMicroTask starts a new MicroTask with medium priority. The call starts a new goroutine and returns immediately. It will wait until a slot becomes available (max 3 seconds). The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Context) error) {
go func() {
err := m.RunMediumPriorityMicroTask(name, fn)
if err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err)
}
}()
}
// StartLowPriorityMicroTask starts a new MicroTask with low priority. The call starts a new goroutine and returns immediately. It will wait until a slot becomes available (max 15 seconds). The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) StartLowPriorityMicroTask(name *string, fn func(context.Context) error) {
go func() {
err := m.RunLowPriorityMicroTask(name, fn)
if err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err)
}
}()
}
// RunMicroTask runs a new MicroTask with high priority. It will start immediately. The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) RunMicroTask(name *string, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
return errNoModule
}
atomic.AddInt32(microTasks, 1)
atomic.AddInt32(microTasks, 1) // increase global counter here, as high priority tasks are not started by the scheduler, where this counter is usually increased
return m.runMicroTask(name, fn)
}
// StartMediumPriorityMicroTask starts a new MicroTask with medium priority. It will wait until given a go (max 3 seconds). The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Context) error) error {
// RunMediumPriorityMicroTask runs a new MicroTask with medium priority. It will wait until a slot becomes available (max 3 seconds). The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) RunMediumPriorityMicroTask(name *string, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
return errNoModule
@ -76,8 +106,8 @@ func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Cont
return m.runMicroTask(name, fn)
}
// StartLowPriorityMicroTask starts a new MicroTask with low priority. It will wait until given a go (max 15 seconds). The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
func (m *Module) StartLowPriorityMicroTask(name *string, fn func(context.Context) error) error {
// RunLowPriorityMicroTask runs a new MicroTask with low priority. It will wait until a slot becomes available (max 15 seconds). The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) RunLowPriorityMicroTask(name *string, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
return errNoModule
@ -109,7 +139,7 @@ func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err
if panicVal != nil {
me := m.NewPanicError(*name, "microtask", panicVal)
me.Report()
log.Errorf("%s: microtask %s panicked: %s\n%s", m.Name, *name, panicVal, me.StackTrace)
log.Errorf("%s: microtask %s panicked: %s", m.Name, *name, panicVal)
err = me
}
@ -165,7 +195,10 @@ microTaskManageLoop:
atomic.AddInt32(microTasks, 1)
} else {
// wait for signal that a task was completed
<-microTaskFinished
select {
case <-microTaskFinished:
case <-time.After(1 * time.Second):
}
}
}

View file

@ -42,7 +42,7 @@ func TestMicroTaskWaiting(t *testing.T) {
go func() {
defer mtwWaitGroup.Done()
// exec at slot 1
_ = mtModule.StartMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error {
mtwOutputChannel <- "1" // slot 1
time.Sleep(mtwSleepDuration * 5)
mtwOutputChannel <- "2" // slot 5
@ -53,7 +53,7 @@ func TestMicroTaskWaiting(t *testing.T) {
time.Sleep(mtwSleepDuration * 1)
// clear clearances
_ = mtModule.StartMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error {
return nil
})
@ -61,7 +61,7 @@ func TestMicroTaskWaiting(t *testing.T) {
go func() {
defer mtwWaitGroup.Done()
// exec at slot 2
_ = mtModule.StartLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
mtwOutputChannel <- "7" // slot 16
return nil
})
@ -74,7 +74,7 @@ func TestMicroTaskWaiting(t *testing.T) {
defer mtwWaitGroup.Done()
time.Sleep(mtwSleepDuration * 8)
// exec at slot 10
_ = mtModule.StartMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error {
mtwOutputChannel <- "4" // slot 10
time.Sleep(mtwSleepDuration * 5)
mtwOutputChannel <- "6" // slot 15
@ -86,7 +86,7 @@ func TestMicroTaskWaiting(t *testing.T) {
go func() {
defer mtwWaitGroup.Done()
// exec at slot 3
_ = mtModule.StartMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
mtwOutputChannel <- "3" // slot 6
time.Sleep(mtwSleepDuration * 7)
mtwOutputChannel <- "5" // slot 13
@ -122,7 +122,7 @@ var mtoWaitCh chan struct{}
func mediumPrioTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
_ = mtModule.StartMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
mtoOutputChannel <- "1"
time.Sleep(2 * time.Millisecond)
return nil
@ -132,7 +132,7 @@ func mediumPrioTaskTester() {
func lowPrioTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
_ = mtModule.StartLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
mtoOutputChannel <- "2"
time.Sleep(2 * time.Millisecond)
return nil

View file

@ -118,7 +118,14 @@ func Register(name string, prep, start, stop func() error, dependencies ...strin
modulesLock.Lock()
defer modulesLock.Unlock()
// check for already existing module
_, ok := modules[name]
if ok {
panic(fmt.Sprintf("modules: module %s is already registered", name))
}
// add new module
modules[name] = newModule
return newModule
}

View file

@ -16,9 +16,21 @@ const (
)
var (
errNoModule = errors.New("missing module (is nil!)")
// ErrRestartNow may be returned (wrapped) by service workers to request an immediate restart.
ErrRestartNow = errors.New("requested restart")
errNoModule = errors.New("missing module (is nil!)")
)
// StartWorker directly starts a generic worker that does not fit to be a Task or MicroTask, such as long running (and possibly mostly idle) sessions. A call to StartWorker starts a new goroutine and returns immediately.
func (m *Module) StartWorker(name string, fn func(context.Context) error) {
go func() {
err := m.RunWorker(name, fn)
if err != nil {
log.Warningf("%s: worker %s failed: %s", m.Name, name, err)
}
}()
}
// RunWorker directly runs a generic worker that does not fit to be a Task or MicroTask, such as long running (and possibly mostly idle) sessions. A call to RunWorker blocks until the worker is finished.
func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
if m == nil {
@ -67,18 +79,22 @@ func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn
err := m.runWorker(name, fn)
if err != nil {
// reset fail counter if running without error for some time
if time.Now().Add(-5 * time.Minute).After(lastFail) {
failCnt = 0
if !errors.Is(err, ErrRestartNow) {
// reset fail counter if running without error for some time
if time.Now().Add(-5 * time.Minute).After(lastFail) {
failCnt = 0
}
// increase fail counter and set last failed time
failCnt++
lastFail = time.Now()
// log error
sleepFor := time.Duration(failCnt) * backoffDuration
log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor)
time.Sleep(sleepFor)
// loop to restart
} else {
log.Infof("%s: service-worker %s %s - restarting now", m.Name, name, err)
}
// increase fail counter and set last failed time
failCnt++
lastFail = time.Now()
// log error
sleepFor := time.Duration(failCnt) * backoffDuration
log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor)
time.Sleep(sleepFor)
// loop to restart
} else {
// finish
return