mirror of
https://github.com/safing/portbase
synced 2025-09-01 10:09:50 +00:00
commit
592e3ee00d
2 changed files with 133 additions and 0 deletions
78
utils/onceagain.go
Normal file
78
utils/onceagain.go
Normal file
|
@ -0,0 +1,78 @@
|
|||
package utils
|
||||
|
||||
// This file is forked from https://github.com/golang/go/blob/bc593eac2dc63d979a575eccb16c7369a5ff81e0/src/sync/once.go.
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// OnceAgain is an object that will perform only one action "in flight". It's basically the same as sync.Once, but is automatically reused when the function was executed and everyone who waited has left.
|
||||
type OnceAgain struct {
|
||||
// done indicates whether the action has been performed.
|
||||
// It is first in the struct because it is used in the hot path.
|
||||
// The hot path is inlined at every call site.
|
||||
// Placing done first allows more compact instructions on some architectures (amd64/x86),
|
||||
// and fewer instructions (to calculate offset) on other architectures.
|
||||
done uint32
|
||||
|
||||
// Number of waiters waiting for the function to finish. The last waiter resets done.
|
||||
waiters int32
|
||||
|
||||
m sync.Mutex
|
||||
}
|
||||
|
||||
// Do calls the function f if and only if Do is being called for the
|
||||
// first time for this instance of Once. In other words, given
|
||||
// var once Once
|
||||
// if once.Do(f) is called multiple times, only the first call will invoke f,
|
||||
// even if f has a different value in each invocation. A new instance of
|
||||
// Once is required for each function to execute.
|
||||
//
|
||||
// Do is intended for initialization that must be run exactly once. Since f
|
||||
// is niladic, it may be necessary to use a function literal to capture the
|
||||
// arguments to a function to be invoked by Do:
|
||||
// config.once.Do(func() { config.init(filename) })
|
||||
//
|
||||
// Because no call to Do returns until the one call to f returns, if f causes
|
||||
// Do to be called, it will deadlock.
|
||||
//
|
||||
// If f panics, Do considers it to have returned; future calls of Do return
|
||||
// without calling f.
|
||||
//
|
||||
func (o *OnceAgain) Do(f func()) {
|
||||
// Note: Here is an incorrect implementation of Do:
|
||||
//
|
||||
// if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
|
||||
// f()
|
||||
// }
|
||||
//
|
||||
// Do guarantees that when it returns, f has finished.
|
||||
// This implementation would not implement that guarantee:
|
||||
// given two simultaneous calls, the winner of the cas would
|
||||
// call f, and the second would return immediately, without
|
||||
// waiting for the first's call to f to complete.
|
||||
// This is why the slow path falls back to a mutex, and why
|
||||
// the atomic.StoreUint32 must be delayed until after f returns.
|
||||
|
||||
if atomic.LoadUint32(&o.done) == 0 {
|
||||
// Outlined slow-path to allow inlining of the fast-path.
|
||||
o.doSlow(f)
|
||||
}
|
||||
}
|
||||
|
||||
func (o *OnceAgain) doSlow(f func()) {
|
||||
atomic.AddInt32(&o.waiters, 1)
|
||||
defer func() {
|
||||
if atomic.AddInt32(&o.waiters, -1) == 0 {
|
||||
atomic.StoreUint32(&o.done, 0) // reset
|
||||
}
|
||||
}()
|
||||
|
||||
o.m.Lock()
|
||||
defer o.m.Unlock()
|
||||
if o.done == 0 {
|
||||
defer atomic.StoreUint32(&o.done, 1)
|
||||
f()
|
||||
}
|
||||
}
|
55
utils/onceagain_test.go
Normal file
55
utils/onceagain_test.go
Normal file
|
@ -0,0 +1,55 @@
|
|||
package utils
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/tevino/abool"
|
||||
)
|
||||
|
||||
func TestOnceAgain(t *testing.T) {
|
||||
|
||||
oa := OnceAgain{}
|
||||
executed := abool.New()
|
||||
var testWg sync.WaitGroup
|
||||
|
||||
// basic
|
||||
for i := 0; i < 10; i++ {
|
||||
testWg.Add(100)
|
||||
for i := 0; i < 100; i++ {
|
||||
go func() {
|
||||
oa.Do(func() {
|
||||
if !executed.SetToIf(false, true) {
|
||||
t.Errorf("concurrent execution!")
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
})
|
||||
testWg.Done()
|
||||
}()
|
||||
}
|
||||
testWg.Wait()
|
||||
executed.UnSet() // reset check
|
||||
}
|
||||
|
||||
// streaming
|
||||
var execs uint32
|
||||
testWg.Add(100)
|
||||
for i := 0; i < 100; i++ {
|
||||
go func() {
|
||||
oa.Do(func() {
|
||||
atomic.AddUint32(&execs, 1)
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
})
|
||||
testWg.Done()
|
||||
}()
|
||||
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
|
||||
testWg.Wait()
|
||||
if execs >= 20 {
|
||||
t.Errorf("unexpected high exec count: %d", execs)
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue