fix: plusieurs correctifs jobs et analyzer
- cancel_job: ajouter 'extracting_pages' aux statuts annulables - cleanup_stale_jobs: couvrir 'extracting_pages' et 'generating_thumbnails' au redémarrage - analyzer: ne pas régénérer le thumbnail si déjà existant (skip sub-phase B) - analyzer: supprimer les dotfiles macOS (._*) encore en DB - SSE backoffice: réduire le spam de logs en cas d'API injoignable Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -182,7 +182,7 @@ pub async fn cancel_job(
|
|||||||
id: axum::extract::Path<Uuid>,
|
id: axum::extract::Path<Uuid>,
|
||||||
) -> Result<Json<IndexJobResponse>, ApiError> {
|
) -> Result<Json<IndexJobResponse>, ApiError> {
|
||||||
let rows_affected = sqlx::query(
|
let rows_affected = sqlx::query(
|
||||||
"UPDATE index_jobs SET status = 'cancelled' WHERE id = $1 AND status IN ('pending', 'running', 'generating_thumbnails')",
|
"UPDATE index_jobs SET status = 'cancelled' WHERE id = $1 AND status IN ('pending', 'running', 'extracting_pages', 'generating_thumbnails')",
|
||||||
)
|
)
|
||||||
.bind(id.0)
|
.bind(id.0)
|
||||||
.execute(&state.pool)
|
.execute(&state.pool)
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ export async function GET(
|
|||||||
|
|
||||||
let lastData: string | null = null;
|
let lastData: string | null = null;
|
||||||
let isActive = true;
|
let isActive = true;
|
||||||
|
let consecutiveErrors = 0;
|
||||||
|
|
||||||
const fetchJob = async () => {
|
const fetchJob = async () => {
|
||||||
if (!isActive) return;
|
if (!isActive) return;
|
||||||
@@ -25,6 +26,7 @@ export async function GET(
|
|||||||
});
|
});
|
||||||
|
|
||||||
if (response.ok && isActive) {
|
if (response.ok && isActive) {
|
||||||
|
consecutiveErrors = 0;
|
||||||
const data = await response.json();
|
const data = await response.json();
|
||||||
const dataStr = JSON.stringify(data);
|
const dataStr = JSON.stringify(data);
|
||||||
|
|
||||||
@@ -54,7 +56,11 @@ export async function GET(
|
|||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (isActive) {
|
if (isActive) {
|
||||||
console.error("SSE fetch error:", error);
|
consecutiveErrors++;
|
||||||
|
// Only log first failure and every 60th to avoid spam
|
||||||
|
if (consecutiveErrors === 1 || consecutiveErrors % 60 === 0) {
|
||||||
|
console.warn(`SSE fetch error (${consecutiveErrors} consecutive):`, error);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ export async function GET(request: NextRequest) {
|
|||||||
|
|
||||||
let lastData: string | null = null;
|
let lastData: string | null = null;
|
||||||
let isActive = true;
|
let isActive = true;
|
||||||
|
let consecutiveErrors = 0;
|
||||||
|
|
||||||
const fetchJobs = async () => {
|
const fetchJobs = async () => {
|
||||||
if (!isActive) return;
|
if (!isActive) return;
|
||||||
@@ -20,6 +21,7 @@ export async function GET(request: NextRequest) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
if (response.ok && isActive) {
|
if (response.ok && isActive) {
|
||||||
|
consecutiveErrors = 0;
|
||||||
const data = await response.json();
|
const data = await response.json();
|
||||||
const dataStr = JSON.stringify(data);
|
const dataStr = JSON.stringify(data);
|
||||||
|
|
||||||
@@ -38,7 +40,11 @@ export async function GET(request: NextRequest) {
|
|||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (isActive) {
|
if (isActive) {
|
||||||
console.error("SSE fetch error:", error);
|
consecutiveErrors++;
|
||||||
|
// Only log first failure and every 30th to avoid spam
|
||||||
|
if (consecutiveErrors === 1 || consecutiveErrors % 30 === 0) {
|
||||||
|
console.warn(`SSE fetch error (${consecutiveErrors} consecutive):`, error);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -173,7 +173,7 @@ pub async fn analyze_library_books(
|
|||||||
|
|
||||||
let sql = format!(
|
let sql = format!(
|
||||||
r#"
|
r#"
|
||||||
SELECT b.id AS book_id, bf.abs_path, bf.format
|
SELECT b.id AS book_id, bf.abs_path, bf.format, (b.thumbnail_path IS NULL) AS needs_thumbnail
|
||||||
FROM books b
|
FROM books b
|
||||||
JOIN book_files bf ON bf.book_id = b.id
|
JOIN book_files bf ON bf.book_id = b.id
|
||||||
WHERE (b.library_id = $1 OR $1 IS NULL)
|
WHERE (b.library_id = $1 OR $1 IS NULL)
|
||||||
@@ -219,6 +219,7 @@ pub async fn analyze_library_books(
|
|||||||
book_id: Uuid,
|
book_id: Uuid,
|
||||||
abs_path: String,
|
abs_path: String,
|
||||||
format: String,
|
format: String,
|
||||||
|
needs_thumbnail: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
let tasks: Vec<BookTask> = rows
|
let tasks: Vec<BookTask> = rows
|
||||||
@@ -227,6 +228,7 @@ pub async fn analyze_library_books(
|
|||||||
book_id: row.get("book_id"),
|
book_id: row.get("book_id"),
|
||||||
abs_path: row.get("abs_path"),
|
abs_path: row.get("abs_path"),
|
||||||
format: row.get("format"),
|
format: row.get("format"),
|
||||||
|
needs_thumbnail: row.get("needs_thumbnail"),
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
@@ -245,7 +247,7 @@ pub async fn analyze_library_books(
|
|||||||
|
|
||||||
let extracted_count = Arc::new(AtomicI32::new(0));
|
let extracted_count = Arc::new(AtomicI32::new(0));
|
||||||
|
|
||||||
// Collected results: (book_id, raw_path, page_count)
|
// Collected results: (book_id, raw_path, page_count) — only books that need thumbnail generation
|
||||||
let extracted: Vec<(Uuid, String, i32)> = stream::iter(tasks)
|
let extracted: Vec<(Uuid, String, i32)> = stream::iter(tasks)
|
||||||
.map(|task| {
|
.map(|task| {
|
||||||
let pool = state.pool.clone();
|
let pool = state.pool.clone();
|
||||||
@@ -261,6 +263,28 @@ pub async fn analyze_library_books(
|
|||||||
let local_path = utils::remap_libraries_path(&task.abs_path);
|
let local_path = utils::remap_libraries_path(&task.abs_path);
|
||||||
let path = std::path::Path::new(&local_path);
|
let path = std::path::Path::new(&local_path);
|
||||||
let book_id = task.book_id;
|
let book_id = task.book_id;
|
||||||
|
let needs_thumbnail = task.needs_thumbnail;
|
||||||
|
|
||||||
|
// Remove macOS Apple Double resource fork files (._*) that were indexed before the scanner filter was added
|
||||||
|
if path
|
||||||
|
.file_name()
|
||||||
|
.and_then(|n| n.to_str())
|
||||||
|
.map(|n| n.starts_with("._"))
|
||||||
|
.unwrap_or(false)
|
||||||
|
{
|
||||||
|
warn!("[ANALYZER] Removing macOS resource fork from DB: {}", local_path);
|
||||||
|
let _ = sqlx::query("DELETE FROM book_files WHERE book_id = $1")
|
||||||
|
.bind(book_id)
|
||||||
|
.execute(&pool)
|
||||||
|
.await;
|
||||||
|
let _ = sqlx::query(
|
||||||
|
"DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)",
|
||||||
|
)
|
||||||
|
.bind(book_id)
|
||||||
|
.execute(&pool)
|
||||||
|
.await;
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
let format = match book_format_from_str(&task.format) {
|
let format = match book_format_from_str(&task.format) {
|
||||||
Some(f) => f,
|
Some(f) => f,
|
||||||
@@ -295,6 +319,29 @@ pub async fn analyze_library_books(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// If thumbnail already exists, just update page_count and skip thumbnail generation
|
||||||
|
if !needs_thumbnail {
|
||||||
|
if let Err(e) = sqlx::query("UPDATE books SET page_count = $1 WHERE id = $2")
|
||||||
|
.bind(page_count)
|
||||||
|
.bind(book_id)
|
||||||
|
.execute(&pool)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
warn!("[ANALYZER] DB page_count update failed for book {}: {}", book_id, e);
|
||||||
|
}
|
||||||
|
let processed = extracted_count.fetch_add(1, Ordering::Relaxed) + 1;
|
||||||
|
let percent = (processed as f64 / total as f64 * 50.0) as i32;
|
||||||
|
let _ = sqlx::query(
|
||||||
|
"UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1",
|
||||||
|
)
|
||||||
|
.bind(job_id)
|
||||||
|
.bind(processed)
|
||||||
|
.bind(percent)
|
||||||
|
.execute(&pool)
|
||||||
|
.await;
|
||||||
|
return None; // don't enqueue for thumbnail sub-phase
|
||||||
|
}
|
||||||
|
|
||||||
// Save raw bytes to disk (no resize, no encode)
|
// Save raw bytes to disk (no resize, no encode)
|
||||||
let raw_path = match tokio::task::spawn_blocking({
|
let raw_path = match tokio::task::spawn_blocking({
|
||||||
let dir = config.directory.clone();
|
let dir = config.directory.clone();
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ pub async fn cleanup_stale_jobs(pool: &PgPool) -> Result<()> {
|
|||||||
SET status = 'failed',
|
SET status = 'failed',
|
||||||
finished_at = NOW(),
|
finished_at = NOW(),
|
||||||
error_opt = 'Job interrupted by indexer restart'
|
error_opt = 'Job interrupted by indexer restart'
|
||||||
WHERE status = 'running'
|
WHERE status IN ('running', 'extracting_pages', 'generating_thumbnails')
|
||||||
AND started_at < NOW() - INTERVAL '5 minutes'
|
AND started_at < NOW() - INTERVAL '5 minutes'
|
||||||
RETURNING id
|
RETURNING id
|
||||||
"#,
|
"#,
|
||||||
|
|||||||
Reference in New Issue
Block a user