From c92bd619a310f6b1b13d4164250ee8c8cb93f1e6 Mon Sep 17 00:00:00 2001 From: aven Date: Wed, 31 Jan 2018 18:33:48 +0800 Subject: [PATCH] TRAFODION-2940 In HA env, one node lose network, when recover, trafci can't use --- dcs/bin/scripts/dcsunbind.sh | 42 +++++- .../java/org/trafodion/dcs/Constants.java | 6 + .../org/trafodion/dcs/master/DcsMaster.java | 142 +++++++++++++----- .../org/trafodion/dcs/master/FloatingIp.java | 37 +++++ .../trafodion/dcs/master/ServerManager.java | 7 +- .../dcs/master/listener/ListenerService.java | 11 ++ .../dcs/master/listener/ListenerWorker.java | 1 + .../trafodion/dcs/server/ServerManager.java | 26 ++-- .../org/trafodion/dcs/util/RetryCounter.java | 2 +- .../org/trafodion/dcs/zookeeper/ZkClient.java | 71 ++++++++- 10 files changed, 290 insertions(+), 55 deletions(-) diff --git a/dcs/bin/scripts/dcsunbind.sh b/dcs/bin/scripts/dcsunbind.sh index e58f4b31e9..78ec1e2f8c 100755 --- a/dcs/bin/scripts/dcsunbind.sh +++ b/dcs/bin/scripts/dcsunbind.sh @@ -51,6 +51,31 @@ function check_node { done } +function check_self_node { + for myinterface in `/sbin/ip link show|cut -d: -f1- | cut -c1- | awk -F': ' '/^[0-9]+:.*/ {print $2;}'`; do + ip_output=$(/sbin/ip addr show $myinterface | cut -d: -f1- | cut -c1-) + + myifport=`echo "$ip_output" | grep -w $gv_float_external_ip` + status=$? + if [ $status -eq 0 ]; then + tempinterface=`echo $gv_float_interface:$gv_port` + # check if another interface is bound to this virtual ip address + echo "$myifport" | grep "$tempinterface" > /dev/null + if [ $? -eq 0 ]; then + unbindip=`echo "$myifport" | awk '{print $2}'` + unbindlb=`echo "$myifport"|awk '{print $NF}'` + echo "Virtual ip $gv_float_external_ip is in use on node $HOSTNAME bound to interface $myinterface($unbindlb) - unbinding..." + sudo /sbin/ip addr del $unbindip dev $myinterface + status=$? + if [ $status -ne 0 ]; then + echo "Failed to unbind - status is $status" + exit -1 + fi + fi # endif node+name match + fi # endif looking for external ip + done +} + function Check_VirtualIP_InUse_And_Unbind { echo "check all nodes to see if external virtual ip address is in use and unbind if necessary" mynode="" @@ -64,12 +89,23 @@ function Check_VirtualIP_InUse_And_Unbind { fi } +function Check_VirtualIP_InUse_And_Unbind_Self { + check_self_node +} + #Main program if [[ $ENABLE_HA == "false" ]]; then exit 0 fi +unbindSelf=false +for i in "$@"; do + if [[ $i=="self" ]]; then + unbindSelf=true + fi +done + gv_float_internal_ip=`python $DCS_INSTALL_DIR/bin/scripts/parse_dcs_site.py|cut -d$'\n' -f2` gv_float_external_ip=`python $DCS_INSTALL_DIR/bin/scripts/parse_dcs_site.py|cut -d$'\n' -f2` gv_float_interface=`python $DCS_INSTALL_DIR/bin/scripts/parse_dcs_site.py|cut -d$'\n' -f1` @@ -94,6 +130,10 @@ if [[ $AWS_CLOUD == "true" ]]; then echo "Detached interface :" $NETWORKINTERFACE fi else - Check_VirtualIP_InUse_And_Unbind + if [ $unbindSelf ]; then + Check_VirtualIP_InUse_And_Unbind_Self + else + Check_VirtualIP_InUse_And_Unbind + fi fi exit 0 diff --git a/dcs/src/main/java/org/trafodion/dcs/Constants.java b/dcs/src/main/java/org/trafodion/dcs/Constants.java index 3f8843737e..bb4ad3bdb6 100644 --- a/dcs/src/main/java/org/trafodion/dcs/Constants.java +++ b/dcs/src/main/java/org/trafodion/dcs/Constants.java @@ -559,6 +559,12 @@ public final class Constants { /** Default value for DcsMaster floating IP command */ public static final String DEFAULT_DCS_MASTER_FLOATING_IP_COMMAND = "cd ${dcs.home.dir};bin/scripts/dcsbind.sh -i -a -p"; + /** DcsMaster floating IP command unbind*/ + public static final String DCS_MASTER_FLOATING_IP_COMMAND_UNBIND = "dcs.master.floating.ip.command.unbind"; + + /** Default value for DcsMaster floating IP command unbind*/ + public static final String DEFAULT_DCS_MASTER_FLOATING_IP_COMMAND_UNBIND = "cd ${dcs.home.dir};bin/scripts/dcsunbind.sh self"; + /** DcsMaster Floating IP external interface */ public static final String DCS_MASTER_FLOATING_IP_EXTERNAL_INTERFACE = "dcs.master.floating.ip.external.interface"; diff --git a/dcs/src/main/java/org/trafodion/dcs/master/DcsMaster.java b/dcs/src/main/java/org/trafodion/dcs/master/DcsMaster.java index 719c3d30d2..c6235545fb 100644 --- a/dcs/src/main/java/org/trafodion/dcs/master/DcsMaster.java +++ b/dcs/src/main/java/org/trafodion/dcs/master/DcsMaster.java @@ -22,22 +22,16 @@ Licensed to the Apache Software Foundation (ASF) under one */ package org.trafodion.dcs.master; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.NetworkInterface; -import java.nio.charset.Charset; -import java.util.Enumeration; -import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; -import org.apache.commons.io.IOUtils; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; @@ -46,23 +40,22 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Stat; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper.States; - -import org.apache.hadoop.util.StringUtils; - import org.trafodion.dcs.Constants; +import org.trafodion.dcs.master.listener.ListenerService; +import org.trafodion.dcs.master.listener.ListenerWorker; import org.trafodion.dcs.util.DcsConfiguration; import org.trafodion.dcs.util.DcsNetworkConfiguration; import org.trafodion.dcs.util.InfoServer; +import org.trafodion.dcs.util.RetryCounter; +import org.trafodion.dcs.util.RetryCounterFactory; import org.trafodion.dcs.util.VersionInfo; -import org.trafodion.dcs.zookeeper.ZkClient; import org.trafodion.dcs.zookeeper.ZKConfig; -import org.trafodion.dcs.master.listener.ListenerService; +import org.trafodion.dcs.zookeeper.ZkClient; -public class DcsMaster implements Runnable { +public class DcsMaster implements Callable { private static final Log LOG = LogFactory.getLog(DcsMaster.class); private Thread thrd; private ZkClient zkc = null; @@ -111,11 +104,59 @@ public DcsMaster(String[] args) { trafodionHome = System.getProperty(Constants.DCS_TRAFODION_HOME); jvmShutdownHook = new JVMShutdownHook(); Runtime.getRuntime().addShutdownHook(jvmShutdownHook); - thrd = new Thread(this); - thrd.start(); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + CompletionService completionService = new ExecutorCompletionService(executorService); + + while (true) { + completionService.submit(this); + Future f = null; + try { + f = completionService.take(); + if (f != null) { + Integer status = f.get(); + if (status <= 0) { + System.exit(status); + } else { + // 35000 * 15mins ~= 1 years + RetryCounter retryCounter = RetryCounterFactory.create(35000, 15, TimeUnit.MINUTES); + while (true) { + try { + ZkClient tmpZkc = new ZkClient(); + tmpZkc.connect(); + tmpZkc.close(); + tmpZkc = null; + LOG.info("Connected to ZooKeeper successful, restart DCS Master."); + // reset lock + isLeader = new CountDownLatch(1); + break; + } catch (Exception e) { + LOG.error("Reconnect to ZooKeeper failed, use retry..."); + if (retryCounter.shouldRetry()) { + retryCounter.sleepUntilNextRetry(); + retryCounter.useRetry(); + } else { + System.exit(-2); + } + } + } + } + } + } catch (InterruptedException | ExecutionException e) { + LOG.error(e.getMessage(), e); + } + } + } - public void run() { + // return value lesser than 0, means can't recover exception exit. + // -1 configure error + // -2 retry exhaust + // return value greater than 0 , means exception can be recover. + // 1 means network error, retry till network recover. + // return value equals 0, means unknow exception, do exit now. + // change value other than 0 when confirm the exception real reason. + public Integer call() { VersionInfo.logVersion(); Options opt = new Options(); @@ -129,10 +170,10 @@ public void run() { instance = "1"; } catch (NullPointerException e) { LOG.error("No args found: ", e); - System.exit(1); + return -1; } catch (ParseException e) { LOG.error("Could not parse: ", e); - System.exit(1); + return -1; } try { @@ -140,8 +181,8 @@ public void run() { zkc.connect(); LOG.info("Connected to ZooKeeper"); } catch (Exception e) { - LOG.error(e); - System.exit(1); + LOG.error(e.getMessage(), e); + return 1; } try { @@ -203,8 +244,8 @@ public void run() { } catch (KeeperException.NodeExistsException e) { // do nothing...some other server has created znodes } catch (Exception e) { - LOG.error(e); - System.exit(0); + LOG.error(e.getMessage(), e); + return 0; } metrics = new Metrics(); @@ -213,10 +254,10 @@ public void run() { try { netConf = new DcsNetworkConfiguration(conf); serverName = netConf.getHostName(); - if (serverName == null) { + if (serverName == null) { LOG.error("DNS Interface [" + conf.get(Constants.DCS_DNS_INTERFACE, Constants.DEFAULT_DCS_DNS_INTERFACE) - + "] configured in dcs.site.xml is not found!"); - System.exit(1); + + "] configured in dcs.site.xml is not found!"); + return -1; } // Wait to become the leader of all DcsMasters @@ -229,6 +270,11 @@ public void run() { + ":" + startTime; zkc.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + // Add a check path here for session expired situation, + // if there meets session expired, use the mark to compare with the exist znode, + // if not match, that means a backup master take over the master role. + zkc.setCheckPath(path); + LOG.info("Created znode [" + path + "]"); int requestTimeout = conf.getInt( @@ -262,12 +308,38 @@ public void run() { future.get();// block } catch (Exception e) { - LOG.error(e); - e.printStackTrace(); + LOG.error(e.getMessage(), e); + try { + FloatingIp floatingIp = FloatingIp.getInstance(this); + floatingIp.unbindScript(); + } catch (Exception e1) { + if (LOG.isErrorEnabled()) { + LOG.error("Error creating class FloatingIp [" + e.getMessage() + "]", e1); + } + } if (pool != null) pool.shutdown(); - System.exit(0); + if (ls != null) { + ListenerWorker lw = ls.getWorker(); + if (lw != null) { + lw.interrupt(); + LOG.info("Interrupt listenerWorker."); + } + ls.interrupt(); + LOG.info("Interrupt listenerService."); + } + if (infoServer != null) { + try { + infoServer.stop(); + LOG.info("Stop infoServer."); + } catch (Exception e1) { + LOG.error(e1.getMessage(), e1); + } + } + return 1; + } + return 0; } public String getServerName() { diff --git a/dcs/src/main/java/org/trafodion/dcs/master/FloatingIp.java b/dcs/src/main/java/org/trafodion/dcs/master/FloatingIp.java index b969b7a4e8..2bc5de3e84 100644 --- a/dcs/src/main/java/org/trafodion/dcs/master/FloatingIp.java +++ b/dcs/src/main/java/org/trafodion/dcs/master/FloatingIp.java @@ -61,6 +61,43 @@ public boolean isEnabled() { return isEnabled; } + public synchronized int unbindScript() throws Exception { + if (isEnabled) + LOG.info("Floating IP is enabled"); + else { + LOG.info("Floating IP is disabled"); + return 0; + } + + ScriptContext scriptContext = new ScriptContext(); + scriptContext.setScriptName(Constants.SYS_SHELL_SCRIPT_NAME); + scriptContext.setStripStdOut(false); + scriptContext.setStripStdErr(false); + + String command = master.getConfiguration().get(Constants.DCS_MASTER_FLOATING_IP_COMMAND_UNBIND, + Constants.DEFAULT_DCS_MASTER_FLOATING_IP_COMMAND_UNBIND); + + scriptContext.setCommand(command); + LOG.info("Unbind Floating IP [" + scriptContext.getCommand() + "]"); + ScriptManager.getInstance().runScript(scriptContext);// Blocking call + + StringBuilder sb = new StringBuilder(); + sb.append("exit code [" + scriptContext.getExitCode() + "]"); + if (!scriptContext.getStdOut().toString().isEmpty()) + sb.append(", stdout [" + scriptContext.getStdOut().toString() + "]"); + if (!scriptContext.getStdErr().toString().isEmpty()) + sb.append(", stderr [" + scriptContext.getStdErr().toString() + "]"); + if (LOG.isErrorEnabled()) + LOG.info(sb.toString()); + + if (scriptContext.getExitCode() == 0) + LOG.info("Unbind Floating IP successful"); + else + LOG.error("Unbind Floating IP failed"); + + return scriptContext.getExitCode(); + } + public synchronized int runScript() throws Exception { if (isEnabled) LOG.info("Floating IP is enabled"); diff --git a/dcs/src/main/java/org/trafodion/dcs/master/ServerManager.java b/dcs/src/main/java/org/trafodion/dcs/master/ServerManager.java index 5ab1a43d09..45f2152a60 100644 --- a/dcs/src/main/java/org/trafodion/dcs/master/ServerManager.java +++ b/dcs/src/main/java/org/trafodion/dcs/master/ServerManager.java @@ -328,6 +328,10 @@ public Boolean call() throws Exception { } } + if (!zkc.isSessionRecoverSuccessful()) { + throw new Exception("error while recover zkclient session. lose zookeeper connection. restart DCS Master."); + } + try { Thread.sleep(timeoutMillis); } catch (InterruptedException e) { @@ -335,9 +339,8 @@ public Boolean call() throws Exception { } } catch (Exception e) { - e.printStackTrace(); if (LOG.isErrorEnabled()) - LOG.error(e); + LOG.error(e.getMessage(), e); pool.shutdown(); throw e; } diff --git a/dcs/src/main/java/org/trafodion/dcs/master/listener/ListenerService.java b/dcs/src/main/java/org/trafodion/dcs/master/listener/ListenerService.java index fd6ff8469b..36871e6c01 100644 --- a/dcs/src/main/java/org/trafodion/dcs/master/listener/ListenerService.java +++ b/dcs/src/main/java/org/trafodion/dcs/master/listener/ListenerService.java @@ -162,6 +162,7 @@ public void run () { server.configureBlocking(false); // InetSocketAddress isa = new InetSocketAddress(ia, port); InetSocketAddress isa = new InetSocketAddress(port); //use any ip address for this port + server.socket().setReuseAddress(true); server.socket().bind(isa); SelectionKey serverkey = server.register(selector, SelectionKey.OP_ACCEPT ); int keysAdded = 0; @@ -263,14 +264,20 @@ else if ((key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) { } } } + if (this.isInterrupted()) { + throw new InterruptedException(); + } //gc(); } + } catch (InterruptedException e) { } catch (IOException e) { LOG.error(e); System.exit(1); } finally { + LOG.info("close ServerSocketChannel..."); if (server != null) { try { + server.socket().close(); server.close(); } catch (IOException e) { e.printStackTrace(); @@ -434,4 +441,8 @@ private void processWrite(SelectionKey key) { public static void main(String [] args) { ListenerService as = new ListenerService(args); } + + public ListenerWorker getWorker() { + return worker; + } } diff --git a/dcs/src/main/java/org/trafodion/dcs/master/listener/ListenerWorker.java b/dcs/src/main/java/org/trafodion/dcs/master/listener/ListenerWorker.java index 95fba59c38..3ecb7effeb 100644 --- a/dcs/src/main/java/org/trafodion/dcs/master/listener/ListenerWorker.java +++ b/dcs/src/main/java/org/trafodion/dcs/master/listener/ListenerWorker.java @@ -93,6 +93,7 @@ public void run() { try { queue.wait(); } catch (InterruptedException e) { + return; } } dataEvent = queue.remove(0); diff --git a/dcs/src/main/java/org/trafodion/dcs/server/ServerManager.java b/dcs/src/main/java/org/trafodion/dcs/server/ServerManager.java index 89186d5108..dbe5a32a39 100644 --- a/dcs/src/main/java/org/trafodion/dcs/server/ServerManager.java +++ b/dcs/src/main/java/org/trafodion/dcs/server/ServerManager.java @@ -377,6 +377,15 @@ private boolean isTrafodionRunning(String nid) { // script is // running int exitCode = scriptContext.getExitCode(); + if (LOG.isDebugEnabled()) { + StringBuffer sb = new StringBuffer(); + sb.append("exit code [" + exitCode + "]"); + if (!scriptContext.getStdOut().toString().isEmpty()) + sb.append(", stdout [" + scriptContext.getStdOut().toString() + "]"); + if (!scriptContext.getStdErr().toString().isEmpty()) + sb.append(", stderr [" + scriptContext.getStdErr().toString() + "]"); + LOG.debug(sb.toString()); + } return (exitCode == 0 || exitCode == 1) ? true : false; } @@ -433,14 +442,12 @@ public Boolean call() throws Exception { Integer result = f.get(); LOG.info("Server handler [" + instance + ":" + result + "] exit"); - retryCounter = RetryCounterFactory.create(maxRestartAttempts, retryIntervalMillis); - while (!isTrafodionRunning(nid)) { - if (!retryCounter.shouldRetry()) { - throw new IOException("Node " + nid + " is not Up"); - } else { - retryCounter.sleepUntilNextRetry(); - retryCounter.useRetry(); - } + // Here dcs server has already checked trafodion is up when it is starting, + // so there is no need to sleep 6(default) times while restart mxosrvr, + // if sleep too many times,the result will be dcs server doesn't down + // but dcs master down when network down occurs on one node. + if (!isTrafodionRunning(nid)) { + throw new IOException("Node " + nid + " is not Up"); } int childInstance = result.intValue(); // get the node id @@ -454,8 +461,7 @@ public Boolean call() throws Exception { } } } catch (Exception e) { - e.printStackTrace(); - LOG.error(e); + LOG.error(e.getMessage(), e); if (executorService != null) executorService.shutdown(); throw e; diff --git a/dcs/src/main/java/org/trafodion/dcs/util/RetryCounter.java b/dcs/src/main/java/org/trafodion/dcs/util/RetryCounter.java index 1300c7228d..fef3c902dc 100644 --- a/dcs/src/main/java/org/trafodion/dcs/util/RetryCounter.java +++ b/dcs/src/main/java/org/trafodion/dcs/util/RetryCounter.java @@ -54,7 +54,7 @@ public int getMaxRetries() { public void sleepUntilNextRetry() throws InterruptedException { int attempts = getAttemptTimes(); long sleepTime = (long) (retryInterval * Math.log(attempts + 15)); - LOG.info("Sleeping " + sleepTime + "ms before retry #" + attempts + "..."); + LOG.info("Sleeping " + sleepTime + " " + timeUnit.name() + " before retry #" + attempts + "..."); timeUnit.sleep(sleepTime); } diff --git a/dcs/src/main/java/org/trafodion/dcs/zookeeper/ZkClient.java b/dcs/src/main/java/org/trafodion/dcs/zookeeper/ZkClient.java index 1d69d95105..96dc5381fa 100644 --- a/dcs/src/main/java/org/trafodion/dcs/zookeeper/ZkClient.java +++ b/dcs/src/main/java/org/trafodion/dcs/zookeeper/ZkClient.java @@ -29,6 +29,7 @@ Licensed to the Apache Software Foundation (ASF) under one import java.util.LinkedList; import java.util.Iterator; import java.nio.charset.Charset; + import org.apache.hadoop.conf.Configuration; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.ZooKeeper; @@ -69,7 +70,9 @@ public class ZkClient implements Watcher { private final String identifier = null; private final byte[] id = null; private String parentZnode; - + private String checkPath; + private boolean sessionRecoverSuccessful = true; + // The metadata attached to each piece of data has the // format: // 1-byte constant @@ -128,8 +131,17 @@ public ZkClient(String zkhost, int zkport) { init(); } - public void connect() throws IOException, InterruptedException { - if(zk==null) { + public void connect() throws IOException, InterruptedException { + connect(false); + } + + public void connect(boolean force) throws IOException, InterruptedException { + if (force) { + LOG.debug("Force reconnect to Zookeeper."); + connectedSignal = new CountDownLatch(1); + } + + if(zk==null || force) { this.zk = new ZooKeeper(zkServers, sessionTimeout, this); //wait 3 seconds to connect @@ -151,8 +163,28 @@ public void connect() throws IOException, InterruptedException { this.zk=null; throw new IOException("Cannot connect to Zookeeper"); } - - LOG.debug("Zookeeper.State=" + this.zk.getState()); + + // Solve the forcible reconnection + // When zk reconn, the backup-master may take over the master, + // so current master should restart, and queues in /dcs/master/leader + if (LOG.isDebugEnabled()) { + LOG.debug("force = [" + force + "]. checkPath = [" + checkPath + "]"); + } + if (force && checkPath != null) { + try { + Stat stat = zk.exists(checkPath, false); + if (LOG.isDebugEnabled()) { + LOG.debug("stat = [" + stat + "]."); + } + if (stat == null) { + // this means master has change. + setSessionRecoverSuccessful(false); + } + } catch (KeeperException e) { + e.printStackTrace(); + } + } + connectedSignal.await(); } } @@ -176,7 +208,19 @@ public ZooKeeper getZk() { public void process(WatchedEvent event) { if(event.getState() == Watcher.Event.KeeperState.SyncConnected) { connectedSignal.countDown(); - } + } else if (event.getState() == Watcher.Event.KeeperState.Expired) { + LOG.info("session expired. now rebuilding"); + // session expired, may be never happending. but if it happen there + // need to close old client and rebuild new client + try { + connect(true); + } catch (IOException e) { + setSessionRecoverSuccessful(false); + LOG.error("session expired and throw IOException while do reconnect: " + e.getMessage()); + } catch (InterruptedException e) { + LOG.error("session expired and throw InterruptedException while do reconnect: " + e.getMessage()); + } + } } public void create(String path, String value, boolean ephemeral) @@ -699,6 +743,21 @@ private static List filterByPrefix(List nodes, return lockChildren; } + public String getCheckPath() { + return checkPath; + } + + public void setCheckPath(String checkPath) { + this.checkPath = checkPath; + } + + public boolean isSessionRecoverSuccessful() { + return sessionRecoverSuccessful; + } + + public void setSessionRecoverSuccessful(boolean sessionRecoverSuccessful) { + this.sessionRecoverSuccessful = sessionRecoverSuccessful; + } public static void main(String [] args) throws Exception { ZkClient zkc = new ZkClient(); }