From f3960666faf456f7ca0d992ae43e32d7e9fe31ea Mon Sep 17 00:00:00 2001 From: Froidefond Julien Date: Wed, 25 Mar 2026 12:46:48 +0100 Subject: [PATCH] feat: add reading_status_push auto-refresh schedule per library - Migration 0059: reading_status_push_mode / last / next columns on libraries - API: update_reading_status_provider accepts push_mode and calculates next_push_at - job_poller: handles reading_status_push pending jobs - Indexer scheduler: check_and_schedule_reading_status_push every minute - Backoffice: schedule select in library settings modal Co-Authored-By: Claude Sonnet 4.6 --- apps/api/src/job_poller.rs | 21 ++++++- apps/api/src/libraries.rs | 56 ++++++++++++++--- apps/api/src/reading_status_push.rs | 2 +- apps/backoffice/app/(app)/libraries/page.tsx | 1 + .../app/components/LibraryActions.tsx | 24 ++++++- apps/backoffice/lib/api.ts | 2 + apps/backoffice/lib/i18n/en.ts | 2 + apps/backoffice/lib/i18n/fr.ts | 2 + apps/indexer/src/scheduler.rs | 62 +++++++++++++++++++ apps/indexer/src/worker.rs | 3 + .../0059_add_reading_status_push_schedule.sql | 4 ++ 11 files changed, 166 insertions(+), 13 deletions(-) create mode 100644 infra/migrations/0059_add_reading_status_push_schedule.sql diff --git a/apps/api/src/job_poller.rs b/apps/api/src/job_poller.rs index 43c3d07..6421ff1 100644 --- a/apps/api/src/job_poller.rs +++ b/apps/api/src/job_poller.rs @@ -4,7 +4,7 @@ use sqlx::{PgPool, Row}; use tracing::{error, info, trace}; use uuid::Uuid; -use crate::{metadata_batch, metadata_refresh}; +use crate::{metadata_batch, metadata_refresh, reading_status_push}; /// Poll for pending API-only jobs (`metadata_batch`, `metadata_refresh`) and process them. /// This mirrors the indexer's worker loop but for job types handled by the API. @@ -43,6 +43,14 @@ pub async fn run_job_poller(pool: PgPool, interval_seconds: u64) { ) .await } + "reading_status_push" => { + reading_status_push::process_reading_status_push( + &pool_clone, + job_id, + library_id, + ) + .await + } _ => Err(format!("Unknown API job type: {job_type}")), }; @@ -75,6 +83,15 @@ pub async fn run_job_poller(pool: PgPool, interval_seconds: u64) { }, ); } + "reading_status_push" => { + notifications::notify( + pool_clone, + notifications::NotificationEvent::ReadingStatusPushFailed { + library_name, + error: e.to_string(), + }, + ); + } _ => {} } } @@ -92,7 +109,7 @@ pub async fn run_job_poller(pool: PgPool, interval_seconds: u64) { } } -const API_JOB_TYPES: &[&str] = &["metadata_batch", "metadata_refresh"]; +const API_JOB_TYPES: &[&str] = &["metadata_batch", "metadata_refresh", "reading_status_push"]; async fn claim_next_api_job(pool: &PgPool) -> Result, sqlx::Error> { let mut tx = pool.begin().await?; diff --git a/apps/api/src/libraries.rs b/apps/api/src/libraries.rs index 370625e..2a2c394 100644 --- a/apps/api/src/libraries.rs +++ b/apps/api/src/libraries.rs @@ -31,6 +31,9 @@ pub struct LibraryResponse { #[schema(value_type = Vec)] pub thumbnail_book_ids: Vec, pub reading_status_provider: Option, + pub reading_status_push_mode: String, + #[schema(value_type = Option)] + pub next_reading_status_push_at: Option>, } #[derive(Deserialize, ToSchema)] @@ -54,7 +57,7 @@ pub struct CreateLibraryRequest { )] pub async fn list_libraries(State(state): State) -> Result>, ApiError> { let rows = sqlx::query( - "SELECT l.id, l.name, l.root_path, l.enabled, l.monitor_enabled, l.scan_mode, l.next_scan_at, l.watcher_enabled, l.metadata_provider, l.fallback_metadata_provider, l.metadata_refresh_mode, l.next_metadata_refresh_at, l.reading_status_provider, + "SELECT l.id, l.name, l.root_path, l.enabled, l.monitor_enabled, l.scan_mode, l.next_scan_at, l.watcher_enabled, l.metadata_provider, l.fallback_metadata_provider, l.metadata_refresh_mode, l.next_metadata_refresh_at, l.reading_status_provider, l.reading_status_push_mode, l.next_reading_status_push_at, (SELECT COUNT(*) FROM books b WHERE b.library_id = l.id) as book_count, (SELECT COUNT(DISTINCT COALESCE(NULLIF(b.series, ''), 'unclassified')) FROM books b WHERE b.library_id = l.id) as series_count, COALESCE(( @@ -94,6 +97,8 @@ pub async fn list_libraries(State(state): State) -> Result, + pub reading_status_push_mode: Option, } /// Update the reading status provider for a library @@ -506,15 +518,41 @@ pub async fn update_reading_status_provider( Json(input): Json, ) -> Result, ApiError> { let provider = input.reading_status_provider.as_deref().filter(|s| !s.is_empty()); - let result = sqlx::query("UPDATE libraries SET reading_status_provider = $2 WHERE id = $1") - .bind(library_id) - .bind(provider) - .execute(&state.pool) - .await?; + + let valid_modes = ["manual", "hourly", "daily", "weekly"]; + let push_mode = input.reading_status_push_mode.as_deref().unwrap_or("manual"); + if !valid_modes.contains(&push_mode) { + return Err(ApiError::bad_request("reading_status_push_mode must be one of: manual, hourly, daily, weekly")); + } + + let next_push_at = if push_mode != "manual" { + let interval_minutes: i64 = match push_mode { + "hourly" => 60, + "daily" => 1440, + "weekly" => 10080, + _ => 1440, + }; + Some(chrono::Utc::now() + chrono::Duration::minutes(interval_minutes)) + } else { + None + }; + + let result = sqlx::query( + "UPDATE libraries SET reading_status_provider = $2, reading_status_push_mode = $3, next_reading_status_push_at = $4 WHERE id = $1" + ) + .bind(library_id) + .bind(provider) + .bind(push_mode) + .bind(next_push_at) + .execute(&state.pool) + .await?; if result.rows_affected() == 0 { return Err(ApiError::not_found("library not found")); } - Ok(Json(serde_json::json!({ "reading_status_provider": provider }))) + Ok(Json(serde_json::json!({ + "reading_status_provider": provider, + "reading_status_push_mode": push_mode, + }))) } diff --git a/apps/api/src/reading_status_push.rs b/apps/api/src/reading_status_push.rs index abf9353..aff1a35 100644 --- a/apps/api/src/reading_status_push.rs +++ b/apps/api/src/reading_status_push.rs @@ -296,7 +296,7 @@ struct SeriesInfo { anilist_url: Option, } -async fn process_reading_status_push( +pub async fn process_reading_status_push( pool: &PgPool, job_id: Uuid, library_id: Uuid, diff --git a/apps/backoffice/app/(app)/libraries/page.tsx b/apps/backoffice/app/(app)/libraries/page.tsx index b9a0d83..9176c8e 100644 --- a/apps/backoffice/app/(app)/libraries/page.tsx +++ b/apps/backoffice/app/(app)/libraries/page.tsx @@ -147,6 +147,7 @@ export default async function LibrariesPage() { fallbackMetadataProvider={lib.fallback_metadata_provider} metadataRefreshMode={lib.metadata_refresh_mode} readingStatusProvider={lib.reading_status_provider} + readingStatusPushMode={lib.reading_status_push_mode} />
diff --git a/apps/backoffice/app/components/LibraryActions.tsx b/apps/backoffice/app/components/LibraryActions.tsx index ec79e86..a384e69 100644 --- a/apps/backoffice/app/components/LibraryActions.tsx +++ b/apps/backoffice/app/components/LibraryActions.tsx @@ -15,6 +15,7 @@ interface LibraryActionsProps { fallbackMetadataProvider: string | null; metadataRefreshMode: string; readingStatusProvider: string | null; + readingStatusPushMode: string; onUpdate?: () => void; } @@ -27,6 +28,7 @@ export function LibraryActions({ fallbackMetadataProvider, metadataRefreshMode, readingStatusProvider, + readingStatusPushMode, }: LibraryActionsProps) { const { t } = useTranslation(); const [isOpen, setIsOpen] = useState(false); @@ -43,6 +45,7 @@ export function LibraryActions({ const newFallbackProvider = (formData.get("fallback_metadata_provider") as string) || null; const newMetadataRefreshMode = formData.get("metadata_refresh_mode") as string; const newReadingStatusProvider = (formData.get("reading_status_provider") as string) || null; + const newReadingStatusPushMode = (formData.get("reading_status_push_mode") as string) || "manual"; try { const [response] = await Promise.all([ @@ -64,7 +67,10 @@ export function LibraryActions({ fetch(`/api/libraries/${libraryId}/reading-status-provider`, { method: "PATCH", headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ reading_status_provider: newReadingStatusProvider }), + body: JSON.stringify({ + reading_status_provider: newReadingStatusProvider, + reading_status_push_mode: newReadingStatusPushMode, + }), }), ]); @@ -289,6 +295,22 @@ export function LibraryActions({

{t("libraryActions.readingStatusProviderDesc")}

+
+
+ + +
+

{t("libraryActions.readingStatusPushScheduleDesc")}

+
{saveError && ( diff --git a/apps/backoffice/lib/api.ts b/apps/backoffice/lib/api.ts index edf8bd9..c512fe7 100644 --- a/apps/backoffice/lib/api.ts +++ b/apps/backoffice/lib/api.ts @@ -15,6 +15,8 @@ export type LibraryDto = { series_count: number; thumbnail_book_ids: string[]; reading_status_provider: string | null; + reading_status_push_mode: string; + next_reading_status_push_at: string | null; }; export type IndexJobDto = { diff --git a/apps/backoffice/lib/i18n/en.ts b/apps/backoffice/lib/i18n/en.ts index 597e936..86cb1f5 100644 --- a/apps/backoffice/lib/i18n/en.ts +++ b/apps/backoffice/lib/i18n/en.ts @@ -200,6 +200,8 @@ const en: Record = { "libraryActions.sectionReadingStatus": "Reading Status", "libraryActions.readingStatusProvider": "Reading Status Provider", "libraryActions.readingStatusProviderDesc": "Syncs reading states (read / reading / planned) with an external service", + "libraryActions.readingStatusPushSchedule": "Auto-push schedule", + "libraryActions.readingStatusPushScheduleDesc": "Automatically push reading progress to the provider on a schedule", // Reading status modal "readingStatus.button": "Reading status", diff --git a/apps/backoffice/lib/i18n/fr.ts b/apps/backoffice/lib/i18n/fr.ts index 88f2100..863dbb7 100644 --- a/apps/backoffice/lib/i18n/fr.ts +++ b/apps/backoffice/lib/i18n/fr.ts @@ -198,6 +198,8 @@ const fr = { "libraryActions.sectionReadingStatus": "État de lecture", "libraryActions.readingStatusProvider": "Provider d'état de lecture", "libraryActions.readingStatusProviderDesc": "Synchronise les états de lecture (lu / en cours / planifié) avec un service externe", + "libraryActions.readingStatusPushSchedule": "Synchronisation automatique", + "libraryActions.readingStatusPushScheduleDesc": "Pousse automatiquement la progression de lecture vers le provider selon un calendrier", // Reading status modal "readingStatus.button": "État de lecture", diff --git a/apps/indexer/src/scheduler.rs b/apps/indexer/src/scheduler.rs index d010181..2c4c174 100644 --- a/apps/indexer/src/scheduler.rs +++ b/apps/indexer/src/scheduler.rs @@ -66,6 +66,68 @@ pub async fn check_and_schedule_auto_scans(pool: &PgPool) -> Result<()> { Ok(()) } +pub async fn check_and_schedule_reading_status_push(pool: &PgPool) -> Result<()> { + let libraries = sqlx::query( + r#" + SELECT id, reading_status_push_mode + FROM libraries + WHERE reading_status_push_mode != 'manual' + AND reading_status_provider IS NOT NULL + AND ( + next_reading_status_push_at IS NULL + OR next_reading_status_push_at <= NOW() + ) + AND NOT EXISTS ( + SELECT 1 FROM index_jobs + WHERE library_id = libraries.id + AND type = 'reading_status_push' + AND status IN ('pending', 'running') + ) + AND EXISTS ( + SELECT 1 FROM anilist_series_links + WHERE library_id = libraries.id + ) + "# + ) + .fetch_all(pool) + .await?; + + for row in libraries { + let library_id: Uuid = row.get("id"); + let push_mode: String = row.get("reading_status_push_mode"); + + info!("[SCHEDULER] Auto-pushing reading status for library {} (mode: {})", library_id, push_mode); + + let job_id = Uuid::new_v4(); + sqlx::query( + "INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, 'reading_status_push', 'pending')" + ) + .bind(job_id) + .bind(library_id) + .execute(pool) + .await?; + + let interval_minutes: i64 = match push_mode.as_str() { + "hourly" => 60, + "daily" => 1440, + "weekly" => 10080, + _ => 1440, + }; + + sqlx::query( + "UPDATE libraries SET last_reading_status_push_at = NOW(), next_reading_status_push_at = NOW() + INTERVAL '1 minute' * $2 WHERE id = $1" + ) + .bind(library_id) + .bind(interval_minutes) + .execute(pool) + .await?; + + info!("[SCHEDULER] Created reading_status_push job {} for library {}", job_id, library_id); + } + + Ok(()) +} + pub async fn check_and_schedule_metadata_refreshes(pool: &PgPool) -> Result<()> { let libraries = sqlx::query( r#" diff --git a/apps/indexer/src/worker.rs b/apps/indexer/src/worker.rs index 8e793ae..cc65523 100644 --- a/apps/indexer/src/worker.rs +++ b/apps/indexer/src/worker.rs @@ -32,6 +32,9 @@ pub async fn run_worker(state: AppState, interval_seconds: u64) { if let Err(err) = scheduler::check_and_schedule_metadata_refreshes(&scheduler_state.pool).await { error!("[SCHEDULER] Metadata refresh error: {}", err); } + if let Err(err) = scheduler::check_and_schedule_reading_status_push(&scheduler_state.pool).await { + error!("[SCHEDULER] Reading status push error: {}", err); + } tokio::time::sleep(scheduler_wait).await; } }); diff --git a/infra/migrations/0059_add_reading_status_push_schedule.sql b/infra/migrations/0059_add_reading_status_push_schedule.sql new file mode 100644 index 0000000..8fe1df4 --- /dev/null +++ b/infra/migrations/0059_add_reading_status_push_schedule.sql @@ -0,0 +1,4 @@ +ALTER TABLE libraries + ADD COLUMN reading_status_push_mode TEXT NOT NULL DEFAULT 'manual', + ADD COLUMN last_reading_status_push_at TIMESTAMPTZ, + ADD COLUMN next_reading_status_push_at TIMESTAMPTZ;