Added endpoint for trigger reindex

This commit is contained in:
Antoine Gersant 2018-10-28 10:19:07 -07:00
parent 00968f0a4f
commit 36e6016e67
4 changed files with 77 additions and 35 deletions

View file

@ -15,8 +15,7 @@ use std::fs;
use std::io; use std::io;
use std::ops::Deref; use std::ops::Deref;
use std::path::*; use std::path::*;
use std::sync::mpsc::Sender; use std::sync::{Arc};
use std::sync::{Arc, Mutex};
use typemap; use typemap;
use url::percent_encoding::percent_decode; use url::percent_encoding::percent_decode;
@ -63,7 +62,7 @@ where
Ok(secret) Ok(secret)
} }
pub fn get_handler(db: &Arc<DB>, index: &Arc<Mutex<Sender<index::Command>>>) -> Result<Chain> { pub fn get_handler(db: &Arc<DB>, index: &Arc<index::CommandSender>) -> Result<Chain> {
let api_handler = get_endpoints(&db.clone(), &index); let api_handler = get_endpoints(&db.clone(), &index);
let mut api_chain = Chain::new(api_handler); let mut api_chain = Chain::new(api_handler);
@ -80,7 +79,7 @@ pub fn get_handler(db: &Arc<DB>, index: &Arc<Mutex<Sender<index::Command>>>) ->
Ok(api_chain) Ok(api_chain)
} }
fn get_endpoints(db: &Arc<DB>, index_channel: &Arc<Mutex<Sender<index::Command>>>) -> Mount { fn get_endpoints(db: &Arc<DB>, index_channel: &Arc<index::CommandSender>) -> Mount {
let mut api_handler = Mount::new(); let mut api_handler = Mount::new();
{ {
@ -178,7 +177,7 @@ fn get_endpoints(db: &Arc<DB>, index_channel: &Arc<Mutex<Sender<index::Command>>
let mut reindex_router = Router::new(); let mut reindex_router = Router::new();
reindex_router.post( reindex_router.post(
"/", "/",
move |_: &mut Request| self::trigger_index(index_channel.deref()), move |_: &mut Request| self::trigger_index(index_channel.clone()),
"trigger_index", "trigger_index",
); );
@ -603,10 +602,9 @@ fn put_preferences(request: &mut Request, db: &DB) -> IronResult<Response> {
Ok(Response::with(status::Ok)) Ok(Response::with(status::Ok))
} }
fn trigger_index(channel: &Mutex<Sender<index::Command>>) -> IronResult<Response> { fn trigger_index(channel: Arc<index::CommandSender>) -> IronResult<Response> {
let channel = channel.lock().unwrap();
let channel = channel.deref(); let channel = channel.deref();
if let Err(e) = channel.send(index::Command::REINDEX) { if let Err(e) = channel.trigger_reindex() {
return Err(IronError::new(e, status::InternalServerError)); return Err(IronError::new(e, status::InternalServerError));
}; };
Ok(Response::with(status::Ok)) Ok(Response::with(status::Ok))

View file

@ -17,7 +17,7 @@ use std::time;
use config::MiscSettings; use config::MiscSettings;
#[cfg(test)] #[cfg(test)]
use db; use db;
use db::ConnectionSource; use db::{ConnectionSource, DB};
use db::{directories, misc_settings, songs}; use db::{directories, misc_settings, songs};
use errors; use errors;
use metadata; use metadata;
@ -32,10 +32,60 @@ no_arg_sql_function!(
"Represents the SQL RANDOM() function" "Represents the SQL RANDOM() function"
); );
pub enum Command { enum Command {
REINDEX, REINDEX,
} }
struct CommandReceiver {
receiver: Receiver<Command>,
}
impl CommandReceiver {
fn new(receiver: Receiver<Command>) -> CommandReceiver {
CommandReceiver{ receiver }
}
}
pub struct CommandSender {
sender: Mutex<Sender<Command>>,
}
impl CommandSender {
fn new(sender: Sender<Command>) -> CommandSender {
CommandSender{ sender: Mutex::new(sender) }
}
pub fn trigger_reindex(&self) -> Result<(), errors::Error> {
let sender = self.sender.lock().unwrap();
match sender.send(Command::REINDEX) {
Ok(_) => Ok(()),
Err(_) => bail!("Trigger reindex channel error"),
}
}
}
pub fn init(db: Arc<DB>) -> Arc<CommandSender> {
let (index_sender, index_receiver) = channel();
let command_sender = Arc::new(CommandSender::new(index_sender));
let command_receiver = CommandReceiver::new(index_receiver);
// Start update loop
let db_ref = db.clone();
std::thread::spawn(move || {
let db = db_ref.deref();
update_loop(db, &command_receiver);
});
// Trigger auto-indexing
let db_ref = db.clone();
let command_sender_clone = command_sender.clone();
std::thread::spawn(move || {
self_trigger(db_ref.deref(), &command_sender_clone);
});
command_sender
}
#[derive(Debug, Queryable, QueryableByName, Serialize)] #[derive(Debug, Queryable, QueryableByName, Serialize)]
#[table_name = "songs"] #[table_name = "songs"]
pub struct Song { pub struct Song {
@ -383,7 +433,7 @@ where
Ok(()) Ok(())
} }
pub fn update<T>(db: &T) -> Result<(), errors::Error> fn update<T>(db: &T) -> Result<(), errors::Error>
where where
T: ConnectionSource + VFSSource, T: ConnectionSource + VFSSource,
{ {
@ -398,20 +448,20 @@ where
Ok(()) Ok(())
} }
pub fn update_loop<T>(db: &T, command_buffer: &Receiver<Command>) fn update_loop<T>(db: &T, command_buffer: &CommandReceiver)
where where
T: ConnectionSource + VFSSource, T: ConnectionSource + VFSSource,
{ {
loop { loop {
// Wait for a command // Wait for a command
if let Err(e) = command_buffer.recv() { if let Err(e) = command_buffer.receiver.recv() {
error!("Error while waiting on index command buffer: {}", e); error!("Error while waiting on index command buffer: {}", e);
return; return;
} }
// Flush the buffer to ignore spammy requests // Flush the buffer to ignore spammy requests
loop { loop {
match command_buffer.try_recv() { match command_buffer.receiver.try_recv() {
Err(TryRecvError::Disconnected) => { Err(TryRecvError::Disconnected) => {
error!("Error while flushing index command buffer"); error!("Error while flushing index command buffer");
return; return;
@ -428,15 +478,14 @@ where
} }
} }
pub fn self_trigger<T>(db: &T, command_buffer: &Arc<Mutex<Sender<Command>>>) pub fn self_trigger<T>(db: &T, command_buffer: &Arc<CommandSender>)
where where
T: ConnectionSource, T: ConnectionSource,
{ {
loop { loop {
{ {
let command_buffer = command_buffer.lock().unwrap();
let command_buffer = command_buffer.deref(); let command_buffer = command_buffer.deref();
if let Err(e) = command_buffer.send(Command::REINDEX) { if let Err(e) = command_buffer.trigger_reindex() {
error!("Error while writing to index command buffer: {}", e); error!("Error while writing to index command buffer: {}", e);
return; return;
} }

View file

@ -71,8 +71,7 @@ use rocket_contrib::serve::StaticFiles;
use simplelog::{Level, LevelFilter, SimpleLogger, TermLogger}; use simplelog::{Level, LevelFilter, SimpleLogger, TermLogger};
use staticfile::Static; use staticfile::Static;
use std::path::Path; use std::path::Path;
use std::sync::mpsc::channel; use std::sync::Arc;
use std::sync::{Arc, Mutex};
mod api; mod api;
mod config; mod config;
@ -221,27 +220,14 @@ fn run() -> Result<()> {
let config = config::read(db.deref())?; let config = config::read(db.deref())?;
// Init index // Init index
let (index_sender, index_receiver) = channel(); let command_sender = index::init(db.clone());
let index_sender = Arc::new(Mutex::new(index_sender));
let db_ref = db.clone();
std::thread::spawn(move || {
let db = db_ref.deref();
index::update_loop(db, &index_receiver);
});
// Trigger auto-indexing
let db_ref = db.clone();
let sender_ref = index_sender.clone();
std::thread::spawn(move || {
index::self_trigger(db_ref.deref(), &sender_ref);
});
// Mount API // Mount API
let prefix_url = config.prefix_url.unwrap_or_else(|| "".to_string()); let prefix_url = config.prefix_url.unwrap_or_else(|| "".to_string());
let api_url = format!("{}/api", &prefix_url); let api_url = format!("{}/api", &prefix_url);
info!("Mounting API on {}", api_url); info!("Mounting API on {}", api_url);
let mut mount = Mount::new(); let mut mount = Mount::new();
let handler = api::get_handler(&db.clone(), &index_sender)?; let handler = api::get_handler(&db.clone(), &command_sender)?;
mount.mount(&api_url, handler); mount.mount(&api_url, handler);
// Mount static files // Mount static files
@ -270,6 +256,7 @@ fn run() -> Result<()> {
rocket::ignite() rocket::ignite()
.manage(db::DB::new(&db_path)?) .manage(db::DB::new(&db_path)?)
.manage(command_sender)
.mount(&static_url, StaticFiles::from(web_dir_path)) .mount(&static_url, StaticFiles::from(web_dir_path))
.mount(&api_url, rocket_api::get_routes()) .mount(&api_url, rocket_api::get_routes())
.launch(); .launch();

View file

@ -2,17 +2,19 @@ use rocket::http::{Cookies, Status};
use rocket::request::{self, FromRequest, Request}; use rocket::request::{self, FromRequest, Request};
use rocket::{Outcome, State}; use rocket::{Outcome, State};
use rocket_contrib::json::Json; use rocket_contrib::json::Json;
use std::sync::Arc;
use config::{self, Config}; use config::{self, Config};
use db::DB; use db::DB;
use errors; use errors;
use index;
use user; use user;
const CURRENT_MAJOR_VERSION: i32 = 2; const CURRENT_MAJOR_VERSION: i32 = 2;
const CURRENT_MINOR_VERSION: i32 = 2; const CURRENT_MINOR_VERSION: i32 = 2;
pub fn get_routes() -> Vec<rocket::Route> { pub fn get_routes() -> Vec<rocket::Route> {
routes![version, initial_setup, get_settings, put_settings] routes![version, initial_setup, get_settings, put_settings, trigger_index]
} }
struct Auth { struct Auth {
@ -94,3 +96,9 @@ fn put_settings(db: State<DB>, _admin_rights: AdminRights, config: Json<Config>)
config::amend::<DB>(&db, &config)?; config::amend::<DB>(&db, &config)?;
Ok(()) Ok(())
} }
#[post("/trigger_index")]
fn trigger_index(command_sender: State<Arc<index::CommandSender>>) -> Result<(), errors::Error> {
command_sender.trigger_reindex()?;
Ok(())
}