Skip to content

Commit

Permalink
Allow to configure CloudAMQP instance via config
Browse files Browse the repository at this point in the history
  • Loading branch information
nikitaeverywhere committed Jul 4, 2023
1 parent 3c06af5 commit 58d16c9
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 42 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
{
"name": "node-message-bus",
"version": "3.0.11",
"version": "3.1.0",
"description": "Minimalistic and complete AMQP message bus implementation",
"main": "lib/index.js",
"files": [
"lib/**/*"
],
"scripts": {
"test": "npm run test:cloudamqp && npm run test:connection",
"test": "npm run test:cloudamqp && npm run test:connection && npm run test:connection2",
"test:cloudamqp": "env-cmd -f tests/common.env -f tests/specs/cloudamqp.env --use-shell \"export NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY=$CLOUDAMQP_API_KEY && mocha --config=tests/mocha.yaml tests/**/*cloudamqp.test.ts\"",
"test:connection": "env-cmd -f tests/common.env -f tests/specs/connection.env mocha --config=tests/mocha.yaml tests/**/*connection.test.ts",
"test:connection2": "env-cmd -f tests/common.env -f tests/specs/connection2.env --use-shell \"export AMQP_TEMP=$CLOUDAMQP_API_KEY && mocha --config=tests/mocha.yaml tests/**/*connection2.test.ts\"",
"build": "tsc && tsc-alias",
"start": "npm run build -- -w & tsc-alias -w"
},
Expand Down
5 changes: 0 additions & 5 deletions src/Const/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,3 @@ export const NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_PREFERRED_REGIONS = (
).split(',');

export const isTestEnv = () => !!NODE_ENV.includes('test');
export const isUsingCloudAmqp = () =>
!!(
process.env.NODE_ENV !== 'production' &&
NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY
);
4 changes: 4 additions & 0 deletions src/Types/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ export interface BindingConfig {
}

export interface MessageBusConfig {
/** Allows to use CloudAMQP temp instance for this run. */
useCloudAmqpTempInstance?: {
apiKey: string;
};
logger?: (logType: LogType, message: string) => unknown;
exchanges?: ExchangeConfig[];
queues?: QueueConfig[];
Expand Down
15 changes: 9 additions & 6 deletions src/channel.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { ConfirmChannel } from 'amqplib';
import { MessageBusConfig } from 'Types';
import { error, log } from 'Utils';
import { ConfirmChannel } from 'amqplib';
import { configureMessageBus, configureMessageBusStatic } from './config';
import { closeMessageBusConnection, getConnection } from './connection';
import {
closeMessageBusConnection,
getConnection,
initConnection,
} from './connection';

let initPromiseResolve = (value: MessageBusConfig) => {
value;
Expand All @@ -18,11 +22,10 @@ const _postInitPromise = new Promise<MessageBusConfig>(

/** Initialize the message bus. */
export const initMessageBus = async (config: MessageBusConfig = {}) => {
// Pass only properties that need to be configured BEFORE RabbitMQ connection is established.
await configureMessageBusStatic({
logger: config.logger,
});
// Pass only properties that need to be configured BEFORE AMQP connection is established.
await configureMessageBusStatic(config);

await initConnection();
initPromiseResolve(config);
await _postInitPromise;
};
Expand Down
15 changes: 7 additions & 8 deletions src/cloudamqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import {
CLOUD_AMQP_URL_INSTANCE,
CLOUD_AMQP_URL_INSTANCES,
CLOUD_AMQP_URL_REGIONS,
NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY,
NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_INSTANCE_LIFETIME,
NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_PREFERRED_REGIONS,
} from 'Const';
import { error, log } from 'Utils';
import { setTimeout } from 'timers/promises';
import { getCloudAmqpKey } from './config';

const authorizationHeader = `Basic ${Buffer.from(
`:${NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY || ''}`
).toString('base64')}`;
const authorizationHeader = () =>
`Basic ${Buffer.from(`:${getCloudAmqpKey()}`).toString('base64')}`;

const getInstancesList = async (): Promise<
Array<{
Expand All @@ -32,7 +31,7 @@ const getInstancesList = async (): Promise<
try {
const res = await fetch(`${CLOUD_AMQP_URL_INSTANCES}`, {
headers: {
Authorization: authorizationHeader,
Authorization: authorizationHeader(),
},
});
result = await res.json();
Expand Down Expand Up @@ -64,7 +63,7 @@ export const deleteCloudAmqpInstance = async ({
const res = await fetch(`${CLOUD_AMQP_URL_INSTANCE(id.toString())}`, {
method: 'DELETE',
headers: {
Authorization: authorizationHeader,
Authorization: authorizationHeader(),
},
});
(await res.text()) as any;
Expand Down Expand Up @@ -103,7 +102,7 @@ export const getCloudAmqpRegions = async (): Promise<
try {
const res = await fetch(`${CLOUD_AMQP_URL_REGIONS}`, {
headers: {
Authorization: authorizationHeader,
Authorization: authorizationHeader(),
},
});
result = await res.json();
Expand Down Expand Up @@ -171,7 +170,7 @@ export const getNewCloudAmqpInstance = async (): Promise<{
const res = await fetch(`${CLOUD_AMQP_URL_INSTANCES}`, {
method: 'POST',
headers: {
Authorization: authorizationHeader,
Authorization: authorizationHeader(),
'Content-Type': 'application/json',
},
body: JSON.stringify(newInstanceConfig),
Expand Down
50 changes: 42 additions & 8 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,68 @@ import {
ChannelWrapper,
} from 'amqp-connection-manager';
import { ConfirmChannel } from 'amqplib';
import { DEFAULT_CONFIG, DEFAULT_EXCHANGE_NAME } from './Const';
import {
DEFAULT_CONFIG,
DEFAULT_EXCHANGE_NAME,
NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY,
} from './Const';

// Config that was applied through the lifetime of the message bus.
const appliedConfig: Required<Omit<MessageBusConfig, 'logger' | 'amqpConfig'>> =
{
exchanges: [],
queues: [],
bindings: [],
};
const appliedConfig: Required<
Omit<MessageBusConfig, 'logger' | 'amqpConfig' | 'useCloudAmqpTempInstance'>
> = {
exchanges: [],
queues: [],
bindings: [],
};

let useCloudAmqpTempInstance:
| MessageBusConfig['useCloudAmqpTempInstance']
| null = null;
let defaultExchangeConfigured = false;

export let amqpConfig: AmqpConnectionManagerOptions | null = null;

export const isUsingCloudAmqp = () =>
!!useCloudAmqpTempInstance ||
!!(
process.env.NODE_ENV !== 'production' &&
NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY
);

export const getCloudAmqpKey = () =>
useCloudAmqpTempInstance?.apiKey ||
NODE_MESSAGE_BUS_TESTING_CLOUDAMQP_API_KEY ||
'';

/**
* Returns all applied message bus data through the lifetime of this application.
*/
export const getMessageBusConfig = () => appliedConfig;

export const configureMessageBusStatic = async (config: MessageBusConfig) => {
/**
* Configure message bus before it is connected to AMQP.
*/
export const configureMessageBusStatic = async (
config: Pick<
MessageBusConfig,
'amqpConfig' | 'useCloudAmqpTempInstance' | 'logger'
>
) => {
if (typeof config.logger === 'function') {
setLoggerFunction(config.logger);
}
if (config.useCloudAmqpTempInstance) {
useCloudAmqpTempInstance = config.useCloudAmqpTempInstance;
}
if (config.amqpConfig) {
amqpConfig = config.amqpConfig;
}
};

/**
* Waits until AMQP is initialized, then configures message bus.
*/
export const configureMessageBus = async (
config: MessageBusConfig,
channel?: ChannelWrapper | ConfirmChannel
Expand Down
28 changes: 17 additions & 11 deletions src/connection.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
import {
NODE_ENV,
NODE_MESSAGE_BUS_CONNECTION_URL,
isTestEnv,
isUsingCloudAmqp,
} from 'Const';
import { NODE_ENV, NODE_MESSAGE_BUS_CONNECTION_URL, isTestEnv } from 'Const';
import { error, getPrintableConnectionString, log } from 'Utils';
import amqp from 'amqp-connection-manager';
import {
cleanupOldCloudAmqpInstances,
deleteCloudAmqpInstance,
getNewCloudAmqpInstance,
} from './cloudamqp';
import { amqpConfig } from './config';
import { amqpConfig, isUsingCloudAmqp } from './config';

let connectionUrl = '';
let cloudAmqpInstanceId = 0;

const useCloudAMQP = isUsingCloudAmqp();
let _resolveInitPromise = (m: ReturnType<typeof amqp.connect>) => {};
let initPromise: Promise<ReturnType<typeof amqp.connect>> = new Promise(
(r) => (_resolveInitPromise = r)
);
let initialized = false;
export const initConnection = async () => {
if (initialized) {
return;
}
initialized = true;

const useCloudAMQP = isUsingCloudAmqp();

const initPromise = (async () => {
if (useCloudAMQP) {
await cleanupOldCloudAmqpInstances();
}
Expand Down Expand Up @@ -76,8 +81,9 @@ const initPromise = (async () => {
}
});

_resolveInitPromise(connection);
return connection;
})();
};

export const getConnection = () => initPromise;
export const closeMessageBusConnection = async () => {
Expand All @@ -89,7 +95,7 @@ export const closeMessageBusConnection = async () => {
await (await getConnection()).close();
log(`RabbitMQ connection closed.`);

if (useCloudAMQP && cloudAmqpInstanceId) {
if (isUsingCloudAmqp() && cloudAmqpInstanceId) {
await deleteCloudAmqpInstance({ id: cloudAmqpInstanceId });
}
};
4 changes: 4 additions & 0 deletions tests/specs/connection2.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
NODE_ENV=production

# Reset in package.json
AMQP_TEMP=$CLOUDAMQP_API_KEY
28 changes: 28 additions & 0 deletions tests/specs/connection2.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { closeMessageBus, initMessageBus } from '../../lib';
import { until } from '../utils';

describe('node-message-bus', () => {
let logs: string[] = [];

before(async () => {
await initMessageBus({
useCloudAmqpTempInstance: {
apiKey: process.env.AMQP_TEMP || '',
},
logger: (f, msg) => {
logs.push(msg);
console[f](msg);
},
});

await until(() => !!logs.find((log) => log.includes('Connected to')));
});

after(async () => {
await closeMessageBus();
});

it('connects to temp CloudAMQP', async () => {
// do nothing
});
});

0 comments on commit 58d16c9

Please sign in to comment.