Merge pull request #156 from safing/feature/improve-filterlists-cleanup

Use new Purge method of the database
Authored by Daniel, 2020-09-24 17:11:09 +02:00; committed via GitHub
commit 142bc1e54a
6 changed files with 32 additions and 97 deletions
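The thrust of the change: several hand-rolled "query, iterate, collect keys, delete" loops are replaced by one call to a new Purge method on the database interface. Judging only from the call sites in this diff (n, err := cache.Purge(ctx, q), with n formatted via %d), the method presumably has a shape like the sketch below; this is inferred from usage here, not quoted from the portbase API:

    // Purge deletes all records matching the given query and reports
    // how many records were removed. (Signature inferred from call sites.)
    func (i *Interface) Purge(ctx context.Context, q *query.Query) (n int, err error)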


@@ -20,7 +20,6 @@ func registerDatabases() error {
 		Name:        "core",
 		Description: "Holds core data, such as settings and profiles",
 		StorageType: DefaultDatabaseStorageType,
-		PrimaryAPI:  "",
 	})
 	if err != nil {
 		return err
@@ -30,7 +29,6 @@ func registerDatabases() error {
 		Name:        "cache",
 		Description: "Cached data, such as Intelligence and DNS Records",
 		StorageType: DefaultDatabaseStorageType,
-		PrimaryAPI:  "",
 	})
 	if err != nil {
 		return err
@@ -40,7 +38,6 @@ func registerDatabases() error {
 	// 	Name:        "history",
 	// 	Description: "Historic event data",
 	// 	StorageType: DefaultDatabaseStorageType,
-	// 	PrimaryAPI:  "",
 	// })
 	// if err != nil {
 	// 	return err
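With PrimaryAPI gone, a registration reduces to the three remaining fields. A hypothetical call site in the style of the context lines above; database.Register and its two-value return are assumptions based on portbase conventions, not taken from this diff:

    _, err := database.Register(&database.Database{
        Name:        "core",
        Description: "Holds core data, such as settings and profiles",
        StorageType: DefaultDatabaseStorageType,
    })
    if err != nil {
        return err
    }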


@@ -69,7 +69,6 @@ func registerControlDatabase() error {
 		Name:        "control",
 		Description: "Control Interface for the Portmaster",
 		StorageType: "injected",
-		PrimaryAPI:  "",
 	})
 	if err != nil {
 		return err


@@ -76,7 +76,7 @@ func isLoaded() bool {
 	}
 }
 
-// processListFile opens the latest version of f ile and decodes it's DSDL
+// processListFile opens the latest version of file and decodes it's DSDL
 // content. It calls processEntry for each decoded filterlists entry.
 func processListFile(ctx context.Context, filter *scopedBloom, file *updater.File) error {
 	f, err := os.Open(file.Path())
@@ -135,10 +135,20 @@ func processListFile(ctx context.Context, filter *scopedBloom, file *updater.File) error {
 func persistRecords(startJob func(func() error), records <-chan record.Record) {
 	var cnt int
 	start := time.Now()
 
+	logProgress := func() {
+		if cnt == 0 {
+			// protection against panic
+			return
+		}
+		timePerEntity := time.Since(start) / time.Duration(cnt)
+		speed := float64(time.Second) / float64(timePerEntity)
+		log.Debugf("processed %d entities in %s with %s / entity (%.2f entities/second)", cnt, time.Since(start), timePerEntity, speed)
+	}
+
 	batch := database.NewInterface(&database.Options{Local: true, Internal: true})
 
 	var processBatch func() error
 	processBatch = func() error {
 		batchPut := batch.PutMany("cache")
 		for r := range records {
@@ -148,9 +158,7 @@ func persistRecords(startJob func(func() error), records <-chan record.Record) {
 			cnt++
 			if cnt%10000 == 0 {
-				timePerEntity := time.Since(start) / time.Duration(cnt)
-				speed := float64(time.Second) / float64(timePerEntity)
-				log.Debugf("processed %d entities %s with %s / entity (%.2f entits/second)", cnt, time.Since(start), timePerEntity, speed)
+				logProgress()
 			}
 
 			if cnt%1000 == 0 {
@@ -164,6 +172,10 @@ func persistRecords(startJob func(func() error), records <-chan record.Record) {
 		}
 	}
 
+	// log final batch
+	if cnt%10000 != 0 { // avoid duplicate logging
+		logProgress()
+	}
 	return batchPut(nil)
 }
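The new logProgress closure factors the throughput report out of the hot loop, and the cnt == 0 check (the commit's own "protection against panic" note) guards the division should it ever run before any record was processed; the final cnt%10000 != 0 check logs the tail exactly once. The pattern is self-contained; a runnable sketch with fmt standing in for log.Debugf:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        var cnt int
        start := time.Now()

        // logProgress reports average cost and rate so far; the cnt == 0
        // guard protects against dividing by zero.
        logProgress := func() {
            if cnt == 0 {
                return
            }
            timePerEntity := time.Since(start) / time.Duration(cnt)
            speed := float64(time.Second) / float64(timePerEntity)
            fmt.Printf("processed %d entities in %s with %s / entity (%.2f entities/second)\n",
                cnt, time.Since(start), timePerEntity, speed)
        }

        for i := 0; i < 25000; i++ {
            cnt++
            time.Sleep(time.Microsecond) // stand-in for real per-record work
            if cnt%10000 == 0 {          // throttled in-loop report
                logProgress()
            }
        }
        if cnt%10000 != 0 { // report the final partial batch exactly once
            logProgress()
        }
    }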


@@ -129,56 +129,19 @@ func performUpdate(ctx context.Context) error {
 	return nil
 }
 
-func removeAllObsoleteFilterEntries(_ context.Context) error {
+func removeAllObsoleteFilterEntries(ctx context.Context) error {
 	log.Debugf("intel/filterlists: cleanup task started, removing obsolete filter list entries ...")
-	for {
-		done, err := removeObsoleteFilterEntries(10000)
-		if err != nil {
-			return err
-		}
-		if done {
-			return nil
-		}
-	}
-}
-
-func removeObsoleteFilterEntries(batchSize int) (bool, error) {
-	iter, err := cache.Query(
-		query.New(filterListKeyPrefix).Where(
-			// TODO(ppacher): remember the timestamp we started the last update
-			// and use that rather than "one hour ago"
-			query.Where("UpdatedAt", query.LessThan, time.Now().Add(-time.Hour).Unix()),
-		),
-	)
+	n, err := cache.Purge(ctx, query.New(filterListKeyPrefix).Where(
+		// TODO(ppacher): remember the timestamp we started the last update
+		// and use that rather than "one hour ago"
+		query.Where("UpdatedAt", query.LessThan, time.Now().Add(-time.Hour).Unix()),
+	))
 	if err != nil {
-		return false, err
+		return err
 	}
-
-	keys := make([]string, 0, batchSize)
-	var cnt int
-	for r := range iter.Next {
-		cnt++
-		keys = append(keys, r.Key())
-		if cnt == batchSize {
-			break
-		}
-	}
-	iter.Cancel()
-
-	for _, key := range keys {
-		if err := cache.Delete(key); err != nil {
-			log.Errorf("intel/filterlists: failed to remove stale cache entry %q: %s", key, err)
-		}
-	}
-
-	log.Debugf("intel/filterlists: successfully removed %d obsolete entries", cnt)
-	// if we removed less entries that the batch size we
-	// are done and no more entries exist
-	return cnt < batchSize, nil
+	log.Debugf("intel/filterlists: successfully removed %d obsolete entries", n)
+	return nil
 }
 
 // getUpgradableFiles returns a slice of filterlists files
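The deleted removeObsoleteFilterEntries spells out exactly what Purge replaces: query the matching records, collect their keys, delete them, and report the count. For comparison, a rough equivalent built on the older Query/Delete interface might look like the sketch below; the helper name is illustrative, and the import paths are the usual portbase ones as an assumption:

    // Assumed imports:
    //   "github.com/safing/portbase/database"
    //   "github.com/safing/portbase/database/query"

    // purgeByQuery deletes every record matching q and returns how many
    // were removed. Illustrative stand-in for the new built-in Purge.
    func purgeByQuery(db *database.Interface, q *query.Query) (int, error) {
        iter, err := db.Query(q)
        if err != nil {
            return 0, err
        }
        defer iter.Cancel()

        // Collect keys first so deletion does not race the iterator.
        var keys []string
        for r := range iter.Next {
            keys = append(keys, r.Key())
        }

        n := 0
        for _, key := range keys {
            if err := db.Delete(key); err != nil {
                return n, err
            }
            n++
        }
        return n, nil
    }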


@@ -129,7 +129,6 @@ func registerAsDatabase() error {
 		Name:        "network",
 		Description: "Network and Firewall Data",
 		StorageType: "injected",
-		PrimaryAPI:  "",
 	})
 	if err != nil {
 		return err


@@ -85,48 +85,13 @@ func (rec *NameRecord) Save() error {
 	return recordDatabase.PutNew(rec)
 }
 
-func clearNameCache(_ context.Context, _ interface{}) error {
-	log.Debugf("resolver: name cache clearing started...")
-	for {
-		done, err := removeNameEntries(10000)
-		if err != nil {
-			return err
-		}
-		if done {
-			return nil
-		}
-	}
-}
-
-func removeNameEntries(batchSize int) (bool, error) {
-	iter, err := recordDatabase.Query(query.New(nameRecordsKeyPrefix))
+func clearNameCache(ctx context.Context, _ interface{}) error {
+	log.Debugf("resolver: dns cache clearing started...")
+	n, err := recordDatabase.Purge(ctx, query.New(nameRecordsKeyPrefix))
 	if err != nil {
-		return false, err
+		return err
 	}
-
-	keys := make([]string, 0, batchSize)
-	var cnt int
-	for r := range iter.Next {
-		cnt++
-		keys = append(keys, r.Key())
-		if cnt == batchSize {
-			break
-		}
-	}
-	iter.Cancel()
-
-	for _, key := range keys {
-		if err := recordDatabase.Delete(key); err != nil {
-			log.Warningf("resolver: failed to remove name cache entry %q: %s", key, err)
-		}
-	}
-
-	log.Debugf("resolver: successfully removed %d name cache entries", cnt)
-	// if we removed less entries that the batch size we
-	// are done and no more entries exist
-	return cnt < batchSize, nil
+	log.Debugf("resolver: cleared %d entries in dns cache", n)
+	return nil
 }
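Both cleanup tasks now reduce to building a query and making a single Purge call; only the query differs. Illustrative fragments using the prefixes and conditions from the hunks above:

    // Resolver: drop everything under the name-record prefix.
    n, err := recordDatabase.Purge(ctx, query.New(nameRecordsKeyPrefix))

    // Filterlists: only entries not updated within the last hour.
    n, err = cache.Purge(ctx, query.New(filterListKeyPrefix).Where(
        query.Where("UpdatedAt", query.LessThan, time.Now().Add(-time.Hour).Unix()),
    ))

Dropping the batchSize plumbing also removes the "done when fewer than batchSize were deleted" termination logic that each caller previously had to reimplement.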