deason 1 年之前
父节点
当前提交
18964b85ef

+ 8 - 2
src/main/java/com/qmth/exam/reserve/cache/CacheConstants.java

@@ -61,7 +61,13 @@ public interface CacheConstants {
     String QUEUE_STUDENT_APPLY_RECORD = "$queue:student_apply_record";
 
     /**
-     * 某考生预约操作锁
+     * 考生预约记录队列 操作锁
+     * $lock:student_apply_record
+     */
+    String LOCK_STUDENT_APPLY_RECORD = "student_apply_record";
+
+    /**
+     * 某考生预约 操作锁
      * $lock:student_apply:{studentId}
      */
     String LOCK_STUDENT_APPLY = "student_apply:%s";
@@ -73,7 +79,7 @@ public interface CacheConstants {
     String LOCK_AUTO_APPLY = "auto_apply";
 
     /**
-     * 自动排考
+     * 自动排考操作锁
      * $lock:arrange_exam:{yyyyMMdd}
      */
     String LOCK_ARRANGE_EXAM = "arrange_exam:%s";

+ 68 - 6
src/main/java/com/qmth/exam/reserve/job/StudentApplyRecordJob.java

@@ -1,8 +1,12 @@
 package com.qmth.exam.reserve.job;
 
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.qmth.boot.core.concurrent.service.ConcurrentService;
 import com.qmth.exam.reserve.bean.apply.ApplyRecordCacheBean;
 import com.qmth.exam.reserve.cache.CacheConstants;
 import com.qmth.exam.reserve.cache.RedisClient;
+import com.qmth.exam.reserve.entity.StudentApplyEntity;
 import com.qmth.exam.reserve.service.StudentApplyService;
 import org.redisson.api.RBlockingQueue;
 import org.slf4j.Logger;
@@ -10,6 +14,9 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.concurrent.locks.Lock;
 
 @Component
 public class StudentApplyRecordJob {
@@ -19,6 +26,9 @@ public class StudentApplyRecordJob {
     @Autowired
     private StudentApplyService studentApplyService;
 
+    @Autowired
+    private ConcurrentService concurrentService;
+
     @Autowired
     private RedisClient redisClient;
 
@@ -29,21 +39,73 @@ public class StudentApplyRecordJob {
     // @Scheduled(fixedDelay = 30000, initialDelay = 30000)
     public void saveStudentApplyRecordJob() {
         long start = System.currentTimeMillis();
+
         try {
+            Lock curLock = concurrentService.getLock(CacheConstants.LOCK_STUDENT_APPLY_RECORD);
+            if (!curLock.tryLock()) {
+                log.info("[JOB] locking...");
+                return;
+            }
+
             RBlockingQueue<ApplyRecordCacheBean> queue = redisClient.getRedissonClient()
                     .getBlockingQueue(CacheConstants.QUEUE_STUDENT_APPLY_RECORD);
 
-            log.info("[QUEUE] curSize:{}", queue.size());
-            while (!queue.isEmpty()) {
+            int queueSize = queue.size();
+            log.info("[JOB] queue size:{}", queueSize);
+            for (int i = 0; i < queueSize; i++) {
                 ApplyRecordCacheBean value = queue.take();
-                log.info("[QUEUE] {}_{}_{}", value.getStudentId(), value.getExamSiteId(), value.getTimePeriodId());
-                // todo
+
+                try {
+                    this.saveOrUpdate(value);
+                } catch (Exception e) {
+                    // 保存至数据库失败,放回队列重试执行
+                    boolean success = queue.offer(value);
+                    log.error("[JOB] offerQueue:{} studentId:{} err:{}", success, value.getStudentId(), e.getMessage());
+                }
             }
+
+            curLock.unlock();
         } catch (Exception e) {
-            log.error("[QUEUE] job err:{}", e.getMessage(), e);
+            log.error("[JOB] err:{}", e.getMessage(), e);
         } finally {
             long end = System.currentTimeMillis();
-            log.info("[QUEUE] job cost {}ms", end - start);
+            log.info("[JOB] cost:{}ms", end - start);
+        }
+    }
+
+    @Transactional
+    public void saveOrUpdate(ApplyRecordCacheBean bean) {
+        LambdaQueryWrapper<StudentApplyEntity> wrapper = new LambdaQueryWrapper<>();
+        wrapper.eq(StudentApplyEntity::getStudentId, bean.getStudentId());
+        wrapper.eq(StudentApplyEntity::getExamSiteId, bean.getExamSiteId());
+        wrapper.eq(StudentApplyEntity::getTimePeriodId, bean.getTimePeriodId());
+        StudentApplyEntity existEntity = studentApplyService.getOne(wrapper);
+        if (existEntity != null) {
+            if (!existEntity.getCancel()) {
+                log.debug("{}_{}_{}_{} db exist", bean.getStudentId(), bean.getExamSiteId(), bean.getTimePeriodId(), bean.getCancel());
+                return;
+            }
+
+            // 存在“已取消”预约记录,则恢复预约
+            LambdaUpdateWrapper<StudentApplyEntity> updateWrapper = new LambdaUpdateWrapper<>();
+            updateWrapper.set(StudentApplyEntity::getCancel, bean.getCancel());
+            updateWrapper.set(StudentApplyEntity::getOperateId, bean.getOperateId());
+            updateWrapper.set(StudentApplyEntity::getUpdateTime, bean.getOperateTime());
+            updateWrapper.eq(StudentApplyEntity::getId, existEntity.getId());
+            studentApplyService.update(updateWrapper);
+            log.debug("{}_{}_{}_{} db update", bean.getStudentId(), bean.getExamSiteId(), bean.getTimePeriodId(), bean.getCancel());
+        } else {
+            // 不存在则新增预约记录
+            StudentApplyEntity entity = new StudentApplyEntity();
+            entity.setStudentId(bean.getStudentId());
+            entity.setExamSiteId(bean.getExamSiteId());
+            entity.setTimePeriodId(bean.getTimePeriodId());
+            entity.setCancel(false);
+            entity.setOperateId(bean.getOperateId());
+            entity.setCreateTime(bean.getOperateTime());
+            entity.setUpdateTime(bean.getOperateTime());
+            studentApplyService.save(entity);
+            log.debug("{}_{}_{}_{} db insert", bean.getStudentId(), bean.getExamSiteId(), bean.getTimePeriodId(), bean.getCancel());
         }
     }