-
- - 公共部分
- - 生产者接入
- - 消费者接入
- - 常见问题
-
-
-
1:仓库
-
-<repository>
- <id>sohu.nexus</id>
- <url>http://${nexusDomain}/nexus/content/groups/public</url>
-</repository>
-
2:pom依赖
-
-<dependency>
- <groupId>com.sohu.tv</groupId>
- <artifactId>${clientArtifactId}</artifactId>
- <version>${version}</version>
-</dependency>
-
3:【可选】log4j配置片段
-
-<appender name="RocketmqClientAppender" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="${r"${日志路径}"}/rocketmq.log" />
- <param name="DatePattern" value="'.'yyyy-MM-dd" />
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d{yyy-MM-dd HH\:mm\:ss,SSS} %p %c{1}(%L) - %m%n"/>
- </layout>
-</appender>
-<logger name="RocketmqClient" additivity="false">
- <appender-ref ref="RocketmqClientAppender"/>
-</logger>
-<logger name="RocketmqCommon" additivity="false">
- <appender-ref ref="RocketmqClientAppender"/>
-</logger>
-<logger name="RocketmqRemoting" additivity="false">
- <appender-ref ref="RocketmqClientAppender"/>
-</logger>
-
4:【可选】log4j2配置片段
-
-<Appenders>
- <RollingFile name="RocketmqClientAppender" fileName="${r"${sys:client.logRoot}"}/rocketmq_client.log" filePattern="${r"${sys:client.logRoot}"}/rocketmq_client-%d{yyyy-MM-dd}-%i.log">
- <PatternLayout pattern="%d{yyy-MM-dd HH\:mm\:ss,SSS} %p %c{1}(%L) - %m%n"/>
- <Policies>
- <TimeBasedTriggeringPolicy/>
- <SizeBasedTriggeringPolicy size="1 GB"/>
- </Policies>
- <DefaultRolloverStrategy max="${r"${sys:client.logFileMaxIndex}"}"/>
- </RollingFile>
-</Appenders>
-<Loggers>
- <logger name="RocketmqClient" additivity="false">
- <appender-ref ref="RocketmqClientAppender"/>
- </logger>
- <logger name="RocketmqCommon" additivity="false">
- <appender-ref ref="RocketmqClientAppender"/>
- </logger>
- <logger name="RocketmqRemoting" additivity="false">
- <appender-ref ref="RocketmqClientAppender"/>
- </logger>
-</Loggers>
-
5:【可选】logback配置片段
-
-<appender name="RocketmqClientAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
- <file>${r"${LOGS_DIR}"}/rocketmq.log</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${r"${LOGS_DIR}"}/otherdays/rocketmq.log.%d{yyyy-MM-dd}
- </fileNamePattern>
- <maxHistory>40</maxHistory>
- </rollingPolicy>
- <encoder>
- <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
- <charset class="java.nio.charset.Charset">UTF-8</charset>
- </encoder>
-</appender>
-<logger name="RocketmqCommon" level="INFO" additivity="false">
- <appender-ref ref="RocketmqClientAppender"/>
-</logger>
-<logger name="RocketmqRemoting" level="INFO" additivity="false">
- <appender-ref ref="RocketmqClientAppender"/>
-</logger>
-<logger name="RocketmqClient" level="INFO" additivity="false">
- <appender-ref ref="RocketmqClientAppender"/>
-</logger>
-
-
-
老用户请先通过“我是老用户”入口关联生产者
-
1.1:初始化之spring xml方式
-
-<!-- 采用spring xml方式 -->
-<bean id="xxxProducer" class="${producerClass}" init-method="start" destroy-method="shutdown">
- <constructor-arg index="0" value="${r"${请从topic详情查询生产者的producer group}"}"></constructor-arg>
- <constructor-arg index="1" value="${r"${topic名字}"}"></constructor-arg>
-</bean>
-
1.2:初始化之java方式
-
-// 生产者初始化 注意:只用初始化一次
-${producerClass?substring(producerClass?last_index_of(".") + 1)} producer = new ${producerClass?substring(producerClass?last_index_of(".") + 1)}("xxx-producer", "xxx-topic");
-// 注意,只用启动一次
-producer.start();
-// 应用退出时
-producer.shutdown();
-
2:发送消息示例:
-
-Map<String, Object> message = new HashMap<String, Object>();
-message.put("vid", "123456");
-message.put("aid", "789172");
-//这里message推荐使用map,当然也可以使用json
-//建议设置keys(多个key用空格分隔)参数(也可以忽略该参数),比如keys指定为vid,那么就可以根据vid查询消息
-Result<SendResult> sendResult = producer.publish(message, "keys");
-if(!sendResult.isSuccess){
- //失败消息处理
-}
-
3:发送有序消息示例:
-
-/**
- * 相同的id发送到同一个队列
- * hash方法:id % 队列数
- */
-class IDHashMessageQueueSelector implements MessageQueueSelector {
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object idObject) {
- long id = (Long) idObject;
- int size = mqs.size();
- int index = (int) (id % size);
- return mqs.get(index);
- }
-}
-// 设置到producer
-producer.setMessageQueueSelector(new IDHashMessageQueueSelector());
-// 消息发送
-long id = 123L;
-Map<String, Object> map = new HashMap<String, Object>();
-map.put("id", id);
-Result<SendResult> sendResult = producer.publishOrder(map, String.valueOf(id), id);
-
4:发送事务消息示例:
-
-// 1.定义实现事务回调接口
-TransactionListener transactionListener = new TransactionListener() {
- /**
- * 在此方法执行本地事务
- */
- public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
- // arg可以传业务id
- int id = (Integer) arg;
- // 确定事务状态,未知返回:UNKNOW,回滚返回:ROLLBACK_MESSAGE,成功返回:COMMIT_MESSAGE,抛出异常默认为:UNKNOW
- return LocalTransactionState.COMMIT_MESSAGE;
- }
-
- /**
- * 如果executeLocalTransaction返回UNKNOW,rocketmq会回调此方法查询事务状态,默认每分钟查一次,最多查询15次,状态还是UNKNOW的话,丢弃消息
- */
- public LocalTransactionState checkLocalTransaction(MessageExt msg) {
- String key = msg.getKeys();
- int id = Integer.valueOf(key);
- return LocalTransactionState.COMMIT_MESSAGE;
- }
-};
-
-// 2.发送事务消息
-// 初始化
-${producerClass?substring(producerClass?last_index_of(".") + 1)} producer = new ${producerClass?substring(producerClass?last_index_of(".") + 1)}(producerGroup, topic, transactionListener);
-// 组装消息
-int id = 123;
-Map<String, Object> map = new HashMap<String, Object>();
-map.put("id", id);
-map.put("msg", "msg" + id);
-// 发送
-Result<SendResult> sendResult = producer.publishTransaction(JSON.toJSONString(map), String.valueOf(id), id);
-if(!sendResult.isSuccess){
- //失败消息处理
-}
-
5:发送消息示例【hystrix版:MQ集群如果出现故障,将会拖慢发送方,故提供了hystrix版,以保证即使MQ集群整体不可用,也不会拖死发送方】:
-
-Map<String, String> map = new HashMap<String, String>();
-map.put("aid", "123456");
-map.put("vid", "765432");
-// 1.oneway方式 - 此种方式发送效率最高,但是无法获取返回的结果
-new PublishOnewayCommand(producer, map).execute();
-// 2.async方式 - 此种方式发送效率高于普通方式,可以通过异步回调的方式校验返回结果
-SendCallback sendCallback = new SendCallback() {
- public void onSuccess(SendResult sendResult) {
- // 成功回调
- }
- public void onException(Throwable e) {
- // 失败回调
- }
- };
-new PublishAsyncCommand(producer, map, sendCallback).execute();
-// 3.普通方式 - 此种方式即为普通方式的hystrix封装,与普通发送方式无异
-Result<SendResult> result = new PublishCommand(producer, map).execute();
-
注意:hystrix配置默认采用线程池隔离,容量为30,超时时间为rocketmq客户端默认超时3s,如果使用hystrix版,还需要显示依赖hystrix,如下:
-
-<dependency>
- <groupId>com.netflix.hystrix</groupId>
- <artifactId>hystrix-core</artifactId>
- <version>1.3.20</version>
-</dependency>
-
-
-
老用户请先通过“我是老用户”入口关联消费者
-
1:初始化之spring xml方式
-
-<bean id="consumer" class="${consumerClass}"
- init-method="start" destroy-method="shutdown">
- <constructor-arg index="0" value="${r"${请从topic详情查询消费者的consumer group}"}"/>
- <constructor-arg index="1" value="${r"${topic名字}"}"/>
- <!-- 如果topic的消息格式是map,可以使用此属性 -->
- <property name="consumerExecutor" ref="consumerExecutorBean"/>
- <!-- 如果topic的消息格式是map或byte[]或其他格式,可以使用此属性(兼容所有类型) -->
- <property name="consumerCallback" ref="consumerCallbackBean"/>
-</bean>
-
2:初始化之java方式
-
-// 消费者初始化 注意:只用初始化一次
-${consumerClass?substring(consumerClass?last_index_of(".") + 1)} consumer = new ${consumerClass?substring(consumerClass?last_index_of(".") + 1)}("xxx-consumer", "xxx-topic");
-// 设置消费回调
-consumer.setConsumerCallback(new ConsumerCallback<Map<String, Object>, MessageExt>() {
- public void call(Map<String, Object> t, MessageExt k) {
- try {
- // 消费逻辑
- } catch (Exception e) {
- logger.error("consume err, msgid:{}, msg:{}", k.getMsgId(), t, e);
- // 如果需要重新消费,这里需要把异常抛出,消费失败的消息将发回rocketmq,重试消费
- throw e;
- }
- }
-});
-// 注意,只用启动一次
-consumer.start();
-// 应用退出时
-consumer.shutdown();
-
3:广播模式消费者需要注意:
-
广播模式offset默认存储在用户主目录的.rocketmq_offsets文件夹下,如果应用部署在docker上,为了防止重新部署镜像导致offset丢失,需要单独指定offset存储的目录:
-
-Drocketmq.client.localOffsetStoreDir=/data/logs/.rocketmq_offsets
-
4:Consumer部分参数释义【如非有特殊需求不必修改】:
-
-/**
- * 消费线程数,默认20
- *
- * @param num
- */
-public void setConsumeThreadMin(int num) {
- if (num <= 0) {
- return;
- }
- consumer.setConsumeThreadMin(num);
-}
-/**
- * 消费线程数,默认64
- *
- * @param num
- */
-public void setConsumeThreadMax(int num) {
- if (num <= 0) {
- return;
- }
- consumer.setConsumeThreadMax(num);
-}
-/**
- * queue中缓存多少个消息时进行流控 ,默认1000
- *
- * @param size
- */
-public void setPullThresholdForQueue(int size) {
- if (size < 0) {
- return;
- }
- consumer.setPullThresholdForQueue(size);
-}
-
-
-
1:我能否用一个topic发送多种消息?
-
最好一个topic只负责一类消息,topic的数量对MQ集群几乎无影响。
-
2:能否用同样的producer名(即producer group)往多个topic发送消息?
-
可以发送成功,但是不要这样做。
-
3:能否用同样的consumer名(即consumer group)消费多个topic的消息?
-
不可以,consumer group与topic的关系是,多对一。
-
4:消息采用什么格式发送?
-
推荐使用map,优点:生产者和消费者解耦,无序列化和反序列化问题。
-
5:三种消息发送方式的应用场景?
-
1.如果是通知类型消息,即消息可以丢失,推荐采用oneway方式发送。
-
2.如果需要知道消息是否发送成功,但是不能阻塞主流程,推荐采用asyn方式发送。
-
3.如果消息必须发送成功,不在乎是否阻塞主流程,推荐采用普通方式发送。
-
4.以上三种方式都有对应的hystrix隔离版,可以在MQ集群故障时保障客户端主流程不阻塞。
-
6:生产者注意事项:
-
检查发送消息后的返回值,针对失败的消息进行重试发送或降级处理。
-
7:【集群模式】消费者注意事项:
-
针对需要重试的消息,消费失败需要抛出异常,这样会将失败的消息发回重试队列。
-
8:能否使用tags?
-
不建议使用tags,理由如下:
-
1. 说起tags不得不说consumer group,其必须在整个集群中全局唯一,否则会在消费时导致部分消息丢失的问题:参见测试,而MQCloud在业务层面保证了这个唯一性。
-
2. 那么跟tags有什么关系?关系就是同一个topic,同样的consumer group,使用不同的tags,会导致和consumer group一样的问题。
-
也就是说topic<->consumer group<->tags需要一一对应!
-
3. 引起这两个问题的原因都跟rocketmq心跳机制有关,具体类可以参见ConsumerManager,中的结构:
-
private final ConcurrentMap<String/* Consumer Group */, ConsumerGroupInfo> consumerTable = new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
-
4. 如果自己能确保上述的一一对应关系,可以参考如下相关代码:
-
-// 生产者:注意一条消息只支持设置一个tag
-producer.publish(msg, tags, null, null);
-// 消费者:在启动之前设置
-consumer.setSubExpression("tagA || tagB");
-
-
5. tags替代方案:消息体增加type字段,各个消费者自己过滤。
-
9:已知问题:
-
1.org.apache.rocketmq.client.exception.MQBrokerException: CODE: 25 DESC: the consumer's subscription not latest。
-
该问题是rocketmq4.2版本的bug,拉取消息流程控制不严格导致,但是并不影响消息消费,在4.2版本出现,在4.3版本修复,参见。
-
2.org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2 DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while
-
该问题是由于rocketmq4.1之后broker针对处理发送过来的请求增加了快速失败机制,对于响应超过200ms的请求移除队列。默认broker端采用单线程和spin lock来处理。引起的原因可能是SYN_FLUSH,SYN_MASTER,gc,iops过高等,参考1,参考2。
-
-