From 5bb73a7b4ce6f56930dc3dabc8b790736b3c5477 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 23 Sep 2020 17:10:33 +0200 Subject: [PATCH 1/8] Make shadow deletes conditional Also, Move maintenance to separate interface. --- database/controller.go | 50 +++++++++++++++++++++++---------- database/controllers.go | 4 +-- database/database.go | 14 ++++----- database/database_test.go | 8 +++--- database/registry.go | 4 +-- database/storage/bbolt/bbolt.go | 30 +++++++++++++------- database/storage/hashmap/map.go | 8 ++++-- database/storage/injectbase.go | 2 +- database/storage/interface.go | 8 ++++-- 9 files changed, 83 insertions(+), 45 deletions(-) diff --git a/database/controller.go b/database/controller.go index 4f7ae3e..bb9926e 100644 --- a/database/controller.go +++ b/database/controller.go @@ -16,7 +16,8 @@ import ( // A Controller takes care of all the extra database logic. type Controller struct { - storage storage.Interface + storage storage.Interface + shadowDelete bool hooks []*RegisteredHook subscriptions []*Subscription @@ -33,11 +34,12 @@ type Controller struct { } // newController creates a new controller for a storage. -func newController(storageInt storage.Interface) *Controller { +func newController(storageInt storage.Interface, shadowDelete bool) *Controller { return &Controller{ - storage: storageInt, - migrating: abool.NewBool(false), - hibernating: abool.NewBool(false), + storage: storageInt, + shadowDelete: shadowDelete, + migrating: abool.NewBool(false), + hibernating: abool.NewBool(false), } } @@ -122,12 +124,21 @@ func (c *Controller) Put(r record.Record) (err error) { } } - r, err = c.storage.Put(r) - if err != nil { - return err - } - if r == nil { - return errors.New("storage returned nil record after successful put operation") + if !c.shadowDelete && r.Meta().IsDeleted() { + // Immediate delete. + err = c.storage.Delete(r.DatabaseKey()) + if err != nil { + return err + } + } else { + // Put or shadow delete. + r, err = c.storage.Put(r) + if err != nil { + return err + } + if r == nil { + return errors.New("storage returned nil record after successful put operation") + } } // process subscriptions @@ -161,7 +172,7 @@ func (c *Controller) PutMany() (chan<- record.Record, <-chan error) { } if batcher, ok := c.storage.(storage.Batcher); ok { - return batcher.PutMany() + return batcher.PutMany(c.shadowDelete) } errs := make(chan error, 1) @@ -236,7 +247,10 @@ func (c *Controller) Maintain(ctx context.Context) error { return nil } - return c.storage.Maintain(ctx) + if maintenance, ok := c.storage.(storage.Maintenance); ok { + return maintenance.Maintain(ctx) + } + return nil } // MaintainThorough runs the MaintainThorough method on the storage. @@ -248,7 +262,10 @@ func (c *Controller) MaintainThorough(ctx context.Context) error { return nil } - return c.storage.MaintainThorough(ctx) + if maintenance, ok := c.storage.(storage.Maintenance); ok { + return maintenance.MaintainThorough(ctx) + } + return nil } // MaintainRecordStates runs the record state lifecycle maintenance on the storage. @@ -260,7 +277,10 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor return nil } - return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore) + if maintenance, ok := c.storage.(storage.Maintenance); ok { + return maintenance.MaintainRecordStates(ctx, purgeDeletedBefore) + } + return nil } // Shutdown shuts down the storage. diff --git a/database/controllers.go b/database/controllers.go index ba8c7cd..9b2ac68 100644 --- a/database/controllers.go +++ b/database/controllers.go @@ -51,7 +51,7 @@ func getController(name string) (*Controller, error) { return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err) } - controller = newController(storageInt) + controller = newController(storageInt, registeredDB.ShadowDelete) controllers[name] = controller return controller, nil } @@ -82,7 +82,7 @@ func InjectDatabase(name string, storageInt storage.Interface) (*Controller, err return nil, fmt.Errorf(`database not of type "injected"`) } - controller := newController(storageInt) + controller := newController(storageInt, false) controllers[name] = controller return controller, nil } diff --git a/database/database.go b/database/database.go index e8e4504..048f35f 100644 --- a/database/database.go +++ b/database/database.go @@ -7,13 +7,13 @@ import ( // Database holds information about registered databases type Database struct { - Name string - Description string - StorageType string - PrimaryAPI string - Registered time.Time - LastUpdated time.Time - LastLoaded time.Time + Name string + Description string + StorageType string + ShadowDelete bool // Whether deleted records should be kept until purged. + Registered time.Time + LastUpdated time.Time + LastLoaded time.Time } // MigrateTo migrates the database to another storage type. diff --git a/database/database_test.go b/database/database_test.go index 5749cec..2f3b4a5 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -31,10 +31,10 @@ func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaint dbName := fmt.Sprintf("testing-%s", storageType) fmt.Println(dbName) _, err := Register(&Database{ - Name: dbName, - Description: fmt.Sprintf("Unit Test Database for %s", storageType), - StorageType: storageType, - PrimaryAPI: "", + Name: dbName, + Description: fmt.Sprintf("Unit Test Database for %s", storageType), + StorageType: storageType, + ShadowDelete: true, }) if err != nil { t.Fatal(err) diff --git a/database/registry.go b/database/registry.go index 8b65f7e..81d1de4 100644 --- a/database/registry.go +++ b/database/registry.go @@ -49,8 +49,8 @@ func Register(new *Database) (*Database, error) { registeredDB.Description = new.Description save = true } - if registeredDB.PrimaryAPI != new.PrimaryAPI { - registeredDB.PrimaryAPI = new.PrimaryAPI + if registeredDB.ShadowDelete != new.ShadowDelete { + registeredDB.ShadowDelete = new.ShadowDelete save = true } } else { diff --git a/database/storage/bbolt/bbolt.go b/database/storage/bbolt/bbolt.go index c512334..654aacb 100644 --- a/database/storage/bbolt/bbolt.go +++ b/database/storage/bbolt/bbolt.go @@ -107,7 +107,7 @@ func (b *BBolt) Put(r record.Record) (record.Record, error) { } // PutMany stores many records in the database. -func (b *BBolt) PutMany() (chan<- record.Record, <-chan error) { +func (b *BBolt) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) { batch := make(chan record.Record, 100) errs := make(chan error, 1) @@ -115,16 +115,26 @@ func (b *BBolt) PutMany() (chan<- record.Record, <-chan error) { err := b.db.Batch(func(tx *bbolt.Tx) error { bucket := tx.Bucket(bucketName) for r := range batch { - // marshal - data, txErr := r.MarshalRecord(r) - if txErr != nil { - return txErr - } + if !shadowDelete && r.Meta().IsDeleted() { + // Immediate delete. + txErr := bucket.Delete([]byte(r.DatabaseKey())) + if txErr != nil { + return txErr + } + } else { + // Put or shadow delete. - // put - txErr = bucket.Put([]byte(r.DatabaseKey()), data) - if txErr != nil { - return txErr + // marshal + data, txErr := r.MarshalRecord(r) + if txErr != nil { + return txErr + } + + // put + txErr = bucket.Put([]byte(r.DatabaseKey()), data) + if txErr != nil { + return txErr + } } } return nil diff --git a/database/storage/hashmap/map.go b/database/storage/hashmap/map.go index fb1e073..25a3fb2 100644 --- a/database/storage/hashmap/map.go +++ b/database/storage/hashmap/map.go @@ -54,7 +54,7 @@ func (hm *HashMap) Put(r record.Record) (record.Record, error) { } // PutMany stores many records in the database. -func (hm *HashMap) PutMany() (chan<- record.Record, <-chan error) { +func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) { hm.dbLock.Lock() defer hm.dbLock.Unlock() // we could lock for every record, but we want to have the same behaviour @@ -66,7 +66,11 @@ func (hm *HashMap) PutMany() (chan<- record.Record, <-chan error) { // start handler go func() { for r := range batch { - hm.db[r.DatabaseKey()] = r + if !shadowDelete && r.Meta().IsDeleted() { + delete(hm.db, r.DatabaseKey()) + } else { + hm.db[r.DatabaseKey()] = r + } } errs <- nil }() diff --git a/database/storage/injectbase.go b/database/storage/injectbase.go index 8dda71d..0f6bf25 100644 --- a/database/storage/injectbase.go +++ b/database/storage/injectbase.go @@ -28,7 +28,7 @@ func (i *InjectBase) Put(m record.Record) (record.Record, error) { } // PutMany stores many records in the database. -func (i *InjectBase) PutMany() (batch chan record.Record, err chan error) { +func (i *InjectBase) PutMany(shadowDelete bool) (batch chan record.Record, err chan error) { batch = make(chan record.Record) err = make(chan error, 1) err <- errNotImplemented diff --git a/database/storage/interface.go b/database/storage/interface.go index 4ec5232..c45c2d6 100644 --- a/database/storage/interface.go +++ b/database/storage/interface.go @@ -18,13 +18,17 @@ type Interface interface { ReadOnly() bool Injected() bool + Shutdown() error +} + +// Maintenance defines the database storage API for backends that requ +type Maintenance interface { Maintain(ctx context.Context) error MaintainThorough(ctx context.Context) error MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error - Shutdown() error } // Batcher defines the database storage API for backends that support batch operations. type Batcher interface { - PutMany() (batch chan<- record.Record, errs <-chan error) + PutMany(shadowDelete bool) (batch chan<- record.Record, errs <-chan error) } From dae54812034d2e8cbc2242c1546134d00bd14e56 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 23 Sep 2020 17:11:51 +0200 Subject: [PATCH 2/8] Improve stack printing --- run/main.go | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/run/main.go b/run/main.go index f7c92be..37c3d0a 100644 --- a/run/main.go +++ b/run/main.go @@ -38,7 +38,7 @@ func Run() int { } if printStackOnExit { - printStackTo(os.Stdout) + printStackTo(os.Stdout, "PRINTING STACK ON EXIT (STARTUP ERROR)") } _ = modules.Shutdown() @@ -67,7 +67,7 @@ signalLoop: case sig := <-signalCh: // only print and continue to wait if SIGUSR1 if sig == sigUSR1 { - _ = pprof.Lookup("goroutine").WriteTo(os.Stderr, 1) + printStackTo(os.Stderr, "PRINTING STACK ON REQUEST") continue signalLoop } @@ -83,21 +83,19 @@ signalLoop: if forceCnt > 0 { fmt.Printf(" again, but already shutting down. %d more to force.\n", forceCnt) } else { - fmt.Fprintln(os.Stderr, "===== FORCED EXIT =====") - printStackTo(os.Stderr) + printStackTo(os.Stderr, "PRINTING STACK ON FORCED EXIT") os.Exit(1) } } }() if printStackOnExit { - printStackTo(os.Stdout) + printStackTo(os.Stdout, "PRINTING STACK ON EXIT") } go func() { time.Sleep(3 * time.Minute) - fmt.Fprintln(os.Stderr, "===== TAKING TOO LONG FOR SHUTDOWN =====") - printStackTo(os.Stderr) + printStackTo(os.Stderr, "PRINTING STACK - TAKING TOO LONG FOR SHUTDOWN") os.Exit(1) }() @@ -131,13 +129,12 @@ func inputSignals(signalCh chan os.Signal) { } } -func printStackTo(writer io.Writer) { - fmt.Fprintln(writer, "=== PRINTING TRACES ===") - fmt.Fprintln(writer, "=== GOROUTINES ===") - _ = pprof.Lookup("goroutine").WriteTo(writer, 1) - fmt.Fprintln(writer, "=== BLOCKING ===") - _ = pprof.Lookup("block").WriteTo(writer, 1) - fmt.Fprintln(writer, "=== MUTEXES ===") - _ = pprof.Lookup("mutex").WriteTo(writer, 1) - fmt.Fprintln(writer, "=== END TRACES ===") +func printStackTo(writer io.Writer, msg string) { + _, err := fmt.Fprintln(writer, fmt.Sprintf("===== %s =====", msg)) + if err == nil { + err = pprof.Lookup("goroutine").WriteTo(writer, 1) + } + if err != nil { + log.Errorf("main: failed to write stack trace: %s", err) + } } From c4e24102f6247425bdbbcd4360d685e3f1ba96a0 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 24 Sep 2020 08:57:42 +0200 Subject: [PATCH 3/8] Fix linter error --- run/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/run/main.go b/run/main.go index 37c3d0a..92f6828 100644 --- a/run/main.go +++ b/run/main.go @@ -130,7 +130,7 @@ func inputSignals(signalCh chan os.Signal) { } func printStackTo(writer io.Writer, msg string) { - _, err := fmt.Fprintln(writer, fmt.Sprintf("===== %s =====", msg)) + _, err := fmt.Fprintf(writer, "===== %s =====\n", msg) if err == nil { err = pprof.Lookup("goroutine").WriteTo(writer, 1) } From 362539692e510b82ea24c6eec73dd717d5ee5409 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 24 Sep 2020 09:50:38 +0200 Subject: [PATCH 4/8] Improve interfaces and fix more linter errors --- config/database.go | 1 - database/controller.go | 13 +++++-------- database/storage/badger/badger_test.go | 26 +++++++++++++++++++------- database/storage/bbolt/bbolt.go | 10 ---------- database/storage/bbolt/bbolt_test.go | 15 +++++++-------- database/storage/fstree/fstree.go | 11 +---------- database/storage/fstree/fstree_test.go | 8 ++++++++ database/storage/hashmap/map.go | 10 ---------- database/storage/hashmap/map_test.go | 19 ++++++++----------- database/storage/interface.go | 10 +++++++--- notifications/database.go | 1 - 11 files changed, 55 insertions(+), 69 deletions(-) create mode 100644 database/storage/fstree/fstree_test.go diff --git a/config/database.go b/config/database.go index 109e575..a6419b2 100644 --- a/config/database.go +++ b/config/database.go @@ -134,7 +134,6 @@ func registerAsDatabase() error { Name: "config", Description: "Configuration Manager", StorageType: "injected", - PrimaryAPI: "", }) if err != nil { return err diff --git a/database/controller.go b/database/controller.go index bb9926e..fd6ddbc 100644 --- a/database/controller.go +++ b/database/controller.go @@ -247,8 +247,8 @@ func (c *Controller) Maintain(ctx context.Context) error { return nil } - if maintenance, ok := c.storage.(storage.Maintenance); ok { - return maintenance.Maintain(ctx) + if maintainer, ok := c.storage.(storage.Maintainer); ok { + return maintainer.Maintain(ctx) } return nil } @@ -262,8 +262,8 @@ func (c *Controller) MaintainThorough(ctx context.Context) error { return nil } - if maintenance, ok := c.storage.(storage.Maintenance); ok { - return maintenance.MaintainThorough(ctx) + if maintainer, ok := c.storage.(storage.Maintainer); ok { + return maintainer.MaintainThorough(ctx) } return nil } @@ -277,10 +277,7 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor return nil } - if maintenance, ok := c.storage.(storage.Maintenance); ok { - return maintenance.MaintainRecordStates(ctx, purgeDeletedBefore) - } - return nil + return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore) } // Shutdown shuts down the storage. diff --git a/database/storage/badger/badger_test.go b/database/storage/badger/badger_test.go index 6ddb7db..934f664 100644 --- a/database/storage/badger/badger_test.go +++ b/database/storage/badger/badger_test.go @@ -11,6 +11,13 @@ import ( "github.com/safing/portbase/database/query" "github.com/safing/portbase/database/record" + "github.com/safing/portbase/database/storage" +) + +var ( + // Compile time interface checks. + _ storage.Interface = &Badger{} + _ storage.Maintainer = &Badger{} ) type TestRecord struct { @@ -117,13 +124,18 @@ func TestBadger(t *testing.T) { } // maintenance - err = db.Maintain(context.TODO()) - if err != nil { - t.Fatal(err) - } - err = db.MaintainThorough(context.TODO()) - if err != nil { - t.Fatal(err) + maintainer, ok := db.(storage.Maintainer) + if ok { + err = maintainer.Maintain(context.TODO()) + if err != nil { + t.Fatal(err) + } + err = maintainer.MaintainThorough(context.TODO()) + if err != nil { + t.Fatal(err) + } + } else { + t.Fatal("should implement Maintainer") } // shutdown diff --git a/database/storage/bbolt/bbolt.go b/database/storage/bbolt/bbolt.go index 654aacb..06e4d8f 100644 --- a/database/storage/bbolt/bbolt.go +++ b/database/storage/bbolt/bbolt.go @@ -245,16 +245,6 @@ func (b *BBolt) Injected() bool { return false } -// Maintain runs a light maintenance operation on the database. -func (b *BBolt) Maintain(_ context.Context) error { - return nil -} - -// MaintainThorough runs a thorough maintenance operation on the database. -func (b *BBolt) MaintainThorough(_ context.Context) error { - return nil -} - // MaintainRecordStates maintains records states in the database. func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { now := time.Now().Unix() diff --git a/database/storage/bbolt/bbolt_test.go b/database/storage/bbolt/bbolt_test.go index 456cd39..783c67f 100644 --- a/database/storage/bbolt/bbolt_test.go +++ b/database/storage/bbolt/bbolt_test.go @@ -12,6 +12,13 @@ import ( "github.com/safing/portbase/database/query" "github.com/safing/portbase/database/record" + "github.com/safing/portbase/database/storage" +) + +var ( + // Compile time interface checks. + _ storage.Interface = &BBolt{} + _ storage.Batcher = &BBolt{} ) type TestRecord struct { @@ -146,14 +153,6 @@ func TestBBolt(t *testing.T) { } // maintenance - err = db.Maintain(context.TODO()) - if err != nil { - t.Fatal(err) - } - err = db.MaintainThorough(context.TODO()) - if err != nil { - t.Fatal(err) - } err = db.MaintainRecordStates(context.TODO(), time.Now()) if err != nil { t.Fatal(err) diff --git a/database/storage/fstree/fstree.go b/database/storage/fstree/fstree.go index 229eed8..5970864 100644 --- a/database/storage/fstree/fstree.go +++ b/database/storage/fstree/fstree.go @@ -255,16 +255,6 @@ func (fst *FSTree) Injected() bool { return false } -// Maintain runs a light maintenance operation on the database. -func (fst *FSTree) Maintain(_ context.Context) error { - return nil -} - -// MaintainThorough runs a thorough maintenance operation on the database. -func (fst *FSTree) MaintainThorough(_ context.Context) error { - return nil -} - // MaintainRecordStates maintains records states in the database. func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { // TODO: implement MaintainRecordStates @@ -279,6 +269,7 @@ func (fst *FSTree) Shutdown() error { // writeFile mirrors ioutil.WriteFile, replacing an existing file with the same // name atomically. This is not atomic on Windows, but still an improvement. // TODO: Replace with github.com/google/renamio.WriteFile as soon as it is fixed on Windows. +// TODO: This has become a wont-fix. Explore other options. // This function is forked from https://github.com/google/renameio/blob/a368f9987532a68a3d676566141654a81aa8100b/writefile.go. func writeFile(filename string, data []byte, perm os.FileMode) error { t, err := renameio.TempFile("", filename) diff --git a/database/storage/fstree/fstree_test.go b/database/storage/fstree/fstree_test.go new file mode 100644 index 0000000..88af845 --- /dev/null +++ b/database/storage/fstree/fstree_test.go @@ -0,0 +1,8 @@ +package fstree + +import "github.com/safing/portbase/database/storage" + +var ( + // Compile time interface checks. + _ storage.Interface = &FSTree{} +) diff --git a/database/storage/hashmap/map.go b/database/storage/hashmap/map.go index 25a3fb2..4eec7bd 100644 --- a/database/storage/hashmap/map.go +++ b/database/storage/hashmap/map.go @@ -150,16 +150,6 @@ func (hm *HashMap) Injected() bool { return false } -// Maintain runs a light maintenance operation on the database. -func (hm *HashMap) Maintain(_ context.Context) error { - return nil -} - -// MaintainThorough runs a thorough maintenance operation on the database. -func (hm *HashMap) MaintainThorough(_ context.Context) error { - return nil -} - // MaintainRecordStates maintains records states in the database. func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { hm.dbLock.Lock() diff --git a/database/storage/hashmap/map_test.go b/database/storage/hashmap/map_test.go index 36f279b..31df9e1 100644 --- a/database/storage/hashmap/map_test.go +++ b/database/storage/hashmap/map_test.go @@ -2,15 +2,22 @@ package hashmap import ( - "context" "reflect" "sync" "testing" + "github.com/safing/portbase/database/storage" + "github.com/safing/portbase/database/query" "github.com/safing/portbase/database/record" ) +var ( + // Compile time interface checks. + _ storage.Interface = &HashMap{} + _ storage.Batcher = &HashMap{} +) + type TestRecord struct { record.Base sync.Mutex @@ -130,16 +137,6 @@ func TestHashMap(t *testing.T) { t.Fatal("should fail") } - // maintenance - err = db.Maintain(context.TODO()) - if err != nil { - t.Fatal(err) - } - err = db.MaintainThorough(context.TODO()) - if err != nil { - t.Fatal(err) - } - // shutdown err = db.Shutdown() if err != nil { diff --git a/database/storage/interface.go b/database/storage/interface.go index c45c2d6..432ffd3 100644 --- a/database/storage/interface.go +++ b/database/storage/interface.go @@ -11,21 +11,25 @@ import ( // Interface defines the database storage API. type Interface interface { + // Primary Interface Get(key string) (record.Record, error) Put(m record.Record) (record.Record, error) Delete(key string) error Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) + // Information and Control ReadOnly() bool Injected() bool Shutdown() error + + // Mandatory Record Maintenance + MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error } -// Maintenance defines the database storage API for backends that requ -type Maintenance interface { +// Maintainer defines the database storage API for backends that require regular maintenance. +type Maintainer interface { Maintain(ctx context.Context) error MaintainThorough(ctx context.Context) error - MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error } // Batcher defines the database storage API for backends that support batch operations. diff --git a/notifications/database.go b/notifications/database.go index 58b455c..d73f4ca 100644 --- a/notifications/database.go +++ b/notifications/database.go @@ -48,7 +48,6 @@ func registerAsDatabase() error { Name: "notifications", Description: "Notifications", StorageType: "injected", - PrimaryAPI: "", }) if err != nil { return err From 82af9862248d329cf298747c7aa18985e650735a Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 24 Sep 2020 15:02:51 +0200 Subject: [PATCH 5/8] Add Purge method/interface to database system Also, implement Purger interface in bbolt storage. --- database/controller.go | 20 ++++++- database/interface.go | 17 ++++++ database/storage/bbolt/bbolt.go | 87 ++++++++++++++++++++++++++++ database/storage/bbolt/bbolt_test.go | 32 ++++++++++ database/storage/interface.go | 5 ++ 5 files changed, 158 insertions(+), 3 deletions(-) diff --git a/database/controller.go b/database/controller.go index fd6ddbc..902d7fc 100644 --- a/database/controller.go +++ b/database/controller.go @@ -244,7 +244,7 @@ func (c *Controller) Maintain(ctx context.Context) error { defer c.writeLock.RUnlock() if shuttingDown.IsSet() { - return nil + return ErrShuttingDown } if maintainer, ok := c.storage.(storage.Maintainer); ok { @@ -259,7 +259,7 @@ func (c *Controller) MaintainThorough(ctx context.Context) error { defer c.writeLock.RUnlock() if shuttingDown.IsSet() { - return nil + return ErrShuttingDown } if maintainer, ok := c.storage.(storage.Maintainer); ok { @@ -274,12 +274,26 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor defer c.writeLock.RUnlock() if shuttingDown.IsSet() { - return nil + return ErrShuttingDown } return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore) } +func (c *Controller) Purge(ctx context.Context, q *query.Query, local, internal bool) (int, error) { + c.writeLock.RLock() + defer c.writeLock.RUnlock() + + if shuttingDown.IsSet() { + return 0, ErrShuttingDown + } + + if purger, ok := c.storage.(storage.Purger); ok { + return purger.Purge(ctx, q, local, internal, c.shadowDelete) + } + return 0, ErrNotImplemented +} + // Shutdown shuts down the storage. func (c *Controller) Shutdown() error { // acquire full locks diff --git a/database/interface.go b/database/interface.go index 7e15767..02bae88 100644 --- a/database/interface.go +++ b/database/interface.go @@ -1,6 +1,7 @@ package database import ( + "context" "errors" "fmt" "time" @@ -400,6 +401,22 @@ func (i *Interface) Query(q *query.Query) (*iterator.Iterator, error) { return db.Query(q, i.options.Local, i.options.Internal) } +// Purge deletes all records that match the given query. It returns the number +// of successful deletes and an error. +func (i *Interface) Purge(ctx context.Context, q *query.Query) (int, error) { + _, err := q.Check() + if err != nil { + return 0, err + } + + db, err := getController(q.DatabaseName()) + if err != nil { + return 0, err + } + + return db.Purge(ctx, q, i.options.Local, i.options.Internal) +} + // Subscribe subscribes to updates matching the given query. func (i *Interface) Subscribe(q *query.Query) (*Subscription, error) { _, err := q.Check() diff --git a/database/storage/bbolt/bbolt.go b/database/storage/bbolt/bbolt.go index 06e4d8f..ca88864 100644 --- a/database/storage/bbolt/bbolt.go +++ b/database/storage/bbolt/bbolt.go @@ -297,6 +297,93 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim }) } +// Purge deletes all records that match the given query. It returns the number of successful deletes and an error. +func (b *BBolt) Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error) { + prefix := []byte(q.DatabaseKeyPrefix()) + + var cnt int + var done bool + for !done { + err := b.db.Update(func(tx *bbolt.Tx) error { + // Create a cursor for iteration. + bucket := tx.Bucket(bucketName) + c := bucket.Cursor() + for key, value := c.Seek(prefix); key != nil; key, value = c.Next() { + // Check if context has been cancelled. + select { + case <-ctx.Done(): + done = true + return nil + default: + } + + // Check if we still match the key prefix, if not, exit. + if !bytes.HasPrefix(key, prefix) { + done = true + return nil + } + + // Wrap the value in a new wrapper to access the metadata. + wrapper, err := record.NewRawWrapper(b.name, string(key), value) + if err != nil { + return err + } + + // Check if we have permission for this record. + if !wrapper.Meta().CheckPermission(local, internal) { + continue + } + + // Check if record is already deleted. + if wrapper.Meta().IsDeleted() { + continue + } + + // Check if the query matches this record. + if !q.MatchesRecord(wrapper) { + continue + } + + // Delete record. + if shadowDelete { + // Shadow delete. + wrapper.Meta().Delete() + deleted, err := wrapper.MarshalRecord(wrapper) + if err != nil { + return err + } + err = bucket.Put(key, deleted) + if err != nil { + return err + } + + // Reposition the cursor after we have edited the bucket. + c.Seek(key) + } else { + // Immediate delete. + err = c.Delete() + if err != nil { + return err + } + } + + // Work in batches of 1000 changes in order to enable other operations in between. + cnt++ + if cnt%1000 == 0 { + return nil + } + } + done = true + return nil + }) + if err != nil { + return cnt, err + } + } + + return cnt, nil +} + // Shutdown shuts down the database. func (b *BBolt) Shutdown() error { return b.db.Close() diff --git a/database/storage/bbolt/bbolt_test.go b/database/storage/bbolt/bbolt_test.go index 783c67f..7897362 100644 --- a/database/storage/bbolt/bbolt_test.go +++ b/database/storage/bbolt/bbolt_test.go @@ -19,6 +19,7 @@ var ( // Compile time interface checks. _ storage.Interface = &BBolt{} _ storage.Batcher = &BBolt{} + _ storage.Purger = &BBolt{} ) type TestRecord struct { @@ -158,6 +159,37 @@ func TestBBolt(t *testing.T) { t.Fatal(err) } + // purging + purger, ok := db.(storage.Purger) + if ok { + n, err := purger.Purge(context.TODO(), query.New("test:path/to/").MustBeValid(), true, true, false) + if err != nil { + t.Fatal(err) + } + if n != 3 { + t.Fatalf("unexpected purge delete count: %d", n) + } + } else { + t.Fatal("should implement Purger") + } + + // test query + q = query.New("test").MustBeValid() + it, err = db.Query(q, true, true) + if err != nil { + t.Fatal(err) + } + cnt = 0 + for range it.Next { + cnt++ + } + if it.Err() != nil { + t.Fatal(it.Err()) + } + if cnt != 1 { + t.Fatalf("unexpected query result count: %d", cnt) + } + // shutdown err = db.Shutdown() if err != nil { diff --git a/database/storage/interface.go b/database/storage/interface.go index 432ffd3..39cffd6 100644 --- a/database/storage/interface.go +++ b/database/storage/interface.go @@ -36,3 +36,8 @@ type Maintainer interface { type Batcher interface { PutMany(shadowDelete bool) (batch chan<- record.Record, errs <-chan error) } + +// Purger defines the database storage API for backends that support the purge operation. +type Purger interface { + Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error) +} From da57ca57924ded38f4c7231a3dba667548dfaca6 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 24 Sep 2020 16:38:55 +0200 Subject: [PATCH 6/8] Fix linter errors --- database/controller.go | 1 + database/storage/bbolt/bbolt.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/database/controller.go b/database/controller.go index 902d7fc..415e22d 100644 --- a/database/controller.go +++ b/database/controller.go @@ -280,6 +280,7 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore) } +// Purge deletes all records that match the given query. It returns the number of successful deletes and an error. func (c *Controller) Purge(ctx context.Context, q *query.Query, local, internal bool) (int, error) { c.writeLock.RLock() defer c.writeLock.RUnlock() diff --git a/database/storage/bbolt/bbolt.go b/database/storage/bbolt/bbolt.go index ca88864..34a281e 100644 --- a/database/storage/bbolt/bbolt.go +++ b/database/storage/bbolt/bbolt.go @@ -298,7 +298,7 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim } // Purge deletes all records that match the given query. It returns the number of successful deletes and an error. -func (b *BBolt) Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error) { +func (b *BBolt) Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error) { //nolint:gocognit prefix := []byte(q.DatabaseKeyPrefix()) var cnt int From bd90e5a5dff42f7fbcda66ed255c9962b02df093 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 24 Sep 2020 20:59:16 +0200 Subject: [PATCH 7/8] Add shadowDelete support to record maintenance --- database/controller.go | 2 +- database/database_test.go | 23 +++++++----- database/storage/badger/badger.go | 2 +- database/storage/bbolt/bbolt.go | 53 +++++++++++++++------------ database/storage/bbolt/bbolt_test.go | 8 +++- database/storage/fstree/fstree.go | 2 +- database/storage/hashmap/map.go | 33 ++++++++++------- database/storage/injectbase.go | 23 ++---------- database/storage/interface.go | 2 +- database/storage/sinkhole/sinkhole.go | 11 +++++- 10 files changed, 87 insertions(+), 72 deletions(-) diff --git a/database/controller.go b/database/controller.go index 415e22d..703ed49 100644 --- a/database/controller.go +++ b/database/controller.go @@ -277,7 +277,7 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor return ErrShuttingDown } - return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore) + return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore, c.shadowDelete) } // Purge deletes all records that match the given query. It returns the number of successful deletes and an error. diff --git a/database/database_test.go b/database/database_test.go index 2f3b4a5..790bd7f 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -26,15 +26,15 @@ func makeKey(dbName, key string) string { return fmt.Sprintf("%s:%s", dbName, key) } -func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaintenance bool) { //nolint:gocognit,gocyclo - t.Run(fmt.Sprintf("TestStorage_%s", storageType), func(t *testing.T) { - dbName := fmt.Sprintf("testing-%s", storageType) +func testDatabase(t *testing.T, storageType string, shadowDelete bool) { //nolint:gocognit,gocyclo + t.Run(fmt.Sprintf("TestStorage_%s_%v", storageType, shadowDelete), func(t *testing.T) { + dbName := fmt.Sprintf("testing-%s-%v", storageType, shadowDelete) fmt.Println(dbName) _, err := Register(&Database{ Name: dbName, Description: fmt.Sprintf("Unit Test Database for %s", storageType), StorageType: storageType, - ShadowDelete: true, + ShadowDelete: shadowDelete, }) if err != nil { t.Fatal(err) @@ -107,7 +107,7 @@ func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaint } // test putmany - if testPutMany { + if _, ok := dbController.storage.(storage.Batcher); ok { batchPut := db.PutMany(dbName) records := []record.Record{A, B, C, nil} // nil is to signify finish for _, r := range records { @@ -119,7 +119,7 @@ func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaint } // test maintenance - if testRecordMaintenance { + if _, ok := dbController.storage.(storage.Maintainer); ok { now := time.Now().UTC() nowUnix := now.Unix() @@ -238,10 +238,13 @@ func TestDatabaseSystem(t *testing.T) { } defer os.RemoveAll(testDir) // clean up - testDatabase(t, "bbolt", true, true) - testDatabase(t, "hashmap", true, true) - testDatabase(t, "fstree", false, false) - testDatabase(t, "badger", false, false) + for _, shadowDelete := range []bool{false, true} { + testDatabase(t, "bbolt", shadowDelete) + testDatabase(t, "hashmap", shadowDelete) + testDatabase(t, "fstree", shadowDelete) + // testDatabase(t, "badger", shadowDelete) + // TODO: Fix badger tests + } err = MaintainRecordStates(context.TODO()) if err != nil { diff --git a/database/storage/badger/badger.go b/database/storage/badger/badger.go index 7df3155..dd0487c 100644 --- a/database/storage/badger/badger.go +++ b/database/storage/badger/badger.go @@ -208,7 +208,7 @@ func (b *Badger) MaintainThorough(_ context.Context) (err error) { } // MaintainRecordStates maintains records states in the database. -func (b *Badger) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (b *Badger) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { // TODO: implement MaintainRecordStates return nil } diff --git a/database/storage/bbolt/bbolt.go b/database/storage/bbolt/bbolt.go index 34a281e..3212f36 100644 --- a/database/storage/bbolt/bbolt.go +++ b/database/storage/bbolt/bbolt.go @@ -246,7 +246,7 @@ func (b *BBolt) Injected() bool { } // MaintainRecordStates maintains records states in the database. -func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { //nolint:gocognit now := time.Now().Unix() purgeThreshold := purgeDeletedBefore.Unix() @@ -255,6 +255,13 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim // Create a cursor for iteration. c := bucket.Cursor() for key, value := c.First(); key != nil; key, value = c.Next() { + // check if context is cancelled + select { + case <-ctx.Done(): + return nil + default: + } + // wrap value wrapper, err := record.NewRawWrapper(b.name, string(key), value) if err != nil { @@ -264,33 +271,33 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim // check if we need to do maintenance meta := wrapper.Meta() switch { - case meta.Deleted > 0 && meta.Deleted < purgeThreshold: + case meta.Deleted == 0 && meta.Expires > 0 && meta.Expires < now: + if shadowDelete { + // mark as deleted + meta.Deleted = meta.Expires + deleted, err := wrapper.MarshalRecord(wrapper) + if err != nil { + return err + } + err = bucket.Put(key, deleted) + if err != nil { + return err + } + + // reposition cursor + c.Seek(key) + + continue + } + + // Immediately delete expired entries if shadowDelete is disabled. + fallthrough + case meta.Deleted > 0 && (!shadowDelete || meta.Deleted < purgeThreshold): // delete from storage err = c.Delete() if err != nil { return err } - case meta.Expires > 0 && meta.Expires < now: - // mark as deleted - meta.Deleted = meta.Expires - deleted, err := wrapper.MarshalRecord(wrapper) - if err != nil { - return err - } - err = bucket.Put(key, deleted) - if err != nil { - return err - } - - // reposition cursor - c.Seek(key) - } - - // check if context is cancelled - select { - case <-ctx.Done(): - return nil - default: } } return nil diff --git a/database/storage/bbolt/bbolt_test.go b/database/storage/bbolt/bbolt_test.go index 7897362..03fa3f0 100644 --- a/database/storage/bbolt/bbolt_test.go +++ b/database/storage/bbolt/bbolt_test.go @@ -154,7 +154,13 @@ func TestBBolt(t *testing.T) { } // maintenance - err = db.MaintainRecordStates(context.TODO(), time.Now()) + err = db.MaintainRecordStates(context.TODO(), time.Now(), true) + if err != nil { + t.Fatal(err) + } + + // maintenance + err = db.MaintainRecordStates(context.TODO(), time.Now(), false) if err != nil { t.Fatal(err) } diff --git a/database/storage/fstree/fstree.go b/database/storage/fstree/fstree.go index 5970864..a96f914 100644 --- a/database/storage/fstree/fstree.go +++ b/database/storage/fstree/fstree.go @@ -256,7 +256,7 @@ func (fst *FSTree) Injected() bool { } // MaintainRecordStates maintains records states in the database. -func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { // TODO: implement MaintainRecordStates return nil } diff --git a/database/storage/hashmap/map.go b/database/storage/hashmap/map.go index 4eec7bd..4005f07 100644 --- a/database/storage/hashmap/map.go +++ b/database/storage/hashmap/map.go @@ -151,7 +151,7 @@ func (hm *HashMap) Injected() bool { } // MaintainRecordStates maintains records states in the database. -func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { hm.dbLock.Lock() defer hm.dbLock.Unlock() @@ -159,24 +159,31 @@ func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore purgeThreshold := purgeDeletedBefore.Unix() for key, record := range hm.db { - meta := record.Meta() - switch { - case meta.Deleted > 0 && meta.Deleted < purgeThreshold: - // delete from storage - delete(hm.db, key) - case meta.Expires > 0 && meta.Expires < now: - // mark as deleted - record.Lock() - meta.Deleted = meta.Expires - record.Unlock() - } - // check if context is cancelled select { case <-ctx.Done(): return nil default: } + + meta := record.Meta() + switch { + case meta.Deleted == 0 && meta.Expires > 0 && meta.Expires < now: + if shadowDelete { + // mark as deleted + record.Lock() + meta.Deleted = meta.Expires + record.Unlock() + + continue + } + + // Immediately delete expired entries if shadowDelete is disabled. + fallthrough + case meta.Deleted > 0 && (!shadowDelete || meta.Deleted < purgeThreshold): + // delete from storage + delete(hm.db, key) + } } return nil diff --git a/database/storage/injectbase.go b/database/storage/injectbase.go index 0f6bf25..467c1ba 100644 --- a/database/storage/injectbase.go +++ b/database/storage/injectbase.go @@ -17,6 +17,9 @@ var ( // InjectBase is a dummy base structure to reduce boilerplate code for injected storage interfaces. type InjectBase struct{} +// Compile time interface check +var _ Interface = &InjectBase{} + // Get returns a database record. func (i *InjectBase) Get(key string) (record.Record, error) { return nil, errNotImplemented @@ -27,14 +30,6 @@ func (i *InjectBase) Put(m record.Record) (record.Record, error) { return nil, errNotImplemented } -// PutMany stores many records in the database. -func (i *InjectBase) PutMany(shadowDelete bool) (batch chan record.Record, err chan error) { - batch = make(chan record.Record) - err = make(chan error, 1) - err <- errNotImplemented - return -} - // Delete deletes a record from the database. func (i *InjectBase) Delete(key string) error { return errNotImplemented @@ -55,18 +50,8 @@ func (i *InjectBase) Injected() bool { return true } -// Maintain runs a light maintenance operation on the database. -func (i *InjectBase) Maintain(ctx context.Context) error { - return nil -} - -// MaintainThorough runs a thorough maintenance operation on the database. -func (i *InjectBase) MaintainThorough(ctx context.Context) error { - return nil -} - // MaintainRecordStates maintains records states in the database. -func (i *InjectBase) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (i *InjectBase) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { return nil } diff --git a/database/storage/interface.go b/database/storage/interface.go index 39cffd6..b1cfcb1 100644 --- a/database/storage/interface.go +++ b/database/storage/interface.go @@ -23,7 +23,7 @@ type Interface interface { Shutdown() error // Mandatory Record Maintenance - MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error + MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error } // Maintainer defines the database storage API for backends that require regular maintenance. diff --git a/database/storage/sinkhole/sinkhole.go b/database/storage/sinkhole/sinkhole.go index b7e4627..d033638 100644 --- a/database/storage/sinkhole/sinkhole.go +++ b/database/storage/sinkhole/sinkhole.go @@ -16,6 +16,13 @@ type Sinkhole struct { name string } +var ( + // Compile time interface check + _ storage.Interface = &Sinkhole{} + _ storage.Maintainer = &Sinkhole{} + _ storage.Batcher = &Sinkhole{} +) + func init() { _ = storage.Register("sinkhole", NewSinkhole) } @@ -43,7 +50,7 @@ func (s *Sinkhole) Put(r record.Record) (record.Record, error) { } // PutMany stores many records in the database. -func (s *Sinkhole) PutMany() (chan<- record.Record, <-chan error) { +func (s *Sinkhole) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) { batch := make(chan record.Record, 100) errs := make(chan error, 1) @@ -89,7 +96,7 @@ func (s *Sinkhole) MaintainThorough(ctx context.Context) error { } // MaintainRecordStates maintains records states in the database. -func (s *Sinkhole) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (s *Sinkhole) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { return nil } From 74dd5eea37020241184038b22b39e724df6ce637 Mon Sep 17 00:00:00 2001 From: Daniel Date: Fri, 25 Sep 2020 09:11:19 +0200 Subject: [PATCH 8/8] Improve docs for confusing and contradicting case --- database/storage/bbolt/bbolt.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/database/storage/bbolt/bbolt.go b/database/storage/bbolt/bbolt.go index 3212f36..365888c 100644 --- a/database/storage/bbolt/bbolt.go +++ b/database/storage/bbolt/bbolt.go @@ -284,7 +284,11 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim return err } - // reposition cursor + // Cursor repositioning is required after modifying data. + // While the documentation states that this is also required after a + // delete, this actually makes the cursor skip a record with the + // following c.Next() call of the loop. + // Docs/Issue: https://github.com/boltdb/bolt/issues/426#issuecomment-141982984 c.Seek(key) continue @@ -364,8 +368,13 @@ func (b *BBolt) Purge(ctx context.Context, q *query.Query, local, internal, shad return err } - // Reposition the cursor after we have edited the bucket. + // Cursor repositioning is required after modifying data. + // While the documentation states that this is also required after a + // delete, this actually makes the cursor skip a record with the + // following c.Next() call of the loop. + // Docs/Issue: https://github.com/boltdb/bolt/issues/426#issuecomment-141982984 c.Seek(key) + } else { // Immediate delete. err = c.Delete()