Improve task scheduler sleep mode

This commit is contained in:
Vladimir Stoilov 2023-04-20 15:03:15 +02:00
parent ad52a8dc1b
commit d14791df9f
5 changed files with 39 additions and 39 deletions

View file

@ -128,7 +128,7 @@ func metricsWriter(ctx context.Context) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
case <-ticker.Read(): case <-ticker.Wait():
err := writeMetricsTo(ctx, pushURL) err := writeMetricsTo(ctx, pushURL)
if err != nil { if err != nil {
return err return err

View file

@ -20,8 +20,7 @@ var (
// modulesLocked locks `modules` during starting. // modulesLocked locks `modules` during starting.
modulesLocked = abool.New() modulesLocked = abool.New()
sleepMode = abool.NewBool(false) sleepMode = abool.NewBool(false)
taskSchedulerSleepModeExitChannel = make(chan struct{})
moduleStartTimeout = 2 * time.Minute moduleStartTimeout = 2 * time.Minute
moduleStopTimeout = 1 * time.Minute moduleStopTimeout = 1 * time.Minute
@ -112,13 +111,13 @@ func (m *Module) Sleep(enable bool) {
return return
} }
// Notify all waiting tasks that we are not sleeping anymore.
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
if enable { if enable {
m.sleepWaitingChannel = make(chan time.Time) m.sleepWaitingChannel = make(chan time.Time)
} else { } else {
// Notify all waiting tasks that we are not sleeping anymore.
close(m.sleepWaitingChannel) close(m.sleepWaitingChannel)
} }
} }
@ -129,6 +128,8 @@ func (m *Module) IsSleeping() bool {
} }
// WaitIfSleeping returns channel that will signal when it exits sleep mode. // WaitIfSleeping returns channel that will signal when it exits sleep mode.
// The channel will always return a zero-value time.Time.
// It uses time.Time to be easier dropped in to replace a time.Ticker.
func (m *Module) WaitIfSleeping() <-chan time.Time { func (m *Module) WaitIfSleeping() <-chan time.Time {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
@ -136,7 +137,7 @@ func (m *Module) WaitIfSleeping() <-chan time.Time {
} }
// NewSleepyTicker returns new sleepyTicker that will respect the modules sleep mode. // NewSleepyTicker returns new sleepyTicker that will respect the modules sleep mode.
func (m *Module) NewSleepyTicker(normalDuration time.Duration, sleepDuration time.Duration) *SleepyTicker { func (m *Module) NewSleepyTicker(normalDuration, sleepDuration time.Duration) *SleepyTicker {
return newSleepyTicker(m, normalDuration, sleepDuration) return newSleepyTicker(m, normalDuration, sleepDuration)
} }
@ -383,7 +384,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
Name: name, Name: name,
enabled: abool.NewBool(false), enabled: abool.NewBool(false),
enabledAsDependency: abool.NewBool(false), enabledAsDependency: abool.NewBool(false),
sleepMode: abool.NewBool(false), sleepMode: abool.NewBool(true),
sleepWaitingChannel: make(chan time.Time), sleepWaitingChannel: make(chan time.Time),
prepFn: prep, prepFn: prep,
startFn: start, startFn: start,
@ -400,6 +401,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
depNames: dependencies, depNames: dependencies,
} }
// Sleep mode is disabled by default
newModule.Sleep(false) newModule.Sleep(false)
return newModule return newModule
@ -425,21 +427,20 @@ func initDependencies() error {
return nil return nil
} }
// EnterSleepMode enables or disables sleep mode for all the modules. // SetSleepMode enables or disables sleep mode for all the modules.
func EnterSleepMode(enabled bool) { func SetSleepMode(enabled bool) {
// Check if differs with the old state.
set := sleepMode.SetToIf(!enabled, enabled)
if !set {
return
}
// Update all modules // Update all modules
for _, m := range modules { for _, m := range modules {
m.Sleep(enabled) m.Sleep(enabled)
} }
// Send signal to the task schedular. // Check if differs with the old state.
if !enabled { set := sleepMode.SetToIf(!enabled, enabled)
taskSchedulerSleepModeExitChannel <- struct{}{} if set {
// Send signal to the task schedular.
select {
case notifyTaskScheduler <- struct{}{}:
default:
}
} }
} }

