浏览代码

修复exam模块websocket在客户端重新创建连接后,上一个未关闭连接断开后错误触发了断点判断的bug

luoshi 3 年之前
父节点
当前提交
fe5a6db1c1

文件差异内容过多而无法显示
+ 325 - 132
themis-business/src/main/java/com/qmth/themis/business/cache/ExamRecordCacheUtil.java


+ 50 - 11
themis-business/src/main/java/com/qmth/themis/business/util/RedisUtil.java

@@ -1,6 +1,7 @@
 package com.qmth.themis.business.util;
 
 import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.common.exception.BusinessException;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.support.atomic.RedisAtomicInteger;
 import org.springframework.stereotype.Component;
@@ -130,6 +131,22 @@ public class RedisUtil {
         return redisTemplate.opsForHash().entries(key);
     }
 
+    /**
+     * 分布式锁,尝试上锁一定次数,失败后抛出异常
+     *
+     * @param key
+     * @param timeout
+     * @return
+     */
+    public void waitLock(String key, long timeout) {
+        for (int i = 0; i < SystemConstant.MAX_EXAM_STATUS_COUNT; i++) {
+            if (lock(key, timeout)) {
+                return;
+            }
+        }
+        throw new BusinessException("重复尝试上锁失败:" + key);
+    }
+
     /**
      * 分布式锁
      *
@@ -152,6 +169,26 @@ public class RedisUtil {
         redisTemplate.expire(key, 100, TimeUnit.MILLISECONDS);
     }
 
+    /**
+     * 累加操作
+     *
+     * @param key
+     * @return
+     */
+    public Long increment(String key) {
+        return redisTemplate.opsForValue().increment(key);
+    }
+
+    /**
+     * 累减操作
+     *
+     * @param key
+     * @return
+     */
+    public Long decrement(String key) {
+        return redisTemplate.opsForValue().decrement(key);
+    }
+
     /**
      * 设置缓存
      *
@@ -235,15 +272,15 @@ public class RedisUtil {
         redisTemplate.expire(key, timeOut, timeUnit);
     }
 
-//    /**
-//     * 获取redis序列
-//     *
-//     * @return
-//     */
-//    public Integer getRedisSequence() {
-//        RedisAtomicInteger entityIdCounter = new RedisAtomicInteger(SystemConstant.REDIS_MONITOR_SEQUENCE, redisTemplate.getConnectionFactory());
-//        return entityIdCounter.incrementAndGet();
-//    }
+    //    /**
+    //     * 获取redis序列
+    //     *
+    //     * @return
+    //     */
+    //    public Integer getRedisSequence() {
+    //        RedisAtomicInteger entityIdCounter = new RedisAtomicInteger(SystemConstant.REDIS_MONITOR_SEQUENCE, redisTemplate.getConnectionFactory());
+    //        return entityIdCounter.incrementAndGet();
+    //    }
 
     /**
      * 获取redis activity code序列
@@ -252,7 +289,8 @@ public class RedisUtil {
      * @return
      */
     public Integer getRedisActivityCodeSequence(Long key) {
-        RedisAtomicInteger entityIdCounter = new RedisAtomicInteger(SystemConstant.REDIS_ACTIVITY_CODE_SEQUENCE + key, redisTemplate.getConnectionFactory());
+        RedisAtomicInteger entityIdCounter = new RedisAtomicInteger(SystemConstant.REDIS_ACTIVITY_CODE_SEQUENCE + key,
+                redisTemplate.getConnectionFactory());
         return entityIdCounter.incrementAndGet();
     }
 
