safing-portbase/modules/microtasks_test.go

269 lines
6.2 KiB
Go

package modules
import (
"context"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
var (
mtTestName = "microtask test"
mtModule = initNewModule("microtask test module", nil, nil, nil)
)
func init() {
go microTaskScheduler()
}
func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much interference expected.
// Check if the state is clean.
if atomic.LoadInt32(microTasks) != 0 {
t.Fatalf("cannot start test with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
}
// skip
if testing.Short() {
t.Skip("skipping test in short mode, as it is not fully deterministic")
}
// init
mtwWaitGroup := new(sync.WaitGroup)
mtwOutputChannel := make(chan string, 100)
mtwExpectedOutput := "1234567"
mtwSleepDuration := 10 * time.Millisecond
// TEST
mtwWaitGroup.Add(4)
// ensure we only execute one microtask at once
atomic.StoreInt32(microTasksThreshhold, 1)
// High Priority - slot 1-5
go func() {
defer mtwWaitGroup.Done()
// exec at slot 1
_ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
mtwOutputChannel <- "1" // slot 1
time.Sleep(mtwSleepDuration * 5)
mtwOutputChannel <- "2" // slot 5
return nil
})
}()
time.Sleep(mtwSleepDuration * 1)
// clear clearances
_ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
return nil
})
// Low Priority - slot 16
go func() {
defer mtwWaitGroup.Done()
// exec at slot 2
_ = mtModule.RunLowPriorityMicroTask(mtTestName, 0, func(ctx context.Context) error {
mtwOutputChannel <- "7" // slot 16
return nil
})
}()
time.Sleep(mtwSleepDuration * 1)
// High Priority - slot 10-15
go func() {
defer mtwWaitGroup.Done()
time.Sleep(mtwSleepDuration * 8)
// exec at slot 10
_ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
mtwOutputChannel <- "4" // slot 10
time.Sleep(mtwSleepDuration * 5)
mtwOutputChannel <- "6" // slot 15
return nil
})
}()
// Medium Priority - slot 6-13
go func() {
defer mtwWaitGroup.Done()
// exec at slot 3
_ = mtModule.RunMicroTask(mtTestName, 0, func(ctx context.Context) error {
mtwOutputChannel <- "3" // slot 6
time.Sleep(mtwSleepDuration * 7)
mtwOutputChannel <- "5" // slot 13
return nil
})
}()
// wait for test to finish
mtwWaitGroup.Wait()
// collect output
close(mtwOutputChannel)
completeOutput := ""
for s := <-mtwOutputChannel; s != ""; s = <-mtwOutputChannel {
completeOutput += s
}
// check if test succeeded
t.Logf("microTask wait order: %s", completeOutput)
if completeOutput != mtwExpectedOutput {
t.Errorf("MicroTask waiting test failed, expected sequence %s, got %s", mtwExpectedOutput, completeOutput)
}
// Check if the state is clean.
time.Sleep(10 * time.Millisecond)
if atomic.LoadInt32(microTasks) != 0 {
t.Fatalf("test ends with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
}
}
// Test Microtask ordering.
// Microtask test globals.
var (
mtoWaitGroup sync.WaitGroup
mtoOutputChannel chan string
mtoWaitCh chan struct{}
)
// Microtask test functions.
func highPrioTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
_ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
mtoOutputChannel <- "0"
time.Sleep(2 * time.Millisecond)
return nil
})
}
func highPrioSignalledTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
go func() {
done := mtModule.SignalHighPriorityMicroTask()
defer done()
mtoOutputChannel <- "0"
time.Sleep(2 * time.Millisecond)
}()
}
func mediumPrioTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
_ = mtModule.RunMicroTask(mtTestName, 0, func(ctx context.Context) error {
mtoOutputChannel <- "1"
time.Sleep(2 * time.Millisecond)
return nil
})
}
func mediumPrioSignalledTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
go func() {
done := mtModule.SignalMicroTask(0)
defer done()
mtoOutputChannel <- "1"
time.Sleep(2 * time.Millisecond)
}()
}
func lowPrioTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
_ = mtModule.RunLowPriorityMicroTask(mtTestName, 0, func(ctx context.Context) error {
mtoOutputChannel <- "2"
time.Sleep(2 * time.Millisecond)
return nil
})
}
func lowPrioSignalledTaskTester() {
defer mtoWaitGroup.Done()
<-mtoWaitCh
go func() {
done := mtModule.SignalLowPriorityMicroTask(0)
defer done()
mtoOutputChannel <- "2"
time.Sleep(2 * time.Millisecond)
}()
}
func TestMicroTaskOrdering(t *testing.T) { //nolint:paralleltest // Too much interference expected.
// Check if the state is clean.
if atomic.LoadInt32(microTasks) != 0 {
t.Fatalf("cannot start test with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
}
// skip
if testing.Short() {
t.Skip("skipping test in short mode, as it is not fully deterministic")
}
// Only allow a single concurrent task for testing.
atomic.StoreInt32(microTasksThreshhold, 1)
defer SetMaxConcurrentMicroTasks(runtime.GOMAXPROCS(0))
// init
mtoOutputChannel = make(chan string, 100)
mtoWaitCh = make(chan struct{})
// TEST
// init all in waiting state
for i := 0; i < 5; i++ {
mtoWaitGroup.Add(6)
go lowPrioTaskTester()
go lowPrioSignalledTaskTester()
go mediumPrioTaskTester()
go mediumPrioSignalledTaskTester()
go highPrioTaskTester()
go highPrioSignalledTaskTester()
}
// wait for all goroutines to be ready
time.Sleep(10 * time.Millisecond)
// sync all goroutines
close(mtoWaitCh)
// trigger
select {
case microTaskFinished <- struct{}{}:
default:
}
// wait for test to finish
mtoWaitGroup.Wait()
// collect output
close(mtoOutputChannel)
completeOutput := ""
for s := range mtoOutputChannel {
completeOutput += s
}
// check if test succeeded
t.Logf("microTask exec order: %s", completeOutput)
if !strings.Contains(completeOutput, "000") ||
!strings.Contains(completeOutput, "1111") ||
!strings.Contains(completeOutput, "22222") {
t.Errorf("MicroTask ordering test failed, output was %s. This happens occasionally, please run the test multiple times to verify", completeOutput)
}
// Check if the state is clean.
time.Sleep(10 * time.Millisecond)
if atomic.LoadInt32(microTasks) != 0 {
t.Fatalf("test ends with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
}
}