diff --git a/apps/indexer/src/analyzer.rs b/apps/indexer/src/analyzer.rs index dba467d..ecbfc32 100644 --- a/apps/indexer/src/analyzer.rs +++ b/apps/indexer/src/analyzer.rs @@ -4,12 +4,12 @@ use image::GenericImageView; use parsers::{analyze_book, BookFormat}; use sqlx::Row; use std::path::Path; -use std::sync::atomic::{AtomicI32, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; use std::sync::Arc; use tracing::{info, warn}; use uuid::Uuid; -use crate::{utils, AppState}; +use crate::{job::is_job_cancelled, utils, AppState}; #[derive(Clone)] struct ThumbnailConfig { @@ -187,6 +187,24 @@ pub async fn analyze_library_books( .await; let processed_count = Arc::new(AtomicI32::new(0)); + let cancelled_flag = Arc::new(AtomicBool::new(false)); + + // Background task: poll DB every 2s to detect cancellation + let cancel_pool = state.pool.clone(); + let cancel_flag_for_poller = cancelled_flag.clone(); + let cancel_handle = tokio::spawn(async move { + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + match is_job_cancelled(&cancel_pool, job_id).await { + Ok(true) => { + cancel_flag_for_poller.store(true, Ordering::Relaxed); + break; + } + Ok(false) => {} + Err(_) => break, + } + } + }); struct BookTask { book_id: Uuid, @@ -208,8 +226,13 @@ pub async fn analyze_library_books( let processed_count = processed_count.clone(); let pool = state.pool.clone(); let config = config.clone(); + let cancelled = cancelled_flag.clone(); async move { + if cancelled.load(Ordering::Relaxed) { + return; + } + let local_path = utils::remap_libraries_path(&task.abs_path); let path = Path::new(&local_path); @@ -328,6 +351,13 @@ pub async fn analyze_library_books( }) .await; + cancel_handle.abort(); + + if cancelled_flag.load(Ordering::Relaxed) { + info!("[ANALYZER] Job {} cancelled by user, stopping analysis", job_id); + return Err(anyhow::anyhow!("Job cancelled by user")); + } + let final_count = processed_count.load(Ordering::Relaxed); info!( "[ANALYZER] Analysis complete: {}/{} books processed",