mirror of
https://github.com/safing/portbase
synced 2025-09-01 10:09:50 +00:00
Merge taskmanager into modules package, attach tasks to modules
This commit is contained in:
parent
c1cc409558
commit
938c92d6c4
9 changed files with 603 additions and 438 deletions
|
@ -1,4 +1,4 @@
|
|||
package taskmanager
|
||||
package modules
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
|
@ -24,9 +24,6 @@ var (
|
|||
tasksDoneFlag *abool.AtomicBool
|
||||
tasksWaiting chan bool
|
||||
tasksWaitingFlag *abool.AtomicBool
|
||||
|
||||
shutdownSignal = make(chan struct{}, 0)
|
||||
suttingDown = abool.NewBool(false)
|
||||
)
|
||||
|
||||
// StartMicroTask starts a new MicroTask. It will start immediately.
|
||||
|
@ -51,7 +48,7 @@ func newTaskIsWaiting() {
|
|||
|
||||
// StartMediumPriorityMicroTask starts a new MicroTask (waiting its turn) if channel receives.
|
||||
func StartMediumPriorityMicroTask() chan bool {
|
||||
if suttingDown.IsSet() {
|
||||
if shutdownSignalClosed.IsSet() {
|
||||
return closedChannel
|
||||
}
|
||||
if tasksWaitingFlag.SetToIf(false, true) {
|
||||
|
@ -62,7 +59,7 @@ func StartMediumPriorityMicroTask() chan bool {
|
|||
|
||||
// StartLowPriorityMicroTask starts a new MicroTask (waiting its turn) if channel receives.
|
||||
func StartLowPriorityMicroTask() chan bool {
|
||||
if suttingDown.IsSet() {
|
||||
if shutdownSignalClosed.IsSet() {
|
||||
return closedChannel
|
||||
}
|
||||
if tasksWaitingFlag.SetToIf(false, true) {
|
||||
|
@ -73,7 +70,7 @@ func StartLowPriorityMicroTask() chan bool {
|
|||
|
||||
// StartVeryLowPriorityMicroTask starts a new MicroTask (waiting its turn) if channel receives.
|
||||
func StartVeryLowPriorityMicroTask() chan bool {
|
||||
if suttingDown.IsSet() {
|
||||
if shutdownSignalClosed.IsSet() {
|
||||
return closedChannel
|
||||
}
|
||||
if tasksWaitingFlag.SetToIf(false, true) {
|
||||
|
@ -96,7 +93,7 @@ func init() {
|
|||
closedChannel = make(chan bool, 0)
|
||||
close(closedChannel)
|
||||
|
||||
var t int32 = 0
|
||||
var t int32
|
||||
tasks = &t
|
||||
|
||||
mediumPriorityClearance = make(chan bool, 0)
|
||||
|
@ -116,7 +113,7 @@ func init() {
|
|||
for {
|
||||
|
||||
// wait for an event to start new tasks
|
||||
if !suttingDown.IsSet() {
|
||||
if !shutdownSignalClosed.IsSet() {
|
||||
|
||||
// reset timer
|
||||
// https://golang.org/pkg/time/#Timer.Reset
|
|
@ -1,4 +1,4 @@
|
|||
package taskmanager
|
||||
package modules
|
||||
|
||||
import (
|
||||
"strings"
|
|
@ -79,6 +79,9 @@ func Start() error {
|
|||
close(startCompleteSignal)
|
||||
}
|
||||
|
||||
go taskQueueHandler()
|
||||
go taskScheduleHandler()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
425
modules/tasks.go
Normal file
425
modules/tasks.go
Normal file
|
@ -0,0 +1,425 @@
|
|||
package modules
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tevino/abool"
|
||||
|
||||
"github.com/safing/portbase/log"
|
||||
)
|
||||
|
||||
// Task is managed task bound to a module.
|
||||
type Task struct {
|
||||
name string
|
||||
module *Module
|
||||
taskFn TaskFn
|
||||
|
||||
queued bool
|
||||
canceled bool
|
||||
executing bool
|
||||
cancelFunc func()
|
||||
|
||||
executeAt time.Time
|
||||
repeat time.Duration
|
||||
maxDelay time.Duration
|
||||
|
||||
queueElement *list.Element
|
||||
prioritizedQueueElement *list.Element
|
||||
scheduleListElement *list.Element
|
||||
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// TaskFn is the function signature for creating Tasks.
|
||||
type TaskFn func(ctx context.Context, task *Task)
|
||||
|
||||
var (
|
||||
taskQueue = list.New()
|
||||
prioritizedTaskQueue = list.New()
|
||||
queuesLock sync.Mutex
|
||||
queueWg sync.WaitGroup
|
||||
|
||||
taskSchedule = list.New()
|
||||
scheduleLock sync.Mutex
|
||||
|
||||
waitForever chan time.Time
|
||||
|
||||
queueIsFilled = make(chan struct{}, 1) // kick off queue handler
|
||||
recalculateNextScheduledTask = make(chan struct{}, 1)
|
||||
)
|
||||
|
||||
const (
|
||||
maxExecutionWait = 1 * time.Minute
|
||||
defaultMaxDelay = 5 * time.Minute
|
||||
minRepeatDuration = 1 * time.Second
|
||||
)
|
||||
|
||||
// NewTask creates a new task with a descriptive name (non-unique), a optional deadline, and the task function to be executed. You must call one of Queue, Prioritize, StartASAP, Schedule or Repeat in order to have the Task executed.
|
||||
func (m *Module) NewTask(name string, taskFn TaskFn) *Task {
|
||||
return &Task{
|
||||
name: name,
|
||||
module: m,
|
||||
taskFn: taskFn,
|
||||
maxDelay: defaultMaxDelay,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Task) isActive() bool {
|
||||
return !t.canceled && !t.module.ShutdownInProgress()
|
||||
}
|
||||
|
||||
func (t *Task) prepForQueueing() (ok bool) {
|
||||
if !t.isActive() {
|
||||
return false
|
||||
}
|
||||
|
||||
t.queued = true
|
||||
if t.maxDelay != 0 {
|
||||
t.executeAt = time.Now().Add(t.maxDelay)
|
||||
t.addToSchedule()
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func notifyQueue() {
|
||||
select {
|
||||
case queueIsFilled <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Queue queues the Task for execution.
|
||||
func (t *Task) Queue() *Task {
|
||||
t.lock.Lock()
|
||||
if !t.prepForQueueing() {
|
||||
t.lock.Unlock()
|
||||
return t
|
||||
}
|
||||
t.lock.Unlock()
|
||||
|
||||
if t.queueElement == nil {
|
||||
queuesLock.Lock()
|
||||
t.queueElement = taskQueue.PushBack(t)
|
||||
queuesLock.Unlock()
|
||||
}
|
||||
|
||||
notifyQueue()
|
||||
return t
|
||||
}
|
||||
|
||||
// Prioritize puts the task in the prioritized queue.
|
||||
func (t *Task) Prioritize() *Task {
|
||||
t.lock.Lock()
|
||||
if !t.prepForQueueing() {
|
||||
t.lock.Unlock()
|
||||
return t
|
||||
}
|
||||
t.lock.Unlock()
|
||||
|
||||
if t.prioritizedQueueElement == nil {
|
||||
queuesLock.Lock()
|
||||
t.prioritizedQueueElement = prioritizedTaskQueue.PushBack(t)
|
||||
queuesLock.Unlock()
|
||||
}
|
||||
|
||||
notifyQueue()
|
||||
return t
|
||||
}
|
||||
|
||||
// StartASAP schedules the task to be executed next.
|
||||
func (t *Task) StartASAP() *Task {
|
||||
t.lock.Lock()
|
||||
if !t.prepForQueueing() {
|
||||
t.lock.Unlock()
|
||||
return t
|
||||
}
|
||||
t.lock.Unlock()
|
||||
|
||||
queuesLock.Lock()
|
||||
if t.prioritizedQueueElement == nil {
|
||||
t.prioritizedQueueElement = prioritizedTaskQueue.PushFront(t)
|
||||
} else {
|
||||
prioritizedTaskQueue.MoveToFront(t.prioritizedQueueElement)
|
||||
}
|
||||
queuesLock.Unlock()
|
||||
|
||||
notifyQueue()
|
||||
return t
|
||||
}
|
||||
|
||||
// MaxDelay sets a maximum delay within the task should be executed from being queued. Scheduled tasks are queued when they are triggered. The default delay is 3 minutes.
|
||||
func (t *Task) MaxDelay(maxDelay time.Duration) *Task {
|
||||
t.lock.Lock()
|
||||
t.maxDelay = maxDelay
|
||||
t.lock.Unlock()
|
||||
return t
|
||||
}
|
||||
|
||||
// Schedule schedules the task for execution at the given time.
|
||||
func (t *Task) Schedule(executeAt time.Time) *Task {
|
||||
t.lock.Lock()
|
||||
t.executeAt = executeAt
|
||||
t.addToSchedule()
|
||||
t.lock.Unlock()
|
||||
return t
|
||||
}
|
||||
|
||||
// Repeat sets the task to be executed in endless repeat at the specified interval. First execution will be after interval. Minimum repeat interval is one second.
|
||||
func (t *Task) Repeat(interval time.Duration) *Task {
|
||||
// check minimum interval duration
|
||||
if interval < minRepeatDuration {
|
||||
interval = minRepeatDuration
|
||||
}
|
||||
|
||||
t.lock.Lock()
|
||||
t.repeat = interval
|
||||
t.executeAt = time.Now().Add(t.repeat)
|
||||
t.lock.Unlock()
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
// Cancel cancels the current and any future execution of the Task. This is not reversible by any other functions.
|
||||
func (t *Task) Cancel() {
|
||||
t.lock.Lock()
|
||||
t.canceled = true
|
||||
if t.cancelFunc != nil {
|
||||
t.cancelFunc()
|
||||
}
|
||||
t.lock.Unlock()
|
||||
}
|
||||
|
||||
func (t *Task) runWithLocking() {
|
||||
t.lock.Lock()
|
||||
|
||||
// check state, return if already executing or inactive
|
||||
if t.executing || !t.isActive() {
|
||||
t.lock.Unlock()
|
||||
return
|
||||
}
|
||||
t.executing = true
|
||||
|
||||
// get list elements
|
||||
queueElement := t.queueElement
|
||||
prioritizedQueueElement := t.prioritizedQueueElement
|
||||
scheduleListElement := t.scheduleListElement
|
||||
|
||||
// create context
|
||||
var taskCtx context.Context
|
||||
taskCtx, t.cancelFunc = context.WithCancel(t.module.Ctx)
|
||||
|
||||
t.lock.Unlock()
|
||||
|
||||
// remove from lists
|
||||
if queueElement != nil {
|
||||
queuesLock.Lock()
|
||||
taskQueue.Remove(t.queueElement)
|
||||
queuesLock.Unlock()
|
||||
t.lock.Lock()
|
||||
t.queueElement = nil
|
||||
t.lock.Unlock()
|
||||
}
|
||||
if prioritizedQueueElement != nil {
|
||||
queuesLock.Lock()
|
||||
prioritizedTaskQueue.Remove(t.prioritizedQueueElement)
|
||||
queuesLock.Unlock()
|
||||
t.lock.Lock()
|
||||
t.prioritizedQueueElement = nil
|
||||
t.lock.Unlock()
|
||||
}
|
||||
if scheduleListElement != nil {
|
||||
scheduleLock.Lock()
|
||||
taskSchedule.Remove(t.scheduleListElement)
|
||||
scheduleLock.Unlock()
|
||||
t.lock.Lock()
|
||||
t.scheduleListElement = nil
|
||||
t.lock.Unlock()
|
||||
}
|
||||
|
||||
// add to module workers
|
||||
t.module.AddWorkers(1)
|
||||
// add to queue workgroup
|
||||
queueWg.Add(1)
|
||||
|
||||
go t.executeWithLocking(taskCtx, t.cancelFunc)
|
||||
go func() {
|
||||
select {
|
||||
case <-taskCtx.Done():
|
||||
case <-time.After(maxExecutionWait):
|
||||
}
|
||||
// complete queue worker (early) to allow next worker
|
||||
queueWg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
func (t *Task) executeWithLocking(ctx context.Context, cancelFunc func()) {
|
||||
defer func() {
|
||||
// log result if error
|
||||
panicVal := recover()
|
||||
if panicVal != nil {
|
||||
log.Errorf("%s: task %s panicked: %s", t.module.Name, t.name, panicVal)
|
||||
}
|
||||
|
||||
// mark task as completed
|
||||
t.module.FinishWorker()
|
||||
|
||||
// reset
|
||||
t.lock.Lock()
|
||||
// reset state
|
||||
t.executing = false
|
||||
t.queued = false
|
||||
// repeat?
|
||||
if t.isActive() && t.repeat != 0 {
|
||||
t.executeAt = time.Now().Add(t.repeat)
|
||||
t.addToSchedule()
|
||||
}
|
||||
t.lock.Unlock()
|
||||
|
||||
// notify that we finished
|
||||
cancelFunc()
|
||||
}()
|
||||
t.taskFn(ctx, t)
|
||||
}
|
||||
|
||||
func (t *Task) getExecuteAtWithLocking() time.Time {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
return t.executeAt
|
||||
}
|
||||
|
||||
func (t *Task) addToSchedule() {
|
||||
scheduleLock.Lock()
|
||||
defer scheduleLock.Unlock()
|
||||
|
||||
// notify scheduler
|
||||
defer func() {
|
||||
select {
|
||||
case recalculateNextScheduledTask <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
|
||||
// insert task into schedule
|
||||
for e := taskSchedule.Front(); e != nil; e = e.Next() {
|
||||
// check for self
|
||||
eVal := e.Value.(*Task)
|
||||
if eVal == t {
|
||||
continue
|
||||
}
|
||||
// compare
|
||||
if t.executeAt.Before(eVal.getExecuteAtWithLocking()) {
|
||||
// insert/move task
|
||||
if t.scheduleListElement == nil {
|
||||
t.scheduleListElement = taskSchedule.InsertBefore(t, e)
|
||||
} else {
|
||||
taskSchedule.MoveBefore(t.scheduleListElement, e)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// add/move to end
|
||||
if t.scheduleListElement == nil {
|
||||
t.scheduleListElement = taskSchedule.PushBack(t)
|
||||
} else {
|
||||
taskSchedule.MoveToBack(t.scheduleListElement)
|
||||
}
|
||||
}
|
||||
|
||||
func waitUntilNextScheduledTask() <-chan time.Time {
|
||||
scheduleLock.Lock()
|
||||
defer scheduleLock.Unlock()
|
||||
|
||||
if taskSchedule.Len() > 0 {
|
||||
return time.After(taskSchedule.Front().Value.(*Task).executeAt.Sub(time.Now()))
|
||||
}
|
||||
return waitForever
|
||||
}
|
||||
|
||||
var (
|
||||
taskQueueHandlerStarted = abool.NewBool(false)
|
||||
taskScheduleHandlerStarted = abool.NewBool(false)
|
||||
)
|
||||
|
||||
func taskQueueHandler() {
|
||||
if !taskQueueHandlerStarted.SetToIf(false, true) {
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
// wait
|
||||
select {
|
||||
case <-shutdownSignal:
|
||||
return
|
||||
case <-queueIsFilled:
|
||||
}
|
||||
|
||||
// execute
|
||||
execLoop:
|
||||
for {
|
||||
// wait for execution slot
|
||||
queueWg.Wait()
|
||||
|
||||
// check for shutdown
|
||||
if shutdownSignalClosed.IsSet() {
|
||||
return
|
||||
}
|
||||
|
||||
// get next Task
|
||||
queuesLock.Lock()
|
||||
e := prioritizedTaskQueue.Front()
|
||||
if e != nil {
|
||||
prioritizedTaskQueue.Remove(e)
|
||||
} else {
|
||||
e = taskQueue.Front()
|
||||
if e != nil {
|
||||
taskQueue.Remove(e)
|
||||
}
|
||||
}
|
||||
queuesLock.Unlock()
|
||||
|
||||
// lists are empty
|
||||
if e == nil {
|
||||
break execLoop
|
||||
}
|
||||
|
||||
// value -> Task
|
||||
t := e.Value.(*Task)
|
||||
// run
|
||||
t.runWithLocking()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func taskScheduleHandler() {
|
||||
if !taskScheduleHandlerStarted.SetToIf(false, true) {
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-shutdownSignal:
|
||||
return
|
||||
case <-recalculateNextScheduledTask:
|
||||
case <-waitUntilNextScheduledTask():
|
||||
// get first task in schedule
|
||||
scheduleLock.Lock()
|
||||
e := taskSchedule.Front()
|
||||
scheduleLock.Unlock()
|
||||
t := e.Value.(*Task)
|
||||
|
||||
// process Task
|
||||
if t.queued {
|
||||
// already queued and maxDelay reached
|
||||
t.runWithLocking()
|
||||
} else {
|
||||
// place in front of prioritized queue
|
||||
t.StartASAP()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
168
modules/tasks_test.go
Normal file
168
modules/tasks_test.go
Normal file
|
@ -0,0 +1,168 @@
|
|||
package modules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime/pprof"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/tevino/abool"
|
||||
)
|
||||
|
||||
func init() {
|
||||
go taskQueueHandler()
|
||||
go taskScheduleHandler()
|
||||
|
||||
go func() {
|
||||
<-time.After(10 * time.Second)
|
||||
fmt.Fprintln(os.Stderr, "taking too long")
|
||||
pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
|
||||
os.Exit(1)
|
||||
}()
|
||||
}
|
||||
|
||||
// test waiting
|
||||
|
||||
// globals
|
||||
var qtWg sync.WaitGroup
|
||||
var qtOutputChannel chan string
|
||||
var qtSleepDuration time.Duration
|
||||
var qtWorkerCnt int32
|
||||
var qtModule = &Module{
|
||||
Name: "task test module",
|
||||
Prepped: abool.NewBool(false),
|
||||
Started: abool.NewBool(false),
|
||||
Stopped: abool.NewBool(false),
|
||||
inTransition: abool.NewBool(false),
|
||||
Ctx: context.Background(),
|
||||
shutdownFlag: abool.NewBool(false),
|
||||
workerGroup: sync.WaitGroup{},
|
||||
workerCnt: &qtWorkerCnt,
|
||||
}
|
||||
|
||||
// functions
|
||||
func queuedTaskTester(s string) {
|
||||
qtModule.NewTask(s, func(ctx context.Context, t *Task) {
|
||||
time.Sleep(qtSleepDuration * 2)
|
||||
qtOutputChannel <- s
|
||||
qtWg.Done()
|
||||
}).Queue()
|
||||
}
|
||||
|
||||
func prioritizedTaskTester(s string) {
|
||||
qtModule.NewTask(s, func(ctx context.Context, t *Task) {
|
||||
time.Sleep(qtSleepDuration * 2)
|
||||
qtOutputChannel <- s
|
||||
qtWg.Done()
|
||||
}).Prioritize()
|
||||
}
|
||||
|
||||
// test
|
||||
func TestQueuedTask(t *testing.T) {
|
||||
// skip
|
||||
if testing.Short() {
|
||||
t.Skip("skipping test in short mode.")
|
||||
}
|
||||
|
||||
// init
|
||||
expectedOutput := "0123456789"
|
||||
qtSleepDuration = 20 * time.Millisecond
|
||||
qtOutputChannel = make(chan string, 100)
|
||||
qtWg.Add(10)
|
||||
|
||||
// TEST
|
||||
queuedTaskTester("0")
|
||||
queuedTaskTester("1")
|
||||
queuedTaskTester("3")
|
||||
queuedTaskTester("4")
|
||||
queuedTaskTester("6")
|
||||
queuedTaskTester("7")
|
||||
queuedTaskTester("9")
|
||||
|
||||
time.Sleep(qtSleepDuration * 3)
|
||||
prioritizedTaskTester("2")
|
||||
time.Sleep(qtSleepDuration * 6)
|
||||
prioritizedTaskTester("5")
|
||||
time.Sleep(qtSleepDuration * 6)
|
||||
prioritizedTaskTester("8")
|
||||
|
||||
// wait for test to finish
|
||||
qtWg.Wait()
|
||||
|
||||
// collect output
|
||||
close(qtOutputChannel)
|
||||
completeOutput := ""
|
||||
for s := <-qtOutputChannel; s != ""; s = <-qtOutputChannel {
|
||||
completeOutput += s
|
||||
}
|
||||
// check if test succeeded
|
||||
if completeOutput != expectedOutput {
|
||||
t.Errorf("QueuedTask test failed, expected sequence %s, got %s", expectedOutput, completeOutput)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// test scheduled tasks
|
||||
|
||||
// globals
|
||||
var stWg sync.WaitGroup
|
||||
var stOutputChannel chan string
|
||||
var stSleepDuration time.Duration
|
||||
var stWaitCh chan bool
|
||||
|
||||
// functions
|
||||
func scheduledTaskTester(s string, sched time.Time) {
|
||||
qtModule.NewTask(s, func(ctx context.Context, t *Task) {
|
||||
time.Sleep(stSleepDuration)
|
||||
stOutputChannel <- s
|
||||
stWg.Done()
|
||||
}).Schedule(sched)
|
||||
}
|
||||
|
||||
// test
|
||||
func TestScheduledTaskWaiting(t *testing.T) {
|
||||
|
||||
// skip
|
||||
if testing.Short() {
|
||||
t.Skip("skipping test in short mode.")
|
||||
}
|
||||
|
||||
// init
|
||||
expectedOutput := "0123456789"
|
||||
stSleepDuration = 10 * time.Millisecond
|
||||
stOutputChannel = make(chan string, 100)
|
||||
stWaitCh = make(chan bool, 0)
|
||||
|
||||
stWg.Add(10)
|
||||
|
||||
// TEST
|
||||
scheduledTaskTester("4", time.Now().Add(stSleepDuration*8))
|
||||
scheduledTaskTester("0", time.Now().Add(stSleepDuration*0))
|
||||
scheduledTaskTester("8", time.Now().Add(stSleepDuration*16))
|
||||
scheduledTaskTester("1", time.Now().Add(stSleepDuration*2))
|
||||
scheduledTaskTester("7", time.Now().Add(stSleepDuration*14))
|
||||
scheduledTaskTester("9", time.Now().Add(stSleepDuration*18))
|
||||
scheduledTaskTester("3", time.Now().Add(stSleepDuration*6))
|
||||
scheduledTaskTester("2", time.Now().Add(stSleepDuration*4))
|
||||
scheduledTaskTester("6", time.Now().Add(stSleepDuration*12))
|
||||
scheduledTaskTester("5", time.Now().Add(stSleepDuration*10))
|
||||
|
||||
// wait for test to finish
|
||||
close(stWaitCh)
|
||||
stWg.Wait()
|
||||
|
||||
// collect output
|
||||
close(stOutputChannel)
|
||||
completeOutput := ""
|
||||
for s := <-stOutputChannel; s != ""; s = <-stOutputChannel {
|
||||
completeOutput += s
|
||||
}
|
||||
// check if test succeeded
|
||||
if completeOutput != expectedOutput {
|
||||
t.Errorf("ScheduledTask test failed, expected sequence %s, got %s", expectedOutput, completeOutput)
|
||||
}
|
||||
|
||||
}
|
|
@ -1,152 +0,0 @@
|
|||
package taskmanager
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"time"
|
||||
|
||||
"github.com/tevino/abool"
|
||||
)
|
||||
|
||||
type Task struct {
|
||||
name string
|
||||
start chan bool
|
||||
started *abool.AtomicBool
|
||||
schedule *time.Time
|
||||
}
|
||||
|
||||
var taskQueue *list.List
|
||||
var prioritizedTaskQueue *list.List
|
||||
var addToQueue chan *Task
|
||||
var addToPrioritizedQueue chan *Task
|
||||
var addAsNextTask chan *Task
|
||||
|
||||
var finishedQueuedTask chan bool
|
||||
var queuedTaskRunning *abool.AtomicBool
|
||||
|
||||
var getQueueLengthREQ chan bool
|
||||
var getQueueLengthREP chan int
|
||||
|
||||
func newUnqeuedTask(name string) *Task {
|
||||
t := &Task{
|
||||
name,
|
||||
make(chan bool),
|
||||
abool.NewBool(false),
|
||||
nil,
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
func NewQueuedTask(name string) *Task {
|
||||
t := newUnqeuedTask(name)
|
||||
addToQueue <- t
|
||||
return t
|
||||
}
|
||||
|
||||
func NewPrioritizedQueuedTask(name string) *Task {
|
||||
t := newUnqeuedTask(name)
|
||||
addToPrioritizedQueue <- t
|
||||
return t
|
||||
}
|
||||
|
||||
func (t *Task) addToPrioritizedQueue() {
|
||||
addToPrioritizedQueue <- t
|
||||
}
|
||||
|
||||
func (t *Task) WaitForStart() chan bool {
|
||||
return t.start
|
||||
}
|
||||
|
||||
func (t *Task) StartAnyway() {
|
||||
addAsNextTask <- t
|
||||
}
|
||||
|
||||
func (t *Task) Done() {
|
||||
if !t.started.SetToIf(false, true) {
|
||||
finishedQueuedTask <- true
|
||||
}
|
||||
}
|
||||
|
||||
func TotalQueuedTasks() int {
|
||||
getQueueLengthREQ <- true
|
||||
return <-getQueueLengthREP
|
||||
}
|
||||
|
||||
func checkQueueStatus() {
|
||||
if queuedTaskRunning.SetToIf(false, true) {
|
||||
finishedQueuedTask <- true
|
||||
}
|
||||
}
|
||||
|
||||
func fireNextTask() {
|
||||
|
||||
if prioritizedTaskQueue.Len() > 0 {
|
||||
for e := prioritizedTaskQueue.Front(); prioritizedTaskQueue.Len() > 0; e.Next() {
|
||||
t := e.Value.(*Task)
|
||||
prioritizedTaskQueue.Remove(e)
|
||||
if t.started.SetToIf(false, true) {
|
||||
close(t.start)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if taskQueue.Len() > 0 {
|
||||
for e := taskQueue.Front(); taskQueue.Len() > 0; e.Next() {
|
||||
t := e.Value.(*Task)
|
||||
taskQueue.Remove(e)
|
||||
if t.started.SetToIf(false, true) {
|
||||
close(t.start)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
queuedTaskRunning.UnSet()
|
||||
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
||||
taskQueue = list.New()
|
||||
prioritizedTaskQueue = list.New()
|
||||
addToQueue = make(chan *Task, 1)
|
||||
addToPrioritizedQueue = make(chan *Task, 1)
|
||||
addAsNextTask = make(chan *Task, 1)
|
||||
|
||||
finishedQueuedTask = make(chan bool, 1)
|
||||
queuedTaskRunning = abool.NewBool(false)
|
||||
|
||||
getQueueLengthREQ = make(chan bool, 1)
|
||||
getQueueLengthREP = make(chan int, 1)
|
||||
|
||||
go func() {
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-shutdownSignal:
|
||||
// TODO: work off queue?
|
||||
return
|
||||
case <-getQueueLengthREQ:
|
||||
// TODO: maybe clean queues before replying
|
||||
if queuedTaskRunning.IsSet() {
|
||||
getQueueLengthREP <- prioritizedTaskQueue.Len() + taskQueue.Len() + 1
|
||||
} else {
|
||||
getQueueLengthREP <- prioritizedTaskQueue.Len() + taskQueue.Len()
|
||||
}
|
||||
case t := <-addToQueue:
|
||||
taskQueue.PushBack(t)
|
||||
checkQueueStatus()
|
||||
case t := <-addToPrioritizedQueue:
|
||||
prioritizedTaskQueue.PushBack(t)
|
||||
checkQueueStatus()
|
||||
case t := <-addAsNextTask:
|
||||
prioritizedTaskQueue.PushFront(t)
|
||||
checkQueueStatus()
|
||||
case <-finishedQueuedTask:
|
||||
fireNextTask()
|
||||
}
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
}
|
|
@ -1,110 +0,0 @@
|
|||
package taskmanager
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// test waiting
|
||||
|
||||
// globals
|
||||
var qtWg sync.WaitGroup
|
||||
var qtOutputChannel chan string
|
||||
var qtSleepDuration time.Duration
|
||||
|
||||
// functions
|
||||
func queuedTaskTester(s string) {
|
||||
t := NewQueuedTask(s)
|
||||
go func() {
|
||||
<-t.WaitForStart()
|
||||
time.Sleep(qtSleepDuration * 2)
|
||||
qtOutputChannel <- s
|
||||
t.Done()
|
||||
qtWg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
func prioritizedTastTester(s string) {
|
||||
t := NewPrioritizedQueuedTask(s)
|
||||
go func() {
|
||||
<-t.WaitForStart()
|
||||
time.Sleep(qtSleepDuration * 2)
|
||||
qtOutputChannel <- s
|
||||
t.Done()
|
||||
qtWg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
// test
|
||||
func TestQueuedTask(t *testing.T) {
|
||||
|
||||
// skip
|
||||
if testing.Short() {
|
||||
t.Skip("skipping test in short mode.")
|
||||
}
|
||||
|
||||
// init
|
||||
expectedOutput := "0123456789"
|
||||
qtSleepDuration = 10 * time.Millisecond
|
||||
qtOutputChannel = make(chan string, 100)
|
||||
qtWg.Add(10)
|
||||
|
||||
// test queue length
|
||||
c := TotalQueuedTasks()
|
||||
if c != 0 {
|
||||
t.Errorf("Error in calculating Task Queue, expected 0, got %d", c)
|
||||
}
|
||||
|
||||
// TEST
|
||||
queuedTaskTester("0")
|
||||
queuedTaskTester("1")
|
||||
queuedTaskTester("3")
|
||||
queuedTaskTester("4")
|
||||
queuedTaskTester("6")
|
||||
queuedTaskTester("7")
|
||||
queuedTaskTester("9")
|
||||
|
||||
// test queue length
|
||||
c = TotalQueuedTasks()
|
||||
if c != 7 {
|
||||
t.Errorf("Error in calculating Task Queue, expected 7, got %d", c)
|
||||
}
|
||||
|
||||
time.Sleep(qtSleepDuration * 3)
|
||||
prioritizedTastTester("2")
|
||||
time.Sleep(qtSleepDuration * 6)
|
||||
prioritizedTastTester("5")
|
||||
time.Sleep(qtSleepDuration * 6)
|
||||
prioritizedTastTester("8")
|
||||
|
||||
// test queue length
|
||||
c = TotalQueuedTasks()
|
||||
if c != 3 {
|
||||
t.Errorf("Error in calculating Task Queue, expected 3, got %d", c)
|
||||
}
|
||||
|
||||
// time.Sleep(qtSleepDuration * 100)
|
||||
// panic("")
|
||||
|
||||
// wait for test to finish
|
||||
qtWg.Wait()
|
||||
|
||||
// test queue length
|
||||
c = TotalQueuedTasks()
|
||||
if c != 0 {
|
||||
t.Errorf("Error in calculating Task Queue, expected 0, got %d", c)
|
||||
}
|
||||
|
||||
// collect output
|
||||
close(qtOutputChannel)
|
||||
completeOutput := ""
|
||||
for s := <-qtOutputChannel; s != ""; s = <-qtOutputChannel {
|
||||
completeOutput += s
|
||||
}
|
||||
// check if test succeeded
|
||||
if completeOutput != expectedOutput {
|
||||
t.Errorf("QueuedTask test failed, expected sequence %s, got %s", expectedOutput, completeOutput)
|
||||
}
|
||||
|
||||
}
|
|
@ -1,73 +0,0 @@
|
|||
package taskmanager
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"time"
|
||||
)
|
||||
|
||||
var taskSchedule *list.List
|
||||
var addToSchedule chan *Task
|
||||
var waitForever chan time.Time
|
||||
|
||||
var getScheduleLengthREQ chan bool
|
||||
var getScheduleLengthREP chan int
|
||||
|
||||
func NewScheduledTask(name string, schedule time.Time) *Task {
|
||||
t := newUnqeuedTask(name)
|
||||
t.schedule = &schedule
|
||||
addToSchedule <- t
|
||||
return t
|
||||
}
|
||||
|
||||
func TotalScheduledTasks() int {
|
||||
getScheduleLengthREQ <- true
|
||||
return <-getScheduleLengthREP
|
||||
}
|
||||
|
||||
func (t *Task) addToSchedule() {
|
||||
for e := taskSchedule.Back(); e != nil; e = e.Prev() {
|
||||
if t.schedule.After(*e.Value.(*Task).schedule) {
|
||||
taskSchedule.InsertAfter(t, e)
|
||||
return
|
||||
}
|
||||
}
|
||||
taskSchedule.PushFront(t)
|
||||
}
|
||||
|
||||
func waitUntilNextScheduledTask() <-chan time.Time {
|
||||
if taskSchedule.Len() > 0 {
|
||||
return time.After(taskSchedule.Front().Value.(*Task).schedule.Sub(time.Now()))
|
||||
}
|
||||
return waitForever
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
||||
taskSchedule = list.New()
|
||||
addToSchedule = make(chan *Task, 1)
|
||||
waitForever = make(chan time.Time, 1)
|
||||
|
||||
getScheduleLengthREQ = make(chan bool, 1)
|
||||
getScheduleLengthREP = make(chan int, 1)
|
||||
|
||||
go func() {
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-shutdownSignal:
|
||||
return
|
||||
case <-getScheduleLengthREQ:
|
||||
// TODO: maybe clean queues before replying
|
||||
getScheduleLengthREP <- prioritizedTaskQueue.Len() + taskSchedule.Len()
|
||||
case t := <-addToSchedule:
|
||||
t.addToSchedule()
|
||||
case <-waitUntilNextScheduledTask():
|
||||
e := taskSchedule.Front()
|
||||
t := e.Value.(*Task)
|
||||
t.addToPrioritizedQueue()
|
||||
taskSchedule.Remove(e)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
}
|
|
@ -1,93 +0,0 @@
|
|||
package taskmanager
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// test waiting
|
||||
|
||||
// globals
|
||||
var stWg sync.WaitGroup
|
||||
var stOutputChannel chan string
|
||||
var stSleepDuration time.Duration
|
||||
var stWaitCh chan bool
|
||||
|
||||
// functions
|
||||
func scheduledTaskTester(s string, sched time.Time) {
|
||||
t := NewScheduledTask(s, sched)
|
||||
go func() {
|
||||
<-stWaitCh
|
||||
<-t.WaitForStart()
|
||||
time.Sleep(stSleepDuration)
|
||||
stOutputChannel <- s
|
||||
t.Done()
|
||||
stWg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
// test
|
||||
func TestScheduledTaskWaiting(t *testing.T) {
|
||||
|
||||
// skip
|
||||
if testing.Short() {
|
||||
t.Skip("skipping test in short mode.")
|
||||
}
|
||||
|
||||
// init
|
||||
expectedOutput := "0123456789"
|
||||
stSleepDuration = 10 * time.Millisecond
|
||||
stOutputChannel = make(chan string, 100)
|
||||
stWaitCh = make(chan bool, 0)
|
||||
|
||||
// test queue length
|
||||
c := TotalScheduledTasks()
|
||||
if c != 0 {
|
||||
t.Errorf("Error in calculating Task Queue, expected 0, got %d", c)
|
||||
}
|
||||
|
||||
stWg.Add(10)
|
||||
|
||||
// TEST
|
||||
scheduledTaskTester("4", time.Now().Add(stSleepDuration*4))
|
||||
scheduledTaskTester("0", time.Now().Add(stSleepDuration*1))
|
||||
scheduledTaskTester("8", time.Now().Add(stSleepDuration*8))
|
||||
scheduledTaskTester("1", time.Now().Add(stSleepDuration*2))
|
||||
scheduledTaskTester("7", time.Now().Add(stSleepDuration*7))
|
||||
|
||||
// test queue length
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
c = TotalScheduledTasks()
|
||||
if c != 5 {
|
||||
t.Errorf("Error in calculating Task Queue, expected 5, got %d", c)
|
||||
}
|
||||
|
||||
scheduledTaskTester("9", time.Now().Add(stSleepDuration*9))
|
||||
scheduledTaskTester("3", time.Now().Add(stSleepDuration*3))
|
||||
scheduledTaskTester("2", time.Now().Add(stSleepDuration*2))
|
||||
scheduledTaskTester("6", time.Now().Add(stSleepDuration*6))
|
||||
scheduledTaskTester("5", time.Now().Add(stSleepDuration*5))
|
||||
|
||||
// wait for test to finish
|
||||
close(stWaitCh)
|
||||
stWg.Wait()
|
||||
|
||||
// test queue length
|
||||
c = TotalScheduledTasks()
|
||||
if c != 0 {
|
||||
t.Errorf("Error in calculating Task Queue, expected 0, got %d", c)
|
||||
}
|
||||
|
||||
// collect output
|
||||
close(stOutputChannel)
|
||||
completeOutput := ""
|
||||
for s := <-stOutputChannel; s != ""; s = <-stOutputChannel {
|
||||
completeOutput += s
|
||||
}
|
||||
// check if test succeeded
|
||||
if completeOutput != expectedOutput {
|
||||
t.Errorf("ScheduledTask test failed, expected sequence %s, got %s", expectedOutput, completeOutput)
|
||||
}
|
||||
|
||||
}
|
Loading…
Add table
Reference in a new issue