diff --git a/src/channel.ts b/src/channel.ts index 1edb2bb..cfa7da2 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -1,6 +1,5 @@ import { MessageBusConfig } from 'Types'; import { error, log } from 'Utils'; -import { ConfirmChannel } from 'amqplib'; import { configureMessageBus, configureMessageBusStatic } from './config'; import { closeMessageBusConnection, @@ -34,22 +33,6 @@ const channelPromise = getConnection() .then((connection) => { return connection.createChannel({ json: true, - setup: async (channel: ConfirmChannel) => { - log( - `Waiting for AMQP to be initialized by invoking initMessageBus() from 'node-message-bus'.` - ); - const config = await _initPromise; - - await configureMessageBus(config, channel); - - log( - `AMQP initialization is complete with the following config: ${JSON.stringify( - config - )}` - ); - - postInitPromiseResolve(true); - }, }); }) .catch((e) => { @@ -57,6 +40,23 @@ const channelPromise = getConnection() throw e; }); +channelPromise.then(async (channelWrapper) => { + log( + `Waiting for AMQP to be initialized by invoking initMessageBus() from 'node-message-bus'.` + ); + const config = await _initPromise; + + await configureMessageBus(config, channelWrapper); + + log( + `AMQP initialization is complete with the following config: ${JSON.stringify( + config + )}` + ); + + postInitPromiseResolve(true); +}); + export const getDefaultChannel = async () => { const channel = await channelPromise; await channel.waitForConnect(); diff --git a/src/config.ts b/src/config.ts index 271e670..e64cd18 100644 --- a/src/config.ts +++ b/src/config.ts @@ -69,7 +69,7 @@ export const configureMessageBusStatic = async ( */ export const configureMessageBus = async ( config: MessageBusConfig, - channel?: ChannelWrapper | ConfirmChannel + channel?: ChannelWrapper ) => { configureMessageBusStatic(config); @@ -85,9 +85,17 @@ export const configureMessageBus = async ( )) { defaultExchangeConfigured = true; try { - log(`Asserting exchange "${exchange.name}" of type "${exchange.type}".`); promises.push( - channel.assertExchange(exchange.name, exchange.type, exchange.options) + channel.addSetup(async (channel: ConfirmChannel) => { + log( + `Asserting exchange "${exchange.name}" of type "${exchange.type}".` + ); + await channel.assertExchange( + exchange.name, + exchange.type, + exchange.options + ); + }) ); appliedConfig.exchanges.push(exchange); } catch (e) { @@ -102,8 +110,12 @@ export const configureMessageBus = async ( DEFAULT_CONFIG.queues || [] )) { try { - log(`Asserting queue "${queue.name}".`); - promises.push(channel.assertQueue(queue.name, queue.options)); + promises.push( + channel.addSetup(async (channel: ConfirmChannel) => { + log(`Asserting queue "${queue.name}".`); + await channel.assertQueue(queue.name, queue.options); + }) + ); appliedConfig.queues.push(queue); } catch (e) { const message = `Failed to assert queue "${ @@ -118,11 +130,17 @@ export const configureMessageBus = async ( )) { const fromExchange = binding.fromExchange || DEFAULT_EXCHANGE_NAME; try { - log( - `Asserting the queue binding "${fromExchange}" -> "${binding.toQueue}" by key "${binding.routingKey}".` - ); promises.push( - channel.bindQueue(binding.toQueue, fromExchange, binding.routingKey) + channel.addSetup(async (channel: ConfirmChannel) => { + log( + `Asserting the queue binding "${fromExchange}" -> "${binding.toQueue}" by key "${binding.routingKey}".` + ); + await channel.bindQueue( + binding.toQueue, + fromExchange, + binding.routingKey + ); + }) ); appliedConfig.bindings.push(binding); } catch (e) { diff --git a/src/consumer.ts b/src/consumer.ts index 25a406f..2498712 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -13,7 +13,7 @@ import { pushToLastRejectedMessages, } from 'Utils'; import { ChannelWrapper } from 'amqp-connection-manager'; -import { ConfirmChannel, ConsumeMessage } from 'amqplib'; +import { ConsumeMessage } from 'amqplib'; import { DEFAULT_EXCHANGE_NAME } from './Const'; import { getDefaultChannel } from './channel'; import { configureMessageBus, getMessageBusConfig } from './config'; @@ -118,6 +118,7 @@ export async function consumeMessages( queueName: ConsumeMessagesConfig | string, handler: MessageHandler ) { + console.log('📍 consumeMessages', queueName); let prefetch: number | undefined; const channel = await getDefaultChannel(); if (typeof queueName !== 'string') { @@ -132,9 +133,6 @@ export async function consumeMessages( if (prefetchCount) { prefetch = prefetchCount; } - channel.addSetup(async (channel: ConfirmChannel) => { - await configureMessageBus(config, channel); - }); } const queue = getMessageBusConfig().queues.find((q) => q.name === queueName);