Split worker types

Daniel 2019-09-20 22:04:26 +02:00
parent aaa7aaf648
commit 77ed019f27
2 changed files with 49 additions and 31 deletions


@@ -8,12 +8,13 @@ import (
 	"github.com/safing/portbase/log"
 )
 
-var (
-	serviceBackoffDuration = 2 * time.Second
+// Worker Default Configuration
+const (
+	DefaultBackoffDuration = 2 * time.Second
 )
 
-// StartWorker starts a generic worker that does not fit to be a Task or MicroTask, such as long running (and possibly mostly idle) sessions. You may declare a worker as a service, which will then be automatically restarted in case of an error.
-func (m *Module) StartWorker(name string, service bool, fn func(context.Context) error) error {
+// RunWorker directly runs a generic worker that does not fit to be a Task or MicroTask, such as long running (and possibly mostly idle) sessions. A call to RunWorker blocks until the worker is finished.
+func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
 	atomic.AddInt32(m.workerCnt, 1)
 	m.waitGroup.Add(1)
 	defer func() {
@@ -21,29 +22,44 @@ func (m *Module) StartWorker(name string, service bool, fn func(context.Context) error) error {
 		m.waitGroup.Done()
 	}()
 
+	return m.runWorker(name, fn)
+}
+
+// StartServiceWorker starts a generic worker, which is automatically restarted in case of an error. A call to StartServiceWorker runs the service-worker in a new goroutine and returns immediately. `backoffDuration` specifies how long to wait before restarts, multiplied by the number of failed attempts. Pass `0` for the default backoff duration. For custom error remediation functionality, build your own error handling procedure using calls to RunWorker.
+func (m *Module) StartServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) {
+	go m.runServiceWorker(name, backoffDuration, fn)
+}
+
+func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) {
+	atomic.AddInt32(m.workerCnt, 1)
+	m.waitGroup.Add(1)
+	defer func() {
+		atomic.AddInt32(m.workerCnt, -1)
+		m.waitGroup.Done()
+	}()
+
+	if backoffDuration == 0 {
+		backoffDuration = DefaultBackoffDuration
+	}
+
 	failCnt := 0
-	if service {
 	for {
 		if m.ShutdownInProgress() {
-			return nil
+			return
 		}
 		err := m.runWorker(name, fn)
 		if err != nil {
 			// log error and restart
 			failCnt++
-			sleepFor := time.Duration(failCnt) * serviceBackoffDuration
-			log.Errorf("module %s service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor)
+			sleepFor := time.Duration(failCnt) * backoffDuration
+			log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor)
 			time.Sleep(sleepFor)
 		} else {
-			// clean finish
-			return nil
+			// finish
+			return
 		}
 	}
-	} else {
-		return m.runWorker(name, fn)
-	}
 }
 
 func (m *Module) runWorker(name string, fn func(context.Context) error) (err error) {
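
For context, a hypothetical caller-side sketch of the two worker types split out by this commit (not part of the commit itself; the import path github.com/safing/portbase/modules and the existence of a *modules.Module value m are assumptions):

package example

import (
	"context"
	"time"

	"github.com/safing/portbase/modules"
)

// runExampleWorkers illustrates the split: StartServiceWorker returns
// immediately and restarts its function on error, while RunWorker blocks
// until the worker function is finished.
func runExampleWorkers(m *modules.Module) error {
	// restarted on error, sleeping failCnt * 1s between attempts;
	// passing 0 would select DefaultBackoffDuration (2s)
	m.StartServiceWorker("feed updater", time.Second, func(ctx context.Context) error {
		// long-running service loop; a non-nil return triggers a restart
		return nil
	})

	// blocks until the worker function returns, so the caller owns the error
	return m.RunWorker("idle session", func(ctx context.Context) error {
		// mostly idle work; ctx is expected to be cancelled on shutdown
		<-ctx.Done()
		return nil
	})
}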


@@ -3,6 +3,7 @@ package modules
 import (
 	"context"
 	"errors"
+	"sync"
 	"testing"
 	"time"
 )
@@ -14,7 +15,7 @@ var (
 func TestWorker(t *testing.T) {
 	// test basic functionality
-	err := wModule.StartWorker("test worker", false, func(ctx context.Context) error {
+	err := wModule.RunWorker("test worker", func(ctx context.Context) error {
 		return nil
 	})
 	if err != nil {
@@ -22,7 +23,7 @@ func TestWorker(t *testing.T) {
 	}
 
 	// test returning an error
-	err = wModule.StartWorker("test worker", false, func(ctx context.Context) error {
+	err = wModule.RunWorker("test worker", func(ctx context.Context) error {
 		return errTest
 	})
 	if err != errTest {
@@ -30,25 +31,26 @@ func TestWorker(t *testing.T) {
 	}
 
 	// test service functionality
-	serviceBackoffDuration = 2 * time.Millisecond // speed up backoff
 	failCnt := 0
-	err = wModule.StartWorker("test worker", true, func(ctx context.Context) error {
+	var sWTestGroup sync.WaitGroup
+	sWTestGroup.Add(1)
+	wModule.StartServiceWorker("test service-worker", 2*time.Millisecond, func(ctx context.Context) error {
 		failCnt++
 		t.Logf("service-worker test run #%d", failCnt)
 		if failCnt >= 3 {
+			sWTestGroup.Done()
 			return nil
 		}
 		return errTest
 	})
-	if err == errTest {
-		t.Errorf("service-worker failed with unexpected error: %s", err)
-	}
+	// wait for service-worker to complete test
+	sWTestGroup.Wait()
 	if failCnt != 3 {
 		t.Errorf("service-worker failed to restart")
 	}
 
 	// test panic recovery
-	err = wModule.StartWorker("test worker", false, func(ctx context.Context) error {
+	err = wModule.RunWorker("test worker", func(ctx context.Context) error {
 		var a []byte
 		_ = a[0] //nolint // we want to runtime panic!
 		return nil
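
The new StartServiceWorker comment points callers who need custom error remediation to RunWorker; a minimal hypothetical sketch of that pattern (not from this commit; the retry limit, names, and import path are illustrative assumptions):

package example

import (
	"context"
	"time"

	"github.com/safing/portbase/modules"
)

// retryWithRemediation wraps RunWorker in a caller-owned loop instead of
// using StartServiceWorker, so each error can be inspected before retrying.
func retryWithRemediation(m *modules.Module) {
	go func() {
		for attempt := 1; attempt <= 5; attempt++ {
			err := m.RunWorker("custom service", func(ctx context.Context) error {
				// worker body goes here
				return nil
			})
			if err == nil {
				return // clean finish
			}
			// custom remediation: inspect err, clean up state, then back off
			time.Sleep(time.Duration(attempt) * time.Second)
		}
	}()
}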