Remove datastore based storage backends

This commit is contained in:
Daniel 2019-05-31 12:59:35 +02:00
parent 0318521ca5
commit 0b60809c70
9 changed files with 0 additions and 1240 deletions

View file

@ -1,4 +0,0 @@
from: https://github.com/ipfs/go-datastore/blob/master/basic_ds.go
commit: https://github.com/ipfs/go-datastore/commit/545f59008f75bdb6b28abafd8391d7d7d19422be
original files imported:
- basic_ds.go sha256:f6bd8d26e3511539358b1712c1f1359b9da6425f2d28c8305f4017e61688fe4d

View file

@ -1,21 +0,0 @@
The MIT License
Copyright (c) 2016 Juan Batiz-Benet
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View file

@ -1,288 +0,0 @@
// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file.
package channelshim
import (
"errors"
"io"
"time"
datastore "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
"github.com/tevino/abool"
)
var ErrDatastoreClosed = errors.New("datastore: this instance was closed")
// ChanneledDatastore makes datastore thread-safe
type ChanneledDatastore struct {
child datastore.Datastore
setChild chan datastore.Datastore
putRequest chan *VKeyValue
putReply chan *error
getRequest chan *datastore.Key
getReply chan *VValueErr
hasRequest chan *datastore.Key
hasReply chan *VExistsErr
deleteRequest chan *datastore.Key
deleteReply chan *error
queryRequest chan *dsq.Query
queryReply chan *VResultsErr
batchRequest chan interface{} // nothing actually
batchReply chan *VBatchErr
closeRequest chan interface{} // nothing actually
closeReply chan *error
closedFlag *abool.AtomicBool
}
type VKeyValue struct {
Key datastore.Key
Value interface{}
}
type VValueErr struct {
Value interface{}
Err error
}
type VExistsErr struct {
Exists bool
Err error
}
type VResultsErr struct {
Results dsq.Results
Err error
}
type VBatchErr struct {
Batch datastore.Batch
Err error
}
func (cds *ChanneledDatastore) run() {
if cds.child == nil {
cds.child = <-cds.setChild
}
for {
select {
case v := <-cds.putRequest:
cds.put(v)
case v := <-cds.getRequest:
cds.get(v)
case v := <-cds.hasRequest:
cds.has(v)
case v := <-cds.deleteRequest:
cds.delete(v)
case v := <-cds.queryRequest:
cds.query(v)
case <-cds.batchRequest:
cds.batch()
case <-cds.closeRequest:
err := cds.close()
if err == nil {
cds.closeReply <- &err
defer cds.stop()
return
}
cds.closeReply <- &err
}
}
}
func (cds *ChanneledDatastore) stop() {
for {
select {
case <-cds.putRequest:
cds.putReply <- &ErrDatastoreClosed
case <-cds.getRequest:
cds.getReply <- &VValueErr{nil, ErrDatastoreClosed}
case <-cds.hasRequest:
cds.hasReply <- &VExistsErr{false, ErrDatastoreClosed}
case <-cds.deleteRequest:
cds.deleteReply <- &ErrDatastoreClosed
case <-cds.queryRequest:
cds.queryReply <- &VResultsErr{nil, ErrDatastoreClosed}
case <-cds.batchRequest:
cds.batchReply <- &VBatchErr{nil, ErrDatastoreClosed}
case <-cds.closeRequest:
cds.closeReply <- &ErrDatastoreClosed
case <-time.After(1 * time.Minute):
// TODO: theoretically a race condition, as some goroutines _could_ still be stuck in front of the request channel
close(cds.putRequest)
close(cds.putReply)
close(cds.getRequest)
close(cds.getReply)
close(cds.hasRequest)
close(cds.hasReply)
close(cds.deleteRequest)
close(cds.deleteReply)
close(cds.queryRequest)
close(cds.queryReply)
close(cds.batchRequest)
close(cds.batchReply)
close(cds.closeRequest)
close(cds.closeReply)
return
}
}
}
// NewChanneledDatastore constructs a datastore accessed through channels.
func NewChanneledDatastore(ds datastore.Datastore) *ChanneledDatastore {
cds := &ChanneledDatastore{child: ds}
cds.setChild = make(chan datastore.Datastore)
cds.putRequest = make(chan *VKeyValue)
cds.putReply = make(chan *error)
cds.getRequest = make(chan *datastore.Key)
cds.getReply = make(chan *VValueErr)
cds.hasRequest = make(chan *datastore.Key)
cds.hasReply = make(chan *VExistsErr)
cds.deleteRequest = make(chan *datastore.Key)
cds.deleteReply = make(chan *error)
cds.queryRequest = make(chan *dsq.Query)
cds.queryReply = make(chan *VResultsErr)
cds.batchRequest = make(chan interface{})
cds.batchReply = make(chan *VBatchErr)
cds.closeRequest = make(chan interface{})
cds.closeReply = make(chan *error)
cds.closedFlag = abool.NewBool(false)
go cds.run()
return cds
}
// SetChild sets the child of the datastore, if not yet set
func (cds *ChanneledDatastore) SetChild(ds datastore.Datastore) error {
select {
case cds.setChild <- ds:
default:
return errors.New("channelshim: child already set")
}
return nil
}
// Children implements Shim
func (cds *ChanneledDatastore) Children() []datastore.Datastore {
return []datastore.Datastore{cds.child}
}
// Put implements Datastore.Put
func (cds *ChanneledDatastore) Put(key datastore.Key, value interface{}) error {
if cds.closedFlag.IsSet() {
return ErrDatastoreClosed
}
cds.putRequest <- &VKeyValue{key, value}
err := <-cds.putReply
return *err
}
func (cds *ChanneledDatastore) put(v *VKeyValue) {
err := cds.child.Put(v.Key, v.Value)
cds.putReply <- &err
}
// Get implements Datastore.Get
func (cds *ChanneledDatastore) Get(key datastore.Key) (value interface{}, err error) {
if cds.closedFlag.IsSet() {
return nil, ErrDatastoreClosed
}
cds.getRequest <- &key
v := <-cds.getReply
return v.Value, v.Err
}
func (cds *ChanneledDatastore) get(key *datastore.Key) {
value, err := cds.child.Get(*key)
cds.getReply <- &VValueErr{value, err}
}
// Has implements Datastore.Has
func (cds *ChanneledDatastore) Has(key datastore.Key) (exists bool, err error) {
if cds.closedFlag.IsSet() {
return false, ErrDatastoreClosed
}
cds.hasRequest <- &key
v := <-cds.hasReply
return v.Exists, v.Err
}
func (cds *ChanneledDatastore) has(key *datastore.Key) {
exists, err := cds.child.Has(*key)
cds.hasReply <- &VExistsErr{exists, err}
}
// Delete implements Datastore.Delete
func (cds *ChanneledDatastore) Delete(key datastore.Key) error {
if cds.closedFlag.IsSet() {
return ErrDatastoreClosed
}
cds.deleteRequest <- &key
err := <-cds.deleteReply
return *err
}
func (cds *ChanneledDatastore) delete(key *datastore.Key) {
err := cds.child.Delete(*key)
cds.deleteReply <- &err
}
// Query implements Datastore.Query
func (cds *ChanneledDatastore) Query(q dsq.Query) (dsq.Results, error) {
if cds.closedFlag.IsSet() {
return nil, ErrDatastoreClosed
}
cds.queryRequest <- &q
v := <-cds.queryReply
return v.Results, v.Err
}
func (cds *ChanneledDatastore) query(q *dsq.Query) {
results, err := cds.child.Query(*q)
cds.queryReply <- &VResultsErr{results, err}
}
// Query implements Datastore.Batch
func (cds *ChanneledDatastore) Batch() (datastore.Batch, error) {
if cds.closedFlag.IsSet() {
return nil, ErrDatastoreClosed
}
cds.batchRequest <- nil
v := <-cds.batchReply
return v.Batch, v.Err
}
func (cds *ChanneledDatastore) batch() {
if bds, ok := cds.child.(datastore.Batching); ok {
batch, err := bds.Batch()
cds.batchReply <- &VBatchErr{batch, err}
} else {
cds.batchReply <- &VBatchErr{nil, datastore.ErrBatchUnsupported}
}
}
// Query closed child Datastore and this Shim
func (cds *ChanneledDatastore) Close() error {
if cds.closedFlag.IsSet() {
return ErrDatastoreClosed
}
cds.closeRequest <- nil
err := <-cds.closeReply
return *err
}
func (cds *ChanneledDatastore) close() error {
if cds, ok := cds.child.(io.Closer); ok {
return cds.Close()
}
return nil
}
func (cds *ChanneledDatastore) IsThreadSafe() {
}

