diff --git a/src/lib/kafka.ts b/src/lib/kafka.ts index b73e2d45..2e875c59 100644 --- a/src/lib/kafka.ts +++ b/src/lib/kafka.ts @@ -1,3 +1,4 @@ +import { serializeError } from 'serialize-error'; import debug from 'debug'; import { Kafka, Producer, RecordMetadata, SASLOptions, logLevel } from 'kafkajs'; import { KAFKA, KAFKA_PRODUCER } from 'lib/db'; @@ -63,33 +64,29 @@ async function getProducer(): Promise<Producer> { async function sendMessage( topic: string, - message: { [key: string]: string | number }, + message: { [key: string]: string | number } | { [key: string]: string | number }[], ): Promise<RecordMetadata[]> { - await connect(); + try { + await connect(); - return producer.send({ - topic, - messages: [ - { - value: JSON.stringify(message), - }, - ], - timeout: SEND_TIMEOUT, - acks: ACKS, - }); -} - -async function sendMessages(topic: string, messages: { [key: string]: string | number }[]) { - await connect(); - - await producer.send({ - topic, - messages: messages.map(a => { - return { value: JSON.stringify(a) }; - }), - timeout: SEND_TIMEOUT, - acks: ACKS, - }); + return producer.send({ + topic, + messages: Array.isArray(message) + ? 
message.map(a => { + return { value: JSON.stringify(a) }; + }) + : [ + { + value: JSON.stringify(message), + }, + ], + timeout: SEND_TIMEOUT, + acks: ACKS, + }); + } catch (e) { + // eslint-disable-next-line no-console + console.log('KAFKA ERROR:', serializeError(e)); + } } async function connect(): Promise<Kafka> { @@ -111,5 +108,4 @@ export default { log, connect, sendMessage, - sendMessages, }; diff --git a/src/queries/analytics/events/saveEventData.ts b/src/queries/analytics/events/saveEventData.ts index 7e7db8b4..cb75a91b 100644 --- a/src/queries/analytics/events/saveEventData.ts +++ b/src/queries/analytics/events/saveEventData.ts @@ -61,7 +61,7 @@ async function clickhouseQuery(data: { const { websiteId, sessionId, eventId, urlPath, eventName, eventData, createdAt } = data; const { insert, getUTCString } = clickhouse; - const { sendMessages } = kafka; + const { sendMessage } = kafka; const jsonKeys = flattenJSON(eventData); @@ -82,7 +82,7 @@ async function clickhouseQuery(data: { }); if (kafka.enabled) { - await sendMessages('event_data', messages); + await sendMessage('event_data', messages); } else { await insert('event_data', messages); } diff --git a/src/queries/analytics/sessions/saveSessionData.ts b/src/queries/analytics/sessions/saveSessionData.ts index f9f0276e..64bd1d93 100644 --- a/src/queries/analytics/sessions/saveSessionData.ts +++ b/src/queries/analytics/sessions/saveSessionData.ts @@ -81,7 +81,7 @@ async function clickhouseQuery(data: { const { websiteId, sessionId, sessionData } = data; const { insert, getUTCString } = clickhouse; - const { sendMessages } = kafka; + const { sendMessage } = kafka; const createdAt = getUTCString(); const jsonKeys = flattenJSON(sessionData); @@ -100,7 +100,7 @@ async function clickhouseQuery(data: { }); if (kafka.enabled) { - await sendMessage('session_data', messages); + await sendMessage('session_data', messages); } else { await insert('session_data', messages); }