wangliang 4 лет назад
Родитель
Сommit
5cd349635f
35 измененных файлов с 638 добавлено и 1339 удалено
  1. 5 45
      themis-backend/src/main/java/com/qmth/themis/backend/ThemisBackendApplication.java
  2. 4 5
      themis-backend/src/main/java/com/qmth/themis/backend/api/TBExamInvigilateUserController.java
  3. 12 9
      themis-backend/src/main/java/com/qmth/themis/backend/api/TBUserController.java
  4. 7 5
      themis-backend/src/main/java/com/qmth/themis/backend/api/TEExamActivityController.java
  5. 5 7
      themis-backend/src/main/java/com/qmth/themis/backend/api/TEExamController.java
  6. 3 4
      themis-backend/src/main/java/com/qmth/themis/backend/api/TEExamPaperController.java
  7. 3 4
      themis-backend/src/main/java/com/qmth/themis/backend/api/TEExamStudentController.java
  8. 0 11
      themis-backend/src/main/java/com/qmth/themis/backend/config/DictionaryConfig.java
  9. 47 1
      themis-backend/src/main/java/com/qmth/themis/backend/start/StartRunning.java
  10. 3 2
      themis-backend/src/main/java/com/qmth/themis/backend/websocket/WebSocketAdminServer.java
  11. 1 64
      themis-backend/src/main/resources/application.properties
  12. 33 0
      themis-business/src/main/java/com/qmth/themis/business/config/MultipartConfig.java
  13. 32 11
      themis-business/src/main/java/com/qmth/themis/business/constant/SystemConstant.java
  14. 0 351
      themis-business/src/main/java/com/qmth/themis/business/domain/MqConfigDomain.java
  15. 6 1
      themis-common/src/main/java/com/qmth/themis/common/enums/Platform.java
  16. 1 1
      themis-common/src/main/java/com/qmth/themis/common/enums/Source.java
  17. 0 25
      themis-exam/src/main/java/com/qmth/themis/exam/ThemisExamApplication.java
  18. 14 8
      themis-exam/src/main/java/com/qmth/themis/exam/api/TEStudentController.java
  19. 0 11
      themis-exam/src/main/java/com/qmth/themis/exam/config/DictionaryConfig.java
  20. 84 0
      themis-exam/src/main/java/com/qmth/themis/exam/start/StartRunning.java
  21. 3 4
      themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketOeServer.java
  22. 1 64
      themis-exam/src/main/resources/application.properties
  23. 63 3
      themis-mq/src/main/java/com/qmth/themis/mq/enums/MqGroupEnum.java
  24. 14 1
      themis-mq/src/main/java/com/qmth/themis/mq/enums/MqTagEnum.java
  25. 21 1
      themis-mq/src/main/java/com/qmth/themis/mq/enums/MqTopicEnum.java
  26. 0 240
      themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketSessionConsumer.java
  27. 0 242
      themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketTaskConsumer.java
  28. 64 2
      themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/SessionConcurrentlyImpl.java
  29. 124 0
      themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/TaskConcurrentlyImpl.java
  30. 15 61
      themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/UserLogConcurrentlyImpl.java
  31. 12 65
      themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/WebsocketUnNormalConcurrentlyImpl.java
  32. 0 15
      themis-task/src/main/java/com/qmth/themis/task/ThemisTaskApplication.java
  33. 3 12
      themis-task/src/main/java/com/qmth/themis/task/config/DictionaryConfig.java
  34. 57 0
      themis-task/src/main/java/com/qmth/themis/task/start/StartRunning.java
  35. 1 64
      themis-task/src/main/resources/application.properties

+ 5 - 45
themis-backend/src/main/java/com/qmth/themis/backend/ThemisBackendApplication.java

@@ -1,68 +1,28 @@
 package com.qmth.themis.backend;
 
-import java.io.File;
-
-import javax.annotation.Resource;
-
 import org.mybatis.spring.annotation.MapperScan;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
 import org.springframework.cache.annotation.EnableCaching;
-import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.transaction.annotation.EnableTransactionManagement;
-import org.springframework.web.multipart.MultipartResolver;
-import org.springframework.web.multipart.commons.CommonsMultipartResolver;
-
-import com.qmth.themis.backend.config.DictionaryConfig;
-import com.qmth.themis.business.config.SystemConfig;
-import com.qmth.themis.business.constant.SpringContextHolder;
-import com.qmth.themis.common.contanst.Constants;
 
 //import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
 
 @SpringBootApplication
-@ComponentScan(basePackages = { "com.qmth.themis" })
+@ComponentScan(basePackages = {"com.qmth.themis"})
 @MapperScan("com.qmth.themis.business.dao")
 //主要就是定义扫描的路径从中找出标识了需要装配的类自动装配到spring的bean容器中,做过web开发的同学一定都有用过@Controller,@Service,@Repository注解,查看其源码你会发现,他们中有一个共同的注解@Component,没错@ComponentScan注解默认就会装配标识了@Controller,@Service,@Repository,@Component注解的类到spring容器中
-@EntityScan(basePackages = { "com.qmth.themis.business.entity" }) // 用来扫描和发现指定包及其子包中的Entity定义
+@EntityScan(basePackages = {"com.qmth.themis.business.entity"}) // 用来扫描和发现指定包及其子包中的Entity定义
 @EnableTransactionManagement // spring开启事务支持
 @EnableAsync // 开启异步任务
 @EnableCaching // 开启缓存注解
 //@EnableMongoRepositories("com.qmth.themis.backend.mongodb.repository")
 public class ThemisBackendApplication {
 
-	public static void main(String[] args) {
-		SpringApplication.run(ThemisBackendApplication.class, args);
-		init();
-	}
-
-	@Resource
-	DictionaryConfig dictionaryConfig;
-
-	/**
-	 * 附件上传配置
-	 *
-	 * @return
-	 */
-	@Bean
-	public MultipartResolver multipartResolver() {
-		CommonsMultipartResolver resolver = new CommonsMultipartResolver();
-		resolver.setDefaultEncoding(Constants.CHARSET_NAME);
-		resolver.setResolveLazily(true);// resolveLazily属性启用是为了推迟文件解析,以在在UploadAction中捕获文件大小异常
-		resolver.setMaxInMemorySize(2);// 低于此值,只保留在内存里,超过此阈值,生成硬盘上的临时文件。
-		resolver.setMaxUploadSize(200 * 1024 * 1024);// 最大200M
-		return resolver;
-	}
-
-	private static void init() {
-		SystemConfig conf=SpringContextHolder.getBean(SystemConfig.class);
-		String tempDir = conf.getProperty("sys.config.tempDataDir");
-		File dir=new File(tempDir);
-		if(!dir.exists()) {
-			dir.mkdirs();
-		}
-	}
+    public static void main(String[] args) {
+        SpringApplication.run(ThemisBackendApplication.class, args);
+    }
 }

+ 4 - 5
themis-backend/src/main/java/com/qmth/themis/backend/api/TBExamInvigilateUserController.java

@@ -21,6 +21,8 @@ import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
 import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.enums.MqTagEnum;
+import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.service.MqDtoService;
 import io.swagger.annotations.*;
 import org.springframework.dao.DuplicateKeyException;
