Use more threads when populating index

This commit is contained in:
Laurențiu Nicola 2020-07-20 16:30:38 +03:00
parent 8035856fb6
commit dbb5f79371
3 changed files with 93 additions and 59 deletions

11
Cargo.lock generated
View file

@ -325,6 +325,15 @@ dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-channel"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-deque"
version = "0.7.3"
@ -1472,6 +1481,7 @@ dependencies = [
"app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"base64 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)",
"cookie 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"diesel 1.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"diesel_migrations 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"flame 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2842,6 +2852,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum core-foundation 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "57d24c7a13c43e870e37c1556b74555437870a04514f7685f5b354e090567171"
"checksum core-foundation-sys 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac"
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
"checksum crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061"
"checksum crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285"
"checksum crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
"checksum crossbeam-queue 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ab6bffe714b6bb07e42f201352c34f51fefd355ace793f9e638ebd52d23f98d2"

View file

@ -15,6 +15,7 @@ anyhow = "1.0.31"
ape = "0.3.0"
app_dirs = "1.1.1"
base64 = "0.12.1"
crossbeam-channel = "0.4"
diesel = { version = "1.4.4", features = ["sqlite", "r2d2"] }
diesel_migrations = { version = "1.4", features = ["sqlite"] }
flame = { version = "0.2.2", optional = true }

View file

