Improve profile getting and updating to fix interference

This commit is contained in:
Daniel 2021-12-03 10:22:17 +01:00
parent 8ce3bb1cfc
commit 4fb3bf0645
4 changed files with 118 additions and 107 deletions

View file

@ -7,8 +7,8 @@ import (
)
const (
activeProfileCleanerTickDuration = 1 * time.Minute
activeProfileCleanerThreshold = 5 * time.Minute
activeProfileCleanerTickDuration = 5 * time.Minute
activeProfileCleanerThreshold = 1 * time.Hour
)
var (

View file

@ -6,6 +6,7 @@ import (
"strings"
"github.com/safing/portbase/config"
"github.com/safing/portbase/log"
"github.com/safing/portbase/database"
"github.com/safing/portbase/database/query"
@ -41,24 +42,37 @@ func registerValidationDBHook() (err error) {
}
func startProfileUpdateChecker() error {
profilesSub, err := profileDB.Subscribe(query.New(profilesDBPath))
if err != nil {
return err
}
module.StartServiceWorker("update active profiles", 0, func(ctx context.Context) (err error) {
profilesSub, err := profileDB.Subscribe(query.New(profilesDBPath))
if err != nil {
return err
}
defer func() {
err := profilesSub.Cancel()
if err != nil {
log.Warningf("profile: failed to cancel subscription for updating active profiles: %s", err)
}
}()
for {
select {
case r := <-profilesSub.Feed:
// check if nil
// Check if subscription was canceled.
if r == nil {
return errors.New("subscription canceled")
}
// mark as outdated
// Don't mark profiles as outdated that are saved internally, as
// profiles only exist once in memory.
p, ok := r.(*Profile)
if ok && p.savedInternally {
return
}
// Mark profile as outdated.
markActiveProfileAsOutdated(strings.TrimPrefix(r.Key(), profilesDBPath))
case <-ctx.Done():
return profilesSub.Cancel()
return nil
}
}
})

View file

@ -2,16 +2,16 @@ package profile
import (
"errors"
"sync"
"github.com/safing/portbase/database"
"github.com/safing/portbase/database/query"
"github.com/safing/portbase/database/record"
"github.com/safing/portbase/log"
"golang.org/x/sync/singleflight"
)
var getProfileSingleInflight singleflight.Group
var getProfileLock sync.Mutex
// GetProfile fetches a profile. This function ensures that the loaded profile
// is shared among all callers. You must always supply both the scopedID and
@ -22,102 +22,92 @@ func GetProfile(source profileSource, id, linkedPath string) ( //nolint:gocognit
profile *Profile,
err error,
) {
// Select correct key for single in flight.
singleInflightKey := linkedPath
if singleInflightKey == "" {
singleInflightKey = makeScopedID(source, id)
// Globally lock getting a profile.
// This does not happen too often, and it ensures we really have integrity
// and no race conditions.
getProfileLock.Lock()
defer getProfileLock.Unlock()
var previousVersion *Profile
// Fetch profile depending on the available information.
switch {
case id != "":
scopedID := makeScopedID(source, id)
// Get profile via the scoped ID.
// Check if there already is an active and not outdated profile.
profile = getActiveProfile(scopedID)
if profile != nil {
profile.MarkStillActive()
if profile.outdated.IsSet() {
previousVersion = profile
} else {
return profile, nil
}
}
// Get from database.
profile, err = getProfile(scopedID)
// Check if the request is for a special profile that may need a reset.
if err == nil && specialProfileNeedsReset(profile) {
// Trigger creation of special profile.
err = database.ErrNotFound
}
// If we cannot find a profile, check if the request is for a special
// profile we can create.
if errors.Is(err, database.ErrNotFound) {
profile = getSpecialProfile(id, linkedPath)
if profile != nil {
err = nil
}
}
case linkedPath != "":
// Search for profile via a linked path.
// Check if there already is an active and not outdated profile for
// the linked path.
profile = findActiveProfile(linkedPath)
if profile != nil {
if profile.outdated.IsSet() {
previousVersion = profile
} else {
return profile, nil
}
}
// Get from database.
profile, err = findProfile(linkedPath)
default:
return nil, errors.New("cannot fetch profile without ID or path")
}
p, err, _ := getProfileSingleInflight.Do(singleInflightKey, func() (interface{}, error) {
var previousVersion *Profile
// Fetch profile depending on the available information.
switch {
case id != "":
scopedID := makeScopedID(source, id)
// Get profile via the scoped ID.
// Check if there already is an active and not outdated profile.
profile = getActiveProfile(scopedID)
if profile != nil {
profile.MarkStillActive()
if profile.outdated.IsSet() {
previousVersion = profile
} else {
return profile, nil
}
}
// Get from database.
profile, err = getProfile(scopedID)
// Check if the request is for a special profile that may need a reset.
if err == nil && specialProfileNeedsReset(profile) {
// Trigger creation of special profile.
err = database.ErrNotFound
}
// If we cannot find a profile, check if the request is for a special
// profile we can create.
if errors.Is(err, database.ErrNotFound) {
profile = getSpecialProfile(id, linkedPath)
if profile != nil {
err = nil
}
}
case linkedPath != "":
// Search for profile via a linked path.
// Check if there already is an active and not outdated profile for
// the linked path.
profile = findActiveProfile(linkedPath)
if profile != nil {
if profile.outdated.IsSet() {
previousVersion = profile
} else {
return profile, nil
}
}
// Get from database.
profile, err = findProfile(linkedPath)
default:
return nil, errors.New("cannot fetch profile without ID or path")
}
if err != nil {
return nil, err
}
// Process profiles coming directly from the database.
// As we don't use any caching, these will be new objects.
// Add a layeredProfile to local and network profiles.
if profile.Source == SourceLocal || profile.Source == SourceNetwork {
// If we are refetching, assign the layered profile from the previous version.
if previousVersion != nil {
profile.layeredProfile = previousVersion.layeredProfile
}
// Local profiles must have a layered profile, create a new one if it
// does not yet exist.
if profile.layeredProfile == nil {
profile.layeredProfile = NewLayeredProfile(profile)
}
}
// Add the profile to the currently active profiles.
addActiveProfile(profile)
return profile, nil
})
if err != nil {
return nil, err
}
if p == nil {
return nil, errors.New("profile getter returned nil")
// Process profiles coming directly from the database.
// As we don't use any caching, these will be new objects.
// Add a layeredProfile to local and network profiles.
if profile.Source == SourceLocal || profile.Source == SourceNetwork {
// If we are refetching, assign the layered profile from the previous version.
if previousVersion != nil {
profile.layeredProfile = previousVersion.layeredProfile
}
// Local profiles must have a layered profile, create a new one if it
// does not yet exist.
if profile.layeredProfile == nil {
profile.layeredProfile = NewLayeredProfile(profile)
}
}
return p.(*Profile), nil
// Add the profile to the currently active profiles.
addActiveProfile(profile)
return profile, nil
}
// getProfile fetches the profile for the given scoped ID.
@ -181,6 +171,9 @@ func prepProfile(r record.Record) (*Profile, error) {
log.Errorf("profiles: profile %s has (partly) invalid configuration: %s", profile.ID, err)
}
// Set saved internally to suppress outdating profiles if saving internally.
profile.savedInternally = true
// return parsed profile
return profile, nil
}

View file

@ -140,6 +140,9 @@ type Profile struct { //nolint:maligned // not worth the effort
// Lifecycle Management
outdated *abool.AtomicBool
lastActive *int64
// savedInternally is set to true for profiles that are saved internally.
savedInternally bool
}
func (profile *Profile) prepConfig() (err error) {
@ -226,11 +229,12 @@ func New(
}
profile := &Profile{
ID: id,
Source: source,
LinkedPath: linkedPath,
Created: time.Now().Unix(),
Config: customConfig,
ID: id,
Source: source,
LinkedPath: linkedPath,
Created: time.Now().Unix(),
Config: customConfig,
savedInternally: true,
}
// Generate random ID if none is given.