Sfoglia il codice sorgente

创建pdf线程池修改

wangliang 9 mesi fa
parent
commit
6d93ea47ae

+ 1 - 1
distributed-print-business/src/main/java/com/qmth/distributed/print/business/templete/create/AsyncCreateTaskTemplete.java

@@ -23,7 +23,7 @@ public abstract class AsyncCreateTaskTemplete extends AsyncImportTaskTemplete {
     /**
      * 创建pdf
      */
-    @Async("taskThreadPool")
+//    @Async("taskThreadPool")
     public Result createPdf(TBTaskPdf tbTaskPdf, CallbackCreatePdf callbackCreatePdf) {
         return null;
     }

+ 1 - 0
distributed-print-business/src/main/java/com/qmth/distributed/print/business/templete/execute/AsyncCreatePdfTemplateService.java

@@ -60,6 +60,7 @@ public class AsyncCreatePdfTemplateService extends AsyncCreateTaskTemplete {
         tbTaskPdfService.updateById(tbTaskPdf);
         List<ExamDetailCourse> examDetailCourseList = null;
         try {
+            Thread.sleep(1000 * 60 * 15);
             PdfTaskLogicService pdfTaskLogicService = SpringContextHolder.getBean(PdfTaskLogicService.class);
             examDetailCourseList = pdfTaskLogicService.executeCreatePdfLogic(tbTaskPdf, stringJoinerSummary);
             tbTaskPdf.setResult(TaskResultEnum.SUCCESS);

+ 2 - 0
teachcloud-task/src/main/java/com/qmth/teachcloud/task/job/CreatePdfTaskJob.java

@@ -26,12 +26,14 @@ public class CreatePdfTaskJob extends QuartzJobBean {
     @Override
     protected void executeInternal(JobExecutionContext jobExecutionContext) {
         if (lockService.trylock(LockType.CREATE_PDF, LockType.CREATE_PDF.name())) {
+            log.info("ThreadPoolTaskExecutor create_pdf锁已获取");
             try {
                 jobService.createPdfTask();
             } catch (Exception e) {
                 log.error(SystemConstant.LOG_ERROR, e);
             } finally {
                 lockService.unlock(LockType.CREATE_PDF, LockType.CREATE_PDF.name());
+                log.info("ThreadPoolTaskExecutor create_pdf锁已释放");
             }
         }
     }

+ 2 - 1
teachcloud-task/src/main/java/com/qmth/teachcloud/task/job/service/JobService.java

@@ -1,6 +1,7 @@
 package com.qmth.teachcloud.task.job.service;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
 
 /**
  * @Description: job service
@@ -52,5 +53,5 @@ public interface JobService {
 
     void clearTimeoutTask();
 
-    void createPdfTask();
+    void createPdfTask() throws ExecutionException, InterruptedException;
 }

+ 65 - 3
teachcloud-task/src/main/java/com/qmth/teachcloud/task/job/service/impl/JobServiceImpl.java

@@ -12,11 +12,14 @@ import com.qmth.teachcloud.common.bean.dto.MqDto;
 import com.qmth.teachcloud.common.bean.vo.PaperInfoVo;
 import com.qmth.teachcloud.common.contant.SystemConstant;
 import com.qmth.teachcloud.common.entity.BasicCourse;
+import com.qmth.teachcloud.common.entity.SysConfig;
+import com.qmth.teachcloud.common.enums.ExceptionResultEnum;
 import com.qmth.teachcloud.common.enums.PushTypeEnum;
 import com.qmth.teachcloud.common.enums.TaskResultEnum;
 import com.qmth.teachcloud.common.enums.TaskStatusEnum;
 import com.qmth.teachcloud.common.enums.mark.MarkPaperStatus;
 import com.qmth.teachcloud.common.service.BasicCourseService;
+import com.qmth.teachcloud.common.service.CommonCacheService;
 import com.qmth.teachcloud.common.util.DateDisposeUtils;
 import com.qmth.teachcloud.common.util.ExamTaskUtil;
 import com.qmth.teachcloud.common.util.RedisUtil;
@@ -32,14 +35,19 @@ import com.qmth.teachcloud.mark.service.MarkUserGroupService;
 import com.qmth.teachcloud.mark.utils.TaskLockUtil;
 import com.qmth.teachcloud.task.job.service.JobService;
 import com.qmth.teachcloud.task.service.PrintFinishService;
+import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -200,11 +208,28 @@ public class JobServiceImpl implements JobService {
         }
     }
 
+    @Resource
+    CommonCacheService commonCacheService;
+
     @Override
-    public void createPdfTask() {
+    public void createPdfTask() throws ExecutionException, InterruptedException {
         List<TBTaskPdf> tbTaskPdfList = tbTaskPdfService.listWaitingTask();
-        for (TBTaskPdf tbTaskPdf : tbTaskPdfList) {
-            asyncCreatePdfTemplateService.createPdf(tbTaskPdf, null);
+        if (CollectionUtils.isNotEmpty(tbTaskPdfList)) {
+            ThreadPoolTaskExecutor threadPoolTaskExecutor = this.createThreadPool();
+            log.info("ThreadPoolTaskExecutor线程池准备开始创建PDF");
+            for (TBTaskPdf tbTaskPdf : tbTaskPdfList) {
+                threadPoolTaskExecutor.execute(new Thread(() -> asyncCreatePdfTemplateService.createPdf(tbTaskPdf, null)));
+            }
+            log.info("ThreadPoolTaskExecutor corePoolSize:{},maximumPoolSize:{},poolSize:{},queueSize:{},activeCount:{},completedTaskCount:{},taskCount:{}",
+                    threadPoolTaskExecutor.getThreadPoolExecutor().getCorePoolSize(),
+                    threadPoolTaskExecutor.getThreadPoolExecutor().getMaximumPoolSize(),
+                    threadPoolTaskExecutor.getThreadPoolExecutor().getPoolSize(),
+                    threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size(),
+                    threadPoolTaskExecutor.getThreadPoolExecutor().getActiveCount(),
+                    threadPoolTaskExecutor.getThreadPoolExecutor().getCompletedTaskCount(),
+                    threadPoolTaskExecutor.getThreadPoolExecutor().getTaskCount());
+            threadPoolTaskExecutor.shutdown();
+            log.info("ThreadPoolTaskExecutor线程池已关闭");
         }
     }
 
@@ -225,4 +250,41 @@ public class JobServiceImpl implements JobService {
             });
         }
     }
+
+    /**
+     * 创建线程池
+     *
+     * @return
+     */
+    protected ThreadPoolTaskExecutor createThreadPool() {
+        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
+        SysConfig sysConfigCustomThreadPoolCoreSize = commonCacheService.addSysConfigCache(SystemConstant.CUSTOM_THREAD_POOL_CORE_SIZE);
+        Optional.ofNullable(sysConfigCustomThreadPoolCoreSize).orElseThrow(() -> ExceptionResultEnum.ERROR.exception("未配置是否自定义线程池大小"));
+        boolean customThreadPoolCoreSize = Boolean.valueOf(sysConfigCustomThreadPoolCoreSize.getConfigValue());
+
+        SysConfig sysConfigThreadPoolCoreSize = commonCacheService.addSysConfigCache(SystemConstant.THREAD_POOL_CORE_SIZE);
+        Optional.ofNullable(sysConfigThreadPoolCoreSize).orElseThrow(() -> ExceptionResultEnum.ERROR.exception("未配置自定义线程池大小"));
+        Integer threadPoolCoreSize = Integer.valueOf(sysConfigThreadPoolCoreSize.getConfigValue());
+
+        final int cpuNum = Runtime.getRuntime().availableProcessors();
+        log.info("cpuNum:{}", cpuNum);
+        if (!customThreadPoolCoreSize && cpuNum > 0) {
+            threadPoolTaskExecutor.setCorePoolSize(Math.abs(cpuNum / 2));//核心线程数
+            threadPoolTaskExecutor.setMaxPoolSize(threadPoolTaskExecutor.getCorePoolSize() * 2);//最大线程数
+        } else {
+            threadPoolTaskExecutor.setCorePoolSize(threadPoolCoreSize);//核心线程数
+            threadPoolTaskExecutor.setMaxPoolSize(threadPoolCoreSize * 2);//最大线程数
+        }
+        threadPoolTaskExecutor.setKeepAliveSeconds(SystemConstant.THREAD_POOL_KEEP_ALIVE_SECONDS);//线程空闲时间
+        threadPoolTaskExecutor.setQueueCapacity(SystemConstant.THREAD_POOL_QUEUE_CAPACITY);//队列容量
+        threadPoolTaskExecutor.setThreadNamePrefix(SystemConstant.THREAD_POOL_NAME);
+        threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);//设置是否允许核心线程超时。若允许,核心线程超时后,会被销毁。默认为不允许(fasle)
+        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);//设置shutdown时是否等到所有任务完成再真正关闭
+        threadPoolTaskExecutor.setAwaitTerminationSeconds(60 * 60 * 24);//当setWaitForTasksToCompleteOnShutdown(true)时,setAwaitTerminationSeconds 设置在 shutdown 之后最多等待多长时间后再真正关闭线程池
+        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
+        // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
+        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        threadPoolTaskExecutor.initialize();
+        return threadPoolTaskExecutor;
+    }
 }