fix: fiabilise le SSE du widget jobs dans le header
- Serveur : envoie toujours les données (plus de skip si identiques), ajoute un heartbeat toutes les 15s pour garder la connexion vivante - Client : détecte les connexions mortes (timeout 30s sans message) et reconnecte automatiquement, reconnexion plus rapide (3s vs 5s) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -12,6 +12,17 @@ export async function GET(request: NextRequest) {
|
|||||||
let isActive = true;
|
let isActive = true;
|
||||||
let consecutiveErrors = 0;
|
let consecutiveErrors = 0;
|
||||||
let intervalId: ReturnType<typeof setInterval> | null = null;
|
let intervalId: ReturnType<typeof setInterval> | null = null;
|
||||||
|
let heartbeatId: ReturnType<typeof setInterval> | null = null;
|
||||||
|
|
||||||
|
const send = (msg: string): boolean => {
|
||||||
|
try {
|
||||||
|
controller.enqueue(new TextEncoder().encode(msg));
|
||||||
|
return true;
|
||||||
|
} catch {
|
||||||
|
isActive = false;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
const fetchJobs = async () => {
|
const fetchJobs = async () => {
|
||||||
if (!isActive) return;
|
if (!isActive) return;
|
||||||
@@ -26,23 +37,17 @@ export async function GET(request: NextRequest) {
|
|||||||
const data = await response.json();
|
const data = await response.json();
|
||||||
const dataStr = JSON.stringify(data);
|
const dataStr = JSON.stringify(data);
|
||||||
|
|
||||||
// Send only if data changed
|
// Always send data (client needs fresh timestamps for active jobs)
|
||||||
if (dataStr !== lastData && isActive) {
|
if (isActive) {
|
||||||
lastData = dataStr;
|
lastData = dataStr;
|
||||||
try {
|
send(`data: ${dataStr}\n\n`);
|
||||||
controller.enqueue(
|
|
||||||
new TextEncoder().encode(`data: ${dataStr}\n\n`)
|
|
||||||
);
|
|
||||||
} catch {
|
|
||||||
isActive = false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adapt interval: 2s when active jobs exist, 15s when idle
|
// Adapt interval: 2s when active jobs exist, 10s when idle
|
||||||
const hasActiveJobs = data.some((j: { status: string }) =>
|
const hasActiveJobs = data.some((j: { status: string }) =>
|
||||||
j.status === "running" || j.status === "pending" || j.status === "extracting_pages" || j.status === "generating_thumbnails"
|
j.status === "running" || j.status === "pending" || j.status === "extracting_pages" || j.status === "generating_thumbnails"
|
||||||
);
|
);
|
||||||
const nextInterval = hasActiveJobs ? 2000 : 15000;
|
const nextInterval = hasActiveJobs ? 2000 : 10000;
|
||||||
restartInterval(nextInterval);
|
restartInterval(nextInterval);
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
@@ -60,6 +65,11 @@ export async function GET(request: NextRequest) {
|
|||||||
intervalId = setInterval(fetchJobs, ms);
|
intervalId = setInterval(fetchJobs, ms);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Heartbeat every 15s to keep connection alive
|
||||||
|
heartbeatId = setInterval(() => {
|
||||||
|
if (isActive) send(": heartbeat\n\n");
|
||||||
|
}, 15000);
|
||||||
|
|
||||||
// Initial fetch + start polling
|
// Initial fetch + start polling
|
||||||
await fetchJobs();
|
await fetchJobs();
|
||||||
|
|
||||||
@@ -67,6 +77,7 @@ export async function GET(request: NextRequest) {
|
|||||||
request.signal.addEventListener("abort", () => {
|
request.signal.addEventListener("abort", () => {
|
||||||
isActive = false;
|
isActive = false;
|
||||||
if (intervalId !== null) clearInterval(intervalId);
|
if (intervalId !== null) clearInterval(intervalId);
|
||||||
|
if (heartbeatId !== null) clearInterval(heartbeatId);
|
||||||
controller.close();
|
controller.close();
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -56,14 +56,27 @@ export function JobsIndicator() {
|
|||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
let eventSource: EventSource | null = null;
|
let eventSource: EventSource | null = null;
|
||||||
let reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
|
let reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
|
||||||
|
let staleTimeout: ReturnType<typeof setTimeout> | null = null;
|
||||||
|
|
||||||
|
const resetStaleTimer = () => {
|
||||||
|
if (staleTimeout) clearTimeout(staleTimeout);
|
||||||
|
// If no message received in 30s, reconnect (heartbeat should come every 15s)
|
||||||
|
staleTimeout = setTimeout(() => {
|
||||||
|
eventSource?.close();
|
||||||
|
eventSource = null;
|
||||||
|
connect();
|
||||||
|
}, 30000);
|
||||||
|
};
|
||||||
|
|
||||||
const connect = () => {
|
const connect = () => {
|
||||||
if (eventSource) {
|
if (eventSource) {
|
||||||
eventSource.close();
|
eventSource.close();
|
||||||
}
|
}
|
||||||
eventSource = new EventSource("/api/jobs/stream");
|
eventSource = new EventSource("/api/jobs/stream");
|
||||||
|
resetStaleTimer();
|
||||||
|
|
||||||
eventSource.onmessage = (event) => {
|
eventSource.onmessage = (event) => {
|
||||||
|
resetStaleTimer();
|
||||||
try {
|
try {
|
||||||
const allJobs: Job[] = JSON.parse(event.data);
|
const allJobs: Job[] = JSON.parse(event.data);
|
||||||
const active = allJobs.filter(j =>
|
const active = allJobs.filter(j =>
|
||||||
@@ -79,20 +92,15 @@ export function JobsIndicator() {
|
|||||||
eventSource.onerror = () => {
|
eventSource.onerror = () => {
|
||||||
eventSource?.close();
|
eventSource?.close();
|
||||||
eventSource = null;
|
eventSource = null;
|
||||||
// Reconnect after 5s on error
|
// Reconnect after 3s on error
|
||||||
reconnectTimeout = setTimeout(connect, 5000);
|
reconnectTimeout = setTimeout(connect, 3000);
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
const disconnect = () => {
|
const disconnect = () => {
|
||||||
if (reconnectTimeout) {
|
if (reconnectTimeout) { clearTimeout(reconnectTimeout); reconnectTimeout = null; }
|
||||||
clearTimeout(reconnectTimeout);
|
if (staleTimeout) { clearTimeout(staleTimeout); staleTimeout = null; }
|
||||||
reconnectTimeout = null;
|
if (eventSource) { eventSource.close(); eventSource = null; }
|
||||||
}
|
|
||||||
if (eventSource) {
|
|
||||||
eventSource.close();
|
|
||||||
eventSource = null;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
const handleVisibilityChange = () => {
|
const handleVisibilityChange = () => {
|
||||||
|
|||||||
Reference in New Issue
Block a user