use log::{error, info}; use notify::{RecommendedWatcher, Watcher}; use notify_debouncer_full::{Debouncer, FileIdMap}; use rayon::{Scope, ThreadPoolBuilder}; use regex::Regex; use std::fs; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::mpsc::{channel, Sender, TryRecvError}; use std::sync::Arc; use std::time::SystemTime; use std::{cmp::min, time::Duration}; use tokio::sync::mpsc::unbounded_channel; use tokio::sync::{Notify, RwLock}; use tokio::task::JoinSet; use tokio::time::Instant; use crate::app::{config, formats, index, Error}; #[derive(Debug, PartialEq, Eq)] pub struct Directory { pub virtual_path: PathBuf, } #[derive(Debug, Default, PartialEq, Eq)] pub struct Song { pub real_path: PathBuf, pub virtual_path: PathBuf, pub track_number: Option, pub disc_number: Option, pub title: Option, pub artists: Vec, pub album_artists: Vec, pub year: Option, pub album: Option, pub artwork: Option, pub duration: Option, pub lyricists: Vec, pub composers: Vec, pub genres: Vec, pub labels: Vec, pub date_added: i64, } #[derive(Clone, Default)] pub enum State { #[default] Initial, Pending, InProgress, UpToDate, } #[derive(Clone)] struct Parameters { artwork_regex: Option, mount_dirs: Vec, } impl PartialEq for Parameters { fn eq(&self, other: &Self) -> bool { self.artwork_regex.as_ref().map(|r| r.as_str()) == other.artwork_regex.as_ref().map(|r| r.as_str()) && self.mount_dirs == other.mount_dirs } } #[derive(Clone, Default)] pub struct Status { pub state: State, pub last_start_time: Option, pub last_end_time: Option, pub num_songs_indexed: u32, } #[derive(Clone)] pub struct Scanner { index_manager: index::Manager, config_manager: config::Manager, file_watcher: Arc>>>, on_file_change: Arc, pending_scan: Arc, status: Arc>, parameters: Arc>>, } impl Scanner { pub async fn new( index_manager: index::Manager, config_manager: config::Manager, ) -> Result { let scanner = Self { index_manager, config_manager: config_manager.clone(), file_watcher: Arc::default(), on_file_change: Arc::default(), pending_scan: Arc::new(Notify::new()), status: Arc::new(RwLock::new(Status::default())), parameters: Arc::default(), }; let abort_scan = Arc::new(Notify::new()); tokio::spawn({ let scanner = scanner.clone(); let abort_scan = abort_scan.clone(); async move { loop { scanner.wait_for_change().await; abort_scan.notify_waiters(); scanner.status.write().await.state = State::Pending; while tokio::time::timeout(Duration::from_secs(2), scanner.wait_for_change()) .await .is_ok() {} scanner.pending_scan.notify_waiters(); } } }); tokio::spawn({ let scanner = scanner.clone(); async move { loop { scanner.pending_scan.notified().await; tokio::select! { result = scanner.run_scan() => { if let Err(e) = result { error!("Error while updating index: {e}"); } } _ = abort_scan.notified() => { info!("Interrupted index update"); } }; } } }); Ok(scanner) } async fn setup_file_watcher( config_manager: &config::Manager, on_file_changed: Arc, ) -> Result, Error> { let mut debouncer = notify_debouncer_full::new_debouncer(Duration::from_millis(100), None, move |_| { on_file_changed.notify_waiters(); })?; let mount_dirs = config_manager.get_mounts().await; for mount_dir in &mount_dirs { if let Err(e) = debouncer .watcher() .watch(&mount_dir.source, notify::RecursiveMode::Recursive) { error!("Failed to setup file watcher for `{mount_dir:#?}`: {e}"); } } Ok(debouncer) } async fn wait_for_change(&self) { tokio::select! { _ = async { loop { self.config_manager.on_config_change().await; if *self.parameters.read().await == Some(self.read_parameters().await) { continue; } break; } } => {}, _ = self.on_file_change.notified() => {}, } } async fn read_parameters(&self) -> Parameters { let album_art_pattern = self.config_manager.get_index_album_art_pattern().await; let artwork_regex = Regex::new(&format!("(?i){}", &album_art_pattern)).ok(); Parameters { artwork_regex, mount_dirs: self.config_manager.get_mounts().await, } } pub async fn get_status(&self) -> Status { self.status.read().await.clone() } pub fn queue_scan(&self) { self.pending_scan.notify_one(); } pub fn try_trigger_scan(&self) { self.pending_scan.notify_waiters(); } pub async fn run_scan(&self) -> Result<(), Error> { info!("Beginning collection scan"); let start = Instant::now(); { let mut status = self.status.write().await; status.last_start_time = Some(SystemTime::now()); status.state = State::InProgress; status.num_songs_indexed = 0; } let was_empty = self.index_manager.is_index_empty().await; let mut partial_update_time = Instant::now(); let new_parameters = self.read_parameters().await; *self.parameters.write().await = Some(new_parameters.clone()); let (scan_directories_output, collection_directories_input) = channel(); let (scan_songs_output, collection_songs_input) = channel(); let scan = Scan::new(scan_directories_output, scan_songs_output, new_parameters); let mut scan_task_set = JoinSet::new(); let mut index_task_set = JoinSet::new(); let mut watch_task_set = JoinSet::>::new(); let mut secondary_task_set = JoinSet::new(); scan_task_set.spawn_blocking(|| scan.run()); watch_task_set.spawn({ let scanner = self.clone(); let config_manager = self.config_manager.clone(); async move { let mut watcher = scanner.file_watcher.write().await; *watcher = None; // Drops previous watcher *watcher = Some( Self::setup_file_watcher(&config_manager, scanner.on_file_change.clone()) .await?, ); Ok(()) } }); let partial_index_notify = Arc::new(tokio::sync::Notify::new()); let partial_index_mutex = Arc::new(tokio::sync::Mutex::new(index::Builder::default())); secondary_task_set.spawn({ let index_manager = self.index_manager.clone(); let partial_index_notify = partial_index_notify.clone(); let partial_index_mutex = partial_index_mutex.clone(); async move { loop { partial_index_notify.notified().await; let mut partial_index = partial_index_mutex.clone().lock_owned().await; let partial_index = std::mem::replace(&mut *partial_index, index::Builder::new()); let partial_index = partial_index.build(); let num_songs = partial_index.collection.num_songs(); index_manager.clone().replace_index(partial_index).await; info!("Promoted partial collection index ({num_songs} songs)"); } } }); let (status_sender, mut status_receiver) = unbounded_channel(); secondary_task_set.spawn({ let manager = self.clone(); async move { loop { match status_receiver.recv().await { Some(n) => { manager.status.write().await.num_songs_indexed = n; } None => break, } } } }); index_task_set.spawn_blocking(move || { let mut index_builder = index::Builder::default(); let mut num_songs_scanned = 0; loop { let exhausted_songs = match collection_songs_input.try_recv() { Ok(song) => { index_builder.add_song(song); num_songs_scanned += 1; status_sender.send(num_songs_scanned).ok(); false } Err(TryRecvError::Empty) => { std::thread::sleep(Duration::from_millis(1)); false } Err(TryRecvError::Disconnected) => true, }; let exhausted_directories = match collection_directories_input.try_recv() { Ok(directory) => { index_builder.add_directory(directory); false } Err(TryRecvError::Empty) => false, Err(TryRecvError::Disconnected) => true, }; if exhausted_directories && exhausted_songs { break; } if was_empty && partial_update_time.elapsed().as_secs() > 5 { if let Ok(mut m) = partial_index_mutex.clone().try_lock_owned() { *m = index_builder.clone(); partial_index_notify.notify_one(); partial_update_time = Instant::now() } } } index_builder.build() }); scan_task_set.join_next().await.unwrap()??; watch_task_set.join_next().await.unwrap()??; let index = index_task_set.join_next().await.unwrap()?; secondary_task_set.abort_all(); self.index_manager.persist_index(&index).await?; self.index_manager.replace_index(index).await; { let mut status = self.status.write().await; status.state = State::UpToDate; status.last_end_time = Some(SystemTime::now()); } info!( "Collection scan took {} seconds", start.elapsed().as_millis() as f32 / 1000.0 ); Ok(()) } } struct Scan { directories_output: Sender, songs_output: Sender, parameters: Parameters, } impl Scan { pub fn new( directories_output: Sender, songs_output: Sender, parameters: Parameters, ) -> Self { Self { directories_output, songs_output, parameters, } } pub fn run(self) -> Result<(), Error> { let key = "POLARIS_NUM_TRAVERSER_THREADS"; let num_threads = std::env::var_os(key) .map(|v| v.to_string_lossy().to_string()) .and_then(|v| usize::from_str(&v).ok()) .unwrap_or_else(|| min(num_cpus::get(), 8)); info!("Browsing collection using {} threads", num_threads); let directories_output = self.directories_output.clone(); let songs_output = self.songs_output.clone(); let artwork_regex = self.parameters.artwork_regex.clone(); let thread_pool = ThreadPoolBuilder::new().num_threads(num_threads).build()?; thread_pool.scope({ |scope| { for mount in self.parameters.mount_dirs { scope.spawn(|scope| { process_directory( scope, mount.source, mount.name, directories_output.clone(), songs_output.clone(), artwork_regex.clone(), ); }); } } }); Ok(()) } } fn process_directory, Q: AsRef>( scope: &Scope, real_path: P, virtual_path: Q, directories_output: Sender, songs_output: Sender, artwork_regex: Option, ) { let read_dir = match fs::read_dir(&real_path) { Ok(read_dir) => read_dir, Err(e) => { error!( "Directory read error for `{}`: {}", real_path.as_ref().display(), e ); return; } }; let mut songs = vec![]; let mut artwork_file = None; for entry in read_dir { let entry = match entry { Ok(e) => e, Err(e) => { error!( "File read error within `{}`: {}", real_path.as_ref().display(), e ); continue; } }; let is_dir = match entry.file_type().map(|f| f.is_dir()) { Ok(d) => d, Err(e) => { error!( "Could not determine file type for `{}`: {}", entry.path().to_string_lossy(), e ); continue; } }; let name = entry.file_name(); let entry_real_path = real_path.as_ref().join(&name); let entry_virtual_path = virtual_path.as_ref().join(&name); if is_dir { scope.spawn({ let directories_output = directories_output.clone(); let songs_output = songs_output.clone(); let artwork_regex = artwork_regex.clone(); |scope| { process_directory( scope, entry_real_path, entry_virtual_path, directories_output, songs_output, artwork_regex, ); } }); } else if let Some(metadata) = formats::read_metadata(&entry_real_path) { songs.push(Song { real_path: entry_real_path.clone(), virtual_path: entry_virtual_path.clone(), track_number: metadata.track_number.map(|n| n as i64), disc_number: metadata.disc_number.map(|n| n as i64), title: metadata.title, artists: metadata.artists, album_artists: metadata.album_artists, year: metadata.year.map(|n| n as i64), album: metadata.album, artwork: metadata.has_artwork.then(|| entry_virtual_path.clone()), duration: metadata.duration.map(|n| n as i64), lyricists: metadata.lyricists, composers: metadata.composers, genres: metadata.genres, labels: metadata.labels, date_added: get_date_created(&entry_real_path).unwrap_or_default(), }); } else if artwork_file.is_none() && artwork_regex .as_ref() .is_some_and(|r| r.is_match(name.to_str().unwrap_or_default())) { artwork_file = Some(entry_virtual_path); } } for mut song in songs { song.artwork = song.artwork.or_else(|| artwork_file.clone()); songs_output.send(song).ok(); } directories_output .send(Directory { virtual_path: virtual_path.as_ref().to_owned(), }) .ok(); } fn get_date_created>(path: P) -> Option { if let Ok(t) = fs::metadata(path).and_then(|m| m.created().or_else(|_| m.modified())) { t.duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_secs() as i64) .ok() } else { None } } #[cfg(test)] mod test { use std::path::PathBuf; use crate::app::test::{self}; use crate::test_name; use super::*; #[tokio::test] async fn scan_finds_songs_and_directories() { let (directories_sender, directories_receiver) = channel(); let (songs_sender, songs_receiver) = channel(); let parameters = Parameters { artwork_regex: None, mount_dirs: vec![config::MountDir { source: ["test-data", "small-collection"].iter().collect(), name: "root".to_owned(), }], }; let scan = Scan::new(directories_sender, songs_sender, parameters); scan.run().unwrap(); let directories = directories_receiver.iter().collect::>(); assert_eq!(directories.len(), 6); let songs = songs_receiver.iter().collect::>(); assert_eq!(songs.len(), 13); } #[tokio::test] async fn scan_finds_embedded_artwork() { let (directories_sender, _) = channel(); let (songs_sender, songs_receiver) = channel(); let parameters = Parameters { artwork_regex: None, mount_dirs: vec![config::MountDir { source: ["test-data", "small-collection"].iter().collect(), name: "root".to_owned(), }], }; let scan = Scan::new(directories_sender, songs_sender, parameters); scan.run().unwrap(); let songs = songs_receiver.iter().collect::>(); songs .iter() .any(|s| s.artwork.as_ref() == Some(&s.virtual_path)); } #[tokio::test] async fn album_art_pattern_is_case_insensitive() { let artwork_path = PathBuf::from_iter(["root", "Khemmis", "Hunted", "Folder.jpg"]); let patterns = vec!["folder", "FOLDER"]; for pattern in patterns.into_iter() { let (directories_sender, _) = channel(); let (songs_sender, songs_receiver) = channel(); let parameters = Parameters { artwork_regex: Some(Regex::new(pattern).unwrap()), mount_dirs: vec![config::MountDir { source: ["test-data", "small-collection"].iter().collect(), name: "root".to_owned(), }], }; let scan = Scan::new(directories_sender, songs_sender, parameters); scan.run().unwrap(); let songs = songs_receiver.iter().collect::>(); songs .iter() .any(|s| s.artwork.as_ref() == Some(&artwork_path)); } } #[tokio::test] async fn scanner_reacts_to_config_changes() { let ctx = test::ContextBuilder::new(test_name!()).build().await; assert!(ctx.index_manager.is_index_empty().await); ctx.config_manager .set_mounts(vec![config::storage::MountDir { source: ["test-data", "small-collection"].iter().collect(), name: "root".to_owned(), }]) .await .unwrap(); tokio::time::timeout(Duration::from_secs(10), async { loop { tokio::time::sleep(Duration::from_millis(100)).await; if !ctx.index_manager.is_index_empty().await { break; } } }) .await .expect("Index did not populate"); } }