Ver código fonte

StartRunning mq整理

wangliang 4 anos atrás
pai
commit
ab09b87f4c

+ 11 - 0
themis-admin/src/main/java/com/qmth/themis/admin/config/DictionaryConfig.java

@@ -91,4 +91,15 @@ public class DictionaryConfig {
     public WxappDomain wxappDomain() {
         return new WxappDomain();
     }
+
+//    /**
+//     * mq配置
+//     *
+//     * @return
+//     */
+//    @Bean
+//    @ConfigurationProperties(prefix = "mq.config", ignoreUnknownFields = false)
+//    public MqConfigDomain mqConfigDomain() {
+//        return new MqConfigDomain();
+//    }
 }

+ 4 - 20
themis-admin/src/main/java/com/qmth/themis/admin/start/StartRunning.java

@@ -30,31 +30,15 @@ import javax.annotation.Resource;
 public class StartRunning implements CommandLineRunner {
     private final static Logger log = LoggerFactory.getLogger(StartRunning.class);
 
-    @Resource
-    RocketMessageConsumer rocketMessageConsumer;
+//    @Resource
+//    RocketMessageConsumer rocketMessageConsumer;
 
-    @Value("${rocketmq.name-server}")
-    String nameServer;
+//    @Value("${rocketmq.name-server}")
+//    String nameServer;
 
     @Override
     public void run(String... args) throws Exception {
         log.info("服务器启动时执行 start");
-        /**
-         * session
-         */
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.SESSION_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.WEB.name() + "||" + MqTagEnum.WIN.name() + "||" + MqTagEnum.MAC.name() + "||" + MqTagEnum.WXAPP.name() + "||" + MqTagEnum.IOS.name() + "||" + MqTagEnum.ANDROID.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(SessionConcurrentlyImpl.class));
-        /**
-         * userLog
-         */
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.USER_LOG_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.USER.name() + "||" + MqTagEnum.STUDENT.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(UserLogConcurrentlyImpl.class));
-        /**
-         * task
-         */
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.TASK_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_STUDENT_IMPORT.name() + "||" + MqTagEnum.ROOM_CODE_IMPORT.name() + "||" + MqTagEnum.ROOM_CODE_EXPORT.name() + "||" + MqTagEnum.EXAM_PAPER_IMPORT.name() + "||" + MqTagEnum.EXAM_STUDENT_EXPORT.name() + "||" + MqTagEnum.MARK_RESULT_SIMPLE_EXPORT.name() + "||" + MqTagEnum.MARK_RESULT_STANDARD_EXPORT.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(TaskConcurrentlyImpl.class));
-        /**
-         * log
-         */
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.LOG_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXCEPTION_LOG.name() + "||" + MqTagEnum.WARNING_LOG.name() + "||" + MqTagEnum.MONITOR_LOG.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(LogConcurrentlyImpl.class));
         SystemConstant.initTempFiles();
         log.info("服务器启动时执行 end");
     }

+ 3 - 3
themis-business/src/main/java/com/qmth/themis/business/domain/AuthNoUrlDomain.java

@@ -12,13 +12,13 @@ import java.util.List;
  */
 public class AuthNoUrlDomain implements Serializable {
 
-    List urls;
+    List<String> urls;
 
-    public List getUrls() {
+    public List<String> getUrls() {
         return urls;
     }
 
-    public void setUrls(List urls) {
+    public void setUrls(List<String> urls) {
         this.urls = urls;
     }
 }

+ 33 - 0
themis-business/src/main/java/com/qmth/themis/business/domain/MqConfigDomain.java

@@ -0,0 +1,33 @@
+package com.qmth.themis.business.domain;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * @Description: mq topic domain
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/11/13
+ */
+public class MqConfigDomain implements Serializable {
+
+//    String topic;
+    Map<String, Object> group;
+
+//    public String getTopic() {
+//        return topic;
+//    }
+//
+//    public void setTopic(String topic) {
+//        this.topic = topic;
+//    }
+
+    public Map<String, Object> getGroup() {
+        return group;
+    }
+
+    public void setGroup(Map<String, Object> group) {
+        this.group = group;
+    }
+}

+ 46 - 0
themis-business/src/main/java/com/qmth/themis/business/domain/MqGroupDomain.java

@@ -0,0 +1,46 @@
+package com.qmth.themis.business.domain;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * @Description: admin mq domain
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/11/13
+ */
+public class MqGroupDomain implements Serializable {
+
+//    String topic;
+    Map<String, Object> map;
+
+    public MqGroupDomain() {
+
+    }
+
+    public MqGroupDomain(Map<String, Object> map) {
+        this.map = map;
+    }
+
+//    public MqGroupDomain(String topic, Map<String, String> map) {
+//        this.topic = topic;
+//        this.map = map;
+//    }
+
+//    public String getTopic() {
+//        return topic;
+//    }
+//
+//    public void setTopic(String topic) {
+//        this.topic = topic;
+//    }
+
+    public Map<String, Object> getMap() {
+        return map;
+    }
+
+    public void setMap(Map<String, Object> map) {
+        this.map = map;
+    }
+}

+ 3 - 3
themis-business/src/main/java/com/qmth/themis/business/domain/SystemUrlDomain.java

@@ -12,13 +12,13 @@ import java.util.List;
  */
 public class SystemUrlDomain implements Serializable {
 
-    List urls;
+    List<String> urls;
 
-    public List getUrls() {
+    public List<String> getUrls() {
         return urls;
     }
 
-    public void setUrls(List urls) {
+    public void setUrls(List<String> urls) {
         this.urls = urls;
     }
 }

+ 4 - 4
themis-business/src/main/java/com/qmth/themis/business/domain/TencentYunDomain.java

@@ -14,23 +14,23 @@ public class TencentYunDomain implements Serializable {
 
     private String appId;
     private String key;
-    private List urls;
+    private List<String> urls;
 
     public TencentYunDomain() {
 
     }
 
-    public TencentYunDomain(String appId, String key, List urls) {
+    public TencentYunDomain(String appId, String key, List<String> urls) {
         this.appId = appId;
         this.key = key;
         this.urls = urls;
     }
 
-    public List getUrls() {
+    public List<String> getUrls() {
         return urls;
     }
 
-    public void setUrls(List urls) {
+    public void setUrls(List<String> urls) {
         this.urls = urls;
     }
 

+ 51 - 51
themis-business/src/main/java/com/qmth/themis/business/threadPool/MyThreadPool.java

@@ -1,51 +1,51 @@
-package com.qmth.themis.business.threadPool;
-
-import com.qmth.themis.business.constant.SystemConstant;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
-import javax.annotation.PostConstruct;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadPoolExecutor;
-
-/**
- * @Description: 线程池应用配置
- * @Param:
- * @return:
- * @Author: wangliang
- * @Date: 2019/3/21
- */
-@Configuration
-public class MyThreadPool extends ThreadPoolTaskExecutor {
-
-    public MyThreadPool arbitratePoolTaskExecutor = null;
-
-    @PostConstruct
-    public void init() {
-        arbitrateThreadPool();
-    }
-
-    /**
-     * 仲裁线程池
-     *
-     * @return
-     */
-    @Bean
-    public Executor arbitrateThreadPool() {
-        if (arbitratePoolTaskExecutor == null) {
-            arbitratePoolTaskExecutor = new MyThreadPool();
-            arbitratePoolTaskExecutor.setCorePoolSize(SystemConstant.THREAD_POOL_CORE_POOL_SIZE);//核心线程数
-            arbitratePoolTaskExecutor.setMaxPoolSize(SystemConstant.THREAD_POOL_MAX_POOL_SIZE);//最大线程数
-            arbitratePoolTaskExecutor.setKeepAliveSeconds(SystemConstant.THREAD_POOL_KEEP_ALIVE_SECONDS);//线程空闲时间
-            arbitratePoolTaskExecutor.setQueueCapacity(SystemConstant.THREAD_POOL_QUEUE_CAPACITY);//队列容量
-            arbitratePoolTaskExecutor.setThreadNamePrefix(SystemConstant.THREAD_POOL_NAME);
-
-            // rejection-policy:当pool已经达到max size的时候,如何处理新任务
-            // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
-            arbitratePoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
-            arbitratePoolTaskExecutor.initialize();
-        }
-        return arbitratePoolTaskExecutor;
-    }
-}
+//package com.qmth.themis.business.threadPool;
+//
+//import com.qmth.themis.business.constant.SystemConstant;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+//
+//import javax.annotation.PostConstruct;
+//import java.util.concurrent.Executor;
+//import java.util.concurrent.ThreadPoolExecutor;
+//
+///**
+// * @Description: 线程池应用配置
+// * @Param:
+// * @return:
+// * @Author: wangliang
+// * @Date: 2019/3/21
+// */
+//@Configuration
+//public class MyThreadPool extends ThreadPoolTaskExecutor {
+//
+//    public MyThreadPool arbitratePoolTaskExecutor = null;
+//
+//    @PostConstruct
+//    public void init() {
+//        arbitrateThreadPool();
+//    }
+//
+//    /**
+//     * 仲裁线程池
+//     *
+//     * @return
+//     */
+//    @Bean
+//    public Executor arbitrateThreadPool() {
+//        if (arbitratePoolTaskExecutor == null) {
+//            arbitratePoolTaskExecutor = new MyThreadPool();
+//            arbitratePoolTaskExecutor.setCorePoolSize(SystemConstant.THREAD_POOL_CORE_POOL_SIZE);//核心线程数
+//            arbitratePoolTaskExecutor.setMaxPoolSize(SystemConstant.THREAD_POOL_MAX_POOL_SIZE);//最大线程数
+//            arbitratePoolTaskExecutor.setKeepAliveSeconds(SystemConstant.THREAD_POOL_KEEP_ALIVE_SECONDS);//线程空闲时间
+//            arbitratePoolTaskExecutor.setQueueCapacity(SystemConstant.THREAD_POOL_QUEUE_CAPACITY);//队列容量
+//            arbitratePoolTaskExecutor.setThreadNamePrefix(SystemConstant.THREAD_POOL_NAME);
+//
+//            // rejection-policy:当pool已经达到max size的时候,如何处理新任务
+//            // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
+//            arbitratePoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+//            arbitratePoolTaskExecutor.initialize();
+//        }
+//        return arbitratePoolTaskExecutor;
+//    }
+//}

+ 55 - 0
themis-business/src/main/java/com/qmth/themis/business/util/MqUtil.java

@@ -0,0 +1,55 @@
+package com.qmth.themis.business.util;
+
+import com.qmth.themis.business.domain.MqConfigDomain;
+import com.qmth.themis.business.domain.MqGroupDomain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @Description: rocket mq util
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/11/13
+ */
+@Component
+public class MqUtil {
+    private final static Logger log = LoggerFactory.getLogger(MqUtil.class);
+
+//    private MqGroupDomain mqOeGroupDomain;
+//
+//    private MqGroupDomain mqTaskGroupDomain;
+//
+//    @Bean
+//    public MqGroupDomain mqOeGroupDomainInit(MqConfigDomain mqConfigDomain) {
+//        Map<String, Object> map = new HashMap<>();
+//        map.putAll(mqConfigDomain.getGroup());
+//        mqOeGroupDomain = new MqGroupDomain(map);
+//        return mqOeGroupDomain;
+//    }
+//
+//    @Bean
+//    public MqGroupDomain mqTaskGroupDomainInit(MqConfigDomain mqConfigDomain) {
+//        Map<String, Object> map = new HashMap<>();
+//        map.putAll(mqConfigDomain.getGroup());
+//        mqTaskGroupDomain = new MqGroupDomain(map);
+//        return mqTaskGroupDomain;
+//    }
+
+    public static Logger getLog() {
+        return log;
+    }
+
+//    public MqGroupDomain getMqOeGroupDomain() {
+//        return mqOeGroupDomain;
+//    }
+//
+//    public MqGroupDomain getMqTaskGroupDomain() {
+//        return mqTaskGroupDomain;
+//    }
+}

+ 11 - 0
themis-exam/src/main/java/com/qmth/themis/exam/config/DictionaryConfig.java

@@ -102,4 +102,15 @@ public class DictionaryConfig {
     public ClientDomain clientDomain() {
         return new ClientDomain();
     }
+
+    /**
+     * mq配置
+     *
+     * @return
+     */
+    @Bean
+    @ConfigurationProperties(prefix = "mq", ignoreUnknownFields = false)
+    public MqConfigDomain mqConfigDomain() {
+        return new MqConfigDomain();
+    }
 }

+ 0 - 39
themis-exam/src/main/java/com/qmth/themis/exam/start/StartRunning.java

@@ -38,50 +38,11 @@ public class StartRunning implements CommandLineRunner {
     @Override
     public void run(String... args) throws Exception {
         log.info("服务器启动时执行 start");
-        /**
-         * session
-         */
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.SESSION_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.WEB.name() + "||" + MqTagEnum.WIN.name() + "||" + MqTagEnum.MAC.name() + "||" + MqTagEnum.WXAPP.name() + "||" + MqTagEnum.IOS.name() + "||" + MqTagEnum.ANDROID.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(SessionConcurrentlyImpl.class));
-        /**
-         * userLog
-         */
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.USER_LOG_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.USER.name() + "||" + MqTagEnum.STUDENT.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(UserLogConcurrentlyImpl.class));
-        /**
-         * log
-         */
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.LOG_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXCEPTION_LOG.name() + "||" + MqTagEnum.WARNING_LOG.name() + "||" + MqTagEnum.MONITOR_LOG.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(LogConcurrentlyImpl.class));
         /**
          * websocket mq start
          */
         rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.WEBSOCKET_OE_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.OE_HARD_FINISH.name() + "||" + MqTagEnum.OE_IM_BROADCASTING.name() + "||" + MqTagEnum.OE_IM_CLUSTERING.name() + "||" + MqTagEnum.OE_LIVENESS_VERIFY.name() + "||" + MqTagEnum.OE_MONITOR_FINISH.name() + "||" + MqTagEnum.OE_WARNING_FINISH.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(WebSocketOeServer.class));
         rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.WEBSOCKET_OE_MOBILE_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_STOP.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(WebSocketMobileServer.class));
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.WEBSOCKET_DELAY_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.OE_UN_NORMAL.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(WebsocketUnNormalConcurrentlyImpl.class));
-        /**
-         * websocket mq end
-         */
-        //计算客观分
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.CALCULATE_OBJECTIVE_SCORE_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.CALCULATE_OBJECTIVE_SCORE.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(CalculateObjectiveScoreConcurrentlyImpl.class));
-
-        //人脸验证保存
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.FACE_VERIFY_SAVE_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.FACE_VERIFY_SAVE.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(FaceVerifyConcurrentlyImpl.class));
-
-        //活体验证保存
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.LIVENESS_VERIFY_SAVE_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.LIVENESS_VERIFY_SAVE.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(LivenessVerifyConcurrentlyImpl.class));
-
-        //考试记录数据持久化
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.EXAM_RECORD_PERSISTED_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_RECORD_PERSISTED.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamRecordPersistedConcurrentlyImpl.class));
-
-        //考试记录数据更新
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.EXAM_RECORD_UPDATE_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_RECORD_UPDATE.name() + "||" + MqTagEnum.EXAM_RECORD_UPDATE_COLUMNS.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamRecordUpdateConcurrentlyImpl.class));
-        //考试记录数据初始化
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.EXAM_RECORD_INIT_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_RECORD_INIT.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamRecordInitConcurrentlyImpl.class));
-
-        //考生数据更新
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.EXAM_STUDENT_UPDATE_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_STUDNET_UPDATE.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamStudentUpdateConcurrentlyImpl.class));
-
-        //考试断点
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.EXAM_BREAK_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_BREAK.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamBreakConcurrentlyImpl.class));
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.EXAM_BREAK_DELAY_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_BREAK_DELAY.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamBreakDelayConcurrentlyImpl.class));
         SystemConstant.initTempFiles();
         log.info("服务器启动时执行 end");
     }

