Skip to content

Commit

Permalink
Test use of PCollections
Browse files Browse the repository at this point in the history
  • Loading branch information
hylkevds committed Aug 20, 2024
1 parent 5761bce commit ca474d3
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 47 deletions.
6 changes: 6 additions & 0 deletions broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@
<version>1.15</version>
</dependency>

<dependency>
<groupId>org.pcollections</groupId>
<artifactId>pcollections</artifactId>
<version>4.0.2</version>
</dependency>

<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
Expand Down
77 changes: 35 additions & 42 deletions broker/src/main/java/io/moquette/broker/subscriptions/CNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -33,52 +34,48 @@
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import org.pcollections.PMap;
import org.pcollections.TreePMap;

class CNode implements Comparable<CNode> {

public static final Random SECURE_RANDOM = new SecureRandom();
private final Token token;
private final List<INode> children;
// Sorted list of subscriptions. The sort is necessary for fast access, instead of linear scan.
private List<Subscription> subscriptions;
private PMap<String, INode> children;
// Map of subscriptions. Not a Set, because Set doesn't have a Get method and we may need to update.
private PMap<Subscription, Subscription> subscriptions;
// the list of SharedSubscription is sorted. The sort is necessary for fast access, instead of linear scan.
private Map<ShareName, List<SharedSubscription>> sharedSubscriptions;

CNode(Token token) {
this.children = new ArrayList<>();
this.subscriptions = new ArrayList<>();
this.children = TreePMap.empty();
this.subscriptions = TreePMap.empty();
this.sharedSubscriptions = new HashMap<>();
this.token = token;
}

//Copy constructor
private CNode(Token token, List<INode> children, List<Subscription> subscriptions, Map<ShareName,
List<SharedSubscription>> sharedSubscriptions) {
private CNode(Token token, PMap<String, INode> children, PMap<Subscription, Subscription> subscriptions, Map<ShareName, List<SharedSubscription>> sharedSubscriptions) {
this.token = token; // keep reference, root comparison in directory logic relies on it for now.
this.subscriptions = new ArrayList<>(subscriptions);
this.subscriptions = subscriptions;
this.sharedSubscriptions = new HashMap<>(sharedSubscriptions);
this.children = new ArrayList<>(children);
this.children = children;
}

public Token getToken() {
return token;
}

List<INode> allChildren() {
return new ArrayList<>(this.children);
Collection<INode> allChildren() {
return this.children.values();
}

Optional<INode> childOf(Token token) {
int idx = findIndexForToken(token);
if (idx < 0) {
INode value = children.get(token.name);
if (value == null) {
return Optional.empty();
}
return Optional.of(children.get(idx));
}

private int findIndexForToken(Token token) {
final INode tempTokenNode = new INode(new CNode(token));
return Collections.binarySearch(children, tempTokenNode, (INode node, INode tokenHolder) -> node.mainNode().token.compareTo(tokenHolder.mainNode().token));
return Optional.of(value);
}

@Override
Expand All @@ -91,17 +88,15 @@ CNode copy() {
}

public void add(INode newINode) {
int idx = findIndexForToken(newINode.mainNode().token);
if (idx < 0) {
children.add(-1 - idx, newINode);
} else {
children.add(idx, newINode);
}
final String tokenName = newINode.mainNode().token.name;
children = children.plus(tokenName, newINode);
}

public INode remove(INode node) {
int idx = findIndexForToken(node.mainNode().token);
return this.children.remove(idx);
final String tokenName = node.mainNode().token.name;
INode toRemove = children.get(tokenName);
children = children.minus(tokenName);
return toRemove;
}

private List<Subscription> sharedSubscriptions() {
Expand All @@ -118,8 +113,8 @@ private List<Subscription> sharedSubscriptions() {
return selectedSubscriptions;
}

List<Subscription> subscriptions() {
return subscriptions;
Set<Subscription> subscriptions() {
return subscriptions.keySet();
}

// Mutating operation
Expand All @@ -141,25 +136,23 @@ CNode addSubscription(SubscriptionRequest request) {
final Subscription newSubscription = request.subscription();

// if already contains one with same topic and same client, keep that with higher QoS
int idx = Collections.binarySearch(subscriptions, newSubscription);
if (idx >= 0) {
final Subscription existing = subscriptions.get(newSubscription);
if (existing != null) {
// Subscription already exists
final Subscription existing = subscriptions.get(idx);
if (needsToUpdateExistingSubscription(newSubscription, existing)) {
subscriptions.set(idx, newSubscription);
subscriptions = subscriptions.plus(newSubscription, newSubscription);
}
} else {
// insert into the expected index so that the sorting is maintained
this.subscriptions.add(-1 - idx, newSubscription);
subscriptions = subscriptions.plus(newSubscription, newSubscription);
}
}
return this;
}

private static boolean needsToUpdateExistingSubscription(Subscription newSubscription, Subscription existing) {
if ((newSubscription.hasSubscriptionIdentifier() && existing.hasSubscriptionIdentifier()) &&
newSubscription.getSubscriptionIdentifier().equals(existing.getSubscriptionIdentifier())
) {
if ((newSubscription.hasSubscriptionIdentifier() && existing.hasSubscriptionIdentifier())
&& newSubscription.getSubscriptionIdentifier().equals(existing.getSubscriptionIdentifier())) {
// if subscription identifier hasn't changed,
// then check QoS but don't lower the requested QoS level
return existing.option().qos().value() < newSubscription.option().qos().value();
Expand All @@ -177,7 +170,7 @@ private static boolean needsToUpdateExistingSubscription(Subscription newSubscri
* AND at least one subscription is actually present for that clientId
* */
boolean containsOnly(String clientId) {
for (Subscription sub : this.subscriptions) {
for (Subscription sub : this.subscriptions.values()) {
if (!sub.clientId.equals(clientId)) {
return false;
}
Expand Down Expand Up @@ -207,7 +200,7 @@ private static SharedSubscription wrapKey(String clientId) {

//TODO this is equivalent to negate(containsOnly(clientId))
private boolean containsSubscriptionsForClient(String clientId) {
for (Subscription sub : this.subscriptions) {
for (Subscription sub : this.subscriptions.values()) {
if (sub.clientId.equals(clientId)) {
return true;
}
Expand All @@ -233,13 +226,13 @@ void removeSubscriptionsFor(UnsubscribeRequest request) {
} else {
// collect Subscription instances to remove
Set<Subscription> toRemove = new HashSet<>();
for (Subscription sub : this.subscriptions) {
for (Subscription sub : this.subscriptions.values()) {
if (sub.clientId.equals(clientId)) {
toRemove.add(sub);
}
}
// effectively remove the instances
this.subscriptions.removeAll(toRemove);
subscriptions = subscriptions.minusAll(toRemove);
}
}

Expand All @@ -251,7 +244,7 @@ public int compareTo(CNode o) {
public List<Subscription> sharedAndNonSharedSubscriptions() {
List<Subscription> shared = sharedSubscriptions();
List<Subscription> returnedSubscriptions = new ArrayList<>(subscriptions.size() + shared.size());
returnedSubscriptions.addAll(subscriptions);
returnedSubscriptions.addAll(subscriptions.values());
returnedSubscriptions.addAll(shared);
return returnedSubscriptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static io.moquette.broker.subscriptions.SubscriptionTestUtils.asSubscription;
import static io.moquette.broker.subscriptions.Topic.asTopic;
import java.util.List;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -54,11 +55,11 @@ public void testAddOnSecondLayerWithEmptyTokenOnEmptyTree() {
assertThat(this.sut.root.mainNode().subscriptions()).isEmpty();
assertThat(this.sut.root.mainNode().allChildren()).isNotEmpty();

INode firstLayer = this.sut.root.mainNode().allChildren().get(0);
INode firstLayer = this.sut.root.mainNode().allChildren().stream().findFirst().get();
assertThat(firstLayer.mainNode().subscriptions()).isEmpty();
assertThat(firstLayer.mainNode().allChildren()).isNotEmpty();

INode secondLayer = firstLayer.mainNode().allChildren().get(0);
INode secondLayer = firstLayer.mainNode().allChildren().stream().findFirst().get();
assertThat(secondLayer.mainNode().subscriptions()).isNotEmpty();
assertThat(secondLayer.mainNode().allChildren()).isEmpty();
}
Expand Down Expand Up @@ -99,7 +100,7 @@ public void testAddNewSubscriptionOnExistingNode() {
//Verify
final Optional<CNode> matchedNode = sut.lookup(asTopic("/temp"));
assertTrue(matchedNode.isPresent(), "Node on path /temp must be present");
final List<Subscription> subscriptions = matchedNode.get().subscriptions();
final Set<Subscription> subscriptions = matchedNode.get().subscriptions();
assertTrue(subscriptions.contains(asSubscription("TempSensor2", "/temp")));
}

Expand All @@ -117,7 +118,7 @@ public void testAddNewDeepNodes() {
//Verify
final Optional<CNode> matchedNode = sut.lookup(asTopic("/italy/happiness"));
assertTrue(matchedNode.isPresent(), "Node on path /italy/happiness must be present");
final List<Subscription> subscriptions = matchedNode.get().subscriptions();
final Set<Subscription> subscriptions = matchedNode.get().subscriptions();
assertTrue(subscriptions.contains(asSubscription("HappinessSensor", "/italy/happiness")));
}

Expand Down

0 comments on commit ca474d3

Please sign in to comment.