Jelajahi Sumber

Merge remote-tracking branch 'origin/dev' into dev

wangliang 4 tahun lalu
induk
melakukan
c6fb935840

+ 15 - 0
themis-business/src/main/java/com/qmth/themis/business/cache/ExamRecordCacheUtil.java

@@ -6,6 +6,7 @@ import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.enums.ExamRecordStatusEnum;
 import com.qmth.themis.business.enums.FinishTypeEnum;
 import com.qmth.themis.business.enums.VerifyExceptionEnum;
+import com.qmth.themis.business.service.TOeExamRecordService;
 import com.qmth.themis.business.util.RedisUtil;
 
 /**
@@ -16,6 +17,8 @@ import com.qmth.themis.business.util.RedisUtil;
  * @Date: 2020-07-29
  */
 public class ExamRecordCacheUtil {
+	
+	private static TOeExamRecordService examRecordService = SpringContextHolder.getBean(TOeExamRecordService.class);
 	private static RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
 
 	public static Long getId(Long recordId) {
@@ -32,10 +35,12 @@ public class ExamRecordCacheUtil {
 
 	public static void setFirstStartTime(Long recordId, Date date) {
 		redisUtil.set(RedisKeyHelper.examRecordCacheKey(recordId), "firstStartTime", date);
+		examRecordService.dataUpdateMq(recordId, "first_start_time", date);
 	}
 
 	public static void setStatus(Long recordId, ExamRecordStatusEnum status) {
 		redisUtil.set(RedisKeyHelper.examRecordCacheKey(recordId), "status", status);
+		examRecordService.dataUpdateMq(recordId, "status", status);
 	}
 
 	public static Integer getDurationSeconds(Long recordId) {
@@ -52,10 +57,12 @@ public class ExamRecordCacheUtil {
 
 	public static void setObjectiveScore(Long recordId, Double objectiveScore) {
 		redisUtil.set(RedisKeyHelper.examRecordCacheKey(recordId), "objectiveScore", objectiveScore);
+		examRecordService.dataUpdateMq(recordId, "objective_score", objectiveScore);
 	}
 
 	public static void setFinishTime(Long recordId, Date finishTime) {
 		redisUtil.set(RedisKeyHelper.examRecordCacheKey(recordId), "finishTime", finishTime);
+		examRecordService.dataUpdateMq(recordId, "finish_time", finishTime);
 	}
 
 	public static Date getFinishTime(Long recordId) {
@@ -64,10 +71,12 @@ public class ExamRecordCacheUtil {
 
 	public static void setDurationSeconds(Long recordId, Integer durationSeconds) {
 		redisUtil.set(RedisKeyHelper.examRecordCacheKey(recordId), "durationSeconds", durationSeconds);
+		examRecordService.dataUpdateMq(recordId, "duration_seconds", durationSeconds);
 	}
 
 	public static void setFinishType(Long recordId, FinishTypeEnum finishType) {
 		redisUtil.set(RedisKeyHelper.examRecordCacheKey(recordId), "finishType", finishType);
+		examRecordService.dataUpdateMq(recordId, "finish_type", finishType);
 	}
 
 	public static FinishTypeEnum getFinishType(Long recordId) {
@@ -84,6 +93,7 @@ public class ExamRecordCacheUtil {
 	
 	public static void setLeftBreakResumeCount(Long recordId,Integer leftBreakResumeCount) {
 		redisUtil.set(RedisKeyHelper.examRecordCacheKey(recordId), "leftBreakResumeCount",leftBreakResumeCount);
+		examRecordService.dataUpdateMq(recordId, "left_break_resume_count", leftBreakResumeCount);
 	}
 
 	public static ExamRecordStatusEnum getStatus(Long recordId) {
@@ -104,6 +114,7 @@ public class ExamRecordCacheUtil {
 
 	public static void setEntryAuthenticationResult(Long recordId, VerifyExceptionEnum entryAuthenticationResult) {
 		redisUtil.set(RedisKeyHelper.examRecordCacheKey(recordId), "entryAuthenticationResult", entryAuthenticationResult);
+		examRecordService.dataUpdateMq(recordId, "entry_authentication_result", entryAuthenticationResult);
 	}
 	
 	public static Long getEntryAuthenticationId(Long recordId) {
@@ -112,6 +123,7 @@ public class ExamRecordCacheUtil {
 
 	public static void setEntryAuthenticationId(Long recordId, Long entryAuthenticationId) {
 		redisUtil.set(RedisKeyHelper.examRecordCacheKey(recordId), "entryAuthenticationId", entryAuthenticationId);
+		examRecordService.dataUpdateMq(recordId, "entry_authentication_id", entryAuthenticationId);
 	}
 	
 	public static Long getExamActivityId(Long recordId) {
@@ -120,6 +132,7 @@ public class ExamRecordCacheUtil {
 	
 	public static void setWarningCount(Long recordId, Integer warningCount) {
 		redisUtil.set(RedisKeyHelper.examRecordCacheKey(recordId), "warningCount", warningCount);
+		examRecordService.dataUpdateMq(recordId, "warning_count", warningCount);
 	}
 	
 	public static Integer getWarningCount(Long recordId) {
@@ -128,6 +141,7 @@ public class ExamRecordCacheUtil {
 	
 	public static void setBreachStatus(Long recordId, Integer breachStatus) {
 		redisUtil.set(RedisKeyHelper.examRecordCacheKey(recordId), "breachStatus", breachStatus);
+		examRecordService.dataUpdateMq(recordId, "breach_status", breachStatus);
 	}
 	
 	public static Integer getBreachStatus(Long recordId) {
@@ -136,6 +150,7 @@ public class ExamRecordCacheUtil {
 	
 	public static void setInProcessLivenessVerifyCount(Long recordId, Integer inProcessLivenessVerifyCount) {
 		redisUtil.set(RedisKeyHelper.examRecordCacheKey(recordId), "inProcessLivenessVerifyCount", inProcessLivenessVerifyCount);
+		examRecordService.dataUpdateMq(recordId, "in_process_liveness_verify_count", inProcessLivenessVerifyCount);
 	}
 	
 	public static Integer getInProcessLivenessVerifyCount(Long recordId) {

+ 7 - 0
themis-business/src/main/java/com/qmth/themis/business/dao/TOeExamRecordMapper.java

@@ -28,4 +28,11 @@ public interface TOeExamRecordMapper extends BaseMapper<TOeExamRecord> {
      * @return
      */
     public TEExamUnFinishDto getUnFinishExam(@Param("studentId") Long studentId, @Param("examId") Long examId, @Param("orgId") Long orgId);
+    
+    /**数据更新
+     * @param recordId
+     * @param colName
+     * @param colValue
+     */
+    public void dataUpdate(@Param("recordId") Long recordId, @Param("colName") String colName, @Param("colValue") Object colValue);
 }

+ 2 - 0
themis-business/src/main/java/com/qmth/themis/business/enums/MqTagEnum.java

@@ -29,6 +29,8 @@ public enum MqTagEnum {
     faceVerifySave("人脸验证保存"),
     livenessVerifySave("活体验证保存"),
     examRecordPersisted("考试记录数据持久化"),
+    examRecordUpdate("考试记录数据更新"),
+    examRecordInit("考试记录数据初始化"),
     examBreakHistory("考试断点记录");
 
     private MqTagEnum(String code) {

+ 24 - 0
themis-business/src/main/java/com/qmth/themis/business/service/TOeExamRecordService.java

@@ -47,4 +47,28 @@ public interface TOeExamRecordService extends IService<TOeExamRecord> {
 	void saveLivenessVerify(LivenessTypeEnum type, Long recordId, Long entryAuthenticationId,
 			VerifyExceptionEnum entryAuthenticationResult);
 
+	/**考试记录字段同步消息发送
+	 * @param recordId
+	 * @param colName
+	 * @param colValue
+	 */
+	void dataUpdateMq(Long recordId, String colName, Object colValue);
+	
+	/**考试记录字段同步
+	 * @param recordId
+	 * @param colName
+	 * @param colValue
+	 */
+	void dataUpdate(Long recordId, String colName, Object colValue);
+
+	/**考试记录初始化消息发送
+	 * @param param
+	 */
+	void dataInitMq(Map<String, Object> param);
+
+	/**考试记录初始化
+	 * @param param
+	 */
+	void dataInit(Map<String, Object> param);
+
 }

+ 44 - 1
themis-business/src/main/java/com/qmth/themis/business/service/impl/TOeExamRecordServiceImpl.java

@@ -30,6 +30,7 @@ import com.qmth.themis.business.config.SystemConfig;
 import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.dao.TOeExamRecordMapper;
+import com.qmth.themis.business.dto.MqDto;
 import com.qmth.themis.business.dto.response.TEExamUnFinishDto;
 import com.qmth.themis.business.entity.TEExamStudent;
 import com.qmth.themis.business.entity.TOeExamAnswer;
@@ -37,8 +38,12 @@ import com.qmth.themis.business.entity.TOeExamRecord;
 import com.qmth.themis.business.enums.ExamRecordStatusEnum;
 import com.qmth.themis.business.enums.ExamTypeEnum;
 import com.qmth.themis.business.enums.LivenessTypeEnum;
+import com.qmth.themis.business.enums.MqEnum;
+import com.qmth.themis.business.enums.MqTagEnum;
+import com.qmth.themis.business.enums.MqTopicEnum;
 import com.qmth.themis.business.enums.ObjectiveScorePolicyEnum;
 import com.qmth.themis.business.enums.VerifyExceptionEnum;
+import com.qmth.themis.business.service.MqDtoService;
 import com.qmth.themis.business.service.TEExamPaperService;
 import com.qmth.themis.business.service.TEExamService;
 import com.qmth.themis.business.service.TEExamStudentService;
@@ -62,6 +67,10 @@ public class TOeExamRecordServiceImpl extends ServiceImpl<TOeExamRecordMapper, T
 	private final static Logger log = LoggerFactory.getLogger(TOeExamRecordServiceImpl.class);
 	
 	private SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd");
+	
+    @Resource
+    MqDtoService mqDtoService;
+	
     @Resource
     TOeExamRecordMapper tOeExamRecordMapper;
 
@@ -148,7 +157,10 @@ public class TOeExamRecordServiceImpl extends ServiceImpl<TOeExamRecordMapper, T
         er.setSerialNumber(serialNumber);
         er.setFirstPrepareTime(new Date());
         er.setStatus(ExamRecordStatusEnum.FIRST_PREPARE);
-        redisUtil.setForHash(RedisKeyHelper.examRecordCacheKey(er.getId()), SimpleBeanUtil.objectToMap(er));
+        er.setObjectiveScore(0.0);
+        Map<String, Object> map=SimpleBeanUtil.objectToMap(er);
+        redisUtil.setForHash(RedisKeyHelper.examRecordCacheKey(er.getId()), map);
+        dataInitMq(map);
         return er.getId();
     }
 
@@ -365,4 +377,35 @@ public class TOeExamRecordServiceImpl extends ServiceImpl<TOeExamRecordMapper, T
 		}
 	}
 	
+	@Override
+	public void dataUpdateMq(Long recordId,String colName,Object colValue) {
+		Map<String, Object> transMap = new HashMap<String, Object>();
+		transMap.put("recordId", recordId);
+        transMap.put("colName", colName);
+        transMap.put("colValue", colValue);
+        //mq发送消息start
+        MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.examRecordUpdate.name(), transMap, MqEnum.EXAM, null, null);
+        mqDtoService.assembleSendOneWayMsg(mqDto);
+	}
+
+	@Transactional
+	@Override
+	public void dataUpdate(Long recordId, String colName, Object colValue) {
+		tOeExamRecordMapper.dataUpdate(recordId, colName, colValue);
+	}
+	
+	@Override
+	public void dataInitMq(Map<String, Object> param) {
+        //mq发送消息start
+        MqDto mqDto = new MqDto(MqTopicEnum.themisTopic.getCode(), MqTagEnum.examRecordInit.name(), param, MqEnum.EXAM, null, null);
+        mqDtoService.assembleSendOneWayMsg(mqDto);
+	}
+
+	@Transactional
+	@Override
+	public void dataInit(Map<String, Object> param) {
+		TOeExamRecord tr=SimpleBeanUtil.mapToObject(param, TOeExamRecord.class);
+		saveOrUpdate(tr);
+	}
+	
 }

+ 3 - 0
themis-business/src/main/resources/mapper/TOeExamRecordMapper.xml

@@ -63,4 +63,7 @@
             </if>
         </where>
     </select>
+    <update id="dataUpdate">
+		update t_oe_exam_record t set t.${colName}=#{colValue} where t.id=#{recordId}
+	</update>
 </mapper>

+ 4 - 0
themis-exam/src/main/java/com/qmth/themis/exam/start/StartRunning.java

@@ -94,6 +94,10 @@ public class StartRunning implements CommandLineRunner {
         //考试断点记录
         rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.examBreakHistoryGroup.getCode(), MqTopicEnum.themisTopic.getCode(), MqTagEnum.examBreakHistory.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamBreakHistoryConcurrentlyImpl.class));
 
+        //考试记录数据更新
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.examRecordPersistedGroup.getCode(), MqTopicEnum.themisTopic.getCode(), MqTagEnum.examRecordUpdate.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamRecordUpdateConcurrentlyImpl.class));
+        //考试记录数据初始化
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.examRecordPersistedGroup.getCode(), MqTopicEnum.themisTopic.getCode(), MqTagEnum.examRecordInit.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamRecordInitConcurrentlyImpl.class));
         SystemConstant.initTempFiles();
         log.info("服务器启动时执行 end");
     }

+ 12 - 0
themis-mq/src/main/java/com/qmth/themis/mq/service/MqLogicService.java

@@ -90,4 +90,16 @@ public interface MqLogicService {
      * @param key
      */
     public void execMqExamBreakHistoryLogic(MqDto mqDto, String key);
+
+	/**考试记录数据更新
+	 * @param mqDto
+	 * @param key
+	 */
+	void execMqRecordUpdateLogic(MqDto mqDto, String key);
+
+	/**考试记录初始化
+	 * @param mqDto
+	 * @param key
+	 */
+	void execMqRecordInitLogic(MqDto mqDto, String key);
 }

+ 29 - 0
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqLogicServiceImpl.java

@@ -365,4 +365,33 @@ public class MqLogicServiceImpl implements MqLogicService {
         tmRocketMessageService.saveOrUpdate(tmRocketMessage);
         redisUtil.delete(key, mqDto.getId());
     }
+    
+    
+    @Override
+    @Transactional
+    public void execMqRecordUpdateLogic(MqDto mqDto, String key) {
+        Gson gson = new Gson();
+        Map<String, Object> param = (Map<String, Object>) mqDto.getBody();
+        Long recordId = (Long) param.get("recordId");
+        String colName = (String) param.get("colName");
+        String colValue = (String) param.get("colValue");
+        examRecordService.dataUpdate(recordId, colName, colValue);
+        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
+        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+        redisUtil.delete(key, mqDto.getId());
+    }
+    
+    @Override
+    public void execMqRecordInitLogic(MqDto mqDto, String key) {
+        Gson gson = new Gson();
+        Map<String, Object> param = (Map<String, Object>) mqDto.getBody();
+        examRecordService.dataInit(param);
+        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
+        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+        redisUtil.delete(key, mqDto.getId());
+    }
 }

+ 1 - 1
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/CalculateObjectiveScoreConcurrentlyImpl.java

@@ -46,7 +46,7 @@ public class CalculateObjectiveScoreConcurrentlyImpl implements Concurrently {
                         JacksonUtil.parseJson(mqDto));
                 int reconsumeTime = messageExt.getReconsumeTimes();
                 if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
-                    mqLogicService.execMqWebsocketUnNormalLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                    mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
                             && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId()))

+ 77 - 0
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/ExamRecordInitConcurrentlyImpl.java

@@ -0,0 +1,77 @@
+package com.qmth.themis.mq.templete.impl;
+
+import java.util.List;
+import java.util.Objects;
+
+import javax.annotation.Resource;
+
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.business.dto.MqDto;
+import com.qmth.themis.mq.service.MqLogicService;
+import com.qmth.themis.mq.templete.Concurrently;
+
+/**考试记录数据初始化
+ * @Description: 
+ * @Author: xiatian
+ * @Date: 2020-08-04
+ */
+@Service
+public class ExamRecordInitConcurrentlyImpl implements Concurrently {
+    private final static Logger log = LoggerFactory.getLogger(ExamRecordInitConcurrentlyImpl.class);
+    
+	@Resource
+	RedisUtil redisUtil;
+	@Resource
+	MqLogicService mqLogicService;
+
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        MqDto mqDto = null;
+        try {
+            long threadId = Thread.currentThread().getId();
+            String threadName = Thread.currentThread().getName();
+            for (MessageExt messageExt : msgs) {
+                log.debug(":{}-:{} CalculateObjectiveScore 重试次数:{}", threadId, threadName,
+                        messageExt.getReconsumeTimes());
+                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
+                log.debug(":{}-:{} CalculateObjectiveScore 接收到的消息:{}", threadId, threadName,
+                        JacksonUtil.parseJson(mqDto));
+                int reconsumeTime = messageExt.getReconsumeTimes();
+                if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
+                    mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                } else {
+                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
+                            && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId()))
+                            && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(),
+                            SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
+                        log.debug(":{}-:{} 更新db", threadId, threadName);
+                        mqLogicService.execMqFaceVerifySaveLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    } else {
+                        log.debug(":{}-:{} 消息ack未确认,重发", threadId, threadName);
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
+                    }
+                }
+            }
+        } catch (Exception e) {
+        	log.error("考试记录数据更新,消息消费出错",e);
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+        } finally {
+            if (Objects.nonNull(mqDto)) {
+                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
+            }
+        }
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
+    }
+}

+ 3 - 3
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/ExamRecordPersistedConcurrentlyImpl.java

@@ -49,14 +49,14 @@ public class ExamRecordPersistedConcurrentlyImpl implements Concurrently {
                         JacksonUtil.parseJson(mqDto));
                 int reconsumeTime = messageExt.getReconsumeTimes();
                 if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
-                    mqLogicService.execMqWebsocketUnNormalLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                    mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
                             && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId()))
                             && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(),
                             SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                         log.debug(":{}-:{} 更新db", threadId, threadName);
-                        mqLogicService.execMqFaceVerifySaveLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                        mqLogicService.execMqExamRecordPersistedLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                     } else {
                         log.debug(":{}-:{} 消息ack未确认,重发", threadId, threadName);
@@ -65,7 +65,7 @@ public class ExamRecordPersistedConcurrentlyImpl implements Concurrently {
                 }
             }
         } catch (Exception e) {
-        	log.error("人脸验证保存,消息消费出错",e);
+        	log.error("考试记录数据持久化,消息消费出错",e);
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
         } finally {
             if (Objects.nonNull(mqDto)) {

+ 77 - 0
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/ExamRecordUpdateConcurrentlyImpl.java

@@ -0,0 +1,77 @@
+package com.qmth.themis.mq.templete.impl;
+
+import java.util.List;
+import java.util.Objects;
+
+import javax.annotation.Resource;
+
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.business.dto.MqDto;
+import com.qmth.themis.mq.service.MqLogicService;
+import com.qmth.themis.mq.templete.Concurrently;
+
+/**考试记录数据更新
+ * @Description: 
+ * @Author: xiatian
+ * @Date: 2020-08-04
+ */
+@Service
+public class ExamRecordUpdateConcurrentlyImpl implements Concurrently {
+    private final static Logger log = LoggerFactory.getLogger(ExamRecordUpdateConcurrentlyImpl.class);
+    
+	@Resource
+	RedisUtil redisUtil;
+	@Resource
+	MqLogicService mqLogicService;
+
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                                                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        MqDto mqDto = null;
+        try {
+            long threadId = Thread.currentThread().getId();
+            String threadName = Thread.currentThread().getName();
+            for (MessageExt messageExt : msgs) {
+                log.debug(":{}-:{} CalculateObjectiveScore 重试次数:{}", threadId, threadName,
+                        messageExt.getReconsumeTimes());
+                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
+                log.debug(":{}-:{} CalculateObjectiveScore 接收到的消息:{}", threadId, threadName,
+                        JacksonUtil.parseJson(mqDto));
+                int reconsumeTime = messageExt.getReconsumeTimes();
+                if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
+                    mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                } else {
+                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
+                            && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId()))
+                            && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(),
+                            SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
+                        log.debug(":{}-:{} 更新db", threadId, threadName);
+                        mqLogicService.execMqRecordUpdateLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    } else {
+                        log.debug(":{}-:{} 消息ack未确认,重发", threadId, threadName);
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
+                    }
+                }
+            }
+        } catch (Exception e) {
+        	log.error("考试记录数据更新,消息消费出错",e);
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+        } finally {
+            if (Objects.nonNull(mqDto)) {
+                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
+            }
+        }
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
+    }
+}

+ 1 - 1
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/FaceVerifyConcurrentlyImpl.java

@@ -51,7 +51,7 @@ public class FaceVerifyConcurrentlyImpl implements Concurrently {
                         JacksonUtil.parseJson(mqDto));
                 int reconsumeTime = messageExt.getReconsumeTimes();
                 if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
-                    mqLogicService.execMqWebsocketUnNormalLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                    mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
                             && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId()))

+ 1 - 1
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/LivenessVerifyConcurrentlyImpl.java

@@ -51,7 +51,7 @@ public class LivenessVerifyConcurrentlyImpl implements Concurrently {
                         JacksonUtil.parseJson(mqDto));
                 int reconsumeTime = messageExt.getReconsumeTimes();
                 if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
-                    mqLogicService.execMqWebsocketUnNormalLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                    mqLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE
                             && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId()))