diff --git a/src/index/mod.rs b/src/index/mod.rs index 565b89e..dcd146b 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -1,148 +1,114 @@ use anyhow::*; -use core::ops::Deref; use diesel; use diesel::prelude::*; #[cfg(feature = "profile-index")] use flame; -use log::{error, info}; -use std::path::Path; -use std::sync::mpsc::*; -use std::sync::{Arc, Mutex}; -use std::thread; +use log::error; +use std::sync::{Arc, Mutex, Condvar}; use std::time; +use crate::db::{misc_settings, DB}; use crate::config::MiscSettings; -use crate::db::{directories, misc_settings, songs, DB}; use crate::vfs::VFS; -mod populate; mod query; #[cfg(test)] mod test; mod types; +mod update; -pub use self::populate::*; +pub use self::update::*; pub use self::query::*; pub use self::types::*; -enum Command { - REINDEX, - EXIT, -} - -struct CommandReceiver { - receiver: Receiver, -} - -impl CommandReceiver { - fn new(receiver: Receiver) -> CommandReceiver { - CommandReceiver { receiver } +pub fn builder(db: DB) -> IndexBuilder { + IndexBuilder { + db: db, + periodic_updates: true, } } -pub struct CommandSender { - sender: Mutex>, +pub struct IndexBuilder { + db: DB, + periodic_updates: bool, } -impl CommandSender { - fn new(sender: Sender) -> CommandSender { - CommandSender { - sender: Mutex::new(sender), - } +impl IndexBuilder { + pub fn periodic_updates(mut self, enabled: bool) -> IndexBuilder { + self.periodic_updates = enabled; + self } - pub fn trigger_reindex(&self) -> Result<()> { - let sender = self.sender.lock().unwrap(); - match sender.send(Command::REINDEX) { - Ok(_) => Ok(()), - Err(_) => bail!("Trigger reindex channel error"), - } - } - - #[allow(dead_code)] - pub fn exit(&self) -> Result<()> { - let sender = self.sender.lock().unwrap(); - match sender.send(Command::EXIT) { - Ok(_) => Ok(()), - Err(_) => bail!("Index exit channel error"), - } - } -} - -pub fn init(db: DB) -> Arc { - let (index_sender, index_receiver) = channel(); - let command_sender = Arc::new(CommandSender::new(index_sender)); - let command_receiver = CommandReceiver::new(index_receiver); - - // Start update loop - std::thread::spawn(move || { - update_loop(&db, &command_receiver); - }); - - command_sender -} - -pub fn update(db: &DB) -> Result<()> { - let start = time::Instant::now(); - info!("Beginning library index update"); - clean(db)?; - populate(db)?; - info!( - "Library index update took {} seconds", - start.elapsed().as_secs() - ); - #[cfg(feature = "profile-index")] - flame::dump_html(&mut fs::File::create("index-flame-graph.html").unwrap()).unwrap(); - Ok(()) -} - -fn update_loop(db: &DB, command_buffer: &CommandReceiver) { - loop { - // Wait for a command - if command_buffer.receiver.recv().is_err() { - return; - } - - // Flush the buffer to ignore spammy requests - loop { - match command_buffer.receiver.try_recv() { - Err(TryRecvError::Disconnected) => return, - Ok(Command::EXIT) => return, - Err(TryRecvError::Empty) => break, - Ok(_) => (), - } - } - - // Do the update - if let Err(e) = update(db) { - error!("Error while updating index: {}", e); - } - } -} - -pub fn self_trigger(db: &DB, command_buffer: &Arc) { - loop { - { - let command_buffer = command_buffer.deref(); - if let Err(e) = command_buffer.trigger_reindex() { - error!("Error while writing to index command buffer: {}", e); - return; - } - } - let sleep_duration = { - let connection = db.connect(); - connection - .and_then(|c| { - misc_settings::table - .get_result(&c) - .map_err(|e| Error::new(e)) - }) - .map(|s: MiscSettings| s.index_sleep_duration_seconds) - .unwrap_or_else(|e| { - error!("Could not retrieve index sleep duration: {}", e); - 1800 - }) + pub fn build(self) -> Index { + let index = Index { + pending_reindex: Arc::new((Mutex::new(false), Condvar::new())), + db: self.db.clone(), }; - thread::sleep(time::Duration::from_secs(sleep_duration as u64)); + + let commands_index = index.clone(); + std::thread::spawn(move || { + commands_index.process_commands(); + }); + + if self.periodic_updates { + let auto_index = index.clone(); + std::thread::spawn(move || { + auto_index.automatic_reindex(); + }); + } + + index + } +} + +#[derive(Clone)] +pub struct Index { + db: DB, + pending_reindex: Arc<(Mutex, Condvar)>, +} + +impl Index { + pub fn trigger_reindex(&self) { + let (lock, cvar) = &*self.pending_reindex; + let mut pending_reindex = lock.lock().unwrap(); + *pending_reindex = true; + cvar.notify_one(); + } + + fn process_commands(&self) { + loop { + { + let (lock, cvar) = &*self.pending_reindex; + let mut pending = lock.lock().unwrap(); + while !*pending { + pending = cvar.wait(pending).unwrap(); + } + *pending = false; + } + if let Err(e) = update(&self.db) { + error!("Error while updating index: {}", e); + } + } + } + + fn automatic_reindex(&self) { + loop { + self.trigger_reindex(); + let sleep_duration = { + let connection = self.db.connect(); + connection + .and_then(|c| { + misc_settings::table + .get_result(&c) + .map_err(|e| Error::new(e)) + }) + .map(|s: MiscSettings| s.index_sleep_duration_seconds) + .unwrap_or_else(|e| { + error!("Could not retrieve index sleep duration: {}", e); + 1800 + }) + }; + std::thread::sleep(time::Duration::from_secs(sleep_duration as u64)); + } } } diff --git a/src/index/test.rs b/src/index/test.rs index 285149c..a347af9 100644 --- a/src/index/test.rs +++ b/src/index/test.rs @@ -1,7 +1,9 @@ -use std::path::PathBuf; +use std::path::{Path, PathBuf}; + use crate::db; +use crate::db::{directories, songs}; use crate::index::*; #[test] diff --git a/src/index/populate.rs b/src/index/update.rs similarity index 95% rename from src/index/populate.rs rename to src/index/update.rs index 64e58bd..3ba30f6 100644 --- a/src/index/populate.rs +++ b/src/index/update.rs @@ -3,7 +3,7 @@ use diesel; use diesel::prelude::*; #[cfg(feature = "profile-index")] use flame; -use log::{error}; +use log::{error, info}; use regex::Regex; use std::fs; use std::path::Path; @@ -17,6 +17,20 @@ use crate::vfs::VFSSource; const INDEX_BUILDING_INSERT_BUFFER_SIZE: usize = 1000; // Insertions in each transaction const INDEX_BUILDING_CLEAN_BUFFER_SIZE: usize = 500; // Insertions in each transaction +pub fn update(db: &DB) -> Result<()> { + let start = time::Instant::now(); + info!("Beginning library index update"); + clean(db)?; + populate(db)?; + info!( + "Library index update took {} seconds", + start.elapsed().as_secs() + ); + #[cfg(feature = "profile-index")] + flame::dump_html(&mut fs::File::create("index-flame-graph.html").unwrap()).unwrap(); + Ok(()) +} + #[derive(Debug, Insertable)] #[table_name = "songs"] struct NewSong { diff --git a/src/main.rs b/src/main.rs index 3208f81..5548708 100644 --- a/src/main.rs +++ b/src/main.rs @@ -174,14 +174,9 @@ fn main() -> Result<()> { // Init index info!("Initializing index"); - let command_sender = index::init(db.clone()); - - // Trigger auto-indexing - let db_auto_index = db.clone(); - let command_sender_auto_index = command_sender.clone(); - std::thread::spawn(move || { - index::self_trigger(&db_auto_index, &command_sender_auto_index); - }); + let index = index::builder(db.clone()) + .periodic_updates(true) + .build(); // API mount target let prefix_url = config.prefix_url.unwrap_or_else(|| "".to_string()); @@ -228,7 +223,7 @@ fn main() -> Result<()> { swagger_url, swagger_dir_path, db_server, - command_sender, + index, ); }); diff --git a/src/service/rocket/api.rs b/src/service/rocket/api.rs index 19d227d..9e474e6 100644 --- a/src/service/rocket/api.rs +++ b/src/service/rocket/api.rs @@ -9,13 +9,13 @@ use std::ops::Deref; use std::path::PathBuf; use std::str; use std::str::FromStr; -use std::sync::Arc; use time::Duration; use super::serve; use crate::config::{self, Config, Preferences}; use crate::db::DB; use crate::index; +use crate::index::Index; use crate::lastfm; use crate::playlist; use crate::service::constants::*; @@ -231,10 +231,10 @@ fn put_preferences(db: State<'_, DB>, auth: Auth, preferences: Json #[post("/trigger_index")] fn trigger_index( - command_sender: State<'_, Arc>, + index: State<'_, Index>, _admin_rights: AdminRights, ) -> Result<()> { - command_sender.trigger_reindex()?; + index.trigger_reindex(); Ok(()) } diff --git a/src/service/rocket/server.rs b/src/service/rocket/server.rs index e02f23f..1951028 100644 --- a/src/service/rocket/server.rs +++ b/src/service/rocket/server.rs @@ -3,11 +3,10 @@ use rocket; use rocket::config::{Environment, LoggingLevel}; use rocket_contrib::serve::StaticFiles; use std::path::PathBuf; -use std::sync::Arc; use super::api; use crate::db::DB; -use crate::index::CommandSender; +use crate::index::Index; pub fn get_server( port: u16, @@ -18,7 +17,7 @@ pub fn get_server( swagger_url: &str, swagger_dir_path: &PathBuf, db: DB, - command_sender: Arc, + command_sender: Index, ) -> Result { let mut config = rocket::Config::build(Environment::Production) .log_level(LoggingLevel::Normal) @@ -55,7 +54,7 @@ pub fn run( swagger_url: String, swagger_dir_path: PathBuf, db: DB, - command_sender: Arc, + command_sender: Index, ) -> Result<()> { let server = get_server( port, diff --git a/src/service/rocket/test.rs b/src/service/rocket/test.rs index 18ff43a..3ae787b 100644 --- a/src/service/rocket/test.rs +++ b/src/service/rocket/test.rs @@ -5,9 +5,8 @@ use rocket::local::Client; use serde::de::DeserializeOwned; use serde::Serialize; use std::fs; -use std::ops::{Deref, DerefMut}; +use std::ops::DerefMut; use std::path::PathBuf; -use std::sync::Arc; use super::server; use crate::db::DB; @@ -49,7 +48,6 @@ impl<'r, 's> RocketResponse<'r, 's> { pub struct RocketTestService { client: Client, - command_sender: Arc, } pub type ServiceType = RocketTestService; @@ -67,7 +65,7 @@ impl TestService for RocketTestService { let web_dir_path = PathBuf::from("web"); let mut swagger_dir_path = PathBuf::from("docs"); swagger_dir_path.push("swagger"); - let command_sender = index::init(db.clone()); + let index = index::builder(db.clone()).periodic_updates(false).build(); let auth_secret: [u8; 32] = [0; 32]; @@ -79,14 +77,13 @@ impl TestService for RocketTestService { &web_dir_path, "/swagger", &swagger_dir_path, - db.clone(), - command_sender.clone(), + db, + index, ) .unwrap(); let client = Client::new(server).unwrap(); RocketTestService { client, - command_sender, } } @@ -156,9 +153,3 @@ impl TestService for RocketTestService { .to_void() } } - -impl Drop for RocketTestService { - fn drop(&mut self) { - self.command_sender.deref().exit().unwrap(); - } -}