refactor: replace Meilisearch with PostgreSQL full-text search
Remove Meilisearch dependency entirely. Search is now handled by PostgreSQL ILIKE with pg_trgm indexes, joining series_metadata for series-level authors. No external search engine needed.

- Replace search.rs Meilisearch HTTP calls with PostgreSQL queries
- Remove meili.rs from indexer, sync_meili call from job pipeline
- Remove MEILI_URL/MEILI_MASTER_KEY from config, state, env files
- Remove meilisearch service from docker-compose.yml
- Add migration 0027: drop sync_metadata, enable pg_trgm, add indexes
- Remove search resync button/endpoint (no longer needed)
- Update all documentation (CLAUDE.md, README.md, AGENTS.md, PLAN.md)

API contract unchanged — same SearchResponse shape returned.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -7,7 +7,7 @@ Service background sur le port **7081**. Voir `AGENTS.md` racine pour les conven
|
||||
| Fichier | Rôle |
|
||||
|---------|------|
|
||||
| `main.rs` | Point d'entrée, initialisation, lancement du worker |
|
||||
| `lib.rs` | `AppState` (pool, meili_url, meili_master_key) |
|
||||
| `lib.rs` | `AppState` (pool) |
|
||||
| `worker.rs` | Boucle principale : claim job → process → cleanup stale |
|
||||
| `job.rs` | `claim_next_job`, `process_job`, `fail_job`, `cleanup_stale_jobs` |
|
||||
| `scanner.rs` | Phase 1 discovery : WalkDir + `parse_metadata_fast` (zéro I/O archive), skip dossiers inchangés via mtime, batching DB |
|
||||
@@ -15,7 +15,6 @@ Service background sur le port **7081**. Voir `AGENTS.md` racine pour les conven
|
||||
| `batch.rs` | `flush_all_batches` avec UNNEST, structures `BookInsert/Update/FileInsert/Update/ErrorInsert` |
|
||||
| `scheduler.rs` | Auto-scan : vérifie toutes les 60s les bibliothèques à monitorer |
|
||||
| `watcher.rs` | File watcher temps réel |
|
||||
| `meili.rs` | Indexation/sync Meilisearch |
|
||||
| `api.rs` | Endpoints HTTP de l'indexer (/health, /ready) |
|
||||
| `utils.rs` | `remap_libraries_path`, `unmap_libraries_path`, `compute_fingerprint`, `kind_from_format` |
|
||||
|
||||
@@ -28,7 +27,6 @@ claim_next_job (UPDATE ... RETURNING, status pending→running)
|
||||
│ ├─ WalkDir + parse_metadata_fast (zéro I/O archive)
|
||||
│ ├─ skip dossiers via directory_mtimes (table DB)
|
||||
│ └─ INSERT books (page_count=NULL) → livres visibles immédiatement
|
||||
├─ meili::sync_meili
|
||||
├─ analyzer::cleanup_orphaned_thumbnails (full_rebuild uniquement)
|
||||
└─ Phase 2 : analyzer::analyze_library_books
|
||||
├─ SELECT books WHERE page_count IS NULL
|
||||
|
||||
@@ -3,7 +3,7 @@ use sqlx::{PgPool, Row};
|
||||
use tracing::{error, info};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{analyzer, converter, meili, scanner, AppState};
|
||||
use crate::{analyzer, converter, scanner, AppState};
|
||||
|
||||
pub async fn cleanup_stale_jobs(pool: &PgPool) -> Result<()> {
|
||||
let result = sqlx::query(
|
||||
@@ -337,9 +337,6 @@ pub async fn process_job(
|
||||
}
|
||||
}
|
||||
|
||||
// Sync search index after discovery (books are visible immediately)
|
||||
meili::sync_meili(&state.pool, &state.meili_url, &state.meili_master_key).await?;
|
||||
|
||||
// For full rebuild: clean up orphaned thumbnail files (old UUIDs)
|
||||
if is_full_rebuild {
|
||||
analyzer::cleanup_orphaned_thumbnails(state).await?;
|
||||
|
||||
@@ -3,7 +3,6 @@ pub mod api;
|
||||
pub mod batch;
|
||||
pub mod converter;
|
||||
pub mod job;
|
||||
pub mod meili;
|
||||
pub mod scheduler;
|
||||
pub mod scanner;
|
||||
pub mod utils;
|
||||
@@ -15,6 +14,4 @@ use sqlx::PgPool;
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
pub pool: PgPool,
|
||||
pub meili_url: String,
|
||||
pub meili_master_key: String,
|
||||
}
|
||||
|
||||
@@ -30,11 +30,7 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
.connect(&config.database_url)
|
||||
.await?;
|
||||
|
||||
let state = AppState {
|
||||
pool,
|
||||
meili_url: config.meili_url.clone(),
|
||||
meili_master_key: config.meili_master_key.clone(),
|
||||
};
|
||||
let state = AppState { pool };
|
||||
|
||||
tokio::spawn(indexer::worker::run_worker(state.clone(), config.scan_interval_seconds));
|
||||
|
||||
|
||||
@@ -1,214 +0,0 @@
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use reqwest::Client;
|
||||
use serde::Serialize;
|
||||
use sqlx::{PgPool, Row};
|
||||
use tracing::info;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct SearchDoc {
|
||||
id: String,
|
||||
library_id: String,
|
||||
kind: String,
|
||||
title: String,
|
||||
authors: Vec<String>,
|
||||
series: Option<String>,
|
||||
volume: Option<i32>,
|
||||
language: Option<String>,
|
||||
}
|
||||
|
||||
pub async fn sync_meili(pool: &PgPool, meili_url: &str, meili_master_key: &str) -> Result<()> {
|
||||
let client = Client::new();
|
||||
let base = meili_url.trim_end_matches('/');
|
||||
|
||||
// Ensure index exists and has proper settings
|
||||
let _ = client
|
||||
.post(format!("{base}/indexes"))
|
||||
.header("Authorization", format!("Bearer {meili_master_key}"))
|
||||
.json(&serde_json::json!({"uid": "books", "primaryKey": "id"}))
|
||||
.send()
|
||||
.await;
|
||||
|
||||
let _ = client
|
||||
.patch(format!("{base}/indexes/books/settings/filterable-attributes"))
|
||||
.header("Authorization", format!("Bearer {meili_master_key}"))
|
||||
.json(&serde_json::json!(["library_id", "kind"]))
|
||||
.send()
|
||||
.await;
|
||||
|
||||
let _ = client
|
||||
.put(format!("{base}/indexes/books/settings/searchable-attributes"))
|
||||
.header("Authorization", format!("Bearer {meili_master_key}"))
|
||||
.json(&serde_json::json!(["title", "authors", "series"]))
|
||||
.send()
|
||||
.await;
|
||||
|
||||
// Get last sync timestamp
|
||||
let last_sync: Option<DateTime<Utc>> = sqlx::query_scalar(
|
||||
"SELECT last_meili_sync FROM sync_metadata WHERE id = 1 AND last_meili_sync IS NOT NULL"
|
||||
)
|
||||
.fetch_optional(pool)
|
||||
.await?;
|
||||
|
||||
// If no previous sync, do a full sync
|
||||
let is_full_sync = last_sync.is_none();
|
||||
|
||||
// Get books to sync: all if full sync, only modified since last sync otherwise.
|
||||
// Join series_metadata to merge series-level authors into the search document.
|
||||
let books_query = r#"
|
||||
SELECT b.id, b.library_id, b.kind, b.title, b.series, b.volume, b.language, b.updated_at,
|
||||
ARRAY(
|
||||
SELECT DISTINCT unnest(
|
||||
COALESCE(b.authors, CASE WHEN b.author IS NOT NULL AND b.author != '' THEN ARRAY[b.author] ELSE ARRAY[]::text[] END)
|
||||
|| COALESCE(sm.authors, ARRAY[]::text[])
|
||||
)
|
||||
) as authors
|
||||
FROM books b
|
||||
LEFT JOIN series_metadata sm
|
||||
ON sm.library_id = b.library_id
|
||||
AND sm.name = COALESCE(NULLIF(b.series, ''), 'unclassified')
|
||||
"#;
|
||||
|
||||
let rows = if is_full_sync {
|
||||
info!("[MEILI] Performing full sync");
|
||||
sqlx::query(books_query)
|
||||
.fetch_all(pool)
|
||||
.await?
|
||||
} else {
|
||||
let since = last_sync.unwrap();
|
||||
info!("[MEILI] Performing incremental sync since {}", since);
|
||||
|
||||
// Include books that changed OR whose series_metadata changed
|
||||
sqlx::query(&format!(
|
||||
"{books_query} WHERE b.updated_at > $1 OR sm.updated_at > $1"
|
||||
))
|
||||
.bind(since)
|
||||
.fetch_all(pool)
|
||||
.await?
|
||||
};
|
||||
|
||||
if rows.is_empty() && !is_full_sync {
|
||||
info!("[MEILI] No changes to sync");
|
||||
// Still update the timestamp
|
||||
sqlx::query(
|
||||
"INSERT INTO sync_metadata (id, last_meili_sync) VALUES (1, NOW()) ON CONFLICT (id) DO UPDATE SET last_meili_sync = NOW()"
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let docs: Vec<SearchDoc> = rows
|
||||
.into_iter()
|
||||
.map(|row| SearchDoc {
|
||||
id: row.get::<Uuid, _>("id").to_string(),
|
||||
library_id: row.get::<Uuid, _>("library_id").to_string(),
|
||||
kind: row.get("kind"),
|
||||
title: row.get("title"),
|
||||
authors: row.get::<Vec<String>, _>("authors"),
|
||||
series: row.get("series"),
|
||||
volume: row.get("volume"),
|
||||
language: row.get("language"),
|
||||
})
|
||||
.collect();
|
||||
|
||||
let doc_count = docs.len();
|
||||
|
||||
// Send documents to MeiliSearch in batches of 1000
|
||||
const MEILI_BATCH_SIZE: usize = 1000;
|
||||
for (i, chunk) in docs.chunks(MEILI_BATCH_SIZE).enumerate() {
|
||||
let batch_num = i + 1;
|
||||
info!("[MEILI] Sending batch {}/{} ({} docs)", batch_num, doc_count.div_ceil(MEILI_BATCH_SIZE), chunk.len());
|
||||
|
||||
let response = client
|
||||
.post(format!("{base}/indexes/books/documents"))
|
||||
.header("Authorization", format!("Bearer {meili_master_key}"))
|
||||
.json(&chunk)
|
||||
.send()
|
||||
.await
|
||||
.context("failed to send docs to meili")?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
return Err(anyhow::anyhow!("MeiliSearch error {}: {}", status, body));
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up stale documents: remove from Meilisearch any IDs that no longer exist in DB.
|
||||
// Runs on every sync — the cost is minimal (single fetch of IDs only).
|
||||
{
|
||||
let db_ids: Vec<String> = sqlx::query_scalar("SELECT id::text FROM books")
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
|
||||
// Fetch all document IDs from Meilisearch (paginated to handle large collections)
|
||||
let mut meili_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
|
||||
let mut offset: usize = 0;
|
||||
const PAGE_SIZE: usize = 10000;
|
||||
|
||||
loop {
|
||||
let response = client
|
||||
.post(format!("{base}/indexes/books/documents/fetch"))
|
||||
.header("Authorization", format!("Bearer {meili_master_key}"))
|
||||
.json(&serde_json::json!({
|
||||
"fields": ["id"],
|
||||
"limit": PAGE_SIZE,
|
||||
"offset": offset
|
||||
}))
|
||||
.send()
|
||||
.await;
|
||||
|
||||
let response = match response {
|
||||
Ok(r) if r.status().is_success() => r,
|
||||
_ => break,
|
||||
};
|
||||
|
||||
let payload: serde_json::Value = match response.json().await {
|
||||
Ok(v) => v,
|
||||
Err(_) => break,
|
||||
};
|
||||
|
||||
let results = payload.get("results")
|
||||
.and_then(|v| v.as_array())
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
let page_count = results.len();
|
||||
for doc in results {
|
||||
if let Some(id) = doc.get("id").and_then(|v| v.as_str()) {
|
||||
meili_ids.insert(id.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
if page_count < PAGE_SIZE {
|
||||
break; // Last page
|
||||
}
|
||||
offset += PAGE_SIZE;
|
||||
}
|
||||
|
||||
let db_ids_set: std::collections::HashSet<String> = db_ids.into_iter().collect();
|
||||
let to_delete: Vec<String> = meili_ids.difference(&db_ids_set).cloned().collect();
|
||||
|
||||
if !to_delete.is_empty() {
|
||||
info!("[MEILI] Deleting {} stale documents", to_delete.len());
|
||||
let _ = client
|
||||
.post(format!("{base}/indexes/books/documents/delete-batch"))
|
||||
.header("Authorization", format!("Bearer {meili_master_key}"))
|
||||
.json(&to_delete)
|
||||
.send()
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
// Update last sync timestamp
|
||||
sqlx::query(
|
||||
"INSERT INTO sync_metadata (id, last_meili_sync) VALUES (1, NOW()) ON CONFLICT (id) DO UPDATE SET last_meili_sync = NOW()"
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
info!("[MEILI] Sync completed: {} documents indexed", doc_count);
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user