Renamed folder.

This commit is contained in:
Mike Cao 2025-02-05 20:31:48 -08:00
parent 8525188e42
commit dcf0da7b14
39 changed files with 0 additions and 0 deletions

View file

@ -0,0 +1,107 @@
import prisma from '@/lib/prisma';
import clickhouse from '@/lib/clickhouse';
import { CLICKHOUSE, PRISMA, runQuery } from '@/lib/db';
import { QueryFilters, WebsiteEventData } from '@/lib/types';
export async function getEventDataEvents(
...args: [websiteId: string, filters: QueryFilters]
): Promise<WebsiteEventData[]> {
return runQuery({
[PRISMA]: () => relationalQuery(...args),
[CLICKHOUSE]: () => clickhouseQuery(...args),
});
}
async function relationalQuery(websiteId: string, filters: QueryFilters) {
const { rawQuery, parseFilters } = prisma;
const { event } = filters;
const { params } = await parseFilters(websiteId, filters);
if (event) {
return rawQuery(
`
select
website_event.event_name as "eventName",
event_data.data_key as "propertyName",
event_data.data_type as "dataType",
event_data.string_value as "propertyValue",
count(*) as "total"
from event_data
inner join website_event
on website_event.event_id = event_data.website_event_id
where event_data.website_id = {{websiteId::uuid}}
and event_data.created_at between {{startDate}} and {{endDate}}
and website_event.event_name = {{event}}
group by website_event.event_name, event_data.data_key, event_data.data_type, event_data.string_value
order by 1 asc, 2 asc, 3 asc, 5 desc
`,
params,
);
}
return rawQuery(
`
select
website_event.event_name as "eventName",
event_data.data_key as "propertyName",
event_data.data_type as "dataType",
count(*) as "total"
from event_data
inner join website_event
on website_event.event_id = event_data.website_event_id
where event_data.website_id = {{websiteId::uuid}}
and event_data.created_at between {{startDate}} and {{endDate}}
group by website_event.event_name, event_data.data_key, event_data.data_type
order by 1 asc, 2 asc
limit 500
`,
params,
);
}
async function clickhouseQuery(
websiteId: string,
filters: QueryFilters,
): Promise<{ eventName: string; propertyName: string; dataType: number; total: number }[]> {
const { rawQuery, parseFilters } = clickhouse;
const { event } = filters;
const { params } = await parseFilters(websiteId, filters);
if (event) {
return rawQuery(
`
select
event_name as eventName,
data_key as propertyName,
data_type as dataType,
string_value as propertyValue,
count(*) as total
from event_data
where website_id = {websiteId:UUID}
and created_at between {startDate:DateTime64} and {endDate:DateTime64}
and event_name = {event:String}
group by data_key, data_type, string_value, event_name
order by 1 asc, 2 asc, 3 asc, 5 desc
limit 500
`,
params,
);
}
return rawQuery(
`
select
event_name as eventName,
data_key as propertyName,
data_type as dataType,
count(*) as total
from event_data
where website_id = {websiteId:UUID}
and created_at between {startDate:DateTime64} and {endDate:DateTime64}
group by data_key, data_type, event_name
order by 1 asc, 2 asc
limit 500
`,
params,
);
}

View file

@ -0,0 +1,69 @@
import prisma from '@/lib/prisma';
import clickhouse from '@/lib/clickhouse';
import { CLICKHOUSE, PRISMA, runQuery } from '@/lib/db';
import { QueryFilters, WebsiteEventData } from '@/lib/types';
export async function getEventDataFields(
...args: [websiteId: string, filters: QueryFilters]
): Promise<WebsiteEventData[]> {
return runQuery({
[PRISMA]: () => relationalQuery(...args),
[CLICKHOUSE]: () => clickhouseQuery(...args),
});
}
async function relationalQuery(websiteId: string, filters: QueryFilters) {
const { rawQuery, parseFilters, getDateSQL } = prisma;
const { filterQuery, params } = await parseFilters(websiteId, filters);
return rawQuery(
`
select
data_key as "propertyName",
data_type as "dataType",
case
when data_type = 2 then replace(string_value, '.0000', '')
when data_type = 4 then ${getDateSQL('date_value', 'hour')}
else string_value
end as "value",
count(*) as "total"
from event_data
join website_event on website_event.event_id = event_data.website_event_id
where event_data.website_id = {{websiteId::uuid}}
and event_data.created_at between {{startDate}} and {{endDate}}
${filterQuery}
group by data_key, data_type, value
order by 2 desc
limit 100
`,
params,
);
}
async function clickhouseQuery(
websiteId: string,
filters: QueryFilters,
): Promise<{ propertyName: string; dataType: number; propertyValue: string; total: number }[]> {
const { rawQuery, parseFilters } = clickhouse;
const { filterQuery, params } = await parseFilters(websiteId, filters);
return rawQuery(
`
select
data_key as propertyName,
data_type as dataType,
multiIf(data_type = 2, replaceAll(string_value, '.0000', ''),
data_type = 4, toString(date_trunc('hour', date_value)),
string_value) as "value",
count(*) as "total"
from event_data
where website_id = {websiteId:UUID}
and created_at between {startDate:DateTime64} and {endDate:DateTime64}
${filterQuery}
group by data_key, data_type, value
order by 2 desc
limit 100
`,
params,
);
}

View file

@ -0,0 +1,68 @@
import prisma from '@/lib/prisma';
import clickhouse from '@/lib/clickhouse';
import { CLICKHOUSE, PRISMA, runQuery } from '@/lib/db';
import { QueryFilters, WebsiteEventData } from '@/lib/types';
export async function getEventDataProperties(
...args: [websiteId: string, filters: QueryFilters & { propertyName?: string }]
): Promise<WebsiteEventData[]> {
return runQuery({
[PRISMA]: () => relationalQuery(...args),
[CLICKHOUSE]: () => clickhouseQuery(...args),
});
}
async function relationalQuery(
websiteId: string,
filters: QueryFilters & { propertyName?: string },
) {
const { rawQuery, parseFilters } = prisma;
const { filterQuery, params } = await parseFilters(websiteId, filters, {
columns: { propertyName: 'data_key' },
});
return rawQuery(
`
select
website_event.event_name as "eventName",
event_data.data_key as "propertyName",
count(*) as "total"
from event_data
join website_event on website_event.event_id = event_data.website_event_id
where event_data.website_id = {{websiteId::uuid}}
and event_data.created_at between {{startDate}} and {{endDate}}
${filterQuery}
group by website_event.event_name, event_data.data_key
order by 3 desc
limit 500
`,
params,
);
}
async function clickhouseQuery(
websiteId: string,
filters: QueryFilters & { propertyName?: string },
): Promise<{ eventName: string; propertyName: string; total: number }[]> {
const { rawQuery, parseFilters } = clickhouse;
const { filterQuery, params } = await parseFilters(websiteId, filters, {
columns: { propertyName: 'data_key' },
});
return rawQuery(
`
select
event_name as eventName,
data_key as propertyName,
count(*) as total
from event_data
where website_id = {websiteId:UUID}
and created_at between {startDate:DateTime64} and {endDate:DateTime64}
${filterQuery}
group by event_name, data_key
order by 1, 3 desc
limit 500
`,
params,
);
}

View file

@ -0,0 +1,72 @@
import prisma from '@/lib/prisma';
import clickhouse from '@/lib/clickhouse';
import { CLICKHOUSE, PRISMA, runQuery } from '@/lib/db';
import { QueryFilters } from '@/lib/types';
export async function getEventDataStats(
...args: [websiteId: string, filters: QueryFilters]
): Promise<{
events: number;
properties: number;
records: number;
}> {
return runQuery({
[PRISMA]: () => relationalQuery(...args),
[CLICKHOUSE]: () => clickhouseQuery(...args),
}).then(results => results?.[0]);
}
async function relationalQuery(websiteId: string, filters: QueryFilters) {
const { rawQuery, parseFilters } = prisma;
const { filterQuery, params } = await parseFilters(websiteId, filters);
return rawQuery(
`
select
count(distinct t.website_event_id) as "events",
count(distinct t.data_key) as "properties",
sum(t.total) as "records"
from (
select
website_event_id,
data_key,
count(*) as "total"
from event_data
where website_id = {{websiteId::uuid}}
and created_at between {{startDate}} and {{endDate}}
${filterQuery}
group by website_event_id, data_key
) as t
`,
params,
);
}
async function clickhouseQuery(
websiteId: string,
filters: QueryFilters,
): Promise<{ events: number; properties: number; records: number }[]> {
const { rawQuery, parseFilters } = clickhouse;
const { filterQuery, params } = await parseFilters(websiteId, filters);
return rawQuery(
`
select
count(distinct t.event_id) as "events",
count(distinct t.data_key) as "properties",
sum(t.total) as "records"
from (
select
event_id,
data_key,
count(*) as "total"
from event_data
where website_id = {websiteId:UUID}
and created_at between {startDate:DateTime64} and {endDate:DateTime64}
${filterQuery}
group by event_id, data_key
) as t
`,
params,
);
}

