mirror of
https://github.com/safing/portbase
synced 2025-09-01 18:19:57 +00:00
Finish first API version
This commit is contained in:
parent
d7e0602548
commit
31c09512a0
6 changed files with 175 additions and 22 deletions
104
api/database.go
104
api/database.go
|
@ -2,11 +2,13 @@ package api
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/tevino/abool"
|
||||
"github.com/tidwall/gjson"
|
||||
|
||||
"github.com/Safing/portbase/container"
|
||||
"github.com/Safing/portbase/database"
|
||||
|
@ -50,7 +52,7 @@ func startDatabaseAPI(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
wsConn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
errMsg := fmt.Sprintf("could not upgrade to websocket: %s", err)
|
||||
errMsg := fmt.Sprintf("could not upgrade: %s", err)
|
||||
log.Error(errMsg)
|
||||
http.Error(w, errMsg, 400)
|
||||
return
|
||||
|
@ -78,11 +80,12 @@ func (api *DatabaseAPI) handler() {
|
|||
// 124|ok|<key>|<data>
|
||||
// 124|done
|
||||
// 124|error|<message>
|
||||
// 124|warning|<message> // error with single record, operation continues
|
||||
// 125|sub|<query>
|
||||
// 125|upd|<key>|<data>
|
||||
// 125|new|<key>|<data>
|
||||
// 125|delete|<key>|<data>
|
||||
// 125|warning|<message> // does not cancel the subscription
|
||||
// 125|warning|<message> // error with single record, operation continues
|
||||
// 127|qsub|<query>
|
||||
// 127|ok|<key>|<data>
|
||||
// 127|done
|
||||
|
@ -90,7 +93,7 @@ func (api *DatabaseAPI) handler() {
|
|||
// 127|upd|<key>|<data>
|
||||
// 127|new|<key>|<data>
|
||||
// 127|delete|<key>|<data>
|
||||
// 127|warning|<message> // does not cancel the subscription
|
||||
// 127|warning|<message> // error with single record, operation continues
|
||||
|
||||
// 128|create|<key>|<data>
|
||||
// 128|success
|
||||
|
@ -115,7 +118,7 @@ func (api *DatabaseAPI) handler() {
|
|||
return
|
||||
}
|
||||
|
||||
parts := bytes.SplitN(msg, []byte("|"), 2)
|
||||
parts := bytes.SplitN(msg, []byte("|"), 3)
|
||||
if len(parts) != 3 {
|
||||
api.send(nil, dbMsgTypeError, []byte("bad request: malformed message"))
|
||||
continue
|
||||
|
@ -137,7 +140,7 @@ func (api *DatabaseAPI) handler() {
|
|||
case "create", "update", "insert":
|
||||
|
||||
// split key and payload
|
||||
dataParts := bytes.SplitN(parts[2], []byte("|"), 1)
|
||||
dataParts := bytes.SplitN(parts[2], []byte("|"), 2)
|
||||
if len(dataParts) != 2 {
|
||||
api.send(nil, dbMsgTypeError, []byte("bad request: malformed message"))
|
||||
continue
|
||||
|
@ -146,10 +149,10 @@ func (api *DatabaseAPI) handler() {
|
|||
switch string(parts[1]) {
|
||||
case "create":
|
||||
// 128|create|<key>|<data>
|
||||
go api.handleCreate(parts[0], string(dataParts[0]), dataParts[1])
|
||||
go api.handlePut(parts[0], string(dataParts[0]), dataParts[1], true)
|
||||
case "update":
|
||||
// 129|update|<key>|<data>
|
||||
go api.handleUpdate(parts[0], string(dataParts[0]), dataParts[1])
|
||||
go api.handlePut(parts[0], string(dataParts[0]), dataParts[1], false)
|
||||
case "insert":
|
||||
// 130|insert|<key>|<data>
|
||||
go api.handleInsert(parts[0], string(dataParts[0]), dataParts[1])
|
||||
|
@ -224,6 +227,7 @@ func (api *DatabaseAPI) handleQuery(opID []byte, queryText string) {
|
|||
// 124|done
|
||||
// 124|warning|<message>
|
||||
// 124|error|<message>
|
||||
// 124|warning|<message> // error with single record, operation continues
|
||||
|
||||
var err error
|
||||
|
||||
|
@ -250,8 +254,8 @@ func (api *DatabaseAPI) processQuery(opID []byte, q *query.Query) (ok bool) {
|
|||
}
|
||||
api.send(opID, dbMsgTypeOk, data)
|
||||
}
|
||||
if it.Error != nil {
|
||||
api.send(opID, dbMsgTypeError, []byte(err.Error()))
|
||||
if it.Err != nil {
|
||||
api.send(opID, dbMsgTypeError, []byte(it.Err.Error()))
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -266,7 +270,7 @@ func (api *DatabaseAPI) handleSub(opID []byte, queryText string) {
|
|||
// 125|upd|<key>|<data>
|
||||
// 125|new|<key>|<data>
|
||||
// 125|delete|<key>
|
||||
// 125|warning|<message> // does not cancel the subscription
|
||||
// 125|warning|<message> // error with single record, operation continues
|
||||
var err error
|
||||
|
||||
q, err := query.ParseQuery(queryText)
|
||||
|
@ -314,7 +318,7 @@ func (api *DatabaseAPI) handleQsub(opID []byte, queryText string) {
|
|||
// 127|upd|<key>|<data>
|
||||
// 127|new|<key>|<data>
|
||||
// 127|delete|<key>
|
||||
// 127|warning|<message> // does not cancel the subscription
|
||||
// 127|warning|<message> // error with single record, operation continues
|
||||
|
||||
var err error
|
||||
|
||||
|
@ -335,20 +339,92 @@ func (api *DatabaseAPI) handleQsub(opID []byte, queryText string) {
|
|||
api.processSub(opID, sub)
|
||||
}
|
||||
|
||||
func (api *DatabaseAPI) handleCreate(opID []byte, key string, data []byte) {
|
||||
func (api *DatabaseAPI) handlePut(opID []byte, key string, data []byte, create bool) {
|
||||
// 128|create|<key>|<data>
|
||||
// 128|success
|
||||
// 128|error|<message>
|
||||
}
|
||||
func (api *DatabaseAPI) handleUpdate(opID []byte, key string, data []byte) {
|
||||
|
||||
// 129|update|<key>|<data>
|
||||
// 129|success
|
||||
// 129|error|<message>
|
||||
|
||||
raw := make([]byte, len(data)+1)
|
||||
raw[0] = record.JSON
|
||||
copy(raw[1:], data)
|
||||
|
||||
dbName, dbKey := record.ParseKey(key)
|
||||
|
||||
r, err := record.NewRawWrapper(dbName, dbKey, raw)
|
||||
if err != nil {
|
||||
api.send(opID, dbMsgTypeError, []byte(err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
if create {
|
||||
err = api.db.PutNew(r)
|
||||
} else {
|
||||
err = api.db.Put(r)
|
||||
}
|
||||
if err != nil {
|
||||
api.send(opID, dbMsgTypeError, []byte(err.Error()))
|
||||
return
|
||||
}
|
||||
api.send(opID, dbMsgTypeSuccess, nil)
|
||||
}
|
||||
|
||||
func (api *DatabaseAPI) handleInsert(opID []byte, key string, data []byte) {
|
||||
// 130|insert|<key>|<data>
|
||||
// 130|success
|
||||
// 130|error|<message>
|
||||
|
||||
r, err := api.db.Get(key)
|
||||
if err != nil {
|
||||
api.send(opID, dbMsgTypeError, []byte(err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
acc := r.GetAccessor(r)
|
||||
|
||||
result := gjson.ParseBytes(data)
|
||||
anythingPresent := false
|
||||
var insertError error
|
||||
result.ForEach(func(key gjson.Result, value gjson.Result) bool {
|
||||
anythingPresent = true
|
||||
if !key.Exists() {
|
||||
insertError = errors.New("values must be in a map")
|
||||
return false
|
||||
}
|
||||
if key.Type != gjson.String {
|
||||
insertError = errors.New("keys must be strings")
|
||||
return false
|
||||
}
|
||||
if !value.Exists() {
|
||||
insertError = errors.New("non-existent value")
|
||||
return false
|
||||
}
|
||||
insertError = acc.Set(key.String(), value.Value())
|
||||
if insertError != nil {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
if insertError != nil {
|
||||
api.send(opID, dbMsgTypeError, []byte(insertError.Error()))
|
||||
return
|
||||
}
|
||||
if !anythingPresent {
|
||||
api.send(opID, dbMsgTypeError, []byte("could not find any valid values"))
|
||||
return
|
||||
}
|
||||
|
||||
err = api.db.Put(r)
|
||||
if err != nil {
|
||||
api.send(opID, dbMsgTypeError, []byte(err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
api.send(opID, dbMsgTypeSuccess, nil)
|
||||
}
|
||||
|
||||
func (api *DatabaseAPI) shutdown() {
|
||||
|
|
22
api/enriched-response.go
Normal file
22
api/enriched-response.go
Normal file
|
@ -0,0 +1,22 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type EnrichedResponseWriter struct {
|
||||
http.ResponseWriter
|
||||
Status int
|
||||
}
|
||||
|
||||
func NewEnrichedResponseWriter(w http.ResponseWriter) *EnrichedResponseWriter {
|
||||
return &EnrichedResponseWriter{
|
||||
w,
|
||||
0,
|
||||
}
|
||||
}
|
||||
|
||||
func (ew *EnrichedResponseWriter) WriteHeader(code int) {
|
||||
ew.Status = code
|
||||
ew.ResponseWriter.WriteHeader(code)
|
||||
}
|
22
api/main.go
Normal file
22
api/main.go
Normal file
|
@ -0,0 +1,22 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"github.com/Safing/portbase/modules"
|
||||
)
|
||||
|
||||
func init() {
|
||||
modules.Register("api", prep, start, stop, "database")
|
||||
}
|
||||
|
||||
func prep() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func start() error {
|
||||
go Serve()
|
||||
return nil
|
||||
}
|
||||
|
||||
func stop() error {
|
||||
return nil
|
||||
}
|
|
@ -4,26 +4,44 @@ import (
|
|||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
|
||||
"github.com/Safing/portbase/log"
|
||||
)
|
||||
|
||||
var (
|
||||
additionalRoutes map[string]func(arg1 http.ResponseWriter, arg2 *http.Request)
|
||||
additionalRoutes map[string]http.Handler
|
||||
)
|
||||
|
||||
func RegisterAdditionalRoute(path string, handleFunc func(arg1 http.ResponseWriter, arg2 *http.Request)) {
|
||||
func RegisterAdditionalRoute(path string, handler http.Handler) {
|
||||
if additionalRoutes == nil {
|
||||
additionalRoutes = make(map[string]func(arg1 http.ResponseWriter, arg2 *http.Request))
|
||||
additionalRoutes = make(map[string]http.Handler)
|
||||
}
|
||||
additionalRoutes[path] = handleFunc
|
||||
additionalRoutes[path] = handler
|
||||
}
|
||||
|
||||
func logger(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ew := NewEnrichedResponseWriter(w)
|
||||
next.ServeHTTP(ew, r)
|
||||
log.Infof("api request: %s %d %s", r.RemoteAddr, ew.Status, r.RequestURI)
|
||||
})
|
||||
}
|
||||
|
||||
func Serve() {
|
||||
|
||||
router := mux.NewRouter()
|
||||
router.HandleFunc("/api/database/v1", startDatabaseAPI)
|
||||
// router.HandleFunc("/api/database/v1", startDatabaseAPI)
|
||||
|
||||
for path, handleFunc := range additionalRoutes {
|
||||
router.HandleFunc(path, handleFunc)
|
||||
for path, handler := range additionalRoutes {
|
||||
router.Handle(path, handler)
|
||||
}
|
||||
|
||||
router.Use(logger)
|
||||
|
||||
http.Handle("/", router)
|
||||
http.HandleFunc("/api/database/v1", startDatabaseAPI)
|
||||
|
||||
address := "127.0.0.1:18"
|
||||
log.Infof("api: starting to listen on %s", address)
|
||||
log.Errorf("api: failed to listen on %s: %s", address, http.ListenAndServe(address, nil))
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
<body>
|
||||
<script type="text/javascript">
|
||||
|
||||
var ws = new WebSocket('ws://localhost:18/api/v1');
|
||||
var ws = new WebSocket('ws://127.0.0.1:18/api/database/v1')
|
||||
|
||||
ws.onopen = function () {
|
||||
console.log('open');
|
||||
|
@ -26,6 +26,10 @@
|
|||
reader.readAsText(e.data)
|
||||
};
|
||||
|
||||
function send(text) {
|
||||
ws.send(text)
|
||||
}
|
||||
|
||||
// var sock = new SockJS("http://localhost:8080/api/v1");
|
||||
//
|
||||
// sock.onopen = function() {
|
11
api/testclient/serve.go
Normal file
11
api/testclient/serve.go
Normal file
|
@ -0,0 +1,11 @@
|
|||
package testclient
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/Safing/portbase/api"
|
||||
)
|
||||
|
||||
func init() {
|
||||
api.RegisterAdditionalRoute("/test/", http.StripPrefix("/test/", http.FileServer(http.Dir("./api/testclient/root/"))))
|
||||
}
|
Loading…
Add table
Reference in a new issue