Implement some review suggestions

This commit is contained in:
Daniel 2022-07-14 13:44:31 +02:00
parent c9a217a9b8
commit 51a2d27522
2 changed files with 13 additions and 47 deletions

View file

@ -78,12 +78,6 @@ func (mng *Manager) runtimeGet(keyOrPrefix string) ([]record.Record, error) {
// HandleFeed handles and persists updates one after each other! Depending on the system // HandleFeed handles and persists updates one after each other! Depending on the system
// load the user might want to use a buffered channel for feed. // load the user might want to use a buffered channel for feed.
func (mng *Manager) HandleFeed(ctx context.Context, feed <-chan *network.Connection) { func (mng *Manager) HandleFeed(ctx context.Context, feed <-chan *network.Connection) {
// count the number of inserted rows for logging purposes.
//
// TODO(ppacher): how to handle the, though unlikely case, of a
// overflow to 0 here?
var count uint64
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -101,7 +95,7 @@ func (mng *Manager) HandleFeed(ctx context.Context, feed <-chan *network.Connect
continue continue
} }
log.Infof("netquery: persisting create/update to connection %s", conn.ID) log.Tracef("netquery: updating connection %s", conn.ID)
if err := mng.store.Save(ctx, *model); err != nil { if err := mng.store.Save(ctx, *model); err != nil {
log.Errorf("netquery: failed to save connection %s in sqlite database: %s", conn.ID, err) log.Errorf("netquery: failed to save connection %s in sqlite database: %s", conn.ID, err)
@ -118,12 +112,6 @@ func (mng *Manager) HandleFeed(ctx context.Context, feed <-chan *network.Connect
if err := mng.pushConnUpdate(ctx, *cloned, *model); err != nil { if err := mng.pushConnUpdate(ctx, *cloned, *model); err != nil {
log.Errorf("netquery: failed to push update for conn %s via database system: %s", conn.ID, err) log.Errorf("netquery: failed to push update for conn %s via database system: %s", conn.ID, err)
} }
count++
if count%20 == 0 {
log.Debugf("netquery: persisted %d connections so far", count)
}
} }
} }
} }

View file

@ -43,7 +43,6 @@ func (m *Module) Prepare() error {
m.db = database.NewInterface(&database.Options{ m.db = database.NewInterface(&database.Options{
Local: true, Local: true,
Internal: true, Internal: true,
CacheSize: 0,
}) })
m.sqlStore, err = NewInMemory() m.sqlStore, err = NewInMemory()
@ -71,12 +70,12 @@ func (m *Module) Prepare() error {
if err := api.RegisterEndpoint(api.Endpoint{ if err := api.RegisterEndpoint(api.Endpoint{
Path: "netquery/query", Path: "netquery/query",
MimeType: "application/json", MimeType: "application/json",
Read: api.PermitAnyone, Read: api.PermitUser,
Write: api.PermitAnyone, Write: api.PermitUser,
BelongsTo: m.Module, BelongsTo: m.Module,
HandlerFunc: queryHander.ServeHTTP, HandlerFunc: queryHander.ServeHTTP,
Name: "Query In-Memory Database", Name: "Query Connections",
Description: "Query the in-memory sqlite database", Description: "Query the in-memory sqlite connection database.",
}); err != nil { }); err != nil {
return fmt.Errorf("failed to register API endpoint: %w", err) return fmt.Errorf("failed to register API endpoint: %w", err)
} }
@ -84,12 +83,12 @@ func (m *Module) Prepare() error {
if err := api.RegisterEndpoint(api.Endpoint{ if err := api.RegisterEndpoint(api.Endpoint{
Path: "netquery/charts/connection-active", Path: "netquery/charts/connection-active",
MimeType: "application/json", MimeType: "application/json",
Read: api.PermitAnyone, Read: api.PermitUser,
Write: api.PermitAnyone, Write: api.PermitUser,
BelongsTo: m.Module, BelongsTo: m.Module,
HandlerFunc: chartHandler.ServeHTTP, HandlerFunc: chartHandler.ServeHTTP,
Name: "Query In-Memory Database", Name: "Active Connections Chart",
Description: "Query the in-memory sqlite database", Description: "Query the in-memory sqlite connection database and return a chart of active connections.",
}); err != nil { }); err != nil {
return fmt.Errorf("failed to register API endpoint: %w", err) return fmt.Errorf("failed to register API endpoint: %w", err)
} }
@ -142,36 +141,15 @@ func (mod *Module) Start() error {
if err != nil { if err != nil {
log.Errorf("netquery: failed to count number of rows in memory: %s", err) log.Errorf("netquery: failed to count number of rows in memory: %s", err)
} else { } else {
log.Infof("netquery: successfully removed %d old rows that ended before %s", count, threshold) log.Tracef("netquery: successfully removed %d old rows that ended before %s", count, threshold)
} }
} }
} }
}) })
mod.StartWorker("netquery-row-counter", func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case <-time.After(1 * time.Second):
count, err := mod.sqlStore.CountRows(ctx)
if err != nil {
log.Errorf("netquery: failed to count number of rows in memory: %s", err)
} else {
log.Infof("netquery: currently holding %d rows in memory", count)
}
/*
if err := sqlStore.dumpTo(ctx, os.Stderr); err != nil {
log.Errorf("netquery: failed to dump sqlite memory content: %w", err)
}
*/
}
}
})
// for debugging, we provide a simple direct SQL query interface using // for debugging, we provide a simple direct SQL query interface using
// the runtime database // the runtime database
// FIXME: Expose only in dev mode.
_, err := NewRuntimeQueryRunner(mod.sqlStore, "netquery/query/", runtime.DefaultRegistry) _, err := NewRuntimeQueryRunner(mod.sqlStore, "netquery/query/", runtime.DefaultRegistry)
if err != nil { if err != nil {
return fmt.Errorf("failed to set up runtime SQL query runner: %w", err) return fmt.Errorf("failed to set up runtime SQL query runner: %w", err)