Skip to content

Commit

Permalink
Fix backoff queue message consuming
Browse files Browse the repository at this point in the history
  • Loading branch information
nikitaeverywhere committed Feb 2, 2024
1 parent 9d42c8c commit 011581e
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 6 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-message-bus",
"version": "3.3.2",
"version": "3.3.3",
"description": "Minimalistic and complete AMQP message bus implementation",
"main": "lib/index.js",
"files": [
Expand Down
21 changes: 18 additions & 3 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import { DEFAULT_EXCHANGE_NAME } from './Const';
import { getDefaultChannel } from './channel';
import { configureMessageBus, getMessageBusConfig } from './config';

const EXP_BACKOFF_HEADER_NAME = 'x-backoff-sec';
/* Marks the number of the delay seconds with which this message was sent to a backoff queue. */
const HEADER_NAME_EXP_BACKOFF_SEC = 'x-backoff-sec';
/* Marks backoff messages with this header to make sure the message is consumed only by the queue where it has failed. */
const HEADER_NAME_TARGET_QUEUE = 'x-qn-target';
const EXP_BACKOFF_MULTIPLIER = 4;
const MAX_EXP_BACKOFF = 1000 * 1024;

Expand All @@ -36,7 +39,7 @@ const backoffRetryMessage = async <DataType = any>({
error?: Error;
}) => {
const currentBackoffSeconds =
parseInt(message.properties.headers[EXP_BACKOFF_HEADER_NAME]) || 0;
parseInt(message.properties.headers[HEADER_NAME_EXP_BACKOFF_SEC]) || 0;
const nextBackoffSeconds = Math.min(
MAX_EXP_BACKOFF,
currentBackoffSeconds
Expand Down Expand Up @@ -82,7 +85,8 @@ const backoffRetryMessage = async <DataType = any>({
await channel.sendToQueue(backoffQueueName, body, {
headers: {
...message.properties.headers,
[EXP_BACKOFF_HEADER_NAME]: nextBackoffSeconds,
[HEADER_NAME_EXP_BACKOFF_SEC]: nextBackoffSeconds,
[HEADER_NAME_TARGET_QUEUE]: queue.name,
},
});
} catch (e) {
Expand Down Expand Up @@ -151,6 +155,17 @@ export async function consumeMessages<Message extends IMessage>(
return;
}

if (
message.properties.headers[HEADER_NAME_TARGET_QUEUE] &&
message.properties.headers[HEADER_NAME_TARGET_QUEUE] !== queueName
) {
log(
`Skipping message "${message.fields.routingKey}" in queue "${queueName}" because it is intended for queue "${message.properties.headers[HEADER_NAME_TARGET_QUEUE]}" due to backoff.`
);
channel.ack(message);
return;
}

let body = message.content;
try {
body = JSON.parse(message.content.toString());
Expand Down
72 changes: 72 additions & 0 deletions tests/specs/cloudamqp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ describe('node-message-bus', () => {
{
name: 'test-queue-dead-letter-handler',
},
{
name: 'test-queue-backoff-1',
},
{
name: 'test-queue-backoff-2',
},
],
bindings: [
{
Expand All @@ -63,6 +69,14 @@ describe('node-message-bus', () => {
toQueue: 'test-queue-dead-letter-handler',
routingKey: 'automation.run',
},
{
toQueue: 'test-queue-backoff-1',
routingKey: 'backoff.*',
},
{
toQueue: 'test-queue-backoff-2',
routingKey: 'backoff.*',
},
],
});
});
Expand Down Expand Up @@ -475,6 +489,64 @@ describe('node-message-bus', () => {
stepId: 'start',
});
});

it('triggers only target queue handler when in backoff', async () => {
let handledTimes1: number[] = [];
let handledTimes2: number[] = [];
let handledData1: any;
let handledData2: any;

consumeMessages('test-queue-backoff-1', async ({ body, headers }) => {
console.log(
`Handling new message in queue 1: ${body}, headers: ${JSON.stringify(
headers
)}`
);
handledTimes1.push(Date.now());
if (handledTimes1.length === 3) {
handledData1 = body;
} else {
throw new Error('dummy error - expected in test');
}
});
consumeMessages('test-queue-backoff-2', async ({ body, headers }) => {
console.log(
`Handling new message in queue 2: ${body}, headers: ${JSON.stringify(
headers
)}`
);
handledTimes2.push(Date.now());
handledData2 = body;
});

await publishMessage({
key: 'backoff.test',
body: {
hello: 1,
},
});

// Wait for body.
await new Promise((resolve) => {
const int = setInterval(() => {
if (handledTimes1.length === 3) {
clearInterval(int);
resolve(1);
}
}, 50);
});

const d1 = handledTimes1[1] - handledTimes1[0];
const d2 = handledTimes1[2] - handledTimes1[1];
expect(d1).to.be.greaterThanOrEqual(1000);
expect(d1).to.be.lessThanOrEqual(3000);
expect(d2).to.be.greaterThanOrEqual(4000);
expect(d2).to.be.lessThanOrEqual(6000);
expect(handledData1).to.be.deep.equal({ hello: 1 });
expect(handledTimes1.length).to.be.equal(3);
expect(handledTimes2.length).to.be.equal(1);
expect(handledData2).to.be.deep.equal({ hello: 1 });
});
});

describe('testing helper functions', () => {
Expand Down

0 comments on commit 011581e

Please sign in to comment.