mirror of
https://github.com/safing/portbase
synced 2025-04-10 20:49:09 +00:00
255 lines
6 KiB
Go
255 lines
6 KiB
Go
package modules
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"runtime/pprof"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func init() {
|
|
go taskQueueHandler()
|
|
go taskScheduleHandler()
|
|
|
|
go func() {
|
|
<-time.After(30 * time.Second)
|
|
fmt.Fprintln(os.Stderr, "taking too long")
|
|
_ = pprof.Lookup("goroutine").WriteTo(os.Stderr, 2)
|
|
os.Exit(1)
|
|
}()
|
|
|
|
// always trigger task timeslot for testing
|
|
go func() {
|
|
for {
|
|
taskTimeslot <- struct{}{}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Test queued tasks.
|
|
|
|
// Queued task test globals.
|
|
|
|
var (
|
|
qtWg sync.WaitGroup
|
|
qtOutputChannel chan string
|
|
qtSleepDuration time.Duration
|
|
qtModule *Module
|
|
)
|
|
|
|
func init() {
|
|
qtModule = initNewModule("task test module", nil, nil, nil)
|
|
qtModule.status = StatusOnline
|
|
}
|
|
|
|
// Queued task test functions.
|
|
|
|
func queuedTaskTester(s string) {
|
|
qtModule.NewTask(s, func(ctx context.Context, t *Task) error {
|
|
time.Sleep(qtSleepDuration * 2)
|
|
qtOutputChannel <- s
|
|
qtWg.Done()
|
|
return nil
|
|
}).Queue()
|
|
}
|
|
|
|
func prioritizedTaskTester(s string) {
|
|
qtModule.NewTask(s, func(ctx context.Context, t *Task) error {
|
|
time.Sleep(qtSleepDuration * 2)
|
|
qtOutputChannel <- s
|
|
qtWg.Done()
|
|
return nil
|
|
}).QueuePrioritized()
|
|
}
|
|
|
|
func TestQueuedTask(t *testing.T) { //nolint:paralleltest // Too much interference expected.
|
|
// skip
|
|
if testing.Short() {
|
|
t.Skip("skipping test in short mode, as it is not fully deterministic")
|
|
}
|
|
|
|
// init
|
|
expectedOutput := "0123456789"
|
|
qtSleepDuration = 20 * time.Millisecond
|
|
qtOutputChannel = make(chan string, 100)
|
|
qtWg.Add(10)
|
|
|
|
// TEST
|
|
queuedTaskTester("0")
|
|
queuedTaskTester("1")
|
|
queuedTaskTester("3")
|
|
queuedTaskTester("4")
|
|
queuedTaskTester("6")
|
|
queuedTaskTester("7")
|
|
queuedTaskTester("9")
|
|
|
|
time.Sleep(qtSleepDuration * 3)
|
|
prioritizedTaskTester("2")
|
|
time.Sleep(qtSleepDuration * 6)
|
|
prioritizedTaskTester("5")
|
|
time.Sleep(qtSleepDuration * 6)
|
|
prioritizedTaskTester("8")
|
|
|
|
// wait for test to finish
|
|
qtWg.Wait()
|
|
|
|
// collect output
|
|
close(qtOutputChannel)
|
|
completeOutput := ""
|
|
for s := <-qtOutputChannel; s != ""; s = <-qtOutputChannel {
|
|
completeOutput += s
|
|
}
|
|
// check if test succeeded
|
|
if completeOutput != expectedOutput {
|
|
t.Errorf("QueuedTask test failed, expected sequence %s, got %s", expectedOutput, completeOutput)
|
|
}
|
|
}
|
|
|
|
// Test scheduled tasks.
|
|
|
|
// Scheduled task test globals.
|
|
|
|
var (
|
|
stWg sync.WaitGroup
|
|
stOutputChannel chan string
|
|
stSleepDuration time.Duration
|
|
stWaitCh chan bool
|
|
)
|
|
|
|
// Scheduled task test functions.
|
|
|
|
func scheduledTaskTester(s string, sched time.Time) {
|
|
qtModule.NewTask(s, func(ctx context.Context, t *Task) error {
|
|
time.Sleep(stSleepDuration)
|
|
stOutputChannel <- s
|
|
stWg.Done()
|
|
return nil
|
|
}).Schedule(sched)
|
|
}
|
|
|
|
func TestScheduledTaskWaiting(t *testing.T) { //nolint:paralleltest // Too much interference expected.
|
|
|
|
// skip
|
|
if testing.Short() {
|
|
t.Skip("skipping test in short mode, as it is not fully deterministic")
|
|
}
|
|
|
|
// init
|
|
expectedOutput := "0123456789"
|
|
stSleepDuration = 10 * time.Millisecond
|
|
stOutputChannel = make(chan string, 100)
|
|
stWaitCh = make(chan bool)
|
|
|
|
stWg.Add(10)
|
|
|
|
// TEST
|
|
scheduledTaskTester("4", time.Now().Add(stSleepDuration*8))
|
|
scheduledTaskTester("0", time.Now().Add(stSleepDuration*0))
|
|
scheduledTaskTester("8", time.Now().Add(stSleepDuration*16))
|
|
scheduledTaskTester("1", time.Now().Add(stSleepDuration*2))
|
|
scheduledTaskTester("7", time.Now().Add(stSleepDuration*14))
|
|
scheduledTaskTester("9", time.Now().Add(stSleepDuration*18))
|
|
scheduledTaskTester("3", time.Now().Add(stSleepDuration*6))
|
|
scheduledTaskTester("2", time.Now().Add(stSleepDuration*4))
|
|
scheduledTaskTester("6", time.Now().Add(stSleepDuration*12))
|
|
scheduledTaskTester("5", time.Now().Add(stSleepDuration*10))
|
|
|
|
// wait for test to finish
|
|
close(stWaitCh)
|
|
stWg.Wait()
|
|
|
|
// collect output
|
|
close(stOutputChannel)
|
|
completeOutput := ""
|
|
for s := <-stOutputChannel; s != ""; s = <-stOutputChannel {
|
|
completeOutput += s
|
|
}
|
|
// check if test succeeded
|
|
if completeOutput != expectedOutput {
|
|
t.Errorf("ScheduledTask test failed, expected sequence %s, got %s", expectedOutput, completeOutput)
|
|
}
|
|
}
|
|
|
|
func TestRequeueingTask(t *testing.T) { //nolint:paralleltest // Too much interference expected.
|
|
blockWg := &sync.WaitGroup{}
|
|
wg := &sync.WaitGroup{}
|
|
|
|
// block task execution
|
|
blockWg.Add(1) // mark done at beginning
|
|
wg.Add(2) // mark done at end
|
|
block := qtModule.NewTask("TestRequeueingTask:block", func(ctx context.Context, t *Task) error {
|
|
blockWg.Done()
|
|
time.Sleep(100 * time.Millisecond)
|
|
wg.Done()
|
|
return nil
|
|
}).StartASAP()
|
|
// make sure first task has started
|
|
blockWg.Wait()
|
|
// fmt.Printf("%s: %+v\n", time.Now(), block)
|
|
|
|
// schedule again while executing
|
|
blockWg.Add(1) // mark done at beginning
|
|
block.StartASAP()
|
|
// fmt.Printf("%s: %+v\n", time.Now(), block)
|
|
|
|
// test task
|
|
wg.Add(1)
|
|
task := qtModule.NewTask("TestRequeueingTask:test", func(ctx context.Context, t *Task) error {
|
|
wg.Done()
|
|
return nil
|
|
}).Schedule(time.Now().Add(2 * time.Second))
|
|
|
|
// reschedule
|
|
task.Schedule(time.Now().Add(1 * time.Second))
|
|
task.Queue()
|
|
task.QueuePrioritized()
|
|
task.StartASAP()
|
|
wg.Wait()
|
|
time.Sleep(100 * time.Millisecond) // let tasks finalize execution
|
|
|
|
// do it again
|
|
|
|
// block task execution (while first block task is still running!)
|
|
blockWg.Add(1) // mark done at beginning
|
|
wg.Add(1) // mark done at end
|
|
block.StartASAP()
|
|
blockWg.Wait()
|
|
// reschedule
|
|
wg.Add(1)
|
|
task.Schedule(time.Now().Add(1 * time.Second))
|
|
task.Queue()
|
|
task.QueuePrioritized()
|
|
task.StartASAP()
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestQueueSuccession(t *testing.T) { //nolint:paralleltest // Too much interference expected.
|
|
var cnt int
|
|
wg := &sync.WaitGroup{}
|
|
wg.Add(10)
|
|
|
|
tt := qtModule.NewTask("TestRequeueingTask:test", func(ctx context.Context, task *Task) error {
|
|
time.Sleep(10 * time.Millisecond)
|
|
wg.Done()
|
|
cnt++
|
|
fmt.Printf("completed succession %d\n", cnt)
|
|
switch cnt {
|
|
case 1, 4, 6:
|
|
task.Queue()
|
|
case 2, 5, 8:
|
|
task.StartASAP()
|
|
case 3, 7, 9:
|
|
task.Schedule(time.Now().Add(10 * time.Millisecond))
|
|
}
|
|
return nil
|
|
})
|
|
// fmt.Printf("%+v\n", tt)
|
|
tt.StartASAP()
|
|
// time.Sleep(100 * time.Millisecond)
|
|
// fmt.Printf("%+v\n", tt)
|
|
|
|
wg.Wait()
|
|
}
|