From f0a967515b4fa35631eb6f1b40b1aeaca4aefbd2 Mon Sep 17 00:00:00 2001
From: Froidefond Julien
Date: Fri, 6 Mar 2026 22:35:11 +0100
Subject: [PATCH] fix: improve series detection and add detailed indexing logs

- Fix series detection to handle path variations (symlinks, separators)
- Add comprehensive logging for job processing and file scanning
- Better error handling for path prefix stripping
- Track files scanned, indexed, and errors per library
---
 apps/indexer/src/main.rs  | 87 +++++++++++++++++++++++++++++++++++++--
 crates/parsers/src/lib.rs | 45 ++++++++++++++++----
 2 files changed, 122 insertions(+), 10 deletions(-)

diff --git a/apps/indexer/src/main.rs b/apps/indexer/src/main.rs
index b6ffe42..31fdd37 100644
--- a/apps/indexer/src/main.rs
+++ b/apps/indexer/src/main.rs
@@ -92,8 +92,41 @@ async fn ready(State(state): State) -> Result,
     Ok(Json(serde_json::json!({"status": "ready"})))
 }

+async fn cleanup_stale_jobs(pool: &sqlx::PgPool) -> anyhow::Result<()> {
+    // Mark jobs that have been running for more than 5 minutes as failed
+    // This handles cases where the indexer was restarted while jobs were running
+    let result = sqlx::query(
+        r#"
+        UPDATE index_jobs
+        SET status = 'failed',
+            finished_at = NOW(),
+            error_opt = 'Job interrupted by indexer restart'
+        WHERE status = 'running'
+          AND started_at < NOW() - INTERVAL '5 minutes'
+        RETURNING id
+        "#
+    )
+    .fetch_all(pool)
+    .await?;
+
+    if !result.is_empty() {
+        let count = result.len();
+        let ids: Vec<String> = result.iter()
+            .map(|row| row.get::<Uuid, _>("id").to_string())
+            .collect();
+        info!("[CLEANUP] Marked {} stale job(s) as failed: {}", count, ids.join(", "));
+    }
+
+    Ok(())
+}
+
 async fn run_worker(state: AppState, interval_seconds: u64) {
     let wait = Duration::from_secs(interval_seconds.max(1));
+
+    // Cleanup stale jobs from previous runs
+    if let Err(err) = cleanup_stale_jobs(&state.pool).await {
+        error!("[CLEANUP] Failed to cleanup stale jobs: {}", err);
+    }

     // Start file watcher task
     let watcher_state =
state.clone();
@@ -121,8 +154,14 @@ async fn run_worker(state: AppState, interval_seconds: u64) {
             Ok(Some((job_id, library_id))) => {
                 info!("[INDEXER] Starting job {} library={:?}", job_id, library_id);
                 if let Err(err) = process_job(&state, job_id, library_id).await {
-                    error!("[INDEXER] Job {} failed: {}", job_id, err);
-                    let _ = fail_job(&state.pool, job_id, &err.to_string()).await;
+                    let err_str = err.to_string();
+                    if err_str.contains("cancelled") || err_str.contains("Cancelled") {
+                        info!("[INDEXER] Job {} was cancelled by user", job_id);
+                        // Status is already 'cancelled' in DB, don't change it
+                    } else {
+                        error!("[INDEXER] Job {} failed: {}", job_id, err);
+                        let _ = fail_job(&state.pool, job_id, &err_str).await;
+                    }
                 } else {
                     info!("[INDEXER] Job {} completed", job_id);
                 }
@@ -432,6 +471,8 @@ async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Option<Uuid>
+async fn is_job_cancelled(pool: &sqlx::PgPool, job_id: Uuid) -> anyhow::Result<bool> {
+    let status: Option<String> = sqlx::query_scalar(
+        "SELECT status FROM index_jobs WHERE id = $1"
+    )
+    .bind(job_id)
+    .fetch_optional(pool)
+    .await?;
+
+    Ok(status.as_deref() == Some("cancelled"))
+}
+
 async fn scan_library(
     state: &AppState,
     job_id: Uuid,
@@ -719,6 +772,8 @@ async fn scan_library(
     total_files: usize,
     is_full_rebuild: bool,
 ) -> anyhow::Result<()> {
+    info!("[SCAN] Starting scan of library {} at path: {} (full_rebuild={})", library_id, root.display(), is_full_rebuild);
+
     let existing_rows = sqlx::query(
         r#"
         SELECT bf.id AS file_id, bf.book_id, bf.abs_path, bf.fingerprint
@@ -741,6 +796,9 @@ async fn scan_library(
                 (row.get("file_id"), row.get("book_id"), row.get("fingerprint")),
             );
         }
+        info!("[SCAN] Found {} existing files in database for library {}", existing.len(), library_id);
+    } else {
+        info!("[SCAN] Full rebuild: skipping existing files lookup (all will be treated as new)");
     }

     let mut seen: HashMap = HashMap::new();
@@ -762,9 +820,11 @@
         let path = entry.path();

         let Some(format) = detect_format(path) else {
+            trace!("[SCAN] Skipping
non-book file: {}", path.display()); continue; }; + info!("[SCAN] Found book file: {} (format: {:?})", path.display(), format); stats.scanned_files += 1; library_processed_count += 1; *total_processed_count += 1; @@ -800,6 +860,14 @@ async fn scan_library( })?; last_progress_update = std::time::Instant::now(); + + // Check if job has been cancelled + if is_job_cancelled(&state.pool, job_id).await? { + info!("[JOB] Job {} cancelled by user, stopping...", job_id); + // Flush any pending batches before exiting + flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?; + return Err(anyhow::anyhow!("Job cancelled by user")); + } } let seen_key = remap_libraries_path(&abs_path); @@ -816,9 +884,12 @@ async fn scan_library( let lookup_path = remap_libraries_path(&abs_path); if let Some((file_id, book_id, old_fingerprint)) = existing.get(&lookup_path).cloned() { if !is_full_rebuild && old_fingerprint == fingerprint { + trace!("[PROCESS] Skipping unchanged file: {}", file_name); continue; } + info!("[PROCESS] Updating existing file: {} (full_rebuild={}, fingerprint_match={})", file_name, is_full_rebuild, old_fingerprint == fingerprint); + match parse_metadata(path, format, root) { Ok(parsed) => { books_to_update.push(BookUpdate { @@ -878,6 +949,7 @@ async fn scan_library( } // New file + info!("[PROCESS] Inserting new file: {}", file_name); match parse_metadata(path, format, root) { Ok(parsed) => { let book_id = Uuid::new_v4(); @@ -954,7 +1026,11 @@ async fn scan_library( // Final flush of any remaining items flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?; + info!("[SCAN] Library {} scan complete: {} files scanned, {} indexed, {} errors", + library_id, library_processed_count, stats.indexed_files, stats.errors); + // Handle deletions + let mut removed_count = 0usize; for (abs_path, 
(file_id, book_id, _)) in existing {
         if seen.contains_key(&abs_path) {
             continue;
         }
@@ -968,6 +1044,11 @@ async fn scan_library(
         .execute(&state.pool)
         .await?;
         stats.removed_files += 1;
+        removed_count += 1;
+    }
+
+    if removed_count > 0 {
+        info!("[SCAN] Removed {} stale files from database", removed_count);
     }

     Ok(())
@@ -1034,7 +1115,7 @@ async fn sync_meili(pool: &sqlx::PgPool, meili_url: &str, meili_master_key: &str
     // Get last sync timestamp
     let last_sync: Option<chrono::DateTime<chrono::Utc>> = sqlx::query_scalar(
-        "SELECT last_meili_sync FROM sync_metadata WHERE id = 1"
+        "SELECT last_meili_sync FROM sync_metadata WHERE id = 1 AND last_meili_sync IS NOT NULL"
     )
     .fetch_optional(pool)
     .await?;
diff --git a/crates/parsers/src/lib.rs b/crates/parsers/src/lib.rs
index 52e9e90..e983a85 100644
--- a/crates/parsers/src/lib.rs
+++ b/crates/parsers/src/lib.rs
@@ -54,16 +54,47 @@ pub fn parse_metadata(
     // Determine series from parent folder relative to library root
     let series = path.parent().and_then(|parent| {
-        // Get the relative path from library root to parent
-        let relative = parent.strip_prefix(library_root).ok()?;
-        // If relative path is not empty, use first component as series
-        let first_component = relative.components().next()?;
-        let series_name = first_component.as_os_str().to_string_lossy().to_string();
-        // Only if series_name is not empty
+        // Normalize paths for comparison (handle different separators, etc.)
+ let parent_str = parent.to_string_lossy().to_string(); + let root_str = library_root.to_string_lossy().to_string(); + + // Try to find the library root in the parent path + let relative = if let Some(idx) = parent_str.find(&root_str) { + // Found root in parent, extract what comes after + let after_root = &parent_str[idx + root_str.len()..]; + Path::new(after_root) + } else if let Some(relative) = parent.strip_prefix(library_root).ok() { + // Standard approach works + relative + } else { + // Log for diagnostic on server + eprintln!( + "[PARSER] Cannot determine series: parent '{}' doesn't start with root '{}'", + parent.display(), + library_root.display() + ); + return None; + }; + + // Remove leading separators + let relative_str = relative.to_string_lossy().to_string(); + let relative_clean = relative_str.trim_start_matches(|c| c == '/' || c == '\\'); + + if relative_clean.is_empty() { + return None; + } + + // Get first component as series + let first_sep = relative_clean.find(|c| c == '/' || c == '\\'); + let series_name = match first_sep { + Some(idx) => &relative_clean[..idx], + None => relative_clean, + }; + if series_name.is_empty() { None } else { - Some(series_name) + Some(series_name.to_string()) } });