diff --git a/database/ds/channelshim/FORKED b/database/ds/channelshim/FORKED deleted file mode 100644 index c7364b3..0000000 --- a/database/ds/channelshim/FORKED +++ /dev/null @@ -1,4 +0,0 @@ -from: https://github.com/ipfs/go-datastore/blob/master/basic_ds.go -commit: https://github.com/ipfs/go-datastore/commit/545f59008f75bdb6b28abafd8391d7d7d19422be -original files imported: - - basic_ds.go sha256:f6bd8d26e3511539358b1712c1f1359b9da6425f2d28c8305f4017e61688fe4d diff --git a/database/ds/channelshim/ORIGINAL_LICENSE b/database/ds/channelshim/ORIGINAL_LICENSE deleted file mode 100644 index f204902..0000000 --- a/database/ds/channelshim/ORIGINAL_LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -The MIT License - -Copyright (c) 2016 Juan Batiz-Benet - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. diff --git a/database/ds/channelshim/channelshim.go b/database/ds/channelshim/channelshim.go deleted file mode 100644 index 9ebc829..0000000 --- a/database/ds/channelshim/channelshim.go +++ /dev/null @@ -1,288 +0,0 @@ -// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file. - -package channelshim - -import ( - "errors" - "io" - "time" - - datastore "github.com/ipfs/go-datastore" - dsq "github.com/ipfs/go-datastore/query" - "github.com/tevino/abool" -) - -var ErrDatastoreClosed = errors.New("datastore: this instance was closed") - -// ChanneledDatastore makes datastore thread-safe -type ChanneledDatastore struct { - child datastore.Datastore - setChild chan datastore.Datastore - - putRequest chan *VKeyValue - putReply chan *error - getRequest chan *datastore.Key - getReply chan *VValueErr - hasRequest chan *datastore.Key - hasReply chan *VExistsErr - deleteRequest chan *datastore.Key - deleteReply chan *error - queryRequest chan *dsq.Query - queryReply chan *VResultsErr - batchRequest chan interface{} // nothing actually - batchReply chan *VBatchErr - closeRequest chan interface{} // nothing actually - closeReply chan *error - - closedFlag *abool.AtomicBool -} - -type VKeyValue struct { - Key datastore.Key - Value interface{} -} - -type VValueErr struct { - Value interface{} - Err error -} - -type VExistsErr struct { - Exists bool - Err error -} - -type VResultsErr struct { - Results dsq.Results - Err error -} - -type VBatchErr struct { - Batch datastore.Batch - Err error -} - -func (cds *ChanneledDatastore) run() { - if cds.child == nil { - cds.child = <-cds.setChild - } - for { - select { - case v := <-cds.putRequest: - cds.put(v) - case v := <-cds.getRequest: - cds.get(v) - case v := <-cds.hasRequest: - cds.has(v) - case v := <-cds.deleteRequest: - cds.delete(v) - case v := <-cds.queryRequest: - cds.query(v) - case <-cds.batchRequest: - cds.batch() - case <-cds.closeRequest: - err := cds.close() - if err == nil { - cds.closeReply <- &err - defer cds.stop() - return - } - cds.closeReply <- &err - } - } -} - -func (cds *ChanneledDatastore) stop() { - for { - select { - case <-cds.putRequest: - cds.putReply <- &ErrDatastoreClosed - case <-cds.getRequest: - cds.getReply <- &VValueErr{nil, ErrDatastoreClosed} - case <-cds.hasRequest: - cds.hasReply <- &VExistsErr{false, ErrDatastoreClosed} - case <-cds.deleteRequest: - cds.deleteReply <- &ErrDatastoreClosed - case <-cds.queryRequest: - cds.queryReply <- &VResultsErr{nil, ErrDatastoreClosed} - case <-cds.batchRequest: - cds.batchReply <- &VBatchErr{nil, ErrDatastoreClosed} - case <-cds.closeRequest: - cds.closeReply <- &ErrDatastoreClosed - case <-time.After(1 * time.Minute): - // TODO: theoretically a race condition, as some goroutines _could_ still be stuck in front of the request channel - close(cds.putRequest) - close(cds.putReply) - close(cds.getRequest) - close(cds.getReply) - close(cds.hasRequest) - close(cds.hasReply) - close(cds.deleteRequest) - close(cds.deleteReply) - close(cds.queryRequest) - close(cds.queryReply) - close(cds.batchRequest) - close(cds.batchReply) - close(cds.closeRequest) - close(cds.closeReply) - return - } - } -} - -// NewChanneledDatastore constructs a datastore accessed through channels. -func NewChanneledDatastore(ds datastore.Datastore) *ChanneledDatastore { - cds := &ChanneledDatastore{child: ds} - cds.setChild = make(chan datastore.Datastore) - - cds.putRequest = make(chan *VKeyValue) - cds.putReply = make(chan *error) - cds.getRequest = make(chan *datastore.Key) - cds.getReply = make(chan *VValueErr) - cds.hasRequest = make(chan *datastore.Key) - cds.hasReply = make(chan *VExistsErr) - cds.deleteRequest = make(chan *datastore.Key) - cds.deleteReply = make(chan *error) - cds.queryRequest = make(chan *dsq.Query) - cds.queryReply = make(chan *VResultsErr) - cds.batchRequest = make(chan interface{}) - cds.batchReply = make(chan *VBatchErr) - cds.closeRequest = make(chan interface{}) - cds.closeReply = make(chan *error) - - cds.closedFlag = abool.NewBool(false) - - go cds.run() - - return cds -} - -// SetChild sets the child of the datastore, if not yet set -func (cds *ChanneledDatastore) SetChild(ds datastore.Datastore) error { - select { - case cds.setChild <- ds: - default: - return errors.New("channelshim: child already set") - } - return nil -} - -// Children implements Shim -func (cds *ChanneledDatastore) Children() []datastore.Datastore { - return []datastore.Datastore{cds.child} -} - -// Put implements Datastore.Put -func (cds *ChanneledDatastore) Put(key datastore.Key, value interface{}) error { - if cds.closedFlag.IsSet() { - return ErrDatastoreClosed - } - cds.putRequest <- &VKeyValue{key, value} - err := <-cds.putReply - return *err -} - -func (cds *ChanneledDatastore) put(v *VKeyValue) { - err := cds.child.Put(v.Key, v.Value) - cds.putReply <- &err -} - -// Get implements Datastore.Get -func (cds *ChanneledDatastore) Get(key datastore.Key) (value interface{}, err error) { - if cds.closedFlag.IsSet() { - return nil, ErrDatastoreClosed - } - cds.getRequest <- &key - v := <-cds.getReply - return v.Value, v.Err -} - -func (cds *ChanneledDatastore) get(key *datastore.Key) { - value, err := cds.child.Get(*key) - cds.getReply <- &VValueErr{value, err} -} - -// Has implements Datastore.Has -func (cds *ChanneledDatastore) Has(key datastore.Key) (exists bool, err error) { - if cds.closedFlag.IsSet() { - return false, ErrDatastoreClosed - } - cds.hasRequest <- &key - v := <-cds.hasReply - return v.Exists, v.Err -} - -func (cds *ChanneledDatastore) has(key *datastore.Key) { - exists, err := cds.child.Has(*key) - cds.hasReply <- &VExistsErr{exists, err} -} - -// Delete implements Datastore.Delete -func (cds *ChanneledDatastore) Delete(key datastore.Key) error { - if cds.closedFlag.IsSet() { - return ErrDatastoreClosed - } - cds.deleteRequest <- &key - err := <-cds.deleteReply - return *err -} - -func (cds *ChanneledDatastore) delete(key *datastore.Key) { - err := cds.child.Delete(*key) - cds.deleteReply <- &err -} - -// Query implements Datastore.Query -func (cds *ChanneledDatastore) Query(q dsq.Query) (dsq.Results, error) { - if cds.closedFlag.IsSet() { - return nil, ErrDatastoreClosed - } - cds.queryRequest <- &q - v := <-cds.queryReply - return v.Results, v.Err -} - -func (cds *ChanneledDatastore) query(q *dsq.Query) { - results, err := cds.child.Query(*q) - cds.queryReply <- &VResultsErr{results, err} -} - -// Query implements Datastore.Batch -func (cds *ChanneledDatastore) Batch() (datastore.Batch, error) { - if cds.closedFlag.IsSet() { - return nil, ErrDatastoreClosed - } - cds.batchRequest <- nil - v := <-cds.batchReply - return v.Batch, v.Err -} - -func (cds *ChanneledDatastore) batch() { - if bds, ok := cds.child.(datastore.Batching); ok { - batch, err := bds.Batch() - cds.batchReply <- &VBatchErr{batch, err} - } else { - cds.batchReply <- &VBatchErr{nil, datastore.ErrBatchUnsupported} - } -} - -// Query closed child Datastore and this Shim -func (cds *ChanneledDatastore) Close() error { - if cds.closedFlag.IsSet() { - return ErrDatastoreClosed - } - cds.closeRequest <- nil - err := <-cds.closeReply - return *err -} - -func (cds *ChanneledDatastore) close() error { - if cds, ok := cds.child.(io.Closer); ok { - return cds.Close() - } - return nil -} - -func (cds *ChanneledDatastore) IsThreadSafe() { - -} diff --git a/database/ds/channelshim/channelshim_test.go b/database/ds/channelshim/channelshim_test.go deleted file mode 100644 index 33321bf..0000000 --- a/database/ds/channelshim/channelshim_test.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file. - -package channelshim - -import ( - "io" - "sync" - "testing" - - datastore "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/query" -) - -var cds datastore.Datastore -var key datastore.Key -var q query.Query -var wg sync.WaitGroup - -func testFunctions(testClose bool) { - wg.Add(1) - defer wg.Done() - - cds.Put(key, "value") - cds.Get(key) - cds.Has(key) - cds.Delete(key) - cds.Query(q) - if batchingDS, ok := cds.(datastore.Batching); ok { - batchingDS.Batch() - } - - if testClose { - if closingDS, ok := cds.(io.Closer); ok { - closingDS.Close() - } - } - -} - -func TestChanneledDatastore(t *testing.T) { - - cds = NewChanneledDatastore(datastore.NewNullDatastore()) - key = datastore.RandomKey() - - // test normal concurrency-safe operation - for i := 0; i < 100; i++ { - go testFunctions(false) - } - wg.Wait() - - // test shutdown procedure - for i := 0; i < 50; i++ { - go testFunctions(false) - } - for i := 0; i < 50; i++ { - go testFunctions(true) - } - wg.Wait() - - // test closed functions, just to be sure - go testFunctions(true) - wg.Wait() - -} diff --git a/database/ds/leveldb/FORKED b/database/ds/leveldb/FORKED deleted file mode 100644 index 7d97d55..0000000 --- a/database/ds/leveldb/FORKED +++ /dev/null @@ -1,5 +0,0 @@ -from: https://github.com/ipfs/go-ds-leveldb -commit: https://github.com/ipfs/go-ds-leveldb/commit/d2d7b2c585634fcf7edb0de3602d060de85616a5 -original files imported: - - datastore.go sha256:03994e4ddc33e4b9f13a637c95753b808eb02d97a80b8f7539ef9f0498567ce1 - - ds_test.go sha256:3f49b4c7769e8a69ba058411f763f7425128769fab25b58650368581dc303a7c diff --git a/database/ds/leveldb/ORIGINAL_LICENSE b/database/ds/leveldb/ORIGINAL_LICENSE deleted file mode 100644 index 6152c32..0000000 --- a/database/ds/leveldb/ORIGINAL_LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -The MIT License - -Copyright (c) 2016 Jeromy Johnson - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. diff --git a/database/ds/leveldb/leveldb.go b/database/ds/leveldb/leveldb.go deleted file mode 100644 index ea552b4..0000000 --- a/database/ds/leveldb/leveldb.go +++ /dev/null @@ -1,250 +0,0 @@ -// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file. - -package leveldb - -import ( - "fmt" - - "github.com/Safing/safing-core/database/dbutils" - "github.com/Safing/safing-core/formats/dsd" - - ds "github.com/ipfs/go-datastore" - dsq "github.com/ipfs/go-datastore/query" - "github.com/jbenet/goprocess" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/opt" - "github.com/syndtr/goleveldb/leveldb/util" -) - -type datastore struct { - DB *leveldb.DB -} - -type Options opt.Options - -func NewDatastore(path string, opts *Options) (*datastore, error) { - var nopts opt.Options - if opts != nil { - nopts = opt.Options(*opts) - } - db, err := leveldb.OpenFile(path, &nopts) - if err != nil { - return nil, err - } - - return &datastore{ - DB: db, - }, nil -} - -// Returns ErrInvalidType if value is not of type []byte. -// -// Note: using sync = false. -// see http://godoc.org/github.com/syndtr/goleveldb/leveldb/opt#WriteOptions -func (d *datastore) Put(key ds.Key, value interface{}) (err error) { - bytes, err := dbutils.DumpModel(value, dsd.AUTO) // or other dsd format - if err != nil { - return err - } - return d.DB.Put(key.Bytes(), bytes, nil) -} - -func (d *datastore) Get(key ds.Key) (interface{}, error) { - data, err := d.DB.Get(key.Bytes(), nil) - if err != nil { - if err == leveldb.ErrNotFound { - return nil, ds.ErrNotFound - } - return nil, err - } - - model, err := dbutils.NewWrapper(&key, data) - if err != nil { - return nil, err - } - - return model, nil -} - -func (d *datastore) Has(key ds.Key) (exists bool, err error) { - return d.DB.Has(key.Bytes(), nil) -} - -func (d *datastore) Delete(key ds.Key) (err error) { - // leveldb Delete will not return an error if the key doesn't - // exist (see https://github.com/syndtr/goleveldb/issues/109), - // so check that the key exists first and if not return an - // error - exists, err := d.DB.Has(key.Bytes(), nil) - if !exists { - return ds.ErrNotFound - } else if err != nil { - return err - } - return d.DB.Delete(key.Bytes(), nil) -} - -func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { - return d.QueryNew(q) -} - -func (d *datastore) QueryNew(q dsq.Query) (dsq.Results, error) { - if len(q.Filters) > 0 || - len(q.Orders) > 0 || - q.Limit > 0 || - q.Offset > 0 { - return d.QueryOrig(q) - } - var rnge *util.Range - if q.Prefix != "" { - rnge = util.BytesPrefix([]byte(q.Prefix)) - } - i := d.DB.NewIterator(rnge, nil) - return dsq.ResultsFromIterator(q, dsq.Iterator{ - Next: func() (dsq.Result, bool) { - ok := i.Next() - if !ok { - return dsq.Result{}, false - } - k := string(i.Key()) - e := dsq.Entry{Key: k} - - if !q.KeysOnly { - buf := make([]byte, len(i.Value())) - copy(buf, i.Value()) - - newKey := ds.RawKey(k) - wrapper, err := dbutils.NewWrapper(&newKey, buf) - if err != nil { - return dsq.Result{Error: fmt.Errorf("failed to create wrapper for %s: %s", k, err)}, false - } - e.Value = wrapper - } - return dsq.Result{Entry: e}, true - }, - Close: func() error { - i.Release() - return nil - }, - }), nil -} - -func (d *datastore) QueryOrig(q dsq.Query) (dsq.Results, error) { - // we can use multiple iterators concurrently. see: - // https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator - // advance the iterator only if the reader reads - // - // run query in own sub-process tied to Results.Process(), so that - // it waits for us to finish AND so that clients can signal to us - // that resources should be reclaimed. - qrb := dsq.NewResultBuilder(q) - qrb.Process.Go(func(worker goprocess.Process) { - d.runQuery(worker, qrb) - }) - - // go wait on the worker (without signaling close) - go qrb.Process.CloseAfterChildren() - - // Now, apply remaining things (filters, order) - qr := qrb.Results() - for _, f := range q.Filters { - qr = dsq.NaiveFilter(qr, f) - } - for _, o := range q.Orders { - qr = dsq.NaiveOrder(qr, o) - } - return qr, nil -} - -func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) { - - var rnge *util.Range - if qrb.Query.Prefix != "" { - rnge = util.BytesPrefix([]byte(qrb.Query.Prefix)) - } - i := d.DB.NewIterator(rnge, nil) - defer i.Release() - - // advance iterator for offset - if qrb.Query.Offset > 0 { - for j := 0; j < qrb.Query.Offset; j++ { - i.Next() - } - } - - // iterate, and handle limit, too - for sent := 0; i.Next(); sent++ { - // end early if we hit the limit - if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit { - break - } - - k := string(i.Key()) - e := dsq.Entry{Key: k} - - if !qrb.Query.KeysOnly { - buf := make([]byte, len(i.Value())) - copy(buf, i.Value()) - - newKey := ds.RawKey(k) - wrapper, err := dbutils.NewWrapper(&newKey, buf) - if err != nil { - return - } - e.Value = wrapper - } - - select { - case qrb.Output <- dsq.Result{Entry: e}: // we sent it out - case <-worker.Closing(): // client told us to end early. - break - } - } - - if err := i.Error(); err != nil { - select { - case qrb.Output <- dsq.Result{Error: err}: // client read our error - case <-worker.Closing(): // client told us to end. - return - } - } -} - -// LevelDB needs to be closed. -func (d *datastore) Close() (err error) { - return d.DB.Close() -} - -func (d *datastore) IsThreadSafe() {} - -type leveldbBatch struct { - b *leveldb.Batch - db *leveldb.DB -} - -func (d *datastore) Batch() (ds.Batch, error) { - return &leveldbBatch{ - b: new(leveldb.Batch), - db: d.DB, - }, nil -} - -func (b *leveldbBatch) Put(key ds.Key, value interface{}) error { - val, err := dsd.Dump(value, dsd.AUTO) - if err != nil { - // return ds.ErrInvalidType - return err - } - - b.b.Put(key.Bytes(), val) - return nil -} - -func (b *leveldbBatch) Commit() error { - return b.db.Write(b.b, nil) -} - -func (b *leveldbBatch) Delete(key ds.Key) error { - b.b.Delete(key.Bytes()) - return nil -} diff --git a/database/ds/leveldb/leveldb_test.go b/database/ds/leveldb/leveldb_test.go deleted file mode 100644 index aa8498d..0000000 --- a/database/ds/leveldb/leveldb_test.go +++ /dev/null @@ -1,159 +0,0 @@ -// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file. - -package leveldb - -import ( - "io/ioutil" - "os" - "testing" - - ds "github.com/ipfs/go-datastore" - dsq "github.com/ipfs/go-datastore/query" -) - -var testcases = map[string]string{ - "/a": "a", - "/a/b": "ab", - "/a/b/c": "abc", - "/a/b/d": "a/b/d", - "/a/c": "ac", - "/a/d": "ad", - "/e": "e", - "/f": "f", -} - -// returns datastore, and a function to call on exit. -// (this garbage collects). So: -// -// d, close := newDS(t) -// defer close() -func newDS(t *testing.T) (*datastore, func()) { - path, err := ioutil.TempDir("/tmp", "testing_leveldb_") - if err != nil { - t.Fatal(err) - } - - d, err := NewDatastore(path, nil) - if err != nil { - t.Fatal(err) - } - return d, func() { - os.RemoveAll(path) - d.Close() - } -} - -func addTestCases(t *testing.T, d *datastore, testcases map[string]string) { - for k, v := range testcases { - dsk := ds.NewKey(k) - if err := d.Put(dsk, []byte(v)); err != nil { - t.Fatal(err) - } - } - - for k, v := range testcases { - dsk := ds.NewKey(k) - v2, err := d.Get(dsk) - if err != nil { - t.Fatal(err) - } - v2b := v2.(*[]byte) - if string(*v2b) != v { - t.Errorf("%s values differ: %s != %s", k, v, v2) - } - } - -} - -func TestQuery(t *testing.T) { - d, close := newDS(t) - defer close() - addTestCases(t, d, testcases) - - rs, err := d.Query(dsq.Query{Prefix: "/a/"}) - if err != nil { - t.Fatal(err) - } - - expectMatches(t, []string{ - "/a/b", - "/a/b/c", - "/a/b/d", - "/a/c", - "/a/d", - }, rs) - - // test offset and limit - - rs, err = d.Query(dsq.Query{Prefix: "/a/", Offset: 2, Limit: 2}) - if err != nil { - t.Fatal(err) - } - - expectMatches(t, []string{ - "/a/b/d", - "/a/c", - }, rs) - -} - -func TestQueryRespectsProcess(t *testing.T) { - d, close := newDS(t) - defer close() - addTestCases(t, d, testcases) -} - -func expectMatches(t *testing.T, expect []string, actualR dsq.Results) { - actual, err := actualR.Rest() - if err != nil { - t.Error(err) - } - - if len(actual) != len(expect) { - t.Error("not enough", expect, actual) - } - for _, k := range expect { - found := false - for _, e := range actual { - if e.Key == k { - found = true - } - } - if !found { - t.Error(k, "not found") - } - } -} - -func TestBatching(t *testing.T) { - d, done := newDS(t) - defer done() - - b, err := d.Batch() - if err != nil { - t.Fatal(err) - } - - for k, v := range testcases { - err := b.Put(ds.NewKey(k), []byte(v)) - if err != nil { - t.Fatal(err) - } - } - - err = b.Commit() - if err != nil { - t.Fatal(err) - } - - for k, v := range testcases { - val, err := d.Get(ds.NewKey(k)) - if err != nil { - t.Fatal(err) - } - - if v != string(*val.(*[]byte)) { - t.Fatal("got wrong data!") - } - } -} diff --git a/database/ds/simplefs/simplefs.go b/database/ds/simplefs/simplefs.go deleted file mode 100644 index 0e1baff..0000000 --- a/database/ds/simplefs/simplefs.go +++ /dev/null @@ -1,428 +0,0 @@ -// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file. - -/* -Package simplefs provides a dead simple file-based datastore backed. -It is primarily meant for easy testing or storing big files that can easily be accesses directly, without datastore. - - /path/path/type:key.json - /path/path/type:key/type:key - -*/ -package simplefs - -import ( - "errors" - "fmt" - "io/ioutil" - "os" - "path/filepath" - "strings" - "time" - - ds "github.com/ipfs/go-datastore" - dsq "github.com/ipfs/go-datastore/query" - "github.com/tevino/abool" - - "github.com/Safing/safing-core/database/dbutils" - "github.com/Safing/safing-core/formats/dsd" - "github.com/Safing/safing-core/log" -) - -const ( - SIMPLEFS_TAG = "330adcf3924003a59ae93289bc2cbb236391588f" - DEFAULT_FILEMODE = os.FileMode(int(0644)) - DEFAULT_DIRMODE = os.FileMode(int(0755)) -) - -type datastore struct { - basePath string - basePathLen int -} - -func NewDatastore(path string) (*datastore, error) { - basePath, err := filepath.Abs(path) - if err != nil { - return nil, fmt.Errorf("failed to validate path %s: %s", path, err) - } - tagfile := filepath.Join(basePath, ".simplefs") - - file, err := os.Stat(basePath) - if err != nil { - if os.IsNotExist(err) { - err = os.MkdirAll(basePath, DEFAULT_DIRMODE) - if err != nil { - return nil, fmt.Errorf("failed to create directory: %s", err) - } - err = ioutil.WriteFile(tagfile, []byte(SIMPLEFS_TAG), DEFAULT_FILEMODE) - if err != nil { - return nil, fmt.Errorf("failed to create tag file (%s): %s", tagfile, err) - } - } else { - return nil, fmt.Errorf("failed to stat path: %s", err) - } - } else { - if !file.IsDir() { - return nil, fmt.Errorf("provided path (%s) is a file", basePath) - } - // check if valid simplefs storage dir - content, err := ioutil.ReadFile(tagfile) - if err != nil { - return nil, fmt.Errorf("could not read tag file (%s): %s", tagfile, err) - } - if string(content) != SIMPLEFS_TAG { - return nil, fmt.Errorf("invalid tag file (%s)", tagfile) - } - } - - log.Infof("simplefs: opened database at %s", basePath) - return &datastore{ - basePath: basePath, - basePathLen: len(basePath), - }, nil -} - -func (d *datastore) buildPath(path string) (string, error) { - if len(path) < 2 { - return "", fmt.Errorf("key too short: %s", path) - } - newPath := filepath.Clean(filepath.Join(d.basePath, path[1:])) + ".dsd" - if !strings.HasPrefix(newPath, d.basePath) { - return "", fmt.Errorf("key integrity check failed, compiled path is %s", newPath) - } - return newPath, nil -} - -func (d *datastore) Put(key ds.Key, value interface{}) (err error) { - objPath, err := d.buildPath(key.String()) - if err != nil { - return err - } - - bytes, err := dbutils.DumpModel(value, dsd.AUTO) // or other dsd format - if err != nil { - return err - } - - err = ioutil.WriteFile(objPath, bytes, DEFAULT_FILEMODE) - if err != nil { - // create dir and try again - err = os.MkdirAll(filepath.Dir(objPath), DEFAULT_DIRMODE) - if err != nil { - return fmt.Errorf("failed to create directory %s: %s", filepath.Dir(objPath), err) - } - err = ioutil.WriteFile(objPath, bytes, DEFAULT_FILEMODE) - if err != nil { - return fmt.Errorf("could not write file %s: %s", objPath, err) - } - } - - return nil -} - -func (d *datastore) Get(key ds.Key) (interface{}, error) { - objPath, err := d.buildPath(key.String()) - if err != nil { - return nil, err - } - - data, err := ioutil.ReadFile(objPath) - if err != nil { - // TODO: distinguish between error and inexistance - return nil, ds.ErrNotFound - } - - model, err := dbutils.NewWrapper(&key, data) - if err != nil { - return nil, err - } - - return model, nil -} - -func (d *datastore) Has(key ds.Key) (exists bool, err error) { - objPath, err := d.buildPath(key.String()) - if err != nil { - return false, err - } - - _, err = os.Stat(objPath) - if err != nil { - if os.IsNotExist(err) { - return false, nil - } - return false, fmt.Errorf("failed to stat path %s: %s", objPath, err) - } - return true, nil -} - -func (d *datastore) Delete(key ds.Key) (err error) { - objPath, err := d.buildPath(key.String()) - if err != nil { - return err - } - - // remove entry - err = os.Remove(objPath) - if err != nil { - return fmt.Errorf("could not delete (all) in path %s: %s", objPath, err) - } - // remove children - err = os.RemoveAll(objPath[:len(objPath)-4]) - if err != nil { - return fmt.Errorf("could not delete (all) in path %s: %s", objPath, err) - } - - return nil -} - -func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { - if len(q.Orders) > 0 { - return nil, fmt.Errorf("simplefs: no support for ordering queries yet") - } - - // log.Tracef("new query: %s", q.Prefix) - - // log.Tracef("simplefs: new query with prefix %s", q.Prefix) - - walkPath, err := d.buildPath(q.Prefix) - if err != nil { - return nil, err - } - walkPath = walkPath[:strings.LastIndex(walkPath, string(os.PathSeparator))] - - files := make(chan *dsq.Entry) - stopWalkingFlag := abool.NewBool(false) - stopWalking := make(chan interface{}) - counter := 0 - - go func() { - - err := filepath.Walk(walkPath, func(path string, info os.FileInfo, err error) error { - - // log.Tracef("walking: %s", path) - - if err != nil { - return fmt.Errorf("simplfs: error in query: %s", err) - } - - // skip directories - if info.IsDir() { - return nil - } - - // check if we are still were we should be - if !strings.HasPrefix(path, d.basePath) { - log.Criticalf("simplfs: query jailbreaked: %s", path) - return errors.New("jailbreaked") - } - path = path[d.basePathLen:] - - // check if there is enough space to remove ".dsd" - if len(path) < 6 { - return nil - } - path = path[:len(path)-4] - - // check if we still match prefix - if !strings.HasPrefix(path, q.Prefix) { - return nil - } - - entry := dsq.Entry{ - Key: path, - // TODO: phew, do we really want to load every single file we might not need? use nil for now. - Value: nil, - } - for _, filter := range q.Filters { - if !filter.Filter(entry) { - return nil - } - } - - // yay, entry matches! - counter++ - - if q.Offset > counter { - return nil - } - - select { - case files <- &entry: - case <-stopWalking: - return errors.New("finished") - } - - if q.Limit != 0 && q.Limit <= counter { - return errors.New("finished") - } - - return nil - - }) - - if err != nil { - log.Warningf("simplefs: filewalker for query failed: %s", err) - } - - close(files) - - }() - - return dsq.ResultsFromIterator(q, dsq.Iterator{ - Next: func() (dsq.Result, bool) { - select { - case entry := <-files: - if entry == nil { - return dsq.Result{}, false - } - // log.Tracef("processing: %s", entry.Key) - if !q.KeysOnly { - objPath, err := d.buildPath(entry.Key) - if err != nil { - return dsq.Result{Error: err}, false - } - data, err := ioutil.ReadFile(objPath) - if err != nil { - return dsq.Result{Error: fmt.Errorf("error reading file %s: %s", entry.Key, err)}, false - } - newKey := ds.RawKey(entry.Key) - wrapper, err := dbutils.NewWrapper(&newKey, data) - if err != nil { - return dsq.Result{Error: fmt.Errorf("failed to create wrapper for %s: %s", entry.Key, err)}, false - } - entry.Value = wrapper - } - return dsq.Result{Entry: *entry, Error: nil}, true - case <-time.After(10 * time.Second): - return dsq.Result{Error: errors.New("filesystem timeout")}, false - } - }, - Close: func() error { - if stopWalkingFlag.SetToIf(false, true) { - close(stopWalking) - } - return nil - }, - }), nil -} - -// -// func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { -// return d.QueryNew(q) -// } -// -// func (d *datastore) QueryNew(q dsq.Query) (dsq.Results, error) { -// if len(q.Filters) > 0 || -// len(q.Orders) > 0 || -// q.Limit > 0 || -// q.Offset > 0 { -// return d.QueryOrig(q) -// } -// var rnge *util.Range -// if q.Prefix != "" { -// rnge = util.BytesPrefix([]byte(q.Prefix)) -// } -// i := d.DB.NewIterator(rnge, nil) -// return dsq.ResultsFromIterator(q, dsq.Iterator{ -// Next: func() (dsq.Result, bool) { -// ok := i.Next() -// if !ok { -// return dsq.Result{}, false -// } -// k := string(i.Key()) -// e := dsq.Entry{Key: k} -// -// if !q.KeysOnly { -// buf := make([]byte, len(i.Value())) -// copy(buf, i.Value()) -// e.Value = buf -// } -// return dsq.Result{Entry: e}, true -// }, -// Close: func() error { -// i.Release() -// return nil -// }, -// }), nil -// } -// -// func (d *datastore) QueryOrig(q dsq.Query) (dsq.Results, error) { -// // we can use multiple iterators concurrently. see: -// // https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator -// // advance the iterator only if the reader reads -// // -// // run query in own sub-process tied to Results.Process(), so that -// // it waits for us to finish AND so that clients can signal to us -// // that resources should be reclaimed. -// qrb := dsq.NewResultBuilder(q) -// qrb.Process.Go(func(worker goprocess.Process) { -// d.runQuery(worker, qrb) -// }) -// -// // go wait on the worker (without signaling close) -// go qrb.Process.CloseAfterChildren() -// -// // Now, apply remaining things (filters, order) -// qr := qrb.Results() -// for _, f := range q.Filters { -// qr = dsq.NaiveFilter(qr, f) -// } -// for _, o := range q.Orders { -// qr = dsq.NaiveOrder(qr, o) -// } -// return qr, nil -// } -// -// func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) { -// -// var rnge *util.Range -// if qrb.Query.Prefix != "" { -// rnge = util.BytesPrefix([]byte(qrb.Query.Prefix)) -// } -// i := d.DB.NewIterator(rnge, nil) -// defer i.Release() -// -// // advance iterator for offset -// if qrb.Query.Offset > 0 { -// for j := 0; j < qrb.Query.Offset; j++ { -// i.Next() -// } -// } -// -// // iterate, and handle limit, too -// for sent := 0; i.Next(); sent++ { -// // end early if we hit the limit -// if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit { -// break -// } -// -// k := string(i.Key()) -// e := dsq.Entry{Key: k} -// -// if !qrb.Query.KeysOnly { -// buf := make([]byte, len(i.Value())) -// copy(buf, i.Value()) -// e.Value = buf -// } -// -// select { -// case qrb.Output <- dsq.Result{Entry: e}: // we sent it out -// case <-worker.Closing(): // client told us to end early. -// break -// } -// } -// -// if err := i.Error(); err != nil { -// select { -// case qrb.Output <- dsq.Result{Error: err}: // client read our error -// case <-worker.Closing(): // client told us to end. -// return -// } -// } -// } - -func (d *datastore) Close() (err error) { - return nil -} - -func (d *datastore) IsThreadSafe() {}