Merge pull request #5 from autodity/bugfix/batch-data

fix: support batch event
This commit is contained in:
Viet-Tien 2024-11-27 13:16:37 +07:00 committed by GitHub
commit 430ed6f3ce
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 242 additions and 153 deletions

2
next-env.d.ts vendored
View file

@ -3,4 +3,4 @@
/// <reference types="next/navigation-types/compat/navigation" /> /// <reference types="next/navigation-types/compat/navigation" />
// NOTE: This file should not be edited // NOTE: This file should not be edited
// see https://nextjs.org/docs/app/building-your-application/configuring/typescript for more information. // see https://nextjs.org/docs/basic-features/typescript for more information.

View file

@ -52,12 +52,6 @@ export interface DynamicData {
[key: string]: number | string | number[] | string[]; [key: string]: number | string | number[] | string[];
} }
export interface JsonKeyDynamicData {
key: string;
value: any;
dataType: DynamicDataType;
}
export interface Auth { export interface Auth {
user?: { user?: {
id: string; id: string;

View file

@ -67,10 +67,15 @@ async function relationalQuery(data: {
eventBatchData, eventBatchData,
tag, tag,
} = data; } = data;
const websiteEventData = [];
const eventsData = [];
if (eventBatchData) {
for (const eventData of eventBatchData) {
const websiteEventId = uuid(); const websiteEventId = uuid();
const websiteEvent = prisma.client.websiteEvent.create({ websiteEventData.push({
data: {
id: websiteEventId, id: websiteEventId,
websiteId, websiteId,
sessionId, sessionId,
@ -84,11 +89,38 @@ async function relationalQuery(data: {
eventType: eventName ? EVENT_TYPE.customEvent : EVENT_TYPE.pageView, eventType: eventName ? EVENT_TYPE.customEvent : EVENT_TYPE.pageView,
eventName: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null, eventName: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null,
tag, tag,
},
}); });
if (eventData || eventBatchData) { eventsData.push({
await saveEventData({ websiteId,
sessionId,
visitId,
eventId: websiteEventId,
urlPath: urlPath?.substring(0, URL_LENGTH),
eventName: eventName?.substring(0, EVENT_NAME_LENGTH),
eventData,
});
}
} else {
const websiteEventId = uuid();
websiteEventData.push({
id: websiteEventId,
websiteId,
sessionId,
visitId,
urlPath: urlPath?.substring(0, URL_LENGTH),
urlQuery: urlQuery?.substring(0, URL_LENGTH),
referrerPath: referrerPath?.substring(0, URL_LENGTH),
referrerQuery: referrerQuery?.substring(0, URL_LENGTH),
referrerDomain: referrerDomain?.substring(0, URL_LENGTH),
pageTitle: pageTitle?.substring(0, PAGE_TITLE_LENGTH),
eventType: eventName ? EVENT_TYPE.customEvent : EVENT_TYPE.pageView,
eventName: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null,
tag,
});
eventsData.push({
websiteId, websiteId,
sessionId, sessionId,
visitId, visitId,
@ -96,11 +128,18 @@ async function relationalQuery(data: {
urlPath: urlPath?.substring(0, URL_LENGTH), urlPath: urlPath?.substring(0, URL_LENGTH),
eventName: eventName?.substring(0, EVENT_NAME_LENGTH), eventName: eventName?.substring(0, EVENT_NAME_LENGTH),
eventData, eventData,
eventBatchData,
}); });
} }
return websiteEvent; const websiteEvents = prisma.client.websiteEvent.createMany({
data: websiteEventData,
});
if (eventData || eventBatchData) {
await saveEventData(eventsData);
}
return websiteEvents;
} }
async function clickhouseQuery(data: { async function clickhouseQuery(data: {
@ -149,16 +188,22 @@ async function clickhouseQuery(data: {
...args ...args
} = data; } = data;
const { insert, getUTCString } = clickhouse; const { insert, getUTCString } = clickhouse;
const { sendMessage } = kafka; const { sendMessages } = kafka;
const eventId = uuid();
const createdAt = getUTCString(); const createdAt = getUTCString();
const message = { const websiteEventData = [];
const eventsData = [];
if (eventBatchData) {
for (const eventData of eventBatchData) {
const websiteEventId = uuid();
websiteEventData.push({
...args, ...args,
website_id: websiteId, website_id: websiteId,
session_id: sessionId, session_id: sessionId,
visit_id: visitId, visit_id: visitId,
event_id: eventId, event_id: websiteEventId,
country: country, country: country,
subdivision1: subdivision1:
country && subdivision1 country && subdivision1
@ -178,27 +223,70 @@ async function clickhouseQuery(data: {
event_name: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null, event_name: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null,
tag: tag, tag: tag,
created_at: createdAt, created_at: createdAt,
}; });
if (kafka.enabled) { eventsData.push({
await sendMessage('event', message);
} else {
await insert('website_event', [message]);
}
if (eventData || eventBatchData) {
await saveEventData({
websiteId, websiteId,
sessionId, sessionId,
visitId, visitId,
eventId, eventId: websiteEventId,
urlPath: urlPath?.substring(0, URL_LENGTH),
eventName: eventName?.substring(0, EVENT_NAME_LENGTH),
eventData,
createdAt,
});
}
} else {
const websiteEventId = uuid();
websiteEventData.push({
...args,
website_id: websiteId,
session_id: sessionId,
visit_id: visitId,
event_id: websiteEventId,
country: country,
subdivision1:
country && subdivision1
? subdivision1.includes('-')
? subdivision1
: `${country}-${subdivision1}`
: null,
subdivision2: subdivision2,
city: city,
url_path: urlPath?.substring(0, URL_LENGTH),
url_query: urlQuery?.substring(0, URL_LENGTH),
referrer_path: referrerPath?.substring(0, URL_LENGTH),
referrer_query: referrerQuery?.substring(0, URL_LENGTH),
referrer_domain: referrerDomain?.substring(0, URL_LENGTH),
page_title: pageTitle?.substring(0, PAGE_TITLE_LENGTH),
event_type: eventName ? EVENT_TYPE.customEvent : EVENT_TYPE.pageView,
event_name: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null,
tag: tag,
created_at: createdAt,
});
eventsData.push({
websiteId,
sessionId,
visitId,
eventId: websiteEventId,
urlPath: urlPath?.substring(0, URL_LENGTH), urlPath: urlPath?.substring(0, URL_LENGTH),
eventName: eventName?.substring(0, EVENT_NAME_LENGTH), eventName: eventName?.substring(0, EVENT_NAME_LENGTH),
eventData, eventData,
eventBatchData,
createdAt, createdAt,
}); });
} }
if (kafka.enabled) {
await sendMessages('event', websiteEventData);
} else {
await insert('website_event', websiteEventData);
}
if (eventData || eventBatchData) {
await saveEventData(eventsData);
}
return data; return data;
} }

View file

@ -6,9 +6,10 @@ import { flattenDynamicData, flattenJSON, getStringValue } from 'lib/data';
import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db'; import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db';
import kafka from 'lib/kafka'; import kafka from 'lib/kafka';
import prisma from 'lib/prisma'; import prisma from 'lib/prisma';
import { DynamicData, JsonKeyDynamicData } from 'lib/types'; import { DynamicData, DynamicDataType } from 'lib/types';
export async function saveEventData(data: { export async function saveEventData(
data: Array<{
websiteId: string; websiteId: string;
eventId: string; eventId: string;
sessionId?: string; sessionId?: string;
@ -16,29 +17,36 @@ export async function saveEventData(data: {
urlPath?: string; urlPath?: string;
eventName?: string; eventName?: string;
eventData?: DynamicData; eventData?: DynamicData;
eventBatchData?: Array<DynamicData>;
createdAt?: string; createdAt?: string;
}) { }>,
) {
return runQuery({ return runQuery({
[PRISMA]: () => relationalQuery(data), [PRISMA]: () => relationalQuery(data),
[CLICKHOUSE]: () => clickhouseQuery(data), [CLICKHOUSE]: () => clickhouseQuery(data),
}); });
} }
async function relationalQuery(data: { async function relationalQuery(
data: Array<{
websiteId: string; websiteId: string;
eventId: string; eventId: string;
eventData?: DynamicData; eventData?: DynamicData;
eventBatchData?: Array<DynamicData>; }>,
}): Promise<Prisma.BatchPayload> { ): Promise<Prisma.BatchPayload> {
const { websiteId, eventId, eventData, eventBatchData } = data; const listFlattenedData: {
id: string;
websiteEventId: string;
websiteId: string;
dataKey: string;
stringValue: string;
numberValue: number;
dateValue: Date;
dataType: DynamicDataType;
}[] = [];
let jsonKeys: Array<JsonKeyDynamicData> = []; for (const item of data) {
if (eventData) { const { websiteId, eventId, eventData } = item;
jsonKeys = flattenJSON(eventData); const jsonKeys = flattenJSON(eventData);
} else if (eventBatchData) {
jsonKeys = eventBatchData.flatMap(d => flattenJSON(d));
}
// id, websiteEventId, eventStringValue // id, websiteEventId, eventStringValue
const flattenedData = jsonKeys.map(a => ({ const flattenedData = jsonKeys.map(a => ({
@ -52,12 +60,16 @@ async function relationalQuery(data: {
dataType: a.dataType, dataType: a.dataType,
})); }));
listFlattenedData.push(...flattenedData);
}
return prisma.client.eventData.createMany({ return prisma.client.eventData.createMany({
data: flattenedData, data: listFlattenedData,
}); });
} }
async function clickhouseQuery(data: { async function clickhouseQuery(
data: Array<{
websiteId: string; websiteId: string;
eventId: string; eventId: string;
sessionId?: string; sessionId?: string;
@ -65,32 +77,22 @@ async function clickhouseQuery(data: {
urlPath?: string; urlPath?: string;
eventName?: string; eventName?: string;
eventData?: DynamicData; eventData?: DynamicData;
eventBatchData?: Array<DynamicData>;
createdAt?: string; createdAt?: string;
}) { }>,
const { ) {
websiteId, const { sendMessages } = kafka;
sessionId,
visitId,
eventId,
urlPath,
eventName,
eventData,
eventBatchData,
createdAt,
} = data;
const { sendMessages, sendMessage } = kafka;
const { insert, getUTCString } = clickhouse; const { insert, getUTCString } = clickhouse;
let jsonKeys: Array<JsonKeyDynamicData> = []; const messagesEventData = [];
if (eventData) { const messagesEventDataBlob = [];
jsonKeys = flattenJSON(eventData);
} else if (eventBatchData) {
jsonKeys = eventBatchData.flatMap(d => flattenJSON(d));
}
const messages = jsonKeys.map(({ key, value, dataType }) => { for (const item of data) {
const { websiteId, sessionId, visitId, eventId, urlPath, eventName, eventData, createdAt } =
item;
const jsonKeys = flattenJSON(eventData);
const messagesPerEventData = jsonKeys.map(({ key, value, dataType }) => {
return { return {
website_id: websiteId, website_id: websiteId,
session_id: sessionId, session_id: sessionId,
@ -107,14 +109,10 @@ async function clickhouseQuery(data: {
}; };
}); });
if (kafka.enabled) { messagesEventData.push(...messagesPerEventData);
await sendMessages('event_data', messages);
} else {
await insert('event_data', messages);
}
const jsonBlobs = flattenDynamicData(jsonKeys); const jsonBlobs = flattenDynamicData(jsonKeys);
const message: { [key: string]: string | number } = { const messageEventDataBlob: { [key: string]: string | number } = {
website_id: websiteId, website_id: websiteId,
session_id: sessionId, session_id: sessionId,
event_id: eventId, event_id: eventId,
@ -124,17 +122,26 @@ async function clickhouseQuery(data: {
}; };
jsonBlobs.blobs.forEach((blob, i) => { jsonBlobs.blobs.forEach((blob, i) => {
if (i >= 20) return; // 20 is the max number of blobs if (i >= 20) return; // 20 is the max number of blobs
message[`blob${i + 1}`] = blob; messageEventDataBlob[`blob${i + 1}`] = blob;
}); });
jsonBlobs.doubles.forEach((double, i) => { jsonBlobs.doubles.forEach((double, i) => {
if (i >= 20) return; // 20 is the max number of doubles if (i >= 20) return; // 20 is the max number of doubles
message[`double${i + 1}`] = double; messageEventDataBlob[`double${i + 1}`] = double;
}); });
messagesEventDataBlob.push(messageEventDataBlob);
}
if (kafka.enabled) { if (kafka.enabled) {
await sendMessage('event_data_blob', message); await Promise.all([
sendMessages('event_data', messagesEventData),
sendMessages('event_data_blob', messagesEventDataBlob),
]);
} else { } else {
await insert('event_data_blob', [message]); await Promise.all([
insert('event_data', messagesEventData),
insert('event_data_blob', messagesEventDataBlob),
]);
} }
return data; return data;