Explorar el Código

考生端websocket

wangliang hace 5 años
padre
commit
ed2f1ce569

+ 13 - 0
themis-business/src/main/java/com/qmth/themis/business/entity/TOeExamBreakHistory.java

@@ -2,6 +2,7 @@ package com.qmth.themis.business.entity;
 
 import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableId;
+import com.qmth.themis.common.contanst.Constants;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 
@@ -55,6 +56,18 @@ public class TOeExamBreakHistory implements Serializable {
     @TableField(value = "entry_authentication_id")
     private Long entryAuthenticationId;
 
+    public TOeExamBreakHistory() {
+
+    }
+
+    public TOeExamBreakHistory(Long examRecordId, Date breakTime, Integer breakReason, String resumeReason) {
+        this.id = Constants.idGen.next();
+        this.examRecordId = examRecordId;
+        this.breakTime = breakTime;
+        this.breakReason = breakReason;
+        this.resumeReason = resumeReason;
+    }
+
     public static long getSerialVersionUID() {
         return serialVersionUID;
     }

+ 41 - 0
themis-business/src/main/java/com/qmth/themis/business/enums/ExamRecordStatusEnum.java

@@ -0,0 +1,41 @@
+package com.qmth.themis.business.enums;
+
+/**
+* @Description: 考试记录 enum
+* @Param:
+* @return:
+* @Author: wangliang
+* @Date: 2020/7/25
+*/
+public enum ExamRecordStatusEnum {
+
+    /**
+     * 首次候考
+     */
+    first_prepare,
+
+    /**
+     * 正在答题
+     */
+    answering,
+
+    /**
+     * 已中断
+     */
+    break_off,
+
+    /**
+     * 断点恢复候考
+     */
+    resume_prepare,
+
+    /**
+     * 已结束考试
+     */
+    finished,
+
+    /**
+     * 数据已保存
+     */
+    persisted;
+}

+ 21 - 0
themis-business/src/main/java/com/qmth/themis/business/enums/WebsocketExceptionEnum.java

@@ -0,0 +1,21 @@
+package com.qmth.themis.business.enums;
+
+/** 
+* @Description: websocket异常enum
+* @Param:  
+* @return:  
+* @Author: wangliang
+* @Date: 2020/7/25 
+*/ 
+public enum WebsocketExceptionEnum {
+
+    /**
+     * 网络超时
+     */
+    NET_TIME_OUT,
+
+    /**
+     * 机器故障
+     */
+    MACHINE_FAULT;
+}

+ 21 - 0
themis-business/src/main/java/com/qmth/themis/business/enums/WebsocketStatusEnum.java

@@ -0,0 +1,21 @@
+package com.qmth.themis.business.enums;
+
+/**
+* @Description: websocket 通讯状态
+* @Param:
+* @return:
+* @Author: wangliang
+* @Date: 2020/7/25
+*/
+public enum WebsocketStatusEnum {
+
+    /**
+     * 在线
+     */
+    ONLINE,
+
+    /**
+     * 离线
+     */
+    UN_ONLINE;
+}

+ 14 - 6
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketServer.java

@@ -6,6 +6,7 @@ import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.entity.TBSession;
 import com.qmth.themis.business.enums.MqEnum;
 import com.qmth.themis.business.enums.SystemOperationEnum;
+import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.enums.ExceptionResultEnum;
 import com.qmth.themis.common.exception.BusinessException;
@@ -26,10 +27,7 @@ import javax.websocket.server.PathParam;
 import javax.websocket.server.ServerEndpoint;
 import java.io.IOException;
 import java.lang.reflect.Method;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -58,6 +56,7 @@ public class WebSocketServer
     private Long time = null;
     private RedisUtil redisUtil;
     private Long updateTime = null;
+    private Map<String, Object> tranMap;
 
     /**
      * 连接建立成功调用的方法
@@ -75,6 +74,7 @@ public class WebSocketServer
         if (Objects.isNull(mapParameter.get("deviceId")) || mapParameter.get("deviceId").size() == 0) {
             throw new BusinessException(ExceptionResultEnum.DEVICE_ID_INVALID);
         }
+        log.info("uri:{}", session.getRequestURI());
         this.deviceId = String.valueOf(mapParameter.get("deviceId").get(0));
         this.Authorization = String.valueOf(mapParameter.get("Authorization").get(0));
         this.time = Long.parseLong(String.valueOf(mapParameter.get("time").get(0)));
@@ -104,9 +104,16 @@ public class WebSocketServer
                         addOnlineCount();
                         //在线数加1
                     }
+                    redisUtil.delete(SystemConstant.WEBSOCKET_UN_NORMAL_LIST, this.sessionId);
+                    //发送恢复网络mq消息
                     log.info("用户连接:" + this.sessionId + ",当前在线人数为:" + getOnlineCount());
                     try {
                         this.sendMessage("连接成功");
+                        tranMap = new HashMap<>();
+                        tranMap.put("recordId", this.recordId);
+                        tranMap.put("deviceId", this.deviceId);
+                        tranMap.put("ip", "");
+                        tranMap.put("updateTime", this.updateTime);
                     } catch (IOException e) {
                         e.printStackTrace();
                         log.error("用户:" + this.sessionId + ",网络异常!!!!!!");
@@ -138,8 +145,9 @@ public class WebSocketServer
                 //发送延时mq消息start
                 MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
                 DictionaryConfig dictionaryConfig = SpringContextHolder.getBean(DictionaryConfig.class);
-                MqDto mqDto = new MqDto(dictionaryConfig.mqConfigDomain().getWebsocketUnNormalTopic(), dictionaryConfig.mqConfigDomain().getWebsocketUnNormalTopicOeTag(), SystemOperationEnum.OE_NET_UN_NORMAL, MqEnum.WEBSOCKET_UN_NORMAL_LOG, String.valueOf(this.recordId), this.sessionId);
-                mqDtoService.assembleSendAsyncDelayMsg(mqDto, SystemConstant.mqDelayLevel.get("10s"));
+                tranMap.put("timeOut", SystemConstant.mqDelayLevel.get("10s"));
+                MqDto mqDto = new MqDto(dictionaryConfig.mqConfigDomain().getWebsocketUnNormalTopic(), dictionaryConfig.mqConfigDomain().getWebsocketUnNormalTopicOeTag(), SystemOperationEnum.OE_NET_UN_NORMAL, MqEnum.WEBSOCKET_UN_NORMAL_LOG, String.valueOf(this.recordId), this.tranMap, this.sessionId);
+                mqDtoService.assembleSendAsyncDelayMsg(mqDto);
                 //发送延时mq消息end
             }
         }

+ 1 - 1
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketSessionConsumer.java

@@ -77,7 +77,7 @@ public class RocketSessionConsumer implements
                     mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
                     TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
                     tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                    redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
+                    redisUtil.delete(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId());
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                         log.info(":{}-:{} 更新db", threadId, threadName);

+ 50 - 13
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketWebsocketUnNormalLogConsumer.java

@@ -4,10 +4,13 @@ import com.alibaba.fastjson.JSONObject;
 import com.google.gson.Gson;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.entity.TMRocketMessage;
-import com.qmth.themis.business.enums.MqEnum;
-import com.qmth.themis.business.enums.SystemOperationEnum;
+import com.qmth.themis.business.entity.TOeExamBreakHistory;
+import com.qmth.themis.business.entity.TOeExamRecord;
+import com.qmth.themis.business.enums.*;
 import com.qmth.themis.business.service.TEExamStudentLogService;
 import com.qmth.themis.business.service.TMRocketMessageService;
+import com.qmth.themis.business.service.TOeExamBreakHistoryService;
+import com.qmth.themis.business.service.TOeExamRecordService;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.contanst.Constants;
@@ -29,6 +32,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.Resource;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -53,6 +57,12 @@ public class RocketWebsocketUnNormalLogConsumer implements MessageListenerConcur
     @Resource
     TMRocketMessageService tmRocketMessageService;
 
+    @Resource
+    TOeExamRecordService tOeExamRecordService;
+
+    @Resource
+    TOeExamBreakHistoryService tOeExamBreakHistoryService;
+
     @Override
     @Transactional
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
@@ -75,21 +85,48 @@ public class RocketWebsocketUnNormalLogConsumer implements MessageListenerConcur
                     TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
                     tmRocketMessage.setProp(JacksonUtil.parseJson(tmRocketMessage.getProperties()));
                     tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                    redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
+                    redisUtil.delete(SystemConstant.WEBSOCKET_UN_NORMAL_LOG_TOPIC_BUFFER_LIST, mqDto.getId());
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(SystemConstant.WEBSOCKET_UN_NORMAL_LOG_TOPIC_BUFFER_LIST) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                        log.info(":{}-:{} 插入用户轨迹日志", threadId, threadName);
-                        teExamStudentLogService.saveStudentLogInfo(mqDto.getTimestamp(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
-                        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
-                        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                        tmRocketMessage.setProp(JacksonUtil.parseJson(tmRocketMessage.getProperties()));
-                        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                        redisUtil.delete(SystemConstant.WEBSOCKET_UN_NORMAL_LOG_TOPIC_BUFFER_LIST, mqDto.getId());
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);//表示成功处理
+                        Map<String, Object> tranMap = mqDto.getProperties();
+                        Long recordId = Long.parseLong(String.valueOf(tranMap.get("recordId")));
+                        String deviceId = String.valueOf(tranMap.get("deviceId"));
+                        String ip = String.valueOf(tranMap.get("ip"));
+                        Long updateTime = Long.parseLong(String.valueOf(tranMap.get("updateTime")));
+                        Date date = new Date();
+                        date.setTime(updateTime);
+                        TOeExamRecord tOeExamRecord = tOeExamRecordService.getById(recordId);
+                        Integer breakCount = tOeExamRecord.getLeftBreakResumeCount();
+                        if (Objects.isNull(breakCount) || breakCount == 0) {
+                            //todo 没有断点次数,则强制交卷
+                            tOeExamRecord.setStatus(ExamRecordStatusEnum.finished.ordinal());
+                            tOeExamRecordService.updateById(tOeExamRecord);
+                        } else {
+                            breakCount--;
+                            //增加断点记录
+                            TOeExamBreakHistory tOeExamBreakHistory = new TOeExamBreakHistory(recordId, new Date(), WebsocketExceptionEnum.NET_TIME_OUT.ordinal(), WebsocketExceptionEnum.NET_TIME_OUT.name());
+                            tOeExamBreakHistoryService.save(tOeExamBreakHistory);
+                            //更新考试记录状态
+                            tOeExamRecord.setClientCurrentIp(ip);
+                            tOeExamRecord.setClientWebsocketId(deviceId);
+                            tOeExamRecord.setClientWebsocketStatus(WebsocketStatusEnum.UN_ONLINE.ordinal());
+                            tOeExamRecord.setClientLastSyncTime(date);
+                            tOeExamRecord.setLastBreakId(tOeExamBreakHistory.getId());
+                            tOeExamRecord.setLastBreakTime(tOeExamBreakHistory.getBreakTime());
+                            tOeExamRecord.setLeftBreakResumeCount(breakCount);
+                            tOeExamRecord.setStatus(ExamRecordStatusEnum.break_off.ordinal());
+                            tOeExamRecordService.updateById(tOeExamRecord);
+                        }
                     } else {
-                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                        mqDto.setAck(SystemConstant.INDIVIDUAL_ACK_TYPE);//表示考生已重新登录
                     }
+                    log.info(":{}-:{} 插入用户轨迹日志", threadId, threadName);
+                    teExamStudentLogService.saveStudentLogInfo(mqDto.getTimestamp(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
+                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+                    tmRocketMessage.setProp(JacksonUtil.parseJson(tmRocketMessage.getProperties()));
+                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+                    redisUtil.delete(SystemConstant.WEBSOCKET_UN_NORMAL_LOG_TOPIC_BUFFER_LIST, mqDto.getId());
                 }
             }
         } catch (Exception e) {

+ 2 - 6
themis-mq/src/main/java/com/qmth/themis/mq/service/MqDtoService.java

@@ -3,8 +3,6 @@ package com.qmth.themis.mq.service;
 
 import com.qmth.themis.mq.dto.MqDto;
 
-import java.util.concurrent.TimeUnit;
-
 /**
  * @Description: mqdto 服务类
  * @Param:
@@ -34,17 +32,15 @@ public interface MqDtoService {
      * 组装异步延时消息
      *
      * @param mqDto
-     * @param timeOut
      * @return
      */
-    public MqDto assembleSendAsyncDelayMsg(MqDto mqDto, int timeOut);
+    public MqDto assembleSendAsyncDelayMsg(MqDto mqDto);
 
     /**
      * 组装同步延时消息
      *
      * @param mqDto
-     * @param timeOut
      * @return
      */
-    public MqDto assembleSendSyncDelayMsg(MqDto mqDto, int timeOut);
+    public MqDto assembleSendSyncDelayMsg(MqDto mqDto);
 }

