From 81d1586501e63639fadd4ea7315540b07d41a309 Mon Sep 17 00:00:00 2001 From: Froidefond Julien Date: Sat, 21 Mar 2026 17:24:43 +0100 Subject: [PATCH] feat: add Telegram notification system with granular event toggles Add notifications crate shared between API and indexer to send Telegram messages on scan/thumbnail/conversion completion/failure, metadata linking, batch and refresh events. Configurable via a new Notifications tab in the backoffice settings with per-event toggle switches grouped by category. Co-Authored-By: Claude Opus 4.6 --- Cargo.lock | 15 + Cargo.toml | 1 + apps/api/Cargo.toml | 1 + apps/api/Dockerfile | 7 +- apps/api/src/main.rs | 2 + apps/api/src/metadata.rs | 10 + apps/api/src/metadata_batch.rs | 28 ++ apps/api/src/metadata_refresh.rs | 29 ++ apps/api/src/telegram.rs | 46 ++ apps/backoffice/app/components/ui/Icon.tsx | 4 +- apps/backoffice/app/settings/SettingsPage.tsx | 259 +++++++++- apps/backoffice/lib/i18n/en.ts | 27 ++ apps/backoffice/lib/i18n/fr.ts | 27 ++ apps/indexer/Cargo.toml | 1 + apps/indexer/Dockerfile | 7 +- apps/indexer/src/job.rs | 1 + apps/indexer/src/scanner.rs | 20 + apps/indexer/src/worker.rs | 153 +++++- crates/notifications/Cargo.toml | 13 + crates/notifications/src/lib.rs | 442 ++++++++++++++++++ .../migrations/0048_add_telegram_settings.sql | 3 + .../0049_update_telegram_events.sql | 8 + 22 files changed, 1096 insertions(+), 8 deletions(-) create mode 100644 apps/api/src/telegram.rs create mode 100644 crates/notifications/Cargo.toml create mode 100644 crates/notifications/src/lib.rs create mode 100644 infra/migrations/0048_add_telegram_settings.sql create mode 100644 infra/migrations/0049_update_telegram_events.sql diff --git a/Cargo.lock b/Cargo.lock index 9519b73..2898d70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,6 +76,7 @@ dependencies = [ "image", "jpeg-decoder", "lru", + "notifications", "parsers", "rand 0.8.5", "regex", @@ -1240,6 +1241,7 @@ dependencies = [ "futures", "image", "jpeg-decoder", + "notifications", "num_cpus", "parsers", "reqwest", @@ -1663,6 +1665,19 @@ dependencies = [ "nom", ] +[[package]] +name = "notifications" +version = "1.21.2" +dependencies = [ + "anyhow", + "reqwest", + "serde", + "serde_json", + "sqlx", + "tokio", + "tracing", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" diff --git a/Cargo.toml b/Cargo.toml index 4359bb1..f481644 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "apps/api", "apps/indexer", "crates/core", + "crates/notifications", "crates/parsers", ] resolver = "2" diff --git a/apps/api/Cargo.toml b/apps/api/Cargo.toml index d487bb6..b4ad5d2 100644 --- a/apps/api/Cargo.toml +++ b/apps/api/Cargo.toml @@ -15,6 +15,7 @@ futures = "0.3" image.workspace = true jpeg-decoder.workspace = true lru.workspace = true +notifications = { path = "../../crates/notifications" } stripstream-core = { path = "../../crates/core" } parsers = { path = "../../crates/parsers" } rand.workspace = true diff --git a/apps/api/Dockerfile b/apps/api/Dockerfile index ec50b55..643ba01 100644 --- a/apps/api/Dockerfile +++ b/apps/api/Dockerfile @@ -6,13 +6,15 @@ COPY Cargo.toml ./ COPY apps/api/Cargo.toml apps/api/Cargo.toml COPY apps/indexer/Cargo.toml apps/indexer/Cargo.toml COPY crates/core/Cargo.toml crates/core/Cargo.toml +COPY crates/notifications/Cargo.toml crates/notifications/Cargo.toml COPY crates/parsers/Cargo.toml crates/parsers/Cargo.toml -RUN mkdir -p apps/api/src apps/indexer/src crates/core/src crates/parsers/src && \ +RUN mkdir -p apps/api/src apps/indexer/src crates/core/src crates/notifications/src crates/parsers/src && \ echo "fn main() {}" > apps/api/src/main.rs && \ echo "fn main() {}" > apps/indexer/src/main.rs && \ echo "" > apps/indexer/src/lib.rs && \ echo "" > crates/core/src/lib.rs && \ + echo "" > crates/notifications/src/lib.rs && \ echo "" > crates/parsers/src/lib.rs # Build dependencies only (cached as long as Cargo.toml files don't change) @@ -26,12 +28,13 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \ COPY apps/api/src apps/api/src COPY apps/indexer/src apps/indexer/src COPY crates/core/src crates/core/src +COPY crates/notifications/src crates/notifications/src COPY crates/parsers/src crates/parsers/src RUN --mount=type=cache,target=/usr/local/cargo/registry \ --mount=type=cache,target=/usr/local/cargo/git \ --mount=type=cache,target=/app/target \ - touch apps/api/src/main.rs crates/core/src/lib.rs crates/parsers/src/lib.rs && \ + touch apps/api/src/main.rs crates/core/src/lib.rs crates/notifications/src/lib.rs crates/parsers/src/lib.rs && \ cargo build --release -p api && \ cp /app/target/release/api /usr/local/bin/api diff --git a/apps/api/src/main.rs b/apps/api/src/main.rs index 2204ff4..a111043 100644 --- a/apps/api/src/main.rs +++ b/apps/api/src/main.rs @@ -21,6 +21,7 @@ mod series; mod settings; mod state; mod stats; +mod telegram; mod thumbnails; mod tokens; @@ -111,6 +112,7 @@ async fn main() -> anyhow::Result<()> { .route("/prowlarr/test", get(prowlarr::test_prowlarr)) .route("/qbittorrent/add", axum::routing::post(qbittorrent::add_torrent)) .route("/qbittorrent/test", get(qbittorrent::test_qbittorrent)) + .route("/telegram/test", get(telegram::test_telegram)) .route("/komga/sync", axum::routing::post(komga::sync_komga_read_books)) .route("/komga/reports", get(komga::list_sync_reports)) .route("/komga/reports/:id", get(komga::get_sync_report)) diff --git a/apps/api/src/metadata.rs b/apps/api/src/metadata.rs index 75ad9f7..b4c3e9e 100644 --- a/apps/api/src/metadata.rs +++ b/apps/api/src/metadata.rs @@ -369,6 +369,16 @@ pub async fn approve_metadata( .await?; } + // Notify via Telegram + let provider_for_notif: String = row.get("provider"); + notifications::notify( + state.pool.clone(), + notifications::NotificationEvent::MetadataApproved { + series_name: series_name.clone(), + provider: provider_for_notif, + }, + ); + Ok(Json(ApproveResponse { status: "approved".to_string(), report, diff --git a/apps/api/src/metadata_batch.rs b/apps/api/src/metadata_batch.rs index 9506c0e..fff4a2b 100644 --- a/apps/api/src/metadata_batch.rs +++ b/apps/api/src/metadata_batch.rs @@ -124,6 +124,12 @@ pub async fn start_batch( // Spawn the background processing task let pool = state.pool.clone(); + let library_name: Option = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1") + .bind(library_id) + .fetch_optional(&state.pool) + .await + .ok() + .flatten(); tokio::spawn(async move { if let Err(e) = process_metadata_batch(&pool, job_id, library_id).await { warn!("[METADATA_BATCH] job {job_id} failed: {e}"); @@ -134,6 +140,13 @@ pub async fn start_batch( .bind(e.to_string()) .execute(&pool) .await; + notifications::notify( + pool.clone(), + notifications::NotificationEvent::MetadataBatchFailed { + library_name, + error: e.to_string(), + }, + ); } }); @@ -621,6 +634,21 @@ async fn process_metadata_batch( info!("[METADATA_BATCH] job={job_id} completed: {processed}/{total} series processed"); + let library_name: Option = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1") + .bind(library_id) + .fetch_optional(pool) + .await + .ok() + .flatten(); + notifications::notify( + pool.clone(), + notifications::NotificationEvent::MetadataBatchCompleted { + library_name, + total_series: total, + processed, + }, + ); + Ok(()) } diff --git a/apps/api/src/metadata_refresh.rs b/apps/api/src/metadata_refresh.rs index fb950ed..5cfc9b4 100644 --- a/apps/api/src/metadata_refresh.rs +++ b/apps/api/src/metadata_refresh.rs @@ -133,6 +133,12 @@ pub async fn start_refresh( // Spawn the background processing task let pool = state.pool.clone(); + let library_name: Option = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1") + .bind(library_id) + .fetch_optional(&state.pool) + .await + .ok() + .flatten(); tokio::spawn(async move { if let Err(e) = process_metadata_refresh(&pool, job_id, library_id).await { warn!("[METADATA_REFRESH] job {job_id} failed: {e}"); @@ -143,6 +149,13 @@ pub async fn start_refresh( .bind(e.to_string()) .execute(&pool) .await; + notifications::notify( + pool.clone(), + notifications::NotificationEvent::MetadataRefreshFailed { + library_name, + error: e.to_string(), + }, + ); } }); @@ -319,6 +332,22 @@ async fn process_metadata_refresh( info!("[METADATA_REFRESH] job={job_id} completed: {refreshed} updated, {unchanged} unchanged, {errors} errors"); + let library_name: Option = sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1") + .bind(library_id) + .fetch_optional(pool) + .await + .ok() + .flatten(); + notifications::notify( + pool.clone(), + notifications::NotificationEvent::MetadataRefreshCompleted { + library_name, + refreshed, + unchanged, + errors, + }, + ); + Ok(()) } diff --git a/apps/api/src/telegram.rs b/apps/api/src/telegram.rs new file mode 100644 index 0000000..ceabb0e --- /dev/null +++ b/apps/api/src/telegram.rs @@ -0,0 +1,46 @@ +use axum::{extract::State, Json}; +use serde::Serialize; +use utoipa::ToSchema; + +use crate::{error::ApiError, state::AppState}; + +#[derive(Serialize, ToSchema)] +pub struct TelegramTestResponse { + pub success: bool, + pub message: String, +} + +/// Test Telegram connection by sending a test message +#[utoipa::path( + get, + path = "/telegram/test", + tag = "notifications", + responses( + (status = 200, body = TelegramTestResponse), + (status = 400, description = "Telegram not configured"), + (status = 401, description = "Unauthorized"), + ), + security(("Bearer" = [])) +)] +pub async fn test_telegram( + State(state): State, +) -> Result, ApiError> { + let config = notifications::load_telegram_config(&state.pool) + .await + .ok_or_else(|| { + ApiError::bad_request( + "Telegram is not configured or disabled. Set bot_token, chat_id, and enable it.", + ) + })?; + + match notifications::send_test_message(&config).await { + Ok(()) => Ok(Json(TelegramTestResponse { + success: true, + message: "Test message sent successfully".to_string(), + })), + Err(e) => Ok(Json(TelegramTestResponse { + success: false, + message: format!("Failed to send: {e}"), + })), + } +} diff --git a/apps/backoffice/app/components/ui/Icon.tsx b/apps/backoffice/app/components/ui/Icon.tsx index 38b1bcc..22bc1c4 100644 --- a/apps/backoffice/app/components/ui/Icon.tsx +++ b/apps/backoffice/app/components/ui/Icon.tsx @@ -34,7 +34,8 @@ type IconName = | "warning" | "tag" | "document" - | "authors"; + | "authors" + | "bell"; type IconSize = "sm" | "md" | "lg" | "xl"; @@ -88,6 +89,7 @@ const icons: Record = { tag: "M7 7h.01M7 3h5a1.99 1.99 0 011.414.586l7 7a2 2 0 010 2.828l-7 7a2 2 0 01-2.828 0l-7-7A1.994 1.994 0 013 12V7a4 4 0 014-4z", document: "M9 12h6m-6 4h6m2 5H7a2 2 0 01-2-2V5a2 2 0 012-2h5.586a1 1 0 01.707.293l5.414 5.414a1 1 0 01.293.707V19a2 2 0 01-2 2z", authors: "M17 20h5v-2a3 3 0 00-5.356-1.857M17 20H7m10 0v-2c0-.656-.126-1.283-.356-1.857M7 20H2v-2a3 3 0 015.356-1.857M7 20v-2c0-.656.126-1.283.356-1.857m0 0a5.002 5.002 0 019.288 0M15 7a3 3 0 11-6 0 3 3 0 016 0zm6 3a2 2 0 11-4 0 2 2 0 014 0zM7 10a2 2 0 11-4 0 2 2 0 014 0z", + bell: "M15 17h5l-1.405-1.405A2.032 2.032 0 0118 14.158V11a6.002 6.002 0 00-4-5.659V5a2 2 0 10-4 0v.341C7.67 6.165 6 8.388 6 11v3.159c0 .538-.214 1.055-.595 1.436L4 17h5m6 0v1a3 3 0 11-6 0v-1m6 0H9", }; const colorClasses: Partial> = { diff --git a/apps/backoffice/app/settings/SettingsPage.tsx b/apps/backoffice/app/settings/SettingsPage.tsx index 769d3fe..534ab75 100644 --- a/apps/backoffice/app/settings/SettingsPage.tsx +++ b/apps/backoffice/app/settings/SettingsPage.tsx @@ -150,11 +150,12 @@ export default function SettingsPage({ initialSettings, initialCacheStats, initi } } - const [activeTab, setActiveTab] = useState<"general" | "integrations">("general"); + const [activeTab, setActiveTab] = useState<"general" | "integrations" | "notifications">("general"); const tabs = [ { id: "general" as const, label: t("settings.general"), icon: "settings" as const }, { id: "integrations" as const, label: t("settings.integrations"), icon: "refresh" as const }, + { id: "notifications" as const, label: t("settings.notifications"), icon: "bell" as const }, ]; return ( @@ -826,6 +827,11 @@ export default function SettingsPage({ initialSettings, initialCacheStats, initi )} + + {activeTab === "notifications" && (<> + {/* Telegram Notifications */} + + )} ); } @@ -1480,3 +1486,254 @@ function QBittorrentCard({ handleUpdateSetting }: { handleUpdateSetting: (key: s ); } + +// --------------------------------------------------------------------------- +// Telegram Notifications sub-component +// --------------------------------------------------------------------------- + +const DEFAULT_EVENTS = { + scan_completed: true, + scan_failed: true, + scan_cancelled: true, + thumbnail_completed: true, + thumbnail_failed: true, + conversion_completed: true, + conversion_failed: true, + metadata_approved: true, + metadata_batch_completed: true, + metadata_batch_failed: true, + metadata_refresh_completed: true, + metadata_refresh_failed: true, +}; + +function TelegramCard({ handleUpdateSetting }: { handleUpdateSetting: (key: string, value: unknown) => Promise }) { + const { t } = useTranslation(); + const [botToken, setBotToken] = useState(""); + const [chatId, setChatId] = useState(""); + const [enabled, setEnabled] = useState(false); + const [events, setEvents] = useState(DEFAULT_EVENTS); + const [isTesting, setIsTesting] = useState(false); + const [testResult, setTestResult] = useState<{ success: boolean; message: string } | null>(null); + const [showHelp, setShowHelp] = useState(false); + + useEffect(() => { + fetch("/api/settings/telegram") + .then((r) => (r.ok ? r.json() : null)) + .then((data) => { + if (data) { + if (data.bot_token) setBotToken(data.bot_token); + if (data.chat_id) setChatId(data.chat_id); + if (data.enabled !== undefined) setEnabled(data.enabled); + if (data.events) setEvents({ ...DEFAULT_EVENTS, ...data.events }); + } + }) + .catch(() => {}); + }, []); + + function saveTelegram(token?: string, chat?: string, en?: boolean, ev?: typeof events) { + handleUpdateSetting("telegram", { + bot_token: token ?? botToken, + chat_id: chat ?? chatId, + enabled: en ?? enabled, + events: ev ?? events, + }); + } + + async function handleTestConnection() { + setIsTesting(true); + setTestResult(null); + try { + const resp = await fetch("/api/telegram/test"); + const data = await resp.json(); + if (data.error) { + setTestResult({ success: false, message: data.error }); + } else { + setTestResult(data); + } + } catch { + setTestResult({ success: false, message: "Failed to connect" }); + } finally { + setIsTesting(false); + } + } + + return ( + + + + + {t("settings.telegram")} + + {t("settings.telegramDesc")} + + +
+ {/* Setup guide */} +
+ + {showHelp && ( +
+
+

1. Bot Token

+

+

+
+

2. Chat ID

+

+

+
+

3. Group chat

+

+

+
+ )} +
+ +
+ + {t("settings.telegramEnabled")} +
+ + + + + setBotToken(e.target.value)} + onBlur={() => saveTelegram()} + /> + + + + + + setChatId(e.target.value)} + onBlur={() => saveTelegram()} + /> + + + + {/* Event toggles grouped by category */} +
+

{t("settings.telegramEvents")}

+
+ {([ + { + category: t("settings.eventCategoryScan"), + icon: "search" as const, + items: [ + { key: "scan_completed" as const, label: t("settings.eventCompleted") }, + { key: "scan_failed" as const, label: t("settings.eventFailed") }, + { key: "scan_cancelled" as const, label: t("settings.eventCancelled") }, + ], + }, + { + category: t("settings.eventCategoryThumbnail"), + icon: "image" as const, + items: [ + { key: "thumbnail_completed" as const, label: t("settings.eventCompleted") }, + { key: "thumbnail_failed" as const, label: t("settings.eventFailed") }, + ], + }, + { + category: t("settings.eventCategoryConversion"), + icon: "refresh" as const, + items: [ + { key: "conversion_completed" as const, label: t("settings.eventCompleted") }, + { key: "conversion_failed" as const, label: t("settings.eventFailed") }, + ], + }, + { + category: t("settings.eventCategoryMetadata"), + icon: "tag" as const, + items: [ + { key: "metadata_approved" as const, label: t("settings.eventLinked") }, + { key: "metadata_batch_completed" as const, label: t("settings.eventBatchCompleted") }, + { key: "metadata_batch_failed" as const, label: t("settings.eventBatchFailed") }, + { key: "metadata_refresh_completed" as const, label: t("settings.eventRefreshCompleted") }, + { key: "metadata_refresh_failed" as const, label: t("settings.eventRefreshFailed") }, + ], + }, + ]).map(({ category, icon, items }) => ( +
+

+ + {category} +

+
+ {items.map(({ key, label }) => ( +
+ ))} +
+
+ +
+ + {testResult && ( + + {testResult.message} + + )} +
+
+ + + ); +} diff --git a/apps/backoffice/lib/i18n/en.ts b/apps/backoffice/lib/i18n/en.ts index 121dd09..8f18b82 100644 --- a/apps/backoffice/lib/i18n/en.ts +++ b/apps/backoffice/lib/i18n/en.ts @@ -543,6 +543,33 @@ const en: Record = { "settings.qbittorrentUsername": "Username", "settings.qbittorrentPassword": "Password", + // Settings - Telegram Notifications + "settings.notifications": "Notifications", + "settings.telegram": "Telegram", + "settings.telegramDesc": "Receive Telegram notifications for scans, errors, and metadata linking.", + "settings.botToken": "Bot Token", + "settings.botTokenPlaceholder": "123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11", + "settings.chatId": "Chat ID", + "settings.chatIdPlaceholder": "123456789", + "settings.telegramEnabled": "Enable Telegram notifications", + "settings.telegramEvents": "Events", + "settings.eventCategoryScan": "Scans", + "settings.eventCategoryThumbnail": "Thumbnails", + "settings.eventCategoryConversion": "CBR → CBZ Conversion", + "settings.eventCategoryMetadata": "Metadata", + "settings.eventCompleted": "Completed", + "settings.eventFailed": "Failed", + "settings.eventCancelled": "Cancelled", + "settings.eventLinked": "Linked", + "settings.eventBatchCompleted": "Batch completed", + "settings.eventBatchFailed": "Batch failed", + "settings.eventRefreshCompleted": "Refresh completed", + "settings.eventRefreshFailed": "Refresh failed", + "settings.telegramHelp": "How to get the required information?", + "settings.telegramHelpBot": "Open Telegram, search for @BotFather, send /newbot and follow the instructions. Copy the token it gives you.", + "settings.telegramHelpChat": "Send a message to your bot, then open https://api.telegram.org/bot<TOKEN>/getUpdates in your browser. The chat id is in message.chat.id.", + "settings.telegramHelpGroup": "For a group: add the bot to the group, send a message, then check the same URL. Group IDs are negative (e.g. -123456789).", + // Settings - Language "settings.language": "Language", "settings.languageDesc": "Choose the interface language", diff --git a/apps/backoffice/lib/i18n/fr.ts b/apps/backoffice/lib/i18n/fr.ts index 192996a..32d59bf 100644 --- a/apps/backoffice/lib/i18n/fr.ts +++ b/apps/backoffice/lib/i18n/fr.ts @@ -541,6 +541,33 @@ const fr = { "settings.qbittorrentUsername": "Nom d'utilisateur", "settings.qbittorrentPassword": "Mot de passe", + // Settings - Telegram Notifications + "settings.notifications": "Notifications", + "settings.telegram": "Telegram", + "settings.telegramDesc": "Recevoir des notifications Telegram lors des scans, erreurs et liaisons de métadonnées.", + "settings.botToken": "Bot Token", + "settings.botTokenPlaceholder": "123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11", + "settings.chatId": "Chat ID", + "settings.chatIdPlaceholder": "123456789", + "settings.telegramEnabled": "Activer les notifications Telegram", + "settings.telegramEvents": "Événements", + "settings.eventCategoryScan": "Scans", + "settings.eventCategoryThumbnail": "Miniatures", + "settings.eventCategoryConversion": "Conversion CBR → CBZ", + "settings.eventCategoryMetadata": "Métadonnées", + "settings.eventCompleted": "Terminé", + "settings.eventFailed": "Échoué", + "settings.eventCancelled": "Annulé", + "settings.eventLinked": "Liée", + "settings.eventBatchCompleted": "Batch terminé", + "settings.eventBatchFailed": "Batch échoué", + "settings.eventRefreshCompleted": "Rafraîchissement terminé", + "settings.eventRefreshFailed": "Rafraîchissement échoué", + "settings.telegramHelp": "Comment obtenir les informations ?", + "settings.telegramHelpBot": "Ouvrez Telegram, recherchez @BotFather, envoyez /newbot et suivez les instructions. Copiez le token fourni.", + "settings.telegramHelpChat": "Envoyez un message à votre bot, puis ouvrez https://api.telegram.org/bot<TOKEN>/getUpdates dans votre navigateur. Le chat id apparaît dans message.chat.id.", + "settings.telegramHelpGroup": "Pour un groupe : ajoutez le bot au groupe, envoyez un message, puis consultez la même URL. Les IDs de groupe sont négatifs (ex: -123456789).", + // Settings - Language "settings.language": "Langue", "settings.languageDesc": "Choisir la langue de l'interface", diff --git a/apps/indexer/Cargo.toml b/apps/indexer/Cargo.toml index 7e49610..e70f6f1 100644 --- a/apps/indexer/Cargo.toml +++ b/apps/indexer/Cargo.toml @@ -14,6 +14,7 @@ futures = "0.3" image.workspace = true jpeg-decoder.workspace = true num_cpus.workspace = true +notifications = { path = "../../crates/notifications" } parsers = { path = "../../crates/parsers" } reqwest.workspace = true serde.workspace = true diff --git a/apps/indexer/Dockerfile b/apps/indexer/Dockerfile index 9a8e392..fb6bde8 100644 --- a/apps/indexer/Dockerfile +++ b/apps/indexer/Dockerfile @@ -6,13 +6,15 @@ COPY Cargo.toml ./ COPY apps/api/Cargo.toml apps/api/Cargo.toml COPY apps/indexer/Cargo.toml apps/indexer/Cargo.toml COPY crates/core/Cargo.toml crates/core/Cargo.toml +COPY crates/notifications/Cargo.toml crates/notifications/Cargo.toml COPY crates/parsers/Cargo.toml crates/parsers/Cargo.toml -RUN mkdir -p apps/api/src apps/indexer/src crates/core/src crates/parsers/src && \ +RUN mkdir -p apps/api/src apps/indexer/src crates/core/src crates/notifications/src crates/parsers/src && \ echo "fn main() {}" > apps/api/src/main.rs && \ echo "fn main() {}" > apps/indexer/src/main.rs && \ echo "" > apps/indexer/src/lib.rs && \ echo "" > crates/core/src/lib.rs && \ + echo "" > crates/notifications/src/lib.rs && \ echo "" > crates/parsers/src/lib.rs # Build dependencies only (cached as long as Cargo.toml files don't change) @@ -25,12 +27,13 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \ COPY apps/api/src apps/api/src COPY apps/indexer/src apps/indexer/src COPY crates/core/src crates/core/src +COPY crates/notifications/src crates/notifications/src COPY crates/parsers/src crates/parsers/src RUN --mount=type=cache,target=/usr/local/cargo/registry \ --mount=type=cache,target=/usr/local/cargo/git \ --mount=type=cache,target=/app/target \ - touch apps/indexer/src/main.rs crates/core/src/lib.rs crates/parsers/src/lib.rs && \ + touch apps/indexer/src/main.rs crates/core/src/lib.rs crates/notifications/src/lib.rs crates/parsers/src/lib.rs && \ cargo build --release -p indexer && \ cp /app/target/release/indexer /usr/local/bin/indexer diff --git a/apps/indexer/src/job.rs b/apps/indexer/src/job.rs index 27dc92a..20ed1a0 100644 --- a/apps/indexer/src/job.rs +++ b/apps/indexer/src/job.rs @@ -328,6 +328,7 @@ pub async fn process_job( removed_files: 0, errors: 0, warnings: 0, + new_series: 0, }; let mut total_processed_count = 0i32; diff --git a/apps/indexer/src/scanner.rs b/apps/indexer/src/scanner.rs index 28b6e48..7a7af25 100644 --- a/apps/indexer/src/scanner.rs +++ b/apps/indexer/src/scanner.rs @@ -14,6 +14,7 @@ use crate::{ utils, AppState, }; +use std::collections::HashSet; #[derive(Serialize)] pub struct JobStats { @@ -22,6 +23,7 @@ pub struct JobStats { pub removed_files: usize, pub errors: usize, pub warnings: usize, + pub new_series: usize, } const BATCH_SIZE: usize = 100; @@ -106,6 +108,18 @@ pub async fn scan_library_discovery( HashMap::new() }; + // Track existing series names for new_series counting + let existing_series: HashSet = sqlx::query_scalar( + "SELECT DISTINCT COALESCE(NULLIF(series, ''), 'unclassified') FROM books WHERE library_id = $1", + ) + .bind(library_id) + .fetch_all(&state.pool) + .await + .unwrap_or_default() + .into_iter() + .collect(); + let mut seen_new_series: HashSet = HashSet::new(); + let mut seen: HashMap = HashMap::new(); let mut library_processed_count = 0i32; let mut last_progress_update = std::time::Instant::now(); @@ -382,6 +396,12 @@ pub async fn scan_library_discovery( let book_id = Uuid::new_v4(); let file_id = Uuid::new_v4(); + // Track new series + let series_key = parsed.series.as_deref().unwrap_or("unclassified").to_string(); + if !existing_series.contains(&series_key) && seen_new_series.insert(series_key) { + stats.new_series += 1; + } + books_to_insert.push(BookInsert { book_id, library_id, diff --git a/apps/indexer/src/worker.rs b/apps/indexer/src/worker.rs index 18ade62..b7469d3 100644 --- a/apps/indexer/src/worker.rs +++ b/apps/indexer/src/worker.rs @@ -1,10 +1,12 @@ use std::time::Duration; +use sqlx::Row; use tracing::{error, info, trace}; +use uuid::Uuid; use crate::{job, scheduler, watcher, AppState}; pub async fn run_worker(state: AppState, interval_seconds: u64) { let wait = Duration::from_secs(interval_seconds.max(1)); - + // Cleanup stale jobs from previous runs if let Err(err) = job::cleanup_stale_jobs(&state.pool).await { error!("[CLEANUP] Failed to cleanup stale jobs: {}", err); @@ -34,21 +36,168 @@ pub async fn run_worker(state: AppState, interval_seconds: u64) { } }); + async fn load_job_info( + pool: &sqlx::PgPool, + job_id: Uuid, + library_id: Option, + ) -> (String, Option, Option) { + let row = sqlx::query("SELECT type, book_id FROM index_jobs WHERE id = $1") + .bind(job_id) + .fetch_optional(pool) + .await + .ok() + .flatten(); + + let (job_type, book_id): (String, Option) = match row { + Some(r) => (r.get("type"), r.get("book_id")), + None => ("unknown".to_string(), None), + }; + + let library_name: Option = if let Some(lib_id) = library_id { + sqlx::query_scalar("SELECT name FROM libraries WHERE id = $1") + .bind(lib_id) + .fetch_optional(pool) + .await + .ok() + .flatten() + } else { + None + }; + + let book_title: Option = if let Some(bid) = book_id { + sqlx::query_scalar("SELECT title FROM books WHERE id = $1") + .bind(bid) + .fetch_optional(pool) + .await + .ok() + .flatten() + } else { + None + }; + + (job_type, library_name, book_title) + } + + async fn load_scan_stats(pool: &sqlx::PgPool, job_id: Uuid) -> notifications::ScanStats { + let row = sqlx::query("SELECT stats_json FROM index_jobs WHERE id = $1") + .bind(job_id) + .fetch_optional(pool) + .await + .ok() + .flatten(); + + if let Some(row) = row { + if let Ok(val) = row.try_get::("stats_json") { + return notifications::ScanStats { + scanned_files: val.get("scanned_files").and_then(|v| v.as_u64()).unwrap_or(0) as usize, + indexed_files: val.get("indexed_files").and_then(|v| v.as_u64()).unwrap_or(0) as usize, + removed_files: val.get("removed_files").and_then(|v| v.as_u64()).unwrap_or(0) as usize, + new_series: val.get("new_series").and_then(|v| v.as_u64()).unwrap_or(0) as usize, + errors: val.get("errors").and_then(|v| v.as_u64()).unwrap_or(0) as usize, + }; + } + } + + notifications::ScanStats { + scanned_files: 0, + indexed_files: 0, + removed_files: 0, + new_series: 0, + errors: 0, + } + } + + fn build_completed_event( + job_type: &str, + library_name: Option, + book_title: Option, + stats: notifications::ScanStats, + duration_seconds: u64, + ) -> notifications::NotificationEvent { + match notifications::job_type_category(job_type) { + "thumbnail" => notifications::NotificationEvent::ThumbnailCompleted { + job_type: job_type.to_string(), + library_name, + duration_seconds, + }, + "conversion" => notifications::NotificationEvent::ConversionCompleted { + library_name, + book_title, + }, + _ => notifications::NotificationEvent::ScanCompleted { + job_type: job_type.to_string(), + library_name, + stats, + duration_seconds, + }, + } + } + + fn build_failed_event( + job_type: &str, + library_name: Option, + book_title: Option, + error: String, + ) -> notifications::NotificationEvent { + match notifications::job_type_category(job_type) { + "thumbnail" => notifications::NotificationEvent::ThumbnailFailed { + job_type: job_type.to_string(), + library_name, + error, + }, + "conversion" => notifications::NotificationEvent::ConversionFailed { + library_name, + book_title, + error, + }, + _ => notifications::NotificationEvent::ScanFailed { + job_type: job_type.to_string(), + library_name, + error, + }, + } + } + loop { match job::claim_next_job(&state.pool).await { Ok(Some((job_id, library_id))) => { info!("[INDEXER] Starting job {} library={:?}", job_id, library_id); + let started_at = std::time::Instant::now(); + let (job_type, library_name, book_title) = + load_job_info(&state.pool, job_id, library_id).await; + if let Err(err) = job::process_job(&state, job_id, library_id).await { let err_str = err.to_string(); if err_str.contains("cancelled") || err_str.contains("Cancelled") { info!("[INDEXER] Job {} was cancelled by user", job_id); - // Status is already 'cancelled' in DB, don't change it + notifications::notify( + state.pool.clone(), + notifications::NotificationEvent::ScanCancelled { + job_type: job_type.clone(), + library_name: library_name.clone(), + }, + ); } else { error!("[INDEXER] Job {} failed: {}", job_id, err); let _ = job::fail_job(&state.pool, job_id, &err_str).await; + notifications::notify( + state.pool.clone(), + build_failed_event(&job_type, library_name.clone(), book_title.clone(), err_str), + ); } } else { info!("[INDEXER] Job {} completed", job_id); + let stats = load_scan_stats(&state.pool, job_id).await; + notifications::notify( + state.pool.clone(), + build_completed_event( + &job_type, + library_name.clone(), + book_title.clone(), + stats, + started_at.elapsed().as_secs(), + ), + ); } } Ok(None) => { diff --git a/crates/notifications/Cargo.toml b/crates/notifications/Cargo.toml new file mode 100644 index 0000000..98f7604 --- /dev/null +++ b/crates/notifications/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "notifications" +version.workspace = true +edition.workspace = true + +[dependencies] +anyhow.workspace = true +reqwest.workspace = true +serde.workspace = true +serde_json.workspace = true +sqlx.workspace = true +tokio.workspace = true +tracing.workspace = true diff --git a/crates/notifications/src/lib.rs b/crates/notifications/src/lib.rs new file mode 100644 index 0000000..3c31f4b --- /dev/null +++ b/crates/notifications/src/lib.rs @@ -0,0 +1,442 @@ +use anyhow::Result; +use serde::Deserialize; +use sqlx::PgPool; +use tracing::{info, warn}; + +// --------------------------------------------------------------------------- +// Config +// --------------------------------------------------------------------------- + +#[derive(Debug, Deserialize)] +pub struct TelegramConfig { + pub bot_token: String, + pub chat_id: String, + #[serde(default)] + pub enabled: bool, + #[serde(default = "default_events")] + pub events: EventToggles, +} + +#[derive(Debug, Deserialize)] +pub struct EventToggles { + #[serde(default = "default_true")] + pub scan_completed: bool, + #[serde(default = "default_true")] + pub scan_failed: bool, + #[serde(default = "default_true")] + pub scan_cancelled: bool, + #[serde(default = "default_true")] + pub thumbnail_completed: bool, + #[serde(default = "default_true")] + pub thumbnail_failed: bool, + #[serde(default = "default_true")] + pub conversion_completed: bool, + #[serde(default = "default_true")] + pub conversion_failed: bool, + #[serde(default = "default_true")] + pub metadata_approved: bool, + #[serde(default = "default_true")] + pub metadata_batch_completed: bool, + #[serde(default = "default_true")] + pub metadata_batch_failed: bool, + #[serde(default = "default_true")] + pub metadata_refresh_completed: bool, + #[serde(default = "default_true")] + pub metadata_refresh_failed: bool, +} + +fn default_true() -> bool { + true +} + +fn default_events() -> EventToggles { + EventToggles { + scan_completed: true, + scan_failed: true, + scan_cancelled: true, + thumbnail_completed: true, + thumbnail_failed: true, + conversion_completed: true, + conversion_failed: true, + metadata_approved: true, + metadata_batch_completed: true, + metadata_batch_failed: true, + metadata_refresh_completed: true, + metadata_refresh_failed: true, + } +} + +/// Load the Telegram config from `app_settings` (key = "telegram"). +/// Returns `None` when the row is missing, disabled, or has empty credentials. +pub async fn load_telegram_config(pool: &PgPool) -> Option { + let row = sqlx::query_scalar::<_, serde_json::Value>( + "SELECT value FROM app_settings WHERE key = 'telegram'", + ) + .fetch_optional(pool) + .await + .ok()??; + + let config: TelegramConfig = serde_json::from_value(row).ok()?; + + if !config.enabled || config.bot_token.is_empty() || config.chat_id.is_empty() { + return None; + } + + Some(config) +} + +// --------------------------------------------------------------------------- +// Telegram HTTP +// --------------------------------------------------------------------------- + +async fn send_telegram(config: &TelegramConfig, text: &str) -> Result<()> { + let url = format!( + "https://api.telegram.org/bot{}/sendMessage", + config.bot_token + ); + + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .build()?; + + let body = serde_json::json!({ + "chat_id": config.chat_id, + "text": text, + "parse_mode": "HTML", + }); + + let resp = client.post(&url).json(&body).send().await?; + + if !resp.status().is_success() { + let status = resp.status(); + let text = resp.text().await.unwrap_or_default(); + anyhow::bail!("Telegram API returned {status}: {text}"); + } + + Ok(()) +} + +/// Send a test message. Returns the result directly (not fire-and-forget). +pub async fn send_test_message(config: &TelegramConfig) -> Result<()> { + send_telegram(config, "🔔 Stripstream Librarian\nTest notification — connection OK!").await +} + +// --------------------------------------------------------------------------- +// Notification events +// --------------------------------------------------------------------------- + +pub struct ScanStats { + pub scanned_files: usize, + pub indexed_files: usize, + pub removed_files: usize, + pub new_series: usize, + pub errors: usize, +} + +pub enum NotificationEvent { + // Scan jobs (rebuild, full_rebuild, rescan, scan) + ScanCompleted { + job_type: String, + library_name: Option, + stats: ScanStats, + duration_seconds: u64, + }, + ScanFailed { + job_type: String, + library_name: Option, + error: String, + }, + ScanCancelled { + job_type: String, + library_name: Option, + }, + // Thumbnail jobs (thumbnail_rebuild, thumbnail_regenerate) + ThumbnailCompleted { + job_type: String, + library_name: Option, + duration_seconds: u64, + }, + ThumbnailFailed { + job_type: String, + library_name: Option, + error: String, + }, + // CBR→CBZ conversion + ConversionCompleted { + library_name: Option, + book_title: Option, + }, + ConversionFailed { + library_name: Option, + book_title: Option, + error: String, + }, + // Metadata manual approve + MetadataApproved { + series_name: String, + provider: String, + }, + // Metadata batch (auto-match) + MetadataBatchCompleted { + library_name: Option, + total_series: i32, + processed: i32, + }, + MetadataBatchFailed { + library_name: Option, + error: String, + }, + // Metadata refresh + MetadataRefreshCompleted { + library_name: Option, + refreshed: i32, + unchanged: i32, + errors: i32, + }, + MetadataRefreshFailed { + library_name: Option, + error: String, + }, +} + +/// Classify an indexer job_type string into the right event constructor category. +/// Returns "scan", "thumbnail", or "conversion". +pub fn job_type_category(job_type: &str) -> &'static str { + match job_type { + "thumbnail_rebuild" | "thumbnail_regenerate" => "thumbnail", + "cbr_to_cbz" => "conversion", + _ => "scan", + } +} + +fn format_event(event: &NotificationEvent) -> String { + match event { + NotificationEvent::ScanCompleted { + job_type, + library_name, + stats, + duration_seconds, + } => { + let lib = library_name.as_deref().unwrap_or("All libraries"); + let duration = format_duration(*duration_seconds); + format!( + "📚 Scan completed\n\ + Library: {lib}\n\ + Type: {job_type}\n\ + New books: {}\n\ + New series: {}\n\ + Files scanned: {}\n\ + Removed: {}\n\ + Errors: {}\n\ + Duration: {duration}", + stats.indexed_files, + stats.new_series, + stats.scanned_files, + stats.removed_files, + stats.errors, + ) + } + NotificationEvent::ScanFailed { + job_type, + library_name, + error, + } => { + let lib = library_name.as_deref().unwrap_or("All libraries"); + let err = truncate(error, 200); + format!( + "❌ Scan failed\n\ + Library: {lib}\n\ + Type: {job_type}\n\ + Error: {err}" + ) + } + NotificationEvent::ScanCancelled { + job_type, + library_name, + } => { + let lib = library_name.as_deref().unwrap_or("All libraries"); + format!( + "⏹ Scan cancelled\n\ + Library: {lib}\n\ + Type: {job_type}" + ) + } + NotificationEvent::ThumbnailCompleted { + job_type, + library_name, + duration_seconds, + } => { + let lib = library_name.as_deref().unwrap_or("All libraries"); + let duration = format_duration(*duration_seconds); + format!( + "🖼 Thumbnails completed\n\ + Library: {lib}\n\ + Type: {job_type}\n\ + Duration: {duration}" + ) + } + NotificationEvent::ThumbnailFailed { + job_type, + library_name, + error, + } => { + let lib = library_name.as_deref().unwrap_or("All libraries"); + let err = truncate(error, 200); + format!( + "❌ Thumbnails failed\n\ + Library: {lib}\n\ + Type: {job_type}\n\ + Error: {err}" + ) + } + NotificationEvent::ConversionCompleted { + library_name, + book_title, + } => { + let lib = library_name.as_deref().unwrap_or("Unknown"); + let title = book_title.as_deref().unwrap_or("Unknown"); + format!( + "🔄 CBR→CBZ conversion completed\n\ + Library: {lib}\n\ + Book: {title}" + ) + } + NotificationEvent::ConversionFailed { + library_name, + book_title, + error, + } => { + let lib = library_name.as_deref().unwrap_or("Unknown"); + let title = book_title.as_deref().unwrap_or("Unknown"); + let err = truncate(error, 200); + format!( + "❌ CBR→CBZ conversion failed\n\ + Library: {lib}\n\ + Book: {title}\n\ + Error: {err}" + ) + } + NotificationEvent::MetadataApproved { + series_name, + provider, + } => { + format!( + "🔗 Metadata linked\n\ + Series: {series_name}\n\ + Provider: {provider}" + ) + } + NotificationEvent::MetadataBatchCompleted { + library_name, + total_series, + processed, + } => { + let lib = library_name.as_deref().unwrap_or("All libraries"); + format!( + "🔍 Metadata batch completed\n\ + Library: {lib}\n\ + Series processed: {processed}/{total_series}" + ) + } + NotificationEvent::MetadataBatchFailed { + library_name, + error, + } => { + let lib = library_name.as_deref().unwrap_or("All libraries"); + let err = truncate(error, 200); + format!( + "❌ Metadata batch failed\n\ + Library: {lib}\n\ + Error: {err}" + ) + } + NotificationEvent::MetadataRefreshCompleted { + library_name, + refreshed, + unchanged, + errors, + } => { + let lib = library_name.as_deref().unwrap_or("All libraries"); + format!( + "🔄 Metadata refresh completed\n\ + Library: {lib}\n\ + Updated: {refreshed}\n\ + Unchanged: {unchanged}\n\ + Errors: {errors}" + ) + } + NotificationEvent::MetadataRefreshFailed { + library_name, + error, + } => { + let lib = library_name.as_deref().unwrap_or("All libraries"); + let err = truncate(error, 200); + format!( + "❌ Metadata refresh failed\n\ + Library: {lib}\n\ + Error: {err}" + ) + } + } +} + +fn truncate(s: &str, max: usize) -> String { + if s.len() > max { + format!("{}…", &s[..max]) + } else { + s.to_string() + } +} + +fn format_duration(secs: u64) -> String { + if secs < 60 { + format!("{secs}s") + } else { + let m = secs / 60; + let s = secs % 60; + format!("{m}m{s}s") + } +} + +// --------------------------------------------------------------------------- +// Public entry point — fire & forget +// --------------------------------------------------------------------------- + +/// Returns whether this event type is enabled in the config. +fn is_event_enabled(config: &TelegramConfig, event: &NotificationEvent) -> bool { + match event { + NotificationEvent::ScanCompleted { .. } => config.events.scan_completed, + NotificationEvent::ScanFailed { .. } => config.events.scan_failed, + NotificationEvent::ScanCancelled { .. } => config.events.scan_cancelled, + NotificationEvent::ThumbnailCompleted { .. } => config.events.thumbnail_completed, + NotificationEvent::ThumbnailFailed { .. } => config.events.thumbnail_failed, + NotificationEvent::ConversionCompleted { .. } => config.events.conversion_completed, + NotificationEvent::ConversionFailed { .. } => config.events.conversion_failed, + NotificationEvent::MetadataApproved { .. } => config.events.metadata_approved, + NotificationEvent::MetadataBatchCompleted { .. } => config.events.metadata_batch_completed, + NotificationEvent::MetadataBatchFailed { .. } => config.events.metadata_batch_failed, + NotificationEvent::MetadataRefreshCompleted { .. } => config.events.metadata_refresh_completed, + NotificationEvent::MetadataRefreshFailed { .. } => config.events.metadata_refresh_failed, + } +} + +/// Load config + format + send in a spawned task. Errors are only logged. +pub fn notify(pool: PgPool, event: NotificationEvent) { + tokio::spawn(async move { + let config = match load_telegram_config(&pool).await { + Some(c) => c, + None => return, // disabled or not configured + }; + + if !is_event_enabled(&config, &event) { + return; + } + + let text = format_event(&event); + if let Err(e) = send_telegram(&config, &text).await { + warn!("[TELEGRAM] Failed to send notification: {e}"); + } else { + info!("[TELEGRAM] Notification sent"); + } + }); +} diff --git a/infra/migrations/0048_add_telegram_settings.sql b/infra/migrations/0048_add_telegram_settings.sql new file mode 100644 index 0000000..877fb52 --- /dev/null +++ b/infra/migrations/0048_add_telegram_settings.sql @@ -0,0 +1,3 @@ +INSERT INTO app_settings (key, value) VALUES + ('telegram', '{"bot_token": "", "chat_id": "", "enabled": false, "events": {"job_completed": true, "job_failed": true, "job_cancelled": true, "metadata_approved": true}}') +ON CONFLICT DO NOTHING; diff --git a/infra/migrations/0049_update_telegram_events.sql b/infra/migrations/0049_update_telegram_events.sql new file mode 100644 index 0000000..73d98bf --- /dev/null +++ b/infra/migrations/0049_update_telegram_events.sql @@ -0,0 +1,8 @@ +-- Update telegram events from 4 generic toggles to 12 granular toggles +UPDATE app_settings +SET value = jsonb_set( + value, + '{events}', + '{"scan_completed": true, "scan_failed": true, "scan_cancelled": true, "thumbnail_completed": true, "thumbnail_failed": true, "conversion_completed": true, "conversion_failed": true, "metadata_approved": true, "metadata_batch_completed": true, "metadata_batch_failed": true, "metadata_refresh_completed": true, "metadata_refresh_failed": true}'::jsonb +) +WHERE key = 'telegram';