mirror of
https://github.com/umami-software/umami.git
synced 2026-02-19 03:55:37 +01:00
save event data into both table
This commit is contained in:
parent
0496062be0
commit
891e7f1f5b
3 changed files with 54 additions and 2 deletions
|
|
@ -143,6 +143,7 @@ export const REPORT_PARAMETERS = {
|
|||
export const KAFKA_TOPIC = {
|
||||
event: 'event',
|
||||
eventData: 'event_data',
|
||||
eventDataBlob: 'event_data_blob',
|
||||
} as const;
|
||||
|
||||
export const ROLES = {
|
||||
|
|
|
|||
|
|
@ -92,3 +92,37 @@ function getKeyName(key: string, parentKey: string) {
|
|||
export function objectToArray(obj: object) {
|
||||
return Object.keys(obj).map(key => obj[key]);
|
||||
}
|
||||
|
||||
export function flattenDynamicData(
|
||||
data: { key: string; value: any; dataType: DynamicDataType }[],
|
||||
): {
|
||||
blobs: string[];
|
||||
doubles: number[];
|
||||
} {
|
||||
const blobs: string[] = [];
|
||||
const doubles: number[] = [];
|
||||
|
||||
data.forEach(({ value, dataType }) => {
|
||||
switch (dataType) {
|
||||
case DATA_TYPE.string:
|
||||
blobs.push(value);
|
||||
break;
|
||||
case DATA_TYPE.number:
|
||||
doubles.push(value);
|
||||
break;
|
||||
case DATA_TYPE.date:
|
||||
doubles.push(new Date(value).getTime());
|
||||
break;
|
||||
case DATA_TYPE.boolean:
|
||||
doubles.push(value ? 1 : 0);
|
||||
break;
|
||||
case DATA_TYPE.array:
|
||||
blobs.push(JSON.stringify(value));
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
});
|
||||
|
||||
return { blobs, doubles };
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import { Prisma } from '@prisma/client';
|
|||
import { DATA_TYPE } from 'lib/constants';
|
||||
import { uuid } from 'lib/crypto';
|
||||
import { CLICKHOUSE, PRISMA, runQuery } from 'lib/db';
|
||||
import { flattenJSON, getStringValue } from 'lib/data';
|
||||
import { flattenDynamicData, flattenJSON, getStringValue } from 'lib/data';
|
||||
import kafka from 'lib/kafka';
|
||||
import prisma from 'lib/prisma';
|
||||
import { DynamicData } from 'lib/types';
|
||||
|
|
@ -61,7 +61,7 @@ async function clickhouseQuery(data: {
|
|||
}) {
|
||||
const { websiteId, sessionId, visitId, eventId, urlPath, eventName, eventData, createdAt } = data;
|
||||
|
||||
const { getDateFormat, sendMessages } = kafka;
|
||||
const { getDateFormat, sendMessages, sendMessage } = kafka;
|
||||
|
||||
const jsonKeys = flattenJSON(eventData);
|
||||
|
||||
|
|
@ -84,5 +84,22 @@ async function clickhouseQuery(data: {
|
|||
|
||||
await sendMessages(messages, 'event_data');
|
||||
|
||||
const jsonBlobs = flattenDynamicData(jsonKeys);
|
||||
const message: { [key: string]: string | number } = {
|
||||
website_id: websiteId,
|
||||
session_id: sessionId,
|
||||
event_id: eventId,
|
||||
visitId: visitId,
|
||||
event_name: eventName,
|
||||
};
|
||||
jsonBlobs.blobs.forEach((blob, i) => {
|
||||
message[`blob${i + 1}`] = blob;
|
||||
});
|
||||
jsonBlobs.doubles.forEach((double, i) => {
|
||||
message[`double${i + 1}`] = double;
|
||||
});
|
||||
|
||||
await sendMessage(message, 'event_data_blob');
|
||||
|
||||
return data;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue