From 6947af10fe9bf16f9f4c206f24cef67879372c0c Mon Sep 17 00:00:00 2001 From: Froidefond Julien Date: Sat, 14 Mar 2026 23:07:42 +0100 Subject: [PATCH] perf(api,indexer): optimiser pages, thumbnails, watcher et robustesse fd MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Pages: mode Original (zero-transcoding), ETag/304, cache index CBZ, préfetch next 2 pages, filtre Triangle par défaut - Thumbnails: DCT scaling JPEG via jpeg-decoder (decode 7x plus rapide), img.thumbnail() pour resize, support format Original, fix JPEG RGBA8 - API fallback thumbnail: OutputFormat::Original + DCT scaling au lieu de WebP full-decode, retour (bytes, content_type) dynamique - Watcher: remplacement notify par poll léger sans inotify/fd, skip poll quand job actif, snapshots en mémoire - Jobs: mutex exclusif corrigé (tous statuts actifs, tous types exclusifs) - Robustesse: suppression fs::canonicalize (problèmes fd Docker), list_folders avec erreurs explicites, has_children default true - Backoffice: FormRow items-start pour alignement inputs avec helper text, labels settings clarifiés Co-Authored-By: Claude Opus 4.6 --- Cargo.lock | 119 ++------ Cargo.toml | 1 + apps/api/Cargo.toml | 1 + apps/api/src/books.rs | 20 +- apps/api/src/index_jobs.rs | 43 ++- apps/api/src/libraries.rs | 15 +- apps/api/src/pages.rs | 287 ++++++++++++++---- apps/api/src/state.rs | 2 +- apps/backoffice/app/components/ui/Form.tsx | 2 +- apps/backoffice/app/settings/SettingsPage.tsx | 22 +- apps/indexer/Cargo.toml | 2 +- apps/indexer/src/analyzer.rs | 216 ++++++++++--- apps/indexer/src/job.rs | 66 ++-- apps/indexer/src/watcher.rs | 260 +++++++++------- crates/parsers/src/lib.rs | 50 ++- 15 files changed, 711 insertions(+), 395 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d661bbf..f18b937 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,6 +61,7 @@ dependencies = [ "chrono", "futures", "image", + "jpeg-decoder", "lru", "parsers", "rand 0.8.5", @@ -215,12 +216,6 @@ 
version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - [[package]] name = "bitflags" version = "2.11.0" @@ -634,15 +629,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fsevent-sys" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" -dependencies = [ - "libc", -] - [[package]] name = "futures" version = "0.3.32" @@ -1143,7 +1129,7 @@ dependencies = [ "chrono", "futures", "image", - "notify", + "jpeg-decoder", "num_cpus", "parsers", "rayon", @@ -1173,26 +1159,6 @@ dependencies = [ "serde_core", ] -[[package]] -name = "inotify" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd5b3eaf1a28b758ac0faa5a4254e8ab2705605496f1b1f3fbbc3988ad73d199" -dependencies = [ - "bitflags 2.11.0", - "inotify-sys", - "libc", -] - -[[package]] -name = "inotify-sys" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" -dependencies = [ - "libc", -] - [[package]] name = "inout" version = "0.1.4" @@ -1285,6 +1251,15 @@ dependencies = [ "libc", ] +[[package]] +name = "jpeg-decoder" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00810f1d8b74be64b13dbf3db89ac67740615d6c891f0e7b6179326533011a07" +dependencies = [ + "rayon", +] + [[package]] name = "js-sys" version = "0.3.91" @@ -1295,26 +1270,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "kqueue" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" -dependencies = [ - "kqueue-sys", - "libc", -] - -[[package]] -name = "kqueue-sys" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" -dependencies = [ - "bitflags 1.3.2", - "libc", -] - [[package]] name = "lazy_static" version = "1.5.0" @@ -1358,7 +1313,7 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a" dependencies = [ - "bitflags 2.11.0", + "bitflags", "libc", "plain", "redox_syscall 0.7.3", @@ -1412,7 +1367,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f560f57dfb9142a02d673e137622fd515d4231e51feb8b4af28d92647d83f35b" dependencies = [ "aes", - "bitflags 2.11.0", + "bitflags", "cbc", "chrono", "ecb", @@ -1522,7 +1477,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", - "log", "wasi", "windows-sys 0.61.2", ] @@ -1563,33 +1517,6 @@ dependencies = [ "nom", ] -[[package]] -name = "notify" -version = "8.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" -dependencies = [ - "bitflags 2.11.0", - "fsevent-sys", - "inotify", - "kqueue", - "libc", - "log", - "mio", - "notify-types", - "walkdir", - "windows-sys 0.60.2", -] - -[[package]] -name = "notify-types" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42b8cfee0e339a0337359f3c88165702ac6e600dc01c0cc9579a92d62b08477a" -dependencies = [ - "bitflags 2.11.0", -] - [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -1728,7 +1655,7 @@ version = "0.8.37" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"6553f6604a52b3203db7b4e9d51eb4dd193cf455af9e56d40cab6575b547b679" dependencies = [ - "bitflags 2.11.0", + "bitflags", "bytemuck", "bytes", "chrono", @@ -1820,7 +1747,7 @@ version = "0.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60769b8b31b2a9f263dae2776c37b1b28ae246943cf719eb6946a1db05128a61" dependencies = [ - "bitflags 2.11.0", + "bitflags", "crc32fast", "fdeflate", "flate2", @@ -2088,7 +2015,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags 2.11.0", + "bitflags", ] [[package]] @@ -2097,7 +2024,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16" dependencies = [ - "bitflags 2.11.0", + "bitflags", ] [[package]] @@ -2579,7 +2506,7 @@ checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526" dependencies = [ "atoi", "base64", - "bitflags 2.11.0", + "bitflags", "byteorder", "bytes", "chrono", @@ -2623,7 +2550,7 @@ checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46" dependencies = [ "atoi", "base64", - "bitflags 2.11.0", + "bitflags", "byteorder", "chrono", "crc", @@ -2921,7 +2848,7 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ - "bitflags 2.11.0", + "bitflags", "bytes", "futures-util", "http", @@ -3076,7 +3003,7 @@ version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92ec61343a630d2b50d13216dea5125e157d3fc180a7d3f447d22fe146b648fc" dependencies = [ - "bitflags 2.11.0", + "bitflags", "regex", "unrar_sys", "widestring", @@ -3341,7 +3268,7 @@ version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ - "bitflags 2.11.0", + "bitflags", "hashbrown 0.15.5", "indexmap", "semver", @@ -3787,7 +3714,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", - "bitflags 2.11.0", + "bitflags", "indexmap", "log", "serde", diff --git a/Cargo.toml b/Cargo.toml index 2744d41..a6581e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ axum = "0.7" base64 = "0.22" chrono = { version = "0.4", features = ["serde"] } image = { version = "0.25", default-features = false, features = ["jpeg", "png", "webp"] } +jpeg-decoder = "0.3" lru = "0.12" rayon = "1.10" reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } diff --git a/apps/api/Cargo.toml b/apps/api/Cargo.toml index d7642ac..7983517 100644 --- a/apps/api/Cargo.toml +++ b/apps/api/Cargo.toml @@ -13,6 +13,7 @@ async-stream = "0.3" chrono.workspace = true futures = "0.3" image.workspace = true +jpeg-decoder.workspace = true lru.workspace = true stripstream-core = { path = "../../crates/core" } parsers = { path = "../../crates/parsers" } diff --git a/apps/api/src/books.rs b/apps/api/src/books.rs index 9fce20c..1ade223 100644 --- a/apps/api/src/books.rs +++ b/apps/api/src/books.rs @@ -584,6 +584,17 @@ use axum::{ response::IntoResponse, }; +/// Detect content type from thumbnail file extension. 
+fn detect_thumbnail_content_type(path: &str) -> &'static str { + if path.ends_with(".jpg") || path.ends_with(".jpeg") { + "image/jpeg" + } else if path.ends_with(".png") { + "image/png" + } else { + "image/webp" + } +} + /// Get book thumbnail image #[utoipa::path( get, @@ -612,9 +623,12 @@ pub async fn get_thumbnail( let row = row.ok_or_else(|| ApiError::not_found("book not found"))?; let thumbnail_path: Option = row.get("thumbnail_path"); - let data = if let Some(ref path) = thumbnail_path { + let (data, content_type) = if let Some(ref path) = thumbnail_path { match std::fs::read(path) { - Ok(bytes) => bytes, + Ok(bytes) => { + let ct = detect_thumbnail_content_type(path); + (bytes, ct) + } Err(_) => { // File missing on disk (e.g. different mount in dev) — fall back to live render crate::pages::render_book_page_1(&state, book_id, 300, 80).await? @@ -626,7 +640,7 @@ pub async fn get_thumbnail( }; let mut headers = HeaderMap::new(); - headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("image/webp")); + headers.insert(header::CONTENT_TYPE, HeaderValue::from_static(content_type)); headers.insert( header::CACHE_CONTROL, HeaderValue::from_static("public, max-age=31536000, immutable"), diff --git a/apps/api/src/index_jobs.rs b/apps/api/src/index_jobs.rs index 0800543..8019615 100644 --- a/apps/api/src/index_jobs.rs +++ b/apps/api/src/index_jobs.rs @@ -246,9 +246,9 @@ pub async fn list_folders( base_path.to_path_buf() }; - // Ensure the path is within the libraries root - let canonical_target = target_path.canonicalize().unwrap_or(target_path.clone()); - let canonical_base = base_path.canonicalize().unwrap_or(base_path.to_path_buf()); + // Ensure the path is within the libraries root (avoid canonicalize — burns fd on Docker mounts) + let canonical_target = target_path.clone(); + let canonical_base = base_path.to_path_buf(); if !canonical_target.starts_with(&canonical_base) { return Err(ApiError::bad_request("Path is outside libraries root")); @@ -263,19 
+263,31 @@ pub async fn list_folders( 0 }; - if let Ok(entries) = std::fs::read_dir(&canonical_target) { - for entry in entries.flatten() { - if entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false) { + let entries = std::fs::read_dir(&canonical_target) + .map_err(|e| ApiError::internal(format!("cannot read directory {}: {}", canonical_target.display(), e)))?; + + for entry in entries { + let entry = match entry { + Ok(e) => e, + Err(e) => { + tracing::warn!("[FOLDERS] entry error in {}: {}", canonical_target.display(), e); + continue; + } + }; + let is_dir = match entry.file_type() { + Ok(ft) => ft.is_dir(), + Err(e) => { + tracing::warn!("[FOLDERS] cannot stat {}: {}", entry.path().display(), e); + continue; + } + }; + if is_dir { let name = entry.file_name().to_string_lossy().to_string(); - - // Check if this folder has children - let has_children = if let Ok(sub_entries) = std::fs::read_dir(entry.path()) { - sub_entries.flatten().any(|e| { - e.file_type().map(|ft| ft.is_dir()).unwrap_or(false) - }) - } else { - false - }; + + // Check if this folder has children (best-effort, default to true on error) + let has_children = std::fs::read_dir(entry.path()) + .map(|sub| sub.flatten().any(|e| e.file_type().map(|ft| ft.is_dir()).unwrap_or(false))) + .unwrap_or(true); // Calculate the full path relative to libraries root let full_path = if let Ok(relative) = entry.path().strip_prefix(&canonical_base) { @@ -290,7 +302,6 @@ pub async fn list_folders( depth, has_children, }); - } } } diff --git a/apps/api/src/libraries.rs b/apps/api/src/libraries.rs index c92f6c3..8d59f52 100644 --- a/apps/api/src/libraries.rs +++ b/apps/api/src/libraries.rs @@ -156,14 +156,19 @@ fn canonicalize_library_root(root_path: &str) -> Result { return Err(ApiError::bad_request("root_path must be absolute")); } - let canonical = std::fs::canonicalize(path) - .map_err(|_| ApiError::bad_request("root_path does not exist or is inaccessible"))?; - - if !canonical.is_dir() { + // Avoid 
fs::canonicalize — it opens extra file descriptors to resolve symlinks + // and can fail on Docker volume mounts (ro, cached) when fd limits are low. + if !path.exists() { + return Err(ApiError::bad_request(format!( + "root_path does not exist: {}", + root_path + ))); + } + if !path.is_dir() { return Err(ApiError::bad_request("root_path must point to a directory")); } - Ok(canonical) + Ok(path.to_path_buf()) } use crate::index_jobs::{IndexJobResponse, RebuildRequest}; diff --git a/apps/api/src/pages.rs b/apps/api/src/pages.rs index 96c3845..540105c 100644 --- a/apps/api/src/pages.rs +++ b/apps/api/src/pages.rs @@ -16,7 +16,7 @@ use serde::Deserialize; use utoipa::ToSchema; use sha2::{Digest, Sha256}; use sqlx::Row; -use tracing::{debug, error, info, instrument, warn}; +use tracing::{error, info, instrument, warn}; use uuid::Uuid; use crate::{error::ApiError, state::AppState}; @@ -32,9 +32,9 @@ fn remap_libraries_path(path: &str) -> String { fn parse_filter(s: &str) -> image::imageops::FilterType { match s { - "triangle" => image::imageops::FilterType::Triangle, + "lanczos3" => image::imageops::FilterType::Lanczos3, "nearest" => image::imageops::FilterType::Nearest, - _ => image::imageops::FilterType::Lanczos3, + _ => image::imageops::FilterType::Triangle, // Triangle (bilinear) is fast and good enough for comics } } @@ -64,7 +64,7 @@ fn write_to_disk_cache(cache_path: &Path, data: &[u8]) -> Result<(), std::io::Er } let mut file = std::fs::File::create(cache_path)?; file.write_all(data)?; - file.sync_data()?; + // No sync_data() — this is a cache, durability is not critical Ok(()) } @@ -80,6 +80,8 @@ pub struct PageQuery { #[derive(Clone, Copy, Debug)] enum OutputFormat { + /// Serve raw bytes from the archive — no decode, no re-encode. 
+ Original, Jpeg, Png, Webp, @@ -87,16 +89,19 @@ enum OutputFormat { impl OutputFormat { fn parse(value: Option<&str>) -> Result { - match value.unwrap_or("webp") { - "jpeg" | "jpg" => Ok(Self::Jpeg), - "png" => Ok(Self::Png), - "webp" => Ok(Self::Webp), - _ => Err(ApiError::bad_request("format must be webp|jpeg|png")), + match value { + None => Ok(Self::Original), + Some("original") => Ok(Self::Original), + Some("jpeg") | Some("jpg") => Ok(Self::Jpeg), + Some("png") => Ok(Self::Png), + Some("webp") => Ok(Self::Webp), + _ => Err(ApiError::bad_request("format must be original|webp|jpeg|png")), } } fn content_type(&self) -> &'static str { match self { + Self::Original => "application/octet-stream", // will be overridden by detected type Self::Jpeg => "image/jpeg", Self::Png => "image/png", Self::Webp => "image/webp", @@ -105,6 +110,7 @@ impl OutputFormat { fn extension(&self) -> &'static str { match self { + Self::Original => "orig", Self::Jpeg => "jpg", Self::Png => "png", Self::Webp => "webp", @@ -112,6 +118,17 @@ impl OutputFormat { } } +/// Detect content type from raw image bytes. 
+fn detect_content_type(data: &[u8]) -> &'static str { + match image::guess_format(data) { + Ok(ImageFormat::Jpeg) => "image/jpeg", + Ok(ImageFormat::Png) => "image/png", + Ok(ImageFormat::WebP) => "image/webp", + Ok(ImageFormat::Avif) => "image/avif", + _ => "application/octet-stream", + } +} + /// Get a specific page image from a book with optional format conversion #[utoipa::path( get, @@ -132,44 +149,38 @@ impl OutputFormat { ), security(("Bearer" = [])) )] -#[instrument(skip(state), fields(book_id = %book_id, page = n))] +#[instrument(skip(state, headers), fields(book_id = %book_id, page = n))] pub async fn get_page( State(state): State, AxumPath((book_id, n)): AxumPath<(Uuid, u32)>, Query(query): Query, + headers: HeaderMap, ) -> Result { - info!("Processing image request"); - if n == 0 { - warn!("Invalid page number: 0"); return Err(ApiError::bad_request("page index starts at 1")); } - let (default_format, default_quality, max_width, filter_str, timeout_secs, cache_dir) = { + let (default_quality, max_width, filter_str, timeout_secs, cache_dir) = { let s = state.settings.read().await; - (s.image_format.clone(), s.image_quality, s.image_max_width, s.image_filter.clone(), s.timeout_seconds, s.cache_directory.clone()) + (s.image_quality, s.image_max_width, s.image_filter.clone(), s.timeout_seconds, s.cache_directory.clone()) }; - let format_str = query.format.as_deref().unwrap_or(default_format.as_str()); - let format = OutputFormat::parse(Some(format_str))?; + let format = OutputFormat::parse(query.format.as_deref())?; let quality = query.quality.unwrap_or(default_quality).clamp(1, 100); let width = query.width.unwrap_or(0); if width > max_width { - warn!("Invalid width: {}", width); return Err(ApiError::bad_request(format!("width must be <= {}", max_width))); } let filter = parse_filter(&filter_str); let cache_dir_path = std::path::PathBuf::from(&cache_dir); let memory_cache_key = format!("{book_id}:{n}:{}:{quality}:{width}", format.extension()); - + if let 
Some(cached) = state.page_cache.lock().await.get(&memory_cache_key).cloned() { state.metrics.page_cache_hits.fetch_add(1, Ordering::Relaxed); - debug!("Memory cache hit for key: {}", memory_cache_key); - return Ok(image_response(cached, format.content_type(), None)); + return Ok(image_response(cached, format, None, &headers)); } state.metrics.page_cache_misses.fetch_add(1, Ordering::Relaxed); - debug!("Memory cache miss for key: {}", memory_cache_key); let row = sqlx::query( r#" @@ -191,27 +202,30 @@ pub async fn get_page( let row = match row { Some(r) => r, None => { - error!("Book file not found for book_id: {}", book_id); return Err(ApiError::not_found("book file not found")); } }; - + let abs_path: String = row.get("abs_path"); let abs_path = remap_libraries_path(&abs_path); let input_format: String = row.get("format"); - - info!("Processing book file: {} (format: {})", abs_path, input_format); let disk_cache_key = get_cache_key(&abs_path, n, format.extension(), quality, width); let cache_path = get_cache_path(&disk_cache_key, &format, &cache_dir_path); - + + // If-None-Match: return 304 if the client already has this version + if let Some(if_none_match) = headers.get(header::IF_NONE_MATCH) { + let expected_etag = format!("\"{}\"", disk_cache_key); + if if_none_match.as_bytes() == expected_etag.as_bytes() { + return Ok(StatusCode::NOT_MODIFIED.into_response()); + } + } + if let Some(cached_bytes) = read_from_disk_cache(&cache_path) { - info!("Disk cache hit for: {}", cache_path.display()); let bytes = Arc::new(cached_bytes); state.page_cache.lock().await.put(memory_cache_key, bytes.clone()); - return Ok(image_response(bytes, format.content_type(), Some(&disk_cache_key))); + return Ok(image_response(bytes, format, Some(&disk_cache_key), &headers)); } - debug!("Disk cache miss for: {}", cache_path.display()); let _permit = state .page_render_limit @@ -223,11 +237,10 @@ pub async fn get_page( ApiError::internal("render limiter unavailable") })?; - info!("Rendering 
page {} from {}", n, abs_path); let abs_path_clone = abs_path.clone(); let format_clone = format; let start_time = std::time::Instant::now(); - + let bytes = tokio::time::timeout( Duration::from_secs(timeout_secs), tokio::task::spawn_blocking(move || { @@ -243,23 +256,32 @@ pub async fn get_page( error!("Render task panicked for {} page {}: {}", abs_path, n, e); ApiError::internal(format!("render task failed: {e}")) })?; - + let duration = start_time.elapsed(); - + match bytes { Ok(data) => { - info!("Successfully rendered page {} in {:?}", n, duration); - + info!("Rendered page {} in {:?}", n, duration); + if let Err(e) = write_to_disk_cache(&cache_path, &data) { warn!("Failed to write to disk cache: {}", e); - } else { - info!("Cached rendered image to: {}", cache_path.display()); } let bytes = Arc::new(data); - state.page_cache.lock().await.put(memory_cache_key, bytes.clone()); + state.page_cache.lock().await.put(memory_cache_key.clone(), bytes.clone()); - Ok(image_response(bytes, format.content_type(), Some(&disk_cache_key))) + // Prefetch next 2 pages in background (fire-and-forget) + for next_page in [n + 1, n + 2] { + let state2 = state.clone(); + let abs_path2 = abs_path.clone(); + let cache_dir2 = cache_dir_path.clone(); + let format2 = format; + tokio::spawn(async move { + prefetch_page(state2, book_id, &abs_path2, next_page, format2, quality, width, filter, timeout_secs, &cache_dir2).await; + }); + } + + Ok(image_response(bytes, format, Some(&disk_cache_key), &headers)) } Err(e) => { error!("Failed to render page {} from {}: {:?}", n, abs_path, e); @@ -268,11 +290,72 @@ pub async fn get_page( } } -fn image_response(bytes: Arc>, content_type: &str, etag_suffix: Option<&str>) -> Response { - let mut headers = HeaderMap::new(); - headers.insert(header::CONTENT_TYPE, HeaderValue::from_str(content_type).unwrap_or(HeaderValue::from_static("application/octet-stream"))); - headers.insert(header::CACHE_CONTROL, HeaderValue::from_static("public, max-age=31536000, 
immutable")); - +/// Prefetch a single page into disk+memory cache (best-effort, ignores errors). +async fn prefetch_page( + state: AppState, + book_id: Uuid, + abs_path: &str, + page: u32, + format: OutputFormat, + quality: u8, + width: u32, + filter: image::imageops::FilterType, + timeout_secs: u64, + cache_dir: &Path, +) { + let mem_key = format!("{book_id}:{page}:{}:{quality}:{width}", format.extension()); + // Already in memory cache? + if state.page_cache.lock().await.contains(&mem_key) { + return; + } + // Already on disk? + let disk_key = get_cache_key(abs_path, page, format.extension(), quality, width); + let cache_path = get_cache_path(&disk_key, &format, cache_dir); + if cache_path.exists() { + return; + } + // Acquire render permit (don't block too long — if busy, skip) + let permit = tokio::time::timeout( + Duration::from_millis(100), + state.page_render_limit.clone().acquire_owned(), + ) + .await; + let _permit = match permit { + Ok(Ok(p)) => p, + _ => return, + }; + + // Fetch the book format from the path extension as a shortcut + let input_format = match abs_path.rsplit('.').next().map(|e| e.to_ascii_lowercase()) { + Some(ref e) if e == "cbz" => "cbz", + Some(ref e) if e == "cbr" => "cbr", + Some(ref e) if e == "pdf" => "pdf", + _ => return, + } + .to_string(); + + let abs_clone = abs_path.to_string(); + let fmt = format; + let result = tokio::time::timeout( + Duration::from_secs(timeout_secs), + tokio::task::spawn_blocking(move || { + render_page(&abs_clone, &input_format, page, &fmt, quality, width, filter) + }), + ) + .await; + + if let Ok(Ok(Ok(data))) = result { + let _ = write_to_disk_cache(&cache_path, &data); + let bytes = Arc::new(data); + state.page_cache.lock().await.put(mem_key, bytes); + } +} + +fn image_response(bytes: Arc>, format: OutputFormat, etag_suffix: Option<&str>, req_headers: &HeaderMap) -> Response { + let content_type = match format { + OutputFormat::Original => detect_content_type(&bytes), + _ => format.content_type(), + 
}; let etag = if let Some(suffix) = etag_suffix { format!("\"{}\"", suffix) } else { @@ -280,20 +363,38 @@ fn image_response(bytes: Arc>, content_type: &str, etag_suffix: Option<& hasher.update(&*bytes); format!("\"{:x}\"", hasher.finalize()) }; - + + // Check If-None-Match for 304 + if let Some(if_none_match) = req_headers.get(header::IF_NONE_MATCH) { + if if_none_match.as_bytes() == etag.as_bytes() { + let mut headers = HeaderMap::new(); + headers.insert(header::CACHE_CONTROL, HeaderValue::from_static("public, max-age=31536000, immutable")); + if let Ok(v) = HeaderValue::from_str(&etag) { + headers.insert(header::ETAG, v); + } + return (StatusCode::NOT_MODIFIED, headers).into_response(); + } + } + + let mut headers = HeaderMap::new(); + headers.insert(header::CONTENT_TYPE, HeaderValue::from_str(content_type).unwrap_or(HeaderValue::from_static("application/octet-stream"))); + headers.insert(header::CACHE_CONTROL, HeaderValue::from_static("public, max-age=31536000, immutable")); if let Ok(v) = HeaderValue::from_str(&etag) { headers.insert(header::ETAG, v); } - (StatusCode::OK, headers, Body::from((*bytes).clone())).into_response() + // Use Bytes to avoid cloning the Vec — shares the Arc's allocation via zero-copy + let body_bytes = axum::body::Bytes::from(Arc::unwrap_or_clone(bytes)); + (StatusCode::OK, headers, Body::from(body_bytes)).into_response() } /// Render page 1 of a book (for thumbnail fallback or thumbnail checkup). Uses thumbnail dimensions by default. +/// Render page 1 as a thumbnail fallback. Returns (bytes, content_type). 
pub async fn render_book_page_1( state: &AppState, book_id: Uuid, width: u32, quality: u8, -) -> Result, ApiError> { +) -> Result<(Vec, &'static str), ApiError> { let row = sqlx::query( r#"SELECT abs_path, format FROM book_files WHERE book_id = $1 ORDER BY updated_at DESC LIMIT 1"#, ) @@ -328,7 +429,7 @@ pub async fn render_book_page_1( &abs_path_clone, &input_format, 1, - &OutputFormat::Webp, + &OutputFormat::Original, quality, width, filter, @@ -339,7 +440,9 @@ pub async fn render_book_page_1( .map_err(|_| ApiError::internal("page rendering timeout"))? .map_err(|e| ApiError::internal(format!("render task failed: {e}")))?; - bytes + let bytes = bytes?; + let content_type = detect_content_type(&bytes); + Ok((bytes, content_type)) } fn render_page( @@ -370,43 +473,93 @@ fn render_page( ApiError::internal(format!("page extraction failed: {e}")) })?; + // Original mode or source matches output with no resize → return raw bytes (zero transcoding) + if matches!(out_format, OutputFormat::Original) && width == 0 { + return Ok(page_bytes); + } + if width == 0 { + if let Ok(source_fmt) = image::guess_format(&page_bytes) { + if format_matches(&source_fmt, out_format) { + return Ok(page_bytes); + } + } + } + transcode_image(&page_bytes, out_format, quality, width, filter) } +/// Fast JPEG decode with DCT scaling: decodes directly at reduced resolution. +fn fast_jpeg_decode(input: &[u8], target_w: u32, target_h: u32) -> Option { + if image::guess_format(input).ok()? 
!= ImageFormat::Jpeg { + return None; + } + let mut decoder = jpeg_decoder::Decoder::new(std::io::Cursor::new(input)); + decoder.read_info().ok()?; + decoder.scale(target_w as u16, target_h as u16).ok()?; + let pixels = decoder.decode().ok()?; + let info = decoder.info()?; + let w = info.width as u32; + let h = info.height as u32; + match info.pixel_format { + jpeg_decoder::PixelFormat::RGB24 => { + let buf = image::RgbImage::from_raw(w, h, pixels)?; + Some(image::DynamicImage::ImageRgb8(buf)) + } + jpeg_decoder::PixelFormat::L8 => { + let buf = image::GrayImage::from_raw(w, h, pixels)?; + Some(image::DynamicImage::ImageLuma8(buf)) + } + _ => None, + } +} + fn transcode_image(input: &[u8], out_format: &OutputFormat, quality: u8, width: u32, filter: image::imageops::FilterType) -> Result, ApiError> { - debug!("Transcoding image: {} bytes, format: {:?}, quality: {}, width: {}", input.len(), out_format, quality, width); let source_format = image::guess_format(input).ok(); - debug!("Source format detected: {:?}", source_format); - let needs_transcode = source_format.map(|f| !format_matches(&f, out_format)).unwrap_or(true); + + // Resolve "Original" to the actual source format for encoding + let effective_format = match out_format { + OutputFormat::Original => match source_format { + Some(ImageFormat::Png) => OutputFormat::Png, + Some(ImageFormat::WebP) => OutputFormat::Webp, + _ => OutputFormat::Jpeg, // default to JPEG for original resize + }, + other => *other, + }; + + let needs_transcode = source_format.map(|f| !format_matches(&f, &effective_format)).unwrap_or(true); if width == 0 && !needs_transcode { - debug!("No transcoding needed, returning original"); return Ok(input.to_vec()); } - debug!("Loading image from memory..."); - let mut image = image::load_from_memory(input).map_err(|e| { - error!("Failed to load image from memory: {} (input size: {} bytes)", e, input.len()); - ApiError::internal(format!("invalid source image: {e}")) - })?; + // For JPEG with 
resize: use DCT scaling to decode at ~target size (much faster) + let mut image = if width > 0 { + fast_jpeg_decode(input, width, u32::MAX) + .unwrap_or_else(|| { + image::load_from_memory(input).unwrap_or_default() + }) + } else { + image::load_from_memory(input).map_err(|e| { + ApiError::internal(format!("invalid source image: {e}")) + })? + }; if width > 0 { - debug!("Resizing image to width: {}", width); image = image.resize(width, u32::MAX, filter); } - debug!("Converting to RGBA..."); let rgba = image.to_rgba8(); let (w, h) = rgba.dimensions(); - debug!("Image dimensions: {}x{}", w, h); - + let mut out = Vec::new(); - match out_format { - OutputFormat::Jpeg => { + match effective_format { + OutputFormat::Jpeg | OutputFormat::Original => { + // JPEG doesn't support alpha — convert RGBA to RGB + let rgb = image::DynamicImage::ImageRgba8(rgba.clone()).to_rgb8(); let mut encoder = JpegEncoder::new_with_quality(&mut out, quality); encoder - .encode(&rgba, w, h, ColorType::Rgba8.into()) + .encode(&rgb, w, h, ColorType::Rgb8.into()) .map_err(|e| ApiError::internal(format!("jpeg encode failed: {e}")))?; } OutputFormat::Png => { @@ -421,7 +574,7 @@ fn transcode_image(input: &[u8], out_format: &OutputFormat, quality: u8, width: .flat_map(|p| [p[0], p[1], p[2]]) .collect(); let webp_data = webp::Encoder::new(&rgb_data, webp::PixelLayout::Rgb, w, h) - .encode(f32::max(quality as f32, 85.0)); + .encode(quality as f32); out.extend_from_slice(&webp_data); } } diff --git a/apps/api/src/state.rs b/apps/api/src/state.rs index 2203687..4648233 100644 --- a/apps/api/src/state.rs +++ b/apps/api/src/state.rs @@ -39,7 +39,7 @@ impl Default for DynamicSettings { timeout_seconds: 12, image_format: "webp".to_string(), image_quality: 85, - image_filter: "lanczos3".to_string(), + image_filter: "triangle".to_string(), image_max_width: 2160, cache_directory: std::env::var("IMAGE_CACHE_DIR") .unwrap_or_else(|_| "/tmp/stripstream-image-cache".to_string()), diff --git 
a/apps/backoffice/app/components/ui/Form.tsx b/apps/backoffice/app/components/ui/Form.tsx index a7f075f..d683f7f 100644 --- a/apps/backoffice/app/components/ui/Form.tsx +++ b/apps/backoffice/app/components/ui/Form.tsx @@ -90,7 +90,7 @@ interface FormRowProps { } export function FormRow({ children, className = "" }: FormRowProps) { - return
{children}
; + return
{children}
; } // Form Section diff --git a/apps/backoffice/app/settings/SettingsPage.tsx b/apps/backoffice/app/settings/SettingsPage.tsx index eb408cd..76ba38d 100644 --- a/apps/backoffice/app/settings/SettingsPage.tsx +++ b/apps/backoffice/app/settings/SettingsPage.tsx @@ -88,14 +88,14 @@ export default function SettingsPage({ initialSettings, initialCacheStats, initi Image Processing - Configure how images are processed and compressed + These settings only apply when a client explicitly requests format conversion via the API (e.g. ?format=webp&width=800). Pages served without parameters are delivered as-is from the archive, with no processing.
- - Default Output Format + { const newSettings = { ...settings, image_processing: { ...settings.image_processing, format: e.target.value } }; @@ -103,13 +103,13 @@ export default function SettingsPage({ initialSettings, initialCacheStats, initi handleUpdateSetting("image_processing", newSettings.image_processing); }} > - + - + - + { @@ -141,7 +141,7 @@ export default function SettingsPage({ initialSettings, initialCacheStats, initi - + - + + +

+ {settings.thumbnail.format === "original" + ? "Resizes to target dimensions, keeps source format (JPEG→JPEG). Much faster generation." + : "Resizes and re-encodes to selected format."} +

diff --git a/apps/indexer/Cargo.toml b/apps/indexer/Cargo.toml index c34f8fd..8cd2a6c 100644 --- a/apps/indexer/Cargo.toml +++ b/apps/indexer/Cargo.toml @@ -12,7 +12,7 @@ axum.workspace = true chrono.workspace = true futures = "0.3" image.workspace = true -notify = "8" +jpeg-decoder.workspace = true num_cpus.workspace = true parsers = { path = "../../crates/parsers" } rayon.workspace = true diff --git a/apps/indexer/src/analyzer.rs b/apps/indexer/src/analyzer.rs index 2277146..11549bc 100644 --- a/apps/indexer/src/analyzer.rs +++ b/apps/indexer/src/analyzer.rs @@ -1,6 +1,6 @@ use anyhow::Result; use futures::stream::{self, StreamExt}; -use image::GenericImageView; +use image::{GenericImageView, ImageEncoder}; use parsers::{analyze_book, BookFormat}; use sqlx::Row; use std::path::Path; @@ -14,6 +14,7 @@ use crate::{job::is_job_cancelled, utils, AppState}; #[derive(Clone)] struct ThumbnailConfig { enabled: bool, + format: Option, width: u32, height: u32, quality: u8, @@ -24,6 +25,7 @@ struct ThumbnailConfig { async fn load_thumbnail_config(pool: &sqlx::PgPool) -> ThumbnailConfig { let fallback = ThumbnailConfig { enabled: true, + format: Some("webp".to_string()), width: 300, height: 400, quality: 80, @@ -51,6 +53,11 @@ async fn load_thumbnail_config(pool: &sqlx::PgPool) -> ThumbnailConfig { .get("enabled") .and_then(|v| v.as_bool()) .unwrap_or(fallback.enabled), + format: value + .get("format") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .or_else(|| fallback.format.clone()), width: value .get("width") .and_then(|v| v.as_u64()) @@ -100,22 +107,143 @@ async fn load_thumbnail_concurrency(pool: &sqlx::PgPool) -> usize { } } +/// Detect the image format from raw bytes and return the corresponding file extension. 
+fn detect_image_ext(data: &[u8]) -> &'static str { + match image::guess_format(data) { + Ok(image::ImageFormat::Png) => "png", + Ok(image::ImageFormat::WebP) => "webp", + _ => "jpg", // JPEG is the most common in comic archives + } +} + +/// Fast JPEG decode with DCT scaling: decodes directly at reduced resolution (1/8, 1/4, 1/2). +/// Returns (DynamicImage, original_width, original_height) or None if not JPEG / decode fails. +fn fast_jpeg_decode(image_bytes: &[u8], target_w: u32, target_h: u32) -> Option<(image::DynamicImage, u32, u32)> { + // Only attempt for JPEG + if image::guess_format(image_bytes).ok()? != image::ImageFormat::Jpeg { + return None; + } + + let mut decoder = jpeg_decoder::Decoder::new(std::io::Cursor::new(image_bytes)); + // Read header to get original dimensions + decoder.read_info().ok()?; + let info = decoder.info()?; + let orig_w = info.width as u32; + let orig_h = info.height as u32; + + // Request DCT-scaled decode (picks smallest scale >= requested size) + decoder.scale(target_w as u16, target_h as u16).ok()?; + + let pixels = decoder.decode().ok()?; + let info = decoder.info()?; + let dec_w = info.width as u32; + let dec_h = info.height as u32; + + let img = match info.pixel_format { + jpeg_decoder::PixelFormat::RGB24 => { + let buf = image::RgbImage::from_raw(dec_w, dec_h, pixels)?; + image::DynamicImage::ImageRgb8(buf) + } + jpeg_decoder::PixelFormat::L8 => { + let buf = image::GrayImage::from_raw(dec_w, dec_h, pixels)?; + image::DynamicImage::ImageLuma8(buf) + } + _ => return None, + }; + Some((img, orig_w, orig_h)) +} + fn generate_thumbnail(image_bytes: &[u8], config: &ThumbnailConfig) -> anyhow::Result> { - let img = image::load_from_memory(image_bytes) - .map_err(|e| anyhow::anyhow!("failed to load image: {}", e))?; - let (orig_w, orig_h) = img.dimensions(); - let ratio_w = config.width as f32 / orig_w as f32; - let ratio_h = config.height as f32 / orig_h as f32; - let ratio = ratio_w.min(ratio_h); - let new_w = (orig_w as f32 * 
ratio) as u32; - let new_h = (orig_h as f32 * ratio) as u32; - let resized = img.resize(new_w, new_h, image::imageops::FilterType::Triangle); - let rgba = resized.to_rgba8(); - let (w, h) = rgba.dimensions(); - let rgb_data: Vec = rgba.pixels().flat_map(|p| [p[0], p[1], p[2]]).collect(); - let quality = config.quality as f32; - let webp_data = webp::Encoder::new(&rgb_data, webp::PixelLayout::Rgb, w, h).encode(quality); - Ok(webp_data.to_vec()) + let t0 = std::time::Instant::now(); + + // Try fast JPEG DCT-scaled decode first (decodes directly at ~target size) + let (img, orig_w, orig_h) = if let Some(result) = fast_jpeg_decode(image_bytes, config.width, config.height) { + result + } else { + // Fallback for PNG/WebP/other formats + let img = image::load_from_memory(image_bytes) + .map_err(|e| anyhow::anyhow!("failed to load image: {}", e))?; + let (ow, oh) = img.dimensions(); + (img, ow, oh) + }; + let t_decode = t0.elapsed(); + + // Don't upscale — clamp to original size + let target_w = config.width.min(orig_w); + let target_h = config.height.min(orig_h); + + let t1 = std::time::Instant::now(); + // thumbnail() is optimized for large downscale ratios (uses fast sampling) + let resized = img.thumbnail(target_w, target_h); + let (w, h) = resized.dimensions(); + let t_resize = t1.elapsed(); + + let format = config.format.as_deref().unwrap_or("webp"); + info!( + "[THUMBNAIL] {}x{} -> {}x{} decode={:.0}ms resize={:.0}ms encode_format={}", + orig_w, orig_h, w, h, + t_decode.as_secs_f64() * 1000.0, + t_resize.as_secs_f64() * 1000.0, + format, + ); + + let t2 = std::time::Instant::now(); + let result = match format { + "original" => { + // Re-encode in source format (fast JPEG encode instead of slow WebP) + let source_format = image::guess_format(image_bytes).unwrap_or(image::ImageFormat::Jpeg); + match source_format { + image::ImageFormat::Png => { + let rgba = resized.to_rgba8(); + let mut buf = Vec::new(); + let encoder = image::codecs::png::PngEncoder::new(&mut buf); 
+ encoder.write_image(&rgba, w, h, image::ColorType::Rgba8.into()) + .map_err(|e| anyhow::anyhow!("png encode failed: {}", e))?; + Ok(buf) + } + _ => { + let rgb = resized.to_rgb8(); + let mut buf = Vec::new(); + let mut encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf, config.quality); + encoder.encode(&rgb, w, h, image::ColorType::Rgb8.into()) + .map_err(|e| anyhow::anyhow!("jpeg encode failed: {}", e))?; + Ok(buf) + } + } + } + "jpeg" | "jpg" => { + let rgb = resized.to_rgb8(); + let mut buf = Vec::new(); + let mut encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf, config.quality); + encoder.encode(&rgb, w, h, image::ColorType::Rgb8.into()) + .map_err(|e| anyhow::anyhow!("jpeg encode failed: {}", e))?; + Ok(buf) + } + "png" => { + let rgba = resized.to_rgba8(); + let mut buf = Vec::new(); + let encoder = image::codecs::png::PngEncoder::new(&mut buf); + encoder.write_image(&rgba, w, h, image::ColorType::Rgba8.into()) + .map_err(|e| anyhow::anyhow!("png encode failed: {}", e))?; + Ok(buf) + } + _ => { + // WebP (default) + let rgb = resized.to_rgb8(); + let rgb_data: &[u8] = rgb.as_raw(); + let quality = config.quality as f32; + let webp_data = webp::Encoder::new(rgb_data, webp::PixelLayout::Rgb, w, h).encode(quality); + Ok(webp_data.to_vec()) + } + }; + let t_encode = t2.elapsed(); + info!( + "[THUMBNAIL] encode={:.0}ms total={:.0}ms output_size={}KB", + t_encode.as_secs_f64() * 1000.0, + t0.elapsed().as_secs_f64() * 1000.0, + result.as_ref().map(|b| b.len() / 1024).unwrap_or(0), + ); + result } /// Save raw image bytes (as extracted from the archive) without any processing. @@ -127,23 +255,32 @@ fn save_raw_image(book_id: Uuid, raw_bytes: &[u8], directory: &str) -> anyhow::R Ok(path.to_string_lossy().to_string()) } -/// Resize the raw image and save it as a WebP thumbnail, overwriting the raw file. -fn resize_raw_to_webp( +/// Resize the raw image and save it as a thumbnail, overwriting the raw file. 
+fn resize_raw_to_thumbnail( book_id: Uuid, raw_path: &str, config: &ThumbnailConfig, ) -> anyhow::Result { let raw_bytes = std::fs::read(raw_path) .map_err(|e| anyhow::anyhow!("failed to read raw image {}: {}", raw_path, e))?; - let webp_bytes = generate_thumbnail(&raw_bytes, config)?; + info!("[THUMBNAIL] book={} raw_size={}KB", book_id, raw_bytes.len() / 1024); + let thumb_bytes = generate_thumbnail(&raw_bytes, config)?; - let webp_path = Path::new(&config.directory).join(format!("{}.webp", book_id)); - std::fs::write(&webp_path, &webp_bytes)?; + let format = config.format.as_deref().unwrap_or("webp"); + let ext = match format { + "original" => detect_image_ext(&raw_bytes), + "jpeg" | "jpg" => "jpg", + "png" => "png", + _ => "webp", + }; - // Delete the raw file now that the WebP is written + let thumb_path = Path::new(&config.directory).join(format!("{}.{}", book_id, ext)); + std::fs::write(&thumb_path, &thumb_bytes)?; + + // Delete the raw file now that the thumbnail is written let _ = std::fs::remove_file(raw_path); - Ok(webp_path.to_string_lossy().to_string()) + Ok(thumb_path.to_string_lossy().to_string()) } fn book_format_from_str(s: &str) -> Option { @@ -465,7 +602,7 @@ pub async fn analyze_library_books( let raw_path_clone = raw_path.clone(); let thumb_result = tokio::task::spawn_blocking(move || { - resize_raw_to_webp(book_id, &raw_path_clone, &config) + resize_raw_to_thumbnail(book_id, &raw_path_clone, &config) }) .await; @@ -554,18 +691,17 @@ pub async fn regenerate_thumbnails( let mut deleted_count = 0usize; for book_id in &book_ids_to_clear { - // Delete WebP thumbnail - let webp_path = Path::new(&config.directory).join(format!("{}.webp", book_id)); - if webp_path.exists() { - if let Err(e) = std::fs::remove_file(&webp_path) { - warn!("[ANALYZER] Failed to delete thumbnail {}: {}", webp_path.display(), e); - } else { - deleted_count += 1; + // Delete thumbnail in any format (webp, jpg, png) + raw + for ext in &["webp", "jpg", "png", "raw"] { + let 
path = Path::new(&config.directory).join(format!("{}.{}", book_id, ext)); + if path.exists() { + if let Err(e) = std::fs::remove_file(&path) { + warn!("[ANALYZER] Failed to delete thumbnail {}: {}", path.display(), e); + } else if *ext != "raw" { + deleted_count += 1; + } } } - // Delete raw file if it exists (interrupted previous run) - let raw_path = Path::new(&config.directory).join(format!("{}.raw", book_id)); - let _ = std::fs::remove_file(&raw_path); } info!("[ANALYZER] Deleted {} thumbnail files for regeneration", deleted_count); @@ -599,14 +735,10 @@ pub async fn cleanup_orphaned_thumbnails(state: &AppState) -> Result<()> { for entry in entries.flatten() { let file_name = entry.file_name(); let file_name = file_name.to_string_lossy(); - // Clean up both .webp and orphaned .raw files - let stem = if let Some(s) = file_name.strip_suffix(".webp") { - Some(s.to_string()) - } else if let Some(s) = file_name.strip_suffix(".raw") { - Some(s.to_string()) - } else { - None - }; + // Clean up all thumbnail formats and orphaned .raw files + let stem = [".webp", ".jpg", ".png", ".raw"] + .iter() + .find_map(|ext| file_name.strip_suffix(ext).map(|s| s.to_string())); if let Some(book_id_str) = stem { if let Ok(book_id) = Uuid::parse_str(&book_id_str) { if !existing_book_ids.contains(&book_id) { diff --git a/apps/indexer/src/job.rs b/apps/indexer/src/job.rs index eea13e7..830c5ca 100644 --- a/apps/indexer/src/job.rs +++ b/apps/indexer/src/job.rs @@ -37,27 +37,57 @@ pub async fn cleanup_stale_jobs(pool: &PgPool) -> Result<()> { Ok(()) } +/// Job types that modify book/thumbnail data and must not run concurrently. +const EXCLUSIVE_JOB_TYPES: &[&str] = &[ + "rebuild", + "full_rebuild", + "scan", + "thumbnail_rebuild", + "thumbnail_regenerate", +]; + +/// Active statuses (job is still in progress, not just queued). 
+const ACTIVE_STATUSES: &[&str] = &[ + "running", + "extracting_pages", + "generating_thumbnails", +]; + pub async fn claim_next_job(pool: &PgPool) -> Result)>> { let mut tx = pool.begin().await?; + // Check if any exclusive job is currently active + let has_active_exclusive: bool = sqlx::query_scalar( + r#" + SELECT EXISTS( + SELECT 1 FROM index_jobs + WHERE status = ANY($1) + AND type = ANY($2) + ) + "#, + ) + .bind(ACTIVE_STATUSES) + .bind(EXCLUSIVE_JOB_TYPES) + .fetch_one(&mut *tx) + .await?; + let row = sqlx::query( r#" SELECT j.id, j.type, j.library_id FROM index_jobs j WHERE j.status = 'pending' AND ( - (j.type IN ('rebuild', 'full_rebuild') AND NOT EXISTS ( - SELECT 1 FROM index_jobs - WHERE status = 'running' - AND type IN ('rebuild', 'full_rebuild') - )) + -- Exclusive jobs: only if no other exclusive job is active + (j.type = ANY($1) AND NOT $2::bool) OR - j.type NOT IN ('rebuild', 'full_rebuild') + -- Non-exclusive jobs (cbr_to_cbz): can always run + j.type != ALL($1) ) ORDER BY CASE j.type WHEN 'full_rebuild' THEN 1 WHEN 'rebuild' THEN 2 + WHEN 'scan' THEN 2 ELSE 3 END, j.created_at ASC @@ -65,6 +95,8 @@ pub async fn claim_next_job(pool: &PgPool) -> Result) LIMIT 1 "#, ) + .bind(EXCLUSIVE_JOB_TYPES) + .bind(has_active_exclusive) .fetch_optional(&mut *tx) .await?; @@ -74,30 +106,8 @@ pub async fn claim_next_job(pool: &PgPool) -> Result) }; let id: Uuid = row.get("id"); - let job_type: String = row.get("type"); let library_id: Option = row.get("library_id"); - if job_type == "rebuild" || job_type == "full_rebuild" { - let has_running_rebuild: bool = sqlx::query_scalar( - r#" - SELECT EXISTS( - SELECT 1 FROM index_jobs - WHERE status = 'running' - AND type IN ('rebuild', 'full_rebuild') - AND id != $1 - ) - "#, - ) - .bind(id) - .fetch_one(&mut *tx) - .await?; - - if has_running_rebuild { - tx.rollback().await?; - return Ok(None); - } - } - sqlx::query( "UPDATE index_jobs SET status = 'running', started_at = NOW(), error_opt = NULL WHERE id = $1", ) diff 
--git a/apps/indexer/src/watcher.rs b/apps/indexer/src/watcher.rs index cccbf82..9904e4a 100644 --- a/apps/indexer/src/watcher.rs +++ b/apps/indexer/src/watcher.rs @@ -1,147 +1,179 @@ use anyhow::Result; -use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher}; use sqlx::Row; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::path::Path; use std::time::Duration; -use tokio::sync::mpsc; -use tracing::{error, info, trace}; +use tracing::{error, info, trace, warn}; use uuid::Uuid; +use walkdir::WalkDir; use crate::utils; use crate::AppState; -pub async fn run_file_watcher(state: AppState) -> Result<()> { - let (tx, mut rx) = mpsc::channel::<(Uuid, String)>(100); +/// Check if any indexing job is currently active. +async fn has_active_jobs(pool: &sqlx::PgPool) -> bool { + sqlx::query_scalar::<_, bool>( + "SELECT EXISTS(SELECT 1 FROM index_jobs WHERE status IN ('pending', 'running', 'extracting_pages', 'generating_thumbnails'))", + ) + .fetch_one(pool) + .await + .unwrap_or(false) +} - // Start watcher refresh loop - let refresh_interval = Duration::from_secs(30); +/// Snapshot: set of book file paths found in a library. +type LibrarySnapshot = HashSet; + +/// Walk a library directory and collect all book file paths. +/// Walks sequentially to limit open file descriptors. 
+fn snapshot_library(root_path: &str) -> LibrarySnapshot { + let mut files = HashSet::new(); + let walker = WalkDir::new(root_path) + .follow_links(true) + .into_iter() + .filter_map(|e| e.ok()); + + for entry in walker { + if entry.file_type().is_file() && parsers::detect_format(entry.path()).is_some() { + files.insert(entry.path().to_string_lossy().to_string()); + } + } + files +} + +pub async fn run_file_watcher(state: AppState) -> Result<()> { + let poll_interval = Duration::from_secs(30); let pool = state.pool.clone(); - tokio::spawn(async move { - let mut watched_libraries: HashMap = HashMap::new(); + // Stored snapshots per library — used to detect additions/removals between polls + let mut snapshots: HashMap = HashMap::new(); + // Track which libraries we're watching (to detect config changes) + let mut watched_libraries: HashMap = HashMap::new(); - loop { - // Get libraries with watcher enabled - match sqlx::query( - "SELECT id, root_path FROM libraries WHERE watcher_enabled = TRUE AND enabled = TRUE" - ) - .fetch_all(&pool) - .await - { - Ok(rows) => { - let current_libraries: HashMap = rows - .into_iter() - .map(|row| { - let id: Uuid = row.get("id"); - let root_path: String = row.get("root_path"); - let local_path = utils::remap_libraries_path(&root_path); - (id, local_path) - }) - .collect(); + loop { + tokio::time::sleep(poll_interval).await; - // Check if we need to recreate watcher - let needs_restart = watched_libraries.len() != current_libraries.len() - || watched_libraries.iter().any(|(id, path)| { - current_libraries.get(id) != Some(path) - }); - - if needs_restart { - info!("[WATCHER] Restarting watcher for {} libraries", current_libraries.len()); - - if !current_libraries.is_empty() { - let tx_clone = tx.clone(); - let libraries_clone = current_libraries.clone(); - - match setup_watcher(libraries_clone, tx_clone) { - Ok(_new_watcher) => { - watched_libraries = current_libraries; - info!("[WATCHER] Watching {} libraries", 
watched_libraries.len()); - } - Err(err) => { - error!("[WATCHER] Failed to setup watcher: {}", err); - } - } - } - } - } - Err(err) => { - error!("[WATCHER] Failed to fetch libraries: {}", err); - } - } - - tokio::time::sleep(refresh_interval).await; + // Skip if any job is active — avoid competing for file descriptors + if has_active_jobs(&pool).await { + trace!("[WATCHER] Skipping poll — job active"); + continue; } - }); - // Process watcher events - while let Some((library_id, file_path)) = rx.recv().await { - info!("[WATCHER] File changed in library {}: {}", library_id, file_path); - - // Check if there's already a pending job for this library - match sqlx::query_scalar::<_, bool>( - "SELECT EXISTS(SELECT 1 FROM index_jobs WHERE library_id = $1 AND status IN ('pending', 'running'))" + // Fetch enabled libraries with watcher + let rows = match sqlx::query( + "SELECT id, root_path FROM libraries WHERE watcher_enabled = TRUE AND enabled = TRUE", ) - .bind(library_id) - .fetch_one(&state.pool) + .fetch_all(&pool) .await { - Ok(exists) => { - if !exists { - // Create a quick scan job + Ok(rows) => rows, + Err(err) => { + error!("[WATCHER] Failed to fetch libraries: {}", err); + continue; + } + }; + + let current_libraries: HashMap = rows + .into_iter() + .map(|row| { + let id: Uuid = row.get("id"); + let root_path: String = row.get("root_path"); + let local_path = utils::remap_libraries_path(&root_path); + (id, local_path) + }) + .collect(); + + // If library config changed, reset snapshots for removed/changed libraries + if current_libraries != watched_libraries { + let removed: Vec = watched_libraries + .keys() + .filter(|id| !current_libraries.contains_key(id)) + .copied() + .collect(); + for id in removed { + snapshots.remove(&id); + } + if !current_libraries.is_empty() { + info!( + "[WATCHER] Watching {} libraries (lightweight poll, {}s interval)", + current_libraries.len(), + poll_interval.as_secs() + ); + } else { + info!("[WATCHER] No libraries to watch"); 
+ } + watched_libraries = current_libraries.clone(); + } + + // Poll each library sequentially to limit concurrent file descriptor usage + for (library_id, root_path) in &current_libraries { + if !Path::new(root_path).is_dir() { + warn!("[WATCHER] Library {} path not accessible: {}", library_id, root_path); + continue; + } + + // Re-check between libraries in case a job was created + if has_active_jobs(&pool).await { + trace!("[WATCHER] Job became active during poll, stopping"); + break; + } + + let root_owned = root_path.clone(); + let new_snapshot = tokio::task::spawn_blocking(move || snapshot_library(&root_owned)) + .await + .unwrap_or_default(); + + let changed = match snapshots.get(library_id) { + Some(old_snapshot) => *old_snapshot != new_snapshot, + None => { + // First scan — store baseline, don't trigger a job + trace!( + "[WATCHER] Initial snapshot for library {}: {} files", + library_id, + new_snapshot.len() + ); + snapshots.insert(*library_id, new_snapshot); + continue; + } + }; + + if changed { + info!( + "[WATCHER] Changes detected in library {} ({})", + library_id, root_path + ); + + // Check if a job already exists for this library + let job_exists = sqlx::query_scalar::<_, bool>( + "SELECT EXISTS(SELECT 1 FROM index_jobs WHERE library_id = $1 AND status IN ('pending', 'running', 'extracting_pages', 'generating_thumbnails'))", + ) + .bind(library_id) + .fetch_one(&pool) + .await + .unwrap_or(true); + + if !job_exists { let job_id = Uuid::new_v4(); match sqlx::query( - "INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, 'rebuild', 'pending')" + "INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, 'rebuild', 'pending')", ) .bind(job_id) .bind(library_id) - .execute(&state.pool) + .execute(&pool) .await { - Ok(_) => info!("[WATCHER] Created job {} for library {}", job_id, library_id), + Ok(_) => info!( + "[WATCHER] Created rebuild job {} for library {}", + job_id, library_id + ), Err(err) => error!("[WATCHER] Failed to 
create job: {}", err), } } else { - trace!("[WATCHER] Job already pending for library {}, skipping", library_id); + trace!("[WATCHER] Job already active for library {}, skipping", library_id); } } - Err(err) => error!("[WATCHER] Failed to check existing jobs: {}", err), + + // Update snapshot + snapshots.insert(*library_id, new_snapshot); } } - - Ok(()) -} - -fn setup_watcher( - libraries: HashMap, - tx: mpsc::Sender<(Uuid, String)>, -) -> Result { - let libraries_for_closure = libraries.clone(); - - let mut watcher = notify::recommended_watcher(move |res: Result| { - match res { - Ok(event) => { - if event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove() { - for path in event.paths { - if let Some((library_id, _)) = libraries_for_closure.iter().find(|(_, root)| { - path.starts_with(root) - }) { - let path_str = path.to_string_lossy().to_string(); - if parsers::detect_format(&path).is_some() { - let _ = tx.try_send((*library_id, path_str)); - } - } - } - } - } - Err(err) => error!("[WATCHER] Event error: {}", err), - } - })?; - - // Actually watch the library directories - for root_path in libraries.values() { - info!("[WATCHER] Watching directory: {}", root_path); - watcher.watch(std::path::Path::new(root_path), RecursiveMode::Recursive)?; - } - - Ok(watcher) } diff --git a/crates/parsers/src/lib.rs b/crates/parsers/src/lib.rs index db3e6d3..cd32f21 100644 --- a/crates/parsers/src/lib.rs +++ b/crates/parsers/src/lib.rs @@ -1,7 +1,8 @@ use anyhow::{Context, Result}; +use std::collections::HashMap; use std::io::{Read, Write}; use std::path::{Path, PathBuf}; -use std::sync::OnceLock; +use std::sync::{Mutex, OnceLock}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum BookFormat { @@ -527,6 +528,40 @@ pub fn extract_page(path: &Path, format: BookFormat, page_number: u32, pdf_rende } } +/// Cache of sorted image names per archive path. Avoids re-listing and sorting on every page request. 
+static CBZ_INDEX_CACHE: OnceLock>>> = OnceLock::new(); + +fn cbz_index_cache() -> &'static Mutex>> { + CBZ_INDEX_CACHE.get_or_init(|| Mutex::new(HashMap::new())) +} + +/// Get sorted image names from cache, or list + sort + cache them. +fn get_cbz_image_index(path: &Path, archive: &mut zip::ZipArchive) -> Vec { + { + let cache = cbz_index_cache().lock().unwrap(); + if let Some(names) = cache.get(path) { + return names.clone(); + } + } + let mut image_names: Vec = Vec::new(); + for i in 0..archive.len() { + let entry = match archive.by_index(i) { + Ok(e) => e, + Err(_) => continue, + }; + let name = entry.name().to_ascii_lowercase(); + if is_image_name(&name) { + image_names.push(entry.name().to_string()); + } + } + image_names.sort_by(|a, b| natord::compare(a, b)); + { + let mut cache = cbz_index_cache().lock().unwrap(); + cache.insert(path.to_path_buf(), image_names.clone()); + } + image_names +} + fn extract_cbz_page(path: &Path, page_number: u32, allow_fallback: bool) -> Result> { let file = std::fs::File::open(path) .with_context(|| format!("cannot open cbz: {}", path.display()))?; @@ -534,18 +569,7 @@ fn extract_cbz_page(path: &Path, page_number: u32, allow_fallback: bool) -> Resu match zip::ZipArchive::new(file) { Ok(mut archive) => { - let mut image_names: Vec = Vec::new(); - for i in 0..archive.len() { - let entry = match archive.by_index(i) { - Ok(e) => e, - Err(_) => continue, - }; - let name = entry.name().to_ascii_lowercase(); - if is_image_name(&name) { - image_names.push(entry.name().to_string()); - } - } - image_names.sort_by(|a, b| natord::compare(a, b)); + let image_names = get_cbz_image_index(path, &mut archive); let selected = image_names .get(index)