mirror of
https://github.com/safing/portbase
synced 2025-04-10 20:49:09 +00:00
580 lines
12 KiB
Go
580 lines
12 KiB
Go
package modules
|
|
|
|
import (
|
|
"container/list"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/tevino/abool"
|
|
|
|
"github.com/safing/portbase/log"
|
|
)
|
|
|
|
// Task is managed task bound to a module.
|
|
type Task struct {
|
|
name string
|
|
module *Module
|
|
taskFn func(context.Context, *Task) error
|
|
|
|
canceled bool
|
|
executing bool
|
|
overtime bool // locked by scheduleLock
|
|
|
|
// these are populated at task creation
|
|
// ctx is canceled when module is shutdown -> all tasks become canceled
|
|
ctx context.Context
|
|
cancelCtx func()
|
|
|
|
executeAt time.Time
|
|
repeat time.Duration
|
|
maxDelay time.Duration
|
|
|
|
queueElement *list.Element
|
|
prioritizedQueueElement *list.Element
|
|
scheduleListElement *list.Element
|
|
|
|
lock sync.Mutex
|
|
}
|
|
|
|
var (
|
|
taskQueue = list.New()
|
|
prioritizedTaskQueue = list.New()
|
|
queuesLock sync.Mutex
|
|
queueWg sync.WaitGroup
|
|
|
|
taskSchedule = list.New()
|
|
scheduleLock sync.Mutex
|
|
|
|
waitForever chan time.Time
|
|
|
|
queueIsFilled = make(chan struct{}, 1) // kick off queue handler
|
|
notifyTaskScheduler = make(chan struct{}, 1)
|
|
taskTimeslot = make(chan struct{})
|
|
)
|
|
|
|
const (
|
|
maxTimeslotWait = 30 * time.Second
|
|
minRepeatDuration = 1 * time.Minute
|
|
maxExecutionWait = 1 * time.Minute
|
|
defaultMaxDelay = 1 * time.Minute
|
|
)
|
|
|
|
// NewTask creates a new task with a descriptive name (non-unique), a optional
|
|
// deadline, and the task function to be executed. You must call one of Queue,
|
|
// QueuePrioritized, StartASAP, Schedule or Repeat in order to have the Task
|
|
// executed.
|
|
func (m *Module) NewTask(name string, fn func(context.Context, *Task) error) *Task {
|
|
if m == nil {
|
|
log.Errorf(`modules: cannot create task "%s" with nil module`, name)
|
|
return &Task{
|
|
name: name,
|
|
module: &Module{Name: "[NONE]"},
|
|
canceled: true,
|
|
}
|
|
}
|
|
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
|
|
return m.newTask(name, fn)
|
|
}
|
|
|
|
func (m *Module) newTask(name string, fn func(context.Context, *Task) error) *Task {
|
|
if m.Ctx == nil || !m.OnlineSoon() {
|
|
log.Errorf(`modules: tasks should only be started when the module is online or starting`)
|
|
return &Task{
|
|
name: name,
|
|
module: m,
|
|
canceled: true,
|
|
}
|
|
}
|
|
|
|
// create new task
|
|
newTask := &Task{
|
|
name: name,
|
|
module: m,
|
|
taskFn: fn,
|
|
maxDelay: defaultMaxDelay,
|
|
}
|
|
|
|
// create context
|
|
newTask.ctx, newTask.cancelCtx = context.WithCancel(m.Ctx)
|
|
|
|
return newTask
|
|
}
|
|
|
|
func (t *Task) isActive() bool {
|
|
if t.canceled {
|
|
return false
|
|
}
|
|
return t.module.OnlineSoon()
|
|
}
|
|
|
|
func (t *Task) prepForQueueing() (ok bool) {
|
|
if !t.isActive() {
|
|
return false
|
|
}
|
|
|
|
if t.maxDelay != 0 {
|
|
t.executeAt = time.Now().Add(t.maxDelay)
|
|
t.addToSchedule(true)
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func notifyQueue() {
|
|
select {
|
|
case queueIsFilled <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// Queue queues the Task for execution.
|
|
func (t *Task) Queue() *Task {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
if !t.prepForQueueing() {
|
|
return t
|
|
}
|
|
|
|
if t.queueElement == nil {
|
|
queuesLock.Lock()
|
|
t.queueElement = taskQueue.PushBack(t)
|
|
queuesLock.Unlock()
|
|
}
|
|
|
|
notifyQueue()
|
|
return t
|
|
}
|
|
|
|
// QueuePrioritized queues the Task for execution in the prioritized queue.
|
|
func (t *Task) QueuePrioritized() *Task {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
if !t.prepForQueueing() {
|
|
return t
|
|
}
|
|
|
|
if t.prioritizedQueueElement == nil {
|
|
queuesLock.Lock()
|
|
t.prioritizedQueueElement = prioritizedTaskQueue.PushBack(t)
|
|
queuesLock.Unlock()
|
|
}
|
|
|
|
notifyQueue()
|
|
return t
|
|
}
|
|
|
|
// StartASAP schedules the task to be executed next.
|
|
func (t *Task) StartASAP() *Task {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
if !t.prepForQueueing() {
|
|
return t
|
|
}
|
|
|
|
queuesLock.Lock()
|
|
if t.prioritizedQueueElement == nil {
|
|
t.prioritizedQueueElement = prioritizedTaskQueue.PushFront(t)
|
|
} else {
|
|
prioritizedTaskQueue.MoveToFront(t.prioritizedQueueElement)
|
|
}
|
|
queuesLock.Unlock()
|
|
|
|
notifyQueue()
|
|
return t
|
|
}
|
|
|
|
// MaxDelay sets a maximum delay within the task should be executed from being queued. Scheduled tasks are queued when they are triggered. The default delay is 3 minutes.
|
|
func (t *Task) MaxDelay(maxDelay time.Duration) *Task {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
t.maxDelay = maxDelay
|
|
return t
|
|
}
|
|
|
|
// Schedule schedules the task for execution at the given time. A zero time will remove cancel the scheduled execution.
|
|
func (t *Task) Schedule(executeAt time.Time) *Task {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
t.executeAt = executeAt
|
|
|
|
if executeAt.IsZero() {
|
|
t.removeFromQueues()
|
|
} else {
|
|
t.addToSchedule(false)
|
|
}
|
|
return t
|
|
}
|
|
|
|
// Repeat sets the task to be executed in endless repeat at the specified interval. First execution will be after interval. Minimum repeat interval is one minute. An interval of zero will disable repeating, but won't change the current schedule.
|
|
func (t *Task) Repeat(interval time.Duration) *Task {
|
|
// check minimum interval duration
|
|
if interval != 0 && interval < minRepeatDuration {
|
|
interval = minRepeatDuration
|
|
}
|
|
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
// Check if repeating should be disabled.
|
|
if interval == 0 {
|
|
t.repeat = 0
|
|
return t
|
|
}
|
|
|
|
t.repeat = interval
|
|
t.executeAt = time.Now().Add(t.repeat)
|
|
t.addToSchedule(false)
|
|
return t
|
|
}
|
|
|
|
// Cancel cancels the current and any future execution of the Task. This is not reversible by any other functions.
|
|
func (t *Task) Cancel() {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
t.canceled = true
|
|
if t.cancelCtx != nil {
|
|
t.cancelCtx()
|
|
}
|
|
}
|
|
|
|
func (t *Task) removeFromQueues() {
|
|
// remove from lists
|
|
if t.queueElement != nil {
|
|
queuesLock.Lock()
|
|
taskQueue.Remove(t.queueElement)
|
|
queuesLock.Unlock()
|
|
t.queueElement = nil
|
|
}
|
|
if t.prioritizedQueueElement != nil {
|
|
queuesLock.Lock()
|
|
prioritizedTaskQueue.Remove(t.prioritizedQueueElement)
|
|
queuesLock.Unlock()
|
|
t.prioritizedQueueElement = nil
|
|
}
|
|
if t.scheduleListElement != nil {
|
|
scheduleLock.Lock()
|
|
taskSchedule.Remove(t.scheduleListElement)
|
|
t.overtime = false
|
|
scheduleLock.Unlock()
|
|
t.scheduleListElement = nil
|
|
}
|
|
}
|
|
|
|
func (t *Task) runWithLocking() {
|
|
t.lock.Lock()
|
|
|
|
// we will not attempt execution, remove from queues
|
|
t.removeFromQueues()
|
|
|
|
// check if task is already executing
|
|
if t.executing {
|
|
t.lock.Unlock()
|
|
return
|
|
}
|
|
|
|
// check if task is active
|
|
// - has not been cancelled
|
|
// - module is online (soon)
|
|
if !t.isActive() {
|
|
t.lock.Unlock()
|
|
return
|
|
}
|
|
|
|
// check if module was stopped
|
|
select {
|
|
case <-t.ctx.Done():
|
|
t.lock.Unlock()
|
|
return
|
|
default:
|
|
}
|
|
|
|
// enter executing state
|
|
t.executing = true
|
|
t.lock.Unlock()
|
|
|
|
// wait for good timeslot regarding microtasks
|
|
select {
|
|
case <-taskTimeslot:
|
|
case <-time.After(maxTimeslotWait):
|
|
}
|
|
|
|
// wait for module start
|
|
if !t.module.Online() {
|
|
if t.module.OnlineSoon() {
|
|
// wait
|
|
<-t.module.StartCompleted()
|
|
} else {
|
|
// abort, module will not come online
|
|
t.lock.Lock()
|
|
t.executing = false
|
|
t.lock.Unlock()
|
|
return
|
|
}
|
|
}
|
|
|
|
// add to queue workgroup
|
|
queueWg.Add(1)
|
|
|
|
go t.executeWithLocking()
|
|
go func() {
|
|
select {
|
|
case <-t.ctx.Done():
|
|
case <-time.After(maxExecutionWait):
|
|
}
|
|
// complete queue worker (early) to allow next worker
|
|
queueWg.Done()
|
|
}()
|
|
}
|
|
|
|
func (t *Task) executeWithLocking() {
|
|
// start for module
|
|
// hint: only queueWg global var is important for scheduling, others can be set here
|
|
atomic.AddInt32(t.module.taskCnt, 1)
|
|
|
|
defer func() {
|
|
// recover from panic
|
|
panicVal := recover()
|
|
if panicVal != nil {
|
|
me := t.module.NewPanicError(t.name, "task", panicVal)
|
|
me.Report()
|
|
log.Errorf("%s: task %s panicked: %s\n%s", t.module.Name, t.name, panicVal, me.StackTrace)
|
|
}
|
|
|
|
// finish for module
|
|
atomic.AddInt32(t.module.taskCnt, -1)
|
|
t.module.checkIfStopComplete()
|
|
|
|
t.lock.Lock()
|
|
|
|
// reset state
|
|
t.executing = false
|
|
|
|
// repeat?
|
|
if t.isActive() && t.repeat != 0 && t.executeAt.IsZero() {
|
|
t.executeAt = time.Now().Add(t.repeat)
|
|
t.addToSchedule(false)
|
|
}
|
|
|
|
// notify that we finished
|
|
t.cancelCtx()
|
|
// refresh context
|
|
|
|
// RACE CONDITION with L314!
|
|
t.ctx, t.cancelCtx = context.WithCancel(t.module.Ctx)
|
|
|
|
t.lock.Unlock()
|
|
}()
|
|
|
|
// reset executeAt to detect if task set next execution itself
|
|
t.executeAt = time.Time{}
|
|
|
|
// run
|
|
err := t.taskFn(t.ctx, t)
|
|
switch {
|
|
case err == nil:
|
|
return
|
|
case errors.Is(err, context.Canceled):
|
|
log.Debugf("%s: task %s was canceled: %s", t.module.Name, t.name, err)
|
|
default:
|
|
log.Errorf("%s: task %s failed: %s", t.module.Name, t.name, err)
|
|
}
|
|
}
|
|
|
|
func (t *Task) addToSchedule(overtime bool) {
|
|
if !t.isActive() {
|
|
return
|
|
}
|
|
|
|
scheduleLock.Lock()
|
|
defer scheduleLock.Unlock()
|
|
// defer printTaskList(taskSchedule) // for debugging
|
|
|
|
if overtime {
|
|
// do not set to false
|
|
t.overtime = true
|
|
}
|
|
|
|
// notify scheduler
|
|
defer func() {
|
|
select {
|
|
case notifyTaskScheduler <- struct{}{}:
|
|
default:
|
|
}
|
|
}()
|
|
|
|
// insert task into schedule
|
|
for e := taskSchedule.Front(); e != nil; e = e.Next() {
|
|
// check for self
|
|
eVal := e.Value.(*Task) //nolint:forcetypeassert // Can only be *Task.
|
|
if eVal == t {
|
|
continue
|
|
}
|
|
// compare
|
|
if t.executeAt.Before(eVal.executeAt) {
|
|
// insert/move task
|
|
if t.scheduleListElement == nil {
|
|
t.scheduleListElement = taskSchedule.InsertBefore(t, e)
|
|
} else {
|
|
taskSchedule.MoveBefore(t.scheduleListElement, e)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
|
|
// add/move to end
|
|
if t.scheduleListElement == nil {
|
|
t.scheduleListElement = taskSchedule.PushBack(t)
|
|
} else {
|
|
taskSchedule.MoveToBack(t.scheduleListElement)
|
|
}
|
|
}
|
|
|
|
func waitUntilNextScheduledTask() <-chan time.Time {
|
|
scheduleLock.Lock()
|
|
defer scheduleLock.Unlock()
|
|
|
|
if taskSchedule.Len() > 0 {
|
|
return time.After(time.Until(taskSchedule.Front().Value.(*Task).executeAt)) //nolint:forcetypeassert // Can only be *Task.
|
|
}
|
|
return waitForever
|
|
}
|
|
|
|
var (
|
|
taskQueueHandlerStarted = abool.NewBool(false)
|
|
taskScheduleHandlerStarted = abool.NewBool(false)
|
|
)
|
|
|
|
func taskQueueHandler() {
|
|
// only ever start once
|
|
if !taskQueueHandlerStarted.SetToIf(false, true) {
|
|
return
|
|
}
|
|
|
|
for {
|
|
// wait
|
|
select {
|
|
case <-shutdownSignal:
|
|
return
|
|
case <-queueIsFilled:
|
|
}
|
|
|
|
// execute
|
|
execLoop:
|
|
for {
|
|
// wait for execution slot
|
|
queueWg.Wait()
|
|
|
|
// check for shutdown
|
|
if shutdownFlag.IsSet() {
|
|
return
|
|
}
|
|
|
|
// get next Task
|
|
queuesLock.Lock()
|
|
e := prioritizedTaskQueue.Front()
|
|
if e != nil {
|
|
prioritizedTaskQueue.Remove(e)
|
|
} else {
|
|
e = taskQueue.Front()
|
|
if e != nil {
|
|
taskQueue.Remove(e)
|
|
}
|
|
}
|
|
queuesLock.Unlock()
|
|
|
|
// lists are empty
|
|
if e == nil {
|
|
break execLoop
|
|
}
|
|
|
|
// value -> Task
|
|
t := e.Value.(*Task) //nolint:forcetypeassert // Can only be *Task.
|
|
// run
|
|
t.runWithLocking()
|
|
}
|
|
}
|
|
}
|
|
|
|
func taskScheduleHandler() {
|
|
// only ever start once
|
|
if !taskScheduleHandlerStarted.SetToIf(false, true) {
|
|
return
|
|
}
|
|
|
|
for {
|
|
|
|
if sleepMode.IsSet() {
|
|
select {
|
|
case <-shutdownSignal:
|
|
return
|
|
case <-notifyTaskScheduler:
|
|
continue
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-shutdownSignal:
|
|
return
|
|
case <-notifyTaskScheduler:
|
|
continue
|
|
case <-waitUntilNextScheduledTask():
|
|
scheduleLock.Lock()
|
|
|
|
// get first task in schedule
|
|
e := taskSchedule.Front()
|
|
if e == nil {
|
|
scheduleLock.Unlock()
|
|
continue
|
|
}
|
|
t := e.Value.(*Task) //nolint:forcetypeassert // Can only be *Task.
|
|
|
|
// process Task
|
|
if t.overtime {
|
|
// already queued and maxDelay reached
|
|
t.overtime = false
|
|
scheduleLock.Unlock()
|
|
|
|
t.runWithLocking()
|
|
} else {
|
|
// place in front of prioritized queue
|
|
t.overtime = true
|
|
scheduleLock.Unlock()
|
|
|
|
t.StartASAP()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func printTaskList(*list.List) { //nolint:unused,deadcode // for debugging, NOT production use
|
|
fmt.Println("Modules Task List:")
|
|
for e := taskSchedule.Front(); e != nil; e = e.Next() {
|
|
t, ok := e.Value.(*Task)
|
|
if ok {
|
|
fmt.Printf(
|
|
"%s:%s over=%v canc=%v exec=%v exat=%s rep=%s delay=%s\n",
|
|
t.module.Name,
|
|
t.name,
|
|
t.overtime,
|
|
t.canceled,
|
|
t.executing,
|
|
t.executeAt,
|
|
t.repeat,
|
|
t.maxDelay,
|
|
)
|
|
}
|
|
}
|
|
}
|