Skip to content

Commit

Permalink
Add sequential consumption
Browse files Browse the repository at this point in the history
  • Loading branch information
nikitaeverywhere committed Aug 18, 2023
1 parent 0375fd1 commit 92b2cbc
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 28 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-message-bus",
"version": "3.2.0",
"version": "3.3.0",
"description": "Minimalistic and complete AMQP message bus implementation",
"main": "lib/index.js",
"files": [
Expand Down
21 changes: 21 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,27 @@ await consumeMessages(
);
```

#### Sequential AMQP message consumption

If you want a worker (instance!) to consume a maximum of one message at a time from the queue, you can limit it
with a `prefetchCount` option for the specific queue (consumer):

```typescript
import { consumeMessages } from 'node-message-bus';

await consumeMessages(
{
queues: ['test-queue-1'],
/* bindings: also specify bindings here if your queue wasn't bound before. */

prefetchCount: 1,
},
async ({ body }) => {
/* ... */
}
);
```

### Using message types

It is recommended to use `node-message-bus` with typed messages, to ensure data integrity during compilation.
Expand Down
2 changes: 1 addition & 1 deletion src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const channelPromise = getConnection()
throw e;
});

export const getChannel = async () => {
export const getDefaultChannel = async () => {
const channel = await channelPromise;
await channel.waitForConnect();
return channel;
Expand Down
2 changes: 1 addition & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export const configureMessageBus = async (

if (!channel) {
// To avoid creating circular dependency.
channel = await (await import('./channel')).getChannel();
channel = await (await import('./channel')).getDefaultChannel();
}

const promises: Array<Promise<any>> = [];
Expand Down
38 changes: 20 additions & 18 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,15 @@ import {
pushToLastRejectedMessages,
} from 'Utils';
import { ChannelWrapper } from 'amqp-connection-manager';
import {
ConsumeMessage,
ConsumeMessageFields,
MessageProperties,
} from 'amqplib';
import { ConsumeMessage } from 'amqplib';
import { DEFAULT_EXCHANGE_NAME } from './Const';
import { getChannel } from './channel';
import { getDefaultChannel } from './channel';
import { configureMessageBus, getMessageBusConfig } from './config';

const EXP_BACKOFF_HEADER_NAME = 'x-backoff-sec';
const EXP_BACKOFF_MULTIPLIER = 4;
const MAX_EXP_BACKOFF = 1000 * 1024;

interface HandlerExtraParams
extends MessageProperties,
ConsumeMessageFields,
IMessage {
failThisMessage: (error?: Error) => Promise<void>;
}

const backoffRetryMessage = async <DataType = any>({
message,
queue,
Expand Down Expand Up @@ -105,30 +94,42 @@ const backoffRetryMessage = async <DataType = any>({
await channel.ack(message);
};

interface ConsumeMessagesExtraConfig {
/** Number of messages prefetched (and hence handled in parallel). */
prefetchCount?: number;
}

type ConsumeMessagesConfig = MessageBusConfig & ConsumeMessagesExtraConfig;

export async function consumeMessages<Message extends IMessage>(
queueName: string,
handler: MessageHandler<Message>
): Promise<void>;
export async function consumeMessages<Message extends IMessage>(
config: MessageBusConfig,
config: ConsumeMessagesConfig,
handler: MessageHandler<Message>
): Promise<void>;

export async function consumeMessages<Message extends IMessage>(
queueName: MessageBusConfig | string,
queueName: ConsumeMessagesConfig | string,
handler: MessageHandler<Message>
) {
let prefetch: number | undefined;
if (typeof queueName !== 'string') {
if (!queueName.queues || queueName.queues.length !== 1) {
throw new Error(
'You need to define exactly one queue in the passed config to consumeMessages()'
);
}
await configureMessageBus(queueName);
const { prefetchCount, ...config } = queueName;
await configureMessageBus(config);
queueName = queueName.queues[0].name;
if (prefetchCount) {
prefetch = prefetchCount;
}
}

const channel = await getChannel();
const channel = await getDefaultChannel();
const queue = getMessageBusConfig().queues.find((q) => q.name === queueName);
const consumerTag = `${
process.env.HOSTNAME || 'no.env.HOSTNAME'
Expand Down Expand Up @@ -202,13 +203,14 @@ export async function consumeMessages<Message extends IMessage>(
},
{
consumerTag,
prefetch,
}
);

log(`✔ Listening messages from queue ${queueName}`);
}

