ExamReserveMQConsumer.java 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package com.qmth.exam.reserve.mq;
  2. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  3. import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
  4. import com.qmth.exam.reserve.bean.apply.ApplyRecordCacheBean;
  5. import com.qmth.exam.reserve.cache.impl.ApplyTaskCacheService;
  6. import com.qmth.exam.reserve.entity.StudentApplyEntity;
  7. import com.qmth.exam.reserve.enums.EventType;
  8. import com.qmth.exam.reserve.service.OperateLogService;
  9. import com.qmth.exam.reserve.service.StudentApplyService;
  10. import com.qmth.exam.reserve.util.JsonHelper;
  11. import org.apache.rocketmq.common.message.MessageExt;
  12. import org.apache.rocketmq.spring.annotation.ConsumeMode;
  13. import org.apache.rocketmq.spring.annotation.MessageModel;
  14. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  15. import org.apache.rocketmq.spring.core.RocketMQListener;
  16. import org.slf4j.Logger;
  17. import org.slf4j.LoggerFactory;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import org.springframework.stereotype.Component;
  20. @Component
  21. @RocketMQMessageListener(
  22. consumerGroup = "${rocketmq.consumer.group}",
  23. topic = "${rocketmq.topic}",
  24. selectorExpression = MQConstants.TAG_STUDENT_APPLY,
  25. consumeMode = ConsumeMode.ORDERLY,
  26. messageModel = MessageModel.CLUSTERING
  27. )
  28. public class ExamReserveMQConsumer implements RocketMQListener<MessageExt> {
  29. private static final Logger log = LoggerFactory.getLogger(ExamReserveMQConsumer.class);
  30. @Autowired
  31. private StudentApplyService studentApplyService;
  32. @Autowired
  33. private OperateLogService operateLogService;
  34. @Autowired
  35. private ApplyTaskCacheService applyTaskCacheService;
  36. @Override
  37. public void onMessage(MessageExt message) {
  38. ApplyRecordCacheBean queueBean = JsonHelper.toObj(message.getBody(), ApplyRecordCacheBean.class);
  39. if (queueBean == null) {
  40. log.warn("【考生预约队列】MQ消息转换失败!queueOffset:{} msgId:{}", message.getQueueOffset(), message.getMsgId());
  41. throw new RuntimeException("MQ消息转换失败!");
  42. }
  43. ApplyRecordCacheBean cacheBean = applyTaskCacheService.getStudentApplyRecord(queueBean.getStudentId(),
  44. queueBean.getExamSiteId(), queueBean.getTimePeriodId());
  45. final ApplyRecordCacheBean bean;
  46. if (cacheBean == null) {
  47. log.warn("【考生预约队列】MQ队列元素对应的缓存记录不存在!bizId:{} msgId:{}", queueBean.getBizId(), message.getMsgId());
  48. // 若缓存数据不存在,则采用队列数据持久化
  49. bean = queueBean;
  50. // 更新缓存
  51. applyTaskCacheService.saveStudentApplyRecord(bean);
  52. } else {
  53. if (cacheBean.getBizId() != null && cacheBean.getBizId() >= queueBean.getBizId()) {
  54. log.info("【考生预约队列】MQ消息消费!bizId:{} cacheBizId:{} msgId:{}", queueBean.getBizId(),
  55. cacheBean.getBizId(), message.getMsgId());
  56. // 默认采用最新预约缓存数据持久化(极端情况下队列消息顺序异常)
  57. bean = cacheBean;
  58. } else {
  59. log.warn("【考生预约队列】缓存中的队列业务流水号无效!bizId:{} cacheBizId:{} msgId:{}", queueBean.getBizId(),
  60. cacheBean.getBizId(), message.getMsgId());
  61. // 其它情况,采用队列数据持久化
  62. bean = queueBean;
  63. }
  64. }
  65. this.saveOrUpdate(bean);
  66. }
  67. public void saveOrUpdate(ApplyRecordCacheBean bean) {
  68. LambdaQueryWrapper<StudentApplyEntity> wrapper = new LambdaQueryWrapper<>();
  69. wrapper.eq(StudentApplyEntity::getExamSiteId, bean.getExamSiteId());
  70. wrapper.eq(StudentApplyEntity::getTimePeriodId, bean.getTimePeriodId());
  71. wrapper.eq(StudentApplyEntity::getStudentId, bean.getStudentId());
  72. StudentApplyEntity data = studentApplyService.getOne(wrapper);
  73. String logContent = String.format("%s_%s_%s_%s", bean.getStudentId(), bean.getExamSiteId(), bean.getTimePeriodId(), bean.getCancel());
  74. if (data != null) {
  75. // 存在预约记录,则更新
  76. data.setCancel(bean.getCancel());
  77. data.setOperateId(bean.getOperateId());
  78. data.setUpdateTime(bean.getOperateTime());
  79. LambdaUpdateWrapper<StudentApplyEntity> updateWrapper = new LambdaUpdateWrapper<>();
  80. updateWrapper.set(StudentApplyEntity::getCancel, bean.getCancel());
  81. updateWrapper.set(StudentApplyEntity::getOperateId, bean.getOperateId());
  82. updateWrapper.set(StudentApplyEntity::getUpdateTime, bean.getOperateTime());
  83. updateWrapper.eq(StudentApplyEntity::getId, data.getId());
  84. studentApplyService.update(updateWrapper);
  85. log.warn("【考生预约】DB更新!{} dbId:{} bizId:{}", logContent, data.getId(), bean.getBizId());
  86. } else {
  87. // 不存在预约记录,则新增
  88. data = new StudentApplyEntity();
  89. data.setStudentId(bean.getStudentId());
  90. data.setExamSiteId(bean.getExamSiteId());
  91. data.setTimePeriodId(bean.getTimePeriodId());
  92. data.setCancel(bean.getCancel());
  93. data.setOperateId(bean.getOperateId());
  94. data.setCreateTime(bean.getOperateTime());
  95. data.setUpdateTime(bean.getOperateTime());
  96. studentApplyService.save(data);
  97. log.warn("【考生预约】DB新增!{} dbId:{} bizId:{}", logContent, data.getId(), bean.getBizId());
  98. }
  99. if (data.getCancel()) {
  100. // 保存“取消预约”操作记录
  101. operateLogService.insertOperateLog(bean.getOperateId(), EventType.CANCEL_APPLY, JsonHelper.toJson(data));
  102. }
  103. }
  104. }