Fix and improve api messages

This commit is contained in:
Daniel 2018-10-02 11:34:32 +02:00
parent 0a46061c07
commit 18c6f01784

View file

@ -26,6 +26,13 @@ const (
dbMsgTypeNew = "new" dbMsgTypeNew = "new"
dbMsgTypeDelete = "delete" dbMsgTypeDelete = "delete"
dbMsgTypeWarning = "warning" dbMsgTypeWarning = "warning"
dbApiSeperator = "|"
emptyString = ""
)
var (
dbApiSeperatorBytes = []byte(dbApiSeperator)
) )
// DatabaseAPI is a database API instance. // DatabaseAPI is a database API instance.
@ -120,7 +127,7 @@ func (api *DatabaseAPI) handler() {
parts := bytes.SplitN(msg, []byte("|"), 3) parts := bytes.SplitN(msg, []byte("|"), 3)
if len(parts) != 3 { if len(parts) != 3 {
api.send(nil, dbMsgTypeError, []byte("bad request: malformed message")) api.send(nil, dbMsgTypeError, "bad request: malformed message", nil)
continue continue
} }
@ -142,7 +149,7 @@ func (api *DatabaseAPI) handler() {
// split key and payload // split key and payload
dataParts := bytes.SplitN(parts[2], []byte("|"), 2) dataParts := bytes.SplitN(parts[2], []byte("|"), 2)
if len(dataParts) != 2 { if len(dataParts) != 2 {
api.send(nil, dbMsgTypeError, []byte("bad request: malformed message")) api.send(nil, dbMsgTypeError, "bad request: malformed message", nil)
continue continue
} }
@ -159,7 +166,7 @@ func (api *DatabaseAPI) handler() {
} }
default: default:
api.send(parts[0], dbMsgTypeError, []byte("bad request: unknown method")) api.send(parts[0], dbMsgTypeError, "bad request: unknown method", nil)
} }
} }
} }
@ -197,10 +204,21 @@ func (api *DatabaseAPI) writer() {
} }
} }
func (api *DatabaseAPI) send(opID []byte, msgType string, data []byte) { func (api *DatabaseAPI) send(opID []byte, msgType string, msgOrKey string, data []byte) {
c := container.New(opID) c := container.New(opID)
c.Append([]byte(fmt.Sprintf("|%s|", msgType))) c.Append(dbApiSeperatorBytes)
c.Append(data) c.Append([]byte(msgType))
if msgOrKey != emptyString {
c.Append(dbApiSeperatorBytes)
c.Append([]byte(msgOrKey))
}
if len(data) > 0 {
c.Append(dbApiSeperatorBytes)
c.Append(data)
}
api.sendQueue <- c.CompileData() api.sendQueue <- c.CompileData()
} }
@ -215,10 +233,10 @@ func (api *DatabaseAPI) handleGet(opID []byte, key string) {
if err == nil { if err == nil {
data, err = r.Marshal(r, record.JSON) data, err = r.Marshal(r, record.JSON)
} else { } else {
api.send(opID, dbMsgTypeError, []byte(err.Error())) api.send(opID, dbMsgTypeError, err.Error(), nil)
return return
} }
api.send(opID, dbMsgTypeOk, data) api.send(opID, dbMsgTypeOk, r.DatabaseKey(), data)
} }
func (api *DatabaseAPI) handleQuery(opID []byte, queryText string) { func (api *DatabaseAPI) handleQuery(opID []byte, queryText string) {
@ -233,7 +251,7 @@ func (api *DatabaseAPI) handleQuery(opID []byte, queryText string) {
q, err := query.ParseQuery(queryText) q, err := query.ParseQuery(queryText)
if err != nil { if err != nil {
api.send(opID, dbMsgTypeError, []byte(err.Error())) api.send(opID, dbMsgTypeError, err.Error(), nil)
return return
} }
@ -243,23 +261,23 @@ func (api *DatabaseAPI) handleQuery(opID []byte, queryText string) {
func (api *DatabaseAPI) processQuery(opID []byte, q *query.Query) (ok bool) { func (api *DatabaseAPI) processQuery(opID []byte, q *query.Query) (ok bool) {
it, err := api.db.Query(q) it, err := api.db.Query(q)
if err != nil { if err != nil {
api.send(opID, dbMsgTypeError, []byte(err.Error())) api.send(opID, dbMsgTypeError, err.Error(), nil)
return false return false
} }
for r := range it.Next { for r := range it.Next {
data, err := r.Marshal(r, record.JSON) data, err := r.Marshal(r, record.JSON)
if err != nil { if err != nil {
api.send(opID, dbMsgTypeWarning, []byte(err.Error())) api.send(opID, dbMsgTypeWarning, err.Error(), nil)
} }
api.send(opID, dbMsgTypeOk, data) api.send(opID, dbMsgTypeOk, r.DatabaseKey(), data)
} }
if it.Err != nil { if it.Err != nil {
api.send(opID, dbMsgTypeError, []byte(it.Err.Error())) api.send(opID, dbMsgTypeError, it.Err.Error(), nil)
return false return false
} }
api.send(opID, dbMsgTypeDone, nil) api.send(opID, dbMsgTypeDone, emptyString, nil)
return true return true
} }
@ -275,7 +293,7 @@ func (api *DatabaseAPI) handleSub(opID []byte, queryText string) {
q, err := query.ParseQuery(queryText) q, err := query.ParseQuery(queryText)
if err != nil { if err != nil {
api.send(opID, dbMsgTypeError, []byte(err.Error())) api.send(opID, dbMsgTypeError, err.Error(), nil)
return return
} }
@ -290,7 +308,7 @@ func (api *DatabaseAPI) registerSub(opID []byte, q *query.Query) (sub *database.
var err error var err error
sub, err = api.db.Subscribe(q) sub, err = api.db.Subscribe(q)
if err != nil { if err != nil {
api.send(opID, dbMsgTypeError, []byte(err.Error())) api.send(opID, dbMsgTypeError, err.Error(), nil)
return nil, false return nil, false
} }
return sub, true return sub, true
@ -300,13 +318,13 @@ func (api *DatabaseAPI) processSub(opID []byte, sub *database.Subscription) {
for r := range sub.Feed { for r := range sub.Feed {
data, err := r.Marshal(r, record.JSON) data, err := r.Marshal(r, record.JSON)
if err != nil { if err != nil {
api.send(opID, dbMsgTypeWarning, []byte(err.Error())) api.send(opID, dbMsgTypeWarning, err.Error(), nil)
} }
// TODO: use upd, new and delete msgTypes // TODO: use upd, new and delete msgTypes
api.send(opID, dbMsgTypeOk, data) api.send(opID, dbMsgTypeUpd, r.DatabaseKey(), data)
} }
if sub.Err != nil { if sub.Err != nil {
api.send(opID, dbMsgTypeError, []byte(sub.Err.Error())) api.send(opID, dbMsgTypeError, sub.Err.Error(), nil)
} }
} }
@ -324,7 +342,7 @@ func (api *DatabaseAPI) handleQsub(opID []byte, queryText string) {
q, err := query.ParseQuery(queryText) q, err := query.ParseQuery(queryText)
if err != nil { if err != nil {
api.send(opID, dbMsgTypeError, []byte(err.Error())) api.send(opID, dbMsgTypeError, err.Error(), nil)
return return
} }
@ -356,7 +374,7 @@ func (api *DatabaseAPI) handlePut(opID []byte, key string, data []byte, create b
r, err := record.NewRawWrapper(dbName, dbKey, raw) r, err := record.NewRawWrapper(dbName, dbKey, raw)
if err != nil { if err != nil {
api.send(opID, dbMsgTypeError, []byte(err.Error())) api.send(opID, dbMsgTypeError, err.Error(), nil)
return return
} }
@ -366,10 +384,10 @@ func (api *DatabaseAPI) handlePut(opID []byte, key string, data []byte, create b
err = api.db.Put(r) err = api.db.Put(r)
} }
if err != nil { if err != nil {
api.send(opID, dbMsgTypeError, []byte(err.Error())) api.send(opID, dbMsgTypeError, err.Error(), nil)
return return
} }
api.send(opID, dbMsgTypeSuccess, nil) api.send(opID, dbMsgTypeSuccess, emptyString, nil)
} }
func (api *DatabaseAPI) handleInsert(opID []byte, key string, data []byte) { func (api *DatabaseAPI) handleInsert(opID []byte, key string, data []byte) {
@ -379,7 +397,7 @@ func (api *DatabaseAPI) handleInsert(opID []byte, key string, data []byte) {
r, err := api.db.Get(key) r, err := api.db.Get(key)
if err != nil { if err != nil {
api.send(opID, dbMsgTypeError, []byte(err.Error())) api.send(opID, dbMsgTypeError, err.Error(), nil)
return return
} }
@ -410,21 +428,21 @@ func (api *DatabaseAPI) handleInsert(opID []byte, key string, data []byte) {
}) })
if insertError != nil { if insertError != nil {
api.send(opID, dbMsgTypeError, []byte(insertError.Error())) api.send(opID, dbMsgTypeError, insertError.Error(), nil)
return return
} }
if !anythingPresent { if !anythingPresent {
api.send(opID, dbMsgTypeError, []byte("could not find any valid values")) api.send(opID, dbMsgTypeError, "could not find any valid values", nil)
return return
} }
err = api.db.Put(r) err = api.db.Put(r)
if err != nil { if err != nil {
api.send(opID, dbMsgTypeError, []byte(err.Error())) api.send(opID, dbMsgTypeError, err.Error(), nil)
return return
} }
api.send(opID, dbMsgTypeSuccess, nil) api.send(opID, dbMsgTypeSuccess, emptyString, nil)
} }
func (api *DatabaseAPI) shutdown() { func (api *DatabaseAPI) shutdown() {