From adcc916a6a626984e79e144597172b0ee9b4570d Mon Sep 17 00:00:00 2001 From: Daniel Date: Fri, 24 Apr 2020 09:52:42 +0200 Subject: [PATCH] Fix and improve task management --- modules/tasks.go | 106 +++++++++++++++++++++++------------------- modules/tasks_test.go | 81 ++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+), 47 deletions(-) diff --git a/modules/tasks.go b/modules/tasks.go index 3e3f192..b88ea5f 100644 --- a/modules/tasks.go +++ b/modules/tasks.go @@ -18,12 +18,12 @@ type Task struct { module *Module taskFn func(context.Context, *Task) error - queued bool canceled bool executing bool + overtime bool // locked by scheduleLock // these are populated at task creation - // ctx is canceled when task is shutdown -> all tasks become canceled + // ctx is canceled when module is shutdown -> all tasks become canceled ctx context.Context cancelCtx func() @@ -110,10 +110,9 @@ func (t *Task) prepForQueueing() (ok bool) { return false } - t.queued = true if t.maxDelay != 0 { t.executeAt = time.Now().Add(t.maxDelay) - t.addToSchedule() + t.addToSchedule(true) } return true @@ -129,11 +128,11 @@ func notifyQueue() { // Queue queues the Task for execution. func (t *Task) Queue() *Task { t.lock.Lock() + defer t.lock.Unlock() + if !t.prepForQueueing() { - t.lock.Unlock() return t } - t.lock.Unlock() if t.queueElement == nil { queuesLock.Lock() @@ -148,11 +147,11 @@ func (t *Task) Queue() *Task { // Prioritize puts the task in the prioritized queue. func (t *Task) Prioritize() *Task { t.lock.Lock() + defer t.lock.Unlock() + if !t.prepForQueueing() { - t.lock.Unlock() return t } - t.lock.Unlock() if t.prioritizedQueueElement == nil { queuesLock.Lock() @@ -167,11 +166,11 @@ func (t *Task) Prioritize() *Task { // StartASAP schedules the task to be executed next. func (t *Task) StartASAP() *Task { t.lock.Lock() + defer t.lock.Unlock() + if !t.prepForQueueing() { - t.lock.Unlock() return t } - t.lock.Unlock() queuesLock.Lock() if t.prioritizedQueueElement == nil { @@ -188,17 +187,19 @@ func (t *Task) StartASAP() *Task { // MaxDelay sets a maximum delay within the task should be executed from being queued. Scheduled tasks are queued when they are triggered. The default delay is 3 minutes. func (t *Task) MaxDelay(maxDelay time.Duration) *Task { t.lock.Lock() + defer t.lock.Unlock() + t.maxDelay = maxDelay - t.lock.Unlock() return t } // Schedule schedules the task for execution at the given time. func (t *Task) Schedule(executeAt time.Time) *Task { t.lock.Lock() + defer t.lock.Unlock() + t.executeAt = executeAt - t.addToSchedule() - t.lock.Unlock() + t.addToSchedule(false) return t } @@ -210,22 +211,23 @@ func (t *Task) Repeat(interval time.Duration) *Task { } t.lock.Lock() + defer t.lock.Unlock() + t.repeat = interval t.executeAt = time.Now().Add(t.repeat) - t.addToSchedule() - t.lock.Unlock() - + t.addToSchedule(false) return t } // Cancel cancels the current and any future execution of the Task. This is not reversible by any other functions. func (t *Task) Cancel() { t.lock.Lock() + defer t.lock.Unlock() + t.canceled = true if t.cancelCtx != nil { t.cancelCtx() } - t.lock.Unlock() } func (t *Task) removeFromQueues() { @@ -234,31 +236,29 @@ func (t *Task) removeFromQueues() { queuesLock.Lock() taskQueue.Remove(t.queueElement) queuesLock.Unlock() - t.lock.Lock() t.queueElement = nil - t.lock.Unlock() } if t.prioritizedQueueElement != nil { queuesLock.Lock() prioritizedTaskQueue.Remove(t.prioritizedQueueElement) queuesLock.Unlock() - t.lock.Lock() t.prioritizedQueueElement = nil - t.lock.Unlock() } if t.scheduleListElement != nil { scheduleLock.Lock() taskSchedule.Remove(t.scheduleListElement) + t.overtime = false scheduleLock.Unlock() - t.lock.Lock() t.scheduleListElement = nil - t.lock.Unlock() } } func (t *Task) runWithLocking() { t.lock.Lock() + // we will not attempt execution, remove from queues + t.removeFromQueues() + // check if task is already executing if t.executing { t.lock.Unlock() @@ -266,21 +266,22 @@ func (t *Task) runWithLocking() { } // check if task is active + // - has not been cancelled + // - module is online (soon) if !t.isActive() { - t.removeFromQueues() t.lock.Unlock() return } // check if module was stopped select { - case <-t.ctx.Done(): // check if module is stopped - t.removeFromQueues() + case <-t.ctx.Done(): t.lock.Unlock() return default: } + // enter executing state t.executing = true t.lock.Unlock() @@ -296,8 +297,9 @@ func (t *Task) runWithLocking() { // wait <-t.module.StartCompleted() } else { + // abort, module will not come online t.lock.Lock() - t.removeFromQueues() + t.executing = false t.lock.Unlock() return } @@ -336,22 +338,23 @@ func (t *Task) executeWithLocking() { atomic.AddInt32(t.module.taskCnt, -1) t.module.waitGroup.Done() - // reset t.lock.Lock() + // reset state t.executing = false - t.queued = false + // repeat? if t.isActive() && t.repeat != 0 { t.executeAt = time.Now().Add(t.repeat) - t.addToSchedule() + t.addToSchedule(false) } - t.lock.Unlock() // notify that we finished - if t.cancelCtx != nil { - t.cancelCtx() - } + t.cancelCtx() + // refresh context + t.ctx, t.cancelCtx = context.WithCancel(t.module.Ctx) + + t.lock.Unlock() }() // run @@ -361,13 +364,7 @@ func (t *Task) executeWithLocking() { } } -func (t *Task) getExecuteAtWithLocking() time.Time { - t.lock.Lock() - defer t.lock.Unlock() - return t.executeAt -} - -func (t *Task) addToSchedule() { +func (t *Task) addToSchedule(overtime bool) { if !t.isActive() { return } @@ -376,6 +373,11 @@ func (t *Task) addToSchedule() { defer scheduleLock.Unlock() // defer printTaskList(taskSchedule) // for debugging + if overtime { + // do not set to false + t.overtime = true + } + // notify scheduler defer func() { select { @@ -392,7 +394,7 @@ func (t *Task) addToSchedule() { continue } // compare - if t.executeAt.Before(eVal.getExecuteAtWithLocking()) { + if t.executeAt.Before(eVal.executeAt) { // insert/move task if t.scheduleListElement == nil { t.scheduleListElement = taskSchedule.InsertBefore(t, e) @@ -489,18 +491,28 @@ func taskScheduleHandler() { return case <-recalculateNextScheduledTask: case <-waitUntilNextScheduledTask(): - // get first task in schedule scheduleLock.Lock() + + // get first task in schedule e := taskSchedule.Front() - scheduleLock.Unlock() + if e == nil { + scheduleLock.Unlock() + continue + } t := e.Value.(*Task) // process Task - if t.queued { + if t.overtime { // already queued and maxDelay reached + t.overtime = false + scheduleLock.Unlock() + t.runWithLocking() } else { // place in front of prioritized queue + t.overtime = true + scheduleLock.Unlock() + t.StartASAP() } } @@ -513,10 +525,10 @@ func printTaskList(*list.List) { //nolint:unused,deadcode // for debugging, NOT t, ok := e.Value.(*Task) if ok { fmt.Printf( - "%s:%s qu=%v ca=%v exec=%v at=%s rep=%s delay=%s\n", + "%s:%s over=%v canc=%v exec=%v exat=%s rep=%s delay=%s\n", t.module.Name, t.name, - t.queued, + t.overtime, t.canceled, t.executing, t.executeAt, diff --git a/modules/tasks_test.go b/modules/tasks_test.go index a7ff01d..00d9bdc 100644 --- a/modules/tasks_test.go +++ b/modules/tasks_test.go @@ -168,3 +168,84 @@ func TestScheduledTaskWaiting(t *testing.T) { } } + +func TestRequeueingTask(t *testing.T) { + blockWg := &sync.WaitGroup{} + wg := &sync.WaitGroup{} + + // block task execution + blockWg.Add(1) // mark done at beginning + wg.Add(2) // mark done at end + block := qtModule.NewTask("TestRequeueingTask:block", func(ctx context.Context, t *Task) error { + blockWg.Done() + time.Sleep(100 * time.Millisecond) + wg.Done() + return nil + }).StartASAP() + // make sure first task has started + blockWg.Wait() + // fmt.Printf("%s: %+v\n", time.Now(), block) + + // schedule again while executing + blockWg.Add(1) // mark done at beginning + block.StartASAP() + // fmt.Printf("%s: %+v\n", time.Now(), block) + + // test task + wg.Add(1) + task := qtModule.NewTask("TestRequeueingTask:test", func(ctx context.Context, t *Task) error { + wg.Done() + return nil + }).Schedule(time.Now().Add(2 * time.Second)) + + // reschedule + task.Schedule(time.Now().Add(1 * time.Second)) + task.Queue() + task.Prioritize() + task.StartASAP() + wg.Wait() + time.Sleep(100 * time.Millisecond) // let tasks finalize execution + + // do it again + + // block task execution (while first block task is still running!) + blockWg.Add(1) // mark done at beginning + wg.Add(1) // mark done at end + block.StartASAP() + blockWg.Wait() + // reschedule + wg.Add(1) + task.Schedule(time.Now().Add(1 * time.Second)) + task.Queue() + task.Prioritize() + task.StartASAP() + wg.Wait() +} + +func TestQueueSuccession(t *testing.T) { + var cnt int + wg := &sync.WaitGroup{} + wg.Add(10) + + tt := qtModule.NewTask("TestRequeueingTask:test", func(ctx context.Context, task *Task) error { + time.Sleep(10 * time.Millisecond) + wg.Done() + cnt++ + fmt.Printf("completed succession %d\n", cnt) + switch cnt { + case 1, 4, 6: + task.Queue() + case 2, 5, 8: + task.StartASAP() + case 3, 7, 9: + task.Schedule(time.Now().Add(10 * time.Millisecond)) + } + return nil + }) + // fmt.Printf("%+v\n", tt) + tt.StartASAP() + // time.Sleep(100 * time.Millisecond) + // fmt.Printf("%+v\n", tt) + + wg.Wait() +}