View file

@ -25,8 +25,8 @@ func newSleepyTicker(module *Module, normalDuration time.Duration, sleepDuration
return st return st
} }
// Read waits until the module is not in sleep mode and returns time.Ticker.C channel. // Wait waits until the module is not in sleep mode and returns time.Ticker.C channel.
func (st *SleepyTicker) Read() <-chan time.Time { func (st *SleepyTicker) Wait() <-chan time.Time {
sleepModeEnabled := st.module.sleepMode.IsSet() sleepModeEnabled := st.module.sleepMode.IsSet()
// Update Sleep mode // Update Sleep mode
@ -35,10 +35,8 @@ func (st *SleepyTicker) Read() <-chan time.Time {
} }
// Wait if until sleep mode exits only if sleepDuration is set to 0. // Wait if until sleep mode exits only if sleepDuration is set to 0.
if sleepModeEnabled { if sleepModeEnabled && st.sleepDuration == 0 {
if st.sleepDuration == 0 { return st.module.WaitIfSleeping()
return st.module.WaitIfSleeping()
}
} }
return st.ticker.C return st.ticker.C
@ -49,12 +47,6 @@ func (st *SleepyTicker) Stop() {
st.ticker.Stop() st.ticker.Stop()
} }
// Reset stops a ticker and resets its period to the specified duration. The next tick will arrive after the new period elapses. The duration d must be greater than zero; if not, Reset will panic.
func (st *SleepyTicker) Reset(d time.Duration) {
// Reset standard ticker
st.ticker.Reset(d)
}
func (st *SleepyTicker) enterSleepMode(enabled bool) { func (st *SleepyTicker) enterSleepMode(enabled bool) {
st.sleepMode = enabled st.sleepMode = enabled
if enabled { if enabled {

View file

@ -51,9 +51,9 @@ var (
waitForever chan time.Time waitForever chan time.Time
queueIsFilled = make(chan struct{}, 1) // kick off queue handler queueIsFilled = make(chan struct{}, 1) // kick off queue handler
recalculateNextScheduledTask = make(chan struct{}, 1) notifyTaskScheduler = make(chan struct{}, 1)
taskTimeslot = make(chan struct{}) taskTimeslot = make(chan struct{})
) )
const ( const (
@ -410,7 +410,7 @@ func (t *Task) addToSchedule(overtime bool) {
// notify scheduler // notify scheduler
defer func() { defer func() {
select { select {
case recalculateNextScheduledTask <- struct{}{}: case notifyTaskScheduler <- struct{}{}:
default: default:
} }
}() }()
@ -443,10 +443,6 @@ func (t *Task) addToSchedule(overtime bool) {
} }
func waitUntilNextScheduledTask() <-chan time.Time { func waitUntilNextScheduledTask() <-chan time.Time {
if sleepMode.IsSet() {
<-taskSchedulerSleepModeExitChannel
}
scheduleLock.Lock() scheduleLock.Lock()
defer scheduleLock.Unlock() defer scheduleLock.Unlock()
@ -519,10 +515,21 @@ func taskScheduleHandler() {
} }
for { for {
if sleepMode.IsSet() {
select {
case <-shutdownSignal:
return
case <-notifyTaskScheduler:
continue
}
}
select { select {
case <-shutdownSignal: case <-shutdownSignal:
return return
case <-recalculateNextScheduledTask: case <-notifyTaskScheduler:
continue
case <-waitUntilNextScheduledTask(): case <-waitUntilNextScheduledTask():
scheduleLock.Lock() scheduleLock.Lock()

View file

@ -13,7 +13,7 @@ func cleaner(ctx context.Context) error { //nolint:unparam // Conforms to worker
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
case <-ticker.Read(): case <-ticker.Wait():
deleteExpiredNotifs() deleteExpiredNotifs()
} }
} }