Compare commits

..

2 Commits

Author SHA1 Message Date
358896c7d5 perf(indexer): éliminer le pre-count WalkDir en mode incrémental + concurrence adaptative
- Incremental rebuild: remplace le WalkDir de comptage par un COUNT(*) SQL
  → incrémental 67s → 25s (-62%) sur disque externe
- Full rebuild: conserve le WalkDir (DB vidée avant le comptage)
- Concurrence par défaut: num_cpus/2 clampé [2,8] au lieu de 2 fixe
- Ajoute num_cpus comme dépendance workspace
- Backoffice jobs: un seul formulaire avec formAction par bouton (icônes rétablies)
- infra/perf.sh: corrige l'endpoint /index/jobs/:id (pas /details), exporte BASE_API/TOKEN

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 22:15:41 +01:00
1d10044d46 fix: plusieurs correctifs jobs et analyzer
- cancel_job: ajouter 'extracting_pages' aux statuts annulables
- cleanup_stale_jobs: couvrir 'extracting_pages' et 'generating_thumbnails' au redémarrage
- analyzer: ne pas régénérer le thumbnail si déjà existant (skip sub-phase B)
- analyzer: supprimer les dotfiles macOS (._*) encore en DB
- SSE backoffice: réduire le spam de logs en cas d'API injoignable

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 21:41:52 +01:00
9 changed files with 166 additions and 116 deletions

17
Cargo.lock generated
View File

@@ -863,6 +863,12 @@ version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
 
+[[package]]
+name = "hermit-abi"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
+
 [[package]]
 name = "hex"
 version = "0.4.3"
