Adds index status endpoint
This commit is contained in:
parent
090ca387ab
commit
0a7ae8ebad
3 changed files with 111 additions and 4 deletions
|
@ -6,8 +6,10 @@ use std::path::{Path, PathBuf};
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::mpsc::{channel, Sender, TryRecvError};
|
use std::sync::mpsc::{channel, Sender, TryRecvError};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::SystemTime;
|
||||||
use std::{cmp::min, time::Duration};
|
use std::{cmp::min, time::Duration};
|
||||||
use tokio::sync::Notify;
|
use tokio::sync::mpsc::unbounded_channel;
|
||||||
|
use tokio::sync::{Notify, RwLock};
|
||||||
use tokio::time::Instant;
|
use tokio::time::Instant;
|
||||||
|
|
||||||
use crate::app::{config, formats, index, Error};
|
use crate::app::{config, formats, index, Error};
|
||||||
|
@ -37,11 +39,29 @@ pub struct Song {
|
||||||
pub date_added: i64,
|
pub date_added: i64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Default)]
|
||||||
|
pub enum State {
|
||||||
|
#[default]
|
||||||
|
Initial,
|
||||||
|
Pending,
|
||||||
|
InProgress,
|
||||||
|
UpToDate,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Default)]
|
||||||
|
pub struct Status {
|
||||||
|
pub state: State,
|
||||||
|
pub last_start_time: Option<SystemTime>,
|
||||||
|
pub last_end_time: Option<SystemTime>,
|
||||||
|
pub num_songs_indexed: u32,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Scanner {
|
pub struct Scanner {
|
||||||
index_manager: index::Manager,
|
index_manager: index::Manager,
|
||||||
config_manager: config::Manager,
|
config_manager: config::Manager,
|
||||||
pending_scan: Arc<Notify>,
|
pending_scan: Arc<Notify>,
|
||||||
|
status: Arc<RwLock<Status>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Scanner {
|
impl Scanner {
|
||||||
|
@ -53,6 +73,7 @@ impl Scanner {
|
||||||
index_manager,
|
index_manager,
|
||||||
config_manager,
|
config_manager,
|
||||||
pending_scan: Arc::new(Notify::new()),
|
pending_scan: Arc::new(Notify::new()),
|
||||||
|
status: Arc::new(RwLock::new(Status::default())),
|
||||||
};
|
};
|
||||||
|
|
||||||
tokio::spawn({
|
tokio::spawn({
|
||||||
|
@ -70,6 +91,10 @@ impl Scanner {
|
||||||
Ok(scanner)
|
Ok(scanner)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn get_status(&self) -> Status {
|
||||||
|
self.status.read().await.clone()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn trigger_scan(&self) {
|
pub fn trigger_scan(&self) {
|
||||||
self.pending_scan.notify_one();
|
self.pending_scan.notify_one();
|
||||||
}
|
}
|
||||||
|
@ -88,9 +113,15 @@ impl Scanner {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn update(&mut self) -> Result<(), Error> {
|
pub async fn update(&mut self) -> Result<(), Error> {
|
||||||
let start = Instant::now();
|
|
||||||
info!("Beginning collection scan");
|
info!("Beginning collection scan");
|
||||||
|
|
||||||
|
let start = Instant::now();
|
||||||
|
{
|
||||||
|
let mut status = self.status.write().await;
|
||||||
|
status.last_start_time = Some(SystemTime::now());
|
||||||
|
status.state = State::InProgress;
|
||||||
|
}
|
||||||
|
|
||||||
let was_empty = self.index_manager.is_index_empty().await;
|
let was_empty = self.index_manager.is_index_empty().await;
|
||||||
let mut partial_update_time = Instant::now();
|
let mut partial_update_time = Instant::now();
|
||||||
|
|
||||||
|
@ -129,13 +160,31 @@ impl Scanner {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let (status_sender, mut status_receiver) = unbounded_channel();
|
||||||
|
let progress_monitor = tokio::task::spawn({
|
||||||
|
let manager = self.clone();
|
||||||
|
async move {
|
||||||
|
loop {
|
||||||
|
match status_receiver.recv().await {
|
||||||
|
Some(n) => {
|
||||||
|
manager.status.write().await.num_songs_indexed = n;
|
||||||
|
}
|
||||||
|
None => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
let index_task = tokio::task::spawn_blocking(move || {
|
let index_task = tokio::task::spawn_blocking(move || {
|
||||||
let mut index_builder = index::Builder::default();
|
let mut index_builder = index::Builder::default();
|
||||||
|
let mut num_songs_scanned = 0;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let exhausted_songs = match collection_songs_input.try_recv() {
|
let exhausted_songs = match collection_songs_input.try_recv() {
|
||||||
Ok(song) => {
|
Ok(song) => {
|
||||||
index_builder.add_song(song);
|
index_builder.add_song(song);
|
||||||
|
num_songs_scanned += 1;
|
||||||
|
status_sender.send(num_songs_scanned).ok();
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
Err(TryRecvError::Empty) => {
|
Err(TryRecvError::Empty) => {
|
||||||
|
@ -172,10 +221,17 @@ impl Scanner {
|
||||||
|
|
||||||
let index = tokio::join!(scan_task, index_task).1?;
|
let index = tokio::join!(scan_task, index_task).1?;
|
||||||
partial_application_task.abort();
|
partial_application_task.abort();
|
||||||
|
progress_monitor.abort();
|
||||||
|
|
||||||
self.index_manager.persist_index(&index).await?;
|
self.index_manager.persist_index(&index).await?;
|
||||||
self.index_manager.replace_index(index).await;
|
self.index_manager.replace_index(index).await;
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut status = self.status.write().await;
|
||||||
|
status.state = State::UpToDate;
|
||||||
|
status.last_end_time = Some(SystemTime::now());
|
||||||
|
}
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Collection scan took {} seconds",
|
"Collection scan took {} seconds",
|
||||||
start.elapsed().as_millis() as f32 / 1000.0
|
start.elapsed().as_millis() as f32 / 1000.0
|
||||||
|
|
|
@ -34,6 +34,7 @@ pub fn router() -> Router<App> {
|
||||||
.route("/mount_dirs", get(get_mount_dirs))
|
.route("/mount_dirs", get(get_mount_dirs))
|
||||||
.route("/mount_dirs", put(put_mount_dirs))
|
.route("/mount_dirs", put(put_mount_dirs))
|
||||||
.route("/trigger_index", post(post_trigger_index))
|
.route("/trigger_index", post(post_trigger_index))
|
||||||
|
.route("/index_status", get(get_index_status))
|
||||||
// User management
|
// User management
|
||||||
.route("/user", post(post_user))
|
.route("/user", post(post_user))
|
||||||
.route("/user/:name", delete(delete_user))
|
.route("/user/:name", delete(delete_user))
|
||||||
|
@ -255,6 +256,13 @@ async fn post_trigger_index(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_index_status(
|
||||||
|
_admin_rights: AdminRights,
|
||||||
|
State(scanner): State<scanner::Scanner>,
|
||||||
|
) -> Result<Json<dto::IndexStatus>, APIError> {
|
||||||
|
Ok(Json(scanner.get_status().await.into()))
|
||||||
|
}
|
||||||
|
|
||||||
fn index_files_to_response(files: Vec<index::File>, api_version: APIMajorVersion) -> Response {
|
fn index_files_to_response(files: Vec<index::File>, api_version: APIMajorVersion) -> Response {
|
||||||
match api_version {
|
match api_version {
|
||||||
APIMajorVersion::V7 => Json(
|
APIMajorVersion::V7 => Json(
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::app::{config, index, peaks, playlist, thumbnail};
|
use crate::app::{config, index, peaks, playlist, scanner, thumbnail};
|
||||||
use std::{collections::HashMap, convert::From, path::PathBuf};
|
use std::{collections::HashMap, convert::From, path::PathBuf, time::UNIX_EPOCH};
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
|
||||||
pub struct Version {
|
pub struct Version {
|
||||||
|
@ -171,6 +171,49 @@ pub struct Settings {
|
||||||
pub ddns_update_url: String,
|
pub ddns_update_url: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub enum IndexState {
|
||||||
|
OutOfDate,
|
||||||
|
InProgress,
|
||||||
|
UpToDate,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<scanner::State> for IndexState {
|
||||||
|
fn from(state: scanner::State) -> Self {
|
||||||
|
match state {
|
||||||
|
scanner::State::Initial => Self::OutOfDate,
|
||||||
|
scanner::State::Pending => Self::OutOfDate,
|
||||||
|
scanner::State::InProgress => Self::InProgress,
|
||||||
|
scanner::State::UpToDate => Self::UpToDate,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct IndexStatus {
|
||||||
|
state: IndexState,
|
||||||
|
last_start_time: Option<u64>,
|
||||||
|
last_end_time: Option<u64>,
|
||||||
|
num_songs_indexed: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<scanner::Status> for IndexStatus {
|
||||||
|
fn from(s: scanner::Status) -> Self {
|
||||||
|
Self {
|
||||||
|
state: s.state.into(),
|
||||||
|
last_start_time: s
|
||||||
|
.last_start_time
|
||||||
|
.and_then(|t| t.duration_since(UNIX_EPOCH).ok())
|
||||||
|
.map(|d| d.as_millis() as u64),
|
||||||
|
last_end_time: s
|
||||||
|
.last_end_time
|
||||||
|
.and_then(|t| t.duration_since(UNIX_EPOCH).ok())
|
||||||
|
.map(|d| d.as_millis() as u64),
|
||||||
|
num_songs_indexed: s.num_songs_indexed,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
|
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
pub struct Song {
|
pub struct Song {
|
||||||
pub path: PathBuf,
|
pub path: PathBuf,
|
||||||
|
|
Loading…
Add table
Reference in a new issue