Add shadowDelete support to record maintenance

This commit is contained in:
Daniel 2020-09-24 20:59:16 +02:00
parent 30a53cf840
commit bd90e5a5df
10 changed files with 87 additions and 72 deletions

View file

@ -277,7 +277,7 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor
return ErrShuttingDown
}
return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore)
return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore, c.shadowDelete)
}
// Purge deletes all records that match the given query. It returns the number of successful deletes and an error.

View file

@ -26,15 +26,15 @@ func makeKey(dbName, key string) string {
return fmt.Sprintf("%s:%s", dbName, key)
}
func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaintenance bool) { //nolint:gocognit,gocyclo
t.Run(fmt.Sprintf("TestStorage_%s", storageType), func(t *testing.T) {
dbName := fmt.Sprintf("testing-%s", storageType)
func testDatabase(t *testing.T, storageType string, shadowDelete bool) { //nolint:gocognit,gocyclo
t.Run(fmt.Sprintf("TestStorage_%s_%v", storageType, shadowDelete), func(t *testing.T) {
dbName := fmt.Sprintf("testing-%s-%v", storageType, shadowDelete)
fmt.Println(dbName)
_, err := Register(&Database{
Name: dbName,
Description: fmt.Sprintf("Unit Test Database for %s", storageType),
StorageType: storageType,
ShadowDelete: true,
ShadowDelete: shadowDelete,
})
if err != nil {
t.Fatal(err)
@ -107,7 +107,7 @@ func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaint
}
// test putmany
if testPutMany {
if _, ok := dbController.storage.(storage.Batcher); ok {
batchPut := db.PutMany(dbName)
records := []record.Record{A, B, C, nil} // nil is to signify finish
for _, r := range records {
@ -119,7 +119,7 @@ func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaint
}
// test maintenance
if testRecordMaintenance {
if _, ok := dbController.storage.(storage.Maintainer); ok {
now := time.Now().UTC()
nowUnix := now.Unix()
@ -238,10 +238,13 @@ func TestDatabaseSystem(t *testing.T) {
}
defer os.RemoveAll(testDir) // clean up
testDatabase(t, "bbolt", true, true)
testDatabase(t, "hashmap", true, true)
testDatabase(t, "fstree", false, false)
testDatabase(t, "badger", false, false)
for _, shadowDelete := range []bool{false, true} {
testDatabase(t, "bbolt", shadowDelete)
testDatabase(t, "hashmap", shadowDelete)
testDatabase(t, "fstree", shadowDelete)
// testDatabase(t, "badger", shadowDelete)
// TODO: Fix badger tests
}
err = MaintainRecordStates(context.TODO())
if err != nil {

View file

@ -208,7 +208,7 @@ func (b *Badger) MaintainThorough(_ context.Context) (err error) {
}
// MaintainRecordStates maintains records states in the database.
func (b *Badger) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
func (b *Badger) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
// TODO: implement MaintainRecordStates
return nil
}

View file

@ -246,7 +246,7 @@ func (b *BBolt) Injected() bool {
}
// MaintainRecordStates maintains records states in the database.
func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { //nolint:gocognit
now := time.Now().Unix()
purgeThreshold := purgeDeletedBefore.Unix()
@ -255,6 +255,13 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim
// Create a cursor for iteration.
c := bucket.Cursor()
for key, value := c.First(); key != nil; key, value = c.Next() {
// check if context is cancelled
select {
case <-ctx.Done():
return nil
default:
}
// wrap value
wrapper, err := record.NewRawWrapper(b.name, string(key), value)
if err != nil {
@ -264,33 +271,33 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim
// check if we need to do maintenance
meta := wrapper.Meta()
switch {
case meta.Deleted > 0 && meta.Deleted < purgeThreshold:
case meta.Deleted == 0 && meta.Expires > 0 && meta.Expires < now:
if shadowDelete {
// mark as deleted
meta.Deleted = meta.Expires
deleted, err := wrapper.MarshalRecord(wrapper)
if err != nil {
return err
}
err = bucket.Put(key, deleted)
if err != nil {
return err
}
// reposition cursor
c.Seek(key)
continue
}
// Immediately delete expired entries if shadowDelete is disabled.
fallthrough
case meta.Deleted > 0 && (!shadowDelete || meta.Deleted < purgeThreshold):
// delete from storage
err = c.Delete()
if err != nil {
return err
}
case meta.Expires > 0 && meta.Expires < now:
// mark as deleted
meta.Deleted = meta.Expires
deleted, err := wrapper.MarshalRecord(wrapper)
if err != nil {
return err
}
err = bucket.Put(key, deleted)
if err != nil {
return err
}
// reposition cursor
c.Seek(key)
}
// check if context is cancelled
select {
case <-ctx.Done():
return nil
default:
}
}
return nil

View file

@ -154,7 +154,13 @@ func TestBBolt(t *testing.T) {
}
// maintenance
err = db.MaintainRecordStates(context.TODO(), time.Now())
err = db.MaintainRecordStates(context.TODO(), time.Now(), true)
if err != nil {
t.Fatal(err)
}
// maintenance
err = db.MaintainRecordStates(context.TODO(), time.Now(), false)
if err != nil {
t.Fatal(err)
}

View file

@ -256,7 +256,7 @@ func (fst *FSTree) Injected() bool {
}
// MaintainRecordStates maintains records states in the database.
func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
// TODO: implement MaintainRecordStates
return nil
}

View file

@ -151,7 +151,7 @@ func (hm *HashMap) Injected() bool {
}
// MaintainRecordStates maintains records states in the database.
func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
hm.dbLock.Lock()
defer hm.dbLock.Unlock()
@ -159,24 +159,31 @@ func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore
purgeThreshold := purgeDeletedBefore.Unix()
for key, record := range hm.db {
meta := record.Meta()
switch {
case meta.Deleted > 0 && meta.Deleted < purgeThreshold:
// delete from storage
delete(hm.db, key)
case meta.Expires > 0 && meta.Expires < now:
// mark as deleted
record.Lock()
meta.Deleted = meta.Expires
record.Unlock()
}
// check if context is cancelled
select {
case <-ctx.Done():
return nil
default:
}
meta := record.Meta()
switch {
case meta.Deleted == 0 && meta.Expires > 0 && meta.Expires < now:
if shadowDelete {
// mark as deleted
record.Lock()
meta.Deleted = meta.Expires
record.Unlock()
continue
}
// Immediately delete expired entries if shadowDelete is disabled.
fallthrough
case meta.Deleted > 0 && (!shadowDelete || meta.Deleted < purgeThreshold):
// delete from storage
delete(hm.db, key)
}
}
return nil

View file

@ -17,6 +17,9 @@ var (
// InjectBase is a dummy base structure to reduce boilerplate code for injected storage interfaces.
type InjectBase struct{}
// Compile time interface check
var _ Interface = &InjectBase{}
// Get returns a database record.
func (i *InjectBase) Get(key string) (record.Record, error) {
return nil, errNotImplemented
@ -27,14 +30,6 @@ func (i *InjectBase) Put(m record.Record) (record.Record, error) {
return nil, errNotImplemented
}
// PutMany stores many records in the database.
func (i *InjectBase) PutMany(shadowDelete bool) (batch chan record.Record, err chan error) {
batch = make(chan record.Record)
err = make(chan error, 1)
err <- errNotImplemented
return
}
// Delete deletes a record from the database.
func (i *InjectBase) Delete(key string) error {
return errNotImplemented
@ -55,18 +50,8 @@ func (i *InjectBase) Injected() bool {
return true
}
// Maintain runs a light maintenance operation on the database.
func (i *InjectBase) Maintain(ctx context.Context) error {
return nil
}
// MaintainThorough runs a thorough maintenance operation on the database.
func (i *InjectBase) MaintainThorough(ctx context.Context) error {
return nil
}
// MaintainRecordStates maintains records states in the database.
func (i *InjectBase) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
func (i *InjectBase) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
return nil
}

View file

@ -23,7 +23,7 @@ type Interface interface {
Shutdown() error
// Mandatory Record Maintenance
MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error
MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error
}
// Maintainer defines the database storage API for backends that require regular maintenance.

View file

@ -16,6 +16,13 @@ type Sinkhole struct {
name string
}
var (
// Compile time interface check
_ storage.Interface = &Sinkhole{}
_ storage.Maintainer = &Sinkhole{}
_ storage.Batcher = &Sinkhole{}
)
func init() {
_ = storage.Register("sinkhole", NewSinkhole)
}
@ -43,7 +50,7 @@ func (s *Sinkhole) Put(r record.Record) (record.Record, error) {
}
// PutMany stores many records in the database.
func (s *Sinkhole) PutMany() (chan<- record.Record, <-chan error) {
func (s *Sinkhole) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
batch := make(chan record.Record, 100)
errs := make(chan error, 1)
@ -89,7 +96,7 @@ func (s *Sinkhole) MaintainThorough(ctx context.Context) error {
}
// MaintainRecordStates maintains records states in the database.
func (s *Sinkhole) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
func (s *Sinkhole) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
return nil
}