Add support for history data retention

This commit is contained in:
Patrick Pacher 2023-08-08 14:35:43 +02:00
parent 620a9c0fde
commit 3dbde10be0
6 changed files with 169 additions and 10 deletions

View file

@ -22,6 +22,7 @@ import (
"github.com/safing/portmaster/network" "github.com/safing/portmaster/network"
"github.com/safing/portmaster/network/netutils" "github.com/safing/portmaster/network/netutils"
"github.com/safing/portmaster/network/packet" "github.com/safing/portmaster/network/packet"
"github.com/safing/portmaster/profile"
) )
// InMemory is the "file path" to open a new in-memory database. // InMemory is the "file path" to open a new in-memory database.
@ -202,6 +203,42 @@ func NewInMemory() (*Database, error) {
return db, nil return db, nil
} }
func (db *Database) Close() error {
db.readConnPool.Close()
if err := db.writeConn.Close(); err != nil {
return err
}
return nil
}
func VacuumHistory(ctx context.Context) error {
historyParentDir := dataroot.Root().ChildDir("databases", 0o700)
if err := historyParentDir.Ensure(); err != nil {
return fmt.Errorf("failed to ensure database directory exists: %w", err)
}
// Get file location of history database.
historyFile := filepath.Join(historyParentDir.Path, "history.db")
// Convert to SQLite URI path.
historyURI := "file:///" + strings.TrimPrefix(filepath.ToSlash(historyFile), "/")
writeConn, err := sqlite.OpenConn(
historyURI,
sqlite.OpenCreate,
sqlite.OpenReadWrite,
sqlite.OpenWAL,
sqlite.OpenSharedCache,
sqlite.OpenURI,
)
if err != nil {
return err
}
return orm.RunQuery(ctx, writeConn, "VACUUM")
}
// ApplyMigrations applies any table and data migrations that are needed // ApplyMigrations applies any table and data migrations that are needed
// to bring db up-to-date with the built-in schema. // to bring db up-to-date with the built-in schema.
// TODO(ppacher): right now this only applies the current schema and ignores // TODO(ppacher): right now this only applies the current schema and ignores
@ -377,6 +414,56 @@ func (db *Database) dumpTo(ctx context.Context, w io.Writer) error { //nolint:un
return enc.Encode(conns) return enc.Encode(conns)
} }
func (db *Database) CleanupHistoryData(ctx context.Context) error {
query := "SELECT DISTINCT profile FROM history.connections"
var result []struct {
Profile string `sqlite:"profile"`
}
if err := db.Execute(ctx, query, orm.WithResult(&result)); err != nil {
return fmt.Errorf("failed to get a list of profiles from the history database: %w", err)
}
globalRetentionDays := profile.CfgOptionHistoryRetention()
merr := new(multierror.Error)
for _, row := range result {
id := strings.TrimPrefix(row.Profile, string(profile.SourceLocal)+"/")
p, err := profile.GetLocalProfile(id, nil, nil)
var retention int
if err == nil {
retention = p.HistoryRetention()
} else {
// we failed to get the profile, fallback to the global setting
log.Errorf("failed to load profile for id %s: %s", id, err)
retention = int(globalRetentionDays)
}
if retention == 0 {
log.Infof("skipping history data retention for profile %s: retention is disabled", row.Profile)
continue
}
threshold := time.Now().Add(-1 * time.Duration(retention) * time.Hour * 24)
log.Infof("cleaning up history data for profile %s with retention setting %d days (threshold = %s)", row.Profile, retention, threshold.Format(orm.SqliteTimeFormat))
query := "DELETE FROM history.connections WHERE profile = :profile AND active = FALSE AND datetime(started) < datetime(:threshold)"
if err := db.ExecuteWrite(ctx, query, orm.WithNamedArgs(map[string]any{
":profile": row.Profile,
":threshold": threshold.Format(orm.SqliteTimeFormat),
})); err != nil {
log.Errorf("failed to delete connections for profile %s from history: %s", row.Profile, err)
merr.Errors = append(merr.Errors, fmt.Errorf("profile %s: %w", row.Profile, err))
}
}
return merr.ErrorOrNil()
}
// MarkAllHistoryConnectionsEnded marks all connections in the history database as ended. // MarkAllHistoryConnectionsEnded marks all connections in the history database as ended.
func (db *Database) MarkAllHistoryConnectionsEnded(ctx context.Context) error { func (db *Database) MarkAllHistoryConnectionsEnded(ctx context.Context) error {
query := fmt.Sprintf("UPDATE %s.connections SET active = FALSE, ended = :ended WHERE active = TRUE", HistoryDatabase) query := fmt.Sprintf("UPDATE %s.connections SET active = FALSE, ended = :ended WHERE active = TRUE", HistoryDatabase)
@ -512,9 +599,3 @@ func (db *Database) Save(ctx context.Context, conn Conn, enableHistory bool) err
return nil return nil
} }
// Close closes the underlying database connection. db should and cannot be
// used after Close() has returned.
func (db *Database) Close() error {
return db.writeConn.Close()
}

