Improve history purging

This commit is contained in:
Daniel 2023-08-09 14:45:56 +02:00
parent a722b27c01
commit cf70c55ab5
5 changed files with 101 additions and 72 deletions

View file

@ -87,35 +87,37 @@ func (m *module) prepare() error {
}
if err := api.RegisterEndpoint(api.Endpoint{
Name: "Query Connections",
Description: "Query the in-memory sqlite connection database.",
Path: "netquery/query",
MimeType: "application/json",
Read: api.PermitUser, // Needs read+write as the query is sent using POST data.
Write: api.PermitUser, // Needs read+write as the query is sent using POST data.
BelongsTo: m.Module,
HandlerFunc: queryHander.ServeHTTP,
Name: "Query Connections",
Description: "Query the in-memory sqlite connection database.",
}); err != nil {
return fmt.Errorf("failed to register API endpoint: %w", err)
}
if err := api.RegisterEndpoint(api.Endpoint{
Name: "Active Connections Chart",
Description: "Query the in-memory sqlite connection database and return a chart of active connections.",
Path: "netquery/charts/connection-active",
MimeType: "application/json",
Write: api.PermitUser,
BelongsTo: m.Module,
HandlerFunc: chartHandler.ServeHTTP,
Name: "Active Connections Chart",
Description: "Query the in-memory sqlite connection database and return a chart of active connections.",
}); err != nil {
return fmt.Errorf("failed to register API endpoint: %w", err)
}
if err := api.RegisterEndpoint(api.Endpoint{
Path: "netquery/history/clear",
MimeType: "application/json",
Write: api.PermitUser,
BelongsTo: m.Module,
Name: "Remove connections from profile history",
Description: "Remove all connections from the history database for one or more profiles",
Path: "netquery/history/clear",
MimeType: "application/json",
Write: api.PermitUser,
BelongsTo: m.Module,
HandlerFunc: func(w http.ResponseWriter, r *http.Request) {
var body struct {
ProfileIDs []string `json:"profileIDs"`
@ -154,28 +156,21 @@ func (m *module) prepare() error {
w.WriteHeader(http.StatusNoContent)
},
Name: "Remove connections from profile history",
Description: "Remove all connections from the history database for one or more profiles",
}); err != nil {
return fmt.Errorf("failed to register API endpoint: %w", err)
}
if err := api.RegisterEndpoint(api.Endpoint{
Name: "Apply connection history retention threshold",
Path: "netquery/history/cleanup",
MimeType: "application/json",
Write: api.PermitUser,
BelongsTo: m.Module,
HandlerFunc: func(w http.ResponseWriter, r *http.Request) {
if err := m.Store.CleanupHistoryData(r.Context()); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
ActionFunc: func(ar *api.Request) (msg string, err error) {
if err := m.Store.PurgeOldHistory(ar.Context()); err != nil {
return "", err
}
w.WriteHeader(http.StatusNoContent)
return "Deleted outdated connections.", nil
},
Name: "Apply connection history retention threshold",
}); err != nil {
return fmt.Errorf("failed to register API endpoint: %w", err)
}
@ -184,7 +179,7 @@ func (m *module) prepare() error {
}
func (m *module) start() error {
m.StartServiceWorker("netquery-feeder", time.Second, func(ctx context.Context) error {
m.StartServiceWorker("netquery connection feed listener", 0, func(ctx context.Context) error {
sub, err := m.db.Subscribe(query.New("network:"))
if err != nil {
return fmt.Errorf("failed to subscribe to network tree: %w", err)
@ -215,16 +210,12 @@ func (m *module) start() error {
}
})
m.StartServiceWorker("netquery-persister", time.Second, func(ctx context.Context) error {
m.StartServiceWorker("netquery connection feed handler", 0, func(ctx context.Context) error {
m.mng.HandleFeed(ctx, m.feed)
return nil
})
m.StartServiceWorker("history-row-cleaner", time.Hour, func(ctx context.Context) error {
return m.Store.CleanupHistoryData(ctx)
})
m.StartServiceWorker("netquery-row-cleaner", time.Second, func(ctx context.Context) error {
m.StartServiceWorker("netquery live db cleaner", 0, func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
@ -233,14 +224,18 @@ func (m *module) start() error {
threshold := time.Now().Add(-network.DeleteConnsAfterEndedThreshold)
count, err := m.Store.Cleanup(ctx, threshold)
if err != nil {
log.Errorf("netquery: failed to count number of rows in memory: %s", err)
log.Errorf("netquery: failed to removed old connections from live db: %s", err)
} else {
log.Tracef("netquery: successfully removed %d old rows that ended before %s", count, threshold)
log.Tracef("netquery: successfully removed %d old connections from live db that ended before %s", count, threshold)
}
}
}
})
m.NewTask("network history cleaner", func(ctx context.Context, _ *modules.Task) error {
return m.Store.PurgeOldHistory(ctx)
}).Repeat(time.Hour).Schedule(time.Now().Add(10 * time.Minute))
// For debugging, provide a simple direct SQL query interface using
// the runtime database.
// Only expose in development mode.
@ -269,9 +264,9 @@ func (m *module) stop() error {
if err := m.mng.store.Close(); err != nil {
log.Errorf("netquery: failed to close sqlite database: %s", err)
} else {
// try to vaccum the history database now
// Clear deleted connections from database.
if err := VacuumHistory(ctx); err != nil {
log.Errorf("netquery: failed to execute VACCUM in history database: %s", err)
log.Errorf("netquery: failed to execute VACUUM in history database: %s", err)
}
}