Added API endpoint to trigger collection reindex

This commit is contained in:
Antoine Gersant 2017-07-04 17:16:32 -07:00
parent 5b62b7d048
commit 4b251dd953
3 changed files with 94 additions and 26 deletions

View file

@ -12,7 +12,8 @@ use std::fs;
use std::io; use std::io;
use std::path::*; use std::path::*;
use std::ops::Deref; use std::ops::Deref;
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::sync::mpsc::Sender;
use typemap; use typemap;
use url::percent_encoding::percent_decode; use url::percent_encoding::percent_decode;
@ -53,8 +54,8 @@ fn get_auth_secret<T>(db: &T) -> Result<String>
Ok(misc.auth_secret.to_owned()) Ok(misc.auth_secret.to_owned())
} }
pub fn get_handler(db: Arc<DB>) -> Result<Chain> { pub fn get_handler(db: Arc<DB>, index: Arc<Mutex<Sender<index::Command>>>) -> Result<Chain> {
let api_handler = get_endpoints(db.clone()); let api_handler = get_endpoints(db.clone(), index);
let mut api_chain = Chain::new(api_handler); let mut api_chain = Chain::new(api_handler);
let auth_secret = get_auth_secret(db.deref())?; let auth_secret = get_auth_secret(db.deref())?;
@ -71,7 +72,7 @@ pub fn get_handler(db: Arc<DB>) -> Result<Chain> {
Ok(api_chain) Ok(api_chain)
} }
fn get_endpoints(db: Arc<DB>) -> Mount { fn get_endpoints(db: Arc<DB>, index_channel: Arc<Mutex<Sender<index::Command>>>) -> Mount {
let mut api_handler = Mount::new(); let mut api_handler = Mount::new();
{ {
@ -137,6 +138,19 @@ fn get_endpoints(db: Arc<DB>) -> Mount {
auth_api_mount.mount("/settings/", settings_api_chain); auth_api_mount.mount("/settings/", settings_api_chain);
} }
{
let index_channel = index_channel.clone();
let mut reindex_router = Router::new();
reindex_router.post("/",
move |_: &mut Request| self::trigger_index(index_channel.deref()),
"trigger_index");
let mut reindex_api_chain = Chain::new(reindex_router);
let admin_req = AdminRequirement { db: db.clone() };
reindex_api_chain.link_around(admin_req);
auth_api_mount.mount("/trigger_index/", reindex_api_chain);
}
let mut auth_api_chain = Chain::new(auth_api_mount); let mut auth_api_chain = Chain::new(auth_api_mount);
let auth = AuthRequirement { db: db.clone() }; let auth = AuthRequirement { db: db.clone() };
@ -450,3 +464,12 @@ fn put_config(request: &mut Request, db: &DB) -> IronResult<Response> {
config::amend(db, &config)?; config::amend(db, &config)?;
Ok(Response::with(status::Ok)) Ok(Response::with(status::Ok))
} }
fn trigger_index(channel: &Mutex<Sender<index::Command>>) -> IronResult<Response> {
let channel = channel.lock().unwrap();
let channel = channel.deref();
if let Err(e) = channel.send(index::Command::REINDEX) {
return Err(IronError::new(e, status::InternalServerError));
};
Ok(Response::with(status::Ok))
}

View file

@ -7,7 +7,8 @@ use diesel::types;
use regex::Regex; use regex::Regex;
use std::fs; use std::fs;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Mutex; use std::sync::{Arc, Mutex};
use std::sync::mpsc::*;
use std::thread; use std::thread;
use std::time; use std::time;
@ -22,6 +23,10 @@ use metadata;
const INDEX_BUILDING_INSERT_BUFFER_SIZE: usize = 1000; // Insertions in each transaction const INDEX_BUILDING_INSERT_BUFFER_SIZE: usize = 1000; // Insertions in each transaction
const INDEX_BUILDING_CLEAN_BUFFER_SIZE: usize = 500; // Insertions in each transaction const INDEX_BUILDING_CLEAN_BUFFER_SIZE: usize = 500; // Insertions in each transaction
pub enum Command {
REINDEX,
}
#[derive(Debug, Queryable, Serialize)] #[derive(Debug, Queryable, Serialize)]
pub struct Song { pub struct Song {
#[serde(skip_serializing)] #[serde(skip_serializing)]
@ -384,31 +389,63 @@ pub fn update<T>(db: &T) -> Result<()>
Ok(()) Ok(())
} }
pub fn update_loop<T>(db: &T) pub fn update_loop<T>(db: &T, command_buffer: Receiver<Command>)
where T: ConnectionSource + VFSSource where T: ConnectionSource + VFSSource
{ {
loop { loop {
// Wait for a command
if let Err(e) = command_buffer.recv() {
println!("Error while waiting on index command buffer: {}", e);
return;
}
// Flush the buffer to ignore spammy requests
loop {
match command_buffer.try_recv() {
Err(TryRecvError::Disconnected) => {
println!("Error while flushing index command buffer");
return;
}
Err(TryRecvError::Empty) => break,
Ok(_) => (),
}
}
// Do the update
if let Err(e) = update(db) { if let Err(e) = update(db) {
println!("Error while updating index: {}", e); println!("Error while updating index: {}", e);
} }
}
}
pub fn self_trigger<T>(db: &T, command_buffer: Arc<Mutex<Sender<Command>>>)
where T: ConnectionSource
{
loop {
{ {
let sleep_duration; let command_buffer = command_buffer.lock().unwrap();
{ let command_buffer = command_buffer.deref();
let connection = db.get_connection(); if let Err(e) = command_buffer.send(Command::REINDEX) {
let connection = connection.lock().unwrap(); println!("Error while writing to index command buffer: {}", e);
let connection = connection.deref(); return;
let settings: Result<MiscSettings> = misc_settings::table
.get_result(connection)
.map_err(|e| e.into());
if let Err(ref e) = settings {
println!("Could not retrieve index sleep duration: {}", e);
}
sleep_duration = settings
.map(|s| s.index_sleep_duration_seconds)
.unwrap_or(1800);
} }
thread::sleep(time::Duration::from_secs(sleep_duration as u64));
} }
let sleep_duration;
{
let connection = db.get_connection();
let connection = connection.lock().unwrap();
let connection = connection.deref();
let settings: Result<MiscSettings> = misc_settings::table
.get_result(connection)
.map_err(|e| e.into());
if let Err(ref e) = settings {
println!("Could not retrieve index sleep duration: {}", e);
}
sleep_duration = settings
.map(|s| s.index_sleep_duration_seconds)
.unwrap_or(1800);
}
thread::sleep(time::Duration::from_secs(sleep_duration as u64));
} }
} }

View file

@ -57,7 +57,8 @@ use iron::prelude::*;
use mount::Mount; use mount::Mount;
use staticfile::Static; use staticfile::Static;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::sync::mpsc::channel;
mod api; mod api;
mod config; mod config;
@ -123,17 +124,24 @@ fn run() -> Result<()> {
config::overwrite(db.deref(), &config)?; config::overwrite(db.deref(), &config)?;
} }
// Begin indexing // Init index
let (index_sender, index_receiver) = channel();
let index_sender = Arc::new(Mutex::new(index_sender));
let db_ref = db.clone(); let db_ref = db.clone();
std::thread::spawn(move || { std::thread::spawn(move || {
let db = db_ref.deref(); let db = db_ref.deref();
index::update_loop(db); index::update_loop(db, index_receiver);
}); });
// Trigger auto-indexing
let db_ref = db.clone();
let sender_ref = index_sender.clone();
std::thread::spawn(move || { index::self_trigger(db_ref.deref(), sender_ref); });
// Mount API // Mount API
println!("Mounting API"); println!("Mounting API");
let mut mount = Mount::new(); let mut mount = Mount::new();
let handler = api::get_handler(db.clone())?; let handler = api::get_handler(db.clone(), index_sender)?;
mount.mount("/api/", handler); mount.mount("/api/", handler);
// Mount static files // Mount static files