fix(indexer): détecter l'annulation de job pendant la phase 2 (analyzer)
L'analyzer ne vérifiait jamais le statut cancelled en DB, ce qui faisait continuer le traitement des thumbnails jusqu'au bout, puis écraser le statut 'cancelled' avec 'success'. Ajout d'un poller background toutes les 2s avec AtomicBool partagé pour stopper proprement le stream concurrent. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -4,12 +4,12 @@ use image::GenericImageView;
|
|||||||
use parsers::{analyze_book, BookFormat};
|
use parsers::{analyze_book, BookFormat};
|
||||||
use sqlx::Row;
|
use sqlx::Row;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::atomic::{AtomicI32, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::{utils, AppState};
|
use crate::{job::is_job_cancelled, utils, AppState};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct ThumbnailConfig {
|
struct ThumbnailConfig {
|
||||||
@@ -187,6 +187,24 @@ pub async fn analyze_library_books(
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
let processed_count = Arc::new(AtomicI32::new(0));
|
let processed_count = Arc::new(AtomicI32::new(0));
|
||||||
|
let cancelled_flag = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
|
// Background task: poll DB every 2s to detect cancellation
|
||||||
|
let cancel_pool = state.pool.clone();
|
||||||
|
let cancel_flag_for_poller = cancelled_flag.clone();
|
||||||
|
let cancel_handle = tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
|
||||||
|
match is_job_cancelled(&cancel_pool, job_id).await {
|
||||||
|
Ok(true) => {
|
||||||
|
cancel_flag_for_poller.store(true, Ordering::Relaxed);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(false) => {}
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
struct BookTask {
|
struct BookTask {
|
||||||
book_id: Uuid,
|
book_id: Uuid,
|
||||||
@@ -208,8 +226,13 @@ pub async fn analyze_library_books(
|
|||||||
let processed_count = processed_count.clone();
|
let processed_count = processed_count.clone();
|
||||||
let pool = state.pool.clone();
|
let pool = state.pool.clone();
|
||||||
let config = config.clone();
|
let config = config.clone();
|
||||||
|
let cancelled = cancelled_flag.clone();
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
|
if cancelled.load(Ordering::Relaxed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
let local_path = utils::remap_libraries_path(&task.abs_path);
|
let local_path = utils::remap_libraries_path(&task.abs_path);
|
||||||
let path = Path::new(&local_path);
|
let path = Path::new(&local_path);
|
||||||
|
|
||||||
@@ -328,6 +351,13 @@ pub async fn analyze_library_books(
|
|||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
cancel_handle.abort();
|
||||||
|
|
||||||
|
if cancelled_flag.load(Ordering::Relaxed) {
|
||||||
|
info!("[ANALYZER] Job {} cancelled by user, stopping analysis", job_id);
|
||||||
|
return Err(anyhow::anyhow!("Job cancelled by user"));
|
||||||
|
}
|
||||||
|
|
||||||
let final_count = processed_count.load(Ordering::Relaxed);
|
let final_count = processed_count.load(Ordering::Relaxed);
|
||||||
info!(
|
info!(
|
||||||
"[ANALYZER] Analysis complete: {}/{} books processed",
|
"[ANALYZER] Analysis complete: {}/{} books processed",
|
||||||
|
|||||||
Reference in New Issue
Block a user