diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index b93a1280bd5..17f42748cdc 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -758,6 +758,12 @@ public int sendMessage(MessageReference ref, ServerConsumer consumer, int delive } + @Override + public boolean filterRef(MessageReference ref, ServerConsumer consumer) { + ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext(); + return plugSender.filterRef(ref); + } + @Override public int sendLargeMessage(MessageReference ref, ServerConsumer consumer, diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index 2c07e1f5534..d97af8b8386 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.protocol.amqp.connect; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -306,10 +307,10 @@ public void validateMatching(Queue queue, AMQPBrokerConnectionElement connection public void createLink(Queue queue, AMQPBrokerConnectionElement connectionElement) { if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) { Symbol[] dispatchCapability = new Symbol[]{AMQPMirrorControllerSource.QPID_DISPATCH_WAYPOINT_CAPABILITY}; - connectSender(queue, queue.getAddress().toString(), null, null, null, null, dispatchCapability, null); + connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, dispatchCapability, null); connectReceiver(protonRemotingConnection, session, sessionContext, queue, dispatchCapability); } else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) { - connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null); + connectSender(queue, queue.getAddress().toString(), null, null, null, null, null, null, null); } else if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) { connectReceiver(protonRemotingConnection, session, sessionContext, queue); } @@ -450,21 +451,25 @@ private void doConnect() { final Queue queue = server.locateQueue(getMirrorSNF(replica)); final boolean coreTunnelingEnabled = isCoreMessageTunnelingEnabled(replica); - final Symbol[] desiredCapabilities; + ArrayList desiredCapabilitiesList = new ArrayList<>(); + desiredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY); if (coreTunnelingEnabled) { - desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, - AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT}; - } else { - desiredCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY}; + desiredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT); + } + if (replica.isNoForward()) { + desiredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD); } - final Symbol[] requiredOfferedCapabilities = new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY}; + final Symbol[] desiredCapabilities = (Symbol[]) desiredCapabilitiesList.toArray(new Symbol[]{}); + + final Symbol[] requiredOfferedCapabilities = replica.isNoForward() ? new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, AMQPMirrorControllerSource.NO_FORWARD} : new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY}; connectSender(queue, queue.getName().toString(), mirrorControllerSource::setLink, (r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)), + (r) -> mirrorControllerSource.shouldFilterRef(r), server.getNodeID().toString(), desiredCapabilities, null, @@ -771,6 +776,7 @@ private void connectSender(Queue queue, String targetName, java.util.function.Consumer senderConsumer, java.util.function.Consumer beforeDeliver, + java.util.function.Predicate shouldFilterRef, String brokerID, Symbol[] desiredCapabilities, Symbol[] targetCapabilities, @@ -831,7 +837,7 @@ private void connectSender(Queue queue, // Using attachments to set up a Runnable that will be executed inside AMQPBrokerConnection::remoteLinkOpened sender.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> { - ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver); + ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setShouldFilterRef(shouldFilterRef); try { if (!cancelled.get()) { if (futureTimeout != null) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index 0d31363c6f1..dae23699a7f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -70,6 +70,8 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im public static final Symbol QUEUE = Symbol.getSymbol("x-opt-amq-mr-qu"); public static final Symbol BROKER_ID = Symbol.getSymbol("x-opt-amq-bkr-id"); public static final SimpleString BROKER_ID_SIMPLE_STRING = SimpleString.of(BROKER_ID.toString()); + public static final SimpleString NO_FORWARD_SOURCE = SimpleString.of("x-opt-amq-mr-no-fwd-src"); + public static final SimpleString RECEIVER_ID_FILTER = SimpleString.of("x-opt-amq-mr-rcv-id-filter"); // Events: public static final Symbol ADD_ADDRESS = Symbol.getSymbol("addAddress"); @@ -89,9 +91,11 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im // Capabilities public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror"); public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint"); + public static final Symbol NO_FORWARD = Symbol.getSymbol("amq.no.forward"); public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.of(INTERNAL_ID.toString()); public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.of(BROKER_ID.toString()); + public static final SimpleString INTERNAL_NO_FORWARD = SimpleString.of(NO_FORWARD.toString()); private static final ThreadLocal mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null)); @@ -230,12 +234,17 @@ public void addAddress(AddressInfo addressInfo) throws Exception { public void deleteAddress(AddressInfo addressInfo) throws Exception { logger.trace("{} deleteAddress {}", server, addressInfo); + if (isBlockedByNoForward()) { + return; + } + if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) { return; } if (ignoreAddress(addressInfo.getName())) { return; } + if (deleteQueues) { Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON()); routeMirrorCommand(server, message); @@ -246,6 +255,10 @@ public void deleteAddress(AddressInfo addressInfo) throws Exception { public void createQueue(QueueConfiguration queueConfiguration) throws Exception { logger.trace("{} createQueue {}", server, queueConfiguration); + if (isBlockedByNoForward()) { + return; + } + if (invalidTarget(getControllerInUse()) || queueConfiguration.isInternal()) { if (logger.isTraceEnabled()) { logger.trace("Rejecting ping pong on create {} as isInternal={} and mirror target = {}", queueConfiguration, queueConfiguration.isInternal(), getControllerInUse()); @@ -264,6 +277,7 @@ public void createQueue(QueueConfiguration queueConfiguration) throws Exception } return; } + if (addQueues) { Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON()); routeMirrorCommand(server, message); @@ -276,6 +290,10 @@ public void deleteQueue(SimpleString address, SimpleString queue) throws Excepti logger.trace("{} deleteQueue {}/{}", server, address, queue); } + if (isBlockedByNoForward()) { + return; + } + if (invalidTarget(getControllerInUse())) { return; } @@ -310,6 +328,14 @@ private boolean invalidTarget(MirrorController controller) { return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId()); } + private boolean isBlockedByNoForward() { + return getControllerInUse() != null && getControllerInUse().isNoForward(); + } + + private boolean isBlockedByNoForward(Message message) { + return isBlockedByNoForward() || Boolean.TRUE.equals(message.getBrokerProperty(INTERNAL_NO_FORWARD)); + } + private boolean ignoreAddress(SimpleString address) { if (address.startsWith(server.getConfiguration().getManagementAddress())) { return true; @@ -338,6 +364,11 @@ Message copyMessageForPaging(Message message) { public void sendMessage(Transaction tx, Message message, RoutingContext context) { SimpleString address = context.getAddress(message); + if (isBlockedByNoForward(message)) { + logger.trace("sendMessage::server {} is discarding the send because its source is setting a noForward policy", server); + return; + } + if (context.isInternal()) { logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server); return; @@ -353,6 +384,8 @@ public void sendMessage(Transaction tx, Message message, RoutingContext context) return; } + logger.trace("sendMessage::{} send message {}", server, message); + try { context.setReusable(false); @@ -467,6 +500,29 @@ public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier, } } + /** + * Checks if the message ref should be filtered or not. + * @param ref the message to check + * @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value + * than the remoteMirrorID, false otherwise. + */ + public boolean shouldFilterRef(MessageReference ref) { + Object filterID = ref.getMessage().getAnnotation(RECEIVER_ID_FILTER); + if (filterID != null) { + String remoteMirrorId = getRemoteMirrorId(); + if (remoteMirrorId != null) { + if (remoteMirrorId.equals(filterID)) { + return false; + } else { + logger.trace("filtering message {} as remote mirror ID {} diverges from the wanted receiver {}", ref, remoteMirrorId, filterID); + return true; + } + } + return false; + } + return false; + } + /** This method will return the brokerID used by the message */ private static String setProtocolData(ReferenceIDSupplier referenceIDSupplier, MessageReference ref) { String brokerID = referenceIDSupplier.getServerID(ref); @@ -543,6 +599,21 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref, reason); } + String noForwardSource = null; + String remoteMirrorId = getRemoteMirrorId(); + // The remote mirror ID might not be known at this point because the remote mirror hasn't been connected yet or connection was lost. + // However, if the remote mirror ID is known there's a possibility to early check if the acknowledgment is supposed to reach the destination + // based on the noForward policy of the message about to be acked. + if (Boolean.TRUE.equals(ref.getMessage().getBooleanProperty(INTERNAL_NO_FORWARD))) { + noForwardSource = String.valueOf(ref.getMessage().getBrokerProperty(NO_FORWARD_SOURCE)); + if (remoteMirrorId != null && !remoteMirrorId.equals(noForwardSource)) { + if (logger.isInfoEnabled()) { + logger.trace("Due to the noForward policy in place, no Ack for the ref={} should reach the remote mirror ID", ref, remoteMirrorId); + } + return; + } + } + MirrorController controllerInUse = getControllerInUse(); // Retried ACKs are not forwarded. @@ -578,6 +649,13 @@ public void preAcknowledge(final Transaction tx, final MessageReference ref, fin String nodeID = idSupplier.getServerID(ref); // notice the brokerID will be null for any message generated on this broker. long internalID = idSupplier.getID(ref); Message messageCommand = createMessage(ref.getQueue().getAddress(), ref.getQueue().getName(), POST_ACK, nodeID, internalID, reason); + + // When the remote mirror ID couldn't be known in advance, the ack is annotated with the ID it is supposed to reach. This value + // will be used to filter out acks that do violate the configured noForward policy. + if (remoteMirrorId == null && noForwardSource != null) { + messageCommand.setBrokerProperty(RECEIVER_ID_FILTER, noForwardSource); + } + if (sync) { OperationContext operationContext; operationContext = OperationContextImpl.getContext(server.getExecutorFactory()); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index 683e226cc0a..2a05ba27d07 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -39,7 +39,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; -import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; @@ -53,6 +53,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMessageReader; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageReader; +import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver; import org.apache.activemq.artemis.utils.ByteUtil; @@ -77,8 +78,11 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.DELETE_QUEUE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.EVENT_TYPE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_NO_FORWARD; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.NO_FORWARD_SOURCE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; @@ -86,20 +90,27 @@ import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT; import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT; -public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController { +public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements TargetMirrorController { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final ThreadLocal CONTROLLER_THREAD_LOCAL = new ThreadLocal<>(); + private static final ThreadLocal CONTROLLER_THREAD_LOCAL = new ThreadLocal<>(); - public static void setControllerInUse(MirrorController controller) { + public static void setControllerInUse(TargetMirrorController controller) { CONTROLLER_THREAD_LOCAL.set(controller); } - public static MirrorController getControllerInUse() { + public static TargetMirrorController getControllerInUse() { return CONTROLLER_THREAD_LOCAL.get(); } + private boolean noForwarding = false; + + @Override + public boolean isNoForward() { + return noForwarding; + } + /** * Objects of this class can be used by either transaction or by OperationContext. * It is important that when you're using the transactions you clear any references to @@ -248,6 +259,7 @@ public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI, this.configuration = server.getConfiguration(); this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier(); mirrorContext = protonSession.getSessionSPI().getSessionContext(); + this.noForwarding = AmqpSupport.verifyDesiredCapability(receiver, NO_FORWARD); } @Override @@ -534,6 +546,10 @@ private boolean sendMessage(Message message, DeliveryAnnotations deliveryAnnotat message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID); message.setBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY, internalMirrorID); + if (noForwarding) { + message.setBrokerProperty(INTERNAL_NO_FORWARD, true); + message.setBrokerProperty(NO_FORWARD_SOURCE, getRemoteMirrorId()); + } if (internalAddress != null) { message.setAddress(internalAddress); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java index 0ef1a9b6497..b5138420e4d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java @@ -54,7 +54,7 @@ import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger; @@ -237,7 +237,7 @@ private boolean isEmpty(LongObjectHashMap> acksToRetry) { - MirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse(); + TargetMirrorController previousController = AMQPMirrorControllerTarget.getControllerInUse(); logger.trace("retrying address {} on server {}", address, server); try { AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController); @@ -518,7 +518,7 @@ private void deliveryAsync(JournalHashMap map) { - private static class DisabledAckMirrorController implements MirrorController { + private static class DisabledAckMirrorController implements TargetMirrorController { @Override public boolean isRetryACK() { @@ -564,5 +564,10 @@ public void preAcknowledge(Transaction tx, MessageReference ref, AckReason reaso public String getRemoteMirrorId() { return null; } + + @Override + public boolean isNoForward() { + return false; + } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java index 114cf9ad6f8..d825beb648c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java @@ -20,7 +20,7 @@ import org.apache.activemq.artemis.core.io.OperationConsistencyLevel; import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.mirror.TargetMirrorController; import org.apache.activemq.artemis.core.transaction.TransactionOperation; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.slf4j.Logger; @@ -34,7 +34,7 @@ public class MirrorTransaction extends TransactionImpl { boolean allowPageTransaction; - MirrorController controlInUse; + TargetMirrorController controlInUse; public MirrorTransaction(StorageManager storageManager) { super(storageManager); @@ -44,7 +44,7 @@ public MirrorTransaction(StorageManager storageManager) { @Override protected synchronized void afterCommit(List operationsToComplete) { - MirrorController beforeController = AMQPMirrorControllerTarget.getControllerInUse(); + TargetMirrorController beforeController = AMQPMirrorControllerTarget.getControllerInUse(); AMQPMirrorControllerTarget.setControllerInUse(controlInUse); try { super.afterCommit(operationsToComplete); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 17b54cf15b2..a7a93cd2f94 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton; import java.net.URI; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -458,14 +459,19 @@ private void handleReplicaTargetLinkOpened(AMQPSessionContext protonSession, Rec return; } + ArrayList offeredCapabilitiesList = new ArrayList<>(); + offeredCapabilitiesList.add(AMQPMirrorControllerSource.MIRROR_CAPABILITY); // We need to check if the remote desires to send us tunneled core messages or not, and if // we support that we need to offer that back so it knows it can actually do core tunneling. if (verifyDesiredCapability(receiver, AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT)) { - receiver.setOfferedCapabilities(new Symbol[] {AMQPMirrorControllerSource.MIRROR_CAPABILITY, - AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT}); - } else { - receiver.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY}); + offeredCapabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT); + } + + // If the remote wants us to not forward any messages to other mirrors we need to offer that capability + if (verifyDesiredCapability(receiver, AMQPMirrorControllerSource.NO_FORWARD)) { + offeredCapabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD); } + receiver.setOfferedCapabilities((Symbol[]) offeredCapabilitiesList.toArray(new Symbol[]{})); protonSession.addReplicaTarget(receiver); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index ad0d42c20e2..767c67cf6eb 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -89,6 +89,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private int credits = 0; private AtomicInteger pending = new AtomicInteger(0); private java.util.function.Consumer beforeDelivery; + private java.util.function.Predicate shouldFilterRef; protected volatile Runnable afterDelivery; protected volatile MessageWriter messageWriter = SenderController.REJECTING_MESSAGE_WRITER; @@ -120,6 +121,11 @@ public ProtonServerSenderContext setBeforeDelivery(java.util.function.Consumer shouldFilterRef) { + this.shouldFilterRef = shouldFilterRef; + return this; + } + public ServerConsumer getBrokerConsumer() { return brokerConsumer; } @@ -441,6 +447,13 @@ private boolean handleExtendedDeliveryOutcomes(Message message, Delivery deliver return handled; } + public boolean filterRef(MessageReference ref) { + if (shouldFilterRef != null) { + return shouldFilterRef.test(ref); + } + return false; + } + private final class ConnectionFlushIOCallback implements IOCallback { @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java index 944099c2a00..2aa259721ba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java @@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme boolean queueCreation = true; + boolean noForward = false; + boolean queueRemoval = true; boolean messageAcknowledgements = true; @@ -75,6 +77,15 @@ public AMQPMirrorBrokerConnectionElement setQueueCreation(boolean queueCreation) return this; } + public boolean isNoForward() { + return noForward; + } + + public AMQPMirrorBrokerConnectionElement setNoForward(boolean noForward) { + this.noForward = noForward; + return this; + } + public boolean isQueueRemoval() { return queueRemoval; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 514f77a1d5a..11aae017be9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -2198,10 +2198,11 @@ private void parseAMQPBrokerConnections(final Element e, boolean durable = getBooleanAttribute(e2, "durable", true); boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", true); boolean sync = getBooleanAttribute(e2, "sync", false); + boolean noForward = !getBooleanAttribute(e2, "no-forward", false); String addressFilter = getAttributeValue(e2, "address-filter"); AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = new AMQPMirrorBrokerConnectionElement(); - amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync); + amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter).setSync(sync).setNoForward(noForward); connectionElement = amqpMirrorConnectionElement; connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index bab579a5c0b..a470bc635fb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -428,6 +428,12 @@ public HandleStatus handle(final MessageReference ref) throws Exception { return HandleStatus.NO_MATCH; } + if (callback != null && callback.filterRef(ref, ServerConsumerImpl.this)) { + if (logger.isDebugEnabled()) { + logger.trace("Reference {} is not allowed to be consumed by {} due to message filtering.", ref, this); + } + return HandleStatus.NO_MATCH; + } synchronized (lock) { // If the consumer is stopped then we don't accept the message, it diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/TargetMirrorController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/TargetMirrorController.java new file mode 100644 index 00000000000..bbddf19571c --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/TargetMirrorController.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.mirror; + +public interface TargetMirrorController extends MirrorController { + boolean isNoForward(); +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index e20f01277f2..7b45d03140a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -98,4 +98,8 @@ default void close(boolean failed) { default Transaction getCurrentTransaction() { return null; } + + default boolean filterRef(MessageReference ref, ServerConsumer consumer) { + return false; + } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index db1e93eba04..7cfb45f7ef7 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2306,6 +2306,14 @@ + + + + If this is true, the mirror at the opposite end of the link will not forward messages or instructions coming from this broker to any other mirrors down the line. + This is false by default. + + + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java index 9796fe00417..b594f92c711 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java @@ -27,6 +27,7 @@ import java.lang.invoke.MethodHandles; import java.net.URI; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -349,20 +350,41 @@ public void testProducerMessageIsMirroredWithoutCoreTunnelingUsesDefaultMessageF doTestProducerMessageIsMirroredWithCorrectMessageFormat(false); } + @Test + @Timeout(20) + public void testProducerMessageIsMirroredWithNoForwardAndTunneling() throws Exception { + doTestProducerMessageIsMirroredWithCorrectMessageFormat(true, true); + } + + @Test + @Timeout(20) + public void testProducerMessageIsMirroredWithNoForwardAndWithoutTunneling() throws Exception { + doTestProducerMessageIsMirroredWithCorrectMessageFormat(false, true); + } + private void doTestProducerMessageIsMirroredWithCorrectMessageFormat(boolean tunneling) throws Exception { + doTestProducerMessageIsMirroredWithCorrectMessageFormat(tunneling, false); + } + + private void doTestProducerMessageIsMirroredWithCorrectMessageFormat(boolean tunneling, boolean noForward) throws Exception { final Map brokerProperties = new HashMap<>(); brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); final String[] capabilities; + ArrayList capabilitiesList = new ArrayList<>(); final int messageFormat; + capabilitiesList.add("amq.mirror"); if (tunneling) { - capabilities = new String[] {"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}; + capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT; } else { - capabilities = new String[] {"amq.mirror"}; messageFormat = 0; // AMQP default } + if (noForward) { + capabilitiesList.add(AMQPMirrorControllerSource.NO_FORWARD.toString()); + } + capabilities = capabilitiesList.toArray(new String[]{}); try (ProtonTestServer peer = new ProtonTestServer()) { peer.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS"); @@ -387,6 +409,7 @@ private void doTestProducerMessageIsMirroredWithCorrectMessageFormat(boolean tun AMQPMirrorBrokerConnectionElement mirrorElement = new AMQPMirrorBrokerConnectionElement(); mirrorElement.addProperty(TUNNEL_CORE_MESSAGES, Boolean.toString(tunneling)); mirrorElement.setQueueCreation(true); + mirrorElement.setNoForward(noForward); AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPNoForwardMirroringTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPNoForwardMirroringTest.java new file mode 100644 index 00000000000..9879fb9f74d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPNoForwardMirroringTest.java @@ -0,0 +1,751 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import java.lang.invoke.MethodHandles; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource; +import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.qpid.protonj2.test.driver.ProtonTestServer; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.CONNECTION_FORCED; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TUNNEL_CORE_MESSAGES; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class AMQPNoForwardMirroringTest extends AmqpClientTestSupport { + + protected static final int AMQP_PORT_2 = 5673; + protected static final int AMQP_PORT_3 = 5674; + protected static final int AMQP_PORT_4 = 5675; + + ActiveMQServer server_2; + ActiveMQServer server_3; + ActiveMQServer server_4; + + @Override + protected ActiveMQServer createServer() throws Exception { + return createServer(AMQP_PORT, false); + } + + @Override + protected String getConfiguredProtocols() { + return "AMQP,CORE,OPENWIRE"; + } + + @Test + public void testNoForward() throws Exception { + server_2 = createServer(AMQP_PORT_2, false); + server_3 = createServer(AMQP_PORT_3, false); + + // name the servers, for convenience during debugging + server.getConfiguration().setName("1"); + server_2.getConfiguration().setName("2"); + server_3.getConfiguration().setName("3"); + + /** + * Topology of the test: + * + * 1 -(noForward)-> 2 -> 3 + * ^ | + * |________________| + */ + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100).setReconnectAttempts(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100).setReconnectAttempts(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100).setReconnectAttempts(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + server.start(); + server_2.start(); + server_3.start(); + + createAddressAndQueues(server); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + // queue creation doesn't reach 3 b/c of the noForward link between 1 and 2. + Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) == null); + + Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + Queue q2 = server_2.locateQueue(getQueueName()); + assertNotNull(q2); + + ConnectionFactory factory = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT); + ConnectionFactory factory2 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_2); + ConnectionFactory factory3 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_3); + + // send from 1, 2 receives, 3 don't. + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + Wait.assertEquals(10L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q2::getMessageCount, 1000, 100); + + // consume from 2, 1 and 2 counters go back to 0 + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + } + + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + + Thread.sleep(100); // some time to allow eventual loops + + // queue creation was originated on server, with noForward in place, + // the messages never reached server_3, for the rest of the test suite, + // we need server_3 to have access to the queue + createAddressAndQueues(server_3); + Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) != null); + Queue q3 = server_3.locateQueue(getQueueName()); + assertNotNull(q3); + + // produce on 2. 1, 2 and 3 receive messages. + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + + Wait.assertEquals(10L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q3::getMessageCount, 1000, 100); + + // consume on 1. 1, 2, and 3 counters are back to 0 + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + + // produce on 2. 1, 2 and 3 receive messages. + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + + Wait.assertEquals(10L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q3::getMessageCount, 1000, 100); + + // consume on 3. 1, 2 counters are still at 10 and 3 is at 0. + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + Wait.assertEquals(10L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + + // consume on 2. 1, 2 and 3 counters are back to 0 + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + + // produce on 3. only 3 has messages. + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q3::getMessageCount, 1000, 100); + + // consume on 3. 1, 2, and 3 counters are back to 0 + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + } + + @Test + public void testSquare() throws Exception { + server_2 = createServer(AMQP_PORT_2, false); + server_3 = createServer(AMQP_PORT_3, false); + server_4 = createServer(AMQP_PORT_4, false); + + // name the servers, for convenience during debugging + server.getConfiguration().setName("1"); + server_2.getConfiguration().setName("2"); + server_3.getConfiguration().setName("3"); + server_4.getConfiguration().setName("4"); + + /** + * + * Setup the mirroring topology to be a square: + * + * 1 <- - -> 2 + * ^ ^ The link between 1 and 2 and the + * | | link between 3 and 4 are noForward + * v v links in both directions. + * 4 <- - -> 3 + */ + + { + AMQPBrokerConnectConfiguration amqpConnection1to2 = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100).setReconnectAttempts(100); + amqpConnection1to2.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection1to2); + + AMQPBrokerConnectConfiguration amqpConnection1to4 = new AMQPBrokerConnectConfiguration(getTestMethodName() + "1to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100).setReconnectAttempts(100); + amqpConnection1to4.addElement(new AMQPMirrorBrokerConnectionElement()); + server.getConfiguration().addAMQPConnection(amqpConnection1to4); + } + + { + AMQPBrokerConnectConfiguration amqpConnection2to1 = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100).setReconnectAttempts(100); + amqpConnection2to1.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server_2.getConfiguration().addAMQPConnection(amqpConnection2to1); + + AMQPBrokerConnectConfiguration amqpConnection2to3 = new AMQPBrokerConnectConfiguration(getTestMethodName() + "2to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100).setReconnectAttempts(100); + amqpConnection2to3.addElement(new AMQPMirrorBrokerConnectionElement()); + server_2.getConfiguration().addAMQPConnection(amqpConnection2to3); + } + + { + AMQPBrokerConnectConfiguration amqpConnection3to2 = new AMQPBrokerConnectConfiguration(getTestMethodName() + "3to2", "tcp://localhost:" + AMQP_PORT_2).setRetryInterval(100).setReconnectAttempts(100); + amqpConnection3to2.addElement(new AMQPMirrorBrokerConnectionElement()); + server_3.getConfiguration().addAMQPConnection(amqpConnection3to2); + + AMQPBrokerConnectConfiguration amqpConnection3to4 = new AMQPBrokerConnectConfiguration(getTestMethodName() + "3to4", "tcp://localhost:" + AMQP_PORT_4).setRetryInterval(100).setReconnectAttempts(100); + amqpConnection3to4.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server_3.getConfiguration().addAMQPConnection(amqpConnection3to4); + } + + { + AMQPBrokerConnectConfiguration amqpConnection4to1 = new AMQPBrokerConnectConfiguration(getTestMethodName() + "4to1", "tcp://localhost:" + AMQP_PORT).setRetryInterval(100).setReconnectAttempts(100); + amqpConnection4to1.addElement(new AMQPMirrorBrokerConnectionElement()); + server_4.getConfiguration().addAMQPConnection(amqpConnection4to1); + + AMQPBrokerConnectConfiguration amqpConnection4to3 = new AMQPBrokerConnectConfiguration(getTestMethodName() + "4to3", "tcp://localhost:" + AMQP_PORT_3).setRetryInterval(100).setReconnectAttempts(100); + amqpConnection4to3.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server_4.getConfiguration().addAMQPConnection(amqpConnection4to3); + } + + server.start(); + server_2.start(); + server_3.start(); + server_4.start(); + + createAddressAndQueues(server); + Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_3.locateQueue(getQueueName()) != null); + Wait.assertTrue(() -> server_4.locateQueue(getQueueName()) != null); + + Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + Queue q2 = server_2.locateQueue(getQueueName()); + assertNotNull(q2); + + Queue q3 = server_3.locateQueue(getQueueName()); + assertNotNull(q3); + + Queue q4 = server_4.locateQueue(getQueueName()); + assertNotNull(q4); + + ConnectionFactory factory = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT); + ConnectionFactory factory2 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_2); + ConnectionFactory factory3 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_3); + ConnectionFactory factory4 = CFUtil.createConnectionFactory(randomProtocol(), "tcp://localhost:" + AMQP_PORT_4); + + try (Connection conn = factory4.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 10; i < 20; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 20; i < 30; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + for (int i = 30; i < 40; i++) { + producer.send(session.createTextMessage("message " + i)); + } + } + + Thread.sleep(100); // some time to allow eventual loops + + Wait.assertEquals(40L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(40L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(40L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(40L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(30L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(30L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(30L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(30L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 10; i < 20; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(20L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(20L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(20L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(20L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 20; i < 30; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(10L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(10L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory4.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + for (int i = 30; i < 40; i++) { + TextMessage message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals("message " + i, message.getText()); + } + consumer.close(); + } + + Wait.assertEquals(0L, q1::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q2::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q3::getMessageCount, 1000, 100); + Wait.assertEquals(0L, q4::getMessageCount, 1000, 100); + + try (Connection conn = factory.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory2.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory3.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + try (Connection conn = factory4.createConnection()) { + Session session = conn.createSession(); + conn.start(); + MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + assertNull(consumer.receiveNoWait()); + consumer.close(); + } + + } + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Test + @Timeout(20) + public void testBrokerHandlesSenderLinkOmitsNoForwardCapability() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS"); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender() + .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) + .withDesiredCapabilities(String.valueOf(AMQPMirrorControllerSource.MIRROR_CAPABILITY), AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString(), AMQPMirrorControllerSource.NO_FORWARD.toString()) + .respond() + .withOfferedCapabilities(String.valueOf(AMQPMirrorControllerSource.MIRROR_CAPABILITY), AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); + peer.expectClose().withError(CONNECTION_FORCED.toString()).optional(); // Can hit the wire in rare instances. + peer.expectConnectionToDrop(); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + // No user or pass given, it will have to select ANONYMOUS even though PLAIN also offered + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + + org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> loggerHandler.findText("AMQ111001")); + assertEquals(1, loggerHandler.countText("AMQ119018")); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + server.stop(); + } + } + } + + private int minLargeMessageSize = 100 * 1024; + @Override + protected void configureAMQPAcceptorParameters(Map params) { + params.put("amqpMinLargeMessageSize", minLargeMessageSize); + } + + private String buildCoreLargeMessage() { + String message = "A message!"; + StringBuilder builder = new StringBuilder(); + builder.append(message); + for (int i = 0; i < ((minLargeMessageSize * 2) / message.length()) + 1; i++) { + builder.append(message); + } + return builder.toString(); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, false); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksCoreLargeMessagesAndControlsPropagation() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(false, true); + } + + @Test + @Timeout(20) + public void testNoForwardBlocksCoreLargeMessagesAndControlsPropagationWithTunneling() throws Exception { + doTestNoForwardBlocksMessagesAndControlsPropagation(true, true); + } + + private void doTestNoForwardBlocksMessagesAndControlsPropagation(boolean tunneling, boolean largeMessage) throws Exception { + final Map brokerProperties = new HashMap<>(); + brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); + + final String[] capabilities; + ArrayList capabilitiesList = new ArrayList<>(); + int messageFormat = 0; + + capabilitiesList.add(String.valueOf(AMQPMirrorControllerSource.MIRROR_CAPABILITY)); + if (tunneling) { + capabilitiesList.add(AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()); + messageFormat = AMQP_TUNNELED_CORE_MESSAGE_FORMAT; + } + capabilities = capabilitiesList.toArray(new String[]{}); + + // Topology of the test: server -(noForward)-> server_2 -> peer_3 + // ^ | + // |______________________| + + // the objective of the test is to make sure that the proton peer doesn't receive any message coming from "server" + // since the link be "server" and "server_2" is set to have the noForward property. + // the only way there is to assess that peer_3 didn't receive any message is to have server_2 sending it a message at the end + // if the message gets successfully decoded, in turn it means that nothing got received in the meantime. + try (ProtonTestServer peer_3 = new ProtonTestServer()) { + peer_3.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS"); + peer_3.expectOpen().respond(); + peer_3.expectBegin().respond(); + peer_3.expectAttach().ofSender() + .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR")) + .withDesiredCapabilities(capabilities) + .respond() + .withOfferedCapabilities(capabilities) + .withPropertiesMap(brokerProperties); + peer_3.remoteFlow().withLinkCredit(10).queue(); + peer_3.start(); + + final URI remoteURI = peer_3.getServerURI(); + logger.info("Connect test started, peer listening on: {}", remoteURI); + + final String secondaryQueueName = "secondaryQueue"; + final int AMQP_PORT_2 = AMQP_PORT + 1; + final ActiveMQServer server_2 = createServer(AMQP_PORT_2, false); + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toPeer3", "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setUser("user"); + amqpConnection.setPassword("pass"); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().addProperty(TUNNEL_CORE_MESSAGES, Boolean.toString(tunneling)).setQueueCreation(true).setAddressFilter(getQueueName() + "," + secondaryQueueName)); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer", "tcp://localhost:" + AMQP_PORT); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName())); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + } + + { + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestMethodName() + "toServer2", "tcp://localhost:" + AMQP_PORT_2); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setAddressFilter(getQueueName()).setNoForward(true)); + server.getConfiguration().addAMQPConnection(amqpConnection); + } + + // connect the topology + server_2.start(); + server.start(); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Create queues & send messages on server, nothing will reach peer_3 + createAddressAndQueues(server); + org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null); + org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); + + final org.apache.activemq.artemis.core.server.Queue q1 = server.locateQueue(getQueueName()); + assertNotNull(q1); + + final org.apache.activemq.artemis.core.server.Queue q2 = server_2.locateQueue(getQueueName()); + assertNotNull(q2); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT + "?minLargeMessageSize=" + minLargeMessageSize); + final ConnectionFactory factory_2 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2 + "?minLargeMessageSize=" + minLargeMessageSize); + + String message = largeMessage ? buildCoreLargeMessage() : "A message!"; + + try (Connection conn = factory.createConnection()) { + final Session session = conn.createSession(); + conn.start(); + + final MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); + producer.send(session.createTextMessage(message)); + producer.close(); + } + + org.apache.activemq.artemis.tests.util.Wait.assertEquals(1L, q1::getMessageCount, 100, 100); + org.apache.activemq.artemis.tests.util.Wait.assertEquals(1L, q2::getMessageCount, 100, 100); + + try (Connection conn = factory_2.createConnection()) { + final Session session = conn.createSession(); + conn.start(); + + final MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); + TextMessage rcvMsg = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertEquals(message, rcvMsg.getText()); + consumer.close(); + } + + org.apache.activemq.artemis.tests.util.Wait.assertEquals(0L, q2::getMessageCount, 100, 100); + org.apache.activemq.artemis.tests.util.Wait.assertEquals(0L, q1::getMessageCount, 6000, 100); + + // give some time to peer_3 to receive messages (if any) + Thread.sleep(100); + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); // if messages are received here, this should error out + + // Then send messages on the broker directly connected to the peer, the messages should make it to the peer. + // Receiving these 3 messages in that order confirms that no previous data reched the Peer, therefore validating + // the test. + final String peer3Message = "test"; + peer_3.expectTransfer().accept(); // Address create + peer_3.expectTransfer().accept(); // Queue create + if (tunneling) { + peer_3.expectTransfer().withMessageFormat(messageFormat).accept(); + } else { + peer_3.expectTransfer().withMessageFormat(messageFormat).withMessage().withHeader().also().withDeliveryAnnotations().also().withMessageAnnotations().also().withProperties().also().withApplicationProperties().also().withValue(peer3Message).and().accept(); + } + + server_2.addAddressInfo(new AddressInfo(SimpleString.of(secondaryQueueName), RoutingType.ANYCAST)); + server_2.createQueue(QueueConfiguration.of(secondaryQueueName).setRoutingType(RoutingType.ANYCAST)); + + try (Connection connection = factory_2.createConnection()) { + final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createQueue(secondaryQueueName)); + final TextMessage msg = session.createTextMessage(peer3Message); + + connection.start(); + producer.send(msg); + } + + peer_3.waitForScriptToComplete(5, TimeUnit.SECONDS); + + server.stop(); + server_2.stop(); + } + } + +}