From d14791df9fdff8ccb02f2400945cbf49903e3cbb Mon Sep 17 00:00:00 2001 From: Vladimir Stoilov <vladimir@safing.io> Date: Thu, 20 Apr 2023 15:03:15 +0200 Subject: [PATCH] Improve task scheduler sleep mode --- metrics/api.go | 2 +- modules/modules.go | 33 +++++++++++++++++---------------- modules/sleepyticker.go | 16 ++++------------ modules/tasks.go | 25 ++++++++++++++++--------- notifications/cleaner.go | 2 +- 5 files changed, 39 insertions(+), 39 deletions(-) diff --git a/metrics/api.go b/metrics/api.go index 9221f73..ccf4051 100644 --- a/metrics/api.go +++ b/metrics/api.go @@ -128,7 +128,7 @@ func metricsWriter(ctx context.Context) error { select { case <-ctx.Done(): return nil - case <-ticker.Read(): + case <-ticker.Wait(): err := writeMetricsTo(ctx, pushURL) if err != nil { return err diff --git a/modules/modules.go b/modules/modules.go index 74a04a6..fe10e87 100644 --- a/modules/modules.go +++ b/modules/modules.go @@ -20,8 +20,7 @@ var ( // modulesLocked locks `modules` during starting. modulesLocked = abool.New() - sleepMode = abool.NewBool(false) - taskSchedulerSleepModeExitChannel = make(chan struct{}) + sleepMode = abool.NewBool(false) moduleStartTimeout = 2 * time.Minute moduleStopTimeout = 1 * time.Minute @@ -112,13 +111,13 @@ func (m *Module) Sleep(enable bool) { return } - // Notify all waiting tasks that we are not sleeping anymore. m.Lock() defer m.Unlock() if enable { m.sleepWaitingChannel = make(chan time.Time) } else { + // Notify all waiting tasks that we are not sleeping anymore. close(m.sleepWaitingChannel) } } @@ -129,6 +128,8 @@ func (m *Module) IsSleeping() bool { } // WaitIfSleeping returns channel that will signal when it exits sleep mode. +// The channel will always return a zero-value time.Time. +// It uses time.Time to be easier dropped in to replace a time.Ticker. func (m *Module) WaitIfSleeping() <-chan time.Time { m.RLock() defer m.RUnlock() @@ -136,7 +137,7 @@ func (m *Module) WaitIfSleeping() <-chan time.Time { } // NewSleepyTicker returns new sleepyTicker that will respect the modules sleep mode. -func (m *Module) NewSleepyTicker(normalDuration time.Duration, sleepDuration time.Duration) *SleepyTicker { +func (m *Module) NewSleepyTicker(normalDuration, sleepDuration time.Duration) *SleepyTicker { return newSleepyTicker(m, normalDuration, sleepDuration) } @@ -383,7 +384,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ... Name: name, enabled: abool.NewBool(false), enabledAsDependency: abool.NewBool(false), - sleepMode: abool.NewBool(false), + sleepMode: abool.NewBool(true), sleepWaitingChannel: make(chan time.Time), prepFn: prep, startFn: start, @@ -400,6 +401,7 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ... depNames: dependencies, } + // Sleep mode is disabled by default newModule.Sleep(false) return newModule @@ -425,21 +427,20 @@ func initDependencies() error { return nil } -// EnterSleepMode enables or disables sleep mode for all the modules. -func EnterSleepMode(enabled bool) { - // Check if differs with the old state. - set := sleepMode.SetToIf(!enabled, enabled) - if !set { - return - } - +// SetSleepMode enables or disables sleep mode for all the modules. +func SetSleepMode(enabled bool) { // Update all modules for _, m := range modules { m.Sleep(enabled) } - // Send signal to the task schedular. - if !enabled { - taskSchedulerSleepModeExitChannel <- struct{}{} + // Check if differs with the old state. + set := sleepMode.SetToIf(!enabled, enabled) + if set { + // Send signal to the task schedular. + select { + case notifyTaskScheduler <- struct{}{}: + default: + } } } diff --git a/modules/sleepyticker.go b/modules/sleepyticker.go index d302102..f172a12 100644 --- a/modules/sleepyticker.go +++ b/modules/sleepyticker.go @@ -25,8 +25,8 @@ func newSleepyTicker(module *Module, normalDuration time.Duration, sleepDuration return st } -// Read waits until the module is not in sleep mode and returns time.Ticker.C channel. -func (st *SleepyTicker) Read() <-chan time.Time { +// Wait waits until the module is not in sleep mode and returns time.Ticker.C channel. +func (st *SleepyTicker) Wait() <-chan time.Time { sleepModeEnabled := st.module.sleepMode.IsSet() // Update Sleep mode @@ -35,10 +35,8 @@ func (st *SleepyTicker) Read() <-chan time.Time { } // Wait if until sleep mode exits only if sleepDuration is set to 0. - if sleepModeEnabled { - if st.sleepDuration == 0 { - return st.module.WaitIfSleeping() - } + if sleepModeEnabled && st.sleepDuration == 0 { + return st.module.WaitIfSleeping() } return st.ticker.C @@ -49,12 +47,6 @@ func (st *SleepyTicker) Stop() { st.ticker.Stop() } -// Reset stops a ticker and resets its period to the specified duration. The next tick will arrive after the new period elapses. The duration d must be greater than zero; if not, Reset will panic. -func (st *SleepyTicker) Reset(d time.Duration) { - // Reset standard ticker - st.ticker.Reset(d) -} - func (st *SleepyTicker) enterSleepMode(enabled bool) { st.sleepMode = enabled if enabled { diff --git a/modules/tasks.go b/modules/tasks.go index f27fd95..4358f67 100644 --- a/modules/tasks.go +++ b/modules/tasks.go @@ -51,9 +51,9 @@ var ( waitForever chan time.Time - queueIsFilled = make(chan struct{}, 1) // kick off queue handler - recalculateNextScheduledTask = make(chan struct{}, 1) - taskTimeslot = make(chan struct{}) + queueIsFilled = make(chan struct{}, 1) // kick off queue handler + notifyTaskScheduler = make(chan struct{}, 1) + taskTimeslot = make(chan struct{}) ) const ( @@ -410,7 +410,7 @@ func (t *Task) addToSchedule(overtime bool) { // notify scheduler defer func() { select { - case recalculateNextScheduledTask <- struct{}{}: + case notifyTaskScheduler <- struct{}{}: default: } }() @@ -443,10 +443,6 @@ func (t *Task) addToSchedule(overtime bool) { } func waitUntilNextScheduledTask() <-chan time.Time { - if sleepMode.IsSet() { - <-taskSchedulerSleepModeExitChannel - } - scheduleLock.Lock() defer scheduleLock.Unlock() @@ -519,10 +515,21 @@ func taskScheduleHandler() { } for { + + if sleepMode.IsSet() { + select { + case <-shutdownSignal: + return + case <-notifyTaskScheduler: + continue + } + } + select { case <-shutdownSignal: return - case <-recalculateNextScheduledTask: + case <-notifyTaskScheduler: + continue case <-waitUntilNextScheduledTask(): scheduleLock.Lock() diff --git a/notifications/cleaner.go b/notifications/cleaner.go index b0ac980..6298261 100644 --- a/notifications/cleaner.go +++ b/notifications/cleaner.go @@ -13,7 +13,7 @@ func cleaner(ctx context.Context) error { //nolint:unparam // Conforms to worker select { case <-ctx.Done(): return nil - case <-ticker.Read(): + case <-ticker.Wait(): deleteExpiredNotifs() } }