Bladeren bron

task任务加入负载均衡

wangliang 3 jaren geleden
bovenliggende
commit
d9a009bddc

+ 9 - 18
distributed-print-business/src/main/java/com/qmth/distributed/print/business/templete/execute/AsyncCreatePdfTempleteService.java

@@ -45,24 +45,16 @@ public class AsyncCreatePdfTempleteService extends AsyncCreateTaskTemplete {
      */
     @Override
     public Result createPdf(Map<String, Object> map, CallbackCreatePdf callbackCreatePdf) throws IOException {
-        TBTask tbTask = null;
-        StringJoiner stringJoinerSummary = null;
-        try {
-            tbTask = (TBTask) map.get(SystemConstant.TASK);
-            Boolean manual = (Boolean) map.get(SystemConstant.MANUAL);
-            stringJoinerSummary = new StringJoiner("\n").add(MessageFormat.format("{0}{1}{2}", DateUtil.format(new Date(), SystemConstant.DEFAULT_DATE_PATTERN), BEGIN_TITLE, OBJ_TITLE));
-            TBTaskService tbTaskService = SpringContextHolder.getBean(TBTaskService.class);
-            TBTask dbTask = tbTaskService.getById(tbTask.getId());
-            if (Objects.isNull(manual) && (Objects.nonNull(dbTask) && dbTask.getStatus() == TaskStatusEnum.FINISH && Objects.nonNull(dbTask.getResult()))) {//无需重新生成pdf
-                SystemConstant.REDIS_MQ_LOCK = false;
-                return ResultUtil.ok();
-            }
-            tbTask.setStatus(TaskStatusEnum.RUNNING);
-            tbTaskService.updateById(tbTask);
-        } catch (Exception e) {
-            log.error("请求出错", e);
-            SystemConstant.REDIS_MQ_LOCK = false;
+        TBTask tbTask = (TBTask) map.get(SystemConstant.TASK);
+        Boolean manual = (Boolean) map.get(SystemConstant.MANUAL);
+        StringJoiner stringJoinerSummary = new StringJoiner("\n").add(MessageFormat.format("{0}{1}{2}", DateUtil.format(new Date(), SystemConstant.DEFAULT_DATE_PATTERN), BEGIN_TITLE, OBJ_TITLE));
+        TBTaskService tbTaskService = SpringContextHolder.getBean(TBTaskService.class);
+        TBTask dbTask = tbTaskService.getById(tbTask.getId());
+        if (Objects.isNull(manual) && (Objects.nonNull(dbTask) && dbTask.getStatus() == TaskStatusEnum.FINISH && Objects.nonNull(dbTask.getResult()))) {//无需重新生成pdf
+            return ResultUtil.ok();
         }
+        tbTask.setStatus(TaskStatusEnum.RUNNING);
+        tbTaskService.updateById(tbTask);
         try {
             TaskLogicService taskLogicService = SpringContextHolder.getBean(TaskLogicService.class);
             taskLogicService.createPdfPrepose(map);
@@ -98,7 +90,6 @@ public class AsyncCreatePdfTempleteService extends AsyncCreateTaskTemplete {
             if (Objects.nonNull(callbackCreatePdf)) {
                 callbackCreatePdf.callback(map);
             }
-            SystemConstant.REDIS_MQ_LOCK = false;
         }
         return ResultUtil.ok(map);
     }

+ 5 - 5
distributed-print-business/src/main/java/com/qmth/distributed/print/business/templete/importData/AsyncImportTaskTemplete.java

@@ -69,13 +69,13 @@ public abstract class AsyncImportTaskTemplete {
         JSONObject jsonObject = JSONObject.parseObject(tbTask.getImportFilePath());
         String path = (String) jsonObject.get(SystemConstant.PATH);
         String type = (String) jsonObject.get(SystemConstant.TYPE);
-        UploadFileEnum uploadType = Enum.valueOf(UploadFileEnum.class,(String) jsonObject.get(SystemConstant.UPLOAD_TYPE));
+        UploadFileEnum uploadType = Enum.valueOf(UploadFileEnum.class, (String) jsonObject.get(SystemConstant.UPLOAD_TYPE));
         InputStream inputStream = null;
         if (Objects.equals(type, SystemConstant.OSS)) {
-            inputStream = fileStoreUtil.ossDownloadIs(path,uploadType.getFssType());
+            inputStream = fileStoreUtil.ossDownloadIs(path, uploadType.getFssType());
         } else {
-            StringJoiner localPath = new StringJoiner("").add(SystemConstant.TEMP_FILES_DIR).add(File.separator).add(path);
-            inputStream = new FileInputStream(new File(localPath.toString()));
+//            StringJoiner localPath = new StringJoiner("").add(SystemConstant.TEMP_FILES_DIR).add(File.separator).add(path);
+            inputStream = new FileInputStream(new File(path));
         }
         return inputStream;
     }
@@ -120,7 +120,7 @@ public abstract class AsyncImportTaskTemplete {
                 path = path.substring(0, path.lastIndexOf("/") + 1);
                 stringJoiner.add(path).add(SystemConstant.getUuid()).add(TXT_PREFIX).toString();
                 FileStoreUtil fileStoreUtil = SpringContextHolder.getBean(FileStoreUtil.class);
-                fileStoreUtil.ossUpload(stringJoiner.toString(),inputStream,DigestUtils.md5Hex(new ByteArrayInputStream(bookByteAry)),fileStoreUtil.getUploadEnumByPath(stringJoiner.toString()).getFssType());
+                fileStoreUtil.ossUpload(stringJoiner.toString(), inputStream, DigestUtils.md5Hex(new ByteArrayInputStream(bookByteAry)), fileStoreUtil.getUploadEnumByPath(stringJoiner.toString()).getFssType());
             } else {//上传至服务器
                 File finalFile = new File(stringJoiner.toString());
                 if (!finalFile.exists()) {

+ 7 - 1
teachcloud-common/src/main/java/com/qmth/teachcloud/common/contant/SystemConstant.java

@@ -133,7 +133,6 @@ public class SystemConstant {
     public static final String REDIS_LOCK_MQ_PREFIX = "redis:lock:mq:";
     public static final int REDIS_MQ_MAX_RECONSUME = 5;
     public static final String MQ_TOPIC_BUFFER_LIST = "mq:topic:buffer:list";
-    public static boolean REDIS_MQ_LOCK = false;
 
     /**
      * redis lock
@@ -201,6 +200,13 @@ public class SystemConstant {
     public static final String REDIS_LOCK_FLOW_PREFIX = "redis:lock:flow:";//流程锁
     public static final long REDIS_LOCK_FLOW_TIME_OUT = 60L * 2;
 
+    /**
+     * 机器心跳
+     */
+    public static final String CURRENT_TASK_MACHINE_ID = "current_task_machine_id";//当前机器id
+    public static final String TASK_MACHINE_ID = "task_machine_id:";//任务机器id
+    public static final String TASK_MACHINE_ID_LIKE = "task_machine_id*";//任务机器like
+
     /**
      * 初始化附件文件路径
      */

+ 2 - 1
teachcloud-common/src/main/java/com/qmth/teachcloud/common/util/RedisUtil.java

@@ -8,6 +8,7 @@ import org.springframework.stereotype.Component;
 import javax.annotation.Resource;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -81,7 +82,7 @@ public class RedisUtil {
      * @return
      */
     public Set<?> getKeyPatterns(String key) {
-        if (null != key) {
+        if (Objects.nonNull(key)) {
             return redisTemplate.keys(key);
         } else {
             return null;

+ 14 - 8
teachcloud-task/src/main/java/com/qmth/teachcloud/task/config/RedisMessageListener.java

@@ -3,6 +3,7 @@ package com.qmth.teachcloud.task.config;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.google.gson.Gson;
+import com.qmth.boot.redis.uid.RedisMachineService;
 import com.qmth.distributed.print.business.templete.execute.AsyncCreatePdfTempleteService;
 import com.qmth.teachcloud.common.bean.dto.MqDto;
 import com.qmth.teachcloud.common.contant.SystemConstant;
@@ -22,9 +23,7 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
 import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -47,11 +46,19 @@ public class RedisMessageListener implements MessageListener {
     @Resource
     AsyncCreatePdfTempleteService asyncCreatePdfTempleteService;
 
+    @Resource
+    RedisMachineService redisMachineService;
+
     @Override
     public void onMessage(Message message, byte[] bytes) {
-//        log.info("SystemConstant.REDIS_MQ_LOCK start:{}", SystemConstant.REDIS_MQ_LOCK);
-        if (SystemConstant.REDIS_MQ_LOCK) {
-            return;
+        if (Objects.nonNull(redisUtil.get(SystemConstant.CURRENT_TASK_MACHINE_ID))) {
+            Set<Integer> set = (Set<Integer>) redisUtil.getKeyPatterns(SystemConstant.TASK_MACHINE_ID_LIKE);
+            if (Objects.nonNull(set) && set.size() > 1) {
+                int currentMachineId = (int) redisUtil.get(SystemConstant.CURRENT_TASK_MACHINE_ID);
+                if (currentMachineId == redisMachineService.getMachineId()) {
+                    return;
+                }
+            }
         }
         MqDto mqDto = null;
         AtomicInteger integer = new AtomicInteger(0);
@@ -82,7 +89,7 @@ public class RedisMessageListener implements MessageListener {
                             finalMap.computeIfAbsent(k, v1 -> finalV);
                         });
                         asyncCreatePdfTempleteService.createPdf(finalMap, null);
-                        SystemConstant.REDIS_MQ_LOCK = true;
+                        redisUtil.set(SystemConstant.CURRENT_TASK_MACHINE_ID, redisMachineService.getMachineId());
                         redisUtil.delete(SystemConstant.MQ_TOPIC_BUFFER_LIST, mqDto.getId());
                         break;
                     } else {
@@ -109,7 +116,6 @@ public class RedisMessageListener implements MessageListener {
                     || mqDto.getAck().intValue() == SystemConstant.POSION_ACK_TYPE)) {
                 redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
             }
-//            log.info("SystemConstant.REDIS_MQ_LOCK end:{}", SystemConstant.REDIS_MQ_LOCK);
         }
     }
 }

+ 5 - 0
teachcloud-task/src/main/java/com/qmth/teachcloud/task/job/service/JobService.java

@@ -58,4 +58,9 @@ public interface JobService {
      * 同步数据到云阅卷
      */
     void syncData();
+
+    /**
+     * 机器心跳
+     */
+    void machineHeart();
 }

+ 11 - 0
teachcloud-task/src/main/java/com/qmth/teachcloud/task/job/service/impl/JobServiceImpl.java

@@ -1,11 +1,13 @@
 package com.qmth.teachcloud.task.job.service.impl;
 
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.qmth.boot.redis.uid.RedisMachineService;
 import com.qmth.distributed.print.business.entity.ExamDetail;
 import com.qmth.distributed.print.business.entity.ExamPrintPlan;
 import com.qmth.distributed.print.business.enums.PrintPlanStatusEnum;
 import com.qmth.distributed.print.business.service.*;
 import com.qmth.teachcloud.common.bean.dto.MqDto;
+import com.qmth.teachcloud.common.contant.SystemConstant;
 import com.qmth.teachcloud.common.entity.SysConfig;
 import com.qmth.teachcloud.common.service.OrgCenterDataDisposeService;
 import com.qmth.teachcloud.common.service.SysConfigService;
@@ -22,6 +24,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @Description: job service impl
@@ -61,6 +64,9 @@ public class JobServiceImpl implements JobService {
     @Resource
     RedisUtil redisUtil;
 
+    @Resource
+    RedisMachineService redisMachineService;
+
     @Override
     public void updateSchoolInfo() throws IOException {
         orgCenterDataDisposeService.updateSchoolInfo();
@@ -128,6 +134,11 @@ public class JobServiceImpl implements JobService {
         dataSyncService.syncToCloudReview();
     }
 
+    @Override
+    public void machineHeart() {
+        redisUtil.set(SystemConstant.TASK_MACHINE_ID + redisMachineService.getMachineId(), redisMachineService.getMachineId(), 30, TimeUnit.SECONDS);
+    }
+
     /**
      * 组装job
      *

+ 39 - 0
teachcloud-task/src/main/java/com/qmth/teachcloud/task/quartz/ScheduledTask.java

@@ -0,0 +1,39 @@
+package com.qmth.teachcloud.task.quartz;
+
+import com.qmth.teachcloud.task.job.service.JobService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+/**
+ * @Description: 定时任务
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2021/9/1
+ */
+@Component
+public class ScheduledTask implements InitializingBean {
+    private final static Logger log = LoggerFactory.getLogger(ScheduledTask.class);
+
+    @Resource
+    JobService jobService;
+
+    /**
+     * 机器心跳
+     */
+    @Scheduled(cron = "0/15 * * * * ?")
+    public void machineHeart() {
+        log.info("machineHeart is come in");
+        jobService.machineHeart();
+    }
+
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        this.machineHeart();
+    }
+}

+ 8 - 0
teachcloud-task/src/main/java/com/qmth/teachcloud/task/start/StartRunning.java

@@ -1,9 +1,12 @@
 package com.qmth.teachcloud.task.start;
 
+import com.qmth.boot.redis.uid.RedisMachineService;
 import com.qmth.teachcloud.common.contant.SystemConstant;
 import com.qmth.teachcloud.common.service.OrgCenterDataDisposeService;
+import com.qmth.teachcloud.common.util.RedisUtil;
 import com.qmth.teachcloud.task.enums.JobEnum;
 import com.qmth.teachcloud.task.job.*;
+import com.qmth.teachcloud.task.job.service.JobService;
 import com.qmth.teachcloud.task.service.QuartzService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -13,6 +16,7 @@ import org.springframework.stereotype.Component;
 import javax.annotation.Resource;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @Description: 服务启动时初始化运行,哪个微服务模块需要则拿此模版去用
@@ -28,10 +32,14 @@ public class StartRunning implements CommandLineRunner {
     @Resource
     QuartzService quartzService;
 
+    @Resource
+    JobService jobService;
+
     @Override
     public void run(String... args) throws Exception {
         log.info("服务器启动时执行 start");
         SystemConstant.initTempFiles();
+        jobService.machineHeart();
 
         log.info("增加学校信息同步定时任务 start");
         Map schoolJobMap = new HashMap();