From 519aba15bbbbef6e7565e59bfb794b1c494703ef Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Tue, 12 Dec 2023 16:07:16 +0800 Subject: [PATCH] g3keymess: drop temp key collection --- g3keymess/src/config/store/local.rs | 28 +++++++++++++++++---- g3keymess/src/config/store/mod.rs | 5 ++-- g3keymess/src/config/store/redis.rs | 3 +-- g3keymess/src/store/ops.rs | 39 ++++++----------------------- 4 files changed, 34 insertions(+), 41 deletions(-) diff --git a/g3keymess/src/config/store/local.rs b/g3keymess/src/config/store/local.rs index a7357e8d..e3064a2a 100644 --- a/g3keymess/src/config/store/local.rs +++ b/g3keymess/src/config/store/local.rs @@ -18,6 +18,7 @@ use std::path::{Path, PathBuf}; use anyhow::anyhow; use async_trait::async_trait; +use log::warn; use openssl::pkey::{PKey, Private}; use tokio::sync::oneshot; use yaml_rust::{yaml, Yaml}; @@ -95,11 +96,14 @@ impl KeyStoreConfig for LocalKeyStoreConfig { &self.name } - async fn load_certs(&self) -> anyhow::Result>> { - let mut keys = Vec::with_capacity(128); + async fn load_keys(&self) -> anyhow::Result<()> { + const BATCH_SIZE: usize = 128; + let mut dir = tokio::fs::read_dir(&self.dir_path) .await .map_err(|e| anyhow!("failed to open {}: {e}", self.dir_path.display()))?; + + let mut count = 0; while let Some(entry) = dir .next_entry() .await @@ -112,11 +116,25 @@ impl KeyStoreConfig for LocalKeyStoreConfig { continue; } - if let Some(key) = load_key(entry.path()).await? { - keys.push(key); + let path = entry.path(); + match load_key(&path).await { + Ok(Some(key)) => { + if let Err(e) = crate::store::add_global(key) { + warn!("failed to add key from file {}: {e}", path.display()); + } + } + Ok(None) => {} + Err(e) => { + warn!("failed to load key from file {}: {e}", path.display()); + } + } + + count += 1; + if count >= BATCH_SIZE { + tokio::task::yield_now().await; } } - Ok(keys) + Ok(()) } #[cfg(target_os = "linux")] diff --git a/g3keymess/src/config/store/mod.rs b/g3keymess/src/config/store/mod.rs index 198f93e1..5880f74f 100644 --- a/g3keymess/src/config/store/mod.rs +++ b/g3keymess/src/config/store/mod.rs @@ -18,7 +18,6 @@ use std::path::Path; use anyhow::anyhow; use async_trait::async_trait; -use openssl::pkey::{PKey, Private}; use tokio::sync::oneshot; use yaml_rust::{yaml, Yaml}; @@ -36,7 +35,7 @@ const CONFIG_KEY_STORE_TYPE: &str = "type"; #[async_trait] pub trait KeyStoreConfig { fn name(&self) -> &MetricsName; - async fn load_certs(&self) -> anyhow::Result>>; + async fn load_keys(&self) -> anyhow::Result<()>; fn spawn_subscriber(&self) -> anyhow::Result>> { Ok(None) } @@ -72,7 +71,7 @@ pub enum AnyKeyStoreConfig { impl AnyKeyStoreConfig { impl_transparent0!(name, &MetricsName); - impl_async_transparent0!(load_certs, anyhow::Result>>); + impl_async_transparent0!(load_keys, anyhow::Result<()>); impl_transparent0!( spawn_subscriber, anyhow::Result>> diff --git a/g3keymess/src/config/store/redis.rs b/g3keymess/src/config/store/redis.rs index 22cefc53..28339f82 100644 --- a/g3keymess/src/config/store/redis.rs +++ b/g3keymess/src/config/store/redis.rs @@ -16,7 +16,6 @@ use anyhow::anyhow; use async_trait::async_trait; -use openssl::pkey::{PKey, Private}; use url::Url; use yaml_rust::{yaml, Yaml}; @@ -87,7 +86,7 @@ impl KeyStoreConfig for RedisKeyStoreConfig { &self.name } - async fn load_certs(&self) -> anyhow::Result>> { + async fn load_keys(&self) -> anyhow::Result<()> { unimplemented!() } } diff --git a/g3keymess/src/store/ops.rs b/g3keymess/src/store/ops.rs index a551b0c7..bd81aebe 100644 --- a/g3keymess/src/store/ops.rs +++ b/g3keymess/src/store/ops.rs @@ -15,7 +15,6 @@ */ use std::collections::HashSet; -use std::sync::Arc; use anyhow::Context; use tokio::sync::Mutex; @@ -23,7 +22,6 @@ use tokio::sync::Mutex; use g3_types::metrics::MetricsName; use super::registry; -use crate::config::store::AnyKeyStoreConfig; static KEY_STORE_OPS_LOCK: Mutex<()> = Mutex::const_new(()); @@ -33,7 +31,10 @@ pub async fn load_all() -> anyhow::Result<()> { let all_config = crate::config::store::get_all(); for config in all_config { let name = config.name().clone(); - load_blocked(config.clone()).await?; + config + .load_keys() + .await + .context(format!("failed to load keys for key store {name}"))?; if let Some(sender) = config .spawn_subscriber() @@ -55,7 +56,10 @@ pub async fn reload_all() -> anyhow::Result<()> { for config in all_config { let name = config.name().clone(); new_names.insert(name.clone()); - load_batched(config.clone()).await?; + config + .load_keys() + .await + .context(format!("failed to load keys for key store {name}"))?; if let Some(sender) = config .spawn_subscriber() @@ -73,30 +77,3 @@ pub async fn reload_all() -> anyhow::Result<()> { Ok(()) } - -async fn load_blocked(config: Arc) -> anyhow::Result<()> { - let keys = config.load_certs().await?; - - for key in keys { - super::add_global(key)?; - } - - Ok(()) -} - -async fn load_batched(config: Arc) -> anyhow::Result<()> { - const YIELD_SIZE: usize = 16; - - let keys = config.load_certs().await?; - - let mut next_yield = YIELD_SIZE; - for (i, key) in keys.into_iter().enumerate() { - super::add_global(key)?; - if i > next_yield { - tokio::task::yield_now().await; - next_yield += YIELD_SIZE; - } - } - - Ok(()) -}