feat: two-phase indexing with direct thumbnail generation in indexer
Phase 1 (discovery): walkdir + filename-only metadata, zero archive I/O. Books are visible immediately in the UI while Phase 2 runs in background. Phase 2 (analysis): open each archive once via analyze_book() to extract page_count and first page bytes, then generate WebP thumbnail directly in the indexer — removing the HTTP roundtrip to the API checkup endpoint. - Add parse_metadata_fast() (infallible, no archive I/O) - Add analyze_book() returning (page_count, first_page_bytes) in one pass - Add looks_like_image() magic bytes check for unrar p stdout validation - Add lsar fallback in list_cbr_images() for UTF-16BE encoded filenames - Add directory_mtimes table to skip unchanged dirs on incremental scans - Add analyzer.rs: generate_thumbnail, analyze_library_books, regenerate_thumbnails - Remove run_checkup() from API; indexer handles thumbnail jobs directly - Remove api_base_url/api_bootstrap_token from IndexerConfig and AppState - Add unar + poppler-utils to indexer Dockerfile - Fix smoke.sh: wait for job completion, check thumbnail_url field Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,63 +1,60 @@
|
||||
use anyhow::Result;
|
||||
use rayon::prelude::*;
|
||||
use sqlx::{PgPool, Row};
|
||||
use std::time::Duration;
|
||||
use tracing::{error, info};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{meili, scanner, AppState};
|
||||
use crate::{analyzer, meili, scanner, AppState};
|
||||
|
||||
pub async fn cleanup_stale_jobs(pool: &PgPool) -> Result<()> {
|
||||
// Mark jobs that have been running for more than 5 minutes as failed
|
||||
// This handles cases where the indexer was restarted while jobs were running
|
||||
let result = sqlx::query(
|
||||
r#"
|
||||
UPDATE index_jobs
|
||||
SET status = 'failed',
|
||||
finished_at = NOW(),
|
||||
UPDATE index_jobs
|
||||
SET status = 'failed',
|
||||
finished_at = NOW(),
|
||||
error_opt = 'Job interrupted by indexer restart'
|
||||
WHERE status = 'running'
|
||||
WHERE status = 'running'
|
||||
AND started_at < NOW() - INTERVAL '5 minutes'
|
||||
RETURNING id
|
||||
"#
|
||||
"#,
|
||||
)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
|
||||
|
||||
if !result.is_empty() {
|
||||
let count = result.len();
|
||||
let ids: Vec<String> = result.iter()
|
||||
let ids: Vec<String> = result
|
||||
.iter()
|
||||
.map(|row| row.get::<Uuid, _>("id").to_string())
|
||||
.collect();
|
||||
info!("[CLEANUP] Marked {} stale job(s) as failed: {}", count, ids.join(", "));
|
||||
info!(
|
||||
"[CLEANUP] Marked {} stale job(s) as failed: {}",
|
||||
count,
|
||||
ids.join(", ")
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn claim_next_job(pool: &PgPool) -> Result<Option<(Uuid, Option<Uuid>)>> {
|
||||
let mut tx = pool.begin().await?;
|
||||
|
||||
// Atomically select and lock the next job
|
||||
// Exclude rebuild/full_rebuild if one is already running
|
||||
// Prioritize: full_rebuild > rebuild > others
|
||||
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
SELECT j.id, j.type, j.library_id
|
||||
FROM index_jobs j
|
||||
WHERE j.status = 'pending'
|
||||
AND (
|
||||
-- Allow rebuilds only if no rebuild is running
|
||||
(j.type IN ('rebuild', 'full_rebuild') AND NOT EXISTS (
|
||||
SELECT 1 FROM index_jobs
|
||||
WHERE status = 'running'
|
||||
AND type IN ('rebuild', 'full_rebuild')
|
||||
))
|
||||
OR
|
||||
-- Always allow non-rebuild jobs
|
||||
j.type NOT IN ('rebuild', 'full_rebuild')
|
||||
)
|
||||
ORDER BY
|
||||
ORDER BY
|
||||
CASE j.type
|
||||
WHEN 'full_rebuild' THEN 1
|
||||
WHEN 'rebuild' THEN 2
|
||||
@@ -66,7 +63,7 @@ pub async fn claim_next_job(pool: &PgPool) -> Result<Option<(Uuid, Option<Uuid>)
|
||||
j.created_at ASC
|
||||
FOR UPDATE SKIP LOCKED
|
||||
LIMIT 1
|
||||
"#
|
||||
"#,
|
||||
)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
@@ -79,8 +76,7 @@ pub async fn claim_next_job(pool: &PgPool) -> Result<Option<(Uuid, Option<Uuid>)
|
||||
let id: Uuid = row.get("id");
|
||||
let job_type: String = row.get("type");
|
||||
let library_id: Option<Uuid> = row.get("library_id");
|
||||
|
||||
// Final check: if this is a rebuild, ensure no rebuild started between SELECT and UPDATE
|
||||
|
||||
if job_type == "rebuild" || job_type == "full_rebuild" {
|
||||
let has_running_rebuild: bool = sqlx::query_scalar(
|
||||
r#"
|
||||
@@ -90,48 +86,55 @@ pub async fn claim_next_job(pool: &PgPool) -> Result<Option<(Uuid, Option<Uuid>)
|
||||
AND type IN ('rebuild', 'full_rebuild')
|
||||
AND id != $1
|
||||
)
|
||||
"#
|
||||
"#,
|
||||
)
|
||||
.bind(id)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
|
||||
|
||||
if has_running_rebuild {
|
||||
tx.rollback().await?;
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
sqlx::query("UPDATE index_jobs SET status = 'running', started_at = NOW(), error_opt = NULL WHERE id = $1")
|
||||
.bind(id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
sqlx::query(
|
||||
"UPDATE index_jobs SET status = 'running', started_at = NOW(), error_opt = NULL WHERE id = $1",
|
||||
)
|
||||
.bind(id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(Some((id, library_id)))
|
||||
}
|
||||
|
||||
pub async fn fail_job(pool: &PgPool, job_id: Uuid, error_message: &str) -> Result<()> {
|
||||
sqlx::query("UPDATE index_jobs SET status = 'failed', finished_at = NOW(), error_opt = $2 WHERE id = $1")
|
||||
.bind(job_id)
|
||||
.bind(error_message)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
sqlx::query(
|
||||
"UPDATE index_jobs SET status = 'failed', finished_at = NOW(), error_opt = $2 WHERE id = $1",
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(error_message)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn is_job_cancelled(pool: &PgPool, job_id: Uuid) -> Result<bool> {
|
||||
let status: Option<String> = sqlx::query_scalar(
|
||||
"SELECT status FROM index_jobs WHERE id = $1"
|
||||
)
|
||||
.bind(job_id)
|
||||
.fetch_optional(pool)
|
||||
.await?;
|
||||
|
||||
let status: Option<String> =
|
||||
sqlx::query_scalar("SELECT status FROM index_jobs WHERE id = $1")
|
||||
.bind(job_id)
|
||||
.fetch_optional(pool)
|
||||
.await?;
|
||||
|
||||
Ok(status.as_deref() == Some("cancelled"))
|
||||
}
|
||||
|
||||
pub async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Option<Uuid>) -> Result<()> {
|
||||
pub async fn process_job(
|
||||
state: &AppState,
|
||||
job_id: Uuid,
|
||||
target_library_id: Option<Uuid>,
|
||||
) -> Result<()> {
|
||||
info!("[JOB] Processing {} library={:?}", job_id, target_library_id);
|
||||
|
||||
let job_type: String = sqlx::query_scalar("SELECT type FROM index_jobs WHERE id = $1")
|
||||
@@ -139,8 +142,8 @@ pub async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Opti
|
||||
.fetch_one(&state.pool)
|
||||
.await?;
|
||||
|
||||
// Thumbnail jobs: hand off to API and wait for completion (same queue as rebuilds)
|
||||
if job_type == "thumbnail_rebuild" || job_type == "thumbnail_regenerate" {
|
||||
// Thumbnail rebuild: generate thumbnails for books missing them
|
||||
if job_type == "thumbnail_rebuild" {
|
||||
sqlx::query(
|
||||
"UPDATE index_jobs SET status = 'generating_thumbnails', started_at = NOW() WHERE id = $1",
|
||||
)
|
||||
@@ -148,54 +151,65 @@ pub async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Opti
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
|
||||
let api_base = state.api_base_url.trim_end_matches('/');
|
||||
let url = format!("{}/index/jobs/{}/thumbnails/checkup", api_base, job_id);
|
||||
let client = reqwest::Client::new();
|
||||
let res = client
|
||||
.post(&url)
|
||||
.header("Authorization", format!("Bearer {}", state.api_bootstrap_token))
|
||||
.send()
|
||||
.await?;
|
||||
if !res.status().is_success() {
|
||||
anyhow::bail!("thumbnail checkup API returned {}", res.status());
|
||||
}
|
||||
analyzer::analyze_library_books(state, job_id, target_library_id, true).await?;
|
||||
|
||||
// Poll until job is finished (API updates the same row)
|
||||
let poll_interval = Duration::from_secs(1);
|
||||
loop {
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
let status: String = sqlx::query_scalar("SELECT status FROM index_jobs WHERE id = $1")
|
||||
.bind(job_id)
|
||||
.fetch_one(&state.pool)
|
||||
.await?;
|
||||
if status == "success" || status == "failed" {
|
||||
info!("[JOB] Thumbnail job {} finished with status {}", job_id, status);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
sqlx::query(
|
||||
"UPDATE index_jobs SET status = 'success', finished_at = NOW(), progress_percent = 100, current_file = NULL WHERE id = $1",
|
||||
)
|
||||
.bind(job_id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Thumbnail regenerate: clear all thumbnails then re-generate
|
||||
if job_type == "thumbnail_regenerate" {
|
||||
sqlx::query(
|
||||
"UPDATE index_jobs SET status = 'generating_thumbnails', started_at = NOW() WHERE id = $1",
|
||||
)
|
||||
.bind(job_id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
|
||||
analyzer::regenerate_thumbnails(state, job_id, target_library_id).await?;
|
||||
|
||||
sqlx::query(
|
||||
"UPDATE index_jobs SET status = 'success', finished_at = NOW(), progress_percent = 100, current_file = NULL WHERE id = $1",
|
||||
)
|
||||
.bind(job_id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let is_full_rebuild = job_type == "full_rebuild";
|
||||
info!("[JOB] {} type={} full_rebuild={}", job_id, job_type, is_full_rebuild);
|
||||
info!(
|
||||
"[JOB] {} type={} full_rebuild={}",
|
||||
job_id, job_type, is_full_rebuild
|
||||
);
|
||||
|
||||
// For full rebuilds, delete existing data first
|
||||
// Full rebuild: delete existing data first
|
||||
if is_full_rebuild {
|
||||
info!("[JOB] Full rebuild: deleting existing data");
|
||||
|
||||
if let Some(library_id) = target_library_id {
|
||||
// Delete books and files for specific library
|
||||
sqlx::query("DELETE FROM book_files WHERE book_id IN (SELECT id FROM books WHERE library_id = $1)")
|
||||
.bind(library_id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
sqlx::query(
|
||||
"DELETE FROM book_files WHERE book_id IN (SELECT id FROM books WHERE library_id = $1)",
|
||||
)
|
||||
.bind(library_id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
sqlx::query("DELETE FROM books WHERE library_id = $1")
|
||||
.bind(library_id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
info!("[JOB] Deleted existing data for library {}", library_id);
|
||||
} else {
|
||||
// Delete all books and files
|
||||
sqlx::query("DELETE FROM book_files").execute(&state.pool).await?;
|
||||
sqlx::query("DELETE FROM book_files")
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
sqlx::query("DELETE FROM books").execute(&state.pool).await?;
|
||||
info!("[JOB] Deleted all existing data");
|
||||
}
|
||||
@@ -212,24 +226,34 @@ pub async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Opti
|
||||
.await?
|
||||
};
|
||||
|
||||
// First pass: count total files for progress estimation (parallel)
|
||||
let library_paths: Vec<String> = libraries.iter()
|
||||
.map(|library| crate::utils::remap_libraries_path(&library.get::<String, _>("root_path")))
|
||||
// Count total files for progress estimation
|
||||
let library_paths: Vec<String> = libraries
|
||||
.iter()
|
||||
.map(|library| {
|
||||
crate::utils::remap_libraries_path(&library.get::<String, _>("root_path"))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let total_files: usize = library_paths.par_iter()
|
||||
|
||||
let total_files: usize = library_paths
|
||||
.par_iter()
|
||||
.map(|root_path| {
|
||||
walkdir::WalkDir::new(root_path)
|
||||
.into_iter()
|
||||
.filter_map(Result::ok)
|
||||
.filter(|entry| entry.file_type().is_file() && parsers::detect_format(entry.path()).is_some())
|
||||
.filter(|entry| {
|
||||
entry.file_type().is_file()
|
||||
&& parsers::detect_format(entry.path()).is_some()
|
||||
})
|
||||
.count()
|
||||
})
|
||||
.sum();
|
||||
|
||||
info!("[JOB] Found {} libraries, {} total files to index", libraries.len(), total_files);
|
||||
|
||||
// Update job with total estimate
|
||||
info!(
|
||||
"[JOB] Found {} libraries, {} total files to index",
|
||||
libraries.len(),
|
||||
total_files
|
||||
);
|
||||
|
||||
sqlx::query("UPDATE index_jobs SET total_files = $2 WHERE id = $1")
|
||||
.bind(job_id)
|
||||
.bind(total_files as i32)
|
||||
@@ -242,26 +266,47 @@ pub async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Opti
|
||||
removed_files: 0,
|
||||
errors: 0,
|
||||
};
|
||||
|
||||
// Track processed files across all libraries for accurate progress
|
||||
|
||||
let mut total_processed_count = 0i32;
|
||||
|
||||
for library in libraries {
|
||||
// Phase 1: Discovery
|
||||
for library in &libraries {
|
||||
let library_id: Uuid = library.get("id");
|
||||
let root_path: String = library.get("root_path");
|
||||
let root_path = crate::utils::remap_libraries_path(&root_path);
|
||||
match scanner::scan_library(state, job_id, library_id, std::path::Path::new(&root_path), &mut stats, &mut total_processed_count, total_files, is_full_rebuild).await {
|
||||
match scanner::scan_library_discovery(
|
||||
state,
|
||||
job_id,
|
||||
library_id,
|
||||
std::path::Path::new(&root_path),
|
||||
&mut stats,
|
||||
&mut total_processed_count,
|
||||
total_files,
|
||||
is_full_rebuild,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {}
|
||||
Err(err) => {
|
||||
let err_str = err.to_string();
|
||||
if err_str.contains("cancelled") || err_str.contains("Cancelled") {
|
||||
return Err(err);
|
||||
}
|
||||
stats.errors += 1;
|
||||
error!(library_id = %library_id, error = %err, "library scan failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sync search index after discovery (books are visible immediately)
|
||||
meili::sync_meili(&state.pool, &state.meili_url, &state.meili_master_key).await?;
|
||||
|
||||
// Hand off to API for thumbnail checkup (API will set status = 'success' when done)
|
||||
// For full rebuild: clean up orphaned thumbnail files (old UUIDs)
|
||||
if is_full_rebuild {
|
||||
analyzer::cleanup_orphaned_thumbnails(state, target_library_id).await?;
|
||||
}
|
||||
|
||||
// Phase 2: Analysis (extract page_count + thumbnails for new/updated books)
|
||||
sqlx::query(
|
||||
"UPDATE index_jobs SET status = 'generating_thumbnails', stats_json = $2, current_file = NULL, processed_files = $3 WHERE id = $1",
|
||||
)
|
||||
@@ -271,23 +316,14 @@ pub async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Opti
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
|
||||
let api_base = state.api_base_url.trim_end_matches('/');
|
||||
let url = format!("{}/index/jobs/{}/thumbnails/checkup", api_base, job_id);
|
||||
let client = reqwest::Client::new();
|
||||
let res = client
|
||||
.post(&url)
|
||||
.header("Authorization", format!("Bearer {}", state.api_bootstrap_token))
|
||||
.send()
|
||||
.await;
|
||||
if let Err(e) = res {
|
||||
tracing::warn!("[JOB] Failed to trigger thumbnail checkup: {} — API will not generate thumbnails for this job", e);
|
||||
} else if let Ok(r) = res {
|
||||
if !r.status().is_success() {
|
||||
tracing::warn!("[JOB] Thumbnail checkup returned {} — API may not generate thumbnails", r.status());
|
||||
} else {
|
||||
info!("[JOB] Thumbnail checkup started (job {}), API will complete the job", job_id);
|
||||
}
|
||||
}
|
||||
analyzer::analyze_library_books(state, job_id, target_library_id, false).await?;
|
||||
|
||||
sqlx::query(
|
||||
"UPDATE index_jobs SET status = 'success', finished_at = NOW(), progress_percent = 100, current_file = NULL WHERE id = $1",
|
||||
)
|
||||
.bind(job_id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user