ApplyTaskCacheService.java 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. package com.qmth.exam.reserve.cache.impl;
  2. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  3. import com.qmth.boot.core.concurrent.service.ConcurrentService;
  4. import com.qmth.exam.reserve.bean.apply.ApplyRecordCacheBean;
  5. import com.qmth.exam.reserve.bean.applytask.CurrentApplyTaskVO;
  6. import com.qmth.exam.reserve.bean.examsite.ExamSiteCapacityInfo;
  7. import com.qmth.exam.reserve.cache.CacheConstants;
  8. import com.qmth.exam.reserve.cache.RedisClient;
  9. import com.qmth.exam.reserve.entity.StudentApplyEntity;
  10. import com.qmth.exam.reserve.entity.TimePeriodEntity;
  11. import com.qmth.exam.reserve.mq.ExamReserveMQProducer;
  12. import com.qmth.exam.reserve.service.*;
  13. import org.apache.commons.collections4.CollectionUtils;
  14. import org.apache.commons.collections4.MapUtils;
  15. import org.redisson.api.RAtomicLong;
  16. import org.slf4j.Logger;
  17. import org.slf4j.LoggerFactory;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import org.springframework.data.redis.core.script.DefaultRedisScript;
  20. import org.springframework.stereotype.Component;
  21. import java.util.*;
  22. import java.util.concurrent.TimeUnit;
  23. import java.util.concurrent.locks.Lock;
  24. @Component
  25. public class ApplyTaskCacheService implements CacheConstants {
  26. private static final Logger log = LoggerFactory.getLogger(ApplyTaskCacheService.class);
  27. @Autowired
  28. private RedisClient redisClient;
  29. @Autowired
  30. private ExamReserveMQProducer mqProducer;
  31. @Autowired
  32. private ApplyTaskService applyTaskService;
  33. @Autowired
  34. private ExamSiteService examSiteService;
  35. @Autowired
  36. private TimePeriodService timePeriodService;
  37. @Autowired
  38. private StudentService studentService;
  39. @Autowired
  40. private StudentApplyService studentApplyService;
  41. @Autowired
  42. private ConcurrentService concurrentService;
  43. /**
  44. * 获取当前启用的预约任务缓存
  45. */
  46. public CurrentApplyTaskVO currentApplyTask(Long orgId) {
  47. String cacheKey = String.format(CACHE_CURRENT_APPLY_TASK, orgId);
  48. CurrentApplyTaskVO value = redisClient.get(cacheKey, CurrentApplyTaskVO.class);
  49. if (value != null) {
  50. return value;
  51. }
  52. value = applyTaskService.currentApplyTask(orgId);
  53. if (value == null) {
  54. return null;
  55. }
  56. redisClient.set(cacheKey, value, CACHE_TIME_OUT_10, TimeUnit.MINUTES);
  57. log.info("SET cacheKey:{} curApplyTaskId:{}", cacheKey, value.getTaskId());
  58. return value;
  59. }
  60. /**
  61. * 清除当前启用的预约任务缓存
  62. */
  63. public void clearCurrentApplyTaskCache(Long orgId) {
  64. String cacheKey = String.format(CACHE_CURRENT_APPLY_TASK, orgId);
  65. redisClient.delete(cacheKey);
  66. log.warn("DELETE cacheKey:{}", cacheKey);
  67. }
  68. /**
  69. * 获取某考生的“允许预约时段次数”缓存
  70. */
  71. public int getStudentApplyNumber(Long studentId) {
  72. String cacheKey = String.format(CACHE_STUDENT_APPLY_NUMBER, studentId);
  73. Integer value = redisClient.get(cacheKey, Integer.class);
  74. if (value != null) {
  75. return value;
  76. }
  77. value = studentService.findStudentApplyNumberById(studentId);
  78. redisClient.set(cacheKey, value, CACHE_TIME_OUT_10, TimeUnit.MINUTES);
  79. log.info("SET cacheKey:{} value:{}", cacheKey, value);
  80. return value;
  81. }
  82. /**
  83. * 清除某考生的“允许预约时段次数”缓存
  84. */
  85. public void clearStudentApplyNumberCache(Long studentId) {
  86. String cacheKey = String.format(CACHE_STUDENT_APPLY_NUMBER, studentId);
  87. redisClient.delete(cacheKey);
  88. log.warn("DELETE cacheKey:{}", cacheKey);
  89. }
  90. /**
  91. * 累加 预约队列业务流水号缓存
  92. */
  93. public long increaseBizId() {
  94. RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(QUEUE_APPLY_BIZ_ID);
  95. return atomic.incrementAndGet();
  96. }
  97. /**
  98. * 获取某考点某时段的“剩余可约数量”缓存
  99. */
  100. public int getApplyAvailableCount(Long examSiteId, Long timePeriodId) {
  101. String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSiteId, timePeriodId);
  102. RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey);
  103. return (int) atomic.get();//若缓存不存在时,会返回0
  104. }
  105. /**
  106. * 累加 某考点某时段的“剩余可约数量”缓存
  107. */
  108. public void increaseApplyAvailableCount(Long examSiteId, Long timePeriodId) {
  109. String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSiteId, timePeriodId);
  110. RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey);
  111. long value = atomic.incrementAndGet();
  112. log.warn("累加成功,考点时段剩余可约数量:{} {}", value, cacheKey);
  113. }
  114. /**
  115. * 累减 某考点某时段的“剩余可约数量”缓存
  116. */
  117. public boolean decreaseApplyAvailableCount(Long examSiteId, Long timePeriodId) {
  118. String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSiteId, timePeriodId);
  119. StringBuilder luaScript = new StringBuilder();
  120. luaScript.append("local key = KEYS[1];");
  121. luaScript.append("if (redis.call('exists', key) == 1) then");
  122. luaScript.append(" local stock = tonumber(redis.call('get', key));");
  123. luaScript.append(" local amount = tonumber(ARGV[1]);");
  124. luaScript.append(" if (stock >= amount) then");
  125. luaScript.append(" return redis.call('decrby', key, amount);");
  126. luaScript.append(" end;");
  127. luaScript.append(" return -1;");
  128. luaScript.append("end;");
  129. luaScript.append("return -2;");
  130. List<String> keys = Collections.singletonList(cacheKey);
  131. Object[] args = new Object[]{1};
  132. // 执行Lua脚本,传入键列表和参数列表
  133. DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript.toString(), Long.class);
  134. Long result = redisClient.getRedisTemplate().execute(redisScript, keys, args);
  135. Objects.requireNonNull(result, "缓存操作异常,返回结果为空!");
  136. if (-2 == result) {
  137. log.warn("扣减失败,考点时段剩余可约数量缓存不存在!{}", cacheKey);
  138. return false;
  139. } else if (-1 == result) {
  140. log.warn("扣减失败,考点时段剩余可约数量为0不足!{}", cacheKey);
  141. return false;
  142. }
  143. log.info("扣减成功,考点时段剩余可约数量:{} {}", result, cacheKey);
  144. return true;
  145. }
  146. /**
  147. * 刷新 某考点某时段的“剩余可约数量”缓存
  148. */
  149. public void refreshApplyAvailableCountCache(Long examSiteId, int oldCapacity, int newCapacity) {
  150. String lockKey = String.format(CacheConstants.LOCK_EXAM_SITE_CAPACITY, examSiteId);
  151. Lock lock = concurrentService.getLock(lockKey);
  152. try {
  153. if (!lock.tryLock()) {
  154. throw new RuntimeException("获取锁失败,不允许同时操作!" + lockKey);
  155. }
  156. // 获取所有时段ID集合
  157. List<TimePeriodEntity> timePeriods = timePeriodService.list(new LambdaQueryWrapper<TimePeriodEntity>().select(TimePeriodEntity::getId));
  158. if (CollectionUtils.isEmpty(timePeriods)) {
  159. return;
  160. }
  161. // 新、旧考点容量差额
  162. int diffCapacity = newCapacity - oldCapacity;
  163. for (TimePeriodEntity timePeriod : timePeriods) {
  164. String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSiteId, timePeriod.getId());
  165. if (redisClient.exist(cacheKey)) {
  166. if (diffCapacity == 0) {
  167. // 考点总容量未变化,不用更新缓存
  168. continue;
  169. }
  170. // 总容量变化时,则更新
  171. RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey);
  172. long availableCount = Math.max(atomic.get() + diffCapacity, 0);
  173. atomic.set(availableCount);
  174. log.warn("更新考点时段剩余可约数量:{} {}", availableCount, cacheKey);
  175. } else {
  176. RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey);
  177. atomic.set(newCapacity);
  178. log.warn("初始考点时段剩余可约数量:{} {}", newCapacity, cacheKey);
  179. }
  180. }
  181. } finally {
  182. try {
  183. lock.unlock();
  184. } catch (Exception e) {
  185. log.warn("解锁失败!lockKey:{} err:{}", lockKey, e.getMessage());
  186. }
  187. }
  188. }
  189. /**
  190. * 初始或重置计算 所有考点所有时段的“剩余可约数量”缓存
  191. */
  192. public void initApplyAvailableCountCacheForAllExamSites(boolean skipExisted) {
  193. // 获取所有考点和考点总容量集合
  194. List<ExamSiteCapacityInfo> examSites = examSiteService.findAllExamSiteCapacityList();
  195. if (CollectionUtils.isEmpty(examSites)) {
  196. return;
  197. }
  198. // 获取所有时段ID集合
  199. List<TimePeriodEntity> timePeriods = timePeriodService.list(new LambdaQueryWrapper<TimePeriodEntity>().select(TimePeriodEntity::getId));
  200. if (CollectionUtils.isEmpty(timePeriods)) {
  201. return;
  202. }
  203. for (ExamSiteCapacityInfo examSite : examSites) {
  204. String lockKey = String.format(CacheConstants.LOCK_EXAM_SITE_CAPACITY, examSite.getExamSiteId());
  205. Lock lock = concurrentService.getLock(lockKey);
  206. try {
  207. if (!lock.tryLock()) {
  208. log.warn("获取锁失败,不允许同时操作!{}", lockKey);
  209. return;
  210. }
  211. for (TimePeriodEntity timePeriod : timePeriods) {
  212. String cacheKey = String.format(CACHE_APPLY_AVAILABLE_COUNT, examSite.getExamSiteId(), timePeriod.getId());
  213. boolean existCache = redisClient.exist(cacheKey);
  214. if (existCache && skipExisted) {
  215. // 跳过存在的缓存
  216. continue;
  217. }
  218. // 获取某考点某时段的“已预约数量”(仅查数据库会有误差,需等预约队列中全部完成数据库持久化后再执行!!!)
  219. int finishCount = studentApplyService.countApplyFinishForExamSiteAndTimePeriod(examSite.getExamSiteId(), timePeriod.getId());
  220. // 剩余可约数量
  221. int availableCount = Math.max(examSite.getCapacity() - finishCount, 0);
  222. RAtomicLong atomic = redisClient.getRedissonClient().getAtomicLong(cacheKey);
  223. atomic.set(availableCount);
  224. log.warn("{}考点时段剩余可约数量:{} {}", existCache ? "更新" : "初始", availableCount, cacheKey);
  225. }
  226. } finally {
  227. try {
  228. lock.unlock();
  229. } catch (Exception e) {
  230. log.warn("解锁失败!lockKey:{} err:{}", lockKey, e.getMessage());
  231. }
  232. }
  233. }
  234. }
  235. /**
  236. * 获取某考生的 已完成预约次数缓存(查数据库)
  237. */
  238. public int getStudentApplyFinishCountFromDB(Long studentId) {
  239. LambdaQueryWrapper<StudentApplyEntity> wrapper = new LambdaQueryWrapper<>();
  240. wrapper.eq(StudentApplyEntity::getStudentId, studentId);
  241. wrapper.eq(StudentApplyEntity::getCancel, Boolean.FALSE);
  242. return studentApplyService.count(wrapper);
  243. }
  244. /**
  245. * 获取某考生的 已完成预约次数缓存
  246. */
  247. public int getStudentApplyFinishCount(Long studentId) {
  248. Map<String, ApplyRecordCacheBean> maps = this.getStudentApplyRecords(studentId);
  249. int studentApplyFinishCount = 0;
  250. for (ApplyRecordCacheBean bean : maps.values()) {
  251. if (!bean.getCancel()) {
  252. studentApplyFinishCount++;
  253. }
  254. }
  255. return studentApplyFinishCount;
  256. }
  257. /**
  258. * 获取某考生的 所有的“预约记录”缓存
  259. */
  260. public Map<String, ApplyRecordCacheBean> getStudentApplyRecords(Long studentId) {
  261. String cacheKey = String.format(CACHE_STUDENT_APPLY_RECORD, studentId);
  262. if (!redisClient.exist(cacheKey)) {
  263. Map<String, ApplyRecordCacheBean> maps = this.getStudentApplyRecordsFromDB(studentId);
  264. if (MapUtils.isEmpty(maps)) {
  265. return new HashMap<>();
  266. }
  267. for (ApplyRecordCacheBean bean : maps.values()) {
  268. this.saveStudentApplyRecord(bean);
  269. }
  270. redisClient.expire(cacheKey, CACHE_TIME_OUT_60, TimeUnit.DAYS);
  271. return maps;
  272. }
  273. return redisClient.getEntriesForHash(cacheKey, ApplyRecordCacheBean.class);
  274. }
  275. /**
  276. * 获取某考生的 所有的“预约记录”(查数据库)
  277. */
  278. public Map<String, ApplyRecordCacheBean> getStudentApplyRecordsFromDB(Long studentId) {
  279. LambdaQueryWrapper<StudentApplyEntity> wrapper = new LambdaQueryWrapper<>();
  280. wrapper.eq(StudentApplyEntity::getStudentId, studentId);
  281. List<StudentApplyEntity> list = studentApplyService.list(wrapper);
  282. if (CollectionUtils.isEmpty(list)) {
  283. return new HashMap<>();
  284. }
  285. Map<String, ApplyRecordCacheBean> maps = new HashMap<>();
  286. for (StudentApplyEntity entity : list) {
  287. ApplyRecordCacheBean bean = new ApplyRecordCacheBean();
  288. bean.setStudentId(entity.getStudentId());
  289. bean.setExamSiteId(entity.getExamSiteId());
  290. bean.setTimePeriodId(entity.getTimePeriodId());
  291. bean.setCancel(entity.getCancel());
  292. bean.setOperateId(entity.getOperateId());
  293. bean.setOperateTime(entity.getUpdateTime());
  294. String hashKey = String.format("%s_%s", entity.getExamSiteId(), entity.getTimePeriodId());
  295. maps.put(hashKey, bean);
  296. }
  297. return maps;
  298. }
  299. /**
  300. * 获取某考生的 某考点某时段的“预约记录”缓存
  301. */
  302. public ApplyRecordCacheBean getStudentApplyRecord(Long studentId, Long examSiteId, Long timePeriodId) {
  303. String cacheKey = String.format(CACHE_STUDENT_APPLY_RECORD, studentId);
  304. String hashKey = String.format("%s_%s", examSiteId, timePeriodId);
  305. ApplyRecordCacheBean cacheBean = redisClient.getForHash(cacheKey, hashKey, ApplyRecordCacheBean.class);
  306. if (cacheBean != null) {
  307. return cacheBean;
  308. }
  309. // 缓存不存在时,从数据库再查一次
  310. StudentApplyEntity entity = this.getStudentApplyRecordFormDB(studentId, examSiteId, timePeriodId);
  311. if (entity != null) {
  312. cacheBean = new ApplyRecordCacheBean();
  313. cacheBean.setStudentId(entity.getStudentId());
  314. cacheBean.setExamSiteId(entity.getExamSiteId());
  315. cacheBean.setTimePeriodId(entity.getTimePeriodId());
  316. cacheBean.setCancel(entity.getCancel());
  317. cacheBean.setOperateId(entity.getOperateId());
  318. cacheBean.setOperateTime(entity.getUpdateTime());
  319. return cacheBean;
  320. }
  321. return null;
  322. }
  323. /**
  324. * 获取某考生的 某考点某时段的“预约记录”(查数据库)
  325. */
  326. public StudentApplyEntity getStudentApplyRecordFormDB(Long studentId, Long examSiteId, Long timePeriodId) {
  327. LambdaQueryWrapper<StudentApplyEntity> wrapper = new LambdaQueryWrapper<>();
  328. wrapper.eq(StudentApplyEntity::getExamSiteId, examSiteId);
  329. wrapper.eq(StudentApplyEntity::getTimePeriodId, timePeriodId);
  330. wrapper.eq(StudentApplyEntity::getStudentId, studentId);
  331. return studentApplyService.getOne(wrapper);
  332. }
  333. /**
  334. * 保存某考生的 某考点某时段的“预约记录”缓存
  335. */
  336. public void saveStudentApplyRecord(ApplyRecordCacheBean value) {
  337. String cacheKey = String.format(CACHE_STUDENT_APPLY_RECORD, value.getStudentId());
  338. String hashKey = String.format("%s_%s", value.getExamSiteId(), value.getTimePeriodId());
  339. redisClient.setForHash(cacheKey, hashKey, value);
  340. log.info("SET cacheKey:{} hashKey:{} cancel:{}", cacheKey, hashKey, value.getCancel());
  341. }
  342. /**
  343. * 推送至考生预约队列
  344. */
  345. public boolean pushStudentApplyRecordQueue(ApplyRecordCacheBean value) {
  346. try {
  347. mqProducer.sendMessage(value);
  348. return true;
  349. } catch (Exception e) {
  350. log.error("【考生预约队列】消息推送失败! {}_{}_{}_{} bizId:{} err:{}", value.getStudentId(), value.getExamSiteId(),
  351. value.getTimePeriodId(), value.getCancel(), value.getBizId(), e.getMessage());
  352. }
  353. return false;
  354. }
  355. }