Fix race condition in database package. Simplify db locking

This commit is contained in:
Patrick Pacher 2020-09-21 15:18:54 +02:00
parent 93a0b27ea2
commit 50a10485e1
No known key found for this signature in database
GPG key ID: E8CD2DA160925A6D
3 changed files with 34 additions and 46 deletions

View file

@ -21,12 +21,7 @@ type Controller struct {
hooks []*RegisteredHook hooks []*RegisteredHook
subscriptions []*Subscription subscriptions []*Subscription
writeLock sync.RWMutex exclusiveAccess sync.RWMutex
// Lock: nobody may write
// RLock: concurrent writing
readLock sync.RWMutex
// Lock: nobody may read
// RLock: concurrent reading
migrating *abool.AtomicBool // TODO migrating *abool.AtomicBool // TODO
hibernating *abool.AtomicBool // TODO hibernating *abool.AtomicBool // TODO
@ -53,8 +48,8 @@ func (c *Controller) Injected() bool {
// Get return the record with the given key. // Get return the record with the given key.
func (c *Controller) Get(key string) (record.Record, error) { func (c *Controller) Get(key string) (record.Record, error) {
c.readLock.RLock() c.exclusiveAccess.RLock()
defer c.readLock.RUnlock() defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() { if shuttingDown.IsSet() {
return nil, ErrShuttingDown return nil, ErrShuttingDown
@ -101,8 +96,8 @@ func (c *Controller) Get(key string) (record.Record, error) {
// Put saves a record in the database. // Put saves a record in the database.
func (c *Controller) Put(r record.Record) (err error) { func (c *Controller) Put(r record.Record) (err error) {
c.writeLock.RLock() c.exclusiveAccess.RLock()
defer c.writeLock.RUnlock() defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() { if shuttingDown.IsSet() {
return ErrShuttingDown return ErrShuttingDown
@ -145,8 +140,8 @@ func (c *Controller) Put(r record.Record) (err error) {
// PutMany stores many records in the database. // PutMany stores many records in the database.
func (c *Controller) PutMany() (chan<- record.Record, <-chan error) { func (c *Controller) PutMany() (chan<- record.Record, <-chan error) {
c.writeLock.RLock() c.exclusiveAccess.RLock()
defer c.writeLock.RUnlock() defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() { if shuttingDown.IsSet() {
errs := make(chan error, 1) errs := make(chan error, 1)
@ -171,16 +166,16 @@ func (c *Controller) PutMany() (chan<- record.Record, <-chan error) {
// Query executes the given query on the database. // Query executes the given query on the database.
func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
c.readLock.RLock() c.exclusiveAccess.RLock()
if shuttingDown.IsSet() { if shuttingDown.IsSet() {
c.readLock.RUnlock() c.exclusiveAccess.RUnlock()
return nil, ErrShuttingDown return nil, ErrShuttingDown
} }
it, err := c.storage.Query(q, local, internal) it, err := c.storage.Query(q, local, internal)
if err != nil { if err != nil {
c.readLock.RUnlock() c.exclusiveAccess.RUnlock()
return nil, err return nil, err
} }
@ -191,15 +186,19 @@ func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iter
// PushUpdate pushes a record update to subscribers. // PushUpdate pushes a record update to subscribers.
func (c *Controller) PushUpdate(r record.Record) { func (c *Controller) PushUpdate(r record.Record) {
if c != nil { if c != nil {
c.readLock.RLock() c.exclusiveAccess.RLock()
defer c.readLock.RUnlock() defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() { if shuttingDown.IsSet() {
return return
} }
for _, sub := range c.subscriptions { for _, sub := range c.subscriptions {
if r.Meta().CheckPermission(sub.local, sub.internal) && sub.q.Matches(r) { r.Lock()
push := r.Meta().CheckPermission(sub.local, sub.internal) && sub.q.Matches(r)
r.Unlock()
if push {
select { select {
case sub.Feed <- r: case sub.Feed <- r:
default: default:
@ -210,10 +209,8 @@ func (c *Controller) PushUpdate(r record.Record) {
} }
func (c *Controller) addSubscription(sub *Subscription) { func (c *Controller) addSubscription(sub *Subscription) {
c.readLock.Lock() c.exclusiveAccess.Lock()
defer c.readLock.Unlock() defer c.exclusiveAccess.Unlock()
c.writeLock.Lock()
defer c.writeLock.Unlock()
if shuttingDown.IsSet() { if shuttingDown.IsSet() {
return return
@ -223,14 +220,14 @@ func (c *Controller) addSubscription(sub *Subscription) {
} }
func (c *Controller) readUnlockerAfterQuery(it *iterator.Iterator) { func (c *Controller) readUnlockerAfterQuery(it *iterator.Iterator) {
defer c.exclusiveAccess.RUnlock()
<-it.Done <-it.Done
c.readLock.RUnlock()
} }
// Maintain runs the Maintain method on the storage. // Maintain runs the Maintain method on the storage.
func (c *Controller) Maintain(ctx context.Context) error { func (c *Controller) Maintain(ctx context.Context) error {
c.writeLock.RLock() c.exclusiveAccess.RLock()
defer c.writeLock.RUnlock() defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() { if shuttingDown.IsSet() {
return nil return nil
@ -241,8 +238,8 @@ func (c *Controller) Maintain(ctx context.Context) error {
// MaintainThorough runs the MaintainThorough method on the storage. // MaintainThorough runs the MaintainThorough method on the storage.
func (c *Controller) MaintainThorough(ctx context.Context) error { func (c *Controller) MaintainThorough(ctx context.Context) error {
c.writeLock.RLock() c.exclusiveAccess.RLock()
defer c.writeLock.RUnlock() defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() { if shuttingDown.IsSet() {
return nil return nil
@ -253,8 +250,8 @@ func (c *Controller) MaintainThorough(ctx context.Context) error {
// MaintainRecordStates runs the record state lifecycle maintenance on the storage. // MaintainRecordStates runs the record state lifecycle maintenance on the storage.
func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error { func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefore time.Time) error {
c.writeLock.RLock() c.exclusiveAccess.RLock()
defer c.writeLock.RUnlock() defer c.exclusiveAccess.RUnlock()
if shuttingDown.IsSet() { if shuttingDown.IsSet() {
return nil return nil
@ -265,11 +262,8 @@ func (c *Controller) MaintainRecordStates(ctx context.Context, purgeDeletedBefor
// Shutdown shuts down the storage. // Shutdown shuts down the storage.
func (c *Controller) Shutdown() error { func (c *Controller) Shutdown() error {
// acquire full locks c.exclusiveAccess.Lock()
c.readLock.Lock() defer c.exclusiveAccess.Unlock()
defer c.readLock.Unlock()
c.writeLock.Lock()
defer c.writeLock.Unlock()
return c.storage.Shutdown() return c.storage.Shutdown()
} }

View file

@ -35,10 +35,8 @@ func RegisterHook(q *query.Query, hook Hook) (*RegisteredHook, error) {
return nil, err return nil, err
} }
c.readLock.Lock() c.exclusiveAccess.Lock()
defer c.readLock.Unlock() defer c.exclusiveAccess.Unlock()
c.writeLock.Lock()
defer c.writeLock.Unlock()
rh := &RegisteredHook{ rh := &RegisteredHook{
q: q, q: q,
@ -55,10 +53,8 @@ func (h *RegisteredHook) Cancel() error {
return err return err
} }
c.readLock.Lock() c.exclusiveAccess.Lock()
defer c.readLock.Unlock() defer c.exclusiveAccess.Unlock()
c.writeLock.Lock()
defer c.writeLock.Unlock()
for key, hook := range c.hooks { for key, hook := range c.hooks {
if hook.q == h.q { if hook.q == h.q {

View file

@ -22,10 +22,8 @@ func (s *Subscription) Cancel() error {
return err return err
} }
c.readLock.Lock() c.exclusiveAccess.Lock()
defer c.readLock.Unlock() defer c.exclusiveAccess.Unlock()
c.writeLock.Lock()
defer c.writeLock.Unlock()
if s.canceled { if s.canceled {
return nil return nil