Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added object storage support #96

Open
wants to merge 37 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
79a7151
elasticio#1456 removed unused argument
PaulAnnekov May 27, 2019
ddad1d5
elasticio#1456 1st draft
PaulAnnekov May 27, 2019
e374344
elasticio#1456 generate uuid for maester, we can't use messageId beca…
PaulAnnekov May 27, 2019
b590030
elasticio#1456 fixed missing this
PaulAnnekov May 27, 2019
7c020ab
elasticio#1456 writing integration test
PaulAnnekov May 27, 2019
cd2542b
elasticio#1456 small code fixes, made test to work
PaulAnnekov May 28, 2019
4277bdc
elasticio#1456 added test to respond directly, updated tests
PaulAnnekov May 28, 2019
d9d82cc
elasticio#1456 isolate enc/dec code together with maester in a separa…
PaulAnnekov May 28, 2019
adb2012
elasticio#1456 work with ecnrypted data as base64, not binary
PaulAnnekov May 28, 2019
e120aa0
elasticio#1456 handle maester errors and conflicts
PaulAnnekov May 28, 2019
d458603
elasticio#1456 group common headers
PaulAnnekov May 28, 2019
3a5c964
elasticio#1456 MAESTER_IS_STORE -> MAESTER_OUT
PaulAnnekov May 28, 2019
ce86502
elasticio#1456 ignore eslint, we can't fix this
PaulAnnekov May 28, 2019
5c5141d
elasticio#1456 fixed unit-tests
PaulAnnekov May 28, 2019
fc8dc24
elasticio#1456 maester -> object storage
PaulAnnekov May 29, 2019
228def6
elasticio#1456 OBJECT_STORAGE_BASEPATH-> OBJECT_STORAGE_URI
PaulAnnekov May 29, 2019
d142975
elasticio#1456 2.4.0-dev.24
PaulAnnekov May 29, 2019
6d12ac8
elasticio#1456 pass correct content-type
PaulAnnekov May 29, 2019
dcd065f
elasticio#1456 2.4.0-dev.26
PaulAnnekov May 29, 2019
e32b0e4
elasticio#1456 no need to handle encryptMessage errors, it's ok
PaulAnnekov May 29, 2019
07da777
elasticio#1456 object storage stream + gzip
PaulAnnekov May 30, 2019
9bb8fac
elasticio#1456 fixed working with requests streams by using axios, is…
PaulAnnekov May 30, 2019
74e0da8
elasticio#1456 2.4.0-dev.27
PaulAnnekov May 30, 2019
c2657eb
elasticio#1456 checked whether maester can return empty object, it's …
PaulAnnekov May 30, 2019
e06cd34
elasticio#1456 removed unused dependency
PaulAnnekov May 30, 2019
b91d229
elasticio#1456 fixed tests
PaulAnnekov May 31, 2019
6ee68d5
elasticio#1456 unit tests for objectStorage, fixed object storage bug
PaulAnnekov May 31, 2019
45c56e1
elasticio#1456 2.4.0-dev.28
PaulAnnekov May 31, 2019
54ea830
elasticio#1456 2.4.0-dev.29
PaulAnnekov May 31, 2019
d6125a0
elasticio#1456 updated package-lock
PaulAnnekov May 31, 2019
6949dfd
elasticio#1456 added parentheses and rule
PaulAnnekov Jun 3, 2019
d9e0ed4
elasticio#1456 added more parentheses
PaulAnnekov Jun 3, 2019
af9bb1c
elasticio#1456 decryptMessage -> onMessage
PaulAnnekov Jun 3, 2019
0602665
elasticio#1456 OBJECT_STORAGE_OUT -> OBJECT_STORAGE_ENABLED
PaulAnnekov Jun 3, 2019
c832151
elasticio#1456 createReq -> request, onRes -> onResponse
PaulAnnekov Jun 3, 2019
5b9341c
elasticio#1456 added fallback to message send via rabbitmq on object …
PaulAnnekov Jun 3, 2019
2f23324
elasticio#1456 2.4.2-dev.0
PaulAnnekov Jun 5, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ module.exports = {
}
],
'mocha/no-skipped-tests': ERROR,
'mocha/no-exclusive-tests': ERROR
'mocha/no-exclusive-tests': ERROR,
'new-parens': ERROR
}
};
36 changes: 31 additions & 5 deletions lib/amqp.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const log = require('./logging.js');
const amqplib = require('amqplib');
const encryptor = require('./encryptor.js');
const ObjectStorage = require('./objectStorage.js');
const co = require('co');
const _ = require('lodash');
const eventToPromise = require('event-to-promise');
Expand All @@ -11,6 +12,7 @@ const HEADER_ERROR_RESPONSE = 'x-eio-error-response';
class Amqp {
constructor(settings) {
this.settings = settings;
this.objectStorage = new ObjectStorage(settings);
}

connect(uri) {
Expand Down Expand Up @@ -53,13 +55,38 @@ class Amqp {
return Promise.resolve();
}

async decryptMessage(message) {
const objectId = message.properties.headers.objectId;
if (!objectId) {
return encryptor.decryptMessageContent(message.content);
}

return await this.objectStorage.getObject(objectId);
}

async encryptMessage(data, properties) {
if (!this.settings.OBJECT_STORAGE_ENABLED) {
return encryptor.encryptMessageContent(data);
}

let message = '';
try {
properties.headers.objectId = await this.objectStorage.addObject(data);
} catch (e) {
log.error('Failed to add message to object storage: %s', e);
message = encryptor.encryptMessageContent(data);
}

return message;
}

listenQueue(queueName, callback) {
//eslint-disable-next-line consistent-this
const self = this;

this.subscribeChannel.prefetch(this.settings.RABBITMQ_PREFETCH_SAILOR);

return this.subscribeChannel.consume(queueName, function decryptMessage(message) {
return this.subscribeChannel.consume(queueName, async function onMessage(message) {
log.trace('Message received: %j', message);

if (message === null) {
Expand All @@ -69,10 +96,10 @@ class Amqp {

let decryptedContent;
try {
decryptedContent = encryptor.decryptMessageContent(message.content, message.properties.headers);
decryptedContent = await self.decryptMessage(message);
PaulAnnekov marked this conversation as resolved.
Show resolved Hide resolved
} catch (err) {
console.error(
'Error occurred while parsing message #%j payload (%s)',
'Error occurred while getting message #%j payload (%s)',
message.fields.deliveryTag,
err.message
);
Expand Down Expand Up @@ -149,8 +176,7 @@ class Amqp {
async prepareMessageAndSendToExchange(data, properties, routingKey, throttle) {
const settings = this.settings;
data.headers = filterMessageHeaders(data.headers);
const encryptedData = encryptor.encryptMessageContent(data);

const encryptedData = await this.encryptMessage(data, properties);
return this.sendToExchange(settings.PUBLISH_MESSAGES_TO, routingKey, encryptedData, properties, throttle);
}

Expand Down
37 changes: 34 additions & 3 deletions lib/cipher.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
var _ = require('lodash');
var crypto = require('crypto');
var debug = require('debug')('sailor:cipher');
var PassThrough = require('stream').PassThrough;

var ALGORYTHM = 'aes-256-cbc';
var PASSWORD = process.env.ELASTICIO_MESSAGE_CRYPTO_PASSWORD;
Expand All @@ -9,6 +10,8 @@ var VECTOR = process.env.ELASTICIO_MESSAGE_CRYPTO_IV;
exports.id = 1;
exports.encrypt = encryptIV;
exports.decrypt = decryptIV;
exports.decryptStream = decryptStreamIV;
exports.encryptStream = encryptStreamIV;

function encryptIV(rawData) {
debug('About to encrypt:', rawData);
Expand All @@ -30,11 +33,9 @@ function encryptIV(rawData) {
return cipher.update(rawData, 'utf-8', 'base64') + cipher.final('base64');
}

function decryptIV(encData, options) {
function decryptIV(encData) {
debug('About to decrypt:', encData);

options = options || {};

if (!_.isString(encData)) {
throw new Error('RabbitMQ message cipher.decryptIV() accepts only string as parameter.');
}
Expand All @@ -54,3 +55,33 @@ function decryptIV(encData, options) {

return result;
}

function encryptStreamIV() {
debug('Creating encryption stream');

if (!PASSWORD) {
return new PassThrough();
}

if (!VECTOR) {
throw new Error('process.env.ELASTICIO_MESSAGE_CRYPTO_IV is not set');
}

var encodeKey = crypto.createHash('sha256').update(PASSWORD, 'utf-8').digest();
return crypto.createCipheriv(ALGORYTHM, encodeKey, VECTOR);
}

function decryptStreamIV() {
debug('Creating decryption stream');

if (!PASSWORD) {
return new PassThrough();
}

if (!VECTOR) {
throw new Error('process.env.ELASTICIO_MESSAGE_CRYPTO_IV is not set');
}

var decodeKey = crypto.createHash('sha256').update(PASSWORD, 'utf-8').digest();
return crypto.createDecipheriv(ALGORYTHM, decodeKey, VECTOR);
}
25 changes: 23 additions & 2 deletions lib/encryptor.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
var cipher = require('./cipher.js');
var Readable = require('stream').Readable;
var zlib = require('zlib');
var getStream = require('get-stream');

exports.encryptMessageContent = encryptMessageContent;
exports.decryptMessageContent = decryptMessageContent;
exports.encryptMessageContentStream = encryptMessageContentStream;
exports.decryptMessageContentStream = decryptMessageContentStream;

function encryptMessageContent(messagePayload) {
return cipher.encrypt(JSON.stringify(messagePayload));
}

function decryptMessageContent(messagePayload, messageHeaders) {
function decryptMessageContent(messagePayload) {
if (!messagePayload || messagePayload.toString().length === 0) {
return null;
}
try {
return JSON.parse(cipher.decrypt(messagePayload.toString(), messageHeaders));
return JSON.parse(cipher.decrypt(messagePayload.toString()));
} catch (err) {
console.error(err.stack);
throw Error('Failed to decrypt message: ' + err.message);
}
}

function encryptMessageContentStream(data) {
const dataStream = new Readable();
dataStream.push(JSON.stringify(data));
dataStream.push(null);
return dataStream
.pipe(zlib.createGzip())
.pipe(cipher.encryptStream());
}

async function decryptMessageContentStream(stream) {
const s = stream.pipe(cipher.decryptStream()).pipe(zlib.createGunzip());
const content = await getStream(s);

return JSON.parse(content);
}
81 changes: 81 additions & 0 deletions lib/objectStorage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
const log = require('./logging.js');
const encryptor = require('./encryptor.js');
const uuid = require('uuid');
const axios = require('axios');
const http = require('http');
const https = require('https');

class ObjectStorage {
constructor(settings) {
this.api = axios.create({
baseURL: `${settings.OBJECT_STORAGE_URI}/`,
httpAgent: new http.Agent({ keepAlive: true }),
httpsAgent: new https.Agent({ keepAlive: true }),
headers: { Authorization: `Bearer ${settings.OBJECT_STORAGE_TOKEN}` },
validateStatus: null
});
}

async requestRetry({ maxAttempts, delay, request, onResponse }) {
let attempts = 0;
let res;
let err;
while (attempts < maxAttempts) {
err = null;
res = null;
attempts++;
try {
res = await request();
} catch (e) {
err = e;
}
if (onResponse && onResponse(err, res)) {
continue;
}
if (err || res.status >= 400) {
log.warn('Error during object get: %s', err ? err : `${res.status} (${res.statusText})`);
await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}
break;
}
if (err || res.status >= 400) {
throw err || new Error(`HTTP error during object get: ${res.status} (${res.statusText})`);
}
return res;
}

async getObject(objectId) {
const res = await this.requestRetry({
maxAttempts: 3,
delay: 100,
request: () => this.api.get(`/objects/${objectId}`, { responseType: 'stream' })
});

return await encryptor.decryptMessageContentStream(res.data);
}

async addObject(data) {
let objectId = uuid.v4();
await this.requestRetry({
maxAttempts: 3,
delay: 100,
request: () => this.api.put(
`/objects/${objectId}`,
encryptor.encryptMessageContentStream(data),
{ headers: { 'content-type': 'application/octet-stream' } }
),
onResponse: (err, res) => {
if (!err && res.status === 409) {
log.warn('Generated already existing UUID');
objectId = uuid.v4();
return true;
}
}
});

return objectId;
}
}

module.exports = ObjectStorage;
5 changes: 4 additions & 1 deletion lib/settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ function readFrom(envVars) {
RATE_INTERVAL: 100, // 100ms
PROCESS_AMQP_DRAIN: true,
AMQP_PUBLISH_RETRY_DELAY: 100, // 100ms
AMQP_PUBLISH_RETRY_ATTEMPTS: 10
AMQP_PUBLISH_RETRY_ATTEMPTS: 10,
OBJECT_STORAGE_URI: '',
OBJECT_STORAGE_TOKEN: '',
OBJECT_STORAGE_ENABLED: false
};

_.forEach(requiredAlways, function readRequired(key) {
Expand Down
18 changes: 14 additions & 4 deletions mocha_spec/integration_helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ const amqplib = require('amqplib');
const { EventEmitter } = require('events');
const PREFIX = 'sailor_nodejs_integration_test';
const nock = require('nock');
const getStream = require('get-stream');
const encryptor = require('../lib/encryptor.js');

const env = process.env;

Expand Down Expand Up @@ -131,11 +133,12 @@ class AmqpHelper extends EventEmitter {

this.publishChannel.ack(message);

const emittedMessage = JSON.parse(message.content.toString());
const content = message.content.toString();
const emittedMessage = content ? JSON.parse(content) : content;

const data = {
properties: message.properties,
body: emittedMessage.body,
body: emittedMessage ? emittedMessage.body : null,
emittedMessage
};
this.dataMessages.push(data);
Expand Down Expand Up @@ -178,7 +181,9 @@ function prepareEnv() {

env.DEBUG = 'sailor:debug';


env.ELASTICIO_OBJECT_STORAGE_URI = 'http://ma.es.ter';
env.ELASTICIO_OBJECT_STORAGE_ENABLED = '';
env.ELASTICIO_OBJECT_STORAGE_TOKEN = 'jwt';
}

function mockApiTaskStepResponse(response) {
Expand All @@ -197,6 +202,11 @@ function mockApiTaskStepResponse(response) {
.reply(200, Object.assign(defaultResponse, response));
}

async function encryptForObjectStorage(input) {
const stream = encryptor.encryptMessageContentStream(input);
return await getStream.buffer(stream);
}

exports.PREFIX = PREFIX;

exports.amqp = function amqp() {
Expand All @@ -205,4 +215,4 @@ exports.amqp = function amqp() {

exports.prepareEnv = prepareEnv;
exports.mockApiTaskStepResponse = mockApiTaskStepResponse;

exports.encryptForObjectStorage = encryptForObjectStorage;
Loading