Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
Added changes to handle exclusive queues and priority
Browse files Browse the repository at this point in the history
  • Loading branch information
nikosd23 committed Sep 4, 2015
1 parent 9546d04 commit 90f252f
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 10 deletions.
24 changes: 18 additions & 6 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,23 @@ function Queue(channel, name, options) {
this.replyName = null;
this.channel = channel;
this.durable = options.durable;
this.priority = options.priority;
this.exclusive = options.exclusive;
this.noAck = options.noAck;
this.tag = null;

var queueOptions = { durable: this.durable};

if (this.priority) {
queueOptions = _.extend({arguments: {'x-max-priority': this.priority}}, queueOptions);
}

if (this.exclusive) {
queueOptions = _.extend({exclusive: this.exclusive}, queueOptions);
}

this.channel
.assertQueue(name, {
durable: this.durable
})
.assertQueue(name, queueOptions)
.then(function createReplyQueue(info) {
this.emit('ready', info);
}.bind(this))
Expand Down Expand Up @@ -76,15 +86,17 @@ Queue.prototype.onMessage = function(msg) {

};

Queue.prototype.publish = function(obj, cb, replyHandler) {
Queue.prototype.publish = function(obj, headers, cb, replyHandler) {
var msg = JSON.stringify(obj);
var id = uuid.v4();
if (replyHandler) {
this.channel.replyHandlers[id] = replyHandler;
}
this.channel.sendToQueue(this.name, new Buffer(msg), {
var defaultHeaders = {
persistent: !replyHandler,
correlationId: id,
replyTo: replyHandler ? this.channel.replyName : undefined
}, cb);
};
headers = _.extend(defaultHeaders, headers);
this.channel.sendToQueue(this.name, new Buffer(msg), headers, cb);
};
12 changes: 10 additions & 2 deletions lib/wrabbit.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,16 @@ WRabbit.prototype.purge = function(name, done) {
function onFail(err) { done(err); }
};

WRabbit.prototype.publish = function(name, obj, cb, replyHandler) {
this.queues[name].publish(obj, cb, replyHandler);
WRabbit.prototype.publish = function(name, obj, headers, cb, replyHandler) {
if(typeof headers === 'function'){
replyHandler = cb;
cb = headers;
headers = {};
}
if (!headers) {
headers = {};
}
this.queues[name].publish(obj, headers, cb, replyHandler);
};

WRabbit.prototype.handle = function(name, handler) {
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "wrabbit",
"version": "0.0.1",
"version": "0.0.2",
"description": "Simple AMQP / RabbitMQ job queues for node extended from jackrabbit",
"keywords": [
"amqp",
Expand Down Expand Up @@ -33,6 +33,7 @@
"express": "^4.8.7",
"istanbul": "^0.3.15",
"mocha": "^1.21.4",
"sinon": "^1.16.1",
"supertest": "^0.13.0"
}
}
38 changes: 37 additions & 1 deletion test/rpc.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ var assert = require('chai').assert;
var jackrabbit = require('..');
var Queue = require('../lib/queue');
var util = require('./util');
var sinon = require('sinon')
, sandbox = sinon.sandbox.create();

describe('jackrabbit', function() {

describe('rpc', function() {

before(function connect(done) {
this.onMessageSpy = sandbox.spy(Queue.prototype, 'onMessage');
this.client = jackrabbit(util.RABBIT_URL, 1);
this.client.once('connected', done);
});
Expand All @@ -34,16 +37,49 @@ describe('jackrabbit', function() {

after(function cleanup(done) {
this.client.destroy(this.name, done);
sandbox.restore();
});

afterEach(function cleanupSandbox() {
this.onMessageSpy.reset();
});

it('handles an rpc response', function(done) {
this.client.publish(this.name, { a: 2, b: 3 }, undefined, function onResponse(err, response) {
this.client.publish(this.name, { a: 2, b: 3 }, function() {}, function onResponse(err, response) {
assert.ok(!err);
assert.equal(response, 5);
done();
});
});

it('should create exclusive queues', function(done) {
this.exclusiveName = util.NAME + '.rpc-add-exclusive';
this.client.create(this.exclusiveName, {exclusive: true}, done);
});

it('should create priority queues', function(done) {
this.priorityName = util.NAME + '.rpc-add-priority';
this.client.create(this.priorityName, {priority: 10}, done);
});

it('handles an rpc response with undefined headers', function(done) {
this.client.publish(this.name, { a: 2, b: 3 }, undefined, function() {}, function onResponse(err, response) {
assert.ok(!err);
assert.equal(response, 5);
done();
});
});

it('handles an rpc response with priority header', function(done) {
this.client.publish(this.name, { a: 2, b: 3 }, {priority: 5}, function() {}, function onResponse(err, response) {
assert.ok(!err);
assert.equal(response, 5);
assert.ok(this.onMessageSpy.calledOnce);
assert.equal(this.onMessageSpy.getCall(0).args[0].properties.priority, 5);
done();
}.bind(this));
});

it('handles an publish ack', function(done) {
this.client.publish(this.name, { a: 2, b: 3 }, function publishAck(err, ok) {
assert.ok(!err);
Expand Down

0 comments on commit 90f252f

Please sign in to comment.