mirror of
https://github.com/bytedance/g3.git
synced 2026-05-20 17:52:05 +00:00
g3keymess: drop temp key collection
This commit is contained in:
parent
e8944483fe
commit
519aba15bb
4 changed files with 34 additions and 41 deletions
|
|
@ -18,6 +18,7 @@ use std::path::{Path, PathBuf};
|
|||
|
||||
use anyhow::anyhow;
|
||||
use async_trait::async_trait;
|
||||
use log::warn;
|
||||
use openssl::pkey::{PKey, Private};
|
||||
use tokio::sync::oneshot;
|
||||
use yaml_rust::{yaml, Yaml};
|
||||
|
|
@ -95,11 +96,14 @@ impl KeyStoreConfig for LocalKeyStoreConfig {
|
|||
&self.name
|
||||
}
|
||||
|
||||
async fn load_certs(&self) -> anyhow::Result<Vec<PKey<Private>>> {
|
||||
let mut keys = Vec::with_capacity(128);
|
||||
async fn load_keys(&self) -> anyhow::Result<()> {
|
||||
const BATCH_SIZE: usize = 128;
|
||||
|
||||
let mut dir = tokio::fs::read_dir(&self.dir_path)
|
||||
.await
|
||||
.map_err(|e| anyhow!("failed to open {}: {e}", self.dir_path.display()))?;
|
||||
|
||||
let mut count = 0;
|
||||
while let Some(entry) = dir
|
||||
.next_entry()
|
||||
.await
|
||||
|
|
@ -112,11 +116,25 @@ impl KeyStoreConfig for LocalKeyStoreConfig {
|
|||
continue;
|
||||
}
|
||||
|
||||
if let Some(key) = load_key(entry.path()).await? {
|
||||
keys.push(key);
|
||||
let path = entry.path();
|
||||
match load_key(&path).await {
|
||||
Ok(Some(key)) => {
|
||||
if let Err(e) = crate::store::add_global(key) {
|
||||
warn!("failed to add key from file {}: {e}", path.display());
|
||||
}
|
||||
}
|
||||
Ok(None) => {}
|
||||
Err(e) => {
|
||||
warn!("failed to load key from file {}: {e}", path.display());
|
||||
}
|
||||
}
|
||||
|
||||
count += 1;
|
||||
if count >= BATCH_SIZE {
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
}
|
||||
Ok(keys)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ use std::path::Path;
|
|||
|
||||
use anyhow::anyhow;
|
||||
use async_trait::async_trait;
|
||||
use openssl::pkey::{PKey, Private};
|
||||
use tokio::sync::oneshot;
|
||||
use yaml_rust::{yaml, Yaml};
|
||||
|
||||
|
|
@ -36,7 +35,7 @@ const CONFIG_KEY_STORE_TYPE: &str = "type";
|
|||
#[async_trait]
|
||||
pub trait KeyStoreConfig {
|
||||
fn name(&self) -> &MetricsName;
|
||||
async fn load_certs(&self) -> anyhow::Result<Vec<PKey<Private>>>;
|
||||
async fn load_keys(&self) -> anyhow::Result<()>;
|
||||
fn spawn_subscriber(&self) -> anyhow::Result<Option<oneshot::Sender<()>>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
|
@ -72,7 +71,7 @@ pub enum AnyKeyStoreConfig {
|
|||
|
||||
impl AnyKeyStoreConfig {
|
||||
impl_transparent0!(name, &MetricsName);
|
||||
impl_async_transparent0!(load_certs, anyhow::Result<Vec<PKey<Private>>>);
|
||||
impl_async_transparent0!(load_keys, anyhow::Result<()>);
|
||||
impl_transparent0!(
|
||||
spawn_subscriber,
|
||||
anyhow::Result<Option<oneshot::Sender<()>>>
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@
|
|||
|
||||
use anyhow::anyhow;
|
||||
use async_trait::async_trait;
|
||||
use openssl::pkey::{PKey, Private};
|
||||
use url::Url;
|
||||
use yaml_rust::{yaml, Yaml};
|
||||
|
||||
|
|
@ -87,7 +86,7 @@ impl KeyStoreConfig for RedisKeyStoreConfig {
|
|||
&self.name
|
||||
}
|
||||
|
||||
async fn load_certs(&self) -> anyhow::Result<Vec<PKey<Private>>> {
|
||||
async fn load_keys(&self) -> anyhow::Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@
|
|||
*/
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use tokio::sync::Mutex;
|
||||
|
|
@ -23,7 +22,6 @@ use tokio::sync::Mutex;
|
|||
use g3_types::metrics::MetricsName;
|
||||
|
||||
use super::registry;
|
||||
use crate::config::store::AnyKeyStoreConfig;
|
||||
|
||||
static KEY_STORE_OPS_LOCK: Mutex<()> = Mutex::const_new(());
|
||||
|
||||
|
|
@ -33,7 +31,10 @@ pub async fn load_all() -> anyhow::Result<()> {
|
|||
let all_config = crate::config::store::get_all();
|
||||
for config in all_config {
|
||||
let name = config.name().clone();
|
||||
load_blocked(config.clone()).await?;
|
||||
config
|
||||
.load_keys()
|
||||
.await
|
||||
.context(format!("failed to load keys for key store {name}"))?;
|
||||
|
||||
if let Some(sender) = config
|
||||
.spawn_subscriber()
|
||||
|
|
@ -55,7 +56,10 @@ pub async fn reload_all() -> anyhow::Result<()> {
|
|||
for config in all_config {
|
||||
let name = config.name().clone();
|
||||
new_names.insert(name.clone());
|
||||
load_batched(config.clone()).await?;
|
||||
config
|
||||
.load_keys()
|
||||
.await
|
||||
.context(format!("failed to load keys for key store {name}"))?;
|
||||
|
||||
if let Some(sender) = config
|
||||
.spawn_subscriber()
|
||||
|
|
@ -73,30 +77,3 @@ pub async fn reload_all() -> anyhow::Result<()> {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn load_blocked(config: Arc<AnyKeyStoreConfig>) -> anyhow::Result<()> {
|
||||
let keys = config.load_certs().await?;
|
||||
|
||||
for key in keys {
|
||||
super::add_global(key)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn load_batched(config: Arc<AnyKeyStoreConfig>) -> anyhow::Result<()> {
|
||||
const YIELD_SIZE: usize = 16;
|
||||
|
||||
let keys = config.load_certs().await?;
|
||||
|
||||
let mut next_yield = YIELD_SIZE;
|
||||
for (i, key) in keys.into_iter().enumerate() {
|
||||
super::add_global(key)?;
|
||||
if i > next_yield {
|
||||
tokio::task::yield_now().await;
|
||||
next_yield += YIELD_SIZE;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue