Start work on hooks and subscriptions

This commit is contained in:
Daniel 2018-09-13 21:54:20 +02:00
parent 1c7f98d9ba
commit 2c70416873
14 changed files with 482 additions and 177 deletions

View file

@ -0,0 +1,64 @@
package database
import (
"fmt"
"sync"
"github.com/Safing/portbase/database/record"
)
type Example struct {
record.Base
sync.Mutex
Name string
Score int
}
var (
exampleDB = NewInterface(nil)
)
// GetExample gets an Example from the database.
func GetExample(key string) (*Example, error) {
r, err := exampleDB.Get(key)
if err != nil {
return nil, err
}
// unwrap
if r.IsWrapped() {
// only allocate a new struct, if we need it
new := &Example{}
err = record.Unwrap(r, new)
if err != nil {
return nil, err
}
return new, nil
}
// or adjust type
new, ok := r.(*Example)
if !ok {
return nil, fmt.Errorf("record not of type *Example, but %T", r)
}
return new, nil
}
func (e *Example) Save() error {
return exampleDB.Put(e)
}
func (e *Example) SaveAs(key string) error {
e.SetKey(key)
return exampleDB.PutNew(e)
}
func NewExample(key, name string, score int) *Example {
new := &Example{
Name: name,
Score: score,
}
new.SetKey(key)
return new
}

View file

