mirror of
https://github.com/umami-software/umami.git
synced 2026-02-13 00:55:37 +01:00
Adding logging for Kafka. Refactor send method.
This commit is contained in:
parent
1da81150d5
commit
aaa51e72d3
3 changed files with 26 additions and 30 deletions
|
|
@ -1,3 +1,4 @@
|
||||||
|
import { serializeError } from 'serialize-error';
|
||||||
import debug from 'debug';
|
import debug from 'debug';
|
||||||
import { Kafka, Producer, RecordMetadata, SASLOptions, logLevel } from 'kafkajs';
|
import { Kafka, Producer, RecordMetadata, SASLOptions, logLevel } from 'kafkajs';
|
||||||
import { KAFKA, KAFKA_PRODUCER } from 'lib/db';
|
import { KAFKA, KAFKA_PRODUCER } from 'lib/db';
|
||||||
|
|
@ -63,33 +64,29 @@ async function getProducer(): Promise<Producer> {
|
||||||
|
|
||||||
async function sendMessage(
|
async function sendMessage(
|
||||||
topic: string,
|
topic: string,
|
||||||
message: { [key: string]: string | number },
|
message: { [key: string]: string | number } | { [key: string]: string | number }[],
|
||||||
): Promise<RecordMetadata[]> {
|
): Promise<RecordMetadata[]> {
|
||||||
await connect();
|
try {
|
||||||
|
await connect();
|
||||||
|
|
||||||
return producer.send({
|
return producer.send({
|
||||||
topic,
|
topic,
|
||||||
messages: [
|
messages: Array.isArray(message)
|
||||||
{
|
? message.map(a => {
|
||||||
value: JSON.stringify(message),
|
return { value: JSON.stringify(a) };
|
||||||
},
|
})
|
||||||
],
|
: [
|
||||||
timeout: SEND_TIMEOUT,
|
{
|
||||||
acks: ACKS,
|
value: JSON.stringify(message),
|
||||||
});
|
},
|
||||||
}
|
],
|
||||||
|
timeout: SEND_TIMEOUT,
|
||||||
async function sendMessages(topic: string, messages: { [key: string]: string | number }[]) {
|
acks: ACKS,
|
||||||
await connect();
|
});
|
||||||
|
} catch (e) {
|
||||||
await producer.send({
|
// eslint-disable-next-line no-console
|
||||||
topic,
|
console.log('KAFKA ERROR:', serializeError(e));
|
||||||
messages: messages.map(a => {
|
}
|
||||||
return { value: JSON.stringify(a) };
|
|
||||||
}),
|
|
||||||
timeout: SEND_TIMEOUT,
|
|
||||||
acks: ACKS,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async function connect(): Promise<Kafka> {
|
async function connect(): Promise<Kafka> {
|
||||||
|
|
@ -111,5 +108,4 @@ export default {
|
||||||
log,
|
log,
|
||||||
connect,
|
connect,
|
||||||
sendMessage,
|
sendMessage,
|
||||||
sendMessages,
|
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -61,7 +61,7 @@ async function clickhouseQuery(data: {
|
||||||
const { websiteId, sessionId, eventId, urlPath, eventName, eventData, createdAt } = data;
|
const { websiteId, sessionId, eventId, urlPath, eventName, eventData, createdAt } = data;
|
||||||
|
|
||||||
const { insert, getUTCString } = clickhouse;
|
const { insert, getUTCString } = clickhouse;
|
||||||
const { sendMessages } = kafka;
|
const { sendMessage } = kafka;
|
||||||
|
|
||||||
const jsonKeys = flattenJSON(eventData);
|
const jsonKeys = flattenJSON(eventData);
|
||||||
|
|
||||||
|
|
@ -82,7 +82,7 @@ async function clickhouseQuery(data: {
|
||||||
});
|
});
|
||||||
|
|
||||||
if (kafka.enabled) {
|
if (kafka.enabled) {
|
||||||
await sendMessages('event_data', messages);
|
await sendMessage('event_data', messages);
|
||||||
} else {
|
} else {
|
||||||
await insert('event_data', messages);
|
await insert('event_data', messages);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -81,7 +81,7 @@ async function clickhouseQuery(data: {
|
||||||
const { websiteId, sessionId, sessionData } = data;
|
const { websiteId, sessionId, sessionData } = data;
|
||||||
|
|
||||||
const { insert, getUTCString } = clickhouse;
|
const { insert, getUTCString } = clickhouse;
|
||||||
const { sendMessages } = kafka;
|
const { sendMessage } = kafka;
|
||||||
const createdAt = getUTCString();
|
const createdAt = getUTCString();
|
||||||
|
|
||||||
const jsonKeys = flattenJSON(sessionData);
|
const jsonKeys = flattenJSON(sessionData);
|
||||||
|
|
@ -100,7 +100,7 @@ async function clickhouseQuery(data: {
|
||||||
});
|
});
|
||||||
|
|
||||||
if (kafka.enabled) {
|
if (kafka.enabled) {
|
||||||
await sendMessages('session_data', messages);
|
await sendMessage('session_data', messages);
|
||||||
} else {
|
} else {
|
||||||
await insert('session_data', messages);
|
await insert('session_data', messages);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue