فهرست منبع

websocket mq消息改为顺序发送

wangliang 4 سال پیش
والد
کامیت
20e44345dc

+ 6 - 5
themis-admin/src/main/java/com/qmth/themis/admin/api/TIeInvigilateController.java

@@ -371,7 +371,7 @@ public class TIeInvigilateController {
                 mqTagEnum = MqTagEnum.OE_HARD_FINISH;
             }
             MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), mqTagEnum.name(), JacksonUtil.parseJson(recordIdList), mqTagEnum, String.valueOf(tbUser.getId()), mapParameter, tbUser.getName());
-            mqDtoService.assembleSendOneWayMsg(mqDto);
+            mqDtoService.assembleSendOneOrderMsg(mqDto);
             //发送mq给客户端强制收卷end
         }
         return ResultUtil.ok(true);
@@ -566,13 +566,14 @@ public class TIeInvigilateController {
         }
         TBUser tbUser = (TBUser) ServletUtil.getRequestAccount();
         //发送mq给客户端监考消息start
-        MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_IM_CLUSTERING.name(), recordId, MqTagEnum.OE_IM_CLUSTERING, String.valueOf(tbUser.getId()), mapParameter, tbUser.getName());
-        mqDtoService.assembleSendOneWayMsg(mqDto);
+        mapParameter.put("formUserId",tbUser.getId());
+        MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_IM_CLUSTERING.name(), recordId, MqTagEnum.OE_IM_CLUSTERING, String.valueOf(recordId), mapParameter, tbUser.getName());
+        mqDtoService.assembleSendOneOrderMsg(mqDto);
         //发送mq给客户端监考消息end
 
 //        //发送mq给客户端监考强制活体验证start
-//        mqDto = new MqDto(MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.OE_LIVENESS_VERIFY.name(), recordId, MqTagEnum.OE_LIVENESS_VERIFY, String.valueOf(tbUser.getId()), mapParameter, tbUser.getName());
-//        mqDtoService.assembleSendOneWayMsg(mqDto);
+//        mqDto = new MqDto(MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.OE_LIVENESS_VERIFY.name(), recordId, MqTagEnum.OE_LIVENESS_VERIFY, String.valueOf(recordId), mapParameter, tbUser.getName());
+//        mqDtoService.assembleSendOneOrderMsg(mqDto);
 //        //发送mq给客户端监考强制活体验证end
         return ResultUtil.ok(true);
     }

+ 2 - 0
themis-business/src/main/java/com/qmth/themis/business/constant/SystemConstant.java

