Merge pull request #82 from safing/feature/deletemany

Make shadow deletes conditional
This commit is contained in:
Daniel 2020-09-24 15:01:16 +02:00 committed by GitHub
commit 1c765dfbb8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 127 additions and 103 deletions

View file

@ -134,7 +134,6 @@ func registerAsDatabase() error {
Name: "config", Name: "config",
Description: "Configuration Manager", Description: "Configuration Manager",
StorageType: "injected", StorageType: "injected",
PrimaryAPI: "",
}) })
if err != nil { if err != nil {
return err return err

View file

@ -17,6 +17,7 @@ import (
// A Controller takes care of all the extra database logic. // A Controller takes care of all the extra database logic.
type Controller struct { type Controller struct {
storage storage.Interface storage storage.Interface
shadowDelete bool
hooks []*RegisteredHook hooks []*RegisteredHook
subscriptions []*Subscription subscriptions []*Subscription
@ -33,9 +34,10 @@ type Controller struct {
} }
// newController creates a new controller for a storage. // newController creates a new controller for a storage.
func newController(storageInt storage.Interface) *Controller { func newController(storageInt storage.Interface, shadowDelete bool) *Controller {
return &Controller{ return &Controller{
storage: storageInt, storage: storageInt,
shadowDelete: shadowDelete,
migrating: abool.NewBool(false), migrating: abool.NewBool(false),
hibernating: abool.NewBool(false), hibernating: abool.NewBool(false),
} }
@ -122,6 +124,14 @@ func (c *Controller) Put(r record.Record) (err error) {
} }
} }
if !c.shadowDelete && r.Meta().IsDeleted() {
// Immediate delete.
err = c.storage.Delete(r.DatabaseKey())
if err != nil {
return err
}
} else {
// Put or shadow delete.
r, err = c.storage.Put(r) r, err = c.storage.Put(r)
if err != nil { if err != nil {
return err return err
@ -129,6 +139,7 @@ func (c *Controller) Put(r record.Record) (err error) {
if r == nil { if r == nil {
return errors.New("storage returned nil record after successful put operation") return errors.New("storage returned nil record after successful put operation")
} }
}
// process subscriptions // process subscriptions
for _, sub := range c.subscriptions { for _, sub := range c.subscriptions {
@ -161,7 +172,7 @@ func (c *Controller) PutMany() (chan<- record.Record, <-chan error) {
} }
if batcher, ok := c.storage.(storage.Batcher); ok { if batcher, ok := c.storage.(storage.Batcher); ok {
return batcher.PutMany() return batcher.PutMany(c.shadowDelete)
} }
errs := make(chan error, 1) errs := make(chan error, 1)
@ -236,7 +247,10 @@ func (c *Controller) Maintain(ctx context.Context) error {
return nil return nil
} }
return c.storage.Maintain(ctx) if maintainer, ok := c.storage.(storage.Maintainer); ok {
return maintainer.Maintain(ctx)
}
return nil
} }
// MaintainThorough runs the MaintainThorough method on the storage. // MaintainThorough runs the MaintainThorough method on the storage.
@ -248,7 +262,10 @@ func (c *Controller) MaintainThorough(ctx context.Context) error {
return nil return nil
} }
return c.storage.MaintainThorough(ctx) if maintainer, ok := c.storage.(storage.Maintainer); ok {
return maintainer.MaintainThorough(ctx)
}
return nil
} }
// MaintainRecordStates runs the record state lifecycle maintenance on the storage. // MaintainRecordStates runs the record state lifecycle maintenance on the storage.

View file

@ -51,7 +51,7 @@ func getController(name string) (*Controller, error) {
return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err) return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err)
} }
controller = newController(storageInt) controller = newController(storageInt, registeredDB.ShadowDelete)
controllers[name] = controller controllers[name] = controller
return controller, nil return controller, nil
} }
@ -82,7 +82,7 @@ func InjectDatabase(name string, storageInt storage.Interface) (*Controller, err
return nil, fmt.Errorf(`database not of type "injected"`) return nil, fmt.Errorf(`database not of type "injected"`)
} }
controller := newController(storageInt) controller := newController(storageInt, false)
controllers[name] = controller controllers[name] = controller
return controller, nil return controller, nil
} }

View file

