Merge pull request #174 from safing/feature/queued-signaled-microtasks

Improve microtasks with queues and signaling
This commit is contained in:
Daniel 2022-08-02 09:32:19 +02:00 committed by GitHub
commit 2c0c580c2f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 379 additions and 163 deletions

View file

@ -31,6 +31,7 @@ linters:
- whitespace - whitespace
- wrapcheck - wrapcheck
- wsl - wsl
- nolintlint
linters-settings: linters-settings:
revive: revive:

View file

@ -433,8 +433,7 @@ func updateAPIKeys(_ context.Context, _ interface{}) error {
} }
if hasExpiredKeys { if hasExpiredKeys {
name := "api-key-cleanup" module.StartLowPriorityMicroTask("api key cleanup", 0, func(ctx context.Context) error {
module.StartLowPriorityMicroTask(&name, func(ctx context.Context) error {
if err := config.SetConfigOption(CfgAPIKeys, validAPIKeys); err != nil { if err := config.SetConfigOption(CfgAPIKeys, validAPIKeys); err != nil {
log.Errorf("api: failed to remove expired API keys: %s", err) log.Errorf("api: failed to remove expired API keys: %s", err)
} else { } else {

View file

@ -127,7 +127,7 @@ func (c *Container) CompileData() []byte {
// Get returns the given amount of bytes. Data MAY be copied and IS consumed. // Get returns the given amount of bytes. Data MAY be copied and IS consumed.
func (c *Container) Get(n int) ([]byte, error) { func (c *Container) Get(n int) ([]byte, error) {
buf := c.gather(n) buf := c.Peek(n)
if len(buf) < n { if len(buf) < n {
return nil, errors.New("container: not enough data to return") return nil, errors.New("container: not enough data to return")
} }
@ -138,14 +138,14 @@ func (c *Container) Get(n int) ([]byte, error) {
// GetAll returns all data. Data MAY be copied and IS consumed. // GetAll returns all data. Data MAY be copied and IS consumed.
func (c *Container) GetAll() []byte { func (c *Container) GetAll() []byte {
// TODO: Improve. // TODO: Improve.
buf := c.gather(c.Length()) buf := c.Peek(c.Length())
c.skip(len(buf)) c.skip(len(buf))
return buf return buf
} }
// GetAsContainer returns the given amount of bytes in a new container. Data will NOT be copied and IS consumed. // GetAsContainer returns the given amount of bytes in a new container. Data will NOT be copied and IS consumed.
func (c *Container) GetAsContainer(n int) (*Container, error) { func (c *Container) GetAsContainer(n int) (*Container, error) {
newC := c.gatherAsContainer(n) newC := c.PeekContainer(n)
if newC == nil { if newC == nil {
return nil, errors.New("container: not enough data to return") return nil, errors.New("container: not enough data to return")
} }
@ -155,7 +155,7 @@ func (c *Container) GetAsContainer(n int) (*Container, error) {
// GetMax returns as much as possible, but the given amount of bytes at maximum. Data MAY be copied and IS consumed. // GetMax returns as much as possible, but the given amount of bytes at maximum. Data MAY be copied and IS consumed.
func (c *Container) GetMax(n int) []byte { func (c *Container) GetMax(n int) []byte {
buf := c.gather(n) buf := c.Peek(n)
c.skip(len(buf)) c.skip(len(buf))
return buf return buf
} }
@ -226,42 +226,6 @@ func (c *Container) checkOffset() {
} }
} }
// Error Handling
/*
DEPRECATING... like.... NOW.
// SetError sets an error.
func (c *Container) SetError(err error) {
c.err = err
c.Replace(append([]byte{0x00}, []byte(err.Error())...))
}
// CheckError checks if there is an error in the data. If so, it will parse the error and delete the data.
func (c *Container) CheckError() {
if len(c.compartments[c.offset]) > 0 && c.compartments[c.offset][0] == 0x00 {
c.compartments[c.offset] = c.compartments[c.offset][1:]
c.err = errors.New(string(c.CompileData()))
c.compartments = nil
}
}
// HasError returns wether or not the container is holding an error.
func (c *Container) HasError() bool {
return c.err != nil
}
// Error returns the error.
func (c *Container) Error() error {
return c.err
}
// ErrString returns the error as a string.
func (c *Container) ErrString() string {
return c.err.Error()
}
*/
// Block Handling // Block Handling
// PrependLength prepends the current full length of all bytes in the container. // PrependLength prepends the current full length of all bytes in the container.
@ -269,7 +233,8 @@ func (c *Container) PrependLength() {
c.Prepend(varint.Pack64(uint64(c.Length()))) c.Prepend(varint.Pack64(uint64(c.Length())))
} }
func (c *Container) gather(n int) []byte { // Peek returns the given amount of bytes. Data MAY be copied and IS NOT consumed.
func (c *Container) Peek(n int) []byte {
// Check requested length. // Check requested length.
if n <= 0 { if n <= 0 {
return nil return nil
@ -296,7 +261,8 @@ func (c *Container) gather(n int) []byte {
return slice[:n] return slice[:n]
} }
func (c *Container) gatherAsContainer(n int) (newC *Container) { // PeekContainer returns the given amount of bytes in a new container. Data will NOT be copied and IS NOT consumed.
func (c *Container) PeekContainer(n int) (newC *Container) {
// Check requested length. // Check requested length.
if n < 0 { if n < 0 {
return nil return nil
@ -359,7 +325,7 @@ func (c *Container) GetNextBlockAsContainer() (*Container, error) {
// GetNextN8 parses and returns a varint of type uint8. // GetNextN8 parses and returns a varint of type uint8.
func (c *Container) GetNextN8() (uint8, error) { func (c *Container) GetNextN8() (uint8, error) {
buf := c.gather(2) buf := c.Peek(2)
num, n, err := varint.Unpack8(buf) num, n, err := varint.Unpack8(buf)
if err != nil { if err != nil {
return 0, err return 0, err
@ -370,7 +336,7 @@ func (c *Container) GetNextN8() (uint8, error) {
// GetNextN16 parses and returns a varint of type uint16. // GetNextN16 parses and returns a varint of type uint16.
func (c *Container) GetNextN16() (uint16, error) { func (c *Container) GetNextN16() (uint16, error) {
buf := c.gather(3) buf := c.Peek(3)
num, n, err := varint.Unpack16(buf) num, n, err := varint.Unpack16(buf)
if err != nil { if err != nil {
return 0, err return 0, err
@ -381,7 +347,7 @@ func (c *Container) GetNextN16() (uint16, error) {
// GetNextN32 parses and returns a varint of type uint32. // GetNextN32 parses and returns a varint of type uint32.
func (c *Container) GetNextN32() (uint32, error) { func (c *Container) GetNextN32() (uint32, error) {
buf := c.gather(5) buf := c.Peek(5)
num, n, err := varint.Unpack32(buf) num, n, err := varint.Unpack32(buf)
if err != nil { if err != nil {
return 0, err return 0, err
@ -392,7 +358,7 @@ func (c *Container) GetNextN32() (uint32, error) {
// GetNextN64 parses and returns a varint of type uint64. // GetNextN64 parses and returns a varint of type uint64.
func (c *Container) GetNextN64() (uint64, error) { func (c *Container) GetNextN64() (uint64, error) {
buf := c.gather(10) buf := c.Peek(10)
num, n, err := varint.Unpack64(buf) num, n, err := varint.Unpack64(buf)
if err != nil { if err != nil {
return 0, err return 0, err

View file

@ -66,9 +66,9 @@ func TestContainerDataHandling(t *testing.T) {
} }
c8.clean() c8.clean()
c9 := c8.gatherAsContainer(len(testData)) c9 := c8.PeekContainer(len(testData))
c10 := c9.gatherAsContainer(len(testData) - 1) c10 := c9.PeekContainer(len(testData) - 1)
c10.Append(testData[len(testData)-1:]) c10.Append(testData[len(testData)-1:])
compareMany(t, testData, c1.CompileData(), c2.CompileData(), c3.CompileData(), d4, d5, c6.CompileData(), c7.CompileData(), c8.CompileData(), c9.CompileData(), c10.CompileData()) compareMany(t, testData, c1.CompileData(), c2.CompileData(), c3.CompileData(), d4, d5, c6.CompileData(), c7.CompileData(), c8.CompileData(), c9.CompileData(), c10.CompileData())

View file

@ -2,6 +2,7 @@ package modules
import ( import (
"context" "context"
"runtime"
"sync/atomic" "sync/atomic"
"time" "time"
@ -13,21 +14,17 @@ import (
// TODO: getting some errors when in nanosecond precision for tests: // TODO: getting some errors when in nanosecond precision for tests:
// (1) panic: sync: WaitGroup is reused before previous Wait has returned - should theoretically not happen // (1) panic: sync: WaitGroup is reused before previous Wait has returned - should theoretically not happen
// (2) sometimes there seems to some kind of race condition stuff, the test hangs and does not complete // (2) sometimes there seems to some kind of race condition stuff, the test hangs and does not complete
// NOTE: These might be resolved by the switch to clearance queues.
var ( var (
microTasks *int32 microTasks *int32
microTasksThreshhold *int32 microTasksThreshhold *int32
microTaskFinished = make(chan struct{}, 1) microTaskFinished = make(chan struct{}, 1)
mediumPriorityClearance = make(chan struct{})
lowPriorityClearance = make(chan struct{})
triggerLogWriting = log.TriggerWriterChannel()
) )
const ( const (
mediumPriorityMaxDelay = 1 * time.Second defaultMediumPriorityMaxDelay = 1 * time.Second
lowPriorityMaxDelay = 3 * time.Second defaultLowPriorityMaxDelay = 3 * time.Second
) )
func init() { func init() {
@ -37,97 +34,113 @@ func init() {
microTasksThreshhold = &microTasksThreshholdVal microTasksThreshhold = &microTasksThreshholdVal
} }
// SetMaxConcurrentMicroTasks sets the maximum number of microtasks that should be run concurrently. // SetMaxConcurrentMicroTasks sets the maximum number of microtasks that should
// be run concurrently. The modules system initializes it with GOMAXPROCS.
// The minimum is 2.
func SetMaxConcurrentMicroTasks(n int) { func SetMaxConcurrentMicroTasks(n int) {
if n < 4 { if n < 2 {
atomic.StoreInt32(microTasksThreshhold, 4) atomic.StoreInt32(microTasksThreshhold, 2)
} else { } else {
atomic.StoreInt32(microTasksThreshhold, int32(n)) atomic.StoreInt32(microTasksThreshhold, int32(n))
} }
} }
// StartMicroTask starts a new MicroTask with high priority. It will start immediately. The call starts a new goroutine and returns immediately. The given function will be executed and panics caught. The supplied name must not be changed. // StartHighPriorityMicroTask starts a new MicroTask with high priority.
func (m *Module) StartMicroTask(name *string, fn func(context.Context) error) { // It will start immediately.
// The call starts a new goroutine and returns immediately.
// The given function will be executed and panics caught.
func (m *Module) StartHighPriorityMicroTask(name string, fn func(context.Context) error) {
go func() { go func() {
err := m.RunMicroTask(name, fn) err := m.RunHighPriorityMicroTask(name, fn)
if err != nil { if err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err) log.Warningf("%s: microtask %s failed: %s", m.Name, name, err)
} }
}() }()
} }
// StartMediumPriorityMicroTask starts a new MicroTask with medium priority. The call starts a new goroutine and returns immediately. It will wait until a slot becomes available (max 3 seconds). The given function will be executed and panics caught. The supplied name must not be changed. // StartMicroTask starts a new MicroTask with medium priority.
func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Context) error) { // The call starts a new goroutine and returns immediately.
// It will wait until a slot becomes available while respecting maxDelay.
// You can also set maxDelay to 0 to use the default value of 1 second.
// The given function will be executed and panics caught.
func (m *Module) StartMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) {
go func() { go func() {
err := m.RunMediumPriorityMicroTask(name, fn) err := m.RunMicroTask(name, maxDelay, fn)
if err != nil { if err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err) log.Warningf("%s: microtask %s failed: %s", m.Name, name, err)
} }
}() }()
} }
// StartLowPriorityMicroTask starts a new MicroTask with low priority. The call starts a new goroutine and returns immediately. It will wait until a slot becomes available (max 15 seconds). The given function will be executed and panics caught. The supplied name must not be changed. // StartLowPriorityMicroTask starts a new MicroTask with low priority.
func (m *Module) StartLowPriorityMicroTask(name *string, fn func(context.Context) error) { // The call starts a new goroutine and returns immediately.
// It will wait until a slot becomes available while respecting maxDelay.
// You can also set maxDelay to 0 to use the default value of 3 seconds.
// The given function will be executed and panics caught.
func (m *Module) StartLowPriorityMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) {
go func() { go func() {
err := m.RunLowPriorityMicroTask(name, fn) err := m.RunLowPriorityMicroTask(name, maxDelay, fn)
if err != nil { if err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err) log.Warningf("%s: microtask %s failed: %s", m.Name, name, err)
} }
}() }()
} }
// RunMicroTask runs a new MicroTask with high priority. It will start immediately. The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed. // RunHighPriorityMicroTask starts a new MicroTask with high priority.
func (m *Module) RunMicroTask(name *string, fn func(context.Context) error) error { // The given function will be executed and panics caught.
// The call blocks until the given function finishes.
func (m *Module) RunHighPriorityMicroTask(name string, fn func(context.Context) error) error {
if m == nil { if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name) log.Errorf(`modules: cannot start microtask "%s" with nil module`, name)
return errNoModule return errNoModule
} }
atomic.AddInt32(microTasks, 1) // increase global counter here, as high priority tasks are not started by the scheduler, where this counter is usually increased // Increase global counter here, as high priority tasks do not wait for clearance.
atomic.AddInt32(microTasks, 1)
return m.runMicroTask(name, fn) return m.runMicroTask(name, fn)
} }
// RunMediumPriorityMicroTask runs a new MicroTask with medium priority. It will wait until a slot becomes available (max 3 seconds). The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed. // RunMicroTask starts a new MicroTask with medium priority.
func (m *Module) RunMediumPriorityMicroTask(name *string, fn func(context.Context) error) error { // It will wait until a slot becomes available while respecting maxDelay.
// You can also set maxDelay to 0 to use the default value of 1 second.
// The given function will be executed and panics caught.
// The call blocks until the given function finishes.
func (m *Module) RunMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) error {
if m == nil { if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name) log.Errorf(`modules: cannot start microtask "%s" with nil module`, name)
return errNoModule return errNoModule
} }
// check if we can go immediately // Set default max delay, if not defined.
select { if maxDelay <= 0 {
case <-mediumPriorityClearance: maxDelay = defaultMediumPriorityMaxDelay
default:
// wait for go or max delay
select {
case <-mediumPriorityClearance:
case <-time.After(mediumPriorityMaxDelay):
}
} }
getMediumPriorityClearance(maxDelay)
return m.runMicroTask(name, fn) return m.runMicroTask(name, fn)
} }
// RunLowPriorityMicroTask runs a new MicroTask with low priority. It will wait until a slot becomes available (max 15 seconds). The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed. // RunLowPriorityMicroTask starts a new MicroTask with low priority.
func (m *Module) RunLowPriorityMicroTask(name *string, fn func(context.Context) error) error { // It will wait until a slot becomes available while respecting maxDelay.
// You can also set maxDelay to 0 to use the default value of 3 seconds.
// The given function will be executed and panics caught.
// The call blocks until the given function finishes.
func (m *Module) RunLowPriorityMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) error {
if m == nil { if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name) log.Errorf(`modules: cannot start microtask "%s" with nil module`, name)
return errNoModule return errNoModule
} }
// check if we can go immediately // Set default max delay, if not defined.
select { if maxDelay <= 0 {
case <-lowPriorityClearance: maxDelay = defaultLowPriorityMaxDelay
default:
// wait for go or max delay
select {
case <-lowPriorityClearance:
case <-time.After(lowPriorityMaxDelay):
}
} }
getLowPriorityClearance(maxDelay)
return m.runMicroTask(name, fn) return m.runMicroTask(name, fn)
} }
func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err error) { func (m *Module) runMicroTask(name string, fn func(context.Context) error) (err error) {
// start for module // start for module
// hint: only microTasks global var is important for scheduling, others can be set here // hint: only microTasks global var is important for scheduling, others can be set here
atomic.AddInt32(m.microTaskCnt, 1) atomic.AddInt32(m.microTaskCnt, 1)
@ -137,67 +150,243 @@ func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err
// recover from panic // recover from panic
panicVal := recover() panicVal := recover()
if panicVal != nil { if panicVal != nil {
me := m.NewPanicError(*name, "microtask", panicVal) me := m.NewPanicError(name, "microtask", panicVal)
me.Report() me.Report()
log.Errorf("%s: microtask %s panicked: %s", m.Name, *name, panicVal) log.Errorf("%s: microtask %s panicked: %s", m.Name, name, panicVal)
err = me err = me
} }
// finish for module m.concludeMicroTask()
atomic.AddInt32(m.microTaskCnt, -1)
m.checkIfStopComplete()
// finish and possibly trigger next task
atomic.AddInt32(microTasks, -1)
select {
case microTaskFinished <- struct{}{}:
default:
}
}() }()
// run // run
err = fn(m.Ctx) err = fn(m.Ctx)
return //nolint:nakedret // need to use named return val in order to change in defer return // Use named return val in order to change it in defer.
} }
var microTaskSchedulerStarted = abool.NewBool(false) // SignalHighPriorityMicroTask signals the start of a new MicroTask with high priority.
// The returned "done" function SHOULD be called when the task has finished
// and MUST be called in any case. Failing to do so will have devastating effects.
// You can safely call "done" multiple times; additional calls do nothing.
func (m *Module) SignalHighPriorityMicroTask() (done func()) {
if m == nil {
log.Errorf("modules: cannot signal microtask with nil module")
return
}
// Increase global counter here, as high priority tasks do not wait for clearance.
atomic.AddInt32(microTasks, 1)
return m.signalMicroTask()
}
// SignalMicroTask signals the start of a new MicroTask with medium priority.
// The call will wait until a slot becomes available while respecting maxDelay.
// You can also set maxDelay to 0 to use the default value of 1 second.
// The returned "done" function SHOULD be called when the task has finished
// and MUST be called in any case. Failing to do so will have devastating effects.
// You can safely call "done" multiple times; additional calls do nothing.
func (m *Module) SignalMicroTask(maxDelay time.Duration) (done func()) {
if m == nil {
log.Errorf("modules: cannot signal microtask with nil module")
return
}
getMediumPriorityClearance(maxDelay)
return m.signalMicroTask()
}
// SignalLowPriorityMicroTask signals the start of a new MicroTask with low priority.
// The call will wait until a slot becomes available while respecting maxDelay.
// You can also set maxDelay to 0 to use the default value of 1 second.
// The returned "done" function SHOULD be called when the task has finished
// and MUST be called in any case. Failing to do so will have devastating effects.
// You can safely call "done" multiple times; additional calls do nothing.
func (m *Module) SignalLowPriorityMicroTask(maxDelay time.Duration) (done func()) {
if m == nil {
log.Errorf("modules: cannot signal microtask with nil module")
return
}
getLowPriorityClearance(maxDelay)
return m.signalMicroTask()
}
func (m *Module) signalMicroTask() (done func()) {
// Start microtask for module.
// Global counter is set earlier as required for scheduling.
atomic.AddInt32(m.microTaskCnt, 1)
doneCalled := abool.New()
return func() {
if doneCalled.SetToIf(false, true) {
m.concludeMicroTask()
}
}
}
func (m *Module) concludeMicroTask() {
// Finish for module.
atomic.AddInt32(m.microTaskCnt, -1)
m.checkIfStopComplete()
// Finish and possibly trigger next task.
atomic.AddInt32(microTasks, -1)
select {
case microTaskFinished <- struct{}{}:
default:
}
}
var (
clearanceQueueBaseSize = 100
clearanceQueueSize = runtime.GOMAXPROCS(0) * clearanceQueueBaseSize
mediumPriorityClearance = make(chan chan struct{}, clearanceQueueSize)
lowPriorityClearance = make(chan chan struct{}, clearanceQueueSize)
triggerLogWriting = log.TriggerWriterChannel()
microTaskSchedulerStarted = abool.NewBool(false)
)
func microTaskScheduler() { func microTaskScheduler() {
var clearanceSignal chan struct{}
// Create ticker for max delay for checking clearances.
recheck := time.NewTicker(1 * time.Second)
defer recheck.Stop()
// only ever start once // only ever start once
if !microTaskSchedulerStarted.SetToIf(false, true) { if !microTaskSchedulerStarted.SetToIf(false, true) {
return return
} }
microTaskManageLoop: // Debugging: Print current amount of microtasks.
// go func() {
// for {
// time.Sleep(1 * time.Second)
// log.Debugf("modules: microtasks: %d", atomic.LoadInt32(microTasks))
// }
// }()
for { for {
if shutdownFlag.IsSet() { if shutdownFlag.IsSet() {
close(mediumPriorityClearance) go microTaskShutdownScheduler()
close(lowPriorityClearance)
return return
} }
// Check if there is space for one more microtask.
if atomic.LoadInt32(microTasks) < atomic.LoadInt32(microTasksThreshhold) { // space left for firing task if atomic.LoadInt32(microTasks) < atomic.LoadInt32(microTasksThreshhold) { // space left for firing task
// Give Medium clearance.
select { select {
case mediumPriorityClearance <- struct{}{}: case clearanceSignal = <-mediumPriorityClearance:
default: default:
// Give Medium and Low clearance.
select { select {
case taskTimeslot <- struct{}{}: case clearanceSignal = <-mediumPriorityClearance:
continue microTaskManageLoop case clearanceSignal = <-lowPriorityClearance:
case triggerLogWriting <- struct{}{}: default:
continue microTaskManageLoop
case mediumPriorityClearance <- struct{}{}: // Give Medium, Low and other clearancee.
case lowPriorityClearance <- struct{}{}: select {
case clearanceSignal = <-mediumPriorityClearance:
case clearanceSignal = <-lowPriorityClearance:
case taskTimeslot <- struct{}{}:
case triggerLogWriting <- struct{}{}:
}
} }
} }
// increase task counter
atomic.AddInt32(microTasks, 1) // Send clearance signal and increase task counter.
if clearanceSignal != nil {
close(clearanceSignal)
atomic.AddInt32(microTasks, 1)
}
clearanceSignal = nil
} else { } else {
// wait for signal that a task was completed // wait for signal that a task was completed
select { select {
case <-microTaskFinished: case <-microTaskFinished:
case <-time.After(1 * time.Second): case <-recheck.C:
} }
} }
} }
} }
func microTaskShutdownScheduler() {
var clearanceSignal chan struct{}
for {
// During shutdown, always give clearances immediately.
select {
case clearanceSignal = <-mediumPriorityClearance:
case clearanceSignal = <-lowPriorityClearance:
case taskTimeslot <- struct{}{}:
case triggerLogWriting <- struct{}{}:
}
// Give clearance if requested.
if clearanceSignal != nil {
close(clearanceSignal)
atomic.AddInt32(microTasks, 1)
}
clearanceSignal = nil
}
}
func getMediumPriorityClearance(maxDelay time.Duration) {
// Submit signal to scheduler.
signal := make(chan struct{})
select {
case mediumPriorityClearance <- signal:
default:
select {
case mediumPriorityClearance <- signal:
case <-time.After(maxDelay):
// Start without clearance and increase microtask counter.
atomic.AddInt32(microTasks, 1)
return
}
}
// Wait for signal to start.
select {
case <-signal:
default:
select {
case <-signal:
case <-time.After(maxDelay):
// Don't keep waiting for signal forever.
// Don't increase microtask counter, as the signal was already submitted
// and the counter will be increased by the scheduler.
}
}
}
func getLowPriorityClearance(maxDelay time.Duration) {
// Submit signal to scheduler.
signal := make(chan struct{})
select {
case lowPriorityClearance <- signal:
default:
select {
case lowPriorityClearance <- signal:
case <-time.After(maxDelay):
// Start without clearance and increase microtask counter.
atomic.AddInt32(microTasks, 1)
return
}
}
// Wait for signal to start.
select {
case <-signal:
default:
select {
case <-signal:
case <-time.After(maxDelay):
// Don't keep waiting for signal forever.
// Don't increase microtask counter, as the signal was already submitted
// and the counter will be increased by the scheduler.
}
}
}

View file

@ -2,6 +2,7 @@ package modules
import ( import (
"context" "context"
"runtime"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -20,6 +21,11 @@ func init() {
func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much interference expected. func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much interference expected.
// Check if the state is clean.
if atomic.LoadInt32(microTasks) != 0 {
t.Fatalf("cannot start test with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
}
// skip // skip
if testing.Short() { if testing.Short() {
t.Skip("skipping test in short mode, as it is not fully deterministic") t.Skip("skipping test in short mode, as it is not fully deterministic")
@ -41,7 +47,7 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte
go func() { go func() {
defer mtwWaitGroup.Done() defer mtwWaitGroup.Done()
// exec at slot 1 // exec at slot 1
_ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error { _ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
mtwOutputChannel <- "1" // slot 1 mtwOutputChannel <- "1" // slot 1
time.Sleep(mtwSleepDuration * 5) time.Sleep(mtwSleepDuration * 5)
mtwOutputChannel <- "2" // slot 5 mtwOutputChannel <- "2" // slot 5
@ -52,7 +58,7 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte
time.Sleep(mtwSleepDuration * 1) time.Sleep(mtwSleepDuration * 1)
// clear clearances // clear clearances
_ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error { _ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
return nil return nil
}) })
@ -60,7 +66,7 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte
go func() { go func() {
defer mtwWaitGroup.Done() defer mtwWaitGroup.Done()
// exec at slot 2 // exec at slot 2
_ = mtModule.RunLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error { _ = mtModule.RunLowPriorityMicroTask(mtTestName, 0, func(ctx context.Context) error {
mtwOutputChannel <- "7" // slot 16 mtwOutputChannel <- "7" // slot 16
return nil return nil
}) })
@ -73,7 +79,7 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte
defer mtwWaitGroup.Done() defer mtwWaitGroup.Done()
time.Sleep(mtwSleepDuration * 8) time.Sleep(mtwSleepDuration * 8)
// exec at slot 10 // exec at slot 10
_ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error { _ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
mtwOutputChannel <- "4" // slot 10 mtwOutputChannel <- "4" // slot 10
time.Sleep(mtwSleepDuration * 5) time.Sleep(mtwSleepDuration * 5)
mtwOutputChannel <- "6" // slot 15 mtwOutputChannel <- "6" // slot 15
@ -85,7 +91,7 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte
go func() { go func() {
defer mtwWaitGroup.Done() defer mtwWaitGroup.Done()
// exec at slot 3 // exec at slot 3
_ = mtModule.RunMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error { _ = mtModule.RunMicroTask(mtTestName, 0, func(ctx context.Context) error {
mtwOutputChannel <- "3" // slot 6 mtwOutputChannel <- "3" // slot 6
time.Sleep(mtwSleepDuration * 7) time.Sleep(mtwSleepDuration * 7)
mtwOutputChannel <- "5" // slot 13 mtwOutputChannel <- "5" // slot 13
@ -107,6 +113,12 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte
if completeOutput != mtwExpectedOutput { if completeOutput != mtwExpectedOutput {
t.Errorf("MicroTask waiting test failed, expected sequence %s, got %s", mtwExpectedOutput, completeOutput) t.Errorf("MicroTask waiting test failed, expected sequence %s, got %s", mtwExpectedOutput, completeOutput)
} }
// Check if the state is clean.
time.Sleep(10 * time.Millisecond)
if atomic.LoadInt32(microTasks) != 0 {
t.Fatalf("test ends with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
}
} }
// Test Microtask ordering. // Test Microtask ordering.
@ -121,64 +133,104 @@ var (
// Microtask test functions. // Microtask test functions.
func highPrioTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
_ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
mtoOutputChannel <- "0"
time.Sleep(2 * time.Millisecond)
return nil
})
}
func highPrioSignalledTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
go func() {
done := mtModule.SignalHighPriorityMicroTask()
defer done()
mtoOutputChannel <- "0"
time.Sleep(2 * time.Millisecond)
}()
}
func mediumPrioTaskTester() { func mediumPrioTaskTester() {
defer mtoWaitGroup.Done() defer mtoWaitGroup.Done()
<-mtoWaitCh <-mtoWaitCh
_ = mtModule.RunMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error { _ = mtModule.RunMicroTask(mtTestName, 0, func(ctx context.Context) error {
mtoOutputChannel <- "1" mtoOutputChannel <- "1"
time.Sleep(2 * time.Millisecond) time.Sleep(2 * time.Millisecond)
return nil return nil
}) })
} }
func mediumPrioSignalledTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
go func() {
done := mtModule.SignalMicroTask(0)
defer done()
mtoOutputChannel <- "1"
time.Sleep(2 * time.Millisecond)
}()
}
func lowPrioTaskTester() { func lowPrioTaskTester() {
defer mtoWaitGroup.Done() defer mtoWaitGroup.Done()
<-mtoWaitCh <-mtoWaitCh
_ = mtModule.RunLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error { _ = mtModule.RunLowPriorityMicroTask(mtTestName, 0, func(ctx context.Context) error {
mtoOutputChannel <- "2" mtoOutputChannel <- "2"
time.Sleep(2 * time.Millisecond) time.Sleep(2 * time.Millisecond)
return nil return nil
}) })
} }
func lowPrioSignalledTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
go func() {
done := mtModule.SignalLowPriorityMicroTask(0)
defer done()
mtoOutputChannel <- "2"
time.Sleep(2 * time.Millisecond)
}()
}
func TestMicroTaskOrdering(t *testing.T) { //nolint:paralleltest // Too much interference expected. func TestMicroTaskOrdering(t *testing.T) { //nolint:paralleltest // Too much interference expected.
// Check if the state is clean.
if atomic.LoadInt32(microTasks) != 0 {
t.Fatalf("cannot start test with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
}
// skip // skip
if testing.Short() { if testing.Short() {
t.Skip("skipping test in short mode, as it is not fully deterministic") t.Skip("skipping test in short mode, as it is not fully deterministic")
} }
// Only allow a single concurrent task for testing.
atomic.StoreInt32(microTasksThreshhold, 1)
defer SetMaxConcurrentMicroTasks(runtime.GOMAXPROCS(0))
// init // init
mtoOutputChannel = make(chan string, 100) mtoOutputChannel = make(chan string, 100)
mtoWaitCh = make(chan struct{}) mtoWaitCh = make(chan struct{})
// TEST // TEST
mtoWaitGroup.Add(20)
// ensure we only execute one microtask at once // init all in waiting state
atomic.StoreInt32(microTasksThreshhold, 1) for i := 0; i < 5; i++ {
mtoWaitGroup.Add(6)
// kick off go lowPrioTaskTester()
go mediumPrioTaskTester() go lowPrioSignalledTaskTester()
go mediumPrioTaskTester() go mediumPrioTaskTester()
go lowPrioTaskTester() go mediumPrioSignalledTaskTester()
go lowPrioTaskTester() go highPrioTaskTester()
go lowPrioTaskTester() go highPrioSignalledTaskTester()
go mediumPrioTaskTester() }
go lowPrioTaskTester()
go mediumPrioTaskTester()
go mediumPrioTaskTester()
go mediumPrioTaskTester()
go lowPrioTaskTester()
go mediumPrioTaskTester()
go lowPrioTaskTester()
go mediumPrioTaskTester()
go lowPrioTaskTester()
go mediumPrioTaskTester()
go lowPrioTaskTester()
go lowPrioTaskTester()
go mediumPrioTaskTester()
go lowPrioTaskTester()
// wait for all goroutines to be ready // wait for all goroutines to be ready
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -197,12 +249,21 @@ func TestMicroTaskOrdering(t *testing.T) { //nolint:paralleltest // Too much int
// collect output // collect output
close(mtoOutputChannel) close(mtoOutputChannel)
completeOutput := "" completeOutput := ""
for s := <-mtoOutputChannel; s != ""; s = <-mtoOutputChannel { for s := range mtoOutputChannel {
completeOutput += s completeOutput += s
} }
// check if test succeeded // check if test succeeded
t.Logf("microTask exec order: %s", completeOutput) t.Logf("microTask exec order: %s", completeOutput)
if !strings.Contains(completeOutput, "11111") || !strings.Contains(completeOutput, "22222") { if !strings.Contains(completeOutput, "000") ||
!strings.Contains(completeOutput, "1111") ||
!strings.Contains(completeOutput, "22222") {
t.Errorf("MicroTask ordering test failed, output was %s. This happens occasionally, please run the test multiple times to verify", completeOutput) t.Errorf("MicroTask ordering test failed, output was %s. This happens occasionally, please run the test multiple times to verify", completeOutput)
} }
// Check if the state is clean.
time.Sleep(10 * time.Millisecond)
if atomic.LoadInt32(microTasks) != 0 {
t.Fatalf("test ends with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
}
} }

View file

@ -36,8 +36,8 @@ func Start() error {
defer mgmtLock.Unlock() defer mgmtLock.Unlock()
// start microtask scheduler // start microtask scheduler
SetMaxConcurrentMicroTasks(runtime.GOMAXPROCS(0))
go microTaskScheduler() go microTaskScheduler()
SetMaxConcurrentMicroTasks(runtime.GOMAXPROCS(0) * 2)
// inter-link modules // inter-link modules
err := initDependencies() err := initDependencies()