Merge branch 'dev' of https://github.com/umami-software/umami into feat/clickhouse-mv

This commit is contained in:
Francis Cao 2024-07-19 11:27:03 -07:00
commit 77fcdc0646
157 changed files with 1780 additions and 1476 deletions

View file

@ -13,7 +13,7 @@ export async function getSessionStats(...args: [websiteId: string, filters: Quer
async function relationalQuery(websiteId: string, filters: QueryFilters) {
const { timezone = 'utc', unit = 'day' } = filters;
const { getDateQuery, parseFilters, rawQuery } = prisma;
const { getDateSQL, parseFilters, rawQuery } = prisma;
const { filterQuery, joinSession, params } = await parseFilters(websiteId, {
...filters,
eventType: EVENT_TYPE.pageView,
@ -22,7 +22,7 @@ async function relationalQuery(websiteId: string, filters: QueryFilters) {
return rawQuery(
`
select
${getDateQuery('website_event.created_at', unit, timezone)} x,
${getDateSQL('website_event.created_at', unit, timezone)} x,
count(distinct website_event.session_id) y
from website_event
${joinSession}
@ -41,7 +41,7 @@ async function clickhouseQuery(
filters: QueryFilters,
): Promise<{ x: string; y: number }[]> {
const { timezone = 'UTC', unit = 'day' } = filters;
const { parseFilters, rawQuery, getDateStringQuery, getDateQuery } = clickhouse;
const { parseFilters, rawQuery, getDateStringSQL, getDateSQL } = clickhouse;
const { filterQuery, params } = await parseFilters(websiteId, {
...filters,
eventType: EVENT_TYPE.pageView,
@ -51,11 +51,11 @@ async function clickhouseQuery(
return rawQuery(
`
select
${getDateStringQuery('g.t', unit)} as x,
${getDateStringSQL('g.t', unit)} as x,
g.y as y
from (
select
${getDateQuery('created_at', unit, timezone)} as t,
${getDateSQL('created_at', unit, timezone)} as t,
uniq(session_id) as y
from ${table} website_event
where website_id = {websiteId:UUID}

View file

@ -1,51 +1,38 @@
import prisma from 'lib/prisma';
import clickhouse from 'lib/clickhouse';
import { runQuery, PRISMA, CLICKHOUSE } from 'lib/db';
import { QueryFilters } from 'lib/types';
import { PageParams, QueryFilters } from 'lib/types';
export async function getSessions(...args: [websiteId: string, filters: QueryFilters]) {
export async function getSessions(
...args: [websiteId: string, filters?: QueryFilters, pageParams?: PageParams]
) {
return runQuery({
[PRISMA]: () => relationalQuery(...args),
[CLICKHOUSE]: () => clickhouseQuery(...args),
});
}
async function relationalQuery(websiteId: string, filters: QueryFilters) {
const { startDate } = filters;
async function relationalQuery(websiteId: string, filters: QueryFilters, pageParams: PageParams) {
const { pagedQuery } = prisma;
return prisma.client.session
.findMany({
where: {
websiteId,
createdAt: {
gte: startDate,
},
},
orderBy: {
createdAt: 'desc',
},
})
.then(a => {
return Object.values(a).map(a => {
return {
...a,
timestamp: new Date(a.createdAt).getTime() / 1000,
};
});
});
const where = {
...filters,
id: websiteId,
};
return pagedQuery('session', { where }, pageParams);
}
async function clickhouseQuery(websiteId: string, filters: QueryFilters) {
const { rawQuery } = clickhouse;
const { startDate } = filters;
async function clickhouseQuery(websiteId: string, filters: QueryFilters, pageParams?: PageParams) {
const { pagedQuery, parseFilters, getDateStringSQL } = clickhouse;
const { params, dateQuery, filterQuery } = await parseFilters(websiteId, filters);
return rawQuery(
return pagedQuery(
`
select
session_id as id,
website_id as websiteId,
created_at as createdAt,
toUnixTimestamp(created_at) as timestamp,
${getDateStringSQL('created_at', 'second', filters.timezone)} as createdAt,
hostname,
browser,
os,
@ -58,12 +45,11 @@ async function clickhouseQuery(websiteId: string, filters: QueryFilters) {
city
from website_event
where website_id = {websiteId:UUID}
and created_at >= {startDate:DateTime64}
${dateQuery}
${filterQuery}
order by created_at desc
`,
{
websiteId,
startDate,
},
params,
pageParams,
);
}

View file

@ -5,6 +5,7 @@ import prisma from 'lib/prisma';
import { DynamicData } from 'lib/types';
import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db';
import kafka from 'lib/kafka';
import clickhouse from 'lib/clickhouse';
export async function saveSessionData(data: {
websiteId: string;
@ -81,6 +82,7 @@ async function clickhouseQuery(data: {
}) {
const { websiteId, sessionId, sessionData, createdAt } = data;
const { insert } = clickhouse;
const { getDateFormat, sendMessages } = kafka;
const jsonKeys = flattenJSON(sessionData);
@ -98,7 +100,11 @@ async function clickhouseQuery(data: {
};
});
await sendMessages(messages, 'session_data');
if (kafka.enabled) {
await sendMessages('session_data', messages);
} else {
await insert('session_data', messages);
}
return data;
}