浏览代码

断点逻辑优化+锁优化

wangliang 4 年之前
父节点
当前提交
9c3e4e9b74

+ 61 - 57
themis-backend/src/main/java/com/qmth/themis/backend/api/TEExamReexamController.java

@@ -93,32 +93,38 @@ public class TEExamReexamController {
             List<TEExamStudent> teExamStudentList = teExamStudentService.listByIds(examStudentIdList);
             List<TEExamStudent> teExamStudentList = teExamStudentService.listByIds(examStudentIdList);
             for (TEExamStudent teExamStudent : teExamStudentList) {
             for (TEExamStudent teExamStudent : teExamStudentList) {
                 if (redisUtil.lock(SystemConstant.REDIS_LOCK_EXAM_STUDENT_PREFIX + teExamStudent.getId(), SystemConstant.REDIS_LOCK_REEXAM_EXAM_STUDENT_TIME_OUT)) {
                 if (redisUtil.lock(SystemConstant.REDIS_LOCK_EXAM_STUDENT_PREFIX + teExamStudent.getId(), SystemConstant.REDIS_LOCK_REEXAM_EXAM_STUDENT_TIME_OUT)) {
-                    ExamCacheBean examCacheBean = teExamService.getExamCacheBean(teExamStudent.getExamId());//考试缓存
-                    if (Objects.isNull(examCacheBean)) {
-                        throw new BusinessException("考试批次[" + examCacheBean + "]不存在");
-                    }
-                    reexamAuditing = examCacheBean.getReexamAuditing();
-                    status = Objects.isNull(reexamAuditing) || reexamAuditing.intValue() == 0 ? 0 : 1;
-                    TEExamReexam teExamReexam = new TEExamReexam(teExamStudent.getExamId(), teExamStudent.getExamActivityId(), teExamStudent.getId(), model, reason, status, Objects.isNull(mapParameter.get("remark")) ? null : String.valueOf(mapParameter.get("remark")));
-                    teExamReexam.setCreateId(tbUser.getId());
-                    if (Objects.nonNull(status) && status.intValue() == 1) {
-                        //这里查询该机构下所有为管理员角色的账号
-                        List<TBUser> tbUserList = tbUserRoleService.userQueryByRole(tbUser.getOrgId(), RoleEnum.ADMIN.name());
-                        if (Objects.nonNull(tbUserList) && tbUserList.size() > 0) {
-                            for (TBUser t : tbUserList) {
-                                TEExamReexamAuditing teExamReexamAuditing = new TEExamReexamAuditing(teExamReexam.getId(), t.getId());
-                                teExamReexamAuditingList.add(teExamReexamAuditing);
+                    try {
+                        ExamCacheBean examCacheBean = teExamService.getExamCacheBean(teExamStudent.getExamId());//考试缓存
+                        if (Objects.isNull(examCacheBean)) {
+                            throw new BusinessException("考试批次[" + examCacheBean + "]不存在");
+                        }
+                        reexamAuditing = examCacheBean.getReexamAuditing();
+                        status = Objects.isNull(reexamAuditing) || reexamAuditing.intValue() == 0 ? 0 : 1;
+                        TEExamReexam teExamReexam = new TEExamReexam(teExamStudent.getExamId(), teExamStudent.getExamActivityId(), teExamStudent.getId(), model, reason, status, Objects.isNull(mapParameter.get("remark")) ? null : String.valueOf(mapParameter.get("remark")));
+                        teExamReexam.setCreateId(tbUser.getId());
+                        if (Objects.nonNull(status) && status.intValue() == 1) {
+                            //这里查询该机构下所有为管理员角色的账号
+                            List<TBUser> tbUserList = tbUserRoleService.userQueryByRole(tbUser.getOrgId(), RoleEnum.ADMIN.name());
+                            if (Objects.nonNull(tbUserList) && tbUserList.size() > 0) {
+                                for (TBUser t : tbUserList) {
+                                    TEExamReexamAuditing teExamReexamAuditing = new TEExamReexamAuditing(teExamReexam.getId(), t.getId());
+                                    teExamReexamAuditingList.add(teExamReexamAuditing);
+                                }
                             }
                             }
+                        } else if (Objects.nonNull(status) && status.intValue() == 0) {//无需审核时考生已考次数-1
+                            examStudentIdNotAuditingList.add(teExamStudent.getId());
+                            teExamReexam.setAuditingId(tbUser.getId());
+                            teExamReexam.setAuditingStatus(0);
+                            teExamReexam.setAuditingTime(System.currentTimeMillis());
+                            teExamReexam.setAuditingSuggest("无需审核");
+                            teExamReexam.setUpdateId(tbUser.getId());
+                        }
+                        teExamReexamService.save(teExamReexam);
+                    } finally {
+                        if (Objects.nonNull(teExamStudent)) {
+                            redisUtil.releaseLock(SystemConstant.REDIS_LOCK_EXAM_STUDENT_PREFIX + teExamStudent.getId());
                         }
                         }
-                    } else if (Objects.nonNull(status) && status.intValue() == 0) {//无需审核时考生已考次数-1
-                        examStudentIdNotAuditingList.add(teExamStudent.getId());
-                        teExamReexam.setAuditingId(tbUser.getId());
-                        teExamReexam.setAuditingStatus(0);
-                        teExamReexam.setAuditingTime(System.currentTimeMillis());
-                        teExamReexam.setAuditingSuggest("无需审核");
-                        teExamReexam.setUpdateId(tbUser.getId());
                     }
                     }
-                    teExamReexamService.save(teExamReexam);
                 }
                 }
             }
             }
             if (Objects.nonNull(teExamReexamAuditingList) && teExamReexamAuditingList.size() > 0) {
             if (Objects.nonNull(teExamReexamAuditingList) && teExamReexamAuditingList.size() > 0) {
@@ -172,38 +178,42 @@ public class TEExamReexamController {
                 List<TEExamReexam> teExamReexamUpdateList = new ArrayList<>();
                 List<TEExamReexam> teExamReexamUpdateList = new ArrayList<>();
                 for (String reexamId : reexamIdList) {
                 for (String reexamId : reexamIdList) {
                     if (redisUtil.lock(SystemConstant.REDIS_LOCK_REEXAM_AUDITING_PREFIX + Long.parseLong(reexamId), SystemConstant.REDIS_LOCK_REEXAM_TIME_OUT)) {
                     if (redisUtil.lock(SystemConstant.REDIS_LOCK_REEXAM_AUDITING_PREFIX + Long.parseLong(reexamId), SystemConstant.REDIS_LOCK_REEXAM_TIME_OUT)) {
-                        TEExamReexam teExamReexam = teExamReexamService.getById(Long.parseLong(reexamId));
-                        if (Objects.isNull(teExamReexam)) {
-                            throw new BusinessException("重考id[" + reexamId + "]记录不存在");
-                        }
-                        if (Objects.nonNull(teExamReexam.getStatus()) && teExamReexam.getStatus().intValue() == 2) {
-                            throw new BusinessException("重考id[" + reexamId + "]已经审核");
-                        } else {
-                            teExamReexam.setAuditingId(tbUser.getId());
-                            teExamReexam.setAuditingStatus(auditingStatus);
-                            teExamReexam.setAuditingTime(System.currentTimeMillis());
-                            teExamReexam.setAuditingSuggest(Objects.isNull(mapParameter.get("auditingSuggest")) ? null : String.valueOf(mapParameter.get("auditingSuggest")));
-                            teExamReexam.setStatus(2);
-                            teExamReexam.setUpdateId(tbUser.getId());
-                            teExamReexamUpdateList.add(teExamReexam);
-                            if (auditingStatus.intValue() == 0) {
-                                ExamStudentCacheBean examStudentCacheBean = teExamStudentService.getExamStudentCacheBean(teExamReexam.getExamStudentId());
-                                if (Objects.nonNull(examStudentCacheBean)) {
-                                    Integer alreadyExamCount = Objects.nonNull(examStudentCacheBean.getAlreadyExamCount()) ? examStudentCacheBean.getAlreadyExamCount() : 0;
-                                    if (alreadyExamCount > 0) {
-                                        alreadyExamCount = alreadyExamCount - 1;
-                                        ExamCacheBean ec = teExamService.getExamCacheBean(teExamReexam.getExamId());//考试缓存
-                                        if (alreadyExamCount.intValue() >= ec.getExamCount().intValue()) {
-                                            alreadyExamCount = 0;
+                        try {
+                            TEExamReexam teExamReexam = teExamReexamService.getById(Long.parseLong(reexamId));
+                            if (Objects.isNull(teExamReexam)) {
+                                throw new BusinessException("重考id[" + reexamId + "]记录不存在");
+                            }
+                            if (Objects.nonNull(teExamReexam.getStatus()) && teExamReexam.getStatus().intValue() == 2) {
+                                throw new BusinessException("重考id[" + reexamId + "]已经审核");
+                            } else {
+                                teExamReexam.setAuditingId(tbUser.getId());
+                                teExamReexam.setAuditingStatus(auditingStatus);
+                                teExamReexam.setAuditingTime(System.currentTimeMillis());
+                                teExamReexam.setAuditingSuggest(Objects.isNull(mapParameter.get("auditingSuggest")) ? null : String.valueOf(mapParameter.get("auditingSuggest")));
+                                teExamReexam.setStatus(2);
+                                teExamReexam.setUpdateId(tbUser.getId());
+                                teExamReexamUpdateList.add(teExamReexam);
+                                if (auditingStatus.intValue() == 0) {
+                                    ExamStudentCacheBean examStudentCacheBean = teExamStudentService.getExamStudentCacheBean(teExamReexam.getExamStudentId());
+                                    if (Objects.nonNull(examStudentCacheBean)) {
+                                        Integer alreadyExamCount = Objects.nonNull(examStudentCacheBean.getAlreadyExamCount()) ? examStudentCacheBean.getAlreadyExamCount() : 0;
+                                        if (alreadyExamCount > 0) {
+                                            alreadyExamCount = alreadyExamCount - 1;
+                                            ExamCacheBean ec = teExamService.getExamCacheBean(teExamReexam.getExamId());//考试缓存
+                                            if (alreadyExamCount.intValue() >= ec.getExamCount().intValue()) {
+                                                alreadyExamCount = 0;
+                                            }
+                                            UpdateWrapper<TEExamStudent> teExamStudentUpdateWrapper = new UpdateWrapper<>();
+                                            teExamStudentUpdateWrapper.lambda().set(TEExamStudent::getAlreadyExamCount, alreadyExamCount)
+                                                    .eq(TEExamStudent::getId, teExamReexam.getExamStudentId());
+                                            teExamStudentService.update(teExamStudentUpdateWrapper);
+                                            teExamStudentService.updateExamStudentCacheBean(teExamReexam.getExamStudentId());
                                         }
                                         }
-                                        UpdateWrapper<TEExamStudent> teExamStudentUpdateWrapper = new UpdateWrapper<>();
-                                        teExamStudentUpdateWrapper.lambda().set(TEExamStudent::getAlreadyExamCount, alreadyExamCount)
-                                                .eq(TEExamStudent::getId, teExamReexam.getExamStudentId());
-                                        teExamStudentService.update(teExamStudentUpdateWrapper);
-                                        teExamStudentService.updateExamStudentCacheBean(teExamReexam.getExamStudentId());
                                     }
                                     }
                                 }
                                 }
                             }
                             }
+                        } finally {
+                            redisUtil.releaseLock(SystemConstant.REDIS_LOCK_REEXAM_AUDITING_PREFIX + Long.parseLong(reexamId));
                         }
                         }
                     }
                     }
                 }
                 }
@@ -216,12 +226,6 @@ public class TEExamReexamController {
             } else {
             } else {
                 throw new RuntimeException(e);
                 throw new RuntimeException(e);
             }
             }
-        } finally {
-            if (Objects.nonNull(reexamIdList)) {
-                reexamIdList.forEach(s -> {
-                    redisUtil.releaseLock(SystemConstant.REDIS_LOCK_REEXAM_AUDITING_PREFIX + s);
-                });
-            }
         }
         }
         return ResultUtil.ok(Collections.singletonMap(SystemConstant.SUCCESS, true));
         return ResultUtil.ok(Collections.singletonMap(SystemConstant.SUCCESS, true));
     }
     }

