Merge pull request #19 from safing/feature/cleanup-adaptions

Intermediate PR for publishing current state
This commit is contained in:
Daniel 2019-10-30 13:52:00 +01:00 committed by GitHub
commit a747272674
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 902 additions and 217 deletions

View file

@ -7,3 +7,5 @@ linters:
- funlen
- whitespace
- wsl
- godox

View file

@ -1,5 +1,8 @@
language: go
go:
- 1.x
os:
- linux
- windows

View file

@ -10,7 +10,12 @@ import (
"github.com/safing/portmaster/core/structure"
)
const (
configChangeEvent = "config change"
)
var (
module *modules.Module
dataRoot *utils.DirStructure
)
@ -22,7 +27,8 @@ func SetDataRoot(root *utils.DirStructure) {
}
func init() {
modules.Register("config", prep, start, nil, "base", "database")
module = modules.Register("config", prep, start, nil, "base", "database")
module.RegisterEvent(configChangeEvent)
}
func prep() error {

View file

@ -17,9 +17,6 @@ var (
validityFlag = abool.NewBool(true)
validityFlagLock sync.RWMutex
changedSignal = make(chan struct{})
changedSignalLock sync.Mutex
)
func getValidityFlag() *abool.AtomicBool {
@ -28,13 +25,6 @@ func getValidityFlag() *abool.AtomicBool {
return validityFlag
}
// Changed signals if any config option was changed.
func Changed() <-chan struct{} {
changedSignalLock.Lock()
defer changedSignalLock.Unlock()
return changedSignal
}
func signalChanges() {
// refetch and save release level and expertise level
updateReleaseLevel()
@ -46,11 +36,7 @@ func signalChanges() {
validityFlag = abool.NewBool(true)
validityFlagLock.Unlock()
// trigger change signal: signal listeners that a config option was changed.
changedSignalLock.Lock()
close(changedSignal)
changedSignal = make(chan struct{})
changedSignalLock.Unlock()
module.TriggerEvent(configChangeEvent, nil)
}
// setConfig sets the (prioritized) user defined config.

View file

@ -14,6 +14,7 @@ type snippet struct {
}
// ParseQuery parses a plaintext query. Special characters (that must be escaped with a '\') are: `\()` and any whitespaces.
//nolint:gocognit
func ParseQuery(query string) (*Query, error) {
snippets, err := extractSnippets(query)
if err != nil {
@ -195,6 +196,7 @@ func extractSnippets(text string) (snippets []*snippet, err error) {
}
//nolint:gocognit
func parseAndOr(getSnippet func() (*snippet, error), remainingSnippets func() int, rootCondition bool) (Condition, error) {
var isOr = false
var typeSet = false

View file

@ -14,14 +14,14 @@ var (
)
// GenCodeSize returns the size of the gencode marshalled byte slice
func (d *Meta) GenCodeSize() (s int) {
func (m *Meta) GenCodeSize() (s int) {
s += 34
return
}
// GenCodeMarshal gencode marshalls Meta into the given byte array, or a new one if its too small.
func (d *Meta) GenCodeMarshal(buf []byte) ([]byte, error) {
size := d.GenCodeSize()
func (m *Meta) GenCodeMarshal(buf []byte) ([]byte, error) {
size := m.GenCodeSize()
{
if cap(buf) >= size {
buf = buf[:size]
@ -33,89 +33,89 @@ func (d *Meta) GenCodeMarshal(buf []byte) ([]byte, error) {
{
buf[0+0] = byte(d.Created >> 0)
buf[0+0] = byte(m.Created >> 0)
buf[1+0] = byte(d.Created >> 8)
buf[1+0] = byte(m.Created >> 8)
buf[2+0] = byte(d.Created >> 16)
buf[2+0] = byte(m.Created >> 16)
buf[3+0] = byte(d.Created >> 24)
buf[3+0] = byte(m.Created >> 24)
buf[4+0] = byte(d.Created >> 32)
buf[4+0] = byte(m.Created >> 32)
buf[5+0] = byte(d.Created >> 40)
buf[5+0] = byte(m.Created >> 40)
buf[6+0] = byte(d.Created >> 48)
buf[6+0] = byte(m.Created >> 48)
buf[7+0] = byte(d.Created >> 56)
buf[7+0] = byte(m.Created >> 56)
}
{
buf[0+8] = byte(d.Modified >> 0)
buf[0+8] = byte(m.Modified >> 0)
buf[1+8] = byte(d.Modified >> 8)
buf[1+8] = byte(m.Modified >> 8)
buf[2+8] = byte(d.Modified >> 16)
buf[2+8] = byte(m.Modified >> 16)
buf[3+8] = byte(d.Modified >> 24)
buf[3+8] = byte(m.Modified >> 24)
buf[4+8] = byte(d.Modified >> 32)
buf[4+8] = byte(m.Modified >> 32)
buf[5+8] = byte(d.Modified >> 40)
buf[5+8] = byte(m.Modified >> 40)
buf[6+8] = byte(d.Modified >> 48)
buf[6+8] = byte(m.Modified >> 48)
buf[7+8] = byte(d.Modified >> 56)
buf[7+8] = byte(m.Modified >> 56)
}
{
buf[0+16] = byte(d.Expires >> 0)
buf[0+16] = byte(m.Expires >> 0)
buf[1+16] = byte(d.Expires >> 8)
buf[1+16] = byte(m.Expires >> 8)
buf[2+16] = byte(d.Expires >> 16)
buf[2+16] = byte(m.Expires >> 16)
buf[3+16] = byte(d.Expires >> 24)
buf[3+16] = byte(m.Expires >> 24)
buf[4+16] = byte(d.Expires >> 32)
buf[4+16] = byte(m.Expires >> 32)
buf[5+16] = byte(d.Expires >> 40)
buf[5+16] = byte(m.Expires >> 40)
buf[6+16] = byte(d.Expires >> 48)
buf[6+16] = byte(m.Expires >> 48)
buf[7+16] = byte(d.Expires >> 56)
buf[7+16] = byte(m.Expires >> 56)
}
{
buf[0+24] = byte(d.Deleted >> 0)
buf[0+24] = byte(m.Deleted >> 0)
buf[1+24] = byte(d.Deleted >> 8)
buf[1+24] = byte(m.Deleted >> 8)
buf[2+24] = byte(d.Deleted >> 16)
buf[2+24] = byte(m.Deleted >> 16)
buf[3+24] = byte(d.Deleted >> 24)
buf[3+24] = byte(m.Deleted >> 24)
buf[4+24] = byte(d.Deleted >> 32)
buf[4+24] = byte(m.Deleted >> 32)
buf[5+24] = byte(d.Deleted >> 40)
buf[5+24] = byte(m.Deleted >> 40)
buf[6+24] = byte(d.Deleted >> 48)
buf[6+24] = byte(m.Deleted >> 48)
buf[7+24] = byte(d.Deleted >> 56)
buf[7+24] = byte(m.Deleted >> 56)
}
{
if d.secret {
if m.secret {
buf[32] = 1
} else {
buf[32] = 0
}
}
{
if d.cronjewel {
if m.cronjewel {
buf[33] = 1
} else {
buf[33] = 0
@ -125,38 +125,38 @@ func (d *Meta) GenCodeMarshal(buf []byte) ([]byte, error) {
}
// GenCodeUnmarshal gencode unmarshalls Meta and returns the bytes read.
func (d *Meta) GenCodeUnmarshal(buf []byte) (uint64, error) {
if len(buf) < d.GenCodeSize() {
return 0, fmt.Errorf("insufficient data: got %d out of %d bytes", len(buf), d.GenCodeSize())
func (m *Meta) GenCodeUnmarshal(buf []byte) (uint64, error) {
if len(buf) < m.GenCodeSize() {
return 0, fmt.Errorf("insufficient data: got %d out of %d bytes", len(buf), m.GenCodeSize())
}
i := uint64(0)
{
d.Created = 0 | (int64(buf[0+0]) << 0) | (int64(buf[1+0]) << 8) | (int64(buf[2+0]) << 16) | (int64(buf[3+0]) << 24) | (int64(buf[4+0]) << 32) | (int64(buf[5+0]) << 40) | (int64(buf[6+0]) << 48) | (int64(buf[7+0]) << 56)
m.Created = 0 | (int64(buf[0+0]) << 0) | (int64(buf[1+0]) << 8) | (int64(buf[2+0]) << 16) | (int64(buf[3+0]) << 24) | (int64(buf[4+0]) << 32) | (int64(buf[5+0]) << 40) | (int64(buf[6+0]) << 48) | (int64(buf[7+0]) << 56)
}
{
d.Modified = 0 | (int64(buf[0+8]) << 0) | (int64(buf[1+8]) << 8) | (int64(buf[2+8]) << 16) | (int64(buf[3+8]) << 24) | (int64(buf[4+8]) << 32) | (int64(buf[5+8]) << 40) | (int64(buf[6+8]) << 48) | (int64(buf[7+8]) << 56)
m.Modified = 0 | (int64(buf[0+8]) << 0) | (int64(buf[1+8]) << 8) | (int64(buf[2+8]) << 16) | (int64(buf[3+8]) << 24) | (int64(buf[4+8]) << 32) | (int64(buf[5+8]) << 40) | (int64(buf[6+8]) << 48) | (int64(buf[7+8]) << 56)
}
{
d.Expires = 0 | (int64(buf[0+16]) << 0) | (int64(buf[1+16]) << 8) | (int64(buf[2+16]) << 16) | (int64(buf[3+16]) << 24) | (int64(buf[4+16]) << 32) | (int64(buf[5+16]) << 40) | (int64(buf[6+16]) << 48) | (int64(buf[7+16]) << 56)
m.Expires = 0 | (int64(buf[0+16]) << 0) | (int64(buf[1+16]) << 8) | (int64(buf[2+16]) << 16) | (int64(buf[3+16]) << 24) | (int64(buf[4+16]) << 32) | (int64(buf[5+16]) << 40) | (int64(buf[6+16]) << 48) | (int64(buf[7+16]) << 56)
}
{
d.Deleted = 0 | (int64(buf[0+24]) << 0) | (int64(buf[1+24]) << 8) | (int64(buf[2+24]) << 16) | (int64(buf[3+24]) << 24) | (int64(buf[4+24]) << 32) | (int64(buf[5+24]) << 40) | (int64(buf[6+24]) << 48) | (int64(buf[7+24]) << 56)
m.Deleted = 0 | (int64(buf[0+24]) << 0) | (int64(buf[1+24]) << 8) | (int64(buf[2+24]) << 16) | (int64(buf[3+24]) << 24) | (int64(buf[4+24]) << 32) | (int64(buf[5+24]) << 40) | (int64(buf[6+24]) << 48) | (int64(buf[7+24]) << 56)
}
{
d.secret = buf[32] == 1
m.secret = buf[32] == 1
}
{
d.cronjewel = buf[33] == 1
m.cronjewel = buf[33] == 1
}
return i + 34, nil
}

View file

@ -139,6 +139,7 @@ func saveRegistry(lock bool) error {
}
// write file
// FIXME: write atomically (best effort)
filePath := path.Join(rootStructure.Path, registryFileName)
return ioutil.WriteFile(filePath, data, 0600)
}

View file

@ -118,6 +118,7 @@ func (b *Badger) Query(q *query.Query, local, internal bool) (*iterator.Iterator
return queryIter, nil
}
//nolint:gocognit
func (b *Badger) queryExecutor(queryIter *iterator.Iterator, q *query.Query, local, internal bool) {
err := b.db.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.DefaultIteratorOptions)

View file

@ -31,7 +31,7 @@ type TestRecord struct {
B bool
}
func TestBadger(t *testing.T) {
func TestBBolt(t *testing.T) {
testDir, err := ioutil.TempDir("", "testing-")
if err != nil {
t.Fatal(err)

View file

@ -0,0 +1,140 @@
package hashmap
import (
"errors"
"fmt"
"sync"
"time"
"github.com/safing/portbase/database/iterator"
"github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record"
"github.com/safing/portbase/database/storage"
)
// HashMap storage.
type HashMap struct {
name string
db map[string]record.Record
dbLock sync.RWMutex
}
func init() {
_ = storage.Register("hashmap", NewHashMap)
}
// NewHashMap creates a hashmap database.
func NewHashMap(name, location string) (storage.Interface, error) {
return &HashMap{
name: name,
db: make(map[string]record.Record),
}, nil
}
// Get returns a database record.
func (hm *HashMap) Get(key string) (record.Record, error) {
hm.dbLock.RLock()
defer hm.dbLock.RUnlock()
r, ok := hm.db[key]
if !ok {
return nil, storage.ErrNotFound
}
return r, nil
}
// Put stores a record in the database.
func (hm *HashMap) Put(r record.Record) error {
hm.dbLock.Lock()
defer hm.dbLock.Unlock()
hm.db[r.DatabaseKey()] = r
return nil
}
// Delete deletes a record from the database.
func (hm *HashMap) Delete(key string) error {
hm.dbLock.Lock()
defer hm.dbLock.Unlock()
delete(hm.db, key)
return nil
}
// Query returns a an iterator for the supplied query.
func (hm *HashMap) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
_, err := q.Check()
if err != nil {
return nil, fmt.Errorf("invalid query: %s", err)
}
queryIter := iterator.New()
go hm.queryExecutor(queryIter, q, local, internal)
return queryIter, nil
}
func (hm *HashMap) queryExecutor(queryIter *iterator.Iterator, q *query.Query, local, internal bool) {
hm.dbLock.RLock()
defer hm.dbLock.RUnlock()
var err error
mapLoop:
for key, record := range hm.db {
switch {
case !q.MatchesKey(key):
continue
case !q.MatchesRecord(record):
continue
case !record.Meta().CheckValidity():
continue
case !record.Meta().CheckPermission(local, internal):
continue
}
select {
case <-queryIter.Done:
break mapLoop
case queryIter.Next <- record:
default:
select {
case <-queryIter.Done:
break mapLoop
case queryIter.Next <- record:
case <-time.After(1 * time.Second):
err = errors.New("query timeout")
break mapLoop
}
}
}
queryIter.Finish(err)
}
// ReadOnly returns whether the database is read only.
func (hm *HashMap) ReadOnly() bool {
return false
}
// Injected returns whether the database is injected.
func (hm *HashMap) Injected() bool {
return false
}
// Maintain runs a light maintenance operation on the database.
func (hm *HashMap) Maintain() error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (hm *HashMap) MaintainThorough() (err error) {
return nil
}
// Shutdown shuts down the database.
func (hm *HashMap) Shutdown() error {
return nil
}

View file

@ -0,0 +1,147 @@
//nolint:unparam,maligned
package hashmap
import (
"reflect"
"sync"
"testing"
"github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record"
)
type TestRecord struct {
record.Base
sync.Mutex
S string
I int
I8 int8
I16 int16
I32 int32
I64 int64
UI uint
UI8 uint8
UI16 uint16
UI32 uint32
UI64 uint64
F32 float32
F64 float64
B bool
}
func TestHashMap(t *testing.T) {
// start
db, err := NewHashMap("test", "")
if err != nil {
t.Fatal(err)
}
a := &TestRecord{
S: "banana",
I: 42,
I8: 42,
I16: 42,
I32: 42,
I64: 42,
UI: 42,
UI8: 42,
UI16: 42,
UI32: 42,
UI64: 42,
F32: 42.42,
F64: 42.42,
B: true,
}
a.SetMeta(&record.Meta{})
a.Meta().Update()
a.SetKey("test:A")
// put record
err = db.Put(a)
if err != nil {
t.Fatal(err)
}
// get and compare
a1, err := db.Get("A")
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(a, a1) {
t.Fatalf("mismatch, got %v", a1)
}
// setup query test records
qA := &TestRecord{}
qA.SetKey("test:path/to/A")
qA.CreateMeta()
qB := &TestRecord{}
qB.SetKey("test:path/to/B")
qB.CreateMeta()
qC := &TestRecord{}
qC.SetKey("test:path/to/C")
qC.CreateMeta()
qZ := &TestRecord{}
qZ.SetKey("test:z")
qZ.CreateMeta()
// put
err = db.Put(qA)
if err == nil {
err = db.Put(qB)
}
if err == nil {
err = db.Put(qC)
}
if err == nil {
err = db.Put(qZ)
}
if err != nil {
t.Fatal(err)
}
// test query
q := query.New("test:path/to/").MustBeValid()
it, err := db.Query(q, true, true)
if err != nil {
t.Fatal(err)
}
cnt := 0
for range it.Next {
cnt++
}
if it.Err() != nil {
t.Fatal(it.Err())
}
if cnt != 3 {
t.Fatalf("unexpected query result count: %d", cnt)
}
// delete
err = db.Delete("A")
if err != nil {
t.Fatal(err)
}
// check if its gone
_, err = db.Get("A")
if err == nil {
t.Fatal("should fail")
}
// maintenance
err = db.Maintain()
if err != nil {
t.Fatal(err)
}
err = db.MaintainThorough()
if err != nil {
t.Fatal(err)
}
// shutdown
err = db.Shutdown()
if err != nil {
t.Fatal(err)
}
}

View file

@ -1,4 +1,4 @@
//nolint:maligned,unparam,gocyclo
//nolint:maligned,unparam,gocyclo,gocognit
package dsd
import (

View file

@ -1,4 +1,4 @@
//nolint:nakedret,unconvert
//nolint:nakedret,unconvert,gocognit
package dsd
import (

View file

@ -1,7 +1,9 @@
package varint
import "errors"
import "encoding/binary"
import (
"encoding/binary"
"errors"
)
// Pack8 packs a uint8 into a VarInt.
func Pack8(n uint8) []byte {

View file

@ -1,3 +1,4 @@
//nolint:gocognit
package varint
import (

View file

@ -15,7 +15,7 @@ var (
)
func init() {
modules.Register("info", prep, nil, nil, "base")
modules.Register("info", prep, nil, nil)
flag.BoolVar(&showVersion, "version", false, "show version and exit")
}
@ -35,8 +35,10 @@ func prep() error {
// CheckVersion checks if the metadata is ok.
func CheckVersion() error {
if !strings.HasSuffix(os.Args[0], ".test") {
if name == "[NAME]" ||
version == "[version unknown]" ||
if name == "[NAME]" {
return errors.New("must call SetInfo() before calling CheckVersion()")
}
if version == "[version unknown]" ||
commit == "[commit unknown]" ||
license == "[license unknown]" ||
buildOptions == "[options unknown]" ||

View file

@ -75,15 +75,21 @@ func log(level Severity, msg string, tracer *ContextTracer) {
select {
case logBuffer <- log:
default:
forceEmptyingOfBuffer <- struct{}{}
logBuffer <- log
forceEmptyingLoop:
// force empty buffer until we can send to it
for {
select {
case forceEmptyingOfBuffer <- struct{}{}:
case logBuffer <- log:
break forceEmptyingLoop
}
}
}
// wake up writer if necessary
if logsWaitingFlag.SetToIf(false, true) {
logsWaiting <- struct{}{}
}
}
func fastcheck(level Severity) bool {

View file

@ -70,7 +70,7 @@ const (
var (
logBuffer chan *logLine
forceEmptyingOfBuffer chan struct{}
forceEmptyingOfBuffer = make(chan struct{})
logLevelInt = uint32(3)
logLevel = &logLevelInt
@ -79,7 +79,7 @@ var (
pkgLevels = make(map[string]Severity)
pkgLevelsLock sync.Mutex
logsWaiting = make(chan struct{}, 1)
logsWaiting = make(chan struct{}, 4)
logsWaitingFlag = abool.NewBool(false)
shutdownSignal = make(chan struct{})
@ -90,7 +90,7 @@ var (
startedSignal = make(chan struct{})
)
// SetPkgLevels sets individual log levels for packages.
// SetPkgLevels sets individual log levels for packages. Only effective after Start().
func SetPkgLevels(levels map[string]Severity) {
pkgLevelsLock.Lock()
pkgLevels = levels
@ -103,7 +103,7 @@ func UnSetPkgLevels() {
pkgLevelsActive.UnSet()
}
// SetLogLevel sets a new log level.
// SetLogLevel sets a new log level. Only effective after Start().
func SetLogLevel(level Severity) {
atomic.StoreUint32(logLevel, uint32(level))
}
@ -135,11 +135,10 @@ func Start() (err error) {
}
logBuffer = make(chan *logLine, 1024)
forceEmptyingOfBuffer = make(chan struct{}, 16)
initialLogLevel := ParseLevel(logLevelFlag)
if initialLogLevel > 0 {
atomic.StoreUint32(logLevel, uint32(initialLogLevel))
SetLogLevel(initialLogLevel)
} else {
err = fmt.Errorf("log warning: invalid log level \"%s\", falling back to level info", logLevelFlag)
fmt.Fprintf(os.Stderr, "%s\n", err.Error())

View file

@ -2,6 +2,8 @@ package log
import (
"fmt"
"os"
"runtime/debug"
"time"
)
@ -39,37 +41,73 @@ func writeLine(line *logLine, duplicates uint64) {
}
func startWriter() {
shutdownWaitGroup.Add(1)
fmt.Println(fmt.Sprintf("%s%s %s BOF%s", InfoLevel.color(), time.Now().Format(timeFormat), rightArrow, endColor()))
go writer()
shutdownWaitGroup.Add(1)
go writerManager()
}
func writer() {
var line *logLine
var lastLine *logLine
var duplicates uint64
func writerManager() {
defer shutdownWaitGroup.Done()
for {
err := writer()
if err != nil {
Errorf("log: writer failed: %s", err)
} else {
return
}
}
}
func writer() (err error) {
defer func() {
// recover from panic
panicVal := recover()
if panicVal != nil {
err = fmt.Errorf("%s", panicVal)
// write stack to stderr
fmt.Fprintf(
os.Stderr,
`===== Error Report =====
Message: %s
StackTrace:
%s
===== End of Report =====
`,
err,
string(debug.Stack()),
)
}
}()
var currentLine *logLine
var nextLine *logLine
var duplicates uint64
for {
// reset
line = nil
lastLine = nil //nolint:ineffassign // only ineffectual in first loop
currentLine = nil
nextLine = nil
duplicates = 0
// wait until logs need to be processed
select {
case <-logsWaiting:
case <-logsWaiting: // normal process
logsWaitingFlag.UnSet()
case <-shutdownSignal:
case <-forceEmptyingOfBuffer: // log buffer is full!
case <-shutdownSignal: // shutting down
finalizeWriting()
return
}
// wait for timeslot to log, or when buffer is full
// wait for timeslot to log
select {
case <-writeTrigger:
case <-forceEmptyingOfBuffer:
case <-shutdownSignal:
case <-writeTrigger: // normal process
case <-forceEmptyingOfBuffer: // log buffer is full!
case <-shutdownSignal: // shutting down
finalizeWriting()
return
}
@ -78,38 +116,41 @@ func writer() {
writeLoop:
for {
select {
case line = <-logBuffer:
// look-ahead for deduplication (best effort)
dedupLoop:
for {
// check if there is another line waiting
select {
case nextLine := <-logBuffer:
lastLine = line
line = nextLine
default:
break dedupLoop
}
// deduplication
if !line.Equal(lastLine) {
// no duplicate
break dedupLoop
}
// duplicate
duplicates++
case nextLine = <-logBuffer:
// first line we process, just assign to currentLine
if currentLine == nil {
currentLine = nextLine
continue writeLoop
}
// write actual line
writeLine(line, duplicates)
// we now have currentLine and nextLine
// if currentLine and nextLine are equal, do not print, just increase counter and continue
if nextLine.Equal(currentLine) {
duplicates++
continue writeLoop
}
// if currentLine and line are _not_ equal, output currentLine
writeLine(currentLine, duplicates)
// reset duplicate counter
duplicates = 0
// set new currentLine
currentLine = nextLine
default:
break writeLoop
}
}
// write final line
if currentLine != nil {
writeLine(currentLine, duplicates)
}
// reset state
currentLine = nil //nolint:ineffassign
nextLine = nil
duplicates = 0 //nolint:ineffassign
// back down a little
select {
case <-time.After(10 * time.Millisecond):

View file

@ -83,7 +83,7 @@ func Tracer(ctx context.Context) *ContextTracer {
// Submit collected logs on the context for further processing/outputting. Does nothing if called on a nil ContextTracer.
func (tracer *ContextTracer) Submit() {
if tracer != nil {
if tracer == nil {
return
}
@ -119,15 +119,21 @@ func (tracer *ContextTracer) Submit() {
select {
case logBuffer <- log:
default:
forceEmptyingOfBuffer <- struct{}{}
logBuffer <- log
forceEmptyingLoop:
// force empty buffer until we can send to it
for {
select {
case forceEmptyingOfBuffer <- struct{}{}:
case logBuffer <- log:
break forceEmptyingLoop
}
}
}
// wake up writer if necessary
if logsWaitingFlag.SetToIf(false, true) {
logsWaiting <- struct{}{}
}
}
func (tracer *ContextTracer) log(level Severity, msg string) {

View file

@ -2,11 +2,16 @@ package modules
import (
"fmt"
"os"
"runtime/debug"
"sync"
"time"
)
var (
errorReportingChannel chan *ModuleError
reportToStdErr = true
reportingLock sync.RWMutex
)
// ModuleError wraps a panic, error or message into an error that can be reported.
@ -64,12 +69,43 @@ func (me *ModuleError) Error() string {
// Report reports the error through the configured reporting channel.
func (me *ModuleError) Report() {
reportingLock.RLock()
defer reportingLock.RUnlock()
if errorReportingChannel != nil {
select {
case errorReportingChannel <- me:
default:
}
}
if reportToStdErr {
// default to writing to stderr
fmt.Fprintf(
os.Stderr,
`===== Error Report =====
Message: %s
Timestamp: %s
ModuleName: %s
TaskName: %s
TaskType: %s
Severity: %s
PanicValue: %s
StackTrace:
%s
===== End of Report =====
`,
me.Message,
time.Now(),
me.ModuleName,
me.TaskName,
me.TaskType,
me.Severity,
me.PanicValue,
me.StackTrace,
)
}
}
// IsPanic returns whether the given error is a wrapped panic by the modules package and additionally returns it, if true.
@ -84,7 +120,16 @@ func IsPanic(err error) (bool, *ModuleError) {
// SetErrorReportingChannel sets the channel to report module errors through. By default only panics are reported, all other errors need to be manually wrapped into a *ModuleError and reported.
func SetErrorReportingChannel(reportingChannel chan *ModuleError) {
if errorReportingChannel == nil {
errorReportingChannel = reportingChannel
}
reportingLock.Lock()
defer reportingLock.Unlock()
errorReportingChannel = reportingChannel
}
// SetStdErrReporting controls error reporting to stderr.
func SetStdErrReporting(on bool) {
reportingLock.Lock()
defer reportingLock.Unlock()
reportToStdErr = on
}

103
modules/events.go Normal file
View file

@ -0,0 +1,103 @@
package modules
import (
"context"
"fmt"
"github.com/safing/portbase/log"
)
type eventHookFn func(context.Context, interface{}) error
type eventHook struct {
description string
hookingModule *Module
hookFn eventHookFn
}
// TriggerEvent executes all hook functions registered to the specified event.
func (m *Module) TriggerEvent(event string, data interface{}) {
go m.processEventTrigger(event, data)
}
func (m *Module) processEventTrigger(event string, data interface{}) {
m.eventHooksLock.RLock()
defer m.eventHooksLock.RUnlock()
hooks, ok := m.eventHooks[event]
if !ok {
log.Warningf(`%s: tried to trigger non-existent event "%s"`, m.Name, event)
return
}
for _, hook := range hooks {
if !hook.hookingModule.ShutdownInProgress() {
go m.runEventHook(hook, event, data)
}
}
}
func (m *Module) runEventHook(hook *eventHook, event string, data interface{}) {
if !hook.hookingModule.Started.IsSet() {
// target module has not yet fully started, wait until start is complete
select {
case <-startCompleteSignal:
case <-shutdownSignal:
return
}
}
err := hook.hookingModule.RunWorker(
fmt.Sprintf("event hook %s/%s -> %s/%s", m.Name, event, hook.hookingModule.Name, hook.description),
func(ctx context.Context) error {
return hook.hookFn(ctx, data)
},
)
if err != nil {
log.Warningf("%s: failed to execute event hook %s/%s -> %s/%s: %s", hook.hookingModule.Name, m.Name, event, hook.hookingModule.Name, hook.description, err)
}
}
// RegisterEvent registers a new event to allow for registering hooks.
func (m *Module) RegisterEvent(event string) {
m.eventHooksLock.Lock()
defer m.eventHooksLock.Unlock()
_, ok := m.eventHooks[event]
if !ok {
m.eventHooks[event] = make([]*eventHook, 0, 1)
}
}
// RegisterEventHook registers a hook function with (another) modules' event. Whenever a hook is triggered and the receiving module has not yet fully started, hook execution will be delayed until all modules completed starting.
func (m *Module) RegisterEventHook(module string, event string, description string, fn func(context.Context, interface{}) error) error {
// get target module
var eventModule *Module
if module == m.Name {
eventModule = m
} else {
var ok bool
modulesLock.RLock()
eventModule, ok = modules[module]
modulesLock.RUnlock()
if !ok {
return fmt.Errorf(`module "%s" does not exist`, module)
}
}
// get target event
eventModule.eventHooksLock.Lock()
defer eventModule.eventHooksLock.Unlock()
hooks, ok := eventModule.eventHooks[event]
if !ok {
return fmt.Errorf(`event "%s/%s" does not exist`, eventModule.Name, event)
}
// add hook
eventModule.eventHooks[event] = append(hooks, &eventHook{
description: description,
hookingModule: m,
hookFn: fn,
})
return nil
}

16
modules/exit.go Normal file
View file

@ -0,0 +1,16 @@
package modules
var (
exitStatusCode int
)
// SetExitStatusCode sets the exit code that the program shell return to the host after shutdown.
func SetExitStatusCode(n int) {
exitStatusCode = n
}
// GetExitStatusCode waits for the shutdown to complete and then returns the exit code
func GetExitStatusCode() int {
<-shutdownCompleteSignal
return exitStatusCode
}

View file

@ -45,19 +45,49 @@ func SetMaxConcurrentMicroTasks(n int) {
}
}
// StartMicroTask starts a new MicroTask with high priority. It will start immediately. The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
func (m *Module) StartMicroTask(name *string, fn func(context.Context) error) error {
// StartMicroTask starts a new MicroTask with high priority. It will start immediately. The call starts a new goroutine and returns immediately. The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) StartMicroTask(name *string, fn func(context.Context) error) {
go func() {
err := m.RunMicroTask(name, fn)
if err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err)
}
}()
}
// StartMediumPriorityMicroTask starts a new MicroTask with medium priority. The call starts a new goroutine and returns immediately. It will wait until a slot becomes available (max 3 seconds). The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Context) error) {
go func() {
err := m.RunMediumPriorityMicroTask(name, fn)
if err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err)
}
}()
}
// StartLowPriorityMicroTask starts a new MicroTask with low priority. The call starts a new goroutine and returns immediately. It will wait until a slot becomes available (max 15 seconds). The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) StartLowPriorityMicroTask(name *string, fn func(context.Context) error) {
go func() {
err := m.RunLowPriorityMicroTask(name, fn)
if err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err)
}
}()
}
// RunMicroTask runs a new MicroTask with high priority. It will start immediately. The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) RunMicroTask(name *string, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
return errNoModule
}
atomic.AddInt32(microTasks, 1)
atomic.AddInt32(microTasks, 1) // increase global counter here, as high priority tasks are not started by the scheduler, where this counter is usually increased
return m.runMicroTask(name, fn)
}
// StartMediumPriorityMicroTask starts a new MicroTask with medium priority. It will wait until given a go (max 3 seconds). The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Context) error) error {
// RunMediumPriorityMicroTask runs a new MicroTask with medium priority. It will wait until a slot becomes available (max 3 seconds). The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) RunMediumPriorityMicroTask(name *string, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
return errNoModule
@ -76,8 +106,8 @@ func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Cont
return m.runMicroTask(name, fn)
}
// StartLowPriorityMicroTask starts a new MicroTask with low priority. It will wait until given a go (max 15 seconds). The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied.
func (m *Module) StartLowPriorityMicroTask(name *string, fn func(context.Context) error) error {
// RunLowPriorityMicroTask runs a new MicroTask with low priority. It will wait until a slot becomes available (max 15 seconds). The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) RunLowPriorityMicroTask(name *string, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
return errNoModule
@ -109,7 +139,7 @@ func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err
if panicVal != nil {
me := m.NewPanicError(*name, "microtask", panicVal)
me.Report()
log.Errorf("%s: microtask %s panicked: %s\n%s", m.Name, *name, panicVal, me.StackTrace)
log.Errorf("%s: microtask %s panicked: %s", m.Name, *name, panicVal)
err = me
}
@ -165,7 +195,10 @@ microTaskManageLoop:
atomic.AddInt32(microTasks, 1)
} else {
// wait for signal that a task was completed
<-microTaskFinished
select {
case <-microTaskFinished:
case <-time.After(1 * time.Second):
}
}
}

View file

@ -42,7 +42,7 @@ func TestMicroTaskWaiting(t *testing.T) {
go func() {
defer mtwWaitGroup.Done()
// exec at slot 1
_ = mtModule.StartMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error {
mtwOutputChannel <- "1" // slot 1
time.Sleep(mtwSleepDuration * 5)
mtwOutputChannel <- "2" // slot 5
@ -53,7 +53,7 @@ func TestMicroTaskWaiting(t *testing.T) {
time.Sleep(mtwSleepDuration * 1)
// clear clearances
_ = mtModule.StartMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error {
return nil
})
@ -61,7 +61,7 @@ func TestMicroTaskWaiting(t *testing.T) {
go func() {
defer mtwWaitGroup.Done()
// exec at slot 2
_ = mtModule.StartLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
mtwOutputChannel <- "7" // slot 16
return nil
})
@ -74,7 +74,7 @@ func TestMicroTaskWaiting(t *testing.T) {
defer mtwWaitGroup.Done()
time.Sleep(mtwSleepDuration * 8)
// exec at slot 10
_ = mtModule.StartMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error {
mtwOutputChannel <- "4" // slot 10
time.Sleep(mtwSleepDuration * 5)
mtwOutputChannel <- "6" // slot 15
@ -86,7 +86,7 @@ func TestMicroTaskWaiting(t *testing.T) {
go func() {
defer mtwWaitGroup.Done()
// exec at slot 3
_ = mtModule.StartMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
mtwOutputChannel <- "3" // slot 6
time.Sleep(mtwSleepDuration * 7)
mtwOutputChannel <- "5" // slot 13
@ -122,7 +122,7 @@ var mtoWaitCh chan struct{}
func mediumPrioTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
_ = mtModule.StartMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
mtoOutputChannel <- "1"
time.Sleep(2 * time.Millisecond)
return nil
@ -132,7 +132,7 @@ func mediumPrioTaskTester() {
func lowPrioTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
_ = mtModule.StartLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
mtoOutputChannel <- "2"
time.Sleep(2 * time.Millisecond)
return nil

View file

@ -13,7 +13,7 @@ import (
)
var (
modulesLock sync.Mutex
modulesLock sync.RWMutex
modules = make(map[string]*Module)
// ErrCleanExit is returned by Start() when the program is interrupted before starting. This can happen for example, when using the "--help" flag.
@ -46,6 +46,10 @@ type Module struct {
microTaskCnt *int32
waitGroup sync.WaitGroup
// events
eventHooks map[string][]*eventHook
eventHooksLock sync.RWMutex
// dependency mgmt
depNames []string
depModules []*Module
@ -69,9 +73,9 @@ func (m *Module) shutdown() error {
// start shutdown function
m.waitGroup.Add(1)
stopFnError := make(chan error)
stopFnError := make(chan error, 1)
go func() {
stopFnError <- m.runModuleCtrlFn("stop module", m.stop)
stopFnError <- m.runCtrlFn("stop module", m.stop)
m.waitGroup.Done()
}()
@ -84,16 +88,8 @@ func (m *Module) shutdown() error {
// wait for results
select {
case err := <-stopFnError:
return err
case <-done:
select {
case err := <-stopFnError:
return err
default:
return nil
}
case <-time.After(3 * time.Second):
case <-time.After(30 * time.Second):
log.Warningf(
"%s: timed out while waiting for workers/tasks to finish: workers=%d tasks=%d microtasks=%d, continuing shutdown...",
m.Name,
@ -102,11 +98,18 @@ func (m *Module) shutdown() error {
atomic.LoadInt32(m.microTaskCnt),
)
}
return nil
}
func dummyAction() error {
return nil
// collect error
select {
case err := <-stopFnError:
return err
default:
log.Warningf(
"%s: timed out while waiting for stop function to finish, continuing shutdown...",
m.Name,
)
return nil
}
}
// Register registers a new module. The control functions `prep`, `start` and `stop` are technically optional. `stop` is called _after_ all added module workers finished.
@ -115,7 +118,14 @@ func Register(name string, prep, start, stop func() error, dependencies ...strin
modulesLock.Lock()
defer modulesLock.Unlock()
// check for already existing module
_, ok := modules[name]
if ok {
panic(fmt.Sprintf("modules: module %s is already registered", name))
}
// add new module
modules[name] = newModule
return newModule
}
@ -141,20 +151,10 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
prep: prep,
start: start,
stop: stop,
eventHooks: make(map[string][]*eventHook),
depNames: dependencies,
}
// replace nil arguments with dummy action
if newModule.prep == nil {
newModule.prep = dummyAction
}
if newModule.start == nil {
newModule.start = dummyAction
}
if newModule.stop == nil {
newModule.stop = dummyAction
}
return newModule
}

View file

@ -4,6 +4,7 @@ import (
"fmt"
"os"
"runtime"
"time"
"github.com/safing/portbase/log"
"github.com/tevino/abool"
@ -26,8 +27,8 @@ func WaitForStartCompletion() <-chan struct{} {
// Start starts all modules in the correct order. In case of an error, it will automatically shutdown again.
func Start() error {
modulesLock.Lock()
defer modulesLock.Unlock()
modulesLock.RLock()
defer modulesLock.RUnlock()
// start microtask scheduler
go microTaskScheduler()
@ -106,7 +107,11 @@ func prepareModules() error {
go func() {
reports <- &report{
module: execM,
err: execM.runModuleCtrlFn("prep module", execM.prep),
err: execM.runCtrlFnWithTimeout(
"prep module",
10*time.Second,
execM.prep,
),
}
}()
}
@ -154,7 +159,11 @@ func startModules() error {
go func() {
reports <- &report{
module: execM,
err: execM.runModuleCtrlFn("start module", execM.start),
err: execM.runCtrlFnWithTimeout(
"start module",
60*time.Second,
execM.start,
),
}
}()
}

View file

@ -12,6 +12,8 @@ import (
var (
shutdownSignal = make(chan struct{})
shutdownSignalClosed = abool.NewBool(false)
shutdownCompleteSignal = make(chan struct{})
)
// ShuttingDown returns a channel read on the global shutdown signal.
@ -45,6 +47,7 @@ func Shutdown() error {
}
log.Shutdown()
close(shutdownCompleteSignal)
return err
}

View file

@ -3,6 +3,7 @@ package modules
import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"
@ -15,9 +16,21 @@ const (
)
var (
errNoModule = errors.New("missing module (is nil!)")
// ErrRestartNow may be returned (wrapped) by service workers to request an immediate restart.
ErrRestartNow = errors.New("requested restart")
errNoModule = errors.New("missing module (is nil!)")
)
// StartWorker directly starts a generic worker that does not fit to be a Task or MicroTask, such as long running (and possibly mostly idle) sessions. A call to StartWorker starts a new goroutine and returns immediately.
func (m *Module) StartWorker(name string, fn func(context.Context) error) {
go func() {
err := m.RunWorker(name, fn)
if err != nil {
log.Warningf("%s: worker %s failed: %s", m.Name, name, err)
}
}()
}
// RunWorker directly runs a generic worker that does not fit to be a Task or MicroTask, such as long running (and possibly mostly idle) sessions. A call to RunWorker blocks until the worker is finished.
func (m *Module) RunWorker(name string, fn func(context.Context) error) error {
if m == nil {
@ -66,18 +79,22 @@ func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn
err := m.runWorker(name, fn)
if err != nil {
// reset fail counter if running without error for some time
if time.Now().Add(-5 * time.Minute).After(lastFail) {
failCnt = 0
if !errors.Is(err, ErrRestartNow) {
// reset fail counter if running without error for some time
if time.Now().Add(-5 * time.Minute).After(lastFail) {
failCnt = 0
}
// increase fail counter and set last failed time
failCnt++
lastFail = time.Now()
// log error
sleepFor := time.Duration(failCnt) * backoffDuration
log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor)
time.Sleep(sleepFor)
// loop to restart
} else {
log.Infof("%s: service-worker %s %s - restarting now", m.Name, name, err)
}
// increase fail counter and set last failed time
failCnt++
lastFail = time.Now()
// log error
sleepFor := time.Duration(failCnt) * backoffDuration
log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor)
time.Sleep(sleepFor)
// loop to restart
} else {
// finish
return
@ -101,7 +118,27 @@ func (m *Module) runWorker(name string, fn func(context.Context) error) (err err
return
}
func (m *Module) runModuleCtrlFn(name string, fn func() error) (err error) {
func (m *Module) runCtrlFnWithTimeout(name string, timeout time.Duration, fn func() error) error {
stopFnError := make(chan error)
go func() {
stopFnError <- m.runCtrlFn(name, fn)
}()
// wait for results
select {
case err := <-stopFnError:
return err
case <-time.After(timeout):
return fmt.Errorf("timed out (%s)", timeout)
}
}
func (m *Module) runCtrlFn(name string, fn func() error) (err error) {
if fn == nil {
return
}
defer func() {
// recover from panic
panicVal := recover()

View file

@ -38,6 +38,7 @@ func noise() {
}
//nolint:gocognit
func main() {
// generates 1MB and writes to stdout

126
run/main.go Normal file
View file

@ -0,0 +1,126 @@
package run
import (
"bufio"
"flag"
"fmt"
"os"
"os/signal"
"runtime/pprof"
"syscall"
"time"
"github.com/safing/portbase/log"
"github.com/safing/portbase/modules"
)
var (
printStackOnExit bool
enableInputSignals bool
sigUSR1 = syscall.Signal(0xa) // dummy for windows
)
func init() {
flag.BoolVar(&printStackOnExit, "print-stack-on-exit", false, "prints the stack before of shutting down")
flag.BoolVar(&enableInputSignals, "input-signals", false, "emulate signals using stdin")
}
// Run execute a full program lifecycle (including signal handling) based on modules. Just empty-import required packages and do os.Exit(run.Run()).
func Run() int {
// Start
err := modules.Start()
if err != nil {
if err == modules.ErrCleanExit {
return 0
}
_ = modules.Shutdown()
return modules.GetExitStatusCode()
}
// Shutdown
// catch interrupt for clean shutdown
signalCh := make(chan os.Signal)
if enableInputSignals {
go inputSignals(signalCh)
}
signal.Notify(
signalCh,
os.Interrupt,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT,
sigUSR1,
)
signalLoop:
for {
select {
case sig := <-signalCh:
// only print and continue to wait if SIGUSR1
if sig == sigUSR1 {
_ = pprof.Lookup("goroutine").WriteTo(os.Stderr, 2)
continue signalLoop
}
fmt.Println(" <INTERRUPT>")
log.Warning("main: program was interrupted, shutting down.")
// catch signals during shutdown
go func() {
for {
<-signalCh
fmt.Println(" <INTERRUPT> again, but already shutting down")
}
}()
if printStackOnExit {
fmt.Println("=== PRINTING TRACES ===")
fmt.Println("=== GOROUTINES ===")
_ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 2)
fmt.Println("=== BLOCKING ===")
_ = pprof.Lookup("block").WriteTo(os.Stdout, 2)
fmt.Println("=== MUTEXES ===")
_ = pprof.Lookup("mutex").WriteTo(os.Stdout, 2)
fmt.Println("=== END TRACES ===")
}
go func() {
time.Sleep(60 * time.Second)
fmt.Fprintln(os.Stderr, "===== TAKING TOO LONG FOR SHUTDOWN - PRINTING STACK TRACES =====")
_ = pprof.Lookup("goroutine").WriteTo(os.Stderr, 2)
os.Exit(1)
}()
_ = modules.Shutdown()
break signalLoop
case <-modules.ShuttingDown():
break signalLoop
}
}
// wait for shutdown to complete, then exit
return modules.GetExitStatusCode()
}
func inputSignals(signalCh chan os.Signal) {
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
switch scanner.Text() {
case "SIGHUP":
signalCh <- syscall.SIGHUP
case "SIGINT":
signalCh <- syscall.SIGINT
case "SIGQUIT":
signalCh <- syscall.SIGQUIT
case "SIGTERM":
signalCh <- syscall.SIGTERM
case "SIGUSR1":
signalCh <- sigUSR1
}
}
}

View file

@ -37,6 +37,5 @@ func (file *File) markActiveWithLocking() {
// update last used version
if file.resource.ActiveVersion != file.version {
file.resource.ActiveVersion = file.version
file.resource.registry.notifyOfChanges()
}
}

View file

@ -31,26 +31,3 @@ func (file *File) UpgradeAvailable() bool {
func (file *File) WaitForAvailableUpgrade() <-chan struct{} {
return file.notifier.notifyChannel
}
// registry wide change notifications
func (reg *ResourceRegistry) notifyOfChanges() {
if !reg.notifyHooksEnabled.IsSet() {
return
}
reg.RLock()
defer reg.RUnlock()
for _, hook := range reg.notifyHooks {
go hook()
}
}
// RegisterNotifyHook registers a function that is called (as a goroutine) every time the resource registry changes.
func (reg *ResourceRegistry) RegisterNotifyHook(fn func()) {
reg.Lock()
defer reg.Unlock()
reg.notifyHooks = append(reg.notifyHooks, fn)
}

View file

@ -120,13 +120,6 @@ func (reg *ResourceRegistry) AddResources(versions map[string]string, available,
// SelectVersions selects new resource versions depending on the current registry state.
func (reg *ResourceRegistry) SelectVersions() {
// only notify of changes after we are finished
reg.notifyHooksEnabled.UnSet()
defer func() {
reg.notifyHooksEnabled.Set()
reg.notifyOfChanges()
}()
reg.RLock()
defer reg.RUnlock()

View file

@ -166,8 +166,6 @@ func (res *Resource) selectVersion() {
res.notifier.markAsUpgradeable()
res.notifier = nil
}
res.registry.notifyOfChanges()
}()
if len(res.Versions) == 0 {