浏览代码

加入rocketmq

wangliang 5 年之前
父节点
当前提交
1d3f308291

+ 4 - 4
themis-business/src/main/java/com/qmth/themis/business/listener/RocketConsumerTranListener.java

@@ -1,5 +1,6 @@
 package com.qmth.themis.business.listener;
 
+import com.qmth.themis.business.constant.SystemConstant;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -38,8 +39,8 @@ public class RocketConsumerTranListener {
 
         @Override
         public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条
-            defaultMQPushConsumer.setMaxReconsumeTimes(1);//最大重试次数
+            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
+            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
             defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
                 @Override
                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
@@ -48,9 +49,8 @@ public class RocketConsumerTranListener {
                         log.info("接受到的消息:{}", new String(messageExt.getBody()));
                         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                     }
-                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
 //                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-//                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
                 }
             });
         }