From 4b251dd953e37e8ce5ba4e195c5121535a114a8d Mon Sep 17 00:00:00 2001 From: Antoine Gersant Date: Tue, 4 Jul 2017 17:16:32 -0700 Subject: [PATCH] Added API endpoint to trigger collection reindex --- src/api.rs | 33 ++++++++++++++++++++---- src/index.rs | 71 +++++++++++++++++++++++++++++++++++++++------------- src/main.rs | 16 +++++++++--- 3 files changed, 94 insertions(+), 26 deletions(-) diff --git a/src/api.rs b/src/api.rs index 67d08ae..f88cca4 100644 --- a/src/api.rs +++ b/src/api.rs @@ -12,7 +12,8 @@ use std::fs; use std::io; use std::path::*; use std::ops::Deref; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::sync::mpsc::Sender; use typemap; use url::percent_encoding::percent_decode; @@ -53,8 +54,8 @@ fn get_auth_secret(db: &T) -> Result Ok(misc.auth_secret.to_owned()) } -pub fn get_handler(db: Arc) -> Result { - let api_handler = get_endpoints(db.clone()); +pub fn get_handler(db: Arc, index: Arc>>) -> Result { + let api_handler = get_endpoints(db.clone(), index); let mut api_chain = Chain::new(api_handler); let auth_secret = get_auth_secret(db.deref())?; @@ -71,7 +72,7 @@ pub fn get_handler(db: Arc) -> Result { Ok(api_chain) } -fn get_endpoints(db: Arc) -> Mount { +fn get_endpoints(db: Arc, index_channel: Arc>>) -> Mount { let mut api_handler = Mount::new(); { @@ -137,6 +138,19 @@ fn get_endpoints(db: Arc) -> Mount { auth_api_mount.mount("/settings/", settings_api_chain); } + { + let index_channel = index_channel.clone(); + let mut reindex_router = Router::new(); + reindex_router.post("/", + move |_: &mut Request| self::trigger_index(index_channel.deref()), + "trigger_index"); + + let mut reindex_api_chain = Chain::new(reindex_router); + let admin_req = AdminRequirement { db: db.clone() }; + reindex_api_chain.link_around(admin_req); + + auth_api_mount.mount("/trigger_index/", reindex_api_chain); + } let mut auth_api_chain = Chain::new(auth_api_mount); let auth = AuthRequirement { db: db.clone() }; @@ -303,7 +317,7 @@ fn auth(request: &mut Request, db: &DB) -> IronResult { _ => return Err(Error::from(ErrorKind::MissingPassword).into()), }; } - + if !user::auth(db, username.as_str(), password.as_str())? { return Err(Error::from(ErrorKind::IncorrectCredentials).into()); } @@ -450,3 +464,12 @@ fn put_config(request: &mut Request, db: &DB) -> IronResult { config::amend(db, &config)?; Ok(Response::with(status::Ok)) } + +fn trigger_index(channel: &Mutex>) -> IronResult { + let channel = channel.lock().unwrap(); + let channel = channel.deref(); + if let Err(e) = channel.send(index::Command::REINDEX) { + return Err(IronError::new(e, status::InternalServerError)); + }; + Ok(Response::with(status::Ok)) +} diff --git a/src/index.rs b/src/index.rs index 16272b6..f785d98 100644 --- a/src/index.rs +++ b/src/index.rs @@ -7,7 +7,8 @@ use diesel::types; use regex::Regex; use std::fs; use std::path::{Path, PathBuf}; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; +use std::sync::mpsc::*; use std::thread; use std::time; @@ -22,6 +23,10 @@ use metadata; const INDEX_BUILDING_INSERT_BUFFER_SIZE: usize = 1000; // Insertions in each transaction const INDEX_BUILDING_CLEAN_BUFFER_SIZE: usize = 500; // Insertions in each transaction +pub enum Command { + REINDEX, +} + #[derive(Debug, Queryable, Serialize)] pub struct Song { #[serde(skip_serializing)] @@ -384,31 +389,63 @@ pub fn update(db: &T) -> Result<()> Ok(()) } -pub fn update_loop(db: &T) +pub fn update_loop(db: &T, command_buffer: Receiver) where T: ConnectionSource + VFSSource { loop { + // Wait for a command + if let Err(e) = command_buffer.recv() { + println!("Error while waiting on index command buffer: {}", e); + return; + } + + // Flush the buffer to ignore spammy requests + loop { + match command_buffer.try_recv() { + Err(TryRecvError::Disconnected) => { + println!("Error while flushing index command buffer"); + return; + } + Err(TryRecvError::Empty) => break, + Ok(_) => (), + } + } + + // Do the update if let Err(e) = update(db) { println!("Error while updating index: {}", e); } + } +} + +pub fn self_trigger(db: &T, command_buffer: Arc>>) + where T: ConnectionSource +{ + loop { { - let sleep_duration; - { - let connection = db.get_connection(); - let connection = connection.lock().unwrap(); - let connection = connection.deref(); - let settings: Result = misc_settings::table - .get_result(connection) - .map_err(|e| e.into()); - if let Err(ref e) = settings { - println!("Could not retrieve index sleep duration: {}", e); - } - sleep_duration = settings - .map(|s| s.index_sleep_duration_seconds) - .unwrap_or(1800); + let command_buffer = command_buffer.lock().unwrap(); + let command_buffer = command_buffer.deref(); + if let Err(e) = command_buffer.send(Command::REINDEX) { + println!("Error while writing to index command buffer: {}", e); + return; } - thread::sleep(time::Duration::from_secs(sleep_duration as u64)); } + let sleep_duration; + { + let connection = db.get_connection(); + let connection = connection.lock().unwrap(); + let connection = connection.deref(); + let settings: Result = misc_settings::table + .get_result(connection) + .map_err(|e| e.into()); + if let Err(ref e) = settings { + println!("Could not retrieve index sleep duration: {}", e); + } + sleep_duration = settings + .map(|s| s.index_sleep_duration_seconds) + .unwrap_or(1800); + } + thread::sleep(time::Duration::from_secs(sleep_duration as u64)); } } diff --git a/src/main.rs b/src/main.rs index 130ef76..d3434fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -57,7 +57,8 @@ use iron::prelude::*; use mount::Mount; use staticfile::Static; use std::path::Path; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::sync::mpsc::channel; mod api; mod config; @@ -123,17 +124,24 @@ fn run() -> Result<()> { config::overwrite(db.deref(), &config)?; } - // Begin indexing + // Init index + let (index_sender, index_receiver) = channel(); + let index_sender = Arc::new(Mutex::new(index_sender)); let db_ref = db.clone(); std::thread::spawn(move || { let db = db_ref.deref(); - index::update_loop(db); + index::update_loop(db, index_receiver); }); + // Trigger auto-indexing + let db_ref = db.clone(); + let sender_ref = index_sender.clone(); + std::thread::spawn(move || { index::self_trigger(db_ref.deref(), sender_ref); }); + // Mount API println!("Mounting API"); let mut mount = Mount::new(); - let handler = api::get_handler(db.clone())?; + let handler = api::get_handler(db.clone(), index_sender)?; mount.mount("/api/", handler); // Mount static files