From ef9a382cddb53ea8a1e37233e98319fa2cd6d705 Mon Sep 17 00:00:00 2001 From: Arthur Sepiol Date: Wed, 10 Dec 2025 16:02:44 +0300 Subject: [PATCH 1/2] Add real-time session updates via Server-Sent Events Implements push-based real-time updates for the Sessions page. New sessions now appear instantly without manual reload or polling. Changes: - Add SSE event emitter for session creation notifications - Create SSE stream endpoint at /api/websites/[id]/sessions/stream - Emit session events in tracking endpoint when sessions are created - Add useSessionStream hook to connect to SSE and invalidate queries - Fix LoadingPanel to prevent flicker during background refetches --- .../sessions/SessionsDataTable.tsx | 3 +- src/app/api/send/route.ts | 2 ++ .../[websiteId]/sessions/stream/route.ts | 31 +++++++++++++++++++ src/components/common/LoadingPanel.tsx | 10 +++--- src/components/hooks/index.ts | 1 + src/components/hooks/useSessionStream.ts | 18 +++++++++++ src/lib/session-events.ts | 18 +++++++++++ 7 files changed, 78 insertions(+), 5 deletions(-) create mode 100644 src/app/api/websites/[websiteId]/sessions/stream/route.ts create mode 100644 src/components/hooks/useSessionStream.ts create mode 100644 src/lib/session-events.ts diff --git a/src/app/(main)/websites/[websiteId]/sessions/SessionsDataTable.tsx b/src/app/(main)/websites/[websiteId]/sessions/SessionsDataTable.tsx index b1b9f658..6519a852 100644 --- a/src/app/(main)/websites/[websiteId]/sessions/SessionsDataTable.tsx +++ b/src/app/(main)/websites/[websiteId]/sessions/SessionsDataTable.tsx @@ -1,8 +1,9 @@ import { DataGrid } from '@/components/common/DataGrid'; -import { useWebsiteSessionsQuery } from '@/components/hooks'; +import { useSessionStream, useWebsiteSessionsQuery } from '@/components/hooks'; import { SessionsTable } from './SessionsTable'; export function SessionsDataTable({ websiteId }: { websiteId?: string; teamId?: string }) { + useSessionStream(websiteId); const queryResult = useWebsiteSessionsQuery(websiteId); return ( diff --git a/src/app/api/send/route.ts b/src/app/api/send/route.ts index a0becc2a..f5f7a0ac 100644 --- a/src/app/api/send/route.ts +++ b/src/app/api/send/route.ts @@ -12,6 +12,7 @@ import { parseRequest } from '@/lib/request'; import { badRequest, forbidden, json, serverError } from '@/lib/response'; import { anyObjectParam, urlOrPathParam } from '@/lib/schema'; import { safeDecodeURI, safeDecodeURIComponent } from '@/lib/url'; +import { emitSessionCreated } from '@/lib/session-events'; import { createSession, saveEvent, saveSessionData } from '@/queries/sql'; interface Cache { @@ -151,6 +152,7 @@ export async function POST(request: Request) { distinctId: id, createdAt, }); + emitSessionCreated(sourceId, sessionId); } // Visit info diff --git a/src/app/api/websites/[websiteId]/sessions/stream/route.ts b/src/app/api/websites/[websiteId]/sessions/stream/route.ts new file mode 100644 index 00000000..ec7b154c --- /dev/null +++ b/src/app/api/websites/[websiteId]/sessions/stream/route.ts @@ -0,0 +1,31 @@ +import { subscribeToSessions } from '@/lib/session-events'; + +export async function GET( + request: Request, + { params }: { params: Promise<{ websiteId: string }> }, +) { + const { websiteId } = await params; + + const stream = new ReadableStream({ + start(controller) { + const encoder = new TextEncoder(); + + const unsubscribe = subscribeToSessions(websiteId, (_, sessionId) => { + controller.enqueue(encoder.encode(`data: ${JSON.stringify({ sessionId })}\n\n`)); + }); + + request.signal.addEventListener('abort', () => { + unsubscribe(); + controller.close(); + }); + }, + }); + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }, + }); +} diff --git a/src/components/common/LoadingPanel.tsx b/src/components/common/LoadingPanel.tsx index fb37e140..e16c57a0 100644 --- a/src/components/common/LoadingPanel.tsx +++ b/src/components/common/LoadingPanel.tsx @@ -28,9 +28,11 @@ export function LoadingPanel({ ...props }: LoadingPanelProps): ReactNode { const empty = isEmpty ?? checkEmpty(data); + const hasData = data && !empty; - // Show loading spinner only if no data exists - if (isLoading || isFetching) { + // Show loading only on initial load when no data exists yet + // Don't show loading during background refetches when we already have data + if ((isLoading || isFetching) && !hasData) { return ( @@ -48,8 +50,8 @@ export function LoadingPanel({ return renderEmpty(); } - // Show main content when data exists - if (!isLoading && !isFetching && !error && !empty) { + // Show content when we have data (even during background refetch) + if (hasData) { return children; } diff --git a/src/components/hooks/index.ts b/src/components/hooks/index.ts index e8e5c135..ea284c1e 100644 --- a/src/components/hooks/index.ts +++ b/src/components/hooks/index.ts @@ -79,6 +79,7 @@ export * from './useNavigation'; export * from './usePagedQuery'; export * from './usePageParameters'; export * from './useRegionNames'; +export * from './useSessionStream'; export * from './useSlug'; export * from './useSticky'; export * from './useTimezone'; diff --git a/src/components/hooks/useSessionStream.ts b/src/components/hooks/useSessionStream.ts new file mode 100644 index 00000000..0abc991d --- /dev/null +++ b/src/components/hooks/useSessionStream.ts @@ -0,0 +1,18 @@ +import { useEffect } from 'react'; +import { useQueryClient } from '@tanstack/react-query'; + +export function useSessionStream(websiteId: string) { + const queryClient = useQueryClient(); + + useEffect(() => { + if (!websiteId) return; + + const eventSource = new EventSource(`/api/websites/${websiteId}/sessions/stream`); + + eventSource.onmessage = () => { + queryClient.invalidateQueries({ queryKey: ['sessions'] }); + }; + + return () => eventSource.close(); + }, [websiteId, queryClient]); +} diff --git a/src/lib/session-events.ts b/src/lib/session-events.ts new file mode 100644 index 00000000..9b805cf4 --- /dev/null +++ b/src/lib/session-events.ts @@ -0,0 +1,18 @@ +type SessionListener = (websiteId: string, sessionId: string) => void; + +const listeners = new Map>(); + +export function subscribeToSessions(websiteId: string, callback: SessionListener) { + if (!listeners.has(websiteId)) { + listeners.set(websiteId, new Set()); + } + listeners.get(websiteId).add(callback); + + return () => { + listeners.get(websiteId)?.delete(callback); + }; +} + +export function emitSessionCreated(websiteId: string, sessionId: string) { + listeners.get(websiteId)?.forEach(cb => cb(websiteId, sessionId)); +} From 5874cf80f58dc1844f35d1cc9d3058fccfadd013 Mon Sep 17 00:00:00 2001 From: Arthur Sepiol Date: Wed, 10 Dec 2025 16:06:18 +0300 Subject: [PATCH 2/2] Add authentication, Redis pub/sub, and error handling to SSE Improvements: - Add Redis pub/sub support for multi-server deployments - Add authentication check to SSE stream endpoint - Add 30s heartbeat keepalive for long-lived connections - Implement exponential backoff reconnection logic in client - Fix TypeScript types (websiteId optional, timer types) - Use specific query key invalidation instead of broad match - Fix undefined access in session-events listener map --- src/app/api/send/route.ts | 2 +- .../[websiteId]/sessions/stream/route.ts | 27 ++++++++- src/components/hooks/useSessionStream.ts | 55 +++++++++++++++++-- src/lib/session-events.ts | 41 +++++++++++++- 4 files changed, 114 insertions(+), 11 deletions(-) diff --git a/src/app/api/send/route.ts b/src/app/api/send/route.ts index f5f7a0ac..3117cf23 100644 --- a/src/app/api/send/route.ts +++ b/src/app/api/send/route.ts @@ -152,7 +152,7 @@ export async function POST(request: Request) { distinctId: id, createdAt, }); - emitSessionCreated(sourceId, sessionId); + await emitSessionCreated(sourceId, sessionId); } // Visit info diff --git a/src/app/api/websites/[websiteId]/sessions/stream/route.ts b/src/app/api/websites/[websiteId]/sessions/stream/route.ts index ec7b154c..db78a958 100644 --- a/src/app/api/websites/[websiteId]/sessions/stream/route.ts +++ b/src/app/api/websites/[websiteId]/sessions/stream/route.ts @@ -1,20 +1,45 @@ -import { subscribeToSessions } from '@/lib/session-events'; +import { initializeRedisSubscriber, subscribeToSessions } from '@/lib/session-events'; +import { parseRequest } from '@/lib/request'; +import { unauthorized } from '@/lib/response'; +import { canViewWebsite } from '@/permissions'; + +const HEARTBEAT_INTERVAL = 30000; export async function GET( request: Request, { params }: { params: Promise<{ websiteId: string }> }, ) { + const { auth, error } = await parseRequest(request); + + if (error) { + return error(); + } + const { websiteId } = await params; + if (!(await canViewWebsite(auth, websiteId))) { + return unauthorized(); + } + + await initializeRedisSubscriber(); + const stream = new ReadableStream({ start(controller) { const encoder = new TextEncoder(); + let heartbeatTimer: ReturnType | null = null; const unsubscribe = subscribeToSessions(websiteId, (_, sessionId) => { controller.enqueue(encoder.encode(`data: ${JSON.stringify({ sessionId })}\n\n`)); }); + heartbeatTimer = setInterval(() => { + controller.enqueue(encoder.encode(': heartbeat\n\n')); + }, HEARTBEAT_INTERVAL); + request.signal.addEventListener('abort', () => { + if (heartbeatTimer) { + clearInterval(heartbeatTimer); + } unsubscribe(); controller.close(); }); diff --git a/src/components/hooks/useSessionStream.ts b/src/components/hooks/useSessionStream.ts index 0abc991d..34188e6b 100644 --- a/src/components/hooks/useSessionStream.ts +++ b/src/components/hooks/useSessionStream.ts @@ -1,18 +1,61 @@ -import { useEffect } from 'react'; +import { useEffect, useRef } from 'react'; import { useQueryClient } from '@tanstack/react-query'; -export function useSessionStream(websiteId: string) { +const MAX_RETRY_DELAY = 30000; +const INITIAL_RETRY_DELAY = 1000; + +export function useSessionStream(websiteId?: string) { const queryClient = useQueryClient(); + const retryCountRef = useRef(0); + const retryTimeoutRef = useRef | null>(null); useEffect(() => { if (!websiteId) return; - const eventSource = new EventSource(`/api/websites/${websiteId}/sessions/stream`); + let eventSource: EventSource | null = null; + let isMounted = true; - eventSource.onmessage = () => { - queryClient.invalidateQueries({ queryKey: ['sessions'] }); + const connect = () => { + if (!isMounted) return; + + eventSource = new EventSource(`/api/websites/${websiteId}/sessions/stream`); + + eventSource.onmessage = event => { + try { + const data = JSON.parse(event.data); + if (data.sessionId) { + retryCountRef.current = 0; + queryClient.invalidateQueries({ queryKey: ['sessions', { websiteId }] }); + } + } catch (error) { + // eslint-disable-next-line no-console + console.error('Failed to parse session event:', error); + } + }; + + eventSource.onerror = () => { + eventSource?.close(); + + if (!isMounted) return; + + const delay = Math.min( + INITIAL_RETRY_DELAY * Math.pow(2, retryCountRef.current), + MAX_RETRY_DELAY, + ); + retryCountRef.current += 1; + + retryTimeoutRef.current = setTimeout(connect, delay); + }; }; - return () => eventSource.close(); + connect(); + + return () => { + isMounted = false; + if (retryTimeoutRef.current) { + clearTimeout(retryTimeoutRef.current); + } + eventSource?.close(); + }; }, [websiteId, queryClient]); } diff --git a/src/lib/session-events.ts b/src/lib/session-events.ts index 9b805cf4..8c27e7ef 100644 --- a/src/lib/session-events.ts +++ b/src/lib/session-events.ts @@ -1,18 +1,53 @@ +import redis from './redis'; + type SessionListener = (websiteId: string, sessionId: string) => void; const listeners = new Map>(); +const REDIS_CHANNEL = 'umami:session:created'; export function subscribeToSessions(websiteId: string, callback: SessionListener) { if (!listeners.has(websiteId)) { listeners.set(websiteId, new Set()); } - listeners.get(websiteId).add(callback); + + const websiteListeners = listeners.get(websiteId)!; + websiteListeners.add(callback); return () => { - listeners.get(websiteId)?.delete(callback); + websiteListeners.delete(callback); + if (websiteListeners.size === 0) { + listeners.delete(websiteId); + } }; } -export function emitSessionCreated(websiteId: string, sessionId: string) { +export async function emitSessionCreated(websiteId: string, sessionId: string) { + const message = JSON.stringify({ websiteId, sessionId }); + + if (redis.enabled) { + await redis.client.publish(REDIS_CHANNEL, message); + } + listeners.get(websiteId)?.forEach(cb => cb(websiteId, sessionId)); } + +let redisSubscriber: any = null; + +export async function initializeRedisSubscriber() { + if (!redis.enabled || redisSubscriber) { + return; + } + + redisSubscriber = redis.client.duplicate(); + await redisSubscriber.connect(); + + await redisSubscriber.subscribe(REDIS_CHANNEL, (message: string) => { + try { + const { websiteId, sessionId } = JSON.parse(message); + listeners.get(websiteId)?.forEach(cb => cb(websiteId, sessionId)); + } catch (error) { + // eslint-disable-next-line no-console + console.error('Failed to parse session event:', error); + } + }); +}