Cleaned index API

This commit is contained in:
Antoine Gersant 2020-01-18 17:07:56 -08:00
parent b1e4be2f8f
commit e53b9f5867
7 changed files with 118 additions and 151 deletions

View file

@ -1,136 +1,101 @@
use anyhow::*; use anyhow::*;
use core::ops::Deref;
use diesel; use diesel;
use diesel::prelude::*; use diesel::prelude::*;
#[cfg(feature = "profile-index")] #[cfg(feature = "profile-index")]
use flame; use flame;
use log::{error, info}; use log::error;
use std::path::Path; use std::sync::{Arc, Mutex, Condvar};
use std::sync::mpsc::*;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time; use std::time;
use crate::db::{misc_settings, DB};
use crate::config::MiscSettings; use crate::config::MiscSettings;
use crate::db::{directories, misc_settings, songs, DB};
use crate::vfs::VFS; use crate::vfs::VFS;
mod populate;
mod query; mod query;
#[cfg(test)] #[cfg(test)]
mod test; mod test;
mod types; mod types;
mod update;
pub use self::populate::*; pub use self::update::*;
pub use self::query::*; pub use self::query::*;
pub use self::types::*; pub use self::types::*;
enum Command { pub fn builder(db: DB) -> IndexBuilder {
REINDEX, IndexBuilder {
EXIT, db: db,
} periodic_updates: true,
struct CommandReceiver {
receiver: Receiver<Command>,
}
impl CommandReceiver {
fn new(receiver: Receiver<Command>) -> CommandReceiver {
CommandReceiver { receiver }
} }
} }
pub struct CommandSender { pub struct IndexBuilder {
sender: Mutex<Sender<Command>>, db: DB,
periodic_updates: bool,
} }
impl CommandSender { impl IndexBuilder {
fn new(sender: Sender<Command>) -> CommandSender { pub fn periodic_updates(mut self, enabled: bool) -> IndexBuilder {
CommandSender { self.periodic_updates = enabled;
sender: Mutex::new(sender), self
}
} }
pub fn trigger_reindex(&self) -> Result<()> { pub fn build(self) -> Index {
let sender = self.sender.lock().unwrap(); let index = Index {
match sender.send(Command::REINDEX) { pending_reindex: Arc::new((Mutex::new(false), Condvar::new())),
Ok(_) => Ok(()), db: self.db.clone(),
Err(_) => bail!("Trigger reindex channel error"), };
}
}
#[allow(dead_code)] let commands_index = index.clone();
pub fn exit(&self) -> Result<()> {
let sender = self.sender.lock().unwrap();
match sender.send(Command::EXIT) {
Ok(_) => Ok(()),
Err(_) => bail!("Index exit channel error"),
}
}
}
pub fn init(db: DB) -> Arc<CommandSender> {
let (index_sender, index_receiver) = channel();
let command_sender = Arc::new(CommandSender::new(index_sender));
let command_receiver = CommandReceiver::new(index_receiver);
// Start update loop
std::thread::spawn(move || { std::thread::spawn(move || {
update_loop(&db, &command_receiver); commands_index.process_commands();
}); });
command_sender if self.periodic_updates {
let auto_index = index.clone();
std::thread::spawn(move || {
auto_index.automatic_reindex();
});
}
index
}
} }
pub fn update(db: &DB) -> Result<()> { #[derive(Clone)]
let start = time::Instant::now(); pub struct Index {
info!("Beginning library index update"); db: DB,
clean(db)?; pending_reindex: Arc<(Mutex<bool>, Condvar)>,
populate(db)?;
info!(
"Library index update took {} seconds",
start.elapsed().as_secs()
);
#[cfg(feature = "profile-index")]
flame::dump_html(&mut fs::File::create("index-flame-graph.html").unwrap()).unwrap();
Ok(())
} }
fn update_loop(db: &DB, command_buffer: &CommandReceiver) { impl Index {
loop { pub fn trigger_reindex(&self) {
// Wait for a command let (lock, cvar) = &*self.pending_reindex;
if command_buffer.receiver.recv().is_err() { let mut pending_reindex = lock.lock().unwrap();
return; *pending_reindex = true;
cvar.notify_one();
} }
// Flush the buffer to ignore spammy requests fn process_commands(&self) {
loop { loop {
match command_buffer.receiver.try_recv() { {
Err(TryRecvError::Disconnected) => return, let (lock, cvar) = &*self.pending_reindex;
Ok(Command::EXIT) => return, let mut pending = lock.lock().unwrap();
Err(TryRecvError::Empty) => break, while !*pending {
Ok(_) => (), pending = cvar.wait(pending).unwrap();
} }
*pending = false;
} }
if let Err(e) = update(&self.db) {
// Do the update
if let Err(e) = update(db) {
error!("Error while updating index: {}", e); error!("Error while updating index: {}", e);
} }
} }
} }
pub fn self_trigger(db: &DB, command_buffer: &Arc<CommandSender>) { fn automatic_reindex(&self) {
loop { loop {
{ self.trigger_reindex();
let command_buffer = command_buffer.deref();
if let Err(e) = command_buffer.trigger_reindex() {
error!("Error while writing to index command buffer: {}", e);
return;
}
}
let sleep_duration = { let sleep_duration = {
let connection = db.connect(); let connection = self.db.connect();
connection connection
.and_then(|c| { .and_then(|c| {
misc_settings::table misc_settings::table
@ -143,6 +108,7 @@ pub fn self_trigger(db: &DB, command_buffer: &Arc<CommandSender>) {
1800 1800
}) })
}; };
thread::sleep(time::Duration::from_secs(sleep_duration as u64)); std::thread::sleep(time::Duration::from_secs(sleep_duration as u64));
}
} }
} }

