diff --git a/apps/backoffice/app/api/jobs/stream/route.ts b/apps/backoffice/app/api/jobs/stream/route.ts index 04cb4cb..caec1fc 100644 --- a/apps/backoffice/app/api/jobs/stream/route.ts +++ b/apps/backoffice/app/api/jobs/stream/route.ts @@ -7,10 +7,11 @@ export async function GET(request: NextRequest) { const stream = new ReadableStream({ async start(controller) { controller.enqueue(new TextEncoder().encode("")); - + let lastData: string | null = null; let isActive = true; let consecutiveErrors = 0; + let intervalId: ReturnType | null = null; const fetchJobs = async () => { if (!isActive) return; @@ -25,51 +26,52 @@ export async function GET(request: NextRequest) { const data = await response.json(); const dataStr = JSON.stringify(data); - // Send if data changed + // Send only if data changed if (dataStr !== lastData && isActive) { lastData = dataStr; try { controller.enqueue( new TextEncoder().encode(`data: ${dataStr}\n\n`) ); - } catch (err) { - // Controller closed, ignore + } catch { isActive = false; } } + + // Adapt interval: 2s when active jobs exist, 15s when idle + const hasActiveJobs = data.some((j: { status: string }) => + j.status === "running" || j.status === "pending" || j.status === "extracting_pages" || j.status === "generating_thumbnails" + ); + const nextInterval = hasActiveJobs ? 2000 : 15000; + restartInterval(nextInterval); } } catch (error) { if (isActive) { consecutiveErrors++; - // Only log first failure and every 30th to avoid spam if (consecutiveErrors === 1 || consecutiveErrors % 30 === 0) { console.warn(`SSE fetch error (${consecutiveErrors} consecutive):`, error); } } } }; - - // Initial fetch + + const restartInterval = (ms: number) => { + if (intervalId !== null) clearInterval(intervalId); + intervalId = setInterval(fetchJobs, ms); + }; + + // Initial fetch + start polling await fetchJobs(); - - // Poll every 2 seconds - const interval = setInterval(async () => { - if (!isActive) { - clearInterval(interval); - return; - } - await fetchJobs(); - }, 2000); - + // Cleanup request.signal.addEventListener("abort", () => { isActive = false; - clearInterval(interval); + if (intervalId !== null) clearInterval(intervalId); controller.close(); }); }, }); - + return new Response(stream, { headers: { "Content-Type": "text/event-stream", diff --git a/apps/backoffice/app/components/JobsIndicator.tsx b/apps/backoffice/app/components/JobsIndicator.tsx index e0d26f5..45ffe1e 100644 --- a/apps/backoffice/app/components/JobsIndicator.tsx +++ b/apps/backoffice/app/components/JobsIndicator.tsx @@ -54,44 +54,60 @@ export function JobsIndicator() { const [popinStyle, setPopinStyle] = useState({}); useEffect(() => { - let intervalId: ReturnType | null = null; + let eventSource: EventSource | null = null; + let reconnectTimeout: ReturnType | null = null; - const fetchActiveJobs = async () => { - try { - const response = await fetch("/api/jobs/active"); - if (response.ok) { - const jobs: Job[] = await response.json(); - setActiveJobs(jobs); - // Adapt polling interval: 2s when jobs are active, 30s when idle - restartInterval(jobs.length > 0 ? 2000 : 30000); - } - } catch (error) { - console.error("Failed to fetch jobs:", error); + const connect = () => { + if (eventSource) { + eventSource.close(); } + eventSource = new EventSource("/api/jobs/stream"); + + eventSource.onmessage = (event) => { + try { + const allJobs: Job[] = JSON.parse(event.data); + const active = allJobs.filter(j => + j.status === "running" || j.status === "pending" || + j.status === "extracting_pages" || j.status === "generating_thumbnails" + ); + setActiveJobs(active); + } catch { + // ignore malformed data + } + }; + + eventSource.onerror = () => { + eventSource?.close(); + eventSource = null; + // Reconnect after 5s on error + reconnectTimeout = setTimeout(connect, 5000); + }; }; - const restartInterval = (ms: number) => { - if (intervalId !== null) clearInterval(intervalId); - intervalId = setInterval(fetchActiveJobs, ms); + const disconnect = () => { + if (reconnectTimeout) { + clearTimeout(reconnectTimeout); + reconnectTimeout = null; + } + if (eventSource) { + eventSource.close(); + eventSource = null; + } }; const handleVisibilityChange = () => { if (document.hidden) { - if (intervalId !== null) { - clearInterval(intervalId); - intervalId = null; - } + disconnect(); } else { - // Refetch immediately when tab becomes visible, then resume polling - fetchActiveJobs(); + connect(); } }; - fetchActiveJobs(); + connect(); document.addEventListener("visibilitychange", handleVisibilityChange); return () => { - if (intervalId !== null) clearInterval(intervalId); + disconnect(); document.removeEventListener("visibilitychange", handleVisibilityChange); }; }, []);