Fixed identify. Closes #3409

This commit is contained in:
Mike Cao 2025-05-09 16:54:49 -07:00
parent d3977eef7d
commit 3f1ecf4c1b
4 changed files with 156 additions and 212 deletions

View file

@ -7,28 +7,29 @@ import { CLICKHOUSE, PRISMA, runQuery } from '@/lib/db';
import kafka from '@/lib/kafka';
import clickhouse from '@/lib/clickhouse';
export async function saveSessionData(data: {
export interface SaveSessionDataArgs {
websiteId: string;
sessionId: string;
sessionData: DynamicData;
distinctId?: string;
createdAt?: Date;
}) {
}
export async function saveSessionData(data: SaveSessionDataArgs) {
return runQuery({
[PRISMA]: () => relationalQuery(data),
[CLICKHOUSE]: () => clickhouseQuery(data),
});
}
export async function relationalQuery(data: {
websiteId: string;
sessionId: string;
sessionData: DynamicData;
distinctId?: string;
createdAt?: Date;
}) {
export async function relationalQuery({
websiteId,
sessionId,
sessionData,
distinctId,
createdAt,
}: SaveSessionDataArgs) {
const { client } = prisma;
const { websiteId, sessionId, sessionData, distinctId, createdAt } = data;
const jsonKeys = flattenJSON(sessionData);
@ -75,19 +76,15 @@ export async function relationalQuery(data: {
});
}
}
return flattenedData;
}
async function clickhouseQuery(data: {
websiteId: string;
sessionId: string;
sessionData: DynamicData;
distinctId?: string;
createdAt?: Date;
}) {
const { websiteId, sessionId, sessionData, distinctId, createdAt } = data;
async function clickhouseQuery({
websiteId,
sessionId,
sessionData,
distinctId,
createdAt,
}: SaveSessionDataArgs) {
const { insert, getUTCString } = clickhouse;
const { sendMessage } = kafka;
@ -112,6 +109,4 @@ async function clickhouseQuery(data: {
} else {
await insert('session_data', messages);
}
return data;
}