Merge branch 'dev' of https://github.com/mikecao/umami into dev

This commit is contained in:
Mike Cao 2022-08-23 00:48:57 -07:00
commit 700158f5f6
25 changed files with 1089 additions and 509 deletions

View file

@ -71,6 +71,7 @@ export const RELATIONAL = 'relational';
export const POSTGRESQL = 'postgresql';
export const MYSQL = 'mysql';
export const CLICKHOUSE = 'clickhouse';
export const KAFKA = 'kafka';
export const MYSQL_DATE_FORMATS = {
minute: '%Y-%m-%d %H:%i:00',

View file

@ -1,6 +1,8 @@
import { PrismaClient } from '@prisma/client';
import { ClickHouse } from 'clickhouse';
import dateFormat from 'dateformat';
import chalk from 'chalk';
import { getKafkaService } from './kafka';
import {
MYSQL,
MYSQL_DATE_FORMATS,
@ -9,6 +11,7 @@ import {
CLICKHOUSE,
RELATIONAL,
FILTER_IGNORED,
KAFKA,
} from 'lib/constants';
import moment from 'moment-timezone';
import { CLICKHOUSE_DATE_FORMATS } from './constants';
@ -87,9 +90,7 @@ export function getDatabase() {
}
export function getAnalyticsDatabase() {
const type =
process.env.ANALYTICS_TYPE ||
(process.env.ANALYTICS_URL && process.env.ANALYTICS_URL.split(':')[0]);
const type = process.env.ANALYTICS_URL && process.env.ANALYTICS_URL.split(':')[0];
if (type === 'postgres') {
return POSTGRESQL;
@ -129,13 +130,13 @@ export function getDateQuery(field, unit, timezone) {
export function getDateQueryClickhouse(field, unit, timezone) {
if (timezone) {
return `date_trunc('${unit}', ${field},'${timezone}')`;
return `date_trunc('${unit}', ${field}, '${timezone}')`;
}
return `date_trunc('${unit}', ${field})`;
}
export function getDateFormatClickhouse(date) {
return `parseDateTimeBestEffort('${date.toUTCString()}')`;
return `'${dateFormat(date, 'UTC:yyyy-mm-dd HH:MM:ss')}'`;
}
export function getBetweenDatesClickhouse(field, start_at, end_at) {
@ -219,8 +220,6 @@ export function parseFilters(table, column, filters = {}, params = [], sessionKe
const { domain, url, event_url, referrer, os, browser, device, country, event_name, query } =
filters;
console.log({ table, column, filters, params });
const pageviewFilters = { domain, url, referrer, query };
const sessionFilters = { os, browser, device, country };
const eventFilters = { url: event_url, event_name };
@ -300,6 +299,10 @@ export async function runAnalyticsQuery(queries) {
}
if (db === CLICKHOUSE) {
const kafka = getKafkaService();
if (kafka === KAFKA && queries[KAFKA]) {
return queries[KAFKA]();
}
return queries[CLICKHOUSE]();
}
}

64
lib/kafka.js Normal file
View file

@ -0,0 +1,64 @@
import { Kafka } from 'kafkajs';
import dateFormat from 'dateformat';
export function getKafkaClient() {
if (!process.env.KAFKA_URL) {
return null;
}
const url = new URL(process.env.KAFKA_URL);
const brokers = process.env.KAFKA_BROKER.split(',');
if (url.username.length === 0 && url.password.length === 0) {
return new Kafka({
clientId: 'umami',
brokers: brokers,
connectionTimeout: 3000,
});
} else {
return new Kafka({
clientId: 'umami',
brokers: brokers,
connectionTimeout: 3000,
ssl: true,
sasl: {
mechanism: 'plain',
username: url.username,
password: url.password,
},
});
}
}
const kafka = global.kafka || getKafkaClient();
if (process.env.NODE_ENV !== 'production') {
global.kafka = kafka;
}
export { kafka };
export async function kafkaProducer(params, topic) {
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic,
messages: [
{
key: 'key',
value: JSON.stringify(params),
},
],
});
}
export function getDateFormatKafka(date) {
return dateFormat(date, 'UTC:yyyy-mm-dd HH:MM:ss');
}
export function getKafkaService() {
const type = process.env.KAFKA_URL && process.env.KAFKA_URL.split(':')[0];
return type;
}