package com.qmth.exam.reserve.mq; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.qmth.exam.reserve.bean.apply.ApplyRecordCacheBean; import com.qmth.exam.reserve.cache.impl.ApplyTaskCacheService; import com.qmth.exam.reserve.entity.StudentApplyEntity; import com.qmth.exam.reserve.enums.EventType; import com.qmth.exam.reserve.service.OperateLogService; import com.qmth.exam.reserve.service.StudentApplyService; import com.qmth.exam.reserve.util.JsonHelper; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener( consumerGroup = "${rocketmq.consumer.group}", topic = "${rocketmq.topic}", selectorExpression = MQConstants.TAG_STUDENT_APPLY, consumeMode = ConsumeMode.ORDERLY, messageModel = MessageModel.CLUSTERING ) public class ExamReserveMQConsumer implements RocketMQListener { private static final Logger log = LoggerFactory.getLogger(ExamReserveMQConsumer.class); @Autowired private StudentApplyService studentApplyService; @Autowired private OperateLogService operateLogService; @Autowired private ApplyTaskCacheService applyTaskCacheService; @Override public void onMessage(MessageExt message) { ApplyRecordCacheBean queueBean = JsonHelper.toObj(message.getBody(), ApplyRecordCacheBean.class); if (queueBean == null) { log.warn("【考生预约队列】MQ消息转换失败!queueOffset:{} msgId:{}", message.getQueueOffset(), message.getMsgId()); throw new RuntimeException("MQ消息转换失败!"); } ApplyRecordCacheBean cacheBean = applyTaskCacheService.getStudentApplyRecord(queueBean.getStudentId(), queueBean.getExamSiteId(), queueBean.getTimePeriodId()); final ApplyRecordCacheBean bean; if (cacheBean == null) { log.warn("【考生预约队列】MQ队列元素对应的缓存记录不存在!bizId:{} msgId:{}", queueBean.getBizId(), message.getMsgId()); // 若缓存数据不存在,则采用队列数据持久化 bean = queueBean; // 更新缓存 applyTaskCacheService.saveStudentApplyRecord(bean); } else { if (cacheBean.getBizId() != null && cacheBean.getBizId() >= queueBean.getBizId()) { log.info("【考生预约队列】MQ消息消费!bizId:{} cacheBizId:{} msgId:{}", queueBean.getBizId(), cacheBean.getBizId(), message.getMsgId()); // 默认采用最新预约缓存数据持久化(极端情况下队列消息顺序异常) bean = cacheBean; } else { log.warn("【考生预约队列】缓存中的队列业务流水号无效!bizId:{} cacheBizId:{} msgId:{}", queueBean.getBizId(), cacheBean.getBizId(), message.getMsgId()); // 其它情况,采用队列数据持久化 bean = queueBean; } } this.saveOrUpdate(bean); } public void saveOrUpdate(ApplyRecordCacheBean bean) { LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(StudentApplyEntity::getExamSiteId, bean.getExamSiteId()); wrapper.eq(StudentApplyEntity::getTimePeriodId, bean.getTimePeriodId()); wrapper.eq(StudentApplyEntity::getStudentId, bean.getStudentId()); StudentApplyEntity data = studentApplyService.getOne(wrapper); String logContent = String.format("%s_%s_%s_%s", bean.getStudentId(), bean.getExamSiteId(), bean.getTimePeriodId(), bean.getCancel()); if (data != null) { // 存在预约记录,则更新 data.setCancel(bean.getCancel()); data.setOperateId(bean.getOperateId()); data.setUpdateTime(bean.getOperateTime()); LambdaUpdateWrapper updateWrapper = new LambdaUpdateWrapper<>(); updateWrapper.set(StudentApplyEntity::getCancel, bean.getCancel()); updateWrapper.set(StudentApplyEntity::getOperateId, bean.getOperateId()); updateWrapper.set(StudentApplyEntity::getUpdateTime, bean.getOperateTime()); updateWrapper.eq(StudentApplyEntity::getId, data.getId()); studentApplyService.update(updateWrapper); log.warn("【考生预约】DB更新!{} dbId:{} bizId:{}", logContent, data.getId(), bean.getBizId()); } else { // 不存在预约记录,则新增 data = new StudentApplyEntity(); data.setStudentId(bean.getStudentId()); data.setExamSiteId(bean.getExamSiteId()); data.setTimePeriodId(bean.getTimePeriodId()); data.setCancel(bean.getCancel()); data.setOperateId(bean.getOperateId()); data.setCreateTime(bean.getOperateTime()); data.setUpdateTime(bean.getOperateTime()); studentApplyService.save(data); log.warn("【考生预约】DB新增!{} dbId:{} bizId:{}", logContent, data.getId(), bean.getBizId()); } if (data.getCancel()) { // 保存“取消预约”操作记录 operateLogService.insertOperateLog(bean.getOperateId(), EventType.CANCEL_APPLY, JsonHelper.toJson(data)); } } }