Skip to content

Commit

Permalink
Refactor channel setup
Browse files Browse the repository at this point in the history
  • Loading branch information
DemianParkhomenko committed Jun 11, 2024
1 parent 3e21e96 commit e7db4dd
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 30 deletions.
34 changes: 17 additions & 17 deletions src/channel.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { MessageBusConfig } from 'Types';
import { error, log } from 'Utils';
import { ConfirmChannel } from 'amqplib';
import { configureMessageBus, configureMessageBusStatic } from './config';
import {
closeMessageBusConnection,
Expand Down Expand Up @@ -34,29 +33,30 @@ const channelPromise = getConnection()
.then((connection) => {
return connection.createChannel({
json: true,
setup: async (channel: ConfirmChannel) => {
log(
`Waiting for AMQP to be initialized by invoking initMessageBus() from 'node-message-bus'.`
);
const config = await _initPromise;

await configureMessageBus(config, channel);

log(
`AMQP initialization is complete with the following config: ${JSON.stringify(
config
)}`
);

postInitPromiseResolve(true);
},
});
})
.catch((e) => {
error(e.stack || e);
throw e;
});

channelPromise.then(async (channelWrapper) => {
log(
`Waiting for AMQP to be initialized by invoking initMessageBus() from 'node-message-bus'.`
);
const config = await _initPromise;

await configureMessageBus(config, channelWrapper);

log(
`AMQP initialization is complete with the following config: ${JSON.stringify(
config
)}`
);

postInitPromiseResolve(true);
});

export const getDefaultChannel = async () => {
const channel = await channelPromise;
await channel.waitForConnect();
Expand Down
36 changes: 27 additions & 9 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export const configureMessageBusStatic = async (
*/
export const configureMessageBus = async (
config: MessageBusConfig,
channel?: ChannelWrapper | ConfirmChannel
channel?: ChannelWrapper
) => {
configureMessageBusStatic(config);

Expand All @@ -85,9 +85,17 @@ export const configureMessageBus = async (
)) {
defaultExchangeConfigured = true;
try {
log(`Asserting exchange "${exchange.name}" of type "${exchange.type}".`);
promises.push(
channel.assertExchange(exchange.name, exchange.type, exchange.options)
channel.addSetup(async (channel: ConfirmChannel) => {
log(
`Asserting exchange "${exchange.name}" of type "${exchange.type}".`
);
await channel.assertExchange(
exchange.name,
exchange.type,
exchange.options
);
})
);
appliedConfig.exchanges.push(exchange);
} catch (e) {
Expand All @@ -102,8 +110,12 @@ export const configureMessageBus = async (
DEFAULT_CONFIG.queues || []
)) {
try {
log(`Asserting queue "${queue.name}".`);
promises.push(channel.assertQueue(queue.name, queue.options));
promises.push(
channel.addSetup(async (channel: ConfirmChannel) => {
log(`Asserting queue "${queue.name}".`);
await channel.assertQueue(queue.name, queue.options);
})
);
appliedConfig.queues.push(queue);
} catch (e) {
const message = `Failed to assert queue "${
Expand All @@ -118,11 +130,17 @@ export const configureMessageBus = async (
)) {
const fromExchange = binding.fromExchange || DEFAULT_EXCHANGE_NAME;
try {
log(
`Asserting the queue binding "${fromExchange}" -> "${binding.toQueue}" by key "${binding.routingKey}".`
);
promises.push(
channel.bindQueue(binding.toQueue, fromExchange, binding.routingKey)
channel.addSetup(async (channel: ConfirmChannel) => {
log(
`Asserting the queue binding "${fromExchange}" -> "${binding.toQueue}" by key "${binding.routingKey}".`
);
await channel.bindQueue(
binding.toQueue,
fromExchange,
binding.routingKey
);
})
);
appliedConfig.bindings.push(binding);
} catch (e) {
Expand Down
6 changes: 2 additions & 4 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
pushToLastRejectedMessages,
} from 'Utils';
import { ChannelWrapper } from 'amqp-connection-manager';
import { ConfirmChannel, ConsumeMessage } from 'amqplib';
import { ConsumeMessage } from 'amqplib';
import { DEFAULT_EXCHANGE_NAME } from './Const';
import { getDefaultChannel } from './channel';
import { configureMessageBus, getMessageBusConfig } from './config';
Expand Down Expand Up @@ -118,6 +118,7 @@ export async function consumeMessages<Message extends IMessage>(
queueName: ConsumeMessagesConfig | string,
handler: MessageHandler<Message>
) {
console.log('📍 consumeMessages', queueName);
let prefetch: number | undefined;
const channel = await getDefaultChannel();
if (typeof queueName !== 'string') {
Expand All @@ -132,9 +133,6 @@ export async function consumeMessages<Message extends IMessage>(
if (prefetchCount) {
prefetch = prefetchCount;
}
channel.addSetup(async (channel: ConfirmChannel) => {
await configureMessageBus(config, channel);
});
}

const queue = getMessageBusConfig().queues.find((q) => q.name === queueName);
Expand Down

0 comments on commit e7db4dd

Please sign in to comment.