Skip to content

Commit

Permalink
Make clearLastMessages async
Browse files Browse the repository at this point in the history
  • Loading branch information
nikitaeverywhere committed Jul 21, 2023
1 parent db55bdc commit 0375fd1
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 21 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-message-bus",
"version": "3.1.1",
"version": "3.2.0",
"description": "Minimalistic and complete AMQP message bus implementation",
"main": "lib/index.js",
"files": [
Expand Down
4 changes: 2 additions & 2 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,8 @@ after(async () => {
});

describe('Dummy test', () => {
beforeEach(() => {
clearLastMessages();
beforeEach(async () => {
await clearLastMessages();
});

it('tests something', async () => {
Expand Down
42 changes: 28 additions & 14 deletions src/Utils/testing.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,40 @@
import { LAST_MESSAGES_BUFFER_SIZE, isTestEnv } from 'Const';
import { IMessage } from 'Types';
import { error } from './logger';

interface LastMessagesFactory {
push: <T extends IMessage>(m: T) => void;
clear: () => number;
get: <T extends IMessage>() => T[];
interface LastMessagesFactory<T extends IMessage> {
/** When promise is given, the system will await on it before clearing messages. */
push: (m: T, promise?: Promise<any>) => void;
/** Clear last published messages and also await on publishing all pending messages. */
clear: () => Promise<number>;
get: () => T[];
}

const lastMessagesFactory = (): LastMessagesFactory => {
const array: Array<IMessage> = [];
const lastMessagesFactory = <T extends IMessage>(): LastMessagesFactory<T> => {
const array: Array<{ message: T; promise?: Promise<any> }> = [];
return {
push: (m: IMessage) => {
push: (m: T, promise?: Promise<any>) => {
if (!isTestEnv()) {
return;
}
array.push(m);
array.push({ message: m, promise });
if (array.length > LAST_MESSAGES_BUFFER_SIZE) {
array.splice(0, 1);
}
},
clear: () => (array.length = 0),
get: <IMessage>() => array.slice() as IMessage[],
clear: async () => {
const promises = array.filter((e) => !!e.promise).map((e) => e.promise!);
try {
await Promise.all(promises);
} catch (e) {
error(
`Error when trying to clear last messages: awaiting for the messages publishing promises failed.`
);
}
array.length = 0;
return 0;
},
get: () => array.map((e) => e.message).slice(),
};
};

Expand All @@ -40,8 +54,8 @@ export const {
get: getLastRejectedMessages,
} = lastMessagesFactory();

export const clearLastMessages = () => {
clearLastPublishedMessages();
clearLastConsumedMessages();
clearLastRejectedMessages();
export const clearLastMessages = async () => {
await clearLastPublishedMessages();
await clearLastConsumedMessages();
await clearLastRejectedMessages();
};
5 changes: 3 additions & 2 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ export const publishMessage = async <DataType extends IMessage = Message>(

try {
log(`-> publishing [${message.key}]`);
await channel.publish(
const promise = channel.publish(
exchangeName,
message.key,
message.body, // channel.publish stringifies JSON by default.
message.options
);
pushToLastPublishedMessages(message);
pushToLastPublishedMessages(message, promise);
await promise;
} catch (e) {
error(
`Unable to publish message to exchange "${exchangeName}" with routing routingKey "${
Expand Down
4 changes: 2 additions & 2 deletions tests/specs/cloudamqp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,8 @@ describe('node-message-bus', () => {
});

describe('testing helper functions', () => {
beforeEach(() => {
clearLastMessages();
beforeEach(async () => {
await clearLastMessages();
});

it('populates last published messages queue', async () => {
Expand Down

0 comments on commit 0375fd1

Please sign in to comment.