feat(watcher): Ajout watcher de fichiers temps réel

- Migration 0006: colonne watcher_enabled
- Crate notify pour surveillance FS temps réel (FSEvents/inotify)
- Watcher redémarré toutes les 30s si config change
- Détection instantanée création/modification/suppression
- Création job immédiate quand fichier détecté
- API: support watcher_enabled dans UpdateMonitoringRequest
- Backoffice: toggle Watcher avec indicateur d'état
- Fonctionne en parallèle du scheduler auto-scan

Usage: Activer Watcher + Auto-scan pour réactivité max
This commit is contained in:
2026-03-06 11:49:53 +01:00
parent 6e0a77fae0
commit 75f7de2e43
8 changed files with 340 additions and 25 deletions

View File

@@ -2,12 +2,14 @@ use anyhow::Context;
use axum::{extract::State, routing::get, Json, Router};
use chrono::{DateTime, Utc};
use axum::http::StatusCode;
use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
use parsers::{detect_format, parse_metadata, BookFormat};
use serde::Serialize;
use sha2::{Digest, Sha256};
use sqlx::{postgres::PgPoolOptions, Row};
use std::{collections::HashMap, path::Path, time::Duration};
use stripstream_core::config::IndexerConfig;
use tokio::sync::mpsc;
use tracing::{error, info, trace, warn};
use uuid::Uuid;
use walkdir::WalkDir;
@@ -92,6 +94,16 @@ async fn ready(State(state): State<AppState>) -> Result<Json<serde_json::Value>,
async fn run_worker(state: AppState, interval_seconds: u64) {
let wait = Duration::from_secs(interval_seconds.max(1));
// Start file watcher task
let watcher_state = state.clone();
let _watcher_handle = tokio::spawn(async move {
info!("[WATCHER] Starting file watcher service");
if let Err(err) = run_file_watcher(watcher_state).await {
error!("[WATCHER] Error: {}", err);
}
});
// Start scheduler task for auto-monitoring
let scheduler_state = state.clone();
let _scheduler_handle = tokio::spawn(async move {
@@ -127,6 +139,140 @@ async fn run_worker(state: AppState, interval_seconds: u64) {
}
}
async fn run_file_watcher(state: AppState) -> anyhow::Result<()> {
let (tx, mut rx) = mpsc::channel::<(Uuid, String)>(100);
// Start watcher refresh loop
let refresh_interval = Duration::from_secs(30);
let pool = state.pool.clone();
tokio::spawn(async move {
let mut watcher: Option<RecommendedWatcher> = None;
let mut watched_libraries: HashMap<Uuid, String> = HashMap::new();
loop {
// Get libraries with watcher enabled
match sqlx::query(
"SELECT id, root_path FROM libraries WHERE watcher_enabled = TRUE AND enabled = TRUE"
)
.fetch_all(&pool)
.await
{
Ok(rows) => {
let current_libraries: HashMap<Uuid, String> = rows
.into_iter()
.map(|row| {
let id: Uuid = row.get("id");
let root_path: String = row.get("root_path");
let local_path = remap_libraries_path(&root_path);
(id, local_path)
})
.collect();
// Check if we need to recreate watcher
let needs_restart = watched_libraries.len() != current_libraries.len()
|| watched_libraries.iter().any(|(id, path)| {
current_libraries.get(id) != Some(path)
});
if needs_restart {
info!("[WATCHER] Restarting watcher for {} libraries", current_libraries.len());
// Drop old watcher
watcher = None;
watched_libraries.clear();
if !current_libraries.is_empty() {
let tx_clone = tx.clone();
let libraries_clone = current_libraries.clone();
match setup_watcher(libraries_clone, tx_clone) {
Ok(new_watcher) => {
watcher = Some(new_watcher);
watched_libraries = current_libraries;
info!("[WATCHER] Watching {} libraries", watched_libraries.len());
}
Err(err) => {
error!("[WATCHER] Failed to setup watcher: {}", err);
}
}
}
}
}
Err(err) => {
error!("[WATCHER] Failed to fetch libraries: {}", err);
}
}
tokio::time::sleep(refresh_interval).await;
}
});
// Process watcher events
while let Some((library_id, file_path)) = rx.recv().await {
info!("[WATCHER] File changed in library {}: {}", library_id, file_path);
// Check if there's already a pending job for this library
match sqlx::query_scalar::<_, bool>(
"SELECT EXISTS(SELECT 1 FROM index_jobs WHERE library_id = $1 AND status IN ('pending', 'running'))"
)
.bind(library_id)
.fetch_one(&state.pool)
.await
{
Ok(exists) => {
if !exists {
// Create a quick scan job
let job_id = Uuid::new_v4();
match sqlx::query(
"INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, 'rebuild', 'pending')"
)
.bind(job_id)
.bind(library_id)
.execute(&state.pool)
.await
{
Ok(_) => info!("[WATCHER] Created job {} for library {}", job_id, library_id),
Err(err) => error!("[WATCHER] Failed to create job: {}", err),
}
} else {
trace!("[WATCHER] Job already pending for library {}, skipping", library_id);
}
}
Err(err) => error!("[WATCHER] Failed to check existing jobs: {}", err),
}
}
Ok(())
}
fn setup_watcher(
libraries: HashMap<Uuid, String>,
tx: mpsc::Sender<(Uuid, String)>,
) -> anyhow::Result<RecommendedWatcher> {
let watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| {
match res {
Ok(event) => {
if event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove() {
for path in event.paths {
if let Some((library_id, _)) = libraries.iter().find(|(_, root)| {
path.starts_with(root)
}) {
let path_str = path.to_string_lossy().to_string();
if detect_format(&path).is_some() {
let _ = tx.try_send((*library_id, path_str));
}
}
}
}
}
Err(err) => error!("[WATCHER] Event error: {}", err),
}
})?;
Ok(watcher)
}
async fn check_and_schedule_auto_scans(pool: &sqlx::PgPool) -> anyhow::Result<()> {
let libraries = sqlx::query(
r#"