Преглед на файлове

streamTask 功能改进

lideyin преди 5 години
родител
ревизия
30d192040f

+ 23 - 13
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/pipeline/AfterHandInExamExecutor.java

@@ -39,8 +39,21 @@ public class AfterHandInExamExecutor implements NodeExecuter<Long, ExamRecordDat
     private ExamRecordDataService examRecordDataService;
     private static Long DEFAULT_MAX_PROCESS_SECONDS = 30L;
 
+    /**
+     * 执行
+     *
+     * @param key
+     * @param value
+     * @param outList
+     * @param removable
+     * @param context
+     * @throws Exception
+     * @author WANGWEI
+     */
     @Override
-    public List<KeyValuePair<Long, ExamRecordData>> execute(Long key, ExamRecordData examRecordData, TaskContext context) throws Exception {
+    public void execute(Long key, ExamRecordData examRecordData,
+                        List<KeyValuePair<Long, ExamRecordData>> outList,
+                        Boolean removable, TaskContext context) throws Exception {
 
         String sequenceLockKey = Constants.EXAM_CONTROL_LOCK_PREFIX + examRecordData.getStudentId();
 
@@ -48,9 +61,6 @@ public class AfterHandInExamExecutor implements NodeExecuter<Long, ExamRecordDat
             //添加考试控制全局锁
             SequenceLockHelper.getLockSimple(sequenceLockKey);
 
-            List<KeyValuePair<Long, ExamRecordData>> resultList = new ArrayList<>();
-            KeyValuePair<Long, ExamRecordData> keyValuePair = new KeyValuePair<>(key, examRecordData);
-
             //针对已交卷的数据进行交卷后续处理
             if (examRecordData.getExamRecordStatus() == ExamRecordStatus.EXAM_HAND_IN ||
                     examRecordData.getExamRecordStatus() == ExamRecordStatus.EXAM_AUTO_HAND_IN) {
@@ -65,28 +75,28 @@ public class AfterHandInExamExecutor implements NodeExecuter<Long, ExamRecordDat
                 }
 
                 //交卷时间戳
-                Long handInTime = (examRecordData.getEndTime() == null ? examRecordData.getCleanTime() : examRecordData.getEndTime()).getTime();
+                Long handInTime = (examRecordData.getEndTime() == null
+                        ? examRecordData.getCleanTime()
+                        : examRecordData.getEndTime()).getTime();
                 Long times = System.currentTimeMillis() - handInTime;
 
                 //如果交卷后超过指定时长内仍未处理完成,则交给下一节点进行处理
                 if (times > maxProcessSeconds * 1000) {
-                    resultList.add(keyValuePair);
-                    return resultList;
+                    outList.add(new KeyValuePair<>(key, examRecordData));
+                    return;
                 }
 
                 examRecordData = examRecordDataService.processAfterHandInExam(examRecordData.getId());
 
-                keyValuePair.setValue(examRecordData);
-                resultList.add(keyValuePair);
+                outList.add(new KeyValuePair<>(key, examRecordData));
 
-                return resultList;
+                return;
             }
 
-            resultList.add(keyValuePair);
-            return resultList;
+            //其它状态数据,直接交给下一步
+            outList.add(new KeyValuePair<>(key, examRecordData));
         } finally {
             SequenceLockHelper.releaseLockSimple(sequenceLockKey);
         }
     }
-
 }

+ 16 - 7
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/pipeline/ClearExamDataCacheExecutor.java

@@ -3,12 +3,7 @@ package cn.com.qmth.examcloud.core.oe.task.service.pipeline;
 import cn.com.qmth.examcloud.commons.helpers.KeyValuePair;
 import cn.com.qmth.examcloud.commons.helpers.pipeline.NodeExecuter;
 import cn.com.qmth.examcloud.commons.helpers.pipeline.TaskContext;
