diff --git a/apps/api/src/index_jobs.rs b/apps/api/src/index_jobs.rs index eddb1b1..0800543 100644 --- a/apps/api/src/index_jobs.rs +++ b/apps/api/src/index_jobs.rs @@ -182,7 +182,7 @@ pub async fn cancel_job( id: axum::extract::Path, ) -> Result, ApiError> { let rows_affected = sqlx::query( - "UPDATE index_jobs SET status = 'cancelled' WHERE id = $1 AND status IN ('pending', 'running', 'generating_thumbnails')", + "UPDATE index_jobs SET status = 'cancelled' WHERE id = $1 AND status IN ('pending', 'running', 'extracting_pages', 'generating_thumbnails')", ) .bind(id.0) .execute(&state.pool) diff --git a/apps/backoffice/app/api/jobs/[id]/stream/route.ts b/apps/backoffice/app/api/jobs/[id]/stream/route.ts index a9bae3e..a5b4637 100644 --- a/apps/backoffice/app/api/jobs/[id]/stream/route.ts +++ b/apps/backoffice/app/api/jobs/[id]/stream/route.ts @@ -15,19 +15,21 @@ export async function GET( let lastData: string | null = null; let isActive = true; - + let consecutiveErrors = 0; + const fetchJob = async () => { if (!isActive) return; - + try { const response = await fetch(`${baseUrl}/index/jobs/${id}`, { headers: { Authorization: `Bearer ${token}` }, }); - + if (response.ok && isActive) { + consecutiveErrors = 0; const data = await response.json(); const dataStr = JSON.stringify(data); - + // Only send if data changed if (dataStr !== lastData && isActive) { lastData = dataStr; @@ -40,7 +42,7 @@ export async function GET( isActive = false; return; } - + // Stop polling if job is complete if (data.status === "success" || data.status === "failed" || data.status === "cancelled") { isActive = false; @@ -54,7 +56,11 @@ export async function GET( } } catch (error) { if (isActive) { - console.error("SSE fetch error:", error); + consecutiveErrors++; + // Only log first failure and every 60th to avoid spam + if (consecutiveErrors === 1 || consecutiveErrors % 60 === 0) { + console.warn(`SSE fetch error (${consecutiveErrors} consecutive):`, error); + } } } }; diff --git a/apps/backoffice/app/api/jobs/stream/route.ts b/apps/backoffice/app/api/jobs/stream/route.ts index f0a4415..04cb4cb 100644 --- a/apps/backoffice/app/api/jobs/stream/route.ts +++ b/apps/backoffice/app/api/jobs/stream/route.ts @@ -10,19 +10,21 @@ export async function GET(request: NextRequest) { let lastData: string | null = null; let isActive = true; - + let consecutiveErrors = 0; + const fetchJobs = async () => { if (!isActive) return; - + try { const response = await fetch(`${baseUrl}/index/status`, { headers: { Authorization: `Bearer ${token}` }, }); - + if (response.ok && isActive) { + consecutiveErrors = 0; const data = await response.json(); const dataStr = JSON.stringify(data); - + // Send if data changed if (dataStr !== lastData && isActive) { lastData = dataStr; @@ -38,7 +40,11 @@ export async function GET(request: NextRequest) { } } catch (error) { if (isActive) { - console.error("SSE fetch error:", error); + consecutiveErrors++; + // Only log first failure and every 30th to avoid spam + if (consecutiveErrors === 1 || consecutiveErrors % 30 === 0) { + console.warn(`SSE fetch error (${consecutiveErrors} consecutive):`, error); + } } } }; diff --git a/apps/indexer/src/analyzer.rs b/apps/indexer/src/analyzer.rs index 38cc823..2ef1ff4 100644 --- a/apps/indexer/src/analyzer.rs +++ b/apps/indexer/src/analyzer.rs @@ -173,7 +173,7 @@ pub async fn analyze_library_books( let sql = format!( r#" - SELECT b.id AS book_id, bf.abs_path, bf.format + SELECT b.id AS book_id, bf.abs_path, bf.format, (b.thumbnail_path IS NULL) AS needs_thumbnail FROM books b JOIN book_files bf ON bf.book_id = b.id WHERE (b.library_id = $1 OR $1 IS NULL) @@ -219,6 +219,7 @@ pub async fn analyze_library_books( book_id: Uuid, abs_path: String, format: String, + needs_thumbnail: bool, } let tasks: Vec = rows @@ -227,6 +228,7 @@ pub async fn analyze_library_books( book_id: row.get("book_id"), abs_path: row.get("abs_path"), format: row.get("format"), + needs_thumbnail: row.get("needs_thumbnail"), }) .collect(); @@ -245,7 +247,7 @@ pub async fn analyze_library_books( let extracted_count = Arc::new(AtomicI32::new(0)); - // Collected results: (book_id, raw_path, page_count) + // Collected results: (book_id, raw_path, page_count) — only books that need thumbnail generation let extracted: Vec<(Uuid, String, i32)> = stream::iter(tasks) .map(|task| { let pool = state.pool.clone(); @@ -261,6 +263,28 @@ pub async fn analyze_library_books( let local_path = utils::remap_libraries_path(&task.abs_path); let path = std::path::Path::new(&local_path); let book_id = task.book_id; + let needs_thumbnail = task.needs_thumbnail; + + // Remove macOS Apple Double resource fork files (._*) that were indexed before the scanner filter was added + if path + .file_name() + .and_then(|n| n.to_str()) + .map(|n| n.starts_with("._")) + .unwrap_or(false) + { + warn!("[ANALYZER] Removing macOS resource fork from DB: {}", local_path); + let _ = sqlx::query("DELETE FROM book_files WHERE book_id = $1") + .bind(book_id) + .execute(&pool) + .await; + let _ = sqlx::query( + "DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)", + ) + .bind(book_id) + .execute(&pool) + .await; + return None; + } let format = match book_format_from_str(&task.format) { Some(f) => f, @@ -295,6 +319,29 @@ pub async fn analyze_library_books( } }; + // If thumbnail already exists, just update page_count and skip thumbnail generation + if !needs_thumbnail { + if let Err(e) = sqlx::query("UPDATE books SET page_count = $1 WHERE id = $2") + .bind(page_count) + .bind(book_id) + .execute(&pool) + .await + { + warn!("[ANALYZER] DB page_count update failed for book {}: {}", book_id, e); + } + let processed = extracted_count.fetch_add(1, Ordering::Relaxed) + 1; + let percent = (processed as f64 / total as f64 * 50.0) as i32; + let _ = sqlx::query( + "UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1", + ) + .bind(job_id) + .bind(processed) + .bind(percent) + .execute(&pool) + .await; + return None; // don't enqueue for thumbnail sub-phase + } + // Save raw bytes to disk (no resize, no encode) let raw_path = match tokio::task::spawn_blocking({ let dir = config.directory.clone(); diff --git a/apps/indexer/src/job.rs b/apps/indexer/src/job.rs index 1b38126..c1758b0 100644 --- a/apps/indexer/src/job.rs +++ b/apps/indexer/src/job.rs @@ -13,7 +13,7 @@ pub async fn cleanup_stale_jobs(pool: &PgPool) -> Result<()> { SET status = 'failed', finished_at = NOW(), error_opt = 'Job interrupted by indexer restart' - WHERE status = 'running' + WHERE status IN ('running', 'extracting_pages', 'generating_thumbnails') AND started_at < NOW() - INTERVAL '5 minutes' RETURNING id "#,