diff --git a/database/controller.go b/database/controller.go index 51699f4..918112b 100644 --- a/database/controller.go +++ b/database/controller.go @@ -137,6 +137,28 @@ func (c *Controller) Put(r record.Record) (err error) { return nil } +// PutMany stores many records in the database. +func (c *Controller) PutMany() (batch chan record.Record, err chan error) { + c.writeLock.RLock() + defer c.writeLock.RUnlock() + + if shuttingDown.IsSet() { + batch = make(chan record.Record) + err = make(chan error, 1) + err <- ErrShuttingDown + return + } + + if c.ReadOnly() { + batch = make(chan record.Record) + err = make(chan error, 1) + err <- ErrReadOnly + return + } + + return c.storage.PutMany() +} + // Query executes the given query on the database. func (c *Controller) Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) { c.readLock.RLock() diff --git a/database/database_test.go b/database/database_test.go index f2e2920..6367648 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -10,10 +10,13 @@ import ( "testing" "time" + "github.com/safing/portbase/database/record" + q "github.com/safing/portbase/database/query" _ "github.com/safing/portbase/database/storage/badger" _ "github.com/safing/portbase/database/storage/bbolt" _ "github.com/safing/portbase/database/storage/fstree" + _ "github.com/safing/portbase/database/storage/hashmap" ) func makeKey(dbName, key string) string { @@ -39,7 +42,10 @@ func testDatabase(t *testing.T, storageType string) { } // interface - db := NewInterface(nil) + db := NewInterface(&Options{ + Local: true, + Internal: true, + }) // sub sub, err := db.Subscribe(q.New(dbName).MustBeValid()) @@ -107,6 +113,18 @@ func testDatabase(t *testing.T, storageType string) { t.Fatalf("expected two records, got %d", cnt) } + switch storageType { + case "bbolt", "hashmap": + batchPut := db.PutMany(dbName) + records := []record.Record{A, B, C, nil} // nil is to signify finish + for _, r := range records { + err = batchPut(r) + if err != nil { + t.Fatal(err) + } + } + } + err = hook.Cancel() if err != nil { t.Fatal(err) @@ -142,6 +160,7 @@ func TestDatabaseSystem(t *testing.T) { testDatabase(t, "badger") testDatabase(t, "bbolt") testDatabase(t, "fstree") + testDatabase(t, "hashmap") err = MaintainRecordStates() if err != nil { diff --git a/database/interface.go b/database/interface.go index 9e9b9f7..87beccd 100644 --- a/database/interface.go +++ b/database/interface.go @@ -170,10 +170,19 @@ func (i *Interface) InsertValue(key string, attribute string, value interface{}) } // Put saves a record to the database. -func (i *Interface) Put(r record.Record) error { - _, db, err := i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true) - if err != nil && err != ErrNotFound { - return err +func (i *Interface) Put(r record.Record) (err error) { + // get record or only database + var db *Controller + if !i.options.Internal || !i.options.Local { + _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true) + if err != nil && err != ErrNotFound { + return err + } + } else { + db, err = getController(r.DatabaseKey()) + if err != nil { + return err + } } r.Lock() @@ -186,24 +195,100 @@ func (i *Interface) Put(r record.Record) error { } // PutNew saves a record to the database as a new record (ie. with new timestamps). -func (i *Interface) PutNew(r record.Record) error { - _, db, err := i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true) - if err != nil && err != ErrNotFound { - return err +func (i *Interface) PutNew(r record.Record) (err error) { + // get record or only database + var db *Controller + if !i.options.Internal || !i.options.Local { + _, db, err = i.getRecord(r.DatabaseName(), r.DatabaseKey(), true, true) + if err != nil && err != ErrNotFound { + return err + } + } else { + db, err = getController(r.DatabaseKey()) + if err != nil { + return err + } } r.Lock() defer r.Unlock() - if r.Meta() == nil { - r.CreateMeta() + if r.Meta() != nil { + r.Meta().Reset() } - r.Meta().Reset() i.options.Apply(r) i.updateCache(r) return db.Put(r) } +// PutMany stores many records in the database. Warning: This is nearly a direct database access and omits many things: +// - Record locking +// - Hooks +// - Subscriptions +// - Caching +func (i *Interface) PutMany(dbName string) (put func(record.Record) error) { + interfaceBatch := make(chan record.Record, 100) + + // permission check + if !i.options.Internal || !i.options.Local { + return func(r record.Record) error { + return ErrPermissionDenied + } + } + + // get database + db, err := getController(dbName) + if err != nil { + return func(r record.Record) error { + return err + } + } + + // start database access + dbBatch, errCh := db.PutMany() + + // interface options proxy + go func() { + for { + select { + case r := <-interfaceBatch: + // finished? + if r == nil { + close(dbBatch) // signify that we are finished + return + } + // apply options + i.options.Apply(r) + // pass along + dbBatch <- r + case <-time.After(1 * time.Second): + // bail out + errCh <- errors.New("timeout: putmany unused for too long") + close(dbBatch) // signify that we are finished + return + } + } + }() + + return func(r record.Record) error { + if r == nil { + interfaceBatch <- nil // signify that we are finished + // do not close, as this fn could be called again with nil. + return <-errCh + } + + if r.DatabaseName() != dbName { + return errors.New("record out of database scope") + } + select { + case interfaceBatch <- r: + return nil + case err := <-errCh: + return err + } + } +} + // SetAbsoluteExpiry sets an absolute record expiry. func (i *Interface) SetAbsoluteExpiry(key string, time int64) error { r, db, err := i.getRecord(getDBFromKey, key, true, true) diff --git a/database/storage/badger/badger.go b/database/storage/badger/badger.go index 8f473cd..8f2c9f3 100644 --- a/database/storage/badger/badger.go +++ b/database/storage/badger/badger.go @@ -94,6 +94,15 @@ func (b *Badger) Put(r record.Record) error { return err } +// PutMany stores many records in the database. +func (b *Badger) PutMany() (batch chan record.Record, err chan error) { + // TODO: implement PutMany + batch = make(chan record.Record) + err = make(chan error, 1) + err <- errors.New("not implemented") + return +} + // Delete deletes a record from the database. func (b *Badger) Delete(key string) error { return b.db.Update(func(txn *badger.Txn) error { diff --git a/database/storage/bbolt/bbolt.go b/database/storage/bbolt/bbolt.go index d37bf60..832c157 100644 --- a/database/storage/bbolt/bbolt.go +++ b/database/storage/bbolt/bbolt.go @@ -105,6 +105,40 @@ func (b *BBolt) Put(r record.Record) error { return nil } +// PutMany stores many records in the database. +func (b *BBolt) PutMany() (batch chan record.Record, errCh chan error) { + batch = make(chan record.Record, 100) + errCh = make(chan error, 1) + + go func() { + err := b.db.Batch(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(bucketName) + for { + r := <-batch + // finished? + if r == nil { + return nil + } + + // marshal + data, txErr := r.MarshalRecord(r) + if txErr != nil { + return txErr + } + + // put + txErr = bucket.Put([]byte(r.DatabaseKey()), data) + if txErr != nil { + return txErr + } + } + }) + errCh <- err + }() + + return batch, errCh +} + // Delete deletes a record from the database. func (b *BBolt) Delete(key string) error { err := b.db.Update(func(tx *bbolt.Tx) error { diff --git a/database/storage/fstree/fstree.go b/database/storage/fstree/fstree.go index fa160de..654695f 100644 --- a/database/storage/fstree/fstree.go +++ b/database/storage/fstree/fstree.go @@ -131,6 +131,15 @@ func (fst *FSTree) Put(r record.Record) error { return nil } +// PutMany stores many records in the database. +func (fst *FSTree) PutMany() (batch chan record.Record, err chan error) { + // TODO: implement PutMany + batch = make(chan record.Record) + err = make(chan error, 1) + err <- errors.New("not implemented") + return +} + // Delete deletes a record from the database. func (fst *FSTree) Delete(key string) error { dstPath, err := fst.buildFilePath(key, true) diff --git a/database/storage/hashmap/map.go b/database/storage/hashmap/map.go index c6bb65d..8e1103a 100644 --- a/database/storage/hashmap/map.go +++ b/database/storage/hashmap/map.go @@ -52,6 +52,33 @@ func (hm *HashMap) Put(r record.Record) error { return nil } +// PutMany stores many records in the database. +func (hm *HashMap) PutMany() (batch chan record.Record, err chan error) { + hm.dbLock.Lock() + defer hm.dbLock.Unlock() + // we could lock for every record, but we want to have the same behaviour + // as the other storage backends, especially for testing. + + batch = make(chan record.Record, 100) + err = make(chan error, 1) + + // start handler + go func() { + for { + r := <-batch + // finished? + if r == nil { + err <- nil + return + } + // put + hm.db[r.DatabaseKey()] = r + } + }() + + return +} + // Delete deletes a record from the database. func (hm *HashMap) Delete(key string) error { hm.dbLock.Lock() diff --git a/database/storage/injectbase.go b/database/storage/injectbase.go index 316c8d6..2596820 100644 --- a/database/storage/injectbase.go +++ b/database/storage/injectbase.go @@ -25,6 +25,14 @@ func (i *InjectBase) Put(m record.Record) error { return errNotImplemented } +// PutMany stores many records in the database. +func (i *InjectBase) PutMany() (batch chan record.Record, err chan error) { + batch = make(chan record.Record) + err = make(chan error, 1) + err <- errNotImplemented + return +} + // Delete deletes a record from the database. func (i *InjectBase) Delete(key string) error { return errNotImplemented diff --git a/database/storage/interface.go b/database/storage/interface.go index 4a6ce1e..3f0642a 100644 --- a/database/storage/interface.go +++ b/database/storage/interface.go @@ -10,6 +10,7 @@ import ( type Interface interface { Get(key string) (record.Record, error) Put(m record.Record) error + PutMany() (batch chan record.Record, err chan error) Delete(key string) error Query(q *query.Query, local, internal bool) (*iterator.Iterator, error) diff --git a/database/storage/sinkhole/sinkhole.go b/database/storage/sinkhole/sinkhole.go index 9c51d29..14bbfb0 100644 --- a/database/storage/sinkhole/sinkhole.go +++ b/database/storage/sinkhole/sinkhole.go @@ -40,6 +40,26 @@ func (s *Sinkhole) Put(m record.Record) error { return nil } +// PutMany stores many records in the database. +func (s *Sinkhole) PutMany() (batch chan record.Record, err chan error) { + batch = make(chan record.Record, 100) + err = make(chan error, 1) + + // start handler + go func() { + for { + r := <-batch + // finished? + if r == nil { + err <- nil + return + } + } + }() + + return +} + // Delete deletes a record from the database. func (s *Sinkhole) Delete(key string) error { return nil