From 5353727d9527c0df7fa851da14c4bb3556e0e74e Mon Sep 17 00:00:00 2001 From: Demian Parkhomenko <95881717+DemianParkhomenko@users.noreply.github.com> Date: Tue, 11 Jun 2024 20:26:39 +0300 Subject: [PATCH] Add dynamic queues to setup function in `ChannelWrapper` (#2) * Add dynamic queues to setup function in `ChannelWrapper` * Refactor channel setup * Fix the publish workflow to run on pull requests * Bump version to 3.3.5 * Revert `consumer.ts` file --- .github/workflows/publish.yaml | 6 ++++-- package-lock.json | 4 ++-- package.json | 2 +- src/channel.ts | 34 ++++++++++++++++---------------- src/config.ts | 36 +++++++++++++++++++++++++--------- 5 files changed, 51 insertions(+), 31 deletions(-) diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index 5f163c1..ddccc7e 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -2,8 +2,9 @@ name: Publish NPM package on: push: - branches: - - main + branches: [main] + pull_request: + branches: [main] env: CLOUDAMQP_API_KEY: ${{ secrets.CLOUDAMQP_API_KEY }} @@ -21,5 +22,6 @@ jobs: - run: npm run build - run: npm test - uses: JS-DevTools/npm-publish@v1 + if: ${{ github.ref == 'refs/heads/main' }} with: token: ${{ env.NPM_TOKEN }} diff --git a/package-lock.json b/package-lock.json index 7f4037d..53ce98c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "node-message-bus", - "version": "3.3.3", + "version": "3.3.4", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "node-message-bus", - "version": "3.3.3", + "version": "3.3.4", "license": "MIT", "dependencies": { "@types/amqplib": "^0.10.5", diff --git a/package.json b/package.json index a71d838..37cbc9a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "node-message-bus", - "version": "3.3.4", + "version": "3.3.5", "description": "Minimalistic and complete AMQP message bus implementation", "main": "lib/index.js", "files": [ diff --git a/src/channel.ts b/src/channel.ts index 1edb2bb..cfa7da2 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -1,6 +1,5 @@ import { MessageBusConfig } from 'Types'; import { error, log } from 'Utils'; -import { ConfirmChannel } from 'amqplib'; import { configureMessageBus, configureMessageBusStatic } from './config'; import { closeMessageBusConnection, @@ -34,22 +33,6 @@ const channelPromise = getConnection() .then((connection) => { return connection.createChannel({ json: true, - setup: async (channel: ConfirmChannel) => { - log( - `Waiting for AMQP to be initialized by invoking initMessageBus() from 'node-message-bus'.` - ); - const config = await _initPromise; - - await configureMessageBus(config, channel); - - log( - `AMQP initialization is complete with the following config: ${JSON.stringify( - config - )}` - ); - - postInitPromiseResolve(true); - }, }); }) .catch((e) => { @@ -57,6 +40,23 @@ const channelPromise = getConnection() throw e; }); +channelPromise.then(async (channelWrapper) => { + log( + `Waiting for AMQP to be initialized by invoking initMessageBus() from 'node-message-bus'.` + ); + const config = await _initPromise; + + await configureMessageBus(config, channelWrapper); + + log( + `AMQP initialization is complete with the following config: ${JSON.stringify( + config + )}` + ); + + postInitPromiseResolve(true); +}); + export const getDefaultChannel = async () => { const channel = await channelPromise; await channel.waitForConnect(); diff --git a/src/config.ts b/src/config.ts index 271e670..e64cd18 100644 --- a/src/config.ts +++ b/src/config.ts @@ -69,7 +69,7 @@ export const configureMessageBusStatic = async ( */ export const configureMessageBus = async ( config: MessageBusConfig, - channel?: ChannelWrapper | ConfirmChannel + channel?: ChannelWrapper ) => { configureMessageBusStatic(config); @@ -85,9 +85,17 @@ export const configureMessageBus = async ( )) { defaultExchangeConfigured = true; try { - log(`Asserting exchange "${exchange.name}" of type "${exchange.type}".`); promises.push( - channel.assertExchange(exchange.name, exchange.type, exchange.options) + channel.addSetup(async (channel: ConfirmChannel) => { + log( + `Asserting exchange "${exchange.name}" of type "${exchange.type}".` + ); + await channel.assertExchange( + exchange.name, + exchange.type, + exchange.options + ); + }) ); appliedConfig.exchanges.push(exchange); } catch (e) { @@ -102,8 +110,12 @@ export const configureMessageBus = async ( DEFAULT_CONFIG.queues || [] )) { try { - log(`Asserting queue "${queue.name}".`); - promises.push(channel.assertQueue(queue.name, queue.options)); + promises.push( + channel.addSetup(async (channel: ConfirmChannel) => { + log(`Asserting queue "${queue.name}".`); + await channel.assertQueue(queue.name, queue.options); + }) + ); appliedConfig.queues.push(queue); } catch (e) { const message = `Failed to assert queue "${ @@ -118,11 +130,17 @@ export const configureMessageBus = async ( )) { const fromExchange = binding.fromExchange || DEFAULT_EXCHANGE_NAME; try { - log( - `Asserting the queue binding "${fromExchange}" -> "${binding.toQueue}" by key "${binding.routingKey}".` - ); promises.push( - channel.bindQueue(binding.toQueue, fromExchange, binding.routingKey) + channel.addSetup(async (channel: ConfirmChannel) => { + log( + `Asserting the queue binding "${fromExchange}" -> "${binding.toQueue}" by key "${binding.routingKey}".` + ); + await channel.bindQueue( + binding.toQueue, + fromExchange, + binding.routingKey + ); + }) ); appliedConfig.bindings.push(binding); } catch (e) {