Browse Source

改用MQ通知发送websocket消息,兼容集群环境

wangliang 4 years ago
parent
commit
46fa30af68

+ 7 - 1
themis-business/src/main/java/com/qmth/themis/business/enums/MqTagEnum.java

@@ -50,7 +50,13 @@ public enum MqTagEnum {
     MARK_RESULT_STANDARD_EXPORT("成绩查询标准版导出任务标签", "成绩查询标准版导出任务", "normal", 38),
     EXAM_BREAK_DELAY("断点时间标签", "断点时间", "delay", 39),
     EXAM_BREAK_HISTORY_UPDATE("断点记录数据更新标签", "断点记录数据更新", "normal", 40),
-    EXAM_BREAK_HISTORY_UPDATE_COLUMNS("断点记录多字段数据更新标签", "断点记录多字段数据更新", "normal", 41);
+    EXAM_BREAK_HISTORY_UPDATE_COLUMNS("断点记录多字段数据更新标签", "断点记录多字段数据更新", "normal", 41),
+    OE_WEBSOCKET_MOBILE_ANSWER_READY("移动端拍照/录音扫描完成标签", "移动端拍照/录音扫描完成","normal", 42),
+    OE_WEBSOCKET_MOBILE_ANSWER_UPLOAD("移动端拍照/录音上传成功标签", "移动端拍照/录音上传成功","normal", 43),
+    OE_WEBSOCKET_EXAM_STOP("客户端考试结束标签", "客户端考试结束","normal", 44),
+    MONITOR_START("监控开始标签", "监控开始","normal", 45),
+    MONITOR_STOP("监控结束标签", "监控结束","normal", 46),
+    EXAM_START("考试移动端监控开始标签", "考试移动端开始", "normal", 47);
 
     private MqTagEnum(String desc, String code, String type, int id) {
         this.desc = desc;

+ 29 - 38
themis-exam/src/main/java/com/qmth/themis/exam/api/TEMobileController.java

@@ -8,19 +8,20 @@ import com.qmth.themis.business.bean.mobile.MobileAuthorizationParamBean;
 import com.qmth.themis.business.cache.ExamRecordCacheUtil;
 import com.qmth.themis.business.cache.bean.ExamStudentCacheBean;
 import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.dto.WebsocketDto;
+import com.qmth.themis.business.dto.MqDto;
 import com.qmth.themis.business.dto.cache.TEStudentCacheDto;
 import com.qmth.themis.business.enums.ExamRecordStatusEnum;
-import com.qmth.themis.business.enums.WebsocketTypeEnum;
+import com.qmth.themis.business.enums.MqTagEnum;
+import com.qmth.themis.business.service.MqDtoService;
 import com.qmth.themis.business.service.TEExamService;
 import com.qmth.themis.business.service.TEExamStudentService;
 import com.qmth.themis.business.service.TEMobileService;
+import com.qmth.themis.business.util.MqUtil;
 import com.qmth.themis.business.util.ServletUtil;
 import com.qmth.themis.common.enums.ExceptionResultEnum;
 import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
-import com.qmth.themis.exam.websocket.WebSocketOeServer;
 import io.swagger.annotations.*;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.web.bind.annotation.*;
@@ -30,8 +31,6 @@ import javax.annotation.Resource;
 import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 
 @Api(tags = "移动端接口")
 @RestController
@@ -47,6 +46,11 @@ public class TEMobileController {
     @Resource
     TEExamStudentService examStudentService;
 
+    @Resource
+    MqUtil mqUtil;
+
+    @Resource
+    MqDtoService mqDtoService;
 
     @ApiOperation(value = "获取登录详细信息")
     @RequestMapping(value = "/authorization", method = RequestMethod.POST)
@@ -83,23 +87,17 @@ public class TEMobileController {
         if (ExamRecordStatusEnum.FINISHED.equals(sta) || ExamRecordStatusEnum.PERSISTED.equals(sta)) {
             throw new BusinessException(ExceptionResultEnum.NOT_FOUND_EXAM_STUDENT);
         }
-        ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
-        String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(param.getRecordId());
-        WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
-        if (webSocketOeServer == null) {
-            throw new BusinessException("消息连接不存在");
-        }
-        if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == param.getRecordId().longValue()) {
-            Map<String, Object> map = new HashMap<String, Object>();
-            map.put("recordId", param.getRecordId());
-            map.put("mainNumber", param.getMainNumber());
-            map.put("subNumber", param.getSubNumber());
-            if (param.getSubIndex() != null) {
-                map.put("subIndex", param.getSubIndex());
-            }
-            WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_ANSWER_READY.name(), map);
-            webSocketOeServer.sendMessage(websocketDto);
+
+        TEStudentCacheDto teStudent = (TEStudentCacheDto) ServletUtil.getRequestStudentAccount();
+        Map<String, Object> mapParameter = new HashMap<String, Object>();
+        mapParameter.put("recordId", param.getRecordId());
+        mapParameter.put("mainNumber", param.getMainNumber());
+        mapParameter.put("subNumber", param.getSubNumber());
+        if (param.getSubIndex() != null) {
+            mapParameter.put("subIndex", param.getSubIndex());
         }
+        MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_READY.name(), param.getRecordId(), MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_READY, String.valueOf(teStudent.getId()), mapParameter, teStudent.getName());
+        mqDtoService.assembleSendOneWayMsg(mqDto);
         ExamStudentCacheBean es = examStudentService.getExamStudentCacheBean(esId);
         AnswerReadyResponseBean ret = new AnswerReadyResponseBean();
         ret.setCourseName(es.getCourseName());
@@ -137,24 +135,17 @@ public class TEMobileController {
         }
         MobileAnswerSubmitReponseBean ret = mobileService.answerSubmit(teStudent.getId(), param.getRecordId(), param.getMainNumber(),
                 param.getSubNumber(), param.getSubIndex(), param.getUrls());
-        String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(param.getRecordId());
-        ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
-        if (Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
-            WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
-            if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == param.getRecordId().longValue()) {
-                Map<String, Object> map = new HashMap<String, Object>();
-                map.put(SystemConstant.RECORD_ID, param.getRecordId());
-                map.put("mainNumber", param.getMainNumber());
-                map.put("subNumber", param.getSubNumber());
-                if (param.getSubIndex() != null) {
-                    map.put("subIndex", param.getSubIndex());
-                }
-                map.put("urls", param.getUrls());
-                WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_ANSWER_UPLOAD.name(), map);
-                webSocketOeServer.sendMessage(websocketDto);
-            }
+        Map<String, Object> mapParameter = new HashMap<String, Object>();
+        mapParameter.put(SystemConstant.RECORD_ID, param.getRecordId());
+        mapParameter.put("mainNumber", param.getMainNumber());
+        mapParameter.put("subNumber", param.getSubNumber());
+        if (param.getSubIndex() != null) {
+            mapParameter.put("subIndex", param.getSubIndex());
         }
+        mapParameter.put("urls", param.getUrls());
+
+        MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_UPLOAD.name(), param.getRecordId(), MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_READY, String.valueOf(teStudent.getId()), mapParameter, teStudent.getName());
+        mqDtoService.assembleSendOneWayMsg(mqDto);
         return ResultUtil.ok(ret);
     }
