From 1c7f98d9bae9b9582a480b2a81daa853936341f7 Mon Sep 17 00:00:00 2001 From: Daniel Date: Wed, 12 Sep 2018 17:10:35 +0200 Subject: [PATCH] Work on tests and query --- database/controller.go | 8 +- database/database_test.go | 36 +++++++- database/dbmodule/maintenance.go | 14 ++-- database/maintainence.go | 36 -------- database/maintenance.go | 92 +++++++++++++++++++++ database/record/base.go | 8 +- database/record/meta.go | 4 +- database/record/record.go | 4 +- database/record/wrapper.go | 6 +- database/record/wrapper_test.go | 4 +- database/storage/badger/badger.go | 110 +++++++++++++++++++------ database/storage/badger/badger_test.go | 53 +++++++++++- database/storage/interface.go | 1 + 13 files changed, 288 insertions(+), 88 deletions(-) delete mode 100644 database/maintainence.go create mode 100644 database/maintenance.go diff --git a/database/controller.go b/database/controller.go index 2c4875f..e1325a4 100644 --- a/database/controller.go +++ b/database/controller.go @@ -2,7 +2,6 @@ package database import ( "sync" - "time" "github.com/tevino/abool" @@ -35,6 +34,11 @@ func (c *Controller) ReadOnly() bool { return c.storage.ReadOnly() } +// Injected returns whether the storage is injected. +func (c *Controller) Injected() bool { + return c.storage.Injected() +} + // Get return the record with the given key. func (c *Controller) Get(key string) (record.Record, error) { if shuttingDown.IsSet() { @@ -53,7 +57,7 @@ func (c *Controller) Get(key string) (record.Record, error) { r.Lock() defer r.Unlock() - if !r.Meta().CheckValidity(time.Now().Unix()) { + if !r.Meta().CheckValidity() { return nil, ErrNotFound } diff --git a/database/database_test.go b/database/database_test.go index bab7579..9cd679d 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -38,8 +38,8 @@ func (tr *TestRecord) Lock() { func (tr *TestRecord) Unlock() { } -func makeKey(storageType, key string) string { - return fmt.Sprintf("%s:%s", storageType, key) +func makeKey(dbName, key string) string { + return fmt.Sprintf("%s:%s", dbName, key) } func testDatabase(t *testing.T, storageType string) { @@ -56,13 +56,38 @@ func testDatabase(t *testing.T, storageType string) { db := NewInterface(nil) - new := &TestRecord{} + new := &TestRecord{ + S: "banana", + I: 42, + I8: 42, + I16: 42, + I32: 42, + I64: 42, + UI: 42, + UI8: 42, + UI16: 42, + UI32: 42, + UI64: 42, + F32: 42.42, + F64: 42.42, + B: true, + } + new.SetMeta(&record.Meta{}) + new.Meta().Update() new.SetKey(makeKey(dbName, "A")) err = db.Put(new) if err != nil { t.Fatal(err) } + exists, err := db.Exists(makeKey(dbName, "A")) + if err != nil { + t.Fatal(err) + } + if !exists { + t.Fatalf("record %s should exist!", makeKey(dbName, "A")) + } + _, err = db.Get(makeKey(dbName, "A")) if err != nil { t.Fatal(err) @@ -92,6 +117,11 @@ func TestDatabaseSystem(t *testing.T) { testDatabase(t, "badger") + err = MaintainRecordStates() + if err != nil { + t.Fatal(err) + } + err = Maintain() if err != nil { t.Fatal(err) diff --git a/database/dbmodule/maintenance.go b/database/dbmodule/maintenance.go index df9a9eb..4bdd6fe 100644 --- a/database/dbmodule/maintenance.go +++ b/database/dbmodule/maintenance.go @@ -8,8 +8,8 @@ import ( ) func maintainer() { - ticker := time.NewTicker(1 * time.Hour) - tickerThorough := time.NewTicker(10 * time.Minute) + ticker := time.NewTicker(10 * time.Minute) + longTicker := time.NewTicker(1 * time.Hour) maintenanceWg.Add(1) for { @@ -19,10 +19,14 @@ func maintainer() { if err != nil { log.Errorf("database: maintenance error: %s", err) } - case <- ticker.C: - err := database.MaintainThorough() + case <- longTicker.C: + err := database.MaintainRecordStates() if err != nil { - log.Errorf("database: maintenance (thorough) error: %s", err) + log.Errorf("database: record states maintenance error: %s", err) + } + err = database.MaintainThorough() + if err != nil { + log.Errorf("database: thorough maintenance error: %s", err) } case <-shutdownSignal: maintenanceWg.Done() diff --git a/database/maintainence.go b/database/maintainence.go deleted file mode 100644 index 4029a69..0000000 --- a/database/maintainence.go +++ /dev/null @@ -1,36 +0,0 @@ -package database - -// Maintain runs the Maintain method on all storages. -func Maintain() (err error) { - controllers := duplicateControllers() - for _, c := range controllers { - err = c.Maintain() - if err != nil { - return - } - } - return -} - -// MaintainThorough runs the MaintainThorough method on all storages. -func MaintainThorough() (err error) { - all := duplicateControllers() - for _, c := range all { - err = c.MaintainThorough() - if err != nil { - return - } - } - return -} - -func duplicateControllers() (all []*Controller) { - controllersLock.Lock() - defer controllersLock.Unlock() - - for _, c := range controllers { - all = append(all, c) - } - - return -} diff --git a/database/maintenance.go b/database/maintenance.go new file mode 100644 index 0000000..d1399bd --- /dev/null +++ b/database/maintenance.go @@ -0,0 +1,92 @@ +package database + +import ( + "time" + + "github.com/Safing/portbase/database/query" + "github.com/Safing/portbase/database/record" +) + +// Maintain runs the Maintain method on all storages. +func Maintain() (err error) { + controllers := duplicateControllers() + for _, c := range controllers { + err = c.Maintain() + if err != nil { + return + } + } + return +} + +// MaintainThorough runs the MaintainThorough method on all storages. +func MaintainThorough() (err error) { + all := duplicateControllers() + for _, c := range all { + err = c.MaintainThorough() + if err != nil { + return + } + } + return +} + +// MaintainRecordStates runs record state lifecycle maintenance on all storages. +func MaintainRecordStates() error { + all := duplicateControllers() + now := time.Now().Unix() + thirtyDaysAgo := time.Now().Add(-30*24*time.Hour).Unix() + + for _, c := range all { + + if c.ReadOnly() || c.Injected() { + continue + } + + q, err := query.New("").Check() + if err != nil { + return err + } + + it, err := c.Query(q, true, true) + if err != nil { + return err + } + + var toDelete []record.Record + var toExpire []record.Record + + for r := range it.Next { + switch { + case r.Meta().Deleted < thirtyDaysAgo: + toDelete = append(toDelete, r) + case r.Meta().Expires < now: + toExpire = append(toExpire, r) + } + } + if it.Error != nil { + return err + } + + for _, r := range toDelete { + c.storage.Delete(r.DatabaseKey()) + } + for _, r := range toExpire { + r.Meta().Delete() + return c.Put(r) + } + + } + return nil +} + +func duplicateControllers() (all []*Controller) { + controllersLock.Lock() + defer controllersLock.Unlock() + + for _, c := range controllers { + all = append(all, c) + } + + return +} diff --git a/database/record/base.go b/database/record/base.go index c2966bb..44d9b54 100644 --- a/database/record/base.go +++ b/database/record/base.go @@ -52,7 +52,7 @@ func (b *Base) SetMeta(meta *Meta) { } // Marshal marshals the object, without the database key or metadata -func (b *Base) Marshal(format uint8) ([]byte, error) { +func (b *Base) Marshal(self Record, format uint8) ([]byte, error) { if b.Meta() == nil { return nil, errors.New("missing meta") } @@ -61,7 +61,7 @@ func (b *Base) Marshal(format uint8) ([]byte, error) { return nil, nil } - dumped, err := dsd.Dump(b, format) + dumped, err := dsd.Dump(self, format) if err != nil { return nil, err } @@ -69,7 +69,7 @@ func (b *Base) Marshal(format uint8) ([]byte, error) { } // MarshalRecord packs the object, including metadata, into a byte array for saving in a database. -func (b *Base) MarshalRecord() ([]byte, error) { +func (b *Base) MarshalRecord(self Record) ([]byte, error) { if b.Meta() == nil { return nil, errors.New("missing meta") } @@ -85,7 +85,7 @@ func (b *Base) MarshalRecord() ([]byte, error) { c.AppendAsBlock(metaSection) // data - dataSection, err := b.Marshal(dsd.JSON) + dataSection, err := b.Marshal(self, dsd.JSON) if err != nil { return nil, err } diff --git a/database/record/meta.go b/database/record/meta.go index d916816..a2c8bb4 100644 --- a/database/record/meta.go +++ b/database/record/meta.go @@ -79,11 +79,11 @@ func (m *Meta) Delete() { } // CheckValidity checks whether the database record is valid. -func (m *Meta) CheckValidity(now int64) (valid bool) { +func (m *Meta) CheckValidity() (valid bool) { switch { case m.Deleted > 0: return false - case m.Expires < now: + case m.Expires > 0 && m.Expires < time.Now().Unix(): return false default: return true diff --git a/database/record/record.go b/database/record/record.go index c40fca2..a09de4b 100644 --- a/database/record/record.go +++ b/database/record/record.go @@ -11,8 +11,8 @@ type Record interface { Meta() *Meta SetMeta(meta *Meta) - Marshal(format uint8) ([]byte, error) - MarshalRecord() ([]byte, error) + Marshal(r Record, format uint8) ([]byte, error) + MarshalRecord(r Record) ([]byte, error) Lock() Unlock() diff --git a/database/record/wrapper.go b/database/record/wrapper.go index d6352b7..6e7ef69 100644 --- a/database/record/wrapper.go +++ b/database/record/wrapper.go @@ -74,7 +74,7 @@ func NewWrapper(key string, meta *Meta, data []byte) (*Wrapper, error) { } // Marshal marshals the object, without the database key or metadata -func (w *Wrapper) Marshal(storageType uint8) ([]byte, error) { +func (w *Wrapper) Marshal(r Record, storageType uint8) ([]byte, error) { if w.Meta() == nil { return nil, errors.New("missing meta") } @@ -90,7 +90,7 @@ func (w *Wrapper) Marshal(storageType uint8) ([]byte, error) { } // MarshalRecord packs the object, including metadata, into a byte array for saving in a database. -func (w *Wrapper) MarshalRecord() ([]byte, error) { +func (w *Wrapper) MarshalRecord(r Record) ([]byte, error) { // Duplication necessary, as the version from Base would call Base.Marshal instead of Wrapper.Marshal if w.Meta() == nil { @@ -108,7 +108,7 @@ func (w *Wrapper) MarshalRecord() ([]byte, error) { c.AppendAsBlock(metaSection) // data - dataSection, err := w.Marshal(dsd.JSON) + dataSection, err := w.Marshal(r, dsd.JSON) if err != nil { return nil, err } diff --git a/database/record/wrapper_test.go b/database/record/wrapper_test.go index e725916..e2988f0 100644 --- a/database/record/wrapper_test.go +++ b/database/record/wrapper_test.go @@ -30,7 +30,7 @@ func TestWrapper(t *testing.T) { t.Error("data mismatch") } - encoded, err := wrapper.Marshal(dsd.JSON) + encoded, err := wrapper.Marshal(wrapper, dsd.JSON) if err != nil { t.Fatal(err) } @@ -39,7 +39,7 @@ func TestWrapper(t *testing.T) { } wrapper.SetMeta(&Meta{}) - raw, err := wrapper.MarshalRecord() + raw, err := wrapper.MarshalRecord(wrapper) if err != nil { t.Fatal(err) } diff --git a/database/storage/badger/badger.go b/database/storage/badger/badger.go index ebe849a..f685d90 100644 --- a/database/storage/badger/badger.go +++ b/database/storage/badger/badger.go @@ -2,9 +2,12 @@ package badger import ( "errors" + "fmt" + "time" "github.com/dgraph-io/badger" + "github.com/Safing/portbase/database/accessor" "github.com/Safing/portbase/database/iterator" "github.com/Safing/portbase/database/query" "github.com/Safing/portbase/database/record" @@ -38,24 +41,6 @@ func NewBadger(name, location string) (storage.Interface, error) { }, nil } -// Exists returns whether an entry with the given key exists. -func (b *Badger) Exists(key string) (bool, error) { - err := b.db.View(func(txn *badger.Txn) error { - _, err := txn.Get([]byte(key)) - if err != nil { - if err == badger.ErrKeyNotFound { - return nil - } - return err - } - return nil - }) - if err == nil { - return true, nil - } - return false, nil -} - // Get returns a database record. func (b *Badger) Get(key string) (record.Record, error) { var item *badger.Item @@ -75,9 +60,10 @@ func (b *Badger) Get(key string) (record.Record, error) { return nil, err } - if item.IsDeletedOrExpired() { - return nil, storage.ErrNotFound - } + // DO NOT check for this, as we got our own machanism for that. + // if item.IsDeletedOrExpired() { + // return nil, storage.ErrNotFound + // } data, err := item.ValueCopy(nil) if err != nil { @@ -92,14 +78,14 @@ func (b *Badger) Get(key string) (record.Record, error) { } // Put stores a record in the database. -func (b *Badger) Put(m record.Record) error { - data, err := m.MarshalRecord() +func (b *Badger) Put(r record.Record) error { + data, err := r.MarshalRecord(r) if err != nil { return err } err = b.db.Update(func(txn *badger.Txn) error { - return txn.Set([]byte(m.DatabaseKey()), data) + return txn.Set([]byte(r.DatabaseKey()), data) }) return err } @@ -117,7 +103,76 @@ func (b *Badger) Delete(key string) error { // Query returns a an iterator for the supplied query. func (b *Badger) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { - return nil, errors.New("query not implemented by badger") + _, err := q.Check() + if err != nil { + return nil, fmt.Errorf("invalid query: %s", err) + } + + queryIter := iterator.New() + + go b.queryExecutor(queryIter, q, local, internal) + return queryIter, nil +} + +func (b *Badger) queryExecutor(queryIter *iterator.Iterator, q *query.Query, local, internal bool) { + err := b.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + prefix := []byte(q.DatabaseKeyPrefix()) + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + + data, err := item.Value() + if err != nil { + return err + } + + r, err := record.NewRawWrapper(b.name, string(item.Key()), data) + if err != nil { + return err + } + + if !r.Meta().CheckValidity() { + continue + } + if !r.Meta().CheckPermission(local, internal) { + continue + } + + if len(r.Data) > 1 { + jsonData := r.Data[1:] + acc := accessor.NewJSONBytesAccessor(&jsonData) + if q.Matches(acc) { + + copiedData, err := item.ValueCopy(nil) + if err != nil { + return err + } + new, err := record.NewRawWrapper(b.name, string(item.Key()), copiedData) + if err != nil { + return err + } + select { + case queryIter.Next <- new: + default: + select { + case queryIter.Next <- new: + case <-time.After(1 * time.Minute): + return errors.New("query timeout") + } + } + + } + } + + } + return nil + }) + + if err != nil { + queryIter.Error = err + } + close(queryIter.Next) } // ReadOnly returns whether the database is read only. @@ -125,6 +180,11 @@ func (b *Badger) ReadOnly() bool { return false } +// Injected returns whether the database is injected. +func (b *Badger) Injected() bool { + return false +} + // Maintain runs a light maintenance operation on the database. func (b *Badger) Maintain() error { b.db.RunValueLogGC(0.7) diff --git a/database/storage/badger/badger_test.go b/database/storage/badger/badger_test.go index cc0f2f8..ee22fb2 100644 --- a/database/storage/badger/badger_test.go +++ b/database/storage/badger/badger_test.go @@ -3,6 +3,7 @@ package badger import ( "io/ioutil" "os" + "reflect" "sync" "testing" @@ -46,7 +47,22 @@ func TestBadger(t *testing.T) { t.Fatal(err) } - a := &TestRecord{S: "banana"} + a := &TestRecord{ + S: "banana", + I: 42, + I8: 42, + I16: 42, + I32: 42, + I64: 42, + UI: 42, + UI8: 42, + UI16: 42, + UI32: 42, + UI64: 42, + F32: 42.42, + F64: 42.42, + B: true, + } a.SetMeta(&record.Meta{}) a.Meta().Update() a.SetKey("test:A") @@ -61,9 +77,38 @@ func TestBadger(t *testing.T) { t.Fatal(err) } - a1 := r1.(*TestRecord) + a1 := &TestRecord{} + _, err = record.Unwrap(r1, a1) + if err != nil { + t.Fatal(err) + } - if a.S != a1.S { - t.Fatal("mismatch") + if !reflect.DeepEqual(a, a1) { + t.Fatalf("mismatch, got %v", a1) + } + + err = db.Delete("A") + if err != nil { + t.Fatal(err) + } + + _, err = db.Get("A") + if err == nil { + t.Fatal("should fail") + } + + err = db.Maintain() + if err != nil { + t.Fatal(err) + } + + err = db.MaintainThorough() + if err != nil { + t.Fatal(err) + } + + err = db.Shutdown() + if err != nil { + t.Fatal(err) } } diff --git a/database/storage/interface.go b/database/storage/interface.go index b5bd803..73b1a5f 100644 --- a/database/storage/interface.go +++ b/database/storage/interface.go @@ -14,6 +14,7 @@ type Interface interface { Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) ReadOnly() bool + Injected() bool Maintain() error MaintainThorough() error Shutdown() error