Merge pull request #1 from autodity/change_event_data_schema

Change event data schema
Viet-Tien 2024-08-06 10:54:54 +07:00 committed by GitHub
commit 62d045c2d5
5 changed files with 161 additions and 2 deletions

@@ -0,0 +1,52 @@
CREATE TABLE umami.event_data_blob
(
    website_id UUID,
    session_id UUID,
    visit_id UUID,
    event_id UUID,
    blob1 String,
    blob2 String,
    blob3 String,
    blob4 String,
    blob5 String,
    blob6 String,
    blob7 String,
    blob8 String,
    blob9 String,
    blob10 String,
    blob11 String,
    blob12 String,
    blob13 String,
    blob14 String,
    blob15 String,
    blob16 String,
    blob17 String,
    blob18 String,
    blob19 String,
    blob20 String,
    double1 Nullable(Decimal64(4)),
    double2 Nullable(Decimal64(4)),
    double3 Nullable(Decimal64(4)),
    double4 Nullable(Decimal64(4)),
    double5 Nullable(Decimal64(4)),
    double6 Nullable(Decimal64(4)),
    double7 Nullable(Decimal64(4)),
    double8 Nullable(Decimal64(4)),
    double9 Nullable(Decimal64(4)),
    double10 Nullable(Decimal64(4)),
    double11 Nullable(Decimal64(4)),
    double12 Nullable(Decimal64(4)),
    double13 Nullable(Decimal64(4)),
    double14 Nullable(Decimal64(4)),
    double15 Nullable(Decimal64(4)),
    double16 Nullable(Decimal64(4)),
    double17 Nullable(Decimal64(4)),
    double18 Nullable(Decimal64(4)),
    double19 Nullable(Decimal64(4)),
    double20 Nullable(Decimal64(4)),
    created_at DateTime('UTC'),
    job_id Nullable(UUID)
)
engine = MergeTree
ORDER BY (website_id, visit_id, event_id, created_at)
SETTINGS index_granularity = 8192;
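For reference, here is a minimal TypeScript sketch of the row shape this table stores. The EventDataBlobRow name is an assumption for illustration, not part of this PR; the field names mirror the DDL above.

// Hypothetical type mirroring umami.event_data_blob (not in this PR).
type EventDataBlobRow = {
  website_id: string; // UUID
  session_id: string; // UUID
  visit_id: string; // UUID
  event_id: string; // UUID
  created_at: string; // DateTime('UTC'), e.g. '2024-08-06 03:54:54'
  job_id?: string | null;
} & Partial<Record<`blob${number}`, string>> // blob1..blob20
  & Partial<Record<`double${number}`, number>>; // double1..double20, Decimal64(4)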

@@ -69,4 +69,57 @@ CREATE TABLE umami.session_data
)
engine = MergeTree
ORDER BY (website_id, session_id, data_key, created_at)
SETTINGS index_granularity = 8192;

CREATE TABLE umami.event_data_blob
(
    website_id UUID,
    session_id UUID,
    visit_id UUID,
    event_id UUID,
    blob1 String,
    blob2 String,
    blob3 String,
    blob4 String,
    blob5 String,
    blob6 String,
    blob7 String,
    blob8 String,
    blob9 String,
    blob10 String,
    blob11 String,
    blob12 String,
    blob13 String,
    blob14 String,
    blob15 String,
    blob16 String,
    blob17 String,
    blob18 String,
    blob19 String,
    blob20 String,
    double1 Nullable(Decimal64(4)),
    double2 Nullable(Decimal64(4)),
    double3 Nullable(Decimal64(4)),
    double4 Nullable(Decimal64(4)),
    double5 Nullable(Decimal64(4)),
    double6 Nullable(Decimal64(4)),
    double7 Nullable(Decimal64(4)),
    double8 Nullable(Decimal64(4)),
    double9 Nullable(Decimal64(4)),
    double10 Nullable(Decimal64(4)),
    double11 Nullable(Decimal64(4)),
    double12 Nullable(Decimal64(4)),
    double13 Nullable(Decimal64(4)),
    double14 Nullable(Decimal64(4)),
    double15 Nullable(Decimal64(4)),
    double16 Nullable(Decimal64(4)),
    double17 Nullable(Decimal64(4)),
    double18 Nullable(Decimal64(4)),
    double19 Nullable(Decimal64(4)),
    double20 Nullable(Decimal64(4)),
    created_at DateTime('UTC'),
    job_id Nullable(UUID)
)
engine = MergeTree
ORDER BY (website_id, visit_id, event_id, created_at)
SETTINGS index_granularity = 8192;
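Because the MergeTree ORDER BY key leads with website_id, per-website reads stay on a narrow primary-key range. A hedged query sketch using the @clickhouse/client package; the connection URL and the selected columns here are assumptions, not part of this PR:

import { createClient } from '@clickhouse/client';

// Assumed connection details; adjust to your deployment.
const client = createClient({ url: 'http://localhost:8123' });

const result = await client.query({
  query: `
    SELECT event_id, blob1, double1, created_at
    FROM umami.event_data_blob
    WHERE website_id = {websiteId:UUID}
    ORDER BY created_at DESC
    LIMIT 10
  `,
  query_params: { websiteId: '0f6f8e3a-0000-4000-8000-000000000001' },
  format: 'JSONEachRow',
});
const rows = await result.json();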

@@ -143,6 +143,7 @@ export const REPORT_PARAMETERS = {
export const KAFKA_TOPIC = {
  event: 'event',
  eventData: 'event_data',
  eventDataBlob: 'event_data_blob',
} as const;

export const ROLES = {
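A small usage sketch, assuming the producer referenced the new constant rather than the literal 'event_data_blob' string it passes later in this diff; the payload here is invented:

import { KAFKA_TOPIC } from 'lib/constants';
import kafka from 'lib/kafka';

// Illustrative payload only; the real message is built in the save-event code below.
const message = { website_id: '0f6f8e3a-0000-4000-8000-000000000001', event_name: 'signup' };
await kafka.sendMessage(message, KAFKA_TOPIC.eventDataBlob); // topic 'event_data_blob'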

@@ -92,3 +92,37 @@ function getKeyName(key: string, parentKey: string) {
export function objectToArray(obj: object) {
  return Object.keys(obj).map(key => obj[key]);
}
export function flattenDynamicData(
  data: { key: string; value: any; dataType: DynamicDataType }[],
): {
  blobs: string[];
  doubles: number[];
} {
  const blobs: string[] = [];
  const doubles: number[] = [];

  data.forEach(({ value, dataType }) => {
    switch (dataType) {
      case DATA_TYPE.string:
        blobs.push(value);
        break;
      case DATA_TYPE.number:
        doubles.push(value);
        break;
      case DATA_TYPE.date:
        // Dates are stored numerically as epoch milliseconds.
        doubles.push(new Date(value).getTime());
        break;
      case DATA_TYPE.boolean:
        // Booleans are stored numerically as 1/0.
        doubles.push(value ? 1 : 0);
        break;
      case DATA_TYPE.array:
        // Arrays are serialized to JSON and stored as a blob.
        blobs.push(JSON.stringify(value));
        break;
      default:
        break;
    }
  });

  return { blobs, doubles };
}
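For example, a mixed payload splits like this (values invented; order within each array follows the input order):

const { blobs, doubles } = flattenDynamicData([
  { key: 'plan', value: 'pro', dataType: DATA_TYPE.string },
  { key: 'seats', value: 5, dataType: DATA_TYPE.number },
  { key: 'trial', value: true, dataType: DATA_TYPE.boolean },
  { key: 'tags', value: ['a', 'b'], dataType: DATA_TYPE.array },
]);
// blobs   -> ['pro', '["a","b"]']
// doubles -> [5, 1]

Note that the key names are dropped here; only the values survive, identified by their slot position in the blob/double columns.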

@@ -2,7 +2,7 @@ import { Prisma } from '@prisma/client';
import { DATA_TYPE } from 'lib/constants';
import { uuid } from 'lib/crypto';
import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db';
import { flattenDynamicData, flattenJSON, getStringValue } from 'lib/data';
import kafka from 'lib/kafka';
import prisma from 'lib/prisma';
import { DynamicData } from 'lib/types';
@@ -61,7 +61,7 @@ async function clickhouseQuery(data: {
}) {
  const { websiteId, sessionId, visitId, eventId, urlPath, eventName, eventData, createdAt } = data;
  const { getDateFormat, sendMessages, sendMessage } = kafka;

  const jsonKeys = flattenJSON(eventData);
@@ -84,5 +84,24 @@ async function clickhouseQuery(data: {
  await sendMessages(messages, 'event_data');

  const jsonBlobs = flattenDynamicData(jsonKeys);

  const message: { [key: string]: string | number } = {
    website_id: websiteId,
    session_id: sessionId,
    event_id: eventId,
    visit_id: visitId,
    event_name: eventName,
  };

  jsonBlobs.blobs.forEach((blob, i) => {
    if (i >= 20) return; // the schema defines only blob1..blob20
    message[`blob${i + 1}`] = blob;
  });

  jsonBlobs.doubles.forEach((double, i) => {
    if (i >= 20) return; // the schema defines only double1..double20
    message[`double${i + 1}`] = double;
  });

  await sendMessage(message, 'event_data_blob');

  return data;
}
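Putting it together, a hedged sketch of the message this producer emits for one event (all values invented). Note that event_name is included in the message even though the event_data_blob DDL above has no matching column, so the downstream consumer is presumably expected to map or drop it:

// Hypothetical payload for eventData = { plan: 'pro', seats: 5 }:
const exampleMessage = {
  website_id: '0f6f8e3a-0000-4000-8000-000000000001',
  session_id: '0f6f8e3a-0000-4000-8000-000000000002',
  event_id: '0f6f8e3a-0000-4000-8000-000000000003',
  visit_id: '0f6f8e3a-0000-4000-8000-000000000004',
  event_name: 'purchase',
  blob1: 'pro', // string value -> first blob slot
  double1: 5,   // number value -> first double slot
};
// await sendMessage(exampleMessage, 'event_data_blob');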