Skip to content

Commit

Permalink
[improve][broker] Improve the extensibility of the TopicBundleAssignm…
Browse files Browse the repository at this point in the history
…entStrategy interface class (apache#23773)
  • Loading branch information
rayluoluo committed Jan 7, 2025
1 parent b92a611 commit 817e419
Show file tree
Hide file tree
Showing 4 changed files with 0 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,71 +27,16 @@ public class ConsistentHashingTopicBundleAssigner implements TopicBundleAssignme
private PulsarService pulsar;
@Override
public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {
<<<<<<< Updated upstream
<<<<<<< Updated upstream
NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString()));
=======
<<<<<<< HEAD
<<<<<<< HEAD
NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName));
=======
NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString()));
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString()));
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
>>>>>>> Stashed changes
=======
NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName));
>>>>>>> Stashed changes
if (topicName.getDomain().equals(TopicDomain.non_persistent)) {
bundle.setHasNonPersistentTopic(true);
}
return bundle;
}

@Override
<<<<<<< Updated upstream
<<<<<<< Updated upstream
=======
<<<<<<< HEAD
<<<<<<< HEAD
public long calculateBundleHashCode(TopicName topicName) {
return getBundleHashFunc().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
=======
=======
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
>>>>>>> Stashed changes
public long getHashCode(String name) {
return pulsar.getNamespaceService().getNamespaceBundleFactory().getHashFunc()
.hashString(name, StandardCharsets.UTF_8)
.padToLong();
<<<<<<< Updated upstream
}

@Override
public void init(PulsarService pulsarService) {
this.pulsar = pulsarService;
=======
<<<<<<< HEAD
=======
>>>>>>> Stashed changes
}

@Override
public void init(PulsarService pulsarService) {
this.pulsar = pulsarService;
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
}

@Override
public void init(PulsarService pulsarService) {
this.pulsar = pulsarService;
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
public long calculateBundleHashCode(TopicName topicName) {
return getBundleHashFunc().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
>>>>>>> Stashed changes
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,23 +293,7 @@ public CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn)
}

public long getLongHashCode(String name) {
<<<<<<< Updated upstream
<<<<<<< Updated upstream
return this.topicBundleAssignmentStrategy.getHashCode(name);
=======
<<<<<<< HEAD
<<<<<<< HEAD
return this.topicBundleAssignmentStrategy.calculateBundleHashCode(TopicName.get(name));
=======
return this.topicBundleAssignmentStrategy.getHashCode(name);
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
return this.topicBundleAssignmentStrategy.getHashCode(name);
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
>>>>>>> Stashed changes
=======
return this.topicBundleAssignmentStrategy.calculateBundleHashCode(TopicName.get(name));
>>>>>>> Stashed changes
}

public NamespaceBundles getBundles(NamespaceName nsname, BundlesData bundleData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,9 @@
public interface TopicBundleAssignmentStrategy {
NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles);

<<<<<<< Updated upstream
<<<<<<< Updated upstream
long getHashCode(String name);
=======
<<<<<<< HEAD
<<<<<<< HEAD
default long calculateBundleHashCode(TopicName topicName) {
return Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
}
=======
long getHashCode(String name);
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
long getHashCode(String name);
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
>>>>>>> Stashed changes
=======
default long calculateBundleHashCode(TopicName topicName) {
return Hashing.crc32().hashString(topicName.toString(), StandardCharsets.UTF_8).padToLong();
}
>>>>>>> Stashed changes

void init(PulsarService pulsarService);
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,7 @@ public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespac
}

@Override
<<<<<<< Updated upstream
<<<<<<< HEAD
<<<<<<< HEAD
public long calculateBundleHashCode(TopicName topicName) {
=======
public long getHashCode(String name) {
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
public long getHashCode(String name) {
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
public long calculateBundleHashCode(TopicName topicName) {
>>>>>>> Stashed changes
return 0;
}

Expand Down Expand Up @@ -145,45 +133,16 @@ public static class RoundRobinBundleAssigner implements TopicBundleAssignmentStr

@Override
public NamespaceBundle findBundle(TopicName topicName, NamespaceBundles namespaceBundles) {
<<<<<<< Updated upstream
<<<<<<< HEAD
<<<<<<< HEAD
NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName));
=======
NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString()));
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
NamespaceBundle bundle = namespaceBundles.getBundle(getHashCode(topicName.toString()));
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
NamespaceBundle bundle = namespaceBundles.getBundle(calculateBundleHashCode(topicName));
>>>>>>> Stashed changes
if (topicName.getDomain().equals(TopicDomain.non_persistent)) {
bundle.setHasNonPersistentTopic(true);
}
return bundle;
}

@Override
<<<<<<< Updated upstream
<<<<<<< HEAD
<<<<<<< HEAD
public long calculateBundleHashCode(TopicName topicName) {
// use topic name without partition id to decide the first hash value
=======
public long getHashCode(String name) {
// use topic name without partition id to decide the first hash value
TopicName topicName = TopicName.get(name);
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
public long getHashCode(String name) {
// use topic name without partition id to decide the first hash value
TopicName topicName = TopicName.get(name);
>>>>>>> 43e069997e962052bc3376f3cf91e8191f9340bf
=======
public long calculateBundleHashCode(TopicName topicName) {
// use topic name without partition id to decide the first hash value
>>>>>>> Stashed changes
long currentPartitionTopicHash =
pulsar.getNamespaceService().getNamespaceBundleFactory().getHashFunc()
.hashString(topicName.getPartitionedTopicName(), Charsets.UTF_8).padToLong();
Expand Down

0 comments on commit 817e419

Please sign in to comment.