|
@@ -1,217 +1,160 @@
|
|
|
-package com.qmth.themis.mq.listener;
|
|
|
-
|
|
|
-import com.google.gson.Gson;
|
|
|
-import com.qmth.themis.business.constant.SystemConstant;
|
|
|
-import com.qmth.themis.business.entity.TBSession;
|
|
|
-import com.qmth.themis.business.entity.TMRocketMessage;
|
|
|
-import com.qmth.themis.business.service.TBSessionService;
|
|
|
-import com.qmth.themis.business.service.TMRocketMessageService;
|
|
|
-import com.qmth.themis.business.util.JacksonUtil;
|
|
|
-import com.qmth.themis.business.util.RedisUtil;
|
|
|
-import com.qmth.themis.common.contanst.Constants;
|
|
|
-import com.qmth.themis.mq.dto.MqDto;
|
|
|
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
|
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
|
|
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
|
|
-import org.apache.rocketmq.common.message.Message;
|
|
|
-import org.apache.rocketmq.common.message.MessageExt;
|
|
|
-import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
-import org.apache.rocketmq.spring.annotation.SelectorType;
|
|
|
-import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
|
-import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
-import org.springframework.stereotype.Service;
|
|
|
-import org.springframework.transaction.annotation.Transactional;
|
|
|
-
|
|
|
-import javax.annotation.Resource;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Objects;
|
|
|
-
|
|
|
-/**
|
|
|
- * @Description: 普通消息监听 session_topic
|
|
|
- * @Param:
|
|
|
- * @return:
|
|
|
- * @Author: wangliang
|
|
|
- * @Date: 2020/6/28
|
|
|
- */
|
|
|
-@Service
|
|
|
-public class RocketSessionConsumer implements
|
|
|
-// MessageListenerOrderly
|
|
|
- MessageListenerConcurrently //并发消费
|
|
|
-{
|
|
|
-
|
|
|
- private final static Logger log = LoggerFactory.getLogger(RocketSessionConsumer.class);
|
|
|
-
|
|
|
- @Resource
|
|
|
- TBSessionService tbSessionService;
|
|
|
-
|
|
|
- @Resource
|
|
|
- RedisUtil redisUtil;
|
|
|
-
|
|
|
- @Resource
|
|
|
- TMRocketMessageService tmRocketMessageService;
|
|
|
-
|
|
|
- /**
|
|
|
- * 并发消费
|
|
|
- *
|
|
|
- * @param msgs
|
|
|
- * @param consumeConcurrentlyContext
|
|
|
- * @return
|
|
|
- */
|
|
|
- @Override
|
|
|
- @Transactional
|
|
|
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
- try {
|
|
|
- long threadId = Thread.currentThread().getId();
|
|
|
- String threadName = Thread.currentThread().getName();
|
|
|
- Gson gson = new Gson();
|
|
|
- for (MessageExt messageExt : msgs) {
|
|
|
- log.info(":{}-:{} sessionConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
- MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
- log.info(":{}-:{} sessionConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
- log.info(":{}-:{} sessionConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
|
- if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
|
|
|
- log.info(":{}-:{} 更新db", threadId, threadName);
|
|
|
- tbSessionService.saveSessionInfo(JacksonUtil.readJson(JacksonUtil.parseJson(mqDto.getBody()), TBSession.class), mqDto.getTimestamp());
|
|
|
- mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
- TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
- tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
|
|
|
- tmRocketMessageService.save(tmRocketMessage);
|
|
|
- redisUtil.deleteSessionTopicList(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
- } else {
|
|
|
- log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
- }
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
- }
|
|
|
-
|
|
|
+//package com.qmth.themis.mq.listener;
|
|
|
+//
|
|
|
+//import com.google.gson.Gson;
|
|
|
+//import com.qmth.themis.business.constant.SystemConstant;
|
|
|
+//import com.qmth.themis.business.entity.TBSession;
|
|
|
+//import com.qmth.themis.business.entity.TMRocketMessage;
|
|
|
+//import com.qmth.themis.business.service.TBSessionService;
|
|
|
+//import com.qmth.themis.business.service.TMRocketMessageService;
|
|
|
+//import com.qmth.themis.business.util.JacksonUtil;
|
|
|
+//import com.qmth.themis.business.util.RedisUtil;
|
|
|
+//import com.qmth.themis.common.contanst.Constants;
|
|
|
+//import com.qmth.themis.mq.dto.MqDto;
|
|
|
+//import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
|
+//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
+//import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
+//import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
|
|
+//import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
|
|
+//import org.apache.rocketmq.common.message.Message;
|
|
|
+//import org.apache.rocketmq.common.message.MessageExt;
|
|
|
+//import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
|
|
+//import org.apache.rocketmq.spring.annotation.SelectorType;
|
|
|
+//import org.apache.rocketmq.spring.core.RocketMQListener;
|
|
|
+//import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
|
|
|
+//import org.slf4j.Logger;
|
|
|
+//import org.slf4j.LoggerFactory;
|
|
|
+//import org.springframework.stereotype.Service;
|
|
|
+//import org.springframework.transaction.annotation.Transactional;
|
|
|
+//
|
|
|
+//import javax.annotation.Resource;
|
|
|
+//import java.util.List;
|
|
|
+//import java.util.Objects;
|
|
|
+//
|
|
|
+///**
|
|
|
+// * @Description: 普通消息监听 session_topic
|
|
|
+// * @Param:
|
|
|
+// * @return:
|
|
|
+// * @Author: wangliang
|
|
|
+// * @Date: 2020/6/28
|
|
|
+// */
|
|
|
+//@Service
|
|
|
+//public class RocketSessionConsumer implements
|
|
|
+//// MessageListenerOrderly
|
|
|
+// MessageListenerConcurrently //并发消费
|
|
|
+//{
|
|
|
+//
|
|
|
+// private final static Logger log = LoggerFactory.getLogger(RocketSessionConsumer.class);
|
|
|
+//
|
|
|
+// @Resource
|
|
|
+// TBSessionService tbSessionService;
|
|
|
+//
|
|
|
+// @Resource
|
|
|
+// RedisUtil redisUtil;
|
|
|
+//
|
|
|
+// @Resource
|
|
|
+// TMRocketMessageService tmRocketMessageService;
|
|
|
+//
|
|
|
// /**
|
|
|
-// * 顺序消费
|
|
|
+// * 并发消费
|
|
|
// *
|
|
|
// * @param msgs
|
|
|
-// * @param consumeOrderlyContext
|
|
|
+// * @param consumeConcurrentlyContext
|
|
|
// * @return
|
|
|
// */
|
|
|
// @Override
|
|
|
-// public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext
|
|
|
-// consumeOrderlyContext) {
|
|
|
+// @Transactional
|
|
|
+// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
// try {
|
|
|
+// long threadId = Thread.currentThread().getId();
|
|
|
+// String threadName = Thread.currentThread().getName();
|
|
|
+// Gson gson = new Gson();
|
|
|
// for (MessageExt messageExt : msgs) {
|
|
|
-// log.info("sessionConsumer重试次数:{}", messageExt.getReconsumeTimes());
|
|
|
-// MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
|
|
|
-// log.info("sessionConsumer接受到的消息:{}", JacksonUtil.parseJson(mqDto));
|
|
|
-// log.info("mqDto sequence:{},tag:{}", mqDto.getSequence(), mqDto.getTag());
|
|
|
-// MqDto redisMqdto = (MqDto) redisUtil.getSessionTopicList(mqDto.getId());
|
|
|
-// if (Objects.nonNull(redisMqdto)) {
|
|
|
-// if (Objects.nonNull(redisMqdto.getAck()) && redisMqdto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
|
|
|
-// log.info("更新db");
|
|
|
-// tbSessionService.saveSessionInfo(toJavaObject((JSON) mqDto.getBody(), TBSession.class), redisMqdto.getTimestamp());
|
|
|
-// redisUtil.deleteSessionTopicList(redisMqdto.getId());
|
|
|
-// return ConsumeOrderlyStatus.SUCCESS;
|
|
|
-// } else {
|
|
|
-// log.info("消息ack未确认,重发");
|
|
|
-// return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
|
|
|
-// }
|
|
|
+// log.info(":{}-:{} sessionConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
+// MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
+// log.info(":{}-:{} sessionConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
+// log.info(":{}-:{} sessionConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
|
|
|
+// if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
|
|
|
+// log.info(":{}-:{} 更新db", threadId, threadName);
|
|
|
+// tbSessionService.saveSessionInfo(JacksonUtil.readJson(JacksonUtil.parseJson(mqDto.getBody()), TBSession.class), mqDto.getTimestamp());
|
|
|
+// mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
|
|
|
+// TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
|
|
|
+// tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
|
|
|
+// tmRocketMessageService.save(tmRocketMessage);
|
|
|
+// redisUtil.deleteSessionTopicList(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId());
|
|
|
+// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
// } else {
|
|
|
-// log.info("消息数据为空,重发消息");
|
|
|
-// return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
|
|
|
+// log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
+// return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
// }
|
|
|
// }
|
|
|
// } catch (Exception e) {
|
|
|
// e.printStackTrace();
|
|
|
-// return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
|
|
|
+// return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+// }
|
|
|
+// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
+// }
|
|
|
+//
|
|
|
+//// /**
|
|
|
+//// * 顺序消费
|
|
|
+//// *
|
|
|
+//// * @param msgs
|
|
|
+//// * @param consumeOrderlyContext
|
|
|
+//// * @return
|
|
|
+//// */
|
|
|
+//// @Override
|
|
|
+//// public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext
|
|
|
+//// consumeOrderlyContext) {
|
|
|
+//// try {
|
|
|
+//// for (MessageExt messageExt : msgs) {
|
|
|
+//// log.info("sessionConsumer重试次数:{}", messageExt.getReconsumeTimes());
|
|
|
+//// MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
|
|
|
+//// log.info("sessionConsumer接受到的消息:{}", JacksonUtil.parseJson(mqDto));
|
|
|
+//// log.info("mqDto sequence:{},tag:{}", mqDto.getSequence(), mqDto.getTag());
|
|
|
+//// MqDto redisMqdto = (MqDto) redisUtil.getSessionTopicList(mqDto.getId());
|
|
|
+//// if (Objects.nonNull(redisMqdto)) {
|
|
|
+//// if (Objects.nonNull(redisMqdto.getAck()) && redisMqdto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
|
|
|
+//// log.info("更新db");
|
|
|
+//// tbSessionService.saveSessionInfo(toJavaObject((JSON) mqDto.getBody(), TBSession.class), redisMqdto.getTimestamp());
|
|
|
+//// redisUtil.deleteSessionTopicList(redisMqdto.getId());
|
|
|
+//// return ConsumeOrderlyStatus.SUCCESS;
|
|
|
+//// } else {
|
|
|
+//// log.info("消息ack未确认,重发");
|
|
|
+//// return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
|
|
|
+//// }
|
|
|
+//// } else {
|
|
|
+//// log.info("消息数据为空,重发消息");
|
|
|
+//// return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
|
|
|
+//// }
|
|
|
+//// }
|
|
|
+//// } catch (Exception e) {
|
|
|
+//// e.printStackTrace();
|
|
|
+//// return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
|
|
|
+//// }
|
|
|
+//// return ConsumeOrderlyStatus.SUCCESS;//成功
|
|
|
+//// }
|
|
|
+//
|
|
|
+// @Service
|
|
|
+// @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWebGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWebTag}")
|
|
|
+// public class sessionConsumerWeb implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public void onMessage(Message message) {
|
|
|
+// //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
+// }
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
+// defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
+// defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
+// defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
+//// defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
|
|
|
+// defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
|
|
|
// }
|
|
|
-// return ConsumeOrderlyStatus.SUCCESS;//成功
|
|
|
// }
|
|
|
-
|
|
|
- @Service
|
|
|
- @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWebGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWebTag}")
|
|
|
- public class sessionConsumerWeb implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onMessage(Message message) {
|
|
|
- //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
- defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
- defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
- defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
-// defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
|
|
|
- defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Service
|
|
|
- @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerPcGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicPcTag}")
|
|
|
- public class sessionConsumerPc implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onMessage(Message message) {
|
|
|
- //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
- defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
- defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
- defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
- defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Service
|
|
|
- @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWxappVideoGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWxappVideoTag}")
|
|
|
- public class sessionConsumerWxappVideo implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onMessage(Message message) {
|
|
|
- //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
- defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
- defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
- defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
- defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Service
|
|
|
- @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWxappAnswerGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWxappAnswerTag}")
|
|
|
- public class sessionConsumerWxappAnswer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onMessage(Message message) {
|
|
|
- //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
- defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
- defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
- defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
- defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 死信队列
|
|
|
- */
|
|
|
+//
|
|
|
// @Service
|
|
|
-// @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerGroupDlq}", topic = "${mq.config.sessionTopicDlq}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicTag}")
|
|
|
-// public class dlqSessionConsumer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
+// @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerPcGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicPcTag}")
|
|
|
+// public class sessionConsumerPc implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
//
|
|
|
// @Override
|
|
|
// public void onMessage(Message message) {
|
|
@@ -220,10 +163,67 @@ public class RocketSessionConsumer implements
|
|
|
//
|
|
|
// @Override
|
|
|
// public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
-// log.info("dlqSessionConsumer死信队列进来了");
|
|
|
// defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
+// defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
// defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
-// defaultMQPushConsumer.registerMessageListener(RocketConsumer.this::consumeMessage);
|
|
|
+// defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
|
|
|
// }
|
|
|
// }
|
|
|
-}
|
|
|
+//
|
|
|
+// @Service
|
|
|
+// @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWxappMonitorGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWxappMonitorTag}")
|
|
|
+// public class sessionConsumerWxappMonitor implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public void onMessage(Message message) {
|
|
|
+// //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
+// }
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
+// defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
+// defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
+// defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
+// defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// @Service
|
|
|
+// @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWxappAnswerGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWxappAnswerTag}")
|
|
|
+// public class sessionConsumerWxappAnswer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public void onMessage(Message message) {
|
|
|
+// //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
+// }
|
|
|
+//
|
|
|
+// @Override
|
|
|
+// public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
+// defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
+// defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
|
|
+// defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
+// defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// /**
|
|
|
+// * 死信队列
|
|
|
+// */
|
|
|
+//// @Service
|
|
|
+//// @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerGroupDlq}", topic = "${mq.config.sessionTopicDlq}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicTag}")
|
|
|
+//// public class dlqSessionConsumer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
|
|
|
+////
|
|
|
+//// @Override
|
|
|
+//// public void onMessage(Message message) {
|
|
|
+//// //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
|
|
|
+//// }
|
|
|
+////
|
|
|
+//// @Override
|
|
|
+//// public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
|
|
|
+//// log.info("dlqSessionConsumer死信队列进来了");
|
|
|
+//// defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
|
|
|
+//// defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
|
|
|
+//// defaultMQPushConsumer.registerMessageListener(RocketConsumer.this::consumeMessage);
|
|
|
+//// }
|
|
|
+//// }
|
|
|
+//}
|