Browse Source

update export task

deason 5 months ago
parent
commit
c83fe8ea17

+ 4 - 0
examcloud-core-oe-admin-dao/src/main/java/cn/com/qmth/examcloud/core/oe/admin/dao/ExportTaskRepo.java

@@ -6,6 +6,7 @@
 package cn.com.qmth.examcloud.core.oe.admin.dao;
 
 import java.util.Date;
+import java.util.List;
 
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
@@ -32,6 +33,9 @@ public interface ExportTaskRepo
 	@Query(nativeQuery = true, value = "select * from ec_oe_export_task t where t.status='WAITING' ORDER BY t.id limit 1")
 	public ExportTaskEntity findExportTaskToDispose();
 
+	@Query(nativeQuery = true, value = "select * from ec_oe_export_task t where t.status='WAITING' ORDER BY t.id limit 5")
+	public List<ExportTaskEntity> findExportTaskToDisposes();
+
 	@Transactional
 	@Modifying
 	@Query("update ExportTaskEntity set status=:status, startTime=:startTime where id=:id")

+ 7 - 3
examcloud-core-oe-admin-service/src/main/java/cn/com/qmth/examcloud/core/oe/admin/service/ExportTaskService.java

@@ -13,6 +13,8 @@ import cn.com.qmth.examcloud.core.oe.admin.service.bean.exporttask.ExportTaskLis
 import cn.com.qmth.examcloud.core.oe.admin.service.bean.exporttask.ExportTaskListResp;
 import org.springframework.data.domain.Page;
 
+import java.util.List;
+
 /**
  * 导出任务相关接口
  */
@@ -32,16 +34,18 @@ public interface ExportTaskService {
 
     ExportTaskEntity findExportTaskToDispose();
 
+    List<ExportTaskEntity> findExportTaskToDisposes();
+
     void updateExportTaskStatus(Long taskId, ExportTaskStatus status);
 
     void startExportTask(Long taskId);
 
     void endExportTask(Long taskId);
-    
+
     ExportTaskEntity findById(Long taskId);
 
-	void stopExportTaskById(Long taskId);
+    void stopExportTaskById(Long taskId);
 
-	void checkStopExportTaskById(Long taskId);
+    void checkStopExportTaskById(Long taskId);
 
 }

+ 34 - 24
examcloud-core-oe-admin-service/src/main/java/cn/com/qmth/examcloud/core/oe/admin/service/bean/exporttask/ExportTask.java

@@ -3,6 +3,7 @@ package cn.com.qmth.examcloud.core.oe.admin.service.bean.exporttask;
 import cn.com.qmth.examcloud.commons.util.JsonMapper;
 import cn.com.qmth.examcloud.commons.util.Util;
 import cn.com.qmth.examcloud.core.oe.admin.dao.entity.ExportTaskEntity;
+import cn.com.qmth.examcloud.core.oe.admin.dao.enums.ExportTaskStatus;
 import cn.com.qmth.examcloud.core.oe.admin.dao.enums.ExportTaskType;
 import cn.com.qmth.examcloud.core.oe.admin.service.AsyncExportService;
 import cn.com.qmth.examcloud.core.oe.admin.service.ExportTaskService;
@@ -13,9 +14,12 @@ import cn.com.qmth.examcloud.core.oe.admin.service.bean.examstudent.ExamStudentQ
 import cn.com.qmth.examcloud.support.CacheConstants;
 import cn.com.qmth.examcloud.web.redis.RedisClient;
 import cn.com.qmth.examcloud.web.support.SpringContextHolder;
+import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 public class ExportTask extends Thread {
 
     private static final Logger log = LoggerFactory.getLogger(ExportTask.class);
@@ -32,38 +36,44 @@ public class ExportTask extends Thread {
     public void run() {
         for (; ; ) {
             try {
-                ExportTaskEntity et = exportTaskService.findExportTaskToDispose();
-                if (et == null) {
+                List<ExportTaskEntity> tasks = exportTaskService.findExportTaskToDisposes();
+                if (CollectionUtils.isEmpty(tasks)) {
                     log.info("ExportTask sleep 30s");
                     Util.sleep(30);
                     continue;
                 }
 
-                String cacheLock = CacheConstants.LOCK_OE_EXPORT_TASK + et.getId();
-                Boolean lock = redisClient.setIfAbsent(cacheLock, cacheLock, cacheLockTimeout);
-                if (!lock) {
-                    log.warn("ExportTask lock... " + cacheLock);
-                    Util.sleep(10);
-                    continue;
-                }
+                for (ExportTaskEntity task : tasks) {
+                    String cacheLock = CacheConstants.LOCK_OE_EXPORT_TASK + task.getId();
 
-                try {
-                    exportTaskService.startExportTask(et.getId());
-                    if (ExportTaskType.EXAM_DETAIL.equals(et.getType())) {
-                        examRecordDetails(et);
-                    } else if (ExportTaskType.SCORE_STATISTIC.equals(et.getType())) {
-                        examScores(et);
-                    } else if (ExportTaskType.EXAM_SCHEDULING.equals(et.getType())) {
-                        examScheduling(et);
-                    } else if (ExportTaskType.AUDIT.equals(et.getType())) {
-                        examAudit(et);
+                    Boolean lock = redisClient.setIfAbsent(cacheLock, cacheLock, cacheLockTimeout);
+                    if (!lock) {
+                        continue;
                     }
-                    exportTaskService.endExportTask(et.getId());
-                } finally {
-                    redisClient.delete(cacheLock);
-                }
 
-                log.info("ExportTask end, taskId = {}", et.getId());
+                    ExportTaskEntity et = exportTaskService.findById(task.getId());
+                    if (ExportTaskStatus.WAITING != et.getStatus()) {
+                        continue;
+                    }
+
+                    try {
+                        exportTaskService.startExportTask(et.getId());
+                        if (ExportTaskType.EXAM_DETAIL.equals(et.getType())) {
+                            examRecordDetails(et);
+                        } else if (ExportTaskType.SCORE_STATISTIC.equals(et.getType())) {
+                            examScores(et);
+                        } else if (ExportTaskType.EXAM_SCHEDULING.equals(et.getType())) {
+                            examScheduling(et);
+                        } else if (ExportTaskType.AUDIT.equals(et.getType())) {
+                            examAudit(et);
+                        }
+                        exportTaskService.endExportTask(et.getId());
+                    } finally {
+                        redisClient.delete(cacheLock);
+                    }
+
+                    log.info("ExportTask end, taskId = {}", et.getId());
+                }
             } catch (Exception e) {
                 log.error("ExportTask error " + e.getMessage(), e);
                 Util.sleep(30);

+ 58 - 63
examcloud-core-oe-admin-service/src/main/java/cn/com/qmth/examcloud/core/oe/admin/service/impl/ExportTaskServiceImpl.java

@@ -5,30 +5,6 @@
 
 package cn.com.qmth.examcloud.core.oe.admin.service.impl;
 
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import javax.persistence.criteria.Predicate;
-
-import cn.com.qmth.examcloud.support.fss.FssHelper;
-import org.apache.commons.collections.CollectionUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.domain.Page;
-import org.springframework.data.domain.PageImpl;
-import org.springframework.data.domain.PageRequest;
-import org.springframework.data.domain.Pageable;
-import org.springframework.data.domain.Sort;
-import org.springframework.data.jpa.domain.Specification;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-
 import cn.com.qmth.examcloud.commons.exception.StatusException;
 import cn.com.qmth.examcloud.core.basic.api.UserCloudService;
 import cn.com.qmth.examcloud.core.basic.api.request.GetUserReq;
@@ -48,17 +24,31 @@ import cn.com.qmth.examcloud.examwork.api.bean.ExamBean;
 import cn.com.qmth.examcloud.examwork.api.request.GetExamMapsReq;
 import cn.com.qmth.examcloud.examwork.api.response.GetExamMapsResp;
 import cn.com.qmth.examcloud.support.CacheConstants;
+import cn.com.qmth.examcloud.support.fss.FssHelper;
 import cn.com.qmth.examcloud.web.helpers.GlobalHelper;
 import cn.com.qmth.examcloud.web.redis.RedisClient;
+import org.apache.commons.collections.CollectionUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.*;
+import org.springframework.data.jpa.domain.Specification;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.persistence.criteria.Predicate;
+import java.util.*;
+import java.util.stream.Collectors;
 
 /**
  * 导出任务相关接口
  */
 @Service
 public class ExportTaskServiceImpl implements ExportTaskService {
-	private static int cacheTimeOut = 60 * 60;
-	@Autowired
-	private RedisClient redisClient;
+
+    private static int cacheTimeOut = 60 * 60;
+
+    @Autowired
+    private RedisClient redisClient;
+
     @Autowired
     private ExportTaskRepo exportTaskRepo;
 
@@ -68,12 +58,16 @@ public class ExportTaskServiceImpl implements ExportTaskService {
     @Autowired
     private UserCloudService userCloudService;
 
-
     @Override
     public ExportTaskEntity findExportTaskToDispose() {
         return exportTaskRepo.findExportTaskToDispose();
     }
 
+    @Override
+    public List<ExportTaskEntity> findExportTaskToDisposes() {
+        return exportTaskRepo.findExportTaskToDisposes();
+    }
+
     @Override
     @Transactional
     public Long addExportTask(ExportTaskInfo info) {
@@ -89,8 +83,8 @@ public class ExportTaskServiceImpl implements ExportTaskService {
         entity.setExamId(info.getExamId());
         entity.setFilePath(info.getFilePath());
         entity.setCreator(info.getCreator());
-        if(info.getJsonParams()!=null&&info.getJsonParams().length()>980) {
-        	throw new StatusException("输入的导出条件过长!");
+        if (info.getJsonParams() != null && info.getJsonParams().length() > 980) {
+            throw new StatusException("输入的导出条件过长!");
         }
         entity.setExportParam(info.getJsonParams());
         exportTaskRepo.save(entity);
@@ -232,7 +226,8 @@ public class ExportTaskServiceImpl implements ExportTaskService {
             return new PageImpl<>(new ArrayList<>(), pageable, 0);
         }
 
-        Set<Long> examIds = page.getContent().stream().filter(e -> e.getExamId() != null).map(e -> e.getExamId()).collect(Collectors.toSet());
+        Set<Long> examIds = page.getContent().stream().filter(e -> e.getExamId() != null).map(e -> e.getExamId())
+                .collect(Collectors.toSet());
         Map<Long, ExamBean> examMaps;
         if (CollectionUtils.isNotEmpty(examIds)) {
             GetExamMapsReq examsReq = new GetExamMapsReq();
@@ -249,8 +244,8 @@ public class ExportTaskServiceImpl implements ExportTaskService {
         GetUserResp namesResp = userCloudService.getUser(reqUser);
         String createName = namesResp.getUserBean().getName();
 
-        List<ExportTaskListResp> list = page.getContent()
-                .stream().map(entity -> ofExportTask(entity, examMaps, createName)).collect(Collectors.toList());
+        List<ExportTaskListResp> list = page.getContent().stream()
+                .map(entity -> ofExportTask(entity, examMaps, createName)).collect(Collectors.toList());
         return new PageImpl<>(list, pageable, page.getTotalElements());
     }
 
@@ -280,35 +275,35 @@ public class ExportTaskServiceImpl implements ExportTaskService {
         return info;
     }
 
-	@Override
-	public ExportTaskEntity findById(Long taskId) {
-		return GlobalHelper.getEntity(exportTaskRepo, taskId, ExportTaskEntity.class);
-	}
-
-	@Transactional
-	@Override
-	public void stopExportTaskById(Long taskId) {
-		ExportTaskEntity e=findById(taskId);
-		if(e==null) {
-			throw new StatusException("未找到任务");
-		}
-		if(!ExportTaskStatus.EXPORTING.equals(e.getStatus())) {
-			throw new StatusException("只能终止导出中的任务");
-		}
-		e.setStatus(ExportTaskStatus.TERMINATING);
-		exportTaskRepo.save(e);
-		String key = CacheConstants.CACHE_OE_EXPORT_TASK_STOP + e.getId();
-		redisClient.set(key, key, cacheTimeOut);
-	}
-	
-	@Override
-	public void checkStopExportTaskById(Long taskId) {
-		String key = CacheConstants.CACHE_OE_EXPORT_TASK_STOP + taskId;
-		String ob=redisClient.get(key,String.class);
-		if(ob!=null) {
-			redisClient.delete(key);
-			throw new ExportTaskStopException();
-		}
-	}
+    @Override
+    public ExportTaskEntity findById(Long taskId) {
+        return GlobalHelper.getEntity(exportTaskRepo, taskId, ExportTaskEntity.class);
+    }
+
+    @Transactional
+    @Override
+    public void stopExportTaskById(Long taskId) {
+        ExportTaskEntity e = findById(taskId);
+        if (e == null) {
+            throw new StatusException("未找到任务");
+        }
+        if (!ExportTaskStatus.EXPORTING.equals(e.getStatus())) {
+            throw new StatusException("只能终止导出中的任务");
+        }
+        e.setStatus(ExportTaskStatus.TERMINATING);
+        exportTaskRepo.save(e);
+        String key = CacheConstants.CACHE_OE_EXPORT_TASK_STOP + e.getId();
+        redisClient.set(key, key, cacheTimeOut);
+    }
+
+    @Override
+    public void checkStopExportTaskById(Long taskId) {
+        String key = CacheConstants.CACHE_OE_EXPORT_TASK_STOP + taskId;
+        String ob = redisClient.get(key, String.class);
+        if (ob != null) {
+            redisClient.delete(key);
+            throw new ExportTaskStopException();
+        }
+    }
 
 }