-
 }

+ 0 - 7
themis-exam/src/main/java/com/qmth/themis/exam/api/TEStudentController.java

@@ -287,13 +287,6 @@ public class TEStudentController {
         map.put(SystemConstant.ACCESS_TOKEN, test);
         map.put(SystemConstant.STUDENT_ACCOUNT, teStudent);
         map.put(SystemConstant.SESSION_ID, sessionId);
-
-        Map<String,Object> mapParameter = new HashMap<>();
-        mapParameter.put("examRecordId",1);
-        mapParameter.put("type","text");
-        mapParameter.put("content","test123");
-        MqDto mqDtoTest = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_IM_CLUSTERING.name(), 1, MqTagEnum.OE_IM_CLUSTERING, String.valueOf(teStudent.getId()), mapParameter, teStudent.getName());
-        mqDtoService.assembleSendOneWayMsg(mqDtoTest);
         return ResultUtil.ok(map);
     }
 

+ 11 - 22
themis-exam/src/main/java/com/qmth/themis/exam/api/TIeInvigilateCallMobileController.java

@@ -5,7 +5,6 @@ import com.qmth.themis.business.annotation.ApiJsonProperty;
 import com.qmth.themis.business.cache.ExamRecordCacheUtil;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.dto.MqDto;
-import com.qmth.themis.business.dto.WebsocketDto;
 import com.qmth.themis.business.entity.TIeExamInvigilateCallLog;
 import com.qmth.themis.business.enums.*;
 import com.qmth.themis.business.service.MqDtoService;