-import cn.com.qmth.examcloud.core.oe.admin.api.SyncExamDataCloudService;
-import cn.com.qmth.examcloud.core.oe.student.api.ExamRecordDataCloudService;
-import cn.com.qmth.examcloud.core.oe.task.dao.ExamCaptureRepo;
-import cn.com.qmth.examcloud.core.oe.task.dao.ExamSyncCaptureRepo;
 import cn.com.qmth.examcloud.core.oe.task.service.ExamBossService;
-import cn.com.qmth.examcloud.core.oe.task.service.ExamRecordDataService;
 import cn.com.qmth.examcloud.support.Constants;
 import cn.com.qmth.examcloud.support.enums.SyncStatus;
 import cn.com.qmth.examcloud.support.examing.ExamBoss;
@@ -31,8 +26,22 @@ public class ClearExamDataCacheExecutor implements NodeExecuter<Long, ExamRecord
     @Autowired
     private ExamBossService examBossService;
 
+    /**
+     * 执行
+     *
+     * @param key
+     * @param examRecordData
+     * @param outList
+     * @param removable
+     * @param context
+     * @throws Exception
+     * @author WANGWEI
+     */
     @Override
-    public List<KeyValuePair<Long, ExamRecordData>> execute(Long key, ExamRecordData examRecordData, TaskContext context) throws Exception {
+    public void execute(Long key, ExamRecordData examRecordData,
+                        List<KeyValuePair<Long, ExamRecordData>> outList,
+                        Boolean removable, TaskContext context) throws Exception {
+
         String sequenceLockKey = Constants.EXAM_CONTROL_LOCK_PREFIX + examRecordData.getStudentId();
 
         try {
@@ -44,7 +53,6 @@ public class ClearExamDataCacheExecutor implements NodeExecuter<Long, ExamRecord
                 clearExamCache(examRecordData.getExamStudentId());
             }
 
-            return null;
         } finally {
             SequenceLockHelper.releaseLockSimple(sequenceLockKey);
         }
@@ -67,4 +75,5 @@ public class ClearExamDataCacheExecutor implements NodeExecuter<Long, ExamRecord
             examBossService.deleteExamBoss(examId);
         }
     }
+
 }

+ 67 - 32
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/pipeline/HandInExamExecutor.java

@@ -4,6 +4,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
+import cn.com.qmth.examcloud.commons.exception.StatusException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -38,8 +39,21 @@ public class HandInExamExecutor implements NodeExecuter<Long, ExamRecordData, Lo
     @Autowired
     private RedisClient redisClient;
 
+    /**
+     * 执行
+     *
+     * @param key
+     * @param examRecordData
+     * @param outList
+     * @param removable
+     * @param context
+     * @throws Exception
+     * @author WANGWEI
+     */
     @Override
