safing-portbase/modules/worker.go
2023-10-13 11:46:17 +02:00

189 lines
5.3 KiB
Go

package modules
import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/safing/portbase/log"
)
// Default Worker Configuration.
const (
DefaultBackoffDuration = 2 * time.Second
)
var (
// ErrRestartNow may be returned (wrapped) by service workers to request an immediate restart.
ErrRestartNow = errors.New("requested restart")
errNoModule = errors.New("missing module (is nil!)")
)
// StartWorker launches a generic worker in its own goroutine and returns
// immediately. It is meant for work that fits neither a Task nor a MicroTask,
// such as long-running (and possibly mostly idle) sessions. Errors from the
// worker are logged; a canceled context is only logged at debug level.
func (m *Module) StartWorker(name string, fn func(context.Context) error) {
	go func() {
		switch err := m.RunWorker(name, fn); {
		case err == nil:
			// Worker finished cleanly - nothing to report.
		case errors.Is(err, context.Canceled):
			log.Debugf("%s: worker %s was canceled: %s", m.Name, name, err)
		default:
			log.Errorf("%s: worker %s failed: %s", m.Name, name, err)
		}
	}()
}
// RunWorker runs a generic worker on the calling goroutine and blocks until
// it returns. It is meant for work that fits neither a Task nor a MicroTask,
// such as long-running (and possibly mostly idle) sessions.
// Calling RunWorker on a nil module logs an error and returns errNoModule.
func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
	// Guard against use before the module exists.
	if m == nil {
		log.Errorf(`modules: cannot start worker "%s" with nil module`, name)
		return errNoModule
	}

	// Account for this worker so module shutdown can wait for it.
	atomic.AddInt32(m.workerCnt, 1)
	defer func() {
		atomic.AddInt32(m.workerCnt, -1)
		m.checkIfStopComplete()
	}()

	return m.runWorker(name, fn)
}
// StartServiceWorker launches a generic worker that is automatically
// restarted whenever it returns an error. The service-worker runs in a new
// goroutine and StartServiceWorker returns immediately. backoffDuration is
// the base wait before a restart, multiplied by the number of consecutive
// failures; pass 0 to use the default backoff duration. For custom error
// remediation, build your own handling around RunWorker instead.
// Returning a nil error or context.Canceled stops the service worker.
func (m *Module) StartServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) {
	// Guard against use before the module exists.
	if m == nil {
		log.Errorf(`modules: cannot start service worker "%s" with nil module`, name)
		return
	}

	go m.runServiceWorker(name, backoffDuration, fn)
}
// runServiceWorker is the restart loop behind StartServiceWorker: it reruns
// fn with linear backoff until it succeeds, is canceled, or the module stops.
func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) {
	// Account for this worker so module shutdown can wait for it.
	atomic.AddInt32(m.workerCnt, 1)
	defer func() {
		atomic.AddInt32(m.workerCnt, -1)
		m.checkIfStopComplete()
	}()

	if backoffDuration == 0 {
		backoffDuration = DefaultBackoffDuration
	}

	failures := 0
	lastFailure := time.Now()

	for !m.IsStopping() {
		err := m.runWorker(name, fn)

		// A nil error or a canceled context both mean the worker is done.
		if err == nil || errors.Is(err, context.Canceled) {
			return
		}
		// An explicit restart request skips the backoff entirely.
		if errors.Is(err, ErrRestartNow) {
			continue
		}

		// Any other error triggers a restart with backoff.
		// Forget old failures after a sufficiently long healthy run.
		if time.Since(lastFailure) > 5*time.Minute {
			failures = 0
		}
		failures++
		lastFailure = time.Now()

		// Wait proportionally to the consecutive failure count, but bail
		// out immediately if the module is shutting down.
		wait := time.Duration(failures) * backoffDuration
		log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failures, err, wait)
		select {
		case <-time.After(wait):
		case <-m.Ctx.Done():
			return
		}
	}
}
// runWorker executes fn with the module context and converts a panic inside
// the worker into a reported error instead of crashing the process.
func (m *Module) runWorker(name string, fn func(context.Context) error) (err error) {
	defer func() {
		if v := recover(); v != nil {
			panicErr := m.NewPanicError(name, "worker", v)
			panicErr.Report()
			err = panicErr
		}
	}()

	// TODO: get cancel func for worker context and cancel when worker is done.
	// This ensures that when the worker passes its context to another (async)
	// function, that function is also shut down when the worker finishes or dies.
	err = fn(m.Ctx)
	return
}
// runCtrlFnWithTimeout runs a module control function and waits at most
// timeout for its result; on timeout an error is returned while the control
// function keeps running in the background (its result channel is buffered).
func (m *Module) runCtrlFnWithTimeout(name string, timeout time.Duration, fn func() error) error {
	done := m.startCtrlFn(name, fn)

	select {
	case err := <-done:
		return err
	case <-time.After(timeout):
		return fmt.Errorf("timed out (%s)", timeout)
	}
}
// startCtrlFn runs a module control function in a goroutine and returns a
// buffered channel that will receive its result exactly once. A panic in the
// control function is reported and delivered on the channel as an error.
// A nil fn is treated as a control function that succeeded instantly.
func (m *Module) startCtrlFn(name string, fn func() error) chan error {
	result := make(chan error, 1) // buffered so the goroutine never blocks on send

	if fn == nil {
		// Nothing to run: signal completion and report success.
		m.ctrlFuncRunning.UnSet()
		m.checkIfStopComplete()
		result <- nil
		return result
	}

	// Mark that a control function is in flight.
	m.ctrlFuncRunning.Set()

	go func() {
		defer func() {
			// Deliver a panic as an error on the result channel.
			if v := recover(); v != nil {
				panicErr := m.NewPanicError(name, "module-control", v)
				panicErr.Report()
				result <- fmt.Errorf("panic: %s", v)
			}
			// Always clear the in-flight flag, panic or not.
			m.ctrlFuncRunning.UnSet()
			m.checkIfStopComplete()
		}()

		result <- fn()
	}()

	return result
}