View file

@ -1,64 +0,0 @@
// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file.
package channelshim
import (
"io"
"sync"
"testing"
datastore "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
)
var cds datastore.Datastore
var key datastore.Key
var q query.Query
var wg sync.WaitGroup
func testFunctions(testClose bool) {
wg.Add(1)
defer wg.Done()
cds.Put(key, "value")
cds.Get(key)
cds.Has(key)
cds.Delete(key)
cds.Query(q)
if batchingDS, ok := cds.(datastore.Batching); ok {
batchingDS.Batch()
}
if testClose {
if closingDS, ok := cds.(io.Closer); ok {
closingDS.Close()
}
}
}
func TestChanneledDatastore(t *testing.T) {
cds = NewChanneledDatastore(datastore.NewNullDatastore())
key = datastore.RandomKey()
// test normal concurrency-safe operation
for i := 0; i < 100; i++ {
go testFunctions(false)
}
wg.Wait()
// test shutdown procedure
for i := 0; i < 50; i++ {
go testFunctions(false)
}
for i := 0; i < 50; i++ {
go testFunctions(true)
}
wg.Wait()
// test closed functions, just to be sure
go testFunctions(true)
wg.Wait()
}

View file

@ -1,5 +0,0 @@
from: https://github.com/ipfs/go-ds-leveldb
commit: https://github.com/ipfs/go-ds-leveldb/commit/d2d7b2c585634fcf7edb0de3602d060de85616a5
original files imported:
- datastore.go sha256:03994e4ddc33e4b9f13a637c95753b808eb02d97a80b8f7539ef9f0498567ce1
- ds_test.go sha256:3f49b4c7769e8a69ba058411f763f7425128769fab25b58650368581dc303a7c

