Mirror of https://github.com/safing/portmaster (synced 2025-09-01 18:19:12 +00:00)

Merge pull request #1668 from safing/fix/spn-start-stop
Fix starting/stopping SPN + more

Commit: 91f4fe5c52
22 changed files with 718 additions and 59 deletions
.github/workflows/go.yml (vendored, 2 lines changed)

@@ -40,7 +40,7 @@ jobs:
       - name: Run golangci-lint
         uses: golangci/golangci-lint-action@v4
         with:
-          version: v1.57.1
+          version: v1.60.3
           only-new-issues: true
           args: -c ./.golangci.yml --timeout 15m
.golangci.yml

@@ -18,11 +18,13 @@ linters:
    - gocyclo
    - goerr113
    - gomnd
    - gomoddirectives
    - ifshort
    - interfacebloat
    - interfacer
    - ireturn
    - lll
    - mnd
    - musttag
    - nestif
    - nilnil

@@ -31,16 +33,15 @@ linters:
    - nolintlint
    - nonamedreturns
    - nosnakecase
    - perfsprint # TODO(ppacher): we should re-enanble this one to avoid costly fmt.* calls in the hot-path
    - revive
    - tagliatelle
    - testifylint
    - testpackage
    - varnamelen
    - whitespace
    - wrapcheck
    - wsl
    - perfsprint # TODO(ppacher): we should re-enanble this one to avoid costly fmt.* calls in the hot-path
    - testifylint
    - gomoddirectives

linters-settings:
  revive:
@@ -38,11 +38,15 @@ func tickFeeder(ctx *mgr.WorkerCtx) error {
 	feeder := NewFeeder()
 	defer feeder.CloseFeeder()

-	tickDuration := getTickFeederTickDuration()
+	ticker := time.NewTicker(getTickFeederTickDuration())
+	defer ticker.Stop()

 	for {
-		// wait for tick
-		time.Sleep(tickDuration)
+		select {
+		case <-ticker.C:
+		case <-ctx.Done():
+			return nil
+		}

 		// add tick value
 		value = (value << 1) | (time.Now().UnixNano() % 2)

@@ -64,13 +68,6 @@ func tickFeeder(ctx *mgr.WorkerCtx) error {
 		case <-ctx.Done():
 			return nil
 		}
-		} else {
-			// check if are done
-			select {
-			case <-ctx.Done():
-				return nil
-			default:
-			}
 		}
 	}
 }
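The first hunk above replaces a blocking time.Sleep with a time.Ticker and a select on the worker context, so the tick feeder reacts to shutdown immediately instead of sleeping out the remainder of its interval. A minimal standalone sketch of the same pattern, with illustrative names that are not from this diff:

package main

import (
	"context"
	"fmt"
	"time"
)

// tickLoop does periodic work on every tick and returns promptly once ctx is canceled.
func tickLoop(ctx context.Context, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			fmt.Println("tick") // periodic work goes here
		case <-ctx.Done():
			return nil // shut down without waiting for the next tick
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	_ = tickLoop(ctx, 100*time.Millisecond)
}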
go.mod (1 line changed)

@@ -90,6 +90,7 @@ require (
 	github.com/inconshreveable/mousetrap v1.1.0 // indirect
 	github.com/josharian/native v1.1.0 // indirect
 	github.com/klauspost/cpuid/v2 v2.2.8 // indirect
+	github.com/maruel/panicparse/v2 v2.3.1 // indirect
 	github.com/mdlayher/netlink v1.7.2 // indirect
 	github.com/mdlayher/socket v0.5.1 // indirect
 	github.com/mitchellh/reflectwalk v1.0.2 // indirect
go.sum (7 lines changed)

@@ -173,12 +173,16 @@ github.com/lmittmann/tint v1.0.5 h1:NQclAutOfYsqs2F1Lenue6OoWCajs5wJcP3DfWVpePw=
github.com/lmittmann/tint v1.0.5/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
github.com/magiconair/properties v1.7.4-0.20170902060319-8d7837e64d3c/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/maruel/panicparse/v2 v2.3.1 h1:NtJavmbMn0DyzmmSStE8yUsmPZrZmudPH7kplxBinOA=
github.com/maruel/panicparse/v2 v2.3.1/go.mod h1:s3UmQB9Fm/n7n/prcD2xBGDkwXD6y2LeZnhbEXvs9Dg=
github.com/mat/besticon v3.12.0+incompatible h1:1KTD6wisfjfnX+fk9Kx/6VEZL+MAW1LhCkL9Q47H9Bg=
github.com/mat/besticon v3.12.0+incompatible/go.mod h1:mA1auQYHt6CW5e7L9HJLmqVQC8SzNk2gVwouO0AbiEU=
github.com/mattn/go-colorable v0.0.10-0.20170816031813-ad5389df28cd/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.2/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=

@@ -205,6 +209,7 @@ github.com/mdlayher/socket v0.1.0/go.mod h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5A
github.com/mdlayher/socket v0.1.1/go.mod h1:mYV5YIZAfHh4dzDVzI8x8tWLWCliuX8Mon5Awbj+qDs=
github.com/mdlayher/socket v0.5.1 h1:VZaqt6RkGkt2OE9l3GcC6nZkqD3xKeQLyfleW/uBcos=
github.com/mdlayher/socket v0.5.1/go.mod h1:TjPLHI1UgwEv5J1B5q0zTZq12A/6H7nKmtTanQE37IQ=
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/miekg/dns v1.1.62 h1:cN8OuEF1/x5Rq6Np+h1epln8OiyPWV+lROx9LxcGgIQ=
github.com/miekg/dns v1.1.62/go.mod h1:mvDlcItzm+br7MToIKqkglaGhlFMHJ9DTNNWONWXbNQ=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=

@@ -402,6 +407,7 @@ golang.org/x/sys v0.0.0-20210305230114-8fe3ee5dd75b/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210525143221-35b2ab0089ea/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

@@ -409,6 +415,7 @@ golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -151,6 +151,7 @@ func debugInfo(ar *api.Request) (data []byte, err error) {
 	// Detailed information.
 	updates.AddToDebugInfo(di)
 	compat.AddToDebugInfo(di)
+	module.instance.AddWorkerInfoToDebugInfo(di)
 	di.AddGoroutineStack()

 	// Return data.
@@ -8,6 +8,7 @@ import (

 	"github.com/safing/portmaster/base/log"
 	"github.com/safing/portmaster/base/metrics"
+	"github.com/safing/portmaster/base/utils/debug"
 	_ "github.com/safing/portmaster/service/broadcasts"
 	"github.com/safing/portmaster/service/mgr"
 	_ "github.com/safing/portmaster/service/netenv"

@@ -112,4 +113,5 @@ func New(instance instance) (*Core, error) {

 type instance interface {
 	Shutdown()
+	AddWorkerInfoToDebugInfo(di *debug.Info)
 }
service/debug.go (new file, 64 lines)

@@ -0,0 +1,64 @@
package service

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"runtime"

	"github.com/maruel/panicparse/v2/stack"

	"github.com/safing/portmaster/base/utils/debug"
	"github.com/safing/portmaster/service/mgr"
)

// GetWorkerInfo returns the worker info of all running workers.
func (i *Instance) GetWorkerInfo() (*mgr.WorkerInfo, error) {
	snapshot, _, err := stack.ScanSnapshot(bytes.NewReader(fullStack()), io.Discard, stack.DefaultOpts())
	if err != nil && !errors.Is(err, io.EOF) {
		return nil, fmt.Errorf("get stack: %w", err)
	}

	infos := make([]*mgr.WorkerInfo, 0, 32)
	for _, m := range i.serviceGroup.Modules() {
		wi, _ := m.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot.
		infos = append(infos, wi)
	}
	for _, m := range i.SpnGroup.Modules() {
		wi, _ := m.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot.
		infos = append(infos, wi)
	}

	return mgr.MergeWorkerInfo(infos...), nil
}

// AddWorkerInfoToDebugInfo adds the worker info of all running workers to the debug info.
func (i *Instance) AddWorkerInfoToDebugInfo(di *debug.Info) {
	info, err := i.GetWorkerInfo()
	if err != nil {
		di.AddSection(
			"Worker Status Failed",
			debug.UseCodeSection,
			err.Error(),
		)
		return
	}

	di.AddSection(
		fmt.Sprintf("Worker Status: %d/%d (%d?)", info.Running, len(info.Workers), info.Missing+info.Other),
		debug.UseCodeSection,
		info.Format(),
	)
}

func fullStack() []byte {
	buf := make([]byte, 8096)
	for {
		n := runtime.Stack(buf, true)
		if n < len(buf) {
			return buf[:n]
		}
		buf = make([]byte, 2*len(buf))
	}
}
service/debug_test.go (new file, 33 lines)

@@ -0,0 +1,33 @@
package service

import (
	"testing"
	"time"

	"github.com/safing/portmaster/base/notifications"
	"github.com/safing/portmaster/service/mgr"
)

func TestDebug(t *testing.T) {
	t.Parallel()

	// Create test instance with at least one worker.
	i := &Instance{}
	n, err := notifications.New(i)
	if err != nil {
		t.Fatal(err)
	}
	i.serviceGroup = mgr.NewGroup(n)
	i.SpnGroup = mgr.NewExtendedGroup()
	err = i.Start()
	if err != nil {
		t.Fatal(err)
	}
	time.Sleep(100 * time.Millisecond)

	info, err := i.GetWorkerInfo()
	if err != nil {
		t.Fatal(err)
	}
	t.Log(info)
}
@@ -619,7 +619,7 @@ func (i *Instance) shutdown(exitCode int) {

 // Stopping returns whether the instance is shutting down.
 func (i *Instance) Stopping() bool {
-	return i.ctx.Err() == nil
+	return i.ctx.Err() != nil
 }

 // Stopped returns a channel that is triggered when the instance has shut down.
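This one-line fix corrects an inverted check: context.Context.Err() returns nil while the context is still active, and a non-nil error (context.Canceled or context.DeadlineExceeded) only after cancellation, so Err() != nil is the correct "shutting down" signal. A small standalone illustration of that semantics, not Portmaster code:

package main

import (
	"context"
	"fmt"
)

func stopping(ctx context.Context) bool {
	// nil while running, context.Canceled (or DeadlineExceeded) once shut down.
	return ctx.Err() != nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	fmt.Println(stopping(ctx)) // false: still running
	cancel()
	fmt.Println(stopping(ctx)) // true: shutting down
}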
@@ -2,7 +2,6 @@

 package mgr

 import (
-	"fmt"
 	"slices"
 	"sync"
 	"sync/atomic"
@@ -107,6 +106,12 @@ func (em *EventMgr[T]) Submit(event T) {

 	// Run callbacks.
 	for _, ec := range em.callbacks {
+		// Check if callback was canceled.
+		if ec.canceled.Load() {
+			anyCanceled = true
+			continue
+		}
+
 		// Execute callback.
 		var (
 			cancel bool
@@ -114,29 +119,38 @@ func (em *EventMgr[T]) Submit(event T) {
	)
	if em.mgr != nil {
		// Prefer executing in worker.
		wkrErr := em.mgr.Do("execute event callback", func(w *WorkerCtx) error {
		cancel, err = ec.callback(w, event) //nolint:scopelint // Execution is within scope.
		name := "event " + em.name + " callback " + ec.name
		em.mgr.Go(name, func(w *WorkerCtx) error {
			cancel, err = ec.callback(w, event)
			// Handle error and cancelation.
			if err != nil {
				w.Warn(
					"event callback failed",
					"event", em.name,
					"callback", ec.name,
					"err", err,
				)
			}
			if cancel {
				ec.canceled.Store(true)
			}
			return nil
		})
		if wkrErr != nil {
			err = fmt.Errorf("callback execution failed: %w", wkrErr)
		}
	} else {
		cancel, err = ec.callback(nil, event)
	}

	// Handle error and cancelation.
	if err != nil && em.mgr != nil {
		em.mgr.Warn(
			"event callback failed",
			"event", em.name,
			"callback", ec.name,
			"err", err,
		)
	}
	if cancel {
		ec.canceled.Store(true)
		anyCanceled = true
		// Handle error and cancelation.
		if err != nil && em.mgr != nil {
			em.mgr.Warn(
				"event callback failed",
				"event", em.name,
				"callback", ec.name,
				"err", err,
			)
		}
		if cancel {
			ec.canceled.Store(true)
			anyCanceled = true
		}
	}
}
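For reference, event callbacks registered on an EventMgr have the signature func(*WorkerCtx, T) (cancel bool, err error), and the check added at the top of Submit skips callbacks that have already flagged themselves as canceled. As a usage reference, this is how the profile module registers such a callback further down in this same diff (excerpt, lightly reformatted, not standalone code):

module.instance.Config().EventConfigChange.AddCallback(
	"update global config profile",
	func(wc *mgr.WorkerCtx, s struct{}) (cancel bool, err error) {
		return false, updateGlobalConfigProfile(wc.Ctx())
	},
)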
@@ -30,6 +30,7 @@ const (
 	groupStateInvalid
 )

+//nolint:goconst
 func groupStateToString(state int32) string {
 	switch state {
 	case groupStateOff:
@@ -4,6 +4,7 @@ import (
	"context"
	"log/slog"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)

@@ -21,6 +22,10 @@ type Manager struct {

 	workerCnt   atomic.Int32
 	workersDone chan struct{}
+
+	workers      []*WorkerCtx
+	workersIndex int
+	workersLock  sync.Mutex
 }

 // New returns a new manager.

@@ -33,6 +38,7 @@ func newManager(name string) *Manager {
 		name:        name,
 		logger:      slog.Default().With(ManagerNameSLogKey, name),
 		workersDone: make(chan struct{}),
+		workers:     make([]*WorkerCtx, 4),
 	}
 	m.ctx, m.cancelCtx = context.WithCancel(context.Background())
 	return m
@@ -196,11 +202,13 @@ func (m *Manager) waitForWorkers(max time.Duration, limit int32) (done bool) {
 	}
 }

-func (m *Manager) workerStart() {
+func (m *Manager) workerStart(w *WorkerCtx) {
+	m.registerWorker(w)
 	m.workerCnt.Add(1)
 }

-func (m *Manager) workerDone() {
+func (m *Manager) workerDone(w *WorkerCtx) {
+	m.unregisterWorker(w)
 	if m.workerCnt.Add(-1) <= 1 {
 		// Notify all waiters.
 		for {
@@ -149,11 +149,11 @@ func (m *StateMgr) Remove(id string) {
 // Clear removes all states.
 func (m *StateMgr) Clear() {
 	m.statesLock.Lock()
-	defer m.statesLock.Unlock()

 	m.states = nil
+	m.statesLock.Unlock()

-	m.statesEventMgr.Submit(m.export())
+	// Submit event without lock, because callbacks might come back to change states.
+	defer m.statesEventMgr.Submit(m.Export())
 }

 // Export returns the current states.
@@ -21,6 +21,9 @@ var WorkerCtxContextKey = workerContextKey{}
 // WorkerCtx provides workers with the necessary environment for flow control
 // and logging.
 type WorkerCtx struct {
+	name     string
+	workFunc func(w *WorkerCtx) error
+
 	ctx       context.Context
 	cancelCtx context.CancelFunc
@@ -161,14 +164,16 @@ func (m *Manager) Go(name string, fn func(w *WorkerCtx) error) {
 }

 func (m *Manager) manageWorker(name string, fn func(w *WorkerCtx) error) {
-	m.workerStart()
-	defer m.workerDone()
-
 	w := &WorkerCtx{
-		logger: m.logger.With("worker", name),
+		name:     name,
+		workFunc: fn,
+		logger:   m.logger.With("worker", name),
 	}
 	w.ctx = m.ctx

+	m.workerStart(w)
+	defer m.workerDone(w)
+
 	backoff := time.Second
 	failCnt := 0
@@ -244,15 +249,17 @@ func (m *Manager) manageWorker(name string, fn func(w *WorkerCtx) error) {
 // - Panic catching.
 // - Flow control helpers.
 func (m *Manager) Do(name string, fn func(w *WorkerCtx) error) error {
-	m.workerStart()
-	defer m.workerDone()
-
 	// Create context.
 	w := &WorkerCtx{
-		ctx:    m.Ctx(),
-		logger: m.logger.With("worker", name),
+		name:     name,
+		workFunc: fn,
+		ctx:      m.Ctx(),
+		logger:   m.logger.With("worker", name),
 	}

+	m.workerStart(w)
+	defer m.workerDone(w)
+
 	// Run worker.
 	panicInfo, err := m.runWorker(w, fn)
 	switch {
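Both Manager.Go and Manager.Do now build the WorkerCtx first and register it via workerStart(w), so every worker shows up in the new per-manager registry for its whole run. A hedged usage sketch of the two entry points, assuming the mgr package API only as it appears in this diff (import path and exact lifecycle behavior are assumptions, not verified against the full package):

package main

import (
	"fmt"

	"github.com/safing/portmaster/service/mgr"
)

func main() {
	m := mgr.New("example")
	defer m.Cancel()

	// Go runs the function asynchronously in a managed goroutine.
	m.Go("background worker", func(w *mgr.WorkerCtx) error {
		<-w.Done() // block until the manager is canceled
		return nil
	})

	// Do runs the function in the calling goroutine and returns its error.
	err := m.Do("one-off task", func(w *mgr.WorkerCtx) error {
		fmt.Println("doing one-off work")
		return nil
	})
	if err != nil {
		fmt.Println("task failed:", err)
	}
}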
service/mgr/worker_info.go (new file, 389 lines)

@@ -0,0 +1,389 @@
package mgr

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"reflect"
	"runtime"
	"slices"
	"strconv"
	"strings"
	"text/tabwriter"

	"github.com/maruel/panicparse/v2/stack"
)

// WorkerInfoModule is used for interface checks on modules.
type WorkerInfoModule interface {
	WorkerInfo(s *stack.Snapshot) (*WorkerInfo, error)
}

func (m *Manager) registerWorker(w *WorkerCtx) {
	m.workersLock.Lock()
	defer m.workersLock.Unlock()

	// Iterate forwards over the ring buffer.
	end := (m.workersIndex - 1 + len(m.workers)) % len(m.workers)
	for {
		// Check if entry is available.
		if m.workers[m.workersIndex] == nil {
			m.workers[m.workersIndex] = w
			return
		}
		// Check if we checked the whole ring buffer.
		if m.workersIndex == end {
			break
		}
		// Go to next index.
		m.workersIndex = (m.workersIndex + 1) % len(m.workers)
	}

	// Increase ring buffer.
	newRingBuf := make([]*WorkerCtx, len(m.workers)*4)
	copy(newRingBuf, m.workers)
	// Add new entry.
	m.workersIndex = len(m.workers)
	newRingBuf[m.workersIndex] = w
	m.workersIndex++
	// Switch to new ring buffer.
	m.workers = newRingBuf
}

func (m *Manager) unregisterWorker(w *WorkerCtx) {
	m.workersLock.Lock()
	defer m.workersLock.Unlock()

	// Iterate backwards over the ring buffer.
	i := m.workersIndex
	end := (i + 1) % len(m.workers)
	for {
		// Check if entry is the one we want to remove.
		if m.workers[i] == w {
			m.workers[i] = nil
			return
		}
		// Check if we checked the whole ring buffer.
		if i == end {
			break
		}
		// Go to next index.
		i = (i - 1 + len(m.workers)) % len(m.workers)
	}
}

// WorkerInfo holds status information about a managers workers.
type WorkerInfo struct {
	Running int
	Waiting int

	Other   int
	Missing int

	Workers []*WorkerInfoDetail
}

// WorkerInfoDetail holds status information about a single worker.
type WorkerInfoDetail struct {
	Count       int
	State       string
	Mgr         string
	Name        string
	Func        string
	CurrentLine string
	ExtraInfo   string
}

// WorkerInfo returns status information for all running workers of this manager.
func (m *Manager) WorkerInfo(s *stack.Snapshot) (*WorkerInfo, error) {
	m.workersLock.Lock()
	defer m.workersLock.Unlock()

	var err error
	if s == nil {
		s, _, err = stack.ScanSnapshot(bytes.NewReader(fullStack()), io.Discard, stack.DefaultOpts())
		if err != nil && !errors.Is(err, io.EOF) {
			return nil, fmt.Errorf("get stack: %w", err)
		}
	}

	wi := &WorkerInfo{
		Workers: make([]*WorkerInfoDetail, 0, len(m.workers)),
	}

	// Go through all registered workers of manager.
	for _, w := range m.workers {
		// Ignore empty slots.
		if w == nil {
			continue
		}

		// Setup worker detail struct.
		wd := &WorkerInfoDetail{
			Count: 1,
			Mgr:   m.name,
		}
		if w.workerMgr != nil {
			wd.Name = w.workerMgr.name
			wd.Func = getFuncName(w.workerMgr.fn)
		} else {
			wd.Name = w.name
			wd.Func = getFuncName(w.workFunc)
		}

		// Search for stack of this worker.
	goroutines:
		for _, gr := range s.Goroutines {
			for _, call := range gr.Stack.Calls {
				// Check if the can find the worker function in a call stack.
				fullFuncName := call.Func.ImportPath + "." + call.Func.Name
				if fullFuncName == wd.Func {
					wd.State = gr.State

					// Find most useful line for where the goroutine currently is at.
					// Cut import path prefix to domain/user, eg. github.com/safing
					importPathPrefix := call.ImportPath
					splitted := strings.SplitN(importPathPrefix, "/", 3)
					if len(splitted) == 3 {
						importPathPrefix = splitted[0] + "/" + splitted[1] + "/"
					}
					// Find "last" call within that import path prefix.
					for _, call = range gr.Stack.Calls {
						if strings.HasPrefix(call.ImportPath, importPathPrefix) {
							wd.CurrentLine = call.ImportPath + "/" + call.SrcName + ":" + strconv.Itoa(call.Line)
							break
						}
					}
					// Fall back to last call if no better line was found.
					if wd.CurrentLine == "" {
						wd.CurrentLine = gr.Stack.Calls[0].ImportPath + "/" + gr.Stack.Calls[0].SrcName + ":" + strconv.Itoa(gr.Stack.Calls[0].Line)
					}

					// Add some extra info in some cases.
					if wd.State == "sleep" { //nolint:goconst
						wd.ExtraInfo = gr.SleepString()
					}

					break goroutines
				}
			}
		}

		// Summarize and add to list.
		switch wd.State {
		case "idle", "runnable", "running", "syscall",
			"waiting", "dead", "enqueue", "copystack":
			wi.Running++
		case "chan send", "chan receive", "select", "IO wait",
			"panicwait", "semacquire", "semarelease", "sleep",
			"sync.Mutex.Lock":
			wi.Waiting++
		case "":
			if w.workerMgr != nil {
				wi.Waiting++
				wd.State = "scheduled"
				wd.ExtraInfo = w.workerMgr.Status()
			} else {
				wi.Missing++
				wd.State = "missing"
			}
		default:
			wi.Other++
		}

		wi.Workers = append(wi.Workers, wd)
	}

	// Sort and return.
	wi.clean()
	return wi, nil
}

// Format formats the worker information as a readable table.
func (wi *WorkerInfo) Format() string {
	buf := bytes.NewBuffer(nil)

	// Add summary.
	buf.WriteString(fmt.Sprintf(
		"%d Workers: %d running, %d waiting\n\n",
		len(wi.Workers),
		wi.Running,
		wi.Waiting,
	))

	// Build table.
	tabWriter := tabwriter.NewWriter(buf, 4, 4, 3, ' ', 0)
	_, _ = fmt.Fprintf(tabWriter, "#\tState\tModule\tName\tWorker Func\tCurrent Line\tExtra Info\n")
	for _, wd := range wi.Workers {
		_, _ = fmt.Fprintf(tabWriter,
			"%d\t%s\t%s\t%s\t%s\t%s\t%s\n",
			wd.Count,
			wd.State,
			wd.Mgr,
			wd.Name,
			wd.Func,
			wd.CurrentLine,
			wd.ExtraInfo,
		)
	}
	_ = tabWriter.Flush()

	return buf.String()
}

func getFuncName(fn func(w *WorkerCtx) error) string {
	name := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
	return strings.TrimSuffix(name, "-fm")
}

func fullStack() []byte {
	buf := make([]byte, 8096)
	for {
		n := runtime.Stack(buf, true)
		if n < len(buf) {
			return buf[:n]
		}
		buf = make([]byte, 2*len(buf))
	}
}

// MergeWorkerInfo merges multiple worker infos into one.
func MergeWorkerInfo(infos ...*WorkerInfo) *WorkerInfo {
	// Calculate total registered workers.
	var totalWorkers int
	for _, status := range infos {
		totalWorkers += len(status.Workers)
	}

	// Merge all worker infos.
	wi := &WorkerInfo{
		Workers: make([]*WorkerInfoDetail, 0, totalWorkers),
	}
	for _, info := range infos {
		wi.Running += info.Running
		wi.Waiting += info.Waiting
		wi.Other += info.Other
		wi.Missing += info.Missing
		wi.Workers = append(wi.Workers, info.Workers...)
	}

	// Sort and return.
	wi.clean()
	return wi
}

func (wi *WorkerInfo) clean() {
	// Check if there is anything to do.
	if len(wi.Workers) <= 1 {
		return
	}

	// Sort for deduplication.
	slices.SortFunc(wi.Workers, sortWorkerInfoDetail)

	// Count duplicate worker details.
	current := wi.Workers[0]
	for i := 1; i < len(wi.Workers); i++ {
		if workerDetailsAreEqual(current, wi.Workers[i]) {
			current.Count++
		} else {
			current = wi.Workers[i]
		}
	}
	// Deduplicate worker details.
	wi.Workers = slices.CompactFunc(wi.Workers, workerDetailsAreEqual)

	// Sort for presentation.
	slices.SortFunc(wi.Workers, sortWorkerInfoDetailByCount)
}

// sortWorkerInfoDetail is a sort function to sort worker info details by their content.
func sortWorkerInfoDetail(a, b *WorkerInfoDetail) int {
	switch {
	case a.State != b.State:
		return strings.Compare(a.State, b.State)
	case a.Mgr != b.Mgr:
		return strings.Compare(a.Mgr, b.Mgr)
	case a.Name != b.Name:
		return strings.Compare(a.Name, b.Name)
	case a.Func != b.Func:
		return strings.Compare(a.Func, b.Func)
	case a.CurrentLine != b.CurrentLine:
		return strings.Compare(a.CurrentLine, b.CurrentLine)
	case a.ExtraInfo != b.ExtraInfo:
		return strings.Compare(a.ExtraInfo, b.ExtraInfo)
	case a.Count != b.Count:
		return b.Count - a.Count
	default:
		return 0
	}
}

// sortWorkerInfoDetailByCount is a sort function to sort worker info details by their count and then by content.
func sortWorkerInfoDetailByCount(a, b *WorkerInfoDetail) int {
	stateA, stateB := goroutineStateOrder(a.State), goroutineStateOrder(b.State)
	switch {
	case stateA != stateB:
		return stateA - stateB
	case a.State != b.State:
		return strings.Compare(a.State, b.State)
	case a.Count != b.Count:
		return b.Count - a.Count
	case a.Mgr != b.Mgr:
		return strings.Compare(a.Mgr, b.Mgr)
	case a.Name != b.Name:
		return strings.Compare(a.Name, b.Name)
	case a.Func != b.Func:
		return strings.Compare(a.Func, b.Func)
	case a.CurrentLine != b.CurrentLine:
		return strings.Compare(a.CurrentLine, b.CurrentLine)
	case a.ExtraInfo != b.ExtraInfo:
		return strings.Compare(a.ExtraInfo, b.ExtraInfo)
	default:
		return 0
	}
}

// workerDetailsAreEqual is a deduplication function for worker details.
func workerDetailsAreEqual(a, b *WorkerInfoDetail) bool {
	switch {
	case a.State != b.State:
		return false
	case a.Mgr != b.Mgr:
		return false
	case a.Name != b.Name:
		return false
	case a.Func != b.Func:
		return false
	case a.CurrentLine != b.CurrentLine:
		return false
	case a.ExtraInfo != b.ExtraInfo:
		return false
	default:
		return true
	}
}

//nolint:goconst
func goroutineStateOrder(state string) int {
	switch state {
	case "runnable", "running", "syscall":
		return 0 // Active.
	case "idle", "waiting", "dead", "enqueue", "copystack":
		return 1 // Active-ish.
	case "semacquire", "semarelease", "sleep", "panicwait", "sync.Mutex.Lock":
		return 2 // Bad (practice) blocking.
	case "chan send", "chan receive", "select":
		return 3 // Potentially bad (practice), but normal blocking.
	case "IO wait":
		return 4 // Normal blocking.
	case "scheduled":
		return 5 // Not running.
	case "missing", "":
		return 6 // Warning of undetected workers.
	default:
		return 9
	}
}
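The clean method above deduplicates repeated worker details by sorting with slices.SortFunc, counting adjacent duplicates into the first entry of each run, and then collapsing the run with slices.CompactFunc (which keeps that first entry). A small standalone sketch of the same sort, count, compact idiom using only the standard library; the entry type is illustrative, not Portmaster code:

package main

import (
	"fmt"
	"slices"
	"strings"
)

type entry struct {
	Name  string
	Count int
}

func main() {
	items := []entry{{"b", 1}, {"a", 1}, {"b", 1}, {"a", 1}, {"c", 1}, {"a", 1}}

	// Sort so that equal entries become adjacent.
	slices.SortFunc(items, func(x, y entry) int { return strings.Compare(x.Name, y.Name) })

	// Count duplicates into the first entry of each run (indexing updates the slice element in place).
	for i, j := 0, 1; j < len(items); j++ {
		if items[i].Name == items[j].Name {
			items[i].Count++
		} else {
			i = j
		}
	}

	// Drop the duplicates, keeping the counted first entry of each run.
	items = slices.CompactFunc(items, func(x, y entry) bool { return x.Name == y.Name })

	fmt.Println(items) // [{a 3} {b 2} {c 1}]
}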
service/mgr/worker_test.go (new file, 51 lines)

@@ -0,0 +1,51 @@
package mgr

import (
	"fmt"
	"testing"
	"time"
)

func TestWorkerInfo(t *testing.T) { //nolint:paralleltest
	mgr := New("test")
	mgr.Go("test func one", testFunc1)
	mgr.Go("test func two", testFunc2)
	mgr.Go("test func three", testFunc3)
	defer mgr.Cancel()

	time.Sleep(100 * time.Millisecond)

	info, err := mgr.WorkerInfo(nil)
	if err != nil {
		t.Fatal(err)
	}
	if info.Waiting != 3 {
		t.Errorf("expected three waiting workers")
	}

	fmt.Printf("%+v\n", info)
}

func testFunc1(ctx *WorkerCtx) error {
	select {
	case <-time.After(1 * time.Second):
	case <-ctx.Done():
	}
	return nil
}

func testFunc2(ctx *WorkerCtx) error {
	select {
	case <-time.After(1 * time.Second):
	case <-ctx.Done():
	}
	return nil
}

func testFunc3(ctx *WorkerCtx) error {
	select {
	case <-time.After(1 * time.Second):
	case <-ctx.Done():
	}
	return nil
}
@@ -125,14 +125,15 @@ func (m *Manager) NewWorkerMgr(name string, fn func(w *WorkerCtx) error, errorFn
 		run:          make(chan struct{}, 1),
 		selectAction: make(chan struct{}, 1),
 	}
+	wCtx.workerMgr = s

 	go s.taskMgr()
 	return s
 }

 func (s *WorkerMgr) taskMgr() {
-	s.mgr.workerStart()
-	defer s.mgr.workerDone()
+	s.mgr.workerStart(s.ctx)
+	defer s.mgr.workerDone(s.ctx)

 	// If the task manager ends, end all descendants too.
 	defer s.ctx.cancelCtx()
@@ -195,6 +196,7 @@ manage:
 			workerMgr: s,
 			logger:    s.ctx.logger,
 		}
+		//nolint:fatcontext // Every run gets a new context.
 		wCtx.ctx, wCtx.cancelCtx = context.WithCancel(s.ctx.ctx)
 		panicInfo, err := s.mgr.runWorker(wCtx, s.fn)
@@ -229,6 +231,23 @@ manage:
 	}
 }

+// Status returns the current status of the worker manager.
+func (s *WorkerMgr) Status() string {
+	s.actionLock.Lock()
+	defer s.actionLock.Unlock()
+
+	switch {
+	case s.delay != nil:
+		return "delayed"
+	case s.repeat != nil:
+		return "repeated every " + s.repeat.interval.String()
+	case s.keepAlive != nil:
+		return "on demand"
+	default:
+		return "created"
+	}
+}
+
 // Go executes the worker immediately.
 // If the worker is currently being executed,
 // the next execution will commence afterwards.
@@ -23,7 +23,7 @@ var (
 	cfgFilterLists []string
 )

-func registerConfigUpdater() error {
+func registerGlobalConfigProfileUpdater() error {
 	module.instance.Config().EventConfigChange.AddCallback("update global config profile", func(wc *mgr.WorkerCtx, s struct{}) (cancel bool, err error) {
 		return false, updateGlobalConfigProfile(wc.Ctx())
 	})
@@ -61,10 +61,6 @@ func prep() error {
 		return err
 	}

-	if err := registerConfigUpdater(); err != nil {
-		return err
-	}
-
 	if err := registerMigrations(); err != nil {
 		return err
 	}

@@ -118,6 +114,12 @@ func start() error {

 	module.mgr.Go("clean active profiles", cleanActiveProfiles)

+	// Register config callback when starting, as it depends on the updates module,
+	// but the config system will already submit events earlier.
+	if err := registerGlobalConfigProfileUpdater(); err != nil {
+		return err
+	}
+
 	err = updateGlobalConfigProfile(module.mgr.Ctx())
 	if err != nil {
 		log.Warningf("profile: error during loading global profile from configuration: %s", err)
@@ -86,22 +86,24 @@ func start() error {

	enabled := config.GetAsBool("spn/enable", false)
	if enabled() {
		log.Info("spn: starting SPN")
		module.mgr.Go("ensure SPN is started", module.instance.SPNGroup().EnsureStartedWorker)
	} else {
		log.Info("spn: stopping SPN")
		module.mgr.Go("ensure SPN is stopped", module.instance.SPNGroup().EnsureStoppedWorker)
	}
	return false, nil
	})

	// Load tokens from database.
	loadTokens()

	// Check if we need to enable SPN now.
	enabled := config.GetAsBool("spn/enable", false)
	if enabled() {
		module.mgr.Go("ensure SPN is started", module.instance.SPNGroup().EnsureStartedWorker)
	}

	// Load tokens from database.
	loadTokens()

	// Register new task.
	module.updateAccountWorkerMgr.Delay(1 * time.Minute)
	}
spn/debug.go (new file, 60 lines)

@@ -0,0 +1,60 @@
package spn

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"runtime"

	"github.com/maruel/panicparse/v2/stack"

	"github.com/safing/portmaster/base/utils/debug"
	"github.com/safing/portmaster/service/mgr"
)

// GetWorkerInfo returns the worker info of all running workers.
func (i *Instance) GetWorkerInfo() (*mgr.WorkerInfo, error) {
	snapshot, _, err := stack.ScanSnapshot(bytes.NewReader(fullStack()), io.Discard, stack.DefaultOpts())
	if err != nil && !errors.Is(err, io.EOF) {
		return nil, fmt.Errorf("get stack: %w", err)
	}

	infos := make([]*mgr.WorkerInfo, 0, 32)
	for _, m := range i.serviceGroup.Modules() {
		wi, _ := m.Manager().WorkerInfo(snapshot) // Does not fail when we provide a snapshot.
		infos = append(infos, wi)
	}

	return mgr.MergeWorkerInfo(infos...), nil
}

// AddWorkerInfoToDebugInfo adds the worker info of all running workers to the debug info.
func (i *Instance) AddWorkerInfoToDebugInfo(di *debug.Info) {
	info, err := i.GetWorkerInfo()
	if err != nil {
		di.AddSection(
			"Worker Status Failed",
			debug.UseCodeSection,
			err.Error(),
		)
		return
	}

	di.AddSection(
		fmt.Sprintf("Worker Status: %d/%d (%d?)", info.Running, len(info.Workers), info.Missing+info.Other),
		debug.UseCodeSection,
		info.Format(),
	)
}

func fullStack() []byte {
	buf := make([]byte, 8096)
	for {
		n := runtime.Stack(buf, true)
		if n < len(buf) {
			return buf[:n]
		}
		buf = make([]byte, 2*len(buf))
	}
}