-    public List<KeyValuePair<Long, ExamRecordData>> execute(Long key, ExamRecordData examRecordData, TaskContext context) throws Exception {
+    public void execute(Long key, ExamRecordData examRecordData,
+                        List<KeyValuePair<Long, ExamRecordData>> outList,
+                        Boolean removable, TaskContext context) throws Exception {
 
         String sequenceLockKey = Constants.EXAM_CONTROL_LOCK_PREFIX + examRecordData.getStudentId();
 
@@ -47,34 +61,55 @@ public class HandInExamExecutor implements NodeExecuter<Long, ExamRecordData, Lo
             //添加考试控制全局锁
             SequenceLockHelper.getLockSimple(sequenceLockKey);
 
-            List<KeyValuePair<Long, ExamRecordData>> resultList = new ArrayList<>();
-            KeyValuePair<Long, ExamRecordData> keyValuePair = new KeyValuePair<>(key, examRecordData);
+            //获取最新的考试记录状态
+            ExamRecordData examRecordDataCache = examRecordDataService.getExamRecordDataCache(examRecordData.getId());
+
+            examRecordData.setExamRecordStatus(examRecordDataCache.getExamRecordStatus());
 
             //处理正在进行中的考试
             if (examRecordData.getExamRecordStatus() == ExamRecordStatus.EXAM_ING) {
+
                 ExamingSession examingSession = examingSessionService.getExamingSession(examRecordData.getStudentId());
 
                 // 如果考试会话不存在/超过考试时间/超过断点续考时间,自动交卷
                 if (null == examingSession || isOverExamTime(examingSession) || isOverBreakpointTime(examingSession)) {
-                    Date now = new Date();
 
-                    //更改内存中的交卷状态
-                    examRecordData.setExamRecordStatus(ExamRecordStatus.EXAM_AUTO_HAND_IN);
-                    examRecordData.setCleanTime(now);
+                    try {
+                        //更改内存中的交卷状态
+                        examRecordData.setExamRecordStatus(ExamRecordStatus.EXAM_AUTO_HAND_IN);
+                        examRecordData.setCleanTime(new Date());
+
+                        examRecordDataService.saveExamRecordDataCache(examRecordData.getId(), examRecordData);
 
-                    examRecordDataService.saveExamRecordDataCache(examRecordData.getId(), examRecordData);
+                        // 删除考试会话
+                        if (null != examingSession) {
+                            examingSessionService.deleteExamingSession(examRecordData.getStudentId());
+                        }
 
-                    keyValuePair.setValue(examRecordData);
-                    resultList.add(keyValuePair);
+                        outList.add(new KeyValuePair<>(key, examRecordData));
+                    } catch (Exception e) {
+                        //回滚自动交卷操作
+                        examRecordData.setExamRecordStatus(ExamRecordStatus.EXAM_ING);
+                        examRecordData.setCleanTime(null);
+                        examRecordDataService.saveExamRecordDataCache(examRecordData.getId(), examRecordData);
 
-                    // 删除redis会话
-                    examingSessionService.deleteExamingSession(examRecordData.getStudentId());
-                    return resultList;
+                        outList.clear();
+                        removable = false;
+
+                        throw new StatusException("300101", "自动交卷出现异常:" + e.getMessage());
+                    }
+
+                    return;
                 }
+
+                //如果不需要自动交卷,则需要下次轮循,继续处理
+                removable = false;
+                outList.clear();
+                return;
             }
 
-            resultList.add(keyValuePair);
-            return resultList;
+            //其它状态的数据,直接交给下一个节点处理
+            outList.add(new KeyValuePair<>(key, examRecordData));
         } finally {
             SequenceLockHelper.releaseLockSimple(sequenceLockKey);
         }
@@ -87,13 +122,13 @@ public class HandInExamExecutor implements NodeExecuter<Long, ExamRecordData, Lo
      * @return
      */
     private boolean isOverExamTime(ExamingSession examingSession) {
-    	String examingHeartbeatKey = RedisKeyHelper.getBuilder()
-				.examingHeartbeatKey(examingSession.getExamRecordDataId());
-		ExamingHeartbeat examingHeartbeat = redisClient.get(examingHeartbeatKey,
-				ExamingHeartbeat.class);
+        String examingHeartbeatKey = RedisKeyHelper.getBuilder()
+                .examingHeartbeatKey(examingSession.getExamRecordDataId());
+        ExamingHeartbeat examingHeartbeat = redisClient.get(examingHeartbeatKey,
+                ExamingHeartbeat.class);
 
-		//秒
-		long cost = null == examingHeartbeat ? 0 : examingHeartbeat.getCost();
+        //秒
+        long cost = null == examingHeartbeat ? 0 : examingHeartbeat.getCost();
         return examingSession.getExamDuration() <= cost * 1000;
     }
 
@@ -105,16 +140,16 @@ public class HandInExamExecutor implements NodeExecuter<Long, ExamRecordData, Lo
      */
     private boolean isOverBreakpointTime(ExamingSession examingSession) {
         long now = System.currentTimeMillis();
-        
-		String examingActiveTimeKey = RedisKeyHelper.getBuilder()
-				.examingActiveTimeKey(examingSession.getExamRecordDataId());
-		ExamingActivityTime examingActiveTime = redisClient.get(examingActiveTimeKey,
-				ExamingActivityTime.class);
-
-		long activeTime = null == examingActiveTime
-				? System.currentTimeMillis()
-				: examingActiveTime.getActiveTime();
-        return now - activeTime>= examingSession.getExamReconnectTime().intValue() * 60 * 1000;
+
+        String examingActiveTimeKey = RedisKeyHelper.getBuilder()
+                .examingActiveTimeKey(examingSession.getExamRecordDataId());
+        ExamingActivityTime examingActiveTime = redisClient.get(examingActiveTimeKey,
+                ExamingActivityTime.class);
+
+        long activeTime = null == examingActiveTime
+                ? System.currentTimeMillis()
+                : examingActiveTime.getActiveTime();
+        return now - activeTime >= examingSession.getExamReconnectTime().intValue() * 60 * 1000;
     }
-    
+
 }