+ 24 - 12
themis-backend/src/main/java/com/qmth/themis/backend/websocket/WebSocketAdminServer.java

@@ -204,13 +204,19 @@ public class WebSocketAdminServer
      */
      */
     public synchronized void addOnlineCount() {
     public synchronized void addOnlineCount() {
         if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
         if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
-            Object o = redisUtil.get(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT);
-            int count = 0;
-            if (Objects.nonNull(o)) {
-                count = (int) o;
+            try {
+                Object o = redisUtil.get(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT);
+                int count = 0;
+                if (Objects.nonNull(o)) {
+                    count = (int) o;
+                }
+                count++;
+                redisUtil.set(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT, count);
+            } finally {
+                if (Objects.nonNull(this.sessionId)) {
+                    redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId);
+                }
             }
             }
-            count++;
-            redisUtil.set(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT, count);
         }
         }
     }
     }
 
 
@@ -219,13 +225,19 @@ public class WebSocketAdminServer
      */
      */
     public synchronized void subOnlineCount() {
     public synchronized void subOnlineCount() {
         if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
         if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
-            Object o = redisUtil.get(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT);
-            int count = 0;
-            if (Objects.nonNull(o)) {
-                count = (int) o;
+            try {
+                Object o = redisUtil.get(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT);
+                int count = 0;
+                if (Objects.nonNull(o)) {
+                    count = (int) o;
+                }
+                count--;
+                redisUtil.set(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT, count < 0 ? 0 : count);
+            } finally {
+                if (Objects.nonNull(this.sessionId)) {
+                    redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId);
+                }
             }
             }
-            count--;
-            redisUtil.set(SystemConstant.WEBSOCKET_ADMIN_ONLINE_COUNT, count < 0 ? 0 : count);
         }
         }
     }
     }
 
 

