xiatian 5 years ago
parent
commit
3be1bb553b

+ 24 - 52
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/pipeline/DataGainExamExecutor.java

@@ -1,6 +1,5 @@
 package cn.com.qmth.examcloud.core.oe.task.service.pipeline;
 
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
@@ -11,9 +10,6 @@ import cn.com.qmth.examcloud.commons.exception.StatusException;
 import cn.com.qmth.examcloud.commons.helpers.KeyValuePair;
 import cn.com.qmth.examcloud.commons.helpers.pipeline.NodeExecuter;
 import cn.com.qmth.examcloud.commons.helpers.pipeline.TaskContext;
-import cn.com.qmth.examcloud.commons.logging.ExamCloudLog;
-import cn.com.qmth.examcloud.commons.logging.ExamCloudLogFactory;
-import cn.com.qmth.examcloud.commons.util.Util;
 import cn.com.qmth.examcloud.core.oe.student.api.ExamRecordDataCloudService;
 import cn.com.qmth.examcloud.core.oe.student.api.request.GetExamRecordDataIdsReq;
 import cn.com.qmth.examcloud.core.oe.student.api.request.UpdateExamRecordDataBatchNumReq;
@@ -30,15 +26,9 @@ import cn.com.qmth.examcloud.support.examing.ExamRecordData;
 @Component
 public class DataGainExamExecutor implements NodeExecuter<Long, ExamRecordData, Long, ExamRecordData> {
 
-    private static final ExamCloudLog LOG = ExamCloudLogFactory.getLog(DataGainExamExecutor.class);
-
     private final static Integer batchSize = 200;
-    
-    private final static Long batchNum = new Date().getTime();
-
-    private static Long startId = 0l;
 
-    private static Boolean isSleep = false;
+    private final static Long batchNum = new Date().getTime();
 
     @Autowired
     private ExamRecordDataCloudService examRecordDataCloudService;
@@ -47,54 +37,36 @@ public class DataGainExamExecutor implements NodeExecuter<Long, ExamRecordData,
     private ExamRecordDataService examRecordDataService;
 
     @Override
-    public List<KeyValuePair<Long, ExamRecordData>> execute(Long key, ExamRecordData examRecordData,
-            TaskContext context) throws Exception {
-        if (isSleep) {
-            try {
-                Util.sleep(10);
-            } catch (Exception e) {
-                LOG.error("sleep Exception.", e);
-            }
-        }
-
-        List<KeyValuePair<Long, ExamRecordData>> resultList = new ArrayList<>();
-
+    public void execute(Long key, ExamRecordData value, List<KeyValuePair<Long, ExamRecordData>> outList,
+            Boolean removable, TaskContext context) throws Exception {
         // 获取考试信息id
+        Long startId = 0l;
         GetExamRecordDataIdsReq req = new GetExamRecordDataIdsReq();
         req.setBatchNum(batchNum);
         req.setSize(batchSize);
-        req.setStartId(startId);
-        GetExamRecordDataIdsResp res = examRecordDataCloudService.getExamRecordDataIds(req);
-        List<Long> ids = res.getExamRecordDataIds();
-        if (ids == null || ids.size() == 0) {
-            isSleep=true;
-            return resultList;
-        }
-        for (Long id : ids) {
-            // 根据id获取考试信息缓存
-            ExamRecordData erd = examRecordDataService.getExamRecordDataCache(id);
-            if (erd == null) {
-                throw new StatusException("1001", "获取Redis中考试信息为空");
+        for (;;) {
+            req.setStartId(startId);
+            GetExamRecordDataIdsResp res = examRecordDataCloudService.getExamRecordDataIds(req);
+            List<Long> ids = res.getExamRecordDataIds();
+            if (ids == null || ids.size() == 0) {
+                return;
+            }
+            for (Long id : ids) {
+                // 根据id获取考试信息缓存
+                ExamRecordData erd = examRecordDataService.getExamRecordDataCache(id);
+                if (erd == null) {
+                    throw new StatusException("1001", "获取Redis中考试信息为空");
+                }
+                outList.add(new KeyValuePair<Long, ExamRecordData>(id, erd));
             }
-            resultList.add(new KeyValuePair<Long, ExamRecordData>(id, erd));
-        }
-
-        // 修改已获取过的考试信息batchNum
-        UpdateExamRecordDataBatchNumReq ureq = new UpdateExamRecordDataBatchNumReq();
-        ureq.setBatchNum(batchNum);
-        ureq.setIds(ids);
-        examRecordDataCloudService.updateExamRecordDataBatchNum(ureq);
 
-        // 记录startId以便下次取数据
-        startId = ids.get(ids.size() - 1);
-        
-        //取到的数据未达到batchSize则下次进来sleep
-        if(ids.size()<batchSize) {
-            isSleep=true;
-        }else {
-            isSleep=false;
+            // 修改已获取过的考试信息batchNum
+            UpdateExamRecordDataBatchNumReq ureq = new UpdateExamRecordDataBatchNumReq();
+            ureq.setBatchNum(batchNum);
+            ureq.setIds(ids);
+            examRecordDataCloudService.updateExamRecordDataBatchNum(ureq);
+            startId = ids.get(ids.size() - 1);
         }
-        return resultList;
     }
 
 }