wangliang 4 gadi atpakaļ
vecāks
revīzija
d97d0ee6bf

+ 10 - 22
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketMessageConsumer.java

@@ -5,9 +5,7 @@ import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.common.contanst.Constants;
 import com.qmth.themis.mq.dto.MqDto;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.*;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -36,33 +34,23 @@ public class RocketMessageConsumer {
      * @param consumerGroupName
      * @param topic
      * @param tag
+     * @param o
      * @return
      */
-    public DefaultMQPushConsumer setRocketMQConsumer(String nameSrvAddr, String consumerGroupName, String topic, String tag) {
+    public DefaultMQPushConsumer setRocketMQConsumer(String nameSrvAddr, String consumerGroupName, String topic, String tag, Object o) {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroupName);
         consumer.setNamesrvAddr(nameSrvAddr);
         consumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
         consumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
         consumer.setMessageModel(MessageModel.BROADCASTING);
-        consumer.registerMessageListener(new MessageListenerConcurrently() {
-            @Override
-            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
-                try {
-                    long threadId = Thread.currentThread().getId();
-                    String threadName = Thread.currentThread().getName();
-                    for (MessageExt messageExt : msgs) {
-                        log.info(":{}-:{} messageConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
-                        MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
-                        log.info(":{}-:{} messageConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-                }
-                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
-            }
-        });
+        if (o instanceof MessageListenerConcurrently) {
+            consumer.registerMessageListener((MessageListenerConcurrently) o);
+        } else if (o instanceof MessageListenerOrderly) {
+            consumer.registerMessageListener((MessageListenerOrderly) o);
+        } else {
+            consumer.registerMessageListener((MessageListener) o);
+        }
         try {
             consumer.subscribe(topic, tag);
             consumer.start();