export const messageBusStopAllConsumers = async () => {
const channel = await getChannel();
const channel = await getDefaultChannel();
await channel.cancelAll();
};
4 changes: 2 additions & 2 deletions src/misc.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { getChannel } from './channel';
import { getDefaultChannel } from './channel';
import { getMessageBusConfig } from './config';

/** Purges a single queue. */
export const purgeQueue = async (opts: string | { queueName: string }) => {
const channel = await getChannel();
const channel = await getDefaultChannel();

return await channel.purgeQueue(
typeof opts === 'string' ? opts : opts.queueName
Expand Down
6 changes: 3 additions & 3 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
} from 'Utils';
import { Options } from 'amqplib';
import { DEFAULT_CONFIG, DEFAULT_EXCHANGE_NAME } from './Const';
import { getChannel } from './channel';
import { getDefaultChannel } from './channel';

interface Message extends IMessage {
exchangeName?: string;
Expand All @@ -23,7 +23,7 @@ interface DirectMessage<MessageType extends IMessage> {
export const publishMessage = async <DataType extends IMessage = Message>(
message: Message & DataType
) => {
const channel = await getChannel();
const channel = await getDefaultChannel();
const exchangeName =
message.exchangeName ||
DEFAULT_CONFIG.exchanges?.[0].name ||
Expand Down Expand Up @@ -58,7 +58,7 @@ export const publishMessageToQueue = async <
queueName,
options,
}: DirectMessage<DataType>) => {
const channel = await getChannel();
const channel = await getDefaultChannel();

try {
log(`Publishing message to queue=${queueName}`);
Expand Down
4 changes: 2 additions & 2 deletions src/queues.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { Options } from 'amqplib';
import { getChannel } from './channel';
import { getDefaultChannel } from './channel';

export const deleteQueue = async (
queueName: string,
options?: Options.DeleteQueue
) => {
const channel = await getChannel();
const channel = await getDefaultChannel();
const result = await channel.deleteQueue(queueName, options);

return {
Expand Down
58 changes: 58 additions & 0 deletions tests/specs/cloudamqp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,64 @@ describe('node-message-bus', () => {
expect(consumedMessages[0].recipientUserId).to.be.equal(random);
});

it('sequential consumption', async () => {
const consumedMessages: any[] = [];
await consumeMessages(
{
queues: [{ name: 'the-sequential' }],
bindings: [
{
toQueue: 'the-sequential',
routingKey: 'the.test.sequential',
},
],
prefetchCount: 1,
},
async ({ body }) => {
const job = Math.random().toString(36).slice(2);
const start = Date.now();
console.log(`Job ${job} start at ${start}`);
await new Promise((r) => setTimeout(r, 500));
consumedMessages.push({ start, end: Date.now(), body });
console.log(
`Job ${job} end at ${
consumedMessages[consumedMessages.length - 1].end
}`
);
}
);

const publish = (x: number) =>
publishMessage({
key: 'the.test.sequential',
body: {
x,
},
});
await Promise.all([publish(1), publish(2), publish(3)]);

await new Promise((r) => {
const int = setInterval(() => {
if (consumedMessages.length === 3) {
clearInterval(int);
r(null);
}
}, 25);
});

expect(consumedMessages.map((m) => m.body.x)).to.deep.equal([1, 2, 3]);
consumedMessages
.map((m) => m.end - m.start)
.forEach((v) => expect(v).to.be.greaterThanOrEqual(500));
consumedMessages
.map((m, i, arr) => (i === 0 ? 0 : m.start - arr[i - 1].start))
.slice(1)
.forEach((v) => {
expect(v).to.be.greaterThanOrEqual(500);
expect(v).to.be.lessThanOrEqual(1000);
});
});

describe('errors', () => {
it('uses exponential backoff for failed deliveries', async () => {
let handledTimes: number[] = [];
Expand Down

0 comments on commit 92b2cbc

Please sign in to comment.