mirror of
https://github.com/safing/portbase
synced 2025-09-01 10:09:50 +00:00
Merge pull request #86 from safing/feature/shadowdelete-expiry
Add shadowDelete support to record maintenance
This commit is contained in:
commit
d4bb5ae522
10 changed files with 97 additions and 73 deletions
|
@ -277,7 +277,7 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor
|
|||
return ErrShuttingDown
|
||||
}
|
||||
|
||||
return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore)
|
||||
return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore, c.shadowDelete)
|
||||
}
|
||||
|
||||
// Purge deletes all records that match the given query. It returns the number of successful deletes and an error.
|
||||
|
|
|
@ -26,15 +26,15 @@ func makeKey(dbName, key string) string {
|
|||
return fmt.Sprintf("%s:%s", dbName, key)
|
||||
}
|
||||
|
||||
func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaintenance bool) { //nolint:gocognit,gocyclo
|
||||
t.Run(fmt.Sprintf("TestStorage_%s", storageType), func(t *testing.T) {
|
||||
dbName := fmt.Sprintf("testing-%s", storageType)
|
||||
func testDatabase(t *testing.T, storageType string, shadowDelete bool) { //nolint:gocognit,gocyclo
|
||||
t.Run(fmt.Sprintf("TestStorage_%s_%v", storageType, shadowDelete), func(t *testing.T) {
|
||||
dbName := fmt.Sprintf("testing-%s-%v", storageType, shadowDelete)
|
||||
fmt.Println(dbName)
|
||||
_, err := Register(&Database{
|
||||
Name: dbName,
|
||||
Description: fmt.Sprintf("Unit Test Database for %s", storageType),
|
||||
StorageType: storageType,
|
||||
ShadowDelete: true,
|
||||
ShadowDelete: shadowDelete,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -107,7 +107,7 @@ func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaint
|
|||
}
|
||||
|
||||
// test putmany
|
||||
if testPutMany {
|
||||
if _, ok := dbController.storage.(storage.Batcher); ok {
|
||||
batchPut := db.PutMany(dbName)
|
||||
records := []record.Record{A, B, C, nil} // nil is to signify finish
|
||||
for _, r := range records {
|
||||
|
@ -119,7 +119,7 @@ func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaint
|
|||
}
|
||||
|
||||
// test maintenance
|
||||
if testRecordMaintenance {
|
||||
if _, ok := dbController.storage.(storage.Maintainer); ok {
|
||||
now := time.Now().UTC()
|
||||
nowUnix := now.Unix()
|
||||
|
||||
|
@ -238,10 +238,13 @@ func TestDatabaseSystem(t *testing.T) {
|
|||
}
|
||||
defer os.RemoveAll(testDir) // clean up
|
||||
|
||||
testDatabase(t, "bbolt", true, true)
|
||||
testDatabase(t, "hashmap", true, true)
|
||||
testDatabase(t, "fstree", false, false)
|
||||
testDatabase(t, "badger", false, false)
|
||||
for _, shadowDelete := range []bool{false, true} {
|
||||
testDatabase(t, "bbolt", shadowDelete)
|
||||
testDatabase(t, "hashmap", shadowDelete)
|
||||
testDatabase(t, "fstree", shadowDelete)
|
||||
// testDatabase(t, "badger", shadowDelete)
|
||||
// TODO: Fix badger tests
|
||||
}
|
||||
|
||||
err = MaintainRecordStates(context.TODO())
|
||||
if err != nil {
|
||||
|
|
|
@ -208,7 +208,7 @@ func (b *Badger) MaintainThorough(_ context.Context) (err error) {
|
|||
}
|
||||
|
||||
// MaintainRecordStates maintains records states in the database.
|
||||
func (b *Badger) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
|
||||
func (b *Badger) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
|
||||
// TODO: implement MaintainRecordStates
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -246,7 +246,7 @@ func (b *BBolt) Injected() bool {
|
|||
}
|
||||
|
||||
// MaintainRecordStates maintains records states in the database.
|
||||
func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
|
||||
func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { //nolint:gocognit
|
||||
now := time.Now().Unix()
|
||||
purgeThreshold := purgeDeletedBefore.Unix()
|
||||
|
||||
|
@ -255,6 +255,13 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim
|
|||
// Create a cursor for iteration.
|
||||
c := bucket.Cursor()
|
||||
for key, value := c.First(); key != nil; key, value = c.Next() {
|
||||
// check if context is cancelled
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
// wrap value
|
||||
wrapper, err := record.NewRawWrapper(b.name, string(key), value)
|
||||
if err != nil {
|
||||
|
@ -264,33 +271,37 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim
|
|||
// check if we need to do maintenance
|
||||
meta := wrapper.Meta()
|
||||
switch {
|
||||
case meta.Deleted > 0 && meta.Deleted < purgeThreshold:
|
||||
case meta.Deleted == 0 && meta.Expires > 0 && meta.Expires < now:
|
||||
if shadowDelete {
|
||||
// mark as deleted
|
||||
meta.Deleted = meta.Expires
|
||||
deleted, err := wrapper.MarshalRecord(wrapper)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = bucket.Put(key, deleted)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Cursor repositioning is required after modifying data.
|
||||
// While the documentation states that this is also required after a
|
||||
// delete, this actually makes the cursor skip a record with the
|
||||
// following c.Next() call of the loop.
|
||||
// Docs/Issue: https://github.com/boltdb/bolt/issues/426#issuecomment-141982984
|
||||
c.Seek(key)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// Immediately delete expired entries if shadowDelete is disabled.
|
||||
fallthrough
|
||||
case meta.Deleted > 0 && (!shadowDelete || meta.Deleted < purgeThreshold):
|
||||
// delete from storage
|
||||
err = c.Delete()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case meta.Expires > 0 && meta.Expires < now:
|
||||
// mark as deleted
|
||||
meta.Deleted = meta.Expires
|
||||
deleted, err := wrapper.MarshalRecord(wrapper)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = bucket.Put(key, deleted)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// reposition cursor
|
||||
c.Seek(key)
|
||||
}
|
||||
|
||||
// check if context is cancelled
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -357,8 +368,13 @@ func (b *BBolt) Purge(ctx context.Context, q *query.Query, local, internal, shad
|
|||
return err
|
||||
}
|
||||
|
||||
// Reposition the cursor after we have edited the bucket.
|
||||
// Cursor repositioning is required after modifying data.
|
||||
// While the documentation states that this is also required after a
|
||||
// delete, this actually makes the cursor skip a record with the
|
||||
// following c.Next() call of the loop.
|
||||
// Docs/Issue: https://github.com/boltdb/bolt/issues/426#issuecomment-141982984
|
||||
c.Seek(key)
|
||||
|
||||
} else {
|
||||
// Immediate delete.
|
||||
err = c.Delete()
|
||||
|
|
|
@ -154,7 +154,13 @@ func TestBBolt(t *testing.T) {
|
|||
}
|
||||
|
||||
// maintenance
|
||||
err = db.MaintainRecordStates(context.TODO(), time.Now())
|
||||
err = db.MaintainRecordStates(context.TODO(), time.Now(), true)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// maintenance
|
||||
err = db.MaintainRecordStates(context.TODO(), time.Now(), false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -256,7 +256,7 @@ func (fst *FSTree) Injected() bool {
|
|||
}
|
||||
|
||||
// MaintainRecordStates maintains records states in the database.
|
||||
func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
|
||||
func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
|
||||
// TODO: implement MaintainRecordStates
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -151,7 +151,7 @@ func (hm *HashMap) Injected() bool {
|
|||
}
|
||||
|
||||
// MaintainRecordStates maintains records states in the database.
|
||||
func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
|
||||
func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
|
||||
hm.dbLock.Lock()
|
||||
defer hm.dbLock.Unlock()
|
||||
|
||||
|
@ -159,24 +159,31 @@ func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore
|
|||
purgeThreshold := purgeDeletedBefore.Unix()
|
||||
|
||||
for key, record := range hm.db {
|
||||
meta := record.Meta()
|
||||
switch {
|
||||
case meta.Deleted > 0 && meta.Deleted < purgeThreshold:
|
||||
// delete from storage
|
||||
delete(hm.db, key)
|
||||
case meta.Expires > 0 && meta.Expires < now:
|
||||
// mark as deleted
|
||||
record.Lock()
|
||||
meta.Deleted = meta.Expires
|
||||
record.Unlock()
|
||||
}
|
||||
|
||||
// check if context is cancelled
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
meta := record.Meta()
|
||||
switch {
|
||||
case meta.Deleted == 0 && meta.Expires > 0 && meta.Expires < now:
|
||||
if shadowDelete {
|
||||
// mark as deleted
|
||||
record.Lock()
|
||||
meta.Deleted = meta.Expires
|
||||
record.Unlock()
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// Immediately delete expired entries if shadowDelete is disabled.
|
||||
fallthrough
|
||||
case meta.Deleted > 0 && (!shadowDelete || meta.Deleted < purgeThreshold):
|
||||
// delete from storage
|
||||
delete(hm.db, key)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -17,6 +17,9 @@ var (
|
|||
// InjectBase is a dummy base structure to reduce boilerplate code for injected storage interfaces.
|
||||
type InjectBase struct{}
|
||||
|
||||
// Compile time interface check
|
||||
var _ Interface = &InjectBase{}
|
||||
|
||||
// Get returns a database record.
|
||||
func (i *InjectBase) Get(key string) (record.Record, error) {
|
||||
return nil, errNotImplemented
|
||||
|
@ -27,14 +30,6 @@ func (i *InjectBase) Put(m record.Record) (record.Record, error) {
|
|||
return nil, errNotImplemented
|
||||
}
|
||||
|
||||
// PutMany stores many records in the database.
|
||||
func (i *InjectBase) PutMany(shadowDelete bool) (batch chan record.Record, err chan error) {
|
||||
batch = make(chan record.Record)
|
||||
err = make(chan error, 1)
|
||||
err <- errNotImplemented
|
||||
return
|
||||
}
|
||||
|
||||
// Delete deletes a record from the database.
|
||||
func (i *InjectBase) Delete(key string) error {
|
||||
return errNotImplemented
|
||||
|
@ -55,18 +50,8 @@ func (i *InjectBase) Injected() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
// Maintain runs a light maintenance operation on the database.
|
||||
func (i *InjectBase) Maintain(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// MaintainThorough runs a thorough maintenance operation on the database.
|
||||
func (i *InjectBase) MaintainThorough(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// MaintainRecordStates maintains records states in the database.
|
||||
func (i *InjectBase) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
|
||||
func (i *InjectBase) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ type Interface interface {
|
|||
Shutdown() error
|
||||
|
||||
// Mandatory Record Maintenance
|
||||
MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error
|
||||
MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error
|
||||
}
|
||||
|
||||
// Maintainer defines the database storage API for backends that require regular maintenance.
|
||||
|
|
|
@ -16,6 +16,13 @@ type Sinkhole struct {
|
|||
name string
|
||||
}
|
||||
|
||||
var (
|
||||
// Compile time interface check
|
||||
_ storage.Interface = &Sinkhole{}
|
||||
_ storage.Maintainer = &Sinkhole{}
|
||||
_ storage.Batcher = &Sinkhole{}
|
||||
)
|
||||
|
||||
func init() {
|
||||
_ = storage.Register("sinkhole", NewSinkhole)
|
||||
}
|
||||
|
@ -43,7 +50,7 @@ func (s *Sinkhole) Put(r record.Record) (record.Record, error) {
|
|||
}
|
||||
|
||||
// PutMany stores many records in the database.
|
||||
func (s *Sinkhole) PutMany() (chan<- record.Record, <-chan error) {
|
||||
func (s *Sinkhole) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) {
|
||||
batch := make(chan record.Record, 100)
|
||||
errs := make(chan error, 1)
|
||||
|
||||
|
@ -89,7 +96,7 @@ func (s *Sinkhole) MaintainThorough(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// MaintainRecordStates maintains records states in the database.
|
||||
func (s *Sinkhole) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
|
||||
func (s *Sinkhole) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue