Răsfoiți Sursa

广播模式逻辑优化

wangliang 4 ani în urmă
părinte
comite
6d83b5cbd8

+ 1 - 1
themis-admin/src/main/resources/application.properties

@@ -180,7 +180,7 @@ mq.config.map.FACE_VERIFY_SAVE_GROUP=themis-group-exam-faceVerifySave
 mq.config.map.LIVENESS_VERIFY_SAVE_GROUP=themis-group-exam-livenessVerifySave
 mq.config.map.EXAM_RECORD_PERSISTED_GROUP=themis-group-exam-examRecordPersisted
 mq.config.map.EXAM_RECORD_UPDATE_GROUP=themis-group-exam-examRecordUpdate
-mq.config.map.EXAM_BREAK_HISTORY_PERSISTED=themis-group-exam-examBreakHistoryPersisted
+mq.config.map.EXAM_BREAK_RECORD_PERSISTED_GROUP=themis-group-exam-examBreakHistoryPersisted
 mq.config.map.SCORE_CALCULATE_GROUP=themis-group-exam-scoreCalculate
 mq.config.map.EXAM_STUDENT_UPDATE_GROUP=themis-group-exam-examStudentUpdate
 mq.config.map.EXAM_BREAK_GROUP=themis-group-exam-examBreak

+ 2 - 0
themis-business/src/main/java/com/qmth/themis/business/constant/SystemConstant.java

@@ -95,6 +95,8 @@ public class SystemConstant {
 
     public static final String MQ_DELAY_TOPIC_BUFFER_LIST = "mq:delay:topic:buffer:list";
 
+    public static final String MQ_BROADCAST_TOPIC_BUFFER_LIST = "mq:broadcast:topic:buffer:list";
+
     public static final String TYPE = "type";
 
     public static final String LOCAL = "local";

+ 9 - 0
themis-business/src/main/java/com/qmth/themis/business/dto/MqDto.java

@@ -27,6 +27,7 @@ public class MqDto implements Serializable {
     private Integer ack;//ack
     private Integer sequence;//序号
     private Map<String, Object> properties;//扩展类型
+    private Integer reconsume = 0;//重试次数
 
     public MqDto() {
 
@@ -85,6 +86,14 @@ public class MqDto implements Serializable {
         this.properties = properties;
     }
 
+    public Integer getReconsume() {
+        return reconsume;
+    }
+
+    public void setReconsume(Integer reconsume) {
+        this.reconsume = reconsume;
+    }
+
     public static long getSerialVersionUID() {
         return serialVersionUID;
     }

+ 15 - 15
themis-business/src/main/java/com/qmth/themis/business/enums/MqTagEnum.java

@@ -21,12 +21,12 @@ public enum MqTagEnum {
     ROOM_CODE_EXPORT("考场导出任务标签", "考场导出任务", "normal", 9),
     ROOM_CODE_IMPORT("考场导入任务标签", "考场导入任务", "normal", 10),
     EXAM_PAPER_IMPORT("试卷导入任务标签", "试卷导入任务", "normal", 11),
-    OE_IM_CLUSTERING("websocket客户端点对点发送消息标签", "客户端点对点消息", "normal", 12),
-    OE_IM_BROADCASTING("websocket客户端广播发送消息标签", "客户端广播消息", "normal", 13),
-    OE_MONITOR_FINISH("websocket客户端监考强制离线(交卷)标签", "监考强制离线(交卷)", "normal", 14),
-    OE_WARNING_FINISH("websocket客户端预警强制离线(交卷)标签", "预警强制离线(交卷)", "normal", 15),
-    OE_HARD_FINISH("websocket客户端手动(交卷)标签", "手动(交卷)", "normal", 16),
-    OE_LIVENESS_VERIFY("websocket客户端监考强制活体验证标签", "监考强制活体验证", "normal", 17),
+    OE_IM_CLUSTERING("websocket客户端点对点发送消息标签", "客户端点对点消息", "broadcast", 12),
+    OE_IM_BROADCASTING("websocket客户端广播发送消息标签", "客户端广播消息", "broadcast", 13),
+    OE_MONITOR_FINISH("websocket客户端监考强制离线(交卷)标签", "监考强制离线(交卷)", "broadcast", 14),
+    OE_WARNING_FINISH("websocket客户端预警强制离线(交卷)标签", "预警强制离线(交卷)", "broadcast", 15),
+    OE_HARD_FINISH("websocket客户端手动(交卷)标签", "手动(交卷)", "broadcast", 16),
+    OE_LIVENESS_VERIFY("websocket客户端监考强制活体验证标签", "监考强制活体验证", "broadcast", 17),
     OE_UN_NORMAL("websocket超时退出标签", "websocket超时退出", "delay", 18),
     EXAM_ACTIVITY("考场一次性延时任务标签", "考场一次性延时任务", "normal", 19),
     QUARTZ("quartz标签", "quartz任务", "normal", 20),
@@ -40,20 +40,20 @@ public enum MqTagEnum {
     EXAM_SCORE_CALCULATE("重新算分标签", "重新算分", "normal", 30),
     EXAM_STUDNET_UPDATE("考生数据更新标签", "考生数据更新", "normal", 31),
     EXAM_BREAK("考试断点标签", "考试断点", "normal", 32),
-    EXAM_STOP("考试移动端监控退出标签", "考试移动端退出暂停", "normal", 33),
+    EXAM_STOP("考试移动端监控退出标签", "考试移动端退出暂停", "broadcast", 33),
     EXAM_STUDENT("考生一次性延时任务标签", "考生一次性延时任务", "normal", 35),
     EXAM_STUDENT_EXPORT("考生导出任务标签", "考生导出任务", "normal", 36),
     MARK_RESULT_SIMPLE_EXPORT("成绩查询简版导出任务标签", "成绩查询简版导出任务", "normal", 37),
     MARK_RESULT_STANDARD_EXPORT("成绩查询标准版导出任务标签", "成绩查询标准版导出任务", "normal", 38),
     EXAM_BREAK_DELAY("断点时间标签", "断点时间", "delay", 39),
     EXAM_BREAK_HISTORY_PERSISTED("断点记录数据持久化标签", "断点记录数据持久化", "normal", 40),
-    OE_WEBSOCKET_MOBILE_ANSWER_READY("移动端拍照/录音扫描完成标签", "移动端拍照/录音扫描完成","normal", 42),
-    OE_WEBSOCKET_MOBILE_ANSWER_UPLOAD("移动端拍照/录音上传成功标签", "移动端拍照/录音上传成功","normal", 43),
-    OE_WEBSOCKET_EXAM_STOP("客户端考试结束标签", "客户端考试结束","normal", 44),
-    MONITOR_START("监控开始标签", "监控开始","normal", 45),
-    MONITOR_STOP("监控结束标签", "监控结束","normal", 46),
-    EXAM_START("考试移动端监控开始标签", "考试移动端开始", "normal", 47),
-    OE_WEBSOCKET_MOBILE_MONITOR_STATUS("通知客户端移动端当前监控状态标签", "通知客户端移动端当前监控状态", "normal", 48);
+    OE_WEBSOCKET_MOBILE_ANSWER_READY("移动端拍照/录音扫描完成标签", "移动端拍照/录音扫描完成","broadcast", 42),
+    OE_WEBSOCKET_MOBILE_ANSWER_UPLOAD("移动端拍照/录音上传成功标签", "移动端拍照/录音上传成功","broadcast", 43),
+    OE_WEBSOCKET_EXAM_STOP("客户端考试结束标签", "客户端考试结束","broadcast", 44),
+    MONITOR_START("监控开始标签", "监控开始","broadcast", 45),
+    MONITOR_STOP("监控结束标签", "监控结束","broadcast", 46),
+    EXAM_START("考试移动端监控开始标签", "考试移动端开始", "broadcast", 47),
+    OE_WEBSOCKET_MOBILE_MONITOR_STATUS("通知客户端移动端当前监控状态标签", "通知客户端移动端当前监控状态", "broadcast", 48);
 
     private MqTagEnum(String desc, String code, String type, int id) {
         this.desc = desc;
@@ -66,7 +66,7 @@ public enum MqTagEnum {
 
     private String desc;//标签
 
-    private String type;//消息类型,normal:正常消息,delay:延时消息,transactional:事务消息
+    private String type;//消息类型,normal:正常消息,delay:延时消息,transactional:事务消息,broadcast:广播消息
 
     private int id;
 

+ 3 - 1
themis-business/src/main/java/com/qmth/themis/business/service/impl/MqDtoServiceImpl.java

@@ -152,7 +152,9 @@ public class MqDtoServiceImpl implements MqDtoService {
         if (Objects.nonNull(mqDto)) {
             if (Objects.equals(mqDto.getType().getType(), "normal")) {
                 redisUtil.set(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);
-            } else {
+            } else if (Objects.equals(mqDto.getType().getType(), "broadcast")) {
+                redisUtil.set(SystemConstant.MQ_BROADCAST_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);
+            } else if (Objects.equals(mqDto.getType().getType(), "delay")) {
                 redisUtil.set(SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);
             }
         }

+ 12 - 6
themis-exam/src/main/java/com/qmth/themis/exam/api/TEExamController.java

@@ -129,10 +129,10 @@ public class TEExamController {
         String recordId = (String) mapParameter.get("recordId");
         ExamRecordStatusEnum status = ExamRecordCacheUtil.getStatus(Long.valueOf(recordId));
         if (Objects.nonNull(status) && Objects.equals(ExamRecordStatusEnum.ANSWERING, status)) {//答题状态,强制断点
-            ExamConstant.sendExamStopMsg(Long.valueOf(recordId), true);
+            ExamConstant.sendExamStopMsg(Long.valueOf(recordId), true, false);
             tOeExamRecordService.examBreakLogic(Long.valueOf(recordId), true);
         } else if (Objects.nonNull(status) && Objects.equals(ExamRecordStatusEnum.FIRST_PREPARE, status)) {
-            ExamConstant.sendExamStopMsg(Long.valueOf(recordId), false);
+            ExamConstant.sendExamStopMsg(Long.valueOf(recordId), false, true);
         } else {
             if (Objects.nonNull(status)) {
                 throw new BusinessException(ExceptionResultEnum.EXAM_STATUS_ERROR);
@@ -140,9 +140,11 @@ public class TEExamController {
         }
         ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
         String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(Long.parseLong(recordId));
-        WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
-        if (Objects.nonNull(webSocketOeServer)) {
-            webSocketOeServer.onClose();
+        if (Objects.nonNull(clientWebsocketId)) {
+            WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
+            if (Objects.nonNull(webSocketOeServer)) {
+                webSocketOeServer.onClose();
+            }
         }
         return ResultUtil.ok(Collections.singletonMap(SystemConstant.SUCCESS, true));
     }
@@ -299,7 +301,7 @@ public class TEExamController {
                 throw new BusinessException("总用时秒数不能为空");
             }
             Result re = ResultUtil.ok(teExamService.finish(teStudent.getId(), param.getRecordId(), param.getType(), param.getDurationSeconds()));
-            ExamConstant.sendExamStopMsg(param.getRecordId(), false);
+            ExamConstant.sendExamStopMsg(param.getRecordId(), false, true);
             return re;
         } finally {
             redisUtil.releaseLock(lockKey);
@@ -341,6 +343,10 @@ public class TEExamController {
         if (Objects.isNull(param.getRecordId()) || Objects.equals(param.getRecordId(), "")) {
             throw new BusinessException(ExceptionResultEnum.RECORD_ID_IS_NULL);
         }
+        ExamRecordStatusEnum examRecordStatusEnum = ExamRecordCacheUtil.getStatus(param.getRecordId());
+        if (Objects.nonNull(examRecordStatusEnum) && (!Objects.equals(ExamRecordStatusEnum.FIRST_PREPARE, examRecordStatusEnum) && !Objects.equals(ExamRecordStatusEnum.RESUME_PREPARE, examRecordStatusEnum))) {
+            throw new BusinessException(ExceptionResultEnum.EXAM_STATUS_ERROR);
+        }
         Integer paperDownload = Objects.isNull(ExamRecordCacheUtil.getPaperDownload(param.getRecordId())) ? 1 : ExamRecordCacheUtil.getPaperDownload(param.getRecordId());
         if (paperDownload.intValue() == 1) {
             ExamRecordCacheUtil.setPaperDownload(param.getRecordId(), 0);

+ 1 - 1
themis-exam/src/main/java/com/qmth/themis/exam/api/TEStudentController.java

@@ -264,7 +264,7 @@ public class TEStudentController {
                     .equals(status, ExamRecordStatusEnum.RESUME_PREPARE)) {
                 //只有ANSWERING状态才生成断点
                 if (Objects.equals(status, ExamRecordStatusEnum.ANSWERING)) {
-                    ExamConstant.sendExamStopMsg(recordId, true);
+                    ExamConstant.sendExamStopMsg(recordId, true, false);
                     tOeExamRecordService.setExamBreak(recordId);
                 }
                 Boolean finished = tOeExamRecordService.examBreakLogic(recordId, false);

+ 5 - 3
themis-exam/src/main/java/com/qmth/themis/exam/config/ExamConstant.java

@@ -25,7 +25,7 @@ public class ExamConstant {
      * @param recordId
      * @param clientStop
      */
-    public static void sendExamStopMsg(Long recordId, boolean clientStop) {
+    public static void sendExamStopMsg(Long recordId, boolean clientStop, boolean mobileStop) {
         MqUtil mqUtil = SpringContextHolder.getBean(MqUtil.class);
         MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
         Map mapParameter = new HashMap<>();
@@ -36,8 +36,10 @@ public class ExamConstant {
             mqDtoService.assembleSendOneWayMsg(mqDto);
         }
         //移动端考试结束
-        MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.EXAM_STOP.name(), recordId, MqTagEnum.EXAM_STOP, String.valueOf(recordId), mapParameter, String.valueOf(recordId));
-        mqDtoService.assembleSendOneWayMsg(mqDto);
+        if (mobileStop) {
+            MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.EXAM_STOP.name(), recordId, MqTagEnum.EXAM_STOP, String.valueOf(recordId), mapParameter, String.valueOf(recordId));
+            mqDtoService.assembleSendOneWayMsg(mqDto);
+        }
     }
 
     /**

+ 1 - 12
themis-exam/src/main/java/com/qmth/themis/exam/listener/service/MqOeLogicService.java

@@ -2,8 +2,6 @@ package com.qmth.themis.exam.listener.service;
 
 import com.qmth.themis.business.dto.MqDto;
 
-import java.lang.reflect.InvocationTargetException;
-
 /**
  * @Description: mq执行逻辑
  * @Param:
@@ -13,21 +11,13 @@ import java.lang.reflect.InvocationTargetException;
  */
 public interface MqOeLogicService {
 
-    /**
-     * mq最大重试次数逻辑
-     *
-     * @param mqDto
-     * @param key
-     */
-    public void execMqMaxReconsumeTime(MqDto mqDto, String key);
-
     /**
      * oe逻辑
      *
      * @param mqDto
      * @param key
      */
-    public void execMqOeLogic(MqDto mqDto, String key) throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException;
+    public void execMqOeLogic(MqDto mqDto, String key);
 
     /**
      * oe mobile 逻辑
@@ -36,5 +26,4 @@ public interface MqOeLogicService {
      * @param key
      */
     public void execMqOeMobileLogic(MqDto mqDto, String key);
-
 }

+ 7 - 35
themis-exam/src/main/java/com/qmth/themis/exam/listener/service/impl/MqOeLogicServiceImpl.java

@@ -27,7 +27,6 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.Resource;
-import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -61,38 +60,15 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
     @Resource
     TIeExamInvigilateNoticeService tIeExamInvigilateNoticeService;
 
-    /**
-     * mq最大重试次数逻辑
-     *
-     * @param mqDto
-     * @param key
-     */
-    @Override
-    @Transactional
-    public void execMqMaxReconsumeTime(MqDto mqDto, String key) {
-        //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
-        Gson gson = new Gson();
-        mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
-        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
-        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-        redisUtil.delete(key, mqDto.getId());
-    }
-
     /**
      * oe逻辑
      *
      * @param mqDto
      * @param key
-     * @throws ClassNotFoundException
-     * @throws IllegalAccessException
-     * @throws InstantiationException
-     * @throws NoSuchMethodException
-     * @throws InvocationTargetException
      */
     @Override
     @Transactional
-    public void execMqOeLogic(MqDto mqDto, String key) throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+    public void execMqOeLogic(MqDto mqDto, String key) {
         Gson gson = new Gson();
         ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
         String tag = mqDto.getTag();
@@ -103,7 +79,7 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
                 Long recordId = Long.parseLong(String.valueOf(s));
                 //获取考试记录的客户端websocketId
                 String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
-                if (Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
+                if (Objects.nonNull(clientWebsocketId) && Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
                     Long examStudentId = ExamRecordCacheUtil.getExamStudentId(recordId);
                     ExamStudentCacheBean examStudentCacheBean = teExamStudentService.getExamStudentCacheBean(examStudentId);
                     WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
@@ -130,7 +106,7 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
             examRecordId.forEach(s -> {
                 Long recordId = Long.parseLong(String.valueOf(s));
                 String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
-                if (Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
+                if (Objects.nonNull(clientWebsocketId) && Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
                     Long examStudentId = ExamRecordCacheUtil.getExamStudentId(recordId);
                     ExamStudentCacheBean examStudentCacheBean = teExamStudentService.getExamStudentCacheBean(examStudentId);
                     WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
@@ -154,7 +130,7 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
         } else if (Objects.equals(MqTagEnum.OE_IM_CLUSTERING.name(), tag)) {//点对点消息
             Long recordId = Long.parseLong(String.valueOf(mqDto.getBody()));
             String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
-            if (Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
+            if (Objects.nonNull(clientWebsocketId) && Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
                 Long examId = ExamRecordCacheUtil.getExamId(recordId);
                 Long examStudentId = ExamRecordCacheUtil.getExamStudentId(recordId);
                 Long examActivityId = ExamRecordCacheUtil.getExamActivityId(recordId);
@@ -185,7 +161,7 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
         } else if (Objects.equals(MqTagEnum.OE_LIVENESS_VERIFY.name(), tag)) {//监考强制活体验证
             Long recordId = Long.parseLong(String.valueOf(mqDto.getBody()));
             String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
-            if (Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
+            if (Objects.nonNull(clientWebsocketId) && Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
                 Long examStudentId = ExamRecordCacheUtil.getExamStudentId(recordId);
                 ExamStudentCacheBean examStudentCacheBean = teExamStudentService.getExamStudentCacheBean(examStudentId);
                 WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
@@ -213,7 +189,7 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
             Long recordId = Long.parseLong(String.valueOf(mqDto.getBody()));
             String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
             Map<String, Object> prop = mqDto.getProperties();
-            if (Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
+            if (Objects.nonNull(clientWebsocketId) && Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
                 WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
                 ExamRecordStatusEnum examRecordStatusEnum = ExamRecordCacheUtil.getStatus(recordId);
                 if ((Objects.nonNull(examRecordStatusEnum)
@@ -275,13 +251,9 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
      *
      * @param mqDto
      * @param key
-     * @throws ClassNotFoundException
-     * @throws IllegalAccessException
-     * @throws InstantiationException
-     * @throws NoSuchMethodException
-     * @throws InvocationTargetException
      */
     @Override
+    @Transactional
     public void execMqOeMobileLogic(MqDto mqDto, String key) {
         Gson gson = new Gson();
         String tag = mqDto.getTag();

+ 3 - 18
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketMobileServer.java

@@ -243,31 +243,16 @@ public class WebSocketMobileServer implements Concurrently {
 
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
-        RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
         MqOeLogicService mqOeLogicService = SpringContextHolder.getBean(MqOeLogicService.class);
-        MqDto mqDto = null;
         try {
             long threadId = Thread.currentThread().getId();
             String threadName = Thread.currentThread().getName();
             for (MessageExt messageExt : msgs) {
                 log.info(":{}-:{} websocket oe Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
-                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
+                MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                 log.info(":{}-:{} websocket oe Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
-                int reconsumeTime = messageExt.getReconsumeTimes();
-                if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
-                    mqOeLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
-                } else {
-                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                        try {
-                            mqOeLogicService.execMqOeMobileLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
-                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                        } finally {
-                            if (Objects.nonNull(mqDto)) {
-                                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
-                            }
-                        }
-                    }
-                }
+                mqOeLogicService.execMqOeMobileLogic(mqDto, SystemConstant.MQ_BROADCAST_TOPIC_BUFFER_LIST);
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         } catch (Exception e) {
             log.error("mq websocket oe,消息消费出错", e);

+ 4 - 19
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketOeServer.java

@@ -143,7 +143,7 @@ public class WebSocketOeServer implements Concurrently {
                     //发送延时mq消息end
                 } else {
                     log.info("正常退出");
-                    ExamConstant.sendExamStopMsg(this.recordId, true);
+                    ExamConstant.sendExamStopMsg(this.recordId, true, false);
                     tOeExamRecordService.examBreakLogic(this.recordId, true);
                 }
             }
@@ -264,31 +264,16 @@ public class WebSocketOeServer implements Concurrently {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext
             consumeConcurrentlyContext) {
-        RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
         MqOeLogicService mqOeLogicService = SpringContextHolder.getBean(MqOeLogicService.class);
-        MqDto mqDto = null;
         try {
             long threadId = Thread.currentThread().getId();
             String threadName = Thread.currentThread().getName();
             for (MessageExt messageExt : msgs) {
                 log.info(":{}-:{} websocket oe Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
-                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
+                MqDto mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                 log.info(":{}-:{} websocket oe Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
-                int reconsumeTime = messageExt.getReconsumeTimes();
-                if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
-                    mqOeLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
-                } else {
-                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                        try {
-                            mqOeLogicService.execMqOeLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
-                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                        } finally {
-                            if (Objects.nonNull(mqDto)) {
-                                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
-                            }
-                        }
-                    }
-                }
+                mqOeLogicService.execMqOeLogic(mqDto, SystemConstant.MQ_BROADCAST_TOPIC_BUFFER_LIST);
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         } catch (Exception e) {
             log.error("mq websocket oe,消息消费出错", e);

+ 1 - 1
themis-exam/src/main/resources/application.properties

@@ -132,7 +132,7 @@ mq.config.map.FACE_VERIFY_SAVE_GROUP=themis-group-exam-faceVerifySave
 mq.config.map.LIVENESS_VERIFY_SAVE_GROUP=themis-group-exam-livenessVerifySave
 mq.config.map.EXAM_RECORD_PERSISTED_GROUP=themis-group-exam-examRecordPersisted
 mq.config.map.EXAM_RECORD_UPDATE_GROUP=themis-group-exam-examRecordUpdate
-mq.config.map.EXAM_BREAK_HISTORY_PERSISTED=themis-group-exam-examBreakHistoryPersisted
+mq.config.map.EXAM_BREAK_RECORD_PERSISTED_GROUP=themis-group-exam-examBreakHistoryPersisted
 mq.config.map.SCORE_CALCULATE_GROUP=themis-group-exam-scoreCalculate
 mq.config.map.EXAM_STUDENT_UPDATE_GROUP=themis-group-exam-examStudentUpdate
 mq.config.map.EXAM_BREAK_GROUP=themis-group-exam-examBreak

+ 0 - 3
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketMessageConsumer.java

@@ -2,7 +2,6 @@ package com.qmth.themis.mq.listener;
 
 import com.qmth.themis.business.constant.SystemConstant;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.MessageListener;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -45,8 +44,6 @@ public class RocketMessageConsumer {
             consumer.registerMessageListener((MessageListenerConcurrently) o);
         } else if (o instanceof MessageListenerOrderly) {
             consumer.registerMessageListener((MessageListenerOrderly) o);
-        } else {
-            consumer.registerMessageListener((MessageListener) o);
         }
         try {
             consumer.subscribe(topic, tag);

+ 38 - 0
themis-task/src/main/java/com/qmth/themis/task/quartz/MqJob.java

@@ -1,8 +1,12 @@
 package com.qmth.themis.task.quartz;
 
+import com.google.gson.Gson;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.dto.MqDto;
+import com.qmth.themis.business.entity.TMRocketMessage;
 import com.qmth.themis.business.service.ProducerServer;
+import com.qmth.themis.business.service.TMRocketMessageService;
+import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import org.quartz.JobExecutionContext;
 import org.slf4j.Logger;
@@ -29,12 +33,17 @@ public class MqJob extends QuartzJobBean {
     @Resource
     ProducerServer producerServer;
 
+    @Resource
+    TMRocketMessageService tmRocketMessageService;
+
     @Override
     protected void executeInternal(JobExecutionContext context) {
         log.info("mq_job进来了,context:{}", context);
         this.assembleJob(SystemConstant.MQ_TOPIC_BUFFER_LIST);
         log.info("mq_delay_job进来了,context:{}", context);
         this.assembleDelayJob(SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST);
+        log.info("mq_broadcast_job进来了,context:{}", context);
+        this.assembleBroadCastJob(SystemConstant.MQ_BROADCAST_TOPIC_BUFFER_LIST);
     }
 
     /**
@@ -75,4 +84,33 @@ public class MqJob extends QuartzJobBean {
             });
         }
     }
+
+    /**
+     * 组装job
+     *
+     * @param redisKey
+     */
+    public void assembleBroadCastJob(String redisKey) {
+        Long size = redisUtil.getHashSize(redisKey);
+        if (Objects.nonNull(size) && size.longValue() > 0) {
+            log.info("redisKey:{}缓冲区的消息数为:{}", redisKey, size);
+            Map map = redisUtil.getHashEntries(redisKey);
+            map.forEach((k, v) -> {
+                MqDto mqDto = (MqDto) v;
+                int reconsume = mqDto.getReconsume();
+                if (reconsume < SystemConstant.MAXRECONSUMETIMES) {
+                    mqDto.setReconsume(mqDto.getReconsume() + 1);
+                    redisUtil.set(SystemConstant.MQ_BROADCAST_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);
+                    producerServer.sendOneWay(mqDto);
+                } else {
+                    mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
+                    Gson gson = new Gson();
+                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+                    tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
+                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+                    redisUtil.delete(SystemConstant.MQ_BROADCAST_TOPIC_BUFFER_LIST, mqDto.getId());
+                }
+            });
+        }
+    }
 }

+ 1 - 1
themis-task/src/main/resources/application.properties

@@ -190,7 +190,7 @@ mq.config.map.FACE_VERIFY_SAVE_GROUP=themis-group-exam-faceVerifySave
 mq.config.map.LIVENESS_VERIFY_SAVE_GROUP=themis-group-exam-livenessVerifySave
 mq.config.map.EXAM_RECORD_PERSISTED_GROUP=themis-group-exam-examRecordPersisted
 mq.config.map.EXAM_RECORD_UPDATE_GROUP=themis-group-exam-examRecordUpdate
-mq.config.map.EXAM_BREAK_HISTORY_PERSISTED=themis-group-exam-examBreakHistoryPersisted
+mq.config.map.EXAM_BREAK_RECORD_PERSISTED_GROUP=themis-group-exam-examBreakHistoryPersisted
 mq.config.map.SCORE_CALCULATE_GROUP=themis-group-exam-scoreCalculate
 mq.config.map.EXAM_STUDENT_UPDATE_GROUP=themis-group-exam-examStudentUpdate
 mq.config.map.EXAM_BREAK_GROUP=themis-group-exam-examBreak