deason 8 miesięcy temu
rodzic
commit
6dbd5abd88

+ 3 - 0
src/main/java/com/qmth/exam/reserve/bean/apply/ApplyRecordCacheBean.java

@@ -29,4 +29,7 @@ public class ApplyRecordCacheBean implements IModel {
     @ApiModelProperty(value = "操作时间")
     private Long operateTime;
 
+    @ApiModelProperty(value = "预约队列业务流水号")
+    private Long bizId;
+
 }

+ 6 - 12
src/main/java/com/qmth/exam/reserve/cache/CacheConstants.java

@@ -47,6 +47,12 @@ public interface CacheConstants {
      */
     String CACHE_APPLY_TOTAL = "$cache:apply_total:%s";
 
+    /**
+     * 预约队列业务流水号缓存
+     * $queue:apply_biz_id
+     */
+    String QUEUE_APPLY_BIZ_ID = "$queue:apply_biz_id";
+
     /**
      * 预约任务 - 某考点某时段的“剩余可约数量”的缓存
      * $cache:apply_available_count:{examSiteId}_{timePeriodId}
@@ -66,18 +72,6 @@ public interface CacheConstants {
      */
     String CACHE_STUDENT_APPLY_RECORD = "$cache:student_apply_record:%s";
 
-    /**
-     * 考生预约记录队列
-     * $queue:student_apply_record
-     */
-    String QUEUE_STUDENT_APPLY_RECORD = "$queue:student_apply_record";
-
-    /**
-     * 考生预约记录队列 操作锁
-     * $lock:student_apply_record
-     */
-    String LOCK_STUDENT_APPLY_RECORD = "student_apply_record";
-
     /**
      * 某考生预约 操作锁
      * $lock:student_apply:{studentId}

+ 10 - 2
src/main/java/com/qmth/exam/reserve/cache/impl/ApplyTaskCacheService.java

@@ -109,6 +109,14 @@ public class ApplyTaskCacheService implements CacheConstants {
         log.warn("DELETE cacheKey:{}", cacheKey);
     }
 
+    /**
+     * 累加 预约队列业务流水号缓存
+     */
+    public long increaseBizId() {
+        RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(QUEUE_APPLY_BIZ_ID);
+        return atomic.incrementAndGet();
+    }
+
     /**
      * 获取某考点某时段的“剩余可约数量”缓存
      */
@@ -395,8 +403,8 @@ public class ApplyTaskCacheService implements CacheConstants {
             mqProducer.sendMessage(value);
             return true;
         } catch (Exception e) {
-            log.error("【考生预约队列】消息推送失败! {}_{}_{}_{} err:{}", value.getStudentId(), value.getExamSiteId(),
-                    value.getTimePeriodId(), value.getCancel(), e.getMessage());
+            log.error("【考生预约队列】消息推送失败! {}_{}_{}_{} bizId:{} err:{}", value.getStudentId(), value.getExamSiteId(),
+                    value.getTimePeriodId(), value.getCancel(), value.getBizId(), e.getMessage());
         }
         return false;
     }

+ 11 - 7
src/main/java/com/qmth/exam/reserve/mq/ExamReserveMQConsumer.java

@@ -25,8 +25,7 @@ import org.springframework.stereotype.Component;
         topic = "${rocketmq.topic}",
         selectorExpression = MQConstants.TAG_STUDENT_APPLY,
         consumeMode = ConsumeMode.ORDERLY,