+ 59 - 38
examcloud-core-oe-task-service/src/main/java/cn/com/qmth/examcloud/core/oe/task/service/pipeline/SyncExamDataExecutor.java

@@ -1,6 +1,5 @@
 package cn.com.qmth.examcloud.core.oe.task.service.pipeline;
 
-import cn.com.qmth.examcloud.commons.exception.StatusException;
 import cn.com.qmth.examcloud.commons.helpers.KeyValuePair;
 import cn.com.qmth.examcloud.commons.helpers.pipeline.NodeExecuter;
 import cn.com.qmth.examcloud.commons.helpers.pipeline.TaskContext;
@@ -58,8 +57,23 @@ public class SyncExamDataExecutor implements NodeExecuter<Long, ExamRecordData,
     @Autowired
     private ExamBossService examBossService;
 
+
+    /**
+     * 执行
+     *
+     * @param key
+     * @param examRecordData
+     * @param outList
+     * @param removable
+     * @param context
+     * @throws Exception
+     * @author WANGWEI
+     */
     @Override
-    public List<KeyValuePair<Long, ExamRecordData>> execute(Long key, ExamRecordData examRecordData, TaskContext context) throws Exception {
+    public void execute(Long key, ExamRecordData examRecordData,
+                        List<KeyValuePair<Long, ExamRecordData>> outList,
+                        Boolean removable, TaskContext context) throws Exception {
+
         Long studentId = examRecordData.getStudentId();
         String sequenceLockKey = Constants.EXAM_CONTROL_LOCK_PREFIX + studentId;
 
@@ -67,56 +81,63 @@ public class SyncExamDataExecutor implements NodeExecuter<Long, ExamRecordData,
             //添加考试控制全局锁
             SequenceLockHelper.getLockSimple(sequenceLockKey);
 
-            //如果已同步,直接返回
+            //如果已同步,直接交给下一节点
             if (SyncStatus.SYNCED == examRecordData.getSyncStatus()) {
-                return null;
+                outList.add(new KeyValuePair<>(key, examRecordData));
+
+                return;
             }
 
             //处理上一节点中,指定时间内仍未处理完成的数据
             Long examRecordDataId = examRecordData.getId();
-            if (examRecordData.getExamRecordStatus() == ExamRecordStatus.EXAM_HAND_IN ||
-                    examRecordData.getExamRecordStatus() == ExamRecordStatus.EXAM_AUTO_HAND_IN) {
+            if (ExamRecordStatus.EXAM_HAND_IN == examRecordData.getExamRecordStatus() ||
+                    ExamRecordStatus.EXAM_AUTO_HAND_IN == examRecordData.getExamRecordStatus()) {
                 examRecordDataService.processAfterHandInExam(examRecordDataId);
             }
 
-            List<KeyValuePair<Long, ExamRecordData>> resultList = new ArrayList<>();
-            KeyValuePair<Long, ExamRecordData> keyValuePair = new KeyValuePair<>(key, examRecordData);
-
-            //同步数据
-            SyncExamDataReq syncReq = new SyncExamDataReq();
-            syncReq.setExamRecordData(copyExamRecordDataFrom(examRecordData));
-            syncReq.setExamRecordPaperStruct(getExamRecordPaperStruct(examRecordDataId));
-            syncReq.setExamRecordQuestions(getExamRecordQuestions(examRecordDataId));
-
-            //开启人脸检测相关数据赋值
-            Long rootOrgId = examRecordData.getRootOrgId();
-            Long examId = examRecordData.getExamId();
-            if (FaceBiopsyHelper.isFaceEnable(rootOrgId, examId, studentId)) {
-                syncReq.setExamCaptures(getExamCaptures(examRecordDataId));
-                syncReq.setExamSyncCapture(getExamSyncCapture(examRecordDataId));
-
-                if (FaceBiopsyHelper.isFaceVerify(rootOrgId, examId, studentId)) {
-                    syncReq.setExamFaceLivenessVerifies(getExamFaceLivenessVerifies(examRecordDataId));
-                    syncReq.setFaceBiopsy(getFaceBiopsy(examRecordDataId));
+            //如果考试记录状态为完结状态,则开始同步数据到正式库
+            if (ExamRecordStatus.EXAM_END == examRecordData.getExamRecordStatus() ||
+                    ExamRecordStatus.EXAM_OVERDUE == examRecordData.getExamRecordStatus()) {
+
+                //同步数据
+                SyncExamDataReq syncReq = new SyncExamDataReq();
+                syncReq.setExamRecordData(copyExamRecordDataFrom(examRecordData));
+                syncReq.setExamRecordPaperStruct(getExamRecordPaperStruct(examRecordDataId));
+                syncReq.setExamRecordQuestions(getExamRecordQuestions(examRecordDataId));
+
+                //开启人脸检测相关数据赋值
+                Long rootOrgId = examRecordData.getRootOrgId();
+                Long examId = examRecordData.getExamId();
+                if (FaceBiopsyHelper.isFaceEnable(rootOrgId, examId, studentId)) {
+                    syncReq.setExamCaptures(getExamCaptures(examRecordDataId));
+                    syncReq.setExamSyncCapture(getExamSyncCapture(examRecordDataId));
+
+                    if (FaceBiopsyHelper.isFaceVerify(rootOrgId, examId, studentId)) {
+                        syncReq.setExamFaceLivenessVerifies(getExamFaceLivenessVerifies(examRecordDataId));
+                        syncReq.setFaceBiopsy(getFaceBiopsy(examRecordDataId));
+                    }
                 }
-            }
 
-            syncExamDataCloudService.syncExamData(syncReq);
+                syncExamDataCloudService.syncExamData(syncReq);
 
-            //考试完结次数加1
-            ExamBoss examBoss = examBossService.getExamBoss(examRecordData.getExamStudentId());
-            if (null != examBoss) {
-                examBoss.setEndCount(examBoss.getEndCount() + 1);
-            }
+                //考试完结次数加1
+                ExamBoss examBoss = examBossService.getExamBoss(examRecordData.getExamStudentId());
+                if (null != examBoss) {
+                    examBoss.setEndCount(examBoss.getEndCount() + 1);
+                }
 
-            //设置并保存考试记录的同步状态
-            examRecordData.setSyncStatus(SyncStatus.SYNCED);
-            setAndSaveExamRecordDataSyncStatus(examRecordDataId);
+                //设置并保存考试记录的同步状态
+                examRecordData.setSyncStatus(SyncStatus.SYNCED);
+                setAndSaveExamRecordDataSyncStatus(examRecordDataId);
+
+                outList.add(new KeyValuePair<>(key, examRecordData));
+
+                return;
+            }
 
-            keyValuePair.setValue(examRecordData);
-            resultList.add(keyValuePair);
+            //其它状态的数据,直接交给下一个节点处理
+            outList.add(new KeyValuePair<>(key, examRecordData));
 
-            return resultList;
         } finally {
             SequenceLockHelper.releaseLockSimple(sequenceLockKey);
         }