Fix and improve task management

This commit is contained in:
Daniel 2020-04-24 09:52:42 +02:00
parent 00d9594ea5
commit adcc916a6a
2 changed files with 140 additions and 47 deletions

View file

@ -18,12 +18,12 @@ type Task struct {
module *Module
taskFn func(context.Context, *Task) error
queued bool
canceled bool
executing bool
overtime bool // locked by scheduleLock
// these are populated at task creation
// ctx is canceled when task is shutdown -> all tasks become canceled
// ctx is canceled when module is shutdown -> all tasks become canceled
ctx context.Context
cancelCtx func()
@ -110,10 +110,9 @@ func (t *Task) prepForQueueing() (ok bool) {
return false
}
t.queued = true
if t.maxDelay != 0 {
t.executeAt = time.Now().Add(t.maxDelay)
t.addToSchedule()
t.addToSchedule(true)
}
return true
@ -129,11 +128,11 @@ func notifyQueue() {
// Queue queues the Task for execution.
func (t *Task) Queue() *Task {
t.lock.Lock()
defer t.lock.Unlock()
if !t.prepForQueueing() {
t.lock.Unlock()
return t
}
t.lock.Unlock()
if t.queueElement == nil {
queuesLock.Lock()
@ -148,11 +147,11 @@ func (t *Task) Queue() *Task {
// Prioritize puts the task in the prioritized queue.
func (t *Task) Prioritize() *Task {
t.lock.Lock()
defer t.lock.Unlock()
if !t.prepForQueueing() {
t.lock.Unlock()
return t
}
t.lock.Unlock()
if t.prioritizedQueueElement == nil {
queuesLock.Lock()
@ -167,11 +166,11 @@ func (t *Task) Prioritize() *Task {
// StartASAP schedules the task to be executed next.
func (t *Task) StartASAP() *Task {
t.lock.Lock()
defer t.lock.Unlock()
if !t.prepForQueueing() {
t.lock.Unlock()
return t
}
t.lock.Unlock()
queuesLock.Lock()
if t.prioritizedQueueElement == nil {
@ -188,17 +187,19 @@ func (t *Task) StartASAP() *Task {
// MaxDelay sets a maximum delay within the task should be executed from being queued. Scheduled tasks are queued when they are triggered. The default delay is 3 minutes.
func (t *Task) MaxDelay(maxDelay time.Duration) *Task {
t.lock.Lock()
defer t.lock.Unlock()
t.maxDelay = maxDelay
t.lock.Unlock()
return t
}
// Schedule schedules the task for execution at the given time.
func (t *Task) Schedule(executeAt time.Time) *Task {
t.lock.Lock()
defer t.lock.Unlock()
t.executeAt = executeAt
t.addToSchedule()
t.lock.Unlock()
t.addToSchedule(false)
return t
}
@ -210,22 +211,23 @@ func (t *Task) Repeat(interval time.Duration) *Task {
}
t.lock.Lock()
defer t.lock.Unlock()
t.repeat = interval
t.executeAt = time.Now().Add(t.repeat)
t.addToSchedule()
t.lock.Unlock()
t.addToSchedule(false)
return t
}
// Cancel cancels the current and any future execution of the Task. This is not reversible by any other functions.
func (t *Task) Cancel() {
t.lock.Lock()
defer t.lock.Unlock()
t.canceled = true
if t.cancelCtx != nil {
t.cancelCtx()
}
t.lock.Unlock()
}
func (t *Task) removeFromQueues() {
@ -234,31 +236,29 @@ func (t *Task) removeFromQueues() {
queuesLock.Lock()
taskQueue.Remove(t.queueElement)
queuesLock.Unlock()
t.lock.Lock()
t.queueElement = nil
t.lock.Unlock()
}
if t.prioritizedQueueElement != nil {
queuesLock.Lock()
prioritizedTaskQueue.Remove(t.prioritizedQueueElement)
queuesLock.Unlock()
t.lock.Lock()
t.prioritizedQueueElement = nil
t.lock.Unlock()
}
if t.scheduleListElement != nil {
scheduleLock.Lock()
taskSchedule.Remove(t.scheduleListElement)
t.overtime = false
scheduleLock.Unlock()
t.lock.Lock()
t.scheduleListElement = nil
t.lock.Unlock()
}
}
func (t *Task) runWithLocking() {
t.lock.Lock()
// we will not attempt execution, remove from queues
t.removeFromQueues()
// check if task is already executing
if t.executing {
t.lock.Unlock()
@ -266,21 +266,22 @@ func (t *Task) runWithLocking() {
}
// check if task is active
// - has not been cancelled
// - module is online (soon)
if !t.isActive() {
t.removeFromQueues()
t.lock.Unlock()
return
}
// check if module was stopped
select {
case <-t.ctx.Done(): // check if module is stopped
t.removeFromQueues()
case <-t.ctx.Done():
t.lock.Unlock()
return
default:
}
// enter executing state
t.executing = true
t.lock.Unlock()
@ -296,8 +297,9 @@ func (t *Task) runWithLocking() {
// wait
<-t.module.StartCompleted()
} else {
// abort, module will not come online
t.lock.Lock()
t.removeFromQueues()
t.executing = false
t.lock.Unlock()
return
}
@ -336,22 +338,23 @@ func (t *Task) executeWithLocking() {
atomic.AddInt32(t.module.taskCnt, -1)
t.module.waitGroup.Done()
// reset
t.lock.Lock()
// reset state
t.executing = false
t.queued = false
// repeat?
if t.isActive() && t.repeat != 0 {
t.executeAt = time.Now().Add(t.repeat)
t.addToSchedule()
t.addToSchedule(false)
}
t.lock.Unlock()
// notify that we finished
if t.cancelCtx != nil {
t.cancelCtx()
}
t.cancelCtx()
// refresh context
t.ctx, t.cancelCtx = context.WithCancel(t.module.Ctx)
t.lock.Unlock()
}()
// run
@ -361,13 +364,7 @@ func (t *Task) executeWithLocking() {
}
}
func (t *Task) getExecuteAtWithLocking() time.Time {
t.lock.Lock()
defer t.lock.Unlock()
return t.executeAt
}
func (t *Task) addToSchedule() {
func (t *Task) addToSchedule(overtime bool) {
if !t.isActive() {
return
}
@ -376,6 +373,11 @@ func (t *Task) addToSchedule() {
defer scheduleLock.Unlock()
// defer printTaskList(taskSchedule) // for debugging
if overtime {
// do not set to false
t.overtime = true
}
// notify scheduler
defer func() {
select {
@ -392,7 +394,7 @@ func (t *Task) addToSchedule() {
continue
}
// compare
if t.executeAt.Before(eVal.getExecuteAtWithLocking()) {
if t.executeAt.Before(eVal.executeAt) {
// insert/move task
if t.scheduleListElement == nil {
t.scheduleListElement = taskSchedule.InsertBefore(t, e)
@ -489,18 +491,28 @@ func taskScheduleHandler() {
return
case <-recalculateNextScheduledTask:
case <-waitUntilNextScheduledTask():
// get first task in schedule
scheduleLock.Lock()
// get first task in schedule
e := taskSchedule.Front()
scheduleLock.Unlock()
if e == nil {
scheduleLock.Unlock()
continue
}
t := e.Value.(*Task)
// process Task
if t.queued {
if t.overtime {
// already queued and maxDelay reached
t.overtime = false
scheduleLock.Unlock()
t.runWithLocking()
} else {
// place in front of prioritized queue
t.overtime = true
scheduleLock.Unlock()
t.StartASAP()
}
}
@ -513,10 +525,10 @@ func printTaskList(*list.List) { //nolint:unused,deadcode // for debugging, NOT
t, ok := e.Value.(*Task)
if ok {
fmt.Printf(
"%s:%s qu=%v ca=%v exec=%v at=%s rep=%s delay=%s\n",
"%s:%s over=%v canc=%v exec=%v exat=%s rep=%s delay=%s\n",
t.module.Name,
t.name,
t.queued,
t.overtime,
t.canceled,
t.executing,
t.executeAt,

View file

@ -168,3 +168,84 @@ func TestScheduledTaskWaiting(t *testing.T) {
}
}
func TestRequeueingTask(t *testing.T) {
blockWg := &sync.WaitGroup{}
wg := &sync.WaitGroup{}
// block task execution
blockWg.Add(1) // mark done at beginning
wg.Add(2) // mark done at end
block := qtModule.NewTask("TestRequeueingTask:block", func(ctx context.Context, t *Task) error {
blockWg.Done()
time.Sleep(100 * time.Millisecond)
wg.Done()
return nil
}).StartASAP()
// make sure first task has started
blockWg.Wait()
// fmt.Printf("%s: %+v\n", time.Now(), block)
// schedule again while executing
blockWg.Add(1) // mark done at beginning
block.StartASAP()
// fmt.Printf("%s: %+v\n", time.Now(), block)
// test task
wg.Add(1)
task := qtModule.NewTask("TestRequeueingTask:test", func(ctx context.Context, t *Task) error {
wg.Done()
return nil
}).Schedule(time.Now().Add(2 * time.Second))
// reschedule
task.Schedule(time.Now().Add(1 * time.Second))
task.Queue()
task.Prioritize()
task.StartASAP()
wg.Wait()
time.Sleep(100 * time.Millisecond) // let tasks finalize execution
// do it again
// block task execution (while first block task is still running!)
blockWg.Add(1) // mark done at beginning
wg.Add(1) // mark done at end
block.StartASAP()
blockWg.Wait()
// reschedule
wg.Add(1)
task.Schedule(time.Now().Add(1 * time.Second))
task.Queue()
task.Prioritize()
task.StartASAP()
wg.Wait()
}
func TestQueueSuccession(t *testing.T) {
var cnt int
wg := &sync.WaitGroup{}
wg.Add(10)
tt := qtModule.NewTask("TestRequeueingTask:test", func(ctx context.Context, task *Task) error {
time.Sleep(10 * time.Millisecond)
wg.Done()
cnt++
fmt.Printf("completed succession %d\n", cnt)
switch cnt {
case 1, 4, 6:
task.Queue()
case 2, 5, 8:
task.StartASAP()
case 3, 7, 9:
task.Schedule(time.Now().Add(10 * time.Millisecond))
}
return nil
})
// fmt.Printf("%+v\n", tt)
tt.StartASAP()
// time.Sleep(100 * time.Millisecond)
// fmt.Printf("%+v\n", tt)
wg.Wait()
}