@@ -16,7 +15,6 @@ import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
 import com.qmth.themis.exam.config.DictionaryConfig;
 import com.qmth.themis.exam.websocket.WebSocketMobileServer;
-import com.qmth.themis.exam.websocket.WebSocketOeServer;
 import io.swagger.annotations.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -171,7 +169,6 @@ public class TIeInvigilateCallMobileController {
         if (Objects.equals(status, MonitorStatusSourceEnum.STOP.name()) && (Objects.isNull(mapParameter.get("type")) || Objects.equals(mapParameter.get("type"), ""))) {
             throw new BusinessException("异常类型不能为空");
         }
-        String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
         //获取考试记录缓存
         String liveUrl = null;
         if (Objects.nonNull(ExamRecordCacheUtil.getMonitorLiveUrl(recordId, source))) {
@@ -192,25 +189,17 @@ public class TIeInvigilateCallMobileController {
         mqDtoService.assembleSendOneWayMsg(mqDto);
         //监考监控通话信息 发送mq end
 
-        ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
-        if (Objects.nonNull(webSocketMap.get(clientWebsocketId)) && Objects.equals(status.name(), MonitorStatusSourceEnum.START.name())) {
-            WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
-            if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == recordId.longValue()) {
-                Map map = new HashMap<>();
-                map.put(SystemConstant.RECORD_ID, recordId);
-                map.put("source", source.name());
-                WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_MONITOR_START.name(), map);
-                webSocketOeServer.sendMessage(websocketDto);
-            }
-        } else if (Objects.nonNull(webSocketMap.get(clientWebsocketId)) && Objects.equals(status.name(), MonitorStatusSourceEnum.STOP.name())) {
-            WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
-            if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == recordId.longValue()) {
-                Map map = new HashMap<>();
-                map.put(SystemConstant.RECORD_ID, recordId);
-                map.put("source", source.name());
-                WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_MONITOR_STOP.name(), map);
-                webSocketOeServer.sendMessage(websocketDto);
-            }
+        Map mqMap = new HashMap<>();
+        mqMap.put(SystemConstant.RECORD_ID, recordId);
+        mqMap.put("source", source.name());
+        if (Objects.equals(status.name(), MonitorStatusSourceEnum.START.name())) {
+            //监控开始
+            MqDto mqDtoStart = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.MONITOR_START.name(), recordId, MqTagEnum.MONITOR_START, String.valueOf(recordId), mqMap, String.valueOf(recordId));
+            mqDtoService.assembleSendOneWayMsg(mqDtoStart);
+        } else if (Objects.equals(status.name(), MonitorStatusSourceEnum.STOP.name())) {
+            //监控结束
+            MqDto mqDtoStop = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.MONITOR_STOP.name(), recordId, MqTagEnum.MONITOR_STOP, String.valueOf(recordId), mqMap, String.valueOf(recordId));
+            mqDtoService.assembleSendOneWayMsg(mqDtoStop);
         }
         Map map = new HashMap();
         map.put("status", status.name());

+ 19 - 55
themis-exam/src/main/java/com/qmth/themis/exam/config/ExamConstant.java

@@ -1,17 +1,14 @@
 package com.qmth.themis.exam.config;
 
