Преглед на файлове

webSocket record修改为sessionId

wangliang преди 4 години
родител
ревизия
b9b76314fa

+ 3 - 3
themis-admin/src/main/java/com/qmth/themis/admin/websocket/WebSocketAdminServer.java

@@ -49,7 +49,7 @@ public class WebSocketAdminServer
     private Session session = null;
     private String sessionId = null, ip = null;
     private Long userId = null;
-    private String platform = null, deviceId = null, Authorization = null;
+    private String platform = null, deviceId = null, authorization = null;
     private Long time = null;
     private RedisUtil redisUtil;
     private Long updateTime = null;
@@ -75,12 +75,12 @@ public class WebSocketAdminServer
             throw new BusinessException(ExceptionResultEnum.DEVICE_ID_INVALID);
         }
         this.deviceId = String.valueOf(mapParameter.get("deviceId").get(0));
-        this.Authorization = String.valueOf(mapParameter.get("Authorization").get(0));
+        this.authorization = String.valueOf(mapParameter.get("authorization").get(0));
         this.time = Long.parseLong(String.valueOf(mapParameter.get("time").get(0)));
         this.userId = Long.parseLong(String.valueOf(mapParameter.get("userId").get(0)));
 
         redisUtil = SpringContextHolder.getBean(RedisUtil.class);
-        TBSession tbSession = AuthUtil.websocketAuthInterceptor(Platform.valueOf(platform), deviceId, Authorization, String.valueOf(mapParameter.get("time").get(0)), SystemConstant.GET, url);
+        TBSession tbSession = AuthUtil.websocketAuthInterceptor(Platform.valueOf(platform), deviceId, authorization, String.valueOf(mapParameter.get("time").get(0)), SystemConstant.GET, url);
         this.session = session;
         session.setMaxIdleTimeout(SystemConstant.WEBSOCKET_MAX_TIME_OUT);
         this.sessionId = tbSession.getId();

+ 4 - 0
themis-business/src/main/java/com/qmth/themis/business/cache/ExamRecordCacheUtil.java

@@ -236,6 +236,10 @@ public class ExamRecordCacheUtil {
         return (WebsocketStatusEnum) redisUtil.get(RedisKeyHelper.examRecordCacheKey(recordId), ExamRecordFieldEnum.client_websocket_status.getCode());
     }
 
+    public static String getClientWebsocketId(Long recordId) {
+        return (String) redisUtil.get(RedisKeyHelper.examRecordCacheKey(recordId), ExamRecordFieldEnum.client_websocket_id.getCode());
+    }
+
     public static void setClientWebsocketStatus(Long recordId, WebsocketStatusEnum websocketStatusEnum, boolean update) {
         redisUtil.set(RedisKeyHelper.examRecordCacheKey(recordId), ExamRecordFieldEnum.client_websocket_status.getCode(), websocketStatusEnum);
         if (update) {

+ 3 - 2
themis-exam/src/main/java/com/qmth/themis/exam/api/TEExamController.java

@@ -129,8 +129,9 @@ public class TEExamController {
                 throw new BusinessException("考试状态出错");
             }
         }
-        ConcurrentHashMap<Long, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
-        WebSocketOeServer webSocketOeServer = webSocketMap.get(Long.parseLong(recordId));
+        ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
+        String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(Long.parseLong(recordId));
+        WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
         if (Objects.nonNull(webSocketOeServer)) {
             webSocketOeServer.onClose();
         }

+ 38 - 43
themis-exam/src/main/java/com/qmth/themis/exam/api/TEMobileController.java

@@ -1,21 +1,5 @@
 package com.qmth.themis.exam.api;
 
-import java.security.NoSuchAlgorithmException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.annotation.Resource;
-
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.RequestParam;
-import org.springframework.web.bind.annotation.RestController;
-import org.springframework.web.multipart.MultipartFile;
-
 import com.qmth.themis.business.bean.exam.AnswerReadyParamBean;
 import com.qmth.themis.business.bean.exam.AnswerReadyResponseBean;
 import com.qmth.themis.business.bean.exam.MobileAnswerSubmitParamBean;
@@ -37,12 +21,17 @@ import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
 import com.qmth.themis.exam.websocket.WebSocketOeServer;
+import io.swagger.annotations.*;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.multipart.MultipartFile;
 
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiParam;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
+import javax.annotation.Resource;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Api(tags = "移动端接口")
 @RestController
@@ -94,20 +83,23 @@ public class TEMobileController {
         if (ExamRecordStatusEnum.FINISHED.equals(sta) || ExamRecordStatusEnum.PERSISTED.equals(sta)) {
             throw new BusinessException(ExceptionResultEnum.NOT_FOUND_EXAM_STUDENT);
         }
-        ConcurrentHashMap<Long, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
-        WebSocketOeServer webSocketOeServer = webSocketMap.get(param.getRecordId());
+        ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
+        String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(param.getRecordId());
+        WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
         if (webSocketOeServer == null) {
             throw new BusinessException("消息连接不存在");
         }
-        Map<String, Object> map = new HashMap<String, Object>();
-        map.put("recordId", param.getRecordId());
-        map.put("mainNumber", param.getMainNumber());
-        map.put("subNumber", param.getSubNumber());
-        if (param.getSubIndex() != null) {
-            map.put("subIndex", param.getSubIndex());
+        if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == param.getRecordId().longValue()) {
+            Map<String, Object> map = new HashMap<String, Object>();
+            map.put("recordId", param.getRecordId());
+            map.put("mainNumber", param.getMainNumber());
+            map.put("subNumber", param.getSubNumber());
+            if (param.getSubIndex() != null) {
+                map.put("subIndex", param.getSubIndex());
+            }
+            WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_ANSWER_READY.name(), map);
+            webSocketOeServer.sendMessage(websocketDto);
         }
-        WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_ANSWER_READY.name(), map);
-        webSocketOeServer.sendMessage(websocketDto);
         ExamStudentCacheBean es = examStudentService.getExamStudentCacheBean(esId);
         AnswerReadyResponseBean ret = new AnswerReadyResponseBean();
         ret.setCourseName(es.getCourseName());
