feat: add reading_status_push auto-refresh schedule per library
- Migration 0059: reading_status_push_mode / last / next columns on libraries - API: update_reading_status_provider accepts push_mode and calculates next_push_at - job_poller: handles reading_status_push pending jobs - Indexer scheduler: check_and_schedule_reading_status_push every minute - Backoffice: schedule select in library settings modal Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -4,7 +4,7 @@ use sqlx::{PgPool, Row};
|
||||
use tracing::{error, info, trace};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{metadata_batch, metadata_refresh};
|
||||
use crate::{metadata_batch, metadata_refresh, reading_status_push};
|
||||
|
||||
/// Poll for pending API-only jobs (`metadata_batch`, `metadata_refresh`) and process them.
|
||||
/// This mirrors the indexer's worker loop but for job types handled by the API.
|
||||
@@ -43,6 +43,14 @@ pub async fn run_job_poller(pool: PgPool, interval_seconds: u64) {
|
||||
)
|
||||
.await
|
||||
}
|
||||
"reading_status_push" => {
|
||||
reading_status_push::process_reading_status_push(
|
||||
&pool_clone,
|
||||
job_id,
|
||||
library_id,
|
||||
)
|
||||
.await
|
||||
}
|
||||
_ => Err(format!("Unknown API job type: {job_type}")),
|
||||
};
|
||||
|
||||
@@ -75,6 +83,15 @@ pub async fn run_job_poller(pool: PgPool, interval_seconds: u64) {
|
||||
},
|
||||
);
|
||||
}
|
||||
"reading_status_push" => {
|
||||
notifications::notify(
|
||||
pool_clone,
|
||||
notifications::NotificationEvent::ReadingStatusPushFailed {
|
||||
library_name,
|
||||
error: e.to_string(),
|
||||
},
|
||||
);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
@@ -92,7 +109,7 @@ pub async fn run_job_poller(pool: PgPool, interval_seconds: u64) {
|
||||
}
|
||||
}
|
||||
|
||||
const API_JOB_TYPES: &[&str] = &["metadata_batch", "metadata_refresh"];
|
||||
const API_JOB_TYPES: &[&str] = &["metadata_batch", "metadata_refresh", "reading_status_push"];
|
||||
|
||||
async fn claim_next_api_job(pool: &PgPool) -> Result<Option<(Uuid, String, Uuid)>, sqlx::Error> {
|
||||
let mut tx = pool.begin().await?;
|
||||
|
||||
@@ -31,6 +31,9 @@ pub struct LibraryResponse {
|
||||
#[schema(value_type = Vec<String>)]
|
||||
pub thumbnail_book_ids: Vec<Uuid>,
|
||||
pub reading_status_provider: Option<String>,
|
||||
pub reading_status_push_mode: String,
|
||||
#[schema(value_type = Option<String>)]
|
||||
pub next_reading_status_push_at: Option<chrono::DateTime<chrono::Utc>>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, ToSchema)]
|
||||
@@ -54,7 +57,7 @@ pub struct CreateLibraryRequest {
|
||||
)]
|
||||
pub async fn list_libraries(State(state): State<AppState>) -> Result<Json<Vec<LibraryResponse>>, ApiError> {
|
||||
let rows = sqlx::query(
|
||||
"SELECT l.id, l.name, l.root_path, l.enabled, l.monitor_enabled, l.scan_mode, l.next_scan_at, l.watcher_enabled, l.metadata_provider, l.fallback_metadata_provider, l.metadata_refresh_mode, l.next_metadata_refresh_at, l.reading_status_provider,
|
||||
"SELECT l.id, l.name, l.root_path, l.enabled, l.monitor_enabled, l.scan_mode, l.next_scan_at, l.watcher_enabled, l.metadata_provider, l.fallback_metadata_provider, l.metadata_refresh_mode, l.next_metadata_refresh_at, l.reading_status_provider, l.reading_status_push_mode, l.next_reading_status_push_at,
|
||||
(SELECT COUNT(*) FROM books b WHERE b.library_id = l.id) as book_count,
|
||||
(SELECT COUNT(DISTINCT COALESCE(NULLIF(b.series, ''), 'unclassified')) FROM books b WHERE b.library_id = l.id) as series_count,
|
||||
COALESCE((
|
||||
@@ -94,6 +97,8 @@ pub async fn list_libraries(State(state): State<AppState>) -> Result<Json<Vec<Li
|
||||
next_metadata_refresh_at: row.get("next_metadata_refresh_at"),
|
||||
thumbnail_book_ids: row.get("thumbnail_book_ids"),
|
||||
reading_status_provider: row.get("reading_status_provider"),
|
||||
reading_status_push_mode: row.get("reading_status_push_mode"),
|
||||
next_reading_status_push_at: row.get("next_reading_status_push_at"),
|
||||
})
|
||||
.collect();
|
||||
|
||||
@@ -152,6 +157,8 @@ pub async fn create_library(
|
||||
next_metadata_refresh_at: None,
|
||||
thumbnail_book_ids: vec![],
|
||||
reading_status_provider: None,
|
||||
reading_status_push_mode: "manual".to_string(),
|
||||
next_reading_status_push_at: None,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -339,7 +346,7 @@ pub async fn update_monitoring(
|
||||
let watcher_enabled = input.watcher_enabled.unwrap_or(false);
|
||||
|
||||
let result = sqlx::query(
|
||||
"UPDATE libraries SET monitor_enabled = $2, scan_mode = $3, next_scan_at = $4, watcher_enabled = $5, metadata_refresh_mode = $6, next_metadata_refresh_at = $7 WHERE id = $1 RETURNING id, name, root_path, enabled, monitor_enabled, scan_mode, next_scan_at, watcher_enabled, metadata_provider, fallback_metadata_provider, metadata_refresh_mode, next_metadata_refresh_at, reading_status_provider"
|
||||
"UPDATE libraries SET monitor_enabled = $2, scan_mode = $3, next_scan_at = $4, watcher_enabled = $5, metadata_refresh_mode = $6, next_metadata_refresh_at = $7 WHERE id = $1 RETURNING id, name, root_path, enabled, monitor_enabled, scan_mode, next_scan_at, watcher_enabled, metadata_provider, fallback_metadata_provider, metadata_refresh_mode, next_metadata_refresh_at, reading_status_provider, reading_status_push_mode, next_reading_status_push_at"
|
||||
)
|
||||
.bind(library_id)
|
||||
.bind(input.monitor_enabled)
|
||||
@@ -393,6 +400,8 @@ pub async fn update_monitoring(
|
||||
next_metadata_refresh_at: row.get("next_metadata_refresh_at"),
|
||||
thumbnail_book_ids,
|
||||
reading_status_provider: row.get("reading_status_provider"),
|
||||
reading_status_push_mode: row.get("reading_status_push_mode"),
|
||||
next_reading_status_push_at: row.get("next_reading_status_push_at"),
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -428,7 +437,7 @@ pub async fn update_metadata_provider(
|
||||
let fallback = input.fallback_metadata_provider.as_deref().filter(|s| !s.is_empty());
|
||||
|
||||
let result = sqlx::query(
|
||||
"UPDATE libraries SET metadata_provider = $2, fallback_metadata_provider = $3 WHERE id = $1 RETURNING id, name, root_path, enabled, monitor_enabled, scan_mode, next_scan_at, watcher_enabled, metadata_provider, fallback_metadata_provider, metadata_refresh_mode, next_metadata_refresh_at, reading_status_provider"
|
||||
"UPDATE libraries SET metadata_provider = $2, fallback_metadata_provider = $3 WHERE id = $1 RETURNING id, name, root_path, enabled, monitor_enabled, scan_mode, next_scan_at, watcher_enabled, metadata_provider, fallback_metadata_provider, metadata_refresh_mode, next_metadata_refresh_at, reading_status_provider, reading_status_push_mode, next_reading_status_push_at"
|
||||
)
|
||||
.bind(library_id)
|
||||
.bind(provider)
|
||||
@@ -478,12 +487,15 @@ pub async fn update_metadata_provider(
|
||||
next_metadata_refresh_at: row.get("next_metadata_refresh_at"),
|
||||
thumbnail_book_ids,
|
||||
reading_status_provider: row.get("reading_status_provider"),
|
||||
reading_status_push_mode: row.get("reading_status_push_mode"),
|
||||
next_reading_status_push_at: row.get("next_reading_status_push_at"),
|
||||
}))
|
||||
}
|
||||
|
||||
#[derive(Deserialize, ToSchema)]
|
||||
pub struct UpdateReadingStatusProviderRequest {
|
||||
pub reading_status_provider: Option<String>,
|
||||
pub reading_status_push_mode: Option<String>,
|
||||
}
|
||||
|
||||
/// Update the reading status provider for a library
|
||||
@@ -506,15 +518,41 @@ pub async fn update_reading_status_provider(
|
||||
Json(input): Json<UpdateReadingStatusProviderRequest>,
|
||||
) -> Result<Json<serde_json::Value>, ApiError> {
|
||||
let provider = input.reading_status_provider.as_deref().filter(|s| !s.is_empty());
|
||||
let result = sqlx::query("UPDATE libraries SET reading_status_provider = $2 WHERE id = $1")
|
||||
.bind(library_id)
|
||||
.bind(provider)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
|
||||
let valid_modes = ["manual", "hourly", "daily", "weekly"];
|
||||
let push_mode = input.reading_status_push_mode.as_deref().unwrap_or("manual");
|
||||
if !valid_modes.contains(&push_mode) {
|
||||
return Err(ApiError::bad_request("reading_status_push_mode must be one of: manual, hourly, daily, weekly"));
|
||||
}
|
||||
|
||||
let next_push_at = if push_mode != "manual" {
|
||||
let interval_minutes: i64 = match push_mode {
|
||||
"hourly" => 60,
|
||||
"daily" => 1440,
|
||||
"weekly" => 10080,
|
||||
_ => 1440,
|
||||
};
|
||||
Some(chrono::Utc::now() + chrono::Duration::minutes(interval_minutes))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let result = sqlx::query(
|
||||
"UPDATE libraries SET reading_status_provider = $2, reading_status_push_mode = $3, next_reading_status_push_at = $4 WHERE id = $1"
|
||||
)
|
||||
.bind(library_id)
|
||||
.bind(provider)
|
||||
.bind(push_mode)
|
||||
.bind(next_push_at)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
|
||||
if result.rows_affected() == 0 {
|
||||
return Err(ApiError::not_found("library not found"));
|
||||
}
|
||||
|
||||
Ok(Json(serde_json::json!({ "reading_status_provider": provider })))
|
||||
Ok(Json(serde_json::json!({
|
||||
"reading_status_provider": provider,
|
||||
"reading_status_push_mode": push_mode,
|
||||
})))
|
||||
}
|
||||
|
||||
@@ -296,7 +296,7 @@ struct SeriesInfo {
|
||||
anilist_url: Option<String>,
|
||||
}
|
||||
|
||||
async fn process_reading_status_push(
|
||||
pub async fn process_reading_status_push(
|
||||
pool: &PgPool,
|
||||
job_id: Uuid,
|
||||
library_id: Uuid,
|
||||
|
||||
Reference in New Issue
Block a user