fix: improve series detection and add detailed indexing logs
- Fix series detection to handle path variations (symlinks, separators)
- Add comprehensive logging for job processing and file scanning
- Better error handling for path prefix stripping
- Track files scanned, indexed, and errors per library
This commit is contained in:
@@ -92,9 +92,42 @@ async fn ready(State(state): State<AppState>) -> Result<Json<serde_json::Value>,
|
|||||||
Ok(Json(serde_json::json!({"status": "ready"})))
|
Ok(Json(serde_json::json!({"status": "ready"})))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Marks stale `running` index jobs as failed.
///
/// Issues a single `UPDATE ... RETURNING id` against `index_jobs`: every row
/// with `status = 'running'` whose `started_at` is more than 5 minutes in the
/// past gets `status = 'failed'`, `finished_at = NOW()` and a fixed `error_opt`
/// message. Jobs that started within the last 5 minutes are left untouched.
///
/// When at least one row was updated, the number of affected jobs and their
/// ids are logged at info level.
///
/// # Errors
/// Propagates the database error if the query fails.
async fn cleanup_stale_jobs(pool: &sqlx::PgPool) -> anyhow::Result<()> {
|
||||||
|
// Mark jobs that have been running for more than 5 minutes as failed
|
||||||
|
// This handles cases where the indexer was restarted while jobs were running
|
||||||
|
let result = sqlx::query(
|
||||||
|
r#"
|
||||||
|
UPDATE index_jobs
|
||||||
|
SET status = 'failed',
|
||||||
|
finished_at = NOW(),
|
||||||
|
error_opt = 'Job interrupted by indexer restart'
|
||||||
|
WHERE status = 'running'
|
||||||
|
AND started_at < NOW() - INTERVAL '5 minutes'
|
||||||
|
RETURNING id
|
||||||
|
"#
|
||||||
|
)
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Only log when the UPDATE actually touched at least one job.
if !result.is_empty() {
|
||||||
|
let count = result.len();
|
||||||
|
// Render each returned job id as a string so they can be joined into one log line.
let ids: Vec<String> = result.iter()
|
||||||
|
.map(|row| row.get::<Uuid, _>("id").to_string())
|
||||||
|
.collect();
|
||||||
|
info!("[CLEANUP] Marked {} stale job(s) as failed: {}", count, ids.join(", "));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn run_worker(state: AppState, interval_seconds: u64) {
|
async fn run_worker(state: AppState, interval_seconds: u64) {
|
||||||
let wait = Duration::from_secs(interval_seconds.max(1));
|
let wait = Duration::from_secs(interval_seconds.max(1));
|
||||||
|
|
||||||
|
// Cleanup stale jobs from previous runs
|
||||||
|
if let Err(err) = cleanup_stale_jobs(&state.pool).await {
|
||||||
|
error!("[CLEANUP] Failed to cleanup stale jobs: {}", err);
|
||||||
|
}
|
||||||
|
|
||||||
// Start file watcher task
|
// Start file watcher task
|
||||||
let watcher_state = state.clone();
|
let watcher_state = state.clone();
|
||||||
let _watcher_handle = tokio::spawn(async move {
|
let _watcher_handle = tokio::spawn(async move {
|
||||||
@@ -121,8 +154,14 @@ async fn run_worker(state: AppState, interval_seconds: u64) {
|
|||||||
Ok(Some((job_id, library_id))) => {
|
Ok(Some((job_id, library_id))) => {
|
||||||
info!("[INDEXER] Starting job {} library={:?}", job_id, library_id);
|
info!("[INDEXER] Starting job {} library={:?}", job_id, library_id);
|
||||||
if let Err(err) = process_job(&state, job_id, library_id).await {
|
if let Err(err) = process_job(&state, job_id, library_id).await {
|
||||||
error!("[INDEXER] Job {} failed: {}", job_id, err);
|
let err_str = err.to_string();
|
||||||
let _ = fail_job(&state.pool, job_id, &err.to_string()).await;
|
if err_str.contains("cancelled") || err_str.contains("Cancelled") {
|
||||||
|
info!("[INDEXER] Job {} was cancelled by user", job_id);
|
||||||
|
// Status is already 'cancelled' in DB, don't change it
|
||||||
|
} else {
|
||||||
|
error!("[INDEXER] Job {} failed: {}", job_id, err);
|
||||||
|
let _ = fail_job(&state.pool, job_id, &err_str).await;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
info!("[INDEXER] Job {} completed", job_id);
|
info!("[INDEXER] Job {} completed", job_id);
|
||||||
}
|
}
|
||||||
@@ -432,6 +471,8 @@ async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Option<U
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
info!("[JOB] Found {} libraries, {} total files to index", libraries.len(), total_files);
|
||||||
|
|
||||||
// Update job with total estimate
|
// Update job with total estimate
|
||||||
sqlx::query("UPDATE index_jobs SET total_files = $2 WHERE id = $1")
|
sqlx::query("UPDATE index_jobs SET total_files = $2 WHERE id = $1")
|
||||||
.bind(job_id)
|
.bind(job_id)
|
||||||
@@ -709,6 +750,18 @@ async fn flush_all_batches(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if job has been cancelled
|
||||||
|
/// Reports whether the job identified by `job_id` is currently marked as cancelled.
///
/// Reads the `status` column of the matching `index_jobs` row and returns
/// `true` only when it equals `"cancelled"`. If no row matches `job_id`, the
/// lookup yields `None` and the function returns `false`.
///
/// # Errors
/// Propagates the database error if the query fails.
async fn is_job_cancelled(pool: &sqlx::PgPool, job_id: Uuid) -> anyhow::Result<bool> {
|
||||||
|
let status: Option<String> = sqlx::query_scalar(
|
||||||
|
"SELECT status FROM index_jobs WHERE id = $1"
|
||||||
|
)
|
||||||
|
.bind(job_id)
|
||||||
|
.fetch_optional(pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// `None` (no such job) compares unequal to `Some("cancelled")`, so it yields `false`.
Ok(status.as_deref() == Some("cancelled"))
|
||||||
|
}
|
||||||
|
|
||||||
async fn scan_library(
|
async fn scan_library(
|
||||||
state: &AppState,
|
state: &AppState,
|
||||||
job_id: Uuid,
|
job_id: Uuid,
|
||||||
@@ -719,6 +772,8 @@ async fn scan_library(
|
|||||||
total_files: usize,
|
total_files: usize,
|
||||||
is_full_rebuild: bool,
|
is_full_rebuild: bool,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
info!("[SCAN] Starting scan of library {} at path: {} (full_rebuild={})", library_id, root.display(), is_full_rebuild);
|
||||||
|
|
||||||
let existing_rows = sqlx::query(
|
let existing_rows = sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
SELECT bf.id AS file_id, bf.book_id, bf.abs_path, bf.fingerprint
|
SELECT bf.id AS file_id, bf.book_id, bf.abs_path, bf.fingerprint
|
||||||
@@ -741,6 +796,9 @@ async fn scan_library(
|
|||||||
(row.get("file_id"), row.get("book_id"), row.get("fingerprint")),
|
(row.get("file_id"), row.get("book_id"), row.get("fingerprint")),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
info!("[SCAN] Found {} existing files in database for library {}", existing.len(), library_id);
|
||||||
|
} else {
|
||||||
|
info!("[SCAN] Full rebuild: skipping existing files lookup (all will be treated as new)");
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut seen: HashMap<String, bool> = HashMap::new();
|
let mut seen: HashMap<String, bool> = HashMap::new();
|
||||||
@@ -762,9 +820,11 @@ async fn scan_library(
|
|||||||
|
|
||||||
let path = entry.path();
|
let path = entry.path();
|
||||||
let Some(format) = detect_format(path) else {
|
let Some(format) = detect_format(path) else {
|
||||||
|
trace!("[SCAN] Skipping non-book file: {}", path.display());
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
info!("[SCAN] Found book file: {} (format: {:?})", path.display(), format);
|
||||||
stats.scanned_files += 1;
|
stats.scanned_files += 1;
|
||||||
library_processed_count += 1;
|
library_processed_count += 1;
|
||||||
*total_processed_count += 1;
|
*total_processed_count += 1;
|
||||||
@@ -800,6 +860,14 @@ async fn scan_library(
|
|||||||
})?;
|
})?;
|
||||||
|
|
||||||
last_progress_update = std::time::Instant::now();
|
last_progress_update = std::time::Instant::now();
|
||||||
|
|
||||||
|
// Check if job has been cancelled
|
||||||
|
if is_job_cancelled(&state.pool, job_id).await? {
|
||||||
|
info!("[JOB] Job {} cancelled by user, stopping...", job_id);
|
||||||
|
// Flush any pending batches before exiting
|
||||||
|
flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?;
|
||||||
|
return Err(anyhow::anyhow!("Job cancelled by user"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let seen_key = remap_libraries_path(&abs_path);
|
let seen_key = remap_libraries_path(&abs_path);
|
||||||
@@ -816,9 +884,12 @@ async fn scan_library(
|
|||||||
let lookup_path = remap_libraries_path(&abs_path);
|
let lookup_path = remap_libraries_path(&abs_path);
|
||||||
if let Some((file_id, book_id, old_fingerprint)) = existing.get(&lookup_path).cloned() {
|
if let Some((file_id, book_id, old_fingerprint)) = existing.get(&lookup_path).cloned() {
|
||||||
if !is_full_rebuild && old_fingerprint == fingerprint {
|
if !is_full_rebuild && old_fingerprint == fingerprint {
|
||||||
|
trace!("[PROCESS] Skipping unchanged file: {}", file_name);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
info!("[PROCESS] Updating existing file: {} (full_rebuild={}, fingerprint_match={})", file_name, is_full_rebuild, old_fingerprint == fingerprint);
|
||||||
|
|
||||||
match parse_metadata(path, format, root) {
|
match parse_metadata(path, format, root) {
|
||||||
Ok(parsed) => {
|
Ok(parsed) => {
|
||||||
books_to_update.push(BookUpdate {
|
books_to_update.push(BookUpdate {
|
||||||
@@ -878,6 +949,7 @@ async fn scan_library(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// New file
|
// New file
|
||||||
|
info!("[PROCESS] Inserting new file: {}", file_name);
|
||||||
match parse_metadata(path, format, root) {
|
match parse_metadata(path, format, root) {
|
||||||
Ok(parsed) => {
|
Ok(parsed) => {
|
||||||
let book_id = Uuid::new_v4();
|
let book_id = Uuid::new_v4();
|
||||||
@@ -954,7 +1026,11 @@ async fn scan_library(
|
|||||||
// Final flush of any remaining items
|
// Final flush of any remaining items
|
||||||
flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?;
|
flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?;
|
||||||
|
|
||||||
|
info!("[SCAN] Library {} scan complete: {} files scanned, {} indexed, {} errors",
|
||||||
|
library_id, library_processed_count, stats.indexed_files, stats.errors);
|
||||||
|
|
||||||
// Handle deletions
|
// Handle deletions
|
||||||
|
let mut removed_count = 0usize;
|
||||||
for (abs_path, (file_id, book_id, _)) in existing {
|
for (abs_path, (file_id, book_id, _)) in existing {
|
||||||
if seen.contains_key(&abs_path) {
|
if seen.contains_key(&abs_path) {
|
||||||
continue;
|
continue;
|
||||||
@@ -968,6 +1044,11 @@ async fn scan_library(
|
|||||||
.execute(&state.pool)
|
.execute(&state.pool)
|
||||||
.await?;
|
.await?;
|
||||||
stats.removed_files += 1;
|
stats.removed_files += 1;
|
||||||
|
removed_count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if removed_count > 0 {
|
||||||
|
info!("[SCAN] Removed {} stale files from database", removed_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -1034,7 +1115,7 @@ async fn sync_meili(pool: &sqlx::PgPool, meili_url: &str, meili_master_key: &str
|
|||||||
|
|
||||||
// Get last sync timestamp
|
// Get last sync timestamp
|
||||||
let last_sync: Option<DateTime<Utc>> = sqlx::query_scalar(
|
let last_sync: Option<DateTime<Utc>> = sqlx::query_scalar(
|
||||||
"SELECT last_meili_sync FROM sync_metadata WHERE id = 1"
|
"SELECT last_meili_sync FROM sync_metadata WHERE id = 1 AND last_meili_sync IS NOT NULL"
|
||||||
)
|
)
|
||||||
.fetch_optional(pool)
|
.fetch_optional(pool)
|
||||||
.await?;
|
.await?;
|
||||||
|
|||||||
@@ -54,16 +54,47 @@ pub fn parse_metadata(
|
|||||||
|
|
||||||
// Determine series from parent folder relative to library root
|
// Determine series from parent folder relative to library root
|
||||||
let series = path.parent().and_then(|parent| {
|
let series = path.parent().and_then(|parent| {
|
||||||
// Get the relative path from library root to parent
|
// Normalize paths for comparison (handle different separators, etc.)
|
||||||
let relative = parent.strip_prefix(library_root).ok()?;
|
let parent_str = parent.to_string_lossy().to_string();
|
||||||
// If relative path is not empty, use first component as series
|
let root_str = library_root.to_string_lossy().to_string();
|
||||||
let first_component = relative.components().next()?;
|
|
||||||
let series_name = first_component.as_os_str().to_string_lossy().to_string();
|
// Try to find the library root in the parent path
|
||||||
// Only if series_name is not empty
|
let relative = if let Some(idx) = parent_str.find(&root_str) {
|
||||||
|
// Found root in parent, extract what comes after
|
||||||
|
let after_root = &parent_str[idx + root_str.len()..];
|
||||||
|
Path::new(after_root)
|
||||||
|
} else if let Some(relative) = parent.strip_prefix(library_root).ok() {
|
||||||
|
// Standard approach works
|
||||||
|
relative
|
||||||
|
} else {
|
||||||
|
// Log for diagnostic on server
|
||||||
|
eprintln!(
|
||||||
|
"[PARSER] Cannot determine series: parent '{}' doesn't start with root '{}'",
|
||||||
|
parent.display(),
|
||||||
|
library_root.display()
|
||||||
|
);
|
||||||
|
return None;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Remove leading separators
|
||||||
|
let relative_str = relative.to_string_lossy().to_string();
|
||||||
|
let relative_clean = relative_str.trim_start_matches(|c| c == '/' || c == '\\');
|
||||||
|
|
||||||
|
if relative_clean.is_empty() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get first component as series
|
||||||
|
let first_sep = relative_clean.find(|c| c == '/' || c == '\\');
|
||||||
|
let series_name = match first_sep {
|
||||||
|
Some(idx) => &relative_clean[..idx],
|
||||||
|
None => relative_clean,
|
||||||
|
};
|
||||||
|
|
||||||
if series_name.is_empty() {
|
if series_name.is_empty() {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
Some(series_name)
|
Some(series_name.to_string())
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user