feat: add batch data for tracking payload

This commit is contained in:
harry 2024-11-20 09:32:33 +07:00
parent 7ec87553cc
commit e30315ba53
6 changed files with 68 additions and 14 deletions

View file

@ -18,6 +18,7 @@ export async function saveEvent(args: {
pageTitle?: string;
eventName?: string;
eventData?: any;
eventBatchData?: any[];
hostname?: string;
browser?: string;
os?: string;
@ -47,6 +48,7 @@ async function relationalQuery(data: {
pageTitle?: string;
eventName?: string;
eventData?: any;
eventBatchData?: Array<any>;
}) {
const {
websiteId,
@ -60,6 +62,7 @@ async function relationalQuery(data: {
eventName,
eventData,
pageTitle,
eventBatchData,
} = data;
const websiteEventId = uuid();
@ -80,7 +83,7 @@ async function relationalQuery(data: {
},
});
if (eventData) {
if (eventData || eventBatchData) {
await saveEventData({
websiteId,
sessionId,
@ -89,6 +92,7 @@ async function relationalQuery(data: {
urlPath: urlPath?.substring(0, URL_LENGTH),
eventName: eventName?.substring(0, EVENT_NAME_LENGTH),
eventData,
eventBatchData,
});
}
@ -107,6 +111,7 @@ async function clickhouseQuery(data: {
pageTitle?: string;
eventName?: string;
eventData?: any;
eventBatchData?: any[];
hostname?: string;
browser?: string;
os?: string;
@ -130,6 +135,7 @@ async function clickhouseQuery(data: {
pageTitle,
eventName,
eventData,
eventBatchData,
country,
subdivision1,
subdivision2,
@ -173,7 +179,7 @@ async function clickhouseQuery(data: {
await insert('website_event', [message]);
}
if (eventData) {
if (eventData || eventBatchData) {
await saveEventData({
websiteId,
sessionId,
@ -182,6 +188,7 @@ async function clickhouseQuery(data: {
urlPath: urlPath?.substring(0, URL_LENGTH),
eventName: eventName?.substring(0, EVENT_NAME_LENGTH),
eventData,
eventBatchData,
createdAt,
});
}

View file

@ -6,7 +6,7 @@ import { flattenDynamicData, flattenJSON, getStringValue } from 'lib/data';
import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db';
import kafka from 'lib/kafka';
import prisma from 'lib/prisma';
import { DynamicData } from 'lib/types';
import { DynamicData, JsonKeyDynamicData } from 'lib/types';
export async function saveEventData(data: {
websiteId: string;
@ -15,7 +15,8 @@ export async function saveEventData(data: {
visitId?: string;
urlPath?: string;
eventName?: string;
eventData: DynamicData;
eventData?: DynamicData;
eventBatchData?: Array<DynamicData>;
createdAt?: string;
}) {
return runQuery({
@ -27,11 +28,17 @@ export async function saveEventData(data: {
async function relationalQuery(data: {
websiteId: string;
eventId: string;
eventData: DynamicData;
eventData?: DynamicData;
eventBatchData?: Array<DynamicData>;
}): Promise<Prisma.BatchPayload> {
const { websiteId, eventId, eventData } = data;
const { websiteId, eventId, eventData, eventBatchData } = data;
const jsonKeys = flattenJSON(eventData);
let jsonKeys: Array<JsonKeyDynamicData> = [];
if (eventData) {
jsonKeys = flattenJSON(eventData);
} else if (eventBatchData) {
jsonKeys = eventBatchData.flatMap(d => flattenJSON(d));
}
// id, websiteEventId, eventStringValue
const flattenedData = jsonKeys.map(a => ({
@ -57,15 +64,31 @@ async function clickhouseQuery(data: {
visitId?: string;
urlPath?: string;
eventName?: string;
eventData: DynamicData;
eventData?: DynamicData;
eventBatchData?: Array<DynamicData>;
createdAt?: string;
}) {
const { websiteId, sessionId, visitId, eventId, urlPath, eventName, eventData, createdAt } = data;
const {
websiteId,
sessionId,
visitId,
eventId,
urlPath,
eventName,
eventData,
eventBatchData,
createdAt,
} = data;
const { sendMessages, sendMessage } = kafka;
const { insert, getUTCString } = clickhouse;
const jsonKeys = flattenJSON(eventData);
let jsonKeys: Array<JsonKeyDynamicData> = [];
if (eventData) {
jsonKeys = flattenJSON(eventData);
} else if (eventBatchData) {
jsonKeys = eventBatchData.flatMap(d => flattenJSON(d));
}
const messages = jsonKeys.map(({ key, value, dataType }) => {
return {