From a49f6c6fe5fb390db43923e1cb377a6426a408af Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 9 Oct 2019 16:17:37 +0200 Subject: [PATCH 01/14] Update linter settings --- .golangci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.golangci.yml b/.golangci.yml index ce0a12b..3c2d6b3 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -7,3 +7,5 @@ linters: - funlen - whitespace - wsl + - godox + From 2fb83cecb8d708abb08bf06d6a788dd263f17e0f Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 9 Oct 2019 16:18:16 +0200 Subject: [PATCH 02/14] Silence linter --- database/query/parser.go | 2 + database/record/meta-gencode.go | 92 +++++++++++++++---------------- database/storage/badger/badger.go | 1 + formats/dsd/dsd_test.go | 2 +- formats/dsd/gencode_test.go | 2 +- formats/varint/varint.go | 6 +- formats/varint/varint_test.go | 1 + rng/test/main.go | 1 + 8 files changed, 57 insertions(+), 50 deletions(-) diff --git a/database/query/parser.go b/database/query/parser.go index 42c07f7..54615e8 100644 --- a/database/query/parser.go +++ b/database/query/parser.go @@ -14,6 +14,7 @@ type snippet struct { } // ParseQuery parses a plaintext query. Special characters (that must be escaped with a '\') are: `\()` and any whitespaces. +//nolint:gocognit func ParseQuery(query string) (*Query, error) { snippets, err := extractSnippets(query) if err != nil { @@ -195,6 +196,7 @@ func extractSnippets(text string) (snippets []*snippet, err error) { } +//nolint:gocognit func parseAndOr(getSnippet func() (*snippet, error), remainingSnippets func() int, rootCondition bool) (Condition, error) { var isOr = false var typeSet = false diff --git a/database/record/meta-gencode.go b/database/record/meta-gencode.go index c0a2142..6494a1d 100644 --- a/database/record/meta-gencode.go +++ b/database/record/meta-gencode.go @@ -14,14 +14,14 @@ var ( ) // GenCodeSize returns the size of the gencode marshalled byte slice -func (d *Meta) GenCodeSize() (s int) { +func (m *Meta) GenCodeSize() (s int) { s += 34 return } // GenCodeMarshal gencode marshalls Meta into the given byte array, or a new one if its too small. -func (d *Meta) GenCodeMarshal(buf []byte) ([]byte, error) { - size := d.GenCodeSize() +func (m *Meta) GenCodeMarshal(buf []byte) ([]byte, error) { + size := m.GenCodeSize() { if cap(buf) >= size { buf = buf[:size] @@ -33,89 +33,89 @@ func (d *Meta) GenCodeMarshal(buf []byte) ([]byte, error) { { - buf[0+0] = byte(d.Created >> 0) + buf[0+0] = byte(m.Created >> 0) - buf[1+0] = byte(d.Created >> 8) + buf[1+0] = byte(m.Created >> 8) - buf[2+0] = byte(d.Created >> 16) + buf[2+0] = byte(m.Created >> 16) - buf[3+0] = byte(d.Created >> 24) + buf[3+0] = byte(m.Created >> 24) - buf[4+0] = byte(d.Created >> 32) + buf[4+0] = byte(m.Created >> 32) - buf[5+0] = byte(d.Created >> 40) + buf[5+0] = byte(m.Created >> 40) - buf[6+0] = byte(d.Created >> 48) + buf[6+0] = byte(m.Created >> 48) - buf[7+0] = byte(d.Created >> 56) + buf[7+0] = byte(m.Created >> 56) } { - buf[0+8] = byte(d.Modified >> 0) + buf[0+8] = byte(m.Modified >> 0) - buf[1+8] = byte(d.Modified >> 8) + buf[1+8] = byte(m.Modified >> 8) - buf[2+8] = byte(d.Modified >> 16) + buf[2+8] = byte(m.Modified >> 16) - buf[3+8] = byte(d.Modified >> 24) + buf[3+8] = byte(m.Modified >> 24) - buf[4+8] = byte(d.Modified >> 32) + buf[4+8] = byte(m.Modified >> 32) - buf[5+8] = byte(d.Modified >> 40) + buf[5+8] = byte(m.Modified >> 40) - buf[6+8] = byte(d.Modified >> 48) + buf[6+8] = byte(m.Modified >> 48) - buf[7+8] = byte(d.Modified >> 56) + buf[7+8] = byte(m.Modified >> 56) } { - buf[0+16] = byte(d.Expires >> 0) + buf[0+16] = byte(m.Expires >> 0) - buf[1+16] = byte(d.Expires >> 8) + buf[1+16] = byte(m.Expires >> 8) - buf[2+16] = byte(d.Expires >> 16) + buf[2+16] = byte(m.Expires >> 16) - buf[3+16] = byte(d.Expires >> 24) + buf[3+16] = byte(m.Expires >> 24) - buf[4+16] = byte(d.Expires >> 32) + buf[4+16] = byte(m.Expires >> 32) - buf[5+16] = byte(d.Expires >> 40) + buf[5+16] = byte(m.Expires >> 40) - buf[6+16] = byte(d.Expires >> 48) + buf[6+16] = byte(m.Expires >> 48) - buf[7+16] = byte(d.Expires >> 56) + buf[7+16] = byte(m.Expires >> 56) } { - buf[0+24] = byte(d.Deleted >> 0) + buf[0+24] = byte(m.Deleted >> 0) - buf[1+24] = byte(d.Deleted >> 8) + buf[1+24] = byte(m.Deleted >> 8) - buf[2+24] = byte(d.Deleted >> 16) + buf[2+24] = byte(m.Deleted >> 16) - buf[3+24] = byte(d.Deleted >> 24) + buf[3+24] = byte(m.Deleted >> 24) - buf[4+24] = byte(d.Deleted >> 32) + buf[4+24] = byte(m.Deleted >> 32) - buf[5+24] = byte(d.Deleted >> 40) + buf[5+24] = byte(m.Deleted >> 40) - buf[6+24] = byte(d.Deleted >> 48) + buf[6+24] = byte(m.Deleted >> 48) - buf[7+24] = byte(d.Deleted >> 56) + buf[7+24] = byte(m.Deleted >> 56) } { - if d.secret { + if m.secret { buf[32] = 1 } else { buf[32] = 0 } } { - if d.cronjewel { + if m.cronjewel { buf[33] = 1 } else { buf[33] = 0 @@ -125,38 +125,38 @@ func (d *Meta) GenCodeMarshal(buf []byte) ([]byte, error) { } // GenCodeUnmarshal gencode unmarshalls Meta and returns the bytes read. -func (d *Meta) GenCodeUnmarshal(buf []byte) (uint64, error) { - if len(buf) < d.GenCodeSize() { - return 0, fmt.Errorf("insufficient data: got %d out of %d bytes", len(buf), d.GenCodeSize()) +func (m *Meta) GenCodeUnmarshal(buf []byte) (uint64, error) { + if len(buf) < m.GenCodeSize() { + return 0, fmt.Errorf("insufficient data: got %d out of %d bytes", len(buf), m.GenCodeSize()) } i := uint64(0) { - d.Created = 0 | (int64(buf[0+0]) << 0) | (int64(buf[1+0]) << 8) | (int64(buf[2+0]) << 16) | (int64(buf[3+0]) << 24) | (int64(buf[4+0]) << 32) | (int64(buf[5+0]) << 40) | (int64(buf[6+0]) << 48) | (int64(buf[7+0]) << 56) + m.Created = 0 | (int64(buf[0+0]) << 0) | (int64(buf[1+0]) << 8) | (int64(buf[2+0]) << 16) | (int64(buf[3+0]) << 24) | (int64(buf[4+0]) << 32) | (int64(buf[5+0]) << 40) | (int64(buf[6+0]) << 48) | (int64(buf[7+0]) << 56) } { - d.Modified = 0 | (int64(buf[0+8]) << 0) | (int64(buf[1+8]) << 8) | (int64(buf[2+8]) << 16) | (int64(buf[3+8]) << 24) | (int64(buf[4+8]) << 32) | (int64(buf[5+8]) << 40) | (int64(buf[6+8]) << 48) | (int64(buf[7+8]) << 56) + m.Modified = 0 | (int64(buf[0+8]) << 0) | (int64(buf[1+8]) << 8) | (int64(buf[2+8]) << 16) | (int64(buf[3+8]) << 24) | (int64(buf[4+8]) << 32) | (int64(buf[5+8]) << 40) | (int64(buf[6+8]) << 48) | (int64(buf[7+8]) << 56) } { - d.Expires = 0 | (int64(buf[0+16]) << 0) | (int64(buf[1+16]) << 8) | (int64(buf[2+16]) << 16) | (int64(buf[3+16]) << 24) | (int64(buf[4+16]) << 32) | (int64(buf[5+16]) << 40) | (int64(buf[6+16]) << 48) | (int64(buf[7+16]) << 56) + m.Expires = 0 | (int64(buf[0+16]) << 0) | (int64(buf[1+16]) << 8) | (int64(buf[2+16]) << 16) | (int64(buf[3+16]) << 24) | (int64(buf[4+16]) << 32) | (int64(buf[5+16]) << 40) | (int64(buf[6+16]) << 48) | (int64(buf[7+16]) << 56) } { - d.Deleted = 0 | (int64(buf[0+24]) << 0) | (int64(buf[1+24]) << 8) | (int64(buf[2+24]) << 16) | (int64(buf[3+24]) << 24) | (int64(buf[4+24]) << 32) | (int64(buf[5+24]) << 40) | (int64(buf[6+24]) << 48) | (int64(buf[7+24]) << 56) + m.Deleted = 0 | (int64(buf[0+24]) << 0) | (int64(buf[1+24]) << 8) | (int64(buf[2+24]) << 16) | (int64(buf[3+24]) << 24) | (int64(buf[4+24]) << 32) | (int64(buf[5+24]) << 40) | (int64(buf[6+24]) << 48) | (int64(buf[7+24]) << 56) } { - d.secret = buf[32] == 1 + m.secret = buf[32] == 1 } { - d.cronjewel = buf[33] == 1 + m.cronjewel = buf[33] == 1 } return i + 34, nil } diff --git a/database/storage/badger/badger.go b/database/storage/badger/badger.go index da2276d..8f473cd 100644 --- a/database/storage/badger/badger.go +++ b/database/storage/badger/badger.go @@ -118,6 +118,7 @@ func (b *Badger) Query(q *query.Query, local, internal bool) (*iterator.Iterator return queryIter, nil } +//nolint:gocognit func (b *Badger) queryExecutor(queryIter *iterator.Iterator, q *query.Query, local, internal bool) { err := b.db.View(func(txn *badger.Txn) error { it := txn.NewIterator(badger.DefaultIteratorOptions) diff --git a/formats/dsd/dsd_test.go b/formats/dsd/dsd_test.go index e46df20..db81e76 100644 --- a/formats/dsd/dsd_test.go +++ b/formats/dsd/dsd_test.go @@ -1,4 +1,4 @@ -//nolint:maligned,unparam,gocyclo +//nolint:maligned,unparam,gocyclo,gocognit package dsd import ( diff --git a/formats/dsd/gencode_test.go b/formats/dsd/gencode_test.go index 9d24946..cb35f09 100644 --- a/formats/dsd/gencode_test.go +++ b/formats/dsd/gencode_test.go @@ -1,4 +1,4 @@ -//nolint:nakedret,unconvert +//nolint:nakedret,unconvert,gocognit package dsd import ( diff --git a/formats/varint/varint.go b/formats/varint/varint.go index d0f6129..478e803 100644 --- a/formats/varint/varint.go +++ b/formats/varint/varint.go @@ -1,7 +1,9 @@ package varint -import "errors" -import "encoding/binary" +import ( + "encoding/binary" + "errors" +) // Pack8 packs a uint8 into a VarInt. func Pack8(n uint8) []byte { diff --git a/formats/varint/varint_test.go b/formats/varint/varint_test.go index 0de1741..5e049ad 100644 --- a/formats/varint/varint_test.go +++ b/formats/varint/varint_test.go @@ -1,3 +1,4 @@ +//nolint:gocognit package varint import ( diff --git a/rng/test/main.go b/rng/test/main.go index 601ac95..a45e222 100644 --- a/rng/test/main.go +++ b/rng/test/main.go @@ -38,6 +38,7 @@ func noise() { } +//nolint:gocognit func main() { // generates 1MB and writes to stdout From 09616a0c2537f902d578db089ec7205acc041140 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 9 Oct 2019 16:19:54 +0200 Subject: [PATCH 03/14] Update info module to start first, improve sanity checking --- info/flags.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/info/flags.go b/info/flags.go index f2700b3..6c677b4 100644 --- a/info/flags.go +++ b/info/flags.go @@ -15,7 +15,7 @@ var ( ) func init() { - modules.Register("info", prep, nil, nil, "base") + modules.Register("info", prep, nil, nil) flag.BoolVar(&showVersion, "version", false, "show version and exit") } @@ -35,8 +35,10 @@ func prep() error { // CheckVersion checks if the metadata is ok. func CheckVersion() error { if !strings.HasSuffix(os.Args[0], ".test") { - if name == "[NAME]" || - version == "[version unknown]" || + if name == "[NAME]" { + return errors.New("must call SetInfo() before calling CheckVersion()") + } + if version == "[version unknown]" || commit == "[commit unknown]" || license == "[license unknown]" || buildOptions == "[options unknown]" || From b0204f95ffae2be4c5b145c3d62dcc1f1a2ba84e Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 9 Oct 2019 16:22:51 +0200 Subject: [PATCH 04/14] Improve and fix logging --- log/input.go | 12 ++++++--- log/logging.go | 5 ++-- log/output.go | 70 ++++++++++++++++++++++++++------------------------ log/trace.go | 14 +++++++--- 4 files changed, 57 insertions(+), 44 deletions(-) diff --git a/log/input.go b/log/input.go index e8bff6c..e8b6579 100644 --- a/log/input.go +++ b/log/input.go @@ -75,15 +75,21 @@ func log(level Severity, msg string, tracer *ContextTracer) { select { case logBuffer <- log: default: - forceEmptyingOfBuffer <- struct{}{} - logBuffer <- log + forceEmptyingLoop: + // force empty buffer until we can send to it + for { + select { + case forceEmptyingOfBuffer <- struct{}{}: + case logBuffer <- log: + break forceEmptyingLoop + } + } } // wake up writer if necessary if logsWaitingFlag.SetToIf(false, true) { logsWaiting <- struct{}{} } - } func fastcheck(level Severity) bool { diff --git a/log/logging.go b/log/logging.go index 0ca6fac..f94d12e 100644 --- a/log/logging.go +++ b/log/logging.go @@ -70,7 +70,7 @@ const ( var ( logBuffer chan *logLine - forceEmptyingOfBuffer chan struct{} + forceEmptyingOfBuffer = make(chan struct{}) logLevelInt = uint32(3) logLevel = &logLevelInt @@ -79,7 +79,7 @@ var ( pkgLevels = make(map[string]Severity) pkgLevelsLock sync.Mutex - logsWaiting = make(chan struct{}, 1) + logsWaiting = make(chan struct{}, 4) logsWaitingFlag = abool.NewBool(false) shutdownSignal = make(chan struct{}) @@ -135,7 +135,6 @@ func Start() (err error) { } logBuffer = make(chan *logLine, 1024) - forceEmptyingOfBuffer = make(chan struct{}, 16) initialLogLevel := ParseLevel(logLevelFlag) if initialLogLevel > 0 { diff --git a/log/output.go b/log/output.go index 5b44fb5..cde86c5 100644 --- a/log/output.go +++ b/log/output.go @@ -45,31 +45,32 @@ func startWriter() { } func writer() { - var line *logLine - var lastLine *logLine + var currentLine *logLine + var nextLine *logLine var duplicates uint64 defer shutdownWaitGroup.Done() for { // reset - line = nil - lastLine = nil //nolint:ineffassign // only ineffectual in first loop + currentLine = nil + nextLine = nil duplicates = 0 // wait until logs need to be processed select { - case <-logsWaiting: + case <-logsWaiting: // normal process logsWaitingFlag.UnSet() - case <-shutdownSignal: + case <-forceEmptyingOfBuffer: // log buffer is full! + case <-shutdownSignal: // shutting down finalizeWriting() return } - // wait for timeslot to log, or when buffer is full + // wait for timeslot to log select { - case <-writeTrigger: - case <-forceEmptyingOfBuffer: - case <-shutdownSignal: + case <-writeTrigger: // normal process + case <-forceEmptyingOfBuffer: // log buffer is full! + case <-shutdownSignal: // shutting down finalizeWriting() return } @@ -78,38 +79,39 @@ func writer() { writeLoop: for { select { - case line = <-logBuffer: - - // look-ahead for deduplication (best effort) - dedupLoop: - for { - // check if there is another line waiting - select { - case nextLine := <-logBuffer: - lastLine = line - line = nextLine - default: - break dedupLoop - } - - // deduplication - if !line.Equal(lastLine) { - // no duplicate - break dedupLoop - } - - // duplicate - duplicates++ + case nextLine = <-logBuffer: + // first line we process, just assign to currentLine + if currentLine == nil { + currentLine = nextLine + continue writeLoop } - // write actual line - writeLine(line, duplicates) + // we now have currentLine and nextLine + + // if currentLine and nextLine are equal, do not print, just increase counter and continue + if nextLine.Equal(currentLine) { + duplicates++ + continue writeLoop + } + + // if currentLine and line are _not_ equal, output currentLine + writeLine(currentLine, duplicates) + // reset duplicate counter duplicates = 0 + // set new currentLine + currentLine = nextLine default: break writeLoop } } + // write final line + writeLine(currentLine, duplicates) + // reset state + currentLine = nil //nolint:ineffassign + nextLine = nil + duplicates = 0 //nolint:ineffassign + // back down a little select { case <-time.After(10 * time.Millisecond): diff --git a/log/trace.go b/log/trace.go index 6f77c37..a345f5d 100644 --- a/log/trace.go +++ b/log/trace.go @@ -83,7 +83,7 @@ func Tracer(ctx context.Context) *ContextTracer { // Submit collected logs on the context for further processing/outputting. Does nothing if called on a nil ContextTracer. func (tracer *ContextTracer) Submit() { - if tracer != nil { + if tracer == nil { return } @@ -119,15 +119,21 @@ func (tracer *ContextTracer) Submit() { select { case logBuffer <- log: default: - forceEmptyingOfBuffer <- struct{}{} - logBuffer <- log + forceEmptyingLoop: + // force empty buffer until we can send to it + for { + select { + case forceEmptyingOfBuffer <- struct{}{}: + case logBuffer <- log: + break forceEmptyingLoop + } + } } // wake up writer if necessary if logsWaitingFlag.SetToIf(false, true) { logsWaiting <- struct{}{} } - } func (tracer *ContextTracer) log(level Severity, msg string) { From 2282c6bb71885f1110d6976348d4bea77d0076f7 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 9 Oct 2019 16:35:33 +0200 Subject: [PATCH 05/14] Improve and fix module startup and shutdown procedures as well as error reporting --- modules/error.go | 51 +++++++++++++++++++++++++++++++++++++++++++--- modules/modules.go | 47 ++++++++++++++++++------------------------ modules/start.go | 17 ++++++++++++---- modules/stop.go | 3 +++ modules/worker.go | 23 ++++++++++++++++++++- 5 files changed, 106 insertions(+), 35 deletions(-) diff --git a/modules/error.go b/modules/error.go index 80d025e..a22cd9c 100644 --- a/modules/error.go +++ b/modules/error.go @@ -2,11 +2,16 @@ package modules import ( "fmt" + "os" "runtime/debug" + "sync" + "time" ) var ( errorReportingChannel chan *ModuleError + reportToStdErr bool + reportingLock sync.RWMutex ) // ModuleError wraps a panic, error or message into an error that can be reported. @@ -64,12 +69,43 @@ func (me *ModuleError) Error() string { // Report reports the error through the configured reporting channel. func (me *ModuleError) Report() { + reportingLock.RLock() + defer reportingLock.RUnlock() + if errorReportingChannel != nil { select { case errorReportingChannel <- me: default: } } + + if reportToStdErr { + // default to writing to stderr + fmt.Fprintf( + os.Stderr, + `===== Error Report ===== +Message: %s +Timestamp: %s +ModuleName: %s +TaskName: %s +TaskType: %s +Severity: %s +PanicValue: %s +StackTrace: + +%s +===== End of Report ===== +`, + me.Message, + time.Now(), + me.ModuleName, + me.TaskName, + me.TaskType, + me.Severity, + me.PanicValue, + me.StackTrace, + ) + } } // IsPanic returns whether the given error is a wrapped panic by the modules package and additionally returns it, if true. @@ -84,7 +120,16 @@ func IsPanic(err error) (bool, *ModuleError) { // SetErrorReportingChannel sets the channel to report module errors through. By default only panics are reported, all other errors need to be manually wrapped into a *ModuleError and reported. func SetErrorReportingChannel(reportingChannel chan *ModuleError) { - if errorReportingChannel == nil { - errorReportingChannel = reportingChannel - } + reportingLock.Lock() + defer reportingLock.Unlock() + + errorReportingChannel = reportingChannel +} + +// SetStdErrReporting controls error reporting to stderr. +func SetStdErrReporting(on bool) { + reportingLock.Lock() + defer reportingLock.Unlock() + + reportToStdErr = on } diff --git a/modules/modules.go b/modules/modules.go index 9594cff..dbe2375 100644 --- a/modules/modules.go +++ b/modules/modules.go @@ -13,7 +13,7 @@ import ( ) var ( - modulesLock sync.Mutex + modulesLock sync.RWMutex modules = make(map[string]*Module) // ErrCleanExit is returned by Start() when the program is interrupted before starting. This can happen for example, when using the "--help" flag. @@ -46,6 +46,10 @@ type Module struct { microTaskCnt *int32 waitGroup sync.WaitGroup + // events + eventHooks map[string][]*eventHook + eventHooksLock sync.RWMutex + // dependency mgmt depNames []string depModules []*Module @@ -69,9 +73,9 @@ func (m *Module) shutdown() error { // start shutdown function m.waitGroup.Add(1) - stopFnError := make(chan error) + stopFnError := make(chan error, 1) go func() { - stopFnError <- m.runModuleCtrlFn("stop module", m.stop) + stopFnError <- m.runCtrlFn("stop module", m.stop) m.waitGroup.Done() }() @@ -84,16 +88,8 @@ func (m *Module) shutdown() error { // wait for results select { - case err := <-stopFnError: - return err case <-done: - select { - case err := <-stopFnError: - return err - default: - return nil - } - case <-time.After(3 * time.Second): + case <-time.After(30 * time.Second): log.Warningf( "%s: timed out while waiting for workers/tasks to finish: workers=%d tasks=%d microtasks=%d, continuing shutdown...", m.Name, @@ -102,11 +98,18 @@ func (m *Module) shutdown() error { atomic.LoadInt32(m.microTaskCnt), ) } - return nil -} -func dummyAction() error { - return nil + // collect error + select { + case err := <-stopFnError: + return err + default: + log.Warningf( + "%s: timed out while waiting for stop function to finish, continuing shutdown...", + m.Name, + ) + return nil + } } // Register registers a new module. The control functions `prep`, `start` and `stop` are technically optional. `stop` is called _after_ all added module workers finished. @@ -141,20 +144,10 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ... prep: prep, start: start, stop: stop, + eventHooks: make(map[string][]*eventHook), depNames: dependencies, } - // replace nil arguments with dummy action - if newModule.prep == nil { - newModule.prep = dummyAction - } - if newModule.start == nil { - newModule.start = dummyAction - } - if newModule.stop == nil { - newModule.stop = dummyAction - } - return newModule } diff --git a/modules/start.go b/modules/start.go index 3c3e544..6c945f3 100644 --- a/modules/start.go +++ b/modules/start.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "runtime" + "time" "github.com/safing/portbase/log" "github.com/tevino/abool" @@ -26,8 +27,8 @@ func WaitForStartCompletion() <-chan struct{} { // Start starts all modules in the correct order. In case of an error, it will automatically shutdown again. func Start() error { - modulesLock.Lock() - defer modulesLock.Unlock() + modulesLock.RLock() + defer modulesLock.RUnlock() // start microtask scheduler go microTaskScheduler() @@ -106,7 +107,11 @@ func prepareModules() error { go func() { reports <- &report{ module: execM, - err: execM.runModuleCtrlFn("prep module", execM.prep), + err: execM.runCtrlFnWithTimeout( + "prep module", + 10*time.Second, + execM.prep, + ), } }() } @@ -154,7 +159,11 @@ func startModules() error { go func() { reports <- &report{ module: execM, - err: execM.runModuleCtrlFn("start module", execM.start), + err: execM.runCtrlFnWithTimeout( + "start module", + 60*time.Second, + execM.start, + ), } }() } diff --git a/modules/stop.go b/modules/stop.go index ad1a79e..17e8b7b 100644 --- a/modules/stop.go +++ b/modules/stop.go @@ -12,6 +12,8 @@ import ( var ( shutdownSignal = make(chan struct{}) shutdownSignalClosed = abool.NewBool(false) + + shutdownCompleteSignal = make(chan struct{}) ) // ShuttingDown returns a channel read on the global shutdown signal. @@ -45,6 +47,7 @@ func Shutdown() error { } log.Shutdown() + close(shutdownCompleteSignal) return err } diff --git a/modules/worker.go b/modules/worker.go index 951e91d..b9d6567 100644 --- a/modules/worker.go +++ b/modules/worker.go @@ -3,6 +3,7 @@ package modules import ( "context" "errors" + "fmt" "sync/atomic" "time" @@ -101,7 +102,27 @@ func (m *Module) runWorker(name string, fn func(context.Context) error) (err err return } -func (m *Module) runModuleCtrlFn(name string, fn func() error) (err error) { +func (m *Module) runCtrlFnWithTimeout(name string, timeout time.Duration, fn func() error) error { + + stopFnError := make(chan error) + go func() { + stopFnError <- m.runCtrlFn(name, fn) + }() + + // wait for results + select { + case err := <-stopFnError: + return err + case <-time.After(timeout): + return fmt.Errorf("timed out (%s)", timeout) + } +} + +func (m *Module) runCtrlFn(name string, fn func() error) (err error) { + if fn == nil { + return + } + defer func() { // recover from panic panicVal := recover() From d6457348af2736fa735dbcae0c47d765c17c1f9f Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 9 Oct 2019 16:35:50 +0200 Subject: [PATCH 06/14] Add exit code mgmt to modules --- modules/exit.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 modules/exit.go diff --git a/modules/exit.go b/modules/exit.go new file mode 100644 index 0000000..67d38bc --- /dev/null +++ b/modules/exit.go @@ -0,0 +1,16 @@ +package modules + +var ( + exitStatusCode int +) + +// SetExitStatusCode sets the exit code that the program shell return to the host after shutdown. +func SetExitStatusCode(n int) { + exitStatusCode = n +} + +// GetExitStatusCode waits for the shutdown to complete and then returns the exit code +func GetExitStatusCode() int { + <-shutdownCompleteSignal + return exitStatusCode +} From 21264f4d1f44d42b4e501b42a773af34dcf5214c Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 9 Oct 2019 16:36:00 +0200 Subject: [PATCH 07/14] Add event hooking to modules --- modules/events.go | 101 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 modules/events.go diff --git a/modules/events.go b/modules/events.go new file mode 100644 index 0000000..b13e46e --- /dev/null +++ b/modules/events.go @@ -0,0 +1,101 @@ +package modules + +import ( + "context" + "fmt" + + "github.com/safing/portbase/log" +) + +type eventHookFn func(context.Context, interface{}) error + +type eventHook struct { + description string + hookingModule *Module + hookFn eventHookFn +} + +// TriggerEvent executes all hook functions registered to the specified event. +func (m *Module) TriggerEvent(event string, data interface{}) { + go m.processEventTrigger(event, data) +} + +func (m *Module) processEventTrigger(event string, data interface{}) { + m.eventHooksLock.RLock() + defer m.eventHooksLock.RUnlock() + + hooks, ok := m.eventHooks[event] + if !ok { + log.Warningf("%s: tried to trigger non-existent event %s", m.Name, event) + return + } + + for _, hook := range hooks { + go m.runEventHook(hook, event, data) + } +} + +func (m *Module) runEventHook(hook *eventHook, event string, data interface{}) { + if !hook.hookingModule.Started.IsSet() { + // target module has not yet fully started, wait until start is complete + select { + case <-startCompleteSignal: + case <-shutdownSignal: + return + } + } + + err := hook.hookingModule.RunWorker( + fmt.Sprintf("event hook %s/%s -> %s/%s", m.Name, event, hook.hookingModule.Name, hook.description), + func(ctx context.Context) error { + return hook.hookFn(ctx, data) + }, + ) + if err != nil { + log.Warningf("%s: failed to execute event hook %s/%s -> %s/%s: %s", hook.hookingModule.Name, m.Name, event, hook.hookingModule.Name, hook.description, err) + } +} + +// RegisterEvent registers a new event to allow for registering hooks. +func (m *Module) RegisterEvent(event string) { + m.eventHooksLock.Lock() + defer m.eventHooksLock.Unlock() + + _, ok := m.eventHooks[event] + if !ok { + m.eventHooks[event] = make([]*eventHook, 0, 1) + } +} + +// RegisterEventHook registers a hook function with (another) modules' event. Whenever a hook is triggered and the receiving module has not yet fully started, hook execution will be delayed until all modules completed starting. +func (m *Module) RegisterEventHook(module string, event string, description string, fn func(context.Context, interface{}) error) error { + // get target module + var eventModule *Module + if module == m.Name { + eventModule = m + } else { + var ok bool + modulesLock.RLock() + eventModule, ok = modules[module] + modulesLock.RUnlock() + if !ok { + return fmt.Errorf("module %s does not exist", module) + } + } + + // get target event + eventModule.eventHooksLock.Lock() + defer eventModule.eventHooksLock.Unlock() + hooks, ok := eventModule.eventHooks[event] + if !ok { + return fmt.Errorf("module %s event %s does not exist", eventModule.Name, event) + } + + // add hook + eventModule.eventHooks[event] = append(hooks, &eventHook{ + description: description, + hookingModule: m, + hookFn: fn, + }) + return nil +} From f4d19ee7574cab7baf14498a992b9e3e93ed5beb Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 9 Oct 2019 16:36:26 +0200 Subject: [PATCH 08/14] Switch config change notifier to event hooking --- config/main.go | 8 +++++++- config/set.go | 16 +--------------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/config/main.go b/config/main.go index 5267634..a37f8f6 100644 --- a/config/main.go +++ b/config/main.go @@ -10,7 +10,12 @@ import ( "github.com/safing/portmaster/core/structure" ) +const ( + configChangeEvent = "config change" +) + var ( + module *modules.Module dataRoot *utils.DirStructure ) @@ -22,7 +27,8 @@ func SetDataRoot(root *utils.DirStructure) { } func init() { - modules.Register("config", prep, start, nil, "base", "database") + module = modules.Register("config", prep, start, nil, "base", "database") + module.RegisterEvent(configChangeEvent) } func prep() error { diff --git a/config/set.go b/config/set.go index 37aece9..8e55714 100644 --- a/config/set.go +++ b/config/set.go @@ -17,9 +17,6 @@ var ( validityFlag = abool.NewBool(true) validityFlagLock sync.RWMutex - - changedSignal = make(chan struct{}) - changedSignalLock sync.Mutex ) func getValidityFlag() *abool.AtomicBool { @@ -28,13 +25,6 @@ func getValidityFlag() *abool.AtomicBool { return validityFlag } -// Changed signals if any config option was changed. -func Changed() <-chan struct{} { - changedSignalLock.Lock() - defer changedSignalLock.Unlock() - return changedSignal -} - func signalChanges() { // refetch and save release level and expertise level updateReleaseLevel() @@ -46,11 +36,7 @@ func signalChanges() { validityFlag = abool.NewBool(true) validityFlagLock.Unlock() - // trigger change signal: signal listeners that a config option was changed. - changedSignalLock.Lock() - close(changedSignal) - changedSignal = make(chan struct{}) - changedSignalLock.Unlock() + module.TriggerEvent(configChangeEvent, nil) } // setConfig sets the (prioritized) user defined config. From d4439dcc44ab0d298bfe370f079a6d4925ada0e9 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 9 Oct 2019 16:36:47 +0200 Subject: [PATCH 09/14] Add convenience wrapper for running a portbase-based service --- run/main.go | 126 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 run/main.go diff --git a/run/main.go b/run/main.go new file mode 100644 index 0000000..c5d3917 --- /dev/null +++ b/run/main.go @@ -0,0 +1,126 @@ +package run + +import ( + "bufio" + "flag" + "fmt" + "os" + "os/signal" + "runtime/pprof" + "syscall" + "time" + + "github.com/safing/portbase/log" + "github.com/safing/portbase/modules" +) + +var ( + printStackOnExit bool + enableInputSignals bool + + sigUSR1 = syscall.Signal(0xa) // dummy for windows +) + +func init() { + flag.BoolVar(&printStackOnExit, "print-stack-on-exit", false, "prints the stack before of shutting down") + flag.BoolVar(&enableInputSignals, "input-signals", false, "emulate signals using stdin") +} + +// Run execute a full program lifecycle (including signal handling) based on modules. Just empty-import required packages and do os.Exit(run.Run()). +func Run() int { + + // Start + err := modules.Start() + if err != nil { + if err == modules.ErrCleanExit { + return 0 + } + + _ = modules.Shutdown() + return modules.GetExitStatusCode() + } + + // Shutdown + // catch interrupt for clean shutdown + signalCh := make(chan os.Signal) + if enableInputSignals { + go inputSignals(signalCh) + } + signal.Notify( + signalCh, + os.Interrupt, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT, + sigUSR1, + ) + +signalLoop: + for { + select { + case sig := <-signalCh: + // only print and continue to wait if SIGUSR1 + if sig == sigUSR1 { + _ = pprof.Lookup("goroutine").WriteTo(os.Stderr, 2) + continue signalLoop + } + + fmt.Println(" ") + log.Warning("main: program was interrupted, shutting down.") + + // catch signals during shutdown + go func() { + for { + <-signalCh + fmt.Println(" again, but already shutting down") + } + }() + + if printStackOnExit { + fmt.Println("=== PRINTING TRACES ===") + fmt.Println("=== GOROUTINES ===") + _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 2) + fmt.Println("=== BLOCKING ===") + _ = pprof.Lookup("block").WriteTo(os.Stdout, 2) + fmt.Println("=== MUTEXES ===") + _ = pprof.Lookup("mutex").WriteTo(os.Stdout, 2) + fmt.Println("=== END TRACES ===") + } + + go func() { + time.Sleep(60 * time.Second) + fmt.Fprintln(os.Stderr, "===== TAKING TOO LONG FOR SHUTDOWN - PRINTING STACK TRACES =====") + _ = pprof.Lookup("goroutine").WriteTo(os.Stderr, 2) + os.Exit(1) + }() + + _ = modules.Shutdown() + break signalLoop + + case <-modules.ShuttingDown(): + break signalLoop + } + } + + // wait for shutdown to complete, then exit + return modules.GetExitStatusCode() +} + +func inputSignals(signalCh chan os.Signal) { + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + switch scanner.Text() { + case "SIGHUP": + signalCh <- syscall.SIGHUP + case "SIGINT": + signalCh <- syscall.SIGINT + case "SIGQUIT": + signalCh <- syscall.SIGQUIT + case "SIGTERM": + signalCh <- syscall.SIGTERM + case "SIGUSR1": + signalCh <- sigUSR1 + } + } +} From ed61819be751e69a2da1aed2d5ebb24a71672507 Mon Sep 17 00:00:00 2001 From: Daniel Date: Fri, 25 Oct 2019 13:37:42 +0200 Subject: [PATCH 10/14] Cleanup database package --- database/registry.go | 1 + database/storage/bbolt/bbolt_test.go | 2 +- database/storage/hashmap/map.go | 140 +++++++++++++++++++++++++ database/storage/hashmap/map_test.go | 147 +++++++++++++++++++++++++++ 4 files changed, 289 insertions(+), 1 deletion(-) create mode 100644 database/storage/hashmap/map.go create mode 100644 database/storage/hashmap/map_test.go diff --git a/database/registry.go b/database/registry.go index 23d382c..b6d1984 100644 --- a/database/registry.go +++ b/database/registry.go @@ -139,6 +139,7 @@ func saveRegistry(lock bool) error { } // write file + // FIXME: write atomically (best effort) filePath := path.Join(rootStructure.Path, registryFileName) return ioutil.WriteFile(filePath, data, 0600) } diff --git a/database/storage/bbolt/bbolt_test.go b/database/storage/bbolt/bbolt_test.go index 5264f56..e5cdd3e 100644 --- a/database/storage/bbolt/bbolt_test.go +++ b/database/storage/bbolt/bbolt_test.go @@ -31,7 +31,7 @@ type TestRecord struct { B bool } -func TestBadger(t *testing.T) { +func TestBBolt(t *testing.T) { testDir, err := ioutil.TempDir("", "testing-") if err != nil { t.Fatal(err) diff --git a/database/storage/hashmap/map.go b/database/storage/hashmap/map.go new file mode 100644 index 0000000..c6bb65d --- /dev/null +++ b/database/storage/hashmap/map.go @@ -0,0 +1,140 @@ +package hashmap + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/safing/portbase/database/iterator" + "github.com/safing/portbase/database/query" + "github.com/safing/portbase/database/record" + "github.com/safing/portbase/database/storage" +) + +// HashMap storage. +type HashMap struct { + name string + db map[string]record.Record + dbLock sync.RWMutex +} + +func init() { + _ = storage.Register("hashmap", NewHashMap) +} + +// NewHashMap creates a hashmap database. +func NewHashMap(name, location string) (storage.Interface, error) { + return &HashMap{ + name: name, + db: make(map[string]record.Record), + }, nil +} + +// Get returns a database record. +func (hm *HashMap) Get(key string) (record.Record, error) { + hm.dbLock.RLock() + defer hm.dbLock.RUnlock() + + r, ok := hm.db[key] + if !ok { + return nil, storage.ErrNotFound + } + return r, nil +} + +// Put stores a record in the database. +func (hm *HashMap) Put(r record.Record) error { + hm.dbLock.Lock() + defer hm.dbLock.Unlock() + + hm.db[r.DatabaseKey()] = r + return nil +} + +// Delete deletes a record from the database. +func (hm *HashMap) Delete(key string) error { + hm.dbLock.Lock() + defer hm.dbLock.Unlock() + + delete(hm.db, key) + return nil +} + +// Query returns a an iterator for the supplied query. +func (hm *HashMap) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { + _, err := q.Check() + if err != nil { + return nil, fmt.Errorf("invalid query: %s", err) + } + + queryIter := iterator.New() + + go hm.queryExecutor(queryIter, q, local, internal) + return queryIter, nil +} + +func (hm *HashMap) queryExecutor(queryIter *iterator.Iterator, q *query.Query, local, internal bool) { + hm.dbLock.RLock() + defer hm.dbLock.RUnlock() + + var err error + +mapLoop: + for key, record := range hm.db { + + switch { + case !q.MatchesKey(key): + continue + case !q.MatchesRecord(record): + continue + case !record.Meta().CheckValidity(): + continue + case !record.Meta().CheckPermission(local, internal): + continue + } + + select { + case <-queryIter.Done: + break mapLoop + case queryIter.Next <- record: + default: + select { + case <-queryIter.Done: + break mapLoop + case queryIter.Next <- record: + case <-time.After(1 * time.Second): + err = errors.New("query timeout") + break mapLoop + } + } + + } + + queryIter.Finish(err) +} + +// ReadOnly returns whether the database is read only. +func (hm *HashMap) ReadOnly() bool { + return false +} + +// Injected returns whether the database is injected. +func (hm *HashMap) Injected() bool { + return false +} + +// Maintain runs a light maintenance operation on the database. +func (hm *HashMap) Maintain() error { + return nil +} + +// MaintainThorough runs a thorough maintenance operation on the database. +func (hm *HashMap) MaintainThorough() (err error) { + return nil +} + +// Shutdown shuts down the database. +func (hm *HashMap) Shutdown() error { + return nil +} diff --git a/database/storage/hashmap/map_test.go b/database/storage/hashmap/map_test.go new file mode 100644 index 0000000..911dc5b --- /dev/null +++ b/database/storage/hashmap/map_test.go @@ -0,0 +1,147 @@ +//nolint:unparam,maligned +package hashmap + +import ( + "reflect" + "sync" + "testing" + + "github.com/safing/portbase/database/query" + "github.com/safing/portbase/database/record" +) + +type TestRecord struct { + record.Base + sync.Mutex + S string + I int + I8 int8 + I16 int16 + I32 int32 + I64 int64 + UI uint + UI8 uint8 + UI16 uint16 + UI32 uint32 + UI64 uint64 + F32 float32 + F64 float64 + B bool +} + +func TestHashMap(t *testing.T) { + // start + db, err := NewHashMap("test", "") + if err != nil { + t.Fatal(err) + } + + a := &TestRecord{ + S: "banana", + I: 42, + I8: 42, + I16: 42, + I32: 42, + I64: 42, + UI: 42, + UI8: 42, + UI16: 42, + UI32: 42, + UI64: 42, + F32: 42.42, + F64: 42.42, + B: true, + } + a.SetMeta(&record.Meta{}) + a.Meta().Update() + a.SetKey("test:A") + + // put record + err = db.Put(a) + if err != nil { + t.Fatal(err) + } + + // get and compare + a1, err := db.Get("A") + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(a, a1) { + t.Fatalf("mismatch, got %v", a1) + } + + // setup query test records + qA := &TestRecord{} + qA.SetKey("test:path/to/A") + qA.CreateMeta() + qB := &TestRecord{} + qB.SetKey("test:path/to/B") + qB.CreateMeta() + qC := &TestRecord{} + qC.SetKey("test:path/to/C") + qC.CreateMeta() + qZ := &TestRecord{} + qZ.SetKey("test:z") + qZ.CreateMeta() + // put + err = db.Put(qA) + if err == nil { + err = db.Put(qB) + } + if err == nil { + err = db.Put(qC) + } + if err == nil { + err = db.Put(qZ) + } + if err != nil { + t.Fatal(err) + } + + // test query + q := query.New("test:path/to/").MustBeValid() + it, err := db.Query(q, true, true) + if err != nil { + t.Fatal(err) + } + cnt := 0 + for range it.Next { + cnt++ + } + if it.Err() != nil { + t.Fatal(it.Err()) + } + if cnt != 3 { + t.Fatalf("unexpected query result count: %d", cnt) + } + + // delete + err = db.Delete("A") + if err != nil { + t.Fatal(err) + } + + // check if its gone + _, err = db.Get("A") + if err == nil { + t.Fatal("should fail") + } + + // maintenance + err = db.Maintain() + if err != nil { + t.Fatal(err) + } + err = db.MaintainThorough() + if err != nil { + t.Fatal(err) + } + + // shutdown + err = db.Shutdown() + if err != nil { + t.Fatal(err) + } +} From bd362db87e3e662e658234f35889a05ffdeab2e5 Mon Sep 17 00:00:00 2001 From: Daniel Date: Fri, 25 Oct 2019 13:37:58 +0200 Subject: [PATCH 11/14] Improve logging --- log/logging.go | 6 +++--- log/output.go | 49 ++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/log/logging.go b/log/logging.go index f94d12e..2d605c8 100644 --- a/log/logging.go +++ b/log/logging.go @@ -90,7 +90,7 @@ var ( startedSignal = make(chan struct{}) ) -// SetPkgLevels sets individual log levels for packages. +// SetPkgLevels sets individual log levels for packages. Only effective after Start(). func SetPkgLevels(levels map[string]Severity) { pkgLevelsLock.Lock() pkgLevels = levels @@ -103,7 +103,7 @@ func UnSetPkgLevels() { pkgLevelsActive.UnSet() } -// SetLogLevel sets a new log level. +// SetLogLevel sets a new log level. Only effective after Start(). func SetLogLevel(level Severity) { atomic.StoreUint32(logLevel, uint32(level)) } @@ -138,7 +138,7 @@ func Start() (err error) { initialLogLevel := ParseLevel(logLevelFlag) if initialLogLevel > 0 { - atomic.StoreUint32(logLevel, uint32(initialLogLevel)) + SetLogLevel(initialLogLevel) } else { err = fmt.Errorf("log warning: invalid log level \"%s\", falling back to level info", logLevelFlag) fmt.Fprintf(os.Stderr, "%s\n", err.Error()) diff --git a/log/output.go b/log/output.go index cde86c5..28bde37 100644 --- a/log/output.go +++ b/log/output.go @@ -2,6 +2,8 @@ package log import ( "fmt" + "os" + "runtime/debug" "time" ) @@ -39,16 +41,51 @@ func writeLine(line *logLine, duplicates uint64) { } func startWriter() { - shutdownWaitGroup.Add(1) fmt.Println(fmt.Sprintf("%s%s %s BOF%s", InfoLevel.color(), time.Now().Format(timeFormat), rightArrow, endColor())) - go writer() + + shutdownWaitGroup.Add(1) + go writerManager() } -func writer() { +func writerManager() { + defer shutdownWaitGroup.Done() + + for { + err := writer() + if err != nil { + Errorf("log: writer failed: %s", err) + } else { + return + } + } +} + +func writer() (err error) { + defer func() { + // recover from panic + panicVal := recover() + if panicVal != nil { + err = fmt.Errorf("%s", panicVal) + + // write stack to stderr + fmt.Fprintf( + os.Stderr, + `===== Error Report ===== +Message: %s +StackTrace: + +%s +===== End of Report ===== +`, + err, + string(debug.Stack()), + ) + } + }() + var currentLine *logLine var nextLine *logLine var duplicates uint64 - defer shutdownWaitGroup.Done() for { // reset @@ -106,7 +143,9 @@ func writer() { } // write final line - writeLine(currentLine, duplicates) + if currentLine != nil { + writeLine(currentLine, duplicates) + } // reset state currentLine = nil //nolint:ineffassign nextLine = nil From 00f545c3a401448d27718733364753b4e12baa06 Mon Sep 17 00:00:00 2001 From: Daniel Date: Fri, 25 Oct 2019 13:38:20 +0200 Subject: [PATCH 12/14] Remove change notifications from updater --- updater/file.go | 1 - updater/notifier.go | 23 ----------------------- updater/registry.go | 7 ------- updater/resource.go | 2 -- 4 files changed, 33 deletions(-) diff --git a/updater/file.go b/updater/file.go index 09bd18d..14addd7 100644 --- a/updater/file.go +++ b/updater/file.go @@ -37,6 +37,5 @@ func (file *File) markActiveWithLocking() { // update last used version if file.resource.ActiveVersion != file.version { file.resource.ActiveVersion = file.version - file.resource.registry.notifyOfChanges() } } diff --git a/updater/notifier.go b/updater/notifier.go index bb7810a..66b2832 100644 --- a/updater/notifier.go +++ b/updater/notifier.go @@ -31,26 +31,3 @@ func (file *File) UpgradeAvailable() bool { func (file *File) WaitForAvailableUpgrade() <-chan struct{} { return file.notifier.notifyChannel } - -// registry wide change notifications - -func (reg *ResourceRegistry) notifyOfChanges() { - if !reg.notifyHooksEnabled.IsSet() { - return - } - - reg.RLock() - defer reg.RUnlock() - - for _, hook := range reg.notifyHooks { - go hook() - } -} - -// RegisterNotifyHook registers a function that is called (as a goroutine) every time the resource registry changes. -func (reg *ResourceRegistry) RegisterNotifyHook(fn func()) { - reg.Lock() - defer reg.Unlock() - - reg.notifyHooks = append(reg.notifyHooks, fn) -} diff --git a/updater/registry.go b/updater/registry.go index 08b7350..d593b9f 100644 --- a/updater/registry.go +++ b/updater/registry.go @@ -120,13 +120,6 @@ func (reg *ResourceRegistry) AddResources(versions map[string]string, available, // SelectVersions selects new resource versions depending on the current registry state. func (reg *ResourceRegistry) SelectVersions() { - // only notify of changes after we are finished - reg.notifyHooksEnabled.UnSet() - defer func() { - reg.notifyHooksEnabled.Set() - reg.notifyOfChanges() - }() - reg.RLock() defer reg.RUnlock() diff --git a/updater/resource.go b/updater/resource.go index 5643b10..f131b2d 100644 --- a/updater/resource.go +++ b/updater/resource.go @@ -166,8 +166,6 @@ func (res *Resource) selectVersion() { res.notifier.markAsUpgradeable() res.notifier = nil } - - res.registry.notifyOfChanges() }() if len(res.Versions) == 0 { From 2508d28465640269acbc637802409931c26ff452 Mon Sep 17 00:00:00 2001 From: Daniel Date: Fri, 25 Oct 2019 13:38:37 +0200 Subject: [PATCH 13/14] Improve modules package --- modules/error.go | 2 +- modules/events.go | 10 +++++--- modules/microtasks.go | 51 +++++++++++++++++++++++++++++++------- modules/microtasks_test.go | 14 +++++------ modules/modules.go | 7 ++++++ modules/worker.go | 40 +++++++++++++++++++++--------- 6 files changed, 91 insertions(+), 33 deletions(-) diff --git a/modules/error.go b/modules/error.go index a22cd9c..04bda1c 100644 --- a/modules/error.go +++ b/modules/error.go @@ -10,7 +10,7 @@ import ( var ( errorReportingChannel chan *ModuleError - reportToStdErr bool + reportToStdErr = true reportingLock sync.RWMutex ) diff --git a/modules/events.go b/modules/events.go index b13e46e..0768ebd 100644 --- a/modules/events.go +++ b/modules/events.go @@ -26,12 +26,14 @@ func (m *Module) processEventTrigger(event string, data interface{}) { hooks, ok := m.eventHooks[event] if !ok { - log.Warningf("%s: tried to trigger non-existent event %s", m.Name, event) + log.Warningf(`%s: tried to trigger non-existent event "%s"`, m.Name, event) return } for _, hook := range hooks { - go m.runEventHook(hook, event, data) + if !hook.hookingModule.ShutdownInProgress() { + go m.runEventHook(hook, event, data) + } } } @@ -79,7 +81,7 @@ func (m *Module) RegisterEventHook(module string, event string, description stri eventModule, ok = modules[module] modulesLock.RUnlock() if !ok { - return fmt.Errorf("module %s does not exist", module) + return fmt.Errorf(`module "%s" does not exist`, module) } } @@ -88,7 +90,7 @@ func (m *Module) RegisterEventHook(module string, event string, description stri defer eventModule.eventHooksLock.Unlock() hooks, ok := eventModule.eventHooks[event] if !ok { - return fmt.Errorf("module %s event %s does not exist", eventModule.Name, event) + return fmt.Errorf(`event "%s/%s" does not exist`, eventModule.Name, event) } // add hook diff --git a/modules/microtasks.go b/modules/microtasks.go index 67866fe..a43d427 100644 --- a/modules/microtasks.go +++ b/modules/microtasks.go @@ -45,19 +45,49 @@ func SetMaxConcurrentMicroTasks(n int) { } } -// StartMicroTask starts a new MicroTask with high priority. It will start immediately. The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied. -func (m *Module) StartMicroTask(name *string, fn func(context.Context) error) error { +// StartMicroTask starts a new MicroTask with high priority. It will start immediately. The call starts a new goroutine and returns immediately. The given function will be executed and panics caught. The supplied name must not be changed. +func (m *Module) StartMicroTask(name *string, fn func(context.Context) error) { + go func() { + err := m.RunMicroTask(name, fn) + if err != nil { + log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err) + } + }() +} + +// StartMediumPriorityMicroTask starts a new MicroTask with medium priority. The call starts a new goroutine and returns immediately. It will wait until a slot becomes available (max 3 seconds). The given function will be executed and panics caught. The supplied name must not be changed. +func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Context) error) { + go func() { + err := m.RunMediumPriorityMicroTask(name, fn) + if err != nil { + log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err) + } + }() +} + +// StartLowPriorityMicroTask starts a new MicroTask with low priority. The call starts a new goroutine and returns immediately. It will wait until a slot becomes available (max 15 seconds). The given function will be executed and panics caught. The supplied name must not be changed. +func (m *Module) StartLowPriorityMicroTask(name *string, fn func(context.Context) error) { + go func() { + err := m.RunLowPriorityMicroTask(name, fn) + if err != nil { + log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err) + } + }() +} + +// RunMicroTask runs a new MicroTask with high priority. It will start immediately. The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed. +func (m *Module) RunMicroTask(name *string, fn func(context.Context) error) error { if m == nil { log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name) return errNoModule } - atomic.AddInt32(microTasks, 1) + atomic.AddInt32(microTasks, 1) // increase global counter here, as high priority tasks are not started by the scheduler, where this counter is usually increased return m.runMicroTask(name, fn) } -// StartMediumPriorityMicroTask starts a new MicroTask with medium priority. It will wait until given a go (max 3 seconds). The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied. -func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Context) error) error { +// RunMediumPriorityMicroTask runs a new MicroTask with medium priority. It will wait until a slot becomes available (max 3 seconds). The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed. +func (m *Module) RunMediumPriorityMicroTask(name *string, fn func(context.Context) error) error { if m == nil { log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name) return errNoModule @@ -76,8 +106,8 @@ func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Cont return m.runMicroTask(name, fn) } -// StartLowPriorityMicroTask starts a new MicroTask with low priority. It will wait until given a go (max 15 seconds). The given function will be executed and panics caught. The supplied name should be a constant - the variable should never change as it won't be copied. -func (m *Module) StartLowPriorityMicroTask(name *string, fn func(context.Context) error) error { +// RunLowPriorityMicroTask runs a new MicroTask with low priority. It will wait until a slot becomes available (max 15 seconds). The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed. +func (m *Module) RunLowPriorityMicroTask(name *string, fn func(context.Context) error) error { if m == nil { log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name) return errNoModule @@ -109,7 +139,7 @@ func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err if panicVal != nil { me := m.NewPanicError(*name, "microtask", panicVal) me.Report() - log.Errorf("%s: microtask %s panicked: %s\n%s", m.Name, *name, panicVal, me.StackTrace) + log.Errorf("%s: microtask %s panicked: %s", m.Name, *name, panicVal) err = me } @@ -165,7 +195,10 @@ microTaskManageLoop: atomic.AddInt32(microTasks, 1) } else { // wait for signal that a task was completed - <-microTaskFinished + select { + case <-microTaskFinished: + case <-time.After(1 * time.Second): + } } } diff --git a/modules/microtasks_test.go b/modules/microtasks_test.go index 1e21e03..7cde095 100644 --- a/modules/microtasks_test.go +++ b/modules/microtasks_test.go @@ -42,7 +42,7 @@ func TestMicroTaskWaiting(t *testing.T) { go func() { defer mtwWaitGroup.Done() // exec at slot 1 - _ = mtModule.StartMicroTask(&mtTestName, func(ctx context.Context) error { + _ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error { mtwOutputChannel <- "1" // slot 1 time.Sleep(mtwSleepDuration * 5) mtwOutputChannel <- "2" // slot 5 @@ -53,7 +53,7 @@ func TestMicroTaskWaiting(t *testing.T) { time.Sleep(mtwSleepDuration * 1) // clear clearances - _ = mtModule.StartMicroTask(&mtTestName, func(ctx context.Context) error { + _ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error { return nil }) @@ -61,7 +61,7 @@ func TestMicroTaskWaiting(t *testing.T) { go func() { defer mtwWaitGroup.Done() // exec at slot 2 - _ = mtModule.StartLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error { + _ = mtModule.RunLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error { mtwOutputChannel <- "7" // slot 16 return nil }) @@ -74,7 +74,7 @@ func TestMicroTaskWaiting(t *testing.T) { defer mtwWaitGroup.Done() time.Sleep(mtwSleepDuration * 8) // exec at slot 10 - _ = mtModule.StartMicroTask(&mtTestName, func(ctx context.Context) error { + _ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error { mtwOutputChannel <- "4" // slot 10 time.Sleep(mtwSleepDuration * 5) mtwOutputChannel <- "6" // slot 15 @@ -86,7 +86,7 @@ func TestMicroTaskWaiting(t *testing.T) { go func() { defer mtwWaitGroup.Done() // exec at slot 3 - _ = mtModule.StartMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error { + _ = mtModule.RunMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error { mtwOutputChannel <- "3" // slot 6 time.Sleep(mtwSleepDuration * 7) mtwOutputChannel <- "5" // slot 13 @@ -122,7 +122,7 @@ var mtoWaitCh chan struct{} func mediumPrioTaskTester() { defer mtoWaitGroup.Done() <-mtoWaitCh - _ = mtModule.StartMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error { + _ = mtModule.RunMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error { mtoOutputChannel <- "1" time.Sleep(2 * time.Millisecond) return nil @@ -132,7 +132,7 @@ func mediumPrioTaskTester() { func lowPrioTaskTester() { defer mtoWaitGroup.Done() <-mtoWaitCh - _ = mtModule.StartLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error { + _ = mtModule.RunLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error { mtoOutputChannel <- "2" time.Sleep(2 * time.Millisecond) return nil diff --git a/modules/modules.go b/modules/modules.go index dbe2375..66c3bd4 100644 --- a/modules/modules.go +++ b/modules/modules.go @@ -118,7 +118,14 @@ func Register(name string, prep, start, stop func() error, dependencies ...strin modulesLock.Lock() defer modulesLock.Unlock() + // check for already existing module + _, ok := modules[name] + if ok { + panic(fmt.Sprintf("modules: module %s is already registered", name)) + } + // add new module modules[name] = newModule + return newModule } diff --git a/modules/worker.go b/modules/worker.go index b9d6567..d0481f9 100644 --- a/modules/worker.go +++ b/modules/worker.go @@ -16,9 +16,21 @@ const ( ) var ( - errNoModule = errors.New("missing module (is nil!)") + // ErrRestartNow may be returned (wrapped) by service workers to request an immediate restart. + ErrRestartNow = errors.New("requested restart") + errNoModule = errors.New("missing module (is nil!)") ) +// StartWorker directly starts a generic worker that does not fit to be a Task or MicroTask, such as long running (and possibly mostly idle) sessions. A call to StartWorker starts a new goroutine and returns immediately. +func (m *Module) StartWorker(name string, fn func(context.Context) error) { + go func() { + err := m.RunWorker(name, fn) + if err != nil { + log.Warningf("%s: worker %s failed: %s", m.Name, name, err) + } + }() +} + // RunWorker directly runs a generic worker that does not fit to be a Task or MicroTask, such as long running (and possibly mostly idle) sessions. A call to RunWorker blocks until the worker is finished. func (m *Module) RunWorker(name string, fn func(context.Context) error) error { if m == nil { @@ -67,18 +79,22 @@ func (m *Module) runServiceWorker(name string, backoffDuration time.Duration, fn err := m.runWorker(name, fn) if err != nil { - // reset fail counter if running without error for some time - if time.Now().Add(-5 * time.Minute).After(lastFail) { - failCnt = 0 + if !errors.Is(err, ErrRestartNow) { + // reset fail counter if running without error for some time + if time.Now().Add(-5 * time.Minute).After(lastFail) { + failCnt = 0 + } + // increase fail counter and set last failed time + failCnt++ + lastFail = time.Now() + // log error + sleepFor := time.Duration(failCnt) * backoffDuration + log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor) + time.Sleep(sleepFor) + // loop to restart + } else { + log.Infof("%s: service-worker %s %s - restarting now", m.Name, name, err) } - // increase fail counter and set last failed time - failCnt++ - lastFail = time.Now() - // log error - sleepFor := time.Duration(failCnt) * backoffDuration - log.Errorf("%s: service-worker %s failed (%d): %s - restarting in %s", m.Name, name, failCnt, err, sleepFor) - time.Sleep(sleepFor) - // loop to restart } else { // finish return From 858d37de23c3dd8961b277d44456c4b064af878b Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 30 Oct 2019 11:58:09 +0100 Subject: [PATCH 14/14] Update go version in travis config --- .travis.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.travis.yml b/.travis.yml index 4d85a1b..3571007 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,8 @@ language: go +go: +- 1.x + os: - linux - windows