Procházet zdrojové kódy

考生信息异步持久化

xiatian před 4 roky
rodič
revize
8e025ed466

+ 0 - 9
themis-business/src/main/java/com/qmth/themis/business/cache/bean/ExamStudentCacheBean.java

@@ -35,8 +35,6 @@ public class ExamStudentCacheBean implements Serializable {
 
     //已考考试次数
     private Integer alreadyExamCount;
-    // 当前考试是第几次
-    private Integer currentSerialNumber;
     // 当前考试记录ID
     private Long currentRecordId;
 
@@ -64,13 +62,6 @@ public class ExamStudentCacheBean implements Serializable {
         this.identity = identity;
     }
 
-    public Integer getCurrentSerialNumber() {
-        return currentSerialNumber;
-    }
-
-    public void setCurrentSerialNumber(Integer currentSerialNumber) {
-        this.currentSerialNumber = currentSerialNumber;
-    }
 
     public String getCourseName() {
         return courseName;

+ 4 - 0
themis-business/src/main/java/com/qmth/themis/business/dao/TEExamStudentMapper.java

@@ -77,4 +77,8 @@ public interface TEExamStudentMapper extends BaseMapper<TEExamStudent> {
             @Param("roomCode") String roomCode, @Param("courseCode") String courseCode, @Param("grade") String grade,
             @Param("enable") Integer enable, @Param("classNo") String classNo, @Param("hasPhoto")Integer hasPhoto);
 
+	public void updateAlreadyExamCount(@Param("examStudentId")Long examStudentId,@Param("alreadyExamCount") Integer alreadyExamCount);
+
+	public void updateCurrentRecordId(@Param("examStudentId")Long examStudentId,@Param("currentRecordId") Long currentRecordId);
+
 }

+ 5 - 1
themis-business/src/main/java/com/qmth/themis/business/enums/MqGroupEnum.java

@@ -76,7 +76,11 @@ public enum MqGroupEnum {
 	/**
      * 考试重新算分
      */
-    scoreCalculateGroup("themis-group-exam-scoreCalculate");
+    scoreCalculateGroup("themis-group-exam-scoreCalculate"),
+    /**
+     * 考生数据更新
+     */
+    examStudentUpdateGroup("themis-group-exam-examStudentUpdate");
 
     private MqGroupEnum(String code) {
         this.code = code;

+ 2 - 1
themis-business/src/main/java/com/qmth/themis/business/enums/MqTagEnum.java

@@ -39,7 +39,8 @@ public enum MqTagEnum {
     WARNING_LOG("预警标签", "预警", "normal", 27),
     EXCEPTION_LOG("异常标签", "异常", "normal", 28),
     MONITOR_LOG("监考监控标签", "监考监控", "normal", 29),
-    EXAM_SCORE_CALCULATE("重新算分", "考试", "normal", 30);
+    EXAM_SCORE_CALCULATE("重新算分", "考试", "normal", 30),
+    EXAM_STUDNET_UPDATE("考生数据更新", "考试", "normal", 31),;
 
     private MqTagEnum(String desc, String code, String type, int id) {
         this.desc = desc;

+ 2 - 0
themis-business/src/main/java/com/qmth/themis/business/service/TEExamStudentService.java

@@ -62,4 +62,6 @@ public interface TEExamStudentService extends IService<TEExamStudent> {
      * @return
      */
     public List<RoomCodeQueryDto> examRoomQuery(String roomName);
+
+	void updateByMqMsg(Map<String, Object> param);
 }

+ 14 - 8
themis-business/src/main/java/com/qmth/themis/business/service/impl/TEExamServiceImpl.java

@@ -303,17 +303,12 @@ public class TEExamServiceImpl extends ServiceImpl<TEExamMapper, TEExam> impleme
         }
 
         // 写入次数
-        Integer serialNumber = es.getCurrentSerialNumber();
-        if (serialNumber == null) {
-            serialNumber = 0;
-        }
-        es.setCurrentSerialNumber(serialNumber + 1);
+        Integer alreadyExamCount = es.getAlreadyExamCount()+1;
 
         Long recordId = toeExamRecordService.saveByPrepare(es.getExamId(), es.getExamActivityId(), examStudentId,
-                paperId, es.getCurrentSerialNumber());
+                paperId, alreadyExamCount);
 
-        Integer alreadyExamCount = es.getAlreadyExamCount();
-        es.setAlreadyExamCount(alreadyExamCount + 1);
+        es.setAlreadyExamCount(alreadyExamCount);
 
         es.setCurrentRecordId(recordId);
 
@@ -343,9 +338,19 @@ public class TEExamServiceImpl extends ServiceImpl<TEExamMapper, TEExam> impleme
         //mq发送消息start
         MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.STUDENT.name(), SystemOperationEnum.PREPARE, MqTagEnum.STUDENT, String.valueOf(teStudentCacheDto.getId()), teStudentCacheDto.getIdentity());
         this.sendOeLogMessage(SystemOperationEnum.PREPARE, examStudentId, recordId, mqDto);
+        updateExamStudent(examStudentId, alreadyExamCount, recordId);
         //mq发送消息end
         return prepare;
     }
+    
+    private void updateExamStudent(Long examStudentId,Integer alreadyExamCount,Long currentRecordId) {
+    	Map<String, Object> transMap = new HashMap<String, Object>();
+        transMap.put("examStudentId", examStudentId);
+        transMap.put("alreadyExamCount", alreadyExamCount);
+        transMap.put("currentRecordId", currentRecordId);
+        MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.EXAM_STUDNET_UPDATE.name(), transMap, MqTagEnum.EXAM_STUDNET_UPDATE, examStudentId.toString(), examStudentId.toString());
+        mqDtoService.assembleSendOneWayMsg(mqDto);
+    }
 
     /**
      * 根据设定几率取出一套试卷
@@ -791,6 +796,7 @@ public class TEExamServiceImpl extends ServiceImpl<TEExamMapper, TEExam> impleme
         TEStudentCacheDto teStudentCacheDto = (TEStudentCacheDto) redisUtil.getStudent(es.getStudentId());
         teStudentCacheDto.setUnFinishedRecordId(null);
         redisUtil.setStudent(teStudentCacheDto.getId(), teStudentCacheDto);
+        //异步持久化
         checkToPersisted(recordId);
         //mq发送消息start
         MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.STUDENT.name(), SystemOperationEnum.FINISHED, MqTagEnum.STUDENT, String.valueOf(teStudentCacheDto.getId()), teStudentCacheDto.getIdentity());

+ 26 - 10
themis-business/src/main/java/com/qmth/themis/business/service/impl/TEExamStudentServiceImpl.java

@@ -1,5 +1,17 @@
 package com.qmth.themis.business.service.impl;
 
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Resource;
+
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.CachePut;
+import org.springframework.cache.annotation.Cacheable;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.qmth.themis.business.cache.bean.ExamStudentCacheBean;
@@ -8,15 +20,6 @@ import com.qmth.themis.business.dto.response.RoomCodeQueryDto;
 import com.qmth.themis.business.dto.response.TEExamStudentDto;
 import com.qmth.themis.business.entity.TEExamStudent;
 import com.qmth.themis.business.service.TEExamStudentService;
-import org.springframework.cache.annotation.CacheEvict;
-import org.springframework.cache.annotation.CachePut;
-import org.springframework.cache.annotation.Cacheable;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.Resource;
-import java.util.List;
-import java.util.Map;
 
 /**
  * @Description: 考生库 服务实现类
@@ -119,7 +122,6 @@ public class TEExamStudentServiceImpl extends ServiceImpl<TEExamStudentMapper, T
         ret.setRoomName(es.getRoomName());
         ret.setAlreadyExamCount(es.getAlreadyExamCount());
         ret.setCurrentRecordId(es.getCurrentRecordId());
-        ret.setCurrentSerialNumber(null);
         ret.setIdentity(es.getIdentity());
         ret.setName(es.getName());
         return ret;
@@ -130,4 +132,18 @@ public class TEExamStudentServiceImpl extends ServiceImpl<TEExamStudentMapper, T
 			String roomCode, String courseCode, String grade, Integer enable, String classNo, Integer hasPhoto) {
 		return teExamStudentMapper.examStudentList(examId, activityId, identity, name, roomCode, courseCode, grade, enable, classNo,hasPhoto);
 	}
+	
+	@Transactional
+	@Override
+	public void updateByMqMsg(Map<String, Object> param) {
+        Long examStudentId=(Long)param.get("examStudentId");
+        Integer alreadyExamCount=(Integer)param.get("alreadyExamCount");
+        Long currentRecordId=(Long)param.get("currentRecordId");
+        if(alreadyExamCount!=null) {
+        	teExamStudentMapper.updateAlreadyExamCount(examStudentId,alreadyExamCount);
+        }
+        if(currentRecordId!=null) {
+        	teExamStudentMapper.updateCurrentRecordId(examStudentId,currentRecordId);
+        }
+	}
 }

+ 5 - 5
themis-business/src/main/java/com/qmth/themis/business/service/impl/TOeExamRecordServiceImpl.java

@@ -354,11 +354,11 @@ public class TOeExamRecordServiceImpl extends ServiceImpl<TOeExamRecordMapper, T
                 answer.setExamRecordId(recordId);
                 examAnswerService.saveOrUpdate(answer);
             }
-            //更新考生信息
-            ExamStudentCacheBean examStudentCache = examStudentService.getExamStudentCacheBean(er.getExamStudentId());
-            TEExamStudent examStudent = new TEExamStudent();
-            BeanUtils.copyProperties(examStudentCache, examStudent);
-            examStudentService.saveOrUpdate(examStudent);
+//            //更新考生信息
+//            ExamStudentCacheBean examStudentCache = examStudentService.getExamStudentCacheBean(er.getExamStudentId());
+//            TEExamStudent examStudent = new TEExamStudent();
+//            BeanUtils.copyProperties(examStudentCache, examStudent);
+//            examStudentService.saveOrUpdate(examStudent);
             //上传个人试卷结构
             ExamStudentPaperStructCacheBean struct = (ExamStudentPaperStructCacheBean) redisUtil.get(RedisKeyHelper.studentPaperStructKey(recordId));
             if(struct!=null) {

+ 8 - 0
themis-business/src/main/resources/mapper/TEExamStudentMapper.xml

@@ -271,4 +271,12 @@
 		</if>
 	</select>
 	
+	<update id="updateAlreadyExamCount">
+	update t_e_exam_student set already_exam_count=#{alreadyExamCount} where id=#{examStudentId} and (already_exam_count is null or already_exam_count &lt; #{alreadyExamCount})
+	</update>
+	
+	<update id="updateCurrentRecordId">
+	update t_e_exam_student set current_record_id=#{currentRecordId} where id=#{examStudentId} and (current_record_id is null or current_record_id &lt; #{currentRecordId})
+	</update>
+	
 </mapper>

+ 24 - 9
themis-exam/src/main/java/com/qmth/themis/exam/start/StartRunning.java

@@ -1,13 +1,7 @@
 package com.qmth.themis.exam.start;
 
-import com.qmth.themis.business.constant.SpringContextHolder;
-import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.enums.MqGroupEnum;
-import com.qmth.themis.business.enums.MqTagEnum;
-import com.qmth.themis.business.enums.MqTopicEnum;
-import com.qmth.themis.exam.websocket.WebSocketOeServer;
-import com.qmth.themis.mq.listener.RocketMessageConsumer;
-import com.qmth.themis.mq.templete.impl.*;
+import javax.annotation.Resource;
+
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -15,7 +9,25 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.Resource;
+import com.qmth.themis.business.constant.SpringContextHolder;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.enums.MqGroupEnum;
+import com.qmth.themis.business.enums.MqTagEnum;
+import com.qmth.themis.business.enums.MqTopicEnum;
+import com.qmth.themis.exam.websocket.WebSocketOeServer;
+import com.qmth.themis.mq.listener.RocketMessageConsumer;
+import com.qmth.themis.mq.templete.impl.CalculateObjectiveScoreConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.ExamRecordInitConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.ExamRecordPersistedConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.ExamRecordUpdateConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.ExamStudentUpdateConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.FaceVerifyConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.LivenessVerifyConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.LogConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.SessionConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.TaskConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.UserLogConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.WebsocketUnNormalConcurrentlyImpl;
 
 /**
  * @Description: 服务启动时初始化运行,哪个微服务模块需要则拿此模版去用
@@ -77,6 +89,9 @@ public class StartRunning implements CommandLineRunner {
         rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.examRecordUpdateGroup.getCode(), MqTopicEnum.themisTopic.getCode(), MqTagEnum.EXAM_RECORD_UPDATE.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamRecordUpdateConcurrentlyImpl.class));
         //考试记录数据初始化
         rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.examRecordInitGroup.getCode(), MqTopicEnum.themisTopic.getCode(), MqTagEnum.EXAM_RECORD_INIT.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamRecordInitConcurrentlyImpl.class));
+        
+        //考生数据更新
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.examStudentUpdateGroup.getCode(), MqTopicEnum.themisTopic.getCode(), MqTagEnum.EXAM_STUDNET_UPDATE.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamStudentUpdateConcurrentlyImpl.class));
         SystemConstant.initTempFiles();
         log.info("服务器启动时执行 end");
     }

+ 5 - 0
themis-mq/src/main/java/com/qmth/themis/mq/service/MqLogicService.java

@@ -114,4 +114,9 @@ public interface MqLogicService {
 	 * @param key
 	 */
 	void execMqCalculateScoreLogic(MqDto mqDto, String key);
