From 8c1c522475cb5cbca8bbdc9bc14974566a0afb25 Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 11 May 2021 14:59:00 +0200 Subject: [PATCH] Use workers for database api handlers --- api/database.go | 86 +++++++++++++++++++++++-------------------------- 1 file changed, 40 insertions(+), 46 deletions(-) diff --git a/api/database.go b/api/database.go index d893f10..f23523f 100644 --- a/api/database.go +++ b/api/database.go @@ -2,6 +2,7 @@ package api import ( "bytes" + "context" "errors" "fmt" "net/http" @@ -38,7 +39,8 @@ const ( ) var ( - dbAPISeperatorBytes = []byte(dbAPISeperator) + dbAPISeperatorBytes = []byte(dbAPISeperator) + dbCompatibilityPermission = PermitAdmin ) func init() { @@ -46,8 +48,8 @@ func init() { startDatabaseAPI, // Default to admin read/write permissions until the database gets support // for api permissions. - PermitAdmin, - PermitAdmin, + dbCompatibilityPermission, + dbCompatibilityPermission, )) } @@ -96,13 +98,13 @@ func startDatabaseAPI(w http.ResponseWriter, r *http.Request) { db: database.NewInterface(nil), } - go new.handler() - go new.writer() + module.StartWorker("database api handler", new.handler) + module.StartWorker("database api writer", new.writer) log.Tracer(r.Context()).Infof("api request: init websocket %s %s", r.RemoteAddr, r.RequestURI) } -func (api *DatabaseAPI) handler() { +func (api *DatabaseAPI) handler(context.Context) error { // 123|get| // 123|ok|| @@ -146,19 +148,7 @@ func (api *DatabaseAPI) handler() { _, msg, err := api.conn.ReadMessage() if err != nil { - if !api.shuttingDown.IsSet() { - api.shutdown() - if websocket.IsCloseError(err, - websocket.CloseNormalClosure, - websocket.CloseGoingAway, - websocket.CloseAbnormalClosure, - ) { - log.Infof("api: websocket connection to %s closed", api.conn.RemoteAddr()) - } else { - log.Warningf("api: websocket read error from %s: %s", api.conn.RemoteAddr(), err) - } - } - return + return api.shutdown(err) } parts := bytes.SplitN(msg, []byte("|"), 3) @@ -218,45 +208,56 @@ func (api *DatabaseAPI) handler() { } } -func (api *DatabaseAPI) writer() { +func (api *DatabaseAPI) writer(ctx context.Context) error { var data []byte var err error for { - data = nil - select { // prioritize direct writes case data = <-api.sendQueue: if len(data) == 0 { - api.shutdown() - return + return api.shutdown(nil) } + case <-ctx.Done(): + return api.shutdown(nil) case <-api.shutdownSignal: - return + return api.shutdown(nil) } // log.Tracef("api: sending %s", string(*msg)) err = api.conn.WriteMessage(websocket.BinaryMessage, data) if err != nil { - if !api.shuttingDown.IsSet() { - api.shutdown() - if websocket.IsCloseError(err, - websocket.CloseNormalClosure, - websocket.CloseGoingAway, - websocket.CloseAbnormalClosure, - ) { - log.Infof("api: websocket connection to %s closed", api.conn.RemoteAddr()) - } else { - log.Warningf("api: websocket write error to %s: %s", api.conn.RemoteAddr(), err) - } - } - return + return api.shutdown(err) } - } } +func (api *DatabaseAPI) shutdown(err error) error { + // Check if we are the first to shut down. + if !api.shuttingDown.SetToIf(false, true) { + return nil + } + + // Check the given error. + if err != nil { + if websocket.IsCloseError(err, + websocket.CloseNormalClosure, + websocket.CloseGoingAway, + websocket.CloseAbnormalClosure, + ) { + log.Infof("api: websocket connection to %s closed", api.conn.RemoteAddr()) + } else { + log.Warningf("api: websocket connection error with %s: %s", api.conn.RemoteAddr(), err) + } + } + + // Trigger shutdown. + close(api.shutdownSignal) + api.conn.Close() + return nil +} + func (api *DatabaseAPI) send(opID []byte, msgType string, msgOrKey string, data []byte) { c := container.New(opID) c.Append(dbAPISeperatorBytes) @@ -622,13 +623,6 @@ func (api *DatabaseAPI) handleDelete(opID []byte, key string) { api.send(opID, dbMsgTypeSuccess, emptyString, nil) } -func (api *DatabaseAPI) shutdown() { - if api.shuttingDown.SetToIf(false, true) { - close(api.shutdownSignal) - api.conn.Close() - } -} - // marsharlRecords locks and marshals the given record, additionally adding // metadata and returning it as json. func marshalRecord(r record.Record, withDSDIdentifier bool) ([]byte, error) {