Moved database insertions to separate threads from the file crawl

This commit is contained in:
Antoine Gersant 2020-01-18 21:57:44 -08:00
parent 18bc9594a4
commit 1764f3da4d

View file

@ -9,6 +9,7 @@ use regex::Regex;
use std::fs; use std::fs;
use std::path::Path; use std::path::Path;
use std::time; use std::time;
use std::sync::mpsc::*;
use crate::config::MiscSettings; use crate::config::MiscSettings;
use crate::db::{directories, misc_settings, songs, DB}; use crate::db::{directories, misc_settings, songs, DB};
@ -61,63 +62,29 @@ struct NewDirectory {
} }
struct IndexUpdater { struct IndexUpdater {
new_songs: Vec<NewSong>, directory_sender: Sender<NewDirectory>,
new_directories: Vec<NewDirectory>, song_sender: Sender<NewSong>,
db: DB,
album_art_pattern: Regex, album_art_pattern: Regex,
} }
impl IndexUpdater { impl IndexUpdater {
#[cfg_attr(feature = "profile-index", flame)] #[cfg_attr(feature = "profile-index", flame)]
fn new(db: DB, album_art_pattern: Regex) -> Result<IndexUpdater> { fn new(album_art_pattern: Regex, directory_sender: Sender<NewDirectory>, song_sender: Sender<NewSong>) -> Result<IndexUpdater> {
let mut new_songs = Vec::new();
let mut new_directories = Vec::new();
new_songs.reserve_exact(INDEX_BUILDING_INSERT_BUFFER_SIZE);
new_directories.reserve_exact(INDEX_BUILDING_INSERT_BUFFER_SIZE);
Ok(IndexUpdater { Ok(IndexUpdater {
new_songs, directory_sender,
new_directories, song_sender,
db,
album_art_pattern, album_art_pattern,
}) })
} }
#[cfg_attr(feature = "profile-index", flame)]
fn flush_songs(&mut self) -> Result<()> {
let connection = self.db.connect()?;
diesel::insert_into(songs::table)
.values(&self.new_songs)
.execute(&*connection)?; // TODO https://github.com/diesel-rs/diesel/issues/1822
self.new_songs.clear();
Ok(())
}
#[cfg_attr(feature = "profile-index", flame)]
fn flush_directories(&mut self) -> Result<()> {
let connection = self.db.connect()?;
diesel::insert_into(directories::table)
.values(&self.new_directories)
.execute(&*connection)?; // TODO https://github.com/diesel-rs/diesel/issues/1822
self.new_directories.clear();
Ok(())
}
#[cfg_attr(feature = "profile-index", flame)] #[cfg_attr(feature = "profile-index", flame)]
fn push_song(&mut self, song: NewSong) -> Result<()> { fn push_song(&mut self, song: NewSong) -> Result<()> {
if self.new_songs.len() >= self.new_songs.capacity() { self.song_sender.send(song).map_err(Error::new)
self.flush_songs()?;
}
self.new_songs.push(song);
Ok(())
} }
#[cfg_attr(feature = "profile-index", flame)] #[cfg_attr(feature = "profile-index", flame)]
fn push_directory(&mut self, directory: NewDirectory) -> Result<()> { fn push_directory(&mut self, directory: NewDirectory) -> Result<()> {
if self.new_directories.len() >= self.new_directories.capacity() { self.directory_sender.send(directory).map_err(Error::new)
self.flush_directories()?;
}
self.new_directories.push(directory);
Ok(())
} }
fn get_artwork(&self, dir: &Path) -> Result<Option<String>> { fn get_artwork(&self, dir: &Path) -> Result<Option<String>> {
@ -319,11 +286,78 @@ pub fn populate(db: &DB) -> Result<()> {
album_art_pattern = Regex::new(&settings.index_album_art_pattern)?; album_art_pattern = Regex::new(&settings.index_album_art_pattern)?;
} }
let mut updater = IndexUpdater::new(db.clone(), album_art_pattern)?; let (directory_sender, directory_receiver) = channel();
let directories_db = db.clone();
std::thread::spawn(move || {
insert_directories(directory_receiver, directories_db);
});
let (song_sender, song_receiver) = channel();
let songs_db = db.clone();
std::thread::spawn(move || {
insert_songs(song_receiver, songs_db);
});
let mut updater = IndexUpdater::new(album_art_pattern, directory_sender, song_sender)?;
for target in mount_points.values() { for target in mount_points.values() {
updater.populate_directory(None, target.as_path())?; updater.populate_directory(None, target.as_path())?;
} }
updater.flush_songs()?;
updater.flush_directories()?;
Ok(()) Ok(())
} }
fn insert_songs(receiver: Receiver<NewSong>, db: DB) {
let mut new_entries = Vec::new();
new_entries.reserve_exact(INDEX_BUILDING_INSERT_BUFFER_SIZE);
loop {
match receiver.recv() {
Ok(s) => {
new_entries.push(s);
if new_entries.len() >= INDEX_BUILDING_INSERT_BUFFER_SIZE {
let connection = db.connect().unwrap();
diesel::insert_into(songs::table)
.values(&new_entries)
.execute(&*connection).unwrap(); // TODO https://github.com/diesel-rs/diesel/issues/1822
new_entries.clear();
}
},
Err(_) => break,
}
}
if new_entries.len() > 0 {
let connection = db.connect().unwrap();
diesel::insert_into(songs::table)
.values(&new_entries)
.execute(&*connection).unwrap();
}
}
fn insert_directories(receiver: Receiver<NewDirectory>, db: DB) {
let mut new_entries = Vec::new();
new_entries.reserve_exact(INDEX_BUILDING_INSERT_BUFFER_SIZE);
loop {
match receiver.recv() {
Ok(s) => {
new_entries.push(s);
if new_entries.len() >= INDEX_BUILDING_INSERT_BUFFER_SIZE {
let connection = db.connect().unwrap();
diesel::insert_into(directories::table)
.values(&new_entries)
.execute(&*connection).unwrap(); // TODO https://github.com/diesel-rs/diesel/issues/1822
new_entries.clear();
}
},
Err(_) => break,
}
}
if new_entries.len() > 0 {
let connection = db.connect().unwrap();
diesel::insert_into(directories::table)
.values(&new_entries)
.execute(&*connection).unwrap();
}
}