feat: add per-library download detection auto-schedule

Adds a configurable schedule (manual/hourly/daily/weekly) for the
download detection job in the library settings modal. The indexer
scheduler triggers the job automatically, and the API job poller
processes it — consistent with the reading_status_push pattern.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-25 13:57:59 +01:00
parent 19de3ceebb
commit e0d94758af
12 changed files with 212 additions and 35 deletions

View File

@@ -119,34 +119,22 @@ pub async fn start_detection(
.flatten();
tokio::spawn(async move {
match process_download_detection(&pool, job_id, library_id).await {
Ok((total_series, found)) => {
notifications::notify(
pool,
notifications::NotificationEvent::DownloadDetectionCompleted {
library_name,
total_series,
found,
},
);
}
Err(e) => {
warn!("[DOWNLOAD_DETECTION] job {job_id} failed: {e}");
let _ = sqlx::query(
"UPDATE index_jobs SET status = 'failed', error_opt = $2, finished_at = NOW() WHERE id = $1",
)
.bind(job_id)
.bind(e.to_string())
.execute(&pool)
.await;
notifications::notify(
pool,
notifications::NotificationEvent::DownloadDetectionFailed {
library_name,
error: e.to_string(),
},
);
}
if let Err(e) = process_download_detection(&pool, job_id, library_id).await {
warn!("[DOWNLOAD_DETECTION] job {job_id} failed: {e}");
let _ = sqlx::query(
"UPDATE index_jobs SET status = 'failed', error_opt = $2, finished_at = NOW() WHERE id = $1",
)
.bind(job_id)
.bind(e.to_string())
.execute(&pool)
.await;
notifications::notify(
pool,
notifications::NotificationEvent::DownloadDetectionFailed {
library_name,
error: e.to_string(),
},
);
}
});
@@ -500,6 +488,22 @@ pub(crate) async fn process_download_detection(
"[DOWNLOAD_DETECTION] job={job_id} completed: {total} series, found={count_found}, not_found={count_not_found}, no_missing={count_no_missing}, no_metadata={count_no_metadata}, errors={count_errors}"
);
let library_name: Option<String> = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1")
.bind(library_id)
.fetch_optional(pool)
.await
.ok()
.flatten();
notifications::notify(
pool.clone(),
notifications::NotificationEvent::DownloadDetectionCompleted {
library_name,
total_series: total,
found: count_found,
},
);
Ok((total, count_found))
}

View File

@@ -4,7 +4,7 @@ use sqlx::{PgPool, Row};
use tracing::{error, info, trace};
use uuid::Uuid;
use crate::{metadata_batch, metadata_refresh, reading_status_push};
use crate::{download_detection, metadata_batch, metadata_refresh, reading_status_push};
/// Poll for pending API-only jobs (`metadata_batch`, `metadata_refresh`) and process them.
/// This mirrors the indexer's worker loop but for job types handled by the API.
@@ -51,6 +51,15 @@ pub async fn run_job_poller(pool: PgPool, interval_seconds: u64) {
)
.await
}
"download_detection" => {
download_detection::process_download_detection(
&pool_clone,
job_id,
library_id,
)
.await
.map(|_| ())
}
_ => Err(format!("Unknown API job type: {job_type}")),
};
@@ -92,6 +101,15 @@ pub async fn run_job_poller(pool: PgPool, interval_seconds: u64) {
},
);
}
"download_detection" => {
notifications::notify(
pool_clone,
notifications::NotificationEvent::DownloadDetectionFailed {
library_name,
error: e.to_string(),
},
);
}
_ => {}
}
}
@@ -109,7 +127,7 @@ pub async fn run_job_poller(pool: PgPool, interval_seconds: u64) {
}
}
const API_JOB_TYPES: &[&str] = &["metadata_batch", "metadata_refresh", "reading_status_push"];
const API_JOB_TYPES: &[&str] = &["metadata_batch", "metadata_refresh", "reading_status_push", "download_detection"];
async fn claim_next_api_job(pool: &PgPool) -> Result<Option<(Uuid, String, Uuid)>, sqlx::Error> {
let mut tx = pool.begin().await?;

View File

@@ -34,6 +34,9 @@ pub struct LibraryResponse {
pub reading_status_push_mode: String,
#[schema(value_type = Option<String>)]
pub next_reading_status_push_at: Option<chrono::DateTime<chrono::Utc>>,
pub download_detection_mode: String,
#[schema(value_type = Option<String>)]
pub next_download_detection_at: Option<chrono::DateTime<chrono::Utc>>,
}
#[derive(Deserialize, ToSchema)]
@@ -57,7 +60,7 @@ pub struct CreateLibraryRequest {
)]
pub async fn list_libraries(State(state): State<AppState>) -> Result<Json<Vec<LibraryResponse>>, ApiError> {
let rows = sqlx::query(
"SELECT l.id, l.name, l.root_path, l.enabled, l.monitor_enabled, l.scan_mode, l.next_scan_at, l.watcher_enabled, l.metadata_provider, l.fallback_metadata_provider, l.metadata_refresh_mode, l.next_metadata_refresh_at, l.reading_status_provider, l.reading_status_push_mode, l.next_reading_status_push_at,
"SELECT l.id, l.name, l.root_path, l.enabled, l.monitor_enabled, l.scan_mode, l.next_scan_at, l.watcher_enabled, l.metadata_provider, l.fallback_metadata_provider, l.metadata_refresh_mode, l.next_metadata_refresh_at, l.reading_status_provider, l.reading_status_push_mode, l.next_reading_status_push_at, l.download_detection_mode, l.next_download_detection_at,
(SELECT COUNT(*) FROM books b WHERE b.library_id = l.id) as book_count,
(SELECT COUNT(DISTINCT COALESCE(NULLIF(b.series, ''), 'unclassified')) FROM books b WHERE b.library_id = l.id) as series_count,
COALESCE((
@@ -99,6 +102,8 @@ pub async fn list_libraries(State(state): State<AppState>) -> Result<Json<Vec<Li
reading_status_provider: row.get("reading_status_provider"),
reading_status_push_mode: row.get("reading_status_push_mode"),
next_reading_status_push_at: row.get("next_reading_status_push_at"),
download_detection_mode: row.get("download_detection_mode"),
next_download_detection_at: row.get("next_download_detection_at"),
})
.collect();
@@ -159,6 +164,8 @@ pub async fn create_library(
reading_status_provider: None,
reading_status_push_mode: "manual".to_string(),
next_reading_status_push_at: None,
download_detection_mode: "manual".to_string(),
next_download_detection_at: None,
}))
}
@@ -281,6 +288,8 @@ pub struct UpdateMonitoringRequest {
pub watcher_enabled: Option<bool>,
#[schema(value_type = Option<String>, example = "daily")]
pub metadata_refresh_mode: Option<String>, // 'manual', 'hourly', 'daily', 'weekly'
#[schema(value_type = Option<String>, example = "daily")]
pub download_detection_mode: Option<String>, // 'manual', 'hourly', 'daily', 'weekly'
}
/// Update monitoring settings for a library
@@ -317,6 +326,12 @@ pub async fn update_monitoring(
return Err(ApiError::bad_request("metadata_refresh_mode must be one of: manual, hourly, daily, weekly"));
}
// Validate download_detection_mode
let download_detection_mode = input.download_detection_mode.as_deref().unwrap_or("manual");
if !valid_modes.contains(&download_detection_mode) {
return Err(ApiError::bad_request("download_detection_mode must be one of: manual, hourly, daily, weekly"));
}
// Calculate next_scan_at if monitoring is enabled
let next_scan_at = if input.monitor_enabled {
let interval_minutes = match input.scan_mode.as_str() {
@@ -343,10 +358,23 @@ pub async fn update_monitoring(
None
};
// Calculate next_download_detection_at
let next_download_detection_at = if download_detection_mode != "manual" {
let interval_minutes = match download_detection_mode {
"hourly" => 60,
"daily" => 1440,
"weekly" => 10080,
_ => 1440,
};
Some(chrono::Utc::now() + chrono::Duration::minutes(interval_minutes))
} else {
None
};
let watcher_enabled = input.watcher_enabled.unwrap_or(false);
let result = sqlx::query(
"UPDATE libraries SET monitor_enabled = $2, scan_mode = $3, next_scan_at = $4, watcher_enabled = $5, metadata_refresh_mode = $6, next_metadata_refresh_at = $7 WHERE id = $1 RETURNING id, name, root_path, enabled, monitor_enabled, scan_mode, next_scan_at, watcher_enabled, metadata_provider, fallback_metadata_provider, metadata_refresh_mode, next_metadata_refresh_at, reading_status_provider, reading_status_push_mode, next_reading_status_push_at"
"UPDATE libraries SET monitor_enabled = $2, scan_mode = $3, next_scan_at = $4, watcher_enabled = $5, metadata_refresh_mode = $6, next_metadata_refresh_at = $7, download_detection_mode = $8, next_download_detection_at = $9 WHERE id = $1 RETURNING id, name, root_path, enabled, monitor_enabled, scan_mode, next_scan_at, watcher_enabled, metadata_provider, fallback_metadata_provider, metadata_refresh_mode, next_metadata_refresh_at, reading_status_provider, reading_status_push_mode, next_reading_status_push_at, download_detection_mode, next_download_detection_at"
)
.bind(library_id)
.bind(input.monitor_enabled)
@@ -355,6 +383,8 @@ pub async fn update_monitoring(
.bind(watcher_enabled)
.bind(metadata_refresh_mode)
.bind(next_metadata_refresh_at)
.bind(download_detection_mode)
.bind(next_download_detection_at)
.fetch_optional(&state.pool)
.await?;
@@ -402,6 +432,8 @@ pub async fn update_monitoring(
reading_status_provider: row.get("reading_status_provider"),
reading_status_push_mode: row.get("reading_status_push_mode"),
next_reading_status_push_at: row.get("next_reading_status_push_at"),
download_detection_mode: row.get("download_detection_mode"),
next_download_detection_at: row.get("next_download_detection_at"),
}))
}
@@ -437,7 +469,7 @@ pub async fn update_metadata_provider(
let fallback = input.fallback_metadata_provider.as_deref().filter(|s| !s.is_empty());
let result = sqlx::query(
"UPDATE libraries SET metadata_provider = $2, fallback_metadata_provider = $3 WHERE id = $1 RETURNING id, name, root_path, enabled, monitor_enabled, scan_mode, next_scan_at, watcher_enabled, metadata_provider, fallback_metadata_provider, metadata_refresh_mode, next_metadata_refresh_at, reading_status_provider, reading_status_push_mode, next_reading_status_push_at"
"UPDATE libraries SET metadata_provider = $2, fallback_metadata_provider = $3 WHERE id = $1 RETURNING id, name, root_path, enabled, monitor_enabled, scan_mode, next_scan_at, watcher_enabled, metadata_provider, fallback_metadata_provider, metadata_refresh_mode, next_metadata_refresh_at, reading_status_provider, reading_status_push_mode, next_reading_status_push_at, download_detection_mode, next_download_detection_at"
)
.bind(library_id)
.bind(provider)
@@ -489,6 +521,8 @@ pub async fn update_metadata_provider(
reading_status_provider: row.get("reading_status_provider"),
reading_status_push_mode: row.get("reading_status_push_mode"),
next_reading_status_push_at: row.get("next_reading_status_push_at"),
download_detection_mode: row.get("download_detection_mode"),
next_download_detection_at: row.get("next_download_detection_at"),
}))
}