Skip to content

Commit

Permalink
部署master时同步订阅配置
Browse files Browse the repository at this point in the history
  • Loading branch information
zhehongyuan committed Jan 14, 2019
1 parent 802bcb0 commit bf5220f
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@ public void produce() {
Assert.assertTrue(sendResult.isSuccess());
}

@Test
public void produceMulti() throws Exception {
for(int i = 0; i < 10000; ++i) {
Map<String, Object> map = new HashMap<String, Object>();
map.put("a", "a"+i);
map.put("c", "d"+i);
map.put("o", "c"+i);
String str = JSON.toJSONString(map);
Result<SendResult> sendResult = producer.publish(str, "data"+i);
System.out.println(sendResult);
Assert.assertTrue(sendResult.isSuccess());
Thread.sleep(1000);
}
}

@After
public void clean() {
producer.shutdown();
Expand Down
111 changes: 91 additions & 20 deletions mq-cloud/src/main/java/com/sohu/tv/mq/cloud/service/MQDeployer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.tools.admin.MQAdminExt;
Expand Down Expand Up @@ -278,30 +279,57 @@ public SSHResult call(SSHSession session) {
if(configResult.isNotOK()) {
return configResult;
}
// slave直接返回
if(brokerParam.isSlave()) {
return Result.getOKResult();
}
// 获取master地址
Result<String> masterAddressResult = fetchMasterAddress(cluster);
if(!masterAddressResult.isOK()) {
return masterAddressResult;
}
String masterAddress = masterAddressResult.getResult();
// 抓取topic配置
Result<String> result = fetchTopicConfig(brokerParam);
Result<String> result = fetchTopicConfig(cluster, masterAddress);
if(Status.DB_ERROR.getKey() == result.getStatus()) {
return result;
}
if(Status.NO_RESULT.getKey() == result.getStatus()) {
return Result.getOKResult();

// 保存topic配置
Result<?> topicSSHResult = saveConfig(brokerParam.getIp(), result.getResult(), absoluteDir, "topics.json");
if(!topicSSHResult.isOK()) {
return topicSSHResult;
}

// 抓取consumer配置
Result<String> consumerResult = fetchConsumerConfig(cluster, masterAddress);
if(Status.DB_ERROR.getKey() == consumerResult.getStatus()) {
return consumerResult;
}
SSHResult topicSSHResult = null;

// 保存consumer配置
Result<?> consumerSSHResult = saveConfig(brokerParam.getIp(), consumerResult.getResult(), absoluteDir,
"subscriptionGroup.json");
return consumerSSHResult;
}

private Result<?> saveConfig(String ip, String content, String absoluteDir, String fileName) {
SSHResult sshResult = null;
try {
// save config to /tmp
MixAll.string2File(result.getResult(), "/tmp/topics.json");
MixAll.string2File(content, "/tmp/" + fileName);

topicSSHResult = sshTemplate.execute(brokerParam.getIp(), new SSHCallback() {
sshResult = sshTemplate.execute(ip, new SSHCallback() {
public SSHResult call(SSHSession session) {
SSHResult sshResult = session.scpToDir("/tmp/topics.json", absoluteDir+"/data/config/");
SSHResult sshResult = session.scpToDir("/tmp/" + fileName, absoluteDir+"/data/config/");
return sshResult;
}
});
} catch (Exception e) {
logger.error("configBroker topic, ip:{},comm:{}", brokerParam.getIp(), result.getResult(), e);
logger.error("configBroker {}, ip:{}, content:{}", fileName, ip, content, e);
return Result.getWebErrorResult(e);
}
return wrapSSHResult(topicSSHResult);
return wrapSSHResult(sshResult);
}

/**
Expand All @@ -328,15 +356,11 @@ public SSHResult call(SSHSession session) {
}

/**
* 获取topic的配置
* 获取master地址
* @param brokerRole
* @return
*/
private Result<String> fetchTopicConfig(BrokerParam brokerParam){
if (brokerParam.isSlave()) {
return Result.getResult(Status.NO_RESULT);
}
Cluster cluster = clusterService.getMQClusterById(brokerParam.getMqClusterId());
private Result<String> fetchMasterAddress(Cluster cluster){
// 获取topic配置
return mqAdminTemplate.execute(new MQAdminCallback<Result<String>>() {
public Result<String> callback(MQAdminExt mqAdmin) throws Exception {
Expand All @@ -355,19 +379,66 @@ public Result<String> callback(MQAdminExt mqAdmin) throws Exception {
return Result.getResult(Status.NO_RESULT);
}
String masterAddr = brokerAddrs.get(MixAll.MASTER_ID);
if(masterAddr == null) {
return Result.getResult(Status.NO_RESULT);
}
return Result.getResult(masterAddr);
}

public Result<String> exception(Exception e) throws Exception {
logger.error("fetchMasterAddress:{} error", cluster, e);
return Result.getDBErrorResult(e);
}
public Cluster mqCluster() {
return cluster;
}
});
}

/**
* 获取topic的配置
* @param brokerRole
* @return
*/
private Result<String> fetchTopicConfig(Cluster cluster, String masterAddress){
// 获取topic配置
return mqAdminTemplate.execute(new MQAdminCallback<Result<String>>() {
public Result<String> callback(MQAdminExt mqAdmin) throws Exception {
// 获取topic配置
TopicConfigSerializeWrapper topicWrapper = mqAdmin.getAllTopicGroup(masterAddr, 10 * 1000);
TopicConfigSerializeWrapper topicWrapper = mqAdmin.getAllTopicGroup(masterAddress, 10 * 1000);
if(topicWrapper == null) {
return Result.getResult(Status.NO_RESULT);
}
return Result.getResult(JSON.toJSONString(topicWrapper));
}

public Result<String> exception(Exception e) throws Exception {
logger.error("cluster:{} error", cluster, e);
logger.error("fetchTopicConfig:{} error", masterAddress, e);
return Result.getDBErrorResult(e);
}

public Cluster mqCluster() {
return cluster;
}
});
}

/**
* 获取consumer的配置
* @param brokerRole
* @return
*/
private Result<String> fetchConsumerConfig(Cluster cluster, String masterAddress){
// 获取topic配置
return mqAdminTemplate.execute(new MQAdminCallback<Result<String>>() {
public Result<String> callback(MQAdminExt mqAdmin) throws Exception {
// 获取topic配置
SubscriptionGroupWrapper subscriptionWrapper = mqAdmin.getAllSubscriptionGroup(masterAddress, 10 * 1000);
if(subscriptionWrapper == null) {
return Result.getResult(Status.NO_RESULT);
}
return Result.getResult(JSON.toJSONString(subscriptionWrapper));
}

public Result<String> exception(Exception e) throws Exception {
logger.error("fetchConsumerConfig:{} error", masterAddress, e);
return Result.getDBErrorResult(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public String toConfig(String nameServerDomain, Cluster cluster) {
+ "\nrmqAddressServerDomain=" + nameServerDomain
+ "\nrmqAddressServerSubGroup=" + String.format(MQDeployer.NS_SUB_GROUP, cluster.getId())
+ "\nfetchNamesrvAddrByAddressServer=true"
+ "\nautoCreateTopicEnable=false"
+ "\nautoCreateTopicEnable=true"
+ "\nclusterTopicEnable=false"
+ "\nautoCreateSubscriptionGroup=false"
+ "\nstorePathRootDir=" + MQDeployer.MQ_CLOUD_DIR + getDir() + "/data"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<#if response.result.hasNameServer>
<div class="form-group">
<button type="button" class="btn btn-success" onclick="addBroker()" data-target="#addBrokerModal" data-toggle="modal"><span class="glyphicon glyphicon-plus" aria-hidden="true"></span>&nbsp;Master</button>
<button type="button" class="btn btn-success" title="从Name Server抓取broker列表,刷入数据库中,仅用于监控broker是否存活" onclick="refresh()" data-toggle="modal"><span class="glyphicon glyphicon-refresh" aria-hidden="true"></span>&nbsp;broker</button>
<button type="button" class="btn btn-success" title="添加broker节点后,从Name Server抓取broker列表,刷入数据库中,用于broker列表展示及监控" onclick="refresh()" data-toggle="modal"><span class="glyphicon glyphicon-refresh" aria-hidden="true"></span>&nbsp;broker</button>
</div>
</#if>
<#if response.result.brokerGroup??>
Expand Down

0 comments on commit bf5220f

Please sign in to comment.