Skip to content

Commit

Permalink
Add dynamic queues to setup function in ChannelWrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
DemianParkhomenko committed Jun 11, 2024
1 parent d98e132 commit 3e21e96
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
pushToLastRejectedMessages,
} from 'Utils';
import { ChannelWrapper } from 'amqp-connection-manager';
import { ConsumeMessage } from 'amqplib';
import { ConfirmChannel, ConsumeMessage } from 'amqplib';
import { DEFAULT_EXCHANGE_NAME } from './Const';
import { getDefaultChannel } from './channel';
import { configureMessageBus, getMessageBusConfig } from './config';
Expand Down Expand Up @@ -119,6 +119,7 @@ export async function consumeMessages<Message extends IMessage>(
handler: MessageHandler<Message>
) {
let prefetch: number | undefined;
const channel = await getDefaultChannel();
if (typeof queueName !== 'string') {
if (!queueName.queues || queueName.queues.length !== 1) {
throw new Error(
Expand All @@ -131,9 +132,11 @@ export async function consumeMessages<Message extends IMessage>(
if (prefetchCount) {
prefetch = prefetchCount;
}
channel.addSetup(async (channel: ConfirmChannel) => {
await configureMessageBus(config, channel);
});
}

const channel = await getDefaultChannel();
const queue = getMessageBusConfig().queues.find((q) => q.name === queueName);
const consumerTag = `${
process.env.HOSTNAME || 'no.env.HOSTNAME'
Expand Down

0 comments on commit 3e21e96

Please sign in to comment.