diff --git a/modules/worker.go b/modules/worker.go index b44a646..c385c6d 100644 --- a/modules/worker.go +++ b/modules/worker.go @@ -8,12 +8,13 @@ import ( "github.com/safing/portbase/log" ) -var ( - serviceBackoffDuration = 2 * time.Second +// Worker Default Configuration +const ( + DefaultBackoffDuration = 2 * time.Second ) -// StartWorker starts a generic worker that does not fit to be a Task or MicroTask, such as long running (and possibly mostly idle) sessions. You may declare a worker as a service, which will then be automatically restarted in case of an error. -func (m *Module) StartWorker(name string, service bool, fn func(context.Context) error) error { +// RunWorker directly runs a generic worker that does not fit to be a Task or MicroTask, such as long running (and possibly mostly idle) sessions. A call to RunWorker blocks until the worker is finished. +func (m *Module) RunWorker(name string, fn func(context.Context) error) error { atomic.AddInt32(m.workerCnt, 1) m.waitGroup.Add(1) defer func() { @@ -21,28 +22,43 @@ func (m *Module) StartWorker(name string, service bool, fn func(context.Context) m.waitGroup.Done() }() + return m.runWorker(name, fn) +} + +// StartServiceWorker starts a generic worker, which is automatically restarted in case of an error. A call to StartServiceWorker runs the service-worker in a new goroutine and returns immediately. `backoffDuration` specifies how to long to wait before restarts, multiplied by the number of failed attempts. Pass `0` for the default backoff duration. For custom error remediation functionality, build your own error handling procedure using calls to RunWorker. +func (m *Module) StartServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) { + go m.runServiceWorker(name, backoffDuration, fn) +} + +func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) { + atomic.AddInt32(m.workerCnt, 1) + m.waitGroup.Add(1) + defer func() { + atomic.AddInt32(m.workerCnt, -1) + m.waitGroup.Done() + }() + + if backoffDuration == 0 { + backoffDuration = DefaultBackoffDuration + } failCnt := 0 - if service { - for { - if m.ShutdownInProgress() { - return nil - } - - err := m.runWorker(name, fn) - if err != nil { - // log error and restart - failCnt++ - sleepFor := time.Duration(failCnt) * serviceBackoffDuration - log.Errorf("module %s service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor) - time.Sleep(sleepFor) - } else { - // clean finish - return nil - } + for { + if m.ShutdownInProgress() { + return + } + + err := m.runWorker(name, fn) + if err != nil { + // log error and restart + failCnt++ + sleepFor := time.Duration(failCnt) * backoffDuration + log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor) + time.Sleep(sleepFor) + } else { + // finish + return } - } else { - return m.runWorker(name, fn) } } diff --git a/modules/worker_test.go b/modules/worker_test.go index 417580a..6c86dbf 100644 --- a/modules/worker_test.go +++ b/modules/worker_test.go @@ -3,6 +3,7 @@ package modules import ( "context" "errors" + "sync" "testing" "time" ) @@ -14,7 +15,7 @@ var ( func TestWorker(t *testing.T) { // test basic functionality - err := wModule.StartWorker("test worker", false, func(ctx context.Context) error { + err := wModule.RunWorker("test worker", func(ctx context.Context) error { return nil }) if err != nil { @@ -22,7 +23,7 @@ func TestWorker(t *testing.T) { } // test returning an error - err = wModule.StartWorker("test worker", false, func(ctx context.Context) error { + err = wModule.RunWorker("test worker", func(ctx context.Context) error { return errTest }) if err != errTest { @@ -30,25 +31,26 @@ func TestWorker(t *testing.T) { } // test service functionality - serviceBackoffDuration = 2 * time.Millisecond // speed up backoff failCnt := 0 - err = wModule.StartWorker("test worker", true, func(ctx context.Context) error { + var sWTestGroup sync.WaitGroup + sWTestGroup.Add(1) + wModule.StartServiceWorker("test service-worker", 2*time.Millisecond, func(ctx context.Context) error { failCnt++ t.Logf("service-worker test run #%d", failCnt) if failCnt >= 3 { + sWTestGroup.Done() return nil } return errTest }) - if err == errTest { - t.Errorf("service-worker failed with unexpected error: %s", err) - } + // wait for service-worker to complete test + sWTestGroup.Wait() if failCnt != 3 { t.Errorf("service-worker failed to restart") } // test panic recovery - err = wModule.StartWorker("test worker", false, func(ctx context.Context) error { + err = wModule.RunWorker("test worker", func(ctx context.Context) error { var a []byte _ = a[0] //nolint // we want to runtime panic! return nil