View file

@ -1,7 +1,9 @@
use std::path::PathBuf; use std::path::{Path, PathBuf};
use crate::db; use crate::db;
use crate::db::{directories, songs};
use crate::index::*; use crate::index::*;
#[test] #[test]

View file

@ -3,7 +3,7 @@ use diesel;
use diesel::prelude::*; use diesel::prelude::*;
#[cfg(feature = "profile-index")] #[cfg(feature = "profile-index")]
use flame; use flame;
use log::{error}; use log::{error, info};
use regex::Regex; use regex::Regex;
use std::fs; use std::fs;
use std::path::Path; use std::path::Path;
@ -17,6 +17,20 @@ use crate::vfs::VFSSource;
const INDEX_BUILDING_INSERT_BUFFER_SIZE: usize = 1000; // Insertions in each transaction const INDEX_BUILDING_INSERT_BUFFER_SIZE: usize = 1000; // Insertions in each transaction
const INDEX_BUILDING_CLEAN_BUFFER_SIZE: usize = 500; // Insertions in each transaction const INDEX_BUILDING_CLEAN_BUFFER_SIZE: usize = 500; // Insertions in each transaction
pub fn update(db: &DB) -> Result<()> {
let start = time::Instant::now();
info!("Beginning library index update");
clean(db)?;
populate(db)?;
info!(
"Library index update took {} seconds",
start.elapsed().as_secs()
);
#[cfg(feature = "profile-index")]
flame::dump_html(&mut fs::File::create("index-flame-graph.html").unwrap()).unwrap();
Ok(())
}
#[derive(Debug, Insertable)] #[derive(Debug, Insertable)]
#[table_name = "songs"] #[table_name = "songs"]
struct NewSong { struct NewSong {

View file

@ -174,14 +174,9 @@ fn main() -> Result<()> {
// Init index // Init index
info!("Initializing index"); info!("Initializing index");
let command_sender = index::init(db.clone()); let index = index::builder(db.clone())
.periodic_updates(true)
// Trigger auto-indexing .build();
let db_auto_index = db.clone();
let command_sender_auto_index = command_sender.clone();
std::thread::spawn(move || {
index::self_trigger(&db_auto_index, &command_sender_auto_index);
});
// API mount target // API mount target
let prefix_url = config.prefix_url.unwrap_or_else(|| "".to_string()); let prefix_url = config.prefix_url.unwrap_or_else(|| "".to_string());
@ -228,7 +223,7 @@ fn main() -> Result<()> {
swagger_url, swagger_url,
swagger_dir_path, swagger_dir_path,
db_server, db_server,
command_sender, index,
); );
}); });

View file

