Fix locking in PutMany implementations

This commit is contained in:
Daniel 2020-10-12 13:54:27 +02:00
parent e5a72ffa37
commit 6a58ce5a7a
2 changed files with 27 additions and 22 deletions

View file

@ -115,26 +115,9 @@ func (b *BBolt) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error)
err := b.db.Batch(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
for r := range batch {
if !shadowDelete && r.Meta().IsDeleted() {
// Immediate delete.
txErr := bucket.Delete([]byte(r.DatabaseKey()))
if txErr != nil {
return txErr
}
} else {
// Put or shadow delete.
// marshal
data, txErr := r.MarshalRecord(r)
if txErr != nil {
return txErr
}
// put
txErr = bucket.Put([]byte(r.DatabaseKey()), data)
if txErr != nil {
return txErr
}
txErr := b.batchPutOrDelete(bucket, shadowDelete, r)
if txErr != nil {
return txErr
}
}
return nil
@ -145,6 +128,25 @@ func (b *BBolt) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error)
return batch, errs
}
func (b *BBolt) batchPutOrDelete(bucket *bbolt.Bucket, shadowDelete bool, r record.Record) (err error) {
r.Lock()
defer r.Unlock()
if !shadowDelete && r.Meta().IsDeleted() {
// Immediate delete.
err = bucket.Delete([]byte(r.DatabaseKey()))
} else {
// Put or shadow delete.
var data []byte
data, err = r.MarshalRecord(r)
if err == nil {
err = bucket.Put([]byte(r.DatabaseKey()), data)
}
}
return err
}
// Delete deletes a record from the database.
func (b *BBolt) Delete(key string) error {
err := b.db.Update(func(tx *bbolt.Tx) error {

View file

@ -66,7 +66,7 @@ func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan erro
// start handler
go func() {
for r := range batch {
hm.putOrDelete(shadowDelete, r)
hm.batchPutOrDelete(shadowDelete, r)
}
errs <- nil
}()
@ -74,7 +74,10 @@ func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan erro
return batch, errs
}
func (hm *HashMap) putOrDelete(shadowDelete bool, r record.Record) {
func (hm *HashMap) batchPutOrDelete(shadowDelete bool, r record.Record) {
r.Lock()
defer r.Unlock()
hm.dbLock.Lock()
defer hm.dbLock.Unlock()