+ 29 - 26
themis-business/src/main/java/com/qmth/themis/business/service/impl/TOeExamRecordServiceImpl.java

@@ -954,8 +954,8 @@ public class TOeExamRecordServiceImpl extends ServiceImpl<TOeExamRecordMapper, T
     @Override
     @Override
     @Transactional
     @Transactional
     public void setExamBreak(Long recordId) {
     public void setExamBreak(Long recordId) {
-        try {
-            if (redisUtil.lock(SystemConstant.REDIS_LOCK_EXAM_BREAK_PREFIX + recordId, SystemConstant.REDIS_LOCK_EXAM_BREAK_TIME_OUT)) {
+        if (redisUtil.lock(SystemConstant.REDIS_LOCK_EXAM_BREAK_PREFIX + recordId, SystemConstant.REDIS_LOCK_EXAM_BREAK_TIME_OUT)) {
+            try {
                 Integer alreadyBreakCount = Objects.isNull(ExamRecordCacheUtil.getAlreadyBreakCount(recordId)) ? 0 : ExamRecordCacheUtil.getAlreadyBreakCount(recordId);
                 Integer alreadyBreakCount = Objects.isNull(ExamRecordCacheUtil.getAlreadyBreakCount(recordId)) ? 0 : ExamRecordCacheUtil.getAlreadyBreakCount(recordId);
                 alreadyBreakCount++;
                 alreadyBreakCount++;
                 Long examActivityId = ExamRecordCacheUtil.getExamActivityId(recordId);
                 Long examActivityId = ExamRecordCacheUtil.getExamActivityId(recordId);
@@ -978,17 +978,17 @@ public class TOeExamRecordServiceImpl extends ServiceImpl<TOeExamRecordMapper, T
                 //考试断点异常原因 发送mq end
                 //考试断点异常原因 发送mq end
                 //更新场次-考试记录缓存
                 //更新场次-考试记录缓存
                 ExamActivityRecordCacheUtil.setExamRecordStatus(examActivityId, recordId, new ExamActivityRecordCacheBean(ExamRecordCacheUtil.getExamStudentId(recordId), ExamRecordCacheUtil.getStatus(recordId)));
                 ExamActivityRecordCacheUtil.setExamRecordStatus(examActivityId, recordId, new ExamActivityRecordCacheBean(ExamRecordCacheUtil.getExamStudentId(recordId), ExamRecordCacheUtil.getStatus(recordId)));
-            }
-        } catch (Exception e) {
-            log.error("请求出错", e);
-            if (e instanceof BusinessException) {
-                throw new BusinessException(e.getMessage());
-            } else {
-                throw new RuntimeException(e);
-            }
-        } finally {
-            if (Objects.nonNull(recordId)) {
-                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_EXAM_BREAK_PREFIX + recordId);
+            } catch (Exception e) {
+                log.error("请求出错", e);
+                if (e instanceof BusinessException) {
+                    throw new BusinessException(e.getMessage());
+                } else {
+                    throw new RuntimeException(e);
+                }
+            } finally {
+                if (Objects.nonNull(recordId)) {
+                    redisUtil.releaseLock(SystemConstant.REDIS_LOCK_EXAM_BREAK_PREFIX + recordId);
+                }
             }
             }
         }
         }
     }
     }