@@ -50,9 +52,6 @@ public class TBExamInvigilateUserController {
     @Resource
     MqDtoService mqDtoService;
 
-    @Resource
-    DictionaryConfig dictionaryConfig;
-
     @Resource
     TBTaskHistoryService taskHistoryService;
 
@@ -151,7 +150,7 @@ public class TBExamInvigilateUserController {
             transMap.put("orgId", tbUser.getOrgId());
             transMap.put("remark", tbAttachment.getRemark());
             //mq发送消息start
-            MqDto mqDto = new MqDto(dictionaryConfig.mqConfigDomain().getTaskTopic(), dictionaryConfig.mqConfigDomain().getTaskTopicRoomCodeImportTag(), transMap, MqEnum.TASK_LOG, String.valueOf(tbTaskHistory.getId()), tbUser.getName());
+            MqDto mqDto = new MqDto(MqTopicEnum.taskTopic.getCode(), MqTagEnum.roomCodeImport.name(), transMap, MqEnum.TASK_LOG, String.valueOf(tbTaskHistory.getId()), tbUser.getName());
             mqDtoService.assembleSendOneWayMsg(mqDto);
             //mq发送消息end
         } catch (Exception e) {
@@ -188,7 +187,7 @@ public class TBExamInvigilateUserController {
                 transMap.put("createId", tbUser.getId());
                 transMap.put("orgId", tbUser.getOrgId());
                 //mq发送消息start
-                MqDto mqDto = new MqDto(dictionaryConfig.mqConfigDomain().getTaskTopic(), dictionaryConfig.mqConfigDomain().getTaskTopicRoomCodeExportTag(), transMap, MqEnum.TASK_LOG, String.valueOf(tbTaskHistory.getId()), tbUser.getName());
+                MqDto mqDto = new MqDto(MqTopicEnum.taskTopic.getCode(), MqTagEnum.roomCodeExport.name(), transMap, MqEnum.TASK_LOG, String.valueOf(tbTaskHistory.getId()), tbUser.getName());
                 mqDtoService.assembleSendOneWayMsg(mqDto);
                 //mq发送消息end
             } else {

+ 12 - 9
themis-backend/src/main/java/com/qmth/themis/backend/api/TBUserController.java

@@ -2,7 +2,6 @@ package com.qmth.themis.backend.api;
 
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.google.gson.Gson;
-import com.qmth.themis.backend.config.DictionaryConfig;
 import com.qmth.themis.backend.util.ServletUtil;
 import com.qmth.themis.business.annotation.ApiJsonObject;
 import com.qmth.themis.business.annotation.ApiJsonProperty;
@@ -31,6 +30,7 @@ import com.qmth.themis.common.signature.SignatureType;
 import com.qmth.themis.common.util.AesUtil;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
+import com.qmth.themis.mq.enums.MqTagEnum;
 import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.service.MqDtoService;
 import io.swagger.annotations.*;
@@ -63,9 +63,6 @@ public class TBUserController {
     @Resource
     CacheService cacheService;
 
-    @Resource
-    DictionaryConfig dictionaryConfig;
-
     @Resource
     RedisUtil redisUtil;
 
@@ -151,15 +148,21 @@ public class TBUserController {
         String token = RandomStringUtils.randomAlphanumeric(32);
         //添加用户缓存
         redisUtil.setUser(user.getId(), user);
+        String source = null;
+        if (Objects.equals(platform.name(), Platform.win.name()) || Objects.equals(platform.name(), Platform.mac.name()) || Objects.equals(platform.name(), Platform.ios.name()) || Objects.equals(platform.name(), Platform.android.name())) {
+            source = platform.getSource().split(",")[0];
+        } else {
+            source = platform.getSource();
+        }
         //添加用户会话缓存
-        String sessionId = SessionUtil.digest(user.getId(), Math.abs(authDto.getRoleCodes().toString().hashCode()), platform.getSource());
+        String sessionId = SessionUtil.digest(user.getId(), Math.abs(authDto.getRoleCodes().toString().hashCode()), source);
 
         Date expire = SystemConstant.getExpireTime(platform);
-        TBSession tbSession = new TBSession(sessionId, String.valueOf(user.getId()), authDto.getRoleCodes().toString(), platform.getSource(), platform.name(), deviceId, ServletUtil.getRequest().getLocalAddr(), token, expire);
+        TBSession tbSession = new TBSession(sessionId, String.valueOf(user.getId()), authDto.getRoleCodes().toString(), source, platform.name(), deviceId, ServletUtil.getRequest().getLocalAddr(), token, expire);
         redisUtil.setUserSession(sessionId, tbSession);
         //mq发送消息start
-        mqDtoService.assembleSendOneWayMsg(MqTopicEnum.sessionTopic.getCode(), platform.getSource(), tbSession, MqEnum.SESSION.name(), tbSession.getId(), user.getLoginName());
-        mqDtoService.assembleSendOneWayMsg(dictionaryConfig.mqConfigDomain().getUserLogTopic(), authDto.getRoleCodes().toString().contains(RoleEnum.STUDENT.name()) ? dictionaryConfig.mqConfigDomain().getUserLogTopicStudentTag() : dictionaryConfig.mqConfigDomain().getUserLogTopicUserTag(), SystemOperationEnum.LOGIN, MqEnum.USER_LOG.name(), user.getId(), user.getLoginName());
+        mqDtoService.assembleSendOneWayMsg(MqTopicEnum.sessionTopic.getCode(), platform.name(), tbSession, MqEnum.SESSION.name(), tbSession.getId(), user.getLoginName());
+        mqDtoService.assembleSendOneWayMsg(MqTopicEnum.userLogTopic.getCode(), authDto.getRoleCodes().toString().contains(RoleEnum.STUDENT.name()) ? MqTagEnum.student.name() : MqTagEnum.user.name(), SystemOperationEnum.LOGIN, MqEnum.USER_LOG.name(), user.getId(), user.getLoginName());
         //mq发送消息end
         //测试
         String test = SignatureInfo.build(SignatureType.TOKEN, sessionId, token);
@@ -505,7 +508,7 @@ public class TBUserController {
             cacheService.removeAccountCache(tbUser.getId());
         }
         //mq发送消息start
-        mqDtoService.assembleSendOneWayMsg(dictionaryConfig.mqConfigDomain().getUserLogTopic(), authDto.getRoleCodes().toString().contains(RoleEnum.STUDENT.name()) ? dictionaryConfig.mqConfigDomain().getUserLogTopicStudentTag() : dictionaryConfig.mqConfigDomain().getUserLogTopicUserTag(), SystemOperationEnum.LOGOUT, MqEnum.USER_LOG.name(), tbUser.getId(), tbUser.getLoginName());
+        mqDtoService.assembleSendOneWayMsg(MqTopicEnum.userLogTopic.getCode(), authDto.getRoleCodes().toString().contains(RoleEnum.STUDENT.name()) ? MqTagEnum.student.name() : MqTagEnum.user.name(), SystemOperationEnum.LOGOUT, MqEnum.USER_LOG.name(), tbUser.getId(), tbUser.getLoginName());
         //mq发送消息end
         return ResultUtil.ok(JacksonUtil.parseJson(SystemConstant.SUCCESS));
     }

+ 7 - 5
themis-backend/src/main/java/com/qmth/themis/backend/api/TEExamActivityController.java

@@ -19,6 +19,8 @@ import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
 import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.enums.MqTagEnum;
+import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.service.MqDtoService;
 import io.swagger.annotations.*;
 import org.springframework.dao.DuplicateKeyException;
@@ -26,7 +28,10 @@ import org.springframework.transaction.annotation.Transactional;
 import org.springframework.web.bind.annotation.*;
 
 import javax.annotation.Resource;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
 /**
  * @Description: 考试场次 前端控制器
@@ -46,9 +51,6 @@ public class TEExamActivityController {
     @Resource
     MqDtoService mqDtoService;
 
-    @Resource
-    DictionaryConfig dictionaryConfig;
-
     @Resource
     TEExamService teExamService;
 
@@ -76,7 +78,7 @@ public class TEExamActivityController {
             Map<String, Object> prop = new HashMap<>();
             prop.put("oper", "insert");
             prop.put("exam", teExam);
-            MqDto mqDto = new MqDto(dictionaryConfig.mqConfigDomain().getQuartzTopic(), dictionaryConfig.mqConfigDomain().getQuartzTopicExamActivityTag(), JacksonUtil.parseJson(teExamActivityList), MqEnum.QUARTZ_LOG, String.valueOf(teExam.getId()), prop, tbUser.getName());
+            MqDto mqDto = new MqDto(MqTopicEnum.quartzTopic.getCode(), MqTagEnum.examActivity.name(), JacksonUtil.parseJson(teExamActivityList), MqEnum.QUARTZ_LOG, String.valueOf(teExam.getId()), prop, tbUser.getName());
             mqDtoService.assembleSendOneWayMsg(mqDto);
             //新增quartz任务,发送mq消息end
         } catch (Exception e) {

+ 5 - 7
themis-backend/src/main/java/com/qmth/themis/backend/api/TEExamController.java

@@ -24,9 +24,10 @@ import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
 import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.enums.MqTagEnum;
+import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.service.MqDtoService;
 import io.swagger.annotations.*;
-import org.springframework.beans.BeanUtils;
 import org.springframework.dao.DuplicateKeyException;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.web.bind.annotation.*;
@@ -55,9 +56,6 @@ public class TEExamController {
     @Resource
     MqDtoService mqDtoService;
 
-    @Resource
-    DictionaryConfig dictionaryConfig;
-
     @ApiOperation(value = "考试批次修改/新增接口")
     @RequestMapping(value = "/save", method = RequestMethod.POST)
     @Transactional
@@ -92,7 +90,7 @@ public class TEExamController {
                 //删除quartz任务,发送mq消息start
                 Map<String, Object> prop = new HashMap<>();
                 prop.put("oper", "delete");
-                MqDto mqDto = new MqDto(dictionaryConfig.mqConfigDomain().getQuartzTopic(), dictionaryConfig.mqConfigDomain().getQuartzTopicExamActivityTag(), JacksonUtil.parseJson(teExamActivityList), MqEnum.QUARTZ_LOG, String.valueOf(teExam.getId()), prop, tbUser.getName());
+                MqDto mqDto = new MqDto(MqTopicEnum.quartzTopic.getCode(), MqTagEnum.examActivity.name(), JacksonUtil.parseJson(teExamActivityList), MqEnum.QUARTZ_LOG, String.valueOf(teExam.getId()), prop, tbUser.getName());
                 mqDtoService.assembleSendOneWayMsg(mqDto);
                 //删除quartz任务,发送mq消息end
 
@@ -110,7 +108,7 @@ public class TEExamController {
                         Map<String, Object> prop = new HashMap<>();
                         prop.put("oper", "insert");
                         prop.put("exam", teExam);
-                        MqDto mqDto = new MqDto(dictionaryConfig.mqConfigDomain().getQuartzTopic(), dictionaryConfig.mqConfigDomain().getQuartzTopicExamActivityTag(), JacksonUtil.parseJson(Arrays.asList(teExamActivity)), MqEnum.QUARTZ_LOG, String.valueOf(teExam.getId()), prop, tbUser.getName());
+                        MqDto mqDto = new MqDto(MqTopicEnum.quartzTopic.getCode(), MqTagEnum.examActivity.name(), JacksonUtil.parseJson(Arrays.asList(teExamActivity)), MqEnum.QUARTZ_LOG, String.valueOf(teExam.getId()), prop, tbUser.getName());
                         mqDtoService.assembleSendOneWayMsg(mqDto);
                         //新增quartz任务,发送mq消息end
                     }
@@ -129,7 +127,7 @@ public class TEExamController {
                     Map<String, Object> prop = new HashMap<>();
                     prop.put("oper", "insert");
                     prop.put("exam", teExam);
-                    MqDto mqDto = new MqDto(dictionaryConfig.mqConfigDomain().getQuartzTopic(), dictionaryConfig.mqConfigDomain().getQuartzTopicExamActivityTag(), JacksonUtil.parseJson(teExamActivityList), MqEnum.QUARTZ_LOG, String.valueOf(teExam.getId()), prop, tbUser.getName());
+                    MqDto mqDto = new MqDto(MqTopicEnum.quartzTopic.getCode(), MqTagEnum.examActivity.name(), JacksonUtil.parseJson(teExamActivityList), MqEnum.QUARTZ_LOG, String.valueOf(teExam.getId()), prop, tbUser.getName());
                     mqDtoService.assembleSendOneWayMsg(mqDto);
                     //新增quartz任务,发送mq消息end
                 }

+ 3 - 4
themis-backend/src/main/java/com/qmth/themis/backend/api/TEExamPaperController.java

@@ -21,6 +21,8 @@ import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
 import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.enums.MqTagEnum;
+import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.service.MqDtoService;
 import io.swagger.annotations.*;
 import org.springframework.dao.DuplicateKeyException;
@@ -61,9 +63,6 @@ public class TEExamPaperController {
     @Resource
     MqDtoService mqDtoService;
 
-    @Resource
-    DictionaryConfig dictionaryConfig;
-
     @Resource
     SystemConfig systemConfig;
 
@@ -161,7 +160,7 @@ public class TEExamPaperController {
             transMap.put("optionShuffle", optionShuffle);
             transMap.put("audioPlayCount", audioPlayCount);
             //mq发送消息start
-            MqDto mqDto = new MqDto(dictionaryConfig.mqConfigDomain().getTaskTopic(), dictionaryConfig.mqConfigDomain().getTaskTopicExamPaperImportTag(), transMap, MqEnum.TASK_LOG, String.valueOf(tbTaskHistory.getId()), tbUser.getName());
+            MqDto mqDto = new MqDto(MqTopicEnum.taskTopic.getCode(), MqTagEnum.examPaperImport.name(), transMap, MqEnum.TASK_LOG, String.valueOf(tbTaskHistory.getId()), tbUser.getName());
             mqDtoService.assembleSendOneWayMsg(mqDto);
             //mq发送消息end
         } catch (Exception e) {

+ 3 - 4
themis-backend/src/main/java/com/qmth/themis/backend/api/TEExamStudentController.java

@@ -20,6 +20,8 @@ import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
 import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.enums.MqTagEnum;
+import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.service.MqDtoService;
 import io.swagger.annotations.*;
 import org.slf4j.Logger;
@@ -68,9 +70,6 @@ public class TEExamStudentController {
     @Resource
     MqDtoService mqDtoService;
 
-    @Resource
-    DictionaryConfig dictionaryConfig;
-
     @Resource
     SystemConfig systemConfig;
 
@@ -198,7 +197,7 @@ public class TEExamStudentController {
             }
             transMap.put("remark", tbAttachment.getRemark());
             //mq发送消息start
-            MqDto mqDto = new MqDto(dictionaryConfig.mqConfigDomain().getTaskTopic(), dictionaryConfig.mqConfigDomain().getTaskTopicExamStudentImportTag(), transMap, MqEnum.TASK_LOG, String.valueOf(tbTaskHistory.getId()), tbUser.getName());
+            MqDto mqDto = new MqDto(MqTopicEnum.taskTopic.getCode(), MqTagEnum.examStudentImport.name(), transMap, MqEnum.TASK_LOG, String.valueOf(tbTaskHistory.getId()), tbUser.getName());
             mqDtoService.assembleSendOneWayMsg(mqDto);
             //mq发送消息end
         } catch (Exception e) {

+ 0 - 11
themis-backend/src/main/java/com/qmth/themis/backend/config/DictionaryConfig.java

@@ -69,15 +69,4 @@ public class DictionaryConfig {
     public AliYunOssDomain aliYunOssDomain() {
         return new AliYunOssDomain();
     }
-
-    /**
-     * mq配置
-     *
-     * @return
-     */
-    @Bean
-    @ConfigurationProperties(prefix = "mq.config", ignoreUnknownFields = false)
-    public MqConfigDomain mqConfigDomain() {
-        return new MqConfigDomain();
-    }
 }

+ 47 - 1
themis-backend/src/main/java/com/qmth/themis/backend/start/StartRunning.java

@@ -1,10 +1,15 @@
 package com.qmth.themis.backend.start;
 
+import com.qmth.themis.business.config.SystemConfig;
+import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.mq.enums.MqGroupEnum;
 import com.qmth.themis.mq.enums.MqTagEnum;
 import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.listener.RocketMessageConsumer;
 import com.qmth.themis.mq.templete.impl.SessionConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.TaskConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.UserLogConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.WebsocketUnNormalConcurrentlyImpl;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -13,6 +18,7 @@ import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.io.File;
 
 /**
  * @Description: 服务启动时初始化运行,哪个微服务模块需要则拿此模版去用
@@ -34,7 +40,47 @@ public class StartRunning implements CommandLineRunner {
     @Override
     public void run(String... args) throws Exception {
         log.info("服务器启动时执行 start");
-        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerWebGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.web.name(), MessageModel.CLUSTERING,new SessionConcurrentlyImpl());
+        /**
+         * session mq start
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerWebGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.web.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerWinGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.win.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerMacGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.mac.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerWxappGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.wxapp.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerIosGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.ios.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerAndroidGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.android.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        /**
+         * session mq end
+         */
+
+        /**
+         * userLog mq start
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.userLogConsumerUserGroup.getCode(), MqTopicEnum.userLogTopic.getCode(), MqTagEnum.user.name(), MessageModel.CLUSTERING, new UserLogConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.userLogConsumerStudentGroup.getCode(), MqTopicEnum.userLogTopic.getCode(), MqTagEnum.student.name(), MessageModel.CLUSTERING, new UserLogConcurrentlyImpl());
+        /**
+         * userLog mq end
+         */
+
+        /**
+         * task mq start
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.taskConsumerExamStudentImportGroup.getCode(), MqTopicEnum.taskTopic.getCode(), MqTagEnum.examStudentImport.name(), MessageModel.CLUSTERING, new TaskConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.taskConsumerRoomCodeImportGroup.getCode(), MqTopicEnum.taskTopic.getCode(), MqTagEnum.roomCodeImport.name(), MessageModel.CLUSTERING, new TaskConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.taskConsumerRoomCodeExportGroup.getCode(), MqTopicEnum.taskTopic.getCode(), MqTagEnum.roomCodeExport.name(), MessageModel.CLUSTERING, new TaskConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.taskConsumerExamPaperImportGroup.getCode(), MqTopicEnum.taskTopic.getCode(), MqTagEnum.examPaperImport.name(), MessageModel.CLUSTERING, new TaskConcurrentlyImpl());
+        /**
+         * task mq end
+         */
+
+        /**
+         * websocket mq start
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.websocketUnNormalConsumerOeGroup.getCode(), MqTopicEnum.websocketUnNormalTopic.getCode(), MqTagEnum.oe.name(), MessageModel.CLUSTERING, new WebsocketUnNormalConcurrentlyImpl());
+        /**
+         * websocket mq end
+         */
+        SystemConstant.initTempFiles();
         log.info("服务器启动时执行 end");
     }
 }

+ 3 - 2
themis-backend/src/main/java/com/qmth/themis/backend/websocket/WebSocketAdminServer.java

@@ -20,6 +20,8 @@ import com.qmth.themis.common.signature.SignatureInfo;
 import com.qmth.themis.common.signature.SignatureType;
 import com.qmth.themis.common.util.Result;
 import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.enums.MqTagEnum;
+import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.service.MqDtoService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -149,14 +151,13 @@ public class WebSocketAdminServer
                 redisUtil.set(SystemConstant.WEBSOCKET_UN_NORMAL_LIST, this.sessionId, this.sessionId);
                 //发送延时mq消息start
                 MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
-                DictionaryConfig dictionaryConfig = SpringContextHolder.getBean(DictionaryConfig.class);
                 String level = "2m";
                 Integer time = SystemConstant.mqDelayLevel.get(level);
                 LocalDateTime dt = LocalDateTime.now();
                 dt = dt.plusMinutes(Long.parseLong(level.replace("m", "")));
                 tranMap.put("timeOut", time);
                 tranMap.put("mqExecTime", dt.toInstant(ZoneOffset.of("+8")).toEpochMilli());
-                MqDto mqDto = new MqDto(dictionaryConfig.mqConfigDomain().getWebsocketUnNormalTopic(), dictionaryConfig.mqConfigDomain().getWebsocketUnNormalTopicOeTag(), SystemOperationEnum.OE_NET_UN_NORMAL, MqEnum.WEBSOCKET_UN_NORMAL_LOG, this.sessionId, this.tranMap, this.sessionId);
+                MqDto mqDto = new MqDto(MqTopicEnum.websocketUnNormalTopic.getCode(), MqTagEnum.oe.name(), SystemOperationEnum.OE_NET_UN_NORMAL, MqEnum.WEBSOCKET_UN_NORMAL_LOG, this.sessionId, this.tranMap, this.sessionId);
                 mqDtoService.assembleSendAsyncDelayMsg(mqDto);
                 //发送延时mq消息end
             }

