|
@@ -28,6 +28,9 @@ import com.qmth.themis.business.util.RedisUtil;
|
|
|
import com.qmth.themis.common.contanst.Constants;
|
|
|
import com.qmth.themis.common.util.SimpleBeanUtil;
|
|
|
import com.qmth.themis.mq.service.MqLogicService;
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
+import org.apache.rocketmq.common.message.MessageExt;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.stereotype.Service;
|
|
@@ -35,6 +38,7 @@ import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
import java.io.IOException;
|
|
|
+import java.lang.reflect.Method;
|
|
|
import java.util.Date;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -410,7 +414,7 @@ public class MqLogicServiceImpl implements MqLogicService {
|
|
|
*/
|
|
|
@Override
|
|
|
@Transactional
|
|
|
- public void execMqExamRecordPersistedLogic(MqDto mqDto, String key) {
|
|
|
+ public void execMqRecordPersistedLogic(MqDto mqDto, String key) {
|
|
|
Gson gson = new Gson();
|
|
|
Map<String, Object> param = (Map<String, Object>) mqDto.getBody();
|
|
|
Long recordId = (Long) param.get("recordId");
|
|
@@ -632,4 +636,91 @@ public class MqLogicServiceImpl implements MqLogicService {
|
|
|
tmRocketMessageService.saveOrUpdate(tmRocketMessage);
|
|
|
redisUtil.delete(key, mqDto.getId());
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 普通消息
|
|
|
+ *
|
|
|
+ * @param msgs
|
|
|
+ * @param consumeConcurrentlyContext
|
|
|
+ * @param mqExecTypeEnum
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext, MqExecTypeEnum mqExecTypeEnum) {
|
|
|
+ MqDto mqDto = null;
|
|
|
+ try {
|
|
|
+ long threadId = Thread.currentThread().getId();
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
+ for (MessageExt messageExt : msgs) {
|
|
|
+ log.info(":{}-:{} consumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
+ mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
+ log.info(":{}-:{} consumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
+ int reconsumeTime = messageExt.getReconsumeTimes();
|
|
|
+ if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
|
|
|
+ this.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
+ } else {
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
+ Method method = this.getClass().getDeclaredMethod(mqExecTypeEnum.getDesc(), MqDto.class, String.class);
|
|
|
+ log.info(":{}-:{} 准备执行mq exec:{}逻辑", threadId, threadName, method.getName());
|
|
|
+ method.invoke(this, mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
+ } else {
|
|
|
+ log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("mq普通消息监听,消息消费出错", e);
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+ } finally {
|
|
|
+ if (Objects.nonNull(mqDto)) {
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 延时消息
|
|
|
+ *
|
|
|
+ * @param msgs
|
|
|
+ * @param consumeConcurrentlyContext
|
|
|
+ * @param mqExecTypeEnum
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public ConsumeConcurrentlyStatus consumeMessageDelay(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext, MqExecTypeEnum mqExecTypeEnum) {
|
|
|
+ MqDto mqDto = null;
|
|
|
+ try {
|
|
|
+ long threadId = Thread.currentThread().getId();
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
+ for (MessageExt messageExt : msgs) {
|
|
|
+ log.info(":{}-:{} consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
|
|
|
+ mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
+ log.info(":{}-:{} consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
|
|
|
+ int reconsumeTime = messageExt.getReconsumeTimes();
|
|
|
+ if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
|
|
|
+ this.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST);
|
|
|
+ } else {
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
+ Method method = this.getClass().getDeclaredMethod(mqExecTypeEnum.getDesc(), MqDto.class, String.class);
|
|
|
+ log.info(":{}-:{} 准备执行mq exec:{}逻辑", threadId, threadName, method.getName());
|
|
|
+ method.invoke(this, mqDto, SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST);
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
+ } else {
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("mq延时消息监听,消息消费出错", e);
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+ } finally {
|
|
|
+ if (Objects.nonNull(mqDto)) {
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
|
|
|
+ }
|
|
|
}
|