wangliang 4 роки тому
батько
коміт
5234593b32

+ 1 - 0
themis-business/src/main/java/com/qmth/themis/business/constant/SystemConstant.java

@@ -41,6 +41,7 @@ public class SystemConstant {
     public static final String STUDENT = "student:";
     public static final String SESSION = "session:";
     public static final String ORG = "org:cache:";
+    public static final String EXAM_BREAK = "exam:break:";
     public static final String LINK = "LINK";
     public static final String MENU = "MENU";
     public static final String ALL_PATH = "/**";

+ 2 - 3
themis-business/src/main/java/com/qmth/themis/business/enums/MqEnum.java

@@ -1,7 +1,5 @@
 package com.qmth.themis.business.enums;
 
-import java.util.Objects;
-
 /**
  * @Description: mq enum
  * @Param:
@@ -20,7 +18,8 @@ public enum MqEnum {
     TASK_LOG(6, "任务"),
     QUARTZ_LOG(7, "quartz任务"),
     EXAM(8, "考生端消息"),
-    WEBSOCKET_OFFLINE_LOG(9, "websocket强行离线(交卷)"),
+    EXAM_BREAK(9, "考生断点记录"),
+//    WEBSOCKET_OFFLINE_LOG(9, "websocket强行离线(交卷)"),
     WEBSOCKET_IM_CLUSTERING_LOG(10, "websocket点对点发送消息"),
     WEBSOCKET_IM_BROADCASTING_LOG(11, "websocket广播发送消息"),
     WEBSOCKET_MONITOR_LOG(12, "监考强制离线(交卷)"),

+ 0 - 16
themis-business/src/main/java/com/qmth/themis/business/service/impl/TOeExamRecordServiceImpl.java

@@ -72,22 +72,6 @@ public class TOeExamRecordServiceImpl extends ServiceImpl<TOeExamRecordMapper, T
     public Map getUnFinishExam(Long studentId, Long examId, Long orgId) {
         TEExamUnFinishDto teExamUnFinishDto = tOeExamRecordMapper.getUnFinishExam(studentId, examId, orgId);
         if (Objects.nonNull(teExamUnFinishDto)) {
-//            //获取最近同步时间
-//            Date clientLastSyncTime = ExamRecordCacheUtil.getClientLastSyncTime(teExamUnFinishDto.getRecordId());
-//            //获取剩余断点次数
-//            Integer leftBreakResumeCount = ExamRecordCacheUtil.getLeftBreakResumeCount(teExamUnFinishDto.getRecordId());
-//            //如果断点时间大于整体断点时间,则强制交卷
-//            if ((System.currentTimeMillis() - clientLastSyncTime.getTime() / 1000) > teExamUnFinishDto.getBreakExpireSeconds()) {
-//                teExamService.finish(teExamUnFinishDto.getExamStudentId(), teExamUnFinishDto.getRecordId(), FinishTypeEnum.AUTO.name(), (int) (((System.currentTimeMillis() - teExamUnFinishDto.getClientLastSyncTime().getTime()) / 1000) + teExamUnFinishDto.getDurationSeconds()));
-//            } else {//否则断点次数加1
-//                leftBreakResumeCount++;
-//                //如果断点次数超过了考试整体断点次数,也强制交卷
-//                if (leftBreakResumeCount > teExamUnFinishDto.getBreakResumeCount()) {
-//                    teExamService.finish(teExamUnFinishDto.getExamStudentId(), teExamUnFinishDto.getRecordId(), FinishTypeEnum.AUTO.name(), (int) (((System.currentTimeMillis() - teExamUnFinishDto.getClientLastSyncTime().getTime()) / 1000) + teExamUnFinishDto.getDurationSeconds()));
-//                } else {
-//                    //发送mq,更新考试记录和加入断点次数记录
-//                }
-//            }
             Map finalMap = new HashMap();
             Map<String, Object> waitingMap = new HashMap();
             waitingMap.put("id", teExamUnFinishDto.getId());

+ 35 - 8
themis-exam/src/main/java/com/qmth/themis/exam/api/TEStudentController.java

@@ -16,9 +16,7 @@ import com.qmth.themis.business.dto.response.TEExamUnFinishDto;
 import com.qmth.themis.business.entity.TBSession;
 import com.qmth.themis.business.entity.TEConfig;
 import com.qmth.themis.business.entity.TEStudent;
-import com.qmth.themis.business.enums.MqEnum;
-import com.qmth.themis.business.enums.RoleEnum;
-import com.qmth.themis.business.enums.SystemOperationEnum;
+import com.qmth.themis.business.enums.*;
 import com.qmth.themis.business.service.*;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
@@ -34,6 +32,7 @@ import com.qmth.themis.common.signature.SignatureType;
 import com.qmth.themis.common.util.AesUtil;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
+import com.qmth.themis.mq.dto.MqDto;
 import com.qmth.themis.mq.enums.MqTagEnum;
 import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.service.MqDtoService;
@@ -179,20 +178,48 @@ public class TEStudentController {
         String test = SignatureInfo.build(SignatureType.TOKEN, sessionId, token);
         Map<String, Object> map = new HashMap<>();
         //获取未完考试
-        if(Objects.isNull(teStudentCacheDto.getUnFinishedRecordId())){
+        if (Objects.isNull(teStudentCacheDto.getUnFinishedRecordId())) {
             //获取待考列表
             List<TEExamDto> list = teExamService.getWaitingExam(teStudent.getId(), examId, orgId);
             if (Objects.nonNull(list) && list.size() > 0) {
                 map.put("waiting", list);
             }
-        }else{
+        } else {
+            Long recordId = teStudentCacheDto.getUnFinishedRecordId();
             //获取考试记录缓存
-            Map<String,Object> objectMap = redisUtil.getHashEntries(RedisKeyHelper.examRecordCacheKey(teStudentCacheDto.getUnFinishedRecordId()));
+            Map<String, Object> objectMap = redisUtil.getHashEntries(RedisKeyHelper.examRecordCacheKey(recordId));
+            Long examIdMap = Long.parseLong(String.valueOf(objectMap.get("examId")));
+            Long examStudentId = Long.parseLong(String.valueOf(objectMap.get("examStudentId")));
+            Integer durationSeconds = Integer.parseInt(String.valueOf(objectMap.get("durationSeconds")));
             //获取考试缓存
-            ExamCacheBean ec = teExamService.getExamCacheBean(Long.parseLong(String.valueOf(objectMap.get("examId"))));
+            ExamCacheBean ec = teExamService.getExamCacheBean(examIdMap);
+
+            //获取最近同步时间
+            Date clientLastSyncTime = ExamRecordCacheUtil.getClientLastSyncTime(recordId);
+            //获取剩余断点次数
+            Integer leftBreakResumeCount = ExamRecordCacheUtil.getLeftBreakResumeCount(recordId);
+            //如果断点时间大于整体断点时间,则强制交卷
+            if ((System.currentTimeMillis() - clientLastSyncTime.getTime() / 1000) > ec.getBreakExpireSeconds()) {
+                teExamService.finish(examStudentId, recordId, FinishTypeEnum.AUTO.name(), durationSeconds);
+            } else {//否则断点次数减1
+                leftBreakResumeCount--;
+                //如果断点次数超过了考试整体断点次数,也强制交卷
+                if (leftBreakResumeCount > ec.getBreakResumeCount()) {
+                    teExamService.finish(examStudentId, recordId, FinishTypeEnum.AUTO.name(), durationSeconds);
+                } else {
+                    //更新考试记录
+                    objectMap.put("status", ExamRecordStatusEnum.RESUME_PREPARE.name());
+                    objectMap.put("lastBreakTime", new Date());
+                    objectMap.put("leftBreakResumeCount", leftBreakResumeCount);
+
+                    //发送mq,增加断点次数记录
+                    MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.examBreakHistory.name(), JacksonUtil.parseJson(objectMap), MqEnum.EXAM_BREAK, String.valueOf(recordId), "增加断点记录");
+                    mqDtoService.assembleSendOneWayMsg(mqDto);
+                }
+            }
             Map finalMap = new HashMap();
             finalMap.put("waiting", ec);
-            finalMap.put("activity",objectMap);
+            finalMap.put("activity", objectMap);
             map.put("unFinished", finalMap);
         }
 //        Map unFinishExam = tOeExamRecordService.getUnFinishExam(teStudent.getId(), examId, orgId);

+ 7 - 10
themis-exam/src/main/java/com/qmth/themis/exam/start/StartRunning.java

@@ -4,6 +4,7 @@ import javax.annotation.Resource;
 
 import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.exam.websocket.WebSocketOeServer;
+import com.qmth.themis.mq.templete.impl.*;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -16,12 +17,6 @@ import com.qmth.themis.mq.enums.MqGroupEnum;
 import com.qmth.themis.mq.enums.MqTagEnum;
 import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.listener.RocketMessageConsumer;
-import com.qmth.themis.mq.templete.impl.CalculateObjectiveScoreConcurrentlyImpl;
-import com.qmth.themis.mq.templete.impl.FaceVerifyConcurrentlyImpl;
-import com.qmth.themis.mq.templete.impl.SessionConcurrentlyImpl;
-import com.qmth.themis.mq.templete.impl.TaskConcurrentlyImpl;
-import com.qmth.themis.mq.templete.impl.UserLogConcurrentlyImpl;
-import com.qmth.themis.mq.templete.impl.WebsocketUnNormalConcurrentlyImpl;
 
 /**
  * @Description: 服务启动时初始化运行,哪个微服务模块需要则拿此模版去用
@@ -91,12 +86,14 @@ public class StartRunning implements CommandLineRunner {
         rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.faceVerifySaveGroup.getCode(), MqTopicEnum.themisTopic.getCode(), MqTagEnum.faceVerifySave.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(FaceVerifyConcurrentlyImpl.class));
         
         //活体验证保存
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.livenessVerifySaveGroup.getCode(), MqTopicEnum.themisTopic.getCode(), MqTagEnum.livenessVerifySave.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(FaceVerifyConcurrentlyImpl.class));
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.livenessVerifySaveGroup.getCode(), MqTopicEnum.themisTopic.getCode(), MqTagEnum.livenessVerifySave.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(LivenessVerifyConcurrentlyImpl.class));
         
         //考试记录数据持久化
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.examRecordPersistedGroup.getCode(), MqTopicEnum.themisTopic.getCode(), MqTagEnum.examRecordPersisted.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(FaceVerifyConcurrentlyImpl.class));
-        
-        
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.examRecordPersistedGroup.getCode(), MqTopicEnum.themisTopic.getCode(), MqTagEnum.examRecordPersisted.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamRecordPersistedConcurrentlyImpl.class));
+
+        //考试断点记录
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.examBreakHistoryGroup.getCode(), MqTopicEnum.themisTopic.getCode(), MqTagEnum.examBreakHistory.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamBreakHistoryConcurrentlyImpl.class));
+
         SystemConstant.initTempFiles();
         log.info("服务器启动时执行 end");
     }

+ 7 - 2
themis-mq/src/main/java/com/qmth/themis/mq/enums/MqGroupEnum.java

@@ -97,11 +97,16 @@ public enum MqGroupEnum {
      * 活体验证
      */
     livenessVerifySaveGroup("themis-group-exam-livenessVerifySave"),
-    
+
     /**
      * 考试记录数据持久化
      */
-    examRecordPersistedGroup("themis-group-exam-examRecordPersisted");
+    examRecordPersistedGroup("themis-group-exam-examRecordPersisted"),
+
+    /**
+     * 考试断点记录
+     */
+    examBreakHistoryGroup("themis-group-exam-examBreakHistory");
 
     private MqGroupEnum(String code) {
         this.code = code;

+ 2 - 1
themis-mq/src/main/java/com/qmth/themis/mq/enums/MqTagEnum.java

@@ -28,7 +28,8 @@ public enum MqTagEnum {
     calculateObjectiveScore("计算客观分标签"),
     faceVerifySave("人脸验证保存"),
     livenessVerifySave("活体验证保存"),
-    examRecordPersisted("考试记录数据持久化");
+    examRecordPersisted("考试记录数据持久化"),
+    examBreakHistory("考试断点记录");
 
     private MqTagEnum(String code) {
         this.code = code;

+ 30 - 15
themis-mq/src/main/java/com/qmth/themis/mq/service/MqLogicService.java

@@ -59,20 +59,35 @@ public interface MqLogicService {
      */
     public void execMqCalculateObjectiveScoreLogic(MqDto mqDto, String key);
 
-	/**人脸验证保存
-	 * @param mqDto
-	 * @param key
-	 */
-	public void execMqFaceVerifySaveLogic(MqDto mqDto, String key);
-	
-	/**活体验证保存
-	 * @param mqDto
-	 * @param key
-	 */
-	public void execMqLivenessVerifySaveLogic(MqDto mqDto, String key);
+    /**
+     * 人脸验证保存
+     *
+     * @param mqDto
+     * @param key
+     */
+    public void execMqFaceVerifySaveLogic(MqDto mqDto, String key);
+
+    /**
+     * 活体验证保存
+     *
+     * @param mqDto
+     * @param key
+     */
+    public void execMqLivenessVerifySaveLogic(MqDto mqDto, String key);
 
-	/**
-	 *考试记录数据持久化
-	 */
-	void execMqExamRecordPersistedLogic(MqDto mqDto, String key);
+    /**
+     * 考试记录数据持久化
+     *
+     * @param mqDto
+     * @param key
+     */
+    public void execMqExamRecordPersistedLogic(MqDto mqDto, String key);
+
+    /**
+     * 增加考试断点记录
+     *
+     * @param mqDto
+     * @param key
+     */
+    public void execMqExamBreakHistoryLogic(MqDto mqDto, String key);
 }

+ 112 - 41
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqLogicServiceImpl.java

@@ -1,26 +1,19 @@
 package com.qmth.themis.mq.service.impl;
 
-import java.io.IOException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-import javax.annotation.Resource;
-
-import com.qmth.themis.business.enums.*;
-import com.qmth.themis.business.service.*;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.google.gson.Gson;
 import com.qmth.themis.business.cache.ExamRecordCacheUtil;
+import com.qmth.themis.business.cache.RedisKeyHelper;
 import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.entity.TBSession;
 import com.qmth.themis.business.entity.TMRocketMessage;
 import com.qmth.themis.business.entity.TOeExamBreakHistory;
-import com.qmth.themis.business.entity.TOeExamRecord;
+import com.qmth.themis.business.enums.BreakReasonEnum;
+import com.qmth.themis.business.enums.FinishTypeEnum;
+import com.qmth.themis.business.enums.MqEnum;
+import com.qmth.themis.business.enums.SystemOperationEnum;
+import com.qmth.themis.business.service.*;
 import com.qmth.themis.business.templete.TaskExportTemplete;
 import com.qmth.themis.business.templete.TaskImportTemplete;
 import com.qmth.themis.business.templete.impl.TaskExamPaperImportTemplete;
@@ -32,6 +25,12 @@ import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.mq.dto.MqDto;
 import com.qmth.themis.mq.service.MqLogicService;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.Resource;
+import java.io.IOException;
+import java.util.*;
 
 /**
  * @Description: mq执行逻辑 impl
@@ -75,12 +74,10 @@ public class MqLogicServiceImpl implements MqLogicService {
 
     @Resource
     TOeLivenessVerifyHistoryService livenessVerifyHistoryService;
-
     @Resource
     TEExamService teExamService;
 
-    /**
-     * mq最大重试次数逻辑
+    /*** mq最大重试次数逻辑
      *
      * @param mqDto
      * @param key
@@ -205,28 +202,51 @@ public class MqLogicServiceImpl implements MqLogicService {
             Long updateTime = Long.parseLong(String.valueOf(tranMap.get("updateTime")));
             Date date = new Date();
             date.setTime(updateTime);
-            TOeExamRecord tOeExamRecord = tOeExamRecordService.getById(recordId);
-            Integer breakCount = tOeExamRecord.getLeftBreakResumeCount();
-            if (Objects.isNull(breakCount) || breakCount <= 0) {
-                tOeExamRecord.setStatus(ExamRecordStatusEnum.FINISHED);
-                tOeExamRecordService.updateById(tOeExamRecord);
-                teExamService.finish(tOeExamRecord.getExamStudentId(), tOeExamRecord.getId(), FinishTypeEnum.AUTO.name(), (int) (((System.currentTimeMillis() - tOeExamRecord.getClientLastSyncTime().getTime()) / 1000) + tOeExamRecord.getDurationSeconds()));
-            } else {
-                breakCount--;
-                //增加断点记录
-                TOeExamBreakHistory tOeExamBreakHistory = new TOeExamBreakHistory(recordId, new Date(), BreakReasonEnum.NET_TIME_OUT, BreakReasonEnum.NET_TIME_OUT.name());
-                tOeExamBreakHistoryService.save(tOeExamBreakHistory);
-                //更新考试记录状态
-                tOeExamRecord.setClientCurrentIp(ip);
-                tOeExamRecord.setClientWebsocketId(deviceId);
-                tOeExamRecord.setClientWebsocketStatus(WebsocketStatusEnum.UN_ONLINE.ordinal());
-                tOeExamRecord.setClientLastSyncTime(date);
-                tOeExamRecord.setLastBreakId(tOeExamBreakHistory.getId());
-                tOeExamRecord.setLastBreakTime(tOeExamBreakHistory.getBreakTime());
-                tOeExamRecord.setLeftBreakResumeCount(breakCount);
-                tOeExamRecord.setStatus(ExamRecordStatusEnum.bREAK_OFF);
-                tOeExamRecordService.updateById(tOeExamRecord);
+//            TOeExamRecord tOeExamRecord = tOeExamRecordService.getById(recordId);
+            Map<String, Object> objectMap = redisUtil.getHashEntries(RedisKeyHelper.examRecordCacheKey(recordId));
+//            Integer breakCount = tOeExamRecord.getLeftBreakResumeCount();
+            Integer leftBreakResumeCount = ExamRecordCacheUtil.getLeftBreakResumeCount(recordId);
+            Long examId = Long.parseLong(String.valueOf(objectMap.get("examId")));
+            Long examStudentId = Long.parseLong(String.valueOf(objectMap.get("examStudentId")));
+            Integer durationSeconds = Integer.parseInt(String.valueOf(objectMap.get("durationSeconds")));
+//            获取考试缓存
+//            ExamCacheBean ec = teExamService.getExamCacheBean(examId);
+            if (Objects.isNull(leftBreakResumeCount) || leftBreakResumeCount <= 0) {
+//                tOeExamRecordService.updateById(tOeExamRecord);
+                teExamService.finish(examStudentId, recordId, FinishTypeEnum.AUTO.name(), durationSeconds);
             }
+//            else {//否则断点次数减1
+//                leftBreakResumeCount--;
+//                //如果断点次数超过了考试整体断点次数,也强制交卷
+//                if (leftBreakResumeCount > ec.getBreakResumeCount()) {
+//                    teExamService.finish(examStudentId, recordId, FinishTypeEnum.AUTO.name(), durationSeconds);
+//                } else {
+//                    //先查询之前的断点记录
+//                    QueryWrapper<TOeExamBreakHistory> tOeExamBreakHistoryQueryWrapper = new QueryWrapper<>();
+//                    tOeExamBreakHistoryQueryWrapper.lambda().eq(TOeExamBreakHistory::getExamRecordId, recordId);
+//                    List<TOeExamBreakHistory> tOeExamBreakHistoryList = tOeExamBreakHistoryService.list(tOeExamBreakHistoryQueryWrapper);
+//                    //删除历史断点缓存
+//                    if (Objects.nonNull(tOeExamBreakHistoryList) && tOeExamBreakHistoryList.size() > 0) {
+//                        tOeExamBreakHistoryList.forEach(s -> {
+//                            redisUtil.delete(SystemConstant.EXAM_BREAK + String.valueOf(s.getId()));
+//                        });
+//                    }
+//                    //增加断点记录
+//                    TOeExamBreakHistory tOeExamBreakHistory = new TOeExamBreakHistory(recordId, new Date(), BreakReasonEnum.NET_TIME_OUT, BreakReasonEnum.NET_TIME_OUT.name());
+//                    tOeExamBreakHistoryService.save(tOeExamBreakHistory);
+//                    redisUtil.set(SystemConstant.EXAM_BREAK + String.valueOf(tOeExamBreakHistory.getId()), tOeExamBreakHistory);
+//                    //更新考试记录状态
+////                    tOeExamRecord.setClientCurrentIp(ip);
+////                    tOeExamRecord.setClientWebsocketId(deviceId);
+////                    tOeExamRecord.setClientWebsocketStatus(WebsocketStatusEnum.UN_ONLINE.ordinal());
+////                    tOeExamRecord.setClientLastSyncTime(date);
+////                    tOeExamRecord.setLastBreakId(tOeExamBreakHistory.getId());
+////                    tOeExamRecord.setLastBreakTime(tOeExamBreakHistory.getBreakTime());
+////                    tOeExamRecord.setLeftBreakResumeCount(breakCount);
+////                    tOeExamRecord.setStatus(ExamRecordStatusEnum.bREAK_OFF);
+//                }
+////                tOeExamRecordService.updateById(tOeExamRecord);
+//            }
         }
         teExamStudentLogService.saveStudentLogInfo(mqDto.getTimestamp(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
@@ -259,6 +279,9 @@ public class MqLogicServiceImpl implements MqLogicService {
 
     /**
      * 人脸验证保存
+     *
+     * @param mqDto
+     * @param key
      */
     @Override
     @Transactional
@@ -283,9 +306,13 @@ public class MqLogicServiceImpl implements MqLogicService {
     }
 
     /**
-     *活体验证
+     * 活体验证
+     *
+     * @param mqDto
+     * @param key
      */
     @Override
+    @Transactional
     public void execMqLivenessVerifySaveLogic(MqDto mqDto, String key) {
         Gson gson = new Gson();
         Map<String, Object> param = (Map<String, Object>) mqDto.getBody();
@@ -304,11 +331,15 @@ public class MqLogicServiceImpl implements MqLogicService {
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
         redisUtil.delete(key, mqDto.getId());
     }
-    
+
     /**
-     *考试记录数据持久化
+     * 考试记录数据持久化
+     *
+     * @param mqDto
+     * @param key
      */
     @Override
+    @Transactional
     public void execMqExamRecordPersistedLogic(MqDto mqDto, String key) {
         Gson gson = new Gson();
         Map<String, Object> param = (Map<String, Object>) mqDto.getBody();
@@ -320,4 +351,44 @@ public class MqLogicServiceImpl implements MqLogicService {
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
         redisUtil.delete(key, mqDto.getId());
     }
+
+    /**
+     * 增加考试断点记录
+     *
+     * @param mqDto
+     * @param key
+     */
+    @Override
+    @Transactional
+    public void execMqExamBreakHistoryLogic(MqDto mqDto, String key) {
+        //更新考试记录
+        Map<String, Object> objectMap = JacksonUtil.readJson(String.valueOf(mqDto.getBody()), Map.class);
+        Long recordId = Long.parseLong(mqDto.getObjId());
+
+        //先查询之前的断点记录
+        QueryWrapper<TOeExamBreakHistory> tOeExamBreakHistoryQueryWrapper = new QueryWrapper<>();
+        tOeExamBreakHistoryQueryWrapper.lambda().eq(TOeExamBreakHistory::getExamRecordId, recordId);
+        List<TOeExamBreakHistory> tOeExamBreakHistoryList = tOeExamBreakHistoryService.list(tOeExamBreakHistoryQueryWrapper);
+        //删除历史断点缓存
+        if (Objects.nonNull(tOeExamBreakHistoryList) && tOeExamBreakHistoryList.size() > 0) {
+            tOeExamBreakHistoryList.forEach(s -> {
+                redisUtil.delete(SystemConstant.EXAM_BREAK + String.valueOf(s.getId()));
+            });
+        }
+        //增加断点记录
+        TOeExamBreakHistory tOeExamBreakHistory = new TOeExamBreakHistory(recordId, new Date(), BreakReasonEnum.NET_TIME_OUT, BreakReasonEnum.NET_TIME_OUT.name());
+        tOeExamBreakHistoryService.save(tOeExamBreakHistory);
+        redisUtil.set(SystemConstant.EXAM_BREAK + String.valueOf(tOeExamBreakHistory.getId()), tOeExamBreakHistory);
+
+        objectMap.put("lastBreakId", tOeExamBreakHistory.getId());
+        redisUtil.setForHash(RedisKeyHelper.examRecordCacheKey(recordId), objectMap);
+
+        Gson gson = new Gson();
+        teExamStudentLogService.saveStudentLogInfo(mqDto.getTimestamp(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
+        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
+        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+        redisUtil.delete(key, mqDto.getId());
+    }
 }

+ 68 - 0
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/ExamBreakHistoryConcurrentlyImpl.java

@@ -0,0 +1,68 @@
+package com.qmth.themis.mq.templete.impl;
+
+import com.qmth.themis.business.constant.SpringContextHolder;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.service.MqLogicService;
+import com.qmth.themis.mq.templete.Concurrently;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * @Description: mq 考试断点记录并行消费监听
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/28
+ */
+@Service
+public class ExamBreakHistoryConcurrentlyImpl implements Concurrently {
+    private final static Logger log = LoggerFactory.getLogger(ExamBreakHistoryConcurrentlyImpl.class);
+
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
+        MqLogicService mqLogicService = SpringContextHolder.getBean(MqLogicService.class);
+        MqDto mqDto = null;
+        try {
+            long threadId = Thread.currentThread().getId();
+            String threadName = Thread.currentThread().getName();
+            for (MessageExt messageExt : msgs) {
+                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
+                log.info(":{}-:{} examBreakHistory Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
+                log.info(":{}-:{} examBreakHistory Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
+                int reconsumeTime = messageExt.getReconsumeTimes();
+                if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
+                    mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                } else {
+                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
+                        log.info(":{}-:{} 插入用户轨迹日志", threadId, threadName);
+                        mqLogicService.execMqExamBreakHistoryLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    } else {
+                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+        } finally {
+            if (Objects.nonNull(mqDto)) {
+                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
+            }
+        }
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+    }
+}