deason 8 月之前
父節點
當前提交
23194e6235
共有 1 個文件被更改,包括 36 次插入23 次删除
  1. 36 23
      src/main/java/com/qmth/exam/reserve/job/StudentApplyRecordJob.java

+ 36 - 23
src/main/java/com/qmth/exam/reserve/job/StudentApplyRecordJob.java

@@ -6,6 +6,7 @@ import com.qmth.boot.core.concurrent.service.ConcurrentService;
 import com.qmth.exam.reserve.bean.apply.ApplyRecordCacheBean;
 import com.qmth.exam.reserve.cache.CacheConstants;
 import com.qmth.exam.reserve.cache.RedisClient;
+import com.qmth.exam.reserve.cache.impl.ApplyTaskCacheService;
 import com.qmth.exam.reserve.entity.StudentApplyEntity;
 import com.qmth.exam.reserve.enums.EventType;
 import com.qmth.exam.reserve.service.OperateLogService;
@@ -30,6 +31,9 @@ public class StudentApplyRecordJob {
     @Autowired
     private OperateLogService operateLogService;
 
+    @Autowired
+    private ApplyTaskCacheService applyTaskCacheService;
+
     @Autowired
     private ConcurrentService concurrentService;
 
@@ -59,6 +63,9 @@ public class StudentApplyRecordJob {
 
             for (int i = 1; i <= queueSize; i++) {
                 ApplyRecordCacheBean value = queue.poll();
+                if (value == null) {
+                    continue;
+                }
 
                 try {
                     this.saveOrUpdate(value, i);
@@ -85,50 +92,56 @@ public class StudentApplyRecordJob {
         }
     }
 
-    // @Transactional
-    public void saveOrUpdate(ApplyRecordCacheBean bean, int index) {
-        if (bean == null) {
-            return;
+    public void saveOrUpdate(ApplyRecordCacheBean queueBean, int index) {
+        // 采用最新预约缓存记录进行数据库持久化,防止消费队列消息顺序异常
+        ApplyRecordCacheBean cacheBean = applyTaskCacheService.getStudentApplyRecord(
+                queueBean.getStudentId(), queueBean.getExamSiteId(), queueBean.getTimePeriodId());
+        if (cacheBean == null) {
+            log.warn("预约记录队列对应的缓存记录不存在!studentId:{} examSiteId:{} timePeriodId:{}",
+                    queueBean.getStudentId(), queueBean.getExamSiteId(), queueBean.getTimePeriodId());
+
+            // 若缓存丢失,则采用队列数据补偿数据库持久化
+            cacheBean = queueBean;
         }
 
         LambdaQueryWrapper<StudentApplyEntity> wrapper = new LambdaQueryWrapper<>();
-        wrapper.eq(StudentApplyEntity::getExamSiteId, bean.getExamSiteId());
-        wrapper.eq(StudentApplyEntity::getTimePeriodId, bean.getTimePeriodId());
-        wrapper.eq(StudentApplyEntity::getStudentId, bean.getStudentId());
+        wrapper.eq(StudentApplyEntity::getExamSiteId, cacheBean.getExamSiteId());
+        wrapper.eq(StudentApplyEntity::getTimePeriodId, cacheBean.getTimePeriodId());
+        wrapper.eq(StudentApplyEntity::getStudentId, cacheBean.getStudentId());
         StudentApplyEntity data = studentApplyService.getOne(wrapper);
 
-        String msg = String.format("%s_%s_%s_%s", bean.getStudentId(), bean.getExamSiteId(),
-                bean.getTimePeriodId(), bean.getCancel());
+        String msg = String.format("%s_%s_%s_%s", cacheBean.getStudentId(), cacheBean.getExamSiteId(),
+                cacheBean.getTimePeriodId(), cacheBean.getCancel());
         if (data != null) {
             // 存在预约记录,则修改
-            data.setCancel(bean.getCancel());
-            data.setOperateId(bean.getOperateId());
-            data.setUpdateTime(bean.getOperateTime());
+            data.setCancel(cacheBean.getCancel());
+            data.setOperateId(cacheBean.getOperateId());
+            data.setUpdateTime(cacheBean.getOperateTime());
 
             LambdaUpdateWrapper<StudentApplyEntity> updateWrapper = new LambdaUpdateWrapper<>();
-            updateWrapper.set(StudentApplyEntity::getCancel, bean.getCancel());
-            updateWrapper.set(StudentApplyEntity::getOperateId, bean.getOperateId());
-            updateWrapper.set(StudentApplyEntity::getUpdateTime, bean.getOperateTime());
+            updateWrapper.set(StudentApplyEntity::getCancel, cacheBean.getCancel());
+            updateWrapper.set(StudentApplyEntity::getOperateId, cacheBean.getOperateId());
+            updateWrapper.set(StudentApplyEntity::getUpdateTime, cacheBean.getOperateTime());
             updateWrapper.eq(StudentApplyEntity::getId, data.getId());
             studentApplyService.update(updateWrapper);
             log.info("{} 预约修改!{}", msg, index);
         } else {
             // 不存在预约记录,则新增
             data = new StudentApplyEntity();
-            data.setStudentId(bean.getStudentId());
-            data.setExamSiteId(bean.getExamSiteId());
-            data.setTimePeriodId(bean.getTimePeriodId());
-            data.setCancel(bean.getCancel());
-            data.setOperateId(bean.getOperateId());
-            data.setCreateTime(bean.getOperateTime());
-            data.setUpdateTime(bean.getOperateTime());
+            data.setStudentId(cacheBean.getStudentId());
+            data.setExamSiteId(cacheBean.getExamSiteId());
+            data.setTimePeriodId(cacheBean.getTimePeriodId());
+            data.setCancel(cacheBean.getCancel());
+            data.setOperateId(cacheBean.getOperateId());
+            data.setCreateTime(cacheBean.getOperateTime());
+            data.setUpdateTime(cacheBean.getOperateTime());
             studentApplyService.save(data);
             log.info("{} 预约新增!{}", msg, index);
         }
 
         if (data.getCancel()) {
             // 保存“取消预约”操作记录
-            operateLogService.insertOperateLog(bean.getOperateId(), EventType.CANCEL_APPLY, JsonHelper.toJson(data));
+            operateLogService.insertOperateLog(cacheBean.getOperateId(), EventType.CANCEL_APPLY, JsonHelper.toJson(data));
         }
     }