Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENTMQBR-2114] Duplicate messages when JMS bridge is stopped and restarted #277

Open
wants to merge 1 commit into
base: 2.6.3.jbossorg-x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -503,11 +503,13 @@ public void stop() throws Exception {
}
}

try {
sourceConn.close();
} catch (Exception ignore) {
if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source conn", ignore);
if (sourceConn != null) {
try {
sourceConn.close();
} catch (Exception ignore) {
if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source conn", ignore);
}
}
}

Expand All @@ -521,6 +523,14 @@ public void stop() throws Exception {
}
}

if (messages.size() > 0) {
// Clear outstanding messages so they don't get retransmitted and duplicated on the other side of the bridge
if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Clearing up messages before stopping...");
}
messages.clear();
}

if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Stopped " + this);
}
Expand Down Expand Up @@ -1191,11 +1201,13 @@ private boolean setupJMSObjects() {

private void cleanup() {
// Stop the source connection
try {
sourceConn.stop();
} catch (Throwable ignore) {
if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to stop source connection", ignore);
if (sourceConn != null) {
try {
sourceConn.stop();
} catch (Throwable ignore) {
if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to stop source connection", ignore);
}
}
}

Expand All @@ -1219,11 +1231,13 @@ private void cleanup() {
}

// Close the old objects
try {
sourceConn.close();
} catch (Throwable ignore) {
if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source connection", ignore);
if (sourceConn != null) {
try {
sourceConn.close();
} catch (Throwable ignore) {
if (JMSBridgeImpl.trace) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source connection", ignore);
}
}
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,11 @@ private LargeMessageDeliverer(final LargeServerMessage message, final MessageRef
this.ref = ref;
}

@Override
public String toString() {
return "ServerConsumerImpl$LargeMessageDeliverer[ref=[" + ref + "]]";
}

public boolean deliver() throws Exception {
lockDelivery.readLock().lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class LargeMessageOnShutdownTest extends ActiveMQTestBase {
private static final SimpleString queueName = new SimpleString("largeMessageShutdownQueue");
private static ActiveMQServer server;

@Override
@Before
public void setUp() throws Exception {
super.setUp();
Expand All @@ -54,6 +55,7 @@ public void setUp() throws Exception {
server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false);
}

@Override
@After
public void tearDown() throws Exception {
super.tearDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
Expand All @@ -35,9 +36,12 @@
import java.util.Map;

import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode;
import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl;
Expand Down Expand Up @@ -1766,10 +1770,187 @@ public void testMBeanServer() throws Exception {
Assert.assertFalse(mbeanServer.isRegistered(objectName));
}

@Test
public void testDuplicateMessagesWhenBridgeStops() throws Exception {
final int NUM_MESSAGES = 500;

JMSBridgeImpl bridge = new JMSBridgeImpl(cff0xa, cff1xa, sourceQueueFactory, targetQueueFactory, null, null,
null, null, null, 1000, 10,
QualityOfServiceMode.ONCE_AND_ONLY_ONCE, 10, 100, null, "ClientId123",
true)
.setBridgeName("test-bridge");
bridge.setTransactionManager(getNewTm());

final List<TextMessage> sentMessages = new ArrayList<>();
final List<TextMessage> receivedMessages = new ArrayList<>();

log.info("Starting bridge " + bridge);
bridge.start();
waitForComponent(bridge, 15);

Thread producerThread = new Thread(() -> {
Connection conn = null;
Session session = null;
int counter = 0;
try {
conn = cf0.createConnection();
session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(sourceQueue);
TextMessage msg = null;

while (counter < NUM_MESSAGES) {
msg = session.createTextMessage("message" + counter);
msg.setIntProperty("count", counter);
producer.send(msg);
sentMessages.add(msg);
log.info("Sent message with property counter: " + counter + ", messageId:" + msg.getJMSMessageID()
+ ((msg.getStringProperty("_AMQ_DUPL_ID") != null) ? ", _AMQ_DUPL_ID=" + msg.getStringProperty("_AMQ_DUPL_ID") : ""));
counter++;
Thread.sleep(200);
}

producer.close();
} catch (InterruptedException | JMSException e) {
log.error("Error while producing messages: ", e);
} finally {
try {
if (session != null) {
session.close();
}

if (conn != null) {
conn.close();
}
} catch (JMSException e) {
log.error("Error cleaning up the producer thread! ", e);
}
}
});

Thread consumerThread = new Thread(() -> {
Connection conn = null;
Session session = null;
try {
conn = cf1.createConnection();
conn.start();

session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);

MessageConsumer consumer = session.createConsumer(targetQueue);
TextMessage msg = null;

boolean running = true;
while (running) {
msg = (TextMessage) consumer.receive(5000);
if (msg != null) {
msg.acknowledge();
receivedMessages.add(msg);
log.info("Received message with messageId: " + msg.getJMSMessageID() +
" and property counter " + msg.getIntProperty("count"));
} else {
running = false;
}
}

} catch (JMSException e) {
log.error("Error while consuming messages: ", e);
} finally {
try {
if (session != null) {
session.close();
}

if (conn != null) {
conn.close();
}
} catch (JMSException e) {
log.error("Error cleaning up the consumer thread! ", e);
}
}
});

log.info("Starting producer thread...");
producerThread.start();

Assert.assertTrue(waitForMessages(server1, targetQueue.getQueueName(), NUM_MESSAGES / 100, 250000));

log.info("Stopping bridge " + bridge);
bridge.stop();
Thread.sleep(5000);

log.info("Starting bridge " + bridge + " again");
bridge.start();
waitForComponent(bridge, 15);

Assert.assertTrue(waitForMessages(server1, targetQueue.getQueueName(), NUM_MESSAGES, 300000));



log.info("Starting consumer thread...");
consumerThread.start();

log.info("Waiting for the consumer thread to die...");
consumerThread.join();

log.info("Waiting for the producer thread to die...");
producerThread.join();

bridge.stop();

server1.stop();
server0.stop();

Assert.assertEquals("Number of sent messages is different from received messages", sentMessages.size(), receivedMessages.size());
}

public TransactionManager getNewTm() {
return newTransactionManager();
}

private static long countMessagesInQueue(ActiveMQServer server, String queueName) {
QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + queueName);
Assert.assertNotNull(queueControl);
long count = -1;
int numberOfTries = 0;
int maxNumberOfTries = 10;
while (count == -1 && numberOfTries < maxNumberOfTries) {
try {
numberOfTries++;
count = queueControl.countMessages();
break;
} catch (Exception ex) {
if (numberOfTries > maxNumberOfTries - 1) {
throw new RuntimeException("countMessagesInQueue() failed for queue:" + queueName
+ " and server: " + server + ". Number of tries: " + numberOfTries, ex);
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
log.info("Number of messages in queue " + queueName + " on server: " + server + " is: " + count);
return count;
}

private static boolean waitForMessages(ActiveMQServer server, String queueName, long numberOfMessages, long timeout) throws Exception {

long startTime = System.currentTimeMillis();

long count = 0;
while ((count = countMessagesInQueue(server, queueName)) < numberOfMessages) {
log.info("Total number of messages in queue: " + queueName + " on server " + server + " is " + count);
Thread.sleep(5000);
if (System.currentTimeMillis() - startTime > timeout) {
log.warn(numberOfMessages + " not on server " + server + " in timeout " + timeout + "ms.");
return false;
}
}
return true;

}

// Inner classes -------------------------------------------------------------------

private static class StressSender implements Runnable {
Expand Down