Skip to content

Commit

Permalink
Rewrote subscription fetching to not copy lists of subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
hylkevds committed Sep 2, 2024
1 parent 0642876 commit e913366
Show file tree
Hide file tree
Showing 18 changed files with 252 additions and 135 deletions.
3 changes: 2 additions & 1 deletion broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.stream.Collectors;

import static io.moquette.broker.Utils.messageId;
import io.moquette.broker.subscriptions.SubscriptionCollection;
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE;
Expand Down Expand Up @@ -842,7 +843,7 @@ private RoutingResults publish2Subscribers(String publisherClientId,
final boolean retainPublish = msg.fixedHeader().isRetain();
final Topic topic = new Topic(msg.variableHeader().topicName());
final MqttQoS publishingQos = msg.fixedHeader().qosLevel();
List<Subscription> topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic);
SubscriptionCollection topicMatchingSubscriptions = subscriptions.matchWithoutQosSharpening(topic);
if (topicMatchingSubscriptions.isEmpty()) {
// no matching subscriptions, clean exit
LOG.trace("No matching subscriptions for topic: {}", topic);
Expand Down
37 changes: 10 additions & 27 deletions broker/src/main/java/io/moquette/broker/subscriptions/CNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,20 @@ class CNode implements Comparable<CNode> {
// Map of subscriptions per clientId.
private PMap<String, Subscription> subscriptions;
// the list of SharedSubscription is sorted. The sort is necessary for fast access, instead of linear scan.
private Map<ShareName, List<SharedSubscription>> sharedSubscriptions;
private PMap<ShareName, List<SharedSubscription>> sharedSubscriptions;

CNode(Token token) {
this.children = TreePMap.empty();
this.subscriptions = TreePMap.empty();
this.sharedSubscriptions = new HashMap<>();
this.sharedSubscriptions = TreePMap.empty();
this.token = token;
}

//Copy constructor
private CNode(Token token, PMap<String, INode> children, PMap<String, Subscription> subscriptions, Map<ShareName, List<SharedSubscription>> sharedSubscriptions) {
private CNode(Token token, PMap<String, INode> children, PMap<String, Subscription> subscriptions, PMap<ShareName, List<SharedSubscription>> sharedSubscriptions) {
this.token = token; // keep reference, root comparison in directory logic relies on it for now.
this.subscriptions = subscriptions;
this.sharedSubscriptions = new HashMap<>(sharedSubscriptions);
this.sharedSubscriptions = sharedSubscriptions;
this.children = children;
}

Expand Down Expand Up @@ -99,22 +99,12 @@ public INode remove(INode node) {
return toRemove;
}

private List<Subscription> sharedSubscriptions() {
List<Subscription> selectedSubscriptions = new ArrayList<>(sharedSubscriptions.size());
// for each sharedSubscription related to a ShareName, select one subscription
for (Map.Entry<ShareName, List<SharedSubscription>> subsForName : sharedSubscriptions.entrySet()) {
List<SharedSubscription> list = subsForName.getValue();
final String shareName = subsForName.getKey().getShareName();
// select a subscription randomly
int randIdx = SECURE_RANDOM.nextInt(list.size());
SharedSubscription sub = list.get(randIdx);
selectedSubscriptions.add(sub.createSubscription());
}
return selectedSubscriptions;
public PMap<String, Subscription> getSubscriptions() {
return subscriptions;
}

Collection<Subscription> subscriptions() {
return subscriptions.values();
public PMap<ShareName, List<SharedSubscription>> getSharedSubscriptions() {
return sharedSubscriptions;
}

// Mutating operation
Expand Down Expand Up @@ -214,9 +204,9 @@ void removeSubscriptionsFor(UnsubscribeRequest request) {
subscriptionsForName.removeAll(toRemove);

if (subscriptionsForName.isEmpty()) {
this.sharedSubscriptions.remove(request.getSharedName());
sharedSubscriptions = sharedSubscriptions.minus(request.getSharedName());
} else {
this.sharedSubscriptions.replace(request.getSharedName(), subscriptionsForName);
sharedSubscriptions = sharedSubscriptions.plus(request.getSharedName(), subscriptionsForName);
}
} else {
subscriptions = subscriptions.minus(clientId);
Expand All @@ -228,11 +218,4 @@ public int compareTo(CNode o) {
return token.compareTo(o.token);
}

public List<Subscription> sharedAndNonSharedSubscriptions() {
List<Subscription> shared = sharedSubscriptions();
List<Subscription> returnedSubscriptions = new ArrayList<>(subscriptions.size() + shared.size());
returnedSubscriptions.addAll(subscriptions.values());
returnedSubscriptions.addAll(shared);
return returnedSubscriptions;
}
}
29 changes: 17 additions & 12 deletions broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public SubscriptionIdentifier getSubscriptionIdentifier() {
* Models a request to unsubscribe a client, it's carrier for the Subscription
* */
public final static class UnsubscribeRequest {

private final Topic topicFilter;
private final String clientId;
private boolean shared = false;
Expand Down Expand Up @@ -231,21 +232,25 @@ private NavigationAction evaluate(Topic topicName, CNode cnode, int depth) {
return NavigationAction.STOP;
}

public List<Subscription> recursiveMatch(Topic topicName) {
return recursiveMatch(topicName, this.root, 0);
public SubscriptionCollection recursiveMatch(Topic topicName) {
SubscriptionCollection subscriptions = new SubscriptionCollection();
recursiveMatch(topicName, this.root, 0, subscriptions);
return subscriptions;
}

private List<Subscription> recursiveMatch(Topic topicName, INode inode, int depth) {
private void recursiveMatch(Topic topicName, INode inode, int depth, SubscriptionCollection target) {
CNode cnode = inode.mainNode();
if (cnode instanceof TNode) {
return Collections.emptyList();
return;
}
NavigationAction action = evaluate(topicName, cnode, depth);
if (action == NavigationAction.MATCH) {
return cnode.sharedAndNonSharedSubscriptions();
target.addNormalSubscriptions(cnode.getSubscriptions());
target.addSharedSubscriptions(cnode.getSharedSubscriptions());
return;
}
if (action == NavigationAction.STOP) {
return Collections.emptyList();
return;
}
final boolean isRoot = ROOT.equals(cnode.getToken());
final boolean isSingle = Token.SINGLE.equals(cnode.getToken());
Expand All @@ -256,27 +261,27 @@ private List<Subscription> recursiveMatch(Topic topicName, INode inode, int dept
: (isSingle || isMulti)
? topicName.exceptFullHeadToken()
: topicName.exceptHeadToken();
List<Subscription> subscriptions = new ArrayList<>();
SubscriptionCollection subscriptions = new SubscriptionCollection();

// We should only consider the maximum three children children of
// type #, + or exact match
Optional<INode> subInode = cnode.childOf(Token.MULTI);
if (subInode.isPresent()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1));
recursiveMatch(remainingTopic, subInode.get(), depth + 1, target);
}
subInode = cnode.childOf(Token.SINGLE);
if (subInode.isPresent()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1));
recursiveMatch(remainingTopic, subInode.get(), depth + 1, target);
}
if (remainingTopic.isEmpty()) {
subscriptions.addAll(cnode.sharedAndNonSharedSubscriptions());
target.addNormalSubscriptions(cnode.getSubscriptions());
target.addSharedSubscriptions(cnode.getSharedSubscriptions());
} else {
subInode = cnode.childOf(remainingTopic.headToken());
if (subInode.isPresent()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1));
recursiveMatch(remainingTopic, subInode.get(), depth + 1, target);
}
}
return subscriptions;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -77,34 +78,10 @@ public void init(ISubscriptionsRepository subscriptionsRepository) {
* @return the list of matching subscriptions, or empty if not matching.
*/
@Override
public List<Subscription> matchWithoutQosSharpening(Topic topicName) {
public SubscriptionCollection matchWithoutQosSharpening(Topic topicName) {
return ctrie.recursiveMatch(topicName);
}

@Override
public List<Subscription> matchQosSharpening(Topic topicName) {
final List<Subscription> subscriptions = matchWithoutQosSharpening(topicName);

// for each session select the subscription with higher QoS
return selectSubscriptionsWithHigherQoSForEachSession(subscriptions);
}

private static List<Subscription> selectSubscriptionsWithHigherQoSForEachSession(List<Subscription> subscriptions) {
// for each session select the subscription with higher QoS
Map<String, Subscription> subsGroupedByClient = new HashMap<>();
for (Subscription sub : subscriptions) {
// If same client is subscribed to two different shared subscription that overlaps
// then it has to return both subscriptions, because the share name made them independent.
final String key = sub.clientAndShareName();
Subscription existingSub = subsGroupedByClient.get(key);
// update the selected subscriptions if not present or if it has a greater qos
if (existingSub == null || existingSub.qosLessThan(sub)) {
subsGroupedByClient.put(key, sub);
}
}
return new ArrayList<>(subsGroupedByClient.values());
}

@Override
public boolean add(String clientId, Topic filter, MqttSubscriptionOption option) {
SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, option);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,21 @@ private String prettySubscriptions(CNode node) {
if (node instanceof TNode) {
return "TNode";
}
if (node.subscriptions().isEmpty()) {
if (node.getSubscriptions().isEmpty()) {
return StringUtil.EMPTY_STRING;
}
StringBuilder subScriptionsStr = new StringBuilder(" ~~[");
int counter = 0;
for (Subscription couple : node.subscriptions()) {
for (Subscription couple : node.getSubscriptions().values()) {
subScriptionsStr
.append("{filter=").append(couple.topicFilter).append(", ")
.append("option=").append(couple.option()).append(", ")
.append("client='").append(couple.clientId).append("'}");
counter++;
if (counter < node.subscriptions().size()) {
subScriptionsStr.append(";");
}
subScriptionsStr.append(";");
}
return subScriptionsStr.append("]").toString();
final int length = subScriptionsStr.length();
return subScriptionsStr.replace(length - 1, length, "]").toString();
}

private String indentTabs(int deep) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ public interface ISubscriptionsDirectory {

void init(ISubscriptionsRepository sessionsRepository);

List<Subscription> matchWithoutQosSharpening(Topic topic);

List<Subscription> matchQosSharpening(Topic topic);
SubscriptionCollection matchWithoutQosSharpening(Topic topic);

boolean add(String clientId, Topic filter, MqttSubscriptionOption option);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* Shared subscription's name.
*/
// It's public because used by PostOffice
public final class ShareName {
public final class ShareName implements Comparable<ShareName>{
private final String shareName;

public ShareName(String shareName) {
Expand All @@ -36,8 +36,8 @@ public boolean equals(Object o) {
return Objects.equals(shareName, (String) o);
}
if (getClass() != o.getClass()) return false;
ShareName shareName1 = (ShareName) o;
return Objects.equals(shareName, shareName1.shareName);
ShareName oShareName = (ShareName) o;
return Objects.equals(shareName, oShareName.shareName);
}

public String getShareName() {
Expand All @@ -55,4 +55,9 @@ public String toString() {
"shareName='" + shareName + '\'' +
'}';
}

@Override
public int compareTo(ShareName o) {
return shareName.compareTo(o.shareName);
}
}
Loading

0 comments on commit e913366

Please sign in to comment.