+
+	/**
+	 *更新考生信息
+	 */
+	void execMqExamStudentUpdateLogic(MqDto mqDto, String key);
 }

+ 16 - 0
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqLogicServiceImpl.java

@@ -526,4 +526,20 @@ public class MqLogicServiceImpl implements MqLogicService {
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
         redisUtil.delete(key, mqDto.getId());
     }
+    
+    /**
+     *更新考生信息
+     */
+    @Override
+    @Transactional
+    public void execMqExamStudentUpdateLogic(MqDto mqDto, String key) {
+        Gson gson = new Gson();
+        Map<String, Object> param = (Map<String, Object>) mqDto.getBody();
+        teExamStudentService.updateByMqMsg(param);
+        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
+        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+        redisUtil.delete(key, mqDto.getId());
+    }
 }

+ 77 - 0
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/ExamStudentUpdateConcurrentlyImpl.java

@@ -0,0 +1,77 @@
+package com.qmth.themis.mq.templete.impl;
+
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.dto.MqDto;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.mq.service.MqLogicService;
+import com.qmth.themis.mq.templete.Concurrently;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * 考生数据更新
+ *
+ * @Description:
+ * @Author: xiatian
+ * @Date: 2020-08-04
+ */
+@Service
+public class ExamStudentUpdateConcurrentlyImpl implements Concurrently {
+    private final static Logger log = LoggerFactory.getLogger(ExamStudentUpdateConcurrentlyImpl.class);
+
+    @Resource
+    RedisUtil redisUtil;
+    @Resource
+    MqLogicService mqLogicService;
+
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        MqDto mqDto = null;
+        try {
+            long threadId = Thread.currentThread().getId();
+            String threadName = Thread.currentThread().getName();
+            for (MessageExt messageExt : msgs) {
+                log.debug(":{}-:{} CalculateObjectiveScore 重试次数:{}", threadId, threadName,
+                        messageExt.getReconsumeTimes());
+                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
+                log.debug(":{}-:{} CalculateObjectiveScore 接收到的消息:{}", threadId, threadName,
+                        JacksonUtil.parseJson(mqDto));
+                int reconsumeTime = messageExt.getReconsumeTimes();
+                if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
+                    mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                } else {
+                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
+                            && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId()))
+                            && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(),
+                            SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
+                        log.debug(":{}-:{} 更新db", threadId, threadName);
+                        mqLogicService.execMqExamStudentUpdateLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    } else {
+                        log.debug(":{}-:{} 消息ack未确认,重发", threadId, threadName);
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("mq 考生数据更新,消息消费出错", e);
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+        } finally {
+            if (Objects.nonNull(mqDto)) {
+                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
+            }
+        }
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
+    }
+}