+ 5 - 0
themis-exam/src/main/resources/application.properties

@@ -119,6 +119,11 @@ rocketmq.producer.enable-msg-trace=true
 #\u81EA\u5B9A\u4E49\u7684\u6D88\u606F\u8F68\u8FF9\u4E3B\u9898
 #rocketmq.producer.customized-trace-topic=my-trace-topic
 
+#mq topic\u548Cgroup\u914D\u7F6E
+#mq.config.map.topic=themis-topic-exam
+#mq.config.map.websocketOe=themis-group-exam-websocketOe
+#mq.config.map.websocketOeMobile=themis-group-exam-websocketOeMobile
+
 #\u963F\u91CC\u4E91OSS\u914D\u7F6E
 aliyun.oss.publicName=oss-cn-shenzhen.aliyuncs.com
 aliyun.oss.publicEndpoint=http://${aliyun.oss.publicName}

+ 36 - 37
themis-mq/src/main/java/com/qmth/themis/mq/service/impl/MqLogicServiceImpl.java

@@ -18,7 +18,6 @@ import com.qmth.themis.business.service.*;
 import com.qmth.themis.business.templete.TaskExportTemplete;
 import com.qmth.themis.business.templete.TaskImportTemplete;
 import com.qmth.themis.business.templete.impl.*;