@ -10,7 +10,7 @@ type Database struct {
Name string Name string
Description string Description string
StorageType string StorageType string
PrimaryAPI string ShadowDelete bool // Whether deleted records should be kept until purged.
Registered time.Time Registered time.Time
LastUpdated time.Time LastUpdated time.Time
LastLoaded time.Time LastLoaded time.Time

View file

@ -34,7 +34,7 @@ func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaint
Name: dbName, Name: dbName,
Description: fmt.Sprintf("Unit Test Database for %s", storageType), Description: fmt.Sprintf("Unit Test Database for %s", storageType),
StorageType: storageType, StorageType: storageType,
PrimaryAPI: "", ShadowDelete: true,
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View file

@ -49,8 +49,8 @@ func Register(new *Database) (*Database, error) {
registeredDB.Description = new.Description registeredDB.Description = new.Description
save = true save = true
} }
if registeredDB.PrimaryAPI != new.PrimaryAPI { if registeredDB.ShadowDelete != new.ShadowDelete {
registeredDB.PrimaryAPI = new.PrimaryAPI registeredDB.ShadowDelete = new.ShadowDelete
save = true save = true
} }
} else { } else {

View file

@ -11,6 +11,13 @@ import (
"github.com/safing/portbase/database/query" "github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record" "github.com/safing/portbase/database/record"
"github.com/safing/portbase/database/storage"
)
var (
// Compile time interface checks.
_ storage.Interface = &Badger{}
_ storage.Maintainer = &Badger{}
) )
type TestRecord struct { type TestRecord struct {
@ -117,14 +124,19 @@ func TestBadger(t *testing.T) {
} }
// maintenance // maintenance
err = db.Maintain(context.TODO()) maintainer, ok := db.(storage.Maintainer)
if ok {
err = maintainer.Maintain(context.TODO())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = db.MaintainThorough(context.TODO()) err = maintainer.MaintainThorough(context.TODO())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
} else {
t.Fatal("should implement Maintainer")
}
// shutdown // shutdown
err = db.Shutdown() err = db.Shutdown()

View file

@ -107,7 +107,7 @@ func (b *BBolt) Put(r record.Record) (record.Record, error) {
} }
// PutMany stores many records in the database. // PutMany stores many records in the database.
func (b *BBolt) PutMany() (chan<- record.Record, <-chan error) { func (b *BBolt) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
batch := make(chan record.Record, 100) batch := make(chan record.Record, 100)
errs := make(chan error, 1) errs := make(chan error, 1)
@ -115,6 +115,15 @@ func (b *BBolt) PutMany() (chan<- record.Record, <-chan error) {
err := b.db.Batch(func(tx *bbolt.Tx) error { err := b.db.Batch(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName) bucket := tx.Bucket(bucketName)
for r := range batch { for r := range batch {
if !shadowDelete && r.Meta().IsDeleted() {
// Immediate delete.
txErr := bucket.Delete([]byte(r.DatabaseKey()))
if txErr != nil {
return txErr
}
} else {
// Put or shadow delete.
// marshal // marshal
data, txErr := r.MarshalRecord(r) data, txErr := r.MarshalRecord(r)
if txErr != nil { if txErr != nil {
@ -127,6 +136,7 @@ func (b *BBolt) PutMany() (chan<- record.Record, <-chan error) {
return txErr return txErr
} }
} }
}
return nil return nil
}) })
errs <- err errs <- err
@ -235,16 +245,6 @@ func (b *BBolt) Injected() bool {
return false return false
} }
// Maintain runs a light maintenance operation on the database.
func (b *BBolt) Maintain(_ context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (b *BBolt) MaintainThorough(_ context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database. // MaintainRecordStates maintains records states in the database.
func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
now := time.Now().Unix() now := time.Now().Unix()

View file

@ -12,6 +12,13 @@ import (
"github.com/safing/portbase/database/query" "github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record" "github.com/safing/portbase/database/record"
"github.com/safing/portbase/database/storage"
)
var (
// Compile time interface checks.
_ storage.Interface = &BBolt{}
_ storage.Batcher = &BBolt{}
) )
type TestRecord struct { type TestRecord struct {
@ -146,14 +153,6 @@ func TestBBolt(t *testing.T) {
} }
// maintenance // maintenance
err = db.Maintain(context.TODO())
if err != nil {
t.Fatal(err)
}
err = db.MaintainThorough(context.TODO())
if err != nil {
t.Fatal(err)
}
err = db.MaintainRecordStates(context.TODO(), time.Now()) err = db.MaintainRecordStates(context.TODO(), time.Now())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View file

@ -255,16 +255,6 @@ func (fst *FSTree) Injected() bool {
return false return false
} }
// Maintain runs a light maintenance operation on the database.
func (fst *FSTree) Maintain(_ context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (fst *FSTree) MaintainThorough(_ context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database. // MaintainRecordStates maintains records states in the database.
func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
// TODO: implement MaintainRecordStates // TODO: implement MaintainRecordStates
@ -279,6 +269,7 @@ func (fst *FSTree) Shutdown() error {
// writeFile mirrors ioutil.WriteFile, replacing an existing file with the same // writeFile mirrors ioutil.WriteFile, replacing an existing file with the same
// name atomically. This is not atomic on Windows, but still an improvement. // name atomically. This is not atomic on Windows, but still an improvement.
// TODO: Replace with github.com/google/renamio.WriteFile as soon as it is fixed on Windows. // TODO: Replace with github.com/google/renamio.WriteFile as soon as it is fixed on Windows.
// TODO: This has become a wont-fix. Explore other options.
// This function is forked from https://github.com/google/renameio/blob/a368f9987532a68a3d676566141654a81aa8100b/writefile.go. // This function is forked from https://github.com/google/renameio/blob/a368f9987532a68a3d676566141654a81aa8100b/writefile.go.
func writeFile(filename string, data []byte, perm os.FileMode) error { func writeFile(filename string, data []byte, perm os.FileMode) error {
t, err := renameio.TempFile("", filename) t, err := renameio.TempFile("", filename)

View file

@ -0,0 +1,8 @@
package fstree
import "github.com/safing/portbase/database/storage"
var (
// Compile time interface checks.
_ storage.Interface = &FSTree{}
)

View file

@ -54,7 +54,7 @@ func (hm *HashMap) Put(r record.Record) (record.Record, error) {
} }
// PutMany stores many records in the database. // PutMany stores many records in the database.
func (hm *HashMap) PutMany() (chan<- record.Record, <-chan error) { func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
hm.dbLock.Lock() hm.dbLock.Lock()
defer hm.dbLock.Unlock() defer hm.dbLock.Unlock()
// we could lock for every record, but we want to have the same behaviour // we could lock for every record, but we want to have the same behaviour
@ -66,8 +66,12 @@ func (hm *HashMap) PutMany() (chan<- record.Record, <-chan error) {
// start handler // start handler
go func() { go func() {
for r := range batch { for r := range batch {
if !shadowDelete && r.Meta().IsDeleted() {
delete(hm.db, r.DatabaseKey())
} else {
hm.db[r.DatabaseKey()] = r hm.db[r.DatabaseKey()] = r
} }
}
errs <- nil errs <- nil
}() }()
@ -146,16 +150,6 @@ func (hm *HashMap) Injected() bool {
return false return false
} }
// Maintain runs a light maintenance operation on the database.
func (hm *HashMap) Maintain(_ context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (hm *HashMap) MaintainThorough(_ context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database. // MaintainRecordStates maintains records states in the database.
func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
hm.dbLock.Lock() hm.dbLock.Lock()

View file

@ -2,15 +2,22 @@
package hashmap package hashmap
import ( import (
"context"
"reflect" "reflect"
"sync" "sync"
"testing" "testing"
"github.com/safing/portbase/database/storage"
"github.com/safing/portbase/database/query" "github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record" "github.com/safing/portbase/database/record"
) )
var (
// Compile time interface checks.
_ storage.Interface = &HashMap{}
_ storage.Batcher = &HashMap{}
)
type TestRecord struct { type TestRecord struct {
record.Base record.Base
sync.Mutex sync.Mutex
@ -130,16 +137,6 @@ func TestHashMap(t *testing.T) {
t.Fatal("should fail") t.Fatal("should fail")
} }
// maintenance
err = db.Maintain(context.TODO())
if err != nil {
t.Fatal(err)
}
err = db.MaintainThorough(context.TODO())
if err != nil {
t.Fatal(err)
}
// shutdown // shutdown
err = db.Shutdown() err = db.Shutdown()
if err != nil { if err != nil {

View file

@ -28,7 +28,7 @@ func (i *InjectBase) Put(m record.Record) (record.Record, error) {
} }
// PutMany stores many records in the database. // PutMany stores many records in the database.
func (i *InjectBase) PutMany() (batch chan record.Record, err chan error) { func (i *InjectBase) PutMany(shadowDelete bool) (batch chan record.Record, err chan error) {
batch = make(chan record.Record) batch = make(chan record.Record)
err = make(chan error, 1) err = make(chan error, 1)
err <- errNotImplemented err <- errNotImplemented

View file

@ -11,20 +11,28 @@ import (
// Interface defines the database storage API. // Interface defines the database storage API.
type Interface interface { type Interface interface {
// Primary Interface
Get(key string) (record.Record, error) Get(key string) (record.Record, error)
Put(m record.Record) (record.Record, error) Put(m record.Record) (record.Record, error)
Delete(key string) error Delete(key string) error
Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error)
// Information and Control
ReadOnly() bool ReadOnly() bool
Injected() bool Injected() bool
Shutdown() error
// Mandatory Record Maintenance
MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error
}
// Maintainer defines the database storage API for backends that require regular maintenance.
type Maintainer interface {
Maintain(ctx context.Context) error Maintain(ctx context.Context) error
MaintainThorough(ctx context.Context) error MaintainThorough(ctx context.Context) error
MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error
Shutdown() error
} }
// Batcher defines the database storage API for backends that support batch operations. // Batcher defines the database storage API for backends that support batch operations.
type Batcher interface { type Batcher interface {
PutMany() (batch chan<- record.Record, errs <-chan error) PutMany(shadowDelete bool) (batch chan<- record.Record, errs <-chan error)
} }

View file

@ -48,7 +48,6 @@ func registerAsDatabase() error {
Name: "notifications", Name: "notifications",
Description: "Notifications", Description: "Notifications",
StorageType: "injected", StorageType: "injected",
PrimaryAPI: "",
}) })
if err != nil { if err != nil {
return err return err