feat: enhance library scanning and metadata parsing

- Introduce a structured approach to collect book file information before parsing.
- Implement parallel processing for metadata extraction to improve performance.
- Refactor file handling to utilize a new FileInfo struct for better organization.
- Update database interactions to use collected file information for batch inserts.
- Improve logging for scanning and parsing processes to provide better insights.
This commit is contained in:
2026-03-08 21:07:03 +01:00
parent b1844a4f01
commit 9c7120c3dc

View File

@@ -3,7 +3,7 @@ use axum::{extract::State, routing::get, Json, Router};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use axum::http::StatusCode; use axum::http::StatusCode;
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher}; use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
use parsers::{detect_format, parse_metadata, BookFormat}; use parsers::{detect_format, parse_metadata, BookFormat, ParsedMetadata};
use rayon::prelude::*; use rayon::prelude::*;
use serde::Serialize; use serde::Serialize;
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
@@ -878,28 +878,78 @@ async fn scan_library(
let mut files_to_insert: Vec<FileInsert> = Vec::with_capacity(BATCH_SIZE); let mut files_to_insert: Vec<FileInsert> = Vec::with_capacity(BATCH_SIZE);
let mut errors_to_insert: Vec<ErrorInsert> = Vec::with_capacity(BATCH_SIZE); let mut errors_to_insert: Vec<ErrorInsert> = Vec::with_capacity(BATCH_SIZE);
// Step 1: Collect all book files first
#[derive(Clone)]
struct FileInfo {
path: std::path::PathBuf,
format: BookFormat,
abs_path: String,
file_name: String,
metadata: std::fs::Metadata,
mtime: DateTime<Utc>,
fingerprint: String,
lookup_path: String,
}
let mut file_infos: Vec<FileInfo> = Vec::new();
for entry in WalkDir::new(root).into_iter().filter_map(Result::ok) { for entry in WalkDir::new(root).into_iter().filter_map(Result::ok) {
if !entry.file_type().is_file() { if !entry.file_type().is_file() {
continue; continue;
} }
let path = entry.path(); let path = entry.path().to_path_buf();
let Some(format) = detect_format(path) else { let Some(format) = detect_format(&path) else {
trace!("[SCAN] Skipping non-book file: {}", path.display()); trace!("[SCAN] Skipping non-book file: {}", path.display());
continue; continue;
}; };
info!("[SCAN] Found book file: {} (format: {:?})", path.display(), format); info!("[SCAN] Found book file: {} (format: {:?})", path.display(), format);
stats.scanned_files += 1; stats.scanned_files += 1;
library_processed_count += 1;
*total_processed_count += 1;
let abs_path_local = path.to_string_lossy().to_string(); let abs_path_local = path.to_string_lossy().to_string();
let abs_path = unmap_libraries_path(&abs_path_local); let abs_path = unmap_libraries_path(&abs_path_local);
let file_name = path.file_name() let file_name = path.file_name()
.map(|s| s.to_string_lossy().to_string()) .map(|s| s.to_string_lossy().to_string())
.unwrap_or_else(|| abs_path.clone()); .unwrap_or_else(|| abs_path.clone());
let start_time = std::time::Instant::now(); let metadata = std::fs::metadata(&path)
.with_context(|| format!("cannot stat {}", path.display()))?;
let mtime: DateTime<Utc> = metadata
.modified()
.map(DateTime::<Utc>::from)
.unwrap_or_else(|_| Utc::now());
let fingerprint = compute_fingerprint(&path, metadata.len(), &mtime)?;
let lookup_path = remap_libraries_path(&abs_path);
file_infos.push(FileInfo {
path,
format,
abs_path,
file_name,
metadata,
mtime,
fingerprint,
lookup_path,
});
}
info!("[SCAN] Collected {} files, starting parallel parsing", file_infos.len());
// Step 2: Parse metadata in parallel
let parsed_results: Vec<(FileInfo, anyhow::Result<ParsedMetadata>)> = file_infos
.into_par_iter()
.map(|file_info| {
let parse_result = parse_metadata(&file_info.path, file_info.format, root);
(file_info, parse_result)
})
.collect();
info!("[SCAN] Completed parallel parsing, processing {} results", parsed_results.len());
// Step 3: Process results sequentially for batch inserts
for (file_info, parse_result) in parsed_results {
library_processed_count += 1;
*total_processed_count += 1;
// Update progress in DB every 1 second or every 10 files // Update progress in DB every 1 second or every 10 files
let should_update_progress = last_progress_update.elapsed() > Duration::from_secs(1) || library_processed_count % 10 == 0; let should_update_progress = last_progress_update.elapsed() > Duration::from_secs(1) || library_processed_count % 10 == 0;
@@ -914,7 +964,7 @@ async fn scan_library(
"UPDATE index_jobs SET current_file = $2, processed_files = $3, progress_percent = $4 WHERE id = $1" "UPDATE index_jobs SET current_file = $2, processed_files = $3, progress_percent = $4 WHERE id = $1"
) )
.bind(job_id) .bind(job_id)
.bind(&file_name) .bind(&file_info.file_name)
.bind(*total_processed_count) .bind(*total_processed_count)
.bind(progress_percent) .bind(progress_percent)
.execute(&state.pool) .execute(&state.pool)
@@ -935,32 +985,23 @@ async fn scan_library(
} }
} }
let seen_key = remap_libraries_path(&abs_path); let seen_key = remap_libraries_path(&file_info.abs_path);
seen.insert(seen_key.clone(), true); seen.insert(seen_key.clone(), true);
let metadata = std::fs::metadata(path) if let Some((file_id, book_id, old_fingerprint)) = existing.get(&file_info.lookup_path).cloned() {
.with_context(|| format!("cannot stat {}", path.display()))?; if !is_full_rebuild && old_fingerprint == file_info.fingerprint {
let mtime: DateTime<Utc> = metadata trace!("[PROCESS] Skipping unchanged file: {}", file_info.file_name);
.modified()
.map(DateTime::<Utc>::from)
.unwrap_or_else(|_| Utc::now());
let fingerprint = compute_fingerprint(path, metadata.len(), &mtime)?;
let lookup_path = remap_libraries_path(&abs_path);
if let Some((file_id, book_id, old_fingerprint)) = existing.get(&lookup_path).cloned() {
if !is_full_rebuild && old_fingerprint == fingerprint {
trace!("[PROCESS] Skipping unchanged file: {}", file_name);
continue; continue;
} }
info!("[PROCESS] Updating existing file: {} (full_rebuild={}, fingerprint_match={})", file_name, is_full_rebuild, old_fingerprint == fingerprint); info!("[PROCESS] Updating existing file: {} (full_rebuild={}, fingerprint_match={})", file_info.file_name, is_full_rebuild, old_fingerprint == file_info.fingerprint);
match parse_metadata(path, format, root) { match parse_result {
Ok(parsed) => { Ok(parsed) => {
books_to_update.push(BookUpdate { books_to_update.push(BookUpdate {
book_id, book_id,
title: parsed.title, title: parsed.title,
kind: kind_from_format(format).to_string(), kind: kind_from_format(file_info.format).to_string(),
series: parsed.series, series: parsed.series,
volume: parsed.volume, volume: parsed.volume,
page_count: parsed.page_count, page_count: parsed.page_count,
@@ -968,29 +1009,29 @@ async fn scan_library(
files_to_update.push(FileUpdate { files_to_update.push(FileUpdate {
file_id, file_id,
format: format.as_str().to_string(), format: file_info.format.as_str().to_string(),
size_bytes: metadata.len() as i64, size_bytes: file_info.metadata.len() as i64,
mtime, mtime: file_info.mtime,
fingerprint, fingerprint: file_info.fingerprint,
}); });
stats.indexed_files += 1; stats.indexed_files += 1;
} }
Err(err) => { Err(err) => {
warn!("[PARSER] Failed to parse {}: {}", file_name, err); warn!("[PARSER] Failed to parse {}: {}", file_info.file_name, err);
stats.errors += 1; stats.errors += 1;
files_to_update.push(FileUpdate { files_to_update.push(FileUpdate {
file_id, file_id,
format: format.as_str().to_string(), format: file_info.format.as_str().to_string(),
size_bytes: metadata.len() as i64, size_bytes: file_info.metadata.len() as i64,
mtime, mtime: file_info.mtime,
fingerprint: fingerprint.clone(), fingerprint: file_info.fingerprint.clone(),
}); });
errors_to_insert.push(ErrorInsert { errors_to_insert.push(ErrorInsert {
job_id, job_id,
file_path: abs_path.clone(), file_path: file_info.abs_path.clone(),
error_message: err.to_string(), error_message: err.to_string(),
}); });
@@ -1014,17 +1055,17 @@ async fn scan_library(
} }
// New file (thumbnails generated by API after job handoff) // New file (thumbnails generated by API after job handoff)
info!("[PROCESS] Inserting new file: {}", file_name); info!("[PROCESS] Inserting new file: {}", file_info.file_name);
let book_id = Uuid::new_v4(); let book_id = Uuid::new_v4();
match parse_metadata(path, format, root) { match parse_result {
Ok(parsed) => { Ok(parsed) => {
let file_id = Uuid::new_v4(); let file_id = Uuid::new_v4();
books_to_insert.push(BookInsert { books_to_insert.push(BookInsert {
book_id, book_id,
library_id, library_id,
kind: kind_from_format(format).to_string(), kind: kind_from_format(file_info.format).to_string(),
title: parsed.title, title: parsed.title,
series: parsed.series, series: parsed.series,
volume: parsed.volume, volume: parsed.volume,
@@ -1035,11 +1076,11 @@ async fn scan_library(
files_to_insert.push(FileInsert { files_to_insert.push(FileInsert {
file_id, file_id,
book_id, book_id,
format: format.as_str().to_string(), format: file_info.format.as_str().to_string(),
abs_path: abs_path.clone(), abs_path: file_info.abs_path.clone(),
size_bytes: metadata.len() as i64, size_bytes: file_info.metadata.len() as i64,
mtime, mtime: file_info.mtime,
fingerprint, fingerprint: file_info.fingerprint,
parse_status: "ok".to_string(), parse_status: "ok".to_string(),
parse_error: None, parse_error: None,
}); });
@@ -1047,7 +1088,7 @@ async fn scan_library(
stats.indexed_files += 1; stats.indexed_files += 1;
} }
Err(err) => { Err(err) => {
warn!("[PARSER] Failed to parse {}: {}", file_name, err); warn!("[PARSER] Failed to parse {}: {}", file_info.file_name, err);
stats.errors += 1; stats.errors += 1;
let book_id = Uuid::new_v4(); let book_id = Uuid::new_v4();
let file_id = Uuid::new_v4(); let file_id = Uuid::new_v4();
@@ -1055,8 +1096,8 @@ async fn scan_library(
books_to_insert.push(BookInsert { books_to_insert.push(BookInsert {
book_id, book_id,
library_id, library_id,
kind: kind_from_format(format).to_string(), kind: kind_from_format(file_info.format).to_string(),
title: file_display_name(path), title: file_display_name(&file_info.path),
series: None, series: None,
volume: None, volume: None,
page_count: None, page_count: None,
@@ -1066,18 +1107,18 @@ async fn scan_library(
files_to_insert.push(FileInsert { files_to_insert.push(FileInsert {
file_id, file_id,
book_id, book_id,
format: format.as_str().to_string(), format: file_info.format.as_str().to_string(),
abs_path: abs_path.clone(), abs_path: file_info.abs_path.clone(),
size_bytes: metadata.len() as i64, size_bytes: file_info.metadata.len() as i64,
mtime, mtime: file_info.mtime,
fingerprint, fingerprint: file_info.fingerprint,
parse_status: "error".to_string(), parse_status: "error".to_string(),
parse_error: Some(err.to_string()), parse_error: Some(err.to_string()),
}); });
errors_to_insert.push(ErrorInsert { errors_to_insert.push(ErrorInsert {
job_id, job_id,
file_path: abs_path, file_path: file_info.abs_path,
error_message: err.to_string(), error_message: err.to_string(),
}); });
} }
@@ -1087,8 +1128,6 @@ async fn scan_library(
if books_to_insert.len() >= BATCH_SIZE || files_to_insert.len() >= BATCH_SIZE { if books_to_insert.len() >= BATCH_SIZE || files_to_insert.len() >= BATCH_SIZE {
flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?; flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?;
} }
trace!("[DONE] Processed file {} (total time: {:?})", file_name, start_time.elapsed());
} }
// Final flush of any remaining items // Final flush of any remaining items