Merge pull request #224 from safing/fix/metrics-modules-api

Fix metrics, modules, api
Daniel Hovie 2023-10-13 11:55:21 +02:00 committed by GitHub
commit 83b709526e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 133 additions and 116 deletions

View file

@@ -151,7 +151,7 @@ func authenticateRequest(w http.ResponseWriter, r *http.Request, targetHandler h
     switch requiredPermission { //nolint:exhaustive
     case NotFound:
         // Not found.
-        tracer.Trace("api: authenticated handler reported: not found")
+        tracer.Debug("api: no API endpoint registered for this path")
         http.Error(w, "Not found.", http.StatusNotFound)
         return nil
     case NotSupported:

View file

@@ -235,6 +235,7 @@ func (mh *mainHandler) handle(w http.ResponseWriter, r *http.Request) error {
         http.Error(lrw, "Method not allowed.", http.StatusMethodNotAllowed)
         return nil
     default:
+        tracer.Debug("api: no handler registered for this path")
         http.Error(lrw, "Not found.", http.StatusNotFound)
         return nil
     }

View file

@@ -11,7 +11,6 @@ import (
     "github.com/safing/portbase/api"
     "github.com/safing/portbase/config"
     "github.com/safing/portbase/log"
-    "github.com/safing/portbase/utils"
 )
 
 func registerAPI() error {

@@ -140,11 +139,7 @@ func writeMetricsTo(ctx context.Context, url string) error {
     )
 }
 
-var metricsPusherDone = utils.NewBroadcastFlag()
-
 func metricsWriter(ctx context.Context) error {
-    defer metricsPusherDone.NotifyAndReset()
-
     pushURL := pushOption()
     ticker := module.NewSleepyTicker(1*time.Minute, 0)
     defer ticker.Stop()

View file

@@ -16,7 +16,7 @@ import (
 
 const hostStatTTL = 1 * time.Second
 
-func registeHostMetrics() (err error) {
+func registerHostMetrics() (err error) {
     // Register load average metrics.
     _, err = NewGauge("host/load/avg/1", nil, getFloat64HostStat(LoadAvg1), &Options{Name: "Host Load Avg 1min", Permission: api.PermitUser})
     if err != nil {

View file

@@ -3,10 +3,13 @@ package metrics
 import (
     "runtime"
     "strings"
+    "sync/atomic"
 
     "github.com/safing/portbase/info"
 )
 
+var reportedStart atomic.Bool
+
 func registerInfoMetric() error {
     meta := info.GetInfo()
     _, err := NewGauge(

@@ -26,6 +29,10 @@ func registerInfoMetric() error {
             "comment": commentOption(),
         },
         func() float64 {
+            // Report as 0 the first time in order to detect (re)starts.
+            if reportedStart.CompareAndSwap(false, true) {
+                return 0
+            }
             return 1
         },
         nil,
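The new gauge callback reports 0 on the very first scrape after a (re)start and 1 on every later scrape, which is what the "detect (re)starts" comment refers to. A minimal standalone sketch of this one-shot pattern, with illustrative names that are not part of this change:

package main

import (
    "fmt"
    "sync/atomic"
)

// reportedStart flips to true when the first report happens.
var reportedStart atomic.Bool

func infoMetricValue() float64 {
    // CompareAndSwap succeeds exactly once, for the very first caller.
    if reportedStart.CompareAndSwap(false, true) {
        return 0
    }
    return 1
}

func main() {
    fmt.Println(infoMetricValue(), infoMetricValue(), infoMetricValue()) // 0 1 1
}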

View file

@@ -5,7 +5,7 @@ import (
     "github.com/safing/portbase/log"
 )
 
-func registeLogMetrics() (err error) {
+func registerLogMetrics() (err error) {
     _, err = NewFetchingCounter(
         "logs/warning/total",
         nil,

View file

@@ -5,7 +5,6 @@ import (
     "fmt"
     "sort"
     "sync"
-    "time"
 
     "github.com/safing/portbase/modules"
 )

@@ -59,11 +58,11 @@ func start() error {
         return err
     }
 
-    if err := registeHostMetrics(); err != nil {
+    if err := registerHostMetrics(); err != nil {
         return err
     }
 
-    if err := registeLogMetrics(); err != nil {
+    if err := registerLogMetrics(); err != nil {
         return err
     }

@@ -82,16 +81,13 @@ func stop() error {
     // Wait until the metrics pusher is done, as it may have started reporting
     // and may report a higher number than we store to disk. For persistent
     // metrics it can then happen that the first report is lower than the
-    // previous report, making prometheus think that al that happened since the
+    // previous report, making prometheus think that all that happened since the
     // last report, due to the automatic restart detection.
-    done := metricsPusherDone.NewFlag()
-    done.Refresh()
-    if !done.IsSet() {
-        select {
-        case <-done.Signal():
-        case <-time.After(10 * time.Second):
-        }
-    }
+    // The registry is read locked when writing metrics.
+    // Write lock the registry to make sure all writes are finished.
+    registryLock.Lock()
+    registryLock.Unlock() //nolint:staticcheck
 
     storePersistentMetrics()

@@ -120,6 +116,10 @@ func register(m Metric) error {
     // Set flag that first metric is now registered.
     firstMetricRegistered = true
 
+    if module.Status() < modules.StatusStarting {
+        return fmt.Errorf("registering metric %q too early", m.ID())
+    }
+
    return nil
 }
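The replacement for the pusher flag relies on a lock-as-barrier idiom: metric writers hold the registry's read lock while reporting, so acquiring and immediately releasing the write lock blocks until every in-flight report has finished. A standalone sketch of that idiom, with illustrative names only:

package main

import (
    "fmt"
    "sync"
    "time"
)

var registryLock sync.RWMutex

func reportMetrics() {
    registryLock.RLock()
    defer registryLock.RUnlock()
    time.Sleep(100 * time.Millisecond) // simulate writing metrics while read locked
}

func waitForInFlightReports() {
    // Taking the write lock waits for all current read lockers to finish.
    registryLock.Lock()
    registryLock.Unlock() //nolint:staticcheck // intentionally empty critical section
    fmt.Println("all in-flight reports finished")
}

func main() {
    go reportMetrics()
    time.Sleep(10 * time.Millisecond) // let the reporter grab the read lock first
    waitForInFlightReports()
}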

View file

@@ -25,7 +25,7 @@ var (
     })
 
     // ErrAlreadyInitialized is returned when trying to initialize an option
-    // more than once.
+    // more than once or if the time window for initializing is over.
     ErrAlreadyInitialized = errors.New("already initialized")
 )

@@ -55,7 +55,7 @@ func EnableMetricPersistence(key string) error {
 
     // Load metrics from storage.
     var err error
-    storage, err = getMetricsStorage(key)
+    storage, err = getMetricsStorage(storageKey)
     switch {
     case err == nil:
         // Continue.

View file

@@ -60,6 +60,7 @@ type Module struct { //nolint:maligned
     Ctx           context.Context
     cancelCtx     func()
     stopFlag      *abool.AtomicBool
+    stopCompleted *abool.AtomicBool
     stopComplete  chan struct{}
 
     // workers/tasks

@@ -255,12 +256,10 @@ func (m *Module) checkIfStopComplete() {
         atomic.LoadInt32(m.taskCnt) == 0 &&
         atomic.LoadInt32(m.microTaskCnt) == 0 {
 
+        if m.stopCompleted.SetToIf(false, true) {
             m.Lock()
             defer m.Unlock()
-        if m.stopComplete != nil {
             close(m.stopComplete)
-            m.stopComplete = nil
         }
     }
 }
@@ -283,61 +282,57 @@ func (m *Module) stop(reports chan *report) {
     // Reset start/stop signal channels.
     m.startComplete = make(chan struct{})
     m.stopComplete = make(chan struct{})
+    m.stopCompleted.SetTo(false)
 
-    // Make a copy of the stop channel.
-    stopComplete := m.stopComplete
-
-    // Set status and cancel context.
+    // Set status.
     m.status = StatusStopping
+
+    go m.stopAllTasks(reports)
+}
+
+func (m *Module) stopAllTasks(reports chan *report) {
+    // Manually set the control function flag in order to stop completion by race
+    // condition before stop function has even started.
+    m.ctrlFuncRunning.Set()
+
+    // Set stop flag for everyone checking this flag before we activate any stop trigger.
     m.stopFlag.Set()
+
+    // Cancel the context to notify all workers and tasks.
     m.cancelCtx()
 
-    go m.stopAllTasks(reports, stopComplete)
-}
-
-func (m *Module) stopAllTasks(reports chan *report, stopComplete chan struct{}) {
-    // start shutdown function
-    var stopFnError error
-    stopFuncRunning := abool.New()
-    if m.stopFn != nil {
-        stopFuncRunning.Set()
-        go func() {
-            stopFnError = m.runCtrlFn("stop module", m.stopFn)
-            stopFuncRunning.UnSet()
-            m.checkIfStopComplete()
-        }()
-    } else {
-        m.checkIfStopComplete()
-    }
+    // Start stop function.
+    stopFnError := m.startCtrlFn("stop module", m.stopFn)
 
     // wait for results
     select {
-    case <-stopComplete:
-        // case <-time.After(moduleStopTimeout):
+    case <-m.stopComplete:
+        // Complete!
     case <-time.After(moduleStopTimeout):
         log.Warningf(
-            "%s: timed out while waiting for stopfn/workers/tasks to finish: stopFn=%v/%v workers=%d tasks=%d microtasks=%d, continuing shutdown...",
+            "%s: timed out while waiting for stopfn/workers/tasks to finish: stopFn=%v workers=%d tasks=%d microtasks=%d, continuing shutdown...",
             m.Name,
-            stopFuncRunning.IsSet(), m.ctrlFuncRunning.IsSet(),
+            m.ctrlFuncRunning.IsSet(),
             atomic.LoadInt32(m.workerCnt),
             atomic.LoadInt32(m.taskCnt),
             atomic.LoadInt32(m.microTaskCnt),
         )
     }
 
-    // collect error
+    // Check for stop fn status.
     var err error
-    if stopFuncRunning.IsNotSet() && stopFnError != nil {
-        err = stopFnError
-    }
-
-    // set status
+    select {
+    case err = <-stopFnError:
         if err != nil {
+            // Set error as module error.
             m.Error(
                 fmt.Sprintf("%s:stop-failed", m.Name),
                 fmt.Sprintf("Stopping module %s failed", m.Name),
                 fmt.Sprintf("Failed to stop module: %s", err.Error()),
             )
         }
+    default:
+    }
 
     // Always set to offline in order to let other modules shutdown in order.
     m.Lock()
@@ -384,7 +379,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
         Name:                name,
         enabled:             abool.NewBool(false),
         enabledAsDependency: abool.NewBool(false),
-        sleepMode:           abool.NewBool(true),
+        sleepMode:           abool.NewBool(true), // Change (for init) is triggered below.
         sleepWaitingChannel: make(chan time.Time),
         prepFn:              prep,
         startFn:             start,

@@ -393,6 +388,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
         Ctx:             ctx,
         cancelCtx:       cancelCtx,
         stopFlag:        abool.NewBool(false),
+        stopCompleted:   abool.NewBool(true),
         ctrlFuncRunning: abool.NewBool(false),
         workerCnt:       &workerCnt,
         taskCnt:         &taskCnt,

@@ -401,7 +397,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
         depNames:     dependencies,
     }
 
-    // Sleep mode is disabled by default
+    // Sleep mode is disabled by default.
     newModule.Sleep(false)
 
     return newModule
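The new stopCompleted flag guards the stopComplete channel so it is closed exactly once, even when checkIfStopComplete is triggered concurrently from worker, task, and control-function callbacks; the channel no longer needs to be nil-ed out under the lock. A reduced sketch of the guard, where the field names mirror the diff but everything else is illustrative:

package main

import (
    "fmt"
    "sync"

    "github.com/tevino/abool"
)

type module struct {
    sync.Mutex
    stopCompleted *abool.AtomicBool
    stopComplete  chan struct{}
}

func (m *module) checkIfStopComplete() {
    // SetToIf(false, true) succeeds for exactly one caller.
    if m.stopCompleted.SetToIf(false, true) {
        m.Lock()
        defer m.Unlock()
        close(m.stopComplete)
    }
}

func main() {
    m := &module{
        stopCompleted: abool.NewBool(false),
        stopComplete:  make(chan struct{}),
    }

    var wg sync.WaitGroup
    for i := 0; i < 5; i++ { // several racing callers, one close
        wg.Add(1)
        go func() { defer wg.Done(); m.checkIfStopComplete() }()
    }
    wg.Wait()

    <-m.stopComplete
    fmt.Println("stopComplete closed exactly once")
}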

View file

@@ -53,6 +53,7 @@ func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
 }
 
 // StartServiceWorker starts a generic worker, which is automatically restarted in case of an error. A call to StartServiceWorker runs the service-worker in a new goroutine and returns immediately. `backoffDuration` specifies how to long to wait before restarts, multiplied by the number of failed attempts. Pass `0` for the default backoff duration. For custom error remediation functionality, build your own error handling procedure using calls to RunWorker.
+// Returning nil error or context.Canceled will stop the service worker.
 func (m *Module) StartServiceWorker(name string, backoffDuration time.Duration, fn func(context.Context) error) {
     if m == nil {
         log.Errorf(`modules: cannot start service worker "%s" with nil module`, name)
@@ -81,34 +82,36 @@ func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn
         }
 
         err := m.runWorker(name, fn)
-        if err != nil {
-            if !errors.Is(err, ErrRestartNow) {
-                // reset fail counter if running without error for some time
+        switch {
+        case err == nil:
+            // No error means that the worker is finished.
+            return
+
+        case errors.Is(err, context.Canceled):
+            // A canceled context also means that the worker is finished.
+            return
+
+        case errors.Is(err, ErrRestartNow):
+            // Worker requested a restart - silently continue with loop.
+
+        default:
+            // Any other errors triggers a restart with backoff.
+
+            // Reset fail counter if running without error for some time.
             if time.Now().Add(-5 * time.Minute).After(lastFail) {
                 failCnt = 0
             }
 
-                // increase fail counter and set last failed time
+            // Increase fail counter and set last failed time.
             failCnt++
             lastFail = time.Now()
 
-                // log error
+            // Log error and back off for some time.
             sleepFor := time.Duration(failCnt) * backoffDuration
-                if errors.Is(err, context.Canceled) {
-                    log.Debugf("%s: service-worker %s was canceled (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor)
-                } else {
             log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor)
-                }
             select {
             case <-time.After(sleepFor):
             case <-m.Ctx.Done():
                 return
             }
-                // loop to restart
-            } else {
-                log.Infof("%s: service-worker %s %s - restarting now", m.Name, name, err)
-            }
-        } else {
-            // finish
-            return
-        }
         }
     }
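The switch makes the restart policy explicit: a nil error or context.Canceled ends the service worker, ErrRestartNow restarts it immediately, and any other error restarts it after a linear backoff of failCnt * backoffDuration. A standalone sketch of that decision logic, not the package's API:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

var ErrRestartNow = errors.New("restart now")

// restartDelay classifies a worker's result the same way the new switch does.
func restartDelay(err error, failCnt int, backoff time.Duration) (restart bool, delay time.Duration) {
    switch {
    case err == nil, errors.Is(err, context.Canceled):
        return false, 0 // worker is finished
    case errors.Is(err, ErrRestartNow):
        return true, 0 // immediate restart
    default:
        return true, time.Duration(failCnt) * backoff // linear backoff
    }
}

func main() {
    fmt.Println(restartDelay(errors.New("boom"), 3, time.Second)) // true 3s
    fmt.Println(restartDelay(context.Canceled, 1, time.Second))   // false 0s
}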
@@ -132,10 +135,7 @@ func (m *Module) runWorker(name string, fn func(context.Context) error) (err err
 }
 
 func (m *Module) runCtrlFnWithTimeout(name string, timeout time.Duration, fn func() error) error {
-    stopFnError := make(chan error)
-    go func() {
-        stopFnError <- m.runCtrlFn(name, fn)
-    }()
+    stopFnError := m.startCtrlFn(name, fn)
 
     // wait for results
     select {
@@ -146,26 +146,44 @@ func (m *Module) runCtrlFnWithTimeout(name string, timeout time.Duration, fn fun
     }
 }
 
-func (m *Module) runCtrlFn(name string, fn func() error) (err error) {
+func (m *Module) startCtrlFn(name string, fn func() error) chan error {
+    ctrlFnError := make(chan error, 1)
+
+    // If no function is given, still act as if it was run.
     if fn == nil {
-        return
+        // Signal finish.
+        m.ctrlFuncRunning.UnSet()
+        m.checkIfStopComplete()
+        // Report nil error and return.
+        ctrlFnError <- nil
+        return ctrlFnError
     }
 
-    if m.ctrlFuncRunning.SetToIf(false, true) {
-        defer m.ctrlFuncRunning.SetToIf(true, false)
-    }
+    // Signal that a control function is running.
+    m.ctrlFuncRunning.Set()
 
+    // Start control function in goroutine.
+    go func() {
+        // Recover from panic and reset control function signal.
         defer func() {
             // recover from panic
             panicVal := recover()
             if panicVal != nil {
                 me := m.NewPanicError(name, "module-control", panicVal)
                 me.Report()
-                err = me
+                ctrlFnError <- fmt.Errorf("panic: %s", panicVal)
             }
+
+            // Signal finish.
+            m.ctrlFuncRunning.UnSet()
+            m.checkIfStopComplete()
         }()
 
-    // run
-    err = fn()
-    return
+        // Run control function and report error.
+        err := fn()
+        ctrlFnError <- err
+    }()
+
+    return ctrlFnError
 }
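startCtrlFn replaces the synchronous runCtrlFn: the control function now always runs in its own goroutine and reports its result, including a recovered panic, over a buffered channel. That lets runCtrlFnWithTimeout wait with a deadline and lets the stop path drain the result with a non-blocking select. A reduced, self-contained sketch of the pattern, without the module bookkeeping:

package main

import (
    "errors"
    "fmt"
    "time"
)

// startCtrlFn runs fn in a goroutine and reports its result over a channel.
func startCtrlFn(fn func() error) chan error {
    errs := make(chan error, 1) // buffered: the goroutine never blocks on send
    go func() {
        defer func() {
            // Convert a panic into an error instead of crashing the caller.
            if v := recover(); v != nil {
                errs <- fmt.Errorf("panic: %v", v)
            }
        }()
        errs <- fn()
    }()
    return errs
}

func main() {
    done := startCtrlFn(func() error {
        time.Sleep(50 * time.Millisecond)
        return errors.New("stop failed")
    })

    // Wait with a timeout, as runCtrlFnWithTimeout does.
    select {
    case err := <-done:
        fmt.Println("finished:", err)
    case <-time.After(time.Second):
        fmt.Println("timed out")
    }
}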