Add microtask queues and signaling

Daniel 2022-07-22 14:41:35 +02:00
parent ce02b26ff5
commit 7fec4f5428
3 changed files with 331 additions and 114 deletions


@@ -2,6 +2,7 @@ package modules
import (
"context"
"runtime"
"sync/atomic"
"time"
@@ -13,21 +14,17 @@ import (
// TODO: getting some errors when in nanosecond precision for tests:
// (1) panic: sync: WaitGroup is reused before previous Wait has returned - should theoretically not happen
// (2) sometimes there seems to be some kind of race condition and the test hangs without completing
// NOTE: These might be resolved by the switch to clearance queues.
var (
microTasks *int32
microTasksThreshhold *int32
microTaskFinished = make(chan struct{}, 1)
mediumPriorityClearance = make(chan struct{})
lowPriorityClearance = make(chan struct{})
triggerLogWriting = log.TriggerWriterChannel()
)
const (
mediumPriorityMaxDelay = 1 * time.Second
lowPriorityMaxDelay = 3 * time.Second
defaultMediumPriorityMaxDelay = 1 * time.Second
defaultLowPriorityMaxDelay = 3 * time.Second
)
func init() {
@@ -37,97 +34,113 @@ func init() {
microTasksThreshhold = &microTasksThreshholdVal
}
// SetMaxConcurrentMicroTasks sets the maximum number of microtasks that should be run concurrently.
// SetMaxConcurrentMicroTasks sets the maximum number of microtasks that should
// be run concurrently. The modules system initializes it with GOMAXPROCS.
// The minimum is 2.
func SetMaxConcurrentMicroTasks(n int) {
if n < 4 {
atomic.StoreInt32(microTasksThreshhold, 4)
if n < 2 {
atomic.StoreInt32(microTasksThreshhold, 2)
} else {
atomic.StoreInt32(microTasksThreshhold, int32(n))
}
}
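
For a concrete feel for the clamping rule, here is a minimal, standalone sketch (the helper name is hypothetical; the real function stores the result atomically in microTasksThreshhold):

package main

import (
	"fmt"
	"runtime"
)

// clampMicroTaskLimit mirrors the clamping rule of SetMaxConcurrentMicroTasks
// for illustration only: values below the minimum of 2 are raised to 2.
func clampMicroTaskLimit(n int) int32 {
	if n < 2 {
		return 2
	}
	return int32(n)
}

func main() {
	fmt.Println(clampMicroTaskLimit(0))                     // 2 (clamped to the minimum)
	fmt.Println(clampMicroTaskLimit(runtime.GOMAXPROCS(0))) // CPU count, as Start() now passes
}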
// StartMicroTask starts a new MicroTask with high priority. It will start immediately. The call starts a new goroutine and returns immediately. The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) StartMicroTask(name *string, fn func(context.Context) error) {
// StartHighPriorityMicroTask starts a new MicroTask with high priority.
// It will start immediately.
// The call starts a new goroutine and returns immediately.
// The given function will be executed and panics caught.
func (m *Module) StartHighPriorityMicroTask(name string, fn func(context.Context) error) {
go func() {
err := m.RunMicroTask(name, fn)
err := m.RunHighPriorityMicroTask(name, fn)
if err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err)
log.Warningf("%s: microtask %s failed: %s", m.Name, name, err)
}
}()
}
// StartMediumPriorityMicroTask starts a new MicroTask with medium priority. The call starts a new goroutine and returns immediately. It will wait until a slot becomes available (max 3 seconds). The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Context) error) {
// StartMicroTask starts a new MicroTask with medium priority.
// The call starts a new goroutine and returns immediately.
// It will wait until a slot becomes available while respecting maxDelay.
// You can also set maxDelay to 0 to use the default value of 1 second.
// The given function will be executed and panics caught.
func (m *Module) StartMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) {
go func() {
err := m.RunMediumPriorityMicroTask(name, fn)
err := m.RunMicroTask(name, maxDelay, fn)
if err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err)
log.Warningf("%s: microtask %s failed: %s", m.Name, name, err)
}
}()
}
// StartLowPriorityMicroTask starts a new MicroTask with low priority. The call starts a new goroutine and returns immediately. It will wait until a slot becomes available (max 15 seconds). The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) StartLowPriorityMicroTask(name *string, fn func(context.Context) error) {
// StartLowPriorityMicroTask starts a new MicroTask with low priority.
// The call starts a new goroutine and returns immediately.
// It will wait until a slot becomes available while respecting maxDelay.
// You can also set maxDelay to 0 to use the default value of 3 seconds.
// The given function will be executed and panics caught.
func (m *Module) StartLowPriorityMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) {
go func() {
err := m.RunLowPriorityMicroTask(name, fn)
err := m.RunLowPriorityMicroTask(name, maxDelay, fn)
if err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err)
log.Warningf("%s: microtask %s failed: %s", m.Name, name, err)
}
}()
}
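
The three Start variants now differ only in how they obtain clearance. A usage sketch, assuming a started *Module in scope as m and the file's existing context/time imports; the task names and delays are purely illustrative:

// High priority: starts immediately, without a clearance round-trip.
m.StartHighPriorityMicroTask("flush-cache", func(ctx context.Context) error {
	return nil
})

// Medium priority: 0 selects the default max delay of 1 second.
m.StartMicroTask("refresh-index", 0, func(ctx context.Context) error {
	return nil
})

// Low priority: waits up to 5 seconds for a slot, then starts anyway.
m.StartLowPriorityMicroTask("cleanup", 5*time.Second, func(ctx context.Context) error {
	return nil
})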
// RunMicroTask runs a new MicroTask with high priority. It will start immediately. The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) RunMicroTask(name *string, fn func(context.Context) error) error {
// RunHighPriorityMicroTask starts a new MicroTask with high priority.
// The given function will be executed and panics caught.
// The call blocks until the given function finishes.
func (m *Module) RunHighPriorityMicroTask(name string, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
log.Errorf(`modules: cannot start microtask "%s" with nil module`, name)
return errNoModule
}
atomic.AddInt32(microTasks, 1) // increase global counter here, as high priority tasks are not started by the scheduler, where this counter is usually increased
// Increase global counter here, as high priority tasks do not wait for clearance.
atomic.AddInt32(microTasks, 1)
return m.runMicroTask(name, fn)
}
// RunMediumPriorityMicroTask runs a new MicroTask with medium priority. It will wait until a slot becomes available (max 3 seconds). The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) RunMediumPriorityMicroTask(name *string, fn func(context.Context) error) error {
// RunMicroTask starts a new MicroTask with medium priority.
// It will wait until a slot becomes available while respecting maxDelay.
// You can also set maxDelay to 0 to use the default value of 1 second.
// The given function will be executed and panics caught.
// The call blocks until the given function finishes.
func (m *Module) RunMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
log.Errorf(`modules: cannot start microtask "%s" with nil module`, name)
return errNoModule
}
// check if we can go immediately
select {
case <-mediumPriorityClearance:
default:
// wait for go or max delay
select {
case <-mediumPriorityClearance:
case <-time.After(mediumPriorityMaxDelay):
}
// Set default max delay, if not defined.
if maxDelay <= 0 {
maxDelay = defaultMediumPriorityMaxDelay
}
getMediumPriorityClearance(maxDelay)
return m.runMicroTask(name, fn)
}
// RunLowPriorityMicroTask runs a new MicroTask with low priority. It will wait until a slot becomes available (max 15 seconds). The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed.
func (m *Module) RunLowPriorityMicroTask(name *string, fn func(context.Context) error) error {
// RunLowPriorityMicroTask starts a new MicroTask with low priority.
// It will wait until a slot becomes available while respecting maxDelay.
// You can also set maxDelay to 0 to use the default value of 3 seconds.
// The given function will be executed and panics caught.
// The call blocks until the given function finishes.
func (m *Module) RunLowPriorityMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
log.Errorf(`modules: cannot start microtask "%s" with nil module`, name)
return errNoModule
}
// check if we can go immediately
select {
case <-lowPriorityClearance:
default:
// wait for go or max delay
select {
case <-lowPriorityClearance:
case <-time.After(lowPriorityMaxDelay):
}
// Set default max delay, if not defined.
if maxDelay <= 0 {
maxDelay = defaultLowPriorityMaxDelay
}
getLowPriorityClearance(maxDelay)
return m.runMicroTask(name, fn)
}
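
The blocking Run variants hand the task's error (including a caught panic, wrapped by the recovery handler below) back to the caller instead of only logging it. A sketch under the same assumptions as above, with doRebuild as a hypothetical helper:

err := m.RunMicroTask("rebuild", 0, func(ctx context.Context) error {
	// Honor cancellation of the module context before doing work.
	if ctx.Err() != nil {
		return ctx.Err()
	}
	return doRebuild() // hypothetical helper doing the actual work
})
if err != nil {
	log.Warningf("%s: rebuild failed: %s", m.Name, err)
}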
func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err error) {
func (m *Module) runMicroTask(name string, fn func(context.Context) error) (err error) {
// start for module
// hint: only microTasks global var is important for scheduling, others can be set here
atomic.AddInt32(m.microTaskCnt, 1)
@@ -137,67 +150,233 @@ func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err
// recover from panic
panicVal := recover()
if panicVal != nil {
me := m.NewPanicError(*name, "microtask", panicVal)
me := m.NewPanicError(name, "microtask", panicVal)
me.Report()
log.Errorf("%s: microtask %s panicked: %s", m.Name, *name, panicVal)
log.Errorf("%s: microtask %s panicked: %s", m.Name, name, panicVal)
err = me
}
// finish for module
atomic.AddInt32(m.microTaskCnt, -1)
m.checkIfStopComplete()
// finish and possibly trigger next task
atomic.AddInt32(microTasks, -1)
select {
case microTaskFinished <- struct{}{}:
default:
}
m.concludeMicroTask()
}()
// run
err = fn(m.Ctx)
return //nolint:nakedret // need to use named return val in order to change in defer
return // Use named return val in order to change it in defer.
}
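
runMicroTask depends on the named return value so the deferred recovery handler can overwrite err after fn has already unwound. A self-contained sketch of just that pattern:

package main

import "fmt"

// runGuarded converts a panic in fn into an ordinary error by assigning
// to the named return value from inside the deferred handler.
func runGuarded(fn func() error) (err error) {
	defer func() {
		if panicVal := recover(); panicVal != nil {
			err = fmt.Errorf("caught panic: %v", panicVal)
		}
	}()
	return fn()
}

func main() {
	fmt.Println(runGuarded(func() error { panic("boom") })) // caught panic: boom
}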
var microTaskSchedulerStarted = abool.NewBool(false)
// SignalHighPriorityMicroTask signals the start of a new MicroTask with high priority.
// The returned "done" function SHOULD be called when the task has finished
// and MUST be called in any case. Failing to do so will have devastating effects.
// You can safely call "done" multiple times; additional calls do nothing.
func (m *Module) SignalHighPriorityMicroTask() (done func()) {
if m == nil {
log.Errorf("modules: cannot signal microtask with nil module")
return func() {} // return a no-op so a deferred done() is always safe to call
}
// Increase global counter here, as high priority tasks do not wait for clearance.
atomic.AddInt32(microTasks, 1)
return m.signalMicroTask()
}
// SignalMicroTask signals the start of a new MicroTask with medium priority.
// The call will wait until a slot becomes available while respecting maxDelay.
// You can also set maxDelay to 0 to use the default value of 1 second.
// The returned "done" function SHOULD be called when the task has finished
// and MUST be called in any case. Failing to do so will have devastating effects.
// You can safely call "done" multiple times; additional calls do nothing.
func (m *Module) SignalMicroTask(maxDelay time.Duration) (done func()) {
if m == nil {
log.Errorf("modules: cannot signal microtask with nil module")
return func() {} // return a no-op so a deferred done() is always safe to call
}
// Set default max delay, if not defined.
if maxDelay <= 0 {
maxDelay = defaultMediumPriorityMaxDelay
}
getMediumPriorityClearance(maxDelay)
return m.signalMicroTask()
}
// SignalLowPriorityMicroTask signals the start of a new MicroTask with low priority.
// The call will wait until a slot becomes available while respecting maxDelay.
// You can also set maxDelay to 0 to use the default value of 3 seconds.
// The returned "done" function SHOULD be called when the task has finished
// and MUST be called in any case. Failing to do so will have devastating effects.
// You can safely call "done" multiple times; additional calls do nothing.
func (m *Module) SignalLowPriorityMicroTask(maxDelay time.Duration) (done func()) {
if m == nil {
log.Errorf("modules: cannot signal microtask with nil module")
return func() {} // return a no-op so a deferred done() is always safe to call
}
// Set default max delay, if not defined.
if maxDelay <= 0 {
maxDelay = defaultLowPriorityMaxDelay
}
getLowPriorityClearance(maxDelay)
return m.signalMicroTask()
}
func (m *Module) signalMicroTask() (done func()) {
// Start microtask for module.
// Global counter is set earlier as required for scheduling.
atomic.AddInt32(m.microTaskCnt, 1)
doneCalled := abool.New()
return func() {
if doneCalled.SetToIf(false, true) {
m.concludeMicroTask()
}
}
}
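
The Signal variants are meant for code that already runs in its own goroutine and only needs to be accounted for as a microtask. Because done is idempotent, it can safely be both deferred as a safety net and called early on the happy path. A sketch, again assuming a *Module m:

done := m.SignalMicroTask(0) // medium priority, 0 selects the default max delay
defer done()                 // safety net: the slot is always released

// ... CPU-bound work that should count against the microtask limit ...

done() // release the slot early, before slow I/O-bound tail work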
func (m *Module) concludeMicroTask() {
// Finish for module.
atomic.AddInt32(m.microTaskCnt, -1)
m.checkIfStopComplete()
// Finish and possibly trigger next task.
atomic.AddInt32(microTasks, -1)
select {
case microTaskFinished <- struct{}{}:
default:
}
}
var (
clearanceQueueBaseSize = 100
clearanceQueueSize = runtime.GOMAXPROCS(0) * clearanceQueueBaseSize
mediumPriorityClearance = make(chan chan struct{}, clearanceQueueSize)
lowPriorityClearance = make(chan chan struct{}, clearanceQueueSize)
triggerLogWriting = log.TriggerWriterChannel()
microTaskSchedulerStarted = abool.NewBool(false)
)
func microTaskScheduler() {
var clearanceSignal chan struct{}
// Create ticker to periodically re-check clearances.
recheck := time.NewTicker(1 * time.Second)
defer recheck.Stop()
// only ever start once
if !microTaskSchedulerStarted.SetToIf(false, true) {
return
}
microTaskManageLoop:
for {
if shutdownFlag.IsSet() {
close(mediumPriorityClearance)
close(lowPriorityClearance)
go microTaskShutdownScheduler()
return
}
// Check if there is space for one more microtask.
if atomic.LoadInt32(microTasks) < atomic.LoadInt32(microTasksThreshhold) { // space left for firing task
// Give Medium clearance.
select {
case mediumPriorityClearance <- struct{}{}:
case clearanceSignal = <-mediumPriorityClearance:
default:
// Give Medium and Low clearance.
select {
case taskTimeslot <- struct{}{}:
continue microTaskManageLoop
case triggerLogWriting <- struct{}{}:
continue microTaskManageLoop
case mediumPriorityClearance <- struct{}{}:
case lowPriorityClearance <- struct{}{}:
case clearanceSignal = <-mediumPriorityClearance:
case clearanceSignal = <-lowPriorityClearance:
default:
// Give Medium, Low and other clearance.
select {
case clearanceSignal = <-mediumPriorityClearance:
case clearanceSignal = <-lowPriorityClearance:
case taskTimeslot <- struct{}{}:
case triggerLogWriting <- struct{}{}:
}
}
}
// increase task counter
atomic.AddInt32(microTasks, 1)
// Send clearance signal and increase task counter.
if clearanceSignal != nil {
close(clearanceSignal)
atomic.AddInt32(microTasks, 1)
}
clearanceSignal = nil
} else {
// wait for signal that a task was completed
select {
case <-microTaskFinished:
case <-time.After(1 * time.Second):
case <-recheck.C:
}
}
}
}
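
The clearance handshake passes a private channel through the queue: the waiter enqueues it and blocks, and the scheduler grants the start by closing it. A minimal, runnable model of just this handshake, independent of the module system:

package main

import (
	"fmt"
	"sync"
)

func main() {
	// Waiters enqueue private signal channels, mirroring mediumPriorityClearance.
	queue := make(chan chan struct{}, 100)

	// Scheduler: grant one clearance per request by closing the signal channel.
	go func() {
		for signal := range queue {
			close(signal)
		}
	}()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			signal := make(chan struct{})
			queue <- signal // request clearance
			<-signal        // wait until the scheduler closes it
			fmt.Println("task", id, "got clearance")
		}(i)
	}
	wg.Wait()
}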
func microTaskShutdownScheduler() {
var clearanceSignal chan struct{}
for {
// During shutdown, always give clearances immediately.
select {
case clearanceSignal = <-mediumPriorityClearance:
case clearanceSignal = <-lowPriorityClearance:
case taskTimeslot <- struct{}{}:
case triggerLogWriting <- struct{}{}:
}
// Give clearance if requested.
if clearanceSignal != nil {
close(clearanceSignal)
atomic.AddInt32(microTasks, 1)
}
clearanceSignal = nil
}
}
func getMediumPriorityClearance(maxDelay time.Duration) {
// Submit signal to scheduler.
signal := make(chan struct{})
select {
case mediumPriorityClearance <- signal:
default:
select {
case mediumPriorityClearance <- signal:
case <-time.After(maxDelay):
// Start without clearance and increase microtask counter.
atomic.AddInt32(microTasks, 1)
}
}
// Wait for signal to start.
select {
case <-signal:
default:
select {
case <-signal:
case <-time.After(maxDelay):
// TODO: Creating the timer two times could lead to 2x max delay.
// Start without clearance and increase microtask counter.
atomic.AddInt32(microTasks, 1)
}
}
}
func getLowPriorityClearance(maxDelay time.Duration) {
// Submit signal to scheduler.
signal := make(chan struct{})
select {
case lowPriorityClearance <- signal:
default:
select {
case lowPriorityClearance <- signal:
case <-time.After(maxDelay):
// Start without clearance and increase microtask counter.
atomic.AddInt32(microTasks, 1)
}
}
// Wait for signal to start.
select {
case <-signal:
default:
select {
case <-signal:
case <-time.After(maxDelay):
// TODO: Creating the timer two times could lead to 2x max delay.
// Start without clearance and increase microtask counter.
atomic.AddInt32(microTasks, 1)
}
}
}
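
The TODO in both helpers notes that the two separate time.After calls can add up to roughly twice maxDelay. One possible fix, sketched here as a hypothetical variant sharing a single timer across both waits (assuming the file's existing imports; like the original, a late grant from the scheduler can still double-count, this only removes the doubled delay):

func getClearanceWithSingleDeadline(queue chan chan struct{}, maxDelay time.Duration) {
	signal := make(chan struct{})
	timer := time.NewTimer(maxDelay) // one deadline covers both waits
	defer timer.Stop()

	// Submit signal to scheduler.
	select {
	case queue <- signal:
	case <-timer.C:
		// Start without clearance and increase microtask counter.
		atomic.AddInt32(microTasks, 1)
		return
	}
	// Wait for signal to start.
	select {
	case <-signal:
	case <-timer.C:
		// Start without clearance and increase microtask counter.
		atomic.AddInt32(microTasks, 1)
	}
}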


@@ -2,6 +2,7 @@ package modules
import (
"context"
"runtime"
"strings"
"sync"
"sync/atomic"
@@ -41,7 +42,7 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte
go func() {
defer mtwWaitGroup.Done()
// exec at slot 1
_ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
mtwOutputChannel <- "1" // slot 1
time.Sleep(mtwSleepDuration * 5)
mtwOutputChannel <- "2" // slot 5
@@ -52,7 +53,7 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte
time.Sleep(mtwSleepDuration * 1)
// clear clearances
_ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
return nil
})
@@ -60,7 +61,7 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte
go func() {
defer mtwWaitGroup.Done()
// exec at slot 2
_ = mtModule.RunLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunLowPriorityMicroTask(mtTestName, 0, func(ctx context.Context) error {
mtwOutputChannel <- "7" // slot 16
return nil
})
@@ -73,7 +74,7 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte
defer mtwWaitGroup.Done()
time.Sleep(mtwSleepDuration * 8)
// exec at slot 10
_ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
mtwOutputChannel <- "4" // slot 10
time.Sleep(mtwSleepDuration * 5)
mtwOutputChannel <- "6" // slot 15
@@ -85,7 +86,7 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte
go func() {
defer mtwWaitGroup.Done()
// exec at slot 3
_ = mtModule.RunMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunMicroTask(mtTestName, 0, func(ctx context.Context) error {
mtwOutputChannel <- "3" // slot 6
time.Sleep(mtwSleepDuration * 7)
mtwOutputChannel <- "5" // slot 13
@@ -121,64 +122,98 @@ var (
// Microtask test functions.
func highPrioTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
_ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
mtoOutputChannel <- "0"
time.Sleep(2 * time.Millisecond)
return nil
})
}
func highPrioSignalledTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
go func() {
done := mtModule.SignalHighPriorityMicroTask()
defer done()
mtoOutputChannel <- "0"
time.Sleep(2 * time.Millisecond)
}()
}
func mediumPrioTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
_ = mtModule.RunMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunMicroTask(mtTestName, 0, func(ctx context.Context) error {
mtoOutputChannel <- "1"
time.Sleep(2 * time.Millisecond)
return nil
})
}
func mediumPrioSignalledTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
go func() {
done := mtModule.SignalMicroTask(0)
defer done()
mtoOutputChannel <- "1"
time.Sleep(2 * time.Millisecond)
}()
}
func lowPrioTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
_ = mtModule.RunLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
_ = mtModule.RunLowPriorityMicroTask(mtTestName, 0, func(ctx context.Context) error {
mtoOutputChannel <- "2"
time.Sleep(2 * time.Millisecond)
return nil
})
}
func TestMicroTaskOrdering(t *testing.T) { //nolint:paralleltest // Too much interference expected.
func lowPrioSignalledTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
go func() {
done := mtModule.SignalLowPriorityMicroTask(0)
defer done()
mtoOutputChannel <- "2"
time.Sleep(2 * time.Millisecond)
}()
}
func TestMicroTaskOrdering(t *testing.T) { //nolint:paralleltest // Too much interference expected.
// skip
if testing.Short() {
t.Skip("skipping test in short mode, as it is not fully deterministic")
}
// Only allow a single concurrent task for testing.
atomic.StoreInt32(microTasksThreshhold, 1)
defer SetMaxConcurrentMicroTasks(runtime.GOMAXPROCS(0))
// init
mtoOutputChannel = make(chan string, 100)
mtoWaitCh = make(chan struct{})
// TEST
mtoWaitGroup.Add(20)
// ensure we only execute one microtask at once
atomic.StoreInt32(microTasksThreshhold, 1)
// kick off
go mediumPrioTaskTester()
go mediumPrioTaskTester()
go lowPrioTaskTester()
go lowPrioTaskTester()
go lowPrioTaskTester()
go mediumPrioTaskTester()
go lowPrioTaskTester()
go mediumPrioTaskTester()
go mediumPrioTaskTester()
go mediumPrioTaskTester()
go lowPrioTaskTester()
go mediumPrioTaskTester()
go lowPrioTaskTester()
go mediumPrioTaskTester()
go lowPrioTaskTester()
go mediumPrioTaskTester()
go lowPrioTaskTester()
go lowPrioTaskTester()
go mediumPrioTaskTester()
go lowPrioTaskTester()
// init all in waiting state
for i := 0; i < 5; i++ {
mtoWaitGroup.Add(6)
go lowPrioTaskTester()
go lowPrioSignalledTaskTester()
go mediumPrioTaskTester()
go mediumPrioSignalledTaskTester()
go highPrioTaskTester()
go highPrioSignalledTaskTester()
}
// wait for all goroutines to be ready
time.Sleep(10 * time.Millisecond)
@@ -197,12 +232,15 @@ func TestMicroTaskOrdering(t *testing.T) { //nolint:paralleltest // Too much int
// collect output
close(mtoOutputChannel)
completeOutput := ""
for s := <-mtoOutputChannel; s != ""; s = <-mtoOutputChannel {
for s := range mtoOutputChannel {
completeOutput += s
}
// check if test succeeded
t.Logf("microTask exec order: %s", completeOutput)
if !strings.Contains(completeOutput, "11111") || !strings.Contains(completeOutput, "22222") {
if !strings.Contains(completeOutput, "000") ||
!strings.Contains(completeOutput, "1111") ||
!strings.Contains(completeOutput, "22222") {
t.Errorf("MicroTask ordering test failed, output was %s. This happens occasionally, please run the test multiple times to verify", completeOutput)
}
}


@@ -37,7 +37,7 @@ func Start() error {
// start microtask scheduler
go microTaskScheduler()
SetMaxConcurrentMicroTasks(runtime.GOMAXPROCS(0) * 2)
SetMaxConcurrentMicroTasks(runtime.GOMAXPROCS(0))
// inter-link modules
err := initDependencies()