浏览代码

update thread handlerExamCaptureQueues

deason 10 月之前
父节点
当前提交
766d518112

+ 2 - 2
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/bean/FaceApiParam.java

@@ -7,9 +7,9 @@ public class FaceApiParam implements Serializable {
     private static final long serialVersionUID = 2983309896884898091L;
     private static final long serialVersionUID = 2983309896884898091L;
 
 
     /**
     /**
-     * 最大线程数【默认:3
+     * 最大线程数【默认:2
      */
      */
-    private int maxThreadNum = 3;
+    private int maxThreadNum = 2;
 
 
     /**
     /**
      * 最大错误次数【默认:10】
      * 最大错误次数【默认:10】

+ 30 - 15
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/impl/ExamCaptureQueueServiceImpl.java

@@ -88,7 +88,7 @@ public class ExamCaptureQueueServiceImpl implements ExamCaptureQueueService {
             baiduExpectFaceCompareScore = Double.parseDouble(faceScoreConfig.getValue());
             baiduExpectFaceCompareScore = Double.parseDouble(faceScoreConfig.getValue());
         }
         }
 
 
-        log.warn("studentId:{} examRecordDataId:{} queueSize:{} useBaiduApi:{} useLocalBaiduApiForFaceCompare:{} useLocalBaiduApiForFaceLiveness:{} scoreConfig:{}",
+        log.warn("studentId:{} examRecordDataId:{} queueSize:{} useBaiduApi:{} compare:{} liveness:{} scoreConfig:{}",
                 studentId, examRecordDataId, queues.size(), useBaiduApi, param.isUseLocalBaiduApiForFaceCompare(),
                 studentId, examRecordDataId, queues.size(), useBaiduApi, param.isUseLocalBaiduApiForFaceCompare(),
                 param.isUseLocalBaiduApiForFaceLiveness(), baiduExpectFaceCompareScore);
                 param.isUseLocalBaiduApiForFaceLiveness(), baiduExpectFaceCompareScore);
 
 
@@ -106,21 +106,9 @@ public class ExamCaptureQueueServiceImpl implements ExamCaptureQueueService {
 
 
         for (ExamCaptureQueueEntity queue : queues) {
         for (ExamCaptureQueueEntity queue : queues) {
             // 将抓拍照图片转成Base64格式
             // 将抓拍照图片转成Base64格式
-            String capturePhotoPath = FssHelper.fixFilePath(queue.getFileUrl());
-            if (StringUtils.isBlank(capturePhotoPath)) {
+            ImageParm capturePhoto = this.checkAndReadFile(queue.getFileUrl(), examRecordDataId);
+            if (capturePhoto == null) {
                 // 删除错误无效数据
                 // 删除错误无效数据
-                log.warn("人脸抓拍图片地址错误!examRecordDataId:{} queueId:{} imgUrl:{}", queue.getExamRecordDataId(),
-                        queue.getId(), queue.getFileUrl());
-                examCaptureQueueRepo.deleteById(queue.getId());
-                continue;
-            }
-
-            byte[] capturePhotoBytes = FssFactory.getInstance().readFile(capturePhotoPath);
-            ImageParm capturePhoto = new ImageBase64Parm(Base64.encodeBase64String(capturePhotoBytes));
-            if (StringUtils.isBlank(capturePhoto.value())) {
-                // 删除错误无效数据
-                log.warn("人脸抓拍图片为空白!examRecordDataId:{} queueId:{} imgUrl:{}", queue.getExamRecordDataId(),
-                        queue.getId(), queue.getFileUrl());
                 examCaptureQueueRepo.deleteById(queue.getId());
                 examCaptureQueueRepo.deleteById(queue.getId());
                 continue;
                 continue;
             }
             }
@@ -143,6 +131,33 @@ public class ExamCaptureQueueServiceImpl implements ExamCaptureQueueService {
         }
         }
     }
     }
 
 
+    private ImageParm checkAndReadFile(String fileUrl, Long examRecordDataId) {
+        String imgPath = FssHelper.fixFilePath(fileUrl);
+        if (StringUtils.isBlank(imgPath)) {
+            log.warn("照片地址有误!examRecordDataId:{} imgUrl:{}", examRecordDataId, fileUrl);
+            return null;
+        }
+
+        byte[] imgBytes;
+        try {
+            imgBytes = FssFactory.getInstance().readFile(imgPath);
+        } catch (Exception e) {
+            boolean exist = FssFactory.getInstance().existFile(imgPath);
+            if (!exist) {
+                log.warn("照片文件不存在!examRecordDataId:{} imgUrl:{}", examRecordDataId, fileUrl);
+                return null;
+            }
+            throw e;
+        }
+
+        if (imgBytes == null || imgBytes.length == 0) {
+            log.warn("照片文件有误!examRecordDataId:{} imgUrl:{}", examRecordDataId, fileUrl);
+            return null;
+        }
+
+        return new ImageBase64Parm(Base64.encodeBase64String(imgBytes));
+    }
+
     private boolean doFaceCompare(ExamCaptureQueueEntity queue, ImageParm basePhoto, ImageParm capturePhoto,
     private boolean doFaceCompare(ExamCaptureQueueEntity queue, ImageParm basePhoto, ImageParm capturePhoto,
                                   FaceApiParam param, boolean useBaiduApi, Double baiduExpectFaceCompareScore,
                                   FaceApiParam param, boolean useBaiduApi, Double baiduExpectFaceCompareScore,
                                   int curErrorNum) {
                                   int curErrorNum) {

+ 46 - 28
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/job/FaceVerifyJobHandler.java

@@ -19,6 +19,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
 
 
@@ -37,6 +39,7 @@ public class FaceVerifyJobHandler {
     private ExamCaptureQueueService examCaptureQueueService;
     private ExamCaptureQueueService examCaptureQueueService;
 
 
     public void run(int shardTotal, int shardIndex, String jobParam) throws Exception {
     public void run(int shardTotal, int shardIndex, String jobParam) throws Exception {
+        long startTime = System.currentTimeMillis();
         FaceApiParam param = this.parseJobParam(jobParam);
         FaceApiParam param = this.parseJobParam(jobParam);
 
 
         // 根据任务调度策略,推荐50
         // 根据任务调度策略,推荐50
@@ -45,17 +48,17 @@ public class FaceVerifyJobHandler {
         // 优先取“已交卷”的考试记录ID集合
         // 优先取“已交卷”的考试记录ID集合
         List<Long> examRecordDataIds = examCaptureQueueService.findQueuesGroupByExamRecordDataId(shardTotal,
         List<Long> examRecordDataIds = examCaptureQueueService.findQueuesGroupByExamRecordDataId(shardTotal,
                 shardIndex, batchSize, param.getMaxErrorNum(), true);
                 shardIndex, batchSize, param.getMaxErrorNum(), true);
+        int firstCount = examRecordDataIds.size(), moreCount = 0;
 
 
-        if (examRecordDataIds.size() < batchSize) {
+        if (firstCount < batchSize) {
             // 未取到或取到的数量不足时,再按“不区分交卷状态”方式取一次考试记录ID集合来补充(避免调度任务不饱和空转)
             // 未取到或取到的数量不足时,再按“不区分交卷状态”方式取一次考试记录ID集合来补充(避免调度任务不饱和空转)
             List<Long> moreExamRecordDataIds = examCaptureQueueService.findQueuesGroupByExamRecordDataId(shardTotal,
             List<Long> moreExamRecordDataIds = examCaptureQueueService.findQueuesGroupByExamRecordDataId(shardTotal,
                     shardIndex, batchSize, param.getMaxErrorNum(), false);
                     shardIndex, batchSize, param.getMaxErrorNum(), false);
-
-            log.warn("分片任务_FACE_{}_{} 已交卷考试记录数:{} 补充考试记录数:{}", shardTotal, shardIndex, examRecordDataIds.size(), moreExamRecordDataIds.size());
+            moreCount = moreExamRecordDataIds.size();
             examRecordDataIds.addAll(moreExamRecordDataIds);
             examRecordDataIds.addAll(moreExamRecordDataIds);
 
 
             if (CollectionUtils.isEmpty(examRecordDataIds)) {
             if (CollectionUtils.isEmpty(examRecordDataIds)) {
-                log.warn("分片任务_FACE_{}_{} 当前分片暂无待处理记录!", shardTotal, shardIndex);
+                log.warn("分片任务_FACE_{}_{} 暂无待处理记录!", shardTotal, shardIndex);
                 return;
                 return;
             }
             }
         }
         }
@@ -63,37 +66,52 @@ public class FaceVerifyJobHandler {
         Set<Long> todoIds = new HashSet<>(examRecordDataIds);
         Set<Long> todoIds = new HashSet<>(examRecordDataIds);
         List<ExamCaptureQueueEntity> todoQueues = examCaptureQueueRepo.findByExamRecordDataIdIn(todoIds);
         List<ExamCaptureQueueEntity> todoQueues = examCaptureQueueRepo.findByExamRecordDataIdIn(todoIds);
         if (CollectionUtils.isEmpty(todoQueues)) {
         if (CollectionUtils.isEmpty(todoQueues)) {
+            log.warn("分片任务_FACE_{}_{} 暂无待处理队列记录!", shardTotal, shardIndex);
             return;
             return;
         }
         }
 
 
-        log.warn("分片任务_FACE_{}_{} 本次处理考试记录数:{} 待比对照片数:{}", shardTotal, shardIndex, todoIds.size(), todoQueues.size());
+        long queryCost = System.currentTimeMillis() - startTime;
+        log.warn("分片任务_FACE_{}_{} 本次处理考试记录数:{} 待比对照片数:{} firstCount:{} moreCount:{} cost:{}ms",
+                shardTotal, shardIndex, todoIds.size(), todoQueues.size(), firstCount, moreCount, queryCost);
         Map<Long, List<ExamCaptureQueueEntity>> maps = todoQueues.stream().collect(Collectors.groupingBy(ExamCaptureQueueEntity::getExamRecordDataId));
         Map<Long, List<ExamCaptureQueueEntity>> maps = todoQueues.stream().collect(Collectors.groupingBy(ExamCaptureQueueEntity::getExamRecordDataId));
 
 
+        // 创建线程池
+        ExecutorService executorService = Executors.newFixedThreadPool(param.getMaxThreadNum());
         for (Map.Entry<Long, List<ExamCaptureQueueEntity>> entry : maps.entrySet()) {
         for (Map.Entry<Long, List<ExamCaptureQueueEntity>> entry : maps.entrySet()) {
-            Long examRecordDataId = entry.getKey();
-            final String lockKey = CacheConstants.LOCK_FACE_COMPARE + examRecordDataId;
-
-            try {
-                SequenceLockHelper.getLockSimple(lockKey);
-
-                // 分别按“考试记录ID”集中处理未比对的抓拍照片记录(每个考试记录ID下通常只有几条待处理记录)
-                examCaptureQueueService.handlerExamCaptureQueuesByExamRecordDataId(examRecordDataId, entry.getValue(), param);
-            } catch (Exception e) {
-                if (e instanceof InterruptedException) {
-                    // 若线程终止,则抛出交由任务调度中心处理
-                    log.warn("当前人脸比对任务线程被终止!examRecordDataId:{}, error:{}", examRecordDataId, e.getMessage());
-                    throw e;
-                } else if (e instanceof SequenceLockException) {
-                    // 若锁问题,下次会继续执行
-                    log.warn("当前人脸比对任务获取锁失败!examRecordDataId:{}, redisKey:{}", examRecordDataId, lockKey);
-                } else {
-                    // 若异常,下次会继续执行(需要排查原因)
-                    log.error("当前人脸比对任务处理失败!examRecordDataId:{}, error:{}", examRecordDataId, e.getMessage());
+            executorService.execute(() -> {
+                // 分别按“考试记录ID”集中处理该“考试记录ID”下的抓拍照片队列记录(通常只有几条)
+                Long examRecordDataId = entry.getKey();
+                final String lockKey = CacheConstants.LOCK_FACE_COMPARE + examRecordDataId;
+
+                try {
+                    SequenceLockHelper.getLockSimple(lockKey);
+                    examCaptureQueueService.handlerExamCaptureQueuesByExamRecordDataId(examRecordDataId, entry.getValue(), param);
+                } catch (Exception e) {
+                    if (e instanceof InterruptedException) {
+                        // 若线程终止,则抛出交由任务调度中心处理
+                        log.warn("当前人脸比对任务线程被终止!examRecordDataId:{}, error:{}", examRecordDataId, e.getMessage());
+                    } else if (e instanceof SequenceLockException) {
+                        // 若锁问题,下次会继续执行
+                        log.warn("当前人脸比对任务获取锁失败!examRecordDataId:{}, redisKey:{}", examRecordDataId, lockKey);
+                    } else {
+                        // 若异常,下次会继续执行(需要排查原因)
+                        log.error("当前人脸比对任务处理失败!examRecordDataId:{}, error:{}", examRecordDataId, e.getMessage());
+                    }
+                } finally {
+                    SequenceLockHelper.releaseLockSimple(lockKey);
                 }
                 }
-            } finally {
-                SequenceLockHelper.releaseLockSimple(lockKey);
-            }
+            });
         }
         }
+
+        // 停止接受新任务,并等待所有任务完成
+        executorService.shutdown();
+        while (!executorService.isTerminated()) {
+            // ignore
+        }
+
+        long allCost = Math.max((System.currentTimeMillis() - startTime) / 1000, 1);
+        log.warn("分片任务_FACE_{}_{} 本次处理比对照片数:{} 约{}个每秒 threadNum:{} cost:{}s", shardTotal, shardIndex,
+                todoQueues.size(), (double) todoQueues.size() / allCost, param.getMaxThreadNum(), allCost);
     }
     }
 
 
     private FaceApiParam parseJobParam(String jobParam) {
     private FaceApiParam parseJobParam(String jobParam) {
@@ -107,7 +125,7 @@ public class FaceVerifyJobHandler {
         JsonNode maxThreadNum = jsonParams.get("maxThreadNum");
         JsonNode maxThreadNum = jsonParams.get("maxThreadNum");
         if (maxThreadNum != null) {
         if (maxThreadNum != null) {
             // 最大数不超过20
             // 最大数不超过20
-            param.setMaxThreadNum(Math.min(maxThreadNum.asInt(3), 20));
+            param.setMaxThreadNum(Math.min(maxThreadNum.asInt(2), 20));
         }
         }
 
 
         JsonNode maxErrorNum = jsonParams.get("maxErrorNum");
         JsonNode maxErrorNum = jsonParams.get("maxErrorNum");