Mirror of https://github.com/safing/portbase, synced 2025-09-01 18:19:57 +00:00
Merge branch 'develop' into feature/ui-revamp
Commit f48ed07467
9 changed files with 625 additions and 129 deletions
@@ -5,6 +5,7 @@ import (
    "errors"
    "fmt"
    "net/http"
    "sync"

    "github.com/gorilla/websocket"
    "github.com/tevino/abool"

@@ -43,7 +44,9 @@ func init() {
type DatabaseAPI struct {
    conn      *websocket.Conn
    sendQueue chan []byte
    subs      map[string]*database.Subscription

    subsLock sync.Mutex
    subs     map[string]*database.Subscription

    shutdownSignal chan struct{}
    shuttingDown   *abool.AtomicBool

@@ -141,6 +144,15 @@ func (api *DatabaseAPI) handler() {
        }

        parts := bytes.SplitN(msg, []byte("|"), 3)

        // Handle special command "cancel"
        if len(parts) == 2 && string(parts[1]) == "cancel" {
            // 125|cancel
            // 127|cancel
            go api.handleCancel(parts[0])
            continue
        }

        if len(parts) != 3 {
            api.send(nil, dbMsgTypeError, "bad request: malformed message", nil)
            continue
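To make the framing above concrete: an incoming text message has the form <opID>|<command>|<payload>, and the limit of 3 passed to bytes.SplitN keeps any further "|" characters inside the payload. The following standalone sketch (not part of this commit, with made-up message contents) shows how the two shapes handled above fall out of the split:

package main

import (
    "bytes"
    "fmt"
)

func main() {
    // Hypothetical messages following the "<opID>|<command>|<payload>" framing
    // used by the handler above.
    msgs := [][]byte{
        []byte("125|query|query core:config/ where name matches \"a|b\""),
        []byte("125|cancel"),
    }

    for _, msg := range msgs {
        parts := bytes.SplitN(msg, []byte("|"), 3)
        switch {
        case len(parts) == 2 && string(parts[1]) == "cancel":
            fmt.Printf("op %s: cancel\n", parts[0])
        case len(parts) == 3:
            // parts[2] still contains any later "|" characters.
            fmt.Printf("op %s: %s -> %q\n", parts[0], parts[1], parts[2])
        default:
            fmt.Printf("malformed message: %q\n", msg)
        }
    }
}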
@@ -340,6 +352,7 @@ func (api *DatabaseAPI) handleSub(opID []byte, queryText string) {
    // 125|new|<key>|<data>
    // 125|delete|<key>
    // 125|warning|<message> // error with single record, operation continues
    // 125|cancel
    var err error

    q, err := query.ParseQuery(queryText)

@@ -362,10 +375,23 @@ func (api *DatabaseAPI) registerSub(opID []byte, q *query.Query) (sub *database.Subscription, ok bool) {
        api.send(opID, dbMsgTypeError, err.Error(), nil)
        return nil, false
    }

    // Save subscription.
    api.subsLock.Lock()
    defer api.subsLock.Unlock()
    api.subs[string(opID)] = sub

    return sub, true
}

func (api *DatabaseAPI) processSub(opID []byte, sub *database.Subscription) {
    // Remove subscription after it ended.
    defer func() {
        api.subsLock.Lock()
        defer api.subsLock.Unlock()
        delete(api.subs, string(opID))
    }()

    for {
        select {
        case <-api.shutdownSignal:
@@ -414,6 +440,7 @@ func (api *DatabaseAPI) handleQsub(opID []byte, queryText string) {
    // 127|new|<key>|<data>
    // 127|delete|<key>
    // 127|warning|<message> // error with single record, operation continues
    // 127|cancel

    var err error

@@ -434,6 +461,26 @@ func (api *DatabaseAPI) handleQsub(opID []byte, queryText string) {
    api.processSub(opID, sub)
}

func (api *DatabaseAPI) handleCancel(opID []byte) {
    api.subsLock.Lock()
    defer api.subsLock.Unlock()

    // Get subscription from api.
    sub, ok := api.subs[string(opID)]
    if !ok {
        api.send(opID, dbMsgTypeError, "could not find subscription", nil)
        return
    }

    // End subscription.
    err := sub.Cancel()
    if err != nil {
        api.send(opID, dbMsgTypeError, fmt.Sprintf("failed to cancel subscription: %s", err), nil)
    }

    // Subscription handler will end the communication with a done message.
}

func (api *DatabaseAPI) handlePut(opID []byte, key string, data []byte, create bool) {
    // 128|create|<key>|<data>
    // 128|success
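For orientation, this is roughly how a client on the other end of the websocket might drive the protocol above, including the new cancel command. This is an illustrative sketch only, not part of the commit: the endpoint URL, port, and query text are made up, and error handling is minimal.

package main

import (
    "log"

    "github.com/gorilla/websocket"
)

func main() {
    // Hypothetical endpoint; the actual path depends on how the API module
    // registers the database websocket handler.
    conn, _, err := websocket.DefaultDialer.Dial("ws://127.0.0.1:817/api/database/v1", nil)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Start a query under operation ID 125, then cancel it again.
    ops := []string{
        "125|query|query core:config/",
        "125|cancel",
    }
    for _, op := range ops {
        if err := conn.WriteMessage(websocket.TextMessage, []byte(op)); err != nil {
            log.Fatal(err)
        }
    }
}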
@@ -22,6 +22,26 @@ import (
    _ "github.com/safing/portbase/database/storage/hashmap"
)

func TestMain(m *testing.M) {
    testDir, err := ioutil.TempDir("", "portbase-database-testing-")
    if err != nil {
        panic(err)
    }

    err = InitializeWithPath(testDir)
    if err != nil {
        panic(err)
    }

    exitCode := m.Run()

    // Clean up the test directory.
    // Do not defer, as we end this function with an os.Exit call.
    os.RemoveAll(testDir)

    os.Exit(exitCode)
}

func makeKey(dbName, key string) string {
    return fmt.Sprintf("%s:%s", dbName, key)
}
@@ -220,24 +240,18 @@ func testDatabase(t *testing.T, storageType string, shadowDelete bool) { //nolint
func TestDatabaseSystem(t *testing.T) {

    // panic after 10 seconds, to check for locks
    finished := make(chan struct{})
    defer close(finished)
    go func() {
        time.Sleep(10 * time.Second)
        fmt.Println("===== TAKING TOO LONG - PRINTING STACK TRACES =====")
        _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
        os.Exit(1)
        select {
        case <-finished:
        case <-time.After(10 * time.Second):
            fmt.Println("===== TAKING TOO LONG - PRINTING STACK TRACES =====")
            _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
            os.Exit(1)
        }
    }()

    testDir, err := ioutil.TempDir("", "portbase-database-testing-")
    if err != nil {
        t.Fatal(err)
    }

    err = InitializeWithPath(testDir)
    if err != nil {
        t.Fatal(err)
    }
    defer os.RemoveAll(testDir) // clean up

    for _, shadowDelete := range []bool{false, true} {
        testDatabase(t, "bbolt", shadowDelete)
        testDatabase(t, "hashmap", shadowDelete)

@@ -246,7 +260,7 @@ func TestDatabaseSystem(t *testing.T) {
        // TODO: Fix badger tests
    }

    err = MaintainRecordStates(context.TODO())
    err := MaintainRecordStates(context.TODO())
    if err != nil {
        t.Fatal(err)
    }
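The reworked timeout guard above replaces a plain sleep with a select on a finished channel, so the stack-dumping goroutine exits as soon as the test completes instead of lingering for the full ten seconds. Extracted into a reusable helper, the same pattern could look like this sketch (the helper name and package are illustrative, not part of the commit):

package dbtest

import (
    "fmt"
    "os"
    "runtime/pprof"
    "testing"
    "time"
)

// failAfter dumps all goroutine stacks and aborts the test binary if the
// returned stop function has not been called within the given timeout.
func failAfter(t *testing.T, timeout time.Duration) (stop func()) {
    t.Helper()
    finished := make(chan struct{})

    go func() {
        select {
        case <-finished:
            // Test finished in time, nothing to do.
        case <-time.After(timeout):
            fmt.Println("===== TAKING TOO LONG - PRINTING STACK TRACES =====")
            _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
            os.Exit(1)
        }
    }()

    return func() { close(finished) }
}

A test would then simply start with defer failAfter(t, 10*time.Second)().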
@@ -4,11 +4,11 @@ import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"

    "github.com/tevino/abool"

    "github.com/bluele/gcache"
    "github.com/tevino/abool"

    "github.com/safing/portbase/database/accessor"
    "github.com/safing/portbase/database/iterator"

@@ -24,17 +24,56 @@ const (
type Interface struct {
    options *Options
    cache   gcache.Cache

    writeCache        map[string]record.Record
    writeCacheLock    sync.Mutex
    triggerCacheWrite chan struct{}
}

// Options holds options that may be set for an Interface instance.
type Options struct {
    Local bool
    Internal bool
    AlwaysMakeSecret bool
    AlwaysMakeCrownjewel bool
    // Local specifies if the interface is used by an actor on the local device.
    // Setting both the Local and Internal flags will bring performance
    // improvements because fewer checks are needed.
    Local bool

    // Internal specifies if the interface is used by an actor within the
    // software. Setting both the Local and Internal flags will bring performance
    // improvements because fewer checks are needed.
    Internal bool

    // AlwaysMakeSecret will have the interface mark all saved records as secret.
    // This means that they will be only accessible by an internal interface.
    AlwaysMakeSecret bool

    // AlwaysMakeCrownjewel will have the interface mark all saved records as
    // crown jewels. This means that they will be only accessible by a local
    // interface.
    AlwaysMakeCrownjewel bool

    // AlwaysSetRelativateExpiry will have the interface set a relative expiry,
    // based on the current time, on all saved records.
    AlwaysSetRelativateExpiry int64
    AlwaysSetAbsoluteExpiry int64
    CacheSize int

    // AlwaysSetAbsoluteExpiry will have the interface set an absolute expiry on
    // all saved records.
    AlwaysSetAbsoluteExpiry int64

    // CacheSize defines that a cache should be used for this interface and
    // defines its size.
    // Caching comes with an important caveat: If database records are changed
    // from another interface, the cache will not be invalidated for these
    // records. It will therefore serve outdated data until that record is
    // evicted from the cache.
    CacheSize int

    // DelayCachedWrites defines a database name for which cache writes should
    // be cached and batched. The database backend must support the Batcher
    // interface. This option is only valid if used with a cache.
    // Additionally, this may only be used for internal and local interfaces.
    // Please note that this means that other interfaces will not be able to
    // guarantee to serve the latest record if records are written this way.
    DelayCachedWrites string
}

// Apply applies options to the record metadata.
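As a rough illustration of how these options combine (a sketch, not code from this commit; the database name "core" is made up): a privileged local and internal interface with a read cache and delayed writes for one database would be set up roughly like this, and DelayCachedWrites only has an effect while the DelayedCacheWriter introduced later in this commit is running.

package main

import (
    "context"

    "github.com/safing/portbase/database"
)

func main() {
    // Local + Internal grants full permissions; CacheSize enables the read
    // cache; DelayCachedWrites batches writes to the (made-up) "core" database.
    db := database.NewInterface(&database.Options{
        Local:             true,
        Internal:          true,
        CacheSize:         256,
        DelayCachedWrites: "core",
    })

    // The write cache is only flushed while DelayedCacheWriter is running.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go func() {
        _ = db.DelayedCacheWriter(ctx)
    }()

    // Put and Get records through db here.
}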
@@ -53,6 +92,28 @@ func (o *Options) Apply(r record.Record) {
    }
}

// HasAllPermissions returns whether the options specify the highest possible
// permissions for operations.
func (o *Options) HasAllPermissions() bool {
    return o.Local && o.Internal
}

// hasAccessPermission checks if the interface options permit access to the
// given record, locking the record for accessing its attributes.
func (o *Options) hasAccessPermission(r record.Record) bool {
    // Check if the options specify all permissions, which makes checking the
    // record unnecessary.
    if o.HasAllPermissions() {
        return true
    }

    r.Lock()
    defer r.Unlock()

    // Check permissions against record.
    return r.Meta().CheckPermission(o.Local, o.Internal)
}

// NewInterface returns a new Interface to the database.
func NewInterface(opts *Options) *Interface {
    if opts == nil {
@@ -63,57 +124,40 @@ func NewInterface(opts *Options) *Interface {
        options: opts,
    }
    if opts.CacheSize > 0 {
        new.cache = gcache.New(opts.CacheSize).ARC().Expiration(time.Hour).Build()
        cacheBuilder := gcache.New(opts.CacheSize).ARC()
        if opts.DelayCachedWrites != "" {
            cacheBuilder.EvictedFunc(new.cacheEvictHandler)
            new.writeCache = make(map[string]record.Record, opts.CacheSize/2)
            new.triggerCacheWrite = make(chan struct{})
        }
        new.cache = cacheBuilder.Build()
    }
    return new
}

func (i *Interface) checkCache(key string) (record.Record, bool) {
    if i.cache != nil {
        cacheVal, err := i.cache.Get(key)
        if err == nil {
            r, ok := cacheVal.(record.Record)
            if ok {
                return r, true
            }
        }
    }
    return nil, false
}

func (i *Interface) updateCache(r record.Record) {
    if i.cache != nil {
        _ = i.cache.Set(r.Key(), r)
    }
}

// Exists returns whether a record with the given key exists.
func (i *Interface) Exists(key string) (bool, error) {
    _, _, err := i.getRecord(getDBFromKey, key, false, false)
    _, err := i.Get(key)
    if err != nil {
        if err == ErrNotFound {
        switch {
        case errors.Is(err, ErrNotFound):
            return false, nil
        case errors.Is(err, ErrPermissionDenied):
            return true, nil
        default:
            return false, err
        }
        return false, err
    }
    return true, nil
}

// Get returns the record with the given key.
func (i *Interface) Get(key string) (record.Record, error) {
    r, ok := i.checkCache(key)
    if ok {
        if !r.Meta().CheckPermission(i.options.Local, i.options.Internal) {
            return nil, ErrPermissionDenied
        }
        return r, nil
    }

    r, _, err := i.getRecord(getDBFromKey, key, true, false)
    r, _, err := i.getRecord(getDBFromKey, key, false)
    return r, err
}

func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWriteable bool) (r record.Record, db *Controller, err error) {
func (i *Interface) getRecord(dbName string, dbKey string, mustBeWriteable bool) (r record.Record, db *Controller, err error) {
    if dbName == "" {
        dbName, dbKey = record.ParseKey(dbKey)
    }
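The cache construction above only attaches an eviction callback when delayed writes are enabled, which is what later lets cacheEvictHandler rescue pending writes. For readers unfamiliar with github.com/bluele/gcache, here is a small standalone sketch of that builder pattern (not from this commit; keys and size are arbitrary, and the third Set should push one entry out and fire the callback):

package main

import (
    "fmt"

    "github.com/bluele/gcache"
)

func main() {
    cache := gcache.New(2).
        ARC().
        EvictedFunc(func(key, value interface{}) {
            // Called when an entry is pushed out of the cache.
            fmt.Printf("evicted: %v\n", key)
        }).
        Build()

    _ = cache.Set("a", 1)
    _ = cache.Set("b", 2)
    _ = cache.Set("c", 3) // over capacity, should trigger the eviction callback

    if v, err := cache.Get("c"); err == nil {
        fmt.Println("got:", v)
    }
}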
@@ -124,19 +168,24 @@ func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWriteable bool) (r record.Record, db *Controller, err error) {
    }

    if mustBeWriteable && db.ReadOnly() {
        return nil, nil, ErrReadOnly
        return nil, db, ErrReadOnly
    }

    r = i.checkCache(dbName + ":" + dbKey)
    if r != nil {
        if !i.options.hasAccessPermission(r) {
            return nil, db, ErrPermissionDenied
        }
        return r, db, nil
    }

    r, err = db.Get(dbKey)
    if err != nil {
        if err == ErrNotFound {
            return nil, db, err
        }
        return nil, nil, err
        return nil, db, err
    }

    if check && !r.Meta().CheckPermission(i.options.Local, i.options.Internal) {
        return nil, nil, ErrPermissionDenied
    if !i.options.hasAccessPermission(r) {
        return nil, db, ErrPermissionDenied
    }

    return r, db, nil
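The rewritten Exists above compares errors with errors.Is instead of ==, which also matches a sentinel error that a storage backend wraps with fmt.Errorf and %w. A minimal illustration (not from this commit; the wrapping site is made up):

package main

import (
    "errors"
    "fmt"
)

var ErrNotFound = errors.New("not found")

func main() {
    // A backend might wrap the sentinel error with additional context.
    err := fmt.Errorf("bbolt: key %q: %w", "core:config/example", ErrNotFound)

    fmt.Println(err == ErrNotFound)          // false: direct comparison misses the wrapped error
    fmt.Println(errors.Is(err, ErrNotFound)) // true: errors.Is unwraps it
}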
@@ -144,7 +193,7 @@ func (i *Interface) getRecord(dbName string, dbKey string, check bool, mustBeWriteable bool) (r record.Record, db *Controller, err error) {

// InsertValue inserts a value into a record.
func (i *Interface) InsertValue(key string, attribute string, value interface{}) error {
    r, db, err := i.getRecord(getDBFromKey, key, true, true)
    r, db, err := i.getRecord(getDBFromKey, key, true)
    if err != nil {
        return err
    }

@@ -176,8 +225,41 @@ func (i *Interface) InsertValue(key string, attribute string, value interface{}) error {
func (i *Interface) Put(r record.Record) (err error) {
    // get record or only database
    var db *Controller
    if !i.options.Internal || !i.options.Local {
        _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
    if !i.options.HasAllPermissions() {
        _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true)
        if err != nil && err != ErrNotFound {
            return err
        }
    } else {
        db, err = getController(r.DatabaseName())
        if err != nil {
            return err
        }
    }

    // Check if database is read only before we add to the cache.
    if db.ReadOnly() {
        return ErrReadOnly
    }

    r.Lock()
    defer r.Unlock()
    i.options.Apply(r)

    written := i.updateCache(r, true)
    if written {
        return nil
    }

    return db.Put(r)
}

// PutNew saves a record to the database as a new record (ie. with new timestamps).
func (i *Interface) PutNew(r record.Record) (err error) {
    // get record or only database
    var db *Controller
    if !i.options.HasAllPermissions() {
        _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true)
        if err != nil && err != ErrNotFound {
            return err
        }

@@ -191,36 +273,16 @@ func (i *Interface) Put(r record.Record) (err error) {
    r.Lock()
    defer r.Unlock()

    i.options.Apply(r)

    i.updateCache(r)
    return db.Put(r)
}

// PutNew saves a record to the database as a new record (ie. with new timestamps).
func (i *Interface) PutNew(r record.Record) (err error) {
    // get record or only database
    var db *Controller
    if !i.options.Internal || !i.options.Local {
        _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
        if err != nil && err != ErrNotFound {
            return err
        }
    } else {
        db, err = getController(r.DatabaseKey())
        if err != nil {
            return err
        }
    }

    r.Lock()
    defer r.Unlock()

    if r.Meta() != nil {
        r.Meta().Reset()
    }
    i.options.Apply(r)
    i.updateCache(r)

    written := i.updateCache(r, true)
    if written {
        return nil
    }

    return db.Put(r)
}

@@ -233,7 +295,7 @@ func (i *Interface) PutMany(dbName string) (put func(record.Record) error) {
    interfaceBatch := make(chan record.Record, 100)

    // permission check
    if !i.options.Internal || !i.options.Local {
    if !i.options.HasAllPermissions() {
        return func(r record.Record) error {
            return ErrPermissionDenied
        }
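PutMany returns a put function that streams records into one batch; as the delayed cache writer added in this commit shows, passing nil finishes the batch and returns the combined result. A small usage sketch (not from this commit; how the records are built and the error handling are left to the caller):

package example

import (
    "log"

    "github.com/safing/portbase/database"
    "github.com/safing/portbase/database/record"
)

// batchWrite streams prepared records into the named database in one batch.
// Sketch only: the interface must have been created with sufficient permissions.
func batchWrite(db *database.Interface, dbName string, records []record.Record) error {
    put := db.PutMany(dbName)

    for _, r := range records {
        if err := put(r); err != nil {
            log.Printf("failed to queue %s: %s", r.Key(), err)
        }
    }

    // Passing nil finishes the batch and returns the batch result.
    return put(nil)
}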
@@ -316,7 +378,7 @@ func (i *Interface) PutMany(dbName string) (put func(record.Record) error) {

// SetAbsoluteExpiry sets an absolute record expiry.
func (i *Interface) SetAbsoluteExpiry(key string, time int64) error {
    r, db, err := i.getRecord(getDBFromKey, key, true, true)
    r, db, err := i.getRecord(getDBFromKey, key, true)
    if err != nil {
        return err
    }

@@ -331,7 +393,7 @@ func (i *Interface) SetAbsoluteExpiry(key string, time int64) error {

// SetRelativateExpiry sets a relative (self-updating) record expiry.
func (i *Interface) SetRelativateExpiry(key string, duration int64) error {
    r, db, err := i.getRecord(getDBFromKey, key, true, true)
    r, db, err := i.getRecord(getDBFromKey, key, true)
    if err != nil {
        return err
    }

@@ -346,7 +408,7 @@ func (i *Interface) SetRelativateExpiry(key string, duration int64) error {

// MakeSecret marks the record as a secret, meaning interfacing processes, such as a UI, are denied access to the record.
func (i *Interface) MakeSecret(key string) error {
    r, db, err := i.getRecord(getDBFromKey, key, true, true)
    r, db, err := i.getRecord(getDBFromKey, key, true)
    if err != nil {
        return err
    }

@@ -361,7 +423,7 @@ func (i *Interface) MakeSecret(key string) error {

// MakeCrownJewel marks a record as a crown jewel, meaning it will only be accessible locally.
func (i *Interface) MakeCrownJewel(key string) error {
    r, db, err := i.getRecord(getDBFromKey, key, true, true)
    r, db, err := i.getRecord(getDBFromKey, key, true)
    if err != nil {
        return err
    }

@@ -376,7 +438,7 @@ func (i *Interface) MakeCrownJewel(key string) error {

// Delete deletes a record from the database.
func (i *Interface) Delete(key string) error {
    r, db, err := i.getRecord(getDBFromKey, key, true, true)
    r, db, err := i.getRecord(getDBFromKey, key, true)
    if err != nil {
        return err
    }
database/interface_cache.go (new file, 203 lines)

@@ -0,0 +1,203 @@
package database

import (
    "context"
    "errors"
    "time"

    "github.com/safing/portbase/database/record"
    "github.com/safing/portbase/log"
)

// DelayedCacheWriter must be run by the caller of an interface that uses delayed cache writing.
func (i *Interface) DelayedCacheWriter(ctx context.Context) error {
    // Check if the DelayedCacheWriter should be run at all.
    if i.options.CacheSize <= 0 || i.options.DelayCachedWrites == "" {
        return errors.New("delayed cache writer is not applicable to this database interface")
    }

    // Check if the backend supports the Batcher interface.
    batchPut := i.PutMany(i.options.DelayCachedWrites)
    // End batchPut immediately and check for an error.
    err := batchPut(nil)
    if err != nil {
        return err
    }

    // percentThreshold defines the minimum fill level of the write cache, as a percentage of the cache size, that must be reached before the cache is flushed to database storage.
    percentThreshold := 25
    thresholdWriteTicker := time.NewTicker(5 * time.Second)
    forceWriteTicker := time.NewTicker(5 * time.Minute)

    for {
        // Wait for trigger for writing the cache.
        select {
        case <-ctx.Done():
            // The caller is shutting down, flush the cache to storage and exit.
            i.flushWriteCache(0)
            return nil

        case <-i.triggerCacheWrite:
            // An entry from the cache was evicted that was also in the write cache.
            // This makes it likely that other entries that are also present in the
            // write cache will be evicted soon. Flush the write cache to storage
            // immediately in order to reduce single writes.
            i.flushWriteCache(0)

        case <-thresholdWriteTicker.C:
            // Often check if the write cache has filled up to a certain degree and
            // flush it to storage before we start evicting to-be-written entries and
            // slow down the hot path again.
            i.flushWriteCache(percentThreshold)

        case <-forceWriteTicker.C:
            // Once in a while, flush the write cache to storage no matter how much
            // it is filled. We don't want entries lingering around in the write
            // cache forever. This also reduces the amount of data loss in the event
            // of a total crash.
            i.flushWriteCache(0)
        }

    }
}

func (i *Interface) flushWriteCache(percentThreshold int) {
    i.writeCacheLock.Lock()
    defer i.writeCacheLock.Unlock()

    // Check if there is anything to do.
    if len(i.writeCache) == 0 {
        return
    }

    // Check if we reached the given threshold for writing to storage.
    if (len(i.writeCache)*100)/i.options.CacheSize < percentThreshold {
        return
    }

    // Write the full cache in a batch operation.
    batchPut := i.PutMany(i.options.DelayCachedWrites)
    for _, r := range i.writeCache {
        err := batchPut(r)
        if err != nil {
            log.Warningf("database: failed to write write-cached entry to %q database: %s", i.options.DelayCachedWrites, err)
        }
    }
    // Finish batch.
    err := batchPut(nil)
    if err != nil {
        log.Warningf("database: failed to finish flushing write cache to %q database: %s", i.options.DelayCachedWrites, err)
    }

    // Optimized map clearing following the Go 1.11 recommendation.
    for key := range i.writeCache {
        delete(i.writeCache, key)
    }
}

// cacheEvictHandler is run by the cache for every entry that gets evicted
// from the cache.
func (i *Interface) cacheEvictHandler(keyData, _ interface{}) {
    // Transform the key into a string.
    key, ok := keyData.(string)
    if !ok {
        return
    }

    // Check if the evicted record is one that is to be written.
    // Lock the write cache until the end of the function.
    // The read cache is locked anyway for the whole duration.
    i.writeCacheLock.Lock()
    defer i.writeCacheLock.Unlock()
    r, ok := i.writeCache[key]
    if ok {
        delete(i.writeCache, key)
    }
    if !ok {
        return
    }

    // Write record to database in order to mitigate race conditions where the record would appear
    // as non-existent for a short duration.
    db, err := getController(r.DatabaseName())
    if err != nil {
        log.Warningf("database: failed to write evicted cache entry %q: database %q does not exist", key, r.DatabaseName())
        return
    }

    r.Lock()
    defer r.Unlock()

    err = db.Put(r)
    if err != nil {
        log.Warningf("database: failed to write evicted cache entry %q to database: %s", key, err)
    }

    // Finally, trigger writing the full write cache because a to-be-written
    // entry was just evicted from the cache, and this makes it likely that more
    // to-be-written entries will be evicted shortly.
    select {
    case i.triggerCacheWrite <- struct{}{}:
    default:
    }
}

func (i *Interface) checkCache(key string) record.Record {
    // Check if cache is in use.
    if i.cache == nil {
        return nil
    }

    // Check if record exists in cache.
    cacheVal, err := i.cache.Get(key)
    if err == nil {
        r, ok := cacheVal.(record.Record)
        if ok {
            return r
        }
    }
    return nil
}

func (i *Interface) updateCache(r record.Record, write bool) (written bool) {
    // Check if cache is in use.
    if i.cache == nil {
        return false
    }

    // Check if the record should be deleted.
    if r.Meta().IsDeleted() {
        // Remove entry from cache.
        i.cache.Remove(r.Key())
        // Let write through to database storage.
        return false
    }

    // Update cache with record.
    ttl := r.Meta().GetRelativeExpiry()
    if ttl >= 0 {
        _ = i.cache.SetWithExpire(
            r.Key(),
            r,
            time.Duration(ttl)*time.Second,
        )
    } else {
        _ = i.cache.Set(
            r.Key(),
            r,
        )
    }

    // Add record to write cache instead if:
    // 1. The record is being written.
    // 2. Write delaying is active.
    // 3. Write delaying is active for the database of this record.
    if write && r.DatabaseName() == i.options.DelayCachedWrites {
        i.writeCacheLock.Lock()
        defer i.writeCacheLock.Unlock()
        i.writeCache[r.Key()] = r
        return true
    }

    return false
}
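The eviction handler above ends with a select that has an empty default case. That is the standard non-blocking send: an eviction never blocks on the trigger channel, and if DelayedCacheWriter is not currently waiting in its select, the signal is simply dropped and the periodic tickers pick the work up later. A tiny standalone illustration (not from this commit):

package main

import "fmt"

func main() {
    trigger := make(chan struct{}) // unbuffered, like triggerCacheWrite

    // Non-blocking send: if no receiver is ready, drop the signal instead of
    // blocking the caller (here: the cache library invoking the evict handler).
    select {
    case trigger <- struct{}{}:
        fmt.Println("signal delivered")
    default:
        fmt.Println("no receiver ready, signal dropped")
    }
}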
database/interface_cache_test.go (new file, 158 lines)

@@ -0,0 +1,158 @@
package database

import (
    "context"
    "fmt"
    "strconv"
    "sync"
    "testing"
)

func benchmarkCacheWriting(b *testing.B, storageType string, cacheSize int, sampleSize int, delayWrites bool) { //nolint:gocognit,gocyclo
    b.Run(fmt.Sprintf("CacheWriting_%s_%d_%d_%v", storageType, cacheSize, sampleSize, delayWrites), func(b *testing.B) {
        // Setup Benchmark.

        // Create database.
        dbName := fmt.Sprintf("cache-w-benchmark-%s-%d-%d-%v", storageType, cacheSize, sampleSize, delayWrites)
        _, err := Register(&Database{
            Name:        dbName,
            Description: fmt.Sprintf("Cache Benchmark Database for %s", storageType),
            StorageType: storageType,
        })
        if err != nil {
            b.Fatal(err)
        }

        // Create benchmark interface.
        options := &Options{
            Local:     true,
            Internal:  true,
            CacheSize: cacheSize,
        }
        if cacheSize > 0 && delayWrites {
            options.DelayCachedWrites = dbName
        }
        db := NewInterface(options)

        // Start
        ctx, cancelCtx := context.WithCancel(context.Background())
        var wg sync.WaitGroup
        if cacheSize > 0 && delayWrites {
            wg.Add(1)
            go func() {
                err := db.DelayedCacheWriter(ctx)
                if err != nil {
                    panic(err)
                }
                wg.Done()
            }()
        }

        // Start Benchmark.
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
            testRecordID := i % sampleSize
            r := NewExample(
                dbName+":"+strconv.Itoa(testRecordID),
                "A",
                1,
            )
            err = db.Put(r)
            if err != nil {
                b.Fatal(err)
            }
        }

        // End cache writer and wait
        cancelCtx()
        wg.Wait()

    })
}

func benchmarkCacheReadWrite(b *testing.B, storageType string, cacheSize int, sampleSize int, delayWrites bool) { //nolint:gocognit,gocyclo
    b.Run(fmt.Sprintf("CacheReadWrite_%s_%d_%d_%v", storageType, cacheSize, sampleSize, delayWrites), func(b *testing.B) {
        // Setup Benchmark.

        // Create database.
        dbName := fmt.Sprintf("cache-rw-benchmark-%s-%d-%d-%v", storageType, cacheSize, sampleSize, delayWrites)
        _, err := Register(&Database{
            Name:        dbName,
            Description: fmt.Sprintf("Cache Benchmark Database for %s", storageType),
            StorageType: storageType,
        })
        if err != nil {
            b.Fatal(err)
        }

        // Create benchmark interface.
        options := &Options{
            Local:     true,
            Internal:  true,
            CacheSize: cacheSize,
        }
        if cacheSize > 0 && delayWrites {
            options.DelayCachedWrites = dbName
        }
        db := NewInterface(options)

        // Start
        ctx, cancelCtx := context.WithCancel(context.Background())
        var wg sync.WaitGroup
        if cacheSize > 0 && delayWrites {
            wg.Add(1)
            go func() {
                err := db.DelayedCacheWriter(ctx)
                if err != nil {
                    panic(err)
                }
                wg.Done()
            }()
        }

        // Start Benchmark.
        b.ResetTimer()
        writing := true
        for i := 0; i < b.N; i++ {
            testRecordID := i % sampleSize
            key := dbName + ":" + strconv.Itoa(testRecordID)

            if i > 0 && testRecordID == 0 {
                writing = !writing // switch between reading and writing every sampleSize records
            }

            if writing {
                r := NewExample(key, "A", 1)
                err = db.Put(r)
            } else {
                _, err = db.Get(key)
            }
            if err != nil {
                b.Fatal(err)
            }
        }

        // End cache writer and wait
        cancelCtx()
        wg.Wait()

    })
}

func BenchmarkCache(b *testing.B) {
    for _, storageType := range []string{"bbolt", "hashmap"} {
        benchmarkCacheWriting(b, storageType, 32, 8, false)
        benchmarkCacheWriting(b, storageType, 32, 8, true)
        benchmarkCacheWriting(b, storageType, 32, 1024, false)
        benchmarkCacheWriting(b, storageType, 32, 1024, true)
        benchmarkCacheWriting(b, storageType, 512, 1024, false)
        benchmarkCacheWriting(b, storageType, 512, 1024, true)

        benchmarkCacheReadWrite(b, storageType, 32, 8, false)
        benchmarkCacheReadWrite(b, storageType, 32, 8, true)
        benchmarkCacheReadWrite(b, storageType, 32, 1024, false)
        benchmarkCacheReadWrite(b, storageType, 32, 1024, true)
        benchmarkCacheReadWrite(b, storageType, 512, 1024, false)
        benchmarkCacheReadWrite(b, storageType, 512, 1024, true)
    }
}
@@ -2,7 +2,6 @@ package record

import (
    "errors"
    "fmt"

    "github.com/safing/portbase/container"
    "github.com/safing/portbase/database/accessor"

@@ -34,7 +33,7 @@ type Base struct {

// Key returns the key of the database record.
func (b *Base) Key() string {
    return fmt.Sprintf("%s:%s", b.dbName, b.dbKey)
    return b.dbName + ":" + b.dbKey
}

// KeyIsSet returns true if the database key is set.
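The Key change above replaces fmt.Sprintf with plain string concatenation on a hot path; Sprintf has to box its arguments into interfaces and parse the format string, while concatenation compiles to a direct string build. A throwaway micro-benchmark along these lines (not part of the commit, shown as if it sat next to Base in package record) makes the difference measurable:

package record

import (
    "fmt"
    "testing"
)

// sinkKey keeps the compiler from optimizing the result away.
var sinkKey string

func BenchmarkKeySprintf(b *testing.B) {
    dbName, dbKey := "core", "config/example"
    for i := 0; i < b.N; i++ {
        sinkKey = fmt.Sprintf("%s:%s", dbName, dbKey)
    }
}

func BenchmarkKeyConcat(b *testing.B) {
    dbName, dbKey := "core", "config/example"
    for i := 0; i < b.N; i++ {
        sinkKey = dbName + ":" + dbKey
    }
}

Run with something like: go test -bench=Key -run=^$ ./database/record/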
@@ -31,9 +31,10 @@ func (m *Meta) GetAbsoluteExpiry() int64 {
}

// GetRelativeExpiry returns the current relative expiry time - ie. seconds until expiry.
// A negative value signifies that the record does not expire.
func (m *Meta) GetRelativeExpiry() int64 {
    if m.Deleted < 0 {
        return -m.Deleted
    if m.Expires == 0 {
        return -1
    }

    abs := m.Expires - time.Now().Unix()
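GetRelativeExpiry now distinguishes "never expires" (-1 when Expires is zero) from a concrete seconds-until-expiry value, and updateCache in interface_cache.go uses exactly that value as the cache TTL. A condensed sketch of that interaction (the type and values here are illustrative, not the real Meta type):

package main

import (
    "fmt"
    "time"
)

// meta mimics the Expires handling shown above.
type meta struct {
    Expires int64 // unix timestamp, 0 = never expires
}

func (m *meta) GetRelativeExpiry() int64 {
    if m.Expires == 0 {
        return -1
    }
    return m.Expires - time.Now().Unix()
}

func main() {
    forever := &meta{}
    soon := &meta{Expires: time.Now().Add(90 * time.Second).Unix()}

    for _, m := range []*meta{forever, soon} {
        if ttl := m.GetRelativeExpiry(); ttl >= 0 {
            fmt.Println("cache with TTL:", time.Duration(ttl)*time.Second)
        } else {
            fmt.Println("cache without expiry")
        }
    }
}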
@@ -115,26 +115,9 @@ func (b *BBolt) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
        err := b.db.Batch(func(tx *bbolt.Tx) error {
            bucket := tx.Bucket(bucketName)
            for r := range batch {
                if !shadowDelete && r.Meta().IsDeleted() {
                    // Immediate delete.
                    txErr := bucket.Delete([]byte(r.DatabaseKey()))
                    if txErr != nil {
                        return txErr
                    }
                } else {
                    // Put or shadow delete.

                    // marshal
                    data, txErr := r.MarshalRecord(r)
                    if txErr != nil {
                        return txErr
                    }

                    // put
                    txErr = bucket.Put([]byte(r.DatabaseKey()), data)
                    if txErr != nil {
                        return txErr
                    }
                txErr := b.batchPutOrDelete(bucket, shadowDelete, r)
                if txErr != nil {
                    return txErr
                }
            }
            return nil

@@ -145,6 +128,25 @@ func (b *BBolt) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
    return batch, errs
}

func (b *BBolt) batchPutOrDelete(bucket *bbolt.Bucket, shadowDelete bool, r record.Record) (err error) {
    r.Lock()
    defer r.Unlock()

    if !shadowDelete && r.Meta().IsDeleted() {
        // Immediate delete.
        err = bucket.Delete([]byte(r.DatabaseKey()))
    } else {
        // Put or shadow delete.
        var data []byte
        data, err = r.MarshalRecord(r)
        if err == nil {
            err = bucket.Put([]byte(r.DatabaseKey()), data)
        }
    }

    return err
}

// Delete deletes a record from the database.
func (b *BBolt) Delete(key string) error {
    err := b.db.Update(func(tx *bbolt.Tx) error {
@@ -66,11 +66,7 @@ func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
    // start handler
    go func() {
        for r := range batch {
            if !shadowDelete && r.Meta().IsDeleted() {
                delete(hm.db, r.DatabaseKey())
            } else {
                hm.db[r.DatabaseKey()] = r
            }
            hm.batchPutOrDelete(shadowDelete, r)
        }
        errs <- nil
    }()

@@ -78,6 +74,20 @@ func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
    return batch, errs
}

func (hm *HashMap) batchPutOrDelete(shadowDelete bool, r record.Record) {
    r.Lock()
    defer r.Unlock()

    hm.dbLock.Lock()
    defer hm.dbLock.Unlock()

    if !shadowDelete && r.Meta().IsDeleted() {
        delete(hm.db, r.DatabaseKey())
    } else {
        hm.db[r.DatabaseKey()] = r
    }
}

// Delete deletes a record from the database.
func (hm *HashMap) Delete(key string) error {
    hm.dbLock.Lock()