Explorar el Código

增加待考列表接口

wangliang hace 5 años
padre
commit
2aa5709b3f
Se han modificado 19 ficheros con 621 adiciones y 110 borrados
  1. 32 6
      themis-business/src/main/java/com/qmth/themis/business/constant/SystemConstant.java
  2. 40 1
      themis-business/src/main/java/com/qmth/themis/business/domain/MqConfigDomain.java
  3. 12 1
      themis-business/src/main/java/com/qmth/themis/business/entity/TMRocketMessage.java
  4. 10 1
      themis-business/src/main/java/com/qmth/themis/business/enums/MqEnum.java
  5. 3 1
      themis-business/src/main/java/com/qmth/themis/business/enums/SystemOperationEnum.java
  6. 0 1
      themis-business/src/main/resources/mapper/TOeExamRecordMapper.xml
  7. 4 0
      themis-common/src/main/java/com/qmth/themis/common/enums/ExceptionResultEnum.java
  8. 3 2
      themis-exam/src/main/java/com/qmth/themis/exam/api/TEStudentController.java
  9. 48 39
      themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketServer.java
  10. 109 8
      themis-exam/src/main/java/com/qmth/themis/exam/websocketTemplete/WebSocketOeMessageTemplete.java
  11. 8 0
      themis-exam/src/main/resources/application.properties
  12. 19 11
      themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketSessionConsumer.java
  13. 33 25
      themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketTaskConsumer.java
  14. 21 12
      themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketUserLogConsumer.java
  15. 141 0
      themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketWebsocketUnNormalLogConsumer.java
  16. 20 0
      themis-mq/src/main/java/com/qmth/themis/mq/service/MqDtoService.java
  17. 18 0
      themis-mq/src/main/java/com/qmth/themis/mq/service/ProducerServer.java
  18. 57 1
      themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqDtoServiceImpl.java
  19. 43 1
      themis-mq/src/main/java/com/qmth/themis/mq/service/impl/ProducerServerImpl.java

+ 32 - 6
themis-business/src/main/java/com/qmth/themis/business/constant/SystemConstant.java

@@ -1,13 +1,11 @@
 package com.qmth.themis.business.constant;
 
+import com.google.common.collect.Maps;
 import com.qmth.themis.common.contanst.Constants;
 import com.qmth.themis.common.enums.Platform;
 import com.qmth.themis.common.enums.Source;
 
-import java.util.Calendar;
-import java.util.Date;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 
 /**
  * @Description: 系统常量
@@ -53,6 +51,7 @@ public class SystemConstant {
     public static final String USERLOG_TOPIC_BUFFER_LIST = "userLog:topic:buffer:list";
     public static final String STUDENTLOG_TOPIC_BUFFER_LIST = "studentLog:topic:buffer:list";
     public static final String TASKLOG_TOPIC_BUFFER_LIST = "taskLog:topic:buffer:list";
+    public static final String WEBSOCKET_UN_NORMAL_LOG_TOPIC_BUFFER_LIST = "websocketUnNormal:topic:buffer:list";
     public static final String TYPE = "type";
     public static final String LOCAL = "local";
     public static final String OSS = "oss";
@@ -101,7 +100,10 @@ public class SystemConstant {
      */
     public static final int CONSUME_MESSAGE_BATCH_MAX_SIZE = 10;
     public static final int MAXRECONSUMETIMES = 3;
-    public static final String OBJ_ID = "objId";
+    public static final String MQDTO_OBJ = "mqDtoObj";
+    public static final long MESSAGE_TIMEOUT = 3000L;
+    public static final String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
+    public static Map<String, Integer> mqDelayLevel = null;
 
     public static final int DELIVERED_ACK_TYPE = 0;//消息"已发出",但尚未处理结束
     public static final int POSION_ACK_TYPE = 1;//消息"错误",通常表示"抛弃"此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
