fix: support batch event

This commit is contained in:
harry 2024-11-26 17:00:32 +07:00
parent 42824be546
commit 1a908b86fe
3 changed files with 156 additions and 116 deletions

View file

@ -52,12 +52,6 @@ export interface DynamicData {
[key: string]: number | string | number[] | string[];
}
export interface JsonKeyDynamicData {
key: string;
value: any;
dataType: DynamicDataType;
}
export interface Auth {
user?: {
id: string;

View file

@ -67,10 +67,44 @@ async function relationalQuery(data: {
eventBatchData,
tag,
} = data;
const websiteEventId = uuid();
const websiteEvent = prisma.client.websiteEvent.create({
data: {
const websiteEventData = [];
const eventsData = [];
if (eventBatchData) {
for (const eventData of eventBatchData) {
const websiteEventId = uuid();
websiteEventData.push({
id: websiteEventId,
websiteId,
sessionId,
visitId,
urlPath: urlPath?.substring(0, URL_LENGTH),
urlQuery: urlQuery?.substring(0, URL_LENGTH),
referrerPath: referrerPath?.substring(0, URL_LENGTH),
referrerQuery: referrerQuery?.substring(0, URL_LENGTH),
referrerDomain: referrerDomain?.substring(0, URL_LENGTH),
pageTitle: pageTitle?.substring(0, PAGE_TITLE_LENGTH),
eventType: eventName ? EVENT_TYPE.customEvent : EVENT_TYPE.pageView,
eventName: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null,
tag,
});
eventsData.push({
websiteId,
sessionId,
visitId,
eventId: websiteEventId,
urlPath: urlPath?.substring(0, URL_LENGTH),
eventName: eventName?.substring(0, EVENT_NAME_LENGTH),
eventData,
});
}
} else {
const websiteEventId = uuid();
websiteEventData.push({
id: websiteEventId,
websiteId,
sessionId,
@ -84,11 +118,9 @@ async function relationalQuery(data: {
eventType: eventName ? EVENT_TYPE.customEvent : EVENT_TYPE.pageView,
eventName: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null,
tag,
},
});
});
if (eventData || eventBatchData) {
await saveEventData({
eventsData.push({
websiteId,
sessionId,
visitId,
@ -96,11 +128,18 @@ async function relationalQuery(data: {
urlPath: urlPath?.substring(0, URL_LENGTH),
eventName: eventName?.substring(0, EVENT_NAME_LENGTH),
eventData,
eventBatchData,
});
}
return websiteEvent;
const websiteEvents = prisma.client.websiteEvent.createMany({
data: websiteEventData,
});
if (eventData || eventBatchData) {
await saveEventData(eventsData);
}
return websiteEvents;
}
async function clickhouseQuery(data: {

View file

@ -6,135 +6,142 @@ import { flattenDynamicData, flattenJSON, getStringValue } from 'lib/data';
import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db';
import kafka from 'lib/kafka';
import prisma from 'lib/prisma';
import { DynamicData, JsonKeyDynamicData } from 'lib/types';
import { DynamicData, DynamicDataType } from 'lib/types';
export async function saveEventData(data: {
websiteId: string;
eventId: string;
sessionId?: string;
visitId?: string;
urlPath?: string;
eventName?: string;
eventData?: DynamicData;
eventBatchData?: Array<DynamicData>;
createdAt?: string;
}) {
export async function saveEventData(
data: Array<{
websiteId: string;
eventId: string;
sessionId?: string;
visitId?: string;
urlPath?: string;
eventName?: string;
eventData?: DynamicData;
createdAt?: string;
}>,
) {
return runQuery({
[PRISMA]: () => relationalQuery(data),
[CLICKHOUSE]: () => clickhouseQuery(data),
});
}
async function relationalQuery(data: {
websiteId: string;
eventId: string;
eventData?: DynamicData;
eventBatchData?: Array<DynamicData>;
}): Promise<Prisma.BatchPayload> {
const { websiteId, eventId, eventData, eventBatchData } = data;
async function relationalQuery(
data: Array<{
websiteId: string;
eventId: string;
eventData?: DynamicData;
}>,
): Promise<Prisma.BatchPayload> {
const listFlattenedData: {
id: string;
websiteEventId: string;
websiteId: string;
dataKey: string;
stringValue: string;
numberValue: number;
dateValue: Date;
dataType: DynamicDataType;
}[] = [];
let jsonKeys: Array<JsonKeyDynamicData> = [];
if (eventData) {
jsonKeys = flattenJSON(eventData);
} else if (eventBatchData) {
jsonKeys = eventBatchData.flatMap(d => flattenJSON(d));
for (const item of data) {
const { websiteId, eventId, eventData } = item;
const jsonKeys = flattenJSON(eventData);
// id, websiteEventId, eventStringValue
const flattenedData = jsonKeys.map(a => ({
id: uuid(),
websiteEventId: eventId,
websiteId,
dataKey: a.key,
stringValue: getStringValue(a.value, a.dataType),
numberValue: a.dataType === DATA_TYPE.number ? a.value : null,
dateValue: a.dataType === DATA_TYPE.date ? new Date(a.value) : null,
dataType: a.dataType,
}));
listFlattenedData.push(...flattenedData);
}
// id, websiteEventId, eventStringValue
const flattenedData = jsonKeys.map(a => ({
id: uuid(),
websiteEventId: eventId,
websiteId,
dataKey: a.key,
stringValue: getStringValue(a.value, a.dataType),
numberValue: a.dataType === DATA_TYPE.number ? a.value : null,
dateValue: a.dataType === DATA_TYPE.date ? new Date(a.value) : null,
dataType: a.dataType,
}));
return prisma.client.eventData.createMany({
data: flattenedData,
data: listFlattenedData,
});
}
async function clickhouseQuery(data: {
websiteId: string;
eventId: string;
sessionId?: string;
visitId?: string;
urlPath?: string;
eventName?: string;
eventData?: DynamicData;
eventBatchData?: Array<DynamicData>;
createdAt?: string;
}) {
const {
websiteId,
sessionId,
visitId,
eventId,
urlPath,
eventName,
eventData,
eventBatchData,
createdAt,
} = data;
const { sendMessages, sendMessage } = kafka;
async function clickhouseQuery(
data: Array<{
websiteId: string;
eventId: string;
sessionId?: string;
visitId?: string;
urlPath?: string;
eventName?: string;
eventData?: DynamicData;
createdAt?: string;
}>,
) {
const { sendMessages } = kafka;
const { insert, getUTCString } = clickhouse;
let jsonKeys: Array<JsonKeyDynamicData> = [];
if (eventData) {
jsonKeys = flattenJSON(eventData);
} else if (eventBatchData) {
jsonKeys = eventBatchData.flatMap(d => flattenJSON(d));
}
const messagesEventData = [];
const messagesEventDataBlob = [];
const messages = jsonKeys.map(({ key, value, dataType }) => {
return {
for (const item of data) {
const { websiteId, sessionId, visitId, eventId, urlPath, eventName, eventData, createdAt } =
item;
const jsonKeys = flattenJSON(eventData);
const messagesPerEventData = jsonKeys.map(({ key, value, dataType }) => {
return {
website_id: websiteId,
session_id: sessionId,
event_id: eventId,
visit_id: visitId,
url_path: urlPath,
event_name: eventName,
data_key: key,
data_type: dataType,
string_value: getStringValue(value, dataType),
number_value: dataType === DATA_TYPE.number ? value : null,
date_value: dataType === DATA_TYPE.date ? getUTCString(value) : null,
created_at: createdAt ?? getUTCString(),
};
});
messagesEventData.push(...messagesPerEventData);
const jsonBlobs = flattenDynamicData(jsonKeys);
const messageEventDataBlob: { [key: string]: string | number } = {
website_id: websiteId,
session_id: sessionId,
event_id: eventId,
visit_id: visitId,
url_path: urlPath,
event_name: eventName,
data_key: key,
data_type: dataType,
string_value: getStringValue(value, dataType),
number_value: dataType === DATA_TYPE.number ? value : null,
date_value: dataType === DATA_TYPE.date ? getUTCString(value) : null,
created_at: createdAt ?? getUTCString(),
created_at: createdAt ?? getUTCString(new Date()),
};
});
jsonBlobs.blobs.forEach((blob, i) => {
if (i >= 20) return; // 20 is the max number of blobs
messageEventDataBlob[`blob${i + 1}`] = blob;
});
jsonBlobs.doubles.forEach((double, i) => {
if (i >= 20) return; // 20 is the max number of doubles
messageEventDataBlob[`double${i + 1}`] = double;
});
if (kafka.enabled) {
await sendMessages('event_data', messages);
} else {
await insert('event_data', messages);
messagesEventDataBlob.push(messageEventDataBlob);
}
const jsonBlobs = flattenDynamicData(jsonKeys);
const message: { [key: string]: string | number } = {
website_id: websiteId,
session_id: sessionId,
event_id: eventId,
visit_id: visitId,
event_name: eventName,
created_at: createdAt ?? getUTCString(new Date()),
};
jsonBlobs.blobs.forEach((blob, i) => {
if (i >= 20) return; // 20 is the max number of blobs
message[`blob${i + 1}`] = blob;
});
jsonBlobs.doubles.forEach((double, i) => {
if (i >= 20) return; // 20 is the max number of doubles
message[`double${i + 1}`] = double;
});
if (kafka.enabled) {
await sendMessage('event_data_blob', message);
await Promise.all([
sendMessages('event_data', messagesEventData),
sendMessages('event_data_blob', messagesEventDataBlob),
]);
} else {
await insert('event_data_blob', [message]);
await Promise.all([
insert('event_data', messagesEventData),
insert('event_data_blob', messagesEventDataBlob),
]);
}
return data;