+ 1 - 64
themis-backend/src/main/resources/application.properties

@@ -16,7 +16,7 @@ db.host=localhost
 db.port=3306
 db.name=themis
 db.username=root
-db.password=root
+db.password=123456789
 #redis\u6570\u636E\u6E90\u914D\u7F6E
 redis.host=${db.host}
 redis.database=15
@@ -155,69 +155,6 @@ rocketmq.producer.enable-msg-trace=true
 #\u81EA\u5B9A\u4E49\u7684\u6D88\u606F\u8F68\u8FF9\u4E3B\u9898
 rocketmq.producer.customized-trace-topic=my-trace-topic
 
-mq.config.server=themis
-#session_topic\u76D1\u542C
-mq.config.sessionConsumerGroup=${mq.config.server}-group-session
-
-mq.config.sessionTopicWxappMonitorTag=wxapp_monitor
-mq.config.sessionConsumerWxappMonitorGroup=${mq.config.sessionConsumerGroup}-${mq.config.sessionTopicWxappMonitorTag}
-
-mq.config.sessionTopicWxappAnswerTag=wxapp_answer
-mq.config.sessionConsumerWxappAnswerGroup=${mq.config.sessionConsumerGroup}-${mq.config.sessionTopicWxappAnswerTag}
-
-mq.config.sessionTopicPcTag=pc
-mq.config.sessionConsumerPcGroup=${mq.config.sessionConsumerGroup}-${mq.config.sessionTopicPcTag}
-
-#user_login\u76D1\u542C
-mq.config.userLogTopic=${mq.config.server}-topic-userLog
-mq.config.userLogConsumerGroup=${mq.config.server}-group-userLog
-
-mq.config.userLogTopicUserTag=user
-mq.config.userLogConsumerUserGroup=${mq.config.userLogConsumerGroup}-${mq.config.userLogTopicUserTag}
-
-mq.config.userLogTopicStudentTag=student
-mq.config.userLogConsumerStudentGroup=${mq.config.userLogConsumerGroup}-${mq.config.userLogTopicStudentTag}
-
-#task_topic\u76D1\u542C
-mq.config.taskTopic=${mq.config.server}-topic-task
-mq.config.taskConsumerGroup=${mq.config.server}-group-task
-
-#\u8003\u751F\u5BFC\u5165
-mq.config.taskTopicExamStudentImportTag=examStudentImport
-mq.config.taskConsumerExamStudentImportGroup=${mq.config.taskConsumerGroup}-${mq.config.taskTopicExamStudentImportTag}
-
-#\u8003\u573A\u5BFC\u5165
-mq.config.taskTopicRoomCodeImportTag=roomCodeImport
-mq.config.taskConsumerRoomCodeImportGroup=${mq.config.taskConsumerGroup}-${mq.config.taskTopicRoomCodeImportTag}
-
-#\u8003\u573A\u5BFC\u51FA
-mq.config.taskTopicRoomCodeExportTag=roomCodeExport
-mq.config.taskConsumerRoomCodeExportGroup=${mq.config.taskConsumerGroup}-${mq.config.taskTopicRoomCodeExportTag}
-
-#\u8BD5\u5377\u5BFC\u5165
-mq.config.taskTopicExamPaperImportTag=examPaperImport
-mq.config.taskConsumerExamPaperImportGroup=${mq.config.taskConsumerGroup}-${mq.config.taskTopicExamPaperImportTag}
-
-#websocket\u8D85\u65F6\u9000\u51FA\u76D1\u542C
-mq.config.websocketUnNormalTopic=${mq.config.server}-topic-websocketUnNormal
-mq.config.websocketUnNormalConsumerGroup=${mq.config.server}-group-websocketUnNormal
-
-#oe\u8003\u751F\u7AEF
-mq.config.websocketUnNormalTopicOeTag=oe
-mq.config.websocketUnNormalConsumerOeGroup=${mq.config.websocketUnNormalConsumerGroup}-${mq.config.websocketUnNormalTopicOeTag}
-
-#quartz
-mq.config.quartzTopic=${mq.config.server}-topic-quartz
-mq.config.quartzConsumerGroup=${mq.config.server}-group-quartz
-
-#quartz-\u8003\u8BD5\u573A\u6B21task
-mq.config.quartzTopicExamActivityTag=examActivity
-mq.config.quartzConsumerExamActivityGroup=${mq.config.quartzConsumerGroup}-${mq.config.quartzTopicExamActivityTag}
-
-#dlq\u6B7B\u4FE1\u961F\u5217
-#mq.config.sessionConsumerGroupDlq=${mq.config.sessionConsumerGroup}-dlq
-#mq.config.sessionTopicDlq=%DLQ%${mq.config.sessionConsumerGroup}
-
 #api\u524D\u7F00
 prefix.url.admin=api/admin
 

+ 33 - 0
themis-business/src/main/java/com/qmth/themis/business/config/MultipartConfig.java

@@ -0,0 +1,33 @@
+package com.qmth.themis.business.config;
+
+/**
+* @Description: Multipart 附件上传配置
+* @Param:
+* @return:
+* @Author: wangliang
+* @Date: 2020/7/29
+*/
+import com.qmth.themis.common.contanst.Constants;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.multipart.MultipartResolver;
+import org.springframework.web.multipart.commons.CommonsMultipartResolver;
+
+@Configuration
+public class MultipartConfig {
+
+    /**
+     * 附件上传配置
+     *
+     * @return
+     */
+    @Bean
+    public MultipartResolver multipartResolver() {
+        CommonsMultipartResolver resolver = new CommonsMultipartResolver();
+        resolver.setDefaultEncoding(Constants.CHARSET_NAME);
+        resolver.setResolveLazily(true);// resolveLazily属性启用是为了推迟文件解析,以在在UploadAction中捕获文件大小异常
+        resolver.setMaxInMemorySize(2);// 低于此值,只保留在内存里,超过此阈值,生成硬盘上的临时文件。
+        resolver.setMaxUploadSize(200 * 1024 * 1024);// 最大200M
+        return resolver;
+    }
+}

+ 32 - 11
themis-business/src/main/java/com/qmth/themis/business/constant/SystemConstant.java

@@ -1,9 +1,11 @@
 package com.qmth.themis.business.constant;
 
+import com.qmth.themis.business.config.SystemConfig;
 import com.qmth.themis.common.contanst.Constants;
 import com.qmth.themis.common.enums.Platform;
 import com.qmth.themis.common.enums.Source;
 
+import java.io.File;
 import java.util.*;
 
 /**
@@ -74,9 +76,11 @@ public class SystemConstant {
      * session过期时间
      */
     public static final int WEB_SESSION_EXPIRE = 1;//过期时间1天
-    public static final int PC_SESSION_EXPIRE = 1;//过期时间1天
-    public static final int WXAPP_VIDEO_SESSION_EXPIRE = 30;//过期时间30天
-    public static final int WXAPP_ANSWER_SESSION_EXPIRE = 30;//过期时间30天
+    public static final int WIN_SESSION_EXPIRE = 1;//过期时间1天
+    public static final int MAC_SESSION_EXPIRE = 1;//过期时间1天
+    public static final int WXAPP_SESSION_EXPIRE = 30;//过期时间30天
+    public static final int IOS_SESSION_EXPIRE = 30;//过期时间30天
+    public static final int ANDROID_SESSION_EXPIRE = 30;//过期时间30天
     /**
      * redis分布式锁
      */
@@ -129,7 +133,7 @@ public class SystemConstant {
     public static final String WEBSOCKET_UN_NORMAL_LIST = "websocket:oe:unnormal:list";
     public static final String GET = "get";
     public static final long WEBSOCKET_MAX_TIME_OUT = 3 * 60 * 1000;
-//        public static final long WEBSOCKET_MAX_TIME_OUT = 10 * 1000;
+    //        public static final long WEBSOCKET_MAX_TIME_OUT = 10 * 1000;
     public static final String ACK_MESSAGE = "ackMessage";
     /**
      * 缓存配置
@@ -157,14 +161,22 @@ public class SystemConstant {
         Date now = new Date();
         Calendar calendar = Calendar.getInstance();
         calendar.setTime(now);
-        if (Objects.equals(platform.getSource(), Source.web.name())) {
+        if (platform.getSource().contains(Source.admin_web.name())) {
             calendar.add(Calendar.DAY_OF_YEAR, SystemConstant.WEB_SESSION_EXPIRE);
-        } else if (Objects.equals(platform.getSource(), Source.wxapp_answer.name())) {
-            calendar.add(Calendar.DAY_OF_YEAR, SystemConstant.WXAPP_ANSWER_SESSION_EXPIRE);
-        } else if (Objects.equals(platform.getSource(), Source.wxapp_monitor.name())) {
-            calendar.add(Calendar.DAY_OF_YEAR, SystemConstant.WXAPP_VIDEO_SESSION_EXPIRE);
-        } else if (Objects.equals(platform.getSource(), Source.pc.name())) {
-            calendar.add(Calendar.DAY_OF_YEAR, SystemConstant.PC_SESSION_EXPIRE);
+        } else if (Objects.equals(platform.name(), Platform.win.name()) &&
+                (platform.getSource().contains(Source.admin_client.name()) || platform.getSource().contains(Source.oe_client.name()))) {
+            calendar.add(Calendar.DAY_OF_YEAR, SystemConstant.WIN_SESSION_EXPIRE);
+        } else if (Objects.equals(platform.name(), Platform.mac.name()) &&
+                (platform.getSource().contains(Source.admin_client.name()) || platform.getSource().contains(Source.oe_client.name()))) {
+            calendar.add(Calendar.DAY_OF_YEAR, SystemConstant.MAC_SESSION_EXPIRE);
+        } else if (platform.getSource().contains(Source.oe_answer.name())) {
+            calendar.add(Calendar.DAY_OF_YEAR, SystemConstant.WXAPP_SESSION_EXPIRE);
+        } else if (Objects.equals(platform.name(), Platform.ios.name()) &&
+                (platform.getSource().contains(Source.oe_answer.name()) || platform.getSource().contains(Source.mobile_monitor_first.name()) || platform.getSource().contains(Source.mobile_monitor_second.name()))) {
+            calendar.add(Calendar.DAY_OF_YEAR, SystemConstant.IOS_SESSION_EXPIRE);
+        } else if (Objects.equals(platform.name(), Platform.android.name()) &&
+                (platform.getSource().contains(Source.oe_answer.name()) || platform.getSource().contains(Source.mobile_monitor_first.name()) || platform.getSource().contains(Source.mobile_monitor_second.name()))) {
+            calendar.add(Calendar.DAY_OF_YEAR, SystemConstant.ANDROID_SESSION_EXPIRE);
         }
         return calendar.getTime();
     }
@@ -211,4 +223,13 @@ public class SystemConstant {
         }
         return map;
     }
+
+    public static void initTempFiles() {
+        SystemConfig systemConfig = SpringContextHolder.getBean(SystemConfig.class);
+        String tempDir = systemConfig.getProperty("sys.config.tempDataDir");
+        File dir = new File(tempDir);
+        if (!dir.exists()) {
+            dir.mkdirs();
+        }
+    }
 }

+ 0 - 351
themis-business/src/main/java/com/qmth/themis/business/domain/MqConfigDomain.java

@@ -1,351 +0,0 @@
-package com.qmth.themis.business.domain;
-
-import java.io.Serializable;
-
-/**
- * @Description: mq配置
- * @Param:
- * @return:
- * @Author: wangliang
- * @Date: 2020/7/1
- */
-public class MqConfigDomain implements Serializable {
-
-    private String server;
-    private String sessionConsumerGroup;
-    //    private String sessionConsumerGroupDlq;
-//    private String sessionTopicDlq;
-    /**
-     * taskTopicExamStudentTag
-     * session topic
-     */
-    private String sessionTopicWxappMonitorTag;
-    private String sessionTopicWxappAnswerTag;
-    private String sessionTopicPcTag;
-    private String sessionConsumerWebGroup;
-    private String sessionConsumerWxappMonitorGroup;
-    private String sessionConsumerWxappAnswerGroup;
-    private String sessionConsumerPcGroup;
-
-    /**
-     * user topic
-     */
-    private String userLogTopic;
-    private String userLogConsumerGroup;
-    private String userLogTopicUserTag;
-    private String userLogConsumerUserGroup;
-    private String userLogTopicStudentTag;
-    private String userLogConsumerStudentGroup;
-
-    /**
-     * task group
-     */
-    private String taskTopic;
-    private String taskConsumerGroup;
-    private String taskTopicExamStudentImportTag;
-    private String taskConsumerExamStudentImportGroup;
-    private String taskTopicRoomCodeImportTag;
-    private String taskConsumerRoomCodeImportGroup;
-    private String taskTopicRoomCodeExportTag;
-    private String taskConsumerRoomCodeExportGroup;
-    private String taskTopicExamPaperImportTag;
-    private String taskConsumerExamPaperImportGroup;
-
-    /**
-     * websocket group
-     */
-    private String websocketUnNormalTopic;
-    private String websocketUnNormalConsumerGroup;
-    private String websocketUnNormalTopicOeTag;
-    private String websocketUnNormalConsumerOeGroup;
-
-    /**
-     * quartz
-     */
-    private String quartzTopic;
-    private String quartzConsumerGroup;
-    private String quartzTopicExamActivityTag;
-    private String quartzConsumerExamActivityGroup;
-
-    public String getQuartzTopic() {
-        return quartzTopic;
-    }
-
-    public void setQuartzTopic(String quartzTopic) {
-        this.quartzTopic = quartzTopic;
-    }
-
-    public String getQuartzConsumerGroup() {
-        return quartzConsumerGroup;
-    }
-
-    public void setQuartzConsumerGroup(String quartzConsumerGroup) {
-        this.quartzConsumerGroup = quartzConsumerGroup;
-    }
-
-    public String getQuartzTopicExamActivityTag() {
-        return quartzTopicExamActivityTag;
-    }
-
-    public void setQuartzTopicExamActivityTag(String quartzTopicExamActivityTag) {
-        this.quartzTopicExamActivityTag = quartzTopicExamActivityTag;
-    }
-
-    public String getQuartzConsumerExamActivityGroup() {
-        return quartzConsumerExamActivityGroup;
-    }
-
-    public void setQuartzConsumerExamActivityGroup(String quartzConsumerExamActivityGroup) {
-        this.quartzConsumerExamActivityGroup = quartzConsumerExamActivityGroup;
-    }
-
-    public String getWebsocketUnNormalTopic() {
-        return websocketUnNormalTopic;
-    }
-
-    public void setWebsocketUnNormalTopic(String websocketUnNormalTopic) {
-        this.websocketUnNormalTopic = websocketUnNormalTopic;
-    }
-
-    public String getWebsocketUnNormalConsumerGroup() {
-        return websocketUnNormalConsumerGroup;
-    }
-
-    public void setWebsocketUnNormalConsumerGroup(String websocketUnNormalConsumerGroup) {
-        this.websocketUnNormalConsumerGroup = websocketUnNormalConsumerGroup;
-    }
-
-    public String getWebsocketUnNormalTopicOeTag() {
-        return websocketUnNormalTopicOeTag;
-    }
-
-    public void setWebsocketUnNormalTopicOeTag(String websocketUnNormalTopicOeTag) {
-        this.websocketUnNormalTopicOeTag = websocketUnNormalTopicOeTag;
-    }
-
-    public String getWebsocketUnNormalConsumerOeGroup() {
-        return websocketUnNormalConsumerOeGroup;
-    }
-
-    public void setWebsocketUnNormalConsumerOeGroup(String websocketUnNormalConsumerOeGroup) {
-        this.websocketUnNormalConsumerOeGroup = websocketUnNormalConsumerOeGroup;
-    }
-
-    public String getTaskTopicRoomCodeExportTag() {
-        return taskTopicRoomCodeExportTag;
-    }
-
-    public void setTaskTopicRoomCodeExportTag(String taskTopicRoomCodeExportTag) {
-        this.taskTopicRoomCodeExportTag = taskTopicRoomCodeExportTag;
-    }
-
-    public String getTaskConsumerRoomCodeExportGroup() {
-        return taskConsumerRoomCodeExportGroup;
-    }
-
-    public void setTaskConsumerRoomCodeExportGroup(String taskConsumerRoomCodeExportGroup) {
-        this.taskConsumerRoomCodeExportGroup = taskConsumerRoomCodeExportGroup;
-    }
-
-    public String getTaskTopicExamStudentImportTag() {
-        return taskTopicExamStudentImportTag;
-    }
-
-    public void setTaskTopicExamStudentImportTag(String taskTopicExamStudentImportTag) {
-        this.taskTopicExamStudentImportTag = taskTopicExamStudentImportTag;
-    }
-
-    public String getTaskConsumerExamStudentImportGroup() {
-        return taskConsumerExamStudentImportGroup;
-    }
-
-    public void setTaskConsumerExamStudentImportGroup(String taskConsumerExamStudentImportGroup) {
-        this.taskConsumerExamStudentImportGroup = taskConsumerExamStudentImportGroup;
-    }
-
-    public String getTaskTopicRoomCodeImportTag() {
-        return taskTopicRoomCodeImportTag;
-    }
-
-    public void setTaskTopicRoomCodeImportTag(String taskTopicRoomCodeImportTag) {
-        this.taskTopicRoomCodeImportTag = taskTopicRoomCodeImportTag;
-    }
-
-    public String getTaskConsumerRoomCodeImportGroup() {
-        return taskConsumerRoomCodeImportGroup;
-    }
-
-    public void setTaskConsumerRoomCodeImportGroup(String taskConsumerRoomCodeImportGroup) {
-        this.taskConsumerRoomCodeImportGroup = taskConsumerRoomCodeImportGroup;
-    }
-
-    public String getTaskTopic() {
-        return taskTopic;
-    }
-
-    public void setTaskTopic(String taskTopic) {
-        this.taskTopic = taskTopic;
-    }
-
-    public String getTaskConsumerGroup() {
-        return taskConsumerGroup;
-    }
-
-    public void setTaskConsumerGroup(String taskConsumerGroup) {
-        this.taskConsumerGroup = taskConsumerGroup;
-    }
-
-    public String getSessionTopicWxappAnswerTag() {
-        return sessionTopicWxappAnswerTag;
-    }
-
-    public void setSessionTopicWxappAnswerTag(String sessionTopicWxappAnswerTag) {
-        this.sessionTopicWxappAnswerTag = sessionTopicWxappAnswerTag;
-    }
-
-    public String getSessionTopicWxappMonitorTag() {
-        return sessionTopicWxappMonitorTag;
-    }
-
-    public void setSessionTopicWxappMonitorTag(String sessionTopicWxappMonitorTag) {
-        this.sessionTopicWxappMonitorTag = sessionTopicWxappMonitorTag;
-    }
-
-    public String getSessionConsumerWxappMonitorGroup() {
-        return sessionConsumerWxappMonitorGroup;
-    }
-
-    public void setSessionConsumerWxappMonitorGroup(String sessionConsumerWxappMonitorGroup) {
-        this.sessionConsumerWxappMonitorGroup = sessionConsumerWxappMonitorGroup;
-    }
-
-    public String getSessionConsumerWxappAnswerGroup() {
-        return sessionConsumerWxappAnswerGroup;
-    }
-
-    public void setSessionConsumerWxappAnswerGroup(String sessionConsumerWxappAnswerGroup) {
-        this.sessionConsumerWxappAnswerGroup = sessionConsumerWxappAnswerGroup;
-    }
-
-    public String getSessionConsumerGroup() {
-        return sessionConsumerGroup;
-    }
-
-    public void setSessionConsumerGroup(String sessionConsumerGroup) {
-        this.sessionConsumerGroup = sessionConsumerGroup;
-    }
-
-//    public String getSessionConsumerGroupDlq() {
-//        return sessionConsumerGroupDlq;
-//    }
-//
-//    public void setSessionConsumerGroupDlq(String sessionConsumerGroupDlq) {
-//        this.sessionConsumerGroupDlq = sessionConsumerGroupDlq;
-//    }
-//
-//    public String getSessionTopicDlq() {
-//        return sessionTopicDlq;
-//    }
-//
-//    public void setSessionTopicDlq(String sessionTopicDlq) {
-//        this.sessionTopicDlq = sessionTopicDlq;
-//    }
-
-    public String getSessionTopicPcTag() {
-        return sessionTopicPcTag;
-    }
-
-    public void setSessionTopicPcTag(String sessionTopicPcTag) {
-        this.sessionTopicPcTag = sessionTopicPcTag;
-    }
-
-    public String getSessionConsumerWebGroup() {
-        return sessionConsumerWebGroup;
-    }
-
-    public void setSessionConsumerWebGroup(String sessionConsumerWebGroup) {
-        this.sessionConsumerWebGroup = sessionConsumerWebGroup;
-    }
-
-    public String getSessionConsumerPcGroup() {
-        return sessionConsumerPcGroup;
-    }
-
-    public void setSessionConsumerPcGroup(String sessionConsumerPcGroup) {
-        this.sessionConsumerPcGroup = sessionConsumerPcGroup;
-    }
-
-    public String getServer() {
-        return server;
-    }
-
-    public void setServer(String server) {
-        this.server = server;
-    }
-
-    public String getUserLogTopic() {
-        return userLogTopic;
-    }
-
-    public void setUserLogTopic(String userLogTopic) {
-        this.userLogTopic = userLogTopic;
-    }
-
-    public String getUserLogConsumerGroup() {
-        return userLogConsumerGroup;
-    }
-
-    public void setUserLogConsumerGroup(String userLogConsumerGroup) {
-        this.userLogConsumerGroup = userLogConsumerGroup;
-    }
-
-    public String getUserLogTopicUserTag() {
-        return userLogTopicUserTag;
-    }
-
-    public void setUserLogTopicUserTag(String userLogTopicUserTag) {
-        this.userLogTopicUserTag = userLogTopicUserTag;
-    }
-
-    public String getUserLogConsumerUserGroup() {
-        return userLogConsumerUserGroup;
-    }
-
-    public void setUserLogConsumerUserGroup(String userLogConsumerUserGroup) {
-        this.userLogConsumerUserGroup = userLogConsumerUserGroup;
-    }
-
-    public String getUserLogTopicStudentTag() {
-        return userLogTopicStudentTag;
-    }
-
-    public void setUserLogTopicStudentTag(String userLogTopicStudentTag) {
-        this.userLogTopicStudentTag = userLogTopicStudentTag;
-    }
-
-    public String getUserLogConsumerStudentGroup() {
-        return userLogConsumerStudentGroup;
-    }
-
-    public void setUserLogConsumerStudentGroup(String userLogConsumerStudentGroup) {
-        this.userLogConsumerStudentGroup = userLogConsumerStudentGroup;
-    }
-
-	public String getTaskTopicExamPaperImportTag() {
-		return taskTopicExamPaperImportTag;
-	}
-
-	public void setTaskTopicExamPaperImportTag(String taskTopicExamPaperImportTag) {
-		this.taskTopicExamPaperImportTag = taskTopicExamPaperImportTag;
-	}
-
-	public String getTaskConsumerExamPaperImportGroup() {
-		return taskConsumerExamPaperImportGroup;
-	}
-
-	public void setTaskConsumerExamPaperImportGroup(String taskConsumerExamPaperImportGroup) {
-		this.taskConsumerExamPaperImportGroup = taskConsumerExamPaperImportGroup;
-	}
-    
-    
-}

+ 6 - 1
themis-common/src/main/java/com/qmth/themis/common/enums/Platform.java

@@ -2,7 +2,12 @@ package com.qmth.themis.common.enums;
 
 public enum Platform {
 
-    web("web"), wap("web"), wxapp_monitor("wxapp_monitor"), wxapp_answer("wxapp_answer"), win("pc");
+    web("admin_web"),
+    win("admin_client,oe_client"),
+    mac("admin_client,oe_client"),
+    wxapp("oe_answer"),
+    ios("oe_answer,mobile_monitor_first,mobile_monitor_second"),
+    android("oe_answer,mobile_monitor_first,mobile_monitor_second");
 
     public static Platform findByName(String name) {
         if (name == null) {

+ 1 - 1
themis-common/src/main/java/com/qmth/themis/common/enums/Source.java

@@ -2,7 +2,7 @@ package com.qmth.themis.common.enums;
 
 public enum Source {
 
-    web, wxapp_monitor, wxapp_answer, pc;
+    admin_web, admin_client, oe_client, oe_answer,mobile_monitor_first,mobile_monitor_second;
 
     public static Source findByName(String name) {
         if (name == null) {

+ 0 - 25
themis-exam/src/main/java/com/qmth/themis/exam/ThemisExamApplication.java

@@ -30,30 +30,5 @@ public class ThemisExamApplication {
 
     public static void main(String[] args) {
         SpringApplication.run(ThemisExamApplication.class, args);
-        init();
     }
-
-    /**
-     * 附件上传配置
-     *
-     * @return
-     */
-    @Bean
-    public MultipartResolver multipartResolver() {
-        CommonsMultipartResolver resolver = new CommonsMultipartResolver();
-        resolver.setDefaultEncoding(Constants.CHARSET_NAME);
-        resolver.setResolveLazily(true);//resolveLazily属性启用是为了推迟文件解析,以在在UploadAction中捕获文件大小异常
-        resolver.setMaxInMemorySize(2);//低于此值,只保留在内存里,超过此阈值,生成硬盘上的临时文件。
-        resolver.setMaxUploadSize(200 * 1024 * 1024);//最大200M
-        return resolver;
-    }
-    
-	private static void init() {
-		SystemConfig conf=SpringContextHolder.getBean(SystemConfig.class);
-		String tempDir = conf.getProperty("sys.config.tempDataDir");
-		File dir=new File(tempDir);
-		if(!dir.exists()) {
-			dir.mkdirs();
-		}
-	}
 }

+ 14 - 8
themis-exam/src/main/java/com/qmth/themis/exam/api/TEStudentController.java

@@ -25,6 +25,7 @@ import com.qmth.themis.common.util.Result;
 import com.qmth.themis.common.util.ResultUtil;
 import com.qmth.themis.exam.config.DictionaryConfig;
 import com.qmth.themis.exam.util.ServletUtil;
+import com.qmth.themis.mq.enums.MqTagEnum;
 import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.service.MqDtoService;
 import io.swagger.annotations.*;
@@ -56,9 +57,6 @@ public class TEStudentController {
     @Resource
     CacheService cacheService;
 
-    @Resource
-    DictionaryConfig dictionaryConfig;
-
     @Resource
     RedisUtil redisUtil;
 
@@ -139,15 +137,23 @@ public class TEStudentController {
         String token = RandomStringUtils.randomAlphanumeric(32);
         //添加用户缓存
         redisUtil.setStudent(teStudent.getId(), teStudent);
+        String source = null;
+        if (Objects.equals(platform.name(), Platform.win.name()) || Objects.equals(platform.name(), Platform.mac.name())) {
+            source = platform.getSource().split(",")[1];
+        } else if (Objects.equals(platform.name(), Platform.wxapp.name()) || Objects.equals(platform.name(), Platform.web.name())) {
+            source = platform.getSource();
+        } else if (Objects.equals(platform.name(), Platform.ios.name()) || Objects.equals(platform.name(), Platform.android.name())) {
+            source = platform.getSource().split(",")[2];
+        }
         //添加用户会话缓存
-        String sessionId = SessionUtil.digest(teStudent.getIdentity(), Math.abs(authDto.getRoleCodes().toString().hashCode()), platform.getSource());
+        String sessionId = SessionUtil.digest(teStudent.getIdentity(), Math.abs(authDto.getRoleCodes().toString().hashCode()), source);
 
         Date expire = SystemConstant.getExpireTime(platform);
-        TBSession tbSession = new TBSession(sessionId, String.valueOf(teStudent.getId()), authDto.getRoleCodes().toString(), platform.getSource(), platform.name(), deviceId, ServletUtil.getRequest().getLocalAddr(), token, expire);
+        TBSession tbSession = new TBSession(sessionId, String.valueOf(teStudent.getId()), authDto.getRoleCodes().toString(), source, platform.name(), deviceId, ServletUtil.getRequest().getLocalAddr(), token, expire);
         redisUtil.setUserSession(sessionId, tbSession);
         //mq发送消息start
-        mqDtoService.assembleSendOneWayMsg(MqTopicEnum.sessionTopic.getCode(), platform.getSource(), tbSession, MqEnum.SESSION.name(), tbSession.getId(), teStudent.getIdentity());
-        mqDtoService.assembleSendOneWayMsg(dictionaryConfig.mqConfigDomain().getUserLogTopic(), authDto.getRoleCodes().toString().contains(RoleEnum.STUDENT.name()) ? dictionaryConfig.mqConfigDomain().getUserLogTopicStudentTag() : dictionaryConfig.mqConfigDomain().getUserLogTopicUserTag(), SystemOperationEnum.LOGIN, MqEnum.EXAM_STUDENT_LOG.name(), teStudent.getId(), teStudent.getIdentity());
+        mqDtoService.assembleSendOneWayMsg(MqTopicEnum.sessionTopic.getCode(), platform.name(), tbSession, MqEnum.SESSION.name(), tbSession.getId(), teStudent.getIdentity());
+        mqDtoService.assembleSendOneWayMsg(MqTopicEnum.userLogTopic.getCode(), authDto.getRoleCodes().toString().contains(RoleEnum.STUDENT.name()) ? MqTagEnum.student.name() : MqTagEnum.user.name(), SystemOperationEnum.LOGIN, MqEnum.EXAM_STUDENT_LOG.name(), teStudent.getId(), teStudent.getIdentity());
         //mq发送消息end
         //测试
         String test = SignatureInfo.build(SignatureType.TOKEN, sessionId, token);
@@ -197,7 +203,7 @@ public class TEStudentController {
             cacheService.removeStudentCache(teStudent.getId());
         }
         //mq发送消息start
-        mqDtoService.assembleSendOneWayMsg(dictionaryConfig.mqConfigDomain().getUserLogTopic(), authDto.getRoleCodes().toString().contains(RoleEnum.STUDENT.name()) ? dictionaryConfig.mqConfigDomain().getUserLogTopicStudentTag() : dictionaryConfig.mqConfigDomain().getUserLogTopicUserTag(), SystemOperationEnum.LOGOUT, MqEnum.EXAM_STUDENT_LOG.name(), teStudent.getId(), teStudent.getIdentity());
+        mqDtoService.assembleSendOneWayMsg(MqTopicEnum.userLogTopic.getCode(), authDto.getRoleCodes().toString().contains(RoleEnum.STUDENT.name()) ? MqTagEnum.student.name() : MqTagEnum.user.name(), SystemOperationEnum.LOGOUT, MqEnum.EXAM_STUDENT_LOG.name(), teStudent.getId(), teStudent.getIdentity());
         //mq发送消息end
         return ResultUtil.ok(JacksonUtil.parseJson(SystemConstant.SUCCESS));
     }

+ 0 - 11
themis-exam/src/main/java/com/qmth/themis/exam/config/DictionaryConfig.java

@@ -69,15 +69,4 @@ public class DictionaryConfig {
     public AliYunOssDomain aliYunOssDomain() {
         return new AliYunOssDomain();
     }
-
-    /**
-     * mq配置
-     *
-     * @return
-     */
-    @Bean
-    @ConfigurationProperties(prefix = "mq.config", ignoreUnknownFields = false)
-    public MqConfigDomain mqConfigDomain() {
-        return new MqConfigDomain();
-    }
 }

+ 84 - 0
themis-exam/src/main/java/com/qmth/themis/exam/start/StartRunning.java

@@ -0,0 +1,84 @@
+package com.qmth.themis.exam.start;
+
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.mq.enums.MqGroupEnum;
+import com.qmth.themis.mq.enums.MqTagEnum;
+import com.qmth.themis.mq.enums.MqTopicEnum;
+import com.qmth.themis.mq.listener.RocketMessageConsumer;
+import com.qmth.themis.mq.templete.impl.SessionConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.TaskConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.UserLogConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.WebsocketUnNormalConcurrentlyImpl;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+/**
+ * @Description: 服务启动时初始化运行,哪个微服务模块需要则拿此模版去用
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/3
+ */
+@Component
+public class StartRunning implements CommandLineRunner {
+    private final static Logger log = LoggerFactory.getLogger(StartRunning.class);
+
+    @Resource
+    RocketMessageConsumer rocketMessageConsumer;
+
+    @Value("rocketmq.name-server")
+    String nameServer;
+
+    @Override
+    public void run(String... args) throws Exception {
+        log.info("服务器启动时执行 start");
+        /**
+         * session mq start
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerWebGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.web.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerWinGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.win.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerMacGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.mac.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerWxappGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.wxapp.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerIosGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.ios.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerAndroidGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.android.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        /**
+         * session mq end
+         */
+
+        /**
+         * userLog mq start
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.userLogConsumerUserGroup.getCode(), MqTopicEnum.userLogTopic.getCode(), MqTagEnum.user.name(), MessageModel.CLUSTERING, new UserLogConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.userLogConsumerStudentGroup.getCode(), MqTopicEnum.userLogTopic.getCode(), MqTagEnum.student.name(), MessageModel.CLUSTERING, new UserLogConcurrentlyImpl());
+        /**
+         * userLog mq end
+         */
+
+        /**
+         * task mq start
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.taskConsumerExamStudentImportGroup.getCode(), MqTopicEnum.taskTopic.getCode(), MqTagEnum.examStudentImport.name(), MessageModel.CLUSTERING, new TaskConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.taskConsumerRoomCodeImportGroup.getCode(), MqTopicEnum.taskTopic.getCode(), MqTagEnum.roomCodeImport.name(), MessageModel.CLUSTERING, new TaskConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.taskConsumerRoomCodeExportGroup.getCode(), MqTopicEnum.taskTopic.getCode(), MqTagEnum.roomCodeExport.name(), MessageModel.CLUSTERING, new TaskConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.taskConsumerExamPaperImportGroup.getCode(), MqTopicEnum.taskTopic.getCode(), MqTagEnum.examPaperImport.name(), MessageModel.CLUSTERING, new TaskConcurrentlyImpl());
+        /**
+         * task mq end
+         */
+
+        /**
+         * websocket mq start
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.websocketUnNormalConsumerOeGroup.getCode(), MqTopicEnum.websocketUnNormalTopic.getCode(), MqTagEnum.oe.name(), MessageModel.CLUSTERING, new WebsocketUnNormalConcurrentlyImpl());
+        /**
+         * websocket mq end
+         */
+        SystemConstant.initTempFiles();
+        log.info("服务器启动时执行 end");
+    }
+}

+ 3 - 4
themis-exam/src/main/java/com/qmth/themis/exam/websocket/WebSocketOeServer.java

@@ -16,10 +16,10 @@ import com.qmth.themis.common.enums.ExceptionResultEnum;
 import com.qmth.themis.common.exception.BusinessException;
 import com.qmth.themis.common.signature.SignatureInfo;
 import com.qmth.themis.common.signature.SignatureType;
-import com.qmth.themis.common.util.Result;
-import com.qmth.themis.exam.config.DictionaryConfig;
 import com.qmth.themis.exam.websocketTemplete.WebSocketOeMessageTemplete;
 import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.enums.MqTagEnum;
+import com.qmth.themis.mq.enums.MqTopicEnum;
 import com.qmth.themis.mq.service.MqDtoService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -152,14 +152,13 @@ public class WebSocketOeServer
                 redisUtil.set(SystemConstant.WEBSOCKET_UN_NORMAL_LIST, this.sessionId, this.sessionId);
                 //发送延时mq消息start
                 MqDtoService mqDtoService = SpringContextHolder.getBean(MqDtoService.class);
-                DictionaryConfig dictionaryConfig = SpringContextHolder.getBean(DictionaryConfig.class);
                 String level = "2m";
                 Integer time = SystemConstant.mqDelayLevel.get(level);
                 LocalDateTime dt = LocalDateTime.now();
                 dt = dt.plusMinutes(Long.parseLong(level.replace("m", "")));
                 tranMap.put("timeOut", time);
                 tranMap.put("mqExecTime", dt.toInstant(ZoneOffset.of("+8")).toEpochMilli());
-                MqDto mqDto = new MqDto(dictionaryConfig.mqConfigDomain().getWebsocketUnNormalTopic(), dictionaryConfig.mqConfigDomain().getWebsocketUnNormalTopicOeTag(), SystemOperationEnum.OE_NET_UN_NORMAL, MqEnum.WEBSOCKET_UN_NORMAL_LOG, String.valueOf(this.recordId), this.tranMap, this.sessionId);
+                MqDto mqDto = new MqDto(MqTopicEnum.websocketUnNormalTopic.getCode(), MqTagEnum.oe.name(), SystemOperationEnum.OE_NET_UN_NORMAL, MqEnum.WEBSOCKET_UN_NORMAL_LOG, String.valueOf(this.recordId), this.tranMap, this.sessionId);
                 mqDtoService.assembleSendAsyncDelayMsg(mqDto);
                 //发送延时mq消息end
             }

+ 1 - 64
themis-exam/src/main/resources/application.properties

@@ -16,7 +16,7 @@ db.host=localhost
 db.port=3306
 db.name=themis
 db.username=root
-db.password=root
+db.password=123456789
 #redis\u6570\u636E\u6E90\u914D\u7F6E
 redis.host=${db.host}
 redis.database=15
@@ -120,69 +120,6 @@ rocketmq.producer.enable-msg-trace=true
 #\u81EA\u5B9A\u4E49\u7684\u6D88\u606F\u8F68\u8FF9\u4E3B\u9898
 rocketmq.producer.customized-trace-topic=my-trace-topic
 
-mq.config.server=themis
-#session_topic\u76D1\u542C
-mq.config.sessionTopic=${mq.config.server}-topic-session
-mq.config.sessionConsumerGroup=${mq.config.server}-group-session
-
-mq.config.sessionTopicWebTag=web
-mq.config.sessionConsumerWebGroup=${mq.config.sessionConsumerGroup}-${mq.config.sessionTopicWebTag}
-
-mq.config.sessionTopicWxappMonitorTag=wxapp_monitor
-mq.config.sessionConsumerWxappMonitorGroup=${mq.config.sessionConsumerGroup}-${mq.config.sessionTopicWxappMonitorTag}
-
-mq.config.sessionTopicWxappAnswerTag=wxapp_answer
-mq.config.sessionConsumerWxappAnswerGroup=${mq.config.sessionConsumerGroup}-${mq.config.sessionTopicWxappAnswerTag}
-
-mq.config.sessionTopicPcTag=pc
-mq.config.sessionConsumerPcGroup=${mq.config.sessionConsumerGroup}-${mq.config.sessionTopicPcTag}
-
-#user_login\u76D1\u542C
-mq.config.userLogTopic=${mq.config.server}-topic-userLog
-mq.config.userLogConsumerGroup=${mq.config.server}-group-userLog
-
-mq.config.userLogTopicUserTag=user
-mq.config.userLogConsumerUserGroup=${mq.config.userLogConsumerGroup}-${mq.config.userLogTopicUserTag}
-
-mq.config.userLogTopicStudentTag=student
-mq.config.userLogConsumerStudentGroup=${mq.config.userLogConsumerGroup}-${mq.config.userLogTopicStudentTag}
-
-#task_topic\u76D1\u542C
-mq.config.taskTopic=${mq.config.server}-topic-task
-mq.config.taskConsumerGroup=${mq.config.server}-group-task
-
-#\u8003\u751F\u5BFC\u5165
-mq.config.taskTopicExamStudentImportTag=examStudentImport
-mq.config.taskConsumerExamStudentImportGroup=${mq.config.taskConsumerGroup}-${mq.config.taskTopicExamStudentImportTag}
-
-#\u8003\u573A\u5BFC\u5165
-mq.config.taskTopicRoomCodeImportTag=roomCodeImport
-mq.config.taskConsumerRoomCodeImportGroup=${mq.config.taskConsumerGroup}-${mq.config.taskTopicRoomCodeImportTag}
-
-#\u8003\u573A\u5BFC\u51FA
-mq.config.taskTopicRoomCodeExportTag=roomCodeExport
-mq.config.taskConsumerRoomCodeExportGroup=${mq.config.taskConsumerGroup}-${mq.config.taskTopicRoomCodeExportTag}
-
-#\u8BD5\u5377\u5BFC\u5165
-mq.config.taskTopicExamPaperImportTag=examPaperImport
-mq.config.taskConsumerExamPaperImportGroup=${mq.config.taskConsumerGroup}-${mq.config.taskTopicExamPaperImportTag}
-
-#websocket\u8D85\u65F6\u9000\u51FA\u76D1\u542C
-mq.config.websocketUnNormalTopic=${mq.config.server}-topic-websocketUnNormal
-mq.config.websocketUnNormalConsumerGroup=${mq.config.server}-group-websocketUnNormal
-
-#oe\u8003\u751F\u7AEF
-mq.config.websocketUnNormalTopicOeTag=oe
-mq.config.websocketUnNormalConsumerOeGroup=${mq.config.websocketUnNormalConsumerGroup}-${mq.config.websocketUnNormalTopicOeTag}
-
-#quartz
-mq.config.quartzTopic=${mq.config.server}-topic-quartz
-mq.config.quartzConsumerGroup=${mq.config.server}-group-quartz
-
-#quartz-\u8003\u8BD5\u573A\u6B21task
-mq.config.quartzTopicExamActivityTag=examActivity
-mq.config.quartzConsumerExamActivityGroup=${mq.config.quartzConsumerGroup}-${mq.config.quartzTopicExamActivityTag}
-
 #\u963F\u91CC\u4E91OSS\u914D\u7F6E
 aliyun.oss.name=oss-cn-shenzhen.aliyuncs.com
 aliyun.oss.endpoint=http://${aliyun.oss.name}

+ 63 - 3
themis-mq/src/main/java/com/qmth/themis/mq/enums/MqGroupEnum.java

@@ -15,11 +15,71 @@ public enum MqGroupEnum {
     sessionConsumerWebGroup("themis-group-session-web"),
 
     /**
-     * rWxappMonitor group
+     * win group
      */
-    sessionConsumerWxappMonitorGroup("themis-group-session-wxapp_monitor");
+    sessionConsumerWinGroup("themis-group-session-win"),
 
-    private MqGroupEnum(String code){
+    /**
+     * mac group
+     */
+    sessionConsumerMacGroup("themis-group-session-mac"),
+
+    /**
+     * wxapp group
+     */
+    sessionConsumerWxappGroup("themis-group-session-wxapp"),
+
+    /**
+     * ios group
+     */
+    sessionConsumerIosGroup("themis-group-session-ios"),
+
+    /**
+     * android group
+     */
+    sessionConsumerAndroidGroup("themis-group-session-android"),
+
+    /**
+     * 用户轨迹 user group
+     */
+    userLogConsumerUserGroup("themis-group-userLog-user"),
+
+    /**
+     * 用户轨迹 student group
+     */
+    userLogConsumerStudentGroup("themis-group-userLog-student"),
+
+    /**
+     * 异步任务 考生导入 group
+     */
+    taskConsumerExamStudentImportGroup("themis-group-task-examStudentImport"),
+
+    /**
+     * 异步任务 考场导出 group
+     */
+    taskConsumerRoomCodeExportGroup("themis-group-task-roomCodeExport"),
+
+    /**
+     * 异步任务 考场导入 group
+     */
+    taskConsumerRoomCodeImportGroup("themis-group-task-roomCodeImport"),
+
+    /**
+     * 异步任务 试卷导入 group
+     */
+    taskConsumerExamPaperImportGroup("themis-group-task-examPaperImport"),
+
+    /**
+     * websocket超时退出 考生 group
+     */
+    websocketUnNormalConsumerOeGroup("themis-group-websocketUnNormal-oe"),
+
+    /**
+     * quartz 考场 group
+     */
+    quartzConsumerExamActivityGroup("themis-group-quartz-examActivity");
+
+    private MqGroupEnum(String code) {
         this.code = code;
     }
 

+ 14 - 1
themis-mq/src/main/java/com/qmth/themis/mq/enums/MqTagEnum.java

@@ -9,5 +9,18 @@ package com.qmth.themis.mq.enums;
  */
 public enum MqTagEnum {
 
-    web;
+    web,
+    win,
+    mac,
+    wxapp,
+    ios,
+    android,
+    user,
+    student,
+    examStudentImport,
+    roomCodeExport,
+    roomCodeImport,
+    examPaperImport,
+    oe,
+    examActivity;
 }

+ 21 - 1
themis-mq/src/main/java/com/qmth/themis/mq/enums/MqTopicEnum.java

@@ -12,7 +12,27 @@ public enum MqTopicEnum {
     /**
      * session topic
      */
-    sessionTopic("themis-topic-session");
+    sessionTopic("themis-topic-session"),
+
+    /**
+     * userLog topic
+     */
+    userLogTopic("themis-topic-userLog"),
+
+    /**
+     * task topic
+     */
+    taskTopic("themis-topic-task"),
+
+    /**
+     * websocketUnNormal topic
+     */
+    websocketUnNormalTopic("themis-topic-websocketUnNormal"),
+
+    /**
+     * quartz topic
+     */
+    quartzTopic("themis-topic-quartz");
 
     private MqTopicEnum(String code){
         this.code = code;

+ 0 - 240
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketSessionConsumer.java

@@ -1,240 +0,0 @@
-package com.qmth.themis.mq.listener;
-
-import com.google.gson.Gson;
-import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.entity.TBSession;
-import com.qmth.themis.business.entity.TMRocketMessage;
-import com.qmth.themis.business.service.TBSessionService;
-import com.qmth.themis.business.service.TMRocketMessageService;
-import com.qmth.themis.business.util.JacksonUtil;
-import com.qmth.themis.business.util.RedisUtil;
-import com.qmth.themis.common.contanst.Constants;
-import com.qmth.themis.mq.dto.MqDto;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
-import org.apache.rocketmq.spring.annotation.SelectorType;
-import org.apache.rocketmq.spring.core.RocketMQListener;
-import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.Resource;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * @Description: 普通消息监听 session_topic
- * @Param:
- * @return:
- * @Author: wangliang
- * @Date: 2020/6/28
- */
-@Service
-public class RocketSessionConsumer implements
-//        MessageListenerOrderly
-        MessageListenerConcurrently //并发消费
-{
-
-    private final static Logger log = LoggerFactory.getLogger(RocketSessionConsumer.class);
-
-    @Resource
-    TBSessionService tbSessionService;
-
-    @Resource
-    RedisUtil redisUtil;
-
-    @Resource
-    TMRocketMessageService tmRocketMessageService;
-
-    /**
-     * 并发消费
-     *
-     * @param msgs
-     * @param consumeConcurrentlyContext
-     * @return
-     */
-    @Override
-    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
-        MqDto mqDto = null;
-        try {
-            long threadId = Thread.currentThread().getId();
-            String threadName = Thread.currentThread().getName();
-            Gson gson = new Gson();
-            for (MessageExt messageExt : msgs) {
-                log.info(":{}-:{} sessionConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
-                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
-                log.info(":{}-:{} sessionConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
-                int reconsumeTime = messageExt.getReconsumeTimes();
-                if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
-                    //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
-                    mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
-                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                    redisUtil.delete(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId());
-                } else {
-                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                        log.info(":{}-:{} 更新db", threadId, threadName);
-                        tbSessionService.saveSessionInfo(JacksonUtil.readJson(JacksonUtil.parseJson(mqDto.getBody()), TBSession.class), mqDto.getTimestamp());
-                        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
-                        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
-                        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                        redisUtil.delete(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId());
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                    } else {
-                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-                    }
-                }
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-        } finally {
-            if (Objects.nonNull(mqDto)) {
-                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
-            }
-        }
-        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
-    }
-
-//    /**
-//     * 顺序消费
-//     *
-//     * @param msgs
-//     * @param consumeOrderlyContext
-//     * @return
-//     */
-//    @Override
-//    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext
-//            consumeOrderlyContext) {
-//        try {
-//            for (MessageExt messageExt : msgs) {
-//                log.info("sessionConsumer重试次数:{}", messageExt.getReconsumeTimes());
-//                MqDto mqDto = (MqDto) toJavaObject(parseObject(new String(messageExt.getBody(), Constants.CHARSET)), MqDto.class);
-//                log.info("sessionConsumer接受到的消息:{}", JacksonUtil.parseJson(mqDto));
-//                log.info("mqDto sequence:{},tag:{}", mqDto.getSequence(), mqDto.getTag());
-//                MqDto redisMqdto = (MqDto) redisUtil.getSessionTopicList(mqDto.getId());
-//                if (Objects.nonNull(redisMqdto)) {
-//                    if (Objects.nonNull(redisMqdto.getAck()) && redisMqdto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE) {
-//                        log.info("更新db");
-//                        tbSessionService.saveSessionInfo(toJavaObject((JSON) mqDto.getBody(), TBSession.class), redisMqdto.getTimestamp());
-//                        redisUtil.deleteSessionTopicList(redisMqdto.getId());
-//                        return ConsumeOrderlyStatus.SUCCESS;
-//                    } else {
-//                        log.info("消息ack未确认,重发");
-//                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
-//                    }
-//                } else {
-//                    log.info("消息数据为空,重发消息");
-//                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
-//                }
-//            }
-//        } catch (Exception e) {
-//            e.printStackTrace();
-//            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;//重试
-//        }
-//        return ConsumeOrderlyStatus.SUCCESS;//成功
-//    }
-
-//    @Service
-//    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWebGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWebTag}")
-//    public class sessionConsumerWeb implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-//
-//        @Override
-//        public void onMessage(Message message) {
-//            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-//        }
-//
-//        @Override
-//        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-//            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-//            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-//            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-////            defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
-//            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
-//        }
-//    }
-
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerPcGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicPcTag}")
-    public class sessionConsumerPc implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
-        }
-    }
-
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWxappMonitorGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWxappMonitorTag}")
-    public class sessionConsumerWxappMonitor implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
-        }
-    }
-
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerWxappAnswerGroup}", topic = "${mq.config.sessionTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicWxappAnswerTag}")
-    public class sessionConsumerWxappAnswer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-            defaultMQPushConsumer.registerMessageListener(RocketSessionConsumer.this::consumeMessage);
-        }
-    }
-
-    /**
-     * 死信队列
-     */
-//    @Service
-//    @RocketMQMessageListener(consumerGroup = "${mq.config.sessionConsumerGroupDlq}", topic = "${mq.config.sessionTopicDlq}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.sessionTopicTag}")
-//    public class dlqSessionConsumer implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-//
-//        @Override
-//        public void onMessage(Message message) {
-//            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-//        }
-//
-//        @Override
-//        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-//            log.info("dlqSessionConsumer死信队列进来了");
-//            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-//            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-//            defaultMQPushConsumer.registerMessageListener(RocketConsumer.this::consumeMessage);
-//        }
-//    }
-}

+ 0 - 242
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketTaskConsumer.java

@@ -1,242 +0,0 @@
-package com.qmth.themis.mq.listener;
-
-import com.google.gson.Gson;
-import com.qmth.themis.business.constant.SpringContextHolder;
-import com.qmth.themis.business.constant.SystemConstant;
-import com.qmth.themis.business.entity.TMRocketMessage;
-import com.qmth.themis.business.service.TMRocketMessageService;
-import com.qmth.themis.business.templete.TaskExportTemplete;
-import com.qmth.themis.business.templete.TaskImportTemplete;
-import com.qmth.themis.business.templete.impl.TaskExamStudentImportTemplete;
-import com.qmth.themis.business.templete.impl.TaskRoomCodeExportTemplete;
-import com.qmth.themis.business.templete.impl.TaskRoomCodeImportTemplete;
-import com.qmth.themis.business.threadPool.MyThreadPool;
-import com.qmth.themis.business.util.JacksonUtil;
-import com.qmth.themis.business.util.RedisUtil;
-import com.qmth.themis.common.contanst.Constants;
-import com.qmth.themis.mq.dto.MqDto;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
-import org.apache.rocketmq.spring.annotation.SelectorType;
-import org.apache.rocketmq.spring.core.RocketMQListener;
-import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.Resource;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * @Description: 普通消息监听 task
- * @Param:
- * @return:
- * @Author: wangliang
- * @Date: 2020/7/2
- */
-@Service
-public class RocketTaskConsumer {
-    private final static Logger log = LoggerFactory.getLogger(RocketTaskConsumer.class);
-
-    @Resource
-    RedisUtil redisUtil;
-
-    @Resource
-    TMRocketMessageService tmRocketMessageService;
-
-    @Resource
-    MyThreadPool myThreadPool;
-
-    /**
-     * 导入任务监听
-     */
-    @Service
-    public class ImportTask implements MessageListenerConcurrently {
-
-        @Override
-        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
-            MqDto mqDto = null;
-            try {
-                long threadId = Thread.currentThread().getId();
-                String threadName = Thread.currentThread().getName();
-                Gson gson = new Gson();
-                for (MessageExt messageExt : msgs) {
-                    log.info(":{}-:{} task import Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
-                    mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
-                    log.info(":{}-:{} task import Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
-                    int reconsumeTime = messageExt.getReconsumeTimes();
-                    if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
-                        //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
-                        mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
-                        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                        redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
-                    } else {
-                        if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                            Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
-                            String tag = mqDto.getTag();
-                            myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
-                                TaskImportTemplete taskImportTemplete = null;
-                                if (tag.contains("examStudentImport")) {
-                                    taskImportTemplete = new TaskExamStudentImportTemplete();
-                                } else if (tag.contains("roomCodeImport")) {
-                                    taskImportTemplete = new TaskRoomCodeImportTemplete();
-                                } else if (tag.contains("examPaperImport")) {
-                                    taskImportTemplete = SpringContextHolder.getBean("taskExamPaperImportTemplete");
-                                }
-                                try {
-                                    taskImportTemplete.importTask(map);
-                                } catch (IOException e) {
-                                    e.printStackTrace();
-                                }
-                            });
-                            mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
-                            mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
-                            TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                            tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                            redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST + mqDto.getId());
-                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                        } else {
-                            log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-                        }
-                    }
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-                return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-            } finally {
-                if (Objects.nonNull(mqDto)) {
-                    redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
-                }
-            }
-            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
-        }
-    }
-
-    /**
-     * 考生导入
-     */
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.taskConsumerExamStudentImportGroup}", topic = "${mq.config.taskTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.taskTopicExamStudentImportTag}")
-    public class taskConsumerExamStudent implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-            defaultMQPushConsumer.registerMessageListener(new ImportTask()::consumeMessage);
-        }
-    }
-
-    /**
-     * 考场导入
-     */
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.taskConsumerRoomCodeImportGroup}", topic = "${mq.config.taskTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.taskTopicRoomCodeImportTag}")
-    public class taskConsumerRoomCodeImport implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-            defaultMQPushConsumer.registerMessageListener(new ImportTask()::consumeMessage);
-        }
-    }
-
-    /**
-     * 导出任务
-     */
-    public class exportTask implements MessageListenerConcurrently {
-
-        @Override
-        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
-            MqDto mqDto = null;
-            try {
-                long threadId = Thread.currentThread().getId();
-                String threadName = Thread.currentThread().getName();
-                Gson gson = new Gson();
-                for (MessageExt messageExt : msgs) {
-                    log.info(":{}-:{} task export Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
-                    mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
-                    log.info(":{}-:{} task export Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
-//                    log.info(":{}-:{} task ExamStudentImport Consumer mqDto sequence:{},tag:{}", threadId, threadName, mqDto.getSequence(), mqDto.getTag());
-                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
-                        Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
-                        String tag = mqDto.getTag();
-                        myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
-                            TaskExportTemplete taskExportTemplete = null;
-                            if (tag.contains("roomCodeExport")) {
-                                taskExportTemplete = new TaskRoomCodeExportTemplete();
-                            }
-                            try {
-                                taskExportTemplete.exportTask(map);
-                            } catch (IOException e) {
-                                e.printStackTrace();
-                            }
-                        });
-                        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
-                        mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
-                        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
-                        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
-                        redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST + mqDto.getId());
-                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-                    } else {
-                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
-                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-                    }
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-                return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
-            } finally {
-                if (Objects.nonNull(mqDto)) {
-                    redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
-                }
-            }
-            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
-        }
-    }
-
-    /**
-     * 考场导出
-     */
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.taskConsumerRoomCodeExportGroup}", topic = "${mq.config.taskTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.taskTopicRoomCodeExportTag}")
-    public class taskConsumerRoomCodeExport implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-            defaultMQPushConsumer.registerMessageListener(new exportTask()::consumeMessage);
-        }
-    }
-}