-import com.qmth.themis.business.cache.ExamRecordCacheUtil;
+import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.dto.WebsocketDto;
-import com.qmth.themis.business.enums.MonitorVideoSourceEnum;
-import com.qmth.themis.business.enums.WebsocketTypeEnum;
-import com.qmth.themis.exam.websocket.WebSocketMobileServer;
-import com.qmth.themis.exam.websocket.WebSocketOeServer;
+import com.qmth.themis.business.dto.MqDto;
+import com.qmth.themis.business.enums.MqTagEnum;
+import com.qmth.themis.business.service.MqDtoService;
+import com.qmth.themis.business.util.MqUtil;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @Description: 考试系统常量
@@ -29,41 +26,18 @@ public class ExamConstant {
      * @param clientStop
      */
     public static void sendExamStopMsg(Long recordId, boolean clientStop) {
-        String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
+        MqUtil mqUtil = SpringContextHolder.getBean(MqUtil.class);
+        MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
+        Map mapParameter = new HashMap<>();
+        mapParameter.put(SystemConstant.RECORD_ID, recordId);
         //客户端考试结束
         if (clientStop) {
-            ConcurrentHashMap<String, WebSocketOeServer> webSocketOeMap = WebSocketOeServer.getWebSocketMap();
-            if (Objects.nonNull(clientWebsocketId) && Objects.nonNull(webSocketOeMap.get(clientWebsocketId))) {
-                WebSocketOeServer webSocketOeServer = webSocketOeMap.get(clientWebsocketId);
-                if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == recordId.longValue()) {
-                    Map map = new HashMap<>();
-                    map.put(SystemConstant.RECORD_ID, recordId);
-                    WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_STOP.name(), map);
-                    webSocketOeServer.sendMessage(websocketDto);
-                }
-            }
+            MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_WEBSOCKET_EXAM_STOP.name(), recordId, MqTagEnum.OE_WEBSOCKET_EXAM_STOP, String.valueOf(recordId), mapParameter, String.valueOf(recordId));
+            mqDtoService.assembleSendOneWayMsg(mqDto);
         }
-
         //移动端考试结束
-        ConcurrentHashMap<String, WebSocketMobileServer> webSocketMap = WebSocketMobileServer.getWebSocketMap();
-        if (Objects.nonNull(clientWebsocketId) && Objects.nonNull(webSocketMap.get(clientWebsocketId + "-" + MonitorVideoSourceEnum.MOBILE_FIRST.name()))) {
-            WebSocketMobileServer webSocketMobileServer = webSocketMap.get(clientWebsocketId + "-" + MonitorVideoSourceEnum.MOBILE_FIRST.name());
-            if (Objects.nonNull(webSocketMobileServer.getRecordId()) && webSocketMobileServer.getRecordId().longValue() == recordId.longValue()) {
-                Map map = new HashMap<>();
-                map.put(SystemConstant.RECORD_ID, recordId);
-                WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_STOP.name(), map);
-                webSocketMobileServer.sendMessage(websocketDto);
-            }
-        }
-        if (Objects.nonNull(clientWebsocketId) && Objects.nonNull(webSocketMap.get(clientWebsocketId + "-" + MonitorVideoSourceEnum.MOBILE_SECOND.name()))) {
-            WebSocketMobileServer webSocketMobileServer = webSocketMap.get(clientWebsocketId + "-" + MonitorVideoSourceEnum.MOBILE_SECOND.name());
-            if (Objects.nonNull(webSocketMobileServer.getRecordId()) && webSocketMobileServer.getRecordId().longValue() == recordId.longValue()) {
-                Map map = new HashMap<>();
-                map.put(SystemConstant.RECORD_ID, recordId);
-                WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_STOP.name(), map);
-                webSocketMobileServer.sendMessage(websocketDto);
-            }
-        }
+        MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.EXAM_STOP.name(), recordId, MqTagEnum.EXAM_STOP, String.valueOf(recordId), mapParameter, String.valueOf(recordId));
+        mqDtoService.assembleSendOneWayMsg(mqDto);
     }
 
     /**
@@ -72,21 +46,11 @@ public class ExamConstant {
      * @param recordId
      */
     public static void sendExamStartMsg(Long recordId) {
-        //移动端考试开始
-        ConcurrentHashMap<String, WebSocketMobileServer> webSocketMap = WebSocketMobileServer.getWebSocketMap();
-        if (Objects.nonNull(webSocketMap.get(recordId + "-" + MonitorVideoSourceEnum.MOBILE_FIRST.name()))) {
-            WebSocketMobileServer webSocketMobileServer = webSocketMap.get(recordId + "-" + MonitorVideoSourceEnum.MOBILE_FIRST.name());
-            Map map = new HashMap<>();
-            map.put(SystemConstant.RECORD_ID, recordId);
-            WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_START.name(), map);
-            webSocketMobileServer.sendMessage(websocketDto);
-        }
-        if (Objects.nonNull(webSocketMap.get(recordId + "-" + MonitorVideoSourceEnum.MOBILE_SECOND.name()))) {
-            WebSocketMobileServer webSocketMobileServer = webSocketMap.get(recordId + "-" + MonitorVideoSourceEnum.MOBILE_SECOND.name());
-            Map map = new HashMap<>();
-            map.put(SystemConstant.RECORD_ID, recordId);
-            WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_START.name(), map);
-            webSocketMobileServer.sendMessage(websocketDto);
-        }
+        MqUtil mqUtil = SpringContextHolder.getBean(MqUtil.class);
+        MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
+        Map mapParameter = new HashMap<>();
+        mapParameter.put(SystemConstant.RECORD_ID, recordId);
+        MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.EXAM_START.name(), recordId, MqTagEnum.EXAM_START, String.valueOf(recordId), mapParameter, String.valueOf(recordId));
+        mqDtoService.assembleSendOneWayMsg(mqDto);
     }
 }

+ 73 - 7
themis-exam/src/main/java/com/qmth/themis/exam/listener/service/impl/MqOeLogicServiceImpl.java

@@ -10,18 +10,15 @@ import com.qmth.themis.business.dto.WebsocketDto;
 import com.qmth.themis.business.entity.TEExamStudentLog;
 import com.qmth.themis.business.entity.TIeExamInvigilateNotice;
 import com.qmth.themis.business.entity.TMRocketMessage;
-import com.qmth.themis.business.enums.FinishTypeEnum;
-import com.qmth.themis.business.enums.MessageTypeEnum;
-import com.qmth.themis.business.enums.MqTagEnum;
-import com.qmth.themis.business.enums.WebsocketTypeEnum;
+import com.qmth.themis.business.enums.*;
 import com.qmth.themis.business.service.TEExamStudentLogService;
 import com.qmth.themis.business.service.TEExamStudentService;
 import com.qmth.themis.business.service.TIeExamInvigilateNoticeService;
 import com.qmth.themis.business.service.TMRocketMessageService;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
-import com.qmth.themis.exam.config.ExamConstant;
 import com.qmth.themis.exam.listener.service.MqOeLogicService;
+import com.qmth.themis.exam.websocket.WebSocketMobileServer;
 import com.qmth.themis.exam.websocket.WebSocketOeServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -199,6 +196,39 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
                     teExamStudentLogService.save(teExamStudentLog);
                 }
             }
+        } else if (Objects.equals(MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_READY.name(), tag)//移动端拍照/录音扫描完成
+                || Objects.equals(MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_UPLOAD.name(), tag)//移动端拍照/录音上传成功
+                || Objects.equals(MqTagEnum.OE_WEBSOCKET_EXAM_STOP.name(), tag)//考试结束
+                || Objects.equals(MqTagEnum.MONITOR_START.name(), tag)//监控开始
+                || Objects.equals(MqTagEnum.MONITOR_STOP.name(), tag)) {//监控结束
+            Long recordId = Long.parseLong(String.valueOf(mqDto.getBody()));
+            String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
+            if (Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
+                WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
+                if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == recordId.longValue()) {
+                    WebsocketDto websocketDto = null;
+                    switch (tag.toUpperCase()) {
+                        case "OE_WEBSOCKET_MOBILE_ANSWER_READY":
+                            websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_ANSWER_READY.name(), mqDto.getProperties());
+                            break;
+                        case "OE_WEBSOCKET_MOBILE_ANSWER_UPLOAD":
+                            websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_ANSWER_UPLOAD.name(), mqDto.getProperties());
+                            break;
+                        case "OE_WEBSOCKET_EXAM_STOP":
+                            websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_STOP.name(), mqDto.getProperties());
+                            break;
+                        case "MONITOR_START":
+                            websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_MONITOR_START.name(), mqDto.getProperties());
+                            break;
+                        case "MONITOR_STOP":
+                            websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_MONITOR_STOP.name(), mqDto.getProperties());
+                            break;
+                        default:
+                            break;
+                    }
+                    webSocketOeServer.sendMessage(websocketDto);
+                }
+            }
         }
         mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
