瀏覽代碼

考试处理数据加载节点

xiatian 5 年之前
父節點
當前提交
28c3efad67

+ 5 - 0
examcloud-core-oe-task-base/pom.xml

@@ -56,6 +56,11 @@
             <artifactId>examcloud-core-oe-admin-api-client</artifactId>
             <version>${examcloud.version}</version>
         </dependency>
+        <dependency>
+            <groupId>cn.com.qmth.examcloud.rpc</groupId>
+            <artifactId>examcloud-core-oe-student-api-client</artifactId>
+            <version>${examcloud.version}</version>
+        </dependency>
         <dependency>
             <groupId>cn.com.qmth.examcloud.rpc</groupId>
             <artifactId>examcloud-ws-api-client</artifactId>

+ 105 - 0
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/pipeline/DataGainExamExecutor.java

@@ -0,0 +1,105 @@
+package cn.com.qmth.examcloud.core.oe.task.service.pipeline;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import cn.com.qmth.examcloud.commons.exception.StatusException;
+import cn.com.qmth.examcloud.commons.helpers.KeyValuePair;
+import cn.com.qmth.examcloud.commons.helpers.pipeline.NodeExecuter;
+import cn.com.qmth.examcloud.commons.helpers.pipeline.TaskContext;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLog;
+import cn.com.qmth.examcloud.commons.logging.ExamCloudLogFactory;
+import cn.com.qmth.examcloud.commons.util.Util;
+import cn.com.qmth.examcloud.core.oe.student.api.ExamRecordDataCloudService;
+import cn.com.qmth.examcloud.core.oe.student.api.request.GetExamRecordDataIdsReq;
+import cn.com.qmth.examcloud.core.oe.student.api.request.UpdateExamRecordDataBatchNumReq;
+import cn.com.qmth.examcloud.core.oe.student.api.response.GetExamRecordDataIdsResp;
+import cn.com.qmth.examcloud.support.examing.ExamRecordData;
+import cn.com.qmth.examcloud.support.redis.RedisKeyHelper;
+import cn.com.qmth.examcloud.web.redis.RedisClient;
+
+/**
+ * @Description 获取待处理数据
+ * @Author lideyin
+ * @Date 2019/12/17 16:39
+ * @Version 1.0
+ */
+@Component
+public class DataGainExamExecutor implements NodeExecuter<Long, ExamRecordData, Long, ExamRecordData> {
+
+    private static final ExamCloudLog LOG = ExamCloudLogFactory.getLog(DataGainExamExecutor.class);
+
+    private final static Integer batchSize = 200;
+    
+    private final static Long batchNum = new Date().getTime();
+
+    private static Long startId = 0l;
+
+    private static Boolean isSleep = false;
+
+    @Autowired
+    private ExamRecordDataCloudService examRecordDataCloudService;
+
+    @Autowired
+    private RedisClient redisClient;
+
+    @Override
+    public List<KeyValuePair<Long, ExamRecordData>> execute(Long key, ExamRecordData examRecordData,
+            TaskContext context) throws Exception {
+        if (isSleep) {
+            try {
+                Util.sleep(10);
+            } catch (Exception e) {
+                LOG.error("sleep Exception.", e);
+            }
+        }
+
+        List<KeyValuePair<Long, ExamRecordData>> resultList = new ArrayList<>();
+
+        // 获取考试信息id
+        GetExamRecordDataIdsReq req = new GetExamRecordDataIdsReq();
+        req.setBatchNum(batchNum);
+        req.setSize(batchSize);
+        req.setStartId(startId);
+        GetExamRecordDataIdsResp res = examRecordDataCloudService.getExamRecordDataIds(req);
+        List<Long> ids = res.getExamRecordDataIds();
+        if (ids == null || ids.size() == 0) {
+            isSleep=true;
+            return resultList;
+        }
+        for (Long id : ids) {
+            // 根据id获取考试信息缓存
+            ExamRecordData erd = getExamRecordDataCache(id);
+            if (erd == null) {
+                throw new StatusException("1001", "获取Redis中考试信息为空");
+            }
+            resultList.add(new KeyValuePair<Long, ExamRecordData>(id, erd));
+        }
+
+        // 修改已获取过的考试信息batchNum
+        UpdateExamRecordDataBatchNumReq ureq = new UpdateExamRecordDataBatchNumReq();
+        ureq.setBatchNum(batchNum);
+        ureq.setIds(ids);
+        examRecordDataCloudService.updateExamRecordDataBatchNum(ureq);
+
+        // 记录startId以便下次取数据
+        startId = ids.get(ids.size() - 1);
+        
+        //取到的数据未达到batchSize则下次进来sleep
+        if(ids.size()<batchSize) {
+            isSleep=true;
+        }else {
+            isSleep=false;
+        }
+        return resultList;
+    }
+
+    private ExamRecordData getExamRecordDataCache(Long examRecordDataId) {
+        String key = RedisKeyHelper.getBuilder().examRecordDataKey(examRecordDataId);
+        return redisClient.get(key + examRecordDataId, ExamRecordData.class);
+    }
+}