+ 64 - 2
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/SessionConcurrentlyImpl.java

@@ -1,12 +1,30 @@
 package com.qmth.themis.mq.templete.impl;
 
+import com.google.gson.Gson;
+import com.qmth.themis.business.constant.SpringContextHolder;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.entity.TBSession;
+import com.qmth.themis.business.entity.TMRocketMessage;
+import com.qmth.themis.business.service.TBSessionService;
+import com.qmth.themis.business.service.TMRocketMessageService;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.mq.dto.MqDto;
 import com.qmth.themis.mq.templete.Concurrently;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.Resource;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * @Description: mq 会话并行消费监听
@@ -15,12 +33,56 @@ import java.util.List;
  * @Author: wangliang
  * @Date: 2020/7/28
  */
+@Service
 public class SessionConcurrentlyImpl implements Concurrently {
     private final static Logger log = LoggerFactory.getLogger(SessionConcurrentlyImpl.class);
 
     @Override
-    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
-        log.info("ConsumeConcurrentlyStatus is come in");
+    @Transactional
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
+        TBSessionService tbSessionService = SpringContextHolder.getBean(TBSessionService.class);
+        TMRocketMessageService tmRocketMessageService = SpringContextHolder.getBean(TMRocketMessageService.class);
+        MqDto mqDto = null;
+        try {
+            long threadId = Thread.currentThread().getId();
+            String threadName = Thread.currentThread().getName();
+            Gson gson = new Gson();
+            for (MessageExt messageExt : msgs) {
+                log.info(":{}-:{} sessionConsumer 重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
+                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
+                log.info(":{}-:{} sessionConsumer 接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
+                int reconsumeTime = messageExt.getReconsumeTimes();
+                if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
+                    //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
+                    mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
+                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+                    redisUtil.delete(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId());
+                } else {
+                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
+                        log.info(":{}-:{} 更新db", threadId, threadName);
+                        tbSessionService.saveSessionInfo(JacksonUtil.readJson(JacksonUtil.parseJson(mqDto.getBody()), TBSession.class), mqDto.getTimestamp());
+                        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+                        tmRocketMessage.setBody(JacksonUtil.parseJson(tmRocketMessage.getBody()));
+                        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+                        redisUtil.delete(SystemConstant.SESSION_TOPIC_BUFFER_LIST, mqDto.getId());
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    } else {
+                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+        } finally {
+            if (Objects.nonNull(mqDto)) {
+                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
+            }
+        }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
     }
 }

+ 124 - 0
themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/TaskConcurrentlyImpl.java

@@ -0,0 +1,124 @@
+package com.qmth.themis.mq.templete.impl;
+
+import com.google.gson.Gson;
+import com.qmth.themis.business.constant.SpringContextHolder;
+import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.entity.TMRocketMessage;
+import com.qmth.themis.business.enums.MqEnum;
+import com.qmth.themis.business.enums.SystemOperationEnum;
+import com.qmth.themis.business.service.TBSessionService;
+import com.qmth.themis.business.service.TEExamStudentLogService;
+import com.qmth.themis.business.service.TEUserLogService;
+import com.qmth.themis.business.service.TMRocketMessageService;
+import com.qmth.themis.business.templete.TaskExportTemplete;
+import com.qmth.themis.business.templete.TaskImportTemplete;
+import com.qmth.themis.business.templete.impl.TaskExamStudentImportTemplete;
+import com.qmth.themis.business.templete.impl.TaskRoomCodeExportTemplete;
+import com.qmth.themis.business.templete.impl.TaskRoomCodeImportTemplete;
+import com.qmth.themis.business.threadPool.MyThreadPool;
+import com.qmth.themis.business.util.JacksonUtil;
+import com.qmth.themis.business.util.RedisUtil;
+import com.qmth.themis.common.contanst.Constants;
+import com.qmth.themis.mq.dto.MqDto;
+import com.qmth.themis.mq.templete.Concurrently;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.Resource;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * @Description: mq 任务并行消费监听
+ * @Param:
+ * @return:
+ * @Author: wangliang
+ * @Date: 2020/7/28
+ */
+@Service
+public class TaskConcurrentlyImpl implements Concurrently {
+    private final static Logger log = LoggerFactory.getLogger(TaskConcurrentlyImpl.class);
+
+    @Override
+    @Transactional
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
+        MyThreadPool myThreadPool = SpringContextHolder.getBean(MyThreadPool.class);
+        TMRocketMessageService tmRocketMessageService = SpringContextHolder.getBean(TMRocketMessageService.class);
+        MqDto mqDto = null;
+        try {
+            long threadId = Thread.currentThread().getId();
+            String threadName = Thread.currentThread().getName();
+            Gson gson = new Gson();
+            for (MessageExt messageExt : msgs) {
+                log.info(":{}-:{} task Consumer重试次数:{}", threadId, threadName, messageExt.getReconsumeTimes());
+                mqDto = JacksonUtil.readJson(new String(messageExt.getBody(), Constants.CHARSET), MqDto.class);
+                log.info(":{}-:{} task Consumer接收到的消息:{}", threadId, threadName, JacksonUtil.parseJson(mqDto));
+                int reconsumeTime = messageExt.getReconsumeTimes();
+                if (reconsumeTime >= SystemConstant.MAXRECONSUMETIMES) {
+                    //超过最大重试次数,保存到数据库,后续可以发短信通知系统管理人员
+                    mqDto.setAck(SystemConstant.POSION_ACK_TYPE);
+                    TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+                    tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+                    redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId());
+                } else {
+                    if (Objects.nonNull(mqDto.getAck()) && mqDto.getAck().intValue() != SystemConstant.STANDARD_ACK_TYPE && Objects.nonNull(redisUtil.get(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST, mqDto.getId())) && redisUtil.lock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId(), SystemConstant.REDIS_LOCK_MQ_TIME_OUT)) {
+                        Map<String, Object> map = (Map<String, Object>) mqDto.getBody();
+                        String tag = mqDto.getTag();
+                        myThreadPool.arbitratePoolTaskExecutor.execute(() -> {
+                            if (tag.contains("Import")) {
+                                TaskImportTemplete taskImportTemplete = null;
+                                if (tag.contains("examStudentImport")) {
+                                    taskImportTemplete = new TaskExamStudentImportTemplete();
+                                } else if (tag.contains("roomCodeImport")) {
+                                    taskImportTemplete = new TaskRoomCodeImportTemplete();
+                                } else if (tag.contains("examPaperImport")) {
+                                    taskImportTemplete = SpringContextHolder.getBean("taskExamPaperImportTemplete");
+                                }
+                                try {
+                                    taskImportTemplete.importTask(map);
+                                } catch (IOException e) {
+                                    e.printStackTrace();
+                                }
+                            } else {
+                                TaskExportTemplete taskExportTemplete = null;
+                                if (tag.contains("roomCodeExport")) {
+                                    taskExportTemplete = new TaskRoomCodeExportTemplete();
+                                }
+                                try {
+                                    taskExportTemplete.exportTask(map);
+                                } catch (IOException e) {
+                                    e.printStackTrace();
+                                }
+                            }
+                        });
+                        mqDto.setAck(SystemConstant.STANDARD_ACK_TYPE);
+                        mqDto.setBody(JacksonUtil.parseJson(mqDto.getBody()));
+                        TMRocketMessage tmRocketMessage = gson.fromJson(gson.toJson(mqDto), TMRocketMessage.class);
+                        tmRocketMessageService.saveOrUpdate(tmRocketMessage);
+                        redisUtil.delete(SystemConstant.TASKLOG_TOPIC_BUFFER_LIST + mqDto.getId());
+                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                    } else {
+                        log.info(":{}-:{} 消息ack未确认,重发", threadId, threadName);
+                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
+        } finally {
+            if (Objects.nonNull(mqDto)) {
+                redisUtil.releaseLock(SystemConstant.REDIS_LOCK_MQ_PREFIX + mqDto.getId());
+            }
+        }
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
+    }
+}

+ 15 - 61
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketUserLogConsumer.java → themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/UserLogConcurrentlyImpl.java

@@ -1,30 +1,28 @@
-package com.qmth.themis.mq.listener;
+package com.qmth.themis.mq.templete.impl;
 
 import com.google.gson.Gson;
+import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
+import com.qmth.themis.business.entity.TBSession;
 import com.qmth.themis.business.entity.TMRocketMessage;
 import com.qmth.themis.business.enums.MqEnum;
 import com.qmth.themis.business.enums.SystemOperationEnum;
+import com.qmth.themis.business.service.TBSessionService;
 import com.qmth.themis.business.service.TEExamStudentLogService;
 import com.qmth.themis.business.service.TEUserLogService;
 import com.qmth.themis.business.service.TMRocketMessageService;
+import com.qmth.themis.business.threadPool.MyThreadPool;
 import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.contanst.Constants;
 import com.qmth.themis.mq.dto.MqDto;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import com.qmth.themis.mq.templete.Concurrently;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
-import org.apache.rocketmq.spring.annotation.SelectorType;
-import org.apache.rocketmq.spring.core.RocketMQListener;
-import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -33,31 +31,23 @@ import java.util.List;
 import java.util.Objects;
 
 /**
- * @Description: 普通消息监听 用户日志
+ * @Description: mq 用户轨迹并行消费监听
  * @Param:
  * @return:
  * @Author: wangliang
- * @Date: 2020/7/2
+ * @Date: 2020/7/28
  */
 @Service
-public class RocketUserLogConsumer implements MessageListenerConcurrently {
-    private final static Logger log = LoggerFactory.getLogger(RocketUserLogConsumer.class);
-
-    @Resource
-    RedisUtil redisUtil;
-
-    @Resource
-    TEUserLogService teUserLogService;
-
-    @Resource
-    TEExamStudentLogService teExamStudentLogService;
-
-    @Resource
-    TMRocketMessageService tmRocketMessageService;
+public class UserLogConcurrentlyImpl implements Concurrently {
+    private final static Logger log = LoggerFactory.getLogger(UserLogConcurrentlyImpl.class);
 
     @Override
     @Transactional
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
+        TEUserLogService teUserLogService = SpringContextHolder.getBean(TEUserLogService.class);
+        TEExamStudentLogService teExamStudentLogService = SpringContextHolder.getBean(TEExamStudentLogService.class);
+        TMRocketMessageService tmRocketMessageService = SpringContextHolder.getBean(TMRocketMessageService.class);
         MqDto mqDto = null;
         try {
             long threadId = Thread.currentThread().getId();
@@ -114,40 +104,4 @@ public class RocketUserLogConsumer implements MessageListenerConcurrently {
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
     }
-
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.userLogConsumerUserGroup}", topic = "${mq.config.userLogTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.userLogTopicUserTag}")
-    public class sessionConsumerUserLog implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-            defaultMQPushConsumer.registerMessageListener(RocketUserLogConsumer.this::consumeMessage);
-        }
-    }
-
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.userLogConsumerStudentGroup}", topic = "${mq.config.userLogTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.userLogTopicStudentTag}")
-    public class sessionConsumerStudentLog implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-            defaultMQPushConsumer.registerMessageListener(RocketUserLogConsumer.this::consumeMessage);
-        }
-    }
 }

