deason 11 months ago
parent
commit
191113f950

+ 2 - 1
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/ExamCaptureQueueService.java

@@ -1,5 +1,6 @@
 package cn.com.qmth.examcloud.core.oe.task.service;
 
+import cn.com.qmth.examcloud.core.oe.student.dao.entity.ExamCaptureQueueEntity;
 import cn.com.qmth.examcloud.core.oe.task.service.bean.FaceApiParam;
 
 import java.util.List;
@@ -9,7 +10,7 @@ import java.util.List;
  */
 public interface ExamCaptureQueueService {
 
-    void handlerExamCaptureQueuesByExamRecordDataId(Long examRecordDataId, FaceApiParam param) throws Exception;
+    void handlerExamCaptureQueuesByExamRecordDataId(Long examRecordDataId, List<ExamCaptureQueueEntity> queues, FaceApiParam param) throws Exception;
 
     List<Long> findQueuesGroupByExamRecordDataId(int shardTotal, int shardIndex, int batchSize, int maxErrorNum, boolean examHandInFirst);
 

+ 1 - 2
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/bean/FaceApiParam.java

@@ -32,8 +32,7 @@ public class FaceApiParam implements Serializable {
     private boolean useLocalBaiduApiForFaceLiveness = true;
 
     public int getMaxThreadNum() {
-        // 最大数不超过20
-        return Math.min(maxThreadNum, 20);
+        return maxThreadNum;
     }
 
     public void setMaxThreadNum(int maxThreadNum) {

+ 1 - 3
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/impl/ExamCaptureQueueServiceImpl.java

@@ -65,9 +65,7 @@ public class ExamCaptureQueueServiceImpl implements ExamCaptureQueueService {
     private OssClient ossClient;
 
     @Override
-    public void handlerExamCaptureQueuesByExamRecordDataId(Long examRecordDataId, FaceApiParam param) throws Exception {
-        // 每个考试记录ID下通常只有几条待处理抓拍记录,直接取所有一次性处理。
-        List<ExamCaptureQueueEntity> queues = examCaptureQueueRepo.findByExamRecordDataId(examRecordDataId);
+    public void handlerExamCaptureQueuesByExamRecordDataId(Long examRecordDataId, List<ExamCaptureQueueEntity> queues, FaceApiParam param) throws Exception {
         if (CollectionUtils.isEmpty(queues)) {
             return;
         }

+ 18 - 12
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/job/FaceVerifyJobHandler.java

@@ -16,6 +16,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 
 /**
  * 处理考试过程中抓拍照片比对任务
@@ -42,7 +45,7 @@ public class FaceVerifyJobHandler {
                 shardIndex, batchSize, param.getMaxErrorNum(), true);
 
         if (CollectionUtils.isEmpty(todoExamRecordDataIds)) {
-            // 未取到时,再取一次“不区分状态”的考试记录ID集合
+            // 未取到时,再按常规取一次考试记录ID集合
             todoExamRecordDataIds = examCaptureQueueService.findQueuesGroupByExamRecordDataId(shardTotal,
                     shardIndex, batchSize, param.getMaxErrorNum(), false);
 
@@ -52,32 +55,34 @@ public class FaceVerifyJobHandler {
             }
         }
 
-        List<ExamCaptureQueueEntity> queues = examCaptureQueueRepo.findByExamRecordDataIdIn(todoExamRecordDataIds);
-        if (CollectionUtils.isEmpty(queues)) {
+        List<ExamCaptureQueueEntity> todoQueues = examCaptureQueueRepo.findByExamRecordDataIdIn(todoExamRecordDataIds);
+        if (CollectionUtils.isEmpty(todoQueues)) {
             return;
         }
-        log.warn("分片任务_FACE_{}_{} examRecordDataIdSize:{} queueSize:{}", shardTotal, shardIndex, todoExamRecordDataIds.size(), queues.size());
 
-        for (Long examRecordDataId : todoExamRecordDataIds) {
+        log.warn("分片任务_FACE_{}_{} examRecordDataIdSize:{} queueSize:{}", shardTotal, shardIndex, todoExamRecordDataIds.size(), todoQueues.size());
+        Map<Long, List<ExamCaptureQueueEntity>> maps = todoQueues.stream().collect(Collectors.groupingBy(ExamCaptureQueueEntity::getExamRecordDataId));
+
+        for (Map.Entry<Long, List<ExamCaptureQueueEntity>> entry : maps.entrySet()) {
+            Long examRecordDataId = entry.getKey();
             final String lockKey = CacheConstants.LOCK_FACE_COMPARE + examRecordDataId;
+
             try {
                 SequenceLockHelper.getLockSimple(lockKey);
 
-                // 处理未比对的抓拍照片
-                examCaptureQueueService.handlerExamCaptureQueuesByExamRecordDataId(examRecordDataId, param);
+                // 分别按“考试记录ID”集中处理未比对的抓拍照片记录(每个考试记录ID下通常只有几条待处理记录)
+                examCaptureQueueService.handlerExamCaptureQueuesByExamRecordDataId(examRecordDataId, entry.getValue(), param);
             } catch (Exception e) {
                 if (e instanceof InterruptedException) {
                     // 若线程终止,则抛出交由任务调度中心处理
-                    log.warn("当前人脸比对任务线程被终止!examRecordDataId:{}, error:{}", examRecordDataId,
-                            e.getMessage());
+                    log.warn("当前人脸比对任务线程被终止!examRecordDataId:{}, error:{}", examRecordDataId, e.getMessage());
                     throw e;
                 } else if (e instanceof SequenceLockException) {
                     // 若锁问题,下次会继续执行
                     log.warn("当前人脸比对任务获取锁失败!examRecordDataId:{}, redisKey:{}", examRecordDataId, lockKey);
                 } else {
                     // 若异常,下次会继续执行(需要排查原因)
-                    log.error("当前人脸比对任务处理失败!examRecordDataId:{}, error:{}", examRecordDataId,
-                            e.getMessage());
+                    log.error("当前人脸比对任务处理失败!examRecordDataId:{}, error:{}", examRecordDataId, e.getMessage());
                 }
             } finally {
                 SequenceLockHelper.releaseLockSimple(lockKey);
@@ -95,7 +100,8 @@ public class FaceVerifyJobHandler {
 
         JsonNode maxThreadNum = jsonParams.get("maxThreadNum");
         if (maxThreadNum != null) {
-            param.setMaxThreadNum(maxThreadNum.asInt(3));
+            // 最大数不超过20
+            param.setMaxThreadNum(Math.min(maxThreadNum.asInt(3), 20));
         }
 
         JsonNode maxErrorNum = jsonParams.get("maxErrorNum");