redis checkpoint

This commit is contained in:
Brian Cao 2022-08-26 20:21:53 -07:00
parent 10cc6616c5
commit 818f8721e9
10 changed files with 187 additions and 34 deletions

View file

@ -1,4 +1,5 @@
import { prisma, runQuery } from 'lib/relational';
import redis from 'lib/redis';
export async function createWebsite(user_id, data) {
return runQuery(
@ -12,5 +13,11 @@ export async function createWebsite(user_id, data) {
...data,
},
}),
);
).then(async res => {
if (process.env.REDIS_URL) {
await redis.set(`website:${res.website_uuid}`, Number(res.website_id));
}
return res;
});
}

View file

@ -1,23 +1,30 @@
import { prisma, runQuery } from 'lib/relational';
import redis from 'lib/redis';
export async function deleteWebsite(website_id) {
return runQuery(
prisma.$transaction([
prisma.pageview.deleteMany({
where: { session: { website: { website_id } } },
prisma
.$transaction([
prisma.pageview.deleteMany({
where: { session: { website: { website_id } } },
}),
prisma.event_data.deleteMany({
where: { event: { session: { website: { website_id } } } },
}),
prisma.event.deleteMany({
where: { session: { website: { website_id } } },
}),
prisma.session.deleteMany({
where: { website: { website_id } },
}),
prisma.website.delete({
where: { website_id },
}),
])
.then(async res => {
if (process.env.REDIS_URL) {
await redis.del(`website:${res.website_uuid}`);
}
}),
prisma.event_data.deleteMany({
where: { event: { session: { website: { website_id } } } },
}),
prisma.event.deleteMany({
where: { session: { website: { website_id } } },
}),
prisma.session.deleteMany({
where: { website: { website_id } },
}),
prisma.website.delete({
where: { website_id },
}),
]),
);
}

View file

@ -1,8 +1,9 @@
import { CLICKHOUSE, KAFKA, RELATIONAL } from 'lib/constants';
import { prisma, runQuery } from 'lib/relational';
import clickhouse from 'lib/clickhouse';
import kafka from 'lib/kafka';
import { CLICKHOUSE, KAFKA, RELATIONAL } from 'lib/constants';
import { runAnalyticsQuery } from 'lib/db';
import kafka from 'lib/kafka';
import redis from 'lib/redis';
import { prisma, runQuery } from 'lib/relational';
export async function createSession(...args) {
return runAnalyticsQuery({
@ -23,7 +24,13 @@ async function relationalQuery(website_id, data) {
session_id: true,
},
}),
);
).then(async res => {
if (process.env.REDIS_URL) {
await redis.set(`session:${res.session_uuid}`, '');
}
return res;
});
}
async function clickhouseQuery(
@ -67,4 +74,6 @@ async function kafkaQuery(
};
await kafka.sendKafkaMessage(params, 'session');
await redis.set(`session:${session_uuid}`, '');
}

View file

@ -14,11 +14,15 @@ async function relationalQuery(websites, start_at) {
return runQuery(
prisma.session.findMany({
where: {
website: {
website_id: {
in: websites,
},
},
...(websites && websites.length > 0
? {
website: {
website_id: {
in: websites,
},
},
}
: {}),
created_at: {
gte: start_at,
},
@ -31,7 +35,6 @@ async function clickhouseQuery(websites, start_at) {
return clickhouse.rawQuery(
`
select
session_id,
session_uuid,
website_id,
created_at,
@ -43,8 +46,8 @@ async function clickhouseQuery(websites, start_at) {
"language",
country
from session
where website_id in (${websites.join[',']}
and created_at >= ${clickhouse.getDateFormat(start_at)})
where ${websites && websites.length > 0 ? `(website_id in (${websites.join[',']})` : '0 = 0'}
and created_at >= ${clickhouse.getDateFormat(start_at)}
`,
);
}