diff --git a/.golangci.yml b/.golangci.yml index b4c851b..6c348ac 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -31,6 +31,7 @@ linters: - whitespace - wrapcheck - wsl + - nolintlint linters-settings: revive: diff --git a/api/authentication.go b/api/authentication.go index 86fe904..9060e29 100644 --- a/api/authentication.go +++ b/api/authentication.go @@ -433,8 +433,7 @@ func updateAPIKeys(_ context.Context, _ interface{}) error { } if hasExpiredKeys { - name := "api-key-cleanup" - module.StartLowPriorityMicroTask(&name, func(ctx context.Context) error { + module.StartLowPriorityMicroTask("api key cleanup", 0, func(ctx context.Context) error { if err := config.SetConfigOption(CfgAPIKeys, validAPIKeys); err != nil { log.Errorf("api: failed to remove expired API keys: %s", err) } else { diff --git a/container/container.go b/container/container.go index 1f8d6f8..3f3292d 100644 --- a/container/container.go +++ b/container/container.go @@ -127,7 +127,7 @@ func (c *Container) CompileData() []byte { // Get returns the given amount of bytes. Data MAY be copied and IS consumed. func (c *Container) Get(n int) ([]byte, error) { - buf := c.gather(n) + buf := c.Peek(n) if len(buf) < n { return nil, errors.New("container: not enough data to return") } @@ -138,14 +138,14 @@ func (c *Container) Get(n int) ([]byte, error) { // GetAll returns all data. Data MAY be copied and IS consumed. func (c *Container) GetAll() []byte { // TODO: Improve. - buf := c.gather(c.Length()) + buf := c.Peek(c.Length()) c.skip(len(buf)) return buf } // GetAsContainer returns the given amount of bytes in a new container. Data will NOT be copied and IS consumed. func (c *Container) GetAsContainer(n int) (*Container, error) { - newC := c.gatherAsContainer(n) + newC := c.PeekContainer(n) if newC == nil { return nil, errors.New("container: not enough data to return") } @@ -155,7 +155,7 @@ func (c *Container) GetAsContainer(n int) (*Container, error) { // GetMax returns as much as possible, but the given amount of bytes at maximum. Data MAY be copied and IS consumed. func (c *Container) GetMax(n int) []byte { - buf := c.gather(n) + buf := c.Peek(n) c.skip(len(buf)) return buf } @@ -226,42 +226,6 @@ func (c *Container) checkOffset() { } } -// Error Handling - -/* -DEPRECATING... like.... NOW. - -// SetError sets an error. -func (c *Container) SetError(err error) { - c.err = err - c.Replace(append([]byte{0x00}, []byte(err.Error())...)) -} - -// CheckError checks if there is an error in the data. If so, it will parse the error and delete the data. -func (c *Container) CheckError() { - if len(c.compartments[c.offset]) > 0 && c.compartments[c.offset][0] == 0x00 { - c.compartments[c.offset] = c.compartments[c.offset][1:] - c.err = errors.New(string(c.CompileData())) - c.compartments = nil - } -} - -// HasError returns wether or not the container is holding an error. -func (c *Container) HasError() bool { - return c.err != nil -} - -// Error returns the error. -func (c *Container) Error() error { - return c.err -} - -// ErrString returns the error as a string. -func (c *Container) ErrString() string { - return c.err.Error() -} -*/ - // Block Handling // PrependLength prepends the current full length of all bytes in the container. @@ -269,7 +233,8 @@ func (c *Container) PrependLength() { c.Prepend(varint.Pack64(uint64(c.Length()))) } -func (c *Container) gather(n int) []byte { +// Peek returns the given amount of bytes. Data MAY be copied and IS NOT consumed. +func (c *Container) Peek(n int) []byte { // Check requested length. if n <= 0 { return nil @@ -296,7 +261,8 @@ func (c *Container) gather(n int) []byte { return slice[:n] } -func (c *Container) gatherAsContainer(n int) (newC *Container) { +// PeekContainer returns the given amount of bytes in a new container. Data will NOT be copied and IS NOT consumed. +func (c *Container) PeekContainer(n int) (newC *Container) { // Check requested length. if n < 0 { return nil @@ -359,7 +325,7 @@ func (c *Container) GetNextBlockAsContainer() (*Container, error) { // GetNextN8 parses and returns a varint of type uint8. func (c *Container) GetNextN8() (uint8, error) { - buf := c.gather(2) + buf := c.Peek(2) num, n, err := varint.Unpack8(buf) if err != nil { return 0, err @@ -370,7 +336,7 @@ func (c *Container) GetNextN8() (uint8, error) { // GetNextN16 parses and returns a varint of type uint16. func (c *Container) GetNextN16() (uint16, error) { - buf := c.gather(3) + buf := c.Peek(3) num, n, err := varint.Unpack16(buf) if err != nil { return 0, err @@ -381,7 +347,7 @@ func (c *Container) GetNextN16() (uint16, error) { // GetNextN32 parses and returns a varint of type uint32. func (c *Container) GetNextN32() (uint32, error) { - buf := c.gather(5) + buf := c.Peek(5) num, n, err := varint.Unpack32(buf) if err != nil { return 0, err @@ -392,7 +358,7 @@ func (c *Container) GetNextN32() (uint32, error) { // GetNextN64 parses and returns a varint of type uint64. func (c *Container) GetNextN64() (uint64, error) { - buf := c.gather(10) + buf := c.Peek(10) num, n, err := varint.Unpack64(buf) if err != nil { return 0, err diff --git a/container/container_test.go b/container/container_test.go index dc3963a..0c8e636 100644 --- a/container/container_test.go +++ b/container/container_test.go @@ -66,9 +66,9 @@ func TestContainerDataHandling(t *testing.T) { } c8.clean() - c9 := c8.gatherAsContainer(len(testData)) + c9 := c8.PeekContainer(len(testData)) - c10 := c9.gatherAsContainer(len(testData) - 1) + c10 := c9.PeekContainer(len(testData) - 1) c10.Append(testData[len(testData)-1:]) compareMany(t, testData, c1.CompileData(), c2.CompileData(), c3.CompileData(), d4, d5, c6.CompileData(), c7.CompileData(), c8.CompileData(), c9.CompileData(), c10.CompileData()) diff --git a/modules/microtasks.go b/modules/microtasks.go index 92f23d6..5fa8d72 100644 --- a/modules/microtasks.go +++ b/modules/microtasks.go @@ -2,6 +2,7 @@ package modules import ( "context" + "runtime" "sync/atomic" "time" @@ -13,21 +14,17 @@ import ( // TODO: getting some errors when in nanosecond precision for tests: // (1) panic: sync: WaitGroup is reused before previous Wait has returned - should theoretically not happen // (2) sometimes there seems to some kind of race condition stuff, the test hangs and does not complete +// NOTE: These might be resolved by the switch to clearance queues. var ( microTasks *int32 microTasksThreshhold *int32 microTaskFinished = make(chan struct{}, 1) - - mediumPriorityClearance = make(chan struct{}) - lowPriorityClearance = make(chan struct{}) - - triggerLogWriting = log.TriggerWriterChannel() ) const ( - mediumPriorityMaxDelay = 1 * time.Second - lowPriorityMaxDelay = 3 * time.Second + defaultMediumPriorityMaxDelay = 1 * time.Second + defaultLowPriorityMaxDelay = 3 * time.Second ) func init() { @@ -37,97 +34,113 @@ func init() { microTasksThreshhold = µTasksThreshholdVal } -// SetMaxConcurrentMicroTasks sets the maximum number of microtasks that should be run concurrently. +// SetMaxConcurrentMicroTasks sets the maximum number of microtasks that should +// be run concurrently. The modules system initializes it with GOMAXPROCS. +// The minimum is 2. func SetMaxConcurrentMicroTasks(n int) { - if n < 4 { - atomic.StoreInt32(microTasksThreshhold, 4) + if n < 2 { + atomic.StoreInt32(microTasksThreshhold, 2) } else { atomic.StoreInt32(microTasksThreshhold, int32(n)) } } -// StartMicroTask starts a new MicroTask with high priority. It will start immediately. The call starts a new goroutine and returns immediately. The given function will be executed and panics caught. The supplied name must not be changed. -func (m *Module) StartMicroTask(name *string, fn func(context.Context) error) { +// StartHighPriorityMicroTask starts a new MicroTask with high priority. +// It will start immediately. +// The call starts a new goroutine and returns immediately. +// The given function will be executed and panics caught. +func (m *Module) StartHighPriorityMicroTask(name string, fn func(context.Context) error) { go func() { - err := m.RunMicroTask(name, fn) + err := m.RunHighPriorityMicroTask(name, fn) if err != nil { - log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err) + log.Warningf("%s: microtask %s failed: %s", m.Name, name, err) } }() } -// StartMediumPriorityMicroTask starts a new MicroTask with medium priority. The call starts a new goroutine and returns immediately. It will wait until a slot becomes available (max 3 seconds). The given function will be executed and panics caught. The supplied name must not be changed. -func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Context) error) { +// StartMicroTask starts a new MicroTask with medium priority. +// The call starts a new goroutine and returns immediately. +// It will wait until a slot becomes available while respecting maxDelay. +// You can also set maxDelay to 0 to use the default value of 1 second. +// The given function will be executed and panics caught. +func (m *Module) StartMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) { go func() { - err := m.RunMediumPriorityMicroTask(name, fn) + err := m.RunMicroTask(name, maxDelay, fn) if err != nil { - log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err) + log.Warningf("%s: microtask %s failed: %s", m.Name, name, err) } }() } -// StartLowPriorityMicroTask starts a new MicroTask with low priority. The call starts a new goroutine and returns immediately. It will wait until a slot becomes available (max 15 seconds). The given function will be executed and panics caught. The supplied name must not be changed. -func (m *Module) StartLowPriorityMicroTask(name *string, fn func(context.Context) error) { +// StartLowPriorityMicroTask starts a new MicroTask with low priority. +// The call starts a new goroutine and returns immediately. +// It will wait until a slot becomes available while respecting maxDelay. +// You can also set maxDelay to 0 to use the default value of 3 seconds. +// The given function will be executed and panics caught. +func (m *Module) StartLowPriorityMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) { go func() { - err := m.RunLowPriorityMicroTask(name, fn) + err := m.RunLowPriorityMicroTask(name, maxDelay, fn) if err != nil { - log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err) + log.Warningf("%s: microtask %s failed: %s", m.Name, name, err) } }() } -// RunMicroTask runs a new MicroTask with high priority. It will start immediately. The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed. -func (m *Module) RunMicroTask(name *string, fn func(context.Context) error) error { +// RunHighPriorityMicroTask starts a new MicroTask with high priority. +// The given function will be executed and panics caught. +// The call blocks until the given function finishes. +func (m *Module) RunHighPriorityMicroTask(name string, fn func(context.Context) error) error { if m == nil { - log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name) + log.Errorf(`modules: cannot start microtask "%s" with nil module`, name) return errNoModule } - atomic.AddInt32(microTasks, 1) // increase global counter here, as high priority tasks are not started by the scheduler, where this counter is usually increased + // Increase global counter here, as high priority tasks do not wait for clearance. + atomic.AddInt32(microTasks, 1) return m.runMicroTask(name, fn) } -// RunMediumPriorityMicroTask runs a new MicroTask with medium priority. It will wait until a slot becomes available (max 3 seconds). The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed. -func (m *Module) RunMediumPriorityMicroTask(name *string, fn func(context.Context) error) error { +// RunMicroTask starts a new MicroTask with medium priority. +// It will wait until a slot becomes available while respecting maxDelay. +// You can also set maxDelay to 0 to use the default value of 1 second. +// The given function will be executed and panics caught. +// The call blocks until the given function finishes. +func (m *Module) RunMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) error { if m == nil { - log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name) + log.Errorf(`modules: cannot start microtask "%s" with nil module`, name) return errNoModule } - // check if we can go immediately - select { - case <-mediumPriorityClearance: - default: - // wait for go or max delay - select { - case <-mediumPriorityClearance: - case <-time.After(mediumPriorityMaxDelay): - } + // Set default max delay, if not defined. + if maxDelay <= 0 { + maxDelay = defaultMediumPriorityMaxDelay } + + getMediumPriorityClearance(maxDelay) return m.runMicroTask(name, fn) } -// RunLowPriorityMicroTask runs a new MicroTask with low priority. It will wait until a slot becomes available (max 15 seconds). The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed. -func (m *Module) RunLowPriorityMicroTask(name *string, fn func(context.Context) error) error { +// RunLowPriorityMicroTask starts a new MicroTask with low priority. +// It will wait until a slot becomes available while respecting maxDelay. +// You can also set maxDelay to 0 to use the default value of 3 seconds. +// The given function will be executed and panics caught. +// The call blocks until the given function finishes. +func (m *Module) RunLowPriorityMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) error { if m == nil { - log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name) + log.Errorf(`modules: cannot start microtask "%s" with nil module`, name) return errNoModule } - // check if we can go immediately - select { - case <-lowPriorityClearance: - default: - // wait for go or max delay - select { - case <-lowPriorityClearance: - case <-time.After(lowPriorityMaxDelay): - } + // Set default max delay, if not defined. + if maxDelay <= 0 { + maxDelay = defaultLowPriorityMaxDelay } + + getLowPriorityClearance(maxDelay) return m.runMicroTask(name, fn) } -func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err error) { +func (m *Module) runMicroTask(name string, fn func(context.Context) error) (err error) { // start for module // hint: only microTasks global var is important for scheduling, others can be set here atomic.AddInt32(m.microTaskCnt, 1) @@ -137,67 +150,243 @@ func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err // recover from panic panicVal := recover() if panicVal != nil { - me := m.NewPanicError(*name, "microtask", panicVal) + me := m.NewPanicError(name, "microtask", panicVal) me.Report() - log.Errorf("%s: microtask %s panicked: %s", m.Name, *name, panicVal) + log.Errorf("%s: microtask %s panicked: %s", m.Name, name, panicVal) err = me } - // finish for module - atomic.AddInt32(m.microTaskCnt, -1) - m.checkIfStopComplete() - - // finish and possibly trigger next task - atomic.AddInt32(microTasks, -1) - select { - case microTaskFinished <- struct{}{}: - default: - } + m.concludeMicroTask() }() // run err = fn(m.Ctx) - return //nolint:nakedret // need to use named return val in order to change in defer + return // Use named return val in order to change it in defer. } -var microTaskSchedulerStarted = abool.NewBool(false) +// SignalHighPriorityMicroTask signals the start of a new MicroTask with high priority. +// The returned "done" function SHOULD be called when the task has finished +// and MUST be called in any case. Failing to do so will have devastating effects. +// You can safely call "done" multiple times; additional calls do nothing. +func (m *Module) SignalHighPriorityMicroTask() (done func()) { + if m == nil { + log.Errorf("modules: cannot signal microtask with nil module") + return + } + + // Increase global counter here, as high priority tasks do not wait for clearance. + atomic.AddInt32(microTasks, 1) + return m.signalMicroTask() +} + +// SignalMicroTask signals the start of a new MicroTask with medium priority. +// The call will wait until a slot becomes available while respecting maxDelay. +// You can also set maxDelay to 0 to use the default value of 1 second. +// The returned "done" function SHOULD be called when the task has finished +// and MUST be called in any case. Failing to do so will have devastating effects. +// You can safely call "done" multiple times; additional calls do nothing. +func (m *Module) SignalMicroTask(maxDelay time.Duration) (done func()) { + if m == nil { + log.Errorf("modules: cannot signal microtask with nil module") + return + } + + getMediumPriorityClearance(maxDelay) + return m.signalMicroTask() +} + +// SignalLowPriorityMicroTask signals the start of a new MicroTask with low priority. +// The call will wait until a slot becomes available while respecting maxDelay. +// You can also set maxDelay to 0 to use the default value of 1 second. +// The returned "done" function SHOULD be called when the task has finished +// and MUST be called in any case. Failing to do so will have devastating effects. +// You can safely call "done" multiple times; additional calls do nothing. +func (m *Module) SignalLowPriorityMicroTask(maxDelay time.Duration) (done func()) { + if m == nil { + log.Errorf("modules: cannot signal microtask with nil module") + return + } + + getLowPriorityClearance(maxDelay) + return m.signalMicroTask() +} + +func (m *Module) signalMicroTask() (done func()) { + // Start microtask for module. + // Global counter is set earlier as required for scheduling. + atomic.AddInt32(m.microTaskCnt, 1) + + doneCalled := abool.New() + return func() { + if doneCalled.SetToIf(false, true) { + m.concludeMicroTask() + } + } +} + +func (m *Module) concludeMicroTask() { + // Finish for module. + atomic.AddInt32(m.microTaskCnt, -1) + m.checkIfStopComplete() + + // Finish and possibly trigger next task. + atomic.AddInt32(microTasks, -1) + select { + case microTaskFinished <- struct{}{}: + default: + } +} + +var ( + clearanceQueueBaseSize = 100 + clearanceQueueSize = runtime.GOMAXPROCS(0) * clearanceQueueBaseSize + mediumPriorityClearance = make(chan chan struct{}, clearanceQueueSize) + lowPriorityClearance = make(chan chan struct{}, clearanceQueueSize) + + triggerLogWriting = log.TriggerWriterChannel() + + microTaskSchedulerStarted = abool.NewBool(false) +) func microTaskScheduler() { + var clearanceSignal chan struct{} + + // Create ticker for max delay for checking clearances. + recheck := time.NewTicker(1 * time.Second) + defer recheck.Stop() + // only ever start once if !microTaskSchedulerStarted.SetToIf(false, true) { return } -microTaskManageLoop: + // Debugging: Print current amount of microtasks. + // go func() { + // for { + // time.Sleep(1 * time.Second) + // log.Debugf("modules: microtasks: %d", atomic.LoadInt32(microTasks)) + // } + // }() + for { if shutdownFlag.IsSet() { - close(mediumPriorityClearance) - close(lowPriorityClearance) + go microTaskShutdownScheduler() return } + // Check if there is space for one more microtask. if atomic.LoadInt32(microTasks) < atomic.LoadInt32(microTasksThreshhold) { // space left for firing task + // Give Medium clearance. select { - case mediumPriorityClearance <- struct{}{}: + case clearanceSignal = <-mediumPriorityClearance: default: + + // Give Medium and Low clearance. select { - case taskTimeslot <- struct{}{}: - continue microTaskManageLoop - case triggerLogWriting <- struct{}{}: - continue microTaskManageLoop - case mediumPriorityClearance <- struct{}{}: - case lowPriorityClearance <- struct{}{}: + case clearanceSignal = <-mediumPriorityClearance: + case clearanceSignal = <-lowPriorityClearance: + default: + + // Give Medium, Low and other clearancee. + select { + case clearanceSignal = <-mediumPriorityClearance: + case clearanceSignal = <-lowPriorityClearance: + case taskTimeslot <- struct{}{}: + case triggerLogWriting <- struct{}{}: + } } } - // increase task counter - atomic.AddInt32(microTasks, 1) + + // Send clearance signal and increase task counter. + if clearanceSignal != nil { + close(clearanceSignal) + atomic.AddInt32(microTasks, 1) + } + clearanceSignal = nil } else { // wait for signal that a task was completed select { case <-microTaskFinished: - case <-time.After(1 * time.Second): + case <-recheck.C: } } } } + +func microTaskShutdownScheduler() { + var clearanceSignal chan struct{} + + for { + // During shutdown, always give clearances immediately. + select { + case clearanceSignal = <-mediumPriorityClearance: + case clearanceSignal = <-lowPriorityClearance: + case taskTimeslot <- struct{}{}: + case triggerLogWriting <- struct{}{}: + } + + // Give clearance if requested. + if clearanceSignal != nil { + close(clearanceSignal) + atomic.AddInt32(microTasks, 1) + } + clearanceSignal = nil + } +} + +func getMediumPriorityClearance(maxDelay time.Duration) { + // Submit signal to scheduler. + signal := make(chan struct{}) + select { + case mediumPriorityClearance <- signal: + default: + select { + case mediumPriorityClearance <- signal: + case <-time.After(maxDelay): + // Start without clearance and increase microtask counter. + atomic.AddInt32(microTasks, 1) + return + } + } + // Wait for signal to start. + select { + case <-signal: + default: + select { + case <-signal: + case <-time.After(maxDelay): + // Don't keep waiting for signal forever. + // Don't increase microtask counter, as the signal was already submitted + // and the counter will be increased by the scheduler. + } + } +} + +func getLowPriorityClearance(maxDelay time.Duration) { + // Submit signal to scheduler. + signal := make(chan struct{}) + select { + case lowPriorityClearance <- signal: + default: + select { + case lowPriorityClearance <- signal: + case <-time.After(maxDelay): + // Start without clearance and increase microtask counter. + atomic.AddInt32(microTasks, 1) + return + } + } + // Wait for signal to start. + select { + case <-signal: + default: + select { + case <-signal: + case <-time.After(maxDelay): + // Don't keep waiting for signal forever. + // Don't increase microtask counter, as the signal was already submitted + // and the counter will be increased by the scheduler. + } + } +} diff --git a/modules/microtasks_test.go b/modules/microtasks_test.go index 0138bc1..3974895 100644 --- a/modules/microtasks_test.go +++ b/modules/microtasks_test.go @@ -2,6 +2,7 @@ package modules import ( "context" + "runtime" "strings" "sync" "sync/atomic" @@ -20,6 +21,11 @@ func init() { func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much interference expected. + // Check if the state is clean. + if atomic.LoadInt32(microTasks) != 0 { + t.Fatalf("cannot start test with dirty state: %d microtasks", atomic.LoadInt32(microTasks)) + } + // skip if testing.Short() { t.Skip("skipping test in short mode, as it is not fully deterministic") @@ -41,7 +47,7 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte go func() { defer mtwWaitGroup.Done() // exec at slot 1 - _ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error { + _ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error { mtwOutputChannel <- "1" // slot 1 time.Sleep(mtwSleepDuration * 5) mtwOutputChannel <- "2" // slot 5 @@ -52,7 +58,7 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte time.Sleep(mtwSleepDuration * 1) // clear clearances - _ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error { + _ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error { return nil }) @@ -60,7 +66,7 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte go func() { defer mtwWaitGroup.Done() // exec at slot 2 - _ = mtModule.RunLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error { + _ = mtModule.RunLowPriorityMicroTask(mtTestName, 0, func(ctx context.Context) error { mtwOutputChannel <- "7" // slot 16 return nil }) @@ -73,7 +79,7 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte defer mtwWaitGroup.Done() time.Sleep(mtwSleepDuration * 8) // exec at slot 10 - _ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error { + _ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error { mtwOutputChannel <- "4" // slot 10 time.Sleep(mtwSleepDuration * 5) mtwOutputChannel <- "6" // slot 15 @@ -85,7 +91,7 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte go func() { defer mtwWaitGroup.Done() // exec at slot 3 - _ = mtModule.RunMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error { + _ = mtModule.RunMicroTask(mtTestName, 0, func(ctx context.Context) error { mtwOutputChannel <- "3" // slot 6 time.Sleep(mtwSleepDuration * 7) mtwOutputChannel <- "5" // slot 13 @@ -107,6 +113,12 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte if completeOutput != mtwExpectedOutput { t.Errorf("MicroTask waiting test failed, expected sequence %s, got %s", mtwExpectedOutput, completeOutput) } + + // Check if the state is clean. + time.Sleep(10 * time.Millisecond) + if atomic.LoadInt32(microTasks) != 0 { + t.Fatalf("test ends with dirty state: %d microtasks", atomic.LoadInt32(microTasks)) + } } // Test Microtask ordering. @@ -121,64 +133,104 @@ var ( // Microtask test functions. +func highPrioTaskTester() { + defer mtoWaitGroup.Done() + <-mtoWaitCh + _ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error { + mtoOutputChannel <- "0" + time.Sleep(2 * time.Millisecond) + return nil + }) +} + +func highPrioSignalledTaskTester() { + defer mtoWaitGroup.Done() + <-mtoWaitCh + go func() { + done := mtModule.SignalHighPriorityMicroTask() + defer done() + + mtoOutputChannel <- "0" + time.Sleep(2 * time.Millisecond) + }() +} + func mediumPrioTaskTester() { defer mtoWaitGroup.Done() <-mtoWaitCh - _ = mtModule.RunMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error { + _ = mtModule.RunMicroTask(mtTestName, 0, func(ctx context.Context) error { mtoOutputChannel <- "1" time.Sleep(2 * time.Millisecond) return nil }) } +func mediumPrioSignalledTaskTester() { + defer mtoWaitGroup.Done() + <-mtoWaitCh + go func() { + done := mtModule.SignalMicroTask(0) + defer done() + + mtoOutputChannel <- "1" + time.Sleep(2 * time.Millisecond) + }() +} + func lowPrioTaskTester() { defer mtoWaitGroup.Done() <-mtoWaitCh - _ = mtModule.RunLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error { + _ = mtModule.RunLowPriorityMicroTask(mtTestName, 0, func(ctx context.Context) error { mtoOutputChannel <- "2" time.Sleep(2 * time.Millisecond) return nil }) } +func lowPrioSignalledTaskTester() { + defer mtoWaitGroup.Done() + <-mtoWaitCh + go func() { + done := mtModule.SignalLowPriorityMicroTask(0) + defer done() + + mtoOutputChannel <- "2" + time.Sleep(2 * time.Millisecond) + }() +} + func TestMicroTaskOrdering(t *testing.T) { //nolint:paralleltest // Too much interference expected. + // Check if the state is clean. + if atomic.LoadInt32(microTasks) != 0 { + t.Fatalf("cannot start test with dirty state: %d microtasks", atomic.LoadInt32(microTasks)) + } + // skip if testing.Short() { t.Skip("skipping test in short mode, as it is not fully deterministic") } + // Only allow a single concurrent task for testing. + atomic.StoreInt32(microTasksThreshhold, 1) + defer SetMaxConcurrentMicroTasks(runtime.GOMAXPROCS(0)) + // init mtoOutputChannel = make(chan string, 100) mtoWaitCh = make(chan struct{}) // TEST - mtoWaitGroup.Add(20) - // ensure we only execute one microtask at once - atomic.StoreInt32(microTasksThreshhold, 1) - - // kick off - go mediumPrioTaskTester() - go mediumPrioTaskTester() - go lowPrioTaskTester() - go lowPrioTaskTester() - go lowPrioTaskTester() - go mediumPrioTaskTester() - go lowPrioTaskTester() - go mediumPrioTaskTester() - go mediumPrioTaskTester() - go mediumPrioTaskTester() - go lowPrioTaskTester() - go mediumPrioTaskTester() - go lowPrioTaskTester() - go mediumPrioTaskTester() - go lowPrioTaskTester() - go mediumPrioTaskTester() - go lowPrioTaskTester() - go lowPrioTaskTester() - go mediumPrioTaskTester() - go lowPrioTaskTester() + // init all in waiting state + for i := 0; i < 5; i++ { + mtoWaitGroup.Add(6) + go lowPrioTaskTester() + go lowPrioSignalledTaskTester() + go mediumPrioTaskTester() + go mediumPrioSignalledTaskTester() + go highPrioTaskTester() + go highPrioSignalledTaskTester() + } // wait for all goroutines to be ready time.Sleep(10 * time.Millisecond) @@ -197,12 +249,21 @@ func TestMicroTaskOrdering(t *testing.T) { //nolint:paralleltest // Too much int // collect output close(mtoOutputChannel) completeOutput := "" - for s := <-mtoOutputChannel; s != ""; s = <-mtoOutputChannel { + for s := range mtoOutputChannel { completeOutput += s } + // check if test succeeded t.Logf("microTask exec order: %s", completeOutput) - if !strings.Contains(completeOutput, "11111") || !strings.Contains(completeOutput, "22222") { + if !strings.Contains(completeOutput, "000") || + !strings.Contains(completeOutput, "1111") || + !strings.Contains(completeOutput, "22222") { t.Errorf("MicroTask ordering test failed, output was %s. This happens occasionally, please run the test multiple times to verify", completeOutput) } + + // Check if the state is clean. + time.Sleep(10 * time.Millisecond) + if atomic.LoadInt32(microTasks) != 0 { + t.Fatalf("test ends with dirty state: %d microtasks", atomic.LoadInt32(microTasks)) + } } diff --git a/modules/start.go b/modules/start.go index f92a90c..c5ab4ab 100644 --- a/modules/start.go +++ b/modules/start.go @@ -36,8 +36,8 @@ func Start() error { defer mgmtLock.Unlock() // start microtask scheduler + SetMaxConcurrentMicroTasks(runtime.GOMAXPROCS(0)) go microTaskScheduler() - SetMaxConcurrentMicroTasks(runtime.GOMAXPROCS(0) * 2) // inter-link modules err := initDependencies()