diff --git a/database/record/base.go b/database/record/base.go index 8bc1d6b..d00e613 100644 --- a/database/record/base.go +++ b/database/record/base.go @@ -9,6 +9,22 @@ import ( "github.com/safing/portbase/formats/dsd" ) +// TODO(ppacher): +// we can reduce the record.Record interface a lot by moving +// most of those functions that require the Record as it's first +// parameter to static package functions +// (i.e. Marshal, MarshalRecord, GetAccessor, ...). +// We should also consider given Base a GetBase() *Base method +// that returns itself. This way we can remove almost all Base +// only methods from the record.Record interface. That is, we can +// remove all those CreateMeta, UpdateMeta, ... stuff from the +// interface definition (not the actual functions!). This would make +// the record.Record interface slim and only provide methods that +// most users actually need. All those database/storage related methods +// can still be accessed by using GetBase().XXX() instead. We can also +// expose the dbName and dbKey and meta properties directly which would +// make a nice JSON blob when marshalled. + // Base provides a quick way to comply with the Model interface. type Base struct { dbName string diff --git a/database/record/key.go b/database/record/key.go index b02eecf..0dfb0a3 100644 --- a/database/record/key.go +++ b/database/record/key.go @@ -7,8 +7,8 @@ import ( // ParseKey splits a key into it's database name and key parts. func ParseKey(key string) (dbName, dbKey string) { splitted := strings.SplitN(key, ":", 2) - if len(splitted) == 2 { - return splitted[0], splitted[1] + if len(splitted) < 2 { + return splitted[0], "" } - return splitted[0], "" + return splitted[0], strings.Join(splitted[1:], ":") } diff --git a/runtime/module_api.go b/runtime/module_api.go new file mode 100644 index 0000000..fc417dd --- /dev/null +++ b/runtime/module_api.go @@ -0,0 +1,42 @@ +package runtime + +import ( + "github.com/safing/portbase/database" + "github.com/safing/portbase/modules" +) + +var ( + // DefaultRegistry is the default registry + // that is used by the module-level API. + DefaultRegistry = NewRegistry() + + module *modules.Module +) + +func init() { + module = modules.Register("runtime", nil, startModule, nil, "database") +} + +func startModule() error { + _, err := database.Register(&database.Database{ + Name: "runtime", + Description: "Runtime database", + StorageType: "injected", + PrimaryAPI: "", + }) + if err != nil { + return err + } + + if err := DefaultRegistry.InjectAsDatabase("runtime"); err != nil { + return err + } + + return nil +} + +// Register is like Registry.Register but uses +// the package DefaultRegistry. +func Register(key string, provider ValueProvider) (PushFunc, error) { + return DefaultRegistry.Register(key, provider) +} diff --git a/runtime/provider.go b/runtime/provider.go new file mode 100644 index 0000000..94cdbd5 --- /dev/null +++ b/runtime/provider.go @@ -0,0 +1,66 @@ +package runtime + +import ( + "errors" + + "github.com/safing/portbase/database/record" +) + +var ( + // ErrReadOnly should be returned from ValueProvider.Set if a + // runtime record is considered read-only. + ErrReadOnly = errors.New("runtime record is read-only") + // ErrWriteOnly should be returned from ValueProvider.Get if + // a runtime record is considered write-only. + ErrWriteOnly = errors.New("runtime record is write-only") +) + +type ( + // PushFunc is returned when registering a new value provider + // and can be used to inform the database system about the + // availability of a new runtime record value. + PushFunc func(...record.Record) + + // ValueProvider provides access to a runtime-computed + // database record. + ValueProvider interface { + // Set is called when the value is set from outside. + // If the runtime value is considered read-only ErrReadOnly + // should be returned. It is guaranteed that the key of + // the record passed to Set is prefixed with the key used + // to register the value provider. + Set(record.Record) (record.Record, error) + // Get should return one or more records that match keyOrPrefix. + // keyOrPrefix is guaranteed to be at least the prefix used to + // register the ValueProvider. + Get(keyOrPrefix string) ([]record.Record, error) + } + + // SimpleValueSetterFunc is a convenience type for implementing a + // write-only value provider. + SimpleValueSetterFunc func(record.Record) error + + // SimpleValueGetterFunc is a convenience type for implementing a + // read-only value provider. + SimpleValueGetterFunc func(keyOrPrefix string) ([]record.Record, error) +) + +// Set implements ValueProvider.Set and calls fn. +func (fn SimpleValueSetterFunc) Set(r record.Record) error { + return fn(r) +} + +// Get implements ValueProvider.Get and returns ErrWriteOnly. +func (SimpleValueSetterFunc) Get(_ string) ([]record.Record, error) { + return nil, ErrWriteOnly +} + +// Set implements ValueProvider.Set and returns ErrReadOnly. +func (SimpleValueGetterFunc) Set(r record.Record) error { + return ErrReadOnly +} + +// Get implements ValueProvider.Get and calls fn. +func (fn SimpleValueGetterFunc) Get(keyOrPrefix string) ([]record.Record, error) { + return fn(keyOrPrefix) +} diff --git a/runtime/registry.go b/runtime/registry.go new file mode 100644 index 0000000..21f8a02 --- /dev/null +++ b/runtime/registry.go @@ -0,0 +1,294 @@ +package runtime + +import ( + "errors" + "fmt" + "strings" + "sync" + + "github.com/armon/go-radix" + "github.com/safing/portbase/database" + "github.com/safing/portbase/database/iterator" + "github.com/safing/portbase/database/query" + "github.com/safing/portbase/database/record" + "github.com/safing/portbase/database/storage" + "golang.org/x/sync/errgroup" +) + +var ( + // ErrKeyTaken is returned when trying to register + // a value provider at database key or prefix that + // is already occupied by another provider. + ErrKeyTaken = errors.New("runtime key or prefix already used") + // ErrKeyUnmanaged is returned when a Put operation + // on an unmanaged key is performed. + ErrKeyUnmanaged = errors.New("runtime key not managed by any provider") + // ErrInjected is returned by Registry.InjectAsDatabase + // if the registry has already been injected. + ErrInjected = errors.New("registry already injected") +) + +// Registry keeps track of registered runtime +// value providers and exposes them via an +// injected database. Users normally just need +// to use the defaul registry provided by this +// package but may consider creating a dedicated +// runtime registry on their own. Registry uses +// a radix tree for value providers and their +// choosen database key/prefix. +type Registry struct { + l sync.RWMutex + providers *radix.Tree + dbController *database.Controller +} + +// keyedValueProvider simply wraps a value provider with it's +// registration prefix. +type keyedValueProvider struct { + ValueProvider + key string +} + +// NewRegistry returns a new registry. +func NewRegistry() *Registry { + return &Registry{ + providers: radix.New(), + } +} + +func isPrefixKey(key string) bool { + return strings.HasSuffix(key, "/") +} + +// InjectAsDatabase injects the registry as the storage +// database for name. +func (r *Registry) InjectAsDatabase(name string) error { + r.l.Lock() + defer r.l.Unlock() + + if r.dbController != nil { + return ErrInjected + } + + ctrl, err := database.InjectDatabase(name, r.asStorage()) + if err != nil { + return err + } + + r.dbController = ctrl + + return nil +} + +// Register registers a new value provider p under keyOrPrefix. The +// returned PushFunc can be used to send update notitifcations to +// database subscribers. Note that keyOrPrefix must end in '/' to be +// accepted as a prefix. +func (r *Registry) Register(keyOrPrefix string, p ValueProvider) (PushFunc, error) { + r.l.Lock() + defer r.l.Unlock() + + // search if there's a provider registered for a prefix + // that matches or is equal to keyOrPrefix. + key, _, ok := r.providers.LongestPrefix(keyOrPrefix) + if ok && (isPrefixKey(key) || key == keyOrPrefix) { + return nil, fmt.Errorf("%w: found provider on %s", ErrKeyTaken, key) + } + + // if keyOrPrefix is a prefix there must not be any provider + // registered for a key that matches keyOrPrefix. + if isPrefixKey(keyOrPrefix) { + foundProvider := "" + r.providers.WalkPrefix(keyOrPrefix, func(s string, _ interface{}) bool { + foundProvider = s + return true + }) + if foundProvider != "" { + return nil, fmt.Errorf("%w: found provider on %s", ErrKeyTaken, foundProvider) + } + } + + r.providers.Insert(keyOrPrefix, &keyedValueProvider{ + ValueProvider: p, + key: keyOrPrefix, + }) + + return func(records ...record.Record) { + r.l.RLock() + defer r.l.RUnlock() + + if r.dbController == nil { + return + } + + for _, rec := range records { + r.dbController.PushUpdate(rec) + } + }, nil +} + +// Get returns the runtime value that is identified by key. +// It implements the storage.Interface. +func (r *Registry) Get(key string) (record.Record, error) { + provider := r.getMatchingProvider(key) + if provider == nil { + return nil, nil + } + + records, err := provider.Get(key) + if err != nil { + return nil, err + } + + // Get performs an exact match so filter out + // and values that do not match key. + for _, r := range records { + if r.DatabaseKey() == key { + return r, nil + } + } + return nil, nil +} + +// Put stores the record m in the runtime database. Note that +// ErrReadOnly is returned if there's no value provider responsible +// for m.Key(). +func (r *Registry) Put(m record.Record) (record.Record, error) { + provider := r.getMatchingProvider(m.DatabaseKey()) + if provider == nil { + // if there's no provider for the given value + // return ErrKeyUnmanaged. + return nil, ErrKeyUnmanaged + } + + res, err := provider.Set(m) + if err != nil { + return nil, err + } + return res, nil +} + +// Query performs a query on the runtime registry returning all +// records across all value providers that match q. +// Query implements the storage.Storage interface. +func (r *Registry) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { + if _, err := q.Check(); err != nil { + return nil, fmt.Errorf("invalid query: %w", err) + } + + searchPrefix := q.DatabaseKeyPrefix() + providers := r.collectProviderByPrefix(searchPrefix) + if len(providers) == 0 { + return nil, fmt.Errorf("%w: for key %s", ErrKeyUnmanaged, searchPrefix) + } + + iter := iterator.New() + + grp := new(errgroup.Group) + for idx := range providers { + p := providers[idx] + + grp.Go(func() (err error) { + defer recovery(&err) + + key := p.key + if len(searchPrefix) > len(key) { + key = searchPrefix + } + + records, err := p.Get(key) + if err != nil { + return err + } + + for _, r := range records { + // TODO(ppacher): do we need to lock r? + // storage/hashmap does not lock the records + // before sending them to the iterator but + // better make sure that's correct. + + if !strings.HasPrefix(r.DatabaseKey(), searchPrefix) { + continue + } + if !r.Meta().CheckValidity() { + continue + } + if !r.Meta().CheckPermission(local, internal) { + continue + } + if !q.MatchesRecord(r) { + continue + } + + select { + case iter.Next <- r: + case <-iter.Done: + return nil + } + } + + return nil + }) + } + + go func() { + err := grp.Wait() + iter.Finish(err) + }() + + return iter, nil +} + +func (r *Registry) getMatchingProvider(key string) *keyedValueProvider { + r.l.RLock() + defer r.l.RUnlock() + + providerKey, provider, ok := r.providers.LongestPrefix(key) + if !ok { + return nil + } + + if !isPrefixKey(providerKey) && providerKey != key { + return nil + } + + return provider.(*keyedValueProvider) +} + +func (r *Registry) collectProviderByPrefix(prefix string) []*keyedValueProvider { + r.l.RLock() + defer r.l.RUnlock() + + // if there's a LongestPrefix provider that's the only one + // we need to ask + if _, p, ok := r.providers.LongestPrefix(prefix); ok { + return []*keyedValueProvider{p.(*keyedValueProvider)} + } + + var providers []*keyedValueProvider + r.providers.WalkPrefix(prefix, func(key string, p interface{}) bool { + providers = append(providers, p.(*keyedValueProvider)) + return false + }) + + return providers +} + +// asStorage returns a storage.Interface compatible struct +// that is backed by r. +func (r *Registry) asStorage() storage.Interface { + return &storageWrapper{ + Registry: r, + } +} + +func recovery(err *error) { + if x := recover(); x != nil { + if e, ok := x.(error); ok { + *err = e + return + } + + *err = fmt.Errorf("%v", x) + } +} diff --git a/runtime/registry_test.go b/runtime/registry_test.go new file mode 100644 index 0000000..458a7fb --- /dev/null +++ b/runtime/registry_test.go @@ -0,0 +1,150 @@ +package runtime + +import ( + "errors" + "sync" + "testing" + + "github.com/safing/portbase/database/query" + "github.com/safing/portbase/database/record" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type testRecord struct { + record.Base + sync.Mutex + Value string +} + +func makeTestRecord(key, value string) record.Record { + r := &testRecord{Value: value} + r.CreateMeta() + r.SetKey("runtime:" + key) + return r +} + +type testProvider struct { + k string + r []record.Record +} + +func (tp *testProvider) Get(key string) ([]record.Record, error) { + return tp.r, nil +} + +func (tp *testProvider) Set(r record.Record) (record.Record, error) { + return nil, errors.New("not implemented") +} + +func getTestRegistry(t *testing.T) *Registry { + t.Helper() + + r := NewRegistry() + + providers := []testProvider{ + testProvider{ + k: "p1/", + r: []record.Record{ + makeTestRecord("p1/f1/v1", "p1.1"), + makeTestRecord("p1/f2/v2", "p1.2"), + makeTestRecord("p1/v3", "p1.3"), + }, + }, + testProvider{ + k: "p2/f1", + r: []record.Record{ + makeTestRecord("p2/f1/v1", "p2.1"), + makeTestRecord("p2/f1/f2/v2", "p2.2"), + makeTestRecord("p2/f1/v3", "p2.3"), + }, + }, + } + + for idx := range providers { + p := providers[idx] + _, err := r.Register(p.k, &p) + require.NoError(t, err) + } + + return r +} + +func TestRegistryGet(t *testing.T) { + var ( + r record.Record + err error + ) + + reg := getTestRegistry(t) + + r, err = reg.Get("p1/f1/v1") + require.NoError(t, err) + require.NotNil(t, r) + assert.Equal(t, "p1.1", r.(*testRecord).Value) + + r, err = reg.Get("p1/v3") + require.NoError(t, err) + require.NotNil(t, r) + assert.Equal(t, "p1.3", r.(*testRecord).Value) + + r, err = reg.Get("p1/v4") + require.NoError(t, err) + assert.Nil(t, r) + + r, err = reg.Get("no-provider/foo") + require.NoError(t, err) + assert.Nil(t, r) +} + +func TestRegistryQuery(t *testing.T) { + reg := getTestRegistry(t) + + q := query.New("runtime:p") + iter, err := reg.Query(q, true, true) + require.NoError(t, err) + require.NotNil(t, iter) + var records []record.Record + for r := range iter.Next { + records = append(records, r) + } + assert.Len(t, records, 6) + + q = query.New("runtime:p1/f") + iter, err = reg.Query(q, true, true) + require.NoError(t, err) + require.NotNil(t, iter) + records = nil + for r := range iter.Next { + records = append(records, r) + } + assert.Len(t, records, 2) +} + +func TestRegistryRegister(t *testing.T) { + r := NewRegistry() + + cases := []struct { + inp string + err bool + }{ + {"runtime:foo/bar/bar", false}, + {"runtime:foo/bar/bar2", false}, + {"runtime:foo/bar", false}, + {"runtime:foo/bar", true}, // already used + {"runtime:foo/bar/", true}, // cannot register a prefix if there are providers below + {"runtime:foo/baz/", false}, + {"runtime:foo/baz2/", false}, + {"runtime:foo/baz3", false}, + {"runtime:foo/baz/bar", true}, + } + + for _, c := range cases { + _, err := r.Register(c.inp, nil) + if c.err { + assert.Error(t, err, c.inp) + } else { + assert.NoError(t, err, c.inp) + } + } +} diff --git a/runtime/singe_record_provider.go b/runtime/singe_record_provider.go new file mode 100644 index 0000000..7619f28 --- /dev/null +++ b/runtime/singe_record_provider.go @@ -0,0 +1,45 @@ +package runtime + +import "github.com/safing/portbase/database/record" + +// singleRecordReader is a convenience type for read-only exposing +// a single record.Record. Note that users must lock the whole record +// themself before performing any manipulation on the record. +type singleRecordReader struct { + record.Record +} + +// ProvideRecord returns a ValueProvider the exposes read-only +// access to r. Users of ProvideRecord need to ensure the lock +// the whole record before performing modifications on it. +// +// Example: +// +// type MyValue struct { +// record.Base +// Value string +// } +// r := new(MyValue) +// pushUpdate, _ := runtime.Register("my/key", ProvideRecord(r)) +// r.Lock() +// r.Value = "foobar" +// r.Unlock() +// pushUpdate(r) +// +func ProvideRecord(r record.Record) ValueProvider { + return &singleRecordReader{r} +} + +// Set implements ValueProvider.Set and returns ErrReadOnly. +func (sr *singleRecordReader) Set(_ record.Record) (record.Record, error) { + return nil, ErrReadOnly +} + +// Get implements ValueProvider.Get and returns the wrapped record.Record +// but only if keyOrPrefix exactly matches the records database key. +func (sr *singleRecordReader) Get(keyOrPrefix string) ([]record.Record, error) { + if keyOrPrefix != sr.Record.Key() { + return nil, nil + } + return []record.Record{sr.Record}, nil +} diff --git a/runtime/storage.go b/runtime/storage.go new file mode 100644 index 0000000..d0dddf8 --- /dev/null +++ b/runtime/storage.go @@ -0,0 +1,30 @@ +package runtime + +import ( + "github.com/safing/portbase/database/iterator" + "github.com/safing/portbase/database/query" + "github.com/safing/portbase/database/record" + "github.com/safing/portbase/database/storage" +) + +// storageWrapper is a simple wrapper around storage.InjectBase and +// Registry and make sure the supported methods are handled by +// the registry rather than the InjectBase defaults. +// storageWrapper is mainly there to keep the method landscape of +// Registry as small as possible. +type storageWrapper struct { + storage.InjectBase + Registry *Registry +} + +func (sw *storageWrapper) Get(key string) (record.Record, error) { + return sw.Registry.Get(key) +} + +func (sw *storageWrapper) Put(r record.Record) (record.Record, error) { + return sw.Registry.Put(r) +} + +func (sw *storageWrapper) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { + return sw.Registry.Query(q, local, internal) +} diff --git a/runtime/trace_provider.go b/runtime/trace_provider.go new file mode 100644 index 0000000..83dcab5 --- /dev/null +++ b/runtime/trace_provider.go @@ -0,0 +1,37 @@ +package runtime + +import ( + "time" + + "github.com/safing/portbase/database/record" + "github.com/safing/portbase/log" +) + +// traceValueProvider can be used to wrap an +// existing value provider to trace an calls to +// their Set and Get methods. +type traceValueProvider struct { + ValueProvider +} + +// TraceProvider returns a new ValueProvider that wraps +// vp but traces all Set and Get methods calls. +func TraceProvider(vp ValueProvider) ValueProvider { + return &traceValueProvider{vp} +} + +func (tvp *traceValueProvider) Set(r record.Record) (res record.Record, err error) { + defer func(start time.Time) { + log.Tracef("runtime: settings record %q: duration=%s err=%v", r.Key(), time.Since(start), err) + }(time.Now()) + + return tvp.ValueProvider.Set(r) +} + +func (tvp *traceValueProvider) Get(keyOrPrefix string) (records []record.Record, err error) { + defer func(start time.Time) { + log.Tracef("runtime: loading records %q: duration=%s err=%v #records=%d", keyOrPrefix, time.Since(start), err, len(records)) + }(time.Now()) + + return tvp.ValueProvider.Get(keyOrPrefix) +}