Database refactoring.

This commit is contained in:
Mike Cao 2022-08-27 21:38:35 -07:00
parent bb184dc2cc
commit 467c7f289f
37 changed files with 566 additions and 591 deletions

View file

@ -1,40 +1,51 @@
import { ClickHouse } from 'clickhouse';
import dateFormat from 'dateformat';
import debug from 'debug';
import { FILTER_IGNORED } from 'lib/constants';
import { CLICKHOUSE_DATE_FORMATS } from './constants';
import { CLICKHOUSE } from 'lib/db';
// Format strings passed to ClickHouse formatDateTime(), one per
// supported time-series bucket (see getDateStringQuery below).
export const CLICKHOUSE_DATE_FORMATS = {
  minute: '%Y-%m-%d %H:%M:00',
  hour: '%Y-%m-%d %H:00:00',
  day: '%Y-%m-%d',
  month: '%Y-%m-01',
  year: '%Y-01-01',
};
const log = debug('clickhouse');
function getClient() {
if (!process.env.ANALYTICS_URL) {
if (!process.env.CLICKHOUSE_URL) {
return null;
}
const url = new URL(process.env.ANALYTICS_URL);
const database = url.pathname.replace('/', '');
const {
hostname,
port,
pathname,
username = 'default',
password,
} = new URL(process.env.CLICKHOUSE_URL);
return new ClickHouse({
url: url.hostname,
port: Number(url.port),
basicAuth: url.password
? {
username: url.username || 'default',
password: url.password,
}
: null,
const client = new ClickHouse({
url: hostname,
port: Number(port),
format: 'json',
config: {
database,
database: pathname.replace('/', ''),
},
basicAuth: password ? { username, password } : null,
});
if (process.env.NODE_ENV !== 'production') {
global[CLICKHOUSE] = client;
}
log('Clickhouse initialized');
return client;
}
const clickhouse = global.clickhouse || getClient();
if (process.env.NODE_ENV !== 'production') {
global.clickhouse = clickhouse;
}
export { clickhouse };
/**
 * Builds a ClickHouse formatDateTime() SQL expression that truncates
 * the given column/expression to a time-series unit.
 *
 * @param {string} data - SQL expression or column name.
 * @param {string} unit - One of the CLICKHOUSE_DATE_FORMATS keys
 *   (minute/hour/day/month/year).
 * @returns {string} SQL fragment.
 */
function getDateStringQuery(data, unit) {
  const format = CLICKHOUSE_DATE_FORMATS[unit];
  return `formatDateTime(${data}, '${format}')`;
}
@ -176,7 +187,12 @@ async function findFirst(data) {
return data[0] ?? null;
}
// Initialization
const clickhouse = global[CLICKHOUSE] || getClient();
export default {
client: clickhouse,
log,
getDateStringQuery,
getDateQuery,
getDateFormat,

View file

@ -67,36 +67,6 @@ export const EVENT_COLORS = [
'#ffec16',
];
export const RELATIONAL = 'relational';
export const POSTGRESQL = 'postgresql';
export const MYSQL = 'mysql';
export const CLICKHOUSE = 'clickhouse';
export const KAFKA = 'kafka';
export const MYSQL_DATE_FORMATS = {
minute: '%Y-%m-%d %H:%i:00',
hour: '%Y-%m-%d %H:00:00',
day: '%Y-%m-%d',
month: '%Y-%m-01',
year: '%Y-01-01',
};
export const POSTGRESQL_DATE_FORMATS = {
minute: 'YYYY-MM-DD HH24:MI:00',
hour: 'YYYY-MM-DD HH24:00:00',
day: 'YYYY-MM-DD',
month: 'YYYY-MM-01',
year: 'YYYY-01-01',
};
export const CLICKHOUSE_DATE_FORMATS = {
minute: '%Y-%m-%d %H:%M:00',
hour: '%Y-%m-%d %H:00:00',
day: '%Y-%m-%d',
month: '%Y-%m-01',
year: '%Y-01-01',
};
export const FILTER_IGNORED = Symbol.for('filter-ignored');
export const DOMAIN_REGEX =
@ -107,6 +77,7 @@ export const LAPTOP_SCREEN_WIDTH = 1024;
export const MOBILE_SCREEN_WIDTH = 479;
export const URL_LENGTH = 500;
export const EVENT_NAME_LENGTH = 50;
export const DESKTOP_OS = [
'Windows 3.11',

View file

@ -1,5 +1,11 @@
import { POSTGRESQL, RELATIONAL, MYSQL, KAFKA, CLICKHOUSE } from 'lib/constants';
// Database driver identifiers; also used as keys for the dev-mode
// client caches stored on `global` (see global[PRISMA], global[KAFKA]).
export const PRISMA = 'prisma';
export const POSTGRESQL = 'postgresql';
export const MYSQL = 'mysql';
export const CLICKHOUSE = 'clickhouse';
export const KAFKA = 'kafka';
export const KAFKA_PRODUCER = 'kafka-producer';

// Fixes issue with converting bigint values
// NOTE(review): patches the native BigInt prototype so JSON.stringify
// can serialize bigint columns. Number() silently loses precision
// above 2^53 - 1 — confirm serialized values stay below that bound.
BigInt.prototype.toJSON = function () {
  return Number(this);
};
@ -14,11 +20,11 @@ export function getDatabaseType(url = process.env.DATABASE_URL) {
return type;
}
export async function runAnalyticsQuery(queries) {
const db = getDatabaseType(process.env.ANALYTICS_URL || process.env.DATABASE_URL);
export async function runQuery(queries) {
const db = getDatabaseType(process.env.CLICKHOUSE_URL || process.env.DATABASE_URL);
if (db === POSTGRESQL || db === MYSQL) {
return queries[RELATIONAL]();
return queries[PRISMA]();
}
if (db === CLICKHOUSE) {

View file

@ -74,7 +74,7 @@ export function stringToColor(str) {
let color = '#';
for (let i = 0; i < 3; i++) {
let value = (hash >> (i * 8)) & 0xff;
color += ('00' + value.toString(16)).substr(-2);
color += ('00' + value.toString(16)).slice(-2);
}
return color;
}

View file

@ -1,57 +1,53 @@
import { Kafka, logLevel } from 'kafkajs';
import dateFormat from 'dateformat';
import debug from 'debug';
import { KAFKA, KAFKA_PRODUCER } from 'lib/db';
export function getClient() {
if (!process.env.KAFKA_URL) {
const log = debug('kafka');
function getClient() {
if (!process.env.KAFKA_URL || !process.env.KAFKA_BROKER) {
return null;
}
const url = new URL(process.env.KAFKA_URL);
const { username, password } = new URL(process.env.KAFKA_URL);
const brokers = process.env.KAFKA_BROKER.split(',');
if (url.username.length === 0 && url.password.length === 0) {
return new Kafka({
clientId: 'umami',
brokers: brokers,
connectionTimeout: 3000,
logLevel: logLevel.ERROR,
});
} else {
return new Kafka({
clientId: 'umami',
brokers: brokers,
connectionTimeout: 3000,
ssl: true,
sasl: {
mechanism: 'plain',
username: url.username,
password: url.password,
},
});
}
}
const kafka = global.kafka || getClient();
let kafkaProducer = null;
const ssl =
username && password
? {
ssl: true,
sasl: {
mechanism: 'plain',
username,
password,
},
}
: {};
(async () => {
if (kafka) {
kafkaProducer = global.kakfaProducer || (await getProducer());
}
const client = new Kafka({
clientId: 'umami',
brokers: brokers,
connectionTimeout: 3000,
logLevel: logLevel.ERROR,
...ssl,
});
if (process.env.NODE_ENV !== 'production') {
global.kafka = kafka;
if (kafka) {
global.kakfaProducer = kafkaProducer;
}
global[KAFKA] = client;
}
})();
export { kafka, kafkaProducer };
return client;
}
/**
 * Creates and connects a Kafka producer from the module-level client,
 * caching it on the global object outside production so hot reloads
 * reuse the same connection.
 *
 * @returns {Promise<object>} the connected kafkajs producer.
 */
async function getProducer() {
  const p = kafka.producer();
  await p.connect();

  if (process.env.NODE_ENV !== 'production') {
    global[KAFKA_PRODUCER] = p;
  }

  return p;
}
@ -60,7 +56,7 @@ function getDateFormat(date) {
}
async function sendMessage(params, topic) {
await kafkaProducer.send({
await producer.send({
topic,
messages: [
{
@ -72,7 +68,19 @@ async function sendMessage(params, topic) {
});
}
// Initialization
let kafka;
let producer;

(async () => {
  kafka = global[KAFKA] || getClient();

  // getClient() returns null when KAFKA_URL/KAFKA_BROKER are unset;
  // only attempt to create/connect a producer when a client exists.
  if (kafka) {
    producer = global[KAFKA_PRODUCER] || (await getProducer());
  }
})().catch(e => log(e)); // don't leave the init promise floating

export default {
  // Getters so importers always observe the current values; plain
  // properties would snapshot `undefined` before the async
  // initialization above completes.
  get client() {
    return kafka;
  },
  get producer() {
    return producer;
  },
  log,
  getDateFormat,
  sendMessage,
};

View file

@ -1,19 +1,30 @@
import { PrismaClient } from '@prisma/client';
import chalk from 'chalk';
import {
FILTER_IGNORED,
MYSQL,
MYSQL_DATE_FORMATS,
POSTGRESQL,
POSTGRESQL_DATE_FORMATS,
} from 'lib/constants';
import { getDatabaseType } from 'lib/db';
import moment from 'moment-timezone';
import debug from 'debug';
import { PRISMA, MYSQL, POSTGRESQL } from 'lib/db';
import { FILTER_IGNORED } from 'lib/constants';
import { getDatabaseType } from 'lib/db';
// MySQL-dialect date format tokens (note %i for minutes), one per
// supported time-series bucket.
const MYSQL_DATE_FORMATS = {
  minute: '%Y-%m-%d %H:%i:00',
  hour: '%Y-%m-%d %H:00:00',
  day: '%Y-%m-%d',
  month: '%Y-%m-01',
  year: '%Y-01-01',
};

// PostgreSQL-dialect date format tokens (HH24/MI style), mirroring
// the MySQL table above.
const POSTGRESQL_DATE_FORMATS = {
  minute: 'YYYY-MM-DD HH24:MI:00',
  hour: 'YYYY-MM-DD HH24:00:00',
  day: 'YYYY-MM-DD',
  month: 'YYYY-MM-01',
  year: 'YYYY-01-01',
};

const log = debug('prisma');
const options = {
const PRISMA_OPTIONS = {
log: [
{
emit: 'event',
@ -33,16 +44,16 @@ function getClient(options) {
prisma.$on('query', logQuery);
}
if (process.env.NODE_ENV !== 'production') {
global[PRISMA] = prisma;
}
log('Prisma initialized');
return prisma;
}
let prisma = global.prisma || getClient(options);
if (process.env.NODE_ENV !== 'production') {
global.prisma = prisma;
}
export function getDateQuery(field, unit, timezone) {
function getDateQuery(field, unit, timezone) {
const db = getDatabaseType(process.env.DATABASE_URL);
if (db === POSTGRESQL) {
@ -63,7 +74,7 @@ export function getDateQuery(field, unit, timezone) {
}
}
export function getTimestampInterval(field) {
function getTimestampInterval(field) {
const db = getDatabaseType(process.env.DATABASE_URL);
if (db === POSTGRESQL) {
@ -75,7 +86,7 @@ export function getTimestampInterval(field) {
}
}
export function getFilterQuery(table, column, filters = {}, params = []) {
function getFilterQuery(table, column, filters = {}, params = []) {
const query = Object.keys(filters).reduce((arr, key) => {
const filter = filters[key];
@ -135,7 +146,7 @@ export function getFilterQuery(table, column, filters = {}, params = []) {
return query.join('\n');
}
export function parseFilters(table, column, filters = {}, params = [], sessionKey = 'session_id') {
function parseFilters(table, column, filters = {}, params = [], sessionKey = 'session_id') {
const { domain, url, event_url, referrer, os, browser, device, country, event_name, query } =
filters;
@ -158,13 +169,7 @@ export function parseFilters(table, column, filters = {}, params = [], sessionKe
};
}
export async function runQuery(query) {
return query.catch(e => {
throw e;
});
}
export async function rawQuery(query, params = []) {
async function rawQuery(query, params = []) {
const db = getDatabaseType(process.env.DATABASE_URL);
if (db !== POSTGRESQL && db !== MYSQL) {
@ -173,17 +178,23 @@ export async function rawQuery(query, params = []) {
const sql = db === MYSQL ? query.replace(/\$[0-9]+/g, '?') : query;
return runQuery(prisma.$queryRawUnsafe.apply(prisma, [sql, ...params]));
return prisma.$queryRawUnsafe.apply(prisma, [sql, ...params]);
}
export { prisma };
/**
 * Runs several Prisma queries inside a single transaction and
 * resolves with their results.
 *
 * @param {Array} queries - Prisma query promises/operations.
 * @returns {Promise<Array>} results in the same order as `queries`.
 */
async function multiQuery(queries) {
  const results = await prisma.$transaction(queries);
  return results;
}
// Initialization
const prisma = global[PRISMA] || getClient(PRISMA_OPTIONS);
export default {
prisma,
client: prisma,
log,
getDateQuery,
getTimestampInterval,
getFilterQuery,
parseFilters,
runQuery,
rawQuery,
multiQuery,
};

View file

@ -1,6 +1,10 @@
import { createClient } from 'redis';
import { startOfMonth } from 'date-fns';
import { getSessions, getAllWebsites } from '/queries';
import debug from 'debug';
const log = debug('db:redis');
const REDIS = Symbol.for('redis');
async function getClient() {
const redis = new createClient({
@ -10,30 +14,16 @@ async function getClient() {
await redis.connect();
if (process.env.LOG_QUERY) {
redis.on('error', err => console.log('Redis Client Error', err));
redis.on('error', err => log(err));
}
if (process.env.NODE_ENV !== 'production') {
global[REDIS] = redis;
}
return redis;
}
let redis = null;
(async () => {
redis = global.redis || (await getClient());
if (process.env.NODE_ENV !== 'production') {
global.redis = redis;
}
const value = await redis.get('initialized');
if (!value) {
await stageData();
}
})();
export default redis;
async function stageData() {
const sessions = await getSessions([], startOfMonth(new Date()).toUTCString());
const websites = await getAllWebsites();
@ -57,3 +47,18 @@ async function addRedis(ids) {
await redis.set(key, value);
}
}
// Initialization
let redis = null;

(async () => {
  redis = global[REDIS] || (await getClient());

  // Seed the cache from the database on first run.
  const value = await redis.get('initialized');
  if (!value) {
    await stageData();
  }
})().catch(e => log(e)); // surface init failures instead of an unhandled rejection

// NOTE(review): `export default <identifier>` is not a live binding —
// importers capture `null` before the async initialization above
// finishes. Confirm consumers tolerate this or switch to a getter.
export default redis;