@@ -1171,6 +1177,7 @@ dependencies = [
  "futures",
  "image",
  "notify",
+ "num_cpus",
  "parsers",
  "rand 0.8.5",
  "rayon",
@@ -1639,6 +1646,16 @@ dependencies = [
  "libm",
 ]
 
+[[package]]
+name = "num_cpus"
+version = "1.17.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b"
+dependencies = [
+ "hermit-abi",
+ "libc",
+]
+
 [[package]]
 name = "once_cell"
 version = "1.21.3"

View File

@@ -33,6 +33,7 @@ tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
 uuid = { version = "1.12", features = ["serde", "v4"] }
 natord = "1.0"
+num_cpus = "1.16"
 pdfium-render = { version = "0.8", default-features = false, features = ["pdfium_latest", "image_latest", "thread_safe"] }
 unrar = "0.5"
 walkdir = "2.5"

View File

@@ -182,7 +182,7 @@ pub async fn cancel_job(
     id: axum::extract::Path<Uuid>,
 ) -> Result<Json<IndexJobResponse>, ApiError> {
     let rows_affected = sqlx::query(
-        "UPDATE index_jobs SET status = 'cancelled' WHERE id = $1 AND status IN ('pending', 'running', 'generating_thumbnails')",
+        "UPDATE index_jobs SET status = 'cancelled' WHERE id = $1 AND status IN ('pending', 'running', 'extracting_pages', 'generating_thumbnails')",
     )
     .bind(id.0)
     .execute(&state.pool)

View File

@@ -15,19 +15,21 @@ export async function GET(
   let lastData: string | null = null;
   let isActive = true;
+  let consecutiveErrors = 0;
 
   const fetchJob = async () => {
     if (!isActive) return;
     try {
       const response = await fetch(`${baseUrl}/index/jobs/${id}`, {
         headers: { Authorization: `Bearer ${token}` },
       });
 
       if (response.ok && isActive) {
+        consecutiveErrors = 0;
         const data = await response.json();
         const dataStr = JSON.stringify(data);
 
         // Only send if data changed
         if (dataStr !== lastData && isActive) {
           lastData = dataStr;
@@ -40,7 +42,7 @@ export async function GET(
           isActive = false;
           return;
         }
 
         // Stop polling if job is complete
         if (data.status === "success" || data.status === "failed" || data.status === "cancelled") {
           isActive = false;
@@ -54,7 +56,11 @@ export async function GET(
       }
     } catch (error) {
       if (isActive) {
-        console.error("SSE fetch error:", error);
+        consecutiveErrors++;
+        // Only log first failure and every 60th to avoid spam
+        if (consecutiveErrors === 1 || consecutiveErrors % 60 === 0) {
+          console.warn(`SSE fetch error (${consecutiveErrors} consecutive):`, error);
+        }
       }
     }
   };

View File

@@ -10,19 +10,21 @@ export async function GET(request: NextRequest) {
   let lastData: string | null = null;
   let isActive = true;
+  let consecutiveErrors = 0;
 
   const fetchJobs = async () => {
     if (!isActive) return;
     try {
       const response = await fetch(`${baseUrl}/index/status`, {
         headers: { Authorization: `Bearer ${token}` },
       });
 
       if (response.ok && isActive) {
+        consecutiveErrors = 0;
         const data = await response.json();
         const dataStr = JSON.stringify(data);
 
         // Send if data changed
         if (dataStr !== lastData && isActive) {
           lastData = dataStr;
@@ -38,7 +40,11 @@ export async function GET(request: NextRequest) {
       }
     } catch (error) {
       if (isActive) {
-        console.error("SSE fetch error:", error);
+        consecutiveErrors++;
+        // Only log first failure and every 30th to avoid spam
+        if (consecutiveErrors === 1 || consecutiveErrors % 30 === 0) {
+          console.warn(`SSE fetch error (${consecutiveErrors} consecutive):`, error);
+        }
       }
     }
   };

View File

@@ -2,7 +2,7 @@ import { revalidatePath } from "next/cache";
 import { redirect } from "next/navigation";
 import { listJobs, fetchLibraries, rebuildIndex, rebuildThumbnails, regenerateThumbnails, IndexJobDto, LibraryDto } from "../../lib/api";
 import { JobsList } from "../components/JobsList";
-import { Card, CardHeader, CardTitle, CardDescription, CardContent, Button, FormField, FormSelect, FormRow } from "../components/ui";
+import { Card, CardHeader, CardTitle, CardContent, Button, FormField, FormSelect, FormRow } from "../components/ui";
 
 export const dynamic = "force-dynamic";
@@ -57,100 +57,54 @@ export default async function JobsPage({ searchParams }: { searchParams: Promise
             Index Jobs
           </h1>
         </div>
 
         <Card className="mb-6">
           <CardHeader>
             <CardTitle>Queue New Job</CardTitle>
-            <CardDescription>Rebuild index, full rebuild, generate missing thumbnails, or regenerate all thumbnails</CardDescription>
           </CardHeader>
-          <CardContent className="space-y-4">
-            <form action={triggerRebuild}>
+          <CardContent>
+            <form>
               <FormRow>
-                <FormField className="flex-1">
+                <FormField className="flex-1 max-w-xs">
                   <FormSelect name="library_id" defaultValue="">
                     <option value="">All libraries</option>
                     {libraries.map((lib) => (
-                      <option key={lib.id} value={lib.id}>
-                        {lib.name}
-                      </option>
+                      <option key={lib.id} value={lib.id}>{lib.name}</option>
                     ))}
                   </FormSelect>
                 </FormField>
-                <Button type="submit">
-                  <svg className="w-4 h-4 mr-2" fill="none" stroke="currentColor" viewBox="0 0 24 24">
-                    <path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15" />
-                  </svg>
-                  Queue Rebuild
-                </Button>
-              </FormRow>
-            </form>
-
-            <form action={triggerFullRebuild}>
-              <FormRow>
-                <FormField className="flex-1">
-                  <FormSelect name="library_id" defaultValue="">
-                    <option value="">All libraries</option>
-                    {libraries.map((lib) => (
-                      <option key={lib.id} value={lib.id}>
-                        {lib.name}
-                      </option>
-                    ))}
-                  </FormSelect>
-                </FormField>
-                <Button type="submit" variant="warning">
-                  <svg className="w-4 h-4 mr-2" fill="none" stroke="currentColor" viewBox="0 0 24 24">
-                    <path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M19 7l-.867 12.142A2 2 0 0116.138 21H7.862a2 2 0 01-1.995-1.858L5 7m5 4v6m4-6v6m1-10V4a1 1 0 00-1-1h-4a1 1 0 00-1 1v3M4 7h16" />
-                  </svg>
-                  Full Rebuild
-                </Button>
-              </FormRow>
-            </form>
-
-            <form action={triggerThumbnailsRebuild}>
-              <FormRow>
-                <FormField className="flex-1">
-                  <FormSelect name="library_id" defaultValue="">
-                    <option value="">All libraries</option>
-                    {libraries.map((lib) => (
-                      <option key={lib.id} value={lib.id}>
-                        {lib.name}
-                      </option>
-                    ))}
-                  </FormSelect>
-                </FormField>
-                <Button type="submit" variant="secondary">
-                  <svg className="w-4 h-4 mr-2" fill="none" stroke="currentColor" viewBox="0 0 24 24">
-                    <path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M4 16l4.586-4.586a2 2 0 012.828 0L16 16m-2-2l1.586-1.586a2 2 0 012.828 0L20 14m-6-6h.01M6 20h12a2 2 0 002-2V6a2 2 0 00-2-2H6a2 2 0 00-2 2v12a2 2 0 002 2z" />
-                  </svg>
-                  Generate thumbnails
-                </Button>
-              </FormRow>
-            </form>
-
-            <form action={triggerThumbnailsRegenerate}>
-              <FormRow>
-                <FormField className="flex-1">
-                  <FormSelect name="library_id" defaultValue="">
-                    <option value="">All libraries</option>
-                    {libraries.map((lib) => (
-                      <option key={lib.id} value={lib.id}>
-                        {lib.name}
-                      </option>
-                    ))}
-                  </FormSelect>
-                </FormField>
-                <Button type="submit" variant="warning">
-                  <svg className="w-4 h-4 mr-2" fill="none" stroke="currentColor" viewBox="0 0 24 24">
-                    <path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15" />
-                  </svg>
-                  Regenerate thumbnails
-                </Button>
+                <div className="flex flex-wrap gap-2">
+                  <Button type="submit" formAction={triggerRebuild}>
+                    <svg className="w-4 h-4 mr-2" fill="none" stroke="currentColor" viewBox="0 0 24 24">
+                      <path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15" />
+                    </svg>
+                    Rebuild
+                  </Button>
+                  <Button type="submit" formAction={triggerFullRebuild} variant="warning">
+                    <svg className="w-4 h-4 mr-2" fill="none" stroke="currentColor" viewBox="0 0 24 24">
+                      <path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M19 7l-.867 12.142A2 2 0 0116.138 21H7.862a2 2 0 01-1.995-1.858L5 7m5 4v6m4-6v6m1-10V4a1 1 0 00-1-1h-4a1 1 0 00-1 1v3M4 7h16" />
+                    </svg>
+                    Full Rebuild
+                  </Button>
+                  <Button type="submit" formAction={triggerThumbnailsRebuild} variant="secondary">
+                    <svg className="w-4 h-4 mr-2" fill="none" stroke="currentColor" viewBox="0 0 24 24">
+                      <path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M4 16l4.586-4.586a2 2 0 012.828 0L16 16m-2-2l1.586-1.586a2 2 0 012.828 0L20 14m-6-6h.01M6 20h12a2 2 0 002-2V6a2 2 0 00-2-2H6a2 2 0 00-2 2v12a2 2 0 002 2z" />
+                    </svg>
+                    Generate thumbnails
+                  </Button>
+                  <Button type="submit" formAction={triggerThumbnailsRegenerate} variant="warning">
+                    <svg className="w-4 h-4 mr-2" fill="none" stroke="currentColor" viewBox="0 0 24 24">
+                      <path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15" />
+                    </svg>
+                    Regenerate thumbnails
+                  </Button>
+                </div>
               </FormRow>
             </form>
           </CardContent>
         </Card>
 
         <JobsList
           initialJobs={jobs}
           libraries={libraryMap}
           highlightJobId={highlight}
View File

@@ -13,6 +13,7 @@ chrono.workspace = true
 futures = "0.3"
 image.workspace = true
 notify = "6.1"
+num_cpus.workspace = true
 parsers = { path = "../../crates/parsers" }
 rand.workspace = true
 rayon.workspace = true

View File

@@ -67,7 +67,10 @@ async fn load_thumbnail_config(pool: &sqlx::PgPool) -> ThumbnailConfig {
 }
 
 async fn load_thumbnail_concurrency(pool: &sqlx::PgPool) -> usize {
-    let default_concurrency = 2;
+    // Default: half the logical CPUs, clamped between 2 and 8.
+    // Archive extraction is I/O bound but benefits from moderate parallelism.
+    let cpus = num_cpus::get();
+    let default_concurrency = (cpus / 2).clamp(2, 8);
 
     let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'limits'"#)
         .fetch_optional(pool)
         .await;
@@ -173,7 +176,7 @@ pub async fn analyze_library_books(
     let sql = format!(
         r#"
-        SELECT b.id AS book_id, bf.abs_path, bf.format
+        SELECT b.id AS book_id, bf.abs_path, bf.format, (b.thumbnail_path IS NULL) AS needs_thumbnail
         FROM books b
         JOIN book_files bf ON bf.book_id = b.id
         WHERE (b.library_id = $1 OR $1 IS NULL)
@@ -219,6 +222,7 @@ pub async fn analyze_library_books(
         book_id: Uuid,
         abs_path: String,
         format: String,
+        needs_thumbnail: bool,
     }
 
     let tasks: Vec<BookTask> = rows
@@ -227,6 +231,7 @@ pub async fn analyze_library_books(
             book_id: row.get("book_id"),
            abs_path: row.get("abs_path"),
             format: row.get("format"),
+            needs_thumbnail: row.get("needs_thumbnail"),
         })
         .collect();
@@ -245,7 +250,7 @@ pub async fn analyze_library_books(
     let extracted_count = Arc::new(AtomicI32::new(0));
 
-    // Collected results: (book_id, raw_path, page_count)
+    // Collected results: (book_id, raw_path, page_count) — only books that need thumbnail generation
     let extracted: Vec<(Uuid, String, i32)> = stream::iter(tasks)
         .map(|task| {
             let pool = state.pool.clone();
@@ -261,6 +266,28 @@ pub async fn analyze_library_books(
             let local_path = utils::remap_libraries_path(&task.abs_path);
             let path = std::path::Path::new(&local_path);
             let book_id = task.book_id;
+            let needs_thumbnail = task.needs_thumbnail;
+
+            // Remove macOS Apple Double resource fork files (._*) that were indexed before the scanner filter was added
+            if path
+                .file_name()
+                .and_then(|n| n.to_str())
+                .map(|n| n.starts_with("._"))
+                .unwrap_or(false)
+            {
+                warn!("[ANALYZER] Removing macOS resource fork from DB: {}", local_path);
+                let _ = sqlx::query("DELETE FROM book_files WHERE book_id = $1")
+                    .bind(book_id)
+                    .execute(&pool)
+                    .await;
+                let _ = sqlx::query(
+                    "DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)",
+                )
+                .bind(book_id)
+                .execute(&pool)
+                .await;
+                return None;
+            }
 
             let format = match book_format_from_str(&task.format) {
                 Some(f) => f,
@@ -295,6 +322,29 @@ pub async fn analyze_library_books(
                 }
             };
 
+            // If thumbnail already exists, just update page_count and skip thumbnail generation
+            if !needs_thumbnail {
+                if let Err(e) = sqlx::query("UPDATE books SET page_count = $1 WHERE id = $2")
+                    .bind(page_count)
+                    .bind(book_id)
+                    .execute(&pool)
+                    .await
+                {
+                    warn!("[ANALYZER] DB page_count update failed for book {}: {}", book_id, e);
+                }
+                let processed = extracted_count.fetch_add(1, Ordering::Relaxed) + 1;
+                let percent = (processed as f64 / total as f64 * 50.0) as i32;
+                let _ = sqlx::query(
+                    "UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1",
+                )
+                .bind(job_id)
+                .bind(processed)
+                .bind(percent)
+                .execute(&pool)
+                .await;
+                return None; // don't enqueue for thumbnail sub-phase
+            }
+
             // Save raw bytes to disk (no resize, no encode)
             let raw_path = match tokio::task::spawn_blocking({
                 let dir = config.directory.clone();

View File

@@ -13,7 +13,7 @@ pub async fn cleanup_stale_jobs(pool: &PgPool) -> Result<()> {
         SET status = 'failed',
             finished_at = NOW(),
             error_opt = 'Job interrupted by indexer restart'
-        WHERE status = 'running'
+        WHERE status IN ('running', 'extracting_pages', 'generating_thumbnails')
           AND started_at < NOW() - INTERVAL '5 minutes'
         RETURNING id
         "#,
@@ -238,27 +238,42 @@ pub async fn process_job(
         .await?
     };
 
-    // Count total files for progress estimation
-    let library_paths: Vec<String> = libraries
-        .iter()
-        .map(|library| {
-            crate::utils::remap_libraries_path(&library.get::<String, _>("root_path"))
-        })
-        .collect();
-
-    let total_files: usize = library_paths
-        .par_iter()
-        .map(|root_path| {
-            walkdir::WalkDir::new(root_path)
-                .into_iter()
-                .filter_map(Result::ok)
-                .filter(|entry| {
-                    entry.file_type().is_file()
-                        && parsers::detect_format(entry.path()).is_some()
-                })
-                .count()
-        })
-        .sum();
+    // Count total files for progress estimation.
+    // For incremental rebuilds, use the DB count (instant) — the filesystem will be walked
+    // once during discovery anyway, no need for a second full WalkDir pass.
+    // For full rebuilds, the DB is already cleared, so we must walk the filesystem.
+    let library_ids: Vec<uuid::Uuid> = libraries.iter().map(|r| r.get("id")).collect();
+
+    let total_files: usize = if !is_full_rebuild {
+        let count: i64 = sqlx::query_scalar(
+            "SELECT COUNT(*) FROM book_files bf JOIN books b ON b.id = bf.book_id WHERE b.library_id = ANY($1)"
+        )
+        .bind(&library_ids)
+        .fetch_one(&state.pool)
+        .await
+        .unwrap_or(0);
+        count as usize
+    } else {
+        let library_paths: Vec<String> = libraries
+            .iter()
+            .map(|library| {
+                crate::utils::remap_libraries_path(&library.get::<String, _>("root_path"))
+            })
+            .collect();
+
+        library_paths
+            .par_iter()
+            .map(|root_path| {
+                walkdir::WalkDir::new(root_path)
+                    .into_iter()
+                    .filter_map(Result::ok)
+                    .filter(|entry| {
+                        entry.file_type().is_file()
+                            && parsers::detect_format(entry.path()).is_some()
+                    })
+                    .count()
+            })
+            .sum()
+    };
 
     info!(
         "[JOB] Found {} libraries, {} total files to index",