From f96f8d8d6ebc1d31026c4699637ccbd5c4c64bae Mon Sep 17 00:00:00 2001 From: ppacher Date: Wed, 1 Apr 2020 09:12:06 +0200 Subject: [PATCH 1/7] Added filterlist integration --- Gopkg.lock | 51 ++++++ firewall/master.go | 12 ++ intel/entity.go | 283 ++++++++++++++++++++++++------ intel/filterlist/bloom.go | 217 +++++++++++++++++++++++ intel/filterlist/cache_version.go | 57 ++++++ intel/filterlist/database.go | 195 ++++++++++++++++++++ intel/filterlist/decoder.go | 127 ++++++++++++++ intel/filterlist/index.go | 216 +++++++++++++++++++++++ intel/filterlist/keys.go | 26 +++ intel/filterlist/lookup.go | 131 ++++++++++++++ intel/filterlist/lookup_map.go | 17 ++ intel/filterlist/lookup_test.go | 92 ++++++++++ intel/filterlist/module.go | 90 ++++++++++ intel/filterlist/module_test.go | 86 +++++++++ intel/filterlist/record.go | 40 +++++ intel/filterlist/updater.go | 241 +++++++++++++++++++++++++ intel/module.go | 2 +- profile/config.go | 19 ++ profile/profile-layered.go | 44 ++++- profile/profile.go | 10 ++ 20 files changed, 1898 insertions(+), 58 deletions(-) create mode 100644 intel/filterlist/bloom.go create mode 100644 intel/filterlist/cache_version.go create mode 100644 intel/filterlist/database.go create mode 100644 intel/filterlist/decoder.go create mode 100644 intel/filterlist/index.go create mode 100644 intel/filterlist/keys.go create mode 100644 intel/filterlist/lookup.go create mode 100644 intel/filterlist/lookup_map.go create mode 100644 intel/filterlist/lookup_test.go create mode 100644 intel/filterlist/module.go create mode 100644 intel/filterlist/module_test.go create mode 100644 intel/filterlist/record.go create mode 100644 intel/filterlist/updater.go diff --git a/Gopkg.lock b/Gopkg.lock index c51616bf..4111b3f9 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -9,6 +9,14 @@ revision = "5d049714c4a64225c3c79a7cf7d02f7fb5b96338" version = "1.0.0" +[[projects]] + digest = "1:e010d6b45ee6c721df761eae89961c634ceb55feff166a48d15504729309f267" + name = 
"github.com/TheTannerRyan/ring" + packages = ["."] + pruneopts = "" + revision = "7b27005873e31b5d5a035e166636a09e03aaf40e" + version = "v1.1.1" + [[projects]] digest = "1:3c753679736345f50125ae993e0a2614da126859921ea7faeecda6d217501ce2" name = "github.com/agext/levenshtein" @@ -33,6 +41,14 @@ revision = "78b5fff24e6df8886ef8eca9411f683a884349a5" version = "v0.4.1" +[[projects]] + digest = "1:0deddd908b6b4b768cfc272c16ee61e7088a60f7fe2f06c547bd3d8e1f8b8e77" + name = "github.com/davecgh/go-spew" + packages = ["spew"] + pruneopts = "" + revision = "8991bc29aa16c548c550c7ff78260e27b9ab7c73" + version = "v1.1.1" + [[projects]] digest = "1:b6581f9180e0f2d5549280d71819ab951db9d511478c87daca95669589d505c0" name = "github.com/go-ole/go-ole" @@ -72,6 +88,14 @@ revision = "f0e32980c006571efd537032e5f9cd8c1a92819e" version = "v0.1.0" +[[projects]] + digest = "1:2f0c811248aeb64978037b357178b1593372439146bda860cb16f2c80785ea93" + name = "github.com/hashicorp/go-version" + packages = ["."] + pruneopts = "" + revision = "ac23dc3fea5d1a983c43f6a0f6e2c13f0195d8bd" + version = "v1.2.0" + [[projects]] digest = "1:870d441fe217b8e689d7949fef6e43efbc787e50f200cb1e70dbca9204a1d6be" name = "github.com/inconshreveable/mousetrap" @@ -104,6 +128,14 @@ revision = "2905694a1b00c5574f1418a7dbf8a22a7d247559" version = "v1.3.1" +[[projects]] + digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411" + name = "github.com/pmezard/go-difflib" + packages = ["difflib"] + pruneopts = "" + revision = "792786c7400a136282c1664665ae0a8db921c6c2" + version = "v1.0.0" + [[projects]] digest = "1:7f569d906bdd20d906b606415b7d794f798f91a62fcfb6a4daa6d50690fb7a3f" name = "github.com/satori/go.uuid" @@ -150,6 +182,14 @@ revision = "298182f68c66c05229eb03ac171abe6e309ee79a" version = "v1.0.3" +[[projects]] + digest = "1:cc4eb6813da8d08694e557fcafae8fcc24f47f61a0717f952da130ca9a486dfc" + name = "github.com/stretchr/testify" + packages = ["assert"] + pruneopts = "" + revision = 
"3ebf1ddaeb260c4b1ae502a01c7844fa8c1fa0e9" + version = "v1.5.1" + [[projects]] branch = "master" digest = "1:86e6712cfd4070a2120c03fcec41cfcbbc51813504a74e28d74479edfaf669ee" @@ -235,10 +275,19 @@ revision = "342b2e1fbaa52c93f31447ad2c6abc048c63e475" version = "v0.3.2" +[[projects]] + digest = "1:2efc9662a6a1ff28c65c84fc2f9030f13d3afecdb2ecad445f3b0c80e75fc281" + name = "gopkg.in/yaml.v2" + packages = ["."] + pruneopts = "" + revision = "53403b58ad1b561927d19068c655246f2db79d48" + version = "v2.2.8" + [solve-meta] analyzer-name = "dep" analyzer-version = 1 input-imports = [ + "github.com/TheTannerRyan/ring", "github.com/agext/levenshtein", "github.com/cookieo9/resources-go", "github.com/coreos/go-iptables/iptables", @@ -247,11 +296,13 @@ "github.com/google/gopacket/layers", "github.com/google/gopacket/tcpassembly", "github.com/google/renameio", + "github.com/hashicorp/go-version", "github.com/miekg/dns", "github.com/oschwald/maxminddb-golang", "github.com/satori/go.uuid", "github.com/shirou/gopsutil/process", "github.com/spf13/cobra", + "github.com/stretchr/testify/assert", "github.com/tevino/abool", "github.com/umahmood/haversine", "golang.org/x/net/icmp", diff --git a/firewall/master.go b/firewall/master.go index c5731a37..a8899b92 100644 --- a/firewall/master.go +++ b/firewall/master.go @@ -154,6 +154,18 @@ func DecideOnConnection(conn *network.Connection, pkt packet.Packet) { //nolint: } // continuing with result == NoMatch + // apply privacy filter lists + result, reason = p.MatchFilterLists(conn.Entity) + switch result { + case endpoints.Denied: + conn.Deny("endpoint in filterlist: " + reason) + return + case endpoints.NoMatch: + // nothing to do + default: + log.Debugf("filter: filter lists returned unsupported verdict: %s", result) + } + // implicit default=block for inbound if conn.Inbound { conn.Drop("endpoint is not whitelisted (incoming is always default=block)") diff --git a/intel/entity.go b/intel/entity.go index 8c152fd9..470ae7f6 100644 --- 
a/intel/entity.go +++ b/intel/entity.go @@ -2,41 +2,97 @@ package intel import ( "context" + "fmt" "net" + "sort" "sync" - "github.com/tevino/abool" - "github.com/safing/portbase/log" + "github.com/safing/portmaster/intel/filterlist" "github.com/safing/portmaster/intel/geoip" "github.com/safing/portmaster/status" ) // Entity describes a remote endpoint in many different ways. +// It embeddes a sync.Mutex but none of the endpoints own +// functions performs locking. The caller MUST ENSURE +// proper locking and synchronization when accessing +// any properties of Entity. type Entity struct { sync.Mutex - Domain string - IP net.IP - Protocol uint8 - Port uint16 - doReverseResolve bool - reverseResolveDone *abool.AtomicBool + Domain string + IP net.IP + Protocol uint8 + Port uint16 + reverseResolveEnabled bool + reverseResolveOnce sync.Once - Country string - ASN uint - location *geoip.Location - locationFetched *abool.AtomicBool + Country string + ASN uint + location *geoip.Location + fetchLocationOnce sync.Once - Lists []string - listsFetched *abool.AtomicBool + Lists []string + ListsMap filterlist.LookupMap + + // we only load each data above at most once + loadDomainListOnce sync.Once + loadIPListOnce sync.Once + loadCoutryListOnce sync.Once + loadAsnListOnce sync.Once + + // lists exist for most entity information and + // we need to know which one we loaded + domainListLoaded bool + ipListLoaded bool + countryListLoaded bool + asnListLoaded bool } // Init initializes the internal state and returns the entity. func (e *Entity) Init() *Entity { - e.reverseResolveDone = abool.New() - e.locationFetched = abool.New() - e.listsFetched = abool.New() + // for backwards compatibility, remove that one + return e +} + +// MergeDomain copies the Domain from other to e. It does +// not lock e or other so the caller must ensure +// proper locking of entities. +func (e *Entity) MergeDomain(other *Entity) *Entity { + + // FIXME(ppacher): should we disable reverse lookups now? 
+ e.Domain = other.Domain + + return e +} + +// MergeLists merges the intel lists stored in other with the +// lists stored in e. Neither e nor other are locked so the +// caller must ensure proper locking on both entities. +// MergeLists ensures list entries are unique and sorted. +func (e *Entity) MergeLists(other *Entity) *Entity { + e.Lists = mergeStringList(e.Lists, other.Lists) + e.ListsMap = buildLookupMap(e.Lists) + + // mark every list other has loaded also as + // loaded in e. Don't copy values of lists + // not loaded in other because they might have + // been loaded in e. + + if other.domainListLoaded { + e.domainListLoaded = true + } + if other.ipListLoaded { + e.ipListLoaded = true + } + if other.countryListLoaded { + e.countryListLoaded = true + } + if other.asnListLoaded { + e.asnListLoaded = true + } + return e } @@ -50,26 +106,13 @@ func (e *Entity) FetchData() { // EnableReverseResolving enables reverse resolving the domain from the IP on demand. func (e *Entity) EnableReverseResolving() { - e.Lock() - defer e.Lock() - - e.doReverseResolve = true + e.reverseResolveEnabled = true } func (e *Entity) reverseResolve() { - // only get once - if !e.reverseResolveDone.IsSet() { - e.Lock() - defer e.Unlock() - - // check for concurrent request - if e.reverseResolveDone.IsSet() { - return - } - defer e.reverseResolveDone.Set() - + e.reverseResolveOnce.Do(func() { // check if we should resolve - if !e.doReverseResolve { + if !e.reverseResolveEnabled { return } @@ -89,7 +132,7 @@ func (e *Entity) reverseResolve() { return } e.Domain = domain - } + }) } // GetDomain returns the domain and whether it is set. @@ -113,17 +156,7 @@ func (e *Entity) GetIP() (net.IP, bool) { // Location func (e *Entity) getLocation() { - // only get once - if !e.locationFetched.IsSet() { - e.Lock() - defer e.Unlock() - - // check for concurrent request - if e.locationFetched.IsSet() { - return - } - defer e.locationFetched.Set() - + e.fetchLocationOnce.Do(func() { // need IP! 
if e.IP == nil { log.Warningf("intel: cannot get location for %s data without IP", e.Domain) @@ -139,7 +172,7 @@ func (e *Entity) getLocation() { e.location = loc e.Country = loc.Country.ISOCode e.ASN = loc.AutonomousSystemNumber - } + }) } // GetLocation returns the raw location data and whether it is set. @@ -173,21 +206,124 @@ func (e *Entity) GetASN() (uint, bool) { } // Lists - func (e *Entity) getLists() { - // only get once - if !e.listsFetched.IsSet() { - e.Lock() - defer e.Unlock() + e.getDomainLists() + e.getASNLists() + e.getIPLists() + e.getCountryLists() +} - // check for concurrent request - if e.listsFetched.IsSet() { +func (e *Entity) mergeList(list []string) { + e.Lists = mergeStringList(e.Lists, list) + e.ListsMap = buildLookupMap(e.Lists) +} + +func (e *Entity) getDomainLists() { + if e.domainListLoaded { + return + } + + domain, ok := e.GetDomain() + if !ok { + return + } + + e.loadDomainListOnce.Do(func() { + log.Debugf("intel: loading domain list for %s", domain) + list, err := filterlist.LookupDomain(domain) + if err != nil { + log.Errorf("intel: failed to get domain blocklists for %s: %s", domain, err) + e.loadDomainListOnce = sync.Once{} return } - defer e.listsFetched.Set() - // TODO: fetch lists + e.domainListLoaded = true + e.mergeList(list) + }) +} + +func (e *Entity) getASNLists() { + if e.asnListLoaded { + return } + + asn, ok := e.GetASN() + if !ok { + return + } + + log.Debugf("intel: loading ASN list for %d", asn) + e.loadAsnListOnce.Do(func() { + list, err := filterlist.LookupASNString(fmt.Sprintf("%d", asn)) + if err != nil { + log.Errorf("intel: failed to get ASN blocklist for %d: %s", asn, err) + e.loadAsnListOnce = sync.Once{} + return + } + + e.asnListLoaded = true + e.mergeList(list) + }) +} + +func (e *Entity) getCountryLists() { + if e.countryListLoaded { + return + } + + country, ok := e.GetCountry() + if !ok { + return + } + + log.Debugf("intel: loading country list for %s", country) + e.loadCoutryListOnce.Do(func() { + 
list, err := filterlist.LookupCountry(country) + if err != nil { + log.Errorf("intel: failed to load country blocklist for %s: %s", country, err) + e.loadCoutryListOnce = sync.Once{} + return + } + + e.countryListLoaded = true + e.mergeList(list) + }) +} + +func (e *Entity) getIPLists() { + if e.ipListLoaded { + return + } + + ip, ok := e.GetIP() + if !ok { + return + } + + if ip == nil { + return + } + // abort if it's not a global unicast (not that IPv6 link local unicasts are treated + // as global) + if !ip.IsGlobalUnicast() { + return + } + // ingore linc local unicasts as well (not done by IsGlobalUnicast above). + if ip.IsLinkLocalUnicast() { + return + } + log.Debugf("intel: loading IP list for %s", ip) + e.loadIPListOnce.Do(func() { + list, err := filterlist.LookupIP(ip) + + if err != nil { + log.Errorf("intel: failed to get IP blocklist for %s: %s", ip.String(), err) + e.loadIPListOnce = sync.Once{} + return + } + e.ipListLoaded = true + e.mergeList(list) + }) } // GetLists returns the filter list identifiers the entity matched and whether this data is set. @@ -199,3 +335,40 @@ func (e *Entity) GetLists() ([]string, bool) { } return e.Lists, true } + +// GetListsMap is like GetLists but returns a lookup map for list IDs. 
+func (e *Entity) GetListsMap() (filterlist.LookupMap, bool) { + e.getLists() + + if e.ListsMap == nil { + return nil, false + } + return e.ListsMap, true +} + +func mergeStringList(a, b []string) []string { + listMap := make(map[string]struct{}) + for _, s := range a { + listMap[s] = struct{}{} + } + for _, s := range b { + listMap[s] = struct{}{} + } + + res := make([]string, 0, len(listMap)) + for s := range listMap { + res = append(res, s) + } + sort.Strings(res) + return res +} + +func buildLookupMap(l []string) filterlist.LookupMap { + m := make(filterlist.LookupMap, len(l)) + + for _, s := range l { + m[s] = struct{}{} + } + + return m +} diff --git a/intel/filterlist/bloom.go b/intel/filterlist/bloom.go new file mode 100644 index 00000000..aa6e182b --- /dev/null +++ b/intel/filterlist/bloom.go @@ -0,0 +1,217 @@ +package filterlist + +import ( + "encoding/hex" + "fmt" + "strings" + "sync" + + "github.com/TheTannerRyan/ring" + "github.com/safing/portbase/database/record" + "github.com/safing/portbase/log" +) + +var defaultFilter = newScopedBloom() + +// scopedBloom is a wrapper around a bloomfilter implementation +// providing scoped filters for different entity types. +type scopedBloom struct { + rw sync.RWMutex + domain *ring.Ring + asn *ring.Ring + country *ring.Ring + ipv4 *ring.Ring + ipv6 *ring.Ring +} + +func newScopedBloom() *scopedBloom { + mustInit := func(size int) *ring.Ring { + f, err := ring.Init(size, bfFalsePositiveRate) + if err != nil { + // we panic here as those values cannot be controlled + // by the user and invalid values shouldn't be + // in a release anyway. 
+ panic("Invalid bloom filter parameters!") + } + return f + } + return &scopedBloom{ + domain: mustInit(domainBfSize), + asn: mustInit(asnBfSize), + country: mustInit(countryBfSize), + ipv4: mustInit(ipv4BfSize), + ipv6: mustInit(ipv6BfSize), + } +} + +func (bf *scopedBloom) getBloomForType(entityType string) (*ring.Ring, error) { + var r *ring.Ring + + switch strings.ToLower(entityType) { + case "domain": + r = bf.domain + case "asn": + r = bf.asn + case "ipv4": + r = bf.ipv4 + case "ipv6": + r = bf.ipv6 + case "country": + r = bf.country + default: + return nil, fmt.Errorf("unsupported filterlist entity type %q", entityType) + } + + return r, nil +} + +func (bf *scopedBloom) add(scope, value string) { + bf.rw.RLock() + defer bf.rw.RUnlock() + + r, err := bf.getBloomForType(scope) + if err != nil { + // If we don't have a bloom filter for that scope + // we are probably running an older version that does + // not have support for it. We just drop the value + // as a call to Test() for that scope will always + // return "true" + log.Warningf("failed to add unknown entity type %q", scope) + return + } + + r.Add([]byte(value)) +} + +func (bf *scopedBloom) test(scope, value string) bool { + bf.rw.RLock() + defer bf.rw.RUnlock() + + r, err := bf.getBloomForType(scope) + if err != nil { + log.Warningf("testing for unknown entity type %q", scope) + return true // simulate a match to the caller + } + + return r.Test([]byte(value)) +} + +func (bf *scopedBloom) loadFromCache() error { + bf.rw.Lock() + defer bf.rw.Unlock() + + if err := loadBloomFromCache(bf.domain, "domain"); err != nil { + return err + } + if err := loadBloomFromCache(bf.asn, "asn"); err != nil { + return err + } + if err := loadBloomFromCache(bf.country, "country"); err != nil { + return err + } + if err := loadBloomFromCache(bf.ipv4, "ipv4"); err != nil { + return err + } + if err := loadBloomFromCache(bf.ipv6, "ipv6"); err != nil { + return err + } + + return nil +} + +func (bf *scopedBloom) 
saveToCache() error { + bf.rw.RLock() + defer bf.rw.RUnlock() + + if err := saveBloomToCache(bf.domain, "domain"); err != nil { + return err + } + if err := saveBloomToCache(bf.asn, "asn"); err != nil { + return err + } + if err := saveBloomToCache(bf.country, "country"); err != nil { + return err + } + if err := saveBloomToCache(bf.ipv4, "ipv4"); err != nil { + return err + } + if err := saveBloomToCache(bf.ipv6, "ipv6"); err != nil { + return err + } + + return nil +} + +func (bf *scopedBloom) replaceWith(other *scopedBloom) { + bf.rw.Lock() + defer bf.rw.Unlock() + + other.rw.RLock() + defer other.rw.RUnlock() + + bf.domain = other.domain + bf.asn = other.asn + bf.country = other.country + bf.ipv4 = other.ipv4 + bf.ipv6 = other.ipv6 +} + +type bloomFilterRecord struct { + record.Base + sync.Mutex + + Filter string +} + +// loadBloomFromCache loads the bloom filter stored under scope +// into bf. +func loadBloomFromCache(bf *ring.Ring, scope string) error { + r, err := cache.Get(makeBloomCacheKey(scope)) + if err != nil { + return err + } + + var filterRecord *bloomFilterRecord + if r.IsWrapped() { + filterRecord = new(bloomFilterRecord) + if err := record.Unwrap(r, filterRecord); err != nil { + return err + } + } else { + var ok bool + filterRecord, ok = r.(*bloomFilterRecord) + if !ok { + return fmt.Errorf("invalid type, expected bloomFilterRecord but got %T", r) + } + } + + blob, err := hex.DecodeString(filterRecord.Filter) + if err != nil { + return err + } + + if err := bf.UnmarshalBinary(blob); err != nil { + return err + } + + return nil +} + +// saveBloomToCache saves the bitset of the bloomfilter bf +// in the cache db. 
+func saveBloomToCache(bf *ring.Ring, scope string) error { + blob, err := bf.MarshalBinary() + if err != nil { + return err + } + + filter := hex.EncodeToString(blob) + + r := &bloomFilterRecord{ + Filter: filter, + } + + r.SetKey(makeBloomCacheKey(scope)) + + return cache.Put(r) +} diff --git a/intel/filterlist/cache_version.go b/intel/filterlist/cache_version.go new file mode 100644 index 00000000..cac9d9fd --- /dev/null +++ b/intel/filterlist/cache_version.go @@ -0,0 +1,57 @@ +package filterlist + +import ( + "fmt" + "sync" + + "github.com/hashicorp/go-version" + "github.com/safing/portbase/database/record" +) + +type cacheVersionRecord struct { + record.Base + sync.Mutex + + Version string +} + +// getCacheDatabaseVersion reads and returns the cache +// database version record. +func getCacheDatabaseVersion() (*version.Version, error) { + r, err := cache.Get(filterListCacheVersionKey) + if err != nil { + return nil, err + } + + var verRecord *cacheVersionRecord + if r.IsWrapped() { + verRecord = new(cacheVersionRecord) + if err := record.Unwrap(r, verRecord); err != nil { + return nil, err + } + } else { + var ok bool + verRecord, ok = r.(*cacheVersionRecord) + if !ok { + return nil, fmt.Errorf("invalid type, expected cacheVersionRecord but got %T", r) + } + } + + ver, err := version.NewSemver(verRecord.Version) + if err != nil { + return nil, err + } + + return ver, nil +} + +// setCacheDatabaseVersion updates the cache database +// version record to ver. 
+func setCacheDatabaseVersion(ver string) error { + verRecord := &cacheVersionRecord{ + Version: ver, + } + + verRecord.SetKey(filterListCacheVersionKey) + return cache.Put(verRecord) +} diff --git a/intel/filterlist/database.go b/intel/filterlist/database.go new file mode 100644 index 00000000..0552f2fc --- /dev/null +++ b/intel/filterlist/database.go @@ -0,0 +1,195 @@ +package filterlist + +import ( + "context" + "os" + "sort" + "strings" + "sync" + "time" + + "github.com/safing/portbase/database" + "github.com/safing/portbase/database/record" + "github.com/safing/portbase/log" + "github.com/safing/portbase/updater" + "github.com/safing/portmaster/updates" + "golang.org/x/sync/errgroup" +) + +const ( + baseListFilePath = "intel/lists/base.dsdl" + intermediateListFilePath = "intel/lists/intermediate.dsdl" + urgentListFilePath = "intel/lists/urgent.dsdl" + listIndexFilePath = "intel/lists/index.dsd" +) + +// default bloomfilter element sizes (estimated). +const ( + domainBfSize = 1000000 + asnBfSize = 1000 + countryBfSize = 100 + ipv4BfSize = 100 + ipv6BfSize = 100 +) + +const bfFalsePositiveRate = 0.001 +const filterlistsDisabled = "filterlists:disabled" + +var ( + filterListLock sync.RWMutex + + // Updater files for tracking upgrades. + baseFile *updater.File + intermediateFile *updater.File + urgentFile *updater.File + + filterListsLoaded chan struct{} +) + +var ( + cache = database.NewInterface(&database.Options{ + CacheSize: 2 ^ 8, + }) +) + +// getFileFunc is the function used to get a file from +// the updater. It's basically updates.GetFile and used +// for unit testing. +type getFileFunc func(string) (*updater.File, error) + +// getFile points to updates.GetFile but may be set to +// something different during unit testing. +var getFile getFileFunc = updates.GetFile + +func init() { + filterListsLoaded = make(chan struct{}) +} + +// isLoaded returns true if the filterlists have been +// loaded. 
+func isLoaded() bool { + select { + case <-filterListsLoaded: + return true + default: + return false + } +} + +// processListFile opens the latest version of f ile and decodes it's DSDL +// content. It calls processEntry for each decoded filterlist entry. +func processListFile(ctx context.Context, filter *scopedBloom, file *updater.File) error { + f, err := os.Open(file.Path()) + if err != nil { + return err + } + defer f.Close() + + values := make(chan *listEntry, 100) + records := make(chan record.Record, 100) + + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + defer close(values) + return decodeFile(ctx, f, values) + }) + + g.Go(func() error { + defer close(records) + for entry := range values { + if err := processEntry(ctx, filter, entry, records); err != nil { + return err + } + } + + return nil + }) + + var cnt int + start := time.Now() + + batch := database.NewInterface(&database.Options{Local: true, Internal: true}) + var startBatch func() + processBatch := func() error { + batchPut := batch.PutMany("cache") + for r := range records { + if err := batchPut(r); err != nil { + return err + } + cnt++ + + if cnt%10000 == 0 { + timePerEntity := time.Since(start) / time.Duration(cnt) + speed := float64(time.Second) / float64(timePerEntity) + log.Debugf("processed %d entities %s with %s / entity (%.2f entits/second)", cnt, time.Since(start), timePerEntity, speed) + } + + if cnt%1000 == 0 { + if err := batchPut(nil); err != nil { + return err + } + + startBatch() + + return nil + } + } + + return batchPut(nil) + } + startBatch = func() { + g.Go(processBatch) + } + + startBatch() + + return g.Wait() +} + +func normalizeEntry(entry *listEntry) { + switch strings.ToLower(entry.Type) { // + case "domain": + entry.Entity = strings.ToLower(entry.Entity) + if entry.Entity[len(entry.Entity)-1] != '.' { + // ensure domains from the filter list are fully qualified and end in dot. + entry.Entity += "." 
+ } + default: + } +} + +func processEntry(ctx context.Context, filter *scopedBloom, entry *listEntry, records chan<- record.Record) error { + normalizeEntry(entry) + + if len(entry.Sources) > 0 { + filter.add(entry.Type, entry.Entity) + } + + r := &entityRecord{ + Value: entry.Entity, + Type: entry.Type, + Sources: entry.Sources, + UpdatedAt: time.Now().Unix(), + } + + key := makeListCacheKey(strings.ToLower(r.Type), r.Value) + r.SetKey(key) + + select { + case records <- r: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func mapKeys(m map[string]struct{}) []string { + sl := make([]string, 0, len(m)) + for s := range m { + sl = append(sl, s) + } + + sort.Strings(sl) + return sl +} diff --git a/intel/filterlist/decoder.go b/intel/filterlist/decoder.go new file mode 100644 index 00000000..566014b0 --- /dev/null +++ b/intel/filterlist/decoder.go @@ -0,0 +1,127 @@ +package filterlist + +import ( + "compress/gzip" + "context" + "encoding/binary" + "fmt" + "io" + + "github.com/safing/portbase/formats/dsd" +) + +type listEntry struct { + Entity string `json:"entity"` + Sources []string `json:"sources"` + Whitelist bool `json:"whitelist"` + Type string `json:"type"` +} + +// decodeFile decodes a DSDL filterlist file and sends decoded entities to +// ch. It blocks until all list entries have been consumed or ctx is cancelled. 
+func decodeFile(ctx context.Context, r io.Reader, ch chan<- *listEntry) error { + compressed, format, err := parseHeader(r) + if err != nil { + return fmt.Errorf("failed to parser header: %w", err) + } + + if compressed { + r, err = gzip.NewReader(r) + if err != nil { + return fmt.Errorf("failed to open gzip reader: %w", err) + } + } + + // we need a reader that supports io.ByteReader + reader := &byteReader{r} + var entryCount int + for { + entryCount++ + length, readErr := binary.ReadUvarint(reader) + if readErr != nil { + if readErr == io.EOF { + return nil + } + return fmt.Errorf("failed to load varint entity length: %w", readErr) + } + + blob := make([]byte, length) + _, readErr = io.ReadFull(reader, blob) + if readErr != nil { + if readErr == io.EOF { + // there shouldn't be an EOF here because + // we actually got a length above. Return + // ErrUnexpectedEOF instead of just EOF. + // io.ReadFull already returns ErrUnexpectedEOF + // if it failed to read blob as a whole but my + // return io.EOF if it read exactly 0 bytes. + readErr = io.ErrUnexpectedEOF + } + return readErr + } + + // we don't really care about the format here but it must be + // something that can encode/decode complex structures like + // JSON, BSON or GenCode. So LoadAsFormat MUST return the value + // passed as the third parameter. String or RAW encoding IS AN + // error here. 
+ val, err := dsd.LoadAsFormat(blob, format, &listEntry{}) + if err != nil { + return fmt.Errorf("failed to decoded DSD encoded entity: %w", err) + } + entry, ok := val.(*listEntry) + if !ok { + return fmt.Errorf("unsupported encoding format: %d (%c)", format, format) + } + + select { + case ch <- entry: + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func parseHeader(r io.Reader) (compressed bool, format byte, err error) { + var listHeader [1]byte + if _, err = r.Read(listHeader[:]); err != nil { + // if we have an error here we can safely abort because + // the file must be broken + return + } + + if listHeader[0] != dsd.LIST { + err = fmt.Errorf("unexpected file type: %d (%c), expected dsd list", listHeader[0], listHeader[0]) + return + } + + var compression [1]byte + if _, err = r.Read(compression[:]); err != nil { + // same here, a DSDL file must have at least 2 bytes header + return + } + + if compression[0] == dsd.GZIP { + compressed = true + + var formatSlice [1]byte + if _, err = r.Read(formatSlice[:]); err != nil { + return + } + + format = formatSlice[0] + return + } + + format = compression[0] + return // nolint:nakedret +} + +// byteReader extends an io.Reader to implement the ByteReader interface. +type byteReader struct{ io.Reader } + +func (br *byteReader) ReadByte() (byte, error) { + var b [1]byte + _, err := br.Read(b[:]) + return b[0], err +} diff --git a/intel/filterlist/index.go b/intel/filterlist/index.go new file mode 100644 index 00000000..e7a686ee --- /dev/null +++ b/intel/filterlist/index.go @@ -0,0 +1,216 @@ +package filterlist + +import ( + "fmt" + "io/ioutil" + "sync" + + "github.com/safing/portbase/database" + "github.com/safing/portbase/database/record" + "github.com/safing/portbase/formats/dsd" + "github.com/safing/portbase/log" + "github.com/safing/portmaster/updates" +) + +// the following definitions are copied from the intelhub repository +// and stripped down to only include data required by portmaster. 
+ +// Category is used to group different list sources by the type +// of entity they are blocking. Categories may be nested using +// the Parent field. +type Category struct { + // ID is a unique ID for the category. For sub-categories + // this ID must be used in the Parent field of any directly + // nesteded categories. + ID string `json:"id"` + + // Parent may hold the ID of another category. If set, this + // category is made a sub-category of it's parent. + Parent string `json:"parent,omitempty"` + + // Name is a human readable name for the category and can + // be used in user interfaces. + Name string `json:"name"` + + // Description is a human readable description that may be + // displayed in user interfaces. + Description string `json:"description,omitempty"` +} + +// Source defines an external filterlist source. +type Source struct { + // ID is a unique ID for the source. Entities always reference the + // sources they have been observed in using this ID. Refer to the + // Entry struct for more information. + ID string `json:"id"` + + // Name is a human readable name for the source and can be used + // in user interfaces. + Name string `json:"name"` + + // Description may hold a human readable description for the source. + // It may be used in user interfaces. + Description string `json:"description"` + + // Type describes the type of entities the source provides. Refer + // to the Type definition for more information and well-known types. + Type string `json:"type"` + + // URL points to the filterlist file. + URL string `json:"url"` + + // Category holds the unique ID of a category the source belongs to. Since + // categories can be nested the source is automatically part of all categories + // in the hierarchy. Refer to the Category struct for more information. + Category string `json:"category"` + + // Website may holds the URL of the source maintainers website. 
+ Website string `json:"website,omitempty"` + + // License holds the license that is used for the source. + License string `json:"license"` + + // Contribute may hold an opaque string that informs a user on how to + // contribute to the source. This may be a URL or mail address. + Contribute string `json:"contribute"` +} + +// ListIndexFile describes the structure of the released list +// index file. +type ListIndexFile struct { + record.Base + sync.RWMutex + + Version string `json:"version"` + SchemaVersion string `json:"schemaVersion"` + Categories []Category `json:"categories"` + Sources []Source `json:"sources"` +} + +func (index *ListIndexFile) getCategorySources(id string) []string { + ids := make(map[string]struct{}) + + // find all sources that match against cat + for _, s := range index.Sources { + if s.Category == id { + ids[s.ID] = struct{}{} + } + } + + // find all child-categories recursing into getCategorySources. + for _, c := range index.Categories { + if c.Parent == id { + for _, sid := range index.getCategorySources(c.ID) { + ids[sid] = struct{}{} + } + } + } + + return mapKeys(ids) +} + +func (index *ListIndexFile) getSourcesMatching(id string) []string { + // if id is already a source ID we just return it + for _, s := range index.Sources { + if s.ID == id { + return []string{s.ID} + } + } + + // otherwise we need to check the category tree + return index.getCategorySources(id) +} + +func (index *ListIndexFile) getDistictSourceIDs(ids ...string) []string { + index.RLock() + defer index.RUnlock() + + distinctIDs := make(map[string]struct{}) + + for _, id := range ids { + for _, sid := range index.getSourcesMatching(id) { + distinctIDs[sid] = struct{}{} + } + } + + return mapKeys(distinctIDs) +} + +func getListIndexFromCache() (*ListIndexFile, error) { + r, err := cache.Get(filterListIndexKey) + if err != nil { + return nil, err + } + + var index *ListIndexFile + if r.IsWrapped() { + index = new(ListIndexFile) + if err := record.Unwrap(r, index); 
err != nil {
+			return nil, err
+		}
+	} else {
+		var ok bool
+		index, ok = r.(*ListIndexFile)
+		if !ok {
+			return nil, fmt.Errorf("invalid type, expected ListIndexFile but got %T", r)
+		}
+	}
+
+	return index, nil
+}
+
+func updateListIndex() error {
+	index, err := updates.GetFile(listIndexFilePath)
+	if err != nil {
+		return err
+	}
+
+	blob, err := ioutil.ReadFile(index.Path())
+	if err != nil {
+		return err
+	}
+
+	res, err := dsd.Load(blob, &ListIndexFile{})
+	if err != nil {
+		return err
+	}
+
+	content, ok := res.(*ListIndexFile)
+	if !ok {
+		return fmt.Errorf("unexpected format in list index")
+	}
+
+	content.SetKey(filterListIndexKey)
+
+	if err := cache.Put(content); err != nil {
+		return err
+	}
+
+	log.Debugf("intel/filterlists: updated cache record for list index with version %s", content.Version)
+
+	return nil
+}
+
+func ResolveListIDs(ids []string) ([]string, error) {
+	index, err := getListIndexFromCache()
+
+	if err != nil {
+		if err == database.ErrNotFound {
+			if err := updateListIndex(); err != nil {
+				return nil, err
+			}
+
+			// retry resolving IDs
+			return ResolveListIDs(ids)
+		}
+
+		log.Errorf("intel/filterlists: failed to resolve ids %v: %s", ids, err)
+		return nil, err
+	}
+
+	resolved := index.getDistictSourceIDs(ids...)
+
+	log.Debugf("intel/filterlists: resolved ids %v to %v", ids, resolved)
+
+	return resolved, nil
+}
diff --git a/intel/filterlist/keys.go b/intel/filterlist/keys.go
new file mode 100644
index 00000000..635acb15
--- /dev/null
+++ b/intel/filterlist/keys.go
@@ -0,0 +1,26 @@
+package filterlist
+
+const (
+	cacheDBPrefix = "cache:intel/filterlists"
+
+	// filterListCacheVersionKey is used to store the highest version
+	// of a filterlist file (base, intermediate or urgent) in the
+	// cache database. It's used to decide if the cache database and
+	// bloomfilters need to be reset and rebuilt.
+	filterListCacheVersionKey = cacheDBPrefix + "/version"
+
+	// filterListIndexKey is used to store the filterlist index.
+ filterListIndexKey = cacheDBPrefix + "/index" + + // filterListKeyPrefix is the prefix inside that cache database + // used for filter list entries. + filterListKeyPrefix = cacheDBPrefix + "/lists/" +) + +func makeBloomCacheKey(scope string) string { + return cacheDBPrefix + "/bloom/" + scope +} + +func makeListCacheKey(scope, key string) string { + return filterListKeyPrefix + scope + "/" + key +} diff --git a/intel/filterlist/lookup.go b/intel/filterlist/lookup.go new file mode 100644 index 00000000..ee592eeb --- /dev/null +++ b/intel/filterlist/lookup.go @@ -0,0 +1,131 @@ +package filterlist + +import ( + "errors" + "fmt" + "net" + "strings" + + "github.com/safing/portbase/database" + "github.com/safing/portbase/log" +) + +// lookupBlockLists loads the entity record for key from +// cache and returns the list of blocklist sources the +// key is part of. It is not considered an error if +// key does not exist, instead, an empty slice is +// returned. +func lookupBlockLists(entity, value string) ([]string, error) { + key := makeListCacheKey(entity, value) + if !isLoaded() { + log.Warningf("intel/filterlist: not searching for %s because filterlists not loaded", key) + // filterLists have not yet been loaded so + // there's no point querying into the cache + // database. + return nil, nil + } + + filterListLock.RLock() + defer filterListLock.RUnlock() + + if !defaultFilter.test(entity, value) { + return nil, nil + } + + log.Debugf("intel/filterlist: searching for entries with %s", key) + entry, err := getEntityRecordByKey(key) + if err != nil { + if err == database.ErrNotFound { + return nil, nil + } + log.Errorf("intel/filterlists: failed to get entries for key %s: %s", key, err) + + return nil, err + } + + return entry.Sources, nil +} + +// LookupCountry returns a list of sources that mark the country +// as blacklisted. If country is not stored in the cache database +// a nil slice is returned. 
+func LookupCountry(country string) ([]string, error) {
+	return lookupBlockLists("country", country)
+}
+
+// LookupDomain returns a list of sources that mark the domain
+// as blacklisted. If domain is not stored in the cache database
+// a nil slice is returned.
+func LookupDomain(domain string) ([]string, error) {
+	// make sure we only use fully qualified domains
+	// ending in a dot.
+	domain = strings.ToLower(domain)
+	if !strings.HasSuffix(domain, ".") {
+		domain += "."
+	}
+	return lookupBlockLists("domain", domain)
+}
+
+// LookupASNString returns a list of sources that mark the ASN
+// as blacklisted. If ASN is not stored in the cache database
+// a nil slice is returned.
+func LookupASNString(asn string) ([]string, error) {
+	return lookupBlockLists("asn", asn)
+}
+
+// LookupIP returns a list of blacklist sources that contain
+// a reference to ip. LookupIP automatically checks the IPv4 or
+// IPv6 lists respectively.
+func LookupIP(ip net.IP) ([]string, error) {
+	if ip.To4() == nil {
+		return LookupIPv6(ip)
+	}
+
+	return LookupIPv4(ip)
+}
+
+// LookupIPString is like LookupIP but accepts an IPv4 or
+// IPv6 address in their string representations.
+func LookupIPString(ipStr string) ([]string, error) {
+	ip := net.ParseIP(ipStr)
+	if ip == nil {
+		return nil, fmt.Errorf("invalid IP")
+	}
+
+	return LookupIP(ip)
+}
+
+// LookupIPv4String returns a list of blacklist sources that
+// contain a reference to ip. If the IP is not stored in the
+// cache database a nil slice is returned.
+func LookupIPv4String(ipv4 string) ([]string, error) {
+	return lookupBlockLists("ipv4", ipv4)
+}
+
+// LookupIPv4 is like LookupIPv4String but accepts a net.IP.
+func LookupIPv4(ipv4 net.IP) ([]string, error) {
+
+	ip := ipv4.To4()
+	if ip == nil {
+		return nil, errors.New("invalid IPv4")
+	}
+
+	return LookupIPv4String(ip.String())
+}
+
+// LookupIPv6String returns a list of blacklist sources that
+// contain a reference to ip.
If the IP is not stored in the +// cache database a nil slice is returned. +func LookupIPv6String(ipv6 string) ([]string, error) { + return lookupBlockLists("ipv6", ipv6) +} + +// LookupIPv6 is like LookupIPv6String but accepts a net.IP. +func LookupIPv6(ipv6 net.IP) ([]string, error) { + ip := ipv6.To16() + if ip == nil { + return nil, errors.New("invalid IPv6") + } + + return LookupIPv6String(ip.String()) +} diff --git a/intel/filterlist/lookup_map.go b/intel/filterlist/lookup_map.go new file mode 100644 index 00000000..399617f1 --- /dev/null +++ b/intel/filterlist/lookup_map.go @@ -0,0 +1,17 @@ +package filterlist + +// LookupMap is a helper type for matching a list of endpoint sources +// against a map. +type LookupMap map[string]struct{} + +// Match returns Denied if a source in `list` is part of lm. +// If nothing is found, an empty string is returned. +func (lm LookupMap) Match(list []string) string { + for _, l := range list { + if _, ok := lm[l]; ok { + return l + } + } + + return "" +} diff --git a/intel/filterlist/lookup_test.go b/intel/filterlist/lookup_test.go new file mode 100644 index 00000000..b975a57a --- /dev/null +++ b/intel/filterlist/lookup_test.go @@ -0,0 +1,92 @@ +package filterlist + +/* + +func TestLookupASN(t *testing.T) { + lists, err := LookupASNString("123") + assert.NoError(t, err) + assert.Equal(t, []string{"TEST"}, lists) + + lists, err = LookupASNString("does-not-exist") + assert.NoError(t, err) + assert.Empty(t, lists) + + defer testMarkNotLoaded()() + lists, err = LookupASNString("123") + assert.NoError(t, err) + assert.Empty(t, lists) +} + +func TestLookupCountry(t *testing.T) { + lists, err := LookupCountry("AT") + assert.NoError(t, err) + assert.Equal(t, []string{"TEST"}, lists) + + lists, err = LookupCountry("does-not-exist") + assert.NoError(t, err) + assert.Empty(t, lists) + + defer testMarkNotLoaded()() + lists, err = LookupCountry("AT") + assert.NoError(t, err) + assert.Empty(t, lists) +} + +func TestLookupIP(t *testing.T) 
{ + lists, err := LookupIP(net.IP{1, 1, 1, 1}) + assert.NoError(t, err) + assert.Equal(t, []string{"TEST"}, lists) + + lists, err = LookupIP(net.IP{127, 0, 0, 1}) + assert.NoError(t, err) + assert.Empty(t, lists) + + defer testMarkNotLoaded()() + lists, err = LookupIP(net.IP{1, 1, 1, 1}) + assert.NoError(t, err) + assert.Empty(t, lists) +} + +func TestLookupDomain(t *testing.T) { + lists, err := LookupDomain("example.com") + assert.NoError(t, err) + assert.Equal(t, []string{"TEST"}, lists) + + lists, err = LookupDomain("does-not-exist") + assert.NoError(t, err) + assert.Empty(t, lists) + + defer testMarkNotLoaded()() + lists, err = LookupDomain("example.com") + assert.NoError(t, err) + assert.Empty(t, lists) +} + +// testMarkNotLoaded ensures that functions believe +// filterlist are not yet loaded. It returns a +// func that restores the previous state. +func testMarkNotLoaded() func() { + if isLoaded() { + filterListsLoaded = make(chan struct{}) + return func() { + close(filterListsLoaded) + } + } + + return func() {} +} + +// testMarkLoaded is like testMarkNotLoaded but ensures +// isLoaded() return true. It returns a function to restore +// the previous state. +func testMarkLoaded() func() { + if !isLoaded() { + close(filterListsLoaded) + return func() { + filterListsLoaded = make(chan struct{}) + } + } + + return func() {} +} +*/ diff --git a/intel/filterlist/module.go b/intel/filterlist/module.go new file mode 100644 index 00000000..e6348723 --- /dev/null +++ b/intel/filterlist/module.go @@ -0,0 +1,90 @@ +package filterlist + +import ( + "context" + "fmt" + + "github.com/safing/portbase/log" + "github.com/safing/portbase/modules" + "github.com/safing/portmaster/netenv" + "github.com/safing/portmaster/updates" + "github.com/tevino/abool" +) + +var ( + module *modules.Module +) + +// booleans mainly used to decouple the module +// during testing. 
+var ( + ignoreUpdateEvents = abool.New() + ignoreNetEnvEvents = abool.New() +) + +func init() { + module = modules.Register("filterlist", prep, start, nil, "core", "netenv") +} + +func prep() error { + if err := module.RegisterEventHook( + updates.ModuleName, + updates.ResourceUpdateEvent, + "Check for blocklist updates", + func(ctx context.Context, _ interface{}) error { + if ignoreUpdateEvents.IsSet() { + return nil + } + + return tryListUpdate(ctx) + }, + ); err != nil { + return fmt.Errorf("failed to register resource update event handler: %w", err) + } + + if err := module.RegisterEventHook( + "netenv", + netenv.OnlineStatusChangedEvent, + "Check for blocklist updates", + func(ctx context.Context, _ interface{}) error { + if ignoreNetEnvEvents.IsSet() { + return nil + } + + // Nothing to do if we went offline. + if !netenv.Online() { + return nil + } + + return tryListUpdate(ctx) + }, + ); err != nil { + return fmt.Errorf("failed to register online status changed event handler: %w", err) + } + + return nil +} + +func start() error { + filterListLock.Lock() + defer filterListLock.Unlock() + + ver, err := getCacheDatabaseVersion() + if err == nil { + log.Debugf("intel/filterlists: cache database has version %s", ver.String()) + + if err = defaultFilter.loadFromCache(); err != nil { + err = fmt.Errorf("failed to initialize bloom filters: %w", err) + } + } + + if err != nil { + log.Debugf("intel/filterlists: blocklists disabled, waiting for update (%s)", err) + module.Warning(filterlistsDisabled, "Blocklist features disabled, waiting for update") + } else { + log.Debugf("intel/filterlists: using cache database") + close(filterListsLoaded) + } + + return nil +} diff --git a/intel/filterlist/module_test.go b/intel/filterlist/module_test.go new file mode 100644 index 00000000..956597e2 --- /dev/null +++ b/intel/filterlist/module_test.go @@ -0,0 +1,86 @@ +package filterlist + +/* +func TestMain(m *testing.M) { + // we completely ignore netenv events during testing. 
+ ignoreNetEnvEvents.Set() + + if err := updates.DisableUpdateSchedule(); err != nil { + fmt.Fprintf(os.Stderr, "failed to disable update schedule: %s", err) + os.Exit(1) + } + pmtesting.TestMainWithHooks(m, module, loadOnStart, nil) +} + +func loadOnStart() error { + log.SetLogLevel(log.TraceLevel) + + ch := make(chan struct{}) + defer close(ch) + + if err := updates.TriggerUpdate(); err != nil { + return fmt.Errorf("failed to trigger update: %w", err) + } + + var err error + + go func() { + select { + case <-ch: + return + + case <-time.After(time.Minute): + err = fmt.Errorf("timeout loading") + close(filterListsLoaded) // let waitUntilLoaded() return + } + }() + + waitUntilLoaded() + time.Sleep(time.Second * 10) + if err != nil { + return err + } + + failureStatus, failureID, failureMsg := module.FailureStatus() + if failureStatus == modules.FailureError || failureStatus == modules.FailureWarning { + return fmt.Errorf("module in failure state: %s %q", failureID, failureMsg) + } + + // ignore update events from now on during testing. 
+ ignoreUpdateEvents.Set() + + testSources := []string{"TEST"} + testEntries := []*listEntry{ + { + Entity: "example.com", + Sources: testSources, + Type: "Domain", + }, + { + Entity: "1.1.1.1", + Sources: testSources, + Type: "IPv4", + }, + { + Entity: "AT", + Sources: testSources, + Type: "Country", + }, + { + Entity: "123", + Sources: testSources, + Type: "ASN", + }, + } + + for _, e := range testEntries { + // add some test entries + if err := processEntry(e); err != nil { + return err + } + + } + + return nil +} +*/ diff --git a/intel/filterlist/record.go b/intel/filterlist/record.go new file mode 100644 index 00000000..23cd7cf9 --- /dev/null +++ b/intel/filterlist/record.go @@ -0,0 +1,40 @@ +package filterlist + +import ( + "fmt" + "sync" + + "github.com/safing/portbase/database/record" +) + +type entityRecord struct { + record.Base `json:"-"` + sync.Mutex `json:"-"` + + Value string + Sources []string + Type string + UpdatedAt int64 +} + +func getEntityRecordByKey(key string) (*entityRecord, error) { + r, err := cache.Get(key) + if err != nil { + return nil, err + } + + if r.IsWrapped() { + new := &entityRecord{} + if err := record.Unwrap(r, new); err != nil { + return nil, err + } + + return new, nil + } + + e, ok := r.(*entityRecord) + if !ok { + return nil, fmt.Errorf("record not of type *entityRecord, but %T", r) + } + return e, nil +} diff --git a/intel/filterlist/updater.go b/intel/filterlist/updater.go new file mode 100644 index 00000000..3eee0027 --- /dev/null +++ b/intel/filterlist/updater.go @@ -0,0 +1,241 @@ +package filterlist + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/hashicorp/go-version" + "github.com/safing/portbase/database" + "github.com/safing/portbase/database/query" + "github.com/safing/portbase/log" + "github.com/safing/portbase/updater" + "github.com/tevino/abool" +) + +var updateInProgress = abool.New() + +// tryListUpdate wraps performUpdate but ensures the module's +// error state is correctly set or resolved. 
+func tryListUpdate(ctx context.Context) error { + err := performUpdate(ctx) + + if err != nil { + if !isLoaded() { + module.Error(filterlistsDisabled, err.Error()) + } + return err + } + + // if the module is in an error state resolve that right now. + module.Resolve(filterlistsDisabled) + return nil +} + +func performUpdate(ctx context.Context) error { + if !updateInProgress.SetToIf(false, true) { + log.Debugf("intel/filterlists: upgrade already in progress") + return nil + } + + upgradables, err := getUpgradableFiles() + if err != nil { + return err + } + + if len(upgradables) == 0 { + log.Debugf("intel/filterlists: ignoring update, latest version is already used") + return nil + } + + cleanupRequired := false + filterToUpdate := defaultFilter + + // perform the actual upgrade by processing each file + // in the returned order. + for idx, file := range upgradables { + log.Debugf("applying update %s version %s", file.Identifier(), file.Version()) + + if file == baseFile { + if idx != 0 { + log.Warningf("intel/filterlists: upgrade order is wrong, base file needs to be updated first not at idx %d", idx) + // we still continue because after processing the base + // file everything is correct again, we just used some + // CPU and IO resources for nothing when processing + // the previous files. + } + cleanupRequired = true + + // since we are processing a base update we will create our + // bloom filters from scratch. + filterToUpdate = newScopedBloom() + } + + if err := processListFile(ctx, filterToUpdate, file); err != nil { + return fmt.Errorf("failed to process upgrade %s: %w", file.Identifier(), err) + } + } + + if filterToUpdate != defaultFilter { + // replace the bloom filters in our default + // filter. + defaultFilter.replaceWith(filterToUpdate) + } + + // from now on, the database is ready and can be used if + // it wasn't loaded yet. 
+ if !isLoaded() { + close(filterListsLoaded) + } + + if err := defaultFilter.saveToCache(); err != nil { + // just handle the error by logging as it's only consequence + // is that we will need to reprocess all files during the next + // start. + log.Errorf("intel/filterlists: failed to persist bloom filters in cache database: %s", err) + } + + // if we processed the base file we need to perform + // some cleanup on filterlist entities that have not + // been updated now. Once we are done, start a worker + // for that purpose. + if cleanupRequired { + defer module.StartWorker("filterlist:cleanup", removeObsoleteFilterEntries) + } + + // try to save the highest version of our files. + highestVersion := upgradables[len(upgradables)-1] + if err := setCacheDatabaseVersion(highestVersion.Version()); err != nil { + log.Errorf("intel/filterlists: failed to save cache database version: %s", err) + } + + return nil +} + +func removeObsoleteFilterEntries(_ context.Context) error { + log.Infof("intel/filterlists: cleanup task started, removing obsolete filter list entries ...") + + iter, err := cache.Query( + query.New(filterListKeyPrefix).Where( + // TODO(ppacher): remember the timestamp we started the last update + // and use that rather than "one hour ago" + query.Where("UpdatedAt", query.LessThan, time.Now().Add(-time.Hour).Unix()), + ), + ) + if err != nil { + return err + } + + var cnt int + for r := range iter.Next { + cnt++ + r.Meta().Delete() + if err := cache.Put(r); err != nil { + log.Errorf("intel/filterlists: failed to remove stale cache entry %q: %s", r.Key(), err) + } + } + + log.Debugf("intel/filterlists: successfully removed %d obsolete entries", cnt) + return nil +} + +// getUpgradableFiles returns a slice of filterlist files +// that should be updated. The files MUST be updated and +// processed in the returned order! 
+func getUpgradableFiles() ([]*updater.File, error) {
+	var updateOrder []*updater.File
+
+	cacheDBInUse := isLoaded()
+
+	if baseFile == nil || baseFile.UpgradeAvailable() || !cacheDBInUse {
+		var err error
+		baseFile, err = getFile(baseListFilePath)
+		if err != nil {
+			return nil, err
+		}
+		updateOrder = append(updateOrder, baseFile)
+	}
+
+	if intermediateFile == nil || intermediateFile.UpgradeAvailable() || !cacheDBInUse {
+		var err error
+		intermediateFile, err = getFile(intermediateListFilePath)
+		if err != nil && err != updater.ErrNotFound {
+			return nil, err
+		}
+
+		if err == nil {
+			updateOrder = append(updateOrder, intermediateFile)
+		}
+	}
+
+	if urgentFile == nil || urgentFile.UpgradeAvailable() || !cacheDBInUse {
+		var err error
+		urgentFile, err = getFile(urgentListFilePath)
+		if err != nil && err != updater.ErrNotFound {
+			return nil, err
+		}
+
+		if err == nil {
+			updateOrder = append(updateOrder, urgentFile)
+		}
+	}
+
+	return resolveUpdateOrder(updateOrder)
+}
+
+func resolveUpdateOrder(updateOrder []*updater.File) ([]*updater.File, error) {
+	// sort the update order by ascending version
+	sort.Sort(byAscVersion(updateOrder))
+
+	var cacheDBVersion *version.Version
+	if !isLoaded() {
+		cacheDBVersion, _ = version.NewSemver("v0.0.0")
+	} else {
+		var err error
+		cacheDBVersion, err = getCacheDatabaseVersion()
+		if err != nil {
+			if err != database.ErrNotFound {
+				log.Errorf("intel/filterlists: failed to get cache database version: %s", err)
+			}
+			cacheDBVersion, _ = version.NewSemver("v0.0.0")
+		}
+	}
+
+	startAtIdx := -1
+	for idx, file := range updateOrder {
+		ver, _ := version.NewSemver(file.Version())
+		log.Tracef("intel/filterlists: checking file with version %s against %s", ver, cacheDBVersion)
+		if ver.GreaterThan(cacheDBVersion) && (startAtIdx == -1 || file == baseFile) {
+			startAtIdx = idx
+		}
+	}
+
+	// if startAtIdx == -1 we don't have any upgradables to
+	// process.
+ if startAtIdx == -1 { + log.Tracef("intel/filterlists: nothing to process, latest version %s already in use", cacheDBVersion) + return nil, nil + } + + // skip any files that are lower then the current cache db version + // or after which a base upgrade would be performed. + return updateOrder[startAtIdx:], nil +} + +type byAscVersion []*updater.File + +func (fs byAscVersion) Len() int { return len(fs) } +func (fs byAscVersion) Less(i, j int) bool { + vi, _ := version.NewSemver(fs[i].Version()) + vj, _ := version.NewSemver(fs[j].Version()) + + return vi.LessThan(vj) +} +func (fs byAscVersion) Swap(i, j int) { + fi := fs[i] + fj := fs[j] + + fs[i] = fj + fs[j] = fi +} diff --git a/intel/module.go b/intel/module.go index ea4d013f..aeb231e0 100644 --- a/intel/module.go +++ b/intel/module.go @@ -10,5 +10,5 @@ var ( ) func init() { - Module = modules.Register("intel", nil, nil, nil, "geoip") + Module = modules.Register("intel", nil, nil, nil, "geoip", "filterlist") } diff --git a/profile/config.go b/profile/config.go index 6075e02a..82ff8766 100644 --- a/profile/config.go +++ b/profile/config.go @@ -24,6 +24,9 @@ var ( CfgOptionServiceEndpointsKey = "filter/serviceEndpoints" cfgOptionServiceEndpoints config.StringArrayOption + CfgOptionFilterListKey = "filter/lists" + cfgOptionFilterLists config.StringArrayOption + CfgOptionBlockScopeLocalKey = "filter/blockLocal" cfgOptionBlockScopeLocal config.IntOption // security level option @@ -135,6 +138,22 @@ Examples: cfgOptionServiceEndpoints = config.Concurrent.GetAsStringArray(CfgOptionServiceEndpointsKey, []string{}) cfgStringArrayOptions[CfgOptionServiceEndpointsKey] = cfgOptionServiceEndpoints + // Filter list IDs + err = config.Register(&config.Option{ + Name: "Filterlists", + Key: CfgOptionFilterListKey, + Description: "Filter connections by matching the endpoint against configured filterlists", + OptType: config.OptTypeStringArray, + DefaultValue: []string{}, + ExternalOptType: "filter list", + ValidationRegex: 
`^[a-zA-Z0-9\-]+$`, + }) + if err != nil { + return err + } + cfgOptionFilterLists = config.Concurrent.GetAsStringArray(CfgOptionFilterListKey, []string{}) + cfgStringArrayOptions[CfgOptionFilterListKey] = cfgOptionFilterLists + // Block Scope Local err = config.Register(&config.Option{ Name: "Block Scope Local", diff --git a/profile/profile-layered.go b/profile/profile-layered.go index 11c9b004..58c03a80 100644 --- a/profile/profile-layered.go +++ b/profile/profile-layered.go @@ -6,6 +6,7 @@ import ( "github.com/safing/portbase/log" + "github.com/safing/portmaster/intel/filterlist" "github.com/safing/portmaster/status" "github.com/tevino/abool" @@ -94,7 +95,7 @@ func NewLayeredProfile(localProfile *Profile) *LayeredProfile { cfgOptionRemoveBlockedDNS, ) - // TODO: load referenced profiles + // TODO: load linked profiles. // FUTURE: load forced company profile new.layers = append(new.layers, localProfile) @@ -154,7 +155,7 @@ func (lp *LayeredProfile) Update() (revisionCounter uint64) { func (lp *LayeredProfile) updateCaches() { // update security level - var newLevel uint8 = 0 + var newLevel uint8 for _, layer := range lp.layers { if newLevel < layer.SecurityLevel { newLevel = layer.SecurityLevel @@ -217,6 +218,45 @@ func (lp *LayeredProfile) MatchServiceEndpoint(entity *intel.Entity) (result end return cfgServiceEndpoints.Match(entity) } +// MatchFilterLists matches the entity against the set of filter +// lists. +func (lp *LayeredProfile) MatchFilterLists(entity *intel.Entity) (result endpoints.EPResult, reason string) { + lookupMap, hasLists := entity.GetListsMap() + if !hasLists { + return endpoints.NoMatch, "" + } + + log.Errorf("number of layers: %d", len(lp.layers)) + for _, layer := range lp.layers { + if id := lookupMap.Match(layer.filterListIDs); id != "" { + return endpoints.Denied, id + } + + // only check the first layer that has filter list + // IDs defined. 
+ if len(layer.filterListIDs) > 0 { + return endpoints.NoMatch, "" + } + } + + // TODO(ppacher): re-resolving global list IDs is a bit overkill, + // add some caching here. + cfgLock.RLock() + defer cfgLock.RUnlock() + + globalIds, err := filterlist.ResolveListIDs(cfgOptionFilterLists()) + if err != nil { + log.Errorf("filter: failed to get global filter list IDs: %s", err) + return endpoints.NoMatch, "" + } + + if id := lookupMap.Match(globalIds); id != "" { + return endpoints.Denied, id + } + + return endpoints.NoMatch, "" +} + // AddEndpoint adds an endpoint to the local endpoint list, saves the local profile and reloads the configuration. func (lp *LayeredProfile) AddEndpoint(newEntry string) { lp.localProfile.AddEndpoint(newEntry) diff --git a/profile/profile.go b/profile/profile.go index 8aa8c3b8..13ec36a7 100644 --- a/profile/profile.go +++ b/profile/profile.go @@ -14,6 +14,7 @@ import ( "github.com/safing/portbase/config" "github.com/safing/portbase/database/record" + "github.com/safing/portmaster/intel/filterlist" "github.com/safing/portmaster/profile/endpoints" ) @@ -73,6 +74,7 @@ type Profile struct { //nolint:maligned // not worth the effort defaultAction uint8 endpoints endpoints.Endpoints serviceEndpoints endpoints.Endpoints + filterListIDs []string // Lifecycle Management oudated *abool.AtomicBool @@ -140,6 +142,14 @@ func (profile *Profile) parseConfig() error { } } + list, ok = profile.configPerspective.GetAsStringArray(CfgOptionFilterListKey) + if ok { + profile.filterListIDs, err = filterlist.ResolveListIDs(list) + if err != nil { + lastErr = err + } + } + return lastErr } From f630df0b1f4aae9fe071ba697160b557c36b292b Mon Sep 17 00:00:00 2001 From: Patrick Pacher Date: Tue, 14 Apr 2020 11:14:04 +0200 Subject: [PATCH 2/7] Implemented peer review comments --- intel/entity.go | 12 +++++----- intel/filterlist/database.go | 30 +++++++++++++++++++++---- intel/filterlist/index.go | 2 ++ intel/filterlist/lookup_map.go | 14 +++++++++--- 
intel/filterlist/updater.go | 40 ++++++++++++++++++++++++++++------ profile/config-update.go | 8 +++++++ profile/config.go | 1 + profile/profile-layered.go | 18 ++++----------- 8 files changed, 90 insertions(+), 35 deletions(-) diff --git a/intel/entity.go b/intel/entity.go index 470ae7f6..f4aba5f8 100644 --- a/intel/entity.go +++ b/intel/entity.go @@ -10,6 +10,7 @@ import ( "github.com/safing/portbase/log" "github.com/safing/portmaster/intel/filterlist" "github.com/safing/portmaster/intel/geoip" + "github.com/safing/portmaster/network/netutils" "github.com/safing/portmaster/status" ) @@ -303,15 +304,12 @@ func (e *Entity) getIPLists() { if ip == nil { return } - // abort if it's not a global unicast (not that IPv6 link local unicasts are treated - // as global) - if !ip.IsGlobalUnicast() { - return - } - // ingore linc local unicasts as well (not done by IsGlobalUnicast above). - if ip.IsLinkLocalUnicast() { + + // only load lists for IP addresses that are classified as global. + if netutils.ClassifyIP(ip) != netutils.Global { return } + log.Debugf("intel: loading IP list for %s", ip) e.loadIPListOnce.Do(func() { list, err := filterlist.LookupIP(ip) diff --git a/intel/filterlist/database.go b/intel/filterlist/database.go index 0552f2fc..c4915f4b 100644 --- a/intel/filterlist/database.go +++ b/intel/filterlist/database.go @@ -2,6 +2,7 @@ package filterlist import ( "context" + "fmt" "os" "sort" "strings" @@ -90,12 +91,33 @@ func processListFile(ctx context.Context, filter *scopedBloom, file *updater.Fil g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { + // startSafe runs fn inside the error group but wrapped + // in recovered function. 
+ startSafe := func(fn func() error) { + g.Go(func() (err error) { + defer func() { + if x := recover(); x != nil { + if e, ok := x.(error); ok { + err = e + } else { + err = fmt.Errorf("%v", x) + } + } + }() + + err = fn() + return err + }) + } + + startSafe(func() (err error) { defer close(values) - return decodeFile(ctx, f, values) + + err = decodeFile(ctx, f, values) + return }) - g.Go(func() error { + startSafe(func() error { defer close(records) for entry := range values { if err := processEntry(ctx, filter, entry, records); err != nil { @@ -139,7 +161,7 @@ func processListFile(ctx context.Context, filter *scopedBloom, file *updater.Fil return batchPut(nil) } startBatch = func() { - g.Go(processBatch) + startSafe(processBatch) } startBatch() diff --git a/intel/filterlist/index.go b/intel/filterlist/index.go index e7a686ee..67ac34f0 100644 --- a/intel/filterlist/index.go +++ b/intel/filterlist/index.go @@ -191,6 +191,8 @@ func updateListIndex() error { return nil } +// ResolveListIDs resolves a slice of source or category IDs into +// a slice of distinct source IDs. func ResolveListIDs(ids []string) ([]string, error) { index, err := getListIndexFromCache() diff --git a/intel/filterlist/lookup_map.go b/intel/filterlist/lookup_map.go index 399617f1..0828e41e 100644 --- a/intel/filterlist/lookup_map.go +++ b/intel/filterlist/lookup_map.go @@ -1,17 +1,25 @@ package filterlist +import "strings" + // LookupMap is a helper type for matching a list of endpoint sources // against a map. type LookupMap map[string]struct{} -// Match returns Denied if a source in `list` is part of lm. +// Match checks if a source in `list` is part of lm. +// Matches are joined to string and returned. // If nothing is found, an empty string is returned. 
func (lm LookupMap) Match(list []string) string { + matches := make([]string, 0, len(list)) for _, l := range list { if _, ok := lm[l]; ok { - return l + matches = append(matches, l) } } - return "" + if len(matches) == 0 { + return "" + } + + return strings.Join(matches, ", ") } diff --git a/intel/filterlist/updater.go b/intel/filterlist/updater.go index 3eee0027..8593730e 100644 --- a/intel/filterlist/updater.go +++ b/intel/filterlist/updater.go @@ -101,7 +101,7 @@ func performUpdate(ctx context.Context) error { // been updated now. Once we are done, start a worker // for that purpose. if cleanupRequired { - defer module.StartWorker("filterlist:cleanup", removeObsoleteFilterEntries) + defer module.StartWorker("filterlist:cleanup", removeAllObsoleteFilterEntries) } // try to save the highest version of our files. @@ -113,7 +113,20 @@ func performUpdate(ctx context.Context) error { return nil } -func removeObsoleteFilterEntries(_ context.Context) error { +func removeAllObsoleteFilterEntries(_ context.Context) error { + for { + done, err := removeObsoleteFilterEntries(1000) + if err != nil { + return err + } + + if done { + return nil + } + } +} + +func removeObsoleteFilterEntries(batchSize int) (bool, error) { log.Infof("intel/filterlists: cleanup task started, removing obsolete filter list entries ...") iter, err := cache.Query( @@ -124,20 +137,33 @@ func removeObsoleteFilterEntries(_ context.Context) error { ), ) if err != nil { - return err + return false, err } + keys := make([]string, 0, batchSize) + var cnt int for r := range iter.Next { cnt++ - r.Meta().Delete() - if err := cache.Put(r); err != nil { - log.Errorf("intel/filterlists: failed to remove stale cache entry %q: %s", r.Key(), err) + keys = append(keys, r.Key()) + + if cnt == batchSize { + break + } + } + iter.Cancel() + + for _, key := range keys { + if err := cache.Delete(key); err != nil { + log.Errorf("intel/filterlists: failed to remove stale cache entry %q: %s", key, err) } } 
log.Debugf("intel/filterlists: successfully removed %d obsolete entries", cnt) - return nil + + // if we removed less entries that the batch size we + // are done and no more entries exist + return cnt < batchSize, nil } // getUpgradableFiles returns a slice of filterlist files diff --git a/profile/config-update.go b/profile/config-update.go index cac888b9..6d32fa1c 100644 --- a/profile/config-update.go +++ b/profile/config-update.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" + "github.com/safing/portmaster/intel/filterlist" "github.com/safing/portmaster/profile/endpoints" ) @@ -14,6 +15,7 @@ var ( cfgDefaultAction uint8 cfgEndpoints endpoints.Endpoints cfgServiceEndpoints endpoints.Endpoints + cfgFilterLists []string ) func registerConfigUpdater() error { @@ -60,6 +62,12 @@ func updateGlobalConfigProfile(ctx context.Context, data interface{}) error { lastErr = err } + list = cfgOptionFilterLists() + cfgFilterLists, err = filterlist.ResolveListIDs(list) + if err != nil { + lastErr = err + } + // build global profile for reference profile := &Profile{ ID: "config", diff --git a/profile/config.go b/profile/config.go index 82ff8766..1ec9a6f3 100644 --- a/profile/config.go +++ b/profile/config.go @@ -63,6 +63,7 @@ func registerConfiguration() error { Description: `The default filter action when nothing else permits or blocks a connection.`, OptType: config.OptTypeString, DefaultValue: "permit", + ExternalOptType: "string list", ValidationRegex: "^(permit|ask|block)$", }) if err != nil { diff --git a/profile/profile-layered.go b/profile/profile-layered.go index 58c03a80..1a91dae6 100644 --- a/profile/profile-layered.go +++ b/profile/profile-layered.go @@ -6,7 +6,6 @@ import ( "github.com/safing/portbase/log" - "github.com/safing/portmaster/intel/filterlist" "github.com/safing/portmaster/status" "github.com/tevino/abool" @@ -228,8 +227,8 @@ func (lp *LayeredProfile) MatchFilterLists(entity *intel.Entity) (result endpoin log.Errorf("number of layers: %d", len(lp.layers)) for 
_, layer := range lp.layers { - if id := lookupMap.Match(layer.filterListIDs); id != "" { - return endpoints.Denied, id + if reason := lookupMap.Match(layer.filterListIDs); reason != "" { + return endpoints.Denied, reason } // only check the first layer that has filter list @@ -239,19 +238,10 @@ func (lp *LayeredProfile) MatchFilterLists(entity *intel.Entity) (result endpoin } } - // TODO(ppacher): re-resolving global list IDs is a bit overkill, - // add some caching here. cfgLock.RLock() defer cfgLock.RUnlock() - - globalIds, err := filterlist.ResolveListIDs(cfgOptionFilterLists()) - if err != nil { - log.Errorf("filter: failed to get global filter list IDs: %s", err) - return endpoints.NoMatch, "" - } - - if id := lookupMap.Match(globalIds); id != "" { - return endpoints.Denied, id + if reason := lookupMap.Match(cfgFilterLists); reason != "" { + return endpoints.Denied, reason } return endpoints.NoMatch, "" From e77d971259dc7ee448f47ad46594cb697c367a4a Mon Sep 17 00:00:00 2001 From: Patrick Pacher Date: Tue, 14 Apr 2020 11:23:15 +0200 Subject: [PATCH 3/7] Rename filterlist to filterlists --- intel/entity.go | 18 +++++++++--------- intel/{filterlist => filterlists}/bloom.go | 4 ++-- .../cache_version.go | 2 +- intel/{filterlist => filterlists}/database.go | 4 ++-- intel/{filterlist => filterlists}/decoder.go | 4 ++-- intel/{filterlist => filterlists}/index.go | 6 +++--- intel/{filterlist => filterlists}/keys.go | 6 +++--- intel/{filterlist => filterlists}/lookup.go | 6 +++--- .../{filterlist => filterlists}/lookup_map.go | 2 +- .../{filterlist => filterlists}/lookup_test.go | 4 ++-- intel/{filterlist => filterlists}/module.go | 4 ++-- .../{filterlist => filterlists}/module_test.go | 2 +- intel/{filterlist => filterlists}/record.go | 2 +- intel/{filterlist => filterlists}/updater.go | 8 ++++---- intel/module.go | 2 +- profile/config-update.go | 4 ++-- profile/profile.go | 4 ++-- 17 files changed, 41 insertions(+), 41 deletions(-) rename intel/{filterlist => 
filterlists}/bloom.go (97%) rename intel/{filterlist => filterlists}/cache_version.go (98%) rename intel/{filterlist => filterlists}/database.go (98%) rename intel/{filterlist => filterlists}/decoder.go (97%) rename intel/{filterlist => filterlists}/index.go (98%) rename intel/{filterlist => filterlists}/keys.go (82%) rename intel/{filterlist => filterlists}/lookup.go (94%) rename intel/{filterlist => filterlists}/lookup_map.go (96%) rename intel/{filterlist => filterlists}/lookup_test.go (96%) rename intel/{filterlist => filterlists}/module.go (95%) rename intel/{filterlist => filterlists}/module_test.go (98%) rename intel/{filterlist => filterlists}/record.go (96%) rename intel/{filterlist => filterlists}/updater.go (96%) diff --git a/intel/entity.go b/intel/entity.go index f4aba5f8..d1544676 100644 --- a/intel/entity.go +++ b/intel/entity.go @@ -8,7 +8,7 @@ import ( "sync" "github.com/safing/portbase/log" - "github.com/safing/portmaster/intel/filterlist" + "github.com/safing/portmaster/intel/filterlists" "github.com/safing/portmaster/intel/geoip" "github.com/safing/portmaster/network/netutils" "github.com/safing/portmaster/status" @@ -35,7 +35,7 @@ type Entity struct { fetchLocationOnce sync.Once Lists []string - ListsMap filterlist.LookupMap + ListsMap filterlists.LookupMap // we only load each data above at most once loadDomainListOnce sync.Once @@ -231,7 +231,7 @@ func (e *Entity) getDomainLists() { e.loadDomainListOnce.Do(func() { log.Debugf("intel: loading domain list for %s", domain) - list, err := filterlist.LookupDomain(domain) + list, err := filterlists.LookupDomain(domain) if err != nil { log.Errorf("intel: failed to get domain blocklists for %s: %s", domain, err) e.loadDomainListOnce = sync.Once{} @@ -255,7 +255,7 @@ func (e *Entity) getASNLists() { log.Debugf("intel: loading ASN list for %d", asn) e.loadAsnListOnce.Do(func() { - list, err := filterlist.LookupASNString(fmt.Sprintf("%d", asn)) + list, err := 
filterlists.LookupASNString(fmt.Sprintf("%d", asn)) if err != nil { log.Errorf("intel: failed to get ASN blocklist for %d: %s", asn, err) e.loadAsnListOnce = sync.Once{} @@ -279,7 +279,7 @@ func (e *Entity) getCountryLists() { log.Debugf("intel: loading country list for %s", country) e.loadCoutryListOnce.Do(func() { - list, err := filterlist.LookupCountry(country) + list, err := filterlists.LookupCountry(country) if err != nil { log.Errorf("intel: failed to load country blocklist for %s: %s", country, err) e.loadCoutryListOnce = sync.Once{} @@ -312,7 +312,7 @@ func (e *Entity) getIPLists() { log.Debugf("intel: loading IP list for %s", ip) e.loadIPListOnce.Do(func() { - list, err := filterlist.LookupIP(ip) + list, err := filterlists.LookupIP(ip) if err != nil { log.Errorf("intel: failed to get IP blocklist for %s: %s", ip.String(), err) @@ -335,7 +335,7 @@ func (e *Entity) GetLists() ([]string, bool) { } // GetListsMap is like GetLists but returns a lookup map for list IDs. -func (e *Entity) GetListsMap() (filterlist.LookupMap, bool) { +func (e *Entity) GetListsMap() (filterlists.LookupMap, bool) { e.getLists() if e.ListsMap == nil { @@ -361,8 +361,8 @@ func mergeStringList(a, b []string) []string { return res } -func buildLookupMap(l []string) filterlist.LookupMap { - m := make(filterlist.LookupMap, len(l)) +func buildLookupMap(l []string) filterlists.LookupMap { + m := make(filterlists.LookupMap, len(l)) for _, s := range l { m[s] = struct{}{} diff --git a/intel/filterlist/bloom.go b/intel/filterlists/bloom.go similarity index 97% rename from intel/filterlist/bloom.go rename to intel/filterlists/bloom.go index aa6e182b..4df4f70a 100644 --- a/intel/filterlist/bloom.go +++ b/intel/filterlists/bloom.go @@ -1,4 +1,4 @@ -package filterlist +package filterlists import ( "encoding/hex" @@ -59,7 +59,7 @@ func (bf *scopedBloom) getBloomForType(entityType string) (*ring.Ring, error) { case "country": r = bf.country default: - return nil, fmt.Errorf("unsupported filterlist 
entity type %q", entityType) + return nil, fmt.Errorf("unsupported filterlists entity type %q", entityType) } return r, nil diff --git a/intel/filterlist/cache_version.go b/intel/filterlists/cache_version.go similarity index 98% rename from intel/filterlist/cache_version.go rename to intel/filterlists/cache_version.go index cac9d9fd..c48d8bd4 100644 --- a/intel/filterlist/cache_version.go +++ b/intel/filterlists/cache_version.go @@ -1,4 +1,4 @@ -package filterlist +package filterlists import ( "fmt" diff --git a/intel/filterlist/database.go b/intel/filterlists/database.go similarity index 98% rename from intel/filterlist/database.go rename to intel/filterlists/database.go index c4915f4b..cee940f2 100644 --- a/intel/filterlist/database.go +++ b/intel/filterlists/database.go @@ -1,4 +1,4 @@ -package filterlist +package filterlists import ( "context" @@ -78,7 +78,7 @@ func isLoaded() bool { } // processListFile opens the latest version of f ile and decodes it's DSDL -// content. It calls processEntry for each decoded filterlist entry. +// content. It calls processEntry for each decoded filterlists entry. func processListFile(ctx context.Context, filter *scopedBloom, file *updater.File) error { f, err := os.Open(file.Path()) if err != nil { diff --git a/intel/filterlist/decoder.go b/intel/filterlists/decoder.go similarity index 97% rename from intel/filterlist/decoder.go rename to intel/filterlists/decoder.go index 566014b0..49133790 100644 --- a/intel/filterlist/decoder.go +++ b/intel/filterlists/decoder.go @@ -1,4 +1,4 @@ -package filterlist +package filterlists import ( "compress/gzip" @@ -17,7 +17,7 @@ type listEntry struct { Type string `json:"type"` } -// decodeFile decodes a DSDL filterlist file and sends decoded entities to +// decodeFile decodes a DSDL filterlists file and sends decoded entities to // ch. It blocks until all list entries have been consumed or ctx is cancelled. 
func decodeFile(ctx context.Context, r io.Reader, ch chan<- *listEntry) error { compressed, format, err := parseHeader(r) diff --git a/intel/filterlist/index.go b/intel/filterlists/index.go similarity index 98% rename from intel/filterlist/index.go rename to intel/filterlists/index.go index 67ac34f0..ae436494 100644 --- a/intel/filterlist/index.go +++ b/intel/filterlists/index.go @@ -1,4 +1,4 @@ -package filterlist +package filterlists import ( "fmt" @@ -37,7 +37,7 @@ type Category struct { Description string `json:"description,omitempty"` } -// Source defines an external filterlist source. +// Source defines an external filterlists source. type Source struct { // ID is a unique ID for the source. Entities always reference the // sources they have been observed in using this ID. Refer to the @@ -56,7 +56,7 @@ type Source struct { // to the Type definition for more information and well-known types. Type string `json:"type"` - // URL points to the filterlist file. + // URL points to the filterlists file. URL string `json:"url"` // Category holds the unique ID of a category the source belongs to. Since diff --git a/intel/filterlist/keys.go b/intel/filterlists/keys.go similarity index 82% rename from intel/filterlist/keys.go rename to intel/filterlists/keys.go index 635acb15..9ecffa46 100644 --- a/intel/filterlist/keys.go +++ b/intel/filterlists/keys.go @@ -1,15 +1,15 @@ -package filterlist +package filterlists const ( cacheDBPrefix = "cache:intel/filterlists" // filterListCacheVersionKey is used to store the highest version - // of a filterlist file (base, intermediate or urgent) in the + // of a filterlists file (base, intermediate or urgent) in the // cache database. It's used to decide if the cache database and // bloomfilters need to be resetted and rebuilt. filterListCacheVersionKey = cacheDBPrefix + "/version" - // filterListIndexKey is used to store the filterlist index. + // filterListIndexKey is used to store the filterlists index. 
filterListIndexKey = cacheDBPrefix + "/index" // filterListKeyPrefix is the prefix inside that cache database diff --git a/intel/filterlist/lookup.go b/intel/filterlists/lookup.go similarity index 94% rename from intel/filterlist/lookup.go rename to intel/filterlists/lookup.go index ee592eeb..8304efb3 100644 --- a/intel/filterlist/lookup.go +++ b/intel/filterlists/lookup.go @@ -1,4 +1,4 @@ -package filterlist +package filterlists import ( "errors" @@ -18,7 +18,7 @@ import ( func lookupBlockLists(entity, value string) ([]string, error) { key := makeListCacheKey(entity, value) if !isLoaded() { - log.Warningf("intel/filterlist: not searching for %s because filterlists not loaded", key) + log.Warningf("intel/filterlists: not searching for %s because filterlists not loaded", key) // filterLists have not yet been loaded so // there's no point querying into the cache // database. @@ -32,7 +32,7 @@ func lookupBlockLists(entity, value string) ([]string, error) { return nil, nil } - log.Debugf("intel/filterlist: searching for entries with %s", key) + log.Debugf("intel/filterlists: searching for entries with %s", key) entry, err := getEntityRecordByKey(key) if err != nil { if err == database.ErrNotFound { diff --git a/intel/filterlist/lookup_map.go b/intel/filterlists/lookup_map.go similarity index 96% rename from intel/filterlist/lookup_map.go rename to intel/filterlists/lookup_map.go index 0828e41e..5a161c91 100644 --- a/intel/filterlist/lookup_map.go +++ b/intel/filterlists/lookup_map.go @@ -1,4 +1,4 @@ -package filterlist +package filterlists import "strings" diff --git a/intel/filterlist/lookup_test.go b/intel/filterlists/lookup_test.go similarity index 96% rename from intel/filterlist/lookup_test.go rename to intel/filterlists/lookup_test.go index b975a57a..0a2e9220 100644 --- a/intel/filterlist/lookup_test.go +++ b/intel/filterlists/lookup_test.go @@ -1,4 +1,4 @@ -package filterlist +package filterlists /* @@ -63,7 +63,7 @@ func TestLookupDomain(t *testing.T) { } // 
testMarkNotLoaded ensures that functions believe -// filterlist are not yet loaded. It returns a +// filterlists are not yet loaded. It returns a // func that restores the previous state. func testMarkNotLoaded() func() { if isLoaded() { diff --git a/intel/filterlist/module.go b/intel/filterlists/module.go similarity index 95% rename from intel/filterlist/module.go rename to intel/filterlists/module.go index e6348723..14805f69 100644 --- a/intel/filterlist/module.go +++ b/intel/filterlists/module.go @@ -1,4 +1,4 @@ -package filterlist +package filterlists import ( "context" @@ -23,7 +23,7 @@ var ( ) func init() { - module = modules.Register("filterlist", prep, start, nil, "core", "netenv") + module = modules.Register("filterlists", prep, start, nil, "core", "netenv") } func prep() error { diff --git a/intel/filterlist/module_test.go b/intel/filterlists/module_test.go similarity index 98% rename from intel/filterlist/module_test.go rename to intel/filterlists/module_test.go index 956597e2..df157355 100644 --- a/intel/filterlist/module_test.go +++ b/intel/filterlists/module_test.go @@ -1,4 +1,4 @@ -package filterlist +package filterlists /* func TestMain(m *testing.M) { diff --git a/intel/filterlist/record.go b/intel/filterlists/record.go similarity index 96% rename from intel/filterlist/record.go rename to intel/filterlists/record.go index 23cd7cf9..8865ce9d 100644 --- a/intel/filterlist/record.go +++ b/intel/filterlists/record.go @@ -1,4 +1,4 @@ -package filterlist +package filterlists import ( "fmt" diff --git a/intel/filterlist/updater.go b/intel/filterlists/updater.go similarity index 96% rename from intel/filterlist/updater.go rename to intel/filterlists/updater.go index 8593730e..a9c0322b 100644 --- a/intel/filterlist/updater.go +++ b/intel/filterlists/updater.go @@ -1,4 +1,4 @@ -package filterlist +package filterlists import ( "context" @@ -97,11 +97,11 @@ func performUpdate(ctx context.Context) error { } // if we processed the base file we need to perform - 
// some cleanup on filterlist entities that have not + // some cleanup on filterlists entities that have not // been updated now. Once we are done, start a worker // for that purpose. if cleanupRequired { - defer module.StartWorker("filterlist:cleanup", removeAllObsoleteFilterEntries) + defer module.StartWorker("filterlists:cleanup", removeAllObsoleteFilterEntries) } // try to save the highest version of our files. @@ -166,7 +166,7 @@ func removeObsoleteFilterEntries(batchSize int) (bool, error) { return cnt < batchSize, nil } -// getUpgradableFiles returns a slice of filterlist files +// getUpgradableFiles returns a slice of filterlists files // that should be updated. The files MUST be updated and // processed in the returned order! func getUpgradableFiles() ([]*updater.File, error) { diff --git a/intel/module.go b/intel/module.go index aeb231e0..8c30985e 100644 --- a/intel/module.go +++ b/intel/module.go @@ -10,5 +10,5 @@ var ( ) func init() { - Module = modules.Register("intel", nil, nil, nil, "geoip", "filterlist") + Module = modules.Register("intel", nil, nil, nil, "geoip", "filterlists") } diff --git a/profile/config-update.go b/profile/config-update.go index 6d32fa1c..15e62b8e 100644 --- a/profile/config-update.go +++ b/profile/config-update.go @@ -5,7 +5,7 @@ import ( "fmt" "sync" - "github.com/safing/portmaster/intel/filterlist" + "github.com/safing/portmaster/intel/filterlists" "github.com/safing/portmaster/profile/endpoints" ) @@ -63,7 +63,7 @@ func updateGlobalConfigProfile(ctx context.Context, data interface{}) error { } list = cfgOptionFilterLists() - cfgFilterLists, err = filterlist.ResolveListIDs(list) + cfgFilterLists, err = filterlists.ResolveListIDs(list) if err != nil { lastErr = err } diff --git a/profile/profile.go b/profile/profile.go index 13ec36a7..46e3b447 100644 --- a/profile/profile.go +++ b/profile/profile.go @@ -14,7 +14,7 @@ import ( "github.com/safing/portbase/config" "github.com/safing/portbase/database/record" - 
"github.com/safing/portmaster/intel/filterlist" + "github.com/safing/portmaster/intel/filterlists" "github.com/safing/portmaster/profile/endpoints" ) @@ -144,7 +144,7 @@ func (profile *Profile) parseConfig() error { list, ok = profile.configPerspective.GetAsStringArray(CfgOptionFilterListKey) if ok { - profile.filterListIDs, err = filterlist.ResolveListIDs(list) + profile.filterListIDs, err = filterlists.ResolveListIDs(list) if err != nil { lastErr = err } From a6d768958d42df25a8bec8cc5b6de5856015f29c Mon Sep 17 00:00:00 2001 From: Patrick Pacher Date: Tue, 14 Apr 2020 11:24:42 +0200 Subject: [PATCH 4/7] Streamline naming of filter list option --- profile/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/profile/config.go b/profile/config.go index 1ec9a6f3..941af738 100644 --- a/profile/config.go +++ b/profile/config.go @@ -141,7 +141,7 @@ Examples: // Filter list IDs err = config.Register(&config.Option{ - Name: "Filterlists", + Name: "Filter List", Key: CfgOptionFilterListKey, Description: "Filter connections by matching the endpoint against configured filterlists", OptType: config.OptTypeStringArray, From fc9835f91a0a10d5e3460e6830331b5eed460283 Mon Sep 17 00:00:00 2001 From: Patrick Pacher Date: Tue, 14 Apr 2020 14:02:05 +0200 Subject: [PATCH 5/7] Fix resource index not updated. Minor module improvements --- intel/filterlists/bloom.go | 2 +- intel/filterlists/database.go | 21 +++++++++++---------- intel/filterlists/module.go | 10 ++++++++++ intel/filterlists/updater.go | 22 +++++++++++++++++----- updates/main.go | 6 +++++- 5 files changed, 44 insertions(+), 17 deletions(-) diff --git a/intel/filterlists/bloom.go b/intel/filterlists/bloom.go index 4df4f70a..93cde1ed 100644 --- a/intel/filterlists/bloom.go +++ b/intel/filterlists/bloom.go @@ -76,7 +76,7 @@ func (bf *scopedBloom) add(scope, value string) { // not have support for it. 
We just drop the value // as a call to Test() for that scope will always // return "true" - log.Warningf("failed to add unknown entity type %q", scope) + log.Warningf("failed to add unknown entity type %q with value %q", scope, value) return } diff --git a/intel/filterlists/database.go b/intel/filterlists/database.go index cee940f2..46c54673 100644 --- a/intel/filterlists/database.go +++ b/intel/filterlists/database.go @@ -34,7 +34,6 @@ const ( ) const bfFalsePositiveRate = 0.001 -const filterlistsDisabled = "filterlists:disabled" var ( filterListLock sync.RWMutex @@ -128,12 +127,19 @@ func processListFile(ctx context.Context, filter *scopedBloom, file *updater.Fil return nil }) + persistRecords(startSafe, records) + + return g.Wait() +} + +func persistRecords(startJob func(func() error), records <-chan record.Record) { var cnt int start := time.Now() batch := database.NewInterface(&database.Options{Local: true, Internal: true}) - var startBatch func() - processBatch := func() error { + var processBatch func() error + + processBatch = func() error { batchPut := batch.PutMany("cache") for r := range records { if err := batchPut(r); err != nil { @@ -152,7 +158,7 @@ func processListFile(ctx context.Context, filter *scopedBloom, file *updater.Fil return err } - startBatch() + startJob(processBatch) return nil } @@ -160,13 +166,8 @@ func processListFile(ctx context.Context, filter *scopedBloom, file *updater.Fil return batchPut(nil) } - startBatch = func() { - startSafe(processBatch) - } - startBatch() - - return g.Wait() + startJob(processBatch) } func normalizeEntry(entry *listEntry) { diff --git a/intel/filterlists/module.go b/intel/filterlists/module.go index 14805f69..9b74c5c4 100644 --- a/intel/filterlists/module.go +++ b/intel/filterlists/module.go @@ -15,6 +15,14 @@ var ( module *modules.Module ) +const ( + filterlistsDisabled = "filterlists:disabled" + filterlistsStaleDataSurvived = "filterlists:staledata" + filterlistsStaleDataDescr = "Removing stale filter 
list records failed. Some connections may be overblocked." + filterlistsUpdateInProgress = "filterlists:update-in-progress" + filterlistsUpdateInProgressDescr = "Performance slightly degraded during list update." +) + // booleans mainly used to decouple the module // during testing. var ( @@ -23,6 +31,8 @@ var ( ) func init() { + ignoreNetEnvEvents.Set() + module = modules.Register("filterlists", prep, start, nil, "core", "netenv") } diff --git a/intel/filterlists/updater.go b/intel/filterlists/updater.go index a9c0322b..5a4bfffc 100644 --- a/intel/filterlists/updater.go +++ b/intel/filterlists/updater.go @@ -28,8 +28,10 @@ func tryListUpdate(ctx context.Context) error { return err } - // if the module is in an error state resolve that right now. + // if the module is in an error, warning or hint state resolve that right now. module.Resolve(filterlistsDisabled) + module.Resolve(filterlistsStaleDataSurvived) + module.Resolve(filterlistsUpdateInProgress) return nil } @@ -38,11 +40,15 @@ func performUpdate(ctx context.Context) error { log.Debugf("intel/filterlists: upgrade already in progress") return nil } + defer updateInProgress.UnSet() + + module.Hint(filterlistsUpdateInProgress, filterlistsUpdateInProgressDescr) upgradables, err := getUpgradableFiles() if err != nil { return err } + log.Debugf("intel/filterlists: resources to update: %v", upgradables) if len(upgradables) == 0 { log.Debugf("intel/filterlists: ignoring update, latest version is already used") @@ -55,7 +61,7 @@ func performUpdate(ctx context.Context) error { // perform the actual upgrade by processing each file // in the returned order. for idx, file := range upgradables { - log.Debugf("applying update %s version %s", file.Identifier(), file.Version()) + log.Debugf("intel/filterlists: applying update (%d) %s version %s", idx, file.Identifier(), file.Version()) if file == baseFile { if idx != 0 { @@ -101,7 +107,13 @@ func performUpdate(ctx context.Context) error { // been updated now. 
Once we are done, start a worker // for that purpose. if cleanupRequired { - defer module.StartWorker("filterlists:cleanup", removeAllObsoleteFilterEntries) + if err := module.RunWorker("filterlists:cleanup", removeAllObsoleteFilterEntries); err != nil { + // if we failed to remove all stale cache entries + // we abort now WITHOUT updating the database version. This means + // we'll try again during the next update. + module.Warning(filterlistsStaleDataSurvived, filterlistsStaleDataDescr) + return fmt.Errorf("failed to cleanup stale cache records: %w", err) + } } // try to save the highest version of our files. @@ -114,8 +126,9 @@ func performUpdate(ctx context.Context) error { } func removeAllObsoleteFilterEntries(_ context.Context) error { + log.Infof("intel/filterlists: cleanup task started, removing obsolete filter list entries ...") for { - done, err := removeObsoleteFilterEntries(1000) + done, err := removeObsoleteFilterEntries(10000) if err != nil { return err } @@ -127,7 +140,6 @@ func removeAllObsoleteFilterEntries(_ context.Context) error { } func removeObsoleteFilterEntries(batchSize int) (bool, error) { - log.Infof("intel/filterlists: cleanup task started, removing obsolete filter list entries ...") iter, err := cache.Query( query.New(filterListKeyPrefix).Where( diff --git a/updates/main.go b/updates/main.go index f2c9b78c..f8741bf7 100644 --- a/updates/main.go +++ b/updates/main.go @@ -157,9 +157,13 @@ func DisableUpdateSchedule() error { } func checkForUpdates(ctx context.Context) error { + if err := registry.UpdateIndexes(); err != nil { + return fmt.Errorf("updates: failed to update indexes: %w", err) + } + err := registry.DownloadUpdates(ctx) if err != nil { - return fmt.Errorf("updates: failed to update: %s", err) + return fmt.Errorf("updates: failed to update: %w", err) } module.TriggerEvent(ResourceUpdateEvent, nil) return nil From ef560447aeee839f2c2850fafee0923b49caa3b0 Mon Sep 17 00:00:00 2001 From: Patrick Pacher Date: Tue, 14 Apr 2020 
14:05:53 +0200 Subject: [PATCH 6/7] Remove debug message --- profile/profile-layered.go | 1 - 1 file changed, 1 deletion(-) diff --git a/profile/profile-layered.go b/profile/profile-layered.go index 1a91dae6..7cd6f4e4 100644 --- a/profile/profile-layered.go +++ b/profile/profile-layered.go @@ -225,7 +225,6 @@ func (lp *LayeredProfile) MatchFilterLists(entity *intel.Entity) (result endpoin return endpoints.NoMatch, "" } - log.Errorf("number of layers: %d", len(lp.layers)) for _, layer := range lp.layers { if reason := lookupMap.Match(layer.filterListIDs); reason != "" { return endpoints.Denied, reason From 568674bdf84b064dcf089d0b18d009b2b3449e47 Mon Sep 17 00:00:00 2001 From: Patrick Pacher Date: Tue, 14 Apr 2020 14:41:55 +0200 Subject: [PATCH 7/7] Fix updater and add more logging to filterlist --- intel/filterlists/updater.go | 11 ++++++++--- updates/main.go | 3 +++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/intel/filterlists/updater.go b/intel/filterlists/updater.go index 5a4bfffc..3be9144f 100644 --- a/intel/filterlists/updater.go +++ b/intel/filterlists/updater.go @@ -120,13 +120,15 @@ func performUpdate(ctx context.Context) error { highestVersion := upgradables[len(upgradables)-1] if err := setCacheDatabaseVersion(highestVersion.Version()); err != nil { log.Errorf("intel/filterlists: failed to save cache database version: %s", err) + } else { + log.Infof("intel/filterlists: successfully migrated cache database to %s", highestVersion) } return nil } func removeAllObsoleteFilterEntries(_ context.Context) error { - log.Infof("intel/filterlists: cleanup task started, removing obsolete filter list entries ...") + log.Debugf("intel/filterlists: cleanup task started, removing obsolete filter list entries ...") for { done, err := removeObsoleteFilterEntries(10000) if err != nil { @@ -140,7 +142,6 @@ func removeAllObsoleteFilterEntries(_ context.Context) error { } func removeObsoleteFilterEntries(batchSize int) (bool, error) { - iter, err := 
cache.Query( query.New(filterListKeyPrefix).Where( // TODO(ppacher): remember the timestamp we started the last update @@ -192,6 +193,7 @@ func getUpgradableFiles() ([]*updater.File, error) { if err != nil { return nil, err } + log.Tracef("intel/filterlists: base file needs update, selected version %s", baseFile.Version()) updateOrder = append(updateOrder, baseFile) } @@ -203,6 +205,7 @@ func getUpgradableFiles() ([]*updater.File, error) { } if err == nil { + log.Tracef("intel/filterlists: intermediate file needs update, selected version %s", intermediateFile.Version()) updateOrder = append(updateOrder, intermediateFile) } } @@ -215,7 +218,8 @@ func getUpgradableFiles() ([]*updater.File, error) { } if err == nil { - updateOrder = append(updateOrder, intermediateFile) + log.Tracef("intel/filterlists: urgent file needs update, selected version %s", urgentFile.Version()) + updateOrder = append(updateOrder, urgentFile) } } @@ -225,6 +229,7 @@ func getUpgradableFiles() ([]*updater.File, error) { func resolveUpdateOrder(updateOrder []*updater.File) ([]*updater.File, error) { // sort the update order by ascending version sort.Sort(byAscVersion(updateOrder)) + log.Tracef("intel/filterlists: order of updates: %v", updateOrder) var cacheDBVersion *version.Version if !isLoaded() { diff --git a/updates/main.go b/updates/main.go index f8741bf7..3a7fef99 100644 --- a/updates/main.go +++ b/updates/main.go @@ -165,6 +165,9 @@ func checkForUpdates(ctx context.Context) error { if err != nil { return fmt.Errorf("updates: failed to update: %w", err) } + + registry.SelectVersions() + module.TriggerEvent(ResourceUpdateEvent, nil) return nil }