Promote partial collection index during initial scan
This commit is contained in:
parent
1a8bf91628
commit
053b684f3a
6 changed files with 90 additions and 48 deletions
|
@ -38,15 +38,27 @@ impl Manager {
|
|||
index: Arc::default(),
|
||||
};
|
||||
|
||||
if let Err(e) = index_manager.try_restore_index().await {
|
||||
error!("Failed to restore index: {}", e);
|
||||
} else {
|
||||
info!("Restored collection index from disk");
|
||||
}
|
||||
match index_manager.try_restore_index().await {
|
||||
Ok(true) => info!("Restored collection index from disk"),
|
||||
Ok(false) => info!("No existing collection index to restore"),
|
||||
Err(e) => error!("Failed to restore collection index: {}", e),
|
||||
};
|
||||
|
||||
Ok(index_manager)
|
||||
}
|
||||
|
||||
pub async fn is_index_empty(&self) -> bool {
|
||||
spawn_blocking({
|
||||
let index_manager = self.clone();
|
||||
move || {
|
||||
let index = index_manager.index.read().unwrap();
|
||||
index.collection.num_songs() == 0
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub async fn replace_index(&mut self, new_index: Index) {
|
||||
spawn_blocking({
|
||||
let index_manager = self.clone();
|
||||
|
@ -72,11 +84,8 @@ impl Manager {
|
|||
|
||||
async fn try_restore_index(&mut self) -> Result<bool, Error> {
|
||||
match tokio::fs::try_exists(&self.index_file_path).await {
|
||||
Ok(false) => {
|
||||
info!("No existing index to restore");
|
||||
return Ok(false);
|
||||
}
|
||||
Ok(true) => (),
|
||||
Ok(false) => return Ok(false),
|
||||
Err(e) => return Err(Error::Io(self.index_file_path.clone(), e)),
|
||||
};
|
||||
|
||||
|
@ -316,6 +325,7 @@ impl Default for Index {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Builder {
|
||||
strings: Rodeo,
|
||||
canon: HashMap<String, Spur>,
|
||||
|
|
|
@ -130,7 +130,7 @@ impl Browser {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Builder {
|
||||
directories: HashMap<PathKey, BTreeSet<storage::File>>,
|
||||
flattened: TrieBuilder<lasso2::Spur>,
|
||||
|
|
|
@ -252,6 +252,10 @@ impl Collection {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn num_songs(&self) -> usize {
|
||||
self.songs.len()
|
||||
}
|
||||
|
||||
pub fn get_song(&self, strings: &RodeoReader, song_key: SongKey) -> Option<Song> {
|
||||
self.songs.get(&song_key).map(|s| fetch_song(strings, s))
|
||||
}
|
||||
|
@ -297,7 +301,7 @@ fn make_genre_header(genre: &storage::Genre, strings: &RodeoReader) -> GenreHead
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Builder {
|
||||
artists: HashMap<ArtistKey, storage::Artist>,
|
||||
albums: HashMap<AlbumKey, storage::Album>,
|
||||
|
|
|
@ -175,7 +175,7 @@ impl Search {
|
|||
|
||||
const NGRAM_SIZE: usize = 2;
|
||||
|
||||
#[derive(Default, Deserialize, Serialize)]
|
||||
#[derive(Clone, Default, Deserialize, Serialize)]
|
||||
struct TextFieldIndex {
|
||||
exact: HashMap<Spur, IntSet<SongKey>>,
|
||||
ngrams: HashMap<[char; NGRAM_SIZE], IntMap<SongKey, Spur>>,
|
||||
|
@ -239,7 +239,7 @@ impl TextFieldIndex {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Deserialize, Serialize)]
|
||||
#[derive(Clone, Default, Deserialize, Serialize)]
|
||||
struct NumberFieldIndex {
|
||||
values: BTreeMap<i64, IntSet<SongKey>>,
|
||||
}
|
||||
|
@ -266,7 +266,7 @@ impl NumberFieldIndex {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Builder {
|
||||
text_fields: EnumMap<TextField, TextFieldIndex>,
|
||||
number_fields: EnumMap<NumberField, NumberFieldIndex>,
|
||||
|
|
|
@ -16,7 +16,7 @@ pub enum File {
|
|||
Song(PathKey),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct Genre {
|
||||
pub name: Spur,
|
||||
pub albums: HashSet<AlbumKey>,
|
||||
|
@ -25,7 +25,7 @@ pub struct Genre {
|
|||
pub songs: Vec<SongKey>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct Artist {
|
||||
pub name: Spur,
|
||||
pub all_albums: HashSet<AlbumKey>,
|
||||
|
|
|
@ -4,10 +4,9 @@ use regex::Regex;
|
|||
use std::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
use std::sync::mpsc::{channel, Sender, TryRecvError};
|
||||
use std::sync::Arc;
|
||||
use std::{cmp::min, time::Duration};
|
||||
use tokio::sync::mpsc::error::TryRecvError;
|
||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
|
||||
use tokio::sync::Notify;
|
||||
use tokio::time::Instant;
|
||||
|
||||
|
@ -102,14 +101,17 @@ impl Scanner {
|
|||
let start = Instant::now();
|
||||
info!("Beginning collection scan");
|
||||
|
||||
let was_empty = self.index_manager.is_index_empty().await;
|
||||
let mut partial_update_time = Instant::now();
|
||||
|
||||
let album_art_pattern = self
|
||||
.settings_manager
|
||||
.get_index_album_art_pattern()
|
||||
.await
|
||||
.ok();
|
||||
|
||||
let (scan_directories_output, mut collection_directories_input) = unbounded_channel();
|
||||
let (scan_songs_output, mut collection_songs_input) = unbounded_channel();
|
||||
let (scan_directories_output, collection_directories_input) = channel();
|
||||
let (scan_songs_output, collection_songs_input) = channel();
|
||||
|
||||
let vfs = self.vfs_manager.get_vfs().await?;
|
||||
|
||||
|
@ -120,6 +122,28 @@ impl Scanner {
|
|||
album_art_pattern,
|
||||
);
|
||||
|
||||
let scan_task = tokio::task::spawn_blocking(|| scan.run());
|
||||
|
||||
let partial_index_notify = Arc::new(tokio::sync::Notify::new());
|
||||
let partial_index_mutex = Arc::new(tokio::sync::Mutex::new(index::Builder::default()));
|
||||
let partial_application_task = tokio::task::spawn({
|
||||
let index_manager = self.index_manager.clone();
|
||||
let partial_index_notify = partial_index_notify.clone();
|
||||
let partial_index_mutex = partial_index_mutex.clone();
|
||||
async move {
|
||||
loop {
|
||||
partial_index_notify.notified().await;
|
||||
let mut partial_index = partial_index_mutex.clone().lock_owned().await;
|
||||
let partial_index =
|
||||
std::mem::replace(&mut *partial_index, index::Builder::new());
|
||||
let partial_index = partial_index.build();
|
||||
let num_songs = partial_index.collection.num_songs();
|
||||
index_manager.clone().replace_index(partial_index).await;
|
||||
info!("Promoted partial collection index ({num_songs} songs)");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let index_task = tokio::task::spawn_blocking(move || {
|
||||
let mut index_builder = index::Builder::default();
|
||||
|
||||
|
@ -148,12 +172,22 @@ impl Scanner {
|
|||
if exhausted_directories && exhausted_songs {
|
||||
break;
|
||||
}
|
||||
|
||||
if was_empty && partial_update_time.elapsed().as_secs() > 5 {
|
||||
if let Ok(mut m) = partial_index_mutex.clone().try_lock_owned() {
|
||||
*m = index_builder.clone();
|
||||
partial_index_notify.notify_one();
|
||||
partial_update_time = Instant::now()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
index_builder.build()
|
||||
});
|
||||
|
||||
let index = tokio::join!(scan.start(), index_task).1?;
|
||||
let index = tokio::join!(scan_task, index_task).1?;
|
||||
partial_application_task.abort();
|
||||
|
||||
self.index_manager.persist_index(&index).await?;
|
||||
self.index_manager.replace_index(index).await;
|
||||
|
||||
|
@ -167,16 +201,16 @@ impl Scanner {
|
|||
}
|
||||
|
||||
struct Scan {
|
||||
directories_output: UnboundedSender<Directory>,
|
||||
songs_output: UnboundedSender<Song>,
|
||||
directories_output: Sender<Directory>,
|
||||
songs_output: Sender<Song>,
|
||||
mounts: Vec<vfs::Mount>,
|
||||
artwork_regex: Option<Regex>,
|
||||
}
|
||||
|
||||
impl Scan {
|
||||
pub fn new(
|
||||
directories_output: UnboundedSender<Directory>,
|
||||
songs_output: UnboundedSender<Song>,
|
||||
directories_output: Sender<Directory>,
|
||||
songs_output: Sender<Song>,
|
||||
mounts: Vec<vfs::Mount>,
|
||||
artwork_regex: Option<Regex>,
|
||||
) -> Self {
|
||||
|
@ -188,7 +222,7 @@ impl Scan {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn start(self) -> Result<(), Error> {
|
||||
pub fn run(self) -> Result<(), Error> {
|
||||
let key = "POLARIS_NUM_TRAVERSER_THREADS";
|
||||
let num_threads = std::env::var_os(key)
|
||||
.map(|v| v.to_string_lossy().to_string())
|
||||
|
@ -226,8 +260,8 @@ fn process_directory<P: AsRef<Path>, Q: AsRef<Path>>(
|
|||
scope: &Scope,
|
||||
real_path: P,
|
||||
virtual_path: Q,
|
||||
directories_output: UnboundedSender<Directory>,
|
||||
songs_output: UnboundedSender<Song>,
|
||||
directories_output: Sender<Directory>,
|
||||
songs_output: Sender<Song>,
|
||||
artwork_regex: Option<Regex>,
|
||||
) {
|
||||
let read_dir = match fs::read_dir(&real_path) {
|
||||
|
@ -329,14 +363,14 @@ fn get_date_created<P: AsRef<Path>>(path: P) -> Option<i64> {
|
|||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::{path::PathBuf, usize};
|
||||
use std::path::PathBuf;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn scan_finds_songs_and_directories() {
|
||||
let (directories_sender, mut directories_receiver) = unbounded_channel();
|
||||
let (songs_sender, mut songs_receiver) = unbounded_channel();
|
||||
let (directories_sender, directories_receiver) = channel();
|
||||
let (songs_sender, songs_receiver) = channel();
|
||||
let mounts = vec![vfs::Mount {
|
||||
source: PathBuf::from_iter(["test-data", "small-collection"]),
|
||||
name: "root".to_string(),
|
||||
|
@ -344,23 +378,19 @@ mod test {
|
|||
let artwork_regex = None;
|
||||
|
||||
let scan = Scan::new(directories_sender, songs_sender, mounts, artwork_regex);
|
||||
scan.start().await.unwrap();
|
||||
scan.run().unwrap();
|
||||
|
||||
let mut directories = vec![];
|
||||
directories_receiver
|
||||
.recv_many(&mut directories, usize::MAX)
|
||||
.await;
|
||||
let directories = directories_receiver.iter().collect::<Vec<_>>();
|
||||
assert_eq!(directories.len(), 6);
|
||||
|
||||
let mut songs = vec![];
|
||||
songs_receiver.recv_many(&mut songs, usize::MAX).await;
|
||||
let songs = songs_receiver.iter().collect::<Vec<_>>();
|
||||
assert_eq!(songs.len(), 13);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn scan_finds_embedded_artwork() {
|
||||
let (directories_sender, _) = unbounded_channel();
|
||||
let (songs_sender, mut songs_receiver) = unbounded_channel();
|
||||
let (directories_sender, _) = channel();
|
||||
let (songs_sender, songs_receiver) = channel();
|
||||
let mounts = vec![vfs::Mount {
|
||||
source: PathBuf::from_iter(["test-data", "small-collection"]),
|
||||
name: "root".to_string(),
|
||||
|
@ -368,10 +398,9 @@ mod test {
|
|||
let artwork_regex = None;
|
||||
|
||||
let scan = Scan::new(directories_sender, songs_sender, mounts, artwork_regex);
|
||||
scan.start().await.unwrap();
|
||||
scan.run().unwrap();
|
||||
|
||||
let mut songs = vec![];
|
||||
songs_receiver.recv_many(&mut songs, usize::MAX).await;
|
||||
let songs = songs_receiver.iter().collect::<Vec<_>>();
|
||||
|
||||
songs
|
||||
.iter()
|
||||
|
@ -383,8 +412,8 @@ mod test {
|
|||
let artwork_path = PathBuf::from_iter(["root", "Khemmis", "Hunted", "Folder.jpg"]);
|
||||
let patterns = vec!["folder", "FOLDER"];
|
||||
for pattern in patterns.into_iter() {
|
||||
let (directories_sender, _) = unbounded_channel();
|
||||
let (songs_sender, mut songs_receiver) = unbounded_channel();
|
||||
let (directories_sender, _) = channel();
|
||||
let (songs_sender, songs_receiver) = channel();
|
||||
let mounts = vec![vfs::Mount {
|
||||
source: PathBuf::from_iter(["test-data", "small-collection"]),
|
||||
name: "root".to_string(),
|
||||
|
@ -392,10 +421,9 @@ mod test {
|
|||
let artwork_regex = Some(Regex::new(pattern).unwrap());
|
||||
|
||||
let scan = Scan::new(directories_sender, songs_sender, mounts, artwork_regex);
|
||||
scan.start().await.unwrap();
|
||||
scan.run().unwrap();
|
||||
|
||||
let mut songs = vec![];
|
||||
songs_receiver.recv_many(&mut songs, usize::MAX).await;
|
||||
let songs = songs_receiver.iter().collect::<Vec<_>>();
|
||||
|
||||
songs
|
||||
.iter()
|
||||
|
|
Loading…
Add table
Reference in a new issue