Add authentication, Redis pub/sub, and error handling to SSE

Improvements:
- Add Redis pub/sub support for multi-server deployments
- Add authentication check to SSE stream endpoint
- Add 30s heartbeat keepalive for long-lived connections
- Implement exponential backoff reconnection logic in client
- Fix TypeScript types (websiteId optional, timer types)
- Use specific query key invalidation instead of broad match
- Fix undefined access in session-events listener map
This commit is contained in:
Arthur Sepiol 2025-12-10 16:06:18 +03:00
parent ef9a382cdd
commit 5874cf80f5
4 changed files with 114 additions and 11 deletions

View file

@ -152,7 +152,7 @@ export async function POST(request: Request) {
distinctId: id, distinctId: id,
createdAt, createdAt,
}); });
emitSessionCreated(sourceId, sessionId); await emitSessionCreated(sourceId, sessionId);
} }
// Visit info // Visit info

View file

@ -1,20 +1,45 @@
import { subscribeToSessions } from '@/lib/session-events'; import { initializeRedisSubscriber, subscribeToSessions } from '@/lib/session-events';
import { parseRequest } from '@/lib/request';
import { unauthorized } from '@/lib/response';
import { canViewWebsite } from '@/permissions';
const HEARTBEAT_INTERVAL = 30000;
export async function GET( export async function GET(
request: Request, request: Request,
{ params }: { params: Promise<{ websiteId: string }> }, { params }: { params: Promise<{ websiteId: string }> },
) { ) {
const { auth, error } = await parseRequest(request);
if (error) {
return error();
}
const { websiteId } = await params; const { websiteId } = await params;
if (!(await canViewWebsite(auth, websiteId))) {
return unauthorized();
}
await initializeRedisSubscriber();
const stream = new ReadableStream({ const stream = new ReadableStream({
start(controller) { start(controller) {
const encoder = new TextEncoder(); const encoder = new TextEncoder();
let heartbeatTimer: ReturnType<typeof setInterval> | null = null;
const unsubscribe = subscribeToSessions(websiteId, (_, sessionId) => { const unsubscribe = subscribeToSessions(websiteId, (_, sessionId) => {
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ sessionId })}\n\n`)); controller.enqueue(encoder.encode(`data: ${JSON.stringify({ sessionId })}\n\n`));
}); });
heartbeatTimer = setInterval(() => {
controller.enqueue(encoder.encode(': heartbeat\n\n'));
}, HEARTBEAT_INTERVAL);
request.signal.addEventListener('abort', () => { request.signal.addEventListener('abort', () => {
if (heartbeatTimer) {
clearInterval(heartbeatTimer);
}
unsubscribe(); unsubscribe();
controller.close(); controller.close();
}); });

View file

@ -1,18 +1,61 @@
import { useEffect } from 'react'; import { useEffect, useRef } from 'react';
import { useQueryClient } from '@tanstack/react-query'; import { useQueryClient } from '@tanstack/react-query';
export function useSessionStream(websiteId: string) { const MAX_RETRY_DELAY = 30000;
const INITIAL_RETRY_DELAY = 1000;
export function useSessionStream(websiteId?: string) {
const queryClient = useQueryClient(); const queryClient = useQueryClient();
const retryCountRef = useRef(0);
const retryTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
useEffect(() => { useEffect(() => {
if (!websiteId) return; if (!websiteId) return;
const eventSource = new EventSource(`/api/websites/${websiteId}/sessions/stream`); let eventSource: EventSource | null = null;
let isMounted = true;
eventSource.onmessage = () => { const connect = () => {
queryClient.invalidateQueries({ queryKey: ['sessions'] }); if (!isMounted) return;
eventSource = new EventSource(`/api/websites/${websiteId}/sessions/stream`);
eventSource.onmessage = event => {
try {
const data = JSON.parse(event.data);
if (data.sessionId) {
retryCountRef.current = 0;
queryClient.invalidateQueries({ queryKey: ['sessions', { websiteId }] });
}
} catch (error) {
// eslint-disable-next-line no-console
console.error('Failed to parse session event:', error);
}
};
eventSource.onerror = () => {
eventSource?.close();
if (!isMounted) return;
const delay = Math.min(
INITIAL_RETRY_DELAY * Math.pow(2, retryCountRef.current),
MAX_RETRY_DELAY,
);
retryCountRef.current += 1;
retryTimeoutRef.current = setTimeout(connect, delay);
};
}; };
return () => eventSource.close(); connect();
return () => {
isMounted = false;
if (retryTimeoutRef.current) {
clearTimeout(retryTimeoutRef.current);
}
eventSource?.close();
};
}, [websiteId, queryClient]); }, [websiteId, queryClient]);
} }

View file

@ -1,18 +1,53 @@
import redis from './redis';
type SessionListener = (websiteId: string, sessionId: string) => void; type SessionListener = (websiteId: string, sessionId: string) => void;
const listeners = new Map<string, Set<SessionListener>>(); const listeners = new Map<string, Set<SessionListener>>();
const REDIS_CHANNEL = 'umami:session:created';
export function subscribeToSessions(websiteId: string, callback: SessionListener) { export function subscribeToSessions(websiteId: string, callback: SessionListener) {
if (!listeners.has(websiteId)) { if (!listeners.has(websiteId)) {
listeners.set(websiteId, new Set()); listeners.set(websiteId, new Set());
} }
listeners.get(websiteId).add(callback);
const websiteListeners = listeners.get(websiteId)!;
websiteListeners.add(callback);
return () => { return () => {
listeners.get(websiteId)?.delete(callback); websiteListeners.delete(callback);
if (websiteListeners.size === 0) {
listeners.delete(websiteId);
}
}; };
} }
export function emitSessionCreated(websiteId: string, sessionId: string) { export async function emitSessionCreated(websiteId: string, sessionId: string) {
const message = JSON.stringify({ websiteId, sessionId });
if (redis.enabled) {
await redis.client.publish(REDIS_CHANNEL, message);
}
listeners.get(websiteId)?.forEach(cb => cb(websiteId, sessionId)); listeners.get(websiteId)?.forEach(cb => cb(websiteId, sessionId));
} }
let redisSubscriber: any = null;
export async function initializeRedisSubscriber() {
if (!redis.enabled || redisSubscriber) {
return;
}
redisSubscriber = redis.client.duplicate();
await redisSubscriber.connect();
await redisSubscriber.subscribe(REDIS_CHANNEL, (message: string) => {
try {
const { websiteId, sessionId } = JSON.parse(message);
listeners.get(websiteId)?.forEach(cb => cb(websiteId, sessionId));
} catch (error) {
// eslint-disable-next-line no-console
console.error('Failed to parse session event:', error);
}
});
}