|
@@ -1,23 +1,22 @@
|
|
|
package cn.com.qmth.examcloud.core.oe.student.face.starter.config;
|
|
|
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.boot.ApplicationArguments;
|
|
|
+import org.springframework.boot.ApplicationRunner;
|
|
|
+import org.springframework.core.annotation.Order;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
import cn.com.qmth.examcloud.commons.helpers.concurrency.simple.ConcurrentTask;
|
|
|
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLog;
|
|
|
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLogFactory;
|
|
|
import cn.com.qmth.examcloud.commons.util.Util;
|
|
|
import cn.com.qmth.examcloud.core.oe.common.entity.ExamCaptureQueueEntity;
|
|
|
import cn.com.qmth.examcloud.core.oe.common.repository.ExamCaptureQueueRepo;
|
|
|
-import cn.com.qmth.examcloud.core.oe.student.face.service.impl.ExamCaptureProcessStatisticController;
|
|
|
import cn.com.qmth.examcloud.core.oe.student.face.service.impl.FacePPCompareWorker;
|
|
|
import cn.com.qmth.examcloud.exchange.inner.api.SmsCloudService;
|
|
|
import cn.com.qmth.examcloud.web.bootstrap.PropertyHolder;
|
|
|
-import org.apache.commons.logging.Log;
|
|
|
-import org.apache.commons.logging.LogFactory;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.boot.ApplicationArguments;
|
|
|
-import org.springframework.boot.ApplicationRunner;
|
|
|
-import org.springframework.core.annotation.Order;
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
-
|
|
|
-import java.util.List;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
/**
|
|
|
* 启动人脸比对任务
|
|
@@ -26,72 +25,76 @@ import java.util.concurrent.TimeUnit;
|
|
|
@Order(200)
|
|
|
public class ProcessFaceCompareQueueTask implements ApplicationRunner {
|
|
|
|
|
|
- @Autowired
|
|
|
- ExamCaptureQueueRepo examCaptureQueueRepo;
|
|
|
- private final Log captureLog = LogFactory.getLog("PROCESS_EXAM_CAPTURE_TASK_LOGGER");
|
|
|
- private final ExamCaptureProcessStatisticController statisticController = new ExamCaptureProcessStatisticController();
|
|
|
- //失败率预警阈值
|
|
|
- private static final double RATE_WARN_THRESHOLD = 0.5;
|
|
|
- //总数量预警阈值
|
|
|
- private static final int COUNT_WARN_THRESHOLD = 10;
|
|
|
- @Autowired
|
|
|
- SmsCloudService smsCloudService;
|
|
|
-
|
|
|
- private void start() {
|
|
|
- ConcurrentTask concurrentTask = new ConcurrentTask();
|
|
|
- concurrentTask.setMaxActiveThreadSize(PropertyHolder.getInt("$capture.thread.maxActiveThreadSize", 100));
|
|
|
- concurrentTask.setMinThreadSize(PropertyHolder.getInt("$capture.thread.minThreadSize", 2));
|
|
|
- concurrentTask.setWorker(new FacePPCompareWorker());
|
|
|
- concurrentTask.start();
|
|
|
- //当前获取数据的批次号(默认用时间戳)
|
|
|
- String processBatchNum = "A_" + System.currentTimeMillis();
|
|
|
-
|
|
|
- captureLog.debug("[PROCESS_FACEPP." + processBatchNum + "] 启动face++人脸比对服务...");
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- //如果队列没满,则从数据库中查数据并插入
|
|
|
- List<ExamCaptureQueueEntity> examCaptureQueueList = examCaptureQueueRepo.
|
|
|
- findNeedFaceCompareExamCaptureQueuesLimitByProcessBatchNum(PropertyHolder.getInt("$capture.queue.limit", 100), processBatchNum);
|
|
|
- //如果队列中没取到数据,则2秒钟后再取
|
|
|
- if (null == examCaptureQueueList || examCaptureQueueList.isEmpty()) {
|
|
|
- captureLog.debug("[PROCESS_FACEPP." + processBatchNum + "] 抓拍队列中没有取到数据,2秒后重试");
|
|
|
-
|
|
|
- Util.sleep(PropertyHolder.getInt("$capture.queue.read.sleepSeconds.", 2));
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- captureLog.debug("[PROCESS_FACEPP." + processBatchNum + "] 抓拍队列中的数据条数为:" + examCaptureQueueList.size());
|
|
|
-
|
|
|
- for (ExamCaptureQueueEntity offeredQueueEntity : examCaptureQueueList) {
|
|
|
- while (true) {
|
|
|
- boolean offerSuccess = concurrentTask.offerElement(offeredQueueEntity);
|
|
|
- //如果向队列中添加数据成功,则更新标识
|
|
|
- if (offerSuccess) {
|
|
|
- offeredQueueEntity.setProcessBatchNum(processBatchNum);
|
|
|
- examCaptureQueueRepo.save(offeredQueueEntity);
|
|
|
-
|
|
|
- captureLog.debug("[PROCESS_FACEPP." + processBatchNum + "] 向工作队列中添加数据成功:fileUrl=" + offeredQueueEntity.getFileUrl());
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- captureLog.debug("[PROCESS_FACEPP." + processBatchNum + "] 向工作队列中添加数据失败,30秒后重试:fileUrl=" + offeredQueueEntity.getFileUrl());
|
|
|
-
|
|
|
- Util.sleep(PropertyHolder.getInt("$capture.queue.offer.sleepSeconds.", 30));
|
|
|
- }
|
|
|
- }
|
|
|
- Util.sleep(2);
|
|
|
- } catch (Exception e) {
|
|
|
- captureLog.error("[PROCESS_FACEPP." + processBatchNum + "] 百度活体检测出出异常,3秒后重试", e);
|
|
|
- Util.sleep(3);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- @Override
|
|
|
- public void run(ApplicationArguments args) {
|
|
|
- new Thread(() -> {
|
|
|
- start();
|
|
|
- }).start();
|
|
|
- }
|
|
|
+ private final ExamCloudLog captureLog = ExamCloudLogFactory
|
|
|
+ .getLog("PROCESS_EXAM_CAPTURE_TASK_LOGGER");
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ ExamCaptureQueueRepo examCaptureQueueRepo;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ SmsCloudService smsCloudService;
|
|
|
+
|
|
|
+ private void start() {
|
|
|
+ ConcurrentTask<ExamCaptureQueueEntity> concurrentTask = new ConcurrentTask<ExamCaptureQueueEntity>();
|
|
|
+ concurrentTask.setMaxActiveThreadSize(
|
|
|
+ PropertyHolder.getInt("$capture.thread.maxActiveThreadSize", 100));
|
|
|
+ concurrentTask.setMinThreadSize(PropertyHolder.getInt("$capture.thread.minThreadSize", 2));
|
|
|
+ concurrentTask.setWorker(new FacePPCompareWorker());
|
|
|
+ concurrentTask.start();
|
|
|
+ // 当前获取数据的批次号(默认用时间戳)
|
|
|
+ String processBatchNum = "A_" + System.currentTimeMillis();
|
|
|
+
|
|
|
+ captureLog.debug("[PROCESS_FACEPP." + processBatchNum + "] 启动face++人脸比对服务...");
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ // 如果队列没满,则从数据库中查数据并插入
|
|
|
+ List<ExamCaptureQueueEntity> examCaptureQueueList = examCaptureQueueRepo
|
|
|
+ .findNeedFaceCompareExamCaptureQueuesLimitByProcessBatchNum(
|
|
|
+ PropertyHolder.getInt("$capture.queue.limit", 100),
|
|
|
+ processBatchNum);
|
|
|
+ // 如果队列中没取到数据,则2秒钟后再取
|
|
|
+ if (null == examCaptureQueueList || examCaptureQueueList.isEmpty()) {
|
|
|
+ captureLog.debug("[PROCESS_FACEPP." + processBatchNum + "] 抓拍队列中没有取到数据,2秒后重试");
|
|
|
+
|
|
|
+ Util.sleep(PropertyHolder.getInt("$capture.queue.read.sleepSeconds.", 2));
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ captureLog.debug("[PROCESS_FACEPP." + processBatchNum + "] 抓拍队列中的数据条数为:"
|
|
|
+ + examCaptureQueueList.size());
|
|
|
+
|
|
|
+ for (ExamCaptureQueueEntity offeredQueueEntity : examCaptureQueueList) {
|
|
|
+ while (true) {
|
|
|
+ boolean offerSuccess = concurrentTask.offerElement(offeredQueueEntity);
|
|
|
+ // 如果向队列中添加数据成功,则更新标识
|
|
|
+ if (offerSuccess) {
|
|
|
+ offeredQueueEntity.setProcessBatchNum(processBatchNum);
|
|
|
+ examCaptureQueueRepo.save(offeredQueueEntity);
|
|
|
+
|
|
|
+ captureLog.debug("[PROCESS_FACEPP." + processBatchNum
|
|
|
+ + "] 向工作队列中添加数据成功:fileUrl=" + offeredQueueEntity.getFileUrl());
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ captureLog.debug("[PROCESS_FACEPP." + processBatchNum
|
|
|
+ + "] 向工作队列中添加数据失败,30秒后重试:fileUrl="
|
|
|
+ + offeredQueueEntity.getFileUrl());
|
|
|
+
|
|
|
+ Util.sleep(PropertyHolder.getInt("$capture.queue.offer.sleepSeconds.", 30));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Util.sleep(2);
|
|
|
+ } catch (Exception e) {
|
|
|
+ captureLog.error("[PROCESS_FACEPP." + processBatchNum + "] 百度活体检测出出异常,3秒后重试", e);
|
|
|
+ Util.sleep(3);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run(ApplicationArguments args) {
|
|
|
+ new Thread(() -> {
|
|
|
+ start();
|
|
|
+ }).start();
|
|
|
+ }
|
|
|
}
|