deason 8 tháng trước cách đây
mục cha
commit
b7aa0a7296

+ 13 - 15
src/main/java/com/qmth/exam/reserve/cache/impl/ApplyTaskCacheService.java

@@ -9,11 +9,11 @@ import com.qmth.exam.reserve.cache.CacheConstants;
 import com.qmth.exam.reserve.cache.RedisClient;
 import com.qmth.exam.reserve.entity.StudentApplyEntity;
 import com.qmth.exam.reserve.entity.TimePeriodEntity;
+import com.qmth.exam.reserve.mq.ExamReserveMQProducer;
 import com.qmth.exam.reserve.service.*;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
 import org.redisson.api.RAtomicLong;
-import org.redisson.api.RQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -32,6 +32,9 @@ public class ApplyTaskCacheService implements CacheConstants {
     @Autowired
     private RedisClient redisClient;
 
+    @Autowired
+    private ExamReserveMQProducer mqProducer;
+
     @Autowired
     private ApplyTaskService applyTaskService;
 
@@ -385,22 +388,17 @@ public class ApplyTaskCacheService implements CacheConstants {
     }
 
     /**
-     * 推送至考生预约记录队列
+     * 推送至考生预约队列
      */
-    public void pushStudentApplyRecordQueue(ApplyRecordCacheBean value) {
-        if (value == null) {
-            return;
-        }
-
-        RQueue<ApplyRecordCacheBean> queue = redisClient.getRedissonClient()
-                .getQueue(QUEUE_STUDENT_APPLY_RECORD);
-
-        boolean success = queue.offer(value);
-        log.info("{}_{}_{}_{} offerQueue:{}", value.getStudentId(), value.getExamSiteId(),
-                value.getTimePeriodId(), value.getCancel(), success);
-        if (!success) {
-            throw new RuntimeException("推送至考生预约记录队列失败");
+    public boolean pushStudentApplyRecordQueue(ApplyRecordCacheBean value) {
+        try {
+            mqProducer.sendMessage(value);
+            return true;
+        } catch (Exception e) {
+            log.error("【考生预约队列】消息推送失败! {}_{}_{}_{} err:{}", value.getStudentId(), value.getExamSiteId(),
+                    value.getTimePeriodId(), value.getCancel(), e.getMessage());
         }
+        return false;
     }
 
 }

+ 0 - 148
src/main/java/com/qmth/exam/reserve/job/StudentApplyRecordJob.java

@@ -1,148 +0,0 @@
-package com.qmth.exam.reserve.job;
-
-import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
-import com.qmth.boot.core.concurrent.service.ConcurrentService;
-import com.qmth.exam.reserve.bean.apply.ApplyRecordCacheBean;
-import com.qmth.exam.reserve.cache.CacheConstants;
-import com.qmth.exam.reserve.cache.RedisClient;
-import com.qmth.exam.reserve.cache.impl.ApplyTaskCacheService;
-import com.qmth.exam.reserve.entity.StudentApplyEntity;
-import com.qmth.exam.reserve.enums.EventType;
-import com.qmth.exam.reserve.service.OperateLogService;
-import com.qmth.exam.reserve.service.StudentApplyService;
-import com.qmth.exam.reserve.util.JsonHelper;
-import org.redisson.api.RLock;
-import org.redisson.api.RQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
-
-@Component
-public class StudentApplyRecordJob {
-
-    private static final Logger log = LoggerFactory.getLogger(StudentApplyRecordJob.class);
-
-    @Autowired
-    private StudentApplyService studentApplyService;
-
-    @Autowired
-    private OperateLogService operateLogService;
-
-    @Autowired
-    private ApplyTaskCacheService applyTaskCacheService;
-
-    @Autowired
-    private ConcurrentService concurrentService;
-
-    @Autowired
-    private RedisClient redisClient;
-
-    /**
-     * 定时将“考生预约记录队列”中的数据保存至数据库
-     * 注:每N秒执行一次
-     */
-    @Scheduled(fixedDelay = 10000, initialDelay = 30000)
-    public void saveStudentApplyRecordJob() {
-        long start = System.currentTimeMillis();
-
-        RLock curLock = (RLock) concurrentService.getLock(CacheConstants.LOCK_STUDENT_APPLY_RECORD);
-        try {
-            if (!curLock.tryLock()) {
-                log.info("[JOB] locking...");
-                return;
-            }
-
-            RQueue<ApplyRecordCacheBean> queue = redisClient.getRedissonClient()
-                    .getQueue(CacheConstants.QUEUE_STUDENT_APPLY_RECORD);
-
-            int queueSize = queue.size();
-            log.info("[JOB] queue size:{}", queueSize);
-
-            for (int i = 1; i <= queueSize; i++) {
-                ApplyRecordCacheBean value = queue.poll();
-                if (value == null) {
-                    continue;
-                }
-
-                try {
-                    this.saveOrUpdate(value, i);
-                } catch (Exception e) {
-                    // 保存至数据库失败,放回队列重试执行
-                    boolean success = queue.offer(value);
-                    log.error("[JOB] offerQueue:{} studentId:{} err:{}", success, value.getStudentId(), e.getMessage());
-                }
-            }
-        } catch (Exception e) {
-            log.error("[JOB] err:{}", e.getMessage(), e);
-        } finally {
-            try {
-                // 解锁前检查当前线程是否持有该锁
-                if (curLock.isLocked() && curLock.isHeldByCurrentThread()) {
-                    curLock.unlock();
-                }
-            } catch (Exception e) {
-                // ignore
-            }
-
-            long end = System.currentTimeMillis();
-            log.info("[JOB] cost:{}ms", end - start);
-        }
-    }
-
-    public void saveOrUpdate(ApplyRecordCacheBean queueBean, int index) {
-        // 采用最新预约缓存记录进行数据库持久化,防止消费队列消息顺序异常
-        ApplyRecordCacheBean cacheBean = applyTaskCacheService.getStudentApplyRecord(
-                queueBean.getStudentId(), queueBean.getExamSiteId(), queueBean.getTimePeriodId());
-        if (cacheBean == null) {
-            log.warn("预约记录队列对应的缓存记录不存在!studentId:{} examSiteId:{} timePeriodId:{}",
-                    queueBean.getStudentId(), queueBean.getExamSiteId(), queueBean.getTimePeriodId());
-
-            // 若缓存丢失,则采用队列数据补偿数据库持久化
-            cacheBean = queueBean;
-        }
-
-        LambdaQueryWrapper<StudentApplyEntity> wrapper = new LambdaQueryWrapper<>();
-        wrapper.eq(StudentApplyEntity::getExamSiteId, cacheBean.getExamSiteId());
-        wrapper.eq(StudentApplyEntity::getTimePeriodId, cacheBean.getTimePeriodId());
-        wrapper.eq(StudentApplyEntity::getStudentId, cacheBean.getStudentId());
-        StudentApplyEntity data = studentApplyService.getOne(wrapper);
-
-        String msg = String.format("%s_%s_%s_%s", cacheBean.getStudentId(), cacheBean.getExamSiteId(),
-                cacheBean.getTimePeriodId(), cacheBean.getCancel());
-        if (data != null) {
-            // 存在预约记录,则修改
-            data.setCancel(cacheBean.getCancel());
-            data.setOperateId(cacheBean.getOperateId());
-            data.setUpdateTime(cacheBean.getOperateTime());
-
-            LambdaUpdateWrapper<StudentApplyEntity> updateWrapper = new LambdaUpdateWrapper<>();
-            updateWrapper.set(StudentApplyEntity::getCancel, cacheBean.getCancel());
-            updateWrapper.set(StudentApplyEntity::getOperateId, cacheBean.getOperateId());
-            updateWrapper.set(StudentApplyEntity::getUpdateTime, cacheBean.getOperateTime());
-            updateWrapper.eq(StudentApplyEntity::getId, data.getId());
-            studentApplyService.update(updateWrapper);
-            log.info("{} 预约修改!{}", msg, index);
-        } else {
-            // 不存在预约记录,则新增
-            data = new StudentApplyEntity();
-            data.setStudentId(cacheBean.getStudentId());
-            data.setExamSiteId(cacheBean.getExamSiteId());
-            data.setTimePeriodId(cacheBean.getTimePeriodId());
-            data.setCancel(cacheBean.getCancel());
-            data.setOperateId(cacheBean.getOperateId());
-            data.setCreateTime(cacheBean.getOperateTime());
-            data.setUpdateTime(cacheBean.getOperateTime());
-            studentApplyService.save(data);
-            log.info("{} 预约新增!{}", msg, index);
-        }
-
-        if (data.getCancel()) {
-            // 保存“取消预约”操作记录
-            operateLogService.insertOperateLog(cacheBean.getOperateId(), EventType.CANCEL_APPLY, JsonHelper.toJson(data));
-        }
-    }
-
-}

+ 110 - 0
src/main/java/com/qmth/exam/reserve/mq/ExamReserveMQConsumer.java

@@ -0,0 +1,110 @@
+package com.qmth.exam.reserve.mq;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.qmth.exam.reserve.bean.apply.ApplyRecordCacheBean;
+import com.qmth.exam.reserve.cache.impl.ApplyTaskCacheService;
+import com.qmth.exam.reserve.entity.StudentApplyEntity;
+import com.qmth.exam.reserve.enums.EventType;
+import com.qmth.exam.reserve.service.OperateLogService;
+import com.qmth.exam.reserve.service.StudentApplyService;
+import com.qmth.exam.reserve.util.JsonHelper;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.ConsumeMode;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+@RocketMQMessageListener(
+        consumerGroup = "${rocketmq.consumer.group}",
+        topic = "${rocketmq.topic}",
+        selectorExpression = MQConstants.TAG_STUDENT_APPLY,
+        consumeMode = ConsumeMode.ORDERLY,
+        messageModel = MessageModel.CLUSTERING,
+        enableMsgTrace = true
+)
+public class ExamReserveMQConsumer implements RocketMQListener<MessageExt> {
+
+    private static final Logger log = LoggerFactory.getLogger(ExamReserveMQConsumer.class);
+
+    @Autowired
+    private StudentApplyService studentApplyService;
+
+    @Autowired
+    private OperateLogService operateLogService;
+
+    @Autowired
+    private ApplyTaskCacheService applyTaskCacheService;
+
+    @Override
+    public void onMessage(MessageExt message) {
+        ApplyRecordCacheBean queueBean = JsonHelper.toObj(message.getBody(), ApplyRecordCacheBean.class);
+        if (queueBean == null) {
+            log.warn("【考生预约队列】MQ消息转换失败!msgId:{} queueOffset:{}", message.getMsgId(), message.getQueueOffset());
+            throw new RuntimeException("MQ消息转换失败!");
+        }
+        log.info("【考生预约队列】MQ消息消费!msgId:{} queueOffset:{}", message.getMsgId(), message.getQueueOffset());
+
+        ApplyRecordCacheBean cacheBean = applyTaskCacheService.getStudentApplyRecord(queueBean.getStudentId(),
+                queueBean.getExamSiteId(), queueBean.getTimePeriodId());
+        final ApplyRecordCacheBean bean;
+        if (cacheBean == null) {
+            log.warn("【考生预约队列】队列对应的缓存记录不存在!{}_{}_{}_{}", queueBean.getStudentId(),
+                    queueBean.getExamSiteId(), queueBean.getTimePeriodId(), queueBean.getCancel());
+            // 若缓存数据不存在,则采用队列数据持久化
+            bean = queueBean;
+        } else {
+            // 默认采用最新预约缓存数据持久化(极端情况下队列消息顺序异常)
+            bean = cacheBean;
+        }
+
+        this.saveOrUpdate(bean);
+    }
+
+    public void saveOrUpdate(ApplyRecordCacheBean bean) {
+        LambdaQueryWrapper<StudentApplyEntity> wrapper = new LambdaQueryWrapper<>();
+        wrapper.eq(StudentApplyEntity::getExamSiteId, bean.getExamSiteId());
+        wrapper.eq(StudentApplyEntity::getTimePeriodId, bean.getTimePeriodId());
+        wrapper.eq(StudentApplyEntity::getStudentId, bean.getStudentId());
+        StudentApplyEntity data = studentApplyService.getOne(wrapper);
+
+        String msg = String.format("%s_%s_%s_%s", bean.getStudentId(), bean.getExamSiteId(), bean.getTimePeriodId(), bean.getCancel());
+        if (data != null) {
+            // 存在预约记录,则更新
+            data.setCancel(bean.getCancel());
+            data.setOperateId(bean.getOperateId());
+            data.setUpdateTime(bean.getOperateTime());
+
+            LambdaUpdateWrapper<StudentApplyEntity> updateWrapper = new LambdaUpdateWrapper<>();
+            updateWrapper.set(StudentApplyEntity::getCancel, bean.getCancel());
+            updateWrapper.set(StudentApplyEntity::getOperateId, bean.getOperateId());
+            updateWrapper.set(StudentApplyEntity::getUpdateTime, bean.getOperateTime());
+            updateWrapper.eq(StudentApplyEntity::getId, data.getId());
+            studentApplyService.update(updateWrapper);
+            log.info("【考生预约】DB更新!id:{} {}", data.getId(), msg);
+        } else {
+            // 不存在预约记录,则新增
+            data = new StudentApplyEntity();
+            data.setStudentId(bean.getStudentId());
+            data.setExamSiteId(bean.getExamSiteId());
+            data.setTimePeriodId(bean.getTimePeriodId());
+            data.setCancel(bean.getCancel());
+            data.setOperateId(bean.getOperateId());
+            data.setCreateTime(bean.getOperateTime());
+            data.setUpdateTime(bean.getOperateTime());
+            studentApplyService.save(data);
+            log.info("【考生预约】DB新增!id:{} {}", data.getId(), msg);
+        }
+
+        if (data.getCancel()) {
+            // 保存“取消预约”操作记录
+            operateLogService.insertOperateLog(bean.getOperateId(), EventType.CANCEL_APPLY, JsonHelper.toJson(data));
+        }
+    }
+
+}

+ 35 - 0
src/main/java/com/qmth/exam/reserve/mq/ExamReserveMQProducer.java

@@ -0,0 +1,35 @@
+package com.qmth.exam.reserve.mq;
+
+import com.qmth.exam.reserve.bean.apply.ApplyRecordCacheBean;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+@Component
+public class ExamReserveMQProducer implements MQConstants {
+
+    private static final Logger log = LoggerFactory.getLogger(ExamReserveMQProducer.class);
+
+    @Resource
+    private RocketMQTemplate rocketMQTemplate;
+
+    @Value("${rocketmq.topic}")
+    private String mqTopic;
+
+    public void sendMessage(ApplyRecordCacheBean message) {
+        // 按考点ID分区队列(同个考点下消息按顺序执行)
+        String hashKey = String.valueOf(message.getExamSiteId());
+
+        String destination = mqTopic + ":" + TAG_STUDENT_APPLY;
+        SendResult result = rocketMQTemplate.syncSendOrderly(destination, message, hashKey);
+        log.info("【考生预约队列】消息推送!sendStatus:{} msgId:{} queueOffset:{} {}_{}_{}_{}",
+                result.getSendStatus(), result.getMsgId(), result.getQueueOffset(), message.getStudentId(),
+                message.getExamSiteId(), message.getTimePeriodId(), message.getCancel());
+    }
+
+}