@@ -222,9 +252,45 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
     public void execMqOeMobileLogic(MqDto mqDto, String key) {
         Gson gson = new Gson();
         String tag = mqDto.getTag();
-        if (Objects.equals(MqTagEnum.EXAM_STOP.name(), tag)) {//考试退出
+        ConcurrentHashMap<String, WebSocketMobileServer> webSocketMap = WebSocketMobileServer.getWebSocketMap();
+        if (Objects.equals(MqTagEnum.EXAM_STOP.name(), tag)//考试退出
+                || Objects.equals(MqTagEnum.EXAM_START.name(), tag)) {//考试开始
             Long recordId = Long.parseLong(String.valueOf(mqDto.getBody()));
-            ExamConstant.sendExamStopMsg(recordId, true);
+            String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
+            if (Objects.nonNull(clientWebsocketId) && Objects.nonNull(webSocketMap.get(clientWebsocketId + "-" + MonitorVideoSourceEnum.MOBILE_FIRST.name()))) {
+                WebSocketMobileServer webSocketMobileServer = webSocketMap.get(clientWebsocketId + "-" + MonitorVideoSourceEnum.MOBILE_FIRST.name());
+                if (Objects.nonNull(webSocketMobileServer.getRecordId()) && webSocketMobileServer.getRecordId().longValue() == recordId.longValue()) {
+                    WebsocketDto websocketDto = null;
+                    switch (tag.toUpperCase()) {
+                        case "EXAM_STOP":
+                            websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_STOP.name(), mqDto.getProperties());
+                            break;
+                        case "EXAM_START":
+                            websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_START.name(), mqDto.getProperties());
+                            break;
+                        default:
+                            break;
+                    }
+                    webSocketMobileServer.sendMessage(websocketDto);
+                }
+            }
+            if (Objects.nonNull(clientWebsocketId) && Objects.nonNull(webSocketMap.get(clientWebsocketId + "-" + MonitorVideoSourceEnum.MOBILE_SECOND.name()))) {
+                WebSocketMobileServer webSocketMobileServer = webSocketMap.get(clientWebsocketId + "-" + MonitorVideoSourceEnum.MOBILE_SECOND.name());
+                if (Objects.nonNull(webSocketMobileServer.getRecordId()) && webSocketMobileServer.getRecordId().longValue() == recordId.longValue()) {
+                    WebsocketDto websocketDto = null;
+                    switch (tag.toUpperCase()) {
+                        case "EXAM_STOP":
+                            websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_STOP.name(), mqDto.getProperties());
+                            break;
+                        case "EXAM_START":
+                            websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_START.name(), mqDto.getProperties());
+                            break;
+                        default:
+                            break;
+                    }
+                    webSocketMobileServer.sendMessage(websocketDto);
+                }
+            }
         }
         mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);

+ 2 - 2
themis-exam/src/main/java/com/qmth/themis/exam/start/StartRunning.java

