From 92b2cbc35b96af24a21d9c5886d605449b8f5de8 Mon Sep 17 00:00:00 2001 From: Nikita Savchenko Date: Fri, 18 Aug 2023 15:42:31 +0200 Subject: [PATCH] Add sequential consumption --- package.json | 2 +- readme.md | 21 +++++++++++++ src/channel.ts | 2 +- src/config.ts | 2 +- src/consumer.ts | 38 ++++++++++++----------- src/misc.ts | 4 +-- src/publisher.ts | 6 ++-- src/queues.ts | 4 +-- tests/specs/cloudamqp.test.ts | 58 +++++++++++++++++++++++++++++++++++ 9 files changed, 109 insertions(+), 28 deletions(-) diff --git a/package.json b/package.json index 5fcd35d..4af30db 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "node-message-bus", - "version": "3.2.0", + "version": "3.3.0", "description": "Minimalistic and complete AMQP message bus implementation", "main": "lib/index.js", "files": [ diff --git a/readme.md b/readme.md index 10d7a28..9659bf8 100644 --- a/readme.md +++ b/readme.md @@ -319,6 +319,27 @@ await consumeMessages( ); ``` +#### Sequential AMQP message consumption + +If you want a worker (instance!) to consume a maximum of one message at a time from the queue, you can limit it +with a `prefetchCount` option for the specific queue (consumer): + +```typescript +import { consumeMessages } from 'node-message-bus'; + +await consumeMessages( + { + queues: ['test-queue-1'], + /* bindings: also specify bindings here if your queue wasn't bound before. */ + + prefetchCount: 1, + }, + async ({ body }) => { + /* ... */ + } +); +``` + ### Using message types It is recommended to use `node-message-bus` with typed messages, to ensure data integrity during compilation. diff --git a/src/channel.ts b/src/channel.ts index cfe9444..1edb2bb 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -57,7 +57,7 @@ const channelPromise = getConnection() throw e; }); -export const getChannel = async () => { +export const getDefaultChannel = async () => { const channel = await channelPromise; await channel.waitForConnect(); return channel; diff --git a/src/config.ts b/src/config.ts index 19efb18..271e670 100644 --- a/src/config.ts +++ b/src/config.ts @@ -75,7 +75,7 @@ export const configureMessageBus = async ( if (!channel) { // To avoid creating circular dependency. - channel = await (await import('./channel')).getChannel(); + channel = await (await import('./channel')).getDefaultChannel(); } const promises: Array> = []; diff --git a/src/consumer.ts b/src/consumer.ts index 604a473..7b55a0e 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -13,26 +13,15 @@ import { pushToLastRejectedMessages, } from 'Utils'; import { ChannelWrapper } from 'amqp-connection-manager'; -import { - ConsumeMessage, - ConsumeMessageFields, - MessageProperties, -} from 'amqplib'; +import { ConsumeMessage } from 'amqplib'; import { DEFAULT_EXCHANGE_NAME } from './Const'; -import { getChannel } from './channel'; +import { getDefaultChannel } from './channel'; import { configureMessageBus, getMessageBusConfig } from './config'; const EXP_BACKOFF_HEADER_NAME = 'x-backoff-sec'; const EXP_BACKOFF_MULTIPLIER = 4; const MAX_EXP_BACKOFF = 1000 * 1024; -interface HandlerExtraParams - extends MessageProperties, - ConsumeMessageFields, - IMessage { - failThisMessage: (error?: Error) => Promise; -} - const backoffRetryMessage = async ({ message, queue, @@ -105,30 +94,42 @@ const backoffRetryMessage = async ({ await channel.ack(message); }; +interface ConsumeMessagesExtraConfig { + /** Number of messages prefetched (and hence handled in parallel). */ + prefetchCount?: number; +} + +type ConsumeMessagesConfig = MessageBusConfig & ConsumeMessagesExtraConfig; + export async function consumeMessages( queueName: string, handler: MessageHandler ): Promise; export async function consumeMessages( - config: MessageBusConfig, + config: ConsumeMessagesConfig, handler: MessageHandler ): Promise; export async function consumeMessages( - queueName: MessageBusConfig | string, + queueName: ConsumeMessagesConfig | string, handler: MessageHandler ) { + let prefetch: number | undefined; if (typeof queueName !== 'string') { if (!queueName.queues || queueName.queues.length !== 1) { throw new Error( 'You need to define exactly one queue in the passed config to consumeMessages()' ); } - await configureMessageBus(queueName); + const { prefetchCount, ...config } = queueName; + await configureMessageBus(config); queueName = queueName.queues[0].name; + if (prefetchCount) { + prefetch = prefetchCount; + } } - const channel = await getChannel(); + const channel = await getDefaultChannel(); const queue = getMessageBusConfig().queues.find((q) => q.name === queueName); const consumerTag = `${ process.env.HOSTNAME || 'no.env.HOSTNAME' @@ -202,6 +203,7 @@ export async function consumeMessages( }, { consumerTag, + prefetch, } ); @@ -209,6 +211,6 @@ export async function consumeMessages( } export const messageBusStopAllConsumers = async () => { - const channel = await getChannel(); + const channel = await getDefaultChannel(); await channel.cancelAll(); }; diff --git a/src/misc.ts b/src/misc.ts index 28563f9..67372b0 100644 --- a/src/misc.ts +++ b/src/misc.ts @@ -1,9 +1,9 @@ -import { getChannel } from './channel'; +import { getDefaultChannel } from './channel'; import { getMessageBusConfig } from './config'; /** Purges a single queue. */ export const purgeQueue = async (opts: string | { queueName: string }) => { - const channel = await getChannel(); + const channel = await getDefaultChannel(); return await channel.purgeQueue( typeof opts === 'string' ? opts : opts.queueName diff --git a/src/publisher.ts b/src/publisher.ts index ed11035..426f4c4 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -7,7 +7,7 @@ import { } from 'Utils'; import { Options } from 'amqplib'; import { DEFAULT_CONFIG, DEFAULT_EXCHANGE_NAME } from './Const'; -import { getChannel } from './channel'; +import { getDefaultChannel } from './channel'; interface Message extends IMessage { exchangeName?: string; @@ -23,7 +23,7 @@ interface DirectMessage { export const publishMessage = async ( message: Message & DataType ) => { - const channel = await getChannel(); + const channel = await getDefaultChannel(); const exchangeName = message.exchangeName || DEFAULT_CONFIG.exchanges?.[0].name || @@ -58,7 +58,7 @@ export const publishMessageToQueue = async < queueName, options, }: DirectMessage) => { - const channel = await getChannel(); + const channel = await getDefaultChannel(); try { log(`Publishing message to queue=${queueName}`); diff --git a/src/queues.ts b/src/queues.ts index f519969..cdfacde 100644 --- a/src/queues.ts +++ b/src/queues.ts @@ -1,11 +1,11 @@ import { Options } from 'amqplib'; -import { getChannel } from './channel'; +import { getDefaultChannel } from './channel'; export const deleteQueue = async ( queueName: string, options?: Options.DeleteQueue ) => { - const channel = await getChannel(); + const channel = await getDefaultChannel(); const result = await channel.deleteQueue(queueName, options); return { diff --git a/tests/specs/cloudamqp.test.ts b/tests/specs/cloudamqp.test.ts index fc7ee16..f27d3ad 100644 --- a/tests/specs/cloudamqp.test.ts +++ b/tests/specs/cloudamqp.test.ts @@ -326,6 +326,64 @@ describe('node-message-bus', () => { expect(consumedMessages[0].recipientUserId).to.be.equal(random); }); + it('sequential consumption', async () => { + const consumedMessages: any[] = []; + await consumeMessages( + { + queues: [{ name: 'the-sequential' }], + bindings: [ + { + toQueue: 'the-sequential', + routingKey: 'the.test.sequential', + }, + ], + prefetchCount: 1, + }, + async ({ body }) => { + const job = Math.random().toString(36).slice(2); + const start = Date.now(); + console.log(`Job ${job} start at ${start}`); + await new Promise((r) => setTimeout(r, 500)); + consumedMessages.push({ start, end: Date.now(), body }); + console.log( + `Job ${job} end at ${ + consumedMessages[consumedMessages.length - 1].end + }` + ); + } + ); + + const publish = (x: number) => + publishMessage({ + key: 'the.test.sequential', + body: { + x, + }, + }); + await Promise.all([publish(1), publish(2), publish(3)]); + + await new Promise((r) => { + const int = setInterval(() => { + if (consumedMessages.length === 3) { + clearInterval(int); + r(null); + } + }, 25); + }); + + expect(consumedMessages.map((m) => m.body.x)).to.deep.equal([1, 2, 3]); + consumedMessages + .map((m) => m.end - m.start) + .forEach((v) => expect(v).to.be.greaterThanOrEqual(500)); + consumedMessages + .map((m, i, arr) => (i === 0 ? 0 : m.start - arr[i - 1].start)) + .slice(1) + .forEach((v) => { + expect(v).to.be.greaterThanOrEqual(500); + expect(v).to.be.lessThanOrEqual(1000); + }); + }); + describe('errors', () => { it('uses exponential backoff for failed deliveries', async () => { let handledTimes: number[] = [];