From 58d16c90c0762c4719705ff8a5cad25847b5fcec Mon Sep 17 00:00:00 2001 From: Nikita Savchenko Date: Tue, 4 Jul 2023 16:18:33 +0200 Subject: [PATCH] Allow to configure CloudAMQP instance via config --- package-lock.json | 4 +-- package.json | 5 ++-- src/Const/env.ts | 5 ---- src/Types/config.ts | 4 +++ src/channel.ts | 15 ++++++---- src/cloudamqp.ts | 15 +++++----- src/config.ts | 50 +++++++++++++++++++++++++++------ src/connection.ts | 28 ++++++++++-------- tests/specs/connection2.env | 4 +++ tests/specs/connection2.test.ts | 28 ++++++++++++++++++ 10 files changed, 116 insertions(+), 42 deletions(-) create mode 100644 tests/specs/connection2.env create mode 100644 tests/specs/connection2.test.ts diff --git a/package-lock.json b/package-lock.json index 4b12944..34be88e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "node-message-bus", - "version": "3.0.0", + "version": "3.1.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "node-message-bus", - "version": "3.0.0", + "version": "3.1.0", "license": "MIT", "dependencies": { "@types/amqplib": "^0.10.1", diff --git a/package.json b/package.json index 778b6d3..cd1bdeb 100644 --- a/package.json +++ b/package.json @@ -1,15 +1,16 @@ { "name": "node-message-bus", - "version": "3.0.11", + "version": "3.1.0", "description": "Minimalistic and complete AMQP message bus implementation", "main": "lib/index.js", "files": [ "lib/**/*" ], "scripts": { - "test": "npm run test:cloudamqp && npm run test:connection", + "test": "npm run test:cloudamqp && npm run test:connection && npm run test:connection2", "test:cloudamqp": "env-cmd -f tests/common.env -f tests/specs/cloudamqp.env --use-shell \"export NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY=$CLOUDAMQP_API_KEY && mocha --config=tests/mocha.yaml tests/**/*cloudamqp.test.ts\"", "test:connection": "env-cmd -f tests/common.env -f tests/specs/connection.env mocha --config=tests/mocha.yaml tests/**/*connection.test.ts", + "test:connection2": "env-cmd -f tests/common.env -f tests/specs/connection2.env --use-shell \"export AMQP_TEMP=$CLOUDAMQP_API_KEY && mocha --config=tests/mocha.yaml tests/**/*connection2.test.ts\"", "build": "tsc && tsc-alias", "start": "npm run build -- -w & tsc-alias -w" }, diff --git a/src/Const/env.ts b/src/Const/env.ts index 44213ed..c0a1cb0 100644 --- a/src/Const/env.ts +++ b/src/Const/env.ts @@ -16,8 +16,3 @@ export const NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_PREFERRED_REGIONS = ( ).split(','); export const isTestEnv = () => !!NODE_ENV.includes('test'); -export const isUsingCloudAmqp = () => - !!( - process.env.NODE_ENV !== 'production' && - NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY - ); diff --git a/src/Types/config.ts b/src/Types/config.ts index ceba2a9..61819e8 100644 --- a/src/Types/config.ts +++ b/src/Types/config.ts @@ -24,6 +24,10 @@ export interface BindingConfig { } export interface MessageBusConfig { + /** Allows to use CloudAMQP temp instance for this run. */ + useCloudAmqpTempInstance?: { + apiKey: string; + }; logger?: (logType: LogType, message: string) => unknown; exchanges?: ExchangeConfig[]; queues?: QueueConfig[]; diff --git a/src/channel.ts b/src/channel.ts index 6ad03db..29f27c3 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -1,8 +1,12 @@ -import { ConfirmChannel } from 'amqplib'; import { MessageBusConfig } from 'Types'; import { error, log } from 'Utils'; +import { ConfirmChannel } from 'amqplib'; import { configureMessageBus, configureMessageBusStatic } from './config'; -import { closeMessageBusConnection, getConnection } from './connection'; +import { + closeMessageBusConnection, + getConnection, + initConnection, +} from './connection'; let initPromiseResolve = (value: MessageBusConfig) => { value; @@ -18,11 +22,10 @@ const _postInitPromise = new Promise( /** Initialize the message bus. */ export const initMessageBus = async (config: MessageBusConfig = {}) => { - // Pass only properties that need to be configured BEFORE RabbitMQ connection is established. - await configureMessageBusStatic({ - logger: config.logger, - }); + // Pass only properties that need to be configured BEFORE AMQP connection is established. + await configureMessageBusStatic(config); + await initConnection(); initPromiseResolve(config); await _postInitPromise; }; diff --git a/src/cloudamqp.ts b/src/cloudamqp.ts index d8a214b..5969496 100644 --- a/src/cloudamqp.ts +++ b/src/cloudamqp.ts @@ -4,16 +4,15 @@ import { CLOUD_AMQP_URL_INSTANCE, CLOUD_AMQP_URL_INSTANCES, CLOUD_AMQP_URL_REGIONS, - NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY, NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_INSTANCE_LIFETIME, NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_PREFERRED_REGIONS, } from 'Const'; import { error, log } from 'Utils'; import { setTimeout } from 'timers/promises'; +import { getCloudAmqpKey } from './config'; -const authorizationHeader = `Basic ${Buffer.from( - `:${NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY || ''}` -).toString('base64')}`; +const authorizationHeader = () => + `Basic ${Buffer.from(`:${getCloudAmqpKey()}`).toString('base64')}`; const getInstancesList = async (): Promise< Array<{ @@ -32,7 +31,7 @@ const getInstancesList = async (): Promise< try { const res = await fetch(`${CLOUD_AMQP_URL_INSTANCES}`, { headers: { - Authorization: authorizationHeader, + Authorization: authorizationHeader(), }, }); result = await res.json(); @@ -64,7 +63,7 @@ export const deleteCloudAmqpInstance = async ({ const res = await fetch(`${CLOUD_AMQP_URL_INSTANCE(id.toString())}`, { method: 'DELETE', headers: { - Authorization: authorizationHeader, + Authorization: authorizationHeader(), }, }); (await res.text()) as any; @@ -103,7 +102,7 @@ export const getCloudAmqpRegions = async (): Promise< try { const res = await fetch(`${CLOUD_AMQP_URL_REGIONS}`, { headers: { - Authorization: authorizationHeader, + Authorization: authorizationHeader(), }, }); result = await res.json(); @@ -171,7 +170,7 @@ export const getNewCloudAmqpInstance = async (): Promise<{ const res = await fetch(`${CLOUD_AMQP_URL_INSTANCES}`, { method: 'POST', headers: { - Authorization: authorizationHeader, + Authorization: authorizationHeader(), 'Content-Type': 'application/json', }, body: JSON.stringify(newInstanceConfig), diff --git a/src/config.ts b/src/config.ts index b6fc918..19efb18 100644 --- a/src/config.ts +++ b/src/config.ts @@ -5,34 +5,68 @@ import { ChannelWrapper, } from 'amqp-connection-manager'; import { ConfirmChannel } from 'amqplib'; -import { DEFAULT_CONFIG, DEFAULT_EXCHANGE_NAME } from './Const'; +import { + DEFAULT_CONFIG, + DEFAULT_EXCHANGE_NAME, + NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY, +} from './Const'; // Config that was applied through the lifetime of the message bus. -const appliedConfig: Required> = - { - exchanges: [], - queues: [], - bindings: [], - }; +const appliedConfig: Required< + Omit +> = { + exchanges: [], + queues: [], + bindings: [], +}; +let useCloudAmqpTempInstance: + | MessageBusConfig['useCloudAmqpTempInstance'] + | null = null; let defaultExchangeConfigured = false; export let amqpConfig: AmqpConnectionManagerOptions | null = null; +export const isUsingCloudAmqp = () => + !!useCloudAmqpTempInstance || + !!( + process.env.NODE_ENV !== 'production' && + NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY + ); + +export const getCloudAmqpKey = () => + useCloudAmqpTempInstance?.apiKey || + NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY || + ''; + /** * Returns all applied message bus data through the lifetime of this application. */ export const getMessageBusConfig = () => appliedConfig; -export const configureMessageBusStatic = async (config: MessageBusConfig) => { +/** + * Configure message bus before it is connected to AMQP. + */ +export const configureMessageBusStatic = async ( + config: Pick< + MessageBusConfig, + 'amqpConfig' | 'useCloudAmqpTempInstance' | 'logger' + > +) => { if (typeof config.logger === 'function') { setLoggerFunction(config.logger); } + if (config.useCloudAmqpTempInstance) { + useCloudAmqpTempInstance = config.useCloudAmqpTempInstance; + } if (config.amqpConfig) { amqpConfig = config.amqpConfig; } }; +/** + * Waits until AMQP is initialized, then configures message bus. + */ export const configureMessageBus = async ( config: MessageBusConfig, channel?: ChannelWrapper | ConfirmChannel diff --git a/src/connection.ts b/src/connection.ts index 3ac959d..d51aeaa 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -1,9 +1,4 @@ -import { - NODE_ENV, - NODE_MESSAGE_BUS_CONNECTION_URL, - isTestEnv, - isUsingCloudAmqp, -} from 'Const'; +import { NODE_ENV, NODE_MESSAGE_BUS_CONNECTION_URL, isTestEnv } from 'Const'; import { error, getPrintableConnectionString, log } from 'Utils'; import amqp from 'amqp-connection-manager'; import { @@ -11,14 +6,24 @@ import { deleteCloudAmqpInstance, getNewCloudAmqpInstance, } from './cloudamqp'; -import { amqpConfig } from './config'; +import { amqpConfig, isUsingCloudAmqp } from './config'; let connectionUrl = ''; let cloudAmqpInstanceId = 0; -const useCloudAMQP = isUsingCloudAmqp(); +let _resolveInitPromise = (m: ReturnType) => {}; +let initPromise: Promise> = new Promise( + (r) => (_resolveInitPromise = r) +); +let initialized = false; +export const initConnection = async () => { + if (initialized) { + return; + } + initialized = true; + + const useCloudAMQP = isUsingCloudAmqp(); -const initPromise = (async () => { if (useCloudAMQP) { await cleanupOldCloudAmqpInstances(); } @@ -76,8 +81,9 @@ const initPromise = (async () => { } }); + _resolveInitPromise(connection); return connection; -})(); +}; export const getConnection = () => initPromise; export const closeMessageBusConnection = async () => { @@ -89,7 +95,7 @@ export const closeMessageBusConnection = async () => { await (await getConnection()).close(); log(`RabbitMQ connection closed.`); - if (useCloudAMQP && cloudAmqpInstanceId) { + if (isUsingCloudAmqp() && cloudAmqpInstanceId) { await deleteCloudAmqpInstance({ id: cloudAmqpInstanceId }); } }; diff --git a/tests/specs/connection2.env b/tests/specs/connection2.env new file mode 100644 index 0000000..6bd82a3 --- /dev/null +++ b/tests/specs/connection2.env @@ -0,0 +1,4 @@ +NODE_ENV=production + +# Reset in package.json +AMQP_TEMP=$CLOUDAMQP_API_KEY \ No newline at end of file diff --git a/tests/specs/connection2.test.ts b/tests/specs/connection2.test.ts new file mode 100644 index 0000000..d92e9b6 --- /dev/null +++ b/tests/specs/connection2.test.ts @@ -0,0 +1,28 @@ +import { closeMessageBus, initMessageBus } from '../../lib'; +import { until } from '../utils'; + +describe('node-message-bus', () => { + let logs: string[] = []; + + before(async () => { + await initMessageBus({ + useCloudAmqpTempInstance: { + apiKey: process.env.AMQP_TEMP || '', + }, + logger: (f, msg) => { + logs.push(msg); + console[f](msg); + }, + }); + + await until(() => !!logs.find((log) => log.includes('Connected to'))); + }); + + after(async () => { + await closeMessageBus(); + }); + + it('connects to temp CloudAMQP', async () => { + // do nothing + }); +});