Add new micro-task scheduler implementation

This patch adds a new implementation for the current global
micro-task scheduler. It is backward compatible and works as
a drop-in replacement. The new implementation is struct-based
and designed to make unit testing easier by allowing the
scheduler to be mocked out.

This patch is the first in a series of changes that refactor
portbase's module system to be less global-state heavy and
thus support easier and more powerful testing of dependents.
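A rough sketch of the intended direction (illustrative only; the test name,
import alias and wiring are hypothetical, the scheduler API is the one added
in this patch): a dependent could run against a private scheduler instance in
its tests instead of the global one.

    import (
        "context"
        "testing"

        "github.com/safing/portbase/modules"
    )

    func TestWithLocalScheduler(t *testing.T) {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        sched := modules.NewMicroTaskScheduler()
        sched.SetMaxConcurrentMicroTasks(4)
        sched.Start(ctx)

        // hand sched to the code under test instead of relying on the
        // package-level DefaultMicroTaskScheduler and other global state
    }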
Patrick Pacher 2020-07-12 18:52:58 +02:00
parent 568dd3bf33
commit 127f3f82c9
3 changed files with 371 additions and 151 deletions


@@ -2,27 +2,270 @@ package modules
import (
"context"
"reflect"
"sync"
"sync/atomic"
"time"
"github.com/safing/portbase/log"
"github.com/tevino/abool"
)
// TODO: getting some errors when running the tests with nanosecond precision:
// (1) panic: sync: WaitGroup is reused before previous Wait has returned - should theoretically not happen
// (2) sometimes there seems to be some kind of race condition, the test hangs and does not complete
var (
	microTasks           *int32
	microTasksThreshhold *int32
	microTaskFinished    = make(chan struct{}, 1)

	mediumPriorityClearance = make(chan struct{})
	lowPriorityClearance    = make(chan struct{})

	triggerLogWriting = log.TriggerWriterChannel()
)

var emptyStructVal = reflect.ValueOf(struct{}{})

// MicroTaskFunc defines the function prototype for micro tasks.
type MicroTaskFunc func(ctx context.Context) error
// MicroTaskScheduler schedules the execution of micro tasks. In addition
// to normal micro tasks, the scheduler supports medium and low priority
// tasks that are only executed if execution slots are available. While
// all normal (high priority) tasks are executed immediately, low and medium
// priority tasks are only run if fewer tasks than the configured concurrency
// limit are currently active. The execution of each task is wrapped
// with recovery and reporting methods.
type MicroTaskScheduler struct {
// activeTasks counts the number of currently running
// tasks.
// Note that high priority tasks do not honor the task
// scheduler and are executed immediately.
activeTasks int32
// concurrencyLimit limits the number of tasks that are
// allowed to be executed concurrently.
concurrencyLimit int32
// taskFinishNotifier is used to signal the scheduler
// goroutine that another task may run. Senders must not
// expect the channel to be read and should use a select
// block with a default case.
taskFinishNotifier chan struct{}
// mediumPriorityClearance is used to trigger a medium priority
// task waiting for execution.
mediumPriorityClearance chan struct{}
// lowPriorityClearance is used to trigger a low priority
// task waiting for execution.
lowPriorityClearance chan struct{}
// idleNotifiersLock is used to protect idleNotifiers
idleNotifiersLock sync.Mutex
// idleNotifiers is a slice of channels that are
// notified when the task scheduler is idle.
idleNotifiers []chan<- struct{}
// wg can be used to wait for all currently running tasks
// to finish.
wg sync.WaitGroup
// start is used to start the micro-task scheduler once.
start sync.Once
}
// Wait blocks until the micro task scheduler and all of its running tasks have shut down.
func (ts *MicroTaskScheduler) Wait() {
ts.wg.Wait()
}
// NewMicroTaskScheduler returns a new microtask scheduler.
func NewMicroTaskScheduler() *MicroTaskScheduler {
return &MicroTaskScheduler{
taskFinishNotifier: make(chan struct{}),
mediumPriorityClearance: make(chan struct{}),
lowPriorityClearance: make(chan struct{}),
}
}
// Start starts the microtask scheduler. If the scheduler has already
// been started, Start() is a no-op. If ctx is cancelled, the task
// scheduler will shut down and cannot be started again.
func (ts *MicroTaskScheduler) Start(ctx context.Context) {
ts.start.Do(func() {
ts.wg.Add(1)
go ts.scheduler(ctx)
})
}
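// Lifecycle sketch (illustrative only, not part of this patch):
//
//   ts := NewMicroTaskScheduler()
//   ts.SetMaxConcurrentMicroTasks(8)
//
//   ctx, cancel := context.WithCancel(context.Background())
//   ts.Start(ctx)
//
//   // ... schedule micro tasks ...
//
//   cancel()  // shuts the scheduler down; it cannot be started again
//   ts.Wait() // blocks until the scheduler goroutine and all running tasks have finished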
// SetMaxConcurrentMicroTasks sets the maximum number of tasks
// that are allowed to execute concurrently.
func (ts *MicroTaskScheduler) SetMaxConcurrentMicroTasks(limit int) {
if limit < 4 {
limit = 4
}
atomic.StoreInt32(&ts.concurrencyLimit, int32(limit))
}
// RunNow executes fn as a micro task right now without waiting for a
// dedicated execution slot.
func (ts *MicroTaskScheduler) RunNow(m *Module, name string, fn MicroTaskFunc) error {
now := make(chan struct{})
close(now)
atomic.AddInt32(&ts.activeTasks, 1) // the task starts immediately without a scheduler-granted slot, so account for it here
return ts.waitAndRun(m, now, time.Second, name, fn)
}
// RunLowPriority waits for a low-priority execution slot and executes fn.
func (ts *MicroTaskScheduler) RunLowPriority(m *Module, name string, fn MicroTaskFunc) error {
return ts.waitAndRun(m, ts.lowPriorityClearance, lowPriorityMaxDelay, name, fn)
}
// RunMediumPriority waits for a medium-priority execution slot and
// executes fn.
func (ts *MicroTaskScheduler) RunMediumPriority(m *Module, name string, fn MicroTaskFunc) error {
return ts.waitAndRun(m, ts.mediumPriorityClearance, mediumPriorityMaxDelay, name, fn)
}
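// The three entry points differ only in how long they may wait for an execution
// slot. A hedged usage sketch (m is assumed to be an initialized *Module, the
// task names are made up):
//
//   _ = ts.RunNow(m, "flush", func(ctx context.Context) error { return nil })              // starts immediately
//   _ = ts.RunMediumPriority(m, "cleanup", func(ctx context.Context) error { return nil }) // waits up to mediumPriorityMaxDelay
//   _ = ts.RunLowPriority(m, "log-rotate", func(ctx context.Context) error { return nil }) // waits up to lowPriorityMaxDelay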
// AddIdleNotifier adds ch as an idle notifier. Whenever the micro task scheduler
// becomes idle, it tries to notify one of the listening channels and blocks until
// one of them (or a waiting task) receives. Callers should normally use
// unbuffered channels for notifiers.
//
// Example:
//
// idle := make(chan struct{})
// go func() {
// for range idle {
// // Microtask scheduler is idle ...
// }
// }()
// ts.AddIdleNotifier(idle)
//
func (ts *MicroTaskScheduler) AddIdleNotifier(ch chan<- struct{}) {
ts.idleNotifiersLock.Lock()
defer ts.idleNotifiersLock.Unlock()
ts.idleNotifiers = append(ts.idleNotifiers, ch)
}
// runMicroTask immediately executes fn with task recovery and
// scheduling notifiers. The caller is responsible for increasing ts.wg
// before calling runMicroTask.
func (ts *MicroTaskScheduler) runMicroTask(m *Module, name string, fn MicroTaskFunc) (err error) {
defer Recoverf(m, &err, name, "microtask")
defer ts.wg.Done()
defer func() {
atomic.AddInt32(&ts.activeTasks, -1)
select {
case ts.taskFinishNotifier <- struct{}{}:
default:
}
}()
return fn(m.Ctx)
}
// waitAndRun increases the wait group of ts and waits for either trigger or maxDelay to allow execution
// of fn. Once triggered, waitAndRun calls runMicroTask.
func (ts *MicroTaskScheduler) waitAndRun(m *Module, trigger <-chan struct{}, maxDelay time.Duration, name string, fn MicroTaskFunc) (err error) {
ts.wg.Add(1)
select {
case <-trigger:
default:
select {
case <-time.After(maxDelay):
case <-trigger:
case <-m.Ctx.Done():
// NOTE(ppacher): the legacy task scheduler (<= v0.7.2) did not
// wait for context cancellation and risked racing
// between starting a task and blocking on the wait group.
// It is unclear whether we can count that as backwards
// compatible.
ts.wg.Done()
return ErrShuttingDown
}
if m.Ctx.Err() != nil {
// <-trigger may fire if it's closed during scheduler shutdown.
return ErrShuttingDown
}
}
return ts.runMicroTask(m, name, fn)
}
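// Note on the trigger mechanism (a sketch for readers of this diff): RunNow hands
// waitAndRun a channel that is already closed, so the first select case is always
// ready and the task starts without consulting the scheduler:
//
//   now := make(chan struct{})
//   close(now) // receiving from a closed channel never blocks
//
// The priority helpers instead pass the scheduler-owned clearance channels, so a
// task either receives a clearance, times out after maxDelay, or aborts when the
// module context is cancelled.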
func (ts *MicroTaskScheduler) canFire() bool {
return atomic.LoadInt32(&ts.activeTasks) < atomic.LoadInt32(&ts.concurrencyLimit)
}
func (ts *MicroTaskScheduler) notifyAnyWaiter(ctx context.Context) (isTask bool) {
ts.idleNotifiersLock.Lock()
cases := make([]reflect.SelectCase, len(ts.idleNotifiers)+3)
cases[0] = reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: reflect.ValueOf(ts.mediumPriorityClearance),
Send: emptyStructVal,
}
cases[1] = reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: reflect.ValueOf(ts.lowPriorityClearance),
Send: emptyStructVal,
}
cases[2] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
}
for idx, ch := range ts.idleNotifiers {
cases[idx+3] = reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: reflect.ValueOf(ch),
Send: emptyStructVal,
}
}
ts.idleNotifiersLock.Unlock()
chosen, _, _ := reflect.Select(cases)
switch chosen {
case 0, 1: // medium or low priority clearance
return true
case 2: // context cancelled
return false
default: // external idle notifiers
return false
}
}
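// For readers of this diff: notifyAnyWaiter is the dynamic equivalent of the
// following static select; reflect.Select is needed because the number of idle
// notifiers is only known at runtime (sketch, one send case per notifier):
//
//   select {
//   case ts.mediumPriorityClearance <- struct{}{}:
//       return true
//   case ts.lowPriorityClearance <- struct{}{}:
//       return true
//   case <-ctx.Done():
//       return false
//   case notifier <- struct{}{}: // repeated for every registered idle notifier
//       return false
//   }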
func (ts *MicroTaskScheduler) scheduler(ctx context.Context) {
defer ts.wg.Done()
defer close(ts.lowPriorityClearance)
defer close(ts.mediumPriorityClearance)
for {
if ts.canFire() {
select {
case <-ctx.Done():
return
case ts.mediumPriorityClearance <- struct{}{}:
atomic.AddInt32(&ts.activeTasks, 1)
default:
// offer the slot to any waiting task or idle notifier
if isTask := ts.notifyAnyWaiter(ctx); isTask {
// increase task counter
atomic.AddInt32(&ts.activeTasks, 1)
}
}
} else {
// wait for signal that a task was completed
select {
case <-ctx.Done():
return
case <-ts.taskFinishNotifier:
case <-time.After(1 * time.Second):
}
}
}
}
// DefaultMicroTaskScheduler is the default micro task scheduler.
var DefaultMicroTaskScheduler *MicroTaskScheduler
const (
mediumPriorityMaxDelay = 1 * time.Second
@@ -30,174 +273,90 @@ const (
)
func init() {
var microTasksVal int32
microTasks = &microTasksVal
var microTasksThreshholdVal int32
microTasksThreshhold = &microTasksThreshholdVal
DefaultMicroTaskScheduler = NewMicroTaskScheduler()
// trigger log writing when the microtask scheduler
// is idle.
DefaultMicroTaskScheduler.AddIdleNotifier(log.TriggerWriterChannel())
// allow normal tasks to execute if the micro-task scheduler is
// idle.
DefaultMicroTaskScheduler.AddIdleNotifier(taskTimeslot)
}
// SetMaxConcurrentMicroTasks sets the maximum number of microtasks that should be run concurrently.
// SetMaxConcurrentMicroTasks sets the maximum number of microtasks that should be run
// concurrently. It uses the default micro task scheduler instance.
func SetMaxConcurrentMicroTasks(n int) {
if n < 4 {
atomic.StoreInt32(microTasksThreshhold, 4)
} else {
atomic.StoreInt32(microTasksThreshhold, int32(n))
}
DefaultMicroTaskScheduler.SetMaxConcurrentMicroTasks(n)
}
// StartMicroTask starts a new MicroTask with high priority. It will start immediately. The call starts a new goroutine and returns immediately. The given function will be executed and panics caught. The supplied name must not be changed.
// StartMicroTask starts a new MicroTask with high priority. It will start immediately.
// The call starts a new goroutine and returns immediately. The given function will
// be executed and panics caught. The supplied name must not be changed.
func (m *Module) StartMicroTask(name *string, fn func(context.Context) error) {
go func() {
err := m.RunMicroTask(name, fn)
if err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err)
if err := m.RunMicroTask(name, fn); err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, name, err)
}
}()
}
// StartMediumPriorityMicroTask starts a new MicroTask with medium priority. The call starts a new goroutine and returns immediately. It will wait until a slot becomes available (max 3 seconds). The given function will be executed and panics caught. The supplied name must not be changed.
// StartMediumPriorityMicroTask starts a new MicroTask with medium priority.
// The call starts a new goroutine and returns immediately. It will wait until
// a slot becomes available (max 3 seconds). The given function will be
// executed and panics caught. The supplied name must not be changed.
func (m *Module) StartMediumPriorityMicroTask(name *string, fn func(context.Context) error) {
go func() {
err := m.RunMediumPriorityMicroTask(name, fn)
if err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err)
if err := m.RunMediumPriorityMicroTask(name, fn); err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, name, err)
}
}()
}
// StartLowPriorityMicroTask starts a new MicroTask with low priority. The call starts a new goroutine and returns immediately. It will wait until a slot becomes available (max 15 seconds). The given function will be executed and panics caught. The supplied name must not be changed.
// StartLowPriorityMicroTask starts a new MicroTask with low priority.
// The call starts a new goroutine and returns immediately. It will wait
// until a slot becomes available (max 15 seconds). The given function
// will be executed and panics caught. The supplied name must not be changed.
func (m *Module) StartLowPriorityMicroTask(name *string, fn func(context.Context) error) {
go func() {
err := m.RunLowPriorityMicroTask(name, fn)
if err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, *name, err)
if err := m.RunLowPriorityMicroTask(name, fn); err != nil {
log.Warningf("%s: microtask %s failed: %s", m.Name, name, err)
}
}()
}
// RunMicroTask runs a new MicroTask with high priority. It will start immediately. The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed.
// RunMicroTask runs a new MicroTask with high priority.
// It will start immediately. The call blocks until finished.
// The given function will be executed and panics caught.
// The supplied name must not be changed.
func (m *Module) RunMicroTask(name *string, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
return errNoModule
}
atomic.AddInt32(microTasks, 1) // increase global counter here, as high priority tasks are not started by the scheduler, where this counter is usually increased
return m.runMicroTask(name, fn)
defer m.countTask()()
return DefaultMicroTaskScheduler.RunNow(m, *name, fn)
}
// RunMediumPriorityMicroTask runs a new MicroTask with medium priority. It will wait until a slot becomes available (max 3 seconds). The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed.
// RunMediumPriorityMicroTask runs a new MicroTask with medium priority.
// It will wait until a slot becomes available (max 3 seconds). The call blocks
// until finished. The given function will be executed and panics caught.
// The supplied name must not be changed.
func (m *Module) RunMediumPriorityMicroTask(name *string, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
return errNoModule
}
// check if we can go immediately
select {
case <-mediumPriorityClearance:
default:
// wait for go or max delay
select {
case <-mediumPriorityClearance:
case <-time.After(mediumPriorityMaxDelay):
}
}
return m.runMicroTask(name, fn)
defer m.countTask()()
return DefaultMicroTaskScheduler.RunMediumPriority(m, *name, fn)
}
// RunLowPriorityMicroTask runs a new MicroTask with low priority. It will wait until a slot becomes available (max 15 seconds). The call blocks until finished. The given function will be executed and panics caught. The supplied name must not be changed.
// RunLowPriorityMicroTask runs a new MicroTask with low priority.
// It will wait until a slot becomes available (max 15 seconds).
// The call blocks until finished. The given function will be executed
// and panics caught. The supplied name must not be changed.
func (m *Module) RunLowPriorityMicroTask(name *string, fn func(context.Context) error) error {
if m == nil {
log.Errorf(`modules: cannot start microtask "%s" with nil module`, *name)
return errNoModule
}
// check if we can go immediately
select {
case <-lowPriorityClearance:
default:
// wait for go or max delay
select {
case <-lowPriorityClearance:
case <-time.After(lowPriorityMaxDelay):
}
}
return m.runMicroTask(name, fn)
defer m.countTask()()
return DefaultMicroTaskScheduler.RunLowPriority(m, *name, fn)
}
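// Module-level usage is unchanged by this patch. A hedged caller sketch
// (myModule and the task body are hypothetical):
//
//   name := "cache cleanup"
//   err := myModule.RunLowPriorityMicroTask(&name, func(ctx context.Context) error {
//       // do the work; any returned error (or recovered panic) is handed back as err
//       return nil
//   })
//   _ = err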
func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err error) {
	// start for module
	// hint: only microTasks global var is important for scheduling, others can be set here
	atomic.AddInt32(m.microTaskCnt, 1)
	m.waitGroup.Add(1)
	// set up recovery
	defer func() {
		// recover from panic
		panicVal := recover()
		if panicVal != nil {
			me := m.NewPanicError(*name, "microtask", panicVal)
			me.Report()
			log.Errorf("%s: microtask %s panicked: %s", m.Name, *name, panicVal)
			err = me
		}
		// finish for module
		atomic.AddInt32(m.microTaskCnt, -1)
		m.waitGroup.Done()
		// finish and possibly trigger next task
		atomic.AddInt32(microTasks, -1)
		select {
		case microTaskFinished <- struct{}{}:
		default:
		}
	}()
	// run
	err = fn(m.Ctx)
	return //nolint:nakedret // need to use named return val in order to change in defer
}
func (m *Module) countTask() func() {
	atomic.AddInt32(m.microTaskCnt, 1)
	m.waitGroup.Add(1)
	return func() {
		// finish for module
		atomic.AddInt32(m.microTaskCnt, -1)
		m.waitGroup.Done()
	}
}
var microTaskSchedulerStarted = abool.NewBool(false)
func microTaskScheduler() {
// only ever start once
if !microTaskSchedulerStarted.SetToIf(false, true) {
return
}
microTaskManageLoop:
for {
if shutdownFlag.IsSet() {
close(mediumPriorityClearance)
close(lowPriorityClearance)
return
}
if atomic.LoadInt32(microTasks) < atomic.LoadInt32(microTasksThreshhold) { // space left for firing task
select {
case mediumPriorityClearance <- struct{}{}:
default:
select {
case taskTimeslot <- struct{}{}:
continue microTaskManageLoop
case triggerLogWriting <- struct{}{}:
continue microTaskManageLoop
case mediumPriorityClearance <- struct{}{}:
case lowPriorityClearance <- struct{}{}:
}
}
// increase task counter
atomic.AddInt32(microTasks, 1)
} else {
// wait for signal that a task was completed
select {
case <-microTaskFinished:
case <-time.After(1 * time.Second):
}
}
}
}


@@ -2,11 +2,14 @@ package modules
import (
"context"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
var (
@@ -15,11 +18,59 @@ var (
)
func init() {
go microTaskScheduler()
// TODO(ppacher): remove once refactoring is done
DefaultMicroTaskScheduler.Start(context.Background())
}
func TestMicroTaskScheduler(t *testing.T) {
sched := NewMicroTaskScheduler()
sched.SetMaxConcurrentMicroTasks(4)
done := make(chan struct{})
defer close(done)
// occupy three of our 4 slots.
go sched.RunNow(mtModule, "block", func(context.Context) error { <-done; return nil })
go sched.RunNow(mtModule, "block", func(context.Context) error { <-done; return nil })
go sched.RunNow(mtModule, "block", func(context.Context) error { <-done; return nil })
results := make(chan string, 100)
go sched.RunLowPriority(mtModule, "low-1", func(context.Context) error {
results <- "low"
return nil
})
go sched.RunLowPriority(mtModule, "low-2", func(context.Context) error {
results <- "low"
return nil
})
go sched.RunMediumPriority(mtModule, "medium-1", func(context.Context) error {
results <- "medium"
return nil
})
go sched.RunMediumPriority(mtModule, "medium-2", func(context.Context) error {
results <- "medium"
return nil
})
var str []string
for s := range results {
str = append(str, s)
if len(str) == 4 {
close(results)
}
}
assert.Equal(t, []string{"medium", "medium", "low", "low"}, str)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sched.Start(ctx)
}
// test waiting.
func TestMicroTaskWaiting(t *testing.T) {
DefaultMicroTaskScheduler.Start(context.Background())
// skip
if testing.Short() {
t.Skip("skipping test in short mode, as it is not fully deterministic")
@@ -29,19 +80,20 @@ func TestMicroTaskWaiting(t *testing.T) {
mtwWaitGroup := new(sync.WaitGroup)
mtwOutputChannel := make(chan string, 100)
mtwExpectedOutput := "1234567"
mtwSleepDuration := 10 * time.Millisecond
mtwSleepDuration := 1 * time.Millisecond
// TEST
mtwWaitGroup.Add(4)
// ensure we only execute one microtask at once
atomic.StoreInt32(microTasksThreshhold, 1)
atomic.StoreInt32(&DefaultMicroTaskScheduler.concurrencyLimit, 1)
// High Priority - slot 1-5
go func() {
defer mtwWaitGroup.Done()
// exec at slot 1
_ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error {
name := "1 and 2"
_ = mtModule.RunMicroTask(&name, func(ctx context.Context) error {
mtwOutputChannel <- "1" // slot 1
time.Sleep(mtwSleepDuration * 5)
mtwOutputChannel <- "2" // slot 5
@@ -49,6 +101,7 @@ func TestMicroTaskWaiting(t *testing.T) {
})
}()
runtime.Gosched()
time.Sleep(mtwSleepDuration * 1)
// clear clearances
@@ -60,12 +113,14 @@ func TestMicroTaskWaiting(t *testing.T) {
go func() {
defer mtwWaitGroup.Done()
// exec at slot 2
_ = mtModule.RunLowPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
name := "7"
_ = mtModule.RunLowPriorityMicroTask(&name, func(ctx context.Context) error {
mtwOutputChannel <- "7" // slot 16
return nil
})
}()
runtime.Gosched()
time.Sleep(mtwSleepDuration * 1)
// High Priority - slot 10-15
@@ -73,7 +128,8 @@ func TestMicroTaskWaiting(t *testing.T) {
defer mtwWaitGroup.Done()
time.Sleep(mtwSleepDuration * 8)
// exec at slot 10
_ = mtModule.RunMicroTask(&mtTestName, func(ctx context.Context) error {
name := "4 and 6"
_ = mtModule.RunMicroTask(&name, func(ctx context.Context) error {
mtwOutputChannel <- "4" // slot 10
time.Sleep(mtwSleepDuration * 5)
mtwOutputChannel <- "6" // slot 15
@@ -85,7 +141,8 @@ func TestMicroTaskWaiting(t *testing.T) {
go func() {
defer mtwWaitGroup.Done()
// exec at slot 3
_ = mtModule.RunMediumPriorityMicroTask(&mtTestName, func(ctx context.Context) error {
name := "3 and 5"
_ = mtModule.RunMediumPriorityMicroTask(&name, func(ctx context.Context) error {
mtwOutputChannel <- "3" // slot 6
time.Sleep(mtwSleepDuration * 7)
mtwOutputChannel <- "5" // slot 13
@@ -145,6 +202,7 @@ func TestMicroTaskOrdering(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode, as it is not fully deterministic")
}
DefaultMicroTaskScheduler.Start(context.Background())
// init
mtoOutputChannel = make(chan string, 100)
@@ -154,7 +212,7 @@ func TestMicroTaskOrdering(t *testing.T) {
mtoWaitGroup.Add(20)
// ensure we only execute one microtask at once
atomic.StoreInt32(microTasksThreshhold, 1)
atomic.StoreInt32(&DefaultMicroTaskScheduler.concurrencyLimit, 1)
// kick off
go mediumPrioTaskTester()
@@ -185,7 +243,7 @@ func TestMicroTaskOrdering(t *testing.T) {
close(mtoWaitCh)
// trigger
select {
case microTaskFinished <- struct{}{}:
case DefaultMicroTaskScheduler.taskFinishNotifier <- struct{}{}:
default:
}


@@ -1,6 +1,7 @@
package modules
import (
"context"
"errors"
"fmt"
"os"
@@ -37,7 +38,9 @@ func Start() error {
defer mgmtLock.Unlock()
// start microtask scheduler
go microTaskScheduler()
// FIXME go microTaskScheduler()
DefaultMicroTaskScheduler.Start(context.Background())
SetMaxConcurrentMicroTasks(runtime.GOMAXPROCS(0) * 2)
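// Illustrative arithmetic (not part of the patch): with runtime.GOMAXPROCS(0) == 8
// this allows up to 16 concurrent medium/low priority microtasks; values below 4
// are clamped to 4 by SetMaxConcurrentMicroTasks.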
// inter-link modules