refactor: Phase C — découpe scanner.rs et analyzer.rs en sous-fonctions

scanner.rs:
- Extrait should_skip_deletions() — logique pure de sécurité anti-suppression (testable)
- Extrait handle_stale_deletions() — gestion des fichiers disparus du disque
- Extrait upsert_directory_mtimes() — sauvegarde des mtimes pour scan incrémental
- 6 tests unitaires pour should_skip_deletions (volume démonté, DB vide, cas normal, etc.)

analyzer.rs:
- Extrait spawn_cancellation_poller() — polling d'annulation de job réutilisable

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-29 12:17:39 +02:00
parent 4133d406e1
commit 13b1e1768e
2 changed files with 152 additions and 79 deletions

View File

@@ -353,22 +353,7 @@ pub async fn analyze_library_books(
total, thumbnail_only, concurrency
);
let cancelled_flag = Arc::new(AtomicBool::new(false));
let cancel_pool = state.pool.clone();
let cancel_flag_for_poller = cancelled_flag.clone();
let cancel_handle = tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
match is_job_cancelled(&cancel_pool, job_id).await {
Ok(true) => {
cancel_flag_for_poller.store(true, Ordering::Relaxed);
break;
}
Ok(false) => {}
Err(_) => break,
}
}
});
let (cancelled_flag, cancel_handle) = spawn_cancellation_poller(state.pool.clone(), job_id);
#[derive(Clone)]
struct BookTask {
@@ -845,3 +830,27 @@ pub async fn cleanup_orphaned_thumbnails(state: &AppState) -> Result<()> {
info!("[ANALYZER] Deleted {} orphaned thumbnail files", deleted_count);
Ok(())
}
/// Spawn a background task that polls for job cancellation every 2 seconds.
/// Returns the shared cancellation flag and the task handle.
///
/// The flag flips to `true` (Relaxed ordering — it is a simple latch) once
/// `is_job_cancelled` reports the job as cancelled; the poller then exits.
/// A DB error also stops the poller, leaving the flag untouched.
fn spawn_cancellation_poller(
    pool: sqlx::PgPool,
    job_id: Uuid,
) -> (Arc<AtomicBool>, tokio::task::JoinHandle<()>) {
    let cancelled = Arc::new(AtomicBool::new(false));
    let poller_flag = Arc::clone(&cancelled);
    let handle = tokio::spawn(async move {
        loop {
            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
            match is_job_cancelled(&pool, job_id).await {
                // Not cancelled yet — keep polling.
                Ok(false) => continue,
                // Cancellation observed: latch the flag and stop.
                Ok(true) => {
                    poller_flag.store(true, Ordering::Relaxed);
                    break;
                }
                // DB error: give up polling rather than spin on a broken pool.
                Err(_) => break,
            }
        }
    });
    (cancelled, handle)
}

View File

@@ -501,81 +501,145 @@ pub async fn scan_library_discovery(
library_id, library_processed_count, stats.indexed_files, stats.errors
);
// Handle deletions — with safety check against volume mount failures
handle_stale_deletions(state, library_id, root, &existing, &seen, stats).await?;
upsert_directory_mtimes(state, library_id, &new_dir_mtimes).await;
Ok(())
}
/// Determine whether file deletions should be skipped based on safety heuristics.
/// Returns true if deletions should be skipped (e.g., volume not mounted).
fn should_skip_deletions(root_accessible: bool, seen_count: usize, existing_count: usize, stale_count: usize) -> bool {
    // An unreadable root always blocks deletions, whatever the counts say.
    if !root_accessible {
        return true;
    }
    // Saw nothing on disk while the DB still has entries: volume likely unmounted.
    let nothing_seen = seen_count == 0 && existing_count > 0;
    // Every known file vanished at once: equally suspicious, bail out.
    let all_stale = stale_count > 0 && stale_count == existing_count;
    nothing_seen || all_stale
}
/// Handle deletion of stale files (files in DB but no longer on disk).
/// Includes safety checks to prevent mass deletion if volume is unmounted.
async fn handle_stale_deletions(
state: &AppState,
library_id: Uuid,
root: &Path,
existing: &HashMap<String, (Uuid, Uuid, String)>,
seen: &HashMap<String, bool>,
stats: &mut JobStats,
) -> Result<()> {
let existing_count = existing.len();
let seen_count = seen.len();
let stale_count = existing.iter().filter(|(p, _)| !seen.contains_key(p.as_str())).count();
// Safety: if the library root is not accessible, or if we found zero files
// but the DB had many, the volume is probably not mounted correctly.
// Do NOT delete anything in that case.
let root_accessible = root.is_dir() && std::fs::read_dir(root).is_ok();
let skip_deletions = !root_accessible
|| (seen_count == 0 && existing_count > 0)
|| (stale_count > 0 && stale_count == existing_count);
if skip_deletions && stale_count > 0 {
warn!(
"[SCAN] Skipping deletion of {} stale files for library {} — \
root accessible={}, seen={}, existing={}. \
Volume may not be mounted correctly.",
stale_count, library_id, root_accessible, seen_count, existing_count
);
stats.warnings += stale_count;
} else {
let mut removed_count = 0usize;
for (abs_path, (file_id, book_id, _)) in &existing {
if seen.contains_key(abs_path) {
continue;
}
sqlx::query("DELETE FROM book_files WHERE id = $1")
.bind(file_id)
.execute(&state.pool)
.await?;
sqlx::query(
"DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)",
)
.bind(book_id)
.execute(&state.pool)
.await?;
stats.removed_files += 1;
removed_count += 1;
}
if removed_count > 0 {
info!(
"[SCAN] Removed {} stale files from database",
removed_count
if should_skip_deletions(root_accessible, seen_count, existing_count, stale_count) {
if stale_count > 0 {
warn!(
"[SCAN] Skipping deletion of {} stale files for library {} — \
root accessible={}, seen={}, existing={}. \
Volume may not be mounted correctly.",
stale_count, library_id, root_accessible, seen_count, existing_count
);
stats.warnings += stale_count;
}
return Ok(());
}
// Upsert directory mtimes for next incremental scan
if !new_dir_mtimes.is_empty() {
let dir_paths_db: Vec<String> = new_dir_mtimes
.iter()
.map(|(local, _)| utils::unmap_libraries_path(local))
.collect();
let mtimes: Vec<DateTime<Utc>> = new_dir_mtimes.iter().map(|(_, m)| *m).collect();
let library_ids: Vec<Uuid> = vec![library_id; new_dir_mtimes.len()];
if let Err(e) = sqlx::query(
r#"
INSERT INTO directory_mtimes (library_id, dir_path, mtime)
SELECT * FROM UNNEST($1::uuid[], $2::text[], $3::timestamptz[])
AS t(library_id, dir_path, mtime)
ON CONFLICT (library_id, dir_path) DO UPDATE SET mtime = EXCLUDED.mtime
"#,
)
.bind(&library_ids)
.bind(&dir_paths_db)
.bind(&mtimes)
.execute(&state.pool)
.await
{
warn!("[SCAN] Failed to upsert directory mtimes: {}", e);
let mut removed_count = 0usize;
for (abs_path, (file_id, book_id, _)) in existing {
if seen.contains_key(abs_path) {
continue;
}
sqlx::query("DELETE FROM book_files WHERE id = $1")
.bind(file_id)
.execute(&state.pool)
.await?;
sqlx::query(
"DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)",
)
.bind(book_id)
.execute(&state.pool)
.await?;
stats.removed_files += 1;
removed_count += 1;
}
if removed_count > 0 {
info!("[SCAN] Removed {} stale files from database", removed_count);
}
Ok(())
}
/// Save directory modification times to DB for incremental scan optimization.
///
/// Performs a bulk upsert via UNNEST; failures are logged as warnings rather
/// than propagated, since a missed mtime only costs a re-scan next time.
async fn upsert_directory_mtimes(
    state: &AppState,
    library_id: Uuid,
    new_dir_mtimes: &[(String, DateTime<Utc>)],
) {
    if new_dir_mtimes.is_empty() {
        return;
    }
    let count = new_dir_mtimes.len();
    // Build the three parallel column arrays the UNNEST expects.
    let mut dir_paths_db: Vec<String> = Vec::with_capacity(count);
    let mut mtimes: Vec<DateTime<Utc>> = Vec::with_capacity(count);
    for (local, mtime) in new_dir_mtimes {
        // Paths are stored in DB form, not local-mount form.
        dir_paths_db.push(utils::unmap_libraries_path(local));
        mtimes.push(*mtime);
    }
    let library_ids: Vec<Uuid> = vec![library_id; count];
    let result = sqlx::query(
        r#"
        INSERT INTO directory_mtimes (library_id, dir_path, mtime)
        SELECT * FROM UNNEST($1::uuid[], $2::text[], $3::timestamptz[])
        AS t(library_id, dir_path, mtime)
        ON CONFLICT (library_id, dir_path) DO UPDATE SET mtime = EXCLUDED.mtime
        "#,
    )
    .bind(&library_ids)
    .bind(&dir_paths_db)
    .bind(&mtimes)
    .execute(&state.pool)
    .await;
    if let Err(e) = result {
        warn!("[SCAN] Failed to upsert directory mtimes: {}", e);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn skip_deletions_when_root_not_accessible() {
        // An unreadable root must always block deletions.
        let skip = should_skip_deletions(false, 10, 10, 5);
        assert!(skip);
    }

    #[test]
    fn skip_deletions_when_no_files_seen_but_existing() {
        // Zero files seen while the DB holds 50 — volume likely unmounted.
        let skip = should_skip_deletions(true, 0, 50, 50);
        assert!(skip);
    }

    #[test]
    fn skip_deletions_when_all_existing_are_stale() {
        // 100% of DB files stale at once is suspicious — skip.
        let skip = should_skip_deletions(true, 5, 10, 10);
        assert!(skip);
    }

    #[test]
    fn allow_deletions_normal_case() {
        // A handful of stale files among many present ones — proceed.
        let skip = should_skip_deletions(true, 45, 50, 5);
        assert!(!skip);
    }

    #[test]
    fn allow_deletions_no_stale() {
        // Nothing stale, nothing to skip.
        let skip = should_skip_deletions(true, 50, 50, 0);
        assert!(!skip);
    }

    #[test]
    fn allow_deletions_empty_db() {
        // Empty DB — there is nothing to delete in the first place.
        let skip = should_skip_deletions(true, 10, 0, 0);
        assert!(!skip);
    }
}