Skip to content

Commit

Permalink
HDFS-17721. RBF: Allow routers to declare IP for admin addr (apache#7342
Browse files Browse the repository at this point in the history
) Contributed by Felix Nguyen.

Reviewed-by: Haiyang Hu <[email protected]>
Signed-off-by: Shilun Fan <[email protected]>
  • Loading branch information
kokon191 authored Feb 3, 2025
1 parent 44a5cba commit 741bdd6
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class MountTableRefresherService extends AbstractService {

/**
* All router admin clients cached. So no need to create the client again and
* again. Router admin address(host:port) is used as key to cache RouterClient
* again. Router admin address(host:port or ip:port) is used as key to cache RouterClient
* objects.
*/
private LoadingCache<String, RouterClient> routerClientsCache;
Expand All @@ -102,8 +102,13 @@ protected void serviceInit(Configuration conf) throws Exception {
this.mountTableStore = getMountTableStore();
// Attach this service to mount table store.
this.mountTableStore.setRefreshService(this);
this.localAdminAddress =
StateStoreUtils.getHostPortString(router.getAdminServerAddress());
if (conf.getBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE,
RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT)) {
this.localAdminAddress = StateStoreUtils.getIpPortString(router.getAdminServerAddress());
} else {
this.localAdminAddress = StateStoreUtils.getHostPortString(router.getAdminServerAddress());
}
LOG.info("Initialized MountTableRefresherService with addr: {}", this.localAdminAddress);
this.cacheUpdateTimeout = conf.getTimeDuration(
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT,
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT,
Expand Down Expand Up @@ -220,7 +225,7 @@ public void refresh() throws StateStoreUnavailableException {
List<MountTableRefresherThread> refreshThreads = new ArrayList<>();
for (RouterState routerState : cachedRecords) {
String adminAddress = routerState.getAdminAddress();
if (adminAddress == null || adminAddress.length() == 0) {
if (adminAddress == null || adminAddress.isEmpty()) {
// this router has not enabled router admin.
continue;
}
Expand All @@ -237,11 +242,13 @@ public void refresh() throws StateStoreUnavailableException {
* RouterClient
*/
refreshThreads.add(getLocalRefresher(adminAddress));
LOG.debug("Added local refresher for {}", adminAddress);
} else {
try {
RouterClient client = routerClientsCache.get(adminAddress);
refreshThreads.add(new MountTableRefresherThread(
client.getMountTableManager(), adminAddress));
LOG.debug("Added remote refresher for {}", adminAddress);
} catch (ExecutionException execExcep) {
// Can not connect, seems router is stopped now.
LOG.warn(ROUTER_CONNECT_ERROR_MSG, adminAddress, execExcep);
Expand Down Expand Up @@ -296,6 +303,7 @@ private void logResult(List<MountTableRefresherThread> refreshThreads) {
if (mountTableRefreshThread.isSuccess()) {
successCount++;
} else {
LOG.debug("Failed to refresh {}", mountTableRefreshThread.getAdminAddress());
failureCount++;
// remove RouterClient from cache so that new client is created
removeFromCache(mountTableRefreshThread.getAdminAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_ROUTER_PREFIX + "safemode.checkperiod";
public static final long DFS_ROUTER_SAFEMODE_CHECKPERIOD_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(5);
/**
 * Config key: when enabled, the router reports its admin address as ip:port
 * instead of host:port (in the heartbeat record and in the mount table
 * refresher's local address).
 */
public static final String DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE =
FEDERATION_ROUTER_PREFIX + "heartbeat.with.ip.enable";
/** Default for the key above: disabled, i.e. the host:port form is used. */
public static final boolean DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT = false;

// HDFS Router-based federation mount table entries
/** Maximum number of cache entries to have. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,12 @@ synchronized void updateStateStore() {
getStateStoreVersion(MountTableStore.class));
record.setStateStoreVersion(stateStoreVersion);
// if admin server not started then hostPort will be empty
String hostPort =
StateStoreUtils.getHostPortString(router.getAdminServerAddress());
record.setAdminAddress(hostPort);
if (router.getConfig().getBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE,
RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT)) {
record.setAdminAddress(StateStoreUtils.getIpPortString(router.getAdminServerAddress()));
} else {
record.setAdminAddress(StateStoreUtils.getHostPortString(router.getAdminServerAddress()));
}
RouterHeartbeatRequest request =
RouterHeartbeatRequest.newInstance(record);
RouterHeartbeatResponse response = routerStore.routerHeartbeat(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -136,4 +137,19 @@ public static String getHostPortString(InetSocketAddress address) {
return hostName + ":" + address.getPort();
}

/**
 * Returns address in form of ip:port, empty string if address is null.
 *
 * The address is first passed through {@link NetUtils#getConnectAddress}
 * and the textual IP of the result is used. If the resulting socket address
 * is unresolved (it carries no {@link InetAddress}), the host string of the
 * socket address is used instead so that callers never hit a
 * NullPointerException.
 *
 * @param address address, may be null
 * @return ip:port, or an empty string if address is null
 */
public static String getIpPortString(InetSocketAddress address) {
  if (null == address) {
    return "";
  }
  final InetSocketAddress connectAddress = NetUtils.getConnectAddress(address);
  final InetAddress inet = connectAddress.getAddress();
  // getAddress() returns null for an unresolved socket address; fall back to
  // the literal host string rather than dereferencing null.
  final String host =
      (inet != null) ? inet.getHostAddress() : connectAddress.getHostString();
  return host + ":" + connectAddress.getPort();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -974,4 +974,12 @@
</description>
</property>

<property>
<name>dfs.federation.router.heartbeat.with.ip.enable</name>
<description>
Make the router use its IP address instead of its hostname when communicating with the router state store.
</description>
<value>false</value>
</property>

</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -53,23 +55,32 @@
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
* This test class verifies that mount table cache is updated on all the routers
* when MountTableRefreshService is enabled and there is a change in mount table
* entries.
*/
@RunWith(Parameterized.class)
public class TestRouterMountTableCacheRefresh {
private static TestingServer curatorTestingServer;
private static MiniRouterDFSCluster cluster;
private static RouterContext routerContext;
private static MountTableManager mountTableManager;

@BeforeClass
public static void setUp() throws Exception {
@Parameterized.Parameters
public static Collection<Object> data() {
  // One run per value of the boolean constructor argument
  // (useIpForHeartbeats): first true, then false.
  final Object[] useIpForHeartbeatsValues = {Boolean.TRUE, Boolean.FALSE};
  return Arrays.asList(useIpForHeartbeatsValues);
}

public TestRouterMountTableCacheRefresh(boolean useIpForHeartbeats) throws Exception {
// Initialize only once per parameter
if (curatorTestingServer != null) {
return;
}
curatorTestingServer = new TestingServer();
curatorTestingServer.start();
final String connectString = curatorTestingServer.getConnectString();
Expand All @@ -82,6 +93,7 @@ public static void setUp() throws Exception {
FileSubclusterResolver.class);
conf.set(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS, connectString);
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true);
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE, useIpForHeartbeats);
cluster.addRouterOverrides(conf);
cluster.startCluster();
cluster.startRouters();
Expand All @@ -95,11 +107,15 @@ public static void setUp() throws Exception {
numNameservices, 60000);
}

@AfterClass
public static void destory() {
@Parameterized.AfterParam
public static void destroy() {
try {
curatorTestingServer.close();
cluster.shutdown();
if (curatorTestingServer != null) {
curatorTestingServer.close();
}
if (cluster != null) {
cluster.shutdown();
}
} catch (IOException e) {
// do nothing
}
Expand Down

0 comments on commit 741bdd6

Please sign in to comment.