mirror of
https://github.com/safing/portbase
synced 2025-09-04 03:29:59 +00:00
Merge pull request #87 from safing/fix/multiple-races
Fix locking for database push-updates and config system
This commit is contained in:
commit
458d4e7f15
16 changed files with 123 additions and 96 deletions
|
@ -24,11 +24,8 @@ type StorageInterface struct {
|
||||||
|
|
||||||
// Get returns a database record.
|
// Get returns a database record.
|
||||||
func (s *StorageInterface) Get(key string) (record.Record, error) {
|
func (s *StorageInterface) Get(key string) (record.Record, error) {
|
||||||
optionsLock.Lock()
|
opt, err := GetOption(key)
|
||||||
defer optionsLock.Unlock()
|
if err != nil {
|
||||||
|
|
||||||
opt, ok := options[key]
|
|
||||||
if !ok {
|
|
||||||
return nil, storage.ErrNotFound
|
return nil, storage.ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,11 +52,9 @@ func (s *StorageInterface) Put(r record.Record) (record.Record, error) {
|
||||||
return s.Get(r.DatabaseKey())
|
return s.Get(r.DatabaseKey())
|
||||||
}
|
}
|
||||||
|
|
||||||
optionsLock.RLock()
|
option, err := GetOption(r.DatabaseKey())
|
||||||
option, ok := options[r.DatabaseKey()]
|
if err != nil {
|
||||||
optionsLock.RUnlock()
|
return nil, err
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("config option does not exist")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var value interface{}
|
var value interface{}
|
||||||
|
@ -77,8 +72,7 @@ func (s *StorageInterface) Put(r record.Record) (record.Record, error) {
|
||||||
return nil, errors.New("received invalid value in \"Value\"")
|
return nil, errors.New("received invalid value in \"Value\"")
|
||||||
}
|
}
|
||||||
|
|
||||||
err := setConfigOption(r.DatabaseKey(), value, false)
|
if err := setConfigOption(r.DatabaseKey(), value, false); err != nil {
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return option.Export()
|
return option.Export()
|
||||||
|
@ -91,9 +85,8 @@ func (s *StorageInterface) Delete(key string) error {
|
||||||
|
|
||||||
// Query returns a an iterator for the supplied query.
|
// Query returns a an iterator for the supplied query.
|
||||||
func (s *StorageInterface) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
|
func (s *StorageInterface) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
|
||||||
|
optionsLock.RLock()
|
||||||
optionsLock.Lock()
|
defer optionsLock.RUnlock()
|
||||||
defer optionsLock.Unlock()
|
|
||||||
|
|
||||||
it := iterator.New()
|
it := iterator.New()
|
||||||
var opts []*Option
|
var opts []*Option
|
||||||
|
@ -109,7 +102,6 @@ func (s *StorageInterface) Query(q *query.Query, local, internal bool) (*iterato
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageInterface) processQuery(it *iterator.Iterator, opts []*Option) {
|
func (s *StorageInterface) processQuery(it *iterator.Iterator, opts []*Option) {
|
||||||
|
|
||||||
sort.Sort(sortByKey(opts))
|
sort.Sort(sortByKey(opts))
|
||||||
|
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
|
@ -148,17 +140,27 @@ func registerAsDatabase() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func pushFullUpdate() {
|
// handleOptionUpdate updates the expertise and release level options,
|
||||||
optionsLock.RLock()
|
// if required, and eventually pushes a update for the option.
|
||||||
defer optionsLock.RUnlock()
|
// The caller must hold the option lock.
|
||||||
|
func handleOptionUpdate(option *Option, push bool) {
|
||||||
|
if expertiseLevelOptionFlag.IsSet() && option == expertiseLevelOption {
|
||||||
|
updateExpertiseLevel()
|
||||||
|
}
|
||||||
|
|
||||||
for _, option := range options {
|
if releaseLevelOptionFlag.IsSet() && option == releaseLevelOption {
|
||||||
|
updateReleaseLevel()
|
||||||
|
}
|
||||||
|
|
||||||
|
if push {
|
||||||
pushUpdate(option)
|
pushUpdate(option)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pushUpdate pushes an database update notification for option.
|
||||||
|
// The caller must hold the option lock.
|
||||||
func pushUpdate(option *Option) {
|
func pushUpdate(option *Option) {
|
||||||
r, err := option.Export()
|
r, err := option.export()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to export option to push update: %s", err)
|
log.Errorf("failed to export option to push update: %s", err)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -78,10 +78,6 @@ func registerExpertiseLevelOption() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateExpertiseLevel() {
|
func updateExpertiseLevel() {
|
||||||
// check if already registered
|
|
||||||
if !expertiseLevelOptionFlag.IsSet() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// get value
|
// get value
|
||||||
value := expertiseLevelOption.activeFallbackValue
|
value := expertiseLevelOption.activeFallbackValue
|
||||||
if expertiseLevelOption.activeValue != nil {
|
if expertiseLevelOption.activeValue != nil {
|
||||||
|
|
|
@ -18,23 +18,21 @@ type (
|
||||||
func getValueCache(name string, option *Option, requestedType OptionType) (*Option, *valueCache) {
|
func getValueCache(name string, option *Option, requestedType OptionType) (*Option, *valueCache) {
|
||||||
// get option
|
// get option
|
||||||
if option == nil {
|
if option == nil {
|
||||||
var ok bool
|
var err error
|
||||||
optionsLock.RLock()
|
option, err = GetOption(name)
|
||||||
option, ok = options[name]
|
if err != nil {
|
||||||
optionsLock.RUnlock()
|
|
||||||
if !ok {
|
|
||||||
log.Errorf("config: request for unregistered option: %s", name)
|
log.Errorf("config: request for unregistered option: %s", name)
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check type
|
// Check the option type, no locking required as
|
||||||
|
// OptType is immutable once it is set
|
||||||
if requestedType != option.OptType {
|
if requestedType != option.OptType {
|
||||||
log.Errorf("config: bad type: requested %s as %s, but is %s", name, getTypeName(requestedType), getTypeName(option.OptType))
|
log.Errorf("config: bad type: requested %s as %s, but is %s", name, getTypeName(requestedType), getTypeName(option.OptType))
|
||||||
return option, nil
|
return option, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// lock option
|
|
||||||
option.Lock()
|
option.Lock()
|
||||||
defer option.Unlock()
|
defer option.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -7,22 +7,22 @@ import (
|
||||||
"github.com/safing/portbase/log"
|
"github.com/safing/portbase/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
func parseAndSetConfig(jsonData string) error {
|
func parseAndReplaceConfig(jsonData string) error {
|
||||||
m, err := JSONToMap([]byte(jsonData))
|
m, err := JSONToMap([]byte(jsonData))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return setConfig(m)
|
return replaceConfig(m)
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseAndSetDefaultConfig(jsonData string) error {
|
func parseAndReplaceDefaultConfig(jsonData string) error {
|
||||||
m, err := JSONToMap([]byte(jsonData))
|
m, err := JSONToMap([]byte(jsonData))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return SetDefaultConfig(m)
|
return replaceDefaultConfig(m)
|
||||||
}
|
}
|
||||||
|
|
||||||
func quickRegister(t *testing.T, key string, optType OptionType, defaultValue interface{}) {
|
func quickRegister(t *testing.T, key string, optType OptionType, defaultValue interface{}) {
|
||||||
|
@ -55,7 +55,7 @@ func TestGet(t *testing.T) { //nolint:gocognit
|
||||||
quickRegister(t, "hot", OptTypeBool, false)
|
quickRegister(t, "hot", OptTypeBool, false)
|
||||||
quickRegister(t, "cold", OptTypeBool, true)
|
quickRegister(t, "cold", OptTypeBool, true)
|
||||||
|
|
||||||
err = parseAndSetConfig(`
|
err = parseAndReplaceConfig(`
|
||||||
{
|
{
|
||||||
"monkey": "a",
|
"monkey": "a",
|
||||||
"zebras": {
|
"zebras": {
|
||||||
|
@ -70,7 +70,7 @@ func TestGet(t *testing.T) { //nolint:gocognit
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = parseAndSetDefaultConfig(`
|
err = parseAndReplaceDefaultConfig(`
|
||||||
{
|
{
|
||||||
"monkey": "b",
|
"monkey": "b",
|
||||||
"snake": "0",
|
"snake": "0",
|
||||||
|
@ -106,7 +106,7 @@ func TestGet(t *testing.T) { //nolint:gocognit
|
||||||
t.Errorf("cold should be false, is %v", cold())
|
t.Errorf("cold should be false, is %v", cold())
|
||||||
}
|
}
|
||||||
|
|
||||||
err = parseAndSetConfig(`
|
err = parseAndReplaceConfig(`
|
||||||
{
|
{
|
||||||
"monkey": "3"
|
"monkey": "3"
|
||||||
}
|
}
|
||||||
|
@ -284,7 +284,7 @@ func BenchmarkGetAsStringCached(b *testing.B) {
|
||||||
options = make(map[string]*Option)
|
options = make(map[string]*Option)
|
||||||
|
|
||||||
// Setup
|
// Setup
|
||||||
err := parseAndSetConfig(`{
|
err := parseAndReplaceConfig(`{
|
||||||
"monkey": "banana"
|
"monkey": "banana"
|
||||||
}`)
|
}`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -303,7 +303,7 @@ func BenchmarkGetAsStringCached(b *testing.B) {
|
||||||
|
|
||||||
func BenchmarkGetAsStringRefetch(b *testing.B) {
|
func BenchmarkGetAsStringRefetch(b *testing.B) {
|
||||||
// Setup
|
// Setup
|
||||||
err := parseAndSetConfig(`{
|
err := parseAndReplaceConfig(`{
|
||||||
"monkey": "banana"
|
"monkey": "banana"
|
||||||
}`)
|
}`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -321,7 +321,7 @@ func BenchmarkGetAsStringRefetch(b *testing.B) {
|
||||||
|
|
||||||
func BenchmarkGetAsIntCached(b *testing.B) {
|
func BenchmarkGetAsIntCached(b *testing.B) {
|
||||||
// Setup
|
// Setup
|
||||||
err := parseAndSetConfig(`{
|
err := parseAndReplaceConfig(`{
|
||||||
"elephant": 1
|
"elephant": 1
|
||||||
}`)
|
}`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -340,7 +340,7 @@ func BenchmarkGetAsIntCached(b *testing.B) {
|
||||||
|
|
||||||
func BenchmarkGetAsIntRefetch(b *testing.B) {
|
func BenchmarkGetAsIntRefetch(b *testing.B) {
|
||||||
// Setup
|
// Setup
|
||||||
err := parseAndSetConfig(`{
|
err := parseAndReplaceConfig(`{
|
||||||
"elephant": 1
|
"elephant": 1
|
||||||
}`)
|
}`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -222,6 +222,10 @@ func (option *Option) Export() (record.Record, error) {
|
||||||
option.Lock()
|
option.Lock()
|
||||||
defer option.Unlock()
|
defer option.Unlock()
|
||||||
|
|
||||||
|
return option.export()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (option *Option) export() (record.Record, error) {
|
||||||
data, err := json.Marshal(option)
|
data, err := json.Marshal(option)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -31,11 +31,16 @@ func loadConfig() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// apply
|
return replaceConfig(newValues)
|
||||||
return setConfig(newValues)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// saveConfig saves the current configuration to file.
|
||||||
|
// It will acquire a read-lock on the global options registry
|
||||||
|
// lock and must lock each option!
|
||||||
func saveConfig() error {
|
func saveConfig() error {
|
||||||
|
optionsLock.RLock()
|
||||||
|
defer optionsLock.RUnlock()
|
||||||
|
|
||||||
// check if persistence is configured
|
// check if persistence is configured
|
||||||
if configFilePath == "" {
|
if configFilePath == "" {
|
||||||
return nil
|
return nil
|
||||||
|
@ -43,15 +48,18 @@ func saveConfig() error {
|
||||||
|
|
||||||
// extract values
|
// extract values
|
||||||
activeValues := make(map[string]interface{})
|
activeValues := make(map[string]interface{})
|
||||||
optionsLock.RLock()
|
|
||||||
for key, option := range options {
|
for key, option := range options {
|
||||||
|
// we cannot immedately unlock the option afger
|
||||||
|
// getData() because someone could lock and change it
|
||||||
|
// while we are marshaling the value (i.e. for string slices).
|
||||||
|
// We NEED to keep the option locks until we finsihed.
|
||||||
option.Lock()
|
option.Lock()
|
||||||
|
defer option.Unlock()
|
||||||
|
|
||||||
if option.activeValue != nil {
|
if option.activeValue != nil {
|
||||||
activeValues[key] = option.activeValue.getData(option)
|
activeValues[key] = option.activeValue.getData(option)
|
||||||
}
|
}
|
||||||
option.Unlock()
|
|
||||||
}
|
}
|
||||||
optionsLock.RUnlock()
|
|
||||||
|
|
||||||
// convert to JSON
|
// convert to JSON
|
||||||
data, err := MapToJSON(activeValues)
|
data, err := MapToJSON(activeValues)
|
||||||
|
|
|
@ -27,7 +27,7 @@ func NewPerspective(config map[string]interface{}) (*Perspective, error) {
|
||||||
var firstErr error
|
var firstErr error
|
||||||
var errCnt int
|
var errCnt int
|
||||||
|
|
||||||
optionsLock.Lock()
|
optionsLock.RLock()
|
||||||
optionsLoop:
|
optionsLoop:
|
||||||
for key, option := range options {
|
for key, option := range options {
|
||||||
// get option key from config
|
// get option key from config
|
||||||
|
@ -51,7 +51,7 @@ optionsLoop:
|
||||||
valueCache: valueCache,
|
valueCache: valueCache,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
optionsLock.Unlock()
|
optionsLock.RUnlock()
|
||||||
|
|
||||||
if firstErr != nil {
|
if firstErr != nil {
|
||||||
if errCnt > 0 {
|
if errCnt > 0 {
|
||||||
|
@ -68,10 +68,7 @@ func (p *Perspective) getPerspectiveValueCache(name string, requestedType Option
|
||||||
pOption, ok := p.config[name]
|
pOption, ok := p.config[name]
|
||||||
if !ok {
|
if !ok {
|
||||||
// check if option exists at all
|
// check if option exists at all
|
||||||
optionsLock.RLock()
|
if _, err := GetOption(name); err != nil {
|
||||||
_, ok = options[name]
|
|
||||||
optionsLock.RUnlock()
|
|
||||||
if !ok {
|
|
||||||
log.Errorf("config: request for unregistered option: %s", name)
|
log.Errorf("config: request for unregistered option: %s", name)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -18,8 +18,8 @@ var (
|
||||||
// iteration between multiple calles. ForEachOption does NOT lock
|
// iteration between multiple calles. ForEachOption does NOT lock
|
||||||
// opt when calling fn.
|
// opt when calling fn.
|
||||||
func ForEachOption(fn func(opt *Option) error) error {
|
func ForEachOption(fn func(opt *Option) error) error {
|
||||||
optionsLock.Lock()
|
optionsLock.RLock()
|
||||||
defer optionsLock.Unlock()
|
defer optionsLock.RUnlock()
|
||||||
|
|
||||||
for _, opt := range options {
|
for _, opt := range options {
|
||||||
if err := fn(opt); err != nil {
|
if err := fn(opt); err != nil {
|
||||||
|
@ -29,6 +29,20 @@ func ForEachOption(fn func(opt *Option) error) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetOption returns the option with name or an error
|
||||||
|
// if the option does not exist. The caller should lock
|
||||||
|
// the returned option itself for further processing.
|
||||||
|
func GetOption(name string) (*Option, error) {
|
||||||
|
optionsLock.RLock()
|
||||||
|
defer optionsLock.RUnlock()
|
||||||
|
|
||||||
|
opt, ok := options[name]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("option %q does not exist", name)
|
||||||
|
}
|
||||||
|
return opt, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Register registers a new configuration option.
|
// Register registers a new configuration option.
|
||||||
func Register(option *Option) error {
|
func Register(option *Option) error {
|
||||||
if option.Name == "" {
|
if option.Name == "" {
|
||||||
|
|
|
@ -76,10 +76,6 @@ func registerReleaseLevelOption() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateReleaseLevel() {
|
func updateReleaseLevel() {
|
||||||
// check if already registered
|
|
||||||
if !releaseLevelOptionFlag.IsSet() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// get value
|
// get value
|
||||||
value := releaseLevelOption.activeFallbackValue
|
value := releaseLevelOption.activeFallbackValue
|
||||||
if releaseLevelOption.activeValue != nil {
|
if releaseLevelOption.activeValue != nil {
|
||||||
|
|
|
@ -26,11 +26,9 @@ func getValidityFlag() *abool.AtomicBool {
|
||||||
return validityFlag
|
return validityFlag
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// signalChanges marks the configs validtityFlag as dirty and eventually
|
||||||
|
// triggers a config change event.
|
||||||
func signalChanges() {
|
func signalChanges() {
|
||||||
// refetch and save release level and expertise level
|
|
||||||
updateReleaseLevel()
|
|
||||||
updateExpertiseLevel()
|
|
||||||
|
|
||||||
// reset validity flag
|
// reset validity flag
|
||||||
validityFlagLock.Lock()
|
validityFlagLock.Lock()
|
||||||
validityFlag.SetTo(false)
|
validityFlag.SetTo(false)
|
||||||
|
@ -40,14 +38,20 @@ func signalChanges() {
|
||||||
module.TriggerEvent(configChangeEvent, nil)
|
module.TriggerEvent(configChangeEvent, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// setConfig sets the (prioritized) user defined config.
|
// replaceConfig sets the (prioritized) user defined config.
|
||||||
func setConfig(newValues map[string]interface{}) error {
|
func replaceConfig(newValues map[string]interface{}) error {
|
||||||
var firstErr error
|
var firstErr error
|
||||||
var errCnt int
|
var errCnt int
|
||||||
|
|
||||||
optionsLock.Lock()
|
// RLock the options because we are not adding or removing
|
||||||
|
// options from the registration but rather only update the
|
||||||
|
// options value which is guarded by the option's lock itself
|
||||||
|
optionsLock.RLock()
|
||||||
|
defer optionsLock.RUnlock()
|
||||||
|
|
||||||
for key, option := range options {
|
for key, option := range options {
|
||||||
newValue, ok := newValues[key]
|
newValue, ok := newValues[key]
|
||||||
|
|
||||||
option.Lock()
|
option.Lock()
|
||||||
option.activeValue = nil
|
option.activeValue = nil
|
||||||
if ok {
|
if ok {
|
||||||
|
@ -61,12 +65,12 @@ func setConfig(newValues map[string]interface{}) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handleOptionUpdate(option, true)
|
||||||
option.Unlock()
|
option.Unlock()
|
||||||
}
|
}
|
||||||
optionsLock.Unlock()
|
|
||||||
|
|
||||||
signalChanges()
|
signalChanges()
|
||||||
go pushFullUpdate()
|
|
||||||
|
|
||||||
if firstErr != nil {
|
if firstErr != nil {
|
||||||
if errCnt > 0 {
|
if errCnt > 0 {
|
||||||
|
@ -78,14 +82,20 @@ func setConfig(newValues map[string]interface{}) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetDefaultConfig sets the (fallback) default config.
|
// replaceDefaultConfig sets the (fallback) default config.
|
||||||
func SetDefaultConfig(newValues map[string]interface{}) error {
|
func replaceDefaultConfig(newValues map[string]interface{}) error {
|
||||||
var firstErr error
|
var firstErr error
|
||||||
var errCnt int
|
var errCnt int
|
||||||
|
|
||||||
optionsLock.Lock()
|
// RLock the options because we are not adding or removing
|
||||||
|
// options from the registration but rather only update the
|
||||||
|
// options value which is guarded by the option's lock itself
|
||||||
|
optionsLock.RLock()
|
||||||
|
defer optionsLock.RUnlock()
|
||||||
|
|
||||||
for key, option := range options {
|
for key, option := range options {
|
||||||
newValue, ok := newValues[key]
|
newValue, ok := newValues[key]
|
||||||
|
|
||||||
option.Lock()
|
option.Lock()
|
||||||
option.activeDefaultValue = nil
|
option.activeDefaultValue = nil
|
||||||
if ok {
|
if ok {
|
||||||
|
@ -99,12 +109,11 @@ func SetDefaultConfig(newValues map[string]interface{}) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
handleOptionUpdate(option, true)
|
||||||
option.Unlock()
|
option.Unlock()
|
||||||
}
|
}
|
||||||
optionsLock.Unlock()
|
|
||||||
|
|
||||||
signalChanges()
|
signalChanges()
|
||||||
go pushFullUpdate()
|
|
||||||
|
|
||||||
if firstErr != nil {
|
if firstErr != nil {
|
||||||
if errCnt > 0 {
|
if errCnt > 0 {
|
||||||
|
@ -122,11 +131,9 @@ func SetConfigOption(key string, value interface{}) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func setConfigOption(key string, value interface{}, push bool) (err error) {
|
func setConfigOption(key string, value interface{}, push bool) (err error) {
|
||||||
optionsLock.Lock()
|
option, err := GetOption(key)
|
||||||
option, ok := options[key]
|
if err != nil {
|
||||||
optionsLock.Unlock()
|
return err
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("config option %s does not exist", key)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
option.Lock()
|
option.Lock()
|
||||||
|
@ -139,16 +146,17 @@ func setConfigOption(key string, value interface{}, push bool) (err error) {
|
||||||
option.activeValue = valueCache
|
option.activeValue = valueCache
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handleOptionUpdate(option, push)
|
||||||
option.Unlock()
|
option.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// finalize change, activate triggers
|
// finalize change, activate triggers
|
||||||
signalChanges()
|
signalChanges()
|
||||||
if push {
|
|
||||||
go pushUpdate(option)
|
|
||||||
}
|
|
||||||
return saveConfig()
|
return saveConfig()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -158,11 +166,9 @@ func SetDefaultConfigOption(key string, value interface{}) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func setDefaultConfigOption(key string, value interface{}, push bool) (err error) {
|
func setDefaultConfigOption(key string, value interface{}, push bool) (err error) {
|
||||||
optionsLock.Lock()
|
option, err := GetOption(key)
|
||||||
option, ok := options[key]
|
if err != nil {
|
||||||
optionsLock.Unlock()
|
return err
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("config option %s does not exist", key)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
option.Lock()
|
option.Lock()
|
||||||
|
@ -175,15 +181,16 @@ func setDefaultConfigOption(key string, value interface{}, push bool) (err error
|
||||||
option.activeDefaultValue = valueCache
|
option.activeDefaultValue = valueCache
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handleOptionUpdate(option, push)
|
||||||
option.Unlock()
|
option.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// finalize change, activate triggers
|
// finalize change, activate triggers
|
||||||
signalChanges()
|
signalChanges()
|
||||||
if push {
|
|
||||||
go pushUpdate(option)
|
|
||||||
}
|
|
||||||
return saveConfig()
|
return saveConfig()
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,7 @@ func TestLayersGetters(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = setConfig(mapData)
|
err = replaceConfig(mapData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,6 +62,8 @@ func isAllowedPossibleValue(opt *Option, value interface{}) error {
|
||||||
return fmt.Errorf("value is not allowed")
|
return fmt.Errorf("value is not allowed")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// validateValue ensures that value matches the expected type of option.
|
||||||
|
// It does not create a copy of the value!
|
||||||
func validateValue(option *Option, value interface{}) (*valueCache, error) { //nolint:gocyclo
|
func validateValue(option *Option, value interface{}) (*valueCache, error) { //nolint:gocyclo
|
||||||
if option.OptType != OptTypeStringArray {
|
if option.OptType != OptTypeStringArray {
|
||||||
if err := isAllowedPossibleValue(option, value); err != nil {
|
if err := isAllowedPossibleValue(option, value); err != nil {
|
||||||
|
|
|
@ -195,6 +195,8 @@ func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iter
|
||||||
}
|
}
|
||||||
|
|
||||||
// PushUpdate pushes a record update to subscribers.
|
// PushUpdate pushes a record update to subscribers.
|
||||||
|
// The caller must hold the record's lock when calling
|
||||||
|
// PushUpdate.
|
||||||
func (c *Controller) PushUpdate(r record.Record) {
|
func (c *Controller) PushUpdate(r record.Record) {
|
||||||
if c != nil {
|
if c != nil {
|
||||||
c.exclusiveAccess.RLock()
|
c.exclusiveAccess.RLock()
|
||||||
|
@ -205,9 +207,7 @@ func (c *Controller) PushUpdate(r record.Record) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, sub := range c.subscriptions {
|
for _, sub := range c.subscriptions {
|
||||||
r.Lock()
|
|
||||||
push := r.Meta().CheckPermission(sub.local, sub.internal) && sub.q.Matches(r)
|
push := r.Meta().CheckPermission(sub.local, sub.internal) && sub.q.Matches(r)
|
||||||
r.Unlock()
|
|
||||||
|
|
||||||
if push {
|
if push {
|
||||||
select {
|
select {
|
||||||
|
|
|
@ -207,11 +207,12 @@ func (mng *Manager) handleModuleUpdate(m *modules.Module) {
|
||||||
}
|
}
|
||||||
|
|
||||||
subsys.Lock()
|
subsys.Lock()
|
||||||
|
defer subsys.Unlock()
|
||||||
|
|
||||||
updated := compareAndUpdateStatus(m, ms)
|
updated := compareAndUpdateStatus(m, ms)
|
||||||
if updated {
|
if updated {
|
||||||
subsys.makeSummary()
|
subsys.makeSummary()
|
||||||
}
|
}
|
||||||
subsys.Unlock()
|
|
||||||
|
|
||||||
if updated {
|
if updated {
|
||||||
mng.pushUpdate(subsys)
|
mng.pushUpdate(subsys)
|
||||||
|
|
|
@ -18,7 +18,9 @@ var (
|
||||||
type (
|
type (
|
||||||
// PushFunc is returned when registering a new value provider
|
// PushFunc is returned when registering a new value provider
|
||||||
// and can be used to inform the database system about the
|
// and can be used to inform the database system about the
|
||||||
// availability of a new runtime record value.
|
// availability of a new runtime record value. Similar to
|
||||||
|
// database.Controller.PushUpdate, the caller must hold
|
||||||
|
// the lock for each record passed to PushFunc.
|
||||||
PushFunc func(...record.Record)
|
PushFunc func(...record.Record)
|
||||||
|
|
||||||
// ValueProvider provides access to a runtime-computed
|
// ValueProvider provides access to a runtime-computed
|
||||||
|
|
|
@ -23,8 +23,8 @@ type singleRecordReader struct {
|
||||||
// pushUpdate, _ := runtime.Register("my/key", ProvideRecord(r))
|
// pushUpdate, _ := runtime.Register("my/key", ProvideRecord(r))
|
||||||
// r.Lock()
|
// r.Lock()
|
||||||
// r.Value = "foobar"
|
// r.Value = "foobar"
|
||||||
// r.Unlock()
|
|
||||||
// pushUpdate(r)
|
// pushUpdate(r)
|
||||||
|
// r.Unlock()
|
||||||
//
|
//
|
||||||
func ProvideRecord(r record.Record) ValueProvider {
|
func ProvideRecord(r record.Record) ValueProvider {
|
||||||
return &singleRecordReader{r}
|
return &singleRecordReader{r}
|
||||||
|
|
Loading…
Add table
Reference in a new issue