mirror of
https://github.com/safing/portbase
synced 2025-04-10 20:49:09 +00:00
269 lines
6.2 KiB
Go
269 lines
6.2 KiB
Go
package modules
|
|
|
|
import (
|
|
"context"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
var (
|
|
mtTestName = "microtask test"
|
|
mtModule = initNewModule("microtask test module", nil, nil, nil)
|
|
)
|
|
|
|
func init() {
|
|
go microTaskScheduler()
|
|
}
|
|
|
|
func TestMicroTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much interference expected.
|
|
|
|
// Check if the state is clean.
|
|
if atomic.LoadInt32(microTasks) != 0 {
|
|
t.Fatalf("cannot start test with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
|
|
}
|
|
|
|
// skip
|
|
if testing.Short() {
|
|
t.Skip("skipping test in short mode, as it is not fully deterministic")
|
|
}
|
|
|
|
// init
|
|
mtwWaitGroup := new(sync.WaitGroup)
|
|
mtwOutputChannel := make(chan string, 100)
|
|
mtwExpectedOutput := "1234567"
|
|
mtwSleepDuration := 10 * time.Millisecond
|
|
|
|
// TEST
|
|
mtwWaitGroup.Add(4)
|
|
|
|
// ensure we only execute one microtask at once
|
|
atomic.StoreInt32(microTasksThreshhold, 1)
|
|
|
|
// High Priority - slot 1-5
|
|
go func() {
|
|
defer mtwWaitGroup.Done()
|
|
// exec at slot 1
|
|
_ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
|
|
mtwOutputChannel <- "1" // slot 1
|
|
time.Sleep(mtwSleepDuration * 5)
|
|
mtwOutputChannel <- "2" // slot 5
|
|
return nil
|
|
})
|
|
}()
|
|
|
|
time.Sleep(mtwSleepDuration * 1)
|
|
|
|
// clear clearances
|
|
_ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
|
|
return nil
|
|
})
|
|
|
|
// Low Priority - slot 16
|
|
go func() {
|
|
defer mtwWaitGroup.Done()
|
|
// exec at slot 2
|
|
_ = mtModule.RunLowPriorityMicroTask(mtTestName, 0, func(ctx context.Context) error {
|
|
mtwOutputChannel <- "7" // slot 16
|
|
return nil
|
|
})
|
|
}()
|
|
|
|
time.Sleep(mtwSleepDuration * 1)
|
|
|
|
// High Priority - slot 10-15
|
|
go func() {
|
|
defer mtwWaitGroup.Done()
|
|
time.Sleep(mtwSleepDuration * 8)
|
|
// exec at slot 10
|
|
_ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
|
|
mtwOutputChannel <- "4" // slot 10
|
|
time.Sleep(mtwSleepDuration * 5)
|
|
mtwOutputChannel <- "6" // slot 15
|
|
return nil
|
|
})
|
|
}()
|
|
|
|
// Medium Priority - slot 6-13
|
|
go func() {
|
|
defer mtwWaitGroup.Done()
|
|
// exec at slot 3
|
|
_ = mtModule.RunMicroTask(mtTestName, 0, func(ctx context.Context) error {
|
|
mtwOutputChannel <- "3" // slot 6
|
|
time.Sleep(mtwSleepDuration * 7)
|
|
mtwOutputChannel <- "5" // slot 13
|
|
return nil
|
|
})
|
|
}()
|
|
|
|
// wait for test to finish
|
|
mtwWaitGroup.Wait()
|
|
|
|
// collect output
|
|
close(mtwOutputChannel)
|
|
completeOutput := ""
|
|
for s := <-mtwOutputChannel; s != ""; s = <-mtwOutputChannel {
|
|
completeOutput += s
|
|
}
|
|
// check if test succeeded
|
|
t.Logf("microTask wait order: %s", completeOutput)
|
|
if completeOutput != mtwExpectedOutput {
|
|
t.Errorf("MicroTask waiting test failed, expected sequence %s, got %s", mtwExpectedOutput, completeOutput)
|
|
}
|
|
|
|
// Check if the state is clean.
|
|
time.Sleep(10 * time.Millisecond)
|
|
if atomic.LoadInt32(microTasks) != 0 {
|
|
t.Fatalf("test ends with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
|
|
}
|
|
}
|
|
|
|
// Test Microtask ordering.
|
|
|
|
// Microtask test globals.
|
|
|
|
var (
|
|
mtoWaitGroup sync.WaitGroup
|
|
mtoOutputChannel chan string
|
|
mtoWaitCh chan struct{}
|
|
)
|
|
|
|
// Microtask test functions.
|
|
|
|
func highPrioTaskTester() {
|
|
defer mtoWaitGroup.Done()
|
|
<-mtoWaitCh
|
|
_ = mtModule.RunHighPriorityMicroTask(mtTestName, func(ctx context.Context) error {
|
|
mtoOutputChannel <- "0"
|
|
time.Sleep(2 * time.Millisecond)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func highPrioSignalledTaskTester() {
|
|
defer mtoWaitGroup.Done()
|
|
<-mtoWaitCh
|
|
go func() {
|
|
done := mtModule.SignalHighPriorityMicroTask()
|
|
defer done()
|
|
|
|
mtoOutputChannel <- "0"
|
|
time.Sleep(2 * time.Millisecond)
|
|
}()
|
|
}
|
|
|
|
func mediumPrioTaskTester() {
|
|
defer mtoWaitGroup.Done()
|
|
<-mtoWaitCh
|
|
_ = mtModule.RunMicroTask(mtTestName, 0, func(ctx context.Context) error {
|
|
mtoOutputChannel <- "1"
|
|
time.Sleep(2 * time.Millisecond)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func mediumPrioSignalledTaskTester() {
|
|
defer mtoWaitGroup.Done()
|
|
<-mtoWaitCh
|
|
go func() {
|
|
done := mtModule.SignalMicroTask(0)
|
|
defer done()
|
|
|
|
mtoOutputChannel <- "1"
|
|
time.Sleep(2 * time.Millisecond)
|
|
}()
|
|
}
|
|
|
|
func lowPrioTaskTester() {
|
|
defer mtoWaitGroup.Done()
|
|
<-mtoWaitCh
|
|
_ = mtModule.RunLowPriorityMicroTask(mtTestName, 0, func(ctx context.Context) error {
|
|
mtoOutputChannel <- "2"
|
|
time.Sleep(2 * time.Millisecond)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func lowPrioSignalledTaskTester() {
|
|
defer mtoWaitGroup.Done()
|
|
<-mtoWaitCh
|
|
go func() {
|
|
done := mtModule.SignalLowPriorityMicroTask(0)
|
|
defer done()
|
|
|
|
mtoOutputChannel <- "2"
|
|
time.Sleep(2 * time.Millisecond)
|
|
}()
|
|
}
|
|
|
|
func TestMicroTaskOrdering(t *testing.T) { //nolint:paralleltest // Too much interference expected.
|
|
|
|
// Check if the state is clean.
|
|
if atomic.LoadInt32(microTasks) != 0 {
|
|
t.Fatalf("cannot start test with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
|
|
}
|
|
|
|
// skip
|
|
if testing.Short() {
|
|
t.Skip("skipping test in short mode, as it is not fully deterministic")
|
|
}
|
|
|
|
// Only allow a single concurrent task for testing.
|
|
atomic.StoreInt32(microTasksThreshhold, 1)
|
|
defer SetMaxConcurrentMicroTasks(runtime.GOMAXPROCS(0))
|
|
|
|
// init
|
|
mtoOutputChannel = make(chan string, 100)
|
|
mtoWaitCh = make(chan struct{})
|
|
|
|
// TEST
|
|
|
|
// init all in waiting state
|
|
for i := 0; i < 5; i++ {
|
|
mtoWaitGroup.Add(6)
|
|
go lowPrioTaskTester()
|
|
go lowPrioSignalledTaskTester()
|
|
go mediumPrioTaskTester()
|
|
go mediumPrioSignalledTaskTester()
|
|
go highPrioTaskTester()
|
|
go highPrioSignalledTaskTester()
|
|
}
|
|
|
|
// wait for all goroutines to be ready
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
// sync all goroutines
|
|
close(mtoWaitCh)
|
|
// trigger
|
|
select {
|
|
case microTaskFinished <- struct{}{}:
|
|
default:
|
|
}
|
|
|
|
// wait for test to finish
|
|
mtoWaitGroup.Wait()
|
|
|
|
// collect output
|
|
close(mtoOutputChannel)
|
|
completeOutput := ""
|
|
for s := range mtoOutputChannel {
|
|
completeOutput += s
|
|
}
|
|
|
|
// check if test succeeded
|
|
t.Logf("microTask exec order: %s", completeOutput)
|
|
if !strings.Contains(completeOutput, "000") ||
|
|
!strings.Contains(completeOutput, "1111") ||
|
|
!strings.Contains(completeOutput, "22222") {
|
|
t.Errorf("MicroTask ordering test failed, output was %s. This happens occasionally, please run the test multiple times to verify", completeOutput)
|
|
}
|
|
|
|
// Check if the state is clean.
|
|
time.Sleep(10 * time.Millisecond)
|
|
if atomic.LoadInt32(microTasks) != 0 {
|
|
t.Fatalf("test ends with dirty state: %d microtasks", atomic.LoadInt32(microTasks))
|
|
}
|
|
}
|