View file

@ -0,0 +1,34 @@
import clickhouse from '@/lib/clickhouse';
import { CLICKHOUSE, PRISMA, runQuery, notImplemented } from '@/lib/db';
export function getEventDataUsage(...args: [websiteIds: string[], startDate: Date, endDate: Date]) {
return runQuery({
[PRISMA]: notImplemented,
[CLICKHOUSE]: () => clickhouseQuery(...args),
});
}
function clickhouseQuery(
websiteIds: string[],
startDate: Date,
endDate: Date,
): Promise<{ websiteId: string; count: number }[]> {
const { rawQuery } = clickhouse;
return rawQuery(
`
select
website_id as websiteId,
count(*) as count
from event_data
where created_at between {startDate:DateTime64} and {endDate:DateTime64}
and website_id in {websiteIds:Array(UUID)}
group by website_id
`,
{
websiteIds,
startDate,
endDate,
},
);
}

View file

@ -0,0 +1,75 @@
import prisma from '@/lib/prisma';
import clickhouse from '@/lib/clickhouse';
import { CLICKHOUSE, PRISMA, runQuery } from '@/lib/db';
import { QueryFilters, WebsiteEventData } from '@/lib/types';
export async function getEventDataValues(
...args: [
websiteId: string,
filters: QueryFilters & { eventName?: string; propertyName?: string },
]
): Promise<WebsiteEventData[]> {
return runQuery({
[PRISMA]: () => relationalQuery(...args),
[CLICKHOUSE]: () => clickhouseQuery(...args),
});
}
async function relationalQuery(
websiteId: string,
filters: QueryFilters & { eventName?: string; propertyName?: string },
) {
const { rawQuery, parseFilters, getDateSQL } = prisma;
const { filterQuery, params } = await parseFilters(websiteId, filters);
return rawQuery(
`
select
case
when data_type = 2 then replace(string_value, '.0000', '')
when data_type = 4 then ${getDateSQL('date_value', 'hour')}
else string_value
end as "value",
count(*) as "total"
from event_data
join website_event on website_event.event_id = event_data.website_event_id
where event_data.website_id = {{websiteId::uuid}}
and event_data.created_at between {{startDate}} and {{endDate}}
and event_data.data_key = {{propertyName}}
and website_event.event_name = {{eventName}}
${filterQuery}
group by value
order by 2 desc
limit 100
`,
params,
);
}
async function clickhouseQuery(
websiteId: string,
filters: QueryFilters & { eventName?: string; propertyName?: string },
): Promise<{ value: string; total: number }[]> {
const { rawQuery, parseFilters } = clickhouse;
const { filterQuery, params } = await parseFilters(websiteId, filters);
return rawQuery(
`
select
multiIf(data_type = 2, replaceAll(string_value, '.0000', ''),
data_type = 4, toString(date_trunc('hour', date_value)),
string_value) as "value",
count(*) as "total"
from event_data
where website_id = {websiteId:UUID}
and created_at between {startDate:DateTime64} and {endDate:DateTime64}
and data_key = {propertyName:String}
and event_name = {eventName:String}
${filterQuery}
group by value
order by 2 desc
limit 100
`,
params,
);
}

View file

