Merge pull request #86 from lnicola/parallel-index

Fix build and populate index in parallel
This commit is contained in:
Antoine Gersant 2020-07-21 01:46:41 -07:00 committed by GitHub
commit 17976dc99f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 102 additions and 74 deletions

35
Cargo.lock generated
View file

@ -325,6 +325,15 @@ dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-channel"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-deque"
version = "0.7.3"
@ -1419,22 +1428,22 @@ dependencies = [
[[package]]
name = "pear"
version = "0.1.2"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"pear_codegen 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"pear_codegen 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "pear_codegen"
version = "0.1.2"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 0.15.44 (registry+https://github.com/rust-lang/crates.io-index)",
"version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"yansi 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"version_check 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
"yansi 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -1472,6 +1481,7 @@ dependencies = [
"app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"base64 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)",
"cookie 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"diesel 1.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"diesel_migrations 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"flame 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1856,7 +1866,7 @@ dependencies = [
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"memchr 2.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
"pear 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"pear 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"rocket_codegen 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rocket_http 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"state 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1900,7 +1910,7 @@ dependencies = [
"cookie 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.10.16 (registry+https://github.com/rust-lang/crates.io-index)",
"indexmap 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"pear 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"pear 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"smallvec 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"state 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2790,11 +2800,6 @@ name = "xdg"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "yansi"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "yansi"
version = "0.5.0"
@ -2847,6 +2852,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum core-foundation 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "57d24c7a13c43e870e37c1556b74555437870a04514f7685f5b354e090567171"
"checksum core-foundation-sys 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac"
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
"checksum crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061"
"checksum crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285"
"checksum crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
"checksum crossbeam-queue 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ab6bffe714b6bb07e42f201352c34f51fefd355ace793f9e638ebd52d23f98d2"
@ -2969,8 +2975,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum parking_lot_core 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b876b1b9e7ac6e1a74a6da34d25c42e17e8862aa409cbbbdcfc8d86c6f3bc62b"
"checksum parking_lot_core 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3"
"checksum pbkdf2 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "006c038a43a45995a9670da19e67600114740e8511d4333bf97a56e66a7542d9"
"checksum pear 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c26d2b92e47063ffce70d3e3b1bd097af121a9e0db07ca38a6cc1cf0cc85ff25"
"checksum pear_codegen 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "336db4a192cc7f54efeb0c4e11a9245394824cc3bcbd37ba3ff51240c35d7a6e"
"checksum pear 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5320f212db967792b67cfe12bd469d08afd6318a249bd917d5c19bc92200ab8a"
"checksum pear_codegen 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "bfc1c836fdc3d1ef87c348b237b5b5c4dff922156fb2d968f57734f9669768ca"
"checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831"
"checksum percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
"checksum pkg-config 0.3.17 (registry+https://github.com/rust-lang/crates.io-index)" = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677"
@ -3120,6 +3126,5 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum wrapped-vec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "06c29bb4abe93d1c8ef79b60f270d0efcaa6c5c97aaaaaaa0d477ea72f5f9e45"
"checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
"checksum xdg 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d089681aa106a86fade1b0128fb5daf07d5867a509ab036d99988dec80429a57"
"checksum yansi 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d60c3b48c9cdec42fb06b3b84b5b087405e1fa1c644a1af3930e4dfafe93de48"
"checksum yansi 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9fc79f4a1e39857fc00c3f662cbf2651c771f00e9c15fe2abc341806bd46bd71"
"checksum zeroize 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3cbac2ed2ba24cc90f5e06485ac8c7c1e5449fe8911aef4d8877218af021a5b8"

View file

@ -15,6 +15,7 @@ anyhow = "1.0.31"
ape = "0.3.0"
app_dirs = "1.1.1"
base64 = "0.12.1"
crossbeam-channel = "0.4"
diesel = { version = "1.4.4", features = ["sqlite", "r2d2"] }
diesel_migrations = { version = "1.4", features = ["sqlite"] }
flame = { version = "0.2.2", optional = true }

View file

@ -1,4 +1,5 @@
use anyhow::*;
use crossbeam_channel::{Receiver, Sender};
use diesel;
use diesel::prelude::*;
#[cfg(feature = "profile-index")]
@ -7,14 +8,14 @@ use log::{error, info};
use rayon::prelude::*;
use regex::Regex;
use std::fs;
use std::path::Path;
use std::sync::mpsc::*;
use std::path::{Path, PathBuf};
use std::time;
use crate::config::MiscSettings;
use crate::db::{directories, misc_settings, songs, DB};
use crate::index::metadata;
use crate::vfs::VFSSource;
use metadata::SongTags;
const INDEX_BUILDING_INSERT_BUFFER_SIZE: usize = 1000; // Insertions in each transaction
// NOTE(review): the trailing comment below is identical to the insert buffer's comment;
// judging by the constant's name it presumably bounds the batch size used while cleaning
// the index rather than inserting -- confirm against its usage and reword if so.
const INDEX_BUILDING_CLEAN_BUFFER_SIZE: usize = 500; // Insertions in each transaction
@ -82,12 +83,12 @@ impl IndexUpdater {
}
#[cfg_attr(feature = "profile-index", flame)]
fn push_song(&mut self, song: NewSong) -> Result<()> {
fn push_song(&self, song: NewSong) -> Result<()> {
self.song_sender.send(song).map_err(Error::new)
}
#[cfg_attr(feature = "profile-index", flame)]
fn push_directory(&mut self, directory: NewDirectory) -> Result<()> {
fn push_directory(&self, directory: NewDirectory) -> Result<()> {
self.directory_sender.send(directory).map_err(Error::new)
}
@ -103,7 +104,7 @@ impl IndexUpdater {
Ok(None)
}
fn populate_directory(&mut self, parent: Option<&Path>, path: &Path) -> Result<()> {
fn populate_directory(&self, parent: Option<&Path>, path: &Path) -> Result<()> {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard(format!(
"dir: {}",
@ -148,13 +149,22 @@ impl IndexUpdater {
// Sub directories
let mut sub_directories = Vec::new();
let mut song_files = Vec::new();
let files = match fs::read_dir(path) {
Ok(files) => files,
Err(e) => {
error!("Directory read error for `{}`: {}", path.display(), e);
return Err(e.into());
}
};
// Insert content
for file in fs::read_dir(path)? {
for file in files {
let file_path = match file {
Ok(ref f) => f.path(),
_ => {
error!("File read error within {}", path_string);
Err(e) => {
error!("File read error within `{}`: {}", path_string, e);
break;
}
};
@ -174,54 +184,64 @@ impl IndexUpdater {
continue;
}
if let Some(file_path_string) = file_path.to_str() {
if let Some(tags) = metadata::read(file_path.as_path()) {
if tags.year.is_some() {
inconsistent_directory_year |=
directory_year.is_some() && directory_year != tags.year;
directory_year = tags.year;
}
song_files.push(file_path);
}
if tags.album.is_some() {
inconsistent_directory_album |=
directory_album.is_some() && directory_album != tags.album;
directory_album = tags.album.as_ref().cloned();
}
let song_metadata = |path: PathBuf| -> Option<(String, SongTags)> {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard("song_metadata");
if tags.album_artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.album_artist;
directory_artist = tags.album_artist.as_ref().cloned();
} else if tags.artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.artist;
directory_artist = tags.artist.as_ref().cloned();
}
path.to_str().and_then(|file_path_string| {
metadata::read(&path).map(|m| (file_path_string.to_owned(), m))
})
};
let song_tags = song_files
.into_par_iter()
.filter_map(song_metadata)
.collect::<Vec<_>>();
let song = NewSong {
path: file_path_string.to_owned(),
parent: path_string.to_owned(),
disc_number: tags.disc_number.map(|n| n as i32),
track_number: tags.track_number.map(|n| n as i32),
title: tags.title,
duration: tags.duration.map(|n| n as i32),
artist: tags.artist,
album_artist: tags.album_artist,
album: tags.album,
year: tags.year,
artwork: artwork.as_ref().cloned(),
};
self.push_song(song)?;
}
for (file_path_string, tags) in song_tags {
if tags.year.is_some() {
inconsistent_directory_year |=
directory_year.is_some() && directory_year != tags.year;
directory_year = tags.year;
}
if tags.album.is_some() {
inconsistent_directory_album |=
directory_album.is_some() && directory_album != tags.album;
directory_album = tags.album.as_ref().cloned();
}
if tags.album_artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.album_artist;
directory_artist = tags.album_artist.as_ref().cloned();
} else if tags.artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.artist;
directory_artist = tags.artist.as_ref().cloned();
}
let song = NewSong {
path: file_path_string.to_owned(),
parent: path_string.to_owned(),
disc_number: tags.disc_number.map(|n| n as i32),
track_number: tags.track_number.map(|n| n as i32),
title: tags.title,
duration: tags.duration.map(|n| n as i32),
artist: tags.artist,
album_artist: tags.album_artist,
album: tags.album,
year: tags.year,
artwork: artwork.as_ref().cloned(),
};
self.push_song(song)?;
}
// Insert directory
let directory = {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard("create_directory");
if inconsistent_directory_year {
directory_year = None;
}
@ -246,11 +266,10 @@ impl IndexUpdater {
self.push_directory(directory)?;
// Populate subdirectories
for sub_directory in sub_directories {
self.populate_directory(Some(path), &sub_directory)?;
}
Ok(())
sub_directories
.into_par_iter()
.map(|sub_directory| self.populate_directory(Some(path), &sub_directory))
.collect() // propagate an error to the caller if one of them failed
}
}
@ -322,8 +341,8 @@ pub fn populate(db: &DB) -> Result<()> {
Regex::new(&settings.index_album_art_pattern)?
};
let (directory_sender, directory_receiver) = channel();
let (song_sender, song_receiver) = channel();
let (directory_sender, directory_receiver) = crossbeam_channel::unbounded();
let (song_sender, song_receiver) = crossbeam_channel::unbounded();
let songs_db = db.clone();
let directories_db = db.clone();
@ -337,10 +356,13 @@ pub fn populate(db: &DB) -> Result<()> {
});
{
let mut updater = IndexUpdater::new(album_art_pattern, directory_sender, song_sender)?;
for target in mount_points.values() {
updater.populate_directory(None, target.as_path())?;
}
let updater = IndexUpdater::new(album_art_pattern, directory_sender, song_sender)?;
let mount_points = mount_points.values().collect::<Vec<_>>();
mount_points
.iter()
.par_bridge()
.map(|target| updater.populate_directory(None, target.as_path()))
.collect::<Result<()>>()?;
}
match directories_thread.join() {