Compare commits
3 Commits
e64848a216
...
539dc77d57
| Author | SHA1 | Date | |
|---|---|---|---|
| 539dc77d57 | |||
| 9c7120c3dc | |||
| b1844a4f01 |
@@ -32,6 +32,8 @@ use stripstream_core::config::ApiConfig;
|
|||||||
use sqlx::postgres::PgPoolOptions;
|
use sqlx::postgres::PgPoolOptions;
|
||||||
use tokio::sync::{Mutex, Semaphore};
|
use tokio::sync::{Mutex, Semaphore};
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
use sqlx::{Pool, Postgres, Row};
|
||||||
|
use serde_json::Value;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct AppState {
|
struct AppState {
|
||||||
@@ -66,6 +68,25 @@ impl Metrics {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn load_concurrent_renders(pool: &Pool<Postgres>) -> usize {
|
||||||
|
let default_concurrency = 8;
|
||||||
|
let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'limits'"#)
|
||||||
|
.fetch_optional(pool)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match row {
|
||||||
|
Ok(Some(row)) => {
|
||||||
|
let value: Value = row.get("value");
|
||||||
|
value
|
||||||
|
.get("concurrent_renders")
|
||||||
|
.and_then(|v: &Value| v.as_u64())
|
||||||
|
.map(|v| v as usize)
|
||||||
|
.unwrap_or(default_concurrency)
|
||||||
|
}
|
||||||
|
_ => default_concurrency,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
tracing_subscriber::fmt()
|
tracing_subscriber::fmt()
|
||||||
@@ -80,13 +101,17 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.connect(&config.database_url)
|
.connect(&config.database_url)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
// Load concurrent_renders from settings, default to 8
|
||||||
|
let concurrent_renders = load_concurrent_renders(&pool).await;
|
||||||
|
info!("Using concurrent_renders limit: {}", concurrent_renders);
|
||||||
|
|
||||||
let state = AppState {
|
let state = AppState {
|
||||||
pool,
|
pool,
|
||||||
bootstrap_token: Arc::from(config.api_bootstrap_token),
|
bootstrap_token: Arc::from(config.api_bootstrap_token),
|
||||||
meili_url: Arc::from(config.meili_url),
|
meili_url: Arc::from(config.meili_url),
|
||||||
meili_master_key: Arc::from(config.meili_master_key),
|
meili_master_key: Arc::from(config.meili_master_key),
|
||||||
page_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(512).expect("non-zero")))),
|
page_cache: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(512).expect("non-zero")))),
|
||||||
page_render_limit: Arc::new(Semaphore::new(8)),
|
page_render_limit: Arc::new(Semaphore::new(concurrent_renders)),
|
||||||
metrics: Arc::new(Metrics::new()),
|
metrics: Arc::new(Metrics::new()),
|
||||||
read_rate_limit: Arc::new(Mutex::new(ReadRateLimit {
|
read_rate_limit: Arc::new(Mutex::new(ReadRateLimit {
|
||||||
window_started_at: Instant::now(),
|
window_started_at: Instant::now(),
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
use std::sync::atomic::{AtomicI32, Ordering};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use axum::{
|
use axum::{
|
||||||
@@ -6,6 +8,7 @@ use axum::{
|
|||||||
http::StatusCode,
|
http::StatusCode,
|
||||||
Json,
|
Json,
|
||||||
};
|
};
|
||||||
|
use futures::stream::{self, StreamExt};
|
||||||
use image::GenericImageView;
|
use image::GenericImageView;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use sqlx::Row;
|
use sqlx::Row;
|
||||||
@@ -24,6 +27,25 @@ struct ThumbnailConfig {
|
|||||||
directory: String,
|
directory: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn load_thumbnail_concurrency(pool: &sqlx::PgPool) -> usize {
|
||||||
|
let default_concurrency = 4;
|
||||||
|
let row = sqlx::query(r#"SELECT value FROM app_settings WHERE key = 'limits'"#)
|
||||||
|
.fetch_optional(pool)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match row {
|
||||||
|
Ok(Some(row)) => {
|
||||||
|
let value: serde_json::Value = row.get("value");
|
||||||
|
value
|
||||||
|
.get("concurrent_renders")
|
||||||
|
.and_then(|v| v.as_u64())
|
||||||
|
.map(|v| v as usize)
|
||||||
|
.unwrap_or(default_concurrency)
|
||||||
|
}
|
||||||
|
_ => default_concurrency,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn load_thumbnail_config(pool: &sqlx::PgPool) -> ThumbnailConfig {
|
async fn load_thumbnail_config(pool: &sqlx::PgPool) -> ThumbnailConfig {
|
||||||
let fallback = ThumbnailConfig {
|
let fallback = ThumbnailConfig {
|
||||||
enabled: true,
|
enabled: true,
|
||||||
@@ -115,8 +137,74 @@ async fn run_checkup(state: AppState, job_id: Uuid) {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Regenerate: clear existing thumbnails in scope so they get regenerated
|
// Regenerate or full_rebuild: clear existing thumbnails in scope so they get regenerated
|
||||||
if job_type == "thumbnail_regenerate" {
|
if job_type == "thumbnail_regenerate" || job_type == "full_rebuild" {
|
||||||
|
let config = load_thumbnail_config(pool).await;
|
||||||
|
|
||||||
|
if job_type == "full_rebuild" {
|
||||||
|
// For full_rebuild: delete orphaned thumbnail files (books were deleted, new ones have new UUIDs)
|
||||||
|
// Get all existing book IDs to keep their thumbnails
|
||||||
|
let existing_book_ids: std::collections::HashSet<Uuid> = sqlx::query_scalar(
|
||||||
|
r#"SELECT id FROM books WHERE (library_id = $1 OR $1 IS NULL)"#,
|
||||||
|
)
|
||||||
|
.bind(library_id)
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await
|
||||||
|
.unwrap_or_default()
|
||||||
|
.into_iter()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Delete thumbnail files that don't correspond to existing books
|
||||||
|
let thumbnail_dir = Path::new(&config.directory);
|
||||||
|
if thumbnail_dir.exists() {
|
||||||
|
let mut deleted_count = 0;
|
||||||
|
if let Ok(entries) = std::fs::read_dir(thumbnail_dir) {
|
||||||
|
for entry in entries.flatten() {
|
||||||
|
if let Some(file_name) = entry.file_name().to_str() {
|
||||||
|
if file_name.ends_with(".webp") {
|
||||||
|
if let Some(book_id_str) = file_name.strip_suffix(".webp") {
|
||||||
|
if let Ok(book_id) = Uuid::parse_str(book_id_str) {
|
||||||
|
if !existing_book_ids.contains(&book_id) {
|
||||||
|
if let Err(e) = std::fs::remove_file(entry.path()) {
|
||||||
|
warn!("Failed to delete orphaned thumbnail {}: {}", entry.path().display(), e);
|
||||||
|
} else {
|
||||||
|
deleted_count += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
info!("thumbnails full_rebuild: deleted {} orphaned thumbnail files", deleted_count);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// For regenerate: delete thumbnail files for books with thumbnails
|
||||||
|
let book_ids_to_clear: Vec<Uuid> = sqlx::query_scalar(
|
||||||
|
r#"SELECT id FROM books WHERE (library_id = $1 OR $1 IS NULL) AND thumbnail_path IS NOT NULL"#,
|
||||||
|
)
|
||||||
|
.bind(library_id)
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
let mut deleted_count = 0;
|
||||||
|
for book_id in &book_ids_to_clear {
|
||||||
|
let filename = format!("{}.webp", book_id);
|
||||||
|
let thumbnail_path = Path::new(&config.directory).join(&filename);
|
||||||
|
if thumbnail_path.exists() {
|
||||||
|
if let Err(e) = std::fs::remove_file(&thumbnail_path) {
|
||||||
|
warn!("Failed to delete thumbnail file {}: {}", thumbnail_path.display(), e);
|
||||||
|
} else {
|
||||||
|
deleted_count += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
info!("thumbnails regenerate: deleted {} thumbnail files", deleted_count);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear thumbnail_path in database
|
||||||
let cleared = sqlx::query(
|
let cleared = sqlx::query(
|
||||||
r#"UPDATE books SET thumbnail_path = NULL WHERE (library_id = $1 OR $1 IS NULL)"#,
|
r#"UPDATE books SET thumbnail_path = NULL WHERE (library_id = $1 OR $1 IS NULL)"#,
|
||||||
)
|
)
|
||||||
@@ -124,7 +212,7 @@ async fn run_checkup(state: AppState, job_id: Uuid) {
|
|||||||
.execute(pool)
|
.execute(pool)
|
||||||
.await;
|
.await;
|
||||||
if let Ok(res) = cleared {
|
if let Ok(res) = cleared {
|
||||||
info!("thumbnails regenerate: cleared {} books", res.rows_affected());
|
info!("thumbnails {}: cleared {} books in database", job_type, res.rows_affected());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -156,38 +244,57 @@ async fn run_checkup(state: AppState, job_id: Uuid) {
|
|||||||
.execute(pool)
|
.execute(pool)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
for (i, &book_id) in book_ids.iter().enumerate() {
|
let concurrency = load_thumbnail_concurrency(pool).await;
|
||||||
match pages::render_book_page_1(&state, book_id, config.width, config.quality).await {
|
let processed_count = Arc::new(AtomicI32::new(0));
|
||||||
Ok(page_bytes) => {
|
let pool_clone = pool.clone();
|
||||||
match generate_thumbnail(&page_bytes, &config) {
|
let job_id_clone = job_id;
|
||||||
Ok(thumb_bytes) => {
|
let config_clone = config.clone();
|
||||||
if let Ok(path) = save_thumbnail(book_id, &thumb_bytes, &config) {
|
let state_clone = state.clone();
|
||||||
if sqlx::query("UPDATE books SET thumbnail_path = $1 WHERE id = $2")
|
|
||||||
.bind(&path)
|
let total_clone = total;
|
||||||
.bind(book_id)
|
stream::iter(book_ids)
|
||||||
.execute(pool)
|
.for_each_concurrent(concurrency, |book_id| {
|
||||||
.await
|
let processed_count = processed_count.clone();
|
||||||
.is_ok()
|
let pool = pool_clone.clone();
|
||||||
{
|
let job_id = job_id_clone;
|
||||||
let processed = (i + 1) as i32;
|
let config = config_clone.clone();
|
||||||
let percent = ((i + 1) as f64 / total as f64 * 100.0) as i32;
|
let state = state_clone.clone();
|
||||||
let _ = sqlx::query(
|
let total = total_clone;
|
||||||
"UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1",
|
|
||||||
)
|
async move {
|
||||||
.bind(job_id)
|
match pages::render_book_page_1(&state, book_id, config.width, config.quality).await {
|
||||||
.bind(processed)
|
Ok(page_bytes) => {
|
||||||
.bind(percent)
|
match generate_thumbnail(&page_bytes, &config) {
|
||||||
.execute(pool)
|
Ok(thumb_bytes) => {
|
||||||
.await;
|
if let Ok(path) = save_thumbnail(book_id, &thumb_bytes, &config) {
|
||||||
|
if sqlx::query("UPDATE books SET thumbnail_path = $1 WHERE id = $2")
|
||||||
|
.bind(&path)
|
||||||
|
.bind(book_id)
|
||||||
|
.execute(&pool)
|
||||||
|
.await
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
let processed = processed_count.fetch_add(1, Ordering::Relaxed) + 1;
|
||||||
|
let percent = (processed as f64 / total as f64 * 100.0) as i32;
|
||||||
|
let _ = sqlx::query(
|
||||||
|
"UPDATE index_jobs SET processed_files = $2, progress_percent = $3 WHERE id = $1",
|
||||||
|
)
|
||||||
|
.bind(job_id)
|
||||||
|
.bind(processed)
|
||||||
|
.bind(percent)
|
||||||
|
.execute(&pool)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Err(e) => warn!("thumbnail generate failed for book {}: {:?}", book_id, e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => warn!("thumbnail generate failed for book {}: {:?}", book_id, e),
|
Err(e) => warn!("render page 1 failed for book {}: {:?}", book_id, e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => warn!("render page 1 failed for book {}: {:?}", book_id, e),
|
})
|
||||||
}
|
.await;
|
||||||
}
|
|
||||||
|
|
||||||
let _ = sqlx::query(
|
let _ = sqlx::query(
|
||||||
"UPDATE index_jobs SET status = 'success', finished_at = NOW(), progress_percent = 100, current_file = NULL WHERE id = $1",
|
"UPDATE index_jobs SET status = 'success', finished_at = NOW(), progress_percent = 100, current_file = NULL WHERE id = $1",
|
||||||
|
|||||||
@@ -247,7 +247,7 @@ export default function SettingsPage({ initialSettings, initialCacheStats, initi
|
|||||||
<Icon name="performance" size="md" />
|
<Icon name="performance" size="md" />
|
||||||
Performance Limits
|
Performance Limits
|
||||||
</CardTitle>
|
</CardTitle>
|
||||||
<CardDescription>Configure API performance and rate limiting</CardDescription>
|
<CardDescription>Configure API performance, rate limiting, and thumbnail generation concurrency</CardDescription>
|
||||||
</CardHeader>
|
</CardHeader>
|
||||||
<CardContent>
|
<CardContent>
|
||||||
<div className="space-y-4">
|
<div className="space-y-4">
|
||||||
@@ -266,6 +266,9 @@ export default function SettingsPage({ initialSettings, initialCacheStats, initi
|
|||||||
}}
|
}}
|
||||||
onBlur={() => handleUpdateSetting("limits", settings.limits)}
|
onBlur={() => handleUpdateSetting("limits", settings.limits)}
|
||||||
/>
|
/>
|
||||||
|
<p className="text-xs text-muted-foreground mt-1">
|
||||||
|
Maximum number of page renders and thumbnail generations running in parallel
|
||||||
|
</p>
|
||||||
</FormField>
|
</FormField>
|
||||||
<FormField className="flex-1">
|
<FormField className="flex-1">
|
||||||
<label className="text-sm font-medium text-muted-foreground mb-1 block">Timeout (seconds)</label>
|
<label className="text-sm font-medium text-muted-foreground mb-1 block">Timeout (seconds)</label>
|
||||||
@@ -299,7 +302,7 @@ export default function SettingsPage({ initialSettings, initialCacheStats, initi
|
|||||||
</FormField>
|
</FormField>
|
||||||
</FormRow>
|
</FormRow>
|
||||||
<p className="text-sm text-muted-foreground">
|
<p className="text-sm text-muted-foreground">
|
||||||
Note: Changes to limits require a server restart to take effect.
|
Note: Changes to limits require a server restart to take effect. The "Concurrent Renders" setting controls both page rendering and thumbnail generation parallelism.
|
||||||
</p>
|
</p>
|
||||||
</div>
|
</div>
|
||||||
</CardContent>
|
</CardContent>
|
||||||
@@ -424,7 +427,7 @@ export default function SettingsPage({ initialSettings, initialCacheStats, initi
|
|||||||
</div>
|
</div>
|
||||||
|
|
||||||
<p className="text-sm text-muted-foreground">
|
<p className="text-sm text-muted-foreground">
|
||||||
Note: Thumbnail settings are used during indexing. Existing thumbnails will not be regenerated automatically.
|
Note: Thumbnail settings are used during indexing. Existing thumbnails will not be regenerated automatically. The concurrency for thumbnail generation is controlled by the "Concurrent Renders" setting in Performance Limits above.
|
||||||
</p>
|
</p>
|
||||||
</div>
|
</div>
|
||||||
</CardContent>
|
</CardContent>
|
||||||
|
|||||||
@@ -3,13 +3,13 @@ use axum::{extract::State, routing::get, Json, Router};
|
|||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use axum::http::StatusCode;
|
use axum::http::StatusCode;
|
||||||
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
|
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
|
||||||
use parsers::{detect_format, parse_metadata, BookFormat};
|
use parsers::{detect_format, parse_metadata, BookFormat, ParsedMetadata};
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha256};
|
||||||
use sqlx::{postgres::PgPoolOptions, Row};
|
use sqlx::{postgres::PgPoolOptions, Row};
|
||||||
use std::{collections::HashMap, path::Path, time::Duration};
|
use std::{collections::HashMap, path::Path, time::Duration};
|
||||||
use stripstream_core::config::{IndexerConfig, ThumbnailConfig};
|
use stripstream_core::config::IndexerConfig;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tracing::{error, info, trace, warn};
|
use tracing::{error, info, trace, warn};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
@@ -38,7 +38,6 @@ struct AppState {
|
|||||||
pool: sqlx::PgPool,
|
pool: sqlx::PgPool,
|
||||||
meili_url: String,
|
meili_url: String,
|
||||||
meili_master_key: String,
|
meili_master_key: String,
|
||||||
thumbnail_config: ThumbnailConfig,
|
|
||||||
api_base_url: String,
|
api_base_url: String,
|
||||||
api_bootstrap_token: String,
|
api_bootstrap_token: String,
|
||||||
}
|
}
|
||||||
@@ -69,7 +68,6 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
pool,
|
pool,
|
||||||
meili_url: config.meili_url.clone(),
|
meili_url: config.meili_url.clone(),
|
||||||
meili_master_key: config.meili_master_key.clone(),
|
meili_master_key: config.meili_master_key.clone(),
|
||||||
thumbnail_config: config.thumbnail_config.clone(),
|
|
||||||
api_base_url: config.api_base_url.clone(),
|
api_base_url: config.api_base_url.clone(),
|
||||||
api_bootstrap_token: config.api_bootstrap_token.clone(),
|
api_bootstrap_token: config.api_bootstrap_token.clone(),
|
||||||
};
|
};
|
||||||
@@ -878,28 +876,78 @@ async fn scan_library(
|
|||||||
let mut files_to_insert: Vec<FileInsert> = Vec::with_capacity(BATCH_SIZE);
|
let mut files_to_insert: Vec<FileInsert> = Vec::with_capacity(BATCH_SIZE);
|
||||||
let mut errors_to_insert: Vec<ErrorInsert> = Vec::with_capacity(BATCH_SIZE);
|
let mut errors_to_insert: Vec<ErrorInsert> = Vec::with_capacity(BATCH_SIZE);
|
||||||
|
|
||||||
|
// Step 1: Collect all book files first
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct FileInfo {
|
||||||
|
path: std::path::PathBuf,
|
||||||
|
format: BookFormat,
|
||||||
|
abs_path: String,
|
||||||
|
file_name: String,
|
||||||
|
metadata: std::fs::Metadata,
|
||||||
|
mtime: DateTime<Utc>,
|
||||||
|
fingerprint: String,
|
||||||
|
lookup_path: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut file_infos: Vec<FileInfo> = Vec::new();
|
||||||
for entry in WalkDir::new(root).into_iter().filter_map(Result::ok) {
|
for entry in WalkDir::new(root).into_iter().filter_map(Result::ok) {
|
||||||
if !entry.file_type().is_file() {
|
if !entry.file_type().is_file() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let path = entry.path();
|
let path = entry.path().to_path_buf();
|
||||||
let Some(format) = detect_format(path) else {
|
let Some(format) = detect_format(&path) else {
|
||||||
trace!("[SCAN] Skipping non-book file: {}", path.display());
|
trace!("[SCAN] Skipping non-book file: {}", path.display());
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("[SCAN] Found book file: {} (format: {:?})", path.display(), format);
|
info!("[SCAN] Found book file: {} (format: {:?})", path.display(), format);
|
||||||
stats.scanned_files += 1;
|
stats.scanned_files += 1;
|
||||||
library_processed_count += 1;
|
|
||||||
*total_processed_count += 1;
|
|
||||||
let abs_path_local = path.to_string_lossy().to_string();
|
let abs_path_local = path.to_string_lossy().to_string();
|
||||||
let abs_path = unmap_libraries_path(&abs_path_local);
|
let abs_path = unmap_libraries_path(&abs_path_local);
|
||||||
let file_name = path.file_name()
|
let file_name = path.file_name()
|
||||||
.map(|s| s.to_string_lossy().to_string())
|
.map(|s| s.to_string_lossy().to_string())
|
||||||
.unwrap_or_else(|| abs_path.clone());
|
.unwrap_or_else(|| abs_path.clone());
|
||||||
|
|
||||||
let start_time = std::time::Instant::now();
|
let metadata = std::fs::metadata(&path)
|
||||||
|
.with_context(|| format!("cannot stat {}", path.display()))?;
|
||||||
|
let mtime: DateTime<Utc> = metadata
|
||||||
|
.modified()
|
||||||
|
.map(DateTime::<Utc>::from)
|
||||||
|
.unwrap_or_else(|_| Utc::now());
|
||||||
|
let fingerprint = compute_fingerprint(&path, metadata.len(), &mtime)?;
|
||||||
|
let lookup_path = remap_libraries_path(&abs_path);
|
||||||
|
|
||||||
|
file_infos.push(FileInfo {
|
||||||
|
path,
|
||||||
|
format,
|
||||||
|
abs_path,
|
||||||
|
file_name,
|
||||||
|
metadata,
|
||||||
|
mtime,
|
||||||
|
fingerprint,
|
||||||
|
lookup_path,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("[SCAN] Collected {} files, starting parallel parsing", file_infos.len());
|
||||||
|
|
||||||
|
// Step 2: Parse metadata in parallel
|
||||||
|
let parsed_results: Vec<(FileInfo, anyhow::Result<ParsedMetadata>)> = file_infos
|
||||||
|
.into_par_iter()
|
||||||
|
.map(|file_info| {
|
||||||
|
let parse_result = parse_metadata(&file_info.path, file_info.format, root);
|
||||||
|
(file_info, parse_result)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
info!("[SCAN] Completed parallel parsing, processing {} results", parsed_results.len());
|
||||||
|
|
||||||
|
// Step 3: Process results sequentially for batch inserts
|
||||||
|
for (file_info, parse_result) in parsed_results {
|
||||||
|
library_processed_count += 1;
|
||||||
|
*total_processed_count += 1;
|
||||||
|
|
||||||
// Update progress in DB every 1 second or every 10 files
|
// Update progress in DB every 1 second or every 10 files
|
||||||
let should_update_progress = last_progress_update.elapsed() > Duration::from_secs(1) || library_processed_count % 10 == 0;
|
let should_update_progress = last_progress_update.elapsed() > Duration::from_secs(1) || library_processed_count % 10 == 0;
|
||||||
@@ -914,7 +962,7 @@ async fn scan_library(
|
|||||||
"UPDATE index_jobs SET current_file = $2, processed_files = $3, progress_percent = $4 WHERE id = $1"
|
"UPDATE index_jobs SET current_file = $2, processed_files = $3, progress_percent = $4 WHERE id = $1"
|
||||||
)
|
)
|
||||||
.bind(job_id)
|
.bind(job_id)
|
||||||
.bind(&file_name)
|
.bind(&file_info.file_name)
|
||||||
.bind(*total_processed_count)
|
.bind(*total_processed_count)
|
||||||
.bind(progress_percent)
|
.bind(progress_percent)
|
||||||
.execute(&state.pool)
|
.execute(&state.pool)
|
||||||
@@ -935,32 +983,23 @@ async fn scan_library(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let seen_key = remap_libraries_path(&abs_path);
|
let seen_key = remap_libraries_path(&file_info.abs_path);
|
||||||
seen.insert(seen_key.clone(), true);
|
seen.insert(seen_key.clone(), true);
|
||||||
|
|
||||||
let metadata = std::fs::metadata(path)
|
if let Some((file_id, book_id, old_fingerprint)) = existing.get(&file_info.lookup_path).cloned() {
|
||||||
.with_context(|| format!("cannot stat {}", path.display()))?;
|
if !is_full_rebuild && old_fingerprint == file_info.fingerprint {
|
||||||
let mtime: DateTime<Utc> = metadata
|
trace!("[PROCESS] Skipping unchanged file: {}", file_info.file_name);
|
||||||
.modified()
|
|
||||||
.map(DateTime::<Utc>::from)
|
|
||||||
.unwrap_or_else(|_| Utc::now());
|
|
||||||
let fingerprint = compute_fingerprint(path, metadata.len(), &mtime)?;
|
|
||||||
|
|
||||||
let lookup_path = remap_libraries_path(&abs_path);
|
|
||||||
if let Some((file_id, book_id, old_fingerprint)) = existing.get(&lookup_path).cloned() {
|
|
||||||
if !is_full_rebuild && old_fingerprint == fingerprint {
|
|
||||||
trace!("[PROCESS] Skipping unchanged file: {}", file_name);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("[PROCESS] Updating existing file: {} (full_rebuild={}, fingerprint_match={})", file_name, is_full_rebuild, old_fingerprint == fingerprint);
|
info!("[PROCESS] Updating existing file: {} (full_rebuild={}, fingerprint_match={})", file_info.file_name, is_full_rebuild, old_fingerprint == file_info.fingerprint);
|
||||||
|
|
||||||
match parse_metadata(path, format, root) {
|
match parse_result {
|
||||||
Ok(parsed) => {
|
Ok(parsed) => {
|
||||||
books_to_update.push(BookUpdate {
|
books_to_update.push(BookUpdate {
|
||||||
book_id,
|
book_id,
|
||||||
title: parsed.title,
|
title: parsed.title,
|
||||||
kind: kind_from_format(format).to_string(),
|
kind: kind_from_format(file_info.format).to_string(),
|
||||||
series: parsed.series,
|
series: parsed.series,
|
||||||
volume: parsed.volume,
|
volume: parsed.volume,
|
||||||
page_count: parsed.page_count,
|
page_count: parsed.page_count,
|
||||||
@@ -968,29 +1007,29 @@ async fn scan_library(
|
|||||||
|
|
||||||
files_to_update.push(FileUpdate {
|
files_to_update.push(FileUpdate {
|
||||||
file_id,
|
file_id,
|
||||||
format: format.as_str().to_string(),
|
format: file_info.format.as_str().to_string(),
|
||||||
size_bytes: metadata.len() as i64,
|
size_bytes: file_info.metadata.len() as i64,
|
||||||
mtime,
|
mtime: file_info.mtime,
|
||||||
fingerprint,
|
fingerprint: file_info.fingerprint,
|
||||||
});
|
});
|
||||||
|
|
||||||
stats.indexed_files += 1;
|
stats.indexed_files += 1;
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!("[PARSER] Failed to parse {}: {}", file_name, err);
|
warn!("[PARSER] Failed to parse {}: {}", file_info.file_name, err);
|
||||||
stats.errors += 1;
|
stats.errors += 1;
|
||||||
|
|
||||||
files_to_update.push(FileUpdate {
|
files_to_update.push(FileUpdate {
|
||||||
file_id,
|
file_id,
|
||||||
format: format.as_str().to_string(),
|
format: file_info.format.as_str().to_string(),
|
||||||
size_bytes: metadata.len() as i64,
|
size_bytes: file_info.metadata.len() as i64,
|
||||||
mtime,
|
mtime: file_info.mtime,
|
||||||
fingerprint: fingerprint.clone(),
|
fingerprint: file_info.fingerprint.clone(),
|
||||||
});
|
});
|
||||||
|
|
||||||
errors_to_insert.push(ErrorInsert {
|
errors_to_insert.push(ErrorInsert {
|
||||||
job_id,
|
job_id,
|
||||||
file_path: abs_path.clone(),
|
file_path: file_info.abs_path.clone(),
|
||||||
error_message: err.to_string(),
|
error_message: err.to_string(),
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -1014,17 +1053,17 @@ async fn scan_library(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// New file (thumbnails generated by API after job handoff)
|
// New file (thumbnails generated by API after job handoff)
|
||||||
info!("[PROCESS] Inserting new file: {}", file_name);
|
info!("[PROCESS] Inserting new file: {}", file_info.file_name);
|
||||||
let book_id = Uuid::new_v4();
|
let book_id = Uuid::new_v4();
|
||||||
|
|
||||||
match parse_metadata(path, format, root) {
|
match parse_result {
|
||||||
Ok(parsed) => {
|
Ok(parsed) => {
|
||||||
let file_id = Uuid::new_v4();
|
let file_id = Uuid::new_v4();
|
||||||
|
|
||||||
books_to_insert.push(BookInsert {
|
books_to_insert.push(BookInsert {
|
||||||
book_id,
|
book_id,
|
||||||
library_id,
|
library_id,
|
||||||
kind: kind_from_format(format).to_string(),
|
kind: kind_from_format(file_info.format).to_string(),
|
||||||
title: parsed.title,
|
title: parsed.title,
|
||||||
series: parsed.series,
|
series: parsed.series,
|
||||||
volume: parsed.volume,
|
volume: parsed.volume,
|
||||||
@@ -1035,11 +1074,11 @@ async fn scan_library(
|
|||||||
files_to_insert.push(FileInsert {
|
files_to_insert.push(FileInsert {
|
||||||
file_id,
|
file_id,
|
||||||
book_id,
|
book_id,
|
||||||
format: format.as_str().to_string(),
|
format: file_info.format.as_str().to_string(),
|
||||||
abs_path: abs_path.clone(),
|
abs_path: file_info.abs_path.clone(),
|
||||||
size_bytes: metadata.len() as i64,
|
size_bytes: file_info.metadata.len() as i64,
|
||||||
mtime,
|
mtime: file_info.mtime,
|
||||||
fingerprint,
|
fingerprint: file_info.fingerprint,
|
||||||
parse_status: "ok".to_string(),
|
parse_status: "ok".to_string(),
|
||||||
parse_error: None,
|
parse_error: None,
|
||||||
});
|
});
|
||||||
@@ -1047,7 +1086,7 @@ async fn scan_library(
|
|||||||
stats.indexed_files += 1;
|
stats.indexed_files += 1;
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!("[PARSER] Failed to parse {}: {}", file_name, err);
|
warn!("[PARSER] Failed to parse {}: {}", file_info.file_name, err);
|
||||||
stats.errors += 1;
|
stats.errors += 1;
|
||||||
let book_id = Uuid::new_v4();
|
let book_id = Uuid::new_v4();
|
||||||
let file_id = Uuid::new_v4();
|
let file_id = Uuid::new_v4();
|
||||||
@@ -1055,8 +1094,8 @@ async fn scan_library(
|
|||||||
books_to_insert.push(BookInsert {
|
books_to_insert.push(BookInsert {
|
||||||
book_id,
|
book_id,
|
||||||
library_id,
|
library_id,
|
||||||
kind: kind_from_format(format).to_string(),
|
kind: kind_from_format(file_info.format).to_string(),
|
||||||
title: file_display_name(path),
|
title: file_display_name(&file_info.path),
|
||||||
series: None,
|
series: None,
|
||||||
volume: None,
|
volume: None,
|
||||||
page_count: None,
|
page_count: None,
|
||||||
@@ -1066,18 +1105,18 @@ async fn scan_library(
|
|||||||
files_to_insert.push(FileInsert {
|
files_to_insert.push(FileInsert {
|
||||||
file_id,
|
file_id,
|
||||||
book_id,
|
book_id,
|
||||||
format: format.as_str().to_string(),
|
format: file_info.format.as_str().to_string(),
|
||||||
abs_path: abs_path.clone(),
|
abs_path: file_info.abs_path.clone(),
|
||||||
size_bytes: metadata.len() as i64,
|
size_bytes: file_info.metadata.len() as i64,
|
||||||
mtime,
|
mtime: file_info.mtime,
|
||||||
fingerprint,
|
fingerprint: file_info.fingerprint,
|
||||||
parse_status: "error".to_string(),
|
parse_status: "error".to_string(),
|
||||||
parse_error: Some(err.to_string()),
|
parse_error: Some(err.to_string()),
|
||||||
});
|
});
|
||||||
|
|
||||||
errors_to_insert.push(ErrorInsert {
|
errors_to_insert.push(ErrorInsert {
|
||||||
job_id,
|
job_id,
|
||||||
file_path: abs_path,
|
file_path: file_info.abs_path,
|
||||||
error_message: err.to_string(),
|
error_message: err.to_string(),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -1087,8 +1126,6 @@ async fn scan_library(
|
|||||||
if books_to_insert.len() >= BATCH_SIZE || files_to_insert.len() >= BATCH_SIZE {
|
if books_to_insert.len() >= BATCH_SIZE || files_to_insert.len() >= BATCH_SIZE {
|
||||||
flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?;
|
flush_all_batches(&state.pool, &mut books_to_update, &mut files_to_update, &mut books_to_insert, &mut files_to_insert, &mut errors_to_insert).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!("[DONE] Processed file {} (total time: {:?})", file_name, start_time.elapsed());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Final flush of any remaining items
|
// Final flush of any remaining items
|
||||||
|
|||||||
Reference in New Issue
Block a user