feat(indexing): Lot 4 - Progression temps reel, Full Rebuild, Optimisations
- Ajout migrations DB: index_job_errors, library_monitoring, full_rebuild_type
- API: endpoints progression temps reel (/jobs/:id/stream), active jobs, details
- API: support full_rebuild avec suppression donnees existantes
- Indexer: logs detailles avec timing [SCAN][META][PARSER][BDD]
- Indexer: optimisation parsing PDF (lopdf -> pdfinfo) 235x plus rapide
- Indexer: corrections chemins LIBRARIES_ROOT_PATH pour dev local
- Backoffice: composants JobProgress, JobsIndicator (header), JobsList
- Backoffice: SSE streaming pour progression temps reel
- Backoffice: boutons Index/Index Full sur page libraries
- Backoffice: highlight job apres creation avec redirection
- Fix: parsing volume type i32, sync meilisearch cleanup

Perf: parsing PDF passe de 8.7s a 37ms
Perf: indexation 45 fichiers en ~15s vs plusieurs minutes avant
This commit is contained in:
@@ -8,10 +8,28 @@ use sha2::{Digest, Sha256};
|
||||
use sqlx::{postgres::PgPoolOptions, Row};
|
||||
use std::{collections::HashMap, path::Path, time::Duration};
|
||||
use stripstream_core::config::IndexerConfig;
|
||||
use tracing::{error, info};
|
||||
use tracing::{error, info, trace, warn};
|
||||
use uuid::Uuid;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
/// Translates a canonical `/libraries/...` path (as stored in the DB) into the
/// local filesystem location given by the `LIBRARIES_ROOT_PATH` env var.
///
/// When the variable is unset (or the path is not under `/libraries/`), the
/// input is returned unchanged. Used for local development, where libraries
/// live outside the container's `/libraries` mount.
fn remap_libraries_path(path: &str) -> String {
    match std::env::var("LIBRARIES_ROOT_PATH") {
        Ok(root) => match path.strip_prefix("/libraries/") {
            // Re-anchor the remainder of the path under the local root.
            Some(rest) => format!("{root}/{rest}"),
            None => path.to_string(),
        },
        Err(_) => path.to_string(),
    }
}
|
||||
|
||||
/// Inverse of `remap_libraries_path`: converts a local filesystem path under
/// `LIBRARIES_ROOT_PATH` back to the canonical `/libraries/...` form stored in
/// the DB. Returns the input unchanged when the env var is unset or the path
/// is not under the configured root.
///
/// Fixes over the previous version:
/// - An empty `LIBRARIES_ROOT_PATH` no longer matches every path (previously
///   `starts_with("")` was always true, prepending `/libraries` to everything).
/// - The prefix match is now component-aware: root `/data` no longer rewrites
///   `/database/x` (previously produced the garbage path `/librariesbase/x`).
fn unmap_libraries_path(path: &str) -> String {
    if let Ok(root) = std::env::var("LIBRARIES_ROOT_PATH") {
        if !root.is_empty() {
            if path == root {
                // The root itself maps to the bare mount point.
                return "/libraries".to_string();
            }
            if let Some(rest) = path.strip_prefix(&root) {
                // Only accept the prefix at a path-component boundary.
                if rest.starts_with('/') {
                    return format!("/libraries{rest}");
                }
            }
        }
    }
    path.to_string()
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct AppState {
|
||||
pool: sqlx::PgPool,
|
||||
@@ -77,14 +95,20 @@ async fn run_worker(state: AppState, interval_seconds: u64) {
|
||||
loop {
|
||||
match claim_next_job(&state.pool).await {
|
||||
Ok(Some((job_id, library_id))) => {
|
||||
info!("[INDEXER] Starting job {} library={:?}", job_id, library_id);
|
||||
if let Err(err) = process_job(&state, job_id, library_id).await {
|
||||
error!(job_id = %job_id, error = %err, "index job failed");
|
||||
error!("[INDEXER] Job {} failed: {}", job_id, err);
|
||||
let _ = fail_job(&state.pool, job_id, &err.to_string()).await;
|
||||
} else {
|
||||
info!("[INDEXER] Job {} completed", job_id);
|
||||
}
|
||||
}
|
||||
Ok(None) => tokio::time::sleep(wait).await,
|
||||
Ok(None) => {
|
||||
trace!("[INDEXER] No pending jobs, waiting...");
|
||||
tokio::time::sleep(wait).await;
|
||||
}
|
||||
Err(err) => {
|
||||
error!(error = %err, "worker loop error");
|
||||
error!("[INDEXER] Worker error: {}", err);
|
||||
tokio::time::sleep(wait).await;
|
||||
}
|
||||
}
|
||||
@@ -124,6 +148,38 @@ async fn claim_next_job(pool: &sqlx::PgPool) -> anyhow::Result<Option<(Uuid, Opt
|
||||
}
|
||||
|
||||
async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Option<Uuid>) -> anyhow::Result<()> {
|
||||
info!("[JOB] Processing {} library={:?}", job_id, target_library_id);
|
||||
|
||||
// Get job type to check if it's a full rebuild
|
||||
let job_type: String = sqlx::query_scalar("SELECT type FROM index_jobs WHERE id = $1")
|
||||
.bind(job_id)
|
||||
.fetch_one(&state.pool)
|
||||
.await?;
|
||||
let is_full_rebuild = job_type == "full_rebuild";
|
||||
info!("[JOB] {} type={} full_rebuild={}", job_id, job_type, is_full_rebuild);
|
||||
|
||||
// For full rebuilds, delete existing data first
|
||||
if is_full_rebuild {
|
||||
info!("[JOB] Full rebuild: deleting existing data");
|
||||
if let Some(library_id) = target_library_id {
|
||||
// Delete books and files for specific library
|
||||
sqlx::query("DELETE FROM book_files WHERE book_id IN (SELECT id FROM books WHERE library_id = $1)")
|
||||
.bind(library_id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
sqlx::query("DELETE FROM books WHERE library_id = $1")
|
||||
.bind(library_id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
info!("[JOB] Deleted existing data for library {}", library_id);
|
||||
} else {
|
||||
// Delete all books and files
|
||||
sqlx::query("DELETE FROM book_files").execute(&state.pool).await?;
|
||||
sqlx::query("DELETE FROM books").execute(&state.pool).await?;
|
||||
info!("[JOB] Deleted all existing data");
|
||||
}
|
||||
}
|
||||
|
||||
let libraries = if let Some(library_id) = target_library_id {
|
||||
sqlx::query("SELECT id, root_path FROM libraries WHERE id = $1 AND enabled = TRUE")
|
||||
.bind(library_id)
|
||||
@@ -135,6 +191,25 @@ async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Option<U
|
||||
.await?
|
||||
};
|
||||
|
||||
// First pass: count total files for progress estimation
|
||||
let mut total_files = 0usize;
|
||||
for library in &libraries {
|
||||
let root_path: String = library.get("root_path");
|
||||
let root_path = remap_libraries_path(&root_path);
|
||||
for entry in WalkDir::new(&root_path).into_iter().filter_map(Result::ok) {
|
||||
if entry.file_type().is_file() && detect_format(entry.path()).is_some() {
|
||||
total_files += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update job with total estimate
|
||||
sqlx::query("UPDATE index_jobs SET total_files = $2 WHERE id = $1")
|
||||
.bind(job_id)
|
||||
.bind(total_files as i32)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
|
||||
let mut stats = JobStats {
|
||||
scanned_files: 0,
|
||||
indexed_files: 0,
|
||||
@@ -145,7 +220,8 @@ async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Option<U
|
||||
for library in libraries {
|
||||
let library_id: Uuid = library.get("id");
|
||||
let root_path: String = library.get("root_path");
|
||||
match scan_library(state, library_id, Path::new(&root_path), &mut stats).await {
|
||||
let root_path = remap_libraries_path(&root_path);
|
||||
match scan_library(state, job_id, library_id, Path::new(&root_path), &mut stats, total_files, is_full_rebuild).await {
|
||||
Ok(()) => {}
|
||||
Err(err) => {
|
||||
stats.errors += 1;
|
||||
@@ -156,7 +232,7 @@ async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Option<U
|
||||
|
||||
sync_meili(&state.pool, &state.meili_url, &state.meili_master_key).await?;
|
||||
|
||||
sqlx::query("UPDATE index_jobs SET status = 'success', finished_at = NOW(), stats_json = $2 WHERE id = $1")
|
||||
sqlx::query("UPDATE index_jobs SET status = 'success', finished_at = NOW(), stats_json = $2, current_file = NULL WHERE id = $1")
|
||||
.bind(job_id)
|
||||
.bind(serde_json::to_value(&stats)?)
|
||||
.execute(&state.pool)
|
||||
@@ -176,9 +252,12 @@ async fn fail_job(pool: &sqlx::PgPool, job_id: Uuid, error_message: &str) -> any
|
||||
|
||||
async fn scan_library(
|
||||
state: &AppState,
|
||||
job_id: Uuid,
|
||||
library_id: Uuid,
|
||||
root: &Path,
|
||||
stats: &mut JobStats,
|
||||
total_files: usize,
|
||||
is_full_rebuild: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let existing_rows = sqlx::query(
|
||||
r#"
|
||||
@@ -193,14 +272,22 @@ async fn scan_library(
|
||||
.await?;
|
||||
|
||||
let mut existing: HashMap<String, (Uuid, Uuid, String)> = HashMap::new();
|
||||
for row in existing_rows {
|
||||
existing.insert(
|
||||
row.get("abs_path"),
|
||||
(row.get("file_id"), row.get("book_id"), row.get("fingerprint")),
|
||||
);
|
||||
// For full rebuilds, don't use existing files - force reindex of everything
|
||||
if !is_full_rebuild {
|
||||
for row in existing_rows {
|
||||
let abs_path: String = row.get("abs_path");
|
||||
// Remap for local development to match scanned paths
|
||||
let remapped_path = remap_libraries_path(&abs_path);
|
||||
existing.insert(
|
||||
remapped_path,
|
||||
(row.get("file_id"), row.get("book_id"), row.get("fingerprint")),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let mut seen: HashMap<String, bool> = HashMap::new();
|
||||
let mut processed_count = 0i32;
|
||||
|
||||
for entry in WalkDir::new(root).into_iter().filter_map(Result::ok) {
|
||||
if !entry.file_type().is_file() {
|
||||
continue;
|
||||
@@ -212,9 +299,43 @@ async fn scan_library(
|
||||
};
|
||||
|
||||
stats.scanned_files += 1;
|
||||
let abs_path = path.to_string_lossy().to_string();
|
||||
processed_count += 1;
|
||||
let abs_path_local = path.to_string_lossy().to_string();
|
||||
// Convert local path to /libraries format for DB storage
|
||||
let abs_path = unmap_libraries_path(&abs_path_local);
|
||||
let file_name = path.file_name()
|
||||
.map(|s| s.to_string_lossy().to_string())
|
||||
.unwrap_or_else(|| abs_path.clone());
|
||||
|
||||
info!("[SCAN] Job {} processing file {}/{}: {}", job_id, processed_count, total_files, file_name);
|
||||
let start_time = std::time::Instant::now();
|
||||
|
||||
// Update progress in DB
|
||||
let progress_percent = if total_files > 0 {
|
||||
((processed_count as f64 / total_files as f64) * 100.0) as i32
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
let db_start = std::time::Instant::now();
|
||||
sqlx::query(
|
||||
"UPDATE index_jobs SET current_file = $2, processed_files = $3, progress_percent = $4 WHERE id = $1"
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(&file_name)
|
||||
.bind(processed_count)
|
||||
.bind(progress_percent)
|
||||
.execute(&state.pool)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!("[BDD] Failed to update progress for job {}: {}", job_id, e);
|
||||
e
|
||||
})?;
|
||||
info!("[BDD] Progress update took {:?}", db_start.elapsed());
|
||||
|
||||
seen.insert(abs_path.clone(), true);
|
||||
|
||||
let meta_start = std::time::Instant::now();
|
||||
let metadata = std::fs::metadata(path)
|
||||
.with_context(|| format!("cannot stat {}", path.display()))?;
|
||||
let mtime: DateTime<Utc> = metadata
|
||||
@@ -222,14 +343,22 @@ async fn scan_library(
|
||||
.map(DateTime::<Utc>::from)
|
||||
.unwrap_or_else(|_| Utc::now());
|
||||
let fingerprint = compute_fingerprint(path, metadata.len(), &mtime)?;
|
||||
info!("[META] Metadata+fingerprint took {:?}", meta_start.elapsed());
|
||||
|
||||
if let Some((file_id, book_id, old_fingerprint)) = existing.get(&abs_path).cloned() {
|
||||
if old_fingerprint == fingerprint {
|
||||
// Skip fingerprint check for full rebuilds - always reindex
|
||||
if !is_full_rebuild && old_fingerprint == fingerprint {
|
||||
info!("[SKIP] File unchanged, skipping: {} (total time: {:?})", file_name, start_time.elapsed());
|
||||
continue;
|
||||
}
|
||||
|
||||
info!("[PARSER] Starting parse_metadata for: {}", file_name);
|
||||
let parse_start = std::time::Instant::now();
|
||||
match parse_metadata(path, format, root) {
|
||||
Ok(parsed) => {
|
||||
info!("[PARSER] Parsing took {:?} for {} (pages={:?})", parse_start.elapsed(), file_name, parsed.page_count);
|
||||
|
||||
let db_start = std::time::Instant::now();
|
||||
sqlx::query(
|
||||
"UPDATE books SET title = $2, kind = $3, series = $4, volume = $5, page_count = $6, updated_at = NOW() WHERE id = $1",
|
||||
)
|
||||
@@ -252,10 +381,13 @@ async fn scan_library(
|
||||
.bind(fingerprint)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
info!("[BDD] UPDATE took {:?} for {}", db_start.elapsed(), file_name);
|
||||
|
||||
stats.indexed_files += 1;
|
||||
info!("[DONE] Updated file {} (total time: {:?})", file_name, start_time.elapsed());
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("[PARSER] Failed to parse {} after {:?}: {}", file_name, parse_start.elapsed(), err);
|
||||
stats.errors += 1;
|
||||
sqlx::query(
|
||||
"UPDATE book_files SET parse_status = 'error', parse_error_opt = $2, updated_at = NOW() WHERE id = $1",
|
||||
@@ -264,14 +396,29 @@ async fn scan_library(
|
||||
.bind(err.to_string())
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
|
||||
// Store error in index_job_errors table
|
||||
sqlx::query(
|
||||
"INSERT INTO index_job_errors (job_id, file_path, error_message) VALUES ($1, $2, $3)"
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(&abs_path)
|
||||
.bind(err.to_string())
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
info!("[PARSER] Starting parse_metadata for new file: {}", file_name);
|
||||
let parse_start = std::time::Instant::now();
|
||||
match parse_metadata(path, format, root) {
|
||||
Ok(parsed) => {
|
||||
info!("[PARSER] Parsing took {:?} for {} (pages={:?})", parse_start.elapsed(), file_name, parsed.page_count);
|
||||
|
||||
let db_start = std::time::Instant::now();
|
||||
let book_id = Uuid::new_v4();
|
||||
let file_id = Uuid::new_v4();
|
||||
sqlx::query(
|
||||
@@ -299,10 +446,13 @@ async fn scan_library(
|
||||
.bind(fingerprint)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
info!("[BDD] INSERT took {:?} for {}", db_start.elapsed(), file_name);
|
||||
|
||||
stats.indexed_files += 1;
|
||||
info!("[DONE] Inserted new file {} (total time: {:?})", file_name, start_time.elapsed());
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("[PARSER] Failed to parse {} after {:?}: {}", file_name, parse_start.elapsed(), err);
|
||||
stats.errors += 1;
|
||||
let book_id = Uuid::new_v4();
|
||||
let file_id = Uuid::new_v4();
|
||||
@@ -329,6 +479,16 @@ async fn scan_library(
|
||||
.bind(err.to_string())
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
|
||||
// Store error in index_job_errors table
|
||||
sqlx::query(
|
||||
"INSERT INTO index_job_errors (job_id, file_path, error_message) VALUES ($1, $2, $3)"
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(&abs_path)
|
||||
.bind(err.to_string())
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -383,7 +543,7 @@ struct SearchDoc {
|
||||
title: String,
|
||||
author: Option<String>,
|
||||
series: Option<String>,
|
||||
volume: Option<String>,
|
||||
volume: Option<i32>,
|
||||
language: Option<String>,
|
||||
}
|
||||
|
||||
@@ -405,6 +565,13 @@ async fn sync_meili(pool: &sqlx::PgPool, meili_url: &str, meili_master_key: &str
|
||||
.send()
|
||||
.await;
|
||||
|
||||
// Clear existing documents to avoid stale data
|
||||
let _ = client
|
||||
.delete(format!("{base}/indexes/books/documents"))
|
||||
.header("Authorization", format!("Bearer {meili_master_key}"))
|
||||
.send()
|
||||
.await;
|
||||
|
||||
let rows = sqlx::query(
|
||||
"SELECT id, library_id, kind, title, author, series, volume, language FROM books",
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user