+ 12 - 65
themis-mq/src/main/java/com/qmth/themis/mq/listener/RocketWebsocketUnNormalLogConsumer.java → themis-mq/src/main/java/com/qmth/themis/mq/templete/impl/WebsocketUnNormalConcurrentlyImpl.java

@@ -1,7 +1,8 @@
-package com.qmth.themis.mq.listener;
+package com.qmth.themis.mq.templete.impl;
 
 import com.alibaba.fastjson.JSONObject;
 import com.google.gson.Gson;
+import com.qmth.themis.business.constant.SpringContextHolder;
 import com.qmth.themis.business.constant.SystemConstant;
 import com.qmth.themis.business.entity.TMRocketMessage;
 import com.qmth.themis.business.entity.TOeExamBreakHistory;
@@ -15,58 +16,40 @@ import com.qmth.themis.business.util.JacksonUtil;
 import com.qmth.themis.business.util.RedisUtil;
 import com.qmth.themis.common.contanst.Constants;
 import com.qmth.themis.mq.dto.MqDto;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import com.qmth.themis.mq.templete.Concurrently;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
-import org.apache.rocketmq.spring.annotation.SelectorType;
-import org.apache.rocketmq.spring.core.RocketMQListener;
-import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
-import javax.annotation.Resource;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
 /**
- * @Description: 延时消息监听 websocket超时退出
+ * @Description: mq 延时消息监听 websocket超时退出 并行消费监听
  * @Param:
  * @return:
  * @Author: wangliang
- * @Date: 2020/7/2
+ * @Date: 2020/7/28
  */
 @Service
