From f96e8261aff6580d055127e772067fd933be515b Mon Sep 17 00:00:00 2001 From: Stephen Cresswell <229672+cressie176@users.noreply.github.com> Date: Mon, 27 May 2024 13:33:25 +0100 Subject: [PATCH] Fixes #237 --- CHANGELOG.md | 3 + README.md | 2 +- lib/amqp/SubscriberError.js | 6 +- lib/amqp/Subscription.js | 12 +- test/subscriptions.tests.js | 176 ++++++++++++++++++++++++++ test/subscriptionsAsPromised.tests.js | 160 +++++++++++++++++++++++ 6 files changed, 356 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index be648e1..691dd72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Change Log +## 20.1.0 +- Ignore and remove immediateNack header based on the xDeath header. See https://github.com/onebeyond/rascal/issues/237 + ## 20.0.0 - Replaced superagent with native node http client as per https://github.com/onebeyond/rascal/issues/234 diff --git a/README.md b/README.md index 3d75f61..7da0d01 100644 --- a/README.md +++ b/README.md @@ -1534,7 +1534,7 @@ As mentioned previously, dead lettering invalid messages is a good strategy with ackOrNack(err, { strategy: 'republish', immediateNack: true }); ``` -If you ever want to resend the message to the same queue you will have to remove the `properties.headers.rascal..immediateNack` header first. +Prior to Rascal v20.1.0, if you wanted to resend the message to the original queue you had to remove the `properties.headers.rascal..immediateNack` header first. From v20.1.0, Rascal will ignore and remove the immediateNack header if the message's xDeath header indicates that the message was dead lettered after it was republished with immediateNack. ##### Forward diff --git a/lib/amqp/SubscriberError.js b/lib/amqp/SubscriberError.js index 6d53b7b..99353f4 100644 --- a/lib/amqp/SubscriberError.js +++ b/lib/amqp/SubscriberError.js @@ -84,7 +84,11 @@ module.exports = function SubscriptionRecovery(broker, vhost) { _.set(publishOptions, 'headers.rascal.error.code', err.code); _.set(publishOptions, 'headers.rascal.restoreRoutingHeaders', _.has(strategyConfig, 'restoreRoutingHeaders') ? strategyConfig.restoreRoutingHeaders : true); - if (strategyConfig.immediateNack) _.set(publishOptions, ['headers', 'rascal', 'recovery', originalQueue, 'immediateNack'], true); + if (strategyConfig.immediateNack) { + const xDeathRecords = message.properties.headers['x-death'] || []; + const xDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || { count: 0 }; + _.set(publishOptions, ['headers', 'rascal', 'recovery', originalQueue], { immediateNack: true, xDeath }); + } const ackMessage = () => { session._ack(message, (err) => { diff --git a/lib/amqp/Subscription.js b/lib/amqp/Subscription.js index 03e6d73..969a9a0 100644 --- a/lib/amqp/Subscription.js +++ b/lib/amqp/Subscription.js @@ -208,7 +208,17 @@ function Subscription(broker, vhost, subscriptionConfig, counter) { } function immediateNack(message) { - if (_.get(message, ['properties', 'headers', 'rascal', 'recovery', message.properties.headers.rascal.originalQueue, 'immediateNack'])) return true; + const originalQueue = message.properties.headers.rascal.originalQueue; + const xDeathRecords = message.properties.headers['x-death'] || []; + const currentXDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || { count: 0 }; + const previousXDeath = _.get(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath'], { count: 0 }); + const hasImmediateNackHeader = _.has(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'immediateNack']); + if (!hasImmediateNackHeader) return false; + debug('Message %s has been marked for immediate nack', message.properties.messageId); + if (currentXDeath.count === previousXDeath.count) return true; + debug('Message %s has been replayed after being dead lettered. Removing immediate nack.', message.properties.messageId); + _.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'immediateNack']); + _.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath']); return false; } diff --git a/test/subscriptions.tests.js b/test/subscriptions.tests.js index 25789ce..80facd6 100644 --- a/test/subscriptions.tests.js +++ b/test/subscriptions.tests.js @@ -1588,6 +1588,182 @@ describe('Subscriptions', () => { ); }); + it('should ignore immediate nack when messages are replayed to the original queue from a dead letter queue', (test, done) => { + createBroker( + { + vhosts: { + '/': { + namespace, + exchanges: { + e1: { + assert: true, + }, + e2: { + assert: true, + }, + }, + queues: { + q1: { + assert: true, + options: { + arguments: { + 'x-dead-letter-exchange': 'e2', + }, + }, + }, + q2: { + assert: true, + }, + }, + bindings: { + b1: { + source: 'e1', + destination: 'q1', + }, + b2: { + source: 'e2', + destination: 'q2', + }, + }, + }, + }, + publications: _.pick(publications, 'p1'), + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + }, + s2: { + vhost: '/', + queue: 'q2', + }, + }, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { + assert.ifError(err); + + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + let count = 0; + subscription.on('message', (message, content, ackOrNack) => { + count++; + if (count === 1) { + assert.ok(message); + ackOrNack(new Error('immediate nack'), { + strategy: 'republish', + immediateNack: true, + }); + } else { + assert.strictEqual(count, 2); + assert.ok(message); + ackOrNack(); + done(); + } + }); + }); + + broker.subscribe('s2', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + ackOrNack(); + broker.forward('p1', message, () => {}); + }); + }); + }); + }, + ); + }); + + it('should ignore immediate nack when messages are replayed to the original queue from a dead letter queue repeatedly', (test, done) => { + createBroker( + { + vhosts: { + '/': { + namespace, + exchanges: { + e1: { + assert: true, + }, + e2: { + assert: true, + }, + }, + queues: { + q1: { + assert: true, + options: { + arguments: { + 'x-dead-letter-exchange': 'e2', + }, + }, + }, + q2: { + assert: true, + }, + }, + bindings: { + b1: { + source: 'e1', + destination: 'q1', + }, + b2: { + source: 'e2', + destination: 'q2', + }, + }, + }, + }, + publications: _.pick(publications, 'p1'), + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + }, + s2: { + vhost: '/', + queue: 'q2', + }, + }, + }, + (err, broker) => { + assert.ifError(err); + broker.publish('p1', 'test message', (err) => { + assert.ifError(err); + + broker.subscribe('s1', (err, subscription) => { + assert.ifError(err); + let count = 0; + subscription.on('message', (message, content, ackOrNack) => { + count++; + if (count <= 2) { + assert.ok(message); + ackOrNack(new Error('immediate nack'), { + strategy: 'republish', + immediateNack: true, + }); + } else { + assert.strictEqual(count, 3); + assert.ok(message); + ackOrNack(); + done(); + } + }); + }); + + broker.subscribe('s2', (err, subscription) => { + assert.ifError(err); + subscription.on('message', (message, content, ackOrNack) => { + ackOrNack(); + broker.forward('p1', message, () => {}); + }); + }); + }); + }, + ); + }); + it('should forward messages to publication when requested', (test, done) => { createBroker( { diff --git a/test/subscriptionsAsPromised.tests.js b/test/subscriptionsAsPromised.tests.js index 7cbaf28..19b896f 100644 --- a/test/subscriptionsAsPromised.tests.js +++ b/test/subscriptionsAsPromised.tests.js @@ -1112,6 +1112,166 @@ describe( }); }); + it('should ignore immediate nack when messages are replayed to the original queue from a dead letter queue', (test, done) => { + createBroker({ + vhosts: { + '/': { + namespace, + exchanges: { + e1: { + assert: true, + }, + e2: { + assert: true, + }, + }, + queues: { + q1: { + assert: true, + options: { + arguments: { + 'x-dead-letter-exchange': 'e2', + }, + }, + }, + q2: { + assert: true, + }, + }, + bindings: { + b1: { + source: 'e1', + destination: 'q1', + }, + b2: { + source: 'e2', + destination: 'q2', + }, + }, + }, + }, + publications: _.pick(publications, 'p1'), + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + }, + s2: { + vhost: '/', + queue: 'q2', + }, + }, + }).then((broker) => { + broker.publish('p1', 'test message').then(() => { + broker.subscribe('s1').then((subscription) => { + let count = 0; + subscription.on('message', (message, content, ackOrNack) => { + count++; + if (count === 1) { + assert.ok(message); + ackOrNack(new Error('immediate nack'), { + strategy: 'republish', + immediateNack: true, + }); + } else { + assert.strictEqual(count, 2); + assert.ok(message); + ackOrNack(); + done(); + } + }); + }); + + broker.subscribe('s2').then((subscription) => { + subscription.on('message', (message, content, ackOrNack) => { + ackOrNack(); + broker.forward('p1', message, () => {}); + }); + }); + }); + }); + }); + + it('should ignore immediate nack when messages are replayed to the original queue from a dead letter queue repeatedly', (test, done) => { + createBroker({ + vhosts: { + '/': { + namespace, + exchanges: { + e1: { + assert: true, + }, + e2: { + assert: true, + }, + }, + queues: { + q1: { + assert: true, + options: { + arguments: { + 'x-dead-letter-exchange': 'e2', + }, + }, + }, + q2: { + assert: true, + }, + }, + bindings: { + b1: { + source: 'e1', + destination: 'q1', + }, + b2: { + source: 'e2', + destination: 'q2', + }, + }, + }, + }, + publications: _.pick(publications, 'p1'), + subscriptions: { + s1: { + vhost: '/', + queue: 'q1', + }, + s2: { + vhost: '/', + queue: 'q2', + }, + }, + }).then((broker) => { + broker.publish('p1', 'test message').then(() => { + broker.subscribe('s1').then((subscription) => { + let count = 0; + subscription.on('message', (message, content, ackOrNack) => { + count++; + if (count <= 2) { + assert.ok(message); + ackOrNack(new Error('immediate nack'), { + strategy: 'republish', + immediateNack: true, + }); + } else { + assert.strictEqual(count, 3); + assert.ok(message); + ackOrNack(); + done(); + } + }); + }); + + broker.subscribe('s2').then((subscription) => { + subscription.on('message', (message, content, ackOrNack) => { + ackOrNack(); + broker.forward('p1', message, () => {}); + }); + }); + }); + }); + }); + it('should forward messages to publication when requested', (test, done) => { createBroker({ vhosts,