Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

verify the replyTo queue still exists before running the command #26

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/AMQPEndpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ class AMQPEndpoint {
*/
async disconnect() {
if (!this._channel) return;
await this._channel.close();
const channel = this._channel;
this._channel = null;
await channel.close();
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/AMQPRPCClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ class AMQPRPCClient extends AMQPEndpoint {
* @returns {Promise}
*/
async disconnect() {
if (!this._channel) {
return;
}
await this._channel.cancel(this._consumerTag);

if (this._params.repliesQueue === '') {
Expand Down
38 changes: 38 additions & 0 deletions src/AMQPRPCServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class AMQPRPCServer extends AMQPEndpoint {
* @param {Object} params
* @param {String} params.requestsQueue queue when AMQPRPC client sends commands, should correspond with AMQPRPCClient
* default is '' which means auto-generated queue name
* @param {Boolean} params.verifyReplyQueue verify that the reply queue exists before handling a request
*/
constructor(connection, params = {}) {
params.requestsQueue = params.requestsQueue || '';
Expand All @@ -24,6 +25,7 @@ class AMQPRPCServer extends AMQPEndpoint {

this._requestsQueue = params.requestsQueue;
this._commands = {};
this._verifyReplyQueue = params.verifyReplyQueue;
}

/**
Expand Down Expand Up @@ -74,6 +76,33 @@ class AMQPRPCServer extends AMQPEndpoint {
return this;
}

/**
* @private
*
* this is an internal method used to verify if the
* reply queue still exists by using a disposable channel.
*
* Extracted from rabbitmq documentation: https://www.rabbitmq.com/direct-reply-to.html
*
* If the RPC server is going to perform some expensive computation
* it might wish to check if the client has gone away. To do this the
* server can declare the generated reply name first on a disposable
* channel in order to determine whether it still exists.
*/
async _checkQueue(queueName) {
const channel = await this._connection.createChannel();
let result = true;
try {
//we need to handle this error otherwise it breaks the application
channel.on('error', () => { result = false; });
await channel.checkQueue(queueName);
} catch(err) {
result = false;
}
await channel.close();
return result;
}

/**
*
* @private
Expand All @@ -86,6 +115,15 @@ class AMQPRPCServer extends AMQPEndpoint {
const persistent = msg.properties.deliveryMode !== 1;

try {
if (this._verifyReplyQueue) {
const replyQueueExists = await this._checkQueue(replyTo);
if (!replyQueueExists) {
//This means that the client has disposed the reply queue
//before we finished.
return;
}
}

const result = await this._dispatchCommand(msg);

const content = new CommandResult(CommandResult.STATES.SUCCESS, result).pack();
Expand Down
20 changes: 20 additions & 0 deletions test/integration/AMQPRPC.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,26 @@ describe('AMQPRPCClient to AMQPRPCServer', () => {
expect(commandStub).to.have.callCount(1);
expect(commandStub.getCall(0).args).to.deep.equal([]);
});
it('should not invoke the command if client disconnects', async () => {
server._verifyReplyQueue = true;
const commandStub = sinon.stub();
server.addCommand('command', commandStub);

//disconnect the client when the req reaches
//the server but before dispatching to the handler
server._handleMsg = (function(original) {
return async function(...args) {
await client.disconnect();
return await original.apply(server, args);
};
})(server._handleMsg);

await expect(client.sendCommand('command'))
.to.be.rejectedWith(Error)
.and.eventually.have.property('message', 'sendCommand canceled due to client disconnect, command:command, correlationId:0');

expect(commandStub).to.have.callCount(0);
});
it('should delete timedout requests from map', async () => {
server.addCommand('command', () => new Promise(sinon.stub()));
await expect(client.sendCommand('command')).to.be.rejectedWith(Error);
Expand Down