diff --git a/utils/call_limiter.go b/utils/call_limiter.go new file mode 100644 index 0000000..eb669a2 --- /dev/null +++ b/utils/call_limiter.go @@ -0,0 +1,87 @@ +package utils + +import ( + "sync" + "sync/atomic" + "time" +) + +// CallLimiter bundles concurrent calls and optionally limits how fast a function is called. +type CallLimiter struct { + pause time.Duration + + inLock sync.Mutex + lastExec time.Time + + waiters atomic.Int32 + outLock sync.Mutex +} + +// NewCallLimiter returns a new call limiter. +// Set minPause to zero to disable the minimum pause between calls. +func NewCallLimiter(minPause time.Duration) *CallLimiter { + return &CallLimiter{ + pause: minPause, + } +} + +// Do executes the given function. +// All concurrent calls to Do are bundled and return when f() finishes. +// Waits until the minimum pause is over before executing f() again. +func (l *CallLimiter) Do(f func()) { + // Wait for the previous waiters to exit. + l.inLock.Lock() + + // Defer final unlock to safeguard from panics. + defer func() { + // Execution is finished - leave. + // If we are the last waiter, let the next batch in. + if l.waiters.Add(-1) == 0 { + l.inLock.Unlock() + } + }() + + // Check if we are the first waiter. + if l.waiters.Add(1) == 1 { + // Take the lead on this execution run. + l.lead(f) + } else { + // We are not the first waiter, let others in. + l.inLock.Unlock() + } + + // Wait for execution to complete. + l.outLock.Lock() + l.outLock.Unlock() //nolint:staticcheck + + // Last statement is in defer above. +} + +func (l *CallLimiter) lead(f func()) { + // Make all others wait while we execute the function. + l.outLock.Lock() + + // Unlock in lock until execution is finished. + l.inLock.Unlock() + + // Transition from out lock to in lock when done. + defer func() { + // Update last execution time. + l.lastExec = time.Now().UTC() + // Stop newcomers from waiting on previous execution. + l.inLock.Lock() + // Allow waiters to leave. + l.outLock.Unlock() + }() + + // Wait for the minimum duration between executions. + if l.pause > 0 { + sinceLastExec := time.Since(l.lastExec) + if sinceLastExec < l.pause { + time.Sleep(l.pause - sinceLastExec) + } + } + + // Execute. + f() +} diff --git a/utils/call_limiter_test.go b/utils/call_limiter_test.go new file mode 100644 index 0000000..1dd699e --- /dev/null +++ b/utils/call_limiter_test.go @@ -0,0 +1,71 @@ +package utils + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/tevino/abool" +) + +func TestCallLimiter(t *testing.T) { + t.Parallel() + + pause := 10 * time.Millisecond + oa := NewCallLimiter(pause) + executed := abool.New() + var testWg sync.WaitGroup + + // One execution should gobble up the whole batch. + // We are doing this without sleep in function, so dummy exec first to trigger first pause. + oa.Do(func() {}) + // Start + for i := 0; i < 10; i++ { + testWg.Add(100) + for i := 0; i < 100; i++ { + go func() { + oa.Do(func() { + if !executed.SetToIf(false, true) { + t.Errorf("concurrent execution!") + } + }) + testWg.Done() + }() + } + testWg.Wait() + // Check if function was executed at least once. + if executed.IsNotSet() { + t.Errorf("no execution!") + } + executed.UnSet() // reset check + } + + // Wait for pause to reset. + time.Sleep(pause) + + // Continuous use with re-execution. + // Choose values so that about 10 executions are expected + var execs uint32 + testWg.Add(200) + for i := 0; i < 200; i++ { + go func() { + oa.Do(func() { + atomic.AddUint32(&execs, 1) + time.Sleep(10 * time.Millisecond) + }) + testWg.Done() + }() + + // Start one goroutine every 1ms. + time.Sleep(1 * time.Millisecond) + } + + testWg.Wait() + if execs <= 8 { + t.Errorf("unexpected low exec count: %d", execs) + } + if execs >= 12 { + t.Errorf("unexpected high exec count: %d", execs) + } +} diff --git a/utils/onceagain.go b/utils/onceagain.go index 78802b1..3c4af5c 100644 --- a/utils/onceagain.go +++ b/utils/onceagain.go @@ -7,7 +7,13 @@ import ( "sync/atomic" ) -// OnceAgain is an object that will perform only one action "in flight". It's basically the same as sync.Once, but is automatically reused when the function was executed and everyone who waited has left. +// OnceAgain is an object that will perform only one action "in flight". It's +// basically the same as sync.Once, but is automatically reused when the +// function was executed and everyone who waited has left. +// Important: This is somewhat racy when used heavily as it only resets _after_ +// everyone who waited has left. So, while some goroutines are waiting to be +// activated again to leave the waiting state, other goroutines will call Do() +// without executing the function again. type OnceAgain struct { // done indicates whether the action has been performed. // It is first in the struct because it is used in the hot path. diff --git a/utils/onceagain_test.go b/utils/onceagain_test.go index 0863d27..5d4e4aa 100644 --- a/utils/onceagain_test.go +++ b/utils/onceagain_test.go @@ -16,7 +16,7 @@ func TestOnceAgain(t *testing.T) { executed := abool.New() var testWg sync.WaitGroup - // basic + // One execution should gobble up the whole batch. for i := 0; i < 10; i++ { testWg.Add(100) for i := 0; i < 100; i++ { @@ -34,7 +34,8 @@ func TestOnceAgain(t *testing.T) { executed.UnSet() // reset check } - // streaming + // Continuous use with re-execution. + // Choose values so that about 10 executions are expected var execs uint32 testWg.Add(100) for i := 0; i < 100; i++ { @@ -50,7 +51,10 @@ func TestOnceAgain(t *testing.T) { } testWg.Wait() - if execs >= 20 { + if execs <= 8 { + t.Errorf("unexpected low exec count: %d", execs) + } + if execs >= 12 { t.Errorf("unexpected high exec count: %d", execs) } }