@@ -43,8 +43,8 @@ public class StartRunning implements CommandLineRunner {
         /**
          * websocket mq start
          */
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, dictionaryConfig.mqConfigDomain().getMap().get(MqGroupEnum.WEBSOCKET_OE_GROUP.name()), dictionaryConfig.mqConfigDomain().getTopic(), MqTagEnum.OE_HARD_FINISH.name() + "||" + MqTagEnum.OE_IM_BROADCASTING.name() + "||" + MqTagEnum.OE_IM_CLUSTERING.name() + "||" + MqTagEnum.OE_LIVENESS_VERIFY.name() + "||" + MqTagEnum.OE_MONITOR_FINISH.name() + "||" + MqTagEnum.OE_WARNING_FINISH.name(), MessageModel.BROADCASTING, SpringContextHolder.getBean(WebSocketOeServer.class));
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, dictionaryConfig.mqConfigDomain().getMap().get(MqGroupEnum.WEBSOCKET_OE_MOBILE_GROUP.name()), dictionaryConfig.mqConfigDomain().getTopic(), MqTagEnum.EXAM_STOP.name(), MessageModel.BROADCASTING, SpringContextHolder.getBean(WebSocketMobileServer.class));
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, dictionaryConfig.mqConfigDomain().getMap().get(MqGroupEnum.WEBSOCKET_OE_GROUP.name()), dictionaryConfig.mqConfigDomain().getTopic(), MqTagEnum.OE_HARD_FINISH.name() + "||" + MqTagEnum.OE_IM_BROADCASTING.name() + "||" + MqTagEnum.OE_IM_CLUSTERING.name() + "||" + MqTagEnum.OE_LIVENESS_VERIFY.name() + "||" + MqTagEnum.OE_MONITOR_FINISH.name() + "||" + MqTagEnum.OE_WARNING_FINISH.name() + "||" + MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_READY.name() + "||" + MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_UPLOAD.name() + "||" + MqTagEnum.OE_WEBSOCKET_EXAM_STOP.name() + "||" + MqTagEnum.MONITOR_START.name() + "||" + MqTagEnum.MONITOR_STOP.name(), MessageModel.BROADCASTING, SpringContextHolder.getBean(WebSocketOeServer.class));
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, dictionaryConfig.mqConfigDomain().getMap().get(MqGroupEnum.WEBSOCKET_OE_MOBILE_GROUP.name()), dictionaryConfig.mqConfigDomain().getTopic(), MqTagEnum.EXAM_STOP.name() + "||" + MqTagEnum.EXAM_START.name(), MessageModel.BROADCASTING, SpringContextHolder.getBean(WebSocketMobileServer.class));
         SystemConstant.initTempFiles();
         log.info("服务器启动时执行 end");
     }

+ 10 - 12
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketMobileServer.java

@@ -10,7 +10,9 @@ import com.qmth.themis.business.dto.WebsocketDto;
 import com.qmth.themis.business.entity.TBSession;
 import com.qmth.themis.business.enums.MonitorStatusSourceEnum;
 import com.qmth.themis.business.enums.MonitorVideoSourceEnum;
+import com.qmth.themis.business.enums.MqTagEnum;
 import com.qmth.themis.business.enums.WebsocketTypeEnum;
+import com.qmth.themis.business.service.MqDtoService;
 import com.qmth.themis.business.util.*;
 import com.qmth.themis.common.contanst.Constants;
 import com.qmth.themis.common.enums.ExceptionResultEnum;
@@ -118,18 +120,14 @@ public class WebSocketMobileServer implements Concurrently {
 //            subOnlineCount();
             //判断是否是正常退出
             ExamRecordCacheUtil.setMonitorStatus(recordId, this.source, MonitorStatusSourceEnum.STOP, true);
-            ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
-            String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
-            if (Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
-                WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
-                if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == this.recordId.longValue()) {
-                    Map map = new HashMap<>();
-                    map.put(SystemConstant.RECORD_ID, this.recordId);
-                    map.put("source", this.source.name());
-                    WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_MONITOR_STOP.name(), map);
-                    webSocketOeServer.sendMessage(websocketDto);
-                }
-            }
+            MqUtil mqUtil = SpringContextHolder.getBean(MqUtil.class);
+            MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
+            Map mqMap = new HashMap<>();
+            mqMap.put(SystemConstant.RECORD_ID, this.recordId);
+            mqMap.put("source", this.source.name());
+            //监控结束
+            MqDto mqDtoStop = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.MONITOR_STOP.name(), recordId, MqTagEnum.MONITOR_STOP, String.valueOf(recordId), mqMap, String.valueOf(recordId));
+            mqDtoService.assembleSendOneWayMsg(mqDtoStop);
         }
 //        log.info("用户退出:{},当前在线人数为:{},updateTime:{}", this.sessionId, getOnlineCount(), this.updateTime);
         log.info("用户退出:{},updateTime:{}", this.websocketSessionId, this.updateTime);