浏览代码

websocket消息逻辑整理

wangliang 3 年之前
父节点
当前提交
ae7017ad43

+ 1 - 1
themis-admin/src/main/java/com/qmth/themis/admin/websocket/WebSocketAdminServer.java

@@ -130,7 +130,7 @@ public class WebSocketAdminServer
      */
     @OnError
     public void onError(Session session, Throwable error) throws IOException {
-        log.error("用户错误:{},原因:{}", this.websocketSessionId, error);
+        log.error("用户错误:{},原因:{}", this.websocketSessionId, error.getMessage());
         close(this);
         throw new BusinessException(error.getMessage());
     }

+ 26 - 0
themis-exam/src/main/java/com/qmth/themis/exam/listener/service/impl/MqOeLogicServiceImpl.java

@@ -260,6 +260,9 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
                     WebSocketOeServer webSocketOeServer = (WebSocketOeServer) map.get(SystemConstant.WEB_SOCKET_OE_SERVER);
                     WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.INVIGILATE_STOP_EXAM.name(), map);
                     webSocketOeServer.sendMessage(websocketDto);
+                } else {
+                    mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                    redisUtil.delete(key, mqDto.getId());
                 }
             });
         } else if (Objects.equals(MqTagEnum.OE_WARNING_FINISH.name(), tag)) {//预警交卷
@@ -270,6 +273,9 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
                     WebSocketOeServer webSocketOeServer = (WebSocketOeServer) map.get(SystemConstant.WEB_SOCKET_OE_SERVER);
                     WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.BREACH_STOP_EXAM.name(), map);
                     webSocketOeServer.sendMessage(websocketDto);
+                } else {
+                    mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                    redisUtil.delete(key, mqDto.getId());
                 }
             });
         } else if (Objects.equals(MqTagEnum.OE_IM_CLUSTERING.name(), tag)) {//点对点消息
@@ -278,6 +284,9 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
                 WebSocketOeServer webSocketOeServer = (WebSocketOeServer) map.get(SystemConstant.WEB_SOCKET_OE_SERVER);
                 WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.INVIGILATE_NOTICE.name(), map);
                 webSocketOeServer.sendMessage(websocketDto);
+            } else {
+                mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                redisUtil.delete(key, mqDto.getId());
             }
         } else if (Objects.equals(MqTagEnum.OE_IM_BROADCASTING.name(), tag)) {//广播消息
             JSONArray jsonArray = JSONArray.parseArray(String.valueOf(mqDto.getBody()));
@@ -289,6 +298,9 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
                 WebSocketOeServer webSocketOeServer = (WebSocketOeServer) map.get(SystemConstant.WEB_SOCKET_OE_SERVER);
                 WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.INVIGILATE_LIVENESS_VERIFY.name(), map);
                 webSocketOeServer.sendMessage(websocketDto);
+            } else {
+                mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                redisUtil.delete(key, mqDto.getId());
             }
         } else if (Objects.equals(MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_READY.name(), tag)//移动端拍照/录音扫描完成
                 || Objects.equals(MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_UPLOAD.name(), tag)//移动端拍照/录音上传成功
@@ -389,6 +401,9 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
 
                     webSocketOeServer.sendMessage(websocketDto);
                 }
+            } else {
+                mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                redisUtil.delete(key, mqDto.getId());
             }
         }
     }
@@ -408,6 +423,11 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
                 || Objects.equals(MqTagEnum.EXAM_START.name(), tag)) {//考试开始
             Long recordId = Long.parseLong(String.valueOf(mqDto.getBody()));
             ExamRecordStatusEnum examRecordStatusEnum = ExamRecordCacheUtil.getStatus(recordId);
+            if (Objects.isNull(examRecordStatusEnum)) {
+                mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                redisUtil.delete(key, mqDto.getId());
+                return;
+            }
             Long examStudentId = ExamRecordCacheUtil.getExamStudentId(recordId);
             Long studentId = null;
             if (Objects.isNull(examStudentId)) {
@@ -460,6 +480,9 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
 
                     webSocketFirstMobileServer.sendMessage(websocketDto);
                 }
+            } else {
+                mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                redisUtil.delete(key, mqDto.getId());
             }
             String mobileSecondWebsocketId = ExamRecordCacheUtil.getMobileSecondWebsocketId(recordId);
             if (Objects.nonNull(mobileSecondWebsocketId) && Objects.nonNull(webSocketMap.get(mobileSecondWebsocketId + "-" + MonitorVideoSourceEnum.MOBILE_SECOND.name()))) {
@@ -499,6 +522,9 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
 
                     webSocketSecondMobileServer.sendMessage(websocketDto);
                 }
+            } else {
+                mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                redisUtil.delete(key, mqDto.getId());
             }
         }
     }

+ 1 - 1
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketMobileServer.java

@@ -181,7 +181,7 @@ public class WebSocketMobileServer implements Concurrently {
      */
     @OnError
     public void onError(Session session, Throwable error) throws IOException {
-        log.error("用户错误:{},原因:{}", this.websocketSessionId, error);
+        log.error("用户错误:{},原因:{}", this.websocketSessionId, error.getMessage());
         close(this);
         throw new BusinessException(error.getMessage());
     }

+ 1 - 1
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketOeServer.java

@@ -181,7 +181,7 @@ public class WebSocketOeServer implements Concurrently {
      */
     @OnError
     public void onError(Session session, Throwable error) throws IOException {
-        log.error("用户错误:{},原因:{}", this.websocketSessionId, error);
+        log.error("用户错误:{},原因:{}", this.websocketSessionId, error.getMessage());
         close(this);
         throw new BusinessException(error.getMessage());
     }

+ 11 - 8
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqLogicServiceImpl.java

@@ -206,10 +206,9 @@ public class MqLogicServiceImpl implements MqLogicService {
         Gson gson = new Gson();
         String sessionId = mqDto.getObjId();
         TBSession tbSession = (TBSession) redisUtil.getUserSession(sessionId);
-        if (Objects.isNull(tbSession)) {
-            throw new BusinessException("缓存session为空");
+        if (Objects.nonNull(tbSession)) {
+            tbSessionService.saveOrUpdate(tbSession);
         }
-        tbSessionService.saveOrUpdate(tbSession);
         mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
@@ -381,13 +380,17 @@ public class MqLogicServiceImpl implements MqLogicService {
                 }
                 TMTencentVideoMessage tencentVideoMessage = new TMTencentVideoMessage(response.getRequestId(), JacksonUtil.parseJson(response));
                 tencentVideoMessageService.save(tencentVideoMessage);
+
+                mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);//表示成功处理
+                TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+                tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
+                tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+                redisUtil.delete(key, mqDto.getId());
             }
+        } else {
+            mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);//表示成功处理
+            redisUtil.delete(key, mqDto.getId());
         }
-        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);//表示成功处理
-        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
-        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-        redisUtil.delete(key, mqDto.getId());
     }
 
     /**