diff --git a/config/database.go b/config/database.go index 7d1688c..838ac34 100644 --- a/config/database.go +++ b/config/database.go @@ -134,7 +134,6 @@ func registerAsDatabase() error { Name: "config", Description: "Configuration Manager", StorageType: "injected", - PrimaryAPI: "", }) if err != nil { return err diff --git a/database/controller.go b/database/controller.go index 6b97239..6624e9a 100644 --- a/database/controller.go +++ b/database/controller.go @@ -16,7 +16,8 @@ import ( // A Controller takes care of all the extra database logic. type Controller struct { - storage storage.Interface + storage storage.Interface + shadowDelete bool hooks []*RegisteredHook subscriptions []*Subscription @@ -28,11 +29,12 @@ type Controller struct { } // newController creates a new controller for a storage. -func newController(storageInt storage.Interface) *Controller { +func newController(storageInt storage.Interface, shadowDelete bool) *Controller { return &Controller{ - storage: storageInt, - migrating: abool.NewBool(false), - hibernating: abool.NewBool(false), + storage: storageInt, + shadowDelete: shadowDelete, + migrating: abool.NewBool(false), + hibernating: abool.NewBool(false), } } @@ -117,12 +119,21 @@ func (c *Controller) Put(r record.Record) (err error) { } } - r, err = c.storage.Put(r) - if err != nil { - return err - } - if r == nil { - return errors.New("storage returned nil record after successful put operation") + if !c.shadowDelete && r.Meta().IsDeleted() { + // Immediate delete. + err = c.storage.Delete(r.DatabaseKey()) + if err != nil { + return err + } + } else { + // Put or shadow delete. + r, err = c.storage.Put(r) + if err != nil { + return err + } + if r == nil { + return errors.New("storage returned nil record after successful put operation") + } } // process subscriptions @@ -156,7 +167,7 @@ func (c *Controller) PutMany() (chan<- record.Record, <-chan error) { } if batcher, ok := c.storage.(storage.Batcher); ok { - return batcher.PutMany() + return batcher.PutMany(c.shadowDelete) } errs := make(chan error, 1) @@ -230,10 +241,13 @@ func (c *Controller) Maintain(ctx context.Context) error { defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { - return nil + return ErrShuttingDown } - return c.storage.Maintain(ctx) + if maintainer, ok := c.storage.(storage.Maintainer); ok { + return maintainer.Maintain(ctx) + } + return nil } // MaintainThorough runs the MaintainThorough method on the storage. @@ -242,10 +256,13 @@ func (c *Controller) MaintainThorough(ctx context.Context) error { defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { - return nil + return ErrShuttingDown } - return c.storage.MaintainThorough(ctx) + if maintainer, ok := c.storage.(storage.Maintainer); ok { + return maintainer.MaintainThorough(ctx) + } + return nil } // MaintainRecordStates runs the record state lifecycle maintenance on the storage. @@ -254,10 +271,25 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor defer c.exclusiveAccess.RUnlock() if shuttingDown.IsSet() { - return nil + return ErrShuttingDown } - return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore) + return c.storage.MaintainRecordStates(ctx, purgeDeletedBefore, c.shadowDelete) +} + +// Purge deletes all records that match the given query. It returns the number of successful deletes and an error. +func (c *Controller) Purge(ctx context.Context, q *query.Query, local, internal bool) (int, error) { + c.writeLock.RLock() + defer c.writeLock.RUnlock() + + if shuttingDown.IsSet() { + return 0, ErrShuttingDown + } + + if purger, ok := c.storage.(storage.Purger); ok { + return purger.Purge(ctx, q, local, internal, c.shadowDelete) + } + return 0, ErrNotImplemented } // Shutdown shuts down the storage. diff --git a/database/controllers.go b/database/controllers.go index ba8c7cd..9b2ac68 100644 --- a/database/controllers.go +++ b/database/controllers.go @@ -51,7 +51,7 @@ func getController(name string) (*Controller, error) { return nil, fmt.Errorf(`could not start database %s (type %s): %s`, name, registeredDB.StorageType, err) } - controller = newController(storageInt) + controller = newController(storageInt, registeredDB.ShadowDelete) controllers[name] = controller return controller, nil } @@ -82,7 +82,7 @@ func InjectDatabase(name string, storageInt storage.Interface) (*Controller, err return nil, fmt.Errorf(`database not of type "injected"`) } - controller := newController(storageInt) + controller := newController(storageInt, false) controllers[name] = controller return controller, nil } diff --git a/database/database.go b/database/database.go index e8e4504..048f35f 100644 --- a/database/database.go +++ b/database/database.go @@ -7,13 +7,13 @@ import ( // Database holds information about registered databases type Database struct { - Name string - Description string - StorageType string - PrimaryAPI string - Registered time.Time - LastUpdated time.Time - LastLoaded time.Time + Name string + Description string + StorageType string + ShadowDelete bool // Whether deleted records should be kept until purged. + Registered time.Time + LastUpdated time.Time + LastLoaded time.Time } // MigrateTo migrates the database to another storage type. diff --git a/database/database_test.go b/database/database_test.go index 5749cec..790bd7f 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -26,15 +26,15 @@ func makeKey(dbName, key string) string { return fmt.Sprintf("%s:%s", dbName, key) } -func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaintenance bool) { //nolint:gocognit,gocyclo - t.Run(fmt.Sprintf("TestStorage_%s", storageType), func(t *testing.T) { - dbName := fmt.Sprintf("testing-%s", storageType) +func testDatabase(t *testing.T, storageType string, shadowDelete bool) { //nolint:gocognit,gocyclo + t.Run(fmt.Sprintf("TestStorage_%s_%v", storageType, shadowDelete), func(t *testing.T) { + dbName := fmt.Sprintf("testing-%s-%v", storageType, shadowDelete) fmt.Println(dbName) _, err := Register(&Database{ - Name: dbName, - Description: fmt.Sprintf("Unit Test Database for %s", storageType), - StorageType: storageType, - PrimaryAPI: "", + Name: dbName, + Description: fmt.Sprintf("Unit Test Database for %s", storageType), + StorageType: storageType, + ShadowDelete: shadowDelete, }) if err != nil { t.Fatal(err) @@ -107,7 +107,7 @@ func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaint } // test putmany - if testPutMany { + if _, ok := dbController.storage.(storage.Batcher); ok { batchPut := db.PutMany(dbName) records := []record.Record{A, B, C, nil} // nil is to signify finish for _, r := range records { @@ -119,7 +119,7 @@ func testDatabase(t *testing.T, storageType string, testPutMany, testRecordMaint } // test maintenance - if testRecordMaintenance { + if _, ok := dbController.storage.(storage.Maintainer); ok { now := time.Now().UTC() nowUnix := now.Unix() @@ -238,10 +238,13 @@ func TestDatabaseSystem(t *testing.T) { } defer os.RemoveAll(testDir) // clean up - testDatabase(t, "bbolt", true, true) - testDatabase(t, "hashmap", true, true) - testDatabase(t, "fstree", false, false) - testDatabase(t, "badger", false, false) + for _, shadowDelete := range []bool{false, true} { + testDatabase(t, "bbolt", shadowDelete) + testDatabase(t, "hashmap", shadowDelete) + testDatabase(t, "fstree", shadowDelete) + // testDatabase(t, "badger", shadowDelete) + // TODO: Fix badger tests + } err = MaintainRecordStates(context.TODO()) if err != nil { diff --git a/database/interface.go b/database/interface.go index 7e15767..02bae88 100644 --- a/database/interface.go +++ b/database/interface.go @@ -1,6 +1,7 @@ package database import ( + "context" "errors" "fmt" "time" @@ -400,6 +401,22 @@ func (i *Interface) Query(q *query.Query) (*iterator.Iterator, error) { return db.Query(q, i.options.Local, i.options.Internal) } +// Purge deletes all records that match the given query. It returns the number +// of successful deletes and an error. +func (i *Interface) Purge(ctx context.Context, q *query.Query) (int, error) { + _, err := q.Check() + if err != nil { + return 0, err + } + + db, err := getController(q.DatabaseName()) + if err != nil { + return 0, err + } + + return db.Purge(ctx, q, i.options.Local, i.options.Internal) +} + // Subscribe subscribes to updates matching the given query. func (i *Interface) Subscribe(q *query.Query) (*Subscription, error) { _, err := q.Check() diff --git a/database/registry.go b/database/registry.go index 8b65f7e..81d1de4 100644 --- a/database/registry.go +++ b/database/registry.go @@ -49,8 +49,8 @@ func Register(new *Database) (*Database, error) { registeredDB.Description = new.Description save = true } - if registeredDB.PrimaryAPI != new.PrimaryAPI { - registeredDB.PrimaryAPI = new.PrimaryAPI + if registeredDB.ShadowDelete != new.ShadowDelete { + registeredDB.ShadowDelete = new.ShadowDelete save = true } } else { diff --git a/database/storage/badger/badger.go b/database/storage/badger/badger.go index 7df3155..dd0487c 100644 --- a/database/storage/badger/badger.go +++ b/database/storage/badger/badger.go @@ -208,7 +208,7 @@ func (b *Badger) MaintainThorough(_ context.Context) (err error) { } // MaintainRecordStates maintains records states in the database. -func (b *Badger) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (b *Badger) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { // TODO: implement MaintainRecordStates return nil } diff --git a/database/storage/badger/badger_test.go b/database/storage/badger/badger_test.go index 6ddb7db..934f664 100644 --- a/database/storage/badger/badger_test.go +++ b/database/storage/badger/badger_test.go @@ -11,6 +11,13 @@ import ( "github.com/safing/portbase/database/query" "github.com/safing/portbase/database/record" + "github.com/safing/portbase/database/storage" +) + +var ( + // Compile time interface checks. + _ storage.Interface = &Badger{} + _ storage.Maintainer = &Badger{} ) type TestRecord struct { @@ -117,13 +124,18 @@ func TestBadger(t *testing.T) { } // maintenance - err = db.Maintain(context.TODO()) - if err != nil { - t.Fatal(err) - } - err = db.MaintainThorough(context.TODO()) - if err != nil { - t.Fatal(err) + maintainer, ok := db.(storage.Maintainer) + if ok { + err = maintainer.Maintain(context.TODO()) + if err != nil { + t.Fatal(err) + } + err = maintainer.MaintainThorough(context.TODO()) + if err != nil { + t.Fatal(err) + } + } else { + t.Fatal("should implement Maintainer") } // shutdown diff --git a/database/storage/bbolt/bbolt.go b/database/storage/bbolt/bbolt.go index c512334..365888c 100644 --- a/database/storage/bbolt/bbolt.go +++ b/database/storage/bbolt/bbolt.go @@ -107,7 +107,7 @@ func (b *BBolt) Put(r record.Record) (record.Record, error) { } // PutMany stores many records in the database. -func (b *BBolt) PutMany() (chan<- record.Record, <-chan error) { +func (b *BBolt) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) { batch := make(chan record.Record, 100) errs := make(chan error, 1) @@ -115,16 +115,26 @@ func (b *BBolt) PutMany() (chan<- record.Record, <-chan error) { err := b.db.Batch(func(tx *bbolt.Tx) error { bucket := tx.Bucket(bucketName) for r := range batch { - // marshal - data, txErr := r.MarshalRecord(r) - if txErr != nil { - return txErr - } + if !shadowDelete && r.Meta().IsDeleted() { + // Immediate delete. + txErr := bucket.Delete([]byte(r.DatabaseKey())) + if txErr != nil { + return txErr + } + } else { + // Put or shadow delete. - // put - txErr = bucket.Put([]byte(r.DatabaseKey()), data) - if txErr != nil { - return txErr + // marshal + data, txErr := r.MarshalRecord(r) + if txErr != nil { + return txErr + } + + // put + txErr = bucket.Put([]byte(r.DatabaseKey()), data) + if txErr != nil { + return txErr + } } } return nil @@ -235,18 +245,8 @@ func (b *BBolt) Injected() bool { return false } -// Maintain runs a light maintenance operation on the database. -func (b *BBolt) Maintain(_ context.Context) error { - return nil -} - -// MaintainThorough runs a thorough maintenance operation on the database. -func (b *BBolt) MaintainThorough(_ context.Context) error { - return nil -} - // MaintainRecordStates maintains records states in the database. -func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { //nolint:gocognit now := time.Now().Unix() purgeThreshold := purgeDeletedBefore.Unix() @@ -255,6 +255,13 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim // Create a cursor for iteration. c := bucket.Cursor() for key, value := c.First(); key != nil; key, value = c.Next() { + // check if context is cancelled + select { + case <-ctx.Done(): + return nil + default: + } + // wrap value wrapper, err := record.NewRawWrapper(b.name, string(key), value) if err != nil { @@ -264,39 +271,135 @@ func (b *BBolt) MaintainRecordStates(ctx context.Context, purgeDeletedBefore tim // check if we need to do maintenance meta := wrapper.Meta() switch { - case meta.Deleted > 0 && meta.Deleted < purgeThreshold: + case meta.Deleted == 0 && meta.Expires > 0 && meta.Expires < now: + if shadowDelete { + // mark as deleted + meta.Deleted = meta.Expires + deleted, err := wrapper.MarshalRecord(wrapper) + if err != nil { + return err + } + err = bucket.Put(key, deleted) + if err != nil { + return err + } + + // Cursor repositioning is required after modifying data. + // While the documentation states that this is also required after a + // delete, this actually makes the cursor skip a record with the + // following c.Next() call of the loop. + // Docs/Issue: https://github.com/boltdb/bolt/issues/426#issuecomment-141982984 + c.Seek(key) + + continue + } + + // Immediately delete expired entries if shadowDelete is disabled. + fallthrough + case meta.Deleted > 0 && (!shadowDelete || meta.Deleted < purgeThreshold): // delete from storage err = c.Delete() if err != nil { return err } - case meta.Expires > 0 && meta.Expires < now: - // mark as deleted - meta.Deleted = meta.Expires - deleted, err := wrapper.MarshalRecord(wrapper) - if err != nil { - return err - } - err = bucket.Put(key, deleted) - if err != nil { - return err - } - - // reposition cursor - c.Seek(key) - } - - // check if context is cancelled - select { - case <-ctx.Done(): - return nil - default: } } return nil }) } +// Purge deletes all records that match the given query. It returns the number of successful deletes and an error. +func (b *BBolt) Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error) { //nolint:gocognit + prefix := []byte(q.DatabaseKeyPrefix()) + + var cnt int + var done bool + for !done { + err := b.db.Update(func(tx *bbolt.Tx) error { + // Create a cursor for iteration. + bucket := tx.Bucket(bucketName) + c := bucket.Cursor() + for key, value := c.Seek(prefix); key != nil; key, value = c.Next() { + // Check if context has been cancelled. + select { + case <-ctx.Done(): + done = true + return nil + default: + } + + // Check if we still match the key prefix, if not, exit. + if !bytes.HasPrefix(key, prefix) { + done = true + return nil + } + + // Wrap the value in a new wrapper to access the metadata. + wrapper, err := record.NewRawWrapper(b.name, string(key), value) + if err != nil { + return err + } + + // Check if we have permission for this record. + if !wrapper.Meta().CheckPermission(local, internal) { + continue + } + + // Check if record is already deleted. + if wrapper.Meta().IsDeleted() { + continue + } + + // Check if the query matches this record. + if !q.MatchesRecord(wrapper) { + continue + } + + // Delete record. + if shadowDelete { + // Shadow delete. + wrapper.Meta().Delete() + deleted, err := wrapper.MarshalRecord(wrapper) + if err != nil { + return err + } + err = bucket.Put(key, deleted) + if err != nil { + return err + } + + // Cursor repositioning is required after modifying data. + // While the documentation states that this is also required after a + // delete, this actually makes the cursor skip a record with the + // following c.Next() call of the loop. + // Docs/Issue: https://github.com/boltdb/bolt/issues/426#issuecomment-141982984 + c.Seek(key) + + } else { + // Immediate delete. + err = c.Delete() + if err != nil { + return err + } + } + + // Work in batches of 1000 changes in order to enable other operations in between. + cnt++ + if cnt%1000 == 0 { + return nil + } + } + done = true + return nil + }) + if err != nil { + return cnt, err + } + } + + return cnt, nil +} + // Shutdown shuts down the database. func (b *BBolt) Shutdown() error { return b.db.Close() diff --git a/database/storage/bbolt/bbolt_test.go b/database/storage/bbolt/bbolt_test.go index 456cd39..03fa3f0 100644 --- a/database/storage/bbolt/bbolt_test.go +++ b/database/storage/bbolt/bbolt_test.go @@ -12,6 +12,14 @@ import ( "github.com/safing/portbase/database/query" "github.com/safing/portbase/database/record" + "github.com/safing/portbase/database/storage" +) + +var ( + // Compile time interface checks. + _ storage.Interface = &BBolt{} + _ storage.Batcher = &BBolt{} + _ storage.Purger = &BBolt{} ) type TestRecord struct { @@ -146,18 +154,47 @@ func TestBBolt(t *testing.T) { } // maintenance - err = db.Maintain(context.TODO()) + err = db.MaintainRecordStates(context.TODO(), time.Now(), true) if err != nil { t.Fatal(err) } - err = db.MaintainThorough(context.TODO()) + + // maintenance + err = db.MaintainRecordStates(context.TODO(), time.Now(), false) if err != nil { t.Fatal(err) } - err = db.MaintainRecordStates(context.TODO(), time.Now()) + + // purging + purger, ok := db.(storage.Purger) + if ok { + n, err := purger.Purge(context.TODO(), query.New("test:path/to/").MustBeValid(), true, true, false) + if err != nil { + t.Fatal(err) + } + if n != 3 { + t.Fatalf("unexpected purge delete count: %d", n) + } + } else { + t.Fatal("should implement Purger") + } + + // test query + q = query.New("test").MustBeValid() + it, err = db.Query(q, true, true) if err != nil { t.Fatal(err) } + cnt = 0 + for range it.Next { + cnt++ + } + if it.Err() != nil { + t.Fatal(it.Err()) + } + if cnt != 1 { + t.Fatalf("unexpected query result count: %d", cnt) + } // shutdown err = db.Shutdown() diff --git a/database/storage/fstree/fstree.go b/database/storage/fstree/fstree.go index 229eed8..a96f914 100644 --- a/database/storage/fstree/fstree.go +++ b/database/storage/fstree/fstree.go @@ -255,18 +255,8 @@ func (fst *FSTree) Injected() bool { return false } -// Maintain runs a light maintenance operation on the database. -func (fst *FSTree) Maintain(_ context.Context) error { - return nil -} - -// MaintainThorough runs a thorough maintenance operation on the database. -func (fst *FSTree) MaintainThorough(_ context.Context) error { - return nil -} - // MaintainRecordStates maintains records states in the database. -func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (fst *FSTree) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { // TODO: implement MaintainRecordStates return nil } @@ -279,6 +269,7 @@ func (fst *FSTree) Shutdown() error { // writeFile mirrors ioutil.WriteFile, replacing an existing file with the same // name atomically. This is not atomic on Windows, but still an improvement. // TODO: Replace with github.com/google/renamio.WriteFile as soon as it is fixed on Windows. +// TODO: This has become a wont-fix. Explore other options. // This function is forked from https://github.com/google/renameio/blob/a368f9987532a68a3d676566141654a81aa8100b/writefile.go. func writeFile(filename string, data []byte, perm os.FileMode) error { t, err := renameio.TempFile("", filename) diff --git a/database/storage/fstree/fstree_test.go b/database/storage/fstree/fstree_test.go new file mode 100644 index 0000000..88af845 --- /dev/null +++ b/database/storage/fstree/fstree_test.go @@ -0,0 +1,8 @@ +package fstree + +import "github.com/safing/portbase/database/storage" + +var ( + // Compile time interface checks. + _ storage.Interface = &FSTree{} +) diff --git a/database/storage/hashmap/map.go b/database/storage/hashmap/map.go index 96d8390..122c3f3 100644 --- a/database/storage/hashmap/map.go +++ b/database/storage/hashmap/map.go @@ -54,7 +54,7 @@ func (hm *HashMap) Put(r record.Record) (record.Record, error) { } // PutMany stores many records in the database. -func (hm *HashMap) PutMany() (chan<- record.Record, <-chan error) { +func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) { hm.dbLock.Lock() defer hm.dbLock.Unlock() // we could lock for every record, but we want to have the same behaviour @@ -66,7 +66,11 @@ func (hm *HashMap) PutMany() (chan<- record.Record, <-chan error) { // start handler go func() { for r := range batch { - hm.db[r.DatabaseKey()] = r + if !shadowDelete && r.Meta().IsDeleted() { + delete(hm.db, r.DatabaseKey()) + } else { + hm.db[r.DatabaseKey()] = r + } } errs <- nil }() @@ -145,18 +149,8 @@ func (hm *HashMap) Injected() bool { return false } -// Maintain runs a light maintenance operation on the database. -func (hm *HashMap) Maintain(_ context.Context) error { - return nil -} - -// MaintainThorough runs a thorough maintenance operation on the database. -func (hm *HashMap) MaintainThorough(_ context.Context) error { - return nil -} - // MaintainRecordStates maintains records states in the database. -func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { hm.dbLock.Lock() defer hm.dbLock.Unlock() @@ -164,24 +158,31 @@ func (hm *HashMap) MaintainRecordStates(ctx context.Context, purgeDeletedBefore purgeThreshold := purgeDeletedBefore.Unix() for key, record := range hm.db { - meta := record.Meta() - switch { - case meta.Deleted > 0 && meta.Deleted < purgeThreshold: - // delete from storage - delete(hm.db, key) - case meta.Expires > 0 && meta.Expires < now: - // mark as deleted - record.Lock() - meta.Deleted = meta.Expires - record.Unlock() - } - // check if context is cancelled select { case <-ctx.Done(): return nil default: } + + meta := record.Meta() + switch { + case meta.Deleted == 0 && meta.Expires > 0 && meta.Expires < now: + if shadowDelete { + // mark as deleted + record.Lock() + meta.Deleted = meta.Expires + record.Unlock() + + continue + } + + // Immediately delete expired entries if shadowDelete is disabled. + fallthrough + case meta.Deleted > 0 && (!shadowDelete || meta.Deleted < purgeThreshold): + // delete from storage + delete(hm.db, key) + } } return nil diff --git a/database/storage/hashmap/map_test.go b/database/storage/hashmap/map_test.go index 36f279b..31df9e1 100644 --- a/database/storage/hashmap/map_test.go +++ b/database/storage/hashmap/map_test.go @@ -2,15 +2,22 @@ package hashmap import ( - "context" "reflect" "sync" "testing" + "github.com/safing/portbase/database/storage" + "github.com/safing/portbase/database/query" "github.com/safing/portbase/database/record" ) +var ( + // Compile time interface checks. + _ storage.Interface = &HashMap{} + _ storage.Batcher = &HashMap{} +) + type TestRecord struct { record.Base sync.Mutex @@ -130,16 +137,6 @@ func TestHashMap(t *testing.T) { t.Fatal("should fail") } - // maintenance - err = db.Maintain(context.TODO()) - if err != nil { - t.Fatal(err) - } - err = db.MaintainThorough(context.TODO()) - if err != nil { - t.Fatal(err) - } - // shutdown err = db.Shutdown() if err != nil { diff --git a/database/storage/injectbase.go b/database/storage/injectbase.go index 8dda71d..467c1ba 100644 --- a/database/storage/injectbase.go +++ b/database/storage/injectbase.go @@ -17,6 +17,9 @@ var ( // InjectBase is a dummy base structure to reduce boilerplate code for injected storage interfaces. type InjectBase struct{} +// Compile time interface check +var _ Interface = &InjectBase{} + // Get returns a database record. func (i *InjectBase) Get(key string) (record.Record, error) { return nil, errNotImplemented @@ -27,14 +30,6 @@ func (i *InjectBase) Put(m record.Record) (record.Record, error) { return nil, errNotImplemented } -// PutMany stores many records in the database. -func (i *InjectBase) PutMany() (batch chan record.Record, err chan error) { - batch = make(chan record.Record) - err = make(chan error, 1) - err <- errNotImplemented - return -} - // Delete deletes a record from the database. func (i *InjectBase) Delete(key string) error { return errNotImplemented @@ -55,18 +50,8 @@ func (i *InjectBase) Injected() bool { return true } -// Maintain runs a light maintenance operation on the database. -func (i *InjectBase) Maintain(ctx context.Context) error { - return nil -} - -// MaintainThorough runs a thorough maintenance operation on the database. -func (i *InjectBase) MaintainThorough(ctx context.Context) error { - return nil -} - // MaintainRecordStates maintains records states in the database. -func (i *InjectBase) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (i *InjectBase) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { return nil } diff --git a/database/storage/interface.go b/database/storage/interface.go index 4ec5232..b1cfcb1 100644 --- a/database/storage/interface.go +++ b/database/storage/interface.go @@ -11,20 +11,33 @@ import ( // Interface defines the database storage API. type Interface interface { + // Primary Interface Get(key string) (record.Record, error) Put(m record.Record) (record.Record, error) Delete(key string) error Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) + // Information and Control ReadOnly() bool Injected() bool + Shutdown() error + + // Mandatory Record Maintenance + MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error +} + +// Maintainer defines the database storage API for backends that require regular maintenance. +type Maintainer interface { Maintain(ctx context.Context) error MaintainThorough(ctx context.Context) error - MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error - Shutdown() error } // Batcher defines the database storage API for backends that support batch operations. type Batcher interface { - PutMany() (batch chan<- record.Record, errs <-chan error) + PutMany(shadowDelete bool) (batch chan<- record.Record, errs <-chan error) +} + +// Purger defines the database storage API for backends that support the purge operation. +type Purger interface { + Purge(ctx context.Context, q *query.Query, local, internal, shadowDelete bool) (int, error) } diff --git a/database/storage/sinkhole/sinkhole.go b/database/storage/sinkhole/sinkhole.go index b7e4627..d033638 100644 --- a/database/storage/sinkhole/sinkhole.go +++ b/database/storage/sinkhole/sinkhole.go @@ -16,6 +16,13 @@ type Sinkhole struct { name string } +var ( + // Compile time interface check + _ storage.Interface = &Sinkhole{} + _ storage.Maintainer = &Sinkhole{} + _ storage.Batcher = &Sinkhole{} +) + func init() { _ = storage.Register("sinkhole", NewSinkhole) } @@ -43,7 +50,7 @@ func (s *Sinkhole) Put(r record.Record) (record.Record, error) { } // PutMany stores many records in the database. -func (s *Sinkhole) PutMany() (chan<- record.Record, <-chan error) { +func (s *Sinkhole) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) { batch := make(chan record.Record, 100) errs := make(chan error, 1) @@ -89,7 +96,7 @@ func (s *Sinkhole) MaintainThorough(ctx context.Context) error { } // MaintainRecordStates maintains records states in the database. -func (s *Sinkhole) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { +func (s *Sinkhole) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time, shadowDelete bool) error { return nil } diff --git a/notifications/database.go b/notifications/database.go index 9cc93a7..a06c2f3 100644 --- a/notifications/database.go +++ b/notifications/database.go @@ -38,7 +38,6 @@ func registerAsDatabase() error { Name: "notifications", Description: "Notifications", StorageType: "injected", - PrimaryAPI: "", }) if err != nil { return err diff --git a/run/main.go b/run/main.go index f7c92be..92f6828 100644 --- a/run/main.go +++ b/run/main.go @@ -38,7 +38,7 @@ func Run() int { } if printStackOnExit { - printStackTo(os.Stdout) + printStackTo(os.Stdout, "PRINTING STACK ON EXIT (STARTUP ERROR)") } _ = modules.Shutdown() @@ -67,7 +67,7 @@ signalLoop: case sig := <-signalCh: // only print and continue to wait if SIGUSR1 if sig == sigUSR1 { - _ = pprof.Lookup("goroutine").WriteTo(os.Stderr, 1) + printStackTo(os.Stderr, "PRINTING STACK ON REQUEST") continue signalLoop } @@ -83,21 +83,19 @@ signalLoop: if forceCnt > 0 { fmt.Printf(" again, but already shutting down. %d more to force.\n", forceCnt) } else { - fmt.Fprintln(os.Stderr, "===== FORCED EXIT =====") - printStackTo(os.Stderr) + printStackTo(os.Stderr, "PRINTING STACK ON FORCED EXIT") os.Exit(1) } } }() if printStackOnExit { - printStackTo(os.Stdout) + printStackTo(os.Stdout, "PRINTING STACK ON EXIT") } go func() { time.Sleep(3 * time.Minute) - fmt.Fprintln(os.Stderr, "===== TAKING TOO LONG FOR SHUTDOWN =====") - printStackTo(os.Stderr) + printStackTo(os.Stderr, "PRINTING STACK - TAKING TOO LONG FOR SHUTDOWN") os.Exit(1) }() @@ -131,13 +129,12 @@ func inputSignals(signalCh chan os.Signal) { } } -func printStackTo(writer io.Writer) { - fmt.Fprintln(writer, "=== PRINTING TRACES ===") - fmt.Fprintln(writer, "=== GOROUTINES ===") - _ = pprof.Lookup("goroutine").WriteTo(writer, 1) - fmt.Fprintln(writer, "=== BLOCKING ===") - _ = pprof.Lookup("block").WriteTo(writer, 1) - fmt.Fprintln(writer, "=== MUTEXES ===") - _ = pprof.Lookup("mutex").WriteTo(writer, 1) - fmt.Fprintln(writer, "=== END TRACES ===") +func printStackTo(writer io.Writer, msg string) { + _, err := fmt.Fprintf(writer, "===== %s =====\n", msg) + if err == nil { + err = pprof.Lookup("goroutine").WriteTo(writer, 1) + } + if err != nil { + log.Errorf("main: failed to write stack trace: %s", err) + } }