diff --git a/database/controller.go b/database/controller.go index 415e22d..703ed49 100644 --- a/database/controller.go +++ b/database/controller.go @@ -277,7 +277,7 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor return ErrShuttingDown } - return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore) + return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore, c.shadowDelete) } // Purge deletes all records that match the given query. It returns the number of successful deletes and an error. diff --git a/database/database_test.go b/database/database_test.go index 2f3b4a5..790bd7f 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -26,15 +26,15 @@ func makeKey(dbName, key string) string { return fmt.Sprintf("%s:%s", dbName, key) } -func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaintenance bool) { //nolint:gocognit,gocyclo - t.Run(fmt.Sprintf("TestStorage_%s", storageType), func(t *testing.T) { - dbName := fmt.Sprintf("testing-%s", storageType) +func testDatabase(t *testing.T, storageType string, shadowDelete bool) { //nolint:gocognit,gocyclo + t.Run(fmt.Sprintf("TestStorage_%s_%v", storageType, shadowDelete), func(t *testing.T) { + dbName := fmt.Sprintf("testing-%s-%v", storageType, shadowDelete) fmt.Println(dbName) _, err := Register(&Database{ Name: dbName, Description: fmt.Sprintf("Unit Test Database for %s", storageType), StorageType: storageType, - ShadowDelete: true, + ShadowDelete: shadowDelete, }) if err != nil { t.Fatal(err) @@ -107,7 +107,7 @@ func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaint } // test putmany - if testPutMany { + if _, ok := dbController.storage.(storage.Batcher); ok { batchPut := db.PutMany(dbName) records := []record.Record{A, B, C, nil} // nil is to signify finish for _, r := range records { @@ -119,7 +119,7 @@ func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaint } // test maintenance - if testRecordMaintenance { + if _, ok := dbController.storage.(storage.Maintainer); ok { now := time.Now().UTC() nowUnix := now.Unix() @@ -238,10 +238,13 @@ func TestDatabaseSystem(t *testing.T) { } defer os.RemoveAll(testDir) // clean up - testDatabase(t, "bbolt", true, true) - testDatabase(t, "hashmap", true, true) - testDatabase(t, "fstree", false, false) - testDatabase(t, "badger", false, false) + for _, shadowDelete := range []bool{false, true} { + testDatabase(t, "bbolt", shadowDelete) + testDatabase(t, "hashmap", shadowDelete) + testDatabase(t, "fstree", shadowDelete) + // testDatabase(t, "badger", shadowDelete) + // TODO: Fix badger tests + } err = MaintainRecordStates(context.TODO()) if err != nil { diff --git a/database/storage/badger/badger.go b/database/storage/badger/badger.go index 7df3155..dd0487c 100644 --- a/database/storage/badger/badger.go +++ b/database/storage/badger/badger.go @@ -208,7 +208,7 @@ func (b *Badger) MaintainThorough(_ context.Context) (err error) { } // MaintainRecordStates maintains records states in the database. -func (b *Badger) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (b *Badger) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { // TODO: implement MaintainRecordStates return nil } diff --git a/database/storage/bbolt/bbolt.go b/database/storage/bbolt/bbolt.go index 34a281e..3212f36 100644 --- a/database/storage/bbolt/bbolt.go +++ b/database/storage/bbolt/bbolt.go @@ -246,7 +246,7 @@ func (b *BBolt) Injected() bool { } // MaintainRecordStates maintains records states in the database. -func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { //nolint:gocognit now := time.Now().Unix() purgeThreshold := purgeDeletedBefore.Unix() @@ -255,6 +255,13 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim // Create a cursor for iteration. c := bucket.Cursor() for key, value := c.First(); key != nil; key, value = c.Next() { + // check if context is cancelled + select { + case <-ctx.Done(): + return nil + default: + } + // wrap value wrapper, err := record.NewRawWrapper(b.name, string(key), value) if err != nil { @@ -264,33 +271,33 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim // check if we need to do maintenance meta := wrapper.Meta() switch { - case meta.Deleted > 0 && meta.Deleted < purgeThreshold: + case meta.Deleted == 0 && meta.Expires > 0 && meta.Expires < now: + if shadowDelete { + // mark as deleted + meta.Deleted = meta.Expires + deleted, err := wrapper.MarshalRecord(wrapper) + if err != nil { + return err + } + err = bucket.Put(key, deleted) + if err != nil { + return err + } + + // reposition cursor + c.Seek(key) + + continue + } + + // Immediately delete expired entries if shadowDelete is disabled. + fallthrough + case meta.Deleted > 0 && (!shadowDelete || meta.Deleted < purgeThreshold): // delete from storage err = c.Delete() if err != nil { return err } - case meta.Expires > 0 && meta.Expires < now: - // mark as deleted - meta.Deleted = meta.Expires - deleted, err := wrapper.MarshalRecord(wrapper) - if err != nil { - return err - } - err = bucket.Put(key, deleted) - if err != nil { - return err - } - - // reposition cursor - c.Seek(key) - } - - // check if context is cancelled - select { - case <-ctx.Done(): - return nil - default: } } return nil diff --git a/database/storage/bbolt/bbolt_test.go b/database/storage/bbolt/bbolt_test.go index 7897362..03fa3f0 100644 --- a/database/storage/bbolt/bbolt_test.go +++ b/database/storage/bbolt/bbolt_test.go @@ -154,7 +154,13 @@ func TestBBolt(t *testing.T) { } // maintenance - err = db.MaintainRecordStates(context.TODO(), time.Now()) + err = db.MaintainRecordStates(context.TODO(), time.Now(), true) + if err != nil { + t.Fatal(err) + } + + // maintenance + err = db.MaintainRecordStates(context.TODO(), time.Now(), false) if err != nil { t.Fatal(err) } diff --git a/database/storage/fstree/fstree.go b/database/storage/fstree/fstree.go index 5970864..a96f914 100644 --- a/database/storage/fstree/fstree.go +++ b/database/storage/fstree/fstree.go @@ -256,7 +256,7 @@ func (fst *FSTree) Injected() bool { } // MaintainRecordStates maintains records states in the database. -func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { // TODO: implement MaintainRecordStates return nil } diff --git a/database/storage/hashmap/map.go b/database/storage/hashmap/map.go index 4eec7bd..4005f07 100644 --- a/database/storage/hashmap/map.go +++ b/database/storage/hashmap/map.go @@ -151,7 +151,7 @@ func (hm *HashMap) Injected() bool { } // MaintainRecordStates maintains records states in the database. -func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { hm.dbLock.Lock() defer hm.dbLock.Unlock() @@ -159,24 +159,31 @@ func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore purgeThreshold := purgeDeletedBefore.Unix() for key, record := range hm.db { - meta := record.Meta() - switch { - case meta.Deleted > 0 && meta.Deleted < purgeThreshold: - // delete from storage - delete(hm.db, key) - case meta.Expires > 0 && meta.Expires < now: - // mark as deleted - record.Lock() - meta.Deleted = meta.Expires - record.Unlock() - } - // check if context is cancelled select { case <-ctx.Done(): return nil default: } + + meta := record.Meta() + switch { + case meta.Deleted == 0 && meta.Expires > 0 && meta.Expires < now: + if shadowDelete { + // mark as deleted + record.Lock() + meta.Deleted = meta.Expires + record.Unlock() + + continue + } + + // Immediately delete expired entries if shadowDelete is disabled. + fallthrough + case meta.Deleted > 0 && (!shadowDelete || meta.Deleted < purgeThreshold): + // delete from storage + delete(hm.db, key) + } } return nil diff --git a/database/storage/injectbase.go b/database/storage/injectbase.go index 0f6bf25..467c1ba 100644 --- a/database/storage/injectbase.go +++ b/database/storage/injectbase.go @@ -17,6 +17,9 @@ var ( // InjectBase is a dummy base structure to reduce boilerplate code for injected storage interfaces. type InjectBase struct{} +// Compile time interface check +var _ Interface = &InjectBase{} + // Get returns a database record. func (i *InjectBase) Get(key string) (record.Record, error) { return nil, errNotImplemented @@ -27,14 +30,6 @@ func (i *InjectBase) Put(m record.Record) (record.Record, error) { return nil, errNotImplemented } -// PutMany stores many records in the database. -func (i *InjectBase) PutMany(shadowDelete bool) (batch chan record.Record, err chan error) { - batch = make(chan record.Record) - err = make(chan error, 1) - err <- errNotImplemented - return -} - // Delete deletes a record from the database. func (i *InjectBase) Delete(key string) error { return errNotImplemented @@ -55,18 +50,8 @@ func (i *InjectBase) Injected() bool { return true } -// Maintain runs a light maintenance operation on the database. -func (i *InjectBase) Maintain(ctx context.Context) error { - return nil -} - -// MaintainThorough runs a thorough maintenance operation on the database. -func (i *InjectBase) MaintainThorough(ctx context.Context) error { - return nil -} - // MaintainRecordStates maintains records states in the database. -func (i *InjectBase) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (i *InjectBase) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { return nil } diff --git a/database/storage/interface.go b/database/storage/interface.go index 39cffd6..b1cfcb1 100644 --- a/database/storage/interface.go +++ b/database/storage/interface.go @@ -23,7 +23,7 @@ type Interface interface { Shutdown() error // Mandatory Record Maintenance - MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error + MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error } // Maintainer defines the database storage API for backends that require regular maintenance. diff --git a/database/storage/sinkhole/sinkhole.go b/database/storage/sinkhole/sinkhole.go index b7e4627..d033638 100644 --- a/database/storage/sinkhole/sinkhole.go +++ b/database/storage/sinkhole/sinkhole.go @@ -16,6 +16,13 @@ type Sinkhole struct { name string } +var ( + // Compile time interface check + _ storage.Interface = &Sinkhole{} + _ storage.Maintainer = &Sinkhole{} + _ storage.Batcher = &Sinkhole{} +) + func init() { _ = storage.Register("sinkhole", NewSinkhole) } @@ -43,7 +50,7 @@ func (s *Sinkhole) Put(r record.Record) (record.Record, error) { } // PutMany stores many records in the database. -func (s *Sinkhole) PutMany() (chan<- record.Record, <-chan error) { +func (s *Sinkhole) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) { batch := make(chan record.Record, 100) errs := make(chan error, 1) @@ -89,7 +96,7 @@ func (s *Sinkhole) MaintainThorough(ctx context.Context) error { } // MaintainRecordStates maintains records states in the database. -func (s *Sinkhole) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (s *Sinkhole) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { return nil }