Use workers for database api handlers

This commit is contained in:
Daniel 2021-05-11 14:59:00 +02:00
parent 745a27d92d
commit 8c1c522475

View file

@ -2,6 +2,7 @@ package api
import ( import (
"bytes" "bytes"
"context"
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
@ -38,7 +39,8 @@ const (
) )
var ( var (
dbAPISeperatorBytes = []byte(dbAPISeperator) dbAPISeperatorBytes = []byte(dbAPISeperator)
dbCompatibilityPermission = PermitAdmin
) )
func init() { func init() {
@ -46,8 +48,8 @@ func init() {
startDatabaseAPI, startDatabaseAPI,
// Default to admin read/write permissions until the database gets support // Default to admin read/write permissions until the database gets support
// for api permissions. // for api permissions.
PermitAdmin, dbCompatibilityPermission,
PermitAdmin, dbCompatibilityPermission,
)) ))
} }
@ -96,13 +98,13 @@ func startDatabaseAPI(w http.ResponseWriter, r *http.Request) {
db: database.NewInterface(nil), db: database.NewInterface(nil),
} }
go new.handler() module.StartWorker("database api handler", new.handler)
go new.writer() module.StartWorker("database api writer", new.writer)
log.Tracer(r.Context()).Infof("api request: init websocket %s %s", r.RemoteAddr, r.RequestURI) log.Tracer(r.Context()).Infof("api request: init websocket %s %s", r.RemoteAddr, r.RequestURI)
} }
func (api *DatabaseAPI) handler() { func (api *DatabaseAPI) handler(context.Context) error {
// 123|get|<key> // 123|get|<key>
// 123|ok|<key>|<data> // 123|ok|<key>|<data>
@ -146,19 +148,7 @@ func (api *DatabaseAPI) handler() {
_, msg, err := api.conn.ReadMessage() _, msg, err := api.conn.ReadMessage()
if err != nil { if err != nil {
if !api.shuttingDown.IsSet() { return api.shutdown(err)
api.shutdown()
if websocket.IsCloseError(err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure,
) {
log.Infof("api: websocket connection to %s closed", api.conn.RemoteAddr())
} else {
log.Warningf("api: websocket read error from %s: %s", api.conn.RemoteAddr(), err)
}
}
return
} }
parts := bytes.SplitN(msg, []byte("|"), 3) parts := bytes.SplitN(msg, []byte("|"), 3)
@ -218,45 +208,56 @@ func (api *DatabaseAPI) handler() {
} }
} }
func (api *DatabaseAPI) writer() { func (api *DatabaseAPI) writer(ctx context.Context) error {
var data []byte var data []byte
var err error var err error
for { for {
data = nil
select { select {
// prioritize direct writes // prioritize direct writes
case data = <-api.sendQueue: case data = <-api.sendQueue:
if len(data) == 0 { if len(data) == 0 {
api.shutdown() return api.shutdown(nil)
return
} }
case <-ctx.Done():
return api.shutdown(nil)
case <-api.shutdownSignal: case <-api.shutdownSignal:
return return api.shutdown(nil)
} }
// log.Tracef("api: sending %s", string(*msg)) // log.Tracef("api: sending %s", string(*msg))
err = api.conn.WriteMessage(websocket.BinaryMessage, data) err = api.conn.WriteMessage(websocket.BinaryMessage, data)
if err != nil { if err != nil {
if !api.shuttingDown.IsSet() { return api.shutdown(err)
api.shutdown()
if websocket.IsCloseError(err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure,
) {
log.Infof("api: websocket connection to %s closed", api.conn.RemoteAddr())
} else {
log.Warningf("api: websocket write error to %s: %s", api.conn.RemoteAddr(), err)
}
}
return
} }
} }
} }
func (api *DatabaseAPI) shutdown(err error) error {
// Check if we are the first to shut down.
if !api.shuttingDown.SetToIf(false, true) {
return nil
}
// Check the given error.
if err != nil {
if websocket.IsCloseError(err,
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseAbnormalClosure,
) {
log.Infof("api: websocket connection to %s closed", api.conn.RemoteAddr())
} else {
log.Warningf("api: websocket connection error with %s: %s", api.conn.RemoteAddr(), err)
}
}
// Trigger shutdown.
close(api.shutdownSignal)
api.conn.Close()
return nil
}
func (api *DatabaseAPI) send(opID []byte, msgType string, msgOrKey string, data []byte) { func (api *DatabaseAPI) send(opID []byte, msgType string, msgOrKey string, data []byte) {
c := container.New(opID) c := container.New(opID)
c.Append(dbAPISeperatorBytes) c.Append(dbAPISeperatorBytes)
@ -622,13 +623,6 @@ func (api *DatabaseAPI) handleDelete(opID []byte, key string) {
api.send(opID, dbMsgTypeSuccess, emptyString, nil) api.send(opID, dbMsgTypeSuccess, emptyString, nil)
} }
func (api *DatabaseAPI) shutdown() {
if api.shuttingDown.SetToIf(false, true) {
close(api.shutdownSignal)
api.conn.Close()
}
}
// marsharlRecords locks and marshals the given record, additionally adding // marsharlRecords locks and marshals the given record, additionally adding
// metadata and returning it as json. // metadata and returning it as json.
func marshalRecord(r record.Record, withDSDIdentifier bool) ([]byte, error) { func marshalRecord(r record.Record, withDSDIdentifier bool) ([]byte, error) {