Эх сурвалжийг харах

临时改造按支持机构范围同步

deason 4 жил өмнө
parent
commit
99d8e0d5fb

+ 108 - 0
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/bean/RecordDataSyncRule.java

@@ -0,0 +1,108 @@
+package cn.com.qmth.examcloud.core.oe.task.service.bean;
+
+import cn.com.qmth.examcloud.api.commons.exchange.JsonSerializable;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class RecordDataSyncRule implements JsonSerializable {
+
+    private static final long serialVersionUID = 802462624028077528L;
+
+    private Set<Long> allowOrgList;
+
+    private Set<Long> limitOrgList;
+
+    private Boolean allowOrgAll;
+
+    public RecordDataSyncRule(String recordDataSyncAllowOrgList, String recordDataSyncLimitOrgList) {
+        /*
+            规则:
+            allowOrgList 和 limitOrgList 若配置值,则按值处理
+            allowOrgList 和 limitOrgList 若都未配置值,则按允许所有处理
+         */
+
+        Set<Long> allowOrgList = new HashSet<>();
+        if (StringUtils.isNotEmpty(recordDataSyncAllowOrgList)) {
+            String[] values = StringUtils.split(recordDataSyncAllowOrgList, ",");
+            for (String value : values) {
+                if (StringUtils.isBlank(value)) {
+                    continue;
+                }
+
+                allowOrgList.add(Long.parseLong(value.trim()));
+            }
+        }
+
+        Set<Long> limitOrgList = new HashSet<>();
+        if (StringUtils.isNotEmpty(recordDataSyncLimitOrgList)) {
+            String[] values = StringUtils.split(recordDataSyncLimitOrgList, ",");
+            for (String value : values) {
+                if (StringUtils.isBlank(value)) {
+                    continue;
+                }
+
+                limitOrgList.add(Long.parseLong(value.trim()));
+            }
+        }
+
+        this.allowOrgList = allowOrgList;
+        this.limitOrgList = limitOrgList;
+
+        if (CollectionUtils.isEmpty(allowOrgList) && CollectionUtils.isEmpty(limitOrgList)) {
+            this.allowOrgAll = true;
+        } else {
+            this.allowOrgAll = false;
+        }
+    }
+
+    public boolean isAllowOrg(Long orgId) {
+        if (this.allowOrgList == null) {
+            return false;
+        }
+        return this.allowOrgList.contains(orgId);
+    }
+
+    public boolean isLimitOrg(Long orgId) {
+        if (this.limitOrgList == null) {
+            return false;
+        }
+        return this.limitOrgList.contains(orgId);
+    }
+
+    public Set<Long> getAllowOrgList() {
+        return allowOrgList;
+    }
+
+    public void setAllowOrgList(Set<Long> allowOrgList) {
+        this.allowOrgList = allowOrgList;
+    }
+
+    public Set<Long> getLimitOrgList() {
+        return limitOrgList;
+    }
+
+    public void setLimitOrgList(Set<Long> limitOrgList) {
+        this.limitOrgList = limitOrgList;
+    }
+
+    public boolean isAllowOrgAll() {
+        return allowOrgAll;
+    }
+
+    public void setAllowOrgAll(boolean allowOrgAll) {
+        this.allowOrgAll = allowOrgAll;
+    }
+
+    @Override
+    public String toString() {
+        return "RecordDataSyncRule{" +
+                "allowOrgList=" + allowOrgList +
+                ", limitOrgList=" + limitOrgList +
+                ", allowOrgAll=" + allowOrgAll +
+                '}';
+    }
+
+}

+ 76 - 8
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/pipeline/DataGainExamExecutor.java

@@ -12,9 +12,13 @@ import cn.com.qmth.examcloud.core.oe.student.api.request.UpdateExamRecordDataBat
 import cn.com.qmth.examcloud.core.oe.student.api.request.UpdateExamRecordStatusReq;
 import cn.com.qmth.examcloud.core.oe.student.api.response.GetExamRecordDataIdsResp;
 import cn.com.qmth.examcloud.core.oe.task.service.ExamRecordDataService;
+import cn.com.qmth.examcloud.core.oe.task.service.bean.RecordDataSyncRule;
 import cn.com.qmth.examcloud.support.enums.ExamRecordStatus;
 import cn.com.qmth.examcloud.support.examing.ExamRecordData;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.jdbc.core.BeanPropertyRowMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
@@ -42,20 +46,33 @@ public class DataGainExamExecutor implements NodeExecuter<Long, ExamRecordData,
     @Autowired
     private ExamRecordDataService examRecordDataService;
 