@@ -145,19 +137,22 @@ public class TEMobileController {
         }
         MobileAnswerSubmitReponseBean ret = mobileService.answerSubmit(teStudent.getId(), param.getRecordId(), param.getMainNumber(),
                 param.getSubNumber(), param.getSubIndex(), param.getUrls());
-        ConcurrentHashMap<Long, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
-        if (Objects.nonNull(webSocketMap.get(param.getRecordId()))) {
-            WebSocketOeServer webSocketOeServer = webSocketMap.get(param.getRecordId());
-            Map<String, Object> map = new HashMap<String, Object>();
-            map.put(SystemConstant.RECORD_ID, param.getRecordId());
-            map.put("mainNumber", param.getMainNumber());
-            map.put("subNumber", param.getSubNumber());
-            if (param.getSubIndex() != null) {
-                map.put("subIndex", param.getSubIndex());
+        String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(param.getRecordId());
+        ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
+        if (Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
+            WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
+            if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == param.getRecordId().longValue()) {
+                Map<String, Object> map = new HashMap<String, Object>();
+                map.put(SystemConstant.RECORD_ID, param.getRecordId());
+                map.put("mainNumber", param.getMainNumber());
+                map.put("subNumber", param.getSubNumber());
+                if (param.getSubIndex() != null) {
+                    map.put("subIndex", param.getSubIndex());
+                }
+                map.put("urls", param.getUrls());
+                WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_ANSWER_UPLOAD.name(), map);
+                webSocketOeServer.sendMessage(websocketDto);
             }
-            map.put("urls", param.getUrls());
-            WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_ANSWER_UPLOAD.name(), map);
-            webSocketOeServer.sendMessage(websocketDto);
         }
         return ResultUtil.ok(ret);
     }

+ 23 - 17
themis-exam/src/main/java/com/qmth/themis/exam/api/TIeInvigilateCallMobileController.java

@@ -75,7 +75,8 @@ public class TIeInvigilateCallMobileController {
                 throw new BusinessException("观看地址不能为空");
             }
             ConcurrentHashMap<String, WebSocketMobileServer> webSocketMap = WebSocketMobileServer.getWebSocketMap();
-            if (Objects.isNull(webSocketMap.get(recordId + "-" + source.name()))) {
+            String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
+            if (Objects.isNull(webSocketMap.get(clientWebsocketId + "-" + source.name()))) {
                 throw new BusinessException("网络连接失败");
             }
             String liveUrl = String.valueOf(mapParameter.get("liveUrl"));
@@ -165,7 +166,8 @@ public class TIeInvigilateCallMobileController {
             throw new BusinessException("异常类型不能为空");
         }
         ConcurrentHashMap<String, WebSocketMobileServer> mobileWebSocketMap = WebSocketMobileServer.getWebSocketMap();
-        if (Objects.isNull(mobileWebSocketMap.get(recordId + "-" + source.name()))) {
+        String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
+        if (Objects.isNull(mobileWebSocketMap.get(clientWebsocketId + "-" + source.name()))) {
             throw new BusinessException("网络连接失败");
         }
         //获取考试记录缓存
@@ -187,21 +189,25 @@ public class TIeInvigilateCallMobileController {
         mqDtoService.assembleSendOneWayMsg(mqDto);
         //监考监控通话信息 发送mq end
 
-        ConcurrentHashMap<Long, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
-        if (Objects.nonNull(webSocketMap.get(recordId)) && Objects.equals(status.name(), MonitorStatusSourceEnum.START.name())) {
-            WebSocketOeServer webSocketOeServer = webSocketMap.get(recordId);
-            Map map = new HashMap<>();
-            map.put(SystemConstant.RECORD_ID, recordId);
-            map.put("source", source.name());
-            WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_MONITOR_START.name(), map);
-            webSocketOeServer.sendMessage(websocketDto);
-        } else if (Objects.nonNull(webSocketMap.get(recordId)) && Objects.equals(status.name(), MonitorStatusSourceEnum.STOP.name())) {
-            WebSocketOeServer webSocketOeServer = webSocketMap.get(recordId);
-            Map map = new HashMap<>();
-            map.put(SystemConstant.RECORD_ID, recordId);
-            map.put("monitorVideoSource", source.name());
-            WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_MONITOR_STOP.name(), map);
-            webSocketOeServer.sendMessage(websocketDto);
+        ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
+        if (Objects.nonNull(webSocketMap.get(clientWebsocketId)) && Objects.equals(status.name(), MonitorStatusSourceEnum.START.name())) {
+            WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
+            if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == recordId.longValue()) {
+                Map map = new HashMap<>();
+                map.put(SystemConstant.RECORD_ID, recordId);
+                map.put("source", source.name());
+                WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_MONITOR_START.name(), map);
+                webSocketOeServer.sendMessage(websocketDto);
+            }
+        } else if (Objects.nonNull(webSocketMap.get(clientWebsocketId)) && Objects.equals(status.name(), MonitorStatusSourceEnum.STOP.name())) {
+            WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
+            if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == recordId.longValue()) {
+                Map map = new HashMap<>();
+                map.put(SystemConstant.RECORD_ID, recordId);
+                map.put("monitorVideoSource", source.name());
+                WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_MONITOR_STOP.name(), map);
+                webSocketOeServer.sendMessage(websocketDto);
+            }
         }
         Map map = new HashMap();
         map.put("status", status.name());

+ 27 - 19
themis-exam/src/main/java/com/qmth/themis/exam/config/ExamConstant.java

@@ -1,5 +1,6 @@
 package com.qmth.themis.exam.config;
 
+import com.qmth.themis.business.cache.ExamRecordCacheUtil;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.dto.WebsocketDto;
 import com.qmth.themis.business.enums.MonitorVideoSourceEnum;
@@ -28,33 +29,40 @@ public class ExamConstant {
      * @param clientStop
      */
     public static void sendExamStopMsg(Long recordId, boolean clientStop) {
+        String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
         //客户端考试结束
         if (clientStop) {
-            ConcurrentHashMap<Long, WebSocketOeServer> webSocketOeMap = WebSocketOeServer.getWebSocketMap();
-            if (Objects.nonNull(webSocketOeMap.get(recordId))) {
-                WebSocketOeServer webSocketOeServer = webSocketOeMap.get(recordId);
-                Map map = new HashMap<>();
-                map.put(SystemConstant.RECORD_ID, recordId);
-                WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_STOP.name(), map);
-                webSocketOeServer.sendMessage(websocketDto);
+            ConcurrentHashMap<String, WebSocketOeServer> webSocketOeMap = WebSocketOeServer.getWebSocketMap();
+            if (Objects.nonNull(webSocketOeMap.get(clientWebsocketId))) {
+                WebSocketOeServer webSocketOeServer = webSocketOeMap.get(clientWebsocketId);
+                if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == recordId.longValue()) {
+                    Map map = new HashMap<>();
+                    map.put(SystemConstant.RECORD_ID, recordId);
+                    WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_STOP.name(), map);
+                    webSocketOeServer.sendMessage(websocketDto);
+                }
             }
         }
 
         //移动端考试结束
         ConcurrentHashMap<String, WebSocketMobileServer> webSocketMap = WebSocketMobileServer.getWebSocketMap();
-        if (Objects.nonNull(webSocketMap.get(recordId + "-" + MonitorVideoSourceEnum.MOBILE_FIRST.name()))) {
-            WebSocketMobileServer webSocketMobileServer = webSocketMap.get(recordId + "-" + MonitorVideoSourceEnum.MOBILE_FIRST.name());
-            Map map = new HashMap<>();
-            map.put(SystemConstant.RECORD_ID, recordId);
-            WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_STOP.name(), map);
-            webSocketMobileServer.sendMessage(websocketDto);
+        if (Objects.nonNull(webSocketMap.get(clientWebsocketId + "-" + MonitorVideoSourceEnum.MOBILE_FIRST.name()))) {
+            WebSocketMobileServer webSocketMobileServer = webSocketMap.get(clientWebsocketId + "-" + MonitorVideoSourceEnum.MOBILE_FIRST.name());
+            if (Objects.nonNull(webSocketMobileServer.getRecordId()) && webSocketMobileServer.getRecordId().longValue() == recordId.longValue()) {
+                Map map = new HashMap<>();
+                map.put(SystemConstant.RECORD_ID, recordId);
+                WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_STOP.name(), map);
+                webSocketMobileServer.sendMessage(websocketDto);
+            }
         }
-        if (Objects.nonNull(webSocketMap.get(recordId + "-" + MonitorVideoSourceEnum.MOBILE_SECOND.name()))) {
-            WebSocketMobileServer webSocketMobileServer = webSocketMap.get(recordId + "-" + MonitorVideoSourceEnum.MOBILE_SECOND.name());
-            Map map = new HashMap<>();
-            map.put(SystemConstant.RECORD_ID, recordId);
-            WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_STOP.name(), map);
-            webSocketMobileServer.sendMessage(websocketDto);
+        if (Objects.nonNull(webSocketMap.get(clientWebsocketId + "-" + MonitorVideoSourceEnum.MOBILE_SECOND.name()))) {
+            WebSocketMobileServer webSocketMobileServer = webSocketMap.get(clientWebsocketId + "-" + MonitorVideoSourceEnum.MOBILE_SECOND.name());
+            if (Objects.nonNull(webSocketMobileServer.getRecordId()) && webSocketMobileServer.getRecordId().longValue() == recordId.longValue()) {
+                Map map = new HashMap<>();
+                map.put(SystemConstant.RECORD_ID, recordId);
+                WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.EXAM_STOP.name(), map);
+                webSocketMobileServer.sendMessage(websocketDto);
+            }
         }
     }
 

+ 53 - 41
themis-exam/src/main/java/com/qmth/themis/exam/listener/service/impl/MqOeLogicServiceImpl.java

@@ -96,24 +96,28 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
     @Transactional
     public void execMqOeLogic(MqDto mqDto, String key) throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
         Gson gson = new Gson();
-        ConcurrentHashMap<Long, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
+        ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
         String tag = mqDto.getTag();
         if (Objects.equals(MqTagEnum.OE_MONITOR_FINISH.name(), tag)) {//强制离线交卷
             Set examRecordId = JacksonUtil.readJson(String.valueOf(mqDto.getBody()), Set.class);
             examRecordId.forEach(s -> {
                 Long recordId = Long.parseLong(String.valueOf(s));
-                if (Objects.nonNull(webSocketMap.get(recordId))) {
+                //获取考试记录的客户端websocketId
+                String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
+                if (Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
                     Long examStudentId = ExamRecordCacheUtil.getExamStudentId(recordId);
                     ExamStudentCacheBean examStudentCacheBean = teExamStudentService.getExamStudentCacheBean(examStudentId);
-                    WebSocketOeServer webSocketOeServer = webSocketMap.get(recordId);
-                    Map map = new HashMap<>();
-                    map.put("form", mqDto.getObjName());
-                    map.put(SystemConstant.RECORD_ID, recordId);
-                    map.put(SystemConstant.MESSAGE, FinishTypeEnum.valueOf(String.valueOf(mqDto.getProperties().get("type"))).getCode());
-                    WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.INVIGILATE_STOP_EXAM.name(), map);
-                    webSocketOeServer.sendMessage(websocketDto);
-                    TEExamStudentLog teExamStudentLog = new TEExamStudentLog(mqDto.getType().name(), mqDto.getType().getCode(), mqDto.getType().getCode(), examStudentCacheBean.getStudentId(), examStudentId, recordId, mqDto.getType().getCode());
-                    teExamStudentLogService.save(teExamStudentLog);
+                    WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
+                    if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == recordId.longValue()) {
+                        Map map = new HashMap<>();
+                        map.put("form", mqDto.getObjName());
+                        map.put(SystemConstant.RECORD_ID, recordId);
+                        map.put(SystemConstant.MESSAGE, FinishTypeEnum.valueOf(String.valueOf(mqDto.getProperties().get("type"))).getCode());
+                        WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.INVIGILATE_STOP_EXAM.name(), map);
+                        webSocketOeServer.sendMessage(websocketDto);
+                        TEExamStudentLog teExamStudentLog = new TEExamStudentLog(mqDto.getType().name(), mqDto.getType().getCode(), mqDto.getType().getCode(), examStudentCacheBean.getStudentId(), examStudentId, recordId, mqDto.getType().getCode());
+                        teExamStudentLogService.save(teExamStudentLog);
+                    }
                 }
             });
         }
@@ -139,36 +143,42 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
             Set examRecordId = JacksonUtil.readJson(String.valueOf(mqDto.getBody()), Set.class);
             examRecordId.forEach(s -> {
                 Long recordId = Long.parseLong(String.valueOf(s));
-                if (Objects.nonNull(webSocketMap.get(recordId))) {
+                String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
+                if (Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
                     Long examStudentId = ExamRecordCacheUtil.getExamStudentId(recordId);
                     ExamStudentCacheBean examStudentCacheBean = teExamStudentService.getExamStudentCacheBean(examStudentId);
-                    WebSocketOeServer webSocketOeServer = webSocketMap.get(recordId);
-                    Map map = new HashMap<>();
-                    map.put(SystemConstant.RECORD_ID, recordId);
-                    map.put(SystemConstant.BREACH_STATUS, FinishTypeEnum.valueOf(String.valueOf(mqDto.getProperties().get("type"))).getCode());
-                    WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.BREACH_STOP_EXAM.name(), map);
-                    webSocketOeServer.sendMessage(websocketDto);
-                    TEExamStudentLog teExamStudentLog = new TEExamStudentLog(mqDto.getType().name(), mqDto.getType().getCode(), mqDto.getType().getCode(), examStudentCacheBean.getStudentId(), examStudentId, recordId, mqDto.getType().getCode());
-                    teExamStudentLogService.save(teExamStudentLog);
+                    WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
+                    if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == recordId.longValue()) {
+                        Map map = new HashMap<>();
+                        map.put(SystemConstant.RECORD_ID, recordId);
+                        map.put(SystemConstant.BREACH_STATUS, FinishTypeEnum.valueOf(String.valueOf(mqDto.getProperties().get("type"))).getCode());
+                        WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.BREACH_STOP_EXAM.name(), map);
+                        webSocketOeServer.sendMessage(websocketDto);
+                        TEExamStudentLog teExamStudentLog = new TEExamStudentLog(mqDto.getType().name(), mqDto.getType().getCode(), mqDto.getType().getCode(), examStudentCacheBean.getStudentId(), examStudentId, recordId, mqDto.getType().getCode());
+                        teExamStudentLogService.save(teExamStudentLog);
+                    }
                 }
             });
         } else if (Objects.equals(MqTagEnum.OE_IM_CLUSTERING.name(), tag)) {//点对点消息
             Long recordId = Long.parseLong(String.valueOf(mqDto.getBody()));
-            if (Objects.nonNull(webSocketMap.get(recordId))) {
+            String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
+            if (Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
                 Long examId = ExamRecordCacheUtil.getExamId(recordId);
                 Long examStudentId = ExamRecordCacheUtil.getExamStudentId(recordId);
                 Long examActivityId = ExamRecordCacheUtil.getExamActivityId(recordId);
-                WebSocketOeServer webSocketOeServer = webSocketMap.get(recordId);
-                Map<String, Object> prop = mqDto.getProperties();
-                Map map = new HashMap<>();
-                map.put(SystemConstant.RECORD_ID, recordId);
-                map.put("from", mqDto.getObjName());
-                map.put("type", prop.get("type"));
-                map.put("content", prop.get("content"));
-                WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.INVIGILATE_NOTICE.name(), map);
-                webSocketOeServer.sendMessage(websocketDto);
-                TIeExamInvigilateNotice tIeExamInvigilateNotice = new TIeExamInvigilateNotice(examId, examActivityId, recordId, Long.parseLong(mqDto.getObjId()), examStudentId, MessageTypeEnum.valueOf(String.valueOf(prop.get("type")).toUpperCase()), String.valueOf(prop.get("content")));
-                tIeExamInvigilateNoticeService.saveOrUpdate(tIeExamInvigilateNotice);
+                WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
+                if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == recordId.longValue()) {
+                    Map<String, Object> prop = mqDto.getProperties();
+                    Map map = new HashMap<>();
+                    map.put(SystemConstant.RECORD_ID, recordId);
+                    map.put("from", mqDto.getObjName());
+                    map.put("type", prop.get("type"));
+                    map.put("content", prop.get("content"));
+                    WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.INVIGILATE_NOTICE.name(), map);
+                    webSocketOeServer.sendMessage(websocketDto);
+                    TIeExamInvigilateNotice tIeExamInvigilateNotice = new TIeExamInvigilateNotice(examId, examActivityId, recordId, Long.parseLong(mqDto.getObjId()), examStudentId, MessageTypeEnum.valueOf(String.valueOf(prop.get("type")).toUpperCase()), String.valueOf(prop.get("content")));
+                    tIeExamInvigilateNoticeService.saveOrUpdate(tIeExamInvigilateNotice);
+                }
             }
         } else if (Objects.equals(MqTagEnum.OE_IM_BROADCASTING.name(), tag)) {//广播消息
             JSONArray jsonArray = JSONArray.parseArray(String.valueOf(mqDto.getBody()));
@@ -176,16 +186,19 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
             log.info("examStudentIdentitySet:{}", JacksonUtil.parseJson(examStudentIdentitySet));
         } else if (Objects.equals(MqTagEnum.OE_LIVENESS_VERIFY.name(), tag)) {//监考强制活体验证
             Long recordId = Long.parseLong(String.valueOf(mqDto.getBody()));
-            if (Objects.nonNull(webSocketMap.get(recordId))) {
+            String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
+            if (Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
                 Long examStudentId = ExamRecordCacheUtil.getExamStudentId(recordId);
                 ExamStudentCacheBean examStudentCacheBean = teExamStudentService.getExamStudentCacheBean(examStudentId);
-                WebSocketOeServer webSocketOeServer = webSocketMap.get(recordId);
-                Map map = new HashMap<>();
-                map.put(SystemConstant.RECORD_ID, recordId);
-                WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.INVIGILATE_LIVENESS_VERIFY.name(), map);
-                webSocketOeServer.sendMessage(websocketDto);
-                TEExamStudentLog teExamStudentLog = new TEExamStudentLog(mqDto.getType().name(), mqDto.getType().getCode(), mqDto.getType().getCode(), examStudentCacheBean.getStudentId(), examStudentId, recordId, mqDto.getType().getCode());
-                teExamStudentLogService.save(teExamStudentLog);
+                WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
+                if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == recordId.longValue()) {
+                    Map map = new HashMap<>();
+                    map.put(SystemConstant.RECORD_ID, recordId);
+                    WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.INVIGILATE_LIVENESS_VERIFY.name(), map);
+                    webSocketOeServer.sendMessage(websocketDto);
+                    TEExamStudentLog teExamStudentLog = new TEExamStudentLog(mqDto.getType().name(), mqDto.getType().getCode(), mqDto.getType().getCode(), examStudentCacheBean.getStudentId(), examStudentId, recordId, mqDto.getType().getCode());
+                    teExamStudentLogService.save(teExamStudentLog);
+                }
             }
         }
         mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
@@ -209,7 +222,6 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
     @Override
     public void execMqOeMobileLogic(MqDto mqDto, String key) {
         Gson gson = new Gson();
-        ConcurrentHashMap<String, WebSocketMobileServer> webSocketMap = WebSocketMobileServer.getWebSocketMap();
         String tag = mqDto.getTag();
         if (Objects.equals(MqTagEnum.EXAM_STOP.name(), tag)) {//考试退出
             Long recordId = Long.parseLong(String.valueOf(mqDto.getBody()));

+ 20 - 17
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketMobileServer.java

@@ -58,7 +58,7 @@ public class WebSocketMobileServer implements Concurrently {
     private Session session = null;
     private String sessionId = null, ip = null;
     private Long recordId = null;
-    private String platform = null, deviceId = null, Authorization = null;
+    private String platform = null, deviceId = null, authorization = null;
     private Long time = null;
     private RedisUtil redisUtil;
     private Long updateTime = null;
@@ -85,21 +85,21 @@ public class WebSocketMobileServer implements Concurrently {
             throw new BusinessException(ExceptionResultEnum.DEVICE_ID_INVALID);
         }
         this.deviceId = String.valueOf(mapParameter.get("deviceId").get(0));
-        this.Authorization = String.valueOf(mapParameter.get("Authorization").get(0));
+        this.authorization = String.valueOf(mapParameter.get("authorization").get(0));
         this.time = Long.parseLong(String.valueOf(mapParameter.get("time").get(0)));
         this.recordId = Long.parseLong(String.valueOf(mapParameter.get("recordId").get(0)));
         this.source = MonitorVideoSourceEnum.valueOf(mapParameter.get("source").get(0));
 
         redisUtil = SpringContextHolder.getBean(RedisUtil.class);
-        TBSession tbSession = AuthUtil.websocketAuthInterceptor(Platform.valueOf(platform), deviceId, Authorization, String.valueOf(mapParameter.get("time").get(0)), SystemConstant.GET, url);
+        TBSession tbSession = AuthUtil.websocketAuthInterceptor(Platform.valueOf(platform), deviceId, authorization, String.valueOf(mapParameter.get("time").get(0)), SystemConstant.GET, url);
         this.session = session;
         session.setMaxIdleTimeout(SystemConstant.WEBSOCKET_MAX_TIME_OUT);
         this.sessionId = tbSession.getId();
-        if (webSocketMap.containsKey(this.recordId + "-" + this.source.name())) {
-            webSocketMap.remove(this.recordId + "-" + this.source.name());
-            webSocketMap.put(this.recordId + "-" + this.source.name(), this);
+        if (webSocketMap.containsKey(this.session.getId() + "-" + this.source.name())) {
+            webSocketMap.remove(this.session.getId() + "-" + this.source.name());
+            webSocketMap.put(this.session.getId() + "-" + this.source.name(), this);
         } else {
-            webSocketMap.put(this.recordId + "-" + this.source.name(), this);
+            webSocketMap.put(this.session.getId() + "-" + this.source.name(), this);
 //            addOnlineCount();
         }
 //        log.info("用户连接:{},当前在线人数为:{}", this.sessionId, getOnlineCount());
@@ -117,20 +117,23 @@ public class WebSocketMobileServer implements Concurrently {
     @OnClose
     public void onClose() {
         log.info("onClose is come in");
-        if (webSocketMap.containsKey(this.recordId + "-" + this.source.name())) {
-            webSocketMap.remove(this.recordId + "-" + this.source.name());
+        if (webSocketMap.containsKey(this.session.getId() + "-" + this.source.name())) {
+            webSocketMap.remove(this.session.getId() + "-" + this.source.name());
             //从set中删除
 //            subOnlineCount();
             //判断是否是正常退出
             ExamRecordCacheUtil.setMonitorStatus(recordId, this.source, MonitorStatusSourceEnum.STOP, true);
-            ConcurrentHashMap<Long, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
-            if (Objects.nonNull(webSocketMap.get(recordId))) {
-                WebSocketOeServer webSocketOeServer = webSocketMap.get(recordId);
-                Map map = new HashMap<>();
-                map.put(SystemConstant.RECORD_ID, recordId);
-                map.put("monitorVideoSource", this.source.name());
-                WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_MONITOR_STOP.name(), map);
-                webSocketOeServer.sendMessage(websocketDto);
+            ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = WebSocketOeServer.getWebSocketMap();
+            String clientWebsocketId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
+            if (Objects.nonNull(webSocketMap.get(clientWebsocketId))) {
+                WebSocketOeServer webSocketOeServer = webSocketMap.get(clientWebsocketId);
+                if (Objects.nonNull(webSocketOeServer.getRecordId()) && webSocketOeServer.getRecordId().longValue() == recordId.longValue()) {
+                    Map map = new HashMap<>();
+                    map.put(SystemConstant.RECORD_ID, recordId);
+                    map.put("monitorVideoSource", this.source.name());
+                    WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.MOBILE_MONITOR_STOP.name(), map);
+                    webSocketOeServer.sendMessage(websocketDto);
+                }
             }
         }
 //        log.info("用户退出:{},当前在线人数为:{},updateTime:{}", this.sessionId, getOnlineCount(), this.updateTime);

+ 11 - 11
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketOeServer.java

@@ -54,14 +54,14 @@ import java.util.concurrent.ConcurrentHashMap;
 @Component
 public class WebSocketOeServer implements Concurrently {
     private final static Logger log = LoggerFactory.getLogger(WebSocketOeServer.class);
-    private volatile static ConcurrentHashMap<Long, WebSocketOeServer> webSocketMap = new ConcurrentHashMap<>();
+    private volatile static ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = new ConcurrentHashMap<>();
     /**
      * 与某个客户端的连接会话,需要通过它来给客户端发送数据
      */
     private Session session = null;
     private String sessionId = null, ip = null;
     private Long recordId = null;
-    private String platform = null, deviceId = null, Authorization = null;
+    private String platform = null, deviceId = null, authorization = null;
     private Long time = null;
     private RedisUtil redisUtil;
     private Long updateTime = null;
@@ -87,20 +87,20 @@ public class WebSocketOeServer implements Concurrently {
             throw new BusinessException(ExceptionResultEnum.DEVICE_ID_INVALID);
         }
         this.deviceId = String.valueOf(mapParameter.get("deviceId").get(0));
-        this.Authorization = String.valueOf(mapParameter.get("Authorization").get(0));
+        this.authorization = String.valueOf(mapParameter.get("authorization").get(0));
         this.time = Long.parseLong(String.valueOf(mapParameter.get("time").get(0)));
         this.recordId = Long.parseLong(String.valueOf(mapParameter.get("recordId").get(0)));
         redisUtil = SpringContextHolder.getBean(RedisUtil.class);
 
-        TBSession tbSession = AuthUtil.websocketAuthInterceptor(Platform.valueOf(platform), deviceId, Authorization, String.valueOf(mapParameter.get("time").get(0)), SystemConstant.GET, url);
+        TBSession tbSession = AuthUtil.websocketAuthInterceptor(Platform.valueOf(platform), deviceId, authorization, String.valueOf(mapParameter.get("time").get(0)), SystemConstant.GET, url);
         this.session = session;
         session.setMaxIdleTimeout(SystemConstant.WEBSOCKET_MAX_TIME_OUT);
         this.sessionId = tbSession.getId();
-        if (webSocketMap.containsKey(this.recordId)) {
-            webSocketMap.remove(this.recordId);
-            webSocketMap.put(this.recordId, this);
+        if (webSocketMap.containsKey(this.session.getId())) {
+            webSocketMap.remove(this.session.getId());
+            webSocketMap.put(this.session.getId(), this);
         } else {
-            webSocketMap.put(this.recordId, this);
+            webSocketMap.put(this.session.getId(), this);
             addOnlineCount();
         }
         log.info("用户连接:{},当前在线人数为:{}", this.sessionId, getOnlineCount());
@@ -118,8 +118,8 @@ public class WebSocketOeServer implements Concurrently {
     @OnClose
     public void onClose() {
         log.info("onClose is come in");
-        if (webSocketMap.containsKey(this.recordId)) {
-            webSocketMap.remove(this.recordId);
+        if (webSocketMap.containsKey(this.session.getId())) {
+            webSocketMap.remove(this.session.getId());
             //从set中删除
             subOnlineCount();
             //判断是否是正常退出
@@ -301,7 +301,7 @@ public class WebSocketOeServer implements Concurrently {
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
     }
 
-    public static ConcurrentHashMap<Long, WebSocketOeServer> getWebSocketMap() {
+    public static ConcurrentHashMap<String, WebSocketOeServer> getWebSocketMap() {
         return webSocketMap;
     }