From 462b070a1b9a76e85151cfb98012fcae40967e53 Mon Sep 17 00:00:00 2001 From: Lowder Date: Tue, 10 Mar 2026 22:24:30 +0500 Subject: [PATCH] fix: lists swapping (#14) --- Cargo.lock | 12 +++++++- querying/Cargo.toml | 1 + querying/src/geoip.rs | 6 ++++ querying/src/lib.rs | 71 +++++++++++++++++++++++++++++-------------- querying/src/lists.rs | 12 ++++++++ website/Cargo.toml | 2 +- 6 files changed, 79 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 916bdc8..da040c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -88,6 +88,15 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +[[package]] +name = "arc-swap" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9f3647c145568cec02c42054e07bdf9a5a698e15b466fb2341bfc393cd24aa5" +dependencies = [ + "rustversion", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -2348,6 +2357,7 @@ dependencies = [ name = "querying" version = "0.1.0" dependencies = [ + "arc-swap", "async-trait", "chrono", "csv", @@ -4163,7 +4173,7 @@ dependencies = [ [[package]] name = "website" -version = "0.4.2" +version = "0.4.3" dependencies = [ "dotenvy", "env_logger", diff --git a/querying/Cargo.toml b/querying/Cargo.toml index 6128384..54b2054 100644 --- a/querying/Cargo.toml +++ b/querying/Cargo.toml @@ -18,6 +18,7 @@ hickory-resolver = { version = "0.26.0-alpha.1", features = ["tokio", "webpki-ro thiserror = "2.0.18" url = "2.5.8" indicatif = "0.18.4" +arc-swap = "1.7" chrono = { version = "0.4.44", features = ["serde"] } futures-util = "0.3.32" serde_json = "1.0" diff --git a/querying/src/geoip.rs b/querying/src/geoip.rs index e29567c..4d0d876 100644 --- a/querying/src/geoip.rs +++ b/querying/src/geoip.rs @@ -43,6 +43,12 @@ impl GeoIp { } } + pub fn load(asn: Vec, country: Vec, city: Vec) -> Result { + let mut geoip = Self::new(); + geoip.update(asn, country, city)?; + Ok(geoip) + } + pub fn update(&mut self, asn: Vec, country: Vec, city: Vec) -> Result<(), MaxMindDbError> { self.asn = Some(maxminddb::Reader::from_source(asn)?); self.country = Some(maxminddb::Reader::from_source(country)?); diff --git a/querying/src/lib.rs b/querying/src/lib.rs index d9f705b..10238e1 100644 --- a/querying/src/lib.rs +++ b/querying/src/lib.rs @@ -12,7 +12,8 @@ use std::net::IpAddr; use std::sync::Arc; use maxminddb::MaxMindDbError; use thiserror::Error; -use tokio::sync::{watch, RwLock}; +use tokio::sync::watch; +use arc_swap::ArcSwap; pub mod asn; pub mod geoip; @@ -27,9 +28,9 @@ pub use subnet_sampler::{sample_ipv4_subnet, sample_ipv6_subnet}; pub struct Checker { rx: watch::Receiver>>, tx: watch::Sender>>, - cdn_list: Arc>, - ru_blacklist: Arc>, - geo_ip: Arc>, + cdn_list: ArcSwap, + ru_blacklist: ArcSwap, + geo_ip: ArcSwap, resolver: Resolver, } @@ -39,7 +40,7 @@ pub struct Check { pub geo: IpInfo, pub ips: Vec, pub rkn_subnets: HashSet, - pub asn_info: Option, + pub asn_info: Option, } #[derive(Clone)] @@ -70,15 +71,15 @@ impl Checker { Checker { rx, tx, - cdn_list: Arc::new(RwLock::new(CdnList::new())), - ru_blacklist: Arc::new(RwLock::new(RuBlacklist::new())), - geo_ip: Arc::new(RwLock::new(GeoIp::new())), + cdn_list: ArcSwap::from_pointee(CdnList::new()), + ru_blacklist: ArcSwap::from_pointee(RuBlacklist::new()), + geo_ip: ArcSwap::from_pointee(GeoIp::new()), resolver: Resolver::new().await, } } pub async fn geo_ip(&self, ip: IpAddr) -> Result { - self.geo_ip.read().await.lookup(ip) + self.geo_ip.load().lookup(ip) } pub async fn check(&self, target: Target) -> Result { @@ -92,7 +93,7 @@ impl Checker { return Err(CheckError::ResolveError(e)); }, }; - let geo_ip = self.geo_ip.read().await; + let geo_ip = self.geo_ip.load(); let geo = match ips.get(0).map(|ip| geo_ip.lookup(ip.clone())) { None => IpInfo::default(), Some(Ok(ip)) => ip, @@ -103,7 +104,7 @@ impl Checker { }; let mut cdn_provider_subnets: HashMap> = HashMap::new(); - let cdn_list = self.cdn_list.read().await; + let cdn_list = self.cdn_list.load(); ips.iter() .filter_map(|ip| cdn_list.contains(ip)) .map(|ip| (match &ip.region { @@ -114,7 +115,7 @@ impl Checker { cdn_provider_subnets.entry(k).or_default().insert(v); }); - let ru_blacklist = self.ru_blacklist.read().await; + let ru_blacklist = self.ru_blacklist.load(); let domain = match &target { Target::Domain(domain) => ru_blacklist.contains_domain(domain), _ => None @@ -125,7 +126,7 @@ impl Checker { .collect(); let asn_info = if let Target::Asn(asn) = &target { - let prefixes = crate::asn::fetch_asn_prefixes_cached( + let prefixes = asn::fetch_asn_prefixes_cached( *asn, |asn| self.resolver.asn_cache.get_cached_asn(asn), |asn, prefixes| self.resolver.asn_cache.cache_asn(asn, prefixes), @@ -155,7 +156,7 @@ impl Checker { } } - Some(crate::asn::AsnInfo::new(*asn, prefixes, blocked_prefixes)) + Some(asn::AsnInfo::new(*asn, prefixes, blocked_prefixes)) } else { None }; @@ -189,26 +190,50 @@ impl Checker { Ok((GeoIp::download().await?, RuBlacklist::download().await?, CdnList::download().await?)) } - pub async fn update_all(&self, (geo_ip, ru_blacklist, cdn_list): Bases) { - if let Err(e) = self.geo_ip.write().await.install(geo_ip).await { - error!("Failed to update GeoIP: {}", e); + pub async fn update_all(&self, (geo_ip_base, ru_blacklist_base, cdn_list_base): Bases) { + let geo_ip = match GeoIp::load(geo_ip_base.0, geo_ip_base.1, geo_ip_base.2) { + Ok(geoip) => Some(geoip), + Err(e) => { + error!("Failed to load GeoIP: {}", e); + None + } + }; + + let ru_blacklist = match RuBlacklist::load(ru_blacklist_base.0, ru_blacklist_base.1, ru_blacklist_base.2) { + Ok(ru_blacklist) => Some(ru_blacklist), + Err(e) => { + error!("Failed to load RKN: {}", e); + None + } + }; + + let cdn_list = match CdnList::load(cdn_list_base) { + Ok(cdn_list) => Some(cdn_list), + Err(e) => { + error!("Failed to load CDN: {}", e); + None + } + }; + + if let Some(geo_ip) = geo_ip { + self.geo_ip.store(Arc::new(geo_ip)); } - if let Err(e) = self.ru_blacklist.write().await.install(ru_blacklist).await { - error!("Failed to update RKN: {}", e); + if let Some(ru_blacklist) = ru_blacklist { + self.ru_blacklist.store(Arc::new(ru_blacklist)); } - if let Err(e) = self.cdn_list.write().await.install(cdn_list).await { - error!("Failed to update CDN: {}", e); + if let Some(cdn_list) = cdn_list { + self.cdn_list.store(Arc::new(cdn_list)); } self.tx.send(Some(Utc::now())).unwrap(); } pub async fn total_domains(&self) -> usize { - self.ru_blacklist.read().await.domain_count + self.ru_blacklist.load().domain_count } pub async fn total_v4s(&self) -> usize { - (self.cdn_list.read().await.v4_count() + self.ru_blacklist.read().await.v4_count()) as usize + (self.cdn_list.load().v4_count() + self.ru_blacklist.load().v4_count()) as usize } } diff --git a/querying/src/lists.rs b/querying/src/lists.rs index 299d92c..223e6a9 100644 --- a/querying/src/lists.rs +++ b/querying/src/lists.rs @@ -44,6 +44,12 @@ impl CdnList { CdnList { trie: IpnetTrie::new() } } + pub fn load(list_reader: R) -> Result { + let mut list = Self::new(); + list.update(list_reader)?; + Ok(list) + } + pub fn update(&mut self, list_reader: R) -> Result<(), Error> { let mut trie = IpnetTrie::new(); let mut rdr = csv::Reader::from_reader(list_reader); @@ -97,6 +103,12 @@ impl RuBlacklist { } } + pub fn load(ip_reader: R, domain_reader: R, custom_domains_reader: R) -> Result { + let mut list = Self::new(); + list.update(ip_reader, domain_reader, custom_domains_reader)?; + Ok(list) + } + pub fn update(&mut self, ip_reader: R, domain_reader: R, custom_domains_reader: R) -> Result<(), Error> { let mut ip_trie = IpnetTrie::new(); for net in ip_reader.lines() { diff --git a/website/Cargo.toml b/website/Cargo.toml index 16d316c..b3cefff 100644 --- a/website/Cargo.toml +++ b/website/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "website" -version = "0.4.2" +version = "0.4.3" edition = "2024" [dependencies]