@@ -1003,8 +1003,8 @@ public class TOeExamRecordServiceImpl extends ServiceImpl<TOeExamRecordMapper, T
     @Override
     @Override
     public Boolean sendExamBreakMsg(Long recordId, boolean setBreak) {
     public Boolean sendExamBreakMsg(Long recordId, boolean setBreak) {
         Boolean finished = false;
         Boolean finished = false;
-        try {
-            if (redisUtil.lock(SystemConstant.REDIS_LOCK_EXAM_BREAK_LOGIC_PREFIX + recordId, SystemConstant.REDIS_LOCK_EXAM_BREAK_LOGIC_TIME_OUT)) {
+        if (redisUtil.lock(SystemConstant.REDIS_LOCK_EXAM_BREAK_LOGIC_PREFIX + recordId, SystemConstant.REDIS_LOCK_EXAM_BREAK_LOGIC_TIME_OUT)) {
+            try {
                 Long examId = ExamRecordCacheUtil.getExamId(recordId);
                 Long examId = ExamRecordCacheUtil.getExamId(recordId);
                 Long examStudentId = ExamRecordCacheUtil.getExamStudentId(recordId);
                 Long examStudentId = ExamRecordCacheUtil.getExamStudentId(recordId);
                 ExamStudentCacheBean examStudentCacheBean = teExamStudentService.getExamStudentCacheBean(examStudentId);
                 ExamStudentCacheBean examStudentCacheBean = teExamStudentService.getExamStudentCacheBean(examStudentId);
@@ -1013,6 +1013,9 @@ public class TOeExamRecordServiceImpl extends ServiceImpl<TOeExamRecordMapper, T
                 Integer breakExpireSeconds = Objects.isNull(ec.getBreakExpireSeconds()) ? 0 : ec.getBreakExpireSeconds();
                 Integer breakExpireSeconds = Objects.isNull(ec.getBreakExpireSeconds()) ? 0 : ec.getBreakExpireSeconds();
                 Integer durationSeconds = Objects.isNull(ExamRecordCacheUtil.getDurationSeconds(recordId)) ? 0 : ExamRecordCacheUtil.getDurationSeconds(recordId);
                 Integer durationSeconds = Objects.isNull(ExamRecordCacheUtil.getDurationSeconds(recordId)) ? 0 : ExamRecordCacheUtil.getDurationSeconds(recordId);
                 Integer alreadyBreakCount = Objects.isNull(ExamRecordCacheUtil.getAlreadyBreakCount(recordId)) ? 0 : ExamRecordCacheUtil.getAlreadyBreakCount(recordId);
                 Integer alreadyBreakCount = Objects.isNull(ExamRecordCacheUtil.getAlreadyBreakCount(recordId)) ? 0 : ExamRecordCacheUtil.getAlreadyBreakCount(recordId);
+                if (setBreak) {//如果需要断点,则次数本地先加1,可以避免多生成考试记录
+                    alreadyBreakCount++;
+                }
                 Integer leftBreakResumeCount = ec.getBreakResumeCount() - alreadyBreakCount;
                 Integer leftBreakResumeCount = ec.getBreakResumeCount() - alreadyBreakCount;
                 if (Objects.nonNull(lastBreakTime) && (System.currentTimeMillis() - lastBreakTime) / 1000 >= breakExpireSeconds) {
                 if (Objects.nonNull(lastBreakTime) && (System.currentTimeMillis() - lastBreakTime) / 1000 >= breakExpireSeconds) {
                     finished = true;
                     finished = true;
@@ -1027,17 +1030,17 @@ public class TOeExamRecordServiceImpl extends ServiceImpl<TOeExamRecordMapper, T
                         }
                         }
                     }
                     }
                 }
                 }
-            }
-        } catch (Exception e) {
-            log.error("请求出错", e);
-            if (e instanceof BusinessException) {
-                throw new BusinessException(e.getMessage());
-            } else {
-                throw new RuntimeException(e);
-            }
-        } finally {
-            if (Objects.nonNull(recordId)) {
-                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_EXAM_BREAK_LOGIC_PREFIX + recordId);
+            } catch (Exception e) {
+                log.error("请求出错", e);
+                if (e instanceof BusinessException) {
+                    throw new BusinessException(e.getMessage());
+                } else {
+                    throw new RuntimeException(e);
+                }
+            } finally {
+                if (Objects.nonNull(recordId)) {
+                    redisUtil.releaseLock(SystemConstant.REDIS_LOCK_EXAM_BREAK_LOGIC_PREFIX + recordId);
+                }
             }
             }
         }
         }
         return finished;
         return finished;

