package com.qmth.exam.reserve.cache.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.qmth.boot.core.concurrent.service.ConcurrentService; import com.qmth.exam.reserve.bean.apply.ApplyRecordCacheBean; import com.qmth.exam.reserve.bean.applytask.CurrentApplyTaskVO; import com.qmth.exam.reserve.bean.examsite.ExamSiteCapacityInfo; import com.qmth.exam.reserve.cache.CacheConstants; import com.qmth.exam.reserve.cache.RedisClient; import com.qmth.exam.reserve.entity.StudentApplyEntity; import com.qmth.exam.reserve.entity.TimePeriodEntity; import com.qmth.exam.reserve.mq.ExamReserveMQProducer; import com.qmth.exam.reserve.service.*; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.redisson.api.RAtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @Component public class ApplyTaskCacheService implements CacheConstants { private static final Logger log = LoggerFactory.getLogger(ApplyTaskCacheService.class); @Autowired private RedisClient redisClient; @Autowired private ExamReserveMQProducer mqProducer; @Autowired private ApplyTaskService applyTaskService; @Autowired private ExamSiteService examSiteService; @Autowired private TimePeriodService timePeriodService; @Autowired private StudentService studentService; @Autowired private StudentApplyService studentApplyService; @Autowired private ConcurrentService concurrentService; /** * 获取当前启用的预约任务缓存 */ public CurrentApplyTaskVO currentApplyTask(Long orgId) { String cacheKey = String.format(CACHE_CURRENT_APPLY_TASK, orgId); CurrentApplyTaskVO value = redisClient.get(cacheKey, CurrentApplyTaskVO.class); if (value != null) { return value; } value = applyTaskService.currentApplyTask(orgId); if (value == null) { return null; } redisClient.set(cacheKey, value, CACHE_TIME_OUT_10, TimeUnit.MINUTES); log.info("SET cacheKey:{} curApplyTaskId:{}", cacheKey, value.getTaskId()); return value; } /** * 清除当前启用的预约任务缓存 */ public void clearCurrentApplyTaskCache(Long orgId) { String cacheKey = String.format(CACHE_CURRENT_APPLY_TASK, orgId); redisClient.delete(cacheKey); log.warn("DELETE cacheKey:{}", cacheKey); } /** * 获取某考生的“允许预约时段次数”缓存 */ public int getStudentApplyNumber(Long studentId) { String cacheKey = String.format(CACHE_STUDENT_APPLY_NUMBER, studentId); Integer value = redisClient.get(cacheKey, Integer.class); if (value != null) { return value; } value = studentService.findStudentApplyNumberById(studentId); redisClient.set(cacheKey, value, CACHE_TIME_OUT_10, TimeUnit.MINUTES); log.info("SET cacheKey:{} value:{}", cacheKey, value); return value; } /** * 清除某考生的“允许预约时段次数”缓存 */ public void clearStudentApplyNumberCache(Long studentId) { String cacheKey = String.format(CACHE_STUDENT_APPLY_NUMBER, studentId); redisClient.delete(cacheKey); log.warn("DELETE cacheKey:{}", cacheKey); } /** * 累加 预约队列业务流水号缓存 */ public long increaseBizId() { RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(QUEUE_APPLY_BIZ_ID); return atomic.incrementAndGet(); } /** * 获取某考点某时段的“剩余可约数量”缓存 */ public int getApplyAvailableCount(Long examSiteId, Long timePeriodId) { String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSiteId, timePeriodId); RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey); return (int) atomic.get();//若缓存不存在时,会返回0 } /** * 累加 某考点某时段的“剩余可约数量”缓存 */ public void increaseApplyAvailableCount(Long examSiteId, Long timePeriodId) { String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSiteId, timePeriodId); RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey); long value = atomic.incrementAndGet(); log.warn("累加成功,考点时段剩余可约数量:{} {}", value, cacheKey); } /** * 累减 某考点某时段的“剩余可约数量”缓存 */ public boolean decreaseApplyAvailableCount(Long examSiteId, Long timePeriodId) { String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSiteId, timePeriodId); StringBuilder luaScript = new StringBuilder(); luaScript.append("local key = KEYS[1];"); luaScript.append("if (redis.call('exists', key) == 1) then"); luaScript.append(" local stock = tonumber(redis.call('get', key));"); luaScript.append(" local amount = tonumber(ARGV[1]);"); luaScript.append(" if (stock >= amount) then"); luaScript.append(" return redis.call('decrby', key, amount);"); luaScript.append(" end;"); luaScript.append(" return -1;"); luaScript.append("end;"); luaScript.append("return -2;"); List keys = Collections.singletonList(cacheKey); Object[] args = new Object[]{1}; // 执行Lua脚本,传入键列表和参数列表 DefaultRedisScript redisScript = new DefaultRedisScript<>(luaScript.toString(), Long.class); Long result = redisClient.getRedisTemplate().execute(redisScript, keys, args); Objects.requireNonNull(result, "缓存操作异常,返回结果为空!"); if (-2 == result) { log.warn("扣减失败,考点时段剩余可约数量缓存不存在!{}", cacheKey); return false; } else if (-1 == result) { log.warn("扣减失败,考点时段剩余可约数量为0不足!{}", cacheKey); return false; } log.info("扣减成功,考点时段剩余可约数量:{} {}", result, cacheKey); return true; } /** * 刷新 某考点某时段的“剩余可约数量”缓存 */ public void refreshApplyAvailableCountCache(Long examSiteId, int oldCapacity, int newCapacity) { String lockKey = String.format(CacheConstants.LOCK_EXAM_SITE_CAPACITY, examSiteId); Lock lock = concurrentService.getLock(lockKey); try { if (!lock.tryLock()) { throw new RuntimeException("获取锁失败,不允许同时操作!" + lockKey); } // 获取所有时段ID集合 List timePeriods = timePeriodService.list(new LambdaQueryWrapper().select(TimePeriodEntity::getId)); if (CollectionUtils.isEmpty(timePeriods)) { return; } // 新、旧考点容量差额 int diffCapacity = newCapacity - oldCapacity; for (TimePeriodEntity timePeriod : timePeriods) { String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSiteId, timePeriod.getId()); if (redisClient.exist(cacheKey)) { if (diffCapacity == 0) { // 考点总容量未变化,不用更新缓存 continue; } // 总容量变化时,则更新 RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey); long availableCount = Math.max(atomic.get() + diffCapacity, 0); atomic.set(availableCount); log.warn("更新考点时段剩余可约数量:{} {}", availableCount, cacheKey); } else { RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey); atomic.set(newCapacity); log.warn("初始考点时段剩余可约数量:{} {}", newCapacity, cacheKey); } } } finally { try { lock.unlock(); } catch (Exception e) { log.warn("解锁失败!lockKey:{} err:{}", lockKey, e.getMessage()); } } } /** * 初始或重置计算 所有考点所有时段的“剩余可约数量”缓存 */ public void initApplyAvailableCountCacheForAllExamSites(boolean skipExisted) { // 获取所有考点和考点总容量集合 List examSites = examSiteService.findAllExamSiteCapacityList(); if (CollectionUtils.isEmpty(examSites)) { return; } // 获取所有时段ID集合 List timePeriods = timePeriodService.list(new LambdaQueryWrapper().select(TimePeriodEntity::getId)); if (CollectionUtils.isEmpty(timePeriods)) { return; } for (ExamSiteCapacityInfo examSite : examSites) { String lockKey = String.format(CacheConstants.LOCK_EXAM_SITE_CAPACITY, examSite.getExamSiteId()); Lock lock = concurrentService.getLock(lockKey); try { if (!lock.tryLock()) { log.warn("获取锁失败,不允许同时操作!{}", lockKey); return; } for (TimePeriodEntity timePeriod : timePeriods) { String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSite.getExamSiteId(), timePeriod.getId()); boolean existCache = redisClient.exist(cacheKey); if (existCache && skipExisted) { // 跳过存在的缓存 continue; } // 获取某考点某时段的“已预约数量”(仅查数据库会有误差,需等预约队列中全部完成数据库持久化后再执行!!!) int finishCount = studentApplyService.countApplyFinishForExamSiteAndTimePeriod(examSite.getExamSiteId(), timePeriod.getId()); // 剩余可约数量 int availableCount = Math.max(examSite.getCapacity() - finishCount, 0); RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey); atomic.set(availableCount); log.warn("{}考点时段剩余可约数量:{} {}", existCache ? "更新" : "初始", availableCount, cacheKey); } } finally { try { lock.unlock(); } catch (Exception e) { log.warn("解锁失败!lockKey:{} err:{}", lockKey, e.getMessage()); } } } } /** * 获取某考生的 已完成预约次数缓存(查数据库) */ public int getStudentApplyFinishCountFromDB(Long studentId) { LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(StudentApplyEntity::getStudentId, studentId); wrapper.eq(StudentApplyEntity::getCancel, Boolean.FALSE); return studentApplyService.count(wrapper); } /** * 获取某考生的 已完成预约次数缓存 */ public int getStudentApplyFinishCount(Long studentId) { Map maps = this.getStudentApplyRecords(studentId); int studentApplyFinishCount = 0; for (ApplyRecordCacheBean bean : maps.values()) { if (!bean.getCancel()) { studentApplyFinishCount++; } } return studentApplyFinishCount; } /** * 获取某考生的 所有的“预约记录”缓存 */ public Map getStudentApplyRecords(Long studentId) { String cacheKey = String.format(CACHE_STUDENT_APPLY_RECORD, studentId); if (!redisClient.exist(cacheKey)) { Map maps = this.getStudentApplyRecordsFromDB(studentId); if (MapUtils.isEmpty(maps)) { return new HashMap<>(); } for (ApplyRecordCacheBean bean : maps.values()) { this.saveStudentApplyRecord(bean); } redisClient.expire(cacheKey, CACHE_TIME_OUT_60, TimeUnit.DAYS); return maps; } return redisClient.getEntriesForHash(cacheKey, ApplyRecordCacheBean.class); } /** * 获取某考生的 所有的“预约记录”(查数据库) */ public Map getStudentApplyRecordsFromDB(Long studentId) { LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(StudentApplyEntity::getStudentId, studentId); List list = studentApplyService.list(wrapper); if (CollectionUtils.isEmpty(list)) { return new HashMap<>(); } Map maps = new HashMap<>(); for (StudentApplyEntity entity : list) { ApplyRecordCacheBean bean = new ApplyRecordCacheBean(); bean.setStudentId(entity.getStudentId()); bean.setExamSiteId(entity.getExamSiteId()); bean.setTimePeriodId(entity.getTimePeriodId()); bean.setCancel(entity.getCancel()); bean.setOperateId(entity.getOperateId()); bean.setOperateTime(entity.getUpdateTime()); String hashKey = String.format("%s_%s", entity.getExamSiteId(), entity.getTimePeriodId()); maps.put(hashKey, bean); } return maps; } /** * 获取某考生的 某考点某时段的“预约记录”缓存 */ public ApplyRecordCacheBean getStudentApplyRecord(Long studentId, Long examSiteId, Long timePeriodId) { String cacheKey = String.format(CACHE_STUDENT_APPLY_RECORD, studentId); String hashKey = String.format("%s_%s", examSiteId, timePeriodId); ApplyRecordCacheBean cacheBean = redisClient.getForHash(cacheKey, hashKey, ApplyRecordCacheBean.class); if (cacheBean != null) { return cacheBean; } // 缓存不存在时,从数据库再查一次 StudentApplyEntity entity = this.getStudentApplyRecordFormDB(studentId, examSiteId, timePeriodId); if (entity != null) { cacheBean = new ApplyRecordCacheBean(); cacheBean.setStudentId(entity.getStudentId()); cacheBean.setExamSiteId(entity.getExamSiteId()); cacheBean.setTimePeriodId(entity.getTimePeriodId()); cacheBean.setCancel(entity.getCancel()); cacheBean.setOperateId(entity.getOperateId()); cacheBean.setOperateTime(entity.getUpdateTime()); return cacheBean; } return null; } /** * 获取某考生的 某考点某时段的“预约记录”(查数据库) */ public StudentApplyEntity getStudentApplyRecordFormDB(Long studentId, Long examSiteId, Long timePeriodId) { LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(StudentApplyEntity::getExamSiteId, examSiteId); wrapper.eq(StudentApplyEntity::getTimePeriodId, timePeriodId); wrapper.eq(StudentApplyEntity::getStudentId, studentId); return studentApplyService.getOne(wrapper); } /** * 保存某考生的 某考点某时段的“预约记录”缓存 */ public void saveStudentApplyRecord(ApplyRecordCacheBean value) { String cacheKey = String.format(CACHE_STUDENT_APPLY_RECORD, value.getStudentId()); String hashKey = String.format("%s_%s", value.getExamSiteId(), value.getTimePeriodId()); redisClient.setForHash(cacheKey, hashKey, value); log.info("SET cacheKey:{} hashKey:{} cancel:{}", cacheKey, hashKey, value.getCancel()); } /** * 推送至考生预约队列 */ public boolean pushStudentApplyRecordQueue(ApplyRecordCacheBean value) { try { mqProducer.sendMessage(value); return true; } catch (Exception e) { log.error("【考生预约队列】消息推送失败! {}_{}_{}_{} bizId:{} err:{}", value.getStudentId(), value.getExamSiteId(), value.getTimePeriodId(), value.getCancel(), value.getBizId(), e.getMessage()); } return false; } }