mirror of
https://github.com/safing/portbase
synced 2025-04-10 20:49:09 +00:00
393 lines
12 KiB
Go
393 lines
12 KiB
Go
package modules
|
|
|
|
import (
|
|
"context"
|
|
"runtime"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/tevino/abool"
|
|
|
|
"github.com/safing/portbase/log"
|
|
)
|
|
|
|
// TODO: getting some errors when in nanosecond precision for tests:
|
|
// (1) panic: sync: WaitGroup is reused before previous Wait has returned - should theoretically not happen
|
|
// (2) sometimes there seems to some kind of race condition stuff, the test hangs and does not complete
|
|
// NOTE: These might be resolved by the switch to clearance queues.
|
|
|
|
var (
|
|
microTasks *int32
|
|
microTasksThreshhold *int32
|
|
microTaskFinished = make(chan struct{}, 1)
|
|
)
|
|
|
|
const (
|
|
defaultMediumPriorityMaxDelay = 1 * time.Second
|
|
defaultLowPriorityMaxDelay = 3 * time.Second
|
|
)
|
|
|
|
func init() {
|
|
var microTasksVal int32
|
|
microTasks = µTasksVal
|
|
|
|
microTasksThreshholdVal := int32(runtime.GOMAXPROCS(0) * 2)
|
|
microTasksThreshhold = µTasksThreshholdVal
|
|
}
|
|
|
|
// SetMaxConcurrentMicroTasks sets the maximum number of microtasks that should
|
|
// be run concurrently. The modules system initializes it with GOMAXPROCS.
|
|
// The minimum is 2.
|
|
func SetMaxConcurrentMicroTasks(n int) {
|
|
if n < 2 {
|
|
atomic.StoreInt32(microTasksThreshhold, 2)
|
|
} else {
|
|
atomic.StoreInt32(microTasksThreshhold, int32(n))
|
|
}
|
|
}
|
|
|
|
// StartHighPriorityMicroTask starts a new MicroTask with high priority.
|
|
// It will start immediately.
|
|
// The call starts a new goroutine and returns immediately.
|
|
// The given function will be executed and panics caught.
|
|
func (m *Module) StartHighPriorityMicroTask(name string, fn func(context.Context) error) {
|
|
go func() {
|
|
err := m.RunHighPriorityMicroTask(name, fn)
|
|
if err != nil {
|
|
log.Warningf("%s: microtask %s failed: %s", m.Name, name, err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// StartMicroTask starts a new MicroTask with medium priority.
|
|
// The call starts a new goroutine and returns immediately.
|
|
// It will wait until a slot becomes available while respecting maxDelay.
|
|
// You can also set maxDelay to 0 to use the default value of 1 second.
|
|
// The given function will be executed and panics caught.
|
|
func (m *Module) StartMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) {
|
|
go func() {
|
|
err := m.RunMicroTask(name, maxDelay, fn)
|
|
if err != nil {
|
|
log.Warningf("%s: microtask %s failed: %s", m.Name, name, err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// StartLowPriorityMicroTask starts a new MicroTask with low priority.
|
|
// The call starts a new goroutine and returns immediately.
|
|
// It will wait until a slot becomes available while respecting maxDelay.
|
|
// You can also set maxDelay to 0 to use the default value of 3 seconds.
|
|
// The given function will be executed and panics caught.
|
|
func (m *Module) StartLowPriorityMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) {
|
|
go func() {
|
|
err := m.RunLowPriorityMicroTask(name, maxDelay, fn)
|
|
if err != nil {
|
|
log.Warningf("%s: microtask %s failed: %s", m.Name, name, err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// RunHighPriorityMicroTask starts a new MicroTask with high priority.
|
|
// The given function will be executed and panics caught.
|
|
// The call blocks until the given function finishes.
|
|
func (m *Module) RunHighPriorityMicroTask(name string, fn func(context.Context) error) error {
|
|
if m == nil {
|
|
log.Errorf(`modules: cannot start microtask "%s" with nil module`, name)
|
|
return errNoModule
|
|
}
|
|
|
|
// Increase global counter here, as high priority tasks do not wait for clearance.
|
|
atomic.AddInt32(microTasks, 1)
|
|
return m.runMicroTask(name, fn)
|
|
}
|
|
|
|
// RunMicroTask starts a new MicroTask with medium priority.
|
|
// It will wait until a slot becomes available while respecting maxDelay.
|
|
// You can also set maxDelay to 0 to use the default value of 1 second.
|
|
// The given function will be executed and panics caught.
|
|
// The call blocks until the given function finishes.
|
|
func (m *Module) RunMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) error {
|
|
if m == nil {
|
|
log.Errorf(`modules: cannot start microtask "%s" with nil module`, name)
|
|
return errNoModule
|
|
}
|
|
|
|
// Set default max delay, if not defined.
|
|
if maxDelay <= 0 {
|
|
maxDelay = defaultMediumPriorityMaxDelay
|
|
}
|
|
|
|
getMediumPriorityClearance(maxDelay)
|
|
return m.runMicroTask(name, fn)
|
|
}
|
|
|
|
// RunLowPriorityMicroTask starts a new MicroTask with low priority.
|
|
// It will wait until a slot becomes available while respecting maxDelay.
|
|
// You can also set maxDelay to 0 to use the default value of 3 seconds.
|
|
// The given function will be executed and panics caught.
|
|
// The call blocks until the given function finishes.
|
|
func (m *Module) RunLowPriorityMicroTask(name string, maxDelay time.Duration, fn func(context.Context) error) error {
|
|
if m == nil {
|
|
log.Errorf(`modules: cannot start microtask "%s" with nil module`, name)
|
|
return errNoModule
|
|
}
|
|
|
|
// Set default max delay, if not defined.
|
|
if maxDelay <= 0 {
|
|
maxDelay = defaultLowPriorityMaxDelay
|
|
}
|
|
|
|
getLowPriorityClearance(maxDelay)
|
|
return m.runMicroTask(name, fn)
|
|
}
|
|
|
|
func (m *Module) runMicroTask(name string, fn func(context.Context) error) (err error) {
|
|
// start for module
|
|
// hint: only microTasks global var is important for scheduling, others can be set here
|
|
atomic.AddInt32(m.microTaskCnt, 1)
|
|
|
|
// set up recovery
|
|
defer func() {
|
|
// recover from panic
|
|
panicVal := recover()
|
|
if panicVal != nil {
|
|
me := m.NewPanicError(name, "microtask", panicVal)
|
|
me.Report()
|
|
log.Errorf("%s: microtask %s panicked: %s", m.Name, name, panicVal)
|
|
err = me
|
|
}
|
|
|
|
m.concludeMicroTask()
|
|
}()
|
|
|
|
// run
|
|
err = fn(m.Ctx)
|
|
return // Use named return val in order to change it in defer.
|
|
}
|
|
|
|
// SignalHighPriorityMicroTask signals the start of a new MicroTask with high priority.
|
|
// The returned "done" function SHOULD be called when the task has finished
|
|
// and MUST be called in any case. Failing to do so will have devastating effects.
|
|
// You can safely call "done" multiple times; additional calls do nothing.
|
|
func (m *Module) SignalHighPriorityMicroTask() (done func()) {
|
|
if m == nil {
|
|
log.Errorf("modules: cannot signal microtask with nil module")
|
|
return
|
|
}
|
|
|
|
// Increase global counter here, as high priority tasks do not wait for clearance.
|
|
atomic.AddInt32(microTasks, 1)
|
|
return m.signalMicroTask()
|
|
}
|
|
|
|
// SignalMicroTask signals the start of a new MicroTask with medium priority.
|
|
// The call will wait until a slot becomes available while respecting maxDelay.
|
|
// You can also set maxDelay to 0 to use the default value of 1 second.
|
|
// The returned "done" function SHOULD be called when the task has finished
|
|
// and MUST be called in any case. Failing to do so will have devastating effects.
|
|
// You can safely call "done" multiple times; additional calls do nothing.
|
|
func (m *Module) SignalMicroTask(maxDelay time.Duration) (done func()) {
|
|
if m == nil {
|
|
log.Errorf("modules: cannot signal microtask with nil module")
|
|
return
|
|
}
|
|
|
|
getMediumPriorityClearance(maxDelay)
|
|
return m.signalMicroTask()
|
|
}
|
|
|
|
// SignalLowPriorityMicroTask signals the start of a new MicroTask with low priority.
|
|
// The call will wait until a slot becomes available while respecting maxDelay.
|
|
// You can also set maxDelay to 0 to use the default value of 1 second.
|
|
// The returned "done" function SHOULD be called when the task has finished
|
|
// and MUST be called in any case. Failing to do so will have devastating effects.
|
|
// You can safely call "done" multiple times; additional calls do nothing.
|
|
func (m *Module) SignalLowPriorityMicroTask(maxDelay time.Duration) (done func()) {
|
|
if m == nil {
|
|
log.Errorf("modules: cannot signal microtask with nil module")
|
|
return
|
|
}
|
|
|
|
getLowPriorityClearance(maxDelay)
|
|
return m.signalMicroTask()
|
|
}
|
|
|
|
func (m *Module) signalMicroTask() (done func()) {
|
|
// Start microtask for module.
|
|
// Global counter is set earlier as required for scheduling.
|
|
atomic.AddInt32(m.microTaskCnt, 1)
|
|
|
|
doneCalled := abool.New()
|
|
return func() {
|
|
if doneCalled.SetToIf(false, true) {
|
|
m.concludeMicroTask()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Module) concludeMicroTask() {
|
|
// Finish for module.
|
|
atomic.AddInt32(m.microTaskCnt, -1)
|
|
m.checkIfStopComplete()
|
|
|
|
// Finish and possibly trigger next task.
|
|
atomic.AddInt32(microTasks, -1)
|
|
select {
|
|
case microTaskFinished <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
var (
|
|
clearanceQueueBaseSize = 100
|
|
clearanceQueueSize = runtime.GOMAXPROCS(0) * clearanceQueueBaseSize
|
|
mediumPriorityClearance = make(chan chan struct{}, clearanceQueueSize)
|
|
lowPriorityClearance = make(chan chan struct{}, clearanceQueueSize)
|
|
|
|
triggerLogWriting = log.TriggerWriterChannel()
|
|
|
|
microTaskSchedulerStarted = abool.NewBool(false)
|
|
)
|
|
|
|
func microTaskScheduler() {
|
|
var clearanceSignal chan struct{}
|
|
|
|
// Create ticker for max delay for checking clearances.
|
|
recheck := time.NewTicker(1 * time.Second)
|
|
defer recheck.Stop()
|
|
|
|
// only ever start once
|
|
if !microTaskSchedulerStarted.SetToIf(false, true) {
|
|
return
|
|
}
|
|
|
|
// Debugging: Print current amount of microtasks.
|
|
// go func() {
|
|
// for {
|
|
// time.Sleep(1 * time.Second)
|
|
// log.Debugf("modules: microtasks: %d", atomic.LoadInt32(microTasks))
|
|
// }
|
|
// }()
|
|
|
|
for {
|
|
if shutdownFlag.IsSet() {
|
|
go microTaskShutdownScheduler()
|
|
return
|
|
}
|
|
|
|
// Check if there is space for one more microtask.
|
|
if atomic.LoadInt32(microTasks) < atomic.LoadInt32(microTasksThreshhold) { // space left for firing task
|
|
// Give Medium clearance.
|
|
select {
|
|
case clearanceSignal = <-mediumPriorityClearance:
|
|
default:
|
|
|
|
// Give Medium and Low clearance.
|
|
select {
|
|
case clearanceSignal = <-mediumPriorityClearance:
|
|
case clearanceSignal = <-lowPriorityClearance:
|
|
default:
|
|
|
|
// Give Medium, Low and other clearancee.
|
|
select {
|
|
case clearanceSignal = <-mediumPriorityClearance:
|
|
case clearanceSignal = <-lowPriorityClearance:
|
|
case taskTimeslot <- struct{}{}:
|
|
case triggerLogWriting <- struct{}{}:
|
|
}
|
|
}
|
|
}
|
|
|
|
// Send clearance signal and increase task counter.
|
|
if clearanceSignal != nil {
|
|
close(clearanceSignal)
|
|
atomic.AddInt32(microTasks, 1)
|
|
}
|
|
clearanceSignal = nil
|
|
} else {
|
|
// wait for signal that a task was completed
|
|
select {
|
|
case <-microTaskFinished:
|
|
case <-recheck.C:
|
|
}
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
func microTaskShutdownScheduler() {
|
|
var clearanceSignal chan struct{}
|
|
|
|
for {
|
|
// During shutdown, always give clearances immediately.
|
|
select {
|
|
case clearanceSignal = <-mediumPriorityClearance:
|
|
case clearanceSignal = <-lowPriorityClearance:
|
|
case taskTimeslot <- struct{}{}:
|
|
case triggerLogWriting <- struct{}{}:
|
|
}
|
|
|
|
// Give clearance if requested.
|
|
if clearanceSignal != nil {
|
|
close(clearanceSignal)
|
|
atomic.AddInt32(microTasks, 1)
|
|
}
|
|
clearanceSignal = nil
|
|
}
|
|
}
|
|
|
|
func getMediumPriorityClearance(maxDelay time.Duration) {
|
|
// Submit signal to scheduler.
|
|
signal := make(chan struct{})
|
|
select {
|
|
case mediumPriorityClearance <- signal:
|
|
default:
|
|
select {
|
|
case mediumPriorityClearance <- signal:
|
|
case <-time.After(maxDelay):
|
|
// Start without clearance and increase microtask counter.
|
|
atomic.AddInt32(microTasks, 1)
|
|
return
|
|
}
|
|
}
|
|
// Wait for signal to start.
|
|
select {
|
|
case <-signal:
|
|
default:
|
|
select {
|
|
case <-signal:
|
|
case <-time.After(maxDelay):
|
|
// Don't keep waiting for signal forever.
|
|
// Don't increase microtask counter, as the signal was already submitted
|
|
// and the counter will be increased by the scheduler.
|
|
}
|
|
}
|
|
}
|
|
|
|
func getLowPriorityClearance(maxDelay time.Duration) {
|
|
// Submit signal to scheduler.
|
|
signal := make(chan struct{})
|
|
select {
|
|
case lowPriorityClearance <- signal:
|
|
default:
|
|
select {
|
|
case lowPriorityClearance <- signal:
|
|
case <-time.After(maxDelay):
|
|
// Start without clearance and increase microtask counter.
|
|
atomic.AddInt32(microTasks, 1)
|
|
return
|
|
}
|
|
}
|
|
// Wait for signal to start.
|
|
select {
|
|
case <-signal:
|
|
default:
|
|
select {
|
|
case <-signal:
|
|
case <-time.After(maxDelay):
|
|
// Don't keep waiting for signal forever.
|
|
// Don't increase microtask counter, as the signal was already submitted
|
|
// and the counter will be increased by the scheduler.
|
|
}
|
|
}
|
|
}
|