View file

@ -1,21 +0,0 @@
The MIT License
Copyright (c) 2016 Jeromy Johnson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View file

@ -1,250 +0,0 @@
// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file.
package leveldb
import (
"fmt"
"github.com/Safing/safing-core/database/dbutils"
"github.com/Safing/safing-core/formats/dsd"
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
"github.com/jbenet/goprocess"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
type datastore struct {
DB *leveldb.DB
}
type Options opt.Options
func NewDatastore(path string, opts *Options) (*datastore, error) {
var nopts opt.Options
if opts != nil {
nopts = opt.Options(*opts)
}
db, err := leveldb.OpenFile(path, &nopts)
if err != nil {
return nil, err
}
return &datastore{
DB: db,
}, nil
}
// Returns ErrInvalidType if value is not of type []byte.
//
// Note: using sync = false.
// see http://godoc.org/github.com/syndtr/goleveldb/leveldb/opt#WriteOptions
func (d *datastore) Put(key ds.Key, value interface{}) (err error) {
bytes, err := dbutils.DumpModel(value, dsd.AUTO) // or other dsd format
if err != nil {
return err
}
return d.DB.Put(key.Bytes(), bytes, nil)
}
func (d *datastore) Get(key ds.Key) (interface{}, error) {
data, err := d.DB.Get(key.Bytes(), nil)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, ds.ErrNotFound
}
return nil, err
}
model, err := dbutils.NewWrapper(&key, data)
if err != nil {
return nil, err
}
return model, nil
}
func (d *datastore) Has(key ds.Key) (exists bool, err error) {
return d.DB.Has(key.Bytes(), nil)
}
func (d *datastore) Delete(key ds.Key) (err error) {
// leveldb Delete will not return an error if the key doesn't
// exist (see https://github.com/syndtr/goleveldb/issues/109),
// so check that the key exists first and if not return an
// error
exists, err := d.DB.Has(key.Bytes(), nil)
if !exists {
return ds.ErrNotFound
} else if err != nil {
return err
}
return d.DB.Delete(key.Bytes(), nil)
}
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return d.QueryNew(q)
}
func (d *datastore) QueryNew(q dsq.Query) (dsq.Results, error) {
if len(q.Filters) > 0 ||
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 {
return d.QueryOrig(q)
}
var rnge *util.Range
if q.Prefix != "" {
rnge = util.BytesPrefix([]byte(q.Prefix))
}
i := d.DB.NewIterator(rnge, nil)
return dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
ok := i.Next()
if !ok {
return dsq.Result{}, false
}
k := string(i.Key())
e := dsq.Entry{Key: k}
if !q.KeysOnly {
buf := make([]byte, len(i.Value()))
copy(buf, i.Value())
newKey := ds.RawKey(k)
wrapper, err := dbutils.NewWrapper(&newKey, buf)
if err != nil {
return dsq.Result{Error: fmt.Errorf("failed to create wrapper for %s: %s", k, err)}, false
}
e.Value = wrapper
}
return dsq.Result{Entry: e}, true
},
Close: func() error {
i.Release()
return nil
},
}), nil
}
func (d *datastore) QueryOrig(q dsq.Query) (dsq.Results, error) {
// we can use multiple iterators concurrently. see:
// https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator
// advance the iterator only if the reader reads
//
// run query in own sub-process tied to Results.Process(), so that
// it waits for us to finish AND so that clients can signal to us
// that resources should be reclaimed.
qrb := dsq.NewResultBuilder(q)
qrb.Process.Go(func(worker goprocess.Process) {
d.runQuery(worker, qrb)
})
// go wait on the worker (without signaling close)
go qrb.Process.CloseAfterChildren()
// Now, apply remaining things (filters, order)
qr := qrb.Results()
for _, f := range q.Filters {
qr = dsq.NaiveFilter(qr, f)
}
for _, o := range q.Orders {
qr = dsq.NaiveOrder(qr, o)
}
return qr, nil
}
func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
var rnge *util.Range
if qrb.Query.Prefix != "" {
rnge = util.BytesPrefix([]byte(qrb.Query.Prefix))
}
i := d.DB.NewIterator(rnge, nil)
defer i.Release()
// advance iterator for offset
if qrb.Query.Offset > 0 {
for j := 0; j < qrb.Query.Offset; j++ {
i.Next()
}
}
// iterate, and handle limit, too
for sent := 0; i.Next(); sent++ {
// end early if we hit the limit
if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit {
break
}
k := string(i.Key())
e := dsq.Entry{Key: k}
if !qrb.Query.KeysOnly {
buf := make([]byte, len(i.Value()))
copy(buf, i.Value())
newKey := ds.RawKey(k)
wrapper, err := dbutils.NewWrapper(&newKey, buf)
if err != nil {
return
}
e.Value = wrapper
}
select {
case qrb.Output <- dsq.Result{Entry: e}: // we sent it out
case <-worker.Closing(): // client told us to end early.
break
}
}
if err := i.Error(); err != nil {
select {
case qrb.Output <- dsq.Result{Error: err}: // client read our error
case <-worker.Closing(): // client told us to end.
return
}
}
}
// LevelDB needs to be closed.
func (d *datastore) Close() (err error) {
return d.DB.Close()
}
func (d *datastore) IsThreadSafe() {}
type leveldbBatch struct {
b *leveldb.Batch
db *leveldb.DB
}
func (d *datastore) Batch() (ds.Batch, error) {
return &leveldbBatch{
b: new(leveldb.Batch),
db: d.DB,
}, nil
}
func (b *leveldbBatch) Put(key ds.Key, value interface{}) error {
val, err := dsd.Dump(value, dsd.AUTO)
if err != nil {
// return ds.ErrInvalidType
return err
}
b.b.Put(key.Bytes(), val)
return nil
}
func (b *leveldbBatch) Commit() error {
return b.db.Write(b.b, nil)
}
func (b *leveldbBatch) Delete(key ds.Key) error {
b.b.Delete(key.Bytes())
return nil
}