@@ -264,7 +302,8 @@ public class RedisUtil {
      * @return
      */
     public Integer setRedisActivityCodeSequence(Long key, int initialValue) {
-        RedisAtomicInteger entityIdCounter = new RedisAtomicInteger(SystemConstant.REDIS_ACTIVITY_CODE_SEQUENCE + key, redisTemplate.getConnectionFactory(), initialValue);
+        RedisAtomicInteger entityIdCounter = new RedisAtomicInteger(SystemConstant.REDIS_ACTIVITY_CODE_SEQUENCE + key,
+                redisTemplate.getConnectionFactory(), initialValue);
         return entityIdCounter.get();
     }
 

+ 9 - 4
themis-business/src/main/java/com/qmth/themis/business/util/WebsocketUtil.java

@@ -21,6 +21,7 @@ import java.util.Objects;
  * @Date: 2020/7/27
  */
 public class WebsocketUtil {
+
     private final static Logger log = LoggerFactory.getLogger(WebsocketUtil.class);
 
     public static Object getFieldInstance(Object obj, String fieldPath) {
@@ -56,7 +57,8 @@ public class WebsocketUtil {
      * @param websocketSessionId
      * @param websocketStatusEnum
      */
-    public static void updateExamRecordWebsocketStatus(Long recordId, String ip, String websocketSessionId, WebsocketStatusEnum websocketStatusEnum) {
+    public static void updateExamRecordWebsocketStatus(Long recordId, String ip, String websocketSessionId,
+            WebsocketStatusEnum websocketStatusEnum) {
         Long timestamp = System.currentTimeMillis();
         ExamRecordCacheUtil.setClientWebsocketStatus(recordId, websocketStatusEnum, timestamp);
         ExamRecordCacheUtil.setClientCurrentIp(recordId, ip);
@@ -74,7 +76,8 @@ public class WebsocketUtil {
      * @param websocketSessionId
      * @param websocketStatusEnum
      */
-    public static void updateExamRecordMobileFirstWebsocketStatus(Long recordId, String websocketSessionId, WebsocketStatusEnum websocketStatusEnum) {
+    public static void updateExamRecordMobileFirstWebsocketStatus(Long recordId, String websocketSessionId,
+            WebsocketStatusEnum websocketStatusEnum) {
         Long timestamp = System.currentTimeMillis();
         ExamRecordCacheUtil.setMobileFirstWebsocketStatus(recordId, websocketStatusEnum, timestamp);
         ExamRecordCacheUtil.setMobileFirstWebsocketId(recordId, websocketSessionId);
@@ -89,7 +92,8 @@ public class WebsocketUtil {
      * @param websocketSessionId
      * @param websocketStatusEnum
      */
-    public static void updateExamRecordMobileSecondWebsocketStatus(Long recordId, String websocketSessionId, WebsocketStatusEnum websocketStatusEnum) {
+    public static void updateExamRecordMobileSecondWebsocketStatus(Long recordId, String websocketSessionId,
+            WebsocketStatusEnum websocketStatusEnum) {
         Long timestamp = System.currentTimeMillis();
         ExamRecordCacheUtil.setMobileSecondWebsocketStatus(recordId, websocketStatusEnum, timestamp);
         ExamRecordCacheUtil.setMobileSecondWebsocketId(recordId, websocketSessionId);
@@ -107,7 +111,8 @@ public class WebsocketUtil {
      * @param updateTime
      * @return
      */
-    public static Map<String, Object> initWebsocket(Long recordId, Long userId, String deviceId, String ip, Long updateTime) {
+    public static Map<String, Object> initWebsocket(Long recordId, Long userId, String deviceId, String ip,
+            Long updateTime) {
         Map<String, Object> tranMap = new HashMap<>();
         if (Objects.nonNull(recordId)) {
             tranMap.put(SystemConstant.RECORD_ID, recordId);

+ 81 - 69
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketMobileServer.java

@@ -48,16 +48,24 @@ import java.util.concurrent.ConcurrentHashMap;
 @ServerEndpoint(value = "/ws/mobile", configurator = WebSocketMobileHandshakeInterceptor.class)
 @Component
 public class WebSocketMobileServer implements Concurrently {
+
     private final static Logger log = LoggerFactory.getLogger(WebSocketMobileServer.class);
+
     private volatile static ConcurrentHashMap<String, WebSocketMobileServer> webSocketMap = new ConcurrentHashMap<>();
+
     /**
      * 与某个客户端的连接会话,需要通过它来给客户端发送数据
      */
     private Session session = null;
+
     private String sessionId = null, ip = null, deviceId = null, websocketSessionId = null;
+
     private Long recordId = null, updateTime = null;
+
     private RedisUtil redisUtil;
+
     private Map<String, Object> tranMap = null;
+
     private MonitorVideoSourceEnum source = null;
 
     /**
@@ -74,20 +82,30 @@ public class WebSocketMobileServer implements Concurrently {
         session.setMaxIdleTimeout(SystemConstant.WEBSOCKET_MAX_TIME_OUT);
         this.sessionId = tbSession.getId();
         this.websocketSessionId = String.valueOf(UidUtil.nextId());
-        if (webSocketMap.containsKey(this.websocketSessionId + "-" + this.source.name())) {
+        if (webSocketMap.containsKey(this.websocketSessionId)) {
             throw new BusinessException(ExceptionResultEnum.REPEAT_CONNECT_ERROR);
         } else {
-            webSocketMap.put(this.websocketSessionId + "-" + this.source.name(), this);
+            webSocketMap.put(this.websocketSessionId, this);
             addOnlineCount();
         }
-        log.info("用户连接:{},当前在线人数为:{}", this.websocketSessionId, getOnlineCount());
-        InetSocketAddress addr = (InetSocketAddress) WebsocketUtil.getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
+        log.info("考试记录:{},WS连接创建:{},在线人数:{}", this.recordId, this.websocketSessionId, getOnlineCount());
+        InetSocketAddress addr = (InetSocketAddress) WebsocketUtil
+                .getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
         this.ip = addr.toString().replace("/", "").split(":")[0];
         log.info("ip[:{}]连接成功", this.ip);
-        if (this.source == MonitorVideoSourceEnum.MOBILE_FIRST) {
-            WebsocketUtil.updateExamRecordMobileFirstWebsocketStatus(this.recordId, this.websocketSessionId, WebsocketStatusEnum.ON_LINE);
-        } else if (this.source == MonitorVideoSourceEnum.MOBILE_SECOND) {
-            WebsocketUtil.updateExamRecordMobileSecondWebsocketStatus(this.recordId, this.websocketSessionId, WebsocketStatusEnum.ON_LINE);
+        //修改record相关状态需要强制上锁
+        String lockKey = SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + ":" + this.source.name() + ":" + recordId;
+        redisUtil.waitLock(lockKey, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT);
+        try {
+            if (this.source == MonitorVideoSourceEnum.MOBILE_FIRST) {
+                WebsocketUtil.updateExamRecordMobileFirstWebsocketStatus(this.recordId, this.websocketSessionId,
+                        WebsocketStatusEnum.ON_LINE);
+            } else if (this.source == MonitorVideoSourceEnum.MOBILE_SECOND) {
+                WebsocketUtil.updateExamRecordMobileSecondWebsocketStatus(this.recordId, this.websocketSessionId,
+                        WebsocketStatusEnum.ON_LINE);
+            }
+        } finally {
+            redisUtil.releaseLock(lockKey);
         }
         this.updateTime = System.currentTimeMillis();
         tranMap = WebsocketUtil.initWebsocket(recordId, null, deviceId, ip, updateTime);
@@ -99,30 +117,42 @@ public class WebSocketMobileServer implements Concurrently {
     @OnClose
     public void onClose() {
         log.info("onClose is come in");
-        if (webSocketMap.containsKey(this.websocketSessionId + "-" + this.source.name())) {
-            webSocketMap.remove(this.websocketSessionId + "-" + this.source.name());
+        if (webSocketMap.containsKey(this.websocketSessionId)) {
+            webSocketMap.remove(this.websocketSessionId);
             //从set中删除
             subOnlineCount();
-            //判断是否是正常退出
-            Long timestamp = System.currentTimeMillis();
-            ExamRecordCacheUtil.setMonitorStatus(this.recordId, this.source, MonitorStatusSourceEnum.STOP, timestamp);
-            if (this.source == MonitorVideoSourceEnum.MOBILE_FIRST) {
-                ExamRecordCacheUtil.setMobileFirstWebsocketStatus(this.recordId, WebsocketStatusEnum.OFF_LINE, timestamp);
-            } else if (this.source == MonitorVideoSourceEnum.MOBILE_SECOND) {
-                ExamRecordCacheUtil.setMobileSecondWebsocketStatus(this.recordId, WebsocketStatusEnum.OFF_LINE, timestamp);
+            //修改record相关状态需要强制上锁
+            String lockKey = SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + ":" + this.source.name() + ":" + recordId;
+            redisUtil.waitLock(lockKey, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT);
+            try {
+                //判断是否是正常退出
+                long timestamp = System.currentTimeMillis();
+                //比较考试记录最新websocketId是否与当前websocketId相同
+                String currentId = ExamRecordCacheUtil.getMobileWebsocketId(recordId, this.source);
+                if (this.websocketSessionId != null && this.websocketSessionId.equals(currentId)) {
+                    ExamRecordCacheUtil
+                            .setMonitorStatus(this.recordId, this.source, MonitorStatusSourceEnum.STOP, timestamp);
+                    ExamRecordCacheUtil
+                            .setMobileWebsocketStatus(this.recordId, this.source, WebsocketStatusEnum.OFF_LINE,
+                                    timestamp);
+                    TOeExamRecordService tOeExamRecordService = SpringContextHolder.getBean(TOeExamRecordService.class);
+                    tOeExamRecordService.sendExamRecordDataSaveMq(this.recordId, timestamp);
+                    MqUtil mqUtil = SpringContextHolder.getBean(MqUtil.class);
+                    MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
+                    Map mqMap = new HashMap<>();
+                    mqMap.put(SystemConstant.RECORD_ID, this.recordId);
+                    mqMap.put("source", this.source.name());
+                    //监控结束
+                    MqDto mqDtoStop = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.MONITOR_STOP.name(),
+                            recordId, MqTagEnum.MONITOR_STOP, String.valueOf(recordId), mqMap,
+                            String.valueOf(recordId));
+                    mqDtoService.assembleSendOneOrderMsg(mqDtoStop);
+                }
+            } finally {
+                redisUtil.releaseLock(lockKey);
             }
-            TOeExamRecordService tOeExamRecordService = SpringContextHolder.getBean(TOeExamRecordService.class);
-            tOeExamRecordService.sendExamRecordDataSaveMq(this.recordId, timestamp);
-            MqUtil mqUtil = SpringContextHolder.getBean(MqUtil.class);
-            MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
-            Map mqMap = new HashMap<>();
-            mqMap.put(SystemConstant.RECORD_ID, this.recordId);
-            mqMap.put("source", this.source.name());
-            //监控结束
-            MqDto mqDtoStop = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.MONITOR_STOP.name(), recordId, MqTagEnum.MONITOR_STOP, String.valueOf(recordId), mqMap, String.valueOf(recordId));
-            mqDtoService.assembleSendOneOrderMsg(mqDtoStop);
         }
-        log.info("用户退出:{},当前在线人数为:{},updateTime:{}", this.sessionId, getOnlineCount(), this.updateTime);
+        log.info("考试记录:{},WS连接关闭:{},在线人数:{}", this.recordId, this.websocketSessionId, getOnlineCount());
     }
 
     /**
@@ -141,7 +171,8 @@ public class WebSocketMobileServer implements Concurrently {
                 JSONObject jsonObject = JSONObject.parseObject(message);
                 log.info("onMessage:{}", jsonObject.toJSONString());
                 if (Objects.nonNull(jsonObject)) {
-                    WebSocketMobileMessageTemplete webSocketMobileMessageTemplete = SpringContextHolder.getBean(WebSocketMobileMessageTemplete.class);
+                    WebSocketMobileMessageTemplete webSocketMobileMessageTemplete = SpringContextHolder
+                            .getBean(WebSocketMobileMessageTemplete.class);
                     TOeExamRecordService tOeExamRecordService = SpringContextHolder.getBean(TOeExamRecordService.class);
                     MqUtil mqUtil = SpringContextHolder.getBean(MqUtil.class);
                     MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
@@ -156,17 +187,23 @@ public class WebSocketMobileServer implements Concurrently {
                         TOeExamRecord tOeExamRecord = tOeExamRecordService.getById(this.recordId);
                         statusEnum = tOeExamRecord.getStatus();
                     }
-                    if (Objects.nonNull(statusEnum) && (statusEnum == ExamRecordStatusEnum.FINISHED ||
-                            statusEnum == ExamRecordStatusEnum.PERSISTED)) {
+                    if (Objects.nonNull(statusEnum) && (statusEnum == ExamRecordStatusEnum.FINISHED
+                            || statusEnum == ExamRecordStatusEnum.PERSISTED)) {
                         Map<String, Object> properties = new HashMap<>();
                         properties.put(SystemConstant.REMOVE_WEBSOCKET, true);
-                        MqDto mobileMqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.EXAM_STOP.name(), recordId, MqTagEnum.EXAM_STOP, String.valueOf(recordId), properties, String.valueOf(recordId));
+                        MqDto mobileMqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.EXAM_STOP.name(),
+                                recordId, MqTagEnum.EXAM_STOP, String.valueOf(recordId), properties,
+                                String.valueOf(recordId));
                         mqDtoService.assembleSendOneOrderMsg(mobileMqDto);
                     }
-                    WebsocketTypeEnum websocketTypeEnum = WebsocketTypeEnum.valueOf(websocketDto.getType().toUpperCase());
+                    WebsocketTypeEnum websocketTypeEnum = WebsocketTypeEnum
+                            .valueOf(websocketDto.getType().toUpperCase());
                     if (Objects.nonNull(websocketTypeEnum) && websocketTypeEnum == WebsocketTypeEnum.SYNC_STATUS) {
-                        Method method = webSocketMobileMessageTemplete.getClass().getDeclaredMethod(websocketTypeEnum.getDesc(), String.class, Long.class);
-                        WebsocketDto result = (WebsocketDto) method.invoke(webSocketMobileMessageTemplete, String.valueOf(websocketDto.getBody()), websocketDto.getTime());
+                        Method method = webSocketMobileMessageTemplete.getClass()
+                                .getDeclaredMethod(websocketTypeEnum.getDesc(), String.class, Long.class);
+                        WebsocketDto result = (WebsocketDto) method
+                                .invoke(webSocketMobileMessageTemplete, String.valueOf(websocketDto.getBody()),
+                                        websocketDto.getTime());
                         this.sendMessage(result);
                     }
                 }
@@ -184,7 +221,7 @@ public class WebSocketMobileServer implements Concurrently {
      */
     @OnError
     public void onError(Session session, Throwable error) throws IOException {
-        log.error("用户错误:{},原因:{}", this.websocketSessionId, error.getMessage());
+        log.error("考试记录:{},WS连接错误:{},原因:{}", this.recordId, this.websocketSessionId, error.getMessage());
         close(this);
         throw new BusinessException(error.getMessage());
     }
@@ -215,42 +252,14 @@ public class WebSocketMobileServer implements Concurrently {
      * 在线人数加一
      */
     public synchronized void addOnlineCount() {
-        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
-            try {
-                Object o = redisUtil.get(SystemConstant.WEBSOCKET_MOBILE_ONLINE_COUNT);
-                int count = 0;
-                if (Objects.nonNull(o)) {
-                    count = (int) o;
-                }
-                count++;
-                redisUtil.set(SystemConstant.WEBSOCKET_MOBILE_ONLINE_COUNT, count);
-            } finally {
-                if (Objects.nonNull(this.websocketSessionId)) {
-                    redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId);
-                }
-            }
-        }
+        redisUtil.increment(SystemConstant.WEBSOCKET_MOBILE_ONLINE_COUNT);
     }
 
     /**
      * 在线人数减一
      */
     public synchronized void subOnlineCount() {
-        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
-            try {
-                Object o = redisUtil.get(SystemConstant.WEBSOCKET_MOBILE_ONLINE_COUNT);
-                int count = 0;
-                if (Objects.nonNull(o)) {
-                    count = (int) o;
-                }
-                count--;
-                redisUtil.set(SystemConstant.WEBSOCKET_MOBILE_ONLINE_COUNT, count < 0 ? 0 : count);
-            } finally {
-                if (Objects.nonNull(this.websocketSessionId)) {
-                    redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId);
-                }
-            }
-        }
+        redisUtil.decrement(SystemConstant.WEBSOCKET_MOBILE_ONLINE_COUNT);
     }
 
     public static ConcurrentHashMap<String, WebSocketMobileServer> getWebSocketMap() {
@@ -264,8 +273,10 @@ public class WebSocketMobileServer implements Concurrently {
      */
     public static void close(WebSocketMobileServer webSocketMobileServer) throws IOException {
         //判断当前连接是否还在线
-        if (Objects.nonNull(webSocketMobileServer) && Objects.nonNull(webSocketMobileServer.session) && webSocketMobileServer.session.isOpen()) {
-            webSocketMobileServer.session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, SystemConstant.WEBSOCKET_CLOSE));
+        if (Objects.nonNull(webSocketMobileServer) && Objects.nonNull(webSocketMobileServer.session)
+                && webSocketMobileServer.session.isOpen()) {
+            webSocketMobileServer.session
+                    .close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, SystemConstant.WEBSOCKET_CLOSE));
         }
     }
 
@@ -274,7 +285,8 @@ public class WebSocketMobileServer implements Concurrently {
     }
 
     @Override
-    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+            ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         MqOeLogicService mqOeLogicService = SpringContextHolder.getBean(MqOeLogicService.class);
         try {
             long threadId = Thread.currentThread().getId();

+ 83 - 70
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketOeServer.java

@@ -51,15 +51,22 @@ import java.util.concurrent.ConcurrentHashMap;
 @ServerEndpoint(value = "/ws/oe", configurator = WebSocketOeHandshakeInterceptor.class)
 @Component
 public class WebSocketOeServer implements Concurrently {
+
     private final static Logger log = LoggerFactory.getLogger(WebSocketOeServer.class);
+
     private volatile static ConcurrentHashMap<String, WebSocketOeServer> webSocketMap = new ConcurrentHashMap<>();
+
     /**
      * 与某个客户端的连接会话,需要通过它来给客户端发送数据
      */
     private Session session = null;
+
     private String sessionId = null, ip = null, deviceId = null, websocketSessionId = null;
+
     private Long recordId = null, updateTime = null;
+
     private RedisUtil redisUtil;
+
     private Map<String, Object> tranMap = null;
 
     /**
@@ -81,20 +88,32 @@ public class WebSocketOeServer implements Concurrently {
             webSocketMap.put(this.websocketSessionId, this);
             addOnlineCount();
         }
-        log.info("用户连接:{},当前在线人数为:{}", this.websocketSessionId, getOnlineCount());
-        InetSocketAddress addr = (InetSocketAddress) WebsocketUtil.getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
+        log.info("考试记录:{},WS连接创建:{},在线人数:{}", this.recordId, this.websocketSessionId, getOnlineCount());
+        InetSocketAddress addr = (InetSocketAddress) WebsocketUtil
+                .getFieldInstance(this.session.getAsyncRemote(), "base#socketWrapper#socket#sc#remoteAddress");
         this.ip = addr.toString().replace("/", "").split(":")[0];
         log.info("ip[:{}]连接成功", this.ip);
-        WebsocketUtil.updateExamRecordWebsocketStatus(this.recordId, this.ip, this.websocketSessionId, WebsocketStatusEnum.ON_LINE);
+        //修改record相关状态需要强制上锁
+        String lockKey = SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + ":client:" + recordId;
+        redisUtil.waitLock(lockKey, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT);
+        try {
+            WebsocketUtil.updateExamRecordWebsocketStatus(this.recordId, this.ip, this.websocketSessionId,
+                    WebsocketStatusEnum.ON_LINE);
+        } finally {
+            redisUtil.releaseLock(lockKey);
+        }
         this.updateTime = System.currentTimeMillis();
         tranMap = WebsocketUtil.initWebsocket(this.recordId, null, this.deviceId, this.ip, this.updateTime);
         MqUtil mqUtil = SpringContextHolder.getBean(MqUtil.class);
         MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
         Map mqMap = new HashMap<>();
         mqMap.put(SystemConstant.RECORD_ID, this.recordId);
-//        mqMap.put(MonitorVideoSourceEnum.MOBILE_FIRST.name().toLowerCase(), ExamRecordCacheUtil.getMonitorStatus(this.recordId, MonitorVideoSourceEnum.MOBILE_FIRST));
-//        mqMap.put(MonitorVideoSourceEnum.MOBILE_SECOND.name().toLowerCase(), ExamRecordCacheUtil.getMonitorStatus(this.recordId, MonitorVideoSourceEnum.MOBILE_SECOND));
-        MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_WEBSOCKET_MOBILE_MONITOR_STATUS.name(), recordId, MqTagEnum.OE_WEBSOCKET_MOBILE_MONITOR_STATUS, String.valueOf(recordId), mqMap, String.valueOf(recordId));
+        //        mqMap.put(MonitorVideoSourceEnum.MOBILE_FIRST.name().toLowerCase(), ExamRecordCacheUtil.getMonitorStatus(this.recordId, MonitorVideoSourceEnum.MOBILE_FIRST));
+        //        mqMap.put(MonitorVideoSourceEnum.MOBILE_SECOND.name().toLowerCase(), ExamRecordCacheUtil.getMonitorStatus(this.recordId, MonitorVideoSourceEnum.MOBILE_SECOND));
+        MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(),
+                MqTagEnum.OE_WEBSOCKET_MOBILE_MONITOR_STATUS.name(), recordId,
+                MqTagEnum.OE_WEBSOCKET_MOBILE_MONITOR_STATUS, String.valueOf(recordId), mqMap,
+                String.valueOf(recordId));
         mqDtoService.assembleSendOneOrderMsg(mqDto);
     }
 
@@ -108,32 +127,47 @@ public class WebSocketOeServer implements Concurrently {
             webSocketMap.remove(this.websocketSessionId);
             //从set中删除
             subOnlineCount();
-            //判断是否是正常退出
-            Long timestamp = System.currentTimeMillis();
-            TOeExamRecordService tOeExamRecordService = SpringContextHolder.getBean(TOeExamRecordService.class);
-            ExamRecordCacheUtil.setClientWebsocketStatus(this.recordId, WebsocketStatusEnum.OFF_LINE, timestamp);
-            tOeExamRecordService.sendExamRecordDataSaveMq(this.recordId, timestamp);
-            ExamRecordStatusEnum status = ExamRecordCacheUtil.getStatus(this.recordId);
-            if (Objects.equals(status, ExamRecordStatusEnum.ANSWERING)) {
-                //大于等于超时时间,说明规定时间内都没有通信,非正常退出,因为期间会有心跳更新updateTime
-                if ((System.currentTimeMillis() - this.updateTime) / 1000 >= SystemConstant.WEBSOCKET_MAX_TIME_OUT / 1000) {
-                    log.info("超时退出");
-                    //发送延时mq消息start
-                    MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
-                    tranMap = mqDtoService.buildMqDelayMsg("2m");
-                    tranMap.put(SystemConstant.RECORD_ID, this.recordId);
-                    MqUtil mqUtil = SpringContextHolder.getBean(MqUtil.class);
-                    MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_UN_NORMAL.name(), MqTagEnum.OE_UN_NORMAL, MqTagEnum.OE_UN_NORMAL, String.valueOf(this.recordId), this.tranMap, this.websocketSessionId);
-                    mqDtoService.assembleSendAsyncDelayMsg(mqDto);
-                    //发送延时mq消息end
-                } else {
-                    log.info("正常退出");
-//                    ExamConstant.sendExamStopMsg(this.recordId, true, false);
-                    tOeExamRecordService.examBreakLogic(this.recordId, true);
+            //修改record相关状态需要强制上锁
+            String lockKey = SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + ":client:" + recordId;
+            redisUtil.waitLock(lockKey, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT);
+            try {
+                //比较考试记录最新websocketId是否与当前websocketId相同
+                String currentId = ExamRecordCacheUtil.getClientWebsocketId(recordId);
+                if (this.websocketSessionId != null && this.websocketSessionId.equals(currentId)) {
+                    long timestamp = System.currentTimeMillis();
+                    WebsocketUtil.updateExamRecordWebsocketStatus(this.recordId, this.ip, this.websocketSessionId,
+                            WebsocketStatusEnum.ON_LINE);
+                    //判断是否是正常退出
+                    TOeExamRecordService tOeExamRecordService = SpringContextHolder.getBean(TOeExamRecordService.class);
+                    tOeExamRecordService.sendExamRecordDataSaveMq(this.recordId, timestamp);
+                    ExamRecordStatusEnum status = ExamRecordCacheUtil.getStatus(this.recordId);
+                    if (Objects.equals(status, ExamRecordStatusEnum.ANSWERING)) {
+                        //大于等于超时时间,说明规定时间内都没有通信,非正常退出,因为期间会有心跳更新updateTime
+                        if ((System.currentTimeMillis() - this.updateTime) / 1000
+                                >= SystemConstant.WEBSOCKET_MAX_TIME_OUT / 1000) {
+                            log.info("超时退出");
+                            //发送延时mq消息start
+                            MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
+                            tranMap = mqDtoService.buildMqDelayMsg("2m");
+                            tranMap.put(SystemConstant.RECORD_ID, this.recordId);
+                            MqUtil mqUtil = SpringContextHolder.getBean(MqUtil.class);
+                            MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_UN_NORMAL.name(),
+                                    MqTagEnum.OE_UN_NORMAL, MqTagEnum.OE_UN_NORMAL, String.valueOf(this.recordId),
+                                    this.tranMap, this.websocketSessionId);
+                            mqDtoService.assembleSendAsyncDelayMsg(mqDto);
+                            //发送延时mq消息end
+                        } else {
+                            log.info("正常退出");
+                            //                    ExamConstant.sendExamStopMsg(this.recordId, true, false);
+                            tOeExamRecordService.examBreakLogic(this.recordId, true);
+                        }
+                    }
                 }
+            } finally {
+                redisUtil.releaseLock(lockKey);
             }
         }
-        log.info("用户退出:{},当前在线人数为:{},updateTime:{}", this.websocketSessionId, getOnlineCount(), this.updateTime);
+        log.info("考试记录:{},WS连接关闭:{},在线人数:{}", this.recordId, this.websocketSessionId, getOnlineCount());
     }
 
     /**
@@ -152,16 +186,21 @@ public class WebSocketOeServer implements Concurrently {
                 JSONObject jsonObject = JSONObject.parseObject(message);
                 log.info("onMessage:{}", jsonObject.toJSONString());
                 if (Objects.nonNull(jsonObject)) {
-                    WebSocketOeMessageTemplete webSocketOeMessageTemplete = SpringContextHolder.getBean(WebSocketOeMessageTemplete.class);
+                    WebSocketOeMessageTemplete webSocketOeMessageTemplete = SpringContextHolder
+                            .getBean(WebSocketOeMessageTemplete.class);
                     Gson gson = new Gson();
                     WebsocketDto websocketDto = gson.fromJson(gson.toJson(jsonObject), WebsocketDto.class);
                     jsonObject.getJSONObject("body").put(SystemConstant.RECORD_ID, this.recordId);
                     websocketDto.setBody(jsonObject.getJSONObject("body"));
-                    WebsocketTypeEnum websocketTypeEnum = WebsocketTypeEnum.valueOf(websocketDto.getType().toUpperCase());
-                    if (Objects.nonNull(websocketTypeEnum) && (websocketTypeEnum == WebsocketTypeEnum.SYNC_STATUS ||
-                            websocketTypeEnum == WebsocketTypeEnum.INVIGILATE_NOTICE_ACK)) {
-                        Method method = webSocketOeMessageTemplete.getClass().getDeclaredMethod(websocketTypeEnum.getDesc(), String.class, Long.class);
-                        WebsocketDto result = (WebsocketDto) method.invoke(webSocketOeMessageTemplete, String.valueOf(websocketDto.getBody()), websocketDto.getTime());
+                    WebsocketTypeEnum websocketTypeEnum = WebsocketTypeEnum
+                            .valueOf(websocketDto.getType().toUpperCase());
+                    if (Objects.nonNull(websocketTypeEnum) && (websocketTypeEnum == WebsocketTypeEnum.SYNC_STATUS
+                            || websocketTypeEnum == WebsocketTypeEnum.INVIGILATE_NOTICE_ACK)) {
+                        Method method = webSocketOeMessageTemplete.getClass()
+                                .getDeclaredMethod(websocketTypeEnum.getDesc(), String.class, Long.class);
+                        WebsocketDto result = (WebsocketDto) method
+                                .invoke(webSocketOeMessageTemplete, String.valueOf(websocketDto.getBody()),
+                                        websocketDto.getTime());
                         this.sendMessage(result);
                     }
                 }
@@ -182,7 +221,7 @@ public class WebSocketOeServer implements Concurrently {
      */
     @OnError
     public void onError(Session session, Throwable error) throws IOException {
-        log.error("用户错误:{},原因:{}", this.websocketSessionId, error.getMessage());
+        log.error("考试记录{},WS连接错误:{},原因:{}", this.recordId, this.websocketSessionId, error.getMessage());
         close(this);
         throw new BusinessException(error.getMessage());
     }
@@ -213,47 +252,19 @@ public class WebSocketOeServer implements Concurrently {
      * 在线人数加一
      */
     public synchronized void addOnlineCount() {
-        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
-            try {
-                Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
-                int count = 0;
-                if (Objects.nonNull(o)) {
-                    count = (int) o;
-                }
-                count++;
-                redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count);
-            } finally {
-                if (Objects.nonNull(this.websocketSessionId)) {
-                    redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId);
-                }
-            }
-        }
+        redisUtil.increment(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
     }
 
     /**
      * 在线人数减一
      */
     public synchronized void subOnlineCount() {
-        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
-            try {
-                Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
-                int count = 0;
-                if (Objects.nonNull(o)) {
-                    count = (int) o;
-                }
-                count--;
-                redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count < 0 ? 0 : count);
-            } finally {
-                if (Objects.nonNull(this.websocketSessionId)) {
-                    redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.websocketSessionId);
-                }
-            }
-        }
+        redisUtil.decrement(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
     }
 
     @Override
-    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext
-            consumeConcurrentlyContext) {
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+            ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         MqOeLogicService mqOeLogicService = SpringContextHolder.getBean(MqOeLogicService.class);
         try {
             long threadId = Thread.currentThread().getId();
@@ -283,8 +294,10 @@ public class WebSocketOeServer implements Concurrently {
      */
     public static void close(WebSocketOeServer webSocketOeServer) throws IOException {
         //判断当前连接是否还在线
-        if (Objects.nonNull(webSocketOeServer) && Objects.nonNull(webSocketOeServer.session) && webSocketOeServer.session.isOpen()) {
-            webSocketOeServer.session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, SystemConstant.WEBSOCKET_CLOSE));
+        if (Objects.nonNull(webSocketOeServer) && Objects.nonNull(webSocketOeServer.session)
+                && webSocketOeServer.session.isOpen()) {
+            webSocketOeServer.session
+                    .close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, SystemConstant.WEBSOCKET_CLOSE));
         }
     }
 

部分文件因为文件数量过多而无法显示