Skip to content

Commit

Permalink
HBASE-20159 Support using separate ZK quorums for client
Browse files Browse the repository at this point in the history
  • Loading branch information
carp84 committed Mar 28, 2018
1 parent 3b6199a commit 061a31f
Show file tree
Hide file tree
Showing 15 changed files with 781 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,13 @@ private String getId() {
}

public ReadOnlyZKClient(Configuration conf) {
this.connectString = ZKConfig.getZKQuorumServersString(conf);
// We might use a different ZK for client access
String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
if (clientZkQuorumServers != null) {
this.connectString = clientZkQuorumServers;
} else {
this.connectString = ZKConfig.getZKQuorumServersString(conf);
}
this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
this.retryIntervalMs =
Expand Down
17 changes: 15 additions & 2 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,19 @@ public enum OperationStatusCode {
/** Name of ZooKeeper quorum configuration parameter. */
public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";

/** Name of ZooKeeper quorum configuration parameter for client to locate meta. */
public static final String CLIENT_ZOOKEEPER_QUORUM = "hbase.client.zookeeper.quorum";

/** Client port of ZooKeeper for client to locate meta */
public static final String CLIENT_ZOOKEEPER_CLIENT_PORT =
"hbase.client.zookeeper.property.clientPort";

/** Indicate whether the client ZK are observer nodes of the server ZK */
public static final String CLIENT_ZOOKEEPER_OBSERVER_MODE =
"hbase.client.zookeeper.observer.mode";
/** Assuming client zk not in observer mode and master need to synchronize information */
public static final boolean DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE = false;

/** Common prefix of ZooKeeper configuration properties */
public static final String ZK_CFG_PROPERTY_PREFIX =
"hbase.zookeeper.property.";
Expand All @@ -201,7 +214,7 @@ public enum OperationStatusCode {
ZK_CFG_PROPERTY_PREFIX + CLIENT_PORT_STR;

/** Default client port that the zookeeper listens on */
public static final int DEFAULT_ZOOKEPER_CLIENT_PORT = 2181;
public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;

/** Parameter name for the root dir in ZK for this cluster */
public static final String ZOOKEEPER_ZNODE_PARENT = "zookeeper.znode.parent";
Expand All @@ -224,7 +237,7 @@ public enum OperationStatusCode {
ZK_CFG_PROPERTY_PREFIX + "tickTime";

/** Default limit on concurrent client-side zookeeper connections */
public static final int DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS = 300;
public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 300;

/** Configuration key for ZooKeeper session timeout */
public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -85,7 +86,7 @@ private static Properties makeZKPropsFromHbaseConfig(Configuration conf) {
// If clientPort is not set, assign the default.
if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
zkProperties.put(HConstants.CLIENT_PORT_STR,
HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
}

// Create the server.X properties.
Expand Down Expand Up @@ -119,7 +120,7 @@ private static Properties makeZKPropsFromHbaseConfig(Configuration conf) {
*/
private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) {
String defaultClientPort = Integer.toString(
conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT));
conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT));

// Build the ZK quorum server string with "server:clientport" list, separated by ','
final String[] serverHosts =
Expand Down Expand Up @@ -310,4 +311,23 @@ public String getZnodeParent() {
return znodeParent;
}
}

/**
* Get the client ZK Quorum servers string
* @param conf the configuration to read
* @return Client quorum servers, or null if not specified
*/
public static String getClientZKQuorumServersString(Configuration conf) {
String clientQuromServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
if (clientQuromServers == null) {
return null;
}
int defaultClientPort =
conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
String clientZkClientPort =
Integer.toString(conf.getInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, defaultClientPort));
// Build the ZK quorum server string with "server:clientport" list, separated by ','
final String[] serverHosts = StringUtils.getStrings(clientQuromServers);
return buildZKQuorumServerString(serverHosts, clientZkClientPort);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
Expand Down Expand Up @@ -300,6 +302,10 @@ public void run() {
private DrainingServerTracker drainingServerTracker;
// Tracker for load balancer state
LoadBalancerTracker loadBalancerTracker;
// Tracker for meta location, if any client ZK quorum specified
MetaLocationSyncer metaLocationSyncer;
// Tracker for active master location, if any client ZK quorum specified
MasterAddressSyncer masterAddressSyncer;

// Tracker for split and merge state
private SplitOrMergeTracker splitOrMergeTracker;
Expand Down Expand Up @@ -556,18 +562,20 @@ protected String getUseThisHostnameInstead(Configuration conf) {
public void run() {
try {
if (!conf.getBoolean("hbase.testing.nocluster", false)) {
try {
int infoPort = putUpJettyServer();
startActiveMasterManager(infoPort);
} catch (Throwable t) {
// Make sure we log the exception.
String error = "Failed to become Active Master";
LOG.error(error, t);
// Abort should have been called already.
if (!isAborted()) {
abort(error, t);
Threads.setDaemonThreadRunning(new Thread(() -> {
try {
int infoPort = putUpJettyServer();
startActiveMasterManager(infoPort);
} catch (Throwable t) {
// Make sure we log the exception.
String error = "Failed to become Active Master";
LOG.error(error, t);
// Abort should have been called already.
if (!isAborted()) {
abort(error, t);
}
}
}
}));
}
// Fall in here even if we have been aborted. Need to run the shutdown services and
// the super run call will do this for us.
Expand Down Expand Up @@ -749,6 +757,23 @@ void initializeZKBasedSystemTrackers() throws IOException, InterruptedException,
this.maintenanceModeTracker = new MasterMaintenanceModeTracker(zooKeeper);
this.maintenanceModeTracker.start();

String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
boolean clientZkObserverMode = conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE,
HConstants.DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE);
if (clientQuorumServers != null && !clientZkObserverMode) {
// we need to take care of the ZK information synchronization
// if given client ZK are not observer nodes
ZKWatcher clientZkWatcher = new ZKWatcher(conf,
getProcessName() + ":" + rpcServices.getSocketAddress().getPort() + "-clientZK", this,
false, true);
this.metaLocationSyncer = new MetaLocationSyncer(zooKeeper, clientZkWatcher, this);
this.metaLocationSyncer.start();
this.masterAddressSyncer = new MasterAddressSyncer(zooKeeper, clientZkWatcher, this);
this.masterAddressSyncer.start();
// set cluster id is a one-go effort
ZKClusterId.setClusterId(clientZkWatcher, fileSystemManager.getClusterId());
}

// Set the cluster as up. If new RSs, they'll be waiting on this before
// going ahead with their startup.
boolean wasUp = this.clusterStatusTracker.isClusterUp();
Expand Down
Loading

0 comments on commit 061a31f

Please sign in to comment.