mirror of
https://github.com/safing/portmaster
synced 2025-04-22 11:59:09 +00:00
* Move portbase into monorepo * Add new simple module mgr * [WIP] Switch to new simple module mgr * Add StateMgr and more worker variants * [WIP] Switch more modules * [WIP] Switch more modules * [WIP] swtich more modules * [WIP] switch all SPN modules * [WIP] switch all service modules * [WIP] Convert all workers to the new module system * [WIP] add new task system to module manager * [WIP] Add second take for scheduling workers * [WIP] Add FIXME for bugs in new scheduler * [WIP] Add minor improvements to scheduler * [WIP] Add new worker scheduler * [WIP] Fix more bug related to new module system * [WIP] Fix start handing of the new module system * [WIP] Improve startup process * [WIP] Fix minor issues * [WIP] Fix missing subsystem in settings * [WIP] Initialize managers in constructor * [WIP] Move module event initialization to constrictors * [WIP] Fix setting for enabling and disabling the SPN module * [WIP] Move API registeration into module construction * [WIP] Update states mgr for all modules * [WIP] Add CmdLine operation support * Add state helper methods to module group and instance * Add notification and module status handling to status package * Fix starting issues * Remove pilot widget and update security lock to new status data * Remove debug logs * Improve http server shutdown * Add workaround for cleanly shutting down firewall+netquery * Improve logging * Add syncing states with notifications for new module system * Improve starting, stopping, shutdown; resolve FIXMEs/TODOs * [WIP] Fix most unit tests * Review new module system and fix minor issues * Push shutdown and restart events again via API * Set sleep mode via interface * Update example/template module * [WIP] Fix spn/cabin unit test * Remove deprecated UI elements * Make log output more similar for the logging transition phase * Switch spn hub and observer cmds to new module system * Fix log sources * Make worker mgr less error prone * Fix tests and minor issues * Fix observation hub * Improve shutdown and restart handling * Split up big connection.go source file * Move varint and dsd packages to structures repo * Improve expansion test * Fix linter warnings * Fix interception module on windows * Fix linter errors --------- Co-authored-by: Vladimir Stoilov <vladimir@safing.io>
359 lines
9.7 KiB
Go
359 lines
9.7 KiB
Go
package unit
|
|
|
|
import (
|
|
"errors"
|
|
"math"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/tevino/abool"
|
|
|
|
"github.com/safing/portmaster/service/mgr"
|
|
)
|
|
|
|
const (
|
|
defaultSlotDuration = 10 * time.Millisecond // 100 slots per second
|
|
defaultMinSlotPace = 100 // 10 000 pps
|
|
|
|
defaultWorkSlotPercentage = 0.7 // 70%
|
|
defaultSlotChangeRatePerStreak = 0.02 // 2%
|
|
|
|
defaultStatCycleDuration = 1 * time.Minute
|
|
)
|
|
|
|
// Scheduler creates and schedules units.
|
|
// Must be created using NewScheduler().
|
|
type Scheduler struct { //nolint:maligned
|
|
// Configuration.
|
|
config SchedulerConfig
|
|
|
|
// Units IDs Limit / Thresholds.
|
|
|
|
// currentUnitID holds the last assigned Unit ID.
|
|
currentUnitID atomic.Int64
|
|
// clearanceUpTo holds the current threshold up to which Unit ID Units may be processed.
|
|
clearanceUpTo atomic.Int64
|
|
// slotPace holds the current pace. This is the base value for clearance
|
|
// calculation, not the value of the current cleared Units itself.
|
|
slotPace atomic.Int64
|
|
// finished holds the amount of units that were finished within the current slot.
|
|
finished atomic.Int64
|
|
|
|
// Slot management.
|
|
slotSignalA chan struct{}
|
|
slotSignalB chan struct{}
|
|
slotSignalSwitch bool
|
|
slotSignalsLock sync.RWMutex
|
|
|
|
stopping abool.AtomicBool
|
|
unitDebugger *UnitDebugger
|
|
|
|
// Stats.
|
|
stats struct {
|
|
// Working Values.
|
|
progress struct {
|
|
maxPace atomic.Int64
|
|
maxLeveledPace atomic.Int64
|
|
avgPaceSum atomic.Int64
|
|
avgPaceCnt atomic.Int64
|
|
avgUnitLifeSum atomic.Int64
|
|
avgUnitLifeCnt atomic.Int64
|
|
avgWorkSlotSum atomic.Int64
|
|
avgWorkSlotCnt atomic.Int64
|
|
avgCatchUpSlotSum atomic.Int64
|
|
avgCatchUpSlotCnt atomic.Int64
|
|
}
|
|
|
|
// Calculated Values.
|
|
current struct {
|
|
maxPace atomic.Int64
|
|
maxLeveledPace atomic.Int64
|
|
avgPace atomic.Int64
|
|
avgUnitLife atomic.Int64
|
|
avgWorkSlot atomic.Int64
|
|
avgCatchUpSlot atomic.Int64
|
|
}
|
|
}
|
|
}
|
|
|
|
// SchedulerConfig holds scheduler configuration.
|
|
type SchedulerConfig struct {
|
|
// SlotDuration defines the duration of one slot.
|
|
SlotDuration time.Duration
|
|
|
|
// MinSlotPace defines the minimum slot pace.
|
|
// The slot pace will never fall below this value.
|
|
MinSlotPace int64
|
|
|
|
// WorkSlotPercentage defines the how much of a slot should be scheduled with work.
|
|
// The remainder is for catching up and breathing room for other tasks.
|
|
// Must be between 55% (0.55) and 95% (0.95).
|
|
// The default value is 0.7 (70%).
|
|
WorkSlotPercentage float64
|
|
|
|
// SlotChangeRatePerStreak defines how many percent (0-1) the slot pace
|
|
// should change per streak.
|
|
// Is enforced to be able to change the minimum slot pace by at least 1.
|
|
// The default value is 0.02 (2%).
|
|
SlotChangeRatePerStreak float64
|
|
|
|
// StatCycleDuration defines how often stats are calculated.
|
|
// The default value is 1 minute.
|
|
StatCycleDuration time.Duration
|
|
}
|
|
|
|
// NewScheduler returns a new scheduler.
|
|
func NewScheduler(config *SchedulerConfig) *Scheduler {
|
|
// Fallback to empty config if none is given.
|
|
if config == nil {
|
|
config = &SchedulerConfig{}
|
|
}
|
|
|
|
// Create new scheduler.
|
|
s := &Scheduler{
|
|
config: *config,
|
|
slotSignalA: make(chan struct{}),
|
|
slotSignalB: make(chan struct{}),
|
|
}
|
|
|
|
// Fill in defaults.
|
|
if s.config.SlotDuration == 0 {
|
|
s.config.SlotDuration = defaultSlotDuration
|
|
}
|
|
if s.config.MinSlotPace == 0 {
|
|
s.config.MinSlotPace = defaultMinSlotPace
|
|
}
|
|
if s.config.WorkSlotPercentage == 0 {
|
|
s.config.WorkSlotPercentage = defaultWorkSlotPercentage
|
|
}
|
|
if s.config.SlotChangeRatePerStreak == 0 {
|
|
s.config.SlotChangeRatePerStreak = defaultSlotChangeRatePerStreak
|
|
}
|
|
if s.config.StatCycleDuration == 0 {
|
|
s.config.StatCycleDuration = defaultStatCycleDuration
|
|
}
|
|
|
|
// Check boundaries of WorkSlotPercentage.
|
|
switch {
|
|
case s.config.WorkSlotPercentage < 0.55:
|
|
s.config.WorkSlotPercentage = 0.55
|
|
case s.config.WorkSlotPercentage > 0.95:
|
|
s.config.WorkSlotPercentage = 0.95
|
|
}
|
|
|
|
// The slot change rate must be able to change the slot pace by at least 1.
|
|
if s.config.SlotChangeRatePerStreak < (1 / float64(s.config.MinSlotPace)) {
|
|
s.config.SlotChangeRatePerStreak = (1 / float64(s.config.MinSlotPace))
|
|
|
|
// Debug logging:
|
|
// fmt.Printf("--- increased SlotChangeRatePerStreak to %f\n", s.config.SlotChangeRatePerStreak)
|
|
}
|
|
|
|
// Initialize scheduler fields.
|
|
s.clearanceUpTo.Store(s.config.MinSlotPace)
|
|
s.slotPace.Store(s.config.MinSlotPace)
|
|
|
|
return s
|
|
}
|
|
|
|
func (s *Scheduler) nextSlotSignal() chan struct{} {
|
|
s.slotSignalsLock.RLock()
|
|
defer s.slotSignalsLock.RUnlock()
|
|
|
|
if s.slotSignalSwitch {
|
|
return s.slotSignalA
|
|
}
|
|
return s.slotSignalB
|
|
}
|
|
|
|
func (s *Scheduler) announceNextSlot() {
|
|
s.slotSignalsLock.Lock()
|
|
defer s.slotSignalsLock.Unlock()
|
|
|
|
// Close new slot signal and refresh previous one.
|
|
if s.slotSignalSwitch {
|
|
close(s.slotSignalA)
|
|
s.slotSignalB = make(chan struct{})
|
|
} else {
|
|
close(s.slotSignalB)
|
|
s.slotSignalA = make(chan struct{})
|
|
}
|
|
|
|
// Switch to next slot.
|
|
s.slotSignalSwitch = !s.slotSignalSwitch
|
|
}
|
|
|
|
// SlotScheduler manages the slot and schedules units.
|
|
// Must only be started once.
|
|
func (s *Scheduler) SlotScheduler(ctx *mgr.WorkerCtx) error {
|
|
// Start slot ticker.
|
|
ticker := time.NewTicker(s.config.SlotDuration / 2)
|
|
defer ticker.Stop()
|
|
|
|
// Give clearance to all when stopping.
|
|
defer s.clearanceUpTo.Store(math.MaxInt64 - math.MaxInt32)
|
|
|
|
var (
|
|
halfSlotID uint64
|
|
halfSlotStartedAt = time.Now()
|
|
halfSlotEndedAt time.Time
|
|
halfSlotDuration = float64(s.config.SlotDuration / 2)
|
|
|
|
increaseStreak float64
|
|
decreaseStreak float64
|
|
oneStreaks int
|
|
|
|
cycleStatsAt = uint64(s.config.StatCycleDuration / (s.config.SlotDuration / 2))
|
|
)
|
|
|
|
for range ticker.C {
|
|
halfSlotEndedAt = time.Now()
|
|
|
|
switch {
|
|
case halfSlotID%2 == 0:
|
|
|
|
// First Half-Slot: Work Slot
|
|
|
|
// Calculate time taken in previous slot.
|
|
catchUpSlotDuration := halfSlotEndedAt.Sub(halfSlotStartedAt).Nanoseconds()
|
|
|
|
// Add current slot duration to avg calculation.
|
|
s.stats.progress.avgCatchUpSlotCnt.Add(1)
|
|
if s.stats.progress.avgCatchUpSlotSum.Add(catchUpSlotDuration) < 0 {
|
|
// Reset if we wrap.
|
|
s.stats.progress.avgCatchUpSlotCnt.Store(1)
|
|
s.stats.progress.avgCatchUpSlotSum.Store(catchUpSlotDuration)
|
|
}
|
|
|
|
// Reset slot counters.
|
|
s.finished.Store(0)
|
|
|
|
// Raise clearance according
|
|
s.clearanceUpTo.Store(
|
|
s.currentUnitID.Load() +
|
|
int64(
|
|
float64(s.slotPace.Load())*s.config.WorkSlotPercentage,
|
|
),
|
|
)
|
|
|
|
// Announce start of new slot.
|
|
s.announceNextSlot()
|
|
|
|
default:
|
|
|
|
// Second Half-Slot: Catch-Up Slot
|
|
|
|
// Calculate time taken in previous slot.
|
|
workSlotDuration := halfSlotEndedAt.Sub(halfSlotStartedAt).Nanoseconds()
|
|
|
|
// Add current slot duration to avg calculation.
|
|
s.stats.progress.avgWorkSlotCnt.Add(1)
|
|
if s.stats.progress.avgWorkSlotSum.Add(workSlotDuration) < 0 {
|
|
// Reset if we wrap.
|
|
s.stats.progress.avgWorkSlotCnt.Store(1)
|
|
s.stats.progress.avgWorkSlotSum.Store(workSlotDuration)
|
|
}
|
|
|
|
// Calculate slot duration skew correction, as slots will not run in the
|
|
// exact specified duration.
|
|
slotDurationSkewCorrection := halfSlotDuration / float64(workSlotDuration)
|
|
|
|
// Calculate slot pace with performance of first half-slot.
|
|
// Get current slot pace as float64.
|
|
currentSlotPace := float64(s.slotPace.Load())
|
|
// Calculate current raw slot pace.
|
|
newRawSlotPace := float64(s.finished.Load()*2) * slotDurationSkewCorrection
|
|
|
|
// Move slot pace in the trending direction.
|
|
if newRawSlotPace >= currentSlotPace {
|
|
// Adjust based on streak.
|
|
increaseStreak++
|
|
decreaseStreak = 0
|
|
s.slotPace.Add(int64(
|
|
currentSlotPace * s.config.SlotChangeRatePerStreak * increaseStreak,
|
|
))
|
|
|
|
// Count one-streaks.
|
|
if increaseStreak == 1 {
|
|
oneStreaks++
|
|
} else {
|
|
oneStreaks = 0
|
|
}
|
|
|
|
// Debug logging:
|
|
// fmt.Printf("+++ slot pace: %.0f (current raw pace: %.0f, increaseStreak: %.0f, clearanceUpTo: %d)\n", currentSlotPace, newRawSlotPace, increaseStreak, s.clearanceUpTo.Load())
|
|
} else {
|
|
// Adjust based on streak.
|
|
decreaseStreak++
|
|
increaseStreak = 0
|
|
s.slotPace.Add(int64(
|
|
-currentSlotPace * s.config.SlotChangeRatePerStreak * decreaseStreak,
|
|
))
|
|
|
|
// Enforce minimum.
|
|
if s.slotPace.Load() < s.config.MinSlotPace {
|
|
s.slotPace.Store(s.config.MinSlotPace)
|
|
decreaseStreak = 0
|
|
}
|
|
|
|
// Count one-streaks.
|
|
if decreaseStreak == 1 {
|
|
oneStreaks++
|
|
} else {
|
|
oneStreaks = 0
|
|
}
|
|
|
|
// Debug logging:
|
|
// fmt.Printf("--- slot pace: %.0f (current raw pace: %.0f, decreaseStreak: %.0f, clearanceUpTo: %d)\n", currentSlotPace, newRawSlotPace, decreaseStreak, s.clearanceUpTo.Load())
|
|
}
|
|
|
|
// Record Stats
|
|
|
|
// Add current pace to avg calculation.
|
|
s.stats.progress.avgPaceCnt.Add(1)
|
|
if s.stats.progress.avgPaceSum.Add(s.slotPace.Load()) < 0 {
|
|
// Reset if we wrap.
|
|
s.stats.progress.avgPaceCnt.Store(1)
|
|
s.stats.progress.avgPaceSum.Store(s.slotPace.Load())
|
|
}
|
|
|
|
// Check if current pace is new max.
|
|
if s.slotPace.Load() > s.stats.progress.maxPace.Load() {
|
|
s.stats.progress.maxPace.Store(s.slotPace.Load())
|
|
}
|
|
|
|
// Check if current pace is new leveled max
|
|
if oneStreaks >= 3 && s.slotPace.Load() > s.stats.progress.maxLeveledPace.Load() {
|
|
s.stats.progress.maxLeveledPace.Store(s.slotPace.Load())
|
|
}
|
|
}
|
|
// Switch to other slot-half.
|
|
halfSlotID++
|
|
halfSlotStartedAt = halfSlotEndedAt
|
|
|
|
// Cycle stats after defined time period.
|
|
if halfSlotID%cycleStatsAt == 0 {
|
|
s.cycleStats()
|
|
}
|
|
|
|
// Check if we are stopping.
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
default:
|
|
}
|
|
if s.stopping.IsSet() {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// We should never get here.
|
|
// If we do, trigger a worker restart via the service worker.
|
|
return errors.New("unexpected end of scheduler")
|
|
}
|
|
|
|
// Stop stops the scheduler and gives clearance to all units.
|
|
func (s *Scheduler) Stop() {
|
|
s.stopping.Set()
|
|
}
|