123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- package com.qmth.exam.reserve.mq;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
- import com.qmth.exam.reserve.bean.apply.ApplyRecordCacheBean;
- import com.qmth.exam.reserve.cache.impl.ApplyTaskCacheService;
- import com.qmth.exam.reserve.entity.StudentApplyEntity;
- import com.qmth.exam.reserve.enums.EventType;
- import com.qmth.exam.reserve.service.OperateLogService;
- import com.qmth.exam.reserve.service.StudentApplyService;
- import com.qmth.exam.reserve.util.JsonHelper;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.spring.annotation.ConsumeMode;
- import org.apache.rocketmq.spring.annotation.MessageModel;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- @Component
- @RocketMQMessageListener(
- consumerGroup = "${rocketmq.consumer.group}",
- topic = "${rocketmq.topic}",
- selectorExpression = MQConstants.TAG_STUDENT_APPLY,
- consumeMode = ConsumeMode.ORDERLY,
- messageModel = MessageModel.CLUSTERING
- )
- public class ExamReserveMQConsumer implements RocketMQListener<MessageExt> {
- private static final Logger log = LoggerFactory.getLogger(ExamReserveMQConsumer.class);
- @Autowired
- private StudentApplyService studentApplyService;
- @Autowired
- private OperateLogService operateLogService;
- @Autowired
- private ApplyTaskCacheService applyTaskCacheService;
- @Override
- public void onMessage(MessageExt message) {
- ApplyRecordCacheBean queueBean = JsonHelper.toObj(message.getBody(), ApplyRecordCacheBean.class);
- if (queueBean == null) {
- log.warn("【考生预约队列】MQ消息转换失败!queueOffset:{} msgId:{}", message.getQueueOffset(), message.getMsgId());
- throw new RuntimeException("MQ消息转换失败!");
- }
- ApplyRecordCacheBean cacheBean = applyTaskCacheService.getStudentApplyRecord(queueBean.getStudentId(),
- queueBean.getExamSiteId(), queueBean.getTimePeriodId());
- final ApplyRecordCacheBean bean;
- if (cacheBean == null) {
- log.warn("【考生预约队列】MQ队列元素对应的缓存记录不存在!bizId:{} msgId:{}", queueBean.getBizId(), message.getMsgId());
- // 若缓存数据不存在,则采用队列数据持久化
- bean = queueBean;
- // 更新缓存
- applyTaskCacheService.saveStudentApplyRecord(bean);
- } else {
- if (cacheBean.getBizId() != null && cacheBean.getBizId() >= queueBean.getBizId()) {
- log.info("【考生预约队列】MQ消息消费!bizId:{} cacheBizId:{} msgId:{}", queueBean.getBizId(),
- cacheBean.getBizId(), message.getMsgId());
- // 默认采用最新预约缓存数据持久化(极端情况下队列消息顺序异常)
- bean = cacheBean;
- } else {
- log.warn("【考生预约队列】缓存中的队列业务流水号无效!bizId:{} cacheBizId:{} msgId:{}", queueBean.getBizId(),
- cacheBean.getBizId(), message.getMsgId());
- // 其它情况,采用队列数据持久化
- bean = queueBean;
- }
- }
- this.saveOrUpdate(bean);
- }
- public void saveOrUpdate(ApplyRecordCacheBean bean) {
- LambdaQueryWrapper<StudentApplyEntity> wrapper = new LambdaQueryWrapper<>();
- wrapper.eq(StudentApplyEntity::getExamSiteId, bean.getExamSiteId());
- wrapper.eq(StudentApplyEntity::getTimePeriodId, bean.getTimePeriodId());
- wrapper.eq(StudentApplyEntity::getStudentId, bean.getStudentId());
- StudentApplyEntity data = studentApplyService.getOne(wrapper);
- String logContent = String.format("%s_%s_%s_%s", bean.getStudentId(), bean.getExamSiteId(), bean.getTimePeriodId(), bean.getCancel());
- if (data != null) {
- // 存在预约记录,则更新
- data.setCancel(bean.getCancel());
- data.setOperateId(bean.getOperateId());
- data.setUpdateTime(bean.getOperateTime());
- LambdaUpdateWrapper<StudentApplyEntity> updateWrapper = new LambdaUpdateWrapper<>();
- updateWrapper.set(StudentApplyEntity::getCancel, bean.getCancel());
- updateWrapper.set(StudentApplyEntity::getOperateId, bean.getOperateId());
- updateWrapper.set(StudentApplyEntity::getUpdateTime, bean.getOperateTime());
- updateWrapper.eq(StudentApplyEntity::getId, data.getId());
- studentApplyService.update(updateWrapper);
- log.warn("【考生预约】DB更新!{} dbId:{} bizId:{}", logContent, data.getId(), bean.getBizId());
- } else {
- // 不存在预约记录,则新增
- data = new StudentApplyEntity();
- data.setStudentId(bean.getStudentId());
- data.setExamSiteId(bean.getExamSiteId());
- data.setTimePeriodId(bean.getTimePeriodId());
- data.setCancel(bean.getCancel());
- data.setOperateId(bean.getOperateId());
- data.setCreateTime(bean.getOperateTime());
- data.setUpdateTime(bean.getOperateTime());
- studentApplyService.save(data);
- log.warn("【考生预约】DB新增!{} dbId:{} bizId:{}", logContent, data.getId(), bean.getBizId());
- }
- if (data.getCancel()) {
- // 保存“取消预约”操作记录
- operateLogService.insertOperateLog(bean.getOperateId(), EventType.CANCEL_APPLY, JsonHelper.toJson(data));
- }
- }
- }
|