-import com.qmth.themis.business.threadPool.MyThreadPool;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.business.util.UidUtil;
@@ -68,8 +67,8 @@ public class MqLogicServiceImpl implements MqLogicService {
     @Resource
     TBSessionService tbSessionService;
 
-    @Resource
-    MyThreadPool myThreadPool;
+//    @Resource
+//    MyThreadPool myThreadPool;
 
     @Resource
     TOeExamBreakHistoryService tOeExamBreakHistoryService;
@@ -206,39 +205,39 @@ public class MqLogicServiceImpl implements MqLogicService {
         Gson gson = new Gson();
         Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
         String tag = mqDto.getTag();
-        myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
-            if (tag.contains("Import".toUpperCase())) {
-                TaskImportTemplete taskImportTemplete = null;
-                if (Objects.equals(MqTagEnum.EXAM_STUDENT_IMPORT.name(), tag)) {
-                    taskImportTemplete = SpringContextHolder.getBean(TaskExamStudentImportTemplete.class);
-                } else if (Objects.equals(MqTagEnum.ROOM_CODE_IMPORT.name(), tag)) {
-                    taskImportTemplete = SpringContextHolder.getBean(TaskRoomCodeImportTemplete.class);
-                } else if (Objects.equals(MqTagEnum.EXAM_PAPER_IMPORT.name(), tag)) {
-                    taskImportTemplete = SpringContextHolder.getBean(TaskExamPaperImportTemplete.class);
-                }
-                try {
-                    taskImportTemplete.importTask(map);
-                } catch (IOException e) {
-                    log.error("请求出错", e);
-                }
-            } else {
-                TaskExportTemplete taskExportTemplete = null;
-                if (Objects.equals(MqTagEnum.ROOM_CODE_EXPORT.name(), tag)) {
-                    taskExportTemplete = SpringContextHolder.getBean(TaskRoomCodeExportTemplete.class);
-                } else if (Objects.equals(MqTagEnum.EXAM_STUDENT_EXPORT.name(), tag)) {
-                    taskExportTemplete = SpringContextHolder.getBean(TaskExamStudentExportTemplete.class);
-                } else if (Objects.equals(MqTagEnum.MARK_RESULT_SIMPLE_EXPORT.name(), tag)) {
-                    taskExportTemplete = SpringContextHolder.getBean(TaskMarkResultSimpleExportTemplete.class);
-                } else if (Objects.equals(MqTagEnum.MARK_RESULT_STANDARD_EXPORT.name(), tag)) {
-                    taskExportTemplete = SpringContextHolder.getBean(TaskMarkResultStandardExportTemplete.class);
-                }
-                try {
-                    taskExportTemplete.exportTask(map);
-                } catch (IOException e) {
-                    log.error("请求出错", e);
-                }
+//        myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
+        if (tag.contains("Import".toUpperCase())) {
+            TaskImportTemplete taskImportTemplete = null;
+            if (Objects.equals(MqTagEnum.EXAM_STUDENT_IMPORT.name(), tag)) {
+                taskImportTemplete = SpringContextHolder.getBean(TaskExamStudentImportTemplete.class);
+            } else if (Objects.equals(MqTagEnum.ROOM_CODE_IMPORT.name(), tag)) {
+                taskImportTemplete = SpringContextHolder.getBean(TaskRoomCodeImportTemplete.class);
+            } else if (Objects.equals(MqTagEnum.EXAM_PAPER_IMPORT.name(), tag)) {
+                taskImportTemplete = SpringContextHolder.getBean(TaskExamPaperImportTemplete.class);
             }
-        });
+            try {
+                taskImportTemplete.importTask(map);
+            } catch (IOException e) {
+                log.error("请求出错", e);
+            }
+        } else {
+            TaskExportTemplete taskExportTemplete = null;
+            if (Objects.equals(MqTagEnum.ROOM_CODE_EXPORT.name(), tag)) {
+                taskExportTemplete = SpringContextHolder.getBean(TaskRoomCodeExportTemplete.class);
+            } else if (Objects.equals(MqTagEnum.EXAM_STUDENT_EXPORT.name(), tag)) {
+                taskExportTemplete = SpringContextHolder.getBean(TaskExamStudentExportTemplete.class);
+            } else if (Objects.equals(MqTagEnum.MARK_RESULT_SIMPLE_EXPORT.name(), tag)) {
+                taskExportTemplete = SpringContextHolder.getBean(TaskMarkResultSimpleExportTemplete.class);
+            } else if (Objects.equals(MqTagEnum.MARK_RESULT_STANDARD_EXPORT.name(), tag)) {
+                taskExportTemplete = SpringContextHolder.getBean(TaskMarkResultStandardExportTemplete.class);
+            }
+            try {
+                taskExportTemplete.exportTask(map);
+            } catch (IOException e) {
+                log.error("请求出错", e);
+            }
+        }
+//        });
         mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
         mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
         TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
@@ -935,7 +934,7 @@ public class MqLogicServiceImpl implements MqLogicService {
      */
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
-            ConsumeConcurrentlyContext consumeConcurrentlyContext, MqExecTypeEnum mqExecTypeEnum) {
+                                                    ConsumeConcurrentlyContext consumeConcurrentlyContext, MqExecTypeEnum mqExecTypeEnum) {
         MqDto mqDto = null;
         try {
             long threadId = Thread.currentThread().getId();
@@ -984,7 +983,7 @@ public class MqLogicServiceImpl implements MqLogicService {
      */
     @Override
     public ConsumeConcurrentlyStatus consumeMessageDelay(List<MessageExt> msgs,
-            ConsumeConcurrentlyContext consumeConcurrentlyContext, MqExecTypeEnum mqExecTypeEnum) {
+                                                         ConsumeConcurrentlyContext consumeConcurrentlyContext, MqExecTypeEnum mqExecTypeEnum) {
         MqDto mqDto = null;
         try {
             long threadId = Thread.currentThread().getId();

+ 12 - 4
themis-task/src/main/java/com/qmth/themis/task/config/DictionaryConfig.java

@@ -1,9 +1,6 @@
 package com.qmth.themis.task.config;
 
-import com.qmth.themis.business.domain.AliYunOssDomain;
-import com.qmth.themis.business.domain.SysDomain;
-import com.qmth.themis.business.domain.TencentYunDomain;
-import com.qmth.themis.business.domain.WxappDomain;
+import com.qmth.themis.business.domain.*;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -61,4 +58,15 @@ public class DictionaryConfig {
     public WxappDomain wxappDomain() {
         return new WxappDomain();
     }
+
+    /**
+     * mq配置
+     *
+     * @return
+     */
+    @Bean
+    @ConfigurationProperties(prefix = "mq.config", ignoreUnknownFields = false)
+    public MqConfigDomain mqConfigDomain() {
+        return new MqConfigDomain();
+    }
 }

+ 43 - 0
themis-task/src/main/java/com/qmth/themis/task/start/StartRunning.java

@@ -60,10 +60,53 @@ public class StartRunning implements CommandLineRunner {
         quartzService.addJob(MqActivityJob.class, QuartzTaskEnum.MQ_ACTIVITY_JOB_NAME.name(), QuartzTaskEnum.MQ_ACTIVITY_JOB_GROUP_NAME.name(), "0 0 0 * * ?", mqMap);
         log.info("增加mqActivityjob end");
 
+        /**
+         * session
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.SESSION_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.WEB.name() + "||" + MqTagEnum.WIN.name() + "||" + MqTagEnum.MAC.name() + "||" + MqTagEnum.WXAPP.name() + "||" + MqTagEnum.IOS.name() + "||" + MqTagEnum.ANDROID.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(SessionConcurrentlyImpl.class));
+        /**
+         * userLog
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.USER_LOG_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.USER.name() + "||" + MqTagEnum.STUDENT.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(UserLogConcurrentlyImpl.class));
+        /**
+         * log
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.LOG_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXCEPTION_LOG.name() + "||" + MqTagEnum.WARNING_LOG.name() + "||" + MqTagEnum.MONITOR_LOG.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(LogConcurrentlyImpl.class));
+        /**
+         * task
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.TASK_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_STUDENT_IMPORT.name() + "||" + MqTagEnum.ROOM_CODE_IMPORT.name() + "||" + MqTagEnum.ROOM_CODE_EXPORT.name() + "||" + MqTagEnum.EXAM_PAPER_IMPORT.name() + "||" + MqTagEnum.EXAM_STUDENT_EXPORT.name() + "||" + MqTagEnum.MARK_RESULT_SIMPLE_EXPORT.name() + "||" + MqTagEnum.MARK_RESULT_STANDARD_EXPORT.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(TaskConcurrentlyImpl.class));
+        /**
+         * websocket mq start
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.WEBSOCKET_DELAY_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.OE_UN_NORMAL.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(WebsocketUnNormalConcurrentlyImpl.class));
         /**
          * quartz mq start
          */
         rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.QUARTZ_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_ACTIVITY.name() + "||" + MqTagEnum.EXAM_STUDENT.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(QuartzOrderlyImpl.class));
+        //计算客观分
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.CALCULATE_OBJECTIVE_SCORE_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.CALCULATE_OBJECTIVE_SCORE.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(CalculateObjectiveScoreConcurrentlyImpl.class));
+
+        //人脸验证保存
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.FACE_VERIFY_SAVE_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.FACE_VERIFY_SAVE.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(FaceVerifyConcurrentlyImpl.class));
+
+        //活体验证保存
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.LIVENESS_VERIFY_SAVE_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.LIVENESS_VERIFY_SAVE.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(LivenessVerifyConcurrentlyImpl.class));
+
+        //考试记录数据持久化
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.EXAM_RECORD_PERSISTED_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_RECORD_PERSISTED.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamRecordPersistedConcurrentlyImpl.class));
+
+        //考试记录数据更新
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.EXAM_RECORD_UPDATE_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_RECORD_UPDATE.name() + "||" + MqTagEnum.EXAM_RECORD_UPDATE_COLUMNS.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamRecordUpdateConcurrentlyImpl.class));
+        //考试记录数据初始化
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.EXAM_RECORD_INIT_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_RECORD_INIT.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamRecordInitConcurrentlyImpl.class));
+
+        //考生数据更新
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.EXAM_STUDENT_UPDATE_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_STUDNET_UPDATE.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamStudentUpdateConcurrentlyImpl.class));
+
+        //考试断点
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.EXAM_BREAK_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_BREAK.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamBreakConcurrentlyImpl.class));
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.EXAM_BREAK_DELAY_GROUP.getCode(), MqTopicEnum.THEMIS_TOPIC.getCode(), MqTagEnum.EXAM_BREAK_DELAY.name(), MessageModel.CLUSTERING, SpringContextHolder.getBean(ExamBreakDelayConcurrentlyImpl.class));
         log.info("服务器启动时执行 end");
     }
 }