@ -1,4 +1,5 @@
use anyhow::*;
use crossbeam_channel::{Receiver, Sender};
use diesel;
use diesel::prelude::*;
#[cfg(feature = "profile-index")]
@ -7,14 +8,14 @@ use log::{error, info};
use rayon::prelude::*;
use regex::Regex;
use std::fs;
use std::path::Path;
use std::sync::mpsc::*;
use std::path::{Path, PathBuf};
use std::time;
use crate::config::MiscSettings;
use crate::db::{directories, misc_settings, songs, DB};
use crate::index::metadata;
use crate::vfs::VFSSource;
use metadata::SongTags;
const INDEX_BUILDING_INSERT_BUFFER_SIZE: usize = 1000; // Insertions in each transaction
const INDEX_BUILDING_CLEAN_BUFFER_SIZE: usize = 500; // Insertions in each transaction
@ -82,12 +83,12 @@ impl IndexUpdater {
}
#[cfg_attr(feature = "profile-index", flame)]
fn push_song(&mut self, song: NewSong) -> Result<()> {
fn push_song(&self, song: NewSong) -> Result<()> {
self.song_sender.send(song).map_err(Error::new)
}
#[cfg_attr(feature = "profile-index", flame)]
fn push_directory(&mut self, directory: NewDirectory) -> Result<()> {
fn push_directory(&self, directory: NewDirectory) -> Result<()> {
self.directory_sender.send(directory).map_err(Error::new)
}
@ -103,7 +104,7 @@ impl IndexUpdater {
Ok(None)
}
fn populate_directory(&mut self, parent: Option<&Path>, path: &Path) -> Result<()> {
fn populate_directory(&self, parent: Option<&Path>, path: &Path) -> Result<()> {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard(format!(
"dir: {}",
@ -148,13 +149,22 @@ impl IndexUpdater {
// Sub directories
let mut sub_directories = Vec::new();
let mut song_files = Vec::new();
let files = match fs::read_dir(path) {
Ok(files) => files,
Err(e) => {
error!("Directory read error for `{}`: {}", path.display(), e);
return Err(e.into());
}
};
// Insert content
for file in fs::read_dir(path)? {
for file in files {
let file_path = match file {
Ok(ref f) => f.path(),
_ => {
error!("File read error within {}", path_string);
Err(e) => {
error!("File read error within `{}`: {}", path_string, e);
break;
}
};
@ -174,54 +184,64 @@ impl IndexUpdater {
continue;
}
if let Some(file_path_string) = file_path.to_str() {
if let Some(tags) = metadata::read(file_path.as_path()) {
if tags.year.is_some() {
inconsistent_directory_year |=
directory_year.is_some() && directory_year != tags.year;
directory_year = tags.year;
}
song_files.push(file_path);
}
if tags.album.is_some() {
inconsistent_directory_album |=
directory_album.is_some() && directory_album != tags.album;
directory_album = tags.album.as_ref().cloned();
}
let song_metadata = |path: PathBuf| -> Option<(String, SongTags)> {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard("song_metadata");
if tags.album_artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.album_artist;
directory_artist = tags.album_artist.as_ref().cloned();
} else if tags.artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.artist;
directory_artist = tags.artist.as_ref().cloned();
}
path.to_str().and_then(|file_path_string| {
metadata::read(&path).map(|m| (file_path_string.to_owned(), m))
})
};
let song_tags = song_files
.into_par_iter()
.filter_map(song_metadata)
.collect::<Vec<_>>();
let song = NewSong {
path: file_path_string.to_owned(),
parent: path_string.to_owned(),
disc_number: tags.disc_number.map(|n| n as i32),
track_number: tags.track_number.map(|n| n as i32),
title: tags.title,
duration: tags.duration.map(|n| n as i32),
artist: tags.artist,
album_artist: tags.album_artist,
album: tags.album,
year: tags.year,
artwork: artwork.as_ref().cloned(),
};
self.push_song(song)?;
}
for (file_path_string, tags) in song_tags {
if tags.year.is_some() {
inconsistent_directory_year |=
directory_year.is_some() && directory_year != tags.year;
directory_year = tags.year;
}
if tags.album.is_some() {
inconsistent_directory_album |=
directory_album.is_some() && directory_album != tags.album;
directory_album = tags.album.as_ref().cloned();
}
if tags.album_artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.album_artist;
directory_artist = tags.album_artist.as_ref().cloned();
} else if tags.artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.artist;
directory_artist = tags.artist.as_ref().cloned();
}
let song = NewSong {
path: file_path_string.to_owned(),
parent: path_string.to_owned(),
disc_number: tags.disc_number.map(|n| n as i32),
track_number: tags.track_number.map(|n| n as i32),
title: tags.title,
duration: tags.duration.map(|n| n as i32),
artist: tags.artist,
album_artist: tags.album_artist,
album: tags.album,
year: tags.year,
artwork: artwork.as_ref().cloned(),
};
self.push_song(song)?;
}
// Insert directory
let directory = {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard("create_directory");
if inconsistent_directory_year {
directory_year = None;
}
@ -246,11 +266,10 @@ impl IndexUpdater {
self.push_directory(directory)?;
// Populate subdirectories
for sub_directory in sub_directories {
self.populate_directory(Some(path), &sub_directory)?;
}
Ok(())
sub_directories
.into_par_iter()
.map(|sub_directory| self.populate_directory(Some(path), &sub_directory))
.collect() // propagate an error to the caller if one of them failed
}
}
@ -322,8 +341,8 @@ pub fn populate(db: &DB) -> Result<()> {
Regex::new(&settings.index_album_art_pattern)?
};
let (directory_sender, directory_receiver) = channel();
let (song_sender, song_receiver) = channel();
let (directory_sender, directory_receiver) = crossbeam_channel::unbounded();
let (song_sender, song_receiver) = crossbeam_channel::unbounded();
let songs_db = db.clone();
let directories_db = db.clone();
@ -337,10 +356,13 @@ pub fn populate(db: &DB) -> Result<()> {
});
{
let mut updater = IndexUpdater::new(album_art_pattern, directory_sender, song_sender)?;
for target in mount_points.values() {
updater.populate_directory(None, target.as_path())?;
}
let updater = IndexUpdater::new(album_art_pattern, directory_sender, song_sender)?;
let mount_points = mount_points.values().collect::<Vec<_>>();
mount_points
.iter()
.par_bridge()
.map(|target| updater.populate_directory(None, target.as_path()))
.collect::<Result<()>>()?;
}
match directories_thread.join() {