fix(indexer): corriger OOM lors du full rebuild (batching + limite threads)
- Extraction par batches de 200 livres (libère mémoire entre chaque batch) - Limiter tokio spawn_blocking à 8 threads (défaut 512, chaque thread ~8MB stack) - Réduire concurrence extraction de 8 à 2 max - Supprimer raw_bytes.clone() inutile (passage par ownership) - Ajouter log RSS entre chaque batch pour diagnostic mémoire Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -89,7 +89,7 @@ async fn load_thumbnail_concurrency(pool: &sqlx::PgPool) -> usize {
|
||||
// Default: half the logical CPUs, clamped between 2 and 8.
|
||||
// Archive extraction is I/O bound but benefits from moderate parallelism.
|
||||
let cpus = num_cpus::get();
|
||||
let default_concurrency = (cpus / 2).clamp(2, 8);
|
||||
let default_concurrency = (cpus / 2).clamp(1, 2);
|
||||
let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'limits'"#)
|
||||
.fetch_optional(pool)
|
||||
.await;
|
||||
@@ -369,6 +369,7 @@ pub async fn analyze_library_books(
|
||||
}
|
||||
});
|
||||
|
||||
#[derive(Clone)]
|
||||
struct BookTask {
|
||||
book_id: Uuid,
|
||||
abs_path: String,
|
||||
@@ -388,8 +389,11 @@ pub async fn analyze_library_books(
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Sub-phase A: extract first page from each archive and store raw image
|
||||
// I/O bound — limited by HDD throughput, runs at `concurrency`
|
||||
// Processed in batches of 500 to limit memory — raw_bytes are freed between batches.
|
||||
// The collected results (Uuid, String, i32) are lightweight (~100 bytes each).
|
||||
// -------------------------------------------------------------------------
|
||||
const BATCH_SIZE: usize = 200;
|
||||
|
||||
let phase_a_start = std::time::Instant::now();
|
||||
let _ = sqlx::query(
|
||||
"UPDATE index_jobs SET status = 'extracting_pages', total_files = $2, processed_files = 0, current_file = NULL WHERE id = $1",
|
||||
@@ -400,9 +404,27 @@ pub async fn analyze_library_books(
|
||||
.await;
|
||||
|
||||
let extracted_count = Arc::new(AtomicI32::new(0));
|
||||
let mut all_extracted: Vec<(Uuid, String, i32)> = Vec::new();
|
||||
|
||||
// Collected results: (book_id, raw_path, page_count) — only books that need thumbnail generation
|
||||
let extracted: Vec<(Uuid, String, i32)> = stream::iter(tasks)
|
||||
let num_batches = (tasks.len() + BATCH_SIZE - 1) / BATCH_SIZE;
|
||||
let task_chunks: Vec<Vec<BookTask>> = tasks
|
||||
.into_iter()
|
||||
.collect::<Vec<_>>()
|
||||
.chunks(BATCH_SIZE)
|
||||
.map(|c| c.to_vec())
|
||||
.collect();
|
||||
|
||||
for (batch_idx, batch_tasks) in task_chunks.into_iter().enumerate() {
|
||||
if cancelled_flag.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
||||
info!(
|
||||
"[ANALYZER] Extraction batch {}/{} — {} books",
|
||||
batch_idx + 1, num_batches, batch_tasks.len()
|
||||
);
|
||||
|
||||
let batch_extracted: Vec<(Uuid, String, i32)> = stream::iter(batch_tasks)
|
||||
.map(|task| {
|
||||
let pool = state.pool.clone();
|
||||
let config = config.clone();
|
||||
@@ -534,11 +556,10 @@ pub async fn analyze_library_books(
|
||||
return None; // don't enqueue for thumbnail sub-phase
|
||||
}
|
||||
|
||||
// Save raw bytes to disk (no resize, no encode)
|
||||
// Save raw bytes to disk (no resize, no encode) — moves raw_bytes, no clone
|
||||
let raw_path = match tokio::task::spawn_blocking({
|
||||
let dir = config.directory.clone();
|
||||
let bytes = raw_bytes.clone();
|
||||
move || save_raw_image(book_id, &bytes, &dir)
|
||||
move || save_raw_image(book_id, &raw_bytes, &dir)
|
||||
})
|
||||
.await
|
||||
{
|
||||
@@ -591,20 +612,35 @@ pub async fn analyze_library_books(
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
// Collect lightweight results; raw_bytes already saved to disk and freed
|
||||
all_extracted.extend(batch_extracted);
|
||||
|
||||
// Log RSS to track memory growth between batches
|
||||
if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
|
||||
for line in status.lines() {
|
||||
if line.starts_with("VmRSS:") {
|
||||
info!("[ANALYZER] Memory after batch {}/{}: {}", batch_idx + 1, num_batches, line.trim());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if cancelled_flag.load(Ordering::Relaxed) {
|
||||
cancel_handle.abort();
|
||||
info!("[ANALYZER] Job {} cancelled during extraction phase", job_id);
|
||||
return Err(anyhow::anyhow!("Job cancelled by user"));
|
||||
}
|
||||
|
||||
let extracted_total = extracted.len() as i32;
|
||||
let extracted_total = all_extracted.len() as i32;
|
||||
let phase_a_elapsed = phase_a_start.elapsed();
|
||||
info!(
|
||||
"[ANALYZER] Sub-phase A complete: {}/{} books extracted in {:.1}s ({:.0} ms/book)",
|
||||
"[ANALYZER] Sub-phase A complete: {}/{} books extracted in {:.1}s ({:.0} ms/book, {} batches)",
|
||||
extracted_total,
|
||||
total,
|
||||
phase_a_elapsed.as_secs_f64(),
|
||||
if extracted_total > 0 { phase_a_elapsed.as_millis() as f64 / extracted_total as f64 } else { 0.0 }
|
||||
if extracted_total > 0 { phase_a_elapsed.as_millis() as f64 / extracted_total as f64 } else { 0.0 },
|
||||
num_batches,
|
||||
);
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
@@ -622,7 +658,7 @@ pub async fn analyze_library_books(
|
||||
|
||||
let resize_count = Arc::new(AtomicI32::new(0));
|
||||
|
||||
stream::iter(extracted)
|
||||
stream::iter(all_extracted)
|
||||
.for_each_concurrent(concurrency, |(book_id, raw_path, page_count)| {
|
||||
let pool = state.pool.clone();
|
||||
let config = config.clone();
|
||||
|
||||
@@ -4,8 +4,18 @@ use sqlx::postgres::PgPoolOptions;
|
||||
use stripstream_core::config::IndexerConfig;
|
||||
use tracing::info;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
fn main() -> anyhow::Result<()> {
|
||||
// Limit blocking thread pool to 8 threads (default 512).
|
||||
// Each spawn_blocking call (archive extraction, image save) gets a thread.
|
||||
// With thousands of books, unlimited threads cause OOM via stack memory (~8MB each).
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.max_blocking_threads(8)
|
||||
.build()?;
|
||||
runtime.block_on(async_main())
|
||||
}
|
||||
|
||||
async fn async_main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
std::env::var("RUST_LOG").unwrap_or_else(|_| {
|
||||
|
||||
Reference in New Issue
Block a user