diff --git a/lib/queue.js b/lib/queue.js index 044833fc..b94d6133 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -12,13 +12,23 @@ function Queue(channel, name, options) { this.replyName = null; this.channel = channel; this.durable = options.durable; + this.priority = options.priority; + this.exclusive = options.exclusive; this.noAck = options.noAck; this.tag = null; + var queueOptions = { durable: this.durable}; + + if (this.priority) { + queueOptions = _.extend({arguments: {'x-max-priority': this.priority}}, queueOptions); + } + + if (this.exclusive) { + queueOptions = _.extend({exclusive: this.exclusive}, queueOptions); + } + this.channel - .assertQueue(name, { - durable: this.durable - }) + .assertQueue(name, queueOptions) .then(function createReplyQueue(info) { this.emit('ready', info); }.bind(this)) @@ -76,15 +86,17 @@ Queue.prototype.onMessage = function(msg) { }; -Queue.prototype.publish = function(obj, cb, replyHandler) { +Queue.prototype.publish = function(obj, headers, cb, replyHandler) { var msg = JSON.stringify(obj); var id = uuid.v4(); if (replyHandler) { this.channel.replyHandlers[id] = replyHandler; } - this.channel.sendToQueue(this.name, new Buffer(msg), { + var defaultHeaders = { persistent: !replyHandler, correlationId: id, replyTo: replyHandler ? this.channel.replyName : undefined - }, cb); + }; + headers = _.extend(defaultHeaders, headers); + this.channel.sendToQueue(this.name, new Buffer(msg), headers, cb); }; diff --git a/lib/wrabbit.js b/lib/wrabbit.js index 4e27198a..3d7e3fe8 100644 --- a/lib/wrabbit.js +++ b/lib/wrabbit.js @@ -121,8 +121,16 @@ WRabbit.prototype.purge = function(name, done) { function onFail(err) { done(err); } }; -WRabbit.prototype.publish = function(name, obj, cb, replyHandler) { - this.queues[name].publish(obj, cb, replyHandler); +WRabbit.prototype.publish = function(name, obj, headers, cb, replyHandler) { + if(typeof headers === 'function'){ + replyHandler = cb; + cb = headers; + headers = {}; + } + if (!headers) { + headers = {}; + } + this.queues[name].publish(obj, headers, cb, replyHandler); }; WRabbit.prototype.handle = function(name, handler) { diff --git a/package.json b/package.json index 6e995dcf..305d6423 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "wrabbit", - "version": "0.0.1", + "version": "0.0.2", "description": "Simple AMQP / RabbitMQ job queues for node extended from jackrabbit", "keywords": [ "amqp", @@ -33,6 +33,7 @@ "express": "^4.8.7", "istanbul": "^0.3.15", "mocha": "^1.21.4", + "sinon": "^1.16.1", "supertest": "^0.13.0" } } diff --git a/test/rpc.test.js b/test/rpc.test.js index 1a694894..8cb298d0 100644 --- a/test/rpc.test.js +++ b/test/rpc.test.js @@ -2,12 +2,15 @@ var assert = require('chai').assert; var jackrabbit = require('..'); var Queue = require('../lib/queue'); var util = require('./util'); +var sinon = require('sinon') + , sandbox = sinon.sandbox.create(); describe('jackrabbit', function() { describe('rpc', function() { before(function connect(done) { + this.onMessageSpy = sandbox.spy(Queue.prototype, 'onMessage'); this.client = jackrabbit(util.RABBIT_URL, 1); this.client.once('connected', done); }); @@ -34,16 +37,49 @@ describe('jackrabbit', function() { after(function cleanup(done) { this.client.destroy(this.name, done); + sandbox.restore(); + }); + + afterEach(function cleanupSandbox() { + this.onMessageSpy.reset(); }); it('handles an rpc response', function(done) { - this.client.publish(this.name, { a: 2, b: 3 }, undefined, function onResponse(err, response) { + this.client.publish(this.name, { a: 2, b: 3 }, function() {}, function onResponse(err, response) { assert.ok(!err); assert.equal(response, 5); done(); }); }); + it('should create exclusive queues', function(done) { + this.exclusiveName = util.NAME + '.rpc-add-exclusive'; + this.client.create(this.exclusiveName, {exclusive: true}, done); + }); + + it('should create priority queues', function(done) { + this.priorityName = util.NAME + '.rpc-add-priority'; + this.client.create(this.priorityName, {priority: 10}, done); + }); + + it('handles an rpc response with undefined headers', function(done) { + this.client.publish(this.name, { a: 2, b: 3 }, undefined, function() {}, function onResponse(err, response) { + assert.ok(!err); + assert.equal(response, 5); + done(); + }); + }); + + it('handles an rpc response with priority header', function(done) { + this.client.publish(this.name, { a: 2, b: 3 }, {priority: 5}, function() {}, function onResponse(err, response) { + assert.ok(!err); + assert.equal(response, 5); + assert.ok(this.onMessageSpy.calledOnce); + assert.equal(this.onMessageSpy.getCall(0).args[0].properties.priority, 5); + done(); + }.bind(this)); + }); + it('handles an publish ack', function(done) { this.client.publish(this.name, { a: 2, b: 3 }, function publishAck(err, ok) { assert.ok(!err);