Skip to content

Commit

Permalink
新功能和优化:
Browse files Browse the repository at this point in the history
1 优化字符串拼接代码
2 修改用户指南
3 客户端链接增加client name
4 完善日志

bug修复
1 修复拓扑图展示客户端链接过多导致无法滚动的bug
2 fix 消费者retry流量收集失败导致正常流量停止收集的问题
  • Loading branch information
yongfeigao committed Jul 22, 2019
1 parent 4c94903 commit 61c2e6e
Show file tree
Hide file tree
Showing 16 changed files with 141 additions and 66 deletions.
15 changes: 3 additions & 12 deletions mq-cloud/src/main/java/com/sohu/tv/mq/cloud/bo/ClientVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.util.Date;
import java.util.HashSet;
import java.util.Set;

import com.sohu.tv.mq.cloud.util.Jointer;
/**
* 客户端版本
* @Description:
Expand Down Expand Up @@ -67,18 +69,7 @@ public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
public String getOwnersString() {
if(owners == null) {
return "";
}
StringBuilder sb = new StringBuilder();
for(User u : owners) {
sb.append(u.notBlankName());
sb.append(",");
}
if(sb.length() > 0) {
sb.deleteCharAt(sb.length() - 1);
}
return sb.toString();
return Jointer.BY_COMMA.join(owners, v -> v.notBlankName());
}
public boolean addOwner(User owner) {
if(owners == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.google.common.base.Joiner;
import com.sohu.tv.mq.cloud.bo.AlarmConfig;
import com.sohu.tv.mq.cloud.bo.NeedAlarmConfig;
import com.sohu.tv.mq.cloud.dao.NeedAlarmConfigDao;
Expand Down Expand Up @@ -164,16 +165,7 @@ public void setConfigTable(List<AlarmConfig> alarmConfigList) {
* @return
*/
private String getKey(String... keys) {
StringBuilder sb = new StringBuilder();
for (String k : keys) {
sb.append(k);
sb.append("_");
}
if (sb.length() > 0) {
sb.deleteCharAt(sb.length() - 1);
return sb.toString();
}
return "";
return Joiner.on("_").join(keys);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.sohu.tv.mq.cloud.bo.User;
import com.sohu.tv.mq.cloud.mq.DefaultInvoke;
import com.sohu.tv.mq.cloud.mq.MQAdminTemplate;
import com.sohu.tv.mq.cloud.util.Jointer;
import com.sohu.tv.mq.cloud.util.MQCloudConfigHelper;
import com.sohu.tv.mq.cloud.util.Result;

Expand Down Expand Up @@ -72,13 +73,7 @@ public void invoke(MQAdminExt mqAdmin) throws Exception {
topicConsumer.getTid(), topicConsumer.getCid());
String email = null;
if(userListResult.isNotEmpty()) {
StringBuilder sb = new StringBuilder();
for(User u : userListResult.getResult()) {
sb.append(u.getEmail());
sb.append(",");
}
sb.deleteCharAt(sb.length() - 1);
email = sb.toString();
email = Jointer.BY_COMMA.join(userListResult.getResult(), u -> u.getEmail());
}
long consumerFailCount = alarmConfigBridingService.getConsumerFailCount(topicConsumer.getConsumer());
if (consumerFailCount >= 0 && consumerFailCount < topicTraffic.getCount()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.springframework.web.util.HtmlUtils;

import com.alibaba.fastjson.JSON;
import com.google.common.base.Joiner;
import com.sohu.tv.mq.cloud.bo.Cluster;
import com.sohu.tv.mq.cloud.bo.Consumer;
import com.sohu.tv.mq.cloud.bo.DecodedMessage;
Expand Down Expand Up @@ -494,14 +495,8 @@ public List<String> callback(MQAdminExt mqAdmin) throws Exception {
return mqAdmin.getNameServerAddressList();
}
});
StringBuilder sb = new StringBuilder();
for (String str : nsList) {
sb.append(str);
sb.append(";");
}
sb.deleteCharAt(sb.length() - 1);
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, null);
consumer.setNamesrvAddr(sb.toString());
consumer.setNamesrvAddr(Joiner.on(";").join(nsList));
consumer.setVipChannelEnabled(mqCluster.isEnableVipChannel());
consumer.start();
return consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected void fetchTraffic(MQAdminExt mqAdmin, String topic, String statKey, T
if (masterAddr == null) {
continue;
}
String key = masterAddr + "_" + statKey;
String key = masterAddr + "_" + statKey + "_" + getCountKey();
String value = fetchLocalCache.get(key);
if (value != null && ERROR.equals(value)) {
if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.sohu.tv.mq.cloud.cache.LocalCache;
import com.sohu.tv.mq.cloud.dao.ConsumerDao;
import com.sohu.tv.mq.cloud.dao.UserDao;
import com.sohu.tv.mq.cloud.util.Jointer;
import com.sohu.tv.mq.cloud.util.Result;
import com.sohu.tv.mq.cloud.util.Status;

Expand Down Expand Up @@ -281,15 +282,7 @@ public String queryMonitorEmail() {
logger.error("queryMonitor err", e);
}
if (userList != null && userList.size() > 0) {
StringBuilder sb = new StringBuilder();
for (User u : userList) {
sb.append(u.getEmail());
sb.append(",");
}
if (sb.length() > 0) {
sb.deleteCharAt(sb.length() - 1);
}
email = sb;
email = Jointer.BY_COMMA.join(userList, u -> u.getEmail());
mqLocalCache.put("monitor", email);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.sohu.tv.mq.cloud.bo.Cluster;
import com.sohu.tv.mq.cloud.bo.NameServer;
import com.sohu.tv.mq.cloud.service.NameServerService;
import com.sohu.tv.mq.cloud.util.Jointer;
import com.sohu.tv.mq.cloud.util.Result;

/**
Expand Down Expand Up @@ -73,13 +74,7 @@ public MonitorService(NameServerService nameServerService, Cluster mqCluster, Mo
logger.error("monitor cluster:{} init err!", mqCluster);
return;
}
StringBuilder sb = new StringBuilder();
for(NameServer ns : nameServerListResult.getResult()) {
sb.append(ns.getAddr());
sb.append(";");
}
sb.deleteCharAt(sb.length() - 1);
this.nsAddr = sb.toString();
this.nsAddr = Jointer.BY_SEMICOLON.join(nameServerListResult.getResult(), ns -> ns.getAddr());
this.clusterName = mqCluster.getName();
this.monitorListener = monitorListener;

Expand Down
62 changes: 62 additions & 0 deletions mq-cloud/src/main/java/com/sohu/tv/mq/cloud/util/Jointer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.sohu.tv.mq.cloud.util;

import java.util.Collection;
import java.util.Iterator;

/**
* 拼接工具 refrence from com.google.common.base.Joiner
*
* @author yongfeigao
* @date 2019年7月10日
*/
public class Jointer {
// 拼接分隔符
private final String separator;

public static final Jointer BY_COMMA = new Jointer(",");

public static final Jointer BY_SEMICOLON = new Jointer(";");

public static final String BLANK = "";

private Jointer(String separator) {
this.separator = separator;
}

/**
* 遍历collection的对象获取其joinerValue进行拼接
* @param collection
* @param joinerValue
* @return String
*/
public <E> String join(Collection<E> collection, JointerValue<E> joinerValue) {
if (collection == null || collection.isEmpty()) {
return "";
}
StringBuilder buffer = new StringBuilder();
Iterator<E> iterator = collection.iterator();
buffer.append(joinerValue.getValue(iterator.next()));
while (iterator.hasNext()) {
buffer.append(separator);
buffer.append(joinerValue.getValue(iterator.next()));
}
return buffer.toString();
}

/**
* 拼接返回值
*
* @author yongfeigao
* @date 2019年7月11日
* @param <T>
*/
public interface JointerValue<T> {

/**
* 返回打算拼接的字符串
* @param t
* @return
*/
public String getValue(T t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,16 @@ public String consumeProgress(UserInfo userInfo, @RequestParam("tid") int tid, M
if(consumerProgressVO.getRetryTopic() == null) {
consumerProgressVO.setRetryTopic(mq.getTopic());
} else if(!mq.getTopic().equals(consumerProgressVO.getRetryTopic())){
logger.error("retry consumer has two diffrent topic, {} != {}", mq.getTopic(), consumerProgressVO.getRetryTopic());
logger.error("retry consumer:{} has two diffrent topic, {} != {}", consumer.getName(),
mq.getTopic(), consumerProgressVO.getRetryTopic());
}
} else {
offsetMap.put(mq, offsetTable.get(mq));
if(consumerProgressVO.getTopic() == null) {
consumerProgressVO.setTopic(mq.getTopic());
} else if(!mq.getTopic().equals(consumerProgressVO.getTopic())){
logger.error("consumer has two diffrent topic, {} != {}", mq.getTopic(), consumerProgressVO.getTopic());
logger.error("consumer:{} has two diffrent topic, {} != {}", consumer.getName(),
mq.getTopic(), consumerProgressVO.getTopic());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.sohu.tv.mq.cloud.bo.NameServer;
import com.sohu.tv.mq.cloud.service.NameServerService;
import com.sohu.tv.mq.cloud.util.Jointer;
import com.sohu.tv.mq.cloud.util.Result;

/**
Expand All @@ -34,14 +35,6 @@ public class RocketMQController {
@RequestMapping("/nsaddr-{clusterId}")
public String nsaddr(@PathVariable int clusterId) throws Exception {
Result<List<NameServer>> result = nameServerService.query(clusterId);
if(result.isNotEmpty()) {
StringBuilder sb = new StringBuilder();
for(NameServer nameServer : result.getResult()) {
sb.append(nameServer.getAddr());
sb.append(";");
}
return sb.toString();
}
return "";
return Jointer.BY_SEMICOLON.join(result.getResult(), ns -> ns.getAddr());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ public String connection(UserInfo userInfo, @RequestParam("topic") String topic,
@RequestParam("type") int type, Map<String, Object> map) throws Exception {
FreemarkerUtil.set("mqVersion", MQVersion.class, map);
setResult(map, null);
setResult(map, "client", group);
String view = viewModule() + "/connection";
Cluster cluster = clusterService.getMQClusterById(cid);
HashSet<Connection> connectionSet = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
@Configuration
public class MQConfiguration {
@Value("${flushCache.consumer}")
@Value("${flushCache.consumerGroup}")
private String flushCacheConsumer;
@Value("${flushCache.topic}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
```
@Configuration
public class MQConfiguration {
@Value("${flushCache.producer}")
@Value("${flushCache.producerGroup}")
private String flushCacheProducer;
@Value("${flushCache.topic}")
Expand All @@ -18,6 +18,8 @@ public class MQConfiguration {
}
```

producerGroup和topic具体的值,请参考[topic详情页](topic#detail),然后配置到yml或properties里。

## 二、<span id="spring-xml">初始化之spring xml方式</span>

```
Expand Down
3 changes: 3 additions & 0 deletions mq-cloud/src/main/resources/templates/topic/connection.html
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
<table class="table table-striped table-hover" style="margin: 0px">
<thead>
<tr>
<td colspan=2>${client}的链接</td>
</tr>
<tr>
<td>ClientId</td>
<td>版本</td>
Expand Down
2 changes: 2 additions & 0 deletions mq-cloud/src/main/resources/templates/user/topicTopology.html
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ <h3 class="text-center">拓扑</h3>
if(params.data._type){
if(ipCache[params.data._group]){
if(ipCache[params.data._group] != 1 && ipCache[params.data._group] != 2){
setTimeout("$('body').getNiceScroll().resize()", 500);
return ipCache[params.data._group];
}
} else {
Expand All @@ -165,6 +166,7 @@ <h3 class="text-center">拓扑</h3>
ipCache[params.data._group] = content;
callback(ticket, content);
console.log($("#ipList").val());
$('body').getNiceScroll().resize();
});
}
} else {
Expand Down
49 changes: 49 additions & 0 deletions mq-cloud/src/test/java/com/sohu/tv/mq/cloud/util/JointerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.sohu.tv.mq.cloud.util;

import java.util.ArrayList;
import java.util.List;

import org.junit.Assert;
import org.junit.Test;

import com.sohu.tv.mq.cloud.bo.User;

public class JointerTest {

@Test
public void testJoin() {
List<User> userList = new ArrayList<User>();
User user = new User();
user.setEmail("[email protected]");
userList.add(user);

user = new User();
user.setEmail("[email protected]");
userList.add(user);

String result = Jointer.BY_COMMA.join(userList, u -> u.getEmail());
Assert.assertEquals("[email protected],[email protected]", result);
}

@Test
public void testJoinBlank() {
List<User> userList = new ArrayList<User>();
String result = Jointer.BY_COMMA.join(userList, u -> u.getEmail());
Assert.assertEquals("", result);

userList = null;
result = Jointer.BY_COMMA.join(userList, u -> u.getEmail());
Assert.assertEquals("", result);
}

@Test
public void testJoinOne() {
List<User> userList = new ArrayList<User>();
User user = new User();
user.setEmail("[email protected]");
userList.add(user);

String result = Jointer.BY_COMMA.join(userList, u -> u.getEmail());
Assert.assertEquals("[email protected]", result);
}
}

0 comments on commit 61c2e6e

Please sign in to comment.