package modules

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/safing/portbase/log"
)

// Default Worker Configuration.
const (
	DefaultBackoffDuration = 2 * time.Second
)

var (
	// ErrRestartNow may be returned (wrapped) by service workers to request an immediate restart.
	ErrRestartNow = errors.New("requested restart")

	errNoModule = errors.New("missing module (is nil!)")
)
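
// Usage sketch (illustrative): a service worker can request an immediate
// restart by returning a wrapped ErrRestartNow; the service worker loop then
// restarts it right away, silently and without backoff:
//
//	return fmt.Errorf("lost connection to backend: %w", ErrRestartNow)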

// StartWorker directly starts a generic worker that does not fit to be a Task or MicroTask, such as long-running (and possibly mostly idle) sessions. A call to StartWorker starts a new goroutine and returns immediately.
func (m *Module) StartWorker(name string, fn func(context.Context) error) {
	go func() {
		err := m.RunWorker(name, fn)
		switch {
		case err == nil:
			return
		case errors.Is(err, context.Canceled):
			log.Debugf("%s: worker %s was canceled: %s", m.Name, name, err)
		default:
			log.Errorf("%s: worker %s failed: %s", m.Name, name, err)
		}
	}()
}
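
// Usage sketch (illustrative; the event channel and handler are hypothetical):
//
//	m.StartWorker("session keeper", func(ctx context.Context) error {
//		for {
//			select {
//			case <-ctx.Done():
//				return nil
//			case ev := <-incomingEvents: // hypothetical channel
//				handleEvent(ev) // hypothetical handler
//			}
//		}
//	})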

// RunWorker directly runs a generic worker that does not fit to be a Task or MicroTask, such as long-running (and possibly mostly idle) sessions. A call to RunWorker blocks until the worker is finished.
func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
	if m == nil {
		log.Errorf(`modules: cannot start worker "%s" with nil module`, name)
		return errNoModule
	}

	atomic.AddInt32(m.workerCnt, 1)
	defer func() {
		atomic.AddInt32(m.workerCnt, -1)
		m.checkIfStopComplete()
	}()

	return m.runWorker(name, fn)
}
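
// Usage sketch (illustrative): RunWorker blocks until fn returns, so it is
// typically called from an already running goroutine when the caller needs
// the worker's error:
//
//	err := m.RunWorker("cache warmup", func(ctx context.Context) error {
//		return warmCache(ctx) // hypothetical helper
//	})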

// StartServiceWorker starts a generic worker, which is automatically restarted in case of an error. A call to StartServiceWorker runs the service-worker in a new goroutine and returns immediately. `backoffDuration` specifies how long to wait before restarts, multiplied by the number of failed attempts. Pass `0` for the default backoff duration. For custom error remediation functionality, build your own error handling procedure using calls to RunWorker.
// Returning a nil error or context.Canceled stops the service worker.
func (m *Module) StartServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) {
	if m == nil {
		log.Errorf(`modules: cannot start service worker "%s" with nil module`, name)
		return
	}

	go m.runServiceWorker(name, backoffDuration, fn)
}
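
// Usage sketch (illustrative; dial and serve are hypothetical helpers): keep
// a connection alive for the lifetime of the module, restarting with the
// default backoff on failure:
//
//	m.StartServiceWorker("backend connection", 0, func(ctx context.Context) error {
//		conn, err := dial(ctx)
//		if err != nil {
//			return err // triggers a restart with backoff
//		}
//		defer conn.Close()
//		return serve(ctx, conn)
//	})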

func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) {
	atomic.AddInt32(m.workerCnt, 1)
	defer func() {
		atomic.AddInt32(m.workerCnt, -1)
		m.checkIfStopComplete()
	}()

	if backoffDuration == 0 {
		backoffDuration = DefaultBackoffDuration
	}
	failCnt := 0
	lastFail := time.Now()

	for {
		if m.IsStopping() {
			return
		}

		err := m.runWorker(name, fn)
		switch {
		case err == nil:
			// No error means that the worker is finished.
			return

		case errors.Is(err, context.Canceled):
			// A canceled context also means that the worker is finished.
			return

		case errors.Is(err, ErrRestartNow):
			// Worker requested a restart - silently continue with loop.

		default:
			// Any other error triggers a restart with backoff.

			// Reset fail counter if running without error for some time.
			if time.Now().Add(-5 * time.Minute).After(lastFail) {
				failCnt = 0
			}
			// Increase fail counter and set last failed time.
			failCnt++
			lastFail = time.Now()
			// Log error and back off for some time.
			sleepFor := time.Duration(failCnt) * backoffDuration
			log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor)
			select {
			case <-time.After(sleepFor):
			case <-m.Ctx.Done():
				return
			}
		}
	}
}
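
// Note on backoff: the wait before a restart is failCnt * backoffDuration, so
// with DefaultBackoffDuration repeated failures back off at 2s, 4s, 6s, and so
// on. The fail counter resets once the worker has run without error for more
// than five minutes.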

func (m *Module) runWorker(name string, fn func(context.Context) error) (err error) {
	defer func() {
		// Recover from panic.
		panicVal := recover()
		if panicVal != nil {
			me := m.NewPanicError(name, "worker", panicVal)
			me.Report()
			err = me
		}
	}()

	// Run the worker.
	// TODO: Get a cancel func for the worker context and cancel it when the worker is done.
	// This ensures that when the worker passes its context to another (async) function, that function is also shut down when the worker finishes or dies.
	err = fn(m.Ctx)
	return
}

func (m *Module) runCtrlFnWithTimeout(name string, timeout time.Duration, fn func() error) error {
	stopFnError := m.startCtrlFn(name, fn)

	// wait for results
	select {
	case err := <-stopFnError:
		return err
	case <-time.After(timeout):
		return fmt.Errorf("timed out (%s)", timeout)
	}
}
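
// Illustrative call (the name, timeout and control function are hypothetical):
//
//	err := m.runCtrlFnWithTimeout("stop module", 10*time.Second, func() error {
//		// release resources, close connections, etc.
//		return nil
//	})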

func (m *Module) startCtrlFn(name string, fn func() error) chan error {
	ctrlFnError := make(chan error, 1)

	// If no function is given, still act as if it was run.
	if fn == nil {
		// Signal finish.
		m.ctrlFuncRunning.UnSet()
		m.checkIfStopComplete()

		// Report nil error and return.
		ctrlFnError <- nil
		return ctrlFnError
	}

	// Signal that a control function is running.
	m.ctrlFuncRunning.Set()

	// Start control function in goroutine.
	go func() {
		// Recover from panic and reset control function signal.
		defer func() {
			// recover from panic
			panicVal := recover()
			if panicVal != nil {
				me := m.NewPanicError(name, "module-control", panicVal)
				me.Report()
				ctrlFnError <- fmt.Errorf("panic: %s", panicVal)
			}

			// Signal finish.
			m.ctrlFuncRunning.UnSet()
			m.checkIfStopComplete()
		}()

		// Run control function and report error.
		err := fn()
		ctrlFnError <- err
	}()

	return ctrlFnError
}