use axum::{extract::State, Json}; use serde::{Deserialize, Serialize}; use sqlx::{PgPool, Row}; use tracing::{info, warn}; use utoipa::ToSchema; use uuid::Uuid; use crate::{error::ApiError, prowlarr, state::AppState}; // --------------------------------------------------------------------------- // DTOs // --------------------------------------------------------------------------- #[derive(Deserialize, ToSchema)] pub struct StartDownloadDetectionRequest { pub library_id: Option, } #[derive(Serialize, ToSchema)] pub struct DownloadDetectionReportDto { #[schema(value_type = String)] pub job_id: Uuid, pub status: String, pub total_series: i64, pub found: i64, pub not_found: i64, pub no_missing: i64, pub no_metadata: i64, pub errors: i64, } #[derive(Serialize, ToSchema)] pub struct DownloadDetectionResultDto { #[schema(value_type = String)] pub id: Uuid, pub series_name: String, /// 'found' | 'not_found' | 'no_missing' | 'no_metadata' | 'error' pub status: String, pub missing_count: i32, pub available_releases: Option>, pub error_message: Option, } #[derive(Serialize, Deserialize, ToSchema)] pub struct AvailableReleaseDto { pub title: String, pub size: i64, pub download_url: Option, pub indexer: Option, pub seeders: Option, pub matched_missing_volumes: Vec, #[serde(default)] pub all_volumes: Vec, } // --------------------------------------------------------------------------- // POST /download-detection/start // --------------------------------------------------------------------------- #[utoipa::path( post, path = "/download-detection/start", tag = "download_detection", request_body = StartDownloadDetectionRequest, responses( (status = 200, description = "Job created"), (status = 400, description = "Bad request"), ), security(("Bearer" = [])) )] pub async fn start_detection( State(state): State, Json(body): Json, ) -> Result, ApiError> { // All libraries case if body.library_id.is_none() { prowlarr::check_prowlarr_configured(&state.pool).await?; let library_ids: Vec = sqlx::query_scalar( "SELECT id FROM libraries ORDER BY name" ) .fetch_all(&state.pool) .await?; let mut last_job_id: Option = None; for library_id in library_ids { let existing: Option = sqlx::query_scalar( "SELECT id FROM index_jobs WHERE library_id = $1 AND type = 'download_detection' AND status IN ('pending', 'running') LIMIT 1", ) .bind(library_id) .fetch_optional(&state.pool) .await?; if existing.is_some() { continue; } let job_id = Uuid::new_v4(); sqlx::query( "INSERT INTO index_jobs (id, library_id, type, status, started_at) VALUES ($1, $2, 'download_detection', 'running', NOW())", ) .bind(job_id) .bind(library_id) .execute(&state.pool) .await?; let pool = state.pool.clone(); let library_name: Option = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1") .bind(library_id) .fetch_optional(&state.pool) .await .ok() .flatten(); tokio::spawn(async move { if let Err(e) = process_download_detection(&pool, job_id, library_id).await { warn!("[DOWNLOAD_DETECTION] job {job_id} failed: {e}"); let _ = sqlx::query( "UPDATE index_jobs SET status = 'failed', error_opt = $2, finished_at = NOW() WHERE id = $1", ) .bind(job_id) .bind(e.to_string()) .execute(&pool) .await; notifications::notify( pool, notifications::NotificationEvent::DownloadDetectionFailed { library_name, error: e.to_string(), }, ); } }); last_job_id = Some(job_id); } return Ok(Json(serde_json::json!({ "id": last_job_id.map(|id| id.to_string()), "status": "started", }))); } let library_id: Uuid = body .library_id .unwrap() .parse() .map_err(|_| ApiError::bad_request("invalid library_id"))?; // Verify library exists sqlx::query("SELECT id FROM libraries WHERE id = $1") .bind(library_id) .fetch_optional(&state.pool) .await? .ok_or_else(|| ApiError::not_found("library not found"))?; // Verify Prowlarr is configured prowlarr::check_prowlarr_configured(&state.pool).await?; // Check no existing running job for this library let existing: Option = sqlx::query_scalar( "SELECT id FROM index_jobs WHERE library_id = $1 AND type = 'download_detection' AND status IN ('pending', 'running') LIMIT 1", ) .bind(library_id) .fetch_optional(&state.pool) .await?; if let Some(existing_id) = existing { return Ok(Json(serde_json::json!({ "id": existing_id.to_string(), "status": "already_running", }))); } let job_id = Uuid::new_v4(); sqlx::query( "INSERT INTO index_jobs (id, library_id, type, status, started_at) VALUES ($1, $2, 'download_detection', 'running', NOW())", ) .bind(job_id) .bind(library_id) .execute(&state.pool) .await?; let pool = state.pool.clone(); let library_name: Option = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1") .bind(library_id) .fetch_optional(&state.pool) .await .ok() .flatten(); tokio::spawn(async move { if let Err(e) = process_download_detection(&pool, job_id, library_id).await { warn!("[DOWNLOAD_DETECTION] job {job_id} failed: {e}"); let _ = sqlx::query( "UPDATE index_jobs SET status = 'failed', error_opt = $2, finished_at = NOW() WHERE id = $1", ) .bind(job_id) .bind(e.to_string()) .execute(&pool) .await; notifications::notify( pool, notifications::NotificationEvent::DownloadDetectionFailed { library_name, error: e.to_string(), }, ); } }); Ok(Json(serde_json::json!({ "id": job_id.to_string(), "status": "running", }))) } // --------------------------------------------------------------------------- // GET /download-detection/:id/report // --------------------------------------------------------------------------- #[utoipa::path( get, path = "/download-detection/{id}/report", tag = "download_detection", params(("id" = String, Path, description = "Job UUID")), responses( (status = 200, body = DownloadDetectionReportDto), (status = 404, description = "Job not found"), ), security(("Bearer" = [])) )] pub async fn get_detection_report( State(state): State, axum::extract::Path(job_id): axum::extract::Path, ) -> Result, ApiError> { let row = sqlx::query( "SELECT status, total_files FROM index_jobs WHERE id = $1 AND type = 'download_detection'", ) .bind(job_id) .fetch_optional(&state.pool) .await? .ok_or_else(|| ApiError::not_found("job not found"))?; let job_status: String = row.get("status"); let total_files: Option = row.get("total_files"); let counts = sqlx::query( "SELECT status, COUNT(*) as cnt FROM download_detection_results WHERE job_id = $1 GROUP BY status", ) .bind(job_id) .fetch_all(&state.pool) .await?; let mut found = 0i64; let mut not_found = 0i64; let mut no_missing = 0i64; let mut no_metadata = 0i64; let mut errors = 0i64; for r in &counts { let status: String = r.get("status"); let cnt: i64 = r.get("cnt"); match status.as_str() { "found" => found = cnt, "not_found" => not_found = cnt, "no_missing" => no_missing = cnt, "no_metadata" => no_metadata = cnt, "error" => errors = cnt, _ => {} } } Ok(Json(DownloadDetectionReportDto { job_id, status: job_status, total_series: total_files.unwrap_or(0) as i64, found, not_found, no_missing, no_metadata, errors, })) } // --------------------------------------------------------------------------- // GET /download-detection/:id/results // --------------------------------------------------------------------------- #[derive(Deserialize)] pub struct ResultsQuery { pub status: Option, } #[utoipa::path( get, path = "/download-detection/{id}/results", tag = "download_detection", params( ("id" = String, Path, description = "Job UUID"), ("status" = Option, Query, description = "Filter by status"), ), responses( (status = 200, body = Vec), ), security(("Bearer" = [])) )] pub async fn get_detection_results( State(state): State, axum::extract::Path(job_id): axum::extract::Path, axum::extract::Query(query): axum::extract::Query, ) -> Result>, ApiError> { let rows = if let Some(status_filter) = &query.status { sqlx::query( "SELECT id, series_name, status, missing_count, available_releases, error_message FROM download_detection_results WHERE job_id = $1 AND status = $2 ORDER BY series_name", ) .bind(job_id) .bind(status_filter) .fetch_all(&state.pool) .await? } else { sqlx::query( "SELECT id, series_name, status, missing_count, available_releases, error_message FROM download_detection_results WHERE job_id = $1 ORDER BY status, series_name", ) .bind(job_id) .fetch_all(&state.pool) .await? }; let results = rows .iter() .map(|row| { let releases_json: Option = row.get("available_releases"); let available_releases = releases_json.and_then(|v| { serde_json::from_value::>(v).ok() }); DownloadDetectionResultDto { id: row.get("id"), series_name: row.get("series_name"), status: row.get("status"), missing_count: row.get("missing_count"), available_releases, error_message: row.get("error_message"), } }) .collect(); Ok(Json(results)) } // --------------------------------------------------------------------------- // GET /download-detection/latest-found // --------------------------------------------------------------------------- #[derive(Serialize, ToSchema)] pub struct LatestFoundPerLibraryDto { #[schema(value_type = String)] pub library_id: Uuid, pub library_name: String, pub results: Vec, } #[derive(Serialize, ToSchema)] pub struct AvailableDownloadDto { #[schema(value_type = String)] pub id: Uuid, pub series_name: String, pub missing_count: i32, pub available_releases: Option>, pub updated_at: String, } /// Returns available downloads per library from the `available_downloads` table. #[utoipa::path( get, path = "/download-detection/latest-found", tag = "download_detection", responses( (status = 200, body = Vec), ), security(("Bearer" = [])) )] pub async fn get_latest_found( State(state): State, ) -> Result>, ApiError> { let rows = sqlx::query( "SELECT ad.id, ad.library_id, ad.series_name, ad.missing_count, ad.available_releases, ad.updated_at, \ l.name as library_name \ FROM available_downloads ad \ JOIN libraries l ON l.id = ad.library_id \ ORDER BY l.name, ad.series_name", ) .fetch_all(&state.pool) .await?; let mut libs: std::collections::BTreeMap = std::collections::BTreeMap::new(); for row in &rows { let library_id: Uuid = row.get("library_id"); let updated_at: chrono::DateTime = row.get("updated_at"); let releases_json: Option = row.get("available_releases"); let available_releases = releases_json.and_then(|v| { serde_json::from_value::>(v).ok() }); let entry = libs.entry(library_id).or_insert_with(|| LatestFoundPerLibraryDto { library_id, library_name: row.get("library_name"), results: Vec::new(), }); entry.results.push(AvailableDownloadDto { id: row.get("id"), series_name: row.get("series_name"), missing_count: row.get("missing_count"), available_releases, updated_at: updated_at.to_rfc3339(), }); } Ok(Json(libs.into_values().collect())) } // --------------------------------------------------------------------------- // Background processing // --------------------------------------------------------------------------- pub(crate) async fn process_download_detection( pool: &PgPool, job_id: Uuid, library_id: Uuid, ) -> Result<(i32, i64), String> { let (prowlarr_url, prowlarr_api_key, categories) = prowlarr::load_prowlarr_config_internal(pool) .await .map_err(|e| e.message)?; // Fetch all series with their metadata link status let all_series: Vec = sqlx::query_scalar( r#" SELECT DISTINCT COALESCE(NULLIF(series, ''), 'unclassified') FROM books WHERE library_id = $1 ORDER BY 1 "#, ) .bind(library_id) .fetch_all(pool) .await .map_err(|e| e.to_string())?; // Clean up available_downloads for series that no longer exist in books sqlx::query( r#" DELETE FROM available_downloads WHERE library_id = $1 AND series_name NOT IN ( SELECT DISTINCT COALESCE(NULLIF(series, ''), 'unclassified') FROM books WHERE library_id = $1 ) "#, ) .bind(library_id) .execute(pool) .await .map_err(|e| e.to_string())?; let total = all_series.len() as i32; sqlx::query("UPDATE index_jobs SET total_files = $2 WHERE id = $1") .bind(job_id) .bind(total) .execute(pool) .await .map_err(|e| e.to_string())?; // Fetch approved metadata links for this library (series_name -> link_id) let links: Vec<(String, Uuid)> = sqlx::query( "SELECT series_name, id FROM external_metadata_links WHERE library_id = $1 AND status = 'approved'", ) .bind(library_id) .fetch_all(pool) .await .map_err(|e| e.to_string())? .into_iter() .map(|row| { let series_name: String = row.get("series_name"); let link_id: Uuid = row.get("id"); (series_name, link_id) }) .collect(); let link_map: std::collections::HashMap = links.into_iter().collect(); let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(30)) .build() .map_err(|e| format!("failed to build HTTP client: {e}"))?; let mut processed = 0i32; for series_name in &all_series { if is_job_cancelled(pool, job_id).await { sqlx::query( "UPDATE index_jobs SET status = 'cancelled', finished_at = NOW() WHERE id = $1", ) .bind(job_id) .execute(pool) .await .map_err(|e| e.to_string())?; return Ok((total, 0)); } processed += 1; let progress = (processed * 100 / total.max(1)).min(100); sqlx::query( "UPDATE index_jobs SET processed_files = $2, progress_percent = $3, current_file = $4 WHERE id = $1", ) .bind(job_id) .bind(processed) .bind(progress) .bind(series_name) .execute(pool) .await .ok(); // Skip unclassified if series_name == "unclassified" { insert_result(pool, job_id, library_id, series_name, "no_metadata", 0, None, None).await; continue; } // Check if this series has an approved metadata link let link_id = match link_map.get(series_name) { Some(id) => *id, None => { insert_result(pool, job_id, library_id, series_name, "no_metadata", 0, None, None).await; continue; } }; // Fetch missing books for this series let missing_rows = sqlx::query( "SELECT volume_number FROM external_book_metadata WHERE link_id = $1 AND book_id IS NULL ORDER BY volume_number NULLS LAST", ) .bind(link_id) .fetch_all(pool) .await .map_err(|e| e.to_string())?; if missing_rows.is_empty() { insert_result(pool, job_id, library_id, series_name, "no_missing", 0, None, None).await; // Series is complete, remove from available_downloads let _ = sqlx::query("DELETE FROM available_downloads WHERE library_id = $1 AND series_name = $2") .bind(library_id).bind(series_name).execute(pool).await; continue; } let missing_volumes: Vec = missing_rows .iter() .filter_map(|row| row.get::, _>("volume_number")) .collect(); let missing_count = missing_rows.len() as i32; // Search Prowlarr match search_prowlarr_for_series( &client, &prowlarr_url, &prowlarr_api_key, &categories, series_name, &missing_volumes, ) .await { Ok(matched_releases) if !matched_releases.is_empty() => { let releases_json = serde_json::to_value(&matched_releases).ok(); insert_result( pool, job_id, library_id, series_name, "found", missing_count, releases_json.clone(), None, ) .await; // UPSERT into available_downloads if let Some(ref rj) = releases_json { let _ = sqlx::query( "INSERT INTO available_downloads (library_id, series_name, missing_count, available_releases, updated_at) \ VALUES ($1, $2, $3, $4, NOW()) \ ON CONFLICT (library_id, series_name) DO UPDATE SET \ missing_count = EXCLUDED.missing_count, \ available_releases = EXCLUDED.available_releases, \ updated_at = NOW()", ) .bind(library_id) .bind(series_name) .bind(missing_count) .bind(rj) .execute(pool) .await; } } Ok(_) => { insert_result(pool, job_id, library_id, series_name, "not_found", missing_count, None, None).await; // Remove from available_downloads if previously found let _ = sqlx::query( "DELETE FROM available_downloads WHERE library_id = $1 AND series_name = $2", ) .bind(library_id) .bind(series_name) .execute(pool) .await; } Err(e) => { warn!("[DOWNLOAD_DETECTION] series '{series_name}': {e}"); insert_result(pool, job_id, library_id, series_name, "error", missing_count, None, Some(&e)).await; } } } // Build final stats let counts = sqlx::query( "SELECT status, COUNT(*) as cnt FROM download_detection_results WHERE job_id = $1 GROUP BY status", ) .bind(job_id) .fetch_all(pool) .await .map_err(|e| e.to_string())?; let mut count_found = 0i64; let mut count_not_found = 0i64; let mut count_no_missing = 0i64; let mut count_no_metadata = 0i64; let mut count_errors = 0i64; for row in &counts { let s: String = row.get("status"); let c: i64 = row.get("cnt"); match s.as_str() { "found" => count_found = c, "not_found" => count_not_found = c, "no_missing" => count_no_missing = c, "no_metadata" => count_no_metadata = c, "error" => count_errors = c, _ => {} } } let stats = serde_json::json!({ "total_series": total as i64, "found": count_found, "not_found": count_not_found, "no_missing": count_no_missing, "no_metadata": count_no_metadata, "errors": count_errors, }); sqlx::query( "UPDATE index_jobs SET status = 'success', finished_at = NOW(), stats_json = $2, progress_percent = 100 WHERE id = $1", ) .bind(job_id) .bind(&stats) .execute(pool) .await .map_err(|e| e.to_string())?; info!( "[DOWNLOAD_DETECTION] job={job_id} completed: {total} series, found={count_found}, not_found={count_not_found}, no_missing={count_no_missing}, no_metadata={count_no_metadata}, errors={count_errors}" ); let library_name: Option = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1") .bind(library_id) .fetch_optional(pool) .await .ok() .flatten(); notifications::notify( pool.clone(), notifications::NotificationEvent::DownloadDetectionCompleted { library_name, total_series: total, found: count_found, }, ); Ok((total, count_found)) } // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- async fn search_prowlarr_for_series( client: &reqwest::Client, url: &str, api_key: &str, categories: &[i32], series_name: &str, missing_volumes: &[i32], ) -> Result, String> { let query = format!("\"{}\"", series_name); let mut params: Vec<(&str, String)> = vec![ ("query", query), ("type", "search".to_string()), ]; for cat in categories { params.push(("categories", cat.to_string())); } let resp = client .get(format!("{url}/api/v1/search")) .query(¶ms) .header("X-Api-Key", api_key) .send() .await .map_err(|e| format!("Prowlarr request failed: {e}"))?; if !resp.status().is_success() { let status = resp.status(); let text = resp.text().await.unwrap_or_default(); return Err(format!("Prowlarr returned {status}: {text}")); } let raw_releases: Vec = resp .json() .await .map_err(|e| format!("Failed to parse Prowlarr response: {e}"))?; let matched: Vec = raw_releases .into_iter() .filter_map(|r| { let title_volumes = prowlarr::extract_volumes_from_title_pub(&r.title); let matched_vols: Vec = title_volumes .iter() .copied() .filter(|v| missing_volumes.contains(v)) .collect(); if matched_vols.is_empty() { None } else { Some(AvailableReleaseDto { title: r.title, size: r.size, download_url: r.download_url, indexer: r.indexer, seeders: r.seeders, matched_missing_volumes: matched_vols, all_volumes: title_volumes, }) } }) .collect(); Ok(matched) } #[allow(clippy::too_many_arguments)] async fn insert_result( pool: &PgPool, job_id: Uuid, library_id: Uuid, series_name: &str, status: &str, missing_count: i32, available_releases: Option, error_message: Option<&str>, ) { let _ = sqlx::query( r#" INSERT INTO download_detection_results (job_id, library_id, series_name, status, missing_count, available_releases, error_message) VALUES ($1, $2, $3, $4, $5, $6, $7) "#, ) .bind(job_id) .bind(library_id) .bind(series_name) .bind(status) .bind(missing_count) .bind(&available_releases) .bind(error_message) .execute(pool) .await; } async fn is_job_cancelled(pool: &PgPool, job_id: Uuid) -> bool { sqlx::query_scalar::<_, String>("SELECT status FROM index_jobs WHERE id = $1") .bind(job_id) .fetch_optional(pool) .await .ok() .flatten() .as_deref() == Some("cancelled") }