feat: enhance session management with sharing capabilities, real-time event synchronization, and UI updates for session display
This commit is contained in:
114
src/app/api/sessions/[id]/subscribe/route.ts
Normal file
114
src/app/api/sessions/[id]/subscribe/route.ts
Normal file
@@ -0,0 +1,114 @@
|
||||
import { auth } from '@/lib/auth';
|
||||
import { canAccessSession, getSessionEvents } from '@/services/sessions';
|
||||
|
||||
export const dynamic = 'force-dynamic';
|
||||
|
||||
// Store active connections per session
|
||||
const connections = new Map<string, Set<ReadableStreamDefaultController>>();
|
||||
|
||||
export async function GET(
|
||||
request: Request,
|
||||
{ params }: { params: Promise<{ id: string }> }
|
||||
) {
|
||||
const { id: sessionId } = await params;
|
||||
const session = await auth();
|
||||
|
||||
if (!session?.user?.id) {
|
||||
return new Response('Unauthorized', { status: 401 });
|
||||
}
|
||||
|
||||
// Check access
|
||||
const hasAccess = await canAccessSession(sessionId, session.user.id);
|
||||
if (!hasAccess) {
|
||||
return new Response('Forbidden', { status: 403 });
|
||||
}
|
||||
|
||||
const userId = session.user.id;
|
||||
let lastEventTime = new Date();
|
||||
let controller: ReadableStreamDefaultController;
|
||||
|
||||
const stream = new ReadableStream({
|
||||
start(ctrl) {
|
||||
controller = ctrl;
|
||||
|
||||
// Register connection
|
||||
if (!connections.has(sessionId)) {
|
||||
connections.set(sessionId, new Set());
|
||||
}
|
||||
connections.get(sessionId)!.add(controller);
|
||||
|
||||
// Send initial ping
|
||||
const encoder = new TextEncoder();
|
||||
controller.enqueue(
|
||||
encoder.encode(`data: ${JSON.stringify({ type: 'connected', userId })}\n\n`)
|
||||
);
|
||||
},
|
||||
cancel() {
|
||||
// Remove connection on close
|
||||
connections.get(sessionId)?.delete(controller);
|
||||
if (connections.get(sessionId)?.size === 0) {
|
||||
connections.delete(sessionId);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
// Poll for new events (simple approach, works with any DB)
|
||||
const pollInterval = setInterval(async () => {
|
||||
try {
|
||||
const events = await getSessionEvents(sessionId, lastEventTime);
|
||||
if (events.length > 0) {
|
||||
const encoder = new TextEncoder();
|
||||
for (const event of events) {
|
||||
// Don't send events to the user who created them
|
||||
if (event.userId !== userId) {
|
||||
controller.enqueue(
|
||||
encoder.encode(
|
||||
`data: ${JSON.stringify({
|
||||
type: event.type,
|
||||
payload: JSON.parse(event.payload),
|
||||
user: event.user,
|
||||
timestamp: event.createdAt,
|
||||
})}\n\n`
|
||||
)
|
||||
);
|
||||
}
|
||||
lastEventTime = event.createdAt;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Connection might be closed
|
||||
clearInterval(pollInterval);
|
||||
}
|
||||
}, 1000); // Poll every second
|
||||
|
||||
// Cleanup on abort
|
||||
request.signal.addEventListener('abort', () => {
|
||||
clearInterval(pollInterval);
|
||||
});
|
||||
|
||||
return new Response(stream, {
|
||||
headers: {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
// Helper to broadcast to all connections (called from actions)
|
||||
export function broadcastToSession(sessionId: string, event: object) {
|
||||
const sessionConnections = connections.get(sessionId);
|
||||
if (!sessionConnections) return;
|
||||
|
||||
const encoder = new TextEncoder();
|
||||
const message = encoder.encode(`data: ${JSON.stringify(event)}\n\n`);
|
||||
|
||||
for (const controller of sessionConnections) {
|
||||
try {
|
||||
controller.enqueue(message);
|
||||
} catch {
|
||||
// Connection closed, will be cleaned up
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user