From e9133660540249defe5f7c29a7ab65a45ef8605c Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Sat, 31 Aug 2024 17:43:28 +0200 Subject: [PATCH] Rewrote subscription fetching to not copy lists of subscriptions --- .../java/io/moquette/broker/PostOffice.java | 3 +- .../moquette/broker/subscriptions/CNode.java | 37 ++--- .../moquette/broker/subscriptions/CTrie.java | 29 ++-- .../CTrieSubscriptionDirectory.java | 27 +--- .../broker/subscriptions/DumpTreeVisitor.java | 11 +- .../ISubscriptionsDirectory.java | 4 +- .../broker/subscriptions/ShareName.java | 11 +- .../subscriptions/SubscriptionCollection.java | 133 ++++++++++++++++++ .../SubscriptionCounterVisitor.java | 4 +- .../broker/PostOfficeInternalPublishTest.java | 3 +- .../broker/PostOfficePublishTest.java | 3 +- .../broker/PostOfficeSubscribeTest.java | 5 +- .../broker/PostOfficeUnsubscribeTest.java | 3 +- ...aredSubscriptionDirectoryMatchingTest.java | 14 +- .../broker/subscriptions/CTrieSpeedTest.java | 12 +- ...TrieSubscriptionDirectoryMatchingTest.java | 29 ++-- .../broker/subscriptions/CTrieTest.java | 53 ++++--- .../mqtt5/RequestResponseTest.java | 6 + 18 files changed, 252 insertions(+), 135 deletions(-) create mode 100644 broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCollection.java diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index 5ff719ac5..880c28460 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -58,6 +58,7 @@ import java.util.stream.Collectors; import static io.moquette.broker.Utils.messageId; +import io.moquette.broker.subscriptions.SubscriptionCollection; import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE; @@ -842,7 +843,7 @@ private RoutingResults publish2Subscribers(String publisherClientId, final boolean retainPublish = msg.fixedHeader().isRetain(); final Topic topic = new Topic(msg.variableHeader().topicName()); final MqttQoS publishingQos = msg.fixedHeader().qosLevel(); - List topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic); + SubscriptionCollection topicMatchingSubscriptions = subscriptions.matchWithoutQosSharpening(topic); if (topicMatchingSubscriptions.isEmpty()) { // no matching subscriptions, clean exit LOG.trace("No matching subscriptions for topic: {}", topic); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java index 232c74171..a37fe05c7 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CNode.java @@ -45,20 +45,20 @@ class CNode implements Comparable { // Map of subscriptions per clientId. private PMap subscriptions; // the list of SharedSubscription is sorted. The sort is necessary for fast access, instead of linear scan. - private Map> sharedSubscriptions; + private PMap> sharedSubscriptions; CNode(Token token) { this.children = TreePMap.empty(); this.subscriptions = TreePMap.empty(); - this.sharedSubscriptions = new HashMap<>(); + this.sharedSubscriptions = TreePMap.empty(); this.token = token; } //Copy constructor - private CNode(Token token, PMap children, PMap subscriptions, Map> sharedSubscriptions) { + private CNode(Token token, PMap children, PMap subscriptions, PMap> sharedSubscriptions) { this.token = token; // keep reference, root comparison in directory logic relies on it for now. this.subscriptions = subscriptions; - this.sharedSubscriptions = new HashMap<>(sharedSubscriptions); + this.sharedSubscriptions = sharedSubscriptions; this.children = children; } @@ -99,22 +99,12 @@ public INode remove(INode node) { return toRemove; } - private List sharedSubscriptions() { - List selectedSubscriptions = new ArrayList<>(sharedSubscriptions.size()); - // for each sharedSubscription related to a ShareName, select one subscription - for (Map.Entry> subsForName : sharedSubscriptions.entrySet()) { - List list = subsForName.getValue(); - final String shareName = subsForName.getKey().getShareName(); - // select a subscription randomly - int randIdx = SECURE_RANDOM.nextInt(list.size()); - SharedSubscription sub = list.get(randIdx); - selectedSubscriptions.add(sub.createSubscription()); - } - return selectedSubscriptions; + public PMap getSubscriptions() { + return subscriptions; } - Collection subscriptions() { - return subscriptions.values(); + public PMap> getSharedSubscriptions() { + return sharedSubscriptions; } // Mutating operation @@ -214,9 +204,9 @@ void removeSubscriptionsFor(UnsubscribeRequest request) { subscriptionsForName.removeAll(toRemove); if (subscriptionsForName.isEmpty()) { - this.sharedSubscriptions.remove(request.getSharedName()); + sharedSubscriptions = sharedSubscriptions.minus(request.getSharedName()); } else { - this.sharedSubscriptions.replace(request.getSharedName(), subscriptionsForName); + sharedSubscriptions = sharedSubscriptions.plus(request.getSharedName(), subscriptionsForName); } } else { subscriptions = subscriptions.minus(clientId); @@ -228,11 +218,4 @@ public int compareTo(CNode o) { return token.compareTo(o.token); } - public List sharedAndNonSharedSubscriptions() { - List shared = sharedSubscriptions(); - List returnedSubscriptions = new ArrayList<>(subscriptions.size() + shared.size()); - returnedSubscriptions.addAll(subscriptions.values()); - returnedSubscriptions.addAll(shared); - return returnedSubscriptions; - } } diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java index 9cbbc9ef6..9bdb03588 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java @@ -118,6 +118,7 @@ public SubscriptionIdentifier getSubscriptionIdentifier() { * Models a request to unsubscribe a client, it's carrier for the Subscription * */ public final static class UnsubscribeRequest { + private final Topic topicFilter; private final String clientId; private boolean shared = false; @@ -231,21 +232,25 @@ private NavigationAction evaluate(Topic topicName, CNode cnode, int depth) { return NavigationAction.STOP; } - public List recursiveMatch(Topic topicName) { - return recursiveMatch(topicName, this.root, 0); + public SubscriptionCollection recursiveMatch(Topic topicName) { + SubscriptionCollection subscriptions = new SubscriptionCollection(); + recursiveMatch(topicName, this.root, 0, subscriptions); + return subscriptions; } - private List recursiveMatch(Topic topicName, INode inode, int depth) { + private void recursiveMatch(Topic topicName, INode inode, int depth, SubscriptionCollection target) { CNode cnode = inode.mainNode(); if (cnode instanceof TNode) { - return Collections.emptyList(); + return; } NavigationAction action = evaluate(topicName, cnode, depth); if (action == NavigationAction.MATCH) { - return cnode.sharedAndNonSharedSubscriptions(); + target.addNormalSubscriptions(cnode.getSubscriptions()); + target.addSharedSubscriptions(cnode.getSharedSubscriptions()); + return; } if (action == NavigationAction.STOP) { - return Collections.emptyList(); + return; } final boolean isRoot = ROOT.equals(cnode.getToken()); final boolean isSingle = Token.SINGLE.equals(cnode.getToken()); @@ -256,27 +261,27 @@ private List recursiveMatch(Topic topicName, INode inode, int dept : (isSingle || isMulti) ? topicName.exceptFullHeadToken() : topicName.exceptHeadToken(); - List subscriptions = new ArrayList<>(); + SubscriptionCollection subscriptions = new SubscriptionCollection(); // We should only consider the maximum three children children of // type #, + or exact match Optional subInode = cnode.childOf(Token.MULTI); if (subInode.isPresent()) { - subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1)); + recursiveMatch(remainingTopic, subInode.get(), depth + 1, target); } subInode = cnode.childOf(Token.SINGLE); if (subInode.isPresent()) { - subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1)); + recursiveMatch(remainingTopic, subInode.get(), depth + 1, target); } if (remainingTopic.isEmpty()) { - subscriptions.addAll(cnode.sharedAndNonSharedSubscriptions()); + target.addNormalSubscriptions(cnode.getSubscriptions()); + target.addSharedSubscriptions(cnode.getSharedSubscriptions()); } else { subInode = cnode.childOf(remainingTopic.headToken()); if (subInode.isPresent()) { - subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1)); + recursiveMatch(remainingTopic, subInode.get(), depth + 1, target); } } - return subscriptions; } /** diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java index 88feea7e1..a7c15987e 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -77,34 +78,10 @@ public void init(ISubscriptionsRepository subscriptionsRepository) { * @return the list of matching subscriptions, or empty if not matching. */ @Override - public List matchWithoutQosSharpening(Topic topicName) { + public SubscriptionCollection matchWithoutQosSharpening(Topic topicName) { return ctrie.recursiveMatch(topicName); } - @Override - public List matchQosSharpening(Topic topicName) { - final List subscriptions = matchWithoutQosSharpening(topicName); - - // for each session select the subscription with higher QoS - return selectSubscriptionsWithHigherQoSForEachSession(subscriptions); - } - - private static List selectSubscriptionsWithHigherQoSForEachSession(List subscriptions) { - // for each session select the subscription with higher QoS - Map subsGroupedByClient = new HashMap<>(); - for (Subscription sub : subscriptions) { - // If same client is subscribed to two different shared subscription that overlaps - // then it has to return both subscriptions, because the share name made them independent. - final String key = sub.clientAndShareName(); - Subscription existingSub = subsGroupedByClient.get(key); - // update the selected subscriptions if not present or if it has a greater qos - if (existingSub == null || existingSub.qosLessThan(sub)) { - subsGroupedByClient.put(key, sub); - } - } - return new ArrayList<>(subsGroupedByClient.values()); - } - @Override public boolean add(String clientId, Topic filter, MqttSubscriptionOption option) { SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, option); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java b/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java index 3647ee0b8..09c73b2db 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/DumpTreeVisitor.java @@ -31,22 +31,21 @@ private String prettySubscriptions(CNode node) { if (node instanceof TNode) { return "TNode"; } - if (node.subscriptions().isEmpty()) { + if (node.getSubscriptions().isEmpty()) { return StringUtil.EMPTY_STRING; } StringBuilder subScriptionsStr = new StringBuilder(" ~~["); int counter = 0; - for (Subscription couple : node.subscriptions()) { + for (Subscription couple : node.getSubscriptions().values()) { subScriptionsStr .append("{filter=").append(couple.topicFilter).append(", ") .append("option=").append(couple.option()).append(", ") .append("client='").append(couple.clientId).append("'}"); counter++; - if (counter < node.subscriptions().size()) { - subScriptionsStr.append(";"); - } + subScriptionsStr.append(";"); } - return subScriptionsStr.append("]").toString(); + final int length = subScriptionsStr.length(); + return subScriptionsStr.replace(length - 1, length, "]").toString(); } private String indentTabs(int deep) { diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java b/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java index 97af38035..d6e11c5c3 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java @@ -27,9 +27,7 @@ public interface ISubscriptionsDirectory { void init(ISubscriptionsRepository sessionsRepository); - List matchWithoutQosSharpening(Topic topic); - - List matchQosSharpening(Topic topic); + SubscriptionCollection matchWithoutQosSharpening(Topic topic); boolean add(String clientId, Topic filter, MqttSubscriptionOption option); diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java b/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java index 2ec25b0fe..3f5f66caf 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/ShareName.java @@ -21,7 +21,7 @@ * Shared subscription's name. */ // It's public because used by PostOffice -public final class ShareName { +public final class ShareName implements Comparable{ private final String shareName; public ShareName(String shareName) { @@ -36,8 +36,8 @@ public boolean equals(Object o) { return Objects.equals(shareName, (String) o); } if (getClass() != o.getClass()) return false; - ShareName shareName1 = (ShareName) o; - return Objects.equals(shareName, shareName1.shareName); + ShareName oShareName = (ShareName) o; + return Objects.equals(shareName, oShareName.shareName); } public String getShareName() { @@ -55,4 +55,9 @@ public String toString() { "shareName='" + shareName + '\'' + '}'; } + + @Override + public int compareTo(ShareName o) { + return shareName.compareTo(o.shareName); + } } diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCollection.java b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCollection.java new file mode 100644 index 000000000..8f2065b0c --- /dev/null +++ b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCollection.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2012-2018 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.moquette.broker.subscriptions; + +import static io.moquette.broker.subscriptions.CNode.SECURE_RANDOM; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +/** + * A wrapper over multiple maps of normal subscriptions. + */ +public class SubscriptionCollection implements Iterable { + + private final List> normalSubscriptions = new ArrayList<>(); + private final List>> sharedSubscriptions = new ArrayList<>(); + + public boolean isEmpty() { + return normalSubscriptions.isEmpty() && sharedSubscriptions.isEmpty(); + } + + /** + * Calculates the number of subscriptions. Expensive, only use for tests! + * @return the number of subscriptions. + */ + public int size() { + int total = 0; + for (Map var : normalSubscriptions) { + total += var.size(); + } + for (Map> var : sharedSubscriptions) { + total += var.size(); + } + return total; + } + + public void addNormalSubscriptions(Map subs) { + if (subs.isEmpty()) { + return; + } + normalSubscriptions.add(subs); + } + + public void addSharedSubscriptions(Map> subs) { + if (sharedSubscriptions.isEmpty()) { + return; + } + sharedSubscriptions.add(subs); + } + + private static Subscription selectRandom(List list) { + // select a subscription randomly + int randIdx = SECURE_RANDOM.nextInt(list.size()); + return list.get(randIdx).createSubscription(); + } + + @Override + public Iterator iterator() { + return new IteratorImpl(this); + } + + private static class IteratorImpl implements Iterator { + + private Iterator> normapSubListIter; + private Iterator normalSubIter; + + private Iterator>> sharedSubMapIter; + private Iterator> sharedSubIter; + + public IteratorImpl(SubscriptionCollection parent) { + normapSubListIter = parent.normalSubscriptions.iterator(); + sharedSubMapIter = parent.sharedSubscriptions.iterator(); + } + + @Override + public boolean hasNext() { + if (normalSubIter != null && normalSubIter.hasNext()) { + return true; + } + if (sharedSubIter != null && sharedSubIter.hasNext()) { + return true; + } + if (normapSubListIter != null) { + if (normapSubListIter.hasNext()) { + // Get the next normal subscriptions iterator. + Map next = normapSubListIter.next(); + normalSubIter = next.values().iterator(); + return true; + } else { + // Reached the end of the normal subscriptions lists. + normapSubListIter = null; + } + } + if (sharedSubMapIter != null) { + if (sharedSubMapIter.hasNext()) { + Map> next = sharedSubMapIter.next(); + sharedSubIter = next.values().iterator(); + return true; + } else { + sharedSubMapIter = null; + } + } + return false; + } + + @Override + public Subscription next() { + if (normalSubIter != null) { + return normalSubIter.next(); + } + if (sharedSubIter != null) { + return selectRandom(sharedSubIter.next()); + } + throw new NoSuchElementException("Fetched past the end of Iterator, make sure to call hasNext!"); + } + } + +} diff --git a/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCounterVisitor.java b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCounterVisitor.java index 863e944c3..9d06835d9 100644 --- a/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCounterVisitor.java +++ b/broker/src/main/java/io/moquette/broker/subscriptions/SubscriptionCounterVisitor.java @@ -19,11 +19,11 @@ class SubscriptionCounterVisitor implements CTrie.IVisitor { - private AtomicInteger accumulator = new AtomicInteger(0); + private final AtomicInteger accumulator = new AtomicInteger(0); @Override public void visit(CNode node, int deep) { - accumulator.addAndGet(node.subscriptions().size()); + accumulator.addAndGet(node.getSubscriptions().size()); } @Override diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java index 5c285ad14..626e79b89 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java @@ -37,6 +37,7 @@ import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; import static io.moquette.broker.PostOfficeUnsubscribeTest.CONFIG; +import io.moquette.broker.subscriptions.SubscriptionCollection; import static io.netty.handler.codec.mqtt.MqttQoS.*; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Collections.singleton; @@ -338,7 +339,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); - final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final SubscriptionCollection matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); diff --git a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java index b3314fb6b..dfbf46dbf 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java @@ -40,6 +40,7 @@ import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; import static io.moquette.broker.PostOfficeUnsubscribeTest.CONFIG; +import io.moquette.broker.subscriptions.SubscriptionCollection; import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE; @@ -198,7 +199,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); - final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final SubscriptionCollection matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java index f0b25dcff..8443b7bc8 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java @@ -43,6 +43,7 @@ import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; import static io.moquette.broker.PostOfficePublishTest.ALLOW_ANONYMOUS_AND_ZERO_BYTES_CLID; import static io.moquette.broker.PostOfficePublishTest.SUBSCRIBER_ID; +import io.moquette.broker.subscriptions.SubscriptionCollection; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE; import static java.util.Collections.singleton; @@ -146,7 +147,7 @@ protected void subscribe(EmbeddedChannel channel, String topic, MqttQoS desiredQ final String clientId = NettyUtils.clientID(channel); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); - final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final SubscriptionCollection matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); @@ -166,7 +167,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); - final List matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); + final SubscriptionCollection matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); assertEquals(expectedSubscription, onlyMatchedSubscription); diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java index 13bda420f..279e62995 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java @@ -39,6 +39,7 @@ import static io.moquette.broker.MQTTConnectionPublishTest.memorySessionsRepository; import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; import static io.moquette.broker.PostOfficePublishTest.PUBLISHER_ID; +import io.moquette.broker.subscriptions.SubscriptionCollection; import static io.netty.handler.codec.mqtt.MqttQoS.*; import static java.util.Collections.*; import java.util.List; @@ -125,7 +126,7 @@ protected void subscribe(MQTTConnection connection, String topic, MqttQoS desire final String clientId = connection.getClientId(); Subscription expectedSubscription = new Subscription(clientId, new Topic(topic), MqttSubscriptionOption.onlyFromQos(desiredQos)); - final List matchedSubscriptions = subscriptions.matchQosSharpening(new Topic(topic)); + final SubscriptionCollection matchedSubscriptions = subscriptions.matchWithoutQosSharpening(new Topic(topic)); assertEquals(1, matchedSubscriptions.size()); //assertTrue(matchedSubscriptions.size() >=1); final Subscription onlyMatchedSubscription = matchedSubscriptions.iterator().next(); diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java index cb83fcb6f..bc72414fb 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSharedSubscriptionDirectoryMatchingTest.java @@ -54,7 +54,7 @@ public void whenManySharedSubscriptionsOfDifferentShareNameMatchATopicThenOneSub sut.addShared("TempSensor1", new ShareName("temp_sensors"), asTopic("/livingroom"), asOption(MqttQoS.AT_MOST_ONCE)); sut.addShared("TempSensor1", new ShareName("livingroom_devices"), asTopic("/livingroom"), asOption(MqttQoS.AT_MOST_ONCE)); - List matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/livingroom")); + SubscriptionCollection matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/livingroom")); assertThat(matchingSubscriptions) .containsOnly(SubscriptionTestUtils.asSubscription("TempSensor1", "/livingroom", "temp_sensors"), SubscriptionTestUtils.asSubscription("TempSensor1", "/livingroom", "livingroom_devices")) @@ -71,7 +71,7 @@ public void givenSessionHasMultipleSharedSubscriptionWhenTheClientIsRemovedThenN sut.removeSharedSubscriptionsForClient(clientId); // Verify - List matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/livingroom")); + SubscriptionCollection matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/livingroom")); assertThat(matchingSubscriptions).isEmpty(); } @@ -82,7 +82,7 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionIsProcessedThe new SubscriptionIdentifier(1)); // verify it contains the subscription identifier - final List matchingSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); verifySubscriptionIdentifierIsPresent(matchingSubscriptions, new SubscriptionIdentifier(1), "share_temp"); // update the subscription of same clientId on same topic filter but with different subscription identifier @@ -90,11 +90,11 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionIsProcessedThe new SubscriptionIdentifier(123)); // verify the subscription identifier is updated - final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection reloadedSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); verifySubscriptionIdentifierIsPresent(reloadedSubscriptions, new SubscriptionIdentifier(123), "share_temp"); } - private static void verifySubscriptionIdentifierIsPresent(List matchingSubscriptions, SubscriptionIdentifier subscriptionIdentifier, String expectedShareName) { + private static void verifySubscriptionIdentifierIsPresent(SubscriptionCollection matchingSubscriptions, SubscriptionIdentifier subscriptionIdentifier, String expectedShareName) { assertAll("subscription contains the subscription identifier", () -> assertEquals(1, matchingSubscriptions.size()), () -> assertEquals(expectedShareName, matchingSubscriptions.iterator().next().shareName), @@ -111,13 +111,13 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionWithoutSubscri // verify it contains the subscription identifier SubscriptionIdentifier expectedSubscriptionId = new SubscriptionIdentifier(1); - verifySubscriptionIdentifierIsPresent(sut.matchQosSharpening(asTopic("client/test/b")), expectedSubscriptionId, "share_temp"); + verifySubscriptionIdentifierIsPresent(sut.matchWithoutQosSharpening(asTopic("client/test/b")), expectedSubscriptionId, "share_temp"); // update the subscription of same clientId on same topic filter but removing subscription identifier sut.addShared("client", new ShareName("share_temp"), asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE)); // verify the subscription identifier is removed - final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection reloadedSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); assertAll("subscription doesn't contain subscription identifier", () -> assertEquals(1, reloadedSubscriptions.size()), () -> assertFalse(reloadedSubscriptions.iterator().next().hasSubscriptionIdentifier()) diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java index 486a6607e..7f732cd53 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSpeedTest.java @@ -79,7 +79,7 @@ private static void clearResults() { } private static void logResults(Map>> set) { - StringBuilder header = new StringBuilder(); + StringBuilder header = new StringBuilder(set.values().iterator().next().values().iterator().next().get(0).topicCount+" topics"); TreeMap rowPerLength = new TreeMap<>(); for (Map.Entry>> entry : set.entrySet()) { Integer threads = entry.getKey(); @@ -278,8 +278,14 @@ public void readSubscriptions(CTrie tree, List subsToRead, final Subscription subscription = subReq.subscription(); final Topic topic = subReq.getTopicFilter(); - final List recursiveMatch = tree.recursiveMatch(topic); - final boolean contains = recursiveMatch.contains(subscription); + final SubscriptionCollection recursiveMatch = tree.recursiveMatch(topic); + boolean contains = false; + for (Subscription sub : recursiveMatch) { + if (sub.equals(subscription)) { + contains = true; + break; + } + } Assertions.assertTrue(contains, () -> "Failed to find " + subscription + " on " + topic + " found " + recursiveMatch.size() + " matches."); count++; diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java index 7d1438732..b6f31d88a 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectoryMatchingTest.java @@ -15,7 +15,6 @@ */ package io.moquette.broker.subscriptions; - import io.moquette.broker.ISubscriptionsRepository; import io.moquette.persistence.MemorySubscriptionsRepository; import io.netty.handler.codec.mqtt.MqttQoS; @@ -27,6 +26,7 @@ import static io.moquette.broker.subscriptions.CTrieSharedSubscriptionDirectoryMatchingTest.asOption; import static io.moquette.broker.subscriptions.Topic.asTopic; +import java.util.ArrayList; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.*; @@ -198,7 +198,7 @@ public void testOverlappingSubscriptions() { sut.add(specificSub.clientId, specificSub.topicFilter, specificSub.option()); //Exercise - final List matchingForSpecific = sut.matchQosSharpening(asTopic("a/b")); + final SubscriptionCollection matchingForSpecific = sut.matchWithoutQosSharpening(asTopic("a/b")); // Verify assertThat(matchingForSpecific.size()).isEqualTo(1); @@ -226,7 +226,7 @@ public void removeSubscription_sameClients_subscribedSameTopic() { sut.removeSubscription(asTopic("/topic"), "Sensor1"); // Verify - final List matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/topic")); + final SubscriptionCollection matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("/topic")); assertThat(matchingSubscriptions).isEmpty(); } @@ -244,14 +244,17 @@ public void duplicatedSubscriptionsWithDifferentQos() { this.sut.add("client1", asTopic("client/test/b"), asOption(MqttQoS.EXACTLY_ONCE)); // Verify - List subscriptions = this.sut.matchQosSharpening(asTopic("client/test/b")); + SubscriptionCollection subscriptions = this.sut.matchWithoutQosSharpening(asTopic("client/test/b")); assertThat(subscriptions).contains(client1SubQoS2); assertThat(subscriptions).contains(client2Sub); - final Optional matchingClient1Sub = subscriptions - .stream() - .filter(s -> s.equals(client1SubQoS0)) - .findFirst(); + Optional matchingClient1Sub = Optional.empty(); + for (Subscription sub : subscriptions) { + if (sub.equals(client1SubQoS0)) { + matchingClient1Sub = Optional.of(sub); + break; + } + } assertTrue(matchingClient1Sub.isPresent()); Subscription client1Sub = matchingClient1Sub.get(); @@ -267,18 +270,18 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionIsProcessedThe sut.add("client", asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE), new SubscriptionIdentifier(1)); // verify it contains the subscription identifier - final List matchingSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection matchingSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); verifySubscriptionIdentifierIsPresent(matchingSubscriptions, new SubscriptionIdentifier(1)); // update the subscription of same clientId on same topic filter but with different subscription identifier sut.add("client", asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE), new SubscriptionIdentifier(123)); // verify the subscription identifier is updated - final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection reloadedSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); verifySubscriptionIdentifierIsPresent(reloadedSubscriptions, new SubscriptionIdentifier(123)); } - private static void verifySubscriptionIdentifierIsPresent(List matchingSubscriptions, SubscriptionIdentifier subscriptionIdentifier) { + private static void verifySubscriptionIdentifierIsPresent(SubscriptionCollection matchingSubscriptions, SubscriptionIdentifier subscriptionIdentifier) { assertAll("subscription contains the subscription identifier", () -> assertEquals(1, matchingSubscriptions.size()), () -> assertTrue(matchingSubscriptions.iterator().next().hasSubscriptionIdentifier()), @@ -293,13 +296,13 @@ public void givenSubscriptionWithSubscriptionIdWhenNewSubscriptionWithoutSubscri // verify it contains the subscription identifier SubscriptionIdentifier expectedSubscriptionId = new SubscriptionIdentifier(1); - verifySubscriptionIdentifierIsPresent(sut.matchQosSharpening(asTopic("client/test/b")), expectedSubscriptionId); + verifySubscriptionIdentifierIsPresent(sut.matchWithoutQosSharpening(asTopic("client/test/b")), expectedSubscriptionId); // update the subscription of same clientId on same topic filter but removing subscription identifier sut.add("client", asTopic("client/test/b"), asOption(MqttQoS.AT_MOST_ONCE)); // verify the subscription identifier is removed - final List reloadedSubscriptions = sut.matchQosSharpening(asTopic("client/test/b")); + final SubscriptionCollection reloadedSubscriptions = sut.matchWithoutQosSharpening(asTopic("client/test/b")); assertAll("subscription doesn't contain subscription identifier", () -> assertEquals(1, reloadedSubscriptions.size()), () -> assertFalse(reloadedSubscriptions.iterator().next().hasSubscriptionIdentifier()) diff --git a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java index 7a7221abe..fd0c42e12 100644 --- a/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java +++ b/broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java @@ -15,7 +15,6 @@ */ package io.moquette.broker.subscriptions; - import io.moquette.broker.subscriptions.CTrie.SubscriptionRequest; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubscriptionOption; @@ -26,13 +25,11 @@ import static io.moquette.broker.subscriptions.SubscriptionTestUtils.asSubscription; import static io.moquette.broker.subscriptions.Topic.asTopic; -import java.util.Collection; -import java.util.List; -import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import org.pcollections.PMap; public class CTrieTest { @@ -53,15 +50,15 @@ public void testAddOnSecondLayerWithEmptyTokenOnEmptyTree() { final Optional matchedNode = sut.lookup(asTopic("/")); assertTrue(matchedNode.isPresent(), "Node on path / must be present"); //verify structure, only root INode and the first CNode should be present - assertThat(this.sut.root.mainNode().subscriptions()).isEmpty(); + assertThat(this.sut.root.mainNode().getSubscriptions()).isEmpty(); assertThat(this.sut.root.mainNode().allChildren()).isNotEmpty(); INode firstLayer = this.sut.root.mainNode().allChildren().stream().findFirst().get(); - assertThat(firstLayer.mainNode().subscriptions()).isEmpty(); + assertThat(firstLayer.mainNode().getSubscriptions()).isEmpty(); assertThat(firstLayer.mainNode().allChildren()).isNotEmpty(); INode secondLayer = firstLayer.mainNode().allChildren().stream().findFirst().get(); - assertThat(secondLayer.mainNode().subscriptions()).isNotEmpty(); + assertThat(secondLayer.mainNode().getSubscriptions()).isNotEmpty(); assertThat(secondLayer.mainNode().allChildren()).isEmpty(); } @@ -74,7 +71,7 @@ public void testAddFirstLayerNodeOnEmptyTree() { //Verify final Optional matchedNode = sut.lookup(asTopic("/temp")); assertTrue(matchedNode.isPresent(), "Node on path /temp must be present"); - assertFalse(matchedNode.get().subscriptions().isEmpty()); + assertFalse(matchedNode.get().getSubscriptions().isEmpty()); } @Test @@ -101,8 +98,8 @@ public void testAddNewSubscriptionOnExistingNode() { //Verify final Optional matchedNode = sut.lookup(asTopic("/temp")); assertTrue(matchedNode.isPresent(), "Node on path /temp must be present"); - final Collection subscriptions = matchedNode.get().subscriptions(); - assertTrue(subscriptions.contains(asSubscription("TempSensor2", "/temp"))); + final PMap subscriptions = matchedNode.get().getSubscriptions(); + assertTrue(subscriptions.containsValue(asSubscription("TempSensor2", "/temp"))); } @Test @@ -119,8 +116,8 @@ public void testAddNewDeepNodes() { //Verify final Optional matchedNode = sut.lookup(asTopic("/italy/happiness")); assertTrue(matchedNode.isPresent(), "Node on path /italy/happiness must be present"); - final Collection subscriptions = matchedNode.get().subscriptions(); - assertTrue(subscriptions.contains(asSubscription("HappinessSensor", "/italy/happiness"))); + final PMap subscriptions = matchedNode.get().getSubscriptions(); + assertTrue(subscriptions.containsValue(asSubscription("HappinessSensor", "/italy/happiness"))); } static SubscriptionRequest clientSubOnTopic(String clientID, String topicFilter) { @@ -193,7 +190,7 @@ public void givenTreeWithSomeNodeHierarchyWhenRemoveContainedSubscriptionThenNod sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor1", asTopic("/temp/1"))); sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor1", asTopic("/temp/1"))); - final List matchingSubs = sut.recursiveMatch(asTopic("/temp/2")); + final SubscriptionCollection matchingSubs = sut.recursiveMatch(asTopic("/temp/2")); //Verify final Subscription expectedMatchingsub = new Subscription("TempSensor1", asTopic("/temp/2"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); @@ -210,8 +207,8 @@ public void givenTreeWithSomeNodeHierarchWhenRemoveContainedSubscriptionSmallerT //Exercise sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor1", asTopic("/temp"))); - final List matchingSubs1 = sut.recursiveMatch(asTopic("/temp/1")); - final List matchingSubs2 = sut.recursiveMatch(asTopic("/temp/2")); + final SubscriptionCollection matchingSubs1 = sut.recursiveMatch(asTopic("/temp/1")); + final SubscriptionCollection matchingSubs2 = sut.recursiveMatch(asTopic("/temp/2")); //Verify // not clear to me, but I believe /temp unsubscribe should not unsub you from downstream /temp/1 or /temp/2 @@ -239,7 +236,7 @@ public void testMatchSubscriptionNoWildcards() { sut.addToTree(newSubscription); //Exercise - final List matchingSubs = sut.recursiveMatch(asTopic("/temp")); + final SubscriptionCollection matchingSubs = sut.recursiveMatch(asTopic("/temp")); //Verify final Subscription expectedMatchingsub = new Subscription("TempSensor1", asTopic("/temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); @@ -254,8 +251,8 @@ public void testRemovalInnerTopicOffRootSameClient() { sut.addToTree(newSubscription); //Exercise - final List matchingSubs1 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs1 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); //Verify final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); @@ -267,8 +264,8 @@ public void testRemovalInnerTopicOffRootSameClient() { sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor1", asTopic("temp"))); //Exercise - final List matchingSubs3 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs3 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); assertThat(matchingSubs3).doesNotContain(expectedMatchingsub1); assertThat(matchingSubs4).contains(expectedMatchingsub2); @@ -282,8 +279,8 @@ public void testRemovalInnerTopicOffRootDiffClient() { sut.addToTree(newSubscription); //Exercise - final List matchingSubs1 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs1 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); //Verify final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); @@ -295,8 +292,8 @@ public void testRemovalInnerTopicOffRootDiffClient() { sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor1", asTopic("temp"))); //Exercise - final List matchingSubs3 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs3 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); assertThat(matchingSubs3).doesNotContain(expectedMatchingsub1); assertThat(matchingSubs4).contains(expectedMatchingsub2); @@ -310,8 +307,8 @@ public void testRemovalOuterTopicOffRootDiffClient() { sut.addToTree(newSubscription); //Exercise - final List matchingSubs1 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs1 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs2 = sut.recursiveMatch(asTopic("temp/1")); //Verify final Subscription expectedMatchingsub1 = new Subscription("TempSensor1", asTopic("temp"), MqttSubscriptionOption.onlyFromQos(MqttQoS.AT_MOST_ONCE)); @@ -323,8 +320,8 @@ public void testRemovalOuterTopicOffRootDiffClient() { sut.removeFromTree(CTrie.UnsubscribeRequest.buildNonShared("TempSensor2", asTopic("temp/1"))); //Exercise - final List matchingSubs3 = sut.recursiveMatch(asTopic("temp")); - final List matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); + final SubscriptionCollection matchingSubs3 = sut.recursiveMatch(asTopic("temp")); + final SubscriptionCollection matchingSubs4 = sut.recursiveMatch(asTopic("temp/1")); assertThat(matchingSubs3).contains(expectedMatchingsub1); assertThat(matchingSubs4).doesNotContain(expectedMatchingsub2); diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java index 1eafab0ad..12ea43c8d 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java @@ -35,9 +35,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RequestResponseTest extends AbstractServerIntegrationWithoutClientFixture { + private static final Logger LOGGER = LoggerFactory.getLogger(RequestResponseTest.class.getName()); + @Test public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReply() throws InterruptedException { final Mqtt5BlockingClient requester = createHiveBlockingClient("requester"); @@ -60,9 +64,11 @@ private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient respo .topicFilter("requester/door/open") .qos(MqttQos.AT_LEAST_ONCE) .build(); + LOGGER.info("Subscribing to on requester/door/open"); responder.toAsync().subscribe(subscribeToRequest, (Mqtt5Publish pub) -> { assertTrue(pub.getResponseTopic().isPresent(), "Response topic MUST defined in request publish"); + LOGGER.info("Responding on {}", pub.getResponseTopic().get()); Mqtt5PublishResult responseResult = responder.publishWith() .topic(pub.getResponseTopic().get()) .payload("OK".getBytes(StandardCharsets.UTF_8))