+ 18 - 0
themis-task/src/main/resources/application.properties

@@ -177,6 +177,24 @@ rocketmq.producer.enable-msg-trace=true
 #\u81EA\u5B9A\u4E49\u7684\u6D88\u606F\u8F68\u8FF9\u4E3B\u9898
 #rocketmq.producer.customized-trace-topic=my-trace-topic
 
+#mq topic\u548Cgroup\u914D\u7F6E
+mq.config.topic=themis-topic-exam
+mq.config.map.session=themis-group-exam-session
+mq.config.map.userLog=themis-group-exam-userLog
+mq.config.map.log=themis-group-exam-log
+mq.config.map.task=themis-group-exam-task
+mq.config.map.websocketDelay=themis-group-exam-websocketDelay
+mq.config.map.quartz=themis-group-exam-quartz
+mq.config.map.calculateObjectiveScore=themis-group-exam-calculateObjectiveScore
+mq.config.map.faceVerifySave=themis-group-exam-faceVerifySave
+mq.config.map.livenessVerifySave=themis-group-exam-livenessVerifySave
+mq.config.map.examRecordPersisted=themis-group-exam-examRecordPersisted
+mq.config.map.examRecordUpdate=themis-group-exam-examRecordUpdate
+mq.config.map.examRecordInit=themis-group-exam-examRecordInit
+mq.config.map.examStudentUpdate=themis-group-exam-examStudentUpdate
+mq.config.map.examBreak=themis-group-exam-examBreak
+mq.config.map.examBreakDelay=themis-group-exam-examBreakDelay
+
 #\u817E\u8BAF\u4E91\u914D\u7F6E
 tencentyun.sdk.appId=1400411036
 tencentyun.sdk.key=d78004c94473cb1cf78af33d333e18b731132e527e829e44e2ab133945243b11