use anyhow::Result; use futures::stream::{self, StreamExt}; use image::GenericImageView; use parsers::{analyze_book, BookFormat}; use sqlx::Row; use std::path::Path; use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; use std::sync::Arc; use tracing::{info, warn}; use uuid::Uuid; use crate::{job::is_job_cancelled, utils, AppState}; #[derive(Clone)] struct ThumbnailConfig { enabled: bool, width: u32, height: u32, quality: u8, directory: String, } async fn load_thumbnail_config(pool: &sqlx::PgPool) -> ThumbnailConfig { let fallback = ThumbnailConfig { enabled: true, width: 300, height: 400, quality: 80, directory: "/data/thumbnails".to_string(), }; let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'thumbnail'"#) .fetch_optional(pool) .await; match row { Ok(Some(row)) => { let value: serde_json::Value = row.get("value"); ThumbnailConfig { enabled: value .get("enabled") .and_then(|v| v.as_bool()) .unwrap_or(fallback.enabled), width: value .get("width") .and_then(|v| v.as_u64()) .map(|v| v as u32) .unwrap_or(fallback.width), height: value .get("height") .and_then(|v| v.as_u64()) .map(|v| v as u32) .unwrap_or(fallback.height), quality: value .get("quality") .and_then(|v| v.as_u64()) .map(|v| v as u8) .unwrap_or(fallback.quality), directory: value .get("directory") .and_then(|v| v.as_str()) .map(|s| s.to_string()) .unwrap_or_else(|| fallback.directory.clone()), } } _ => fallback, } } async fn load_thumbnail_concurrency(pool: &sqlx::PgPool) -> usize { let default_concurrency = 2; let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'limits'"#) .fetch_optional(pool) .await; match row { Ok(Some(row)) => { let value: serde_json::Value = row.get("value"); value .get("concurrent_renders") .and_then(|v| v.as_u64()) .map(|v| v as usize) .unwrap_or(default_concurrency) } _ => default_concurrency, } } fn generate_thumbnail(image_bytes: &[u8], config: &ThumbnailConfig) -> anyhow::Result> { let img = image::load_from_memory(image_bytes) .map_err(|e| anyhow::anyhow!("failed to load image: {}", e))?; let (orig_w, orig_h) = img.dimensions(); let ratio_w = config.width as f32 / orig_w as f32; let ratio_h = config.height as f32 / orig_h as f32; let ratio = ratio_w.min(ratio_h); let new_w = (orig_w as f32 * ratio) as u32; let new_h = (orig_h as f32 * ratio) as u32; let resized = img.resize(new_w, new_h, image::imageops::FilterType::Triangle); let rgba = resized.to_rgba8(); let (w, h) = rgba.dimensions(); let rgb_data: Vec = rgba.pixels().flat_map(|p| [p[0], p[1], p[2]]).collect(); let quality = config.quality as f32; let webp_data = webp::Encoder::new(&rgb_data, webp::PixelLayout::Rgb, w, h).encode(quality); Ok(webp_data.to_vec()) } /// Save raw image bytes (as extracted from the archive) without any processing. fn save_raw_image(book_id: Uuid, raw_bytes: &[u8], directory: &str) -> anyhow::Result { let dir = Path::new(directory); std::fs::create_dir_all(dir)?; let path = dir.join(format!("{}.raw", book_id)); std::fs::write(&path, raw_bytes)?; Ok(path.to_string_lossy().to_string()) } /// Resize the raw image and save it as a WebP thumbnail, overwriting the raw file. fn resize_raw_to_webp( book_id: Uuid, raw_path: &str, config: &ThumbnailConfig, ) -> anyhow::Result { let raw_bytes = std::fs::read(raw_path) .map_err(|e| anyhow::anyhow!("failed to read raw image {}: {}", raw_path, e))?; let webp_bytes = generate_thumbnail(&raw_bytes, config)?; let webp_path = Path::new(&config.directory).join(format!("{}.webp", book_id)); std::fs::write(&webp_path, &webp_bytes)?; // Delete the raw file now that the WebP is written let _ = std::fs::remove_file(raw_path); Ok(webp_path.to_string_lossy().to_string()) } fn book_format_from_str(s: &str) -> Option { match s { "cbz" => Some(BookFormat::Cbz), "cbr" => Some(BookFormat::Cbr), "pdf" => Some(BookFormat::Pdf), _ => None, } } /// Phase 2 — Two-sub-phase analysis: /// /// **Sub-phase A (extracting_pages)**: open each archive once, extract (page_count, raw_image_bytes), /// save the raw bytes to `{directory}/{book_id}.raw`. I/O bound — runs at `concurrent_renders`. /// /// **Sub-phase B (generating_thumbnails)**: load each `.raw` file, resize and encode as WebP, /// overwrite as `{directory}/{book_id}.webp`. CPU bound — runs at `concurrent_renders`. /// /// `thumbnail_only` = true: only process books missing thumbnail (page_count may already be set). /// `thumbnail_only` = false: process books missing page_count. pub async fn analyze_library_books( state: &AppState, job_id: Uuid, library_id: Option, thumbnail_only: bool, ) -> Result<()> { let config = load_thumbnail_config(&state.pool).await; if !config.enabled { info!("[ANALYZER] Thumbnails disabled, skipping analysis phase"); return Ok(()); } let concurrency = load_thumbnail_concurrency(&state.pool).await; let query_filter = if thumbnail_only { "b.thumbnail_path IS NULL" } else { "b.page_count IS NULL" }; let sql = format!( r#" SELECT b.id AS book_id, bf.abs_path, bf.format FROM books b JOIN book_files bf ON bf.book_id = b.id WHERE (b.library_id = $1 OR $1 IS NULL) AND {} "#, query_filter ); let rows = sqlx::query(&sql) .bind(library_id) .fetch_all(&state.pool) .await?; if rows.is_empty() { info!("[ANALYZER] No books to analyze"); return Ok(()); } let total = rows.len() as i32; info!( "[ANALYZER] Analyzing {} books (thumbnail_only={}, concurrency={})", total, thumbnail_only, concurrency ); let cancelled_flag = Arc::new(AtomicBool::new(false)); let cancel_pool = state.pool.clone(); let cancel_flag_for_poller = cancelled_flag.clone(); let cancel_handle = tokio::spawn(async move { loop { tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; match is_job_cancelled(&cancel_pool, job_id).await { Ok(true) => { cancel_flag_for_poller.store(true, Ordering::Relaxed); break; } Ok(false) => {} Err(_) => break, } } }); struct BookTask { book_id: Uuid, abs_path: String, format: String, } let tasks: Vec = rows .into_iter() .map(|row| BookTask { book_id: row.get("book_id"), abs_path: row.get("abs_path"), format: row.get("format"), }) .collect(); // ------------------------------------------------------------------------- // Sub-phase A: extract first page from each archive and store raw image // I/O bound — limited by HDD throughput, runs at `concurrency` // ------------------------------------------------------------------------- let phase_a_start = std::time::Instant::now(); let _ = sqlx::query( "UPDATE index_jobs SET status = 'extracting_pages', total_files = $2, processed_files = 0, current_file = NULL WHERE id = $1", ) .bind(job_id) .bind(total) .execute(&state.pool) .await; let extracted_count = Arc::new(AtomicI32::new(0)); // Collected results: (book_id, raw_path, page_count) let extracted: Vec<(Uuid, String, i32)> = stream::iter(tasks) .map(|task| { let pool = state.pool.clone(); let config = config.clone(); let cancelled = cancelled_flag.clone(); let extracted_count = extracted_count.clone(); async move { if cancelled.load(Ordering::Relaxed) { return None; } let local_path = utils::remap_libraries_path(&task.abs_path); let path = std::path::Path::new(&local_path); let book_id = task.book_id; let format = match book_format_from_str(&task.format) { Some(f) => f, None => { warn!("[ANALYZER] Unknown format '{}' for book {}", task.format, book_id); return None; } }; let pdf_scale = config.width.max(config.height); let path_owned = path.to_path_buf(); let analyze_result = tokio::task::spawn_blocking(move || analyze_book(&path_owned, format, pdf_scale)) .await; let (page_count, raw_bytes) = match analyze_result { Ok(Ok(result)) => result, Ok(Err(e)) => { warn!("[ANALYZER] analyze_book failed for book {}: {}", book_id, e); let _ = sqlx::query( "UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE book_id = $1", ) .bind(book_id) .bind(e.to_string()) .execute(&pool) .await; return None; } Err(e) => { warn!("[ANALYZER] spawn_blocking error for book {}: {}", book_id, e); return None; } }; // Save raw bytes to disk (no resize, no encode) let raw_path = match tokio::task::spawn_blocking({ let dir = config.directory.clone(); let bytes = raw_bytes.clone(); move || save_raw_image(book_id, &bytes, &dir) }) .await { Ok(Ok(p)) => p, Ok(Err(e)) => { warn!("[ANALYZER] save_raw_image failed for book {}: {}", book_id, e); return None; } Err(e) => { warn!("[ANALYZER] spawn_blocking save_raw error for book {}: {}", book_id, e); return None; } }; // Update page_count in DB if let Err(e) = sqlx::query("UPDATE books SET page_count = $1 WHERE id = $2") .bind(page_count) .bind(book_id) .execute(&pool) .await { warn!("[ANALYZER] DB page_count update failed for book {}: {}", book_id, e); return None; } let processed = extracted_count.fetch_add(1, Ordering::Relaxed) + 1; let percent = (processed as f64 / total as f64 * 50.0) as i32; // first 50% let _ = sqlx::query( "UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1", ) .bind(job_id) .bind(processed) .bind(percent) .execute(&pool) .await; Some((book_id, raw_path, page_count)) } }) .buffer_unordered(concurrency) .filter_map(|x| async move { x }) .collect() .await; if cancelled_flag.load(Ordering::Relaxed) { cancel_handle.abort(); info!("[ANALYZER] Job {} cancelled during extraction phase", job_id); return Err(anyhow::anyhow!("Job cancelled by user")); } let extracted_total = extracted.len() as i32; let phase_a_elapsed = phase_a_start.elapsed(); info!( "[ANALYZER] Sub-phase A complete: {}/{} books extracted in {:.1}s ({:.0} ms/book)", extracted_total, total, phase_a_elapsed.as_secs_f64(), if extracted_total > 0 { phase_a_elapsed.as_millis() as f64 / extracted_total as f64 } else { 0.0 } ); // ------------------------------------------------------------------------- // Sub-phase B: resize raw images and encode as WebP // CPU bound — can run at higher concurrency than I/O phase // ------------------------------------------------------------------------- let phase_b_start = std::time::Instant::now(); let _ = sqlx::query( "UPDATE index_jobs SET status = 'generating_thumbnails', generating_thumbnails_started_at = NOW(), total_files = $2, processed_files = 0, current_file = NULL WHERE id = $1", ) .bind(job_id) .bind(extracted_total) .execute(&state.pool) .await; let resize_count = Arc::new(AtomicI32::new(0)); stream::iter(extracted) .for_each_concurrent(concurrency, |(book_id, raw_path, page_count)| { let pool = state.pool.clone(); let config = config.clone(); let cancelled = cancelled_flag.clone(); let resize_count = resize_count.clone(); async move { if cancelled.load(Ordering::Relaxed) { return; } let raw_path_clone = raw_path.clone(); let thumb_result = tokio::task::spawn_blocking(move || { resize_raw_to_webp(book_id, &raw_path_clone, &config) }) .await; let thumb_path = match thumb_result { Ok(Ok(p)) => p, Ok(Err(e)) => { warn!("[ANALYZER] resize_raw_to_webp failed for book {}: {}", book_id, e); // page_count is already set; thumbnail stays NULL return; } Err(e) => { warn!("[ANALYZER] spawn_blocking resize error for book {}: {}", book_id, e); return; } }; if let Err(e) = sqlx::query( "UPDATE books SET page_count = $1, thumbnail_path = $2 WHERE id = $3", ) .bind(page_count) .bind(&thumb_path) .bind(book_id) .execute(&pool) .await { warn!("[ANALYZER] DB thumbnail update failed for book {}: {}", book_id, e); return; } let processed = resize_count.fetch_add(1, Ordering::Relaxed) + 1; let percent = 50 + (processed as f64 / extracted_total as f64 * 50.0) as i32; // last 50% let _ = sqlx::query( "UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1", ) .bind(job_id) .bind(processed) .bind(percent) .execute(&pool) .await; } }) .await; cancel_handle.abort(); if cancelled_flag.load(Ordering::Relaxed) { info!("[ANALYZER] Job {} cancelled during resize phase", job_id); return Err(anyhow::anyhow!("Job cancelled by user")); } let final_count = resize_count.load(Ordering::Relaxed); let phase_b_elapsed = phase_b_start.elapsed(); info!( "[ANALYZER] Sub-phase B complete: {}/{} thumbnails generated in {:.1}s ({:.0} ms/book)", final_count, extracted_total, phase_b_elapsed.as_secs_f64(), if final_count > 0 { phase_b_elapsed.as_millis() as f64 / final_count as f64 } else { 0.0 } ); info!( "[ANALYZER] Total: {:.1}s (extraction {:.1}s + resize {:.1}s)", (phase_a_elapsed + phase_b_elapsed).as_secs_f64(), phase_a_elapsed.as_secs_f64(), phase_b_elapsed.as_secs_f64(), ); Ok(()) } /// Clear thumbnail files and DB references for books in scope, then re-analyze. pub async fn regenerate_thumbnails( state: &AppState, job_id: Uuid, library_id: Option, ) -> Result<()> { let config = load_thumbnail_config(&state.pool).await; let book_ids_to_clear: Vec = sqlx::query_scalar( r#"SELECT id FROM books WHERE (library_id = $1 OR $1 IS NULL) AND thumbnail_path IS NOT NULL"#, ) .bind(library_id) .fetch_all(&state.pool) .await .unwrap_or_default(); let mut deleted_count = 0usize; for book_id in &book_ids_to_clear { // Delete WebP thumbnail let webp_path = Path::new(&config.directory).join(format!("{}.webp", book_id)); if webp_path.exists() { if let Err(e) = std::fs::remove_file(&webp_path) { warn!("[ANALYZER] Failed to delete thumbnail {}: {}", webp_path.display(), e); } else { deleted_count += 1; } } // Delete raw file if it exists (interrupted previous run) let raw_path = Path::new(&config.directory).join(format!("{}.raw", book_id)); let _ = std::fs::remove_file(&raw_path); } info!("[ANALYZER] Deleted {} thumbnail files for regeneration", deleted_count); sqlx::query(r#"UPDATE books SET thumbnail_path = NULL WHERE (library_id = $1 OR $1 IS NULL)"#) .bind(library_id) .execute(&state.pool) .await?; analyze_library_books(state, job_id, library_id, true).await } /// Delete orphaned thumbnail files (books deleted in full_rebuild get new UUIDs). pub async fn cleanup_orphaned_thumbnails(state: &AppState) -> Result<()> { let config = load_thumbnail_config(&state.pool).await; let existing_book_ids: std::collections::HashSet = sqlx::query_scalar(r#"SELECT id FROM books"#) .fetch_all(&state.pool) .await .unwrap_or_default() .into_iter() .collect(); let thumbnail_dir = Path::new(&config.directory); if !thumbnail_dir.exists() { return Ok(()); } let mut deleted_count = 0usize; if let Ok(entries) = std::fs::read_dir(thumbnail_dir) { for entry in entries.flatten() { let file_name = entry.file_name(); let file_name = file_name.to_string_lossy(); // Clean up both .webp and orphaned .raw files let stem = if let Some(s) = file_name.strip_suffix(".webp") { Some(s.to_string()) } else if let Some(s) = file_name.strip_suffix(".raw") { Some(s.to_string()) } else { None }; if let Some(book_id_str) = stem { if let Ok(book_id) = Uuid::parse_str(&book_id_str) { if !existing_book_ids.contains(&book_id) { if let Err(e) = std::fs::remove_file(entry.path()) { warn!("Failed to delete orphaned file {}: {}", entry.path().display(), e); } else { deleted_count += 1; } } } } } } info!("[ANALYZER] Deleted {} orphaned thumbnail files", deleted_count); Ok(()) }