From 011581e35267a894bafdcfc9a496289d358bb896 Mon Sep 17 00:00:00 2001 From: Nikita Savchenko Date: Fri, 2 Feb 2024 13:41:51 +0100 Subject: [PATCH] Fix backoff queue message consuming --- package-lock.json | 4 +- package.json | 2 +- src/consumer.ts | 21 ++++++++-- tests/specs/cloudamqp.test.ts | 72 +++++++++++++++++++++++++++++++++++ 4 files changed, 93 insertions(+), 6 deletions(-) diff --git a/package-lock.json b/package-lock.json index 34be88e..43c5c9c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "node-message-bus", - "version": "3.1.0", + "version": "3.3.3", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "node-message-bus", - "version": "3.1.0", + "version": "3.3.3", "license": "MIT", "dependencies": { "@types/amqplib": "^0.10.1", diff --git a/package.json b/package.json index eecb8d7..5cf05d1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "node-message-bus", - "version": "3.3.2", + "version": "3.3.3", "description": "Minimalistic and complete AMQP message bus implementation", "main": "lib/index.js", "files": [ diff --git a/src/consumer.ts b/src/consumer.ts index 7b55a0e..5b7f488 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -18,7 +18,10 @@ import { DEFAULT_EXCHANGE_NAME } from './Const'; import { getDefaultChannel } from './channel'; import { configureMessageBus, getMessageBusConfig } from './config'; -const EXP_BACKOFF_HEADER_NAME = 'x-backoff-sec'; +/* Marks the number of the delay seconds with which this message was sent to a backoff queue. */ +const HEADER_NAME_EXP_BACKOFF_SEC = 'x-backoff-sec'; +/* Marks backoff messages with this header to make sure the message is consumed only by the queue where it has failed. */ +const HEADER_NAME_TARGET_QUEUE = 'x-qn-target'; const EXP_BACKOFF_MULTIPLIER = 4; const MAX_EXP_BACKOFF = 1000 * 1024; @@ -36,7 +39,7 @@ const backoffRetryMessage = async ({ error?: Error; }) => { const currentBackoffSeconds = - parseInt(message.properties.headers[EXP_BACKOFF_HEADER_NAME]) || 0; + parseInt(message.properties.headers[HEADER_NAME_EXP_BACKOFF_SEC]) || 0; const nextBackoffSeconds = Math.min( MAX_EXP_BACKOFF, currentBackoffSeconds @@ -82,7 +85,8 @@ const backoffRetryMessage = async ({ await channel.sendToQueue(backoffQueueName, body, { headers: { ...message.properties.headers, - [EXP_BACKOFF_HEADER_NAME]: nextBackoffSeconds, + [HEADER_NAME_EXP_BACKOFF_SEC]: nextBackoffSeconds, + [HEADER_NAME_TARGET_QUEUE]: queue.name, }, }); } catch (e) { @@ -151,6 +155,17 @@ export async function consumeMessages( return; } + if ( + message.properties.headers[HEADER_NAME_TARGET_QUEUE] && + message.properties.headers[HEADER_NAME_TARGET_QUEUE] !== queueName + ) { + log( + `Skipping message "${message.fields.routingKey}" in queue "${queueName}" because it is intended for queue "${message.properties.headers[HEADER_NAME_TARGET_QUEUE]}" due to backoff.` + ); + channel.ack(message); + return; + } + let body = message.content; try { body = JSON.parse(message.content.toString()); diff --git a/tests/specs/cloudamqp.test.ts b/tests/specs/cloudamqp.test.ts index f27d3ad..23ad35f 100644 --- a/tests/specs/cloudamqp.test.ts +++ b/tests/specs/cloudamqp.test.ts @@ -45,6 +45,12 @@ describe('node-message-bus', () => { { name: 'test-queue-dead-letter-handler', }, + { + name: 'test-queue-backoff-1', + }, + { + name: 'test-queue-backoff-2', + }, ], bindings: [ { @@ -63,6 +69,14 @@ describe('node-message-bus', () => { toQueue: 'test-queue-dead-letter-handler', routingKey: 'automation.run', }, + { + toQueue: 'test-queue-backoff-1', + routingKey: 'backoff.*', + }, + { + toQueue: 'test-queue-backoff-2', + routingKey: 'backoff.*', + }, ], }); }); @@ -475,6 +489,64 @@ describe('node-message-bus', () => { stepId: 'start', }); }); + + it('triggers only target queue handler when in backoff', async () => { + let handledTimes1: number[] = []; + let handledTimes2: number[] = []; + let handledData1: any; + let handledData2: any; + + consumeMessages('test-queue-backoff-1', async ({ body, headers }) => { + console.log( + `Handling new message in queue 1: ${body}, headers: ${JSON.stringify( + headers + )}` + ); + handledTimes1.push(Date.now()); + if (handledTimes1.length === 3) { + handledData1 = body; + } else { + throw new Error('dummy error - expected in test'); + } + }); + consumeMessages('test-queue-backoff-2', async ({ body, headers }) => { + console.log( + `Handling new message in queue 2: ${body}, headers: ${JSON.stringify( + headers + )}` + ); + handledTimes2.push(Date.now()); + handledData2 = body; + }); + + await publishMessage({ + key: 'backoff.test', + body: { + hello: 1, + }, + }); + + // Wait for body. + await new Promise((resolve) => { + const int = setInterval(() => { + if (handledTimes1.length === 3) { + clearInterval(int); + resolve(1); + } + }, 50); + }); + + const d1 = handledTimes1[1] - handledTimes1[0]; + const d2 = handledTimes1[2] - handledTimes1[1]; + expect(d1).to.be.greaterThanOrEqual(1000); + expect(d1).to.be.lessThanOrEqual(3000); + expect(d2).to.be.greaterThanOrEqual(4000); + expect(d2).to.be.lessThanOrEqual(6000); + expect(handledData1).to.be.deep.equal({ hello: 1 }); + expect(handledTimes1.length).to.be.equal(3); + expect(handledTimes2.length).to.be.equal(1); + expect(handledData2).to.be.deep.equal({ hello: 1 }); + }); }); describe('testing helper functions', () => {