mirror of
https://github.com/safing/portbase
synced 2025-09-04 03:29:59 +00:00
Fix minor bugs in the runtime registry
This commit is contained in:
parent
4d0d97f198
commit
9266dc885d
5 changed files with 36 additions and 4 deletions
|
@ -100,14 +100,12 @@ func (mng *Manager) Get(keyOrPrefix string) ([]record.Record, error) {
|
||||||
defer mng.l.RUnlock()
|
defer mng.l.RUnlock()
|
||||||
|
|
||||||
dbName := mng.runtime.DatabaseName()
|
dbName := mng.runtime.DatabaseName()
|
||||||
|
|
||||||
records := make([]record.Record, 0, len(mng.subsys))
|
records := make([]record.Record, 0, len(mng.subsys))
|
||||||
for _, subsys := range mng.subsys {
|
for _, subsys := range mng.subsys {
|
||||||
subsys.Lock()
|
subsys.Lock()
|
||||||
if !subsys.KeyIsSet() {
|
if !subsys.KeyIsSet() {
|
||||||
subsys.SetKey(dbName + ":subsystems/" + subsys.ID)
|
subsys.SetKey(dbName + ":subsystems/" + subsys.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
if strings.HasPrefix(subsys.DatabaseKey(), keyOrPrefix) {
|
if strings.HasPrefix(subsys.DatabaseKey(), keyOrPrefix) {
|
||||||
records = append(records, subsys)
|
records = append(records, subsys)
|
||||||
}
|
}
|
||||||
|
@ -152,6 +150,8 @@ func (mng *Manager) Register(id, name, description string, module *modules.Modul
|
||||||
toggleOption: option,
|
toggleOption: option,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.CreateMeta()
|
||||||
|
|
||||||
if s.toggleOption != nil {
|
if s.toggleOption != nil {
|
||||||
s.ToggleOptionKey = s.toggleOption.Key
|
s.ToggleOptionKey = s.toggleOption.Key
|
||||||
s.ExpertiseLevel = s.toggleOption.ExpertiseLevel
|
s.ExpertiseLevel = s.toggleOption.ExpertiseLevel
|
||||||
|
|
|
@ -352,6 +352,8 @@ func (t *Task) executeWithLocking() {
|
||||||
// notify that we finished
|
// notify that we finished
|
||||||
t.cancelCtx()
|
t.cancelCtx()
|
||||||
// refresh context
|
// refresh context
|
||||||
|
|
||||||
|
// RACE CONDITION with L314!
|
||||||
t.ctx, t.cancelCtx = context.WithCancel(t.module.Ctx)
|
t.ctx, t.cancelCtx = context.WithCancel(t.module.Ctx)
|
||||||
|
|
||||||
t.lock.Unlock()
|
t.lock.Unlock()
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/safing/portbase/database/query"
|
"github.com/safing/portbase/database/query"
|
||||||
"github.com/safing/portbase/database/record"
|
"github.com/safing/portbase/database/record"
|
||||||
"github.com/safing/portbase/database/storage"
|
"github.com/safing/portbase/database/storage"
|
||||||
|
"github.com/safing/portbase/log"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -121,10 +122,12 @@ func (r *Registry) Register(keyOrPrefix string, p ValueProvider) (PushFunc, erro
|
||||||
}
|
}
|
||||||
|
|
||||||
r.providers.Insert(keyOrPrefix, &keyedValueProvider{
|
r.providers.Insert(keyOrPrefix, &keyedValueProvider{
|
||||||
ValueProvider: p,
|
ValueProvider: TraceProvider(p),
|
||||||
key: keyOrPrefix,
|
key: keyOrPrefix,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
log.Tracef("runtime: registered new provider at %s", keyOrPrefix)
|
||||||
|
|
||||||
return func(records ...record.Record) {
|
return func(records ...record.Record) {
|
||||||
r.l.RLock()
|
r.l.RLock()
|
||||||
defer r.l.RUnlock()
|
defer r.l.RUnlock()
|
||||||
|
@ -149,6 +152,12 @@ func (r *Registry) Get(key string) (record.Record, error) {
|
||||||
|
|
||||||
records, err := provider.Get(key)
|
records, err := provider.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// instead of returning ErrWriteOnly to the database interface
|
||||||
|
// we wrap it in ErrNotFound so the records effectively gets
|
||||||
|
// hidden.
|
||||||
|
if errors.Is(err, ErrWriteOnly) {
|
||||||
|
return nil, database.ErrNotFound
|
||||||
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -210,6 +219,9 @@ func (r *Registry) Query(q *query.Query, local, internal bool) (*iterator.Iterat
|
||||||
|
|
||||||
records, err := p.Get(key)
|
records, err := p.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if errors.Is(err, ErrWriteOnly) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -228,6 +240,7 @@ func (r *Registry) Query(q *query.Query, local, internal bool) (*iterator.Iterat
|
||||||
r.Unlock()
|
r.Unlock()
|
||||||
|
|
||||||
if !allowed {
|
if !allowed {
|
||||||
|
log.Tracef("runtime: not sending %s for query %s. matchesKey=%v isValid=%v isAllowed=%v", r.DatabaseKey(), searchPrefix, matchesKey, isValid, isAllowed)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -285,6 +298,21 @@ func (r *Registry) collectProviderByPrefix(prefix string) []*keyedValueProvider
|
||||||
return providers
|
return providers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetRegistrationKeys returns a list of all provider registration
|
||||||
|
// keys or prefixes.
|
||||||
|
func (r *Registry) GetRegistrationKeys() []string {
|
||||||
|
r.l.RLock()
|
||||||
|
defer r.l.RUnlock()
|
||||||
|
|
||||||
|
var keys []string
|
||||||
|
|
||||||
|
r.providers.Walk(func(key string, p interface{}) bool {
|
||||||
|
keys = append(keys, key)
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
return keys
|
||||||
|
}
|
||||||
|
|
||||||
// asStorage returns a storage.Interface compatible struct
|
// asStorage returns a storage.Interface compatible struct
|
||||||
// that is backed by r.
|
// that is backed by r.
|
||||||
func (r *Registry) asStorage() storage.Interface {
|
func (r *Registry) asStorage() storage.Interface {
|
||||||
|
|
|
@ -28,3 +28,5 @@ func (sw *storageWrapper) Put(r record.Record) (record.Record, error) {
|
||||||
func (sw *storageWrapper) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
|
func (sw *storageWrapper) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
|
||||||
return sw.Registry.Query(q, local, internal)
|
return sw.Registry.Query(q, local, internal)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sw *storageWrapper) ReadOnly() bool { return false }
|
||||||
|
|
|
@ -22,7 +22,7 @@ func TraceProvider(vp ValueProvider) ValueProvider {
|
||||||
|
|
||||||
func (tvp *traceValueProvider) Set(r record.Record) (res record.Record, err error) {
|
func (tvp *traceValueProvider) Set(r record.Record) (res record.Record, err error) {
|
||||||
defer func(start time.Time) {
|
defer func(start time.Time) {
|
||||||
log.Tracef("runtime: settings record %q: duration=%s err=%v", r.Key(), time.Since(start), err)
|
log.Tracef("runtime: setting record %q: duration=%s err=%v", r.Key(), time.Since(start), err)
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
|
|
||||||
return tvp.ValueProvider.Set(r)
|
return tvp.ValueProvider.Set(r)
|
||||||
|
|
Loading…
Add table
Reference in a new issue