@ -14,6 +14,10 @@ import (
// A Controller takes care of all the extra database logic.
type Controller struct {
storage storage.Interface
hooks []*RegisteredHook
subscriptions []*Subscription
writeLock sync.RWMutex
readLock sync.RWMutex
migrating *abool.AtomicBool // TODO
@ -45,6 +49,19 @@ func (c *Controller) Get(key string) (record.Record, error) {
return nil, ErrShuttingDown
}
c.readLock.RLock()
defer c.readLock.RUnlock()
// process hooks
for _, hook := range c.hooks {
if hook.q.MatchesKey(key) {
err := hook.hook.PreGet(key)
if err != nil {
return nil, err
}
}
}
r, err := c.storage.Get(key)
if err != nil {
// replace not found error
@ -57,6 +74,16 @@ func (c *Controller) Get(key string) (record.Record, error) {
r.Lock()
defer r.Unlock()
// process hooks
for _, hook := range c.hooks {
if hook.q.Matches(r) {
r, err = hook.hook.PostGet(r)
if err != nil {
return nil, err
}
}
}
if !r.Meta().CheckValidity() {
return nil, ErrNotFound
}
@ -65,7 +92,7 @@ func (c *Controller) Get(key string) (record.Record, error) {
}
// Put saves a record in the database.
func (c *Controller) Put(r record.Record) error {
func (c *Controller) Put(r record.Record) (err error) {
if shuttingDown.IsSet() {
return ErrShuttingDown
}
@ -74,12 +101,40 @@ func (c *Controller) Put(r record.Record) error {
return ErrReadOnly
}
r.Lock()
defer r.Unlock()
// process hooks
for _, hook := range c.hooks {
if hook.q.Matches(r) {
r, err = hook.hook.PrePut(r)
if err != nil {
return err
}
}
}
if r.Meta() == nil {
r.SetMeta(&record.Meta{})
}
r.Meta().Update()
return c.storage.Put(r)
c.writeLock.RLock()
defer c.writeLock.RUnlock()
err = c.storage.Put(r)
if err != nil {
return err
}
// process hooks
for _, hook := range c.hooks {
if hook.q.Matches(r) {
hook.hook.PostPut(r)
}
}
return nil
}
// Query executes the given query on the database.
@ -87,20 +142,39 @@ func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iter
if shuttingDown.IsSet() {
return nil, ErrShuttingDown
}
return c.storage.Query(q, local, internal)
c.readLock.RLock()
it, err := c.storage.Query(q, local, internal)
if err != nil {
c.readLock.RUnlock()
return nil, err
}
go c.readUnlockerAfterQuery(it)
return it, nil
}
func (c *Controller) readUnlockerAfterQuery(it *iterator.Iterator) {
<- it.Done
c.readLock.RUnlock()
}
// Maintain runs the Maintain method no the storage.
func (c *Controller) Maintain() error {
c.writeLock.RLock()
defer c.writeLock.RUnlock()
return c.storage.Maintain()
}
// MaintainThorough runs the MaintainThorough method no the storage.
func (c *Controller) MaintainThorough() error {
c.writeLock.RLock()
defer c.writeLock.RUnlock()
return c.storage.MaintainThorough()
}
// Shutdown shuts down the storage.
func (c *Controller) Shutdown() error {
// TODO: should we wait for gets/puts/queries to complete?
return c.storage.Shutdown()
}

View file

@ -3,41 +3,17 @@ package database
import (
"fmt"
"io/ioutil"
"log"
"os"
"reflect"
"runtime/pprof"
"sync"
"testing"
"time"
"github.com/Safing/portbase/database/record"
q "github.com/Safing/portbase/database/query"
_ "github.com/Safing/portbase/database/storage/badger"
)
type TestRecord struct {
record.Base
lock sync.Mutex
S string
I int
I8 int8
I16 int16
I32 int32
I64 int64
UI uint
UI8 uint8
UI16 uint16
UI32 uint32
UI64 uint64
F32 float32
F64 float64
B bool
}
func (tr *TestRecord) Lock() {
}
func (tr *TestRecord) Unlock() {
}
func makeKey(dbName, key string) string {
return fmt.Sprintf("%s:%s", dbName, key)
}
@ -56,26 +32,20 @@ func testDatabase(t *testing.T, storageType string) {
db := NewInterface(nil)
new := &TestRecord{
S: "banana",
I: 42,
I8: 42,
I16: 42,
I32: 42,
I64: 42,
UI: 42,
UI8: 42,
UI16: 42,
UI32: 42,
UI64: 42,
F32: 42.42,
F64: 42.42,
B: true,
A := NewExample(makeKey(dbName, "A"), "Herbert", 411)
err = A.Save()
if err != nil {
t.Fatal(err)
}
new.SetMeta(&record.Meta{})
new.Meta().Update()
new.SetKey(makeKey(dbName, "A"))
err = db.Put(new)
B := NewExample(makeKey(dbName, "B"), "Fritz", 347)
err = B.Save()
if err != nil {
t.Fatal(err)
}
C := NewExample(makeKey(dbName, "C"), "Norbert", 217)
err = C.Save()
if err != nil {
t.Fatal(err)
}
@ -88,10 +58,40 @@ func testDatabase(t *testing.T, storageType string) {
t.Fatalf("record %s should exist!", makeKey(dbName, "A"))
}
_, err = db.Get(makeKey(dbName, "A"))
A1, err := GetExample(makeKey(dbName, "A"))
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(A, A1) {
log.Fatalf("A and A1 mismatch, A1: %v", A1)
}
query, err := q.New(dbName).Where(
q.And(
q.Where("Name", q.EndsWith, "bert"),
q.Where("Score", q.GreaterThan, 100),
),
).Check()
if err != nil {
t.Fatal(err)
}
it, err := db.Query(query)
if err != nil {
t.Fatal(err)
}
cnt := 0
for _ = range it.Next {
cnt++
}
if it.Error != nil {
t.Fatal(it.Error)
}
if cnt != 2 {
t.Fatal("expected two records")
}
}
func TestDatabaseSystem(t *testing.T) {

View file

@ -3,98 +3,63 @@
/*
Package database provides a universal interface for interacting with the database.
The Lazy Database
A Lazy Database
The database system can handle Go structs as well as serialized data by the dsd package.
While data is in transit within the system, it does not know which form it currently has. Only when it reaches its destination, it must ensure that it is either of a certain type or dump it.
Internals
Record Interface
The database system uses the Model interface to transparently handle all types of structs that get saved in the database. Structs include Base struct to fulfill most parts of the Model interface.
The database system uses the Record interface to transparently handle all types of structs that get saved in the database. Structs include the Base struct to fulfill most parts of the Record interface.
Boilerplate Code
Boilerplate Code:
Receiving model, using as struct:
type Example struct {
record.Base
sync.Mutex
// At some point, declare a pointer to your model.
// This is only used to identify the model, so you can reuse it safely for this purpose
var cowModel *Cow // only use this as parameter for database.EnsureModel-like functions
receivedModel := <- models // chan database.Model
cow, ok := database.SilentEnsureModel(receivedModel, cowModel).(*Cow)
if !ok {
panic("received model does not match expected model")
}
// more verbose, in case you need better error handling
receivedModel := <- models // chan database.Model
genericModel, err := database.EnsureModel(receivedModel, cowModel)
if err != nil {
panic(err)
}
cow, ok := genericModel.(*Cow)
if !ok {
panic("received model does not match expected model")
}
Receiving a model, dumping:
// receivedModel <- chan database.Model
bytes, err := database.DumpModel(receivedModel, dsd.JSON) // or other dsd format
if err != nil {
panic(err)
}
Model definition:
// Cow makes moo.
type Cow struct {
database.Base
// Fields...
}
var cowModel *Cow // only use this as parameter for database.EnsureModel-like functions
func init() {
database.RegisterModel(cowModel, func() database.Model { return new(Cow) })
}
// this all you need, but you might find the following code helpful:
var cowNamespace = datastore.NewKey("/Cow")
// Create saves Cow with the provided name in the default namespace.
func (m *Cow) Create(name string) error {
return m.CreateObject(&cowNamespace, name, m)
}
// CreateInNamespace saves Cow with the provided name in the provided namespace.
func (m *Cow) CreateInNamespace(namespace *datastore.Key, name string) error {
return m.CreateObject(namespace, name, m)
}
// Save saves Cow.
func (m *Cow) Save() error {
return m.SaveObject(m)
}
// GetCow fetches Cow with the provided name from the default namespace.
func GetCow(name string) (*Cow, error) {
return GetCowFromNamespace(&cowNamespace, name)
}
// GetCowFromNamespace fetches Cow with the provided name from the provided namespace.
func GetCowFromNamespace(namespace *datastore.Key, name string) (*Cow, error) {
object, err := database.GetAndEnsureModel(namespace, name, cowModel)
if err != nil {
return nil, err
Name string
Score int
}
model, ok := object.(*Cow)
if !ok {
return nil, database.NewMismatchError(object, cowModel)
var (
db = database.NewInterface(nil)
)
// GetExample gets an Example from the database.
func GetExample(key string) (*Example, error) {
r, err := db.Get(key)
if err != nil {
return nil, err
}
// unwrap
if r.IsWrapped() {
// only allocate a new struct, if we need it
new := &Example{}
err = record.Unwrap(r, new)
if err != nil {
return nil, err
}
return new, nil
}
// or adjust type
new, ok := r.(*Example)
if !ok {
return nil, fmt.Errorf("record not of type *Example, but %T", r)
}
return new, nil
}
func (e *Example) Save() error {
return db.Put(e)
}
func (e *Example) SaveAs(key string) error {
e.SetKey(key)
return db.PutNew(e)
}
return model, nil
}
*/
package database

72
database/hook.go Normal file
View file

@ -0,0 +1,72 @@
package database
import (
"github.com/Safing/portbase/database/query"
"github.com/Safing/portbase/database/record"
)
// Hook describes a hook
type Hook interface {
UsesPreGet() bool
PreGet(dbKey string) error
UsesPostGet() bool
PostGet(r record.Record) (record.Record, error)
UsesPrePut() bool
PrePut(r record.Record) (record.Record, error)
UsesPostPut() bool
PostPut(r record.Record)
}
// RegisteredHook is a registered database hook.
type RegisteredHook struct {
q *query.Query
hook Hook
}
// RegisterHook registeres a hook for records matching the given query in the database.
func RegisterHook(q *query.Query, hook Hook) error {
_, err := q.Check()
if err != nil {
return err
}
c, err := getController(q.DatabaseName())
if err != nil {
return err
}
c.readLock.Lock()
defer c.readLock.Lock()
c.writeLock.Lock()
defer c.writeLock.Unlock()
c.hooks = append(c.hooks, &RegisteredHook{
q: q,
hook: hook,
})
return nil
}
// Cancel unhooks the hook.
func (h *RegisteredHook) Cancel() error {
c, err := getController(h.q.DatabaseName())
if err != nil {
return err
}
c.readLock.Lock()
defer c.readLock.Lock()
c.writeLock.Lock()
defer c.writeLock.Unlock()
for key, hook := range c.hooks {
if hook.q == h.q {
c.hooks = append(c.hooks[:key], c.hooks[key+1:]...)
return nil
}
}
return nil
}

29
database/hookbase.go Normal file
View file

@ -0,0 +1,29 @@
package database
import (
"github.com/Safing/portbase/database/record"
)
// HookBase implements the Hook interface.
type HookBase struct {
}
// PreGet implements the Hook interface.
func (b *HookBase) PreGet(dbKey string) error {
return nil
}
// PostGet implements the Hook interface.
func (b *HookBase) PostGet(r record.Record) (record.Record, error) {
return r, nil
}
// PrePut implements the Hook interface.
func (b *HookBase) PrePut(r record.Record) (record.Record, error) {
return r, nil
}
// PostPut implements the Hook interface.
func (b *HookBase) PostPut(r record.Record) {
return
}

View file

@ -7,6 +7,7 @@ import (
// Iterator defines the iterator structure.
type Iterator struct {
Next chan record.Record
Done chan struct{}
Error error
}
@ -14,5 +15,6 @@ type Iterator struct {
func New() *Iterator {
return &Iterator{
Next: make(chan record.Record, 10),
Done: make(chan struct{}),
}
}

View file

@ -4,7 +4,6 @@ import (
"fmt"
"strings"
"github.com/Safing/portbase/database/accessor"
"github.com/Safing/portbase/database/record"
)
@ -74,8 +73,6 @@ func (q *Query) Check() (*Query, error) {
if err != nil {
return nil, err
}
} else {
q.where = &noCond{}
}
q.checked = true
@ -96,8 +93,28 @@ func (q *Query) IsChecked() bool {
return q.checked
}
// MatchesKey checks whether the query matches the supplied database key (key without database prefix).
func (q *Query) MatchesKey(dbKey string) bool {
if !strings.HasPrefix(dbKey, q.dbKeyPrefix) {
return false
}
return true
}
// Matches checks whether the query matches the supplied data object.
func (q *Query) Matches(acc accessor.Accessor) bool {
func (q *Query) Matches(r record.Record) bool {
if !strings.HasPrefix(r.DatabaseKey(), q.dbKeyPrefix) {
return false
}
if q.where == nil {
return false
}
acc := r.GetAccessor(r)
if acc == nil {
return false
}
return q.where.complies(acc)
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/Safing/portbase/container"
"github.com/Safing/portbase/database/accessor"
"github.com/Safing/portbase/formats/dsd"
)
@ -98,3 +99,8 @@ func (b *Base) MarshalRecord(self Record) ([]byte, error) {
func (b *Base) IsWrapped() bool {
return false
}
// GetAccessor returns an accessor for this record, if available.
func (b *Base) GetAccessor(self Record) accessor.Accessor {
return accessor.NewStructAccessor(self)
}

View file

@ -1,5 +1,9 @@
package record
import (
"github.com/Safing/portbase/database/accessor"
)
// Record provides an interface for uniformally handling database records.
type Record interface {
Key() string // test:config
@ -11,8 +15,9 @@ type Record interface {
Meta() *Meta
SetMeta(meta *Meta)
Marshal(r Record, format uint8) ([]byte, error)
MarshalRecord(r Record) ([]byte, error)
Marshal(self Record, format uint8) ([]byte, error)
MarshalRecord(self Record) ([]byte, error)
GetAccessor(self Record) accessor.Accessor
Lock()
Unlock()

View file

@ -6,19 +6,26 @@ import (
"sync"
"github.com/Safing/portbase/container"
"github.com/Safing/portbase/database/accessor"
"github.com/Safing/portbase/formats/dsd"
"github.com/Safing/portbase/formats/varint"
)
// Wrapper wraps raw data and implements the Record interface.
type Wrapper struct {
Base
sync.Mutex
Format uint8
Data []byte
lock sync.Mutex
}
// NewRawWrapper returns a record wrapper for the given data, including metadata. This is normally only used by storage backends when loading records.
func NewRawWrapper(database, key string, data []byte) (*Wrapper, error) {
version, offset, err := varint.Unpack8(data)
if err != nil {
return nil, err
}
if version != 1 {
return nil, fmt.Errorf("incompatible record version: %d", version)
}
@ -46,13 +53,13 @@ func NewRawWrapper(database, key string, data []byte) (*Wrapper, error) {
key,
newMeta,
},
sync.Mutex{},
format,
data[offset:],
sync.Mutex{},
}, nil
}
// NewWrapper returns a new model wrapper for the given data.
// NewWrapper returns a new record wrapper for the given data.
func NewWrapper(key string, meta *Meta, data []byte) (*Wrapper, error) {
format, _, err := varint.Unpack8(data)
if err != nil {
@ -67,9 +74,9 @@ func NewWrapper(key string, meta *Meta, data []byte) (*Wrapper, error) {
dbKey: dbKey,
meta: meta,
},
sync.Mutex{},
format,
data,
sync.Mutex{},
}, nil
}
@ -117,34 +124,44 @@ func (w *Wrapper) MarshalRecord(r Record) ([]byte, error) {
return c.CompileData(), nil
}
// Lock locks the record.
func (w *Wrapper) Lock() {
w.lock.Lock()
}
// Unlock unlocks the record.
func (w *Wrapper) Unlock() {
w.lock.Unlock()
}
// // Lock locks the record.
// func (w *Wrapper) Lock() {
// w.lock.Lock()
// }
//
// // Unlock unlocks the record.
// func (w *Wrapper) Unlock() {
// w.lock.Unlock()
// }
// IsWrapped returns whether the record is a Wrapper.
func (w *Wrapper) IsWrapped() bool {
return true
}
func Unwrap(wrapped, new Record) (Record, error) {
// Unwrap unwraps data into a record.
func Unwrap(wrapped, new Record) error {
wrapper, ok := wrapped.(*Wrapper)
if !ok {
return nil, fmt.Errorf("cannot unwrap %T", wrapped)
return fmt.Errorf("cannot unwrap %T", wrapped)
}
_, err := dsd.Load(wrapper.Data, new)
if err != nil {
return nil, fmt.Errorf("database: failed to unwrap %T: %s", new, err)
return fmt.Errorf("failed to unwrap %T: %s", new, err)
}
new.SetKey(wrapped.Key())
new.SetMeta(wrapped.Meta())
return new, nil
return nil
}
// GetAccessor returns an accessor for this record, if available.
func (w *Wrapper) GetAccessor(self Record) accessor.Accessor {
if len(w.Data) > 1 && w.Data[0] == JSON {
jsonData := w.Data[1:]
return accessor.NewJSONBytesAccessor(&jsonData)
}
return nil
}

View file

@ -7,7 +7,6 @@ import (
"github.com/dgraph-io/badger"
"github.com/Safing/portbase/database/accessor"
"github.com/Safing/portbase/database/iterator"
"github.com/Safing/portbase/database/query"
"github.com/Safing/portbase/database/record"
@ -139,29 +138,24 @@ func (b *Badger) queryExecutor(queryIter *iterator.Iterator, q *query.Query, loc
continue
}
if len(r.Data) > 1 {
jsonData := r.Data[1:]
acc := accessor.NewJSONBytesAccessor(&jsonData)
if q.Matches(acc) {
copiedData, err := item.ValueCopy(nil)
if err != nil {
return err
}
new, err := record.NewRawWrapper(b.name, string(item.Key()), copiedData)
if err != nil {
return err
}
acc := r.GetAccessor(r)
if acc != nil && q.Matches(acc) {
copiedData, err := item.ValueCopy(nil)
if err != nil {
return err
}
new, err := record.NewRawWrapper(b.name, string(item.Key()), copiedData)
if err != nil {
return err
}
select {
case queryIter.Next <- new:
default:
select {
case queryIter.Next <- new:
default:
select {
case queryIter.Next <- new:
case <-time.After(1 * time.Minute):
return errors.New("query timeout")
}
case <-time.After(1 * time.Minute):
return errors.New("query timeout")
}
}
}
@ -173,6 +167,7 @@ func (b *Badger) queryExecutor(queryIter *iterator.Iterator, q *query.Query, loc
queryIter.Error = err
}
close(queryIter.Next)
close(queryIter.Done)
}
// ReadOnly returns whether the database is read only.

View file

@ -78,7 +78,7 @@ func TestBadger(t *testing.T) {
}
a1 := &TestRecord{}
_, err = record.Unwrap(r1, a1)
err = record.Unwrap(r1, a1)
if err != nil {
t.Fatal(err)
}

59
database/subscription.go Normal file
View file

@ -0,0 +1,59 @@
package database
import (
"github.com/Safing/portbase/database/query"
"github.com/Safing/portbase/database/record"
)
// Subscription is a database subscription for updates.
type Subscription struct {
q *query.Query
Feed chan record.Record
Err error
}
// Subscribe subscribes to updates matching the given query.
func Subscribe(q *query.Query) (*Subscription, error) {
_, err := q.Check()
if err != nil {
return nil, err
}
c, err := getController(q.DatabaseName())
if err != nil {
return nil, err
}
c.readLock.Lock()
defer c.readLock.Lock()
c.writeLock.Lock()
defer c.writeLock.Unlock()
sub := &Subscription{
q: q,
Feed: make(chan record.Record, 100),
}
c.subscriptions = append(c.subscriptions, sub)
return sub, nil
}
// Cancel cancels the subscription.
func (s *Subscription) Cancel() error {
c, err := getController(s.q.DatabaseName())
if err != nil {
return err
}
c.readLock.Lock()
defer c.readLock.Lock()
c.writeLock.Lock()
defer c.writeLock.Unlock()
for key, sub := range c.subscriptions {
if sub.q == s.q {
c.subscriptions = append(c.subscriptions[:key], c.subscriptions[key+1:]...)
return nil
}
}
return nil
}