safing-portbase/taskmanager/microtasks.go

178 lines
3.9 KiB
Go

// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file.
package taskmanager
import (
"sync/atomic"
"time"
"github.com/tevino/abool"
)
// TODO: getting some errors when in nanosecond precision for tests:
// (1) panic: sync: WaitGroup is reused before previous Wait has returned - should theoretically not happen
// (2) sometimes there seems to some kind of race condition stuff, the test hangs and does not complete
var (
closedChannel chan bool
tasks *int32
mediumPriorityClearance chan bool
lowPriorityClearance chan bool
veryLowPriorityClearance chan bool
tasksDone chan bool
tasksDoneFlag *abool.AtomicBool
tasksWaiting chan bool
tasksWaitingFlag *abool.AtomicBool
shutdownSignal = make(chan struct{}, 0)
suttingDown = abool.NewBool(false)
)
// StartMicroTask starts a new MicroTask. It will start immediately.
func StartMicroTask() {
atomic.AddInt32(tasks, 1)
tasksDoneFlag.UnSet()
}
// EndMicroTask MUST be always called when a MicroTask was previously started.
func EndMicroTask() {
c := atomic.AddInt32(tasks, -1)
if c < 1 {
if tasksDoneFlag.SetToIf(false, true) {
tasksDone <- true
}
}
}
func newTaskIsWaiting() {
tasksWaiting <- true
}
// StartMediumPriorityMicroTask starts a new MicroTask (waiting its turn) if channel receives.
func StartMediumPriorityMicroTask() chan bool {
if suttingDown.IsSet() {
return closedChannel
}
if tasksWaitingFlag.SetToIf(false, true) {
defer newTaskIsWaiting()
}
return mediumPriorityClearance
}
// StartLowPriorityMicroTask starts a new MicroTask (waiting its turn) if channel receives.
func StartLowPriorityMicroTask() chan bool {
if suttingDown.IsSet() {
return closedChannel
}
if tasksWaitingFlag.SetToIf(false, true) {
defer newTaskIsWaiting()
}
return lowPriorityClearance
}
// StartVeryLowPriorityMicroTask starts a new MicroTask (waiting its turn) if channel receives.
func StartVeryLowPriorityMicroTask() chan bool {
if suttingDown.IsSet() {
return closedChannel
}
if tasksWaitingFlag.SetToIf(false, true) {
defer newTaskIsWaiting()
}
return veryLowPriorityClearance
}
func start() error {
return nil
}
func stop() error {
close(shutdownSignal)
return nil
}
func init() {
closedChannel = make(chan bool, 0)
close(closedChannel)
var t int32 = 0
tasks = &t
mediumPriorityClearance = make(chan bool, 0)
lowPriorityClearance = make(chan bool, 0)
veryLowPriorityClearance = make(chan bool, 0)
tasksDone = make(chan bool, 1)
tasksDoneFlag = abool.NewBool(true)
tasksWaiting = make(chan bool, 1)
tasksWaitingFlag = abool.NewBool(false)
timoutTimerDuration := 1 * time.Second
// timoutTimer := time.NewTimer(timoutTimerDuration)
go func() {
microTaskManageLoop:
for {
// wait for an event to start new tasks
if !suttingDown.IsSet() {
// reset timer
// https://golang.org/pkg/time/#Timer.Reset
// if !timoutTimer.Stop() {
// <-timoutTimer.C
// }
// timoutTimer.Reset(timoutTimerDuration)
// wait for event to start a new task
select {
case <-tasksWaiting:
if !tasksDoneFlag.IsSet() {
continue microTaskManageLoop
}
case <-time.After(timoutTimerDuration):
case <-tasksDone:
case <-shutdownSignal:
}
} else {
// execute tasks until no tasks are waiting anymore
if !tasksWaitingFlag.IsSet() {
// wait until tasks are finished
if !tasksDoneFlag.IsSet() {
<-tasksDone
}
// signal module completion
// microTasksModule.StopComplete()
// exit
return
}
}
// start new task, if none is started, check if we are shutting down
select {
case mediumPriorityClearance <- true:
StartMicroTask()
default:
select {
case lowPriorityClearance <- true:
StartMicroTask()
default:
select {
case veryLowPriorityClearance <- true:
StartMicroTask()
default:
tasksWaitingFlag.UnSet()
}
}
}
}
}()
}