@ -0,0 +1,94 @@
import clickhouse from '@/lib/clickhouse';
import { EVENT_TYPE } from '@/lib/constants';
import { CLICKHOUSE, PRISMA, runQuery } from '@/lib/db';
import prisma from '@/lib/prisma';
import { QueryFilters, WebsiteEventMetric } from '@/lib/types';
export async function getEventMetrics(
...args: [websiteId: string, filters: QueryFilters]
): Promise<WebsiteEventMetric[]> {
return runQuery({
[PRISMA]: () => relationalQuery(...args),
[CLICKHOUSE]: () => clickhouseQuery(...args),
});
}
async function relationalQuery(websiteId: string, filters: QueryFilters) {
const { timezone = 'utc', unit = 'day' } = filters;
const { rawQuery, getDateSQL, parseFilters } = prisma;
const { filterQuery, joinSession, params } = await parseFilters(websiteId, {
...filters,
eventType: EVENT_TYPE.customEvent,
});
return rawQuery(
`
select
event_name x,
${getDateSQL('website_event.created_at', unit, timezone)} t,
count(*) y
from website_event
${joinSession}
where website_event.website_id = {{websiteId::uuid}}
and website_event.created_at between {{startDate}} and {{endDate}}
and event_type = {{eventType}}
${filterQuery}
group by 1, 2
order by 2
`,
params,
);
}
async function clickhouseQuery(
websiteId: string,
filters: QueryFilters,
): Promise<{ x: string; t: string; y: number }[]> {
const { timezone = 'UTC', unit = 'day' } = filters;
const { rawQuery, getDateSQL, parseFilters } = clickhouse;
const { filterQuery, params } = await parseFilters(websiteId, {
...filters,
eventType: EVENT_TYPE.customEvent,
});
let sql = '';
if (filterQuery) {
sql = `
select
event_name x,
${getDateSQL('created_at', unit, timezone)} t,
count(*) y
from website_event
where website_id = {websiteId:UUID}
and created_at between {startDate:DateTime64} and {endDate:DateTime64}
and event_type = {eventType:UInt32}
${filterQuery}
group by x, t
order by t
`;
} else {
sql = `
select
event_name x,
${getDateSQL('created_at', unit, timezone)} t,
count(*) y
from (
select arrayJoin(event_name) as event_name,
created_at
from website_event_stats_hourly website_event
where website_id = {websiteId:UUID}
and created_at between {startDate:DateTime64} and {endDate:DateTime64}
and event_type = {eventType:UInt32}
) as g
group by x, t
order by t
`;
}
return rawQuery(sql, params).then(a => {
return Object.values(a).map(a => {
return { x: a.x, t: a.t, y: Number(a.y) };
});
});
}

View file

@ -0,0 +1,38 @@
import clickhouse from '@/lib/clickhouse';
import { CLICKHOUSE, PRISMA, runQuery, notImplemented } from '@/lib/db';
export function getEventUsage(...args: [websiteIds: string[], startDate: Date, endDate: Date]) {
return runQuery({
[PRISMA]: notImplemented,
[CLICKHOUSE]: () => clickhouseQuery(...args),
});
}
function clickhouseQuery(
websiteIds: string[],
startDate: Date,
endDate: Date,
): Promise<{ websiteId: string; count: number }[]> {
const { rawQuery } = clickhouse;
return rawQuery(
`
select
website_id as websiteId,
count(*) as count
from website_event
where website_id in {websiteIds:Array(UUID)}
and created_at between {startDate:DateTime64} and {endDate:DateTime64}
group by website_id
`,
{
websiteIds,
startDate,
endDate,
},
).then(a => {
return Object.values(a).map(a => {
return { websiteId: a.websiteId, count: Number(a.count) };
});
});
}

View file

@ -0,0 +1,98 @@
import clickhouse from '@/lib/clickhouse';
import { CLICKHOUSE, getDatabaseType, POSTGRESQL, PRISMA, runQuery } from '@/lib/db';
import prisma from '@/lib/prisma';
import { PageParams, QueryFilters } from '@/lib/types';
export function getWebsiteEvents(
...args: [websiteId: string, filters: QueryFilters, pageParams?: PageParams]
) {
return runQuery({
[PRISMA]: () => relationalQuery(...args),
[CLICKHOUSE]: () => clickhouseQuery(...args),
});
}
async function relationalQuery(websiteId: string, filters: QueryFilters, pageParams?: PageParams) {
const { pagedRawQuery, parseFilters } = prisma;
const { query } = pageParams;
const { filterQuery, params } = await parseFilters(websiteId, {
...filters,
});
const db = getDatabaseType();
const like = db === POSTGRESQL ? 'ilike' : 'like';
return pagedRawQuery(
`
with events as (
select
event_id as "id",
website_id as "websiteId",
session_id as "sessionId",
created_at as "createdAt",
url_path as "urlPath",
url_query as "urlQuery",
referrer_path as "referrerPath",
referrer_query as "referrerQuery",
referrer_domain as "referrerDomain",
page_title as "pageTitle",
event_type as "eventType",
event_name as "eventName"
from website_event
where website_id = {{websiteId::uuid}}
and created_at between {{startDate}} and {{endDate}}
${filterQuery}
${
query
? `and ((event_name ${like} {{query}} and event_type = 2)
or (url_path ${like} {{query}} and event_type = 1))`
: ''
}
order by created_at desc
limit 1000)
select * from events
`,
{ ...params, query: `%${query}%` },
pageParams,
);
}
async function clickhouseQuery(websiteId: string, filters: QueryFilters, pageParams?: PageParams) {
const { pagedQuery, parseFilters } = clickhouse;
const { params, dateQuery, filterQuery } = await parseFilters(websiteId, filters);
const { query } = pageParams;
return pagedQuery(
`
with events as (
select
event_id as id,
website_id as websiteId,
session_id as sessionId,
created_at as createdAt,
url_path as urlPath,
url_query as urlQuery,
referrer_path as referrerPath,
referrer_query as referrerQuery,
referrer_domain as referrerDomain,
page_title as pageTitle,
event_type as eventType,
event_name as eventName
from website_event
where website_id = {websiteId:UUID}
${dateQuery}
${filterQuery}
${
query
? `and ((positionCaseInsensitive(event_name, {query:String}) > 0 and event_type = 2)
or (positionCaseInsensitive(url_path, {query:String}) > 0 and event_type = 1))`
: ''
}
order by created_at desc
limit 1000)
select * from events
`,
{ ...params, query },
pageParams,
);
}

