Skip to content

Commit

Permalink
1.3发布
Browse files Browse the repository at this point in the history
1 消息支持精准offset搜索
2 重试&死消息支持offset搜索
3 客户端异常发送到负责人
4 支持消息重发审核
5 消费者消费消息重试时间从默认15分钟扩大至2小时
6 优化部分UI
7 消息时间查询扩展为任意时间段
8 增加死消息队列展示
9 消息展示支持xml
  • Loading branch information
yongfeigao committed Dec 7, 2018
1 parent 70bb9eb commit e193834
Show file tree
Hide file tree
Showing 107 changed files with 2,418 additions and 797 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@

![消费详情](mq-cloud/src/main/resources/static/img/intro/consumeDetail2.png)

* 某个消费者具体的消费详情-可以查询重试消息和死消息

![消费详情](mq-cloud/src/main/resources/static/img/intro/consumeRetry.png)

* 消息

![消息](mq-cloud/src/main/resources/static/img/intro/msgSearch.png)
Expand Down
2 changes: 1 addition & 1 deletion mq-client-common-open/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.sohu.tv</groupId>
<artifactId>mq</artifactId>
<version>1.2.RELEASE</version>
<version>1.3.RELEASE</version>
</parent>

<artifactId>mq-client-common-open</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ public Exception getException() {

public void setException(Exception exception) {
this.exception = exception;
}
}

@Override
public String toString() {
return "Result [isSuccess=" + isSuccess + ", result=" + result + ", exception=" + exception + "]";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
public class Version {

public static String get() {
return "1.2.RELEASE";
return "1.3.RELEASE";
}
}

This file was deleted.

2 changes: 1 addition & 1 deletion mq-client-open/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.sohu.tv</groupId>
<artifactId>mq</artifactId>
<version>1.2.RELEASE</version>
<version>1.3.RELEASE</version>
</parent>

<artifactId>mq-client-open</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class RocketMQConsumer extends AbstractConfig {
public RocketMQConsumer(String consumerGroup, String topic) {
super(consumerGroup, topic);
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setConsumeTimeout(2 * 60);
}

public void start() {
Expand All @@ -86,8 +87,8 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
}
List<Map<String, Object>> msgList = null;
for (MessageExt me : msgs) {
byte[] bytes = me.getBody();
try {
byte[] bytes = me.getBody();
if (bytes == null || bytes.length == 0) {
logger.warn("MessageExt={},MessageBody is null", me);
continue;
Expand Down Expand Up @@ -115,7 +116,8 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
msgList.add((Map<String, Object>) getMessageSerializer().deserialize(bytes));
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
logger.error("topic:{} consumer:{} msg:{} msgId:{} bornTimestamp:{}", getTopic(),
getConsumer(), new String(bytes), me.getMsgId(), me.getBornTimestamp(), e);
if (reconsume) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.sohu.tv.mq.rocketmq;

import java.util.concurrent.atomic.AtomicLong;

import org.apache.rocketmq.common.message.MessageExt;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.sohu.index.tv.mq.common.ConsumerCallback;

public class RocketMQConsumerJsonTest {

private AtomicLong counter = new AtomicLong();

private RocketMQConsumer consumer;

@Before
public void init() {
consumer = TestUtil.buildConsumer("mqcloud-test-topic-consumer", "mqcloud-test-topic");
}

@Test
public void test() throws InterruptedException {
consumer.setConsumerCallback(new ConsumerCallback<String, MessageExt>() {
public void call(String t, MessageExt k) throws Exception {
System.out.println(t);
}
});
consumer.start();
while (true) {
System.out.println(counter.get());
Thread.sleep(1000);
}
}

@After
public void clean() {
consumer.shutdown();
}
}
Loading

0 comments on commit e193834

Please sign in to comment.