From 0c42a9ed04205fa60cb9e1c973163b22819f0e58 Mon Sep 17 00:00:00 2001 From: Froidefond Julien Date: Sun, 22 Mar 2026 21:05:42 +0100 Subject: [PATCH] fix: add API job poller to process scheduler-created metadata jobs The scheduler (indexer) created metadata_refresh/metadata_batch jobs in DB, but the indexer excluded them (API_ONLY_JOB_TYPES) and the API only processed jobs created via its REST endpoints. Scheduler-created jobs stayed pending forever. Co-Authored-By: Claude Opus 4.6 --- apps/api/src/job_poller.rs | 134 +++++++++++++++++++++++++++++++ apps/api/src/main.rs | 9 +++ apps/api/src/metadata_batch.rs | 6 +- apps/api/src/metadata_refresh.rs | 6 +- 4 files changed, 149 insertions(+), 6 deletions(-) create mode 100644 apps/api/src/job_poller.rs diff --git a/apps/api/src/job_poller.rs b/apps/api/src/job_poller.rs new file mode 100644 index 0000000..43c3d07 --- /dev/null +++ b/apps/api/src/job_poller.rs @@ -0,0 +1,134 @@ +use std::time::Duration; + +use sqlx::{PgPool, Row}; +use tracing::{error, info, trace}; +use uuid::Uuid; + +use crate::{metadata_batch, metadata_refresh}; + +/// Poll for pending API-only jobs (`metadata_batch`, `metadata_refresh`) and process them. +/// This mirrors the indexer's worker loop but for job types handled by the API. 
+pub async fn run_job_poller(pool: PgPool, interval_seconds: u64) {
+    let wait = Duration::from_secs(interval_seconds.max(1));
+
+    loop {
+        match claim_next_api_job(&pool).await {
+            Ok(Some((job_id, job_type, library_id))) => {
+                info!("[JOB_POLLER] Claimed {job_type} job {job_id} library={library_id}");
+
+                let pool_clone = pool.clone();
+                let library_name: Option<String> =
+                    sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1")
+                        .bind(library_id)
+                        .fetch_optional(&pool)
+                        .await
+                        .ok()
+                        .flatten();
+
+                tokio::spawn(async move {
+                    let result = match job_type.as_str() {
+                        "metadata_refresh" => {
+                            metadata_refresh::process_metadata_refresh(
+                                &pool_clone,
+                                job_id,
+                                library_id,
+                            )
+                            .await
+                        }
+                        "metadata_batch" => {
+                            metadata_batch::process_metadata_batch(
+                                &pool_clone,
+                                job_id,
+                                library_id,
+                            )
+                            .await
+                        }
+                        _ => Err(format!("Unknown API job type: {job_type}")),
+                    };
+
+                    if let Err(e) = result {
+                        error!("[JOB_POLLER] {job_type} job {job_id} failed: {e}");
+                        let _ = sqlx::query(
+                            "UPDATE index_jobs SET status = 'failed', error_opt = $2, finished_at = NOW() WHERE id = $1",
+                        )
+                        .bind(job_id)
+                        .bind(e.to_string())
+                        .execute(&pool_clone)
+                        .await;
+
+                        match job_type.as_str() {
+                            "metadata_refresh" => {
+                                crate::notifications::notify(
+                                    pool_clone,
+                                    crate::notifications::NotificationEvent::MetadataRefreshFailed {
+                                        library_name,
+                                        error: e.to_string(),
+                                    },
+                                );
+                            }
+                            "metadata_batch" => {
+                                crate::notifications::notify(
+                                    pool_clone,
+                                    crate::notifications::NotificationEvent::MetadataBatchFailed {
+                                        library_name,
+                                        error: e.to_string(),
+                                    },
+                                );
+                            }
+                            _ => {}
+                        }
+                    }
+                });
+            }
+            Ok(None) => {
+                trace!("[JOB_POLLER] No pending API jobs, waiting...");
+                tokio::time::sleep(wait).await;
+            }
+            Err(err) => {
+                error!("[JOB_POLLER] Error claiming job: {err}");
+                tokio::time::sleep(wait).await;
+            }
+        }
+    }
+}
+
+const API_JOB_TYPES: &[&str] = &["metadata_batch", "metadata_refresh"];
+
+async fn claim_next_api_job(pool: &PgPool) -> Result<Option<(Uuid, String, Uuid)>, sqlx::Error> {
+    let mut tx =
pool.begin().await?; + + let row = sqlx::query( + r#" + SELECT id, type, library_id + FROM index_jobs + WHERE status = 'pending' + AND type = ANY($1) + AND library_id IS NOT NULL + ORDER BY created_at ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 + "#, + ) + .bind(API_JOB_TYPES) + .fetch_optional(&mut *tx) + .await?; + + let Some(row) = row else { + tx.commit().await?; + return Ok(None); + }; + + let id: Uuid = row.get("id"); + let job_type: String = row.get("type"); + let library_id: Uuid = row.get("library_id"); + + sqlx::query( + "UPDATE index_jobs SET status = 'running', started_at = NOW(), error_opt = NULL WHERE id = $1", + ) + .bind(id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(Some((id, job_type, library_id))) +} diff --git a/apps/api/src/main.rs b/apps/api/src/main.rs index a111043..180fa64 100644 --- a/apps/api/src/main.rs +++ b/apps/api/src/main.rs @@ -4,6 +4,7 @@ mod books; mod error; mod handlers; mod index_jobs; +mod job_poller; mod komga; mod libraries; mod metadata; @@ -159,6 +160,9 @@ async fn main() -> anyhow::Result<()> { auth::require_read, )); + // Clone pool before state is moved into the router + let poller_pool = state.pool.clone(); + let app = Router::new() .route("/health", get(handlers::health)) .route("/ready", get(handlers::ready)) @@ -170,6 +174,11 @@ async fn main() -> anyhow::Result<()> { .layer(middleware::from_fn_with_state(state.clone(), api_middleware::request_counter)) .with_state(state); + // Start background poller for API-only jobs (metadata_batch, metadata_refresh) + tokio::spawn(async move { + job_poller::run_job_poller(poller_pool, 5).await; + }); + let listener = tokio::net::TcpListener::bind(&config.listen_addr).await?; info!(addr = %config.listen_addr, "api listening"); axum::serve(listener, app).await?; diff --git a/apps/api/src/metadata_batch.rs b/apps/api/src/metadata_batch.rs index fff4a2b..cb12104 100644 --- a/apps/api/src/metadata_batch.rs +++ b/apps/api/src/metadata_batch.rs @@ -115,14 +115,14 @@ pub 
async fn start_batch(
     let job_id = Uuid::new_v4();
 
     sqlx::query(
-        "INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, 'metadata_batch', 'pending')",
+        "INSERT INTO index_jobs (id, library_id, type, status, started_at) VALUES ($1, $2, 'metadata_batch', 'running', NOW())",
     )
     .bind(job_id)
     .bind(library_id)
     .execute(&state.pool)
     .await?;
 
-    // Spawn the background processing task
+    // Spawn the background processing task (status already 'running' to avoid poller race)
     let pool = state.pool.clone();
     let library_name: Option<String> = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1")
         .bind(library_id)
@@ -313,7 +313,7 @@ pub async fn get_batch_results(
 // Background processing
 // ---------------------------------------------------------------------------
 
-async fn process_metadata_batch(
+pub(crate) async fn process_metadata_batch(
     pool: &PgPool,
     job_id: Uuid,
     library_id: Uuid,
diff --git a/apps/api/src/metadata_refresh.rs b/apps/api/src/metadata_refresh.rs
index 5cfc9b4..b77ca4f 100644
--- a/apps/api/src/metadata_refresh.rs
+++ b/apps/api/src/metadata_refresh.rs
@@ -124,14 +124,14 @@ pub async fn start_refresh(
     let job_id = Uuid::new_v4();
 
     sqlx::query(
-        "INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, 'metadata_refresh', 'pending')",
+        "INSERT INTO index_jobs (id, library_id, type, status, started_at) VALUES ($1, $2, 'metadata_refresh', 'running', NOW())",
     )
     .bind(job_id)
     .bind(library_id)
     .execute(&state.pool)
     .await?;
 
-    // Spawn the background processing task
+    // Spawn the background processing task (status already 'running' to avoid poller race)
     let pool = state.pool.clone();
     let library_name: Option<String> = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1")
         .bind(library_id)
@@ -222,7 +222,7 @@ pub async fn get_refresh_report(
 // Background processing
 // ---------------------------------------------------------------------------
 
-async fn process_metadata_refresh(
+pub(crate) async fn process_metadata_refresh(
pool: &PgPool, job_id: Uuid, library_id: Uuid,