Skip to content

Commit

Permalink
HDFS-17657. The balancer service supports httpserver. (apache#7242) C…
Browse files Browse the repository at this point in the history
…ontribtued by Zhaobo Huang.

Reviewed-by: Tao Li <[email protected]>
Signed-off-by: Shilun Fan <[email protected]>
  • Loading branch information
huangzhaobo99 authored Jan 22, 2025
1 parent 0d72896 commit 0432761
Show file tree
Hide file tree
Showing 19 changed files with 702 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void start() throws IOException {
NfsConfigKeys.NFS_HTTPS_ADDRESS_DEFAULT);
InetSocketAddress httpsAddr = NetUtils.createSocketAddr(httpsAddrString);

HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
HttpServer2.Builder builder = DFSUtil.getHttpServerTemplate(conf,
httpAddr, httpsAddr, "nfs3",
NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY,
NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ protected void serviceInit(Configuration configuration) throws Exception {
protected void serviceStart() throws Exception {
// Build and start server
String webApp = "router";
HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(
HttpServer2.Builder builder = DFSUtil.getHttpServerTemplate(
this.conf, this.httpAddress, this.httpsAddress, webApp,
RBFConfigKeys.DFS_ROUTER_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
RBFConfigKeys.DFS_ROUTER_KEYTAB_FILE_KEY);
Expand Down
4 changes: 4 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<copy file="${basedir}/src/main/webapps/proto-web.xml"
tofile="${project.build.directory}/webapps/journal/WEB-INF/web.xml"
filtering="true"/>
<copy file="${basedir}/src/main/webapps/proto-web.xml"
tofile="${project.build.directory}/webapps/balancer/WEB-INF/web.xml"
filtering="true"/>
<copy file="${basedir}/src/main/webapps/proto-web.xml"
tofile="${project.build.directory}/webapps/nfs3/WEB-INF/web.xml"
filtering="true"/>
Expand Down Expand Up @@ -428,6 +431,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<exclude>src/main/webapps/hdfs/robots.txt</exclude>
<exclude>src/main/webapps/journal/robots.txt</exclude>
<exclude>src/main/webapps/secondary/robots.txt</exclude>
<exclude>src/main/webapps/balancer/robots.txt</exclude>
<exclude>src/contrib/**</exclude>
<exclude>src/site/resources/images/*</exclude>
<exclude>src/main/webapps/static/bootstrap-3.4.1/**</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_BALANCER_ADDRESS_DEFAULT= "0.0.0.0:0";
public static final String DFS_BALANCER_KEYTAB_FILE_KEY = "dfs.balancer.keytab.file";
public static final String DFS_BALANCER_KERBEROS_PRINCIPAL_KEY = "dfs.balancer.kerberos.principal";
public static final String DFS_BALANCER_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY =
"dfs.balancer.kerberos.internal.spnego.principal";
public static final String DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout";
public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
public static final String DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.balancer.max-no-move-interval";
Expand All @@ -821,6 +823,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_BALANCER_SERVICE_INTERVAL_DEFAULT = TimeUnit.MINUTES.toMillis(5); //5 mins
public static final String DFS_BALANCER_SERVICE_RETRIES_ON_EXCEPTION = "dfs.balancer.service.retries.on.exception";
public static final int DFS_BALANCER_SERVICE_RETRIES_ON_EXCEPTION_DEFAULT = 5;
public static final String DFS_BALANCER_HTTPSERVER_ENABLED_KEY =
"dfs.balancer.httpserver.enabled";
public static final Boolean DFS_BALANCER_HTTPSERVER_ENABLED_DEFAULT = false;
public static final String DFS_BALANCER_HTTP_ADDRESS_KEY = "dfs.balancer.http-address";
public static final int DFS_BALANCER_HTTP_PORT_DEFAULT = 8590;
public static final String DFS_BALANCER_HTTP_BIND_HOST_KEY = "dfs.balancer.http-bind-host";
public static final String DFS_BALANCER_HTTP_ADDRESS_DEFAULT =
"0.0.0.0:" + DFS_BALANCER_HTTP_PORT_DEFAULT;
public static final String DFS_BALANCER_HTTPS_ADDRESS_KEY = "dfs.balancer.https-address";
public static final int DFS_BALANCER_HTTPS_PORT_DEFAULT = 8591;
public static final String DFS_BALANCER_HTTPS_BIND_HOST_KEY = "dfs.balancer.https-bind-host";
public static final String DFS_BALANCER_HTTPS_ADDRESS_DEFAULT =
"0.0.0.0:" + DFS_BALANCER_HTTPS_PORT_DEFAULT;

public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1736,11 +1736,11 @@ public static Configuration loadSslConfiguration(Configuration conf) {
}

/**
* Return a HttpServer.Builder that the journalnode / namenode / secondary
* Return a HttpServer.Builder that the journalnode / namenode / secondary / router / balancer
* namenode can use to initialize their HTTP / HTTPS server.
*
*/
public static HttpServer2.Builder httpServerTemplateForNNAndJN(
public static HttpServer2.Builder getHttpServerTemplate(
Configuration conf, final InetSocketAddress httpAddr,
final InetSocketAddress httpsAddr, String name, String spnegoUserNameKey,
String spnegoKeytabFileKey) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void start() throws IOException {
}
}

HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
HttpServer2.Builder builder = DFSUtil.getHttpServerTemplate(conf,
httpAddr, httpsAddr, "journal",
DFSConfigKeys.DFS_JOURNALNODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
DFSConfigKeys.DFS_JOURNALNODE_KEYTAB_FILE_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.management.ObjectName;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
Expand Down Expand Up @@ -66,6 +68,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
Expand All @@ -76,6 +79,7 @@
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.VersionInfo;

/** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
* when some datanodes become full or when new empty nodes join the cluster.
Expand Down Expand Up @@ -180,7 +184,7 @@
*/

@InterfaceAudience.Private
public class Balancer {
public class Balancer implements BalancerMXBean {
static final Logger LOG = LoggerFactory.getLogger(Balancer.class);

static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
Expand Down Expand Up @@ -241,6 +245,7 @@ public class Balancer {
private final boolean sortTopNodes;
private final int limitOverUtilizedNum;
private final BalancerMetrics metrics;
private ObjectName balancerInfoBeanName;

// all data node lists
private final Collection<Source> overUtilized = new LinkedList<Source>();
Expand Down Expand Up @@ -377,6 +382,8 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
this.metrics = BalancerMetrics.create(this);

registerBalancerMXBean();
}

private static long getCapacity(DatanodeStorageReport report, StorageType t) {
Expand Down Expand Up @@ -680,6 +687,13 @@ private boolean matchStorageGroups(StorageGroup left, StorageGroup right,
left.getDatanodeInfo(), right.getDatanodeInfo());
}

/**
* Register BalancerMXBean.
*/
private void registerBalancerMXBean() {
balancerInfoBeanName = MBeans.register("Balancer", "BalancerInfo", this);
}

/* reset all fields in a balancer preparing for the next iteration */
void resetData(Configuration conf) {
this.overUtilized.clear();
Expand All @@ -689,12 +703,32 @@ void resetData(Configuration conf) {
this.policy.reset();
this.dispatcher.reset(conf);
DefaultMetricsSystem.removeSourceName(metrics.getName());
if (balancerInfoBeanName != null) {
MBeans.unregister(balancerInfoBeanName);
balancerInfoBeanName = null;
}
}

NameNodeConnector getNnc() {
return nnc;
}

@Override
public String getVersion() {
return VersionInfo.getVersion() + ", r" + VersionInfo.getRevision();
}

@Override
public String getSoftwareVersion() {
return VersionInfo.getVersion();
}

@Override
public String getCompileInfo() {
return VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from "
+ VersionInfo.getBranch();
}

static class Result {
private final ExitStatus exitStatus;
private final long bytesLeftToMove;
Expand Down Expand Up @@ -860,6 +894,7 @@ static private int doBalance(Collection<URI> namenodes,
+ " NameNode");

List<NameNodeConnector> connectors = Collections.emptyList();
BalancerHttpServer balancerHttpServer = startBalancerHttpServer(conf);
try {
connectors = NameNodeConnector.newNameNodeConnectors(namenodes, nsIds,
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
Expand All @@ -872,6 +907,9 @@ static private int doBalance(Collection<URI> namenodes,
if (p.getBlockPools().size() == 0
|| p.getBlockPools().contains(nnc.getBlockpoolID())) {
final Balancer b = new Balancer(nnc, p, conf);
if (balancerHttpServer != null) {
balancerHttpServer.setBalancerAttribute(b);
}
final Result r = b.runOneIteration();
r.print(iteration, nnc, System.out);

Expand All @@ -898,6 +936,9 @@ static private int doBalance(Collection<URI> namenodes,
for(NameNodeConnector nnc : connectors) {
IOUtils.cleanupWithLogger(LOG, nnc);
}
if (balancerHttpServer != null) {
balancerHttpServer.stop();
}
}
return ExitStatus.SUCCESS.getExitCode();
}
Expand Down Expand Up @@ -969,6 +1010,18 @@ static void stop() {
serviceRunning = false;
}

private static BalancerHttpServer startBalancerHttpServer(Configuration conf) throws IOException {
boolean httpServerEnabled = conf.getBoolean(DFSConfigKeys.DFS_BALANCER_HTTPSERVER_ENABLED_KEY,
DFSConfigKeys.DFS_BALANCER_HTTPSERVER_ENABLED_DEFAULT);
if (httpServerEnabled) {
BalancerHttpServer balancerHttpServer = new BalancerHttpServer(conf);
balancerHttpServer.start();
return balancerHttpServer;
} else {
return null;
}
}

private static void checkKeytabAndInit(Configuration conf)
throws IOException {
if (conf.getBoolean(DFSConfigKeys.DFS_BALANCER_KEYTAB_ENABLED_KEY,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.net.NetUtils;

public class BalancerHttpServer {

private static final String BALANCER_ATTRIBUTE_KEY = "current.balancer";

private final Configuration conf;
private InetSocketAddress httpAddress;
private InetSocketAddress httpsAddress;
private HttpServer2 httpServer;

public BalancerHttpServer(Configuration conf) {
this.conf = conf;
}

public void start() throws IOException {
String webApp = "balancer";
// Get HTTP address
httpAddress = conf.getSocketAddr(DFSConfigKeys.DFS_BALANCER_HTTP_BIND_HOST_KEY,
DFSConfigKeys.DFS_BALANCER_HTTP_ADDRESS_KEY,
DFSConfigKeys.DFS_BALANCER_HTTP_ADDRESS_DEFAULT,
DFSConfigKeys.DFS_BALANCER_HTTP_PORT_DEFAULT);

// Get HTTPs address
httpsAddress = conf.getSocketAddr(DFSConfigKeys.DFS_BALANCER_HTTPS_BIND_HOST_KEY,
DFSConfigKeys.DFS_BALANCER_HTTPS_ADDRESS_KEY,
DFSConfigKeys.DFS_BALANCER_HTTPS_ADDRESS_DEFAULT,
DFSConfigKeys.DFS_BALANCER_HTTPS_PORT_DEFAULT);

HttpServer2.Builder builder =
DFSUtil.getHttpServerTemplate(conf, httpAddress, httpsAddress, webApp,
DFSConfigKeys.DFS_BALANCER_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
DFSConfigKeys.DFS_BALANCER_KEYTAB_FILE_KEY);

final boolean xFrameEnabled = conf.getBoolean(DFSConfigKeys.DFS_XFRAME_OPTION_ENABLED,
DFSConfigKeys.DFS_XFRAME_OPTION_ENABLED_DEFAULT);

final String xFrameOptionValue = conf.getTrimmed(DFSConfigKeys.DFS_XFRAME_OPTION_VALUE,
DFSConfigKeys.DFS_XFRAME_OPTION_VALUE_DEFAULT);

builder.configureXFrame(xFrameEnabled).setXFrameOption(xFrameOptionValue);

httpServer = builder.build();
httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
httpServer.start();

HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
int connIdx = 0;
if (policy.isHttpEnabled()) {
httpAddress = httpServer.getConnectorAddress(connIdx++);
if (httpAddress != null) {
conf.set(DFSConfigKeys.DFS_BALANCER_HTTP_ADDRESS_KEY,
NetUtils.getHostPortString(httpAddress));
}
}
if (policy.isHttpsEnabled()) {
httpsAddress = httpServer.getConnectorAddress(connIdx);
if (httpsAddress != null) {
conf.set(DFSConfigKeys.DFS_BALANCER_HTTPS_ADDRESS_KEY,
NetUtils.getHostPortString(httpsAddress));
}
}
}

public void setBalancerAttribute(Balancer balancer) {
httpServer.setAttribute(BALANCER_ATTRIBUTE_KEY, balancer);
}

public void stop() throws IOException {
if (httpServer != null) {
try {
httpServer.stop();
} catch (Exception e) {
throw new IOException(e);
}
}
}

public InetSocketAddress getHttpAddress() {
return httpAddress;
}

public InetSocketAddress getHttpsAddress() {
return httpsAddress;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;

public interface BalancerMXBean {

/**
* Gets the version of Hadoop.
*
* @return the version of Hadoop
*/
String getVersion();

/**
* Get the version of software running on the Balancer.
*
* @return a string representing the version.
*/
String getSoftwareVersion();

/**
* Get the compilation information which contains date, user and branch.
*
* @return the compilation information, as a JSON string.
*/
String getCompileInfo();

}
Loading

0 comments on commit 0432761

Please sign in to comment.