Indexing perf work

This commit is contained in:
Antoine Gersant 2024-07-31 01:43:13 -07:00
parent 72ec7b260a
commit b4b0e1181f
5 changed files with 110 additions and 38 deletions

1
.gitignore vendored
View file

@ -16,6 +16,7 @@ TestConfig.toml
**/*.sqlite-wal **/*.sqlite-wal
polaris.log polaris.log
polaris.pid polaris.pid
profile.json
/thumbnails /thumbnails
# Release process artifacts (usually runs on CI) # Release process artifacts (usually runs on CI)

View file

@ -2,20 +2,20 @@ use std::{
borrow::BorrowMut, borrow::BorrowMut,
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
hash::{DefaultHasher, Hash, Hasher}, hash::{DefaultHasher, Hash, Hasher},
sync::Arc, sync::{Arc, RwLock},
}; };
use log::{error, info}; use log::{error, info};
use rand::{rngs::ThreadRng, seq::IteratorRandom}; use rand::{rngs::ThreadRng, seq::IteratorRandom};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::RwLock; use tokio::task::spawn_blocking;
use crate::{app::collection, db::DB}; use crate::{app::collection, db::DB};
#[derive(Clone)] #[derive(Clone)]
pub struct IndexManager { pub struct IndexManager {
db: DB, db: DB,
index: Arc<RwLock<Index>>, index: Arc<RwLock<Index>>, // Not a tokio RwLock as we want to do CPU-bound work with Index
} }
impl IndexManager { impl IndexManager {
@ -31,8 +31,15 @@ impl IndexManager {
} }
pub(super) async fn replace_index(&mut self, new_index: Index) { pub(super) async fn replace_index(&mut self, new_index: Index) {
let mut lock = self.index.write().await; spawn_blocking({
*lock = new_index; let index_manager = self.clone();
move || {
let mut lock = index_manager.index.write().unwrap();
*lock = new_index;
}
})
.await
.unwrap()
} }
pub(super) async fn persist_index(&mut self, index: &Index) -> Result<(), collection::Error> { pub(super) async fn persist_index(&mut self, index: &Index) -> Result<(), collection::Error> {
@ -66,53 +73,93 @@ impl IndexManager {
Ok(true) Ok(true)
} }
pub(super) async fn get_songs(&self) -> Vec<collection::Song> {
spawn_blocking({
let index_manager = self.clone();
move || {
let index = index_manager.index.read().unwrap();
index.songs.values().cloned().collect::<Vec<_>>()
}
})
.await
.unwrap()
}
pub async fn get_artist( pub async fn get_artist(
&self, &self,
artist_key: &ArtistKey, artist_key: &ArtistKey,
) -> Result<collection::Artist, collection::Error> { ) -> Result<collection::Artist, collection::Error> {
let index = self.index.read().await; spawn_blocking({
let artist_id = artist_key.into(); let index_manager = self.clone();
index let artist_id = artist_key.into();
.get_artist(artist_id) move || {
.ok_or_else(|| collection::Error::ArtistNotFound) let index = index_manager.index.read().unwrap();
index
.get_artist(artist_id)
.ok_or_else(|| collection::Error::ArtistNotFound)
}
})
.await
.unwrap()
} }
pub async fn get_album( pub async fn get_album(
&self, &self,
album_key: &AlbumKey, album_key: &AlbumKey,
) -> Result<collection::Album, collection::Error> { ) -> Result<collection::Album, collection::Error> {
let index = self.index.read().await; spawn_blocking({
let album_id = album_key.into(); let index_manager = self.clone();
index let album_id = album_key.into();
.get_album(album_id) move || {
.ok_or_else(|| collection::Error::AlbumNotFound) let index = index_manager.index.read().unwrap();
index
.get_album(album_id)
.ok_or_else(|| collection::Error::AlbumNotFound)
}
})
.await
.unwrap()
} }
pub async fn get_random_albums( pub async fn get_random_albums(
&self, &self,
count: usize, count: usize,
) -> Result<Vec<collection::Album>, collection::Error> { ) -> Result<Vec<collection::Album>, collection::Error> {
let index = self.index.read().await; spawn_blocking({
Ok(index let index_manager = self.clone();
.albums move || {
.keys() let index = index_manager.index.read().unwrap();
.choose_multiple(&mut ThreadRng::default(), count) Ok(index
.into_iter() .albums
.filter_map(|k| index.get_album(*k)) .keys()
.collect()) .choose_multiple(&mut ThreadRng::default(), count)
.into_iter()
.filter_map(|k| index.get_album(*k))
.collect())
}
})
.await
.unwrap()
} }
pub async fn get_recent_albums( pub async fn get_recent_albums(
&self, &self,
count: usize, count: usize,
) -> Result<Vec<collection::Album>, collection::Error> { ) -> Result<Vec<collection::Album>, collection::Error> {
let index = self.index.read().await; spawn_blocking({
Ok(index let index_manager = self.clone();
.recent_albums move || {
.iter() let index = index_manager.index.read().unwrap();
.take(count) Ok(index
.filter_map(|k| index.get_album(*k)) .recent_albums
.collect()) .iter()
.take(count)
.filter_map(|k| index.get_album(*k))
.collect())
}
})
.await
.unwrap()
} }
} }

