Compare commits
11 Commits
6947af10fe
...
fd277602c9
| Author | SHA1 | Date | |
|---|---|---|---|
| fd277602c9 | |||
| 673777bc8d | |||
| 03af82d065 | |||
| 78e28a269d | |||
| ee05df26c4 | |||
| 96d9efdeed | |||
| 9f5183848b | |||
| 6f9dd108ef | |||
| 61bc307715 | |||
| c7f3ad981d | |||
| 0d60d46cae |
18
.env.example
18
.env.example
@@ -34,6 +34,24 @@ MEILI_URL=http://meilisearch:7700
|
||||
# PostgreSQL Database
|
||||
DATABASE_URL=postgres://stripstream:stripstream@postgres:5432/stripstream
|
||||
|
||||
# =============================================================================
|
||||
# Logging
|
||||
# =============================================================================
|
||||
# Log levels per domain. Default: indexer=info,scan=info,extraction=info,thumbnail=warn,watcher=info
|
||||
# Domains:
|
||||
# scan — filesystem scan (discovery phase)
|
||||
# extraction — page extraction from archives (extracting_pages phase)
|
||||
# thumbnail — thumbnail generation (resize/encode)
|
||||
# watcher — file watcher polling
|
||||
# indexer — general indexer logs
|
||||
# Levels: error, warn, info, debug, trace
|
||||
# Examples:
|
||||
# RUST_LOG=indexer=info # default, quiet thumbnails
|
||||
# RUST_LOG=indexer=info,thumbnail=debug # enable thumbnail timing logs
|
||||
# RUST_LOG=indexer=info,extraction=debug # per-book extraction details
|
||||
# RUST_LOG=indexer=debug,scan=debug,extraction=debug,thumbnail=debug,watcher=debug # show everything
|
||||
# RUST_LOG=indexer=info,scan=info,extraction=info,thumbnail=warn,watcher=info
|
||||
|
||||
# =============================================================================
|
||||
# Storage Configuration
|
||||
# =============================================================================
|
||||
|
||||
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -51,7 +51,7 @@ checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c"
|
||||
|
||||
[[package]]
|
||||
name = "api"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"argon2",
|
||||
@@ -1122,7 +1122,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "indexer"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"axum",
|
||||
@@ -1132,7 +1132,6 @@ dependencies = [
|
||||
"jpeg-decoder",
|
||||
"num_cpus",
|
||||
"parsers",
|
||||
"rayon",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -1625,7 +1624,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "parsers"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"flate2",
|
||||
@@ -1634,6 +1633,7 @@ dependencies = [
|
||||
"natord",
|
||||
"pdfium-render",
|
||||
"regex",
|
||||
"tracing",
|
||||
"unrar",
|
||||
"zip 8.2.0",
|
||||
]
|
||||
@@ -2626,7 +2626,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "stripstream-core"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"serde",
|
||||
|
||||
@@ -9,7 +9,7 @@ resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
edition = "2021"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
license = "MIT"
|
||||
|
||||
[workspace.dependencies]
|
||||
|
||||
@@ -471,6 +471,175 @@ pub async fn list_series(
|
||||
}))
|
||||
}
|
||||
|
||||
#[derive(Deserialize, ToSchema)]
|
||||
pub struct OngoingQuery {
|
||||
#[schema(value_type = Option<i64>, example = 10)]
|
||||
pub limit: Option<i64>,
|
||||
}
|
||||
|
||||
/// List ongoing series (partially read, sorted by most recent activity)
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/series/ongoing",
|
||||
tag = "books",
|
||||
params(
|
||||
("limit" = Option<i64>, Query, description = "Max items to return (default 10, max 50)"),
|
||||
),
|
||||
responses(
|
||||
(status = 200, body = Vec<SeriesItem>),
|
||||
(status = 401, description = "Unauthorized"),
|
||||
),
|
||||
security(("Bearer" = []))
|
||||
)]
|
||||
pub async fn ongoing_series(
|
||||
State(state): State<AppState>,
|
||||
Query(query): Query<OngoingQuery>,
|
||||
) -> Result<Json<Vec<SeriesItem>>, ApiError> {
|
||||
let limit = query.limit.unwrap_or(10).clamp(1, 50);
|
||||
|
||||
let rows = sqlx::query(
|
||||
r#"
|
||||
WITH series_stats AS (
|
||||
SELECT
|
||||
COALESCE(NULLIF(b.series, ''), 'unclassified') AS name,
|
||||
COUNT(*) AS book_count,
|
||||
COUNT(brp.book_id) FILTER (WHERE brp.status = 'read') AS books_read_count,
|
||||
MAX(brp.last_read_at) AS last_read_at
|
||||
FROM books b
|
||||
LEFT JOIN book_reading_progress brp ON brp.book_id = b.id
|
||||
GROUP BY COALESCE(NULLIF(b.series, ''), 'unclassified')
|
||||
HAVING (
|
||||
COUNT(brp.book_id) FILTER (WHERE brp.status IN ('read', 'reading')) > 0
|
||||
AND COUNT(brp.book_id) FILTER (WHERE brp.status = 'read') < COUNT(*)
|
||||
)
|
||||
),
|
||||
first_books AS (
|
||||
SELECT
|
||||
COALESCE(NULLIF(series, ''), 'unclassified') AS name,
|
||||
id,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY COALESCE(NULLIF(series, ''), 'unclassified')
|
||||
ORDER BY
|
||||
REGEXP_REPLACE(LOWER(title), '[0-9]+', '', 'g'),
|
||||
COALESCE((REGEXP_MATCH(LOWER(title), '\d+'))[1]::int, 0),
|
||||
title ASC
|
||||
) AS rn
|
||||
FROM books
|
||||
)
|
||||
SELECT ss.name, ss.book_count, ss.books_read_count, fb.id AS first_book_id
|
||||
FROM series_stats ss
|
||||
JOIN first_books fb ON fb.name = ss.name AND fb.rn = 1
|
||||
ORDER BY ss.last_read_at DESC NULLS LAST
|
||||
LIMIT $1
|
||||
"#,
|
||||
)
|
||||
.bind(limit)
|
||||
.fetch_all(&state.pool)
|
||||
.await?;
|
||||
|
||||
let items: Vec<SeriesItem> = rows
|
||||
.iter()
|
||||
.map(|row| SeriesItem {
|
||||
name: row.get("name"),
|
||||
book_count: row.get("book_count"),
|
||||
books_read_count: row.get("books_read_count"),
|
||||
first_book_id: row.get("first_book_id"),
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Json(items))
|
||||
}
|
||||
|
||||
/// List next unread book for each ongoing series (sorted by most recent activity)
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/books/ongoing",
|
||||
tag = "books",
|
||||
params(
|
||||
("limit" = Option<i64>, Query, description = "Max items to return (default 10, max 50)"),
|
||||
),
|
||||
responses(
|
||||
(status = 200, body = Vec<BookItem>),
|
||||
(status = 401, description = "Unauthorized"),
|
||||
),
|
||||
security(("Bearer" = []))
|
||||
)]
|
||||
pub async fn ongoing_books(
|
||||
State(state): State<AppState>,
|
||||
Query(query): Query<OngoingQuery>,
|
||||
) -> Result<Json<Vec<BookItem>>, ApiError> {
|
||||
let limit = query.limit.unwrap_or(10).clamp(1, 50);
|
||||
|
||||
let rows = sqlx::query(
|
||||
r#"
|
||||
WITH ongoing_series AS (
|
||||
SELECT
|
||||
COALESCE(NULLIF(b.series, ''), 'unclassified') AS name,
|
||||
MAX(brp.last_read_at) AS series_last_read_at
|
||||
FROM books b
|
||||
LEFT JOIN book_reading_progress brp ON brp.book_id = b.id
|
||||
GROUP BY COALESCE(NULLIF(b.series, ''), 'unclassified')
|
||||
HAVING (
|
||||
COUNT(brp.book_id) FILTER (WHERE brp.status IN ('read', 'reading')) > 0
|
||||
AND COUNT(brp.book_id) FILTER (WHERE brp.status = 'read') < COUNT(*)
|
||||
)
|
||||
),
|
||||
next_books AS (
|
||||
SELECT
|
||||
b.id, b.library_id, b.kind, b.format, b.title, b.author, b.series, b.volume,
|
||||
b.language, b.page_count, b.thumbnail_path, b.updated_at,
|
||||
COALESCE(brp.status, 'unread') AS reading_status,
|
||||
brp.current_page AS reading_current_page,
|
||||
brp.last_read_at AS reading_last_read_at,
|
||||
os.series_last_read_at,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY COALESCE(NULLIF(b.series, ''), 'unclassified')
|
||||
ORDER BY b.volume NULLS LAST, b.title
|
||||
) AS rn
|
||||
FROM books b
|
||||
JOIN ongoing_series os ON COALESCE(NULLIF(b.series, ''), 'unclassified') = os.name
|
||||
LEFT JOIN book_reading_progress brp ON brp.book_id = b.id
|
||||
WHERE COALESCE(brp.status, 'unread') != 'read'
|
||||
)
|
||||
SELECT id, library_id, kind, format, title, author, series, volume, language, page_count,
|
||||
thumbnail_path, updated_at, reading_status, reading_current_page, reading_last_read_at
|
||||
FROM next_books
|
||||
WHERE rn = 1
|
||||
ORDER BY series_last_read_at DESC NULLS LAST
|
||||
LIMIT $1
|
||||
"#,
|
||||
)
|
||||
.bind(limit)
|
||||
.fetch_all(&state.pool)
|
||||
.await?;
|
||||
|
||||
let items: Vec<BookItem> = rows
|
||||
.iter()
|
||||
.map(|row| {
|
||||
let thumbnail_path: Option<String> = row.get("thumbnail_path");
|
||||
BookItem {
|
||||
id: row.get("id"),
|
||||
library_id: row.get("library_id"),
|
||||
kind: row.get("kind"),
|
||||
format: row.get("format"),
|
||||
title: row.get("title"),
|
||||
author: row.get("author"),
|
||||
series: row.get("series"),
|
||||
volume: row.get("volume"),
|
||||
language: row.get("language"),
|
||||
page_count: row.get("page_count"),
|
||||
thumbnail_url: thumbnail_path.map(|_| format!("/books/{}/thumbnail", row.get::<Uuid, _>("id"))),
|
||||
updated_at: row.get("updated_at"),
|
||||
reading_status: row.get("reading_status"),
|
||||
reading_current_page: row.get("reading_current_page"),
|
||||
reading_last_read_at: row.get("reading_last_read_at"),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Json(items))
|
||||
}
|
||||
|
||||
fn remap_libraries_path(path: &str) -> String {
|
||||
if let Ok(root) = std::env::var("LIBRARIES_ROOT_PATH") {
|
||||
if path.starts_with("/libraries/") {
|
||||
|
||||
@@ -96,6 +96,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
.route("/folders", get(index_jobs::list_folders))
|
||||
.route("/admin/tokens", get(tokens::list_tokens).post(tokens::create_token))
|
||||
.route("/admin/tokens/:id", delete(tokens::revoke_token))
|
||||
.route("/admin/tokens/:id/delete", axum::routing::post(tokens::delete_token))
|
||||
.merge(settings::settings_routes())
|
||||
.route_layer(middleware::from_fn_with_state(
|
||||
state.clone(),
|
||||
@@ -104,11 +105,13 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let read_routes = Router::new()
|
||||
.route("/books", get(books::list_books))
|
||||
.route("/books/ongoing", get(books::ongoing_books))
|
||||
.route("/books/:id", get(books::get_book))
|
||||
.route("/books/:id/thumbnail", get(books::get_thumbnail))
|
||||
.route("/books/:id/pages/:n", get(pages::get_page))
|
||||
.route("/books/:id/progress", get(reading_progress::get_reading_progress).patch(reading_progress::update_reading_progress))
|
||||
.route("/libraries/:library_id/series", get(books::list_series))
|
||||
.route("/series/ongoing", get(books::ongoing_series))
|
||||
.route("/search", get(search::search_books))
|
||||
.route_layer(middleware::from_fn_with_state(state.clone(), api_middleware::read_rate_limit))
|
||||
.route_layer(middleware::from_fn_with_state(
|
||||
|
||||
@@ -10,6 +10,8 @@ use utoipa::OpenApi;
|
||||
crate::reading_progress::update_reading_progress,
|
||||
crate::books::get_thumbnail,
|
||||
crate::books::list_series,
|
||||
crate::books::ongoing_series,
|
||||
crate::books::ongoing_books,
|
||||
crate::books::convert_book,
|
||||
crate::pages::get_page,
|
||||
crate::search::search_books,
|
||||
@@ -31,6 +33,7 @@ use utoipa::OpenApi;
|
||||
crate::tokens::list_tokens,
|
||||
crate::tokens::create_token,
|
||||
crate::tokens::revoke_token,
|
||||
crate::tokens::delete_token,
|
||||
crate::settings::get_settings,
|
||||
crate::settings::get_setting,
|
||||
crate::settings::update_setting,
|
||||
@@ -48,6 +51,7 @@ use utoipa::OpenApi;
|
||||
crate::reading_progress::UpdateReadingProgressRequest,
|
||||
crate::books::SeriesItem,
|
||||
crate::books::SeriesPage,
|
||||
crate::books::OngoingQuery,
|
||||
crate::pages::PageQuery,
|
||||
crate::search::SearchQuery,
|
||||
crate::search::SearchResponse,
|
||||
|
||||
@@ -170,3 +170,35 @@ pub async fn revoke_token(
|
||||
|
||||
Ok(Json(serde_json::json!({"revoked": true, "id": id})))
|
||||
}
|
||||
|
||||
/// Permanently delete a revoked API token
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/admin/tokens/{id}/delete",
|
||||
tag = "tokens",
|
||||
params(
|
||||
("id" = String, Path, description = "Token UUID"),
|
||||
),
|
||||
responses(
|
||||
(status = 200, description = "Token permanently deleted"),
|
||||
(status = 404, description = "Token not found or not revoked"),
|
||||
(status = 401, description = "Unauthorized"),
|
||||
(status = 403, description = "Forbidden - Admin scope required"),
|
||||
),
|
||||
security(("Bearer" = []))
|
||||
)]
|
||||
pub async fn delete_token(
|
||||
State(state): State<AppState>,
|
||||
Path(id): Path<Uuid>,
|
||||
) -> Result<Json<serde_json::Value>, ApiError> {
|
||||
let result = sqlx::query("DELETE FROM api_tokens WHERE id = $1 AND revoked_at IS NOT NULL")
|
||||
.bind(id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
|
||||
if result.rows_affected() == 0 {
|
||||
return Err(ApiError::not_found("token not found or not revoked"));
|
||||
}
|
||||
|
||||
Ok(Json(serde_json::json!({"deleted": true, "id": id})))
|
||||
}
|
||||
|
||||
@@ -68,6 +68,17 @@ export function MobileNav({ navItems }: { navItems: NavItem[] }) {
|
||||
<span className="font-medium">{item.label}</span>
|
||||
</Link>
|
||||
))}
|
||||
|
||||
<div className="border-t border-border/40 mt-2 pt-2">
|
||||
<Link
|
||||
href="/settings"
|
||||
className="flex items-center gap-3 px-3 py-3 rounded-lg text-muted-foreground hover:text-foreground hover:bg-accent transition-colors duration-200 active:scale-[0.98]"
|
||||
onClick={() => setIsOpen(false)}
|
||||
>
|
||||
<NavIcon name="settings" />
|
||||
<span className="font-medium">Settings</span>
|
||||
</Link>
|
||||
</div>
|
||||
</nav>
|
||||
</div>
|
||||
</>
|
||||
|
||||
@@ -52,7 +52,7 @@ export default function RootLayout({ children }: { children: ReactNode }) {
|
||||
<span className="text-xl font-bold tracking-tight text-foreground">
|
||||
StripStream
|
||||
</span>
|
||||
<span className="text-sm text-muted-foreground font-medium">
|
||||
<span className="text-sm text-muted-foreground font-medium hidden md:inline">
|
||||
backoffice
|
||||
</span>
|
||||
</div>
|
||||
@@ -74,7 +74,7 @@ export default function RootLayout({ children }: { children: ReactNode }) {
|
||||
<JobsIndicator />
|
||||
<Link
|
||||
href="/settings"
|
||||
className="p-2 rounded-lg text-muted-foreground hover:text-foreground hover:bg-accent transition-colors"
|
||||
className="hidden md:flex p-2 rounded-lg text-muted-foreground hover:text-foreground hover:bg-accent transition-colors"
|
||||
title="Settings"
|
||||
>
|
||||
<Icon name="settings" size="md" />
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { revalidatePath } from "next/cache";
|
||||
import { redirect } from "next/navigation";
|
||||
import { listTokens, createToken, revokeToken, TokenDto } from "../../lib/api";
|
||||
import { listTokens, createToken, revokeToken, deleteToken, TokenDto } from "../../lib/api";
|
||||
import { Card, CardHeader, CardTitle, CardDescription, CardContent, Button, Badge, FormField, FormInput, FormSelect, FormRow } from "../components/ui";
|
||||
|
||||
export const dynamic = "force-dynamic";
|
||||
@@ -31,6 +31,13 @@ export default async function TokensPage({
|
||||
revalidatePath("/tokens");
|
||||
}
|
||||
|
||||
async function deleteTokenAction(formData: FormData) {
|
||||
"use server";
|
||||
const id = formData.get("id") as string;
|
||||
await deleteToken(id);
|
||||
revalidatePath("/tokens");
|
||||
}
|
||||
|
||||
return (
|
||||
<>
|
||||
<div className="mb-6">
|
||||
@@ -109,7 +116,7 @@ export default async function TokensPage({
|
||||
)}
|
||||
</td>
|
||||
<td className="px-4 py-3">
|
||||
{!token.revoked_at && (
|
||||
{!token.revoked_at ? (
|
||||
<form action={revokeTokenAction}>
|
||||
<input type="hidden" name="id" value={token.id} />
|
||||
<Button type="submit" variant="destructive" size="sm">
|
||||
@@ -119,6 +126,16 @@ export default async function TokensPage({
|
||||
Revoke
|
||||
</Button>
|
||||
</form>
|
||||
) : (
|
||||
<form action={deleteTokenAction}>
|
||||
<input type="hidden" name="id" value={token.id} />
|
||||
<Button type="submit" variant="destructive" size="sm">
|
||||
<svg className="w-4 h-4 mr-1" fill="none" stroke="currentColor" viewBox="0 0 24 24">
|
||||
<path strokeLinecap="round" strokeLinejoin="round" strokeWidth={2} d="M19 7l-.867 12.142A2 2 0 0116.138 21H7.862a2 2 0 01-1.995-1.858L5 7m5 4v6m4-6v6m1-10V4a1 1 0 00-1-1h-4a1 1 0 00-1 1v3M4 7h16" />
|
||||
</svg>
|
||||
Delete
|
||||
</Button>
|
||||
</form>
|
||||
)}
|
||||
</td>
|
||||
</tr>
|
||||
|
||||
@@ -254,6 +254,10 @@ export async function revokeToken(id: string) {
|
||||
return apiFetch<void>(`/admin/tokens/${id}`, { method: "DELETE" });
|
||||
}
|
||||
|
||||
/**
 * Permanently delete an API token via `POST /admin/tokens/{id}/delete`.
 *
 * The id is percent-encoded before being placed in the URL path so that a
 * value containing `/`, `?` or `#` cannot redirect the request to another
 * endpoint. A plain UUID is left unchanged by the encoding.
 *
 * @param id Identifier of the token to delete.
 */
export async function deleteToken(id: string) {
  return apiFetch<void>(`/admin/tokens/${encodeURIComponent(id)}/delete`, { method: "POST" });
}
|
||||
|
||||
export async function fetchBooks(
|
||||
libraryId?: string,
|
||||
series?: string,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "stripstream-backoffice",
|
||||
"version": "0.1.0",
|
||||
"version": "0.2.0",
|
||||
"private": true,
|
||||
"scripts": {
|
||||
"dev": "next dev -p 7082",
|
||||
|
||||
@@ -15,7 +15,6 @@ image.workspace = true
|
||||
jpeg-decoder.workspace = true
|
||||
num_cpus.workspace = true
|
||||
parsers = { path = "../../crates/parsers" }
|
||||
rayon.workspace = true
|
||||
reqwest.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
@@ -6,7 +6,7 @@ use sqlx::Row;
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tracing::{info, warn};
|
||||
use tracing::{debug, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{job::is_job_cancelled, utils, AppState};
|
||||
@@ -89,7 +89,7 @@ async fn load_thumbnail_concurrency(pool: &sqlx::PgPool) -> usize {
|
||||
// Default: half the logical CPUs, clamped between 1 and 2.
|
||||
// Archive extraction is I/O bound but benefits from moderate parallelism.
|
||||
let cpus = num_cpus::get();
|
||||
let default_concurrency = (cpus / 2).clamp(2, 8);
|
||||
let default_concurrency = (cpus / 2).clamp(1, 2);
|
||||
let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'limits'"#)
|
||||
.fetch_optional(pool)
|
||||
.await;
|
||||
@@ -179,7 +179,8 @@ fn generate_thumbnail(image_bytes: &[u8], config: &ThumbnailConfig) -> anyhow::R
|
||||
let t_resize = t1.elapsed();
|
||||
|
||||
let format = config.format.as_deref().unwrap_or("webp");
|
||||
info!(
|
||||
debug!(
|
||||
target: "thumbnail",
|
||||
"[THUMBNAIL] {}x{} -> {}x{} decode={:.0}ms resize={:.0}ms encode_format={}",
|
||||
orig_w, orig_h, w, h,
|
||||
t_decode.as_secs_f64() * 1000.0,
|
||||
@@ -237,7 +238,8 @@ fn generate_thumbnail(image_bytes: &[u8], config: &ThumbnailConfig) -> anyhow::R
|
||||
}
|
||||
};
|
||||
let t_encode = t2.elapsed();
|
||||
info!(
|
||||
debug!(
|
||||
target: "thumbnail",
|
||||
"[THUMBNAIL] encode={:.0}ms total={:.0}ms output_size={}KB",
|
||||
t_encode.as_secs_f64() * 1000.0,
|
||||
t0.elapsed().as_secs_f64() * 1000.0,
|
||||
@@ -263,7 +265,7 @@ fn resize_raw_to_thumbnail(
|
||||
) -> anyhow::Result<String> {
|
||||
let raw_bytes = std::fs::read(raw_path)
|
||||
.map_err(|e| anyhow::anyhow!("failed to read raw image {}: {}", raw_path, e))?;
|
||||
info!("[THUMBNAIL] book={} raw_size={}KB", book_id, raw_bytes.len() / 1024);
|
||||
debug!(target: "thumbnail", "[THUMBNAIL] book={} raw_size={}KB", book_id, raw_bytes.len() / 1024);
|
||||
let thumb_bytes = generate_thumbnail(&raw_bytes, config)?;
|
||||
|
||||
let format = config.format.as_deref().unwrap_or("webp");
|
||||
@@ -367,6 +369,7 @@ pub async fn analyze_library_books(
|
||||
}
|
||||
});
|
||||
|
||||
#[derive(Clone)]
|
||||
struct BookTask {
|
||||
book_id: Uuid,
|
||||
abs_path: String,
|
||||
@@ -386,8 +389,11 @@ pub async fn analyze_library_books(
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Sub-phase A: extract first page from each archive and store raw image
|
||||
// I/O bound — limited by HDD throughput, runs at `concurrency`
|
||||
// Processed in batches of BATCH_SIZE (200) to limit memory — raw_bytes are freed between batches.
|
||||
// The collected results (Uuid, String, i32) are lightweight (~100 bytes each).
|
||||
// -------------------------------------------------------------------------
|
||||
const BATCH_SIZE: usize = 200;
|
||||
|
||||
let phase_a_start = std::time::Instant::now();
|
||||
let _ = sqlx::query(
|
||||
"UPDATE index_jobs SET status = 'extracting_pages', total_files = $2, processed_files = 0, current_file = NULL WHERE id = $1",
|
||||
@@ -398,95 +404,177 @@ pub async fn analyze_library_books(
|
||||
.await;
|
||||
|
||||
let extracted_count = Arc::new(AtomicI32::new(0));
|
||||
let mut all_extracted: Vec<(Uuid, String, i32)> = Vec::new();
|
||||
|
||||
// Collected results: (book_id, raw_path, page_count) — only books that need thumbnail generation
|
||||
let extracted: Vec<(Uuid, String, i32)> = stream::iter(tasks)
|
||||
.map(|task| {
|
||||
let pool = state.pool.clone();
|
||||
let config = config.clone();
|
||||
let cancelled = cancelled_flag.clone();
|
||||
let extracted_count = extracted_count.clone();
|
||||
let num_batches = (tasks.len() + BATCH_SIZE - 1) / BATCH_SIZE;
|
||||
let task_chunks: Vec<Vec<BookTask>> = tasks
|
||||
.into_iter()
|
||||
.collect::<Vec<_>>()
|
||||
.chunks(BATCH_SIZE)
|
||||
.map(|c| c.to_vec())
|
||||
.collect();
|
||||
|
||||
async move {
|
||||
if cancelled.load(Ordering::Relaxed) {
|
||||
return None;
|
||||
}
|
||||
for (batch_idx, batch_tasks) in task_chunks.into_iter().enumerate() {
|
||||
if cancelled_flag.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
||||
let local_path = utils::remap_libraries_path(&task.abs_path);
|
||||
let path = std::path::Path::new(&local_path);
|
||||
let book_id = task.book_id;
|
||||
let needs_thumbnail = task.needs_thumbnail;
|
||||
info!(
|
||||
"[ANALYZER] Extraction batch {}/{} — {} books",
|
||||
batch_idx + 1, num_batches, batch_tasks.len()
|
||||
);
|
||||
|
||||
// Remove macOS Apple Double resource fork files (._*) that were indexed before the scanner filter was added
|
||||
if path
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str())
|
||||
.map(|n| n.starts_with("._"))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
warn!("[ANALYZER] Removing macOS resource fork from DB: {}", local_path);
|
||||
let _ = sqlx::query("DELETE FROM book_files WHERE book_id = $1")
|
||||
let batch_extracted: Vec<(Uuid, String, i32)> = stream::iter(batch_tasks)
|
||||
.map(|task| {
|
||||
let pool = state.pool.clone();
|
||||
let config = config.clone();
|
||||
let cancelled = cancelled_flag.clone();
|
||||
let extracted_count = extracted_count.clone();
|
||||
|
||||
async move {
|
||||
if cancelled.load(Ordering::Relaxed) {
|
||||
return None;
|
||||
}
|
||||
|
||||
let local_path = utils::remap_libraries_path(&task.abs_path);
|
||||
let path = std::path::Path::new(&local_path);
|
||||
let book_id = task.book_id;
|
||||
let needs_thumbnail = task.needs_thumbnail;
|
||||
|
||||
// Remove macOS Apple Double resource fork files (._*) that were indexed before the scanner filter was added
|
||||
if path
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str())
|
||||
.map(|n| n.starts_with("._"))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
warn!("[ANALYZER] Removing macOS resource fork from DB: {}", local_path);
|
||||
let _ = sqlx::query("DELETE FROM book_files WHERE book_id = $1")
|
||||
.bind(book_id)
|
||||
.execute(&pool)
|
||||
.await;
|
||||
let _ = sqlx::query(
|
||||
"DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)",
|
||||
)
|
||||
.bind(book_id)
|
||||
.execute(&pool)
|
||||
.await;
|
||||
let _ = sqlx::query(
|
||||
"DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)",
|
||||
return None;
|
||||
}
|
||||
|
||||
let format = match book_format_from_str(&task.format) {
|
||||
Some(f) => f,
|
||||
None => {
|
||||
warn!("[ANALYZER] Unknown format '{}' for book {}", task.format, book_id);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let pdf_scale = config.width.max(config.height);
|
||||
let path_owned = path.to_path_buf();
|
||||
let timeout_secs = config.timeout_secs;
|
||||
let file_name = path.file_name()
|
||||
.map(|n| n.to_string_lossy().to_string())
|
||||
.unwrap_or_else(|| local_path.clone());
|
||||
|
||||
debug!(target: "extraction", "[EXTRACTION] Starting: {} ({})", file_name, task.format);
|
||||
let extract_start = std::time::Instant::now();
|
||||
|
||||
let analyze_result = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(timeout_secs),
|
||||
tokio::task::spawn_blocking(move || analyze_book(&path_owned, format, pdf_scale)),
|
||||
)
|
||||
.bind(book_id)
|
||||
.execute(&pool)
|
||||
.await;
|
||||
return None;
|
||||
}
|
||||
|
||||
let format = match book_format_from_str(&task.format) {
|
||||
Some(f) => f,
|
||||
None => {
|
||||
warn!("[ANALYZER] Unknown format '{}' for book {}", task.format, book_id);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
let (page_count, raw_bytes) = match analyze_result {
|
||||
Ok(Ok(Ok(result))) => result,
|
||||
Ok(Ok(Err(e))) => {
|
||||
warn!(target: "extraction", "[EXTRACTION] Failed: {} — {}", file_name, e);
|
||||
let _ = sqlx::query(
|
||||
"UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE book_id = $1",
|
||||
)
|
||||
.bind(book_id)
|
||||
.bind(e.to_string())
|
||||
.execute(&pool)
|
||||
.await;
|
||||
return None;
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
warn!(target: "extraction", "[EXTRACTION] spawn error: {} — {}", file_name, e);
|
||||
return None;
|
||||
}
|
||||
Err(_) => {
|
||||
warn!(target: "extraction", "[EXTRACTION] Timeout ({}s): {}", timeout_secs, file_name);
|
||||
let _ = sqlx::query(
|
||||
"UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE book_id = $1",
|
||||
)
|
||||
.bind(book_id)
|
||||
.bind(format!("analyze_book timed out after {}s", timeout_secs))
|
||||
.execute(&pool)
|
||||
.await;
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let pdf_scale = config.width.max(config.height);
|
||||
let path_owned = path.to_path_buf();
|
||||
let timeout_secs = config.timeout_secs;
|
||||
let analyze_result = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(timeout_secs),
|
||||
tokio::task::spawn_blocking(move || analyze_book(&path_owned, format, pdf_scale)),
|
||||
)
|
||||
.await;
|
||||
let extract_elapsed = extract_start.elapsed();
|
||||
debug!(
|
||||
target: "extraction",
|
||||
"[EXTRACTION] Done: {} — {} pages, image={}KB in {:.0}ms",
|
||||
file_name, page_count, raw_bytes.len() / 1024,
|
||||
extract_elapsed.as_secs_f64() * 1000.0,
|
||||
);
|
||||
|
||||
let (page_count, raw_bytes) = match analyze_result {
|
||||
Ok(Ok(Ok(result))) => result,
|
||||
Ok(Ok(Err(e))) => {
|
||||
warn!("[ANALYZER] analyze_book failed for book {}: {}", book_id, e);
|
||||
// If thumbnail already exists, just update page_count and skip thumbnail generation
|
||||
if !needs_thumbnail {
|
||||
debug!(target: "extraction", "[EXTRACTION] Page count only: {} — {} pages", file_name, page_count);
|
||||
if let Err(e) = sqlx::query("UPDATE books SET page_count = $1 WHERE id = $2")
|
||||
.bind(page_count)
|
||||
.bind(book_id)
|
||||
.execute(&pool)
|
||||
.await
|
||||
{
|
||||
warn!(target: "extraction", "[EXTRACTION] DB page_count update failed for {}: {}", file_name, e);
|
||||
}
|
||||
let processed = extracted_count.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
let percent = (processed as f64 / total as f64 * 50.0) as i32;
|
||||
let _ = sqlx::query(
|
||||
"UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE book_id = $1",
|
||||
"UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1",
|
||||
)
|
||||
.bind(book_id)
|
||||
.bind(e.to_string())
|
||||
.bind(job_id)
|
||||
.bind(processed)
|
||||
.bind(percent)
|
||||
.execute(&pool)
|
||||
.await;
|
||||
return None;
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
warn!("[ANALYZER] spawn_blocking error for book {}: {}", book_id, e);
|
||||
return None;
|
||||
}
|
||||
Err(_) => {
|
||||
warn!("[ANALYZER] analyze_book timed out after {}s for book {}: {}", timeout_secs, book_id, local_path);
|
||||
let _ = sqlx::query(
|
||||
"UPDATE book_files SET parse_status = 'error', parse_error_opt = $2 WHERE book_id = $1",
|
||||
)
|
||||
.bind(book_id)
|
||||
.bind(format!("analyze_book timed out after {}s", timeout_secs))
|
||||
.execute(&pool)
|
||||
.await;
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
// If thumbnail already exists, just update page_count and skip thumbnail generation
|
||||
if !needs_thumbnail {
|
||||
if processed % 25 == 0 || processed == total {
|
||||
info!(
|
||||
target: "extraction",
|
||||
"[EXTRACTION] Progress: {}/{} books extracted ({}%)",
|
||||
processed, total, percent
|
||||
);
|
||||
}
|
||||
return None; // don't enqueue for thumbnail sub-phase
|
||||
}
|
||||
|
||||
// Save raw bytes to disk (no resize, no encode) — moves raw_bytes, no clone
|
||||
let raw_path = match tokio::task::spawn_blocking({
|
||||
let dir = config.directory.clone();
|
||||
move || save_raw_image(book_id, &raw_bytes, &dir)
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(Ok(p)) => p,
|
||||
Ok(Err(e)) => {
|
||||
warn!("[ANALYZER] save_raw_image failed for book {}: {}", book_id, e);
|
||||
return None;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("[ANALYZER] spawn_blocking save_raw error for book {}: {}", book_id, e);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
// Update page_count in DB
|
||||
if let Err(e) = sqlx::query("UPDATE books SET page_count = $1 WHERE id = $2")
|
||||
.bind(page_count)
|
||||
.bind(book_id)
|
||||
@@ -494,9 +582,11 @@ pub async fn analyze_library_books(
|
||||
.await
|
||||
{
|
||||
warn!("[ANALYZER] DB page_count update failed for book {}: {}", book_id, e);
|
||||
return None;
|
||||
}
|
||||
|
||||
let processed = extracted_count.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
let percent = (processed as f64 / total as f64 * 50.0) as i32;
|
||||
let percent = (processed as f64 / total as f64 * 50.0) as i32; // first 50%
|
||||
let _ = sqlx::query(
|
||||
"UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1",
|
||||
)
|
||||
@@ -505,57 +595,36 @@ pub async fn analyze_library_books(
|
||||
.bind(percent)
|
||||
.execute(&pool)
|
||||
.await;
|
||||
return None; // don't enqueue for thumbnail sub-phase
|
||||
}
|
||||
|
||||
// Save raw bytes to disk (no resize, no encode)
|
||||
let raw_path = match tokio::task::spawn_blocking({
|
||||
let dir = config.directory.clone();
|
||||
let bytes = raw_bytes.clone();
|
||||
move || save_raw_image(book_id, &bytes, &dir)
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(Ok(p)) => p,
|
||||
Ok(Err(e)) => {
|
||||
warn!("[ANALYZER] save_raw_image failed for book {}: {}", book_id, e);
|
||||
return None;
|
||||
if processed % 25 == 0 || processed == total {
|
||||
info!(
|
||||
target: "extraction",
|
||||
"[EXTRACTION] Progress: {}/{} books extracted ({}%)",
|
||||
processed, total, percent
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("[ANALYZER] spawn_blocking save_raw error for book {}: {}", book_id, e);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
// Update page_count in DB
|
||||
if let Err(e) = sqlx::query("UPDATE books SET page_count = $1 WHERE id = $2")
|
||||
.bind(page_count)
|
||||
.bind(book_id)
|
||||
.execute(&pool)
|
||||
.await
|
||||
{
|
||||
warn!("[ANALYZER] DB page_count update failed for book {}: {}", book_id, e);
|
||||
return None;
|
||||
Some((book_id, raw_path, page_count))
|
||||
}
|
||||
})
|
||||
.buffer_unordered(concurrency)
|
||||
.filter_map(|x| async move { x })
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
let processed = extracted_count.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
let percent = (processed as f64 / total as f64 * 50.0) as i32; // first 50%
|
||||
let _ = sqlx::query(
|
||||
"UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1",
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(processed)
|
||||
.bind(percent)
|
||||
.execute(&pool)
|
||||
.await;
|
||||
// Collect lightweight results; raw_bytes already saved to disk and freed
|
||||
all_extracted.extend(batch_extracted);
|
||||
|
||||
Some((book_id, raw_path, page_count))
|
||||
// Log RSS to track memory growth between batches
|
||||
if let Ok(status) = std::fs::read_to_string("/proc/self/status") {
|
||||
for line in status.lines() {
|
||||
if line.starts_with("VmRSS:") {
|
||||
info!("[ANALYZER] Memory after batch {}/{}: {}", batch_idx + 1, num_batches, line.trim());
|
||||
break;
|
||||
}
|
||||
}
|
||||
})
|
||||
.buffer_unordered(concurrency)
|
||||
.filter_map(|x| async move { x })
|
||||
.collect()
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
if cancelled_flag.load(Ordering::Relaxed) {
|
||||
cancel_handle.abort();
|
||||
@@ -563,14 +632,15 @@ pub async fn analyze_library_books(
|
||||
return Err(anyhow::anyhow!("Job cancelled by user"));
|
||||
}
|
||||
|
||||
let extracted_total = extracted.len() as i32;
|
||||
let extracted_total = all_extracted.len() as i32;
|
||||
let phase_a_elapsed = phase_a_start.elapsed();
|
||||
info!(
|
||||
"[ANALYZER] Sub-phase A complete: {}/{} books extracted in {:.1}s ({:.0} ms/book)",
|
||||
"[ANALYZER] Sub-phase A complete: {}/{} books extracted in {:.1}s ({:.0} ms/book, {} batches)",
|
||||
extracted_total,
|
||||
total,
|
||||
phase_a_elapsed.as_secs_f64(),
|
||||
if extracted_total > 0 { phase_a_elapsed.as_millis() as f64 / extracted_total as f64 } else { 0.0 }
|
||||
if extracted_total > 0 { phase_a_elapsed.as_millis() as f64 / extracted_total as f64 } else { 0.0 },
|
||||
num_batches,
|
||||
);
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
@@ -588,7 +658,7 @@ pub async fn analyze_library_books(
|
||||
|
||||
let resize_count = Arc::new(AtomicI32::new(0));
|
||||
|
||||
stream::iter(extracted)
|
||||
stream::iter(all_extracted)
|
||||
.for_each_concurrent(concurrency, |(book_id, raw_path, page_count)| {
|
||||
let pool = state.pool.clone();
|
||||
let config = config.clone();
|
||||
@@ -643,6 +713,14 @@ pub async fn analyze_library_books(
|
||||
.bind(percent)
|
||||
.execute(&pool)
|
||||
.await;
|
||||
|
||||
if processed % 25 == 0 || processed == extracted_total {
|
||||
info!(
|
||||
target: "thumbnail",
|
||||
"[THUMBNAIL] Progress: {}/{} thumbnails generated ({}%)",
|
||||
processed, extracted_total, percent
|
||||
);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use anyhow::Result;
|
||||
use rayon::prelude::*;
|
||||
use sqlx::{PgPool, Row};
|
||||
use tracing::{error, info};
|
||||
use uuid::Uuid;
|
||||
@@ -270,10 +269,12 @@ pub async fn process_job(
|
||||
crate::utils::remap_libraries_path(&library.get::<String, _>("root_path"))
|
||||
})
|
||||
.collect();
|
||||
// Count sequentially with limited open fds to avoid ENFILE exhaustion
|
||||
library_paths
|
||||
.par_iter()
|
||||
.iter()
|
||||
.map(|root_path| {
|
||||
walkdir::WalkDir::new(root_path)
|
||||
.max_open(20)
|
||||
.into_iter()
|
||||
.filter_map(Result::ok)
|
||||
.filter(|entry| {
|
||||
|
||||
@@ -4,11 +4,23 @@ use sqlx::postgres::PgPoolOptions;
|
||||
use stripstream_core::config::IndexerConfig;
|
||||
use tracing::info;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
fn main() -> anyhow::Result<()> {
|
||||
// Limit blocking thread pool to 8 threads (default 512).
|
||||
// Each spawn_blocking call (archive extraction, image save) gets a thread.
|
||||
// With thousands of books, unlimited threads cause OOM via stack memory (~8MB each).
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.max_blocking_threads(8)
|
||||
.build()?;
|
||||
runtime.block_on(async_main())
|
||||
}
|
||||
|
||||
async fn async_main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
std::env::var("RUST_LOG").unwrap_or_else(|_| "indexer=info,axum=info".to_string()),
|
||||
std::env::var("RUST_LOG").unwrap_or_else(|_| {
|
||||
"indexer=info,axum=info,scan=info,extraction=info,thumbnail=warn,watcher=info".to_string()
|
||||
}),
|
||||
)
|
||||
.init();
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ use parsers::{detect_format, parse_metadata_fast};
|
||||
use serde::Serialize;
|
||||
use sqlx::Row;
|
||||
use std::{collections::HashMap, path::Path, time::Duration};
|
||||
use tracing::{error, info, trace, warn};
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
use uuid::Uuid;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
@@ -124,7 +124,37 @@ pub async fn scan_library_discovery(
|
||||
// Files under these prefixes are added to `seen` but not reprocessed.
|
||||
let mut skipped_dir_prefixes: Vec<String> = Vec::new();
|
||||
|
||||
for entry in WalkDir::new(root).into_iter().filter_map(Result::ok) {
|
||||
// Track consecutive IO errors to detect fd exhaustion (ENFILE)
|
||||
let mut consecutive_io_errors: usize = 0;
|
||||
const MAX_CONSECUTIVE_IO_ERRORS: usize = 10;
|
||||
|
||||
for result in WalkDir::new(root).max_open(20).into_iter() {
|
||||
let entry = match result {
|
||||
Ok(e) => {
|
||||
consecutive_io_errors = 0;
|
||||
e
|
||||
}
|
||||
Err(e) => {
|
||||
consecutive_io_errors += 1;
|
||||
let is_enfile = e
|
||||
.io_error()
|
||||
.map(|io| io.raw_os_error() == Some(23) || io.raw_os_error() == Some(24))
|
||||
.unwrap_or(false);
|
||||
if is_enfile || consecutive_io_errors >= MAX_CONSECUTIVE_IO_ERRORS {
|
||||
error!(
|
||||
"[SCAN] Too many IO errors ({} consecutive) scanning library {} — \
|
||||
fd limit likely exhausted. Aborting scan for this library.",
|
||||
consecutive_io_errors, library_id
|
||||
);
|
||||
stats.warnings += 1;
|
||||
break;
|
||||
}
|
||||
warn!("[SCAN] walkdir error: {}", e);
|
||||
stats.warnings += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let path = entry.path().to_path_buf();
|
||||
let local_path = path.to_string_lossy().to_string();
|
||||
|
||||
@@ -192,7 +222,8 @@ pub async fn scan_library_discovery(
|
||||
continue;
|
||||
};
|
||||
|
||||
info!(
|
||||
debug!(
|
||||
target: "scan",
|
||||
"[SCAN] Found book file: {} (format: {:?})",
|
||||
path.display(),
|
||||
format
|
||||
@@ -209,6 +240,17 @@ pub async fn scan_library_discovery(
|
||||
let metadata = match std::fs::metadata(&path) {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
let is_enfile = e.raw_os_error() == Some(23) || e.raw_os_error() == Some(24);
|
||||
if is_enfile {
|
||||
consecutive_io_errors += 1;
|
||||
}
|
||||
if consecutive_io_errors >= MAX_CONSECUTIVE_IO_ERRORS {
|
||||
error!(
|
||||
"[SCAN] fd limit exhausted while stat'ing files in library {}. Aborting.",
|
||||
library_id
|
||||
);
|
||||
break;
|
||||
}
|
||||
warn!("[SCAN] cannot stat {}, skipping: {}", path.display(), e);
|
||||
stats.warnings += 1;
|
||||
continue;
|
||||
@@ -278,8 +320,9 @@ pub async fn scan_library_discovery(
|
||||
continue;
|
||||
}
|
||||
|
||||
info!(
|
||||
"[PROCESS] Updating existing file: {} (fingerprint_changed={})",
|
||||
debug!(
|
||||
target: "scan",
|
||||
"[SCAN] Updating: {} (fingerprint_changed={})",
|
||||
file_name,
|
||||
old_fingerprint != fingerprint
|
||||
);
|
||||
@@ -335,7 +378,7 @@ pub async fn scan_library_discovery(
|
||||
}
|
||||
|
||||
// New file — insert with page_count = NULL (analyzer fills it in)
|
||||
info!("[PROCESS] Inserting new file: {}", file_name);
|
||||
debug!(target: "scan", "[SCAN] Inserting: {}", file_name);
|
||||
let book_id = Uuid::new_v4();
|
||||
let file_id = Uuid::new_v4();
|
||||
|
||||
@@ -401,31 +444,53 @@ pub async fn scan_library_discovery(
|
||||
library_id, library_processed_count, stats.indexed_files, stats.errors
|
||||
);
|
||||
|
||||
// Handle deletions
|
||||
let mut removed_count = 0usize;
|
||||
for (abs_path, (file_id, book_id, _)) in &existing {
|
||||
if seen.contains_key(abs_path) {
|
||||
continue;
|
||||
}
|
||||
sqlx::query("DELETE FROM book_files WHERE id = $1")
|
||||
.bind(file_id)
|
||||
// Handle deletions — with safety check against volume mount failures
|
||||
let existing_count = existing.len();
|
||||
let seen_count = seen.len();
|
||||
let stale_count = existing.iter().filter(|(p, _)| !seen.contains_key(p.as_str())).count();
|
||||
|
||||
// Safety: if the library root is not accessible, or if we found zero files
|
||||
// but the DB had many, the volume is probably not mounted correctly.
|
||||
// Do NOT delete anything in that case.
|
||||
let root_accessible = root.is_dir() && std::fs::read_dir(root).is_ok();
|
||||
let skip_deletions = !root_accessible
|
||||
|| (seen_count == 0 && existing_count > 0)
|
||||
|| (stale_count > 0 && stale_count == existing_count);
|
||||
|
||||
if skip_deletions && stale_count > 0 {
|
||||
warn!(
|
||||
"[SCAN] Skipping deletion of {} stale files for library {} — \
|
||||
root accessible={}, seen={}, existing={}. \
|
||||
Volume may not be mounted correctly.",
|
||||
stale_count, library_id, root_accessible, seen_count, existing_count
|
||||
);
|
||||
stats.warnings += stale_count;
|
||||
} else {
|
||||
let mut removed_count = 0usize;
|
||||
for (abs_path, (file_id, book_id, _)) in &existing {
|
||||
if seen.contains_key(abs_path) {
|
||||
continue;
|
||||
}
|
||||
sqlx::query("DELETE FROM book_files WHERE id = $1")
|
||||
.bind(file_id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
sqlx::query(
|
||||
"DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)",
|
||||
)
|
||||
.bind(book_id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
sqlx::query(
|
||||
"DELETE FROM books WHERE id = $1 AND NOT EXISTS (SELECT 1 FROM book_files WHERE book_id = $1)",
|
||||
)
|
||||
.bind(book_id)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
stats.removed_files += 1;
|
||||
removed_count += 1;
|
||||
}
|
||||
stats.removed_files += 1;
|
||||
removed_count += 1;
|
||||
}
|
||||
|
||||
if removed_count > 0 {
|
||||
info!(
|
||||
"[SCAN] Removed {} stale files from database",
|
||||
removed_count
|
||||
);
|
||||
if removed_count > 0 {
|
||||
info!(
|
||||
"[SCAN] Removed {} stale files from database",
|
||||
removed_count
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Upsert directory mtimes for next incremental scan
|
||||
|
||||
@@ -3,7 +3,7 @@ use sqlx::Row;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use tracing::{error, info, trace, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use uuid::Uuid;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
@@ -29,6 +29,7 @@ fn snapshot_library(root_path: &str) -> LibrarySnapshot {
|
||||
let mut files = HashSet::new();
|
||||
let walker = WalkDir::new(root_path)
|
||||
.follow_links(true)
|
||||
.max_open(10)
|
||||
.into_iter()
|
||||
.filter_map(|e| e.ok());
|
||||
|
||||
@@ -54,7 +55,7 @@ pub async fn run_file_watcher(state: AppState) -> Result<()> {
|
||||
|
||||
// Skip if any job is active — avoid competing for file descriptors
|
||||
if has_active_jobs(&pool).await {
|
||||
trace!("[WATCHER] Skipping poll — job active");
|
||||
debug!(target: "watcher", "[WATCHER] Skipping poll — job active");
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -113,7 +114,7 @@ pub async fn run_file_watcher(state: AppState) -> Result<()> {
|
||||
|
||||
// Re-check between libraries in case a job was created
|
||||
if has_active_jobs(&pool).await {
|
||||
trace!("[WATCHER] Job became active during poll, stopping");
|
||||
debug!(target: "watcher", "[WATCHER] Job became active during poll, stopping");
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -126,7 +127,8 @@ pub async fn run_file_watcher(state: AppState) -> Result<()> {
|
||||
Some(old_snapshot) => *old_snapshot != new_snapshot,
|
||||
None => {
|
||||
// First scan — store baseline, don't trigger a job
|
||||
trace!(
|
||||
debug!(
|
||||
target: "watcher",
|
||||
"[WATCHER] Initial snapshot for library {}: {} files",
|
||||
library_id,
|
||||
new_snapshot.len()
|
||||
@@ -168,7 +170,7 @@ pub async fn run_file_watcher(state: AppState) -> Result<()> {
|
||||
Err(err) => error!("[WATCHER] Failed to create job: {}", err),
|
||||
}
|
||||
} else {
|
||||
trace!("[WATCHER] Job already active for library {}, skipping", library_id);
|
||||
debug!(target: "watcher", "[WATCHER] Job already active for library {}, skipping", library_id);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,3 +14,4 @@ regex = "1"
|
||||
unrar.workspace = true
|
||||
zip = { version = "8", default-features = false, features = ["deflate"] }
|
||||
flate2 = "1"
|
||||
tracing.workspace = true
|
||||
|
||||
@@ -166,13 +166,30 @@ fn analyze_cbz(path: &Path, allow_fallback: bool) -> Result<(i32, Vec<u8>)> {
|
||||
Ok(a) => a,
|
||||
Err(zip_err) => {
|
||||
if allow_fallback {
|
||||
// Try RAR fallback first (file might be a RAR with .cbz extension)
|
||||
if let Ok(result) = analyze_cbr(path, false) {
|
||||
return Ok(result);
|
||||
tracing::debug!(target: "extraction", "[EXTRACTION] ZipArchive::new failed for {}: {} — trying fallbacks", path.display(), zip_err);
|
||||
|
||||
// Check magic bytes to avoid expensive RAR probe on ZIP files
|
||||
let is_zip_magic = std::fs::File::open(path)
|
||||
.and_then(|mut f| {
|
||||
let mut magic = [0u8; 4];
|
||||
std::io::Read::read_exact(&mut f, &mut magic)?;
|
||||
Ok(magic[0] == b'P' && magic[1] == b'K')
|
||||
})
|
||||
.unwrap_or(false);
|
||||
|
||||
if !is_zip_magic {
|
||||
// Try RAR fallback (file might be a RAR with .cbz extension)
|
||||
if let Ok(result) = analyze_cbr(path, false) {
|
||||
tracing::debug!(target: "extraction", "[EXTRACTION] RAR fallback succeeded for {}", path.display());
|
||||
return Ok(result);
|
||||
}
|
||||
}
|
||||
|
||||
// Try streaming fallback: read local file headers without central directory
|
||||
// (handles ZIP files with NTFS extra fields that confuse the central dir parser)
|
||||
let t0 = std::time::Instant::now();
|
||||
if let Ok(result) = analyze_cbz_streaming(path) {
|
||||
tracing::debug!(target: "extraction", "[EXTRACTION] Streaming fallback succeeded for {} — {} pages in {:.0}ms", path.display(), result.0, t0.elapsed().as_secs_f64() * 1000.0);
|
||||
return Ok(result);
|
||||
}
|
||||
}
|
||||
@@ -180,17 +197,11 @@ fn analyze_cbz(path: &Path, allow_fallback: bool) -> Result<(i32, Vec<u8>)> {
|
||||
}
|
||||
};
|
||||
|
||||
let mut image_names: Vec<String> = Vec::new();
|
||||
for i in 0..archive.len() {
|
||||
let entry = match archive.by_index(i) {
|
||||
Ok(e) => e,
|
||||
Err(_) => continue, // skip corrupted entries
|
||||
};
|
||||
let name = entry.name().to_ascii_lowercase();
|
||||
if is_image_name(&name) {
|
||||
image_names.push(entry.name().to_string());
|
||||
}
|
||||
}
|
||||
let mut image_names: Vec<String> = archive
|
||||
.file_names()
|
||||
.filter(|name| is_image_name(&name.to_ascii_lowercase()))
|
||||
.map(|name| name.to_string())
|
||||
.collect::<Vec<_>>();
|
||||
image_names.sort_by(|a, b| natord::compare(a, b));
|
||||
|
||||
if image_names.is_empty() {
|
||||
|
||||
@@ -4,29 +4,75 @@ set -e
|
||||
# Docker Hub
|
||||
REGISTRY="docker.io"
|
||||
OWNER="julienfroidefond32"
|
||||
VERSION=$(grep '^version = ' Cargo.toml | head -1 | sed 's/version = "\(.*\)"/\1/')
|
||||
|
||||
SERVICES=("api" "indexer" "backoffice")
|
||||
|
||||
# ─── Version bump ───────────────────────────────────────────────────────────
# Interactive release step:
#   1. read the current version from the first `version = "..."` line of Cargo.toml,
#   2. ask which semver component to bump (or keep the version as-is),
#   3. on a bump, rewrite Cargo.toml and apps/backoffice/package.json,
#      refresh Cargo.lock and commit the three files.
# The version to publish is left in $VERSION for the rest of the script.
CURRENT_VERSION=$(grep '^version = ' Cargo.toml | head -1 | sed 's/version = "\(.*\)"/\1/')
IFS='.' read -r MAJOR MINOR PATCH <<< "$CURRENT_VERSION"

echo "=== Stripstream Librarian Docker Push ==="
echo "Current version: $CURRENT_VERSION"
echo ""
echo "Bump version?"
echo " 1) patch → $MAJOR.$MINOR.$((PATCH + 1))"
echo " 2) minor → $MAJOR.$((MINOR + 1)).0"
echo " 3) major → $((MAJOR + 1)).0.0"
echo " 4) skip → keep $CURRENT_VERSION"
echo ""
read -rp "Choice [1-4]: " BUMP_CHOICE

case "$BUMP_CHOICE" in
    1) NEW_VERSION="$MAJOR.$MINOR.$((PATCH + 1))" ;;
    2) NEW_VERSION="$MAJOR.$((MINOR + 1)).0" ;;
    3) NEW_VERSION="$((MAJOR + 1)).0.0" ;;
    4) NEW_VERSION="$CURRENT_VERSION" ;;
    *) echo "Invalid choice"; exit 1 ;;
esac

if [ "$NEW_VERSION" != "$CURRENT_VERSION" ]; then
    echo "Bumping version: $CURRENT_VERSION → $NEW_VERSION"

    # '.' is a regex wildcard in sed: escape the dots so only the exact
    # current version string is rewritten (e.g. 0.1.0 must not match 0x1y0).
    CURRENT_VERSION_RE=${CURRENT_VERSION//./\\.}

    # Update Cargo.toml (workspace version)
    sed -i.bak "s/^version = \"$CURRENT_VERSION_RE\"/version = \"$NEW_VERSION\"/" Cargo.toml
    rm -f Cargo.toml.bak

    # Update backoffice package.json
    sed -i.bak "s/\"version\": \"$CURRENT_VERSION_RE\"/\"version\": \"$NEW_VERSION\"/" apps/backoffice/package.json
    rm -f apps/backoffice/package.json.bak

    # Update Cargo.lock (best effort: a failure here must not abort the release)
    cargo update --workspace 2>/dev/null || true

    # Commit version bump
    git add Cargo.toml Cargo.lock apps/backoffice/package.json
    git commit -m "chore: bump version to $NEW_VERSION"
    echo "✓ Version bumped and committed"
else
    echo "Keeping version $CURRENT_VERSION"
fi

VERSION="$NEW_VERSION"
|
||||
|
||||
echo ""
|
||||
echo "Registry: $REGISTRY"
|
||||
echo "Version: $VERSION"
|
||||
echo "Services: ${SERVICES[@]}"
|
||||
echo "Services: ${SERVICES[*]}"
|
||||
echo ""
|
||||
|
||||
for service in "${SERVICES[@]}"; do
|
||||
echo "Building $service..."
|
||||
docker build -f apps/$service/Dockerfile -t $service:latest .
|
||||
docker build -f "apps/$service/Dockerfile" -t "$service:latest" .
|
||||
|
||||
echo "Tagging $service..."
|
||||
docker tag $service:latest $REGISTRY/$OWNER/stripstream-$service:$VERSION
|
||||
docker tag $service:latest $REGISTRY/$OWNER/stripstream-$service:latest
|
||||
docker tag "$service:latest" "$REGISTRY/$OWNER/stripstream-$service:$VERSION"
|
||||
docker tag "$service:latest" "$REGISTRY/$OWNER/stripstream-$service:latest"
|
||||
|
||||
echo "Pushing stripstream-$service:$VERSION..."
|
||||
docker push $REGISTRY/$OWNER/stripstream-$service:$VERSION
|
||||
docker push "$REGISTRY/$OWNER/stripstream-$service:$VERSION"
|
||||
|
||||
echo "Pushing stripstream-$service:latest..."
|
||||
docker push $REGISTRY/$OWNER/stripstream-$service:latest
|
||||
docker push "$REGISTRY/$OWNER/stripstream-$service:latest"
|
||||
|
||||
echo "✓ $service pushed successfully"
|
||||
echo ""
|
||||
@@ -38,3 +84,10 @@ for service in "${SERVICES[@]}"; do
|
||||
echo " - $REGISTRY/$OWNER/stripstream-$service:$VERSION"
|
||||
echo " - $REGISTRY/$OWNER/stripstream-$service:latest"
|
||||
done
|
||||
|
||||
# Tag git with version
# Create the local tag v$VERSION only when `git tag -l` reports no tag with that
# exact name; the tag is not pushed here, the message tells the operator how.
if [ -z "$(git tag -l "v$VERSION")" ]; then
    git tag "v$VERSION"
    echo ""
    echo "Git tag v$VERSION created. Push with: git push origin v$VERSION"
fi
|
||||
|
||||
Reference in New Issue
Block a user