View file

@ -1,159 +0,0 @@
// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file.
package leveldb
import (
"io/ioutil"
"os"
"testing"
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
)
var testcases = map[string]string{
"/a": "a",
"/a/b": "ab",
"/a/b/c": "abc",
"/a/b/d": "a/b/d",
"/a/c": "ac",
"/a/d": "ad",
"/e": "e",
"/f": "f",
}
// returns datastore, and a function to call on exit.
// (this garbage collects). So:
//
// d, close := newDS(t)
// defer close()
func newDS(t *testing.T) (*datastore, func()) {
path, err := ioutil.TempDir("/tmp", "testing_leveldb_")
if err != nil {
t.Fatal(err)
}
d, err := NewDatastore(path, nil)
if err != nil {
t.Fatal(err)
}
return d, func() {
os.RemoveAll(path)
d.Close()
}
}
func addTestCases(t *testing.T, d *datastore, testcases map[string]string) {
for k, v := range testcases {
dsk := ds.NewKey(k)
if err := d.Put(dsk, []byte(v)); err != nil {
t.Fatal(err)
}
}
for k, v := range testcases {
dsk := ds.NewKey(k)
v2, err := d.Get(dsk)
if err != nil {
t.Fatal(err)
}
v2b := v2.(*[]byte)
if string(*v2b) != v {
t.Errorf("%s values differ: %s != %s", k, v, v2)
}
}
}
func TestQuery(t *testing.T) {
d, close := newDS(t)
defer close()
addTestCases(t, d, testcases)
rs, err := d.Query(dsq.Query{Prefix: "/a/"})
if err != nil {
t.Fatal(err)
}
expectMatches(t, []string{
"/a/b",
"/a/b/c",
"/a/b/d",
"/a/c",
"/a/d",
}, rs)
// test offset and limit
rs, err = d.Query(dsq.Query{Prefix: "/a/", Offset: 2, Limit: 2})
if err != nil {
t.Fatal(err)
}
expectMatches(t, []string{
"/a/b/d",
"/a/c",
}, rs)
}
func TestQueryRespectsProcess(t *testing.T) {
d, close := newDS(t)
defer close()
addTestCases(t, d, testcases)
}
func expectMatches(t *testing.T, expect []string, actualR dsq.Results) {
actual, err := actualR.Rest()
if err != nil {
t.Error(err)
}
if len(actual) != len(expect) {
t.Error("not enough", expect, actual)
}
for _, k := range expect {
found := false
for _, e := range actual {
if e.Key == k {
found = true
}
}
if !found {
t.Error(k, "not found")
}
}
}
func TestBatching(t *testing.T) {
d, done := newDS(t)
defer done()
b, err := d.Batch()
if err != nil {
t.Fatal(err)
}
for k, v := range testcases {
err := b.Put(ds.NewKey(k), []byte(v))
if err != nil {
t.Fatal(err)
}
}
err = b.Commit()
if err != nil {
t.Fatal(err)
}
for k, v := range testcases {
val, err := d.Get(ds.NewKey(k))
if err != nil {
t.Fatal(err)
}
if v != string(*val.(*[]byte)) {
t.Fatal("got wrong data!")
}
}
}