View file

@ -64,6 +64,9 @@ where
} }
pub async fn flush(&mut self) { pub async fn flush(&mut self) {
if self.new_entries.is_empty() {
return;
}
let Ok(connection) = self.db.connect().await else { let Ok(connection) = self.db.connect().await else {
error!("Could not acquire connection to insert new entries in database"); error!("Could not acquire connection to insert new entries in database");
return; return;

View file

@ -43,7 +43,7 @@ impl Scanner {
let num_threads = std::env::var_os(key) let num_threads = std::env::var_os(key)
.map(|v| v.to_string_lossy().to_string()) .map(|v| v.to_string_lossy().to_string())
.and_then(|v| usize::from_str(&v).ok()) .and_then(|v| usize::from_str(&v).ok())
.unwrap_or_else(|| min(num_cpus::get(), 4)); .unwrap_or_else(|| min(num_cpus::get(), 8));
info!("Browsing collection using {} threads", num_threads); info!("Browsing collection using {} threads", num_threads);
let directories_output = self.directories_output.clone(); let directories_output = self.directories_output.clone();

View file

@ -78,9 +78,6 @@ impl Updater {
let start = Instant::now(); let start = Instant::now();
info!("Beginning collection scan"); info!("Beginning collection scan");
let cleaner = Cleaner::new(self.db.clone(), self.vfs_manager.clone());
cleaner.clean().await?;
let album_art_pattern = self let album_art_pattern = self
.settings_manager .settings_manager
.get_index_album_art_pattern() .get_index_album_art_pattern()
@ -97,7 +94,6 @@ impl Updater {
album_art_pattern, album_art_pattern,
); );
let mut song_inserter = Inserter::<Song>::new(self.db.clone());
let mut directory_inserter = Inserter::<Directory>::new(self.db.clone()); let mut directory_inserter = Inserter::<Directory>::new(self.db.clone());
let directory_task = tokio::spawn(async move { let directory_task = tokio::spawn(async move {
@ -132,13 +128,11 @@ impl Updater {
0 => break, 0 => break,
_ => { _ => {
for song in buffer.drain(0..) { for song in buffer.drain(0..) {
index_builder.add_song(song.clone()); index_builder.add_song(song);
song_inserter.insert(song).await;
} }
} }
} }
} }
song_inserter.flush().await;
index_builder.build() index_builder.build()
}); });
@ -151,6 +145,33 @@ impl Updater {
start.elapsed().as_millis() as f32 / 1000.0 start.elapsed().as_millis() as f32 / 1000.0
); );
let start = Instant::now();
info!("Beginning collection DB update");
tokio::task::spawn({
let db = self.db.clone();
let vfs_manager = self.vfs_manager.clone();
let index_manager = self.index_manager.clone();
async move {
let cleaner = Cleaner::new(db.clone(), vfs_manager);
if let Err(e) = cleaner.clean().await {
error!("Error while cleaning up database: {}", e);
}
let mut song_inserter = Inserter::<Song>::new(db.clone());
for song in index_manager.get_songs().await {
song_inserter.insert(song).await;
}
song_inserter.flush().await;
}
})
.await?;
info!(
"Collection DB update took {} seconds",
start.elapsed().as_millis() as f32 / 1000.0
);
Ok(()) Ok(())
} }
} }