Merge pull request #89 from safing/feature/database-writecache

Add a write-cache to the database interface options
This commit is contained in:
Daniel 2020-10-13 15:31:55 +02:00 committed by GitHub
commit 85e985c493
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 577 additions and 128 deletions

View file

@ -22,6 +22,26 @@ import (
_ "github.com/safing/portbase/database/storage/hashmap"
)
func TestMain(m *testing.M) {
testDir, err := ioutil.TempDir("", "portbase-database-testing-")
if err != nil {
panic(err)
}
err = InitializeWithPath(testDir)
if err != nil {
panic(err)
}
exitCode := m.Run()
// Clean up the test directory.
// Do not defer, as we end this function with a os.Exit call.
os.RemoveAll(testDir)
os.Exit(exitCode)
}
func makeKey(dbName, key string) string {
return fmt.Sprintf("%s:%s", dbName, key)
}
@ -220,24 +240,18 @@ func testDatabase(t *testing.T, storageType string, shadowDelete bool) { //nolin
func TestDatabaseSystem(t *testing.T) {
// panic after 10 seconds, to check for locks
finished := make(chan struct{})
defer close(finished)
go func() {
time.Sleep(10 * time.Second)
fmt.Println("===== TAKING TOO LONG - PRINTING STACK TRACES =====")
_ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
os.Exit(1)
select {
case <-finished:
case <-time.After(10 * time.Second):
fmt.Println("===== TAKING TOO LONG - PRINTING STACK TRACES =====")
_ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
os.Exit(1)
}
}()
testDir, err := ioutil.TempDir("", "portbase-database-testing-")
if err != nil {
t.Fatal(err)
}
err = InitializeWithPath(testDir)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(testDir) // clean up
for _, shadowDelete := range []bool{false, true} {
testDatabase(t, "bbolt", shadowDelete)
testDatabase(t, "hashmap", shadowDelete)
@ -246,7 +260,7 @@ func TestDatabaseSystem(t *testing.T) {
// TODO: Fix badger tests
}
err = MaintainRecordStates(context.TODO())
err := MaintainRecordStates(context.TODO())
if err != nil {
t.Fatal(err)
}

View file

@ -4,11 +4,11 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/tevino/abool"
"github.com/bluele/gcache"
"github.com/tevino/abool"
"github.com/safing/portbase/database/accessor"
"github.com/safing/portbase/database/iterator"
@ -24,17 +24,56 @@ const (
type Interface struct {
options *Options
cache gcache.Cache
writeCache map[string]record.Record
writeCacheLock sync.Mutex
triggerCacheWrite chan struct{}
}
// Options holds options that may be set for an Interface instance.
type Options struct {
Local bool
Internal bool
AlwaysMakeSecret bool
AlwaysMakeCrownjewel bool
// Local specifies if the interface is used by an actor on the local device.
// Setting both the Local and Internal flags will bring performance
// improvements because less checks are needed.
Local bool
// Internal specifies if the interface is used by an actor within the
// software. Setting both the Local and Internal flags will bring performance
// improvements because less checks are needed.
Internal bool
// AlwaysMakeSecret will have the interface mark all saved records as secret.
// This means that they will be only accessible by an internal interface.
AlwaysMakeSecret bool
// AlwaysMakeCrownjewel will have the interface mark all saved records as
// crown jewels. This means that they will be only accessible by a local
// interface.
AlwaysMakeCrownjewel bool
// AlwaysSetRelativateExpiry will have the interface set a relative expiry,
// based on the current time, on all saved records.
AlwaysSetRelativateExpiry int64
AlwaysSetAbsoluteExpiry int64
CacheSize int
// AlwaysSetAbsoluteExpiry will have the interface set an absolute expiry on
// all saved records.
AlwaysSetAbsoluteExpiry int64
// CacheSize defines that a cache should be used for this interface and
// defines it's size.
// Caching comes with an important caveat: If database records are changed
// from another interface, the cache will not be invalidated for these
// records. It will therefore serve outdated data until that record is
// evicted from the cache.
CacheSize int
// DelayCachedWrites defines a database name for which cache writes should
// be cached and batched. The database backend must support the Batcher
// interface. This option is only valid if used with a cache.
// Additionally, this may only be used for internal and local interfaces.
// Please note that this means that other interfaces will not be able to
// guarantee to serve the latest record if records are written this way.
DelayCachedWrites string
}
// Apply applies options to the record metadata.
@ -53,6 +92,28 @@ func (o *Options) Apply(r record.Record) {
}
}
// HasAllPermissions returns whether the options specify the highest possible
// permissions for operations.
func (o *Options) HasAllPermissions() bool {
return o.Local && o.Internal
}
// hasAccessPermission checks if the interface options permit access to the
// given record, locking the record for accessing it's attributes.
func (o *Options) hasAccessPermission(r record.Record) bool {
// Check if the options specify all permissions, which makes checking the
// record unnecessary.
if o.HasAllPermissions() {
return true
}
r.Lock()
defer r.Unlock()
// Check permissions against record.
return r.Meta().CheckPermission(o.Local, o.Internal)
}
// NewInterface returns a new Interface to the database.
func NewInterface(opts *Options) *Interface {
if opts == nil {
@ -63,57 +124,40 @@ func NewInterface(opts *Options) *Interface {
options: opts,
}
if opts.CacheSize > 0 {
new.cache = gcache.New(opts.CacheSize).ARC().Expiration(time.Hour).Build()
cacheBuilder := gcache.New(opts.CacheSize).ARC()
if opts.DelayCachedWrites != "" {
cacheBuilder.EvictedFunc(new.cacheEvictHandler)
new.writeCache = make(map[string]record.Record, opts.CacheSize/2)
new.triggerCacheWrite = make(chan struct{})
}
new.cache = cacheBuilder.Build()
}
return new
}
func (i *Interface) checkCache(key string) (record.Record, bool) {
if i.cache != nil {
cacheVal, err := i.cache.Get(key)
if err == nil {
r, ok := cacheVal.(record.Record)
if ok {
return r, true
}
}
}
return nil, false
}
func (i *Interface) updateCache(r record.Record) {
if i.cache != nil {
_ = i.cache.Set(r.Key(), r)
}
}
// Exists return whether a record with the given key exists.
func (i *Interface) Exists(key string) (bool, error) {
_, _, err := i.getRecord(getDBFromKey, key, false, false)
_, err := i.Get(key)
if err != nil {
if err == ErrNotFound {
switch {
case errors.Is(err, ErrNotFound):
return false, nil
case errors.Is(err, ErrPermissionDenied):
return true, nil
default:
return false, err
}
return false, err
}
return true, nil
}
// Get return the record with the given key.
func (i *Interface) Get(key string) (record.Record, error) {
r, ok := i.checkCache(key)
if ok {
if !r.Meta().CheckPermission(i.options.Local, i.options.Internal) {
return nil, ErrPermissionDenied
}
return r, nil
}
r, _, err := i.getRecord(getDBFromKey, key, true, false)
r, _, err := i.getRecord(getDBFromKey, key, false)
return r, err
}
func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWriteable bool) (r record.Record, db *Controller, err error) {
func (i *Interface) getRecord(dbName string, dbKey string, mustBeWriteable bool) (r record.Record, db *Controller, err error) {
if dbName == "" {
dbName, dbKey = record.ParseKey(dbKey)
}
@ -124,19 +168,24 @@ func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWri
}
if mustBeWriteable && db.ReadOnly() {
return nil, nil, ErrReadOnly
return nil, db, ErrReadOnly
}
r = i.checkCache(dbName + ":" + dbKey)
if r != nil {
if !i.options.hasAccessPermission(r) {
return nil, db, ErrPermissionDenied
}
return r, db, nil
}
r, err = db.Get(dbKey)
if err != nil {
if err == ErrNotFound {
return nil, db, err
}
return nil, nil, err
return nil, db, err
}
if check && !r.Meta().CheckPermission(i.options.Local, i.options.Internal) {
return nil, nil, ErrPermissionDenied
if !i.options.hasAccessPermission(r) {
return nil, db, ErrPermissionDenied
}
return r, db, nil
@ -144,7 +193,7 @@ func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWri
// InsertValue inserts a value into a record.
func (i *Interface) InsertValue(key string, attribute string, value interface{}) error {
r, db, err := i.getRecord(getDBFromKey, key, true, true)
r, db, err := i.getRecord(getDBFromKey, key, true)
if err != nil {
return err
}
@ -176,8 +225,41 @@ func (i *Interface) InsertValue(key string, attribute string, value interface{})
func (i *Interface) Put(r record.Record) (err error) {
// get record or only database
var db *Controller
if !i.options.Internal || !i.options.Local {
_, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
if !i.options.HasAllPermissions() {
_, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true)
if err != nil && err != ErrNotFound {
return err
}
} else {
db, err = getController(r.DatabaseName())
if err != nil {
return err
}
}
// Check if database is read only before we add to the cache.
if db.ReadOnly() {
return ErrReadOnly
}
r.Lock()
defer r.Unlock()
i.options.Apply(r)
written := i.updateCache(r, true)
if written {
return nil
}
return db.Put(r)
}
// PutNew saves a record to the database as a new record (ie. with new timestamps).
func (i *Interface) PutNew(r record.Record) (err error) {
// get record or only database
var db *Controller
if !i.options.HasAllPermissions() {
_, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true)
if err != nil && err != ErrNotFound {
return err
}
@ -191,36 +273,16 @@ func (i *Interface) Put(r record.Record) (err error) {
r.Lock()
defer r.Unlock()
i.options.Apply(r)
i.updateCache(r)
return db.Put(r)
}
// PutNew saves a record to the database as a new record (ie. with new timestamps).
func (i *Interface) PutNew(r record.Record) (err error) {
// get record or only database
var db *Controller
if !i.options.Internal || !i.options.Local {
_, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
if err != nil && err != ErrNotFound {
return err
}
} else {
db, err = getController(r.DatabaseKey())
if err != nil {
return err
}
}
r.Lock()
defer r.Unlock()
if r.Meta() != nil {
r.Meta().Reset()
}
i.options.Apply(r)
i.updateCache(r)
written := i.updateCache(r, true)
if written {
return nil
}
return db.Put(r)
}
@ -233,7 +295,7 @@ func (i *Interface) PutMany(dbName string) (put func(record.Record) error) {
interfaceBatch := make(chan record.Record, 100)
// permission check
if !i.options.Internal || !i.options.Local {
if !i.options.HasAllPermissions() {
return func(r record.Record) error {
return ErrPermissionDenied
}
@ -316,7 +378,7 @@ func (i *Interface) PutMany(dbName string) (put func(record.Record) error) {
// SetAbsoluteExpiry sets an absolute record expiry.
func (i *Interface) SetAbsoluteExpiry(key string, time int64) error {
r, db, err := i.getRecord(getDBFromKey, key, true, true)
r, db, err := i.getRecord(getDBFromKey, key, true)
if err != nil {
return err
}
@ -331,7 +393,7 @@ func (i *Interface) SetAbsoluteExpiry(key string, time int64) error {
// SetRelativateExpiry sets a relative (self-updating) record expiry.
func (i *Interface) SetRelativateExpiry(key string, duration int64) error {
r, db, err := i.getRecord(getDBFromKey, key, true, true)
r, db, err := i.getRecord(getDBFromKey, key, true)
if err != nil {
return err
}
@ -346,7 +408,7 @@ func (i *Interface) SetRelativateExpiry(key string, duration int64) error {
// MakeSecret marks the record as a secret, meaning interfacing processes, such as an UI, are denied access to the record.
func (i *Interface) MakeSecret(key string) error {
r, db, err := i.getRecord(getDBFromKey, key, true, true)
r, db, err := i.getRecord(getDBFromKey, key, true)
if err != nil {
return err
}
@ -361,7 +423,7 @@ func (i *Interface) MakeSecret(key string) error {
// MakeCrownJewel marks a record as a crown jewel, meaning it will only be accessible locally.
func (i *Interface) MakeCrownJewel(key string) error {
r, db, err := i.getRecord(getDBFromKey, key, true, true)
r, db, err := i.getRecord(getDBFromKey, key, true)
if err != nil {
return err
}
@ -376,7 +438,7 @@ func (i *Interface) MakeCrownJewel(key string) error {
// Delete deletes a record from the database.
func (i *Interface) Delete(key string) error {
r, db, err := i.getRecord(getDBFromKey, key, true, true)
r, db, err := i.getRecord(getDBFromKey, key, true)
if err != nil {
return err
}

203
database/interface_cache.go Normal file
View file

@ -0,0 +1,203 @@
package database
import (
"context"
"errors"
"time"
"github.com/safing/portbase/database/record"
"github.com/safing/portbase/log"
)
// DelayedCacheWriter must be run by the caller of an interface that uses delayed cache writing.
func (i *Interface) DelayedCacheWriter(ctx context.Context) error {
// Check if the DelayedCacheWriter should be run at all.
if i.options.CacheSize <= 0 || i.options.DelayCachedWrites == "" {
return errors.New("delayed cache writer is not applicable to this database interface")
}
// Check if backend support the Batcher interface.
batchPut := i.PutMany(i.options.DelayCachedWrites)
// End batchPut immediately and check for an error.
err := batchPut(nil)
if err != nil {
return err
}
// percentThreshold defines the minimum percentage of entries in the write cache in relation to the cache size that need to be present in order for flushing the cache to the database storage.
percentThreshold := 25
thresholdWriteTicker := time.NewTicker(5 * time.Second)
forceWriteTicker := time.NewTicker(5 * time.Minute)
for {
// Wait for trigger for writing the cache.
select {
case <-ctx.Done():
// The caller is shutting down, flush the cache to storage and exit.
i.flushWriteCache(0)
return nil
case <-i.triggerCacheWrite:
// An entry from the cache was evicted that was also in the write cache.
// This makes it likely that other entries that are also present in the
// write cache will be evicted soon. Flush the write cache to storage
// immediately in order to reduce single writes.
i.flushWriteCache(0)
case <-thresholdWriteTicker.C:
// Often check if the the write cache has filled up to a certain degree and
// flush it to storage before we start evicting to-be-written entries and
// slow down the hot path again.
i.flushWriteCache(percentThreshold)
case <-forceWriteTicker.C:
// Once in a while, flush the write cache to storage no matter how much
// it is filled. We don't want entries lingering around in the write
// cache forever. This also reduces the amount of data loss in the event
// of a total crash.
i.flushWriteCache(0)
}
}
}
func (i *Interface) flushWriteCache(percentThreshold int) {
i.writeCacheLock.Lock()
defer i.writeCacheLock.Unlock()
// Check if there is anything to do.
if len(i.writeCache) == 0 {
return
}
// Check if we reach the given threshold for writing to storage.
if (len(i.writeCache)*100)/i.options.CacheSize < percentThreshold {
return
}
// Write the full cache in a batch operation.
batchPut := i.PutMany(i.options.DelayCachedWrites)
for _, r := range i.writeCache {
err := batchPut(r)
if err != nil {
log.Warningf("database: failed to write write-cached entry to %q database: %s", i.options.DelayCachedWrites, err)
}
}
// Finish batch.
err := batchPut(nil)
if err != nil {
log.Warningf("database: failed to finish flushing write cache to %q database: %s", i.options.DelayCachedWrites, err)
}
// Optimized map clearing following the Go1.11 recommendation.
for key := range i.writeCache {
delete(i.writeCache, key)
}
}
// cacheEvictHandler is run by the cache for every entry that gets evicted
// from the cache.
func (i *Interface) cacheEvictHandler(keyData, _ interface{}) {
// Transform the key into a string.
key, ok := keyData.(string)
if !ok {
return
}
// Check if the evicted record is one that is to be written.
// Lock the write cache until the end of the function.
// The read cache is locked anyway for the whole duration.
i.writeCacheLock.Lock()
defer i.writeCacheLock.Unlock()
r, ok := i.writeCache[key]
if ok {
delete(i.writeCache, key)
}
if !ok {
return
}
// Write record to database in order to mitigate race conditions where the record would appear
// as non-existent for a short duration.
db, err := getController(r.DatabaseName())
if err != nil {
log.Warningf("database: failed to write evicted cache entry %q: database %q does not exist", key, r.DatabaseName())
return
}
r.Lock()
defer r.Unlock()
err = db.Put(r)
if err != nil {
log.Warningf("database: failed to write evicted cache entry %q to database: %s", key, err)
}
// Finally, trigger writing the full write cache because a to-be-written
// entry was just evicted from the cache, and this makes it likely that more
// to-be-written entries will be evicted shortly.
select {
case i.triggerCacheWrite <- struct{}{}:
default:
}
}
func (i *Interface) checkCache(key string) record.Record {
// Check if cache is in use.
if i.cache == nil {
return nil
}
// Check if record exists in cache.
cacheVal, err := i.cache.Get(key)
if err == nil {
r, ok := cacheVal.(record.Record)
if ok {
return r
}
}
return nil
}
func (i *Interface) updateCache(r record.Record, write bool) (written bool) {
// Check if cache is in use.
if i.cache == nil {
return false
}
// Check if record should be deleted
if r.Meta().IsDeleted() {
// Remove entry from cache.
i.cache.Remove(r.Key())
// Let write through to database storage.
return false
}
// Update cache with record.
ttl := r.Meta().GetRelativeExpiry()
if ttl >= 0 {
_ = i.cache.SetWithExpire(
r.Key(),
r,
time.Duration(ttl)*time.Second,
)
} else {
_ = i.cache.Set(
r.Key(),
r,
)
}
// Add record to write cache instead if:
// 1. The record is being written.
// 2. Write delaying is active.
// 3. Write delaying is active for the database of this record.
if write && r.DatabaseName() == i.options.DelayCachedWrites {
i.writeCacheLock.Lock()
defer i.writeCacheLock.Unlock()
i.writeCache[r.Key()] = r
return true
}
return false
}

View file

@ -0,0 +1,158 @@
package database
import (
"context"
"fmt"
"strconv"
"sync"
"testing"
)
func benchmarkCacheWriting(b *testing.B, storageType string, cacheSize int, sampleSize int, delayWrites bool) { //nolint:gocognit,gocyclo
b.Run(fmt.Sprintf("CacheWriting_%s_%d_%d_%v", storageType, cacheSize, sampleSize, delayWrites), func(b *testing.B) {
// Setup Benchmark.
// Create database.
dbName := fmt.Sprintf("cache-w-benchmark-%s-%d-%d-%v", storageType, cacheSize, sampleSize, delayWrites)
_, err := Register(&Database{
Name: dbName,
Description: fmt.Sprintf("Cache Benchmark Database for %s", storageType),
StorageType: storageType,
})
if err != nil {
b.Fatal(err)
}
// Create benchmark interface.
options := &Options{
Local: true,
Internal: true,
CacheSize: cacheSize,
}
if cacheSize > 0 && delayWrites {
options.DelayCachedWrites = dbName
}
db := NewInterface(options)
// Start
ctx, cancelCtx := context.WithCancel(context.Background())
var wg sync.WaitGroup
if cacheSize > 0 && delayWrites {
wg.Add(1)
go func() {
err := db.DelayedCacheWriter(ctx)
if err != nil {
panic(err)
}
wg.Done()
}()
}
// Start Benchmark.
b.ResetTimer()
for i := 0; i < b.N; i++ {
testRecordID := i % sampleSize
r := NewExample(
dbName+":"+strconv.Itoa(testRecordID),
"A",
1,
)
err = db.Put(r)
if err != nil {
b.Fatal(err)
}
}
// End cache writer and wait
cancelCtx()
wg.Wait()
})
}
func benchmarkCacheReadWrite(b *testing.B, storageType string, cacheSize int, sampleSize int, delayWrites bool) { //nolint:gocognit,gocyclo
b.Run(fmt.Sprintf("CacheReadWrite_%s_%d_%d_%v", storageType, cacheSize, sampleSize, delayWrites), func(b *testing.B) {
// Setup Benchmark.
// Create database.
dbName := fmt.Sprintf("cache-rw-benchmark-%s-%d-%d-%v", storageType, cacheSize, sampleSize, delayWrites)
_, err := Register(&Database{
Name: dbName,
Description: fmt.Sprintf("Cache Benchmark Database for %s", storageType),
StorageType: storageType,
})
if err != nil {
b.Fatal(err)
}
// Create benchmark interface.
options := &Options{
Local: true,
Internal: true,
CacheSize: cacheSize,
}
if cacheSize > 0 && delayWrites {
options.DelayCachedWrites = dbName
}
db := NewInterface(options)
// Start
ctx, cancelCtx := context.WithCancel(context.Background())
var wg sync.WaitGroup
if cacheSize > 0 && delayWrites {
wg.Add(1)
go func() {
err := db.DelayedCacheWriter(ctx)
if err != nil {
panic(err)
}
wg.Done()
}()
}
// Start Benchmark.
b.ResetTimer()
writing := true
for i := 0; i < b.N; i++ {
testRecordID := i % sampleSize
key := dbName + ":" + strconv.Itoa(testRecordID)
if i > 0 && testRecordID == 0 {
writing = !writing // switch between reading and writing every samplesize
}
if writing {
r := NewExample(key, "A", 1)
err = db.Put(r)
} else {
_, err = db.Get(key)
}
if err != nil {
b.Fatal(err)
}
}
// End cache writer and wait
cancelCtx()
wg.Wait()
})
}
func BenchmarkCache(b *testing.B) {
for _, storageType := range []string{"bbolt", "hashmap"} {
benchmarkCacheWriting(b, storageType, 32, 8, false)
benchmarkCacheWriting(b, storageType, 32, 8, true)
benchmarkCacheWriting(b, storageType, 32, 1024, false)
benchmarkCacheWriting(b, storageType, 32, 1024, true)
benchmarkCacheWriting(b, storageType, 512, 1024, false)
benchmarkCacheWriting(b, storageType, 512, 1024, true)
benchmarkCacheReadWrite(b, storageType, 32, 8, false)
benchmarkCacheReadWrite(b, storageType, 32, 8, true)
benchmarkCacheReadWrite(b, storageType, 32, 1024, false)
benchmarkCacheReadWrite(b, storageType, 32, 1024, true)
benchmarkCacheReadWrite(b, storageType, 512, 1024, false)
benchmarkCacheReadWrite(b, storageType, 512, 1024, true)
}
}

View file

@ -2,7 +2,6 @@ package record
import (
"errors"
"fmt"
"github.com/safing/portbase/container"
"github.com/safing/portbase/database/accessor"
@ -18,7 +17,7 @@ type Base struct {
// Key returns the key of the database record.
func (b *Base) Key() string {
return fmt.Sprintf("%s:%s", b.dbName, b.dbKey)
return b.dbName + ":" + b.dbKey
}
// KeyIsSet returns true if the database key is set.

View file

@ -31,9 +31,10 @@ func (m *Meta) GetAbsoluteExpiry() int64 {
}
// GetRelativeExpiry returns the current relative expiry time - ie. seconds until expiry.
// A negative value signifies that the record does not expire.
func (m *Meta) GetRelativeExpiry() int64 {
if m.Deleted < 0 {
return -m.Deleted
if m.Expires == 0 {
return -1
}
abs := m.Expires - time.Now().Unix()

View file

@ -115,26 +115,9 @@ func (b *BBolt) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error)
err := b.db.Batch(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
for r := range batch {
if !shadowDelete && r.Meta().IsDeleted() {
// Immediate delete.
txErr := bucket.Delete([]byte(r.DatabaseKey()))
if txErr != nil {
return txErr
}
} else {
// Put or shadow delete.
// marshal
data, txErr := r.MarshalRecord(r)
if txErr != nil {
return txErr
}
// put
txErr = bucket.Put([]byte(r.DatabaseKey()), data)
if txErr != nil {
return txErr
}
txErr := b.batchPutOrDelete(bucket, shadowDelete, r)
if txErr != nil {
return txErr
}
}
return nil
@ -145,6 +128,25 @@ func (b *BBolt) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error)
return batch, errs
}
func (b *BBolt) batchPutOrDelete(bucket *bbolt.Bucket, shadowDelete bool, r record.Record) (err error) {
r.Lock()
defer r.Unlock()
if !shadowDelete && r.Meta().IsDeleted() {
// Immediate delete.
err = bucket.Delete([]byte(r.DatabaseKey()))
} else {
// Put or shadow delete.
var data []byte
data, err = r.MarshalRecord(r)
if err == nil {
err = bucket.Put([]byte(r.DatabaseKey()), data)
}
}
return err
}
// Delete deletes a record from the database.
func (b *BBolt) Delete(key string) error {
err := b.db.Update(func(tx *bbolt.Tx) error {

View file

@ -66,11 +66,7 @@ func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan erro
// start handler
go func() {
for r := range batch {
if !shadowDelete && r.Meta().IsDeleted() {
delete(hm.db, r.DatabaseKey())
} else {
hm.db[r.DatabaseKey()] = r
}
hm.batchPutOrDelete(shadowDelete, r)
}
errs <- nil
}()
@ -78,6 +74,20 @@ func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan erro
return batch, errs
}
func (hm *HashMap) batchPutOrDelete(shadowDelete bool, r record.Record) {
r.Lock()
defer r.Unlock()
hm.dbLock.Lock()
defer hm.dbLock.Unlock()
if !shadowDelete && r.Meta().IsDeleted() {
delete(hm.db, r.DatabaseKey())
} else {
hm.db[r.DatabaseKey()] = r
}
}
// Delete deletes a record from the database.
func (hm *HashMap) Delete(key string) error {
hm.dbLock.Lock()