diff --git a/database/storage/bbolt/bbolt.go b/database/storage/bbolt/bbolt.go new file mode 100644 index 0000000..fcb9b56 --- /dev/null +++ b/database/storage/bbolt/bbolt.go @@ -0,0 +1,221 @@ +package bbolt + +import ( + "bytes" + "errors" + "fmt" + "path/filepath" + "time" + + "go.etcd.io/bbolt" + + "github.com/Safing/portbase/database/iterator" + "github.com/Safing/portbase/database/query" + "github.com/Safing/portbase/database/record" + "github.com/Safing/portbase/database/storage" +) + +var ( + bucketName = []byte{0} +) + +// BBolt database made pluggable for portbase. +type BBolt struct { + name string + db *bbolt.DB +} + +func init() { + storage.Register("bbolt", NewBBolt) +} + +// NewBBolt opens/creates a bbolt database. +func NewBBolt(name, location string) (storage.Interface, error) { + + db, err := bbolt.Open(filepath.Join(location, "db.bbolt"), 0600, nil) + if err != nil { + return nil, err + } + + // Create bucket + err = db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(bucketName) + if err != nil { + return err + } + return nil + }) + if err != nil { + return nil, err + } + + return &BBolt{ + name: name, + db: db, + }, nil +} + +// Get returns a database record. +func (b *BBolt) Get(key string) (record.Record, error) { + var r record.Record + + err := b.db.View(func(tx *bbolt.Tx) error { + // get value from db + value := tx.Bucket(bucketName).Get([]byte(key)) + if value == nil { + return storage.ErrNotFound + } + + // copy data + duplicate := make([]byte, len(value)) + copy(duplicate, value) + + // create record + var txErr error + r, txErr = record.NewRawWrapper(b.name, key, duplicate) + if txErr != nil { + return txErr + } + return nil + }) + + if err != nil { + return nil, err + } + return r, nil +} + +// Put stores a record in the database. +func (b *BBolt) Put(r record.Record) error { + data, err := r.MarshalRecord(r) + if err != nil { + return err + } + + err = b.db.Update(func(tx *bbolt.Tx) error { + txErr := tx.Bucket(bucketName).Put([]byte(r.DatabaseKey()), data) + if txErr != nil { + return txErr + } + return nil + }) + if err != nil { + return err + } + return nil +} + +// Delete deletes a record from the database. +func (b *BBolt) Delete(key string) error { + err := b.db.Update(func(tx *bbolt.Tx) error { + txErr := tx.Bucket(bucketName).Delete([]byte(key)) + if txErr != nil { + return txErr + } + return nil + }) + if err != nil { + return err + } + return nil +} + +// Query returns a an iterator for the supplied query. +func (b *BBolt) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { + _, err := q.Check() + if err != nil { + return nil, fmt.Errorf("invalid query: %s", err) + } + + queryIter := iterator.New() + + go b.queryExecutor(queryIter, q, local, internal) + return queryIter, nil +} + +func (b *BBolt) queryExecutor(queryIter *iterator.Iterator, q *query.Query, local, internal bool) { + prefix := []byte(q.DatabaseKeyPrefix()) + err := b.db.View(func(tx *bbolt.Tx) error { + // Create a cursor for iteration. + c := tx.Bucket(bucketName).Cursor() + + // Iterate over items in sorted key order. This starts from the + // first key/value pair and updates the k/v variables to the + // next key/value on each iteration. + // + // The loop finishes at the end of the cursor when a nil key is returned. + for key, value := c.Seek(prefix); key != nil; key, value = c.Next() { + + // if we don't match the prefix anymore, exit + if !bytes.HasPrefix(key, prefix) { + return nil + } + + // wrap value + iterWrapper, err := record.NewRawWrapper(b.name, string(key), value) + if err != nil { + return err + } + + // check validity / access + if !iterWrapper.Meta().CheckValidity() { + continue + } + if !iterWrapper.Meta().CheckPermission(local, internal) { + continue + } + + // check if matches & send + if q.MatchesRecord(iterWrapper) { + // copy data + duplicate := make([]byte, len(value)) + copy(duplicate, value) + + new, err := record.NewRawWrapper(b.name, iterWrapper.DatabaseKey(), duplicate) + if err != nil { + return err + } + select { + case <-queryIter.Done: + return nil + case queryIter.Next <- new: + default: + select { + case <-queryIter.Done: + return nil + case queryIter.Next <- new: + case <-time.After(1 * time.Second): + return errors.New("query timeout") + } + } + } + } + return nil + }) + queryIter.Finish(err) +} + +// ReadOnly returns whether the database is read only. +func (b *BBolt) ReadOnly() bool { + return false +} + +// Injected returns whether the database is injected. +func (b *BBolt) Injected() bool { + return false +} + +// Maintain runs a light maintenance operation on the database. +func (b *BBolt) Maintain() error { + return nil +} + +// MaintainThorough runs a thorough maintenance operation on the database. +func (b *BBolt) MaintainThorough() (err error) { + return nil +} + +// Shutdown shuts down the database. +func (b *BBolt) Shutdown() error { + return b.db.Close() +} diff --git a/database/storage/bbolt/bbolt_test.go b/database/storage/bbolt/bbolt_test.go new file mode 100644 index 0000000..d19a17d --- /dev/null +++ b/database/storage/bbolt/bbolt_test.go @@ -0,0 +1,166 @@ +package bbolt + +import ( + "io/ioutil" + "os" + "reflect" + "sync" + "testing" + + "github.com/Safing/portbase/database/query" + "github.com/Safing/portbase/database/record" +) + +type TestRecord struct { + record.Base + lock sync.Mutex + S string + I int + I8 int8 + I16 int16 + I32 int32 + I64 int64 + UI uint + UI8 uint8 + UI16 uint16 + UI32 uint32 + UI64 uint64 + F32 float32 + F64 float64 + B bool +} + +func (tr *TestRecord) Lock() { +} + +func (tr *TestRecord) Unlock() { +} + +func TestBadger(t *testing.T) { + testDir, err := ioutil.TempDir("", "testing-") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(testDir) // clean up + + // start + db, err := NewBBolt("test", testDir) + if err != nil { + t.Fatal(err) + } + + a := &TestRecord{ + S: "banana", + I: 42, + I8: 42, + I16: 42, + I32: 42, + I64: 42, + UI: 42, + UI8: 42, + UI16: 42, + UI32: 42, + UI64: 42, + F32: 42.42, + F64: 42.42, + B: true, + } + a.SetMeta(&record.Meta{}) + a.Meta().Update() + a.SetKey("test:A") + + // put record + err = db.Put(a) + if err != nil { + t.Fatal(err) + } + + // get and compare + r1, err := db.Get("A") + if err != nil { + t.Fatal(err) + } + + a1 := &TestRecord{} + err = record.Unwrap(r1, a1) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(a, a1) { + t.Fatalf("mismatch, got %v", a1) + } + + // setup query test records + qA := &TestRecord{} + qA.SetKey("test:path/to/A") + qA.CreateMeta() + qB := &TestRecord{} + qB.SetKey("test:path/to/B") + qB.CreateMeta() + qC := &TestRecord{} + qC.SetKey("test:path/to/C") + qC.CreateMeta() + qZ := &TestRecord{} + qZ.SetKey("test:z") + qZ.CreateMeta() + // put + err = db.Put(qA) + if err == nil { + err = db.Put(qB) + } + if err == nil { + err = db.Put(qC) + } + if err == nil { + err = db.Put(qZ) + } + if err != nil { + t.Fatal(err) + } + + // test query + q := query.New("test:path/to/").MustBeValid() + it, err := db.Query(q, true, true) + if err != nil { + t.Fatal(err) + } + cnt := 0 + for _ = range it.Next { + cnt++ + } + if it.Err() != nil { + t.Fatal(it.Err()) + } + if cnt != 3 { + t.Fatalf("unexpected query result count: %d", cnt) + } + + // delete + err = db.Delete("A") + if err != nil { + t.Fatal(err) + } + + // check if its gone + _, err = db.Get("A") + if err == nil { + t.Fatal("should fail") + } + + // maintenance + err = db.Maintain() + if err != nil { + t.Fatal(err) + } + err = db.MaintainThorough() + if err != nil { + t.Fatal(err) + } + + // shutdown + err = db.Shutdown() + if err != nil { + t.Fatal(err) + } +}