From 36e6016e6783bec894fbd1a370db22d30e7bfa1c Mon Sep 17 00:00:00 2001 From: Antoine Gersant Date: Sun, 28 Oct 2018 10:19:07 -0700 Subject: [PATCH] Added endpoint for trigger reindex --- src/api.rs | 14 +++++----- src/index.rs | 67 ++++++++++++++++++++++++++++++++++++++++------- src/main.rs | 21 +++------------ src/rocket_api.rs | 10 ++++++- 4 files changed, 77 insertions(+), 35 deletions(-) diff --git a/src/api.rs b/src/api.rs index 6b0c7f4..452c673 100644 --- a/src/api.rs +++ b/src/api.rs @@ -15,8 +15,7 @@ use std::fs; use std::io; use std::ops::Deref; use std::path::*; -use std::sync::mpsc::Sender; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc}; use typemap; use url::percent_encoding::percent_decode; @@ -63,7 +62,7 @@ where Ok(secret) } -pub fn get_handler(db: &Arc, index: &Arc>>) -> Result { +pub fn get_handler(db: &Arc, index: &Arc) -> Result { let api_handler = get_endpoints(&db.clone(), &index); let mut api_chain = Chain::new(api_handler); @@ -80,7 +79,7 @@ pub fn get_handler(db: &Arc, index: &Arc>>) -> Ok(api_chain) } -fn get_endpoints(db: &Arc, index_channel: &Arc>>) -> Mount { +fn get_endpoints(db: &Arc, index_channel: &Arc) -> Mount { let mut api_handler = Mount::new(); { @@ -178,7 +177,7 @@ fn get_endpoints(db: &Arc, index_channel: &Arc> let mut reindex_router = Router::new(); reindex_router.post( "/", - move |_: &mut Request| self::trigger_index(index_channel.deref()), + move |_: &mut Request| self::trigger_index(index_channel.clone()), "trigger_index", ); @@ -603,10 +602,9 @@ fn put_preferences(request: &mut Request, db: &DB) -> IronResult { Ok(Response::with(status::Ok)) } -fn trigger_index(channel: &Mutex>) -> IronResult { - let channel = channel.lock().unwrap(); +fn trigger_index(channel: Arc) -> IronResult { let channel = channel.deref(); - if let Err(e) = channel.send(index::Command::REINDEX) { + if let Err(e) = channel.trigger_reindex() { return Err(IronError::new(e, status::InternalServerError)); }; Ok(Response::with(status::Ok)) diff --git a/src/index.rs b/src/index.rs index aadc115..cefc852 100644 --- a/src/index.rs +++ b/src/index.rs @@ -17,7 +17,7 @@ use std::time; use config::MiscSettings; #[cfg(test)] use db; -use db::ConnectionSource; +use db::{ConnectionSource, DB}; use db::{directories, misc_settings, songs}; use errors; use metadata; @@ -32,10 +32,60 @@ no_arg_sql_function!( "Represents the SQL RANDOM() function" ); -pub enum Command { +enum Command { REINDEX, } +struct CommandReceiver { + receiver: Receiver, +} + +impl CommandReceiver { + fn new(receiver: Receiver) -> CommandReceiver { + CommandReceiver{ receiver } + } +} + +pub struct CommandSender { + sender: Mutex>, +} + +impl CommandSender { + fn new(sender: Sender) -> CommandSender { + CommandSender{ sender: Mutex::new(sender) } + } + + pub fn trigger_reindex(&self) -> Result<(), errors::Error> { + let sender = self.sender.lock().unwrap(); + match sender.send(Command::REINDEX) { + Ok(_) => Ok(()), + Err(_) => bail!("Trigger reindex channel error"), + } + } +} + +pub fn init(db: Arc) -> Arc { + let (index_sender, index_receiver) = channel(); + let command_sender = Arc::new(CommandSender::new(index_sender)); + let command_receiver = CommandReceiver::new(index_receiver); + + // Start update loop + let db_ref = db.clone(); + std::thread::spawn(move || { + let db = db_ref.deref(); + update_loop(db, &command_receiver); + }); + + // Trigger auto-indexing + let db_ref = db.clone(); + let command_sender_clone = command_sender.clone(); + std::thread::spawn(move || { + self_trigger(db_ref.deref(), &command_sender_clone); + }); + + command_sender +} + #[derive(Debug, Queryable, QueryableByName, Serialize)] #[table_name = "songs"] pub struct Song { @@ -383,7 +433,7 @@ where Ok(()) } -pub fn update(db: &T) -> Result<(), errors::Error> +fn update(db: &T) -> Result<(), errors::Error> where T: ConnectionSource + VFSSource, { @@ -398,20 +448,20 @@ where Ok(()) } -pub fn update_loop(db: &T, command_buffer: &Receiver) +fn update_loop(db: &T, command_buffer: &CommandReceiver) where T: ConnectionSource + VFSSource, { loop { // Wait for a command - if let Err(e) = command_buffer.recv() { + if let Err(e) = command_buffer.receiver.recv() { error!("Error while waiting on index command buffer: {}", e); return; } // Flush the buffer to ignore spammy requests loop { - match command_buffer.try_recv() { + match command_buffer.receiver.try_recv() { Err(TryRecvError::Disconnected) => { error!("Error while flushing index command buffer"); return; @@ -428,15 +478,14 @@ where } } -pub fn self_trigger(db: &T, command_buffer: &Arc>>) +pub fn self_trigger(db: &T, command_buffer: &Arc) where T: ConnectionSource, { loop { { - let command_buffer = command_buffer.lock().unwrap(); let command_buffer = command_buffer.deref(); - if let Err(e) = command_buffer.send(Command::REINDEX) { + if let Err(e) = command_buffer.trigger_reindex() { error!("Error while writing to index command buffer: {}", e); return; } diff --git a/src/main.rs b/src/main.rs index 15f83b0..cb31d9d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -71,8 +71,7 @@ use rocket_contrib::serve::StaticFiles; use simplelog::{Level, LevelFilter, SimpleLogger, TermLogger}; use staticfile::Static; use std::path::Path; -use std::sync::mpsc::channel; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; mod api; mod config; @@ -221,27 +220,14 @@ fn run() -> Result<()> { let config = config::read(db.deref())?; // Init index - let (index_sender, index_receiver) = channel(); - let index_sender = Arc::new(Mutex::new(index_sender)); - let db_ref = db.clone(); - std::thread::spawn(move || { - let db = db_ref.deref(); - index::update_loop(db, &index_receiver); - }); - - // Trigger auto-indexing - let db_ref = db.clone(); - let sender_ref = index_sender.clone(); - std::thread::spawn(move || { - index::self_trigger(db_ref.deref(), &sender_ref); - }); + let command_sender = index::init(db.clone()); // Mount API let prefix_url = config.prefix_url.unwrap_or_else(|| "".to_string()); let api_url = format!("{}/api", &prefix_url); info!("Mounting API on {}", api_url); let mut mount = Mount::new(); - let handler = api::get_handler(&db.clone(), &index_sender)?; + let handler = api::get_handler(&db.clone(), &command_sender)?; mount.mount(&api_url, handler); // Mount static files @@ -270,6 +256,7 @@ fn run() -> Result<()> { rocket::ignite() .manage(db::DB::new(&db_path)?) + .manage(command_sender) .mount(&static_url, StaticFiles::from(web_dir_path)) .mount(&api_url, rocket_api::get_routes()) .launch(); diff --git a/src/rocket_api.rs b/src/rocket_api.rs index 1f1eb10..783acae 100644 --- a/src/rocket_api.rs +++ b/src/rocket_api.rs @@ -2,17 +2,19 @@ use rocket::http::{Cookies, Status}; use rocket::request::{self, FromRequest, Request}; use rocket::{Outcome, State}; use rocket_contrib::json::Json; +use std::sync::Arc; use config::{self, Config}; use db::DB; use errors; +use index; use user; const CURRENT_MAJOR_VERSION: i32 = 2; const CURRENT_MINOR_VERSION: i32 = 2; pub fn get_routes() -> Vec { - routes![version, initial_setup, get_settings, put_settings] + routes![version, initial_setup, get_settings, put_settings, trigger_index] } struct Auth { @@ -94,3 +96,9 @@ fn put_settings(db: State, _admin_rights: AdminRights, config: Json) config::amend::(&db, &config)?; Ok(()) } + +#[post("/trigger_index")] +fn trigger_index(command_sender: State>) -> Result<(), errors::Error> { + command_sender.trigger_reindex()?; + Ok(()) +}