+ 9 - 11
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketMobileServer.java

@@ -277,23 +277,21 @@ public class WebSocketMobileServer implements Concurrently {
                     mqOeLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                     mqOeLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                 } else {
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                        mqOeLogicService.execMqOeMobileLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                    } else {
-                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                        try {
+                            mqOeLogicService.execMqOeMobileLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                        } finally {
+                            if (Objects.nonNull(mqDto)) {
+                                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
+                            }
+                        }
                     }
                     }
                 }
                 }
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
             log.error("mq websocket oe,消息消费出错", e);
             log.error("mq websocket oe,消息消费出错", e);
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-        } finally {
-            if (Objects.nonNull(mqDto)) {
-                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
-            }
         }
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
     }
     }
-}
-
+}

+ 32 - 21
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketOeServer.java

@@ -252,13 +252,19 @@ public class WebSocketOeServer implements Concurrently {
      */
      */
     public synchronized void addOnlineCount() {
     public synchronized void addOnlineCount() {
         if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
         if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
-            Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
-            int count = 0;
-            if (Objects.nonNull(o)) {
-                count = (int) o;
+            try {
+                Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
+                int count = 0;
+                if (Objects.nonNull(o)) {
+                    count = (int) o;
+                }
+                count++;
+                redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count);
+            } finally {
+                if (Objects.nonNull(this.sessionId)) {
+                    redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId);
+                }
             }
             }
-            count++;
-            redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count);
         }
         }
     }
     }
 
 
