Skip to content

Commit

Permalink
Merge branch '2.x' into fix_#7140
Browse files Browse the repository at this point in the history
  • Loading branch information
xingfudeshi authored Mar 7, 2025
2 parents 875d7ee + efa341a commit 700085c
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
Expand All @@ -38,16 +40,67 @@ public class ConsistentHashLoadBalance implements LoadBalance {
* The constant LOAD_BALANCE_CONSISTENT_HASH_VIRTUAL_NODES.
*/
public static final String LOAD_BALANCE_CONSISTENT_HASH_VIRTUAL_NODES = LoadBalanceFactory.LOAD_BALANCE_PREFIX
+ "virtualNodes";
+ "virtualNodes";
/**
* The constant VIRTUAL_NODES_NUM.
*/
private static final int VIRTUAL_NODES_NUM = ConfigurationFactory.getInstance().getInt(
LOAD_BALANCE_CONSISTENT_HASH_VIRTUAL_NODES, VIRTUAL_NODES_DEFAULT);
LOAD_BALANCE_CONSISTENT_HASH_VIRTUAL_NODES, VIRTUAL_NODES_DEFAULT);

/**
* The ConsistentHashSelectorWrapper that caches a {@link ConsistentHashSelector}.
*/
private volatile ConsistentHashSelectorWrapper selectorWrapper;

@SuppressWarnings("unchecked")
@Override
public <T> T select(List<T> invokers, String xid) {
return new ConsistentHashSelector<>(invokers, VIRTUAL_NODES_NUM).select(xid);
if (selectorWrapper == null) {
synchronized (this) {
if (selectorWrapper == null) {
selectorWrapper = new ConsistentHashSelectorWrapper(
new ConsistentHashSelector<>(invokers, VIRTUAL_NODES_NUM), invokers);
}
}
}
return (T) selectorWrapper.getSelector(invokers).select(xid);
}

@SuppressWarnings({"rawtypes", "unchecked"})
private static final class ConsistentHashSelectorWrapper {

private volatile ConsistentHashSelector selector;
// only shared with read
private volatile Set invokers;

public ConsistentHashSelectorWrapper(ConsistentHashSelector selector, List invokers) {
this.selector = selector;
this.invokers = new HashSet<>(invokers);
}

public ConsistentHashSelector getSelector(List invokers) {
if (!equals(invokers)) {
synchronized (this) {
if (!equals(invokers)) {
selector = new ConsistentHashSelector(invokers, VIRTUAL_NODES_NUM);
this.invokers = new HashSet<>(invokers);
}
}
}
return selector;
}

private boolean equals(List invokers) {
if (invokers.size() != this.invokers.size()) {
return false;
}
for (Object invoker : invokers) {
if (!this.invokers.contains(invoker)) {
return false;
}
}
return true;
}
}

private static final class ConsistentHashSelector<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

Expand Down Expand Up @@ -85,7 +88,7 @@ public void testXIDLoadBalance_select(List<InetSocketAddress> addresses) throws
Assertions.assertNotNull(inetSocketAddress);
// test not found tc channel
inetSocketAddress = loadBalance.select(addresses, "127.0.0.1:8199:123456");
Assertions.assertNotEquals(inetSocketAddress.getPort(),8199);
Assertions.assertNotEquals(inetSocketAddress.getPort(), 8199);
}

/**
Expand All @@ -108,6 +111,31 @@ public void testConsistentHashLoadBalance_select(List<InetSocketAddress> address
Assertions.assertEquals(1, selected, "selected must be equal to 1");
}

/**
* Test cached consistent hash load balance select.
*
* @param addresses the addresses
*/
@ParameterizedTest
@MethodSource("addressProvider")
public void testCachedConsistentHashLoadBalance_select(List<InetSocketAddress> addresses) throws Exception {
ConsistentHashLoadBalance loadBalance = new ConsistentHashLoadBalance();

List<InetSocketAddress> addresses1 = new ArrayList<>(addresses);
loadBalance.select(addresses1, XID);
Object o1 = getConsistentHashSelectorByReflect(loadBalance);
List<InetSocketAddress> addresses2 = new ArrayList<>(addresses);
loadBalance.select(addresses2, XID);
Object o2 = getConsistentHashSelectorByReflect(loadBalance);
Assertions.assertEquals(o1, o2);

List<InetSocketAddress> addresses3 = new ArrayList<>(addresses);
addresses3.remove(ThreadLocalRandom.current().nextInt(addresses.size()));
loadBalance.select(addresses3, XID);
Object o3 = getConsistentHashSelectorByReflect(loadBalance);
Assertions.assertNotEquals(o1, o3);
}

/**
* Test least active load balance select.
*
Expand Down Expand Up @@ -166,6 +194,22 @@ public Map<InetSocketAddress, AtomicLong> getSelectedCounter(int runs, List<Inet
return counter;
}

/**
* Gets ConsistentHashSelector Instance By Reflect
*
* @param loadBalance the loadBalance
* @return the ConsistentHashSelector
*/
public Object getConsistentHashSelectorByReflect(ConsistentHashLoadBalance loadBalance) throws Exception {
Field selectorWrapperField = ConsistentHashLoadBalance.class.getDeclaredField("selectorWrapper");
selectorWrapperField.setAccessible(true);
Object selectWrapper = selectorWrapperField.get(loadBalance);
Assertions.assertNotNull(selectWrapper);
Field selectorField = selectWrapper.getClass().getDeclaredField("selector");
selectorField.setAccessible(true);
return selectorField.get(selectWrapper);
}

/**
* Address provider object [ ] [ ].
*
Expand Down

0 comments on commit 700085c

Please sign in to comment.