safing-portmaster/base/utils/call_limiter2.go
2025-04-07 14:22:38 +02:00

74 lines
1.6 KiB
Go

package utils
import (
"sync"
"sync/atomic"
"time"
)
// CallLimiter2 bundles concurrent calls and optionally limits how fast a function is called.
type CallLimiter2 struct {
pause time.Duration
slot atomic.Int64
slotWait sync.RWMutex
executing atomic.Bool
lastExec time.Time
}
// NewCallLimiter2 returns a new call limiter.
// Set minPause to zero to disable the minimum pause between calls.
func NewCallLimiter2(minPause time.Duration) *CallLimiter2 {
return &CallLimiter2{
pause: minPause,
}
}
// Do executes the given function.
// All concurrent calls to Do are bundled and return when f() finishes.
// Waits until the minimum pause is over before executing f() again.
func (l *CallLimiter2) Do(f func()) {
// Get ticket number.
slot := l.slot.Load()
// Check if we can execute.
if l.executing.CompareAndSwap(false, true) {
// Make others wait.
l.slotWait.Lock()
defer l.slotWait.Unlock()
// Execute and return.
l.waitAndExec(f)
return
}
// Wait for slot to end and check if slot is done.
for l.slot.Load() == slot {
time.Sleep(100 * time.Microsecond)
l.slotWait.RLock()
l.slotWait.RUnlock() //nolint:staticcheck
}
}
func (l *CallLimiter2) waitAndExec(f func()) {
defer func() {
// Update last exec time.
l.lastExec = time.Now().UTC()
// Enable next execution first.
l.executing.Store(false)
// Move to next slot aftewards to prevent wait loops.
l.slot.Add(1)
}()
// Wait for the minimum duration between executions.
if l.pause > 0 {
sinceLastExec := time.Since(l.lastExec)
if sinceLastExec < l.pause {
time.Sleep(l.pause - sinceLastExec)
}
}
// Execute.
f()
}