diff --git a/src/app/(main)/websites/[websiteId]/sessions/SessionsDataTable.tsx b/src/app/(main)/websites/[websiteId]/sessions/SessionsDataTable.tsx index b1b9f658..6519a852 100644 --- a/src/app/(main)/websites/[websiteId]/sessions/SessionsDataTable.tsx +++ b/src/app/(main)/websites/[websiteId]/sessions/SessionsDataTable.tsx @@ -1,8 +1,9 @@ import { DataGrid } from '@/components/common/DataGrid'; -import { useWebsiteSessionsQuery } from '@/components/hooks'; +import { useSessionStream, useWebsiteSessionsQuery } from '@/components/hooks'; import { SessionsTable } from './SessionsTable'; export function SessionsDataTable({ websiteId }: { websiteId?: string; teamId?: string }) { + useSessionStream(websiteId); const queryResult = useWebsiteSessionsQuery(websiteId); return ( diff --git a/src/app/api/send/route.ts b/src/app/api/send/route.ts index a0becc2a..3117cf23 100644 --- a/src/app/api/send/route.ts +++ b/src/app/api/send/route.ts @@ -12,6 +12,7 @@ import { parseRequest } from '@/lib/request'; import { badRequest, forbidden, json, serverError } from '@/lib/response'; import { anyObjectParam, urlOrPathParam } from '@/lib/schema'; import { safeDecodeURI, safeDecodeURIComponent } from '@/lib/url'; +import { emitSessionCreated } from '@/lib/session-events'; import { createSession, saveEvent, saveSessionData } from '@/queries/sql'; interface Cache { @@ -151,6 +152,7 @@ export async function POST(request: Request) { distinctId: id, createdAt, }); + await emitSessionCreated(sourceId, sessionId); } // Visit info diff --git a/src/app/api/websites/[websiteId]/sessions/stream/route.ts b/src/app/api/websites/[websiteId]/sessions/stream/route.ts new file mode 100644 index 00000000..db78a958 --- /dev/null +++ b/src/app/api/websites/[websiteId]/sessions/stream/route.ts @@ -0,0 +1,56 @@ +import { initializeRedisSubscriber, subscribeToSessions } from '@/lib/session-events'; +import { parseRequest } from '@/lib/request'; +import { unauthorized } from '@/lib/response'; +import { canViewWebsite } from '@/permissions'; + +const HEARTBEAT_INTERVAL = 30000; + +export async function GET( + request: Request, + { params }: { params: Promise<{ websiteId: string }> }, +) { + const { auth, error } = await parseRequest(request); + + if (error) { + return error(); + } + + const { websiteId } = await params; + + if (!(await canViewWebsite(auth, websiteId))) { + return unauthorized(); + } + + await initializeRedisSubscriber(); + + const stream = new ReadableStream({ + start(controller) { + const encoder = new TextEncoder(); + let heartbeatTimer: ReturnType | null = null; + + const unsubscribe = subscribeToSessions(websiteId, (_, sessionId) => { + controller.enqueue(encoder.encode(`data: ${JSON.stringify({ sessionId })}\n\n`)); + }); + + heartbeatTimer = setInterval(() => { + controller.enqueue(encoder.encode(': heartbeat\n\n')); + }, HEARTBEAT_INTERVAL); + + request.signal.addEventListener('abort', () => { + if (heartbeatTimer) { + clearInterval(heartbeatTimer); + } + unsubscribe(); + controller.close(); + }); + }, + }); + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }, + }); +} diff --git a/src/components/common/LoadingPanel.tsx b/src/components/common/LoadingPanel.tsx index fb37e140..e16c57a0 100644 --- a/src/components/common/LoadingPanel.tsx +++ b/src/components/common/LoadingPanel.tsx @@ -28,9 +28,11 @@ export function LoadingPanel({ ...props }: LoadingPanelProps): ReactNode { const empty = isEmpty ?? checkEmpty(data); + const hasData = data && !empty; - // Show loading spinner only if no data exists - if (isLoading || isFetching) { + // Show loading only on initial load when no data exists yet + // Don't show loading during background refetches when we already have data + if ((isLoading || isFetching) && !hasData) { return ( @@ -48,8 +50,8 @@ export function LoadingPanel({ return renderEmpty(); } - // Show main content when data exists - if (!isLoading && !isFetching && !error && !empty) { + // Show content when we have data (even during background refetch) + if (hasData) { return children; } diff --git a/src/components/hooks/index.ts b/src/components/hooks/index.ts index e8e5c135..ea284c1e 100644 --- a/src/components/hooks/index.ts +++ b/src/components/hooks/index.ts @@ -79,6 +79,7 @@ export * from './useNavigation'; export * from './usePagedQuery'; export * from './usePageParameters'; export * from './useRegionNames'; +export * from './useSessionStream'; export * from './useSlug'; export * from './useSticky'; export * from './useTimezone'; diff --git a/src/components/hooks/useSessionStream.ts b/src/components/hooks/useSessionStream.ts new file mode 100644 index 00000000..34188e6b --- /dev/null +++ b/src/components/hooks/useSessionStream.ts @@ -0,0 +1,61 @@ +import { useEffect, useRef } from 'react'; +import { useQueryClient } from '@tanstack/react-query'; + +const MAX_RETRY_DELAY = 30000; +const INITIAL_RETRY_DELAY = 1000; + +export function useSessionStream(websiteId?: string) { + const queryClient = useQueryClient(); + const retryCountRef = useRef(0); + const retryTimeoutRef = useRef | null>(null); + + useEffect(() => { + if (!websiteId) return; + + let eventSource: EventSource | null = null; + let isMounted = true; + + const connect = () => { + if (!isMounted) return; + + eventSource = new EventSource(`/api/websites/${websiteId}/sessions/stream`); + + eventSource.onmessage = event => { + try { + const data = JSON.parse(event.data); + if (data.sessionId) { + retryCountRef.current = 0; + queryClient.invalidateQueries({ queryKey: ['sessions', { websiteId }] }); + } + } catch (error) { + // eslint-disable-next-line no-console + console.error('Failed to parse session event:', error); + } + }; + + eventSource.onerror = () => { + eventSource?.close(); + + if (!isMounted) return; + + const delay = Math.min( + INITIAL_RETRY_DELAY * Math.pow(2, retryCountRef.current), + MAX_RETRY_DELAY, + ); + retryCountRef.current += 1; + + retryTimeoutRef.current = setTimeout(connect, delay); + }; + }; + + connect(); + + return () => { + isMounted = false; + if (retryTimeoutRef.current) { + clearTimeout(retryTimeoutRef.current); + } + eventSource?.close(); + }; + }, [websiteId, queryClient]); +} diff --git a/src/lib/session-events.ts b/src/lib/session-events.ts new file mode 100644 index 00000000..8c27e7ef --- /dev/null +++ b/src/lib/session-events.ts @@ -0,0 +1,53 @@ +import redis from './redis'; + +type SessionListener = (websiteId: string, sessionId: string) => void; + +const listeners = new Map>(); +const REDIS_CHANNEL = 'umami:session:created'; + +export function subscribeToSessions(websiteId: string, callback: SessionListener) { + if (!listeners.has(websiteId)) { + listeners.set(websiteId, new Set()); + } + + const websiteListeners = listeners.get(websiteId)!; + websiteListeners.add(callback); + + return () => { + websiteListeners.delete(callback); + if (websiteListeners.size === 0) { + listeners.delete(websiteId); + } + }; +} + +export async function emitSessionCreated(websiteId: string, sessionId: string) { + const message = JSON.stringify({ websiteId, sessionId }); + + if (redis.enabled) { + await redis.client.publish(REDIS_CHANNEL, message); + } + + listeners.get(websiteId)?.forEach(cb => cb(websiteId, sessionId)); +} + +let redisSubscriber: any = null; + +export async function initializeRedisSubscriber() { + if (!redis.enabled || redisSubscriber) { + return; + } + + redisSubscriber = redis.client.duplicate(); + await redisSubscriber.connect(); + + await redisSubscriber.subscribe(REDIS_CHANNEL, (message: string) => { + try { + const { websiteId, sessionId } = JSON.parse(message); + listeners.get(websiteId)?.forEach(cb => cb(websiteId, sessionId)); + } catch (error) { + // eslint-disable-next-line no-console + console.error('Failed to parse session event:', error); + } + }); +}