Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dynamic queues to setup function in ChannelWrapper #2

Merged
merged 5 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ name: Publish NPM package

on:
push:
branches:
- main
branches: [main]
pull_request:
branches: [main]

env:
CLOUDAMQP_API_KEY: ${{ secrets.CLOUDAMQP_API_KEY }}
Expand All @@ -21,5 +22,6 @@ jobs:
- run: npm run build
- run: npm test
- uses: JS-DevTools/npm-publish@v1
if: ${{ github.ref == 'refs/heads/main' }}
with:
token: ${{ env.NPM_TOKEN }}
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 17 additions & 17 deletions src/channel.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { MessageBusConfig } from 'Types';
import { error, log } from 'Utils';
import { ConfirmChannel } from 'amqplib';
import { configureMessageBus, configureMessageBusStatic } from './config';
import {
closeMessageBusConnection,
Expand Down Expand Up @@ -34,29 +33,30 @@ const channelPromise = getConnection()
.then((connection) => {
return connection.createChannel({
json: true,
setup: async (channel: ConfirmChannel) => {
log(
`Waiting for AMQP to be initialized by invoking initMessageBus() from 'node-message-bus'.`
);
const config = await _initPromise;

await configureMessageBus(config, channel);

log(
`AMQP initialization is complete with the following config: ${JSON.stringify(
config
)}`
);

postInitPromiseResolve(true);
},
});
})
.catch((e) => {
error(e.stack || e);
throw e;
});

channelPromise.then(async (channelWrapper) => {
log(
`Waiting for AMQP to be initialized by invoking initMessageBus() from 'node-message-bus'.`
);
const config = await _initPromise;

await configureMessageBus(config, channelWrapper);

log(
`AMQP initialization is complete with the following config: ${JSON.stringify(
config
)}`
);

postInitPromiseResolve(true);
});

export const getDefaultChannel = async () => {
const channel = await channelPromise;
await channel.waitForConnect();
Expand Down
36 changes: 27 additions & 9 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export const configureMessageBusStatic = async (
*/
export const configureMessageBus = async (
config: MessageBusConfig,
channel?: ChannelWrapper | ConfirmChannel
channel?: ChannelWrapper
) => {
configureMessageBusStatic(config);

Expand All @@ -85,9 +85,17 @@ export const configureMessageBus = async (
)) {
defaultExchangeConfigured = true;
try {
log(`Asserting exchange "${exchange.name}" of type "${exchange.type}".`);
promises.push(
channel.assertExchange(exchange.name, exchange.type, exchange.options)
channel.addSetup(async (channel: ConfirmChannel) => {
log(
`Asserting exchange "${exchange.name}" of type "${exchange.type}".`
);
await channel.assertExchange(
exchange.name,
exchange.type,
exchange.options
);
})
);
appliedConfig.exchanges.push(exchange);
} catch (e) {
Expand All @@ -102,8 +110,12 @@ export const configureMessageBus = async (
DEFAULT_CONFIG.queues || []
)) {
try {
log(`Asserting queue "${queue.name}".`);
promises.push(channel.assertQueue(queue.name, queue.options));
promises.push(
channel.addSetup(async (channel: ConfirmChannel) => {
log(`Asserting queue "${queue.name}".`);
await channel.assertQueue(queue.name, queue.options);
})
);
appliedConfig.queues.push(queue);
} catch (e) {
const message = `Failed to assert queue "${
Expand All @@ -118,11 +130,17 @@ export const configureMessageBus = async (
)) {
const fromExchange = binding.fromExchange || DEFAULT_EXCHANGE_NAME;
try {
log(
`Asserting the queue binding "${fromExchange}" -> "${binding.toQueue}" by key "${binding.routingKey}".`
);
promises.push(
channel.bindQueue(binding.toQueue, fromExchange, binding.routingKey)
channel.addSetup(async (channel: ConfirmChannel) => {
log(
`Asserting the queue binding "${fromExchange}" -> "${binding.toQueue}" by key "${binding.routingKey}".`
);
await channel.bindQueue(
binding.toQueue,
fromExchange,
binding.routingKey
);
})
);
appliedConfig.bindings.push(binding);
} catch (e) {
Expand Down
3 changes: 2 additions & 1 deletion src/consumer.ts
DemianParkhomenko marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ export async function consumeMessages<Message extends IMessage>(
queueName: ConsumeMessagesConfig | string,
handler: MessageHandler<Message>
) {
console.log('📍 consumeMessages', queueName);
let prefetch: number | undefined;
const channel = await getDefaultChannel();
if (typeof queueName !== 'string') {
if (!queueName.queues || queueName.queues.length !== 1) {
throw new Error(
Expand All @@ -133,7 +135,6 @@ export async function consumeMessages<Message extends IMessage>(
}
}

const channel = await getDefaultChannel();
const queue = getMessageBusConfig().queues.find((q) => q.name === queueName);
const consumerTag = `${
process.env.HOSTNAME || 'no.env.HOSTNAME'
Expand Down
Loading