mirror of
https://github.com/navidrome/navidrome.git
synced 2026-04-28 03:19:38 +00:00
feat(prometheus): add metrics to Subsonic API and Plugins (#4266)
* Add prometheus metrics to subsonic and plugins * address feedback, do not log error if operation is not supported * add missing timestamp and client to stats * remove .view from subsonic route * directly inject DataStore in Prometheus, to avoid having to pass it in every call Signed-off-by: Deluan <deluan@navidrome.org> --------- Signed-off-by: Deluan <deluan@navidrome.org> Co-authored-by: Deluan <deluan@navidrome.org>
This commit is contained in:
parent
709714cfc0
commit
0cd15c1ddc
22 changed files with 246 additions and 89 deletions
|
|
@ -67,7 +67,8 @@ func CreateSubsonicAPIRouter(ctx context.Context) *subsonic.Router {
|
|||
dataStore := persistence.New(sqlDB)
|
||||
fileCache := artwork.GetImageCache()
|
||||
fFmpeg := ffmpeg.New()
|
||||
manager := plugins.GetManager(dataStore)
|
||||
metricsMetrics := metrics.GetPrometheusInstance(dataStore)
|
||||
manager := plugins.GetManager(dataStore, metricsMetrics)
|
||||
agentsAgents := agents.GetAgents(dataStore, manager)
|
||||
provider := external.NewProvider(dataStore, agentsAgents)
|
||||
artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider)
|
||||
|
|
@ -79,11 +80,10 @@ func CreateSubsonicAPIRouter(ctx context.Context) *subsonic.Router {
|
|||
cacheWarmer := artwork.NewCacheWarmer(artworkArtwork, fileCache)
|
||||
broker := events.GetBroker()
|
||||
playlists := core.NewPlaylists(dataStore)
|
||||
metricsMetrics := metrics.NewPrometheusInstance(dataStore)
|
||||
scannerScanner := scanner.New(ctx, dataStore, cacheWarmer, broker, playlists, metricsMetrics)
|
||||
playTracker := scrobbler.GetPlayTracker(dataStore, broker, manager)
|
||||
playbackServer := playback.GetInstance(dataStore)
|
||||
router := subsonic.New(dataStore, artworkArtwork, mediaStreamer, archiver, players, provider, scannerScanner, broker, playlists, playTracker, share, playbackServer)
|
||||
router := subsonic.New(dataStore, artworkArtwork, mediaStreamer, archiver, players, provider, scannerScanner, broker, playlists, playTracker, share, playbackServer, metricsMetrics)
|
||||
return router
|
||||
}
|
||||
|
||||
|
|
@ -92,7 +92,8 @@ func CreatePublicRouter() *public.Router {
|
|||
dataStore := persistence.New(sqlDB)
|
||||
fileCache := artwork.GetImageCache()
|
||||
fFmpeg := ffmpeg.New()
|
||||
manager := plugins.GetManager(dataStore)
|
||||
metricsMetrics := metrics.GetPrometheusInstance(dataStore)
|
||||
manager := plugins.GetManager(dataStore, metricsMetrics)
|
||||
agentsAgents := agents.GetAgents(dataStore, manager)
|
||||
provider := external.NewProvider(dataStore, agentsAgents)
|
||||
artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider)
|
||||
|
|
@ -128,7 +129,7 @@ func CreateInsights() metrics.Insights {
|
|||
func CreatePrometheus() metrics.Metrics {
|
||||
sqlDB := db.Db()
|
||||
dataStore := persistence.New(sqlDB)
|
||||
metricsMetrics := metrics.NewPrometheusInstance(dataStore)
|
||||
metricsMetrics := metrics.GetPrometheusInstance(dataStore)
|
||||
return metricsMetrics
|
||||
}
|
||||
|
||||
|
|
@ -137,14 +138,14 @@ func CreateScanner(ctx context.Context) scanner.Scanner {
|
|||
dataStore := persistence.New(sqlDB)
|
||||
fileCache := artwork.GetImageCache()
|
||||
fFmpeg := ffmpeg.New()
|
||||
manager := plugins.GetManager(dataStore)
|
||||
metricsMetrics := metrics.GetPrometheusInstance(dataStore)
|
||||
manager := plugins.GetManager(dataStore, metricsMetrics)
|
||||
agentsAgents := agents.GetAgents(dataStore, manager)
|
||||
provider := external.NewProvider(dataStore, agentsAgents)
|
||||
artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider)
|
||||
cacheWarmer := artwork.NewCacheWarmer(artworkArtwork, fileCache)
|
||||
broker := events.GetBroker()
|
||||
playlists := core.NewPlaylists(dataStore)
|
||||
metricsMetrics := metrics.NewPrometheusInstance(dataStore)
|
||||
scannerScanner := scanner.New(ctx, dataStore, cacheWarmer, broker, playlists, metricsMetrics)
|
||||
return scannerScanner
|
||||
}
|
||||
|
|
@ -154,14 +155,14 @@ func CreateScanWatcher(ctx context.Context) scanner.Watcher {
|
|||
dataStore := persistence.New(sqlDB)
|
||||
fileCache := artwork.GetImageCache()
|
||||
fFmpeg := ffmpeg.New()
|
||||
manager := plugins.GetManager(dataStore)
|
||||
metricsMetrics := metrics.GetPrometheusInstance(dataStore)
|
||||
manager := plugins.GetManager(dataStore, metricsMetrics)
|
||||
agentsAgents := agents.GetAgents(dataStore, manager)
|
||||
provider := external.NewProvider(dataStore, agentsAgents)
|
||||
artworkArtwork := artwork.NewArtwork(dataStore, fileCache, fFmpeg, provider)
|
||||
cacheWarmer := artwork.NewCacheWarmer(artworkArtwork, fileCache)
|
||||
broker := events.GetBroker()
|
||||
playlists := core.NewPlaylists(dataStore)
|
||||
metricsMetrics := metrics.NewPrometheusInstance(dataStore)
|
||||
scannerScanner := scanner.New(ctx, dataStore, cacheWarmer, broker, playlists, metricsMetrics)
|
||||
watcher := scanner.NewWatcher(dataStore, scannerScanner)
|
||||
return watcher
|
||||
|
|
@ -177,13 +178,14 @@ func GetPlaybackServer() playback.PlaybackServer {
|
|||
func getPluginManager() *plugins.Manager {
|
||||
sqlDB := db.Db()
|
||||
dataStore := persistence.New(sqlDB)
|
||||
manager := plugins.GetManager(dataStore)
|
||||
metricsMetrics := metrics.GetPrometheusInstance(dataStore)
|
||||
manager := plugins.GetManager(dataStore, metricsMetrics)
|
||||
return manager
|
||||
}
|
||||
|
||||
// wire_injectors.go:
|
||||
|
||||
var allProviders = wire.NewSet(core.Set, artwork.Set, server.New, subsonic.New, nativeapi.New, public.New, persistence.New, lastfm.NewRouter, listenbrainz.NewRouter, events.GetBroker, scanner.New, scanner.NewWatcher, plugins.GetManager, metrics.NewPrometheusInstance, db.Db, wire.Bind(new(agents.PluginLoader), new(*plugins.Manager)), wire.Bind(new(scrobbler.PluginLoader), new(*plugins.Manager)))
|
||||
var allProviders = wire.NewSet(core.Set, artwork.Set, server.New, subsonic.New, nativeapi.New, public.New, persistence.New, lastfm.NewRouter, listenbrainz.NewRouter, events.GetBroker, scanner.New, scanner.NewWatcher, plugins.GetManager, metrics.GetPrometheusInstance, db.Db, wire.Bind(new(agents.PluginLoader), new(*plugins.Manager)), wire.Bind(new(scrobbler.PluginLoader), new(*plugins.Manager)))
|
||||
|
||||
func GetPluginManager(ctx context.Context) *plugins.Manager {
|
||||
manager := getPluginManager()
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ var allProviders = wire.NewSet(
|
|||
scanner.New,
|
||||
scanner.NewWatcher,
|
||||
plugins.GetManager,
|
||||
metrics.NewPrometheusInstance,
|
||||
metrics.GetPrometheusInstance,
|
||||
db.Db,
|
||||
wire.Bind(new(agents.PluginLoader), new(*plugins.Manager)),
|
||||
wire.Bind(new(scrobbler.PluginLoader), new(*plugins.Manager)),
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ package metrics
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
|
@ -13,6 +12,7 @@ import (
|
|||
"github.com/navidrome/navidrome/consts"
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/navidrome/navidrome/model"
|
||||
"github.com/navidrome/navidrome/utils/singleton"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
|
@ -20,6 +20,8 @@ import (
|
|||
type Metrics interface {
|
||||
WriteInitialMetrics(ctx context.Context)
|
||||
WriteAfterScanMetrics(ctx context.Context, success bool)
|
||||
RecordRequest(ctx context.Context, endpoint, method, client string, status int, elapsed int64)
|
||||
RecordPluginRequest(ctx context.Context, plugin, method string, ok bool, elapsed int64)
|
||||
GetHandler() http.Handler
|
||||
}
|
||||
|
||||
|
|
@ -27,11 +29,14 @@ type metrics struct {
|
|||
ds model.DataStore
|
||||
}
|
||||
|
||||
func NewPrometheusInstance(ds model.DataStore) Metrics {
|
||||
if conf.Server.Prometheus.Enabled {
|
||||
return &metrics{ds: ds}
|
||||
func GetPrometheusInstance(ds model.DataStore) Metrics {
|
||||
if !conf.Server.Prometheus.Enabled {
|
||||
return noopMetrics{}
|
||||
}
|
||||
return noopMetrics{}
|
||||
|
||||
return singleton.GetInstance(func() *metrics {
|
||||
return &metrics{ds: ds}
|
||||
})
|
||||
}
|
||||
|
||||
func NewNoopInstance() Metrics {
|
||||
|
|
@ -51,6 +56,38 @@ func (m *metrics) WriteAfterScanMetrics(ctx context.Context, success bool) {
|
|||
getPrometheusMetrics().mediaScansCounter.With(scanLabels).Inc()
|
||||
}
|
||||
|
||||
func (m *metrics) RecordRequest(_ context.Context, endpoint, method, client string, status int, elapsed int64) {
|
||||
httpLabel := prometheus.Labels{
|
||||
"endpoint": endpoint,
|
||||
"method": method,
|
||||
"client": client,
|
||||
"status": strconv.FormatInt(int64(status), 10),
|
||||
}
|
||||
getPrometheusMetrics().httpRequestCounter.With(httpLabel).Inc()
|
||||
|
||||
httpLatencyLabel := prometheus.Labels{
|
||||
"endpoint": endpoint,
|
||||
"method": method,
|
||||
"client": client,
|
||||
}
|
||||
getPrometheusMetrics().httpRequestDuration.With(httpLatencyLabel).Observe(float64(elapsed))
|
||||
}
|
||||
|
||||
func (m *metrics) RecordPluginRequest(_ context.Context, plugin, method string, ok bool, elapsed int64) {
|
||||
pluginLabel := prometheus.Labels{
|
||||
"plugin": plugin,
|
||||
"method": method,
|
||||
"ok": strconv.FormatBool(ok),
|
||||
}
|
||||
getPrometheusMetrics().pluginRequestCounter.With(pluginLabel).Inc()
|
||||
|
||||
pluginLatencyLabel := prometheus.Labels{
|
||||
"plugin": plugin,
|
||||
"method": method,
|
||||
}
|
||||
getPrometheusMetrics().pluginRequestDuration.With(pluginLatencyLabel).Observe(float64(elapsed))
|
||||
}
|
||||
|
||||
func (m *metrics) GetHandler() http.Handler {
|
||||
r := chi.NewRouter()
|
||||
|
||||
|
|
@ -59,20 +96,31 @@ func (m *metrics) GetHandler() http.Handler {
|
|||
consts.PrometheusAuthUser: conf.Server.Prometheus.Password,
|
||||
}))
|
||||
}
|
||||
r.Handle("/", promhttp.Handler())
|
||||
|
||||
// Enable created at timestamp to handle zero counter on create.
|
||||
// This requires --enable-feature=created-timestamp-zero-ingestion to be passed in Prometheus
|
||||
r.Handle("/", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{
|
||||
EnableOpenMetrics: true,
|
||||
EnableOpenMetricsTextCreatedSamples: true,
|
||||
}))
|
||||
return r
|
||||
}
|
||||
|
||||
type prometheusMetrics struct {
|
||||
dbTotal *prometheus.GaugeVec
|
||||
versionInfo *prometheus.GaugeVec
|
||||
lastMediaScan *prometheus.GaugeVec
|
||||
mediaScansCounter *prometheus.CounterVec
|
||||
dbTotal *prometheus.GaugeVec
|
||||
versionInfo *prometheus.GaugeVec
|
||||
lastMediaScan *prometheus.GaugeVec
|
||||
mediaScansCounter *prometheus.CounterVec
|
||||
httpRequestCounter *prometheus.CounterVec
|
||||
httpRequestDuration *prometheus.SummaryVec
|
||||
pluginRequestCounter *prometheus.CounterVec
|
||||
pluginRequestDuration *prometheus.SummaryVec
|
||||
}
|
||||
|
||||
// Prometheus' metrics requires initialization. But not more than once
|
||||
var getPrometheusMetrics = sync.OnceValue(func() *prometheusMetrics {
|
||||
quartilesToEstimate := map[float64]float64{0.5: 0.05, 0.75: 0.025, 0.9: 0.01, 0.99: 0.001}
|
||||
|
||||
instance := &prometheusMetrics{
|
||||
dbTotal: prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
|
|
@ -102,23 +150,49 @@ var getPrometheusMetrics = sync.OnceValue(func() *prometheusMetrics {
|
|||
},
|
||||
[]string{"success"},
|
||||
),
|
||||
httpRequestCounter: prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "http_request_count",
|
||||
Help: "Request types by status",
|
||||
},
|
||||
[]string{"endpoint", "method", "client", "status"},
|
||||
),
|
||||
httpRequestDuration: prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "http_request_latency",
|
||||
Help: "Latency (in ms) of HTTP requests",
|
||||
Objectives: quartilesToEstimate,
|
||||
},
|
||||
[]string{"endpoint", "method", "client"},
|
||||
),
|
||||
pluginRequestCounter: prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "plugin_request_count",
|
||||
Help: "Plugin requests by method/status",
|
||||
},
|
||||
[]string{"plugin", "method", "ok"},
|
||||
),
|
||||
pluginRequestDuration: prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "plugin_request_latency",
|
||||
Help: "Latency (in ms) of plugin requests",
|
||||
Objectives: quartilesToEstimate,
|
||||
},
|
||||
[]string{"plugin", "method"},
|
||||
),
|
||||
}
|
||||
err := prometheus.DefaultRegisterer.Register(instance.dbTotal)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to create Prometheus metric instance", fmt.Errorf("unable to register db_model_totals metrics: %w", err))
|
||||
}
|
||||
err = prometheus.DefaultRegisterer.Register(instance.versionInfo)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to create Prometheus metric instance", fmt.Errorf("unable to register navidrome_info metrics: %w", err))
|
||||
}
|
||||
err = prometheus.DefaultRegisterer.Register(instance.lastMediaScan)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to create Prometheus metric instance", fmt.Errorf("unable to register media_scan_last metrics: %w", err))
|
||||
}
|
||||
err = prometheus.DefaultRegisterer.Register(instance.mediaScansCounter)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to create Prometheus metric instance", fmt.Errorf("unable to register media_scans metrics: %w", err))
|
||||
}
|
||||
|
||||
prometheus.DefaultRegisterer.MustRegister(
|
||||
instance.dbTotal,
|
||||
instance.versionInfo,
|
||||
instance.lastMediaScan,
|
||||
instance.mediaScansCounter,
|
||||
instance.httpRequestCounter,
|
||||
instance.httpRequestDuration,
|
||||
instance.pluginRequestCounter,
|
||||
instance.pluginRequestDuration,
|
||||
)
|
||||
|
||||
return instance
|
||||
})
|
||||
|
||||
|
|
@ -159,4 +233,8 @@ func (n noopMetrics) WriteInitialMetrics(context.Context) {}
|
|||
|
||||
func (n noopMetrics) WriteAfterScanMetrics(context.Context, bool) {}
|
||||
|
||||
func (n noopMetrics) RecordRequest(context.Context, string, string, string, int, int64) {}
|
||||
|
||||
func (n noopMetrics) RecordPluginRequest(context.Context, string, string, bool, int64) {}
|
||||
|
||||
func (n noopMetrics) GetHandler() http.Handler { return nil }
|
||||
|
|
|
|||
|
|
@ -10,22 +10,23 @@ import (
|
|||
)
|
||||
|
||||
// NewWasmMediaAgent creates a new adapter for a MetadataAgent plugin
|
||||
func newWasmMediaAgent(wasmPath, pluginID string, runtime api.WazeroNewRuntime, mc wazero.ModuleConfig) WasmPlugin {
|
||||
func newWasmMediaAgent(wasmPath, pluginID string, m *Manager, runtime api.WazeroNewRuntime, mc wazero.ModuleConfig) WasmPlugin {
|
||||
loader, err := api.NewMetadataAgentPlugin(context.Background(), api.WazeroRuntime(runtime), api.WazeroModuleConfig(mc))
|
||||
if err != nil {
|
||||
log.Error("Error creating media metadata service plugin", "plugin", pluginID, "path", wasmPath, err)
|
||||
return nil
|
||||
}
|
||||
return &wasmMediaAgent{
|
||||
wasmBasePlugin: &wasmBasePlugin[api.MetadataAgent, *api.MetadataAgentPlugin]{
|
||||
wasmPath: wasmPath,
|
||||
id: pluginID,
|
||||
capability: CapabilityMetadataAgent,
|
||||
loader: loader,
|
||||
loadFunc: func(ctx context.Context, l *api.MetadataAgentPlugin, path string) (api.MetadataAgent, error) {
|
||||
wasmBasePlugin: newWasmBasePlugin[api.MetadataAgent, *api.MetadataAgentPlugin](
|
||||
wasmPath,
|
||||
pluginID,
|
||||
CapabilityMetadataAgent,
|
||||
m.metrics,
|
||||
loader,
|
||||
func(ctx context.Context, l *api.MetadataAgentPlugin, path string) (api.MetadataAgent, error) {
|
||||
return l.Load(ctx, path)
|
||||
},
|
||||
},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ var _ = Describe("Adapter Media Agent", func() {
|
|||
DeferCleanup(configtest.SetupConfig())
|
||||
conf.Server.Plugins.Folder = testDataDir
|
||||
|
||||
mgr = createManager(nil)
|
||||
mgr = createManager(nil, nil)
|
||||
mgr.ScanPlugins()
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -9,22 +9,23 @@ import (
|
|||
)
|
||||
|
||||
// newWasmSchedulerCallback creates a new adapter for a SchedulerCallback plugin
|
||||
func newWasmSchedulerCallback(wasmPath, pluginID string, runtime api.WazeroNewRuntime, mc wazero.ModuleConfig) WasmPlugin {
|
||||
func newWasmSchedulerCallback(wasmPath, pluginID string, m *Manager, runtime api.WazeroNewRuntime, mc wazero.ModuleConfig) WasmPlugin {
|
||||
loader, err := api.NewSchedulerCallbackPlugin(context.Background(), api.WazeroRuntime(runtime), api.WazeroModuleConfig(mc))
|
||||
if err != nil {
|
||||
log.Error("Error creating scheduler callback plugin", "plugin", pluginID, "path", wasmPath, err)
|
||||
return nil
|
||||
}
|
||||
return &wasmSchedulerCallback{
|
||||
wasmBasePlugin: &wasmBasePlugin[api.SchedulerCallback, *api.SchedulerCallbackPlugin]{
|
||||
wasmPath: wasmPath,
|
||||
id: pluginID,
|
||||
capability: CapabilitySchedulerCallback,
|
||||
loader: loader,
|
||||
loadFunc: func(ctx context.Context, l *api.SchedulerCallbackPlugin, path string) (api.SchedulerCallback, error) {
|
||||
wasmBasePlugin: newWasmBasePlugin[api.SchedulerCallback, *api.SchedulerCallbackPlugin](
|
||||
wasmPath,
|
||||
pluginID,
|
||||
CapabilitySchedulerCallback,
|
||||
m.metrics,
|
||||
loader,
|
||||
func(ctx context.Context, l *api.SchedulerCallbackPlugin, path string) (api.SchedulerCallback, error) {
|
||||
return l.Load(ctx, path)
|
||||
},
|
||||
},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -12,22 +12,23 @@ import (
|
|||
"github.com/tetratelabs/wazero"
|
||||
)
|
||||
|
||||
func newWasmScrobblerPlugin(wasmPath, pluginID string, runtime api.WazeroNewRuntime, mc wazero.ModuleConfig) WasmPlugin {
|
||||
func newWasmScrobblerPlugin(wasmPath, pluginID string, m *Manager, runtime api.WazeroNewRuntime, mc wazero.ModuleConfig) WasmPlugin {
|
||||
loader, err := api.NewScrobblerPlugin(context.Background(), api.WazeroRuntime(runtime), api.WazeroModuleConfig(mc))
|
||||
if err != nil {
|
||||
log.Error("Error creating scrobbler service plugin", "plugin", pluginID, "path", wasmPath, err)
|
||||
return nil
|
||||
}
|
||||
return &wasmScrobblerPlugin{
|
||||
wasmBasePlugin: &wasmBasePlugin[api.Scrobbler, *api.ScrobblerPlugin]{
|
||||
wasmPath: wasmPath,
|
||||
id: pluginID,
|
||||
capability: CapabilityScrobbler,
|
||||
loader: loader,
|
||||
loadFunc: func(ctx context.Context, l *api.ScrobblerPlugin, path string) (api.Scrobbler, error) {
|
||||
wasmBasePlugin: newWasmBasePlugin[api.Scrobbler, *api.ScrobblerPlugin](
|
||||
wasmPath,
|
||||
pluginID,
|
||||
CapabilityScrobbler,
|
||||
m.metrics,
|
||||
loader,
|
||||
func(ctx context.Context, l *api.ScrobblerPlugin, path string) (api.Scrobbler, error) {
|
||||
return l.Load(ctx, path)
|
||||
},
|
||||
},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,22 +9,23 @@ import (
|
|||
)
|
||||
|
||||
// newWasmWebSocketCallback creates a new adapter for a WebSocketCallback plugin
|
||||
func newWasmWebSocketCallback(wasmPath, pluginID string, runtime api.WazeroNewRuntime, mc wazero.ModuleConfig) WasmPlugin {
|
||||
func newWasmWebSocketCallback(wasmPath, pluginID string, m *Manager, runtime api.WazeroNewRuntime, mc wazero.ModuleConfig) WasmPlugin {
|
||||
loader, err := api.NewWebSocketCallbackPlugin(context.Background(), api.WazeroRuntime(runtime), api.WazeroModuleConfig(mc))
|
||||
if err != nil {
|
||||
log.Error("Error creating WebSocket callback plugin", "plugin", pluginID, "path", wasmPath, err)
|
||||
return nil
|
||||
}
|
||||
return &wasmWebSocketCallback{
|
||||
wasmBasePlugin: &wasmBasePlugin[api.WebSocketCallback, *api.WebSocketCallbackPlugin]{
|
||||
wasmPath: wasmPath,
|
||||
id: pluginID,
|
||||
capability: CapabilityWebSocketCallback,
|
||||
loader: loader,
|
||||
loadFunc: func(ctx context.Context, l *api.WebSocketCallbackPlugin, path string) (api.WebSocketCallback, error) {
|
||||
wasmBasePlugin: newWasmBasePlugin[api.WebSocketCallback, *api.WebSocketCallbackPlugin](
|
||||
wasmPath,
|
||||
pluginID,
|
||||
CapabilityWebSocketCallback,
|
||||
m.metrics,
|
||||
loader,
|
||||
func(ctx context.Context, l *api.WebSocketCallbackPlugin, path string) (api.WebSocketCallback, error) {
|
||||
return l.Load(ctx, path)
|
||||
},
|
||||
},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ var _ = Describe("SchedulerService", func() {
|
|||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
manager = createManager(nil)
|
||||
manager = createManager(nil, nil)
|
||||
ss = manager.schedulerService
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ var _ = Describe("WebSocket Host Service", func() {
|
|||
DeferCleanup(server.Close)
|
||||
|
||||
// Create a new manager and websocket service
|
||||
manager = createManager(nil)
|
||||
manager = createManager(nil, nil)
|
||||
wsService = newWebsocketService(manager)
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import (
|
|||
|
||||
"github.com/navidrome/navidrome/conf"
|
||||
"github.com/navidrome/navidrome/core/agents"
|
||||
"github.com/navidrome/navidrome/core/metrics"
|
||||
"github.com/navidrome/navidrome/core/scrobbler"
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/navidrome/navidrome/model"
|
||||
|
|
@ -39,7 +40,7 @@ const (
|
|||
)
|
||||
|
||||
// pluginCreators maps capability types to their respective creator functions
|
||||
type pluginConstructor func(wasmPath, pluginID string, runtime api.WazeroNewRuntime, mc wazero.ModuleConfig) WasmPlugin
|
||||
type pluginConstructor func(wasmPath, pluginID string, m *Manager, runtime api.WazeroNewRuntime, mc wazero.ModuleConfig) WasmPlugin
|
||||
|
||||
var pluginCreators = map[string]pluginConstructor{
|
||||
CapabilityMetadataAgent: newWasmMediaAgent,
|
||||
|
|
@ -95,21 +96,23 @@ type Manager struct {
|
|||
lifecycle *pluginLifecycleManager // Manages plugin lifecycle and initialization
|
||||
adapters map[string]WasmPlugin // Map of plugin folder name + capability to adapter
|
||||
ds model.DataStore // DataStore for accessing persistent data
|
||||
metrics metrics.Metrics
|
||||
}
|
||||
|
||||
// GetManager returns the singleton instance of Manager
|
||||
func GetManager(ds model.DataStore) *Manager {
|
||||
func GetManager(ds model.DataStore, metrics metrics.Metrics) *Manager {
|
||||
return singleton.GetInstance(func() *Manager {
|
||||
return createManager(ds)
|
||||
return createManager(ds, metrics)
|
||||
})
|
||||
}
|
||||
|
||||
// createManager creates a new Manager instance. Used in tests
|
||||
func createManager(ds model.DataStore) *Manager {
|
||||
func createManager(ds model.DataStore, metrics metrics.Metrics) *Manager {
|
||||
m := &Manager{
|
||||
plugins: make(map[string]*plugin),
|
||||
lifecycle: newPluginLifecycleManager(),
|
||||
ds: ds,
|
||||
metrics: metrics,
|
||||
}
|
||||
|
||||
// Create the host services
|
||||
|
|
@ -174,7 +177,7 @@ func (m *Manager) registerPlugin(pluginID, pluginDir, wasmPath string, manifest
|
|||
}
|
||||
continue
|
||||
}
|
||||
adapter := constructor(wasmPath, pluginID, customRuntime, mc)
|
||||
adapter := constructor(wasmPath, pluginID, m, customRuntime, mc)
|
||||
if adapter == nil {
|
||||
log.Error("Failed to create plugin adapter", "plugin", pluginID, "capability", capabilityStr, "path", wasmPath)
|
||||
continue
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ var _ = Describe("Plugin Manager", func() {
|
|||
conf.Server.Plugins.Folder = testDataDir
|
||||
|
||||
ctx = GinkgoT().Context()
|
||||
mgr = createManager(nil)
|
||||
mgr = createManager(nil, nil)
|
||||
mgr.ScanPlugins()
|
||||
})
|
||||
|
||||
|
|
@ -85,7 +85,7 @@ var _ = Describe("Plugin Manager", func() {
|
|||
})
|
||||
|
||||
conf.Server.Plugins.Folder = tempPluginsDir
|
||||
m = createManager(nil)
|
||||
m = createManager(nil, nil)
|
||||
})
|
||||
|
||||
// Helper to create a complete valid plugin for manager testing
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ var _ = Describe("Plugin Permissions", func() {
|
|||
BeforeEach(func() {
|
||||
DeferCleanup(configtest.SetupConfig())
|
||||
ctx = context.Background()
|
||||
mgr = createManager(nil)
|
||||
mgr = createManager(nil, nil)
|
||||
tempDir = GinkgoT().TempDir()
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ var _ = Describe("CachingRuntime", func() {
|
|||
|
||||
BeforeEach(func() {
|
||||
ctx = GinkgoT().Context()
|
||||
mgr = createManager(nil)
|
||||
mgr = createManager(nil, nil)
|
||||
// Add permissions for the test plugin using typed struct
|
||||
permissions := schema.PluginManifestPermissions{
|
||||
Http: &schema.PluginManifestPermissionsHttp{
|
||||
|
|
@ -58,6 +58,7 @@ var _ = Describe("CachingRuntime", func() {
|
|||
plugin = newWasmScrobblerPlugin(
|
||||
filepath.Join(testDataDir, "fake_scrobbler", "plugin.wasm"),
|
||||
"fake_scrobbler",
|
||||
mgr,
|
||||
rtFunc,
|
||||
wazero.NewModuleConfig().WithStartFunctions("_initialize"),
|
||||
).(*wasmScrobblerPlugin)
|
||||
|
|
|
|||
|
|
@ -2,13 +2,28 @@ package plugins
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/navidrome/navidrome/core/agents"
|
||||
"github.com/navidrome/navidrome/core/metrics"
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/navidrome/navidrome/model/id"
|
||||
)
|
||||
|
||||
// newWasmBasePlugin creates a new instance of wasmBasePlugin with the required parameters.
|
||||
func newWasmBasePlugin[S any, P any](wasmPath, id, capability string, m metrics.Metrics, loader P, loadFunc loaderFunc[S, P]) *wasmBasePlugin[S, P] {
|
||||
return &wasmBasePlugin[S, P]{
|
||||
wasmPath: wasmPath,
|
||||
id: id,
|
||||
capability: capability,
|
||||
loader: loader,
|
||||
loadFunc: loadFunc,
|
||||
metrics: m,
|
||||
}
|
||||
}
|
||||
|
||||
// LoaderFunc is a generic function type that loads a plugin instance.
|
||||
type loaderFunc[S any, P any] func(ctx context.Context, loader P, path string) (S, error)
|
||||
|
||||
|
|
@ -20,6 +35,7 @@ type wasmBasePlugin[S any, P any] struct {
|
|||
capability string
|
||||
loader P
|
||||
loadFunc loaderFunc[S, P]
|
||||
metrics metrics.Metrics
|
||||
}
|
||||
|
||||
func (w *wasmBasePlugin[S, P]) PluginID() string {
|
||||
|
|
@ -34,6 +50,10 @@ func (w *wasmBasePlugin[S, P]) serviceName() string {
|
|||
return w.id + "_" + w.capability
|
||||
}
|
||||
|
||||
func (w *wasmBasePlugin[S, P]) getMetrics() metrics.Metrics {
|
||||
return w.metrics
|
||||
}
|
||||
|
||||
// getInstance loads a new plugin instance and returns a cleanup function.
|
||||
func (w *wasmBasePlugin[S, P]) getInstance(ctx context.Context, methodName string) (S, func(), error) {
|
||||
start := time.Now()
|
||||
|
|
@ -57,7 +77,9 @@ func (w *wasmBasePlugin[S, P]) getInstance(ctx context.Context, methodName strin
|
|||
}
|
||||
|
||||
type wasmPlugin[S any] interface {
|
||||
PluginID() string
|
||||
getInstance(ctx context.Context, methodName string) (S, func(), error)
|
||||
getMetrics() metrics.Metrics
|
||||
}
|
||||
|
||||
type errorMapper interface {
|
||||
|
|
@ -73,10 +95,25 @@ func callMethod[S any, R any](ctx context.Context, w wasmPlugin[S], methodName s
|
|||
if err != nil {
|
||||
return r, err
|
||||
}
|
||||
start := time.Now()
|
||||
defer done()
|
||||
r, err = fn(inst)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if em, ok := any(w).(errorMapper); ok {
|
||||
return r, em.mapError(err)
|
||||
mappedErr := em.mapError(err)
|
||||
|
||||
if !errors.Is(mappedErr, agents.ErrNotFound) {
|
||||
id := w.PluginID()
|
||||
isOk := mappedErr == nil
|
||||
metrics := w.getMetrics()
|
||||
if metrics != nil {
|
||||
metrics.RecordPluginRequest(ctx, id, methodName, isOk, elapsed.Milliseconds())
|
||||
}
|
||||
log.Trace(ctx, "callMethod", "plugin", id, "method", methodName, "ok", isOk, elapsed)
|
||||
}
|
||||
|
||||
return r, mappedErr
|
||||
}
|
||||
return r, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ var _ = Describe("Album Lists", func() {
|
|||
BeforeEach(func() {
|
||||
ds = &tests.MockDataStore{}
|
||||
mockRepo = ds.Album(ctx).(*tests.MockAlbumRepo)
|
||||
router = New(ds, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
|
||||
router = New(ds, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
|
||||
w = httptest.NewRecorder()
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/navidrome/navidrome/core"
|
||||
"github.com/navidrome/navidrome/core/artwork"
|
||||
"github.com/navidrome/navidrome/core/external"
|
||||
"github.com/navidrome/navidrome/core/metrics"
|
||||
"github.com/navidrome/navidrome/core/playback"
|
||||
"github.com/navidrome/navidrome/core/scrobbler"
|
||||
"github.com/navidrome/navidrome/log"
|
||||
|
|
@ -43,11 +44,13 @@ type Router struct {
|
|||
scrobbler scrobbler.PlayTracker
|
||||
share core.Share
|
||||
playback playback.PlaybackServer
|
||||
metrics metrics.Metrics
|
||||
}
|
||||
|
||||
func New(ds model.DataStore, artwork artwork.Artwork, streamer core.MediaStreamer, archiver core.Archiver,
|
||||
players core.Players, provider external.Provider, scanner scanner.Scanner, broker events.Broker,
|
||||
playlists core.Playlists, scrobbler scrobbler.PlayTracker, share core.Share, playback playback.PlaybackServer,
|
||||
metrics metrics.Metrics,
|
||||
) *Router {
|
||||
r := &Router{
|
||||
ds: ds,
|
||||
|
|
@ -62,6 +65,7 @@ func New(ds model.DataStore, artwork artwork.Artwork, streamer core.MediaStreame
|
|||
scrobbler: scrobbler,
|
||||
share: share,
|
||||
playback: playback,
|
||||
metrics: metrics,
|
||||
}
|
||||
r.Handler = r.routes()
|
||||
return r
|
||||
|
|
@ -69,6 +73,11 @@ func New(ds model.DataStore, artwork artwork.Artwork, streamer core.MediaStreame
|
|||
|
||||
func (api *Router) routes() http.Handler {
|
||||
r := chi.NewRouter()
|
||||
|
||||
if conf.Server.Prometheus.Enabled {
|
||||
r.Use(recordStats(api.metrics))
|
||||
}
|
||||
|
||||
r.Use(postFormToQueryParams)
|
||||
|
||||
// Public
|
||||
|
|
@ -223,7 +232,7 @@ func h(r chi.Router, path string, f handler) {
|
|||
})
|
||||
}
|
||||
|
||||
// Add a Subsonic handler that requires a http.ResponseWriter (ex: stream, getCoverArt...)
|
||||
// Add a Subsonic handler that requires an http.ResponseWriter (ex: stream, getCoverArt...)
|
||||
func hr(r chi.Router, path string, f handlerRaw) {
|
||||
handle := func(w http.ResponseWriter, r *http.Request) {
|
||||
res, err := f(w, r)
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ var _ = Describe("MediaAnnotationController", func() {
|
|||
ds = &tests.MockDataStore{}
|
||||
playTracker = &fakePlayTracker{}
|
||||
eventBroker = &fakeEventBroker{}
|
||||
router = New(ds, nil, nil, nil, nil, nil, nil, eventBroker, nil, playTracker, nil, nil)
|
||||
router = New(ds, nil, nil, nil, nil, nil, nil, eventBroker, nil, playTracker, nil, nil, nil)
|
||||
})
|
||||
|
||||
Describe("Scrobble", func() {
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ var _ = Describe("MediaRetrievalController", func() {
|
|||
MockedMediaFile: mockRepo,
|
||||
}
|
||||
artwork = &fakeArtwork{data: "image data"}
|
||||
router = New(ds, artwork, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
|
||||
router = New(ds, artwork, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
|
||||
w = httptest.NewRecorder()
|
||||
DeferCleanup(configtest.SetupConfig())
|
||||
conf.Server.LyricsPriority = "embedded,.lrc"
|
||||
|
|
|
|||
|
|
@ -11,12 +11,15 @@ import (
|
|||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5/middleware"
|
||||
ua "github.com/mileusna/useragent"
|
||||
"github.com/navidrome/navidrome/conf"
|
||||
"github.com/navidrome/navidrome/consts"
|
||||
"github.com/navidrome/navidrome/core"
|
||||
"github.com/navidrome/navidrome/core/auth"
|
||||
"github.com/navidrome/navidrome/core/metrics"
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/navidrome/navidrome/model"
|
||||
"github.com/navidrome/navidrome/model/request"
|
||||
|
|
@ -222,3 +225,23 @@ func playerIDCookieName(userName string) string {
|
|||
cookieName := fmt.Sprintf("nd-player-%x", userName)
|
||||
return cookieName
|
||||
}
|
||||
|
||||
func recordStats(metrics metrics.Metrics) func(next http.Handler) http.Handler {
|
||||
return func(next http.Handler) http.Handler {
|
||||
fn := func(w http.ResponseWriter, r *http.Request) {
|
||||
ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor)
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
// We want to get the client name (even if not present for certain endpoints)
|
||||
p := req.Params(r)
|
||||
client, _ := p.String("c")
|
||||
|
||||
metrics.RecordRequest(r.Context(), strings.Replace(r.URL.Path, ".view", "", 1), r.Method, client, ww.Status(), time.Since(start).Milliseconds())
|
||||
}()
|
||||
|
||||
next.ServeHTTP(ww, r)
|
||||
}
|
||||
return http.HandlerFunc(fn)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ var _ = Describe("GetOpenSubsonicExtensions", func() {
|
|||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
router = subsonic.New(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
|
||||
router = subsonic.New(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
|
||||
w = httptest.NewRecorder()
|
||||
r = httptest.NewRequest("GET", "/getOpenSubsonicExtensions?f=json", nil)
|
||||
})
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ var _ = Describe("UpdatePlaylist", func() {
|
|||
BeforeEach(func() {
|
||||
ds = &tests.MockDataStore{}
|
||||
playlists = &fakePlaylists{}
|
||||
router = New(ds, nil, nil, nil, nil, nil, nil, nil, playlists, nil, nil, nil)
|
||||
router = New(ds, nil, nil, nil, nil, nil, nil, nil, playlists, nil, nil, nil, nil)
|
||||
})
|
||||
|
||||
It("clears the comment when parameter is empty", func() {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue