mirror of
https://github.com/umami-software/umami.git
synced 2026-02-04 04:37:11 +01:00
Merge 5874cf80f5 into 860e6390f1
This commit is contained in:
commit
f9e59d41c2
7 changed files with 181 additions and 5 deletions
|
|
@ -1,8 +1,9 @@
|
|||
import { DataGrid } from '@/components/common/DataGrid';
|
||||
import { useWebsiteSessionsQuery } from '@/components/hooks';
|
||||
import { useSessionStream, useWebsiteSessionsQuery } from '@/components/hooks';
|
||||
import { SessionsTable } from './SessionsTable';
|
||||
|
||||
export function SessionsDataTable({ websiteId }: { websiteId?: string; teamId?: string }) {
|
||||
useSessionStream(websiteId);
|
||||
const queryResult = useWebsiteSessionsQuery(websiteId);
|
||||
|
||||
return (
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import { parseRequest } from '@/lib/request';
|
|||
import { badRequest, forbidden, json, serverError } from '@/lib/response';
|
||||
import { anyObjectParam, urlOrPathParam } from '@/lib/schema';
|
||||
import { safeDecodeURI, safeDecodeURIComponent } from '@/lib/url';
|
||||
import { emitSessionCreated } from '@/lib/session-events';
|
||||
import { createSession, saveEvent, saveSessionData } from '@/queries/sql';
|
||||
|
||||
interface Cache {
|
||||
|
|
@ -151,6 +152,7 @@ export async function POST(request: Request) {
|
|||
distinctId: id,
|
||||
createdAt,
|
||||
});
|
||||
await emitSessionCreated(sourceId, sessionId);
|
||||
}
|
||||
|
||||
// Visit info
|
||||
|
|
|
|||
56
src/app/api/websites/[websiteId]/sessions/stream/route.ts
Normal file
56
src/app/api/websites/[websiteId]/sessions/stream/route.ts
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
import { initializeRedisSubscriber, subscribeToSessions } from '@/lib/session-events';
|
||||
import { parseRequest } from '@/lib/request';
|
||||
import { unauthorized } from '@/lib/response';
|
||||
import { canViewWebsite } from '@/permissions';
|
||||
|
||||
const HEARTBEAT_INTERVAL = 30000;
|
||||
|
||||
export async function GET(
|
||||
request: Request,
|
||||
{ params }: { params: Promise<{ websiteId: string }> },
|
||||
) {
|
||||
const { auth, error } = await parseRequest(request);
|
||||
|
||||
if (error) {
|
||||
return error();
|
||||
}
|
||||
|
||||
const { websiteId } = await params;
|
||||
|
||||
if (!(await canViewWebsite(auth, websiteId))) {
|
||||
return unauthorized();
|
||||
}
|
||||
|
||||
await initializeRedisSubscriber();
|
||||
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
const encoder = new TextEncoder();
|
||||
let heartbeatTimer: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
const unsubscribe = subscribeToSessions(websiteId, (_, sessionId) => {
|
||||
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ sessionId })}\n\n`));
|
||||
});
|
||||
|
||||
heartbeatTimer = setInterval(() => {
|
||||
controller.enqueue(encoder.encode(': heartbeat\n\n'));
|
||||
}, HEARTBEAT_INTERVAL);
|
||||
|
||||
request.signal.addEventListener('abort', () => {
|
||||
if (heartbeatTimer) {
|
||||
clearInterval(heartbeatTimer);
|
||||
}
|
||||
unsubscribe();
|
||||
controller.close();
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
return new Response(stream, {
|
||||
headers: {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
},
|
||||
});
|
||||
}
|
||||
|
|
@ -28,9 +28,11 @@ export function LoadingPanel({
|
|||
...props
|
||||
}: LoadingPanelProps): ReactNode {
|
||||
const empty = isEmpty ?? checkEmpty(data);
|
||||
const hasData = data && !empty;
|
||||
|
||||
// Show loading spinner only if no data exists
|
||||
if (isLoading || isFetching) {
|
||||
// Show loading only on initial load when no data exists yet
|
||||
// Don't show loading during background refetches when we already have data
|
||||
if ((isLoading || isFetching) && !hasData) {
|
||||
return (
|
||||
<Column position="relative" height="100%" width="100%" {...props}>
|
||||
<Loading icon={loadingIcon} placement={loadingPlacement} />
|
||||
|
|
@ -48,8 +50,8 @@ export function LoadingPanel({
|
|||
return renderEmpty();
|
||||
}
|
||||
|
||||
// Show main content when data exists
|
||||
if (!isLoading && !isFetching && !error && !empty) {
|
||||
// Show content when we have data (even during background refetch)
|
||||
if (hasData) {
|
||||
return children;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -79,6 +79,7 @@ export * from './useNavigation';
|
|||
export * from './usePagedQuery';
|
||||
export * from './usePageParameters';
|
||||
export * from './useRegionNames';
|
||||
export * from './useSessionStream';
|
||||
export * from './useSlug';
|
||||
export * from './useSticky';
|
||||
export * from './useTimezone';
|
||||
|
|
|
|||
61
src/components/hooks/useSessionStream.ts
Normal file
61
src/components/hooks/useSessionStream.ts
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
import { useEffect, useRef } from 'react';
|
||||
import { useQueryClient } from '@tanstack/react-query';
|
||||
|
||||
const MAX_RETRY_DELAY = 30000;
|
||||
const INITIAL_RETRY_DELAY = 1000;
|
||||
|
||||
export function useSessionStream(websiteId?: string) {
|
||||
const queryClient = useQueryClient();
|
||||
const retryCountRef = useRef(0);
|
||||
const retryTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
|
||||
|
||||
useEffect(() => {
|
||||
if (!websiteId) return;
|
||||
|
||||
let eventSource: EventSource | null = null;
|
||||
let isMounted = true;
|
||||
|
||||
const connect = () => {
|
||||
if (!isMounted) return;
|
||||
|
||||
eventSource = new EventSource(`/api/websites/${websiteId}/sessions/stream`);
|
||||
|
||||
eventSource.onmessage = event => {
|
||||
try {
|
||||
const data = JSON.parse(event.data);
|
||||
if (data.sessionId) {
|
||||
retryCountRef.current = 0;
|
||||
queryClient.invalidateQueries({ queryKey: ['sessions', { websiteId }] });
|
||||
}
|
||||
} catch (error) {
|
||||
// eslint-disable-next-line no-console
|
||||
console.error('Failed to parse session event:', error);
|
||||
}
|
||||
};
|
||||
|
||||
eventSource.onerror = () => {
|
||||
eventSource?.close();
|
||||
|
||||
if (!isMounted) return;
|
||||
|
||||
const delay = Math.min(
|
||||
INITIAL_RETRY_DELAY * Math.pow(2, retryCountRef.current),
|
||||
MAX_RETRY_DELAY,
|
||||
);
|
||||
retryCountRef.current += 1;
|
||||
|
||||
retryTimeoutRef.current = setTimeout(connect, delay);
|
||||
};
|
||||
};
|
||||
|
||||
connect();
|
||||
|
||||
return () => {
|
||||
isMounted = false;
|
||||
if (retryTimeoutRef.current) {
|
||||
clearTimeout(retryTimeoutRef.current);
|
||||
}
|
||||
eventSource?.close();
|
||||
};
|
||||
}, [websiteId, queryClient]);
|
||||
}
|
||||
53
src/lib/session-events.ts
Normal file
53
src/lib/session-events.ts
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
import redis from './redis';
|
||||
|
||||
type SessionListener = (websiteId: string, sessionId: string) => void;
|
||||
|
||||
const listeners = new Map<string, Set<SessionListener>>();
|
||||
const REDIS_CHANNEL = 'umami:session:created';
|
||||
|
||||
export function subscribeToSessions(websiteId: string, callback: SessionListener) {
|
||||
if (!listeners.has(websiteId)) {
|
||||
listeners.set(websiteId, new Set());
|
||||
}
|
||||
|
||||
const websiteListeners = listeners.get(websiteId)!;
|
||||
websiteListeners.add(callback);
|
||||
|
||||
return () => {
|
||||
websiteListeners.delete(callback);
|
||||
if (websiteListeners.size === 0) {
|
||||
listeners.delete(websiteId);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
export async function emitSessionCreated(websiteId: string, sessionId: string) {
|
||||
const message = JSON.stringify({ websiteId, sessionId });
|
||||
|
||||
if (redis.enabled) {
|
||||
await redis.client.publish(REDIS_CHANNEL, message);
|
||||
}
|
||||
|
||||
listeners.get(websiteId)?.forEach(cb => cb(websiteId, sessionId));
|
||||
}
|
||||
|
||||
let redisSubscriber: any = null;
|
||||
|
||||
export async function initializeRedisSubscriber() {
|
||||
if (!redis.enabled || redisSubscriber) {
|
||||
return;
|
||||
}
|
||||
|
||||
redisSubscriber = redis.client.duplicate();
|
||||
await redisSubscriber.connect();
|
||||
|
||||
await redisSubscriber.subscribe(REDIS_CHANNEL, (message: string) => {
|
||||
try {
|
||||
const { websiteId, sessionId } = JSON.parse(message);
|
||||
listeners.get(websiteId)?.forEach(cb => cb(websiteId, sessionId));
|
||||
} catch (error) {
|
||||
// eslint-disable-next-line no-console
|
||||
console.error('Failed to parse session event:', error);
|
||||
}
|
||||
});
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue