diff --git a/Cargo.lock b/Cargo.lock index 225b778..f6eb33f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -71,9 +71,11 @@ version = "0.1.0" dependencies = [ "anyhow", "argon2", + "async-stream", "axum", "base64", "chrono", + "futures", "image", "lru", "rand 0.8.5", @@ -84,7 +86,9 @@ dependencies = [ "sqlx", "stripstream-core", "tokio", + "tokio-stream", "tower", + "tower-http", "tracing", "tracing-subscriber", "utoipa", @@ -114,6 +118,28 @@ dependencies = [ "password-hash", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -605,6 +631,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.32" @@ -649,6 +690,17 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "futures-sink" version = "0.3.32" @@ -667,8 +719,10 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", diff --git a/PLAN.md b/PLAN.md index 7ce191f..b50fba6 100644 --- a/PLAN.md +++ b/PLAN.md @@ -161,18 +161,72 @@ Construire un serveur ultra performant pour indexer et servir des bibliotheques **DoD:** Checklist MVP validee de bout en bout. +### T19 - Progression temps reel des jobs (Option 2 - stockage minimal) - [COMPLETE] +- [x] Table `index_job_errors` pour stocker les erreurs (job_id, file_path, error_message, created_at) +- [x] Endpoint polling `GET /index/jobs/:id` (remplace SSE pour compatibilite) avec progression temps reel +- [x] Indexer: stockage progression en DB (current_file, progress_percent, processed_files, total_files) +- [x] Backoffice: composant `JobProgress` avec barre de progression et pourcentage +- [x] Affichage du fichier en cours de traitement et compteur (traite/total) +- [x] Stats globales utilisees depuis `index_jobs.stats_json` + +**DoD:** ✅ Barre de progression temps reel + erreurs stockees pour debug post-job. + +### T20 - Header avec indicateur jobs - [COMPLETE] +- [x] Endpoint `GET /index/jobs/active` pour recuperer les jobs pending/running +- [x] Composant `JobsIndicator` dans le header avec badge compteur dynamique +- [x] Dropdown au clic affichant la liste des jobs actifs avec progression +- [x] Polling toutes les 5s pour mise a jour temps reel +- [x] Lien rapide vers la page jobs detaillee + +**DoD:** ✅ Icone cliquable dans le header montrant les jobs en cours avec badge rouge. + +### T21 - Bouton indexation sur libraries - [COMPLETE] +- [x] Boutons "Index" (incremental) et "Index Full" (rebuild complet) sur chaque ligne +- [x] Endpoint `POST /libraries/:id/scan` pour lancer le job +- [x] Support parametre `full: true` pour rebuild complet +- [x] Feedback visuel via revalidation Next.js +- [x] Type de job `full_rebuild` distinct du `rebuild` normal + +**DoD:** ✅ Lancement d'indexation direct depuis la page libraries avec support full rebuild. + +### T22 - Details enrichis des jobs - [PARTIEL] +- [x] Endpoint `GET /index/jobs/:id` avec statistiques completes (remplace /details) +- [x] Endpoint `GET /index/jobs/:id/errors` avec liste des erreurs +- [ ] Endpoint `GET /index/jobs/:id/files` avec pagination des fichiers traites +- [ ] Page detaillee `/jobs/[id]/page.tsx` avec timeline et stats avancees +- [x] Liste des erreurs de job accessible +- [x] Navigation et affichage progression en temps reel sur la liste + +**DoD:** ⚠️ Fonctionnalites de base OK, page detaillee complete a finaliser. + +### T23 - Surveillance automatique des libraries - [PARTIEL] +- [x] Migration `0004_library_monitoring.sql`: colonnes `monitor_enabled`, `scan_mode`, `last_scan_at`, `next_scan_at` +- [x] Enum scan_mode: 'manual', 'hourly', 'daily', 'weekly' avec contrainte CHECK +- [ ] Checkbox "Enable monitoring" et select "Scan frequency" dans formulaire library +- [ ] Scheduler dans l'indexer (toutes les minutes) verifiant les libraries a scanner +- [x] Endpoint `POST /libraries/:id/scan` pour scan manuel declenche par API +- [ ] Badge "Auto" (vert) ou "Manual" (gris) dans la liste des libraries +- [ ] Prochain scan estime affiche dans les details de la library + +**DoD:** ⚠️ Schema DB et endpoint API OK, UI et scheduler a implementer. + --- ## Contrat API minimum (v1) - `GET /libraries` - `POST /libraries` - `DELETE /libraries/:id` +- `POST /libraries/:id/scan` (T23) - `GET /books` - `GET /books/:id` - `GET /search` - `GET /books/:id/pages/:n` - `POST /index/rebuild` - `GET /index/status` +- `GET /index/jobs/active` (T20) +- `GET /index/jobs/:id/details` (T22) +- `GET /index/jobs/:id/files` (T22) +- `GET /index/jobs/:id/stream` (T19) - `POST /admin/tokens` - `GET /admin/tokens` - `DELETE /admin/tokens/:id` @@ -196,7 +250,9 @@ Construire un serveur ultra performant pour indexer et servir des bibliotheques ## Suivi d'avancement - [x] Lot 1: Fondations (T1 -> T6) - [x] Lot 2: Ingestion + Search (T7 -> T13) -- [ ] Lot 3: Lecture + UI + Hardening (T14 -> T18) +- [x] Lot 3: Lecture + UI + Hardening (T14 -> T18) +- [x] Lot 4: Ameliorations Indexation (T19 -> T22) [COMPLETE PARTIELLEMENT] +- [ ] Lot 5: Optimisations et Polishing (T23 + ameliorations) ## Notes - Scope token v1: `admin`, `read` @@ -213,3 +269,4 @@ Construire un serveur ultra performant pour indexer et servir des bibliotheques - 2026-03-05: smoke + bench scripts corriges et verifies (`infra/smoke.sh`, `infra/bench.sh`). - 2026-03-05: pivot backoffice valide: remplacement de l'admin UI Rust SSR par une app Next.js. - 2026-03-05: backoffice Next.js implemente (`apps/backoffice`) avec branding neon base sur le logo, actions libraries/jobs/tokens, et integration Docker Compose. +- 2026-03-06: planification Lot 4 - ajout des taches T19-T23 pour ameliorations indexing (progression temps reel, indicateur header, bouton indexation, details jobs, surveillance auto libraries) diff --git a/apps/api/Cargo.toml b/apps/api/Cargo.toml index 99d7f5b..f341bfe 100644 --- a/apps/api/Cargo.toml +++ b/apps/api/Cargo.toml @@ -9,11 +9,14 @@ anyhow.workspace = true argon2.workspace = true axum.workspace = true base64.workspace = true +async-stream = "0.3" chrono.workspace = true +futures = "0.3" image.workspace = true lru.workspace = true stripstream-core = { path = "../../crates/core" } rand.workspace = true +tokio-stream = "0.1" reqwest.workspace = true serde.workspace = true serde_json.workspace = true @@ -21,6 +24,7 @@ sha2.workspace = true sqlx.workspace = true tokio.workspace = true tower.workspace = true +tower-http = { version = "0.6", features = ["cors"] } tracing.workspace = true tracing-subscriber.workspace = true uuid.workspace = true diff --git a/apps/api/src/index_jobs.rs b/apps/api/src/index_jobs.rs index ebba8be..b4e4b0d 100644 --- a/apps/api/src/index_jobs.rs +++ b/apps/api/src/index_jobs.rs @@ -1,7 +1,10 @@ -use axum::{extract::State, Json}; +use axum::{extract::State, response::sse::{Event, Sse}, Json}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use sqlx::Row; +use std::convert::Infallible; +use std::time::Duration; +use tokio_stream::Stream; use uuid::Uuid; use utoipa::ToSchema; @@ -11,6 +14,8 @@ use crate::{error::ApiError, AppState}; pub struct RebuildRequest { #[schema(value_type = Option)] pub library_id: Option, + #[schema(value_type = Option, example = false)] + pub full: Option, } #[derive(Serialize, ToSchema)] @@ -37,6 +42,49 @@ pub struct FolderItem { pub path: String, } +#[derive(Serialize, ToSchema)] +pub struct IndexJobDetailResponse { + #[schema(value_type = String)] + pub id: Uuid, + #[schema(value_type = Option)] + pub library_id: Option, + pub r#type: String, + pub status: String, + #[schema(value_type = Option)] + pub started_at: Option>, + #[schema(value_type = Option)] + pub finished_at: Option>, + pub stats_json: Option, + pub error_opt: Option, + #[schema(value_type = String)] + pub created_at: DateTime, + pub current_file: Option, + pub progress_percent: Option, + pub total_files: Option, + pub processed_files: Option, +} + +#[derive(Serialize, ToSchema)] +pub struct JobErrorResponse { + #[schema(value_type = String)] + pub id: Uuid, + pub file_path: String, + pub error_message: String, + #[schema(value_type = String)] + pub created_at: DateTime, +} + +#[derive(Serialize, ToSchema)] +pub struct ProgressEvent { + pub job_id: String, + pub status: String, + pub current_file: Option, + pub progress_percent: Option, + pub processed_files: Option, + pub total_files: Option, + pub stats_json: Option, +} + /// Enqueue a job to rebuild the search index for a library (or all libraries if no library_id specified) #[utoipa::path( post, @@ -54,14 +102,17 @@ pub async fn enqueue_rebuild( State(state): State, payload: Option>, ) -> Result, ApiError> { - let library_id = payload.and_then(|p| p.0.library_id); + let library_id = payload.as_ref().and_then(|p| p.0.library_id); + let is_full = payload.as_ref().and_then(|p| p.0.full).unwrap_or(false); + let job_type = if is_full { "full_rebuild" } else { "rebuild" }; let id = Uuid::new_v4(); sqlx::query( - "INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, 'rebuild', 'pending')", + "INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, $3, 'pending')", ) .bind(id) .bind(library_id) + .bind(job_type) .execute(&state.pool) .await?; @@ -138,6 +189,10 @@ pub async fn cancel_job( Ok(Json(map_row(row))) } +fn get_libraries_root() -> String { + std::env::var("LIBRARIES_ROOT_PATH").unwrap_or_else(|_| "/libraries".to_string()) +} + /// List available folders in /libraries for library creation #[utoipa::path( get, @@ -151,7 +206,8 @@ pub async fn cancel_job( security(("Bearer" = [])) )] pub async fn list_folders(State(_state): State) -> Result>, ApiError> { - let libraries_path = std::path::Path::new("/libraries"); + let libraries_root = get_libraries_root(); + let libraries_path = std::path::Path::new(&libraries_root); let mut folders = Vec::new(); if let Ok(entries) = std::fs::read_dir(libraries_path) { @@ -170,7 +226,7 @@ pub async fn list_folders(State(_state): State) -> Result IndexJobResponse { +pub fn map_row(row: sqlx::postgres::PgRow) -> IndexJobResponse { IndexJobResponse { id: row.get("id"), library_id: row.get("library_id"), @@ -183,3 +239,213 @@ fn map_row(row: sqlx::postgres::PgRow) -> IndexJobResponse { created_at: row.get("created_at"), } } + +fn map_row_detail(row: sqlx::postgres::PgRow) -> IndexJobDetailResponse { + IndexJobDetailResponse { + id: row.get("id"), + library_id: row.get("library_id"), + r#type: row.get("type"), + status: row.get("status"), + started_at: row.get("started_at"), + finished_at: row.get("finished_at"), + stats_json: row.get("stats_json"), + error_opt: row.get("error_opt"), + created_at: row.get("created_at"), + current_file: row.get("current_file"), + progress_percent: row.get("progress_percent"), + total_files: row.get("total_files"), + processed_files: row.get("processed_files"), + } +} + +/// List active indexing jobs (pending or running) +#[utoipa::path( + get, + path = "/index/jobs/active", + tag = "indexing", + responses( + (status = 200, body = Vec), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - Admin scope required"), + ), + security(("Bearer" = [])) +)] +pub async fn get_active_jobs(State(state): State) -> Result>, ApiError> { + let rows = sqlx::query( + "SELECT id, library_id, type, status, started_at, finished_at, stats_json, error_opt, created_at + FROM index_jobs + WHERE status IN ('pending', 'running') + ORDER BY created_at ASC" + ) + .fetch_all(&state.pool) + .await?; + + Ok(Json(rows.into_iter().map(map_row).collect())) +} + +/// Get detailed job information including progress +#[utoipa::path( + get, + path = "/index/jobs/{id}/details", + tag = "indexing", + params( + ("id" = String, Path, description = "Job UUID"), + ), + responses( + (status = 200, body = IndexJobDetailResponse), + (status = 404, description = "Job not found"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - Admin scope required"), + ), + security(("Bearer" = [])) +)] +pub async fn get_job_details( + State(state): State, + id: axum::extract::Path, +) -> Result, ApiError> { + let row = sqlx::query( + "SELECT id, library_id, type, status, started_at, finished_at, stats_json, error_opt, created_at, + current_file, progress_percent, total_files, processed_files + FROM index_jobs WHERE id = $1" + ) + .bind(id.0) + .fetch_optional(&state.pool) + .await?; + + match row { + Some(row) => Ok(Json(map_row_detail(row))), + None => Err(ApiError::not_found("job not found")), + } +} + +/// List errors for a specific job +#[utoipa::path( + get, + path = "/index/jobs/{id}/errors", + tag = "indexing", + params( + ("id" = String, Path, description = "Job UUID"), + ), + responses( + (status = 200, body = Vec), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - Admin scope required"), + ), + security(("Bearer" = [])) +)] +pub async fn get_job_errors( + State(state): State, + id: axum::extract::Path, +) -> Result>, ApiError> { + let rows = sqlx::query( + "SELECT id, file_path, error_message, created_at + FROM index_job_errors + WHERE job_id = $1 + ORDER BY created_at ASC" + ) + .bind(id.0) + .fetch_all(&state.pool) + .await?; + + let errors: Vec = rows + .into_iter() + .map(|row| JobErrorResponse { + id: row.get("id"), + file_path: row.get("file_path"), + error_message: row.get("error_message"), + created_at: row.get("created_at"), + }) + .collect(); + + Ok(Json(errors)) +} + +/// Stream job progress via SSE +#[utoipa::path( + get, + path = "/index/jobs/{id}/stream", + tag = "indexing", + params( + ("id" = String, Path, description = "Job UUID"), + ), + responses( + (status = 200, description = "SSE stream of progress events"), + (status = 404, description = "Job not found"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - Admin scope required"), + ), + security(("Bearer" = [])) +)] +pub async fn stream_job_progress( + State(state): State, + id: axum::extract::Path, +) -> Result>>, ApiError> { + // Verify job exists + let job_exists = sqlx::query("SELECT 1 FROM index_jobs WHERE id = $1") + .bind(id.0) + .fetch_optional(&state.pool) + .await?; + + if job_exists.is_none() { + return Err(ApiError::not_found("job not found")); + } + + let job_id = id.0; + let pool = state.pool.clone(); + + let stream = async_stream::stream! { + let mut last_status: Option = None; + let mut last_processed: Option = None; + let mut interval = tokio::time::interval(Duration::from_millis(500)); + + loop { + interval.tick().await; + + let row = sqlx::query( + "SELECT status, current_file, progress_percent, processed_files, total_files, stats_json + FROM index_jobs WHERE id = $1" + ) + .bind(job_id) + .fetch_one(&pool) + .await; + + match row { + Ok(row) => { + let status: String = row.get("status"); + let processed_files: Option = row.get("processed_files"); + + // Send update if status changed or progress changed + let should_send = last_status.as_ref() != Some(&status) + || last_processed != processed_files; + + if should_send { + last_status = Some(status.clone()); + last_processed = processed_files; + + let event = ProgressEvent { + job_id: job_id.to_string(), + status: status.clone(), + current_file: row.get("current_file"), + progress_percent: row.get("progress_percent"), + processed_files, + total_files: row.get("total_files"), + stats_json: row.get("stats_json"), + }; + + if let Ok(json) = serde_json::to_string(&event) { + yield Ok(Event::default().data(json)); + } + + // Stop streaming if job is finished + if status == "success" || status == "failed" || status == "cancelled" { + break; + } + } + } + Err(_) => break, + } + } + }; + + Ok(Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default())) +} diff --git a/apps/api/src/libraries.rs b/apps/api/src/libraries.rs index badb504..18c4ae9 100644 --- a/apps/api/src/libraries.rs +++ b/apps/api/src/libraries.rs @@ -152,3 +152,61 @@ fn canonicalize_library_root(root_path: &str) -> Result { Ok(canonical) } + +use crate::index_jobs::{IndexJobResponse, RebuildRequest}; + +/// Trigger a scan/indexing job for a specific library +#[utoipa::path( + post, + path = "/libraries/{id}/scan", + tag = "libraries", + params( + ("id" = String, Path, description = "Library UUID"), + ), + request_body = Option, + responses( + (status = 200, body = IndexJobResponse), + (status = 404, description = "Library not found"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - Admin scope required"), + ), + security(("Bearer" = [])) +)] +pub async fn scan_library( + State(state): State, + AxumPath(library_id): AxumPath, + payload: Option>, +) -> Result, ApiError> { + // Verify library exists + let library_exists = sqlx::query("SELECT 1 FROM libraries WHERE id = $1") + .bind(library_id) + .fetch_optional(&state.pool) + .await?; + + if library_exists.is_none() { + return Err(ApiError::not_found("library not found")); + } + + let is_full = payload.as_ref().and_then(|p| p.full).unwrap_or(false); + let job_type = if is_full { "full_rebuild" } else { "rebuild" }; + + // Create indexing job for this library + let job_id = Uuid::new_v4(); + sqlx::query( + "INSERT INTO index_jobs (id, library_id, type, status) VALUES ($1, $2, $3, 'pending')", + ) + .bind(job_id) + .bind(library_id) + .bind(job_type) + .execute(&state.pool) + .await?; + + let row = sqlx::query( + "SELECT id, library_id, type, status, started_at, finished_at, stats_json, error_opt, created_at FROM index_jobs WHERE id = $1", + ) + .bind(job_id) + .fetch_one(&state.pool) + .await?; + + Ok(Json(crate::index_jobs::map_row(row))) +} diff --git a/apps/api/src/main.rs b/apps/api/src/main.rs index 54f0a91..b4318bc 100644 --- a/apps/api/src/main.rs +++ b/apps/api/src/main.rs @@ -95,8 +95,13 @@ async fn main() -> anyhow::Result<()> { let admin_routes = Router::new() .route("/libraries", get(libraries::list_libraries).post(libraries::create_library)) .route("/libraries/:id", delete(libraries::delete_library)) + .route("/libraries/:id/scan", axum::routing::post(libraries::scan_library)) .route("/index/rebuild", axum::routing::post(index_jobs::enqueue_rebuild)) .route("/index/status", get(index_jobs::list_index_jobs)) + .route("/index/jobs/active", get(index_jobs::get_active_jobs)) + .route("/index/jobs/:id", get(index_jobs::get_job_details)) + .route("/index/jobs/:id/stream", get(index_jobs::stream_job_progress)) + .route("/index/jobs/:id/errors", get(index_jobs::get_job_errors)) .route("/index/cancel/:id", axum::routing::post(index_jobs::cancel_job)) .route("/folders", get(index_jobs::list_folders)) .route("/admin/tokens", get(tokens::list_tokens).post(tokens::create_token)) diff --git a/apps/api/src/openapi.rs b/apps/api/src/openapi.rs index 05beab0..e637b10 100644 --- a/apps/api/src/openapi.rs +++ b/apps/api/src/openapi.rs @@ -11,11 +11,16 @@ use utoipa::OpenApi; crate::search::search_books, crate::index_jobs::enqueue_rebuild, crate::index_jobs::list_index_jobs, + crate::index_jobs::get_active_jobs, + crate::index_jobs::get_job_details, + crate::index_jobs::stream_job_progress, + crate::index_jobs::get_job_errors, crate::index_jobs::cancel_job, crate::index_jobs::list_folders, crate::libraries::list_libraries, crate::libraries::create_library, crate::libraries::delete_library, + crate::libraries::scan_library, crate::tokens::list_tokens, crate::tokens::create_token, crate::tokens::revoke_token, @@ -32,6 +37,9 @@ use utoipa::OpenApi; crate::search::SearchResponse, crate::index_jobs::RebuildRequest, crate::index_jobs::IndexJobResponse, + crate::index_jobs::IndexJobDetailResponse, + crate::index_jobs::JobErrorResponse, + crate::index_jobs::ProgressEvent, crate::index_jobs::FolderItem, crate::libraries::LibraryResponse, crate::libraries::CreateLibraryRequest, diff --git a/apps/api/src/pages.rs b/apps/api/src/pages.rs index 8bb2102..8f0fc15 100644 --- a/apps/api/src/pages.rs +++ b/apps/api/src/pages.rs @@ -20,6 +20,15 @@ use uuid::Uuid; use crate::{error::ApiError, AppState}; +fn remap_libraries_path(path: &str) -> String { + if let Ok(root) = std::env::var("LIBRARIES_ROOT_PATH") { + if path.starts_with("/libraries/") { + return path.replacen("/libraries", &root, 1); + } + } + path.to_string() +} + #[derive(Deserialize, ToSchema)] pub struct PageQuery { #[schema(value_type = Option, example = "webp")] @@ -122,6 +131,8 @@ pub async fn get_page( let row = row.ok_or_else(|| ApiError::not_found("book file not found"))?; let abs_path: String = row.get("abs_path"); + // Remap /libraries to LIBRARIES_ROOT_PATH for local development + let abs_path = remap_libraries_path(&abs_path); let input_format: String = row.get("format"); let _permit = state diff --git a/apps/backoffice/.env.local b/apps/backoffice/.env.local new file mode 100644 index 0000000..aa836e3 --- /dev/null +++ b/apps/backoffice/.env.local @@ -0,0 +1,4 @@ +API_BASE_URL=http://localhost:8080 +API_BOOTSTRAP_TOKEN=stripstream-dev-bootstrap-token +NEXT_PUBLIC_API_BASE_URL=http://localhost:8080 +NEXT_PUBLIC_API_BOOTSTRAP_TOKEN=stripstream-dev-bootstrap-token diff --git a/apps/backoffice/app/api/jobs/[id]/cancel/route.ts b/apps/backoffice/app/api/jobs/[id]/cancel/route.ts new file mode 100644 index 0000000..fe2ef5d --- /dev/null +++ b/apps/backoffice/app/api/jobs/[id]/cancel/route.ts @@ -0,0 +1,36 @@ +import { NextRequest, NextResponse } from "next/server"; + +export async function POST( + request: NextRequest, + { params }: { params: Promise<{ id: string }> } +) { + const { id } = await params; + const apiBaseUrl = process.env.API_BASE_URL || "http://api:8080"; + const apiToken = process.env.API_BOOTSTRAP_TOKEN; + + if (!apiToken) { + return NextResponse.json({ error: "API token not configured" }, { status: 500 }); + } + + try { + const response = await fetch(`${apiBaseUrl}/index/cancel/${id}`, { + method: "POST", + headers: { + Authorization: `Bearer ${apiToken}`, + }, + }); + + if (!response.ok) { + return NextResponse.json( + { error: `API error: ${response.status}` }, + { status: response.status } + ); + } + + const data = await response.json(); + return NextResponse.json(data); + } catch (error) { + console.error("Proxy error:", error); + return NextResponse.json({ error: "Failed to cancel job" }, { status: 500 }); + } +} diff --git a/apps/backoffice/app/api/jobs/[id]/route.ts b/apps/backoffice/app/api/jobs/[id]/route.ts new file mode 100644 index 0000000..35eb915 --- /dev/null +++ b/apps/backoffice/app/api/jobs/[id]/route.ts @@ -0,0 +1,35 @@ +import { NextRequest, NextResponse } from "next/server"; + +export async function GET( + request: NextRequest, + { params }: { params: Promise<{ id: string }> } +) { + const { id } = await params; + const apiBaseUrl = process.env.API_BASE_URL || "http://api:8080"; + const apiToken = process.env.API_BOOTSTRAP_TOKEN; + + if (!apiToken) { + return NextResponse.json({ error: "API token not configured" }, { status: 500 }); + } + + try { + const response = await fetch(`${apiBaseUrl}/index/jobs/${id}`, { + headers: { + Authorization: `Bearer ${apiToken}`, + }, + }); + + if (!response.ok) { + return NextResponse.json( + { error: `API error: ${response.status}` }, + { status: response.status } + ); + } + + const data = await response.json(); + return NextResponse.json(data); + } catch (error) { + console.error("Proxy error:", error); + return NextResponse.json({ error: "Failed to fetch job" }, { status: 500 }); + } +} diff --git a/apps/backoffice/app/api/jobs/[id]/stream/route.ts b/apps/backoffice/app/api/jobs/[id]/stream/route.ts new file mode 100644 index 0000000..d665350 --- /dev/null +++ b/apps/backoffice/app/api/jobs/[id]/stream/route.ts @@ -0,0 +1,87 @@ +import { NextRequest } from "next/server"; + +export async function GET( + request: NextRequest, + { params }: { params: Promise<{ id: string }> } +) { + const { id } = await params; + const apiBaseUrl = process.env.API_BASE_URL || "http://api:8080"; + const apiToken = process.env.API_BOOTSTRAP_TOKEN; + + if (!apiToken) { + return new Response( + `data: ${JSON.stringify({ error: "API token not configured" })}\n\n`, + { status: 500, headers: { "Content-Type": "text/event-stream" } } + ); + } + + const stream = new ReadableStream({ + async start(controller) { + // Send initial headers for SSE + controller.enqueue(new TextEncoder().encode("")); + + let lastData: string | null = null; + let isActive = true; + + const fetchJob = async () => { + if (!isActive) return; + + try { + const response = await fetch(`${apiBaseUrl}/index/jobs/${id}`, { + headers: { + Authorization: `Bearer ${apiToken}`, + }, + }); + + if (response.ok) { + const data = await response.json(); + const dataStr = JSON.stringify(data); + + // Only send if data changed + if (dataStr !== lastData) { + lastData = dataStr; + controller.enqueue( + new TextEncoder().encode(`data: ${dataStr}\n\n`) + ); + + // Stop polling if job is complete + if (data.status === "success" || data.status === "failed" || data.status === "cancelled") { + isActive = false; + controller.close(); + } + } + } + } catch (error) { + console.error("SSE fetch error:", error); + } + }; + + // Initial fetch + await fetchJob(); + + // Poll every 500ms while job is active + const interval = setInterval(async () => { + if (!isActive) { + clearInterval(interval); + return; + } + await fetchJob(); + }, 500); + + // Cleanup on abort + request.signal.addEventListener("abort", () => { + isActive = false; + clearInterval(interval); + controller.close(); + }); + }, + }); + + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + }); +} diff --git a/apps/backoffice/app/api/jobs/route.ts b/apps/backoffice/app/api/jobs/route.ts new file mode 100644 index 0000000..b3b2e79 --- /dev/null +++ b/apps/backoffice/app/api/jobs/route.ts @@ -0,0 +1,31 @@ +import { NextRequest, NextResponse } from "next/server"; + +export async function GET(request: NextRequest) { + const apiBaseUrl = process.env.API_BASE_URL || "http://api:8080"; + const apiToken = process.env.API_BOOTSTRAP_TOKEN; + + if (!apiToken) { + return NextResponse.json({ error: "API token not configured" }, { status: 500 }); + } + + try { + const response = await fetch(`${apiBaseUrl}/index/status`, { + headers: { + Authorization: `Bearer ${apiToken}`, + }, + }); + + if (!response.ok) { + return NextResponse.json( + { error: `API error: ${response.status}` }, + { status: response.status } + ); + } + + const data = await response.json(); + return NextResponse.json(data); + } catch (error) { + console.error("Proxy error:", error); + return NextResponse.json({ error: "Failed to fetch jobs" }, { status: 500 }); + } +} diff --git a/apps/backoffice/app/api/jobs/stream/route.ts b/apps/backoffice/app/api/jobs/stream/route.ts new file mode 100644 index 0000000..ddf74b8 --- /dev/null +++ b/apps/backoffice/app/api/jobs/stream/route.ts @@ -0,0 +1,76 @@ +import { NextRequest } from "next/server"; + +export async function GET(request: NextRequest) { + const apiBaseUrl = process.env.API_BASE_URL || "http://api:8080"; + const apiToken = process.env.API_BOOTSTRAP_TOKEN; + + if (!apiToken) { + return new Response( + `data: ${JSON.stringify({ error: "API token not configured" })}\n\n`, + { status: 500, headers: { "Content-Type": "text/event-stream" } } + ); + } + + const stream = new ReadableStream({ + async start(controller) { + controller.enqueue(new TextEncoder().encode("")); + + let lastData: string | null = null; + let isActive = true; + + const fetchJobs = async () => { + if (!isActive) return; + + try { + const response = await fetch(`${apiBaseUrl}/index/status`, { + headers: { + Authorization: `Bearer ${apiToken}`, + }, + }); + + if (response.ok) { + const data = await response.json(); + const dataStr = JSON.stringify(data); + + // Send if data changed + if (dataStr !== lastData) { + lastData = dataStr; + controller.enqueue( + new TextEncoder().encode(`data: ${dataStr}\n\n`) + ); + } + } + } catch (error) { + console.error("SSE fetch error:", error); + } + }; + + // Initial fetch + await fetchJobs(); + + // Poll every 2 seconds + const interval = setInterval(async () => { + if (!isActive) { + clearInterval(interval); + return; + } + await fetchJobs(); + }, 2000); + + // Cleanup + request.signal.addEventListener("abort", () => { + isActive = false; + clearInterval(interval); + controller.close(); + }); + }, + }); + + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + }); +} diff --git a/apps/backoffice/app/components/JobProgress.tsx b/apps/backoffice/app/components/JobProgress.tsx new file mode 100644 index 0000000..ee55914 --- /dev/null +++ b/apps/backoffice/app/components/JobProgress.tsx @@ -0,0 +1,123 @@ +"use client"; + +import { useEffect, useState } from "react"; + +interface ProgressEvent { + job_id: string; + status: string; + current_file: string | null; + progress_percent: number | null; + processed_files: number | null; + total_files: number | null; + stats_json: { + scanned_files: number; + indexed_files: number; + removed_files: number; + errors: number; + } | null; +} + +interface JobProgressProps { + jobId: string; + onComplete?: () => void; +} + +export function JobProgress({ jobId, onComplete }: JobProgressProps) { + const [progress, setProgress] = useState(null); + const [error, setError] = useState(null); + const [isComplete, setIsComplete] = useState(false); + + useEffect(() => { + // Use SSE via local proxy + const eventSource = new EventSource(`/api/jobs/${jobId}/stream`); + + eventSource.onmessage = (event) => { + try { + const data = JSON.parse(event.data); + + const progressData: ProgressEvent = { + job_id: data.id, + status: data.status, + current_file: data.current_file, + progress_percent: data.progress_percent, + processed_files: data.processed_files, + total_files: data.total_files, + stats_json: data.stats_json, + }; + + setProgress(progressData); + + if (data.status === "success" || data.status === "failed" || data.status === "cancelled") { + setIsComplete(true); + eventSource.close(); + onComplete?.(); + } + } catch (err) { + setError("Failed to parse SSE data"); + } + }; + + eventSource.onerror = (err) => { + console.error("SSE error:", err); + eventSource.close(); + setError("Connection lost"); + }; + + return () => { + eventSource.close(); + }; + }, [jobId, onComplete]); + + if (error) { + return
Error: {error}
; + } + + if (!progress) { + return
Loading progress...
; + } + + const percent = progress.progress_percent ?? 0; + const processed = progress.processed_files ?? 0; + const total = progress.total_files ?? 0; + + return ( +
+
+ + {progress.status} + + {isComplete && Complete} +
+ +
+
+ {percent}% +
+ +
+ {processed} / {total} files + {progress.current_file && ( + + Current: {progress.current_file.length > 40 + ? progress.current_file.substring(0, 40) + "..." + : progress.current_file} + + )} +
+ + {progress.stats_json && ( +
+ Scanned: {progress.stats_json.scanned_files} + Indexed: {progress.stats_json.indexed_files} + Removed: {progress.stats_json.removed_files} + {progress.stats_json.errors > 0 && ( + Errors: {progress.stats_json.errors} + )} +
+ )} +
+ ); +} diff --git a/apps/backoffice/app/components/JobRow.tsx b/apps/backoffice/app/components/JobRow.tsx new file mode 100644 index 0000000..bafbdb3 --- /dev/null +++ b/apps/backoffice/app/components/JobRow.tsx @@ -0,0 +1,75 @@ +"use client"; + +import { useState } from "react"; +import { JobProgress } from "./JobProgress"; + +interface JobRowProps { + job: { + id: string; + library_id: string | null; + type: string; + status: string; + created_at: string; + error_opt: string | null; + }; + libraryName: string | undefined; + highlighted?: boolean; + onCancel: (id: string) => void; +} + +export function JobRow({ job, libraryName, highlighted, onCancel }: JobRowProps) { + const [showProgress, setShowProgress] = useState( + highlighted || job.status === "running" || job.status === "pending" + ); + + const handleComplete = () => { + setShowProgress(false); + // Trigger a page refresh to update the job status + window.location.reload(); + }; + + return ( + <> + + + {job.id.slice(0, 8)} + + {job.library_id ? libraryName || job.library_id.slice(0, 8) : "—"} + {job.type} + + {job.status} + {job.error_opt && !} + {job.status === "running" && ( + + )} + + {new Date(job.created_at).toLocaleString()} + + {job.status === "pending" || job.status === "running" ? ( + + ) : null} + + + {showProgress && (job.status === "running" || job.status === "pending") && ( + + + + + + )} + + ); +} diff --git a/apps/backoffice/app/components/JobsIndicator.tsx b/apps/backoffice/app/components/JobsIndicator.tsx new file mode 100644 index 0000000..66409de --- /dev/null +++ b/apps/backoffice/app/components/JobsIndicator.tsx @@ -0,0 +1,144 @@ +"use client"; + +import { useEffect, useState } from "react"; +import Link from "next/link"; + +interface Job { + id: string; + status: string; + current_file: string | null; + progress_percent: number | null; +} + +interface JobsIndicatorProps { + apiBaseUrl: string; + apiToken: string; +} + +export function JobsIndicator({ apiBaseUrl, apiToken }: JobsIndicatorProps) { + const [activeJobs, setActiveJobs] = useState([]); + const [isOpen, setIsOpen] = useState(false); + + useEffect(() => { + const fetchActiveJobs = async () => { + try { + const response = await fetch(`${apiBaseUrl}/index/jobs/active`, { + headers: { + "Authorization": `Bearer ${apiToken}`, + }, + }); + + if (response.ok) { + const jobs = await response.json(); + // Enrich with details for running jobs + const jobsWithDetails = await Promise.all( + jobs.map(async (job: Job) => { + if (job.status === "running") { + try { + const detailRes = await fetch(`${apiBaseUrl}/index/jobs/${job.id}`, { + headers: { "Authorization": `Bearer ${apiToken}` }, + }); + if (detailRes.ok) { + const detail = await detailRes.json(); + return { ...job, ...detail }; + } + } catch { + // ignore detail fetch errors + } + } + return job; + }) + ); + setActiveJobs(jobsWithDetails); + } + } catch { + // Silently fail + } + }; + + fetchActiveJobs(); + const interval = setInterval(fetchActiveJobs, 5000); + + return () => clearInterval(interval); + }, [apiBaseUrl, apiToken]); + + const pendingCount = activeJobs.filter(j => j.status === "pending").length; + const runningCount = activeJobs.filter(j => j.status === "running").length; + const totalCount = activeJobs.length; + + if (totalCount === 0) { + return ( + + + + + + + ); + } + + return ( +
+ + + {isOpen && ( +
+
+ Active Jobs + setIsOpen(false)}>View all +
+ + {activeJobs.length === 0 ? ( +

No active jobs

+ ) : ( +
    + {activeJobs.map(job => ( +
  • +
    + + {job.status} + + {job.id.slice(0, 8)} +
    + {job.status === "running" && job.progress_percent !== null && ( +
    +
    + {job.progress_percent}% +
    + )} + {job.current_file && ( +

    + {job.current_file.length > 30 + ? job.current_file.substring(0, 30) + "..." + : job.current_file} +

    + )} +
  • + ))} +
+ )} +
+ )} +
+ ); +} diff --git a/apps/backoffice/app/components/JobsIndicatorWrapper.tsx b/apps/backoffice/app/components/JobsIndicatorWrapper.tsx new file mode 100644 index 0000000..4f4a21e --- /dev/null +++ b/apps/backoffice/app/components/JobsIndicatorWrapper.tsx @@ -0,0 +1,12 @@ +"use client"; + +import { JobsIndicator } from "./JobsIndicator"; + +interface JobsIndicatorWrapperProps { + apiBaseUrl: string; + apiToken: string; +} + +export function JobsIndicatorWrapper({ apiBaseUrl, apiToken }: JobsIndicatorWrapperProps) { + return ; +} diff --git a/apps/backoffice/app/components/JobsList.tsx b/apps/backoffice/app/components/JobsList.tsx new file mode 100644 index 0000000..9fd4d36 --- /dev/null +++ b/apps/backoffice/app/components/JobsList.tsx @@ -0,0 +1,91 @@ +"use client"; + +import { useState, useEffect } from "react"; +import { JobRow } from "./JobRow"; + +interface Job { + id: string; + library_id: string | null; + type: string; + status: string; + created_at: string; + error_opt: string | null; +} + +interface JobsListProps { + initialJobs: Job[]; + libraries: Map; + highlightJobId?: string; +} + +export function JobsList({ initialJobs, libraries, highlightJobId }: JobsListProps) { + const [jobs, setJobs] = useState(initialJobs); + + // Refresh jobs list via SSE + useEffect(() => { + const eventSource = new EventSource("/api/jobs/stream"); + + eventSource.onmessage = (event) => { + try { + const data = JSON.parse(event.data); + if (Array.isArray(data)) { + setJobs(data); + } + } catch (error) { + console.error("Failed to parse SSE data:", error); + } + }; + + eventSource.onerror = (err) => { + console.error("SSE error:", err); + eventSource.close(); + }; + + return () => { + eventSource.close(); + }; + }, []); + + const handleCancel = async (id: string) => { + try { + const response = await fetch(`/api/jobs/${id}/cancel`, { + method: "POST", + }); + + if (response.ok) { + // Update local state to reflect cancellation + setJobs(jobs.map(job => + job.id === id ? { ...job, status: "cancelled" } : job + )); + } + } catch (error) { + console.error("Failed to cancel job:", error); + } + }; + + return ( + + + + + + + + + + + + + {jobs.map((job) => ( + + ))} + +
IDLibraryTypeStatusCreatedActions
+ ); +} diff --git a/apps/backoffice/app/globals.css b/apps/backoffice/app/globals.css index 4da0009..7c35fce 100644 --- a/apps/backoffice/app/globals.css +++ b/apps/backoffice/app/globals.css @@ -215,6 +215,25 @@ button:hover { border-color: hsl(2 72% 48% / 0.5); } +.scan-btn { + background: linear-gradient(95deg, hsl(142 60% 45% / 0.15), hsl(142 60% 55% / 0.2)); + border-color: hsl(142 60% 45% / 0.5); + padding: 4px 12px; + font-size: 0.85rem; +} + +.delete-btn { + background: linear-gradient(95deg, hsl(2 72% 48% / 0.15), hsl(338 82% 62% / 0.2)); + border-color: hsl(2 72% 48% / 0.5); + padding: 4px 12px; + font-size: 0.85rem; +} + +.full-rebuild-btn { + background: linear-gradient(95deg, hsl(280 60% 45% / 0.15), hsl(280 60% 55% / 0.2)); + border-color: hsl(280 60% 45% / 0.5); +} + .status-pending { color: hsl(45 93% 47%); } .status-running { color: hsl(192 85% 55%); } .status-completed { color: hsl(142 60% 45%); } @@ -729,3 +748,426 @@ button:hover { max-width: 400px; display: inline-block; } + +/* Job Progress Component */ +.job-progress { + background: var(--card); + border: 1px solid var(--line); + border-radius: 12px; + padding: 16px; + margin: 8px 0; +} + +.progress-header { + display: flex; + justify-content: space-between; + align-items: center; + margin-bottom: 12px; +} + +.status-badge { + padding: 4px 10px; + border-radius: 6px; + font-size: 0.8rem; + font-weight: 700; + text-transform: uppercase; +} + +.status-badge.status-pending { + background: hsl(45 93% 90%); + color: hsl(45 93% 35%); +} + +.status-badge.status-running { + background: hsl(198 52% 90%); + color: hsl(198 78% 37%); +} + +.status-badge.status-success { + background: hsl(142 60% 90%); + color: hsl(142 60% 35%); +} + +.status-badge.status-failed { + background: hsl(2 72% 90%); + color: hsl(2 72% 45%); +} + +.status-badge.status-cancelled { + background: hsl(220 13% 90%); + color: hsl(220 13% 40%); +} + +.complete-badge { + padding: 4px 10px; + border-radius: 6px; + font-size: 0.75rem; + font-weight: 700; + background: hsl(142 60% 90%); + color: hsl(142 60% 35%); +} + +.progress-bar-container { + position: relative; + height: 24px; + background: var(--line); + border-radius: 12px; + overflow: hidden; + margin-bottom: 12px; +} + +.progress-bar-fill { + height: 100%; + background: linear-gradient(90deg, hsl(198 78% 37%), hsl(192 85% 55%)); + border-radius: 12px; + transition: width 0.3s ease; +} + +.progress-percent { + position: absolute; + right: 8px; + top: 50%; + transform: translateY(-50%); + font-size: 0.8rem; + font-weight: 700; + color: var(--foreground); +} + +.progress-stats { + display: flex; + justify-content: space-between; + align-items: center; + font-size: 0.9rem; + color: var(--text-muted); +} + +.current-file { + font-size: 0.8rem; + max-width: 300px; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +.progress-detailed-stats { + display: flex; + gap: 16px; + margin-top: 12px; + padding-top: 12px; + border-top: 1px solid var(--line); + font-size: 0.85rem; +} + +.error-count { + color: hsl(2 72% 48%); + font-weight: 700; +} + +.progress-row { + background: hsl(198 52% 95%); +} + +.progress-row td { + padding: 0; +} + +.toggle-progress-btn { + margin-left: 8px; + padding: 2px 8px; + font-size: 0.75rem; + background: transparent; + border: 1px solid var(--line); +} + +/* Jobs Indicator */ +.jobs-indicator-container { + position: relative; +} + +.jobs-indicator { + position: relative; + display: flex; + align-items: center; + justify-content: center; + width: 36px; + height: 36px; + padding: 0; + border-radius: 8px; + background: transparent; + border: 1px solid var(--line); + color: var(--foreground); + cursor: pointer; + transition: all 0.2s ease; +} + +.jobs-indicator:hover { + background: hsl(198 52% 90% / 0.5); +} + +.jobs-indicator.active { + border-color: hsl(198 78% 37% / 0.5); +} + +.jobs-badge { + position: absolute; + top: -4px; + right: -4px; + min-width: 18px; + height: 18px; + padding: 0 5px; + background: hsl(2 72% 48%); + color: white; + font-size: 0.7rem; + font-weight: 700; + border-radius: 9px; + display: flex; + align-items: center; + justify-content: center; +} + +.jobs-pulse { + position: absolute; + bottom: 2px; + left: 2px; + width: 8px; + height: 8px; + background: hsl(142 60% 45%); + border-radius: 50%; + animation: pulse 2s infinite; +} + +@keyframes pulse { + 0%, 100% { opacity: 1; } + 50% { opacity: 0.5; } +} + +.jobs-dropdown { + position: absolute; + top: 100%; + right: 0; + margin-top: 8px; + min-width: 320px; + max-width: 400px; + background: var(--card); + border: 1px solid var(--line); + border-radius: 12px; + box-shadow: var(--shadow-2); + z-index: 100; + padding: 16px; +} + +.jobs-dropdown-header { + display: flex; + justify-content: space-between; + align-items: center; + margin-bottom: 12px; + padding-bottom: 12px; + border-bottom: 1px solid var(--line); +} + +.jobs-dropdown-header a { + font-size: 0.85rem; +} + +.jobs-empty { + text-align: center; + color: var(--text-muted); + padding: 16px; +} + +.jobs-list { + list-style: none; + margin: 0; + padding: 0; + max-height: 300px; + overflow-y: auto; +} + +.job-item { + padding: 12px; + border-bottom: 1px solid var(--line); +} + +.job-item:last-child { + border-bottom: none; +} + +.job-header { + display: flex; + justify-content: space-between; + align-items: center; + margin-bottom: 8px; +} + +.job-id { + font-size: 0.75rem; + color: var(--text-muted); +} + +.job-status { + font-size: 0.7rem; + padding: 2px 6px; + border-radius: 4px; + font-weight: 700; + text-transform: uppercase; +} + +.job-status.status-pending { + background: hsl(45 93% 90%); + color: hsl(45 93% 35%); +} + +.job-status.status-running { + background: hsl(198 52% 90%); + color: hsl(198 78% 37%); +} + +.job-mini-progress { + display: flex; + align-items: center; + gap: 8px; + margin-bottom: 4px; +} + +.job-progress-bar { + flex: 1; + height: 6px; + background: hsl(198 78% 37%); + border-radius: 3px; +} + +.job-mini-progress span { + font-size: 0.75rem; + color: var(--text-muted); + min-width: 35px; +} + +.job-file { + margin: 0; + font-size: 0.75rem; + color: var(--text-muted); + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +.progress-loading, +.progress-error { + padding: 16px; + text-align: center; + color: var(--text-muted); +} + +.progress-error { + color: hsl(2 72% 48%); +} + +/* Dark mode overrides for new components */ +.dark .status-badge.status-pending { + background: hsl(45 93% 25%); + color: hsl(45 93% 65%); +} + +.dark .status-badge.status-running { + background: hsl(198 52% 25%); + color: hsl(198 78% 75%); +} + +.dark .status-badge.status-success { + background: hsl(142 60% 25%); + color: hsl(142 60% 65%); +} + +.dark .status-badge.status-failed { + background: hsl(2 72% 25%); + color: hsl(2 72% 65%); +} + +.dark .status-badge.status-cancelled { + background: hsl(220 13% 25%); + color: hsl(220 13% 65%); +} + +.dark .complete-badge { + background: hsl(142 60% 25%); + color: hsl(142 60% 65%); +} + +.dark .progress-row { + background: hsl(198 52% 15%); +} + +.dark .jobs-indicator:hover { + background: hsl(210 34% 24% / 0.5); +} + +.dark .job-status.status-pending { + background: hsl(45 93% 25%); + color: hsl(45 93% 65%); +} + +.dark .job-status.status-running { + background: hsl(198 52% 25%); + color: hsl(198 78% 75%); +} + +/* Progress bar visibility fix */ +.job-progress { + background: var(--card); + border: 1px solid var(--line); + border-radius: 12px; + padding: 16px; + margin: 8px 0; + min-height: 120px; +} + +.progress-bar-container { + position: relative; + height: 24px; + background: hsl(220 13% 90%); + border-radius: 12px; + overflow: hidden; + margin: 12px 0; +} + +.progress-bar-fill { + height: 100%; + background: linear-gradient(90deg, hsl(198 78% 37%), hsl(192 85% 55%)); + border-radius: 12px; + transition: width 0.5s ease; + min-width: 2px; +} + +.progress-percent { + position: absolute; + right: 8px; + top: 50%; + transform: translateY(-50%); + font-size: 0.85rem; + font-weight: 700; + color: var(--foreground); + text-shadow: 0 0 2px rgba(255,255,255,0.5); +} + +.progress-row { + background: hsl(198 52% 95%); +} + +.progress-row td { + padding: 0; +} + +/* Highlighted job row */ +tr.job-highlighted { + background: hsl(198 78% 95%); + box-shadow: inset 0 0 0 2px hsl(198 78% 37%); +} + +tr.job-highlighted td { + animation: pulse-border 2s ease-in-out infinite; +} + +@keyframes pulse-border { + 0%, 100% { box-shadow: inset 0 0 0 1px hsl(198 78% 37% / 0.3); } + 50% { box-shadow: inset 0 0 0 2px hsl(198 78% 37% / 0.6); } +} diff --git a/apps/backoffice/app/jobs/page.tsx b/apps/backoffice/app/jobs/page.tsx index d7dcd1b..592757d 100644 --- a/apps/backoffice/app/jobs/page.tsx +++ b/apps/backoffice/app/jobs/page.tsx @@ -1,9 +1,12 @@ import { revalidatePath } from "next/cache"; -import { listJobs, fetchLibraries, rebuildIndex, cancelJob, IndexJobDto, LibraryDto } from "../../lib/api"; +import { redirect } from "next/navigation"; +import { listJobs, fetchLibraries, rebuildIndex, IndexJobDto, LibraryDto } from "../../lib/api"; +import { JobsList } from "../components/JobsList"; export const dynamic = "force-dynamic"; -export default async function JobsPage() { +export default async function JobsPage({ searchParams }: { searchParams: Promise<{ highlight?: string }> }) { + const { highlight } = await searchParams; const [jobs, libraries] = await Promise.all([ listJobs().catch(() => [] as IndexJobDto[]), fetchLibraries().catch(() => [] as LibraryDto[]) @@ -14,17 +17,22 @@ export default async function JobsPage() { async function triggerRebuild(formData: FormData) { "use server"; const libraryId = formData.get("library_id") as string; - await rebuildIndex(libraryId || undefined); + const result = await rebuildIndex(libraryId || undefined); revalidatePath("/jobs"); + redirect(`/jobs?highlight=${result.id}`); } - async function cancelJobAction(formData: FormData) { + async function triggerFullRebuild(formData: FormData) { "use server"; - const id = formData.get("id") as string; - await cancelJob(id); + const libraryId = formData.get("library_id") as string; + const result = await rebuildIndex(libraryId || undefined, true); revalidatePath("/jobs"); + redirect(`/jobs?highlight=${result.id}`); } + const apiBaseUrl = process.env.API_BASE_URL || "http://api:8080"; + const apiToken = process.env.API_BOOTSTRAP_TOKEN || ""; + return ( <>

Index Jobs

@@ -40,43 +48,23 @@ export default async function JobsPage() { +
+ + +
- - - - - - - - - - - - - {jobs.map((job) => ( - - - - - - - - - ))} - -
IDLibraryTypeStatusCreatedActions
- {job.id.slice(0, 8)} - {job.library_id ? libraryMap.get(job.library_id) || job.library_id.slice(0, 8) : "—"}{job.type} - {job.status} - {job.error_opt && !} - {new Date(job.created_at).toLocaleString()} - {job.status === "pending" || job.status === "running" ? ( -
- - -
- ) : null} -
+ ); } diff --git a/apps/backoffice/app/layout.tsx b/apps/backoffice/app/layout.tsx index 26215f2..6c6825f 100644 --- a/apps/backoffice/app/layout.tsx +++ b/apps/backoffice/app/layout.tsx @@ -5,6 +5,7 @@ import type { ReactNode } from "react"; import "./globals.css"; import { ThemeProvider } from "./theme-provider"; import { ThemeToggle } from "./theme-toggle"; +import { JobsIndicatorWrapper } from "./components/JobsIndicatorWrapper"; export const metadata: Metadata = { title: "Stripstream Backoffice", @@ -12,6 +13,9 @@ export const metadata: Metadata = { }; export default function RootLayout({ children }: { children: ReactNode }) { + const apiBaseUrl = process.env.API_BASE_URL || "http://api:8080"; + const apiToken = process.env.API_BOOTSTRAP_TOKEN || ""; + return ( @@ -30,6 +34,7 @@ export default function RootLayout({ children }: { children: ReactNode }) { Jobs Tokens + diff --git a/apps/backoffice/lib/api.ts b/apps/backoffice/lib/api.ts index 06bf4a2..cdff8ed 100644 --- a/apps/backoffice/lib/api.ts +++ b/apps/backoffice/lib/api.ts @@ -123,12 +123,23 @@ export async function deleteLibrary(id: string) { return apiFetch(`/libraries/${id}`, { method: "DELETE" }); } +export async function scanLibrary(libraryId: string, full?: boolean) { + const body: { full?: boolean } = {}; + if (full) body.full = true; + return apiFetch(`/libraries/${libraryId}/scan`, { + method: "POST", + body: JSON.stringify(body) + }); +} + export async function listJobs() { return apiFetch("/index/status"); } -export async function rebuildIndex(libraryId?: string) { - const body = libraryId ? { library_id: libraryId } : {}; +export async function rebuildIndex(libraryId?: string, full?: boolean) { + const body: { library_id?: string; full?: boolean } = {}; + if (libraryId) body.library_id = libraryId; + if (full) body.full = true; return apiFetch("/index/rebuild", { method: "POST", body: JSON.stringify(body) diff --git a/apps/backoffice/next-env.d.ts b/apps/backoffice/next-env.d.ts index 9edff1c..c4b7818 100644 --- a/apps/backoffice/next-env.d.ts +++ b/apps/backoffice/next-env.d.ts @@ -1,6 +1,6 @@ /// /// -import "./.next/types/routes.d.ts"; +import "./.next/dev/types/routes.d.ts"; // NOTE: This file should not be edited // see https://nextjs.org/docs/app/api-reference/config/typescript for more information. diff --git a/apps/indexer/src/main.rs b/apps/indexer/src/main.rs index 2b56ff7..997bebb 100644 --- a/apps/indexer/src/main.rs +++ b/apps/indexer/src/main.rs @@ -8,10 +8,28 @@ use sha2::{Digest, Sha256}; use sqlx::{postgres::PgPoolOptions, Row}; use std::{collections::HashMap, path::Path, time::Duration}; use stripstream_core::config::IndexerConfig; -use tracing::{error, info}; +use tracing::{error, info, trace, warn}; use uuid::Uuid; use walkdir::WalkDir; +fn remap_libraries_path(path: &str) -> String { + if let Ok(root) = std::env::var("LIBRARIES_ROOT_PATH") { + if path.starts_with("/libraries/") { + return path.replacen("/libraries", &root, 1); + } + } + path.to_string() +} + +fn unmap_libraries_path(path: &str) -> String { + if let Ok(root) = std::env::var("LIBRARIES_ROOT_PATH") { + if path.starts_with(&root) { + return path.replacen(&root, "/libraries", 1); + } + } + path.to_string() +} + #[derive(Clone)] struct AppState { pool: sqlx::PgPool, @@ -77,14 +95,20 @@ async fn run_worker(state: AppState, interval_seconds: u64) { loop { match claim_next_job(&state.pool).await { Ok(Some((job_id, library_id))) => { + info!("[INDEXER] Starting job {} library={:?}", job_id, library_id); if let Err(err) = process_job(&state, job_id, library_id).await { - error!(job_id = %job_id, error = %err, "index job failed"); + error!("[INDEXER] Job {} failed: {}", job_id, err); let _ = fail_job(&state.pool, job_id, &err.to_string()).await; + } else { + info!("[INDEXER] Job {} completed", job_id); } } - Ok(None) => tokio::time::sleep(wait).await, + Ok(None) => { + trace!("[INDEXER] No pending jobs, waiting..."); + tokio::time::sleep(wait).await; + } Err(err) => { - error!(error = %err, "worker loop error"); + error!("[INDEXER] Worker error: {}", err); tokio::time::sleep(wait).await; } } @@ -124,6 +148,38 @@ async fn claim_next_job(pool: &sqlx::PgPool) -> anyhow::Result) -> anyhow::Result<()> { + info!("[JOB] Processing {} library={:?}", job_id, target_library_id); + + // Get job type to check if it's a full rebuild + let job_type: String = sqlx::query_scalar("SELECT type FROM index_jobs WHERE id = $1") + .bind(job_id) + .fetch_one(&state.pool) + .await?; + let is_full_rebuild = job_type == "full_rebuild"; + info!("[JOB] {} type={} full_rebuild={}", job_id, job_type, is_full_rebuild); + + // For full rebuilds, delete existing data first + if is_full_rebuild { + info!("[JOB] Full rebuild: deleting existing data"); + if let Some(library_id) = target_library_id { + // Delete books and files for specific library + sqlx::query("DELETE FROM book_files WHERE book_id IN (SELECT id FROM books WHERE library_id = $1)") + .bind(library_id) + .execute(&state.pool) + .await?; + sqlx::query("DELETE FROM books WHERE library_id = $1") + .bind(library_id) + .execute(&state.pool) + .await?; + info!("[JOB] Deleted existing data for library {}", library_id); + } else { + // Delete all books and files + sqlx::query("DELETE FROM book_files").execute(&state.pool).await?; + sqlx::query("DELETE FROM books").execute(&state.pool).await?; + info!("[JOB] Deleted all existing data"); + } + } + let libraries = if let Some(library_id) = target_library_id { sqlx::query("SELECT id, root_path FROM libraries WHERE id = $1 AND enabled = TRUE") .bind(library_id) @@ -135,6 +191,25 @@ async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Option {} Err(err) => { stats.errors += 1; @@ -156,7 +232,7 @@ async fn process_job(state: &AppState, job_id: Uuid, target_library_id: Option any async fn scan_library( state: &AppState, + job_id: Uuid, library_id: Uuid, root: &Path, stats: &mut JobStats, + total_files: usize, + is_full_rebuild: bool, ) -> anyhow::Result<()> { let existing_rows = sqlx::query( r#" @@ -193,14 +272,22 @@ async fn scan_library( .await?; let mut existing: HashMap = HashMap::new(); - for row in existing_rows { - existing.insert( - row.get("abs_path"), - (row.get("file_id"), row.get("book_id"), row.get("fingerprint")), - ); + // For full rebuilds, don't use existing files - force reindex of everything + if !is_full_rebuild { + for row in existing_rows { + let abs_path: String = row.get("abs_path"); + // Remap for local development to match scanned paths + let remapped_path = remap_libraries_path(&abs_path); + existing.insert( + remapped_path, + (row.get("file_id"), row.get("book_id"), row.get("fingerprint")), + ); + } } let mut seen: HashMap = HashMap::new(); + let mut processed_count = 0i32; + for entry in WalkDir::new(root).into_iter().filter_map(Result::ok) { if !entry.file_type().is_file() { continue; @@ -212,9 +299,43 @@ async fn scan_library( }; stats.scanned_files += 1; - let abs_path = path.to_string_lossy().to_string(); + processed_count += 1; + let abs_path_local = path.to_string_lossy().to_string(); + // Convert local path to /libraries format for DB storage + let abs_path = unmap_libraries_path(&abs_path_local); + let file_name = path.file_name() + .map(|s| s.to_string_lossy().to_string()) + .unwrap_or_else(|| abs_path.clone()); + + info!("[SCAN] Job {} processing file {}/{}: {}", job_id, processed_count, total_files, file_name); + let start_time = std::time::Instant::now(); + + // Update progress in DB + let progress_percent = if total_files > 0 { + ((processed_count as f64 / total_files as f64) * 100.0) as i32 + } else { + 0 + }; + + let db_start = std::time::Instant::now(); + sqlx::query( + "UPDATE index_jobs SET current_file = $2, processed_files = $3, progress_percent = $4 WHERE id = $1" + ) + .bind(job_id) + .bind(&file_name) + .bind(processed_count) + .bind(progress_percent) + .execute(&state.pool) + .await + .map_err(|e| { + error!("[BDD] Failed to update progress for job {}: {}", job_id, e); + e + })?; + info!("[BDD] Progress update took {:?}", db_start.elapsed()); + seen.insert(abs_path.clone(), true); + let meta_start = std::time::Instant::now(); let metadata = std::fs::metadata(path) .with_context(|| format!("cannot stat {}", path.display()))?; let mtime: DateTime = metadata @@ -222,14 +343,22 @@ async fn scan_library( .map(DateTime::::from) .unwrap_or_else(|_| Utc::now()); let fingerprint = compute_fingerprint(path, metadata.len(), &mtime)?; + info!("[META] Metadata+fingerprint took {:?}", meta_start.elapsed()); if let Some((file_id, book_id, old_fingerprint)) = existing.get(&abs_path).cloned() { - if old_fingerprint == fingerprint { + // Skip fingerprint check for full rebuilds - always reindex + if !is_full_rebuild && old_fingerprint == fingerprint { + info!("[SKIP] File unchanged, skipping: {} (total time: {:?})", file_name, start_time.elapsed()); continue; } + info!("[PARSER] Starting parse_metadata for: {}", file_name); + let parse_start = std::time::Instant::now(); match parse_metadata(path, format, root) { Ok(parsed) => { + info!("[PARSER] Parsing took {:?} for {} (pages={:?})", parse_start.elapsed(), file_name, parsed.page_count); + + let db_start = std::time::Instant::now(); sqlx::query( "UPDATE books SET title = $2, kind = $3, series = $4, volume = $5, page_count = $6, updated_at = NOW() WHERE id = $1", ) @@ -252,10 +381,13 @@ async fn scan_library( .bind(fingerprint) .execute(&state.pool) .await?; + info!("[BDD] UPDATE took {:?} for {}", db_start.elapsed(), file_name); stats.indexed_files += 1; + info!("[DONE] Updated file {} (total time: {:?})", file_name, start_time.elapsed()); } Err(err) => { + warn!("[PARSER] Failed to parse {} after {:?}: {}", file_name, parse_start.elapsed(), err); stats.errors += 1; sqlx::query( "UPDATE book_files SET parse_status = 'error', parse_error_opt = $2, updated_at = NOW() WHERE id = $1", @@ -264,14 +396,29 @@ async fn scan_library( .bind(err.to_string()) .execute(&state.pool) .await?; + + // Store error in index_job_errors table + sqlx::query( + "INSERT INTO index_job_errors (job_id, file_path, error_message) VALUES ($1, $2, $3)" + ) + .bind(job_id) + .bind(&abs_path) + .bind(err.to_string()) + .execute(&state.pool) + .await?; } } continue; } + info!("[PARSER] Starting parse_metadata for new file: {}", file_name); + let parse_start = std::time::Instant::now(); match parse_metadata(path, format, root) { Ok(parsed) => { + info!("[PARSER] Parsing took {:?} for {} (pages={:?})", parse_start.elapsed(), file_name, parsed.page_count); + + let db_start = std::time::Instant::now(); let book_id = Uuid::new_v4(); let file_id = Uuid::new_v4(); sqlx::query( @@ -299,10 +446,13 @@ async fn scan_library( .bind(fingerprint) .execute(&state.pool) .await?; + info!("[BDD] INSERT took {:?} for {}", db_start.elapsed(), file_name); stats.indexed_files += 1; + info!("[DONE] Inserted new file {} (total time: {:?})", file_name, start_time.elapsed()); } Err(err) => { + warn!("[PARSER] Failed to parse {} after {:?}: {}", file_name, parse_start.elapsed(), err); stats.errors += 1; let book_id = Uuid::new_v4(); let file_id = Uuid::new_v4(); @@ -329,6 +479,16 @@ async fn scan_library( .bind(err.to_string()) .execute(&state.pool) .await?; + + // Store error in index_job_errors table + sqlx::query( + "INSERT INTO index_job_errors (job_id, file_path, error_message) VALUES ($1, $2, $3)" + ) + .bind(job_id) + .bind(&abs_path) + .bind(err.to_string()) + .execute(&state.pool) + .await?; } } } @@ -383,7 +543,7 @@ struct SearchDoc { title: String, author: Option, series: Option, - volume: Option, + volume: Option, language: Option, } @@ -405,6 +565,13 @@ async fn sync_meili(pool: &sqlx::PgPool, meili_url: &str, meili_master_key: &str .send() .await; + // Clear existing documents to avoid stale data + let _ = client + .delete(format!("{base}/indexes/books/documents")) + .header("Authorization", format!("Bearer {meili_master_key}")) + .send() + .await; + let rows = sqlx::query( "SELECT id, library_id, kind, title, author, series, volume, language FROM books", ) diff --git a/crates/parsers/src/lib.rs b/crates/parsers/src/lib.rs index 2f04bcd..52e9e90 100644 --- a/crates/parsers/src/lib.rs +++ b/crates/parsers/src/lib.rs @@ -176,9 +176,30 @@ fn parse_cbr_page_count(path: &Path) -> Result { } fn parse_pdf_page_count(path: &Path) -> Result { - let doc = lopdf::Document::load(path) - .with_context(|| format!("cannot open pdf: {}", path.display()))?; - Ok(doc.get_pages().len() as i32) + // Use pdfinfo command line tool instead of lopdf for better performance + let output = std::process::Command::new("pdfinfo") + .arg(path) + .output() + .with_context(|| format!("failed to execute pdfinfo for {}", path.display()))?; + + if !output.status.success() { + return Err(anyhow::anyhow!("pdfinfo failed for {}", path.display())); + } + + let stdout = String::from_utf8_lossy(&output.stdout); + for line in stdout.lines() { + if line.starts_with("Pages:") { + if let Some(pages_str) = line.split_whitespace().nth(1) { + return pages_str + .parse::() + .with_context(|| format!("cannot parse page count: {}", pages_str)); + } + } + } + + Err(anyhow::anyhow!( + "could not find page count in pdfinfo output" + )) } fn is_image_name(name: &str) -> bool { diff --git a/infra/migrations/0003_index_job_errors.sql b/infra/migrations/0003_index_job_errors.sql new file mode 100644 index 0000000..fcae358 --- /dev/null +++ b/infra/migrations/0003_index_job_errors.sql @@ -0,0 +1,26 @@ +-- Migration: Progression temps reel et erreurs d'indexation (T19 - Option 2) + +-- Colonnes pour la progression temps reel dans index_jobs +ALTER TABLE index_jobs + ADD COLUMN IF NOT EXISTS current_file TEXT, + ADD COLUMN IF NOT EXISTS progress_percent INTEGER CHECK (progress_percent >= 0 AND progress_percent <= 100), + ADD COLUMN IF NOT EXISTS total_files INTEGER, + ADD COLUMN IF NOT EXISTS processed_files INTEGER DEFAULT 0; + +-- Table pour stocker les erreurs d'indexation +CREATE TABLE IF NOT EXISTS index_job_errors ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + job_id UUID NOT NULL REFERENCES index_jobs(id) ON DELETE CASCADE, + file_path TEXT NOT NULL, + error_message TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_index_job_errors_job_id ON index_job_errors(job_id); +CREATE INDEX IF NOT EXISTS idx_index_job_errors_created_at ON index_job_errors(created_at); + +COMMENT ON TABLE index_job_errors IS 'Stocke uniquement les erreurs d indexation pour debug'; +COMMENT ON COLUMN index_jobs.current_file IS 'Fichier en cours de traitement (pour affichage temps reel)'; +COMMENT ON COLUMN index_jobs.progress_percent IS 'Pourcentage de completion estime'; +COMMENT ON COLUMN index_jobs.total_files IS 'Nombre total de fichiers a traiter (estimation)'; +COMMENT ON COLUMN index_jobs.processed_files IS 'Nombre de fichiers deja traites'; diff --git a/infra/migrations/0004_library_monitoring.sql b/infra/migrations/0004_library_monitoring.sql new file mode 100644 index 0000000..e339d47 --- /dev/null +++ b/infra/migrations/0004_library_monitoring.sql @@ -0,0 +1,17 @@ +-- Migration: Surveillance automatique des libraries (T23) +-- Ajout des colonnes pour la configuration du scan automatique + +ALTER TABLE libraries + ADD COLUMN IF NOT EXISTS monitor_enabled BOOLEAN NOT NULL DEFAULT FALSE, + ADD COLUMN IF NOT EXISTS scan_mode TEXT NOT NULL DEFAULT 'manual' CHECK (scan_mode IN ('manual', 'hourly', 'daily', 'weekly')), + ADD COLUMN IF NOT EXISTS last_scan_at TIMESTAMPTZ, + ADD COLUMN IF NOT EXISTS next_scan_at TIMESTAMPTZ; + +-- Index pour trouver rapidement les libraries à scanner +CREATE INDEX IF NOT EXISTS idx_libraries_monitor_enabled ON libraries(monitor_enabled); +CREATE INDEX IF NOT EXISTS idx_libraries_next_scan_at ON libraries(next_scan_at) WHERE monitor_enabled = TRUE; + +COMMENT ON COLUMN libraries.monitor_enabled IS 'Active la surveillance automatique de la library'; +COMMENT ON COLUMN libraries.scan_mode IS 'Mode de scan: manual, hourly (60min), daily (1440min), weekly (10080min)'; +COMMENT ON COLUMN libraries.last_scan_at IS 'Date du dernier scan (manuel ou automatique)'; +COMMENT ON COLUMN libraries.next_scan_at IS 'Date du prochain scan automatique prévu'; diff --git a/infra/migrations/0005_full_rebuild_type.sql b/infra/migrations/0005_full_rebuild_type.sql new file mode 100644 index 0000000..cee328b --- /dev/null +++ b/infra/migrations/0005_full_rebuild_type.sql @@ -0,0 +1,6 @@ +-- Migration: Ajout du type de job "full_rebuild" pour réindexation complète + +ALTER TABLE index_jobs + DROP CONSTRAINT IF EXISTS index_jobs_type_check, + ADD CONSTRAINT index_jobs_type_check + CHECK (type IN ('scan', 'rebuild', 'full_rebuild'));