deason 8 ماه پیش
والد
کامیت
f250bc90cc

+ 50 - 21
src/main/java/com/qmth/exam/reserve/mq/ExamReserveMQConsumer.java

@@ -2,7 +2,11 @@ package com.qmth.exam.reserve.mq;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.qmth.boot.core.concurrent.service.ConcurrentService;
+import com.qmth.boot.core.exception.StatusException;
+import com.qmth.exam.reserve.bean.Constants;
 import com.qmth.exam.reserve.bean.apply.ApplyRecordCacheBean;
+import com.qmth.exam.reserve.cache.CacheConstants;
 import com.qmth.exam.reserve.cache.impl.ApplyTaskCacheService;
 import com.qmth.exam.reserve.entity.StudentApplyEntity;
 import com.qmth.exam.reserve.enums.EventType;
@@ -14,6 +18,7 @@ import org.apache.rocketmq.spring.annotation.ConsumeMode;
 import org.apache.rocketmq.spring.annotation.MessageModel;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.redisson.api.RLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -40,6 +45,9 @@ public class ExamReserveMQConsumer implements RocketMQListener<MessageExt> {
     @Autowired
     private ApplyTaskCacheService applyTaskCacheService;
 
+    @Autowired
+    private ConcurrentService concurrentService;
+
     @Override
     public void onMessage(MessageExt message) {
         ApplyRecordCacheBean queueBean = JsonHelper.toObj(message.getBody(), ApplyRecordCacheBean.class);
@@ -48,34 +56,55 @@ public class ExamReserveMQConsumer implements RocketMQListener<MessageExt> {
             throw new RuntimeException("MQ消息转换失败!");
         }
 
-        ApplyRecordCacheBean cacheBean = applyTaskCacheService.getStudentApplyRecord(queueBean.getStudentId(),
-                queueBean.getExamSiteId(), queueBean.getTimePeriodId());
+        // 考生预约操作锁
+        String studentApplyLockKey = String.format(CacheConstants.LOCK_STUDENT_APPLY, queueBean.getStudentId());
+        RLock studentApplyLock = (RLock) concurrentService.getLock(studentApplyLockKey);
+        try {
+            if (!studentApplyLock.tryLock()) {
+                log.warn("获取锁失败,同一个考生不允许同时操作预约!lockKey:{}", studentApplyLockKey);
+                throw new StatusException(Constants.SYSTEM_BUSY);
+            }
+            log.info("获取锁成功!lockKey:{}", studentApplyLockKey);
 
-        final ApplyRecordCacheBean bean;
-        if (cacheBean == null) {
-            log.warn("【考生预约队列】MQ队列元素对应的缓存记录不存在!bizId:{} msgId:{}", queueBean.getBizId(), message.getMsgId());
+            ApplyRecordCacheBean cacheBean = applyTaskCacheService.getStudentApplyRecord(queueBean.getStudentId(),
+                    queueBean.getExamSiteId(), queueBean.getTimePeriodId());
 
-            // 若缓存数据不存在,则采用队列数据持久化
-            bean = queueBean;
-            // 更新缓存
-            applyTaskCacheService.saveStudentApplyRecord(bean);
-        } else {
-            if (cacheBean.getBizId() != null && cacheBean.getBizId() >= queueBean.getBizId()) {
-                log.info("【考生预约队列】MQ消息消费!bizId:{} cacheBizId:{} msgId:{}", queueBean.getBizId(),
-                        cacheBean.getBizId(), message.getMsgId());
+            final ApplyRecordCacheBean bean;
+            if (cacheBean == null) {
+                log.warn("【考生预约队列】MQ队列元素对应的缓存记录不存在!bizId:{} msgId:{}", queueBean.getBizId(), message.getMsgId());
 
-                // 默认采用最新预约缓存数据持久化(极端情况下队列消息顺序异常)
-                bean = cacheBean;
+                // 若缓存数据不存在,则采用队列数据持久化
+                bean = queueBean;
+                // 更新缓存
+                applyTaskCacheService.saveStudentApplyRecord(bean);
             } else {
-                log.warn("【考生预约队列】缓存中的队列业务流水号无效!bizId:{} cacheBizId:{} msgId:{}", queueBean.getBizId(),
-                        cacheBean.getBizId(), message.getMsgId());
+                if (cacheBean.getBizId() != null && cacheBean.getBizId() >= queueBean.getBizId()) {
+                    log.info("【考生预约队列】MQ消息消费!bizId:{} cacheBizId:{} msgId:{}", queueBean.getBizId(),
+                            cacheBean.getBizId(), message.getMsgId());
+
+                    // 默认采用最新预约缓存数据持久化(极端情况下队列消息顺序异常)
+                    bean = cacheBean;
+                } else {
+                    log.warn("【考生预约队列】缓存中的队列业务流水号无效!bizId:{} cacheBizId:{} msgId:{}", queueBean.getBizId(),
+                            cacheBean.getBizId(), message.getMsgId());
+
+                    // 其它情况,采用队列数据持久化
+                    bean = queueBean;
+                }
+            }
 
-                // 其它情况,采用队列数据持久化
-                bean = queueBean;
+            this.saveOrUpdate(bean);
+        } finally {
+            try {
+                // 解锁前检查当前线程是否持有该锁
+                if (studentApplyLock.isLocked() && studentApplyLock.isHeldByCurrentThread()) {
+                    studentApplyLock.unlock();
+                    log.info("解锁成功!lockKey:{}", studentApplyLockKey);
+                }
+            } catch (Exception e) {
+                log.warn("解锁失败!lockKey:{} err:{}", studentApplyLockKey, e.getMessage());
             }
         }
-
-        this.saveOrUpdate(bean);
     }
 
     public void saveOrUpdate(ApplyRecordCacheBean bean) {

+ 10 - 17
src/main/java/com/qmth/exam/reserve/service/impl/ExamReserveServiceImpl.java

@@ -146,6 +146,12 @@ public class ExamReserveServiceImpl implements ExamReserveService {
                 throw new StatusException(msg);
             }
 
+            ApplyRecordCacheBean existApply = applyTaskCacheService.getStudentApplyRecord(student.getId(), examSiteId, timePeriodId);
+            if (existApply != null && !existApply.getCancel()) {
+                log.warn("当前时段已预约,请勿重复预约!examSiteId:{} timePeriodId:{}", examSiteId, timePeriodId);
+                throw new StatusException("当前时段已预约,请勿重复预约");
+            }
+
             // 某考点某时段的“剩余可约数量” 累减1(抢占1个数量)
             boolean takeSuccess = applyTaskCacheService.decreaseApplyAvailableCount(examSiteId, timePeriodId);
             if (!takeSuccess) {
@@ -154,16 +160,7 @@ public class ExamReserveServiceImpl implements ExamReserveService {
             }
 
             try {
-                ApplyRecordCacheBean existApply = applyTaskCacheService.getStudentApplyRecord(student.getId(), examSiteId, timePeriodId);
                 if (existApply != null) {
-                    if (!existApply.getCancel()) {
-                        // 已存在预约记录,则归还1个刚才被占数量
-                        applyTaskCacheService.increaseApplyAvailableCount(examSiteId, timePeriodId);
-
-                        log.warn("当前时段已预约,请勿重复预约!examSiteId:{} timePeriodId:{}", examSiteId, timePeriodId);
-                        throw new StatusException("当前时段已预约,请勿重复预约");
-                    }
-
                     // 存在历史“已取消”的预约记录,则恢复预约
                     existApply.setCancel(false);
                     existApply.setOperateId(student.getId());
@@ -173,7 +170,7 @@ public class ExamReserveServiceImpl implements ExamReserveService {
                     // 推送至预约队列
                     boolean pushSuccess = applyTaskCacheService.pushStudentApplyRecordQueue(existApply);
                     if (!pushSuccess) {
-                        throw new RuntimeException("预约消息推送失败,请稍后再试!");
+                        throw new StatusException("预约消息推送失败,请稍后再试!");
                     }
                     // 保存至预约缓存
                     applyTaskCacheService.saveStudentApplyRecord(existApply);
@@ -191,7 +188,7 @@ public class ExamReserveServiceImpl implements ExamReserveService {
                     // 推送至预约队列
                     boolean pushSuccess = applyTaskCacheService.pushStudentApplyRecordQueue(newApply);
                     if (!pushSuccess) {
-                        throw new RuntimeException("预约消息推送失败,请稍后再试!");
+                        throw new StatusException("预约消息推送失败,请稍后再试!");
                     }
                     // 保存至预约缓存
                     applyTaskCacheService.saveStudentApplyRecord(newApply);
@@ -199,11 +196,7 @@ public class ExamReserveServiceImpl implements ExamReserveService {
 
                 log.warn("预约成功!studentId:{} examSiteId:{} timePeriodId:{}", student.getId(), examSiteId, timePeriodId);
             } catch (Exception e) {
-                if (e instanceof StatusException) {
-                    throw e;
-                }
-
-                // 其它异常时,归还1个被占数量
+                // 异常时,归还1个被占数量
                 applyTaskCacheService.increaseApplyAvailableCount(examSiteId, timePeriodId);
                 log.error("预约异常!examSiteId:{} timePeriodId:{} err:{}", examSiteId, timePeriodId, e.getMessage());
                 throw new StatusException("预约失败,请稍后再试!", e);
@@ -292,7 +285,7 @@ public class ExamReserveServiceImpl implements ExamReserveService {
             // 推送至预约队列
             boolean pushSuccess = applyTaskCacheService.pushStudentApplyRecordQueue(existApply);
             if (!pushSuccess) {
-                throw new RuntimeException("预约消息推送失败,请稍后再试!");
+                throw new StatusException("预约消息推送失败,请稍后再试!");
             }
             // 保存至预约缓存
             applyTaskCacheService.saveStudentApplyRecord(existApply);