Browse Source

创建pdf线程池修改

wangliang 9 months ago
parent
commit
c78f28961b

+ 1 - 1
distributed-print-business/src/main/java/com/qmth/distributed/print/business/templete/create/AsyncCreateTaskTemplete.java

@@ -23,7 +23,7 @@ public abstract class AsyncCreateTaskTemplete extends AsyncImportTaskTemplete {
     /**
      * 创建pdf
      */
-    @Async("taskThreadPool")
+//    @Async("taskThreadPool")
     public Result createPdf(TBTaskPdf tbTaskPdf, CallbackCreatePdf callbackCreatePdf) {
         return null;
     }

+ 2 - 0
teachcloud-task/src/main/java/com/qmth/teachcloud/task/job/CreatePdfTaskJob.java

@@ -26,12 +26,14 @@ public class CreatePdfTaskJob extends QuartzJobBean {
     @Override
     protected void executeInternal(JobExecutionContext jobExecutionContext) {
         if (lockService.trylock(LockType.CREATE_PDF, LockType.CREATE_PDF.name())) {
+            log.info("ThreadPoolTaskExecutor create_pdf锁已获取");
             try {
                 jobService.createPdfTask();
             } catch (Exception e) {
                 log.error(SystemConstant.LOG_ERROR, e);
             } finally {
                 lockService.unlock(LockType.CREATE_PDF, LockType.CREATE_PDF.name());
+                log.info("ThreadPoolTaskExecutor create_pdf锁已释放");
             }
         }
     }

+ 63 - 2
teachcloud-task/src/main/java/com/qmth/teachcloud/task/job/service/impl/JobServiceImpl.java

@@ -12,11 +12,14 @@ import com.qmth.teachcloud.common.bean.dto.MqDto;
 import com.qmth.teachcloud.common.bean.vo.PaperInfoVo;
 import com.qmth.teachcloud.common.contant.SystemConstant;
 import com.qmth.teachcloud.common.entity.BasicCourse;
+import com.qmth.teachcloud.common.entity.SysConfig;
+import com.qmth.teachcloud.common.enums.ExceptionResultEnum;
 import com.qmth.teachcloud.common.enums.PushTypeEnum;
 import com.qmth.teachcloud.common.enums.TaskResultEnum;
 import com.qmth.teachcloud.common.enums.TaskStatusEnum;
 import com.qmth.teachcloud.common.enums.mark.MarkPaperStatus;
 import com.qmth.teachcloud.common.service.BasicCourseService;
+import com.qmth.teachcloud.common.service.CommonCacheService;
 import com.qmth.teachcloud.common.util.DateDisposeUtils;
 import com.qmth.teachcloud.common.util.ExamTaskUtil;
 import com.qmth.teachcloud.common.util.RedisUtil;
@@ -32,14 +35,18 @@ import com.qmth.teachcloud.mark.service.MarkUserGroupService;
 import com.qmth.teachcloud.mark.utils.TaskLockUtil;
 import com.qmth.teachcloud.task.job.service.JobService;
 import com.qmth.teachcloud.task.service.PrintFinishService;
+import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -86,6 +93,9 @@ public class JobServiceImpl implements JobService {
     @Resource
     LockService lockService;
 
+    @Resource
+    CommonCacheService commonCacheService;
+
     @Override
     public void sendSmsExpireTask() {
         smsSendService.sendSmsExpireTask();
@@ -203,9 +213,60 @@ public class JobServiceImpl implements JobService {
     @Override
     public void createPdfTask() {
         List<TBTaskPdf> tbTaskPdfList = tbTaskPdfService.listWaitingTask();
-        for (TBTaskPdf tbTaskPdf : tbTaskPdfList) {
-            asyncCreatePdfTemplateService.createPdf(tbTaskPdf, null);
+        if (CollectionUtils.isNotEmpty(tbTaskPdfList)) {
+            ThreadPoolTaskExecutor threadPoolTaskExecutor = this.createThreadPool();
+            log.info("ThreadPoolTaskExecutor线程池准备开始创建PDF");
+            for (TBTaskPdf tbTaskPdf : tbTaskPdfList) {
+                threadPoolTaskExecutor.execute(new Thread(() -> asyncCreatePdfTemplateService.createPdf(tbTaskPdf, null)));
+            }
+            log.info("ThreadPoolTaskExecutor corePoolSize:{},maximumPoolSize:{},poolSize:{},queueSize:{},activeCount:{},completedTaskCount:{},taskCount:{}",
+                    threadPoolTaskExecutor.getThreadPoolExecutor().getCorePoolSize(),
+                    threadPoolTaskExecutor.getThreadPoolExecutor().getMaximumPoolSize(),
+                    threadPoolTaskExecutor.getThreadPoolExecutor().getPoolSize(),
+                    threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size(),
+                    threadPoolTaskExecutor.getThreadPoolExecutor().getActiveCount(),
+                    threadPoolTaskExecutor.getThreadPoolExecutor().getCompletedTaskCount(),
+                    threadPoolTaskExecutor.getThreadPoolExecutor().getTaskCount());
+            threadPoolTaskExecutor.shutdown();
+            log.info("ThreadPoolTaskExecutor线程池已关闭");
+        }
+    }
+
+    /**
+     * 创建线程池
+     *
+     * @return
+     */
+    protected ThreadPoolTaskExecutor createThreadPool() {
+        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
+        SysConfig sysConfigCustomThreadPoolCoreSize = commonCacheService.addSysConfigCache(SystemConstant.CUSTOM_THREAD_POOL_CORE_SIZE);
+        Optional.ofNullable(sysConfigCustomThreadPoolCoreSize).orElseThrow(() -> ExceptionResultEnum.ERROR.exception("未配置是否自定义线程池大小"));
+        boolean customThreadPoolCoreSize = Boolean.valueOf(sysConfigCustomThreadPoolCoreSize.getConfigValue());
+
+        SysConfig sysConfigThreadPoolCoreSize = commonCacheService.addSysConfigCache(SystemConstant.THREAD_POOL_CORE_SIZE);
+        Optional.ofNullable(sysConfigThreadPoolCoreSize).orElseThrow(() -> ExceptionResultEnum.ERROR.exception("未配置自定义线程池大小"));
+        Integer threadPoolCoreSize = Integer.valueOf(sysConfigThreadPoolCoreSize.getConfigValue());
+
+        final int cpuNum = Runtime.getRuntime().availableProcessors();
+        log.info("cpuNum:{}", cpuNum);
+        if (!customThreadPoolCoreSize && cpuNum > 0) {
+            threadPoolTaskExecutor.setCorePoolSize(Math.abs(cpuNum / 2));//核心线程数
+            threadPoolTaskExecutor.setMaxPoolSize(threadPoolTaskExecutor.getCorePoolSize() * 2);//最大线程数
+        } else {
+            threadPoolTaskExecutor.setCorePoolSize(threadPoolCoreSize);//核心线程数
+            threadPoolTaskExecutor.setMaxPoolSize(threadPoolCoreSize * 2);//最大线程数
         }
+        threadPoolTaskExecutor.setKeepAliveSeconds(SystemConstant.THREAD_POOL_KEEP_ALIVE_SECONDS);//线程空闲时间
+        threadPoolTaskExecutor.setQueueCapacity(SystemConstant.THREAD_POOL_QUEUE_CAPACITY);//队列容量
+        threadPoolTaskExecutor.setThreadNamePrefix(SystemConstant.THREAD_POOL_NAME);
+        threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);//设置是否允许核心线程超时。若允许,核心线程超时后,会被销毁。默认为不允许(fasle)
+        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);//设置shutdown时是否等到所有任务完成再真正关闭
+        threadPoolTaskExecutor.setAwaitTerminationSeconds(60 * 60 * 24);//当setWaitForTasksToCompleteOnShutdown(true)时,setAwaitTerminationSeconds 设置在 shutdown 之后最多等待多长时间后再真正关闭线程池
+        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
+        // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
+        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        threadPoolTaskExecutor.initialize();
+        return threadPoolTaskExecutor;
     }
 
     /**