From 10cc69e53fbc9774645b603aac765e5beeb8cc6c Mon Sep 17 00:00:00 2001 From: Froidefond Julien Date: Wed, 25 Mar 2026 10:30:04 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20add=20reading=5Fstatus=5Fpush=20job=20?= =?UTF-8?q?=E2=80=94=20differential=20push=20to=20AniList?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Push reading statuses (PLANNING/CURRENT/COMPLETED) to AniList for all linked series that changed since last sync, or have new books/no sync yet. - Migration 0057: adds reading_status_push to index_jobs type constraint - Migration 0058: creates reading_status_push_results table (pushed/skipped/no_books/error) - API: new reading_status_push module with start_push, get_push_report, get_push_results - Differential detection: synced_at IS NULL OR reading progress updated OR new books added - Same 429 retry logic as reading_status_match (wait 10s, retry once, abort on 2nd 429) - Notifications: ReadingStatusPushCompleted/Failed events - Backoffice: push button in reading status group, job detail report with per-series list - Replay support, badge label, i18n (FR + EN) Co-Authored-By: Claude Sonnet 4.6 --- apps/api/src/main.rs | 4 + apps/api/src/reading_status_push.rs | 642 ++++++++++++++++++ apps/backoffice/app/(app)/jobs/[id]/page.tsx | 119 +++- apps/backoffice/app/(app)/jobs/page.tsx | 42 +- .../app/api/jobs/[id]/replay/route.ts | 5 +- apps/backoffice/app/components/JobRow.tsx | 2 +- apps/backoffice/app/components/ui/Badge.tsx | 1 + apps/backoffice/lib/api.ts | 37 + apps/backoffice/lib/i18n/en.ts | 11 + apps/backoffice/lib/i18n/fr.ts | 11 + crates/notifications/src/lib.rs | 49 ++ .../0057_add_reading_status_push_job_type.sql | 4 + .../0058_add_reading_status_push_results.sql | 19 + 13 files changed, 939 insertions(+), 7 deletions(-) create mode 100644 apps/api/src/reading_status_push.rs create mode 100644 infra/migrations/0057_add_reading_status_push_job_type.sql create mode 100644 
infra/migrations/0058_add_reading_status_push_results.sql diff --git a/apps/api/src/main.rs b/apps/api/src/main.rs index e1ee5f1..083b87e 100644 --- a/apps/api/src/main.rs +++ b/apps/api/src/main.rs @@ -19,6 +19,7 @@ mod prowlarr; mod qbittorrent; mod reading_progress; mod reading_status_match; +mod reading_status_push; mod search; mod series; mod settings; @@ -149,6 +150,9 @@ async fn main() -> anyhow::Result<()> { .route("/reading-status/match", axum::routing::post(reading_status_match::start_match)) .route("/reading-status/match/:id/report", get(reading_status_match::get_match_report)) .route("/reading-status/match/:id/results", get(reading_status_match::get_match_results)) + .route("/reading-status/push", axum::routing::post(reading_status_push::start_push)) + .route("/reading-status/push/:id/report", get(reading_status_push::get_push_report)) + .route("/reading-status/push/:id/results", get(reading_status_push::get_push_results)) .merge(settings::settings_routes()) .route_layer(middleware::from_fn_with_state( state.clone(), diff --git a/apps/api/src/reading_status_push.rs b/apps/api/src/reading_status_push.rs new file mode 100644 index 0000000..308b494 --- /dev/null +++ b/apps/api/src/reading_status_push.rs @@ -0,0 +1,642 @@ +use axum::{extract::State, Json}; +use serde::{Deserialize, Serialize}; +use sqlx::{PgPool, Row}; +use std::time::Duration; +use tracing::{info, warn}; +use utoipa::ToSchema; +use uuid::Uuid; + +use crate::{anilist, error::ApiError, state::AppState}; + +// --------------------------------------------------------------------------- +// DTOs +// --------------------------------------------------------------------------- + +#[derive(Deserialize, ToSchema)] +pub struct ReadingStatusPushRequest { + pub library_id: String, +} + +#[derive(Serialize, ToSchema)] +pub struct ReadingStatusPushReportDto { + #[schema(value_type = String)] + pub job_id: Uuid, + pub status: String, + pub total_series: i64, + pub pushed: i64, + pub skipped: i64, + pub 
no_books: i64, + pub errors: i64, +} + +#[derive(Serialize, ToSchema)] +pub struct ReadingStatusPushResultDto { + #[schema(value_type = String)] + pub id: Uuid, + pub series_name: String, + /// 'pushed' | 'skipped' | 'no_books' | 'error' + pub status: String, + pub anilist_id: Option, + pub anilist_title: Option, + pub anilist_url: Option, + /// PLANNING | CURRENT | COMPLETED + pub anilist_status: Option, + pub progress_volumes: Option, + pub error_message: Option, +} + +// --------------------------------------------------------------------------- +// POST /reading-status/push — Trigger a reading status push job +// --------------------------------------------------------------------------- + +#[utoipa::path( + post, + path = "/reading-status/push", + tag = "reading_status", + request_body = ReadingStatusPushRequest, + responses( + (status = 200, description = "Job created"), + (status = 400, description = "Bad request"), + ), + security(("Bearer" = [])) +)] +pub async fn start_push( + State(state): State, + Json(body): Json, +) -> Result, ApiError> { + let library_id: Uuid = body + .library_id + .parse() + .map_err(|_| ApiError::bad_request("invalid library_id"))?; + + // Verify library exists and has AniList configured + let lib_row = sqlx::query("SELECT reading_status_provider FROM libraries WHERE id = $1") + .bind(library_id) + .fetch_optional(&state.pool) + .await? 
+ .ok_or_else(|| ApiError::not_found("library not found"))?; + + let provider: Option = lib_row.get("reading_status_provider"); + if provider.as_deref() != Some("anilist") { + return Err(ApiError::bad_request( + "This library has no AniList reading status provider configured", + )); + } + + // Check AniList is configured globally with a local_user_id + let (_, _, local_user_id) = anilist::load_anilist_settings(&state.pool).await?; + if local_user_id.is_none() { + return Err(ApiError::bad_request( + "AniList local_user_id not configured — required for reading status push", + )); + } + + // Check no existing running job for this library + let existing: Option = sqlx::query_scalar( + "SELECT id FROM index_jobs WHERE library_id = $1 AND type = 'reading_status_push' AND status IN ('pending', 'running') LIMIT 1", + ) + .bind(library_id) + .fetch_optional(&state.pool) + .await?; + + if let Some(existing_id) = existing { + return Ok(Json(serde_json::json!({ + "id": existing_id.to_string(), + "status": "already_running", + }))); + } + + let job_id = Uuid::new_v4(); + sqlx::query( + "INSERT INTO index_jobs (id, library_id, type, status, started_at) VALUES ($1, $2, 'reading_status_push', 'running', NOW())", + ) + .bind(job_id) + .bind(library_id) + .execute(&state.pool) + .await?; + + let pool = state.pool.clone(); + let library_name: Option = + sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1") + .bind(library_id) + .fetch_optional(&state.pool) + .await + .ok() + .flatten(); + + tokio::spawn(async move { + if let Err(e) = process_reading_status_push(&pool, job_id, library_id).await { + warn!("[READING_STATUS_PUSH] job {job_id} failed: {e}"); + let _ = sqlx::query( + "UPDATE index_jobs SET status = 'failed', error_opt = $2, finished_at = NOW() WHERE id = $1", + ) + .bind(job_id) + .bind(e.to_string()) + .execute(&pool) + .await; + notifications::notify( + pool.clone(), + notifications::NotificationEvent::ReadingStatusPushFailed { + library_name, + error: 
e.to_string(), + }, + ); + } + }); + + Ok(Json(serde_json::json!({ + "id": job_id.to_string(), + "status": "running", + }))) +} + +// --------------------------------------------------------------------------- +// GET /reading-status/push/:id/report +// --------------------------------------------------------------------------- + +#[utoipa::path( + get, + path = "/reading-status/push/{id}/report", + tag = "reading_status", + params(("id" = String, Path, description = "Job UUID")), + responses( + (status = 200, body = ReadingStatusPushReportDto), + (status = 404, description = "Job not found"), + ), + security(("Bearer" = [])) +)] +pub async fn get_push_report( + State(state): State, + axum::extract::Path(job_id): axum::extract::Path, +) -> Result, ApiError> { + let row = sqlx::query( + "SELECT status, total_files FROM index_jobs WHERE id = $1 AND type = 'reading_status_push'", + ) + .bind(job_id) + .fetch_optional(&state.pool) + .await? + .ok_or_else(|| ApiError::not_found("job not found"))?; + + let job_status: String = row.get("status"); + let total_files: Option = row.get("total_files"); + + let counts = sqlx::query( + "SELECT status, COUNT(*) as cnt FROM reading_status_push_results WHERE job_id = $1 GROUP BY status", + ) + .bind(job_id) + .fetch_all(&state.pool) + .await?; + + let mut pushed = 0i64; + let mut skipped = 0i64; + let mut no_books = 0i64; + let mut errors = 0i64; + + for r in &counts { + let status: String = r.get("status"); + let cnt: i64 = r.get("cnt"); + match status.as_str() { + "pushed" => pushed = cnt, + "skipped" => skipped = cnt, + "no_books" => no_books = cnt, + "error" => errors = cnt, + _ => {} + } + } + + Ok(Json(ReadingStatusPushReportDto { + job_id, + status: job_status, + total_series: total_files.unwrap_or(0) as i64, + pushed, + skipped, + no_books, + errors, + })) +} + +// --------------------------------------------------------------------------- +// GET /reading-status/push/:id/results +// 
--------------------------------------------------------------------------- + +#[derive(Deserialize)] +pub struct PushResultsQuery { + pub status: Option, +} + +#[utoipa::path( + get, + path = "/reading-status/push/{id}/results", + tag = "reading_status", + params( + ("id" = String, Path, description = "Job UUID"), + ("status" = Option, Query, description = "Filter by status"), + ), + responses( + (status = 200, body = Vec), + ), + security(("Bearer" = [])) +)] +pub async fn get_push_results( + State(state): State, + axum::extract::Path(job_id): axum::extract::Path, + axum::extract::Query(query): axum::extract::Query, +) -> Result>, ApiError> { + let rows = if let Some(status_filter) = &query.status { + sqlx::query( + "SELECT id, series_name, status, anilist_id, anilist_title, anilist_url, anilist_status, progress_volumes, error_message + FROM reading_status_push_results + WHERE job_id = $1 AND status = $2 + ORDER BY series_name", + ) + .bind(job_id) + .bind(status_filter) + .fetch_all(&state.pool) + .await? + } else { + sqlx::query( + "SELECT id, series_name, status, anilist_id, anilist_title, anilist_url, anilist_status, progress_volumes, error_message + FROM reading_status_push_results + WHERE job_id = $1 + ORDER BY status, series_name", + ) + .bind(job_id) + .fetch_all(&state.pool) + .await? 
+ }; + + let results = rows + .iter() + .map(|row| ReadingStatusPushResultDto { + id: row.get("id"), + series_name: row.get("series_name"), + status: row.get("status"), + anilist_id: row.get("anilist_id"), + anilist_title: row.get("anilist_title"), + anilist_url: row.get("anilist_url"), + anilist_status: row.get("anilist_status"), + progress_volumes: row.get("progress_volumes"), + error_message: row.get("error_message"), + }) + .collect(); + + Ok(Json(results)) +} + +// --------------------------------------------------------------------------- +// Background processing +// --------------------------------------------------------------------------- + +struct SeriesInfo { + series_name: String, + anilist_id: i32, + anilist_title: Option, + anilist_url: Option, +} + +async fn process_reading_status_push( + pool: &PgPool, + job_id: Uuid, + library_id: Uuid, +) -> Result<(), String> { + let (token, _, local_user_id_opt) = anilist::load_anilist_settings(pool) + .await + .map_err(|e| e.message)?; + + let local_user_id = local_user_id_opt + .ok_or_else(|| "AniList local_user_id not configured".to_string())?; + + // Find all linked series that need a push (differential) + let series_to_push: Vec = sqlx::query( + r#" + SELECT + asl.series_name, + asl.anilist_id, + asl.anilist_title, + asl.anilist_url + FROM anilist_series_links asl + WHERE asl.library_id = $1 + AND asl.anilist_id IS NOT NULL + AND ( + asl.synced_at IS NULL + OR EXISTS ( + SELECT 1 + FROM book_reading_progress brp + JOIN books b2 ON b2.id = brp.book_id + WHERE b2.library_id = asl.library_id + AND COALESCE(NULLIF(b2.series, ''), 'unclassified') = asl.series_name + AND brp.user_id = $2 + AND brp.updated_at > asl.synced_at + ) + OR EXISTS ( + SELECT 1 + FROM books b2 + WHERE b2.library_id = asl.library_id + AND COALESCE(NULLIF(b2.series, ''), 'unclassified') = asl.series_name + AND b2.created_at > asl.synced_at + ) + ) + ORDER BY asl.series_name + "#, + ) + .bind(library_id) + .bind(local_user_id) + 
.fetch_all(pool) + .await + .map_err(|e| e.to_string())? + .into_iter() + .map(|row| SeriesInfo { + series_name: row.get("series_name"), + anilist_id: row.get("anilist_id"), + anilist_title: row.get("anilist_title"), + anilist_url: row.get("anilist_url"), + }) + .collect(); + + let total = series_to_push.len() as i32; + sqlx::query("UPDATE index_jobs SET total_files = $2 WHERE id = $1") + .bind(job_id) + .bind(total) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + let mut processed = 0i32; + + for series in &series_to_push { + if is_job_cancelled(pool, job_id).await { + sqlx::query( + "UPDATE index_jobs SET status = 'cancelled', finished_at = NOW() WHERE id = $1", + ) + .bind(job_id) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + return Ok(()); + } + + processed += 1; + let progress = (processed * 100 / total.max(1)).min(100); + sqlx::query( + "UPDATE index_jobs SET processed_files = $2, progress_percent = $3, current_file = $4 WHERE id = $1", + ) + .bind(job_id) + .bind(processed) + .bind(progress) + .bind(&series.series_name) + .execute(pool) + .await + .ok(); + + // Compute reading status for this series + let stats_row = sqlx::query( + r#" + SELECT + COUNT(b.id) AS total_books, + COUNT(brp.book_id) FILTER (WHERE brp.status = 'read') AS books_read + FROM books b + LEFT JOIN book_reading_progress brp + ON brp.book_id = b.id AND brp.user_id = $3 + WHERE b.library_id = $1 + AND COALESCE(NULLIF(b.series, ''), 'unclassified') = $2 + "#, + ) + .bind(library_id) + .bind(&series.series_name) + .bind(local_user_id) + .fetch_one(pool) + .await + .map_err(|e| e.to_string())?; + + let total_books: i64 = stats_row.get("total_books"); + let books_read: i64 = stats_row.get("books_read"); + + if total_books == 0 { + insert_push_result( + pool, job_id, library_id, &series.series_name, "no_books", + Some(series.anilist_id), series.anilist_title.as_deref(), series.anilist_url.as_deref(), + None, None, None, + ).await; + 
tokio::time::sleep(Duration::from_millis(700)).await; + continue; + } + + let anilist_status = if books_read == 0 { + "PLANNING" + } else if books_read >= total_books { + "COMPLETED" + } else { + "CURRENT" + }; + let progress_volumes = books_read as i32; + + match push_to_anilist( + &token, + series.anilist_id, + anilist_status, + progress_volumes, + ) + .await + { + Ok(()) => { + // Update synced_at + let _ = sqlx::query( + "UPDATE anilist_series_links SET synced_at = NOW() WHERE library_id = $1 AND series_name = $2", + ) + .bind(library_id) + .bind(&series.series_name) + .execute(pool) + .await; + + insert_push_result( + pool, job_id, library_id, &series.series_name, "pushed", + Some(series.anilist_id), series.anilist_title.as_deref(), series.anilist_url.as_deref(), + Some(anilist_status), Some(progress_volumes), None, + ).await; + } + Err(e) if e.contains("429") || e.contains("Too Many Requests") => { + warn!("[READING_STATUS_PUSH] rate limit hit for '{}', waiting 10s before retry", series.series_name); + tokio::time::sleep(Duration::from_secs(10)).await; + match push_to_anilist(&token, series.anilist_id, anilist_status, progress_volumes).await { + Ok(()) => { + let _ = sqlx::query( + "UPDATE anilist_series_links SET synced_at = NOW() WHERE library_id = $1 AND series_name = $2", + ) + .bind(library_id) + .bind(&series.series_name) + .execute(pool) + .await; + + insert_push_result( + pool, job_id, library_id, &series.series_name, "pushed", + Some(series.anilist_id), series.anilist_title.as_deref(), series.anilist_url.as_deref(), + Some(anilist_status), Some(progress_volumes), None, + ).await; + } + Err(e2) => { + return Err(format!( + "AniList rate limit exceeded (429) — job stopped after {processed}/{total} series: {e2}" + )); + } + } + } + Err(e) => { + warn!("[READING_STATUS_PUSH] series '{}': {e}", series.series_name); + insert_push_result( + pool, job_id, library_id, &series.series_name, "error", + Some(series.anilist_id), series.anilist_title.as_deref(), 
series.anilist_url.as_deref(), + None, None, Some(&e), + ).await; + } + } + + // Respect AniList rate limit (~90 req/min) + tokio::time::sleep(Duration::from_millis(700)).await; + } + + // Build final stats + let counts = sqlx::query( + "SELECT status, COUNT(*) as cnt FROM reading_status_push_results WHERE job_id = $1 GROUP BY status", + ) + .bind(job_id) + .fetch_all(pool) + .await + .map_err(|e| e.to_string())?; + + let mut count_pushed = 0i64; + let mut count_skipped = 0i64; + let mut count_no_books = 0i64; + let mut count_errors = 0i64; + for row in &counts { + let s: String = row.get("status"); + let c: i64 = row.get("cnt"); + match s.as_str() { + "pushed" => count_pushed = c, + "skipped" => count_skipped = c, + "no_books" => count_no_books = c, + "error" => count_errors = c, + _ => {} + } + } + + let stats = serde_json::json!({ + "total_series": total as i64, + "pushed": count_pushed, + "skipped": count_skipped, + "no_books": count_no_books, + "errors": count_errors, + }); + + sqlx::query( + "UPDATE index_jobs SET status = 'success', finished_at = NOW(), stats_json = $2, progress_percent = 100 WHERE id = $1", + ) + .bind(job_id) + .bind(&stats) + .execute(pool) + .await + .map_err(|e| e.to_string())?; + + info!( + "[READING_STATUS_PUSH] job={job_id} completed: {}/{} series, pushed={count_pushed}, no_books={count_no_books}, errors={count_errors}", + processed, total + ); + + let library_name: Option = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1") + .bind(library_id) + .fetch_optional(pool) + .await + .ok() + .flatten(); + + notifications::notify( + pool.clone(), + notifications::NotificationEvent::ReadingStatusPushCompleted { + library_name, + total_series: total, + pushed: count_pushed as i32, + }, + ); + + Ok(()) +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +async fn push_to_anilist( + token: &str, + 
anilist_id: i32, + status: &str, + progress: i32, +) -> Result<(), String> { + let gql = r#" + mutation SaveMediaListEntry($mediaId: Int, $status: MediaListStatus, $progress: Int) { + SaveMediaListEntry(mediaId: $mediaId, status: $status, progress: $progress) { + id + status + progress + } + } + "#; + + anilist::anilist_graphql( + token, + gql, + serde_json::json!({ + "mediaId": anilist_id, + "status": status, + "progress": progress, + }), + ) + .await + .map_err(|e| e.message)?; + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +async fn insert_push_result( + pool: &PgPool, + job_id: Uuid, + library_id: Uuid, + series_name: &str, + status: &str, + anilist_id: Option, + anilist_title: Option<&str>, + anilist_url: Option<&str>, + anilist_status: Option<&str>, + progress_volumes: Option, + error_message: Option<&str>, +) { + let _ = sqlx::query( + r#" + INSERT INTO reading_status_push_results + (job_id, library_id, series_name, status, anilist_id, anilist_title, anilist_url, anilist_status, progress_volumes, error_message) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + "#, + ) + .bind(job_id) + .bind(library_id) + .bind(series_name) + .bind(status) + .bind(anilist_id) + .bind(anilist_title) + .bind(anilist_url) + .bind(anilist_status) + .bind(progress_volumes) + .bind(error_message) + .execute(pool) + .await; +} + +async fn is_job_cancelled(pool: &PgPool, job_id: Uuid) -> bool { + sqlx::query_scalar::<_, String>("SELECT status FROM index_jobs WHERE id = $1") + .bind(job_id) + .fetch_optional(pool) + .await + .ok() + .flatten() + .as_deref() + == Some("cancelled") +} diff --git a/apps/backoffice/app/(app)/jobs/[id]/page.tsx b/apps/backoffice/app/(app)/jobs/[id]/page.tsx index 9d82ef4..b8c1980 100644 --- a/apps/backoffice/app/(app)/jobs/[id]/page.tsx +++ b/apps/backoffice/app/(app)/jobs/[id]/page.tsx @@ -2,7 +2,7 @@ export const dynamic = "force-dynamic"; import { notFound } from "next/navigation"; import Link from "next/link"; -import { apiFetch, 
getMetadataBatchReport, getMetadataBatchResults, getMetadataRefreshReport, getReadingStatusMatchReport, getReadingStatusMatchResults, MetadataBatchReportDto, MetadataBatchResultDto, MetadataRefreshReportDto, ReadingStatusMatchReportDto, ReadingStatusMatchResultDto } from "@/lib/api"; +import { apiFetch, getMetadataBatchReport, getMetadataBatchResults, getMetadataRefreshReport, getReadingStatusMatchReport, getReadingStatusMatchResults, getReadingStatusPushReport, getReadingStatusPushResults, MetadataBatchReportDto, MetadataBatchResultDto, MetadataRefreshReportDto, ReadingStatusMatchReportDto, ReadingStatusMatchResultDto, ReadingStatusPushReportDto, ReadingStatusPushResultDto } from "@/lib/api"; import { Card, CardHeader, CardTitle, CardDescription, CardContent, StatusBadge, JobTypeBadge, StatBox, ProgressBar @@ -137,11 +137,17 @@ export default async function JobDetailPage({ params }: JobDetailPageProps) { description: t("jobType.reading_status_matchDesc"), isThumbnailOnly: false, }, + reading_status_push: { + label: t("jobType.reading_status_pushLabel"), + description: t("jobType.reading_status_pushDesc"), + isThumbnailOnly: false, + }, }; const isMetadataBatch = job.type === "metadata_batch"; const isMetadataRefresh = job.type === "metadata_refresh"; const isReadingStatusMatch = job.type === "reading_status_match"; + const isReadingStatusPush = job.type === "reading_status_push"; // Fetch batch report & results for metadata_batch jobs let batchReport: MetadataBatchReportDto | null = null; @@ -169,6 +175,16 @@ export default async function JobDetailPage({ params }: JobDetailPageProps) { ]); } + // Fetch reading status push report & results + let readingStatusPushReport: ReadingStatusPushReportDto | null = null; + let readingStatusPushResults: ReadingStatusPushResultDto[] = []; + if (isReadingStatusPush) { + [readingStatusPushReport, readingStatusPushResults] = await Promise.all([ + getReadingStatusPushReport(id).catch(() => null), + 
getReadingStatusPushResults(id).catch(() => []), + ]); + } + const typeInfo = JOB_TYPE_INFO[job.type] ?? { label: job.type, description: null, @@ -195,6 +211,8 @@ export default async function JobDetailPage({ params }: JobDetailPageProps) { ? t("jobDetail.metadataRefresh") : isReadingStatusMatch ? t("jobDetail.readingStatusMatch") + : isReadingStatusPush + ? t("jobDetail.readingStatusPush") : isThumbnailOnly ? t("jobType.thumbnail_rebuild") : isExtractingPages @@ -209,6 +227,8 @@ export default async function JobDetailPage({ params }: JobDetailPageProps) { ? t("jobDetail.metadataRefreshDesc") : isReadingStatusMatch ? t("jobDetail.readingStatusMatchDesc") + : isReadingStatusPush + ? t("jobDetail.readingStatusPushDesc") : isThumbnailOnly ? undefined : isExtractingPages @@ -265,7 +285,12 @@ export default async function JobDetailPage({ params }: JobDetailPageProps) { — {readingStatusReport.linked} {t("jobDetail.linked").toLowerCase()}, {readingStatusReport.no_results} {t("jobDetail.noResults").toLowerCase()}, {readingStatusReport.ambiguous} {t("jobDetail.ambiguous").toLowerCase()}, {readingStatusReport.errors} {t("jobDetail.errors").toLowerCase()} )} - {!isMetadataBatch && !isMetadataRefresh && !isReadingStatusMatch && job.stats_json && ( + {isReadingStatusPush && readingStatusPushReport && ( + + — {readingStatusPushReport.pushed} {t("jobDetail.pushed").toLowerCase()}, {readingStatusPushReport.no_books} {t("jobDetail.noBooks").toLowerCase()}, {readingStatusPushReport.errors} {t("jobDetail.errors").toLowerCase()} + + )} + {!isMetadataBatch && !isMetadataRefresh && !isReadingStatusMatch && !isReadingStatusPush && job.stats_json && ( — {job.stats_json.scanned_files} {t("jobDetail.scanned").toLowerCase()}, {job.stats_json.indexed_files} {t("jobDetail.indexed").toLowerCase()} {job.stats_json.removed_files > 0 && `, ${job.stats_json.removed_files} ${t("jobDetail.removed").toLowerCase()}`} @@ -274,7 +299,7 @@ export default async function JobDetailPage({ params }: 
JobDetailPageProps) { {job.total_files != null && job.total_files > 0 && `, ${job.total_files} ${t("jobType.thumbnail_rebuild").toLowerCase()}`} )} - {!isMetadataBatch && !isMetadataRefresh && !isReadingStatusMatch && !job.stats_json && isThumbnailOnly && job.total_files != null && ( + {!isMetadataBatch && !isMetadataRefresh && !isReadingStatusMatch && !isReadingStatusPush && !job.stats_json && isThumbnailOnly && job.total_files != null && ( — {job.processed_files ?? job.total_files} {t("jobDetail.generated").toLowerCase()} @@ -539,7 +564,7 @@ export default async function JobDetailPage({ params }: JobDetailPageProps) { )} {/* Index Statistics — index jobs only */} - {job.stats_json && !isThumbnailOnly && !isMetadataBatch && !isMetadataRefresh && !isReadingStatusMatch && ( + {job.stats_json && !isThumbnailOnly && !isMetadataBatch && !isMetadataRefresh && !isReadingStatusMatch && !isReadingStatusPush && ( {t("jobDetail.indexStats")} @@ -827,6 +852,92 @@ export default async function JobDetailPage({ params }: JobDetailPageProps) { )} + {/* Reading status push — summary report */} + {isReadingStatusPush && readingStatusPushReport && ( + + + {t("jobDetail.readingStatusPushReport")} + {t("jobDetail.seriesAnalyzed", { count: String(readingStatusPushReport.total_series) })} + + +
+ + + + 0 ? "error" : "default"} /> +
+
+
+ )} + + {/* Reading status push — per-series detail */} + {isReadingStatusPush && readingStatusPushResults.length > 0 && ( + + + {t("jobDetail.resultsBySeries")} + {t("jobDetail.seriesProcessed", { count: String(readingStatusPushResults.length) })} + + + {readingStatusPushResults.map((r) => ( +
+
+ {job.library_id ? ( + + {r.series_name} + + ) : ( + {r.series_name} + )} + + {r.status === "pushed" ? t("jobDetail.pushed") : + r.status === "skipped" ? t("jobDetail.skipped") : + r.status === "no_books" ? t("jobDetail.noBooks") : + r.status === "error" ? t("common.error") : + r.status} + +
+ {r.status === "pushed" && r.anilist_title && ( +
+ + + + {r.anilist_url ? ( + + {r.anilist_title} + + ) : ( + {r.anilist_title} + )} + {r.anilist_status && {r.anilist_status}} + {r.progress_volumes != null && vol. {r.progress_volumes}} +
+ )} + {r.error_message && ( +

{r.error_message}

+ )} +
+ ))} +
+
+ )} + {/* Metadata batch results */} {isMetadataBatch && batchResults.length > 0 && ( diff --git a/apps/backoffice/app/(app)/jobs/page.tsx b/apps/backoffice/app/(app)/jobs/page.tsx index b6a072c..5b6f88f 100644 --- a/apps/backoffice/app/(app)/jobs/page.tsx +++ b/apps/backoffice/app/(app)/jobs/page.tsx @@ -1,6 +1,6 @@ import { revalidatePath } from "next/cache"; import { redirect } from "next/navigation"; -import { listJobs, fetchLibraries, rebuildIndex, rebuildThumbnails, regenerateThumbnails, startMetadataBatch, startMetadataRefresh, startReadingStatusMatch, IndexJobDto, LibraryDto } from "@/lib/api"; +import { listJobs, fetchLibraries, rebuildIndex, rebuildThumbnails, regenerateThumbnails, startMetadataBatch, startMetadataRefresh, startReadingStatusMatch, startReadingStatusPush, IndexJobDto, LibraryDto } from "@/lib/api"; import { JobsList } from "@/app/components/JobsList"; import { Card, CardHeader, CardTitle, CardDescription, CardContent, FormField, FormSelect } from "@/app/components/ui"; import { getServerTranslations } from "@/lib/i18n/server"; @@ -149,6 +149,36 @@ export default async function JobsPage({ searchParams }: { searchParams: Promise } } + async function triggerReadingStatusPush(formData: FormData) { + "use server"; + const libraryId = formData.get("library_id") as string; + if (libraryId) { + let result; + try { + result = await startReadingStatusPush(libraryId); + } catch { + return; + } + revalidatePath("/jobs"); + redirect(`/jobs?highlight=${result.id}`); + } else { + // All libraries — only those with reading_status_provider configured + const allLibraries = await fetchLibraries().catch(() => [] as LibraryDto[]); + let lastId: string | undefined; + for (const lib of allLibraries) { + if (!lib.reading_status_provider) continue; + try { + const result = await startReadingStatusPush(lib.id); + if (result.status !== "already_running") lastId = result.id; + } catch { + // Skip libraries with errors + } + } + revalidatePath("/jobs"); + 
redirect(lastId ? `/jobs?highlight=${lastId}` : "/jobs"); + } + } + return ( <>
@@ -305,6 +335,16 @@ export default async function JobsPage({ searchParams }: { searchParams: Promise

{t("jobs.matchReadingStatusShort")}

+ )} diff --git a/apps/backoffice/app/api/jobs/[id]/replay/route.ts b/apps/backoffice/app/api/jobs/[id]/replay/route.ts index ae61478..661bdbe 100644 --- a/apps/backoffice/app/api/jobs/[id]/replay/route.ts +++ b/apps/backoffice/app/api/jobs/[id]/replay/route.ts @@ -1,5 +1,5 @@ import { NextRequest, NextResponse } from "next/server"; -import { apiFetch, IndexJobDto, rebuildIndex, rebuildThumbnails, regenerateThumbnails, startMetadataBatch, startMetadataRefresh, startReadingStatusMatch } from "@/lib/api"; +import { apiFetch, IndexJobDto, rebuildIndex, rebuildThumbnails, regenerateThumbnails, startMetadataBatch, startMetadataRefresh, startReadingStatusMatch, startReadingStatusPush } from "@/lib/api"; export async function POST( _request: NextRequest, @@ -32,6 +32,9 @@ export async function POST( case "reading_status_match": if (!libraryId) return NextResponse.json({ error: "Library ID required for reading status match" }, { status: 400 }); return NextResponse.json(await startReadingStatusMatch(libraryId)); + case "reading_status_push": + if (!libraryId) return NextResponse.json({ error: "Library ID required for reading status push" }, { status: 400 }); + return NextResponse.json(await startReadingStatusPush(libraryId)); default: return NextResponse.json({ error: `Cannot replay job type: ${job.type}` }, { status: 400 }); } diff --git a/apps/backoffice/app/components/JobRow.tsx b/apps/backoffice/app/components/JobRow.tsx index fed94f4..b33e383 100644 --- a/apps/backoffice/app/components/JobRow.tsx +++ b/apps/backoffice/app/components/JobRow.tsx @@ -35,7 +35,7 @@ interface JobRowProps { formatDuration: (start: string, end: string | null) => string; } -const REPLAYABLE_TYPES = new Set(["rebuild", "full_rebuild", "rescan", "scan", "thumbnail_rebuild", "thumbnail_regenerate", "metadata_batch", "metadata_refresh", "reading_status_match"]); +const REPLAYABLE_TYPES = new Set(["rebuild", "full_rebuild", "rescan", "scan", "thumbnail_rebuild", "thumbnail_regenerate", 
"metadata_batch", "metadata_refresh", "reading_status_match", "reading_status_push"]); export function JobRow({ job, libraryName, highlighted, onCancel, onReplay, formatDate, formatDuration }: JobRowProps) { const { t } = useTranslation(); diff --git a/apps/backoffice/app/components/ui/Badge.tsx b/apps/backoffice/app/components/ui/Badge.tsx index 0d9d31a..8774458 100644 --- a/apps/backoffice/app/components/ui/Badge.tsx +++ b/apps/backoffice/app/components/ui/Badge.tsx @@ -118,6 +118,7 @@ export function JobTypeBadge({ type, className = "" }: JobTypeBadgeProps) { metadata_batch: t("jobType.metadata_batch"), metadata_refresh: t("jobType.metadata_refresh"), reading_status_match: t("jobType.reading_status_match"), + reading_status_push: t("jobType.reading_status_push"), }; const label = jobTypeLabels[key] ?? type; return {label}; diff --git a/apps/backoffice/lib/api.ts b/apps/backoffice/lib/api.ts index 1ab7e56..edf8bd9 100644 --- a/apps/backoffice/lib/api.ts +++ b/apps/backoffice/lib/api.ts @@ -1102,6 +1102,43 @@ export async function getReadingStatusMatchResults(jobId: string) { return apiFetch(`/reading-status/match/${jobId}/results`); } +export async function startReadingStatusPush(libraryId: string) { + return apiFetch<{ id: string; status: string }>("/reading-status/push", { + method: "POST", + body: JSON.stringify({ library_id: libraryId }), + }); +} + +export type ReadingStatusPushReportDto = { + job_id: string; + status: string; + total_series: number; + pushed: number; + skipped: number; + no_books: number; + errors: number; +}; + +export type ReadingStatusPushResultDto = { + id: string; + series_name: string; + status: "pushed" | "skipped" | "no_books" | "error"; + anilist_id: number | null; + anilist_title: string | null; + anilist_url: string | null; + anilist_status: string | null; + progress_volumes: number | null; + error_message: string | null; +}; + +export async function getReadingStatusPushReport(jobId: string) { + return 
apiFetch<ReadingStatusPushReportDto>(`/reading-status/push/${jobId}/report`); +} + +export async function getReadingStatusPushResults(jobId: string) { + return apiFetch<ReadingStatusPushResultDto[]>(`/reading-status/push/${jobId}/results`); +} + export type RefreshFieldDiff = { field: string; old?: unknown; diff --git a/apps/backoffice/lib/i18n/en.ts b/apps/backoffice/lib/i18n/en.ts index 6c9b78f..70b56d0 100644 --- a/apps/backoffice/lib/i18n/en.ts +++ b/apps/backoffice/lib/i18n/en.ts @@ -262,6 +262,8 @@ const en: Record<string, string> = { "jobs.groupReadingStatus": "Reading status", "jobs.matchReadingStatus": "Match series", "jobs.matchReadingStatusShort": "Auto-link unmatched series to the reading status provider", + "jobs.pushReadingStatus": "Push reading statuses", + "jobs.pushReadingStatusShort": "Push changed reading statuses to AniList (differential push)", // Jobs list "jobsList.id": "ID", @@ -368,6 +370,12 @@ const en: Record<string, string> = { "jobDetail.readingStatusMatchReport": "Match report", "jobDetail.linked": "Linked", "jobDetail.ambiguous": "Ambiguous", + "jobDetail.readingStatusPush": "Reading status push", + "jobDetail.readingStatusPushDesc": "Differential push of reading statuses to AniList", + "jobDetail.readingStatusPushReport": "Push report", + "jobDetail.pushed": "Pushed", + "jobDetail.skipped": "Skipped", + "jobDetail.noBooks": "No books", // Job types "jobType.rebuild": "Indexing", @@ -397,6 +405,9 @@ const en: Record<string, string> = { "jobType.reading_status_match": "Reading status match", "jobType.reading_status_matchLabel": "Series matching (reading status)", "jobType.reading_status_matchDesc": "Automatically searches each series in the library against the configured reading status provider (e.g. 
AniList) and creates links for unambiguously identified series.", + "jobType.reading_status_push": "Reading status push", + "jobType.reading_status_pushLabel": "Reading status push", + "jobType.reading_status_pushDesc": "Differentially pushes changed reading statuses (or new series) to AniList.", // Status badges "statusBadge.extracting_pages": "Extracting pages", diff --git a/apps/backoffice/lib/i18n/fr.ts b/apps/backoffice/lib/i18n/fr.ts index 38f64aa..a64438d 100644 --- a/apps/backoffice/lib/i18n/fr.ts +++ b/apps/backoffice/lib/i18n/fr.ts @@ -260,6 +260,8 @@ const fr = { "jobs.groupReadingStatus": "Statut de lecture", "jobs.matchReadingStatus": "Correspondance des séries", "jobs.matchReadingStatusShort": "Lier automatiquement les séries non associées au provider", + "jobs.pushReadingStatus": "Push des états de lecture", + "jobs.pushReadingStatusShort": "Envoyer les états de lecture modifiés vers AniList (push différentiel)", // Jobs list "jobsList.id": "ID", @@ -366,6 +368,12 @@ const fr = { "jobDetail.readingStatusMatchReport": "Rapport de correspondance", "jobDetail.linked": "Liées", "jobDetail.ambiguous": "Ambiguës", + "jobDetail.readingStatusPush": "Push des états de lecture", + "jobDetail.readingStatusPushDesc": "Envoi différentiel des états de lecture vers AniList", + "jobDetail.readingStatusPushReport": "Rapport de push", + "jobDetail.pushed": "Envoyés", + "jobDetail.skipped": "Ignorés", + "jobDetail.noBooks": "Sans livres", // Job types "jobType.rebuild": "Indexation", @@ -395,6 +403,9 @@ const fr = { "jobType.reading_status_match": "Correspondance statut lecture", "jobType.reading_status_matchLabel": "Correspondance des séries (statut lecture)", "jobType.reading_status_matchDesc": "Recherche automatiquement chaque série de la bibliothèque sur le provider de statut de lecture configuré (ex. 
AniList) et crée les liens pour les séries identifiées sans ambiguïté.", + "jobType.reading_status_push": "Push statut lecture", + "jobType.reading_status_pushLabel": "Push des états de lecture", + "jobType.reading_status_pushDesc": "Envoie les états de lecture modifiés (ou nouvelles séries) vers AniList de façon différentielle.", // Status badges "statusBadge.extracting_pages": "Extraction des pages", diff --git a/crates/notifications/src/lib.rs b/crates/notifications/src/lib.rs index b8b22da..c24384e 100644 --- a/crates/notifications/src/lib.rs +++ b/crates/notifications/src/lib.rs @@ -47,6 +47,10 @@ pub struct EventToggles { pub reading_status_match_completed: bool, #[serde(default = "default_true")] pub reading_status_match_failed: bool, + #[serde(default = "default_true")] + pub reading_status_push_completed: bool, + #[serde(default = "default_true")] + pub reading_status_push_failed: bool, } fn default_true() -> bool { @@ -69,6 +73,8 @@ fn default_events() -> EventToggles { metadata_refresh_failed: true, reading_status_match_completed: true, reading_status_match_failed: true, + reading_status_push_completed: true, + reading_status_push_failed: true, } } @@ -265,6 +271,16 @@ pub enum NotificationEvent { library_name: Option<String>, error: String, }, + // Reading status push (differential push to AniList) + ReadingStatusPushCompleted { + library_name: Option<String>, + total_series: i32, + pushed: i32, + }, + ReadingStatusPushFailed { + library_name: Option<String>, + error: String, + }, } /// Classify an indexer job_type string into the right event constructor category. 
@@ -511,6 +527,37 @@ fn format_event(event: &NotificationEvent) -> String { ] .join("\n") } + NotificationEvent::ReadingStatusPushCompleted { + library_name, + total_series, + pushed, + } => { + let lib = library_name.as_deref().unwrap_or("All libraries"); + [ + format!("✅ Reading status push completed"), + format!("━━━━━━━━━━━━━━━━━━━━"), + format!("📂 Library: {lib}"), + String::new(), + format!("📊 Results"), + format!(" ⬆️ Pushed: {pushed} / {total_series} series"), + ] + .join("\n") + } + NotificationEvent::ReadingStatusPushFailed { + library_name, + error, + } => { + let lib = library_name.as_deref().unwrap_or("All libraries"); + let err = truncate(error, 200); + [ + format!("🚨 Reading status push failed"), + format!("━━━━━━━━━━━━━━━━━━━━"), + format!("📂 Library: {lib}"), + String::new(), + format!("💬 {err}"), + ] + .join("\n") + } } } @@ -553,6 +600,8 @@ fn is_event_enabled(config: &TelegramConfig, event: &NotificationEvent) -> bool NotificationEvent::MetadataRefreshFailed { .. } => config.events.metadata_refresh_failed, NotificationEvent::ReadingStatusMatchCompleted { .. } => config.events.reading_status_match_completed, NotificationEvent::ReadingStatusMatchFailed { .. } => config.events.reading_status_match_failed, + NotificationEvent::ReadingStatusPushCompleted { .. } => config.events.reading_status_push_completed, + NotificationEvent::ReadingStatusPushFailed { .. 
} => config.events.reading_status_push_failed, } } diff --git a/infra/migrations/0057_add_reading_status_push_job_type.sql b/infra/migrations/0057_add_reading_status_push_job_type.sql new file mode 100644 index 0000000..ec6c3b8 --- /dev/null +++ b/infra/migrations/0057_add_reading_status_push_job_type.sql @@ -0,0 +1,4 @@ +ALTER TABLE index_jobs + DROP CONSTRAINT IF EXISTS index_jobs_type_check, + ADD CONSTRAINT index_jobs_type_check + CHECK (type IN ('scan', 'rebuild', 'full_rebuild', 'rescan', 'thumbnail_rebuild', 'thumbnail_regenerate', 'cbr_to_cbz', 'metadata_batch', 'metadata_refresh', 'reading_status_match', 'reading_status_push')); diff --git a/infra/migrations/0058_add_reading_status_push_results.sql b/infra/migrations/0058_add_reading_status_push_results.sql new file mode 100644 index 0000000..a50080f --- /dev/null +++ b/infra/migrations/0058_add_reading_status_push_results.sql @@ -0,0 +1,19 @@ +CREATE TABLE reading_status_push_results ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + job_id UUID NOT NULL REFERENCES index_jobs(id) ON DELETE CASCADE, + library_id UUID NOT NULL REFERENCES libraries(id) ON DELETE CASCADE, + series_name TEXT NOT NULL, + -- 'pushed' | 'skipped' | 'no_books' | 'error' + status TEXT NOT NULL, + anilist_id INTEGER, + anilist_title TEXT, + anilist_url TEXT, + -- what was actually sent to AniList (NULL when skipped/error) + anilist_status TEXT, + progress_volumes INTEGER, + error_message TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_rspr_job_id ON reading_status_push_results(job_id); +CREATE INDEX idx_rspr_status ON reading_status_push_results(status);