diff --git a/api/database.go b/api/database.go index 46eb1bb..42472a5 100644 --- a/api/database.go +++ b/api/database.go @@ -7,6 +7,11 @@ import ( "net/http" "sync" + "github.com/tidwall/sjson" + + "github.com/safing/portbase/database/iterator" + "github.com/safing/portbase/formats/varint" + "github.com/gorilla/websocket" "github.com/tevino/abool" "github.com/tidwall/gjson" @@ -45,6 +50,9 @@ type DatabaseAPI struct { conn *websocket.Conn sendQueue chan []byte + queriesLock sync.Mutex + queries map[string]*iterator.Iterator + subsLock sync.Mutex subs map[string]*database.Subscription @@ -75,6 +83,7 @@ func startDatabaseAPI(w http.ResponseWriter, r *http.Request) { new := &DatabaseAPI{ conn: wsConn, sendQueue: make(chan []byte, 100), + queries: make(map[string]*iterator.Iterator), subs: make(map[string]*database.Subscription), shutdownSignal: make(chan struct{}), shuttingDown: abool.NewBool(false), @@ -97,11 +106,13 @@ func (api *DatabaseAPI) handler() { // 124|done // 124|error| // 124|warning| // error with single record, operation continues + // 124|cancel // 125|sub| // 125|upd|| // 125|new|| // 127|del| // 125|warning| // error with single record, operation continues + // 125|cancel // 127|qsub| // 127|ok|| // 127|done @@ -110,6 +121,7 @@ func (api *DatabaseAPI) handler() { // 127|new|| // 127|del| // 127|warning| // error with single record, operation continues + // 127|cancel // 128|create|| // 128|success @@ -147,6 +159,7 @@ func (api *DatabaseAPI) handler() { // Handle special command "cancel" if len(parts) == 2 && string(parts[1]) == "cancel" { + // 124|cancel // 125|cancel // 127|cancel go api.handleCancel(parts[0]) @@ -265,7 +278,7 @@ func (api *DatabaseAPI) handleGet(opID []byte, key string) { r, err := api.db.Get(key) if err == nil { - data, err = r.Marshal(r, record.JSON) + data, err = marshalRecord(r) } if err != nil { api.send(opID, dbMsgTypeError, err.Error(), nil) @@ -281,6 +294,7 @@ func (api *DatabaseAPI) handleQuery(opID []byte, queryText string) { // 124|warning| // 124|error| // 124|warning| // error with single record, operation continues + // 124|cancel var err error @@ -300,19 +314,17 @@ func (api *DatabaseAPI) processQuery(opID []byte, q *query.Query) (ok bool) { return false } - for r := range it.Next { - r.Lock() - data, err := r.Marshal(r, record.JSON) - r.Unlock() - if err != nil { - api.send(opID, dbMsgTypeWarning, err.Error(), nil) - } - api.send(opID, dbMsgTypeOk, r.Key(), data) - } - if it.Err() != nil { - api.send(opID, dbMsgTypeError, it.Err().Error(), nil) - return false - } + // Save query iterator. + api.queriesLock.Lock() + api.queries[string(opID)] = it + api.queriesLock.Unlock() + + // Remove query iterator after it ended. + defer func() { + api.queriesLock.Lock() + defer api.queriesLock.Unlock() + delete(api.queries, string(opID)) + }() for { select { @@ -324,9 +336,7 @@ func (api *DatabaseAPI) processQuery(opID []byte, q *query.Query) (ok bool) { // process query feed if r != nil { // process record - r.Lock() - data, err := r.Marshal(r, record.JSON) - r.Unlock() + data, err := marshalRecord(r) if err != nil { api.send(opID, dbMsgTypeWarning, err.Error(), nil) } @@ -376,15 +386,15 @@ func (api *DatabaseAPI) registerSub(opID []byte, q *query.Query) (sub *database. return nil, false } - // Save subscription. - api.subsLock.Lock() - defer api.subsLock.Unlock() - api.subs[string(opID)] = sub - return sub, true } func (api *DatabaseAPI) processSub(opID []byte, sub *database.Subscription) { + // Save subscription. + api.subsLock.Lock() + api.subs[string(opID)] = sub + api.subsLock.Unlock() + // Remove subscription after it ended. defer func() { api.subsLock.Lock() @@ -402,9 +412,7 @@ func (api *DatabaseAPI) processSub(opID []byte, sub *database.Subscription) { // process sub feed if r != nil { // process record - r.Lock() - data, err := r.Marshal(r, record.JSON) - r.Unlock() + data, err := marshalRecord(r) if err != nil { api.send(opID, dbMsgTypeWarning, err.Error(), nil) continue @@ -462,6 +470,28 @@ func (api *DatabaseAPI) handleQsub(opID []byte, queryText string) { } func (api *DatabaseAPI) handleCancel(opID []byte) { + api.cancelQuery(opID) + api.cancelSub(opID) +} + +func (api *DatabaseAPI) cancelQuery(opID []byte) { + api.queriesLock.Lock() + defer api.queriesLock.Unlock() + + // Get subscription from api. + it, ok := api.queries[string(opID)] + if !ok { + // Fail silently as quries end by themselves when finished. + return + } + + // End query. + it.Cancel() + + // The query handler will end the communication with a done message. +} + +func (api *DatabaseAPI) cancelSub(opID []byte) { api.subsLock.Lock() defer api.subsLock.Unlock() @@ -478,7 +508,7 @@ func (api *DatabaseAPI) handleCancel(opID []byte) { api.send(opID, dbMsgTypeError, fmt.Sprintf("failed to cancel subscription: %s", err), nil) } - // Subscription handler will end the communication with a done message. + // The subscription handler will end the communication with a done message. } func (api *DatabaseAPI) handlePut(opID []byte, key string, data []byte, create bool) { @@ -592,3 +622,39 @@ func (api *DatabaseAPI) shutdown() { api.conn.Close() } } + +// marsharlRecords locks and marshals the given record, additionally adding +// metadata and returning it as json. +func marshalRecord(r record.Record) ([]byte, error) { + r.Lock() + defer r.Unlock() + + // Pour record into JSON. + jsonData, err := r.Marshal(r, record.JSON) + if err != nil { + return nil, err + } + + // Remove JSON identifier for manual editing. + jsonData = bytes.TrimPrefix(jsonData, varint.Pack8(record.JSON)) + + // Add metadata. + jsonData, err = sjson.SetBytes(jsonData, "_meta", r.Meta()) + if err != nil { + return nil, err + } + + // Add database key. + jsonData, err = sjson.SetBytes(jsonData, "_meta.Key", r.Key()) + if err != nil { + return nil, err + } + + // Add JSON identifier again. + formatID := varint.Pack8(record.JSON) + finalData := make([]byte, 0, len(formatID)+len(jsonData)) + finalData = append(finalData, formatID...) + finalData = append(finalData, jsonData...) + + return finalData, nil +}