diff --git a/next-env.d.ts b/next-env.d.ts index 725dd6f24..fd36f9494 100644 --- a/next-env.d.ts +++ b/next-env.d.ts @@ -3,4 +3,4 @@ /// // NOTE: This file should not be edited -// see https://nextjs.org/docs/app/building-your-application/configuring/typescript for more information. +// see https://nextjs.org/docs/basic-features/typescript for more information. diff --git a/src/lib/types.ts b/src/lib/types.ts index a768952bd..96c0a2655 100644 --- a/src/lib/types.ts +++ b/src/lib/types.ts @@ -52,12 +52,6 @@ export interface DynamicData { [key: string]: number | string | number[] | string[]; } -export interface JsonKeyDynamicData { - key: string; - value: any; - dataType: DynamicDataType; -} - export interface Auth { user?: { id: string; diff --git a/src/queries/analytics/events/saveEvent.ts b/src/queries/analytics/events/saveEvent.ts index 47d47570f..dfefcc547 100644 --- a/src/queries/analytics/events/saveEvent.ts +++ b/src/queries/analytics/events/saveEvent.ts @@ -67,10 +67,44 @@ async function relationalQuery(data: { eventBatchData, tag, } = data; - const websiteEventId = uuid(); - const websiteEvent = prisma.client.websiteEvent.create({ - data: { + const websiteEventData = []; + const eventsData = []; + + if (eventBatchData) { + for (const eventData of eventBatchData) { + const websiteEventId = uuid(); + + websiteEventData.push({ + id: websiteEventId, + websiteId, + sessionId, + visitId, + urlPath: urlPath?.substring(0, URL_LENGTH), + urlQuery: urlQuery?.substring(0, URL_LENGTH), + referrerPath: referrerPath?.substring(0, URL_LENGTH), + referrerQuery: referrerQuery?.substring(0, URL_LENGTH), + referrerDomain: referrerDomain?.substring(0, URL_LENGTH), + pageTitle: pageTitle?.substring(0, PAGE_TITLE_LENGTH), + eventType: eventName ? EVENT_TYPE.customEvent : EVENT_TYPE.pageView, + eventName: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null, + tag, + }); + + eventsData.push({ + websiteId, + sessionId, + visitId, + eventId: websiteEventId, + urlPath: urlPath?.substring(0, URL_LENGTH), + eventName: eventName?.substring(0, EVENT_NAME_LENGTH), + eventData, + }); + } + } else { + const websiteEventId = uuid(); + + websiteEventData.push({ id: websiteEventId, websiteId, sessionId, @@ -84,11 +118,9 @@ async function relationalQuery(data: { eventType: eventName ? EVENT_TYPE.customEvent : EVENT_TYPE.pageView, eventName: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null, tag, - }, - }); + }); - if (eventData || eventBatchData) { - await saveEventData({ + eventsData.push({ websiteId, sessionId, visitId, @@ -96,11 +128,18 @@ async function relationalQuery(data: { urlPath: urlPath?.substring(0, URL_LENGTH), eventName: eventName?.substring(0, EVENT_NAME_LENGTH), eventData, - eventBatchData, }); } - return websiteEvent; + const websiteEvents = prisma.client.websiteEvent.createMany({ + data: websiteEventData, + }); + + if (eventData || eventBatchData) { + await saveEventData(eventsData); + } + + return websiteEvents; } async function clickhouseQuery(data: { @@ -149,56 +188,105 @@ async function clickhouseQuery(data: { ...args } = data; const { insert, getUTCString } = clickhouse; - const { sendMessage } = kafka; - const eventId = uuid(); + const { sendMessages } = kafka; const createdAt = getUTCString(); - const message = { - ...args, - website_id: websiteId, - session_id: sessionId, - visit_id: visitId, - event_id: eventId, - country: country, - subdivision1: - country && subdivision1 - ? subdivision1.includes('-') - ? subdivision1 - : `${country}-${subdivision1}` - : null, - subdivision2: subdivision2, - city: city, - url_path: urlPath?.substring(0, URL_LENGTH), - url_query: urlQuery?.substring(0, URL_LENGTH), - referrer_path: referrerPath?.substring(0, URL_LENGTH), - referrer_query: referrerQuery?.substring(0, URL_LENGTH), - referrer_domain: referrerDomain?.substring(0, URL_LENGTH), - page_title: pageTitle?.substring(0, PAGE_TITLE_LENGTH), - event_type: eventName ? EVENT_TYPE.customEvent : EVENT_TYPE.pageView, - event_name: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null, - tag: tag, - created_at: createdAt, - }; + const websiteEventData = []; + const eventsData = []; - if (kafka.enabled) { - await sendMessage('event', message); + if (eventBatchData) { + for (const eventData of eventBatchData) { + const websiteEventId = uuid(); + + websiteEventData.push({ + ...args, + website_id: websiteId, + session_id: sessionId, + visit_id: visitId, + event_id: websiteEventId, + country: country, + subdivision1: + country && subdivision1 + ? subdivision1.includes('-') + ? subdivision1 + : `${country}-${subdivision1}` + : null, + subdivision2: subdivision2, + city: city, + url_path: urlPath?.substring(0, URL_LENGTH), + url_query: urlQuery?.substring(0, URL_LENGTH), + referrer_path: referrerPath?.substring(0, URL_LENGTH), + referrer_query: referrerQuery?.substring(0, URL_LENGTH), + referrer_domain: referrerDomain?.substring(0, URL_LENGTH), + page_title: pageTitle?.substring(0, PAGE_TITLE_LENGTH), + event_type: eventName ? EVENT_TYPE.customEvent : EVENT_TYPE.pageView, + event_name: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null, + tag: tag, + created_at: createdAt, + }); + + eventsData.push({ + websiteId, + sessionId, + visitId, + eventId: websiteEventId, + urlPath: urlPath?.substring(0, URL_LENGTH), + eventName: eventName?.substring(0, EVENT_NAME_LENGTH), + eventData, + createdAt, + }); + } } else { - await insert('website_event', [message]); - } + const websiteEventId = uuid(); - if (eventData || eventBatchData) { - await saveEventData({ + websiteEventData.push({ + ...args, + website_id: websiteId, + session_id: sessionId, + visit_id: visitId, + event_id: websiteEventId, + country: country, + subdivision1: + country && subdivision1 + ? subdivision1.includes('-') + ? subdivision1 + : `${country}-${subdivision1}` + : null, + subdivision2: subdivision2, + city: city, + url_path: urlPath?.substring(0, URL_LENGTH), + url_query: urlQuery?.substring(0, URL_LENGTH), + referrer_path: referrerPath?.substring(0, URL_LENGTH), + referrer_query: referrerQuery?.substring(0, URL_LENGTH), + referrer_domain: referrerDomain?.substring(0, URL_LENGTH), + page_title: pageTitle?.substring(0, PAGE_TITLE_LENGTH), + event_type: eventName ? EVENT_TYPE.customEvent : EVENT_TYPE.pageView, + event_name: eventName ? eventName?.substring(0, EVENT_NAME_LENGTH) : null, + tag: tag, + created_at: createdAt, + }); + + eventsData.push({ websiteId, sessionId, visitId, - eventId, + eventId: websiteEventId, urlPath: urlPath?.substring(0, URL_LENGTH), eventName: eventName?.substring(0, EVENT_NAME_LENGTH), eventData, - eventBatchData, createdAt, }); } + if (kafka.enabled) { + await sendMessages('event', websiteEventData); + } else { + await insert('website_event', websiteEventData); + } + + if (eventData || eventBatchData) { + await saveEventData(eventsData); + } + return data; } diff --git a/src/queries/analytics/events/saveEventData.ts b/src/queries/analytics/events/saveEventData.ts index a08b5b3b2..50a38baf6 100644 --- a/src/queries/analytics/events/saveEventData.ts +++ b/src/queries/analytics/events/saveEventData.ts @@ -6,135 +6,142 @@ import { flattenDynamicData, flattenJSON, getStringValue } from 'lib/data'; import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db'; import kafka from 'lib/kafka'; import prisma from 'lib/prisma'; -import { DynamicData, JsonKeyDynamicData } from 'lib/types'; +import { DynamicData, DynamicDataType } from 'lib/types'; -export async function saveEventData(data: { - websiteId: string; - eventId: string; - sessionId?: string; - visitId?: string; - urlPath?: string; - eventName?: string; - eventData?: DynamicData; - eventBatchData?: Array; - createdAt?: string; -}) { +export async function saveEventData( + data: Array<{ + websiteId: string; + eventId: string; + sessionId?: string; + visitId?: string; + urlPath?: string; + eventName?: string; + eventData?: DynamicData; + createdAt?: string; + }>, +) { return runQuery({ [PRISMA]: () => relationalQuery(data), [CLICKHOUSE]: () => clickhouseQuery(data), }); } -async function relationalQuery(data: { - websiteId: string; - eventId: string; - eventData?: DynamicData; - eventBatchData?: Array; -}): Promise { - const { websiteId, eventId, eventData, eventBatchData } = data; +async function relationalQuery( + data: Array<{ + websiteId: string; + eventId: string; + eventData?: DynamicData; + }>, +): Promise { + const listFlattenedData: { + id: string; + websiteEventId: string; + websiteId: string; + dataKey: string; + stringValue: string; + numberValue: number; + dateValue: Date; + dataType: DynamicDataType; + }[] = []; - let jsonKeys: Array = []; - if (eventData) { - jsonKeys = flattenJSON(eventData); - } else if (eventBatchData) { - jsonKeys = eventBatchData.flatMap(d => flattenJSON(d)); + for (const item of data) { + const { websiteId, eventId, eventData } = item; + const jsonKeys = flattenJSON(eventData); + + // id, websiteEventId, eventStringValue + const flattenedData = jsonKeys.map(a => ({ + id: uuid(), + websiteEventId: eventId, + websiteId, + dataKey: a.key, + stringValue: getStringValue(a.value, a.dataType), + numberValue: a.dataType === DATA_TYPE.number ? a.value : null, + dateValue: a.dataType === DATA_TYPE.date ? new Date(a.value) : null, + dataType: a.dataType, + })); + + listFlattenedData.push(...flattenedData); } - // id, websiteEventId, eventStringValue - const flattenedData = jsonKeys.map(a => ({ - id: uuid(), - websiteEventId: eventId, - websiteId, - dataKey: a.key, - stringValue: getStringValue(a.value, a.dataType), - numberValue: a.dataType === DATA_TYPE.number ? a.value : null, - dateValue: a.dataType === DATA_TYPE.date ? new Date(a.value) : null, - dataType: a.dataType, - })); - return prisma.client.eventData.createMany({ - data: flattenedData, + data: listFlattenedData, }); } -async function clickhouseQuery(data: { - websiteId: string; - eventId: string; - sessionId?: string; - visitId?: string; - urlPath?: string; - eventName?: string; - eventData?: DynamicData; - eventBatchData?: Array; - createdAt?: string; -}) { - const { - websiteId, - sessionId, - visitId, - eventId, - urlPath, - eventName, - eventData, - eventBatchData, - createdAt, - } = data; - - const { sendMessages, sendMessage } = kafka; +async function clickhouseQuery( + data: Array<{ + websiteId: string; + eventId: string; + sessionId?: string; + visitId?: string; + urlPath?: string; + eventName?: string; + eventData?: DynamicData; + createdAt?: string; + }>, +) { + const { sendMessages } = kafka; const { insert, getUTCString } = clickhouse; - let jsonKeys: Array = []; - if (eventData) { - jsonKeys = flattenJSON(eventData); - } else if (eventBatchData) { - jsonKeys = eventBatchData.flatMap(d => flattenJSON(d)); - } + const messagesEventData = []; + const messagesEventDataBlob = []; - const messages = jsonKeys.map(({ key, value, dataType }) => { - return { + for (const item of data) { + const { websiteId, sessionId, visitId, eventId, urlPath, eventName, eventData, createdAt } = + item; + + const jsonKeys = flattenJSON(eventData); + + const messagesPerEventData = jsonKeys.map(({ key, value, dataType }) => { + return { + website_id: websiteId, + session_id: sessionId, + event_id: eventId, + visit_id: visitId, + url_path: urlPath, + event_name: eventName, + data_key: key, + data_type: dataType, + string_value: getStringValue(value, dataType), + number_value: dataType === DATA_TYPE.number ? value : null, + date_value: dataType === DATA_TYPE.date ? getUTCString(value) : null, + created_at: createdAt ?? getUTCString(), + }; + }); + + messagesEventData.push(...messagesPerEventData); + + const jsonBlobs = flattenDynamicData(jsonKeys); + const messageEventDataBlob: { [key: string]: string | number } = { website_id: websiteId, session_id: sessionId, event_id: eventId, visit_id: visitId, - url_path: urlPath, event_name: eventName, - data_key: key, - data_type: dataType, - string_value: getStringValue(value, dataType), - number_value: dataType === DATA_TYPE.number ? value : null, - date_value: dataType === DATA_TYPE.date ? getUTCString(value) : null, - created_at: createdAt ?? getUTCString(), + created_at: createdAt ?? getUTCString(new Date()), }; - }); + jsonBlobs.blobs.forEach((blob, i) => { + if (i >= 20) return; // 20 is the max number of blobs + messageEventDataBlob[`blob${i + 1}`] = blob; + }); + jsonBlobs.doubles.forEach((double, i) => { + if (i >= 20) return; // 20 is the max number of doubles + messageEventDataBlob[`double${i + 1}`] = double; + }); - if (kafka.enabled) { - await sendMessages('event_data', messages); - } else { - await insert('event_data', messages); + messagesEventDataBlob.push(messageEventDataBlob); } - const jsonBlobs = flattenDynamicData(jsonKeys); - const message: { [key: string]: string | number } = { - website_id: websiteId, - session_id: sessionId, - event_id: eventId, - visit_id: visitId, - event_name: eventName, - created_at: createdAt ?? getUTCString(new Date()), - }; - jsonBlobs.blobs.forEach((blob, i) => { - if (i >= 20) return; // 20 is the max number of blobs - message[`blob${i + 1}`] = blob; - }); - jsonBlobs.doubles.forEach((double, i) => { - if (i >= 20) return; // 20 is the max number of doubles - message[`double${i + 1}`] = double; - }); - if (kafka.enabled) { - await sendMessage('event_data_blob', message); + await Promise.all([ + sendMessages('event_data', messagesEventData), + sendMessages('event_data_blob', messagesEventDataBlob), + ]); } else { - await insert('event_data_blob', [message]); + await Promise.all([ + insert('event_data', messagesEventData), + insert('event_data_blob', messagesEventDataBlob), + ]); } return data;