View file

@ -0,0 +1,195 @@
import { EVENT_NAME_LENGTH, URL_LENGTH, EVENT_TYPE, PAGE_TITLE_LENGTH } from '@/lib/constants';
import { CLICKHOUSE, PRISMA, runQuery } from '@/lib/db';
import clickhouse from '@/lib/clickhouse';
import kafka from '@/lib/kafka';
import prisma from '@/lib/prisma';
import { uuid } from '@/lib/crypto';
import { saveEventData } from './saveEventData';
export async function saveEvent(args: {
websiteId: string;
sessionId: string;
visitId: string;
urlPath: string;
urlQuery?: string;
referrerPath?: string;
referrerQuery?: string;
referrerDomain?: string;
pageTitle?: string;
eventName?: string;
eventData?: any;
hostname?: string;
browser?: string;
os?: string;
device?: string;
screen?: string;
language?: string;
country?: string;
subdivision1?: string;
subdivision2?: string;
city?: string;
tag?: string;
}) {
return runQuery({
[PRISMA]: () => relationalQuery(args),
[CLICKHOUSE]: () => clickhouseQuery(args),
});
}
async function relationalQuery(data: {
websiteId: string;
sessionId: string;
visitId: string;
urlPath: string;
urlQuery?: string;
referrerPath?: string;
referrerQuery?: string;
referrerDomain?: string;
pageTitle?: string;
eventName?: string;
eventData?: any;
tag?: string;
}) {
const {
websiteId,
sessionId,
visitId,
urlPath,
urlQuery,
referrerPath,
referrerQuery,
referrerDomain,
eventName,
eventData,
pageTitle,
tag,
} = data;
const websiteEventId = uuid();
const websiteEvent = prisma.client.websiteEvent.create({
data: {
id: websiteEventId,
websiteId,
sessionId,
visitId,
urlPath: urlPath?.substring(0, URL_LENGTH),
urlQuery: urlQuery?.substring(0, URL_LENGTH),
referrerPath: referrerPath?.substring(0, URL_LENGTH),
referrerQuery: referrerQuery?.substring(0, URL_LENGTH),
referrerDomain: referrerDomain?.substring(0, URL_LENGTH),
pageTitle: pageTitle?.substring(0, PAGE_TITLE_LENGTH),
eventType: eventName ? EVENT_TYPE.customEvent : EVENT_TYPE.pageView,
eventName: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null,
tag,
},
});
if (eventData) {
await saveEventData({
websiteId,
sessionId,
eventId: websiteEventId,
urlPath: urlPath?.substring(0, URL_LENGTH),
eventName: eventName?.substring(0, EVENT_NAME_LENGTH),
eventData,
});
}
return websiteEvent;
}
async function clickhouseQuery(data: {
websiteId: string;
sessionId: string;
visitId: string;
urlPath: string;
urlQuery?: string;
referrerPath?: string;
referrerQuery?: string;
referrerDomain?: string;
pageTitle?: string;
eventName?: string;
eventData?: any;
hostname?: string;
browser?: string;
os?: string;
device?: string;
screen?: string;
language?: string;
country?: string;
subdivision1?: string;
subdivision2?: string;
city?: string;
tag?: string;
}) {
const {
websiteId,
sessionId,
visitId,
urlPath,
urlQuery,
referrerPath,
referrerQuery,
referrerDomain,
pageTitle,
eventName,
eventData,
country,
subdivision1,
subdivision2,
city,
tag,
...args
} = data;
const { insert, getUTCString } = clickhouse;
const { sendMessage } = kafka;
const eventId = uuid();
const createdAt = getUTCString();
const message = {
...args,
website_id: websiteId,
session_id: sessionId,
visit_id: visitId,
event_id: eventId,
country: country,
subdivision1:
country && subdivision1
? subdivision1.includes('-')
? subdivision1
: `${country}-${subdivision1}`
: null,
subdivision2: subdivision2,
city: city,
url_path: urlPath?.substring(0, URL_LENGTH),
url_query: urlQuery?.substring(0, URL_LENGTH),
referrer_path: referrerPath?.substring(0, URL_LENGTH),
referrer_query: referrerQuery?.substring(0, URL_LENGTH),
referrer_domain: referrerDomain?.substring(0, URL_LENGTH),
page_title: pageTitle?.substring(0, PAGE_TITLE_LENGTH),
event_type: eventName ? EVENT_TYPE.customEvent : EVENT_TYPE.pageView,
event_name: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null,
tag: tag,
created_at: createdAt,
};
if (kafka.enabled) {
await sendMessage('event', message);
} else {
await insert('website_event', [message]);
}
if (eventData) {
await saveEventData({
websiteId,
sessionId,
eventId,
urlPath: urlPath?.substring(0, URL_LENGTH),
eventName: eventName?.substring(0, EVENT_NAME_LENGTH),
eventData,
createdAt,
});
}
return data;
}

