diff --git a/taskmanager/microtasks.go b/modules/microtasks.go similarity index 94% rename from taskmanager/microtasks.go rename to modules/microtasks.go index ad758ff..50976be 100644 --- a/taskmanager/microtasks.go +++ b/modules/microtasks.go @@ -1,4 +1,4 @@ -package taskmanager +package modules import ( "sync/atomic" @@ -24,9 +24,6 @@ var ( tasksDoneFlag *abool.AtomicBool tasksWaiting chan bool tasksWaitingFlag *abool.AtomicBool - - shutdownSignal = make(chan struct{}, 0) - suttingDown = abool.NewBool(false) ) // StartMicroTask starts a new MicroTask. It will start immediately. @@ -51,7 +48,7 @@ func newTaskIsWaiting() { // StartMediumPriorityMicroTask starts a new MicroTask (waiting its turn) if channel receives. func StartMediumPriorityMicroTask() chan bool { - if suttingDown.IsSet() { + if shutdownSignalClosed.IsSet() { return closedChannel } if tasksWaitingFlag.SetToIf(false, true) { @@ -62,7 +59,7 @@ func StartMediumPriorityMicroTask() chan bool { // StartLowPriorityMicroTask starts a new MicroTask (waiting its turn) if channel receives. func StartLowPriorityMicroTask() chan bool { - if suttingDown.IsSet() { + if shutdownSignalClosed.IsSet() { return closedChannel } if tasksWaitingFlag.SetToIf(false, true) { @@ -73,7 +70,7 @@ func StartLowPriorityMicroTask() chan bool { // StartVeryLowPriorityMicroTask starts a new MicroTask (waiting its turn) if channel receives. func StartVeryLowPriorityMicroTask() chan bool { - if suttingDown.IsSet() { + if shutdownSignalClosed.IsSet() { return closedChannel } if tasksWaitingFlag.SetToIf(false, true) { @@ -96,7 +93,7 @@ func init() { closedChannel = make(chan bool, 0) close(closedChannel) - var t int32 = 0 + var t int32 tasks = &t mediumPriorityClearance = make(chan bool, 0) @@ -116,7 +113,7 @@ func init() { for { // wait for an event to start new tasks - if !suttingDown.IsSet() { + if !shutdownSignalClosed.IsSet() { // reset timer // https://golang.org/pkg/time/#Timer.Reset diff --git a/taskmanager/microtasks_test.go b/modules/microtasks_test.go similarity index 99% rename from taskmanager/microtasks_test.go rename to modules/microtasks_test.go index eb2fd74..5406ca8 100644 --- a/taskmanager/microtasks_test.go +++ b/modules/microtasks_test.go @@ -1,4 +1,4 @@ -package taskmanager +package modules import ( "strings" diff --git a/modules/start.go b/modules/start.go index 8ac5acf..b9ea72e 100644 --- a/modules/start.go +++ b/modules/start.go @@ -79,6 +79,9 @@ func Start() error { close(startCompleteSignal) } + go taskQueueHandler() + go taskScheduleHandler() + return nil } diff --git a/modules/tasks.go b/modules/tasks.go new file mode 100644 index 0000000..3fafd76 --- /dev/null +++ b/modules/tasks.go @@ -0,0 +1,425 @@ +package modules + +import ( + "container/list" + "context" + "sync" + "time" + + "github.com/tevino/abool" + + "github.com/safing/portbase/log" +) + +// Task is managed task bound to a module. +type Task struct { + name string + module *Module + taskFn TaskFn + + queued bool + canceled bool + executing bool + cancelFunc func() + + executeAt time.Time + repeat time.Duration + maxDelay time.Duration + + queueElement *list.Element + prioritizedQueueElement *list.Element + scheduleListElement *list.Element + + lock sync.Mutex +} + +// TaskFn is the function signature for creating Tasks. +type TaskFn func(ctx context.Context, task *Task) + +var ( + taskQueue = list.New() + prioritizedTaskQueue = list.New() + queuesLock sync.Mutex + queueWg sync.WaitGroup + + taskSchedule = list.New() + scheduleLock sync.Mutex + + waitForever chan time.Time + + queueIsFilled = make(chan struct{}, 1) // kick off queue handler + recalculateNextScheduledTask = make(chan struct{}, 1) +) + +const ( + maxExecutionWait = 1 * time.Minute + defaultMaxDelay = 5 * time.Minute + minRepeatDuration = 1 * time.Second +) + +// NewTask creates a new task with a descriptive name (non-unique), a optional deadline, and the task function to be executed. You must call one of Queue, Prioritize, StartASAP, Schedule or Repeat in order to have the Task executed. +func (m *Module) NewTask(name string, taskFn TaskFn) *Task { + return &Task{ + name: name, + module: m, + taskFn: taskFn, + maxDelay: defaultMaxDelay, + } +} + +func (t *Task) isActive() bool { + return !t.canceled && !t.module.ShutdownInProgress() +} + +func (t *Task) prepForQueueing() (ok bool) { + if !t.isActive() { + return false + } + + t.queued = true + if t.maxDelay != 0 { + t.executeAt = time.Now().Add(t.maxDelay) + t.addToSchedule() + } + + return true +} + +func notifyQueue() { + select { + case queueIsFilled <- struct{}{}: + default: + } +} + +// Queue queues the Task for execution. +func (t *Task) Queue() *Task { + t.lock.Lock() + if !t.prepForQueueing() { + t.lock.Unlock() + return t + } + t.lock.Unlock() + + if t.queueElement == nil { + queuesLock.Lock() + t.queueElement = taskQueue.PushBack(t) + queuesLock.Unlock() + } + + notifyQueue() + return t +} + +// Prioritize puts the task in the prioritized queue. +func (t *Task) Prioritize() *Task { + t.lock.Lock() + if !t.prepForQueueing() { + t.lock.Unlock() + return t + } + t.lock.Unlock() + + if t.prioritizedQueueElement == nil { + queuesLock.Lock() + t.prioritizedQueueElement = prioritizedTaskQueue.PushBack(t) + queuesLock.Unlock() + } + + notifyQueue() + return t +} + +// StartASAP schedules the task to be executed next. +func (t *Task) StartASAP() *Task { + t.lock.Lock() + if !t.prepForQueueing() { + t.lock.Unlock() + return t + } + t.lock.Unlock() + + queuesLock.Lock() + if t.prioritizedQueueElement == nil { + t.prioritizedQueueElement = prioritizedTaskQueue.PushFront(t) + } else { + prioritizedTaskQueue.MoveToFront(t.prioritizedQueueElement) + } + queuesLock.Unlock() + + notifyQueue() + return t +} + +// MaxDelay sets a maximum delay within the task should be executed from being queued. Scheduled tasks are queued when they are triggered. The default delay is 3 minutes. +func (t *Task) MaxDelay(maxDelay time.Duration) *Task { + t.lock.Lock() + t.maxDelay = maxDelay + t.lock.Unlock() + return t +} + +// Schedule schedules the task for execution at the given time. +func (t *Task) Schedule(executeAt time.Time) *Task { + t.lock.Lock() + t.executeAt = executeAt + t.addToSchedule() + t.lock.Unlock() + return t +} + +// Repeat sets the task to be executed in endless repeat at the specified interval. First execution will be after interval. Minimum repeat interval is one second. +func (t *Task) Repeat(interval time.Duration) *Task { + // check minimum interval duration + if interval < minRepeatDuration { + interval = minRepeatDuration + } + + t.lock.Lock() + t.repeat = interval + t.executeAt = time.Now().Add(t.repeat) + t.lock.Unlock() + + return t +} + +// Cancel cancels the current and any future execution of the Task. This is not reversible by any other functions. +func (t *Task) Cancel() { + t.lock.Lock() + t.canceled = true + if t.cancelFunc != nil { + t.cancelFunc() + } + t.lock.Unlock() +} + +func (t *Task) runWithLocking() { + t.lock.Lock() + + // check state, return if already executing or inactive + if t.executing || !t.isActive() { + t.lock.Unlock() + return + } + t.executing = true + + // get list elements + queueElement := t.queueElement + prioritizedQueueElement := t.prioritizedQueueElement + scheduleListElement := t.scheduleListElement + + // create context + var taskCtx context.Context + taskCtx, t.cancelFunc = context.WithCancel(t.module.Ctx) + + t.lock.Unlock() + + // remove from lists + if queueElement != nil { + queuesLock.Lock() + taskQueue.Remove(t.queueElement) + queuesLock.Unlock() + t.lock.Lock() + t.queueElement = nil + t.lock.Unlock() + } + if prioritizedQueueElement != nil { + queuesLock.Lock() + prioritizedTaskQueue.Remove(t.prioritizedQueueElement) + queuesLock.Unlock() + t.lock.Lock() + t.prioritizedQueueElement = nil + t.lock.Unlock() + } + if scheduleListElement != nil { + scheduleLock.Lock() + taskSchedule.Remove(t.scheduleListElement) + scheduleLock.Unlock() + t.lock.Lock() + t.scheduleListElement = nil + t.lock.Unlock() + } + + // add to module workers + t.module.AddWorkers(1) + // add to queue workgroup + queueWg.Add(1) + + go t.executeWithLocking(taskCtx, t.cancelFunc) + go func() { + select { + case <-taskCtx.Done(): + case <-time.After(maxExecutionWait): + } + // complete queue worker (early) to allow next worker + queueWg.Done() + }() +} + +func (t *Task) executeWithLocking(ctx context.Context, cancelFunc func()) { + defer func() { + // log result if error + panicVal := recover() + if panicVal != nil { + log.Errorf("%s: task %s panicked: %s", t.module.Name, t.name, panicVal) + } + + // mark task as completed + t.module.FinishWorker() + + // reset + t.lock.Lock() + // reset state + t.executing = false + t.queued = false + // repeat? + if t.isActive() && t.repeat != 0 { + t.executeAt = time.Now().Add(t.repeat) + t.addToSchedule() + } + t.lock.Unlock() + + // notify that we finished + cancelFunc() + }() + t.taskFn(ctx, t) +} + +func (t *Task) getExecuteAtWithLocking() time.Time { + t.lock.Lock() + defer t.lock.Unlock() + return t.executeAt +} + +func (t *Task) addToSchedule() { + scheduleLock.Lock() + defer scheduleLock.Unlock() + + // notify scheduler + defer func() { + select { + case recalculateNextScheduledTask <- struct{}{}: + default: + } + }() + + // insert task into schedule + for e := taskSchedule.Front(); e != nil; e = e.Next() { + // check for self + eVal := e.Value.(*Task) + if eVal == t { + continue + } + // compare + if t.executeAt.Before(eVal.getExecuteAtWithLocking()) { + // insert/move task + if t.scheduleListElement == nil { + t.scheduleListElement = taskSchedule.InsertBefore(t, e) + } else { + taskSchedule.MoveBefore(t.scheduleListElement, e) + } + return + } + } + + // add/move to end + if t.scheduleListElement == nil { + t.scheduleListElement = taskSchedule.PushBack(t) + } else { + taskSchedule.MoveToBack(t.scheduleListElement) + } +} + +func waitUntilNextScheduledTask() <-chan time.Time { + scheduleLock.Lock() + defer scheduleLock.Unlock() + + if taskSchedule.Len() > 0 { + return time.After(taskSchedule.Front().Value.(*Task).executeAt.Sub(time.Now())) + } + return waitForever +} + +var ( + taskQueueHandlerStarted = abool.NewBool(false) + taskScheduleHandlerStarted = abool.NewBool(false) +) + +func taskQueueHandler() { + if !taskQueueHandlerStarted.SetToIf(false, true) { + return + } + + for { + // wait + select { + case <-shutdownSignal: + return + case <-queueIsFilled: + } + + // execute + execLoop: + for { + // wait for execution slot + queueWg.Wait() + + // check for shutdown + if shutdownSignalClosed.IsSet() { + return + } + + // get next Task + queuesLock.Lock() + e := prioritizedTaskQueue.Front() + if e != nil { + prioritizedTaskQueue.Remove(e) + } else { + e = taskQueue.Front() + if e != nil { + taskQueue.Remove(e) + } + } + queuesLock.Unlock() + + // lists are empty + if e == nil { + break execLoop + } + + // value -> Task + t := e.Value.(*Task) + // run + t.runWithLocking() + } + } +} + +func taskScheduleHandler() { + if !taskScheduleHandlerStarted.SetToIf(false, true) { + return + } + + for { + select { + case <-shutdownSignal: + return + case <-recalculateNextScheduledTask: + case <-waitUntilNextScheduledTask(): + // get first task in schedule + scheduleLock.Lock() + e := taskSchedule.Front() + scheduleLock.Unlock() + t := e.Value.(*Task) + + // process Task + if t.queued { + // already queued and maxDelay reached + t.runWithLocking() + } else { + // place in front of prioritized queue + t.StartASAP() + } + } + } +} diff --git a/modules/tasks_test.go b/modules/tasks_test.go new file mode 100644 index 0000000..f035bd8 --- /dev/null +++ b/modules/tasks_test.go @@ -0,0 +1,168 @@ +package modules + +import ( + "context" + "fmt" + "os" + "runtime/pprof" + "sync" + "testing" + "time" + + "github.com/tevino/abool" +) + +func init() { + go taskQueueHandler() + go taskScheduleHandler() + + go func() { + <-time.After(10 * time.Second) + fmt.Fprintln(os.Stderr, "taking too long") + pprof.Lookup("goroutine").WriteTo(os.Stderr, 1) + os.Exit(1) + }() +} + +// test waiting + +// globals +var qtWg sync.WaitGroup +var qtOutputChannel chan string +var qtSleepDuration time.Duration +var qtWorkerCnt int32 +var qtModule = &Module{ + Name: "task test module", + Prepped: abool.NewBool(false), + Started: abool.NewBool(false), + Stopped: abool.NewBool(false), + inTransition: abool.NewBool(false), + Ctx: context.Background(), + shutdownFlag: abool.NewBool(false), + workerGroup: sync.WaitGroup{}, + workerCnt: &qtWorkerCnt, +} + +// functions +func queuedTaskTester(s string) { + qtModule.NewTask(s, func(ctx context.Context, t *Task) { + time.Sleep(qtSleepDuration * 2) + qtOutputChannel <- s + qtWg.Done() + }).Queue() +} + +func prioritizedTaskTester(s string) { + qtModule.NewTask(s, func(ctx context.Context, t *Task) { + time.Sleep(qtSleepDuration * 2) + qtOutputChannel <- s + qtWg.Done() + }).Prioritize() +} + +// test +func TestQueuedTask(t *testing.T) { + // skip + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + // init + expectedOutput := "0123456789" + qtSleepDuration = 20 * time.Millisecond + qtOutputChannel = make(chan string, 100) + qtWg.Add(10) + + // TEST + queuedTaskTester("0") + queuedTaskTester("1") + queuedTaskTester("3") + queuedTaskTester("4") + queuedTaskTester("6") + queuedTaskTester("7") + queuedTaskTester("9") + + time.Sleep(qtSleepDuration * 3) + prioritizedTaskTester("2") + time.Sleep(qtSleepDuration * 6) + prioritizedTaskTester("5") + time.Sleep(qtSleepDuration * 6) + prioritizedTaskTester("8") + + // wait for test to finish + qtWg.Wait() + + // collect output + close(qtOutputChannel) + completeOutput := "" + for s := <-qtOutputChannel; s != ""; s = <-qtOutputChannel { + completeOutput += s + } + // check if test succeeded + if completeOutput != expectedOutput { + t.Errorf("QueuedTask test failed, expected sequence %s, got %s", expectedOutput, completeOutput) + } + +} + +// test scheduled tasks + +// globals +var stWg sync.WaitGroup +var stOutputChannel chan string +var stSleepDuration time.Duration +var stWaitCh chan bool + +// functions +func scheduledTaskTester(s string, sched time.Time) { + qtModule.NewTask(s, func(ctx context.Context, t *Task) { + time.Sleep(stSleepDuration) + stOutputChannel <- s + stWg.Done() + }).Schedule(sched) +} + +// test +func TestScheduledTaskWaiting(t *testing.T) { + + // skip + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + // init + expectedOutput := "0123456789" + stSleepDuration = 10 * time.Millisecond + stOutputChannel = make(chan string, 100) + stWaitCh = make(chan bool, 0) + + stWg.Add(10) + + // TEST + scheduledTaskTester("4", time.Now().Add(stSleepDuration*8)) + scheduledTaskTester("0", time.Now().Add(stSleepDuration*0)) + scheduledTaskTester("8", time.Now().Add(stSleepDuration*16)) + scheduledTaskTester("1", time.Now().Add(stSleepDuration*2)) + scheduledTaskTester("7", time.Now().Add(stSleepDuration*14)) + scheduledTaskTester("9", time.Now().Add(stSleepDuration*18)) + scheduledTaskTester("3", time.Now().Add(stSleepDuration*6)) + scheduledTaskTester("2", time.Now().Add(stSleepDuration*4)) + scheduledTaskTester("6", time.Now().Add(stSleepDuration*12)) + scheduledTaskTester("5", time.Now().Add(stSleepDuration*10)) + + // wait for test to finish + close(stWaitCh) + stWg.Wait() + + // collect output + close(stOutputChannel) + completeOutput := "" + for s := <-stOutputChannel; s != ""; s = <-stOutputChannel { + completeOutput += s + } + // check if test succeeded + if completeOutput != expectedOutput { + t.Errorf("ScheduledTask test failed, expected sequence %s, got %s", expectedOutput, completeOutput) + } + +} diff --git a/taskmanager/queuedtasks.go b/taskmanager/queuedtasks.go deleted file mode 100644 index 7925701..0000000 --- a/taskmanager/queuedtasks.go +++ /dev/null @@ -1,152 +0,0 @@ -package taskmanager - -import ( - "container/list" - "time" - - "github.com/tevino/abool" -) - -type Task struct { - name string - start chan bool - started *abool.AtomicBool - schedule *time.Time -} - -var taskQueue *list.List -var prioritizedTaskQueue *list.List -var addToQueue chan *Task -var addToPrioritizedQueue chan *Task -var addAsNextTask chan *Task - -var finishedQueuedTask chan bool -var queuedTaskRunning *abool.AtomicBool - -var getQueueLengthREQ chan bool -var getQueueLengthREP chan int - -func newUnqeuedTask(name string) *Task { - t := &Task{ - name, - make(chan bool), - abool.NewBool(false), - nil, - } - return t -} - -func NewQueuedTask(name string) *Task { - t := newUnqeuedTask(name) - addToQueue <- t - return t -} - -func NewPrioritizedQueuedTask(name string) *Task { - t := newUnqeuedTask(name) - addToPrioritizedQueue <- t - return t -} - -func (t *Task) addToPrioritizedQueue() { - addToPrioritizedQueue <- t -} - -func (t *Task) WaitForStart() chan bool { - return t.start -} - -func (t *Task) StartAnyway() { - addAsNextTask <- t -} - -func (t *Task) Done() { - if !t.started.SetToIf(false, true) { - finishedQueuedTask <- true - } -} - -func TotalQueuedTasks() int { - getQueueLengthREQ <- true - return <-getQueueLengthREP -} - -func checkQueueStatus() { - if queuedTaskRunning.SetToIf(false, true) { - finishedQueuedTask <- true - } -} - -func fireNextTask() { - - if prioritizedTaskQueue.Len() > 0 { - for e := prioritizedTaskQueue.Front(); prioritizedTaskQueue.Len() > 0; e.Next() { - t := e.Value.(*Task) - prioritizedTaskQueue.Remove(e) - if t.started.SetToIf(false, true) { - close(t.start) - return - } - } - } - - if taskQueue.Len() > 0 { - for e := taskQueue.Front(); taskQueue.Len() > 0; e.Next() { - t := e.Value.(*Task) - taskQueue.Remove(e) - if t.started.SetToIf(false, true) { - close(t.start) - return - } - } - } - - queuedTaskRunning.UnSet() - -} - -func init() { - - taskQueue = list.New() - prioritizedTaskQueue = list.New() - addToQueue = make(chan *Task, 1) - addToPrioritizedQueue = make(chan *Task, 1) - addAsNextTask = make(chan *Task, 1) - - finishedQueuedTask = make(chan bool, 1) - queuedTaskRunning = abool.NewBool(false) - - getQueueLengthREQ = make(chan bool, 1) - getQueueLengthREP = make(chan int, 1) - - go func() { - - for { - select { - case <-shutdownSignal: - // TODO: work off queue? - return - case <-getQueueLengthREQ: - // TODO: maybe clean queues before replying - if queuedTaskRunning.IsSet() { - getQueueLengthREP <- prioritizedTaskQueue.Len() + taskQueue.Len() + 1 - } else { - getQueueLengthREP <- prioritizedTaskQueue.Len() + taskQueue.Len() - } - case t := <-addToQueue: - taskQueue.PushBack(t) - checkQueueStatus() - case t := <-addToPrioritizedQueue: - prioritizedTaskQueue.PushBack(t) - checkQueueStatus() - case t := <-addAsNextTask: - prioritizedTaskQueue.PushFront(t) - checkQueueStatus() - case <-finishedQueuedTask: - fireNextTask() - } - } - - }() - -} diff --git a/taskmanager/queuedtasks_test.go b/taskmanager/queuedtasks_test.go deleted file mode 100644 index 0ed33ae..0000000 --- a/taskmanager/queuedtasks_test.go +++ /dev/null @@ -1,110 +0,0 @@ -package taskmanager - -import ( - "sync" - "testing" - "time" -) - -// test waiting - -// globals -var qtWg sync.WaitGroup -var qtOutputChannel chan string -var qtSleepDuration time.Duration - -// functions -func queuedTaskTester(s string) { - t := NewQueuedTask(s) - go func() { - <-t.WaitForStart() - time.Sleep(qtSleepDuration * 2) - qtOutputChannel <- s - t.Done() - qtWg.Done() - }() -} - -func prioritizedTastTester(s string) { - t := NewPrioritizedQueuedTask(s) - go func() { - <-t.WaitForStart() - time.Sleep(qtSleepDuration * 2) - qtOutputChannel <- s - t.Done() - qtWg.Done() - }() -} - -// test -func TestQueuedTask(t *testing.T) { - - // skip - if testing.Short() { - t.Skip("skipping test in short mode.") - } - - // init - expectedOutput := "0123456789" - qtSleepDuration = 10 * time.Millisecond - qtOutputChannel = make(chan string, 100) - qtWg.Add(10) - - // test queue length - c := TotalQueuedTasks() - if c != 0 { - t.Errorf("Error in calculating Task Queue, expected 0, got %d", c) - } - - // TEST - queuedTaskTester("0") - queuedTaskTester("1") - queuedTaskTester("3") - queuedTaskTester("4") - queuedTaskTester("6") - queuedTaskTester("7") - queuedTaskTester("9") - - // test queue length - c = TotalQueuedTasks() - if c != 7 { - t.Errorf("Error in calculating Task Queue, expected 7, got %d", c) - } - - time.Sleep(qtSleepDuration * 3) - prioritizedTastTester("2") - time.Sleep(qtSleepDuration * 6) - prioritizedTastTester("5") - time.Sleep(qtSleepDuration * 6) - prioritizedTastTester("8") - - // test queue length - c = TotalQueuedTasks() - if c != 3 { - t.Errorf("Error in calculating Task Queue, expected 3, got %d", c) - } - - // time.Sleep(qtSleepDuration * 100) - // panic("") - - // wait for test to finish - qtWg.Wait() - - // test queue length - c = TotalQueuedTasks() - if c != 0 { - t.Errorf("Error in calculating Task Queue, expected 0, got %d", c) - } - - // collect output - close(qtOutputChannel) - completeOutput := "" - for s := <-qtOutputChannel; s != ""; s = <-qtOutputChannel { - completeOutput += s - } - // check if test succeeded - if completeOutput != expectedOutput { - t.Errorf("QueuedTask test failed, expected sequence %s, got %s", expectedOutput, completeOutput) - } - -} diff --git a/taskmanager/scheduledtasks.go b/taskmanager/scheduledtasks.go deleted file mode 100644 index 32c4780..0000000 --- a/taskmanager/scheduledtasks.go +++ /dev/null @@ -1,73 +0,0 @@ -package taskmanager - -import ( - "container/list" - "time" -) - -var taskSchedule *list.List -var addToSchedule chan *Task -var waitForever chan time.Time - -var getScheduleLengthREQ chan bool -var getScheduleLengthREP chan int - -func NewScheduledTask(name string, schedule time.Time) *Task { - t := newUnqeuedTask(name) - t.schedule = &schedule - addToSchedule <- t - return t -} - -func TotalScheduledTasks() int { - getScheduleLengthREQ <- true - return <-getScheduleLengthREP -} - -func (t *Task) addToSchedule() { - for e := taskSchedule.Back(); e != nil; e = e.Prev() { - if t.schedule.After(*e.Value.(*Task).schedule) { - taskSchedule.InsertAfter(t, e) - return - } - } - taskSchedule.PushFront(t) -} - -func waitUntilNextScheduledTask() <-chan time.Time { - if taskSchedule.Len() > 0 { - return time.After(taskSchedule.Front().Value.(*Task).schedule.Sub(time.Now())) - } - return waitForever -} - -func init() { - - taskSchedule = list.New() - addToSchedule = make(chan *Task, 1) - waitForever = make(chan time.Time, 1) - - getScheduleLengthREQ = make(chan bool, 1) - getScheduleLengthREP = make(chan int, 1) - - go func() { - - for { - select { - case <-shutdownSignal: - return - case <-getScheduleLengthREQ: - // TODO: maybe clean queues before replying - getScheduleLengthREP <- prioritizedTaskQueue.Len() + taskSchedule.Len() - case t := <-addToSchedule: - t.addToSchedule() - case <-waitUntilNextScheduledTask(): - e := taskSchedule.Front() - t := e.Value.(*Task) - t.addToPrioritizedQueue() - taskSchedule.Remove(e) - } - } - }() - -} diff --git a/taskmanager/scheduledtasks_test.go b/taskmanager/scheduledtasks_test.go deleted file mode 100644 index 18467d1..0000000 --- a/taskmanager/scheduledtasks_test.go +++ /dev/null @@ -1,93 +0,0 @@ -package taskmanager - -import ( - "sync" - "testing" - "time" -) - -// test waiting - -// globals -var stWg sync.WaitGroup -var stOutputChannel chan string -var stSleepDuration time.Duration -var stWaitCh chan bool - -// functions -func scheduledTaskTester(s string, sched time.Time) { - t := NewScheduledTask(s, sched) - go func() { - <-stWaitCh - <-t.WaitForStart() - time.Sleep(stSleepDuration) - stOutputChannel <- s - t.Done() - stWg.Done() - }() -} - -// test -func TestScheduledTaskWaiting(t *testing.T) { - - // skip - if testing.Short() { - t.Skip("skipping test in short mode.") - } - - // init - expectedOutput := "0123456789" - stSleepDuration = 10 * time.Millisecond - stOutputChannel = make(chan string, 100) - stWaitCh = make(chan bool, 0) - - // test queue length - c := TotalScheduledTasks() - if c != 0 { - t.Errorf("Error in calculating Task Queue, expected 0, got %d", c) - } - - stWg.Add(10) - - // TEST - scheduledTaskTester("4", time.Now().Add(stSleepDuration*4)) - scheduledTaskTester("0", time.Now().Add(stSleepDuration*1)) - scheduledTaskTester("8", time.Now().Add(stSleepDuration*8)) - scheduledTaskTester("1", time.Now().Add(stSleepDuration*2)) - scheduledTaskTester("7", time.Now().Add(stSleepDuration*7)) - - // test queue length - time.Sleep(1 * time.Millisecond) - c = TotalScheduledTasks() - if c != 5 { - t.Errorf("Error in calculating Task Queue, expected 5, got %d", c) - } - - scheduledTaskTester("9", time.Now().Add(stSleepDuration*9)) - scheduledTaskTester("3", time.Now().Add(stSleepDuration*3)) - scheduledTaskTester("2", time.Now().Add(stSleepDuration*2)) - scheduledTaskTester("6", time.Now().Add(stSleepDuration*6)) - scheduledTaskTester("5", time.Now().Add(stSleepDuration*5)) - - // wait for test to finish - close(stWaitCh) - stWg.Wait() - - // test queue length - c = TotalScheduledTasks() - if c != 0 { - t.Errorf("Error in calculating Task Queue, expected 0, got %d", c) - } - - // collect output - close(stOutputChannel) - completeOutput := "" - for s := <-stOutputChannel; s != ""; s = <-stOutputChannel { - completeOutput += s - } - // check if test succeeded - if completeOutput != expectedOutput { - t.Errorf("ScheduledTask test failed, expected sequence %s, got %s", expectedOutput, completeOutput) - } - -}