diff --git a/.env.example b/.env.example index d505438..985db7e 100644 --- a/.env.example +++ b/.env.example @@ -34,6 +34,24 @@ MEILI_URL=http://meilisearch:7700 # PostgreSQL Database DATABASE_URL=postgres://stripstream:stripstream@postgres:5432/stripstream +# ============================================================================= +# Logging +# ============================================================================= +# Log levels per domain. Default: indexer=info,scan=info,extraction=info,thumbnail=warn,watcher=info +# Domains: +# scan — filesystem scan (discovery phase) +# extraction — page extraction from archives (extracting_pages phase) +# thumbnail — thumbnail generation (resize/encode) +# watcher — file watcher polling +# indexer — general indexer logs +# Levels: error, warn, info, debug, trace +# Examples: +# RUST_LOG=indexer=info # default, quiet thumbnails +# RUST_LOG=indexer=info,thumbnail=debug # enable thumbnail timing logs +# RUST_LOG=indexer=info,extraction=debug # per-book extraction details +# RUST_LOG=indexer=debug,scan=debug,extraction=debug,thumbnail=debug,watcher=debug # see everything +# RUST_LOG=indexer=info,scan=info,extraction=info,thumbnail=warn,watcher=info + # ============================================================================= # Storage Configuration # ============================================================================= diff --git a/Cargo.lock b/Cargo.lock index f18b937..5c618e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1132,7 +1132,6 @@ dependencies = [ "jpeg-decoder", "num_cpus", "parsers", - "rayon", "reqwest", "serde", "serde_json", diff --git a/apps/backoffice/app/components/MobileNav.tsx b/apps/backoffice/app/components/MobileNav.tsx index 0e5ffa0..95d54d2 100644 --- a/apps/backoffice/app/components/MobileNav.tsx +++ b/apps/backoffice/app/components/MobileNav.tsx @@ -68,6 +68,17 @@ export function MobileNav({ navItems }: { navItems: NavItem[] }) { {item.label} ))} + +
+ setIsOpen(false)} + > + + Settings + +
diff --git a/apps/backoffice/app/layout.tsx b/apps/backoffice/app/layout.tsx index 3d71b80..b6e5e23 100644 --- a/apps/backoffice/app/layout.tsx +++ b/apps/backoffice/app/layout.tsx @@ -52,7 +52,7 @@ export default function RootLayout({ children }: { children: ReactNode }) { StripStream - + backoffice @@ -74,7 +74,7 @@ export default function RootLayout({ children }: { children: ReactNode }) { diff --git a/apps/indexer/Cargo.toml b/apps/indexer/Cargo.toml index 8cd2a6c..7e49610 100644 --- a/apps/indexer/Cargo.toml +++ b/apps/indexer/Cargo.toml @@ -15,7 +15,6 @@ image.workspace = true jpeg-decoder.workspace = true num_cpus.workspace = true parsers = { path = "../../crates/parsers" } -rayon.workspace = true reqwest.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/apps/indexer/src/analyzer.rs b/apps/indexer/src/analyzer.rs index 11549bc..9f7dc5d 100644 --- a/apps/indexer/src/analyzer.rs +++ b/apps/indexer/src/analyzer.rs @@ -6,7 +6,7 @@ use sqlx::Row; use std::path::Path; use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; use std::sync::Arc; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use uuid::Uuid; use crate::{job::is_job_cancelled, utils, AppState}; @@ -179,7 +179,8 @@ fn generate_thumbnail(image_bytes: &[u8], config: &ThumbnailConfig) -> anyhow::R let t_resize = t1.elapsed(); let format = config.format.as_deref().unwrap_or("webp"); - info!( + debug!( + target: "thumbnail", "[THUMBNAIL] {}x{} -> {}x{} decode={:.0}ms resize={:.0}ms encode_format={}", orig_w, orig_h, w, h, t_decode.as_secs_f64() * 1000.0, @@ -237,7 +238,8 @@ fn generate_thumbnail(image_bytes: &[u8], config: &ThumbnailConfig) -> anyhow::R } }; let t_encode = t2.elapsed(); - info!( + debug!( + target: "thumbnail", "[THUMBNAIL] encode={:.0}ms total={:.0}ms output_size={}KB", t_encode.as_secs_f64() * 1000.0, t0.elapsed().as_secs_f64() * 1000.0, @@ -263,7 +265,7 @@ fn resize_raw_to_thumbnail( ) -> anyhow::Result { let raw_bytes = 
std::fs::read(raw_path) .map_err(|e| anyhow::anyhow!("failed to read raw image {}: {}", raw_path, e))?; - info!("[THUMBNAIL] book={} raw_size={}KB", book_id, raw_bytes.len() / 1024); + debug!(target: "thumbnail", "[THUMBNAIL] book={} raw_size={}KB", book_id, raw_bytes.len() / 1024); let thumb_bytes = generate_thumbnail(&raw_bytes, config)?; let format = config.format.as_deref().unwrap_or("webp"); @@ -449,6 +451,13 @@ pub async fn analyze_library_books( let pdf_scale = config.width.max(config.height); let path_owned = path.to_path_buf(); let timeout_secs = config.timeout_secs; + let file_name = path.file_name() + .map(|n| n.to_string_lossy().to_string()) + .unwrap_or_else(|| local_path.clone()); + + debug!(target: "extraction", "[EXTRACTION] Starting: {} ({})", file_name, task.format); + let extract_start = std::time::Instant::now(); + let analyze_result = tokio::time::timeout( std::time::Duration::from_secs(timeout_secs), tokio::task::spawn_blocking(move || analyze_book(&path_owned, format, pdf_scale)), @@ -458,7 +467,7 @@ pub async fn analyze_library_books( let (page_count, raw_bytes) = match analyze_result { Ok(Ok(Ok(result))) => result, Ok(Ok(Err(e))) => { - warn!("[ANALYZER] analyze_book failed for book {}: {}", book_id, e); + warn!(target: "extraction", "[EXTRACTION] Failed: {} — {}", file_name, e); let _ = sqlx::query( "UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE book_id = $1", ) @@ -469,11 +478,11 @@ pub async fn analyze_library_books( return None; } Ok(Err(e)) => { - warn!("[ANALYZER] spawn_blocking error for book {}: {}", book_id, e); + warn!(target: "extraction", "[EXTRACTION] spawn error: {} — {}", file_name, e); return None; } Err(_) => { - warn!("[ANALYZER] analyze_book timed out after {}s for book {}: {}", timeout_secs, book_id, local_path); + warn!(target: "extraction", "[EXTRACTION] Timeout ({}s): {}", timeout_secs, file_name); let _ = sqlx::query( "UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE 
book_id = $1", ) @@ -485,15 +494,24 @@ pub async fn analyze_library_books( } }; + let extract_elapsed = extract_start.elapsed(); + debug!( + target: "extraction", + "[EXTRACTION] Done: {} — {} pages, image={}KB in {:.0}ms", + file_name, page_count, raw_bytes.len() / 1024, + extract_elapsed.as_secs_f64() * 1000.0, + ); + // If thumbnail already exists, just update page_count and skip thumbnail generation if !needs_thumbnail { + debug!(target: "extraction", "[EXTRACTION] Page count only: {} — {} pages", file_name, page_count); if let Err(e) = sqlx::query("UPDATE books SET page_count = $1 WHERE id = $2") .bind(page_count) .bind(book_id) .execute(&pool) .await { - warn!("[ANALYZER] DB page_count update failed for book {}: {}", book_id, e); + warn!(target: "extraction", "[EXTRACTION] DB page_count update failed for {}: {}", file_name, e); } let processed = extracted_count.fetch_add(1, Ordering::Relaxed) + 1; let percent = (processed as f64 / total as f64 * 50.0) as i32; @@ -505,6 +523,14 @@ pub async fn analyze_library_books( .bind(percent) .execute(&pool) .await; + + if processed % 25 == 0 || processed == total { + info!( + target: "extraction", + "[EXTRACTION] Progress: {}/{} books extracted ({}%)", + processed, total, percent + ); + } return None; // don't enqueue for thumbnail sub-phase } @@ -549,6 +575,14 @@ pub async fn analyze_library_books( .execute(&pool) .await; + if processed % 25 == 0 || processed == total { + info!( + target: "extraction", + "[EXTRACTION] Progress: {}/{} books extracted ({}%)", + processed, total, percent + ); + } + Some((book_id, raw_path, page_count)) } }) @@ -643,6 +677,14 @@ pub async fn analyze_library_books( .bind(percent) .execute(&pool) .await; + + if processed % 25 == 0 || processed == extracted_total { + info!( + target: "thumbnail", + "[THUMBNAIL] Progress: {}/{} thumbnails generated ({}%)", + processed, extracted_total, percent + ); + } } }) .await; diff --git a/apps/indexer/src/job.rs b/apps/indexer/src/job.rs index 
830c5ca..18d172d 100644 --- a/apps/indexer/src/job.rs +++ b/apps/indexer/src/job.rs @@ -1,5 +1,4 @@ use anyhow::Result; -use rayon::prelude::*; use sqlx::{PgPool, Row}; use tracing::{error, info}; use uuid::Uuid; @@ -270,10 +269,12 @@ pub async fn process_job( crate::utils::remap_libraries_path(&library.get::("root_path")) }) .collect(); + // Count sequentially with limited open fds to avoid ENFILE exhaustion library_paths - .par_iter() + .iter() .map(|root_path| { walkdir::WalkDir::new(root_path) + .max_open(20) .into_iter() .filter_map(Result::ok) .filter(|entry| { diff --git a/apps/indexer/src/main.rs b/apps/indexer/src/main.rs index 7d543a1..565f6d6 100644 --- a/apps/indexer/src/main.rs +++ b/apps/indexer/src/main.rs @@ -8,7 +8,9 @@ use tracing::info; async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt() .with_env_filter( - std::env::var("RUST_LOG").unwrap_or_else(|_| "indexer=info,axum=info".to_string()), + std::env::var("RUST_LOG").unwrap_or_else(|_| { + "indexer=info,axum=info,scan=info,extraction=info,thumbnail=warn,watcher=info".to_string() + }), ) .init(); diff --git a/apps/indexer/src/scanner.rs b/apps/indexer/src/scanner.rs index 979206a..28b6e48 100644 --- a/apps/indexer/src/scanner.rs +++ b/apps/indexer/src/scanner.rs @@ -4,7 +4,7 @@ use parsers::{detect_format, parse_metadata_fast}; use serde::Serialize; use sqlx::Row; use std::{collections::HashMap, path::Path, time::Duration}; -use tracing::{error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; use uuid::Uuid; use walkdir::WalkDir; @@ -124,7 +124,37 @@ pub async fn scan_library_discovery( // Files under these prefixes are added to `seen` but not reprocessed. 
let mut skipped_dir_prefixes: Vec = Vec::new(); - for entry in WalkDir::new(root).into_iter().filter_map(Result::ok) { + // Track consecutive IO errors to detect fd exhaustion (ENFILE) + let mut consecutive_io_errors: usize = 0; + const MAX_CONSECUTIVE_IO_ERRORS: usize = 10; + + for result in WalkDir::new(root).max_open(20).into_iter() { + let entry = match result { + Ok(e) => { + consecutive_io_errors = 0; + e + } + Err(e) => { + consecutive_io_errors += 1; + let is_enfile = e + .io_error() + .map(|io| io.raw_os_error() == Some(23) || io.raw_os_error() == Some(24)) + .unwrap_or(false); + if is_enfile || consecutive_io_errors >= MAX_CONSECUTIVE_IO_ERRORS { + error!( + "[SCAN] Too many IO errors ({} consecutive) scanning library {} — \ + fd limit likely exhausted. Aborting scan for this library.", + consecutive_io_errors, library_id + ); + stats.warnings += 1; + break; + } + warn!("[SCAN] walkdir error: {}", e); + stats.warnings += 1; + continue; + } + }; + let path = entry.path().to_path_buf(); let local_path = path.to_string_lossy().to_string(); @@ -192,7 +222,8 @@ pub async fn scan_library_discovery( continue; }; - info!( + debug!( + target: "scan", "[SCAN] Found book file: {} (format: {:?})", path.display(), format @@ -209,6 +240,17 @@ pub async fn scan_library_discovery( let metadata = match std::fs::metadata(&path) { Ok(m) => m, Err(e) => { + let is_enfile = e.raw_os_error() == Some(23) || e.raw_os_error() == Some(24); + if is_enfile { + consecutive_io_errors += 1; + } + if consecutive_io_errors >= MAX_CONSECUTIVE_IO_ERRORS { + error!( + "[SCAN] fd limit exhausted while stat'ing files in library {}. 
Aborting.", + library_id + ); + break; + } warn!("[SCAN] cannot stat {}, skipping: {}", path.display(), e); stats.warnings += 1; continue; @@ -278,8 +320,9 @@ pub async fn scan_library_discovery( continue; } - info!( - "[PROCESS] Updating existing file: {} (fingerprint_changed={})", + debug!( + target: "scan", + "[SCAN] Updating: {} (fingerprint_changed={})", file_name, old_fingerprint != fingerprint ); @@ -335,7 +378,7 @@ pub async fn scan_library_discovery( } // New file — insert with page_count = NULL (analyzer fills it in) - info!("[PROCESS] Inserting new file: {}", file_name); + debug!(target: "scan", "[SCAN] Inserting: {}", file_name); let book_id = Uuid::new_v4(); let file_id = Uuid::new_v4(); @@ -401,31 +444,53 @@ pub async fn scan_library_discovery( library_id, library_processed_count, stats.indexed_files, stats.errors ); - // Handle deletions - let mut removed_count = 0usize; - for (abs_path, (file_id, book_id, _)) in &existing { - if seen.contains_key(abs_path) { - continue; - } - sqlx::query("DELETE FROM book_files WHERE id = $1") - .bind(file_id) + // Handle deletions — with safety check against volume mount failures + let existing_count = existing.len(); + let seen_count = seen.len(); + let stale_count = existing.iter().filter(|(p, _)| !seen.contains_key(p.as_str())).count(); + + // Safety: if the library root is not accessible, or if we found zero files + // but the DB had many, the volume is probably not mounted correctly. + // Do NOT delete anything in that case. + let root_accessible = root.is_dir() && std::fs::read_dir(root).is_ok(); + let skip_deletions = !root_accessible + || (seen_count == 0 && existing_count > 0) + || (stale_count > 0 && stale_count == existing_count); + + if skip_deletions && stale_count > 0 { + warn!( + "[SCAN] Skipping deletion of {} stale files for library {} — \ + root accessible={}, seen={}, existing={}. 
\ + Volume may not be mounted correctly.", + stale_count, library_id, root_accessible, seen_count, existing_count + ); + stats.warnings += stale_count; + } else { + let mut removed_count = 0usize; + for (abs_path, (file_id, book_id, _)) in &existing { + if seen.contains_key(abs_path) { + continue; + } + sqlx::query("DELETE FROM book_files WHERE id = $1") + .bind(file_id) + .execute(&state.pool) + .await?; + sqlx::query( + "DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)", + ) + .bind(book_id) .execute(&state.pool) .await?; - sqlx::query( - "DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)", - ) - .bind(book_id) - .execute(&state.pool) - .await?; - stats.removed_files += 1; - removed_count += 1; - } + stats.removed_files += 1; + removed_count += 1; + } - if removed_count > 0 { - info!( - "[SCAN] Removed {} stale files from database", - removed_count - ); + if removed_count > 0 { + info!( + "[SCAN] Removed {} stale files from database", + removed_count + ); + } } // Upsert directory mtimes for next incremental scan diff --git a/apps/indexer/src/watcher.rs b/apps/indexer/src/watcher.rs index 9904e4a..47f5339 100644 --- a/apps/indexer/src/watcher.rs +++ b/apps/indexer/src/watcher.rs @@ -3,7 +3,7 @@ use sqlx::Row; use std::collections::{HashMap, HashSet}; use std::path::Path; use std::time::Duration; -use tracing::{error, info, trace, warn}; +use tracing::{debug, error, info, warn}; use uuid::Uuid; use walkdir::WalkDir; @@ -29,6 +29,7 @@ fn snapshot_library(root_path: &str) -> LibrarySnapshot { let mut files = HashSet::new(); let walker = WalkDir::new(root_path) .follow_links(true) + .max_open(10) .into_iter() .filter_map(|e| e.ok()); @@ -54,7 +55,7 @@ pub async fn run_file_watcher(state: AppState) -> Result<()> { // Skip if any job is active — avoid competing for file descriptors if has_active_jobs(&pool).await { - trace!("[WATCHER] Skipping poll — job active"); + debug!(target: 
"watcher", "[WATCHER] Skipping poll — job active"); continue; } @@ -113,7 +114,7 @@ pub async fn run_file_watcher(state: AppState) -> Result<()> { // Re-check between libraries in case a job was created if has_active_jobs(&pool).await { - trace!("[WATCHER] Job became active during poll, stopping"); + debug!(target: "watcher", "[WATCHER] Job became active during poll, stopping"); break; } @@ -126,7 +127,8 @@ pub async fn run_file_watcher(state: AppState) -> Result<()> { Some(old_snapshot) => *old_snapshot != new_snapshot, None => { // First scan — store baseline, don't trigger a job - trace!( + debug!( + target: "watcher", "[WATCHER] Initial snapshot for library {}: {} files", library_id, new_snapshot.len() @@ -168,7 +170,7 @@ pub async fn run_file_watcher(state: AppState) -> Result<()> { Err(err) => error!("[WATCHER] Failed to create job: {}", err), } } else { - trace!("[WATCHER] Job already active for library {}, skipping", library_id); + debug!(target: "watcher", "[WATCHER] Job already active for library {}, skipping", library_id); } }