|
@@ -1,15 +1,16 @@
|
|
package com.qmth.themis.business.listener;
|
|
package com.qmth.themis.business.listener;
|
|
|
|
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
import com.alibaba.fastjson.JSONObject;
|
|
import com.alibaba.fastjson.JSONObject;
|
|
import com.qmth.themis.business.constant.SystemConstant;
|
|
import com.qmth.themis.business.constant.SystemConstant;
|
|
|
|
+import com.qmth.themis.business.dto.MqDto;
|
|
import com.qmth.themis.business.entity.TBSession;
|
|
import com.qmth.themis.business.entity.TBSession;
|
|
import com.qmth.themis.business.service.TBSessionService;
|
|
import com.qmth.themis.business.service.TBSessionService;
|
|
import com.qmth.themis.business.util.RedisUtil;
|
|
import com.qmth.themis.business.util.RedisUtil;
|
|
import com.qmth.themis.common.contanst.Constants;
|
|
import com.qmth.themis.common.contanst.Constants;
|
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
|
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
|
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
|
|
|
|
|
+import org.apache.rocketmq.client.consumer.listener.*;
|
|
|
|
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
|
import org.apache.rocketmq.common.message.Message;
|
|
import org.apache.rocketmq.common.message.Message;
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
@@ -22,9 +23,9 @@ import org.springframework.stereotype.Service;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
import javax.annotation.Resource;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
-import java.util.Map;
|
|
|
|
import java.util.Objects;
|
|
import java.util.Objects;
|
|
|
|
|
|
|
|
+import static com.alibaba.fastjson.JSON.toJavaObject;
|
|
import static com.alibaba.fastjson.JSONObject.parseObject;
|
|
import static com.alibaba.fastjson.JSONObject.parseObject;
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -35,7 +36,10 @@ import static com.alibaba.fastjson.JSONObject.parseObject;
|
|
* @Date: 2020/6/28
|
|
* @Date: 2020/6/28
|
|
*/
|
|
*/
|
|
@Service
|
|
@Service
|
|
-public class RocketConsumer implements MessageListenerConcurrently {
|
|
|
|
|
|
+public class RocketConsumer implements
|
|
|
|
+// MessageListenerOrderly
|
|
|
|
+ MessageListenerConcurrently //并发消费
|
|
|
|
+{
|
|
|
|
|
|
private final static Logger log = LoggerFactory.getLogger(RocketConsumer.class);
|
|
private final static Logger log = LoggerFactory.getLogger(RocketConsumer.class);
|
|
|
|
|
|
@@ -45,28 +49,34 @@ public class RocketConsumer implements MessageListenerConcurrently {
|
|
@Resource
|
|
@Resource
|
|
RedisUtil redisUtil;
|
|
RedisUtil redisUtil;
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * 并发消费
|
|
|
|
+ *
|
|
|
|
+ * @param msgs
|
|
|
|
+ * @param consumeConcurrentlyContext
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
@Override
|
|
@Override
|
|
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
try {
|
|
try {
|
|
for (MessageExt messageExt : msgs) {
|
|
for (MessageExt messageExt : msgs) {
|
|
log.info("sessionConsumer重试次数:{}", messageExt.getReconsumeTimes());
|
|
log.info("sessionConsumer重试次数:{}", messageExt.getReconsumeTimes());
|
|
- JSONObject jsonObject = parseObject(new String(messageExt.getBody(), Constants.CHARSET));
|
|
|
|
- log.info("sessionConsumer接受到的消息:{}", jsonObject.toJSONString());
|
|
|
|
- boolean waitStoreMsgOK = jsonObject.getBoolean("waitStoreMsgOK");
|
|
|
|
- if (waitStoreMsgOK) {
|
|
|
|
- Map map = (Map) jsonObject.get(SystemConstant.PROPERTIES);
|
|
|
|
- String sessionId = (String.valueOf(map.get(String.valueOf(map.get("TAGS")))));
|
|
|
|
- if (Objects.nonNull(sessionId)) {
|
|
|
|
- TBSession tbSession = (TBSession) redisUtil.getSessionTopicList(sessionId);
|
|
|
|
- if (Objects.nonNull(tbSession.getAck()) && tbSession.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
|
|
|
|
- tbSessionService.saveSessionInfo(tbSession);
|
|
|
|
- tbSession.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
|
- redisUtil.setUserSession(tbSession.getId(), tbSession);
|
|
|
|
- redisUtil.deleteSessionTopicList(tbSession.getId());
|
|
|
|
- }
|
|
|
|
|
|
+ MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
|
|
|
|
+ log.info("sessionConsumer接受到的消息:{}", JSONObject.toJSONString(mqDto));
|
|
|
|
+ log.info("mqDto sequence:{},tag:{}", mqDto.getSequence(), mqDto.getTag());
|
|
|
|
+ MqDto redisMqdto = (MqDto) redisUtil.getSessionTopicList(mqDto.getId());
|
|
|
|
+ if (Objects.nonNull(redisMqdto)) {
|
|
|
|
+ if (Objects.nonNull(redisMqdto.getAck()) && redisMqdto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
|
|
|
|
+ log.info("更新db");
|
|
|
|
+ tbSessionService.saveSessionInfo(toJavaObject((JSON) mqDto.getBody(), TBSession.class), redisMqdto.getTimestamp());
|
|
|
|
+ redisUtil.deleteSessionTopicList(redisMqdto.getId());
|
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
|
+ } else {
|
|
|
|
+ log.info("消息ack未确认,重发");
|
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
}
|
|
}
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
|
} else {
|
|
} else {
|
|
|
|
+ log.info("消息数据为空,重发消息");
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -77,9 +87,48 @@ public class RocketConsumer implements MessageListenerConcurrently {
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// /**
|
|
|
|
+// * 顺序消费
|
|
|
|
+// *
|
|
|
|
+// * @param msgs
|
|
|
|
+// * @param consumeOrderlyContext
|
|
|
|
+// * @return
|
|
|
|
+// */
|
|
|
|
+// @Override
|
|
|
|
+// public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext
|
|
|
|
+// consumeOrderlyContext) {
|
|
|
|
+// try {
|
|
|
|
+// for (MessageExt messageExt : msgs) {
|
|
|
|
+// log.info("sessionConsumer重试次数:{}", messageExt.getReconsumeTimes());
|
|
|
|
+// MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
|
|
|
|
+// log.info("sessionConsumer接受到的消息:{}", JSONObject.toJSONString(mqDto));
|
|
|
|
+// log.info("mqDto sequence:{},tag:{}", mqDto.getSequence(), mqDto.getTag());
|
|
|
|
+// MqDto redisMqdto = (MqDto) redisUtil.getSessionTopicList(mqDto.getId());
|
|
|
|
+// if (Objects.nonNull(redisMqdto)) {
|
|
|
|
+// if (Objects.nonNull(redisMqdto.getAck()) && redisMqdto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
|
|
|
|
+// log.info("更新db");
|
|
|
|
+// tbSessionService.saveSessionInfo(toJavaObject((JSON) mqDto.getBody(), TBSession.class), redisMqdto.getTimestamp());
|
|
|
|
+// redisUtil.deleteSessionTopicList(redisMqdto.getId());
|
|
|
|
+// return ConsumeOrderlyStatus.SUCCESS;
|
|
|
|
+// } else {
|
|
|
|
+// log.info("消息ack未确认,重发");
|
|
|
|
+// return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
|
|
|
|
+// }
|
|
|
|
+// } else {
|
|
|
|
+// log.info("消息数据为空,重发消息");
|
|
|
|
+// return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
|
|
|
|
+// }
|
|
|
|
+// }
|
|
|
|
+// } catch (Exception e) {
|
|
|
|
+// e.printStackTrace();
|
|
|
|
+// return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
|
|
|
|
+// }
|
|
|
|
+// return ConsumeOrderlyStatus.SUCCESS;//成功
|
|
|
|
+// }
|
|
|
|
+
|
|
@Service
|
|
@Service
|
|
- @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicTag}")
|
|
|
|
- public class sessionConsumer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
|
|
|
+ @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWebGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWebTag}")
|
|
|
|
+ public class sessionConsumerWeb implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void onMessage(Message message) {
|
|
public void onMessage(Message message) {
|
|
@@ -89,14 +138,15 @@ public class RocketConsumer implements MessageListenerConcurrently {
|
|
@Override
|
|
@Override
|
|
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
|
+ defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
defaultMQPushConsumer.registerMessageListener(RocketConsumer.this::consumeMessage);
|
|
defaultMQPushConsumer.registerMessageListener(RocketConsumer.this::consumeMessage);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@Service
|
|
@Service
|
|
- @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerGroupDlq}", topic = "${mq.config.sessionTopicDlq}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicTag}")
|
|
|
|
- public class dlqSessionConsumer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
|
|
|
+ @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerPcGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicPcTag}")
|
|
|
|
+ public class sessionConsumerPc implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void onMessage(Message message) {
|
|
public void onMessage(Message message) {
|
|
@@ -105,10 +155,50 @@ public class RocketConsumer implements MessageListenerConcurrently {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
- log.info("dlqSessionConsumer死信队列进来了");
|
|
|
|
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
|
+ defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
defaultMQPushConsumer.registerMessageListener(RocketConsumer.this::consumeMessage);
|
|
defaultMQPushConsumer.registerMessageListener(RocketConsumer.this::consumeMessage);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ //
|
|
|
|
+ @Service
|
|
|
|
+ @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWxappGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWxappTag}")
|
|
|
|
+ public class sessionConsumerWxapp implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void onMessage(Message message) {
|
|
|
|
+ //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
|
+ defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
|
+ defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
|
+ defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
|
+ defaultMQPushConsumer.registerMessageListener(RocketConsumer.this::consumeMessage);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 死信队列
|
|
|
|
+ */
|
|
|
|
+// @Service
|
|
|
|
+// @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerGroupDlq}", topic = "${mq.config.sessionTopicDlq}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicTag}")
|
|
|
|
+// public class dlqSessionConsumer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
|
+//
|
|
|
|
+// @Override
|
|
|
|
+// public void onMessage(Message message) {
|
|
|
|
+// //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
|
+// }
|
|
|
|
+//
|
|
|
|
+// @Override
|
|
|
|
+// public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
|
+// log.info("dlqSessionConsumer死信队列进来了");
|
|
|
|
+// defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
|
+// defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
|
+// defaultMQPushConsumer.registerMessageListener(RocketConsumer.this::consumeMessage);
|
|
|
|
+// }
|
|
|
|
+// }
|
|
}
|
|
}
|