View file

@ -1,428 +0,0 @@
// Copyright Safing ICS Technologies GmbH. Use of this source code is governed by the AGPL license that can be found in the LICENSE file.
/*
Package simplefs provides a dead simple file-based datastore backed.
It is primarily meant for easy testing or storing big files that can easily be accesses directly, without datastore.
/path/path/type:key.json
/path/path/type:key/type:key
*/
package simplefs
import (
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
"github.com/tevino/abool"
"github.com/Safing/safing-core/database/dbutils"
"github.com/Safing/safing-core/formats/dsd"
"github.com/Safing/safing-core/log"
)
const (
SIMPLEFS_TAG = "330adcf3924003a59ae93289bc2cbb236391588f"
DEFAULT_FILEMODE = os.FileMode(int(0644))
DEFAULT_DIRMODE = os.FileMode(int(0755))
)
type datastore struct {
basePath string
basePathLen int
}
func NewDatastore(path string) (*datastore, error) {
basePath, err := filepath.Abs(path)
if err != nil {
return nil, fmt.Errorf("failed to validate path %s: %s", path, err)
}
tagfile := filepath.Join(basePath, ".simplefs")
file, err := os.Stat(basePath)
if err != nil {
if os.IsNotExist(err) {
err = os.MkdirAll(basePath, DEFAULT_DIRMODE)
if err != nil {
return nil, fmt.Errorf("failed to create directory: %s", err)
}
err = ioutil.WriteFile(tagfile, []byte(SIMPLEFS_TAG), DEFAULT_FILEMODE)
if err != nil {
return nil, fmt.Errorf("failed to create tag file (%s): %s", tagfile, err)
}
} else {
return nil, fmt.Errorf("failed to stat path: %s", err)
}
} else {
if !file.IsDir() {
return nil, fmt.Errorf("provided path (%s) is a file", basePath)
}
// check if valid simplefs storage dir
content, err := ioutil.ReadFile(tagfile)
if err != nil {
return nil, fmt.Errorf("could not read tag file (%s): %s", tagfile, err)
}
if string(content) != SIMPLEFS_TAG {
return nil, fmt.Errorf("invalid tag file (%s)", tagfile)
}
}
log.Infof("simplefs: opened database at %s", basePath)
return &datastore{
basePath: basePath,
basePathLen: len(basePath),
}, nil
}
func (d *datastore) buildPath(path string) (string, error) {
if len(path) < 2 {
return "", fmt.Errorf("key too short: %s", path)
}
newPath := filepath.Clean(filepath.Join(d.basePath, path[1:])) + ".dsd"
if !strings.HasPrefix(newPath, d.basePath) {
return "", fmt.Errorf("key integrity check failed, compiled path is %s", newPath)
}
return newPath, nil
}
func (d *datastore) Put(key ds.Key, value interface{}) (err error) {
objPath, err := d.buildPath(key.String())
if err != nil {
return err
}
bytes, err := dbutils.DumpModel(value, dsd.AUTO) // or other dsd format
if err != nil {
return err
}
err = ioutil.WriteFile(objPath, bytes, DEFAULT_FILEMODE)
if err != nil {
// create dir and try again
err = os.MkdirAll(filepath.Dir(objPath), DEFAULT_DIRMODE)
if err != nil {
return fmt.Errorf("failed to create directory %s: %s", filepath.Dir(objPath), err)
}
err = ioutil.WriteFile(objPath, bytes, DEFAULT_FILEMODE)
if err != nil {
return fmt.Errorf("could not write file %s: %s", objPath, err)
}
}
return nil
}
func (d *datastore) Get(key ds.Key) (interface{}, error) {
objPath, err := d.buildPath(key.String())
if err != nil {
return nil, err
}
data, err := ioutil.ReadFile(objPath)
if err != nil {
// TODO: distinguish between error and inexistance
return nil, ds.ErrNotFound
}
model, err := dbutils.NewWrapper(&key, data)
if err != nil {
return nil, err
}
return model, nil
}
func (d *datastore) Has(key ds.Key) (exists bool, err error) {
objPath, err := d.buildPath(key.String())
if err != nil {
return false, err
}
_, err = os.Stat(objPath)
if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, fmt.Errorf("failed to stat path %s: %s", objPath, err)
}
return true, nil
}
func (d *datastore) Delete(key ds.Key) (err error) {
objPath, err := d.buildPath(key.String())
if err != nil {
return err
}
// remove entry
err = os.Remove(objPath)
if err != nil {
return fmt.Errorf("could not delete (all) in path %s: %s", objPath, err)
}
// remove children
err = os.RemoveAll(objPath[:len(objPath)-4])
if err != nil {
return fmt.Errorf("could not delete (all) in path %s: %s", objPath, err)
}
return nil
}
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
if len(q.Orders) > 0 {
return nil, fmt.Errorf("simplefs: no support for ordering queries yet")
}
// log.Tracef("new query: %s", q.Prefix)
// log.Tracef("simplefs: new query with prefix %s", q.Prefix)
walkPath, err := d.buildPath(q.Prefix)
if err != nil {
return nil, err
}
walkPath = walkPath[:strings.LastIndex(walkPath, string(os.PathSeparator))]
files := make(chan *dsq.Entry)
stopWalkingFlag := abool.NewBool(false)
stopWalking := make(chan interface{})
counter := 0
go func() {
err := filepath.Walk(walkPath, func(path string, info os.FileInfo, err error) error {
// log.Tracef("walking: %s", path)
if err != nil {
return fmt.Errorf("simplfs: error in query: %s", err)
}
// skip directories
if info.IsDir() {
return nil
}
// check if we are still were we should be
if !strings.HasPrefix(path, d.basePath) {
log.Criticalf("simplfs: query jailbreaked: %s", path)
return errors.New("jailbreaked")
}
path = path[d.basePathLen:]
// check if there is enough space to remove ".dsd"
if len(path) < 6 {
return nil
}
path = path[:len(path)-4]
// check if we still match prefix
if !strings.HasPrefix(path, q.Prefix) {
return nil
}
entry := dsq.Entry{
Key: path,
// TODO: phew, do we really want to load every single file we might not need? use nil for now.
Value: nil,
}
for _, filter := range q.Filters {
if !filter.Filter(entry) {
return nil
}
}
// yay, entry matches!
counter++
if q.Offset > counter {
return nil
}
select {
case files <- &entry:
case <-stopWalking:
return errors.New("finished")
}
if q.Limit != 0 && q.Limit <= counter {
return errors.New("finished")
}
return nil
})
if err != nil {
log.Warningf("simplefs: filewalker for query failed: %s", err)
}
close(files)
}()
return dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
select {
case entry := <-files:
if entry == nil {
return dsq.Result{}, false
}
// log.Tracef("processing: %s", entry.Key)
if !q.KeysOnly {
objPath, err := d.buildPath(entry.Key)
if err != nil {
return dsq.Result{Error: err}, false
}
data, err := ioutil.ReadFile(objPath)
if err != nil {
return dsq.Result{Error: fmt.Errorf("error reading file %s: %s", entry.Key, err)}, false
}
newKey := ds.RawKey(entry.Key)
wrapper, err := dbutils.NewWrapper(&newKey, data)
if err != nil {
return dsq.Result{Error: fmt.Errorf("failed to create wrapper for %s: %s", entry.Key, err)}, false
}
entry.Value = wrapper
}
return dsq.Result{Entry: *entry, Error: nil}, true
case <-time.After(10 * time.Second):
return dsq.Result{Error: errors.New("filesystem timeout")}, false
}
},
Close: func() error {
if stopWalkingFlag.SetToIf(false, true) {
close(stopWalking)
}
return nil
},
}), nil
}
//
// func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
// return d.QueryNew(q)
// }
//
// func (d *datastore) QueryNew(q dsq.Query) (dsq.Results, error) {
// if len(q.Filters) > 0 ||
// len(q.Orders) > 0 ||
// q.Limit > 0 ||
// q.Offset > 0 {
// return d.QueryOrig(q)
// }
// var rnge *util.Range
// if q.Prefix != "" {
// rnge = util.BytesPrefix([]byte(q.Prefix))
// }
// i := d.DB.NewIterator(rnge, nil)
// return dsq.ResultsFromIterator(q, dsq.Iterator{
// Next: func() (dsq.Result, bool) {
// ok := i.Next()
// if !ok {
// return dsq.Result{}, false
// }
// k := string(i.Key())
// e := dsq.Entry{Key: k}
//
// if !q.KeysOnly {
// buf := make([]byte, len(i.Value()))
// copy(buf, i.Value())
// e.Value = buf
// }
// return dsq.Result{Entry: e}, true
// },
// Close: func() error {
// i.Release()
// return nil
// },
// }), nil
// }
//
// func (d *datastore) QueryOrig(q dsq.Query) (dsq.Results, error) {
// // we can use multiple iterators concurrently. see:
// // https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator
// // advance the iterator only if the reader reads
// //
// // run query in own sub-process tied to Results.Process(), so that
// // it waits for us to finish AND so that clients can signal to us
// // that resources should be reclaimed.
// qrb := dsq.NewResultBuilder(q)
// qrb.Process.Go(func(worker goprocess.Process) {
// d.runQuery(worker, qrb)
// })
//
// // go wait on the worker (without signaling close)
// go qrb.Process.CloseAfterChildren()
//
// // Now, apply remaining things (filters, order)
// qr := qrb.Results()
// for _, f := range q.Filters {
// qr = dsq.NaiveFilter(qr, f)
// }
// for _, o := range q.Orders {
// qr = dsq.NaiveOrder(qr, o)
// }
// return qr, nil
// }
//
// func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
//
// var rnge *util.Range
// if qrb.Query.Prefix != "" {
// rnge = util.BytesPrefix([]byte(qrb.Query.Prefix))
// }
// i := d.DB.NewIterator(rnge, nil)
// defer i.Release()
//
// // advance iterator for offset
// if qrb.Query.Offset > 0 {
// for j := 0; j < qrb.Query.Offset; j++ {
// i.Next()
// }
// }
//
// // iterate, and handle limit, too
// for sent := 0; i.Next(); sent++ {
// // end early if we hit the limit
// if qrb.Query.Limit > 0 && sent >= qrb.Query.Limit {
// break
// }
//
// k := string(i.Key())
// e := dsq.Entry{Key: k}
//
// if !qrb.Query.KeysOnly {
// buf := make([]byte, len(i.Value()))
// copy(buf, i.Value())
// e.Value = buf
// }
//
// select {
// case qrb.Output <- dsq.Result{Entry: e}: // we sent it out
// case <-worker.Closing(): // client told us to end early.
// break
// }
// }
//
// if err := i.Error(); err != nil {
// select {
// case qrb.Output <- dsq.Result{Error: err}: // client read our error
// case <-worker.Closing(): // client told us to end.
// return
// }
// }
// }
func (d *datastore) Close() (err error) {
return nil
}
func (d *datastore) IsThreadSafe() {}