Compare commits

...

11 Commits

Author SHA1 Message Date
fd277602c9 feat(api): add GET /series/ongoing and GET /books/ongoing endpoints
Two new read routes for the home screen:
- /series/ongoing: partially read series sorted by last activity
- /books/ongoing: next unread book per ongoing series

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 16:24:05 +01:00
673777bc8d chore: bump version to 0.2.0 2026-03-15 16:23:09 +01:00
03af82d065 feat(tokens): allow permanent deletion of revoked tokens
Add POST /admin/tokens/{id}/delete endpoint that permanently removes
a token from the database (only if already revoked). Add delete button
in backoffice UI for revoked tokens.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 15:19:44 +01:00
78e28a269d chore: bump version to 0.1.5 2026-03-15 15:17:16 +01:00
ee05df26c4 fix(indexer): corriger OOM lors du full rebuild (batching + limite threads)
- Extraction par batches de 200 livres (libère mémoire entre chaque batch)
- Limiter tokio spawn_blocking à 8 threads (défaut 512, chaque thread ~8MB stack)
- Réduire concurrence extraction de 8 à 2 max
- Supprimer raw_bytes.clone() inutile (passage par ownership)
- Ajouter log RSS entre chaque batch pour diagnostic mémoire

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 13:34:14 +01:00
96d9efdeed chore: bump version to 0.1.4 2026-03-15 13:20:41 +01:00
9f5183848b chore: bump version to 0.1.3 2026-03-15 13:09:53 +01:00
6f9dd108ef chore: bump version to 0.1.2 2026-03-15 13:06:36 +01:00
61bc307715 perf(parsers): optimiser listing CBZ avec file_names(), ajouter magic bytes check RAR
- Remplacer by_index() par file_names() pour lister les pages ZIP (zero I/O)
- Ajouter vérification magic bytes avant fallback RAR
- Ajouter tracing debug logs dans parsers
- Script docker-push avec version bump interactif

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 13:01:04 +01:00
c7f3ad981d chore: bump version to 0.1.1 2026-03-15 12:51:54 +01:00
0d60d46cae feat(indexer,backoffice): logs par domaine, réduction fd, UI mobile
- Ajout de targets de log par domaine (scan, extraction, thumbnail, watcher)
  contrôlables via RUST_LOG pour activer/désactiver les logs granulaires
- Ajout de logs détaillés dans extracting_pages (per-book timing en debug,
  progression toutes les 25 books en info)
- Réduction de la consommation de fd: walkdir max_open(20/10), comptage
  séquentiel au lieu de par_iter parallèle, suppression de rayon
- Détection ENFILE dans le scanner: abort après 10 erreurs IO consécutives
- Backoffice: settings dans le burger mobile, masquer "backoffice" et
  icône settings en mobile

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 11:57:49 +01:00
21 changed files with 678 additions and 198 deletions

View File

@@ -34,6 +34,24 @@ MEILI_URL=http://meilisearch:7700
# PostgreSQL Database
DATABASE_URL=postgres://stripstream:stripstream@postgres:5432/stripstream
# =============================================================================
# Logging
# =============================================================================
# Log levels per domain. Default: indexer=info,scan=info,extraction=info,thumbnail=warn,watcher=info
# Domains:
# scan — filesystem scan (discovery phase)
# extraction — page extraction from archives (extracting_pages phase)
# thumbnail — thumbnail generation (resize/encode)
# watcher — file watcher polling
# indexer — general indexer logs
# Levels: error, warn, info, debug, trace
# Examples:
# RUST_LOG=indexer=info # default, quiet thumbnails
# RUST_LOG=indexer=info,thumbnail=debug # enable thumbnail timing logs
# RUST_LOG=indexer=info,extraction=debug # per-book extraction details
# RUST_LOG=indexer=debug,scan=debug,extraction=debug,thumbnail=debug,watcher=debug # see everything
# RUST_LOG=indexer=info,scan=info,extraction=info,thumbnail=warn,watcher=info
# =============================================================================
# Storage Configuration
# =============================================================================

10
Cargo.lock generated
View File

