From 0375fd1ef069c1e749ffb30fa75d1b14f9add1cc Mon Sep 17 00:00:00 2001 From: Nikita Savchenko Date: Fri, 21 Jul 2023 13:41:45 +0200 Subject: [PATCH] Make clearLastMessages async --- package.json | 2 +- readme.md | 4 ++-- src/Utils/testing.ts | 42 +++++++++++++++++++++++------------ src/publisher.ts | 5 +++-- tests/specs/cloudamqp.test.ts | 4 ++-- 5 files changed, 36 insertions(+), 21 deletions(-) diff --git a/package.json b/package.json index 7732e6f..5fcd35d 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "node-message-bus", - "version": "3.1.1", + "version": "3.2.0", "description": "Minimalistic and complete AMQP message bus implementation", "main": "lib/index.js", "files": [ diff --git a/readme.md b/readme.md index c7d22a3..10d7a28 100644 --- a/readme.md +++ b/readme.md @@ -284,8 +284,8 @@ after(async () => { }); describe('Dummy test', () => { - beforeEach(() => { - clearLastMessages(); + beforeEach(async () => { + await clearLastMessages(); }); it('tests something', async () => { diff --git a/src/Utils/testing.ts b/src/Utils/testing.ts index 8263534..501f84c 100644 --- a/src/Utils/testing.ts +++ b/src/Utils/testing.ts @@ -1,26 +1,40 @@ import { LAST_MESSAGES_BUFFER_SIZE, isTestEnv } from 'Const'; import { IMessage } from 'Types'; +import { error } from './logger'; -interface LastMessagesFactory { - push: (m: T) => void; - clear: () => number; - get: () => T[]; +interface LastMessagesFactory { + /** When promise is given, the system will await on it before clearing messages. */ + push: (m: T, promise?: Promise) => void; + /** Clear last published messages and also await on publishing all pending messages. */ + clear: () => Promise; + get: () => T[]; } -const lastMessagesFactory = (): LastMessagesFactory => { - const array: Array = []; +const lastMessagesFactory = (): LastMessagesFactory => { + const array: Array<{ message: T; promise?: Promise }> = []; return { - push: (m: IMessage) => { + push: (m: T, promise?: Promise) => { if (!isTestEnv()) { return; } - array.push(m); + array.push({ message: m, promise }); if (array.length > LAST_MESSAGES_BUFFER_SIZE) { array.splice(0, 1); } }, - clear: () => (array.length = 0), - get: () => array.slice() as IMessage[], + clear: async () => { + const promises = array.filter((e) => !!e.promise).map((e) => e.promise!); + try { + await Promise.all(promises); + } catch (e) { + error( + `Error when trying to clear last messages: awaiting for the messages publishing promises failed.` + ); + } + array.length = 0; + return 0; + }, + get: () => array.map((e) => e.message).slice(), }; }; @@ -40,8 +54,8 @@ export const { get: getLastRejectedMessages, } = lastMessagesFactory(); -export const clearLastMessages = () => { - clearLastPublishedMessages(); - clearLastConsumedMessages(); - clearLastRejectedMessages(); +export const clearLastMessages = async () => { + await clearLastPublishedMessages(); + await clearLastConsumedMessages(); + await clearLastRejectedMessages(); }; diff --git a/src/publisher.ts b/src/publisher.ts index d9ad323..ed11035 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -31,13 +31,14 @@ export const publishMessage = async ( try { log(`-> publishing [${message.key}]`); - await channel.publish( + const promise = channel.publish( exchangeName, message.key, message.body, // channel.publish stringifies JSON by default. message.options ); - pushToLastPublishedMessages(message); + pushToLastPublishedMessages(message, promise); + await promise; } catch (e) { error( `Unable to publish message to exchange "${exchangeName}" with routing routingKey "${ diff --git a/tests/specs/cloudamqp.test.ts b/tests/specs/cloudamqp.test.ts index 6790036..fc7ee16 100644 --- a/tests/specs/cloudamqp.test.ts +++ b/tests/specs/cloudamqp.test.ts @@ -420,8 +420,8 @@ describe('node-message-bus', () => { }); describe('testing helper functions', () => { - beforeEach(() => { - clearLastMessages(); + beforeEach(async () => { + await clearLastMessages(); }); it('populates last published messages queue', async () => {