From e0d94758af8e5cf64c0dd3cda0f9df627bb259c3 Mon Sep 17 00:00:00 2001 From: Froidefond Julien Date: Wed, 25 Mar 2026 13:57:59 +0100 Subject: [PATCH] feat: add per-library download detection auto-schedule MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a configurable schedule (manual/hourly/daily/weekly) for the download detection job in the library settings modal. The indexer scheduler triggers the job automatically, and the API job poller processes it — consistent with the reading_status_push pattern. Co-Authored-By: Claude Sonnet 4.6 --- apps/api/src/download_detection.rs | 60 ++++++++-------- apps/api/src/job_poller.rs | 22 +++++- apps/api/src/libraries.rs | 40 ++++++++++- apps/backoffice/app/(app)/libraries/page.tsx | 1 + .../api/libraries/[id]/monitoring/route.ts | 4 +- .../app/components/LibraryActions.tsx | 32 +++++++++ apps/backoffice/lib/api.ts | 7 ++ apps/backoffice/lib/i18n/en.ts | 3 + apps/backoffice/lib/i18n/fr.ts | 3 + apps/indexer/src/scheduler.rs | 69 +++++++++++++++++++ apps/indexer/src/worker.rs | 3 + .../0061_add_download_detection_schedule.sql | 3 + 12 files changed, 212 insertions(+), 35 deletions(-) create mode 100644 infra/migrations/0061_add_download_detection_schedule.sql diff --git a/apps/api/src/download_detection.rs b/apps/api/src/download_detection.rs index bafa7cb..4034eea 100644 --- a/apps/api/src/download_detection.rs +++ b/apps/api/src/download_detection.rs @@ -119,34 +119,22 @@ pub async fn start_detection( .flatten(); tokio::spawn(async move { - match process_download_detection(&pool, job_id, library_id).await { - Ok((total_series, found)) => { - notifications::notify( - pool, - notifications::NotificationEvent::DownloadDetectionCompleted { - library_name, - total_series, - found, - }, - ); - } - Err(e) => { - warn!("[DOWNLOAD_DETECTION] job {job_id} failed: {e}"); - let _ = sqlx::query( - "UPDATE index_jobs SET status = 'failed', error_opt = $2, finished_at = NOW() WHERE id = $1", - ) - .bind(job_id) - .bind(e.to_string()) - .execute(&pool) - .await; - notifications::notify( - pool, - notifications::NotificationEvent::DownloadDetectionFailed { - library_name, - error: e.to_string(), - }, - ); - } + if let Err(e) = process_download_detection(&pool, job_id, library_id).await { + warn!("[DOWNLOAD_DETECTION] job {job_id} failed: {e}"); + let _ = sqlx::query( + "UPDATE index_jobs SET status = 'failed', error_opt = $2, finished_at = NOW() WHERE id = $1", + ) + .bind(job_id) + .bind(e.to_string()) + .execute(&pool) + .await; + notifications::notify( + pool, + notifications::NotificationEvent::DownloadDetectionFailed { + library_name, + error: e.to_string(), + }, + ); } }); @@ -500,6 +488,22 @@ pub(crate) async fn process_download_detection( "[DOWNLOAD_DETECTION] job={job_id} completed: {total} series, found={count_found}, not_found={count_not_found}, no_missing={count_no_missing}, no_metadata={count_no_metadata}, errors={count_errors}" ); + let library_name: Option = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1") + .bind(library_id) + .fetch_optional(pool) + .await + .ok() + .flatten(); + + notifications::notify( + pool.clone(), + notifications::NotificationEvent::DownloadDetectionCompleted { + library_name, + total_series: total, + found: count_found, + }, + ); + Ok((total, count_found)) } diff --git a/apps/api/src/job_poller.rs b/apps/api/src/job_poller.rs index 6421ff1..8104db8 100644 --- a/apps/api/src/job_poller.rs +++ b/apps/api/src/job_poller.rs @@ -4,7 +4,7 @@ use sqlx::{PgPool, Row}; use tracing::{error, info, trace}; use uuid::Uuid; -use crate::{metadata_batch, metadata_refresh, reading_status_push}; +use crate::{download_detection, metadata_batch, metadata_refresh, reading_status_push}; /// Poll for pending API-only jobs (`metadata_batch`, `metadata_refresh`) and process them. /// This mirrors the indexer's worker loop but for job types handled by the API. @@ -51,6 +51,15 @@ pub async fn run_job_poller(pool: PgPool, interval_seconds: u64) { ) .await } + "download_detection" => { + download_detection::process_download_detection( + &pool_clone, + job_id, + library_id, + ) + .await + .map(|_| ()) + } _ => Err(format!("Unknown API job type: {job_type}")), }; @@ -92,6 +101,15 @@ pub async fn run_job_poller(pool: PgPool, interval_seconds: u64) { }, ); } + "download_detection" => { + notifications::notify( + pool_clone, + notifications::NotificationEvent::DownloadDetectionFailed { + library_name, + error: e.to_string(), + }, + ); + } _ => {} } } @@ -109,7 +127,7 @@ pub async fn run_job_poller(pool: PgPool, interval_seconds: u64) { } } -const API_JOB_TYPES: &[&str] = &["metadata_batch", "metadata_refresh", "reading_status_push"]; +const API_JOB_TYPES: &[&str] = &["metadata_batch", "metadata_refresh", "reading_status_push", "download_detection"]; async fn claim_next_api_job(pool: &PgPool) -> Result, sqlx::Error> { let mut tx = pool.begin().await?; diff --git a/apps/api/src/libraries.rs b/apps/api/src/libraries.rs index 2a2c394..96d0677 100644 --- a/apps/api/src/libraries.rs +++ b/apps/api/src/libraries.rs @@ -34,6 +34,9 @@ pub struct LibraryResponse { pub reading_status_push_mode: String, #[schema(value_type = Option)] pub next_reading_status_push_at: Option>, + pub download_detection_mode: String, + #[schema(value_type = Option)] + pub next_download_detection_at: Option>, } #[derive(Deserialize, ToSchema)] @@ -57,7 +60,7 @@ pub struct CreateLibraryRequest { )] pub async fn list_libraries(State(state): State) -> Result>, ApiError> { let rows = sqlx::query( - "SELECT l.id, l.name, l.root_path, l.enabled, l.monitor_enabled, l.scan_mode, l.next_scan_at, l.watcher_enabled, l.metadata_provider, l.fallback_metadata_provider, l.metadata_refresh_mode, l.next_metadata_refresh_at, l.reading_status_provider, l.reading_status_push_mode, l.next_reading_status_push_at, + "SELECT l.id, l.name, l.root_path, l.enabled, l.monitor_enabled, l.scan_mode, l.next_scan_at, l.watcher_enabled, l.metadata_provider, l.fallback_metadata_provider, l.metadata_refresh_mode, l.next_metadata_refresh_at, l.reading_status_provider, l.reading_status_push_mode, l.next_reading_status_push_at, l.download_detection_mode, l.next_download_detection_at, (SELECT COUNT(*) FROM books b WHERE b.library_id = l.id) as book_count, (SELECT COUNT(DISTINCT COALESCE(NULLIF(b.series, ''), 'unclassified')) FROM books b WHERE b.library_id = l.id) as series_count, COALESCE(( @@ -99,6 +102,8 @@ pub async fn list_libraries(State(state): State) -> Result, #[schema(value_type = Option, example = "daily")] pub metadata_refresh_mode: Option, // 'manual', 'hourly', 'daily', 'weekly' + #[schema(value_type = Option, example = "daily")] + pub download_detection_mode: Option, // 'manual', 'hourly', 'daily', 'weekly' } /// Update monitoring settings for a library @@ -317,6 +326,12 @@ pub async fn update_monitoring( return Err(ApiError::bad_request("metadata_refresh_mode must be one of: manual, hourly, daily, weekly")); } + // Validate download_detection_mode + let download_detection_mode = input.download_detection_mode.as_deref().unwrap_or("manual"); + if !valid_modes.contains(&download_detection_mode) { + return Err(ApiError::bad_request("download_detection_mode must be one of: manual, hourly, daily, weekly")); + } + // Calculate next_scan_at if monitoring is enabled let next_scan_at = if input.monitor_enabled { let interval_minutes = match input.scan_mode.as_str() { @@ -343,10 +358,23 @@ pub async fn update_monitoring( None }; + // Calculate next_download_detection_at + let next_download_detection_at = if download_detection_mode != "manual" { + let interval_minutes = match download_detection_mode { + "hourly" => 60, + "daily" => 1440, + "weekly" => 10080, + _ => 1440, + }; + Some(chrono::Utc::now() + chrono::Duration::minutes(interval_minutes)) + } else { + None + }; + let watcher_enabled = input.watcher_enabled.unwrap_or(false); let result = sqlx::query( - "UPDATE libraries SET monitor_enabled = $2, scan_mode = $3, next_scan_at = $4, watcher_enabled = $5, metadata_refresh_mode = $6, next_metadata_refresh_at = $7 WHERE id = $1 RETURNING id, name, root_path, enabled, monitor_enabled, scan_mode, next_scan_at, watcher_enabled, metadata_provider, fallback_metadata_provider, metadata_refresh_mode, next_metadata_refresh_at, reading_status_provider, reading_status_push_mode, next_reading_status_push_at" + "UPDATE libraries SET monitor_enabled = $2, scan_mode = $3, next_scan_at = $4, watcher_enabled = $5, metadata_refresh_mode = $6, next_metadata_refresh_at = $7, download_detection_mode = $8, next_download_detection_at = $9 WHERE id = $1 RETURNING id, name, root_path, enabled, monitor_enabled, scan_mode, next_scan_at, watcher_enabled, metadata_provider, fallback_metadata_provider, metadata_refresh_mode, next_metadata_refresh_at, reading_status_provider, reading_status_push_mode, next_reading_status_push_at, download_detection_mode, next_download_detection_at" ) .bind(library_id) .bind(input.monitor_enabled) @@ -355,6 +383,8 @@ pub async fn update_monitoring( .bind(watcher_enabled) .bind(metadata_refresh_mode) .bind(next_metadata_refresh_at) + .bind(download_detection_mode) + .bind(next_download_detection_at) .fetch_optional(&state.pool) .await?; @@ -402,6 +432,8 @@ pub async fn update_monitoring( reading_status_provider: row.get("reading_status_provider"), reading_status_push_mode: row.get("reading_status_push_mode"), next_reading_status_push_at: row.get("next_reading_status_push_at"), + download_detection_mode: row.get("download_detection_mode"), + next_download_detection_at: row.get("next_download_detection_at"), })) } @@ -437,7 +469,7 @@ pub async fn update_metadata_provider( let fallback = input.fallback_metadata_provider.as_deref().filter(|s| !s.is_empty()); let result = sqlx::query( - "UPDATE libraries SET metadata_provider = $2, fallback_metadata_provider = $3 WHERE id = $1 RETURNING id, name, root_path, enabled, monitor_enabled, scan_mode, next_scan_at, watcher_enabled, metadata_provider, fallback_metadata_provider, metadata_refresh_mode, next_metadata_refresh_at, reading_status_provider, reading_status_push_mode, next_reading_status_push_at" + "UPDATE libraries SET metadata_provider = $2, fallback_metadata_provider = $3 WHERE id = $1 RETURNING id, name, root_path, enabled, monitor_enabled, scan_mode, next_scan_at, watcher_enabled, metadata_provider, fallback_metadata_provider, metadata_refresh_mode, next_metadata_refresh_at, reading_status_provider, reading_status_push_mode, next_reading_status_push_at, download_detection_mode, next_download_detection_at" ) .bind(library_id) .bind(provider) @@ -489,6 +521,8 @@ pub async fn update_metadata_provider( reading_status_provider: row.get("reading_status_provider"), reading_status_push_mode: row.get("reading_status_push_mode"), next_reading_status_push_at: row.get("next_reading_status_push_at"), + download_detection_mode: row.get("download_detection_mode"), + next_download_detection_at: row.get("next_download_detection_at"), })) } diff --git a/apps/backoffice/app/(app)/libraries/page.tsx b/apps/backoffice/app/(app)/libraries/page.tsx index 9176c8e..1a7d431 100644 --- a/apps/backoffice/app/(app)/libraries/page.tsx +++ b/apps/backoffice/app/(app)/libraries/page.tsx @@ -148,6 +148,7 @@ export default async function LibrariesPage() { metadataRefreshMode={lib.metadata_refresh_mode} readingStatusProvider={lib.reading_status_provider} readingStatusPushMode={lib.reading_status_push_mode} + downloadDetectionMode={lib.download_detection_mode ?? "manual"} />
diff --git a/apps/backoffice/app/api/libraries/[id]/monitoring/route.ts b/apps/backoffice/app/api/libraries/[id]/monitoring/route.ts index f26a554..91be35e 100644 --- a/apps/backoffice/app/api/libraries/[id]/monitoring/route.ts +++ b/apps/backoffice/app/api/libraries/[id]/monitoring/route.ts @@ -8,8 +8,8 @@ export async function PATCH( ) { const { id } = await params; try { - const { monitor_enabled, scan_mode, watcher_enabled, metadata_refresh_mode } = await request.json(); - const data = await updateLibraryMonitoring(id, monitor_enabled, scan_mode, watcher_enabled, metadata_refresh_mode); + const { monitor_enabled, scan_mode, watcher_enabled, metadata_refresh_mode, download_detection_mode } = await request.json(); + const data = await updateLibraryMonitoring(id, monitor_enabled, scan_mode, watcher_enabled, metadata_refresh_mode, download_detection_mode); revalidatePath("/libraries"); return NextResponse.json(data); } catch (error) { diff --git a/apps/backoffice/app/components/LibraryActions.tsx b/apps/backoffice/app/components/LibraryActions.tsx index a384e69..dedc642 100644 --- a/apps/backoffice/app/components/LibraryActions.tsx +++ b/apps/backoffice/app/components/LibraryActions.tsx @@ -16,6 +16,7 @@ interface LibraryActionsProps { metadataRefreshMode: string; readingStatusProvider: string | null; readingStatusPushMode: string; + downloadDetectionMode: string; onUpdate?: () => void; } @@ -29,6 +30,7 @@ export function LibraryActions({ metadataRefreshMode, readingStatusProvider, readingStatusPushMode, + downloadDetectionMode, }: LibraryActionsProps) { const { t } = useTranslation(); const [isOpen, setIsOpen] = useState(false); @@ -46,6 +48,7 @@ export function LibraryActions({ const newMetadataRefreshMode = formData.get("metadata_refresh_mode") as string; const newReadingStatusProvider = (formData.get("reading_status_provider") as string) || null; const newReadingStatusPushMode = (formData.get("reading_status_push_mode") as string) || "manual"; + const newDownloadDetectionMode = (formData.get("download_detection_mode") as string) || "manual"; try { const [response] = await Promise.all([ @@ -57,6 +60,7 @@ export function LibraryActions({ scan_mode: scanMode, watcher_enabled: watcherEnabled, metadata_refresh_mode: newMetadataRefreshMode, + download_detection_mode: newDownloadDetectionMode, }), }), fetch(`/api/libraries/${libraryId}/metadata-provider`, { @@ -313,6 +317,34 @@ export function LibraryActions({ +
+ + {/* Section: Prowlarr */} +
+

+ + + + {t("libraryActions.sectionProwlarr")} +

+
+
+ + +
+

{t("libraryActions.downloadDetectionScheduleDesc")}

+
+
+ {saveError && (

{saveError} diff --git a/apps/backoffice/lib/api.ts b/apps/backoffice/lib/api.ts index 638746c..894293b 100644 --- a/apps/backoffice/lib/api.ts +++ b/apps/backoffice/lib/api.ts @@ -17,6 +17,8 @@ export type LibraryDto = { reading_status_provider: string | null; reading_status_push_mode: string; next_reading_status_push_at: string | null; + download_detection_mode: string; + next_download_detection_at: string | null; }; export type IndexJobDto = { @@ -301,12 +303,14 @@ export async function updateLibraryMonitoring( scanMode: string, watcherEnabled?: boolean, metadataRefreshMode?: string, + downloadDetectionMode?: string, ) { const body: { monitor_enabled: boolean; scan_mode: string; watcher_enabled?: boolean; metadata_refresh_mode?: string; + download_detection_mode?: string; } = { monitor_enabled: monitorEnabled, scan_mode: scanMode, @@ -317,6 +321,9 @@ export async function updateLibraryMonitoring( if (metadataRefreshMode !== undefined) { body.metadata_refresh_mode = metadataRefreshMode; } + if (downloadDetectionMode !== undefined) { + body.download_detection_mode = downloadDetectionMode; + } return apiFetch(`/libraries/${libraryId}/monitoring`, { method: "PATCH", body: JSON.stringify(body), diff --git a/apps/backoffice/lib/i18n/en.ts b/apps/backoffice/lib/i18n/en.ts index ac44cbe..8353eb4 100644 --- a/apps/backoffice/lib/i18n/en.ts +++ b/apps/backoffice/lib/i18n/en.ts @@ -202,6 +202,9 @@ const en: Record = { "libraryActions.readingStatusProviderDesc": "Syncs reading states (read / reading / planned) with an external service", "libraryActions.readingStatusPushSchedule": "Auto-push schedule", "libraryActions.readingStatusPushScheduleDesc": "Automatically push reading progress to the provider on a schedule", + "libraryActions.sectionProwlarr": "Download detection", + "libraryActions.downloadDetectionSchedule": "Auto-detection schedule", + "libraryActions.downloadDetectionScheduleDesc": "Automatically run missing volume detection via Prowlarr on a schedule", // Reading status modal "readingStatus.button": "Reading status", diff --git a/apps/backoffice/lib/i18n/fr.ts b/apps/backoffice/lib/i18n/fr.ts index 9eb4200..445f89b 100644 --- a/apps/backoffice/lib/i18n/fr.ts +++ b/apps/backoffice/lib/i18n/fr.ts @@ -200,6 +200,9 @@ const fr = { "libraryActions.readingStatusProviderDesc": "Synchronise les états de lecture (lu / en cours / planifié) avec un service externe", "libraryActions.readingStatusPushSchedule": "Synchronisation automatique", "libraryActions.readingStatusPushScheduleDesc": "Pousse automatiquement la progression de lecture vers le provider selon un calendrier", + "libraryActions.sectionProwlarr": "Détection de téléchargements", + "libraryActions.downloadDetectionSchedule": "Détection automatique", + "libraryActions.downloadDetectionScheduleDesc": "Lance automatiquement la détection de volumes manquants via Prowlarr selon un calendrier", // Reading status modal "readingStatus.button": "État de lecture", diff --git a/apps/indexer/src/scheduler.rs b/apps/indexer/src/scheduler.rs index 2c4c174..65820ac 100644 --- a/apps/indexer/src/scheduler.rs +++ b/apps/indexer/src/scheduler.rs @@ -128,6 +128,75 @@ pub async fn check_and_schedule_reading_status_push(pool: &PgPool) -> Result<()> Ok(()) } +pub async fn check_and_schedule_download_detection(pool: &PgPool) -> Result<()> { + // Only schedule if Prowlarr is configured + let prowlarr_configured: bool = sqlx::query_scalar( + "SELECT EXISTS(SELECT 1 FROM settings WHERE key = 'prowlarr' AND value->>'base_url' IS NOT NULL AND value->>'base_url' != '')" + ) + .fetch_one(pool) + .await + .unwrap_or(false); + + if !prowlarr_configured { + return Ok(()); + } + + let libraries = sqlx::query( + r#" + SELECT id, download_detection_mode + FROM libraries + WHERE download_detection_mode != 'manual' + AND ( + next_download_detection_at IS NULL + OR next_download_detection_at <= NOW() + ) + AND NOT EXISTS ( + SELECT 1 FROM index_jobs + WHERE library_id = libraries.id + AND type = 'download_detection' + AND status IN ('pending', 'running') + ) + "# + ) + .fetch_all(pool) + .await?; + + for row in libraries { + let library_id: Uuid = row.get("id"); + let detection_mode: String = row.get("download_detection_mode"); + + info!("[SCHEDULER] Auto-running download detection for library {} (mode: {})", library_id, detection_mode); + + let job_id = Uuid::new_v4(); + sqlx::query( + "INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, 'download_detection', 'pending')" + ) + .bind(job_id) + .bind(library_id) + .execute(pool) + .await?; + + let interval_minutes: i64 = match detection_mode.as_str() { + "hourly" => 60, + "daily" => 1440, + "weekly" => 10080, + _ => 1440, + }; + + sqlx::query( + "UPDATE libraries SET last_download_detection_at = NOW(), next_download_detection_at = NOW() + INTERVAL '1 minute' * $2 WHERE id = $1" + ) + .bind(library_id) + .bind(interval_minutes) + .execute(pool) + .await?; + + info!("[SCHEDULER] Created download_detection job {} for library {}", job_id, library_id); + } + + Ok(()) +} + pub async fn check_and_schedule_metadata_refreshes(pool: &PgPool) -> Result<()> { let libraries = sqlx::query( r#" diff --git a/apps/indexer/src/worker.rs b/apps/indexer/src/worker.rs index cc65523..f06f91a 100644 --- a/apps/indexer/src/worker.rs +++ b/apps/indexer/src/worker.rs @@ -35,6 +35,9 @@ pub async fn run_worker(state: AppState, interval_seconds: u64) { if let Err(err) = scheduler::check_and_schedule_reading_status_push(&scheduler_state.pool).await { error!("[SCHEDULER] Reading status push error: {}", err); } + if let Err(err) = scheduler::check_and_schedule_download_detection(&scheduler_state.pool).await { + error!("[SCHEDULER] Download detection error: {}", err); + } tokio::time::sleep(scheduler_wait).await; } }); diff --git a/infra/migrations/0061_add_download_detection_schedule.sql b/infra/migrations/0061_add_download_detection_schedule.sql new file mode 100644 index 0000000..676f2a2 --- /dev/null +++ b/infra/migrations/0061_add_download_detection_schedule.sql @@ -0,0 +1,3 @@ +ALTER TABLE libraries ADD COLUMN download_detection_mode VARCHAR NOT NULL DEFAULT 'manual'; +ALTER TABLE libraries ADD COLUMN next_download_detection_at TIMESTAMPTZ; +ALTER TABLE libraries ADD COLUMN last_download_detection_at TIMESTAMPTZ;