From 3e21e96e13d8f235eba1d68dd3eb7c15dcb72e6f Mon Sep 17 00:00:00 2001 From: Demian Parkhomenko <95881717+DemianParkhomenko@users.noreply.github.com> Date: Tue, 11 Jun 2024 18:24:14 +0300 Subject: [PATCH 1/5] Add dynamic queues to setup function in `ChannelWrapper` --- package-lock.json | 4 ++-- src/consumer.ts | 7 +++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/package-lock.json b/package-lock.json index 7f4037d..53ce98c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "node-message-bus", - "version": "3.3.3", + "version": "3.3.4", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "node-message-bus", - "version": "3.3.3", + "version": "3.3.4", "license": "MIT", "dependencies": { "@types/amqplib": "^0.10.5", diff --git a/src/consumer.ts b/src/consumer.ts index a508f20..25a406f 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -13,7 +13,7 @@ import { pushToLastRejectedMessages, } from 'Utils'; import { ChannelWrapper } from 'amqp-connection-manager'; -import { ConsumeMessage } from 'amqplib'; +import { ConfirmChannel, ConsumeMessage } from 'amqplib'; import { DEFAULT_EXCHANGE_NAME } from './Const'; import { getDefaultChannel } from './channel'; import { configureMessageBus, getMessageBusConfig } from './config'; @@ -119,6 +119,7 @@ export async function consumeMessages( handler: MessageHandler ) { let prefetch: number | undefined; + const channel = await getDefaultChannel(); if (typeof queueName !== 'string') { if (!queueName.queues || queueName.queues.length !== 1) { throw new Error( @@ -131,9 +132,11 @@ export async function consumeMessages( if (prefetchCount) { prefetch = prefetchCount; } + channel.addSetup(async (channel: ConfirmChannel) => { + await configureMessageBus(config, channel); + }); } - const channel = await getDefaultChannel(); const queue = getMessageBusConfig().queues.find((q) => q.name === queueName); const consumerTag = `${ process.env.HOSTNAME || 'no.env.HOSTNAME' From e7db4dd43f2316796feb46508e06edc91113dd6c Mon Sep 17 00:00:00 2001 From: Demian Parkhomenko <95881717+DemianParkhomenko@users.noreply.github.com> Date: Tue, 11 Jun 2024 20:02:05 +0300 Subject: [PATCH 2/5] Refactor channel setup --- src/channel.ts | 34 +++++++++++++++++----------------- src/config.ts | 36 +++++++++++++++++++++++++++--------- src/consumer.ts | 6 ++---- 3 files changed, 46 insertions(+), 30 deletions(-) diff --git a/src/channel.ts b/src/channel.ts index 1edb2bb..cfa7da2 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -1,6 +1,5 @@ import { MessageBusConfig } from 'Types'; import { error, log } from 'Utils'; -import { ConfirmChannel } from 'amqplib'; import { configureMessageBus, configureMessageBusStatic } from './config'; import { closeMessageBusConnection, @@ -34,22 +33,6 @@ const channelPromise = getConnection() .then((connection) => { return connection.createChannel({ json: true, - setup: async (channel: ConfirmChannel) => { - log( - `Waiting for AMQP to be initialized by invoking initMessageBus() from 'node-message-bus'.` - ); - const config = await _initPromise; - - await configureMessageBus(config, channel); - - log( - `AMQP initialization is complete with the following config: ${JSON.stringify( - config - )}` - ); - - postInitPromiseResolve(true); - }, }); }) .catch((e) => { @@ -57,6 +40,23 @@ const channelPromise = getConnection() throw e; }); +channelPromise.then(async (channelWrapper) => { + log( + `Waiting for AMQP to be initialized by invoking initMessageBus() from 'node-message-bus'.` + ); + const config = await _initPromise; + + await configureMessageBus(config, channelWrapper); + + log( + `AMQP initialization is complete with the following config: ${JSON.stringify( + config + )}` + ); + + postInitPromiseResolve(true); +}); + export const getDefaultChannel = async () => { const channel = await channelPromise; await channel.waitForConnect(); diff --git a/src/config.ts b/src/config.ts index 271e670..e64cd18 100644 --- a/src/config.ts +++ b/src/config.ts @@ -69,7 +69,7 @@ export const configureMessageBusStatic = async ( */ export const configureMessageBus = async ( config: MessageBusConfig, - channel?: ChannelWrapper | ConfirmChannel + channel?: ChannelWrapper ) => { configureMessageBusStatic(config); @@ -85,9 +85,17 @@ export const configureMessageBus = async ( )) { defaultExchangeConfigured = true; try { - log(`Asserting exchange "${exchange.name}" of type "${exchange.type}".`); promises.push( - channel.assertExchange(exchange.name, exchange.type, exchange.options) + channel.addSetup(async (channel: ConfirmChannel) => { + log( + `Asserting exchange "${exchange.name}" of type "${exchange.type}".` + ); + await channel.assertExchange( + exchange.name, + exchange.type, + exchange.options + ); + }) ); appliedConfig.exchanges.push(exchange); } catch (e) { @@ -102,8 +110,12 @@ export const configureMessageBus = async ( DEFAULT_CONFIG.queues || [] )) { try { - log(`Asserting queue "${queue.name}".`); - promises.push(channel.assertQueue(queue.name, queue.options)); + promises.push( + channel.addSetup(async (channel: ConfirmChannel) => { + log(`Asserting queue "${queue.name}".`); + await channel.assertQueue(queue.name, queue.options); + }) + ); appliedConfig.queues.push(queue); } catch (e) { const message = `Failed to assert queue "${ @@ -118,11 +130,17 @@ export const configureMessageBus = async ( )) { const fromExchange = binding.fromExchange || DEFAULT_EXCHANGE_NAME; try { - log( - `Asserting the queue binding "${fromExchange}" -> "${binding.toQueue}" by key "${binding.routingKey}".` - ); promises.push( - channel.bindQueue(binding.toQueue, fromExchange, binding.routingKey) + channel.addSetup(async (channel: ConfirmChannel) => { + log( + `Asserting the queue binding "${fromExchange}" -> "${binding.toQueue}" by key "${binding.routingKey}".` + ); + await channel.bindQueue( + binding.toQueue, + fromExchange, + binding.routingKey + ); + }) ); appliedConfig.bindings.push(binding); } catch (e) { diff --git a/src/consumer.ts b/src/consumer.ts index 25a406f..2498712 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -13,7 +13,7 @@ import { pushToLastRejectedMessages, } from 'Utils'; import { ChannelWrapper } from 'amqp-connection-manager'; -import { ConfirmChannel, ConsumeMessage } from 'amqplib'; +import { ConsumeMessage } from 'amqplib'; import { DEFAULT_EXCHANGE_NAME } from './Const'; import { getDefaultChannel } from './channel'; import { configureMessageBus, getMessageBusConfig } from './config'; @@ -118,6 +118,7 @@ export async function consumeMessages( queueName: ConsumeMessagesConfig | string, handler: MessageHandler ) { + console.log('📍 consumeMessages', queueName); let prefetch: number | undefined; const channel = await getDefaultChannel(); if (typeof queueName !== 'string') { @@ -132,9 +133,6 @@ export async function consumeMessages( if (prefetchCount) { prefetch = prefetchCount; } - channel.addSetup(async (channel: ConfirmChannel) => { - await configureMessageBus(config, channel); - }); } const queue = getMessageBusConfig().queues.find((q) => q.name === queueName); From 2f3308481f0e93c6b780c087a7fe19b01ae968d2 Mon Sep 17 00:00:00 2001 From: Demian Parkhomenko <95881717+DemianParkhomenko@users.noreply.github.com> Date: Tue, 11 Jun 2024 20:08:40 +0300 Subject: [PATCH 3/5] Fix the publish workflow to run on pull requests --- .github/workflows/publish.yaml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index 5f163c1..ddccc7e 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -2,8 +2,9 @@ name: Publish NPM package on: push: - branches: - - main + branches: [main] + pull_request: + branches: [main] env: CLOUDAMQP_API_KEY: ${{ secrets.CLOUDAMQP_API_KEY }} @@ -21,5 +22,6 @@ jobs: - run: npm run build - run: npm test - uses: JS-DevTools/npm-publish@v1 + if: ${{ github.ref == 'refs/heads/main' }} with: token: ${{ env.NPM_TOKEN }} From 9f9c133b25b5d5505e0905c129af08a3e2f02c15 Mon Sep 17 00:00:00 2001 From: Demian Parkhomenko <95881717+DemianParkhomenko@users.noreply.github.com> Date: Tue, 11 Jun 2024 20:12:30 +0300 Subject: [PATCH 4/5] Bump version to 3.3.5 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index a71d838..37cbc9a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "node-message-bus", - "version": "3.3.4", + "version": "3.3.5", "description": "Minimalistic and complete AMQP message bus implementation", "main": "lib/index.js", "files": [ From 92e002797dbf9ef5e067e17337ea63f52dfee553 Mon Sep 17 00:00:00 2001 From: Demian Parkhomenko <95881717+DemianParkhomenko@users.noreply.github.com> Date: Tue, 11 Jun 2024 20:15:04 +0300 Subject: [PATCH 5/5] Revert `consumer.ts` file --- src/consumer.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/consumer.ts b/src/consumer.ts index 2498712..a508f20 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -118,9 +118,7 @@ export async function consumeMessages( queueName: ConsumeMessagesConfig | string, handler: MessageHandler ) { - console.log('📍 consumeMessages', queueName); let prefetch: number | undefined; - const channel = await getDefaultChannel(); if (typeof queueName !== 'string') { if (!queueName.queues || queueName.queues.length !== 1) { throw new Error( @@ -135,6 +133,7 @@ export async function consumeMessages( } } + const channel = await getDefaultChannel(); const queue = getMessageBusConfig().queues.find((q) => q.name === queueName); const consumerTag = `${ process.env.HOSTNAME || 'no.env.HOSTNAME'