Add StablePool

This commit is contained in:
Daniel 2020-06-11 16:41:49 +02:00
parent 3c7af2102d
commit 6bf91e1232
3 changed files with 261 additions and 0 deletions

25
Gopkg.lock generated
View file

@ -156,6 +156,14 @@
revision = "614d223910a179a466c1767a985424175c39b465"
version = "v0.9.1"
[[projects]]
digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411"
name = "github.com/pmezard/go-difflib"
packages = ["difflib"]
pruneopts = ""
revision = "792786c7400a136282c1664665ae0a8db921c6c2"
version = "v1.0.0"
[[projects]]
digest = "1:90da107a52bdacf25384ae6fc2889df9cf3c956ecde1561825b13ba70a8283b5"
name = "github.com/seehuhn/fortuna"
@ -203,6 +211,14 @@
revision = "2e9d26c8c37aae03e3f9d4e90b7116f5accb7cab"
version = "v1.0.5"
[[projects]]
digest = "1:83fd2513b9f6ae0997bf646db6b74e9e00131e31002116fda597175f25add42d"
name = "github.com/stretchr/testify"
packages = ["assert"]
pruneopts = ""
revision = "f654a9112bbeac49ca2cd45bfbe11533c4666cf8"
version = "v1.6.1"
[[projects]]
branch = "master"
digest = "1:86e6712cfd4070a2120c03fcec41cfcbbc51813504a74e28d74479edfaf669ee"
@ -310,6 +326,14 @@
revision = "d165be301fb1e13390ad453281ded24385fd8ebc"
version = "v1.23.0"
[[projects]]
branch = "v3"
digest = "1:0aa137e32b369fbb8c0f4d579e653c72e3431b6cc8cb3c19d6a21a14209031fd"
name = "gopkg.in/yaml.v3"
packages = ["."]
pruneopts = ""
revision = "a5ece683394c3b88d90572e44d36c93aea492c2c"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
@ -326,6 +350,7 @@
"github.com/seehuhn/fortuna",
"github.com/shirou/gopsutil/host",
"github.com/spf13/cobra",
"github.com/stretchr/testify/assert",
"github.com/tevino/abool",
"github.com/tidwall/gjson",
"github.com/tidwall/sjson",

121
utils/stablepool.go Normal file
View file

@ -0,0 +1,121 @@
package utils
import "sync"
// This file is forked from https://github.com/golang/go/blob/bc593eac2dc63d979a575eccb16c7369a5ff81e0/src/sync/once.go.
// A StablePool is a drop-in replacement for sync.Pool that is slower, but
// predictable.
// A StablePool is a set of temporary objects that may be individually saved and
// retrieved.
//
//
// In contrast to sync.Pool, items are not removed automatically. Every item
// will be returned at some point. Items are returned in a FIFO manner in order
// to evenly distribute usage of a set of items.
//
// A StablePool is safe for use by multiple goroutines simultaneously and must
// not be copied after first use.
type StablePool struct {
lock sync.Mutex
pool []interface{}
cnt int
getIndex int
putIndex int
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
// Put adds x to the pool.
func (p *StablePool) Put(x interface{}) {
if x == nil {
return
}
p.lock.Lock()
defer p.lock.Unlock()
// check if pool is full (or unitialized)
if p.cnt == len(p.pool) {
p.pool = append(p.pool, x)
p.cnt++
p.putIndex = p.cnt
return
}
// correct putIndex
p.putIndex %= len(p.pool)
// iterate the whole pool once to find a free spot
stopAt := p.putIndex - 1
for i := p.putIndex; i != stopAt; i = (i + 1) % len(p.pool) {
if p.pool[i] == nil {
p.pool[i] = x
p.cnt++
p.putIndex = i + 1
return
}
}
}
// Get returns the next item from the Pool, removes it from the Pool, and
// returns it to the caller.
// In contrast to sync.Pool, Get never ignores the pool.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *StablePool) Get() interface{} {
p.lock.Lock()
defer p.lock.Unlock()
// check if pool is empty
if p.cnt == 0 {
if p.New != nil {
return p.New()
}
return nil
}
// correct getIndex
p.getIndex %= len(p.pool)
// iterate the whole pool to find an item
stopAt := p.getIndex - 1
for i := p.getIndex; i != stopAt; i = (i + 1) % len(p.pool) {
if p.pool[i] != nil {
x := p.pool[i]
p.pool[i] = nil
p.cnt--
p.getIndex = i + 1
return x
}
}
// if we ever get here, return a new item
if p.New != nil {
return p.New()
}
return nil
}
// Cnt returns the amount of items the pool currently holds.
func (p *StablePool) Cnt() int {
p.lock.Lock()
defer p.lock.Unlock()
return p.cnt
}
// Max returns the amount of items the pool held at maximum.
func (p *StablePool) Max() int {
p.lock.Lock()
defer p.lock.Unlock()
return len(p.pool)
}

115
utils/stablepool_test.go Normal file
View file

@ -0,0 +1,115 @@
package utils
import (
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestStablePool(t *testing.T) {
// "real world" simulation
cnt := 0
testPool := &StablePool{
New: func() interface{} {
cnt++
return cnt
},
}
var testWg sync.WaitGroup
var testWorkerWg sync.WaitGroup
// for i := 0; i < 100; i++ {
// cnt++
// testPool.Put(cnt)
// }
for i := 0; i < 100; i++ {
// block round
testWg.Add(1)
// add workers
testWorkerWg.Add(100)
for j := 0; j < 100; j++ {
k := j
go func() {
// wait for round to start
testWg.Wait()
// get value
x := testPool.Get()
// fmt.Println(x)
// "work"
time.Sleep(5 * time.Microsecond)
// re-insert 99%
if k%100 > 0 {
testPool.Put(x)
}
// mark as finished
testWorkerWg.Done()
}()
}
// start round
testWg.Done()
// wait for round to finish
testWorkerWg.Wait()
}
t.Logf("real world simulation: cnt=%d p.cnt=%d p.max=%d\n", cnt, testPool.Cnt(), testPool.Max())
assert.GreaterOrEqual(t, 200, cnt, "should not use more than 200 values")
assert.GreaterOrEqual(t, 100, testPool.Max(), "pool should have at most this max size")
// optimal usage test
optPool := &StablePool{}
for i := 0; i < 1000; i++ {
for j := 0; j < 100; j++ {
optPool.Put(j)
}
for k := 0; k < 100; k++ {
assert.Equal(t, k, optPool.Get(), "should match")
}
}
assert.Equal(t, 100, optPool.Max(), "pool should have exactly this max size")
// fuzzing test
fuzzPool := &StablePool{}
var fuzzWg sync.WaitGroup
var fuzzWorkerWg sync.WaitGroup
// start goroutines and wait
fuzzWg.Add(1)
for i := 0; i < 1000; i++ {
fuzzWorkerWg.Add(2)
j := i
go func() {
fuzzWg.Wait()
fuzzPool.Put(j)
fuzzWorkerWg.Done()
}()
go func() {
fuzzWg.Wait()
fmt.Print(fuzzPool.Get())
fuzzWorkerWg.Done()
}()
}
// kick off
fuzzWg.Done()
// wait for all to finish
fuzzWorkerWg.Wait()
// try to break it
breakPool := &StablePool{}
for i := 0; i < 10; i++ {
for j := 0; j < 100; j++ {
breakPool.Put(nil)
breakPool.Put(j)
breakPool.Put(nil)
}
for k := 0; k < 100; k++ {
assert.Equal(t, k, breakPool.Get(), "should match")
}
}
}