safing-portbase/modules/tasks.go
2023-04-20 15:03:15 +02:00

580 lines
12 KiB
Go

package modules
import (
"container/list"
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/tevino/abool"
"github.com/safing/portbase/log"
)
// Task is a managed task bound to a module.
type Task struct {
// name is a descriptive, non-unique label; it is used in log messages.
name string
// module is the module the task is bound to.
module *Module
// taskFn is the function that is called when the task is executed.
taskFn func(context.Context, *Task) error
// canceled is set by Cancel and for tasks that were created on an
// unavailable module. A canceled task is never reported as active.
canceled bool
// executing is set once an execution has been committed to and is reset
// after the task function has returned.
executing bool
// overtime marks a task that has already been queued by the scheduler, so
// that its next schedule entry is treated as the maxDelay deadline.
overtime bool // locked by scheduleLock
// these are populated at task creation
// ctx is canceled when module is shutdown -> all tasks become canceled
// ctx is also canceled and replaced with a fresh context after every execution.
ctx context.Context
// cancelCtx cancels ctx; it is nil for tasks created in the canceled state.
cancelCtx func()
// executeAt is the time at which the scheduler next handles the task.
executeAt time.Time
// repeat is the interval for repeated execution; zero disables repeating.
repeat time.Duration
// maxDelay is how long a queued task may wait before the scheduler runs it
// directly; with a value of zero no such deadline is set.
maxDelay time.Duration
// The following hold the task's elements in taskQueue,
// prioritizedTaskQueue and taskSchedule. They are nil if the task has not
// been added to the respective list.
queueElement *list.Element
prioritizedQueueElement *list.Element
scheduleListElement *list.Element
// lock is held by the exported methods and by the run/execute functions
// while they change the state of the task.
lock sync.Mutex
}
var (
// taskQueue holds the tasks waiting for execution; tasks are appended at
// the back and taken from the front.
taskQueue = list.New()
// prioritizedTaskQueue is emptied by the queue handler before it takes
// anything from taskQueue.
prioritizedTaskQueue = list.New()
// queuesLock guards taskQueue and prioritizedTaskQueue.
queuesLock sync.Mutex
// queueWg is waited on by the queue handler before it picks the next task.
queueWg sync.WaitGroup
// taskSchedule holds the scheduled tasks, kept in order of their executeAt.
taskSchedule = list.New()
// scheduleLock guards taskSchedule and Task.overtime.
scheduleLock sync.Mutex
// waitForever is returned as the wait channel when the schedule is empty.
// NOTE(review): it is not assigned anywhere in the code shown here; as a nil
// channel, receiving from it blocks forever - confirm this is intended.
waitForever chan time.Time
queueIsFilled = make(chan struct{}, 1) // kick off queue handler
// notifyTaskScheduler wakes the schedule handler after the schedule changed.
notifyTaskScheduler = make(chan struct{}, 1)
// taskTimeslot is received from (with a timeout) before a task is started.
// NOTE(review): the sending side is not part of the code shown here.
taskTimeslot = make(chan struct{})
)
const (
// maxTimeslotWait is the longest a task waits for a timeslot before it is
// started regardless.
maxTimeslotWait = 30 * time.Second
// minRepeatDuration is the smallest non-zero interval accepted by Repeat;
// shorter intervals are raised to this value.
minRepeatDuration = 1 * time.Minute
// maxExecutionWait is the time after which a running task no longer blocks
// the queue handler from starting the next task.
maxExecutionWait = 1 * time.Minute
// defaultMaxDelay is the maxDelay that newly created tasks start with.
defaultMaxDelay = 1 * time.Minute
)
// NewTask creates a new task with a descriptive (non-unique) name and the
// task function to be executed. The task is not executed until one of Queue,
// QueuePrioritized, StartASAP, Schedule or Repeat is called on it.
//
// When called on a nil module, an error is logged and a task that is already
// canceled is returned, so that the usual call chaining remains safe.
func (m *Module) NewTask(name string, fn func(context.Context, *Task) error) *Task {
	if m != nil {
		// The module is locked for the duration of the task creation.
		m.Lock()
		defer m.Unlock()

		return m.newTask(name, fn)
	}

	log.Errorf(`modules: cannot create task "%s" with nil module`, name)
	placeholder := &Module{Name: "[NONE]"}
	return &Task{
		name:     name,
		module:   placeholder,
		canceled: true,
	}
}
// newTask creates a task for the module without locking the module.
//
// If the module has no context or is neither online nor starting, an error is
// logged and a canceled task is returned. Otherwise, the task gets the default
// max delay and its own context, derived from the module context.
func (m *Module) newTask(name string, fn func(context.Context, *Task) error) *Task {
	if m.Ctx != nil && m.OnlineSoon() {
		task := &Task{
			name:     name,
			module:   m,
			taskFn:   fn,
			maxDelay: defaultMaxDelay,
		}
		// Canceling the module context cancels the task context too.
		task.ctx, task.cancelCtx = context.WithCancel(m.Ctx)
		return task
	}

	log.Errorf(`modules: tasks should only be started when the module is online or starting`)
	return &Task{
		name:     name,
		module:   m,
		canceled: true,
	}
}
// isActive reports whether the task may still be queued, scheduled or
// executed: it must not be canceled and its module must be online or starting.
// The module is only consulted if the task is not canceled.
func (t *Task) isActive() bool {
	return !t.canceled && t.module.OnlineSoon()
}
// prepForQueueing prepares the task for being put into a queue and reports
// whether it may be queued at all, which is the case only for active tasks.
//
// If the task has a max delay, the task is additionally entered into the
// schedule with that deadline and flagged as overtime.
func (t *Task) prepForQueueing() (ok bool) {
	ok = t.isActive()
	if ok && t.maxDelay != 0 {
		deadline := time.Now().Add(t.maxDelay)
		t.executeAt = deadline
		t.addToSchedule(true)
	}
	return ok
}
// notifyQueue signals the queue handler that there may be queued tasks.
// The send never blocks: queueIsFilled has a buffer of one, so if a signal is
// already pending, nothing is sent.
func notifyQueue() {
select {
case queueIsFilled <- struct{}{}:
default:
}
}
// Queue appends the task to the regular task queue, unless it is already in
// there, and notifies the queue handler. Inactive tasks are left untouched.
// The task itself is returned to allow chaining.
func (t *Task) Queue() *Task {
	t.lock.Lock()
	defer t.lock.Unlock()

	if t.prepForQueueing() {
		if notQueued := t.queueElement == nil; notQueued {
			queuesLock.Lock()
			t.queueElement = taskQueue.PushBack(t)
			queuesLock.Unlock()
		}
		notifyQueue()
	}

	return t
}
// QueuePrioritized appends the task to the prioritized task queue, unless it
// is already in there, and notifies the queue handler. Inactive tasks are left
// untouched. The task itself is returned to allow chaining.
func (t *Task) QueuePrioritized() *Task {
	t.lock.Lock()
	defer t.lock.Unlock()

	if t.prepForQueueing() {
		if notQueued := t.prioritizedQueueElement == nil; notQueued {
			queuesLock.Lock()
			t.prioritizedQueueElement = prioritizedTaskQueue.PushBack(t)
			queuesLock.Unlock()
		}
		notifyQueue()
	}

	return t
}
// StartASAP puts the task at the very front of the prioritized queue, so that
// it is the next task to be executed, and notifies the queue handler. Inactive
// tasks are left untouched. The task itself is returned to allow chaining.
func (t *Task) StartASAP() *Task {
	t.lock.Lock()
	defer t.lock.Unlock()

	if !t.prepForQueueing() {
		return t
	}

	queuesLock.Lock()
	if elem := t.prioritizedQueueElement; elem != nil {
		// Already in the prioritized queue: jump the line.
		prioritizedTaskQueue.MoveToFront(elem)
	} else {
		t.prioritizedQueueElement = prioritizedTaskQueue.PushFront(t)
	}
	queuesLock.Unlock()

	notifyQueue()
	return t
}
// MaxDelay sets the maximum delay within which the task should be executed
// after being queued. Scheduled tasks are queued when they are triggered.
// New tasks start with defaultMaxDelay (one minute). With a max delay of
// zero, no deadline is set when the task is queued.
func (t *Task) MaxDelay(maxDelay time.Duration) *Task {
t.lock.Lock()
defer t.lock.Unlock()
t.maxDelay = maxDelay
return t
}
// Schedule schedules the task for execution at the given time. A zero time
// cancels the scheduled execution by removing the task from the schedule as
// well as from the queues. The task itself is returned to allow chaining.
func (t *Task) Schedule(executeAt time.Time) *Task {
	t.lock.Lock()
	defer t.lock.Unlock()

	t.executeAt = executeAt
	if !executeAt.IsZero() {
		t.addToSchedule(false)
		return t
	}

	// A zero time means: do not execute.
	t.removeFromQueues()
	return t
}
// Repeat sets the task to be executed repeatedly at the given interval, with
// the first execution taking place one interval from now. Intervals shorter
// than one minute are raised to one minute. An interval of zero disables
// repeating, but leaves the current schedule as it is. The task itself is
// returned to allow chaining.
func (t *Task) Repeat(interval time.Duration) *Task {
	disable := interval == 0
	if !disable && interval < minRepeatDuration {
		// Enforce the minimum interval.
		interval = minRepeatDuration
	}

	t.lock.Lock()
	defer t.lock.Unlock()

	t.repeat = interval
	if disable {
		return t
	}

	t.executeAt = time.Now().Add(interval)
	t.addToSchedule(false)
	return t
}
// Cancel cancels the current and any future execution of the Task: the task
// is marked as canceled and its context, if it has one, is canceled. No other
// function reverts this.
func (t *Task) Cancel() {
	t.lock.Lock()
	defer t.lock.Unlock()

	t.canceled = true

	// Tasks that were created in the canceled state have no context.
	if cancel := t.cancelCtx; cancel != nil {
		cancel()
	}
}
// removeFromQueues takes the task out of the task queue, the prioritized task
// queue and the schedule, and clears the respective element references.
// Leaving the schedule also resets the overtime flag.
func (t *Task) removeFromQueues() {
	// Both queues share queuesLock.
	if t.queueElement != nil || t.prioritizedQueueElement != nil {
		queuesLock.Lock()
		if elem := t.queueElement; elem != nil {
			taskQueue.Remove(elem)
			t.queueElement = nil
		}
		if elem := t.prioritizedQueueElement; elem != nil {
			prioritizedTaskQueue.Remove(elem)
			t.prioritizedQueueElement = nil
		}
		queuesLock.Unlock()
	}

	// The schedule and the overtime flag are guarded by scheduleLock.
	if elem := t.scheduleListElement; elem != nil {
		scheduleLock.Lock()
		taskSchedule.Remove(elem)
		t.overtime = false
		scheduleLock.Unlock()

		t.scheduleListElement = nil
	}
}
// runWithLocking takes the task out of all queues and, if it is eligible,
// starts its execution in a separate goroutine.
//
// The task is not executed if it is already executing, if it is not active
// (canceled, or module not online or starting), or if its context is done.
// Otherwise, the function waits for a task timeslot (at most maxTimeslotWait)
// and for the module to finish starting before it launches the execution.
//
// The execution occupies the queue worker slot (queueWg) until the task has
// finished, but at most for maxExecutionWait.
func (t *Task) runWithLocking() {
	t.lock.Lock()

	// Whether or not execution is attempted, the task leaves all queues.
	t.removeFromQueues()

	// Do not execute if
	// - the task is already executing, or
	// - the task is not active, ie. it has been canceled or the module is not
	//   online (soon).
	if t.executing || !t.isActive() {
		t.lock.Unlock()
		return
	}

	// Hold on to the context of this execution while the lock is held.
	// executeWithLocking replaces t.ctx as soon as the task has finished, so
	// reading t.ctx later without the lock is a data race and could yield the
	// context of the next execution, which would keep the queue slot occupied
	// for the full maxExecutionWait.
	ctx := t.ctx

	// check if module was stopped
	select {
	case <-ctx.Done():
		t.lock.Unlock()
		return
	default:
	}

	// enter executing state
	t.executing = true
	t.lock.Unlock()

	// wait for good timeslot regarding microtasks
	timeslotTimer := time.NewTimer(maxTimeslotWait)
	select {
	case <-taskTimeslot:
	case <-timeslotTimer.C:
	}
	timeslotTimer.Stop()

	// wait for module start
	if !t.module.Online() {
		if !t.module.OnlineSoon() {
			// abort, module will not come online
			t.lock.Lock()
			t.executing = false
			t.lock.Unlock()
			return
		}
		<-t.module.StartCompleted()
	}

	// add to queue workgroup
	queueWg.Add(1)
	go t.executeWithLocking()
	go func() {
		executionTimer := time.NewTimer(maxExecutionWait)
		defer executionTimer.Stop()

		// The context is canceled when the execution has finished.
		select {
		case <-ctx.Done():
		case <-executionTimer.C:
		}
		// complete queue worker (early) to allow next worker
		queueWg.Done()
	}()
}
// executeWithLocking calls the task function once and then resets the task
// for its next execution. It is run in its own goroutine.
//
// While the task function runs, the task is counted in the task counter of
// its module. A panic of the task function is recovered and reported. After
// the task function has returned, a repeating task that did not set its next
// execution itself is re-scheduled, and the task context is canceled and
// replaced with a fresh one.
//
// An error returned by the task function is only logged: as debug message if
// it is a context cancellation, as error otherwise.
func (t *Task) executeWithLocking() {
	// start for module
	// hint: only queueWg global var is important for scheduling, others can be set here
	atomic.AddInt32(t.module.taskCnt, 1)

	defer func() {
		// recover from panic
		if panicVal := recover(); panicVal != nil {
			me := t.module.NewPanicError(t.name, "task", panicVal)
			me.Report()
			// The panic value can be of any type, so it is formatted with %v.
			log.Errorf("%s: task %s panicked: %v\n%s", t.module.Name, t.name, panicVal, me.StackTrace)
		}

		// finish for module
		atomic.AddInt32(t.module.taskCnt, -1)
		t.module.checkIfStopComplete()

		t.lock.Lock()

		// reset state
		t.executing = false

		// Repeat, unless the task function already set the next execution.
		if t.isActive() && t.repeat != 0 && t.executeAt.IsZero() {
			t.executeAt = time.Now().Add(t.repeat)
			t.addToSchedule(false)
		}

		// notify that we finished
		t.cancelCtx()

		// Refresh the context for the next execution. As t.ctx is replaced
		// here, code that waits for the end of this execution must hold its own
		// reference to the context instead of reading t.ctx without the lock.
		t.ctx, t.cancelCtx = context.WithCancel(t.module.Ctx)

		t.lock.Unlock()
	}()

	// Reset executeAt to detect if the task function sets the next execution
	// itself. executeAt is written with t.lock held everywhere else, so the
	// lock is taken here too. It must be released again before the task
	// function is called, as that function may call methods of the task that
	// acquire the lock.
	t.lock.Lock()
	t.executeAt = time.Time{}
	ctx := t.ctx
	t.lock.Unlock()

	// run
	err := t.taskFn(ctx, t)
	switch {
	case err == nil:
		return
	case errors.Is(err, context.Canceled):
		log.Debugf("%s: task %s was canceled: %s", t.module.Name, t.name, err)
	default:
		log.Errorf("%s: task %s failed: %s", t.module.Name, t.name, err)
	}
}
// addToSchedule inserts the task into the schedule, or moves it if it is
// already in there, so that the schedule stays ordered by executeAt. The task
// is placed before the first other task that is due later, or at the end if
// there is none. Inactive tasks are not scheduled.
//
// Passing overtime as true sets the overtime flag of the task; passing false
// leaves the flag as it is. The schedule handler is notified in any case.
func (t *Task) addToSchedule(overtime bool) {
	if !t.isActive() {
		return
	}

	scheduleLock.Lock()
	defer scheduleLock.Unlock()
	// defer printTaskList(taskSchedule) // for debugging

	// Wake up the scheduler when done; this runs before the lock is released.
	defer func() {
		select {
		case notifyTaskScheduler <- struct{}{}:
		default:
		}
	}()

	// Only ever raise the flag here, never clear it.
	if overtime {
		t.overtime = true
	}

	// Look for the first other task that is due after this one.
	successor := taskSchedule.Front()
	for ; successor != nil; successor = successor.Next() {
		other := successor.Value.(*Task) //nolint:forcetypeassert // Can only be *Task.
		if other != t && t.executeAt.Before(other.executeAt) {
			break
		}
	}

	// Insert or move the task to its place.
	switch inSchedule := t.scheduleListElement != nil; {
	case successor != nil && inSchedule:
		taskSchedule.MoveBefore(t.scheduleListElement, successor)
	case successor != nil:
		t.scheduleListElement = taskSchedule.InsertBefore(t, successor)
	case inSchedule:
		taskSchedule.MoveToBack(t.scheduleListElement)
	default:
		t.scheduleListElement = taskSchedule.PushBack(t)
	}
}
// waitUntilNextScheduledTask returns a channel that delivers a value when the
// first task in the schedule is due. If the schedule is empty, waitForever is
// returned instead.
func waitUntilNextScheduledTask() <-chan time.Time {
	scheduleLock.Lock()
	defer scheduleLock.Unlock()

	next := taskSchedule.Front()
	if next == nil {
		return waitForever
	}

	due := next.Value.(*Task).executeAt //nolint:forcetypeassert // Can only be *Task.
	return time.After(time.Until(due))
}
var (
// taskQueueHandlerStarted is set by the first call of taskQueueHandler;
// any further call returns immediately.
taskQueueHandlerStarted = abool.NewBool(false)
// taskScheduleHandlerStarted is set by the first call of
// taskScheduleHandler; any further call returns immediately.
taskScheduleHandlerStarted = abool.NewBool(false)
)
// taskQueueHandler runs the queued tasks one after another, prioritized tasks
// first. Only the first call does any work, any further call returns
// immediately.
//
// The handler sleeps until it is notified about queued tasks and then works
// until both queues are empty. Before every task, it waits for the previous
// task to release the queue worker slot. It returns on shutdown.
func taskQueueHandler() {
	// only ever start once
	if !taskQueueHandlerStarted.SetToIf(false, true) {
		return
	}

	// popNext removes and returns the next task to run, preferring the
	// prioritized queue. It returns nil if both queues are empty.
	popNext := func() *Task {
		queuesLock.Lock()
		defer queuesLock.Unlock()

		if e := prioritizedTaskQueue.Front(); e != nil {
			prioritizedTaskQueue.Remove(e)
			return e.Value.(*Task) //nolint:forcetypeassert // Can only be *Task.
		}
		if e := taskQueue.Front(); e != nil {
			taskQueue.Remove(e)
			return e.Value.(*Task) //nolint:forcetypeassert // Can only be *Task.
		}
		return nil
	}

	for {
		// Sleep until there is something to do.
		select {
		case <-shutdownSignal:
			return
		case <-queueIsFilled:
		}

		// Work off the queues.
		for {
			// wait for execution slot
			queueWg.Wait()

			// check for shutdown
			if shutdownFlag.IsSet() {
				return
			}

			next := popNext()
			if next == nil {
				// lists are empty
				break
			}
			next.runWithLocking()
		}
	}
}
// taskScheduleHandler triggers scheduled tasks when they are due. Only the
// first call does any work, any further call returns immediately.
//
// A task that becomes due is first put at the front of the prioritized queue
// and flagged as overtime. If a task that is flagged as overtime becomes due,
// it is run directly instead. While sleep mode is set, the handler only reacts
// to notifications and shutdown. It returns on shutdown.
func taskScheduleHandler() {
	// only ever start once
	if !taskScheduleHandlerStarted.SetToIf(false, true) {
		return
	}

	for {
		// In sleep mode, no tasks are triggered.
		if sleepMode.IsSet() {
			select {
			case <-shutdownSignal:
				return
			case <-notifyTaskScheduler:
			}
			continue
		}

		select {
		case <-shutdownSignal:
			return

		case <-notifyTaskScheduler:
			// The schedule changed, re-evaluate.

		case <-waitUntilNextScheduledTask():
			scheduleLock.Lock()

			// get first task in schedule
			front := taskSchedule.Front()
			if front == nil {
				scheduleLock.Unlock()
				continue
			}
			due := front.Value.(*Task) //nolint:forcetypeassert // Can only be *Task.

			// Flip the overtime flag and remember its previous state.
			wasOvertime := due.overtime
			due.overtime = !wasOvertime
			scheduleLock.Unlock()

			if wasOvertime {
				// already queued and maxDelay reached
				due.runWithLocking()
			} else {
				// place in front of prioritized queue
				due.StartASAP()
			}
		}
	}
}
// printTaskList prints one line per task of the given list to stdout. If the
// given list is nil, the task schedule is printed. Elements that are not a
// *Task are skipped.
//
// The function does not acquire any lock; the caller must make sure that the
// list and the tasks in it are not modified concurrently.
func printTaskList(l *list.List) { //nolint:unused,deadcode // for debugging, NOT production use
	if l == nil {
		l = taskSchedule
	}

	fmt.Println("Modules Task List:")
	for e := l.Front(); e != nil; e = e.Next() {
		t, ok := e.Value.(*Task)
		if !ok {
			continue
		}
		fmt.Printf(
			"%s:%s over=%v canc=%v exec=%v exat=%s rep=%s delay=%s\n",
			t.module.Name,
			t.name,
			t.overtime,
			t.canceled,
			t.executing,
			t.executeAt,
			t.repeat,
			t.maxDelay,
		)
	}
}