123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532 |
- package com.qmth.exam.reserve.cache.impl;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.qmth.boot.core.concurrent.service.ConcurrentService;
- import com.qmth.exam.reserve.bean.apply.ApplyRecordCacheBean;
- import com.qmth.exam.reserve.bean.applytask.CurrentApplyTaskVO;
- import com.qmth.exam.reserve.bean.examsite.ExamSiteCapacityInfo;
- import com.qmth.exam.reserve.cache.CacheConstants;
- import com.qmth.exam.reserve.cache.RedisClient;
- import com.qmth.exam.reserve.entity.StudentApplyEntity;
- import com.qmth.exam.reserve.entity.TimePeriodEntity;
- import com.qmth.exam.reserve.mq.ExamReserveMQProducer;
- import com.qmth.exam.reserve.service.*;
- import org.apache.commons.collections4.CollectionUtils;
- import org.apache.commons.collections4.MapUtils;
- import org.apache.commons.lang3.ArrayUtils;
- import org.redisson.api.RAtomicLong;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.script.DefaultRedisScript;
- import org.springframework.stereotype.Component;
- import java.util.*;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Lock;
- @Component
- public class ApplyTaskCacheService implements CacheConstants {
- private static final Logger log = LoggerFactory.getLogger(ApplyTaskCacheService.class);
- @Autowired
- private RedisClient redisClient;
- @Autowired
- private ExamReserveMQProducer mqProducer;
- @Autowired
- private ApplyTaskService applyTaskService;
- @Autowired
- private ExamSiteService examSiteService;
- @Autowired
- private TimePeriodService timePeriodService;
- @Autowired
- private StudentService studentService;
- @Autowired
- private StudentApplyService studentApplyService;
- @Autowired
- private ConcurrentService concurrentService;
- /**
- * 获取当前启用的预约任务缓存
- */
- public CurrentApplyTaskVO currentApplyTask(Long orgId) {
- if (orgId == null) {
- throw new RuntimeException("参数orgId值不能为空");
- }
- String cacheKey = String.format(CACHE_CURRENT_APPLY_TASK, orgId);
- CurrentApplyTaskVO value = redisClient.get(cacheKey, CurrentApplyTaskVO.class);
- if (value != null) {
- return value;
- }
- value = applyTaskService.currentApplyTask(orgId);
- if (value == null) {
- return null;
- }
- redisClient.set(cacheKey, value, CACHE_TIME_OUT_10, TimeUnit.MINUTES);
- log.info("SET cacheKey:{} curApplyTaskId:{}", cacheKey, value.getTaskId());
- return value;
- }
- /**
- * 清除当前启用的预约任务缓存
- */
- public void clearCurrentApplyTaskCache(Long orgId) {
- String cacheKey = String.format(CACHE_CURRENT_APPLY_TASK, orgId);
- redisClient.delete(cacheKey);
- log.warn("DELETE cacheKey:{}", cacheKey);
- }
- /**
- * 获取某考生的“允许预约时段次数”缓存
- */
- public int getStudentApplyNumber(Long studentId) {
- String cacheKey = String.format(CACHE_STUDENT_APPLY_NUMBER, studentId);
- Integer value = redisClient.get(cacheKey, Integer.class);
- if (value != null) {
- return value;
- }
- value = studentService.findStudentApplyNumberById(studentId);
- redisClient.set(cacheKey, value, CACHE_TIME_OUT_10, TimeUnit.MINUTES);
- log.info("SET cacheKey:{} value:{}", cacheKey, value);
- return value;
- }
- /**
- * 清除某考生的“允许预约时段次数”缓存
- */
- public void clearStudentApplyNumberCache(Long studentId) {
- String cacheKey = String.format(CACHE_STUDENT_APPLY_NUMBER, studentId);
- redisClient.delete(cacheKey);
- log.warn("DELETE cacheKey:{}", cacheKey);
- }
- /**
- * 累加 预约队列业务流水号缓存
- */
- public long increaseBizId() {
- RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(QUEUE_APPLY_BIZ_ID);
- return atomic.incrementAndGet();
- }
- /**
- * 获取某考点某时段的“剩余可约数量”缓存
- */
- public int getApplyAvailableCount(Long examSiteId, Long timePeriodId) {
- String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSiteId, timePeriodId);
- RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey);
- return (int) atomic.get();//若缓存不存在时,会返回0
- }
- /**
- * 累加 某考点某时段的“剩余可约数量”缓存
- */
- public void increaseApplyAvailableCount(Long examSiteId, Long timePeriodId) {
- String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSiteId, timePeriodId);
- RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey);
- long value = atomic.incrementAndGet();
- log.warn("累加成功,考点时段剩余可约数量:{} {}", value, cacheKey);
- }
- /**
- * 累减 某考点某时段的“剩余可约数量”缓存
- */
- public boolean decreaseApplyAvailableCount(Long examSiteId, Long timePeriodId) {
- String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSiteId, timePeriodId);
- StringBuilder luaScript = new StringBuilder();
- luaScript.append("local key = KEYS[1];");
- luaScript.append("if (redis.call('exists', key) == 1) then");
- luaScript.append(" local stock = tonumber(redis.call('get', key));");
- luaScript.append(" local amount = tonumber(ARGV[1]);");
- luaScript.append(" if (stock >= amount) then");
- luaScript.append(" return redis.call('decrby', key, amount);");
- luaScript.append(" end;");
- luaScript.append(" return -1;");
- luaScript.append("end;");
- luaScript.append("return -2;");
- List<String> keys = Collections.singletonList(cacheKey);
- Object[] args = new Object[]{1};
- // 执行Lua脚本,传入键列表和参数列表
- DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript.toString(), Long.class);
- Long result = redisClient.getRedisTemplate().execute(redisScript, keys, args);
- Objects.requireNonNull(result, "缓存操作异常,返回结果为空!");
- if (-2 == result) {
- log.warn("扣减失败,考点时段剩余可约数量缓存不存在!{}", cacheKey);
- return false;
- } else if (-1 == result) {
- log.warn("扣减失败,考点时段剩余可约数量为0不足!{}", cacheKey);
- return false;
- }
- log.info("扣减成功,考点时段剩余可约数量:{} {}", result, cacheKey);
- return true;
- }
- /**
- * 刷新 某考点下所有时段的“剩余可约数量”缓存
- */
- @Deprecated
- public void refreshApplyAvailableCountCache(Long examSiteId, int oldCapacity, int newCapacity) {
- String lockKey = String.format(CacheConstants.LOCK_EXAM_SITE_CAPACITY, examSiteId);
- Lock lock = concurrentService.getLock(lockKey);
- try {
- if (!lock.tryLock()) {
- throw new RuntimeException("获取锁失败,不允许同时操作!" + lockKey);
- }
- // 获取所有时段ID集合
- List<TimePeriodEntity> timePeriods = timePeriodService.list(new LambdaQueryWrapper<TimePeriodEntity>().select(TimePeriodEntity::getId));
- if (CollectionUtils.isEmpty(timePeriods)) {
- return;
- }
- // 新、旧考点容量差额
- int diffCapacity = newCapacity - oldCapacity;
- for (TimePeriodEntity timePeriod : timePeriods) {
- String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSiteId, timePeriod.getId());
- if (redisClient.exist(cacheKey)) {
- if (diffCapacity == 0) {
- // 考点总容量未变化,不用更新缓存
- continue;
- }
- // 总容量变化时,则更新(注意:在所有操作减少某个考场容量时,缩减后的实际容量不能低于考生已预约数量)
- RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey);
- long curAvailableCount = atomic.get();
- long newAvailableCount = Math.max(curAvailableCount + diffCapacity, 0);
- atomic.set(newAvailableCount);
- log.warn("考点时段剩余可约数量从{}更新为{} oldCapacity:{} newCapacity:{} {}",
- curAvailableCount, newAvailableCount, oldCapacity, newCapacity, cacheKey);
- } else {
- RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey);
- int finishCount = studentApplyService.countApplyFinishForExamSiteAndTimePeriod(examSiteId, timePeriod.getId());
- long newAvailableCount = Math.max(newCapacity - finishCount, 0);
- atomic.set(newAvailableCount);
- log.warn("缓存不存在,初始考点时段剩余可约数量:{} oldCapacity:{} newCapacity:{} finishCount:{} {}",
- newAvailableCount, oldCapacity, newCapacity, finishCount, cacheKey);
- }
- }
- } finally {
- try {
- lock.unlock();
- } catch (Exception e) {
- log.warn("解锁失败!lockKey:{} err:{}", lockKey, e.getMessage());
- }
- }
- }
- /**
- * 刷新 某考点某时段的“剩余可约数量”缓存
- */
- public void refreshApplyAvailableCountCache(Long examSiteId, Long timePeriodId, int oldCapacity, int newCapacity) {
- String lockKey = String.format(CacheConstants.LOCK_EXAM_SITE_CAPACITY, examSiteId);
- Lock lock = concurrentService.getLock(lockKey);
- try {
- if (!lock.tryLock()) {
- throw new RuntimeException("获取锁失败,不允许同时操作!" + lockKey);
- }
- // 新、旧考点容量差额
- int diffCapacity = newCapacity - oldCapacity;
- String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSiteId, timePeriodId);
- if (redisClient.exist(cacheKey)) {
- if (diffCapacity == 0) {
- // 考点总容量未变化,不用更新缓存
- return;
- }
- // 总容量变化时,则更新(注意:在所有操作减少某个考场容量时,缩减后的实际容量不能低于考生已预约数量)
- RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey);
- long curAvailableCount = atomic.get();
- long newAvailableCount = Math.max(curAvailableCount + diffCapacity, 0);
- atomic.set(newAvailableCount);
- log.warn("考点时段剩余可约数量从{}更新为{} oldCapacity:{} newCapacity:{} {}",
- curAvailableCount, newAvailableCount, oldCapacity, newCapacity, cacheKey);
- } else {
- RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey);
- int finishCount = studentApplyService.countApplyFinishForExamSiteAndTimePeriod(examSiteId, timePeriodId);
- long newAvailableCount = Math.max(newCapacity - finishCount, 0);
- atomic.set(newAvailableCount);
- log.warn("缓存不存在,初始考点时段剩余可约数量:{} oldCapacity:{} newCapacity:{} finishCount:{} {}",
- newAvailableCount, oldCapacity, newCapacity, finishCount, cacheKey);
- }
- } finally {
- try {
- lock.unlock();
- } catch (Exception e) {
- log.warn("解锁失败!lockKey:{} err:{}", lockKey, e.getMessage());
- }
- }
- }
- /**
- * 初始或重置计算 所有考点所有时段的“剩余可约数量”缓存
- */
- public void initApplyAvailableCountCacheForAllExamSites(boolean skipExisted) {
- // 获取所有考点和考点总容量集合
- List<ExamSiteCapacityInfo> examSites = examSiteService.findAllExamSiteCapacityList();
- if (CollectionUtils.isEmpty(examSites)) {
- return;
- }
- // 获取所有时段ID集合
- List<TimePeriodEntity> timePeriods = timePeriodService.list(new LambdaQueryWrapper<TimePeriodEntity>().select(TimePeriodEntity::getId));
- if (CollectionUtils.isEmpty(timePeriods)) {
- return;
- }
- for (ExamSiteCapacityInfo examSite : examSites) {
- String lockKey = String.format(CacheConstants.LOCK_EXAM_SITE_CAPACITY, examSite.getExamSiteId());
- Lock lock = concurrentService.getLock(lockKey);
- try {
- if (!lock.tryLock()) {
- log.warn("获取锁失败,不允许同时操作!{}", lockKey);
- return;
- }
- for (TimePeriodEntity timePeriod : timePeriods) {
- String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSite.getExamSiteId(), timePeriod.getId());
- boolean existCache = redisClient.exist(cacheKey);
- if (existCache && skipExisted) {
- // 跳过存在的缓存
- continue;
- }
- // 获取某考点某时段的“已预约数量”(仅查数据库会有误差,需等预约队列中全部完成数据库持久化后再执行!!!)
- int finishCount = studentApplyService.countApplyFinishForExamSiteAndTimePeriod(examSite.getExamSiteId(), timePeriod.getId());
- int curCapacity = examSiteService.getExamSiteTimePeriodCapacity(examSite.getExamSiteId(), timePeriod.getId());
- // 剩余可约数量
- int availableCount = Math.max(curCapacity - finishCount, 0);
- RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey);
- atomic.set(availableCount);
- log.warn("{}考点时段剩余可约数量:{} curCapacity:{} finishCount:{} {}",
- existCache ? "更新" : "初始", availableCount, curCapacity, finishCount, cacheKey);
- }
- } finally {
- try {
- lock.unlock();
- } catch (Exception e) {
- log.warn("解锁失败!lockKey:{} err:{}", lockKey, e.getMessage());
- }
- }
- }
- }
- /**
- * 初始 某些时段与所有考点的“剩余可约数量”缓存
- */
- public void initApplyAvailableCountCacheForTimePeriods(Long... timePeriodIds) {
- if (ArrayUtils.isEmpty(timePeriodIds)) {
- return;
- }
- // 获取所有考点和考点总容量集合
- List<ExamSiteCapacityInfo> examSites = examSiteService.findAllExamSiteCapacityList();
- if (CollectionUtils.isEmpty(examSites)) {
- return;
- }
- for (ExamSiteCapacityInfo examSite : examSites) {
- String lockKey = String.format(CacheConstants.LOCK_EXAM_SITE_CAPACITY, examSite.getExamSiteId());
- Lock lock = concurrentService.getLock(lockKey);
- try {
- if (!lock.tryLock()) {
- throw new RuntimeException("获取锁失败,不允许同时操作!" + lockKey);
- }
- for (Long timePeriodId : timePeriodIds) {
- String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSite.getExamSiteId(), timePeriodId);
- boolean existCache = redisClient.exist(cacheKey);
- if (existCache) {
- // 跳过存在的缓存
- continue;
- }
- // 获取某考点某时段的“已预约数量”
- int finishCount = studentApplyService.countApplyFinishForExamSiteAndTimePeriod(examSite.getExamSiteId(), timePeriodId);
- int curCapacity = examSiteService.getExamSiteTimePeriodCapacity(examSite.getExamSiteId(), timePeriodId);
- // 剩余可约数量
- int availableCount = Math.max(curCapacity - finishCount, 0);
- RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey);
- atomic.set(availableCount);
- log.warn("初始考点时段剩余可约数量:{} curCapacity:{} finishCount:{} {}",
- availableCount, curCapacity, finishCount, cacheKey);
- }
- } finally {
- try {
- lock.unlock();
- } catch (Exception e) {
- log.warn("解锁失败!lockKey:{} err:{}", lockKey, e.getMessage());
- }
- }
- }
- }
- public void saveTimePeriodUsedCache(Long timePeriodId) {
- String cacheKey = String.format(CACHE_TIME_PERIOD_USED, timePeriodId);
- redisClient.set(cacheKey, true, CACHE_TIME_OUT_1, TimeUnit.DAYS);
- log.info("SET cacheKey:{}", cacheKey);
- }
- public boolean existTimePeriodUsedCache(Long timePeriodId) {
- String cacheKey = String.format(CACHE_TIME_PERIOD_USED, timePeriodId);
- return redisClient.exist(cacheKey);
- }
- /**
- * 获取某考生的 已完成预约次数缓存(查数据库)
- */
- public int getStudentApplyFinishCountFromDB(Long studentId) {
- LambdaQueryWrapper<StudentApplyEntity> wrapper = new LambdaQueryWrapper<>();
- wrapper.eq(StudentApplyEntity::getStudentId, studentId);
- wrapper.eq(StudentApplyEntity::getCancel, Boolean.FALSE);
- return studentApplyService.count(wrapper);
- }
- /**
- * 获取某考生的 已完成预约次数缓存
- */
- public int getStudentApplyFinishCount(Long studentId) {
- Map<String, ApplyRecordCacheBean> maps = this.getStudentApplyRecords(studentId);
- int studentApplyFinishCount = 0;
- for (ApplyRecordCacheBean bean : maps.values()) {
- if (!bean.getCancel()) {
- studentApplyFinishCount++;
- }
- }
- return studentApplyFinishCount;
- }
- /**
- * 获取某考生的 所有的“预约记录”缓存
- */
- public Map<String, ApplyRecordCacheBean> getStudentApplyRecords(Long studentId) {
- String cacheKey = String.format(CACHE_STUDENT_APPLY_RECORD, studentId);
- if (!redisClient.exist(cacheKey)) {
- Map<String, ApplyRecordCacheBean> maps = this.getStudentApplyRecordsFromDB(studentId);
- if (MapUtils.isEmpty(maps)) {
- return new HashMap<>();
- }
- for (ApplyRecordCacheBean bean : maps.values()) {
- this.saveStudentApplyRecord(bean);
- }
- redisClient.expire(cacheKey, CACHE_TIME_OUT_60, TimeUnit.DAYS);
- return maps;
- }
- return redisClient.getEntriesForHash(cacheKey, ApplyRecordCacheBean.class);
- }
- /**
- * 获取某考生的 所有的“预约记录”(查数据库)
- */
- public Map<String, ApplyRecordCacheBean> getStudentApplyRecordsFromDB(Long studentId) {
- LambdaQueryWrapper<StudentApplyEntity> wrapper = new LambdaQueryWrapper<>();
- wrapper.eq(StudentApplyEntity::getStudentId, studentId);
- List<StudentApplyEntity> list = studentApplyService.list(wrapper);
- if (CollectionUtils.isEmpty(list)) {
- return new HashMap<>();
- }
- Map<String, ApplyRecordCacheBean> maps = new HashMap<>();
- for (StudentApplyEntity entity : list) {
- ApplyRecordCacheBean bean = new ApplyRecordCacheBean();
- bean.setStudentId(entity.getStudentId());
- bean.setExamSiteId(entity.getExamSiteId());
- bean.setTimePeriodId(entity.getTimePeriodId());
- bean.setCancel(entity.getCancel());
- bean.setOperateId(entity.getOperateId());
- bean.setOperateTime(entity.getUpdateTime());
- String hashKey = String.format("%s_%s", entity.getExamSiteId(), entity.getTimePeriodId());
- maps.put(hashKey, bean);
- }
- return maps;
- }
- /**
- * 获取某考生的 某考点某时段的“预约记录”缓存
- */
- public ApplyRecordCacheBean getStudentApplyRecord(Long studentId, Long examSiteId, Long timePeriodId) {
- String cacheKey = String.format(CACHE_STUDENT_APPLY_RECORD, studentId);
- String hashKey = String.format("%s_%s", examSiteId, timePeriodId);
- ApplyRecordCacheBean cacheBean = redisClient.getForHash(cacheKey, hashKey, ApplyRecordCacheBean.class);
- if (cacheBean != null) {
- return cacheBean;
- }
- // 缓存不存在时,从数据库再查一次
- /*
- StudentApplyEntity entity = this.getStudentApplyRecordFormDB(studentId, examSiteId, timePeriodId);
- if (entity != null) {
- cacheBean = new ApplyRecordCacheBean();
- cacheBean.setStudentId(entity.getStudentId());
- cacheBean.setExamSiteId(entity.getExamSiteId());
- cacheBean.setTimePeriodId(entity.getTimePeriodId());
- cacheBean.setCancel(entity.getCancel());
- cacheBean.setOperateId(entity.getOperateId());
- cacheBean.setOperateTime(entity.getUpdateTime());
- return cacheBean;
- }*/
- return null;
- }
- /**
- * 获取某考生的 某考点某时段的“预约记录”(查数据库)
- */
- public StudentApplyEntity getStudentApplyRecordFormDB(Long studentId, Long examSiteId, Long timePeriodId) {
- LambdaQueryWrapper<StudentApplyEntity> wrapper = new LambdaQueryWrapper<>();
- wrapper.eq(StudentApplyEntity::getExamSiteId, examSiteId);
- wrapper.eq(StudentApplyEntity::getTimePeriodId, timePeriodId);
- wrapper.eq(StudentApplyEntity::getStudentId, studentId);
- return studentApplyService.getOne(wrapper);
- }
- /**
- * 保存某考生的 某考点某时段的“预约记录”缓存
- */
- public void saveStudentApplyRecord(ApplyRecordCacheBean value) {
- String cacheKey = String.format(CACHE_STUDENT_APPLY_RECORD, value.getStudentId());
- String hashKey = String.format("%s_%s", value.getExamSiteId(), value.getTimePeriodId());
- redisClient.setForHash(cacheKey, hashKey, value);
- log.info("SET cacheKey:{} hashKey:{} cancel:{}", cacheKey, hashKey, value.getCancel());
- }
- /**
- * 推送至考生预约队列
- */
- public boolean pushStudentApplyRecordQueue(ApplyRecordCacheBean value) {
- try {
- mqProducer.sendMessage(value);
- return true;
- } catch (Exception e) {
- log.error("【考生预约队列】消息推送失败! {}_{}_{}_{} bizId:{} err:{}", value.getStudentId(), value.getExamSiteId(),
- value.getTimePeriodId(), value.getCancel(), value.getBizId(), e.getMessage());
- }
- return false;
- }
- }
|