mirror of
https://github.com/safing/portbase
synced 2025-09-14 09:09:50 +00:00
Add a write-cache to the database interface options
This commit is contained in:
parent
d4bb5ae522
commit
abaf5ee879
7 changed files with 472 additions and 102 deletions
171
database/interface_cache.go
Normal file
171
database/interface_cache.go
Normal file
|
@ -0,0 +1,171 @@
|
|||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/safing/portbase/database/record"
|
||||
"github.com/safing/portbase/log"
|
||||
)
|
||||
|
||||
// DelayedCacheWriter must be run by the caller of an interface that uses delayed cache writing.
|
||||
func (i *Interface) DelayedCacheWriter(ctx context.Context) error {
|
||||
// Check if the DelayedCacheWriter should be run at all.
|
||||
if i.options.CacheSize <= 0 || len(i.options.DelayCachedWrites) == 0 {
|
||||
return errors.New("delayed cache writer is not applicable to this database interface")
|
||||
}
|
||||
|
||||
// Check if backend support the Batcher interface.
|
||||
batchPut := i.PutMany(i.options.DelayCachedWrites)
|
||||
// End batchPut immediately and check for an error.
|
||||
err := batchPut(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
// Wait for trigger for writing the cache.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Module is shutting down, flush write cache to database.
|
||||
i.flushWriteCache()
|
||||
return nil
|
||||
|
||||
case <-i.triggerCacheWrite:
|
||||
case <-time.After(5 * time.Second):
|
||||
}
|
||||
|
||||
i.flushWriteCache()
|
||||
}
|
||||
}
|
||||
|
||||
func (i *Interface) flushWriteCache() {
|
||||
i.writeCacheLock.Lock()
|
||||
defer i.writeCacheLock.Unlock()
|
||||
|
||||
// Check if there is anything to do.
|
||||
if len(i.writeCache) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Write the full cache in a batch operation.
|
||||
batchPut := i.PutMany(i.options.DelayCachedWrites)
|
||||
for _, r := range i.writeCache {
|
||||
err := batchPut(r)
|
||||
if err != nil {
|
||||
log.Warningf("database: failed to write write-cached entry to %q database: %s", i.options.DelayCachedWrites, err)
|
||||
}
|
||||
}
|
||||
// Finish batch.
|
||||
err := batchPut(nil)
|
||||
if err != nil {
|
||||
log.Warningf("database: failed to finish flushing write cache to %q database: %s", i.options.DelayCachedWrites, err)
|
||||
}
|
||||
|
||||
// Optimized map clearing following the Go1.11 recommendation.
|
||||
for key := range i.writeCache {
|
||||
delete(i.writeCache, key)
|
||||
}
|
||||
}
|
||||
|
||||
// cacheEvictHandler is run by the cache for every entry that gets evicted
|
||||
// from the cache.
|
||||
func (i *Interface) cacheEvictHandler(keyData, _ interface{}) {
|
||||
// Transform the key into a string.
|
||||
key, ok := keyData.(string)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if the evicted record is one that is to be written.
|
||||
i.writeCacheLock.Lock()
|
||||
r, ok := i.writeCache[key]
|
||||
if ok {
|
||||
delete(i.writeCache, key)
|
||||
}
|
||||
i.writeCacheLock.Unlock()
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Write record to database in order to mitigate race conditions where the record would appear
|
||||
// as non-existent for a short duration.
|
||||
db, err := getController(r.DatabaseName())
|
||||
if err != nil {
|
||||
log.Warningf("database: failed to write evicted cache entry %q: database %q does not exist", key, r.DatabaseName())
|
||||
return
|
||||
}
|
||||
err = db.Put(r)
|
||||
if err != nil {
|
||||
log.Warningf("database: failed to write evicted cache entry %q to database: %s", key, err)
|
||||
}
|
||||
|
||||
// Finally, trigger writing the full write cache because a to-be-written
|
||||
// entry was just evicted from the cache, and this makes it likely that more
|
||||
// to-be-written entries will be evicted shortly.
|
||||
select {
|
||||
case i.triggerCacheWrite <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (i *Interface) checkCache(key string) record.Record {
|
||||
// Check if cache is in use.
|
||||
if i.cache == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if record exists in cache.
|
||||
cacheVal, err := i.cache.Get(key)
|
||||
if err == nil {
|
||||
r, ok := cacheVal.(record.Record)
|
||||
if ok {
|
||||
return r
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Interface) updateCache(r record.Record, write bool) (written bool) {
|
||||
// Check if cache is in use.
|
||||
if i.cache == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if record should be deleted
|
||||
if r.Meta().IsDeleted() {
|
||||
// Remove entry from cache.
|
||||
i.cache.Remove(r.Key())
|
||||
// Let write through to database storage.
|
||||
return false
|
||||
}
|
||||
|
||||
// Update cache with record.
|
||||
ttl := r.Meta().GetRelativeExpiry()
|
||||
if ttl >= 0 {
|
||||
_ = i.cache.SetWithExpire(
|
||||
r.Key(),
|
||||
r,
|
||||
time.Duration(ttl)*time.Second,
|
||||
)
|
||||
} else {
|
||||
_ = i.cache.Set(
|
||||
r.Key(),
|
||||
r,
|
||||
)
|
||||
}
|
||||
|
||||
// Add record to write cache instead if:
|
||||
// 1. The record is being written.
|
||||
// 2. Write delaying is active.
|
||||
// 3. Write delaying is active for the database of this record.
|
||||
if write && len(i.options.DelayCachedWrites) > 0 && r.DatabaseName() == i.options.DelayCachedWrites {
|
||||
i.writeCacheLock.Lock()
|
||||
defer i.writeCacheLock.Unlock()
|
||||
i.writeCache[r.Key()] = r
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue