Use new Purge method of the database

This commit is contained in:
Daniel 2020-09-24 16:57:54 +02:00
parent 6093c5b847
commit f46e759524
6 changed files with 32 additions and 97 deletions

View file

@ -20,7 +20,6 @@ func registerDatabases() error {
Name: "core", Name: "core",
Description: "Holds core data, such as settings and profiles", Description: "Holds core data, such as settings and profiles",
StorageType: DefaultDatabaseStorageType, StorageType: DefaultDatabaseStorageType,
PrimaryAPI: "",
}) })
if err != nil { if err != nil {
return err return err
@ -30,7 +29,6 @@ func registerDatabases() error {
Name: "cache", Name: "cache",
Description: "Cached data, such as Intelligence and DNS Records", Description: "Cached data, such as Intelligence and DNS Records",
StorageType: DefaultDatabaseStorageType, StorageType: DefaultDatabaseStorageType,
PrimaryAPI: "",
}) })
if err != nil { if err != nil {
return err return err
@ -40,7 +38,6 @@ func registerDatabases() error {
// Name: "history", // Name: "history",
// Description: "Historic event data", // Description: "Historic event data",
// StorageType: DefaultDatabaseStorageType, // StorageType: DefaultDatabaseStorageType,
// PrimaryAPI: "",
// }) // })
// if err != nil { // if err != nil {
// return err // return err

View file

@ -69,7 +69,6 @@ func registerControlDatabase() error {
Name: "control", Name: "control",
Description: "Control Interface for the Portmaster", Description: "Control Interface for the Portmaster",
StorageType: "injected", StorageType: "injected",
PrimaryAPI: "",
}) })
if err != nil { if err != nil {
return err return err

View file

@ -76,7 +76,7 @@ func isLoaded() bool {
} }
} }
// processListFile opens the latest version of f ile and decodes its DSDL // processListFile opens the latest version of file and decodes its DSDL
// content. It calls processEntry for each decoded filterlists entry. // content. It calls processEntry for each decoded filterlists entry.
func processListFile(ctx context.Context, filter *scopedBloom, file *updater.File) error { func processListFile(ctx context.Context, filter *scopedBloom, file *updater.File) error {
f, err := os.Open(file.Path()) f, err := os.Open(file.Path())
@ -135,10 +135,20 @@ func processListFile(ctx context.Context, filter *scopedBloom, file *updater.Fil
func persistRecords(startJob func(func() error), records <-chan record.Record) { func persistRecords(startJob func(func() error), records <-chan record.Record) {
var cnt int var cnt int
start := time.Now() start := time.Now()
logProgress := func() {
if cnt == 0 {
// protection against panic
return
}
timePerEntity := time.Since(start) / time.Duration(cnt)
speed := float64(time.Second) / float64(timePerEntity)
log.Debugf("processed %d entities in %s with %s / entity (%.2f entities/second)", cnt, time.Since(start), timePerEntity, speed)
}
batch := database.NewInterface(&database.Options{Local: true, Internal: true}) batch := database.NewInterface(&database.Options{Local: true, Internal: true})
var processBatch func() error
var processBatch func() error
processBatch = func() error { processBatch = func() error {
batchPut := batch.PutMany("cache") batchPut := batch.PutMany("cache")
for r := range records { for r := range records {
@ -148,9 +158,7 @@ func persistRecords(startJob func(func() error), records <-chan record.Record) {
cnt++ cnt++
if cnt%10000 == 0 { if cnt%10000 == 0 {
timePerEntity := time.Since(start) / time.Duration(cnt) logProgress()
speed := float64(time.Second) / float64(timePerEntity)
log.Debugf("processed %d entities %s with %s / entity (%.2f entits/second)", cnt, time.Since(start), timePerEntity, speed)
} }
if cnt%1000 == 0 { if cnt%1000 == 0 {
@ -164,6 +172,10 @@ func persistRecords(startJob func(func() error), records <-chan record.Record) {
} }
} }
// log final batch
if cnt%10000 != 0 { // avoid duplicate logging
logProgress()
}
return batchPut(nil) return batchPut(nil)
} }

View file

@ -129,56 +129,19 @@ func performUpdate(ctx context.Context) error {
return nil return nil
} }
func removeAllObsoleteFilterEntries(_ context.Context) error { func removeAllObsoleteFilterEntries(ctx context.Context) error {
log.Debugf("intel/filterlists: cleanup task started, removing obsolete filter list entries ...") log.Debugf("intel/filterlists: cleanup task started, removing obsolete filter list entries ...")
for { n, err := cache.Purge(ctx, query.New(filterListKeyPrefix).Where(
done, err := removeObsoleteFilterEntries(10000) // TODO(ppacher): remember the timestamp we started the last update
if err != nil { // and use that rather than "one hour ago"
return err query.Where("UpdatedAt", query.LessThan, time.Now().Add(-time.Hour).Unix()),
} ))
if done {
return nil
}
}
}
func removeObsoleteFilterEntries(batchSize int) (bool, error) {
iter, err := cache.Query(
query.New(filterListKeyPrefix).Where(
// TODO(ppacher): remember the timestamp we started the last update
// and use that rather than "one hour ago"
query.Where("UpdatedAt", query.LessThan, time.Now().Add(-time.Hour).Unix()),
),
)
if err != nil { if err != nil {
return false, err return err
} }
keys := make([]string, 0, batchSize) log.Debugf("intel/filterlists: successfully removed %d obsolete entries", n)
return nil
var cnt int
for r := range iter.Next {
cnt++
keys = append(keys, r.Key())
if cnt == batchSize {
break
}
}
iter.Cancel()
for _, key := range keys {
if err := cache.Delete(key); err != nil {
log.Errorf("intel/filterlists: failed to remove stale cache entry %q: %s", key, err)
}
}
log.Debugf("intel/filterlists: successfully removed %d obsolete entries", cnt)
// if we removed fewer entries than the batch size we
// are done and no more entries exist
return cnt < batchSize, nil
} }
// getUpgradableFiles returns a slice of filterlists files // getUpgradableFiles returns a slice of filterlists files

View file

@ -129,7 +129,6 @@ func registerAsDatabase() error {
Name: "network", Name: "network",
Description: "Network and Firewall Data", Description: "Network and Firewall Data",
StorageType: "injected", StorageType: "injected",
PrimaryAPI: "",
}) })
if err != nil { if err != nil {
return err return err

View file

@ -85,48 +85,13 @@ func (rec *NameRecord) Save() error {
return recordDatabase.PutNew(rec) return recordDatabase.PutNew(rec)
} }
func clearNameCache(_ context.Context, _ interface{}) error { func clearNameCache(ctx context.Context, _ interface{}) error {
log.Debugf("resolver: name cache clearing started...") log.Debugf("resolver: dns cache clearing started...")
for { n, err := recordDatabase.Purge(ctx, query.New(nameRecordsKeyPrefix))
done, err := removeNameEntries(10000)
if err != nil {
return err
}
if done {
return nil
}
}
}
func removeNameEntries(batchSize int) (bool, error) {
iter, err := recordDatabase.Query(query.New(nameRecordsKeyPrefix))
if err != nil { if err != nil {
return false, err return err
} }
keys := make([]string, 0, batchSize) log.Debugf("resolver: cleared %d entries in dns cache", n)
return nil
var cnt int
for r := range iter.Next {
cnt++
keys = append(keys, r.Key())
if cnt == batchSize {
break
}
}
iter.Cancel()
for _, key := range keys {
if err := recordDatabase.Delete(key); err != nil {
log.Warningf("resolver: failed to remove name cache entry %q: %s", key, err)
}
}
log.Debugf("resolver: successfully removed %d name cache entries", cnt)
// if we removed fewer entries than the batch size we
// are done and no more entries exist
return cnt < batchSize, nil
} }