wangliang 4 years ago
parent
commit
265f1cd234

+ 2 - 1
themis-business/src/main/java/com/qmth/themis/business/constant/SystemConstant.java

@@ -128,7 +128,8 @@ public class SystemConstant {
      */
     public static final String WEBSOCKET_OE_ONLINE_COUNT = "websocket:oe:online:count";
     public static final String GET = "get";
-    public static final long WEBSOCKET_MAX_TIME_OUT = 3 * 60 * 1000;
+//    public static final long WEBSOCKET_MAX_TIME_OUT = 3 * 60 * 1000;
+    public static final long WEBSOCKET_MAX_TIME_OUT = 1 * 60 * 1000;
     //        public static final long WEBSOCKET_MAX_TIME_OUT = 10 * 1000;
     public static final String ACK_MESSAGE = "ackMessage";
     /**

+ 3 - 41
themis-business/src/main/java/com/qmth/themis/business/enums/MqEnum.java

@@ -11,56 +11,18 @@ import java.util.Objects;
  */
 public enum MqEnum {
 
-    /**
-     * 用户session记录
-     */
     SESSION(0, "用户session记录"),
-
-    /**
-     * 考生轨迹
-     */
     EXAM_STUDENT_LOG(1, "考生轨迹"),
-    /**
-     * 预警日志
-     */
     WARMING_LOG(2, "预警日志"),
-
-    /**
-     * 异常日志
-     */
     EXCEPTION_LOG(3, "异常日志"),
-
-    /**
-     * 消息日志
-     */
     MESSAGE_LOG(4, "消息日志"),
-
-    /**
-     * 用户轨迹
-     */
     USER_LOG(5, "用户轨迹"),
-
-    /**
-     * 任务
-     */
     TASK_LOG(6, "任务"),
-
-    /**
-     * quartz
-     */
     QUARTZ_LOG(7, "quartz任务"),
-    
     EXAM(8, "考生端消息"),
-
-    /**
-     * websocket强行交卷
-     */
-    WEBSOCKET_OFFLINE_LOG(8, "websocket强行离线(交卷)"),
-
-    /**
-     * websocket im
-     */
-    WEBSOCKET_IM_LOG(9, "websocket发送消息"),
+    WEBSOCKET_OFFLINE_LOG(9, "websocket强行离线(交卷)"),
+    WEBSOCKET_IM_CLUSTERING_LOG(10, "websocket点对点发送消息"),
+    WEBSOCKET_IM_BROADCASTING_LOG(11, "websocket广播发送消息"),
 
     /**
      * websocket超时退出

+ 8 - 10
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketOeServer.java

@@ -166,10 +166,12 @@ public class WebSocketOeServer implements Concurrently {
                 log.info("超时退出");
                 //发送延时mq消息start
                 MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
-                String level = "2m";
+//                String level = "2m";
+                String level = "30s";
                 Integer time = SystemConstant.mqDelayLevel.get(level);
                 LocalDateTime dt = LocalDateTime.now();
-                dt = dt.plusMinutes(Long.parseLong(level.replace("m", "")));
+//                dt = dt.plusMinutes(Long.parseLong(level.replace("m", "")));
+                dt = dt.plusMinutes(Long.parseLong(level.replace("s", "")));
                 tranMap.put("timeOut", time);
                 tranMap.put("mqExecTime", dt.toInstant(ZoneOffset.of("+8")).toEpochMilli());
                 MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.unNormal.name(), SystemOperationEnum.OE_NET_UN_NORMAL, MqEnum.WEBSOCKET_UN_NORMAL_LOG, String.valueOf(this.recordId), this.tranMap, this.sessionId);
@@ -311,16 +313,12 @@ public class WebSocketOeServer implements Concurrently {
                     redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                        String body = new String(messageExt.getBody(), Constants.CHARSET_NAME);
-                        log.info("body:{}", body);
-                        JSONObject jsonObject = JSONObject.parseObject(body);
-                        Map properties = (Map) jsonObject.get("properties");
-                        String oper = String.valueOf(properties.get("oper"));
-                        if (oper.contains("offLine")) {//下线
+                        MqEnum mqEnum = mqDto.getType();
+                        if (MqEnum.WEBSOCKET_OFFLINE_LOG.ordinal() == mqEnum.ordinal()) {//下线
 
-                        } else if (oper.contains(MessageModel.CLUSTERING.name())) {//点对点消息
+                        } else if (MqEnum.WEBSOCKET_IM_CLUSTERING_LOG.ordinal() == mqEnum.ordinal()) {//点对点消息
 
-                        } else if (oper.contains(MessageModel.BROADCASTING.name())) {//广播消息
+                        } else if (MqEnum.WEBSOCKET_IM_BROADCASTING_LOG.ordinal() == mqEnum.ordinal()) {//广播消息
 
                         }
                     }else {

+ 1 - 1
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqDtoServiceImpl.java

@@ -161,7 +161,7 @@ public class MqDtoServiceImpl implements MqDtoService {
      */
     void setTopicBuffer(MqDto mqDto) {
         if (Objects.nonNull(mqDto)) {
-            if (mqDto.getType().ordinal() < 100) {
+            if (mqDto.getType().getId() < 100) {
                 redisUtil.set(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);
             } else {
                 redisUtil.set(SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST, mqDto.getId(), mqDto);

+ 25 - 24
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqLogicServiceImpl.java

@@ -27,6 +27,7 @@ import org.springframework.transaction.annotation.Transactional;
 import javax.annotation.Resource;
 import java.io.IOException;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
@@ -66,9 +67,9 @@ public class MqLogicServiceImpl implements MqLogicService {
 
     @Resource
     TOeExamRecordService examRecordService;
-    
+
     @Resource
-	TOeFaceVerifyHistoryService faceVerifyHistoryService;
+    TOeFaceVerifyHistoryService faceVerifyHistoryService;
 
     /**
      * mq最大重试次数逻辑
@@ -187,7 +188,7 @@ public class MqLogicServiceImpl implements MqLogicService {
         Map<String, Object> tranMap = mqDto.getProperties();
         Long recordId = Long.parseLong(String.valueOf(tranMap.get("recordId")));
         Date clientLastSyncTime = ExamRecordCacheUtil.getClientLastSyncTime(recordId);
-        if ((System.currentTimeMillis() - clientLastSyncTime.getTime()) / 1000 / 60 >= 2) {//大于等于当前时间,说明未重连或重登录
+        if (Objects.nonNull(clientLastSyncTime) && (System.currentTimeMillis() - clientLastSyncTime.getTime()) / 1000 / 60 >= 2) {//大于等于当前时间,说明未重连或重登录
             String deviceId = String.valueOf(tranMap.get("deviceId"));
             String ip = String.valueOf(tranMap.get("ip"));
             Long updateTime = Long.parseLong(String.valueOf(tranMap.get("updateTime")));
@@ -199,8 +200,6 @@ public class MqLogicServiceImpl implements MqLogicService {
                 //todo 没有断点次数,则强制交卷
                 tOeExamRecord.setStatus(ExamRecordStatusEnum.finished);
                 tOeExamRecordService.updateById(tOeExamRecord);
-
-                //加入踢下线mq
             } else {
                 breakCount--;
                 //增加断点记录
@@ -220,9 +219,11 @@ public class MqLogicServiceImpl implements MqLogicService {
         }
         teExamStudentLogService.saveStudentLogInfo(mqDto.getTimestamp(), MqEnum.valueOf(String.valueOf(mqDto.getType())).getId(), SystemOperationEnum.valueOf(String.valueOf(mqDto.getBody())).getCode(), JacksonUtil.parseJson(mqDto));
         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-        tmRocketMessage.setProp(JacksonUtil.parseJson(tmRocketMessage.getProperties()));
+        Map map = new HashMap();
+        map.put(SystemConstant.MQDTO_OBJ, JacksonUtil.parseJson(mqDto));
+        tmRocketMessage.setProp(JacksonUtil.parseJson(map));
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-        redisUtil.delete(key, mqDto.getId());
+//        redisUtil.delete(key, mqDto.getId());
     }
 
     /**
@@ -244,28 +245,28 @@ public class MqLogicServiceImpl implements MqLogicService {
         redisUtil.delete(key, mqDto.getId());
     }
 
-	/**
-	 *人脸验证保存
-	 */
-	@Override
-	@Transactional
-	public void execMqFaceVerifySaveLogic(MqDto mqDto, String key) {
-		Gson gson = new Gson();
+    /**
+     * 人脸验证保存
+     */
+    @Override
+    @Transactional
+    public void execMqFaceVerifySaveLogic(MqDto mqDto, String key) {
+        Gson gson = new Gson();
         Map<String, Object> param = (Map<String, Object>) mqDto.getBody();
-        Long id=(Long)param.get("id");
-        Long recordId=(Long)param.get("recordId");
-        String type=(String)param.get("type");
-        String photoUrl=(String)param.get("photoUrl");
-        Integer faceCount=(Integer)param.get("faceCount");
-        Double similarity=(Double)param.get("similarity");
-        Double realness=(Double)param.get("realness");
-        Long time=(Long)param.get("time");
-        String exception=(String)param.get("exception");
+        Long id = (Long) param.get("id");
+        Long recordId = (Long) param.get("recordId");
+        String type = (String) param.get("type");
+        String photoUrl = (String) param.get("photoUrl");
+        Integer faceCount = (Integer) param.get("faceCount");
+        Double similarity = (Double) param.get("similarity");
+        Double realness = (Double) param.get("realness");
+        Long time = (Long) param.get("time");
+        String exception = (String) param.get("exception");
         faceVerifyHistoryService.save(id, recordId, type, photoUrl, faceCount, similarity, realness, time, exception);
         mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
         tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
         redisUtil.delete(key, mqDto.getId());
-	}
+    }
 }

+ 10 - 11
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/CalculateObjectiveScoreConcurrentlyImpl.java

@@ -1,15 +1,5 @@
 package com.qmth.themis.mq.templete.impl;
 
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
 import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.util.JacksonUtil;
@@ -18,6 +8,15 @@ import com.qmth.themis.common.contanst.Constants;
 import com.qmth.themis.mq.dto.MqDto;
 import com.qmth.themis.mq.service.MqLogicService;
 import com.qmth.themis.mq.templete.Concurrently;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Objects;
 
 /**
  * 计算客观分
@@ -63,7 +62,7 @@ public class CalculateObjectiveScoreConcurrentlyImpl implements Concurrently {
                 }
             }
         } catch (Exception e) {
-        	log.error("计算客观分,消息消费出错",e);
+            log.error("计算客观分,消息消费出错", e);
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
         } finally {
             if (Objects.nonNull(mqDto)) {

+ 2 - 2
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/SessionConcurrentlyImpl.java

@@ -42,9 +42,9 @@ public class SessionConcurrentlyImpl implements Concurrently {
             long threadId = Thread.currentThread().getId();
             String threadName = Thread.currentThread().getName();
             for (MessageExt messageExt : msgs) {
-                log.info(":{}-:{} sessionConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
+                log.info(":{}-:{} session Consumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
                 mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
-                log.info(":{}-:{} sessionConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
+                log.info(":{}-:{} session Consumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
                 int reconsumeTime = messageExt.getReconsumeTimes();
                 if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
                     mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);

+ 2 - 2
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/UserLogConcurrentlyImpl.java

@@ -39,8 +39,8 @@ public class UserLogConcurrentlyImpl implements Concurrently {
             String threadName = Thread.currentThread().getName();
             for (MessageExt messageExt : msgs) {
                 mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
-                log.info(":{}-:{} logConsumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
-                log.info(":{}-:{} logConsumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
+                log.info(":{}-:{} log Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
+                log.info(":{}-:{} log Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
                 int reconsumeTime = messageExt.getReconsumeTimes();
                 if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
                     mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);

+ 3 - 2
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/WebsocketUnNormalConcurrentlyImpl.java

@@ -3,6 +3,7 @@ package com.qmth.themis.mq.templete.impl;
 import com.alibaba.fastjson.JSONObject;
 import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.contanst.Constants;
 import com.qmth.themis.mq.dto.MqDto;
@@ -14,7 +15,6 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
 
 import java.util.List;
 import java.util.Map;
@@ -40,12 +40,13 @@ public class WebsocketUnNormalConcurrentlyImpl implements Concurrently {
             long threadId = Thread.currentThread().getId();
             String threadName = Thread.currentThread().getName();
             for (MessageExt messageExt : msgs) {
-                log.info(":{}-:{} websocket un normal logConsumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
+                log.info(":{}-:{} websocket unnormal Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
                 String body = new String(messageExt.getBody(), Constants.CHARSET_NAME);
                 log.info("body:{}", body);
                 JSONObject jsonObject = JSONObject.parseObject(body);
                 Map properties = (Map) jsonObject.get("properties");
                 mqDto = JSONObject.toJavaObject(JSONObject.parseObject(String.valueOf(properties.get(SystemConstant.MQDTO_OBJ))), MqDto.class);
+                log.info(":{}-:{} websocket unnormal Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
                 int reconsumeTime = messageExt.getReconsumeTimes();
                 if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
                     //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员

+ 13 - 1
themis-task/src/main/java/com/qmth/themis/task/quartz/service/impl/QuartzLogicServiceImpl.java

@@ -6,9 +6,13 @@ import com.qmth.themis.business.entity.TEExamStudent;
 import com.qmth.themis.business.entity.TOeExamRecord;
 import com.qmth.themis.business.enums.ExamRecordStatusEnum;
 import com.qmth.themis.business.enums.FinishTypeEnum;
+import com.qmth.themis.business.enums.MqEnum;
 import com.qmth.themis.business.service.TEExamActivityService;
 import com.qmth.themis.business.service.TEExamStudentService;
 import com.qmth.themis.business.service.TOeExamRecordService;
+import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.enums.MqTagEnum;
+import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.service.MqDtoService;
 import com.qmth.themis.task.quartz.service.QuartzLogicService;
 import org.slf4j.Logger;
@@ -64,6 +68,7 @@ public class QuartzLogicServiceImpl implements QuartzLogicService {
             tOeExamRecordQueryWrapper.lambda().eq(TOeExamRecord::getExamActivityId, teExamActivity.getId()).ne(TOeExamRecord::getStatus, ExamRecordStatusEnum.finished.ordinal());
             List<TOeExamRecord> tOeExamRecordList = tOeExamRecordService.list(tOeExamRecordQueryWrapper);
             List<Long> examStudentIdList = null;
+            List<String> examStudentIdentityList = null;
             if (Objects.nonNull(tOeExamRecordList) && tOeExamRecordList.size() > 0) {
                 examStudentIdList = new ArrayList<>();
             }
@@ -81,13 +86,20 @@ public class QuartzLogicServiceImpl implements QuartzLogicService {
                 QueryWrapper<TEExamStudent> teExamStudentQueryWrapper = new QueryWrapper<>();
                 teExamStudentQueryWrapper.lambda().in(TEExamStudent::getId, examStudentIdList);
                 List<TEExamStudent> teExamStudentList = teExamStudentService.list(teExamStudentQueryWrapper);
+                examStudentIdentityList = new ArrayList<>();
+                List<String> finalExamStudentIdentityList = examStudentIdentityList;
                 teExamStudentList.forEach(s -> {
                     int count = Objects.isNull(s.getLeftExamCount()) ? 0 : s.getLeftExamCount();
                     count--;
                     s.setLeftExamCount(count < 0 ? 0 : count);
-                    //加入踢下线mq
+                    finalExamStudentIdentityList.add(s.getIdentity());
                 });
+                //加入踢下线mq
                 teExamStudentService.updateBatchById(teExamStudentList);
+                MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.oe.name(), finalExamStudentIdentityList, MqEnum.WEBSOCKET_OFFLINE_LOG, String.valueOf(teExamActivity.getId()), teExamActivity.getCode());
+                //发送强行离线mq start
+                mqDtoService.assembleSendOneWayMsg(mqDto);
+                //发送强行离线mq end
             }
             //todo 未完待续,需要加入交卷逻辑
         } else {