Add call limiter

This commit is contained in:
Daniel 2023-09-28 15:11:59 +02:00
parent 900a654a4d
commit 2ca78b1803
4 changed files with 172 additions and 4 deletions

87
utils/call_limiter.go Normal file
View file

@ -0,0 +1,87 @@
package utils
import (
"sync"
"sync/atomic"
"time"
)
// CallLimiter bundles concurrent calls and optionally limits how fast a function is called.
type CallLimiter struct {
pause time.Duration
inLock sync.Mutex
lastExec time.Time
waiters atomic.Int32
outLock sync.Mutex
}
// NewCallLimiter returns a new call limiter.
// Set minPause to zero to disable the minimum pause between calls.
func NewCallLimiter(minPause time.Duration) *CallLimiter {
return &CallLimiter{
pause: minPause,
}
}
// Do executes the given function.
// All concurrent calls to Do are bundled and return when f() finishes.
// Waits until the minimum pause is over before executing f() again.
func (l *CallLimiter) Do(f func()) {
// Wait for the previous waiters to exit.
l.inLock.Lock()
// Defer final unlock to safeguard from panics.
defer func() {
// Execution is finished - leave.
// If we are the last waiter, let the next batch in.
if l.waiters.Add(-1) == 0 {
l.inLock.Unlock()
}
}()
// Check if we are the first waiter.
if l.waiters.Add(1) == 1 {
// Take the lead on this execution run.
l.lead(f)
} else {
// We are not the first waiter, let others in.
l.inLock.Unlock()
}
// Wait for execution to complete.
l.outLock.Lock()
l.outLock.Unlock() //nolint:staticcheck
// Last statement is in defer above.
}
func (l *CallLimiter) lead(f func()) {
// Make all others wait while we execute the function.
l.outLock.Lock()
// Unlock in lock until execution is finished.
l.inLock.Unlock()
// Transition from out lock to in lock when done.
defer func() {
// Update last execution time.
l.lastExec = time.Now().UTC()
// Stop newcomers from waiting on previous execution.
l.inLock.Lock()
// Allow waiters to leave.
l.outLock.Unlock()
}()
// Wait for the minimum duration between executions.
if l.pause > 0 {
sinceLastExec := time.Since(l.lastExec)
if sinceLastExec < l.pause {
time.Sleep(l.pause - sinceLastExec)
}
}
// Execute.
f()
}

View file

@ -0,0 +1,71 @@
package utils
import (
"sync"
"sync/atomic"
"testing"
"time"
"github.com/tevino/abool"
)
func TestCallLimiter(t *testing.T) {
t.Parallel()
pause := 10 * time.Millisecond
oa := NewCallLimiter(pause)
executed := abool.New()
var testWg sync.WaitGroup
// One execution should gobble up the whole batch.
// We are doing this without sleep in function, so dummy exec first to trigger first pause.
oa.Do(func() {})
// Start
for i := 0; i < 10; i++ {
testWg.Add(100)
for i := 0; i < 100; i++ {
go func() {
oa.Do(func() {
if !executed.SetToIf(false, true) {
t.Errorf("concurrent execution!")
}
})
testWg.Done()
}()
}
testWg.Wait()
// Check if function was executed at least once.
if executed.IsNotSet() {
t.Errorf("no execution!")
}
executed.UnSet() // reset check
}
// Wait for pause to reset.
time.Sleep(pause)
// Continuous use with re-execution.
// Choose values so that about 10 executions are expected
var execs uint32
testWg.Add(200)
for i := 0; i < 200; i++ {
go func() {
oa.Do(func() {
atomic.AddUint32(&execs, 1)
time.Sleep(10 * time.Millisecond)
})
testWg.Done()
}()
// Start one goroutine every 1ms.
time.Sleep(1 * time.Millisecond)
}
testWg.Wait()
if execs <= 8 {
t.Errorf("unexpected low exec count: %d", execs)
}
if execs >= 12 {
t.Errorf("unexpected high exec count: %d", execs)
}
}

View file

@ -7,7 +7,13 @@ import (
"sync/atomic"
)
// OnceAgain is an object that will perform only one action "in flight". It's basically the same as sync.Once, but is automatically reused when the function was executed and everyone who waited has left.
// OnceAgain is an object that will perform only one action "in flight". It's
// basically the same as sync.Once, but is automatically reused when the
// function was executed and everyone who waited has left.
// Important: This is somewhat racy when used heavily as it only resets _after_
// everyone who waited has left. So, while some goroutines are waiting to be
// activated again to leave the waiting state, other goroutines will call Do()
// without executing the function again.
type OnceAgain struct {
// done indicates whether the action has been performed.
// It is first in the struct because it is used in the hot path.

View file

@ -16,7 +16,7 @@ func TestOnceAgain(t *testing.T) {
executed := abool.New()
var testWg sync.WaitGroup
// basic
// One execution should gobble up the whole batch.
for i := 0; i < 10; i++ {
testWg.Add(100)
for i := 0; i < 100; i++ {
@ -34,7 +34,8 @@ func TestOnceAgain(t *testing.T) {
executed.UnSet() // reset check
}
// streaming
// Continuous use with re-execution.
// Choose values so that about 10 executions are expected
var execs uint32
testWg.Add(100)
for i := 0; i < 100; i++ {
@ -50,7 +51,10 @@ func TestOnceAgain(t *testing.T) {
}
testWg.Wait()
if execs >= 20 {
if execs <= 8 {
t.Errorf("unexpected low exec count: %d", execs)
}
if execs >= 12 {
t.Errorf("unexpected high exec count: %d", execs)
}
}