-        messageModel = MessageModel.CLUSTERING,
-        enableMsgTrace = true
+        messageModel = MessageModel.CLUSTERING
 )
 public class ExamReserveMQConsumer implements RocketMQListener<MessageExt> {
 
@@ -48,19 +47,24 @@ public class ExamReserveMQConsumer implements RocketMQListener<MessageExt> {
             log.warn("【考生预约队列】MQ消息转换失败!msgId:{} queueOffset:{}", message.getMsgId(), message.getQueueOffset());
             throw new RuntimeException("MQ消息转换失败!");
         }
-        log.info("【考生预约队列】MQ消息消费!msgId:{} queueOffset:{}", message.getMsgId(), message.getQueueOffset());
+        log.info("【考生预约队列】MQ消息消费!msgId:{} bizId:{}", message.getMsgId(), queueBean.getBizId());
 
         ApplyRecordCacheBean cacheBean = applyTaskCacheService.getStudentApplyRecord(queueBean.getStudentId(),
                 queueBean.getExamSiteId(), queueBean.getTimePeriodId());
         final ApplyRecordCacheBean bean;
         if (cacheBean == null) {
-            log.warn("【考生预约队列】队列对应的缓存记录不存在!{}_{}_{}_{}", queueBean.getStudentId(),
-                    queueBean.getExamSiteId(), queueBean.getTimePeriodId(), queueBean.getCancel());
+            log.warn("【考生预约队列】队列对应的缓存记录不存在!bizId:{} {}_{}_{}_{}", queueBean.getBizId(),
+                    queueBean.getStudentId(), queueBean.getExamSiteId(), queueBean.getTimePeriodId(), queueBean.getCancel());
             // 若缓存数据不存在,则采用队列数据持久化
             bean = queueBean;
         } else {
-            // 默认采用最新预约缓存数据持久化(极端情况下队列消息顺序异常)
-            bean = cacheBean;
+            if (cacheBean.getBizId() != null && cacheBean.getBizId() >= queueBean.getBizId()) {
+                // 默认采用最新预约缓存数据持久化(极端情况下队列消息顺序异常)
+                bean = cacheBean;
+            } else {
+                // 其它情况采用队列数据持久化
+                bean = queueBean;
+            }
         }
 
         this.saveOrUpdate(bean);

+ 2 - 2
src/main/java/com/qmth/exam/reserve/mq/ExamReserveMQProducer.java

@@ -38,8 +38,8 @@ public class ExamReserveMQProducer implements MQConstants {
         int delayLevel = 3;
 
         SendResult result = rocketMQTemplate.syncSendOrderly(destination, message, hashKey, mqSendMessageTimeout, delayLevel);
-        log.info("【考生预约队列】消息推送!sendStatus:{} msgId:{} queueOffset:{} {}_{}_{}_{}",
-                result.getSendStatus(), result.getMsgId(), result.getQueueOffset(), bean.getStudentId(),
+        log.info("【考生预约队列】消息推送!sendStatus:{} msgId:{} bizId:{} {}_{}_{}_{}",
+                result.getSendStatus(), result.getMsgId(), bean.getBizId(), bean.getStudentId(),
                 bean.getExamSiteId(), bean.getTimePeriodId(), bean.getCancel());
     }
 

+ 3 - 0
src/main/java/com/qmth/exam/reserve/service/impl/ExamReserveServiceImpl.java

@@ -168,6 +168,7 @@ public class ExamReserveServiceImpl implements ExamReserveService {
                     existApply.setCancel(false);
                     existApply.setOperateId(student.getId());
                     existApply.setOperateTime(System.currentTimeMillis());
+                    existApply.setBizId(applyTaskCacheService.increaseBizId());
 
                     // 推送至预约队列
                     boolean pushSuccess = applyTaskCacheService.pushStudentApplyRecordQueue(existApply);
@@ -185,6 +186,7 @@ public class ExamReserveServiceImpl implements ExamReserveService {
                     newApply.setCancel(false);
                     newApply.setOperateId(student.getId());
                     newApply.setOperateTime(System.currentTimeMillis());
+                    newApply.setBizId(applyTaskCacheService.increaseBizId());
 
                     // 推送至预约队列
                     boolean pushSuccess = applyTaskCacheService.pushStudentApplyRecordQueue(newApply);
@@ -285,6 +287,7 @@ public class ExamReserveServiceImpl implements ExamReserveService {
             existApply.setCancel(true);
             existApply.setOperateId(student.getId());
             existApply.setOperateTime(System.currentTimeMillis());
+            existApply.setBizId(applyTaskCacheService.increaseBizId());
 
             // 推送至预约队列
             boolean pushSuccess = applyTaskCacheService.pushStudentApplyRecordQueue(existApply);