@ -9,13 +9,13 @@ use std::ops::Deref;
use std::path::PathBuf; use std::path::PathBuf;
use std::str; use std::str;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use time::Duration; use time::Duration;
use super::serve; use super::serve;
use crate::config::{self, Config, Preferences}; use crate::config::{self, Config, Preferences};
use crate::db::DB; use crate::db::DB;
use crate::index; use crate::index;
use crate::index::Index;
use crate::lastfm; use crate::lastfm;
use crate::playlist; use crate::playlist;
use crate::service::constants::*; use crate::service::constants::*;
@ -231,10 +231,10 @@ fn put_preferences(db: State<'_, DB>, auth: Auth, preferences: Json<Preferences>
#[post("/trigger_index")] #[post("/trigger_index")]
fn trigger_index( fn trigger_index(
command_sender: State<'_, Arc<index::CommandSender>>, index: State<'_, Index>,
_admin_rights: AdminRights, _admin_rights: AdminRights,
) -> Result<()> { ) -> Result<()> {
command_sender.trigger_reindex()?; index.trigger_reindex();
Ok(()) Ok(())
} }

View file

@ -3,11 +3,10 @@ use rocket;
use rocket::config::{Environment, LoggingLevel}; use rocket::config::{Environment, LoggingLevel};
use rocket_contrib::serve::StaticFiles; use rocket_contrib::serve::StaticFiles;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc;
use super::api; use super::api;
use crate::db::DB; use crate::db::DB;
use crate::index::CommandSender; use crate::index::Index;
pub fn get_server( pub fn get_server(
port: u16, port: u16,
@ -18,7 +17,7 @@ pub fn get_server(
swagger_url: &str, swagger_url: &str,
swagger_dir_path: &PathBuf, swagger_dir_path: &PathBuf,
db: DB, db: DB,
command_sender: Arc<CommandSender>, command_sender: Index,
) -> Result<rocket::Rocket> { ) -> Result<rocket::Rocket> {
let mut config = rocket::Config::build(Environment::Production) let mut config = rocket::Config::build(Environment::Production)
.log_level(LoggingLevel::Normal) .log_level(LoggingLevel::Normal)
@ -55,7 +54,7 @@ pub fn run(
swagger_url: String, swagger_url: String,
swagger_dir_path: PathBuf, swagger_dir_path: PathBuf,
db: DB, db: DB,
command_sender: Arc<CommandSender>, command_sender: Index,
) -> Result<()> { ) -> Result<()> {
let server = get_server( let server = get_server(
port, port,

View file

@ -5,9 +5,8 @@ use rocket::local::Client;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use std::fs; use std::fs;
use std::ops::{Deref, DerefMut}; use std::ops::DerefMut;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc;
use super::server; use super::server;
use crate::db::DB; use crate::db::DB;
@ -49,7 +48,6 @@ impl<'r, 's> RocketResponse<'r, 's> {
pub struct RocketTestService { pub struct RocketTestService {
client: Client, client: Client,
command_sender: Arc<index::CommandSender>,
} }
pub type ServiceType = RocketTestService; pub type ServiceType = RocketTestService;
@ -67,7 +65,7 @@ impl TestService for RocketTestService {
let web_dir_path = PathBuf::from("web"); let web_dir_path = PathBuf::from("web");
let mut swagger_dir_path = PathBuf::from("docs"); let mut swagger_dir_path = PathBuf::from("docs");
swagger_dir_path.push("swagger"); swagger_dir_path.push("swagger");
let command_sender = index::init(db.clone()); let index = index::builder(db.clone()).periodic_updates(false).build();
let auth_secret: [u8; 32] = [0; 32]; let auth_secret: [u8; 32] = [0; 32];
@ -79,14 +77,13 @@ impl TestService for RocketTestService {
&web_dir_path, &web_dir_path,
"/swagger", "/swagger",
&swagger_dir_path, &swagger_dir_path,
db.clone(), db,
command_sender.clone(), index,
) )
.unwrap(); .unwrap();
let client = Client::new(server).unwrap(); let client = Client::new(server).unwrap();
RocketTestService { RocketTestService {
client, client,
command_sender,
} }
} }
@ -156,9 +153,3 @@ impl TestService for RocketTestService {
.to_void() .to_void()
} }
} }
impl Drop for RocketTestService {
fn drop(&mut self) {
self.command_sender.deref().exit().unwrap();
}
}