@@ -267,13 +273,19 @@ public class WebSocketOeServer implements Concurrently {
      */
      */
     public synchronized void subOnlineCount() {
     public synchronized void subOnlineCount() {
         if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
         if (redisUtil.lock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId, SystemConstant.REDIS_LOCK_WEBSOCKET_TIME_OUT)) {
-            Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
-            int count = 0;
-            if (Objects.nonNull(o)) {
-                count = (int) o;
+            try {
+                Object o = redisUtil.get(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT);
+                int count = 0;
+                if (Objects.nonNull(o)) {
+                    count = (int) o;
+                }
+                count--;
+                redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count < 0 ? 0 : count);
+            } finally {
+                if (Objects.nonNull(this.sessionId)) {
+                    redisUtil.releaseLock(SystemConstant.REDIS_LOCK_WEBSOCKET_PREFIX + this.sessionId);
+                }
             }
             }
-            count--;
-            redisUtil.set(SystemConstant.WEBSOCKET_OE_ONLINE_COUNT, count < 0 ? 0 : count);
         }
         }
     }
     }
 
 
@@ -295,21 +307,20 @@ public class WebSocketOeServer implements Concurrently {
                     mqOeLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                     mqOeLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                 } else {
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                        mqOeLogicService.execMqOeLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                    } else {
-                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                        try {
+                            mqOeLogicService.execMqOeLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                        } finally {
+                            if (Objects.nonNull(mqDto)) {
+                                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
+                            }
+                        }
                     }
                     }
                 }
                 }
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
             log.error("mq websocket oe,消息消费出错", e);
             log.error("mq websocket oe,消息消费出错", e);
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-        } finally {
-            if (Objects.nonNull(mqDto)) {
-                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
-            }
         }
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
     }
     }

+ 20 - 16
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqLogicServiceImpl.java

