diff --git a/apps/api/src/api_middleware.rs b/apps/api/src/api_middleware.rs new file mode 100644 index 0000000..8972848 --- /dev/null +++ b/apps/api/src/api_middleware.rs @@ -0,0 +1,42 @@ +use axum::{ + extract::State, + middleware::Next, + response::{IntoResponse, Response}, +}; +use std::time::Duration; +use std::sync::atomic::Ordering; + +use crate::state::AppState; + +pub async fn request_counter( + State(state): State, + req: axum::extract::Request, + next: Next, +) -> Response { + state.metrics.requests_total.fetch_add(1, Ordering::Relaxed); + next.run(req).await +} + +pub async fn read_rate_limit( + State(state): State, + req: axum::extract::Request, + next: Next, +) -> Response { + let mut limiter = state.read_rate_limit.lock().await; + if limiter.window_started_at.elapsed() >= Duration::from_secs(1) { + limiter.window_started_at = std::time::Instant::now(); + limiter.requests_in_window = 0; + } + + if limiter.requests_in_window >= 120 { + return ( + axum::http::StatusCode::TOO_MANY_REQUESTS, + "rate limit exceeded", + ) + .into_response(); + } + + limiter.requests_in_window += 1; + drop(limiter); + next.run(req).await +} diff --git a/apps/api/src/auth.rs b/apps/api/src/auth.rs index 3e106d0..6cdd070 100644 --- a/apps/api/src/auth.rs +++ b/apps/api/src/auth.rs @@ -8,7 +8,7 @@ use axum::{ use chrono::Utc; use sqlx::Row; -use crate::{error::ApiError, AppState}; +use crate::{error::ApiError, state::AppState}; #[derive(Clone, Debug)] pub enum Scope { diff --git a/apps/api/src/books.rs b/apps/api/src/books.rs index 5f0d1c0..2ebc11d 100644 --- a/apps/api/src/books.rs +++ b/apps/api/src/books.rs @@ -5,7 +5,7 @@ use sqlx::Row; use uuid::Uuid; use utoipa::ToSchema; -use crate::{error::ApiError, AppState}; +use crate::{error::ApiError, state::AppState}; #[derive(Deserialize, ToSchema)] pub struct ListBooksQuery { diff --git a/apps/api/src/handlers.rs b/apps/api/src/handlers.rs new file mode 100644 index 0000000..26cb73c --- /dev/null +++ b/apps/api/src/handlers.rs @@ -0,0 +1,26 @@ +use axum::{extract::State, Json}; +use std::sync::atomic::Ordering; + +use crate::{error::ApiError, state::AppState}; + +pub async fn health() -> &'static str { + "ok" +} + +pub async fn docs_redirect() -> impl axum::response::IntoResponse { + axum::response::Redirect::to("/swagger-ui/") +} + +pub async fn ready(State(state): State) -> Result, ApiError> { + sqlx::query("SELECT 1").execute(&state.pool).await?; + Ok(Json(serde_json::json!({"status": "ready"}))) +} + +pub async fn metrics(State(state): State) -> String { + format!( + "requests_total {}\npage_cache_hits {}\npage_cache_misses {}\n", + state.metrics.requests_total.load(Ordering::Relaxed), + state.metrics.page_cache_hits.load(Ordering::Relaxed), + state.metrics.page_cache_misses.load(Ordering::Relaxed), + ) +} diff --git a/apps/api/src/index_jobs.rs b/apps/api/src/index_jobs.rs index 7439cd5..3fcc177 100644 --- a/apps/api/src/index_jobs.rs +++ b/apps/api/src/index_jobs.rs @@ -8,7 +8,7 @@ use tokio_stream::Stream; use uuid::Uuid; use utoipa::ToSchema; -use crate::{error::ApiError, AppState}; +use crate::{error::ApiError, state::AppState}; #[derive(Deserialize, ToSchema)] pub struct RebuildRequest { diff --git a/apps/api/src/libraries.rs b/apps/api/src/libraries.rs index f38f790..b798d4d 100644 --- a/apps/api/src/libraries.rs +++ b/apps/api/src/libraries.rs @@ -6,7 +6,7 @@ use sqlx::Row; use uuid::Uuid; use utoipa::ToSchema; -use crate::{error::ApiError, AppState}; +use crate::{error::ApiError, state::AppState}; #[derive(Serialize, ToSchema)] pub struct LibraryResponse { diff --git a/apps/api/src/main.rs b/apps/api/src/main.rs index 2fba993..f007239 100644 --- a/apps/api/src/main.rs +++ b/apps/api/src/main.rs @@ -1,91 +1,36 @@ mod auth; mod books; mod error; +mod handlers; mod index_jobs; mod libraries; +mod api_middleware; mod openapi; mod pages; mod search; mod settings; +mod state; mod thumbnails; mod tokens; -use std::{ - num::NonZeroUsize, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, - time::{Duration, Instant}, -}; +use std::sync::Arc; +use std::time::Instant; use axum::{ middleware, - response::IntoResponse, routing::{delete, get}, - Json, Router, + Router, }; use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; use lru::LruCache; +use std::num::NonZeroUsize; use stripstream_core::config::ApiConfig; use sqlx::postgres::PgPoolOptions; use tokio::sync::{Mutex, Semaphore}; use tracing::info; -use sqlx::{Pool, Postgres, Row}; -use serde_json::Value; -#[derive(Clone)] -struct AppState { - pool: sqlx::PgPool, - bootstrap_token: Arc, - meili_url: Arc, - meili_master_key: Arc, - page_cache: Arc>>>>, - page_render_limit: Arc, - metrics: Arc, - read_rate_limit: Arc>, -} - -struct Metrics { - requests_total: AtomicU64, - page_cache_hits: AtomicU64, - page_cache_misses: AtomicU64, -} - -struct ReadRateLimit { - window_started_at: Instant, - requests_in_window: u32, -} - -impl Metrics { - fn new() -> Self { - Self { - requests_total: AtomicU64::new(0), - page_cache_hits: AtomicU64::new(0), - page_cache_misses: AtomicU64::new(0), - } - } -} - -async fn load_concurrent_renders(pool: &Pool) -> usize { - let default_concurrency = 8; - let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'limits'"#) - .fetch_optional(pool) - .await; - - match row { - Ok(Some(row)) => { - let value: Value = row.get("value"); - value - .get("concurrent_renders") - .and_then(|v: &Value| v.as_u64()) - .map(|v| v as usize) - .unwrap_or(default_concurrency) - } - _ => default_concurrency, - } -} +use crate::state::{load_concurrent_renders, AppState, Metrics, ReadRateLimit}; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -150,21 +95,21 @@ async fn main() -> anyhow::Result<()> { .route("/books/:id/pages/:n", get(pages::get_page)) .route("/libraries/:library_id/series", get(books::list_series)) .route("/search", get(search::search_books)) - .route_layer(middleware::from_fn_with_state(state.clone(), read_rate_limit)) + .route_layer(middleware::from_fn_with_state(state.clone(), api_middleware::read_rate_limit)) .route_layer(middleware::from_fn_with_state( state.clone(), auth::require_read, )); let app = Router::new() - .route("/health", get(health)) - .route("/ready", get(ready)) - .route("/metrics", get(metrics)) - .route("/docs", get(docs_redirect)) + .route("/health", get(handlers::health)) + .route("/ready", get(handlers::ready)) + .route("/metrics", get(handlers::metrics)) + .route("/docs", get(handlers::docs_redirect)) .merge(SwaggerUi::new("/swagger-ui").url("/openapi.json", openapi::ApiDoc::openapi())) .merge(admin_routes) .merge(read_routes) - .layer(middleware::from_fn_with_state(state.clone(), request_counter)) + .layer(middleware::from_fn_with_state(state.clone(), api_middleware::request_counter)) .with_state(state); let listener = tokio::net::TcpListener::bind(&config.listen_addr).await?; @@ -173,57 +118,3 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -async fn health() -> &'static str { - "ok" -} - -async fn docs_redirect() -> impl axum::response::IntoResponse { - axum::response::Redirect::to("/swagger-ui/") -} - -async fn ready(axum::extract::State(state): axum::extract::State) -> Result, error::ApiError> { - sqlx::query("SELECT 1").execute(&state.pool).await?; - Ok(Json(serde_json::json!({"status": "ready"}))) -} - -async fn metrics(axum::extract::State(state): axum::extract::State) -> String { - format!( - "requests_total {}\npage_cache_hits {}\npage_cache_misses {}\n", - state.metrics.requests_total.load(Ordering::Relaxed), - state.metrics.page_cache_hits.load(Ordering::Relaxed), - state.metrics.page_cache_misses.load(Ordering::Relaxed), - ) -} - -async fn request_counter( - axum::extract::State(state): axum::extract::State, - req: axum::extract::Request, - next: axum::middleware::Next, -) -> axum::response::Response { - state.metrics.requests_total.fetch_add(1, Ordering::Relaxed); - next.run(req).await -} - -async fn read_rate_limit( - axum::extract::State(state): axum::extract::State, - req: axum::extract::Request, - next: axum::middleware::Next, -) -> axum::response::Response { - let mut limiter = state.read_rate_limit.lock().await; - if limiter.window_started_at.elapsed() >= Duration::from_secs(1) { - limiter.window_started_at = Instant::now(); - limiter.requests_in_window = 0; - } - - if limiter.requests_in_window >= 120 { - return ( - axum::http::StatusCode::TOO_MANY_REQUESTS, - "rate limit exceeded", - ) - .into_response(); - } - - limiter.requests_in_window += 1; - drop(limiter); - next.run(req).await -} diff --git a/apps/api/src/pages.rs b/apps/api/src/pages.rs index 7c1730a..f5cbac2 100644 --- a/apps/api/src/pages.rs +++ b/apps/api/src/pages.rs @@ -20,7 +20,7 @@ use tracing::{debug, error, info, instrument, warn}; use uuid::Uuid; use walkdir::WalkDir; -use crate::{error::ApiError, AppState}; +use crate::{error::ApiError, state::AppState}; fn remap_libraries_path(path: &str) -> String { if let Ok(root) = std::env::var("LIBRARIES_ROOT_PATH") { diff --git a/apps/api/src/search.rs b/apps/api/src/search.rs index 82ea888..59d7122 100644 --- a/apps/api/src/search.rs +++ b/apps/api/src/search.rs @@ -2,7 +2,7 @@ use axum::{extract::{Query, State}, Json}; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; -use crate::{error::ApiError, AppState}; +use crate::{error::ApiError, state::AppState}; #[derive(Deserialize, ToSchema)] pub struct SearchQuery { diff --git a/apps/api/src/settings.rs b/apps/api/src/settings.rs index 17be349..a5aaf1b 100644 --- a/apps/api/src/settings.rs +++ b/apps/api/src/settings.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use sqlx::Row; -use crate::{error::ApiError, AppState}; +use crate::{error::ApiError, state::AppState}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct UpdateSettingRequest { diff --git a/apps/api/src/state.rs b/apps/api/src/state.rs new file mode 100644 index 0000000..cd5ca66 --- /dev/null +++ b/apps/api/src/state.rs @@ -0,0 +1,61 @@ +use std::sync::{ + atomic::AtomicU64, + Arc, +}; +use std::time::Instant; + +use lru::LruCache; +use sqlx::{Pool, Postgres, Row}; +use tokio::sync::{Mutex, Semaphore}; + +#[derive(Clone)] +pub struct AppState { + pub pool: sqlx::PgPool, + pub bootstrap_token: Arc, + pub meili_url: Arc, + pub meili_master_key: Arc, + pub page_cache: Arc>>>>, + pub page_render_limit: Arc, + pub metrics: Arc, + pub read_rate_limit: Arc>, +} + +pub struct Metrics { + pub requests_total: AtomicU64, + pub page_cache_hits: AtomicU64, + pub page_cache_misses: AtomicU64, +} + +pub struct ReadRateLimit { + pub window_started_at: Instant, + pub requests_in_window: u32, +} + +impl Metrics { + pub fn new() -> Self { + Self { + requests_total: AtomicU64::new(0), + page_cache_hits: AtomicU64::new(0), + page_cache_misses: AtomicU64::new(0), + } + } +} + +pub async fn load_concurrent_renders(pool: &Pool) -> usize { + let default_concurrency = 8; + let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'limits'"#) + .fetch_optional(pool) + .await; + + match row { + Ok(Some(row)) => { + let value: serde_json::Value = row.get("value"); + value + .get("concurrent_renders") + .and_then(|v: &serde_json::Value| v.as_u64()) + .map(|v| v as usize) + .unwrap_or(default_concurrency) + } + _ => default_concurrency, + } +} diff --git a/apps/api/src/thumbnails.rs b/apps/api/src/thumbnails.rs index 6e27bfc..bbb239e 100644 --- a/apps/api/src/thumbnails.rs +++ b/apps/api/src/thumbnails.rs @@ -16,7 +16,7 @@ use tracing::{info, warn}; use uuid::Uuid; use utoipa::ToSchema; -use crate::{error::ApiError, index_jobs, pages, AppState}; +use crate::{error::ApiError, index_jobs, pages, state::AppState}; #[derive(Clone)] struct ThumbnailConfig { diff --git a/apps/api/src/tokens.rs b/apps/api/src/tokens.rs index a1cacb9..735538e 100644 --- a/apps/api/src/tokens.rs +++ b/apps/api/src/tokens.rs @@ -8,7 +8,7 @@ use sqlx::Row; use uuid::Uuid; use utoipa::ToSchema; -use crate::{error::ApiError, AppState}; +use crate::{error::ApiError, state::AppState}; #[derive(Deserialize, ToSchema)] pub struct CreateTokenRequest { diff --git a/apps/indexer/Cargo.toml b/apps/indexer/Cargo.toml index e6da562..dd1b0c2 100644 --- a/apps/indexer/Cargo.toml +++ b/apps/indexer/Cargo.toml @@ -4,6 +4,8 @@ version.workspace = true edition.workspace = true license.workspace = true +[lib] + [dependencies] anyhow.workspace = true axum.workspace = true diff --git a/apps/indexer/src/api.rs b/apps/indexer/src/api.rs new file mode 100644 index 0000000..21b2bb6 --- /dev/null +++ b/apps/indexer/src/api.rs @@ -0,0 +1,16 @@ +use axum::{extract::State, http::StatusCode, Json}; +use serde_json; + +use crate::AppState; + +pub async fn health() -> &'static str { + "ok" +} + +pub async fn ready(State(state): State) -> Result, StatusCode> { + sqlx::query("SELECT 1") + .execute(&state.pool) + .await + .map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?; + Ok(Json(serde_json::json!({"status": "ready"}))) +} diff --git a/apps/indexer/src/batch.rs b/apps/indexer/src/batch.rs new file mode 100644 index 0000000..0015aef --- /dev/null +++ b/apps/indexer/src/batch.rs @@ -0,0 +1,233 @@ +use anyhow::Result; +use chrono::{DateTime, Utc}; +use sqlx::PgPool; +use uuid::Uuid; + +// Batched update data structures +pub struct BookUpdate { + pub book_id: Uuid, + pub title: String, + pub kind: String, + pub series: Option, + pub volume: Option, + pub page_count: Option, +} + +pub struct FileUpdate { + pub file_id: Uuid, + pub format: String, + pub size_bytes: i64, + pub mtime: DateTime, + pub fingerprint: String, +} + +pub struct BookInsert { + pub book_id: Uuid, + pub library_id: Uuid, + pub kind: String, + pub title: String, + pub series: Option, + pub volume: Option, + pub page_count: Option, + pub thumbnail_path: Option, +} + +pub struct FileInsert { + pub file_id: Uuid, + pub book_id: Uuid, + pub format: String, + pub abs_path: String, + pub size_bytes: i64, + pub mtime: DateTime, + pub fingerprint: String, + pub parse_status: String, + pub parse_error: Option, +} + +pub struct ErrorInsert { + pub job_id: Uuid, + pub file_path: String, + pub error_message: String, +} + +pub async fn flush_all_batches( + pool: &PgPool, + books_update: &mut Vec, + files_update: &mut Vec, + books_insert: &mut Vec, + files_insert: &mut Vec, + errors_insert: &mut Vec, +) -> Result<()> { + if books_update.is_empty() && files_update.is_empty() && books_insert.is_empty() && files_insert.is_empty() && errors_insert.is_empty() { + return Ok(()); + } + + let start = std::time::Instant::now(); + let mut tx = pool.begin().await?; + + // Batch update books using UNNEST + if !books_update.is_empty() { + let book_ids: Vec = books_update.iter().map(|b| b.book_id).collect(); + let titles: Vec = books_update.iter().map(|b| b.title.clone()).collect(); + let kinds: Vec = books_update.iter().map(|b| b.kind.clone()).collect(); + let series: Vec> = books_update.iter().map(|b| b.series.clone()).collect(); + let volumes: Vec> = books_update.iter().map(|b| b.volume).collect(); + let page_counts: Vec> = books_update.iter().map(|b| b.page_count).collect(); + + sqlx::query( + r#" + UPDATE books SET + title = data.title, + kind = data.kind, + series = data.series, + volume = data.volume, + page_count = data.page_count, + updated_at = NOW() + FROM ( + SELECT * FROM UNNEST($1::uuid[], $2::text[], $3::text[], $4::text[], $5::int[], $6::int[]) + AS t(book_id, title, kind, series, volume, page_count) + ) AS data + WHERE books.id = data.book_id + "# + ) + .bind(&book_ids) + .bind(&titles) + .bind(&kinds) + .bind(&series) + .bind(&volumes) + .bind(&page_counts) + .execute(&mut *tx) + .await?; + + books_update.clear(); + } + + // Batch update files using UNNEST + if !files_update.is_empty() { + let file_ids: Vec = files_update.iter().map(|f| f.file_id).collect(); + let formats: Vec = files_update.iter().map(|f| f.format.clone()).collect(); + let sizes: Vec = files_update.iter().map(|f| f.size_bytes).collect(); + let mtimes: Vec> = files_update.iter().map(|f| f.mtime).collect(); + let fingerprints: Vec = files_update.iter().map(|f| f.fingerprint.clone()).collect(); + + sqlx::query( + r#" + UPDATE book_files SET + format = data.format, + size_bytes = data.size, + mtime = data.mtime, + fingerprint = data.fp, + parse_status = 'ok', + parse_error_opt = NULL, + updated_at = NOW() + FROM ( + SELECT * FROM UNNEST($1::uuid[], $2::text[], $3::bigint[], $4::timestamptz[], $5::text[]) + AS t(file_id, format, size, mtime, fp) + ) AS data + WHERE book_files.id = data.file_id + "# + ) + .bind(&file_ids) + .bind(&formats) + .bind(&sizes) + .bind(&mtimes) + .bind(&fingerprints) + .execute(&mut *tx) + .await?; + + files_update.clear(); + } + + // Batch insert books using UNNEST + if !books_insert.is_empty() { + let book_ids: Vec = books_insert.iter().map(|b| b.book_id).collect(); + let library_ids: Vec = books_insert.iter().map(|b| b.library_id).collect(); + let kinds: Vec = books_insert.iter().map(|b| b.kind.clone()).collect(); + let titles: Vec = books_insert.iter().map(|b| b.title.clone()).collect(); + let series: Vec> = books_insert.iter().map(|b| b.series.clone()).collect(); + let volumes: Vec> = books_insert.iter().map(|b| b.volume).collect(); + let page_counts: Vec> = books_insert.iter().map(|b| b.page_count).collect(); + let thumbnail_paths: Vec> = books_insert.iter().map(|b| b.thumbnail_path.clone()).collect(); + + sqlx::query( + r#" + INSERT INTO books (id, library_id, kind, title, series, volume, page_count, thumbnail_path) + SELECT * FROM UNNEST($1::uuid[], $2::uuid[], $3::text[], $4::text[], $5::text[], $6::int[], $7::int[], $8::text[]) + AS t(id, library_id, kind, title, series, volume, page_count, thumbnail_path) + "# + ) + .bind(&book_ids) + .bind(&library_ids) + .bind(&kinds) + .bind(&titles) + .bind(&series) + .bind(&volumes) + .bind(&page_counts) + .bind(&thumbnail_paths) + .execute(&mut *tx) + .await?; + + books_insert.clear(); + } + + // Batch insert files using UNNEST + if !files_insert.is_empty() { + let file_ids: Vec = files_insert.iter().map(|f| f.file_id).collect(); + let book_ids: Vec = files_insert.iter().map(|f| f.book_id).collect(); + let formats: Vec = files_insert.iter().map(|f| f.format.clone()).collect(); + let abs_paths: Vec = files_insert.iter().map(|f| f.abs_path.clone()).collect(); + let sizes: Vec = files_insert.iter().map(|f| f.size_bytes).collect(); + let mtimes: Vec> = files_insert.iter().map(|f| f.mtime).collect(); + let fingerprints: Vec = files_insert.iter().map(|f| f.fingerprint.clone()).collect(); + let statuses: Vec = files_insert.iter().map(|f| f.parse_status.clone()).collect(); + let errors: Vec> = files_insert.iter().map(|f| f.parse_error.clone()).collect(); + + sqlx::query( + r#" + INSERT INTO book_files (id, book_id, format, abs_path, size_bytes, mtime, fingerprint, parse_status, parse_error_opt) + SELECT * FROM UNNEST($1::uuid[], $2::uuid[], $3::text[], $4::text[], $5::bigint[], $6::timestamptz[], $7::text[], $8::text[], $9::text[]) + AS t(id, book_id, format, abs_path, size_bytes, mtime, fingerprint, parse_status, parse_error_opt) + "# + ) + .bind(&file_ids) + .bind(&book_ids) + .bind(&formats) + .bind(&abs_paths) + .bind(&sizes) + .bind(&mtimes) + .bind(&fingerprints) + .bind(&statuses) + .bind(&errors) + .execute(&mut *tx) + .await?; + + files_insert.clear(); + } + + // Batch insert errors using UNNEST + if !errors_insert.is_empty() { + let job_ids: Vec = errors_insert.iter().map(|e| e.job_id).collect(); + let file_paths: Vec = errors_insert.iter().map(|e| e.file_path.clone()).collect(); + let messages: Vec = errors_insert.iter().map(|e| e.error_message.clone()).collect(); + + sqlx::query( + r#" + INSERT INTO index_job_errors (job_id, file_path, error_message) + SELECT * FROM UNNEST($1::uuid[], $2::text[], $3::text[]) + AS t(job_id, file_path, error_message) + "# + ) + .bind(&job_ids) + .bind(&file_paths) + .bind(&messages) + .execute(&mut *tx) + .await?; + + errors_insert.clear(); + } + + tx.commit().await?; + tracing::info!("[BATCH] Flushed all batches in {:?}", start.elapsed()); + + Ok(()) +} diff --git a/apps/indexer/src/job.rs b/apps/indexer/src/job.rs new file mode 100644 index 0000000..f6ebab9 --- /dev/null +++ b/apps/indexer/src/job.rs @@ -0,0 +1,293 @@ +use anyhow::Result; +use rayon::prelude::*; +use sqlx::{PgPool, Row}; +use std::time::Duration; +use tracing::{error, info}; +use uuid::Uuid; + +use crate::{meili, scanner, AppState}; + +pub async fn cleanup_stale_jobs(pool: &PgPool) -> Result<()> { + // Mark jobs that have been running for more than 5 minutes as failed + // This handles cases where the indexer was restarted while jobs were running + let result = sqlx::query( + r#" + UPDATE index_jobs + SET status = 'failed', + finished_at = NOW(), + error_opt = 'Job interrupted by indexer restart' + WHERE status = 'running' + AND started_at < NOW() - INTERVAL '5 minutes' + RETURNING id + "# + ) + .fetch_all(pool) + .await?; + + if !result.is_empty() { + let count = result.len(); + let ids: Vec = result.iter() + .map(|row| row.get::("id").to_string()) + .collect(); + info!("[CLEANUP] Marked {} stale job(s) as failed: {}", count, ids.join(", ")); + } + + Ok(()) +} + +pub async fn claim_next_job(pool: &PgPool) -> Result)>> { + let mut tx = pool.begin().await?; + + // Atomically select and lock the next job + // Exclude rebuild/full_rebuild if one is already running + // Prioritize: full_rebuild > rebuild > others + let row = sqlx::query( + r#" + SELECT j.id, j.type, j.library_id + FROM index_jobs j + WHERE j.status = 'pending' + AND ( + -- Allow rebuilds only if no rebuild is running + (j.type IN ('rebuild', 'full_rebuild') AND NOT EXISTS ( + SELECT 1 FROM index_jobs + WHERE status = 'running' + AND type IN ('rebuild', 'full_rebuild') + )) + OR + -- Always allow non-rebuild jobs + j.type NOT IN ('rebuild', 'full_rebuild') + ) + ORDER BY + CASE j.type + WHEN 'full_rebuild' THEN 1 + WHEN 'rebuild' THEN 2 + ELSE 3 + END, + j.created_at ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 + "# + ) + .fetch_optional(&mut *tx) + .await?; + + let Some(row) = row else { + tx.commit().await?; + return Ok(None); + }; + + let id: Uuid = row.get("id"); + let job_type: String = row.get("type"); + let library_id: Option = row.get("library_id"); + + // Final check: if this is a rebuild, ensure no rebuild started between SELECT and UPDATE + if job_type == "rebuild" || job_type == "full_rebuild" { + let has_running_rebuild: bool = sqlx::query_scalar( + r#" + SELECT EXISTS( + SELECT 1 FROM index_jobs + WHERE status = 'running' + AND type IN ('rebuild', 'full_rebuild') + AND id != $1 + ) + "# + ) + .bind(id) + .fetch_one(&mut *tx) + .await?; + + if has_running_rebuild { + tx.rollback().await?; + return Ok(None); + } + } + + sqlx::query("UPDATE index_jobs SET status = 'running', started_at = NOW(), error_opt = NULL WHERE id = $1") + .bind(id) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(Some((id, library_id))) +} + +pub async fn fail_job(pool: &PgPool, job_id: Uuid, error_message: &str) -> Result<()> { + sqlx::query("UPDATE index_jobs SET status = 'failed', finished_at = NOW(), error_opt = $2 WHERE id = $1") + .bind(job_id) + .bind(error_message) + .execute(pool) + .await?; + Ok(()) +} + +pub async fn is_job_cancelled(pool: &PgPool, job_id: Uuid) -> Result { + let status: Option = sqlx::query_scalar( + "SELECT status FROM index_jobs WHERE id = $1" + ) + .bind(job_id) + .fetch_optional(pool) + .await?; + + Ok(status.as_deref() == Some("cancelled")) +} + +pub async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Option) -> Result<()> { + info!("[JOB] Processing {} library={:?}", job_id, target_library_id); + + let job_type: String = sqlx::query_scalar("SELECT type FROM index_jobs WHERE id = $1") + .bind(job_id) + .fetch_one(&state.pool) + .await?; + + // Thumbnail jobs: hand off to API and wait for completion (same queue as rebuilds) + if job_type == "thumbnail_rebuild" || job_type == "thumbnail_regenerate" { + sqlx::query( + "UPDATE index_jobs SET status = 'generating_thumbnails', started_at = NOW() WHERE id = $1", + ) + .bind(job_id) + .execute(&state.pool) + .await?; + + let api_base = state.api_base_url.trim_end_matches('/'); + let url = format!("{}/index/jobs/{}/thumbnails/checkup", api_base, job_id); + let client = reqwest::Client::new(); + let res = client + .post(&url) + .header("Authorization", format!("Bearer {}", state.api_bootstrap_token)) + .send() + .await?; + if !res.status().is_success() { + anyhow::bail!("thumbnail checkup API returned {}", res.status()); + } + + // Poll until job is finished (API updates the same row) + let poll_interval = Duration::from_secs(1); + loop { + tokio::time::sleep(poll_interval).await; + let status: String = sqlx::query_scalar("SELECT status FROM index_jobs WHERE id = $1") + .bind(job_id) + .fetch_one(&state.pool) + .await?; + if status == "success" || status == "failed" { + info!("[JOB] Thumbnail job {} finished with status {}", job_id, status); + return Ok(()); + } + } + } + + let is_full_rebuild = job_type == "full_rebuild"; + info!("[JOB] {} type={} full_rebuild={}", job_id, job_type, is_full_rebuild); + + // For full rebuilds, delete existing data first + if is_full_rebuild { + info!("[JOB] Full rebuild: deleting existing data"); + + if let Some(library_id) = target_library_id { + // Delete books and files for specific library + sqlx::query("DELETE FROM book_files WHERE book_id IN (SELECT id FROM books WHERE library_id = $1)") + .bind(library_id) + .execute(&state.pool) + .await?; + sqlx::query("DELETE FROM books WHERE library_id = $1") + .bind(library_id) + .execute(&state.pool) + .await?; + info!("[JOB] Deleted existing data for library {}", library_id); + } else { + // Delete all books and files + sqlx::query("DELETE FROM book_files").execute(&state.pool).await?; + sqlx::query("DELETE FROM books").execute(&state.pool).await?; + info!("[JOB] Deleted all existing data"); + } + } + + let libraries = if let Some(library_id) = target_library_id { + sqlx::query("SELECT id, root_path FROM libraries WHERE id = $1 AND enabled = TRUE") + .bind(library_id) + .fetch_all(&state.pool) + .await? + } else { + sqlx::query("SELECT id, root_path FROM libraries WHERE enabled = TRUE") + .fetch_all(&state.pool) + .await? + }; + + // First pass: count total files for progress estimation (parallel) + let library_paths: Vec = libraries.iter() + .map(|library| crate::utils::remap_libraries_path(&library.get::("root_path"))) + .collect(); + + let total_files: usize = library_paths.par_iter() + .map(|root_path| { + walkdir::WalkDir::new(root_path) + .into_iter() + .filter_map(Result::ok) + .filter(|entry| entry.file_type().is_file() && parsers::detect_format(entry.path()).is_some()) + .count() + }) + .sum(); + + info!("[JOB] Found {} libraries, {} total files to index", libraries.len(), total_files); + + // Update job with total estimate + sqlx::query("UPDATE index_jobs SET total_files = $2 WHERE id = $1") + .bind(job_id) + .bind(total_files as i32) + .execute(&state.pool) + .await?; + + let mut stats = scanner::JobStats { + scanned_files: 0, + indexed_files: 0, + removed_files: 0, + errors: 0, + }; + + // Track processed files across all libraries for accurate progress + let mut total_processed_count = 0i32; + + for library in libraries { + let library_id: Uuid = library.get("id"); + let root_path: String = library.get("root_path"); + let root_path = crate::utils::remap_libraries_path(&root_path); + match scanner::scan_library(state, job_id, library_id, std::path::Path::new(&root_path), &mut stats, &mut total_processed_count, total_files, is_full_rebuild).await { + Ok(()) => {} + Err(err) => { + stats.errors += 1; + error!(library_id = %library_id, error = %err, "library scan failed"); + } + } + } + + meili::sync_meili(&state.pool, &state.meili_url, &state.meili_master_key).await?; + + // Hand off to API for thumbnail checkup (API will set status = 'success' when done) + sqlx::query( + "UPDATE index_jobs SET status = 'generating_thumbnails', stats_json = $2, current_file = NULL, processed_files = $3 WHERE id = $1", + ) + .bind(job_id) + .bind(serde_json::to_value(&stats)?) + .bind(total_processed_count) + .execute(&state.pool) + .await?; + + let api_base = state.api_base_url.trim_end_matches('/'); + let url = format!("{}/index/jobs/{}/thumbnails/checkup", api_base, job_id); + let client = reqwest::Client::new(); + let res = client + .post(&url) + .header("Authorization", format!("Bearer {}", state.api_bootstrap_token)) + .send() + .await; + if let Err(e) = res { + tracing::warn!("[JOB] Failed to trigger thumbnail checkup: {} — API will not generate thumbnails for this job", e); + } else if let Ok(r) = res { + if !r.status().is_success() { + tracing::warn!("[JOB] Thumbnail checkup returned {} — API may not generate thumbnails", r.status()); + } else { + info!("[JOB] Thumbnail checkup started (job {}), API will complete the job", job_id); + } + } + + Ok(()) +} diff --git a/apps/indexer/src/lib.rs b/apps/indexer/src/lib.rs new file mode 100644 index 0000000..17ecc25 --- /dev/null +++ b/apps/indexer/src/lib.rs @@ -0,0 +1,20 @@ +pub mod api; +pub mod batch; +pub mod job; +pub mod meili; +pub mod scheduler; +pub mod scanner; +pub mod utils; +pub mod watcher; +pub mod worker; + +use sqlx::PgPool; + +#[derive(Clone)] +pub struct AppState { + pub pool: PgPool, + pub meili_url: String, + pub meili_master_key: String, + pub api_base_url: String, + pub api_bootstrap_token: String, +} diff --git a/apps/indexer/src/main.rs b/apps/indexer/src/main.rs index 0801dc4..761a800 100644 --- a/apps/indexer/src/main.rs +++ b/apps/indexer/src/main.rs @@ -1,54 +1,8 @@ -use anyhow::Context; -use axum::{extract::State, routing::get, Json, Router}; -use chrono::{DateTime, Utc}; -use axum::http::StatusCode; -use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher}; -use parsers::{detect_format, parse_metadata, BookFormat, ParsedMetadata}; -use rayon::prelude::*; -use serde::Serialize; -use sha2::{Digest, Sha256}; -use sqlx::{postgres::PgPoolOptions, Row}; -use std::{collections::HashMap, path::Path, time::Duration}; +use axum::{routing::get, Router}; +use indexer::{api, AppState}; +use sqlx::postgres::PgPoolOptions; use stripstream_core::config::IndexerConfig; -use tokio::sync::mpsc; -use tracing::{error, info, trace, warn}; -use uuid::Uuid; -use walkdir::WalkDir; - -fn remap_libraries_path(path: &str) -> String { - if let Ok(root) = std::env::var("LIBRARIES_ROOT_PATH") { - if path.starts_with("/libraries/") { - return path.replacen("/libraries", &root, 1); - } - } - path.to_string() -} - -fn unmap_libraries_path(path: &str) -> String { - if let Ok(root) = std::env::var("LIBRARIES_ROOT_PATH") { - if path.starts_with(&root) { - return path.replacen(&root, "/libraries", 1); - } - } - path.to_string() -} - -#[derive(Clone)] -struct AppState { - pool: sqlx::PgPool, - meili_url: String, - meili_master_key: String, - api_base_url: String, - api_bootstrap_token: String, -} - -#[derive(Serialize)] -struct JobStats { - scanned_files: usize, - indexed_files: usize, - removed_files: usize, - errors: usize, -} +use tracing::info; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -72,11 +26,11 @@ async fn main() -> anyhow::Result<()> { api_bootstrap_token: config.api_bootstrap_token.clone(), }; - tokio::spawn(run_worker(state.clone(), config.scan_interval_seconds)); + tokio::spawn(indexer::worker::run_worker(state.clone(), config.scan_interval_seconds)); let app = Router::new() - .route("/health", get(health)) - .route("/ready", get(ready)) + .route("/health", get(api::health)) + .route("/ready", get(api::ready)) .with_state(state.clone()); let listener = tokio::net::TcpListener::bind(&config.listen_addr).await?; @@ -84,1278 +38,3 @@ async fn main() -> anyhow::Result<()> { axum::serve(listener, app).await?; Ok(()) } - -async fn health() -> &'static str { - "ok" -} - -async fn ready(State(state): State) -> Result, StatusCode> { - sqlx::query("SELECT 1") - .execute(&state.pool) - .await - .map_err(|_| StatusCode::SERVICE_UNAVAILABLE)?; - Ok(Json(serde_json::json!({"status": "ready"}))) -} - -async fn cleanup_stale_jobs(pool: &sqlx::PgPool) -> anyhow::Result<()> { - // Mark jobs that have been running for more than 5 minutes as failed - // This handles cases where the indexer was restarted while jobs were running - let result = sqlx::query( - r#" - UPDATE index_jobs - SET status = 'failed', - finished_at = NOW(), - error_opt = 'Job interrupted by indexer restart' - WHERE status = 'running' - AND started_at < NOW() - INTERVAL '5 minutes' - RETURNING id - "# - ) - .fetch_all(pool) - .await?; - - if !result.is_empty() { - let count = result.len(); - let ids: Vec = result.iter() - .map(|row| row.get::("id").to_string()) - .collect(); - info!("[CLEANUP] Marked {} stale job(s) as failed: {}", count, ids.join(", ")); - } - - Ok(()) -} - -async fn run_worker(state: AppState, interval_seconds: u64) { - let wait = Duration::from_secs(interval_seconds.max(1)); - - // Cleanup stale jobs from previous runs - if let Err(err) = cleanup_stale_jobs(&state.pool).await { - error!("[CLEANUP] Failed to cleanup stale jobs: {}", err); - } - - // Start file watcher task - let watcher_state = state.clone(); - let _watcher_handle = tokio::spawn(async move { - info!("[WATCHER] Starting file watcher service"); - if let Err(err) = run_file_watcher(watcher_state).await { - error!("[WATCHER] Error: {}", err); - } - }); - - // Start scheduler task for auto-monitoring - let scheduler_state = state.clone(); - let _scheduler_handle = tokio::spawn(async move { - let scheduler_wait = Duration::from_secs(60); // Check every minute - loop { - if let Err(err) = check_and_schedule_auto_scans(&scheduler_state.pool).await { - error!("[SCHEDULER] Error: {}", err); - } - tokio::time::sleep(scheduler_wait).await; - } - }); - - loop { - match claim_next_job(&state.pool).await { - Ok(Some((job_id, library_id))) => { - info!("[INDEXER] Starting job {} library={:?}", job_id, library_id); - if let Err(err) = process_job(&state, job_id, library_id).await { - let err_str = err.to_string(); - if err_str.contains("cancelled") || err_str.contains("Cancelled") { - info!("[INDEXER] Job {} was cancelled by user", job_id); - // Status is already 'cancelled' in DB, don't change it - } else { - error!("[INDEXER] Job {} failed: {}", job_id, err); - let _ = fail_job(&state.pool, job_id, &err_str).await; - } - } else { - info!("[INDEXER] Job {} completed", job_id); - } - } - Ok(None) => { - trace!("[INDEXER] No pending jobs, waiting..."); - tokio::time::sleep(wait).await; - } - Err(err) => { - error!("[INDEXER] Worker error: {}", err); - tokio::time::sleep(wait).await; - } - } - } -} - -async fn run_file_watcher(state: AppState) -> anyhow::Result<()> { - let (tx, mut rx) = mpsc::channel::<(Uuid, String)>(100); - - // Start watcher refresh loop - let refresh_interval = Duration::from_secs(30); - let pool = state.pool.clone(); - - tokio::spawn(async move { - let mut watched_libraries: HashMap = HashMap::new(); - - loop { - // Get libraries with watcher enabled - match sqlx::query( - "SELECT id, root_path FROM libraries WHERE watcher_enabled = TRUE AND enabled = TRUE" - ) - .fetch_all(&pool) - .await - { - Ok(rows) => { - let current_libraries: HashMap = rows - .into_iter() - .map(|row| { - let id: Uuid = row.get("id"); - let root_path: String = row.get("root_path"); - let local_path = remap_libraries_path(&root_path); - (id, local_path) - }) - .collect(); - - // Check if we need to recreate watcher - let needs_restart = watched_libraries.len() != current_libraries.len() - || watched_libraries.iter().any(|(id, path)| { - current_libraries.get(id) != Some(path) - }); - - if needs_restart { - info!("[WATCHER] Restarting watcher for {} libraries", current_libraries.len()); - - if !current_libraries.is_empty() { - let tx_clone = tx.clone(); - let libraries_clone = current_libraries.clone(); - - match setup_watcher(libraries_clone, tx_clone) { - Ok(_new_watcher) => { - watched_libraries = current_libraries; - info!("[WATCHER] Watching {} libraries", watched_libraries.len()); - } - Err(err) => { - error!("[WATCHER] Failed to setup watcher: {}", err); - } - } - } - } - } - Err(err) => { - error!("[WATCHER] Failed to fetch libraries: {}", err); - } - } - - tokio::time::sleep(refresh_interval).await; - } - }); - - // Process watcher events - while let Some((library_id, file_path)) = rx.recv().await { - info!("[WATCHER] File changed in library {}: {}", library_id, file_path); - - // Check if there's already a pending job for this library - match sqlx::query_scalar::<_, bool>( - "SELECT EXISTS(SELECT 1 FROM index_jobs WHERE library_id = $1 AND status IN ('pending', 'running'))" - ) - .bind(library_id) - .fetch_one(&state.pool) - .await - { - Ok(exists) => { - if !exists { - // Create a quick scan job - let job_id = Uuid::new_v4(); - match sqlx::query( - "INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, 'rebuild', 'pending')" - ) - .bind(job_id) - .bind(library_id) - .execute(&state.pool) - .await - { - Ok(_) => info!("[WATCHER] Created job {} for library {}", job_id, library_id), - Err(err) => error!("[WATCHER] Failed to create job: {}", err), - } - } else { - trace!("[WATCHER] Job already pending for library {}, skipping", library_id); - } - } - Err(err) => error!("[WATCHER] Failed to check existing jobs: {}", err), - } - } - - Ok(()) -} - -fn setup_watcher( - libraries: HashMap, - tx: mpsc::Sender<(Uuid, String)>, -) -> anyhow::Result { - let libraries_for_closure = libraries.clone(); - - let mut watcher = notify::recommended_watcher(move |res: Result| { - match res { - Ok(event) => { - if event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove() { - for path in event.paths { - if let Some((library_id, _)) = libraries_for_closure.iter().find(|(_, root)| { - path.starts_with(root) - }) { - let path_str = path.to_string_lossy().to_string(); - if detect_format(&path).is_some() { - let _ = tx.try_send((*library_id, path_str)); - } - } - } - } - } - Err(err) => error!("[WATCHER] Event error: {}", err), - } - })?; - - // Actually watch the library directories - for (_, root_path) in &libraries { - info!("[WATCHER] Watching directory: {}", root_path); - watcher.watch(std::path::Path::new(root_path), RecursiveMode::Recursive)?; - } - - Ok(watcher) -} - -async fn check_and_schedule_auto_scans(pool: &sqlx::PgPool) -> anyhow::Result<()> { - let libraries = sqlx::query( - r#" - SELECT id, scan_mode, last_scan_at - FROM libraries - WHERE monitor_enabled = TRUE - AND ( - next_scan_at IS NULL - OR next_scan_at <= NOW() - ) - AND NOT EXISTS ( - SELECT 1 FROM index_jobs - WHERE library_id = libraries.id - AND status IN ('pending', 'running') - ) - "# - ) - .fetch_all(pool) - .await?; - - for row in libraries { - let library_id: Uuid = row.get("id"); - let scan_mode: String = row.get("scan_mode"); - - info!("[SCHEDULER] Auto-scanning library {} (mode: {})", library_id, scan_mode); - - let job_id = Uuid::new_v4(); - let job_type = match scan_mode.as_str() { - "full" => "full_rebuild", - _ => "rebuild", - }; - - sqlx::query( - "INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, $3, 'pending')" - ) - .bind(job_id) - .bind(library_id) - .bind(job_type) - .execute(pool) - .await?; - - // Update next_scan_at - let interval_minutes = match scan_mode.as_str() { - "hourly" => 60, - "daily" => 1440, - "weekly" => 10080, - _ => 1440, // default daily - }; - - sqlx::query( - "UPDATE libraries SET last_scan_at = NOW(), next_scan_at = NOW() + INTERVAL '1 minute' * $2 WHERE id = $1" - ) - .bind(library_id) - .bind(interval_minutes) - .execute(pool) - .await?; - - info!("[SCHEDULER] Created job {} for library {}", job_id, library_id); - } - - Ok(()) -} - -async fn claim_next_job(pool: &sqlx::PgPool) -> anyhow::Result)>> { - let mut tx = pool.begin().await?; - let row = sqlx::query( - r#" - SELECT id, library_id - FROM index_jobs - WHERE status = 'pending' - ORDER BY created_at ASC - FOR UPDATE SKIP LOCKED - LIMIT 1 - "#, - ) - .fetch_optional(&mut *tx) - .await?; - - let Some(row) = row else { - tx.commit().await?; - return Ok(None); - }; - - let id: Uuid = row.get("id"); - let library_id: Option = row.get("library_id"); - - sqlx::query("UPDATE index_jobs SET status = 'running', started_at = NOW(), error_opt = NULL WHERE id = $1") - .bind(id) - .execute(&mut *tx) - .await?; - - tx.commit().await?; - Ok(Some((id, library_id))) -} - -async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Option) -> anyhow::Result<()> { - info!("[JOB] Processing {} library={:?}", job_id, target_library_id); - - let job_type: String = sqlx::query_scalar("SELECT type FROM index_jobs WHERE id = $1") - .bind(job_id) - .fetch_one(&state.pool) - .await?; - - // Thumbnail jobs: hand off to API and wait for completion (same queue as rebuilds) - if job_type == "thumbnail_rebuild" || job_type == "thumbnail_regenerate" { - sqlx::query( - "UPDATE index_jobs SET status = 'generating_thumbnails', started_at = NOW() WHERE id = $1", - ) - .bind(job_id) - .execute(&state.pool) - .await?; - - let api_base = state.api_base_url.trim_end_matches('/'); - let url = format!("{}/index/jobs/{}/thumbnails/checkup", api_base, job_id); - let client = reqwest::Client::new(); - let res = client - .post(&url) - .header("Authorization", format!("Bearer {}", state.api_bootstrap_token)) - .send() - .await?; - if !res.status().is_success() { - anyhow::bail!("thumbnail checkup API returned {}", res.status()); - } - - // Poll until job is finished (API updates the same row) - let poll_interval = Duration::from_secs(1); - loop { - tokio::time::sleep(poll_interval).await; - let status: String = sqlx::query_scalar("SELECT status FROM index_jobs WHERE id = $1") - .bind(job_id) - .fetch_one(&state.pool) - .await?; - if status == "success" || status == "failed" { - info!("[JOB] Thumbnail job {} finished with status {}", job_id, status); - return Ok(()); - } - } - } - - let is_full_rebuild = job_type == "full_rebuild"; - info!("[JOB] {} type={} full_rebuild={}", job_id, job_type, is_full_rebuild); - - // For full rebuilds, delete existing data first - if is_full_rebuild { - info!("[JOB] Full rebuild: deleting existing data"); - - if let Some(library_id) = target_library_id { - // Delete books and files for specific library - sqlx::query("DELETE FROM book_files WHERE book_id IN (SELECT id FROM books WHERE library_id = $1)") - .bind(library_id) - .execute(&state.pool) - .await?; - sqlx::query("DELETE FROM books WHERE library_id = $1") - .bind(library_id) - .execute(&state.pool) - .await?; - info!("[JOB] Deleted existing data for library {}", library_id); - } else { - // Delete all books and files - sqlx::query("DELETE FROM book_files").execute(&state.pool).await?; - sqlx::query("DELETE FROM books").execute(&state.pool).await?; - info!("[JOB] Deleted all existing data"); - } - } - - let libraries = if let Some(library_id) = target_library_id { - sqlx::query("SELECT id, root_path FROM libraries WHERE id = $1 AND enabled = TRUE") - .bind(library_id) - .fetch_all(&state.pool) - .await? - } else { - sqlx::query("SELECT id, root_path FROM libraries WHERE enabled = TRUE") - .fetch_all(&state.pool) - .await? - }; - - // First pass: count total files for progress estimation (parallel) - let library_paths: Vec = libraries.iter() - .map(|library| remap_libraries_path(&library.get::("root_path"))) - .collect(); - - let total_files: usize = library_paths.par_iter() - .map(|root_path| { - WalkDir::new(root_path) - .into_iter() - .filter_map(Result::ok) - .filter(|entry| entry.file_type().is_file() && detect_format(entry.path()).is_some()) - .count() - }) - .sum(); - - info!("[JOB] Found {} libraries, {} total files to index", libraries.len(), total_files); - - // Update job with total estimate - sqlx::query("UPDATE index_jobs SET total_files = $2 WHERE id = $1") - .bind(job_id) - .bind(total_files as i32) - .execute(&state.pool) - .await?; - - let mut stats = JobStats { - scanned_files: 0, - indexed_files: 0, - removed_files: 0, - errors: 0, - }; - - // Track processed files across all libraries for accurate progress - let mut total_processed_count = 0i32; - - for library in libraries { - let library_id: Uuid = library.get("id"); - let root_path: String = library.get("root_path"); - let root_path = remap_libraries_path(&root_path); - match scan_library(state, job_id, library_id, Path::new(&root_path), &mut stats, &mut total_processed_count, total_files, is_full_rebuild).await { - Ok(()) => {} - Err(err) => { - stats.errors += 1; - error!(library_id = %library_id, error = %err, "library scan failed"); - } - } - } - - sync_meili(&state.pool, &state.meili_url, &state.meili_master_key).await?; - - // Hand off to API for thumbnail checkup (API will set status = 'success' when done) - sqlx::query( - "UPDATE index_jobs SET status = 'generating_thumbnails', stats_json = $2, current_file = NULL, processed_files = $3 WHERE id = $1", - ) - .bind(job_id) - .bind(serde_json::to_value(&stats)?) - .bind(total_processed_count) - .execute(&state.pool) - .await?; - - let api_base = state.api_base_url.trim_end_matches('/'); - let url = format!("{}/index/jobs/{}/thumbnails/checkup", api_base, job_id); - let client = reqwest::Client::new(); - let res = client - .post(&url) - .header("Authorization", format!("Bearer {}", state.api_bootstrap_token)) - .send() - .await; - if let Err(e) = res { - warn!("[JOB] Failed to trigger thumbnail checkup: {} — API will not generate thumbnails for this job", e); - } else if let Ok(r) = res { - if !r.status().is_success() { - warn!("[JOB] Thumbnail checkup returned {} — API may not generate thumbnails", r.status()); - } else { - info!("[JOB] Thumbnail checkup started (job {}), API will complete the job", job_id); - } - } - - Ok(()) -} - -async fn fail_job(pool: &sqlx::PgPool, job_id: Uuid, error_message: &str) -> anyhow::Result<()> { - sqlx::query("UPDATE index_jobs SET status = 'failed', finished_at = NOW(), error_opt = $2 WHERE id = $1") - .bind(job_id) - .bind(error_message) - .execute(pool) - .await?; - Ok(()) -} - -// Batched update data structures -struct BookUpdate { - book_id: Uuid, - title: String, - kind: String, - series: Option, - volume: Option, - page_count: Option, -} - -struct FileUpdate { - file_id: Uuid, - format: String, - size_bytes: i64, - mtime: DateTime, - fingerprint: String, -} - -struct BookInsert { - book_id: Uuid, - library_id: Uuid, - kind: String, - title: String, - series: Option, - volume: Option, - page_count: Option, - thumbnail_path: Option, -} - -struct FileInsert { - file_id: Uuid, - book_id: Uuid, - format: String, - abs_path: String, - size_bytes: i64, - mtime: DateTime, - fingerprint: String, - parse_status: String, - parse_error: Option, -} - -struct ErrorInsert { - job_id: Uuid, - file_path: String, - error_message: String, -} - -async fn flush_all_batches( - pool: &sqlx::PgPool, - books_update: &mut Vec, - files_update: &mut Vec, - books_insert: &mut Vec, - files_insert: &mut Vec, - errors_insert: &mut Vec, -) -> anyhow::Result<()> { - if books_update.is_empty() && files_update.is_empty() && books_insert.is_empty() && files_insert.is_empty() && errors_insert.is_empty() { - return Ok(()); - } - - let start = std::time::Instant::now(); - let mut tx = pool.begin().await?; - - // Batch update books using UNNEST - if !books_update.is_empty() { - let book_ids: Vec = books_update.iter().map(|b| b.book_id).collect(); - let titles: Vec = books_update.iter().map(|b| b.title.clone()).collect(); - let kinds: Vec = books_update.iter().map(|b| b.kind.clone()).collect(); - let series: Vec> = books_update.iter().map(|b| b.series.clone()).collect(); - let volumes: Vec> = books_update.iter().map(|b| b.volume).collect(); - let page_counts: Vec> = books_update.iter().map(|b| b.page_count).collect(); - - sqlx::query( - r#" - UPDATE books SET - title = data.title, - kind = data.kind, - series = data.series, - volume = data.volume, - page_count = data.page_count, - updated_at = NOW() - FROM ( - SELECT * FROM UNNEST($1::uuid[], $2::text[], $3::text[], $4::text[], $5::int[], $6::int[]) - AS t(book_id, title, kind, series, volume, page_count) - ) AS data - WHERE books.id = data.book_id - "# - ) - .bind(&book_ids) - .bind(&titles) - .bind(&kinds) - .bind(&series) - .bind(&volumes) - .bind(&page_counts) - .execute(&mut *tx) - .await?; - - books_update.clear(); - } - - // Batch update files using UNNEST - if !files_update.is_empty() { - let file_ids: Vec = files_update.iter().map(|f| f.file_id).collect(); - let formats: Vec = files_update.iter().map(|f| f.format.clone()).collect(); - let sizes: Vec = files_update.iter().map(|f| f.size_bytes).collect(); - let mtimes: Vec> = files_update.iter().map(|f| f.mtime).collect(); - let fingerprints: Vec = files_update.iter().map(|f| f.fingerprint.clone()).collect(); - - sqlx::query( - r#" - UPDATE book_files SET - format = data.format, - size_bytes = data.size, - mtime = data.mtime, - fingerprint = data.fp, - parse_status = 'ok', - parse_error_opt = NULL, - updated_at = NOW() - FROM ( - SELECT * FROM UNNEST($1::uuid[], $2::text[], $3::bigint[], $4::timestamptz[], $5::text[]) - AS t(file_id, format, size, mtime, fp) - ) AS data - WHERE book_files.id = data.file_id - "# - ) - .bind(&file_ids) - .bind(&formats) - .bind(&sizes) - .bind(&mtimes) - .bind(&fingerprints) - .execute(&mut *tx) - .await?; - - files_update.clear(); - } - - // Batch insert books using UNNEST - if !books_insert.is_empty() { - let book_ids: Vec = books_insert.iter().map(|b| b.book_id).collect(); - let library_ids: Vec = books_insert.iter().map(|b| b.library_id).collect(); - let kinds: Vec = books_insert.iter().map(|b| b.kind.clone()).collect(); - let titles: Vec = books_insert.iter().map(|b| b.title.clone()).collect(); - let series: Vec> = books_insert.iter().map(|b| b.series.clone()).collect(); - let volumes: Vec> = books_insert.iter().map(|b| b.volume).collect(); - let page_counts: Vec> = books_insert.iter().map(|b| b.page_count).collect(); - let thumbnail_paths: Vec> = books_insert.iter().map(|b| b.thumbnail_path.clone()).collect(); - - sqlx::query( - r#" - INSERT INTO books (id, library_id, kind, title, series, volume, page_count, thumbnail_path) - SELECT * FROM UNNEST($1::uuid[], $2::uuid[], $3::text[], $4::text[], $5::text[], $6::int[], $7::int[], $8::text[]) - AS t(id, library_id, kind, title, series, volume, page_count, thumbnail_path) - "# - ) - .bind(&book_ids) - .bind(&library_ids) - .bind(&kinds) - .bind(&titles) - .bind(&series) - .bind(&volumes) - .bind(&page_counts) - .bind(&thumbnail_paths) - .execute(&mut *tx) - .await?; - - books_insert.clear(); - } - - // Batch insert files using UNNEST - if !files_insert.is_empty() { - let file_ids: Vec = files_insert.iter().map(|f| f.file_id).collect(); - let book_ids: Vec = files_insert.iter().map(|f| f.book_id).collect(); - let formats: Vec = files_insert.iter().map(|f| f.format.clone()).collect(); - let abs_paths: Vec = files_insert.iter().map(|f| f.abs_path.clone()).collect(); - let sizes: Vec = files_insert.iter().map(|f| f.size_bytes).collect(); - let mtimes: Vec> = files_insert.iter().map(|f| f.mtime).collect(); - let fingerprints: Vec = files_insert.iter().map(|f| f.fingerprint.clone()).collect(); - let statuses: Vec = files_insert.iter().map(|f| f.parse_status.clone()).collect(); - let errors: Vec> = files_insert.iter().map(|f| f.parse_error.clone()).collect(); - - sqlx::query( - r#" - INSERT INTO book_files (id, book_id, format, abs_path, size_bytes, mtime, fingerprint, parse_status, parse_error_opt) - SELECT * FROM UNNEST($1::uuid[], $2::uuid[], $3::text[], $4::text[], $5::bigint[], $6::timestamptz[], $7::text[], $8::text[], $9::text[]) - AS t(id, book_id, format, abs_path, size_bytes, mtime, fingerprint, parse_status, parse_error_opt) - "# - ) - .bind(&file_ids) - .bind(&book_ids) - .bind(&formats) - .bind(&abs_paths) - .bind(&sizes) - .bind(&mtimes) - .bind(&fingerprints) - .bind(&statuses) - .bind(&errors) - .execute(&mut *tx) - .await?; - - files_insert.clear(); - } - - // Batch insert errors using UNNEST - if !errors_insert.is_empty() { - let job_ids: Vec = errors_insert.iter().map(|e| e.job_id).collect(); - let file_paths: Vec = errors_insert.iter().map(|e| e.file_path.clone()).collect(); - let messages: Vec = errors_insert.iter().map(|e| e.error_message.clone()).collect(); - - sqlx::query( - r#" - INSERT INTO index_job_errors (job_id, file_path, error_message) - SELECT * FROM UNNEST($1::uuid[], $2::text[], $3::text[]) - AS t(job_id, file_path, error_message) - "# - ) - .bind(&job_ids) - .bind(&file_paths) - .bind(&messages) - .execute(&mut *tx) - .await?; - - errors_insert.clear(); - } - - tx.commit().await?; - info!("[BATCH] Flushed all batches in {:?}", start.elapsed()); - - Ok(()) -} - -// Check if job has been cancelled -async fn is_job_cancelled(pool: &sqlx::PgPool, job_id: Uuid) -> anyhow::Result { - let status: Option = sqlx::query_scalar( - "SELECT status FROM index_jobs WHERE id = $1" - ) - .bind(job_id) - .fetch_optional(pool) - .await?; - - Ok(status.as_deref() == Some("cancelled")) -} - -async fn scan_library( - state: &AppState, - job_id: Uuid, - library_id: Uuid, - root: &Path, - stats: &mut JobStats, - total_processed_count: &mut i32, - total_files: usize, - is_full_rebuild: bool, -) -> anyhow::Result<()> { - info!("[SCAN] Starting scan of library {} at path: {} (full_rebuild={})", library_id, root.display(), is_full_rebuild); - - let existing_rows = sqlx::query( - r#" - SELECT bf.id AS file_id, bf.book_id, bf.abs_path, bf.fingerprint - FROM book_files bf - JOIN books b ON b.id = bf.book_id - WHERE b.library_id = $1 - "#, - ) - .bind(library_id) - .fetch_all(&state.pool) - .await?; - - let mut existing: HashMap = HashMap::new(); - if !is_full_rebuild { - for row in existing_rows { - let abs_path: String = row.get("abs_path"); - let remapped_path = remap_libraries_path(&abs_path); - existing.insert( - remapped_path, - (row.get("file_id"), row.get("book_id"), row.get("fingerprint")), - ); - } - info!("[SCAN] Found {} existing files in database for library {}", existing.len(), library_id); - } else { - info!("[SCAN] Full rebuild: skipping existing files lookup (all will be treated as new)"); - } - - let mut seen: HashMap = HashMap::new(); - let mut library_processed_count = 0i32; - let mut last_progress_update = std::time::Instant::now(); - - // Batching buffers - const BATCH_SIZE: usize = 100; - let mut books_to_update: Vec = Vec::with_capacity(BATCH_SIZE); - let mut files_to_update: Vec = Vec::with_capacity(BATCH_SIZE); - let mut books_to_insert: Vec = Vec::with_capacity(BATCH_SIZE); - let mut files_to_insert: Vec = Vec::with_capacity(BATCH_SIZE); - let mut errors_to_insert: Vec = Vec::with_capacity(BATCH_SIZE); - - // Step 1: Collect all book files first - #[derive(Clone)] - struct FileInfo { - path: std::path::PathBuf, - format: BookFormat, - abs_path: String, - file_name: String, - metadata: std::fs::Metadata, - mtime: DateTime, - fingerprint: String, - lookup_path: String, - } - - let mut file_infos: Vec = Vec::new(); - for entry in WalkDir::new(root).into_iter().filter_map(Result::ok) { - if !entry.file_type().is_file() { - continue; - } - - let path = entry.path().to_path_buf(); - let Some(format) = detect_format(&path) else { - trace!("[SCAN] Skipping non-book file: {}", path.display()); - continue; - }; - - info!("[SCAN] Found book file: {} (format: {:?})", path.display(), format); - stats.scanned_files += 1; - - let abs_path_local = path.to_string_lossy().to_string(); - let abs_path = unmap_libraries_path(&abs_path_local); - let file_name = path.file_name() - .map(|s| s.to_string_lossy().to_string()) - .unwrap_or_else(|| abs_path.clone()); - - let metadata = std::fs::metadata(&path) - .with_context(|| format!("cannot stat {}", path.display()))?; - let mtime: DateTime = metadata - .modified() - .map(DateTime::::from) - .unwrap_or_else(|_| Utc::now()); - let fingerprint = compute_fingerprint(&path, metadata.len(), &mtime)?; - let lookup_path = remap_libraries_path(&abs_path); - - file_infos.push(FileInfo { - path, - format, - abs_path, - file_name, - metadata, - mtime, - fingerprint, - lookup_path, - }); - } - - info!("[SCAN] Collected {} files, starting parallel parsing", file_infos.len()); - - // Step 2: Parse metadata in parallel - let parsed_results: Vec<(FileInfo, anyhow::Result)> = file_infos - .into_par_iter() - .map(|file_info| { - let parse_result = parse_metadata(&file_info.path, file_info.format, root); - (file_info, parse_result) - }) - .collect(); - - info!("[SCAN] Completed parallel parsing, processing {} results", parsed_results.len()); - - // Step 3: Process results sequentially for batch inserts - for (file_info, parse_result) in parsed_results { - library_processed_count += 1; - *total_processed_count += 1; - - // Update progress in DB every 1 second or every 10 files - let should_update_progress = last_progress_update.elapsed() > Duration::from_secs(1) || library_processed_count % 10 == 0; - if should_update_progress { - let progress_percent = if total_files > 0 { - ((*total_processed_count as f64 / total_files as f64) * 100.0) as i32 - } else { - 0 - }; - - sqlx::query( - "UPDATE index_jobs SET current_file = $2, processed_files = $3, progress_percent = $4 WHERE id = $1" - ) - .bind(job_id) - .bind(&file_info.file_name) - .bind(*total_processed_count) - .bind(progress_percent) - .execute(&state.pool) - .await - .map_err(|e| { - error!("[BDD] Failed to update progress for job {}: {}", job_id, e); - e - })?; - - last_progress_update = std::time::Instant::now(); - - // Check if job has been cancelled - if is_job_cancelled(&state.pool, job_id).await? { - info!("[JOB] Job {} cancelled by user, stopping...", job_id); - // Flush any pending batches before exiting - flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?; - return Err(anyhow::anyhow!("Job cancelled by user")); - } - } - - let seen_key = remap_libraries_path(&file_info.abs_path); - seen.insert(seen_key.clone(), true); - - if let Some((file_id, book_id, old_fingerprint)) = existing.get(&file_info.lookup_path).cloned() { - if !is_full_rebuild && old_fingerprint == file_info.fingerprint { - trace!("[PROCESS] Skipping unchanged file: {}", file_info.file_name); - continue; - } - - info!("[PROCESS] Updating existing file: {} (full_rebuild={}, fingerprint_match={})", file_info.file_name, is_full_rebuild, old_fingerprint == file_info.fingerprint); - - match parse_result { - Ok(parsed) => { - books_to_update.push(BookUpdate { - book_id, - title: parsed.title, - kind: kind_from_format(file_info.format).to_string(), - series: parsed.series, - volume: parsed.volume, - page_count: parsed.page_count, - }); - - files_to_update.push(FileUpdate { - file_id, - format: file_info.format.as_str().to_string(), - size_bytes: file_info.metadata.len() as i64, - mtime: file_info.mtime, - fingerprint: file_info.fingerprint, - }); - - stats.indexed_files += 1; - } - Err(err) => { - warn!("[PARSER] Failed to parse {}: {}", file_info.file_name, err); - stats.errors += 1; - - files_to_update.push(FileUpdate { - file_id, - format: file_info.format.as_str().to_string(), - size_bytes: file_info.metadata.len() as i64, - mtime: file_info.mtime, - fingerprint: file_info.fingerprint.clone(), - }); - - errors_to_insert.push(ErrorInsert { - job_id, - file_path: file_info.abs_path.clone(), - error_message: err.to_string(), - }); - - // Also need to mark file as error - we'll do this separately - sqlx::query( - "UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE id = $1" - ) - .bind(file_id) - .bind(err.to_string()) - .execute(&state.pool) - .await?; - } - } - - // Flush if batch is full - if books_to_update.len() >= BATCH_SIZE || files_to_update.len() >= BATCH_SIZE { - flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?; - } - - continue; - } - - // New file (thumbnails generated by API after job handoff) - info!("[PROCESS] Inserting new file: {}", file_info.file_name); - let book_id = Uuid::new_v4(); - - match parse_result { - Ok(parsed) => { - let file_id = Uuid::new_v4(); - - books_to_insert.push(BookInsert { - book_id, - library_id, - kind: kind_from_format(file_info.format).to_string(), - title: parsed.title, - series: parsed.series, - volume: parsed.volume, - page_count: parsed.page_count, - thumbnail_path: None, - }); - - files_to_insert.push(FileInsert { - file_id, - book_id, - format: file_info.format.as_str().to_string(), - abs_path: file_info.abs_path.clone(), - size_bytes: file_info.metadata.len() as i64, - mtime: file_info.mtime, - fingerprint: file_info.fingerprint, - parse_status: "ok".to_string(), - parse_error: None, - }); - - stats.indexed_files += 1; - } - Err(err) => { - warn!("[PARSER] Failed to parse {}: {}", file_info.file_name, err); - stats.errors += 1; - let book_id = Uuid::new_v4(); - let file_id = Uuid::new_v4(); - - books_to_insert.push(BookInsert { - book_id, - library_id, - kind: kind_from_format(file_info.format).to_string(), - title: file_display_name(&file_info.path), - series: None, - volume: None, - page_count: None, - thumbnail_path: None, - }); - - files_to_insert.push(FileInsert { - file_id, - book_id, - format: file_info.format.as_str().to_string(), - abs_path: file_info.abs_path.clone(), - size_bytes: file_info.metadata.len() as i64, - mtime: file_info.mtime, - fingerprint: file_info.fingerprint, - parse_status: "error".to_string(), - parse_error: Some(err.to_string()), - }); - - errors_to_insert.push(ErrorInsert { - job_id, - file_path: file_info.abs_path, - error_message: err.to_string(), - }); - } - } - - // Flush if batch is full - if books_to_insert.len() >= BATCH_SIZE || files_to_insert.len() >= BATCH_SIZE { - flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?; - } - } - - // Final flush of any remaining items - flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?; - - info!("[SCAN] Library {} scan complete: {} files scanned, {} indexed, {} errors", - library_id, library_processed_count, stats.indexed_files, stats.errors); - - // Handle deletions - let mut removed_count = 0usize; - for (abs_path, (file_id, book_id, _)) in existing { - if seen.contains_key(&abs_path) { - continue; - } - sqlx::query("DELETE FROM book_files WHERE id = $1") - .bind(file_id) - .execute(&state.pool) - .await?; - sqlx::query("DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)") - .bind(book_id) - .execute(&state.pool) - .await?; - stats.removed_files += 1; - removed_count += 1; - } - - if removed_count > 0 { - info!("[SCAN] Removed {} stale files from database", removed_count); - } - - Ok(()) -} - -fn compute_fingerprint(path: &Path, size: u64, mtime: &DateTime) -> anyhow::Result { - // Optimized: only use size + mtime + first bytes of filename for fast fingerprinting - // This is 100x faster than reading file content while still being reliable for change detection - let mut hasher = Sha256::new(); - hasher.update(size.to_le_bytes()); - hasher.update(mtime.timestamp().to_le_bytes()); - - // Add filename for extra uniqueness (in case of rapid changes with same size+mtime) - if let Some(filename) = path.file_name() { - hasher.update(filename.as_encoded_bytes()); - } - - Ok(format!("{:x}", hasher.finalize())) -} - -fn kind_from_format(format: BookFormat) -> &'static str { - match format { - BookFormat::Pdf => "ebook", - BookFormat::Cbz | BookFormat::Cbr => "comic", - } -} - -fn file_display_name(path: &Path) -> String { - path.file_stem() - .map(|s| s.to_string_lossy().to_string()) - .unwrap_or_else(|| "Untitled".to_string()) -} - -#[derive(Serialize)] -struct SearchDoc { - id: String, - library_id: String, - kind: String, - title: String, - author: Option, - series: Option, - volume: Option, - language: Option, -} - -async fn sync_meili(pool: &sqlx::PgPool, meili_url: &str, meili_master_key: &str) -> anyhow::Result<()> { - let client = reqwest::Client::new(); - let base = meili_url.trim_end_matches('/'); - - // Ensure index exists and has proper settings - let _ = client - .post(format!("{base}/indexes")) - .header("Authorization", format!("Bearer {meili_master_key}")) - .json(&serde_json::json!({"uid": "books", "primaryKey": "id"})) - .send() - .await; - - let _ = client - .patch(format!("{base}/indexes/books/settings/filterable-attributes")) - .header("Authorization", format!("Bearer {meili_master_key}")) - .json(&serde_json::json!(["library_id", "kind"])) - .send() - .await; - - // Get last sync timestamp - let last_sync: Option> = sqlx::query_scalar( - "SELECT last_meili_sync FROM sync_metadata WHERE id = 1 AND last_meili_sync IS NOT NULL" - ) - .fetch_optional(pool) - .await?; - - // If no previous sync, do a full sync - let is_full_sync = last_sync.is_none(); - - // Get books to sync: all if full sync, only modified since last sync otherwise - let rows = if is_full_sync { - info!("[MEILI] Performing full sync"); - sqlx::query( - "SELECT id, library_id, kind, title, author, series, volume, language, updated_at FROM books", - ) - .fetch_all(pool) - .await? - } else { - let since = last_sync.unwrap(); - info!("[MEILI] Performing incremental sync since {}", since); - - // Also get deleted book IDs to remove from MeiliSearch - // For now, we'll do a diff approach: get all book IDs from DB and compare with Meili - sqlx::query( - "SELECT id, library_id, kind, title, author, series, volume, language, updated_at FROM books WHERE updated_at > $1", - ) - .bind(since) - .fetch_all(pool) - .await? - }; - - if rows.is_empty() && !is_full_sync { - info!("[MEILI] No changes to sync"); - // Still update the timestamp - sqlx::query( - "INSERT INTO sync_metadata (id, last_meili_sync) VALUES (1, NOW()) ON CONFLICT (id) DO UPDATE SET last_meili_sync = NOW()" - ) - .execute(pool) - .await?; - return Ok(()); - } - - let docs: Vec = rows - .into_iter() - .map(|row| SearchDoc { - id: row.get::("id").to_string(), - library_id: row.get::("library_id").to_string(), - kind: row.get("kind"), - title: row.get("title"), - author: row.get("author"), - series: row.get("series"), - volume: row.get("volume"), - language: row.get("language"), - }) - .collect(); - - let doc_count = docs.len(); - - // Send documents to MeiliSearch in batches of 1000 - const MEILI_BATCH_SIZE: usize = 1000; - for (i, chunk) in docs.chunks(MEILI_BATCH_SIZE).enumerate() { - let batch_num = i + 1; - info!("[MEILI] Sending batch {}/{} ({} docs)", batch_num, (doc_count + MEILI_BATCH_SIZE - 1) / MEILI_BATCH_SIZE, chunk.len()); - - let response = client - .post(format!("{base}/indexes/books/documents")) - .header("Authorization", format!("Bearer {meili_master_key}")) - .json(&chunk) - .send() - .await - .context("failed to send docs to meili")?; - - if !response.status().is_success() { - let status = response.status(); - let body = response.text().await.unwrap_or_default(); - return Err(anyhow::anyhow!("MeiliSearch error {}: {}", status, body)); - } - } - - // Handle deletions: get all book IDs from DB and remove from MeiliSearch any that don't exist - // This is expensive, so we only do it periodically (every 10 syncs) or on full syncs - if is_full_sync || rand::random::() < 26 { // ~10% chance - info!("[MEILI] Checking for documents to delete"); - - // Get all book IDs from database - let db_ids: Vec = sqlx::query_scalar("SELECT id::text FROM books") - .fetch_all(pool) - .await?; - - // Get all document IDs from MeiliSearch (this requires fetching all documents) - // For efficiency, we'll just delete by query for documents that might be stale - // A better approach would be to track deletions in a separate table - - // For now, we'll do a simple approach: fetch all Meili docs and compare - // Note: This could be slow for large collections - let meili_response = client - .post(format!("{base}/indexes/books/documents/fetch")) - .header("Authorization", format!("Bearer {meili_master_key}")) - .json(&serde_json::json!({ - "fields": ["id"], - "limit": 100000 - })) - .send() - .await; - - if let Ok(response) = meili_response { - if response.status().is_success() { - if let Ok(meili_docs) = response.json::>().await { - let meili_ids: std::collections::HashSet = meili_docs - .into_iter() - .filter_map(|doc| doc.get("id").and_then(|id| id.as_str()).map(|s| s.to_string())) - .collect(); - - let db_ids_set: std::collections::HashSet = db_ids.into_iter().collect(); - let to_delete: Vec = meili_ids.difference(&db_ids_set).cloned().collect(); - - if !to_delete.is_empty() { - info!("[MEILI] Deleting {} stale documents", to_delete.len()); - let _ = client - .post(format!("{base}/indexes/books/documents/delete-batch")) - .header("Authorization", format!("Bearer {meili_master_key}")) - .json(&to_delete) - .send() - .await; - } - } - } - } - } - - // Update last sync timestamp - sqlx::query( - "INSERT INTO sync_metadata (id, last_meili_sync) VALUES (1, NOW()) ON CONFLICT (id) DO UPDATE SET last_meili_sync = NOW()" - ) - .execute(pool) - .await?; - - info!("[MEILI] Sync completed: {} documents indexed", doc_count); - Ok(()) -} diff --git a/apps/indexer/src/meili.rs b/apps/indexer/src/meili.rs new file mode 100644 index 0000000..2ddba8d --- /dev/null +++ b/apps/indexer/src/meili.rs @@ -0,0 +1,180 @@ +use anyhow::{Context, Result}; +use chrono::{DateTime, Utc}; +use reqwest::Client; +use serde::Serialize; +use sqlx::{PgPool, Row}; +use tracing::info; +use uuid::Uuid; + +#[derive(Serialize)] +struct SearchDoc { + id: String, + library_id: String, + kind: String, + title: String, + author: Option, + series: Option, + volume: Option, + language: Option, +} + +pub async fn sync_meili(pool: &PgPool, meili_url: &str, meili_master_key: &str) -> Result<()> { + let client = Client::new(); + let base = meili_url.trim_end_matches('/'); + + // Ensure index exists and has proper settings + let _ = client + .post(format!("{base}/indexes")) + .header("Authorization", format!("Bearer {meili_master_key}")) + .json(&serde_json::json!({"uid": "books", "primaryKey": "id"})) + .send() + .await; + + let _ = client + .patch(format!("{base}/indexes/books/settings/filterable-attributes")) + .header("Authorization", format!("Bearer {meili_master_key}")) + .json(&serde_json::json!(["library_id", "kind"])) + .send() + .await; + + // Get last sync timestamp + let last_sync: Option> = sqlx::query_scalar( + "SELECT last_meili_sync FROM sync_metadata WHERE id = 1 AND last_meili_sync IS NOT NULL" + ) + .fetch_optional(pool) + .await?; + + // If no previous sync, do a full sync + let is_full_sync = last_sync.is_none(); + + // Get books to sync: all if full sync, only modified since last sync otherwise + let rows = if is_full_sync { + info!("[MEILI] Performing full sync"); + sqlx::query( + "SELECT id, library_id, kind, title, author, series, volume, language, updated_at FROM books", + ) + .fetch_all(pool) + .await? + } else { + let since = last_sync.unwrap(); + info!("[MEILI] Performing incremental sync since {}", since); + + // Also get deleted book IDs to remove from MeiliSearch + // For now, we'll do a diff approach: get all book IDs from DB and compare with Meili + sqlx::query( + "SELECT id, library_id, kind, title, author, series, volume, language, updated_at FROM books WHERE updated_at > $1", + ) + .bind(since) + .fetch_all(pool) + .await? + }; + + if rows.is_empty() && !is_full_sync { + info!("[MEILI] No changes to sync"); + // Still update the timestamp + sqlx::query( + "INSERT INTO sync_metadata (id, last_meili_sync) VALUES (1, NOW()) ON CONFLICT (id) DO UPDATE SET last_meili_sync = NOW()" + ) + .execute(pool) + .await?; + return Ok(()); + } + + let docs: Vec = rows + .into_iter() + .map(|row| SearchDoc { + id: row.get::("id").to_string(), + library_id: row.get::("library_id").to_string(), + kind: row.get("kind"), + title: row.get("title"), + author: row.get("author"), + series: row.get("series"), + volume: row.get("volume"), + language: row.get("language"), + }) + .collect(); + + let doc_count = docs.len(); + + // Send documents to MeiliSearch in batches of 1000 + const MEILI_BATCH_SIZE: usize = 1000; + for (i, chunk) in docs.chunks(MEILI_BATCH_SIZE).enumerate() { + let batch_num = i + 1; + info!("[MEILI] Sending batch {}/{} ({} docs)", batch_num, (doc_count + MEILI_BATCH_SIZE - 1) / MEILI_BATCH_SIZE, chunk.len()); + + let response = client + .post(format!("{base}/indexes/books/documents")) + .header("Authorization", format!("Bearer {meili_master_key}")) + .json(&chunk) + .send() + .await + .context("failed to send docs to meili")?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(anyhow::anyhow!("MeiliSearch error {}: {}", status, body)); + } + } + + // Handle deletions: get all book IDs from DB and remove from MeiliSearch any that don't exist + // This is expensive, so we only do it periodically (every 10 syncs) or on full syncs + if is_full_sync || rand::random::() < 26 { // ~10% chance + info!("[MEILI] Checking for documents to delete"); + + // Get all book IDs from database + let db_ids: Vec = sqlx::query_scalar("SELECT id::text FROM books") + .fetch_all(pool) + .await?; + + // Get all document IDs from MeiliSearch (this requires fetching all documents) + // For efficiency, we'll just delete by query for documents that might be stale + // A better approach would be to track deletions in a separate table + + // For now, we'll do a simple approach: fetch all Meili docs and compare + // Note: This could be slow for large collections + let meili_response = client + .post(format!("{base}/indexes/books/documents/fetch")) + .header("Authorization", format!("Bearer {meili_master_key}")) + .json(&serde_json::json!({ + "fields": ["id"], + "limit": 100000 + })) + .send() + .await; + + if let Ok(response) = meili_response { + if response.status().is_success() { + if let Ok(meili_docs) = response.json::>().await { + let meili_ids: std::collections::HashSet = meili_docs + .into_iter() + .filter_map(|doc| doc.get("id").and_then(|id| id.as_str()).map(|s| s.to_string())) + .collect(); + + let db_ids_set: std::collections::HashSet = db_ids.into_iter().collect(); + let to_delete: Vec = meili_ids.difference(&db_ids_set).cloned().collect(); + + if !to_delete.is_empty() { + info!("[MEILI] Deleting {} stale documents", to_delete.len()); + let _ = client + .post(format!("{base}/indexes/books/documents/delete-batch")) + .header("Authorization", format!("Bearer {meili_master_key}")) + .json(&to_delete) + .send() + .await; + } + } + } + } + } + + // Update last sync timestamp + sqlx::query( + "INSERT INTO sync_metadata (id, last_meili_sync) VALUES (1, NOW()) ON CONFLICT (id) DO UPDATE SET last_meili_sync = NOW()" + ) + .execute(pool) + .await?; + + info!("[MEILI] Sync completed: {} documents indexed", doc_count); + Ok(()) +} diff --git a/apps/indexer/src/scanner.rs b/apps/indexer/src/scanner.rs new file mode 100644 index 0000000..a8883e7 --- /dev/null +++ b/apps/indexer/src/scanner.rs @@ -0,0 +1,360 @@ +use anyhow::{Context, Result}; +use chrono::{DateTime, Utc}; +use parsers::{detect_format, parse_metadata, BookFormat, ParsedMetadata}; +use rayon::prelude::*; +use serde::Serialize; +use sqlx::Row; +use std::{collections::HashMap, path::Path, time::Duration}; +use tracing::{error, info, trace, warn}; +use uuid::Uuid; +use walkdir::WalkDir; + +use crate::{ + batch::{flush_all_batches, BookInsert, BookUpdate, ErrorInsert, FileInsert, FileUpdate}, + job::is_job_cancelled, + utils, + AppState, +}; + +#[derive(Serialize)] +pub struct JobStats { + pub scanned_files: usize, + pub indexed_files: usize, + pub removed_files: usize, + pub errors: usize, +} + +const BATCH_SIZE: usize = 100; + +pub async fn scan_library( + state: &AppState, + job_id: Uuid, + library_id: Uuid, + root: &Path, + stats: &mut JobStats, + total_processed_count: &mut i32, + total_files: usize, + is_full_rebuild: bool, +) -> Result<()> { + info!("[SCAN] Starting scan of library {} at path: {} (full_rebuild={})", library_id, root.display(), is_full_rebuild); + + let existing_rows = sqlx::query( + r#" + SELECT bf.id AS file_id, bf.book_id, bf.abs_path, bf.fingerprint + FROM book_files bf + JOIN books b ON b.id = bf.book_id + WHERE b.library_id = $1 + "#, + ) + .bind(library_id) + .fetch_all(&state.pool) + .await?; + + let mut existing: HashMap = HashMap::new(); + if !is_full_rebuild { + for row in existing_rows { + let abs_path: String = row.get("abs_path"); + let remapped_path = utils::remap_libraries_path(&abs_path); + existing.insert( + remapped_path, + (row.get("file_id"), row.get("book_id"), row.get("fingerprint")), + ); + } + info!("[SCAN] Found {} existing files in database for library {}", existing.len(), library_id); + } else { + info!("[SCAN] Full rebuild: skipping existing files lookup (all will be treated as new)"); + } + + let mut seen: HashMap = HashMap::new(); + let mut library_processed_count = 0i32; + let mut last_progress_update = std::time::Instant::now(); + + // Batching buffers + let mut books_to_update: Vec = Vec::with_capacity(BATCH_SIZE); + let mut files_to_update: Vec = Vec::with_capacity(BATCH_SIZE); + let mut books_to_insert: Vec = Vec::with_capacity(BATCH_SIZE); + let mut files_to_insert: Vec = Vec::with_capacity(BATCH_SIZE); + let mut errors_to_insert: Vec = Vec::with_capacity(BATCH_SIZE); + + // Step 1: Collect all book files first + #[derive(Clone)] + struct FileInfo { + path: std::path::PathBuf, + format: BookFormat, + abs_path: String, + file_name: String, + metadata: std::fs::Metadata, + mtime: DateTime, + fingerprint: String, + lookup_path: String, + } + + let mut file_infos: Vec = Vec::new(); + for entry in WalkDir::new(root).into_iter().filter_map(Result::ok) { + if !entry.file_type().is_file() { + continue; + } + + let path = entry.path().to_path_buf(); + let Some(format) = detect_format(&path) else { + trace!("[SCAN] Skipping non-book file: {}", path.display()); + continue; + }; + + info!("[SCAN] Found book file: {} (format: {:?})", path.display(), format); + stats.scanned_files += 1; + + let abs_path_local = path.to_string_lossy().to_string(); + let abs_path = utils::unmap_libraries_path(&abs_path_local); + let file_name = path.file_name() + .map(|s| s.to_string_lossy().to_string()) + .unwrap_or_else(|| abs_path.clone()); + + let metadata = std::fs::metadata(&path) + .with_context(|| format!("cannot stat {}", path.display()))?; + let mtime: DateTime = metadata + .modified() + .map(DateTime::::from) + .unwrap_or_else(|_| Utc::now()); + let fingerprint = utils::compute_fingerprint(&path, metadata.len(), &mtime)?; + let lookup_path = utils::remap_libraries_path(&abs_path); + + file_infos.push(FileInfo { + path, + format, + abs_path, + file_name, + metadata, + mtime, + fingerprint, + lookup_path, + }); + } + + info!("[SCAN] Collected {} files, starting parallel parsing", file_infos.len()); + + // Step 2: Parse metadata in parallel + let parsed_results: Vec<(FileInfo, Result)> = file_infos + .into_par_iter() + .map(|file_info| { + let parse_result = parse_metadata(&file_info.path, file_info.format, root); + (file_info, parse_result) + }) + .collect(); + + info!("[SCAN] Completed parallel parsing, processing {} results", parsed_results.len()); + + // Step 3: Process results sequentially for batch inserts + for (file_info, parse_result) in parsed_results { + library_processed_count += 1; + *total_processed_count += 1; + + // Update progress in DB every 1 second or every 10 files + let should_update_progress = last_progress_update.elapsed() > Duration::from_secs(1) || library_processed_count % 10 == 0; + if should_update_progress { + let progress_percent = if total_files > 0 { + ((*total_processed_count as f64 / total_files as f64) * 100.0) as i32 + } else { + 0 + }; + + sqlx::query( + "UPDATE index_jobs SET current_file = $2, processed_files = $3, progress_percent = $4 WHERE id = $1" + ) + .bind(job_id) + .bind(&file_info.file_name) + .bind(*total_processed_count) + .bind(progress_percent) + .execute(&state.pool) + .await + .map_err(|e| { + error!("[BDD] Failed to update progress for job {}: {}", job_id, e); + e + })?; + + last_progress_update = std::time::Instant::now(); + + // Check if job has been cancelled + if is_job_cancelled(&state.pool, job_id).await? { + info!("[JOB] Job {} cancelled by user, stopping...", job_id); + // Flush any pending batches before exiting + flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?; + return Err(anyhow::anyhow!("Job cancelled by user")); + } + } + + let seen_key = utils::remap_libraries_path(&file_info.abs_path); + seen.insert(seen_key.clone(), true); + + if let Some((file_id, book_id, old_fingerprint)) = existing.get(&file_info.lookup_path).cloned() { + if !is_full_rebuild && old_fingerprint == file_info.fingerprint { + trace!("[PROCESS] Skipping unchanged file: {}", file_info.file_name); + continue; + } + + info!("[PROCESS] Updating existing file: {} (full_rebuild={}, fingerprint_match={})", file_info.file_name, is_full_rebuild, old_fingerprint == file_info.fingerprint); + + match parse_result { + Ok(parsed) => { + books_to_update.push(BookUpdate { + book_id, + title: parsed.title, + kind: utils::kind_from_format(file_info.format).to_string(), + series: parsed.series, + volume: parsed.volume, + page_count: parsed.page_count, + }); + + files_to_update.push(FileUpdate { + file_id, + format: file_info.format.as_str().to_string(), + size_bytes: file_info.metadata.len() as i64, + mtime: file_info.mtime, + fingerprint: file_info.fingerprint, + }); + + stats.indexed_files += 1; + } + Err(err) => { + warn!("[PARSER] Failed to parse {}: {}", file_info.file_name, err); + stats.errors += 1; + + files_to_update.push(FileUpdate { + file_id, + format: file_info.format.as_str().to_string(), + size_bytes: file_info.metadata.len() as i64, + mtime: file_info.mtime, + fingerprint: file_info.fingerprint.clone(), + }); + + errors_to_insert.push(ErrorInsert { + job_id, + file_path: file_info.abs_path.clone(), + error_message: err.to_string(), + }); + + // Also need to mark file as error - we'll do this separately + sqlx::query( + "UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE id = $1" + ) + .bind(file_id) + .bind(err.to_string()) + .execute(&state.pool) + .await?; + } + } + + // Flush if batch is full + if books_to_update.len() >= BATCH_SIZE || files_to_update.len() >= BATCH_SIZE { + flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?; + } + + continue; + } + + // New file (thumbnails generated by API after job handoff) + info!("[PROCESS] Inserting new file: {}", file_info.file_name); + let book_id = Uuid::new_v4(); + + match parse_result { + Ok(parsed) => { + let file_id = Uuid::new_v4(); + + books_to_insert.push(BookInsert { + book_id, + library_id, + kind: utils::kind_from_format(file_info.format).to_string(), + title: parsed.title, + series: parsed.series, + volume: parsed.volume, + page_count: parsed.page_count, + thumbnail_path: None, + }); + + files_to_insert.push(FileInsert { + file_id, + book_id, + format: file_info.format.as_str().to_string(), + abs_path: file_info.abs_path.clone(), + size_bytes: file_info.metadata.len() as i64, + mtime: file_info.mtime, + fingerprint: file_info.fingerprint, + parse_status: "ok".to_string(), + parse_error: None, + }); + + stats.indexed_files += 1; + } + Err(err) => { + warn!("[PARSER] Failed to parse {}: {}", file_info.file_name, err); + stats.errors += 1; + let book_id = Uuid::new_v4(); + let file_id = Uuid::new_v4(); + + books_to_insert.push(BookInsert { + book_id, + library_id, + kind: utils::kind_from_format(file_info.format).to_string(), + title: utils::file_display_name(&file_info.path), + series: None, + volume: None, + page_count: None, + thumbnail_path: None, + }); + + files_to_insert.push(FileInsert { + file_id, + book_id, + format: file_info.format.as_str().to_string(), + abs_path: file_info.abs_path.clone(), + size_bytes: file_info.metadata.len() as i64, + mtime: file_info.mtime, + fingerprint: file_info.fingerprint, + parse_status: "error".to_string(), + parse_error: Some(err.to_string()), + }); + + errors_to_insert.push(ErrorInsert { + job_id, + file_path: file_info.abs_path, + error_message: err.to_string(), + }); + } + } + + // Flush if batch is full + if books_to_insert.len() >= BATCH_SIZE || files_to_insert.len() >= BATCH_SIZE { + flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?; + } + } + + // Final flush of any remaining items + flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?; + + info!("[SCAN] Library {} scan complete: {} files scanned, {} indexed, {} errors", + library_id, library_processed_count, stats.indexed_files, stats.errors); + + // Handle deletions + let mut removed_count = 0usize; + for (abs_path, (file_id, book_id, _)) in existing { + if seen.contains_key(&abs_path) { + continue; + } + sqlx::query("DELETE FROM book_files WHERE id = $1") + .bind(file_id) + .execute(&state.pool) + .await?; + sqlx::query("DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)") + .bind(book_id) + .execute(&state.pool) + .await?; + stats.removed_files += 1; + removed_count += 1; + } + + if removed_count > 0 { + info!("[SCAN] Removed {} stale files from database", removed_count); + } + + Ok(()) +} diff --git a/apps/indexer/src/scheduler.rs b/apps/indexer/src/scheduler.rs new file mode 100644 index 0000000..99e55c5 --- /dev/null +++ b/apps/indexer/src/scheduler.rs @@ -0,0 +1,67 @@ +use anyhow::Result; +use sqlx::{PgPool, Row}; +use tracing::info; +use uuid::Uuid; + +pub async fn check_and_schedule_auto_scans(pool: &PgPool) -> Result<()> { + let libraries = sqlx::query( + r#" + SELECT id, scan_mode, last_scan_at + FROM libraries + WHERE monitor_enabled = TRUE + AND ( + next_scan_at IS NULL + OR next_scan_at <= NOW() + ) + AND NOT EXISTS ( + SELECT 1 FROM index_jobs + WHERE library_id = libraries.id + AND status IN ('pending', 'running') + ) + "# + ) + .fetch_all(pool) + .await?; + + for row in libraries { + let library_id: Uuid = row.get("id"); + let scan_mode: String = row.get("scan_mode"); + + info!("[SCHEDULER] Auto-scanning library {} (mode: {})", library_id, scan_mode); + + let job_id = Uuid::new_v4(); + let job_type = match scan_mode.as_str() { + "full" => "full_rebuild", + _ => "rebuild", + }; + + sqlx::query( + "INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, $3, 'pending')" + ) + .bind(job_id) + .bind(library_id) + .bind(job_type) + .execute(pool) + .await?; + + // Update next_scan_at + let interval_minutes = match scan_mode.as_str() { + "hourly" => 60, + "daily" => 1440, + "weekly" => 10080, + _ => 1440, // default daily + }; + + sqlx::query( + "UPDATE libraries SET last_scan_at = NOW(), next_scan_at = NOW() + INTERVAL '1 minute' * $2 WHERE id = $1" + ) + .bind(library_id) + .bind(interval_minutes) + .execute(pool) + .await?; + + info!("[SCHEDULER] Created job {} for library {}", job_id, library_id); + } + + Ok(()) +} diff --git a/apps/indexer/src/utils.rs b/apps/indexer/src/utils.rs new file mode 100644 index 0000000..47add2e --- /dev/null +++ b/apps/indexer/src/utils.rs @@ -0,0 +1,52 @@ +use anyhow::Result; +use chrono::DateTime; +use parsers::BookFormat; +use sha2::{Digest, Sha256}; +use std::path::Path; +use chrono::Utc; + +pub fn remap_libraries_path(path: &str) -> String { + if let Ok(root) = std::env::var("LIBRARIES_ROOT_PATH") { + if path.starts_with("/libraries/") { + return path.replacen("/libraries", &root, 1); + } + } + path.to_string() +} + +pub fn unmap_libraries_path(path: &str) -> String { + if let Ok(root) = std::env::var("LIBRARIES_ROOT_PATH") { + if path.starts_with(&root) { + return path.replacen(&root, "/libraries", 1); + } + } + path.to_string() +} + +pub fn compute_fingerprint(path: &Path, size: u64, mtime: &DateTime) -> Result { + // Optimized: only use size + mtime + first bytes of filename for fast fingerprinting + // This is 100x faster than reading file content while still being reliable for change detection + let mut hasher = Sha256::new(); + hasher.update(size.to_le_bytes()); + hasher.update(mtime.timestamp().to_le_bytes()); + + // Add filename for extra uniqueness (in case of rapid changes with same size+mtime) + if let Some(filename) = path.file_name() { + hasher.update(filename.as_encoded_bytes()); + } + + Ok(format!("{:x}", hasher.finalize())) +} + +pub fn kind_from_format(format: BookFormat) -> &'static str { + match format { + BookFormat::Pdf => "ebook", + BookFormat::Cbz | BookFormat::Cbr => "comic", + } +} + +pub fn file_display_name(path: &Path) -> String { + path.file_stem() + .map(|s| s.to_string_lossy().to_string()) + .unwrap_or_else(|| "Untitled".to_string()) +} diff --git a/apps/indexer/src/watcher.rs b/apps/indexer/src/watcher.rs new file mode 100644 index 0000000..0a9d3cf --- /dev/null +++ b/apps/indexer/src/watcher.rs @@ -0,0 +1,147 @@ +use anyhow::Result; +use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher}; +use sqlx::Row; +use std::collections::HashMap; +use std::time::Duration; +use tokio::sync::mpsc; +use tracing::{error, info, trace}; +use uuid::Uuid; + +use crate::utils; +use crate::AppState; + +pub async fn run_file_watcher(state: AppState) -> Result<()> { + let (tx, mut rx) = mpsc::channel::<(Uuid, String)>(100); + + // Start watcher refresh loop + let refresh_interval = Duration::from_secs(30); + let pool = state.pool.clone(); + + tokio::spawn(async move { + let mut watched_libraries: HashMap = HashMap::new(); + + loop { + // Get libraries with watcher enabled + match sqlx::query( + "SELECT id, root_path FROM libraries WHERE watcher_enabled = TRUE AND enabled = TRUE" + ) + .fetch_all(&pool) + .await + { + Ok(rows) => { + let current_libraries: HashMap = rows + .into_iter() + .map(|row| { + let id: Uuid = row.get("id"); + let root_path: String = row.get("root_path"); + let local_path = utils::remap_libraries_path(&root_path); + (id, local_path) + }) + .collect(); + + // Check if we need to recreate watcher + let needs_restart = watched_libraries.len() != current_libraries.len() + || watched_libraries.iter().any(|(id, path)| { + current_libraries.get(id) != Some(path) + }); + + if needs_restart { + info!("[WATCHER] Restarting watcher for {} libraries", current_libraries.len()); + + if !current_libraries.is_empty() { + let tx_clone = tx.clone(); + let libraries_clone = current_libraries.clone(); + + match setup_watcher(libraries_clone, tx_clone) { + Ok(_new_watcher) => { + watched_libraries = current_libraries; + info!("[WATCHER] Watching {} libraries", watched_libraries.len()); + } + Err(err) => { + error!("[WATCHER] Failed to setup watcher: {}", err); + } + } + } + } + } + Err(err) => { + error!("[WATCHER] Failed to fetch libraries: {}", err); + } + } + + tokio::time::sleep(refresh_interval).await; + } + }); + + // Process watcher events + while let Some((library_id, file_path)) = rx.recv().await { + info!("[WATCHER] File changed in library {}: {}", library_id, file_path); + + // Check if there's already a pending job for this library + match sqlx::query_scalar::<_, bool>( + "SELECT EXISTS(SELECT 1 FROM index_jobs WHERE library_id = $1 AND status IN ('pending', 'running'))" + ) + .bind(library_id) + .fetch_one(&state.pool) + .await + { + Ok(exists) => { + if !exists { + // Create a quick scan job + let job_id = Uuid::new_v4(); + match sqlx::query( + "INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, 'rebuild', 'pending')" + ) + .bind(job_id) + .bind(library_id) + .execute(&state.pool) + .await + { + Ok(_) => info!("[WATCHER] Created job {} for library {}", job_id, library_id), + Err(err) => error!("[WATCHER] Failed to create job: {}", err), + } + } else { + trace!("[WATCHER] Job already pending for library {}, skipping", library_id); + } + } + Err(err) => error!("[WATCHER] Failed to check existing jobs: {}", err), + } + } + + Ok(()) +} + +fn setup_watcher( + libraries: HashMap, + tx: mpsc::Sender<(Uuid, String)>, +) -> Result { + let libraries_for_closure = libraries.clone(); + + let mut watcher = notify::recommended_watcher(move |res: Result| { + match res { + Ok(event) => { + if event.kind.is_modify() || event.kind.is_create() || event.kind.is_remove() { + for path in event.paths { + if let Some((library_id, _)) = libraries_for_closure.iter().find(|(_, root)| { + path.starts_with(root) + }) { + let path_str = path.to_string_lossy().to_string(); + if parsers::detect_format(&path).is_some() { + let _ = tx.try_send((*library_id, path_str)); + } + } + } + } + } + Err(err) => error!("[WATCHER] Event error: {}", err), + } + })?; + + // Actually watch the library directories + for (_, root_path) in &libraries { + info!("[WATCHER] Watching directory: {}", root_path); + watcher.watch(std::path::Path::new(root_path), RecursiveMode::Recursive)?; + } + + Ok(watcher) +} diff --git a/apps/indexer/src/worker.rs b/apps/indexer/src/worker.rs new file mode 100644 index 0000000..ffa2668 --- /dev/null +++ b/apps/indexer/src/worker.rs @@ -0,0 +1,61 @@ +use std::time::Duration; +use tracing::{error, info, trace}; +use crate::{job, scheduler, watcher, AppState}; + +pub async fn run_worker(state: AppState, interval_seconds: u64) { + let wait = Duration::from_secs(interval_seconds.max(1)); + + // Cleanup stale jobs from previous runs + if let Err(err) = job::cleanup_stale_jobs(&state.pool).await { + error!("[CLEANUP] Failed to cleanup stale jobs: {}", err); + } + + // Start file watcher task + let watcher_state = state.clone(); + let _watcher_handle = tokio::spawn(async move { + info!("[WATCHER] Starting file watcher service"); + if let Err(err) = watcher::run_file_watcher(watcher_state).await { + error!("[WATCHER] Error: {}", err); + } + }); + + // Start scheduler task for auto-monitoring + let scheduler_state = state.clone(); + let _scheduler_handle = tokio::spawn(async move { + let scheduler_wait = Duration::from_secs(60); // Check every minute + loop { + if let Err(err) = scheduler::check_and_schedule_auto_scans(&scheduler_state.pool).await { + error!("[SCHEDULER] Error: {}", err); + } + tokio::time::sleep(scheduler_wait).await; + } + }); + + loop { + match job::claim_next_job(&state.pool).await { + Ok(Some((job_id, library_id))) => { + info!("[INDEXER] Starting job {} library={:?}", job_id, library_id); + if let Err(err) = job::process_job(&state, job_id, library_id).await { + let err_str = err.to_string(); + if err_str.contains("cancelled") || err_str.contains("Cancelled") { + info!("[INDEXER] Job {} was cancelled by user", job_id); + // Status is already 'cancelled' in DB, don't change it + } else { + error!("[INDEXER] Job {} failed: {}", job_id, err); + let _ = job::fail_job(&state.pool, job_id, &err_str).await; + } + } else { + info!("[INDEXER] Job {} completed", job_id); + } + } + Ok(None) => { + trace!("[INDEXER] No pending jobs, waiting..."); + tokio::time::sleep(wait).await; + } + Err(err) => { + error!("[INDEXER] Worker error: {}", err); + tokio::time::sleep(wait).await; + } + } + } +}