+    @Value("${examcloud.record.data.sync.allowOrgList}")
+    private String recordDataSyncAllowOrgList;
+
+    @Value("${examcloud.record.data.sync.limitOrgList}")
+    private String recordDataSyncLimitOrgList;
+
+    @Autowired
+    private JdbcTemplate jdbcTemplate;
+
     @Override
     public void execute(Long key, ExamRecordData value, List<KeyValuePair<Long, ExamRecordData>> outList,
                         ObjectHolder<Boolean> removable, TaskContext context) throws Exception {
-        LOG.debug("[DataGainExamExecutor]开始获取数据");
+        RecordDataSyncRule syncRule = new RecordDataSyncRule(recordDataSyncAllowOrgList, recordDataSyncLimitOrgList);
+        LOG.warn(syncRule.toString());
+
+        LOG.info("[DataGainExamExecutor]开始获取数据");
         Date start = new Date();
         // 获取考试信息id
         Long startId = 0l;
         GetExamRecordDataIdsReq req = new GetExamRecordDataIdsReq();
         req.setBatchNum(batchNum);
         req.setSize(batchSize);
+
         for (; ; ) {
-            List<KeyValuePair<Long, ExamRecordData>> tempList = new ArrayList<KeyValuePair<Long, ExamRecordData>>();
             req.setStartId(startId);
             GetExamRecordDataIdsResp res = null;
+
             try {
                 res = examRecordDataCloudService.getExamRecordDataIds(req);
             } catch (Exception e) {
@@ -64,28 +81,68 @@ public class DataGainExamExecutor implements NodeExecuter<Long, ExamRecordData,
                 LOG.error("[DataGainExamExecutor]获取数据库中考试信息出错 startId:" + startId + " 获取数据条数:" + outList.size() + " 耗时:" + times + "ms", e);
                 return;
             }
+
             List<Long> ids = res.getExamRecordDataIds();
             if (ids == null || ids.size() == 0) {
                 Date end = new Date();
                 long times = end.getTime() - start.getTime();
-                LOG.debug("[DataGainExamExecutor]获取数据条数:" + outList.size() + " 耗时:" + times + "ms");
+                LOG.info("[DataGainExamExecutor]获取数据条数:" + outList.size() + " 耗时:" + times + "ms");
                 return;
             }
+
+            List<KeyValuePair<Long, ExamRecordData>> tempList = new ArrayList<>();
+            List<Long> newExamRecordDataIds = new ArrayList<>();
+
             for (Long id : ids) {
                 // 根据id获取考试信息缓存
-                ExamRecordData erd = examRecordDataService.getExamRecordDataCache(id);
-                if (erd == null) {
+                ExamRecordData examRecordData = examRecordDataService.getExamRecordDataCache(id);
+
+                if (examRecordData == null) {
                     LOG.error("[DataGainExamExecutor]获取Redis中考试信息为空 examRecordDataId:" + id);
-                    updateExamRecordStatusError(id);
+                    examRecordData = this.queryExamRecordData(id);
+
+                    if (examRecordData != null) {
+
+                        if (syncRule.getAllowOrgList().isEmpty()) {
+                            // 未配置“允许机构列表”值时,默认执行所有机构,但要跳过“限制机构列表”中的机构
+                            if (syncRule.isLimitOrg(examRecordData.getRootOrgId())) {
+                                continue;
+                            }
+
+                            updateExamRecordStatusError(id);
+                        } else {
+                            // 配置了“允许机构列表”值时,只执行“允许机构列表”范围内的机构
+                            if (syncRule.isAllowOrg(examRecordData.getRootOrgId())) {
+                                updateExamRecordStatusError(id);
+                            }
+                        }
+
+                    }
                 } else {
-                    tempList.add(new KeyValuePair<Long, ExamRecordData>(id, erd));
+
+                    if (syncRule.getAllowOrgList().isEmpty()) {
+                        // 未配置“允许机构列表”值时,默认执行所有机构,但要跳过“限制机构列表”中的机构
+                        if (syncRule.isLimitOrg(examRecordData.getRootOrgId())) {
+                            continue;
+                        }
+
+                        newExamRecordDataIds.add(id);
+                        tempList.add(new KeyValuePair<>(id, examRecordData));
+                    } else {
+                        // 配置了“允许机构列表”值时,只执行“允许机构列表”范围内的机构
+                        if (syncRule.isAllowOrg(examRecordData.getRootOrgId())) {
+                            newExamRecordDataIds.add(id);
+                            tempList.add(new KeyValuePair<>(id, examRecordData));
+                        }
+                    }
+
                 }
             }
 
             // 修改已获取过的考试信息batchNum
             UpdateExamRecordDataBatchNumReq ureq = new UpdateExamRecordDataBatchNumReq();
             ureq.setBatchNum(batchNum);
-            ureq.setIds(ids);
+            ureq.setIds(newExamRecordDataIds);
             try {
                 examRecordDataCloudService.updateExamRecordDataBatchNum(ureq);
             } catch (Exception e) {
@@ -94,11 +151,22 @@ public class DataGainExamExecutor implements NodeExecuter<Long, ExamRecordData,
                 LOG.error("[DataGainExamExecutor]修改考试记录batchNum出错 startId:" + startId + " 获取数据条数:" + outList.size() + " 耗时:" + times + "ms", e);
                 return;
             }
+
             outList.addAll(tempList);
             startId = ids.get(ids.size() - 1);
         }
     }
 
+    private ExamRecordData queryExamRecordData(Long id) {
+        String sql = String.format("select id,root_org_id,exam_record_status,sync_status from ec_oes_exam_record_data where id = %s", id);
+        try {
+            return jdbcTemplate.queryForObject(sql, new BeanPropertyRowMapper<>(ExamRecordData.class));
+        } catch (Exception e) {
+            // ignore
+            return null;
+        }
+    }
+
     private void updateExamRecordStatusError(Long id) {
         try {
             UpdateExamRecordStatusReq ureq = new UpdateExamRecordStatusReq();

+ 1 - 1
examcloud-core-oe-task-starter/shell/start.sh

@@ -11,7 +11,7 @@ PID_LIST=`ps -ef|grep $APP_MAIN_JAR|grep java|awk '{print $2}'`
 
 if [ ! -z "$PID_LIST" ]; then
     echo "[ERROR] : APP is already running!"
-    exit -1
+#    exit -1
 fi
 
 if [ "$1" ];then

+ 9 - 0
examcloud-core-oe-task-starter/src/main/java/cn/com/qmth/examcloud/core/oe/task/starter/config/ProcessBaiduFaceLivenessAlarmTask.java

@@ -12,6 +12,7 @@ import cn.com.qmth.examcloud.support.cache.bean.SysPropertyCacheBean;
 import com.google.common.collect.Maps;
 import com.googlecode.aviator.AviatorEvaluator;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.core.annotation.Order;
@@ -38,8 +39,16 @@ public class ProcessBaiduFaceLivenessAlarmTask implements ApplicationRunner {
     @Autowired
     SmsCloudService smsCloudService;
 
+    @Value("${examcloud.face.compare.task.enable}")
+    private Boolean faceCompareTaskEnable;
+
     @Override
     public void run(ApplicationArguments args) throws Exception {
+        captureLog.warn("ProcessBaiduFaceLivenessAlarmTask enable is " + faceCompareTaskEnable);
+        if (!faceCompareTaskEnable) {
+            return;
+        }
+
         Thread thread = new Thread(new Runnable() {
 
             @Override

+ 8 - 0
examcloud-core-oe-task-starter/src/main/java/cn/com/qmth/examcloud/core/oe/task/starter/config/ProcessBaiduFacelivenessTask.java

@@ -11,6 +11,7 @@ import cn.com.qmth.examcloud.core.oe.task.service.worker.BaiduFaceLivenessWorker
 import cn.com.qmth.examcloud.web.bootstrap.PropertyHolder;
 import org.apache.logging.log4j.ThreadContext;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.core.annotation.Order;
@@ -31,6 +32,9 @@ public class ProcessBaiduFacelivenessTask implements ApplicationRunner {
     @Autowired
     ExamCaptureQueueRepo examCaptureQueueRepo;
 
+    @Value("${examcloud.face.compare.task.enable}")
+    private Boolean faceCompareTaskEnable;
+
     private int process(ConcurrentTask<ExamCaptureQueueInfo> concurrentTask,
                         String processBatchNum, Integer limit) {
 
@@ -76,6 +80,10 @@ public class ProcessBaiduFacelivenessTask implements ApplicationRunner {
 
     @Override
     public void run(ApplicationArguments args) {
+        captureLog.warn("ProcessBaiduFacelivenessTask enable is " + faceCompareTaskEnable);
+        if (!faceCompareTaskEnable) {
+            return;
+        }
 
         ConcurrentTask<ExamCaptureQueueInfo> concurrentTask = new ConcurrentTask<ExamCaptureQueueInfo>("百度活体检测");
         concurrentTask.setMaxActiveThreadSize(

+ 8 - 0
examcloud-core-oe-task-starter/src/main/java/cn/com/qmth/examcloud/core/oe/task/starter/config/ProcessFaceCompareAlarmTask.java

@@ -12,6 +12,7 @@ import cn.com.qmth.examcloud.support.cache.bean.SysPropertyCacheBean;
 import com.google.common.collect.Maps;
 import com.googlecode.aviator.AviatorEvaluator;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.core.annotation.Order;
@@ -35,11 +36,18 @@ public class ProcessFaceCompareAlarmTask implements ApplicationRunner {
     @Autowired
     SmsCloudService smsCloudService;
 
+    @Value("${examcloud.face.compare.task.enable}")
+    private Boolean faceCompareTaskEnable;
+
     private final ExamCloudLog captureLog = ExamCloudLogFactory
             .getLog("PROCESS_EXAM_CAPTURE_TASK_LOGGER");
 
     @Override
     public void run(ApplicationArguments args) throws Exception {
+        captureLog.warn("ProcessFaceCompareAlarmTask enable is " + faceCompareTaskEnable);
+        if (!faceCompareTaskEnable) {
+            return;
+        }
 
         Thread thread = new Thread(new Runnable() {
 

+ 8 - 0
examcloud-core-oe-task-starter/src/main/java/cn/com/qmth/examcloud/core/oe/task/starter/config/ProcessFaceCompareQueueTask.java

@@ -12,6 +12,7 @@ import cn.com.qmth.examcloud.exchange.inner.api.SmsCloudService;
 import cn.com.qmth.examcloud.web.bootstrap.PropertyHolder;
 import org.apache.logging.log4j.ThreadContext;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.core.annotation.Order;
@@ -35,6 +36,9 @@ public class ProcessFaceCompareQueueTask implements ApplicationRunner {
     @Autowired
     SmsCloudService smsCloudService;
 
+    @Value("${examcloud.face.compare.task.enable}")
+    private Boolean faceCompareTaskEnable;
+
     private int process(ConcurrentTask<ExamCaptureQueueInfo> concurrentTask,
                         String processBatchNum, Integer limit) {
         // 如果队列没满,则从数据库中查数据并插入
@@ -79,6 +83,10 @@ public class ProcessFaceCompareQueueTask implements ApplicationRunner {
 
     @Override
     public void run(ApplicationArguments args) {
+        captureLog.warn("ProcessFaceCompareQueueTask enable is " + faceCompareTaskEnable);
+        if (!faceCompareTaskEnable) {
+            return;
+        }
 
         ConcurrentTask<ExamCaptureQueueInfo> concurrentTask = new ConcurrentTask<ExamCaptureQueueInfo>("Face++人脸比对");
         concurrentTask.setMaxActiveThreadSize(

+ 14 - 1
examcloud-core-oe-task-starter/src/main/java/cn/com/qmth/examcloud/core/oe/task/starter/config/StreamTaskExecutor.java

@@ -3,10 +3,13 @@ package cn.com.qmth.examcloud.core.oe.task.starter.config;
 import cn.com.qmth.examcloud.commons.helpers.pipeline.Node;
 import cn.com.qmth.examcloud.commons.helpers.pipeline.SimpleNode;
 import cn.com.qmth.examcloud.commons.helpers.pipeline.TaskContext;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLog;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLogFactory;
 import cn.com.qmth.examcloud.core.oe.task.service.pipeline.*;
 import cn.com.qmth.examcloud.support.examing.ExamRecordData;
 import cn.com.qmth.examcloud.web.bootstrap.PropertyHolder;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.core.annotation.Order;
@@ -22,6 +25,8 @@ import org.springframework.stereotype.Component;
 @Order(300)
 public class StreamTaskExecutor implements ApplicationRunner {
 
+    private static final ExamCloudLog LOG = ExamCloudLogFactory.getLog(StreamTaskExecutor.class);
+
     @Autowired
     private DataGainExamExecutor dataGainExamExecutor;
 
@@ -37,6 +42,9 @@ public class StreamTaskExecutor implements ApplicationRunner {
     @Autowired
     private ClearExamDataCacheExecutor clearExamDataCacheExecutor;
 
+    @Value("${examcloud.record.data.sync.enable}")
+    private Boolean recordDataSyncEnable;
+
     private static Integer DEFAULT_GAIN_EXAM_DATA_EXECUTOR_SLEEP_SECONDS = 2;
 
     private static Integer DEFAULT_HAND_IN_EXAM_EXECUTOR_SLEEP_SECONDS = 2;
@@ -49,7 +57,12 @@ public class StreamTaskExecutor implements ApplicationRunner {
 
     @Override
     public void run(ApplicationArguments args) throws Exception {
-        initExecutor();
+        // boolean recordDataSyncEnable = PropertyHolder.getBoolean("examcloud.record.data.sync.enable", false);
+        LOG.warn("examcloud.record.data.sync.enable = " + recordDataSyncEnable);
+
+        if (recordDataSyncEnable) {
+            initExecutor();
+        }
     }
 
     private void initExecutor() {

+ 48 - 48
examcloud-core-oe-task-starter/src/main/java/cn/com/qmth/examcloud/core/oe/task/starter/config/SystemStartup.java

@@ -1,48 +1,48 @@
-package cn.com.qmth.examcloud.core.oe.task.starter.config;
-
-import cn.com.qmth.examcloud.commons.exception.ExamCloudRuntimeException;
-import cn.com.qmth.examcloud.web.bootstrap.PropertyHolder;
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.cloud.client.ServiceInstance;
-import org.springframework.cloud.client.discovery.DiscoveryClient;
-import org.springframework.core.annotation.Order;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-
-/**
- * 系统启动
- *
- * @author WANGWEI
- * @date 2018年11月29日
- * @Copyright (c) 2018-? http://qmth.com.cn All Rights Reserved.
- */
-@Component
-@Order(99)
-public class SystemStartup implements ApplicationRunner {
-
-    @Autowired
-    private DiscoveryClient discoveryClient;
-
-    public void start() {
-
-        String appName = PropertyHolder.getString("spring.application.name");
-
-        if (StringUtils.isNotBlank(appName)) {
-            List<ServiceInstance> instances = discoveryClient.getInstances(appName);
-            if (!instances.isEmpty()) {
-                throw new ExamCloudRuntimeException("multiple instances!");
-            }
-        }
-
-    }
-
-    @Override
-    public void run(ApplicationArguments args) throws Exception {
-        start();
-    }
-
-}
+// package cn.com.qmth.examcloud.core.oe.task.starter.config;
+//
+// import cn.com.qmth.examcloud.commons.exception.ExamCloudRuntimeException;
+// import cn.com.qmth.examcloud.web.bootstrap.PropertyHolder;
+// import org.apache.commons.lang3.StringUtils;
+// import org.springframework.beans.factory.annotation.Autowired;
+// import org.springframework.boot.ApplicationArguments;
+// import org.springframework.boot.ApplicationRunner;
+// import org.springframework.cloud.client.ServiceInstance;
+// import org.springframework.cloud.client.discovery.DiscoveryClient;
+// import org.springframework.core.annotation.Order;
+// import org.springframework.stereotype.Component;
+//
+// import java.util.List;
+//
+// /**
+//  * 系统启动
+//  *
+//  * @author WANGWEI
+//  * @date 2018年11月29日
+//  * @Copyright (c) 2018-? http://qmth.com.cn All Rights Reserved.
+//  */
+// @Component
+// @Order(99)
+// public class SystemStartup implements ApplicationRunner {
+//
+//     @Autowired
+//     private DiscoveryClient discoveryClient;
+//
+//     public void start() {
+//
+//         String appName = PropertyHolder.getString("spring.application.name");
+//
+//         if (StringUtils.isNotBlank(appName)) {
+//             List<ServiceInstance> instances = discoveryClient.getInstances(appName);
+//             if (!instances.isEmpty()) {
+//                 throw new ExamCloudRuntimeException("multiple instances!");
+//             }
+//         }
+//
+//     }
+//
+//     @Override
+//     public void run(ApplicationArguments args) throws Exception {
+//         start();
+//     }
+//
+// }

+ 6 - 1
examcloud-core-oe-task-starter/src/main/resources/application.properties

@@ -3,4 +3,9 @@ spring.profiles.active=dev
 examcloud.startup.startupCode=3000
 examcloud.startup.configCenterHost=192.168.10.39
 examcloud.startup.configCenterPort=9999
-examcloud.startup.appCode=OET
+examcloud.startup.appCode=OET
+
+examcloud.face.compare.task.enable=false
+examcloud.record.data.sync.enable=false
+examcloud.record.data.sync.allowOrgList=
+examcloud.record.data.sync.limitOrgList=