|
@@ -0,0 +1,77 @@
|
|
|
+package com.qmth.themis.mq.templete.impl;
|
|
|
+
|
|
|
+import java.util.List;
|
|
|
+import java.util.Objects;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
+import org.apache.rocketmq.common.message.MessageExt;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import com.qmth.themis.business.constant.SystemConstant;
|
|
|
+import com.qmth.themis.business.util.JacksonUtil;
|
|
|
+import com.qmth.themis.business.util.RedisUtil;
|
|
|
+import com.qmth.themis.common.contanst.Constants;
|
|
|
+import com.qmth.themis.mq.dto.MqDto;
|
|
|
+import com.qmth.themis.mq.service.MqLogicService;
|
|
|
+import com.qmth.themis.mq.templete.Concurrently;
|
|
|
+
|
|
|
+/**考试记录数据持久化
|
|
|
+ * @Description:
|
|
|
+ * @Author: xiatian
|
|
|
+ * @Date: 2020-08-04
|
|
|
+ */
|
|
|
+@Service
|
|
|
+public class ExamRecordPersistedConcurrentlyImpl implements Concurrently {
|
|
|
+ private final static Logger log = LoggerFactory.getLogger(ExamRecordPersistedConcurrentlyImpl.class);
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ RedisUtil redisUtil;
|
|
|
+ @Resource
|
|
|
+ MqLogicService mqLogicService;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
|
|
|
+ ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
|
|
+ MqDto mqDto = null;
|
|
|
+ try {
|
|
|
+ long threadId = Thread.currentThread().getId();
|
|
|
+ String threadName = Thread.currentThread().getName();
|
|
|
+ for (MessageExt messageExt : msgs) {
|
|
|
+ log.debug(":{}-:{} CalculateObjectiveScore 重试次数:{}", threadId, threadName,
|
|
|
+ messageExt.getReconsumeTimes());
|
|
|
+ mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
|
|
|
+ log.debug(":{}-:{} CalculateObjectiveScore 接收到的消息:{}", threadId, threadName,
|
|
|
+ JacksonUtil.parseJson(mqDto));
|
|
|
+ int reconsumeTime = messageExt.getReconsumeTimes();
|
|
|
+ if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
|
|
|
+ mqLogicService.execMqWebsocketUnNormalLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
+ } else {
|
|
|
+ if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
|
|
|
+ && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId()))
|
|
|
+ && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(),
|
|
|
+ SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
|
|
|
+ log.debug(":{}-:{} 更新db", threadId, threadName);
|
|
|
+ mqLogicService.execMqFaceVerifySaveLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
+ } else {
|
|
|
+ log.debug(":{}-:{} 消息ack未确认,重发", threadId, threadName);
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("人脸验证保存,消息消费出错",e);
|
|
|
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
|
|
|
+ } finally {
|
|
|
+ if (Objects.nonNull(mqDto)) {
|
|
|
+ redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
|
|
|
+ }
|
|
|
+}
|