From 5874cf80f58dc1844f35d1cc9d3058fccfadd013 Mon Sep 17 00:00:00 2001 From: Arthur Sepiol Date: Wed, 10 Dec 2025 16:06:18 +0300 Subject: [PATCH] Add authentication, Redis pub/sub, and error handling to SSE Improvements: - Add Redis pub/sub support for multi-server deployments - Add authentication check to SSE stream endpoint - Add 30s heartbeat keepalive for long-lived connections - Implement exponential backoff reconnection logic in client - Fix TypeScript types (websiteId optional, timer types) - Use specific query key invalidation instead of broad match - Fix undefined access in session-events listener map --- src/app/api/send/route.ts | 2 +- .../[websiteId]/sessions/stream/route.ts | 27 ++++++++- src/components/hooks/useSessionStream.ts | 55 +++++++++++++++++-- src/lib/session-events.ts | 41 +++++++++++++- 4 files changed, 114 insertions(+), 11 deletions(-) diff --git a/src/app/api/send/route.ts b/src/app/api/send/route.ts index f5f7a0ac..3117cf23 100644 --- a/src/app/api/send/route.ts +++ b/src/app/api/send/route.ts @@ -152,7 +152,7 @@ export async function POST(request: Request) { distinctId: id, createdAt, }); - emitSessionCreated(sourceId, sessionId); + await emitSessionCreated(sourceId, sessionId); } // Visit info diff --git a/src/app/api/websites/[websiteId]/sessions/stream/route.ts b/src/app/api/websites/[websiteId]/sessions/stream/route.ts index ec7b154c..db78a958 100644 --- a/src/app/api/websites/[websiteId]/sessions/stream/route.ts +++ b/src/app/api/websites/[websiteId]/sessions/stream/route.ts @@ -1,20 +1,45 @@ -import { subscribeToSessions } from '@/lib/session-events'; +import { initializeRedisSubscriber, subscribeToSessions } from '@/lib/session-events'; +import { parseRequest } from '@/lib/request'; +import { unauthorized } from '@/lib/response'; +import { canViewWebsite } from '@/permissions'; + +const HEARTBEAT_INTERVAL = 30000; export async function GET( request: Request, { params }: { params: Promise<{ websiteId: string }> }, ) { + const { auth, error } = await parseRequest(request); + + if (error) { + return error(); + } + const { websiteId } = await params; + if (!(await canViewWebsite(auth, websiteId))) { + return unauthorized(); + } + + await initializeRedisSubscriber(); + const stream = new ReadableStream({ start(controller) { const encoder = new TextEncoder(); + let heartbeatTimer: ReturnType | null = null; const unsubscribe = subscribeToSessions(websiteId, (_, sessionId) => { controller.enqueue(encoder.encode(`data: ${JSON.stringify({ sessionId })}\n\n`)); }); + heartbeatTimer = setInterval(() => { + controller.enqueue(encoder.encode(': heartbeat\n\n')); + }, HEARTBEAT_INTERVAL); + request.signal.addEventListener('abort', () => { + if (heartbeatTimer) { + clearInterval(heartbeatTimer); + } unsubscribe(); controller.close(); }); diff --git a/src/components/hooks/useSessionStream.ts b/src/components/hooks/useSessionStream.ts index 0abc991d..34188e6b 100644 --- a/src/components/hooks/useSessionStream.ts +++ b/src/components/hooks/useSessionStream.ts @@ -1,18 +1,61 @@ -import { useEffect } from 'react'; +import { useEffect, useRef } from 'react'; import { useQueryClient } from '@tanstack/react-query'; -export function useSessionStream(websiteId: string) { +const MAX_RETRY_DELAY = 30000; +const INITIAL_RETRY_DELAY = 1000; + +export function useSessionStream(websiteId?: string) { const queryClient = useQueryClient(); + const retryCountRef = useRef(0); + const retryTimeoutRef = useRef | null>(null); useEffect(() => { if (!websiteId) return; - const eventSource = new EventSource(`/api/websites/${websiteId}/sessions/stream`); + let eventSource: EventSource | null = null; + let isMounted = true; - eventSource.onmessage = () => { - queryClient.invalidateQueries({ queryKey: ['sessions'] }); + const connect = () => { + if (!isMounted) return; + + eventSource = new EventSource(`/api/websites/${websiteId}/sessions/stream`); + + eventSource.onmessage = event => { + try { + const data = JSON.parse(event.data); + if (data.sessionId) { + retryCountRef.current = 0; + queryClient.invalidateQueries({ queryKey: ['sessions', { websiteId }] }); + } + } catch (error) { + // eslint-disable-next-line no-console + console.error('Failed to parse session event:', error); + } + }; + + eventSource.onerror = () => { + eventSource?.close(); + + if (!isMounted) return; + + const delay = Math.min( + INITIAL_RETRY_DELAY * Math.pow(2, retryCountRef.current), + MAX_RETRY_DELAY, + ); + retryCountRef.current += 1; + + retryTimeoutRef.current = setTimeout(connect, delay); + }; }; - return () => eventSource.close(); + connect(); + + return () => { + isMounted = false; + if (retryTimeoutRef.current) { + clearTimeout(retryTimeoutRef.current); + } + eventSource?.close(); + }; }, [websiteId, queryClient]); } diff --git a/src/lib/session-events.ts b/src/lib/session-events.ts index 9b805cf4..8c27e7ef 100644 --- a/src/lib/session-events.ts +++ b/src/lib/session-events.ts @@ -1,18 +1,53 @@ +import redis from './redis'; + type SessionListener = (websiteId: string, sessionId: string) => void; const listeners = new Map>(); +const REDIS_CHANNEL = 'umami:session:created'; export function subscribeToSessions(websiteId: string, callback: SessionListener) { if (!listeners.has(websiteId)) { listeners.set(websiteId, new Set()); } - listeners.get(websiteId).add(callback); + + const websiteListeners = listeners.get(websiteId)!; + websiteListeners.add(callback); return () => { - listeners.get(websiteId)?.delete(callback); + websiteListeners.delete(callback); + if (websiteListeners.size === 0) { + listeners.delete(websiteId); + } }; } -export function emitSessionCreated(websiteId: string, sessionId: string) { +export async function emitSessionCreated(websiteId: string, sessionId: string) { + const message = JSON.stringify({ websiteId, sessionId }); + + if (redis.enabled) { + await redis.client.publish(REDIS_CHANNEL, message); + } + listeners.get(websiteId)?.forEach(cb => cb(websiteId, sessionId)); } + +let redisSubscriber: any = null; + +export async function initializeRedisSubscriber() { + if (!redis.enabled || redisSubscriber) { + return; + } + + redisSubscriber = redis.client.duplicate(); + await redisSubscriber.connect(); + + await redisSubscriber.subscribe(REDIS_CHANNEL, (message: string) => { + try { + const { websiteId, sessionId } = JSON.parse(message); + listeners.get(websiteId)?.forEach(cb => cb(websiteId, sessionId)); + } catch (error) { + // eslint-disable-next-line no-console + console.error('Failed to parse session event:', error); + } + }); +}