fix: add API job poller to process scheduler-created metadata jobs
All checks were successful
Deploy with Docker Compose / deploy (push) Successful in 1m12s
All checks were successful
Deploy with Docker Compose / deploy (push) Successful in 1m12s
The scheduler (indexer) created metadata_refresh/metadata_batch jobs in DB, but the indexer excluded them (API_ONLY_JOB_TYPES) and the API only processed jobs created via its REST endpoints. Scheduler-created jobs stayed pending forever. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
134
apps/api/src/job_poller.rs
Normal file
134
apps/api/src/job_poller.rs
Normal file
@@ -0,0 +1,134 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use sqlx::{PgPool, Row};
|
||||||
|
use tracing::{error, info, trace};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::{metadata_batch, metadata_refresh};
|
||||||
|
|
||||||
|
/// Poll for pending API-only jobs (`metadata_batch`, `metadata_refresh`) and process them.
|
||||||
|
/// This mirrors the indexer's worker loop but for job types handled by the API.
|
||||||
|
pub async fn run_job_poller(pool: PgPool, interval_seconds: u64) {
|
||||||
|
let wait = Duration::from_secs(interval_seconds.max(1));
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match claim_next_api_job(&pool).await {
|
||||||
|
Ok(Some((job_id, job_type, library_id))) => {
|
||||||
|
info!("[JOB_POLLER] Claimed {job_type} job {job_id} library={library_id}");
|
||||||
|
|
||||||
|
let pool_clone = pool.clone();
|
||||||
|
let library_name: Option<String> =
|
||||||
|
sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1")
|
||||||
|
.bind(library_id)
|
||||||
|
.fetch_optional(&pool)
|
||||||
|
.await
|
||||||
|
.ok()
|
||||||
|
.flatten();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let result = match job_type.as_str() {
|
||||||
|
"metadata_refresh" => {
|
||||||
|
metadata_refresh::process_metadata_refresh(
|
||||||
|
&pool_clone,
|
||||||
|
job_id,
|
||||||
|
library_id,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
"metadata_batch" => {
|
||||||
|
metadata_batch::process_metadata_batch(
|
||||||
|
&pool_clone,
|
||||||
|
job_id,
|
||||||
|
library_id,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
_ => Err(format!("Unknown API job type: {job_type}")),
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = result {
|
||||||
|
error!("[JOB_POLLER] {job_type} job {job_id} failed: {e}");
|
||||||
|
let _ = sqlx::query(
|
||||||
|
"UPDATE index_jobs SET status = 'failed', error_opt = $2, finished_at = NOW() WHERE id = $1",
|
||||||
|
)
|
||||||
|
.bind(job_id)
|
||||||
|
.bind(e.to_string())
|
||||||
|
.execute(&pool_clone)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match job_type.as_str() {
|
||||||
|
"metadata_refresh" => {
|
||||||
|
notifications::notify(
|
||||||
|
pool_clone,
|
||||||
|
notifications::NotificationEvent::MetadataRefreshFailed {
|
||||||
|
library_name,
|
||||||
|
error: e.to_string(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
"metadata_batch" => {
|
||||||
|
notifications::notify(
|
||||||
|
pool_clone,
|
||||||
|
notifications::NotificationEvent::MetadataBatchFailed {
|
||||||
|
library_name,
|
||||||
|
error: e.to_string(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
trace!("[JOB_POLLER] No pending API jobs, waiting...");
|
||||||
|
tokio::time::sleep(wait).await;
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!("[JOB_POLLER] Error claiming job: {err}");
|
||||||
|
tokio::time::sleep(wait).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const API_JOB_TYPES: &[&str] = &["metadata_batch", "metadata_refresh"];
|
||||||
|
|
||||||
|
async fn claim_next_api_job(pool: &PgPool) -> Result<Option<(Uuid, String, Uuid)>, sqlx::Error> {
|
||||||
|
let mut tx = pool.begin().await?;
|
||||||
|
|
||||||
|
let row = sqlx::query(
|
||||||
|
r#"
|
||||||
|
SELECT id, type, library_id
|
||||||
|
FROM index_jobs
|
||||||
|
WHERE status = 'pending'
|
||||||
|
AND type = ANY($1)
|
||||||
|
AND library_id IS NOT NULL
|
||||||
|
ORDER BY created_at ASC
|
||||||
|
FOR UPDATE SKIP LOCKED
|
||||||
|
LIMIT 1
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.bind(API_JOB_TYPES)
|
||||||
|
.fetch_optional(&mut *tx)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let Some(row) = row else {
|
||||||
|
tx.commit().await?;
|
||||||
|
return Ok(None);
|
||||||
|
};
|
||||||
|
|
||||||
|
let id: Uuid = row.get("id");
|
||||||
|
let job_type: String = row.get("type");
|
||||||
|
let library_id: Uuid = row.get("library_id");
|
||||||
|
|
||||||
|
sqlx::query(
|
||||||
|
"UPDATE index_jobs SET status = 'running', started_at = NOW(), error_opt = NULL WHERE id = $1",
|
||||||
|
)
|
||||||
|
.bind(id)
|
||||||
|
.execute(&mut *tx)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
tx.commit().await?;
|
||||||
|
Ok(Some((id, job_type, library_id)))
|
||||||
|
}
|
||||||
@@ -4,6 +4,7 @@ mod books;
|
|||||||
mod error;
|
mod error;
|
||||||
mod handlers;
|
mod handlers;
|
||||||
mod index_jobs;
|
mod index_jobs;
|
||||||
|
mod job_poller;
|
||||||
mod komga;
|
mod komga;
|
||||||
mod libraries;
|
mod libraries;
|
||||||
mod metadata;
|
mod metadata;
|
||||||
@@ -159,6 +160,9 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
auth::require_read,
|
auth::require_read,
|
||||||
));
|
));
|
||||||
|
|
||||||
|
// Clone pool before state is moved into the router
|
||||||
|
let poller_pool = state.pool.clone();
|
||||||
|
|
||||||
let app = Router::new()
|
let app = Router::new()
|
||||||
.route("/health", get(handlers::health))
|
.route("/health", get(handlers::health))
|
||||||
.route("/ready", get(handlers::ready))
|
.route("/ready", get(handlers::ready))
|
||||||
@@ -170,6 +174,11 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.layer(middleware::from_fn_with_state(state.clone(), api_middleware::request_counter))
|
.layer(middleware::from_fn_with_state(state.clone(), api_middleware::request_counter))
|
||||||
.with_state(state);
|
.with_state(state);
|
||||||
|
|
||||||
|
// Start background poller for API-only jobs (metadata_batch, metadata_refresh)
|
||||||
|
tokio::spawn(async move {
|
||||||
|
job_poller::run_job_poller(poller_pool, 5).await;
|
||||||
|
});
|
||||||
|
|
||||||
let listener = tokio::net::TcpListener::bind(&config.listen_addr).await?;
|
let listener = tokio::net::TcpListener::bind(&config.listen_addr).await?;
|
||||||
info!(addr = %config.listen_addr, "api listening");
|
info!(addr = %config.listen_addr, "api listening");
|
||||||
axum::serve(listener, app).await?;
|
axum::serve(listener, app).await?;
|
||||||
|
|||||||
@@ -115,14 +115,14 @@ pub async fn start_batch(
|
|||||||
|
|
||||||
let job_id = Uuid::new_v4();
|
let job_id = Uuid::new_v4();
|
||||||
sqlx::query(
|
sqlx::query(
|
||||||
"INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, 'metadata_batch', 'pending')",
|
"INSERT INTO index_jobs (id, library_id, type, status, started_at) VALUES ($1, $2, 'metadata_batch', 'running', NOW())",
|
||||||
)
|
)
|
||||||
.bind(job_id)
|
.bind(job_id)
|
||||||
.bind(library_id)
|
.bind(library_id)
|
||||||
.execute(&state.pool)
|
.execute(&state.pool)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Spawn the background processing task
|
// Spawn the background processing task (status already 'running' to avoid poller race)
|
||||||
let pool = state.pool.clone();
|
let pool = state.pool.clone();
|
||||||
let library_name: Option<String> = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1")
|
let library_name: Option<String> = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1")
|
||||||
.bind(library_id)
|
.bind(library_id)
|
||||||
@@ -313,7 +313,7 @@ pub async fn get_batch_results(
|
|||||||
// Background processing
|
// Background processing
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
async fn process_metadata_batch(
|
pub(crate) async fn process_metadata_batch(
|
||||||
pool: &PgPool,
|
pool: &PgPool,
|
||||||
job_id: Uuid,
|
job_id: Uuid,
|
||||||
library_id: Uuid,
|
library_id: Uuid,
|
||||||
|
|||||||
@@ -124,14 +124,14 @@ pub async fn start_refresh(
|
|||||||
|
|
||||||
let job_id = Uuid::new_v4();
|
let job_id = Uuid::new_v4();
|
||||||
sqlx::query(
|
sqlx::query(
|
||||||
"INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, 'metadata_refresh', 'pending')",
|
"INSERT INTO index_jobs (id, library_id, type, status, started_at) VALUES ($1, $2, 'metadata_refresh', 'running', NOW())",
|
||||||
)
|
)
|
||||||
.bind(job_id)
|
.bind(job_id)
|
||||||
.bind(library_id)
|
.bind(library_id)
|
||||||
.execute(&state.pool)
|
.execute(&state.pool)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Spawn the background processing task
|
// Spawn the background processing task (status already 'running' to avoid poller race)
|
||||||
let pool = state.pool.clone();
|
let pool = state.pool.clone();
|
||||||
let library_name: Option<String> = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1")
|
let library_name: Option<String> = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1")
|
||||||
.bind(library_id)
|
.bind(library_id)
|
||||||
@@ -222,7 +222,7 @@ pub async fn get_refresh_report(
|
|||||||
// Background processing
|
// Background processing
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
async fn process_metadata_refresh(
|
pub(crate) async fn process_metadata_refresh(
|
||||||
pool: &PgPool,
|
pool: &PgPool,
|
||||||
job_id: Uuid,
|
job_id: Uuid,
|
||||||
library_id: Uuid,
|
library_id: Uuid,
|
||||||
|
|||||||
Reference in New Issue
Block a user