@@ -51,7 +51,7 @@ checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c"
[[package]]
name = "api"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"anyhow",
"argon2",
@@ -1122,7 +1122,7 @@ dependencies = [
[[package]]
name = "indexer"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"anyhow",
"axum",
@@ -1132,7 +1132,6 @@ dependencies = [
"jpeg-decoder",
"num_cpus",
"parsers",
"rayon",
"reqwest",
"serde",
"serde_json",
@@ -1625,7 +1624,7 @@ dependencies = [
[[package]]
name = "parsers"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"anyhow",
"flate2",
@@ -1634,6 +1633,7 @@ dependencies = [
"natord",
"pdfium-render",
"regex",
"tracing",
"unrar",
"zip 8.2.0",
]
@@ -2626,7 +2626,7 @@ dependencies = [
[[package]]
name = "stripstream-core"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"anyhow",
"serde",

View File

@@ -9,7 +9,7 @@ resolver = "2"
[workspace.package]
edition = "2021"
version = "0.1.0"
version = "0.2.0"
license = "MIT"
[workspace.dependencies]

View File

@@ -471,6 +471,175 @@ pub async fn list_series(
}))
}
/// Query parameters shared by the `/series/ongoing` and `/books/ongoing` endpoints.
#[derive(Deserialize, ToSchema)]
pub struct OngoingQuery {
    /// Maximum number of items to return.
    /// Handlers default this to 10 and clamp it into the range [1, 50].
    #[schema(value_type = Option<i64>, example = 10)]
    pub limit: Option<i64>,
}
/// List ongoing series (partially read, sorted by most recent activity)
///
/// A series is "ongoing" when at least one of its books has reading
/// progress in status 'read' or 'reading', but not every book is 'read'.
/// Each returned item carries a representative `first_book_id` so the UI
/// can render a cover/thumbnail for the series.
#[utoipa::path(
    get,
    path = "/series/ongoing",
    tag = "books",
    params(
        ("limit" = Option<i64>, Query, description = "Max items to return (default 10, max 50)"),
    ),
    responses(
        (status = 200, body = Vec<SeriesItem>),
        (status = 401, description = "Unauthorized"),
    ),
    security(("Bearer" = []))
)]
pub async fn ongoing_series(
    State(state): State<AppState>,
    Query(query): Query<OngoingQuery>,
) -> Result<Json<Vec<SeriesItem>>, ApiError> {
    // Default to 10 items; clamp to [1, 50] to bound the response size.
    let limit = query.limit.unwrap_or(10).clamp(1, 50);
    // series_stats: one row per series name (books with an empty/NULL series
    //   fall into the synthetic 'unclassified' bucket), filtered by the
    //   "ongoing" predicate in the HAVING clause.
    // first_books: picks a representative book per series via ROW_NUMBER(),
    //   ordered by an approximate natural sort — the digit-stripped lowercase
    //   title first, then the first number found in the title, then the raw
    //   title as a tiebreaker.
    // NOTE(review): no per-user filter on book_reading_progress — presumably
    // this deployment is single-user; confirm if multi-user is ever added.
    let rows = sqlx::query(
        r#"
        WITH series_stats AS (
            SELECT
                COALESCE(NULLIF(b.series, ''), 'unclassified') AS name,
                COUNT(*) AS book_count,
                COUNT(brp.book_id) FILTER (WHERE brp.status = 'read') AS books_read_count,
                MAX(brp.last_read_at) AS last_read_at
            FROM books b
            LEFT JOIN book_reading_progress brp ON brp.book_id = b.id
            GROUP BY COALESCE(NULLIF(b.series, ''), 'unclassified')
            HAVING (
                COUNT(brp.book_id) FILTER (WHERE brp.status IN ('read', 'reading')) > 0
                AND COUNT(brp.book_id) FILTER (WHERE brp.status = 'read') < COUNT(*)
            )
        ),
        first_books AS (
            SELECT
                COALESCE(NULLIF(series, ''), 'unclassified') AS name,
                id,
                ROW_NUMBER() OVER (
                    PARTITION BY COALESCE(NULLIF(series, ''), 'unclassified')
                    ORDER BY
                        REGEXP_REPLACE(LOWER(title), '[0-9]+', '', 'g'),
                        COALESCE((REGEXP_MATCH(LOWER(title), '\d+'))[1]::int, 0),
                        title ASC
                ) AS rn
            FROM books
        )
        SELECT ss.name, ss.book_count, ss.books_read_count, fb.id AS first_book_id
        FROM series_stats ss
        JOIN first_books fb ON fb.name = ss.name AND fb.rn = 1
        ORDER BY ss.last_read_at DESC NULLS LAST
        LIMIT $1
        "#,
    )
    .bind(limit)
    .fetch_all(&state.pool)
    .await?;
    // Map untyped rows into the API DTO; column names must match the SELECT above.
    let items: Vec<SeriesItem> = rows
        .iter()
        .map(|row| SeriesItem {
            name: row.get("name"),
            book_count: row.get("book_count"),
            books_read_count: row.get("books_read_count"),
            first_book_id: row.get("first_book_id"),
        })
        .collect();
    Ok(Json(items))
}
/// List next unread book for each ongoing series (sorted by most recent activity)
///
/// Mirrors the "ongoing" predicate of `/series/ongoing`, then for each
/// ongoing series returns the single next book that is not yet 'read'
/// (lowest volume first, title as tiebreaker).
#[utoipa::path(
    get,
    path = "/books/ongoing",
    tag = "books",
    params(
        ("limit" = Option<i64>, Query, description = "Max items to return (default 10, max 50)"),
    ),
    responses(
        (status = 200, body = Vec<BookItem>),
        (status = 401, description = "Unauthorized"),
    ),
    security(("Bearer" = []))
)]
pub async fn ongoing_books(
    State(state): State<AppState>,
    Query(query): Query<OngoingQuery>,
) -> Result<Json<Vec<BookItem>>, ApiError> {
    // Default to 10 items; clamp to [1, 50] to bound the response size.
    let limit = query.limit.unwrap_or(10).clamp(1, 50);
    // ongoing_series: same HAVING predicate as /series/ongoing — at least one
    //   book read/reading but not all read; also captures the series' most
    //   recent activity timestamp for the final ordering.
    // next_books: every not-yet-'read' book of an ongoing series, ranked per
    //   series by volume (NULLS LAST) then title; rn = 1 is the "next" book.
    //   Books without a progress row count as 'unread' via COALESCE.
    let rows = sqlx::query(
        r#"
        WITH ongoing_series AS (
            SELECT
                COALESCE(NULLIF(b.series, ''), 'unclassified') AS name,
                MAX(brp.last_read_at) AS series_last_read_at
            FROM books b
            LEFT JOIN book_reading_progress brp ON brp.book_id = b.id
            GROUP BY COALESCE(NULLIF(b.series, ''), 'unclassified')
            HAVING (
                COUNT(brp.book_id) FILTER (WHERE brp.status IN ('read', 'reading')) > 0
                AND COUNT(brp.book_id) FILTER (WHERE brp.status = 'read') < COUNT(*)
            )
        ),
        next_books AS (
            SELECT
                b.id, b.library_id, b.kind, b.format, b.title, b.author, b.series, b.volume,
                b.language, b.page_count, b.thumbnail_path, b.updated_at,
                COALESCE(brp.status, 'unread') AS reading_status,
                brp.current_page AS reading_current_page,
                brp.last_read_at AS reading_last_read_at,
                os.series_last_read_at,
                ROW_NUMBER() OVER (
                    PARTITION BY COALESCE(NULLIF(b.series, ''), 'unclassified')
                    ORDER BY b.volume NULLS LAST, b.title
                ) AS rn
            FROM books b
            JOIN ongoing_series os ON COALESCE(NULLIF(b.series, ''), 'unclassified') = os.name
            LEFT JOIN book_reading_progress brp ON brp.book_id = b.id
            WHERE COALESCE(brp.status, 'unread') != 'read'
        )
        SELECT id, library_id, kind, format, title, author, series, volume, language, page_count,
               thumbnail_path, updated_at, reading_status, reading_current_page, reading_last_read_at
        FROM next_books
        WHERE rn = 1
        ORDER BY series_last_read_at DESC NULLS LAST
        LIMIT $1
        "#,
    )
    .bind(limit)
    .fetch_all(&state.pool)
    .await?;
    // Map untyped rows into the API DTO; column names must match the SELECT above.
    let items: Vec<BookItem> = rows
        .iter()
        .map(|row| {
            let thumbnail_path: Option<String> = row.get("thumbnail_path");
            BookItem {
                id: row.get("id"),
                library_id: row.get("library_id"),
                kind: row.get("kind"),
                format: row.get("format"),
                title: row.get("title"),
                author: row.get("author"),
                series: row.get("series"),
                volume: row.get("volume"),
                language: row.get("language"),
                page_count: row.get("page_count"),
                // Only expose a thumbnail URL when a thumbnail file exists;
                // the path itself is never leaked to clients.
                thumbnail_url: thumbnail_path.map(|_| format!("/books/{}/thumbnail", row.get::<Uuid, _>("id"))),
                updated_at: row.get("updated_at"),
                reading_status: row.get("reading_status"),
                reading_current_page: row.get("reading_current_page"),
                reading_last_read_at: row.get("reading_last_read_at"),
            }
        })
        .collect();
    Ok(Json(items))
}
fn remap_libraries_path(path: &str) -> String {
if let Ok(root) = std::env::var("LIBRARIES_ROOT_PATH") {
if path.starts_with("/libraries/") {

View File

@@ -96,6 +96,7 @@ async fn main() -> anyhow::Result<()> {
.route("/folders", get(index_jobs::list_folders))
.route("/admin/tokens", get(tokens::list_tokens).post(tokens::create_token))
.route("/admin/tokens/:id", delete(tokens::revoke_token))
.route("/admin/tokens/:id/delete", axum::routing::post(tokens::delete_token))
.merge(settings::settings_routes())
.route_layer(middleware::from_fn_with_state(
state.clone(),
@@ -104,11 +105,13 @@ async fn main() -> anyhow::Result<()> {
let read_routes = Router::new()
.route("/books", get(books::list_books))
.route("/books/ongoing", get(books::ongoing_books))
.route("/books/:id", get(books::get_book))
.route("/books/:id/thumbnail", get(books::get_thumbnail))
.route("/books/:id/pages/:n", get(pages::get_page))
.route("/books/:id/progress", get(reading_progress::get_reading_progress).patch(reading_progress::update_reading_progress))
.route("/libraries/:library_id/series", get(books::list_series))
.route("/series/ongoing", get(books::ongoing_series))
.route("/search", get(search::search_books))
.route_layer(middleware::from_fn_with_state(state.clone(), api_middleware::read_rate_limit))
.route_layer(middleware::from_fn_with_state(

View File

@@ -10,6 +10,8 @@ use utoipa::OpenApi;
crate::reading_progress::update_reading_progress,
crate::books::get_thumbnail,
crate::books::list_series,
crate::books::ongoing_series,
crate::books::ongoing_books,
crate::books::convert_book,
crate::pages::get_page,
crate::search::search_books,
@@ -31,6 +33,7 @@ use utoipa::OpenApi;
crate::tokens::list_tokens,
crate::tokens::create_token,
crate::tokens::revoke_token,
crate::tokens::delete_token,
crate::settings::get_settings,
crate::settings::get_setting,
crate::settings::update_setting,
@@ -48,6 +51,7 @@ use utoipa::OpenApi;
crate::reading_progress::UpdateReadingProgressRequest,
crate::books::SeriesItem,
crate::books::SeriesPage,
crate::books::OngoingQuery,
crate::pages::PageQuery,
crate::search::SearchQuery,
crate::search::SearchResponse,

View File

@@ -170,3 +170,35 @@ pub async fn revoke_token(
Ok(Json(serde_json::json!({"revoked": true, "id": id})))
}
/// Permanently delete a revoked API token
///
/// Hard-deletes the token row, but only if it has already been revoked
/// (`revoked_at` set). A missing or still-active token yields 404, so the
/// two-step revoke-then-delete flow is enforced at the SQL level.
#[utoipa::path(
    post,
    path = "/admin/tokens/{id}/delete",
    tag = "tokens",
    params(
        ("id" = String, Path, description = "Token UUID"),
    ),
    responses(
        (status = 200, description = "Token permanently deleted"),
        (status = 404, description = "Token not found or not revoked"),
        (status = 401, description = "Unauthorized"),
        (status = 403, description = "Forbidden - Admin scope required"),
    ),
    security(("Bearer" = []))
)]
pub async fn delete_token(
    State(state): State<AppState>,
    Path(id): Path<Uuid>,
) -> Result<Json<serde_json::Value>, ApiError> {
    // The revoked_at guard makes deletion of active tokens impossible here.
    let deleted = sqlx::query("DELETE FROM api_tokens WHERE id = $1 AND revoked_at IS NOT NULL")
        .bind(id)
        .execute(&state.pool)
        .await?
        .rows_affected();
    if deleted > 0 {
        Ok(Json(serde_json::json!({"deleted": true, "id": id})))
    } else {
        Err(ApiError::not_found("token not found or not revoked"))
    }
}

View File

@@ -68,6 +68,17 @@ export function MobileNav({ navItems }: { navItems: NavItem[] }) {
<span className="font-medium">{item.label}</span>
</Link>
))}
<div className="border-t border-border/40 mt-2 pt-2">
<Link
href="/settings"
className="flex items-center gap-3 px-3 py-3 rounded-lg text-muted-foreground hover:text-foreground hover:bg-accent transition-colors duration-200 active:scale-[0.98]"
onClick={() => setIsOpen(false)}
>
<NavIcon name="settings" />
<span className="font-medium">Settings</span>
</Link>
</div>
</nav>
</div>
</>

View File

@@ -52,7 +52,7 @@ export default function RootLayout({ children }: { children: ReactNode }) {
<span className="text-xl font-bold tracking-tight text-foreground">
StripStream
</span>
<span className="text-sm text-muted-foreground font-medium">
<span className="text-sm text-muted-foreground font-medium hidden md:inline">
backoffice
</span>
</div>
@@ -74,7 +74,7 @@ export default function RootLayout({ children }: { children: ReactNode }) {
<JobsIndicator />
<Link
href="/settings"
className="p-2 rounded-lg text-muted-foreground hover:text-foreground hover:bg-accent transition-colors"
className="hidden md:flex p-2 rounded-lg text-muted-foreground hover:text-foreground hover:bg-accent transition-colors"
title="Settings"
>
<Icon name="settings" size="md" />

View File

@@ -1,6 +1,6 @@
import { revalidatePath } from "next/cache";
import { redirect } from "next/navigation";
import { listTokens, createToken, revokeToken, TokenDto } from "../../lib/api";
import { listTokens, createToken, revokeToken, deleteToken, TokenDto } from "../../lib/api";
import { Card, CardHeader, CardTitle, CardDescription, CardContent, Button, Badge, FormField, FormInput, FormSelect, FormRow } from "../components/ui";
export const dynamic = "force-dynamic";
@@ -31,6 +31,13 @@ export default async function TokensPage({
revalidatePath("/tokens");
}
async function deleteTokenAction(formData: FormData) {
"use server";
const id = formData.get("id") as string;
await deleteToken(id);
revalidatePath("/tokens");
}
return (
<>
<div className="mb-6">
@@ -109,7 +116,7 @@ export default async function TokensPage({
)}
</td>
<td className="px-4 py-3">
{!token.revoked_at && (
{!token.revoked_at ? (
<form action={revokeTokenAction}>
<input type="hidden" name="id" value={token.id} />
<Button type="submit" variant="destructive" size="sm">
@@ -119,6 +126,16 @@ export default async function TokensPage({
Revoke
</Button>
</form>
) : (
<form action={deleteTokenAction}>
<input type="hidden" name="id" value={token.id} />
<Button type="submit" variant="destructive" size="sm">
<svg className="w-4 h-4 mr-1" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M19 7l-.867 12.142A2 2 0 0116.138 21H7.862a2 2 0 01-1.995-1.858L5 7m5 4v6m4-6v6m1-10V4a1 1 0 00-1-1h-4a1 1 0 00-1 1v3M4 7h16" />
</svg>
Delete
</Button>
</form>
)}
</td>
</tr>

View File

@@ -254,6 +254,10 @@ export async function revokeToken(id: string) {
return apiFetch<void>(`/admin/tokens/${id}`, { method: "DELETE" });
}
export async function deleteToken(id: string) {
return apiFetch<void>(`/admin/tokens/${id}/delete`, { method: "POST" });
}
export async function fetchBooks(
libraryId?: string,
series?: string,

View File

@@ -1,6 +1,6 @@
{
"name": "stripstream-backoffice",
"version": "0.1.0",
"version": "0.2.0",
"private": true,
"scripts": {
"dev": "next dev -p 7082",

View File

@@ -15,7 +15,6 @@ image.workspace = true
jpeg-decoder.workspace = true
num_cpus.workspace = true
parsers = { path = "../../crates/parsers" }
rayon.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -6,7 +6,7 @@ use sqlx::Row;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::sync::Arc;
use tracing::{info, warn};
use tracing::{debug, info, warn};
use uuid::Uuid;
use crate::{job::is_job_cancelled, utils, AppState};
@@ -89,7 +89,7 @@ async fn load_thumbnail_concurrency(pool: &sqlx::PgPool) -> usize {
// Default: half the logical CPUs, clamped between 2 and 8.
// Archive extraction is I/O bound but benefits from moderate parallelism.
let cpus = num_cpus::get();
let default_concurrency = (cpus / 2).clamp(2, 8);
let default_concurrency = (cpus / 2).clamp(1, 2);
let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'limits'"#)
.fetch_optional(pool)
.await;
@@ -179,7 +179,8 @@ fn generate_thumbnail(image_bytes: &[u8], config: &ThumbnailConfig) -> anyhow::R
let t_resize = t1.elapsed();
let format = config.format.as_deref().unwrap_or("webp");
info!(
debug!(
target: "thumbnail",
"[THUMBNAIL] {}x{} -> {}x{} decode={:.0}ms resize={:.0}ms encode_format={}",
orig_w, orig_h, w, h,
t_decode.as_secs_f64() * 1000.0,
@@ -237,7 +238,8 @@ fn generate_thumbnail(image_bytes: &[u8], config: &ThumbnailConfig) -> anyhow::R
}
};
let t_encode = t2.elapsed();
info!(
debug!(
target: "thumbnail",
"[THUMBNAIL] encode={:.0}ms total={:.0}ms output_size={}KB",
t_encode.as_secs_f64() * 1000.0,
t0.elapsed().as_secs_f64() * 1000.0,
@@ -263,7 +265,7 @@ fn resize_raw_to_thumbnail(
) -> anyhow::Result<String> {
let raw_bytes = std::fs::read(raw_path)
.map_err(|e| anyhow::anyhow!("failed to read raw image {}: {}", raw_path, e))?;
info!("[THUMBNAIL] book={} raw_size={}KB", book_id, raw_bytes.len() / 1024);
debug!(target: "thumbnail", "[THUMBNAIL] book={} raw_size={}KB", book_id, raw_bytes.len() / 1024);
let thumb_bytes = generate_thumbnail(&raw_bytes, config)?;
let format = config.format.as_deref().unwrap_or("webp");
@@ -367,6 +369,7 @@ pub async fn analyze_library_books(
}
});
#[derive(Clone)]
struct BookTask {
book_id: Uuid,
abs_path: String,
@@ -386,8 +389,11 @@ pub async fn analyze_library_books(
// -------------------------------------------------------------------------
// Sub-phase A: extract first page from each archive and store raw image
// I/O bound — limited by HDD throughput, runs at `concurrency`
// Processed in batches of BATCH_SIZE (200) to limit memory — raw_bytes are freed between batches.
// The collected results (Uuid, String, i32) are lightweight (~100 bytes each).
// -------------------------------------------------------------------------
const BATCH_SIZE: usize = 200;
let phase_a_start = std::time::Instant::now();
let _ = sqlx::query(
"UPDATE index_jobs SET status = 'extracting_pages', total_files = $2, processed_files = 0, current_file = NULL WHERE id = $1",
@@ -398,9 +404,27 @@ pub async fn analyze_library_books(
.await;
let extracted_count = Arc::new(AtomicI32::new(0));
let mut all_extracted: Vec<(Uuid, String, i32)> = Vec::new();
// Collected results: (book_id, raw_path, page_count) — only books that need thumbnail generation
let extracted: Vec<(Uuid, String, i32)> = stream::iter(tasks)
let num_batches = (tasks.len() + BATCH_SIZE - 1) / BATCH_SIZE;
let task_chunks: Vec<Vec<BookTask>> = tasks
.into_iter()
.collect::<Vec<_>>()
.chunks(BATCH_SIZE)
.map(|c| c.to_vec())
.collect();
for (batch_idx, batch_tasks) in task_chunks.into_iter().enumerate() {
if cancelled_flag.load(Ordering::Relaxed) {
break;
}
info!(
"[ANALYZER] Extraction batch {}/{} — {} books",
batch_idx + 1, num_batches, batch_tasks.len()
);
let batch_extracted: Vec<(Uuid, String, i32)> = stream::iter(batch_tasks)
.map(|task| {
let pool = state.pool.clone();
let config = config.clone();
@@ -449,6 +473,13 @@ pub async fn analyze_library_books(
let pdf_scale = config.width.max(config.height);
let path_owned = path.to_path_buf();
let timeout_secs = config.timeout_secs;
let file_name = path.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| local_path.clone());
debug!(target: "extraction", "[EXTRACTION] Starting: {} ({})", file_name, task.format);
let extract_start = std::time::Instant::now();
let analyze_result = tokio::time::timeout(
std::time::Duration::from_secs(timeout_secs),
tokio::task::spawn_blocking(move || analyze_book(&path_owned, format, pdf_scale)),
@@ -458,7 +489,7 @@ pub async fn analyze_library_books(
let (page_count, raw_bytes) = match analyze_result {
Ok(Ok(Ok(result))) => result,
Ok(Ok(Err(e))) => {
warn!("[ANALYZER] analyze_book failed for book {}: {}", book_id, e);
warn!(target: "extraction", "[EXTRACTION] Failed: {} {}", file_name, e);
let _ = sqlx::query(
"UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE book_id = $1",
)
@@ -469,11 +500,11 @@ pub async fn analyze_library_books(
return None;
}
Ok(Err(e)) => {
warn!("[ANALYZER] spawn_blocking error for book {}: {}", book_id, e);
warn!(target: "extraction", "[EXTRACTION] spawn error: {} {}", file_name, e);
return None;
}
Err(_) => {
warn!("[ANALYZER] analyze_book timed out after {}s for book {}: {}", timeout_secs, book_id, local_path);
warn!(target: "extraction", "[EXTRACTION] Timeout ({}s): {}", timeout_secs, file_name);
let _ = sqlx::query(
"UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE book_id = $1",
)
@@ -485,15 +516,24 @@ pub async fn analyze_library_books(
}
};
let extract_elapsed = extract_start.elapsed();
debug!(
target: "extraction",
"[EXTRACTION] Done: {} — {} pages, image={}KB in {:.0}ms",
file_name, page_count, raw_bytes.len() / 1024,
extract_elapsed.as_secs_f64() * 1000.0,
);
// If thumbnail already exists, just update page_count and skip thumbnail generation
if !needs_thumbnail {
debug!(target: "extraction", "[EXTRACTION] Page count only: {} — {} pages", file_name, page_count);
if let Err(e) = sqlx::query("UPDATE books SET page_count = $1 WHERE id = $2")
.bind(page_count)
.bind(book_id)
.execute(&pool)
.await
{
warn!("[ANALYZER] DB page_count update failed for book {}: {}", book_id, e);
warn!(target: "extraction", "[EXTRACTION] DB page_count update failed for {}: {}", file_name, e);
}
let processed = extracted_count.fetch_add(1, Ordering::Relaxed) + 1;
let percent = (processed as f64 / total as f64 * 50.0) as i32;
@@ -505,14 +545,21 @@ pub async fn analyze_library_books(
.bind(percent)
.execute(&pool)
.await;
if processed % 25 == 0 || processed == total {
info!(
target: "extraction",
"[EXTRACTION] Progress: {}/{} books extracted ({}%)",
processed, total, percent
);
}
return None; // don't enqueue for thumbnail sub-phase
}
// Save raw bytes to disk (no resize, no encode)
// Save raw bytes to disk (no resize, no encode) — moves raw_bytes, no clone
let raw_path = match tokio::task::spawn_blocking({
let dir = config.directory.clone();
let bytes = raw_bytes.clone();
move || save_raw_image(book_id, &bytes, &dir)
move || save_raw_image(book_id, &raw_bytes, &dir)
})
.await
{
@@ -549,6 +596,14 @@ pub async fn analyze_library_books(
.execute(&pool)
.await;
if processed % 25 == 0 || processed == total {
info!(
target: "extraction",
"[EXTRACTION] Progress: {}/{} books extracted ({}%)",
processed, total, percent
);
}
Some((book_id, raw_path, page_count))
}
})
@@ -557,20 +612,35 @@ pub async fn analyze_library_books(
.collect()
.await;
// Collect lightweight results; raw_bytes already saved to disk and freed
all_extracted.extend(batch_extracted);
// Log RSS to track memory growth between batches
if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
for line in status.lines() {
if line.starts_with("VmRSS:") {
info!("[ANALYZER] Memory after batch {}/{}: {}", batch_idx + 1, num_batches, line.trim());
break;
}
}
}
}
if cancelled_flag.load(Ordering::Relaxed) {
cancel_handle.abort();
info!("[ANALYZER] Job {} cancelled during extraction phase", job_id);
return Err(anyhow::anyhow!("Job cancelled by user"));
}
let extracted_total = extracted.len() as i32;
let extracted_total = all_extracted.len() as i32;
let phase_a_elapsed = phase_a_start.elapsed();
info!(
"[ANALYZER] Sub-phase A complete: {}/{} books extracted in {:.1}s ({:.0} ms/book)",
"[ANALYZER] Sub-phase A complete: {}/{} books extracted in {:.1}s ({:.0} ms/book, {} batches)",
extracted_total,
total,
phase_a_elapsed.as_secs_f64(),
if extracted_total > 0 { phase_a_elapsed.as_millis() as f64 / extracted_total as f64 } else { 0.0 }
if extracted_total > 0 { phase_a_elapsed.as_millis() as f64 / extracted_total as f64 } else { 0.0 },
num_batches,
);
// -------------------------------------------------------------------------
@@ -588,7 +658,7 @@ pub async fn analyze_library_books(
let resize_count = Arc::new(AtomicI32::new(0));
stream::iter(extracted)
stream::iter(all_extracted)
.for_each_concurrent(concurrency, |(book_id, raw_path, page_count)| {
let pool = state.pool.clone();
let config = config.clone();
@@ -643,6 +713,14 @@ pub async fn analyze_library_books(
.bind(percent)
.execute(&pool)
.await;
if processed % 25 == 0 || processed == extracted_total {
info!(
target: "thumbnail",
"[THUMBNAIL] Progress: {}/{} thumbnails generated ({}%)",
processed, extracted_total, percent
);
}
}
})
.await;

View File

@@ -1,5 +1,4 @@
use anyhow::Result;
use rayon::prelude::*;
use sqlx::{PgPool, Row};
use tracing::{error, info};
use uuid::Uuid;
@@ -270,10 +269,12 @@ pub async fn process_job(
crate::utils::remap_libraries_path(&library.get::<String, _>("root_path"))
})
.collect();
// Count sequentially with limited open fds to avoid ENFILE exhaustion
library_paths
.par_iter()
.iter()
.map(|root_path| {
walkdir::WalkDir::new(root_path)
.max_open(20)
.into_iter()
.filter_map(Result::ok)
.filter(|entry| {

View File

@@ -4,11 +4,23 @@ use sqlx::postgres::PgPoolOptions;
use stripstream_core::config::IndexerConfig;
use tracing::info;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
fn main() -> anyhow::Result<()> {
// Limit blocking thread pool to 8 threads (default 512).
// Each spawn_blocking call (archive extraction, image save) gets a thread.
// With thousands of books, unlimited threads cause OOM via stack memory (~8MB each).
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.max_blocking_threads(8)
.build()?;
runtime.block_on(async_main())
}
async fn async_main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
std::env::var("RUST_LOG").unwrap_or_else(|_| "indexer=info,axum=info".to_string()),
std::env::var("RUST_LOG").unwrap_or_else(|_| {
"indexer=info,axum=info,scan=info,extraction=info,thumbnail=warn,watcher=info".to_string()
}),
)
.init();

View File

@@ -4,7 +4,7 @@ use parsers::{detect_format, parse_metadata_fast};
use serde::Serialize;
use sqlx::Row;
use std::{collections::HashMap, path::Path, time::Duration};
use tracing::{error, info, trace, warn};
use tracing::{debug, error, info, trace, warn};
use uuid::Uuid;
use walkdir::WalkDir;
@@ -124,7 +124,37 @@ pub async fn scan_library_discovery(
// Files under these prefixes are added to `seen` but not reprocessed.
let mut skipped_dir_prefixes: Vec<String> = Vec::new();
for entry in WalkDir::new(root).into_iter().filter_map(Result::ok) {
// Track consecutive IO errors to detect fd exhaustion (ENFILE)
let mut consecutive_io_errors: usize = 0;
const MAX_CONSECUTIVE_IO_ERRORS: usize = 10;
for result in WalkDir::new(root).max_open(20).into_iter() {
let entry = match result {
Ok(e) => {
consecutive_io_errors = 0;
e
}
Err(e) => {
consecutive_io_errors += 1;
let is_enfile = e
.io_error()
.map(|io| io.raw_os_error() == Some(23) || io.raw_os_error() == Some(24))
.unwrap_or(false);
if is_enfile || consecutive_io_errors >= MAX_CONSECUTIVE_IO_ERRORS {
error!(
"[SCAN] Too many IO errors ({} consecutive) scanning library {} — \
fd limit likely exhausted. Aborting scan for this library.",
consecutive_io_errors, library_id
);
stats.warnings += 1;
break;
}
warn!("[SCAN] walkdir error: {}", e);
stats.warnings += 1;
continue;
}
};
let path = entry.path().to_path_buf();
let local_path = path.to_string_lossy().to_string();
@@ -192,7 +222,8 @@ pub async fn scan_library_discovery(
continue;
};
info!(
debug!(
target: "scan",
"[SCAN] Found book file: {} (format: {:?})",
path.display(),
format
@@ -209,6 +240,17 @@ pub async fn scan_library_discovery(
let metadata = match std::fs::metadata(&path) {
Ok(m) => m,
Err(e) => {
let is_enfile = e.raw_os_error() == Some(23) || e.raw_os_error() == Some(24);
if is_enfile {
consecutive_io_errors += 1;
}
if consecutive_io_errors >= MAX_CONSECUTIVE_IO_ERRORS {
error!(
"[SCAN] fd limit exhausted while stat'ing files in library {}. Aborting.",
library_id
);
break;
}
warn!("[SCAN] cannot stat {}, skipping: {}", path.display(), e);
stats.warnings += 1;
continue;
@@ -278,8 +320,9 @@ pub async fn scan_library_discovery(
continue;
}
info!(
"[PROCESS] Updating existing file: {} (fingerprint_changed={})",
debug!(
target: "scan",
"[SCAN] Updating: {} (fingerprint_changed={})",
file_name,
old_fingerprint != fingerprint
);
@@ -335,7 +378,7 @@ pub async fn scan_library_discovery(
}
// New file — insert with page_count = NULL (analyzer fills it in)
info!("[PROCESS] Inserting new file: {}", file_name);
debug!(target: "scan", "[SCAN] Inserting: {}", file_name);
let book_id = Uuid::new_v4();
let file_id = Uuid::new_v4();
@@ -401,7 +444,28 @@ pub async fn scan_library_discovery(
library_id, library_processed_count, stats.indexed_files, stats.errors
);
// Handle deletions
// Handle deletions — with safety check against volume mount failures
let existing_count = existing.len();
let seen_count = seen.len();
let stale_count = existing.iter().filter(|(p, _)| !seen.contains_key(p.as_str())).count();
// Safety: if the library root is not accessible, or if we found zero files
// but the DB had many, the volume is probably not mounted correctly.
// Do NOT delete anything in that case.
let root_accessible = root.is_dir() && std::fs::read_dir(root).is_ok();
let skip_deletions = !root_accessible
|| (seen_count == 0 && existing_count > 0)
|| (stale_count > 0 && stale_count == existing_count);
if skip_deletions && stale_count > 0 {
warn!(
"[SCAN] Skipping deletion of {} stale files for library {} — \
root accessible={}, seen={}, existing={}. \
Volume may not be mounted correctly.",
stale_count, library_id, root_accessible, seen_count, existing_count
);
stats.warnings += stale_count;
} else {
let mut removed_count = 0usize;
for (abs_path, (file_id, book_id, _)) in &existing {
if seen.contains_key(abs_path) {
@@ -427,6 +491,7 @@ pub async fn scan_library_discovery(
removed_count
);
}
}
// Upsert directory mtimes for next incremental scan
if !new_dir_mtimes.is_empty() {

View File

@@ -3,7 +3,7 @@ use sqlx::Row;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::time::Duration;
use tracing::{error, info, trace, warn};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use walkdir::WalkDir;
@@ -29,6 +29,7 @@ fn snapshot_library(root_path: &str) -> LibrarySnapshot {
let mut files = HashSet::new();
let walker = WalkDir::new(root_path)
.follow_links(true)
.max_open(10)
.into_iter()
.filter_map(|e| e.ok());
@@ -54,7 +55,7 @@ pub async fn run_file_watcher(state: AppState) -> Result<()> {
// Skip if any job is active — avoid competing for file descriptors
if has_active_jobs(&pool).await {
trace!("[WATCHER] Skipping poll — job active");
debug!(target: "watcher", "[WATCHER] Skipping poll — job active");
continue;
}
@@ -113,7 +114,7 @@ pub async fn run_file_watcher(state: AppState) -> Result<()> {
// Re-check between libraries in case a job was created
if has_active_jobs(&pool).await {
trace!("[WATCHER] Job became active during poll, stopping");
debug!(target: "watcher", "[WATCHER] Job became active during poll, stopping");
break;
}
@@ -126,7 +127,8 @@ pub async fn run_file_watcher(state: AppState) -> Result<()> {
Some(old_snapshot) => *old_snapshot != new_snapshot,
None => {
// First scan — store baseline, don't trigger a job
trace!(
debug!(
target: "watcher",
"[WATCHER] Initial snapshot for library {}: {} files",
library_id,
new_snapshot.len()
@@ -168,7 +170,7 @@ pub async fn run_file_watcher(state: AppState) -> Result<()> {
Err(err) => error!("[WATCHER] Failed to create job: {}", err),
}
} else {
trace!("[WATCHER] Job already active for library {}, skipping", library_id);
debug!(target: "watcher", "[WATCHER] Job already active for library {}, skipping", library_id);
}
}

View File

@@ -14,3 +14,4 @@ regex = "1"
unrar.workspace = true
zip = { version = "8", default-features = false, features = ["deflate"] }
flate2 = "1"
tracing.workspace = true

View File

@@ -166,13 +166,30 @@ fn analyze_cbz(path: &Path, allow_fallback: bool) -> Result<(i32, Vec<u8>)> {
Ok(a) => a,
Err(zip_err) => {
if allow_fallback {
// Try RAR fallback first (file might be a RAR with .cbz extension)
tracing::debug!(target: "extraction", "[EXTRACTION] ZipArchive::new failed for {}: {} — trying fallbacks", path.display(), zip_err);
// Check magic bytes to avoid expensive RAR probe on ZIP files
let is_zip_magic = std::fs::File::open(path)
.and_then(|mut f| {
let mut magic = [0u8; 4];
std::io::Read::read_exact(&mut f, &mut magic)?;
Ok(magic[0] == b'P' && magic[1] == b'K')
})
.unwrap_or(false);
if !is_zip_magic {
// Try RAR fallback (file might be a RAR with .cbz extension)
if let Ok(result) = analyze_cbr(path, false) {
tracing::debug!(target: "extraction", "[EXTRACTION] RAR fallback succeeded for {}", path.display());
return Ok(result);
}
}
// Try streaming fallback: read local file headers without central directory
// (handles ZIP files with NTFS extra fields that confuse the central dir parser)
let t0 = std::time::Instant::now();
if let Ok(result) = analyze_cbz_streaming(path) {
tracing::debug!(target: "extraction", "[EXTRACTION] Streaming fallback succeeded for {} — {} pages in {:.0}ms", path.display(), result.0, t0.elapsed().as_secs_f64() * 1000.0);
return Ok(result);
}
}
@@ -180,17 +197,11 @@ fn analyze_cbz(path: &Path, allow_fallback: bool) -> Result<(i32, Vec<u8>)> {
}
};
let mut image_names: Vec<String> = Vec::new();
for i in 0..archive.len() {
let entry = match archive.by_index(i) {
Ok(e) => e,
Err(_) => continue, // skip corrupted entries
};
let name = entry.name().to_ascii_lowercase();
if is_image_name(&name) {
image_names.push(entry.name().to_string());
}
}
let mut image_names: Vec<String> = archive
.file_names()
.filter(|name| is_image_name(&name.to_ascii_lowercase()))
.map(|name| name.to_string())
.collect::<Vec<_>>();
image_names.sort_by(|a, b| natord::compare(a, b));
if image_names.is_empty() {

View File

@@ -4,29 +4,75 @@ set -e
# Docker Hub
REGISTRY="docker.io"
OWNER="julienfroidefond32"
VERSION=$(grep '^version = ' Cargo.toml | head -1 | sed 's/version = "\(.*\)"/\1/')
SERVICES=("api" "indexer" "backoffice")
# ─── Version bump ───────────────────────────────────────────────────────────
CURRENT_VERSION=$(grep '^version = ' Cargo.toml | head -1 | sed 's/version = "\(.*\)"/\1/')
IFS='.' read -r MAJOR MINOR PATCH <<< "$CURRENT_VERSION"
echo "=== Stripstream Librarian Docker Push ==="
echo "Current version: $CURRENT_VERSION"
echo ""
echo "Bump version?"
echo " 1) patch → $MAJOR.$MINOR.$((PATCH + 1))"
echo " 2) minor → $MAJOR.$((MINOR + 1)).0"
echo " 3) major → $((MAJOR + 1)).0.0"
echo " 4) skip → keep $CURRENT_VERSION"
echo ""
read -rp "Choice [1-4]: " BUMP_CHOICE
case "$BUMP_CHOICE" in
1) NEW_VERSION="$MAJOR.$MINOR.$((PATCH + 1))" ;;
2) NEW_VERSION="$MAJOR.$((MINOR + 1)).0" ;;
3) NEW_VERSION="$((MAJOR + 1)).0.0" ;;
4) NEW_VERSION="$CURRENT_VERSION" ;;
*) echo "Invalid choice"; exit 1 ;;
esac
if [ "$NEW_VERSION" != "$CURRENT_VERSION" ]; then
echo "Bumping version: $CURRENT_VERSION$NEW_VERSION"
# Update Cargo.toml (workspace version)
sed -i.bak "s/^version = \"$CURRENT_VERSION\"/version = \"$NEW_VERSION\"/" Cargo.toml
rm -f Cargo.toml.bak
# Update backoffice package.json
sed -i.bak "s/\"version\": \"$CURRENT_VERSION\"/\"version\": \"$NEW_VERSION\"/" apps/backoffice/package.json
rm -f apps/backoffice/package.json.bak
# Update Cargo.lock
cargo update --workspace 2>/dev/null || true
# Commit version bump
git add Cargo.toml Cargo.lock apps/backoffice/package.json
git commit -m "chore: bump version to $NEW_VERSION"
echo "✓ Version bumped and committed"
else
echo "Keeping version $CURRENT_VERSION"
fi
VERSION="$NEW_VERSION"
echo ""
echo "Registry: $REGISTRY"
echo "Version: $VERSION"
echo "Services: ${SERVICES[@]}"
echo "Services: ${SERVICES[*]}"
echo ""
for service in "${SERVICES[@]}"; do
echo "Building $service..."
docker build -f apps/$service/Dockerfile -t $service:latest .
docker build -f "apps/$service/Dockerfile" -t "$service:latest" .
echo "Tagging $service..."
docker tag $service:latest $REGISTRY/$OWNER/stripstream-$service:$VERSION
docker tag $service:latest $REGISTRY/$OWNER/stripstream-$service:latest
docker tag "$service:latest" "$REGISTRY/$OWNER/stripstream-$service:$VERSION"
docker tag "$service:latest" "$REGISTRY/$OWNER/stripstream-$service:latest"
echo "Pushing stripstream-$service:$VERSION..."
docker push $REGISTRY/$OWNER/stripstream-$service:$VERSION
docker push "$REGISTRY/$OWNER/stripstream-$service:$VERSION"
echo "Pushing stripstream-$service:latest..."
docker push $REGISTRY/$OWNER/stripstream-$service:latest
docker push "$REGISTRY/$OWNER/stripstream-$service:latest"
echo "$service pushed successfully"
echo ""
@@ -38,3 +84,10 @@ for service in "${SERVICES[@]}"; do
echo " - $REGISTRY/$OWNER/stripstream-$service:$VERSION"
echo " - $REGISTRY/$OWNER/stripstream-$service:latest"
done
# Tag git with version
if ! git tag -l "v$VERSION" | grep -q "v$VERSION"; then
git tag "v$VERSION"
echo ""
echo "Git tag v$VERSION created. Push with: git push origin v$VERSION"
fi