-public class RocketWebsocketUnNormalLogConsumer implements MessageListenerConcurrently {
-    private final static Logger log = LoggerFactory.getLogger(RocketWebsocketUnNormalLogConsumer.class);
-
-    @Resource
-    RedisUtil redisUtil;
-
-    @Resource
-    TEExamStudentLogService teExamStudentLogService;
-
-    @Resource
-    TMRocketMessageService tmRocketMessageService;
-
-    @Resource
-    TOeExamRecordService tOeExamRecordService;
-
-    @Resource
-    TOeExamBreakHistoryService tOeExamBreakHistoryService;
+public class WebsocketUnNormalConcurrentlyImpl implements Concurrently {
+    private final static Logger log = LoggerFactory.getLogger(WebsocketUnNormalConcurrentlyImpl.class);
 
     @Override
     @Transactional
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        RedisUtil redisUtil = SpringContextHolder.getBean(RedisUtil.class);
+        TEExamStudentLogService teExamStudentLogService = SpringContextHolder.getBean(TEExamStudentLogService.class);
+        TMRocketMessageService tmRocketMessageService = SpringContextHolder.getBean(TMRocketMessageService.class);
+        TOeExamRecordService tOeExamRecordService = SpringContextHolder.getBean(TOeExamRecordService.class);
+        TOeExamBreakHistoryService tOeExamBreakHistoryService = SpringContextHolder.getBean(TOeExamBreakHistoryService.class);
         MqDto mqDto = null;
         TOeExamRecord tOeExamRecord = null;
         TOeExamBreakHistory tOeExamBreakHistory = null;
@@ -151,40 +134,4 @@ public class RocketWebsocketUnNormalLogConsumer implements MessageListenerConcur
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
     }
-
-    @Service
-    @RocketMQMessageListener(consumerGroup = "${mq.config.websocketUnNormalConsumerOeGroup}", topic = "${mq.config.websocketUnNormalTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.websocketUnNormalTopicOeTag}")
-    public class sessionConsumerOeLog implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-
-        @Override
-        public void onMessage(Message message) {
-            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-        }
-
-        @Override
-        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-            defaultMQPushConsumer.registerMessageListener(RocketWebsocketUnNormalLogConsumer.this::consumeMessage);
-        }
-    }
-
-//    @Service
-//    @RocketMQMessageListener(consumerGroup = "${mq.config.userLogConsumerStudentGroup}", topic = "${mq.config.userLogTopic}", selectorType = SelectorType.TAG, selectorExpression = "${mq.config.userLogTopicStudentTag}")
-//    public class sessionConsumerStudentLog implements RocketMQListener<Message>, RocketMQPushConsumerLifecycleListener {
-//
-//        @Override
-//        public void onMessage(Message message) {
-//            //实现RocketMQPushConsumerLifecycleListener监听器之后,此方法不调用
-//        }
-//
-//        @Override
-//        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
-//            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(SystemConstant.CONSUME_MESSAGE_BATCH_MAX_SIZE);//每次拉取10条
-//            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-//            defaultMQPushConsumer.setMaxReconsumeTimes(SystemConstant.MAXRECONSUMETIMES);//最大重试次数
-//            defaultMQPushConsumer.registerMessageListener(RocketWebsocketOeLogConsumer.this::consumeMessage);
-//        }
-//    }
 }

