Merge pull request #78 from safing/feature/runtime-db

Add runtime module providing a simple injected DB
This commit is contained in:
Patrick Pacher 2020-09-18 12:00:46 +02:00 committed by GitHub
commit 75ab99d681
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 683 additions and 3 deletions

View file

@ -9,6 +9,22 @@ import (
"github.com/safing/portbase/formats/dsd"
)
// TODO(ppacher):
// we can reduce the record.Record interface a lot by moving
// most of those functions that require the Record as it's first
// parameter to static package functions
// (i.e. Marshal, MarshalRecord, GetAccessor, ...).
// We should also consider given Base a GetBase() *Base method
// that returns itself. This way we can remove almost all Base
// only methods from the record.Record interface. That is, we can
// remove all those CreateMeta, UpdateMeta, ... stuff from the
// interface definition (not the actual functions!). This would make
// the record.Record interface slim and only provide methods that
// most users actually need. All those database/storage related methods
// can still be accessed by using GetBase().XXX() instead. We can also
// expose the dbName and dbKey and meta properties directly which would
// make a nice JSON blob when marshalled.
// Base provides a quick way to comply with the Model interface.
type Base struct {
dbName string

View file

@ -7,8 +7,8 @@ import (
// ParseKey splits a key into it's database name and key parts.
func ParseKey(key string) (dbName, dbKey string) {
splitted := strings.SplitN(key, ":", 2)
if len(splitted) == 2 {
return splitted[0], splitted[1]
if len(splitted) < 2 {
return splitted[0], ""
}
return splitted[0], ""
return splitted[0], strings.Join(splitted[1:], ":")
}

42
runtime/module_api.go Normal file
View file

@ -0,0 +1,42 @@
package runtime
import (
"github.com/safing/portbase/database"
"github.com/safing/portbase/modules"
)
var (
// DefaultRegistry is the default registry
// that is used by the module-level API.
DefaultRegistry = NewRegistry()
module *modules.Module
)
func init() {
module = modules.Register("runtime", nil, startModule, nil, "database")
}
func startModule() error {
_, err := database.Register(&database.Database{
Name: "runtime",
Description: "Runtime database",
StorageType: "injected",
PrimaryAPI: "",
})
if err != nil {
return err
}
if err := DefaultRegistry.InjectAsDatabase("runtime"); err != nil {
return err
}
return nil
}
// Register is like Registry.Register but uses
// the package DefaultRegistry.
func Register(key string, provider ValueProvider) (PushFunc, error) {
return DefaultRegistry.Register(key, provider)
}

66
runtime/provider.go Normal file
View file

@ -0,0 +1,66 @@
package runtime
import (
"errors"
"github.com/safing/portbase/database/record"
)
var (
// ErrReadOnly should be returned from ValueProvider.Set if a
// runtime record is considered read-only.
ErrReadOnly = errors.New("runtime record is read-only")
// ErrWriteOnly should be returned from ValueProvider.Get if
// a runtime record is considered write-only.
ErrWriteOnly = errors.New("runtime record is write-only")
)
type (
// PushFunc is returned when registering a new value provider
// and can be used to inform the database system about the
// availability of a new runtime record value.
PushFunc func(...record.Record)
// ValueProvider provides access to a runtime-computed
// database record.
ValueProvider interface {
// Set is called when the value is set from outside.
// If the runtime value is considered read-only ErrReadOnly
// should be returned. It is guaranteed that the key of
// the record passed to Set is prefixed with the key used
// to register the value provider.
Set(record.Record) (record.Record, error)
// Get should return one or more records that match keyOrPrefix.
// keyOrPrefix is guaranteed to be at least the prefix used to
// register the ValueProvider.
Get(keyOrPrefix string) ([]record.Record, error)
}
// SimpleValueSetterFunc is a convenience type for implementing a
// write-only value provider.
SimpleValueSetterFunc func(record.Record) error
// SimpleValueGetterFunc is a convenience type for implementing a
// read-only value provider.
SimpleValueGetterFunc func(keyOrPrefix string) ([]record.Record, error)
)
// Set implements ValueProvider.Set and calls fn.
func (fn SimpleValueSetterFunc) Set(r record.Record) error {
return fn(r)
}
// Get implements ValueProvider.Get and returns ErrWriteOnly.
func (SimpleValueSetterFunc) Get(_ string) ([]record.Record, error) {
return nil, ErrWriteOnly
}
// Set implements ValueProvider.Set and returns ErrReadOnly.
func (SimpleValueGetterFunc) Set(r record.Record) error {
return ErrReadOnly
}
// Get implements ValueProvider.Get and calls fn.
func (fn SimpleValueGetterFunc) Get(keyOrPrefix string) ([]record.Record, error) {
return fn(keyOrPrefix)
}

294
runtime/registry.go Normal file
View file

@ -0,0 +1,294 @@
package runtime
import (
"errors"
"fmt"
"strings"
"sync"
"github.com/armon/go-radix"
"github.com/safing/portbase/database"
"github.com/safing/portbase/database/iterator"
"github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record"
"github.com/safing/portbase/database/storage"
"golang.org/x/sync/errgroup"
)
var (
// ErrKeyTaken is returned when trying to register
// a value provider at database key or prefix that
// is already occupied by another provider.
ErrKeyTaken = errors.New("runtime key or prefix already used")
// ErrKeyUnmanaged is returned when a Put operation
// on an unmanaged key is performed.
ErrKeyUnmanaged = errors.New("runtime key not managed by any provider")
// ErrInjected is returned by Registry.InjectAsDatabase
// if the registry has already been injected.
ErrInjected = errors.New("registry already injected")
)
// Registry keeps track of registered runtime
// value providers and exposes them via an
// injected database. Users normally just need
// to use the defaul registry provided by this
// package but may consider creating a dedicated
// runtime registry on their own. Registry uses
// a radix tree for value providers and their
// choosen database key/prefix.
type Registry struct {
l sync.RWMutex
providers *radix.Tree
dbController *database.Controller
}
// keyedValueProvider simply wraps a value provider with it's
// registration prefix.
type keyedValueProvider struct {
ValueProvider
key string
}
// NewRegistry returns a new registry.
func NewRegistry() *Registry {
return &Registry{
providers: radix.New(),
}
}
func isPrefixKey(key string) bool {
return strings.HasSuffix(key, "/")
}
// InjectAsDatabase injects the registry as the storage
// database for name.
func (r *Registry) InjectAsDatabase(name string) error {
r.l.Lock()
defer r.l.Unlock()
if r.dbController != nil {
return ErrInjected
}
ctrl, err := database.InjectDatabase(name, r.asStorage())
if err != nil {
return err
}
r.dbController = ctrl
return nil
}
// Register registers a new value provider p under keyOrPrefix. The
// returned PushFunc can be used to send update notitifcations to
// database subscribers. Note that keyOrPrefix must end in '/' to be
// accepted as a prefix.
func (r *Registry) Register(keyOrPrefix string, p ValueProvider) (PushFunc, error) {
r.l.Lock()
defer r.l.Unlock()
// search if there's a provider registered for a prefix
// that matches or is equal to keyOrPrefix.
key, _, ok := r.providers.LongestPrefix(keyOrPrefix)
if ok && (isPrefixKey(key) || key == keyOrPrefix) {
return nil, fmt.Errorf("%w: found provider on %s", ErrKeyTaken, key)
}
// if keyOrPrefix is a prefix there must not be any provider
// registered for a key that matches keyOrPrefix.
if isPrefixKey(keyOrPrefix) {
foundProvider := ""
r.providers.WalkPrefix(keyOrPrefix, func(s string, _ interface{}) bool {
foundProvider = s
return true
})
if foundProvider != "" {
return nil, fmt.Errorf("%w: found provider on %s", ErrKeyTaken, foundProvider)
}
}
r.providers.Insert(keyOrPrefix, &keyedValueProvider{
ValueProvider: p,
key: keyOrPrefix,
})
return func(records ...record.Record) {
r.l.RLock()
defer r.l.RUnlock()
if r.dbController == nil {
return
}
for _, rec := range records {
r.dbController.PushUpdate(rec)
}
}, nil
}
// Get returns the runtime value that is identified by key.
// It implements the storage.Interface.
func (r *Registry) Get(key string) (record.Record, error) {
provider := r.getMatchingProvider(key)
if provider == nil {
return nil, nil
}
records, err := provider.Get(key)
if err != nil {
return nil, err
}
// Get performs an exact match so filter out
// and values that do not match key.
for _, r := range records {
if r.DatabaseKey() == key {
return r, nil
}
}
return nil, nil
}
// Put stores the record m in the runtime database. Note that
// ErrReadOnly is returned if there's no value provider responsible
// for m.Key().
func (r *Registry) Put(m record.Record) (record.Record, error) {
provider := r.getMatchingProvider(m.DatabaseKey())
if provider == nil {
// if there's no provider for the given value
// return ErrKeyUnmanaged.
return nil, ErrKeyUnmanaged
}
res, err := provider.Set(m)
if err != nil {
return nil, err
}
return res, nil
}
// Query performs a query on the runtime registry returning all
// records across all value providers that match q.
// Query implements the storage.Storage interface.
func (r *Registry) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
if _, err := q.Check(); err != nil {
return nil, fmt.Errorf("invalid query: %w", err)
}
searchPrefix := q.DatabaseKeyPrefix()
providers := r.collectProviderByPrefix(searchPrefix)
if len(providers) == 0 {
return nil, fmt.Errorf("%w: for key %s", ErrKeyUnmanaged, searchPrefix)
}
iter := iterator.New()
grp := new(errgroup.Group)
for idx := range providers {
p := providers[idx]
grp.Go(func() (err error) {
defer recovery(&err)
key := p.key
if len(searchPrefix) > len(key) {
key = searchPrefix
}
records, err := p.Get(key)
if err != nil {
return err
}
for _, r := range records {
// TODO(ppacher): do we need to lock r?
// storage/hashmap does not lock the records
// before sending them to the iterator but
// better make sure that's correct.
if !strings.HasPrefix(r.DatabaseKey(), searchPrefix) {
continue
}
if !r.Meta().CheckValidity() {
continue
}
if !r.Meta().CheckPermission(local, internal) {
continue
}
if !q.MatchesRecord(r) {
continue
}
select {
case iter.Next <- r:
case <-iter.Done:
return nil
}
}
return nil
})
}
go func() {
err := grp.Wait()
iter.Finish(err)
}()
return iter, nil
}
func (r *Registry) getMatchingProvider(key string) *keyedValueProvider {
r.l.RLock()
defer r.l.RUnlock()
providerKey, provider, ok := r.providers.LongestPrefix(key)
if !ok {
return nil
}
if !isPrefixKey(providerKey) && providerKey != key {
return nil
}
return provider.(*keyedValueProvider)
}
func (r *Registry) collectProviderByPrefix(prefix string) []*keyedValueProvider {
r.l.RLock()
defer r.l.RUnlock()
// if there's a LongestPrefix provider that's the only one
// we need to ask
if _, p, ok := r.providers.LongestPrefix(prefix); ok {
return []*keyedValueProvider{p.(*keyedValueProvider)}
}
var providers []*keyedValueProvider
r.providers.WalkPrefix(prefix, func(key string, p interface{}) bool {
providers = append(providers, p.(*keyedValueProvider))
return false
})
return providers
}
// asStorage returns a storage.Interface compatible struct
// that is backed by r.
func (r *Registry) asStorage() storage.Interface {
return &storageWrapper{
Registry: r,
}
}
func recovery(err *error) {
if x := recover(); x != nil {
if e, ok := x.(error); ok {
*err = e
return
}
*err = fmt.Errorf("%v", x)
}
}

150
runtime/registry_test.go Normal file
View file

@ -0,0 +1,150 @@
package runtime
import (
"errors"
"sync"
"testing"
"github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type testRecord struct {
record.Base
sync.Mutex
Value string
}
func makeTestRecord(key, value string) record.Record {
r := &testRecord{Value: value}
r.CreateMeta()
r.SetKey("runtime:" + key)
return r
}
type testProvider struct {
k string
r []record.Record
}
func (tp *testProvider) Get(key string) ([]record.Record, error) {
return tp.r, nil
}
func (tp *testProvider) Set(r record.Record) (record.Record, error) {
return nil, errors.New("not implemented")
}
func getTestRegistry(t *testing.T) *Registry {
t.Helper()
r := NewRegistry()
providers := []testProvider{
testProvider{
k: "p1/",
r: []record.Record{
makeTestRecord("p1/f1/v1", "p1.1"),
makeTestRecord("p1/f2/v2", "p1.2"),
makeTestRecord("p1/v3", "p1.3"),
},
},
testProvider{
k: "p2/f1",
r: []record.Record{
makeTestRecord("p2/f1/v1", "p2.1"),
makeTestRecord("p2/f1/f2/v2", "p2.2"),
makeTestRecord("p2/f1/v3", "p2.3"),
},
},
}
for idx := range providers {
p := providers[idx]
_, err := r.Register(p.k, &p)
require.NoError(t, err)
}
return r
}
func TestRegistryGet(t *testing.T) {
var (
r record.Record
err error
)
reg := getTestRegistry(t)
r, err = reg.Get("p1/f1/v1")
require.NoError(t, err)
require.NotNil(t, r)
assert.Equal(t, "p1.1", r.(*testRecord).Value)
r, err = reg.Get("p1/v3")
require.NoError(t, err)
require.NotNil(t, r)
assert.Equal(t, "p1.3", r.(*testRecord).Value)
r, err = reg.Get("p1/v4")
require.NoError(t, err)
assert.Nil(t, r)
r, err = reg.Get("no-provider/foo")
require.NoError(t, err)
assert.Nil(t, r)
}
func TestRegistryQuery(t *testing.T) {
reg := getTestRegistry(t)
q := query.New("runtime:p")
iter, err := reg.Query(q, true, true)
require.NoError(t, err)
require.NotNil(t, iter)
var records []record.Record
for r := range iter.Next {
records = append(records, r)
}
assert.Len(t, records, 6)
q = query.New("runtime:p1/f")
iter, err = reg.Query(q, true, true)
require.NoError(t, err)
require.NotNil(t, iter)
records = nil
for r := range iter.Next {
records = append(records, r)
}
assert.Len(t, records, 2)
}
func TestRegistryRegister(t *testing.T) {
r := NewRegistry()
cases := []struct {
inp string
err bool
}{
{"runtime:foo/bar/bar", false},
{"runtime:foo/bar/bar2", false},
{"runtime:foo/bar", false},
{"runtime:foo/bar", true}, // already used
{"runtime:foo/bar/", true}, // cannot register a prefix if there are providers below
{"runtime:foo/baz/", false},
{"runtime:foo/baz2/", false},
{"runtime:foo/baz3", false},
{"runtime:foo/baz/bar", true},
}
for _, c := range cases {
_, err := r.Register(c.inp, nil)
if c.err {
assert.Error(t, err, c.inp)
} else {
assert.NoError(t, err, c.inp)
}
}
}

View file

@ -0,0 +1,45 @@
package runtime
import "github.com/safing/portbase/database/record"
// singleRecordReader is a convenience type for read-only exposing
// a single record.Record. Note that users must lock the whole record
// themself before performing any manipulation on the record.
type singleRecordReader struct {
record.Record
}
// ProvideRecord returns a ValueProvider the exposes read-only
// access to r. Users of ProvideRecord need to ensure the lock
// the whole record before performing modifications on it.
//
// Example:
//
// type MyValue struct {
// record.Base
// Value string
// }
// r := new(MyValue)
// pushUpdate, _ := runtime.Register("my/key", ProvideRecord(r))
// r.Lock()
// r.Value = "foobar"
// r.Unlock()
// pushUpdate(r)
//
func ProvideRecord(r record.Record) ValueProvider {
return &singleRecordReader{r}
}
// Set implements ValueProvider.Set and returns ErrReadOnly.
func (sr *singleRecordReader) Set(_ record.Record) (record.Record, error) {
return nil, ErrReadOnly
}
// Get implements ValueProvider.Get and returns the wrapped record.Record
// but only if keyOrPrefix exactly matches the records database key.
func (sr *singleRecordReader) Get(keyOrPrefix string) ([]record.Record, error) {
if keyOrPrefix != sr.Record.Key() {
return nil, nil
}
return []record.Record{sr.Record}, nil
}

30
runtime/storage.go Normal file
View file

@ -0,0 +1,30 @@
package runtime
import (
"github.com/safing/portbase/database/iterator"
"github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record"
"github.com/safing/portbase/database/storage"
)
// storageWrapper is a simple wrapper around storage.InjectBase and
// Registry and make sure the supported methods are handled by
// the registry rather than the InjectBase defaults.
// storageWrapper is mainly there to keep the method landscape of
// Registry as small as possible.
type storageWrapper struct {
storage.InjectBase
Registry *Registry
}
func (sw *storageWrapper) Get(key string) (record.Record, error) {
return sw.Registry.Get(key)
}
func (sw *storageWrapper) Put(r record.Record) (record.Record, error) {
return sw.Registry.Put(r)
}
func (sw *storageWrapper) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
return sw.Registry.Query(q, local, internal)
}

37
runtime/trace_provider.go Normal file
View file

@ -0,0 +1,37 @@
package runtime
import (
"time"
"github.com/safing/portbase/database/record"
"github.com/safing/portbase/log"
)
// traceValueProvider can be used to wrap an
// existing value provider to trace an calls to
// their Set and Get methods.
type traceValueProvider struct {
ValueProvider
}
// TraceProvider returns a new ValueProvider that wraps
// vp but traces all Set and Get methods calls.
func TraceProvider(vp ValueProvider) ValueProvider {
return &traceValueProvider{vp}
}
func (tvp *traceValueProvider) Set(r record.Record) (res record.Record, err error) {
defer func(start time.Time) {
log.Tracef("runtime: settings record %q: duration=%s err=%v", r.Key(), time.Since(start), err)
}(time.Now())
return tvp.ValueProvider.Set(r)
}
func (tvp *traceValueProvider) Get(keyOrPrefix string) (records []record.Record, err error) {
defer func(start time.Time) {
log.Tracef("runtime: loading records %q: duration=%s err=%v #records=%d", keyOrPrefix, time.Since(start), err, len(records))
}(time.Now())
return tvp.ValueProvider.Get(keyOrPrefix)
}