diff --git a/database/storage/bbolt/bbolt.go b/database/storage/bbolt/bbolt.go index 365888c..5291e0c 100644 --- a/database/storage/bbolt/bbolt.go +++ b/database/storage/bbolt/bbolt.go @@ -115,26 +115,9 @@ func (b *BBolt) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) err := b.db.Batch(func(tx *bbolt.Tx) error { bucket := tx.Bucket(bucketName) for r := range batch { - if !shadowDelete && r.Meta().IsDeleted() { - // Immediate delete. - txErr := bucket.Delete([]byte(r.DatabaseKey())) - if txErr != nil { - return txErr - } - } else { - // Put or shadow delete. - - // marshal - data, txErr := r.MarshalRecord(r) - if txErr != nil { - return txErr - } - - // put - txErr = bucket.Put([]byte(r.DatabaseKey()), data) - if txErr != nil { - return txErr - } + txErr := b.batchPutOrDelete(bucket, shadowDelete, r) + if txErr != nil { + return txErr } } return nil @@ -145,6 +128,25 @@ func (b *BBolt) PutMany(shadowDelete bool) (chan<- record.Record, <-chan error) return batch, errs } +func (b *BBolt) batchPutOrDelete(bucket *bbolt.Bucket, shadowDelete bool, r record.Record) (err error) { + r.Lock() + defer r.Unlock() + + if !shadowDelete && r.Meta().IsDeleted() { + // Immediate delete. + err = bucket.Delete([]byte(r.DatabaseKey())) + } else { + // Put or shadow delete. + var data []byte + data, err = r.MarshalRecord(r) + if err == nil { + err = bucket.Put([]byte(r.DatabaseKey()), data) + } + } + + return err +} + // Delete deletes a record from the database. func (b *BBolt) Delete(key string) error { err := b.db.Update(func(tx *bbolt.Tx) error { diff --git a/database/storage/hashmap/map.go b/database/storage/hashmap/map.go index 53fa0d9..2d08d4d 100644 --- a/database/storage/hashmap/map.go +++ b/database/storage/hashmap/map.go @@ -66,7 +66,7 @@ func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan erro // start handler go func() { for r := range batch { - hm.putOrDelete(shadowDelete, r) + hm.batchPutOrDelete(shadowDelete, r) } errs <- nil }() @@ -74,7 +74,10 @@ func (hm *HashMap) PutMany(shadowDelete bool) (chan<- record.Record, <-chan erro return batch, errs } -func (hm *HashMap) putOrDelete(shadowDelete bool, r record.Record) { +func (hm *HashMap) batchPutOrDelete(shadowDelete bool, r record.Record) { + r.Lock() + defer r.Unlock() + hm.dbLock.Lock() defer hm.dbLock.Unlock()