From abaf5ee8792f4207db3f669a997b80f2c854feae Mon Sep 17 00:00:00 2001 From: Daniel Date: Sun, 11 Oct 2020 14:27:54 +0200 Subject: [PATCH 1/6] Add a write-cache to the database interface options --- database/database_test.go | 48 ++++++--- database/interface.go | 172 +++++++++++++++++-------------- database/interface_cache.go | 171 ++++++++++++++++++++++++++++++ database/interface_cache_test.go | 158 ++++++++++++++++++++++++++++ database/record/base.go | 3 +- database/record/meta.go | 5 +- database/storage/hashmap/map.go | 17 ++- 7 files changed, 472 insertions(+), 102 deletions(-) create mode 100644 database/interface_cache.go create mode 100644 database/interface_cache_test.go diff --git a/database/database_test.go b/database/database_test.go index 790bd7f..f625884 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -22,6 +22,24 @@ import ( _ "github.com/safing/portbase/database/storage/hashmap" ) +func TestMain(m *testing.M) { + testDir, err := ioutil.TempDir("", "portbase-database-testing-") + if err != nil { + panic(err) + } + + err = InitializeWithPath(testDir) + if err != nil { + panic(err) + } + + exitCode := m.Run() + + os.RemoveAll(testDir) // clean up + + os.Exit(exitCode) +} + func makeKey(dbName, key string) string { return fmt.Sprintf("%s:%s", dbName, key) } @@ -220,23 +238,19 @@ func testDatabase(t *testing.T, storageType string, shadowDelete bool) { //nolin func TestDatabaseSystem(t *testing.T) { // panic after 10 seconds, to check for locks - go func() { - time.Sleep(10 * time.Second) - fmt.Println("===== TAKING TOO LONG - PRINTING STACK TRACES =====") - _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) - os.Exit(1) + finished := make(chan struct{}) + defer func() { + close(finished) + }() + go func() { + select { + case <-finished: + case <-time.After(10 * time.Second): + fmt.Println("===== TAKING TOO LONG - PRINTING STACK TRACES =====") + _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + os.Exit(1) + } }() - - testDir, err := ioutil.TempDir("", "portbase-database-testing-") - if err != nil { - t.Fatal(err) - } - - err = InitializeWithPath(testDir) - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(testDir) // clean up for _, shadowDelete := range []bool{false, true} { testDatabase(t, "bbolt", shadowDelete) @@ -246,7 +260,7 @@ func TestDatabaseSystem(t *testing.T) { // TODO: Fix badger tests } - err = MaintainRecordStates(context.TODO()) + err := MaintainRecordStates(context.TODO()) if err != nil { t.Fatal(err) } diff --git a/database/interface.go b/database/interface.go index 02bae88..f921e60 100644 --- a/database/interface.go +++ b/database/interface.go @@ -4,11 +4,11 @@ import ( "context" "errors" "fmt" + "sync" "time" - "github.com/tevino/abool" - "github.com/bluele/gcache" + "github.com/tevino/abool" "github.com/safing/portbase/database/accessor" "github.com/safing/portbase/database/iterator" @@ -24,6 +24,10 @@ const ( type Interface struct { options *Options cache gcache.Cache + + writeCache map[string]record.Record + writeCacheLock sync.Mutex + triggerCacheWrite chan struct{} } // Options holds options that may be set for an Interface instance. @@ -34,7 +38,22 @@ type Options struct { AlwaysMakeCrownjewel bool AlwaysSetRelativateExpiry int64 AlwaysSetAbsoluteExpiry int64 - CacheSize int + + // CacheSize defines that a cache should be used for this interface and + // defines it's size. + // Caching comes with an important caveat: If database records are changed + // from another interface, the cache will not be invalidated for these + // records. It will therefore serve outdated data until that record is + // evicted from the cache. + CacheSize int + + // DelayCachedWrites defines a database storage for which cache writes should + // be cached and batched. The database backend must support the Batcher + // interface. This option is only valid if used with a cache. + // Additionally, this may only be used for internal and local interfaces. + // Please note that this means that other interfaces will not be able to + // guarantee to serve the latest record if records are written this way. + DelayCachedWrites string } // Apply applies options to the record metadata. @@ -63,57 +82,40 @@ func NewInterface(opts *Options) *Interface { options: opts, } if opts.CacheSize > 0 { - new.cache = gcache.New(opts.CacheSize).ARC().Expiration(time.Hour).Build() + cacheBuilder := gcache.New(opts.CacheSize).ARC() + if len(opts.DelayCachedWrites) > 0 { + cacheBuilder.EvictedFunc(new.cacheEvictHandler) + new.writeCache = make(map[string]record.Record) + new.triggerCacheWrite = make(chan struct{}) + } + new.cache = cacheBuilder.Build() } return new } -func (i *Interface) checkCache(key string) (record.Record, bool) { - if i.cache != nil { - cacheVal, err := i.cache.Get(key) - if err == nil { - r, ok := cacheVal.(record.Record) - if ok { - return r, true - } - } - } - return nil, false -} - -func (i *Interface) updateCache(r record.Record) { - if i.cache != nil { - _ = i.cache.Set(r.Key(), r) - } -} - // Exists return whether a record with the given key exists. func (i *Interface) Exists(key string) (bool, error) { - _, _, err := i.getRecord(getDBFromKey, key, false, false) + _, err := i.Get(key) if err != nil { - if err == ErrNotFound { + switch err { + case ErrNotFound: return false, nil + case ErrPermissionDenied: + return true, nil + default: + return false, err } - return false, err } return true, nil } // Get return the record with the given key. func (i *Interface) Get(key string) (record.Record, error) { - r, ok := i.checkCache(key) - if ok { - if !r.Meta().CheckPermission(i.options.Local, i.options.Internal) { - return nil, ErrPermissionDenied - } - return r, nil - } - - r, _, err := i.getRecord(getDBFromKey, key, true, false) + r, _, err := i.getRecord(getDBFromKey, key, false) return r, err } -func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWriteable bool) (r record.Record, db *Controller, err error) { +func (i *Interface) getRecord(dbName string, dbKey string, mustBeWriteable bool) (r record.Record, db *Controller, err error) { if dbName == "" { dbName, dbKey = record.ParseKey(dbKey) } @@ -124,19 +126,24 @@ func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWri } if mustBeWriteable && db.ReadOnly() { - return nil, nil, ErrReadOnly + return nil, db, ErrReadOnly + } + + r = i.checkCache(dbName + ":" + dbKey) + if r != nil { + if !r.Meta().CheckPermission(i.options.Local, i.options.Internal) { + return nil, db, ErrPermissionDenied + } + return r, db, nil } r, err = db.Get(dbKey) if err != nil { - if err == ErrNotFound { - return nil, db, err - } - return nil, nil, err + return nil, db, err } - if check && !r.Meta().CheckPermission(i.options.Local, i.options.Internal) { - return nil, nil, ErrPermissionDenied + if !r.Meta().CheckPermission(i.options.Local, i.options.Internal) { + return nil, db, ErrPermissionDenied } return r, db, nil @@ -144,7 +151,7 @@ func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWri // InsertValue inserts a value into a record. func (i *Interface) InsertValue(key string, attribute string, value interface{}) error { - r, db, err := i.getRecord(getDBFromKey, key, true, true) + r, db, err := i.getRecord(getDBFromKey, key, true) if err != nil { return err } @@ -177,7 +184,40 @@ func (i *Interface) Put(r record.Record) (err error) { // get record or only database var db *Controller if !i.options.Internal || !i.options.Local { - _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true) + _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true) + if err != nil && err != ErrNotFound { + return err + } + } else { + db, err = getController(r.DatabaseName()) + if err != nil { + return err + } + } + + // Check if database is read only before we add to the cache. + if db.ReadOnly() { + return ErrReadOnly + } + + r.Lock() + defer r.Unlock() + i.options.Apply(r) + + written := i.updateCache(r, true) + if written { + return nil + } + + return db.Put(r) +} + +// PutNew saves a record to the database as a new record (ie. with new timestamps). +func (i *Interface) PutNew(r record.Record) (err error) { + // get record or only database + var db *Controller + if !i.options.Internal || !i.options.Local { + _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true) if err != nil && err != ErrNotFound { return err } @@ -191,36 +231,16 @@ func (i *Interface) Put(r record.Record) (err error) { r.Lock() defer r.Unlock() - i.options.Apply(r) - - i.updateCache(r) - return db.Put(r) -} - -// PutNew saves a record to the database as a new record (ie. with new timestamps). -func (i *Interface) PutNew(r record.Record) (err error) { - // get record or only database - var db *Controller - if !i.options.Internal || !i.options.Local { - _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true) - if err != nil && err != ErrNotFound { - return err - } - } else { - db, err = getController(r.DatabaseKey()) - if err != nil { - return err - } - } - - r.Lock() - defer r.Unlock() - if r.Meta() != nil { r.Meta().Reset() } i.options.Apply(r) - i.updateCache(r) + + written := i.updateCache(r, true) + if written { + return nil + } + return db.Put(r) } @@ -316,7 +336,7 @@ func (i *Interface) PutMany(dbName string) (put func(record.Record) error) { // SetAbsoluteExpiry sets an absolute record expiry. func (i *Interface) SetAbsoluteExpiry(key string, time int64) error { - r, db, err := i.getRecord(getDBFromKey, key, true, true) + r, db, err := i.getRecord(getDBFromKey, key, true) if err != nil { return err } @@ -331,7 +351,7 @@ func (i *Interface) SetAbsoluteExpiry(key string, time int64) error { // SetRelativateExpiry sets a relative (self-updating) record expiry. func (i *Interface) SetRelativateExpiry(key string, duration int64) error { - r, db, err := i.getRecord(getDBFromKey, key, true, true) + r, db, err := i.getRecord(getDBFromKey, key, true) if err != nil { return err } @@ -346,7 +366,7 @@ func (i *Interface) SetRelativateExpiry(key string, duration int64) error { // MakeSecret marks the record as a secret, meaning interfacing processes, such as an UI, are denied access to the record. func (i *Interface) MakeSecret(key string) error { - r, db, err := i.getRecord(getDBFromKey, key, true, true) + r, db, err := i.getRecord(getDBFromKey, key, true) if err != nil { return err } @@ -361,7 +381,7 @@ func (i *Interface) MakeSecret(key string) error { // MakeCrownJewel marks a record as a crown jewel, meaning it will only be accessible locally. func (i *Interface) MakeCrownJewel(key string) error { - r, db, err := i.getRecord(getDBFromKey, key, true, true) + r, db, err := i.getRecord(getDBFromKey, key, true) if err != nil { return err } @@ -376,7 +396,7 @@ func (i *Interface) MakeCrownJewel(key string) error { // Delete deletes a record from the database. func (i *Interface) Delete(key string) error { - r, db, err := i.getRecord(getDBFromKey, key, true, true) + r, db, err := i.getRecord(getDBFromKey, key, true) if err != nil { return err } diff --git a/database/interface_cache.go b/database/interface_cache.go new file mode 100644 index 0000000..d192e26 --- /dev/null +++ b/database/interface_cache.go @@ -0,0 +1,171 @@ +package database + +import ( + "context" + "errors" + "time" + + "github.com/safing/portbase/database/record" + "github.com/safing/portbase/log" +) + +// DelayedCacheWriter must be run by the caller of an interface that uses delayed cache writing. +func (i *Interface) DelayedCacheWriter(ctx context.Context) error { + // Check if the DelayedCacheWriter should be run at all. + if i.options.CacheSize <= 0 || len(i.options.DelayCachedWrites) == 0 { + return errors.New("delayed cache writer is not applicable to this database interface") + } + + // Check if backend support the Batcher interface. + batchPut := i.PutMany(i.options.DelayCachedWrites) + // End batchPut immediately and check for an error. + err := batchPut(nil) + if err != nil { + return err + } + + for { + // Wait for trigger for writing the cache. + select { + case <-ctx.Done(): + // Module is shutting down, flush write cache to database. + i.flushWriteCache() + return nil + + case <-i.triggerCacheWrite: + case <-time.After(5 * time.Second): + } + + i.flushWriteCache() + } +} + +func (i *Interface) flushWriteCache() { + i.writeCacheLock.Lock() + defer i.writeCacheLock.Unlock() + + // Check if there is anything to do. + if len(i.writeCache) == 0 { + return + } + + // Write the full cache in a batch operation. + batchPut := i.PutMany(i.options.DelayCachedWrites) + for _, r := range i.writeCache { + err := batchPut(r) + if err != nil { + log.Warningf("database: failed to write write-cached entry to %q database: %s", i.options.DelayCachedWrites, err) + } + } + // Finish batch. + err := batchPut(nil) + if err != nil { + log.Warningf("database: failed to finish flushing write cache to %q database: %s", i.options.DelayCachedWrites, err) + } + + // Optimized map clearing following the Go1.11 recommendation. + for key := range i.writeCache { + delete(i.writeCache, key) + } +} + +// cacheEvictHandler is run by the cache for every entry that gets evicted +// from the cache. +func (i *Interface) cacheEvictHandler(keyData, _ interface{}) { + // Transform the key into a string. + key, ok := keyData.(string) + if !ok { + return + } + + // Check if the evicted record is one that is to be written. + i.writeCacheLock.Lock() + r, ok := i.writeCache[key] + if ok { + delete(i.writeCache, key) + } + i.writeCacheLock.Unlock() + if !ok { + return + } + + // Write record to database in order to mitigate race conditions where the record would appear + // as non-existent for a short duration. + db, err := getController(r.DatabaseName()) + if err != nil { + log.Warningf("database: failed to write evicted cache entry %q: database %q does not exist", key, r.DatabaseName()) + return + } + err = db.Put(r) + if err != nil { + log.Warningf("database: failed to write evicted cache entry %q to database: %s", key, err) + } + + // Finally, trigger writing the full write cache because a to-be-written + // entry was just evicted from the cache, and this makes it likely that more + // to-be-written entries will be evicted shortly. + select { + case i.triggerCacheWrite <- struct{}{}: + default: + } +} + +func (i *Interface) checkCache(key string) record.Record { + // Check if cache is in use. + if i.cache == nil { + return nil + } + + // Check if record exists in cache. + cacheVal, err := i.cache.Get(key) + if err == nil { + r, ok := cacheVal.(record.Record) + if ok { + return r + } + } + return nil +} + +func (i *Interface) updateCache(r record.Record, write bool) (written bool) { + // Check if cache is in use. + if i.cache == nil { + return false + } + + // Check if record should be deleted + if r.Meta().IsDeleted() { + // Remove entry from cache. + i.cache.Remove(r.Key()) + // Let write through to database storage. + return false + } + + // Update cache with record. + ttl := r.Meta().GetRelativeExpiry() + if ttl >= 0 { + _ = i.cache.SetWithExpire( + r.Key(), + r, + time.Duration(ttl)*time.Second, + ) + } else { + _ = i.cache.Set( + r.Key(), + r, + ) + } + + // Add record to write cache instead if: + // 1. The record is being written. + // 2. Write delaying is active. + // 3. Write delaying is active for the database of this record. + if write && len(i.options.DelayCachedWrites) > 0 && r.DatabaseName() == i.options.DelayCachedWrites { + i.writeCacheLock.Lock() + defer i.writeCacheLock.Unlock() + i.writeCache[r.Key()] = r + return true + } + + return false +} diff --git a/database/interface_cache_test.go b/database/interface_cache_test.go new file mode 100644 index 0000000..ff65260 --- /dev/null +++ b/database/interface_cache_test.go @@ -0,0 +1,158 @@ +package database + +import ( + "context" + "fmt" + "strconv" + "sync" + "testing" +) + +func benchmarkCacheWriting(b *testing.B, storageType string, cacheSize int, sampleSize int, delayWrites bool) { //nolint:gocognit,gocyclo + b.Run(fmt.Sprintf("CacheWriting_%s_%d_%d_%v", storageType, cacheSize, sampleSize, delayWrites), func(b *testing.B) { + // Setup Benchmark. + + // Create database. + dbName := fmt.Sprintf("cache-w-benchmark-%s-%d-%d-%v", storageType, cacheSize, sampleSize, delayWrites) + _, err := Register(&Database{ + Name: dbName, + Description: fmt.Sprintf("Cache Benchmark Database for %s", storageType), + StorageType: storageType, + }) + if err != nil { + b.Fatal(err) + } + + // Create benchmark interface. + options := &Options{ + Local: true, + Internal: true, + CacheSize: cacheSize, + } + if cacheSize > 0 && delayWrites { + options.DelayCachedWrites = dbName + } + db := NewInterface(options) + + // Start + ctx, cancelCtx := context.WithCancel(context.Background()) + var wg sync.WaitGroup + if cacheSize > 0 && delayWrites { + wg.Add(1) + go func() { + err := db.DelayedCacheWriter(ctx) + if err != nil { + panic(err) + } + wg.Done() + }() + } + + // Start Benchmark. + b.ResetTimer() + for i := 0; i < b.N; i++ { + testRecordID := i % sampleSize + r := NewExample( + dbName+":"+strconv.Itoa(testRecordID), + "A", + 1, + ) + err = db.Put(r) + if err != nil { + b.Fatal(err) + } + } + + // End cache writer and wait + cancelCtx() + wg.Wait() + + }) +} + +func benchmarkCacheReadWrite(b *testing.B, storageType string, cacheSize int, sampleSize int, delayWrites bool) { //nolint:gocognit,gocyclo + b.Run(fmt.Sprintf("CacheReadWrite_%s_%d_%d_%v", storageType, cacheSize, sampleSize, delayWrites), func(b *testing.B) { + // Setup Benchmark. + + // Create database. + dbName := fmt.Sprintf("cache-rw-benchmark-%s-%d-%d-%v", storageType, cacheSize, sampleSize, delayWrites) + _, err := Register(&Database{ + Name: dbName, + Description: fmt.Sprintf("Cache Benchmark Database for %s", storageType), + StorageType: storageType, + }) + if err != nil { + b.Fatal(err) + } + + // Create benchmark interface. + options := &Options{ + Local: true, + Internal: true, + CacheSize: cacheSize, + } + if cacheSize > 0 && delayWrites { + options.DelayCachedWrites = dbName + } + db := NewInterface(options) + + // Start + ctx, cancelCtx := context.WithCancel(context.Background()) + var wg sync.WaitGroup + if cacheSize > 0 && delayWrites { + wg.Add(1) + go func() { + err := db.DelayedCacheWriter(ctx) + if err != nil { + panic(err) + } + wg.Done() + }() + } + + // Start Benchmark. + b.ResetTimer() + writing := true + for i := 0; i < b.N; i++ { + testRecordID := i % sampleSize + key := dbName + ":" + strconv.Itoa(testRecordID) + + if i > 0 && testRecordID == 0 { + writing = !writing // switch between reading and writing every samplesize + } + + if writing { + r := NewExample(key, "A", 1) + err = db.Put(r) + } else { + _, err = db.Get(key) + } + if err != nil { + b.Fatal(err) + } + } + + // End cache writer and wait + cancelCtx() + wg.Wait() + + }) +} + +func BenchmarkCache(b *testing.B) { + for _, storageType := range []string{"bbolt", "hashmap"} { + benchmarkCacheWriting(b, storageType, 32, 8, false) + benchmarkCacheWriting(b, storageType, 32, 8, true) + benchmarkCacheWriting(b, storageType, 32, 1024, false) + benchmarkCacheWriting(b, storageType, 32, 1024, true) + benchmarkCacheWriting(b, storageType, 512, 1024, false) + benchmarkCacheWriting(b, storageType, 512, 1024, true) + + benchmarkCacheReadWrite(b, storageType, 32, 8, false) + benchmarkCacheReadWrite(b, storageType, 32, 8, true) + benchmarkCacheReadWrite(b, storageType, 32, 1024, false) + benchmarkCacheReadWrite(b, storageType, 32, 1024, true) + benchmarkCacheReadWrite(b, storageType, 512, 1024, false) + benchmarkCacheReadWrite(b, storageType, 512, 1024, true) + } +} diff --git a/database/record/base.go b/database/record/base.go index 8bc1d6b..33f6d2f 100644 --- a/database/record/base.go +++ b/database/record/base.go @@ -2,7 +2,6 @@ package record import ( "errors" - "fmt" "github.com/safing/portbase/container" "github.com/safing/portbase/database/accessor" @@ -18,7 +17,7 @@ type Base struct { // Key returns the key of the database record. func (b *Base) Key() string { - return fmt.Sprintf("%s:%s", b.dbName, b.dbKey) + return b.dbName + ":" + b.dbKey } // KeyIsSet returns true if the database key is set. diff --git a/database/record/meta.go b/database/record/meta.go index 23ed287..e9f4fcf 100644 --- a/database/record/meta.go +++ b/database/record/meta.go @@ -31,9 +31,10 @@ func (m *Meta) GetAbsoluteExpiry() int64 { } // GetRelativeExpiry returns the current relative expiry time - ie. seconds until expiry. +// A negative value signifies that the record does not expire. func (m *Meta) GetRelativeExpiry() int64 { - if m.Deleted < 0 { - return -m.Deleted + if m.Expires == 0 { + return -1 } abs := m.Expires - time.Now().Unix() diff --git a/database/storage/hashmap/map.go b/database/storage/hashmap/map.go index 4005f07..53fa0d9 100644 --- a/database/storage/hashmap/map.go +++ b/database/storage/hashmap/map.go @@ -66,11 +66,7 @@ func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan erro // start handler go func() { for r := range batch { - if !shadowDelete && r.Meta().IsDeleted() { - delete(hm.db, r.DatabaseKey()) - } else { - hm.db[r.DatabaseKey()] = r - } + hm.putOrDelete(shadowDelete, r) } errs <- nil }() @@ -78,6 +74,17 @@ func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan erro return batch, errs } +func (hm *HashMap) putOrDelete(shadowDelete bool, r record.Record) { + hm.dbLock.Lock() + defer hm.dbLock.Unlock() + + if !shadowDelete && r.Meta().IsDeleted() { + delete(hm.db, r.DatabaseKey()) + } else { + hm.db[r.DatabaseKey()] = r + } +} + // Delete deletes a record from the database. func (hm *HashMap) Delete(key string) error { hm.dbLock.Lock() From 4aa5c66e8e62369c2400770c966ef9c21a74b9ec Mon Sep 17 00:00:00 2001 From: Daniel Date: Sun, 11 Oct 2020 20:26:05 +0200 Subject: [PATCH 2/6] Improve write-cache flush timing --- database/interface_cache.go | 36 +++++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/database/interface_cache.go b/database/interface_cache.go index d192e26..c85882b 100644 --- a/database/interface_cache.go +++ b/database/interface_cache.go @@ -24,23 +24,44 @@ func (i *Interface) DelayedCacheWriter(ctx context.Context) error { return err } + // percentThreshold defines the minimum percentage of entries in the write cache in relation to the cache size that need to be present in order for flushing the cache to the database storage. + percentThreshold := 25 // % + thresholdWriteTicker := time.NewTicker(5 * time.Second) + forceWriteTicker := time.NewTicker(5 * time.Minute) + for { // Wait for trigger for writing the cache. select { case <-ctx.Done(): - // Module is shutting down, flush write cache to database. - i.flushWriteCache() + // The caller is shutting down, flush the cache to storage and exit. + i.flushWriteCache(0) return nil case <-i.triggerCacheWrite: - case <-time.After(5 * time.Second): + // An entry from the cache was evicted that was also in the write cache. + // This makes it likely that other entries that are also present in the + // write cache will be evicted soon. Flush the write cache to storage + // immediately in order to reduce single writes. + i.flushWriteCache(0) + + case <-thresholdWriteTicker.C: + // Often check if the the write cache has filled up to a certain degree and + // flush it to storage before we start evicting to-be-written entries and + // slow down the hot path again. + i.flushWriteCache(percentThreshold) + + case <-forceWriteTicker.C: + // Once in a while, flush the write cache to storage no matter how much + // it is filled. We don't want entries lingering around in the write + // cache forever. This also reduces the amount of data loss in the event + // of a total crash. + i.flushWriteCache(0) } - i.flushWriteCache() } } -func (i *Interface) flushWriteCache() { +func (i *Interface) flushWriteCache(percentThreshold int) { i.writeCacheLock.Lock() defer i.writeCacheLock.Unlock() @@ -49,6 +70,11 @@ func (i *Interface) flushWriteCache() { return } + // Check if we reach the given threshold for writing to storage. + if (len(i.writeCache)*100)/i.options.CacheSize < percentThreshold { + return + } + // Write the full cache in a batch operation. batchPut := i.PutMany(i.options.DelayCachedWrites) for _, r := range i.writeCache { From e5a72ffa372c86f1eb425eb2dc8478954afcbdc3 Mon Sep 17 00:00:00 2001 From: Daniel Date: Mon, 12 Oct 2020 13:54:14 +0200 Subject: [PATCH 3/6] Implement review suggestions --- database/database_test.go | 8 ++--- database/interface.go | 62 +++++++++++++++++++++++++++---------- database/interface_cache.go | 14 ++++++--- 3 files changed, 60 insertions(+), 24 deletions(-) diff --git a/database/database_test.go b/database/database_test.go index f625884..b6d793e 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -35,7 +35,9 @@ func TestMain(m *testing.M) { exitCode := m.Run() - os.RemoveAll(testDir) // clean up + // Clean up the test directory. + // Do not defer, as we end this function with a os.Exit call. + os.RemoveAll(testDir) os.Exit(exitCode) } @@ -239,9 +241,7 @@ func TestDatabaseSystem(t *testing.T) { // panic after 10 seconds, to check for locks finished := make(chan struct{}) - defer func() { - close(finished) - }() + defer close(finished) go func() { select { case <-finished: diff --git a/database/interface.go b/database/interface.go index f921e60..04f1830 100644 --- a/database/interface.go +++ b/database/interface.go @@ -32,12 +32,32 @@ type Interface struct { // Options holds options that may be set for an Interface instance. type Options struct { - Local bool - Internal bool - AlwaysMakeSecret bool - AlwaysMakeCrownjewel bool + // Local specifies if the interface is used by an actor on the local device. + // Setting both the Local and Internal flags will bring performance + // improvements because less checks are needed. + Local bool + + // Internal specifies if the interface is used by an actor within the + // software. Setting both the Local and Internal flags will bring performance + // improvements because less checks are needed. + Internal bool + + // AlwaysMakeSecret will have the interface mark all saved records as secret. + // This means that they will be only accessible by an internal interface. + AlwaysMakeSecret bool + + // AlwaysMakeCrownjewel will have the interface mark all saved records as + // crown jewels. This means that they will be only accessible by a local + // interface. + AlwaysMakeCrownjewel bool + + // AlwaysSetRelativateExpiry will have the interface set a relative expiry, + // based on the current time, on all saved records. AlwaysSetRelativateExpiry int64 - AlwaysSetAbsoluteExpiry int64 + + // AlwaysSetAbsoluteExpiry will have the interface set an absolute expiry on + // all saved records. + AlwaysSetAbsoluteExpiry int64 // CacheSize defines that a cache should be used for this interface and // defines it's size. @@ -47,7 +67,7 @@ type Options struct { // evicted from the cache. CacheSize int - // DelayCachedWrites defines a database storage for which cache writes should + // DelayCachedWrites defines a database name for which cache writes should // be cached and batched. The database backend must support the Batcher // interface. This option is only valid if used with a cache. // Additionally, this may only be used for internal and local interfaces. @@ -72,6 +92,11 @@ func (o *Options) Apply(r record.Record) { } } +// HasAllPermissions returns whether the options specify the highest possible permissions for operations. +func (o *Options) HasAllPermissions() bool { + return o.Local && o.Internal +} + // NewInterface returns a new Interface to the database. func NewInterface(opts *Options) *Interface { if opts == nil { @@ -83,9 +108,9 @@ func NewInterface(opts *Options) *Interface { } if opts.CacheSize > 0 { cacheBuilder := gcache.New(opts.CacheSize).ARC() - if len(opts.DelayCachedWrites) > 0 { + if opts.DelayCachedWrites != "" { cacheBuilder.EvictedFunc(new.cacheEvictHandler) - new.writeCache = make(map[string]record.Record) + new.writeCache = make(map[string]record.Record, opts.CacheSize/2) new.triggerCacheWrite = make(chan struct{}) } new.cache = cacheBuilder.Build() @@ -97,10 +122,10 @@ func NewInterface(opts *Options) *Interface { func (i *Interface) Exists(key string) (bool, error) { _, err := i.Get(key) if err != nil { - switch err { - case ErrNotFound: + switch { + case errors.Is(err, ErrNotFound): return false, nil - case ErrPermissionDenied: + case errors.Is(err, ErrPermissionDenied): return true, nil default: return false, err @@ -131,8 +156,13 @@ func (i *Interface) getRecord(dbName string, dbKey string, mustBeWriteable bool) r = i.checkCache(dbName + ":" + dbKey) if r != nil { - if !r.Meta().CheckPermission(i.options.Local, i.options.Internal) { - return nil, db, ErrPermissionDenied + if !i.options.HasAllPermissions() { + r.Lock() + permitted := r.Meta().CheckPermission(i.options.Local, i.options.Internal) + r.Unlock() + if !permitted { + return nil, db, ErrPermissionDenied + } } return r, db, nil } @@ -183,7 +213,7 @@ func (i *Interface) InsertValue(key string, attribute string, value interface{}) func (i *Interface) Put(r record.Record) (err error) { // get record or only database var db *Controller - if !i.options.Internal || !i.options.Local { + if !i.options.HasAllPermissions() { _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true) if err != nil && err != ErrNotFound { return err @@ -216,7 +246,7 @@ func (i *Interface) Put(r record.Record) (err error) { func (i *Interface) PutNew(r record.Record) (err error) { // get record or only database var db *Controller - if !i.options.Internal || !i.options.Local { + if !i.options.HasAllPermissions() { _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true) if err != nil && err != ErrNotFound { return err @@ -253,7 +283,7 @@ func (i *Interface) PutMany(dbName string) (put func(record.Record) error) { interfaceBatch := make(chan record.Record, 100) // permission check - if !i.options.Internal || !i.options.Local { + if !i.options.HasAllPermissions() { return func(r record.Record) error { return ErrPermissionDenied } diff --git a/database/interface_cache.go b/database/interface_cache.go index c85882b..3e349e5 100644 --- a/database/interface_cache.go +++ b/database/interface_cache.go @@ -12,7 +12,7 @@ import ( // DelayedCacheWriter must be run by the caller of an interface that uses delayed cache writing. func (i *Interface) DelayedCacheWriter(ctx context.Context) error { // Check if the DelayedCacheWriter should be run at all. - if i.options.CacheSize <= 0 || len(i.options.DelayCachedWrites) == 0 { + if i.options.CacheSize <= 0 || i.options.DelayCachedWrites == "" { return errors.New("delayed cache writer is not applicable to this database interface") } @@ -25,7 +25,7 @@ func (i *Interface) DelayedCacheWriter(ctx context.Context) error { } // percentThreshold defines the minimum percentage of entries in the write cache in relation to the cache size that need to be present in order for flushing the cache to the database storage. - percentThreshold := 25 // % + percentThreshold := 25 thresholdWriteTicker := time.NewTicker(5 * time.Second) forceWriteTicker := time.NewTicker(5 * time.Minute) @@ -105,12 +105,14 @@ func (i *Interface) cacheEvictHandler(keyData, _ interface{}) { } // Check if the evicted record is one that is to be written. + // Lock the write cache until the end of the function. + // The read cache is locked anyway for the whole duration. i.writeCacheLock.Lock() + defer i.writeCacheLock.Unlock() r, ok := i.writeCache[key] if ok { delete(i.writeCache, key) } - i.writeCacheLock.Unlock() if !ok { return } @@ -122,6 +124,10 @@ func (i *Interface) cacheEvictHandler(keyData, _ interface{}) { log.Warningf("database: failed to write evicted cache entry %q: database %q does not exist", key, r.DatabaseName()) return } + + r.Lock() + defer r.Unlock() + err = db.Put(r) if err != nil { log.Warningf("database: failed to write evicted cache entry %q to database: %s", key, err) @@ -186,7 +192,7 @@ func (i *Interface) updateCache(r record.Record, write bool) (written bool) { // 1. The record is being written. // 2. Write delaying is active. // 3. Write delaying is active for the database of this record. - if write && len(i.options.DelayCachedWrites) > 0 && r.DatabaseName() == i.options.DelayCachedWrites { + if write && r.DatabaseName() == i.options.DelayCachedWrites { i.writeCacheLock.Lock() defer i.writeCacheLock.Unlock() i.writeCache[r.Key()] = r From 6a58ce5a7a7f5c45993c97abea0800e09618cc33 Mon Sep 17 00:00:00 2001 From: Daniel Date: Mon, 12 Oct 2020 13:54:27 +0200 Subject: [PATCH 4/6] Fix locking in PutMany implementations --- database/storage/bbolt/bbolt.go | 42 +++++++++++++++++---------------- database/storage/hashmap/map.go | 7 ++++-- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/database/storage/bbolt/bbolt.go b/database/storage/bbolt/bbolt.go index 365888c..5291e0c 100644 --- a/database/storage/bbolt/bbolt.go +++ b/database/storage/bbolt/bbolt.go @@ -115,26 +115,9 @@ func (b *BBolt) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) err := b.db.Batch(func(tx *bbolt.Tx) error { bucket := tx.Bucket(bucketName) for r := range batch { - if !shadowDelete && r.Meta().IsDeleted() { - // Immediate delete. - txErr := bucket.Delete([]byte(r.DatabaseKey())) - if txErr != nil { - return txErr - } - } else { - // Put or shadow delete. - - // marshal - data, txErr := r.MarshalRecord(r) - if txErr != nil { - return txErr - } - - // put - txErr = bucket.Put([]byte(r.DatabaseKey()), data) - if txErr != nil { - return txErr - } + txErr := b.batchPutOrDelete(bucket, shadowDelete, r) + if txErr != nil { + return txErr } } return nil @@ -145,6 +128,25 @@ func (b *BBolt) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) return batch, errs } +func (b *BBolt) batchPutOrDelete(bucket *bbolt.Bucket, shadowDelete bool, r record.Record) (err error) { + r.Lock() + defer r.Unlock() + + if !shadowDelete && r.Meta().IsDeleted() { + // Immediate delete. + err = bucket.Delete([]byte(r.DatabaseKey())) + } else { + // Put or shadow delete. + var data []byte + data, err = r.MarshalRecord(r) + if err == nil { + err = bucket.Put([]byte(r.DatabaseKey()), data) + } + } + + return err +} + // Delete deletes a record from the database. func (b *BBolt) Delete(key string) error { err := b.db.Update(func(tx *bbolt.Tx) error { diff --git a/database/storage/hashmap/map.go b/database/storage/hashmap/map.go index 53fa0d9..2d08d4d 100644 --- a/database/storage/hashmap/map.go +++ b/database/storage/hashmap/map.go @@ -66,7 +66,7 @@ func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan erro // start handler go func() { for r := range batch { - hm.putOrDelete(shadowDelete, r) + hm.batchPutOrDelete(shadowDelete, r) } errs <- nil }() @@ -74,7 +74,10 @@ func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan erro return batch, errs } -func (hm *HashMap) putOrDelete(shadowDelete bool, r record.Record) { +func (hm *HashMap) batchPutOrDelete(shadowDelete bool, r record.Record) { + r.Lock() + defer r.Unlock() + hm.dbLock.Lock() defer hm.dbLock.Unlock() From 34e12860e4f8a70c35e6fb8b3df128c687117a25 Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 13 Oct 2020 14:58:27 +0200 Subject: [PATCH 5/6] Fix another locking issue --- database/interface.go | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/database/interface.go b/database/interface.go index 04f1830..9c6b4a9 100644 --- a/database/interface.go +++ b/database/interface.go @@ -92,11 +92,28 @@ func (o *Options) Apply(r record.Record) { } } -// HasAllPermissions returns whether the options specify the highest possible permissions for operations. +// HasAllPermissions returns whether the options specify the highest possible +// permissions for operations. func (o *Options) HasAllPermissions() bool { return o.Local && o.Internal } +// hasAccessPermission checks if the interface options permit access to the +// given record, locking the record for accessing it's attributes. +func (o *Options) hasAccessPermission(r record.Record) bool { + // Check if the options specify all permissions, which makes checking the + // record unnecessary. + if o.HasAllPermissions() { + return true + } + + r.Lock() + defer r.Unlock() + + // Check permissions against record. + return r.Meta().CheckPermission(o.Local, o.Internal) +} + // NewInterface returns a new Interface to the database. func NewInterface(opts *Options) *Interface { if opts == nil { @@ -156,13 +173,8 @@ func (i *Interface) getRecord(dbName string, dbKey string, mustBeWriteable bool) r = i.checkCache(dbName + ":" + dbKey) if r != nil { - if !i.options.HasAllPermissions() { - r.Lock() - permitted := r.Meta().CheckPermission(i.options.Local, i.options.Internal) - r.Unlock() - if !permitted { - return nil, db, ErrPermissionDenied - } + if !i.options.hasAccessPermission(r) { + return nil, db, ErrPermissionDenied } return r, db, nil } @@ -172,7 +184,7 @@ func (i *Interface) getRecord(dbName string, dbKey string, mustBeWriteable bool) return nil, db, err } - if !r.Meta().CheckPermission(i.options.Local, i.options.Internal) { + if !i.options.hasAccessPermission(r) { return nil, db, ErrPermissionDenied } From 7e1125fc65cbaeccfc8a568070b1d4e6268f8216 Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 13 Oct 2020 16:51:34 +0200 Subject: [PATCH 6/6] Add cancel command to database api --- api/database.go | 49 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/api/database.go b/api/database.go index 9830571..965ed37 100644 --- a/api/database.go +++ b/api/database.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "sync" "github.com/gorilla/websocket" "github.com/tevino/abool" @@ -43,7 +44,9 @@ func init() { type DatabaseAPI struct { conn *websocket.Conn sendQueue chan []byte - subs map[string]*database.Subscription + + subsLock sync.Mutex + subs map[string]*database.Subscription shutdownSignal chan struct{} shuttingDown *abool.AtomicBool @@ -141,6 +144,15 @@ func (api *DatabaseAPI) handler() { } parts := bytes.SplitN(msg, []byte("|"), 3) + + // Handle special command "cancel" + if len(parts) == 2 && string(parts[1]) != "cancel" { + // 125|cancel + // 127|cancel + go api.handleCancel(parts[0]) + continue + } + if len(parts) != 3 { api.send(nil, dbMsgTypeError, "bad request: malformed message", nil) continue @@ -340,6 +352,7 @@ func (api *DatabaseAPI) handleSub(opID []byte, queryText string) { // 125|new|| // 125|delete| // 125|warning| // error with single record, operation continues + // 125|cancel var err error q, err := query.ParseQuery(queryText) @@ -362,10 +375,23 @@ func (api *DatabaseAPI) registerSub(opID []byte, q *query.Query) (sub *database. api.send(opID, dbMsgTypeError, err.Error(), nil) return nil, false } + + // Save subscription. + api.subsLock.Lock() + defer api.subsLock.Unlock() + api.subs[string(opID)] = sub + return sub, true } func (api *DatabaseAPI) processSub(opID []byte, sub *database.Subscription) { + // Remove subscription after it ended. + defer func() { + api.subsLock.Lock() + defer api.subsLock.Unlock() + delete(api.subs, string(opID)) + }() + for { select { case <-api.shutdownSignal: @@ -414,6 +440,7 @@ func (api *DatabaseAPI) handleQsub(opID []byte, queryText string) { // 127|new|| // 127|delete| // 127|warning| // error with single record, operation continues + // 127|cancel var err error @@ -434,6 +461,26 @@ func (api *DatabaseAPI) handleQsub(opID []byte, queryText string) { api.processSub(opID, sub) } +func (api *DatabaseAPI) handleCancel(opID []byte) { + api.subsLock.Lock() + defer api.subsLock.Unlock() + + // Get subscription from api. + sub, ok := api.subs[string(opID)] + if !ok { + api.send(opID, dbMsgTypeError, "could not find subscription", nil) + return + } + + // End subscription. + err := sub.Cancel() + if err != nil { + api.send(opID, dbMsgTypeError, fmt.Sprintf("failed to cancel subscription: %s", err), nil) + } + + // Subscription handler will end the communication with a done message. +} + func (api *DatabaseAPI) handlePut(opID []byte, key string, data []byte, create bool) { // 128|create|| // 128|success