+ 0 - 15
themis-task/src/main/java/com/qmth/themis/task/ThemisTaskApplication.java

@@ -36,19 +36,4 @@ public class ThemisTaskApplication {
     public static void main(String[] args) {
         SpringApplication.run(ThemisTaskApplication.class, args);
     }
-
-    /**
-     * 附件上传配置
-     *
-     * @return
-     */
-    @Bean
-    public MultipartResolver multipartResolver() {
-        CommonsMultipartResolver resolver = new CommonsMultipartResolver();
-        resolver.setDefaultEncoding(Constants.CHARSET_NAME);
-        resolver.setResolveLazily(true);//resolveLazily属性启用是为了推迟文件解析,以在在UploadAction中捕获文件大小异常
-        resolver.setMaxInMemorySize(2);//低于此值,只保留在内存里,超过此阈值,生成硬盘上的临时文件。
-        resolver.setMaxUploadSize(200 * 1024 * 1024);//最大200M
-        return resolver;
-    }
 }

+ 3 - 12
themis-task/src/main/java/com/qmth/themis/task/config/DictionaryConfig.java

@@ -1,6 +1,8 @@
 package com.qmth.themis.task.config;
 
-import com.qmth.themis.business.domain.*;
+import com.qmth.themis.business.domain.AliYunOssDomain;
+import com.qmth.themis.business.domain.QuartzConfigDomain;
+import com.qmth.themis.business.domain.SysDomain;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -26,17 +28,6 @@ public class DictionaryConfig {
         return new SysDomain();
     }
 
-    /**
-     * mq配置
-     *
-     * @return
-     */
-    @Bean
-    @ConfigurationProperties(prefix = "mq.config", ignoreUnknownFields = false)
-    public MqConfigDomain mqConfigDomain() {
-        return new MqConfigDomain();
-    }
-
     /**
      * quartz配置
      *

+ 57 - 0
themis-task/src/main/java/com/qmth/themis/task/start/StartRunning.java

@@ -1,10 +1,20 @@
 package com.qmth.themis.task.start;
 
+import com.qmth.themis.mq.enums.MqGroupEnum;
+import com.qmth.themis.mq.enums.MqTagEnum;
+import com.qmth.themis.mq.enums.MqTopicEnum;
+import com.qmth.themis.mq.listener.RocketMessageConsumer;
+import com.qmth.themis.mq.templete.impl.SessionConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.TaskConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.UserLogConcurrentlyImpl;
+import com.qmth.themis.mq.templete.impl.WebsocketUnNormalConcurrentlyImpl;
 import com.qmth.themis.task.config.DictionaryConfig;
 import com.qmth.themis.task.quartz.MqJob;
 import com.qmth.themis.task.service.QuartzService;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.stereotype.Component;
 
@@ -29,6 +39,12 @@ public class StartRunning implements CommandLineRunner {
     @Resource
     DictionaryConfig dictionaryConfig;
 
+    @Resource
+    RocketMessageConsumer rocketMessageConsumer;
+
+    @Value("rocketmq.name-server")
+    String nameServer;
+
     @Override
     public void run(String... args) throws Exception {
         log.info("服务器启动时执行 start");
@@ -38,6 +54,47 @@ public class StartRunning implements CommandLineRunner {
         quartzService.deleteJob(dictionaryConfig.quartzConfigDomain().getMqJobName(), dictionaryConfig.quartzConfigDomain().getMqJobGroupName());
         quartzService.addJob(MqJob.class, dictionaryConfig.quartzConfigDomain().getMqJobName(), dictionaryConfig.quartzConfigDomain().getMqJobGroupName(), "0 0/1 * * * ?", mqMap);
         log.info("增加mqjob end");
+
+        /**
+         * session mq start
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerWebGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.web.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerWinGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.win.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerMacGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.mac.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerWxappGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.wxapp.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerIosGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.ios.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.sessionConsumerAndroidGroup.getCode(), MqTopicEnum.sessionTopic.getCode(), MqTagEnum.android.name(), MessageModel.CLUSTERING, new SessionConcurrentlyImpl());
+        /**
+         * session mq end
+         */
+
+        /**
+         * userLog mq start
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.userLogConsumerUserGroup.getCode(), MqTopicEnum.userLogTopic.getCode(), MqTagEnum.user.name(), MessageModel.CLUSTERING, new UserLogConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.userLogConsumerStudentGroup.getCode(), MqTopicEnum.userLogTopic.getCode(), MqTagEnum.student.name(), MessageModel.CLUSTERING, new UserLogConcurrentlyImpl());
+        /**
+         * userLog mq end
+         */
+
+        /**
+         * task mq start
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.taskConsumerExamStudentImportGroup.getCode(), MqTopicEnum.taskTopic.getCode(), MqTagEnum.examStudentImport.name(), MessageModel.CLUSTERING, new TaskConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.taskConsumerRoomCodeImportGroup.getCode(), MqTopicEnum.taskTopic.getCode(), MqTagEnum.roomCodeImport.name(), MessageModel.CLUSTERING, new TaskConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.taskConsumerRoomCodeExportGroup.getCode(), MqTopicEnum.taskTopic.getCode(), MqTagEnum.roomCodeExport.name(), MessageModel.CLUSTERING, new TaskConcurrentlyImpl());
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.taskConsumerExamPaperImportGroup.getCode(), MqTopicEnum.taskTopic.getCode(), MqTagEnum.examPaperImport.name(), MessageModel.CLUSTERING, new TaskConcurrentlyImpl());
+        /**
+         * task mq end
+         */
+
+        /**
+         * websocket mq start
+         */
+        rocketMessageConsumer.setRocketMQConsumer(nameServer, MqGroupEnum.websocketUnNormalConsumerOeGroup.getCode(), MqTopicEnum.websocketUnNormalTopic.getCode(), MqTagEnum.oe.name(), MessageModel.CLUSTERING, new WebsocketUnNormalConcurrentlyImpl());
+        /**
+         * websocket mq end
+         */
         log.info("服务器启动时执行 end");
     }
 }

+ 1 - 64
themis-task/src/main/resources/application.properties

@@ -171,67 +171,4 @@ rocketmq.producer.secret-key=SK
 #\u542F\u7528\u6D88\u606F\u8F68\u8FF9\uFF0C\u9ED8\u8BA4\u503Ctrue
 rocketmq.producer.enable-msg-trace=true
 #\u81EA\u5B9A\u4E49\u7684\u6D88\u606F\u8F68\u8FF9\u4E3B\u9898
-rocketmq.producer.customized-trace-topic=my-trace-topic
-
-mq.config.server=themis
-#session_topic\u76D1\u542C
-mq.config.sessionTopic=${mq.config.server}-topic-session
-mq.config.sessionConsumerGroup=${mq.config.server}-group-session
-
-mq.config.sessionTopicWebTag=web
-mq.config.sessionConsumerWebGroup=${mq.config.sessionConsumerGroup}-${mq.config.sessionTopicWebTag}
-
-mq.config.sessionTopicWxappMonitorTag=wxapp_monitor
-mq.config.sessionConsumerWxappMonitorGroup=${mq.config.sessionConsumerGroup}-${mq.config.sessionTopicWxappMonitorTag}
-
-mq.config.sessionTopicWxappAnswerTag=wxapp_answer
-mq.config.sessionConsumerWxappAnswerGroup=${mq.config.sessionConsumerGroup}-${mq.config.sessionTopicWxappAnswerTag}
-
-mq.config.sessionTopicPcTag=pc
-mq.config.sessionConsumerPcGroup=${mq.config.sessionConsumerGroup}-${mq.config.sessionTopicPcTag}
-
-#user_login\u76D1\u542C
-mq.config.userLogTopic=${mq.config.server}-topic-userLog
-mq.config.userLogConsumerGroup=${mq.config.server}-group-userLog
-
-mq.config.userLogTopicUserTag=user
-mq.config.userLogConsumerUserGroup=${mq.config.userLogConsumerGroup}-${mq.config.userLogTopicUserTag}
-
-mq.config.userLogTopicStudentTag=student
-mq.config.userLogConsumerStudentGroup=${mq.config.userLogConsumerGroup}-${mq.config.userLogTopicStudentTag}
-
-#task_topic\u76D1\u542C
-mq.config.taskTopic=${mq.config.server}-topic-task
-mq.config.taskConsumerGroup=${mq.config.server}-group-task
-
-#\u8003\u751F\u5BFC\u5165
-mq.config.taskTopicExamStudentImportTag=examStudentImport
-mq.config.taskConsumerExamStudentImportGroup=${mq.config.taskConsumerGroup}-${mq.config.taskTopicExamStudentImportTag}
-
-#\u8003\u573A\u5BFC\u5165
-mq.config.taskTopicRoomCodeImportTag=roomCodeImport
-mq.config.taskConsumerRoomCodeImportGroup=${mq.config.taskConsumerGroup}-${mq.config.taskTopicRoomCodeImportTag}
-
-#\u8003\u573A\u5BFC\u51FA
-mq.config.taskTopicRoomCodeExportTag=roomCodeExport
-mq.config.taskConsumerRoomCodeExportGroup=${mq.config.taskConsumerGroup}-${mq.config.taskTopicRoomCodeExportTag}
-
-#\u8BD5\u5377\u5BFC\u5165
-mq.config.taskTopicExamPaperImportTag=examPaperImport
-mq.config.taskConsumerExamPaperImportGroup=${mq.config.taskConsumerGroup}-${mq.config.taskTopicExamPaperImportTag}
-
-#websocket\u8D85\u65F6\u9000\u51FA\u76D1\u542C
-mq.config.websocketUnNormalTopic=${mq.config.server}-topic-websocketUnNormal
-mq.config.websocketUnNormalConsumerGroup=${mq.config.server}-group-websocketUnNormal
-
-#oe\u8003\u751F\u7AEF
-mq.config.websocketUnNormalTopicOeTag=oe
-mq.config.websocketUnNormalConsumerOeGroup=${mq.config.websocketUnNormalConsumerGroup}-${mq.config.websocketUnNormalTopicOeTag}
-
-#quartz
-mq.config.quartzTopic=${mq.config.server}-topic-quartz
-mq.config.quartzConsumerGroup=${mq.config.server}-group-quartz
-
-#quartz-\u8003\u8BD5\u573A\u6B21task
-mq.config.quartzTopicExamActivityTag=examActivity
-mq.config.quartzConsumerExamActivityGroup=${mq.config.quartzConsumerGroup}-${mq.config.quartzTopicExamActivityTag}
+rocketmq.producer.customized-trace-topic=my-trace-topic