Issue 1467 (#1504)
* #1467 add sysparam show and corrective notice error

* #1467 rebuild the dataHost in reload even if the standby dataSource changes

* #1467 code style change

* #1499 add new param for myid conf

* #1467 fix dataSource display bug

* #1467 fix dataSource display bug
sunsun314 authored and yanhuqing666 committed Nov 18, 2019
1 parent 6aee2ed commit 9e05647
Showing 11 changed files with 144 additions and 39 deletions.
@@ -92,10 +92,17 @@ public int init(int index) {
}

public void init() {
for (Map.Entry<String, PhysicalDatasource> entry : allSourceMap.entrySet()) {
if (initSource(entry.getValue())) {
if (balance != 0) {
for (Map.Entry<String, PhysicalDatasource> entry : allSourceMap.entrySet()) {
if (initSource(entry.getValue())) {
initSuccess = true;
LOGGER.info(hostName + " " + entry.getKey() + " init success");
}
}
} else {
if (initSource(writeSource)) {
initSuccess = true;
LOGGER.info(hostName + " " + entry.getKey() + " init success");
LOGGER.info(hostName + " " + writeSource.getName() + " init success");
}
}
if (initSuccess) {
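
To see the intended behavior in isolation, here is a minimal, runnable sketch of the new init policy; the names below (initSource, the balance flag, the source map) are simplified stand-ins, not the real dble classes. With balance set to 0 only the write source is warmed up; otherwise every source in the map is.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class InitPolicySketch {
    static final int BALANCE_NONE = 0;

    static boolean initSource(String name) {
        System.out.println("connecting " + name);
        return true; // pretend the connection pool came up
    }

    static void init(int balance, String writeSource, Map<String, String> allSources) {
        boolean initSuccess = false;
        if (balance != BALANCE_NONE) {
            // read/write splitting is on: warm up every data source
            for (Map.Entry<String, String> e : allSources.entrySet()) {
                if (initSource(e.getValue())) {
                    initSuccess = true;
                    System.out.println(e.getKey() + " init success");
                }
            }
        } else {
            // balance == 0: only the write host is ever used, so only it is initialized
            if (initSource(writeSource)) {
                initSuccess = true;
                System.out.println(writeSource + " init success");
            }
        }
        System.out.println("initSuccess = " + initSuccess);
    }

    public static void main(String[] args) {
        Map<String, String> sources = new LinkedHashMap<>();
        sources.put("hostM1", "writer");
        sources.put("hostS1", "reader");
        init(BALANCE_NONE, "writer", sources); // only "writer" is touched
    }
}
```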
@@ -166,7 +173,7 @@ public Collection<PhysicalDatasource> getAllDataSources() {
@Override
public Map<Integer, PhysicalDatasource[]> getStandbyReadSourcesMap() {
if (this.getDataHostConfig().getBalance() == BALANCE_NONE) {
return getReadSources();
return getReadSourceAll();
} else {
return new HashMap<Integer, PhysicalDatasource[]>();
}
@@ -269,8 +276,8 @@ public int getActiveIndex() {
return 0;
}

@Override
public Map<Integer, PhysicalDatasource[]> getReadSources() {

public Map<Integer, PhysicalDatasource[]> getReadSourceAll() {
PhysicalDatasource[] list = new PhysicalDatasource[allSourceMap.size() - 1];
int i = 0;
for (PhysicalDatasource ds : allSourceMap.values()) {
@@ -285,6 +292,15 @@ public Map<Integer, PhysicalDatasource[]> getReadSources() {
return result;
}

@Override
public Map<Integer, PhysicalDatasource[]> getReadSources() {
if (this.getDataHostConfig().getBalance() == BALANCE_NONE) {
return new HashMap<Integer, PhysicalDatasource[]>();
} else {
return getReadSourceAll();
}
}
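
Taken together, getReadSources() and getStandbyReadSourcesMap() are now complements: under BALANCE_NONE the replicas are reported as standby and the active read list is empty, and the other way around for any other balance value. A standalone sketch of that contract, using a hypothetical pool-like class rather than the dble types:

```java
import java.util.Collections;
import java.util.List;

public class ReadSourceContractSketch {
    static final int BALANCE_NONE = 0;

    final int balance;
    final List<String> readReplicas;

    ReadSourceContractSketch(int balance, List<String> readReplicas) {
        this.balance = balance;
        this.readReplicas = readReplicas;
    }

    // active read sources: empty when load balancing is off
    List<String> getReadSources() {
        return balance == BALANCE_NONE ? Collections.emptyList() : readReplicas;
    }

    // standby sources: only populated when load balancing is off
    List<String> getStandbyReadSources() {
        return balance == BALANCE_NONE ? readReplicas : Collections.emptyList();
    }

    public static void main(String[] args) {
        ReadSourceContractSketch pool =
                new ReadSourceContractSketch(BALANCE_NONE, List.of("hostS1", "hostS2"));
        System.out.println(pool.getReadSources());        // []
        System.out.println(pool.getStandbyReadSources()); // [hostS1, hostS2]
    }
}
```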


@Override
public void switchSourceIfNeed(PhysicalDatasource ds, String reason) {
@@ -414,6 +430,8 @@ public String disableHosts(String hostNames, boolean syncWriteConf) {

HaConfigManager.getInstance().updateConfDataHost(this, syncWriteConf);
return this.getClusterHaJson();
} catch (Exception e) {
throw e;
} finally {
lock.readLock().unlock();
adjustLock.writeLock().unlock();
@@ -434,6 +452,8 @@ public String enableHosts(String hostNames, boolean syncWriteConf) {

HaConfigManager.getInstance().updateConfDataHost(this, syncWriteConf);
return this.getClusterHaJson();
} catch (Exception e) {
throw e;
} finally {
lock.readLock().unlock();
adjustLock.writeLock().unlock();
@@ -453,6 +473,8 @@ public String switchMaster(String writeHost, boolean syncWriteConf) {
writeSource = newWriteHost;
HaConfigManager.getInstance().updateConfDataHost(this, syncWriteConf);
return this.getClusterHaJson();
} catch (Exception e) {
throw e;
} finally {
lock.readLock().unlock();
adjustLock.writeLock().unlock();
@@ -490,6 +512,8 @@ public void changeIntoLatestStatus(String jsonStatus) {
}
}
HaConfigManager.getInstance().updateConfDataHost(this, false);
} catch (Exception e) {
throw e;
} finally {
lock.readLock().unlock();
adjustLock.writeLock().unlock();
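The catch (Exception e) { throw e; } blocks added above do not change control flow on their own, since the finally already releases both locks on any exit path; they document that disable, enable and switch now let failures propagate up to the manager commands later in this commit. A generic sketch of the lock-release-then-propagate idiom, with hypothetical names:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class HaOperationSketch {
    private final ReentrantReadWriteLock confLock = new ReentrantReadWriteLock();

    String disableHost(String hostName) {
        confLock.readLock().lock();
        try {
            if (hostName.isEmpty()) {
                // any failure inside the critical section propagates to the caller...
                throw new IllegalArgumentException("empty host name");
            }
            return "{\"disabled\":\"" + hostName + "\"}";
        } finally {
            // ...but the lock is always released first
            confLock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        HaOperationSketch ha = new HaOperationSketch();
        System.out.println(ha.disableHost("hostS1"));
        try {
            ha.disableHost("");
        } catch (IllegalArgumentException e) {
            System.out.println("caller saw: " + e.getMessage());
        }
    }
}
```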
@@ -52,17 +52,22 @@ public void notifyProcess(KvBean configValue) throws Exception {
if (info.getLockType() == HaInfo.HaType.DATAHOST_DISABLE &&
!info.getStartId().equals(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID)) &&
info.getStatus() == HaInfo.HaStatus.SUCCESS) {
//start the log
int id = HaConfigManager.getInstance().haStart(HaInfo.HaStage.RESPONSE_NOTIFY, HaInfo.HaStartType.CLUSTER_NOTIFY, HaInfo.HaStage.RESPONSE_NOTIFY.toString());
//try to get the latest status of the dataHost
KvBean lastestStatus = ClusterHelper.getKV(ClusterPathUtil.getHaStatusPath(info.getDhName()));
//find out the target dataHost and change it into latest status
PhysicalDNPoolSingleWH dataHost = (PhysicalDNPoolSingleWH) DbleServer.getInstance().getConfig().getDataHosts().get(info.getDhName());
dataHost.changeIntoLatestStatus(lastestStatus.getValue());
//respond to the event; only the disable event has a response
ClusterHelper.setKV(ClusterPathUtil.getSelfResponsePath(configValue.getKey()), ClusterPathUtil.SUCCESS);
//ha manager writes the finish log
HaConfigManager.getInstance().haFinish(id, null, lastestStatus.getValue());
try {
//start the log
int id = HaConfigManager.getInstance().haStart(HaInfo.HaStage.RESPONSE_NOTIFY, HaInfo.HaStartType.CLUSTER_NOTIFY, HaInfo.HaStage.RESPONSE_NOTIFY.toString());
//try to get the latest status of the dataHost
KvBean lastestStatus = ClusterHelper.getKV(ClusterPathUtil.getHaStatusPath(info.getDhName()));
//find out the target dataHost and change it into latest status
PhysicalDNPoolSingleWH dataHost = (PhysicalDNPoolSingleWH) DbleServer.getInstance().getConfig().getDataHosts().get(info.getDhName());
dataHost.changeIntoLatestStatus(lastestStatus.getValue());
//respond to the event; only the disable event has a response
ClusterHelper.setKV(ClusterPathUtil.getSelfResponsePath(configValue.getKey()), ClusterPathUtil.SUCCESS);
//ha manager writes the finish log
HaConfigManager.getInstance().haFinish(id, null, lastestStatus.getValue());
} catch (Exception e) {
//respond to the event even on failure; only the disable event has a response
ClusterHelper.setKV(ClusterPathUtil.getSelfResponsePath(configValue.getKey()), e.getMessage());
}
}
}
}
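The effect of the new try/catch is that the responding node always writes something back to the cluster key, either SUCCESS or the error message, so the node that initiated the disable is never left waiting. A minimal plain-Java sketch of that shape; setResponse and the in-memory map are stand-ins for ClusterHelper.setKV and the real KV store:

```java
import java.util.HashMap;
import java.util.Map;

public class NotifyResponderSketch {
    static final Map<String, String> kvStore = new HashMap<>(); // stand-in for the cluster KV

    static void setResponse(String key, String value) {
        kvStore.put(key + "/response", value);
    }

    static void applyLatestStatus(String status) {
        if (status == null) {
            throw new IllegalStateException("no HA status found");
        }
        // ... switch the local dataHost into the received status ...
    }

    static void onNotify(String key, String status) {
        try {
            applyLatestStatus(status);
            setResponse(key, "SUCCESS");
        } catch (Exception e) {
            // even on failure, answer the event so the initiator can finish
            setResponse(key, e.getMessage());
        }
    }

    public static void main(String[] args) {
        onNotify("ha/disable/hostM1", null);
        System.out.println(kvStore); // {ha/disable/hostM1/response=no HA status found}
    }
}
```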
@@ -5,6 +5,7 @@
*/
package com.actiontech.dble.config;

import com.actiontech.dble.DbleServer;
import com.actiontech.dble.alarm.AlarmCode;
import com.actiontech.dble.alarm.Alert;
import com.actiontech.dble.alarm.AlertUtil;
@@ -344,9 +345,10 @@ private Map<String, AbstractPhysicalDBPool> initDataHosts(SchemaLoader schemaLoa
Map<String, DataHostConfig> nodeConf = schemaLoader.getDataHosts();
//create PhysicalDBPool according to DataHost
Map<String, AbstractPhysicalDBPool> nodes = new HashMap<>(nodeConf.size());
boolean outerHa = DbleServer.getInstance().getConfig() == null ? system.isUseOuterHa() : DbleServer.getInstance().getConfig().getSystem().isUseOuterHa();
for (DataHostConfig conf : nodeConf.values()) {
AbstractPhysicalDBPool pool = null;
if (system.isUseOuterHa()) {
if (outerHa) {
pool = getPhysicalDBPoolSingleWH(conf);
} else {
pool = getPhysicalDBPool(conf);
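The added line resolves useOuterHa against the configuration that is actually running: during a reload the live server config wins, and only at first boot, when no config has been loaded yet, does the freshly parsed system config decide. A compact sketch of that fallback with hypothetical holder types:

```java
public class OuterHaFallbackSketch {
    static class SystemConfig {
        final boolean useOuterHa;
        SystemConfig(boolean useOuterHa) { this.useOuterHa = useOuterHa; }
        boolean isUseOuterHa() { return useOuterHa; }
    }

    static boolean resolveOuterHa(SystemConfig running, SystemConfig loaded) {
        // a null running config means first startup; otherwise keep the running mode
        return running == null ? loaded.isUseOuterHa() : running.isUseOuterHa();
    }

    public static void main(String[] args) {
        System.out.println(resolveOuterHa(null, new SystemConfig(true)));                    // true: boot
        System.out.println(resolveOuterHa(new SystemConfig(false), new SystemConfig(true))); // false: reload
    }
}
```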
@@ -183,4 +183,42 @@ public Object baseParseXmlToBean(String fileName) throws JAXBException, XMLStrea
}


/**
* baseParseWriteToXml
*
* @param user
* @param inputPath
* @param name
* @Created 2016/9/15
*/
public void baseParseWriteToXml(Object user, String inputPath, String name) throws IOException {
OutputStream out = null;
try {
Marshaller marshaller = this.jaxContext.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);

if (null != name) {
marshaller.setProperty("com.sun.xml.internal.bind.xmlHeaders",
String.format("<!DOCTYPE " + Versions.ROOT_PREFIX + ":%1$s SYSTEM \"%1$s.dtd\">", name));
}

Path path = Paths.get(inputPath);

out = Files.newOutputStream(path, StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);

marshaller.marshal(user, out);

} catch (JAXBException | IOException e) {
LOGGER.error("ZookeeperProcessListen parseToXml error:Exception info:", e);
throw new IOException(e);
} finally {
if (out != null) {
out.close();
}
}
}


}
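
For reference, the same write can be phrased with try-with-resources, which closes the stream even when marshalling fails. The following is a hedged sketch assuming any JAXB-annotated object, not a drop-in replacement for the method above (the DOCTYPE header property is omitted):

```java
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public final class JaxbWriteSketch {
    public static void writeXml(Object config, Class<?> type, String path) throws IOException {
        try (OutputStream out = Files.newOutputStream(Paths.get(path),
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING,
                StandardOpenOption.WRITE)) {
            Marshaller marshaller = JAXBContext.newInstance(type).createMarshaller();
            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
            marshaller.marshal(config, out);
        } catch (JAXBException e) {
            // surface marshalling failures as IOException, as the patched method does
            throw new IOException(e);
        }
    }
}
```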
@@ -58,4 +58,10 @@ public void parseToXmlWrite(Schemas data, String outputFile, String dataName) {
}
}


public void parseToXmlWriteWithException(Schemas data, String outputFile, String dataName) throws IOException {
this.parseBean.baseParseWriteToXml(data, outputFile, dataName);
}


}
@@ -37,12 +37,16 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th
}

private void updateStatus(ChildData childData) {
String nodeName = childData.getPath().substring(childData.getPath().lastIndexOf("/") + 1);
String data = new String(childData.getData(), StandardCharsets.UTF_8);
int id = HaConfigManager.getInstance().haStart(HaInfo.HaStage.RESPONSE_NOTIFY, HaInfo.HaStartType.CLUSTER_NOTIFY, "");
PhysicalDNPoolSingleWH dataHost = (PhysicalDNPoolSingleWH) DbleServer.getInstance().getConfig().getDataHosts().get(nodeName);
dataHost.changeIntoLatestStatus(data);
HaConfigManager.getInstance().haFinish(id, null, data);
try {
String nodeName = childData.getPath().substring(childData.getPath().lastIndexOf("/") + 1);
String data = new String(childData.getData(), StandardCharsets.UTF_8);
int id = HaConfigManager.getInstance().haStart(HaInfo.HaStage.RESPONSE_NOTIFY, HaInfo.HaStartType.CLUSTER_NOTIFY, "");
PhysicalDNPoolSingleWH dataHost = (PhysicalDNPoolSingleWH) DbleServer.getInstance().getConfig().getDataHosts().get(nodeName);
dataHost.changeIntoLatestStatus(data);
HaConfigManager.getInstance().haFinish(id, null, data);
} catch (Exception e) {
LOGGER.warn("get error when updating HA status", e);
}
}

}
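
The same defensive shape in isolation: a dispatcher that logs and swallows per-event failures, so one malformed status update cannot kill the thread that delivers the rest. A plain-Java sketch with a hypothetical Event type (the real code runs inside the Curator child-event callback shown above):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class SafeWatcherSketch {
    static class Event {
        final String path;
        final String data;
        Event(String path, String data) { this.path = path; this.data = data; }
    }

    static void dispatch(List<Event> events, Consumer<Event> handler) {
        for (Event ev : events) {
            try {
                handler.accept(ev);
            } catch (Exception e) {
                // log and keep going: one bad event must not stop the watcher
                System.err.println("error handling " + ev.path + ": " + e);
            }
        }
    }

    public static void main(String[] args) {
        dispatch(Arrays.asList(new Event("/status/hostM1", null),
                               new Event("/status/hostM2", "{\"disabled\":false}")),
                ev -> System.out.println("applied " + ev.data.length() + " chars to " + ev.path));
    }
}
```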
14 changes: 11 additions & 3 deletions src/main/java/com/actiontech/dble/config/util/SchemaWriteJob.java
@@ -23,6 +23,7 @@ public class SchemaWriteJob implements Runnable {
private final Set<PhysicalDNPoolSingleWH> changeSet;
private final Schemas schemas;
private volatile boolean finish = false;
private volatile String errorMessage = null;
private final ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private final int reloadIndex;
@@ -45,9 +46,11 @@ public void run() {
}
}
HaConfigManager.getInstance().write(schemas, reloadIndex);
this.signalAll();
} catch (Exception e) {
errorMessage = e.getMessage();
HaConfigManager.getInstance().log("get error from SchemaWriteJob", e);
} finally {
this.signalAll();
}
}

@@ -75,8 +78,8 @@ private void changeHostInfo(DataHost dh, PhysicalDNPoolSingleWH physicalDNPoolSi
}

List<ReadHost> newReadList = new ArrayList<ReadHost>();
if (physicalDNPoolSingleWH.getReadSources() != null) {
for (PhysicalDatasource rs : physicalDNPoolSingleWH.getReadSources().get(0)) {
if (physicalDNPoolSingleWH.getReadSourceAll() != null) {
for (PhysicalDatasource rs : physicalDNPoolSingleWH.getReadSourceAll().get(0)) {
ReadHost r = new ReadHost();
r.setDisabled("" + rs.isDisabled());
r.setHost(rs.getConfig().getHostName());
@@ -117,6 +120,11 @@ public void waitForWritingDone() {
} finally {
lock.unlock();
}

if (errorMessage != null) {
LOGGER.info("get result errorMessage = " + errorMessage);
throw new RuntimeException(errorMessage);
}
}

public void signalAll() {
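This hunk fixes a classic wait/notify pitfall: previously, if the writer thread threw before reaching signalAll(), every caller of waitForWritingDone() blocked until its timeout. Moving the signal into finally and parking the failure in a volatile field lets the waiter both wake up promptly and rethrow. A self-contained sketch of the pattern (simplified; no reload index):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class WriteJobSketch implements Runnable {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile boolean finished = false;
    private volatile String errorMessage = null;

    @Override
    public void run() {
        try {
            throw new IllegalStateException("disk full"); // simulate a failing write
        } catch (Exception e) {
            errorMessage = e.getMessage(); // record the failure for the waiter
        } finally {
            lock.lock();
            try {
                finished = true;
                done.signalAll(); // always wake the waiters, success or not
            } finally {
                lock.unlock();
            }
        }
    }

    public void waitForWritingDone() throws InterruptedException {
        lock.lock();
        try {
            while (!finished) {
                done.await(1, TimeUnit.SECONDS);
            }
        } finally {
            lock.unlock();
        }
        if (errorMessage != null) {
            throw new RuntimeException(errorMessage); // surface the writer's failure
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WriteJobSketch job = new WriteJobSketch();
        new Thread(job).start();
        try {
            job.waitForWritingDone();
        } catch (RuntimeException e) {
            System.out.println("write failed: " + e.getMessage()); // write failed: disk full
        }
    }
}
```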
@@ -74,9 +74,15 @@ public static void execute(Matcher disable, ManagerConnection mc) {
return;
}
} else {
//dble start in single mode
String result = dh.disableHosts(subHostName, true);
HaConfigManager.getInstance().haFinish(id, null, result);
try {
//dble start in single mode
String result = dh.disableHosts(subHostName, true);
HaConfigManager.getInstance().haFinish(id, null, result);
} catch (Exception e) {
HaConfigManager.getInstance().haFinish(id, e.getMessage(), null);
mc.writeErrMessage(ErrorCode.ER_YES, "disable dataHost with error, use show @@dataSource to check latest status. Error:" + e.getMessage());
return;
}
}

OkPacket packet = new OkPacket();
@@ -68,8 +68,14 @@ public static void execute(Matcher enable, ManagerConnection mc) {
return;
}
} else {
String result = dh.enableHosts(subHostName, true);
HaConfigManager.getInstance().haFinish(id, null, result);
try {
String result = dh.enableHosts(subHostName, true);
HaConfigManager.getInstance().haFinish(id, null, result);
} catch (Exception e) {
HaConfigManager.getInstance().haFinish(id, e.getMessage(), null);
mc.writeErrMessage(ErrorCode.ER_YES, "enable dataHost with error, use show @@dataSource to check latest status. Error:" + e.getMessage());
return;
}
}


@@ -67,9 +67,15 @@ public static void execute(Matcher switcher, ManagerConnection mc) {
return;
}
} else {
//dble start in single mode
String result = dh.switchMaster(masterName, true);
HaConfigManager.getInstance().haFinish(id, null, result);
try {
//dble start in single mode
String result = dh.switchMaster(masterName, true);
HaConfigManager.getInstance().haFinish(id, null, result);
} catch (Exception e) {
HaConfigManager.getInstance().haFinish(id, e.getMessage(), null);
mc.writeErrMessage(ErrorCode.ER_YES, "switch dataHost with error, use show @@dataSource to check latest status. Error:" + e.getMessage());
return;
}
}

OkPacket packet = new OkPacket();
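All three manager commands (disable, enable, switch) now share the same single-mode shape: on failure they close the HA log entry with the error and return an error packet instead of OK, pointing the operator at show @@dataSource. A condensed sketch of that control flow; HaAction, runHaCommand, and the fixed log id are hypothetical stand-ins:

```java
public class HaCommandSketch {
    interface HaAction { String apply() throws Exception; }

    static String runHaCommand(String verb, HaAction action) {
        int logId = 42; // stand-in for the id returned by HaConfigManager.haStart(...)
        try {
            String result = action.apply();
            System.out.println("haFinish(" + logId + ", null, " + result + ")");
            return "OK";
        } catch (Exception e) {
            // close the HA log with the error, then report it to the client
            System.out.println("haFinish(" + logId + ", " + e.getMessage() + ", null)");
            return verb + " dataHost with error, use show @@dataSource to check latest status. Error: "
                    + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runHaCommand("disable", () -> "{\"status\":\"ok\"}"));
        System.out.println(runHaCommand("switch", () -> { throw new Exception("write conf failed"); }));
    }
}
```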
@@ -8,7 +8,6 @@
import com.actiontech.dble.config.loader.zkprocess.entity.schema.datahost.DataHost;
import com.actiontech.dble.config.loader.zkprocess.entity.schema.datahost.ReadHost;
import com.actiontech.dble.config.loader.zkprocess.entity.schema.datahost.WriteHost;
import com.actiontech.dble.config.loader.zkprocess.parse.ParseXmlServiceInf;
import com.actiontech.dble.config.loader.zkprocess.parse.XmlProcessBase;
import com.actiontech.dble.config.loader.zkprocess.parse.entryparse.schema.xml.SchemasParseXmlImpl;
import com.actiontech.dble.config.loader.zkprocess.zookeeper.process.DataSourceStatus;
@@ -20,6 +19,7 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,7 +37,7 @@ public final class HaConfigManager {
private static final String HA_LOG = "ha_log";
private static final Logger HA_LOGGER = LoggerFactory.getLogger(HA_LOG);
private static final HaConfigManager INSTANCE = new HaConfigManager();
private ParseXmlServiceInf<Schemas> parseSchemaXmlService;
private SchemasParseXmlImpl parseSchemaXmlService;
private static final String WRITEPATH = "schema.xml";
private Schemas schema;
private AtomicInteger indexCreater = new AtomicInteger();
@@ -70,7 +70,7 @@ public void init() {
return;
}

public void write(Schemas schemas, int reloadId) {
public void write(Schemas schemas, int reloadId) throws IOException {
HA_LOGGER.info("try to write schemas into local file " + reloadId);
final ReentrantReadWriteLock lock = DbleServer.getInstance().getConfig().getLock();
lock.readLock().lock();
@@ -79,7 +79,7 @@ public void write(Schemas schemas, int reloadId) {
String path = ResourceUtil.getResourcePathFromRoot(ZookeeperPath.ZK_LOCAL_WRITE_PATH.getKey());
path = new File(path).getPath() + File.separator;
path += WRITEPATH;
this.parseSchemaXmlService.parseToXmlWrite(schemas, path, "schema");
this.parseSchemaXmlService.parseToXmlWriteWithException(schemas, path, "schema");
} else {
HA_LOGGER.info("reloadId changes when try to write the local file,just skip " + reloadIndex.get());
}
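write() now declares IOException and calls the throwing variant of the XML writer, so a failed disk write travels back to the SchemaWriteJob above, while the reloadIndex comparison remains a cheap guard against stale writes. A sketch of that guard; the class and method names here are illustrative only:

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

public class GuardedWriteSketch {
    private final AtomicInteger reloadIndex = new AtomicInteger();

    int beginReload() {
        return reloadIndex.incrementAndGet(); // each reload gets a fresh id
    }

    void write(String schemaXml, int reloadId) throws IOException {
        if (reloadIndex.get() != reloadId) {
            // a newer reload superseded this one: skip the stale write
            System.out.println("skip stale write " + reloadId);
            return;
        }
        if (schemaXml == null) {
            throw new IOException("nothing to write"); // propagates to the caller
        }
        System.out.println("wrote schema for reload " + reloadId);
    }

    public static void main(String[] args) throws IOException {
        GuardedWriteSketch mgr = new GuardedWriteSketch();
        int first = mgr.beginReload();
        mgr.beginReload();             // a second reload starts
        mgr.write("<schema/>", first); // prints: skip stale write 1
    }
}
```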
