Fix microtask signaling and improve tests

This commit is contained in:
Daniel 2022-07-29 22:04:26 +02:00
parent fed9346e46
commit aabd4fef77
3 changed files with 40 additions and 7 deletions

View file

@ -260,6 +260,14 @@ func microTaskScheduler() {
return
}
// Debugging: Print current amount of microtasks.
// go func() {
// for {
// time.Sleep(1 * time.Second)
// log.Debugf("modules: microtasks: %d", atomic.LoadInt32(microTasks))
// }
// }()
for {
if shutdownFlag.IsSet() {
go microTaskShutdownScheduler()
@ -338,6 +346,7 @@ func getMediumPriorityClearance(maxDelay time.Duration) {
case <-time.After(maxDelay):
// Start without clearance and increase microtask counter.
atomic.AddInt32(microTasks, 1)
return
}
}
// Wait for signal to start.
@ -347,9 +356,9 @@ func getMediumPriorityClearance(maxDelay time.Duration) {
select {
case <-signal:
case <-time.After(maxDelay):
// TODO: Creating the timer two times could lead to 2x max delay.
// Start without clearance and increase microtask counter.
atomic.AddInt32(microTasks, 1)
// Don't keep waiting for signal forever.
// Don't increase microtask counter, as the signal was already submitted
// and the counter will be increased by the scheduler.
}
}
}
@ -365,6 +374,7 @@ func getLowPriorityClearance(maxDelay time.Duration) {
case <-time.After(maxDelay):
// Start without clearance and increase microtask counter.
atomic.AddInt32(microTasks, 1)
return
}
}
// Wait for signal to start.
@ -374,9 +384,9 @@ func getLowPriorityClearance(maxDelay time.Duration) {
select {
case <-signal:
case <-time.After(maxDelay):
// TODO: Creating the timer two times could lead to 2x max delay.
// Start without clearance and increase microtask counter.
atomic.AddInt32(microTasks, 1)
// Don't keep waiting for signal forever.
// Don't increase microtask counter, as the signal was already submitted
// and the counter will be increased by the scheduler.
}
}
}

View file

@ -21,6 +21,11 @@ func init() {
func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much interference expected.
// Check if the state is clean.
if atomic.LoadInt32(microTasks) != 0 {
t.Fatalf("cannot start test with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
}
// skip
if testing.Short() {
t.Skip("skipping test in short mode, as it is not fully deterministic")
@ -108,6 +113,12 @@ func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much inte
if completeOutput != mtwExpectedOutput {
t.Errorf("MicroTask waiting test failed, expected sequence %s, got %s", mtwExpectedOutput, completeOutput)
}
// Check if the state is clean.
time.Sleep(10 * time.Millisecond)
if atomic.LoadInt32(microTasks) != 0 {
t.Fatalf("test ends with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
}
}
// Test Microtask ordering.
@ -189,6 +200,12 @@ func lowPrioSignalledTaskTester() {
}
func TestMicroTaskOrdering(t *testing.T) { //nolint:paralleltest // Too much interference expected.
// Check if the state is clean.
if atomic.LoadInt32(microTasks) != 0 {
t.Fatalf("cannot start test with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
}
// skip
if testing.Short() {
t.Skip("skipping test in short mode, as it is not fully deterministic")
@ -243,4 +260,10 @@ func TestMicroTaskOrdering(t *testing.T) { //nolint:paralleltest // Too much int
!strings.Contains(completeOutput, "22222") {
t.Errorf("MicroTask ordering test failed, output was %s. This happens occasionally, please run the test multiple times to verify", completeOutput)
}
// Check if the state is clean.
time.Sleep(10 * time.Millisecond)
if atomic.LoadInt32(microTasks) != 0 {
t.Fatalf("test ends with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
}
}

View file

@ -36,8 +36,8 @@ func Start() error {
defer mgmtLock.Unlock()
// start microtask scheduler
go microTaskScheduler()
SetMaxConcurrentMicroTasks(runtime.GOMAXPROCS(0))
go microTaskScheduler()
// inter-link modules
err := initDependencies()