diff --git a/apps/indexer/src/analyzer.rs b/apps/indexer/src/analyzer.rs index 9f7dc5d..af4bdaf 100644 --- a/apps/indexer/src/analyzer.rs +++ b/apps/indexer/src/analyzer.rs @@ -89,7 +89,7 @@ async fn load_thumbnail_concurrency(pool: &sqlx::PgPool) -> usize { - // Default: half the logical CPUs, clamped between 2 and 8. + // Default: half the logical CPUs, clamped between 1 and 2 to limit memory use. // Archive extraction is I/O bound but benefits from moderate parallelism. let cpus = num_cpus::get(); - let default_concurrency = (cpus / 2).clamp(2, 8); + let default_concurrency = (cpus / 2).clamp(1, 2); let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'limits'"#) .fetch_optional(pool) .await; @@ -369,6 +369,7 @@ pub async fn analyze_library_books( } }); + #[derive(Clone)] struct BookTask { book_id: Uuid, abs_path: String, @@ -388,8 +389,11 @@ pub async fn analyze_library_books( // ------------------------------------------------------------------------- // Sub-phase A: extract first page from each archive and store raw image - // I/O bound — limited by HDD throughput, runs at `concurrency` + // Processed in batches of 200 to limit memory — raw_bytes are freed between batches. + // The collected results (Uuid, String, i32) are lightweight (~100 bytes each). 
// ------------------------------------------------------------------------- + const BATCH_SIZE: usize = 200; + let phase_a_start = std::time::Instant::now(); let _ = sqlx::query( "UPDATE index_jobs SET status = 'extracting_pages', total_files = $2, processed_files = 0, current_file = NULL WHERE id = $1", ) @@ -400,121 +404,189 @@ pub async fn analyze_library_books( .await; let extracted_count = Arc::new(AtomicI32::new(0)); + let mut all_extracted: Vec<(Uuid, String, i32)> = Vec::new(); - // Collected results: (book_id, raw_path, page_count) — only books that need thumbnail generation - let extracted: Vec<(Uuid, String, i32)> = stream::iter(tasks) - .map(|task| { - let pool = state.pool.clone(); - let config = config.clone(); - let cancelled = cancelled_flag.clone(); - let extracted_count = extracted_count.clone(); + let num_batches = (tasks.len() + BATCH_SIZE - 1) / BATCH_SIZE; + let task_chunks: Vec<Vec<BookTask>> = tasks + .into_iter() + .collect::<Vec<_>>() + .chunks(BATCH_SIZE) + .map(|c| c.to_vec()) + .collect(); - async move { - if cancelled.load(Ordering::Relaxed) { - return None; - } + for (batch_idx, batch_tasks) in task_chunks.into_iter().enumerate() { + if cancelled_flag.load(Ordering::Relaxed) { + break; + } - let local_path = utils::remap_libraries_path(&task.abs_path); - let path = std::path::Path::new(&local_path); - let book_id = task.book_id; - let needs_thumbnail = task.needs_thumbnail; + info!( + "[ANALYZER] Extraction batch {}/{} — {} books", + batch_idx + 1, num_batches, batch_tasks.len() + ); - // Remove macOS Apple Double resource fork files (._*) that were indexed before the scanner filter was added - if path - .file_name() - .and_then(|n| n.to_str()) - .map(|n| n.starts_with("._")) - .unwrap_or(false) - { - warn!("[ANALYZER] Removing macOS resource fork from DB: {}", local_path); - let _ = sqlx::query("DELETE FROM book_files WHERE book_id = $1") + let batch_extracted: Vec<(Uuid, String, i32)> = stream::iter(batch_tasks) + .map(|task| { + let pool = 
state.pool.clone(); + let config = config.clone(); + let cancelled = cancelled_flag.clone(); + let extracted_count = extracted_count.clone(); + + async move { + if cancelled.load(Ordering::Relaxed) { + return None; + } + + let local_path = utils::remap_libraries_path(&task.abs_path); + let path = std::path::Path::new(&local_path); + let book_id = task.book_id; + let needs_thumbnail = task.needs_thumbnail; + + // Remove macOS Apple Double resource fork files (._*) that were indexed before the scanner filter was added + if path + .file_name() + .and_then(|n| n.to_str()) + .map(|n| n.starts_with("._")) + .unwrap_or(false) + { + warn!("[ANALYZER] Removing macOS resource fork from DB: {}", local_path); + let _ = sqlx::query("DELETE FROM book_files WHERE book_id = $1") + .bind(book_id) + .execute(&pool) + .await; + let _ = sqlx::query( + "DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)", + ) .bind(book_id) .execute(&pool) .await; - let _ = sqlx::query( - "DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)", + return None; + } + + let format = match book_format_from_str(&task.format) { + Some(f) => f, + None => { + warn!("[ANALYZER] Unknown format '{}' for book {}", task.format, book_id); + return None; + } + }; + + let pdf_scale = config.width.max(config.height); + let path_owned = path.to_path_buf(); + let timeout_secs = config.timeout_secs; + let file_name = path.file_name() + .map(|n| n.to_string_lossy().to_string()) + .unwrap_or_else(|| local_path.clone()); + + debug!(target: "extraction", "[EXTRACTION] Starting: {} ({})", file_name, task.format); + let extract_start = std::time::Instant::now(); + + let analyze_result = tokio::time::timeout( + std::time::Duration::from_secs(timeout_secs), + tokio::task::spawn_blocking(move || analyze_book(&path_owned, format, pdf_scale)), ) - .bind(book_id) - .execute(&pool) .await; - return None; - } - let format = match 
book_format_from_str(&task.format) { - Some(f) => f, - None => { - warn!("[ANALYZER] Unknown format '{}' for book {}", task.format, book_id); - return None; - } - }; + let (page_count, raw_bytes) = match analyze_result { + Ok(Ok(Ok(result))) => result, + Ok(Ok(Err(e))) => { + warn!(target: "extraction", "[EXTRACTION] Failed: {} — {}", file_name, e); + let _ = sqlx::query( + "UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE book_id = $1", + ) + .bind(book_id) + .bind(e.to_string()) + .execute(&pool) + .await; + return None; + } + Ok(Err(e)) => { + warn!(target: "extraction", "[EXTRACTION] spawn error: {} — {}", file_name, e); + return None; + } + Err(_) => { + warn!(target: "extraction", "[EXTRACTION] Timeout ({}s): {}", timeout_secs, file_name); + let _ = sqlx::query( + "UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE book_id = $1", + ) + .bind(book_id) + .bind(format!("analyze_book timed out after {}s", timeout_secs)) + .execute(&pool) + .await; + return None; + } + }; - let pdf_scale = config.width.max(config.height); - let path_owned = path.to_path_buf(); - let timeout_secs = config.timeout_secs; - let file_name = path.file_name() - .map(|n| n.to_string_lossy().to_string()) - .unwrap_or_else(|| local_path.clone()); + let extract_elapsed = extract_start.elapsed(); + debug!( + target: "extraction", + "[EXTRACTION] Done: {} — {} pages, image={}KB in {:.0}ms", + file_name, page_count, raw_bytes.len() / 1024, + extract_elapsed.as_secs_f64() * 1000.0, + ); - debug!(target: "extraction", "[EXTRACTION] Starting: {} ({})", file_name, task.format); - let extract_start = std::time::Instant::now(); - - let analyze_result = tokio::time::timeout( - std::time::Duration::from_secs(timeout_secs), - tokio::task::spawn_blocking(move || analyze_book(&path_owned, format, pdf_scale)), - ) - .await; - - let (page_count, raw_bytes) = match analyze_result { - Ok(Ok(Ok(result))) => result, - Ok(Ok(Err(e))) => { - warn!(target: 
"extraction", "[EXTRACTION] Failed: {} — {}", file_name, e); + // If thumbnail already exists, just update page_count and skip thumbnail generation + if !needs_thumbnail { + debug!(target: "extraction", "[EXTRACTION] Page count only: {} — {} pages", file_name, page_count); + if let Err(e) = sqlx::query("UPDATE books SET page_count = $1 WHERE id = $2") + .bind(page_count) + .bind(book_id) + .execute(&pool) + .await + { + warn!(target: "extraction", "[EXTRACTION] DB page_count update failed for {}: {}", file_name, e); + } + let processed = extracted_count.fetch_add(1, Ordering::Relaxed) + 1; + let percent = (processed as f64 / total as f64 * 50.0) as i32; let _ = sqlx::query( - "UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE book_id = $1", + "UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1", ) - .bind(book_id) - .bind(e.to_string()) + .bind(job_id) + .bind(processed) + .bind(percent) .execute(&pool) .await; - return None; - } - Ok(Err(e)) => { - warn!(target: "extraction", "[EXTRACTION] spawn error: {} — {}", file_name, e); - return None; - } - Err(_) => { - warn!(target: "extraction", "[EXTRACTION] Timeout ({}s): {}", timeout_secs, file_name); - let _ = sqlx::query( - "UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE book_id = $1", - ) - .bind(book_id) - .bind(format!("analyze_book timed out after {}s", timeout_secs)) - .execute(&pool) - .await; - return None; - } - }; - let extract_elapsed = extract_start.elapsed(); - debug!( - target: "extraction", - "[EXTRACTION] Done: {} — {} pages, image={}KB in {:.0}ms", - file_name, page_count, raw_bytes.len() / 1024, - extract_elapsed.as_secs_f64() * 1000.0, - ); + if processed % 25 == 0 || processed == total { + info!( + target: "extraction", + "[EXTRACTION] Progress: {}/{} books extracted ({}%)", + processed, total, percent + ); + } + return None; // don't enqueue for thumbnail sub-phase + } - // If thumbnail already exists, just update 
page_count and skip thumbnail generation - if !needs_thumbnail { - debug!(target: "extraction", "[EXTRACTION] Page count only: {} — {} pages", file_name, page_count); + // Save raw bytes to disk (no resize, no encode) — moves raw_bytes, no clone + let raw_path = match tokio::task::spawn_blocking({ + let dir = config.directory.clone(); + move || save_raw_image(book_id, &raw_bytes, &dir) + }) + .await + { + Ok(Ok(p)) => p, + Ok(Err(e)) => { + warn!("[ANALYZER] save_raw_image failed for book {}: {}", book_id, e); + return None; + } + Err(e) => { + warn!("[ANALYZER] spawn_blocking save_raw error for book {}: {}", book_id, e); + return None; + } + }; + + // Update page_count in DB if let Err(e) = sqlx::query("UPDATE books SET page_count = $1 WHERE id = $2") .bind(page_count) .bind(book_id) .execute(&pool) .await { - warn!(target: "extraction", "[EXTRACTION] DB page_count update failed for {}: {}", file_name, e); + warn!("[ANALYZER] DB page_count update failed for book {}: {}", book_id, e); + return None; } + let processed = extracted_count.fetch_add(1, Ordering::Relaxed) + 1; - let percent = (processed as f64 / total as f64 * 50.0) as i32; + let percent = (processed as f64 / total as f64 * 50.0) as i32; // first 50% let _ = sqlx::query( "UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1", ) @@ -531,65 +603,28 @@ pub async fn analyze_library_books( processed, total, percent ); } - return None; // don't enqueue for thumbnail sub-phase + + Some((book_id, raw_path, page_count)) } + }) + .buffer_unordered(concurrency) + .filter_map(|x| async move { x }) + .collect() + .await; - // Save raw bytes to disk (no resize, no encode) - let raw_path = match tokio::task::spawn_blocking({ - let dir = config.directory.clone(); - let bytes = raw_bytes.clone(); - move || save_raw_image(book_id, &bytes, &dir) - }) - .await - { - Ok(Ok(p)) => p, - Ok(Err(e)) => { - warn!("[ANALYZER] save_raw_image failed for book {}: {}", book_id, e); - return None; - } - 
Err(e) => { - warn!("[ANALYZER] spawn_blocking save_raw error for book {}: {}", book_id, e); - return None; - } - }; + // Collect lightweight results; raw_bytes already saved to disk and freed + all_extracted.extend(batch_extracted); - // Update page_count in DB - if let Err(e) = sqlx::query("UPDATE books SET page_count = $1 WHERE id = $2") - .bind(page_count) - .bind(book_id) - .execute(&pool) - .await - { - warn!("[ANALYZER] DB page_count update failed for book {}: {}", book_id, e); - return None; + // Log RSS to track memory growth between batches + if let Ok(status) = std::fs::read_to_string("/proc/self/status") { + for line in status.lines() { + if line.starts_with("VmRSS:") { + info!("[ANALYZER] Memory after batch {}/{}: {}", batch_idx + 1, num_batches, line.trim()); + break; } - - let processed = extracted_count.fetch_add(1, Ordering::Relaxed) + 1; - let percent = (processed as f64 / total as f64 * 50.0) as i32; // first 50% - let _ = sqlx::query( - "UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1", - ) - .bind(job_id) - .bind(processed) - .bind(percent) - .execute(&pool) - .await; - - if processed % 25 == 0 || processed == total { - info!( - target: "extraction", - "[EXTRACTION] Progress: {}/{} books extracted ({}%)", - processed, total, percent - ); - } - - Some((book_id, raw_path, page_count)) } - }) - .buffer_unordered(concurrency) - .filter_map(|x| async move { x }) - .collect() - .await; + } + } if cancelled_flag.load(Ordering::Relaxed) { cancel_handle.abort(); @@ -597,14 +632,15 @@ pub async fn analyze_library_books( return Err(anyhow::anyhow!("Job cancelled by user")); } - let extracted_total = extracted.len() as i32; + let extracted_total = all_extracted.len() as i32; let phase_a_elapsed = phase_a_start.elapsed(); info!( - "[ANALYZER] Sub-phase A complete: {}/{} books extracted in {:.1}s ({:.0} ms/book)", + "[ANALYZER] Sub-phase A complete: {}/{} books extracted in {:.1}s ({:.0} ms/book, {} batches)", extracted_total, 
total, phase_a_elapsed.as_secs_f64(), - if extracted_total > 0 { phase_a_elapsed.as_millis() as f64 / extracted_total as f64 } else { 0.0 } + if extracted_total > 0 { phase_a_elapsed.as_millis() as f64 / extracted_total as f64 } else { 0.0 }, + num_batches, ); // ------------------------------------------------------------------------- @@ -622,7 +658,7 @@ pub async fn analyze_library_books( let resize_count = Arc::new(AtomicI32::new(0)); - stream::iter(extracted) + stream::iter(all_extracted) .for_each_concurrent(concurrency, |(book_id, raw_path, page_count)| { let pool = state.pool.clone(); let config = config.clone(); diff --git a/apps/indexer/src/main.rs b/apps/indexer/src/main.rs index 565f6d6..dd07ea6 100644 --- a/apps/indexer/src/main.rs +++ b/apps/indexer/src/main.rs @@ -4,8 +4,18 @@ use sqlx::postgres::PgPoolOptions; use stripstream_core::config::IndexerConfig; use tracing::info; -#[tokio::main] -async fn main() -> anyhow::Result<()> { +fn main() -> anyhow::Result<()> { + // Limit blocking thread pool to 8 threads (default 512). + // Each spawn_blocking call (archive extraction, image save) gets a thread. + // With thousands of books, unlimited threads cause OOM via stack memory (~8MB each). + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .max_blocking_threads(8) + .build()?; + runtime.block_on(async_main()) +} + +async fn async_main() -> anyhow::Result<()> { tracing_subscriber::fmt() .with_env_filter( std::env::var("RUST_LOG").unwrap_or_else(|_| {