View file

@ -0,0 +1,91 @@
import { Prisma } from '@prisma/client';
import { DATA_TYPE } from '@/lib/constants';
import { uuid } from '@/lib/crypto';
import { CLICKHOUSE, PRISMA, runQuery } from '@/lib/db';
import { flattenJSON, getStringValue } from '@/lib/data';
import clickhouse from '@/lib/clickhouse';
import kafka from '@/lib/kafka';
import prisma from '@/lib/prisma';
import { DynamicData } from '@/lib/types';
export async function saveEventData(data: {
websiteId: string;
eventId: string;
sessionId?: string;
urlPath?: string;
eventName?: string;
eventData: DynamicData;
createdAt?: string;
}) {
return runQuery({
[PRISMA]: () => relationalQuery(data),
[CLICKHOUSE]: () => clickhouseQuery(data),
});
}
async function relationalQuery(data: {
websiteId: string;
eventId: string;
eventData: DynamicData;
}): Promise<Prisma.BatchPayload> {
const { websiteId, eventId, eventData } = data;
const jsonKeys = flattenJSON(eventData);
// id, websiteEventId, eventStringValue
const flattenedData = jsonKeys.map(a => ({
id: uuid(),
websiteEventId: eventId,
websiteId,
dataKey: a.key,
stringValue: getStringValue(a.value, a.dataType),
numberValue: a.dataType === DATA_TYPE.number ? a.value : null,
dateValue: a.dataType === DATA_TYPE.date ? new Date(a.value) : null,
dataType: a.dataType,
}));
return prisma.client.eventData.createMany({
data: flattenedData,
});
}
async function clickhouseQuery(data: {
websiteId: string;
eventId: string;
sessionId?: string;
urlPath?: string;
eventName?: string;
eventData: DynamicData;
createdAt?: string;
}) {
const { websiteId, sessionId, eventId, urlPath, eventName, eventData, createdAt } = data;
const { insert, getUTCString } = clickhouse;
const { sendMessage } = kafka;
const jsonKeys = flattenJSON(eventData);
const messages = jsonKeys.map(({ key, value, dataType }) => {
return {
website_id: websiteId,
session_id: sessionId,
event_id: eventId,
url_path: urlPath,
event_name: eventName,
data_key: key,
data_type: dataType,
string_value: getStringValue(value, dataType),
number_value: dataType === DATA_TYPE.number ? value : null,
date_value: dataType === DATA_TYPE.date ? getUTCString(value) : null,
created_at: createdAt,
};
});
if (kafka.enabled) {
await sendMessage('event_data', messages);
} else {
await insert('event_data', messages);
}
return data;
}