+ 2 - 4
themis-mq/src/main/java/com/qmth/themis/mq/service/ProducerServer.java

@@ -32,10 +32,9 @@ public interface ProducerServer {
      * 同步延时消息
      *
      * @param mqDto
-     * @param timeOut
      * @return
      */
-    public Result syncDelayMsg(MqDto mqDto, int timeOut);
+    public Result syncDelayMsg(MqDto mqDto);
 
     /**
      * 异步消息
@@ -57,10 +56,9 @@ public interface ProducerServer {
      * 异步延时消息
      *
      * @param mqDto
-     * @param timeOut
      * @return
      */
-    public Result asyncDelayMsg(MqDto mqDto, int timeOut);
+    public Result asyncDelayMsg(MqDto mqDto);
 
     /**
      * 单向消息

+ 4 - 14
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqDtoServiceImpl.java

@@ -9,8 +9,6 @@ import com.qmth.themis.mq.service.ProducerServer;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -95,17 +93,13 @@ public class MqDtoServiceImpl implements MqDtoService {
      * 组装异步延时消息
      *
      * @param mqDto
-     * @param timeOut
      * @return
      */
     @Override
-    public MqDto assembleSendAsyncDelayMsg(MqDto mqDto, int timeOut) {
+    public MqDto assembleSendAsyncDelayMsg(MqDto mqDto) {
         mqDto.setAck(SystemConstant.DELIVERED_ACK_TYPE);
         try {
-            Map<String, Object> map = new HashMap<>();
-            map.put("timeOut", timeOut);
-            mqDto.setProperties(map);
-            producerServer.asyncDelayMsg(mqDto, timeOut);
+            producerServer.asyncDelayMsg(mqDto);
         } catch (Exception e) {
             e.printStackTrace();
             if (Objects.nonNull(mqDto)) {
@@ -121,17 +115,13 @@ public class MqDtoServiceImpl implements MqDtoService {
      * 组装同步延时消息
      *
      * @param mqDto
-     * @param timeOut
      * @return
      */
     @Override
-    public MqDto assembleSendSyncDelayMsg(MqDto mqDto, int timeOut) {
+    public MqDto assembleSendSyncDelayMsg(MqDto mqDto) {
         mqDto.setAck(SystemConstant.DELIVERED_ACK_TYPE);
         try {
-            Map<String, Object> map = new HashMap<>();
-            map.put("timeOut", timeOut);
-            mqDto.setProperties(map);
-            producerServer.syncDelayMsg(mqDto, timeOut);
+            producerServer.syncDelayMsg(mqDto);
         } catch (Exception e) {
             e.printStackTrace();
             if (Objects.nonNull(mqDto)) {

+ 4 - 4
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/ProducerServerImpl.java

@@ -70,11 +70,11 @@ public class ProducerServerImpl implements ProducerServer {
      * @return
      */
     @Override
-    public Result syncDelayMsg(MqDto mqDto, int timeOut) {
+    public Result syncDelayMsg(MqDto mqDto) {
         log.info("syncDelayMsg mqDto:{}", JacksonUtil.parseJson(mqDto));
         Message message = assembleMessage(mqDto);
         org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(message).build();
-        rocketMQTemplate.syncSend(mqDto.getTopic() + ":" + mqDto.getTag(), messageTran, SystemConstant.MESSAGE_TIMEOUT, timeOut);
+        rocketMQTemplate.syncSend(mqDto.getTopic() + ":" + mqDto.getTag(), messageTran, SystemConstant.MESSAGE_TIMEOUT, Integer.parseInt(String.valueOf(mqDto.getProperties().get("timeOut"))));
         return ResultUtil.ok(SystemConstant.SUCCESS);
     }
 
@@ -135,7 +135,7 @@ public class ProducerServerImpl implements ProducerServer {
      * @return
      */
     @Override
-    public Result asyncDelayMsg(MqDto mqDto, int timeOut) {
+    public Result asyncDelayMsg(MqDto mqDto) {
         log.info("asyncDelayMsg mqDto:{}", JacksonUtil.parseJson(mqDto));
         Message message = assembleMessage(mqDto);
         org.springframework.messaging.Message messageTran = MessageBuilder.withPayload(message).build();
@@ -151,7 +151,7 @@ public class ProducerServerImpl implements ProducerServer {
                 // 失败回调
                 throw new BusinessException(throwable.getMessage());
             }
-        }, SystemConstant.MESSAGE_TIMEOUT, timeOut);
+        }, SystemConstant.MESSAGE_TIMEOUT, Integer.parseInt(String.valueOf(mqDto.getProperties().get("timeOut"))));
         return ResultUtil.ok(SystemConstant.SUCCESS);
     }