From 3dbde10be01f3d8b12c7ea322c96bafd1499243c Mon Sep 17 00:00:00 2001 From: Patrick Pacher Date: Tue, 8 Aug 2023 14:35:43 +0200 Subject: [PATCH] Add support for history data retention --- netquery/database.go | 93 +++++++++++++++++++++++++++++++++++++++--- netquery/manager.go | 6 +++ netquery/module_api.go | 33 +++++++++++++++ profile/config.go | 25 ++++++++++++ profile/get.go | 10 +++-- profile/profile.go | 12 ++++++ 6 files changed, 169 insertions(+), 10 deletions(-) diff --git a/netquery/database.go b/netquery/database.go index 7823a700..d71c4501 100644 --- a/netquery/database.go +++ b/netquery/database.go @@ -22,6 +22,7 @@ import ( "github.com/safing/portmaster/network" "github.com/safing/portmaster/network/netutils" "github.com/safing/portmaster/network/packet" + "github.com/safing/portmaster/profile" ) // InMemory is the "file path" to open a new in-memory database. @@ -202,6 +203,42 @@ func NewInMemory() (*Database, error) { return db, nil } +func (db *Database) Close() error { + db.readConnPool.Close() + + if err := db.writeConn.Close(); err != nil { + return err + } + + return nil +} + +func VacuumHistory(ctx context.Context) error { + historyParentDir := dataroot.Root().ChildDir("databases", 0o700) + if err := historyParentDir.Ensure(); err != nil { + return fmt.Errorf("failed to ensure database directory exists: %w", err) + } + + // Get file location of history database. + historyFile := filepath.Join(historyParentDir.Path, "history.db") + // Convert to SQLite URI path. + historyURI := "file:///" + strings.TrimPrefix(filepath.ToSlash(historyFile), "/") + + writeConn, err := sqlite.OpenConn( + historyURI, + sqlite.OpenCreate, + sqlite.OpenReadWrite, + sqlite.OpenWAL, + sqlite.OpenSharedCache, + sqlite.OpenURI, + ) + if err != nil { + return err + } + + return orm.RunQuery(ctx, writeConn, "VACUUM") +} + // ApplyMigrations applies any table and data migrations that are needed // to bring db up-to-date with the built-in schema. // TODO(ppacher): right now this only applies the current schema and ignores @@ -377,6 +414,56 @@ func (db *Database) dumpTo(ctx context.Context, w io.Writer) error { //nolint:un return enc.Encode(conns) } +func (db *Database) CleanupHistoryData(ctx context.Context) error { + query := "SELECT DISTINCT profile FROM history.connections" + + var result []struct { + Profile string `sqlite:"profile"` + } + + if err := db.Execute(ctx, query, orm.WithResult(&result)); err != nil { + return fmt.Errorf("failed to get a list of profiles from the history database: %w", err) + } + + globalRetentionDays := profile.CfgOptionHistoryRetention() + merr := new(multierror.Error) + + for _, row := range result { + id := strings.TrimPrefix(row.Profile, string(profile.SourceLocal)+"/") + p, err := profile.GetLocalProfile(id, nil, nil) + + var retention int + if err == nil { + retention = p.HistoryRetention() + } else { + // we failed to get the profile, fallback to the global setting + log.Errorf("failed to load profile for id %s: %s", id, err) + retention = int(globalRetentionDays) + } + + if retention == 0 { + log.Infof("skipping history data retention for profile %s: retention is disabled", row.Profile) + + continue + } + + threshold := time.Now().Add(-1 * time.Duration(retention) * time.Hour * 24) + + log.Infof("cleaning up history data for profile %s with retention setting %d days (threshold = %s)", row.Profile, retention, threshold.Format(orm.SqliteTimeFormat)) + + query := "DELETE FROM history.connections WHERE profile = :profile AND active = FALSE AND datetime(started) < datetime(:threshold)" + if err := db.ExecuteWrite(ctx, query, orm.WithNamedArgs(map[string]any{ + ":profile": row.Profile, + ":threshold": threshold.Format(orm.SqliteTimeFormat), + })); err != nil { + log.Errorf("failed to delete connections for profile %s from history: %s", row.Profile, err) + merr.Errors = append(merr.Errors, fmt.Errorf("profile %s: %w", row.Profile, err)) + } + } + + return merr.ErrorOrNil() +} + // MarkAllHistoryConnectionsEnded marks all connections in the history database as ended. func (db *Database) MarkAllHistoryConnectionsEnded(ctx context.Context) error { query := fmt.Sprintf("UPDATE %s.connections SET active = FALSE, ended = :ended WHERE active = TRUE", HistoryDatabase) @@ -512,9 +599,3 @@ func (db *Database) Save(ctx context.Context, conn Conn, enableHistory bool) err return nil } - -// Close closes the underlying database connection. db should and cannot be -// used after Close() has returned. -func (db *Database) Close() error { - return db.writeConn.Close() -} diff --git a/netquery/manager.go b/netquery/manager.go index 9116cbad..857576e9 100644 --- a/netquery/manager.go +++ b/netquery/manager.go @@ -39,6 +39,12 @@ type ( // UpdateBandwidth updates bandwidth data for the connection and optionally also writes // the bandwidth data to the history database. UpdateBandwidth(ctx context.Context, enableHistory bool, processKey string, connID string, bytesReceived uint64, bytesSent uint64) error + + // CleanupHistoryData applies data retention to the history database. + CleanupHistoryData(ctx context.Context) error + + // Close closes the connection store. It must not be used afterwards. + Close() error } // Manager handles new and updated network.Connections feeds and persists them diff --git a/netquery/module_api.go b/netquery/module_api.go index 860f4e4c..c2b5b6a2 100644 --- a/netquery/module_api.go +++ b/netquery/module_api.go @@ -158,6 +158,26 @@ func (m *module) prepare() error { Description: "Remove all connections from the history database for one or more profiles", }); err != nil { return fmt.Errorf("failed to register API endpoint: %w", err) + + } + + if err := api.RegisterEndpoint(api.Endpoint{ + Path: "netquery/history/cleanup", + MimeType: "application/json", + Write: api.PermitUser, + BelongsTo: m.Module, + HandlerFunc: func(w http.ResponseWriter, r *http.Request) { + if err := m.Store.CleanupHistoryData(r.Context()); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + + return + } + + w.WriteHeader(http.StatusNoContent) + }, + Name: "Apply connection history retention threshold", + }); err != nil { + return fmt.Errorf("failed to register API endpoint: %w", err) } return nil @@ -200,6 +220,10 @@ func (m *module) start() error { return nil }) + m.StartServiceWorker("history-row-cleaner", time.Hour, func(ctx context.Context) error { + return m.Store.CleanupHistoryData(ctx) + }) + m.StartServiceWorker("netquery-row-cleaner", time.Second, func(ctx context.Context) error { for { select { @@ -242,5 +266,14 @@ func (m *module) stop() error { log.Errorf("netquery: failed to mark connections in history database as ended: %s", err) } + if err := m.mng.store.Close(); err != nil { + log.Errorf("netquery: failed to close sqlite database: %s", err) + } else { + // try to vaccum the history database now + if err := VacuumHistory(ctx); err != nil { + log.Errorf("netquery: failed to execute VACCUM in history database: %s", err) + } + } + return nil } diff --git a/profile/config.go b/profile/config.go index 8c69a4e9..e9a6b963 100644 --- a/profile/config.go +++ b/profile/config.go @@ -112,6 +112,10 @@ var ( cfgOptionEnableHistory config.BoolOption cfgOptionEnableHistoryOrder = 96 + CfgOptionHistoryRetentionKey = "history/retention" + CfgOptionHistoryRetention config.IntOption + cfgOptionHistoryRetentionOrder = 97 + // Setting "Enable SPN" at order 128. CfgOptionUseSPNKey = "spn/use" @@ -267,6 +271,27 @@ func registerConfiguration() error { //nolint:maintidx cfgOptionEnableHistory = config.Concurrent.GetAsBool(CfgOptionEnableHistoryKey, false) cfgBoolOptions[CfgOptionEnableHistoryKey] = cfgOptionEnableHistory + err = config.Register(&config.Option{ + Name: "History Data Retention", + Key: CfgOptionHistoryRetentionKey, + Description: "How low, in days, connections should be kept in history.", + OptType: config.OptTypeInt, + ReleaseLevel: config.ReleaseLevelStable, + ExpertiseLevel: config.ExpertiseLevelUser, + DefaultValue: 7, + Annotations: config.Annotations{ + config.UnitAnnotation: "Days", + config.DisplayOrderAnnotation: cfgOptionHistoryRetentionOrder, + config.CategoryAnnotation: "History", + config.RequiresFeatureID: account.FeatureHistory, + }, + }) + if err != nil { + return err + } + CfgOptionHistoryRetention = config.Concurrent.GetAsInt(CfgOptionHistoryRetentionKey, 7) + cfgIntOptions[CfgOptionHistoryRetentionKey] = CfgOptionHistoryRetention + rulesHelp := strings.ReplaceAll(`Rules are checked from top to bottom, stopping after the first match. They can match: - By address: "192.168.0.1" diff --git a/profile/get.go b/profile/get.go index b9cc360f..7ed686e8 100644 --- a/profile/get.go +++ b/profile/get.go @@ -127,10 +127,12 @@ func GetLocalProfile(id string, md MatchingData, createProfileCallback func() *P // Update metadata. var changed bool - if special { - changed = updateSpecialProfileMetadata(profile, md.Path()) - } else { - changed = profile.updateMetadata(md.Path()) + if md != nil { + if special { + changed = updateSpecialProfileMetadata(profile, md.Path()) + } else { + changed = profile.updateMetadata(md.Path()) + } } // Save if created or changed. diff --git a/profile/profile.go b/profile/profile.go index 2d0eb9c4..29ca434d 100644 --- a/profile/profile.go +++ b/profile/profile.go @@ -137,6 +137,7 @@ type Profile struct { //nolint:maligned // not worth the effort spnUsagePolicy endpoints.Endpoints spnExitHubPolicy endpoints.Endpoints enableHistory bool + historyRetention int // Lifecycle Management outdated *abool.AtomicBool @@ -239,6 +240,13 @@ func (profile *Profile) parseConfig() error { profile.enableHistory = enableHistory } + retention, ok := profile.configPerspective.GetAsInt(CfgOptionHistoryRetentionKey) + if ok { + profile.historyRetention = int(retention) + } else { + profile.historyRetention = int(CfgOptionHistoryRetention()) + } + return lastErr } @@ -326,6 +334,10 @@ func (profile *Profile) HistoryEnabled() bool { return profile.enableHistory } +func (profile *Profile) HistoryRetention() int { + return profile.historyRetention +} + // GetEndpoints returns the endpoint list of the profile. This functions // requires the profile to be read locked. func (profile *Profile) GetEndpoints() endpoints.Endpoints {