feat: add Telegram notification system with granular event toggles
Add a notifications crate, shared between the API and the indexer, to send Telegram messages on scan/thumbnail/conversion completion or failure, metadata linking, batch, and refresh events. Configurable via a new Notifications tab in the backoffice settings, with per-event toggle switches grouped by category.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -14,6 +14,7 @@ futures = "0.3"
|
||||
image.workspace = true
|
||||
jpeg-decoder.workspace = true
|
||||
num_cpus.workspace = true
|
||||
notifications = { path = "../../crates/notifications" }
|
||||
parsers = { path = "../../crates/parsers" }
|
||||
reqwest.workspace = true
|
||||
serde.workspace = true
|
||||
|
||||
@@ -6,13 +6,15 @@ COPY Cargo.toml ./
|
||||
COPY apps/api/Cargo.toml apps/api/Cargo.toml
|
||||
COPY apps/indexer/Cargo.toml apps/indexer/Cargo.toml
|
||||
COPY crates/core/Cargo.toml crates/core/Cargo.toml
|
||||
COPY crates/notifications/Cargo.toml crates/notifications/Cargo.toml
|
||||
COPY crates/parsers/Cargo.toml crates/parsers/Cargo.toml
|
||||
|
||||
RUN mkdir -p apps/api/src apps/indexer/src crates/core/src crates/parsers/src && \
|
||||
RUN mkdir -p apps/api/src apps/indexer/src crates/core/src crates/notifications/src crates/parsers/src && \
|
||||
echo "fn main() {}" > apps/api/src/main.rs && \
|
||||
echo "fn main() {}" > apps/indexer/src/main.rs && \
|
||||
echo "" > apps/indexer/src/lib.rs && \
|
||||
echo "" > crates/core/src/lib.rs && \
|
||||
echo "" > crates/notifications/src/lib.rs && \
|
||||
echo "" > crates/parsers/src/lib.rs
|
||||
|
||||
# Build dependencies only (cached as long as Cargo.toml files don't change)
|
||||
@@ -25,12 +27,13 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
|
||||
COPY apps/api/src apps/api/src
|
||||
COPY apps/indexer/src apps/indexer/src
|
||||
COPY crates/core/src crates/core/src
|
||||
COPY crates/notifications/src crates/notifications/src
|
||||
COPY crates/parsers/src crates/parsers/src
|
||||
|
||||
RUN --mount=type=cache,target=/usr/local/cargo/registry \
|
||||
--mount=type=cache,target=/usr/local/cargo/git \
|
||||
--mount=type=cache,target=/app/target \
|
||||
touch apps/indexer/src/main.rs crates/core/src/lib.rs crates/parsers/src/lib.rs && \
|
||||
touch apps/indexer/src/main.rs crates/core/src/lib.rs crates/notifications/src/lib.rs crates/parsers/src/lib.rs && \
|
||||
cargo build --release -p indexer && \
|
||||
cp /app/target/release/indexer /usr/local/bin/indexer
|
||||
|
||||
|
||||
@@ -328,6 +328,7 @@ pub async fn process_job(
|
||||
removed_files: 0,
|
||||
errors: 0,
|
||||
warnings: 0,
|
||||
new_series: 0,
|
||||
};
|
||||
|
||||
let mut total_processed_count = 0i32;
|
||||
|
||||
@@ -14,6 +14,7 @@ use crate::{
|
||||
utils,
|
||||
AppState,
|
||||
};
|
||||
use std::collections::HashSet;
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct JobStats {
|
||||
@@ -22,6 +23,7 @@ pub struct JobStats {
|
||||
pub removed_files: usize,
|
||||
pub errors: usize,
|
||||
pub warnings: usize,
|
||||
pub new_series: usize,
|
||||
}
|
||||
|
||||
const BATCH_SIZE: usize = 100;
|
||||
@@ -106,6 +108,18 @@ pub async fn scan_library_discovery(
|
||||
HashMap::new()
|
||||
};
|
||||
|
||||
// Track existing series names for new_series counting
|
||||
let existing_series: HashSet<String> = sqlx::query_scalar(
|
||||
"SELECT DISTINCT COALESCE(NULLIF(series, ''), 'unclassified') FROM books WHERE library_id = $1",
|
||||
)
|
||||
.bind(library_id)
|
||||
.fetch_all(&state.pool)
|
||||
.await
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.collect();
|
||||
let mut seen_new_series: HashSet<String> = HashSet::new();
|
||||
|
||||
let mut seen: HashMap<String, bool> = HashMap::new();
|
||||
let mut library_processed_count = 0i32;
|
||||
let mut last_progress_update = std::time::Instant::now();
|
||||
@@ -382,6 +396,12 @@ pub async fn scan_library_discovery(
|
||||
let book_id = Uuid::new_v4();
|
||||
let file_id = Uuid::new_v4();
|
||||
|
||||
// Track new series
|
||||
let series_key = parsed.series.as_deref().unwrap_or("unclassified").to_string();
|
||||
if !existing_series.contains(&series_key) && seen_new_series.insert(series_key) {
|
||||
stats.new_series += 1;
|
||||
}
|
||||
|
||||
books_to_insert.push(BookInsert {
|
||||
book_id,
|
||||
library_id,
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
use std::time::Duration;
|
||||
use sqlx::Row;
|
||||
use tracing::{error, info, trace};
|
||||
use uuid::Uuid;
|
||||
use crate::{job, scheduler, watcher, AppState};
|
||||
|
||||
pub async fn run_worker(state: AppState, interval_seconds: u64) {
|
||||
let wait = Duration::from_secs(interval_seconds.max(1));
|
||||
|
||||
|
||||
// Cleanup stale jobs from previous runs
|
||||
if let Err(err) = job::cleanup_stale_jobs(&state.pool).await {
|
||||
error!("[CLEANUP] Failed to cleanup stale jobs: {}", err);
|
||||
@@ -34,21 +36,168 @@ pub async fn run_worker(state: AppState, interval_seconds: u64) {
|
||||
}
|
||||
});
|
||||
|
||||
async fn load_job_info(
|
||||
pool: &sqlx::PgPool,
|
||||
job_id: Uuid,
|
||||
library_id: Option<Uuid>,
|
||||
) -> (String, Option<String>, Option<String>) {
|
||||
let row = sqlx::query("SELECT type, book_id FROM index_jobs WHERE id = $1")
|
||||
.bind(job_id)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
.ok()
|
||||
.flatten();
|
||||
|
||||
let (job_type, book_id): (String, Option<Uuid>) = match row {
|
||||
Some(r) => (r.get("type"), r.get("book_id")),
|
||||
None => ("unknown".to_string(), None),
|
||||
};
|
||||
|
||||
let library_name: Option<String> = if let Some(lib_id) = library_id {
|
||||
sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1")
|
||||
.bind(lib_id)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let book_title: Option<String> = if let Some(bid) = book_id {
|
||||
sqlx::query_scalar("SELECT title FROM books WHERE id = $1")
|
||||
.bind(bid)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
(job_type, library_name, book_title)
|
||||
}
|
||||
|
||||
async fn load_scan_stats(pool: &sqlx::PgPool, job_id: Uuid) -> notifications::ScanStats {
|
||||
let row = sqlx::query("SELECT stats_json FROM index_jobs WHERE id = $1")
|
||||
.bind(job_id)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
.ok()
|
||||
.flatten();
|
||||
|
||||
if let Some(row) = row {
|
||||
if let Ok(val) = row.try_get::<serde_json::Value, _>("stats_json") {
|
||||
return notifications::ScanStats {
|
||||
scanned_files: val.get("scanned_files").and_then(|v| v.as_u64()).unwrap_or(0) as usize,
|
||||
indexed_files: val.get("indexed_files").and_then(|v| v.as_u64()).unwrap_or(0) as usize,
|
||||
removed_files: val.get("removed_files").and_then(|v| v.as_u64()).unwrap_or(0) as usize,
|
||||
new_series: val.get("new_series").and_then(|v| v.as_u64()).unwrap_or(0) as usize,
|
||||
errors: val.get("errors").and_then(|v| v.as_u64()).unwrap_or(0) as usize,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
notifications::ScanStats {
|
||||
scanned_files: 0,
|
||||
indexed_files: 0,
|
||||
removed_files: 0,
|
||||
new_series: 0,
|
||||
errors: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn build_completed_event(
|
||||
job_type: &str,
|
||||
library_name: Option<String>,
|
||||
book_title: Option<String>,
|
||||
stats: notifications::ScanStats,
|
||||
duration_seconds: u64,
|
||||
) -> notifications::NotificationEvent {
|
||||
match notifications::job_type_category(job_type) {
|
||||
"thumbnail" => notifications::NotificationEvent::ThumbnailCompleted {
|
||||
job_type: job_type.to_string(),
|
||||
library_name,
|
||||
duration_seconds,
|
||||
},
|
||||
"conversion" => notifications::NotificationEvent::ConversionCompleted {
|
||||
library_name,
|
||||
book_title,
|
||||
},
|
||||
_ => notifications::NotificationEvent::ScanCompleted {
|
||||
job_type: job_type.to_string(),
|
||||
library_name,
|
||||
stats,
|
||||
duration_seconds,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn build_failed_event(
|
||||
job_type: &str,
|
||||
library_name: Option<String>,
|
||||
book_title: Option<String>,
|
||||
error: String,
|
||||
) -> notifications::NotificationEvent {
|
||||
match notifications::job_type_category(job_type) {
|
||||
"thumbnail" => notifications::NotificationEvent::ThumbnailFailed {
|
||||
job_type: job_type.to_string(),
|
||||
library_name,
|
||||
error,
|
||||
},
|
||||
"conversion" => notifications::NotificationEvent::ConversionFailed {
|
||||
library_name,
|
||||
book_title,
|
||||
error,
|
||||
},
|
||||
_ => notifications::NotificationEvent::ScanFailed {
|
||||
job_type: job_type.to_string(),
|
||||
library_name,
|
||||
error,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
match job::claim_next_job(&state.pool).await {
|
||||
Ok(Some((job_id, library_id))) => {
|
||||
info!("[INDEXER] Starting job {} library={:?}", job_id, library_id);
|
||||
let started_at = std::time::Instant::now();
|
||||
let (job_type, library_name, book_title) =
|
||||
load_job_info(&state.pool, job_id, library_id).await;
|
||||
|
||||
if let Err(err) = job::process_job(&state, job_id, library_id).await {
|
||||
let err_str = err.to_string();
|
||||
if err_str.contains("cancelled") || err_str.contains("Cancelled") {
|
||||
info!("[INDEXER] Job {} was cancelled by user", job_id);
|
||||
// Status is already 'cancelled' in DB, don't change it
|
||||
notifications::notify(
|
||||
state.pool.clone(),
|
||||
notifications::NotificationEvent::ScanCancelled {
|
||||
job_type: job_type.clone(),
|
||||
library_name: library_name.clone(),
|
||||
},
|
||||
);
|
||||
} else {
|
||||
error!("[INDEXER] Job {} failed: {}", job_id, err);
|
||||
let _ = job::fail_job(&state.pool, job_id, &err_str).await;
|
||||
notifications::notify(
|
||||
state.pool.clone(),
|
||||
build_failed_event(&job_type, library_name.clone(), book_title.clone(), err_str),
|
||||
);
|
||||
}
|
||||
} else {
|
||||
info!("[INDEXER] Job {} completed", job_id);
|
||||
let stats = load_scan_stats(&state.pool, job_id).await;
|
||||
notifications::notify(
|
||||
state.pool.clone(),
|
||||
build_completed_event(
|
||||
&job_type,
|
||||
library_name.clone(),
|
||||
book_title.clone(),
|
||||
stats,
|
||||
started_at.elapsed().as_secs(),
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
|
||||
Reference in New Issue
Block a user