@@ -812,20 +812,22 @@ public class MqLogicServiceImpl implements MqLogicService {
                     this.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                     this.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                 } else {
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                        Method method = this.getClass().getDeclaredMethod(mqExecTypeEnum.getDesc(), MqDto.class, String.class);
-                        log.info(":{}-:{} 准备执行mq exec:{}逻辑", threadId, threadName, method.getName());
-                        method.invoke(SpringContextHolder.getBean(MqLogicService.class), mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                        try {
+                            Method method = this.getClass().getDeclaredMethod(mqExecTypeEnum.getDesc(), MqDto.class, String.class);
+                            log.info(":{}-:{} 准备执行mq exec:{}逻辑", threadId, threadName, method.getName());
+                            method.invoke(SpringContextHolder.getBean(MqLogicService.class), mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                        } finally {
+                            if (Objects.nonNull(mqDto)) {
+                                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
+                            }
+                        }
                     }
                     }
                 }
                 }
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
             log.error("mq普通消息监听,消息消费出错", e);
             log.error("mq普通消息监听,消息消费出错", e);
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-        } finally {
-            if (Objects.nonNull(mqDto)) {
-                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
-            }
         }
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
     }
     }
@@ -853,20 +855,22 @@ public class MqLogicServiceImpl implements MqLogicService {
                     this.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST);
                     this.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST);
                 } else {
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                        Method method = this.getClass().getDeclaredMethod(mqExecTypeEnum.getDesc(), MqDto.class, String.class);
-                        log.info(":{}-:{} 准备执行mq exec:{}逻辑", threadId, threadName, method.getName());
-                        method.invoke(SpringContextHolder.getBean(MqLogicService.class), mqDto, SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST);
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                        try {
+                            Method method = this.getClass().getDeclaredMethod(mqExecTypeEnum.getDesc(), MqDto.class, String.class);
+                            log.info(":{}-:{} 准备执行mq exec:{}逻辑", threadId, threadName, method.getName());
+                            method.invoke(SpringContextHolder.getBean(MqLogicService.class), mqDto, SystemConstant.MQ_DELAY_TOPIC_BUFFER_LIST);
+                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                        } finally {
+                            if (Objects.nonNull(mqDto)) {
+                                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
+                            }
+                        }
                     }
                     }
                 }
                 }
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
             log.error("mq延时消息监听,消息消费出错", e);
             log.error("mq延时消息监听,消息消费出错", e);
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-        } finally {
-            if (Objects.nonNull(mqDto)) {
-                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
-            }
         }
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
     }
     }

+ 10 - 9
themis-task/src/main/java/com/qmth/themis/task/listener/QuartzOrderlyImpl.java

@@ -6,8 +6,10 @@ import com.qmth.themis.business.dto.MqDto;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.contanst.Constants;
 import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.mq.service.MqLogicService;
 import com.qmth.themis.mq.templete.Orderly;
 import com.qmth.themis.mq.templete.Orderly;
 import com.qmth.themis.task.listener.service.MqTaskLogicService;
 import com.qmth.themis.task.listener.service.MqTaskLogicService;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -46,21 +48,20 @@ public class QuartzOrderlyImpl implements Orderly {
                     mqTaskLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                     mqTaskLogicService.execMqMaxReconsumeTime(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
                 } else {
                 } else {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
                     if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                        mqTaskLogicService.execMqQuartzLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
-                        return ConsumeOrderlyStatus.SUCCESS;
-                    } else {
-                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
+                        try {
+                            mqTaskLogicService.execMqQuartzLogic(mqDto, SystemConstant.MQ_TOPIC_BUFFER_LIST);
+                            return ConsumeOrderlyStatus.SUCCESS;
+                        } finally {
+                            if (Objects.nonNull(mqDto)) {
+                                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
+                            }
+                        }
                     }
                     }
                 }
                 }
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
             log.error("mq quartz顺序,消息消费出错", e);
             log.error("mq quartz顺序,消息消费出错", e);
             return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
             return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
-        } finally {
-            if (Objects.nonNull(mqDto)) {
-                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
-            }
         }
         }
         return ConsumeOrderlyStatus.SUCCESS;//成功
         return ConsumeOrderlyStatus.SUCCESS;//成功
     }
     }