@@ -101,6 +101,8 @@ public class SystemConstant {
 
     public static final String MQ_BROADCAST_TOPIC_BUFFER_LIST = "mq:broadcast:topic:buffer:list";
 
+    public static final String MQ_ORDER_TOPIC_BUFFER_LIST = "mq:order:topic:buffer:list";
+
     public static final String TYPE = "type";
 
     public static final String LOCAL = "local";

+ 24 - 0
themis-business/src/main/java/com/qmth/themis/business/service/MqDtoService.java

@@ -29,6 +29,30 @@ public interface MqDtoService {
      */
     public MqDto assembleSendAsyncDelayMsg(MqDto mqDto);
 
+    /**
+     * 组装异步单向顺序消息
+     *
+     * @param mqDto
+     * @return
+     */
+    public MqDto assembleSendOneOrderMsg(MqDto mqDto);
+
+    /**
+     * 组装异步顺序消息
+     *
+     * @param mqDto
+     * @return
+     */
+    public MqDto assembleSendAsyncOrderMsg(MqDto mqDto);
+
+    /**
+     * 组装同步顺序消息
+     *
+     * @param mqDto
+     * @return
+     */
+    public MqDto assembleSendSyncOrderMsg(MqDto mqDto);
+
     /**
      * 组装同步延时消息
      *

+ 68 - 0
themis-business/src/main/java/com/qmth/themis/business/service/impl/MqDtoServiceImpl.java

@@ -77,6 +77,72 @@ public class MqDtoServiceImpl implements MqDtoService {
         return null;
     }
 
+    /**
+     * 组装异步单向顺序消息
+     *
+     * @param mqDto
+     * @return
+     */
+    @Override
+    public MqDto assembleSendOneOrderMsg(MqDto mqDto) {
+        mqDto.setAck(SystemConstant.DELIVERED_ACK_TYPE);
+        try {
+            producerServer.sendOneWayOrderly(mqDto);
+        } catch (Exception e) {
+            log.error("请求出错", e);
+            if (Objects.nonNull(mqDto)) {
+                mqDto.setAck(SystemConstant.UNSEND_ACK_TYPE);
+            }
+        } finally {
+            setTopicBuffer(mqDto);
+        }
+        return null;
+    }
+
+    /**
+     * 组装异步顺序消息
+     *
+     * @param mqDto
+     * @return
+     */
+    @Override
+    public MqDto assembleSendAsyncOrderMsg(MqDto mqDto) {
+        mqDto.setAck(SystemConstant.DELIVERED_ACK_TYPE);
+        try {
+            producerServer.asyncOrderlyMsg(mqDto);
+        } catch (Exception e) {
+            log.error("请求出错", e);
+            if (Objects.nonNull(mqDto)) {
+                mqDto.setAck(SystemConstant.UNSEND_ACK_TYPE);
+            }
+        } finally {
+            setTopicBuffer(mqDto);
+        }
+        return null;
+    }
+
+    /**
+     * 组装同步顺序消息
+     *
+     * @param mqDto
+     * @return
+     */
+    @Override
+    public MqDto assembleSendSyncOrderMsg(MqDto mqDto) {
+        mqDto.setAck(SystemConstant.DELIVERED_ACK_TYPE);
+        try {
+            producerServer.syncOrderlyMsg(mqDto);
+        } catch (Exception e) {
+            log.error("请求出错", e);
+            if (Objects.nonNull(mqDto)) {
+                mqDto.setAck(SystemConstant.UNSEND_ACK_TYPE);
+            }
+        } finally {
+            setTopicBuffer(mqDto);
+        }
+        return null;
+    }
+
     /**
      * 组装同步延时消息
      *
@@ -156,6 +222,8 @@ public class MqDtoServiceImpl implements MqDtoService {
                 redisUtil.set(SystemConstant.MQ_BROADCAST_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);
             } else if (Objects.equals(mqDto.getType().getType(), "delay")) {
                 redisUtil.set(SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);
+            } else if (Objects.equals(mqDto.getType().getType(), "order")) {
+                redisUtil.set(SystemConstant.MQ_ORDER_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);
             }
         }
     }

+ 1 - 1
themis-business/src/main/java/com/qmth/themis/business/service/impl/TEExamServiceImpl.java

@@ -1158,7 +1158,7 @@ public class TEExamServiceImpl extends ServiceImpl<TEExamMapper, TEExam> impleme
         redisUtil.setStudent(teStudentCacheDto.getId(), teStudentCacheDto);
 
         MqDto mobileMqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.EXAM_STOP.name(), recordId, MqTagEnum.EXAM_STOP, String.valueOf(recordId), Collections.singletonMap(SystemConstant.RECORD_ID, recordId), String.valueOf(recordId));
-        mqDtoService.assembleSendOneWayMsg(mobileMqDto);
+        mqDtoService.assembleSendOneOrderMsg(mobileMqDto);
         //异步持久化
         checkToPersisted(recordId);
         //mq发送消息start

+ 4 - 4
themis-exam/src/main/java/com/qmth/themis/exam/api/TEMobileController.java

@@ -96,8 +96,8 @@ public class TEMobileController {
         if (param.getSubIndex() != null) {
             mapParameter.put("subIndex", param.getSubIndex());
         }
-        MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_READY.name(), param.getRecordId(), MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_READY, String.valueOf(teStudent.getId()), mapParameter, teStudent.getName());
-        mqDtoService.assembleSendOneWayMsg(mqDto);
+        MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_READY.name(), param.getRecordId(), MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_READY, String.valueOf(param.getRecordId()), mapParameter, teStudent.getName());
+        mqDtoService.assembleSendOneOrderMsg(mqDto);
         ExamStudentCacheBean es = examStudentService.getExamStudentCacheBean(esId);
         AnswerReadyResponseBean ret = new AnswerReadyResponseBean();
         ret.setCourseName(es.getCourseName());
@@ -144,8 +144,8 @@ public class TEMobileController {
         }
         mapParameter.put("urls", param.getUrls());
 
-        MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_UPLOAD.name(), param.getRecordId(), MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_READY, String.valueOf(teStudent.getId()), mapParameter, teStudent.getName());
-        mqDtoService.assembleSendOneWayMsg(mqDto);
+        MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_UPLOAD.name(), param.getRecordId(), MqTagEnum.OE_WEBSOCKET_MOBILE_ANSWER_READY, String.valueOf(param.getRecordId()), mapParameter, teStudent.getName());
+        mqDtoService.assembleSendOneOrderMsg(mqDto);
         return ResultUtil.ok(ret);
     }
 }

+ 2 - 2
themis-exam/src/main/java/com/qmth/themis/exam/api/TIeInvigilateCallMobileController.java

@@ -195,11 +195,11 @@ public class TIeInvigilateCallMobileController {
         if (Objects.equals(status.name(), MonitorStatusSourceEnum.START.name())) {
             //监控开始
             MqDto mqDtoStart = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.MONITOR_START.name(), recordId, MqTagEnum.MONITOR_START, String.valueOf(recordId), mqMap, String.valueOf(recordId));
-            mqDtoService.assembleSendOneWayMsg(mqDtoStart);
+            mqDtoService.assembleSendOneOrderMsg(mqDtoStart);
         } else if (Objects.equals(status.name(), MonitorStatusSourceEnum.STOP.name())) {
             //监控结束
             MqDto mqDtoStop = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.MONITOR_STOP.name(), recordId, MqTagEnum.MONITOR_STOP, String.valueOf(recordId), mqMap, String.valueOf(recordId));
-            mqDtoService.assembleSendOneWayMsg(mqDtoStop);
+            mqDtoService.assembleSendOneOrderMsg(mqDtoStop);
         }
         Map map = new HashMap();
         map.put("status", status.name());

+ 3 - 3
themis-exam/src/main/java/com/qmth/themis/exam/config/ExamConstant.java

@@ -33,12 +33,12 @@ public class ExamConstant {
         //客户端考试结束
         if (clientStop) {
             MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_WEBSOCKET_EXAM_STOP.name(), recordId, MqTagEnum.OE_WEBSOCKET_EXAM_STOP, String.valueOf(recordId), mapParameter, String.valueOf(recordId));
-            mqDtoService.assembleSendOneWayMsg(mqDto);
+            mqDtoService.assembleSendOneOrderMsg(mqDto);
         }
         //移动端考试结束
         if (mobileStop) {
             MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.EXAM_STOP.name(), recordId, MqTagEnum.EXAM_STOP, String.valueOf(recordId), mapParameter, String.valueOf(recordId));
-            mqDtoService.assembleSendOneWayMsg(mqDto);
+            mqDtoService.assembleSendOneOrderMsg(mqDto);
         }
     }
 
@@ -53,6 +53,6 @@ public class ExamConstant {
         Map mapParameter = new HashMap<>();
         mapParameter.put(SystemConstant.RECORD_ID, recordId);
         MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.EXAM_START.name(), recordId, MqTagEnum.EXAM_START, String.valueOf(recordId), mapParameter, String.valueOf(recordId));
-        mqDtoService.assembleSendOneWayMsg(mqDto);
+        mqDtoService.assembleSendOneOrderMsg(mqDto);
     }
 }

+ 1 - 1
themis-exam/src/main/java/com/qmth/themis/exam/listener/service/impl/MqOeLogicServiceImpl.java

@@ -149,7 +149,7 @@ public class MqOeLogicServiceImpl implements MqOeLogicService {
                     map.put("content", prop.get("content"));
                     WebsocketDto websocketDto = new WebsocketDto(WebsocketTypeEnum.INVIGILATE_NOTICE.name(), map);
                     webSocketOeServer.sendMessage(websocketDto);
-                    TIeExamInvigilateNotice tIeExamInvigilateNotice = new TIeExamInvigilateNotice(examId, examActivityId, recordId, Long.parseLong(mqDto.getObjId()), examStudentId, MessageTypeEnum.valueOf(String.valueOf(prop.get("type")).toUpperCase()), String.valueOf(prop.get("content")));
+                    TIeExamInvigilateNotice tIeExamInvigilateNotice = new TIeExamInvigilateNotice(examId, examActivityId, recordId, Long.parseLong((String) prop.get("formUserId")), examStudentId, MessageTypeEnum.valueOf(String.valueOf(prop.get("type")).toUpperCase()), String.valueOf(prop.get("content")));
                     tIeExamInvigilateNoticeService.saveOrUpdate(tIeExamInvigilateNotice);
                 }
             }

+ 1 - 1
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketMobileServer.java

@@ -133,7 +133,7 @@ public class WebSocketMobileServer implements Concurrently {
             mqMap.put("source", this.source.name());
             //监控结束
             MqDto mqDtoStop = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.MONITOR_STOP.name(), recordId, MqTagEnum.MONITOR_STOP, String.valueOf(recordId), mqMap, String.valueOf(recordId));
-            mqDtoService.assembleSendOneWayMsg(mqDtoStop);
+            mqDtoService.assembleSendOneOrderMsg(mqDtoStop);
         }
 //        log.info("用户退出:{},当前在线人数为:{},updateTime:{}", this.sessionId, getOnlineCount(), this.updateTime);
         log.info("用户退出:{},updateTime:{}", this.websocketSessionId, this.updateTime);

+ 1 - 1
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketOeServer.java

@@ -111,7 +111,7 @@ public class WebSocketOeServer implements Concurrently {
         mqMap.put(MonitorVideoSourceEnum.MOBILE_FIRST.name().toLowerCase(), ExamRecordCacheUtil.getMonitorStatus(this.recordId, MonitorVideoSourceEnum.MOBILE_FIRST));
         mqMap.put(MonitorVideoSourceEnum.MOBILE_SECOND.name().toLowerCase(), ExamRecordCacheUtil.getMonitorStatus(this.recordId, MonitorVideoSourceEnum.MOBILE_SECOND));
         MqDto mqDto = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.OE_WEBSOCKET_MOBILE_MONITOR_STATUS.name(), recordId, MqTagEnum.OE_WEBSOCKET_MOBILE_MONITOR_STATUS, String.valueOf(recordId), mqMap, String.valueOf(recordId));
-        mqDtoService.assembleSendOneWayMsg(mqDto);
+        mqDtoService.assembleSendOneOrderMsg(mqDto);
     }
 
     /**

+ 2 - 2
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqLogicServiceImpl.java

@@ -291,7 +291,7 @@ public class MqLogicServiceImpl implements MqLogicService {
             //发送移动端监考退出考试mq消息 start
             MqDto mqDtoExamStop = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.EXAM_STOP.name(), recordId,
                     MqTagEnum.EXAM_STOP, String.valueOf(recordId), String.valueOf(recordId));
-            mqDtoService.assembleSendOneWayMsg(mqDtoExamStop);
+            mqDtoService.assembleSendOneOrderMsg(mqDtoExamStop);
             //发送移动端监考退出考试mq消息 end
         }
         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
@@ -875,7 +875,7 @@ public class MqLogicServiceImpl implements MqLogicService {
         //发送移动端监考退出考试mq消息 start
         MqDto mqDtoExamStop = new MqDto(mqUtil.getMqGroupDomain().getTopic(), MqTagEnum.EXAM_STOP.name(), recordId,
                 MqTagEnum.EXAM_STOP, String.valueOf(recordId), String.valueOf(recordId));
-        mqDtoService.assembleSendOneWayMsg(mqDtoExamStop);
+        mqDtoService.assembleSendOneOrderMsg(mqDtoExamStop);
         //发送移动端监考退出考试mq消息 end
 
         Gson gson = new Gson();