-
Notifications
You must be signed in to change notification settings - Fork 3
/
RMQPubStream.js
45 lines (39 loc) · 1.52 KB
/
RMQPubStream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
const amqp = require('amqplib');
const RabbitMQLogger = require('./config/RabbitMQLogger');
const validateTopic = require('./validations/validateTopic');
const validateData = require('./validations/validateData');
const config = require('./config/config');
/**
* Sends a message to a RabbitMQ topic after basic validation and logging.
*
* @param {string} topic - the RabbitMQ topic to publish to
* @param {Object} [data={}] - the data to publish to the topic
* @return {Promise<string>} a Promise that resolves with a success message or rejects with an error message
*/
const RMQPubStream = async (topic, data = {}) => {
// basic validation
validateTopic(topic);
validateData(data);
const connection = await amqp.connect(config.AMQP_URL);
const channel = await connection.createChannel();
// eslint-disable-next-line no-async-promise-executor
return new Promise(async (resolve, reject) => {
try {
// setting up loggers
RabbitMQLogger.settopic(topic);
RabbitMQLogger.setLogData(data);
await channel.assertQueue(topic);
channel.sendToQueue(topic, Buffer.from(JSON.stringify(data)));
await channel.close();
connection.close();
RabbitMQLogger.info(topic, data, `Message has been broadcast on ${topic}`);
resolve(`${topic} has been broadcasted`);
} catch (error) {
// setting up loggers
RabbitMQLogger.debug('Error to publish', error);
RabbitMQLogger.error(topic, {}, error.message);
reject(error.message);
}
});
};
module.exports = RMQPubStream;