@@ -122,8 +124,11 @@ public class SystemConstant {
      * websocket
      */
     public static final String WEBSOCKET_OE_ONLINE_COUNT = "websocket:oe:online:count";
+    public static final String WEBSOCKET_UN_NORMAL_LIST = "websocket:oe:unnormal:list";
     public static final String GET = "get";
-    public static final long WEBSOCKET_MAX_TIME_OUT = 3 * 60 * 1000;
+    //    public static final long WEBSOCKET_MAX_TIME_OUT = 1 * 60 * 1000;
+    public static final long WEBSOCKET_MAX_TIME_OUT = 10 * 1000;
+    public static final String ACK_MESSAGE = "ackMessage";
     /**
      * 缓存配置
      */
@@ -131,6 +136,27 @@ public class SystemConstant {
     public static final String studentOauth = "student:oauth:cache";
     public static final String configOauth = "config:cache";
 
+    static {
+        mqDelayLevel = new HashMap<>();
+        mqDelayLevel.put("1s", 1);
+        mqDelayLevel.put("5s", 2);
+        mqDelayLevel.put("10s", 3);
+        mqDelayLevel.put("30s", 4);
+        mqDelayLevel.put("1m", 5);
+        mqDelayLevel.put("2m", 6);
+        mqDelayLevel.put("3m", 7);
+        mqDelayLevel.put("4m", 8);
+        mqDelayLevel.put("5m", 9);
+        mqDelayLevel.put("6m", 10);
+        mqDelayLevel.put("7m", 11);
+        mqDelayLevel.put("8m", 12);
+        mqDelayLevel.put("9m", 13);
+        mqDelayLevel.put("10m", 14);
+        mqDelayLevel.put("20m", 15);
+        mqDelayLevel.put("30m", 16);
+        mqDelayLevel.put("1h", 17);
+        mqDelayLevel.put("2h", 18);
+    }
 
     /**
      * 获取过期时间

+ 40 - 1
themis-business/src/main/java/com/qmth/themis/business/domain/MqConfigDomain.java

@@ -50,10 +50,49 @@ public class MqConfigDomain implements Serializable {
     private String taskConsumerRoomCodeImportGroup;
     private String taskTopicRoomCodeExportTag;
     private String taskConsumerRoomCodeExportGroup;
-    
     private String taskTopicExamPaperImportTag;
     private String taskConsumerExamPaperImportGroup;
 
+    /**
+     * websocket group
+     */
+    private String websocketUnNormalTopic;
+    private String websocketUnNormalConsumerGroup;
+    private String websocketUnNormalTopicOeTag;
+    private String websocketUnNormalConsumerOeGroup;
+
+    public String getWebsocketUnNormalTopic() {
+        return websocketUnNormalTopic;
+    }
+
+    public void setWebsocketUnNormalTopic(String websocketUnNormalTopic) {
+        this.websocketUnNormalTopic = websocketUnNormalTopic;
+    }
+
+    public String getWebsocketUnNormalConsumerGroup() {
+        return websocketUnNormalConsumerGroup;
+    }
+
+    public void setWebsocketUnNormalConsumerGroup(String websocketUnNormalConsumerGroup) {
+        this.websocketUnNormalConsumerGroup = websocketUnNormalConsumerGroup;
+    }
+
+    public String getWebsocketUnNormalTopicOeTag() {
+        return websocketUnNormalTopicOeTag;
+    }
+
+    public void setWebsocketUnNormalTopicOeTag(String websocketUnNormalTopicOeTag) {
+        this.websocketUnNormalTopicOeTag = websocketUnNormalTopicOeTag;
+    }
+
+    public String getWebsocketUnNormalConsumerOeGroup() {
+        return websocketUnNormalConsumerOeGroup;
+    }
+
+    public void setWebsocketUnNormalConsumerOeGroup(String websocketUnNormalConsumerOeGroup) {
+        this.websocketUnNormalConsumerOeGroup = websocketUnNormalConsumerOeGroup;
+    }
+
     public String getTaskTopicRoomCodeExportTag() {
         return taskTopicRoomCodeExportTag;
     }

+ 12 - 1
themis-business/src/main/java/com/qmth/themis/business/entity/TMRocketMessage.java

@@ -59,9 +59,12 @@ public class TMRocketMessage implements Serializable {
     private Integer sequence;
 
     @ApiModelProperty(value = "扩展类型")
-    @TableField(value = "properties")
+    @TableField(exist = false)
     private Map<String, Object> properties;
 
+    @TableField(value = "properties")
+    private String prop;
+
     @ApiModelProperty(value = "创建时间")
     @TableField(value = "create_time", fill = FieldFill.INSERT)
     private Date createTime;
@@ -70,6 +73,14 @@ public class TMRocketMessage implements Serializable {
     @TableField(value = "timestamp")
     private Long timestamp;
 
+    public String getProp() {
+        return prop;
+    }
+
+    public void setProp(String prop) {
+        this.prop = prop;
+    }
+
     public static long getSerialVersionUID() {
         return serialVersionUID;
     }

+ 10 - 1
themis-business/src/main/java/com/qmth/themis/business/enums/MqEnum.java

@@ -43,7 +43,12 @@ public enum MqEnum {
     /**
      * 任务
      */
-    TASK_LOG(7, "任务");
+    TASK_LOG(7, "任务"),
+
+    /**
+     * websocket超时退出
+     */
+    WEBSOCKET_UN_NORMAL_LOG(8, "websocket超时退出");
 
     private int id;
 
@@ -73,6 +78,8 @@ public enum MqEnum {
             return USER_LOG.getId();
         } else if (Objects.equals(value.trim(), TASK_LOG.name())) {
             return TASK_LOG.getId();
+        } else if (Objects.equals(value.trim(), WEBSOCKET_UN_NORMAL_LOG.name())) {
+            return WEBSOCKET_UN_NORMAL_LOG.getId();
         } else {
             return MESSAGE_LOG.getId();
         }
@@ -97,6 +104,8 @@ public enum MqEnum {
             return USER_LOG.name();
         } else if (Objects.equals(value.trim(), TASK_LOG.getCode())) {
             return TASK_LOG.name();
+        } else if (Objects.equals(value.trim(), WEBSOCKET_UN_NORMAL_LOG.getCode())) {
+            return WEBSOCKET_UN_NORMAL_LOG.name();
         } else {
             return MESSAGE_LOG.name();
         }

+ 3 - 1
themis-business/src/main/java/com/qmth/themis/business/enums/SystemOperationEnum.java

@@ -11,7 +11,9 @@ public enum SystemOperationEnum {
 
     LOGIN(1, "登录"),
 
-    LOGOUT(2, "注销");
+    LOGOUT(2, "注销"),
+
+    OE_NET_UN_NORMAL(3, "客户端websocket网络延时");
 
     private int id;
 

+ 0 - 1
themis-business/src/main/resources/mapper/TOeExamRecordMapper.xml

@@ -55,7 +55,6 @@
             and tee.enable = 1
             and teea.enable = 1
             and tees.enable = 1
-            and teea.start_time <![CDATA[ >= ]]> date_add(now(), interval IFNULL(teea.opening_seconds, tee.opening_seconds) second)
         </where>
     </select>
 </mapper>

+ 4 - 0
themis-common/src/main/java/com/qmth/themis/common/enums/ExceptionResultEnum.java

@@ -32,6 +32,10 @@ public enum ExceptionResultEnum {
 
     EXAM_ID_IS_NULL("102", "考试批次id不能为空"),
 
+    RECORD_ID_IS_NULL("102", "考试记录id不能为空"),
+
+    RECORD_NO("102", "考试记录不存在"),
+
     EXAM_INFO_IS_NULL("102", "考试批次信息不能为空"),
 
     EXAM_IS_NULL("102", "考试不能为空"),

+ 3 - 2
themis-exam/src/main/java/com/qmth/themis/exam/api/TEStudentController.java

@@ -142,9 +142,10 @@ public class TEStudentController {
         Map unFinishExam = tOeExamRecordService.getUnFinishExam(teStudent.getId());
         if (Objects.isNull(unFinishExam)) {
             List<Map> list = teExamService.getWaitingExam(teStudent.getId());
-            map.put("waiting", list);
+            if (Objects.nonNull(list) && list.size() > 0) {
+                map.put("waiting", list);
+            }
         } else {
-            unFinishExam.put("activity", unFinishExam);
             map.put("unFinished", unFinishExam);
         }
         //获取全局考试配置

+ 48 - 39
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketServer.java

@@ -1,35 +1,36 @@
 package com.qmth.themis.exam.websocket;//package com.qmth.themis.backend.websocket;
 
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.websocket.OnClose;
-import javax.websocket.OnError;
-import javax.websocket.OnMessage;
-import javax.websocket.OnOpen;
-import javax.websocket.Session;
-import javax.websocket.server.PathParam;
-import javax.websocket.server.ServerEndpoint;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
 import com.alibaba.fastjson.JSONObject;
 import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.entity.TBSession;
+import com.qmth.themis.business.enums.MqEnum;
+import com.qmth.themis.business.enums.SystemOperationEnum;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.enums.ExceptionResultEnum;
 import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.common.signature.SignatureInfo;
 import com.qmth.themis.common.signature.SignatureType;
+import com.qmth.themis.common.util.Result;
+import com.qmth.themis.exam.config.DictionaryConfig;
 import com.qmth.themis.exam.enums.WebsocketTypeEnum;
 import com.qmth.themis.exam.websocketTemplete.WebSocketOeMessageTemplete;
+import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.service.MqDtoService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @Description: websocker服务端
@@ -50,11 +51,13 @@ public class WebSocketServer
      */
     private Session session;
     private String sessionId;
+    private Long recordId;
     private String platform = null;
     private String deviceId = null;
     private String Authorization = null;
     private Long time = null;
     private RedisUtil redisUtil;
+    private Long updateTime = null;
 
     /**
      * 连接建立成功调用的方法
@@ -75,6 +78,7 @@ public class WebSocketServer
         this.deviceId = String.valueOf(mapParameter.get("deviceId").get(0));
         this.Authorization = String.valueOf(mapParameter.get("Authorization").get(0));
         this.time = Long.parseLong(String.valueOf(mapParameter.get("time").get(0)));
+        this.recordId = Long.parseLong(String.valueOf(mapParameter.get("recordId").get(0)));
         String method = SystemConstant.GET;
         final SignatureInfo info = SignatureInfo
                 .parse(Authorization);
@@ -102,7 +106,7 @@ public class WebSocketServer
                     }
                     log.info("用户连接:" + this.sessionId + ",当前在线人数为:" + getOnlineCount());
                     try {
-                        sendMessage("连接成功");
+                        this.sendMessage("连接成功");
                     } catch (IOException e) {
                         e.printStackTrace();
                         log.error("用户:" + this.sessionId + ",网络异常!!!!!!");
@@ -125,8 +129,21 @@ public class WebSocketServer
             webSocketMap.remove(this.sessionId);
             //从set中删除
             subOnlineCount();
+            //判断是否是正常退出
+            Date now = new Date();
+            //大于等于超时时间,说明规定时间内都没有通信,非正常退出,因为期间会有心跳更新updateTime
+            if ((now.getTime() - this.updateTime) / 1000 >= SystemConstant.WEBSOCKET_MAX_TIME_OUT / 1000) {
+                log.info("超时退出");
+                redisUtil.set(SystemConstant.WEBSOCKET_UN_NORMAL_LIST, this.sessionId, this.sessionId);
+                //发送延时mq消息start
+                MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
+                DictionaryConfig dictionaryConfig = SpringContextHolder.getBean(DictionaryConfig.class);
+                MqDto mqDto = new MqDto(dictionaryConfig.mqConfigDomain().getWebsocketUnNormalTopic(), dictionaryConfig.mqConfigDomain().getWebsocketUnNormalTopicOeTag(), SystemOperationEnum.OE_NET_UN_NORMAL, MqEnum.WEBSOCKET_UN_NORMAL_LOG, String.valueOf(this.recordId), this.sessionId);
+                mqDtoService.assembleSendAsyncDelayMsg(mqDto, SystemConstant.mqDelayLevel.get("10s"));
+                //发送延时mq消息end
+            }
         }
-        log.info("用户退出:" + this.sessionId + ",当前在线人数为:" + getOnlineCount());
+        log.info("用户退出:{},当前在线人数为:{},updateTime:{}", this.sessionId, getOnlineCount(), this.updateTime);
     }
 
     /**
@@ -148,21 +165,11 @@ public class WebSocketServer
                     WebSocketOeMessageTemplete webSocketOeMessageTemplete = SpringContextHolder.getBean(WebSocketOeMessageTemplete.class);
                     String type = String.valueOf(jsonObject.get("type"));
                     Long time = Long.parseLong(String.valueOf(jsonObject.get("time")));
+                    //todo 加入当前时间和time比较的校验
                     Method method = webSocketOeMessageTemplete.getClass().getDeclaredMethod(WebsocketTypeEnum.valueOf(type).getDesc(), String.class);
-                    method.invoke(webSocketOeMessageTemplete, String.valueOf(jsonObject.get("body")));
-                    session.getAsyncRemote().sendText("123");
+                    Result result = (Result) method.invoke(webSocketOeMessageTemplete, String.valueOf(jsonObject.get("body")));
+                    this.sendMessage(JSONObject.toJSONString(result));
                 }
-//                //追加发送人(防止串改)
-//                jsonObject.put("fromSessionId", this.sessionId);
-//                String toSessionId = jsonObject.getString("toSessionId");
-//                //传送给对应toSessionId用户的websocket
-//                if (Objects.nonNull(toSessionId) && webSocketMap.containsKey(toSessionId)) {
-//                    webSocketMap.get(toSessionId).sendMessage(jsonObject.toJSONString());
-//                    log.info("发送消息jsonObject:{}", jsonObject.toJSONString());
-//                } else {
-//                    log.error("请求的sessionId:" + toSessionId + "不在该服务器上");
-//                    //否则不在这个服务器上,发送到mysql或者redis
-//                }
             } catch (Exception e) {
                 e.printStackTrace();
             }
@@ -177,7 +184,7 @@ public class WebSocketServer
      */
     @OnError
     public void onError(Session session, Throwable error) {
-        log.error("用户错误:" + this.sessionId + ",原因:" + error.getMessage());
+        log.error("用户错误:{},原因:{}", this.sessionId, error.getMessage());
         error.printStackTrace();
         throw new BusinessException(error.getMessage());
     }
@@ -190,17 +197,19 @@ public class WebSocketServer
      */
     public void sendMessage(String message) throws IOException {
         log.info("message:{}", message);
+        this.session.getAsyncRemote().sendText(message);
+        this.updateTime = System.currentTimeMillis();
     }
 
     /**
      * 发送自定义消息
      */
     public static void sendInfo(String message, @PathParam("sessionId") String sessionId) throws IOException {
-        log.info("发送消息到:" + sessionId + ",报文:" + message);
+        log.info("发送消息到:{},报文:{}", sessionId, message);
         if (Objects.nonNull(sessionId) && webSocketMap.containsKey(sessionId)) {
             webSocketMap.get(sessionId).sendMessage(message);
         } else {
-            log.error("用户" + sessionId + ",不在线!");
+            log.error("用户[:{}]不在线!", sessionId);
         }
     }
 
@@ -218,7 +227,7 @@ public class WebSocketServer
      * 在线人数加一
      */
     public synchronized void addOnlineCount() {
-        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX+this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
+        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
             Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
             int count = 0;
             if (Objects.nonNull(o)) {
@@ -233,7 +242,7 @@ public class WebSocketServer
      * 在线人数减一
      */
     public synchronized void subOnlineCount() {
-        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX+this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
+        if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
             Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
             int count = 0;
             if (Objects.nonNull(o)) {

+ 109 - 8
themis-exam/src/main/java/com/qmth/themis/exam/websocketTemplete/WebSocketOeMessageTemplete.java

@@ -1,42 +1,143 @@
 package com.qmth.themis.exam.websocketTemplete;
 
 import com.alibaba.fastjson.JSONObject;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.entity.TOeExamRecord;
+import com.qmth.themis.business.service.TOeExamRecordService;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.common.enums.ExceptionResultEnum;
+import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.common.util.Result;
+import com.qmth.themis.common.util.ResultUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 
+import javax.annotation.Resource;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * @Description: 考生端websocket模版
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/24
+ */
 @Component
 public class WebSocketOeMessageTemplete {
     private final static Logger log = LoggerFactory.getLogger(WebSocketOeMessageTemplete.class);
 
+    @Resource
+    TOeExamRecordService tOeExamRecordService;
+
+    /**
+     * 客户端已下载试卷
+     *
+     * @param body
+     * @return
+     */
+    @Transactional
     public Result clientPaperDownload(String body) {
         JSONObject jsonObject = JSONObject.parseObject(body);
         log.info("clientPaperDownload jsonObject:{}", jsonObject.toJSONString());
-        return null;
+        if (Objects.isNull(jsonObject.get("recordId")) || Objects.equals(jsonObject.get("recordId"), "")) {
+            throw new BusinessException(ExceptionResultEnum.RECORD_ID_IS_NULL);
+        }
+        Long recordId = Long.parseLong(String.valueOf(jsonObject.get("recordId")));
+        TOeExamRecord tOeExamRecord = tOeExamRecordService.getById(recordId);
+        if (Objects.isNull(tOeExamRecord)) {
+            throw new BusinessException(ExceptionResultEnum.RECORD_NO);
+        }
+        tOeExamRecord.setPaperDownload(0);
+        tOeExamRecordService.updateById(tOeExamRecord);
+        //todo 更新redis考试记录缓存
+        return ResultUtil.ok(JacksonUtil.parseJson(SystemConstant.SUCCESS));
     }
 
-    public Result syncStatus() {
-        return null;
+    /**
+     * 状态同步
+     *
+     * @param body
+     * @return
+     */
+    @Transactional
+    public Result syncStatus(String body) {
+        JSONObject jsonObject = JSONObject.parseObject(body);
+        log.info("syncStatus jsonObject:{}", jsonObject.toJSONString());
+        if (Objects.isNull(jsonObject.get("recordId")) || Objects.equals(jsonObject.get("recordId"), "")) {
+            throw new BusinessException(ExceptionResultEnum.RECORD_ID_IS_NULL);
+        }
+        Long recordId = Long.parseLong(String.valueOf(jsonObject.get("recordId")));
+        if (Objects.isNull(jsonObject.get("progress")) || Objects.equals(jsonObject.get("progress"), "")) {
+            throw new BusinessException("答题进度不能为空");
+        }
+        Double progress = Double.parseDouble(String.valueOf(jsonObject.get("durationSeconds")));
+        if (Objects.isNull(jsonObject.get("durationSeconds")) || Objects.equals(jsonObject.get("durationSeconds"), "")) {
+            throw new BusinessException("考试累计用时秒数不能为空");
+        }
+        Integer durationSeconds = Integer.parseInt((String.valueOf(jsonObject.get("durationSeconds"))));
+        TOeExamRecord tOeExamRecord = tOeExamRecordService.getById(recordId);
+        if (Objects.isNull(tOeExamRecord)) {
+            throw new BusinessException(ExceptionResultEnum.RECORD_NO);
+        }
+        tOeExamRecord.setAnswerProgress(progress);
+        tOeExamRecord.setDurationSeconds(durationSeconds);
+        tOeExamRecordService.updateById(tOeExamRecord);
+        return this.syncAck();
     }
 
+    /**
+     * 同步确认
+     *
+     * @return
+     */
     public Result syncAck() {
-        return null;
+        Map map = new HashMap<>();
+        map.put(SystemConstant.ACK_MESSAGE, System.currentTimeMillis());
+        return ResultUtil.ok(map);
     }
 
-    public Result invigilateLivenessVerify() {
+    /**
+     * 监考强制活体验证
+     *
+     * @param body
+     * @return
+     */
+    public Result invigilateLivenessVerify(String body) {
         return null;
     }
 
-    public Result invigilateNotice() {
+    /**
+     * 监考消息
+     *
+     * @param body
+     * @return
+     */
+    public Result invigilateNotice(String body) {
         return null;
     }
 
+    /**
+     * 监考消息确认
+     *
+     * @return
+     */
     public Result invigilateNoticeAck() {
-        return null;
+        Map map = new HashMap<>();
+        map.put(SystemConstant.ACK_MESSAGE, System.currentTimeMillis());
+        return ResultUtil.ok(map);
     }
 
-    public Result invigilateStopExam() {
+    /**
+     * 监考强制收卷
+     *
+     * @param body
+     * @return
+     */
+    public Result invigilateStopExam(String body) {
         return null;
     }
 }

+ 8 - 0
themis-exam/src/main/resources/application.properties

@@ -167,6 +167,14 @@ mq.config.taskConsumerRoomCodeExportGroup=${mq.config.taskConsumerGroup}-${mq.co
 mq.config.taskTopicExamPaperImportTag=examPaperImport
 mq.config.taskConsumerExamPaperImportGroup=${mq.config.taskConsumerGroup}-${mq.config.taskTopicExamPaperImportTag}
 
+#websocket\u8D85\u65F6\u9000\u51FA\u76D1\u542C
+mq.config.websocketUnNormalTopic=${mq.config.server}-topic-websocketUnNormal
+mq.config.websocketUnNormalConsumerGroup=${mq.config.server}-group-websocketUnNormal
+
+#oe\u8003\u751F\u7AEF
+mq.config.websocketUnNormalTopicOeTag=oe
+mq.config.websocketUnNormalConsumerOeGroup=${mq.config.websocketUnNormalConsumerGroup}-${mq.config.websocketUnNormalTopicOeTag}
+
 #\u963F\u91CC\u4E91OSS\u914D\u7F6E
 aliyun.oss.name=oss-cn-shenzhen.aliyuncs.com
 aliyun.oss.endpoint=http://${aliyun.oss.name}

+ 19 - 11
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketSessionConsumer.java

@@ -71,19 +71,27 @@ public class RocketSessionConsumer implements
                 log.info(":{}-:{} sessionConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
                 mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                 log.info(":{}-:{} sessionConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
-//                log.info(":{}-:{} sessionConsumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
-                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX+mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                    log.info(":{}-:{} 更新db", threadId, threadName);
-                    tbSessionService.saveSessionInfo(JacksonUtil.readJson(JacksonUtil.parseJson(mqDto.getBody()), TBSession.class), mqDto.getTimestamp());
-                    mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                int reconsumeTime = messageExt.getReconsumeTimes();
+                if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
+                    //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
+                    mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
                     TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                    tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
                     tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                    redisUtil.delete(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId());
-                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
                 } else {
-                    log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
+                        log.info(":{}-:{} 更新db", threadId, threadName);
+                        tbSessionService.saveSessionInfo(JacksonUtil.readJson(JacksonUtil.parseJson(mqDto.getBody()), TBSession.class), mqDto.getTimestamp());
+                        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+                        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
+                        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+                        redisUtil.delete(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId());
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    } else {
+                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                    }
                 }
             }
         } catch (Exception e) {
@@ -91,7 +99,7 @@ public class RocketSessionConsumer implements
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
         } finally {
             if (Objects.nonNull(mqDto)) {
-                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX+mqDto.getId());
+                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
             }
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功

+ 33 - 25
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketTaskConsumer.java

@@ -73,34 +73,42 @@ public class RocketTaskConsumer {
                     log.info(":{}-:{} task import Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
                     mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                     log.info(":{}-:{} task import Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
-//                    log.info(":{}-:{} task import Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
-                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                        Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
-                        String tag = mqDto.getTag();
-                        myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
-                            TaskImportTemplete taskImportTemplete = null;
-                            if (tag.contains("examStudentImport")) {
-                                taskImportTemplete = new TaskExamStudentImportTemplete();
-                            } else if (tag.contains("roomCodeImport")) {
-                                taskImportTemplete = new TaskRoomCodeImportTemplete();
-                            } else if (tag.contains("examPaperImport")) {
-                                taskImportTemplete = SpringContextHolder.getBean("taskExamPaperImportTemplete");
-                            }
-                            try {
-                                taskImportTemplete.importTask(map);
-                            } catch (IOException e) {
-                                e.printStackTrace();
-                            }
-                        });
-                        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
-                        mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
+                    int reconsumeTime = messageExt.getReconsumeTimes();
+                    if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
+                        //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
+                        mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
                         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
                         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                        redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST + mqDto.getId());
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                        redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
                     } else {
-                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                        if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
+                            Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
+                            String tag = mqDto.getTag();
+                            myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
+                                TaskImportTemplete taskImportTemplete = null;
+                                if (tag.contains("examStudentImport")) {
+                                    taskImportTemplete = new TaskExamStudentImportTemplete();
+                                } else if (tag.contains("roomCodeImport")) {
+                                    taskImportTemplete = new TaskRoomCodeImportTemplete();
+                                } else if (tag.contains("examPaperImport")) {
+                                    taskImportTemplete = SpringContextHolder.getBean("taskExamPaperImportTemplete");
+                                }
+                                try {
+                                    taskImportTemplete.importTask(map);
+                                } catch (IOException e) {
+                                    e.printStackTrace();
+                                }
+                            });
+                            mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                            mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
+                            TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+                            tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+                            redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST + mqDto.getId());
+                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                        } else {
+                            log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
+                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                        }
                     }
                 }
             } catch (Exception e) {

+ 21 - 12
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketUserLogConsumer.java

@@ -64,9 +64,9 @@ public class RocketUserLogConsumer implements MessageListenerConcurrently {
             String threadName = Thread.currentThread().getName();
             Gson gson = new Gson();
             for (MessageExt messageExt : msgs) {
+                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                 log.info(":{}-:{} logConsumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
 //                MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
-                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
                 log.info(":{}-:{} logConsumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
                 String tag = mqDto.getTag();
                 Object o = null;
@@ -78,21 +78,30 @@ public class RocketUserLogConsumer implements MessageListenerConcurrently {
                     o = redisUtil.get(SystemConstant.STUDENTLOG_TOPIC_BUFFER_LIST, mqDto.getId());
                     mqTopic = SystemConstant.STUDENTLOG_TOPIC_BUFFER_LIST;
                 }
-                if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(o) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                    log.info(":{}-:{} 插入用户轨迹日志", threadId, threadName);
-                    if (tag.contains("user")) {
-                        teUserLogService.saveUserLogInfo(mqDto.getTimestamp(), mqDto.getObjId(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
-                    } else if (tag.contains("student")) {
-                        teExamStudentLogService.saveStudentLogInfo(mqDto.getTimestamp(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
-                    }
-                    mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                int reconsumeTime = messageExt.getReconsumeTimes();
+                if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
+                    //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
+                    mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
                     TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
                     tmRocketMessageService.saveOrUpdate(tmRocketMessage);
                     redisUtil.delete(mqTopic, mqDto.getId());
-                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                 } else {
-                    log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(o) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
+                        log.info(":{}-:{} 插入用户轨迹日志", threadId, threadName);
+                        if (tag.contains("user")) {
+                            teUserLogService.saveUserLogInfo(mqDto.getTimestamp(), mqDto.getObjId(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
+                        } else if (tag.contains("student")) {
+                            teExamStudentLogService.saveStudentLogInfo(mqDto.getTimestamp(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
+                        }
+                        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+                        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+                        redisUtil.delete(mqTopic, mqDto.getId());
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    } else {
+                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                    }
                 }
             }
         } catch (Exception e) {

+ 141 - 0
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketWebsocketUnNormalLogConsumer.java

@@ -0,0 +1,141 @@
+package com.qmth.themis.mq.listener;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.gson.Gson;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.entity.TMRocketMessage;
+import com.qmth.themis.business.enums.MqEnum;
+import com.qmth.themis.business.enums.SystemOperationEnum;
+import com.qmth.themis.business.service.TEExamStudentLogService;
+import com.qmth.themis.business.service.TMRocketMessageService;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.mq.dto.MqDto;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.annotation.SelectorType;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * @Description: 延时消息监听 websocket超时退出
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/2
+ */
+@Service
+public class RocketWebsocketUnNormalLogConsumer implements MessageListenerConcurrently {
+    private final static Logger log = LoggerFactory.getLogger(RocketWebsocketUnNormalLogConsumer.class);
+
+    @Resource
+    RedisUtil redisUtil;
+
+    @Resource
+    TEExamStudentLogService teExamStudentLogService;
+
+    @Resource
+    TMRocketMessageService tmRocketMessageService;
+
+    @Override
+    @Transactional
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        MqDto mqDto = null;
+        try {
+            long threadId = Thread.currentThread().getId();
+            String threadName = Thread.currentThread().getName();
+            Gson gson = new Gson();
+            for (MessageExt messageExt : msgs) {
+                log.info(":{}-:{} websocket un normal logConsumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
+                String body = new String(messageExt.getBody(), Constants.CHARSET_NAME);
+                log.info("body:{}", body);
+                JSONObject jsonObject = JSONObject.parseObject(body);
+                Map properties = (Map) jsonObject.get("properties");
+                mqDto = JSONObject.toJavaObject(JSONObject.parseObject(String.valueOf(properties.get(SystemConstant.MQDTO_OBJ))), MqDto.class);
+                int reconsumeTime = messageExt.getReconsumeTimes();
+                if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
+                    //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
+                    mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
+                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+                    tmRocketMessage.setProp(JacksonUtil.parseJson(tmRocketMessage.getProperties()));
+                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+                    redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
+                } else {
+                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(SystemConstant.WEBSOCKET_UN_NORMAL_LOG_TOPIC_BUFFER_LIST) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
+                        log.info(":{}-:{} 插入用户轨迹日志", threadId, threadName);
+                        teExamStudentLogService.saveStudentLogInfo(mqDto.getTimestamp(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
+                        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+                        tmRocketMessage.setProp(JacksonUtil.parseJson(tmRocketMessage.getProperties()));
+                        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+                        redisUtil.delete(SystemConstant.WEBSOCKET_UN_NORMAL_LOG_TOPIC_BUFFER_LIST, mqDto.getId());
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    } else {
+                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+        } finally {
+            if (Objects.nonNull(mqDto)) {
+                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
+            }
+        }
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+    }
+
+    @Service
+    @RocketMQMessageListener(consumerGroup = "${mq.config.websocketUnNormalConsumerOeGroup}", topic = "${mq.config.websocketUnNormalTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.websocketUnNormalTopicOeTag}")
+    public class sessionConsumerOeLog implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
+
+        @Override
+        public void onMessage(Message message) {
+            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
+        }
+
+        @Override
+        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
+            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
+            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
+            defaultMQPushConsumer.registerMessageListener(RocketWebsocketUnNormalLogConsumer.this::consumeMessage);
+        }
+    }
+
+//    @Service
+//    @RocketMQMessageListener(consumerGroup = "${mq.config.userLogConsumerStudentGroup}", topic = "${mq.config.userLogTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.userLogTopicStudentTag}")
+//    public class sessionConsumerStudentLog implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
+//
+//        @Override
+//        public void onMessage(Message message) {
+//            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
+//        }
+//
+//        @Override
+//        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
+//            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
+//            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+//            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
+//            defaultMQPushConsumer.registerMessageListener(RocketWebsocketOeLogConsumer.this::consumeMessage);
+//        }
+//    }
+}

+ 20 - 0
themis-mq/src/main/java/com/qmth/themis/mq/service/MqDtoService.java

@@ -3,6 +3,8 @@ package com.qmth.themis.mq.service;
 
 import com.qmth.themis.mq.dto.MqDto;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * @Description: mqdto 服务类
  * @Param:
@@ -27,4 +29,22 @@ public interface MqDtoService {
      * @return
      */
     public MqDto assembleSendOneWayMsg(MqDto mqDto);
+
+    /**
+     * 组装异步延时消息
+     *
+     * @param mqDto
+     * @param timeOut
+     * @return
+     */
+    public MqDto assembleSendAsyncDelayMsg(MqDto mqDto, int timeOut);
+
+    /**
+     * 组装同步延时消息
+     *
+     * @param mqDto
+     * @param timeOut
+     * @return
+     */
+    public MqDto assembleSendSyncDelayMsg(MqDto mqDto, int timeOut);
 }

+ 18 - 0
themis-mq/src/main/java/com/qmth/themis/mq/service/ProducerServer.java

@@ -28,6 +28,15 @@ public interface ProducerServer {
      */
     public Result syncOrderlyMsg(MqDto mqDto);
 
+    /**
+     * 同步延时消息
+     *
+     * @param mqDto
+     * @param timeOut
+     * @return
+     */
+    public Result syncDelayMsg(MqDto mqDto, int timeOut);
+
     /**
      * 异步消息
      *
@@ -44,6 +53,15 @@ public interface ProducerServer {
      */
     public Result asyncOrderlyMsg(MqDto mqDto);
 
+    /**
+     * 异步延时消息
+     *
+     * @param mqDto
+     * @param timeOut
+     * @return
+     */
+    public Result asyncDelayMsg(MqDto mqDto, int timeOut);
+
     /**
      * 单向消息
      *

+ 57 - 1
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqDtoServiceImpl.java

@@ -9,6 +9,8 @@ import com.qmth.themis.mq.service.ProducerServer;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -89,6 +91,58 @@ public class MqDtoServiceImpl implements MqDtoService {
         return null;
     }
 
+    /**
+     * 组装异步延时消息
+     *
+     * @param mqDto
+     * @param timeOut
+     * @return
+     */
+    @Override
+    public MqDto assembleSendAsyncDelayMsg(MqDto mqDto, int timeOut) {
+        mqDto.setAck(SystemConstant.DELIVERED_ACK_TYPE);
+        try {
+            Map<String, Object> map = new HashMap<>();
+            map.put("timeOut", timeOut);
+            mqDto.setProperties(map);
+            producerServer.asyncDelayMsg(mqDto, timeOut);
+        } catch (Exception e) {
+            e.printStackTrace();
+            if (Objects.nonNull(mqDto)) {
+                mqDto.setAck(SystemConstant.UNSEND_ACK_TYPE);
+            }
+        } finally {
+            setTopic(mqDto);
+        }
+        return null;
+    }
+
+    /**
+     * 组装同步延时消息
+     *
+     * @param mqDto
+     * @param timeOut
+     * @return
+     */
+    @Override
+    public MqDto assembleSendSyncDelayMsg(MqDto mqDto, int timeOut) {
+        mqDto.setAck(SystemConstant.DELIVERED_ACK_TYPE);
+        try {
+            Map<String, Object> map = new HashMap<>();
+            map.put("timeOut", timeOut);
+            mqDto.setProperties(map);
+            producerServer.syncDelayMsg(mqDto, timeOut);
+        } catch (Exception e) {
+            e.printStackTrace();
+            if (Objects.nonNull(mqDto)) {
+                mqDto.setAck(SystemConstant.UNSEND_ACK_TYPE);
+            }
+        } finally {
+            setTopic(mqDto);
+        }
+        return null;
+    }
+
     void setTopic(MqDto mqDto) {
         if (Objects.nonNull(mqDto)) {
             switch (mqDto.getType().ordinal()) {
@@ -101,8 +155,10 @@ public class MqDtoServiceImpl implements MqDtoService {
                 case 5:
                     redisUtil.set(SystemConstant.USERLOG_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);
                     break;
+                case 7:
+                    redisUtil.set(SystemConstant.WEBSOCKET_UN_NORMAL_LOG_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);
+                    break;
                 default:
-                    redisUtil.set(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);
                     break;
             }
         }

+ 43 - 1
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/ProducerServerImpl.java

@@ -63,6 +63,21 @@ public class ProducerServerImpl implements ProducerServer {
         return ResultUtil.ok(SystemConstant.SUCCESS);
     }
 
+    /**
+     * 同步延时消息
+     *
+     * @param mqDto
+     * @return
+     */
+    @Override
+    public Result syncDelayMsg(MqDto mqDto, int timeOut) {
+        log.info("syncDelayMsg mqDto:{}", JacksonUtil.parseJson(mqDto));
+        Message message = assembleMessage(mqDto);
+        org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(message).build();
+        rocketMQTemplate.syncSend(mqDto.getTopic() + ":" + mqDto.getTag(), messageTran, SystemConstant.MESSAGE_TIMEOUT, timeOut);
+        return ResultUtil.ok(SystemConstant.SUCCESS);
+    }
+
     /**
      * 异步消息
      *
@@ -113,6 +128,33 @@ public class ProducerServerImpl implements ProducerServer {
         return ResultUtil.ok(SystemConstant.SUCCESS);
     }
 
+    /**
+     * 异步延时消息
+     *
+     * @param mqDto
+     * @return
+     */
+    @Override
+    public Result asyncDelayMsg(MqDto mqDto, int timeOut) {
+        log.info("asyncDelayMsg mqDto:{}", JacksonUtil.parseJson(mqDto));
+        Message message = assembleMessage(mqDto);
+        org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(message).build();
+        rocketMQTemplate.asyncSend(mqDto.getTopic() + ":" + mqDto.getTag(), messageTran, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                // 成功回调
+                log.info("sendResult:{}", JacksonUtil.parseJson(sendResult));
+            }
+
+            @Override
+            public void onException(Throwable throwable) {
+                // 失败回调
+                throw new BusinessException(throwable.getMessage());
+            }
+        }, SystemConstant.MESSAGE_TIMEOUT, timeOut);
+        return ResultUtil.ok(SystemConstant.SUCCESS);
+    }
+
     /**
      * 单向消息
      *
@@ -167,7 +209,7 @@ public class ProducerServerImpl implements ProducerServer {
         message.setTopic(mqDto.getTopic());
         message.setBody(String.valueOf(mqDto.getBody()).getBytes());
         message.setTags(mqDto.getTag());
-        message.putUserProperty(SystemConstant.OBJ_ID, String.valueOf(mqDto.getObjId()));
+        message.putUserProperty(SystemConstant.MQDTO_OBJ, JacksonUtil.parseJson(mqDto));
         return message;
     }
 }