mirror of
https://github.com/safing/portbase
synced 2025-09-01 10:09:50 +00:00
Add PutMany, currently only for bbolt and hashmap storage backends
This commit is contained in:
parent
35ab2be6a0
commit
e0f96d5188
10 changed files with 246 additions and 12 deletions
|
@ -137,6 +137,28 @@ func (c *Controller) Put(r record.Record) (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PutMany stores many records in the database.
|
||||||
|
func (c *Controller) PutMany() (batch chan record.Record, err chan error) {
|
||||||
|
c.writeLock.RLock()
|
||||||
|
defer c.writeLock.RUnlock()
|
||||||
|
|
||||||
|
if shuttingDown.IsSet() {
|
||||||
|
batch = make(chan record.Record)
|
||||||
|
err = make(chan error, 1)
|
||||||
|
err <- ErrShuttingDown
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.ReadOnly() {
|
||||||
|
batch = make(chan record.Record)
|
||||||
|
err = make(chan error, 1)
|
||||||
|
err <- ErrReadOnly
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.storage.PutMany()
|
||||||
|
}
|
||||||
|
|
||||||
// Query executes the given query on the database.
|
// Query executes the given query on the database.
|
||||||
func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
|
func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) {
|
||||||
c.readLock.RLock()
|
c.readLock.RLock()
|
||||||
|
|
|
@ -10,10 +10,13 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/safing/portbase/database/record"
|
||||||
|
|
||||||
q "github.com/safing/portbase/database/query"
|
q "github.com/safing/portbase/database/query"
|
||||||
_ "github.com/safing/portbase/database/storage/badger"
|
_ "github.com/safing/portbase/database/storage/badger"
|
||||||
_ "github.com/safing/portbase/database/storage/bbolt"
|
_ "github.com/safing/portbase/database/storage/bbolt"
|
||||||
_ "github.com/safing/portbase/database/storage/fstree"
|
_ "github.com/safing/portbase/database/storage/fstree"
|
||||||
|
_ "github.com/safing/portbase/database/storage/hashmap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func makeKey(dbName, key string) string {
|
func makeKey(dbName, key string) string {
|
||||||
|
@ -39,7 +42,10 @@ func testDatabase(t *testing.T, storageType string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// interface
|
// interface
|
||||||
db := NewInterface(nil)
|
db := NewInterface(&Options{
|
||||||
|
Local: true,
|
||||||
|
Internal: true,
|
||||||
|
})
|
||||||
|
|
||||||
// sub
|
// sub
|
||||||
sub, err := db.Subscribe(q.New(dbName).MustBeValid())
|
sub, err := db.Subscribe(q.New(dbName).MustBeValid())
|
||||||
|
@ -107,6 +113,18 @@ func testDatabase(t *testing.T, storageType string) {
|
||||||
t.Fatalf("expected two records, got %d", cnt)
|
t.Fatalf("expected two records, got %d", cnt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch storageType {
|
||||||
|
case "bbolt", "hashmap":
|
||||||
|
batchPut := db.PutMany(dbName)
|
||||||
|
records := []record.Record{A, B, C, nil} // nil is to signify finish
|
||||||
|
for _, r := range records {
|
||||||
|
err = batchPut(r)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err = hook.Cancel()
|
err = hook.Cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -142,6 +160,7 @@ func TestDatabaseSystem(t *testing.T) {
|
||||||
testDatabase(t, "badger")
|
testDatabase(t, "badger")
|
||||||
testDatabase(t, "bbolt")
|
testDatabase(t, "bbolt")
|
||||||
testDatabase(t, "fstree")
|
testDatabase(t, "fstree")
|
||||||
|
testDatabase(t, "hashmap")
|
||||||
|
|
||||||
err = MaintainRecordStates()
|
err = MaintainRecordStates()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -170,11 +170,20 @@ func (i *Interface) InsertValue(key string, attribute string, value interface{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put saves a record to the database.
|
// Put saves a record to the database.
|
||||||
func (i *Interface) Put(r record.Record) error {
|
func (i *Interface) Put(r record.Record) (err error) {
|
||||||
_, db, err := i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
|
// get record or only database
|
||||||
|
var db *Controller
|
||||||
|
if !i.options.Internal || !i.options.Local {
|
||||||
|
_, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
|
||||||
if err != nil && err != ErrNotFound {
|
if err != nil && err != ErrNotFound {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
db, err = getController(r.DatabaseKey())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
|
@ -186,24 +195,100 @@ func (i *Interface) Put(r record.Record) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutNew saves a record to the database as a new record (ie. with new timestamps).
|
// PutNew saves a record to the database as a new record (ie. with new timestamps).
|
||||||
func (i *Interface) PutNew(r record.Record) error {
|
func (i *Interface) PutNew(r record.Record) (err error) {
|
||||||
_, db, err := i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
|
// get record or only database
|
||||||
|
var db *Controller
|
||||||
|
if !i.options.Internal || !i.options.Local {
|
||||||
|
_, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true)
|
||||||
if err != nil && err != ErrNotFound {
|
if err != nil && err != ErrNotFound {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
db, err = getController(r.DatabaseKey())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
|
|
||||||
if r.Meta() == nil {
|
if r.Meta() != nil {
|
||||||
r.CreateMeta()
|
|
||||||
}
|
|
||||||
r.Meta().Reset()
|
r.Meta().Reset()
|
||||||
|
}
|
||||||
i.options.Apply(r)
|
i.options.Apply(r)
|
||||||
i.updateCache(r)
|
i.updateCache(r)
|
||||||
return db.Put(r)
|
return db.Put(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PutMany stores many records in the database. Warning: This is nearly a direct database access and omits many things:
|
||||||
|
// - Record locking
|
||||||
|
// - Hooks
|
||||||
|
// - Subscriptions
|
||||||
|
// - Caching
|
||||||
|
func (i *Interface) PutMany(dbName string) (put func(record.Record) error) {
|
||||||
|
interfaceBatch := make(chan record.Record, 100)
|
||||||
|
|
||||||
|
// permission check
|
||||||
|
if !i.options.Internal || !i.options.Local {
|
||||||
|
return func(r record.Record) error {
|
||||||
|
return ErrPermissionDenied
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// get database
|
||||||
|
db, err := getController(dbName)
|
||||||
|
if err != nil {
|
||||||
|
return func(r record.Record) error {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// start database access
|
||||||
|
dbBatch, errCh := db.PutMany()
|
||||||
|
|
||||||
|
// interface options proxy
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case r := <-interfaceBatch:
|
||||||
|
// finished?
|
||||||
|
if r == nil {
|
||||||
|
close(dbBatch) // signify that we are finished
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// apply options
|
||||||
|
i.options.Apply(r)
|
||||||
|
// pass along
|
||||||
|
dbBatch <- r
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
// bail out
|
||||||
|
errCh <- errors.New("timeout: putmany unused for too long")
|
||||||
|
close(dbBatch) // signify that we are finished
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return func(r record.Record) error {
|
||||||
|
if r == nil {
|
||||||
|
interfaceBatch <- nil // signify that we are finished
|
||||||
|
// do not close, as this fn could be called again with nil.
|
||||||
|
return <-errCh
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.DatabaseName() != dbName {
|
||||||
|
return errors.New("record out of database scope")
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case interfaceBatch <- r:
|
||||||
|
return nil
|
||||||
|
case err := <-errCh:
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// SetAbsoluteExpiry sets an absolute record expiry.
|
// SetAbsoluteExpiry sets an absolute record expiry.
|
||||||
func (i *Interface) SetAbsoluteExpiry(key string, time int64) error {
|
func (i *Interface) SetAbsoluteExpiry(key string, time int64) error {
|
||||||
r, db, err := i.getRecord(getDBFromKey, key, true, true)
|
r, db, err := i.getRecord(getDBFromKey, key, true, true)
|
||||||
|
|
|
@ -94,6 +94,15 @@ func (b *Badger) Put(r record.Record) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PutMany stores many records in the database.
|
||||||
|
func (b *Badger) PutMany() (batch chan record.Record, err chan error) {
|
||||||
|
// TODO: implement PutMany
|
||||||
|
batch = make(chan record.Record)
|
||||||
|
err = make(chan error, 1)
|
||||||
|
err <- errors.New("not implemented")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Delete deletes a record from the database.
|
// Delete deletes a record from the database.
|
||||||
func (b *Badger) Delete(key string) error {
|
func (b *Badger) Delete(key string) error {
|
||||||
return b.db.Update(func(txn *badger.Txn) error {
|
return b.db.Update(func(txn *badger.Txn) error {
|
||||||
|
|
|
@ -105,6 +105,40 @@ func (b *BBolt) Put(r record.Record) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PutMany stores many records in the database.
|
||||||
|
func (b *BBolt) PutMany() (batch chan record.Record, errCh chan error) {
|
||||||
|
batch = make(chan record.Record, 100)
|
||||||
|
errCh = make(chan error, 1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
err := b.db.Batch(func(tx *bbolt.Tx) error {
|
||||||
|
bucket := tx.Bucket(bucketName)
|
||||||
|
for {
|
||||||
|
r := <-batch
|
||||||
|
// finished?
|
||||||
|
if r == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// marshal
|
||||||
|
data, txErr := r.MarshalRecord(r)
|
||||||
|
if txErr != nil {
|
||||||
|
return txErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// put
|
||||||
|
txErr = bucket.Put([]byte(r.DatabaseKey()), data)
|
||||||
|
if txErr != nil {
|
||||||
|
return txErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
errCh <- err
|
||||||
|
}()
|
||||||
|
|
||||||
|
return batch, errCh
|
||||||
|
}
|
||||||
|
|
||||||
// Delete deletes a record from the database.
|
// Delete deletes a record from the database.
|
||||||
func (b *BBolt) Delete(key string) error {
|
func (b *BBolt) Delete(key string) error {
|
||||||
err := b.db.Update(func(tx *bbolt.Tx) error {
|
err := b.db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
|
|
@ -131,6 +131,15 @@ func (fst *FSTree) Put(r record.Record) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PutMany stores many records in the database.
|
||||||
|
func (fst *FSTree) PutMany() (batch chan record.Record, err chan error) {
|
||||||
|
// TODO: implement PutMany
|
||||||
|
batch = make(chan record.Record)
|
||||||
|
err = make(chan error, 1)
|
||||||
|
err <- errors.New("not implemented")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Delete deletes a record from the database.
|
// Delete deletes a record from the database.
|
||||||
func (fst *FSTree) Delete(key string) error {
|
func (fst *FSTree) Delete(key string) error {
|
||||||
dstPath, err := fst.buildFilePath(key, true)
|
dstPath, err := fst.buildFilePath(key, true)
|
||||||
|
|
|
@ -52,6 +52,33 @@ func (hm *HashMap) Put(r record.Record) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PutMany stores many records in the database.
|
||||||
|
func (hm *HashMap) PutMany() (batch chan record.Record, err chan error) {
|
||||||
|
hm.dbLock.Lock()
|
||||||
|
defer hm.dbLock.Unlock()
|
||||||
|
// we could lock for every record, but we want to have the same behaviour
|
||||||
|
// as the other storage backends, especially for testing.
|
||||||
|
|
||||||
|
batch = make(chan record.Record, 100)
|
||||||
|
err = make(chan error, 1)
|
||||||
|
|
||||||
|
// start handler
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
r := <-batch
|
||||||
|
// finished?
|
||||||
|
if r == nil {
|
||||||
|
err <- nil
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// put
|
||||||
|
hm.db[r.DatabaseKey()] = r
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Delete deletes a record from the database.
|
// Delete deletes a record from the database.
|
||||||
func (hm *HashMap) Delete(key string) error {
|
func (hm *HashMap) Delete(key string) error {
|
||||||
hm.dbLock.Lock()
|
hm.dbLock.Lock()
|
||||||
|
|
|
@ -25,6 +25,14 @@ func (i *InjectBase) Put(m record.Record) error {
|
||||||
return errNotImplemented
|
return errNotImplemented
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PutMany stores many records in the database.
|
||||||
|
func (i *InjectBase) PutMany() (batch chan record.Record, err chan error) {
|
||||||
|
batch = make(chan record.Record)
|
||||||
|
err = make(chan error, 1)
|
||||||
|
err <- errNotImplemented
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Delete deletes a record from the database.
|
// Delete deletes a record from the database.
|
||||||
func (i *InjectBase) Delete(key string) error {
|
func (i *InjectBase) Delete(key string) error {
|
||||||
return errNotImplemented
|
return errNotImplemented
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
type Interface interface {
|
type Interface interface {
|
||||||
Get(key string) (record.Record, error)
|
Get(key string) (record.Record, error)
|
||||||
Put(m record.Record) error
|
Put(m record.Record) error
|
||||||
|
PutMany() (batch chan record.Record, err chan error)
|
||||||
Delete(key string) error
|
Delete(key string) error
|
||||||
Query(q *query.Query, local, internal bool) (*iterator.Iterator, error)
|
Query(q *query.Query, local, internal bool) (*iterator.Iterator, error)
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,26 @@ func (s *Sinkhole) Put(m record.Record) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PutMany stores many records in the database.
|
||||||
|
func (s *Sinkhole) PutMany() (batch chan record.Record, err chan error) {
|
||||||
|
batch = make(chan record.Record, 100)
|
||||||
|
err = make(chan error, 1)
|
||||||
|
|
||||||
|
// start handler
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
r := <-batch
|
||||||
|
// finished?
|
||||||
|
if r == nil {
|
||||||
|
err <- nil
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Delete deletes a record from the database.
|
// Delete deletes a record from the database.
|
||||||
func (s *Sinkhole) Delete(key string) error {
|
func (s *Sinkhole) Delete(key string) error {
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Add table
Reference in a new issue