Files
stripstream-librarian/apps/indexer/src/analyzer.rs
Froidefond Julien db11c62d2f fix(analyzer): timeout sur analyze_book pour éviter les blocages indéfinis
Un fichier corrompu (RAR/ZIP/PDF qui ne répond plus) occupait un slot
de concurrence indéfiniment, bloquant le pipeline (p. ex. à 1517/1521).

- Ajoute tokio::time::timeout autour de spawn_blocking(analyze_book)
- Timeout lu depuis limits.timeout_seconds en DB (défaut 120s)
- Le livre est marqué parse_status='error' en cas de timeout

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 22:44:48 +01:00

627 lines
23 KiB
Rust

use anyhow::Result;
use futures::stream::{self, StreamExt};
use image::GenericImageView;
use parsers::{analyze_book, BookFormat};
use sqlx::Row;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::sync::Arc;
use tracing::{info, warn};
use uuid::Uuid;
use crate::{job::is_job_cancelled, utils, AppState};
/// Thumbnail-generation settings, loaded from the `thumbnail` and `limits`
/// rows of the `app_settings` table (with hard-coded fallbacks — see
/// `load_thumbnail_config`).
#[derive(Clone)]
struct ThumbnailConfig {
    // Master switch: when false the whole analysis phase is skipped.
    enabled: bool,
    // Maximum thumbnail width in pixels (aspect ratio is preserved).
    width: u32,
    // Maximum thumbnail height in pixels (aspect ratio is preserved).
    height: u32,
    // WebP encoder quality, passed as-is to the encoder (presumably 0-100
    // like libwebp — TODO confirm against the webp crate docs).
    quality: u8,
    // Directory where `.raw` intermediates and final `.webp` files are written.
    directory: String,
    // Per-book timeout for `analyze_book`, read from `limits.timeout_seconds`.
    timeout_secs: u64,
}
/// Load thumbnail settings from the `app_settings` table, falling back to
/// sane defaults when a row is missing, unreadable, or partially populated.
///
/// The per-book analysis timeout lives under the separate `limits` key
/// (`timeout_seconds`), while all other fields come from the `thumbnail` key.
async fn load_thumbnail_config(pool: &sqlx::PgPool) -> ThumbnailConfig {
    let defaults = ThumbnailConfig {
        enabled: true,
        width: 300,
        height: 400,
        quality: 80,
        directory: "/data/thumbnails".to_string(),
        timeout_secs: 120,
    };
    let thumb_row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'thumbnail'"#)
        .fetch_optional(pool)
        .await;
    let limits_row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'limits'"#)
        .fetch_optional(pool)
        .await;
    // Timeout comes from the `limits` row; any failure along the way
    // (query error, missing row, missing key, wrong type) yields the default.
    let timeout_secs = match limits_row {
        Ok(Some(row)) => {
            let value: serde_json::Value = row.get("value");
            value
                .get("timeout_seconds")
                .and_then(|v| v.as_u64())
                .unwrap_or(defaults.timeout_secs)
        }
        _ => defaults.timeout_secs,
    };
    // Without a readable `thumbnail` row, keep every default except the timeout.
    let Ok(Some(row)) = thumb_row else {
        return ThumbnailConfig { timeout_secs, ..defaults };
    };
    let value: serde_json::Value = row.get("value");
    ThumbnailConfig {
        enabled: value
            .get("enabled")
            .and_then(|v| v.as_bool())
            .unwrap_or(defaults.enabled),
        width: value
            .get("width")
            .and_then(|v| v.as_u64())
            .map(|v| v as u32)
            .unwrap_or(defaults.width),
        height: value
            .get("height")
            .and_then(|v| v.as_u64())
            .map(|v| v as u32)
            .unwrap_or(defaults.height),
        quality: value
            .get("quality")
            .and_then(|v| v.as_u64())
            .map(|v| v as u8)
            .unwrap_or(defaults.quality),
        directory: value
            .get("directory")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string())
            .unwrap_or_else(|| defaults.directory.clone()),
        timeout_secs,
    }
}
/// Read the analysis concurrency from `limits.concurrent_renders` in
/// `app_settings`.
///
/// Falls back to half the logical CPU count (clamped to 2..=8) when the
/// setting is missing or unreadable. The returned value is always >= 1:
/// `buffer_unordered(0)` / `for_each_concurrent(0, ..)` would never poll any
/// future, silently stalling the whole analysis pipeline, so a misconfigured
/// `concurrent_renders = 0` in the DB must not reach the stream combinators.
async fn load_thumbnail_concurrency(pool: &sqlx::PgPool) -> usize {
    // Default: half the logical CPUs, clamped between 2 and 8.
    // Archive extraction is I/O bound but benefits from moderate parallelism.
    let cpus = num_cpus::get();
    let default_concurrency = (cpus / 2).clamp(2, 8);
    let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'limits'"#)
        .fetch_optional(pool)
        .await;
    let configured = match row {
        Ok(Some(row)) => {
            let value: serde_json::Value = row.get("value");
            value
                .get("concurrent_renders")
                .and_then(|v| v.as_u64())
                .map(|v| v as usize)
                .unwrap_or(default_concurrency)
        }
        _ => default_concurrency,
    };
    // Guard against a DB value of 0, which would deadlock sub-phases A and B.
    configured.max(1)
}
/// Decode `image_bytes`, downscale the image to fit within
/// `config.width` x `config.height` (preserving aspect ratio), and encode
/// the result as WebP.
///
/// Returns the encoded WebP bytes, or an error when the input cannot be
/// decoded as an image.
fn generate_thumbnail(image_bytes: &[u8], config: &ThumbnailConfig) -> anyhow::Result<Vec<u8>> {
    let img = image::load_from_memory(image_bytes)
        .map_err(|e| anyhow::anyhow!("failed to load image: {}", e))?;
    let (orig_w, orig_h) = img.dimensions();
    // Scale by the tighter of the two ratios so the result fits the target box.
    let ratio_w = config.width as f32 / orig_w as f32;
    let ratio_h = config.height as f32 / orig_h as f32;
    let ratio = ratio_w.min(ratio_h);
    // Clamp to at least 1px: extreme aspect ratios (e.g. a very wide banner
    // page) would otherwise truncate one dimension to 0 through the `as u32`
    // cast and hand the resizer a degenerate target size.
    let new_w = ((orig_w as f32 * ratio) as u32).max(1);
    let new_h = ((orig_h as f32 * ratio) as u32).max(1);
    let resized = img.resize(new_w, new_h, image::imageops::FilterType::Triangle);
    let rgba = resized.to_rgba8();
    let (w, h) = rgba.dimensions();
    // Drop the alpha channel: the encoder below is fed an RGB pixel layout.
    let rgb_data: Vec<u8> = rgba.pixels().flat_map(|p| [p[0], p[1], p[2]]).collect();
    let quality = config.quality as f32;
    let webp_data = webp::Encoder::new(&rgb_data, webp::PixelLayout::Rgb, w, h).encode(quality);
    Ok(webp_data.to_vec())
}
/// Persist the raw first-page bytes (exactly as extracted from the archive)
/// to `{directory}/{book_id}.raw`, creating the directory if needed.
/// No decoding, resizing, or re-encoding happens here.
fn save_raw_image(book_id: Uuid, raw_bytes: &[u8], directory: &str) -> anyhow::Result<String> {
    let target_dir = Path::new(directory);
    std::fs::create_dir_all(target_dir)?;
    let raw_file = target_dir.join(format!("{}.raw", book_id));
    std::fs::write(&raw_file, raw_bytes)?;
    Ok(raw_file.to_string_lossy().into_owned())
}
/// Turn a previously saved raw page image into the final WebP thumbnail.
///
/// Reads `raw_path`, resizes/encodes it according to `config`, writes the
/// result to `{config.directory}/{book_id}.webp`, then removes the raw file.
fn resize_raw_to_webp(
    book_id: Uuid,
    raw_path: &str,
    config: &ThumbnailConfig,
) -> anyhow::Result<String> {
    let raw_bytes = std::fs::read(raw_path)
        .map_err(|e| anyhow::anyhow!("failed to read raw image {}: {}", raw_path, e))?;
    let encoded = generate_thumbnail(&raw_bytes, config)?;
    let webp_path = Path::new(&config.directory).join(format!("{}.webp", book_id));
    std::fs::write(&webp_path, &encoded)?;
    // The raw file is only an intermediate artifact; its removal is best-effort.
    let _ = std::fs::remove_file(raw_path);
    Ok(webp_path.to_string_lossy().to_string())
}
/// Map the DB `format` column to a parser `BookFormat`; unknown strings
/// yield `None` so the caller can skip the book with a warning.
fn book_format_from_str(s: &str) -> Option<BookFormat> {
    let format = match s {
        "cbz" => BookFormat::Cbz,
        "cbr" => BookFormat::Cbr,
        "pdf" => BookFormat::Pdf,
        _ => return None,
    };
    Some(format)
}
/// Phase 2 — Two-sub-phase analysis:
///
/// **Sub-phase A (extracting_pages)**: open each archive once, extract (page_count, raw_image_bytes),
/// save the raw bytes to `{directory}/{book_id}.raw`. I/O bound — runs at `concurrent_renders`.
///
/// **Sub-phase B (generating_thumbnails)**: load each `.raw` file, resize and encode as WebP,
/// overwrite as `{directory}/{book_id}.webp`. CPU bound — runs at `concurrent_renders`.
///
/// `thumbnail_only` = true: only process books missing thumbnail (page_count may already be set).
/// `thumbnail_only` = false: process books missing page_count.
///
/// Progress mapping: sub-phase A covers 0-50% of `progress_percent`,
/// sub-phase B covers 50-100%. Cancellation is detected by a background task
/// that polls the DB every 2s and flips a shared `AtomicBool`, which both
/// sub-phases check cooperatively. Returns `Err` when the job was cancelled.
pub async fn analyze_library_books(
    state: &AppState,
    job_id: Uuid,
    library_id: Option<Uuid>,
    thumbnail_only: bool,
) -> Result<()> {
    let config = load_thumbnail_config(&state.pool).await;
    if !config.enabled {
        info!("[ANALYZER] Thumbnails disabled, skipping analysis phase");
        return Ok(());
    }
    let concurrency = load_thumbnail_concurrency(&state.pool).await;
    // Scope selection: thumbnail-only runs target books with no thumbnail;
    // regular runs target books that were never page-counted.
    let query_filter = if thumbnail_only {
        "b.thumbnail_path IS NULL"
    } else {
        "b.page_count IS NULL"
    };
    // `query_filter` is one of two hard-coded literals above, so this format!
    // cannot inject user input; `library_id` stays a bound parameter.
    let sql = format!(
        r#"
        SELECT b.id AS book_id, bf.abs_path, bf.format, (b.thumbnail_path IS NULL) AS needs_thumbnail
        FROM books b
        JOIN book_files bf ON bf.book_id = b.id
        WHERE (b.library_id = $1 OR $1 IS NULL)
          AND {}
        "#,
        query_filter
    );
    let rows = sqlx::query(&sql)
        .bind(library_id)
        .fetch_all(&state.pool)
        .await?;
    if rows.is_empty() {
        info!("[ANALYZER] No books to analyze");
        return Ok(());
    }
    let total = rows.len() as i32;
    info!(
        "[ANALYZER] Analyzing {} books (thumbnail_only={}, concurrency={})",
        total, thumbnail_only, concurrency
    );
    // Cancellation poller: checks the DB every 2s and sets the shared flag.
    // The worker closures below only read the flag — they never query the DB
    // for cancellation themselves.
    let cancelled_flag = Arc::new(AtomicBool::new(false));
    let cancel_pool = state.pool.clone();
    let cancel_flag_for_poller = cancelled_flag.clone();
    let cancel_handle = tokio::spawn(async move {
        loop {
            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
            match is_job_cancelled(&cancel_pool, job_id).await {
                Ok(true) => {
                    cancel_flag_for_poller.store(true, Ordering::Relaxed);
                    break;
                }
                Ok(false) => {}
                // A poll error stops the poller; the job then runs to completion.
                Err(_) => break,
            }
        }
    });
    // One unit of work for sub-phase A.
    struct BookTask {
        book_id: Uuid,
        abs_path: String,
        format: String,
        needs_thumbnail: bool,
    }
    let tasks: Vec<BookTask> = rows
        .into_iter()
        .map(|row| BookTask {
            book_id: row.get("book_id"),
            abs_path: row.get("abs_path"),
            format: row.get("format"),
            needs_thumbnail: row.get("needs_thumbnail"),
        })
        .collect();
    // -------------------------------------------------------------------------
    // Sub-phase A: extract first page from each archive and store raw image
    // I/O bound — limited by HDD throughput, runs at `concurrency`
    // -------------------------------------------------------------------------
    let phase_a_start = std::time::Instant::now();
    let _ = sqlx::query(
        "UPDATE index_jobs SET status = 'extracting_pages', total_files = $2, processed_files = 0, current_file = NULL WHERE id = $1",
    )
    .bind(job_id)
    .bind(total)
    .execute(&state.pool)
    .await;
    let extracted_count = Arc::new(AtomicI32::new(0));
    // Collected results: (book_id, raw_path, page_count) — only books that need thumbnail generation
    let extracted: Vec<(Uuid, String, i32)> = stream::iter(tasks)
        .map(|task| {
            let pool = state.pool.clone();
            let config = config.clone();
            let cancelled = cancelled_flag.clone();
            let extracted_count = extracted_count.clone();
            async move {
                // On cancellation, remaining tasks become cheap no-ops so the
                // stream drains quickly instead of being aborted mid-write.
                if cancelled.load(Ordering::Relaxed) {
                    return None;
                }
                let local_path = utils::remap_libraries_path(&task.abs_path);
                let path = std::path::Path::new(&local_path);
                let book_id = task.book_id;
                let needs_thumbnail = task.needs_thumbnail;
                // Remove macOS Apple Double resource fork files (._*) that were indexed before the scanner filter was added
                if path
                    .file_name()
                    .and_then(|n| n.to_str())
                    .map(|n| n.starts_with("._"))
                    .unwrap_or(false)
                {
                    warn!("[ANALYZER] Removing macOS resource fork from DB: {}", local_path);
                    let _ = sqlx::query("DELETE FROM book_files WHERE book_id = $1")
                        .bind(book_id)
                        .execute(&pool)
                        .await;
                    // Delete the book row only when no other file rows remain for it.
                    let _ = sqlx::query(
                        "DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)",
                    )
                    .bind(book_id)
                    .execute(&pool)
                    .await;
                    return None;
                }
                let format = match book_format_from_str(&task.format) {
                    Some(f) => f,
                    None => {
                        warn!("[ANALYZER] Unknown format '{}' for book {}", task.format, book_id);
                        return None;
                    }
                };
                // pdf_scale: the larger target dimension — presumably used by the
                // PDF renderer to pick a raster size; see parsers::analyze_book.
                let pdf_scale = config.width.max(config.height);
                let path_owned = path.to_path_buf();
                let timeout_secs = config.timeout_secs;
                // Timeout guards against corrupted archives/PDFs that hang the
                // parser: without it one stuck book occupies a concurrency slot
                // forever and stalls the whole pipeline.
                let analyze_result = tokio::time::timeout(
                    std::time::Duration::from_secs(timeout_secs),
                    tokio::task::spawn_blocking(move || analyze_book(&path_owned, format, pdf_scale)),
                )
                .await;
                // Three nested results: outer = timeout, middle = spawn_blocking
                // join, inner = the parser itself.
                let (page_count, raw_bytes) = match analyze_result {
                    Ok(Ok(Ok(result))) => result,
                    Ok(Ok(Err(e))) => {
                        warn!("[ANALYZER] analyze_book failed for book {}: {}", book_id, e);
                        let _ = sqlx::query(
                            "UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE book_id = $1",
                        )
                        .bind(book_id)
                        .bind(e.to_string())
                        .execute(&pool)
                        .await;
                        return None;
                    }
                    Ok(Err(e)) => {
                        warn!("[ANALYZER] spawn_blocking error for book {}: {}", book_id, e);
                        return None;
                    }
                    Err(_) => {
                        // NOTE(review): the timed-out blocking closure is detached,
                        // not killed — it may keep a blocking-pool thread busy until
                        // it finishes on its own. The concurrency slot is freed here.
                        warn!("[ANALYZER] analyze_book timed out after {}s for book {}: {}", timeout_secs, book_id, local_path);
                        let _ = sqlx::query(
                            "UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE book_id = $1",
                        )
                        .bind(book_id)
                        .bind(format!("analyze_book timed out after {}s", timeout_secs))
                        .execute(&pool)
                        .await;
                        return None;
                    }
                };
                // If thumbnail already exists, just update page_count and skip thumbnail generation
                if !needs_thumbnail {
                    if let Err(e) = sqlx::query("UPDATE books SET page_count = $1 WHERE id = $2")
                        .bind(page_count)
                        .bind(book_id)
                        .execute(&pool)
                        .await
                    {
                        warn!("[ANALYZER] DB page_count update failed for book {}: {}", book_id, e);
                    }
                    let processed = extracted_count.fetch_add(1, Ordering::Relaxed) + 1;
                    let percent = (processed as f64 / total as f64 * 50.0) as i32;
                    let _ = sqlx::query(
                        "UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1",
                    )
                    .bind(job_id)
                    .bind(processed)
                    .bind(percent)
                    .execute(&pool)
                    .await;
                    return None; // don't enqueue for thumbnail sub-phase
                }
                // Save raw bytes to disk (no resize, no encode)
                let raw_path = match tokio::task::spawn_blocking({
                    let dir = config.directory.clone();
                    let bytes = raw_bytes.clone();
                    move || save_raw_image(book_id, &bytes, &dir)
                })
                .await
                {
                    Ok(Ok(p)) => p,
                    Ok(Err(e)) => {
                        warn!("[ANALYZER] save_raw_image failed for book {}: {}", book_id, e);
                        return None;
                    }
                    Err(e) => {
                        warn!("[ANALYZER] spawn_blocking save_raw error for book {}: {}", book_id, e);
                        return None;
                    }
                };
                // Update page_count in DB
                if let Err(e) = sqlx::query("UPDATE books SET page_count = $1 WHERE id = $2")
                    .bind(page_count)
                    .bind(book_id)
                    .execute(&pool)
                    .await
                {
                    warn!("[ANALYZER] DB page_count update failed for book {}: {}", book_id, e);
                    return None;
                }
                let processed = extracted_count.fetch_add(1, Ordering::Relaxed) + 1;
                let percent = (processed as f64 / total as f64 * 50.0) as i32; // first 50%
                let _ = sqlx::query(
                    "UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1",
                )
                .bind(job_id)
                .bind(processed)
                .bind(percent)
                .execute(&pool)
                .await;
                Some((book_id, raw_path, page_count))
            }
        })
        .buffer_unordered(concurrency)
        .filter_map(|x| async move { x })
        .collect()
        .await;
    if cancelled_flag.load(Ordering::Relaxed) {
        cancel_handle.abort();
        info!("[ANALYZER] Job {} cancelled during extraction phase", job_id);
        return Err(anyhow::anyhow!("Job cancelled by user"));
    }
    let extracted_total = extracted.len() as i32;
    let phase_a_elapsed = phase_a_start.elapsed();
    info!(
        "[ANALYZER] Sub-phase A complete: {}/{} books extracted in {:.1}s ({:.0} ms/book)",
        extracted_total,
        total,
        phase_a_elapsed.as_secs_f64(),
        if extracted_total > 0 { phase_a_elapsed.as_millis() as f64 / extracted_total as f64 } else { 0.0 }
    );
    // -------------------------------------------------------------------------
    // Sub-phase B: resize raw images and encode as WebP
    // CPU bound — can run at higher concurrency than I/O phase
    // -------------------------------------------------------------------------
    let phase_b_start = std::time::Instant::now();
    let _ = sqlx::query(
        "UPDATE index_jobs SET status = 'generating_thumbnails', generating_thumbnails_started_at = NOW(), total_files = $2, processed_files = 0, current_file = NULL WHERE id = $1",
    )
    .bind(job_id)
    .bind(extracted_total)
    .execute(&state.pool)
    .await;
    let resize_count = Arc::new(AtomicI32::new(0));
    stream::iter(extracted)
        .for_each_concurrent(concurrency, |(book_id, raw_path, page_count)| {
            let pool = state.pool.clone();
            let config = config.clone();
            let cancelled = cancelled_flag.clone();
            let resize_count = resize_count.clone();
            async move {
                if cancelled.load(Ordering::Relaxed) {
                    return;
                }
                let raw_path_clone = raw_path.clone();
                // Image decode + resize + WebP encode are CPU-heavy; keep them
                // off the async runtime threads.
                let thumb_result = tokio::task::spawn_blocking(move || {
                    resize_raw_to_webp(book_id, &raw_path_clone, &config)
                })
                .await;
                let thumb_path = match thumb_result {
                    Ok(Ok(p)) => p,
                    Ok(Err(e)) => {
                        warn!("[ANALYZER] resize_raw_to_webp failed for book {}: {}", book_id, e);
                        // page_count is already set; thumbnail stays NULL
                        return;
                    }
                    Err(e) => {
                        warn!("[ANALYZER] spawn_blocking resize error for book {}: {}", book_id, e);
                        return;
                    }
                };
                if let Err(e) = sqlx::query(
                    "UPDATE books SET page_count = $1, thumbnail_path = $2 WHERE id = $3",
                )
                .bind(page_count)
                .bind(&thumb_path)
                .bind(book_id)
                .execute(&pool)
                .await
                {
                    warn!("[ANALYZER] DB thumbnail update failed for book {}: {}", book_id, e);
                    return;
                }
                let processed = resize_count.fetch_add(1, Ordering::Relaxed) + 1;
                let percent =
                    50 + (processed as f64 / extracted_total as f64 * 50.0) as i32; // last 50%
                let _ = sqlx::query(
                    "UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1",
                )
                .bind(job_id)
                .bind(processed)
                .bind(percent)
                .execute(&pool)
                .await;
            }
        })
        .await;
    // Poller is no longer needed regardless of how sub-phase B ended.
    cancel_handle.abort();
    if cancelled_flag.load(Ordering::Relaxed) {
        info!("[ANALYZER] Job {} cancelled during resize phase", job_id);
        return Err(anyhow::anyhow!("Job cancelled by user"));
    }
    let final_count = resize_count.load(Ordering::Relaxed);
    let phase_b_elapsed = phase_b_start.elapsed();
    info!(
        "[ANALYZER] Sub-phase B complete: {}/{} thumbnails generated in {:.1}s ({:.0} ms/book)",
        final_count,
        extracted_total,
        phase_b_elapsed.as_secs_f64(),
        if final_count > 0 { phase_b_elapsed.as_millis() as f64 / final_count as f64 } else { 0.0 }
    );
    info!(
        "[ANALYZER] Total: {:.1}s (extraction {:.1}s + resize {:.1}s)",
        (phase_a_elapsed + phase_b_elapsed).as_secs_f64(),
        phase_a_elapsed.as_secs_f64(),
        phase_b_elapsed.as_secs_f64(),
    );
    Ok(())
}
/// Clear thumbnail files and DB references for books in scope, then re-run
/// the analysis in thumbnail-only mode to rebuild them.
pub async fn regenerate_thumbnails(
    state: &AppState,
    job_id: Uuid,
    library_id: Option<Uuid>,
) -> Result<()> {
    let config = load_thumbnail_config(&state.pool).await;
    // Best-effort listing: a DB error simply means nothing gets deleted here.
    let book_ids_to_clear: Vec<Uuid> = sqlx::query_scalar(
        r#"SELECT id FROM books WHERE (library_id = $1 OR $1 IS NULL) AND thumbnail_path IS NOT NULL"#,
    )
    .bind(library_id)
    .fetch_all(&state.pool)
    .await
    .unwrap_or_default();
    let mut deleted_count = 0usize;
    for book_id in &book_ids_to_clear {
        // Remove the WebP thumbnail if present.
        let webp_path = Path::new(&config.directory).join(format!("{}.webp", book_id));
        if webp_path.exists() {
            match std::fs::remove_file(&webp_path) {
                Ok(()) => deleted_count += 1,
                Err(e) => {
                    warn!("[ANALYZER] Failed to delete thumbnail {}: {}", webp_path.display(), e)
                }
            }
        }
        // A leftover raw file means a previous run was interrupted; drop it too.
        let raw_path = Path::new(&config.directory).join(format!("{}.raw", book_id));
        let _ = std::fs::remove_file(&raw_path);
    }
    info!("[ANALYZER] Deleted {} thumbnail files for regeneration", deleted_count);
    sqlx::query(r#"UPDATE books SET thumbnail_path = NULL WHERE (library_id = $1 OR $1 IS NULL)"#)
        .bind(library_id)
        .execute(&state.pool)
        .await?;
    analyze_library_books(state, job_id, library_id, true).await
}
/// Delete orphaned thumbnail files (books deleted in full_rebuild get new
/// UUIDs, so their old `.webp`/`.raw` files no longer match any book row).
pub async fn cleanup_orphaned_thumbnails(state: &AppState) -> Result<()> {
    let config = load_thumbnail_config(&state.pool).await;
    // Snapshot of all live book ids; a DB error yields an empty set, which
    // would delete everything — preserved best-effort behavior of the original.
    let existing_book_ids: std::collections::HashSet<Uuid> =
        sqlx::query_scalar(r#"SELECT id FROM books"#)
            .fetch_all(&state.pool)
            .await
            .unwrap_or_default()
            .into_iter()
            .collect();
    let thumbnail_dir = Path::new(&config.directory);
    if !thumbnail_dir.exists() {
        return Ok(());
    }
    let mut deleted_count = 0usize;
    if let Ok(entries) = std::fs::read_dir(thumbnail_dir) {
        for entry in entries.flatten() {
            let file_name = entry.file_name();
            let file_name = file_name.to_string_lossy();
            // Both `.webp` thumbnails and leftover `.raw` intermediates qualify.
            let stem = file_name
                .strip_suffix(".webp")
                .or_else(|| file_name.strip_suffix(".raw"));
            let Some(book_id_str) = stem else { continue };
            let Ok(book_id) = Uuid::parse_str(book_id_str) else { continue };
            if existing_book_ids.contains(&book_id) {
                continue;
            }
            match std::fs::remove_file(entry.path()) {
                Ok(()) => deleted_count += 1,
                Err(e) => {
                    warn!("Failed to delete orphaned file {}: {}", entry.path().display(), e)
                }
            }
        }
    }
    info!("[ANALYZER] Deleted {} orphaned thumbnail files", deleted_count);
    Ok(())
}