diff --git a/database/database_test.go b/database/database_test.go index 790bd7f..f625884 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -22,6 +22,24 @@ import ( _ "github.com/safing/portbase/database/storage/hashmap" ) +func TestMain(m *testing.M) { + testDir, err := ioutil.TempDir("", "portbase-database-testing-") + if err != nil { + panic(err) + } + + err = InitializeWithPath(testDir) + if err != nil { + panic(err) + } + + exitCode := m.Run() + + os.RemoveAll(testDir) // clean up + + os.Exit(exitCode) +} + func makeKey(dbName, key string) string { return fmt.Sprintf("%s:%s", dbName, key) } @@ -220,23 +238,19 @@ func testDatabase(t *testing.T, storageType string, shadowDelete bool) { //nolin func TestDatabaseSystem(t *testing.T) { // panic after 10 seconds, to check for locks - go func() { - time.Sleep(10 * time.Second) - fmt.Println("===== TAKING TOO LONG - PRINTING STACK TRACES =====") - _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) - os.Exit(1) + finished := make(chan struct{}) + defer func() { + close(finished) + }() + go func() { + select { + case <-finished: + case <-time.After(10 * time.Second): + fmt.Println("===== TAKING TOO LONG - PRINTING STACK TRACES =====") + _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + os.Exit(1) + } }() - - testDir, err := ioutil.TempDir("", "portbase-database-testing-") - if err != nil { - t.Fatal(err) - } - - err = InitializeWithPath(testDir) - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(testDir) // clean up for _, shadowDelete := range []bool{false, true} { testDatabase(t, "bbolt", shadowDelete) @@ -246,7 +260,7 @@ func TestDatabaseSystem(t *testing.T) { // TODO: Fix badger tests } - err = MaintainRecordStates(context.TODO()) + err := MaintainRecordStates(context.TODO()) if err != nil { t.Fatal(err) } diff --git a/database/interface.go b/database/interface.go index 02bae88..f921e60 100644 --- a/database/interface.go +++ b/database/interface.go @@ -4,11 +4,11 @@ import ( "context" "errors" "fmt" + "sync" "time" - "github.com/tevino/abool" - "github.com/bluele/gcache" + "github.com/tevino/abool" "github.com/safing/portbase/database/accessor" "github.com/safing/portbase/database/iterator" @@ -24,6 +24,10 @@ const ( type Interface struct { options *Options cache gcache.Cache + + writeCache map[string]record.Record + writeCacheLock sync.Mutex + triggerCacheWrite chan struct{} } // Options holds options that may be set for an Interface instance. @@ -34,7 +38,22 @@ type Options struct { AlwaysMakeCrownjewel bool AlwaysSetRelativateExpiry int64 AlwaysSetAbsoluteExpiry int64 - CacheSize int + + // CacheSize defines that a cache should be used for this interface and + // defines it's size. + // Caching comes with an important caveat: If database records are changed + // from another interface, the cache will not be invalidated for these + // records. It will therefore serve outdated data until that record is + // evicted from the cache. + CacheSize int + + // DelayCachedWrites defines a database storage for which cache writes should + // be cached and batched. The database backend must support the Batcher + // interface. This option is only valid if used with a cache. + // Additionally, this may only be used for internal and local interfaces. + // Please note that this means that other interfaces will not be able to + // guarantee to serve the latest record if records are written this way. + DelayCachedWrites string } // Apply applies options to the record metadata. @@ -63,57 +82,40 @@ func NewInterface(opts *Options) *Interface { options: opts, } if opts.CacheSize > 0 { - new.cache = gcache.New(opts.CacheSize).ARC().Expiration(time.Hour).Build() + cacheBuilder := gcache.New(opts.CacheSize).ARC() + if len(opts.DelayCachedWrites) > 0 { + cacheBuilder.EvictedFunc(new.cacheEvictHandler) + new.writeCache = make(map[string]record.Record) + new.triggerCacheWrite = make(chan struct{}) + } + new.cache = cacheBuilder.Build() } return new } -func (i *Interface) checkCache(key string) (record.Record, bool) { - if i.cache != nil { - cacheVal, err := i.cache.Get(key) - if err == nil { - r, ok := cacheVal.(record.Record) - if ok { - return r, true - } - } - } - return nil, false -} - -func (i *Interface) updateCache(r record.Record) { - if i.cache != nil { - _ = i.cache.Set(r.Key(), r) - } -} - // Exists return whether a record with the given key exists. func (i *Interface) Exists(key string) (bool, error) { - _, _, err := i.getRecord(getDBFromKey, key, false, false) + _, err := i.Get(key) if err != nil { - if err == ErrNotFound { + switch err { + case ErrNotFound: return false, nil + case ErrPermissionDenied: + return true, nil + default: + return false, err } - return false, err } return true, nil } // Get return the record with the given key. func (i *Interface) Get(key string) (record.Record, error) { - r, ok := i.checkCache(key) - if ok { - if !r.Meta().CheckPermission(i.options.Local, i.options.Internal) { - return nil, ErrPermissionDenied - } - return r, nil - } - - r, _, err := i.getRecord(getDBFromKey, key, true, false) + r, _, err := i.getRecord(getDBFromKey, key, false) return r, err } -func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWriteable bool) (r record.Record, db *Controller, err error) { +func (i *Interface) getRecord(dbName string, dbKey string, mustBeWriteable bool) (r record.Record, db *Controller, err error) { if dbName == "" { dbName, dbKey = record.ParseKey(dbKey) } @@ -124,19 +126,24 @@ func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWri } if mustBeWriteable && db.ReadOnly() { - return nil, nil, ErrReadOnly + return nil, db, ErrReadOnly + } + + r = i.checkCache(dbName + ":" + dbKey) + if r != nil { + if !r.Meta().CheckPermission(i.options.Local, i.options.Internal) { + return nil, db, ErrPermissionDenied + } + return r, db, nil } r, err = db.Get(dbKey) if err != nil { - if err == ErrNotFound { - return nil, db, err - } - return nil, nil, err + return nil, db, err } - if check && !r.Meta().CheckPermission(i.options.Local, i.options.Internal) { - return nil, nil, ErrPermissionDenied + if !r.Meta().CheckPermission(i.options.Local, i.options.Internal) { + return nil, db, ErrPermissionDenied } return r, db, nil @@ -144,7 +151,7 @@ func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWri // InsertValue inserts a value into a record. func (i *Interface) InsertValue(key string, attribute string, value interface{}) error { - r, db, err := i.getRecord(getDBFromKey, key, true, true) + r, db, err := i.getRecord(getDBFromKey, key, true) if err != nil { return err } @@ -177,7 +184,40 @@ func (i *Interface) Put(r record.Record) (err error) { // get record or only database var db *Controller if !i.options.Internal || !i.options.Local { - _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true) + _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true) + if err != nil && err != ErrNotFound { + return err + } + } else { + db, err = getController(r.DatabaseName()) + if err != nil { + return err + } + } + + // Check if database is read only before we add to the cache. + if db.ReadOnly() { + return ErrReadOnly + } + + r.Lock() + defer r.Unlock() + i.options.Apply(r) + + written := i.updateCache(r, true) + if written { + return nil + } + + return db.Put(r) +} + +// PutNew saves a record to the database as a new record (ie. with new timestamps). +func (i *Interface) PutNew(r record.Record) (err error) { + // get record or only database + var db *Controller + if !i.options.Internal || !i.options.Local { + _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true) if err != nil && err != ErrNotFound { return err } @@ -191,36 +231,16 @@ func (i *Interface) Put(r record.Record) (err error) { r.Lock() defer r.Unlock() - i.options.Apply(r) - - i.updateCache(r) - return db.Put(r) -} - -// PutNew saves a record to the database as a new record (ie. with new timestamps). -func (i *Interface) PutNew(r record.Record) (err error) { - // get record or only database - var db *Controller - if !i.options.Internal || !i.options.Local { - _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true) - if err != nil && err != ErrNotFound { - return err - } - } else { - db, err = getController(r.DatabaseKey()) - if err != nil { - return err - } - } - - r.Lock() - defer r.Unlock() - if r.Meta() != nil { r.Meta().Reset() } i.options.Apply(r) - i.updateCache(r) + + written := i.updateCache(r, true) + if written { + return nil + } + return db.Put(r) } @@ -316,7 +336,7 @@ func (i *Interface) PutMany(dbName string) (put func(record.Record) error) { // SetAbsoluteExpiry sets an absolute record expiry. func (i *Interface) SetAbsoluteExpiry(key string, time int64) error { - r, db, err := i.getRecord(getDBFromKey, key, true, true) + r, db, err := i.getRecord(getDBFromKey, key, true) if err != nil { return err } @@ -331,7 +351,7 @@ func (i *Interface) SetAbsoluteExpiry(key string, time int64) error { // SetRelativateExpiry sets a relative (self-updating) record expiry. func (i *Interface) SetRelativateExpiry(key string, duration int64) error { - r, db, err := i.getRecord(getDBFromKey, key, true, true) + r, db, err := i.getRecord(getDBFromKey, key, true) if err != nil { return err } @@ -346,7 +366,7 @@ func (i *Interface) SetRelativateExpiry(key string, duration int64) error { // MakeSecret marks the record as a secret, meaning interfacing processes, such as an UI, are denied access to the record. func (i *Interface) MakeSecret(key string) error { - r, db, err := i.getRecord(getDBFromKey, key, true, true) + r, db, err := i.getRecord(getDBFromKey, key, true) if err != nil { return err } @@ -361,7 +381,7 @@ func (i *Interface) MakeSecret(key string) error { // MakeCrownJewel marks a record as a crown jewel, meaning it will only be accessible locally. func (i *Interface) MakeCrownJewel(key string) error { - r, db, err := i.getRecord(getDBFromKey, key, true, true) + r, db, err := i.getRecord(getDBFromKey, key, true) if err != nil { return err } @@ -376,7 +396,7 @@ func (i *Interface) MakeCrownJewel(key string) error { // Delete deletes a record from the database. func (i *Interface) Delete(key string) error { - r, db, err := i.getRecord(getDBFromKey, key, true, true) + r, db, err := i.getRecord(getDBFromKey, key, true) if err != nil { return err } diff --git a/database/interface_cache.go b/database/interface_cache.go new file mode 100644 index 0000000..d192e26 --- /dev/null +++ b/database/interface_cache.go @@ -0,0 +1,171 @@ +package database + +import ( + "context" + "errors" + "time" + + "github.com/safing/portbase/database/record" + "github.com/safing/portbase/log" +) + +// DelayedCacheWriter must be run by the caller of an interface that uses delayed cache writing. +func (i *Interface) DelayedCacheWriter(ctx context.Context) error { + // Check if the DelayedCacheWriter should be run at all. + if i.options.CacheSize <= 0 || len(i.options.DelayCachedWrites) == 0 { + return errors.New("delayed cache writer is not applicable to this database interface") + } + + // Check if backend support the Batcher interface. + batchPut := i.PutMany(i.options.DelayCachedWrites) + // End batchPut immediately and check for an error. + err := batchPut(nil) + if err != nil { + return err + } + + for { + // Wait for trigger for writing the cache. + select { + case <-ctx.Done(): + // Module is shutting down, flush write cache to database. + i.flushWriteCache() + return nil + + case <-i.triggerCacheWrite: + case <-time.After(5 * time.Second): + } + + i.flushWriteCache() + } +} + +func (i *Interface) flushWriteCache() { + i.writeCacheLock.Lock() + defer i.writeCacheLock.Unlock() + + // Check if there is anything to do. + if len(i.writeCache) == 0 { + return + } + + // Write the full cache in a batch operation. + batchPut := i.PutMany(i.options.DelayCachedWrites) + for _, r := range i.writeCache { + err := batchPut(r) + if err != nil { + log.Warningf("database: failed to write write-cached entry to %q database: %s", i.options.DelayCachedWrites, err) + } + } + // Finish batch. + err := batchPut(nil) + if err != nil { + log.Warningf("database: failed to finish flushing write cache to %q database: %s", i.options.DelayCachedWrites, err) + } + + // Optimized map clearing following the Go1.11 recommendation. + for key := range i.writeCache { + delete(i.writeCache, key) + } +} + +// cacheEvictHandler is run by the cache for every entry that gets evicted +// from the cache. +func (i *Interface) cacheEvictHandler(keyData, _ interface{}) { + // Transform the key into a string. + key, ok := keyData.(string) + if !ok { + return + } + + // Check if the evicted record is one that is to be written. + i.writeCacheLock.Lock() + r, ok := i.writeCache[key] + if ok { + delete(i.writeCache, key) + } + i.writeCacheLock.Unlock() + if !ok { + return + } + + // Write record to database in order to mitigate race conditions where the record would appear + // as non-existent for a short duration. + db, err := getController(r.DatabaseName()) + if err != nil { + log.Warningf("database: failed to write evicted cache entry %q: database %q does not exist", key, r.DatabaseName()) + return + } + err = db.Put(r) + if err != nil { + log.Warningf("database: failed to write evicted cache entry %q to database: %s", key, err) + } + + // Finally, trigger writing the full write cache because a to-be-written + // entry was just evicted from the cache, and this makes it likely that more + // to-be-written entries will be evicted shortly. + select { + case i.triggerCacheWrite <- struct{}{}: + default: + } +} + +func (i *Interface) checkCache(key string) record.Record { + // Check if cache is in use. + if i.cache == nil { + return nil + } + + // Check if record exists in cache. + cacheVal, err := i.cache.Get(key) + if err == nil { + r, ok := cacheVal.(record.Record) + if ok { + return r + } + } + return nil +} + +func (i *Interface) updateCache(r record.Record, write bool) (written bool) { + // Check if cache is in use. + if i.cache == nil { + return false + } + + // Check if record should be deleted + if r.Meta().IsDeleted() { + // Remove entry from cache. + i.cache.Remove(r.Key()) + // Let write through to database storage. + return false + } + + // Update cache with record. + ttl := r.Meta().GetRelativeExpiry() + if ttl >= 0 { + _ = i.cache.SetWithExpire( + r.Key(), + r, + time.Duration(ttl)*time.Second, + ) + } else { + _ = i.cache.Set( + r.Key(), + r, + ) + } + + // Add record to write cache instead if: + // 1. The record is being written. + // 2. Write delaying is active. + // 3. Write delaying is active for the database of this record. + if write && len(i.options.DelayCachedWrites) > 0 && r.DatabaseName() == i.options.DelayCachedWrites { + i.writeCacheLock.Lock() + defer i.writeCacheLock.Unlock() + i.writeCache[r.Key()] = r + return true + } + + return false +} diff --git a/database/interface_cache_test.go b/database/interface_cache_test.go new file mode 100644 index 0000000..ff65260 --- /dev/null +++ b/database/interface_cache_test.go @@ -0,0 +1,158 @@ +package database + +import ( + "context" + "fmt" + "strconv" + "sync" + "testing" +) + +func benchmarkCacheWriting(b *testing.B, storageType string, cacheSize int, sampleSize int, delayWrites bool) { //nolint:gocognit,gocyclo + b.Run(fmt.Sprintf("CacheWriting_%s_%d_%d_%v", storageType, cacheSize, sampleSize, delayWrites), func(b *testing.B) { + // Setup Benchmark. + + // Create database. + dbName := fmt.Sprintf("cache-w-benchmark-%s-%d-%d-%v", storageType, cacheSize, sampleSize, delayWrites) + _, err := Register(&Database{ + Name: dbName, + Description: fmt.Sprintf("Cache Benchmark Database for %s", storageType), + StorageType: storageType, + }) + if err != nil { + b.Fatal(err) + } + + // Create benchmark interface. + options := &Options{ + Local: true, + Internal: true, + CacheSize: cacheSize, + } + if cacheSize > 0 && delayWrites { + options.DelayCachedWrites = dbName + } + db := NewInterface(options) + + // Start + ctx, cancelCtx := context.WithCancel(context.Background()) + var wg sync.WaitGroup + if cacheSize > 0 && delayWrites { + wg.Add(1) + go func() { + err := db.DelayedCacheWriter(ctx) + if err != nil { + panic(err) + } + wg.Done() + }() + } + + // Start Benchmark. + b.ResetTimer() + for i := 0; i < b.N; i++ { + testRecordID := i % sampleSize + r := NewExample( + dbName+":"+strconv.Itoa(testRecordID), + "A", + 1, + ) + err = db.Put(r) + if err != nil { + b.Fatal(err) + } + } + + // End cache writer and wait + cancelCtx() + wg.Wait() + + }) +} + +func benchmarkCacheReadWrite(b *testing.B, storageType string, cacheSize int, sampleSize int, delayWrites bool) { //nolint:gocognit,gocyclo + b.Run(fmt.Sprintf("CacheReadWrite_%s_%d_%d_%v", storageType, cacheSize, sampleSize, delayWrites), func(b *testing.B) { + // Setup Benchmark. + + // Create database. + dbName := fmt.Sprintf("cache-rw-benchmark-%s-%d-%d-%v", storageType, cacheSize, sampleSize, delayWrites) + _, err := Register(&Database{ + Name: dbName, + Description: fmt.Sprintf("Cache Benchmark Database for %s", storageType), + StorageType: storageType, + }) + if err != nil { + b.Fatal(err) + } + + // Create benchmark interface. + options := &Options{ + Local: true, + Internal: true, + CacheSize: cacheSize, + } + if cacheSize > 0 && delayWrites { + options.DelayCachedWrites = dbName + } + db := NewInterface(options) + + // Start + ctx, cancelCtx := context.WithCancel(context.Background()) + var wg sync.WaitGroup + if cacheSize > 0 && delayWrites { + wg.Add(1) + go func() { + err := db.DelayedCacheWriter(ctx) + if err != nil { + panic(err) + } + wg.Done() + }() + } + + // Start Benchmark. + b.ResetTimer() + writing := true + for i := 0; i < b.N; i++ { + testRecordID := i % sampleSize + key := dbName + ":" + strconv.Itoa(testRecordID) + + if i > 0 && testRecordID == 0 { + writing = !writing // switch between reading and writing every samplesize + } + + if writing { + r := NewExample(key, "A", 1) + err = db.Put(r) + } else { + _, err = db.Get(key) + } + if err != nil { + b.Fatal(err) + } + } + + // End cache writer and wait + cancelCtx() + wg.Wait() + + }) +} + +func BenchmarkCache(b *testing.B) { + for _, storageType := range []string{"bbolt", "hashmap"} { + benchmarkCacheWriting(b, storageType, 32, 8, false) + benchmarkCacheWriting(b, storageType, 32, 8, true) + benchmarkCacheWriting(b, storageType, 32, 1024, false) + benchmarkCacheWriting(b, storageType, 32, 1024, true) + benchmarkCacheWriting(b, storageType, 512, 1024, false) + benchmarkCacheWriting(b, storageType, 512, 1024, true) + + benchmarkCacheReadWrite(b, storageType, 32, 8, false) + benchmarkCacheReadWrite(b, storageType, 32, 8, true) + benchmarkCacheReadWrite(b, storageType, 32, 1024, false) + benchmarkCacheReadWrite(b, storageType, 32, 1024, true) + benchmarkCacheReadWrite(b, storageType, 512, 1024, false) + benchmarkCacheReadWrite(b, storageType, 512, 1024, true) + } +} diff --git a/database/record/base.go b/database/record/base.go index 8bc1d6b..33f6d2f 100644 --- a/database/record/base.go +++ b/database/record/base.go @@ -2,7 +2,6 @@ package record import ( "errors" - "fmt" "github.com/safing/portbase/container" "github.com/safing/portbase/database/accessor" @@ -18,7 +17,7 @@ type Base struct { // Key returns the key of the database record. func (b *Base) Key() string { - return fmt.Sprintf("%s:%s", b.dbName, b.dbKey) + return b.dbName + ":" + b.dbKey } // KeyIsSet returns true if the database key is set. diff --git a/database/record/meta.go b/database/record/meta.go index 23ed287..e9f4fcf 100644 --- a/database/record/meta.go +++ b/database/record/meta.go @@ -31,9 +31,10 @@ func (m *Meta) GetAbsoluteExpiry() int64 { } // GetRelativeExpiry returns the current relative expiry time - ie. seconds until expiry. +// A negative value signifies that the record does not expire. func (m *Meta) GetRelativeExpiry() int64 { - if m.Deleted < 0 { - return -m.Deleted + if m.Expires == 0 { + return -1 } abs := m.Expires - time.Now().Unix() diff --git a/database/storage/hashmap/map.go b/database/storage/hashmap/map.go index 4005f07..53fa0d9 100644 --- a/database/storage/hashmap/map.go +++ b/database/storage/hashmap/map.go @@ -66,11 +66,7 @@ func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan erro // start handler go func() { for r := range batch { - if !shadowDelete && r.Meta().IsDeleted() { - delete(hm.db, r.DatabaseKey()) - } else { - hm.db[r.DatabaseKey()] = r - } + hm.putOrDelete(shadowDelete, r) } errs <- nil }() @@ -78,6 +74,17 @@ func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan erro return batch, errs } +func (hm *HashMap) putOrDelete(shadowDelete bool, r record.Record) { + hm.dbLock.Lock() + defer hm.dbLock.Unlock() + + if !shadowDelete && r.Meta().IsDeleted() { + delete(hm.db, r.DatabaseKey()) + } else { + hm.db[r.DatabaseKey()] = r + } +} + // Delete deletes a record from the database. func (hm *HashMap) Delete(key string) error { hm.dbLock.Lock()