From cfc896e92f6485050da68143de18f3a7382b4abc Mon Sep 17 00:00:00 2001 From: Froidefond Julien Date: Mon, 9 Mar 2026 22:13:05 +0100 Subject: [PATCH] feat: two-phase indexation with direct thumbnail generation in indexer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1 (discovery): walkdir + filename-only metadata, zero archive I/O. Books are visible immediately in the UI while Phase 2 runs in background. Phase 2 (analysis): open each archive once via analyze_book() to extract page_count and first page bytes, then generate WebP thumbnail directly in the indexer — removing the HTTP roundtrip to the API checkup endpoint. - Add parse_metadata_fast() (infallible, no archive I/O) - Add analyze_book() returning (page_count, first_page_bytes) in one pass - Add looks_like_image() magic bytes check for unrar p stdout validation - Add lsar fallback in list_cbr_images() for UTF-16BE encoded filenames - Add directory_mtimes table to skip unchanged dirs on incremental scans - Add analyzer.rs: generate_thumbnail, analyze_library_books, regenerate_thumbnails - Remove run_checkup() from API; indexer handles thumbnail jobs directly - Remove api_base_url/api_bootstrap_token from IndexerConfig and AppState - Add unar + poppler-utils to indexer Dockerfile - Fix smoke.sh: wait for job completion, check thumbnail_url field Co-Authored-By: Claude Sonnet 4.6 --- AGENTS.md | 7 +- Cargo.lock | 3 + Cargo.toml | 1 + apps/api/Cargo.toml | 2 +- apps/api/src/index_jobs.rs | 2 +- apps/api/src/main.rs | 1 - apps/api/src/pages.rs | 12 +- apps/api/src/thumbnails.rs | 316 +------------- apps/indexer/AGENTS.md | 59 ++- apps/indexer/Cargo.toml | 3 + apps/indexer/Dockerfile | 6 +- apps/indexer/src/analyzer.rs | 442 +++++++++++++++++++ apps/indexer/src/job.rs | 252 ++++++----- apps/indexer/src/lib.rs | 3 +- apps/indexer/src/main.rs | 2 - apps/indexer/src/meili.rs | 2 +- apps/indexer/src/scanner.rs | 473 ++++++++++++--------- apps/indexer/src/watcher.rs | 2 
+- crates/core/src/config.rs | 8 - crates/parsers/src/lib.rs | 326 +++++++++----- infra/migrations/0012_directory_mtimes.sql | 8 + infra/smoke.sh | 112 ++++- 22 files changed, 1274 insertions(+), 768 deletions(-) create mode 100644 apps/indexer/src/analyzer.rs create mode 100644 infra/migrations/0012_directory_mtimes.sql diff --git a/AGENTS.md b/AGENTS.md index 41e5260..c07cb87 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -251,9 +251,10 @@ stripstream-librarian/ |------|---------| | `apps/api/src/books.rs` | Book CRUD endpoints | | `apps/api/src/pages.rs` | Page rendering & caching (LRU + disk) | -| `apps/api/src/thumbnails.rs` | Thumbnail generation (triggered by indexer) | +| `apps/api/src/thumbnails.rs` | Endpoints pour créer des jobs thumbnail (rebuild/regenerate) | | `apps/api/src/state.rs` | AppState, Semaphore concurrent_renders | -| `apps/indexer/src/scanner.rs` | Filesystem scan, rayon parallel parsing | +| `apps/indexer/src/scanner.rs` | Phase 1 discovery : scan rapide sans I/O archive, skip dossiers inchangés | +| `apps/indexer/src/analyzer.rs` | Phase 2 analysis : `analyze_book` + génération thumbnails WebP | | `apps/indexer/src/batch.rs` | Bulk DB ops via UNNEST | | `apps/indexer/src/worker.rs` | Job loop, watcher, scheduler orchestration | | `crates/parsers/src/lib.rs` | Format detection, metadata parsing | @@ -302,5 +303,5 @@ fn remap_libraries_path(path: &str) -> String { - **Dependencies**: External crates are defined in workspace `Cargo.toml`, not individual `Cargo.toml`. - **Database**: PostgreSQL is required. Run migrations before starting services. - **External Tools**: 4 system tools required — `unrar` (CBR page count), `unar` (CBR extraction), `pdfinfo` (PDF page count), `pdftoppm` (PDF page render). Note: `unrar` and `unar` are distinct tools. -- **Thumbnails**: generated by the **API** service (not the indexer). The indexer triggers a checkup via `POST /index/jobs/:id/thumbnails/checkup` after indexing. 
+- **Thumbnails**: generated by the **indexer** service (phase 2, `analyzer.rs`). The API only creates jobs in DB — it does not generate thumbnails directly. - **Sub-AGENTS.md**: module-specific guidelines in `apps/api/`, `apps/indexer/`, `apps/backoffice/`, `crates/parsers/`. diff --git a/Cargo.lock b/Cargo.lock index 07a6b3c..270a78f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1146,6 +1146,8 @@ dependencies = [ "anyhow", "axum", "chrono", + "futures", + "image", "notify", "parsers", "rand 0.8.5", @@ -1161,6 +1163,7 @@ dependencies = [ "tracing-subscriber", "uuid", "walkdir", + "webp", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 59dcfa6..d57e085 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,5 +33,6 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } uuid = { version = "1.12", features = ["serde", "v4"] } walkdir = "2.5" +webp = "0.3" utoipa = "4.0" utoipa-swagger-ui = "6.0" diff --git a/apps/api/Cargo.toml b/apps/api/Cargo.toml index c49f1e6..8bb69b1 100644 --- a/apps/api/Cargo.toml +++ b/apps/api/Cargo.toml @@ -31,5 +31,5 @@ uuid.workspace = true zip = { version = "2.2", default-features = false, features = ["deflate"] } utoipa.workspace = true utoipa-swagger-ui = { workspace = true, features = ["axum"] } -webp = "0.3" +webp.workspace = true walkdir = "2" diff --git a/apps/api/src/index_jobs.rs b/apps/api/src/index_jobs.rs index 3fcc177..b0ed2aa 100644 --- a/apps/api/src/index_jobs.rs +++ b/apps/api/src/index_jobs.rs @@ -247,7 +247,7 @@ pub async fn list_folders( } let mut folders = Vec::new(); - let depth = if params.get("path").is_some() { + let depth = if params.contains_key("path") { canonical_target.strip_prefix(&canonical_base) .map(|p| p.components().count()) .unwrap_or(0) diff --git a/apps/api/src/main.rs b/apps/api/src/main.rs index f007239..b8b4d3a 100644 --- a/apps/api/src/main.rs +++ b/apps/api/src/main.rs @@ -76,7 +76,6 @@ async fn main() -> anyhow::Result<()> { .route("/index/jobs/active", 
get(index_jobs::get_active_jobs)) .route("/index/jobs/:id", get(index_jobs::get_job_details)) .route("/index/jobs/:id/stream", get(index_jobs::stream_job_progress)) - .route("/index/jobs/:id/thumbnails/checkup", axum::routing::post(thumbnails::start_checkup)) .route("/index/jobs/:id/errors", get(index_jobs::get_job_errors)) .route("/index/cancel/:id", axum::routing::post(index_jobs::cancel_job)) .route("/folders", get(index_jobs::list_folders)) diff --git a/apps/api/src/pages.rs b/apps/api/src/pages.rs index f5cbac2..8d98e07 100644 --- a/apps/api/src/pages.rs +++ b/apps/api/src/pages.rs @@ -550,12 +550,12 @@ fn transcode_image(input: &[u8], out_format: &OutputFormat, quality: u8, width: } fn format_matches(source: &ImageFormat, target: &OutputFormat) -> bool { - match (source, target) { - (ImageFormat::Jpeg, OutputFormat::Jpeg) => true, - (ImageFormat::Png, OutputFormat::Png) => true, - (ImageFormat::WebP, OutputFormat::Webp) => true, - _ => false, - } + matches!( + (source, target), + (ImageFormat::Jpeg, OutputFormat::Jpeg) + | (ImageFormat::Png, OutputFormat::Png) + | (ImageFormat::WebP, OutputFormat::Webp) + ) } fn is_image_name(name: &str) -> bool { diff --git a/apps/api/src/thumbnails.rs b/apps/api/src/thumbnails.rs index bbb239e..1560e3a 100644 --- a/apps/api/src/thumbnails.rs +++ b/apps/api/src/thumbnails.rs @@ -1,310 +1,12 @@ -use std::path::Path; -use std::sync::atomic::{AtomicI32, Ordering}; -use std::sync::Arc; - -use anyhow::Context; use axum::{ - extract::{Path as AxumPath, State}, - http::StatusCode, + extract::State, Json, }; -use futures::stream::{self, StreamExt}; -use image::GenericImageView; use serde::Deserialize; -use sqlx::Row; -use tracing::{info, warn}; use uuid::Uuid; use utoipa::ToSchema; -use crate::{error::ApiError, index_jobs, pages, state::AppState}; - -#[derive(Clone)] -struct ThumbnailConfig { - enabled: bool, - width: u32, - height: u32, - quality: u8, - directory: String, -} - -async fn load_thumbnail_concurrency(pool: 
&sqlx::PgPool) -> usize { - let default_concurrency = 4; - let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'limits'"#) - .fetch_optional(pool) - .await; - - match row { - Ok(Some(row)) => { - let value: serde_json::Value = row.get("value"); - value - .get("concurrent_renders") - .and_then(|v| v.as_u64()) - .map(|v| v as usize) - .unwrap_or(default_concurrency) - } - _ => default_concurrency, - } -} - -async fn load_thumbnail_config(pool: &sqlx::PgPool) -> ThumbnailConfig { - let fallback = ThumbnailConfig { - enabled: true, - width: 300, - height: 400, - quality: 80, - directory: "/data/thumbnails".to_string(), - }; - let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'thumbnail'"#) - .fetch_optional(pool) - .await; - - match row { - Ok(Some(row)) => { - let value: serde_json::Value = row.get("value"); - ThumbnailConfig { - enabled: value - .get("enabled") - .and_then(|v| v.as_bool()) - .unwrap_or(fallback.enabled), - width: value - .get("width") - .and_then(|v| v.as_u64()) - .map(|v| v as u32) - .unwrap_or(fallback.width), - height: value - .get("height") - .and_then(|v| v.as_u64()) - .map(|v| v as u32) - .unwrap_or(fallback.height), - quality: value - .get("quality") - .and_then(|v| v.as_u64()) - .map(|v| v as u8) - .unwrap_or(fallback.quality), - directory: value - .get("directory") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()) - .unwrap_or_else(|| fallback.directory.clone()), - } - } - _ => fallback, - } -} - -fn generate_thumbnail(image_bytes: &[u8], config: &ThumbnailConfig) -> anyhow::Result> { - let img = image::load_from_memory(image_bytes).context("failed to load image")?; - let (orig_w, orig_h) = img.dimensions(); - let ratio_w = config.width as f32 / orig_w as f32; - let ratio_h = config.height as f32 / orig_h as f32; - let ratio = ratio_w.min(ratio_h); - let new_w = (orig_w as f32 * ratio) as u32; - let new_h = (orig_h as f32 * ratio) as u32; - let resized = img.resize(new_w, new_h, 
image::imageops::FilterType::Lanczos3); - let rgba = resized.to_rgba8(); - let (w, h) = rgba.dimensions(); - let rgb_data: Vec = rgba.pixels().flat_map(|p| [p[0], p[1], p[2]]).collect(); - let quality = f32::max(config.quality as f32, 85.0); - let webp_data = - webp::Encoder::new(&rgb_data, webp::PixelLayout::Rgb, w, h).encode(quality); - Ok(webp_data.to_vec()) -} - -fn save_thumbnail(book_id: Uuid, thumbnail_bytes: &[u8], config: &ThumbnailConfig) -> anyhow::Result { - let dir = Path::new(&config.directory); - std::fs::create_dir_all(dir)?; - let filename = format!("{}.webp", book_id); - let path = dir.join(&filename); - std::fs::write(&path, thumbnail_bytes)?; - Ok(path.to_string_lossy().to_string()) -} - -async fn run_checkup(state: AppState, job_id: Uuid) { - let pool = &state.pool; - let row = sqlx::query("SELECT library_id, type FROM index_jobs WHERE id = $1") - .bind(job_id) - .fetch_optional(pool) - .await; - - let (library_id, job_type) = match row { - Ok(Some(r)) => ( - r.get::, _>("library_id"), - r.get::("type"), - ), - _ => { - warn!("thumbnails checkup: job {} not found", job_id); - return; - } - }; - - // Regenerate or full_rebuild: clear existing thumbnails in scope so they get regenerated - if job_type == "thumbnail_regenerate" || job_type == "full_rebuild" { - let config = load_thumbnail_config(pool).await; - - if job_type == "full_rebuild" { - // For full_rebuild: delete orphaned thumbnail files (books were deleted, new ones have new UUIDs) - // Get all existing book IDs to keep their thumbnails - let existing_book_ids: std::collections::HashSet = sqlx::query_scalar( - r#"SELECT id FROM books WHERE (library_id = $1 OR $1 IS NULL)"#, - ) - .bind(library_id) - .fetch_all(pool) - .await - .unwrap_or_default() - .into_iter() - .collect(); - - // Delete thumbnail files that don't correspond to existing books - let thumbnail_dir = Path::new(&config.directory); - if thumbnail_dir.exists() { - let mut deleted_count = 0; - if let Ok(entries) = 
std::fs::read_dir(thumbnail_dir) { - for entry in entries.flatten() { - if let Some(file_name) = entry.file_name().to_str() { - if file_name.ends_with(".webp") { - if let Some(book_id_str) = file_name.strip_suffix(".webp") { - if let Ok(book_id) = Uuid::parse_str(book_id_str) { - if !existing_book_ids.contains(&book_id) { - if let Err(e) = std::fs::remove_file(entry.path()) { - warn!("Failed to delete orphaned thumbnail {}: {}", entry.path().display(), e); - } else { - deleted_count += 1; - } - } - } - } - } - } - } - } - info!("thumbnails full_rebuild: deleted {} orphaned thumbnail files", deleted_count); - } - } else { - // For regenerate: delete thumbnail files for books with thumbnails - let book_ids_to_clear: Vec = sqlx::query_scalar( - r#"SELECT id FROM books WHERE (library_id = $1 OR $1 IS NULL) AND thumbnail_path IS NOT NULL"#, - ) - .bind(library_id) - .fetch_all(pool) - .await - .unwrap_or_default(); - - let mut deleted_count = 0; - for book_id in &book_ids_to_clear { - let filename = format!("{}.webp", book_id); - let thumbnail_path = Path::new(&config.directory).join(&filename); - if thumbnail_path.exists() { - if let Err(e) = std::fs::remove_file(&thumbnail_path) { - warn!("Failed to delete thumbnail file {}: {}", thumbnail_path.display(), e); - } else { - deleted_count += 1; - } - } - } - info!("thumbnails regenerate: deleted {} thumbnail files", deleted_count); - } - - // Clear thumbnail_path in database - let cleared = sqlx::query( - r#"UPDATE books SET thumbnail_path = NULL WHERE (library_id = $1 OR $1 IS NULL)"#, - ) - .bind(library_id) - .execute(pool) - .await; - if let Ok(res) = cleared { - info!("thumbnails {}: cleared {} books in database", job_type, res.rows_affected()); - } - } - - let book_ids: Vec = sqlx::query_scalar( - r#"SELECT id FROM books WHERE (library_id = $1 OR $1 IS NULL) AND thumbnail_path IS NULL"#, - ) - .bind(library_id) - .fetch_all(pool) - .await - .unwrap_or_default(); - - let config = load_thumbnail_config(pool).await; - 
if !config.enabled || book_ids.is_empty() { - let _ = sqlx::query( - "UPDATE index_jobs SET status = 'success', finished_at = NOW(), progress_percent = 100, current_file = NULL WHERE id = $1", - ) - .bind(job_id) - .execute(pool) - .await; - return; - } - - let total = book_ids.len() as i32; - let _ = sqlx::query( - "UPDATE index_jobs SET status = 'generating_thumbnails', total_files = $2, processed_files = 0, current_file = NULL WHERE id = $1", - ) - .bind(job_id) - .bind(total) - .execute(pool) - .await; - - let concurrency = load_thumbnail_concurrency(pool).await; - let processed_count = Arc::new(AtomicI32::new(0)); - let pool_clone = pool.clone(); - let job_id_clone = job_id; - let config_clone = config.clone(); - let state_clone = state.clone(); - - let total_clone = total; - stream::iter(book_ids) - .for_each_concurrent(concurrency, |book_id| { - let processed_count = processed_count.clone(); - let pool = pool_clone.clone(); - let job_id = job_id_clone; - let config = config_clone.clone(); - let state = state_clone.clone(); - let total = total_clone; - - async move { - match pages::render_book_page_1(&state, book_id, config.width, config.quality).await { - Ok(page_bytes) => { - match generate_thumbnail(&page_bytes, &config) { - Ok(thumb_bytes) => { - if let Ok(path) = save_thumbnail(book_id, &thumb_bytes, &config) { - if sqlx::query("UPDATE books SET thumbnail_path = $1 WHERE id = $2") - .bind(&path) - .bind(book_id) - .execute(&pool) - .await - .is_ok() - { - let processed = processed_count.fetch_add(1, Ordering::Relaxed) + 1; - let percent = (processed as f64 / total as f64 * 100.0) as i32; - let _ = sqlx::query( - "UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1", - ) - .bind(job_id) - .bind(processed) - .bind(percent) - .execute(&pool) - .await; - } - } - } - Err(e) => warn!("thumbnail generate failed for book {}: {:?}", book_id, e), - } - } - Err(e) => warn!("render page 1 failed for book {}: {:?}", book_id, e), - } - } - 
}) - .await; - - let _ = sqlx::query( - "UPDATE index_jobs SET status = 'success', finished_at = NOW(), progress_percent = 100, current_file = NULL WHERE id = $1", - ) - .bind(job_id) - .execute(pool) - .await; - - info!("thumbnails checkup finished for job {} ({} books)", job_id, total); -} +use crate::{error::ApiError, index_jobs, state::AppState}; #[derive(Deserialize, ToSchema)] pub struct ThumbnailsRebuildRequest { @@ -312,7 +14,7 @@ pub struct ThumbnailsRebuildRequest { pub library_id: Option, } -/// POST /index/thumbnails/rebuild — create a job and generate thumbnails for books that don't have one (optional library scope). +/// POST /index/thumbnails/rebuild — create a job to generate thumbnails for books that don't have one. #[utoipa::path( post, path = "/index/thumbnails/rebuild", @@ -346,7 +48,7 @@ pub async fn start_thumbnails_rebuild( Ok(Json(index_jobs::map_row(row))) } -/// POST /index/thumbnails/regenerate — create a job and regenerate all thumbnails in scope (clears then regenerates). +/// POST /index/thumbnails/regenerate — create a job to regenerate all thumbnails (clears then regenerates). #[utoipa::path( post, path = "/index/thumbnails/regenerate", @@ -379,13 +81,3 @@ pub async fn start_thumbnails_regenerate( Ok(Json(index_jobs::map_row(row))) } - -/// POST /index/jobs/:id/thumbnails/checkup — start thumbnail generation for books missing thumbnails (called by indexer at end of build). -pub async fn start_checkup( - State(state): State, - AxumPath(job_id): AxumPath, -) -> Result { - let state = state.clone(); - tokio::spawn(async move { run_checkup(state, job_id).await }); - Ok(StatusCode::ACCEPTED) -} diff --git a/apps/indexer/AGENTS.md b/apps/indexer/AGENTS.md index 6dd0b3c..f07a4fd 100644 --- a/apps/indexer/AGENTS.md +++ b/apps/indexer/AGENTS.md @@ -7,15 +7,16 @@ Service background sur le port **7081**. 
Voir `AGENTS.md` racine pour les conven | Fichier | Rôle | |---------|------| | `main.rs` | Point d'entrée, initialisation, lancement du worker | -| `lib.rs` | `AppState` (pool, meili, api_base_url) | +| `lib.rs` | `AppState` (pool, meili_url, meili_master_key) | | `worker.rs` | Boucle principale : claim job → process → cleanup stale | | `job.rs` | `claim_next_job`, `process_job`, `fail_job`, `cleanup_stale_jobs` | -| `scanner.rs` | Scan filesystem, parsing parallèle (rayon), batching DB | +| `scanner.rs` | Phase 1 discovery : WalkDir + `parse_metadata_fast` (zéro I/O archive), skip dossiers inchangés via mtime, batching DB | +| `analyzer.rs` | Phase 2 analysis : ouvre chaque archive une fois (`analyze_book`), génère page_count + thumbnail WebP | | `batch.rs` | `flush_all_batches` avec UNNEST, structures `BookInsert/Update/FileInsert/Update/ErrorInsert` | | `scheduler.rs` | Auto-scan : vérifie toutes les 60s les bibliothèques à monitorer | | `watcher.rs` | File watcher temps réel | | `meili.rs` | Indexation/sync Meilisearch | -| `api.rs` | Appels HTTP vers l'API (pour checkup thumbnails) | +| `api.rs` | Endpoints HTTP de l'indexer (/health, /ready) | | `utils.rs` | `remap_libraries_path`, `unmap_libraries_path`, `compute_fingerprint`, `kind_from_format` | ## Cycle de vie d'un job @@ -23,10 +24,21 @@ Service background sur le port **7081**. Voir `AGENTS.md` racine pour les conven ``` claim_next_job (UPDATE ... 
RETURNING, status pending→running) └─ process_job - ├─ scanner::scan_library (rayon par_iter pour le parsing) - │ └─ flush_all_batches toutes les BATCH_SIZE=100 itérations - └─ meili sync - └─ api checkup thumbnails (POST /index/jobs/:id/thumbnails/checkup) + ├─ Phase 1 : scanner::scan_library_discovery + │ ├─ WalkDir + parse_metadata_fast (zéro I/O archive) + │ ├─ skip dossiers via directory_mtimes (table DB) + │ └─ INSERT books (page_count=NULL) → livres visibles immédiatement + ├─ meili::sync_meili + ├─ analyzer::cleanup_orphaned_thumbnails (full_rebuild uniquement) + └─ Phase 2 : analyzer::analyze_library_books + ├─ SELECT books WHERE page_count IS NULL + ├─ parsers::analyze_book → (page_count, first_page_bytes) + ├─ generate_thumbnail (WebP, Lanczos3) + └─ UPDATE books SET page_count, thumbnail_path + +Jobs spéciaux : + thumbnail_rebuild → analyze_library_books(thumbnail_only=true) + thumbnail_regenerate → regenerate_thumbnails (clear + re-analyze) ``` - Annulation : `is_job_cancelled` vérifié toutes les 10 fichiers ou 1s — retourne `Err("Job cancelled")` @@ -49,14 +61,28 @@ if books_to_insert.len() >= BATCH_SIZE { Toutes les opérations du flush sont dans une seule transaction. -## Scan filesystem (scanner.rs) +## Scan filesystem — architecture 2 phases -Pipeline en 3 étapes : -1. **Collect** : WalkDir → filtrer par format (CBZ/CBR/PDF) -2. **Parse** : `file_infos.into_par_iter().map(parse_metadata)` (rayon) -3. **Process** : séquentiel pour les inserts/updates DB +### Phase 1 : Discovery (`scanner.rs`) -Fingerprint = SHA256(taille + mtime) pour détecter les changements sans relire le fichier. +Pipeline allégé — **zéro ouverture d'archive** : +1. Charger `directory_mtimes` depuis la DB +2. WalkDir : pour chaque dossier, comparer mtime filesystem vs mtime stocké → skip si inchangé +3. Pour chaque fichier : `parse_metadata_fast` (title/series/volume depuis filename uniquement) +4. 
INSERT/UPDATE avec `page_count = NULL` — les livres sont visibles immédiatement +5. Upsert `directory_mtimes` en fin de scan + +Fingerprint = SHA256(taille + mtime + filename) pour détecter les changements sans relire le fichier. + +### Phase 2 : Analysis (`analyzer.rs`) + +Traitement progressif en background : +- Query `WHERE page_count IS NULL` (ou `thumbnail_path IS NULL` pour thumbnail jobs) +- Concurrence bornée (`futures::stream::for_each_concurrent`, défaut 4) +- Par livre : `parsers::analyze_book(path, format)` → `(page_count, first_page_bytes)` +- Génération thumbnail : resize Lanczos3 + encode WebP +- UPDATE `books SET page_count, thumbnail_path` +- Config lue depuis `app_settings` (clés `'thumbnail'` et `'limits'`) ## Path remapping @@ -69,7 +95,10 @@ utils::unmap_libraries_path(&local_path) // filesystem local → DB ## Gotchas -- **Thumbnails** : générés par l'API après handoff, pas par l'indexer directement. L'indexer appelle `/index/jobs/:id/thumbnails/checkup` via `api.rs`. -- **full_rebuild** : si `true`, ignore les fingerprints → tous les fichiers sont retraités. +- **Thumbnails** : générés **directement par l'indexer** (phase 2, `analyzer.rs`). L'API ne gère plus la génération — elle crée juste les jobs en DB. +- **page_count = NULL** : après la phase discovery, tous les nouveaux livres ont `page_count = NULL`. La phase analysis les remplit progressivement. Ne pas confondre avec une erreur. +- **directory_mtimes** : table DB qui stocke le mtime de chaque dossier scanné. Vidée au full_rebuild, mise à jour après chaque scan. Permet de skipper les dossiers inchangés en scan incrémental. +- **full_rebuild** : supprime toutes les données puis re-insère. Ignore les fingerprints et les directory_mtimes. - **Annulation** : vérifier `is_job_cancelled` régulièrement pour respecter les annulations utilisateur. - **Watcher + scheduler** : tournent en tâches tokio séparées dans `worker.rs`, en parallèle de la boucle principale. 
+- **spawn_blocking** : l'ouverture d'archive (`analyze_book`) et la génération de thumbnail sont des opérations bloquantes — toujours les wrapper dans `tokio::task::spawn_blocking`. diff --git a/apps/indexer/Cargo.toml b/apps/indexer/Cargo.toml index dd1b0c2..4b86a86 100644 --- a/apps/indexer/Cargo.toml +++ b/apps/indexer/Cargo.toml @@ -10,6 +10,8 @@ license.workspace = true anyhow.workspace = true axum.workspace = true chrono.workspace = true +futures = "0.3" +image.workspace = true notify = "6.1" parsers = { path = "../../crates/parsers" } rand.workspace = true @@ -25,3 +27,4 @@ tracing.workspace = true tracing-subscriber.workspace = true uuid.workspace = true walkdir.workspace = true +webp.workspace = true diff --git a/apps/indexer/Dockerfile b/apps/indexer/Dockerfile index 3e20c52..3f97b12 100644 --- a/apps/indexer/Dockerfile +++ b/apps/indexer/Dockerfile @@ -21,7 +21,11 @@ RUN --mount=type=cache,target=/sccache \ cargo build --release -p indexer FROM debian:bookworm-slim -RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates wget unrar-free && rm -rf /var/lib/apt/lists/* +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates wget \ + unrar-free unar \ + poppler-utils \ + && rm -rf /var/lib/apt/lists/* COPY --from=builder /app/target/release/indexer /usr/local/bin/indexer EXPOSE 7081 CMD ["/usr/local/bin/indexer"] diff --git a/apps/indexer/src/analyzer.rs b/apps/indexer/src/analyzer.rs new file mode 100644 index 0000000..1edf8d5 --- /dev/null +++ b/apps/indexer/src/analyzer.rs @@ -0,0 +1,442 @@ +use anyhow::Result; +use futures::stream::{self, StreamExt}; +use image::GenericImageView; +use parsers::{analyze_book, BookFormat}; +use sqlx::Row; +use std::path::Path; +use std::sync::atomic::{AtomicI32, Ordering}; +use std::sync::Arc; +use tracing::{info, warn}; +use uuid::Uuid; + +use crate::{utils, AppState}; + +#[derive(Clone)] +struct ThumbnailConfig { + enabled: bool, + width: u32, + height: u32, + 
quality: u8, + directory: String, +} + +async fn load_thumbnail_config(pool: &sqlx::PgPool) -> ThumbnailConfig { + let fallback = ThumbnailConfig { + enabled: true, + width: 300, + height: 400, + quality: 80, + directory: "/data/thumbnails".to_string(), + }; + let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'thumbnail'"#) + .fetch_optional(pool) + .await; + + match row { + Ok(Some(row)) => { + let value: serde_json::Value = row.get("value"); + ThumbnailConfig { + enabled: value + .get("enabled") + .and_then(|v| v.as_bool()) + .unwrap_or(fallback.enabled), + width: value + .get("width") + .and_then(|v| v.as_u64()) + .map(|v| v as u32) + .unwrap_or(fallback.width), + height: value + .get("height") + .and_then(|v| v.as_u64()) + .map(|v| v as u32) + .unwrap_or(fallback.height), + quality: value + .get("quality") + .and_then(|v| v.as_u64()) + .map(|v| v as u8) + .unwrap_or(fallback.quality), + directory: value + .get("directory") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .unwrap_or_else(|| fallback.directory.clone()), + } + } + _ => fallback, + } +} + +async fn load_thumbnail_concurrency(pool: &sqlx::PgPool) -> usize { + let default_concurrency = 4; + let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'limits'"#) + .fetch_optional(pool) + .await; + + match row { + Ok(Some(row)) => { + let value: serde_json::Value = row.get("value"); + value + .get("concurrent_renders") + .and_then(|v| v.as_u64()) + .map(|v| v as usize) + .unwrap_or(default_concurrency) + } + _ => default_concurrency, + } +} + +fn generate_thumbnail(image_bytes: &[u8], config: &ThumbnailConfig) -> anyhow::Result> { + let img = image::load_from_memory(image_bytes) + .map_err(|e| anyhow::anyhow!("failed to load image: {}", e))?; + let (orig_w, orig_h) = img.dimensions(); + let ratio_w = config.width as f32 / orig_w as f32; + let ratio_h = config.height as f32 / orig_h as f32; + let ratio = ratio_w.min(ratio_h); + let new_w = (orig_w as f32 * ratio) as u32; 
+ let new_h = (orig_h as f32 * ratio) as u32; + let resized = img.resize(new_w, new_h, image::imageops::FilterType::Lanczos3); + let rgba = resized.to_rgba8(); + let (w, h) = rgba.dimensions(); + let rgb_data: Vec = rgba.pixels().flat_map(|p| [p[0], p[1], p[2]]).collect(); + let quality = f32::max(config.quality as f32, 85.0); + let webp_data = webp::Encoder::new(&rgb_data, webp::PixelLayout::Rgb, w, h).encode(quality); + Ok(webp_data.to_vec()) +} + +fn save_thumbnail( + book_id: Uuid, + thumbnail_bytes: &[u8], + config: &ThumbnailConfig, +) -> anyhow::Result { + let dir = Path::new(&config.directory); + std::fs::create_dir_all(dir)?; + let filename = format!("{}.webp", book_id); + let path = dir.join(&filename); + std::fs::write(&path, thumbnail_bytes)?; + Ok(path.to_string_lossy().to_string()) +} + +fn book_format_from_str(s: &str) -> Option { + match s { + "cbz" => Some(BookFormat::Cbz), + "cbr" => Some(BookFormat::Cbr), + "pdf" => Some(BookFormat::Pdf), + _ => None, + } +} + +/// Phase 2 — Analysis: open each unanalyzed archive once, extract page_count + thumbnail. +/// `thumbnail_only` = true: only process books missing thumbnail (page_count may already be set). +/// `thumbnail_only` = false: process books missing page_count. 
+pub async fn analyze_library_books( + state: &AppState, + job_id: Uuid, + library_id: Option, + thumbnail_only: bool, +) -> Result<()> { + let config = load_thumbnail_config(&state.pool).await; + + if !config.enabled { + info!("[ANALYZER] Thumbnails disabled, skipping analysis phase"); + return Ok(()); + } + + let concurrency = load_thumbnail_concurrency(&state.pool).await; + + // Query books that need analysis + let query_filter = if thumbnail_only { + "b.thumbnail_path IS NULL" + } else { + "b.page_count IS NULL" + }; + + let sql = format!( + r#" + SELECT b.id AS book_id, bf.abs_path, bf.format + FROM books b + JOIN book_files bf ON bf.book_id = b.id + WHERE (b.library_id = $1 OR $1 IS NULL) + AND {} + "#, + query_filter + ); + + let rows = sqlx::query(&sql) + .bind(library_id) + .fetch_all(&state.pool) + .await?; + + if rows.is_empty() { + info!("[ANALYZER] No books to analyze"); + return Ok(()); + } + + let total = rows.len() as i32; + info!( + "[ANALYZER] Analyzing {} books (thumbnail_only={}, concurrency={})", + total, thumbnail_only, concurrency + ); + + // Update job status + let _ = sqlx::query( + "UPDATE index_jobs SET status = 'generating_thumbnails', total_files = $2, processed_files = 0, current_file = NULL WHERE id = $1", + ) + .bind(job_id) + .bind(total) + .execute(&state.pool) + .await; + + let processed_count = Arc::new(AtomicI32::new(0)); + + struct BookTask { + book_id: Uuid, + abs_path: String, + format: String, + } + + let tasks: Vec = rows + .into_iter() + .map(|row| BookTask { + book_id: row.get("book_id"), + abs_path: row.get("abs_path"), + format: row.get("format"), + }) + .collect(); + + stream::iter(tasks) + .for_each_concurrent(concurrency, |task| { + let processed_count = processed_count.clone(); + let pool = state.pool.clone(); + let config = config.clone(); + + async move { + let local_path = utils::remap_libraries_path(&task.abs_path); + let path = Path::new(&local_path); + + let format = match book_format_from_str(&task.format) { 
+ Some(f) => f, + None => { + warn!("[ANALYZER] Unknown format '{}' for book {}", task.format, task.book_id); + return; + } + }; + + // Run blocking archive I/O on a thread pool + let book_id = task.book_id; + let path_owned = path.to_path_buf(); + let analyze_result = tokio::task::spawn_blocking(move || { + analyze_book(&path_owned, format) + }) + .await; + + let (page_count, image_bytes) = match analyze_result { + Ok(Ok(result)) => result, + Ok(Err(e)) => { + warn!("[ANALYZER] analyze_book failed for book {}: {}", book_id, e); + // Mark parse_status = error in book_files + let _ = sqlx::query( + "UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE book_id = $1", + ) + .bind(book_id) + .bind(e.to_string()) + .execute(&pool) + .await; + return; + } + Err(e) => { + warn!("[ANALYZER] spawn_blocking error for book {}: {}", book_id, e); + return; + } + }; + + // Generate thumbnail + let thumb_result = tokio::task::spawn_blocking({ + let config = config.clone(); + move || generate_thumbnail(&image_bytes, &config) + }) + .await; + + let thumb_bytes = match thumb_result { + Ok(Ok(b)) => b, + Ok(Err(e)) => { + warn!("[ANALYZER] thumbnail generation failed for book {}: {}", book_id, e); + // Still update page_count even if thumbnail fails + let _ = sqlx::query( + "UPDATE books SET page_count = $1 WHERE id = $2", + ) + .bind(page_count) + .bind(book_id) + .execute(&pool) + .await; + return; + } + Err(e) => { + warn!("[ANALYZER] spawn_blocking thumbnail error for book {}: {}", book_id, e); + return; + } + }; + + // Save thumbnail file + let save_result = { + let config = config.clone(); + tokio::task::spawn_blocking(move || save_thumbnail(book_id, &thumb_bytes, &config)) + .await + }; + + let thumb_path = match save_result { + Ok(Ok(p)) => p, + Ok(Err(e)) => { + warn!("[ANALYZER] save_thumbnail failed for book {}: {}", book_id, e); + let _ = sqlx::query("UPDATE books SET page_count = $1 WHERE id = $2") + .bind(page_count) + .bind(book_id) + 
.execute(&pool) + .await; + return; + } + Err(e) => { + warn!("[ANALYZER] spawn_blocking save error for book {}: {}", book_id, e); + return; + } + }; + + // Update DB + if let Err(e) = sqlx::query( + "UPDATE books SET page_count = $1, thumbnail_path = $2 WHERE id = $3", + ) + .bind(page_count) + .bind(&thumb_path) + .bind(book_id) + .execute(&pool) + .await + { + warn!("[ANALYZER] DB update failed for book {}: {}", book_id, e); + return; + } + + let processed = processed_count.fetch_add(1, Ordering::Relaxed) + 1; + let percent = (processed as f64 / total as f64 * 100.0) as i32; + let _ = sqlx::query( + "UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1", + ) + .bind(job_id) + .bind(processed) + .bind(percent) + .execute(&pool) + .await; + } + }) + .await; + + let final_count = processed_count.load(Ordering::Relaxed); + info!( + "[ANALYZER] Analysis complete: {}/{} books processed", + final_count, total + ); + + Ok(()) +} + +/// Clear thumbnail files and DB references for books in scope, then re-analyze. 
+pub async fn regenerate_thumbnails( + state: &AppState, + job_id: Uuid, + library_id: Option<Uuid>, +) -> Result<()> { + let config = load_thumbnail_config(&state.pool).await; + + // Delete thumbnail files for all books in scope + let book_ids_to_clear: Vec<Uuid> = sqlx::query_scalar( + r#"SELECT id FROM books WHERE (library_id = $1 OR $1 IS NULL) AND thumbnail_path IS NOT NULL"#, + ) + .bind(library_id) + .fetch_all(&state.pool) + .await + .unwrap_or_default(); + + let mut deleted_count = 0usize; + for book_id in &book_ids_to_clear { + let filename = format!("{}.webp", book_id); + let thumbnail_path = Path::new(&config.directory).join(&filename); + if thumbnail_path.exists() { + if let Err(e) = std::fs::remove_file(&thumbnail_path) { + warn!( + "[ANALYZER] Failed to delete thumbnail {}: {}", + thumbnail_path.display(), + e + ); + } else { + deleted_count += 1; + } + } + } + info!( + "[ANALYZER] Deleted {} thumbnail files for regeneration", + deleted_count + ); + + // Clear thumbnail_path in DB + sqlx::query( + r#"UPDATE books SET thumbnail_path = NULL WHERE (library_id = $1 OR $1 IS NULL)"#, + ) + .bind(library_id) + .execute(&state.pool) + .await?; + + // Re-analyze all books (now thumbnail_path IS NULL for all) + analyze_library_books(state, job_id, library_id, true).await +} + +/// Delete orphaned thumbnail files (books deleted in full_rebuild get new UUIDs).
+pub async fn cleanup_orphaned_thumbnails( + state: &AppState, + library_id: Option<Uuid>, +) -> Result<()> { + let config = load_thumbnail_config(&state.pool).await; + + let existing_book_ids: std::collections::HashSet<Uuid> = sqlx::query_scalar( + r#"SELECT id FROM books WHERE (library_id = $1 OR $1 IS NULL)"#, + ) + .bind(library_id) + .fetch_all(&state.pool) + .await + .unwrap_or_default() + .into_iter() + .collect(); + + let thumbnail_dir = Path::new(&config.directory); + if !thumbnail_dir.exists() { + return Ok(()); + } + + let mut deleted_count = 0usize; + if let Ok(entries) = std::fs::read_dir(thumbnail_dir) { + for entry in entries.flatten() { + if let Some(file_name) = entry.file_name().to_str() { + if file_name.ends_with(".webp") { + if let Some(book_id_str) = file_name.strip_suffix(".webp") { + if let Ok(book_id) = Uuid::parse_str(book_id_str) { + if !existing_book_ids.contains(&book_id) { + if let Err(e) = std::fs::remove_file(entry.path()) { + warn!( + "Failed to delete orphaned thumbnail {}: {}", + entry.path().display(), + e + ); + } else { + deleted_count += 1; + } + } + } + } + } + } + } + } + + info!( + "[ANALYZER] Deleted {} orphaned thumbnail files", + deleted_count + ); + Ok(()) +} diff --git a/apps/indexer/src/job.rs b/apps/indexer/src/job.rs index f6ebab9..bf7a955 100644 --- a/apps/indexer/src/job.rs +++ b/apps/indexer/src/job.rs @@ -1,63 +1,60 @@ use anyhow::Result; use rayon::prelude::*; use sqlx::{PgPool, Row}; -use std::time::Duration; use tracing::{error, info}; use uuid::Uuid; -use crate::{meili, scanner, AppState}; +use crate::{analyzer, meili, scanner, AppState}; pub async fn cleanup_stale_jobs(pool: &PgPool) -> Result<()> { - // Mark jobs that have been running for more than 5 minutes as failed - // This handles cases where the indexer was restarted while jobs were running let result = sqlx::query( r#" - UPDATE index_jobs - SET status = 'failed', - finished_at = NOW(), + UPDATE index_jobs + SET status = 'failed', + finished_at = NOW(),
error_opt = 'Job interrupted by indexer restart' - WHERE status = 'running' + WHERE status = 'running' AND started_at < NOW() - INTERVAL '5 minutes' RETURNING id - "# + "#, ) .fetch_all(pool) .await?; - + if !result.is_empty() { let count = result.len(); - let ids: Vec = result.iter() + let ids: Vec = result + .iter() .map(|row| row.get::("id").to_string()) .collect(); - info!("[CLEANUP] Marked {} stale job(s) as failed: {}", count, ids.join(", ")); + info!( + "[CLEANUP] Marked {} stale job(s) as failed: {}", + count, + ids.join(", ") + ); } - + Ok(()) } pub async fn claim_next_job(pool: &PgPool) -> Result)>> { let mut tx = pool.begin().await?; - - // Atomically select and lock the next job - // Exclude rebuild/full_rebuild if one is already running - // Prioritize: full_rebuild > rebuild > others + let row = sqlx::query( r#" SELECT j.id, j.type, j.library_id FROM index_jobs j WHERE j.status = 'pending' AND ( - -- Allow rebuilds only if no rebuild is running (j.type IN ('rebuild', 'full_rebuild') AND NOT EXISTS ( SELECT 1 FROM index_jobs WHERE status = 'running' AND type IN ('rebuild', 'full_rebuild') )) OR - -- Always allow non-rebuild jobs j.type NOT IN ('rebuild', 'full_rebuild') ) - ORDER BY + ORDER BY CASE j.type WHEN 'full_rebuild' THEN 1 WHEN 'rebuild' THEN 2 @@ -66,7 +63,7 @@ pub async fn claim_next_job(pool: &PgPool) -> Result) j.created_at ASC FOR UPDATE SKIP LOCKED LIMIT 1 - "# + "#, ) .fetch_optional(&mut *tx) .await?; @@ -79,8 +76,7 @@ pub async fn claim_next_job(pool: &PgPool) -> Result) let id: Uuid = row.get("id"); let job_type: String = row.get("type"); let library_id: Option = row.get("library_id"); - - // Final check: if this is a rebuild, ensure no rebuild started between SELECT and UPDATE + if job_type == "rebuild" || job_type == "full_rebuild" { let has_running_rebuild: bool = sqlx::query_scalar( r#" @@ -90,48 +86,55 @@ pub async fn claim_next_job(pool: &PgPool) -> Result) AND type IN ('rebuild', 'full_rebuild') AND id != $1 ) - "# + "#, ) 
.bind(id) .fetch_one(&mut *tx) .await?; - + if has_running_rebuild { tx.rollback().await?; return Ok(None); } } - sqlx::query("UPDATE index_jobs SET status = 'running', started_at = NOW(), error_opt = NULL WHERE id = $1") - .bind(id) - .execute(&mut *tx) - .await?; + sqlx::query( + "UPDATE index_jobs SET status = 'running', started_at = NOW(), error_opt = NULL WHERE id = $1", + ) + .bind(id) + .execute(&mut *tx) + .await?; tx.commit().await?; Ok(Some((id, library_id))) } pub async fn fail_job(pool: &PgPool, job_id: Uuid, error_message: &str) -> Result<()> { - sqlx::query("UPDATE index_jobs SET status = 'failed', finished_at = NOW(), error_opt = $2 WHERE id = $1") - .bind(job_id) - .bind(error_message) - .execute(pool) - .await?; + sqlx::query( + "UPDATE index_jobs SET status = 'failed', finished_at = NOW(), error_opt = $2 WHERE id = $1", + ) + .bind(job_id) + .bind(error_message) + .execute(pool) + .await?; Ok(()) } pub async fn is_job_cancelled(pool: &PgPool, job_id: Uuid) -> Result { - let status: Option = sqlx::query_scalar( - "SELECT status FROM index_jobs WHERE id = $1" - ) - .bind(job_id) - .fetch_optional(pool) - .await?; - + let status: Option = + sqlx::query_scalar("SELECT status FROM index_jobs WHERE id = $1") + .bind(job_id) + .fetch_optional(pool) + .await?; + Ok(status.as_deref() == Some("cancelled")) } -pub async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Option) -> Result<()> { +pub async fn process_job( + state: &AppState, + job_id: Uuid, + target_library_id: Option, +) -> Result<()> { info!("[JOB] Processing {} library={:?}", job_id, target_library_id); let job_type: String = sqlx::query_scalar("SELECT type FROM index_jobs WHERE id = $1") @@ -139,8 +142,8 @@ pub async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Opti .fetch_one(&state.pool) .await?; - // Thumbnail jobs: hand off to API and wait for completion (same queue as rebuilds) - if job_type == "thumbnail_rebuild" || job_type == 
"thumbnail_regenerate" { + // Thumbnail rebuild: generate thumbnails for books missing them + if job_type == "thumbnail_rebuild" { sqlx::query( "UPDATE index_jobs SET status = 'generating_thumbnails', started_at = NOW() WHERE id = $1", ) @@ -148,54 +151,65 @@ pub async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Opti .execute(&state.pool) .await?; - let api_base = state.api_base_url.trim_end_matches('/'); - let url = format!("{}/index/jobs/{}/thumbnails/checkup", api_base, job_id); - let client = reqwest::Client::new(); - let res = client - .post(&url) - .header("Authorization", format!("Bearer {}", state.api_bootstrap_token)) - .send() - .await?; - if !res.status().is_success() { - anyhow::bail!("thumbnail checkup API returned {}", res.status()); - } + analyzer::analyze_library_books(state, job_id, target_library_id, true).await?; - // Poll until job is finished (API updates the same row) - let poll_interval = Duration::from_secs(1); - loop { - tokio::time::sleep(poll_interval).await; - let status: String = sqlx::query_scalar("SELECT status FROM index_jobs WHERE id = $1") - .bind(job_id) - .fetch_one(&state.pool) - .await?; - if status == "success" || status == "failed" { - info!("[JOB] Thumbnail job {} finished with status {}", job_id, status); - return Ok(()); - } - } + sqlx::query( + "UPDATE index_jobs SET status = 'success', finished_at = NOW(), progress_percent = 100, current_file = NULL WHERE id = $1", + ) + .bind(job_id) + .execute(&state.pool) + .await?; + + return Ok(()); + } + + // Thumbnail regenerate: clear all thumbnails then re-generate + if job_type == "thumbnail_regenerate" { + sqlx::query( + "UPDATE index_jobs SET status = 'generating_thumbnails', started_at = NOW() WHERE id = $1", + ) + .bind(job_id) + .execute(&state.pool) + .await?; + + analyzer::regenerate_thumbnails(state, job_id, target_library_id).await?; + + sqlx::query( + "UPDATE index_jobs SET status = 'success', finished_at = NOW(), progress_percent = 100, 
current_file = NULL WHERE id = $1", + ) + .bind(job_id) + .execute(&state.pool) + .await?; + + return Ok(()); } let is_full_rebuild = job_type == "full_rebuild"; - info!("[JOB] {} type={} full_rebuild={}", job_id, job_type, is_full_rebuild); + info!( + "[JOB] {} type={} full_rebuild={}", + job_id, job_type, is_full_rebuild + ); - // For full rebuilds, delete existing data first + // Full rebuild: delete existing data first if is_full_rebuild { info!("[JOB] Full rebuild: deleting existing data"); if let Some(library_id) = target_library_id { - // Delete books and files for specific library - sqlx::query("DELETE FROM book_files WHERE book_id IN (SELECT id FROM books WHERE library_id = $1)") - .bind(library_id) - .execute(&state.pool) - .await?; + sqlx::query( + "DELETE FROM book_files WHERE book_id IN (SELECT id FROM books WHERE library_id = $1)", + ) + .bind(library_id) + .execute(&state.pool) + .await?; sqlx::query("DELETE FROM books WHERE library_id = $1") .bind(library_id) .execute(&state.pool) .await?; info!("[JOB] Deleted existing data for library {}", library_id); } else { - // Delete all books and files - sqlx::query("DELETE FROM book_files").execute(&state.pool).await?; + sqlx::query("DELETE FROM book_files") + .execute(&state.pool) + .await?; sqlx::query("DELETE FROM books").execute(&state.pool).await?; info!("[JOB] Deleted all existing data"); } @@ -212,24 +226,34 @@ pub async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Opti .await? 
}; - // First pass: count total files for progress estimation (parallel) - let library_paths: Vec = libraries.iter() - .map(|library| crate::utils::remap_libraries_path(&library.get::("root_path"))) + // Count total files for progress estimation + let library_paths: Vec = libraries + .iter() + .map(|library| { + crate::utils::remap_libraries_path(&library.get::("root_path")) + }) .collect(); - - let total_files: usize = library_paths.par_iter() + + let total_files: usize = library_paths + .par_iter() .map(|root_path| { walkdir::WalkDir::new(root_path) .into_iter() .filter_map(Result::ok) - .filter(|entry| entry.file_type().is_file() && parsers::detect_format(entry.path()).is_some()) + .filter(|entry| { + entry.file_type().is_file() + && parsers::detect_format(entry.path()).is_some() + }) .count() }) .sum(); - info!("[JOB] Found {} libraries, {} total files to index", libraries.len(), total_files); - - // Update job with total estimate + info!( + "[JOB] Found {} libraries, {} total files to index", + libraries.len(), + total_files + ); + sqlx::query("UPDATE index_jobs SET total_files = $2 WHERE id = $1") .bind(job_id) .bind(total_files as i32) @@ -242,26 +266,47 @@ pub async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Opti removed_files: 0, errors: 0, }; - - // Track processed files across all libraries for accurate progress + let mut total_processed_count = 0i32; - for library in libraries { + // Phase 1: Discovery + for library in &libraries { let library_id: Uuid = library.get("id"); let root_path: String = library.get("root_path"); let root_path = crate::utils::remap_libraries_path(&root_path); - match scanner::scan_library(state, job_id, library_id, std::path::Path::new(&root_path), &mut stats, &mut total_processed_count, total_files, is_full_rebuild).await { + match scanner::scan_library_discovery( + state, + job_id, + library_id, + std::path::Path::new(&root_path), + &mut stats, + &mut total_processed_count, + total_files, + 
is_full_rebuild, + ) + .await + { Ok(()) => {} Err(err) => { + let err_str = err.to_string(); + if err_str.contains("cancelled") || err_str.contains("Cancelled") { + return Err(err); + } stats.errors += 1; error!(library_id = %library_id, error = %err, "library scan failed"); } } } + // Sync search index after discovery (books are visible immediately) meili::sync_meili(&state.pool, &state.meili_url, &state.meili_master_key).await?; - // Hand off to API for thumbnail checkup (API will set status = 'success' when done) + // For full rebuild: clean up orphaned thumbnail files (old UUIDs) + if is_full_rebuild { + analyzer::cleanup_orphaned_thumbnails(state, target_library_id).await?; + } + + // Phase 2: Analysis (extract page_count + thumbnails for new/updated books) sqlx::query( "UPDATE index_jobs SET status = 'generating_thumbnails', stats_json = $2, current_file = NULL, processed_files = $3 WHERE id = $1", ) @@ -271,23 +316,14 @@ pub async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Opti .execute(&state.pool) .await?; - let api_base = state.api_base_url.trim_end_matches('/'); - let url = format!("{}/index/jobs/{}/thumbnails/checkup", api_base, job_id); - let client = reqwest::Client::new(); - let res = client - .post(&url) - .header("Authorization", format!("Bearer {}", state.api_bootstrap_token)) - .send() - .await; - if let Err(e) = res { - tracing::warn!("[JOB] Failed to trigger thumbnail checkup: {} — API will not generate thumbnails for this job", e); - } else if let Ok(r) = res { - if !r.status().is_success() { - tracing::warn!("[JOB] Thumbnail checkup returned {} — API may not generate thumbnails", r.status()); - } else { - info!("[JOB] Thumbnail checkup started (job {}), API will complete the job", job_id); - } - } + analyzer::analyze_library_books(state, job_id, target_library_id, false).await?; + + sqlx::query( + "UPDATE index_jobs SET status = 'success', finished_at = NOW(), progress_percent = 100, current_file = NULL WHERE id = $1", 
+ ) + .bind(job_id) + .execute(&state.pool) + .await?; Ok(()) } diff --git a/apps/indexer/src/lib.rs b/apps/indexer/src/lib.rs index 17ecc25..9eaf8fc 100644 --- a/apps/indexer/src/lib.rs +++ b/apps/indexer/src/lib.rs @@ -1,3 +1,4 @@ +pub mod analyzer; pub mod api; pub mod batch; pub mod job; @@ -15,6 +16,4 @@ pub struct AppState { pub pool: PgPool, pub meili_url: String, pub meili_master_key: String, - pub api_base_url: String, - pub api_bootstrap_token: String, } diff --git a/apps/indexer/src/main.rs b/apps/indexer/src/main.rs index 761a800..7d543a1 100644 --- a/apps/indexer/src/main.rs +++ b/apps/indexer/src/main.rs @@ -22,8 +22,6 @@ async fn main() -> anyhow::Result<()> { pool, meili_url: config.meili_url.clone(), meili_master_key: config.meili_master_key.clone(), - api_base_url: config.api_base_url.clone(), - api_bootstrap_token: config.api_bootstrap_token.clone(), }; tokio::spawn(indexer::worker::run_worker(state.clone(), config.scan_interval_seconds)); diff --git a/apps/indexer/src/meili.rs b/apps/indexer/src/meili.rs index 2ddba8d..8e293be 100644 --- a/apps/indexer/src/meili.rs +++ b/apps/indexer/src/meili.rs @@ -100,7 +100,7 @@ pub async fn sync_meili(pool: &PgPool, meili_url: &str, meili_master_key: &str) const MEILI_BATCH_SIZE: usize = 1000; for (i, chunk) in docs.chunks(MEILI_BATCH_SIZE).enumerate() { let batch_num = i + 1; - info!("[MEILI] Sending batch {}/{} ({} docs)", batch_num, (doc_count + MEILI_BATCH_SIZE - 1) / MEILI_BATCH_SIZE, chunk.len()); + info!("[MEILI] Sending batch {}/{} ({} docs)", batch_num, doc_count.div_ceil(MEILI_BATCH_SIZE), chunk.len()); let response = client .post(format!("{base}/indexes/books/documents")) diff --git a/apps/indexer/src/scanner.rs b/apps/indexer/src/scanner.rs index a8883e7..0699e25 100644 --- a/apps/indexer/src/scanner.rs +++ b/apps/indexer/src/scanner.rs @@ -1,7 +1,6 @@ use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; -use parsers::{detect_format, parse_metadata, BookFormat, ParsedMetadata}; -use 
rayon::prelude::*; +use parsers::{detect_format, parse_metadata_fast}; use serde::Serialize; use sqlx::Row; use std::{collections::HashMap, path::Path, time::Duration}; @@ -26,7 +25,11 @@ pub struct JobStats { const BATCH_SIZE: usize = 100; -pub async fn scan_library( +/// Phase 1 — Discovery: walk filesystem, extract metadata from filenames only (no archive I/O). +/// New books are inserted with page_count = NULL so the analyzer phase can fill them in. +/// Updated books (fingerprint changed) get page_count/thumbnail reset. +#[allow(clippy::too_many_arguments)] +pub async fn scan_library_discovery( state: &AppState, job_id: Uuid, library_id: Uuid, @@ -36,8 +39,14 @@ pub async fn scan_library( total_files: usize, is_full_rebuild: bool, ) -> Result<()> { - info!("[SCAN] Starting scan of library {} at path: {} (full_rebuild={})", library_id, root.display(), is_full_rebuild); - + info!( + "[SCAN] Starting discovery scan of library {} at path: {} (full_rebuild={})", + library_id, + root.display(), + is_full_rebuild + ); + + // Load existing files from DB let existing_rows = sqlx::query( r#" SELECT bf.id AS file_id, bf.book_id, bf.abs_path, bf.fingerprint @@ -60,15 +69,46 @@ pub async fn scan_library( (row.get("file_id"), row.get("book_id"), row.get("fingerprint")), ); } - info!("[SCAN] Found {} existing files in database for library {}", existing.len(), library_id); + info!( + "[SCAN] Found {} existing files in database for library {}", + existing.len(), + library_id + ); } else { - info!("[SCAN] Full rebuild: skipping existing files lookup (all will be treated as new)"); + info!("[SCAN] Full rebuild: skipping existing files lookup"); + // Delete stale directory mtime records for full rebuild + let _ = sqlx::query("DELETE FROM directory_mtimes WHERE library_id = $1") + .bind(library_id) + .execute(&state.pool) + .await; } + // Load stored directory mtimes for incremental skip + let dir_mtimes: HashMap> = if !is_full_rebuild { + let rows = sqlx::query( + "SELECT 
dir_path, mtime FROM directory_mtimes WHERE library_id = $1", + ) + .bind(library_id) + .fetch_all(&state.pool) + .await + .unwrap_or_default(); + + rows.into_iter() + .map(|row| { + let db_path: String = row.get("dir_path"); + let local_path = utils::remap_libraries_path(&db_path); + let mtime: DateTime = row.get("mtime"); + (local_path, mtime) + }) + .collect() + } else { + HashMap::new() + }; + let mut seen: HashMap = HashMap::new(); let mut library_processed_count = 0i32; let mut last_progress_update = std::time::Instant::now(); - + // Batching buffers let mut books_to_update: Vec = Vec::with_capacity(BATCH_SIZE); let mut files_to_update: Vec = Vec::with_capacity(BATCH_SIZE); @@ -76,37 +116,85 @@ pub async fn scan_library( let mut files_to_insert: Vec = Vec::with_capacity(BATCH_SIZE); let mut errors_to_insert: Vec = Vec::with_capacity(BATCH_SIZE); - // Step 1: Collect all book files first - #[derive(Clone)] - struct FileInfo { - path: std::path::PathBuf, - format: BookFormat, - abs_path: String, - file_name: String, - metadata: std::fs::Metadata, - mtime: DateTime, - fingerprint: String, - lookup_path: String, - } + // Track discovered directory mtimes for upsert after scan + let mut new_dir_mtimes: Vec<(String, DateTime)> = Vec::new(); + + // Prefixes (with trailing "/") of directories whose mtime hasn't changed. + // Files under these prefixes are added to `seen` but not reprocessed. 
+ let mut skipped_dir_prefixes: Vec = Vec::new(); - let mut file_infos: Vec = Vec::new(); for entry in WalkDir::new(root).into_iter().filter_map(Result::ok) { + let path = entry.path().to_path_buf(); + let local_path = path.to_string_lossy().to_string(); + + if entry.file_type().is_dir() { + if entry.depth() == 0 { + continue; // skip root itself + } + + // Check if parent dir is already skipped (propagate skip to subdirs) + let already_under_skipped = skipped_dir_prefixes + .iter() + .any(|p| local_path.starts_with(p.as_str())); + + if let Ok(meta) = entry.metadata() { + if let Ok(sys_mtime) = meta.modified() { + let mtime_utc: DateTime = DateTime::from(sys_mtime); + + // Only record mtimes for non-skipped dirs (to avoid polluting DB) + if !already_under_skipped { + new_dir_mtimes.push((local_path.clone(), mtime_utc)); + } + + // Skip if mtime unchanged (incremental only, not already skipped subtree) + if !is_full_rebuild && !already_under_skipped { + if let Some(&stored_mtime) = dir_mtimes.get(&local_path) { + if mtime_utc <= stored_mtime { + trace!("[SCAN] Skipping unchanged dir: {}", local_path); + // Add trailing slash so starts_with check is exact per-segment + skipped_dir_prefixes.push(format!("{}/", local_path)); + } + } + } + } + } + continue; + } + if !entry.file_type().is_file() { continue; } - let path = entry.path().to_path_buf(); + // Check if this file is under a skipped dir + let under_skipped = skipped_dir_prefixes + .iter() + .any(|p| local_path.starts_with(p.as_str())); + + if under_skipped { + // Dir unchanged — just mark file as seen so it's not deleted + let abs_path_local = local_path.clone(); + let abs_path = utils::unmap_libraries_path(&abs_path_local); + let lookup_path = utils::remap_libraries_path(&abs_path); + seen.insert(lookup_path, true); + continue; + } + let Some(format) = detect_format(&path) else { trace!("[SCAN] Skipping non-book file: {}", path.display()); continue; }; - info!("[SCAN] Found book file: {} (format: {:?})", 
path.display(), format); + info!( + "[SCAN] Found book file: {} (format: {:?})", + path.display(), + format + ); stats.scanned_files += 1; - + let abs_path_local = path.to_string_lossy().to_string(); let abs_path = utils::unmap_libraries_path(&abs_path_local); - let file_name = path.file_name() + let file_name = path + .file_name() .map(|s| s.to_string_lossy().to_string()) .unwrap_or_else(|| abs_path.clone()); @@ -119,38 +207,12 @@ pub async fn scan_library( let fingerprint = utils::compute_fingerprint(&path, metadata.len(), &mtime)?; let lookup_path = utils::remap_libraries_path(&abs_path); - file_infos.push(FileInfo { - path, - format, - abs_path, - file_name, - metadata, - mtime, - fingerprint, - lookup_path, - }); - } - - info!("[SCAN] Collected {} files, starting parallel parsing", file_infos.len()); - - // Step 2: Parse metadata in parallel - let parsed_results: Vec<(FileInfo, Result)> = file_infos - .into_par_iter() - .map(|file_info| { - let parse_result = parse_metadata(&file_info.path, file_info.format, root); - (file_info, parse_result) - }) - .collect(); - - info!("[SCAN] Completed parallel parsing, processing {} results", parsed_results.len()); - - // Step 3: Process results sequentially for batch inserts - for (file_info, parse_result) in parsed_results { library_processed_count += 1; *total_processed_count += 1; - // Update progress in DB every 1 second or every 10 files - let should_update_progress = last_progress_update.elapsed() > Duration::from_secs(1) || library_processed_count % 10 == 0; + // Progress update + let should_update_progress = last_progress_update.elapsed() > Duration::from_secs(1) + || library_processed_count % 10 == 0; if should_update_progress { let progress_percent = if total_files > 0 { ((*total_processed_count as f64 / total_files as f64) * 100.0) as i32 @@ -159,10 +221,10 @@ pub async fn scan_library( }; sqlx::query( - "UPDATE index_jobs SET current_file = $2, processed_files = $3, progress_percent = $4 WHERE id = $1" + 
"UPDATE index_jobs SET current_file = $2, processed_files = $3, progress_percent = $4 WHERE id = $1", ) .bind(job_id) - .bind(&file_info.file_name) + .bind(&file_name) .bind(*total_processed_count) .bind(progress_percent) .execute(&state.pool) @@ -171,189 +233,210 @@ pub async fn scan_library( error!("[BDD] Failed to update progress for job {}: {}", job_id, e); e })?; - + last_progress_update = std::time::Instant::now(); - - // Check if job has been cancelled + if is_job_cancelled(&state.pool, job_id).await? { info!("[JOB] Job {} cancelled by user, stopping...", job_id); - // Flush any pending batches before exiting - flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?; + flush_all_batches( + &state.pool, + &mut books_to_update, + &mut files_to_update, + &mut books_to_insert, + &mut files_to_insert, + &mut errors_to_insert, + ) + .await?; return Err(anyhow::anyhow!("Job cancelled by user")); } } - let seen_key = utils::remap_libraries_path(&file_info.abs_path); - seen.insert(seen_key.clone(), true); + seen.insert(lookup_path.clone(), true); - if let Some((file_id, book_id, old_fingerprint)) = existing.get(&file_info.lookup_path).cloned() { - if !is_full_rebuild && old_fingerprint == file_info.fingerprint { - trace!("[PROCESS] Skipping unchanged file: {}", file_info.file_name); + // Fast metadata extraction — no archive I/O + let parsed = parse_metadata_fast(&path, format, root); + + if let Some((file_id, book_id, old_fingerprint)) = + existing.get(&lookup_path).cloned() + { + if !is_full_rebuild && old_fingerprint == fingerprint { + trace!("[PROCESS] Skipping unchanged file: {}", file_name); continue; } - info!("[PROCESS] Updating existing file: {} (full_rebuild={}, fingerprint_match={})", file_info.file_name, is_full_rebuild, old_fingerprint == file_info.fingerprint); + info!( + "[PROCESS] Updating existing file: {} (fingerprint_changed={})", + file_name, + 
old_fingerprint != fingerprint + ); - match parse_result { - Ok(parsed) => { - books_to_update.push(BookUpdate { - book_id, - title: parsed.title, - kind: utils::kind_from_format(file_info.format).to_string(), - series: parsed.series, - volume: parsed.volume, - page_count: parsed.page_count, - }); + books_to_update.push(BookUpdate { + book_id, + title: parsed.title, + kind: utils::kind_from_format(format).to_string(), + series: parsed.series, + volume: parsed.volume, + // Reset page_count so analyzer re-processes this book + page_count: None, + }); - files_to_update.push(FileUpdate { - file_id, - format: file_info.format.as_str().to_string(), - size_bytes: file_info.metadata.len() as i64, - mtime: file_info.mtime, - fingerprint: file_info.fingerprint, - }); + files_to_update.push(FileUpdate { + file_id, + format: format.as_str().to_string(), + size_bytes: metadata.len() as i64, + mtime, + fingerprint, + }); - stats.indexed_files += 1; - } - Err(err) => { - warn!("[PARSER] Failed to parse {}: {}", file_info.file_name, err); - stats.errors += 1; - - files_to_update.push(FileUpdate { - file_id, - format: file_info.format.as_str().to_string(), - size_bytes: file_info.metadata.len() as i64, - mtime: file_info.mtime, - fingerprint: file_info.fingerprint.clone(), - }); - - errors_to_insert.push(ErrorInsert { - job_id, - file_path: file_info.abs_path.clone(), - error_message: err.to_string(), - }); - - // Also need to mark file as error - we'll do this separately - sqlx::query( - "UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE id = $1" - ) - .bind(file_id) - .bind(err.to_string()) - .execute(&state.pool) - .await?; - } + // Also clear thumbnail so it gets regenerated + if let Err(e) = sqlx::query( + "UPDATE books SET thumbnail_path = NULL WHERE id = $1", + ) + .bind(book_id) + .execute(&state.pool) + .await + { + warn!( + "[BDD] Failed to clear thumbnail for book {}: {}", + book_id, e + ); } - // Flush if batch is full + stats.indexed_files += 1; 
+ if books_to_update.len() >= BATCH_SIZE || files_to_update.len() >= BATCH_SIZE { - flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?; + flush_all_batches( + &state.pool, + &mut books_to_update, + &mut files_to_update, + &mut books_to_insert, + &mut files_to_insert, + &mut errors_to_insert, + ) + .await?; } - + continue; } - // New file (thumbnails generated by API after job handoff) - info!("[PROCESS] Inserting new file: {}", file_info.file_name); + // New file — insert with page_count = NULL (analyzer fills it in) + info!("[PROCESS] Inserting new file: {}", file_name); let book_id = Uuid::new_v4(); + let file_id = Uuid::new_v4(); - match parse_result { - Ok(parsed) => { - let file_id = Uuid::new_v4(); + books_to_insert.push(BookInsert { + book_id, + library_id, + kind: utils::kind_from_format(format).to_string(), + title: parsed.title, + series: parsed.series, + volume: parsed.volume, + page_count: None, + thumbnail_path: None, + }); - books_to_insert.push(BookInsert { - book_id, - library_id, - kind: utils::kind_from_format(file_info.format).to_string(), - title: parsed.title, - series: parsed.series, - volume: parsed.volume, - page_count: parsed.page_count, - thumbnail_path: None, - }); + files_to_insert.push(FileInsert { + file_id, + book_id, + format: format.as_str().to_string(), + abs_path: abs_path.clone(), + size_bytes: metadata.len() as i64, + mtime, + fingerprint, + parse_status: "ok".to_string(), + parse_error: None, + }); - files_to_insert.push(FileInsert { - file_id, - book_id, - format: file_info.format.as_str().to_string(), - abs_path: file_info.abs_path.clone(), - size_bytes: file_info.metadata.len() as i64, - mtime: file_info.mtime, - fingerprint: file_info.fingerprint, - parse_status: "ok".to_string(), - parse_error: None, - }); + stats.indexed_files += 1; - stats.indexed_files += 1; - } - Err(err) => { - warn!("[PARSER] Failed to parse {}: {}", 
file_info.file_name, err); - stats.errors += 1; - let book_id = Uuid::new_v4(); - let file_id = Uuid::new_v4(); - - books_to_insert.push(BookInsert { - book_id, - library_id, - kind: utils::kind_from_format(file_info.format).to_string(), - title: utils::file_display_name(&file_info.path), - series: None, - volume: None, - page_count: None, - thumbnail_path: None, - }); - - files_to_insert.push(FileInsert { - file_id, - book_id, - format: file_info.format.as_str().to_string(), - abs_path: file_info.abs_path.clone(), - size_bytes: file_info.metadata.len() as i64, - mtime: file_info.mtime, - fingerprint: file_info.fingerprint, - parse_status: "error".to_string(), - parse_error: Some(err.to_string()), - }); - - errors_to_insert.push(ErrorInsert { - job_id, - file_path: file_info.abs_path, - error_message: err.to_string(), - }); - } - } - - // Flush if batch is full if books_to_insert.len() >= BATCH_SIZE || files_to_insert.len() >= BATCH_SIZE { - flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?; + flush_all_batches( + &state.pool, + &mut books_to_update, + &mut files_to_update, + &mut books_to_insert, + &mut files_to_insert, + &mut errors_to_insert, + ) + .await?; } } - // Final flush of any remaining items - flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?; + // Flush remaining batches + flush_all_batches( + &state.pool, + &mut books_to_update, + &mut files_to_update, + &mut books_to_insert, + &mut files_to_insert, + &mut errors_to_insert, + ) + .await?; - info!("[SCAN] Library {} scan complete: {} files scanned, {} indexed, {} errors", - library_id, library_processed_count, stats.indexed_files, stats.errors); + if !skipped_dir_prefixes.is_empty() { + info!( + "[SCAN] Skipped {} unchanged directories", + skipped_dir_prefixes.len() + ); + } + + info!( + "[SCAN] Library {} 
let library_ids: Vec<Uuid> = vec![library_id; new_dir_mtimes.len()];
static VOLUME_PATTERNS: OnceLock<Vec<(regex::Regex, usize)>> = OnceLock::new();
fn extract_volume(filename: &str) -> Option<i32> {
fn extract_series(path: &Path, library_root: &Path) -> Option<String> {
+pub fn parse_metadata_fast(path: &Path, _format: BookFormat, library_root: &Path) -> ParsedMetadata { + let filename = path + .file_stem() + .map(|s| s.to_string_lossy().to_string()) + .unwrap_or_else(|| "Untitled".to_string()); + + let volume = extract_volume(&filename); + let title = filename; + let series = extract_series(path, library_root); + + ParsedMetadata { + title, + series, + volume, + page_count: None, + } +} + +pub fn parse_metadata( + path: &Path, + format: BookFormat, + library_root: &Path, +) -> Result { + let mut meta = parse_metadata_fast(path, format, library_root); + + meta.page_count = match format { BookFormat::Cbz => parse_cbz_page_count(path).ok(), BookFormat::Cbr => parse_cbr_page_count(path).ok(), BookFormat::Pdf => parse_pdf_page_count(path).ok(), }; - Ok(ParsedMetadata { - title, - series, - volume, - page_count, - }) + Ok(meta) } -fn extract_volume(filename: &str) -> Option { - // Common volume patterns: T01, T02, T1, T2, Vol 1, Vol. 1, Volume 1, #1, #01, - 1, - 01 - let patterns = [ - // T01, T02 pattern (most common for manga/comics) - (r"(?i)T(\d+)", 1), - // Vol 1, Vol. 1, Volume 1 - (r"(?i)Vol\.?\s*(\d+)", 1), - (r"(?i)Volume\s*(\d+)", 1), - // #1, #01 - (r"#(\d+)", 1), - // - 1, - 01 at the end - (r"-\s*(\d+)\s*$", 1), - ]; +/// Open an archive once and return (page_count, first_page_bytes). +/// This is more efficient than calling parse_metadata + extract_first_page separately. 
pub fn analyze_book(path: &Path, format: BookFormat) -> Result<(i32, Vec<u8>)> {
fn analyze_cbr(path: &Path) -> Result<(i32, Vec<u8>)> {
fn analyze_pdf(path: &Path) -> Result<(i32, Vec<u8>)> {
cleaned.split_whitespace().collect::<Vec<_>>().join(" ")
b/infra/migrations/0012_directory_mtimes.sql new file mode 100644 index 0000000..ff1d2c8 --- /dev/null +++ b/infra/migrations/0012_directory_mtimes.sql @@ -0,0 +1,8 @@ +CREATE TABLE directory_mtimes ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + library_id UUID NOT NULL REFERENCES libraries(id) ON DELETE CASCADE, + dir_path TEXT NOT NULL, + mtime TIMESTAMPTZ NOT NULL, + UNIQUE(library_id, dir_path) +); +CREATE INDEX idx_directory_mtimes_library ON directory_mtimes(library_id); diff --git a/infra/smoke.sh b/infra/smoke.sh index 9b57053..426e26a 100755 --- a/infra/smoke.sh +++ b/infra/smoke.sh @@ -5,37 +5,125 @@ BASE_API="${BASE_API:-http://127.0.0.1:7080}" BASE_INDEXER="${BASE_INDEXER:-http://127.0.0.1:7081}" BASE_BACKOFFICE="${BASE_BACKOFFICE:-${BASE_ADMIN:-http://127.0.0.1:7082}}" TOKEN="${API_TOKEN:-stripstream-dev-bootstrap-token}" +# Max seconds to wait for a job to finish +JOB_TIMEOUT="${JOB_TIMEOUT:-120}" + +# ─── helpers ──────────────────────────────────────────────────────────────── + +auth() { curl -fsS -H "Authorization: Bearer $TOKEN" "$@"; } + +# Wait for a job (by id) to reach status success or failed. 
+wait_job() { + local job_id="$1" + local label="${2:-job}" + local waited=0 + while true; do + local status + status="$(auth "$BASE_API/index/jobs/$job_id" | python3 -c "import sys,json; print(json.load(sys.stdin).get('status',''))")" + case "$status" in + success) echo "[smoke] $label finished: success"; return 0 ;; + failed) echo "[smoke] $label finished: FAILED"; return 1 ;; + cancelled) echo "[smoke] $label finished: cancelled"; return 1 ;; + esac + if [ "$waited" -ge "$JOB_TIMEOUT" ]; then + echo "[smoke] $label timed out after ${JOB_TIMEOUT}s (last status: $status)"; return 1 + fi + sleep 2; waited=$((waited + 2)) + done +} + +# ─── health ────────────────────────────────────────────────────────────────── echo "[smoke] health checks" -curl -fsS "$BASE_API/health" >/dev/null -curl -fsS "$BASE_API/ready" >/dev/null +curl -fsS "$BASE_API/health" >/dev/null +curl -fsS "$BASE_API/ready" >/dev/null curl -fsS "$BASE_INDEXER/health" >/dev/null curl -fsS "$BASE_INDEXER/ready" >/dev/null curl -fsS "$BASE_BACKOFFICE/health" >/dev/null +# ─── libraries ─────────────────────────────────────────────────────────────── + echo "[smoke] list libraries" -curl -fsS -H "Authorization: Bearer $TOKEN" "$BASE_API/libraries" >/dev/null +auth "$BASE_API/libraries" >/dev/null -echo "[smoke] queue rebuild" -curl -fsS -X POST -H "Authorization: Bearer $TOKEN" "$BASE_API/index/rebuild" >/dev/null -sleep 2 +# ─── full rebuild (2-phase: discovery + analysis) ──────────────────────────── -echo "[smoke] list books and optional page fetch" -BOOKS_JSON="$(curl -fsS -H "Authorization: Bearer $TOKEN" "$BASE_API/books")" -BOOK_ID="$(BOOKS_JSON="$BOOKS_JSON" python3 - <<'PY' -import json -import os +echo "[smoke] queue full rebuild" +REBUILD_JOB_ID="$(auth -X POST "$BASE_API/index/rebuild" | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])")" +echo "[smoke] rebuild job id: $REBUILD_JOB_ID" +wait_job "$REBUILD_JOB_ID" "rebuild" + +# ─── verify books have page_count + thumbnail after 
analysis phase ──────────── + +echo "[smoke] verify books metadata (page_count + thumbnail)" +BOOKS_JSON="$(auth "$BASE_API/books")" +export BOOKS_JSON +python3 - <<'PY' +import json, os, sys payload = json.loads(os.environ.get("BOOKS_JSON", "{}")) items = payload.get("items") or [] +if not items: + print("[smoke] no books found — skipping metadata check") + sys.exit(0) + +missing_page_count = [b["id"] for b in items if not b.get("page_count")] +missing_thumbnail = [b["id"] for b in items if not b.get("thumbnail_url")] + +if missing_page_count: + print(f"[smoke] WARN: {len(missing_page_count)} book(s) still missing page_count") +if missing_thumbnail: + print(f"[smoke] WARN: {len(missing_thumbnail)} book(s) still missing thumbnail") + +print(f"[smoke] {len(items)} books, {len(items)-len(missing_page_count)} with page_count, {len(items)-len(missing_thumbnail)} with thumbnail") +PY + +# ─── page fetch ────────────────────────────────────────────────────────────── + +BOOK_ID="$(python3 - <<'PY' +import json, os +items = json.loads(os.environ.get("BOOKS_JSON", "{}")).get("items") or [] print(items[0]["id"] if items else "") PY )" if [ -n "$BOOK_ID" ]; then - curl -fsS -H "Authorization: Bearer $TOKEN" "$BASE_API/books/$BOOK_ID/pages/1?format=webp&quality=80&width=1080" >/dev/null + echo "[smoke] fetch page 1 for book $BOOK_ID" + auth "$BASE_API/books/$BOOK_ID/pages/1?format=webp&quality=80&width=1080" >/dev/null + + echo "[smoke] fetch thumbnail for book $BOOK_ID" + auth "$BASE_API/books/$BOOK_ID/thumbnail" >/dev/null fi +# ─── thumbnail rebuild (handled by indexer, not API) ───────────────────────── + +echo "[smoke] thumbnail rebuild job" +THUMB_REBUILD_ID="$(auth -X POST -H "Content-Type: application/json" -d '{}' "$BASE_API/index/thumbnails/rebuild" | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])")" +echo "[smoke] thumbnail rebuild job id: $THUMB_REBUILD_ID" +wait_job "$THUMB_REBUILD_ID" "thumbnail_rebuild" + +# ─── thumbnail regenerate 
# ─── checkup route removed (must return 404) ───────────────────────────────