wangliang 4 năm trước cách đây
mục cha
commit
cdd001b6f3

+ 61 - 0
themis-mq/src/main/java/com/qmth/themis/mq/service/MqLogicService.java

@@ -0,0 +1,61 @@
+package com.qmth.themis.mq.service;
+
+import com.qmth.themis.mq.dto.MqDto;
+
+/**
+ * @Description: mq执行逻辑
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/31
+ */
+public interface MqLogicService {
+
+    /**
+     * mq最大重试次数逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    public void execMqMaxReconsumeTime(MqDto mqDto, String key);
+
+    /**
+     * 用户轨迹逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    public void execMqUserLogLogic(MqDto mqDto, String key);
+
+    /**
+     * session逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    public void execMqSessionLogic(MqDto mqDto, String key);
+
+    /**
+     * task逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    public void execMqTaskLogic(MqDto mqDto, String key);
+
+    /**
+     * websocket超时退出逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    public void execMqWebsocketUnNormalLogic(MqDto mqDto, String key);
+
+    /**
+     * 计算客观分逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    public void execMqCalculateObjectiveScoreLogic(MqDto mqDto, String key);
+}

+ 243 - 0
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqLogicServiceImpl.java

@@ -0,0 +1,243 @@
+package com.qmth.themis.mq.service.impl;
+
+import com.google.gson.Gson;
+import com.qmth.themis.business.cache.ExamRecordCacheUtil;
+import com.qmth.themis.business.constant.SpringContextHolder;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.entity.TBSession;
+import com.qmth.themis.business.entity.TMRocketMessage;
+import com.qmth.themis.business.entity.TOeExamBreakHistory;
+import com.qmth.themis.business.entity.TOeExamRecord;
+import com.qmth.themis.business.enums.*;
+import com.qmth.themis.business.service.*;
+import com.qmth.themis.business.templete.TaskExportTemplete;
+import com.qmth.themis.business.templete.TaskImportTemplete;
+import com.qmth.themis.business.templete.impl.TaskExamPaperImportTemplete;
+import com.qmth.themis.business.templete.impl.TaskExamStudentImportTemplete;
+import com.qmth.themis.business.templete.impl.TaskRoomCodeExportTemplete;
+import com.qmth.themis.business.templete.impl.TaskRoomCodeImportTemplete;
+import com.qmth.themis.business.threadPool.MyThreadPool;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.service.MqLogicService;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.Resource;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * @Description: mq执行逻辑 impl
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/31
+ */
+@Service
+public class MqLogicServiceImpl implements MqLogicService {
+
+    @Resource
+    RedisUtil redisUtil;
+
+    @Resource
+    TMRocketMessageService tmRocketMessageService;
+
+    @Resource
+    TEUserLogService teUserLogService;
+
+    @Resource
+    TEExamStudentLogService teExamStudentLogService;
+
+    @Resource
+    TBSessionService tbSessionService;
+
+    @Resource
+    MyThreadPool myThreadPool;
+
+    @Resource
+    TOeExamRecordService tOeExamRecordService;
+
+    @Resource
+    TOeExamBreakHistoryService tOeExamBreakHistoryService;
+
+    @Resource
+    TOeExamRecordService examRecordService;
+
+    /**
+     * mq最大重试次数逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    @Override
+    @Transactional
+    public void execMqMaxReconsumeTime(MqDto mqDto, String key) {
+        //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
+        Gson gson = new Gson();
+        mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
+        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+        redisUtil.delete(key, mqDto.getId());
+    }
+
+    /**
+     * 用户轨迹逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    @Override
+    @Transactional
+    public void execMqUserLogLogic(MqDto mqDto, String key) {
+        Gson gson = new Gson();
+        String tag = mqDto.getTag();
+        if (tag.contains("user")) {
+            teUserLogService.saveUserLogInfo(mqDto.getTimestamp(), mqDto.getObjId(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
+        } else if (tag.contains("student")) {
+            teExamStudentLogService.saveStudentLogInfo(mqDto.getTimestamp(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
+        }
+        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+        redisUtil.delete(key, mqDto.getId());
+    }
+
+    /**
+     * session逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    @Override
+    @Transactional
+    public void execMqSessionLogic(MqDto mqDto, String key) {
+        Gson gson = new Gson();
+        tbSessionService.saveSessionInfo(JacksonUtil.readJson(JacksonUtil.parseJson(mqDto.getBody()), TBSession.class), mqDto.getTimestamp());
+        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
+        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+        redisUtil.delete(key, mqDto.getId());
+    }
+
+    /**
+     * task逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    @Override
+    @Transactional
+    public void execMqTaskLogic(MqDto mqDto, String key) {
+        Gson gson = new Gson();
+        Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
+        String tag = mqDto.getTag();
+        myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
+            if (tag.contains("Import")) {
+                TaskImportTemplete taskImportTemplete = null;
+                if (tag.contains("examStudentImport")) {
+                    taskImportTemplete = SpringContextHolder.getBean(TaskExamStudentImportTemplete.class);
+                } else if (tag.contains("roomCodeImport")) {
+                    taskImportTemplete = SpringContextHolder.getBean(TaskRoomCodeImportTemplete.class);
+                } else if (tag.contains("examPaperImport")) {
+                    taskImportTemplete = SpringContextHolder.getBean(TaskExamPaperImportTemplete.class);
+                }
+                try {
+                    taskImportTemplete.importTask(map);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            } else {
+                TaskExportTemplete taskExportTemplete = null;
+                if (tag.contains("roomCodeExport")) {
+                    taskExportTemplete = SpringContextHolder.getBean(TaskRoomCodeExportTemplete.class);
+                }
+                try {
+                    taskExportTemplete.exportTask(map);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+        mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
+        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+        redisUtil.delete(key, mqDto.getId());
+    }
+
+    /**
+     * websocket超时退出逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    @Override
+    @Transactional
+    public void execMqWebsocketUnNormalLogic(MqDto mqDto, String key) {
+        Gson gson = new Gson();
+        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);//表示成功处理
+        Map<String, Object> tranMap = mqDto.getProperties();
+        Long recordId = Long.parseLong(String.valueOf(tranMap.get("recordId")));
+        Date clientLastSyncTime = ExamRecordCacheUtil.getClientLastSyncTime(recordId);
+        if ((System.currentTimeMillis() - clientLastSyncTime.getTime()) / 1000 / 60 >= 2) {//大于等于当前时间,说明未重连或重登录
+            String deviceId = String.valueOf(tranMap.get("deviceId"));
+            String ip = String.valueOf(tranMap.get("ip"));
+            Long updateTime = Long.parseLong(String.valueOf(tranMap.get("updateTime")));
+            Date date = new Date();
+            date.setTime(updateTime);
+            TOeExamRecord tOeExamRecord = tOeExamRecordService.getById(recordId);
+            Integer breakCount = tOeExamRecord.getLeftBreakResumeCount();
+            if (Objects.isNull(breakCount) || breakCount <= 0) {
+                //todo 没有断点次数,则强制交卷
+                tOeExamRecord.setStatus(ExamRecordStatusEnum.finished);
+                tOeExamRecordService.updateById(tOeExamRecord);
+
+                //加入踢下线mq
+            } else {
+                breakCount--;
+                //增加断点记录
+                TOeExamBreakHistory tOeExamBreakHistory = new TOeExamBreakHistory(recordId, new Date(), BreakReasonEnum.NET_TIME_OUT, BreakReasonEnum.NET_TIME_OUT.name());
+                tOeExamBreakHistoryService.save(tOeExamBreakHistory);
+                //更新考试记录状态
+                tOeExamRecord.setClientCurrentIp(ip);
+                tOeExamRecord.setClientWebsocketId(deviceId);
+                tOeExamRecord.setClientWebsocketStatus(WebsocketStatusEnum.UN_ONLINE.ordinal());
+                tOeExamRecord.setClientLastSyncTime(date);
+                tOeExamRecord.setLastBreakId(tOeExamBreakHistory.getId());
+                tOeExamRecord.setLastBreakTime(tOeExamBreakHistory.getBreakTime());
+                tOeExamRecord.setLeftBreakResumeCount(breakCount);
+                tOeExamRecord.setStatus(ExamRecordStatusEnum.break_off);
+                tOeExamRecordService.updateById(tOeExamRecord);
+            }
+        }
+        teExamStudentLogService.saveStudentLogInfo(mqDto.getTimestamp(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
+        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessage.setProp(JacksonUtil.parseJson(tmRocketMessage.getProperties()));
+        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+        redisUtil.delete(key, mqDto.getId());
+    }
+
+    /**
+     * 计算客观分逻辑
+     *
+     * @param mqDto
+     * @param key
+     */
+    @Override
+    @Transactional
+    public void execMqCalculateObjectiveScoreLogic(MqDto mqDto, String key) {
+        Gson gson = new Gson();
+        Map<String, Object> param = (Map<String, Object>) mqDto.getBody();
+        examRecordService.calculateObjectiveScore(param);
+        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
+        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+        redisUtil.delete(key, mqDto.getId());
+    }
+}

+ 0 - 22
themis-mq/src/main/java/com/qmth/themis/mq/templete/ExecMqLogic.java

@@ -1,22 +0,0 @@
-package com.qmth.themis.mq.templete;
-
-/**
- * @Description: mq 执行逻辑
- * @Param:
- * @return:
- * @Author: wangliang
- * @Date: 2020/7/28
- */
-public interface ExecMqLogic {
-
-    /**
-     * 最大次数
-     * @param key
-     */
-    public void execMqMaxReconsumeTime(String key);
-
-    /**
-     * 处理逻辑
-     */
-    public void execMqLogic();
-}

+ 5 - 25
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/CalculateObjectiveScoreConcurrentlyImpl.java

@@ -3,14 +3,13 @@ package com.qmth.themis.mq.templete.impl;
 import com.google.gson.Gson;
 import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.entity.TMRocketMessage;
 import com.qmth.themis.business.service.TMRocketMessageService;
 import com.qmth.themis.business.service.TOeExamRecordService;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.contanst.Constants;
-import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.service.MqLogicService;
 import com.qmth.themis.mq.templete.Concurrently;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -18,10 +17,8 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -36,17 +33,14 @@ public class CalculateObjectiveScoreConcurrentlyImpl implements Concurrently {
     private final static Logger log = LoggerFactory.getLogger(CalculateObjectiveScoreConcurrentlyImpl.class);
 
     @Override
-    @Transactional
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                     ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
-        TOeExamRecordService examRecordService = SpringContextHolder.getBean(TOeExamRecordService.class);
-        TMRocketMessageService tmRocketMessageService = SpringContextHolder.getBean(TMRocketMessageService.class);
+        MqLogicService mqLogicService = SpringContextHolder.getBean(MqLogicService.class);
         MqDto mqDto = null;
         try {
             long threadId = Thread.currentThread().getId();
             String threadName = Thread.currentThread().getName();
-            Gson gson = new Gson();
             for (MessageExt messageExt : msgs) {
                 log.debug(":{}-:{} CalculateObjectiveScore 重试次数:{}", threadId, threadName,
                         messageExt.getReconsumeTimes());
@@ -55,24 +49,14 @@ public class CalculateObjectiveScoreConcurrentlyImpl implements Concurrently {
                         JacksonUtil.parseJson(mqDto));
                 int reconsumeTime = messageExt.getReconsumeTimes();
                 if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
-                    //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
-                    mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
-                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                    redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
+                    mqLogicService.execMqWebsocketUnNormalLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
                             && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId()))
                             && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(),
                             SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                         log.debug(":{}-:{} 更新db", threadId, threadName);
-                        Map<String, Object> param = (Map<String, Object>) mqDto.getBody();
-                        examRecordService.calculateObjectiveScore(param);
-                        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
-                        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
-                        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                        redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
+                        mqLogicService.execMqCalculateObjectiveScoreLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                     } else {
                         log.debug(":{}-:{} 消息ack未确认,重发", threadId, threadName);
@@ -82,11 +66,7 @@ public class CalculateObjectiveScoreConcurrentlyImpl implements Concurrently {
             }
         } catch (Exception e) {
             e.printStackTrace();
-            if (e instanceof BusinessException) {
-                throw new BusinessException(e.getMessage());
-            } else {
-                throw new RuntimeException(e);
-            }
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
         } finally {
             if (Objects.nonNull(mqDto)) {
                 redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());

+ 5 - 26
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/SessionConcurrentlyImpl.java

@@ -3,8 +3,6 @@ package com.qmth.themis.mq.templete.impl;
 import com.google.gson.Gson;
 import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.entity.TBSession;
-import com.qmth.themis.business.entity.TMRocketMessage;
 import com.qmth.themis.business.service.TBSessionService;
 import com.qmth.themis.business.service.TMRocketMessageService;
 import com.qmth.themis.business.util.JacksonUtil;
@@ -12,18 +10,15 @@ import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.contanst.Constants;
 import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.service.MqLogicService;
 import com.qmth.themis.mq.templete.Concurrently;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
-import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
 
-import javax.annotation.Resource;
 import java.util.List;
 import java.util.Objects;
 
@@ -39,36 +34,24 @@ public class SessionConcurrentlyImpl implements Concurrently {
     private final static Logger log = LoggerFactory.getLogger(SessionConcurrentlyImpl.class);
 
     @Override
-    @Transactional
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
-        TBSessionService tbSessionService = SpringContextHolder.getBean(TBSessionService.class);
-        TMRocketMessageService tmRocketMessageService = SpringContextHolder.getBean(TMRocketMessageService.class);
+        MqLogicService mqLogicService = SpringContextHolder.getBean(MqLogicService.class);
         MqDto mqDto = null;
         try {
             long threadId = Thread.currentThread().getId();
             String threadName = Thread.currentThread().getName();
-            Gson gson = new Gson();
             for (MessageExt messageExt : msgs) {
                 log.info(":{}-:{} sessionConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
                 mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                 log.info(":{}-:{} sessionConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
                 int reconsumeTime = messageExt.getReconsumeTimes();
                 if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
-                    //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
-                    mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
-                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                    redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
+                    mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                         log.info(":{}-:{} 更新db", threadId, threadName);
-                        tbSessionService.saveSessionInfo(JacksonUtil.readJson(JacksonUtil.parseJson(mqDto.getBody()), TBSession.class), mqDto.getTimestamp());
-                        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
-                        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
-                        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                        redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
+                        mqLogicService.execMqSessionLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                     } else {
                         log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
@@ -78,11 +61,7 @@ public class SessionConcurrentlyImpl implements Concurrently {
             }
         } catch (Exception e) {
             e.printStackTrace();
-            if (e instanceof BusinessException) {
-                throw new BusinessException(e.getMessage());
-            } else {
-                throw new RuntimeException(e);
-            }
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
         } finally {
             if (Objects.nonNull(mqDto)) {
                 redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());

+ 5 - 67
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/TaskConcurrentlyImpl.java

@@ -1,26 +1,12 @@
 package com.qmth.themis.mq.templete.impl;
 
-import com.google.gson.Gson;
 import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.entity.TMRocketMessage;
-import com.qmth.themis.business.enums.MqEnum;
-import com.qmth.themis.business.enums.SystemOperationEnum;
-import com.qmth.themis.business.service.TBSessionService;
-import com.qmth.themis.business.service.TEExamStudentLogService;
-import com.qmth.themis.business.service.TEUserLogService;
-import com.qmth.themis.business.service.TMRocketMessageService;
-import com.qmth.themis.business.templete.TaskExportTemplete;
-import com.qmth.themis.business.templete.TaskImportTemplete;
-import com.qmth.themis.business.templete.impl.TaskExamStudentImportTemplete;
-import com.qmth.themis.business.templete.impl.TaskRoomCodeExportTemplete;
-import com.qmth.themis.business.templete.impl.TaskRoomCodeImportTemplete;
-import com.qmth.themis.business.threadPool.MyThreadPool;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.contanst.Constants;
-import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.service.MqLogicService;
 import com.qmth.themis.mq.templete.Concurrently;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -28,12 +14,8 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
 
-import javax.annotation.Resource;
-import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -48,63 +30,23 @@ public class TaskConcurrentlyImpl implements Concurrently {
     private final static Logger log = LoggerFactory.getLogger(TaskConcurrentlyImpl.class);
 
     @Override
-    @Transactional
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
-        MyThreadPool myThreadPool = SpringContextHolder.getBean(MyThreadPool.class);
-        TMRocketMessageService tmRocketMessageService = SpringContextHolder.getBean(TMRocketMessageService.class);
+        MqLogicService mqLogicService = SpringContextHolder.getBean(MqLogicService.class);
         MqDto mqDto = null;
         try {
             long threadId = Thread.currentThread().getId();
             String threadName = Thread.currentThread().getName();
-            Gson gson = new Gson();
             for (MessageExt messageExt : msgs) {
                 log.info(":{}-:{} task Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
                 mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                 log.info(":{}-:{} task Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
                 int reconsumeTime = messageExt.getReconsumeTimes();
                 if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
-                    //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
-                    mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
-                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                    redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
+                    mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                        Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
-                        String tag = mqDto.getTag();
-                        myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
-                            if (tag.contains("Import")) {
-                                TaskImportTemplete taskImportTemplete = null;
-                                if (tag.contains("examStudentImport")) {
-                                    taskImportTemplete = new TaskExamStudentImportTemplete();
-                                } else if (tag.contains("roomCodeImport")) {
-                                    taskImportTemplete = new TaskRoomCodeImportTemplete();
-                                } else if (tag.contains("examPaperImport")) {
-                                    taskImportTemplete = SpringContextHolder.getBean("taskExamPaperImportTemplete");
-                                }
-                                try {
-                                    taskImportTemplete.importTask(map);
-                                } catch (IOException e) {
-                                    e.printStackTrace();
-                                }
-                            } else {
-                                TaskExportTemplete taskExportTemplete = null;
-                                if (tag.contains("roomCodeExport")) {
-                                    taskExportTemplete = new TaskRoomCodeExportTemplete();
-                                }
-                                try {
-                                    taskExportTemplete.exportTask(map);
-                                } catch (IOException e) {
-                                    e.printStackTrace();
-                                }
-                            }
-                        });
-                        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
-                        mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
-                        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                        redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
+                        mqLogicService.execMqTaskLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                     } else {
                         log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
@@ -114,11 +56,7 @@ public class TaskConcurrentlyImpl implements Concurrently {
             }
         } catch (Exception e) {
             e.printStackTrace();
-            if (e instanceof BusinessException) {
-                throw new BusinessException(e.getMessage());
-            } else {
-                throw new RuntimeException(e);
-            }
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
         } finally {
             if (Objects.nonNull(mqDto)) {
                 redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());

+ 7 - 52
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/UserLogConcurrentlyImpl.java

@@ -1,28 +1,19 @@
 package com.qmth.themis.mq.templete.impl;
 
-import com.google.gson.Gson;
 import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.entity.TMRocketMessage;
-import com.qmth.themis.business.enums.MqEnum;
-import com.qmth.themis.business.enums.SystemOperationEnum;
-import com.qmth.themis.business.service.TEExamStudentLogService;
-import com.qmth.themis.business.service.TEUserLogService;
-import com.qmth.themis.business.service.TMRocketMessageService;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.contanst.Constants;
-import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.service.MqLogicService;
 import com.qmth.themis.mq.templete.Concurrently;
-import com.qmth.themis.mq.templete.ExecMqLogic;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
 
 import java.util.List;
 import java.util.Objects;
@@ -37,34 +28,26 @@ import java.util.Objects;
 @Service
 public class UserLogConcurrentlyImpl implements Concurrently {
     private final static Logger log = LoggerFactory.getLogger(UserLogConcurrentlyImpl.class);
-    private RedisUtil redisUtil = null;
-    private TEUserLogService teUserLogService = null;
-    private TEExamStudentLogService teExamStudentLogService = null;
-    private TMRocketMessageService tmRocketMessageService = null;
-    private Gson gson = null;
-    private MqDto mqDto = null;
 
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
-        redisUtil = SpringContextHolder.getBean(RedisUtil.class);
-        teUserLogService = SpringContextHolder.getBean(TEUserLogService.class);
-        teExamStudentLogService = SpringContextHolder.getBean(TEExamStudentLogService.class);
-        tmRocketMessageService = SpringContextHolder.getBean(TMRocketMessageService.class);
+        RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
+        MqLogicService mqLogicService = SpringContextHolder.getBean(MqLogicService.class);
+        MqDto mqDto = null;
         try {
             long threadId = Thread.currentThread().getId();
             String threadName = Thread.currentThread().getName();
-            gson = new Gson();
             for (MessageExt messageExt : msgs) {
                 mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                 log.info(":{}-:{} logConsumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
                 log.info(":{}-:{} logConsumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
                 int reconsumeTime = messageExt.getReconsumeTimes();
                 if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
-                    this.execMqMaxReconsumeTime(SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                    mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                         log.info(":{}-:{} 插入用户轨迹日志", threadId, threadName);
-                        this.execMqLogic();
+                        mqLogicService.execMqUserLogLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                     } else {
                         log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
@@ -74,11 +57,7 @@ public class UserLogConcurrentlyImpl implements Concurrently {
             }
         } catch (Exception e) {
             e.printStackTrace();
-            if (e instanceof BusinessException) {
-                throw new BusinessException(e.getMessage());
-            } else {
-                throw new RuntimeException(e);
-            }
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
         } finally {
             if (Objects.nonNull(mqDto)) {
                 redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
@@ -86,28 +65,4 @@ public class UserLogConcurrentlyImpl implements Concurrently {
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
     }
-
-    @Transactional
-    public void execMqMaxReconsumeTime(String key) {
-        //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
-        mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
-        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-        redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
-    }
-
-    @Transactional
-    public void execMqLogic() {
-        String tag = mqDto.getTag();
-        if (tag.contains("user")) {
-            teUserLogService.saveUserLogInfo(mqDto.getTimestamp(), mqDto.getObjId(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
-        } else if (tag.contains("student")) {
-            teExamStudentLogService.saveStudentLogInfo(mqDto.getTimestamp(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
-        }
-        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
-        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-        redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
-        int i = 1/0;
-    }
 }

+ 8 - 71
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/WebsocketUnNormalConcurrentlyImpl.java

@@ -1,23 +1,12 @@
 package com.qmth.themis.mq.templete.impl;
 
 import com.alibaba.fastjson.JSONObject;
-import com.google.gson.Gson;
-import com.qmth.themis.business.cache.ExamRecordCacheUtil;
 import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.entity.TMRocketMessage;
-import com.qmth.themis.business.entity.TOeExamBreakHistory;
-import com.qmth.themis.business.entity.TOeExamRecord;
-import com.qmth.themis.business.enums.*;
-import com.qmth.themis.business.service.TEExamStudentLogService;
-import com.qmth.themis.business.service.TMRocketMessageService;
-import com.qmth.themis.business.service.TOeExamBreakHistoryService;
-import com.qmth.themis.business.service.TOeExamRecordService;
-import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.contanst.Constants;
-import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.service.MqLogicService;
 import com.qmth.themis.mq.templete.Concurrently;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -27,7 +16,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -44,18 +32,13 @@ public class WebsocketUnNormalConcurrentlyImpl implements Concurrently {
     private final static Logger log = LoggerFactory.getLogger(WebsocketUnNormalConcurrentlyImpl.class);
 
     @Override
-    @Transactional
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
-        TEExamStudentLogService teExamStudentLogService = SpringContextHolder.getBean(TEExamStudentLogService.class);
-        TMRocketMessageService tmRocketMessageService = SpringContextHolder.getBean(TMRocketMessageService.class);
-        TOeExamRecordService tOeExamRecordService = SpringContextHolder.getBean(TOeExamRecordService.class);
-        TOeExamBreakHistoryService tOeExamBreakHistoryService = SpringContextHolder.getBean(TOeExamBreakHistoryService.class);
+        MqLogicService mqLogicService = SpringContextHolder.getBean(MqLogicService.class);
         MqDto mqDto = null;
         try {
             long threadId = Thread.currentThread().getId();
             String threadName = Thread.currentThread().getName();
-            Gson gson = new Gson();
             for (MessageExt messageExt : msgs) {
                 log.info(":{}-:{} websocket un normal logConsumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
                 String body = new String(messageExt.getBody(), Constants.CHARSET_NAME);
@@ -66,66 +49,20 @@ public class WebsocketUnNormalConcurrentlyImpl implements Concurrently {
                 int reconsumeTime = messageExt.getReconsumeTimes();
                 if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
                     //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
-                    mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
-                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                    tmRocketMessage.setProp(JacksonUtil.parseJson(tmRocketMessage.getProperties()));
-                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                    redisUtil.delete(SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST, mqDto.getId());
+                    mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST);
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);//表示成功处理
-                        Map<String, Object> tranMap = mqDto.getProperties();
-                        Long recordId = Long.parseLong(String.valueOf(tranMap.get("recordId")));
-                        Date clientLastSyncTime = ExamRecordCacheUtil.getClientLastSyncTime(recordId);
-                        if ((System.currentTimeMillis() - clientLastSyncTime.getTime()) / 1000 / 60 >= 2) {//大于等于当前时间,说明未重连或重登录
-                            String deviceId = String.valueOf(tranMap.get("deviceId"));
-                            String ip = String.valueOf(tranMap.get("ip"));
-                            Long updateTime = Long.parseLong(String.valueOf(tranMap.get("updateTime")));
-                            Date date = new Date();
-                            date.setTime(updateTime);
-                            TOeExamRecord tOeExamRecord = tOeExamRecordService.getById(recordId);
-                            Integer breakCount = tOeExamRecord.getLeftBreakResumeCount();
-                            if (Objects.isNull(breakCount) || breakCount <= 0) {
-                                //todo 没有断点次数,则强制交卷
-                                tOeExamRecord.setStatus(ExamRecordStatusEnum.finished);
-                                tOeExamRecordService.updateById(tOeExamRecord);
-
-                                //加入踢下线mq
-                            } else {
-                                breakCount--;
-                                //增加断点记录
-                                TOeExamBreakHistory tOeExamBreakHistory = new TOeExamBreakHistory(recordId, new Date(), BreakReasonEnum.NET_TIME_OUT, BreakReasonEnum.NET_TIME_OUT.name());
-                                tOeExamBreakHistoryService.save(tOeExamBreakHistory);
-                                //更新考试记录状态
-                                tOeExamRecord.setClientCurrentIp(ip);
-                                tOeExamRecord.setClientWebsocketId(deviceId);
-                                tOeExamRecord.setClientWebsocketStatus(WebsocketStatusEnum.UN_ONLINE.ordinal());
-                                tOeExamRecord.setClientLastSyncTime(date);
-                                tOeExamRecord.setLastBreakId(tOeExamBreakHistory.getId());
-                                tOeExamRecord.setLastBreakTime(tOeExamBreakHistory.getBreakTime());
-                                tOeExamRecord.setLeftBreakResumeCount(breakCount);
-                                tOeExamRecord.setStatus(ExamRecordStatusEnum.break_off);
-                                tOeExamRecordService.updateById(tOeExamRecord);
-                            }
-                        }
+                        log.info(":{}-:{} 插入学生轨迹日志", threadId, threadName);
+                        mqLogicService.execMqWebsocketUnNormalLogic(mqDto, SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST);
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                     } else {
-                        mqDto.setAck(SystemConstant.INDIVIDUAL_ACK_TYPE);//表示考生已重新登录
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
                     }
-                    log.info(":{}-:{} 插入用户轨迹日志", threadId, threadName);
-                    teExamStudentLogService.saveStudentLogInfo(mqDto.getTimestamp(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
-                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                    tmRocketMessage.setProp(JacksonUtil.parseJson(tmRocketMessage.getProperties()));
-                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                    redisUtil.delete(SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST, mqDto.getId());
                 }
             }
         } catch (Exception e) {
             e.printStackTrace();
-            if (e instanceof BusinessException) {
-                throw new BusinessException(e.getMessage());
-            } else {
-                throw new RuntimeException(e);
-            }
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
         } finally {
             if (Objects.nonNull(mqDto)) {
                 redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());