Skip to content

Commit

Permalink
Add dynamic queues to setup function in ChannelWrapper (#2)
Browse files Browse the repository at this point in the history
* Add dynamic queues to setup function in `ChannelWrapper`

* Refactor channel setup

* Fix the publish workflow to run on pull requests

* Bump version to 3.3.5

* Revert `consumer.ts` file
  • Loading branch information
DemianParkhomenko authored Jun 11, 2024
1 parent d98e132 commit 5353727
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 31 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ name: Publish NPM package

on:
push:
branches:
- main
branches: [main]
pull_request:
branches: [main]

env:
CLOUDAMQP_API_KEY: ${{ secrets.CLOUDAMQP_API_KEY }}
Expand All @@ -21,5 +22,6 @@ jobs:
- run: npm run build
- run: npm test
- uses: JS-DevTools/npm-publish@v1
if: ${{ github.ref == 'refs/heads/main' }}
with:
token: ${{ env.NPM_TOKEN }}
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-message-bus",
"version": "3.3.4",
"version": "3.3.5",
"description": "Minimalistic and complete AMQP message bus implementation",
"main": "lib/index.js",
"files": [
Expand Down
34 changes: 17 additions & 17 deletions src/channel.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { MessageBusConfig } from 'Types';
import { error, log } from 'Utils';
import { ConfirmChannel } from 'amqplib';
import { configureMessageBus, configureMessageBusStatic } from './config';
import {
closeMessageBusConnection,
Expand Down Expand Up @@ -34,29 +33,30 @@ const channelPromise = getConnection()
.then((connection) => {
return connection.createChannel({
json: true,
setup: async (channel: ConfirmChannel) => {
log(
`Waiting for AMQP to be initialized by invoking initMessageBus() from 'node-message-bus'.`
);
const config = await _initPromise;

await configureMessageBus(config, channel);

log(
`AMQP initialization is complete with the following config: ${JSON.stringify(
config
)}`
);

postInitPromiseResolve(true);
},
});
})
.catch((e) => {
error(e.stack || e);
throw e;
});

channelPromise.then(async (channelWrapper) => {
log(
`Waiting for AMQP to be initialized by invoking initMessageBus() from 'node-message-bus'.`
);
const config = await _initPromise;

await configureMessageBus(config, channelWrapper);

log(
`AMQP initialization is complete with the following config: ${JSON.stringify(
config
)}`
);

postInitPromiseResolve(true);
});

export const getDefaultChannel = async () => {
const channel = await channelPromise;
await channel.waitForConnect();
Expand Down
36 changes: 27 additions & 9 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export const configureMessageBusStatic = async (
*/
export const configureMessageBus = async (
config: MessageBusConfig,
channel?: ChannelWrapper | ConfirmChannel
channel?: ChannelWrapper
) => {
configureMessageBusStatic(config);

Expand All @@ -85,9 +85,17 @@ export const configureMessageBus = async (
)) {
defaultExchangeConfigured = true;
try {
log(`Asserting exchange "${exchange.name}" of type "${exchange.type}".`);
promises.push(
channel.assertExchange(exchange.name, exchange.type, exchange.options)
channel.addSetup(async (channel: ConfirmChannel) => {
log(
`Asserting exchange "${exchange.name}" of type "${exchange.type}".`
);
await channel.assertExchange(
exchange.name,
exchange.type,
exchange.options
);
})
);
appliedConfig.exchanges.push(exchange);
} catch (e) {
Expand All @@ -102,8 +110,12 @@ export const configureMessageBus = async (
DEFAULT_CONFIG.queues || []
)) {
try {
log(`Asserting queue "${queue.name}".`);
promises.push(channel.assertQueue(queue.name, queue.options));
promises.push(
channel.addSetup(async (channel: ConfirmChannel) => {
log(`Asserting queue "${queue.name}".`);
await channel.assertQueue(queue.name, queue.options);
})
);
appliedConfig.queues.push(queue);
} catch (e) {
const message = `Failed to assert queue "${
Expand All @@ -118,11 +130,17 @@ export const configureMessageBus = async (
)) {
const fromExchange = binding.fromExchange || DEFAULT_EXCHANGE_NAME;
try {
log(
`Asserting the queue binding "${fromExchange}" -> "${binding.toQueue}" by key "${binding.routingKey}".`
);
promises.push(
channel.bindQueue(binding.toQueue, fromExchange, binding.routingKey)
channel.addSetup(async (channel: ConfirmChannel) => {
log(
`Asserting the queue binding "${fromExchange}" -> "${binding.toQueue}" by key "${binding.routingKey}".`
);
await channel.bindQueue(
binding.toQueue,
fromExchange,
binding.routingKey
);
})
);
appliedConfig.bindings.push(binding);
} catch (e) {
Expand Down

0 comments on commit 5353727

Please sign in to comment.