View file

@ -39,6 +39,12 @@ type (
// UpdateBandwidth updates bandwidth data for the connection and optionally also writes // UpdateBandwidth updates bandwidth data for the connection and optionally also writes
// the bandwidth data to the history database. // the bandwidth data to the history database.
UpdateBandwidth(ctx context.Context, enableHistory bool, processKey string, connID string, bytesReceived uint64, bytesSent uint64) error UpdateBandwidth(ctx context.Context, enableHistory bool, processKey string, connID string, bytesReceived uint64, bytesSent uint64) error
// CleanupHistoryData applies data retention to the history database.
CleanupHistoryData(ctx context.Context) error
// Close closes the connection store. It must not be used afterwards.
Close() error
} }
// Manager handles new and updated network.Connections feeds and persists them // Manager handles new and updated network.Connections feeds and persists them

View file

@ -158,6 +158,26 @@ func (m *module) prepare() error {
Description: "Remove all connections from the history database for one or more profiles", Description: "Remove all connections from the history database for one or more profiles",
}); err != nil { }); err != nil {
return fmt.Errorf("failed to register API endpoint: %w", err) return fmt.Errorf("failed to register API endpoint: %w", err)
}
if err := api.RegisterEndpoint(api.Endpoint{
Path: "netquery/history/cleanup",
MimeType: "application/json",
Write: api.PermitUser,
BelongsTo: m.Module,
HandlerFunc: func(w http.ResponseWriter, r *http.Request) {
if err := m.Store.CleanupHistoryData(r.Context()); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
},
Name: "Apply connection history retention threshold",
}); err != nil {
return fmt.Errorf("failed to register API endpoint: %w", err)
} }
return nil return nil
@ -200,6 +220,10 @@ func (m *module) start() error {
return nil return nil
}) })
m.StartServiceWorker("history-row-cleaner", time.Hour, func(ctx context.Context) error {
return m.Store.CleanupHistoryData(ctx)
})
m.StartServiceWorker("netquery-row-cleaner", time.Second, func(ctx context.Context) error { m.StartServiceWorker("netquery-row-cleaner", time.Second, func(ctx context.Context) error {
for { for {
select { select {
@ -242,5 +266,14 @@ func (m *module) stop() error {
log.Errorf("netquery: failed to mark connections in history database as ended: %s", err) log.Errorf("netquery: failed to mark connections in history database as ended: %s", err)
} }
if err := m.mng.store.Close(); err != nil {
log.Errorf("netquery: failed to close sqlite database: %s", err)
} else {
// try to vaccum the history database now
if err := VacuumHistory(ctx); err != nil {
log.Errorf("netquery: failed to execute VACCUM in history database: %s", err)
}
}
return nil return nil
} }

View file

@ -112,6 +112,10 @@ var (
cfgOptionEnableHistory config.BoolOption cfgOptionEnableHistory config.BoolOption
cfgOptionEnableHistoryOrder = 96 cfgOptionEnableHistoryOrder = 96
CfgOptionHistoryRetentionKey = "history/retention"
CfgOptionHistoryRetention config.IntOption
cfgOptionHistoryRetentionOrder = 97
// Setting "Enable SPN" at order 128. // Setting "Enable SPN" at order 128.
CfgOptionUseSPNKey = "spn/use" CfgOptionUseSPNKey = "spn/use"
@ -267,6 +271,27 @@ func registerConfiguration() error { //nolint:maintidx
cfgOptionEnableHistory = config.Concurrent.GetAsBool(CfgOptionEnableHistoryKey, false) cfgOptionEnableHistory = config.Concurrent.GetAsBool(CfgOptionEnableHistoryKey, false)
cfgBoolOptions[CfgOptionEnableHistoryKey] = cfgOptionEnableHistory cfgBoolOptions[CfgOptionEnableHistoryKey] = cfgOptionEnableHistory
err = config.Register(&config.Option{
Name: "History Data Retention",
Key: CfgOptionHistoryRetentionKey,
Description: "How low, in days, connections should be kept in history.",
OptType: config.OptTypeInt,
ReleaseLevel: config.ReleaseLevelStable,
ExpertiseLevel: config.ExpertiseLevelUser,
DefaultValue: 7,
Annotations: config.Annotations{
config.UnitAnnotation: "Days",
config.DisplayOrderAnnotation: cfgOptionHistoryRetentionOrder,
config.CategoryAnnotation: "History",
config.RequiresFeatureID: account.FeatureHistory,
},
})
if err != nil {
return err
}
CfgOptionHistoryRetention = config.Concurrent.GetAsInt(CfgOptionHistoryRetentionKey, 7)
cfgIntOptions[CfgOptionHistoryRetentionKey] = CfgOptionHistoryRetention
rulesHelp := strings.ReplaceAll(`Rules are checked from top to bottom, stopping after the first match. They can match: rulesHelp := strings.ReplaceAll(`Rules are checked from top to bottom, stopping after the first match. They can match:
- By address: "192.168.0.1" - By address: "192.168.0.1"

View file

@ -127,10 +127,12 @@ func GetLocalProfile(id string, md MatchingData, createProfileCallback func() *P
// Update metadata. // Update metadata.
var changed bool var changed bool
if special { if md != nil {
changed = updateSpecialProfileMetadata(profile, md.Path()) if special {
} else { changed = updateSpecialProfileMetadata(profile, md.Path())
changed = profile.updateMetadata(md.Path()) } else {
changed = profile.updateMetadata(md.Path())
}
} }
// Save if created or changed. // Save if created or changed.

View file

@ -137,6 +137,7 @@ type Profile struct { //nolint:maligned // not worth the effort
spnUsagePolicy endpoints.Endpoints spnUsagePolicy endpoints.Endpoints
spnExitHubPolicy endpoints.Endpoints spnExitHubPolicy endpoints.Endpoints
enableHistory bool enableHistory bool
historyRetention int
// Lifecycle Management // Lifecycle Management
outdated *abool.AtomicBool outdated *abool.AtomicBool
@ -239,6 +240,13 @@ func (profile *Profile) parseConfig() error {
profile.enableHistory = enableHistory profile.enableHistory = enableHistory
} }
retention, ok := profile.configPerspective.GetAsInt(CfgOptionHistoryRetentionKey)
if ok {
profile.historyRetention = int(retention)
} else {
profile.historyRetention = int(CfgOptionHistoryRetention())
}
return lastErr return lastErr
} }
@ -326,6 +334,10 @@ func (profile *Profile) HistoryEnabled() bool {
return profile.enableHistory return profile.enableHistory
} }
func (profile *Profile) HistoryRetention() int {
return profile.historyRetention
}
// GetEndpoints returns the endpoint list of the profile. This functions // GetEndpoints returns the endpoint list of the profile. This functions
// requires the profile to be read locked. // requires the profile to be read locked.
func (profile *Profile) GetEndpoints() endpoints.Endpoints { func (profile *Profile) GetEndpoints() endpoints.Endpoints {