feat: implement request monitoring and queuing services to manage concurrent requests to Komga

Julien Froidefond
2025-10-14 20:20:02 +02:00
parent 5afb495cd4
commit b954a271d6
11 changed files with 6648 additions and 4688 deletions

View File

@@ -6,6 +6,8 @@ import { AppError } from "../../utils/errors";
import type { KomgaConfig } from "@/types/komga";
import type { ServerCacheService } from "./server-cache.service";
import { DebugService } from "./debug.service";
import { RequestMonitorService } from "./request-monitor.service";
import { RequestQueueService } from "./request-queue.service";
// Available cache types
export type CacheType = "DEFAULT" | "HOME" | "LIBRARIES" | "SERIES" | "BOOKS" | "IMAGES";
@@ -99,9 +101,27 @@ export abstract class BaseApiService {
const startTime = performance.now();
// 60-second timeout instead of the default 10 seconds
const timeoutMs = 60000;
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
try {
const response = await fetch(url, { headers, ...options });
const endTime = performance.now();
// Enqueue the request to limit concurrency
const response = await RequestQueueService.enqueue(async () => {
return await fetch(url, {
headers,
...options,
signal: controller.signal,
// Configure undici connection timeouts
// @ts-ignore - undici-specific options not in standard fetch types
connectTimeout: timeoutMs,
bodyTimeout: timeoutMs,
headersTimeout: timeoutMs,
});
});
clearTimeout(timeoutId);
const endTime = performance.now();
// Log the request server-side
await DebugService.logRequest({
@@ -131,6 +151,9 @@ export abstract class BaseApiService {
});
throw error;
} finally {
clearTimeout(timeoutId);
RequestMonitorService.decrementActive();
}
}
}
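Note on the timeout options above: connectTimeout, bodyTimeout and headersTimeout are not part of the standard fetch init (hence the @ts-ignore), and undici normally takes these timeouts on an Agent that is handed to fetch through its non-standard dispatcher option. A minimal sketch of that alternative wiring, assuming Node's undici package, for illustration only (not part of the commit):

// Sketch only: the commit passes the timeouts straight to the fetch init;
// undici also exposes them on an Agent, which its fetch accepts via the
// non-standard "dispatcher" option. The 60s value mirrors timeoutMs above.
import { Agent, fetch } from "undici";

const timeoutMs = 60000;
const komgaAgent = new Agent({
  connect: { timeout: timeoutMs }, // TCP/TLS connect timeout
  headersTimeout: timeoutMs,       // time allowed to receive response headers
  bodyTimeout: timeoutMs,          // idle timeout while streaming the body
});

async function fetchWithAgent(url: string, headers: Record<string, string>) {
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
  try {
    return await fetch(url, { headers, signal: controller.signal, dispatcher: komgaAgent });
  } finally {
    clearTimeout(timeoutId);
  }
}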

View File

@@ -12,7 +12,7 @@ export class ImageService extends BaseApiService {
try {
const headers = { Accept: "image/jpeg, image/png, image/gif, image/webp, */*" };
return this.fetchWithCache<ImageResponse>(
const result = await this.fetchWithCache<ImageResponse>(
`image-${path}`,
async () => {
const response = await this.fetchFromApi<Response>({ path }, headers, { isImage: true });
@@ -27,6 +27,8 @@ export class ImageService extends BaseApiService {
},
"IMAGES"
);
return result;
} catch (error) {
console.error("Erreur lors de la récupération de l'image:", error);
throw new AppError(ERROR_CODES.IMAGE.FETCH_ERROR, {}, error);

View File

@@ -0,0 +1,44 @@
/**
* Monitoring service for concurrent requests to Komga
* Tracks the number of active requests and warns when load is high
*/
class RequestMonitor {
private activeRequests = 0;
private readonly thresholds = {
warning: 10,
high: 20,
critical: 30,
};
incrementActive(): number {
this.activeRequests++;
this.checkThresholds();
return this.activeRequests;
}
decrementActive(): number {
this.activeRequests = Math.max(0, this.activeRequests - 1);
return this.activeRequests;
}
getActiveCount(): number {
return this.activeRequests;
}
private checkThresholds(): void {
const count = this.activeRequests;
if (count >= this.thresholds.critical) {
console.warn(`[REQUEST-MONITOR] 🔴 CRITICAL concurrency: ${count} active requests`);
} else if (count >= this.thresholds.high) {
console.warn(`[REQUEST-MONITOR] ⚠️ HIGH concurrency: ${count} active requests`);
} else if (count >= this.thresholds.warning) {
console.log(`[REQUEST-MONITOR] ⚡ Warning concurrency: ${count} active requests`);
}
}
}
// Singleton instance
export const RequestMonitorService = new RequestMonitor();
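The base-api.service diff above only shows the decrementActive() call in the finally block; presumably incrementActive() is called just before the request is issued. A minimal usage sketch of the counter around a single request (the monitoredRequest wrapper is hypothetical, not part of the commit):

import { RequestMonitorService } from "./request-monitor.service";

// Hypothetical wrapper showing the intended increment/decrement pairing.
async function monitoredRequest<T>(doRequest: () => Promise<T>): Promise<T> {
  RequestMonitorService.incrementActive(); // logs once the 10/20/30 thresholds are reached
  try {
    return await doRequest();
  } finally {
    RequestMonitorService.decrementActive(); // never drops below 0
  }
}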

View File

@@ -0,0 +1,73 @@
/**
* Queue service to limit concurrent requests to Komga
* Prevents overloading Komga with too many simultaneous requests
*/
interface QueuedRequest<T> {
execute: () => Promise<T>;
resolve: (value: T) => void;
reject: (error: any) => void;
}
class RequestQueue {
private queue: QueuedRequest<any>[] = [];
private activeCount = 0;
private maxConcurrent: number;
constructor(maxConcurrent: number = 5) {
this.maxConcurrent = maxConcurrent;
}
async enqueue<T>(execute: () => Promise<T>): Promise<T> {
return new Promise<T>((resolve, reject) => {
this.queue.push({ execute, resolve, reject });
this.processQueue();
});
}
private async delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
private async processQueue(): Promise<void> {
if (this.activeCount >= this.maxConcurrent || this.queue.length === 0) {
return;
}
this.activeCount++;
const request = this.queue.shift();
if (!request) {
this.activeCount--;
return;
}
try {
// 200ms delay between requests to spread out the CPU load on Komga
await this.delay(200);
const result = await request.execute();
request.resolve(result);
} catch (error) {
request.reject(error);
} finally {
this.activeCount--;
this.processQueue();
}
}
getActiveCount(): number {
return this.activeCount;
}
getQueueLength(): number {
return this.queue.length;
}
setMaxConcurrent(max: number): void {
this.maxConcurrent = max;
}
}
// Singleton instance - limited to 2 concurrent requests to Komga (reduced to ease CPU load)
export const RequestQueueService = new RequestQueue(2);
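A brief usage sketch of the queue as wired into base-api.service.ts above: callers simply await enqueue(), at most 2 callbacks run against Komga at once, and each start is preceded by the 200 ms spacing delay. Host and paths below are placeholders, not values from the commit:

import { RequestQueueService } from "./request-queue.service";

// Placeholder host and paths; only 2 fetches run at any time,
// the rest wait in FIFO order with a 200 ms gap before each start.
async function prefetch(paths: string[]): Promise<Response[]> {
  return Promise.all(
    paths.map((path) =>
      RequestQueueService.enqueue(() => fetch(`http://komga.example${path}`))
    )
  );
}

prefetch(["/api/v1/libraries", "/api/v1/series", "/api/v1/books"]).then((responses) =>
  console.log(`fetched ${responses.length} responses, queue now ${RequestQueueService.getQueueLength()}`)
);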